refactor: Abstract queue generation and configuration
This commit is contained in:
@@ -1,27 +1,4 @@
|
|||||||
import process from 'process';
|
import { generateQueue } from './queue.js';
|
||||||
import { Queue } from 'bullmq';
|
|
||||||
import redisConfig from '../config/redis.js';
|
|
||||||
import logger from '../helpers/logger.js';
|
|
||||||
|
|
||||||
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
|
||||||
|
|
||||||
const redisConnection = {
|
|
||||||
connection: redisConfig,
|
|
||||||
};
|
|
||||||
|
|
||||||
const actionQueue = new Queue('action', redisConnection);
|
|
||||||
|
|
||||||
actionQueue.on('error', (error) => {
|
|
||||||
if (error.code === CONNECTION_REFUSED) {
|
|
||||||
logger.error(
|
|
||||||
'Make sure you have installed Redis and it is running.',
|
|
||||||
error
|
|
||||||
);
|
|
||||||
|
|
||||||
process.exit();
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.error('Error happened in action queue!', error);
|
|
||||||
});
|
|
||||||
|
|
||||||
|
const actionQueue = generateQueue('action');
|
||||||
export default actionQueue;
|
export default actionQueue;
|
||||||
|
@@ -1,27 +1,4 @@
|
|||||||
import process from 'process';
|
import { generateQueue } from './queue.js';
|
||||||
import { Queue } from 'bullmq';
|
|
||||||
import redisConfig from '../config/redis.js';
|
|
||||||
import logger from '../helpers/logger.js';
|
|
||||||
|
|
||||||
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
|
||||||
|
|
||||||
const redisConnection = {
|
|
||||||
connection: redisConfig,
|
|
||||||
};
|
|
||||||
|
|
||||||
const deleteUserQueue = new Queue('delete-user', redisConnection);
|
|
||||||
|
|
||||||
deleteUserQueue.on('error', (error) => {
|
|
||||||
if (error.code === CONNECTION_REFUSED) {
|
|
||||||
logger.error(
|
|
||||||
'Make sure you have installed Redis and it is running.',
|
|
||||||
error
|
|
||||||
);
|
|
||||||
|
|
||||||
process.exit();
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.error('Error happened in delete user queue!', error);
|
|
||||||
});
|
|
||||||
|
|
||||||
|
const deleteUserQueue = generateQueue('delete-user');
|
||||||
export default deleteUserQueue;
|
export default deleteUserQueue;
|
||||||
|
@@ -1,27 +1,4 @@
|
|||||||
import process from 'process';
|
import { generateQueue } from './queue.js';
|
||||||
import { Queue } from 'bullmq';
|
|
||||||
import redisConfig from '../config/redis.js';
|
|
||||||
import logger from '../helpers/logger.js';
|
|
||||||
|
|
||||||
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
|
||||||
|
|
||||||
const redisConnection = {
|
|
||||||
connection: redisConfig,
|
|
||||||
};
|
|
||||||
|
|
||||||
const emailQueue = new Queue('email', redisConnection);
|
|
||||||
|
|
||||||
emailQueue.on('error', (error) => {
|
|
||||||
if (error.code === CONNECTION_REFUSED) {
|
|
||||||
logger.error(
|
|
||||||
'Make sure you have installed Redis and it is running.',
|
|
||||||
error
|
|
||||||
);
|
|
||||||
|
|
||||||
process.exit();
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.error('Error happened in email queue!', error);
|
|
||||||
});
|
|
||||||
|
|
||||||
|
const emailQueue = generateQueue('email');
|
||||||
export default emailQueue;
|
export default emailQueue;
|
||||||
|
@@ -1,27 +1,4 @@
|
|||||||
import process from 'process';
|
import { generateQueue } from './queue.js';
|
||||||
import { Queue } from 'bullmq';
|
|
||||||
import redisConfig from '../config/redis.js';
|
|
||||||
import logger from '../helpers/logger.js';
|
|
||||||
|
|
||||||
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
|
||||||
|
|
||||||
const redisConnection = {
|
|
||||||
connection: redisConfig,
|
|
||||||
};
|
|
||||||
|
|
||||||
const flowQueue = new Queue('flow', redisConnection);
|
|
||||||
|
|
||||||
flowQueue.on('error', (error) => {
|
|
||||||
if (error.code === CONNECTION_REFUSED) {
|
|
||||||
logger.error(
|
|
||||||
'Make sure you have installed Redis and it is running.',
|
|
||||||
error
|
|
||||||
);
|
|
||||||
|
|
||||||
process.exit();
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.error('Error happened in flow queue!', error);
|
|
||||||
});
|
|
||||||
|
|
||||||
|
const flowQueue = generateQueue('flow');
|
||||||
export default flowQueue;
|
export default flowQueue;
|
||||||
|
44
packages/backend/src/queues/queue.js
Normal file
44
packages/backend/src/queues/queue.js
Normal file
@@ -0,0 +1,44 @@
|
|||||||
|
import process from 'process';
|
||||||
|
import { Queue } from 'bullmq';
|
||||||
|
import redisConfig from '../config/redis.js';
|
||||||
|
import logger from '../helpers/logger.js';
|
||||||
|
|
||||||
|
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
||||||
|
|
||||||
|
const redisConnection = {
|
||||||
|
connection: redisConfig,
|
||||||
|
};
|
||||||
|
|
||||||
|
export const generateQueue = (queueName, options) => {
|
||||||
|
const queue = new Queue(queueName, redisConnection);
|
||||||
|
|
||||||
|
queue.on('error', (error) => queueOnError(error, queueName));
|
||||||
|
|
||||||
|
if (options?.runDaily) addScheduler(queueName, queue);
|
||||||
|
|
||||||
|
return queue;
|
||||||
|
};
|
||||||
|
|
||||||
|
const queueOnError = (error, queueName) => {
|
||||||
|
if (error.code === CONNECTION_REFUSED) {
|
||||||
|
const errorMessage =
|
||||||
|
'Make sure you have installed Redis and it is running.';
|
||||||
|
|
||||||
|
logger.error(errorMessage, error);
|
||||||
|
|
||||||
|
process.exit();
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.error(`Error happened in ${queueName} queue!`, error);
|
||||||
|
};
|
||||||
|
|
||||||
|
const addScheduler = (queueName, queue) => {
|
||||||
|
const everydayAtOneOclock = '0 1 * * *';
|
||||||
|
|
||||||
|
queue.add(queueName, null, {
|
||||||
|
jobId: queueName,
|
||||||
|
repeat: {
|
||||||
|
pattern: everydayAtOneOclock,
|
||||||
|
},
|
||||||
|
});
|
||||||
|
};
|
@@ -1,40 +1,8 @@
|
|||||||
import process from 'process';
|
import { generateQueue } from './queue.js';
|
||||||
import { Queue } from 'bullmq';
|
|
||||||
import redisConfig from '../config/redis.js';
|
|
||||||
import logger from '../helpers/logger.js';
|
|
||||||
|
|
||||||
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
const removeCancelledSubscriptionsQueue = generateQueue(
|
||||||
|
|
||||||
const redisConnection = {
|
|
||||||
connection: redisConfig,
|
|
||||||
};
|
|
||||||
|
|
||||||
const removeCancelledSubscriptionsQueue = new Queue(
|
|
||||||
'remove-cancelled-subscriptions',
|
'remove-cancelled-subscriptions',
|
||||||
redisConnection
|
{ runDaily: true }
|
||||||
);
|
);
|
||||||
|
|
||||||
removeCancelledSubscriptionsQueue.on('error', (error) => {
|
|
||||||
if (error.code === CONNECTION_REFUSED) {
|
|
||||||
logger.error(
|
|
||||||
'Make sure you have installed Redis and it is running.',
|
|
||||||
error
|
|
||||||
);
|
|
||||||
|
|
||||||
process.exit();
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.error(
|
|
||||||
'Error happened in remove cancelled subscriptions queue!',
|
|
||||||
error
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
removeCancelledSubscriptionsQueue.add('remove-cancelled-subscriptions', null, {
|
|
||||||
jobId: 'remove-cancelled-subscriptions',
|
|
||||||
repeat: {
|
|
||||||
pattern: '0 1 * * *',
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
export default removeCancelledSubscriptionsQueue;
|
export default removeCancelledSubscriptionsQueue;
|
||||||
|
@@ -1,27 +1,4 @@
|
|||||||
import process from 'process';
|
import { generateQueue } from './queue.js';
|
||||||
import { Queue } from 'bullmq';
|
|
||||||
import redisConfig from '../config/redis.js';
|
|
||||||
import logger from '../helpers/logger.js';
|
|
||||||
|
|
||||||
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
|
||||||
|
|
||||||
const redisConnection = {
|
|
||||||
connection: redisConfig,
|
|
||||||
};
|
|
||||||
|
|
||||||
const triggerQueue = new Queue('trigger', redisConnection);
|
|
||||||
|
|
||||||
triggerQueue.on('error', (error) => {
|
|
||||||
if (error.code === CONNECTION_REFUSED) {
|
|
||||||
logger.error(
|
|
||||||
'Make sure you have installed Redis and it is running.',
|
|
||||||
error
|
|
||||||
);
|
|
||||||
|
|
||||||
process.exit();
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.error('Error happened in trigger queue!', error);
|
|
||||||
});
|
|
||||||
|
|
||||||
|
const triggerQueue = generateQueue('trigger');
|
||||||
export default triggerQueue;
|
export default triggerQueue;
|
||||||
|
Reference in New Issue
Block a user