refactor: Workers and queues and eliminate redundant process listeners

This commit is contained in:
Faruk AYDIN
2024-12-05 03:21:50 +03:00
parent 6d15167ad9
commit 770b07179f
15 changed files with 78 additions and 76 deletions

View File

@@ -11,10 +11,6 @@ const redisConnection = {
const actionQueue = new Queue('action', redisConnection); const actionQueue = new Queue('action', redisConnection);
process.on('SIGTERM', async () => {
await actionQueue.close();
});
actionQueue.on('error', (error) => { actionQueue.on('error', (error) => {
if (error.code === CONNECTION_REFUSED) { if (error.code === CONNECTION_REFUSED) {
logger.error( logger.error(

View File

@@ -11,10 +11,6 @@ const redisConnection = {
const deleteUserQueue = new Queue('delete-user', redisConnection); const deleteUserQueue = new Queue('delete-user', redisConnection);
process.on('SIGTERM', async () => {
await deleteUserQueue.close();
});
deleteUserQueue.on('error', (error) => { deleteUserQueue.on('error', (error) => {
if (error.code === CONNECTION_REFUSED) { if (error.code === CONNECTION_REFUSED) {
logger.error( logger.error(

View File

@@ -11,10 +11,6 @@ const redisConnection = {
const emailQueue = new Queue('email', redisConnection); const emailQueue = new Queue('email', redisConnection);
process.on('SIGTERM', async () => {
await emailQueue.close();
});
emailQueue.on('error', (error) => { emailQueue.on('error', (error) => {
if (error.code === CONNECTION_REFUSED) { if (error.code === CONNECTION_REFUSED) {
logger.error( logger.error(

View File

@@ -11,10 +11,6 @@ const redisConnection = {
const flowQueue = new Queue('flow', redisConnection); const flowQueue = new Queue('flow', redisConnection);
process.on('SIGTERM', async () => {
await flowQueue.close();
});
flowQueue.on('error', (error) => { flowQueue.on('error', (error) => {
if (error.code === CONNECTION_REFUSED) { if (error.code === CONNECTION_REFUSED) {
logger.error( logger.error(

View File

@@ -0,0 +1,21 @@
import appConfig from '../config/app.js';
import actionQueue from './action.js';
import emailQueue from './email.js';
import flowQueue from './flow.js';
import triggerQueue from './trigger.js';
import deleteUserQueue from './delete-user.ee.js';
import removeCancelledSubscriptionsQueue from './remove-cancelled-subscriptions.ee.js';
const queues = [
actionQueue,
emailQueue,
flowQueue,
triggerQueue,
deleteUserQueue,
];
if (appConfig.isCloud) {
queues.push(removeCancelledSubscriptionsQueue);
}
export default queues;

View File

@@ -14,10 +14,6 @@ const removeCancelledSubscriptionsQueue = new Queue(
redisConnection redisConnection
); );
process.on('SIGTERM', async () => {
await removeCancelledSubscriptionsQueue.close();
});
removeCancelledSubscriptionsQueue.on('error', (error) => { removeCancelledSubscriptionsQueue.on('error', (error) => {
if (error.code === CONNECTION_REFUSED) { if (error.code === CONNECTION_REFUSED) {
logger.error( logger.error(

View File

@@ -11,10 +11,6 @@ const redisConnection = {
const triggerQueue = new Queue('trigger', redisConnection); const triggerQueue = new Queue('trigger', redisConnection);
process.on('SIGTERM', async () => {
await triggerQueue.close();
});
triggerQueue.on('error', (error) => { triggerQueue.on('error', (error) => {
if (error.code === CONNECTION_REFUSED) { if (error.code === CONNECTION_REFUSED) {
logger.error( logger.error(

View File

@@ -1,20 +1,22 @@
import * as Sentry from './helpers/sentry.ee.js'; import * as Sentry from './helpers/sentry.ee.js';
import appConfig from './config/app.js'; import process from 'node:process';
Sentry.init(); Sentry.init();
import './config/orm.js'; import './config/orm.js';
import './helpers/check-worker-readiness.js'; import './helpers/check-worker-readiness.js';
import './workers/flow.js'; import queues from './queues/index.js';
import './workers/trigger.js'; import workers from './workers/index.js';
import './workers/action.js';
import './workers/email.js';
import './workers/delete-user.ee.js';
if (appConfig.isCloud) { process.on('SIGTERM', async () => {
import('./workers/remove-cancelled-subscriptions.ee.js'); for (const queue of queues) {
import('./queues/remove-cancelled-subscriptions.ee.js'); await queue.close();
} }
for (const worker of workers) {
await worker.close();
}
});
import telemetry from './helpers/telemetry/index.js'; import telemetry from './helpers/telemetry/index.js';

View File

@@ -1,5 +1,4 @@
import { Worker } from 'bullmq'; import { Worker } from 'bullmq';
import process from 'node:process';
import * as Sentry from '../helpers/sentry.ee.js'; import * as Sentry from '../helpers/sentry.ee.js';
import redisConfig from '../config/redis.js'; import redisConfig from '../config/redis.js';
@@ -15,7 +14,7 @@ import delayAsMilliseconds from '../helpers/delay-as-milliseconds.js';
const DEFAULT_DELAY_DURATION = 0; const DEFAULT_DELAY_DURATION = 0;
export const worker = new Worker( const actionWorker = new Worker(
'action', 'action',
async (job) => { async (job) => {
const { stepId, flowId, executionId, computedParameters, executionStep } = const { stepId, flowId, executionId, computedParameters, executionStep } =
@@ -55,11 +54,11 @@ export const worker = new Worker(
{ connection: redisConfig } { connection: redisConfig }
); );
worker.on('completed', (job) => { actionWorker.on('completed', (job) => {
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
}); });
worker.on('failed', (job, err) => { actionWorker.on('failed', (job, err) => {
const errorMessage = ` const errorMessage = `
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message} JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}
\n ${err.stack} \n ${err.stack}
@@ -74,6 +73,4 @@ worker.on('failed', (job, err) => {
}); });
}); });
process.on('SIGTERM', async () => { export default actionWorker;
await worker.close();
});

View File

@@ -1,5 +1,4 @@
import { Worker } from 'bullmq'; import { Worker } from 'bullmq';
import process from 'node:process';
import * as Sentry from '../helpers/sentry.ee.js'; import * as Sentry from '../helpers/sentry.ee.js';
import redisConfig from '../config/redis.js'; import redisConfig from '../config/redis.js';
@@ -8,7 +7,7 @@ import appConfig from '../config/app.js';
import User from '../models/user.js'; import User from '../models/user.js';
import ExecutionStep from '../models/execution-step.js'; import ExecutionStep from '../models/execution-step.js';
export const worker = new Worker( const deleteUserWorker = new Worker(
'delete-user', 'delete-user',
async (job) => { async (job) => {
const { id } = job.data; const { id } = job.data;
@@ -46,13 +45,13 @@ export const worker = new Worker(
{ connection: redisConfig } { connection: redisConfig }
); );
worker.on('completed', (job) => { deleteUserWorker.on('completed', (job) => {
logger.info( logger.info(
`JOB ID: ${job.id} - The user with the ID of '${job.data.id}' has been deleted!` `JOB ID: ${job.id} - The user with the ID of '${job.data.id}' has been deleted!`
); );
}); });
worker.on('failed', (job, err) => { deleteUserWorker.on('failed', (job, err) => {
const errorMessage = ` const errorMessage = `
JOB ID: ${job.id} - The user with the ID of '${job.data.id}' has failed to be deleted! ${err.message} JOB ID: ${job.id} - The user with the ID of '${job.data.id}' has failed to be deleted! ${err.message}
\n ${err.stack} \n ${err.stack}
@@ -67,6 +66,4 @@ worker.on('failed', (job, err) => {
}); });
}); });
process.on('SIGTERM', async () => { export default deleteUserWorker;
await worker.close();
});

View File

@@ -1,5 +1,4 @@
import { Worker } from 'bullmq'; import { Worker } from 'bullmq';
import process from 'node:process';
import * as Sentry from '../helpers/sentry.ee.js'; import * as Sentry from '../helpers/sentry.ee.js';
import redisConfig from '../config/redis.js'; import redisConfig from '../config/redis.js';
@@ -16,7 +15,7 @@ const isAutomatischEmail = (email) => {
return email.endsWith('@automatisch.io'); return email.endsWith('@automatisch.io');
}; };
export const worker = new Worker( const emailWorker = new Worker(
'email', 'email',
async (job) => { async (job) => {
const { email, subject, template, params } = job.data; const { email, subject, template, params } = job.data;
@@ -39,13 +38,13 @@ export const worker = new Worker(
{ connection: redisConfig } { connection: redisConfig }
); );
worker.on('completed', (job) => { emailWorker.on('completed', (job) => {
logger.info( logger.info(
`JOB ID: ${job.id} - ${job.data.subject} email sent to ${job.data.email}!` `JOB ID: ${job.id} - ${job.data.subject} email sent to ${job.data.email}!`
); );
}); });
worker.on('failed', (job, err) => { emailWorker.on('failed', (job, err) => {
const errorMessage = ` const errorMessage = `
JOB ID: ${job.id} - ${job.data.subject} email to ${job.data.email} has failed to send with ${err.message} JOB ID: ${job.id} - ${job.data.subject} email to ${job.data.email} has failed to send with ${err.message}
\n ${err.stack} \n ${err.stack}
@@ -60,6 +59,4 @@ worker.on('failed', (job, err) => {
}); });
}); });
process.on('SIGTERM', async () => { export default emailWorker;
await worker.close();
});

View File

@@ -1,5 +1,4 @@
import { Worker } from 'bullmq'; import { Worker } from 'bullmq';
import process from 'node:process';
import * as Sentry from '../helpers/sentry.ee.js'; import * as Sentry from '../helpers/sentry.ee.js';
import redisConfig from '../config/redis.js'; import redisConfig from '../config/redis.js';
@@ -13,7 +12,7 @@ import {
REMOVE_AFTER_7_DAYS_OR_50_JOBS, REMOVE_AFTER_7_DAYS_OR_50_JOBS,
} from '../helpers/remove-job-configuration.js'; } from '../helpers/remove-job-configuration.js';
export const worker = new Worker( const flowWorker = new Worker(
'flow', 'flow',
async (job) => { async (job) => {
const { flowId } = job.data; const { flowId } = job.data;
@@ -64,11 +63,11 @@ export const worker = new Worker(
{ connection: redisConfig } { connection: redisConfig }
); );
worker.on('completed', (job) => { flowWorker.on('completed', (job) => {
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
}); });
worker.on('failed', async (job, err) => { flowWorker.on('failed', async (job, err) => {
const errorMessage = ` const errorMessage = `
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message} JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}
\n ${err.stack} \n ${err.stack}
@@ -95,6 +94,4 @@ worker.on('failed', async (job, err) => {
}); });
}); });
process.on('SIGTERM', async () => { export default flowWorker;
await worker.close();
});

View File

@@ -0,0 +1,21 @@
import appConfig from '../config/app.js';
import actionWorker from './action.js';
import emailWorker from './email.js';
import flowWorker from './flow.js';
import triggerWorker from './trigger.js';
import deleteUserWorker from './delete-user.ee.js';
import removeCancelledSubscriptionsWorker from './remove-cancelled-subscriptions.ee.js';
const workers = [
actionWorker,
emailWorker,
flowWorker,
triggerWorker,
deleteUserWorker,
];
if (appConfig.isCloud) {
workers.push(removeCancelledSubscriptionsWorker);
}
export default workers;

View File

@@ -1,12 +1,11 @@
import { Worker } from 'bullmq'; import { Worker } from 'bullmq';
import process from 'node:process';
import { DateTime } from 'luxon'; import { DateTime } from 'luxon';
import * as Sentry from '../helpers/sentry.ee.js'; import * as Sentry from '../helpers/sentry.ee.js';
import redisConfig from '../config/redis.js'; import redisConfig from '../config/redis.js';
import logger from '../helpers/logger.js'; import logger from '../helpers/logger.js';
import Subscription from '../models/subscription.ee.js'; import Subscription from '../models/subscription.ee.js';
export const worker = new Worker( const removeCancelledSubscriptionsWorker = new Worker(
'remove-cancelled-subscriptions', 'remove-cancelled-subscriptions',
async () => { async () => {
await Subscription.query() await Subscription.query()
@@ -23,13 +22,13 @@ export const worker = new Worker(
{ connection: redisConfig } { connection: redisConfig }
); );
worker.on('completed', (job) => { removeCancelledSubscriptionsWorker.on('completed', (job) => {
logger.info( logger.info(
`JOB ID: ${job.id} - The cancelled subscriptions have been removed!` `JOB ID: ${job.id} - The cancelled subscriptions have been removed!`
); );
}); });
worker.on('failed', (job, err) => { removeCancelledSubscriptionsWorker.on('failed', (job, err) => {
const errorMessage = ` const errorMessage = `
JOB ID: ${job.id} - ERROR: The cancelled subscriptions can not be removed! ${err.message} JOB ID: ${job.id} - ERROR: The cancelled subscriptions can not be removed! ${err.message}
\n ${err.stack} \n ${err.stack}
@@ -42,6 +41,4 @@ worker.on('failed', (job, err) => {
}); });
}); });
process.on('SIGTERM', async () => { export default removeCancelledSubscriptionsWorker;
await worker.close();
});

View File

@@ -1,5 +1,4 @@
import { Worker } from 'bullmq'; import { Worker } from 'bullmq';
import process from 'node:process';
import * as Sentry from '../helpers/sentry.ee.js'; import * as Sentry from '../helpers/sentry.ee.js';
import redisConfig from '../config/redis.js'; import redisConfig from '../config/redis.js';
@@ -12,7 +11,7 @@ import {
REMOVE_AFTER_7_DAYS_OR_50_JOBS, REMOVE_AFTER_7_DAYS_OR_50_JOBS,
} from '../helpers/remove-job-configuration.js'; } from '../helpers/remove-job-configuration.js';
export const worker = new Worker( const triggerWorker = new Worker(
'trigger', 'trigger',
async (job) => { async (job) => {
const { flowId, executionId, stepId, executionStep } = await processTrigger( const { flowId, executionId, stepId, executionStep } = await processTrigger(
@@ -41,11 +40,11 @@ export const worker = new Worker(
{ connection: redisConfig } { connection: redisConfig }
); );
worker.on('completed', (job) => { triggerWorker.on('completed', (job) => {
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
}); });
worker.on('failed', (job, err) => { triggerWorker.on('failed', (job, err) => {
const errorMessage = ` const errorMessage = `
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message} JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}
\n ${err.stack} \n ${err.stack}
@@ -60,6 +59,4 @@ worker.on('failed', (job, err) => {
}); });
}); });
process.on('SIGTERM', async () => { export default triggerWorker;
await worker.close();
});