From 687c6a09bcd951ccf4c186d4ef8fb8938d0e213b Mon Sep 17 00:00:00 2001 From: Ali BARIN Date: Wed, 30 Nov 2022 19:33:57 +0100 Subject: [PATCH] feat(webhook): register in mutation calls --- .../graphql/mutations/update-flow-status.ts | 54 ++++++++++--------- packages/backend/src/services/flow.ts | 2 - 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/packages/backend/src/graphql/mutations/update-flow-status.ts b/packages/backend/src/graphql/mutations/update-flow-status.ts index 83f05750..1cb80d5a 100644 --- a/packages/backend/src/graphql/mutations/update-flow-status.ts +++ b/packages/backend/src/graphql/mutations/update-flow-status.ts @@ -40,27 +40,7 @@ const updateFlowStatus = async ( pattern: interval || EVERY_15_MINUTES_CRON, }; - if (flow.active) { - // add the flow job in the queue. - flow = await flow.$query().patchAndFetch({ - published_at: new Date().toISOString(), - }); - - const jobName = `${JOB_NAME}-${flow.id}`; - - await flowQueue.add( - jobName, - { flowId: flow.id }, - { - // do not repeat webhook job for immediate webhook registration - repeat: trigger.type === 'webhook' ? null : repeatOptions, - jobId: flow.id, - removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, - removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS - } - ); - } else if (!flow.active && trigger.type === 'webhook') { - // unregister webhook from the application + if (trigger.type === 'webhook') { const $ = await globalVariable({ flow, connection: await triggerStep.$relatedQuery('connection'), @@ -69,13 +49,35 @@ const updateFlowStatus = async ( testRun: false, }); - await trigger.unregisterHook($); + if (flow.active) { + await trigger.registerHook($); + } else { + await trigger.unregisterHook($); + } } else { - // remove the job out of the queue - const repeatableJobs = await flowQueue.getRepeatableJobs(); - const job = repeatableJobs.find((job) => job.id === flow.id); + if (flow.active) { + flow = await flow.$query().patchAndFetch({ + published_at: new Date().toISOString(), + }); - await flowQueue.removeRepeatableByKey(job.key); + const jobName = `${JOB_NAME}-${flow.id}`; + + await flowQueue.add( + jobName, + { flowId: flow.id }, + { + repeat: repeatOptions, + jobId: flow.id, + removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, + removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS + } + ); + } else { + const repeatableJobs = await flowQueue.getRepeatableJobs(); + const job = repeatableJobs.find((job) => job.id === flow.id); + + await flowQueue.removeRepeatableByKey(job.key); + } } return flow; diff --git a/packages/backend/src/services/flow.ts b/packages/backend/src/services/flow.ts index d76d9be9..862066f7 100644 --- a/packages/backend/src/services/flow.ts +++ b/packages/backend/src/services/flow.ts @@ -25,8 +25,6 @@ export const processFlow = async (options: ProcessFlowOptions) => { try { if (triggerCommand.type === 'webhook' && !flow.active) { await triggerCommand.testRun($); - } else if (triggerCommand.type === 'webhook' && flow.active) { - await triggerCommand.registerHook($); } else { await triggerCommand.run($); }