From 3f7a888429e1be3de2f8e89311fc8985dbb7fc74 Mon Sep 17 00:00:00 2001 From: Faruk AYDIN Date: Sat, 12 Mar 2022 14:20:45 +0300 Subject: [PATCH] fix: Make processor to work with multiple trigger objects --- .../src/apps/twitter/triggers/my-tweet.ts | 2 +- .../src/graphql/mutations/execute-flow.ts | 2 +- packages/backend/src/services/processor.ts | 130 ++++++++++++------ 3 files changed, 89 insertions(+), 45 deletions(-) diff --git a/packages/backend/src/apps/twitter/triggers/my-tweet.ts b/packages/backend/src/apps/twitter/triggers/my-tweet.ts index 095fc04d..db129064 100644 --- a/packages/backend/src/apps/twitter/triggers/my-tweet.ts +++ b/packages/backend/src/apps/twitter/triggers/my-tweet.ts @@ -20,6 +20,6 @@ export default class MyTweet { const userTimeline = await this.client.v1.userTimelineByUsername(username); const fetchedTweets = userTimeline.tweets; - return fetchedTweets[0]; + return fetchedTweets; } } diff --git a/packages/backend/src/graphql/mutations/execute-flow.ts b/packages/backend/src/graphql/mutations/execute-flow.ts index f9adaf49..fc6fc9dc 100644 --- a/packages/backend/src/graphql/mutations/execute-flow.ts +++ b/packages/backend/src/graphql/mutations/execute-flow.ts @@ -23,7 +23,7 @@ const executeFlow = async ( .throwIfNotFound(); const flow = await step.$relatedQuery('flow'); - const data = await new Processor(flow, step).run(); + const data = await new Processor(flow, step, { testRun: true }).run(); // TODO: Use this snippet to execute flows with the background job. // const data = processorQueue.add('processorJob', { diff --git a/packages/backend/src/services/processor.ts b/packages/backend/src/services/processor.ts index 019b06f0..2e0af63c 100644 --- a/packages/backend/src/services/processor.ts +++ b/packages/backend/src/services/processor.ts @@ -10,12 +10,14 @@ type ExecutionSteps = Record; class Processor { flow: Flow; untilStep: Step; + testRun: boolean; static variableRegExp = /({{step\..+\..+}})/g; - constructor(flow: Flow, untilStep: Step) { + constructor(flow: Flow, untilStep: Step, { testRun = false }) { this.flow = flow; this.untilStep = untilStep; + this.testRun = testRun; } async run() { @@ -24,56 +26,98 @@ class Processor { .withGraphFetched('connection') .orderBy('position', 'asc'); - const execution = await Execution.query().insert({ - flowId: this.flow.id, - testRun: true, - }); + const triggerStep = steps.find((step) => step.type === 'trigger'); + let initialTriggerData = await this.getInitialTriggerData(triggerStep); - let previousExecutionStep: ExecutionStep; - let fetchedData; - const priorExecutionSteps: ExecutionSteps = {}; + if (this.testRun) { + initialTriggerData = [initialTriggerData[0]]; + } - for await (const step of steps) { - const appData = App.findOneByKey(step.appKey); - const { - appKey, - connection, - key, - type, - parameters: rawParameters = {}, - id, - } = step; - const isTrigger = type === 'trigger'; - const AppClass = (await import(`../apps/${appKey}`)).default; - const computedParameters = Processor.computeParameters( - rawParameters, - priorExecutionSteps - ); - const appInstance = new AppClass( - appData, - connection.formattedData, - computedParameters - ); - const commands = isTrigger ? appInstance.triggers : appInstance.actions; - const command = commands[key]; - fetchedData = await command.run(); + const executions: Execution[] = []; - previousExecutionStep = await execution - .$relatedQuery('executionSteps') - .insertAndFetch({ - stepId: id, - status: 'success', - dataIn: previousExecutionStep?.dataOut, - dataOut: fetchedData, - }); + for await (const data of initialTriggerData) { + const execution = await Execution.query().insert({ + flowId: this.flow.id, + testRun: this.testRun, + }); - priorExecutionSteps[id] = previousExecutionStep; + executions.push(execution); - if (id === this.untilStep.id) { - return fetchedData; + let previousExecutionStep: ExecutionStep; + const priorExecutionSteps: ExecutionSteps = {}; + let fetchedActionData = {}; + + for await (const step of steps) { + const appData = App.findOneByKey(step.appKey); + + const { + appKey, + connection, + key, + type, + parameters: rawParameters = {}, + id, + } = step; + + const isTrigger = type === 'trigger'; + const AppClass = (await import(`../apps/${appKey}`)).default; + + const computedParameters = Processor.computeParameters( + rawParameters, + priorExecutionSteps + ); + + const appInstance = new AppClass( + appData, + connection.formattedData, + computedParameters + ); + + if (!isTrigger) { + const command = appInstance.actions[key]; + fetchedActionData = await command.run(); + } + + previousExecutionStep = await execution + .$relatedQuery('executionSteps') + .insertAndFetch({ + stepId: id, + status: 'success', + dataIn: isTrigger ? rawParameters : previousExecutionStep?.dataOut, + dataOut: isTrigger ? data : fetchedActionData, + }); + + priorExecutionSteps[id] = previousExecutionStep; + + if (id === this.untilStep.id) { + break; + } } } + if (!this.testRun) return; + + const lastExecutionStepFromFirstExecution = await executions[0] + .$relatedQuery('executionSteps') + .orderBy('created_at', 'desc') + .first(); + + return lastExecutionStepFromFirstExecution.dataOut; + } + + async getInitialTriggerData(step: Step) { + const appData = App.findOneByKey(step.appKey); + const { appKey, connection, key, parameters: rawParameters = {} } = step; + + const AppClass = (await import(`../apps/${appKey}`)).default; + const appInstance = new AppClass( + appData, + connection.formattedData, + rawParameters + ); + + const command = appInstance.triggers[key]; + const fetchedData = await command.run(); return fetchedData; }