Merge pull request #747 from automatisch/issue-728
feat(queue): auto clean up complete and fail jobs
This commit is contained in:
@@ -1,5 +1,6 @@
|
|||||||
import Context from '../../types/express/context';
|
import Context from '../../types/express/context';
|
||||||
import flowQueue from '../../queues/flow';
|
import flowQueue from '../../queues/flow';
|
||||||
|
import { REMOVE_AFTER_30_DAYS_OR_150_JOBS, REMOVE_AFTER_7_DAYS_OR_50_JOBS } from '../../helpers/remove-job-configuration';
|
||||||
|
|
||||||
type Params = {
|
type Params = {
|
||||||
input: {
|
input: {
|
||||||
@@ -51,6 +52,8 @@ const updateFlowStatus = async (
|
|||||||
{
|
{
|
||||||
repeat: repeatOptions,
|
repeat: repeatOptions,
|
||||||
jobId: flow.id,
|
jobId: flow.id,
|
||||||
|
removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS,
|
||||||
|
removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
|
10
packages/backend/src/helpers/remove-job-configuration.ts
Normal file
10
packages/backend/src/helpers/remove-job-configuration.ts
Normal file
@@ -0,0 +1,10 @@
|
|||||||
|
export const REMOVE_AFTER_30_DAYS_OR_150_JOBS = {
|
||||||
|
age: 30 * 24 * 3600,
|
||||||
|
count: 150,
|
||||||
|
};
|
||||||
|
|
||||||
|
export const REMOVE_AFTER_7_DAYS_OR_50_JOBS = {
|
||||||
|
age: 7 * 24 * 3600,
|
||||||
|
count: 50,
|
||||||
|
};
|
||||||
|
|
@@ -4,6 +4,7 @@ import logger from '../helpers/logger';
|
|||||||
import Step from '../models/step';
|
import Step from '../models/step';
|
||||||
import actionQueue from '../queues/action';
|
import actionQueue from '../queues/action';
|
||||||
import { processAction } from '../services/action';
|
import { processAction } from '../services/action';
|
||||||
|
import { REMOVE_AFTER_30_DAYS_OR_150_JOBS, REMOVE_AFTER_7_DAYS_OR_50_JOBS } from '../helpers/remove-job-configuration';
|
||||||
|
|
||||||
type JobData = {
|
type JobData = {
|
||||||
flowId: string;
|
flowId: string;
|
||||||
@@ -31,7 +32,12 @@ export const worker = new Worker(
|
|||||||
stepId: nextStep.id,
|
stepId: nextStep.id,
|
||||||
};
|
};
|
||||||
|
|
||||||
await actionQueue.add(jobName, jobPayload);
|
const jobOptions = {
|
||||||
|
removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS,
|
||||||
|
removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS,
|
||||||
|
}
|
||||||
|
|
||||||
|
await actionQueue.add(jobName, jobPayload, jobOptions);
|
||||||
},
|
},
|
||||||
{ connection: redisConfig }
|
{ connection: redisConfig }
|
||||||
);
|
);
|
||||||
@@ -42,7 +48,7 @@ worker.on('completed', (job) => {
|
|||||||
|
|
||||||
worker.on('failed', (job, err) => {
|
worker.on('failed', (job, err) => {
|
||||||
logger.info(
|
logger.info(
|
||||||
`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed22 to start with ${err.message}`
|
`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}`
|
||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@@ -4,6 +4,7 @@ import logger from '../helpers/logger';
|
|||||||
import triggerQueue from '../queues/trigger';
|
import triggerQueue from '../queues/trigger';
|
||||||
import { processFlow } from '../services/flow';
|
import { processFlow } from '../services/flow';
|
||||||
import Flow from '../models/flow';
|
import Flow from '../models/flow';
|
||||||
|
import { REMOVE_AFTER_30_DAYS_OR_150_JOBS, REMOVE_AFTER_7_DAYS_OR_50_JOBS } from '../helpers/remove-job-configuration';
|
||||||
|
|
||||||
export const worker = new Worker(
|
export const worker = new Worker(
|
||||||
'flow',
|
'flow',
|
||||||
@@ -17,6 +18,11 @@ export const worker = new Worker(
|
|||||||
|
|
||||||
const reversedData = data.reverse();
|
const reversedData = data.reverse();
|
||||||
|
|
||||||
|
const jobOptions = {
|
||||||
|
removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS,
|
||||||
|
removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS,
|
||||||
|
}
|
||||||
|
|
||||||
for (const triggerItem of reversedData) {
|
for (const triggerItem of reversedData) {
|
||||||
const jobName = `${triggerStep.id}-${triggerItem.meta.internalId}`;
|
const jobName = `${triggerStep.id}-${triggerItem.meta.internalId}`;
|
||||||
|
|
||||||
@@ -26,7 +32,7 @@ export const worker = new Worker(
|
|||||||
triggerItem,
|
triggerItem,
|
||||||
};
|
};
|
||||||
|
|
||||||
await triggerQueue.add(jobName, jobPayload);
|
await triggerQueue.add(jobName, jobPayload, jobOptions);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (error) {
|
if (error) {
|
||||||
@@ -38,7 +44,7 @@ export const worker = new Worker(
|
|||||||
error,
|
error,
|
||||||
};
|
};
|
||||||
|
|
||||||
await triggerQueue.add(jobName, jobPayload);
|
await triggerQueue.add(jobName, jobPayload, jobOptions);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{ connection: redisConfig }
|
{ connection: redisConfig }
|
||||||
|
@@ -5,6 +5,7 @@ import { IJSONObject, ITriggerItem } from '@automatisch/types';
|
|||||||
import actionQueue from '../queues/action';
|
import actionQueue from '../queues/action';
|
||||||
import Step from '../models/step';
|
import Step from '../models/step';
|
||||||
import { processTrigger } from '../services/trigger';
|
import { processTrigger } from '../services/trigger';
|
||||||
|
import { REMOVE_AFTER_30_DAYS_OR_150_JOBS, REMOVE_AFTER_7_DAYS_OR_50_JOBS } from '../helpers/remove-job-configuration';
|
||||||
|
|
||||||
type JobData = {
|
type JobData = {
|
||||||
flowId: string;
|
flowId: string;
|
||||||
@@ -32,7 +33,12 @@ export const worker = new Worker(
|
|||||||
stepId: nextStep.id,
|
stepId: nextStep.id,
|
||||||
};
|
};
|
||||||
|
|
||||||
await actionQueue.add(jobName, jobPayload);
|
const jobOptions = {
|
||||||
|
removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS,
|
||||||
|
removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS,
|
||||||
|
}
|
||||||
|
|
||||||
|
await actionQueue.add(jobName, jobPayload, jobOptions);
|
||||||
},
|
},
|
||||||
{ connection: redisConfig }
|
{ connection: redisConfig }
|
||||||
);
|
);
|
||||||
|
Reference in New Issue
Block a user