From 22e1fe5c44db80d4994c2abdb623b8e45b6dbb35 Mon Sep 17 00:00:00 2001 From: Faruk AYDIN Date: Wed, 23 Mar 2022 17:32:13 +0300 Subject: [PATCH] feat: Implement auto-run interval for triggers of completed flows --- packages/backend/package.json | 1 + packages/backend/src/app.ts | 13 +++++++ .../src/apps/twitter/triggers/search-tweet.ts | 2 +- .../src/graphql/mutations/execute-flow.ts | 18 +++++----- .../graphql/mutations/update-flow-status.ts | 21 ++++++++++- .../backend/src/graphql/queries/get-flows.ts | 3 +- .../src/helpers/create-bull-board-handler.ts | 15 ++++++++ .../src/helpers/inject-bull-board-handler.ts | 13 +++++++ packages/backend/src/models/flow.ts | 9 +++-- packages/backend/src/queues/processor.ts | 9 +++-- packages/backend/src/services/processor.ts | 13 ++++--- packages/backend/src/workers/processor.ts | 19 ++++------ yarn.lock | 35 +++++++++++++++++-- 13 files changed, 134 insertions(+), 37 deletions(-) create mode 100644 packages/backend/src/helpers/create-bull-board-handler.ts create mode 100644 packages/backend/src/helpers/inject-bull-board-handler.ts diff --git a/packages/backend/package.json b/packages/backend/package.json index 0eb1835b..8ccac2d9 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -18,6 +18,7 @@ }, "dependencies": { "@automatisch/web": "0.1.0", + "@bull-board/express": "^3.10.1", "@graphql-tools/graphql-file-loader": "^7.3.4", "@graphql-tools/load": "^7.5.2", "@octokit/oauth-methods": "^1.2.6", diff --git a/packages/backend/src/app.ts b/packages/backend/src/app.ts index 656fdb1b..1fd02c92 100644 --- a/packages/backend/src/app.ts +++ b/packages/backend/src/app.ts @@ -10,10 +10,23 @@ import appAssetsHandler from './helpers/app-assets-handler'; import webUIHandler from './helpers/web-ui-handler'; import errorHandler from './helpers/error-handler'; import './config/database'; +import { + createBullBoardHandler, + serverAdapter, +} from './helpers/create-bull-board-handler'; +import injectBullBoardHandler from './helpers/inject-bull-board-handler'; + +if (appConfig.appEnv === 'development') { + createBullBoardHandler(serverAdapter); +} const app = express(); const port = appConfig.port; +if (appConfig.appEnv === 'development') { + injectBullBoardHandler(app, serverAdapter); +} + appAssetsHandler(app); app.use(morgan); diff --git a/packages/backend/src/apps/twitter/triggers/search-tweet.ts b/packages/backend/src/apps/twitter/triggers/search-tweet.ts index 68196cc5..59f341d0 100644 --- a/packages/backend/src/apps/twitter/triggers/search-tweet.ts +++ b/packages/backend/src/apps/twitter/triggers/search-tweet.ts @@ -19,7 +19,7 @@ export default class SearchTweet { async run() { const response = await this.client.v2.get('tweets/search/recent', { query: this.parameters.searchTerm as string, - max_results: 100, + max_results: 10, }); return response.data; diff --git a/packages/backend/src/graphql/mutations/execute-flow.ts b/packages/backend/src/graphql/mutations/execute-flow.ts index fc6fc9dc..dfcd726a 100644 --- a/packages/backend/src/graphql/mutations/execute-flow.ts +++ b/packages/backend/src/graphql/mutations/execute-flow.ts @@ -14,7 +14,7 @@ const executeFlow = async ( params: Params, context: Context ) => { - const step = await context.currentUser + const untilStep = await context.currentUser .$relatedQuery('steps') .withGraphFetched('connection') .findOne({ @@ -22,20 +22,18 @@ const executeFlow = async ( }) .throwIfNotFound(); - const flow = await step.$relatedQuery('flow'); - const data = await new Processor(flow, step, { testRun: true }).run(); + const flow = await untilStep.$relatedQuery('flow'); - // TODO: Use this snippet to execute flows with the background job. - // const data = processorQueue.add('processorJob', { - // flowId: flow.id, - // stepId: step.id, - // }); + const data = await new Processor(flow, { + untilStep, + testRun: true, + }).run(); - await step.$query().patch({ + await untilStep.$query().patch({ status: 'completed', }); - return { data, step }; + return { data, step: untilStep }; }; export default executeFlow; diff --git a/packages/backend/src/graphql/mutations/update-flow-status.ts b/packages/backend/src/graphql/mutations/update-flow-status.ts index 58027701..be6843b9 100644 --- a/packages/backend/src/graphql/mutations/update-flow-status.ts +++ b/packages/backend/src/graphql/mutations/update-flow-status.ts @@ -1,4 +1,5 @@ import Context from '../../types/express/context'; +import processorQueue from '../../queues/processor'; type Params = { input: { @@ -7,6 +8,11 @@ type Params = { }; }; +const JOB_NAME = 'processorJob'; +const REPEAT_OPTIONS = { + every: 60000, // 1 minute +}; + const updateFlowStatus = async ( _parent: unknown, params: Params, @@ -23,10 +29,23 @@ const updateFlowStatus = async ( return flow; } - flow = await flow.$query().patchAndFetch({ + flow = await flow.$query().withGraphFetched('steps').patchAndFetch({ active: params.input.active, }); + if (flow.active) { + await processorQueue.add( + JOB_NAME, + { flowId: flow.id }, + { + repeat: REPEAT_OPTIONS, + jobId: flow.id, + } + ); + } else { + await processorQueue.removeRepeatable(JOB_NAME, REPEAT_OPTIONS, flow.id); + } + return flow; }; diff --git a/packages/backend/src/graphql/queries/get-flows.ts b/packages/backend/src/graphql/queries/get-flows.ts index ba8819ba..0fa92bd4 100644 --- a/packages/backend/src/graphql/queries/get-flows.ts +++ b/packages/backend/src/graphql/queries/get-flows.ts @@ -7,7 +7,8 @@ const getFlows = async ( ) => { const flows = await context.currentUser .$relatedQuery('flows') - .withGraphJoined('[steps.[connection]]'); + .withGraphJoined('[steps.[connection]]') + .orderBy('created_at', 'desc'); return flows; }; diff --git a/packages/backend/src/helpers/create-bull-board-handler.ts b/packages/backend/src/helpers/create-bull-board-handler.ts new file mode 100644 index 00000000..7525aaa9 --- /dev/null +++ b/packages/backend/src/helpers/create-bull-board-handler.ts @@ -0,0 +1,15 @@ +import { ExpressAdapter } from '@bull-board/express'; +import { createBullBoard } from '@bull-board/api'; +import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; +import processorQueue from '../queues/processor'; + +const serverAdapter = new ExpressAdapter(); + +const createBullBoardHandler = async (serverAdapter: ExpressAdapter) => { + createBullBoard({ + queues: [new BullMQAdapter(processorQueue)], + serverAdapter: serverAdapter, + }); +}; + +export { createBullBoardHandler, serverAdapter }; diff --git a/packages/backend/src/helpers/inject-bull-board-handler.ts b/packages/backend/src/helpers/inject-bull-board-handler.ts new file mode 100644 index 00000000..1afc97c2 --- /dev/null +++ b/packages/backend/src/helpers/inject-bull-board-handler.ts @@ -0,0 +1,13 @@ +import { Application } from 'express'; +import { ExpressAdapter } from '@bull-board/express'; + +const injectBullBoardHandler = async ( + app: Application, + serverAdapter: ExpressAdapter +) => { + const queueDashboardBasePath = '/admin/queues'; + serverAdapter.setBasePath(queueDashboardBasePath); + app.use(queueDashboardBasePath, serverAdapter.getRouter()); +}; + +export default injectBullBoardHandler; diff --git a/packages/backend/src/models/flow.ts b/packages/backend/src/models/flow.ts index 45acc283..c78e28b1 100644 --- a/packages/backend/src/models/flow.ts +++ b/packages/backend/src/models/flow.ts @@ -1,4 +1,5 @@ import { ValidationError } from 'objection'; +import type { QueryContext, ModelOptions } from 'objection'; import Base from './base'; import Step from './step'; import Execution from './execution'; @@ -42,10 +43,12 @@ class Flow extends Base { }, }); - async $beforeUpdate(): Promise { + async $beforeUpdate(opt: ModelOptions): Promise { if (!this.active) return; - const incompleteStep = await this.$relatedQuery('steps').findOne({ + const oldFlow = opt.old as Flow; + + const incompleteStep = await oldFlow.$relatedQuery('steps').findOne({ status: 'incomplete', }); @@ -56,7 +59,7 @@ class Flow extends Base { }); } - const allSteps = await this.$relatedQuery('steps'); + const allSteps = await oldFlow.$relatedQuery('steps'); if (allSteps.length < 2) { throw new ValidationError({ diff --git a/packages/backend/src/queues/processor.ts b/packages/backend/src/queues/processor.ts index b46d7782..7960c54c 100644 --- a/packages/backend/src/queues/processor.ts +++ b/packages/backend/src/queues/processor.ts @@ -1,8 +1,11 @@ -import { Queue } from 'bullmq'; +import { Queue, QueueScheduler } from 'bullmq'; import redisConfig from '../config/redis'; -const processorQueue = new Queue('processor', { +const redisConnection = { connection: redisConfig, -}); +}; + +new QueueScheduler('processor', redisConnection); +const processorQueue = new Queue('processor', redisConnection); export default processorQueue; diff --git a/packages/backend/src/services/processor.ts b/packages/backend/src/services/processor.ts index a5325aac..bf2e0f56 100644 --- a/packages/backend/src/services/processor.ts +++ b/packages/backend/src/services/processor.ts @@ -7,6 +7,11 @@ import ExecutionStep from '../models/execution-step'; type ExecutionSteps = Record; +type ProcessorOptions = { + untilStep?: Step; + testRun?: boolean; +}; + class Processor { flow: Flow; untilStep: Step; @@ -14,10 +19,10 @@ class Processor { static variableRegExp = /({{step\..+\..+}})/g; - constructor(flow: Flow, untilStep: Step, { testRun = false }) { + constructor(flow: Flow, processorOptions: ProcessorOptions) { this.flow = flow; - this.untilStep = untilStep; - this.testRun = testRun; + this.untilStep = processorOptions.untilStep; + this.testRun = processorOptions.testRun; } async run() { @@ -89,7 +94,7 @@ class Processor { priorExecutionSteps[id] = previousExecutionStep; - if (id === this.untilStep.id) { + if (id === this.untilStep?.id) { break; } } diff --git a/packages/backend/src/workers/processor.ts b/packages/backend/src/workers/processor.ts index e5cc8ea8..09c1dd95 100644 --- a/packages/backend/src/workers/processor.ts +++ b/packages/backend/src/workers/processor.ts @@ -1,31 +1,26 @@ import { Worker } from 'bullmq'; import Processor from '../services/processor'; import redisConfig from '../config/redis'; -import Step from '../models/step'; +import Flow from '../models/flow'; import logger from '../helpers/logger'; const worker = new Worker( 'processor', async (job) => { - const step = await Step.query() - .withGraphFetched('connection') - .findOne({ - 'steps.id': job.data.stepId, - }) - .throwIfNotFound(); + const flow = await Flow.query().findById(job.data.flowId).throwIfNotFound(); + const data = await new Processor(flow, { testRun: false }).run(); - const flow = await step.$relatedQuery('flow'); - - const data = await new Processor(flow, step).run(); return data; }, { connection: redisConfig } ); worker.on('completed', (job) => { - logger.info(`${job.id} has completed!`); + logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has completed!`); }); worker.on('failed', (job, err) => { - logger.info(`${job.id} has failed with ${err.message}`); + logger.info( + `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed with ${err.message}` + ); }); diff --git a/yarn.lock b/yarn.lock index f63d7eab..6c2d993a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1449,6 +1449,30 @@ resolved "https://registry.yarnpkg.com/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz#75a2e8b51cb758a7553d6804a5932d7aace75c39" integrity sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw== +"@bull-board/api@3.10.1": + version "3.10.1" + resolved "https://registry.yarnpkg.com/@bull-board/api/-/api-3.10.1.tgz#c9608d501c887abcfa8f1907bc3dedee179bdea3" + integrity sha512-ZYjNBdoBQu+UVbLAHQuEhJL96C+i7vYioc2n7FL/XoVea44XIw2WiKFcFxq0LnActPErja26QyZBQht23ph1lg== + dependencies: + redis-info "^3.0.8" + +"@bull-board/express@^3.10.1": + version "3.10.1" + resolved "https://registry.yarnpkg.com/@bull-board/express/-/express-3.10.1.tgz#f9ddd3ed0cb37e623895c438ee73e24095a9f91a" + integrity sha512-jygcJBZhTZf34FXo//m6rFVcCwkXQBxSXdj2KUL5JZX8GBh80z4o95GqJtyB8A+vBN/zGr+bIG0ikkm2x8mTtQ== + dependencies: + "@bull-board/api" "3.10.1" + "@bull-board/ui" "3.10.1" + ejs "3.1.6" + express "4.17.3" + +"@bull-board/ui@3.10.1": + version "3.10.1" + resolved "https://registry.yarnpkg.com/@bull-board/ui/-/ui-3.10.1.tgz#edf7c7752a78d9829f7a944bb87a0e70812b749f" + integrity sha512-K2qEAvTuyHZxUdK31HaBb9sdTFSOSKAZkxsl/LeiT4FGNF/h54iYGmWF9+HSFytggcnGdM0XnK3wLihCaIQAOQ== + dependencies: + "@bull-board/api" "3.10.1" + "@concordance/react@^2.0.0": version "2.0.0" resolved "https://registry.yarnpkg.com/@concordance/react/-/react-2.0.0.tgz#aef913f27474c53731f4fd79cc2f54897de90fde" @@ -8036,7 +8060,7 @@ ee-first@1.1.1: resolved "https://registry.yarnpkg.com/ee-first/-/ee-first-1.1.1.tgz#590c61156b0ae2f4f0255732a158b266bc56b21d" integrity sha1-WQxhFWsK4vTwJVcyoViyZrxWsh0= -ejs@^3.1.6: +ejs@3.1.6, ejs@^3.1.6: version "3.1.6" resolved "https://registry.yarnpkg.com/ejs/-/ejs-3.1.6.tgz#5bfd0a0689743bb5268b3550cceeebbc1702822a" integrity sha512-9lt9Zse4hPucPkoP7FHDF0LQAlGyF9JVpnClFLFH3aSSbxmyoqINRpp/9wePWJTUl4KOQwRL72Iw3InHPDkoGw== @@ -8815,7 +8839,7 @@ express-graphql@^0.12.0: http-errors "1.8.0" raw-body "^2.4.1" -express@^4.16.4: +express@4.17.3, express@^4.16.4: version "4.17.3" resolved "https://registry.yarnpkg.com/express/-/express-4.17.3.tgz#f6c7302194a4fb54271b73a1fe7a06478c8f85a1" integrity sha512-yuSQpz5I+Ch7gFrPCk4/c+dIBKlQUxtgwqzph132bsT6qhuzss6I8cLJQz7B3rFblzd6wtcI0ZbGltH/C4LjUg== @@ -15837,6 +15861,13 @@ redis-errors@^1.0.0, redis-errors@^1.2.0: resolved "https://registry.yarnpkg.com/redis-errors/-/redis-errors-1.2.0.tgz#eb62d2adb15e4eaf4610c04afe1529384250abad" integrity sha1-62LSrbFeTq9GEMBK/hUpOEJQq60= +redis-info@^3.0.8: + version "3.1.0" + resolved "https://registry.yarnpkg.com/redis-info/-/redis-info-3.1.0.tgz#5e349c8720e82d27ac84c73136dce0931e10469a" + integrity sha512-ER4L9Sh/vm63DkIE0bkSjxluQlioBiBgf5w1UuldaW/3vPcecdljVDisZhmnCMvsxHNiARTTDDHGg9cGwTfrKg== + dependencies: + lodash "^4.17.11" + redis-parser@^3.0.0: version "3.0.0" resolved "https://registry.yarnpkg.com/redis-parser/-/redis-parser-3.0.0.tgz#b66d828cdcafe6b4b8a428a7def4c6bcac31c8b4"