refactor: Extract processor job into separate background jobs

This commit is contained in:
Faruk AYDIN
2022-10-13 18:45:01 +02:00
parent 3c3bb82e97
commit 56a9aeece7
17 changed files with 374 additions and 213 deletions

View File

@@ -33,7 +33,6 @@ export default {
async run($: IGlobalVariable) {
return await searchTweets($, {
searchTerm: $.step.parameters.searchTerm as string,
lastInternalId: $.flow.lastInternalId,
});
},

View File

@@ -68,7 +68,9 @@ const searchTweets = async (
return (tweet.raw.id as number) - (nextTweet.raw.id as number);
});
return tweets;
for (const tweet of tweets.data) {
await $.process(tweet);
}
};
export default searchTweets;

View File

@@ -1,7 +1,6 @@
import Context from '../../types/express/context';
import Processor from '../../services/processor';
// eslint-disable-next-line @typescript-eslint/no-unused-vars
import processorQueue from '../../queues/processor';
import flowQueue from '../../queues/flow';
type Params = {
input: {
@@ -14,30 +13,25 @@ const executeFlow = async (
params: Params,
context: Context
) => {
const untilStep = await context.currentUser
.$relatedQuery('steps')
.withGraphFetched('connection')
.findOne({
'steps.id': params.input.stepId,
})
.throwIfNotFound();
const flow = await untilStep.$relatedQuery('flow');
const executionStep = await new Processor(flow, {
untilStep,
testRun: true,
}).run();
await untilStep.$query().patch({
status: 'completed',
});
if (executionStep.errorDetails) {
throw new Error(JSON.stringify(executionStep.errorDetails));
}
return { data: executionStep.dataOut, step: untilStep };
// const untilStep = await context.currentUser
// .$relatedQuery('steps')
// .withGraphFetched('connection')
// .findOne({
// 'steps.id': params.input.stepId,
// })
// .throwIfNotFound();
// const flow = await untilStep.$relatedQuery('flow');
// const executionStep = await new Processor(flow, {
// untilStep,
// testRun: true,
// }).run();
// await untilStep.$query().patch({
// status: 'completed',
// });
// if (executionStep.errorDetails) {
// throw new Error(JSON.stringify(executionStep.errorDetails));
// }
// return { data: executionStep.dataOut, step: untilStep };
};
export default executeFlow;

View File

@@ -1,5 +1,5 @@
import Context from '../../types/express/context';
import processorQueue from '../../queues/processor';
import flowQueue from '../../queues/flow';
type Params = {
input: {
@@ -9,7 +9,7 @@ type Params = {
};
const JOB_NAME = 'processorJob';
const EVERY_15_MINUTES_CRON = '*/15 * * * *';
const EVERY_15_MINUTES_CRON = '*/1 * * * *';
const updateFlowStatus = async (
_parent: unknown,
@@ -32,7 +32,7 @@ const updateFlowStatus = async (
});
const triggerStep = await flow.getTriggerStep();
const trigger = await triggerStep.getTrigger();
const trigger = await triggerStep.getTriggerCommand();
const interval = trigger.getInterval?.(triggerStep.parameters);
const repeatOptions = {
cron: interval || EVERY_15_MINUTES_CRON,
@@ -43,7 +43,7 @@ const updateFlowStatus = async (
published_at: new Date().toISOString(),
});
await processorQueue.add(
await flowQueue.add(
JOB_NAME,
{ flowId: flow.id },
{
@@ -52,10 +52,10 @@ const updateFlowStatus = async (
}
);
} else {
const repeatableJobs = await processorQueue.getRepeatableJobs();
const repeatableJobs = await flowQueue.getRepeatableJobs();
const job = repeatableJobs.find((job) => job.id === flow.id);
await processorQueue.removeRepeatableByKey(job.key);
await flowQueue.removeRepeatableByKey(job.key);
}
return flow;

View File

@@ -1,13 +1,19 @@
import { ExpressAdapter } from '@bull-board/express';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import processorQueue from '../queues/processor';
import flowQueue from '../queues/flow';
import triggerQueue from '../queues/trigger';
import actionQueue from '../queues/action';
const serverAdapter = new ExpressAdapter();
const createBullBoardHandler = async (serverAdapter: ExpressAdapter) => {
createBullBoard({
queues: [new BullMQAdapter(processorQueue)],
queues: [
new BullMQAdapter(flowQueue),
new BullMQAdapter(triggerQueue),
new BullMQAdapter(actionQueue),
],
serverAdapter: serverAdapter,
});
};

View File

@@ -2,23 +2,37 @@ import createHttpClient from './http-client';
import Connection from '../models/connection';
import Flow from '../models/flow';
import Step from '../models/step';
import { IJSONObject, IApp, IGlobalVariable } from '@automatisch/types';
import Execution from '../models/execution';
import {
IJSONObject,
IApp,
IGlobalVariable,
ITriggerDataItem,
} from '@automatisch/types';
import triggerQueue from '../queues/trigger';
type GlobalVariableOptions = {
connection?: Connection;
app: IApp;
flow?: Flow;
step?: Step;
execution?: Execution;
};
const globalVariable = async (
options: GlobalVariableOptions
): Promise<IGlobalVariable> => {
const { connection, app, flow, step } = options;
const { connection, app, flow, step, execution } = options;
const lastInternalId = await flow?.lastInternalId();
return {
const trigger = await step?.getTriggerCommand();
const nextStep = await flow
?.$relatedQuery('steps')
.where({ position: step.position + 1 })
.first();
const variable: IGlobalVariable = {
auth: {
set: async (args: IJSONObject) => {
if (connection) {
@@ -37,12 +51,45 @@ const globalVariable = async (
app: app,
http: createHttpClient({ baseURL: app.baseUrl }),
flow: {
id: flow?.id,
lastInternalId,
},
step: {
id: step?.id,
appKey: step?.appKey,
parameters: step?.parameters || {},
},
nextStep: {
id: nextStep?.id,
appKey: nextStep?.appKey,
parameters: nextStep?.parameters || {},
},
execution: {
id: execution?.id,
},
};
variable.process = async (triggerDataItem: ITriggerDataItem) => {
const jobName = `${step.appKey}-${triggerDataItem.meta.internalId}`;
const jobPayload = {
$: variable,
triggerDataItem,
};
await triggerQueue.add(jobName, jobPayload);
};
if (trigger && trigger.dedupeStrategy === 'unique') {
const lastInternalIds = await flow?.lastInternalIds();
const isAlreadyProcessed = (internalId: string) => {
return lastInternalIds?.includes(internalId);
};
variable.flow.isAlreadyProcessed = isAlreadyProcessed;
}
return variable;
};
export default globalVariable;

View File

@@ -12,6 +12,7 @@ class Flow extends Base {
active: boolean;
steps: Step[];
published_at: string;
executions?: Execution[];
static tableName = 'flows';
@@ -57,6 +58,15 @@ class Flow extends Base {
return lastExecution ? (lastExecution as Execution).internalId : null;
}
async lastInternalIds(itemCount = 50) {
const lastExecutions = await this.$relatedQuery('executions')
.select('internal_id')
.orderBy('created_at', 'desc')
.limit(itemCount);
return lastExecutions.map((execution) => execution.internalId);
}
async $beforeUpdate(
opt: ModelOptions,
queryContext: QueryContext

View File

@@ -92,16 +92,35 @@ class Step extends Base {
return this.type === 'trigger';
}
async getTrigger() {
if (!this.isTrigger) return null;
get isAction(): boolean {
return this.type === 'action';
}
const { appKey, key } = this;
async getApp() {
if (!this.appKey) return null;
return await App.findOneByKey(this.appKey);
}
async getTriggerCommand() {
const { appKey, key, isTrigger } = this;
if (!isTrigger || !appKey || !key) return null;
const app = await App.findOneByKey(appKey);
const command = app.triggers.find((trigger) => trigger.key === key);
return command;
}
async getActionCommand() {
const { appKey, key, isAction } = this;
if (!isAction || !appKey || !key) return null;
const app = await App.findOneByKey(appKey);
const command = app.actions.find((action) => action.key === key);
return command;
}
}
export default Step;

View File

@@ -0,0 +1,25 @@
import process from 'process';
import { Queue } from 'bullmq';
import redisConfig from '../config/redis';
import logger from '../helpers/logger';
const CONNECTION_REFUSED = 'ECONNREFUSED';
const redisConnection = {
connection: redisConfig,
};
const actionQueue = new Queue('action', redisConnection);
process.on('SIGTERM', async () => {
await actionQueue.close();
});
actionQueue.on('error', (err) => {
if ((err as any).code === CONNECTION_REFUSED) {
logger.error('Make sure you have installed Redis and it is running.', err);
process.exit();
}
});
export default actionQueue;

View File

@@ -9,18 +9,18 @@ const redisConnection = {
connection: redisConfig,
};
const processorQueue = new Queue('processor', redisConnection);
const queueScheduler = new QueueScheduler('processor', redisConnection);
const flowQueue = new Queue('flow', redisConnection);
const queueScheduler = new QueueScheduler('flow', redisConnection);
process.on('SIGTERM', async () => {
await queueScheduler.close();
});
processorQueue.on('error', (err) => {
flowQueue.on('error', (err) => {
if ((err as any).code === CONNECTION_REFUSED) {
logger.error('Make sure you have installed Redis and it is running.', err);
process.exit();
}
});
export default processorQueue;
export default flowQueue;

View File

@@ -0,0 +1,25 @@
import process from 'process';
import { Queue } from 'bullmq';
import redisConfig from '../config/redis';
import logger from '../helpers/logger';
const CONNECTION_REFUSED = 'ECONNREFUSED';
const redisConnection = {
connection: redisConfig,
};
const triggerQueue = new Queue('trigger', redisConnection);
process.on('SIGTERM', async () => {
await triggerQueue.close();
});
triggerQueue.on('error', (err) => {
if ((err as any).code === CONNECTION_REFUSED) {
logger.error('Make sure you have installed Redis and it is running.', err);
process.exit();
}
});
export default triggerQueue;

View File

@@ -28,165 +28,9 @@ class Processor {
this.testRun = processorOptions.testRun;
}
async run() {
const steps = await this.flow
.$relatedQuery('steps')
.withGraphFetched('connection')
.orderBy('position', 'asc');
const triggerStep = steps.find((step) => step.type === 'trigger');
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const initialTriggerData = await this.getInitialTriggerData(triggerStep!);
if (!initialTriggerData.error && initialTriggerData.data.length === 0) {
const lastInternalId = await this.flow.lastInternalId();
const executionData: Partial<Execution> = {
flowId: this.flow.id,
testRun: this.testRun,
};
if (lastInternalId) {
executionData.internalId = lastInternalId;
}
await Execution.query().insert(executionData);
return;
}
if (this.testRun && initialTriggerData.data.length > 0) {
initialTriggerData.data = [initialTriggerData.data[0]];
}
const executions: Execution[] = [];
for await (const data of initialTriggerData.data) {
const execution = await Execution.query().insert({
flowId: this.flow.id,
testRun: this.testRun,
internalId: data.meta.internalId as string,
});
executions.push(execution);
let previousExecutionStep: ExecutionStep;
const priorExecutionSteps: ExecutionSteps = {};
let fetchedActionData: IActionOutput = {
data: null,
};
for await (const step of steps) {
if (!step.appKey) continue;
const { appKey, key, type, parameters: rawParameters = {}, id } = step;
const isTrigger = type === 'trigger';
const app = await App.findOneByKey(appKey);
const computedParameters = Processor.computeParameters(
rawParameters,
priorExecutionSteps
);
const clonedStep = Object.assign({}, step);
clonedStep.parameters = computedParameters;
const $ = await globalVariable({
connection: step.connection,
app,
flow: this.flow,
step: clonedStep,
});
if (!isTrigger && key) {
const command = app.actions.find((action) => action.key === key);
fetchedActionData = await command.run($);
}
if (!isTrigger && fetchedActionData.error) {
await execution.$relatedQuery('executionSteps').insertAndFetch({
stepId: id,
status: 'failure',
dataIn: null,
dataOut: computedParameters,
errorDetails: fetchedActionData.error,
});
break;
}
previousExecutionStep = await execution
.$relatedQuery('executionSteps')
.insertAndFetch({
stepId: id,
status: 'success',
dataIn: isTrigger ? rawParameters : computedParameters,
dataOut: isTrigger ? data.raw : fetchedActionData.data.raw,
});
priorExecutionSteps[id] = previousExecutionStep;
if (id === this.untilStep?.id) {
break;
}
}
}
if (initialTriggerData.error) {
const executionWithError = await Execution.query().insert({
flowId: this.flow.id,
testRun: this.testRun,
});
executions.push(executionWithError);
await executionWithError.$relatedQuery('executionSteps').insertAndFetch({
stepId: triggerStep.id,
status: 'failure',
dataIn: triggerStep.parameters,
errorDetails: initialTriggerData.error,
});
}
if (!this.testRun) return;
const lastExecutionStepFromFirstExecution = await executions[0]
.$relatedQuery('executionSteps')
.orderBy('created_at', 'desc')
.first();
return lastExecutionStepFromFirstExecution;
}
async getInitialTriggerData(step: Step) {
if (!step.appKey || !step.key) return null;
const app = await App.findOneByKey(step.appKey);
const $ = await globalVariable({
connection: step.connection,
app,
flow: this.flow,
step,
});
const command = app.triggers.find((trigger) => trigger.key === step.key);
let fetchedData;
if (this.testRun) {
fetchedData = await command.testRun($);
} else {
fetchedData = await command.run($);
}
return fetchedData;
}
static computeParameters(
parameters: Step['parameters'],
executionSteps: ExecutionSteps
executionSteps: ExecutionStep[]
): Step['parameters'] {
const entries = Object.entries(parameters);
return entries.reduce((result, [key, value]: [string, unknown]) => {
@@ -203,7 +47,9 @@ class Processor {
) as string;
const [stepId, ...keyPaths] = stepIdAndKeyPath.split('.');
const keyPath = keyPaths.join('.');
const executionStep = executionSteps[stepId.toString() as string];
const executionStep = executionSteps.find((executionStep) => {
return executionStep.stepId === stepId;
});
const data = executionStep?.dataOut;
const dataValue = get(data, keyPath);
return dataValue;

View File

@@ -1,5 +1,7 @@
import './config/orm';
export { worker } from './workers/processor';
import './workers/flow';
import './workers/trigger';
import './workers/action';
import telemetry from './helpers/telemetry';
telemetry.setServiceType('worker');

View File

@@ -0,0 +1,99 @@
import { Worker } from 'bullmq';
import redisConfig from '../config/redis';
import Flow from '../models/flow';
import logger from '../helpers/logger';
import globalVariable from '../helpers/global-variable';
import { IGlobalVariable } from '@automatisch/types';
import Execution from '../models/execution';
import Processor from '../services/processor';
import ExecutionStep from '../models/execution-step';
import Step from '../models/step';
import actionQueue from '../queues/action';
type JobData = {
flowId: string;
executionId: string;
stepId: string;
};
export const worker = new Worker(
'action',
async (job) => {
const { flowId, stepId, executionId } = job.data as JobData;
const step = await Step.query().findById(stepId).throwIfNotFound();
const execution = await Execution.query()
.findById(executionId)
.throwIfNotFound();
const $ = await globalVariable({
flow: await Flow.query().findById(flowId).throwIfNotFound(),
app: await step.getApp(),
step: step,
connection: await step.$relatedQuery('connection'),
execution: execution,
});
const priorExecutionSteps = await ExecutionStep.query().where({
execution_id: $.execution.id,
});
const computedParameters = Processor.computeParameters(
$.step.parameters,
priorExecutionSteps
);
const actionCommand = await step.getActionCommand();
$.step.parameters = computedParameters;
const actionDataItem = await actionCommand.run($);
await execution.$relatedQuery('executionSteps').insertAndFetch({
stepId: $.step.id,
status: 'success',
dataIn: computedParameters,
dataOut: actionDataItem.data.raw,
});
// TODO: Add until step id logic here!
// TODO: Change job name for the action data item!
const jobName = `${$.step.appKey}-sample`;
if (!$.nextStep.id) return;
const nextStep = await Step.query()
.findById($.nextStep.id)
.throwIfNotFound();
console.log('hello world');
const variable = await globalVariable({
flow: await Flow.query().findById($.flow.id),
app: await nextStep.getApp(),
step: nextStep,
connection: await nextStep.$relatedQuery('connection'),
execution: execution,
});
const jobPayload = {
$: variable,
};
await actionQueue.add(jobName, jobPayload);
},
{ connection: redisConfig }
);
worker.on('completed', (job) => {
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
});
worker.on('failed', (job, err) => {
logger.info(
`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed22 to start with ${err.message}`
);
});
process.on('SIGTERM', async () => {
await worker.close();
});

View File

@@ -1,27 +1,36 @@
import { Worker } from 'bullmq';
import Processor from '../services/processor';
import redisConfig from '../config/redis';
import Flow from '../models/flow';
import logger from '../helpers/logger';
import globalVariable from '../helpers/global-variable';
export const worker = new Worker(
'processor',
'flow',
async (job) => {
const flow = await Flow.query().findById(job.data.flowId).throwIfNotFound();
const data = await new Processor(flow, { testRun: false }).run();
return data;
const triggerStep = await flow.getTriggerStep();
const triggerCommand = await triggerStep.getTriggerCommand();
const $ = await globalVariable({
flow,
connection: await triggerStep.$relatedQuery('connection'),
app: await triggerStep.getApp(),
step: triggerStep,
});
await triggerCommand.run($);
},
{ connection: redisConfig }
);
worker.on('completed', (job) => {
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has completed!`);
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
});
worker.on('failed', (job, err) => {
logger.info(
`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed with ${err.message}`
`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}`
);
});

View File

@@ -0,0 +1,64 @@
import { Worker } from 'bullmq';
import redisConfig from '../config/redis';
import Flow from '../models/flow';
import logger from '../helpers/logger';
import globalVariable from '../helpers/global-variable';
import { ITriggerDataItem, IGlobalVariable } from '@automatisch/types';
import Execution from '../models/execution';
import actionQueue from '../queues/action';
import Step from '../models/step';
type JobData = {
$: IGlobalVariable;
triggerDataItem: ITriggerDataItem;
};
export const worker = new Worker(
'trigger',
async (job) => {
const { $, triggerDataItem } = job.data as JobData;
// check if we already process this trigger data item or not!
const execution = await Execution.query().insert({
flowId: $.flow.id,
// TODO: Check the testRun logic and adjust following line!
testRun: true,
internalId: triggerDataItem.meta.internalId,
});
await execution.$relatedQuery('executionSteps').insertAndFetch({
stepId: $.step.id,
status: 'success',
dataIn: $.step.parameters,
dataOut: triggerDataItem.raw,
});
const jobName = `${$.step.appKey}-${triggerDataItem.meta.internalId}`;
const nextStep = await Step.query().findById($.nextStep.id);
const jobPayload = {
flowId: $.flow.id,
executionId: execution.id,
stepId: nextStep.id,
};
await actionQueue.add(jobName, jobPayload);
},
{ connection: redisConfig }
);
worker.on('completed', (job) => {
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
});
worker.on('failed', (job, err) => {
logger.info(
`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}`
);
});
process.on('SIGTERM', async () => {
await worker.close();
});

View File

@@ -204,6 +204,7 @@ export interface ITrigger {
key: string;
pollInterval: number;
description: string;
dedupeStrategy: 'greatest' | 'unique' | 'last';
substeps: ISubstep[];
getInterval(parameters: IGlobalVariable['step']['parameters']): string;
run($: IGlobalVariable): Promise<ITriggerOutput>;
@@ -252,12 +253,25 @@ export type IGlobalVariable = {
};
app: IApp;
http: IHttpClient;
flow: {
flow?: {
id: string;
lastInternalId: string;
isAlreadyProcessed?: (internalId: string) => boolean;
};
step: {
step?: {
id: string;
appKey: string;
parameters: IJSONObject;
};
nextStep?: {
id: string;
appKey: string;
parameters: IJSONObject;
};
execution?: {
id: string;
}
process?: (triggerDataItem: ITriggerDataItem) => Promise<void>;
};
declare module 'axios' {