From e7df19ae178eae1be43893a061f60718c578da98 Mon Sep 17 00:00:00 2001 From: Ali BARIN Date: Tue, 3 Dec 2024 11:22:03 +0000 Subject: [PATCH] refactor(queues): abstract queue management --- packages/backend/src/config/redis.js | 2 +- .../src/helpers/create-bull-board-handler.js | 2 +- packages/backend/src/models/flow.js | 5 +- packages/backend/src/models/user.js | 8 +- packages/backend/src/queues/base.js | 30 ++++ packages/backend/src/queues/bullmq.js | 154 ++++++++++++++++++ packages/backend/src/queues/flow.js | 30 +--- packages/backend/src/workers/flow.js | 2 +- 8 files changed, 191 insertions(+), 42 deletions(-) create mode 100644 packages/backend/src/queues/base.js create mode 100644 packages/backend/src/queues/bullmq.js diff --git a/packages/backend/src/config/redis.js b/packages/backend/src/config/redis.js index 09f0ced5..ee774f39 100644 --- a/packages/backend/src/config/redis.js +++ b/packages/backend/src/config/redis.js @@ -13,7 +13,7 @@ if (appConfig.redisSentinelHost) { { host: appConfig.redisSentinelHost, port: appConfig.redisSentinelPort, - } + }, ]; redisConfig.sentinelUsername = appConfig.redisSentinelUsername; diff --git a/packages/backend/src/helpers/create-bull-board-handler.js b/packages/backend/src/helpers/create-bull-board-handler.js index 03f949b6..fd009899 100644 --- a/packages/backend/src/helpers/create-bull-board-handler.js +++ b/packages/backend/src/helpers/create-bull-board-handler.js @@ -12,7 +12,7 @@ import appConfig from '../config/app.js'; const serverAdapter = new ExpressAdapter(); const queues = [ - new BullMQAdapter(flowQueue), + new BullMQAdapter(flowQueue.queue), new BullMQAdapter(triggerQueue), new BullMQAdapter(actionQueue), new BullMQAdapter(emailQueue), diff --git a/packages/backend/src/models/flow.js b/packages/backend/src/models/flow.js index 56744396..5543e095 100644 --- a/packages/backend/src/models/flow.js +++ b/packages/backend/src/models/flow.js @@ -386,10 +386,7 @@ class Flow extends Base { } ); } else { - const repeatableJobs = await flowQueue.getRepeatableJobs(); - const job = repeatableJobs.find((job) => job.id === this.id); - - await flowQueue.removeRepeatableByKey(job.key); + await flowQueue.removeRepeatableJobById(this.id); } } diff --git a/packages/backend/src/models/user.js b/packages/backend/src/models/user.js index 99314e07..db4b6843 100644 --- a/packages/backend/src/models/user.js +++ b/packages/backend/src/models/user.js @@ -305,14 +305,8 @@ class User extends Base { active: true, }); - const repeatableJobs = await flowQueue.getRepeatableJobs(); - for (const flow of flows) { - const job = repeatableJobs.find((job) => job.id === flow.id); - - if (job) { - await flowQueue.removeRepeatableByKey(job.key); - } + await flowQueue.removeRepeatableJobById(flow.id); } const executionIds = ( diff --git a/packages/backend/src/queues/base.js b/packages/backend/src/queues/base.js new file mode 100644 index 00000000..3b621dbb --- /dev/null +++ b/packages/backend/src/queues/base.js @@ -0,0 +1,30 @@ +/* eslint-disable no-unused-vars */ + +class BaseQueue { + constructor(name) { + if (new.target === BaseQueue) { + throw new Error('Cannot instantiate abstract class BaseQueue directly.'); + } + + this.name = name; + } + + // Abstract methods to be implemented by subclasses + async add(jobName, data, options) { + throw new Error('Method "add" must be implemented.'); + } + + async remove(jobId) { + throw new Error('Method "remove" must be implemented.'); + } + + async getRepeatableJobs() { + throw new Error('Method "getRepeatableJobs" must be implemented.'); + } + + async removeRepeatableJobByKey(jobKey) { + throw new Error('Method "removeRepeatableJobByKey" must be implemented.'); + } +} + +export default BaseQueue; diff --git a/packages/backend/src/queues/bullmq.js b/packages/backend/src/queues/bullmq.js new file mode 100644 index 00000000..858fd0e5 --- /dev/null +++ b/packages/backend/src/queues/bullmq.js @@ -0,0 +1,154 @@ +import { Queue, Worker } from 'bullmq'; +import redisConfig from '../config/redis.js'; +import logger from '../helpers/logger.js'; +import BaseQueue from './base.js'; + +const CONNECTION_REFUSED = 'ECONNREFUSED'; + +class BullMQQueue extends BaseQueue { + static queueOptions = { + connection: redisConfig, + }; + + constructor(name) { + super(name); + + this.queue = new Queue(name, this.constructor.queueOptions); + this.workers = []; + + this.setupErrorHandlers(); + this.setupGracefulShutdown(); + } + + async add(jobName, data, options = {}) { + try { + const job = await this.queue.add(jobName, data, options); + + return job; + } catch (error) { + logger.error(`Failed to add job to queue "${this.name}":`, error); + + throw error; + } + } + + async remove(jobId) { + try { + const job = await this.getJob(jobId); + + if (job) { + await job.remove(); + return true; + } + + return false; + } catch (error) { + logger.error(`Failed to remove job from queue "${this.name}":`, error); + + throw error; + } + } + + async getJob(jobId) { + return await this.queue.getJob(jobId); + } + + async getRepeatableJobById(jobId) { + const repeatableJobs = await this.getRepeatableJobs(); + const job = repeatableJobs.find((job) => job.id === jobId); + + return job; + } + + async getRepeatableJobs() { + try { + return await this.queue.getRepeatableJobs(); + } catch (error) { + logger.error( + `Failed to get repeatable jobs from queue "${this.name}":`, + error + ); + + throw error; + } + } + + async removeRepeatableJobByKey(jobKey) { + try { + await this.queue.removeRepeatableByKey(jobKey); + + return true; + } catch (error) { + logger.error( + `Failed to remove repeatable job from queue "${this.name}":`, + error + ); + + throw error; + } + } + + async removeRepeatableJobById(jobId) { + const job = await this.getRepeatableJobById(jobId); + + return await this.removeRepeatableJobByKey(job.key); + } + + startWorker(processor, workerOptions = {}) { + const worker = new Worker(this.name, processor, { + ...this.queueOptions, + ...workerOptions, + }); + + worker.on('error', (error) => { + logger.error(`Worker error in queue "${this.name}":`, error); + }); + + this.workers.push(worker); + + return worker; + } + + setupErrorHandlers() { + this.queue.on('error', (error) => { + if (error.code === CONNECTION_REFUSED) { + logger.error( + 'Make sure you have installed Redis and it is running.', + error + ); + + process.exit(); + } + + logger.error(`Queue error in "${this.name}":`, error); + }); + } + + setupGracefulShutdown() { + const shutdown = async () => { + logger.log(`Shutting down queue "${this.name}"...`); + + try { + // Close all workers gracefully + for (const worker of this.workers) { + await worker.close(); + } + + await this.queue.close(); + + logger.log(`Queue "${this.name}" shut down successfully.`); + + process.exit(); + } catch (error) { + logger.error(`Error during shutdown of queue "${this.name}":`, error); + + process.exit(1); + } + }; + + process.on('SIGTERM', shutdown); + process.on('SIGINT', shutdown); + } +} + +export default BullMQQueue; diff --git a/packages/backend/src/queues/flow.js b/packages/backend/src/queues/flow.js index aa4ae713..91a369ea 100644 --- a/packages/backend/src/queues/flow.js +++ b/packages/backend/src/queues/flow.js @@ -1,31 +1,5 @@ -import process from 'process'; -import { Queue } from 'bullmq'; -import redisConfig from '../config/redis.js'; -import logger from '../helpers/logger.js'; +import BullMQQueue from './bullmq.js'; -const CONNECTION_REFUSED = 'ECONNREFUSED'; - -const redisConnection = { - connection: redisConfig, -}; - -const flowQueue = new Queue('flow', redisConnection); - -process.on('SIGTERM', async () => { - await flowQueue.close(); -}); - -flowQueue.on('error', (error) => { - if (error.code === CONNECTION_REFUSED) { - logger.error( - 'Make sure you have installed Redis and it is running.', - error - ); - - process.exit(); - } - - logger.error('Error happened in flow queue!', error); -}); +const flowQueue = new BullMQQueue('flow'); export default flowQueue; diff --git a/packages/backend/src/workers/flow.js b/packages/backend/src/workers/flow.js index b1bcce78..bddfdb8c 100644 --- a/packages/backend/src/workers/flow.js +++ b/packages/backend/src/workers/flow.js @@ -79,7 +79,7 @@ worker.on('failed', async (job, err) => { const flow = await Flow.query().findById(job.data.flowId); if (!flow) { - await flowQueue.removeRepeatableByKey(job.repeatJobKey); + await flowQueue.removeRepeatableJobByKey(job.repeatJobKey); const flowNotFoundErrorMessage = ` JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has been deleted from Redis because flow was not found!