Merge branch 'main' into issue-553

This commit is contained in:
Ömer Faruk Aydın
2022-10-14 22:36:45 +02:00
committed by GitHub
41 changed files with 889 additions and 587 deletions

View File

@@ -24,13 +24,13 @@ export default {
options: [
{
label: 'Yes',
value: true
value: true,
},
{
label: 'No',
value: false
}
]
value: false,
},
],
},
{
label: 'Time of day',
@@ -42,111 +42,111 @@ export default {
options: [
{
label: '00:00',
value: 0
value: 0,
},
{
label: '01:00',
value: 1
value: 1,
},
{
label: '02:00',
value: 2
value: 2,
},
{
label: '03:00',
value: 3
value: 3,
},
{
label: '04:00',
value: 4
value: 4,
},
{
label: '05:00',
value: 5
value: 5,
},
{
label: '06:00',
value: 6
value: 6,
},
{
label: '07:00',
value: 7
value: 7,
},
{
label: '08:00',
value: 8
value: 8,
},
{
label: '09:00',
value: 9
value: 9,
},
{
label: '10:00',
value: 10
value: 10,
},
{
label: '11:00',
value: 11
value: 11,
},
{
label: '12:00',
value: 12
value: 12,
},
{
label: '13:00',
value: 13
value: 13,
},
{
label: '14:00',
value: 14
value: 14,
},
{
label: '15:00',
value: 15
value: 15,
},
{
label: '16:00',
value: 16
value: 16,
},
{
label: '17:00',
value: 17
value: 17,
},
{
label: '18:00',
value: 18
value: 18,
},
{
label: '19:00',
value: 19
value: 19,
},
{
label: '20:00',
value: 20
value: 20,
},
{
label: '21:00',
value: 21
value: 21,
},
{
label: '22:00',
value: 22
value: 22,
},
{
label: '23:00',
value: 23
}
]
}
]
value: 23,
},
],
},
],
},
{
key: 'testStep',
name: 'Test trigger'
}
name: 'Test trigger',
},
],
getInterval(parameters: IGlobalVariable["db"]["step"]["parameters"]) {
getInterval(parameters: IGlobalVariable['step']['parameters']) {
if (parameters.triggersOnWeekend as boolean) {
return cronTimes.everyDayAt(parameters.hour as number);
}
@@ -156,14 +156,20 @@ export default {
async run($: IGlobalVariable, startDateTime: Date) {
const dateTime = DateTime.fromJSDate(startDateTime);
const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(dateTime) as IJSONValue;
const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(
dateTime
) as IJSONValue;
return { data: [dateTimeObjectRepresentation] };
},
async testRun($: IGlobalVariable) {
const nextCronDateTime = getNextCronDateTime(this.getInterval($.db.step.parameters));
const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(nextCronDateTime) as IJSONValue;
const nextCronDateTime = getNextCronDateTime(
this.getInterval($.step.parameters)
);
const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(
nextCronDateTime
) as IJSONValue;
return { data: [dateTimeObjectRepresentation] };
},

View File

@@ -24,25 +24,25 @@ export default {
options: [
{
label: 'Yes',
value: true
value: true,
},
{
label: 'No',
value: false
}
]
}
]
value: false,
},
],
},
],
},
{
key: 'testStep',
name: 'Test trigger'
}
name: 'Test trigger',
},
],
getInterval(parameters: IGlobalVariable["db"]["step"]["parameters"]) {
getInterval(parameters: IGlobalVariable['step']['parameters']) {
if (parameters.triggersOnWeekend) {
return cronTimes.everyHour
return cronTimes.everyHour;
}
return cronTimes.everyHourExcludingWeekends;
@@ -50,14 +50,20 @@ export default {
async run($: IGlobalVariable, startDateTime: Date) {
const dateTime = DateTime.fromJSDate(startDateTime);
const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(dateTime) as IJSONValue;
const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(
dateTime
) as IJSONValue;
return { data: [dateTimeObjectRepresentation] };
},
async testRun($: IGlobalVariable) {
const nextCronDateTime = getNextCronDateTime(this.getInterval($.db.step.parameters));
const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(nextCronDateTime) as IJSONValue;
const nextCronDateTime = getNextCronDateTime(
this.getInterval($.step.parameters)
);
const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(
nextCronDateTime
) as IJSONValue;
return { data: [dateTimeObjectRepresentation] };
},

View File

@@ -23,129 +23,129 @@ export default {
options: [
{
label: 1,
value: 1
value: 1,
},
{
label: 2,
value: 2
value: 2,
},
{
label: 3,
value: 3
value: 3,
},
{
label: 4,
value: 4
value: 4,
},
{
label: 5,
value: 5
value: 5,
},
{
label: 6,
value: 6
value: 6,
},
{
label: 7,
value: 7
value: 7,
},
{
label: 8,
value: 8
value: 8,
},
{
label: 9,
value: 9
value: 9,
},
{
label: 10,
value: 10
value: 10,
},
{
label: 11,
value: 11
value: 11,
},
{
label: 12,
value: 12
value: 12,
},
{
label: 13,
value: 13
value: 13,
},
{
label: 14,
value: 14
value: 14,
},
{
label: 15,
value: 15
value: 15,
},
{
label: 16,
value: 16
value: 16,
},
{
label: 17,
value: 17
value: 17,
},
{
label: 18,
value: 18
value: 18,
},
{
label: 19,
value: 19
value: 19,
},
{
label: 20,
value: 20
value: 20,
},
{
label: 21,
value: 21
value: 21,
},
{
label: 22,
value: 22
value: 22,
},
{
label: 23,
value: 23
value: 23,
},
{
label: 24,
value: 24
value: 24,
},
{
label: 25,
value: 25
value: 25,
},
{
label: 26,
value: 26
value: 26,
},
{
label: 27,
value: 27
value: 27,
},
{
label: 28,
value: 28
value: 28,
},
{
label: 29,
value: 29
value: 29,
},
{
label: 30,
value: 30
value: 30,
},
{
label: 31,
value: 31
}
]
value: 31,
},
],
},
{
label: 'Time of day',
@@ -157,126 +157,135 @@ export default {
options: [
{
label: '00:00',
value: 0
value: 0,
},
{
label: '01:00',
value: 1
value: 1,
},
{
label: '02:00',
value: 2
value: 2,
},
{
label: '03:00',
value: 3
value: 3,
},
{
label: '04:00',
value: 4
value: 4,
},
{
label: '05:00',
value: 5
value: 5,
},
{
label: '06:00',
value: 6
value: 6,
},
{
label: '07:00',
value: 7
value: 7,
},
{
label: '08:00',
value: 8
value: 8,
},
{
label: '09:00',
value: 9
value: 9,
},
{
label: '10:00',
value: 10
value: 10,
},
{
label: '11:00',
value: 11
value: 11,
},
{
label: '12:00',
value: 12
value: 12,
},
{
label: '13:00',
value: 13
value: 13,
},
{
label: '14:00',
value: 14
value: 14,
},
{
label: '15:00',
value: 15
value: 15,
},
{
label: '16:00',
value: 16
value: 16,
},
{
label: '17:00',
value: 17
value: 17,
},
{
label: '18:00',
value: 18
value: 18,
},
{
label: '19:00',
value: 19
value: 19,
},
{
label: '20:00',
value: 20
value: 20,
},
{
label: '21:00',
value: 21
value: 21,
},
{
label: '22:00',
value: 22
value: 22,
},
{
label: '23:00',
value: 23
}
]
}
]
value: 23,
},
],
},
],
},
{
key: 'testStep',
name: 'Test trigger'
}
name: 'Test trigger',
},
],
getInterval(parameters: IGlobalVariable["db"]["step"]["parameters"]) {
const interval = cronTimes.everyMonthOnAndAt(parameters.day as number, parameters.hour as number);
getInterval(parameters: IGlobalVariable['step']['parameters']) {
const interval = cronTimes.everyMonthOnAndAt(
parameters.day as number,
parameters.hour as number
);
return interval;
},
async run($: IGlobalVariable, startDateTime: Date) {
const dateTime = DateTime.fromJSDate(startDateTime);
const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(dateTime) as IJSONValue;
const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(
dateTime
) as IJSONValue;
return { data: [dateTimeObjectRepresentation] };
},
async testRun($: IGlobalVariable) {
const nextCronDateTime = getNextCronDateTime(this.getInterval($.db.step.parameters));
const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(nextCronDateTime) as IJSONValue;
const nextCronDateTime = getNextCronDateTime(
this.getInterval($.step.parameters)
);
const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(
nextCronDateTime
) as IJSONValue;
return { data: [dateTimeObjectRepresentation] };
},

View File

@@ -23,33 +23,33 @@ export default {
options: [
{
label: 'Monday',
value: 1
value: 1,
},
{
label: 'Tuesday',
value: 2
value: 2,
},
{
label: 'Wednesday',
value: 3
value: 3,
},
{
label: 'Thursday',
value: 4
value: 4,
},
{
label: 'Friday',
value: 5
value: 5,
},
{
label: 'Saturday',
value: 6
value: 6,
},
{
label: 'Sunday',
value: 0
}
]
value: 0,
},
],
},
{
label: 'Time of day',
@@ -61,126 +61,135 @@ export default {
options: [
{
label: '00:00',
value: 0
value: 0,
},
{
label: '01:00',
value: 1
value: 1,
},
{
label: '02:00',
value: 2
value: 2,
},
{
label: '03:00',
value: 3
value: 3,
},
{
label: '04:00',
value: 4
value: 4,
},
{
label: '05:00',
value: 5
value: 5,
},
{
label: '06:00',
value: 6
value: 6,
},
{
label: '07:00',
value: 7
value: 7,
},
{
label: '08:00',
value: 8
value: 8,
},
{
label: '09:00',
value: 9
value: 9,
},
{
label: '10:00',
value: 10
value: 10,
},
{
label: '11:00',
value: 11
value: 11,
},
{
label: '12:00',
value: 12
value: 12,
},
{
label: '13:00',
value: 13
value: 13,
},
{
label: '14:00',
value: 14
value: 14,
},
{
label: '15:00',
value: 15
value: 15,
},
{
label: '16:00',
value: 16
value: 16,
},
{
label: '17:00',
value: 17
value: 17,
},
{
label: '18:00',
value: 18
value: 18,
},
{
label: '19:00',
value: 19
value: 19,
},
{
label: '20:00',
value: 20
value: 20,
},
{
label: '21:00',
value: 21
value: 21,
},
{
label: '22:00',
value: 22
value: 22,
},
{
label: '23:00',
value: 23
}
]
}
]
value: 23,
},
],
},
],
},
{
key: 'testStep',
name: 'Test trigger'
}
name: 'Test trigger',
},
],
getInterval(parameters: IGlobalVariable["db"]["step"]["parameters"]) {
const interval = cronTimes.everyWeekOnAndAt(parameters.weekday as number, parameters.hour as number);
getInterval(parameters: IGlobalVariable['step']['parameters']) {
const interval = cronTimes.everyWeekOnAndAt(
parameters.weekday as number,
parameters.hour as number
);
return interval;
},
async run($: IGlobalVariable, startDateTime: Date) {
const dateTime = DateTime.fromJSDate(startDateTime);
const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(dateTime) as IJSONValue;
const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(
dateTime
) as IJSONValue;
return { data: [dateTimeObjectRepresentation] };
},
async testRun($: IGlobalVariable) {
const nextCronDateTime = getNextCronDateTime(this.getInterval($.db.step.parameters));
const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(nextCronDateTime) as IJSONValue;
const nextCronDateTime = getNextCronDateTime(
this.getInterval($.step.parameters)
);
const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(
nextCronDateTime
) as IJSONValue;
return { data: [dateTimeObjectRepresentation] };
},

View File

@@ -1,4 +1,4 @@
import { IGlobalVariable, IJSONObject } from '@automatisch/types';
import { IGlobalVariable, IActionOutput } from '@automatisch/types';
type FindMessageOptions = {
query: string;
@@ -8,11 +8,6 @@ type FindMessageOptions = {
};
const findMessage = async ($: IGlobalVariable, options: FindMessageOptions) => {
const message: {
data?: IJSONObject;
error?: IJSONObject;
} = {};
const headers = {
Authorization: `Bearer ${$.auth.data.accessToken}`,
};
@@ -29,20 +24,14 @@ const findMessage = async ($: IGlobalVariable, options: FindMessageOptions) => {
params,
});
if (response.integrationError) {
message.error = response.integrationError;
return message;
}
const data = response.data;
if (!data.ok) {
message.error = data;
return message;
}
const messages = data.messages.matches;
message.data = messages?.[0];
const message: IActionOutput = {
data: {
raw: data?.data?.messages.matches[0],
},
error: response?.integrationError || (!data.ok && data),
};
return message;
};

View File

@@ -72,7 +72,7 @@ export default {
],
async run($: IGlobalVariable) {
const parameters = $.db.step.parameters;
const parameters = $.step.parameters;
const query = parameters.query as string;
const sortBy = parameters.sortBy as string;
const sortDirection = parameters.sortDirection as string;

View File

@@ -49,8 +49,8 @@ export default {
],
async run($: IGlobalVariable) {
const channelId = $.db.step.parameters.channel as string;
const text = $.db.step.parameters.message as string;
const channelId = $.step.parameters.channel as string;
const text = $.step.parameters.message as string;
const message = await postMessage($, channelId, text);

View File

@@ -1,18 +1,10 @@
import { IGlobalVariable, IJSONObject } from '@automatisch/types';
import { IGlobalVariable, IActionOutput } from '@automatisch/types';
const postMessage = async (
$: IGlobalVariable,
channelId: string,
text: string
) => {
const message: {
data: IJSONObject | null | undefined;
error: IJSONObject | null | undefined;
} = {
data: null,
error: null,
};
const headers = {
Authorization: `Bearer ${$.auth.data.accessToken}`,
};
@@ -24,8 +16,12 @@ const postMessage = async (
const response = await $.http.post('/chat.postMessage', params, { headers });
message.error = response?.integrationError;
message.data = response?.data?.message;
const message: IActionOutput = {
data: {
raw: response?.data?.message,
},
error: response?.integrationError,
};
if (response.data.ok === false) {
message.error = response.data;

View File

@@ -1,4 +1,8 @@
import { IGlobalVariable, IJSONObject } from '@automatisch/types';
import {
IGlobalVariable,
IJSONObject,
ITriggerOutput,
} from '@automatisch/types';
import { URLSearchParams } from 'url';
import { omitBy, isEmpty } from 'lodash';
import generateRequest from './generate-request';
@@ -14,12 +18,8 @@ const getUserFollowers = async (
) => {
let response;
const followers: {
data: IJSONObject[];
error: IJSONObject | null;
} = {
const followers: ITriggerOutput = {
data: [],
error: null,
};
do {
@@ -49,19 +49,19 @@ const getUserFollowers = async (
}
if (response.data.meta.result_count > 0) {
response.data.data.forEach((tweet: IJSONObject) => {
if (
!options.lastInternalId ||
Number(tweet.id) > Number(options.lastInternalId)
) {
followers.data.push(tweet);
} else {
return;
}
response.data.data.forEach((follower: IJSONObject) => {
followers.data.push({
raw: follower,
meta: { internalId: follower.id as string },
});
});
}
} while (response.data.meta.next_token && options.lastInternalId);
followers.data.sort((follower, nextFollower) => {
return (follower.raw.id as number) - (nextFollower.raw.id as number);
});
return followers;
};

View File

@@ -1,4 +1,8 @@
import { IGlobalVariable, IJSONObject } from '@automatisch/types';
import {
IGlobalVariable,
IJSONObject,
ITriggerOutput,
} from '@automatisch/types';
import { URLSearchParams } from 'url';
import omitBy from 'lodash/omitBy';
import isEmpty from 'lodash/isEmpty';
@@ -22,19 +26,15 @@ const getUserTweets = async (
const currentUser = await getCurrentUser($);
username = currentUser.username as string;
} else {
username = $.db.step.parameters.username as string;
username = $.step.parameters.username as string;
}
const user = await getUserByUsername($, username);
let response;
const tweets: {
data: IJSONObject[];
error: IJSONObject | null;
} = {
const tweets: ITriggerOutput = {
data: [],
error: null,
};
do {
@@ -61,18 +61,18 @@ const getUserTweets = async (
if (response.data.meta.result_count > 0) {
response.data.data.forEach((tweet: IJSONObject) => {
if (
!options.lastInternalId ||
Number(tweet.id) > Number(options.lastInternalId)
) {
tweets.data.push(tweet);
} else {
return;
}
tweets.data.push({
raw: tweet,
meta: { internalId: tweet.id as string },
});
});
}
} while (response.data.meta.next_token && options.lastInternalId);
tweets.data.sort((tweet, nextTweet) => {
return (tweet.raw.id as number) - (nextTweet.raw.id as number);
});
return tweets;
};

View File

@@ -20,7 +20,7 @@ export default {
async run($: IGlobalVariable) {
return await getUserTweets($, {
currentUser: true,
lastInternalId: $.db.flow.lastInternalId,
lastInternalId: $.flow.lastInternalId,
});
},

View File

@@ -18,7 +18,7 @@ export default {
],
async run($: IGlobalVariable) {
return await myFollowers($, $.db.flow.lastInternalId);
return await myFollowers($, $.flow.lastInternalId);
},
async testRun($: IGlobalVariable) {

View File

@@ -32,14 +32,13 @@ export default {
async run($: IGlobalVariable) {
return await searchTweets($, {
searchTerm: $.db.step.parameters.searchTerm as string,
lastInternalId: $.db.flow.lastInternalId,
searchTerm: $.step.parameters.searchTerm as string,
});
},
async testRun($: IGlobalVariable) {
return await searchTweets($, {
searchTerm: $.db.step.parameters.searchTerm as string,
searchTerm: $.step.parameters.searchTerm as string,
});
},
};

View File

@@ -1,4 +1,8 @@
import { IGlobalVariable, IJSONObject } from '@automatisch/types';
import {
IGlobalVariable,
IJSONObject,
ITriggerOutput,
} from '@automatisch/types';
import qs from 'qs';
import generateRequest from '../../common/generate-request';
import { omitBy, isEmpty } from 'lodash';
@@ -14,18 +18,14 @@ const searchTweets = async (
) => {
let response;
const tweets: {
data: IJSONObject[];
error: IJSONObject | null;
} = {
const tweets: ITriggerOutput = {
data: [],
error: null,
};
do {
const params: IJSONObject = {
query: options.searchTerm,
since_id: options.lastInternalId,
since_id: $.execution.testRun ? null : $.flow.lastInternalId,
pagination_token: response?.data?.meta?.next_token,
};
@@ -52,17 +52,21 @@ const searchTweets = async (
if (response.data.meta.result_count > 0) {
response.data.data.forEach((tweet: IJSONObject) => {
if (
!options.lastInternalId ||
Number(tweet.id) > Number(options.lastInternalId)
) {
tweets.data.push(tweet);
} else {
return;
}
const dataItem = {
raw: tweet,
meta: {
internalId: tweet.id as string,
},
};
tweets.data.push(dataItem);
});
}
} while (response.data.meta.next_token && options.lastInternalId);
} while (response.data.meta.next_token && !$.execution.testRun);
tweets.data.sort((tweet, nextTweet) => {
return (tweet.raw.id as number) - (nextTweet.raw.id as number);
});
return tweets;
};

View File

@@ -32,15 +32,15 @@ export default {
async run($: IGlobalVariable) {
return await getUserTweets($, {
currentUser: false,
userId: $.db.step.parameters.username as string,
lastInternalId: $.db.flow.lastInternalId,
userId: $.step.parameters.username as string,
lastInternalId: $.flow.lastInternalId,
});
},
async testRun($: IGlobalVariable) {
return await getUserTweets($, {
currentUser: false,
userId: $.db.step.parameters.username as string,
userId: $.step.parameters.username as string,
});
},
};

View File

@@ -29,7 +29,7 @@ const createAuthData = async (
.default;
const app = await App.findOneByKey(connection.key);
const $ = await globalVariable(connection, app);
const $ = await globalVariable({ connection, app });
await authInstance.createAuthData($);
try {

View File

@@ -1,7 +1,5 @@
import Context from '../../types/express/context';
import Processor from '../../services/processor';
// eslint-disable-next-line @typescript-eslint/no-unused-vars
import processorQueue from '../../queues/processor';
import testRun from '../../services/test-run';
type Params = {
input: {
@@ -14,26 +12,18 @@ const executeFlow = async (
params: Params,
context: Context
) => {
const { stepId } = params.input;
const { executionStep } = await testRun({ stepId });
const untilStep = await context.currentUser
.$relatedQuery('steps')
.withGraphFetched('connection')
.findOne({
'steps.id': params.input.stepId,
})
.throwIfNotFound();
const flow = await untilStep.$relatedQuery('flow');
const executionStep = await new Processor(flow, {
untilStep,
testRun: true,
}).run();
.findById(stepId);
await untilStep.$query().patch({
status: 'completed',
});
if (executionStep.errorDetails) {
if (executionStep.isFailed) {
throw new Error(JSON.stringify(executionStep.errorDetails));
}

View File

@@ -1,5 +1,5 @@
import Context from '../../types/express/context';
import processorQueue from '../../queues/processor';
import flowQueue from '../../queues/flow';
type Params = {
input: {
@@ -8,7 +8,7 @@ type Params = {
};
};
const JOB_NAME = 'processorJob';
const JOB_NAME = 'flow';
const EVERY_15_MINUTES_CRON = '*/15 * * * *';
const updateFlowStatus = async (
@@ -32,7 +32,7 @@ const updateFlowStatus = async (
});
const triggerStep = await flow.getTriggerStep();
const trigger = await triggerStep.getTrigger();
const trigger = await triggerStep.getTriggerCommand();
const interval = trigger.getInterval?.(triggerStep.parameters);
const repeatOptions = {
cron: interval || EVERY_15_MINUTES_CRON,
@@ -43,8 +43,10 @@ const updateFlowStatus = async (
published_at: new Date().toISOString(),
});
await processorQueue.add(
JOB_NAME,
const jobName = `${JOB_NAME}-${flow.id}`;
await flowQueue.add(
jobName,
{ flowId: flow.id },
{
repeat: repeatOptions,
@@ -52,10 +54,10 @@ const updateFlowStatus = async (
}
);
} else {
const repeatableJobs = await processorQueue.getRepeatableJobs();
const repeatableJobs = await flowQueue.getRepeatableJobs();
const job = repeatableJobs.find((job) => job.id === flow.id);
await processorQueue.removeRepeatableByKey(job.key);
await flowQueue.removeRepeatableByKey(job.key);
}
return flow;

View File

@@ -21,7 +21,7 @@ const verifyConnection = async (
.throwIfNotFound();
const app = await App.findOneByKey(connection.key);
const $ = await globalVariable(connection, app);
const $ = await globalVariable({ connection, app });
await app.auth.verifyCredentials($);
connection = await connection.$query().patchAndFetch({

View File

@@ -25,7 +25,7 @@ const getData = async (_parent: unknown, params: Params, context: Context) => {
if (!connection || !step.appKey) return null;
const app = await App.findOneByKey(step.appKey);
const $ = await globalVariable(connection, app, step.flow, step);
const $ = await globalVariable({ connection, app, flow: step.flow, step });
const command = app.data.find((data: IData) => data.key === params.key);

View File

@@ -20,10 +20,9 @@ const testConnection = async (
.throwIfNotFound();
const app = await App.findOneByKey(connection.key, false);
const $ = await globalVariable(connection, app);
const $ = await globalVariable({ connection, app });
const isStillVerified =
await app.auth.isStillVerified($);
const isStillVerified = await app.auth.isStillVerified($);
connection = await connection.$query().patchAndFetch({
formattedData: connection.formattedData,

View File

@@ -0,0 +1,43 @@
import Step from '../models/step';
import ExecutionStep from '../models/execution-step';
import get from 'lodash.get';
const variableRegExp = /({{step\.[\da-zA-Z-]+(?:\.[\da-zA-Z-]+)+}})/g;
export default function computeParameters(
parameters: Step['parameters'],
executionSteps: ExecutionStep[]
): Step['parameters'] {
const entries = Object.entries(parameters);
return entries.reduce((result, [key, value]: [string, unknown]) => {
if (typeof value === 'string') {
const parts = value.split(variableRegExp);
const computedValue = parts
.map((part: string) => {
const isVariable = part.match(variableRegExp);
if (isVariable) {
const stepIdAndKeyPath = part.replace(/{{step.|}}/g, '') as string;
const [stepId, ...keyPaths] = stepIdAndKeyPath.split('.');
const keyPath = keyPaths.join('.');
const executionStep = executionSteps.find((executionStep) => {
return executionStep.stepId === stepId;
});
const data = executionStep?.dataOut;
const dataValue = get(data, keyPath);
return dataValue;
}
return part;
})
.join('');
return {
...result,
[key]: computedValue,
};
}
return result;
}, {});
}

View File

@@ -1,13 +1,19 @@
import { ExpressAdapter } from '@bull-board/express';
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import processorQueue from '../queues/processor';
import flowQueue from '../queues/flow';
import triggerQueue from '../queues/trigger';
import actionQueue from '../queues/action';
const serverAdapter = new ExpressAdapter();
const createBullBoardHandler = async (serverAdapter: ExpressAdapter) => {
createBullBoard({
queues: [new BullMQAdapter(processorQueue)],
queues: [
new BullMQAdapter(flowQueue),
new BullMQAdapter(triggerQueue),
new BullMQAdapter(actionQueue),
],
serverAdapter: serverAdapter,
});
};

View File

@@ -2,17 +2,29 @@ import createHttpClient from './http-client';
import Connection from '../models/connection';
import Flow from '../models/flow';
import Step from '../models/step';
import Execution from '../models/execution';
import { IJSONObject, IApp, IGlobalVariable } from '@automatisch/types';
type GlobalVariableOptions = {
connection?: Connection;
app: IApp;
flow?: Flow;
step?: Step;
execution?: Execution;
testRun?: boolean;
};
const globalVariable = async (
connection: Connection,
appData: IApp,
flow?: Flow,
currentStep?: Step
options: GlobalVariableOptions
): Promise<IGlobalVariable> => {
const { connection, app, flow, step, execution, testRun = false } = options;
const lastInternalId = await flow?.lastInternalId();
return {
const trigger = await step?.getTriggerCommand();
const nextStep = await step?.getNextStep();
const variable: IGlobalVariable = {
auth: {
set: async (args: IJSONObject) => {
if (connection) {
@@ -28,17 +40,39 @@ const globalVariable = async (
},
data: connection?.formattedData,
},
app: appData,
http: createHttpClient({ baseURL: appData.baseUrl }),
db: {
flow: {
lastInternalId,
},
step: {
parameters: currentStep?.parameters || {},
},
app: app,
http: createHttpClient({ baseURL: app.baseUrl }),
flow: {
id: flow?.id,
lastInternalId,
},
step: {
id: step?.id,
appKey: step?.appKey,
parameters: step?.parameters || {},
},
nextStep: {
id: nextStep?.id,
appKey: nextStep?.appKey,
parameters: nextStep?.parameters || {},
},
execution: {
id: execution?.id,
testRun,
},
};
if (trigger && trigger.dedupeStrategy === 'unique') {
const lastInternalIds = await flow?.lastInternalIds();
const isAlreadyProcessed = (internalId: string) => {
return lastInternalIds?.includes(internalId);
};
variable.flow.isAlreadyProcessed = isAlreadyProcessed;
}
return variable;
};
export default globalVariable;

View File

@@ -50,6 +50,10 @@ class ExecutionStep extends Base {
},
});
get isFailed() {
return this.status === 'failure';
}
async $afterInsert(queryContext: QueryContext) {
await super.$afterInsert(queryContext);
Telemetry.executionStepCreated(this);

View File

@@ -13,6 +13,7 @@ class Flow extends Base {
active: boolean;
steps: Step[];
published_at: string;
executions?: Execution[];
static tableName = 'flows';
@@ -58,6 +59,15 @@ class Flow extends Base {
return lastExecution ? (lastExecution as Execution).internalId : null;
}
async lastInternalIds(itemCount = 50) {
const lastExecutions = await this.$relatedQuery('executions')
.select('internal_id')
.orderBy('created_at', 'desc')
.limit(itemCount);
return lastExecutions.map((execution) => execution.internalId);
}
async $beforeUpdate(
opt: ModelOptions,
queryContext: QueryContext

View File

@@ -92,16 +92,43 @@ class Step extends Base {
return this.type === 'trigger';
}
async getTrigger() {
if (!this.isTrigger) return null;
get isAction(): boolean {
return this.type === 'action';
}
const { appKey, key } = this;
async getApp() {
if (!this.appKey) return null;
return await App.findOneByKey(this.appKey);
}
async getNextStep() {
const flow = await this.$relatedQuery('flow');
return await flow
.$relatedQuery('steps')
.findOne({ position: this.position + 1 });
}
async getTriggerCommand() {
const { appKey, key, isTrigger } = this;
if (!isTrigger || !appKey || !key) return null;
const app = await App.findOneByKey(appKey);
const command = app.triggers.find((trigger) => trigger.key === key);
return command;
}
async getActionCommand() {
const { appKey, key, isAction } = this;
if (!isAction || !appKey || !key) return null;
const app = await App.findOneByKey(appKey);
const command = app.actions.find((action) => action.key === key);
return command;
}
}
export default Step;

View File

@@ -0,0 +1,25 @@
import process from 'process';
import { Queue } from 'bullmq';
import redisConfig from '../config/redis';
import logger from '../helpers/logger';
const CONNECTION_REFUSED = 'ECONNREFUSED';
const redisConnection = {
connection: redisConfig,
};
const actionQueue = new Queue('action', redisConnection);
process.on('SIGTERM', async () => {
await actionQueue.close();
});
actionQueue.on('error', (err) => {
if ((err as any).code === CONNECTION_REFUSED) {
logger.error('Make sure you have installed Redis and it is running.', err);
process.exit();
}
});
export default actionQueue;

View File

@@ -9,18 +9,18 @@ const redisConnection = {
connection: redisConfig,
};
const processorQueue = new Queue('processor', redisConnection);
const queueScheduler = new QueueScheduler('processor', redisConnection);
const flowQueue = new Queue('flow', redisConnection);
const queueScheduler = new QueueScheduler('flow', redisConnection);
process.on('SIGTERM', async () => {
await queueScheduler.close();
});
processorQueue.on('error', (err) => {
flowQueue.on('error', (err) => {
if ((err as any).code === CONNECTION_REFUSED) {
logger.error('Make sure you have installed Redis and it is running.', err);
process.exit();
}
});
export default processorQueue;
export default flowQueue;

View File

@@ -0,0 +1,25 @@
import process from 'process';
import { Queue } from 'bullmq';
import redisConfig from '../config/redis';
import logger from '../helpers/logger';
const CONNECTION_REFUSED = 'ECONNREFUSED';
const redisConnection = {
connection: redisConfig,
};
const triggerQueue = new Queue('trigger', redisConnection);
process.on('SIGTERM', async () => {
await triggerQueue.close();
});
triggerQueue.on('error', (err) => {
if ((err as any).code === CONNECTION_REFUSED) {
logger.error('Make sure you have installed Redis and it is running.', err);
process.exit();
}
});
export default triggerQueue;

View File

@@ -0,0 +1,55 @@
import Step from '../models/step';
import Flow from '../models/flow';
import Execution from '../models/execution';
import ExecutionStep from '../models/execution-step';
import computeParameters from '../helpers/compute-parameters';
import globalVariable from '../helpers/global-variable';
type ProcessActionOptions = {
flowId: string;
executionId: string;
stepId: string;
};
export const processAction = async (options: ProcessActionOptions) => {
const { flowId, stepId, executionId } = options;
const step = await Step.query().findById(stepId).throwIfNotFound();
const execution = await Execution.query()
.findById(executionId)
.throwIfNotFound();
const $ = await globalVariable({
flow: await Flow.query().findById(flowId).throwIfNotFound(),
app: await step.getApp(),
step: step,
connection: await step.$relatedQuery('connection'),
execution: execution,
});
const priorExecutionSteps = await ExecutionStep.query().where({
execution_id: $.execution.id,
});
const computedParameters = computeParameters(
$.step.parameters,
priorExecutionSteps
);
const actionCommand = await step.getActionCommand();
$.step.parameters = computedParameters;
const actionOutput = await actionCommand.run($);
const executionStep = await execution
.$relatedQuery('executionSteps')
.insertAndFetch({
stepId: $.step.id,
status: actionOutput.error ? 'failure' : 'success',
dataIn: computedParameters,
dataOut: actionOutput.error ? null : actionOutput.data.raw,
errorDetails: actionOutput.error,
});
return { flowId, stepId, executionId, executionStep };
};

View File

@@ -0,0 +1,24 @@
import Flow from '../models/flow';
import globalVariable from '../helpers/global-variable';
type ProcessFlowOptions = {
flowId: string;
testRun?: boolean;
};
export const processFlow = async (options: ProcessFlowOptions) => {
const flow = await Flow.query().findById(options.flowId).throwIfNotFound();
const triggerStep = await flow.getTriggerStep();
const triggerCommand = await triggerStep.getTriggerCommand();
const $ = await globalVariable({
flow,
connection: await triggerStep.$relatedQuery('connection'),
app: await triggerStep.getApp(),
step: triggerStep,
testRun: options.testRun,
});
return await triggerCommand.run($);
};

View File

@@ -1,239 +0,0 @@
import get from 'lodash.get';
import { IJSONObject } from '@automatisch/types';
import App from '../models/app';
import Flow from '../models/flow';
import Step from '../models/step';
import Execution from '../models/execution';
import ExecutionStep from '../models/execution-step';
import globalVariable from '../helpers/global-variable';
type ExecutionSteps = Record<string, ExecutionStep>;
type ProcessorOptions = {
untilStep?: Step;
testRun?: boolean;
};
class Processor {
flow: Flow;
untilStep?: Step;
testRun?: boolean;
static variableRegExp = /({{step\.[\da-zA-Z-]+(?:\.[\da-zA-Z-]+)+}})/g;
constructor(flow: Flow, processorOptions: ProcessorOptions) {
this.flow = flow;
this.untilStep = processorOptions.untilStep;
this.testRun = processorOptions.testRun;
}
async run() {
const steps = await this.flow
.$relatedQuery('steps')
.withGraphFetched('connection')
.orderBy('position', 'asc');
const triggerStep = steps.find((step) => step.type === 'trigger');
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const initialTriggerData = await this.getInitialTriggerData(triggerStep!);
if (!initialTriggerData.error && initialTriggerData.data.length === 0) {
const lastInternalId = await this.flow.lastInternalId();
const executionData: Partial<Execution> = {
flowId: this.flow.id,
testRun: this.testRun,
};
if (lastInternalId) {
executionData.internalId = lastInternalId;
}
await Execution.query().insert(executionData);
return;
}
if (this.testRun && initialTriggerData.data.length > 0) {
initialTriggerData.data = [initialTriggerData.data[0]];
}
if (initialTriggerData.data.length > 1) {
initialTriggerData.data = initialTriggerData.data.sort(
(item: IJSONObject, nextItem: IJSONObject) => {
return (item.id as number) - (nextItem.id as number);
}
);
}
const executions: Execution[] = [];
for await (const data of initialTriggerData.data) {
const execution = await Execution.query().insert({
flowId: this.flow.id,
testRun: this.testRun,
internalId: data.id as string,
});
executions.push(execution);
let previousExecutionStep: ExecutionStep;
const priorExecutionSteps: ExecutionSteps = {};
let fetchedActionData: {
data: IJSONObject | null;
error: IJSONObject | null;
} = {
data: null,
error: null,
};
for await (const step of steps) {
if (!step.appKey) continue;
const { appKey, key, type, parameters: rawParameters = {}, id } = step;
const isTrigger = type === 'trigger';
const app = await App.findOneByKey(appKey);
const computedParameters = Processor.computeParameters(
rawParameters,
priorExecutionSteps
);
const clonedStep = Object.assign({}, step);
clonedStep.parameters = computedParameters;
const $ = await globalVariable(
step.connection,
app,
this.flow,
clonedStep
);
if (!isTrigger && key) {
const command = app.actions.find((action) => action.key === key);
fetchedActionData = await command.run($);
}
if (!isTrigger && fetchedActionData.error) {
await execution.$relatedQuery('executionSteps').insertAndFetch({
stepId: id,
status: 'failure',
dataIn: null,
dataOut: computedParameters,
errorDetails: fetchedActionData.error,
});
break;
}
previousExecutionStep = await execution
.$relatedQuery('executionSteps')
.insertAndFetch({
stepId: id,
status: 'success',
dataIn: isTrigger ? rawParameters : computedParameters,
dataOut: isTrigger ? data : fetchedActionData.data,
});
priorExecutionSteps[id] = previousExecutionStep;
if (id === this.untilStep?.id) {
break;
}
}
}
if (initialTriggerData.error) {
const executionWithError = await Execution.query().insert({
flowId: this.flow.id,
testRun: this.testRun,
});
executions.push(executionWithError);
await executionWithError.$relatedQuery('executionSteps').insertAndFetch({
stepId: triggerStep.id,
status: 'failure',
dataIn: triggerStep.parameters,
errorDetails: initialTriggerData.error,
});
}
if (!this.testRun) return;
const lastExecutionStepFromFirstExecution = await executions[0]
.$relatedQuery('executionSteps')
.orderBy('created_at', 'desc')
.first();
return lastExecutionStepFromFirstExecution;
}
async getInitialTriggerData(step: Step) {
if (!step.appKey || !step.key) return null;
const app = await App.findOneByKey(step.appKey);
const $ = await globalVariable(
step.connection,
app,
this.flow,
step,
)
const command = app.triggers.find((trigger) => trigger.key === step.key);
let fetchedData;
if (this.testRun) {
fetchedData = await command.testRun($);
} else {
fetchedData = await command.run($);
}
return fetchedData;
}
static computeParameters(
parameters: Step['parameters'],
executionSteps: ExecutionSteps
): Step['parameters'] {
const entries = Object.entries(parameters);
return entries.reduce((result, [key, value]: [string, unknown]) => {
if (typeof value === 'string') {
const parts = value.split(Processor.variableRegExp);
const computedValue = parts
.map((part: string) => {
const isVariable = part.match(Processor.variableRegExp);
if (isVariable) {
const stepIdAndKeyPath = part.replace(
/{{step.|}}/g,
''
) as string;
const [stepId, ...keyPaths] = stepIdAndKeyPath.split('.');
const keyPath = keyPaths.join('.');
const executionStep = executionSteps[stepId.toString() as string];
const data = executionStep?.dataOut;
const dataValue = get(data, keyPath);
return dataValue;
}
return part;
})
.join('');
return {
...result,
[key]: computedValue,
};
}
return result;
}, {});
}
}
export default Processor;

View File

@@ -0,0 +1,65 @@
import Step from '../models/step';
import { processFlow } from '../services/flow';
import { processTrigger } from '../services/trigger';
import { processAction } from '../services/action';
type TestRunOptions = {
stepId: string;
};
const testRun = async (options: TestRunOptions) => {
const untilStep = await Step.query()
.findById(options.stepId)
.throwIfNotFound();
const flow = await untilStep.$relatedQuery('flow');
const [triggerStep, ...actionSteps] = await flow
.$relatedQuery('steps')
.withGraphFetched('connection')
.orderBy('position', 'asc');
const { data, error: triggerError } = await processFlow({
flowId: flow.id,
testRun: true,
});
if (triggerError) {
const { executionStep: triggerExecutionStepWithError } =
await processTrigger({
flowId: flow.id,
stepId: triggerStep.id,
error: triggerError,
testRun: true,
});
return { executionStep: triggerExecutionStepWithError };
}
const firstTriggerDataItem = data[0];
const { executionId, executionStep: triggerExecutionStep } =
await processTrigger({
flowId: flow.id,
stepId: triggerStep.id,
triggerDataItem: firstTriggerDataItem,
testRun: true,
});
if (triggerStep.id === untilStep.id) {
return { executionStep: triggerExecutionStep };
}
for (const actionStep of actionSteps) {
const { executionStep: actionExecutionStep } = await processAction({
flowId: flow.id,
stepId: actionStep.id,
executionId,
});
if (actionStep.id === untilStep.id || actionExecutionStep.isFailed) {
return { executionStep: actionExecutionStep };
}
}
};
export default testRun;

View File

@@ -0,0 +1,46 @@
import { IJSONObject, ITriggerDataItem } from '@automatisch/types';
import Step from '../models/step';
import Flow from '../models/flow';
import Execution from '../models/execution';
import globalVariable from '../helpers/global-variable';
type ProcessTriggerOptions = {
flowId: string;
stepId: string;
triggerDataItem?: ITriggerDataItem;
error?: IJSONObject;
testRun?: boolean;
};
export const processTrigger = async (options: ProcessTriggerOptions) => {
const { flowId, stepId, triggerDataItem, error, testRun } = options;
const step = await Step.query().findById(stepId).throwIfNotFound();
const $ = await globalVariable({
flow: await Flow.query().findById(flowId).throwIfNotFound(),
app: await step.getApp(),
step: step,
connection: await step.$relatedQuery('connection'),
});
// check if we already process this trigger data item or not!
const execution = await Execution.query().insert({
flowId: $.flow.id,
testRun,
internalId: triggerDataItem?.meta.internalId,
});
const executionStep = await execution
.$relatedQuery('executionSteps')
.insertAndFetch({
stepId: $.step.id,
status: error ? 'failure' : 'success',
dataIn: $.step.parameters,
dataOut: !error ? triggerDataItem.raw : null,
errorDetails: error,
});
return { flowId, stepId, executionId: execution.id, executionStep };
};

View File

@@ -1,5 +1,7 @@
import './config/orm';
export { worker } from './workers/processor';
import './workers/flow';
import './workers/trigger';
import './workers/action';
import telemetry from './helpers/telemetry';
telemetry.setServiceType('worker');

View File

@@ -0,0 +1,51 @@
import { Worker } from 'bullmq';
import redisConfig from '../config/redis';
import logger from '../helpers/logger';
import Step from '../models/step';
import actionQueue from '../queues/action';
import { processAction } from '../services/action';
type JobData = {
flowId: string;
executionId: string;
stepId: string;
};
export const worker = new Worker(
'action',
async (job) => {
const { stepId, flowId, executionId } = await processAction(
job.data as JobData
);
const step = await Step.query().findById(stepId).throwIfNotFound();
const nextStep = await step.getNextStep();
if (!nextStep) return;
const jobName = `${executionId}-${nextStep.id}`;
const jobPayload = {
flowId,
executionId,
stepId: nextStep.id,
};
await actionQueue.add(jobName, jobPayload);
},
{ connection: redisConfig }
);
worker.on('completed', (job) => {
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
});
worker.on('failed', (job, err) => {
logger.info(
`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed22 to start with ${err.message}`
);
});
process.on('SIGTERM', async () => {
await worker.close();
});

View File

@@ -0,0 +1,57 @@
import { Worker } from 'bullmq';
import redisConfig from '../config/redis';
import logger from '../helpers/logger';
import triggerQueue from '../queues/trigger';
import { processFlow } from '../services/flow';
import Flow from '../models/flow';
export const worker = new Worker(
'flow',
async (job) => {
const { flowId } = job.data;
const flow = await Flow.query().findById(flowId).throwIfNotFound();
const triggerStep = await flow.getTriggerStep();
const { data, error } = await processFlow({ flowId });
for (const triggerDataItem of data) {
const jobName = `${triggerStep.id}-${triggerDataItem.meta.internalId}`;
const jobPayload = {
flowId,
stepId: triggerStep.id,
triggerDataItem,
};
await triggerQueue.add(jobName, jobPayload);
}
if (error) {
const jobName = `${triggerStep.id}-error`;
const jobPayload = {
flowId,
stepId: triggerStep.id,
error,
};
await triggerQueue.add(jobName, jobPayload);
}
},
{ connection: redisConfig }
);
worker.on('completed', (job) => {
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
});
worker.on('failed', (job, err) => {
logger.info(
`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}`
);
});
process.on('SIGTERM', async () => {
await worker.close();
});

View File

@@ -1,30 +0,0 @@
import { Worker } from 'bullmq';
import Processor from '../services/processor';
import redisConfig from '../config/redis';
import Flow from '../models/flow';
import logger from '../helpers/logger';
export const worker = new Worker(
'processor',
async (job) => {
const flow = await Flow.query().findById(job.data.flowId).throwIfNotFound();
const data = await new Processor(flow, { testRun: false }).run();
return data;
},
{ connection: redisConfig }
);
worker.on('completed', (job) => {
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has completed!`);
});
worker.on('failed', (job, err) => {
logger.info(
`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed with ${err.message}`
);
});
process.on('SIGTERM', async () => {
await worker.close();
});

View File

@@ -0,0 +1,52 @@
import { Worker } from 'bullmq';
import redisConfig from '../config/redis';
import logger from '../helpers/logger';
import { IJSONObject, ITriggerDataItem } from '@automatisch/types';
import actionQueue from '../queues/action';
import Step from '../models/step';
import { processTrigger } from '../services/trigger';
type JobData = {
flowId: string;
stepId: string;
triggerDataItem?: ITriggerDataItem;
error?: IJSONObject;
};
export const worker = new Worker(
'trigger',
async (job) => {
const { flowId, executionId, stepId, executionStep } = await processTrigger(
job.data as JobData
);
if (executionStep.isFailed) return;
const step = await Step.query().findById(stepId).throwIfNotFound();
const nextStep = await step.getNextStep();
const jobName = `${executionId}-${nextStep.id}`;
const jobPayload = {
flowId,
executionId,
stepId: nextStep.id,
};
await actionQueue.add(jobName, jobPayload);
},
{ connection: redisConfig }
);
worker.on('completed', (job) => {
logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`);
});
worker.on('failed', (job, err) => {
logger.info(
`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}`
);
});
process.on('SIGTERM', async () => {
await worker.close();
});