Merge pull request #1050 from automatisch/add-virtual-flow-status

feat(flow): add virtual paused/published/draft status
This commit is contained in:
Ömer Faruk Aydın
2023-04-11 17:04:32 +02:00
committed by GitHub
12 changed files with 110 additions and 64 deletions

View File

@@ -39,6 +39,7 @@ type AppConfig = {
smtpPassword: string; smtpPassword: string;
fromEmail: string; fromEmail: string;
isCloud: boolean; isCloud: boolean;
isSelfHosted: boolean;
paddleVendorId: number; paddleVendorId: number;
paddleVendorAuthCode: string; paddleVendorAuthCode: string;
paddlePublicKey: string; paddlePublicKey: string;
@@ -110,6 +111,7 @@ const appConfig: AppConfig = {
smtpPassword: process.env.SMTP_PASSWORD, smtpPassword: process.env.SMTP_PASSWORD,
fromEmail: process.env.FROM_EMAIL, fromEmail: process.env.FROM_EMAIL,
isCloud: process.env.AUTOMATISCH_CLOUD === 'true', isCloud: process.env.AUTOMATISCH_CLOUD === 'true',
isSelfHosted: process.env.AUTOMATISCH_CLOUD !== 'true',
paddleVendorId: Number(process.env.PADDLE_VENDOR_ID), paddleVendorId: Number(process.env.PADDLE_VENDOR_ID),
paddleVendorAuthCode: process.env.PADDLE_VENDOR_AUTH_CODE, paddleVendorAuthCode: process.env.PADDLE_VENDOR_AUTH_CODE,
paddlePublicKey: process.env.PADDLE_PUBLIC_KEY, paddlePublicKey: process.env.PADDLE_PUBLIC_KEY,

View File

@@ -6,17 +6,24 @@ import Flow from '../../models/flow';
import { processTrigger } from '../../services/trigger'; import { processTrigger } from '../../services/trigger';
import actionQueue from '../../queues/action'; import actionQueue from '../../queues/action';
import globalVariable from '../../helpers/global-variable'; import globalVariable from '../../helpers/global-variable';
import { REMOVE_AFTER_30_DAYS_OR_150_JOBS, REMOVE_AFTER_7_DAYS_OR_50_JOBS } from '../../helpers/remove-job-configuration'; import QuotaExceededError from '../../errors/quote-exceeded';
import {
REMOVE_AFTER_30_DAYS_OR_150_JOBS,
REMOVE_AFTER_7_DAYS_OR_50_JOBS,
} from '../../helpers/remove-job-configuration';
export default async (request: IRequest, response: Response) => { export default async (request: IRequest, response: Response) => {
const flow = await Flow.query() const flow = await Flow.query()
.findById(request.params.flowId) .findById(request.params.flowId)
.throwIfNotFound(); .throwIfNotFound();
const testRun = !flow.active; const user = await flow.$relatedQuery('user');
if (!testRun) { const testRun = !flow.active;
await flow.throwIfQuotaExceeded(); const quotaExceeded = !testRun && !(await user.isAllowedToRunFlows());
if (quotaExceeded) {
throw new QuotaExceededError();
} }
const triggerStep = await flow.getTriggerStep(); const triggerStep = await flow.getTriggerStep();
@@ -58,7 +65,7 @@ export default async (request: IRequest, response: Response) => {
headers: request.headers, headers: request.headers,
body: request.body, body: request.body,
query: request.query, query: request.query,
} };
rawInternalId = JSON.stringify(payload); rawInternalId = JSON.stringify(payload);
} }
@@ -74,7 +81,7 @@ export default async (request: IRequest, response: Response) => {
flowId: flow.id, flowId: flow.id,
stepId: triggerStep.id, stepId: triggerStep.id,
triggerItem, triggerItem,
testRun testRun,
}); });
if (testRun) { if (testRun) {
@@ -93,7 +100,7 @@ export default async (request: IRequest, response: Response) => {
const jobOptions = { const jobOptions = {
removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS, removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS,
removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS, removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS,
} };
await actionQueue.add(jobName, jobPayload, jobOptions); await actionQueue.add(jobName, jobPayload, jobOptions);

View File

@@ -250,6 +250,12 @@ type FlowEdge {
node: Flow node: Flow
} }
enum FlowStatus {
paused
published
draft
}
type Flow { type Flow {
id: String id: String
name: String name: String
@@ -257,6 +263,7 @@ type Flow {
steps: [Step] steps: [Step]
createdAt: String createdAt: String
updatedAt: String updatedAt: String
status: FlowStatus
} }
type Execution { type Execution {

View File

@@ -1,19 +1,22 @@
import { ValidationError } from 'objection'; import { ValidationError } from 'objection';
import type { ModelOptions, QueryContext } from 'objection'; import type {
import appConfig from '../config/app'; ModelOptions,
QueryContext,
StaticHookArguments,
} from 'objection';
import ExtendedQueryBuilder from './query-builder'; import ExtendedQueryBuilder from './query-builder';
import Base from './base'; import Base from './base';
import Step from './step'; import Step from './step';
import User from './user'; import User from './user';
import Execution from './execution'; import Execution from './execution';
import Telemetry from '../helpers/telemetry'; import Telemetry from '../helpers/telemetry';
import QuotaExceededError from '../errors/quote-exceeded';
class Flow extends Base { class Flow extends Base {
id!: string; id!: string;
name!: string; name!: string;
userId!: string; userId!: string;
active: boolean; active: boolean;
status: 'paused' | 'published' | 'draft';
steps: Step[]; steps: Step[];
published_at: string; published_at: string;
remoteWebhookId: string; remoteWebhookId: string;
@@ -65,6 +68,26 @@ class Flow extends Base {
}, },
}); });
static async afterFind(args: StaticHookArguments<any>): Promise<any> {
const { result } = args;
const referenceFlow = result[0];
if (referenceFlow) {
const shouldBePaused = await referenceFlow.isPaused();
for (const flow of result) {
if (!flow.active) {
flow.status = 'draft';
} else if (flow.active && shouldBePaused) {
flow.status = 'paused';
} else {
flow.status = 'published';
}
}
}
}
async lastInternalId() { async lastInternalId() {
const lastExecution = await this.$relatedQuery('executions') const lastExecution = await this.$relatedQuery('executions')
.orderBy('created_at', 'desc') .orderBy('created_at', 'desc')
@@ -132,31 +155,9 @@ class Flow extends Base {
}); });
} }
async checkIfQuotaExceeded() { async isPaused() {
if (!appConfig.isCloud) return;
const user = await this.$relatedQuery('user'); const user = await this.$relatedQuery('user');
const usageData = await user.$relatedQuery('currentUsageData'); return await user.isAllowedToRunFlows();
const hasExceeded = await usageData.checkIfLimitExceeded();
if (hasExceeded) {
return true;
}
return false;
}
async throwIfQuotaExceeded() {
if (!appConfig.isCloud) return;
const hasExceeded = await this.checkIfQuotaExceeded();
if (hasExceeded) {
throw new QuotaExceededError();
}
return this;
} }
} }

