From 770b07179f7574bf333536c69975fa5003e5bd3f Mon Sep 17 00:00:00 2001 From: Faruk AYDIN Date: Thu, 5 Dec 2024 03:21:50 +0300 Subject: [PATCH] refactor: Workers and queues and eliminate redundant process listeners --- packages/backend/src/queues/action.js | 4 ---- packages/backend/src/queues/delete-user.ee.js | 4 ---- packages/backend/src/queues/email.js | 4 ---- packages/backend/src/queues/flow.js | 4 ---- packages/backend/src/queues/index.js | 21 ++++++++++++++++++ .../remove-cancelled-subscriptions.ee.js | 4 ---- packages/backend/src/queues/trigger.js | 4 ---- packages/backend/src/worker.js | 22 ++++++++++--------- packages/backend/src/workers/action.js | 11 ++++------ .../backend/src/workers/delete-user.ee.js | 11 ++++------ packages/backend/src/workers/email.js | 11 ++++------ packages/backend/src/workers/flow.js | 11 ++++------ packages/backend/src/workers/index.js | 21 ++++++++++++++++++ .../remove-cancelled-subscriptions.ee.js | 11 ++++------ packages/backend/src/workers/trigger.js | 11 ++++------ 15 files changed, 78 insertions(+), 76 deletions(-) create mode 100644 packages/backend/src/queues/index.js create mode 100644 packages/backend/src/workers/index.js diff --git a/packages/backend/src/queues/action.js b/packages/backend/src/queues/action.js index f1f4126d..3c413173 100644 --- a/packages/backend/src/queues/action.js +++ b/packages/backend/src/queues/action.js @@ -11,10 +11,6 @@ const redisConnection = { const actionQueue = new Queue('action', redisConnection); -process.on('SIGTERM', async () => { - await actionQueue.close(); -}); - actionQueue.on('error', (error) => { if (error.code === CONNECTION_REFUSED) { logger.error( diff --git a/packages/backend/src/queues/delete-user.ee.js b/packages/backend/src/queues/delete-user.ee.js index b8437500..11794005 100644 --- a/packages/backend/src/queues/delete-user.ee.js +++ b/packages/backend/src/queues/delete-user.ee.js @@ -11,10 +11,6 @@ const redisConnection = { const deleteUserQueue = new Queue('delete-user', redisConnection); -process.on('SIGTERM', async () => { - await deleteUserQueue.close(); -}); - deleteUserQueue.on('error', (error) => { if (error.code === CONNECTION_REFUSED) { logger.error( diff --git a/packages/backend/src/queues/email.js b/packages/backend/src/queues/email.js index db6eda0d..5755f7c2 100644 --- a/packages/backend/src/queues/email.js +++ b/packages/backend/src/queues/email.js @@ -11,10 +11,6 @@ const redisConnection = { const emailQueue = new Queue('email', redisConnection); -process.on('SIGTERM', async () => { - await emailQueue.close(); -}); - emailQueue.on('error', (error) => { if (error.code === CONNECTION_REFUSED) { logger.error( diff --git a/packages/backend/src/queues/flow.js b/packages/backend/src/queues/flow.js index aa4ae713..48de083a 100644 --- a/packages/backend/src/queues/flow.js +++ b/packages/backend/src/queues/flow.js @@ -11,10 +11,6 @@ const redisConnection = { const flowQueue = new Queue('flow', redisConnection); -process.on('SIGTERM', async () => { - await flowQueue.close(); -}); - flowQueue.on('error', (error) => { if (error.code === CONNECTION_REFUSED) { logger.error( diff --git a/packages/backend/src/queues/index.js b/packages/backend/src/queues/index.js new file mode 100644 index 00000000..fcdd4c89 --- /dev/null +++ b/packages/backend/src/queues/index.js @@ -0,0 +1,21 @@ +import appConfig from '../config/app.js'; +import actionQueue from './action.js'; +import emailQueue from './email.js'; +import flowQueue from './flow.js'; +import triggerQueue from './trigger.js'; +import deleteUserQueue from './delete-user.ee.js'; +import removeCancelledSubscriptionsQueue from './remove-cancelled-subscriptions.ee.js'; + +const queues = [ + actionQueue, + emailQueue, + flowQueue, + triggerQueue, + deleteUserQueue, +]; + +if (appConfig.isCloud) { + queues.push(removeCancelledSubscriptionsQueue); +} + +export default queues; diff --git a/packages/backend/src/queues/remove-cancelled-subscriptions.ee.js b/packages/backend/src/queues/remove-cancelled-subscriptions.ee.js index 1bdddebc..f5f574a8 100644 --- a/packages/backend/src/queues/remove-cancelled-subscriptions.ee.js +++ b/packages/backend/src/queues/remove-cancelled-subscriptions.ee.js @@ -14,10 +14,6 @@ const removeCancelledSubscriptionsQueue = new Queue( redisConnection ); -process.on('SIGTERM', async () => { - await removeCancelledSubscriptionsQueue.close(); -}); - removeCancelledSubscriptionsQueue.on('error', (error) => { if (error.code === CONNECTION_REFUSED) { logger.error( diff --git a/packages/backend/src/queues/trigger.js b/packages/backend/src/queues/trigger.js index 66a3d9ec..bc0f9b46 100644 --- a/packages/backend/src/queues/trigger.js +++ b/packages/backend/src/queues/trigger.js @@ -11,10 +11,6 @@ const redisConnection = { const triggerQueue = new Queue('trigger', redisConnection); -process.on('SIGTERM', async () => { - await triggerQueue.close(); -}); - triggerQueue.on('error', (error) => { if (error.code === CONNECTION_REFUSED) { logger.error( diff --git a/packages/backend/src/worker.js b/packages/backend/src/worker.js index 988cb0cc..214e343d 100644 --- a/packages/backend/src/worker.js +++ b/packages/backend/src/worker.js @@ -1,20 +1,22 @@ import * as Sentry from './helpers/sentry.ee.js'; -import appConfig from './config/app.js'; +import process from 'node:process'; Sentry.init(); import './config/orm.js'; import './helpers/check-worker-readiness.js'; -import './workers/flow.js'; -import './workers/trigger.js'; -import './workers/action.js'; -import './workers/email.js'; -import './workers/delete-user.ee.js'; +import queues from './queues/index.js'; +import workers from './workers/index.js'; -if (appConfig.isCloud) { - import('./workers/remove-cancelled-subscriptions.ee.js'); - import('./queues/remove-cancelled-subscriptions.ee.js'); -} +process.on('SIGTERM', async () => { + for (const queue of queues) { + await queue.close(); + } + + for (const worker of workers) { + await worker.close(); + } +}); import telemetry from './helpers/telemetry/index.js'; diff --git a/packages/backend/src/workers/action.js b/packages/backend/src/workers/action.js index 9564d9a4..3159a7d6 100644 --- a/packages/backend/src/workers/action.js +++ b/packages/backend/src/workers/action.js @@ -1,5 +1,4 @@ import { Worker } from 'bullmq'; -import process from 'node:process'; import * as Sentry from '../helpers/sentry.ee.js'; import redisConfig from '../config/redis.js'; @@ -15,7 +14,7 @@ import delayAsMilliseconds from '../helpers/delay-as-milliseconds.js'; const DEFAULT_DELAY_DURATION = 0; -export const worker = new Worker( +const actionWorker = new Worker( 'action', async (job) => { const { stepId, flowId, executionId, computedParameters, executionStep } = @@ -55,11 +54,11 @@ export const worker = new Worker( { connection: redisConfig } ); -worker.on('completed', (job) => { +actionWorker.on('completed', (job) => { logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); }); -worker.on('failed', (job, err) => { +actionWorker.on('failed', (job, err) => { const errorMessage = ` JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message} \n ${err.stack} @@ -74,6 +73,4 @@ worker.on('failed', (job, err) => { }); }); -process.on('SIGTERM', async () => { - await worker.close(); -}); +export default actionWorker; diff --git a/packages/backend/src/workers/delete-user.ee.js b/packages/backend/src/workers/delete-user.ee.js index 64a4bfa4..9081df20 100644 --- a/packages/backend/src/workers/delete-user.ee.js +++ b/packages/backend/src/workers/delete-user.ee.js @@ -1,5 +1,4 @@ import { Worker } from 'bullmq'; -import process from 'node:process'; import * as Sentry from '../helpers/sentry.ee.js'; import redisConfig from '../config/redis.js'; @@ -8,7 +7,7 @@ import appConfig from '../config/app.js'; import User from '../models/user.js'; import ExecutionStep from '../models/execution-step.js'; -export const worker = new Worker( +const deleteUserWorker = new Worker( 'delete-user', async (job) => { const { id } = job.data; @@ -46,13 +45,13 @@ export const worker = new Worker( { connection: redisConfig } ); -worker.on('completed', (job) => { +deleteUserWorker.on('completed', (job) => { logger.info( `JOB ID: ${job.id} - The user with the ID of '${job.data.id}' has been deleted!` ); }); -worker.on('failed', (job, err) => { +deleteUserWorker.on('failed', (job, err) => { const errorMessage = ` JOB ID: ${job.id} - The user with the ID of '${job.data.id}' has failed to be deleted! ${err.message} \n ${err.stack} @@ -67,6 +66,4 @@ worker.on('failed', (job, err) => { }); }); -process.on('SIGTERM', async () => { - await worker.close(); -}); +export default deleteUserWorker; diff --git a/packages/backend/src/workers/email.js b/packages/backend/src/workers/email.js index 446d8309..92bf0367 100644 --- a/packages/backend/src/workers/email.js +++ b/packages/backend/src/workers/email.js @@ -1,5 +1,4 @@ import { Worker } from 'bullmq'; -import process from 'node:process'; import * as Sentry from '../helpers/sentry.ee.js'; import redisConfig from '../config/redis.js'; @@ -16,7 +15,7 @@ const isAutomatischEmail = (email) => { return email.endsWith('@automatisch.io'); }; -export const worker = new Worker( +const emailWorker = new Worker( 'email', async (job) => { const { email, subject, template, params } = job.data; @@ -39,13 +38,13 @@ export const worker = new Worker( { connection: redisConfig } ); -worker.on('completed', (job) => { +emailWorker.on('completed', (job) => { logger.info( `JOB ID: ${job.id} - ${job.data.subject} email sent to ${job.data.email}!` ); }); -worker.on('failed', (job, err) => { +emailWorker.on('failed', (job, err) => { const errorMessage = ` JOB ID: ${job.id} - ${job.data.subject} email to ${job.data.email} has failed to send with ${err.message} \n ${err.stack} @@ -60,6 +59,4 @@ worker.on('failed', (job, err) => { }); }); -process.on('SIGTERM', async () => { - await worker.close(); -}); +export default emailWorker; diff --git a/packages/backend/src/workers/flow.js b/packages/backend/src/workers/flow.js index b1bcce78..8c08d5e1 100644 --- a/packages/backend/src/workers/flow.js +++ b/packages/backend/src/workers/flow.js @@ -1,5 +1,4 @@ import { Worker } from 'bullmq'; -import process from 'node:process'; import * as Sentry from '../helpers/sentry.ee.js'; import redisConfig from '../config/redis.js'; @@ -13,7 +12,7 @@ import { REMOVE_AFTER_7_DAYS_OR_50_JOBS, } from '../helpers/remove-job-configuration.js'; -export const worker = new Worker( +const flowWorker = new Worker( 'flow', async (job) => { const { flowId } = job.data; @@ -64,11 +63,11 @@ export const worker = new Worker( { connection: redisConfig } ); -worker.on('completed', (job) => { +flowWorker.on('completed', (job) => { logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); }); -worker.on('failed', async (job, err) => { +flowWorker.on('failed', async (job, err) => { const errorMessage = ` JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message} \n ${err.stack} @@ -95,6 +94,4 @@ worker.on('failed', async (job, err) => { }); }); -process.on('SIGTERM', async () => { - await worker.close(); -}); +export default flowWorker; diff --git a/packages/backend/src/workers/index.js b/packages/backend/src/workers/index.js new file mode 100644 index 00000000..81446f2a --- /dev/null +++ b/packages/backend/src/workers/index.js @@ -0,0 +1,21 @@ +import appConfig from '../config/app.js'; +import actionWorker from './action.js'; +import emailWorker from './email.js'; +import flowWorker from './flow.js'; +import triggerWorker from './trigger.js'; +import deleteUserWorker from './delete-user.ee.js'; +import removeCancelledSubscriptionsWorker from './remove-cancelled-subscriptions.ee.js'; + +const workers = [ + actionWorker, + emailWorker, + flowWorker, + triggerWorker, + deleteUserWorker, +]; + +if (appConfig.isCloud) { + workers.push(removeCancelledSubscriptionsWorker); +} + +export default workers; diff --git a/packages/backend/src/workers/remove-cancelled-subscriptions.ee.js b/packages/backend/src/workers/remove-cancelled-subscriptions.ee.js index 7b3952bf..6ee0ae17 100644 --- a/packages/backend/src/workers/remove-cancelled-subscriptions.ee.js +++ b/packages/backend/src/workers/remove-cancelled-subscriptions.ee.js @@ -1,12 +1,11 @@ import { Worker } from 'bullmq'; -import process from 'node:process'; import { DateTime } from 'luxon'; import * as Sentry from '../helpers/sentry.ee.js'; import redisConfig from '../config/redis.js'; import logger from '../helpers/logger.js'; import Subscription from '../models/subscription.ee.js'; -export const worker = new Worker( +const removeCancelledSubscriptionsWorker = new Worker( 'remove-cancelled-subscriptions', async () => { await Subscription.query() @@ -23,13 +22,13 @@ export const worker = new Worker( { connection: redisConfig } ); -worker.on('completed', (job) => { +removeCancelledSubscriptionsWorker.on('completed', (job) => { logger.info( `JOB ID: ${job.id} - The cancelled subscriptions have been removed!` ); }); -worker.on('failed', (job, err) => { +removeCancelledSubscriptionsWorker.on('failed', (job, err) => { const errorMessage = ` JOB ID: ${job.id} - ERROR: The cancelled subscriptions can not be removed! ${err.message} \n ${err.stack} @@ -42,6 +41,4 @@ worker.on('failed', (job, err) => { }); }); -process.on('SIGTERM', async () => { - await worker.close(); -}); +export default removeCancelledSubscriptionsWorker; diff --git a/packages/backend/src/workers/trigger.js b/packages/backend/src/workers/trigger.js index 58749f75..64056dd9 100644 --- a/packages/backend/src/workers/trigger.js +++ b/packages/backend/src/workers/trigger.js @@ -1,5 +1,4 @@ import { Worker } from 'bullmq'; -import process from 'node:process'; import * as Sentry from '../helpers/sentry.ee.js'; import redisConfig from '../config/redis.js'; @@ -12,7 +11,7 @@ import { REMOVE_AFTER_7_DAYS_OR_50_JOBS, } from '../helpers/remove-job-configuration.js'; -export const worker = new Worker( +const triggerWorker = new Worker( 'trigger', async (job) => { const { flowId, executionId, stepId, executionStep } = await processTrigger( @@ -41,11 +40,11 @@ export const worker = new Worker( { connection: redisConfig } ); -worker.on('completed', (job) => { +triggerWorker.on('completed', (job) => { logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); }); -worker.on('failed', (job, err) => { +triggerWorker.on('failed', (job, err) => { const errorMessage = ` JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message} \n ${err.stack} @@ -60,6 +59,4 @@ worker.on('failed', (job, err) => { }); }); -process.on('SIGTERM', async () => { - await worker.close(); -}); +export default triggerWorker;