From 7484bf7403f59696a3bc77e0e4c06b4f4572491f Mon Sep 17 00:00:00 2001 From: Ali BARIN Date: Fri, 23 Feb 2024 16:19:26 +0000 Subject: [PATCH] feat(webhook/catch-raw-webhook): add sync support --- .../triggers/catch-raw-webhook/index.js | 15 +++- .../webhooks/handler-sync-by-flow-id.js | 31 +++++++ .../src/helpers/webhook-handler-sync.js | 86 +++++++++++++++++++ packages/backend/src/models/step.js | 4 + packages/backend/src/routes/webhooks.js | 2 + 5 files changed, 137 insertions(+), 1 deletion(-) create mode 100644 packages/backend/src/controllers/webhooks/handler-sync-by-flow-id.js create mode 100644 packages/backend/src/helpers/webhook-handler-sync.js diff --git a/packages/backend/src/apps/webhook/triggers/catch-raw-webhook/index.js b/packages/backend/src/apps/webhook/triggers/catch-raw-webhook/index.js index 88bbb0a5..d396b617 100644 --- a/packages/backend/src/apps/webhook/triggers/catch-raw-webhook/index.js +++ b/packages/backend/src/apps/webhook/triggers/catch-raw-webhook/index.js @@ -7,7 +7,20 @@ export default defineTrigger({ key: 'catchRawWebhook', type: 'webhook', showWebhookUrl: true, - description: 'Triggers when the webhook receives a request.', + description: + 'Triggers in a synchronous way when the webhook receives a request.', + arguments: [ + { + label: 'Wait until flow is done', + key: 'workSynchronously', + type: 'dropdown', + required: true, + options: [ + { label: 'Yes', value: true }, + { label: 'No', value: false }, + ], + }, + ], async run($) { const dataItem = { diff --git a/packages/backend/src/controllers/webhooks/handler-sync-by-flow-id.js b/packages/backend/src/controllers/webhooks/handler-sync-by-flow-id.js new file mode 100644 index 00000000..86941664 --- /dev/null +++ b/packages/backend/src/controllers/webhooks/handler-sync-by-flow-id.js @@ -0,0 +1,31 @@ +import Flow from '../../models/flow.js'; +import logger from '../../helpers/logger.js'; +import handlerSync from '../../helpers/webhook-handler-sync.js'; + +export default async (request, response) => { + const computedRequestPayload = { + headers: request.headers, + body: request.body, + query: request.query, + params: request.params, + }; + + logger.debug(`Handling incoming webhook request at ${request.originalUrl}.`); + logger.debug(JSON.stringify(computedRequestPayload, null, 2)); + + const flowId = request.params.flowId; + const flow = await Flow.query().findById(flowId).throwIfNotFound(); + const triggerStep = await flow.getTriggerStep(); + + if (triggerStep.appKey !== 'webhook') { + const connection = await triggerStep.$relatedQuery('connection'); + + if (!(await connection.verifyWebhook(request))) { + return response.sendStatus(401); + } + } + + await handlerSync(flowId, request, response); + + response.sendStatus(204); +}; diff --git a/packages/backend/src/helpers/webhook-handler-sync.js b/packages/backend/src/helpers/webhook-handler-sync.js new file mode 100644 index 00000000..60fa2dc5 --- /dev/null +++ b/packages/backend/src/helpers/webhook-handler-sync.js @@ -0,0 +1,86 @@ +import isEmpty from 'lodash/isEmpty.js'; + +import Flow from '../models/flow.js'; +import { processTrigger } from '../services/trigger.js'; +import { processAction } from '../services/action.js'; +import globalVariable from './global-variable.js'; +import QuotaExceededError from '../errors/quote-exceeded.js'; + +export default async (flowId, request, response) => { + const flow = await Flow.query().findById(flowId).throwIfNotFound(); + const user = await flow.$relatedQuery('user'); + + const testRun = !flow.active; + const quotaExceeded = !testRun && !(await user.isAllowedToRunFlows()); + + if (quotaExceeded) { + throw new QuotaExceededError(); + } + + const [triggerStep, ...actionSteps] = await flow + .$relatedQuery('steps') + .withGraphFetched('connection') + .orderBy('position', 'asc'); + const app = await triggerStep.getApp(); + const isWebhookApp = app.key === 'webhook'; + + if (testRun && !isWebhookApp) { + return response.status(404); + } + + const connection = await triggerStep.$relatedQuery('connection'); + + const $ = await globalVariable({ + flow, + connection, + app, + step: triggerStep, + testRun, + request, + }); + + const triggerCommand = await triggerStep.getTriggerCommand(); + await triggerCommand.run($); + + const reversedTriggerItems = $.triggerOutput.data.reverse(); + + // This is the case when we filter out the incoming data + // in the run method of the webhook trigger. + // In this case, we don't want to process anything. + if (isEmpty(reversedTriggerItems)) { + return response.status(204); + } + + // set default status, but do not send it yet! + response.status(204); + + for (const triggerItem of reversedTriggerItems) { + const { executionId } = await processTrigger({ + flowId, + stepId: triggerStep.id, + triggerItem, + testRun, + }); + + if (testRun) { + // in case of testing, we do not process the whole process. + continue; + } + + for (const actionStep of actionSteps) { + const { executionStep: actionExecutionStep } = await processAction({ + flowId: flow.id, + stepId: actionStep.id, + executionId, + }); + + if (actionStep.key === 'respondWith' && !response.headersSent) { + // we send the response only if it's not sent yet. This allows us to early respond from the flow. + response.status(actionExecutionStep.dataOut.statusCode); + response.send(actionExecutionStep.dataOut.body); + } + } + } + + return response; +}; diff --git a/packages/backend/src/models/step.js b/packages/backend/src/models/step.js index 03af3312..1bb62190 100644 --- a/packages/backend/src/models/step.js +++ b/packages/backend/src/models/step.js @@ -103,6 +103,10 @@ class Step extends Base { return `/webhooks/connections/${this.connectionId}`; } + if (this.parameters.workSynchronously) { + return `/webhooks/flows/${this.flowId}/sync`; + } + return `/webhooks/flows/${this.flowId}`; } diff --git a/packages/backend/src/routes/webhooks.js b/packages/backend/src/routes/webhooks.js index 6d6898c9..98cadef0 100644 --- a/packages/backend/src/routes/webhooks.js +++ b/packages/backend/src/routes/webhooks.js @@ -3,6 +3,7 @@ import multer from 'multer'; import appConfig from '../config/app.js'; import webhookHandlerByFlowId from '../controllers/webhooks/handler-by-flow-id.js'; +import webhookHandlerSyncByFlowId from '../controllers/webhooks/handler-sync-by-flow-id.js'; import webhookHandlerByConnectionIdAndRefValue from '../controllers/webhooks/handler-by-connection-id-and-ref-value.js'; const router = Router(); @@ -46,6 +47,7 @@ createRouteHandler( '/connections/:connectionId', webhookHandlerByConnectionIdAndRefValue ); +createRouteHandler('/flows/:flowId/sync', webhookHandlerSyncByFlowId); createRouteHandler('/flows/:flowId', webhookHandlerByFlowId); createRouteHandler('/:flowId', webhookHandlerByFlowId);