refactor: Restructure workers to work with services

This commit is contained in:
Faruk AYDIN
2022-10-14 22:33:26 +02:00
parent 628f872180
commit 237ab48d33
10 changed files with 81 additions and 114 deletions

View File

@@ -1,25 +1,43 @@
import { Worker } from 'bullmq';
import redisConfig from '../config/redis';
import Flow from '../models/flow';
import logger from '../helpers/logger';
import globalVariable from '../helpers/global-variable';
import triggerQueue from '../queues/trigger';
import { processFlow } from '../services/flow';
import Flow from '../models/flow';
export const worker = new Worker(
'flow',
async (job) => {
const flow = await Flow.query().findById(job.data.flowId).throwIfNotFound();
const { flowId } = job.data;
const flow = await Flow.query().findById(flowId).throwIfNotFound();
const triggerStep = await flow.getTriggerStep();
const triggerCommand = await triggerStep.getTriggerCommand();
const $ = await globalVariable({
flow,
connection: await triggerStep.$relatedQuery('connection'),
app: await triggerStep.getApp(),
step: triggerStep,
});
const { data, error } = await processFlow({ flowId });
await triggerCommand.run($);
for (const triggerDataItem of data) {
const jobName = `${triggerStep.id}-${triggerDataItem.meta.internalId}`;
const jobPayload = {
flowId,
stepId: triggerStep.id,
triggerDataItem,
};
await triggerQueue.add(jobName, jobPayload);
}
if (error) {
const jobName = `${triggerStep.id}-error`;
const jobPayload = {
flowId,
stepId: triggerStep.id,
error,
};
await triggerQueue.add(jobName, jobPayload);
}
},
{ connection: redisConfig }
);