Compare commits
1 Commits
Author | SHA1 | Date | |
---|---|---|---|
![]() |
e7df19ae17 |
@@ -1,52 +0,0 @@
|
|||||||
name: release-tag
|
|
||||||
on:
|
|
||||||
push:
|
|
||||||
branches:
|
|
||||||
- 'main'
|
|
||||||
jobs:
|
|
||||||
release-image:
|
|
||||||
runs-on: ubuntu-latest
|
|
||||||
env:
|
|
||||||
DOCKER_ORG: groot
|
|
||||||
DOCKER_LATEST: latest
|
|
||||||
RUNNER_TOOL_CACHE: /toolcache
|
|
||||||
steps:
|
|
||||||
- name: Checkout
|
|
||||||
uses: actions/checkout@v3
|
|
||||||
|
|
||||||
- name: Set up QEMU
|
|
||||||
uses: docker/setup-qemu-action@v2
|
|
||||||
|
|
||||||
- name: Set up Docker BuildX
|
|
||||||
uses: docker/setup-buildx-action@v2
|
|
||||||
with: # replace it with your local IP
|
|
||||||
config-inline: |
|
|
||||||
[registry."git.send.nrw"]
|
|
||||||
http = true
|
|
||||||
insecure = true
|
|
||||||
|
|
||||||
- name: Login to DockerHub
|
|
||||||
uses: docker/login-action@v2
|
|
||||||
with:
|
|
||||||
registry: git.send.nrw # replace it with your local IP
|
|
||||||
username: ${{ secrets.DOCKER_USERNAME }}
|
|
||||||
password: ${{ secrets.DOCKER_PASSWORD }}
|
|
||||||
|
|
||||||
- name: Get Meta
|
|
||||||
id: meta
|
|
||||||
run: |
|
|
||||||
echo REPO_NAME=$(echo ${GITHUB_REPOSITORY} | awk -F"/" '{print $2}') >> $GITHUB_OUTPUT
|
|
||||||
echo REPO_VERSION=$(git describe --tags --always | sed 's/^v//') >> $GITHUB_OUTPUT
|
|
||||||
|
|
||||||
- name: Build and push
|
|
||||||
uses: docker/build-push-action@v4
|
|
||||||
with:
|
|
||||||
context: ./docker
|
|
||||||
file: ./docker/Dockerfile.compose
|
|
||||||
entrypoint: ./docker/compose-entrypoint.sh
|
|
||||||
platforms: |
|
|
||||||
linux/amd64
|
|
||||||
push: true
|
|
||||||
tags: | # replace it with your local IP and tags
|
|
||||||
git.send.nrw/${{ env.DOCKER_ORG }}/${{ steps.meta.outputs.REPO_NAME }}:${{ steps.meta.outputs.REPO_VERSION }}
|
|
||||||
git.send.nrw/${{ env.DOCKER_ORG }}/${{ steps.meta.outputs.REPO_NAME }}:${{ env.DOCKER_LATEST }}
|
|
@@ -1,7 +1,10 @@
|
|||||||
version: '3.9'
|
version: '3.9'
|
||||||
services:
|
services:
|
||||||
main:
|
main:
|
||||||
image: git.send.nrw/groot/automatisch:latest
|
build:
|
||||||
|
context: ./docker
|
||||||
|
dockerfile: Dockerfile.compose
|
||||||
|
entrypoint: /compose-entrypoint.sh
|
||||||
ports:
|
ports:
|
||||||
- '3000:3000'
|
- '3000:3000'
|
||||||
depends_on:
|
depends_on:
|
||||||
@@ -25,7 +28,10 @@ services:
|
|||||||
volumes:
|
volumes:
|
||||||
- automatisch_storage:/automatisch/storage
|
- automatisch_storage:/automatisch/storage
|
||||||
worker:
|
worker:
|
||||||
image: git.send.nrw/groot/automatisch:latest
|
build:
|
||||||
|
context: ./docker
|
||||||
|
dockerfile: Dockerfile.compose
|
||||||
|
entrypoint: /compose-entrypoint.sh
|
||||||
depends_on:
|
depends_on:
|
||||||
- main
|
- main
|
||||||
environment:
|
environment:
|
||||||
|
@@ -13,7 +13,7 @@ if (appConfig.redisSentinelHost) {
|
|||||||
{
|
{
|
||||||
host: appConfig.redisSentinelHost,
|
host: appConfig.redisSentinelHost,
|
||||||
port: appConfig.redisSentinelPort,
|
port: appConfig.redisSentinelPort,
|
||||||
}
|
},
|
||||||
];
|
];
|
||||||
|
|
||||||
redisConfig.sentinelUsername = appConfig.redisSentinelUsername;
|
redisConfig.sentinelUsername = appConfig.redisSentinelUsername;
|
||||||
|
@@ -87,14 +87,14 @@ describe('GET /api/v1/apps/:appKey/connections', () => {
|
|||||||
|
|
||||||
it('should return not found response for invalid connection UUID', async () => {
|
it('should return not found response for invalid connection UUID', async () => {
|
||||||
await createPermission({
|
await createPermission({
|
||||||
action: 'read',
|
action: 'update',
|
||||||
subject: 'Connection',
|
subject: 'Connection',
|
||||||
roleId: currentUserRole.id,
|
roleId: currentUserRole.id,
|
||||||
conditions: ['isCreator'],
|
conditions: ['isCreator'],
|
||||||
});
|
});
|
||||||
|
|
||||||
await request(app)
|
await request(app)
|
||||||
.get('/api/v1/apps/invalid-connection-id/connections')
|
.get('/api/v1/connections/invalid-connection-id/connections')
|
||||||
.set('Authorization', token)
|
.set('Authorization', token)
|
||||||
.expect(404);
|
.expect(404);
|
||||||
});
|
});
|
||||||
|
@@ -193,7 +193,7 @@ describe('POST /api/v1/steps/:stepId/dynamic-data', () => {
|
|||||||
const notExistingStepUUID = Crypto.randomUUID();
|
const notExistingStepUUID = Crypto.randomUUID();
|
||||||
|
|
||||||
await request(app)
|
await request(app)
|
||||||
.post(`/api/v1/steps/${notExistingStepUUID}/dynamic-data`)
|
.get(`/api/v1/steps/${notExistingStepUUID}/dynamic-data`)
|
||||||
.set('Authorization', token)
|
.set('Authorization', token)
|
||||||
.expect(404);
|
.expect(404);
|
||||||
});
|
});
|
||||||
@@ -216,7 +216,7 @@ describe('POST /api/v1/steps/:stepId/dynamic-data', () => {
|
|||||||
const step = await createStep({ appKey: null });
|
const step = await createStep({ appKey: null });
|
||||||
|
|
||||||
await request(app)
|
await request(app)
|
||||||
.post(`/api/v1/steps/${step.id}/dynamic-data`)
|
.get(`/api/v1/steps/${step.id}/dynamic-data`)
|
||||||
.set('Authorization', token)
|
.set('Authorization', token)
|
||||||
.expect(404);
|
.expect(404);
|
||||||
});
|
});
|
||||||
|
@@ -118,7 +118,7 @@ describe('POST /api/v1/steps/:stepId/dynamic-fields', () => {
|
|||||||
const notExistingStepUUID = Crypto.randomUUID();
|
const notExistingStepUUID = Crypto.randomUUID();
|
||||||
|
|
||||||
await request(app)
|
await request(app)
|
||||||
.post(`/api/v1/steps/${notExistingStepUUID}/dynamic-fields`)
|
.get(`/api/v1/steps/${notExistingStepUUID}/dynamic-fields`)
|
||||||
.set('Authorization', token)
|
.set('Authorization', token)
|
||||||
.expect(404);
|
.expect(404);
|
||||||
});
|
});
|
||||||
@@ -138,11 +138,10 @@ describe('POST /api/v1/steps/:stepId/dynamic-fields', () => {
|
|||||||
conditions: [],
|
conditions: [],
|
||||||
});
|
});
|
||||||
|
|
||||||
const step = await createStep();
|
const step = await createStep({ appKey: null });
|
||||||
await step.$query().patch({ appKey: null });
|
|
||||||
|
|
||||||
await request(app)
|
await request(app)
|
||||||
.post(`/api/v1/steps/${step.id}/dynamic-fields`)
|
.get(`/api/v1/steps/${step.id}/dynamic-fields`)
|
||||||
.set('Authorization', token)
|
.set('Authorization', token)
|
||||||
.expect(404);
|
.expect(404);
|
||||||
});
|
});
|
||||||
|
@@ -12,7 +12,7 @@ import appConfig from '../config/app.js';
|
|||||||
const serverAdapter = new ExpressAdapter();
|
const serverAdapter = new ExpressAdapter();
|
||||||
|
|
||||||
const queues = [
|
const queues = [
|
||||||
new BullMQAdapter(flowQueue),
|
new BullMQAdapter(flowQueue.queue),
|
||||||
new BullMQAdapter(triggerQueue),
|
new BullMQAdapter(triggerQueue),
|
||||||
new BullMQAdapter(actionQueue),
|
new BullMQAdapter(actionQueue),
|
||||||
new BullMQAdapter(emailQueue),
|
new BullMQAdapter(emailQueue),
|
||||||
|
@@ -386,10 +386,7 @@ class Flow extends Base {
|
|||||||
}
|
}
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
const repeatableJobs = await flowQueue.getRepeatableJobs();
|
await flowQueue.removeRepeatableJobById(this.id);
|
||||||
const job = repeatableJobs.find((job) => job.id === this.id);
|
|
||||||
|
|
||||||
await flowQueue.removeRepeatableByKey(job.key);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@@ -305,14 +305,8 @@ class User extends Base {
|
|||||||
active: true,
|
active: true,
|
||||||
});
|
});
|
||||||
|
|
||||||
const repeatableJobs = await flowQueue.getRepeatableJobs();
|
|
||||||
|
|
||||||
for (const flow of flows) {
|
for (const flow of flows) {
|
||||||
const job = repeatableJobs.find((job) => job.id === flow.id);
|
await flowQueue.removeRepeatableJobById(flow.id);
|
||||||
|
|
||||||
if (job) {
|
|
||||||
await flowQueue.removeRepeatableByKey(job.key);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
const executionIds = (
|
const executionIds = (
|
||||||
|
@@ -996,9 +996,21 @@ describe('User model', () => {
|
|||||||
|
|
||||||
const user = await createUser();
|
const user = await createUser();
|
||||||
|
|
||||||
|
const presentDate = DateTime.fromObject(
|
||||||
|
{ year: 2024, month: 11, day: 17, hour: 11, minute: 30 },
|
||||||
|
{ zone: 'UTC+0' }
|
||||||
|
);
|
||||||
|
|
||||||
|
vi.setSystemTime(presentDate);
|
||||||
|
|
||||||
await user.startTrialPeriod();
|
await user.startTrialPeriod();
|
||||||
|
|
||||||
vi.setSystemTime(DateTime.now().plus({ month: 1 }));
|
const futureDate = DateTime.fromObject(
|
||||||
|
{ year: 2025, month: 1, day: 1 },
|
||||||
|
{ zone: 'UTC+0' }
|
||||||
|
);
|
||||||
|
|
||||||
|
vi.setSystemTime(futureDate);
|
||||||
|
|
||||||
const refetchedUser = await user.$query();
|
const refetchedUser = await user.$query();
|
||||||
|
|
||||||
@@ -1106,9 +1118,7 @@ describe('User model', () => {
|
|||||||
|
|
||||||
const user = await createUser();
|
const user = await createUser();
|
||||||
|
|
||||||
await expect(() => user.getPlanAndUsage()).rejects.toThrow(
|
expect(() => user.getPlanAndUsage()).rejects.toThrow('NotFoundError');
|
||||||
'NotFoundError'
|
|
||||||
);
|
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
@@ -1179,7 +1189,7 @@ describe('User model', () => {
|
|||||||
});
|
});
|
||||||
|
|
||||||
it('should throw not found error when user role does not exist', async () => {
|
it('should throw not found error when user role does not exist', async () => {
|
||||||
await expect(() =>
|
expect(() =>
|
||||||
User.registerUser({
|
User.registerUser({
|
||||||
fullName: 'Sample user',
|
fullName: 'Sample user',
|
||||||
email: 'user@automatisch.io',
|
email: 'user@automatisch.io',
|
||||||
@@ -1247,8 +1257,6 @@ describe('User model', () => {
|
|||||||
|
|
||||||
describe('createUsageData', () => {
|
describe('createUsageData', () => {
|
||||||
it('should create usage data if Automatisch is a cloud installation', async () => {
|
it('should create usage data if Automatisch is a cloud installation', async () => {
|
||||||
vi.useFakeTimers();
|
|
||||||
|
|
||||||
vi.spyOn(appConfig, 'isCloud', 'get').mockReturnValue(true);
|
vi.spyOn(appConfig, 'isCloud', 'get').mockReturnValue(true);
|
||||||
|
|
||||||
const user = await createUser({
|
const user = await createUser({
|
||||||
@@ -1256,14 +1264,10 @@ describe('User model', () => {
|
|||||||
email: 'user@automatisch.io',
|
email: 'user@automatisch.io',
|
||||||
});
|
});
|
||||||
|
|
||||||
vi.setSystemTime(DateTime.now().plus({ month: 1 }));
|
|
||||||
|
|
||||||
const usageData = await user.createUsageData();
|
const usageData = await user.createUsageData();
|
||||||
const currentUsageData = await user.$relatedQuery('currentUsageData');
|
const currentUsageData = await user.$relatedQuery('currentUsageData');
|
||||||
|
|
||||||
expect(usageData).toStrictEqual(currentUsageData);
|
expect(usageData).toStrictEqual(currentUsageData);
|
||||||
|
|
||||||
vi.useRealTimers();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
it('should not create usage data if Automatisch is not a cloud installation', async () => {
|
it('should not create usage data if Automatisch is not a cloud installation', async () => {
|
||||||
|
@@ -1,4 +1,31 @@
|
|||||||
import { generateQueue } from './queue.js';
|
import process from 'process';
|
||||||
|
import { Queue } from 'bullmq';
|
||||||
|
import redisConfig from '../config/redis.js';
|
||||||
|
import logger from '../helpers/logger.js';
|
||||||
|
|
||||||
|
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
||||||
|
|
||||||
|
const redisConnection = {
|
||||||
|
connection: redisConfig,
|
||||||
|
};
|
||||||
|
|
||||||
|
const actionQueue = new Queue('action', redisConnection);
|
||||||
|
|
||||||
|
process.on('SIGTERM', async () => {
|
||||||
|
await actionQueue.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
actionQueue.on('error', (error) => {
|
||||||
|
if (error.code === CONNECTION_REFUSED) {
|
||||||
|
logger.error(
|
||||||
|
'Make sure you have installed Redis and it is running.',
|
||||||
|
error
|
||||||
|
);
|
||||||
|
|
||||||
|
process.exit();
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.error('Error happened in action queue!', error);
|
||||||
|
});
|
||||||
|
|
||||||
const actionQueue = generateQueue('action');
|
|
||||||
export default actionQueue;
|
export default actionQueue;
|
||||||
|
30
packages/backend/src/queues/base.js
Normal file
30
packages/backend/src/queues/base.js
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
/* eslint-disable no-unused-vars */
|
||||||
|
|
||||||
|
class BaseQueue {
|
||||||
|
constructor(name) {
|
||||||
|
if (new.target === BaseQueue) {
|
||||||
|
throw new Error('Cannot instantiate abstract class BaseQueue directly.');
|
||||||
|
}
|
||||||
|
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Abstract methods to be implemented by subclasses
|
||||||
|
async add(jobName, data, options) {
|
||||||
|
throw new Error('Method "add" must be implemented.');
|
||||||
|
}
|
||||||
|
|
||||||
|
async remove(jobId) {
|
||||||
|
throw new Error('Method "remove" must be implemented.');
|
||||||
|
}
|
||||||
|
|
||||||
|
async getRepeatableJobs() {
|
||||||
|
throw new Error('Method "getRepeatableJobs" must be implemented.');
|
||||||
|
}
|
||||||
|
|
||||||
|
async removeRepeatableJobByKey(jobKey) {
|
||||||
|
throw new Error('Method "removeRepeatableJobByKey" must be implemented.');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export default BaseQueue;
|
154
packages/backend/src/queues/bullmq.js
Normal file
154
packages/backend/src/queues/bullmq.js
Normal file
@@ -0,0 +1,154 @@
|
|||||||
|
import { Queue, Worker } from 'bullmq';
|
||||||
|
import redisConfig from '../config/redis.js';
|
||||||
|
import logger from '../helpers/logger.js';
|
||||||
|
import BaseQueue from './base.js';
|
||||||
|
|
||||||
|
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
||||||
|
|
||||||
|
class BullMQQueue extends BaseQueue {
|
||||||
|
static queueOptions = {
|
||||||
|
connection: redisConfig,
|
||||||
|
};
|
||||||
|
|
||||||
|
constructor(name) {
|
||||||
|
super(name);
|
||||||
|
|
||||||
|
this.queue = new Queue(name, this.constructor.queueOptions);
|
||||||
|
this.workers = [];
|
||||||
|
|
||||||
|
this.setupErrorHandlers();
|
||||||
|
this.setupGracefulShutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
async add(jobName, data, options = {}) {
|
||||||
|
try {
|
||||||
|
const job = await this.queue.add(jobName, data, options);
|
||||||
|
|
||||||
|
return job;
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`Failed to add job to queue "${this.name}":`, error);
|
||||||
|
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async remove(jobId) {
|
||||||
|
try {
|
||||||
|
const job = await this.getJob(jobId);
|
||||||
|
|
||||||
|
if (job) {
|
||||||
|
await job.remove();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`Failed to remove job from queue "${this.name}":`, error);
|
||||||
|
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async getJob(jobId) {
|
||||||
|
return await this.queue.getJob(jobId);
|
||||||
|
}
|
||||||
|
|
||||||
|
async getRepeatableJobById(jobId) {
|
||||||
|
const repeatableJobs = await this.getRepeatableJobs();
|
||||||
|
const job = repeatableJobs.find((job) => job.id === jobId);
|
||||||
|
|
||||||
|
return job;
|
||||||
|
}
|
||||||
|
|
||||||
|
async getRepeatableJobs() {
|
||||||
|
try {
|
||||||
|
return await this.queue.getRepeatableJobs();
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
`Failed to get repeatable jobs from queue "${this.name}":`,
|
||||||
|
error
|
||||||
|
);
|
||||||
|
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async removeRepeatableJobByKey(jobKey) {
|
||||||
|
try {
|
||||||
|
await this.queue.removeRepeatableByKey(jobKey);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
`Failed to remove repeatable job from queue "${this.name}":`,
|
||||||
|
error
|
||||||
|
);
|
||||||
|
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async removeRepeatableJobById(jobId) {
|
||||||
|
const job = await this.getRepeatableJobById(jobId);
|
||||||
|
|
||||||
|
return await this.removeRepeatableJobByKey(job.key);
|
||||||
|
}
|
||||||
|
|
||||||
|
startWorker(processor, workerOptions = {}) {
|
||||||
|
const worker = new Worker(this.name, processor, {
|
||||||
|
...this.queueOptions,
|
||||||
|
...workerOptions,
|
||||||
|
});
|
||||||
|
|
||||||
|
worker.on('error', (error) => {
|
||||||
|
logger.error(`Worker error in queue "${this.name}":`, error);
|
||||||
|
});
|
||||||
|
|
||||||
|
this.workers.push(worker);
|
||||||
|
|
||||||
|
return worker;
|
||||||
|
}
|
||||||
|
|
||||||
|
setupErrorHandlers() {
|
||||||
|
this.queue.on('error', (error) => {
|
||||||
|
if (error.code === CONNECTION_REFUSED) {
|
||||||
|
logger.error(
|
||||||
|
'Make sure you have installed Redis and it is running.',
|
||||||
|
error
|
||||||
|
);
|
||||||
|
|
||||||
|
process.exit();
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.error(`Queue error in "${this.name}":`, error);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
setupGracefulShutdown() {
|
||||||
|
const shutdown = async () => {
|
||||||
|
logger.log(`Shutting down queue "${this.name}"...`);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// Close all workers gracefully
|
||||||
|
for (const worker of this.workers) {
|
||||||
|
await worker.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.queue.close();
|
||||||
|
|
||||||
|
logger.log(`Queue "${this.name}" shut down successfully.`);
|
||||||
|
|
||||||
|
process.exit();
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(`Error during shutdown of queue "${this.name}":`, error);
|
||||||
|
|
||||||
|
process.exit(1);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
process.on('SIGTERM', shutdown);
|
||||||
|
process.on('SIGINT', shutdown);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export default BullMQQueue;
|
@@ -1,4 +1,31 @@
|
|||||||
import { generateQueue } from './queue.js';
|
import process from 'process';
|
||||||
|
import { Queue } from 'bullmq';
|
||||||
|
import redisConfig from '../config/redis.js';
|
||||||
|
import logger from '../helpers/logger.js';
|
||||||
|
|
||||||
|
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
||||||
|
|
||||||
|
const redisConnection = {
|
||||||
|
connection: redisConfig,
|
||||||
|
};
|
||||||
|
|
||||||
|
const deleteUserQueue = new Queue('delete-user', redisConnection);
|
||||||
|
|
||||||
|
process.on('SIGTERM', async () => {
|
||||||
|
await deleteUserQueue.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
deleteUserQueue.on('error', (error) => {
|
||||||
|
if (error.code === CONNECTION_REFUSED) {
|
||||||
|
logger.error(
|
||||||
|
'Make sure you have installed Redis and it is running.',
|
||||||
|
error
|
||||||
|
);
|
||||||
|
|
||||||
|
process.exit();
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.error('Error happened in delete user queue!', error);
|
||||||
|
});
|
||||||
|
|
||||||
const deleteUserQueue = generateQueue('delete-user');
|
|
||||||
export default deleteUserQueue;
|
export default deleteUserQueue;
|
||||||
|
@@ -1,4 +1,31 @@
|
|||||||
import { generateQueue } from './queue.js';
|
import process from 'process';
|
||||||
|
import { Queue } from 'bullmq';
|
||||||
|
import redisConfig from '../config/redis.js';
|
||||||
|
import logger from '../helpers/logger.js';
|
||||||
|
|
||||||
|
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
||||||
|
|
||||||
|
const redisConnection = {
|
||||||
|
connection: redisConfig,
|
||||||
|
};
|
||||||
|
|
||||||
|
const emailQueue = new Queue('email', redisConnection);
|
||||||
|
|
||||||
|
process.on('SIGTERM', async () => {
|
||||||
|
await emailQueue.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
emailQueue.on('error', (error) => {
|
||||||
|
if (error.code === CONNECTION_REFUSED) {
|
||||||
|
logger.error(
|
||||||
|
'Make sure you have installed Redis and it is running.',
|
||||||
|
error
|
||||||
|
);
|
||||||
|
|
||||||
|
process.exit();
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.error('Error happened in email queue!', error);
|
||||||
|
});
|
||||||
|
|
||||||
const emailQueue = generateQueue('email');
|
|
||||||
export default emailQueue;
|
export default emailQueue;
|
||||||
|
@@ -1,4 +1,5 @@
|
|||||||
import { generateQueue } from './queue.js';
|
import BullMQQueue from './bullmq.js';
|
||||||
|
|
||||||
|
const flowQueue = new BullMQQueue('flow');
|
||||||
|
|
||||||
const flowQueue = generateQueue('flow');
|
|
||||||
export default flowQueue;
|
export default flowQueue;
|
||||||
|
@@ -1,21 +0,0 @@
|
|||||||
import appConfig from '../config/app.js';
|
|
||||||
import actionQueue from './action.js';
|
|
||||||
import emailQueue from './email.js';
|
|
||||||
import flowQueue from './flow.js';
|
|
||||||
import triggerQueue from './trigger.js';
|
|
||||||
import deleteUserQueue from './delete-user.ee.js';
|
|
||||||
import removeCancelledSubscriptionsQueue from './remove-cancelled-subscriptions.ee.js';
|
|
||||||
|
|
||||||
const queues = [
|
|
||||||
actionQueue,
|
|
||||||
emailQueue,
|
|
||||||
flowQueue,
|
|
||||||
triggerQueue,
|
|
||||||
deleteUserQueue,
|
|
||||||
];
|
|
||||||
|
|
||||||
if (appConfig.isCloud) {
|
|
||||||
queues.push(removeCancelledSubscriptionsQueue);
|
|
||||||
}
|
|
||||||
|
|
||||||
export default queues;
|
|
@@ -1,44 +0,0 @@
|
|||||||
import process from 'process';
|
|
||||||
import { Queue } from 'bullmq';
|
|
||||||
import redisConfig from '../config/redis.js';
|
|
||||||
import logger from '../helpers/logger.js';
|
|
||||||
|
|
||||||
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
|
||||||
|
|
||||||
const redisConnection = {
|
|
||||||
connection: redisConfig,
|
|
||||||
};
|
|
||||||
|
|
||||||
export const generateQueue = (queueName, options) => {
|
|
||||||
const queue = new Queue(queueName, redisConnection);
|
|
||||||
|
|
||||||
queue.on('error', (error) => queueOnError(error, queueName));
|
|
||||||
|
|
||||||
if (options?.runDaily) addScheduler(queueName, queue);
|
|
||||||
|
|
||||||
return queue;
|
|
||||||
};
|
|
||||||
|
|
||||||
const queueOnError = (error, queueName) => {
|
|
||||||
if (error.code === CONNECTION_REFUSED) {
|
|
||||||
const errorMessage =
|
|
||||||
'Make sure you have installed Redis and it is running.';
|
|
||||||
|
|
||||||
logger.error(errorMessage, error);
|
|
||||||
|
|
||||||
process.exit();
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.error(`Error happened in ${queueName} queue!`, error);
|
|
||||||
};
|
|
||||||
|
|
||||||
const addScheduler = (queueName, queue) => {
|
|
||||||
const everydayAtOneOclock = '0 1 * * *';
|
|
||||||
|
|
||||||
queue.add(queueName, null, {
|
|
||||||
jobId: queueName,
|
|
||||||
repeat: {
|
|
||||||
pattern: everydayAtOneOclock,
|
|
||||||
},
|
|
||||||
});
|
|
||||||
};
|
|
@@ -1,8 +1,44 @@
|
|||||||
import { generateQueue } from './queue.js';
|
import process from 'process';
|
||||||
|
import { Queue } from 'bullmq';
|
||||||
|
import redisConfig from '../config/redis.js';
|
||||||
|
import logger from '../helpers/logger.js';
|
||||||
|
|
||||||
const removeCancelledSubscriptionsQueue = generateQueue(
|
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
||||||
|
|
||||||
|
const redisConnection = {
|
||||||
|
connection: redisConfig,
|
||||||
|
};
|
||||||
|
|
||||||
|
const removeCancelledSubscriptionsQueue = new Queue(
|
||||||
'remove-cancelled-subscriptions',
|
'remove-cancelled-subscriptions',
|
||||||
{ runDaily: true }
|
redisConnection
|
||||||
);
|
);
|
||||||
|
|
||||||
|
process.on('SIGTERM', async () => {
|
||||||
|
await removeCancelledSubscriptionsQueue.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
removeCancelledSubscriptionsQueue.on('error', (error) => {
|
||||||
|
if (error.code === CONNECTION_REFUSED) {
|
||||||
|
logger.error(
|
||||||
|
'Make sure you have installed Redis and it is running.',
|
||||||
|
error
|
||||||
|
);
|
||||||
|
|
||||||
|
process.exit();
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.error(
|
||||||
|
'Error happened in remove cancelled subscriptions queue!',
|
||||||
|
error
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
removeCancelledSubscriptionsQueue.add('remove-cancelled-subscriptions', null, {
|
||||||
|
jobId: 'remove-cancelled-subscriptions',
|
||||||
|
repeat: {
|
||||||
|
pattern: '0 1 * * *',
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
export default removeCancelledSubscriptionsQueue;
|
export default removeCancelledSubscriptionsQueue;
|
||||||
|
@@ -1,4 +1,31 @@
|
|||||||
import { generateQueue } from './queue.js';
|
import process from 'process';
|
||||||
|
import { Queue } from 'bullmq';
|
||||||
|
import redisConfig from '../config/redis.js';
|
||||||
|
import logger from '../helpers/logger.js';
|
||||||
|
|
||||||
|
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
||||||
|
|
||||||
|
const redisConnection = {
|
||||||
|
connection: redisConfig,
|
||||||
|
};
|
||||||
|
|
||||||
|
const triggerQueue = new Queue('trigger', redisConnection);
|
||||||
|
|
||||||
|
process.on('SIGTERM', async () => {
|
||||||
|
await triggerQueue.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
triggerQueue.on('error', (error) => {
|
||||||
|
if (error.code === CONNECTION_REFUSED) {
|
||||||
|
logger.error(
|
||||||
|
'Make sure you have installed Redis and it is running.',
|
||||||
|
error
|
||||||
|
);
|
||||||
|
|
||||||
|
process.exit();
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.error('Error happened in trigger queue!', error);
|
||||||
|
});
|
||||||
|
|
||||||
const triggerQueue = generateQueue('trigger');
|
|
||||||
export default triggerQueue;
|
export default triggerQueue;
|
||||||
|
@@ -1,22 +1,20 @@
|
|||||||
import * as Sentry from './helpers/sentry.ee.js';
|
import * as Sentry from './helpers/sentry.ee.js';
|
||||||
import process from 'node:process';
|
import appConfig from './config/app.js';
|
||||||
|
|
||||||
Sentry.init();
|
Sentry.init();
|
||||||
|
|
||||||
import './config/orm.js';
|
import './config/orm.js';
|
||||||
import './helpers/check-worker-readiness.js';
|
import './helpers/check-worker-readiness.js';
|
||||||
import queues from './queues/index.js';
|
import './workers/flow.js';
|
||||||
import workers from './workers/index.js';
|
import './workers/trigger.js';
|
||||||
|
import './workers/action.js';
|
||||||
|
import './workers/email.js';
|
||||||
|
import './workers/delete-user.ee.js';
|
||||||
|
|
||||||
process.on('SIGTERM', async () => {
|
if (appConfig.isCloud) {
|
||||||
for (const queue of queues) {
|
import('./workers/remove-cancelled-subscriptions.ee.js');
|
||||||
await queue.close();
|
import('./queues/remove-cancelled-subscriptions.ee.js');
|
||||||
}
|
}
|
||||||
|
|
||||||
for (const worker of workers) {
|
|
||||||
await worker.close();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
import telemetry from './helpers/telemetry/index.js';
|
import telemetry from './helpers/telemetry/index.js';
|
||||||
|
|
||||||
|
@@ -1,4 +1,5 @@
|
|||||||
import { Worker } from 'bullmq';
|
import { Worker } from 'bullmq';
|
||||||
|
import process from 'node:process';
|
||||||
|
|
||||||
import * as Sentry from '../helpers/sentry.ee.js';
|
import * as Sentry from '../helpers/sentry.ee.js';
|
||||||
import redisConfig from '../config/redis.js';
|
import redisConfig from '../config/redis.js';
|
||||||
@@ -14,7 +15,7 @@ import delayAsMilliseconds from '../helpers/delay-as-milliseconds.js';
|
|||||||
|
|
||||||
const DEFAULT_DELAY_DURATION = 0;
|
const DEFAULT_DELAY_DURATION = 0;
|
||||||
|
|
||||||
const actionWorker = new Worker(
|
export const worker = new Worker(
|
||||||
'action',
|
'action',
|
||||||
async (job) => {
|
async (job) => {
|
||||||
const { stepId, flowId, executionId, computedParameters, executionStep } =
|
const { stepId, flowId, executionId, computedParameters, executionStep } =
|
||||||
@@ -54,11 +55,11 @@ const actionWorker = new Worker(
|
|||||||
{ connection: redisConfig }
|
{ connection: redisConfig }
|
||||||
);
|
);
|
||||||
|
|
||||||
actionWorker.on('completed', (job) => {
|
worker.on('completed', (job) => {
|
||||||
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
|
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
|
||||||
});
|
});
|
||||||
|
|
||||||
actionWorker.on('failed', (job, err) => {
|
worker.on('failed', (job, err) => {
|
||||||
const errorMessage = `
|
const errorMessage = `
|
||||||
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}
|
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}
|
||||||
\n ${err.stack}
|
\n ${err.stack}
|
||||||
@@ -73,4 +74,6 @@ actionWorker.on('failed', (job, err) => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
export default actionWorker;
|
process.on('SIGTERM', async () => {
|
||||||
|
await worker.close();
|
||||||
|
});
|
||||||
|
@@ -1,4 +1,5 @@
|
|||||||
import { Worker } from 'bullmq';
|
import { Worker } from 'bullmq';
|
||||||
|
import process from 'node:process';
|
||||||
|
|
||||||
import * as Sentry from '../helpers/sentry.ee.js';
|
import * as Sentry from '../helpers/sentry.ee.js';
|
||||||
import redisConfig from '../config/redis.js';
|
import redisConfig from '../config/redis.js';
|
||||||
@@ -7,7 +8,7 @@ import appConfig from '../config/app.js';
|
|||||||
import User from '../models/user.js';
|
import User from '../models/user.js';
|
||||||
import ExecutionStep from '../models/execution-step.js';
|
import ExecutionStep from '../models/execution-step.js';
|
||||||
|
|
||||||
const deleteUserWorker = new Worker(
|
export const worker = new Worker(
|
||||||
'delete-user',
|
'delete-user',
|
||||||
async (job) => {
|
async (job) => {
|
||||||
const { id } = job.data;
|
const { id } = job.data;
|
||||||
@@ -45,13 +46,13 @@ const deleteUserWorker = new Worker(
|
|||||||
{ connection: redisConfig }
|
{ connection: redisConfig }
|
||||||
);
|
);
|
||||||
|
|
||||||
deleteUserWorker.on('completed', (job) => {
|
worker.on('completed', (job) => {
|
||||||
logger.info(
|
logger.info(
|
||||||
`JOB ID: ${job.id} - The user with the ID of '${job.data.id}' has been deleted!`
|
`JOB ID: ${job.id} - The user with the ID of '${job.data.id}' has been deleted!`
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
deleteUserWorker.on('failed', (job, err) => {
|
worker.on('failed', (job, err) => {
|
||||||
const errorMessage = `
|
const errorMessage = `
|
||||||
JOB ID: ${job.id} - The user with the ID of '${job.data.id}' has failed to be deleted! ${err.message}
|
JOB ID: ${job.id} - The user with the ID of '${job.data.id}' has failed to be deleted! ${err.message}
|
||||||
\n ${err.stack}
|
\n ${err.stack}
|
||||||
@@ -66,4 +67,6 @@ deleteUserWorker.on('failed', (job, err) => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
export default deleteUserWorker;
|
process.on('SIGTERM', async () => {
|
||||||
|
await worker.close();
|
||||||
|
});
|
||||||
|
@@ -1,4 +1,5 @@
|
|||||||
import { Worker } from 'bullmq';
|
import { Worker } from 'bullmq';
|
||||||
|
import process from 'node:process';
|
||||||
|
|
||||||
import * as Sentry from '../helpers/sentry.ee.js';
|
import * as Sentry from '../helpers/sentry.ee.js';
|
||||||
import redisConfig from '../config/redis.js';
|
import redisConfig from '../config/redis.js';
|
||||||
@@ -15,7 +16,7 @@ const isAutomatischEmail = (email) => {
|
|||||||
return email.endsWith('@automatisch.io');
|
return email.endsWith('@automatisch.io');
|
||||||
};
|
};
|
||||||
|
|
||||||
const emailWorker = new Worker(
|
export const worker = new Worker(
|
||||||
'email',
|
'email',
|
||||||
async (job) => {
|
async (job) => {
|
||||||
const { email, subject, template, params } = job.data;
|
const { email, subject, template, params } = job.data;
|
||||||
@@ -38,13 +39,13 @@ const emailWorker = new Worker(
|
|||||||
{ connection: redisConfig }
|
{ connection: redisConfig }
|
||||||
);
|
);
|
||||||
|
|
||||||
emailWorker.on('completed', (job) => {
|
worker.on('completed', (job) => {
|
||||||
logger.info(
|
logger.info(
|
||||||
`JOB ID: ${job.id} - ${job.data.subject} email sent to ${job.data.email}!`
|
`JOB ID: ${job.id} - ${job.data.subject} email sent to ${job.data.email}!`
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
emailWorker.on('failed', (job, err) => {
|
worker.on('failed', (job, err) => {
|
||||||
const errorMessage = `
|
const errorMessage = `
|
||||||
JOB ID: ${job.id} - ${job.data.subject} email to ${job.data.email} has failed to send with ${err.message}
|
JOB ID: ${job.id} - ${job.data.subject} email to ${job.data.email} has failed to send with ${err.message}
|
||||||
\n ${err.stack}
|
\n ${err.stack}
|
||||||
@@ -59,4 +60,6 @@ emailWorker.on('failed', (job, err) => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
export default emailWorker;
|
process.on('SIGTERM', async () => {
|
||||||
|
await worker.close();
|
||||||
|
});
|
||||||
|
@@ -1,4 +1,5 @@
|
|||||||
import { Worker } from 'bullmq';
|
import { Worker } from 'bullmq';
|
||||||
|
import process from 'node:process';
|
||||||
|
|
||||||
import * as Sentry from '../helpers/sentry.ee.js';
|
import * as Sentry from '../helpers/sentry.ee.js';
|
||||||
import redisConfig from '../config/redis.js';
|
import redisConfig from '../config/redis.js';
|
||||||
@@ -12,7 +13,7 @@ import {
|
|||||||
REMOVE_AFTER_7_DAYS_OR_50_JOBS,
|
REMOVE_AFTER_7_DAYS_OR_50_JOBS,
|
||||||
} from '../helpers/remove-job-configuration.js';
|
} from '../helpers/remove-job-configuration.js';
|
||||||
|
|
||||||
const flowWorker = new Worker(
|
export const worker = new Worker(
|
||||||
'flow',
|
'flow',
|
||||||
async (job) => {
|
async (job) => {
|
||||||
const { flowId } = job.data;
|
const { flowId } = job.data;
|
||||||
@@ -63,11 +64,11 @@ const flowWorker = new Worker(
|
|||||||
{ connection: redisConfig }
|
{ connection: redisConfig }
|
||||||
);
|
);
|
||||||
|
|
||||||
flowWorker.on('completed', (job) => {
|
worker.on('completed', (job) => {
|
||||||
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
|
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
|
||||||
});
|
});
|
||||||
|
|
||||||
flowWorker.on('failed', async (job, err) => {
|
worker.on('failed', async (job, err) => {
|
||||||
const errorMessage = `
|
const errorMessage = `
|
||||||
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}
|
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}
|
||||||
\n ${err.stack}
|
\n ${err.stack}
|
||||||
@@ -78,7 +79,7 @@ flowWorker.on('failed', async (job, err) => {
|
|||||||
const flow = await Flow.query().findById(job.data.flowId);
|
const flow = await Flow.query().findById(job.data.flowId);
|
||||||
|
|
||||||
if (!flow) {
|
if (!flow) {
|
||||||
await flowQueue.removeRepeatableByKey(job.repeatJobKey);
|
await flowQueue.removeRepeatableJobByKey(job.repeatJobKey);
|
||||||
|
|
||||||
const flowNotFoundErrorMessage = `
|
const flowNotFoundErrorMessage = `
|
||||||
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has been deleted from Redis because flow was not found!
|
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has been deleted from Redis because flow was not found!
|
||||||
@@ -94,4 +95,6 @@ flowWorker.on('failed', async (job, err) => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
export default flowWorker;
|
process.on('SIGTERM', async () => {
|
||||||
|
await worker.close();
|
||||||
|
});
|
||||||
|
@@ -1,21 +0,0 @@
|
|||||||
import appConfig from '../config/app.js';
|
|
||||||
import actionWorker from './action.js';
|
|
||||||
import emailWorker from './email.js';
|
|
||||||
import flowWorker from './flow.js';
|
|
||||||
import triggerWorker from './trigger.js';
|
|
||||||
import deleteUserWorker from './delete-user.ee.js';
|
|
||||||
import removeCancelledSubscriptionsWorker from './remove-cancelled-subscriptions.ee.js';
|
|
||||||
|
|
||||||
const workers = [
|
|
||||||
actionWorker,
|
|
||||||
emailWorker,
|
|
||||||
flowWorker,
|
|
||||||
triggerWorker,
|
|
||||||
deleteUserWorker,
|
|
||||||
];
|
|
||||||
|
|
||||||
if (appConfig.isCloud) {
|
|
||||||
workers.push(removeCancelledSubscriptionsWorker);
|
|
||||||
}
|
|
||||||
|
|
||||||
export default workers;
|
|
@@ -1,11 +1,12 @@
|
|||||||
import { Worker } from 'bullmq';
|
import { Worker } from 'bullmq';
|
||||||
|
import process from 'node:process';
|
||||||
import { DateTime } from 'luxon';
|
import { DateTime } from 'luxon';
|
||||||
import * as Sentry from '../helpers/sentry.ee.js';
|
import * as Sentry from '../helpers/sentry.ee.js';
|
||||||
import redisConfig from '../config/redis.js';
|
import redisConfig from '../config/redis.js';
|
||||||
import logger from '../helpers/logger.js';
|
import logger from '../helpers/logger.js';
|
||||||
import Subscription from '../models/subscription.ee.js';
|
import Subscription from '../models/subscription.ee.js';
|
||||||
|
|
||||||
const removeCancelledSubscriptionsWorker = new Worker(
|
export const worker = new Worker(
|
||||||
'remove-cancelled-subscriptions',
|
'remove-cancelled-subscriptions',
|
||||||
async () => {
|
async () => {
|
||||||
await Subscription.query()
|
await Subscription.query()
|
||||||
@@ -22,13 +23,13 @@ const removeCancelledSubscriptionsWorker = new Worker(
|
|||||||
{ connection: redisConfig }
|
{ connection: redisConfig }
|
||||||
);
|
);
|
||||||
|
|
||||||
removeCancelledSubscriptionsWorker.on('completed', (job) => {
|
worker.on('completed', (job) => {
|
||||||
logger.info(
|
logger.info(
|
||||||
`JOB ID: ${job.id} - The cancelled subscriptions have been removed!`
|
`JOB ID: ${job.id} - The cancelled subscriptions have been removed!`
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
removeCancelledSubscriptionsWorker.on('failed', (job, err) => {
|
worker.on('failed', (job, err) => {
|
||||||
const errorMessage = `
|
const errorMessage = `
|
||||||
JOB ID: ${job.id} - ERROR: The cancelled subscriptions can not be removed! ${err.message}
|
JOB ID: ${job.id} - ERROR: The cancelled subscriptions can not be removed! ${err.message}
|
||||||
\n ${err.stack}
|
\n ${err.stack}
|
||||||
@@ -41,4 +42,6 @@ removeCancelledSubscriptionsWorker.on('failed', (job, err) => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
export default removeCancelledSubscriptionsWorker;
|
process.on('SIGTERM', async () => {
|
||||||
|
await worker.close();
|
||||||
|
});
|
||||||
|
@@ -1,4 +1,5 @@
|
|||||||
import { Worker } from 'bullmq';
|
import { Worker } from 'bullmq';
|
||||||
|
import process from 'node:process';
|
||||||
|
|
||||||
import * as Sentry from '../helpers/sentry.ee.js';
|
import * as Sentry from '../helpers/sentry.ee.js';
|
||||||
import redisConfig from '../config/redis.js';
|
import redisConfig from '../config/redis.js';
|
||||||
@@ -11,7 +12,7 @@ import {
|
|||||||
REMOVE_AFTER_7_DAYS_OR_50_JOBS,
|
REMOVE_AFTER_7_DAYS_OR_50_JOBS,
|
||||||
} from '../helpers/remove-job-configuration.js';
|
} from '../helpers/remove-job-configuration.js';
|
||||||
|
|
||||||
const triggerWorker = new Worker(
|
export const worker = new Worker(
|
||||||
'trigger',
|
'trigger',
|
||||||
async (job) => {
|
async (job) => {
|
||||||
const { flowId, executionId, stepId, executionStep } = await processTrigger(
|
const { flowId, executionId, stepId, executionStep } = await processTrigger(
|
||||||
@@ -40,11 +41,11 @@ const triggerWorker = new Worker(
|
|||||||
{ connection: redisConfig }
|
{ connection: redisConfig }
|
||||||
);
|
);
|
||||||
|
|
||||||
triggerWorker.on('completed', (job) => {
|
worker.on('completed', (job) => {
|
||||||
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
|
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
|
||||||
});
|
});
|
||||||
|
|
||||||
triggerWorker.on('failed', (job, err) => {
|
worker.on('failed', (job, err) => {
|
||||||
const errorMessage = `
|
const errorMessage = `
|
||||||
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}
|
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}
|
||||||
\n ${err.stack}
|
\n ${err.stack}
|
||||||
@@ -59,4 +60,6 @@ triggerWorker.on('failed', (job, err) => {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
export default triggerWorker;
|
process.on('SIGTERM', async () => {
|
||||||
|
await worker.close();
|
||||||
|
});
|
||||||
|
Reference in New Issue
Block a user