feat(webhook/catch-raw-webhook): add sync support

This commit is contained in:
Ali BARIN
2024-02-23 16:19:26 +00:00
parent f6b2312c49
commit 7484bf7403
5 changed files with 137 additions and 1 deletions

View File

@@ -7,7 +7,20 @@ export default defineTrigger({
key: 'catchRawWebhook', key: 'catchRawWebhook',
type: 'webhook', type: 'webhook',
showWebhookUrl: true, showWebhookUrl: true,
description: 'Triggers when the webhook receives a request.', description:
'Triggers in a synchronous way when the webhook receives a request.',
arguments: [
{
label: 'Wait until flow is done',
key: 'workSynchronously',
type: 'dropdown',
required: true,
options: [
{ label: 'Yes', value: true },
{ label: 'No', value: false },
],
},
],
async run($) { async run($) {
const dataItem = { const dataItem = {

View File

@@ -0,0 +1,31 @@
import Flow from '../../models/flow.js';
import logger from '../../helpers/logger.js';
import handlerSync from '../../helpers/webhook-handler-sync.js';
export default async (request, response) => {
const computedRequestPayload = {
headers: request.headers,
body: request.body,
query: request.query,
params: request.params,
};
logger.debug(`Handling incoming webhook request at ${request.originalUrl}.`);
logger.debug(JSON.stringify(computedRequestPayload, null, 2));
const flowId = request.params.flowId;
const flow = await Flow.query().findById(flowId).throwIfNotFound();
const triggerStep = await flow.getTriggerStep();
if (triggerStep.appKey !== 'webhook') {
const connection = await triggerStep.$relatedQuery('connection');
if (!(await connection.verifyWebhook(request))) {
return response.sendStatus(401);
}
}
await handlerSync(flowId, request, response);
response.sendStatus(204);
};

View File

@@ -0,0 +1,86 @@
import isEmpty from 'lodash/isEmpty.js';
import Flow from '../models/flow.js';
import { processTrigger } from '../services/trigger.js';
import { processAction } from '../services/action.js';
import globalVariable from './global-variable.js';
import QuotaExceededError from '../errors/quote-exceeded.js';
export default async (flowId, request, response) => {
const flow = await Flow.query().findById(flowId).throwIfNotFound();
const user = await flow.$relatedQuery('user');
const testRun = !flow.active;
const quotaExceeded = !testRun && !(await user.isAllowedToRunFlows());
if (quotaExceeded) {
throw new QuotaExceededError();
}
const [triggerStep, ...actionSteps] = await flow
.$relatedQuery('steps')
.withGraphFetched('connection')
.orderBy('position', 'asc');
const app = await triggerStep.getApp();
const isWebhookApp = app.key === 'webhook';
if (testRun && !isWebhookApp) {
return response.status(404);
}
const connection = await triggerStep.$relatedQuery('connection');
const $ = await globalVariable({
flow,
connection,
app,
step: triggerStep,
testRun,
request,
});
const triggerCommand = await triggerStep.getTriggerCommand();
await triggerCommand.run($);
const reversedTriggerItems = $.triggerOutput.data.reverse();
// This is the case when we filter out the incoming data
// in the run method of the webhook trigger.
// In this case, we don't want to process anything.
if (isEmpty(reversedTriggerItems)) {
return response.status(204);
}
// set default status, but do not send it yet!
response.status(204);
for (const triggerItem of reversedTriggerItems) {
const { executionId } = await processTrigger({
flowId,
stepId: triggerStep.id,
triggerItem,
testRun,
});
if (testRun) {
// in case of testing, we do not process the whole process.
continue;
}
for (const actionStep of actionSteps) {
const { executionStep: actionExecutionStep } = await processAction({
flowId: flow.id,
stepId: actionStep.id,
executionId,
});
if (actionStep.key === 'respondWith' && !response.headersSent) {
// we send the response only if it's not sent yet. This allows us to early respond from the flow.
response.status(actionExecutionStep.dataOut.statusCode);
response.send(actionExecutionStep.dataOut.body);
}
}
}
return response;
};

View File

@@ -103,6 +103,10 @@ class Step extends Base {
return `/webhooks/connections/${this.connectionId}`; return `/webhooks/connections/${this.connectionId}`;
} }
if (this.parameters.workSynchronously) {
return `/webhooks/flows/${this.flowId}/sync`;
}
return `/webhooks/flows/${this.flowId}`; return `/webhooks/flows/${this.flowId}`;
} }

View File

@@ -3,6 +3,7 @@ import multer from 'multer';
import appConfig from '../config/app.js'; import appConfig from '../config/app.js';
import webhookHandlerByFlowId from '../controllers/webhooks/handler-by-flow-id.js'; import webhookHandlerByFlowId from '../controllers/webhooks/handler-by-flow-id.js';
import webhookHandlerSyncByFlowId from '../controllers/webhooks/handler-sync-by-flow-id.js';
import webhookHandlerByConnectionIdAndRefValue from '../controllers/webhooks/handler-by-connection-id-and-ref-value.js'; import webhookHandlerByConnectionIdAndRefValue from '../controllers/webhooks/handler-by-connection-id-and-ref-value.js';
const router = Router(); const router = Router();
@@ -46,6 +47,7 @@ createRouteHandler(
'/connections/:connectionId', '/connections/:connectionId',
webhookHandlerByConnectionIdAndRefValue webhookHandlerByConnectionIdAndRefValue
); );
createRouteHandler('/flows/:flowId/sync', webhookHandlerSyncByFlowId);
createRouteHandler('/flows/:flowId', webhookHandlerByFlowId); createRouteHandler('/flows/:flowId', webhookHandlerByFlowId);
createRouteHandler('/:flowId', webhookHandlerByFlowId); createRouteHandler('/:flowId', webhookHandlerByFlowId);