Merge branch 'main' into feature/signalwire-integration
This commit is contained in:
@@ -27,6 +27,8 @@
|
||||
"@graphql-tools/graphql-file-loader": "^7.3.4",
|
||||
"@graphql-tools/load": "^7.5.2",
|
||||
"@rudderstack/rudder-sdk-node": "^1.1.2",
|
||||
"@sentry/node": "^7.42.0",
|
||||
"@sentry/tracing": "^7.42.0",
|
||||
"@types/luxon": "^2.3.1",
|
||||
"ajv-formats": "^2.1.1",
|
||||
"axios": "0.24.0",
|
||||
|
@@ -1,9 +1,12 @@
|
||||
import createError from 'http-errors';
|
||||
import express from 'express';
|
||||
import appConfig from './config/app';
|
||||
import cors from 'cors';
|
||||
|
||||
import { IRequest } from '@automatisch/types';
|
||||
import appConfig from './config/app';
|
||||
import corsOptions from './config/cors-options';
|
||||
import morgan from './helpers/morgan';
|
||||
import * as Sentry from './helpers/sentry.ee';
|
||||
import appAssetsHandler from './helpers/app-assets-handler';
|
||||
import webUIHandler from './helpers/web-ui-handler';
|
||||
import errorHandler from './helpers/error-handler';
|
||||
@@ -14,12 +17,16 @@ import {
|
||||
} from './helpers/create-bull-board-handler';
|
||||
import injectBullBoardHandler from './helpers/inject-bull-board-handler';
|
||||
import router from './routes';
|
||||
import { IRequest } from '@automatisch/types';
|
||||
|
||||
createBullBoardHandler(serverAdapter);
|
||||
|
||||
const app = express();
|
||||
|
||||
Sentry.init(app);
|
||||
|
||||
Sentry.attachRequestHandler(app);
|
||||
Sentry.attachTracingHandler(app);
|
||||
|
||||
injectBullBoardHandler(app, serverAdapter);
|
||||
|
||||
appAssetsHandler(app);
|
||||
@@ -50,6 +57,8 @@ app.use(function (req, res, next) {
|
||||
next(createError(404));
|
||||
});
|
||||
|
||||
Sentry.attachErrorHandler(app);
|
||||
|
||||
app.use(errorHandler);
|
||||
|
||||
export default app;
|
||||
|
@@ -44,6 +44,7 @@ type AppConfig = {
|
||||
stripeStarterPriceKey: string;
|
||||
stripeGrowthPriceKey: string;
|
||||
licenseKey: string;
|
||||
sentryDsn: string;
|
||||
};
|
||||
|
||||
const host = process.env.HOST || 'localhost';
|
||||
@@ -115,6 +116,7 @@ const appConfig: AppConfig = {
|
||||
stripeStarterPriceKey: process.env.STRIPE_STARTER_PRICE_KEY,
|
||||
stripeGrowthPriceKey: process.env.STRIPE_GROWTH_PRICE_KEY,
|
||||
licenseKey: process.env.LICENSE_KEY,
|
||||
sentryDsn: process.env.SENTRY_DSN,
|
||||
};
|
||||
|
||||
if (!appConfig.encryptionKey) {
|
||||
|
@@ -1,5 +1,7 @@
|
||||
import { Response } from 'express';
|
||||
import { IRequest } from '@automatisch/types';
|
||||
|
||||
import * as Sentry from '../../helpers/sentry.ee';
|
||||
import Billing from '../../helpers/billing/index.ee';
|
||||
import appConfig from '../../config/app';
|
||||
import logger from '../../helpers/logger';
|
||||
@@ -18,6 +20,8 @@ export default async (request: IRequest, response: Response) => {
|
||||
return response.sendStatus(200);
|
||||
} catch (error) {
|
||||
logger.error(`Webhook Error: ${error.message}`);
|
||||
|
||||
Sentry.captureException(error);
|
||||
return response.sendStatus(400);
|
||||
}
|
||||
};
|
||||
|
@@ -14,6 +14,11 @@ export default async (request: IRequest, response: Response) => {
|
||||
.throwIfNotFound();
|
||||
|
||||
const testRun = !flow.active;
|
||||
|
||||
if (!testRun) {
|
||||
await flow.throwIfQuotaExceeded();
|
||||
}
|
||||
|
||||
const triggerStep = await flow.getTriggerStep();
|
||||
const triggerCommand = await triggerStep.getTriggerCommand();
|
||||
const app = await triggerStep.getApp();
|
||||
|
@@ -2,6 +2,7 @@ import { IJSONObject } from '@automatisch/types';
|
||||
|
||||
export default class BaseError extends Error {
|
||||
details = {};
|
||||
statusCode?: number;
|
||||
|
||||
constructor(error?: string | IJSONObject) {
|
||||
let computedError: Record<string, unknown>;
|
||||
|
9
packages/backend/src/errors/quote-exceeded.ts
Normal file
9
packages/backend/src/errors/quote-exceeded.ts
Normal file
@@ -0,0 +1,9 @@
|
||||
import BaseError from './base';
|
||||
|
||||
export default class QuotaExceededError extends BaseError {
|
||||
constructor(error = 'The allowed task quota has been exhausted!') {
|
||||
super(error);
|
||||
|
||||
this.statusCode = 422;
|
||||
}
|
||||
}
|
@@ -1,3 +1,4 @@
|
||||
import App from '../../models/app';
|
||||
import Step from '../../models/step';
|
||||
import Context from '../../types/express/context';
|
||||
|
||||
@@ -16,15 +17,20 @@ const createFlow = async (
|
||||
const connectionId = params?.input?.connectionId;
|
||||
const appKey = params?.input?.triggerAppKey;
|
||||
|
||||
await App.findOneByKey(appKey);
|
||||
|
||||
const flow = await context.currentUser.$relatedQuery('flows').insert({
|
||||
name: 'Name your flow',
|
||||
});
|
||||
|
||||
if (connectionId) {
|
||||
await context.currentUser
|
||||
const hasConnection = await context.currentUser
|
||||
.$relatedQuery('connections')
|
||||
.findById(connectionId)
|
||||
.throwIfNotFound();
|
||||
.findById(connectionId);
|
||||
|
||||
if (!hasConnection) {
|
||||
throw new Error('The connection does not exist!');
|
||||
}
|
||||
}
|
||||
|
||||
await Step.query().insert({
|
||||
|
@@ -1,3 +1,4 @@
|
||||
import App from '../../models/app';
|
||||
import Context from '../../types/express/context';
|
||||
|
||||
type Params = {
|
||||
@@ -23,6 +24,14 @@ const createStep = async (
|
||||
) => {
|
||||
const { input } = params;
|
||||
|
||||
if (input.appKey && input.key) {
|
||||
await App.checkAppAndAction(input.appKey, input.key);
|
||||
}
|
||||
|
||||
if (input.appKey && !input.key) {
|
||||
await App.findOneByKey(input.appKey);
|
||||
}
|
||||
|
||||
const flow = await context.currentUser
|
||||
.$relatedQuery('flows')
|
||||
.findOne({
|
||||
|
@@ -27,7 +27,7 @@ const createUser = async (_parent: unknown, params: Params) => {
|
||||
});
|
||||
|
||||
if (appConfig.isCloud) {
|
||||
Billing.createSubscription(user);
|
||||
await Billing.createSubscription(user);
|
||||
}
|
||||
|
||||
return user;
|
||||
|
@@ -13,11 +13,13 @@ const executeFlow = async (
|
||||
context: Context
|
||||
) => {
|
||||
const { stepId } = params.input;
|
||||
const { executionStep } = await testRun({ stepId });
|
||||
|
||||
const untilStep = await context.currentUser
|
||||
.$relatedQuery('steps')
|
||||
.findById(stepId);
|
||||
.findById(stepId)
|
||||
.throwIfNotFound();
|
||||
|
||||
const { executionStep } = await testRun({ stepId });
|
||||
|
||||
if (executionStep.isFailed) {
|
||||
throw new Error(JSON.stringify(executionStep.errorDetails));
|
||||
|
@@ -1,4 +1,5 @@
|
||||
import { IJSONObject } from '@automatisch/types';
|
||||
import App from '../../models/app';
|
||||
import Step from '../../models/step';
|
||||
import Context from '../../types/express/context';
|
||||
|
||||
@@ -32,6 +33,24 @@ const updateStep = async (
|
||||
})
|
||||
.throwIfNotFound();
|
||||
|
||||
if (input.connection.id) {
|
||||
const hasConnection = await context.currentUser
|
||||
.$relatedQuery('connections')
|
||||
.findById(input.connection?.id);
|
||||
|
||||
if (!hasConnection) {
|
||||
throw new Error('The connection does not exist!');
|
||||
}
|
||||
}
|
||||
|
||||
if (step.isTrigger) {
|
||||
await App.checkAppAndTrigger(input.appKey, input.key);
|
||||
}
|
||||
|
||||
if (step.isAction) {
|
||||
await App.checkAppAndAction(input.appKey, input.key);
|
||||
}
|
||||
|
||||
step = await Step.query()
|
||||
.patchAndFetchById(input.id, {
|
||||
key: input.key,
|
||||
|
@@ -83,7 +83,7 @@ const createPaymentPortalUrl = async (user: User) => {
|
||||
|
||||
const userSession = await stripe.billingPortal.sessions.create({
|
||||
customer: paymentPlan.stripeCustomerId,
|
||||
return_url: 'https://cloud.automatisch.io',
|
||||
return_url: 'https://cloud.automatisch.io/settings/billing',
|
||||
});
|
||||
|
||||
return userSession.url;
|
||||
|
@@ -1,16 +1,15 @@
|
||||
import { Request, Response } from 'express';
|
||||
import { NextFunction, Request, Response } from 'express';
|
||||
import logger from './logger';
|
||||
|
||||
type Error = {
|
||||
message: string;
|
||||
};
|
||||
import BaseError from '../errors/base';
|
||||
|
||||
const errorHandler = (err: Error, req: Request, res: Response): void => {
|
||||
// Do not remove `next` argument as the function signature will not fit for an error handler middleware
|
||||
const errorHandler = (err: BaseError, req: Request, res: Response, next: NextFunction): void => {
|
||||
if (err.message === 'Not Found') {
|
||||
res.status(404).end();
|
||||
} else {
|
||||
logger.error(err.message);
|
||||
res.status(500).end();
|
||||
res.status(err.statusCode || 500).send(err.message);
|
||||
}
|
||||
};
|
||||
|
||||
|
@@ -22,7 +22,11 @@ const apps = fs
|
||||
return apps;
|
||||
}, {} as TApps);
|
||||
|
||||
async function getDefaultExport(appKey: string) {
|
||||
async function getAppDefaultExport(appKey: string) {
|
||||
if (!Object.prototype.hasOwnProperty.call(apps, appKey)) {
|
||||
throw new Error(`An application with the "${appKey}" key couldn't be found.`);
|
||||
}
|
||||
|
||||
return (await apps[appKey]).default;
|
||||
}
|
||||
|
||||
@@ -31,7 +35,7 @@ function stripFunctions<C>(data: C): C {
|
||||
}
|
||||
|
||||
const getApp = async (appKey: string, stripFuncs = true) => {
|
||||
let appData: IApp = cloneDeep(await getDefaultExport(appKey));
|
||||
let appData: IApp = cloneDeep(await getAppDefaultExport(appKey));
|
||||
|
||||
if (appData.auth) {
|
||||
appData = addAuthenticationSteps(appData);
|
||||
|
@@ -4,8 +4,10 @@ import { loadSchemaSync } from '@graphql-tools/load';
|
||||
import { GraphQLFileLoader } from '@graphql-tools/graphql-file-loader';
|
||||
import { addResolversToSchema } from '@graphql-tools/schema';
|
||||
import { applyMiddleware } from 'graphql-middleware';
|
||||
|
||||
import logger from '../helpers/logger';
|
||||
import authentication from '../helpers/authentication';
|
||||
import * as Sentry from '../helpers/sentry.ee';
|
||||
import resolvers from '../graphql/resolvers';
|
||||
import HttpError from '../errors/http';
|
||||
|
||||
@@ -28,6 +30,15 @@ const graphQLInstance = graphqlHTTP({
|
||||
delete (error.originalError as HttpError).response;
|
||||
}
|
||||
|
||||
Sentry.captureException(error, {
|
||||
tags: { graphql: true },
|
||||
extra: {
|
||||
source: error.source?.body,
|
||||
positions: error.positions,
|
||||
path: error.path
|
||||
}
|
||||
})
|
||||
|
||||
return error;
|
||||
},
|
||||
});
|
||||
|
51
packages/backend/src/helpers/sentry.ee.ts
Normal file
51
packages/backend/src/helpers/sentry.ee.ts
Normal file
@@ -0,0 +1,51 @@
|
||||
import { Express } from 'express';
|
||||
import * as Sentry from '@sentry/node';
|
||||
import type { CaptureContext } from '@sentry/types';
|
||||
import * as Tracing from '@sentry/tracing';
|
||||
|
||||
import appConfig from '../config/app';
|
||||
|
||||
export function init(app?: Express) {
|
||||
if (!appConfig.isCloud) return;
|
||||
|
||||
return Sentry.init({
|
||||
enabled: !!appConfig.sentryDsn,
|
||||
dsn: appConfig.sentryDsn,
|
||||
integrations: [
|
||||
app && new Sentry.Integrations.Http({ tracing: true }),
|
||||
app && new Tracing.Integrations.Express({ app }),
|
||||
app && new Tracing.Integrations.GraphQL(),
|
||||
].filter(Boolean),
|
||||
tracesSampleRate: 1.0,
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
export function attachRequestHandler(app: Express) {
|
||||
if (!appConfig.isCloud) return;
|
||||
|
||||
app.use(Sentry.Handlers.requestHandler());
|
||||
}
|
||||
|
||||
export function attachTracingHandler(app: Express) {
|
||||
if (!appConfig.isCloud) return;
|
||||
|
||||
app.use(Sentry.Handlers.tracingHandler());
|
||||
}
|
||||
|
||||
export function attachErrorHandler(app: Express) {
|
||||
if (!appConfig.isCloud) return;
|
||||
|
||||
app.use(Sentry.Handlers.errorHandler({
|
||||
shouldHandleError() {
|
||||
// TODO: narrow down the captured errors in time as we receive samples
|
||||
return true;
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
export function captureException(exception: any, captureContext?: CaptureContext) {
|
||||
if (!appConfig.isCloud) return;
|
||||
|
||||
return Sentry.captureException(exception, captureContext);
|
||||
}
|
@@ -36,6 +36,26 @@ class App {
|
||||
|
||||
return appInfoConverter(rawAppData);
|
||||
}
|
||||
|
||||
static async checkAppAndAction(appKey: string, actionKey: string): Promise<void> {
|
||||
const app = await this.findOneByKey(appKey);
|
||||
|
||||
const hasAction = app.actions?.find(action => action.key === actionKey);
|
||||
|
||||
if (!hasAction) {
|
||||
throw new Error(`${app.name} does not have an action with the "${actionKey}" key!`);
|
||||
}
|
||||
}
|
||||
|
||||
static async checkAppAndTrigger(appKey: string, triggerKey: string): Promise<void> {
|
||||
const app = await this.findOneByKey(appKey);
|
||||
|
||||
const hasTrigger = app.triggers?.find(trigger => trigger.key === triggerKey);
|
||||
|
||||
if (!hasTrigger) {
|
||||
throw new Error(`${app.name} does not have a trigger with the "${triggerKey}" key!`);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export default App;
|
||||
|
@@ -1,11 +1,13 @@
|
||||
import { ValidationError } from 'objection';
|
||||
import type { ModelOptions, QueryContext } from 'objection';
|
||||
import appConfig from '../config/app';
|
||||
import ExtendedQueryBuilder from './query-builder';
|
||||
import Base from './base';
|
||||
import Step from './step';
|
||||
import User from './user';
|
||||
import Execution from './execution';
|
||||
import Telemetry from '../helpers/telemetry';
|
||||
import QuotaExceededError from '../errors/quote-exceeded';
|
||||
|
||||
class Flow extends Base {
|
||||
id!: string;
|
||||
@@ -129,6 +131,33 @@ class Flow extends Base {
|
||||
type: 'trigger',
|
||||
});
|
||||
}
|
||||
|
||||
async checkIfQuotaExceeded() {
|
||||
if (!appConfig.isCloud) return;
|
||||
|
||||
const user = await this.$relatedQuery('user');
|
||||
const usageData = await user.$relatedQuery('usageData');
|
||||
|
||||
const hasExceeded = await usageData.checkIfLimitExceeded();
|
||||
|
||||
if (hasExceeded) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
async throwIfQuotaExceeded() {
|
||||
if (!appConfig.isCloud) return;
|
||||
|
||||
const hasExceeded = await this.checkIfQuotaExceeded();
|
||||
|
||||
if (hasExceeded) {
|
||||
throw new QuotaExceededError();
|
||||
}
|
||||
|
||||
return this;
|
||||
}
|
||||
}
|
||||
|
||||
export default Flow;
|
||||
|
@@ -1,12 +1,14 @@
|
||||
import { raw } from 'objection';
|
||||
import Base from './base';
|
||||
import User from './user';
|
||||
import PaymentPlan from './payment-plan.ee';
|
||||
|
||||
class UsageData extends Base {
|
||||
id!: string;
|
||||
userId!: string;
|
||||
consumedTaskCount!: number;
|
||||
nextResetAt!: string;
|
||||
paymentPlan?: PaymentPlan;
|
||||
|
||||
static tableName = 'usage_data';
|
||||
|
||||
@@ -31,8 +33,22 @@ class UsageData extends Base {
|
||||
to: 'users.id',
|
||||
},
|
||||
},
|
||||
paymentPlan: {
|
||||
relation: Base.BelongsToOneRelation,
|
||||
modelClass: PaymentPlan,
|
||||
join: {
|
||||
from: 'usage_data.user_id',
|
||||
to: 'payment_plans.user_id',
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
async checkIfLimitExceeded() {
|
||||
const paymentPlan = await this.$relatedQuery('paymentPlan');
|
||||
|
||||
return this.consumedTaskCount >= paymentPlan.taskCount;
|
||||
}
|
||||
|
||||
async increaseConsumedTaskCountByOne() {
|
||||
return await this.$query().patch({ consumedTaskCount: raw('consumed_task_count + 1') });
|
||||
}
|
||||
|
@@ -1,5 +1,6 @@
|
||||
import express, { Router } from 'express';
|
||||
import express, { Response, Router, NextFunction, RequestHandler } from 'express';
|
||||
import multer from 'multer';
|
||||
|
||||
import { IRequest } from '@automatisch/types';
|
||||
import appConfig from '../config/app';
|
||||
import webhookHandler from '../controllers/webhooks/handler';
|
||||
@@ -16,9 +17,17 @@ router.use(express.text({
|
||||
},
|
||||
}));
|
||||
|
||||
router.get('/:flowId', webhookHandler);
|
||||
router.put('/:flowId', webhookHandler);
|
||||
router.patch('/:flowId', webhookHandler);
|
||||
router.post('/:flowId', webhookHandler);
|
||||
const exposeError = (handler: RequestHandler) => async (req: IRequest, res: Response, next: NextFunction) => {
|
||||
try {
|
||||
await handler(req, res, next);
|
||||
} catch (err) {
|
||||
next(err);
|
||||
}
|
||||
}
|
||||
|
||||
router.get('/:flowId', exposeError(webhookHandler));
|
||||
router.put('/:flowId', exposeError(webhookHandler));
|
||||
router.patch('/:flowId', exposeError(webhookHandler));
|
||||
router.post('/:flowId', exposeError(webhookHandler));
|
||||
|
||||
export default router;
|
||||
|
@@ -17,17 +17,19 @@ type ProcessActionOptions = {
|
||||
export const processAction = async (options: ProcessActionOptions) => {
|
||||
const { flowId, stepId, executionId } = options;
|
||||
|
||||
const step = await Step.query().findById(stepId).throwIfNotFound();
|
||||
const flow = await Flow.query().findById(flowId).throwIfNotFound();
|
||||
const execution = await Execution.query()
|
||||
.findById(executionId)
|
||||
.throwIfNotFound();
|
||||
|
||||
const step = await Step.query().findById(stepId).throwIfNotFound();
|
||||
|
||||
const $ = await globalVariable({
|
||||
flow: await Flow.query().findById(flowId).throwIfNotFound(),
|
||||
flow,
|
||||
app: await step.getApp(),
|
||||
step: step,
|
||||
connection: await step.$relatedQuery('connection'),
|
||||
execution: execution,
|
||||
execution,
|
||||
});
|
||||
|
||||
const priorExecutionSteps = await ExecutionStep.query().where({
|
||||
|
@@ -10,8 +10,8 @@ type ProcessFlowOptions = {
|
||||
};
|
||||
|
||||
export const processFlow = async (options: ProcessFlowOptions) => {
|
||||
const flow = await Flow.query().findById(options.flowId).throwIfNotFound();
|
||||
|
||||
const { testRun, flowId } = options;
|
||||
const flow = await Flow.query().findById(flowId).throwIfNotFound();
|
||||
const triggerStep = await flow.getTriggerStep();
|
||||
const triggerCommand = await triggerStep.getTriggerCommand();
|
||||
|
||||
@@ -20,7 +20,7 @@ export const processFlow = async (options: ProcessFlowOptions) => {
|
||||
connection: await triggerStep.$relatedQuery('connection'),
|
||||
app: await triggerStep.getApp(),
|
||||
step: triggerStep,
|
||||
testRun: options.testRun,
|
||||
testRun,
|
||||
});
|
||||
|
||||
try {
|
||||
|
@@ -1,3 +1,7 @@
|
||||
import * as Sentry from './helpers/sentry.ee';
|
||||
|
||||
Sentry.init();
|
||||
|
||||
import './config/orm';
|
||||
import './helpers/check-worker-readiness';
|
||||
import './workers/flow';
|
||||
|
@@ -1,4 +1,6 @@
|
||||
import { Worker } from 'bullmq';
|
||||
|
||||
import * as Sentry from '../helpers/sentry.ee';
|
||||
import redisConfig from '../config/redis';
|
||||
import logger from '../helpers/logger';
|
||||
import Step from '../models/step';
|
||||
@@ -65,6 +67,12 @@ worker.on('failed', (job, err) => {
|
||||
logger.info(
|
||||
`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}`
|
||||
);
|
||||
|
||||
Sentry.captureException(err, {
|
||||
extra: {
|
||||
jobId: job.id,
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
process.on('SIGTERM', async () => {
|
||||
|
@@ -1,4 +1,6 @@
|
||||
import { Worker } from 'bullmq';
|
||||
|
||||
import * as Sentry from '../helpers/sentry.ee';
|
||||
import redisConfig from '../config/redis';
|
||||
import logger from '../helpers/logger';
|
||||
import User from '../models/user';
|
||||
@@ -37,6 +39,12 @@ worker.on('failed', (job, err) => {
|
||||
logger.info(
|
||||
`JOB ID: ${job.id} - The user with the ID of '${job.data.id}' has failed to be deleted! ${err.message}`
|
||||
);
|
||||
|
||||
Sentry.captureException(err, {
|
||||
extra: {
|
||||
jobId: job.id,
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
process.on('SIGTERM', async () => {
|
||||
|
@@ -1,4 +1,6 @@
|
||||
import { Worker } from 'bullmq';
|
||||
|
||||
import * as Sentry from '../helpers/sentry.ee';
|
||||
import redisConfig from '../config/redis';
|
||||
import logger from '../helpers/logger';
|
||||
import mailer from '../helpers/mailer.ee';
|
||||
@@ -30,6 +32,12 @@ worker.on('failed', (job, err) => {
|
||||
logger.info(
|
||||
`JOB ID: ${job.id} - ${job.data.subject} email to ${job.data.email} has failed to send with ${err.message}`
|
||||
);
|
||||
|
||||
Sentry.captureException(err, {
|
||||
extra: {
|
||||
jobId: job.id,
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
process.on('SIGTERM', async () => {
|
||||
|
@@ -1,4 +1,6 @@
|
||||
import { Worker } from 'bullmq';
|
||||
|
||||
import * as Sentry from '../helpers/sentry.ee';
|
||||
import redisConfig from '../config/redis';
|
||||
import logger from '../helpers/logger';
|
||||
import triggerQueue from '../queues/trigger';
|
||||
@@ -12,6 +14,13 @@ export const worker = new Worker(
|
||||
const { flowId } = job.data;
|
||||
|
||||
const flow = await Flow.query().findById(flowId).throwIfNotFound();
|
||||
|
||||
const quotaExceeded = await flow.checkIfQuotaExceeded();
|
||||
|
||||
if (quotaExceeded) {
|
||||
return;
|
||||
}
|
||||
|
||||
const triggerStep = await flow.getTriggerStep();
|
||||
|
||||
const { data, error } = await processFlow({ flowId });
|
||||
@@ -58,6 +67,12 @@ worker.on('failed', (job, err) => {
|
||||
logger.info(
|
||||
`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}`
|
||||
);
|
||||
|
||||
Sentry.captureException(err, {
|
||||
extra: {
|
||||
jobId: job.id,
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
process.on('SIGTERM', async () => {
|
||||
|
@@ -1,7 +1,9 @@
|
||||
import { Worker } from 'bullmq';
|
||||
|
||||
import { IJSONObject, ITriggerItem } from '@automatisch/types';
|
||||
import * as Sentry from '../helpers/sentry.ee';
|
||||
import redisConfig from '../config/redis';
|
||||
import logger from '../helpers/logger';
|
||||
import { IJSONObject, ITriggerItem } from '@automatisch/types';
|
||||
import actionQueue from '../queues/action';
|
||||
import Step from '../models/step';
|
||||
import { processTrigger } from '../services/trigger';
|
||||
@@ -51,6 +53,12 @@ worker.on('failed', (job, err) => {
|
||||
logger.info(
|
||||
`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}`
|
||||
);
|
||||
|
||||
Sentry.captureException(err, {
|
||||
extra: {
|
||||
jobId: job.id,
|
||||
}
|
||||
});
|
||||
});
|
||||
|
||||
process.on('SIGTERM', async () => {
|
||||
|
Reference in New Issue
Block a user