feat: Implement update flow status rest API endpoint

This commit is contained in:
Faruk AYDIN
2024-09-17 16:07:12 +03:00
parent 712a5756e2
commit 1790ef0ee6
7 changed files with 350 additions and 3 deletions

View File

@@ -7,6 +7,14 @@ import ExecutionStep from './execution-step.js';
import globalVariable from '../helpers/global-variable.js';
import logger from '../helpers/logger.js';
import Telemetry from '../helpers/telemetry/index.js';
import flowQueue from '../queues/flow.js';
import {
REMOVE_AFTER_30_DAYS_OR_150_JOBS,
REMOVE_AFTER_7_DAYS_OR_50_JOBS,
} from '../helpers/remove-job-configuration.js';
const JOB_NAME = 'flow';
const EVERY_15_MINUTES_CRON = '*/15 * * * *';
class Flow extends Base {
static tableName = 'flows';
@@ -277,6 +285,68 @@ class Flow extends Base {
return duplicatedFlowWithSteps;
}
async updateStatus(newActiveValue) {
if (this.active === newActiveValue) {
return this;
}
const triggerStep = await this.getTriggerStep();
if (triggerStep.status === 'incomplete') {
throw this.IncompleteStepsError;
}
const trigger = await triggerStep.getTriggerCommand();
const interval = trigger.getInterval?.(triggerStep.parameters);
const repeatOptions = {
pattern: interval || EVERY_15_MINUTES_CRON,
};
if (trigger.type === 'webhook') {
const $ = await globalVariable({
flow: this,
connection: await triggerStep.$relatedQuery('connection'),
app: await triggerStep.getApp(),
step: triggerStep,
testRun: false,
});
if (newActiveValue && trigger.registerHook) {
await trigger.registerHook($);
} else if (!newActiveValue && trigger.unregisterHook) {
await trigger.unregisterHook($);
}
} else {
if (newActiveValue) {
await this.$query().patchAndFetch({
publishedAt: new Date().toISOString(),
});
const jobName = `${JOB_NAME}-${this.id}`;
await flowQueue.add(
jobName,
{ flowId: this.id },
{
repeat: repeatOptions,
jobId: this.id,
removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS,
removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS,
}
);
} else {
const repeatableJobs = await flowQueue.getRepeatableJobs();
const job = repeatableJobs.find((job) => job.id === this.id);
await flowQueue.removeRepeatableByKey(job.key);
}
}
return await this.$query().withGraphFetched('steps').patchAndFetch({
active: newActiveValue,
});
}
async $beforeUpdate(opt, queryContext) {
await super.$beforeUpdate(opt, queryContext);