feat: introduce singleton webhook URL

This commit is contained in:
Ali BARIN
2023-06-07 22:29:40 +00:00
parent 92638c2e97
commit de7a35dfe9
19 changed files with 285 additions and 78 deletions

View File

@@ -34,6 +34,9 @@ export default defineTrigger({
}, },
], ],
useSingletonWebhook: true,
singletonWebhookRefValueParameter: 'phoneNumberSid',
async testRun($) { async testRun($) {
await fetchMessages($); await fetchMessages($);

View File

@@ -5,6 +5,7 @@ export default defineTrigger({
name: 'Catch raw webhook', name: 'Catch raw webhook',
key: 'catchRawWebhook', key: 'catchRawWebhook',
type: 'webhook', type: 'webhook',
showWebhookUrl: true,
description: 'Triggers when the webhook receives a request.', description: 'Triggers when the webhook receives a request.',
async testRun($) { async testRun($) {

View File

@@ -0,0 +1,40 @@
import path from 'node:path';
import { Response } from 'express';
import { IRequest } from '@automatisch/types';
import Connection from '../../models/connection';
import logger from '../../helpers/logger';
import handler from '../../helpers/webhook-handler';
export default async (request: IRequest, response: Response) => {
const computedRequestPayload = {
headers: request.headers,
body: request.body,
query: request.query,
params: request.params,
};
logger.debug(`Handling incoming webhook request at ${request.originalUrl}.`);
logger.debug(JSON.stringify(computedRequestPayload, null, 2));
const { connectionId } = request.params;
const connection = await Connection.query()
.findById(connectionId)
.throwIfNotFound();
if (!await connection.verifyWebhook(request)) {
return response.sendStatus(401);
}
const triggerSteps = await connection
.$relatedQuery('triggerSteps')
.where('webhook_path', path.join(request.baseUrl, request.path));
if (triggerSteps.length === 0) return response.sendStatus(404);
for (const triggerStep of triggerSteps) {
await handler(triggerStep.flowId, request, response);
}
response.sendStatus(204);
};

View File

@@ -0,0 +1,34 @@
import path from 'node:path';
import { Response } from 'express';
import { IRequest } from '@automatisch/types';
import Step from '../../models/step';
import logger from '../../helpers/logger';
import handler from '../../helpers/webhook-handler';
export default async (request: IRequest, response: Response) => {
const computedRequestPayload = {
headers: request.headers,
body: request.body,
query: request.query,
params: request.params,
};
logger.debug(`Handling incoming webhook request at ${request.originalUrl}.`);
logger.debug(JSON.stringify(computedRequestPayload, null, 2));
const flowId = request.params.flowId;
const triggerStep = await Step.query()
.findOne({
webhook_path: path.join(request.baseUrl, request.path),
})
.throwIfNotFound();
const connection = await triggerStep.$relatedQuery('connection');
if (!await connection.verifyWebhook(request)) {
return response.sendStatus(401);
}
await handler(flowId, request, response);
response.sendStatus(204);
};

View File

@@ -0,0 +1,13 @@
import { Knex } from 'knex';
export async function up(knex: Knex): Promise<void> {
return knex.schema.table('steps', (table) => {
table.string('webhook_path');
});
}
export async function down(knex: Knex): Promise<void> {
return knex.schema.table('steps', (table) => {
table.dropColumn('webhook_path');
});
}

View File

@@ -0,0 +1,16 @@
import { Knex } from 'knex';
export async function up(knex: Knex): Promise<void> {
return await knex('steps')
.where('type', 'trigger')
.whereIn('app_key', ['gitlab', 'typeform', 'twilio', 'flowers-software', 'webhook'])
.update({
webhook_path: knex.raw('? || ??', ['/webhooks/flows/', knex.ref('flow_id')]),
});
}
export async function down(knex: Knex): Promise<void> {
return await knex('steps').update({
webhook_path: null
});
}

View File

@@ -60,6 +60,8 @@ const updateStep = async (
}) })
.withGraphFetched('connection'); .withGraphFetched('connection');
await step.updateWebhookUrl();
return step; return step;
}; };

View File

@@ -82,6 +82,7 @@ type Trigger {
name: String name: String
key: String key: String
description: String description: String
showWebhookUrl: Boolean
pollInterval: Int pollInterval: Int
type: String type: String
substeps: [Substep] substeps: [Substep]

View File

@@ -3,7 +3,6 @@ import Connection from '../models/connection';
import Flow from '../models/flow'; import Flow from '../models/flow';
import Step from '../models/step'; import Step from '../models/step';
import Execution from '../models/execution'; import Execution from '../models/execution';
import appConfig from '../config/app';
import { import {
IJSONObject, IJSONObject,
IApp, IApp,
@@ -17,7 +16,7 @@ import AlreadyProcessedError from '../errors/already-processed';
type GlobalVariableOptions = { type GlobalVariableOptions = {
connection?: Connection; connection?: Connection;
app: IApp; app?: IApp;
flow?: Flow; flow?: Flow;
step?: Step; step?: Step;
execution?: Execution; execution?: Execution;
@@ -117,32 +116,36 @@ const globalVariable = async (
$.request = request; $.request = request;
} }
$.http = createHttpClient({ if (app) {
$, $.http = createHttpClient({
baseURL: app.apiBaseUrl, $,
beforeRequest: app.beforeRequest, baseURL: app.apiBaseUrl,
}); beforeRequest: app.beforeRequest,
});
if (flow) {
const webhookUrl = appConfig.webhookUrl + '/webhooks/' + flow.id;
$.webhookUrl = webhookUrl;
} }
if (isTrigger && (await step.getTriggerCommand()).type === 'webhook') { if (step) {
$.flow.setRemoteWebhookId = async (remoteWebhookId) => { $.webhookUrl = await step.getWebhookUrl();
await flow.$query().patchAndFetch({ }
remoteWebhookId,
});
$.flow.remoteWebhookId = remoteWebhookId; if (isTrigger) {
}; const triggerCommand = await step.getTriggerCommand();
$.flow.remoteWebhookId = flow.remoteWebhookId; if (triggerCommand.type === 'webhook') {
$.flow.setRemoteWebhookId = async (remoteWebhookId) => {
await flow.$query().patchAndFetch({
remoteWebhookId,
});
$.flow.remoteWebhookId = remoteWebhookId;
};
$.flow.remoteWebhookId = flow.remoteWebhookId;
}
} }
const lastInternalIds = const lastInternalIds =
testRun || (flow && step.isAction) ? [] : await flow?.lastInternalIds(2000); testRun || (flow && step?.isAction) ? [] : await flow?.lastInternalIds(2000);
const isAlreadyProcessed = (internalId: string) => { const isAlreadyProcessed = (internalId: string) => {
return lastInternalIds?.includes(internalId); return lastInternalIds?.includes(internalId);

View File

@@ -2,28 +2,23 @@ import Crypto from 'node:crypto';
import { Response } from 'express'; import { Response } from 'express';
import { IRequest, ITriggerItem } from '@automatisch/types'; import { IRequest, ITriggerItem } from '@automatisch/types';
import logger from '../../helpers/logger'; import Flow from '../models/flow';
import Flow from '../../models/flow'; import { processTrigger } from '../services/trigger';
import { processTrigger } from '../../services/trigger'; import actionQueue from '../queues/action';
import actionQueue from '../../queues/action'; import globalVariable from './global-variable';
import globalVariable from '../../helpers/global-variable'; import QuotaExceededError from '../errors/quote-exceeded';
import QuotaExceededError from '../../errors/quote-exceeded';
import { import {
REMOVE_AFTER_30_DAYS_OR_150_JOBS, REMOVE_AFTER_30_DAYS_OR_150_JOBS,
REMOVE_AFTER_7_DAYS_OR_50_JOBS, REMOVE_AFTER_7_DAYS_OR_50_JOBS,
} from '../../helpers/remove-job-configuration'; } from './remove-job-configuration';
export default async (request: IRequest, response: Response) => {
const flowId = request.params.flowId;
export default async (flowId: string, request: IRequest, response: Response) => {
// in case it's our built-in generic webhook trigger // in case it's our built-in generic webhook trigger
let computedRequestPayload = { let computedRequestPayload = {
headers: request.headers, headers: request.headers,
body: request.body, body: request.body,
query: request.query, query: request.query,
}; };
logger.debug(`Handling incoming webhook request at ${request.originalUrl}.`);
logger.debug(JSON.stringify(computedRequestPayload, null, 2));
const flow = await Flow.query() const flow = await Flow.query()
.findById(flowId) .findById(flowId)
@@ -39,32 +34,11 @@ export default async (request: IRequest, response: Response) => {
} }
const triggerStep = await flow.getTriggerStep(); const triggerStep = await flow.getTriggerStep();
const triggerCommand = await triggerStep.getTriggerCommand();
const app = await triggerStep.getApp(); const app = await triggerStep.getApp();
const isWebhookApp = app.key === 'webhook'; const isWebhookApp = app.key === 'webhook';
if (testRun && !isWebhookApp) { if ((testRun && !isWebhookApp)) {
return response.sendStatus(404); return response.status(404);
}
if (triggerCommand.type !== 'webhook') {
return response.sendStatus(404);
}
if (app.auth?.verifyWebhook) {
const $ = await globalVariable({
flow,
connection: await triggerStep.$relatedQuery('connection'),
app,
step: triggerStep,
request,
});
const verified = await app.auth.verifyWebhook($);
if (!verified) {
return response.sendStatus(401);
}
} }
// in case trigger type is 'webhook' // in case trigger type is 'webhook'
@@ -87,7 +61,7 @@ export default async (request: IRequest, response: Response) => {
}); });
if (testRun) { if (testRun) {
return response.sendStatus(204); return response.status(204);
} }
const nextStep = await triggerStep.getNextStep(); const nextStep = await triggerStep.getNextStep();
@@ -106,5 +80,5 @@ export default async (request: IRequest, response: Response) => {
await actionQueue.add(jobName, jobPayload, jobOptions); await actionQueue.add(jobName, jobPayload, jobOptions);
return response.sendStatus(204); return response.status(204);
}; };

View File

@@ -1,12 +1,16 @@
import { QueryContext, ModelOptions } from 'objection'; import { QueryContext, ModelOptions } from 'objection';
import type { RelationMappings } from 'objection'; import type { RelationMappings } from 'objection';
import { AES, enc } from 'crypto-js'; import { AES, enc } from 'crypto-js';
import { IRequest } from '@automatisch/types';
import App from './app';
import Base from './base'; import Base from './base';
import User from './user'; import User from './user';
import Step from './step'; import Step from './step';
import ExtendedQueryBuilder from './query-builder';
import appConfig from '../config/app'; import appConfig from '../config/app';
import { IJSONObject } from '@automatisch/types'; import { IJSONObject } from '@automatisch/types';
import Telemetry from '../helpers/telemetry'; import Telemetry from '../helpers/telemetry';
import globalVariable from '../helpers/global-variable';
class Connection extends Base { class Connection extends Base {
id!: string; id!: string;
@@ -18,6 +22,9 @@ class Connection extends Base {
draft: boolean; draft: boolean;
count?: number; count?: number;
flowCount?: number; flowCount?: number;
user?: User;
steps?: Step[];
triggerSteps?: Step[];
static tableName = 'connections'; static tableName = 'connections';
@@ -53,6 +60,17 @@ class Connection extends Base {
to: 'steps.connection_id', to: 'steps.connection_id',
}, },
}, },
triggerSteps: {
relation: Base.HasManyRelation,
modelClass: Step,
join: {
from: 'connections.id',
to: 'steps.connection_id',
},
filter(builder: ExtendedQueryBuilder<Step>) {
builder.where('type', '=', 'trigger');
},
},
}); });
encryptData(): void { encryptData(): void {
@@ -110,6 +128,27 @@ class Connection extends Base {
await super.$afterUpdate(opt, queryContext); await super.$afterUpdate(opt, queryContext);
Telemetry.connectionUpdated(this); Telemetry.connectionUpdated(this);
} }
async getApp() {
if (!this.key) return null;
return await App.findOneByKey(this.key);
}
async verifyWebhook(request: IRequest) {
if (!this.key) return true;
const app = await this.getApp();
const $ = await globalVariable({
connection: this,
request,
});
if (!app.auth?.verifyWebhook) return true;
return app.auth.verifyWebhook($);
}
} }
export default Connection; export default Connection;

