Compare commits
1 Commits
AUT-1376-n
...
aut-1374
Author | SHA1 | Date | |
---|---|---|---|
![]() |
e7df19ae17 |
@@ -13,7 +13,7 @@ if (appConfig.redisSentinelHost) {
|
||||
{
|
||||
host: appConfig.redisSentinelHost,
|
||||
port: appConfig.redisSentinelPort,
|
||||
}
|
||||
},
|
||||
];
|
||||
|
||||
redisConfig.sentinelUsername = appConfig.redisSentinelUsername;
|
||||
|
@@ -12,7 +12,7 @@ import appConfig from '../config/app.js';
|
||||
const serverAdapter = new ExpressAdapter();
|
||||
|
||||
const queues = [
|
||||
new BullMQAdapter(flowQueue),
|
||||
new BullMQAdapter(flowQueue.queue),
|
||||
new BullMQAdapter(triggerQueue),
|
||||
new BullMQAdapter(actionQueue),
|
||||
new BullMQAdapter(emailQueue),
|
||||
|
@@ -386,10 +386,7 @@ class Flow extends Base {
|
||||
}
|
||||
);
|
||||
} else {
|
||||
const repeatableJobs = await flowQueue.getRepeatableJobs();
|
||||
const job = repeatableJobs.find((job) => job.id === this.id);
|
||||
|
||||
await flowQueue.removeRepeatableByKey(job.key);
|
||||
await flowQueue.removeRepeatableJobById(this.id);
|
||||
}
|
||||
}
|
||||
|
||||
|
@@ -305,14 +305,8 @@ class User extends Base {
|
||||
active: true,
|
||||
});
|
||||
|
||||
const repeatableJobs = await flowQueue.getRepeatableJobs();
|
||||
|
||||
for (const flow of flows) {
|
||||
const job = repeatableJobs.find((job) => job.id === flow.id);
|
||||
|
||||
if (job) {
|
||||
await flowQueue.removeRepeatableByKey(job.key);
|
||||
}
|
||||
await flowQueue.removeRepeatableJobById(flow.id);
|
||||
}
|
||||
|
||||
const executionIds = (
|
||||
|
30
packages/backend/src/queues/base.js
Normal file
30
packages/backend/src/queues/base.js
Normal file
@@ -0,0 +1,30 @@
|
||||
/* eslint-disable no-unused-vars */
|
||||
|
||||
class BaseQueue {
|
||||
constructor(name) {
|
||||
if (new.target === BaseQueue) {
|
||||
throw new Error('Cannot instantiate abstract class BaseQueue directly.');
|
||||
}
|
||||
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
// Abstract methods to be implemented by subclasses
|
||||
async add(jobName, data, options) {
|
||||
throw new Error('Method "add" must be implemented.');
|
||||
}
|
||||
|
||||
async remove(jobId) {
|
||||
throw new Error('Method "remove" must be implemented.');
|
||||
}
|
||||
|
||||
async getRepeatableJobs() {
|
||||
throw new Error('Method "getRepeatableJobs" must be implemented.');
|
||||
}
|
||||
|
||||
async removeRepeatableJobByKey(jobKey) {
|
||||
throw new Error('Method "removeRepeatableJobByKey" must be implemented.');
|
||||
}
|
||||
}
|
||||
|
||||
export default BaseQueue;
|
154
packages/backend/src/queues/bullmq.js
Normal file
154
packages/backend/src/queues/bullmq.js
Normal file
@@ -0,0 +1,154 @@
|
||||
import { Queue, Worker } from 'bullmq';
|
||||
import redisConfig from '../config/redis.js';
|
||||
import logger from '../helpers/logger.js';
|
||||
import BaseQueue from './base.js';
|
||||
|
||||
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
||||
|
||||
class BullMQQueue extends BaseQueue {
|
||||
static queueOptions = {
|
||||
connection: redisConfig,
|
||||
};
|
||||
|
||||
constructor(name) {
|
||||
super(name);
|
||||
|
||||
this.queue = new Queue(name, this.constructor.queueOptions);
|
||||
this.workers = [];
|
||||
|
||||
this.setupErrorHandlers();
|
||||
this.setupGracefulShutdown();
|
||||
}
|
||||
|
||||
async add(jobName, data, options = {}) {
|
||||
try {
|
||||
const job = await this.queue.add(jobName, data, options);
|
||||
|
||||
return job;
|
||||
} catch (error) {
|
||||
logger.error(`Failed to add job to queue "${this.name}":`, error);
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async remove(jobId) {
|
||||
try {
|
||||
const job = await this.getJob(jobId);
|
||||
|
||||
if (job) {
|
||||
await job.remove();
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
} catch (error) {
|
||||
logger.error(`Failed to remove job from queue "${this.name}":`, error);
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async getJob(jobId) {
|
||||
return await this.queue.getJob(jobId);
|
||||
}
|
||||
|
||||
async getRepeatableJobById(jobId) {
|
||||
const repeatableJobs = await this.getRepeatableJobs();
|
||||
const job = repeatableJobs.find((job) => job.id === jobId);
|
||||
|
||||
return job;
|
||||
}
|
||||
|
||||
async getRepeatableJobs() {
|
||||
try {
|
||||
return await this.queue.getRepeatableJobs();
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to get repeatable jobs from queue "${this.name}":`,
|
||||
error
|
||||
);
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async removeRepeatableJobByKey(jobKey) {
|
||||
try {
|
||||
await this.queue.removeRepeatableByKey(jobKey);
|
||||
|
||||
return true;
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to remove repeatable job from queue "${this.name}":`,
|
||||
error
|
||||
);
|
||||
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
async removeRepeatableJobById(jobId) {
|
||||
const job = await this.getRepeatableJobById(jobId);
|
||||
|
||||
return await this.removeRepeatableJobByKey(job.key);
|
||||
}
|
||||
|
||||
startWorker(processor, workerOptions = {}) {
|
||||
const worker = new Worker(this.name, processor, {
|
||||
...this.queueOptions,
|
||||
...workerOptions,
|
||||
});
|
||||
|
||||
worker.on('error', (error) => {
|
||||
logger.error(`Worker error in queue "${this.name}":`, error);
|
||||
});
|
||||
|
||||
this.workers.push(worker);
|
||||
|
||||
return worker;
|
||||
}
|
||||
|
||||
setupErrorHandlers() {
|
||||
this.queue.on('error', (error) => {
|
||||
if (error.code === CONNECTION_REFUSED) {
|
||||
logger.error(
|
||||
'Make sure you have installed Redis and it is running.',
|
||||
error
|
||||
);
|
||||
|
||||
process.exit();
|
||||
}
|
||||
|
||||
logger.error(`Queue error in "${this.name}":`, error);
|
||||
});
|
||||
}
|
||||
|
||||
setupGracefulShutdown() {
|
||||
const shutdown = async () => {
|
||||
logger.log(`Shutting down queue "${this.name}"...`);
|
||||
|
||||
try {
|
||||
// Close all workers gracefully
|
||||
for (const worker of this.workers) {
|
||||
await worker.close();
|
||||
}
|
||||
|
||||
await this.queue.close();
|
||||
|
||||
logger.log(`Queue "${this.name}" shut down successfully.`);
|
||||
|
||||
process.exit();
|
||||
} catch (error) {
|
||||
logger.error(`Error during shutdown of queue "${this.name}":`, error);
|
||||
|
||||
process.exit(1);
|
||||
}
|
||||
};
|
||||
|
||||
process.on('SIGTERM', shutdown);
|
||||
process.on('SIGINT', shutdown);
|
||||
}
|
||||
}
|
||||
|
||||
export default BullMQQueue;
|
@@ -1,31 +1,5 @@
|
||||
import process from 'process';
|
||||
import { Queue } from 'bullmq';
|
||||
import redisConfig from '../config/redis.js';
|
||||
import logger from '../helpers/logger.js';
|
||||
import BullMQQueue from './bullmq.js';
|
||||
|
||||
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
||||
|
||||
const redisConnection = {
|
||||
connection: redisConfig,
|
||||
};
|
||||
|
||||
const flowQueue = new Queue('flow', redisConnection);
|
||||
|
||||
process.on('SIGTERM', async () => {
|
||||
await flowQueue.close();
|
||||
});
|
||||
|
||||
flowQueue.on('error', (error) => {
|
||||
if (error.code === CONNECTION_REFUSED) {
|
||||
logger.error(
|
||||
'Make sure you have installed Redis and it is running.',
|
||||
error
|
||||
);
|
||||
|
||||
process.exit();
|
||||
}
|
||||
|
||||
logger.error('Error happened in flow queue!', error);
|
||||
});
|
||||
const flowQueue = new BullMQQueue('flow');
|
||||
|
||||
export default flowQueue;
|
||||
|
@@ -79,7 +79,7 @@ worker.on('failed', async (job, err) => {
|
||||
const flow = await Flow.query().findById(job.data.flowId);
|
||||
|
||||
if (!flow) {
|
||||
await flowQueue.removeRepeatableByKey(job.repeatJobKey);
|
||||
await flowQueue.removeRepeatableJobByKey(job.repeatJobKey);
|
||||
|
||||
const flowNotFoundErrorMessage = `
|
||||
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has been deleted from Redis because flow was not found!
|
||||
|
Reference in New Issue
Block a user