Merge branch 'main' into feature/signalwire-integration
This commit is contained in:
@@ -29,6 +29,7 @@ const userScopes = [
|
||||
'groups:history',
|
||||
'groups:read',
|
||||
'groups:write',
|
||||
'im:read',
|
||||
'im:write',
|
||||
'mpim:write',
|
||||
'reactions:read',
|
||||
|
@@ -1,5 +1,24 @@
|
||||
import { IGlobalVariable, IJSONObject } from '@automatisch/types';
|
||||
|
||||
type TChannel = {
|
||||
id: string;
|
||||
name: string;
|
||||
}
|
||||
|
||||
type TConversationListResponseData = {
|
||||
channels: TChannel[],
|
||||
response_metadata?: {
|
||||
next_cursor: string
|
||||
};
|
||||
needed?: string;
|
||||
error?: string;
|
||||
ok: boolean;
|
||||
}
|
||||
|
||||
type TResponse = {
|
||||
data: TConversationListResponseData;
|
||||
}
|
||||
|
||||
export default {
|
||||
name: 'List channels',
|
||||
key: 'listChannels',
|
||||
@@ -13,24 +32,33 @@ export default {
|
||||
error: null,
|
||||
};
|
||||
|
||||
const response = await $.http.get('/conversations.list', {
|
||||
params: {
|
||||
types: 'public_channel,private_channel',
|
||||
limit: 1000,
|
||||
exclude_archived: true,
|
||||
let nextCursor;
|
||||
do {
|
||||
const response: TResponse = await $.http.get('/conversations.list', {
|
||||
params: {
|
||||
types: 'public_channel,private_channel,im',
|
||||
cursor: nextCursor,
|
||||
limit: 1000,
|
||||
}
|
||||
});
|
||||
|
||||
nextCursor = response.data.response_metadata?.next_cursor;
|
||||
|
||||
if (response.data.error === 'missing_scope') {
|
||||
throw new Error(`Missing "${response.data.needed}" scope while authorizing. Please, reconnect your connection!`);
|
||||
}
|
||||
});
|
||||
|
||||
if (response.data.ok === false) {
|
||||
throw new Error(response.data);
|
||||
}
|
||||
if (response.data.ok === false) {
|
||||
throw new Error(JSON.stringify(response.data, null, 2));
|
||||
}
|
||||
|
||||
channels.data = response.data.channels.map((channel: IJSONObject) => {
|
||||
return {
|
||||
value: channel.id,
|
||||
name: channel.name,
|
||||
};
|
||||
});
|
||||
for (const channel of response.data.channels) {
|
||||
channels.data.push({
|
||||
value: channel.id as string,
|
||||
name: channel.name as string,
|
||||
});
|
||||
}
|
||||
} while (nextCursor);
|
||||
|
||||
return channels;
|
||||
},
|
||||
|
@@ -32,6 +32,13 @@ type AppConfig = {
|
||||
bullMQDashboardPassword: string;
|
||||
telemetryEnabled: boolean;
|
||||
requestBodySizeLimit: string;
|
||||
smtpHost: string;
|
||||
smtpPort: number;
|
||||
smtpSecure: boolean;
|
||||
smtpUser: string;
|
||||
smtpPassword: string;
|
||||
fromEmail: string;
|
||||
licenseKey: string;
|
||||
};
|
||||
|
||||
const host = process.env.HOST || 'localhost';
|
||||
@@ -40,7 +47,7 @@ const port = process.env.PORT || '3000';
|
||||
const serveWebAppSeparately =
|
||||
process.env.SERVE_WEB_APP_SEPARATELY === 'true' ? true : false;
|
||||
|
||||
let apiUrl = (new URL(`${protocol}://${host}:${port}`)).toString();
|
||||
let apiUrl = new URL(`${protocol}://${host}:${port}`).toString();
|
||||
apiUrl = apiUrl.substring(0, apiUrl.length - 1);
|
||||
|
||||
// use apiUrl by default, which has less priority over the following cases
|
||||
@@ -48,14 +55,14 @@ let webAppUrl = apiUrl;
|
||||
|
||||
if (process.env.WEB_APP_URL) {
|
||||
// use env. var. if provided
|
||||
webAppUrl = (new URL(process.env.WEB_APP_URL)).toString();
|
||||
webAppUrl = new URL(process.env.WEB_APP_URL).toString();
|
||||
webAppUrl = webAppUrl.substring(0, webAppUrl.length - 1);
|
||||
} else if (serveWebAppSeparately) {
|
||||
// no env. var. and serving separately, sign of development
|
||||
webAppUrl = 'http://localhost:3001'
|
||||
webAppUrl = 'http://localhost:3001';
|
||||
}
|
||||
|
||||
let webhookUrl = (new URL(process.env.WEBHOOK_URL || apiUrl)).toString();
|
||||
let webhookUrl = new URL(process.env.WEBHOOK_URL || apiUrl).toString();
|
||||
webhookUrl = webhookUrl.substring(0, webhookUrl.length - 1);
|
||||
|
||||
const appEnv = process.env.APP_ENV || 'development';
|
||||
@@ -91,6 +98,13 @@ const appConfig: AppConfig = {
|
||||
webhookUrl,
|
||||
telemetryEnabled: process.env.TELEMETRY_ENABLED === 'false' ? false : true,
|
||||
requestBodySizeLimit: '1mb',
|
||||
smtpHost: process.env.SMTP_HOST,
|
||||
smtpPort: parseInt(process.env.SMTP_PORT || '587'),
|
||||
smtpSecure: process.env.SMTP_SECURE === 'true',
|
||||
smtpUser: process.env.SMTP_USER,
|
||||
smtpPassword: process.env.SMTP_PASSWORD,
|
||||
fromEmail: process.env.FROM_EMAIL,
|
||||
licenseKey: process.env.LICENSE_KEY,
|
||||
};
|
||||
|
||||
if (!appConfig.encryptionKey) {
|
||||
|
@@ -0,0 +1,15 @@
|
||||
import { Knex } from 'knex';
|
||||
|
||||
export async function up(knex: Knex): Promise<void> {
|
||||
return knex.schema.table('users', async (table) => {
|
||||
table.string('role');
|
||||
|
||||
await knex('users').update({ role: 'admin' });
|
||||
});
|
||||
}
|
||||
|
||||
export async function down(knex: Knex): Promise<void> {
|
||||
return knex.schema.table('users', (table) => {
|
||||
table.dropColumn('role');
|
||||
});
|
||||
}
|
@@ -0,0 +1,13 @@
|
||||
import { Knex } from 'knex';
|
||||
|
||||
export async function up(knex: Knex): Promise<void> {
|
||||
return knex.schema.alterTable('users', (table) => {
|
||||
table.string('role').notNullable().alter();
|
||||
});
|
||||
}
|
||||
|
||||
export async function down(knex: Knex): Promise<void> {
|
||||
return knex.schema.alterTable('users', (table) => {
|
||||
table.string('role').nullable().alter();
|
||||
});
|
||||
}
|
@@ -0,0 +1,13 @@
|
||||
import { Knex } from 'knex';
|
||||
|
||||
export async function up(knex: Knex): Promise<void> {
|
||||
return knex.schema.table('users', (table) => {
|
||||
table.string('reset_password_token');
|
||||
});
|
||||
}
|
||||
|
||||
export async function down(knex: Knex): Promise<void> {
|
||||
return knex.schema.table('users', (table) => {
|
||||
table.dropColumn('reset_password_token');
|
||||
});
|
||||
}
|
@@ -0,0 +1,13 @@
|
||||
import { Knex } from 'knex';
|
||||
|
||||
export async function up(knex: Knex): Promise<void> {
|
||||
return knex.schema.table('users', (table) => {
|
||||
table.timestamp('reset_password_token_sent_at');
|
||||
});
|
||||
}
|
||||
|
||||
export async function down(knex: Knex): Promise<void> {
|
||||
return knex.schema.table('users', (table) => {
|
||||
table.dropColumn('reset_password_token_sent_at');
|
||||
});
|
||||
}
|
@@ -12,7 +12,10 @@ import deleteFlow from './mutations/delete-flow';
|
||||
import createStep from './mutations/create-step';
|
||||
import updateStep from './mutations/update-step';
|
||||
import deleteStep from './mutations/delete-step';
|
||||
import createUser from './mutations/create-user.ee';
|
||||
import updateUser from './mutations/update-user';
|
||||
import forgotPassword from './mutations/forgot-password.ee';
|
||||
import resetPassword from './mutations/reset-password.ee';
|
||||
import login from './mutations/login';
|
||||
|
||||
const mutationResolvers = {
|
||||
@@ -30,7 +33,10 @@ const mutationResolvers = {
|
||||
createStep,
|
||||
updateStep,
|
||||
deleteStep,
|
||||
createUser,
|
||||
updateUser,
|
||||
forgotPassword,
|
||||
resetPassword,
|
||||
login,
|
||||
};
|
||||
|
||||
|
28
packages/backend/src/graphql/mutations/create-user.ee.ts
Normal file
28
packages/backend/src/graphql/mutations/create-user.ee.ts
Normal file
@@ -0,0 +1,28 @@
|
||||
import User from '../../models/user';
|
||||
|
||||
type Params = {
|
||||
input: {
|
||||
email: string;
|
||||
password: string;
|
||||
};
|
||||
};
|
||||
|
||||
const createUser = async (_parent: unknown, params: Params) => {
|
||||
const { email, password } = params.input;
|
||||
|
||||
const existingUser = await User.query().findOne({ email });
|
||||
|
||||
if (existingUser) {
|
||||
throw new Error('User already exists!');
|
||||
}
|
||||
|
||||
const user = await User.query().insert({
|
||||
email,
|
||||
password,
|
||||
role: 'user',
|
||||
});
|
||||
|
||||
return user;
|
||||
};
|
||||
|
||||
export default createUser;
|
46
packages/backend/src/graphql/mutations/forgot-password.ee.ts
Normal file
46
packages/backend/src/graphql/mutations/forgot-password.ee.ts
Normal file
@@ -0,0 +1,46 @@
|
||||
import User from '../../models/user';
|
||||
import emailQueue from '../../queues/email';
|
||||
import {
|
||||
REMOVE_AFTER_30_DAYS_OR_150_JOBS,
|
||||
REMOVE_AFTER_7_DAYS_OR_50_JOBS,
|
||||
} from '../../helpers/remove-job-configuration';
|
||||
|
||||
type Params = {
|
||||
input: {
|
||||
email: string;
|
||||
};
|
||||
};
|
||||
|
||||
const forgotPassword = async (_parent: unknown, params: Params) => {
|
||||
const { email } = params.input;
|
||||
|
||||
const user = await User.query().findOne({ email });
|
||||
|
||||
if (!user) {
|
||||
throw new Error('Email address not found!');
|
||||
}
|
||||
|
||||
await user.generateResetPasswordToken();
|
||||
|
||||
const jobName = `Reset Password Email - ${user.id}`;
|
||||
|
||||
const jobPayload = {
|
||||
email: user.email,
|
||||
subject: 'Reset Password',
|
||||
template: 'reset-password-instructions',
|
||||
params: {
|
||||
token: user.resetPasswordToken,
|
||||
},
|
||||
};
|
||||
|
||||
const jobOptions = {
|
||||
removeOnComplete: REMOVE_AFTER_7_DAYS_OR_50_JOBS,
|
||||
removeOnFail: REMOVE_AFTER_30_DAYS_OR_150_JOBS,
|
||||
};
|
||||
|
||||
await emailQueue.add(jobName, jobPayload, jobOptions);
|
||||
|
||||
return;
|
||||
};
|
||||
|
||||
export default forgotPassword;
|
30
packages/backend/src/graphql/mutations/reset-password.ee.ts
Normal file
30
packages/backend/src/graphql/mutations/reset-password.ee.ts
Normal file
@@ -0,0 +1,30 @@
|
||||
import User from '../../models/user';
|
||||
|
||||
type Params = {
|
||||
input: {
|
||||
token: string;
|
||||
password: string;
|
||||
};
|
||||
};
|
||||
|
||||
const resetPassword = async (_parent: unknown, params: Params) => {
|
||||
const { token, password } = params.input;
|
||||
|
||||
if (!token) {
|
||||
throw new Error('Reset password token is required!');
|
||||
}
|
||||
|
||||
const user = await User.query().findOne({ reset_password_token: token });
|
||||
|
||||
if (!user || !user.isResetPasswordTokenValid()) {
|
||||
throw new Error(
|
||||
'Reset password link is not valid or expired. Try generating a new link.'
|
||||
);
|
||||
}
|
||||
|
||||
await user.resetPassword(password);
|
||||
|
||||
return;
|
||||
};
|
||||
|
||||
export default resetPassword;
|
11
packages/backend/src/graphql/queries/get-license.ee.ts
Normal file
11
packages/backend/src/graphql/queries/get-license.ee.ts
Normal file
@@ -0,0 +1,11 @@
|
||||
import checkLicense from '../../helpers/check-license.ee';
|
||||
|
||||
const getLicense = async () => {
|
||||
const license = await checkLicense();
|
||||
|
||||
return {
|
||||
type: license ? 'ee' : 'ce',
|
||||
};
|
||||
};
|
||||
|
||||
export default getLicense;
|
@@ -10,6 +10,7 @@ import getExecutions from './queries/get-executions';
|
||||
import getExecutionSteps from './queries/get-execution-steps';
|
||||
import getDynamicData from './queries/get-dynamic-data';
|
||||
import getCurrentUser from './queries/get-current-user';
|
||||
import getLicense from './queries/get-license.ee';
|
||||
import healthcheck from './queries/healthcheck';
|
||||
|
||||
const queryResolvers = {
|
||||
@@ -25,6 +26,7 @@ const queryResolvers = {
|
||||
getExecutionSteps,
|
||||
getDynamicData,
|
||||
getCurrentUser,
|
||||
getLicense,
|
||||
healthcheck,
|
||||
};
|
||||
|
||||
|
@@ -29,6 +29,7 @@ type Query {
|
||||
parameters: JSONObject
|
||||
): JSONObject
|
||||
getCurrentUser: User
|
||||
getLicense: GetLicense
|
||||
healthcheck: AppHealth
|
||||
}
|
||||
|
||||
@@ -47,7 +48,10 @@ type Mutation {
|
||||
createStep(input: CreateStepInput): Step
|
||||
updateStep(input: UpdateStepInput): Step
|
||||
deleteStep(input: DeleteStepInput): Step
|
||||
createUser(input: CreateUserInput): User
|
||||
updateUser(input: UpdateUserInput): User
|
||||
forgotPassword(input: ForgotPasswordInput): Boolean
|
||||
resetPassword(input: ResetPasswordInput): Boolean
|
||||
login(input: LoginInput): Auth
|
||||
}
|
||||
|
||||
@@ -299,11 +303,25 @@ input DeleteStepInput {
|
||||
id: String!
|
||||
}
|
||||
|
||||
input CreateUserInput {
|
||||
email: String!
|
||||
password: String!
|
||||
}
|
||||
|
||||
input UpdateUserInput {
|
||||
email: String
|
||||
password: String
|
||||
}
|
||||
|
||||
input ForgotPasswordInput {
|
||||
email: String!
|
||||
}
|
||||
|
||||
input ResetPasswordInput {
|
||||
token: String!
|
||||
password: String!
|
||||
}
|
||||
|
||||
input LoginInput {
|
||||
email: String!
|
||||
password: String!
|
||||
@@ -453,6 +471,10 @@ type AppHealth {
|
||||
version: String
|
||||
}
|
||||
|
||||
type GetLicense {
|
||||
type: String
|
||||
}
|
||||
|
||||
schema {
|
||||
query: Query
|
||||
mutation: Mutation
|
||||
|
@@ -29,6 +29,8 @@ const authentication = shield(
|
||||
Mutation: {
|
||||
'*': isAuthenticated,
|
||||
login: allow,
|
||||
createUser: allow,
|
||||
forgotPassword: allow,
|
||||
},
|
||||
},
|
||||
{
|
||||
|
31
packages/backend/src/helpers/check-license.ee.ts
Normal file
31
packages/backend/src/helpers/check-license.ee.ts
Normal file
@@ -0,0 +1,31 @@
|
||||
import axios from 'axios';
|
||||
import appConfig from '../config/app';
|
||||
import memoryCache from 'memory-cache';
|
||||
|
||||
const CACHE_DURATION = 1000 * 60 * 60 * 24; // 24 hours in milliseconds
|
||||
|
||||
const checkLicense = async () => {
|
||||
const licenseKey = appConfig.licenseKey;
|
||||
|
||||
if (!licenseKey) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const url = 'https://license.automatisch.io/api/v1/licenses/verify';
|
||||
const cachedResponse = memoryCache.get(url);
|
||||
|
||||
if (cachedResponse) {
|
||||
return cachedResponse;
|
||||
} else {
|
||||
try {
|
||||
const { data } = await axios.post(url, { licenseKey });
|
||||
memoryCache.put(url, data.verified, CACHE_DURATION);
|
||||
|
||||
return data.verified;
|
||||
} catch (error) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
export default checkLicense;
|
12
packages/backend/src/helpers/compile-email.ee.ts
Normal file
12
packages/backend/src/helpers/compile-email.ee.ts
Normal file
@@ -0,0 +1,12 @@
|
||||
import * as path from 'path';
|
||||
import * as fs from 'fs';
|
||||
import * as handlebars from 'handlebars';
|
||||
|
||||
const compileEmail = (emailPath: string, replacements: object = {}): string => {
|
||||
const filePath = path.join(__dirname, `../views/emails/${emailPath}.ee.hbs`);
|
||||
const source = fs.readFileSync(filePath, 'utf-8').toString();
|
||||
const template = handlebars.compile(source);
|
||||
return template(replacements);
|
||||
};
|
||||
|
||||
export default compileEmail;
|
@@ -4,18 +4,18 @@ import delayForAsMilliseconds, {
|
||||
} from './delay-for-as-milliseconds';
|
||||
import delayUntilAsMilliseconds from './delay-until-as-milliseconds';
|
||||
|
||||
const delayAsMilliseconds = (step: Step) => {
|
||||
const delayAsMilliseconds = (eventKey: Step["key"], computedParameters: Step["parameters"]) => {
|
||||
let delayDuration = 0;
|
||||
|
||||
if (step.key === 'delayFor') {
|
||||
const { delayForUnit, delayForValue } = step.parameters;
|
||||
if (eventKey === 'delayFor') {
|
||||
const { delayForUnit, delayForValue } = computedParameters;
|
||||
|
||||
delayDuration = delayForAsMilliseconds(
|
||||
delayForUnit as TDelayForUnit,
|
||||
Number(delayForValue)
|
||||
);
|
||||
} else if (step.key === 'delayUntil') {
|
||||
const { delayUntil } = step.parameters;
|
||||
} else if (eventKey === 'delayUntil') {
|
||||
const { delayUntil } = computedParameters;
|
||||
delayDuration = delayUntilAsMilliseconds(delayUntil as string);
|
||||
}
|
||||
|
||||
|
14
packages/backend/src/helpers/mailer.ee.ts
Normal file
14
packages/backend/src/helpers/mailer.ee.ts
Normal file
@@ -0,0 +1,14 @@
|
||||
import nodemailer from 'nodemailer';
|
||||
import appConfig from '../config/app';
|
||||
|
||||
const mailer = nodemailer.createTransport({
|
||||
host: appConfig.smtpHost,
|
||||
port: appConfig.smtpPort,
|
||||
secure: appConfig.smtpSecure,
|
||||
auth: {
|
||||
user: appConfig.smtpUser,
|
||||
pass: appConfig.smtpPassword,
|
||||
},
|
||||
});
|
||||
|
||||
export default mailer;
|
@@ -5,11 +5,15 @@ import Flow from './flow';
|
||||
import Step from './step';
|
||||
import Execution from './execution';
|
||||
import bcrypt from 'bcrypt';
|
||||
import crypto from 'crypto';
|
||||
|
||||
class User extends Base {
|
||||
id!: string;
|
||||
email!: string;
|
||||
password!: string;
|
||||
role: string;
|
||||
resetPasswordToken: string;
|
||||
resetPasswordTokenSentAt: string;
|
||||
connections?: Connection[];
|
||||
flows?: Flow[];
|
||||
steps?: Step[];
|
||||
@@ -25,6 +29,7 @@ class User extends Base {
|
||||
id: { type: 'string', format: 'uuid' },
|
||||
email: { type: 'string', format: 'email', minLength: 1, maxLength: 255 },
|
||||
password: { type: 'string', minLength: 1, maxLength: 255 },
|
||||
role: { type: 'string', enum: ['admin', 'user'] },
|
||||
},
|
||||
};
|
||||
|
||||
@@ -75,6 +80,33 @@ class User extends Base {
|
||||
return bcrypt.compare(password, this.password);
|
||||
}
|
||||
|
||||
async generateResetPasswordToken() {
|
||||
const resetPasswordToken = crypto.randomBytes(64).toString('hex');
|
||||
const resetPasswordTokenSentAt = new Date().toISOString();
|
||||
|
||||
await this.$query().patch({ resetPasswordToken, resetPasswordTokenSentAt });
|
||||
}
|
||||
|
||||
async resetPassword(password: string) {
|
||||
return await this.$query().patch({
|
||||
resetPasswordToken: null,
|
||||
resetPasswordTokenSentAt: null,
|
||||
password,
|
||||
});
|
||||
}
|
||||
|
||||
async isResetPasswordTokenValid() {
|
||||
if (!this.resetPasswordTokenSentAt) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const sentAt = new Date(this.resetPasswordTokenSentAt);
|
||||
const now = new Date();
|
||||
const fourHoursInMilliseconds = 1000 * 60 * 60 * 4;
|
||||
|
||||
return now.getTime() - sentAt.getTime() < fourHoursInMilliseconds;
|
||||
}
|
||||
|
||||
async generateHash() {
|
||||
this.password = await bcrypt.hash(this.password, 10);
|
||||
}
|
||||
|
25
packages/backend/src/queues/email.ts
Normal file
25
packages/backend/src/queues/email.ts
Normal file
@@ -0,0 +1,25 @@
|
||||
import process from 'process';
|
||||
import { Queue } from 'bullmq';
|
||||
import redisConfig from '../config/redis';
|
||||
import logger from '../helpers/logger';
|
||||
|
||||
const CONNECTION_REFUSED = 'ECONNREFUSED';
|
||||
|
||||
const redisConnection = {
|
||||
connection: redisConfig,
|
||||
};
|
||||
|
||||
const emailQueue = new Queue('email', redisConnection);
|
||||
|
||||
process.on('SIGTERM', async () => {
|
||||
await emailQueue.close();
|
||||
});
|
||||
|
||||
emailQueue.on('error', (err) => {
|
||||
if ((err as any).code === CONNECTION_REFUSED) {
|
||||
logger.error('Make sure you have installed Redis and it is running.', err);
|
||||
process.exit();
|
||||
}
|
||||
});
|
||||
|
||||
export default emailQueue;
|
@@ -65,5 +65,5 @@ export const processAction = async (options: ProcessActionOptions) => {
|
||||
errorDetails: $.actionOutput.error ? $.actionOutput.error : null,
|
||||
});
|
||||
|
||||
return { flowId, stepId, executionId, executionStep };
|
||||
return { flowId, stepId, executionId, executionStep, computedParameters };
|
||||
};
|
||||
|
@@ -0,0 +1,16 @@
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<title>Title</title>
|
||||
</head>
|
||||
<body>
|
||||
Hello {{ email }}
|
||||
|
||||
Someone has requested a link to change your password, and you can do this through the link below.
|
||||
|
||||
<a href="/reset-password">Change my password</a>
|
||||
|
||||
If you didn't request this, please ignore this email.
|
||||
Your password won't change until you access the link above and create a new one.
|
||||
</body>
|
||||
</html>
|
@@ -3,6 +3,7 @@ import './helpers/check-worker-readiness';
|
||||
import './workers/flow';
|
||||
import './workers/trigger';
|
||||
import './workers/action';
|
||||
import './workers/email';
|
||||
import telemetry from './helpers/telemetry';
|
||||
|
||||
telemetry.setServiceType('worker');
|
||||
|
@@ -21,7 +21,7 @@ const DEFAULT_DELAY_DURATION = 0;
|
||||
export const worker = new Worker(
|
||||
'action',
|
||||
async (job) => {
|
||||
const { stepId, flowId, executionId } = await processAction(
|
||||
const { stepId, flowId, executionId, computedParameters } = await processAction(
|
||||
job.data as JobData
|
||||
);
|
||||
|
||||
@@ -45,7 +45,7 @@ export const worker = new Worker(
|
||||
};
|
||||
|
||||
if (step.appKey === 'delay') {
|
||||
jobOptions.delay = delayAsMilliseconds(step);
|
||||
jobOptions.delay = delayAsMilliseconds(step.key, computedParameters);
|
||||
}
|
||||
|
||||
await actionQueue.add(jobName, jobPayload, jobOptions);
|
||||
|
37
packages/backend/src/workers/email.ts
Normal file
37
packages/backend/src/workers/email.ts
Normal file
@@ -0,0 +1,37 @@
|
||||
import { Worker } from 'bullmq';
|
||||
import redisConfig from '../config/redis';
|
||||
import logger from '../helpers/logger';
|
||||
import mailer from '../helpers/mailer.ee';
|
||||
import compileEmail from '../helpers/compile-email.ee';
|
||||
import appConfig from '../config/app';
|
||||
|
||||
export const worker = new Worker(
|
||||
'email',
|
||||
async (job) => {
|
||||
const { email, subject, templateName, params } = job.data;
|
||||
|
||||
await mailer.sendMail({
|
||||
to: email,
|
||||
from: appConfig.fromEmail,
|
||||
subject: subject,
|
||||
html: compileEmail(templateName, params),
|
||||
});
|
||||
},
|
||||
{ connection: redisConfig }
|
||||
);
|
||||
|
||||
worker.on('completed', (job) => {
|
||||
logger.info(
|
||||
`JOB ID: ${job.id} - ${job.data.subject} email sent to ${job.data.email}!`
|
||||
);
|
||||
});
|
||||
|
||||
worker.on('failed', (job, err) => {
|
||||
logger.info(
|
||||
`JOB ID: ${job.id} - ${job.data.subject} email to ${job.data.email} has failed to send with ${err.message}`
|
||||
);
|
||||
});
|
||||
|
||||
process.on('SIGTERM', async () => {
|
||||
await worker.close();
|
||||
});
|
Reference in New Issue
Block a user