fix: Remove deleted flows from Redis
This commit is contained in:
@@ -1,17 +1,44 @@
|
||||
import { Duration } from 'luxon';
|
||||
import Context from '../../types/express/context';
|
||||
import deleteUserQueue from '../../queues/delete-user.ee';
|
||||
import flowQueue from '../../queues/flow';
|
||||
import Flow from '../../models/flow';
|
||||
|
||||
const deleteCurrentUser = async (_parent: unknown, params: never, context: Context) => {
|
||||
const deleteCurrentUser = async (
|
||||
_parent: unknown,
|
||||
params: never,
|
||||
context: Context
|
||||
) => {
|
||||
const id = context.currentUser.id;
|
||||
|
||||
const flows = await context.currentUser.$relatedQuery('flows').where({
|
||||
status: 'active',
|
||||
});
|
||||
|
||||
const repeatableJobs = await flowQueue.getRepeatableJobs();
|
||||
|
||||
for (const flow of flows) {
|
||||
const job = repeatableJobs.find((job) => job.id === flow.id);
|
||||
|
||||
if (job) {
|
||||
await flowQueue.removeRepeatableByKey(job.key);
|
||||
}
|
||||
}
|
||||
|
||||
await context.currentUser.$query().delete();
|
||||
|
||||
await Flow.query()
|
||||
.whereIn(
|
||||
'id',
|
||||
flows.map((flow) => flow.id)
|
||||
)
|
||||
.delete();
|
||||
|
||||
const jobName = `Delete user - ${id}`;
|
||||
const jobPayload = { id };
|
||||
const millisecondsFor30Days = Duration.fromObject({ days: 30 }).toMillis();
|
||||
const jobOptions = {
|
||||
delay: millisecondsFor30Days
|
||||
delay: millisecondsFor30Days,
|
||||
};
|
||||
|
||||
await deleteUserQueue.add(jobName, jobPayload, jobOptions);
|
||||
|
@@ -3,6 +3,7 @@ import { Worker } from 'bullmq';
|
||||
import * as Sentry from '../helpers/sentry.ee';
|
||||
import redisConfig from '../config/redis';
|
||||
import logger from '../helpers/logger';
|
||||
import flowQueue from '../queues/flow';
|
||||
import triggerQueue from '../queues/trigger';
|
||||
import { processFlow } from '../services/flow';
|
||||
import Flow from '../models/flow';
|
||||
@@ -66,7 +67,7 @@ worker.on('completed', (job) => {
|
||||
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
|
||||
});
|
||||
|
||||
worker.on('failed', (job, err) => {
|
||||
worker.on('failed', async (job, err) => {
|
||||
const errorMessage = `
|
||||
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}
|
||||
\n ${err.stack}
|
||||
@@ -74,6 +75,18 @@ worker.on('failed', (job, err) => {
|
||||
|
||||
logger.error(errorMessage);
|
||||
|
||||
const flow = await Flow.query().findById(job.data.flowId);
|
||||
|
||||
if (!flow) {
|
||||
await flowQueue.removeRepeatableByKey(job.repeatJobKey);
|
||||
|
||||
const flowNotFoundErrorMessage = `
|
||||
JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has been deleted from Redis because flow was not found!
|
||||
`;
|
||||
|
||||
logger.error(flowNotFoundErrorMessage);
|
||||
}
|
||||
|
||||
Sentry.captureException(err, {
|
||||
extra: {
|
||||
jobId: job.id,
|
||||
|
Reference in New Issue
Block a user