From 628f8721809b57ab16217f197e4a052075198319 Mon Sep 17 00:00:00 2001 From: Faruk AYDIN Date: Fri, 14 Oct 2022 20:18:58 +0200 Subject: [PATCH] refactor: Implement test run helper to work with services --- .../triggers/search-tweets/search-tweets.ts | 6 +- .../src/graphql/mutations/execute-flow.ts | 38 +++++----- .../graphql/mutations/update-flow-status.ts | 8 +- .../backend/src/helpers/compute-parameters.ts | 43 +++++++++++ .../backend/src/helpers/global-variable.ts | 22 +----- packages/backend/src/services/action.ts | 55 ++++++++++++++ packages/backend/src/services/flow.ts | 24 ++++++ packages/backend/src/services/processor.ts | 73 ------------------- packages/backend/src/services/test-run.ts | 65 +++++++++++++++++ packages/backend/src/services/trigger.ts | 46 ++++++++++++ packages/types/index.d.ts | 1 + 11 files changed, 262 insertions(+), 119 deletions(-) create mode 100644 packages/backend/src/helpers/compute-parameters.ts create mode 100644 packages/backend/src/services/action.ts create mode 100644 packages/backend/src/services/flow.ts delete mode 100644 packages/backend/src/services/processor.ts create mode 100644 packages/backend/src/services/test-run.ts create mode 100644 packages/backend/src/services/trigger.ts diff --git a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts index d3fc6857..e0d25027 100644 --- a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts +++ b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts @@ -62,15 +62,13 @@ const searchTweets = async ( tweets.data.push(dataItem); }); } - } while (response.data.meta.next_token && options.lastInternalId); + } while (response.data.meta.next_token && !$.execution.testRun); tweets.data.sort((tweet, nextTweet) => { return (tweet.raw.id as number) - (nextTweet.raw.id as number); }); - for (const tweet of tweets.data) { - await $.process(tweet); - } + return tweets; }; export default searchTweets; diff --git a/packages/backend/src/graphql/mutations/execute-flow.ts b/packages/backend/src/graphql/mutations/execute-flow.ts index 39756abf..c9ac3344 100644 --- a/packages/backend/src/graphql/mutations/execute-flow.ts +++ b/packages/backend/src/graphql/mutations/execute-flow.ts @@ -1,6 +1,5 @@ import Context from '../../types/express/context'; -// eslint-disable-next-line @typescript-eslint/no-unused-vars -import flowQueue from '../../queues/flow'; +import testRun from '../../services/test-run'; type Params = { input: { @@ -13,25 +12,22 @@ const executeFlow = async ( params: Params, context: Context ) => { - // const untilStep = await context.currentUser - // .$relatedQuery('steps') - // .withGraphFetched('connection') - // .findOne({ - // 'steps.id': params.input.stepId, - // }) - // .throwIfNotFound(); - // const flow = await untilStep.$relatedQuery('flow'); - // const executionStep = await new Processor(flow, { - // untilStep, - // testRun: true, - // }).run(); - // await untilStep.$query().patch({ - // status: 'completed', - // }); - // if (executionStep.errorDetails) { - // throw new Error(JSON.stringify(executionStep.errorDetails)); - // } - // return { data: executionStep.dataOut, step: untilStep }; + const { stepId } = params.input; + const { executionStep } = await testRun({ stepId }); + + const untilStep = await context.currentUser + .$relatedQuery('steps') + .findById(stepId); + + await untilStep.$query().patch({ + status: 'completed', + }); + + if (executionStep.errorDetails) { + throw new Error(JSON.stringify(executionStep.errorDetails)); + } + + return { data: executionStep.dataOut, step: untilStep }; }; export default executeFlow; diff --git a/packages/backend/src/graphql/mutations/update-flow-status.ts b/packages/backend/src/graphql/mutations/update-flow-status.ts index 711a4029..a81b31e0 100644 --- a/packages/backend/src/graphql/mutations/update-flow-status.ts +++ b/packages/backend/src/graphql/mutations/update-flow-status.ts @@ -8,8 +8,8 @@ type Params = { }; }; -const JOB_NAME = 'processorJob'; -const EVERY_15_MINUTES_CRON = '*/1 * * * *'; +const JOB_NAME = 'flow'; +const EVERY_15_MINUTES_CRON = '*/15 * * * *'; const updateFlowStatus = async ( _parent: unknown, @@ -43,8 +43,10 @@ const updateFlowStatus = async ( published_at: new Date().toISOString(), }); + const jobName = `${JOB_NAME}-${flow.id}`; + await flowQueue.add( - JOB_NAME, + jobName, { flowId: flow.id }, { repeat: repeatOptions, diff --git a/packages/backend/src/helpers/compute-parameters.ts b/packages/backend/src/helpers/compute-parameters.ts new file mode 100644 index 00000000..c37328bd --- /dev/null +++ b/packages/backend/src/helpers/compute-parameters.ts @@ -0,0 +1,43 @@ +import Step from '../models/step'; +import ExecutionStep from '../models/execution-step'; +import get from 'lodash.get'; + +const variableRegExp = /({{step\.[\da-zA-Z-]+(?:\.[\da-zA-Z-]+)+}})/g; + +export default function computeParameters( + parameters: Step['parameters'], + executionSteps: ExecutionStep[] +): Step['parameters'] { + const entries = Object.entries(parameters); + return entries.reduce((result, [key, value]: [string, unknown]) => { + if (typeof value === 'string') { + const parts = value.split(variableRegExp); + + const computedValue = parts + .map((part: string) => { + const isVariable = part.match(variableRegExp); + if (isVariable) { + const stepIdAndKeyPath = part.replace(/{{step.|}}/g, '') as string; + const [stepId, ...keyPaths] = stepIdAndKeyPath.split('.'); + const keyPath = keyPaths.join('.'); + const executionStep = executionSteps.find((executionStep) => { + return executionStep.stepId === stepId; + }); + const data = executionStep?.dataOut; + const dataValue = get(data, keyPath); + return dataValue; + } + + return part; + }) + .join(''); + + return { + ...result, + [key]: computedValue, + }; + } + + return result; + }, {}); +} diff --git a/packages/backend/src/helpers/global-variable.ts b/packages/backend/src/helpers/global-variable.ts index 37783f8c..934dbf94 100644 --- a/packages/backend/src/helpers/global-variable.ts +++ b/packages/backend/src/helpers/global-variable.ts @@ -3,13 +3,7 @@ import Connection from '../models/connection'; import Flow from '../models/flow'; import Step from '../models/step'; import Execution from '../models/execution'; -import { - IJSONObject, - IApp, - IGlobalVariable, - ITriggerDataItem, -} from '@automatisch/types'; -import triggerQueue from '../queues/trigger'; +import { IJSONObject, IApp, IGlobalVariable } from '@automatisch/types'; type GlobalVariableOptions = { connection?: Connection; @@ -17,12 +11,13 @@ type GlobalVariableOptions = { flow?: Flow; step?: Step; execution?: Execution; + testRun?: boolean; }; const globalVariable = async ( options: GlobalVariableOptions ): Promise => { - const { connection, app, flow, step, execution } = options; + const { connection, app, flow, step, execution, testRun = false } = options; const lastInternalId = await flow?.lastInternalId(); @@ -66,19 +61,10 @@ const globalVariable = async ( }, execution: { id: execution?.id, + testRun, }, }; - variable.process = async (triggerDataItem: ITriggerDataItem) => { - const jobName = `${step.appKey}-${triggerDataItem.meta.internalId}`; - const jobPayload = { - $: variable, - triggerDataItem, - }; - - await triggerQueue.add(jobName, jobPayload); - }; - if (trigger && trigger.dedupeStrategy === 'unique') { const lastInternalIds = await flow?.lastInternalIds(); diff --git a/packages/backend/src/services/action.ts b/packages/backend/src/services/action.ts new file mode 100644 index 00000000..bbac7990 --- /dev/null +++ b/packages/backend/src/services/action.ts @@ -0,0 +1,55 @@ +import Step from '../models/step'; +import Flow from '../models/flow'; +import Execution from '../models/execution'; +import ExecutionStep from '../models/execution-step'; +import computeParameters from '../helpers/compute-parameters'; +import globalVariable from '../helpers/global-variable'; + +type ProcessActionOptions = { + flowId: string; + executionId: string; + stepId: string; +}; + +export const processAction = async (options: ProcessActionOptions) => { + const { flowId, stepId, executionId } = options; + + const step = await Step.query().findById(stepId).throwIfNotFound(); + const execution = await Execution.query() + .findById(executionId) + .throwIfNotFound(); + + const $ = await globalVariable({ + flow: await Flow.query().findById(flowId).throwIfNotFound(), + app: await step.getApp(), + step: step, + connection: await step.$relatedQuery('connection'), + execution: execution, + }); + + const priorExecutionSteps = await ExecutionStep.query().where({ + execution_id: $.execution.id, + }); + + const computedParameters = computeParameters( + $.step.parameters, + priorExecutionSteps + ); + + const actionCommand = await step.getActionCommand(); + + $.step.parameters = computedParameters; + const actionOutput = await actionCommand.run($); + + const executionStep = await execution + .$relatedQuery('executionSteps') + .insertAndFetch({ + stepId: $.step.id, + status: actionOutput.error ? 'failure' : 'success', + dataIn: computedParameters, + dataOut: actionOutput.error ? null : actionOutput.data.raw, + errorDetails: actionOutput.error, + }); + + return { flowId, stepId, executionId, executionStep }; +}; diff --git a/packages/backend/src/services/flow.ts b/packages/backend/src/services/flow.ts new file mode 100644 index 00000000..c11473a2 --- /dev/null +++ b/packages/backend/src/services/flow.ts @@ -0,0 +1,24 @@ +import Flow from '../models/flow'; +import globalVariable from '../helpers/global-variable'; + +type ProcessFlowOptions = { + flowId: string; + testRun?: boolean; +}; + +export const processFlow = async (options: ProcessFlowOptions) => { + const flow = await Flow.query().findById(options.flowId).throwIfNotFound(); + + const triggerStep = await flow.getTriggerStep(); + const triggerCommand = await triggerStep.getTriggerCommand(); + + const $ = await globalVariable({ + flow, + connection: await triggerStep.$relatedQuery('connection'), + app: await triggerStep.getApp(), + step: triggerStep, + testRun: options.testRun, + }); + + return await triggerCommand.run($); +}; diff --git a/packages/backend/src/services/processor.ts b/packages/backend/src/services/processor.ts deleted file mode 100644 index 0942cc78..00000000 --- a/packages/backend/src/services/processor.ts +++ /dev/null @@ -1,73 +0,0 @@ -import get from 'lodash.get'; -import { IActionOutput } from '@automatisch/types'; - -import App from '../models/app'; -import Flow from '../models/flow'; -import Step from '../models/step'; -import Execution from '../models/execution'; -import ExecutionStep from '../models/execution-step'; -import globalVariable from '../helpers/global-variable'; - -type ExecutionSteps = Record; - -type ProcessorOptions = { - untilStep?: Step; - testRun?: boolean; -}; - -class Processor { - flow: Flow; - untilStep?: Step; - testRun?: boolean; - - static variableRegExp = /({{step\.[\da-zA-Z-]+(?:\.[\da-zA-Z-]+)+}})/g; - - constructor(flow: Flow, processorOptions: ProcessorOptions) { - this.flow = flow; - this.untilStep = processorOptions.untilStep; - this.testRun = processorOptions.testRun; - } - - static computeParameters( - parameters: Step['parameters'], - executionSteps: ExecutionStep[] - ): Step['parameters'] { - const entries = Object.entries(parameters); - return entries.reduce((result, [key, value]: [string, unknown]) => { - if (typeof value === 'string') { - const parts = value.split(Processor.variableRegExp); - - const computedValue = parts - .map((part: string) => { - const isVariable = part.match(Processor.variableRegExp); - if (isVariable) { - const stepIdAndKeyPath = part.replace( - /{{step.|}}/g, - '' - ) as string; - const [stepId, ...keyPaths] = stepIdAndKeyPath.split('.'); - const keyPath = keyPaths.join('.'); - const executionStep = executionSteps.find((executionStep) => { - return executionStep.stepId === stepId; - }); - const data = executionStep?.dataOut; - const dataValue = get(data, keyPath); - return dataValue; - } - - return part; - }) - .join(''); - - return { - ...result, - [key]: computedValue, - }; - } - - return result; - }, {}); - } -} - -export default Processor; diff --git a/packages/backend/src/services/test-run.ts b/packages/backend/src/services/test-run.ts new file mode 100644 index 00000000..debc011d --- /dev/null +++ b/packages/backend/src/services/test-run.ts @@ -0,0 +1,65 @@ +import Step from '../models/step'; +import { processFlow } from '../services/flow'; +import { processTrigger } from '../services/trigger'; +import { processAction } from '../services/action'; + +type TestRunOptions = { + stepId: string; +}; + +const testRun = async (options: TestRunOptions) => { + const untilStep = await Step.query() + .findById(options.stepId) + .throwIfNotFound(); + + const flow = await untilStep.$relatedQuery('flow'); + const [triggerStep, ...actionSteps] = await flow + .$relatedQuery('steps') + .withGraphFetched('connection') + .orderBy('position', 'asc'); + + const { data, error: triggerError } = await processFlow({ + flowId: flow.id, + testRun: true, + }); + + const firstTriggerDataItem = data[0]; + + const { executionId, executionStep: triggerExecutionStep } = + await processTrigger({ + flowId: flow.id, + stepId: triggerStep.id, + triggerDataItem: firstTriggerDataItem, + testRun: true, + }); + + if (triggerError) { + const { executionStep: triggerExecutionStepWithError } = + await processTrigger({ + flowId: flow.id, + stepId: triggerStep.id, + error: triggerError, + testRun: true, + }); + + return { executionStep: triggerExecutionStepWithError }; + } + + if (triggerStep.id === untilStep.id) { + return { executionStep: triggerExecutionStep }; + } + + for (const actionStep of actionSteps) { + const { executionStep: actionExecutionStep } = await processAction({ + flowId: flow.id, + stepId: actionStep.id, + executionId, + }); + + if (actionStep.id === untilStep.id || actionExecutionStep.errorDetails) { + return { executionStep: actionExecutionStep }; + } + } +}; + +export default testRun; diff --git a/packages/backend/src/services/trigger.ts b/packages/backend/src/services/trigger.ts new file mode 100644 index 00000000..61ee8c2e --- /dev/null +++ b/packages/backend/src/services/trigger.ts @@ -0,0 +1,46 @@ +import { IJSONObject, ITriggerDataItem } from '@automatisch/types'; +import Step from '../models/step'; +import Flow from '../models/flow'; +import Execution from '../models/execution'; +import globalVariable from '../helpers/global-variable'; + +type ProcessTriggerOptions = { + flowId: string; + stepId: string; + triggerDataItem?: ITriggerDataItem; + error?: IJSONObject; + testRun?: boolean; +}; + +export const processTrigger = async (options: ProcessTriggerOptions) => { + const { flowId, stepId, triggerDataItem, error, testRun } = options; + + const step = await Step.query().findById(stepId).throwIfNotFound(); + + const $ = await globalVariable({ + flow: await Flow.query().findById(flowId).throwIfNotFound(), + app: await step.getApp(), + step: step, + connection: await step.$relatedQuery('connection'), + }); + + // check if we already process this trigger data item or not! + + const execution = await Execution.query().insert({ + flowId: $.flow.id, + testRun, + internalId: triggerDataItem.meta.internalId, + }); + + const executionStep = await execution + .$relatedQuery('executionSteps') + .insertAndFetch({ + stepId: $.step.id, + status: error ? 'failure' : 'success', + dataIn: $.step.parameters, + dataOut: !error ? triggerDataItem.raw : null, + errorDetails: error, + }); + + return { flowId, stepId, executionId: execution.id, executionStep }; +}; diff --git a/packages/types/index.d.ts b/packages/types/index.d.ts index c63a751f..61452076 100644 --- a/packages/types/index.d.ts +++ b/packages/types/index.d.ts @@ -270,6 +270,7 @@ export type IGlobalVariable = { }; execution?: { id: string; + testRun: boolean; } process?: (triggerDataItem: ITriggerDataItem) => Promise; };