diff --git a/packages/backend/.env-example b/packages/backend/.env-example index 53505520..924bd16c 100644 --- a/packages/backend/.env-example +++ b/packages/backend/.env-example @@ -2,6 +2,7 @@ HOST=localhost PROTOCOL=http PORT=3000 WEB_APP_URL=http://localhost:3001 +WEBHOOK_URL=http://localhost:3000 APP_ENV=development POSTGRES_DATABASE=automatisch_development POSTGRES_PORT=5432 diff --git a/packages/backend/src/app.ts b/packages/backend/src/app.ts index e078652a..7eec9c96 100644 --- a/packages/backend/src/app.ts +++ b/packages/backend/src/app.ts @@ -2,7 +2,6 @@ import createError from 'http-errors'; import express, { Request, Response, NextFunction } from 'express'; import cors from 'cors'; import corsOptions from './config/cors-options'; -import graphQLInstance from './helpers/graphql-instance'; import morgan from './helpers/morgan'; import appAssetsHandler from './helpers/app-assets-handler'; import webUIHandler from './helpers/web-ui-handler'; @@ -13,6 +12,8 @@ import { serverAdapter, } from './helpers/create-bull-board-handler'; import injectBullBoardHandler from './helpers/inject-bull-board-handler'; +import router from './routes'; +import { IRequest } from '@automatisch/types'; createBullBoardHandler(serverAdapter); @@ -23,10 +24,16 @@ injectBullBoardHandler(app, serverAdapter); appAssetsHandler(app); app.use(morgan); -app.use(express.json()); +app.use( + express.json({ + verify: (req, res, buf) => { + (req as IRequest).rawBody = buf; + }, + }) +); app.use(express.urlencoded({ extended: false })); app.use(cors(corsOptions)); -app.use('/graphql', graphQLInstance); +app.use('/', router); webUIHandler(app); diff --git a/packages/backend/src/apps/typeform/auth/index.ts b/packages/backend/src/apps/typeform/auth/index.ts index 4bc4dc66..0aa070f2 100644 --- a/packages/backend/src/apps/typeform/auth/index.ts +++ b/packages/backend/src/apps/typeform/auth/index.ts @@ -2,6 +2,7 @@ import generateAuthUrl from './generate-auth-url'; import verifyCredentials from './verify-credentials'; import isStillVerified from './is-still-verified'; import refreshToken from './refresh-token'; +import verifyWebhook from './verify-webhook'; export default { fields: [ @@ -45,4 +46,5 @@ export default { verifyCredentials, isStillVerified, refreshToken, + verifyWebhook, }; diff --git a/packages/backend/src/apps/typeform/auth/verify-webhook.ts b/packages/backend/src/apps/typeform/auth/verify-webhook.ts new file mode 100644 index 00000000..f5ba0d46 --- /dev/null +++ b/packages/backend/src/apps/typeform/auth/verify-webhook.ts @@ -0,0 +1,20 @@ +import crypto from 'crypto'; +import { IGlobalVariable } from '@automatisch/types'; +import appConfig from '../../../config/app'; + +const verifyWebhook = async ($: IGlobalVariable) => { + const signature = $.request.headers['typeform-signature'] as string; + const isValid = verifySignature(signature, $.request.rawBody.toString()); + + return isValid; +}; + +const verifySignature = function (receivedSignature: string, payload: string) { + const hash = crypto + .createHmac('sha256', appConfig.appSecretKey) + .update(payload) + .digest('base64'); + return receivedSignature === `sha256=${hash}`; +}; + +export default verifyWebhook; diff --git a/packages/backend/src/apps/typeform/triggers/new-entry/index.ts b/packages/backend/src/apps/typeform/triggers/new-entry/index.ts index 1d4f167f..0c46a7d7 100644 --- a/packages/backend/src/apps/typeform/triggers/new-entry/index.ts +++ b/packages/backend/src/apps/typeform/triggers/new-entry/index.ts @@ -1,14 +1,16 @@ +import { IJSONObject } from '@automatisch/types'; +import appConfig from '../../../../config/app'; import defineTrigger from '../../../../helpers/define-trigger'; export default defineTrigger({ name: 'New entry', key: 'newEntry', - pollInterval: 15, + type: 'webhook', description: 'Triggers when a new form submitted.', arguments: [ { label: 'Form', - key: 'form', + key: 'formId', type: 'dropdown' as const, required: true, description: 'Pick a form to receive submissions.', @@ -26,7 +28,81 @@ export default defineTrigger({ }, ], - async run($) { - // await getUserTweets($, { currentUser: true }); + async testRun($) { + const createApiResponse = await $.http.get( + `/forms/${$.step.parameters.formId}` + ); + + const responsesApiResponse = await $.http.get( + `/forms/${$.step.parameters.formId}/responses` + ); + + const lastResponse = responsesApiResponse.data.items[0]; + + const computedResponseItem = { + event_type: 'form_response', + form_response: { + form_id: $.step.parameters.formId, + token: lastResponse.token, + landed_at: lastResponse.landed_at, + submitted_at: lastResponse.submitted_at, + definion: { + id: $.step.parameters.formId, + title: createApiResponse.data.title, + fields: createApiResponse.data?.fields?.map((field: IJSONObject) => ({ + id: field.id, + ref: field.ref, + type: field.type, + title: field.title, + properties: {}, + choices: ( + (field?.properties as IJSONObject)?.choices as IJSONObject[] + )?.map((choice) => ({ + id: choice.id, + label: choice.label, + })), + })), + }, + answers: lastResponse.answers?.map((answer: IJSONObject) => ({ + type: answer.type, + choice: { + label: (answer?.choice as IJSONObject)?.label, + }, + field: { + id: (answer.field as IJSONObject).id, + ref: (answer.field as IJSONObject).ref, + type: (answer.field as IJSONObject).type, + }, + })), + }, + }; + + const dataItem = { + raw: computedResponseItem, + meta: { + internalId: computedResponseItem.form_response.token, + }, + }; + + $.pushTriggerItem(dataItem); + }, + + async registerHook($) { + const subscriptionPayload = { + enabled: true, + url: $.webhookUrl, + secret: appConfig.appSecretKey, + }; + + await $.http.put( + `/forms/${$.step.parameters.formId}/webhooks/${$.flow.id}`, + subscriptionPayload + ); + }, + + async unregisterHook($) { + await $.http.delete( + `/forms/${$.step.parameters.formId}/webhooks/${$.flow.id}` + ); }, }); diff --git a/packages/backend/src/config/app.ts b/packages/backend/src/config/app.ts index 6d41956b..7403421e 100644 --- a/packages/backend/src/config/app.ts +++ b/packages/backend/src/config/app.ts @@ -6,6 +6,7 @@ type AppConfig = { protocol: string; port: string; webAppUrl: string; + webhookUrl: string; appEnv: string; isDev: boolean; postgresDatabase: string; @@ -37,6 +38,8 @@ const serveWebAppSeparately = process.env.SERVE_WEB_APP_SEPARATELY === 'true' ? true : false; let webAppUrl = `${protocol}://${host}:${port}`; +const webhookUrl = process.env.WEBHOOK_URL || webAppUrl; + if (serveWebAppSeparately) { webAppUrl = process.env.WEB_APP_URL || 'http://localhost:3001'; } @@ -73,6 +76,7 @@ const appConfig: AppConfig = { bullMQDashboardPassword: process.env.BULLMQ_DASHBOARD_PASSWORD, baseUrl, webAppUrl, + webhookUrl, telemetryEnabled: process.env.TELEMETRY_ENABLED === 'false' ? false : true, }; diff --git a/packages/backend/src/controllers/webhooks/create.ts b/packages/backend/src/controllers/webhooks/create.ts new file mode 100644 index 00000000..a56f73e8 --- /dev/null +++ b/packages/backend/src/controllers/webhooks/create.ts @@ -0,0 +1,55 @@ +import { Request, Response } from 'express'; + +import { ITriggerItem } from '@automatisch/types'; +import Flow from '../../models/flow'; +import triggerQueue from '../../queues/trigger'; +import globalVariable from '../../helpers/global-variable'; + +export default async (request: Request, response: Response) => { + const flow = await Flow.query() + .findById(request.params.flowId) + .throwIfNotFound(); + + if (!flow.active) { + return response.send(404); + } + + const triggerStep = await flow.getTriggerStep(); + const app = await triggerStep.getApp(); + + if (app.auth.verifyWebhook) { + const $ = await globalVariable({ + flow, + connection: await triggerStep.$relatedQuery('connection'), + app, + step: triggerStep, + testRun: false, + request, + }); + + const verified = await app.auth.verifyWebhook($); + + if (!verified) { + return response.sendStatus(401); + } + } + + const triggerItem: ITriggerItem = { + raw: request.body, + meta: { + internalId: request.body.form_response.token, + }, + }; + + const jobName = `${triggerStep.id}-${triggerItem.meta.internalId}`; + + const jobPayload = { + flowId: flow.id, + stepId: triggerStep.id, + triggerItem, + }; + + await triggerQueue.add(jobName, jobPayload); + + return response.sendStatus(200); +}; diff --git a/packages/backend/src/graphql/mutations/update-flow-status.ts b/packages/backend/src/graphql/mutations/update-flow-status.ts index 10e2f9f5..ccc6ac3d 100644 --- a/packages/backend/src/graphql/mutations/update-flow-status.ts +++ b/packages/backend/src/graphql/mutations/update-flow-status.ts @@ -1,6 +1,7 @@ import Context from '../../types/express/context'; import flowQueue from '../../queues/flow'; import { REMOVE_AFTER_30_DAYS_OR_150_JOBS, REMOVE_AFTER_7_DAYS_OR_50_JOBS } from '../../helpers/remove-job-configuration'; +import globalVariable from '../../helpers/global-variable'; type Params = { input: { @@ -50,12 +51,22 @@ const updateFlowStatus = async ( jobName, { flowId: flow.id }, { - repeat: repeatOptions, + repeat: trigger.type === 'webhook' ? null : repeatOptions, jobId: flow.id, removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS } ); + } else if (!flow.active && trigger.type === 'webhook') { + const $ = await globalVariable({ + flow, + connection: await triggerStep.$relatedQuery('connection'), + app: await triggerStep.getApp(), + step: triggerStep, + testRun: false, + }); + + await trigger.unregisterHook($); } else { const repeatableJobs = await flowQueue.getRepeatableJobs(); const job = repeatableJobs.find((job) => job.id === flow.id); diff --git a/packages/backend/src/helpers/global-variable.ts b/packages/backend/src/helpers/global-variable.ts index 9fbb4446..de9a09e8 100644 --- a/packages/backend/src/helpers/global-variable.ts +++ b/packages/backend/src/helpers/global-variable.ts @@ -3,12 +3,14 @@ import Connection from '../models/connection'; import Flow from '../models/flow'; import Step from '../models/step'; import Execution from '../models/execution'; +import appConfig from '../config/app'; import { IJSONObject, IApp, IGlobalVariable, ITriggerItem, IActionItem, + IRequest, } from '@automatisch/types'; import EarlyExitError from '../errors/early-exit'; @@ -19,12 +21,21 @@ type GlobalVariableOptions = { step?: Step; execution?: Execution; testRun?: boolean; + request?: IRequest; }; const globalVariable = async ( options: GlobalVariableOptions ): Promise => { - const { connection, app, flow, step, execution, testRun = false } = options; + const { + connection, + app, + flow, + step, + execution, + request, + testRun = false, + } = options; const lastInternalId = testRun ? undefined : await flow?.lastInternalId(); const nextStep = await step?.getNextStep(); @@ -95,12 +106,22 @@ const globalVariable = async ( }, }; + if (request) { + $.request = request; + } + $.http = createHttpClient({ $, baseURL: app.apiBaseUrl, beforeRequest: app.beforeRequest, }); + if (flow) { + const webhookUrl = appConfig.webhookUrl + '/webhooks/' + flow.id; + + $.webhookUrl = webhookUrl; + } + const lastInternalIds = testRun || (flow && step.isAction) ? [] : await flow?.lastInternalIds(2000); diff --git a/packages/backend/src/routes/index.ts b/packages/backend/src/routes/index.ts new file mode 100644 index 00000000..1cb26316 --- /dev/null +++ b/packages/backend/src/routes/index.ts @@ -0,0 +1,10 @@ +import { Router } from 'express'; +import graphQLInstance from '../helpers/graphql-instance'; +import webhooksRouter from './webhooks'; + +const router = Router(); + +router.use('/graphql', graphQLInstance); +router.use('/webhooks', webhooksRouter); + +export default router; diff --git a/packages/backend/src/routes/webhooks.ts b/packages/backend/src/routes/webhooks.ts new file mode 100644 index 00000000..c08eced3 --- /dev/null +++ b/packages/backend/src/routes/webhooks.ts @@ -0,0 +1,8 @@ +import createAction from '../controllers/webhooks/create'; +import { Router } from 'express'; + +const router = Router(); + +router.post('/:flowId', createAction); + +export default router; diff --git a/packages/backend/src/services/flow.ts b/packages/backend/src/services/flow.ts index f6a256fe..d76d9be9 100644 --- a/packages/backend/src/services/flow.ts +++ b/packages/backend/src/services/flow.ts @@ -23,7 +23,13 @@ export const processFlow = async (options: ProcessFlowOptions) => { }); try { - await triggerCommand.run($); + if (triggerCommand.type === 'webhook' && !flow.active) { + await triggerCommand.testRun($); + } else if (triggerCommand.type === 'webhook' && flow.active) { + await triggerCommand.registerHook($); + } else { + await triggerCommand.run($); + } } catch (error) { if (error instanceof EarlyExitError === false) { if (error instanceof HttpError) { diff --git a/packages/types/index.d.ts b/packages/types/index.d.ts index df2521dd..0b2fb1f8 100644 --- a/packages/types/index.d.ts +++ b/packages/types/index.d.ts @@ -1,5 +1,6 @@ import type { AxiosInstance, AxiosRequestConfig } from 'axios'; export type IHttpClient = AxiosInstance; +import type { Request } from 'express'; // Type definitions for automatisch @@ -182,6 +183,7 @@ export interface IAuth { verifyCredentials($: IGlobalVariable): Promise; isStillVerified($: IGlobalVariable): Promise; refreshToken?($: IGlobalVariable): Promise; + verifyWebhook?($: IGlobalVariable): Promise; isRefreshTokenRequested?: boolean; fields: IField[]; authenticationSteps?: IAuthenticationStep[]; @@ -210,10 +212,14 @@ export interface ITriggerItem { export interface IBaseTrigger { name: string; key: string; + type?: 'webhook' | 'polling'; pollInterval?: number; description: string; getInterval?(parameters: IStep['parameters']): string; - run($: IGlobalVariable): Promise; + run?($: IGlobalVariable): Promise; + testRun?($: IGlobalVariable): Promise; + registerHook?($: IGlobalVariable): Promise; + unregisterHook?($: IGlobalVariable): Promise; sort?(item: ITriggerItem, nextItem: ITriggerItem): number; } @@ -238,7 +244,7 @@ export interface IBaseAction { name: string; key: string; description: string; - run($: IGlobalVariable): Promise; + run?($: IGlobalVariable): Promise; } export interface IRawAction extends IBaseAction { @@ -274,6 +280,7 @@ export type IGlobalVariable = { }; app: IApp; http?: IHttpClient; + request?: IRequest; flow?: { id: string; lastInternalId: string; @@ -293,6 +300,7 @@ export type IGlobalVariable = { id: string; testRun: boolean; }; + webhookUrl?: string; triggerOutput?: ITriggerOutput; actionOutput?: IActionOutput; pushTriggerItem?: (triggerItem: ITriggerItem) => void; @@ -308,3 +316,8 @@ declare module 'axios' { additionalProperties?: Record; } } + +export interface IRequest extends Request { + rawBody?: Buffer; +} +