feat: Implement webhook logic along with new entry typeform trigger

This commit is contained in:
Faruk AYDIN
2022-11-28 23:30:03 +01:00
committed by Ali BARIN
parent 397926f994
commit d83e8dabf8
13 changed files with 246 additions and 12 deletions

View File

@@ -2,6 +2,7 @@ HOST=localhost
PROTOCOL=http
PORT=3000
WEB_APP_URL=http://localhost:3001
WEBHOOK_URL=http://localhost:3000
APP_ENV=development
POSTGRES_DATABASE=automatisch_development
POSTGRES_PORT=5432

View File

@@ -2,7 +2,6 @@ import createError from 'http-errors';
import express, { Request, Response, NextFunction } from 'express';
import cors from 'cors';
import corsOptions from './config/cors-options';
import graphQLInstance from './helpers/graphql-instance';
import morgan from './helpers/morgan';
import appAssetsHandler from './helpers/app-assets-handler';
import webUIHandler from './helpers/web-ui-handler';
@@ -13,6 +12,8 @@ import {
serverAdapter,
} from './helpers/create-bull-board-handler';
import injectBullBoardHandler from './helpers/inject-bull-board-handler';
import router from './routes';
import { IRequest } from '@automatisch/types';
createBullBoardHandler(serverAdapter);
@@ -23,10 +24,16 @@ injectBullBoardHandler(app, serverAdapter);
appAssetsHandler(app);
app.use(morgan);
app.use(express.json());
app.use(
express.json({
verify: (req, res, buf) => {
(req as IRequest).rawBody = buf;
},
})
);
app.use(express.urlencoded({ extended: false }));
app.use(cors(corsOptions));
app.use('/graphql', graphQLInstance);
app.use('/', router);
webUIHandler(app);

View File

