diff --git a/packages/backend/src/graphql/mutations/update-flow-status.ts b/packages/backend/src/graphql/mutations/update-flow-status.ts index 1fc783ed..10e2f9f5 100644 --- a/packages/backend/src/graphql/mutations/update-flow-status.ts +++ b/packages/backend/src/graphql/mutations/update-flow-status.ts @@ -1,5 +1,6 @@ import Context from '../../types/express/context'; import flowQueue from '../../queues/flow'; +import { REMOVE_AFTER_30_DAYS_OR_150_JOBS, REMOVE_AFTER_7_DAYS_OR_50_JOBS } from '../../helpers/remove-job-configuration'; type Params = { input: { @@ -51,6 +52,8 @@ const updateFlowStatus = async ( { repeat: repeatOptions, jobId: flow.id, + removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, + removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS } ); } else { diff --git a/packages/backend/src/helpers/remove-job-configuration.ts b/packages/backend/src/helpers/remove-job-configuration.ts new file mode 100644 index 00000000..e6e01443 --- /dev/null +++ b/packages/backend/src/helpers/remove-job-configuration.ts @@ -0,0 +1,10 @@ +export const REMOVE_AFTER_30_DAYS_OR_150_JOBS = { + age: 30 * 24 * 3600, + count: 150, +}; + +export const REMOVE_AFTER_7_DAYS_OR_50_JOBS = { + age: 7 * 24 * 3600, + count: 50, +}; + diff --git a/packages/backend/src/workers/action.ts b/packages/backend/src/workers/action.ts index 75d9bd86..80dd2849 100644 --- a/packages/backend/src/workers/action.ts +++ b/packages/backend/src/workers/action.ts @@ -4,6 +4,7 @@ import logger from '../helpers/logger'; import Step from '../models/step'; import actionQueue from '../queues/action'; import { processAction } from '../services/action'; +import { REMOVE_AFTER_30_DAYS_OR_150_JOBS, REMOVE_AFTER_7_DAYS_OR_50_JOBS } from '../helpers/remove-job-configuration'; type JobData = { flowId: string; @@ -31,7 +32,12 @@ export const worker = new Worker( stepId: nextStep.id, }; - await actionQueue.add(jobName, jobPayload); + const jobOptions = { + removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, + removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS, + } + + await actionQueue.add(jobName, jobPayload, jobOptions); }, { connection: redisConfig } ); @@ -42,7 +48,7 @@ worker.on('completed', (job) => { worker.on('failed', (job, err) => { logger.info( - `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed22 to start with ${err.message}` + `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}` ); }); diff --git a/packages/backend/src/workers/flow.ts b/packages/backend/src/workers/flow.ts index d8dc670a..7ca05782 100644 --- a/packages/backend/src/workers/flow.ts +++ b/packages/backend/src/workers/flow.ts @@ -4,6 +4,7 @@ import logger from '../helpers/logger'; import triggerQueue from '../queues/trigger'; import { processFlow } from '../services/flow'; import Flow from '../models/flow'; +import { REMOVE_AFTER_30_DAYS_OR_150_JOBS, REMOVE_AFTER_7_DAYS_OR_50_JOBS } from '../helpers/remove-job-configuration'; export const worker = new Worker( 'flow', @@ -17,6 +18,11 @@ export const worker = new Worker( const reversedData = data.reverse(); + const jobOptions = { + removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, + removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS, + } + for (const triggerItem of reversedData) { const jobName = `${triggerStep.id}-${triggerItem.meta.internalId}`; @@ -26,7 +32,7 @@ export const worker = new Worker( triggerItem, }; - await triggerQueue.add(jobName, jobPayload); + await triggerQueue.add(jobName, jobPayload, jobOptions); } if (error) { @@ -38,7 +44,7 @@ export const worker = new Worker( error, }; - await triggerQueue.add(jobName, jobPayload); + await triggerQueue.add(jobName, jobPayload, jobOptions); } }, { connection: redisConfig } diff --git a/packages/backend/src/workers/trigger.ts b/packages/backend/src/workers/trigger.ts index 8d64e032..a7492328 100644 --- a/packages/backend/src/workers/trigger.ts +++ b/packages/backend/src/workers/trigger.ts @@ -5,6 +5,7 @@ import { IJSONObject, ITriggerItem } from '@automatisch/types'; import actionQueue from '../queues/action'; import Step from '../models/step'; import { processTrigger } from '../services/trigger'; +import { REMOVE_AFTER_30_DAYS_OR_150_JOBS, REMOVE_AFTER_7_DAYS_OR_50_JOBS } from '../helpers/remove-job-configuration'; type JobData = { flowId: string; @@ -32,7 +33,12 @@ export const worker = new Worker( stepId: nextStep.id, }; - await actionQueue.add(jobName, jobPayload); + const jobOptions = { + removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, + removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS, + } + + await actionQueue.add(jobName, jobPayload, jobOptions); }, { connection: redisConfig } );