65 lines
1.4 KiB
TypeScript
65 lines
1.4 KiB
TypeScript
import Context from '../../types/express/context';
|
|
import processorQueue from '../../queues/processor';
|
|
|
|
type Params = {
|
|
input: {
|
|
id: string;
|
|
active: boolean;
|
|
};
|
|
};
|
|
|
|
const JOB_NAME = 'processorJob';
|
|
const EVERY_15_MINUTES_CRON = '*/15 * * * *';
|
|
|
|
const updateFlowStatus = async (
|
|
_parent: unknown,
|
|
params: Params,
|
|
context: Context
|
|
) => {
|
|
let flow = await context.currentUser
|
|
.$relatedQuery('flows')
|
|
.findOne({
|
|
id: params.input.id,
|
|
})
|
|
.throwIfNotFound();
|
|
|
|
if (flow.active === params.input.active) {
|
|
return flow;
|
|
}
|
|
|
|
flow = await flow.$query().withGraphFetched('steps').patchAndFetch({
|
|
active: params.input.active,
|
|
});
|
|
|
|
const triggerStep = await flow.getTriggerStep();
|
|
const trigger = await triggerStep.getTrigger();
|
|
const interval = trigger.getInterval?.(triggerStep.parameters);
|
|
const repeatOptions = {
|
|
cron: interval || EVERY_15_MINUTES_CRON,
|
|
};
|
|
|
|
if (flow.active) {
|
|
flow = await flow.$query().patchAndFetch({
|
|
published_at: new Date().toISOString(),
|
|
});
|
|
|
|
await processorQueue.add(
|
|
JOB_NAME,
|
|
{ flowId: flow.id },
|
|
{
|
|
repeat: repeatOptions,
|
|
jobId: flow.id,
|
|
}
|
|
);
|
|
} else {
|
|
const repeatableJobs = await processorQueue.getRepeatableJobs();
|
|
const job = repeatableJobs.find((job) => job.id === flow.id);
|
|
|
|
await processorQueue.removeRepeatableByKey(job.key);
|
|
}
|
|
|
|
return flow;
|
|
};
|
|
|
|
export default updateFlowStatus;
|