@@ -2,6 +2,7 @@ import generateAuthUrl from './generate-auth-url';
import verifyCredentials from './verify-credentials';
import isStillVerified from './is-still-verified';
import refreshToken from './refresh-token';
import verifyWebhook from './verify-webhook';
export default {
fields: [
@@ -45,4 +46,5 @@ export default {
verifyCredentials,
isStillVerified,
refreshToken,
verifyWebhook,
};

View File

@@ -0,0 +1,20 @@
import crypto from 'crypto';
import { IGlobalVariable } from '@automatisch/types';
import appConfig from '../../../config/app';
const verifyWebhook = async ($: IGlobalVariable) => {
const signature = $.request.headers['typeform-signature'] as string;
const isValid = verifySignature(signature, $.request.rawBody.toString());
return isValid;
};
const verifySignature = function (receivedSignature: string, payload: string) {
const hash = crypto
.createHmac('sha256', appConfig.appSecretKey)
.update(payload)
.digest('base64');
return receivedSignature === `sha256=${hash}`;
};
export default verifyWebhook;

View File

@@ -1,14 +1,16 @@
import { IJSONObject } from '@automatisch/types';
import appConfig from '../../../../config/app';
import defineTrigger from '../../../../helpers/define-trigger';
export default defineTrigger({
name: 'New entry',
key: 'newEntry',
pollInterval: 15,
type: 'webhook',
description: 'Triggers when a new form submitted.',
arguments: [
{
label: 'Form',
key: 'form',
key: 'formId',
type: 'dropdown' as const,
required: true,
description: 'Pick a form to receive submissions.',
@@ -26,7 +28,81 @@ export default defineTrigger({
},
],
async run($) {
// await getUserTweets($, { currentUser: true });
async testRun($) {
const createApiResponse = await $.http.get(
`/forms/${$.step.parameters.formId}`
);
const responsesApiResponse = await $.http.get(
`/forms/${$.step.parameters.formId}/responses`
);
const lastResponse = responsesApiResponse.data.items[0];
const computedResponseItem = {
event_type: 'form_response',
form_response: {
form_id: $.step.parameters.formId,
token: lastResponse.token,
landed_at: lastResponse.landed_at,
submitted_at: lastResponse.submitted_at,
definion: {
id: $.step.parameters.formId,
title: createApiResponse.data.title,
fields: createApiResponse.data?.fields?.map((field: IJSONObject) => ({
id: field.id,
ref: field.ref,
type: field.type,
title: field.title,
properties: {},
choices: (
(field?.properties as IJSONObject)?.choices as IJSONObject[]
)?.map((choice) => ({
id: choice.id,
label: choice.label,
})),
})),
},
answers: lastResponse.answers?.map((answer: IJSONObject) => ({
type: answer.type,
choice: {
label: (answer?.choice as IJSONObject)?.label,
},
field: {
id: (answer.field as IJSONObject).id,
ref: (answer.field as IJSONObject).ref,
type: (answer.field as IJSONObject).type,
},
})),
},
};
const dataItem = {
raw: computedResponseItem,
meta: {
internalId: computedResponseItem.form_response.token,
},
};
$.pushTriggerItem(dataItem);
},
async registerHook($) {
const subscriptionPayload = {
enabled: true,
url: $.webhookUrl,
secret: appConfig.appSecretKey,
};
await $.http.put(
`/forms/${$.step.parameters.formId}/webhooks/${$.flow.id}`,
subscriptionPayload
);
},
async unregisterHook($) {
await $.http.delete(
`/forms/${$.step.parameters.formId}/webhooks/${$.flow.id}`
);
},
});

View File

@@ -6,6 +6,7 @@ type AppConfig = {
protocol: string;
port: string;
webAppUrl: string;
webhookUrl: string;
appEnv: string;
isDev: boolean;
postgresDatabase: string;
@@ -37,6 +38,8 @@ const serveWebAppSeparately =
process.env.SERVE_WEB_APP_SEPARATELY === 'true' ? true : false;
let webAppUrl = `${protocol}://${host}:${port}`;
const webhookUrl = process.env.WEBHOOK_URL || webAppUrl;
if (serveWebAppSeparately) {
webAppUrl = process.env.WEB_APP_URL || 'http://localhost:3001';
}
@@ -73,6 +76,7 @@ const appConfig: AppConfig = {
bullMQDashboardPassword: process.env.BULLMQ_DASHBOARD_PASSWORD,
baseUrl,
webAppUrl,
webhookUrl,
telemetryEnabled: process.env.TELEMETRY_ENABLED === 'false' ? false : true,
};

View File

@@ -0,0 +1,55 @@
import { Request, Response } from 'express';
import { ITriggerItem } from '@automatisch/types';
import Flow from '../../models/flow';
import triggerQueue from '../../queues/trigger';
import globalVariable from '../../helpers/global-variable';
export default async (request: Request, response: Response) => {
const flow = await Flow.query()
.findById(request.params.flowId)
.throwIfNotFound();
if (!flow.active) {
return response.send(404);
}
const triggerStep = await flow.getTriggerStep();
const app = await triggerStep.getApp();
if (app.auth.verifyWebhook) {
const $ = await globalVariable({
flow,
connection: await triggerStep.$relatedQuery('connection'),
app,
step: triggerStep,
testRun: false,
request,
});
const verified = await app.auth.verifyWebhook($);
if (!verified) {
return response.sendStatus(401);
}
}
const triggerItem: ITriggerItem = {
raw: request.body,
meta: {
internalId: request.body.form_response.token,
},
};
const jobName = `${triggerStep.id}-${triggerItem.meta.internalId}`;
const jobPayload = {
flowId: flow.id,
stepId: triggerStep.id,
triggerItem,
};
await triggerQueue.add(jobName, jobPayload);
return response.sendStatus(200);
};

View File

@@ -1,6 +1,7 @@
import Context from '../../types/express/context';
import flowQueue from '../../queues/flow';
import { REMOVE_AFTER_30_DAYS_OR_150_JOBS, REMOVE_AFTER_7_DAYS_OR_50_JOBS } from '../../helpers/remove-job-configuration';
import globalVariable from '../../helpers/global-variable';
type Params = {
input: {
@@ -50,12 +51,22 @@ const updateFlowStatus = async (
jobName,
{ flowId: flow.id },
{
repeat: repeatOptions,
repeat: trigger.type === 'webhook' ? null : repeatOptions,
jobId: flow.id,
removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS,
removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS
}
);
} else if (!flow.active && trigger.type === 'webhook') {
const $ = await globalVariable({
flow,
connection: await triggerStep.$relatedQuery('connection'),
app: await triggerStep.getApp(),
step: triggerStep,
testRun: false,
});
await trigger.unregisterHook($);
} else {
const repeatableJobs = await flowQueue.getRepeatableJobs();
const job = repeatableJobs.find((job) => job.id === flow.id);

View File

@@ -3,12 +3,14 @@ import Connection from '../models/connection';
import Flow from '../models/flow';
import Step from '../models/step';
import Execution from '../models/execution';
import appConfig from '../config/app';
import {
IJSONObject,
IApp,
IGlobalVariable,
ITriggerItem,
IActionItem,
IRequest,
} from '@automatisch/types';
import EarlyExitError from '../errors/early-exit';
@@ -19,12 +21,21 @@ type GlobalVariableOptions = {
step?: Step;
execution?: Execution;
testRun?: boolean;
request?: IRequest;
};
const globalVariable = async (
options: GlobalVariableOptions
): Promise<IGlobalVariable> => {
const { connection, app, flow, step, execution, testRun = false } = options;
const {
connection,
app,
flow,
step,
execution,
request,
testRun = false,
} = options;
const lastInternalId = testRun ? undefined : await flow?.lastInternalId();
const nextStep = await step?.getNextStep();
@@ -95,12 +106,22 @@ const globalVariable = async (
},
};
if (request) {
$.request = request;
}
$.http = createHttpClient({
$,
baseURL: app.apiBaseUrl,
beforeRequest: app.beforeRequest,
});
if (flow) {
const webhookUrl = appConfig.webhookUrl + '/webhooks/' + flow.id;
$.webhookUrl = webhookUrl;
}
const lastInternalIds =
testRun || (flow && step.isAction) ? [] : await flow?.lastInternalIds(2000);

View File

@@ -0,0 +1,10 @@
import { Router } from 'express';
import graphQLInstance from '../helpers/graphql-instance';
import webhooksRouter from './webhooks';
const router = Router();
router.use('/graphql', graphQLInstance);
router.use('/webhooks', webhooksRouter);
export default router;

View File

@@ -0,0 +1,8 @@
import createAction from '../controllers/webhooks/create';
import { Router } from 'express';
const router = Router();
router.post('/:flowId', createAction);
export default router;

View File

@@ -23,7 +23,13 @@ export const processFlow = async (options: ProcessFlowOptions) => {
});
try {
await triggerCommand.run($);
if (triggerCommand.type === 'webhook' && !flow.active) {
await triggerCommand.testRun($);
} else if (triggerCommand.type === 'webhook' && flow.active) {
await triggerCommand.registerHook($);
} else {
await triggerCommand.run($);
}
} catch (error) {
if (error instanceof EarlyExitError === false) {
if (error instanceof HttpError) {

View File

@@ -1,5 +1,6 @@
import type { AxiosInstance, AxiosRequestConfig } from 'axios';
export type IHttpClient = AxiosInstance;
import type { Request } from 'express';
// Type definitions for automatisch
@@ -182,6 +183,7 @@ export interface IAuth {
verifyCredentials($: IGlobalVariable): Promise<void>;
isStillVerified($: IGlobalVariable): Promise<boolean>;
refreshToken?($: IGlobalVariable): Promise<void>;
verifyWebhook?($: IGlobalVariable): Promise<boolean>;
isRefreshTokenRequested?: boolean;
fields: IField[];
authenticationSteps?: IAuthenticationStep[];
@@ -210,10 +212,14 @@ export interface ITriggerItem {
export interface IBaseTrigger {
name: string;
key: string;
type?: 'webhook' | 'polling';
pollInterval?: number;
description: string;
getInterval?(parameters: IStep['parameters']): string;
run($: IGlobalVariable): Promise<void>;
run?($: IGlobalVariable): Promise<void>;
testRun?($: IGlobalVariable): Promise<void>;
registerHook?($: IGlobalVariable): Promise<void>;
unregisterHook?($: IGlobalVariable): Promise<void>;
sort?(item: ITriggerItem, nextItem: ITriggerItem): number;
}
@@ -238,7 +244,7 @@ export interface IBaseAction {
name: string;
key: string;
description: string;
run($: IGlobalVariable): Promise<void>;
run?($: IGlobalVariable): Promise<void>;
}
export interface IRawAction extends IBaseAction {
@@ -274,6 +280,7 @@ export type IGlobalVariable = {
};
app: IApp;
http?: IHttpClient;
request?: IRequest;
flow?: {
id: string;
lastInternalId: string;
@@ -293,6 +300,7 @@ export type IGlobalVariable = {
id: string;
testRun: boolean;
};
webhookUrl?: string;
triggerOutput?: ITriggerOutput;
actionOutput?: IActionOutput;
pushTriggerItem?: (triggerItem: ITriggerItem) => void;
@@ -308,3 +316,8 @@ declare module 'axios' {
additionalProperties?: Record<string, unknown>;
}
}
export interface IRequest extends Request {
rawBody?: Buffer;
}