diff --git a/packages/backend/src/controllers/webhooks/handler.ts b/packages/backend/src/controllers/webhooks/handler.ts index 17c15339..72329adc 100644 --- a/packages/backend/src/controllers/webhooks/handler.ts +++ b/packages/backend/src/controllers/webhooks/handler.ts @@ -3,8 +3,10 @@ import bcrypt from 'bcrypt'; import { IRequest, ITriggerItem } from '@automatisch/types'; import Flow from '../../models/flow'; -import triggerQueue from '../../queues/trigger'; +import { processTrigger } from '../../services/trigger'; +import actionQueue from '../../queues/action'; import globalVariable from '../../helpers/global-variable'; +import { REMOVE_AFTER_30_DAYS_OR_150_JOBS, REMOVE_AFTER_7_DAYS_OR_50_JOBS } from '../../helpers/remove-job-configuration'; export default async (request: IRequest, response: Response) => { const flow = await Flow.query() @@ -47,15 +49,27 @@ export default async (request: IRequest, response: Response) => { }, }; - const jobName = `${triggerStep.id}-${triggerItem.meta.internalId}`; - - const jobPayload = { + const { flowId, executionId } = await processTrigger({ flowId: flow.id, stepId: triggerStep.id, triggerItem, + }); + + const nextStep = await triggerStep.getNextStep(); + const jobName = `${executionId}-${nextStep.id}`; + + const jobPayload = { + flowId, + executionId, + stepId: nextStep.id, }; - await triggerQueue.add(jobName, jobPayload); + const jobOptions = { + removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, + removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS, + } + + await actionQueue.add(jobName, jobPayload, jobOptions); return response.sendStatus(200); };