feat: Implement auto-run interval for triggers of completed flows

This commit is contained in:
Faruk AYDIN
2022-03-23 17:32:13 +03:00
committed by Ömer Faruk Aydın
parent 3715291df7
commit 22e1fe5c44
13 changed files with 134 additions and 37 deletions

View File

@@ -10,10 +10,23 @@ import appAssetsHandler from './helpers/app-assets-handler';
import webUIHandler from './helpers/web-ui-handler';
import errorHandler from './helpers/error-handler';
import './config/database';
import {
createBullBoardHandler,
serverAdapter,
} from './helpers/create-bull-board-handler';
import injectBullBoardHandler from './helpers/inject-bull-board-handler';
if (appConfig.appEnv === 'development') {
createBullBoardHandler(serverAdapter);
}
const app = express();
const port = appConfig.port;
if (appConfig.appEnv === 'development') {
injectBullBoardHandler(app, serverAdapter);
}
appAssetsHandler(app);
app.use(morgan);

View File

@@ -19,7 +19,7 @@ export default class SearchTweet {
async run() {
const response = await this.client.v2.get('tweets/search/recent', {
query: this.parameters.searchTerm as string,
max_results: 100,
max_results: 10,
});
return response.data;

View File

@@ -14,7 +14,7 @@ const executeFlow = async (
params: Params,
context: Context
) => {
const step = await context.currentUser
const untilStep = await context.currentUser
.$relatedQuery('steps')
.withGraphFetched('connection')
.findOne({
@@ -22,20 +22,18 @@ const executeFlow = async (
})
.throwIfNotFound();
const flow = await step.$relatedQuery('flow');
const data = await new Processor(flow, step, { testRun: true }).run();
const flow = await untilStep.$relatedQuery('flow');
// TODO: Use this snippet to execute flows with the background job.
// const data = processorQueue.add('processorJob', {
// flowId: flow.id,
// stepId: step.id,
// });
const data = await new Processor(flow, {
untilStep,
testRun: true,
}).run();
await step.$query().patch({
await untilStep.$query().patch({
status: 'completed',
});
return { data, step };
return { data, step: untilStep };
};
export default executeFlow;

View File

@@ -1,4 +1,5 @@
import Context from '../../types/express/context';
import processorQueue from '../../queues/processor';
type Params = {
input: {
@@ -7,6 +8,11 @@ type Params = {
};
};
const JOB_NAME = 'processorJob';
const REPEAT_OPTIONS = {
every: 60000, // 1 minute
};
const updateFlowStatus = async (
_parent: unknown,
params: Params,
@@ -23,10 +29,23 @@ const updateFlowStatus = async (
return flow;
}
flow = await flow.$query().patchAndFetch({
flow = await flow.$query().withGraphFetched('steps').patchAndFetch({
active: params.input.active,
});
if (flow.active) {
await processorQueue.add(
JOB_NAME,
{ flowId: flow.id },
{
repeat: REPEAT_OPTIONS,
jobId: flow.id,
}
);
} else {
await processorQueue.removeRepeatable(JOB_NAME, REPEAT_OPTIONS, flow.id);
}
return flow;
};

View File

@@ -7,7 +7,8 @@ const getFlows = async (
) => {
const flows = await context.currentUser
.$relatedQuery('flows')
.withGraphJoined('[steps.[connection]]');
.withGraphJoined('[steps.[connection]]')
.orderBy('created_at', 'desc');
return flows;
};

View File

@@ -0,0 +1,15 @@
import { ExpressAdapter } from '@bull-board/express';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import processorQueue from '../queues/processor';
const serverAdapter = new ExpressAdapter();
const createBullBoardHandler = async (serverAdapter: ExpressAdapter) => {
createBullBoard({
queues: [new BullMQAdapter(processorQueue)],
serverAdapter: serverAdapter,
});
};
export { createBullBoardHandler, serverAdapter };

View File

@@ -0,0 +1,13 @@
import { Application } from 'express';
import { ExpressAdapter } from '@bull-board/express';
const injectBullBoardHandler = async (
app: Application,
serverAdapter: ExpressAdapter
) => {
const queueDashboardBasePath = '/admin/queues';
serverAdapter.setBasePath(queueDashboardBasePath);
app.use(queueDashboardBasePath, serverAdapter.getRouter());
};
export default injectBullBoardHandler;

View File

@@ -1,4 +1,5 @@
import { ValidationError } from 'objection';
import type { QueryContext, ModelOptions } from 'objection';
import Base from './base';
import Step from './step';
import Execution from './execution';
@@ -42,10 +43,12 @@ class Flow extends Base {
},
});
async $beforeUpdate(): Promise<void> {
async $beforeUpdate(opt: ModelOptions): Promise<void> {
if (!this.active) return;
const incompleteStep = await this.$relatedQuery('steps').findOne({
const oldFlow = opt.old as Flow;
const incompleteStep = await oldFlow.$relatedQuery('steps').findOne({
status: 'incomplete',
});
@@ -56,7 +59,7 @@ class Flow extends Base {
});
}
const allSteps = await this.$relatedQuery('steps');
const allSteps = await oldFlow.$relatedQuery('steps');
if (allSteps.length < 2) {
throw new ValidationError({

View File

@@ -1,8 +1,11 @@
import { Queue } from 'bullmq';
import { Queue, QueueScheduler } from 'bullmq';
import redisConfig from '../config/redis';
const processorQueue = new Queue('processor', {
const redisConnection = {
connection: redisConfig,
});
};
new QueueScheduler('processor', redisConnection);
const processorQueue = new Queue('processor', redisConnection);
export default processorQueue;

View File

@@ -7,6 +7,11 @@ import ExecutionStep from '../models/execution-step';
type ExecutionSteps = Record<string, ExecutionStep>;
type ProcessorOptions = {
untilStep?: Step;
testRun?: boolean;
};
class Processor {
flow: Flow;
untilStep: Step;
@@ -14,10 +19,10 @@ class Processor {
static variableRegExp = /({{step\..+\..+}})/g;
constructor(flow: Flow, untilStep: Step, { testRun = false }) {
constructor(flow: Flow, processorOptions: ProcessorOptions) {
this.flow = flow;
this.untilStep = untilStep;
this.testRun = testRun;
this.untilStep = processorOptions.untilStep;
this.testRun = processorOptions.testRun;
}
async run() {
@@ -89,7 +94,7 @@ class Processor {
priorExecutionSteps[id] = previousExecutionStep;
if (id === this.untilStep.id) {
if (id === this.untilStep?.id) {
break;
}
}

View File

@@ -1,31 +1,26 @@
import { Worker } from 'bullmq';
import Processor from '../services/processor';
import redisConfig from '../config/redis';
import Step from '../models/step';
import Flow from '../models/flow';
import logger from '../helpers/logger';
const worker = new Worker(
'processor',
async (job) => {
const step = await Step.query()
.withGraphFetched('connection')
.findOne({
'steps.id': job.data.stepId,
})
.throwIfNotFound();
const flow = await Flow.query().findById(job.data.flowId).throwIfNotFound();
const data = await new Processor(flow, { testRun: false }).run();
const flow = await step.$relatedQuery('flow');
const data = await new Processor(flow, step).run();
return data;
},
{ connection: redisConfig }
);
worker.on('completed', (job) => {
logger.info(`${job.id} has completed!`);
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has completed!`);
});
worker.on('failed', (job, err) => {
logger.info(`${job.id} has failed with ${err.message}`);
logger.info(
`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed with ${err.message}`
);
});