Merge pull request #989 from automatisch/skip-processing-steps-over-quota
feat: skip processing tasks over task quota
This commit is contained in:
@@ -14,6 +14,11 @@ export default async (request: IRequest, response: Response) => {
|
|||||||
.throwIfNotFound();
|
.throwIfNotFound();
|
||||||
|
|
||||||
const testRun = !flow.active;
|
const testRun = !flow.active;
|
||||||
|
|
||||||
|
if (!testRun) {
|
||||||
|
await flow.throwIfQuotaExceeded();
|
||||||
|
}
|
||||||
|
|
||||||
const triggerStep = await flow.getTriggerStep();
|
const triggerStep = await flow.getTriggerStep();
|
||||||
const triggerCommand = await triggerStep.getTriggerCommand();
|
const triggerCommand = await triggerStep.getTriggerCommand();
|
||||||
const app = await triggerStep.getApp();
|
const app = await triggerStep.getApp();
|
||||||
|
@@ -1,5 +1,6 @@
|
|||||||
import { ValidationError } from 'objection';
|
import { ValidationError } from 'objection';
|
||||||
import type { ModelOptions, QueryContext } from 'objection';
|
import type { ModelOptions, QueryContext } from 'objection';
|
||||||
|
import appConfig from '../config/app';
|
||||||
import ExtendedQueryBuilder from './query-builder';
|
import ExtendedQueryBuilder from './query-builder';
|
||||||
import Base from './base';
|
import Base from './base';
|
||||||
import Step from './step';
|
import Step from './step';
|
||||||
@@ -129,6 +130,21 @@ class Flow extends Base {
|
|||||||
type: 'trigger',
|
type: 'trigger',
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async throwIfQuotaExceeded() {
|
||||||
|
if (!appConfig.isCloud) return;
|
||||||
|
|
||||||
|
const user = await this.$relatedQuery('user');
|
||||||
|
const usageData = await user.$relatedQuery('usageData');
|
||||||
|
|
||||||
|
const hasExceeded = await usageData.checkIfLimitExceeded();
|
||||||
|
|
||||||
|
if (hasExceeded) {
|
||||||
|
throw new Error('The allowed task quota has been exhausted!');
|
||||||
|
}
|
||||||
|
|
||||||
|
return this;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export default Flow;
|
export default Flow;
|
||||||
|
@@ -1,12 +1,14 @@
|
|||||||
import { raw } from 'objection';
|
import { raw } from 'objection';
|
||||||
import Base from './base';
|
import Base from './base';
|
||||||
import User from './user';
|
import User from './user';
|
||||||
|
import PaymentPlan from './payment-plan.ee';
|
||||||
|
|
||||||
class UsageData extends Base {
|
class UsageData extends Base {
|
||||||
id!: string;
|
id!: string;
|
||||||
userId!: string;
|
userId!: string;
|
||||||
consumedTaskCount!: number;
|
consumedTaskCount!: number;
|
||||||
nextResetAt!: string;
|
nextResetAt!: string;
|
||||||
|
paymentPlan?: PaymentPlan;
|
||||||
|
|
||||||
static tableName = 'usage_data';
|
static tableName = 'usage_data';
|
||||||
|
|
||||||
@@ -31,8 +33,22 @@ class UsageData extends Base {
|
|||||||
to: 'users.id',
|
to: 'users.id',
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
paymentPlan: {
|
||||||
|
relation: Base.BelongsToOneRelation,
|
||||||
|
modelClass: PaymentPlan,
|
||||||
|
join: {
|
||||||
|
from: 'usage_data.user_id',
|
||||||
|
to: 'payment_plans.user_id',
|
||||||
|
},
|
||||||
|
},
|
||||||
});
|
});
|
||||||
|
|
||||||
|
async checkIfLimitExceeded() {
|
||||||
|
const paymentPlan = await this.$relatedQuery('paymentPlan');
|
||||||
|
|
||||||
|
return this.consumedTaskCount >= paymentPlan.taskCount;
|
||||||
|
}
|
||||||
|
|
||||||
async increaseConsumedTaskCountByOne() {
|
async increaseConsumedTaskCountByOne() {
|
||||||
return await this.$query().patch({ consumedTaskCount: raw('consumed_task_count + 1') });
|
return await this.$query().patch({ consumedTaskCount: raw('consumed_task_count + 1') });
|
||||||
}
|
}
|
||||||
|
@@ -1,5 +1,6 @@
|
|||||||
import express, { Router } from 'express';
|
import express, { Response, Router, NextFunction, RequestHandler } from 'express';
|
||||||
import multer from 'multer';
|
import multer from 'multer';
|
||||||
|
|
||||||
import { IRequest } from '@automatisch/types';
|
import { IRequest } from '@automatisch/types';
|
||||||
import appConfig from '../config/app';
|
import appConfig from '../config/app';
|
||||||
import webhookHandler from '../controllers/webhooks/handler';
|
import webhookHandler from '../controllers/webhooks/handler';
|
||||||
@@ -16,9 +17,17 @@ router.use(express.text({
|
|||||||
},
|
},
|
||||||
}));
|
}));
|
||||||
|
|
||||||
router.get('/:flowId', webhookHandler);
|
const exposeError = (handler: RequestHandler) => async (req: IRequest, res: Response, next: NextFunction) => {
|
||||||
router.put('/:flowId', webhookHandler);
|
try {
|
||||||
router.patch('/:flowId', webhookHandler);
|
await handler(req, res, next);
|
||||||
router.post('/:flowId', webhookHandler);
|
} catch (err) {
|
||||||
|
res.status(422).send(err);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
router.get('/:flowId', exposeError(webhookHandler));
|
||||||
|
router.put('/:flowId', exposeError(webhookHandler));
|
||||||
|
router.patch('/:flowId', exposeError(webhookHandler));
|
||||||
|
router.post('/:flowId', exposeError(webhookHandler));
|
||||||
|
|
||||||
export default router;
|
export default router;
|
||||||
|
@@ -1,3 +1,4 @@
|
|||||||
|
import appConfig from '../config/app';
|
||||||
import Step from '../models/step';
|
import Step from '../models/step';
|
||||||
import Flow from '../models/flow';
|
import Flow from '../models/flow';
|
||||||
import Execution from '../models/execution';
|
import Execution from '../models/execution';
|
||||||
@@ -17,17 +18,23 @@ type ProcessActionOptions = {
|
|||||||
export const processAction = async (options: ProcessActionOptions) => {
|
export const processAction = async (options: ProcessActionOptions) => {
|
||||||
const { flowId, stepId, executionId } = options;
|
const { flowId, stepId, executionId } = options;
|
||||||
|
|
||||||
const step = await Step.query().findById(stepId).throwIfNotFound();
|
const flow = await Flow.query().findById(flowId).throwIfNotFound();
|
||||||
const execution = await Execution.query()
|
const execution = await Execution.query()
|
||||||
.findById(executionId)
|
.findById(executionId)
|
||||||
.throwIfNotFound();
|
.throwIfNotFound();
|
||||||
|
|
||||||
|
if (!execution.testRun) {
|
||||||
|
await flow.throwIfQuotaExceeded();
|
||||||
|
}
|
||||||
|
|
||||||
|
const step = await Step.query().findById(stepId).throwIfNotFound();
|
||||||
|
|
||||||
const $ = await globalVariable({
|
const $ = await globalVariable({
|
||||||
flow: await Flow.query().findById(flowId).throwIfNotFound(),
|
flow,
|
||||||
app: await step.getApp(),
|
app: await step.getApp(),
|
||||||
step: step,
|
step: step,
|
||||||
connection: await step.$relatedQuery('connection'),
|
connection: await step.$relatedQuery('connection'),
|
||||||
execution: execution,
|
execution,
|
||||||
});
|
});
|
||||||
|
|
||||||
const priorExecutionSteps = await ExecutionStep.query().where({
|
const priorExecutionSteps = await ExecutionStep.query().where({
|
||||||
|
@@ -1,3 +1,4 @@
|
|||||||
|
import appConfig from '../config/app';
|
||||||
import Flow from '../models/flow';
|
import Flow from '../models/flow';
|
||||||
import globalVariable from '../helpers/global-variable';
|
import globalVariable from '../helpers/global-variable';
|
||||||
import EarlyExitError from '../errors/early-exit';
|
import EarlyExitError from '../errors/early-exit';
|
||||||
@@ -10,7 +11,12 @@ type ProcessFlowOptions = {
|
|||||||
};
|
};
|
||||||
|
|
||||||
export const processFlow = async (options: ProcessFlowOptions) => {
|
export const processFlow = async (options: ProcessFlowOptions) => {
|
||||||
const flow = await Flow.query().findById(options.flowId).throwIfNotFound();
|
const { testRun, flowId } = options;
|
||||||
|
const flow = await Flow.query().findById(flowId).throwIfNotFound();
|
||||||
|
|
||||||
|
if (!testRun) {
|
||||||
|
await flow.throwIfQuotaExceeded();
|
||||||
|
}
|
||||||
|
|
||||||
const triggerStep = await flow.getTriggerStep();
|
const triggerStep = await flow.getTriggerStep();
|
||||||
const triggerCommand = await triggerStep.getTriggerCommand();
|
const triggerCommand = await triggerStep.getTriggerCommand();
|
||||||
@@ -20,7 +26,7 @@ export const processFlow = async (options: ProcessFlowOptions) => {
|
|||||||
connection: await triggerStep.$relatedQuery('connection'),
|
connection: await triggerStep.$relatedQuery('connection'),
|
||||||
app: await triggerStep.getApp(),
|
app: await triggerStep.getApp(),
|
||||||
step: triggerStep,
|
step: triggerStep,
|
||||||
testRun: options.testRun,
|
testRun,
|
||||||
});
|
});
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
Reference in New Issue
Block a user