refactor(queues): abstract queue management
This commit is contained in:
@@ -13,7 +13,7 @@ if (appConfig.redisSentinelHost) {
|
|||||||
{
|
{
|
||||||
host: appConfig.redisSentinelHost,
|
host: appConfig.redisSentinelHost,
|
||||||
port: appConfig.redisSentinelPort,
|
port: appConfig.redisSentinelPort,
|
||||||
}
|
},
|
||||||
];
|
];
|
||||||
|
|
||||||
redisConfig.sentinelUsername = appConfig.redisSentinelUsername;
|
redisConfig.sentinelUsername = appConfig.redisSentinelUsername;
|
||||||
|
@@ -12,7 +12,7 @@ import appConfig from '../config/app.js';
|
|||||||
const serverAdapter = new ExpressAdapter();
|
const serverAdapter = new ExpressAdapter();
|
||||||
|
|
||||||
const queues = [
|
const queues = [
|
||||||
new BullMQAdapter(flowQueue),
|
new BullMQAdapter(flowQueue.queue),
|
||||||
new BullMQAdapter(triggerQueue),
|
new BullMQAdapter(triggerQueue),
|
||||||
new BullMQAdapter(actionQueue),
|
new BullMQAdapter(actionQueue),
|
||||||
new BullMQAdapter(emailQueue),
|
new BullMQAdapter(emailQueue),
|
||||||
|
@@ -386,10 +386,7 @@ class Flow extends Base {
|
|||||||
}
|
}
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
const repeatableJobs = await flowQueue.getRepeatableJobs();
|
await flowQueue.removeRepeatableJobById(this.id);
|
||||||
const job = repeatableJobs.find((job) => job.id === this.id);
|
|
||||||
|
|
||||||
await flowQueue.removeRepeatableByKey(job.key);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -305,14 +305,8 @@ class User extends Base {
|
|||||||
active: true,
|
active: true,
|
||||||
});
|
});
|
||||||
|
|
||||||
const repeatableJobs = await flowQueue.getRepeatableJobs();
|
|
||||||
|
|
||||||
for (const flow of flows) {
|
for (const flow of flows) {
|
||||||
const job = repeatableJobs.find((job) => job.id === flow.id);
|
await flowQueue.removeRepeatableJobById(flow.id);
|
||||||
|
|
||||||
if (job) {
|
|
||||||
await flowQueue.removeRepeatableByKey(job.key);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const executionIds = (
|
const executionIds = (
|
||||||
|
30
packages/backend/src/queues/base.js
Normal file
30
packages/backend/src/queues/base.js
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
/* eslint-disable no-unused-vars */
|
||||||
|
|
||||||
|
class BaseQueue {
|
||||||
|
constructor(name) {
|
||||||
|
if (new.target === BaseQueue) {
|
||||||
|
throw new Error('Cannot instantiate abstract class BaseQueue directly.');
|
||||||
|
}
|
||||||
|
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Abstract methods to be implemented by subclasses
|
||||||
|
async add(jobName, data, options) {
|
||||||
|
throw new Error('Method "add" must be implemented.');
|
||||||
|
}
|
||||||
|
|
||||||
|
async remove(jobId) {
|
||||||
|
throw new Error('Method "remove" must be implemented.');
|
||||||
|
}
|
||||||
|
|
||||||
|
async getRepeatableJobs() {
|
||||||
|
throw new Error('Method "getRepeatableJobs" must be implemented.');
|
||||||
|
}
|
||||||
|
|
||||||
|
async removeRepeatableJobByKey(jobKey) {
|
||||||
|
throw new Error('Method "removeRepeatableJobByKey" must be implemented.');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export default BaseQueue;
|
154
packages/backend/src/queues/bullmq.js
Normal file
154
packages/backend/src/queues/bullmq.js
Normal file
@@ -0,0 +1,154 @@
|
|||||||
|
import { Queue, Worker } from 'bullmq';
|
||||||
|
import redisConfig from '../config/redis.js';
|
||||||
|
import logger from '../helpers/logger.js';
|
||||||
|
import BaseQueue from './base.js';
|
||||||
|
|
||||||
|
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
||||||
|
|
||||||
|
class BullMQQueue extends BaseQueue {
|
||||||
|
static queueOptions = {
|
||||||
|
connection: redisConfig,
|
||||||
|
};
|
||||||
|
|
||||||
|
constructor(name) {
|
||||||
|
super(name);
|
||||||
|
|
||||||
|
this.queue = new Queue(name, this.constructor.queueOptions);
|
||||||
|
this.workers = [];
|
||||||
|
|
||||||
|
this.setupErrorHandlers();
|
||||||
|
this.setupGracefulShutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
async add(jobName, data, options = {}) {
|
||||||
|
try {
|
||||||
|
const job = await this.queue.add(jobName, data, options);
|
||||||
|
|
||||||
|
return job;
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`Failed to add job to queue "${this.name}":`, error);
|
||||||
|
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async remove(jobId) {
|
||||||
|
try {
|
||||||
|
const job = await this.getJob(jobId);
|
||||||
|
|
||||||
|
if (job) {
|
||||||
|
await job.remove();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`Failed to remove job from queue "${this.name}":`, error);
|
||||||
|
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async getJob(jobId) {
|
||||||
|
return await this.queue.getJob(jobId);
|
||||||
|
}
|
||||||
|
|
||||||
|
async getRepeatableJobById(jobId) {
|
||||||
|
const repeatableJobs = await this.getRepeatableJobs();
|
||||||
|
const job = repeatableJobs.find((job) => job.id === jobId);
|
||||||
|
|
||||||
|
return job;
|
||||||
|
}
|
||||||
|
|
||||||
|
async getRepeatableJobs() {
|
||||||
|
try {
|
||||||
|
return await this.queue.getRepeatableJobs();
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
`Failed to get repeatable jobs from queue "${this.name}":`,
|
||||||
|
error
|
||||||
|
);
|
||||||
|
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async removeRepeatableJobByKey(jobKey) {
|
||||||
|
try {
|
||||||
|
await this.queue.removeRepeatableByKey(jobKey);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
`Failed to remove repeatable job from queue "${this.name}":`,
|
||||||
|
error
|
||||||
|
);
|
||||||
|
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async removeRepeatableJobById(jobId) {
|
||||||
|
const job = await this.getRepeatableJobById(jobId);
|
||||||
|
|
||||||
|
return await this.removeRepeatableJobByKey(job.key);
|
||||||
|
}
|
||||||
|
|
||||||
|
startWorker(processor, workerOptions = {}) {
|
||||||
|
const worker = new Worker(this.name, processor, {
|
||||||
|
...this.queueOptions,
|
||||||
|
...workerOptions,
|
||||||
|
});
|
||||||
|
|
||||||
|
worker.on('error', (error) => {
|
||||||
|
logger.error(`Worker error in queue "${this.name}":`, error);
|
||||||
|
});
|
||||||
|
|
||||||
|
this.workers.push(worker);
|
||||||
|
|
||||||
|
return worker;
|
||||||
|
}
|
||||||
|
|
||||||
|
setupErrorHandlers() {
|
||||||
|
this.queue.on('error', (error) => {
|
||||||
|
if (error.code === CONNECTION_REFUSED) {
|
||||||
|
logger.error(
|
||||||
|
'Make sure you have installed Redis and it is running.',
|
||||||
|
error
|
||||||
|
);
|
||||||
|
|
||||||
|
process.exit();
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.error(`Queue error in "${this.name}":`, error);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
setupGracefulShutdown() {
|
||||||
|
const shutdown = async () => {
|
||||||
|
logger.log(`Shutting down queue "${this.name}"...`);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Close all workers gracefully
|
||||||
|
for (const worker of this.workers) {
|
||||||
|
await worker.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.queue.close();
|
||||||
|
|
||||||
|
logger.log(`Queue "${this.name}" shut down successfully.`);
|
||||||
|
|
||||||
|
process.exit();
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`Error during shutdown of queue "${this.name}":`, error);
|
||||||
|
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
process.on('SIGTERM', shutdown);
|
||||||
|
process.on('SIGINT', shutdown);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export default BullMQQueue;
|
@@ -1,31 +1,5 @@
|
|||||||
import process from 'process';
|
import BullMQQueue from './bullmq.js';
|
||||||
import { Queue } from 'bullmq';
|
|
||||||
import redisConfig from '../config/redis.js';
|
|
||||||
import logger from '../helpers/logger.js';
|
|
||||||
|
|
||||||
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
const flowQueue = new BullMQQueue('flow');
|
||||||
|
|
||||||
const redisConnection = {
|
|
||||||
connection: redisConfig,
|
|
||||||
};
|
|
||||||
|
|
||||||
const flowQueue = new Queue('flow', redisConnection);
|
|
||||||
|
|
||||||
process.on('SIGTERM', async () => {
|
|
||||||
await flowQueue.close();
|
|
||||||
});
|
|
||||||
|
|
||||||
flowQueue.on('error', (error) => {
|
|
||||||
if (error.code === CONNECTION_REFUSED) {
|
|
||||||
logger.error(
|
|
||||||
'Make sure you have installed Redis and it is running.',
|
|
||||||
error
|
|
||||||
);
|
|
||||||
|
|
||||||
process.exit();
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.error('Error happened in flow queue!', error);
|
|
||||||
});
|
|
||||||
|
|
||||||
export default flowQueue;
|
export default flowQueue;
|
||||||
|
@@ -79,7 +79,7 @@ worker.on('failed', async (job, err) => {
|
|||||||
const flow = await Flow.query().findById(job.data.flowId);
|
const flow = await Flow.query().findById(job.data.flowId);
|
||||||
|
|
||||||
if (!flow) {
|
if (!flow) {
|
||||||
await flowQueue.removeRepeatableByKey(job.repeatJobKey);
|
await flowQueue.removeRepeatableJobByKey(job.repeatJobKey);
|
||||||
|
|
||||||
const flowNotFoundErrorMessage = `
|
const flowNotFoundErrorMessage = `
|
||||||
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has been deleted from Redis because flow was not found!
|
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has been deleted from Redis because flow was not found!
|
||||||
|
Reference in New Issue
Block a user