diff --git a/packages/backend/src/apps/twilio/triggers/receive-sms/index.ts b/packages/backend/src/apps/twilio/triggers/receive-sms/index.ts index 27124de0..4aaeb9f1 100644 --- a/packages/backend/src/apps/twilio/triggers/receive-sms/index.ts +++ b/packages/backend/src/apps/twilio/triggers/receive-sms/index.ts @@ -34,6 +34,9 @@ export default defineTrigger({ }, ], + useSingletonWebhook: true, + singletonWebhookRefValueParameter: 'phoneNumberSid', + async testRun($) { await fetchMessages($); diff --git a/packages/backend/src/apps/webhook/triggers/catch-raw-webhook/index.ts b/packages/backend/src/apps/webhook/triggers/catch-raw-webhook/index.ts index 9e17b670..00054814 100644 --- a/packages/backend/src/apps/webhook/triggers/catch-raw-webhook/index.ts +++ b/packages/backend/src/apps/webhook/triggers/catch-raw-webhook/index.ts @@ -5,6 +5,7 @@ export default defineTrigger({ name: 'Catch raw webhook', key: 'catchRawWebhook', type: 'webhook', + showWebhookUrl: true, description: 'Triggers when the webhook receives a request.', async testRun($) { diff --git a/packages/backend/src/controllers/webhooks/handler-by-connection-id-and-ref-value.ts b/packages/backend/src/controllers/webhooks/handler-by-connection-id-and-ref-value.ts new file mode 100644 index 00000000..4a98b38d --- /dev/null +++ b/packages/backend/src/controllers/webhooks/handler-by-connection-id-and-ref-value.ts @@ -0,0 +1,40 @@ +import path from 'node:path'; +import { Response } from 'express'; +import { IRequest } from '@automatisch/types'; + +import Connection from '../../models/connection'; +import logger from '../../helpers/logger'; +import handler from '../../helpers/webhook-handler'; + +export default async (request: IRequest, response: Response) => { + const computedRequestPayload = { + headers: request.headers, + body: request.body, + query: request.query, + params: request.params, + }; + logger.debug(`Handling incoming webhook request at ${request.originalUrl}.`); + logger.debug(JSON.stringify(computedRequestPayload, null, 2)); + + const { connectionId } = request.params; + + const connection = await Connection.query() + .findById(connectionId) + .throwIfNotFound(); + + if (!await connection.verifyWebhook(request)) { + return response.sendStatus(401); + } + + const triggerSteps = await connection + .$relatedQuery('triggerSteps') + .where('webhook_path', path.join(request.baseUrl, request.path)); + + if (triggerSteps.length === 0) return response.sendStatus(404); + + for (const triggerStep of triggerSteps) { + await handler(triggerStep.flowId, request, response); + } + + response.sendStatus(204); +}; diff --git a/packages/backend/src/controllers/webhooks/handler-by-flow-id.ts b/packages/backend/src/controllers/webhooks/handler-by-flow-id.ts new file mode 100644 index 00000000..b7b98328 --- /dev/null +++ b/packages/backend/src/controllers/webhooks/handler-by-flow-id.ts @@ -0,0 +1,34 @@ +import { Response } from 'express'; +import { IRequest } from '@automatisch/types'; + +import Flow from '../../models/flow'; +import logger from '../../helpers/logger'; +import handler from '../../helpers/webhook-handler'; + +export default async (request: IRequest, response: Response) => { + const computedRequestPayload = { + headers: request.headers, + body: request.body, + query: request.query, + params: request.params, + }; + + logger.debug(`Handling incoming webhook request at ${request.originalUrl}.`); + logger.debug(JSON.stringify(computedRequestPayload, null, 2)); + + const flowId = request.params.flowId; + const flow = await Flow.query().findById(flowId).throwIfNotFound(); + const triggerStep = await flow.getTriggerStep(); + + if (triggerStep.appKey !== 'webhook') { + const connection = await triggerStep.$relatedQuery('connection'); + + if (!(await connection.verifyWebhook(request))) { + return response.sendStatus(401); + } + } + + await handler(flowId, request, response); + + response.sendStatus(204); +}; diff --git a/packages/backend/src/db/migrations/20230609201228_add_webhook_path_in_step.ts b/packages/backend/src/db/migrations/20230609201228_add_webhook_path_in_step.ts new file mode 100644 index 00000000..ecf264eb --- /dev/null +++ b/packages/backend/src/db/migrations/20230609201228_add_webhook_path_in_step.ts @@ -0,0 +1,13 @@ +import { Knex } from 'knex'; + +export async function up(knex: Knex): Promise { + return knex.schema.table('steps', (table) => { + table.string('webhook_path'); + }); +} + +export async function down(knex: Knex): Promise { + return knex.schema.table('steps', (table) => { + table.dropColumn('webhook_path'); + }); +} diff --git a/packages/backend/src/db/migrations/20230609201909_populate_data_in_webhook_path_in_step.ts b/packages/backend/src/db/migrations/20230609201909_populate_data_in_webhook_path_in_step.ts new file mode 100644 index 00000000..83b414e2 --- /dev/null +++ b/packages/backend/src/db/migrations/20230609201909_populate_data_in_webhook_path_in_step.ts @@ -0,0 +1,16 @@ +import { Knex } from 'knex'; + +export async function up(knex: Knex): Promise { + return await knex('steps') + .where('type', 'trigger') + .whereIn('app_key', ['gitlab', 'typeform', 'twilio', 'flowers-software', 'webhook']) + .update({ + webhook_path: knex.raw('? || ??', ['/webhooks/flows/', knex.ref('flow_id')]), + }); +} + +export async function down(knex: Knex): Promise { + return await knex('steps').update({ + webhook_path: null + }); +} diff --git a/packages/backend/src/graphql/mutations/update-step.ts b/packages/backend/src/graphql/mutations/update-step.ts index 11398287..376f60b8 100644 --- a/packages/backend/src/graphql/mutations/update-step.ts +++ b/packages/backend/src/graphql/mutations/update-step.ts @@ -60,6 +60,8 @@ const updateStep = async ( }) .withGraphFetched('connection'); + await step.updateWebhookUrl(); + return step; }; diff --git a/packages/backend/src/graphql/schema.graphql b/packages/backend/src/graphql/schema.graphql index 0e7a24ea..b9203bff 100644 --- a/packages/backend/src/graphql/schema.graphql +++ b/packages/backend/src/graphql/schema.graphql @@ -82,6 +82,7 @@ type Trigger { name: String key: String description: String + showWebhookUrl: Boolean pollInterval: Int type: String substeps: [Substep] diff --git a/packages/backend/src/helpers/global-variable.ts b/packages/backend/src/helpers/global-variable.ts index 87c2340c..0130515b 100644 --- a/packages/backend/src/helpers/global-variable.ts +++ b/packages/backend/src/helpers/global-variable.ts @@ -3,7 +3,6 @@ import Connection from '../models/connection'; import Flow from '../models/flow'; import Step from '../models/step'; import Execution from '../models/execution'; -import appConfig from '../config/app'; import { IJSONObject, IApp, @@ -17,7 +16,7 @@ import AlreadyProcessedError from '../errors/already-processed'; type GlobalVariableOptions = { connection?: Connection; - app: IApp; + app?: IApp; flow?: Flow; step?: Step; execution?: Execution; @@ -117,32 +116,36 @@ const globalVariable = async ( $.request = request; } - $.http = createHttpClient({ - $, - baseURL: app.apiBaseUrl, - beforeRequest: app.beforeRequest, - }); - - if (flow) { - const webhookUrl = appConfig.webhookUrl + '/webhooks/' + flow.id; - - $.webhookUrl = webhookUrl; + if (app) { + $.http = createHttpClient({ + $, + baseURL: app.apiBaseUrl, + beforeRequest: app.beforeRequest, + }); } - if (isTrigger && (await step.getTriggerCommand()).type === 'webhook') { - $.flow.setRemoteWebhookId = async (remoteWebhookId) => { - await flow.$query().patchAndFetch({ - remoteWebhookId, - }); + if (step) { + $.webhookUrl = await step.getWebhookUrl(); + } - $.flow.remoteWebhookId = remoteWebhookId; - }; + if (isTrigger) { + const triggerCommand = await step.getTriggerCommand(); - $.flow.remoteWebhookId = flow.remoteWebhookId; + if (triggerCommand.type === 'webhook') { + $.flow.setRemoteWebhookId = async (remoteWebhookId) => { + await flow.$query().patchAndFetch({ + remoteWebhookId, + }); + + $.flow.remoteWebhookId = remoteWebhookId; + }; + + $.flow.remoteWebhookId = flow.remoteWebhookId; + } } const lastInternalIds = - testRun || (flow && step.isAction) ? [] : await flow?.lastInternalIds(2000); + testRun || (flow && step?.isAction) ? [] : await flow?.lastInternalIds(2000); const isAlreadyProcessed = (internalId: string) => { return lastInternalIds?.includes(internalId); diff --git a/packages/backend/src/controllers/webhooks/handler.ts b/packages/backend/src/helpers/webhook-handler.ts similarity index 56% rename from packages/backend/src/controllers/webhooks/handler.ts rename to packages/backend/src/helpers/webhook-handler.ts index 4fb22695..050c9d1d 100644 --- a/packages/backend/src/controllers/webhooks/handler.ts +++ b/packages/backend/src/helpers/webhook-handler.ts @@ -2,28 +2,23 @@ import Crypto from 'node:crypto'; import { Response } from 'express'; import { IRequest, ITriggerItem } from '@automatisch/types'; -import logger from '../../helpers/logger'; -import Flow from '../../models/flow'; -import { processTrigger } from '../../services/trigger'; -import actionQueue from '../../queues/action'; -import globalVariable from '../../helpers/global-variable'; -import QuotaExceededError from '../../errors/quote-exceeded'; +import Flow from '../models/flow'; +import { processTrigger } from '../services/trigger'; +import actionQueue from '../queues/action'; +import globalVariable from './global-variable'; +import QuotaExceededError from '../errors/quote-exceeded'; import { REMOVE_AFTER_30_DAYS_OR_150_JOBS, REMOVE_AFTER_7_DAYS_OR_50_JOBS, -} from '../../helpers/remove-job-configuration'; - -export default async (request: IRequest, response: Response) => { - const flowId = request.params.flowId; +} from './remove-job-configuration'; +export default async (flowId: string, request: IRequest, response: Response) => { // in case it's our built-in generic webhook trigger let computedRequestPayload = { headers: request.headers, body: request.body, query: request.query, }; - logger.debug(`Handling incoming webhook request at ${request.originalUrl}.`); - logger.debug(JSON.stringify(computedRequestPayload, null, 2)); const flow = await Flow.query() .findById(flowId) @@ -39,32 +34,11 @@ export default async (request: IRequest, response: Response) => { } const triggerStep = await flow.getTriggerStep(); - const triggerCommand = await triggerStep.getTriggerCommand(); const app = await triggerStep.getApp(); const isWebhookApp = app.key === 'webhook'; - if (testRun && !isWebhookApp) { - return response.sendStatus(404); - } - - if (triggerCommand.type !== 'webhook') { - return response.sendStatus(404); - } - - if (app.auth?.verifyWebhook) { - const $ = await globalVariable({ - flow, - connection: await triggerStep.$relatedQuery('connection'), - app, - step: triggerStep, - request, - }); - - const verified = await app.auth.verifyWebhook($); - - if (!verified) { - return response.sendStatus(401); - } + if ((testRun && !isWebhookApp)) { + return response.status(404); } // in case trigger type is 'webhook' @@ -87,7 +61,7 @@ export default async (request: IRequest, response: Response) => { }); if (testRun) { - return response.sendStatus(204); + return response.status(204); } const nextStep = await triggerStep.getNextStep(); @@ -106,5 +80,5 @@ export default async (request: IRequest, response: Response) => { await actionQueue.add(jobName, jobPayload, jobOptions); - return response.sendStatus(204); + return response.status(204); }; diff --git a/packages/backend/src/models/connection.ts b/packages/backend/src/models/connection.ts index 4b215ca4..105d8d4b 100644 --- a/packages/backend/src/models/connection.ts +++ b/packages/backend/src/models/connection.ts @@ -1,12 +1,16 @@ import { QueryContext, ModelOptions } from 'objection'; import type { RelationMappings } from 'objection'; import { AES, enc } from 'crypto-js'; +import { IRequest } from '@automatisch/types'; +import App from './app'; import Base from './base'; import User from './user'; import Step from './step'; +import ExtendedQueryBuilder from './query-builder'; import appConfig from '../config/app'; import { IJSONObject } from '@automatisch/types'; import Telemetry from '../helpers/telemetry'; +import globalVariable from '../helpers/global-variable'; class Connection extends Base { id!: string; @@ -18,6 +22,9 @@ class Connection extends Base { draft: boolean; count?: number; flowCount?: number; + user?: User; + steps?: Step[]; + triggerSteps?: Step[]; static tableName = 'connections'; @@ -53,6 +60,17 @@ class Connection extends Base { to: 'steps.connection_id', }, }, + triggerSteps: { + relation: Base.HasManyRelation, + modelClass: Step, + join: { + from: 'connections.id', + to: 'steps.connection_id', + }, + filter(builder: ExtendedQueryBuilder) { + builder.where('type', '=', 'trigger'); + }, + }, }); encryptData(): void { @@ -110,6 +128,27 @@ class Connection extends Base { await super.$afterUpdate(opt, queryContext); Telemetry.connectionUpdated(this); } + + async getApp() { + if (!this.key) return null; + + return await App.findOneByKey(this.key); + } + + async verifyWebhook(request: IRequest) { + if (!this.key) return true; + + const app = await this.getApp(); + + const $ = await globalVariable({ + connection: this, + request, + }); + + if (!app.auth?.verifyWebhook) return true; + + return app.auth.verifyWebhook($); + } } export default Connection; diff --git a/packages/backend/src/models/flow.ts b/packages/backend/src/models/flow.ts index e00beccd..20b48c73 100644 --- a/packages/backend/src/models/flow.ts +++ b/packages/backend/src/models/flow.ts @@ -18,6 +18,7 @@ class Flow extends Base { active: boolean; status: 'paused' | 'published' | 'draft'; steps: Step[]; + triggerStep: Step; published_at: string; remoteWebhookId: string; executions?: Execution[]; @@ -51,6 +52,20 @@ class Flow extends Base { builder.orderBy('position', 'asc'); }, }, + triggerStep: { + relation: Base.HasOneRelation, + modelClass: Step, + join: { + from: 'flows.id', + to: 'steps.flow_id', + }, + filter(builder: ExtendedQueryBuilder) { + builder + .where('type', 'trigger') + .limit(1) + .first(); + }, + }, executions: { relation: Base.HasManyRelation, modelClass: Execution, diff --git a/packages/backend/src/models/step.ts b/packages/backend/src/models/step.ts index b46853ba..2f086898 100644 --- a/packages/backend/src/models/step.ts +++ b/packages/backend/src/models/step.ts @@ -1,5 +1,6 @@ import { URL } from 'node:url'; import { QueryContext, ModelOptions } from 'objection'; +import get from 'lodash.get'; import type { IJSONObject, IStep } from '@automatisch/types'; import Base from './base'; import App from './app'; @@ -22,6 +23,7 @@ class Step extends Base { connection?: Connection; flow: Flow; executionSteps: ExecutionStep[]; + webhookPath?: string; static tableName = 'steps'; @@ -43,6 +45,7 @@ class Step extends Base { }, position: { type: 'integer' }, parameters: { type: 'object' }, + webhookPath: { type: ['string', 'null'] }, }, }; @@ -77,17 +80,52 @@ class Step extends Base { }, }); + get webhookUrl() { + return new URL(this.webhookPath, appConfig.webhookUrl).toString(); + } + get iconUrl() { if (!this.appKey) return null; return `${appConfig.baseUrl}/apps/${this.appKey}/assets/favicon.svg`; } - get webhookUrl() { - if (this.appKey !== 'webhook') return null; + async computeWebhookPath() { + if (this.type === 'action') return null; - const url = new URL(`/webhooks/${this.flowId}`, appConfig.webhookUrl); - return url.toString(); + const triggerCommand = await this.getTriggerCommand(); + + if (!triggerCommand) return null; + + const { + useSingletonWebhook, + singletonWebhookRefValueParameter, + type, + } = triggerCommand; + + const isWebhook = type === 'webhook'; + + if (!isWebhook) return null; + + if (singletonWebhookRefValueParameter) { + const parameterValue = get(this.parameters, singletonWebhookRefValueParameter); + return `/webhooks/connections/${this.connectionId}/${parameterValue}`; + } + + if (useSingletonWebhook) { + return `/webhooks/connections/${this.connectionId}`; + } + + return `/webhooks/flows/${this.flowId}`; + } + + async getWebhookUrl() { + if (this.type === 'action') return; + + const path = await this.computeWebhookPath(); + const webhookUrl = new URL(path, appConfig.webhookUrl).toString(); + + return webhookUrl; } async $afterInsert(queryContext: QueryContext) { @@ -166,6 +204,18 @@ class Step extends Base { return existingArguments; } + + async updateWebhookUrl() { + if (this.isAction) return this; + + const payload = { + webhookPath: await this.computeWebhookPath(), + }; + + await this.$query().patchAndFetch(payload); + + return this; + } } export default Step; diff --git a/packages/backend/src/routes/webhooks.ts b/packages/backend/src/routes/webhooks.ts index 31e51297..fc24ca2a 100644 --- a/packages/backend/src/routes/webhooks.ts +++ b/packages/backend/src/routes/webhooks.ts @@ -3,7 +3,8 @@ import multer from 'multer'; import { IRequest } from '@automatisch/types'; import appConfig from '../config/app'; -import webhookHandler from '../controllers/webhooks/handler'; +import webhookHandlerByFlowId from '../controllers/webhooks/handler-by-flow-id'; +import webhookHandlerByConnectionIdAndRefValue from '../controllers/webhooks/handler-by-connection-id-and-ref-value'; const router = Router(); const upload = multer(); @@ -25,9 +26,20 @@ const exposeError = (handler: RequestHandler) => async (req: IRequest, res: Resp } } -router.get('/:flowId', exposeError(webhookHandler)); -router.put('/:flowId', exposeError(webhookHandler)); -router.patch('/:flowId', exposeError(webhookHandler)); -router.post('/:flowId', exposeError(webhookHandler)); +function createRouteHandler(path: string, handler: (req: IRequest, res: Response, next: NextFunction) => void) { + const wrappedHandler = exposeError(handler); + + router + .route(path) + .get(wrappedHandler) + .put(wrappedHandler) + .patch(wrappedHandler) + .post(wrappedHandler); +}; + +createRouteHandler('/connections/:connectionId/:refValue', webhookHandlerByConnectionIdAndRefValue); +createRouteHandler('/connections/:connectionId', webhookHandlerByConnectionIdAndRefValue); +createRouteHandler('/flows/:flowId', webhookHandlerByFlowId); +createRouteHandler('/:flowId', webhookHandlerByFlowId); export default router; diff --git a/packages/types/index.d.ts b/packages/types/index.d.ts index b1e90e97..b1ae48ee 100644 --- a/packages/types/index.d.ts +++ b/packages/types/index.d.ts @@ -60,7 +60,7 @@ export interface IStep { key?: string; appKey?: string; iconUrl: string; - webhookUrl: string; + webhookUrl?: string; type: 'action' | 'trigger'; connectionId?: string; status: string; @@ -241,14 +241,16 @@ export interface IBaseTrigger { name: string; key: string; type?: 'webhook' | 'polling'; + showWebhookUrl?: boolean; pollInterval?: number; description: string; + useSingletonWebhook?: boolean; + singletonWebhookRefValueParameter?: string; getInterval?(parameters: IStep['parameters']): string; run?($: IGlobalVariable): Promise; testRun?($: IGlobalVariable): Promise; registerHook?($: IGlobalVariable): Promise; unregisterHook?($: IGlobalVariable): Promise; - sort?(item: ITriggerItem, nextItem: ITriggerItem): number; } export interface IRawTrigger extends IBaseTrigger { @@ -306,7 +308,7 @@ export type IGlobalVariable = { set: (args: IJSONObject) => Promise; data: IJSONObject; }; - app: IApp; + app?: IApp; http?: IHttpClient; request?: IRequest; flow?: { @@ -333,6 +335,7 @@ export type IGlobalVariable = { }; getLastExecutionStep?: () => Promise; webhookUrl?: string; + singletonWebhookUrl?: string; triggerOutput?: ITriggerOutput; actionOutput?: IActionOutput; pushTriggerItem?: (triggerItem: ITriggerItem) => void; diff --git a/packages/web/src/components/FlowStep/index.tsx b/packages/web/src/components/FlowStep/index.tsx index 9c251ffb..d971b042 100644 --- a/packages/web/src/components/FlowStep/index.tsx +++ b/packages/web/src/components/FlowStep/index.tsx @@ -166,12 +166,8 @@ export default function FlowStep( const actionsOrTriggers: Array = (isTrigger ? app?.triggers : app?.actions) || []; - const substeps = React.useMemo( - () => - actionsOrTriggers?.find(({ key }: ITrigger | IAction) => key === step.key) - ?.substeps || [], - [actionsOrTriggers, step?.key] - ); + const actionOrTrigger = actionsOrTriggers?.find(({ key }) => key === step.key); + const substeps = actionOrTrigger?.substeps || []; const handleChange = React.useCallback(({ step }: { step: IStep }) => { onChange(step); @@ -283,7 +279,7 @@ export default function FlowStep( step={step} /> - {substeps?.length > 0 && + {actionOrTrigger && substeps?.length > 0 && substeps.map((substep: ISubstep, index: number) => ( {substep.key === 'chooseConnection' && app && ( @@ -308,6 +304,7 @@ export default function FlowStep( onSubmit={expandNextStep} onChange={handleChange} onContinue={onContinue} + showWebhookUrl={'showWebhookUrl' in actionOrTrigger ? actionOrTrigger.showWebhookUrl : false} step={step} /> )} diff --git a/packages/web/src/components/TestSubstep/index.tsx b/packages/web/src/components/TestSubstep/index.tsx index c3f5b53a..694eb332 100644 --- a/packages/web/src/components/TestSubstep/index.tsx +++ b/packages/web/src/components/TestSubstep/index.tsx @@ -18,6 +18,7 @@ import type { IStep, ISubstep } from '@automatisch/types'; type TestSubstepProps = { substep: ISubstep; expanded?: boolean; + showWebhookUrl?: boolean; onExpand: () => void; onCollapse: () => void; onChange?: ({ step }: { step: IStep }) => void; @@ -52,6 +53,7 @@ function TestSubstep(props: TestSubstepProps): React.ReactElement { onSubmit, onContinue, step, + showWebhookUrl = false, } = props; const formatMessage = useFormatMessage(); @@ -119,7 +121,7 @@ function TestSubstep(props: TestSubstepProps): React.ReactElement { )} - {step.webhookUrl && ( + {step.webhookUrl && showWebhookUrl && ( )} diff --git a/packages/web/src/graphql/queries/get-app.ts b/packages/web/src/graphql/queries/get-app.ts index f9009c75..5b929571 100644 --- a/packages/web/src/graphql/queries/get-app.ts +++ b/packages/web/src/graphql/queries/get-app.ts @@ -60,6 +60,7 @@ export const GET_APP = gql` name key type + showWebhookUrl pollInterval description substeps { diff --git a/packages/web/src/graphql/queries/get-apps.ts b/packages/web/src/graphql/queries/get-apps.ts index f358ae7d..7c212efd 100644 --- a/packages/web/src/graphql/queries/get-apps.ts +++ b/packages/web/src/graphql/queries/get-apps.ts @@ -67,6 +67,7 @@ export const GET_APPS = gql` name key type + showWebhookUrl pollInterval description substeps {