View File

@@ -2,7 +2,6 @@ import { raw } from 'objection';
import Base from './base'; import Base from './base';
import User from './user'; import User from './user';
import Subscription from './subscription.ee'; import Subscription from './subscription.ee';
import { getPlanById } from '../helpers/billing/plans.ee';
class UsageData extends Base { class UsageData extends Base {
id!: string; id!: string;
@@ -47,24 +46,6 @@ class UsageData extends Base {
}, },
}); });
async checkIfLimitExceeded() {
const user = await this.$relatedQuery('user');
if (await user.inTrial()) {
return false;
}
const subscription = await this.$relatedQuery('subscription');
if (!subscription.isActive) {
return true;
}
const plan = subscription.plan;
return this.consumedTaskCount >= plan.quota;
}
async increaseConsumedTaskCountByOne() { async increaseConsumedTaskCountByOne() {
return await this.$query().patch({ return await this.$query().patch({
consumedTaskCount: raw('consumed_task_count + 1'), consumedTaskCount: raw('consumed_task_count + 1'),

View File

@@ -165,18 +165,24 @@ class User extends Base {
this.trialExpiryDate = DateTime.now().plus({ days: 30 }).toISODate(); this.trialExpiryDate = DateTime.now().plus({ days: 30 }).toISODate();
} }
async hasActiveSubscription() { async isAllowedToRunFlows() {
if (!appConfig.isCloud) { if (appConfig.isSelfHosted) {
return false; return true;
} }
const subscription = await this.$relatedQuery('currentSubscription'); if (await this.inTrial()) {
return true;
}
return subscription?.isActive; if ((await this.hasActiveSubscription()) && (await this.withinLimits())) {
return true;
}
return false;
} }
async inTrial() { async inTrial() {
if (!appConfig.isCloud) { if (appConfig.isSelfHosted) {
return false; return false;
} }
@@ -196,6 +202,24 @@ class User extends Base {
return now < expiryDate; return now < expiryDate;
} }
async hasActiveSubscription() {
if (!appConfig.isCloud) {
return false;
}
const subscription = await this.$relatedQuery('currentSubscription');
return subscription?.isActive;
}
async withinLimits() {
const currentSubscription = await this.$relatedQuery('currentSubscription');
const plan = currentSubscription.plan;
const currentUsageData = await this.$relatedQuery('currentUsageData');
return currentUsageData.consumedTaskCount >= plan.quota;
}
async $beforeInsert(queryContext: QueryContext) { async $beforeInsert(queryContext: QueryContext) {
await super.$beforeInsert(queryContext); await super.$beforeInsert(queryContext);
await this.generateHash(); await this.generateHash();

View File

@@ -17,10 +17,10 @@ export const worker = new Worker(
const { flowId } = job.data; const { flowId } = job.data;
const flow = await Flow.query().findById(flowId).throwIfNotFound(); const flow = await Flow.query().findById(flowId).throwIfNotFound();
const user = await flow.$relatedQuery('user');
const allowedToRunFlows = await user.isAllowedToRunFlows();
const quotaExceeded = await flow.checkIfQuotaExceeded(); if (!allowedToRunFlows) {
if (quotaExceeded) {
return; return;
} }

View File

@@ -73,6 +73,7 @@ export interface IFlow {
name: string; name: string;
userId: string; userId: string;
active: boolean; active: boolean;
status: 'paused' | 'published' | 'draft';
steps: IStep[]; steps: IStep[];
createdAt: string; createdAt: string;
updatedAt: string; updatedAt: string;

View File

@@ -18,6 +18,26 @@ type FlowRowProps = {
flow: IFlow; flow: IFlow;
}; };
function getFlowStatusTranslationKey(status: IFlow["status"]): string {
if (status === 'published') {
return 'flow.published';
} else if (status === 'paused') {
return 'flow.paused';
}
return 'flow.draft';
}
function getFlowStatusColor(status: IFlow["status"]): 'default' | 'primary' | 'secondary' | 'error' | 'info' | 'success' | 'warning' {
if (status === 'published') {
return 'success';
} else if (status === 'paused') {
return 'error';
}
return 'info';
}
export default function FlowRow(props: FlowRowProps): React.ReactElement { export default function FlowRow(props: FlowRowProps): React.ReactElement {
const formatMessage = useFormatMessage(); const formatMessage = useFormatMessage();
const contextButtonRef = React.useRef<HTMLButtonElement | null>(null); const contextButtonRef = React.useRef<HTMLButtonElement | null>(null);
@@ -76,10 +96,10 @@ export default function FlowRow(props: FlowRowProps): React.ReactElement {
<ContextMenu> <ContextMenu>
<Chip <Chip
size="small" size="small"
color={flow?.active ? 'success' : 'info'} color={getFlowStatusColor(flow?.status)}
variant={flow?.active ? 'filled' : 'outlined'} variant={flow?.active ? 'filled' : 'outlined'}
label={formatMessage( label={formatMessage(
flow?.active ? 'flow.published' : 'flow.draft' getFlowStatusTranslationKey(flow?.status)
)} )}
/> />

View File

@@ -6,6 +6,7 @@ export const GET_FLOW = gql`
id id
name name
active active
status
steps { steps {
id id
type type

View File

@@ -26,6 +26,7 @@ export const GET_FLOWS = gql`
createdAt createdAt
updatedAt updatedAt
active active
status
steps { steps {
iconUrl iconUrl
} }

View File

@@ -43,6 +43,7 @@
"flow.active": "ON", "flow.active": "ON",
"flow.inactive": "OFF", "flow.inactive": "OFF",
"flow.published": "Published", "flow.published": "Published",
"flow.paused": "Paused",
"flow.draft": "Draft", "flow.draft": "Draft",
"flow.successfullyDeleted": "The flow and associated executions have been deleted.", "flow.successfullyDeleted": "The flow and associated executions have been deleted.",
"flowEditor.publish": "PUBLISH", "flowEditor.publish": "PUBLISH",