Compare commits

...

1 Commits

Author SHA1 Message Date
Ali BARIN
e7df19ae17 refactor(queues): abstract queue management 2024-12-03 11:22:03 +00:00
8 changed files with 191 additions and 42 deletions

View File

@@ -13,7 +13,7 @@ if (appConfig.redisSentinelHost) {
{
host: appConfig.redisSentinelHost,
port: appConfig.redisSentinelPort,
}
},
];
redisConfig.sentinelUsername = appConfig.redisSentinelUsername;

View File

@@ -12,7 +12,7 @@ import appConfig from '../config/app.js';
const serverAdapter = new ExpressAdapter();
const queues = [
new BullMQAdapter(flowQueue),
new BullMQAdapter(flowQueue.queue),
new BullMQAdapter(triggerQueue),
new BullMQAdapter(actionQueue),
new BullMQAdapter(emailQueue),

View File

@@ -386,10 +386,7 @@ class Flow extends Base {
}
);
} else {
const repeatableJobs = await flowQueue.getRepeatableJobs();
const job = repeatableJobs.find((job) => job.id === this.id);
await flowQueue.removeRepeatableByKey(job.key);
await flowQueue.removeRepeatableJobById(this.id);
}
}

View File

@@ -305,14 +305,8 @@ class User extends Base {
active: true,
});
const repeatableJobs = await flowQueue.getRepeatableJobs();
for (const flow of flows) {
const job = repeatableJobs.find((job) => job.id === flow.id);
if (job) {
await flowQueue.removeRepeatableByKey(job.key);
}
await flowQueue.removeRepeatableJobById(flow.id);
}
const executionIds = (

View File

@@ -0,0 +1,30 @@
/* eslint-disable no-unused-vars */
class BaseQueue {
constructor(name) {
if (new.target === BaseQueue) {
throw new Error('Cannot instantiate abstract class BaseQueue directly.');
}
this.name = name;
}
// Abstract methods to be implemented by subclasses
async add(jobName, data, options) {
throw new Error('Method "add" must be implemented.');
}
async remove(jobId) {
throw new Error('Method "remove" must be implemented.');
}
async getRepeatableJobs() {
throw new Error('Method "getRepeatableJobs" must be implemented.');
}
async removeRepeatableJobByKey(jobKey) {
throw new Error('Method "removeRepeatableJobByKey" must be implemented.');
}
}
export default BaseQueue;

View File

@@ -0,0 +1,154 @@
import { Queue, Worker } from 'bullmq';
import redisConfig from '../config/redis.js';
import logger from '../helpers/logger.js';
import BaseQueue from './base.js';
const CONNECTION_REFUSED = 'ECONNREFUSED';
class BullMQQueue extends BaseQueue {
static queueOptions = {
connection: redisConfig,
};
constructor(name) {
super(name);
this.queue = new Queue(name, this.constructor.queueOptions);
this.workers = [];
this.setupErrorHandlers();
this.setupGracefulShutdown();
}
async add(jobName, data, options = {}) {
try {
const job = await this.queue.add(jobName, data, options);
return job;
} catch (error) {
logger.error(`Failed to add job to queue "${this.name}":`, error);
throw error;
}
}
async remove(jobId) {
try {
const job = await this.getJob(jobId);
if (job) {
await job.remove();
return true;
}
return false;
} catch (error) {
logger.error(`Failed to remove job from queue "${this.name}":`, error);
throw error;
}
}
async getJob(jobId) {
return await this.queue.getJob(jobId);
}
async getRepeatableJobById(jobId) {
const repeatableJobs = await this.getRepeatableJobs();
const job = repeatableJobs.find((job) => job.id === jobId);
return job;
}
async getRepeatableJobs() {
try {
return await this.queue.getRepeatableJobs();
} catch (error) {
logger.error(
`Failed to get repeatable jobs from queue "${this.name}":`,
error
);
throw error;
}
}
async removeRepeatableJobByKey(jobKey) {
try {
await this.queue.removeRepeatableByKey(jobKey);
return true;
} catch (error) {
logger.error(
`Failed to remove repeatable job from queue "${this.name}":`,
error
);
throw error;
}
}
async removeRepeatableJobById(jobId) {
const job = await this.getRepeatableJobById(jobId);
return await this.removeRepeatableJobByKey(job.key);
}
startWorker(processor, workerOptions = {}) {
const worker = new Worker(this.name, processor, {
...this.queueOptions,
...workerOptions,
});
worker.on('error', (error) => {
logger.error(`Worker error in queue "${this.name}":`, error);
});
this.workers.push(worker);
return worker;
}
setupErrorHandlers() {
this.queue.on('error', (error) => {
if (error.code === CONNECTION_REFUSED) {
logger.error(
'Make sure you have installed Redis and it is running.',
error
);
process.exit();
}
logger.error(`Queue error in "${this.name}":`, error);
});
}
setupGracefulShutdown() {
const shutdown = async () => {
logger.log(`Shutting down queue "${this.name}"...`);
try {
// Close all workers gracefully
for (const worker of this.workers) {
await worker.close();
}
await this.queue.close();
logger.log(`Queue "${this.name}" shut down successfully.`);
process.exit();
} catch (error) {
logger.error(`Error during shutdown of queue "${this.name}":`, error);
process.exit(1);
}
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
}
}
export default BullMQQueue;

View File

@@ -1,31 +1,5 @@
import process from 'process';
import { Queue } from 'bullmq';
import redisConfig from '../config/redis.js';
import logger from '../helpers/logger.js';
import BullMQQueue from './bullmq.js';
const CONNECTION_REFUSED = 'ECONNREFUSED';
const redisConnection = {
connection: redisConfig,
};
const flowQueue = new Queue('flow', redisConnection);
process.on('SIGTERM', async () => {
await flowQueue.close();
});
flowQueue.on('error', (error) => {
if (error.code === CONNECTION_REFUSED) {
logger.error(
'Make sure you have installed Redis and it is running.',
error
);
process.exit();
}
logger.error('Error happened in flow queue!', error);
});
const flowQueue = new BullMQQueue('flow');
export default flowQueue;

View File

@@ -79,7 +79,7 @@ worker.on('failed', async (job, err) => {
const flow = await Flow.query().findById(job.data.flowId);
if (!flow) {
await flowQueue.removeRepeatableByKey(job.repeatJobKey);
await flowQueue.removeRepeatableJobByKey(job.repeatJobKey);
const flowNotFoundErrorMessage = `
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has been deleted from Redis because flow was not found!