From 237ab48d330653bb4572098b75b283e1d65a8398 Mon Sep 17 00:00:00 2001 From: Faruk AYDIN Date: Fri, 14 Oct 2022 22:33:26 +0200 Subject: [PATCH] refactor: Restructure workers to work with services --- .../triggers/search-tweets/search-tweets.ts | 2 +- .../src/graphql/mutations/execute-flow.ts | 2 +- .../backend/src/helpers/global-variable.ts | 5 +- packages/backend/src/models/execution-step.ts | 4 ++ packages/backend/src/models/step.ts | 8 +++ packages/backend/src/services/test-run.ts | 22 +++--- packages/backend/src/services/trigger.ts | 2 +- packages/backend/src/workers/action.ts | 68 +++---------------- packages/backend/src/workers/flow.ts | 40 ++++++++--- packages/backend/src/workers/trigger.ts | 42 ++++-------- 10 files changed, 81 insertions(+), 114 deletions(-) diff --git a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts index e0d25027..a9d38636 100644 --- a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts +++ b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts @@ -25,7 +25,7 @@ const searchTweets = async ( do { const params: IJSONObject = { query: options.searchTerm, - since_id: options.lastInternalId, + since_id: $.execution.testRun ? null : $.flow.lastInternalId, pagination_token: response?.data?.meta?.next_token, }; diff --git a/packages/backend/src/graphql/mutations/execute-flow.ts b/packages/backend/src/graphql/mutations/execute-flow.ts index c9ac3344..b35e4a82 100644 --- a/packages/backend/src/graphql/mutations/execute-flow.ts +++ b/packages/backend/src/graphql/mutations/execute-flow.ts @@ -23,7 +23,7 @@ const executeFlow = async ( status: 'completed', }); - if (executionStep.errorDetails) { + if (executionStep.isFailed) { throw new Error(JSON.stringify(executionStep.errorDetails)); } diff --git a/packages/backend/src/helpers/global-variable.ts b/packages/backend/src/helpers/global-variable.ts index 934dbf94..e30d212e 100644 --- a/packages/backend/src/helpers/global-variable.ts +++ b/packages/backend/src/helpers/global-variable.ts @@ -22,10 +22,7 @@ const globalVariable = async ( const lastInternalId = await flow?.lastInternalId(); const trigger = await step?.getTriggerCommand(); - const nextStep = await flow - ?.$relatedQuery('steps') - .where({ position: step.position + 1 }) - .first(); + const nextStep = await step?.getNextStep(); const variable: IGlobalVariable = { auth: { diff --git a/packages/backend/src/models/execution-step.ts b/packages/backend/src/models/execution-step.ts index 2d01e7fb..e063d344 100644 --- a/packages/backend/src/models/execution-step.ts +++ b/packages/backend/src/models/execution-step.ts @@ -50,6 +50,10 @@ class ExecutionStep extends Base { }, }); + get isFailed() { + return this.status === 'failure'; + } + async $afterInsert(queryContext: QueryContext) { await super.$afterInsert(queryContext); Telemetry.executionStepCreated(this); diff --git a/packages/backend/src/models/step.ts b/packages/backend/src/models/step.ts index 78c0d31d..1885c7ad 100644 --- a/packages/backend/src/models/step.ts +++ b/packages/backend/src/models/step.ts @@ -102,6 +102,14 @@ class Step extends Base { return await App.findOneByKey(this.appKey); } + async getNextStep() { + const flow = await this.$relatedQuery('flow'); + + return await flow + .$relatedQuery('steps') + .findOne({ position: this.position + 1 }); + } + async getTriggerCommand() { const { appKey, key, isTrigger } = this; if (!isTrigger || !appKey || !key) return null; diff --git a/packages/backend/src/services/test-run.ts b/packages/backend/src/services/test-run.ts index debc011d..950aecc3 100644 --- a/packages/backend/src/services/test-run.ts +++ b/packages/backend/src/services/test-run.ts @@ -23,16 +23,6 @@ const testRun = async (options: TestRunOptions) => { testRun: true, }); - const firstTriggerDataItem = data[0]; - - const { executionId, executionStep: triggerExecutionStep } = - await processTrigger({ - flowId: flow.id, - stepId: triggerStep.id, - triggerDataItem: firstTriggerDataItem, - testRun: true, - }); - if (triggerError) { const { executionStep: triggerExecutionStepWithError } = await processTrigger({ @@ -45,6 +35,16 @@ const testRun = async (options: TestRunOptions) => { return { executionStep: triggerExecutionStepWithError }; } + const firstTriggerDataItem = data[0]; + + const { executionId, executionStep: triggerExecutionStep } = + await processTrigger({ + flowId: flow.id, + stepId: triggerStep.id, + triggerDataItem: firstTriggerDataItem, + testRun: true, + }); + if (triggerStep.id === untilStep.id) { return { executionStep: triggerExecutionStep }; } @@ -56,7 +56,7 @@ const testRun = async (options: TestRunOptions) => { executionId, }); - if (actionStep.id === untilStep.id || actionExecutionStep.errorDetails) { + if (actionStep.id === untilStep.id || actionExecutionStep.isFailed) { return { executionStep: actionExecutionStep }; } } diff --git a/packages/backend/src/services/trigger.ts b/packages/backend/src/services/trigger.ts index 61ee8c2e..5bf92185 100644 --- a/packages/backend/src/services/trigger.ts +++ b/packages/backend/src/services/trigger.ts @@ -29,7 +29,7 @@ export const processTrigger = async (options: ProcessTriggerOptions) => { const execution = await Execution.query().insert({ flowId: $.flow.id, testRun, - internalId: triggerDataItem.meta.internalId, + internalId: triggerDataItem?.meta.internalId, }); const executionStep = await execution diff --git a/packages/backend/src/workers/action.ts b/packages/backend/src/workers/action.ts index f1022e1a..75d9bd86 100644 --- a/packages/backend/src/workers/action.ts +++ b/packages/backend/src/workers/action.ts @@ -1,14 +1,9 @@ import { Worker } from 'bullmq'; import redisConfig from '../config/redis'; -import Flow from '../models/flow'; import logger from '../helpers/logger'; -import globalVariable from '../helpers/global-variable'; -import { IGlobalVariable } from '@automatisch/types'; -import Execution from '../models/execution'; -import Processor from '../services/processor'; -import ExecutionStep from '../models/execution-step'; import Step from '../models/step'; import actionQueue from '../queues/action'; +import { processAction } from '../services/action'; type JobData = { flowId: string; @@ -19,64 +14,21 @@ type JobData = { export const worker = new Worker( 'action', async (job) => { - const { flowId, stepId, executionId } = job.data as JobData; - - const step = await Step.query().findById(stepId).throwIfNotFound(); - const execution = await Execution.query() - .findById(executionId) - .throwIfNotFound(); - - const $ = await globalVariable({ - flow: await Flow.query().findById(flowId).throwIfNotFound(), - app: await step.getApp(), - step: step, - connection: await step.$relatedQuery('connection'), - execution: execution, - }); - - const priorExecutionSteps = await ExecutionStep.query().where({ - execution_id: $.execution.id, - }); - - const computedParameters = Processor.computeParameters( - $.step.parameters, - priorExecutionSteps + const { stepId, flowId, executionId } = await processAction( + job.data as JobData ); - const actionCommand = await step.getActionCommand(); + const step = await Step.query().findById(stepId).throwIfNotFound(); + const nextStep = await step.getNextStep(); - $.step.parameters = computedParameters; - const actionDataItem = await actionCommand.run($); + if (!nextStep) return; - await execution.$relatedQuery('executionSteps').insertAndFetch({ - stepId: $.step.id, - status: 'success', - dataIn: computedParameters, - dataOut: actionDataItem.data.raw, - }); - - // TODO: Add until step id logic here! - // TODO: Change job name for the action data item! - const jobName = `${$.step.appKey}-sample`; - - if (!$.nextStep.id) return; - - const nextStep = await Step.query() - .findById($.nextStep.id) - .throwIfNotFound(); - - console.log('hello world'); - - const variable = await globalVariable({ - flow: await Flow.query().findById($.flow.id), - app: await nextStep.getApp(), - step: nextStep, - connection: await nextStep.$relatedQuery('connection'), - execution: execution, - }); + const jobName = `${executionId}-${nextStep.id}`; const jobPayload = { - $: variable, + flowId, + executionId, + stepId: nextStep.id, }; await actionQueue.add(jobName, jobPayload); diff --git a/packages/backend/src/workers/flow.ts b/packages/backend/src/workers/flow.ts index cd315fa3..917583cf 100644 --- a/packages/backend/src/workers/flow.ts +++ b/packages/backend/src/workers/flow.ts @@ -1,25 +1,43 @@ import { Worker } from 'bullmq'; import redisConfig from '../config/redis'; -import Flow from '../models/flow'; import logger from '../helpers/logger'; -import globalVariable from '../helpers/global-variable'; +import triggerQueue from '../queues/trigger'; +import { processFlow } from '../services/flow'; +import Flow from '../models/flow'; export const worker = new Worker( 'flow', async (job) => { - const flow = await Flow.query().findById(job.data.flowId).throwIfNotFound(); + const { flowId } = job.data; + const flow = await Flow.query().findById(flowId).throwIfNotFound(); const triggerStep = await flow.getTriggerStep(); - const triggerCommand = await triggerStep.getTriggerCommand(); - const $ = await globalVariable({ - flow, - connection: await triggerStep.$relatedQuery('connection'), - app: await triggerStep.getApp(), - step: triggerStep, - }); + const { data, error } = await processFlow({ flowId }); - await triggerCommand.run($); + for (const triggerDataItem of data) { + const jobName = `${triggerStep.id}-${triggerDataItem.meta.internalId}`; + + const jobPayload = { + flowId, + stepId: triggerStep.id, + triggerDataItem, + }; + + await triggerQueue.add(jobName, jobPayload); + } + + if (error) { + const jobName = `${triggerStep.id}-error`; + + const jobPayload = { + flowId, + stepId: triggerStep.id, + error, + }; + + await triggerQueue.add(jobName, jobPayload); + } }, { connection: redisConfig } ); diff --git a/packages/backend/src/workers/trigger.ts b/packages/backend/src/workers/trigger.ts index 29d07fde..f82f568d 100644 --- a/packages/backend/src/workers/trigger.ts +++ b/packages/backend/src/workers/trigger.ts @@ -1,46 +1,34 @@ import { Worker } from 'bullmq'; import redisConfig from '../config/redis'; -import Flow from '../models/flow'; import logger from '../helpers/logger'; -import globalVariable from '../helpers/global-variable'; -import { ITriggerDataItem, IGlobalVariable } from '@automatisch/types'; -import Execution from '../models/execution'; +import { IJSONObject, ITriggerDataItem } from '@automatisch/types'; import actionQueue from '../queues/action'; import Step from '../models/step'; +import { processTrigger } from '../services/trigger'; type JobData = { - $: IGlobalVariable; - triggerDataItem: ITriggerDataItem; + flowId: string; + stepId: string; + triggerDataItem?: ITriggerDataItem; + error?: IJSONObject; }; export const worker = new Worker( 'trigger', async (job) => { - const { $, triggerDataItem } = job.data as JobData; + const { flowId, executionId, stepId, executionStep } = await processTrigger( + job.data as JobData + ); - // check if we already process this trigger data item or not! + if (executionStep.isFailed) return; - const execution = await Execution.query().insert({ - flowId: $.flow.id, - // TODO: Check the testRun logic and adjust following line! - testRun: true, - internalId: triggerDataItem.meta.internalId, - }); - - await execution.$relatedQuery('executionSteps').insertAndFetch({ - stepId: $.step.id, - status: 'success', - dataIn: $.step.parameters, - dataOut: triggerDataItem.raw, - }); - - const jobName = `${$.step.appKey}-${triggerDataItem.meta.internalId}`; - - const nextStep = await Step.query().findById($.nextStep.id); + const step = await Step.query().findById(stepId).throwIfNotFound(); + const nextStep = await step.getNextStep(); + const jobName = `${executionId}-${nextStep.id}`; const jobPayload = { - flowId: $.flow.id, - executionId: execution.id, + flowId, + executionId, stepId: nextStep.id, };