import { Worker } from 'bullmq'; import redisConfig from '../config/redis'; import logger from '../helpers/logger'; import { IJSONObject, ITriggerItem } from '@automatisch/types'; import actionQueue from '../queues/action'; import Step from '../models/step'; import { processTrigger } from '../services/trigger'; import { REMOVE_AFTER_30_DAYS_OR_150_JOBS, REMOVE_AFTER_7_DAYS_OR_50_JOBS } from '../helpers/remove-job-configuration'; type JobData = { flowId: string; stepId: string; triggerItem?: ITriggerItem; error?: IJSONObject; }; export const worker = new Worker( 'trigger', async (job) => { const { flowId, executionId, stepId, executionStep } = await processTrigger( job.data as JobData ); if (executionStep.isFailed) return; const step = await Step.query().findById(stepId).throwIfNotFound(); const nextStep = await step.getNextStep(); const jobName = `${executionId}-${nextStep.id}`; const jobPayload = { flowId, executionId, stepId: nextStep.id, }; const jobOptions = { removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS, } await actionQueue.add(jobName, jobPayload, jobOptions); }, { connection: redisConfig } ); worker.on('completed', (job) => { logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); }); worker.on('failed', (job, err) => { logger.info( `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}` ); }); process.on('SIGTERM', async () => { await worker.close(); });