View File

@@ -18,6 +18,7 @@ class Flow extends Base {
active: boolean; active: boolean;
status: 'paused' | 'published' | 'draft'; status: 'paused' | 'published' | 'draft';
steps: Step[]; steps: Step[];
triggerStep: Step;
published_at: string; published_at: string;
remoteWebhookId: string; remoteWebhookId: string;
executions?: Execution[]; executions?: Execution[];
@@ -51,6 +52,20 @@ class Flow extends Base {
builder.orderBy('position', 'asc'); builder.orderBy('position', 'asc');
}, },
}, },
triggerStep: {
relation: Base.HasOneRelation,
modelClass: Step,
join: {
from: 'flows.id',
to: 'steps.flow_id',
},
filter(builder: ExtendedQueryBuilder<Step>) {
builder
.where('type', 'trigger')
.limit(1)
.first();
},
},
executions: { executions: {
relation: Base.HasManyRelation, relation: Base.HasManyRelation,
modelClass: Execution, modelClass: Execution,

View File

@@ -1,5 +1,6 @@
import { URL } from 'node:url'; import { URL } from 'node:url';
import { QueryContext, ModelOptions } from 'objection'; import { QueryContext, ModelOptions } from 'objection';
import get from 'lodash.get';
import type { IJSONObject, IStep } from '@automatisch/types'; import type { IJSONObject, IStep } from '@automatisch/types';
import Base from './base'; import Base from './base';
import App from './app'; import App from './app';
@@ -22,6 +23,7 @@ class Step extends Base {
connection?: Connection; connection?: Connection;
flow: Flow; flow: Flow;
executionSteps: ExecutionStep[]; executionSteps: ExecutionStep[];
webhookPath?: string;
static tableName = 'steps'; static tableName = 'steps';
@@ -43,6 +45,7 @@ class Step extends Base {
}, },
position: { type: 'integer' }, position: { type: 'integer' },
parameters: { type: 'object' }, parameters: { type: 'object' },
webhookPath: { type: ['string', 'null'] },
}, },
}; };
@@ -77,17 +80,52 @@ class Step extends Base {
}, },
}); });
get webhookUrl() {
return new URL(this.webhookPath, appConfig.webhookUrl).toString();
}
get iconUrl() { get iconUrl() {
if (!this.appKey) return null; if (!this.appKey) return null;
return `${appConfig.baseUrl}/apps/${this.appKey}/assets/favicon.svg`; return `${appConfig.baseUrl}/apps/${this.appKey}/assets/favicon.svg`;
} }
get webhookUrl() { async computeWebhookPath() {
if (this.appKey !== 'webhook') return null; if (this.type === 'action') return null;
const url = new URL(`/webhooks/${this.flowId}`, appConfig.webhookUrl); const triggerCommand = await this.getTriggerCommand();
return url.toString();
if (!triggerCommand) return null;
const {
useSingletonWebhook,
singletonWebhookRefValueParameter,
type,
} = triggerCommand;
const isWebhook = type === 'webhook';
if (!isWebhook) return null;
if (singletonWebhookRefValueParameter) {
const parameterValue = get(this.parameters, singletonWebhookRefValueParameter);
return `/webhooks/connections/${this.connectionId}/${parameterValue}`;
}
if (useSingletonWebhook) {
return `/webhooks/connections/${this.connectionId}`;
}
return `/webhooks/flows/${this.flowId}`;
}
async getWebhookUrl() {
if (this.type === 'action') return;
const path = await this.computeWebhookPath();
const webhookUrl = new URL(path, appConfig.webhookUrl).toString();
return webhookUrl;
} }
async $afterInsert(queryContext: QueryContext) { async $afterInsert(queryContext: QueryContext) {
@@ -166,6 +204,18 @@ class Step extends Base {
return existingArguments; return existingArguments;
} }
async updateWebhookUrl() {
if (this.isAction) return this;
const payload = {
webhookPath: await this.computeWebhookPath(),
};
await this.$query().patchAndFetch(payload);
return this;
}
} }
export default Step; export default Step;

