feat: Implement process worker with bullmq

This commit is contained in:
Faruk AYDIN
2022-03-05 00:15:46 +03:00
committed by Ömer Faruk Aydın
parent b288dc8c35
commit 20e725b590
9 changed files with 194 additions and 3 deletions

View File

@@ -10,3 +10,5 @@ POSTGRES_USERNAME=automatish_development_user
POSTGRES_PASSWORD=
POSTGRES_ENABLE_SSL=false
ENCRYPTION_KEY=sample-encryption-key
REDIS_PORT=6379
REDIS_HOST=127.0.0.1

View File

@@ -4,6 +4,7 @@
"description": "> TODO: description",
"scripts": {
"dev": "nodemon --watch 'src/**/*.ts' --exec 'ts-node' src/app.ts",
"worker": "nodemon --watch 'src/**/*.ts' --exec 'ts-node' src/worker.ts",
"build": "tsc",
"start": "node dist/src/app.js",
"test": "ava",
@@ -21,6 +22,7 @@
"ajv-formats": "^2.1.1",
"axios": "0.24.0",
"bcrypt": "^5.0.1",
"bullmq": "^1.76.1",
"cors": "^2.8.5",
"crypto-js": "^4.1.1",
"debug": "~2.6.9",

View File

@@ -16,6 +16,8 @@ type AppConfig = {
baseUrl?: string;
encryptionKey: string;
serveWebAppSeparately: boolean;
redisHost: string;
redisPort: number;
};
const appConfig: AppConfig = {
@@ -33,6 +35,8 @@ const appConfig: AppConfig = {
encryptionKey: process.env.ENCRYPTION_KEY,
serveWebAppSeparately:
process.env.SERVE_WEB_APP_SEPARATELY === 'true' ? true : false,
redisHost: process.env.REDIS_HOST || '127.0.0.1',
redisPort: parseInt(process.env.REDIS_PORT) || 6379,
};
if (appConfig.serveWebAppSeparately) {

View File

@@ -0,0 +1,8 @@
import appConfig from './app';
const redisConfig = {
host: appConfig.redisHost,
port: appConfig.redisPort,
};
export default redisConfig;

View File

@@ -2,6 +2,7 @@ import { GraphQLString, GraphQLNonNull } from 'graphql';
import RequestWithCurrentUser from '../../types/express/request-with-current-user';
import executeFlowType from '../types/execute-flow';
import Processor from '../../services/processor';
import processorQueue from '../../queues/processor';
type Params = {
stepId: string;
@@ -21,6 +22,12 @@ const executeFlowResolver = async (
const flow = await step.$relatedQuery('flow');
const data = await new Processor(flow, step).run();
// TODO: Use this snippet to execute flows with the background job.
// const data = processorQueue.add('processorJob', {
// flowId: flow.id,
// stepId: step.id,
// });
await step.$query().patch({
status: 'completed',
});

View File

@@ -0,0 +1,8 @@
import { Queue } from 'bullmq';
import redisConfig from '../config/redis';
const processorQueue = new Queue('processor', {
connection: redisConfig,
});
export default processorQueue;

View File

@@ -0,0 +1,2 @@
import './config/database';
import './workers/processor';

View File

@@ -0,0 +1,31 @@
import { Worker } from 'bullmq';
import Processor from '../services/processor';
import redisConfig from '../config/redis';
import Step from '../models/step';
import logger from '../helpers/logger';
const worker = new Worker(
'processor',
async (job) => {
const step = await Step.query()
.withGraphFetched('connection')
.findOne({
'steps.id': job.data.stepId,
})
.throwIfNotFound();
const flow = await step.$relatedQuery('flow');
const data = await new Processor(flow, step).run();
return data;
},
{ connection: redisConfig }
);
worker.on('completed', (job) => {
logger.info(`${job.id} has completed!`);
});
worker.on('failed', (job, err) => {
logger.info(`${job.id} has failed with ${err.message}`);
});

133
yarn.lock
View File

@@ -5868,6 +5868,21 @@ builtins@^1.0.3:
resolved "https://registry.yarnpkg.com/builtins/-/builtins-1.0.3.tgz#cb94faeb61c8696451db36534e1422f94f0aee88"
integrity sha1-y5T662HIaWRR2zZTThQi+U8K7og=
bullmq@^1.76.1:
version "1.76.1"
resolved "https://registry.yarnpkg.com/bullmq/-/bullmq-1.76.1.tgz#202cfe2a9eee0cd00f3f4995c04b6fe5f1bc1995"
integrity sha512-K+TAT2mbuDEz99IuC/rY+vwmA6jpUH0w5y61pdNx0xVdHdAWn6dd9Vs0MnCObZDHCdbNgGvFumfOjCzHjD4VVg==
dependencies:
cron-parser "^2.18.0"
get-port "^5.1.1"
glob "^7.2.0"
ioredis "^4.28.2"
lodash "^4.17.21"
msgpackr "^1.4.6"
semver "^6.3.0"
tslib "^1.14.1"
uuid "^8.3.2"
byline@^5.0.0:
version "5.0.0"
resolved "https://registry.yarnpkg.com/byline/-/byline-5.0.0.tgz#741c5216468eadc457b03410118ad77de8c1ddb1"
@@ -6310,6 +6325,11 @@ clsx@^1.1.0, clsx@^1.1.1:
resolved "https://registry.yarnpkg.com/clsx/-/clsx-1.1.1.tgz#98b3134f9abbdf23b2663491ace13c5c03a73188"
integrity sha512-6/bPho624p3S2pMyvP5kKBPXnI3ufHLObBFCfgx+LkeR5lg2XYy2hqZqUf45ypD8COn2bhgGJSUE+l5dhNBieA==
cluster-key-slot@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/cluster-key-slot/-/cluster-key-slot-1.1.0.tgz#30474b2a981fb12172695833052bc0d01336d10d"
integrity sha512-2Nii8p3RwAPiFwsnZvukotvow2rIHM+yQ6ZcBXGHdniadkYGZYiGmkHJIbZPIV9nfv7m/U1IPMVVcAhoWFeklw==
cmd-shim@^4.0.1, cmd-shim@^4.1.0:
version "4.1.0"
resolved "https://registry.yarnpkg.com/cmd-shim/-/cmd-shim-4.1.0.tgz#b3a904a6743e9fede4148c6f3800bf2a08135bdd"
@@ -6833,6 +6853,14 @@ create-require@^1.1.0:
resolved "https://registry.yarnpkg.com/create-require/-/create-require-1.1.1.tgz#c1d7e8f1e5f6cfc9ff65f9cd352d37348756c333"
integrity sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==
cron-parser@^2.18.0:
version "2.18.0"
resolved "https://registry.yarnpkg.com/cron-parser/-/cron-parser-2.18.0.tgz#de1bb0ad528c815548371993f81a54e5a089edcf"
integrity sha512-s4odpheTyydAbTBQepsqd2rNWGa2iV3cyo8g7zbI2QQYGLVsfbhmwukayS1XHppe02Oy1fg7mg6xoaraVJeEcg==
dependencies:
is-nan "^1.3.0"
moment-timezone "^0.5.31"
cross-fetch@^3.0.4, cross-fetch@^3.0.6:
version "3.1.4"
resolved "https://registry.yarnpkg.com/cross-fetch/-/cross-fetch-3.1.4.tgz#9723f3a3a247bf8b89039f3a380a9244e8fa2f39"
@@ -7344,6 +7372,11 @@ delegates@^1.0.0:
resolved "https://registry.yarnpkg.com/delegates/-/delegates-1.0.0.tgz#84c6e159b81904fdca59a0ef44cd870d31250f9a"
integrity sha1-hMbhWbgZBP3KWaDvRM2HDTElD5o=
denque@^1.1.0:
version "1.5.1"
resolved "https://registry.yarnpkg.com/denque/-/denque-1.5.1.tgz#07f670e29c9a78f8faecb2566a1e2c11929c5cbf"
integrity sha512-XwE+iZ4D6ZUB7mfYRMb5wByE8L74HCn30FBN7sWnXksWc1LO1bPDl67pBR9o/kC4z/xSNAwkMYcGgqDV3BE3Hw==
depd@^1.1.2, depd@~1.1.2:
version "1.1.2"
resolved "https://registry.yarnpkg.com/depd/-/depd-1.1.2.tgz#9bcd52e14c097763e749b274c4346ed2e560b5a9"
@@ -10161,6 +10194,23 @@ invariant@^2.2.4:
dependencies:
loose-envify "^1.0.0"
ioredis@^4.28.2:
version "4.28.5"
resolved "https://registry.yarnpkg.com/ioredis/-/ioredis-4.28.5.tgz#5c149e6a8d76a7f8fa8a504ffc85b7d5b6797f9f"
integrity sha512-3GYo0GJtLqgNXj4YhrisLaNNvWSNwSS2wS4OELGfGxH8I69+XfNdnmV1AyN+ZqMh0i7eX+SWjrwFKDBDgfBC1A==
dependencies:
cluster-key-slot "^1.1.0"
debug "^4.3.1"
denque "^1.1.0"
lodash.defaults "^4.2.0"
lodash.flatten "^4.4.0"
lodash.isarguments "^3.1.0"
p-map "^2.1.0"
redis-commands "1.7.0"
redis-errors "^1.2.0"
redis-parser "^3.0.0"
standard-as-callback "^2.1.0"
ip@^1.1.0, ip@^1.1.5:
version "1.1.5"
resolved "https://registry.yarnpkg.com/ip/-/ip-1.1.5.tgz#bdded70114290828c0a039e72ef25f5aaec4354a"
@@ -10359,6 +10409,14 @@ is-module@^1.0.0:
resolved "https://registry.yarnpkg.com/is-module/-/is-module-1.0.0.tgz#3258fb69f78c14d5b815d664336b4cffb6441591"
integrity sha1-Mlj7afeMFNW4FdZkM2tM/7ZEFZE=
is-nan@^1.3.0:
version "1.3.2"
resolved "https://registry.yarnpkg.com/is-nan/-/is-nan-1.3.2.tgz#043a54adea31748b55b6cd4e09aadafa69bd9e1d"
integrity sha512-E+zBKpQ2t6MEo1VsonYmluk9NxGrbzpeeLC2xIViuO2EjU2xsXsBPwTr3Ykv9l08UYEVEdWeRZNouaZqF6RN0w==
dependencies:
call-bind "^1.0.0"
define-properties "^1.1.3"
is-negative-zero@^2.0.1:
version "2.0.2"
resolved "https://registry.yarnpkg.com/is-negative-zero/-/is-negative-zero-2.0.2.tgz#7bf6f03a28003b8b3965de3ac26f664d765f3150"
@@ -11622,7 +11680,7 @@ lodash.debounce@^4.0.8:
resolved "https://registry.yarnpkg.com/lodash.debounce/-/lodash.debounce-4.0.8.tgz#82d79bff30a67c4005ffd5e2515300ad9ca4d7af"
integrity sha1-gteb/zCmfEAF/9XiUVMArZyk168=
lodash.defaults@^4.0.1:
lodash.defaults@^4.0.1, lodash.defaults@^4.2.0:
version "4.2.0"
resolved "https://registry.yarnpkg.com/lodash.defaults/-/lodash.defaults-4.2.0.tgz#d09178716ffea4dde9e5fb7b37f6f0802274580c"
integrity sha1-0JF4cW/+pN3p5ft7N/bwgCJ0WAw=
@@ -11632,7 +11690,7 @@ lodash.filter@^4.4.0:
resolved "https://registry.yarnpkg.com/lodash.filter/-/lodash.filter-4.6.0.tgz#668b1d4981603ae1cc5a6fa760143e480b4c4ace"
integrity sha1-ZosdSYFgOuHMWm+nYBQ+SAtMSs4=
lodash.flatten@^4.2.0:
lodash.flatten@^4.2.0, lodash.flatten@^4.4.0:
version "4.4.0"
resolved "https://registry.yarnpkg.com/lodash.flatten/-/lodash.flatten-4.4.0.tgz#f31c22225a9632d2bbf8e4addbef240aa765a61f"
integrity sha1-8xwiIlqWMtK7+OSt2+8kCqdlph8=
@@ -11657,6 +11715,11 @@ lodash.includes@^4.3.0:
resolved "https://registry.yarnpkg.com/lodash.includes/-/lodash.includes-4.3.0.tgz#60bb98a87cb923c68ca1e51325483314849f553f"
integrity sha1-YLuYqHy5I8aMoeUTJUgzFISfVT8=
lodash.isarguments@^3.1.0:
version "3.1.0"
resolved "https://registry.yarnpkg.com/lodash.isarguments/-/lodash.isarguments-3.1.0.tgz#2f573d85c6a24289ff00663b491c1d338ff3458a"
integrity sha1-L1c9hcaiQon/AGY7SRwdM4/zRYo=
lodash.isboolean@^3.0.3:
version "3.0.3"
resolved "https://registry.yarnpkg.com/lodash.isboolean/-/lodash.isboolean-3.0.3.tgz#6c2e171db2a257cd96802fd43b01b20d5f5870f6"
@@ -12335,6 +12398,18 @@ modify-values@^1.0.0:
resolved "https://registry.yarnpkg.com/modify-values/-/modify-values-1.0.1.tgz#b3939fa605546474e3e3e3c63d64bd43b4ee6022"
integrity sha512-xV2bxeN6F7oYjZWTe/YPAy6MN2M+sL4u/Rlm2AHCIVGfo2p1yGmBHQ6vHehl4bRTZBdHu3TSkWdYgkwpYzAGSw==
moment-timezone@^0.5.31:
version "0.5.34"
resolved "https://registry.yarnpkg.com/moment-timezone/-/moment-timezone-0.5.34.tgz#a75938f7476b88f155d3504a9343f7519d9a405c"
integrity sha512-3zAEHh2hKUs3EXLESx/wsgw6IQdusOT8Bxm3D9UrHPQR7zlMmzwybC8zHEM1tQ4LJwP7fcxrWr8tuBg05fFCbg==
dependencies:
moment ">= 2.9.0"
"moment@>= 2.9.0":
version "2.29.1"
resolved "https://registry.yarnpkg.com/moment/-/moment-2.29.1.tgz#b2be769fa31940be9eeea6469c075e35006fa3d3"
integrity sha512-kHmoybcPV8Sqy59DwNDY3Jefr64lK/by/da0ViFcuA4DH0vQg5Q6Ze5VimxkfQNSC+Mls/Kx53s7TjP1RhFEDQ==
morgan@^1.10.0:
version "1.10.0"
resolved "https://registry.yarnpkg.com/morgan/-/morgan-1.10.0.tgz#091778abc1fc47cd3509824653dae1faab6b17d7"
@@ -12371,6 +12446,21 @@ ms@2.1.3, ms@^2.0.0, ms@^2.1.1, ms@^2.1.3:
resolved "https://registry.yarnpkg.com/ms/-/ms-2.1.3.tgz#574c8138ce1d2b5861f0b44579dbadd60c6615b2"
integrity sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==
msgpackr-extract@^1.0.14:
version "1.0.16"
resolved "https://registry.yarnpkg.com/msgpackr-extract/-/msgpackr-extract-1.0.16.tgz#701c4f6e6f25c100ae84557092274e8fffeefe45"
integrity sha512-fxdRfQUxPrL/TizyfYfMn09dK58e+d65bRD/fcaVH4052vj30QOzzqxcQIS7B0NsqlypEQ/6Du3QmP2DhWFfCA==
dependencies:
nan "^2.14.2"
node-gyp-build "^4.2.3"
msgpackr@^1.4.6:
version "1.5.4"
resolved "https://registry.yarnpkg.com/msgpackr/-/msgpackr-1.5.4.tgz#2b6ea6cb7d79c0ad98fc76c68163c48eda50cf0d"
integrity sha512-Z7w5Jg+2Q9z9gJxeM68d7tSuWZZGnFIRhZnyqcZCa/1dKkhOCNvR1TUV3zzJ3+vj78vlwKRzUgVDlW4jiSOeDA==
optionalDependencies:
msgpackr-extract "^1.0.14"
multicast-dns-service-types@^1.1.0:
version "1.1.0"
resolved "https://registry.yarnpkg.com/multicast-dns-service-types/-/multicast-dns-service-types-1.1.0.tgz#899f11d9686e5e05cb91b35d5f0e63b773cfc901"
@@ -12405,6 +12495,11 @@ mute-stream@0.0.8, mute-stream@~0.0.4:
resolved "https://registry.yarnpkg.com/mute-stream/-/mute-stream-0.0.8.tgz#1630c42b2251ff81e2a283de96a5497ea92e5e0d"
integrity sha512-nnbWWOkoWyUsTjKrhgD0dcz22mdkSnpYqbEjIm2nhwhuxlSkpywJmBo8h0ZqJdkp73mb90SssHkN4rsRaBAfAA==
nan@^2.14.2:
version "2.15.0"
resolved "https://registry.yarnpkg.com/nan/-/nan-2.15.0.tgz#3f34a473ff18e15c1b5626b62903b5ad6e665fee"
integrity sha512-8ZtvEnA2c5aYCZYd1cvgdnU6cqwixRoYg70xPLWUws5ORTa/lnw+u4amixRS/Ac5U5mQVgp9pnlSUnbNWFaWZQ==
nanoid@^3.1.30:
version "3.2.0"
resolved "https://registry.yarnpkg.com/nanoid/-/nanoid-3.2.0.tgz#62667522da6673971cca916a6d3eff3f415ff80c"
@@ -12498,6 +12593,11 @@ node-forge@^1.0.0, node-forge@^1.2.0:
resolved "https://registry.yarnpkg.com/node-forge/-/node-forge-1.2.1.tgz#82794919071ef2eb5c509293325cec8afd0fd53c"
integrity sha512-Fcvtbb+zBcZXbTTVwqGA5W+MKBj56UjVRevvchv5XrcyXbmNdesfZL37nlcWOfpgHhgmxApw3tQbTr4CqNmX4w==
node-gyp-build@^4.2.3:
version "4.3.0"
resolved "https://registry.yarnpkg.com/node-gyp-build/-/node-gyp-build-4.3.0.tgz#9f256b03e5826150be39c764bf51e993946d71a3"
integrity sha512-iWjXZvmboq0ja1pUGULQBexmxq8CV4xBhX7VDOTbL7ZR4FOowwY/VOtRxBN/yKxmdGoIp4j5ysNT4u3S2pDQ3Q==
node-gyp@^5.0.2:
version "5.1.1"
resolved "https://registry.yarnpkg.com/node-gyp/-/node-gyp-5.1.1.tgz#eb915f7b631c937d282e33aed44cb7a025f62a3e"
@@ -13178,6 +13278,11 @@ p-map-series@^2.1.0:
resolved "https://registry.yarnpkg.com/p-map-series/-/p-map-series-2.1.0.tgz#7560d4c452d9da0c07e692fdbfe6e2c81a2a91f2"
integrity sha512-RpYIIK1zXSNEOdwxcfe7FdvGcs7+y5n8rifMhMNWvaxRNMPINJHF5GDeuVxWqnfrcHPSCnp7Oo5yNXHId9Av2Q==
p-map@^2.1.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/p-map/-/p-map-2.1.0.tgz#310928feef9c9ecc65b68b17693018a665cea175"
integrity sha512-y3b8Kpd8OAN444hxfBbFfj1FY/RjtTd8tzYwhUqNYXx0fXx2iX4maP4Qr6qhIKbQXI02wTLAda4fYUbDagTUFw==
p-map@^4.0.0:
version "4.0.0"
resolved "https://registry.yarnpkg.com/p-map/-/p-map-4.0.0.tgz#bb2f95a5eda2ec168ec9274e06a747c3e2904d2b"
@@ -15173,6 +15278,23 @@ redeyed@~2.1.0:
dependencies:
esprima "~4.0.0"
redis-commands@1.7.0:
version "1.7.0"
resolved "https://registry.yarnpkg.com/redis-commands/-/redis-commands-1.7.0.tgz#15a6fea2d58281e27b1cd1acfb4b293e278c3a89"
integrity sha512-nJWqw3bTFy21hX/CPKHth6sfhZbdiHP6bTawSgQBlKOVRG7EZkfHbbHwQJnrE4vsQf0CMNE+3gJ4Fmm16vdVlQ==
redis-errors@^1.0.0, redis-errors@^1.2.0:
version "1.2.0"
resolved "https://registry.yarnpkg.com/redis-errors/-/redis-errors-1.2.0.tgz#eb62d2adb15e4eaf4610c04afe1529384250abad"
integrity sha1-62LSrbFeTq9GEMBK/hUpOEJQq60=
redis-parser@^3.0.0:
version "3.0.0"
resolved "https://registry.yarnpkg.com/redis-parser/-/redis-parser-3.0.0.tgz#b66d828cdcafe6b4b8a428a7def4c6bcac31c8b4"
integrity sha1-tm2CjNyv5rS4pCin3vTGvKwxyLQ=
dependencies:
redis-errors "^1.0.0"
regenerate-unicode-properties@^9.0.0:
version "9.0.0"
resolved "https://registry.yarnpkg.com/regenerate-unicode-properties/-/regenerate-unicode-properties-9.0.0.tgz#54d09c7115e1f53dc2314a974b32c1c344efe326"
@@ -16362,6 +16484,11 @@ stackframe@^1.1.1:
resolved "https://registry.yarnpkg.com/stackframe/-/stackframe-1.2.0.tgz#52429492d63c62eb989804c11552e3d22e779303"
integrity sha512-GrdeshiRmS1YLMYgzF16olf2jJ/IzxXY9lhKOskuVziubpTYcYqyOwYeJKzQkwy7uN0fYSsbsC4RQaXf9LCrYA==
standard-as-callback@^2.1.0:
version "2.1.0"
resolved "https://registry.yarnpkg.com/standard-as-callback/-/standard-as-callback-2.1.0.tgz#8953fc05359868a77b5b9739a665c5977bb7df45"
integrity sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==
state-toggle@^1.0.0:
version "1.0.3"
resolved "https://registry.yarnpkg.com/state-toggle/-/state-toggle-1.0.3.tgz#e123b16a88e143139b09c6852221bc9815917dfe"
@@ -17201,7 +17328,7 @@ tsconfig-paths@^3.12.0:
minimist "^1.2.0"
strip-bom "^3.0.0"
tslib@^1.8.1, tslib@^1.9.0:
tslib@^1.14.1, tslib@^1.8.1, tslib@^1.9.0:
version "1.14.1"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-1.14.1.tgz#cf2d38bdc34a134bcaf1091c41f6619e2f672d00"
integrity sha512-Xni35NKzjgMrwevysHTCArtLDpPvye8zV/0E4EyYn43P7/7qvQwPh9BGkHewbMulVntbigmcT7rdX3BNo9wRJg==