diff --git a/packages/backend/src/helpers/global-variable.ts b/packages/backend/src/helpers/global-variable.ts index 0130515b..db3c8d85 100644 --- a/packages/backend/src/helpers/global-variable.ts +++ b/packages/backend/src/helpers/global-variable.ts @@ -102,7 +102,9 @@ const globalVariable = async ( $.triggerOutput.data.push(triggerItem); - if ($.execution.testRun) { + const isWebhookApp = app.key === 'webhook'; + + if ($.execution.testRun && !isWebhookApp) { // early exit after receiving one item as it is enough for test execution throw new EarlyExitError(); } @@ -145,7 +147,9 @@ const globalVariable = async ( } const lastInternalIds = - testRun || (flow && step?.isAction) ? [] : await flow?.lastInternalIds(2000); + testRun || (flow && step?.isAction) + ? [] + : await flow?.lastInternalIds(2000); const isAlreadyProcessed = (internalId: string) => { return lastInternalIds?.includes(internalId); diff --git a/packages/backend/src/helpers/webhook-handler.ts b/packages/backend/src/helpers/webhook-handler.ts index 050c9d1d..e1a0c96d 100644 --- a/packages/backend/src/helpers/webhook-handler.ts +++ b/packages/backend/src/helpers/webhook-handler.ts @@ -1,6 +1,6 @@ -import Crypto from 'node:crypto'; import { Response } from 'express'; -import { IRequest, ITriggerItem } from '@automatisch/types'; +import { IRequest } from '@automatisch/types'; +import isEmpty from 'lodash/isEmpty'; import Flow from '../models/flow'; import { processTrigger } from '../services/trigger'; @@ -12,18 +12,12 @@ import { REMOVE_AFTER_7_DAYS_OR_50_JOBS, } from './remove-job-configuration'; -export default async (flowId: string, request: IRequest, response: Response) => { - // in case it's our built-in generic webhook trigger - let computedRequestPayload = { - headers: request.headers, - body: request.body, - query: request.query, - }; - - const flow = await Flow.query() - .findById(flowId) - .throwIfNotFound(); - +export default async ( + flowId: string, + request: IRequest, + response: Response +) => { + const flow = await Flow.query().findById(flowId).throwIfNotFound(); const user = await flow.$relatedQuery('user'); const testRun = !flow.active; @@ -37,48 +31,61 @@ export default async (flowId: string, request: IRequest, response: Response) => const app = await triggerStep.getApp(); const isWebhookApp = app.key === 'webhook'; - if ((testRun && !isWebhookApp)) { + if (testRun && !isWebhookApp) { return response.status(404); } - // in case trigger type is 'webhook' - if (!isWebhookApp) { - computedRequestPayload = request.body; - } + const connection = await triggerStep.$relatedQuery('connection'); - const triggerItem: ITriggerItem = { - raw: computedRequestPayload, - meta: { - internalId: Crypto.randomUUID(), - }, - }; - - const { executionId } = await processTrigger({ - flowId, - stepId: triggerStep.id, - triggerItem, + const $ = await globalVariable({ + flow, + connection, + app, + step: triggerStep, testRun, + request, }); - if (testRun) { + const triggerCommand = await triggerStep.getTriggerCommand(); + await triggerCommand.run($); + + const reversedTriggerItems = $.triggerOutput.data.reverse(); + + // This is the case when we filter out the incoming data + // in the run method of the webhook trigger. + // In this case, we don't want to process anything. + if (isEmpty(reversedTriggerItems)) { return response.status(204); } - const nextStep = await triggerStep.getNextStep(); - const jobName = `${executionId}-${nextStep.id}`; + for (const triggerItem of reversedTriggerItems) { + const { executionId } = await processTrigger({ + flowId, + stepId: triggerStep.id, + triggerItem, + testRun, + }); - const jobPayload = { - flowId, - executionId, - stepId: nextStep.id, - }; + if (testRun) { + continue; + } - const jobOptions = { - removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, - removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS, - }; + const nextStep = await triggerStep.getNextStep(); + const jobName = `${executionId}-${nextStep.id}`; - await actionQueue.add(jobName, jobPayload, jobOptions); + const jobPayload = { + flowId, + executionId, + stepId: nextStep.id, + }; + + const jobOptions = { + removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, + removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS, + }; + + await actionQueue.add(jobName, jobPayload, jobOptions); + } return response.status(204); };