View File

@@ -3,7 +3,8 @@ import multer from 'multer';
import { IRequest } from '@automatisch/types'; import { IRequest } from '@automatisch/types';
import appConfig from '../config/app'; import appConfig from '../config/app';
import webhookHandler from '../controllers/webhooks/handler'; import webhookHandlerByFlowId from '../controllers/webhooks/handler-by-flow-id';
import webhookHandlerByConnectionIdAndRefValue from '../controllers/webhooks/handler-by-connection-id-and-ref-value';
const router = Router(); const router = Router();
const upload = multer(); const upload = multer();
@@ -25,9 +26,20 @@ const exposeError = (handler: RequestHandler) => async (req: IRequest, res: Resp
} }
} }
router.get('/:flowId', exposeError(webhookHandler)); function createRouteHandler(path: string, handler: (req: IRequest, res: Response, next: NextFunction) => void) {
router.put('/:flowId', exposeError(webhookHandler)); const wrappedHandler = exposeError(handler);
router.patch('/:flowId', exposeError(webhookHandler));
router.post('/:flowId', exposeError(webhookHandler)); router
.route(path)
.get(wrappedHandler)
.put(wrappedHandler)
.patch(wrappedHandler)
.post(wrappedHandler);
};
createRouteHandler('/connections/:connectionId/:refValue', webhookHandlerByConnectionIdAndRefValue);
createRouteHandler('/connections/:connectionId', webhookHandlerByConnectionIdAndRefValue);
createRouteHandler('/flows/:flowId', webhookHandlerByFlowId);
createRouteHandler('/:flowId', webhookHandlerByFlowId);
export default router; export default router;

