diff --git a/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts b/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts index 67f4e5a6..2eb450aa 100644 --- a/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts +++ b/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts @@ -33,7 +33,6 @@ export default { async run($: IGlobalVariable) { return await searchTweets($, { searchTerm: $.step.parameters.searchTerm as string, - lastInternalId: $.flow.lastInternalId, }); }, diff --git a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts index 1c8125d7..a9d38636 100644 --- a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts +++ b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts @@ -25,7 +25,7 @@ const searchTweets = async ( do { const params: IJSONObject = { query: options.searchTerm, - since_id: options.lastInternalId, + since_id: $.execution.testRun ? null : $.flow.lastInternalId, pagination_token: response?.data?.meta?.next_token, }; @@ -62,7 +62,7 @@ const searchTweets = async ( tweets.data.push(dataItem); }); } - } while (response.data.meta.next_token && options.lastInternalId); + } while (response.data.meta.next_token && !$.execution.testRun); tweets.data.sort((tweet, nextTweet) => { return (tweet.raw.id as number) - (nextTweet.raw.id as number); diff --git a/packages/backend/src/graphql/mutations/execute-flow.ts b/packages/backend/src/graphql/mutations/execute-flow.ts index 063259af..b35e4a82 100644 --- a/packages/backend/src/graphql/mutations/execute-flow.ts +++ b/packages/backend/src/graphql/mutations/execute-flow.ts @@ -1,7 +1,5 @@ import Context from '../../types/express/context'; -import Processor from '../../services/processor'; -// eslint-disable-next-line @typescript-eslint/no-unused-vars -import processorQueue from '../../queues/processor'; +import testRun from '../../services/test-run'; type Params = { input: { @@ -14,26 +12,18 @@ const executeFlow = async ( params: Params, context: Context ) => { + const { stepId } = params.input; + const { executionStep } = await testRun({ stepId }); + const untilStep = await context.currentUser .$relatedQuery('steps') - .withGraphFetched('connection') - .findOne({ - 'steps.id': params.input.stepId, - }) - .throwIfNotFound(); - - const flow = await untilStep.$relatedQuery('flow'); - - const executionStep = await new Processor(flow, { - untilStep, - testRun: true, - }).run(); + .findById(stepId); await untilStep.$query().patch({ status: 'completed', }); - if (executionStep.errorDetails) { + if (executionStep.isFailed) { throw new Error(JSON.stringify(executionStep.errorDetails)); } diff --git a/packages/backend/src/graphql/mutations/update-flow-status.ts b/packages/backend/src/graphql/mutations/update-flow-status.ts index 97e6a69e..a81b31e0 100644 --- a/packages/backend/src/graphql/mutations/update-flow-status.ts +++ b/packages/backend/src/graphql/mutations/update-flow-status.ts @@ -1,5 +1,5 @@ import Context from '../../types/express/context'; -import processorQueue from '../../queues/processor'; +import flowQueue from '../../queues/flow'; type Params = { input: { @@ -8,7 +8,7 @@ type Params = { }; }; -const JOB_NAME = 'processorJob'; +const JOB_NAME = 'flow'; const EVERY_15_MINUTES_CRON = '*/15 * * * *'; const updateFlowStatus = async ( @@ -32,7 +32,7 @@ const updateFlowStatus = async ( }); const triggerStep = await flow.getTriggerStep(); - const trigger = await triggerStep.getTrigger(); + const trigger = await triggerStep.getTriggerCommand(); const interval = trigger.getInterval?.(triggerStep.parameters); const repeatOptions = { cron: interval || EVERY_15_MINUTES_CRON, @@ -43,8 +43,10 @@ const updateFlowStatus = async ( published_at: new Date().toISOString(), }); - await processorQueue.add( - JOB_NAME, + const jobName = `${JOB_NAME}-${flow.id}`; + + await flowQueue.add( + jobName, { flowId: flow.id }, { repeat: repeatOptions, @@ -52,10 +54,10 @@ const updateFlowStatus = async ( } ); } else { - const repeatableJobs = await processorQueue.getRepeatableJobs(); + const repeatableJobs = await flowQueue.getRepeatableJobs(); const job = repeatableJobs.find((job) => job.id === flow.id); - await processorQueue.removeRepeatableByKey(job.key); + await flowQueue.removeRepeatableByKey(job.key); } return flow; diff --git a/packages/backend/src/helpers/compute-parameters.ts b/packages/backend/src/helpers/compute-parameters.ts new file mode 100644 index 00000000..c37328bd --- /dev/null +++ b/packages/backend/src/helpers/compute-parameters.ts @@ -0,0 +1,43 @@ +import Step from '../models/step'; +import ExecutionStep from '../models/execution-step'; +import get from 'lodash.get'; + +const variableRegExp = /({{step\.[\da-zA-Z-]+(?:\.[\da-zA-Z-]+)+}})/g; + +export default function computeParameters( + parameters: Step['parameters'], + executionSteps: ExecutionStep[] +): Step['parameters'] { + const entries = Object.entries(parameters); + return entries.reduce((result, [key, value]: [string, unknown]) => { + if (typeof value === 'string') { + const parts = value.split(variableRegExp); + + const computedValue = parts + .map((part: string) => { + const isVariable = part.match(variableRegExp); + if (isVariable) { + const stepIdAndKeyPath = part.replace(/{{step.|}}/g, '') as string; + const [stepId, ...keyPaths] = stepIdAndKeyPath.split('.'); + const keyPath = keyPaths.join('.'); + const executionStep = executionSteps.find((executionStep) => { + return executionStep.stepId === stepId; + }); + const data = executionStep?.dataOut; + const dataValue = get(data, keyPath); + return dataValue; + } + + return part; + }) + .join(''); + + return { + ...result, + [key]: computedValue, + }; + } + + return result; + }, {}); +} diff --git a/packages/backend/src/helpers/create-bull-board-handler.ts b/packages/backend/src/helpers/create-bull-board-handler.ts index 7525aaa9..c74199e7 100644 --- a/packages/backend/src/helpers/create-bull-board-handler.ts +++ b/packages/backend/src/helpers/create-bull-board-handler.ts @@ -1,13 +1,19 @@ import { ExpressAdapter } from '@bull-board/express'; import { createBullBoard } from '@bull-board/api'; import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; -import processorQueue from '../queues/processor'; +import flowQueue from '../queues/flow'; +import triggerQueue from '../queues/trigger'; +import actionQueue from '../queues/action'; const serverAdapter = new ExpressAdapter(); const createBullBoardHandler = async (serverAdapter: ExpressAdapter) => { createBullBoard({ - queues: [new BullMQAdapter(processorQueue)], + queues: [ + new BullMQAdapter(flowQueue), + new BullMQAdapter(triggerQueue), + new BullMQAdapter(actionQueue), + ], serverAdapter: serverAdapter, }); }; diff --git a/packages/backend/src/helpers/global-variable.ts b/packages/backend/src/helpers/global-variable.ts index 7077d914..e30d212e 100644 --- a/packages/backend/src/helpers/global-variable.ts +++ b/packages/backend/src/helpers/global-variable.ts @@ -2,6 +2,7 @@ import createHttpClient from './http-client'; import Connection from '../models/connection'; import Flow from '../models/flow'; import Step from '../models/step'; +import Execution from '../models/execution'; import { IJSONObject, IApp, IGlobalVariable } from '@automatisch/types'; type GlobalVariableOptions = { @@ -9,16 +10,21 @@ type GlobalVariableOptions = { app: IApp; flow?: Flow; step?: Step; + execution?: Execution; + testRun?: boolean; }; const globalVariable = async ( options: GlobalVariableOptions ): Promise => { - const { connection, app, flow, step } = options; + const { connection, app, flow, step, execution, testRun = false } = options; const lastInternalId = await flow?.lastInternalId(); - return { + const trigger = await step?.getTriggerCommand(); + const nextStep = await step?.getNextStep(); + + const variable: IGlobalVariable = { auth: { set: async (args: IJSONObject) => { if (connection) { @@ -37,12 +43,36 @@ const globalVariable = async ( app: app, http: createHttpClient({ baseURL: app.baseUrl }), flow: { + id: flow?.id, lastInternalId, }, step: { + id: step?.id, + appKey: step?.appKey, parameters: step?.parameters || {}, }, + nextStep: { + id: nextStep?.id, + appKey: nextStep?.appKey, + parameters: nextStep?.parameters || {}, + }, + execution: { + id: execution?.id, + testRun, + }, }; + + if (trigger && trigger.dedupeStrategy === 'unique') { + const lastInternalIds = await flow?.lastInternalIds(); + + const isAlreadyProcessed = (internalId: string) => { + return lastInternalIds?.includes(internalId); + }; + + variable.flow.isAlreadyProcessed = isAlreadyProcessed; + } + + return variable; }; export default globalVariable; diff --git a/packages/backend/src/models/execution-step.ts b/packages/backend/src/models/execution-step.ts index 2d01e7fb..e063d344 100644 --- a/packages/backend/src/models/execution-step.ts +++ b/packages/backend/src/models/execution-step.ts @@ -50,6 +50,10 @@ class ExecutionStep extends Base { }, }); + get isFailed() { + return this.status === 'failure'; + } + async $afterInsert(queryContext: QueryContext) { await super.$afterInsert(queryContext); Telemetry.executionStepCreated(this); diff --git a/packages/backend/src/models/flow.ts b/packages/backend/src/models/flow.ts index db536431..fea848e1 100644 --- a/packages/backend/src/models/flow.ts +++ b/packages/backend/src/models/flow.ts @@ -12,6 +12,7 @@ class Flow extends Base { active: boolean; steps: Step[]; published_at: string; + executions?: Execution[]; static tableName = 'flows'; @@ -57,6 +58,15 @@ class Flow extends Base { return lastExecution ? (lastExecution as Execution).internalId : null; } + async lastInternalIds(itemCount = 50) { + const lastExecutions = await this.$relatedQuery('executions') + .select('internal_id') + .orderBy('created_at', 'desc') + .limit(itemCount); + + return lastExecutions.map((execution) => execution.internalId); + } + async $beforeUpdate( opt: ModelOptions, queryContext: QueryContext diff --git a/packages/backend/src/models/step.ts b/packages/backend/src/models/step.ts index b69f3d20..1885c7ad 100644 --- a/packages/backend/src/models/step.ts +++ b/packages/backend/src/models/step.ts @@ -92,16 +92,43 @@ class Step extends Base { return this.type === 'trigger'; } - async getTrigger() { - if (!this.isTrigger) return null; + get isAction(): boolean { + return this.type === 'action'; + } - const { appKey, key } = this; + async getApp() { + if (!this.appKey) return null; + + return await App.findOneByKey(this.appKey); + } + + async getNextStep() { + const flow = await this.$relatedQuery('flow'); + + return await flow + .$relatedQuery('steps') + .findOne({ position: this.position + 1 }); + } + + async getTriggerCommand() { + const { appKey, key, isTrigger } = this; + if (!isTrigger || !appKey || !key) return null; const app = await App.findOneByKey(appKey); const command = app.triggers.find((trigger) => trigger.key === key); return command; } + + async getActionCommand() { + const { appKey, key, isAction } = this; + if (!isAction || !appKey || !key) return null; + + const app = await App.findOneByKey(appKey); + const command = app.actions.find((action) => action.key === key); + + return command; + } } export default Step; diff --git a/packages/backend/src/queues/action.ts b/packages/backend/src/queues/action.ts new file mode 100644 index 00000000..44d8b023 --- /dev/null +++ b/packages/backend/src/queues/action.ts @@ -0,0 +1,25 @@ +import process from 'process'; +import { Queue } from 'bullmq'; +import redisConfig from '../config/redis'; +import logger from '../helpers/logger'; + +const CONNECTION_REFUSED = 'ECONNREFUSED'; + +const redisConnection = { + connection: redisConfig, +}; + +const actionQueue = new Queue('action', redisConnection); + +process.on('SIGTERM', async () => { + await actionQueue.close(); +}); + +actionQueue.on('error', (err) => { + if ((err as any).code === CONNECTION_REFUSED) { + logger.error('Make sure you have installed Redis and it is running.', err); + process.exit(); + } +}); + +export default actionQueue; diff --git a/packages/backend/src/queues/processor.ts b/packages/backend/src/queues/flow.ts similarity index 70% rename from packages/backend/src/queues/processor.ts rename to packages/backend/src/queues/flow.ts index b9b13c37..9353d72e 100644 --- a/packages/backend/src/queues/processor.ts +++ b/packages/backend/src/queues/flow.ts @@ -9,18 +9,18 @@ const redisConnection = { connection: redisConfig, }; -const processorQueue = new Queue('processor', redisConnection); -const queueScheduler = new QueueScheduler('processor', redisConnection); +const flowQueue = new Queue('flow', redisConnection); +const queueScheduler = new QueueScheduler('flow', redisConnection); process.on('SIGTERM', async () => { await queueScheduler.close(); }); -processorQueue.on('error', (err) => { +flowQueue.on('error', (err) => { if ((err as any).code === CONNECTION_REFUSED) { logger.error('Make sure you have installed Redis and it is running.', err); process.exit(); } }); -export default processorQueue; +export default flowQueue; diff --git a/packages/backend/src/queues/trigger.ts b/packages/backend/src/queues/trigger.ts new file mode 100644 index 00000000..4535deb1 --- /dev/null +++ b/packages/backend/src/queues/trigger.ts @@ -0,0 +1,25 @@ +import process from 'process'; +import { Queue } from 'bullmq'; +import redisConfig from '../config/redis'; +import logger from '../helpers/logger'; + +const CONNECTION_REFUSED = 'ECONNREFUSED'; + +const redisConnection = { + connection: redisConfig, +}; + +const triggerQueue = new Queue('trigger', redisConnection); + +process.on('SIGTERM', async () => { + await triggerQueue.close(); +}); + +triggerQueue.on('error', (err) => { + if ((err as any).code === CONNECTION_REFUSED) { + logger.error('Make sure you have installed Redis and it is running.', err); + process.exit(); + } +}); + +export default triggerQueue; diff --git a/packages/backend/src/services/action.ts b/packages/backend/src/services/action.ts new file mode 100644 index 00000000..bbac7990 --- /dev/null +++ b/packages/backend/src/services/action.ts @@ -0,0 +1,55 @@ +import Step from '../models/step'; +import Flow from '../models/flow'; +import Execution from '../models/execution'; +import ExecutionStep from '../models/execution-step'; +import computeParameters from '../helpers/compute-parameters'; +import globalVariable from '../helpers/global-variable'; + +type ProcessActionOptions = { + flowId: string; + executionId: string; + stepId: string; +}; + +export const processAction = async (options: ProcessActionOptions) => { + const { flowId, stepId, executionId } = options; + + const step = await Step.query().findById(stepId).throwIfNotFound(); + const execution = await Execution.query() + .findById(executionId) + .throwIfNotFound(); + + const $ = await globalVariable({ + flow: await Flow.query().findById(flowId).throwIfNotFound(), + app: await step.getApp(), + step: step, + connection: await step.$relatedQuery('connection'), + execution: execution, + }); + + const priorExecutionSteps = await ExecutionStep.query().where({ + execution_id: $.execution.id, + }); + + const computedParameters = computeParameters( + $.step.parameters, + priorExecutionSteps + ); + + const actionCommand = await step.getActionCommand(); + + $.step.parameters = computedParameters; + const actionOutput = await actionCommand.run($); + + const executionStep = await execution + .$relatedQuery('executionSteps') + .insertAndFetch({ + stepId: $.step.id, + status: actionOutput.error ? 'failure' : 'success', + dataIn: computedParameters, + dataOut: actionOutput.error ? null : actionOutput.data.raw, + errorDetails: actionOutput.error, + }); + + return { flowId, stepId, executionId, executionStep }; +}; diff --git a/packages/backend/src/services/flow.ts b/packages/backend/src/services/flow.ts new file mode 100644 index 00000000..c11473a2 --- /dev/null +++ b/packages/backend/src/services/flow.ts @@ -0,0 +1,24 @@ +import Flow from '../models/flow'; +import globalVariable from '../helpers/global-variable'; + +type ProcessFlowOptions = { + flowId: string; + testRun?: boolean; +}; + +export const processFlow = async (options: ProcessFlowOptions) => { + const flow = await Flow.query().findById(options.flowId).throwIfNotFound(); + + const triggerStep = await flow.getTriggerStep(); + const triggerCommand = await triggerStep.getTriggerCommand(); + + const $ = await globalVariable({ + flow, + connection: await triggerStep.$relatedQuery('connection'), + app: await triggerStep.getApp(), + step: triggerStep, + testRun: options.testRun, + }); + + return await triggerCommand.run($); +}; diff --git a/packages/backend/src/services/processor.ts b/packages/backend/src/services/processor.ts deleted file mode 100644 index 0656cceb..00000000 --- a/packages/backend/src/services/processor.ts +++ /dev/null @@ -1,227 +0,0 @@ -import get from 'lodash.get'; -import { IActionOutput } from '@automatisch/types'; - -import App from '../models/app'; -import Flow from '../models/flow'; -import Step from '../models/step'; -import Execution from '../models/execution'; -import ExecutionStep from '../models/execution-step'; -import globalVariable from '../helpers/global-variable'; - -type ExecutionSteps = Record; - -type ProcessorOptions = { - untilStep?: Step; - testRun?: boolean; -}; - -class Processor { - flow: Flow; - untilStep?: Step; - testRun?: boolean; - - static variableRegExp = /({{step\.[\da-zA-Z-]+(?:\.[\da-zA-Z-]+)+}})/g; - - constructor(flow: Flow, processorOptions: ProcessorOptions) { - this.flow = flow; - this.untilStep = processorOptions.untilStep; - this.testRun = processorOptions.testRun; - } - - async run() { - const steps = await this.flow - .$relatedQuery('steps') - .withGraphFetched('connection') - .orderBy('position', 'asc'); - - const triggerStep = steps.find((step) => step.type === 'trigger'); - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const initialTriggerData = await this.getInitialTriggerData(triggerStep!); - - if (!initialTriggerData.error && initialTriggerData.data.length === 0) { - const lastInternalId = await this.flow.lastInternalId(); - - const executionData: Partial = { - flowId: this.flow.id, - testRun: this.testRun, - }; - - if (lastInternalId) { - executionData.internalId = lastInternalId; - } - - await Execution.query().insert(executionData); - - return; - } - - if (this.testRun && initialTriggerData.data.length > 0) { - initialTriggerData.data = [initialTriggerData.data[0]]; - } - - const executions: Execution[] = []; - - for await (const data of initialTriggerData.data) { - const execution = await Execution.query().insert({ - flowId: this.flow.id, - testRun: this.testRun, - internalId: data.meta.internalId as string, - }); - - executions.push(execution); - - let previousExecutionStep: ExecutionStep; - const priorExecutionSteps: ExecutionSteps = {}; - - let fetchedActionData: IActionOutput = { - data: null, - }; - - for await (const step of steps) { - if (!step.appKey) continue; - - const { appKey, key, type, parameters: rawParameters = {}, id } = step; - - const isTrigger = type === 'trigger'; - const app = await App.findOneByKey(appKey); - - const computedParameters = Processor.computeParameters( - rawParameters, - priorExecutionSteps - ); - - const clonedStep = Object.assign({}, step); - clonedStep.parameters = computedParameters; - - const $ = await globalVariable({ - connection: step.connection, - app, - flow: this.flow, - step: clonedStep, - }); - - if (!isTrigger && key) { - const command = app.actions.find((action) => action.key === key); - fetchedActionData = await command.run($); - } - - if (!isTrigger && fetchedActionData.error) { - await execution.$relatedQuery('executionSteps').insertAndFetch({ - stepId: id, - status: 'failure', - dataIn: null, - dataOut: computedParameters, - errorDetails: fetchedActionData.error, - }); - - break; - } - - previousExecutionStep = await execution - .$relatedQuery('executionSteps') - .insertAndFetch({ - stepId: id, - status: 'success', - dataIn: isTrigger ? rawParameters : computedParameters, - dataOut: isTrigger ? data.raw : fetchedActionData.data.raw, - }); - - priorExecutionSteps[id] = previousExecutionStep; - - if (id === this.untilStep?.id) { - break; - } - } - } - - if (initialTriggerData.error) { - const executionWithError = await Execution.query().insert({ - flowId: this.flow.id, - testRun: this.testRun, - }); - - executions.push(executionWithError); - - await executionWithError.$relatedQuery('executionSteps').insertAndFetch({ - stepId: triggerStep.id, - status: 'failure', - dataIn: triggerStep.parameters, - errorDetails: initialTriggerData.error, - }); - } - - if (!this.testRun) return; - - const lastExecutionStepFromFirstExecution = await executions[0] - .$relatedQuery('executionSteps') - .orderBy('created_at', 'desc') - .first(); - - return lastExecutionStepFromFirstExecution; - } - - async getInitialTriggerData(step: Step) { - if (!step.appKey || !step.key) return null; - - const app = await App.findOneByKey(step.appKey); - const $ = await globalVariable({ - connection: step.connection, - app, - flow: this.flow, - step, - }); - - const command = app.triggers.find((trigger) => trigger.key === step.key); - - let fetchedData; - - if (this.testRun) { - fetchedData = await command.testRun($); - } else { - fetchedData = await command.run($); - } - - return fetchedData; - } - - static computeParameters( - parameters: Step['parameters'], - executionSteps: ExecutionSteps - ): Step['parameters'] { - const entries = Object.entries(parameters); - return entries.reduce((result, [key, value]: [string, unknown]) => { - if (typeof value === 'string') { - const parts = value.split(Processor.variableRegExp); - - const computedValue = parts - .map((part: string) => { - const isVariable = part.match(Processor.variableRegExp); - if (isVariable) { - const stepIdAndKeyPath = part.replace( - /{{step.|}}/g, - '' - ) as string; - const [stepId, ...keyPaths] = stepIdAndKeyPath.split('.'); - const keyPath = keyPaths.join('.'); - const executionStep = executionSteps[stepId.toString() as string]; - const data = executionStep?.dataOut; - const dataValue = get(data, keyPath); - return dataValue; - } - - return part; - }) - .join(''); - - return { - ...result, - [key]: computedValue, - }; - } - - return result; - }, {}); - } -} - -export default Processor; diff --git a/packages/backend/src/services/test-run.ts b/packages/backend/src/services/test-run.ts new file mode 100644 index 00000000..950aecc3 --- /dev/null +++ b/packages/backend/src/services/test-run.ts @@ -0,0 +1,65 @@ +import Step from '../models/step'; +import { processFlow } from '../services/flow'; +import { processTrigger } from '../services/trigger'; +import { processAction } from '../services/action'; + +type TestRunOptions = { + stepId: string; +}; + +const testRun = async (options: TestRunOptions) => { + const untilStep = await Step.query() + .findById(options.stepId) + .throwIfNotFound(); + + const flow = await untilStep.$relatedQuery('flow'); + const [triggerStep, ...actionSteps] = await flow + .$relatedQuery('steps') + .withGraphFetched('connection') + .orderBy('position', 'asc'); + + const { data, error: triggerError } = await processFlow({ + flowId: flow.id, + testRun: true, + }); + + if (triggerError) { + const { executionStep: triggerExecutionStepWithError } = + await processTrigger({ + flowId: flow.id, + stepId: triggerStep.id, + error: triggerError, + testRun: true, + }); + + return { executionStep: triggerExecutionStepWithError }; + } + + const firstTriggerDataItem = data[0]; + + const { executionId, executionStep: triggerExecutionStep } = + await processTrigger({ + flowId: flow.id, + stepId: triggerStep.id, + triggerDataItem: firstTriggerDataItem, + testRun: true, + }); + + if (triggerStep.id === untilStep.id) { + return { executionStep: triggerExecutionStep }; + } + + for (const actionStep of actionSteps) { + const { executionStep: actionExecutionStep } = await processAction({ + flowId: flow.id, + stepId: actionStep.id, + executionId, + }); + + if (actionStep.id === untilStep.id || actionExecutionStep.isFailed) { + return { executionStep: actionExecutionStep }; + } + } +}; + +export default testRun; diff --git a/packages/backend/src/services/trigger.ts b/packages/backend/src/services/trigger.ts new file mode 100644 index 00000000..5bf92185 --- /dev/null +++ b/packages/backend/src/services/trigger.ts @@ -0,0 +1,46 @@ +import { IJSONObject, ITriggerDataItem } from '@automatisch/types'; +import Step from '../models/step'; +import Flow from '../models/flow'; +import Execution from '../models/execution'; +import globalVariable from '../helpers/global-variable'; + +type ProcessTriggerOptions = { + flowId: string; + stepId: string; + triggerDataItem?: ITriggerDataItem; + error?: IJSONObject; + testRun?: boolean; +}; + +export const processTrigger = async (options: ProcessTriggerOptions) => { + const { flowId, stepId, triggerDataItem, error, testRun } = options; + + const step = await Step.query().findById(stepId).throwIfNotFound(); + + const $ = await globalVariable({ + flow: await Flow.query().findById(flowId).throwIfNotFound(), + app: await step.getApp(), + step: step, + connection: await step.$relatedQuery('connection'), + }); + + // check if we already process this trigger data item or not! + + const execution = await Execution.query().insert({ + flowId: $.flow.id, + testRun, + internalId: triggerDataItem?.meta.internalId, + }); + + const executionStep = await execution + .$relatedQuery('executionSteps') + .insertAndFetch({ + stepId: $.step.id, + status: error ? 'failure' : 'success', + dataIn: $.step.parameters, + dataOut: !error ? triggerDataItem.raw : null, + errorDetails: error, + }); + + return { flowId, stepId, executionId: execution.id, executionStep }; +}; diff --git a/packages/backend/src/worker.ts b/packages/backend/src/worker.ts index 242562f0..a69b069b 100644 --- a/packages/backend/src/worker.ts +++ b/packages/backend/src/worker.ts @@ -1,5 +1,7 @@ import './config/orm'; -export { worker } from './workers/processor'; +import './workers/flow'; +import './workers/trigger'; +import './workers/action'; import telemetry from './helpers/telemetry'; telemetry.setServiceType('worker'); diff --git a/packages/backend/src/workers/action.ts b/packages/backend/src/workers/action.ts new file mode 100644 index 00000000..75d9bd86 --- /dev/null +++ b/packages/backend/src/workers/action.ts @@ -0,0 +1,51 @@ +import { Worker } from 'bullmq'; +import redisConfig from '../config/redis'; +import logger from '../helpers/logger'; +import Step from '../models/step'; +import actionQueue from '../queues/action'; +import { processAction } from '../services/action'; + +type JobData = { + flowId: string; + executionId: string; + stepId: string; +}; + +export const worker = new Worker( + 'action', + async (job) => { + const { stepId, flowId, executionId } = await processAction( + job.data as JobData + ); + + const step = await Step.query().findById(stepId).throwIfNotFound(); + const nextStep = await step.getNextStep(); + + if (!nextStep) return; + + const jobName = `${executionId}-${nextStep.id}`; + + const jobPayload = { + flowId, + executionId, + stepId: nextStep.id, + }; + + await actionQueue.add(jobName, jobPayload); + }, + { connection: redisConfig } +); + +worker.on('completed', (job) => { + logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); +}); + +worker.on('failed', (job, err) => { + logger.info( + `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed22 to start with ${err.message}` + ); +}); + +process.on('SIGTERM', async () => { + await worker.close(); +}); diff --git a/packages/backend/src/workers/flow.ts b/packages/backend/src/workers/flow.ts new file mode 100644 index 00000000..917583cf --- /dev/null +++ b/packages/backend/src/workers/flow.ts @@ -0,0 +1,57 @@ +import { Worker } from 'bullmq'; +import redisConfig from '../config/redis'; +import logger from '../helpers/logger'; +import triggerQueue from '../queues/trigger'; +import { processFlow } from '../services/flow'; +import Flow from '../models/flow'; + +export const worker = new Worker( + 'flow', + async (job) => { + const { flowId } = job.data; + + const flow = await Flow.query().findById(flowId).throwIfNotFound(); + const triggerStep = await flow.getTriggerStep(); + + const { data, error } = await processFlow({ flowId }); + + for (const triggerDataItem of data) { + const jobName = `${triggerStep.id}-${triggerDataItem.meta.internalId}`; + + const jobPayload = { + flowId, + stepId: triggerStep.id, + triggerDataItem, + }; + + await triggerQueue.add(jobName, jobPayload); + } + + if (error) { + const jobName = `${triggerStep.id}-error`; + + const jobPayload = { + flowId, + stepId: triggerStep.id, + error, + }; + + await triggerQueue.add(jobName, jobPayload); + } + }, + { connection: redisConfig } +); + +worker.on('completed', (job) => { + logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); +}); + +worker.on('failed', (job, err) => { + logger.info( + `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}` + ); +}); + +process.on('SIGTERM', async () => { + await worker.close(); +}); diff --git a/packages/backend/src/workers/processor.ts b/packages/backend/src/workers/processor.ts deleted file mode 100644 index 17c24e52..00000000 --- a/packages/backend/src/workers/processor.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { Worker } from 'bullmq'; -import Processor from '../services/processor'; -import redisConfig from '../config/redis'; -import Flow from '../models/flow'; -import logger from '../helpers/logger'; - -export const worker = new Worker( - 'processor', - async (job) => { - const flow = await Flow.query().findById(job.data.flowId).throwIfNotFound(); - const data = await new Processor(flow, { testRun: false }).run(); - - return data; - }, - { connection: redisConfig } -); - -worker.on('completed', (job) => { - logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has completed!`); -}); - -worker.on('failed', (job, err) => { - logger.info( - `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed with ${err.message}` - ); -}); - -process.on('SIGTERM', async () => { - await worker.close(); -}); diff --git a/packages/backend/src/workers/trigger.ts b/packages/backend/src/workers/trigger.ts new file mode 100644 index 00000000..f82f568d --- /dev/null +++ b/packages/backend/src/workers/trigger.ts @@ -0,0 +1,52 @@ +import { Worker } from 'bullmq'; +import redisConfig from '../config/redis'; +import logger from '../helpers/logger'; +import { IJSONObject, ITriggerDataItem } from '@automatisch/types'; +import actionQueue from '../queues/action'; +import Step from '../models/step'; +import { processTrigger } from '../services/trigger'; + +type JobData = { + flowId: string; + stepId: string; + triggerDataItem?: ITriggerDataItem; + error?: IJSONObject; +}; + +export const worker = new Worker( + 'trigger', + async (job) => { + const { flowId, executionId, stepId, executionStep } = await processTrigger( + job.data as JobData + ); + + if (executionStep.isFailed) return; + + const step = await Step.query().findById(stepId).throwIfNotFound(); + const nextStep = await step.getNextStep(); + const jobName = `${executionId}-${nextStep.id}`; + + const jobPayload = { + flowId, + executionId, + stepId: nextStep.id, + }; + + await actionQueue.add(jobName, jobPayload); + }, + { connection: redisConfig } +); + +worker.on('completed', (job) => { + logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); +}); + +worker.on('failed', (job, err) => { + logger.info( + `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}` + ); +}); + +process.on('SIGTERM', async () => { + await worker.close(); +}); diff --git a/packages/types/index.d.ts b/packages/types/index.d.ts index 8925fa8d..61452076 100644 --- a/packages/types/index.d.ts +++ b/packages/types/index.d.ts @@ -204,6 +204,7 @@ export interface ITrigger { key: string; pollInterval: number; description: string; + dedupeStrategy: 'greatest' | 'unique' | 'last'; substeps: ISubstep[]; getInterval(parameters: IGlobalVariable['step']['parameters']): string; run($: IGlobalVariable): Promise; @@ -252,12 +253,26 @@ export type IGlobalVariable = { }; app: IApp; http: IHttpClient; - flow: { + flow?: { + id: string; lastInternalId: string; + isAlreadyProcessed?: (internalId: string) => boolean; }; - step: { + step?: { + id: string; + appKey: string; parameters: IJSONObject; }; + nextStep?: { + id: string; + appKey: string; + parameters: IJSONObject; + }; + execution?: { + id: string; + testRun: boolean; + } + process?: (triggerDataItem: ITriggerDataItem) => Promise; }; declare module 'axios' {