diff --git a/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts b/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts index 67f4e5a6..2eb450aa 100644 --- a/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts +++ b/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts @@ -33,7 +33,6 @@ export default { async run($: IGlobalVariable) { return await searchTweets($, { searchTerm: $.step.parameters.searchTerm as string, - lastInternalId: $.flow.lastInternalId, }); }, diff --git a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts index 1c8125d7..d3fc6857 100644 --- a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts +++ b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts @@ -68,7 +68,9 @@ const searchTweets = async ( return (tweet.raw.id as number) - (nextTweet.raw.id as number); }); - return tweets; + for (const tweet of tweets.data) { + await $.process(tweet); + } }; export default searchTweets; diff --git a/packages/backend/src/graphql/mutations/execute-flow.ts b/packages/backend/src/graphql/mutations/execute-flow.ts index 063259af..39756abf 100644 --- a/packages/backend/src/graphql/mutations/execute-flow.ts +++ b/packages/backend/src/graphql/mutations/execute-flow.ts @@ -1,7 +1,6 @@ import Context from '../../types/express/context'; -import Processor from '../../services/processor'; // eslint-disable-next-line @typescript-eslint/no-unused-vars -import processorQueue from '../../queues/processor'; +import flowQueue from '../../queues/flow'; type Params = { input: { @@ -14,30 +13,25 @@ const executeFlow = async ( params: Params, context: Context ) => { - const untilStep = await context.currentUser - .$relatedQuery('steps') - .withGraphFetched('connection') - .findOne({ - 'steps.id': params.input.stepId, - }) - .throwIfNotFound(); - - const flow = await untilStep.$relatedQuery('flow'); - - const executionStep = await new Processor(flow, { - untilStep, - testRun: true, - }).run(); - - await untilStep.$query().patch({ - status: 'completed', - }); - - if (executionStep.errorDetails) { - throw new Error(JSON.stringify(executionStep.errorDetails)); - } - - return { data: executionStep.dataOut, step: untilStep }; + // const untilStep = await context.currentUser + // .$relatedQuery('steps') + // .withGraphFetched('connection') + // .findOne({ + // 'steps.id': params.input.stepId, + // }) + // .throwIfNotFound(); + // const flow = await untilStep.$relatedQuery('flow'); + // const executionStep = await new Processor(flow, { + // untilStep, + // testRun: true, + // }).run(); + // await untilStep.$query().patch({ + // status: 'completed', + // }); + // if (executionStep.errorDetails) { + // throw new Error(JSON.stringify(executionStep.errorDetails)); + // } + // return { data: executionStep.dataOut, step: untilStep }; }; export default executeFlow; diff --git a/packages/backend/src/graphql/mutations/update-flow-status.ts b/packages/backend/src/graphql/mutations/update-flow-status.ts index 97e6a69e..711a4029 100644 --- a/packages/backend/src/graphql/mutations/update-flow-status.ts +++ b/packages/backend/src/graphql/mutations/update-flow-status.ts @@ -1,5 +1,5 @@ import Context from '../../types/express/context'; -import processorQueue from '../../queues/processor'; +import flowQueue from '../../queues/flow'; type Params = { input: { @@ -9,7 +9,7 @@ type Params = { }; const JOB_NAME = 'processorJob'; -const EVERY_15_MINUTES_CRON = '*/15 * * * *'; +const EVERY_15_MINUTES_CRON = '*/1 * * * *'; const updateFlowStatus = async ( _parent: unknown, @@ -32,7 +32,7 @@ const updateFlowStatus = async ( }); const triggerStep = await flow.getTriggerStep(); - const trigger = await triggerStep.getTrigger(); + const trigger = await triggerStep.getTriggerCommand(); const interval = trigger.getInterval?.(triggerStep.parameters); const repeatOptions = { cron: interval || EVERY_15_MINUTES_CRON, @@ -43,7 +43,7 @@ const updateFlowStatus = async ( published_at: new Date().toISOString(), }); - await processorQueue.add( + await flowQueue.add( JOB_NAME, { flowId: flow.id }, { @@ -52,10 +52,10 @@ const updateFlowStatus = async ( } ); } else { - const repeatableJobs = await processorQueue.getRepeatableJobs(); + const repeatableJobs = await flowQueue.getRepeatableJobs(); const job = repeatableJobs.find((job) => job.id === flow.id); - await processorQueue.removeRepeatableByKey(job.key); + await flowQueue.removeRepeatableByKey(job.key); } return flow; diff --git a/packages/backend/src/helpers/create-bull-board-handler.ts b/packages/backend/src/helpers/create-bull-board-handler.ts index 7525aaa9..c74199e7 100644 --- a/packages/backend/src/helpers/create-bull-board-handler.ts +++ b/packages/backend/src/helpers/create-bull-board-handler.ts @@ -1,13 +1,19 @@ import { ExpressAdapter } from '@bull-board/express'; import { createBullBoard } from '@bull-board/api'; import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; -import processorQueue from '../queues/processor'; +import flowQueue from '../queues/flow'; +import triggerQueue from '../queues/trigger'; +import actionQueue from '../queues/action'; const serverAdapter = new ExpressAdapter(); const createBullBoardHandler = async (serverAdapter: ExpressAdapter) => { createBullBoard({ - queues: [new BullMQAdapter(processorQueue)], + queues: [ + new BullMQAdapter(flowQueue), + new BullMQAdapter(triggerQueue), + new BullMQAdapter(actionQueue), + ], serverAdapter: serverAdapter, }); }; diff --git a/packages/backend/src/helpers/global-variable.ts b/packages/backend/src/helpers/global-variable.ts index 7077d914..37783f8c 100644 --- a/packages/backend/src/helpers/global-variable.ts +++ b/packages/backend/src/helpers/global-variable.ts @@ -2,23 +2,37 @@ import createHttpClient from './http-client'; import Connection from '../models/connection'; import Flow from '../models/flow'; import Step from '../models/step'; -import { IJSONObject, IApp, IGlobalVariable } from '@automatisch/types'; +import Execution from '../models/execution'; +import { + IJSONObject, + IApp, + IGlobalVariable, + ITriggerDataItem, +} from '@automatisch/types'; +import triggerQueue from '../queues/trigger'; type GlobalVariableOptions = { connection?: Connection; app: IApp; flow?: Flow; step?: Step; + execution?: Execution; }; const globalVariable = async ( options: GlobalVariableOptions ): Promise => { - const { connection, app, flow, step } = options; + const { connection, app, flow, step, execution } = options; const lastInternalId = await flow?.lastInternalId(); - return { + const trigger = await step?.getTriggerCommand(); + const nextStep = await flow + ?.$relatedQuery('steps') + .where({ position: step.position + 1 }) + .first(); + + const variable: IGlobalVariable = { auth: { set: async (args: IJSONObject) => { if (connection) { @@ -37,12 +51,45 @@ const globalVariable = async ( app: app, http: createHttpClient({ baseURL: app.baseUrl }), flow: { + id: flow?.id, lastInternalId, }, step: { + id: step?.id, + appKey: step?.appKey, parameters: step?.parameters || {}, }, + nextStep: { + id: nextStep?.id, + appKey: nextStep?.appKey, + parameters: nextStep?.parameters || {}, + }, + execution: { + id: execution?.id, + }, }; + + variable.process = async (triggerDataItem: ITriggerDataItem) => { + const jobName = `${step.appKey}-${triggerDataItem.meta.internalId}`; + const jobPayload = { + $: variable, + triggerDataItem, + }; + + await triggerQueue.add(jobName, jobPayload); + }; + + if (trigger && trigger.dedupeStrategy === 'unique') { + const lastInternalIds = await flow?.lastInternalIds(); + + const isAlreadyProcessed = (internalId: string) => { + return lastInternalIds?.includes(internalId); + }; + + variable.flow.isAlreadyProcessed = isAlreadyProcessed; + } + + return variable; }; export default globalVariable; diff --git a/packages/backend/src/models/flow.ts b/packages/backend/src/models/flow.ts index db536431..fea848e1 100644 --- a/packages/backend/src/models/flow.ts +++ b/packages/backend/src/models/flow.ts @@ -12,6 +12,7 @@ class Flow extends Base { active: boolean; steps: Step[]; published_at: string; + executions?: Execution[]; static tableName = 'flows'; @@ -57,6 +58,15 @@ class Flow extends Base { return lastExecution ? (lastExecution as Execution).internalId : null; } + async lastInternalIds(itemCount = 50) { + const lastExecutions = await this.$relatedQuery('executions') + .select('internal_id') + .orderBy('created_at', 'desc') + .limit(itemCount); + + return lastExecutions.map((execution) => execution.internalId); + } + async $beforeUpdate( opt: ModelOptions, queryContext: QueryContext diff --git a/packages/backend/src/models/step.ts b/packages/backend/src/models/step.ts index b69f3d20..78c0d31d 100644 --- a/packages/backend/src/models/step.ts +++ b/packages/backend/src/models/step.ts @@ -92,16 +92,35 @@ class Step extends Base { return this.type === 'trigger'; } - async getTrigger() { - if (!this.isTrigger) return null; + get isAction(): boolean { + return this.type === 'action'; + } - const { appKey, key } = this; + async getApp() { + if (!this.appKey) return null; + + return await App.findOneByKey(this.appKey); + } + + async getTriggerCommand() { + const { appKey, key, isTrigger } = this; + if (!isTrigger || !appKey || !key) return null; const app = await App.findOneByKey(appKey); const command = app.triggers.find((trigger) => trigger.key === key); return command; } + + async getActionCommand() { + const { appKey, key, isAction } = this; + if (!isAction || !appKey || !key) return null; + + const app = await App.findOneByKey(appKey); + const command = app.actions.find((action) => action.key === key); + + return command; + } } export default Step; diff --git a/packages/backend/src/queues/action.ts b/packages/backend/src/queues/action.ts new file mode 100644 index 00000000..44d8b023 --- /dev/null +++ b/packages/backend/src/queues/action.ts @@ -0,0 +1,25 @@ +import process from 'process'; +import { Queue } from 'bullmq'; +import redisConfig from '../config/redis'; +import logger from '../helpers/logger'; + +const CONNECTION_REFUSED = 'ECONNREFUSED'; + +const redisConnection = { + connection: redisConfig, +}; + +const actionQueue = new Queue('action', redisConnection); + +process.on('SIGTERM', async () => { + await actionQueue.close(); +}); + +actionQueue.on('error', (err) => { + if ((err as any).code === CONNECTION_REFUSED) { + logger.error('Make sure you have installed Redis and it is running.', err); + process.exit(); + } +}); + +export default actionQueue; diff --git a/packages/backend/src/queues/processor.ts b/packages/backend/src/queues/flow.ts similarity index 70% rename from packages/backend/src/queues/processor.ts rename to packages/backend/src/queues/flow.ts index b9b13c37..9353d72e 100644 --- a/packages/backend/src/queues/processor.ts +++ b/packages/backend/src/queues/flow.ts @@ -9,18 +9,18 @@ const redisConnection = { connection: redisConfig, }; -const processorQueue = new Queue('processor', redisConnection); -const queueScheduler = new QueueScheduler('processor', redisConnection); +const flowQueue = new Queue('flow', redisConnection); +const queueScheduler = new QueueScheduler('flow', redisConnection); process.on('SIGTERM', async () => { await queueScheduler.close(); }); -processorQueue.on('error', (err) => { +flowQueue.on('error', (err) => { if ((err as any).code === CONNECTION_REFUSED) { logger.error('Make sure you have installed Redis and it is running.', err); process.exit(); } }); -export default processorQueue; +export default flowQueue; diff --git a/packages/backend/src/queues/trigger.ts b/packages/backend/src/queues/trigger.ts new file mode 100644 index 00000000..4535deb1 --- /dev/null +++ b/packages/backend/src/queues/trigger.ts @@ -0,0 +1,25 @@ +import process from 'process'; +import { Queue } from 'bullmq'; +import redisConfig from '../config/redis'; +import logger from '../helpers/logger'; + +const CONNECTION_REFUSED = 'ECONNREFUSED'; + +const redisConnection = { + connection: redisConfig, +}; + +const triggerQueue = new Queue('trigger', redisConnection); + +process.on('SIGTERM', async () => { + await triggerQueue.close(); +}); + +triggerQueue.on('error', (err) => { + if ((err as any).code === CONNECTION_REFUSED) { + logger.error('Make sure you have installed Redis and it is running.', err); + process.exit(); + } +}); + +export default triggerQueue; diff --git a/packages/backend/src/services/processor.ts b/packages/backend/src/services/processor.ts index 0656cceb..0942cc78 100644 --- a/packages/backend/src/services/processor.ts +++ b/packages/backend/src/services/processor.ts @@ -28,165 +28,9 @@ class Processor { this.testRun = processorOptions.testRun; } - async run() { - const steps = await this.flow - .$relatedQuery('steps') - .withGraphFetched('connection') - .orderBy('position', 'asc'); - - const triggerStep = steps.find((step) => step.type === 'trigger'); - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const initialTriggerData = await this.getInitialTriggerData(triggerStep!); - - if (!initialTriggerData.error && initialTriggerData.data.length === 0) { - const lastInternalId = await this.flow.lastInternalId(); - - const executionData: Partial = { - flowId: this.flow.id, - testRun: this.testRun, - }; - - if (lastInternalId) { - executionData.internalId = lastInternalId; - } - - await Execution.query().insert(executionData); - - return; - } - - if (this.testRun && initialTriggerData.data.length > 0) { - initialTriggerData.data = [initialTriggerData.data[0]]; - } - - const executions: Execution[] = []; - - for await (const data of initialTriggerData.data) { - const execution = await Execution.query().insert({ - flowId: this.flow.id, - testRun: this.testRun, - internalId: data.meta.internalId as string, - }); - - executions.push(execution); - - let previousExecutionStep: ExecutionStep; - const priorExecutionSteps: ExecutionSteps = {}; - - let fetchedActionData: IActionOutput = { - data: null, - }; - - for await (const step of steps) { - if (!step.appKey) continue; - - const { appKey, key, type, parameters: rawParameters = {}, id } = step; - - const isTrigger = type === 'trigger'; - const app = await App.findOneByKey(appKey); - - const computedParameters = Processor.computeParameters( - rawParameters, - priorExecutionSteps - ); - - const clonedStep = Object.assign({}, step); - clonedStep.parameters = computedParameters; - - const $ = await globalVariable({ - connection: step.connection, - app, - flow: this.flow, - step: clonedStep, - }); - - if (!isTrigger && key) { - const command = app.actions.find((action) => action.key === key); - fetchedActionData = await command.run($); - } - - if (!isTrigger && fetchedActionData.error) { - await execution.$relatedQuery('executionSteps').insertAndFetch({ - stepId: id, - status: 'failure', - dataIn: null, - dataOut: computedParameters, - errorDetails: fetchedActionData.error, - }); - - break; - } - - previousExecutionStep = await execution - .$relatedQuery('executionSteps') - .insertAndFetch({ - stepId: id, - status: 'success', - dataIn: isTrigger ? rawParameters : computedParameters, - dataOut: isTrigger ? data.raw : fetchedActionData.data.raw, - }); - - priorExecutionSteps[id] = previousExecutionStep; - - if (id === this.untilStep?.id) { - break; - } - } - } - - if (initialTriggerData.error) { - const executionWithError = await Execution.query().insert({ - flowId: this.flow.id, - testRun: this.testRun, - }); - - executions.push(executionWithError); - - await executionWithError.$relatedQuery('executionSteps').insertAndFetch({ - stepId: triggerStep.id, - status: 'failure', - dataIn: triggerStep.parameters, - errorDetails: initialTriggerData.error, - }); - } - - if (!this.testRun) return; - - const lastExecutionStepFromFirstExecution = await executions[0] - .$relatedQuery('executionSteps') - .orderBy('created_at', 'desc') - .first(); - - return lastExecutionStepFromFirstExecution; - } - - async getInitialTriggerData(step: Step) { - if (!step.appKey || !step.key) return null; - - const app = await App.findOneByKey(step.appKey); - const $ = await globalVariable({ - connection: step.connection, - app, - flow: this.flow, - step, - }); - - const command = app.triggers.find((trigger) => trigger.key === step.key); - - let fetchedData; - - if (this.testRun) { - fetchedData = await command.testRun($); - } else { - fetchedData = await command.run($); - } - - return fetchedData; - } - static computeParameters( parameters: Step['parameters'], - executionSteps: ExecutionSteps + executionSteps: ExecutionStep[] ): Step['parameters'] { const entries = Object.entries(parameters); return entries.reduce((result, [key, value]: [string, unknown]) => { @@ -203,7 +47,9 @@ class Processor { ) as string; const [stepId, ...keyPaths] = stepIdAndKeyPath.split('.'); const keyPath = keyPaths.join('.'); - const executionStep = executionSteps[stepId.toString() as string]; + const executionStep = executionSteps.find((executionStep) => { + return executionStep.stepId === stepId; + }); const data = executionStep?.dataOut; const dataValue = get(data, keyPath); return dataValue; diff --git a/packages/backend/src/worker.ts b/packages/backend/src/worker.ts index 242562f0..a69b069b 100644 --- a/packages/backend/src/worker.ts +++ b/packages/backend/src/worker.ts @@ -1,5 +1,7 @@ import './config/orm'; -export { worker } from './workers/processor'; +import './workers/flow'; +import './workers/trigger'; +import './workers/action'; import telemetry from './helpers/telemetry'; telemetry.setServiceType('worker'); diff --git a/packages/backend/src/workers/action.ts b/packages/backend/src/workers/action.ts new file mode 100644 index 00000000..f1022e1a --- /dev/null +++ b/packages/backend/src/workers/action.ts @@ -0,0 +1,99 @@ +import { Worker } from 'bullmq'; +import redisConfig from '../config/redis'; +import Flow from '../models/flow'; +import logger from '../helpers/logger'; +import globalVariable from '../helpers/global-variable'; +import { IGlobalVariable } from '@automatisch/types'; +import Execution from '../models/execution'; +import Processor from '../services/processor'; +import ExecutionStep from '../models/execution-step'; +import Step from '../models/step'; +import actionQueue from '../queues/action'; + +type JobData = { + flowId: string; + executionId: string; + stepId: string; +}; + +export const worker = new Worker( + 'action', + async (job) => { + const { flowId, stepId, executionId } = job.data as JobData; + + const step = await Step.query().findById(stepId).throwIfNotFound(); + const execution = await Execution.query() + .findById(executionId) + .throwIfNotFound(); + + const $ = await globalVariable({ + flow: await Flow.query().findById(flowId).throwIfNotFound(), + app: await step.getApp(), + step: step, + connection: await step.$relatedQuery('connection'), + execution: execution, + }); + + const priorExecutionSteps = await ExecutionStep.query().where({ + execution_id: $.execution.id, + }); + + const computedParameters = Processor.computeParameters( + $.step.parameters, + priorExecutionSteps + ); + + const actionCommand = await step.getActionCommand(); + + $.step.parameters = computedParameters; + const actionDataItem = await actionCommand.run($); + + await execution.$relatedQuery('executionSteps').insertAndFetch({ + stepId: $.step.id, + status: 'success', + dataIn: computedParameters, + dataOut: actionDataItem.data.raw, + }); + + // TODO: Add until step id logic here! + // TODO: Change job name for the action data item! + const jobName = `${$.step.appKey}-sample`; + + if (!$.nextStep.id) return; + + const nextStep = await Step.query() + .findById($.nextStep.id) + .throwIfNotFound(); + + console.log('hello world'); + + const variable = await globalVariable({ + flow: await Flow.query().findById($.flow.id), + app: await nextStep.getApp(), + step: nextStep, + connection: await nextStep.$relatedQuery('connection'), + execution: execution, + }); + + const jobPayload = { + $: variable, + }; + + await actionQueue.add(jobName, jobPayload); + }, + { connection: redisConfig } +); + +worker.on('completed', (job) => { + logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); +}); + +worker.on('failed', (job, err) => { + logger.info( + `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed22 to start with ${err.message}` + ); +}); + +process.on('SIGTERM', async () => { + await worker.close(); +}); diff --git a/packages/backend/src/workers/processor.ts b/packages/backend/src/workers/flow.ts similarity index 57% rename from packages/backend/src/workers/processor.ts rename to packages/backend/src/workers/flow.ts index 17c24e52..cd315fa3 100644 --- a/packages/backend/src/workers/processor.ts +++ b/packages/backend/src/workers/flow.ts @@ -1,27 +1,36 @@ import { Worker } from 'bullmq'; -import Processor from '../services/processor'; import redisConfig from '../config/redis'; import Flow from '../models/flow'; import logger from '../helpers/logger'; +import globalVariable from '../helpers/global-variable'; export const worker = new Worker( - 'processor', + 'flow', async (job) => { const flow = await Flow.query().findById(job.data.flowId).throwIfNotFound(); - const data = await new Processor(flow, { testRun: false }).run(); - return data; + const triggerStep = await flow.getTriggerStep(); + const triggerCommand = await triggerStep.getTriggerCommand(); + + const $ = await globalVariable({ + flow, + connection: await triggerStep.$relatedQuery('connection'), + app: await triggerStep.getApp(), + step: triggerStep, + }); + + await triggerCommand.run($); }, { connection: redisConfig } ); worker.on('completed', (job) => { - logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has completed!`); + logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); }); worker.on('failed', (job, err) => { logger.info( - `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed with ${err.message}` + `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}` ); }); diff --git a/packages/backend/src/workers/trigger.ts b/packages/backend/src/workers/trigger.ts new file mode 100644 index 00000000..29d07fde --- /dev/null +++ b/packages/backend/src/workers/trigger.ts @@ -0,0 +1,64 @@ +import { Worker } from 'bullmq'; +import redisConfig from '../config/redis'; +import Flow from '../models/flow'; +import logger from '../helpers/logger'; +import globalVariable from '../helpers/global-variable'; +import { ITriggerDataItem, IGlobalVariable } from '@automatisch/types'; +import Execution from '../models/execution'; +import actionQueue from '../queues/action'; +import Step from '../models/step'; + +type JobData = { + $: IGlobalVariable; + triggerDataItem: ITriggerDataItem; +}; + +export const worker = new Worker( + 'trigger', + async (job) => { + const { $, triggerDataItem } = job.data as JobData; + + // check if we already process this trigger data item or not! + + const execution = await Execution.query().insert({ + flowId: $.flow.id, + // TODO: Check the testRun logic and adjust following line! + testRun: true, + internalId: triggerDataItem.meta.internalId, + }); + + await execution.$relatedQuery('executionSteps').insertAndFetch({ + stepId: $.step.id, + status: 'success', + dataIn: $.step.parameters, + dataOut: triggerDataItem.raw, + }); + + const jobName = `${$.step.appKey}-${triggerDataItem.meta.internalId}`; + + const nextStep = await Step.query().findById($.nextStep.id); + + const jobPayload = { + flowId: $.flow.id, + executionId: execution.id, + stepId: nextStep.id, + }; + + await actionQueue.add(jobName, jobPayload); + }, + { connection: redisConfig } +); + +worker.on('completed', (job) => { + logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); +}); + +worker.on('failed', (job, err) => { + logger.info( + `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}` + ); +}); + +process.on('SIGTERM', async () => { + await worker.close(); +}); diff --git a/packages/types/index.d.ts b/packages/types/index.d.ts index 8925fa8d..c63a751f 100644 --- a/packages/types/index.d.ts +++ b/packages/types/index.d.ts @@ -204,6 +204,7 @@ export interface ITrigger { key: string; pollInterval: number; description: string; + dedupeStrategy: 'greatest' | 'unique' | 'last'; substeps: ISubstep[]; getInterval(parameters: IGlobalVariable['step']['parameters']): string; run($: IGlobalVariable): Promise; @@ -252,12 +253,25 @@ export type IGlobalVariable = { }; app: IApp; http: IHttpClient; - flow: { + flow?: { + id: string; lastInternalId: string; + isAlreadyProcessed?: (internalId: string) => boolean; }; - step: { + step?: { + id: string; + appKey: string; parameters: IJSONObject; }; + nextStep?: { + id: string; + appKey: string; + parameters: IJSONObject; + }; + execution?: { + id: string; + } + process?: (triggerDataItem: ITriggerDataItem) => Promise; }; declare module 'axios' {