diff --git a/packages/backend/src/graphql/mutations/delete-current-user.ee.ts b/packages/backend/src/graphql/mutations/delete-current-user.ee.ts index 84462445..734300cb 100644 --- a/packages/backend/src/graphql/mutations/delete-current-user.ee.ts +++ b/packages/backend/src/graphql/mutations/delete-current-user.ee.ts @@ -1,17 +1,44 @@ import { Duration } from 'luxon'; import Context from '../../types/express/context'; import deleteUserQueue from '../../queues/delete-user.ee'; +import flowQueue from '../../queues/flow'; +import Flow from '../../models/flow'; -const deleteCurrentUser = async (_parent: unknown, params: never, context: Context) => { +const deleteCurrentUser = async ( + _parent: unknown, + params: never, + context: Context +) => { const id = context.currentUser.id; + const flows = await context.currentUser.$relatedQuery('flows').where({ + status: 'active', + }); + + const repeatableJobs = await flowQueue.getRepeatableJobs(); + + for (const flow of flows) { + const job = repeatableJobs.find((job) => job.id === flow.id); + + if (job) { + await flowQueue.removeRepeatableByKey(job.key); + } + } + await context.currentUser.$query().delete(); + await Flow.query() + .whereIn( + 'id', + flows.map((flow) => flow.id) + ) + .delete(); + const jobName = `Delete user - ${id}`; const jobPayload = { id }; const millisecondsFor30Days = Duration.fromObject({ days: 30 }).toMillis(); const jobOptions = { - delay: millisecondsFor30Days + delay: millisecondsFor30Days, }; await deleteUserQueue.add(jobName, jobPayload, jobOptions); diff --git a/packages/backend/src/workers/flow.ts b/packages/backend/src/workers/flow.ts index 49ced1f8..d97e954c 100644 --- a/packages/backend/src/workers/flow.ts +++ b/packages/backend/src/workers/flow.ts @@ -3,6 +3,7 @@ import { Worker } from 'bullmq'; import * as Sentry from '../helpers/sentry.ee'; import redisConfig from '../config/redis'; import logger from '../helpers/logger'; +import flowQueue from '../queues/flow'; import triggerQueue from '../queues/trigger'; import { processFlow } from '../services/flow'; import Flow from '../models/flow'; @@ -66,7 +67,7 @@ worker.on('completed', (job) => { logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); }); -worker.on('failed', (job, err) => { +worker.on('failed', async (job, err) => { const errorMessage = ` JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message} \n ${err.stack} @@ -74,6 +75,18 @@ worker.on('failed', (job, err) => { logger.error(errorMessage); + const flow = await Flow.query().findById(job.data.flowId); + + if (!flow) { + await flowQueue.removeRepeatableByKey(job.repeatJobKey); + + const flowNotFoundErrorMessage = ` + JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has been deleted from Redis because flow was not found! + `; + + logger.error(flowNotFoundErrorMessage); + } + Sentry.captureException(err, { extra: { jobId: job.id,