View File

@@ -60,7 +60,7 @@ export interface IStep {
key?: string; key?: string;
appKey?: string; appKey?: string;
iconUrl: string; iconUrl: string;
webhookUrl: string; webhookUrl?: string;
type: 'action' | 'trigger'; type: 'action' | 'trigger';
connectionId?: string; connectionId?: string;
status: string; status: string;
@@ -241,14 +241,16 @@ export interface IBaseTrigger {
name: string; name: string;
key: string; key: string;
type?: 'webhook' | 'polling'; type?: 'webhook' | 'polling';
showWebhookUrl?: boolean;
pollInterval?: number; pollInterval?: number;
description: string; description: string;
useSingletonWebhook?: boolean;
singletonWebhookRefValueParameter?: string;
getInterval?(parameters: IStep['parameters']): string; getInterval?(parameters: IStep['parameters']): string;
run?($: IGlobalVariable): Promise<void>; run?($: IGlobalVariable): Promise<void>;
testRun?($: IGlobalVariable): Promise<void>; testRun?($: IGlobalVariable): Promise<void>;
registerHook?($: IGlobalVariable): Promise<void>; registerHook?($: IGlobalVariable): Promise<void>;
unregisterHook?($: IGlobalVariable): Promise<void>; unregisterHook?($: IGlobalVariable): Promise<void>;
sort?(item: ITriggerItem, nextItem: ITriggerItem): number;
} }
export interface IRawTrigger extends IBaseTrigger { export interface IRawTrigger extends IBaseTrigger {
@@ -306,7 +308,7 @@ export type IGlobalVariable = {
set: (args: IJSONObject) => Promise<null>; set: (args: IJSONObject) => Promise<null>;
data: IJSONObject; data: IJSONObject;
}; };
app: IApp; app?: IApp;
http?: IHttpClient; http?: IHttpClient;
request?: IRequest; request?: IRequest;
flow?: { flow?: {
@@ -333,6 +335,7 @@ export type IGlobalVariable = {
}; };
getLastExecutionStep?: () => Promise<IExecutionStep>; getLastExecutionStep?: () => Promise<IExecutionStep>;
webhookUrl?: string; webhookUrl?: string;
singletonWebhookUrl?: string;
triggerOutput?: ITriggerOutput; triggerOutput?: ITriggerOutput;
actionOutput?: IActionOutput; actionOutput?: IActionOutput;
pushTriggerItem?: (triggerItem: ITriggerItem) => void; pushTriggerItem?: (triggerItem: ITriggerItem) => void;

View File

@@ -166,12 +166,8 @@ export default function FlowStep(
const actionsOrTriggers: Array<ITrigger | IAction> = const actionsOrTriggers: Array<ITrigger | IAction> =
(isTrigger ? app?.triggers : app?.actions) || []; (isTrigger ? app?.triggers : app?.actions) || [];
const substeps = React.useMemo( const actionOrTrigger = actionsOrTriggers?.find(({ key }) => key === step.key);
() => const substeps = actionOrTrigger?.substeps || [];
actionsOrTriggers?.find(({ key }: ITrigger | IAction) => key === step.key)
?.substeps || [],
[actionsOrTriggers, step?.key]
);
const handleChange = React.useCallback(({ step }: { step: IStep }) => { const handleChange = React.useCallback(({ step }: { step: IStep }) => {
onChange(step); onChange(step);
@@ -283,7 +279,7 @@ export default function FlowStep(
step={step} step={step}
/> />
{substeps?.length > 0 && {actionOrTrigger && substeps?.length > 0 &&
substeps.map((substep: ISubstep, index: number) => ( substeps.map((substep: ISubstep, index: number) => (
<React.Fragment key={`${substep?.name}-${index}`}> <React.Fragment key={`${substep?.name}-${index}`}>
{substep.key === 'chooseConnection' && app && ( {substep.key === 'chooseConnection' && app && (
@@ -308,6 +304,7 @@ export default function FlowStep(
onSubmit={expandNextStep} onSubmit={expandNextStep}
onChange={handleChange} onChange={handleChange}
onContinue={onContinue} onContinue={onContinue}
showWebhookUrl={'showWebhookUrl' in actionOrTrigger ? actionOrTrigger.showWebhookUrl : false}
step={step} step={step}
/> />
)} )}

View File

@@ -18,6 +18,7 @@ import type { IStep, ISubstep } from '@automatisch/types';
type TestSubstepProps = { type TestSubstepProps = {
substep: ISubstep; substep: ISubstep;
expanded?: boolean; expanded?: boolean;
showWebhookUrl?: boolean;
onExpand: () => void; onExpand: () => void;
onCollapse: () => void; onCollapse: () => void;
onChange?: ({ step }: { step: IStep }) => void; onChange?: ({ step }: { step: IStep }) => void;
@@ -52,6 +53,7 @@ function TestSubstep(props: TestSubstepProps): React.ReactElement {
onSubmit, onSubmit,
onContinue, onContinue,
step, step,
showWebhookUrl = false,
} = props; } = props;
const formatMessage = useFormatMessage(); const formatMessage = useFormatMessage();
@@ -119,7 +121,7 @@ function TestSubstep(props: TestSubstepProps): React.ReactElement {
</Alert> </Alert>
)} )}
{step.webhookUrl && ( {step.webhookUrl && showWebhookUrl && (
<WebhookUrlInfo webhookUrl={step.webhookUrl} sx={{ mb: 2 }} /> <WebhookUrlInfo webhookUrl={step.webhookUrl} sx={{ mb: 2 }} />
)} )}

View File

@@ -60,6 +60,7 @@ export const GET_APP = gql`
name name
key key
type type
showWebhookUrl
pollInterval pollInterval
description description
substeps { substeps {

View File

@@ -67,6 +67,7 @@ export const GET_APPS = gql`
name name
key key
type type
showWebhookUrl
pollInterval pollInterval
description description
substeps { substeps {