diff --git a/packages/backend/src/apps/scheduler/triggers/every-day/index.ts b/packages/backend/src/apps/scheduler/triggers/every-day/index.ts index b765cb75..b92de092 100644 --- a/packages/backend/src/apps/scheduler/triggers/every-day/index.ts +++ b/packages/backend/src/apps/scheduler/triggers/every-day/index.ts @@ -24,13 +24,13 @@ export default { options: [ { label: 'Yes', - value: true + value: true, }, { label: 'No', - value: false - } - ] + value: false, + }, + ], }, { label: 'Time of day', @@ -42,111 +42,111 @@ export default { options: [ { label: '00:00', - value: 0 + value: 0, }, { label: '01:00', - value: 1 + value: 1, }, { label: '02:00', - value: 2 + value: 2, }, { label: '03:00', - value: 3 + value: 3, }, { label: '04:00', - value: 4 + value: 4, }, { label: '05:00', - value: 5 + value: 5, }, { label: '06:00', - value: 6 + value: 6, }, { label: '07:00', - value: 7 + value: 7, }, { label: '08:00', - value: 8 + value: 8, }, { label: '09:00', - value: 9 + value: 9, }, { label: '10:00', - value: 10 + value: 10, }, { label: '11:00', - value: 11 + value: 11, }, { label: '12:00', - value: 12 + value: 12, }, { label: '13:00', - value: 13 + value: 13, }, { label: '14:00', - value: 14 + value: 14, }, { label: '15:00', - value: 15 + value: 15, }, { label: '16:00', - value: 16 + value: 16, }, { label: '17:00', - value: 17 + value: 17, }, { label: '18:00', - value: 18 + value: 18, }, { label: '19:00', - value: 19 + value: 19, }, { label: '20:00', - value: 20 + value: 20, }, { label: '21:00', - value: 21 + value: 21, }, { label: '22:00', - value: 22 + value: 22, }, { label: '23:00', - value: 23 - } - ] - } - ] + value: 23, + }, + ], + }, + ], }, { key: 'testStep', - name: 'Test trigger' - } + name: 'Test trigger', + }, ], - getInterval(parameters: IGlobalVariable["db"]["step"]["parameters"]) { + getInterval(parameters: IGlobalVariable['step']['parameters']) { if (parameters.triggersOnWeekend as boolean) { return cronTimes.everyDayAt(parameters.hour as number); } @@ -156,14 +156,20 @@ export default { async run($: IGlobalVariable, startDateTime: Date) { const dateTime = DateTime.fromJSDate(startDateTime); - const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(dateTime) as IJSONValue; + const dateTimeObjectRepresentation = getDateTimeObjectRepresentation( + dateTime + ) as IJSONValue; return { data: [dateTimeObjectRepresentation] }; }, async testRun($: IGlobalVariable) { - const nextCronDateTime = getNextCronDateTime(this.getInterval($.db.step.parameters)); - const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(nextCronDateTime) as IJSONValue; + const nextCronDateTime = getNextCronDateTime( + this.getInterval($.step.parameters) + ); + const dateTimeObjectRepresentation = getDateTimeObjectRepresentation( + nextCronDateTime + ) as IJSONValue; return { data: [dateTimeObjectRepresentation] }; }, diff --git a/packages/backend/src/apps/scheduler/triggers/every-hour/index.ts b/packages/backend/src/apps/scheduler/triggers/every-hour/index.ts index 063e81d1..b0c11cd8 100644 --- a/packages/backend/src/apps/scheduler/triggers/every-hour/index.ts +++ b/packages/backend/src/apps/scheduler/triggers/every-hour/index.ts @@ -24,25 +24,25 @@ export default { options: [ { label: 'Yes', - value: true + value: true, }, { label: 'No', - value: false - } - ] - } - ] + value: false, + }, + ], + }, + ], }, { key: 'testStep', - name: 'Test trigger' - } + name: 'Test trigger', + }, ], - getInterval(parameters: IGlobalVariable["db"]["step"]["parameters"]) { + getInterval(parameters: IGlobalVariable['step']['parameters']) { if (parameters.triggersOnWeekend) { - return cronTimes.everyHour + return cronTimes.everyHour; } return cronTimes.everyHourExcludingWeekends; @@ -50,14 +50,20 @@ export default { async run($: IGlobalVariable, startDateTime: Date) { const dateTime = DateTime.fromJSDate(startDateTime); - const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(dateTime) as IJSONValue; + const dateTimeObjectRepresentation = getDateTimeObjectRepresentation( + dateTime + ) as IJSONValue; return { data: [dateTimeObjectRepresentation] }; }, async testRun($: IGlobalVariable) { - const nextCronDateTime = getNextCronDateTime(this.getInterval($.db.step.parameters)); - const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(nextCronDateTime) as IJSONValue; + const nextCronDateTime = getNextCronDateTime( + this.getInterval($.step.parameters) + ); + const dateTimeObjectRepresentation = getDateTimeObjectRepresentation( + nextCronDateTime + ) as IJSONValue; return { data: [dateTimeObjectRepresentation] }; }, diff --git a/packages/backend/src/apps/scheduler/triggers/every-month/index.ts b/packages/backend/src/apps/scheduler/triggers/every-month/index.ts index d09a3935..5850503b 100644 --- a/packages/backend/src/apps/scheduler/triggers/every-month/index.ts +++ b/packages/backend/src/apps/scheduler/triggers/every-month/index.ts @@ -23,129 +23,129 @@ export default { options: [ { label: 1, - value: 1 + value: 1, }, { label: 2, - value: 2 + value: 2, }, { label: 3, - value: 3 + value: 3, }, { label: 4, - value: 4 + value: 4, }, { label: 5, - value: 5 + value: 5, }, { label: 6, - value: 6 + value: 6, }, { label: 7, - value: 7 + value: 7, }, { label: 8, - value: 8 + value: 8, }, { label: 9, - value: 9 + value: 9, }, { label: 10, - value: 10 + value: 10, }, { label: 11, - value: 11 + value: 11, }, { label: 12, - value: 12 + value: 12, }, { label: 13, - value: 13 + value: 13, }, { label: 14, - value: 14 + value: 14, }, { label: 15, - value: 15 + value: 15, }, { label: 16, - value: 16 + value: 16, }, { label: 17, - value: 17 + value: 17, }, { label: 18, - value: 18 + value: 18, }, { label: 19, - value: 19 + value: 19, }, { label: 20, - value: 20 + value: 20, }, { label: 21, - value: 21 + value: 21, }, { label: 22, - value: 22 + value: 22, }, { label: 23, - value: 23 + value: 23, }, { label: 24, - value: 24 + value: 24, }, { label: 25, - value: 25 + value: 25, }, { label: 26, - value: 26 + value: 26, }, { label: 27, - value: 27 + value: 27, }, { label: 28, - value: 28 + value: 28, }, { label: 29, - value: 29 + value: 29, }, { label: 30, - value: 30 + value: 30, }, { label: 31, - value: 31 - } - ] + value: 31, + }, + ], }, { label: 'Time of day', @@ -157,126 +157,135 @@ export default { options: [ { label: '00:00', - value: 0 + value: 0, }, { label: '01:00', - value: 1 + value: 1, }, { label: '02:00', - value: 2 + value: 2, }, { label: '03:00', - value: 3 + value: 3, }, { label: '04:00', - value: 4 + value: 4, }, { label: '05:00', - value: 5 + value: 5, }, { label: '06:00', - value: 6 + value: 6, }, { label: '07:00', - value: 7 + value: 7, }, { label: '08:00', - value: 8 + value: 8, }, { label: '09:00', - value: 9 + value: 9, }, { label: '10:00', - value: 10 + value: 10, }, { label: '11:00', - value: 11 + value: 11, }, { label: '12:00', - value: 12 + value: 12, }, { label: '13:00', - value: 13 + value: 13, }, { label: '14:00', - value: 14 + value: 14, }, { label: '15:00', - value: 15 + value: 15, }, { label: '16:00', - value: 16 + value: 16, }, { label: '17:00', - value: 17 + value: 17, }, { label: '18:00', - value: 18 + value: 18, }, { label: '19:00', - value: 19 + value: 19, }, { label: '20:00', - value: 20 + value: 20, }, { label: '21:00', - value: 21 + value: 21, }, { label: '22:00', - value: 22 + value: 22, }, { label: '23:00', - value: 23 - } - ] - } - ] + value: 23, + }, + ], + }, + ], }, { key: 'testStep', - name: 'Test trigger' - } + name: 'Test trigger', + }, ], - getInterval(parameters: IGlobalVariable["db"]["step"]["parameters"]) { - const interval = cronTimes.everyMonthOnAndAt(parameters.day as number, parameters.hour as number); + getInterval(parameters: IGlobalVariable['step']['parameters']) { + const interval = cronTimes.everyMonthOnAndAt( + parameters.day as number, + parameters.hour as number + ); return interval; }, async run($: IGlobalVariable, startDateTime: Date) { const dateTime = DateTime.fromJSDate(startDateTime); - const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(dateTime) as IJSONValue; + const dateTimeObjectRepresentation = getDateTimeObjectRepresentation( + dateTime + ) as IJSONValue; return { data: [dateTimeObjectRepresentation] }; }, async testRun($: IGlobalVariable) { - const nextCronDateTime = getNextCronDateTime(this.getInterval($.db.step.parameters)); - const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(nextCronDateTime) as IJSONValue; + const nextCronDateTime = getNextCronDateTime( + this.getInterval($.step.parameters) + ); + const dateTimeObjectRepresentation = getDateTimeObjectRepresentation( + nextCronDateTime + ) as IJSONValue; return { data: [dateTimeObjectRepresentation] }; }, diff --git a/packages/backend/src/apps/scheduler/triggers/every-week/index.ts b/packages/backend/src/apps/scheduler/triggers/every-week/index.ts index 0fa3b4ab..878b005b 100644 --- a/packages/backend/src/apps/scheduler/triggers/every-week/index.ts +++ b/packages/backend/src/apps/scheduler/triggers/every-week/index.ts @@ -23,33 +23,33 @@ export default { options: [ { label: 'Monday', - value: 1 + value: 1, }, { label: 'Tuesday', - value: 2 + value: 2, }, { label: 'Wednesday', - value: 3 + value: 3, }, { label: 'Thursday', - value: 4 + value: 4, }, { label: 'Friday', - value: 5 + value: 5, }, { label: 'Saturday', - value: 6 + value: 6, }, { label: 'Sunday', - value: 0 - } - ] + value: 0, + }, + ], }, { label: 'Time of day', @@ -61,126 +61,135 @@ export default { options: [ { label: '00:00', - value: 0 + value: 0, }, { label: '01:00', - value: 1 + value: 1, }, { label: '02:00', - value: 2 + value: 2, }, { label: '03:00', - value: 3 + value: 3, }, { label: '04:00', - value: 4 + value: 4, }, { label: '05:00', - value: 5 + value: 5, }, { label: '06:00', - value: 6 + value: 6, }, { label: '07:00', - value: 7 + value: 7, }, { label: '08:00', - value: 8 + value: 8, }, { label: '09:00', - value: 9 + value: 9, }, { label: '10:00', - value: 10 + value: 10, }, { label: '11:00', - value: 11 + value: 11, }, { label: '12:00', - value: 12 + value: 12, }, { label: '13:00', - value: 13 + value: 13, }, { label: '14:00', - value: 14 + value: 14, }, { label: '15:00', - value: 15 + value: 15, }, { label: '16:00', - value: 16 + value: 16, }, { label: '17:00', - value: 17 + value: 17, }, { label: '18:00', - value: 18 + value: 18, }, { label: '19:00', - value: 19 + value: 19, }, { label: '20:00', - value: 20 + value: 20, }, { label: '21:00', - value: 21 + value: 21, }, { label: '22:00', - value: 22 + value: 22, }, { label: '23:00', - value: 23 - } - ] - } - ] + value: 23, + }, + ], + }, + ], }, { key: 'testStep', - name: 'Test trigger' - } + name: 'Test trigger', + }, ], - getInterval(parameters: IGlobalVariable["db"]["step"]["parameters"]) { - const interval = cronTimes.everyWeekOnAndAt(parameters.weekday as number, parameters.hour as number); + getInterval(parameters: IGlobalVariable['step']['parameters']) { + const interval = cronTimes.everyWeekOnAndAt( + parameters.weekday as number, + parameters.hour as number + ); return interval; }, async run($: IGlobalVariable, startDateTime: Date) { const dateTime = DateTime.fromJSDate(startDateTime); - const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(dateTime) as IJSONValue; + const dateTimeObjectRepresentation = getDateTimeObjectRepresentation( + dateTime + ) as IJSONValue; return { data: [dateTimeObjectRepresentation] }; }, async testRun($: IGlobalVariable) { - const nextCronDateTime = getNextCronDateTime(this.getInterval($.db.step.parameters)); - const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(nextCronDateTime) as IJSONValue; + const nextCronDateTime = getNextCronDateTime( + this.getInterval($.step.parameters) + ); + const dateTimeObjectRepresentation = getDateTimeObjectRepresentation( + nextCronDateTime + ) as IJSONValue; return { data: [dateTimeObjectRepresentation] }; }, diff --git a/packages/backend/src/apps/slack/actions/find-message/find-message.ts b/packages/backend/src/apps/slack/actions/find-message/find-message.ts index 57c93041..31206bb6 100644 --- a/packages/backend/src/apps/slack/actions/find-message/find-message.ts +++ b/packages/backend/src/apps/slack/actions/find-message/find-message.ts @@ -1,4 +1,4 @@ -import { IGlobalVariable, IJSONObject } from '@automatisch/types'; +import { IGlobalVariable, IActionOutput } from '@automatisch/types'; type FindMessageOptions = { query: string; @@ -8,11 +8,6 @@ type FindMessageOptions = { }; const findMessage = async ($: IGlobalVariable, options: FindMessageOptions) => { - const message: { - data?: IJSONObject; - error?: IJSONObject; - } = {}; - const headers = { Authorization: `Bearer ${$.auth.data.accessToken}`, }; @@ -29,20 +24,14 @@ const findMessage = async ($: IGlobalVariable, options: FindMessageOptions) => { params, }); - if (response.integrationError) { - message.error = response.integrationError; - return message; - } - const data = response.data; - if (!data.ok) { - message.error = data; - return message; - } - - const messages = data.messages.matches; - message.data = messages?.[0]; + const message: IActionOutput = { + data: { + raw: data?.data?.messages.matches[0], + }, + error: response?.integrationError || (!data.ok && data), + }; return message; }; diff --git a/packages/backend/src/apps/slack/actions/find-message/index.ts b/packages/backend/src/apps/slack/actions/find-message/index.ts index c6132e2c..26d7a333 100644 --- a/packages/backend/src/apps/slack/actions/find-message/index.ts +++ b/packages/backend/src/apps/slack/actions/find-message/index.ts @@ -72,7 +72,7 @@ export default { ], async run($: IGlobalVariable) { - const parameters = $.db.step.parameters; + const parameters = $.step.parameters; const query = parameters.query as string; const sortBy = parameters.sortBy as string; const sortDirection = parameters.sortDirection as string; diff --git a/packages/backend/src/apps/slack/actions/send-a-message-to-channel/index.ts b/packages/backend/src/apps/slack/actions/send-a-message-to-channel/index.ts index d2033c1e..d5806108 100644 --- a/packages/backend/src/apps/slack/actions/send-a-message-to-channel/index.ts +++ b/packages/backend/src/apps/slack/actions/send-a-message-to-channel/index.ts @@ -49,8 +49,8 @@ export default { ], async run($: IGlobalVariable) { - const channelId = $.db.step.parameters.channel as string; - const text = $.db.step.parameters.message as string; + const channelId = $.step.parameters.channel as string; + const text = $.step.parameters.message as string; const message = await postMessage($, channelId, text); diff --git a/packages/backend/src/apps/slack/actions/send-a-message-to-channel/post-message.ts b/packages/backend/src/apps/slack/actions/send-a-message-to-channel/post-message.ts index 8a499767..40d9969e 100644 --- a/packages/backend/src/apps/slack/actions/send-a-message-to-channel/post-message.ts +++ b/packages/backend/src/apps/slack/actions/send-a-message-to-channel/post-message.ts @@ -1,18 +1,10 @@ -import { IGlobalVariable, IJSONObject } from '@automatisch/types'; +import { IGlobalVariable, IActionOutput } from '@automatisch/types'; const postMessage = async ( $: IGlobalVariable, channelId: string, text: string ) => { - const message: { - data: IJSONObject | null | undefined; - error: IJSONObject | null | undefined; - } = { - data: null, - error: null, - }; - const headers = { Authorization: `Bearer ${$.auth.data.accessToken}`, }; @@ -24,8 +16,12 @@ const postMessage = async ( const response = await $.http.post('/chat.postMessage', params, { headers }); - message.error = response?.integrationError; - message.data = response?.data?.message; + const message: IActionOutput = { + data: { + raw: response?.data?.message, + }, + error: response?.integrationError, + }; if (response.data.ok === false) { message.error = response.data; diff --git a/packages/backend/src/apps/twitter/common/get-user-followers.ts b/packages/backend/src/apps/twitter/common/get-user-followers.ts index 0d4e1e64..1874f61a 100644 --- a/packages/backend/src/apps/twitter/common/get-user-followers.ts +++ b/packages/backend/src/apps/twitter/common/get-user-followers.ts @@ -1,4 +1,8 @@ -import { IGlobalVariable, IJSONObject } from '@automatisch/types'; +import { + IGlobalVariable, + IJSONObject, + ITriggerOutput, +} from '@automatisch/types'; import { URLSearchParams } from 'url'; import { omitBy, isEmpty } from 'lodash'; import generateRequest from './generate-request'; @@ -14,12 +18,8 @@ const getUserFollowers = async ( ) => { let response; - const followers: { - data: IJSONObject[]; - error: IJSONObject | null; - } = { + const followers: ITriggerOutput = { data: [], - error: null, }; do { @@ -49,19 +49,19 @@ const getUserFollowers = async ( } if (response.data.meta.result_count > 0) { - response.data.data.forEach((tweet: IJSONObject) => { - if ( - !options.lastInternalId || - Number(tweet.id) > Number(options.lastInternalId) - ) { - followers.data.push(tweet); - } else { - return; - } + response.data.data.forEach((follower: IJSONObject) => { + followers.data.push({ + raw: follower, + meta: { internalId: follower.id as string }, + }); }); } } while (response.data.meta.next_token && options.lastInternalId); + followers.data.sort((follower, nextFollower) => { + return (follower.raw.id as number) - (nextFollower.raw.id as number); + }); + return followers; }; diff --git a/packages/backend/src/apps/twitter/common/get-user-tweets.ts b/packages/backend/src/apps/twitter/common/get-user-tweets.ts index 901d22a3..4cce058f 100644 --- a/packages/backend/src/apps/twitter/common/get-user-tweets.ts +++ b/packages/backend/src/apps/twitter/common/get-user-tweets.ts @@ -1,4 +1,8 @@ -import { IGlobalVariable, IJSONObject } from '@automatisch/types'; +import { + IGlobalVariable, + IJSONObject, + ITriggerOutput, +} from '@automatisch/types'; import { URLSearchParams } from 'url'; import omitBy from 'lodash/omitBy'; import isEmpty from 'lodash/isEmpty'; @@ -22,19 +26,15 @@ const getUserTweets = async ( const currentUser = await getCurrentUser($); username = currentUser.username as string; } else { - username = $.db.step.parameters.username as string; + username = $.step.parameters.username as string; } const user = await getUserByUsername($, username); let response; - const tweets: { - data: IJSONObject[]; - error: IJSONObject | null; - } = { + const tweets: ITriggerOutput = { data: [], - error: null, }; do { @@ -61,18 +61,18 @@ const getUserTweets = async ( if (response.data.meta.result_count > 0) { response.data.data.forEach((tweet: IJSONObject) => { - if ( - !options.lastInternalId || - Number(tweet.id) > Number(options.lastInternalId) - ) { - tweets.data.push(tweet); - } else { - return; - } + tweets.data.push({ + raw: tweet, + meta: { internalId: tweet.id as string }, + }); }); } } while (response.data.meta.next_token && options.lastInternalId); + tweets.data.sort((tweet, nextTweet) => { + return (tweet.raw.id as number) - (nextTweet.raw.id as number); + }); + return tweets; }; diff --git a/packages/backend/src/apps/twitter/triggers/my-tweets/index.ts b/packages/backend/src/apps/twitter/triggers/my-tweets/index.ts index e6cb95a0..591e6a28 100644 --- a/packages/backend/src/apps/twitter/triggers/my-tweets/index.ts +++ b/packages/backend/src/apps/twitter/triggers/my-tweets/index.ts @@ -20,7 +20,7 @@ export default { async run($: IGlobalVariable) { return await getUserTweets($, { currentUser: true, - lastInternalId: $.db.flow.lastInternalId, + lastInternalId: $.flow.lastInternalId, }); }, diff --git a/packages/backend/src/apps/twitter/triggers/new-follower-of-me/index.ts b/packages/backend/src/apps/twitter/triggers/new-follower-of-me/index.ts index c9b4be84..3bc6335d 100644 --- a/packages/backend/src/apps/twitter/triggers/new-follower-of-me/index.ts +++ b/packages/backend/src/apps/twitter/triggers/new-follower-of-me/index.ts @@ -18,7 +18,7 @@ export default { ], async run($: IGlobalVariable) { - return await myFollowers($, $.db.flow.lastInternalId); + return await myFollowers($, $.flow.lastInternalId); }, async testRun($: IGlobalVariable) { diff --git a/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts b/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts index 0f67d908..2eb450aa 100644 --- a/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts +++ b/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts @@ -32,14 +32,13 @@ export default { async run($: IGlobalVariable) { return await searchTweets($, { - searchTerm: $.db.step.parameters.searchTerm as string, - lastInternalId: $.db.flow.lastInternalId, + searchTerm: $.step.parameters.searchTerm as string, }); }, async testRun($: IGlobalVariable) { return await searchTweets($, { - searchTerm: $.db.step.parameters.searchTerm as string, + searchTerm: $.step.parameters.searchTerm as string, }); }, }; diff --git a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts index 8e3886c6..a9d38636 100644 --- a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts +++ b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts @@ -1,4 +1,8 @@ -import { IGlobalVariable, IJSONObject } from '@automatisch/types'; +import { + IGlobalVariable, + IJSONObject, + ITriggerOutput, +} from '@automatisch/types'; import qs from 'qs'; import generateRequest from '../../common/generate-request'; import { omitBy, isEmpty } from 'lodash'; @@ -14,18 +18,14 @@ const searchTweets = async ( ) => { let response; - const tweets: { - data: IJSONObject[]; - error: IJSONObject | null; - } = { + const tweets: ITriggerOutput = { data: [], - error: null, }; do { const params: IJSONObject = { query: options.searchTerm, - since_id: options.lastInternalId, + since_id: $.execution.testRun ? null : $.flow.lastInternalId, pagination_token: response?.data?.meta?.next_token, }; @@ -52,17 +52,21 @@ const searchTweets = async ( if (response.data.meta.result_count > 0) { response.data.data.forEach((tweet: IJSONObject) => { - if ( - !options.lastInternalId || - Number(tweet.id) > Number(options.lastInternalId) - ) { - tweets.data.push(tweet); - } else { - return; - } + const dataItem = { + raw: tweet, + meta: { + internalId: tweet.id as string, + }, + }; + + tweets.data.push(dataItem); }); } - } while (response.data.meta.next_token && options.lastInternalId); + } while (response.data.meta.next_token && !$.execution.testRun); + + tweets.data.sort((tweet, nextTweet) => { + return (tweet.raw.id as number) - (nextTweet.raw.id as number); + }); return tweets; }; diff --git a/packages/backend/src/apps/twitter/triggers/user-tweets/index.ts b/packages/backend/src/apps/twitter/triggers/user-tweets/index.ts index 23517c9e..a116e6fe 100644 --- a/packages/backend/src/apps/twitter/triggers/user-tweets/index.ts +++ b/packages/backend/src/apps/twitter/triggers/user-tweets/index.ts @@ -32,15 +32,15 @@ export default { async run($: IGlobalVariable) { return await getUserTweets($, { currentUser: false, - userId: $.db.step.parameters.username as string, - lastInternalId: $.db.flow.lastInternalId, + userId: $.step.parameters.username as string, + lastInternalId: $.flow.lastInternalId, }); }, async testRun($: IGlobalVariable) { return await getUserTweets($, { currentUser: false, - userId: $.db.step.parameters.username as string, + userId: $.step.parameters.username as string, }); }, }; diff --git a/packages/backend/src/graphql/mutations/create-auth-data.ts b/packages/backend/src/graphql/mutations/create-auth-data.ts index 91ff8d60..fd3e020b 100644 --- a/packages/backend/src/graphql/mutations/create-auth-data.ts +++ b/packages/backend/src/graphql/mutations/create-auth-data.ts @@ -29,7 +29,7 @@ const createAuthData = async ( .default; const app = await App.findOneByKey(connection.key); - const $ = await globalVariable(connection, app); + const $ = await globalVariable({ connection, app }); await authInstance.createAuthData($); try { diff --git a/packages/backend/src/graphql/mutations/execute-flow.ts b/packages/backend/src/graphql/mutations/execute-flow.ts index 063259af..b35e4a82 100644 --- a/packages/backend/src/graphql/mutations/execute-flow.ts +++ b/packages/backend/src/graphql/mutations/execute-flow.ts @@ -1,7 +1,5 @@ import Context from '../../types/express/context'; -import Processor from '../../services/processor'; -// eslint-disable-next-line @typescript-eslint/no-unused-vars -import processorQueue from '../../queues/processor'; +import testRun from '../../services/test-run'; type Params = { input: { @@ -14,26 +12,18 @@ const executeFlow = async ( params: Params, context: Context ) => { + const { stepId } = params.input; + const { executionStep } = await testRun({ stepId }); + const untilStep = await context.currentUser .$relatedQuery('steps') - .withGraphFetched('connection') - .findOne({ - 'steps.id': params.input.stepId, - }) - .throwIfNotFound(); - - const flow = await untilStep.$relatedQuery('flow'); - - const executionStep = await new Processor(flow, { - untilStep, - testRun: true, - }).run(); + .findById(stepId); await untilStep.$query().patch({ status: 'completed', }); - if (executionStep.errorDetails) { + if (executionStep.isFailed) { throw new Error(JSON.stringify(executionStep.errorDetails)); } diff --git a/packages/backend/src/graphql/mutations/update-flow-status.ts b/packages/backend/src/graphql/mutations/update-flow-status.ts index 97e6a69e..a81b31e0 100644 --- a/packages/backend/src/graphql/mutations/update-flow-status.ts +++ b/packages/backend/src/graphql/mutations/update-flow-status.ts @@ -1,5 +1,5 @@ import Context from '../../types/express/context'; -import processorQueue from '../../queues/processor'; +import flowQueue from '../../queues/flow'; type Params = { input: { @@ -8,7 +8,7 @@ type Params = { }; }; -const JOB_NAME = 'processorJob'; +const JOB_NAME = 'flow'; const EVERY_15_MINUTES_CRON = '*/15 * * * *'; const updateFlowStatus = async ( @@ -32,7 +32,7 @@ const updateFlowStatus = async ( }); const triggerStep = await flow.getTriggerStep(); - const trigger = await triggerStep.getTrigger(); + const trigger = await triggerStep.getTriggerCommand(); const interval = trigger.getInterval?.(triggerStep.parameters); const repeatOptions = { cron: interval || EVERY_15_MINUTES_CRON, @@ -43,8 +43,10 @@ const updateFlowStatus = async ( published_at: new Date().toISOString(), }); - await processorQueue.add( - JOB_NAME, + const jobName = `${JOB_NAME}-${flow.id}`; + + await flowQueue.add( + jobName, { flowId: flow.id }, { repeat: repeatOptions, @@ -52,10 +54,10 @@ const updateFlowStatus = async ( } ); } else { - const repeatableJobs = await processorQueue.getRepeatableJobs(); + const repeatableJobs = await flowQueue.getRepeatableJobs(); const job = repeatableJobs.find((job) => job.id === flow.id); - await processorQueue.removeRepeatableByKey(job.key); + await flowQueue.removeRepeatableByKey(job.key); } return flow; diff --git a/packages/backend/src/graphql/mutations/verify-connection.ts b/packages/backend/src/graphql/mutations/verify-connection.ts index 081752b9..61f119d3 100644 --- a/packages/backend/src/graphql/mutations/verify-connection.ts +++ b/packages/backend/src/graphql/mutations/verify-connection.ts @@ -21,7 +21,7 @@ const verifyConnection = async ( .throwIfNotFound(); const app = await App.findOneByKey(connection.key); - const $ = await globalVariable(connection, app); + const $ = await globalVariable({ connection, app }); await app.auth.verifyCredentials($); connection = await connection.$query().patchAndFetch({ diff --git a/packages/backend/src/graphql/queries/get-data.ts b/packages/backend/src/graphql/queries/get-data.ts index 5b158869..e8ea2acd 100644 --- a/packages/backend/src/graphql/queries/get-data.ts +++ b/packages/backend/src/graphql/queries/get-data.ts @@ -25,7 +25,7 @@ const getData = async (_parent: unknown, params: Params, context: Context) => { if (!connection || !step.appKey) return null; const app = await App.findOneByKey(step.appKey); - const $ = await globalVariable(connection, app, step.flow, step); + const $ = await globalVariable({ connection, app, flow: step.flow, step }); const command = app.data.find((data: IData) => data.key === params.key); diff --git a/packages/backend/src/graphql/queries/test-connection.ts b/packages/backend/src/graphql/queries/test-connection.ts index 5aa90fd1..1fa4d036 100644 --- a/packages/backend/src/graphql/queries/test-connection.ts +++ b/packages/backend/src/graphql/queries/test-connection.ts @@ -20,10 +20,9 @@ const testConnection = async ( .throwIfNotFound(); const app = await App.findOneByKey(connection.key, false); - const $ = await globalVariable(connection, app); + const $ = await globalVariable({ connection, app }); - const isStillVerified = - await app.auth.isStillVerified($); + const isStillVerified = await app.auth.isStillVerified($); connection = await connection.$query().patchAndFetch({ formattedData: connection.formattedData, diff --git a/packages/backend/src/helpers/compute-parameters.ts b/packages/backend/src/helpers/compute-parameters.ts new file mode 100644 index 00000000..c37328bd --- /dev/null +++ b/packages/backend/src/helpers/compute-parameters.ts @@ -0,0 +1,43 @@ +import Step from '../models/step'; +import ExecutionStep from '../models/execution-step'; +import get from 'lodash.get'; + +const variableRegExp = /({{step\.[\da-zA-Z-]+(?:\.[\da-zA-Z-]+)+}})/g; + +export default function computeParameters( + parameters: Step['parameters'], + executionSteps: ExecutionStep[] +): Step['parameters'] { + const entries = Object.entries(parameters); + return entries.reduce((result, [key, value]: [string, unknown]) => { + if (typeof value === 'string') { + const parts = value.split(variableRegExp); + + const computedValue = parts + .map((part: string) => { + const isVariable = part.match(variableRegExp); + if (isVariable) { + const stepIdAndKeyPath = part.replace(/{{step.|}}/g, '') as string; + const [stepId, ...keyPaths] = stepIdAndKeyPath.split('.'); + const keyPath = keyPaths.join('.'); + const executionStep = executionSteps.find((executionStep) => { + return executionStep.stepId === stepId; + }); + const data = executionStep?.dataOut; + const dataValue = get(data, keyPath); + return dataValue; + } + + return part; + }) + .join(''); + + return { + ...result, + [key]: computedValue, + }; + } + + return result; + }, {}); +} diff --git a/packages/backend/src/helpers/create-bull-board-handler.ts b/packages/backend/src/helpers/create-bull-board-handler.ts index 7525aaa9..c74199e7 100644 --- a/packages/backend/src/helpers/create-bull-board-handler.ts +++ b/packages/backend/src/helpers/create-bull-board-handler.ts @@ -1,13 +1,19 @@ import { ExpressAdapter } from '@bull-board/express'; import { createBullBoard } from '@bull-board/api'; import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; -import processorQueue from '../queues/processor'; +import flowQueue from '../queues/flow'; +import triggerQueue from '../queues/trigger'; +import actionQueue from '../queues/action'; const serverAdapter = new ExpressAdapter(); const createBullBoardHandler = async (serverAdapter: ExpressAdapter) => { createBullBoard({ - queues: [new BullMQAdapter(processorQueue)], + queues: [ + new BullMQAdapter(flowQueue), + new BullMQAdapter(triggerQueue), + new BullMQAdapter(actionQueue), + ], serverAdapter: serverAdapter, }); }; diff --git a/packages/backend/src/helpers/global-variable.ts b/packages/backend/src/helpers/global-variable.ts index 30a8f6be..e30d212e 100644 --- a/packages/backend/src/helpers/global-variable.ts +++ b/packages/backend/src/helpers/global-variable.ts @@ -2,17 +2,29 @@ import createHttpClient from './http-client'; import Connection from '../models/connection'; import Flow from '../models/flow'; import Step from '../models/step'; +import Execution from '../models/execution'; import { IJSONObject, IApp, IGlobalVariable } from '@automatisch/types'; +type GlobalVariableOptions = { + connection?: Connection; + app: IApp; + flow?: Flow; + step?: Step; + execution?: Execution; + testRun?: boolean; +}; + const globalVariable = async ( - connection: Connection, - appData: IApp, - flow?: Flow, - currentStep?: Step + options: GlobalVariableOptions ): Promise => { + const { connection, app, flow, step, execution, testRun = false } = options; + const lastInternalId = await flow?.lastInternalId(); - return { + const trigger = await step?.getTriggerCommand(); + const nextStep = await step?.getNextStep(); + + const variable: IGlobalVariable = { auth: { set: async (args: IJSONObject) => { if (connection) { @@ -28,17 +40,39 @@ const globalVariable = async ( }, data: connection?.formattedData, }, - app: appData, - http: createHttpClient({ baseURL: appData.baseUrl }), - db: { - flow: { - lastInternalId, - }, - step: { - parameters: currentStep?.parameters || {}, - }, + app: app, + http: createHttpClient({ baseURL: app.baseUrl }), + flow: { + id: flow?.id, + lastInternalId, + }, + step: { + id: step?.id, + appKey: step?.appKey, + parameters: step?.parameters || {}, + }, + nextStep: { + id: nextStep?.id, + appKey: nextStep?.appKey, + parameters: nextStep?.parameters || {}, + }, + execution: { + id: execution?.id, + testRun, }, }; + + if (trigger && trigger.dedupeStrategy === 'unique') { + const lastInternalIds = await flow?.lastInternalIds(); + + const isAlreadyProcessed = (internalId: string) => { + return lastInternalIds?.includes(internalId); + }; + + variable.flow.isAlreadyProcessed = isAlreadyProcessed; + } + + return variable; }; export default globalVariable; diff --git a/packages/backend/src/models/execution-step.ts b/packages/backend/src/models/execution-step.ts index 2d01e7fb..e063d344 100644 --- a/packages/backend/src/models/execution-step.ts +++ b/packages/backend/src/models/execution-step.ts @@ -50,6 +50,10 @@ class ExecutionStep extends Base { }, }); + get isFailed() { + return this.status === 'failure'; + } + async $afterInsert(queryContext: QueryContext) { await super.$afterInsert(queryContext); Telemetry.executionStepCreated(this); diff --git a/packages/backend/src/models/flow.ts b/packages/backend/src/models/flow.ts index ab714614..bb4015e0 100644 --- a/packages/backend/src/models/flow.ts +++ b/packages/backend/src/models/flow.ts @@ -13,6 +13,7 @@ class Flow extends Base { active: boolean; steps: Step[]; published_at: string; + executions?: Execution[]; static tableName = 'flows'; @@ -58,6 +59,15 @@ class Flow extends Base { return lastExecution ? (lastExecution as Execution).internalId : null; } + async lastInternalIds(itemCount = 50) { + const lastExecutions = await this.$relatedQuery('executions') + .select('internal_id') + .orderBy('created_at', 'desc') + .limit(itemCount); + + return lastExecutions.map((execution) => execution.internalId); + } + async $beforeUpdate( opt: ModelOptions, queryContext: QueryContext diff --git a/packages/backend/src/models/step.ts b/packages/backend/src/models/step.ts index b69f3d20..1885c7ad 100644 --- a/packages/backend/src/models/step.ts +++ b/packages/backend/src/models/step.ts @@ -92,16 +92,43 @@ class Step extends Base { return this.type === 'trigger'; } - async getTrigger() { - if (!this.isTrigger) return null; + get isAction(): boolean { + return this.type === 'action'; + } - const { appKey, key } = this; + async getApp() { + if (!this.appKey) return null; + + return await App.findOneByKey(this.appKey); + } + + async getNextStep() { + const flow = await this.$relatedQuery('flow'); + + return await flow + .$relatedQuery('steps') + .findOne({ position: this.position + 1 }); + } + + async getTriggerCommand() { + const { appKey, key, isTrigger } = this; + if (!isTrigger || !appKey || !key) return null; const app = await App.findOneByKey(appKey); const command = app.triggers.find((trigger) => trigger.key === key); return command; } + + async getActionCommand() { + const { appKey, key, isAction } = this; + if (!isAction || !appKey || !key) return null; + + const app = await App.findOneByKey(appKey); + const command = app.actions.find((action) => action.key === key); + + return command; + } } export default Step; diff --git a/packages/backend/src/queues/action.ts b/packages/backend/src/queues/action.ts new file mode 100644 index 00000000..44d8b023 --- /dev/null +++ b/packages/backend/src/queues/action.ts @@ -0,0 +1,25 @@ +import process from 'process'; +import { Queue } from 'bullmq'; +import redisConfig from '../config/redis'; +import logger from '../helpers/logger'; + +const CONNECTION_REFUSED = 'ECONNREFUSED'; + +const redisConnection = { + connection: redisConfig, +}; + +const actionQueue = new Queue('action', redisConnection); + +process.on('SIGTERM', async () => { + await actionQueue.close(); +}); + +actionQueue.on('error', (err) => { + if ((err as any).code === CONNECTION_REFUSED) { + logger.error('Make sure you have installed Redis and it is running.', err); + process.exit(); + } +}); + +export default actionQueue; diff --git a/packages/backend/src/queues/processor.ts b/packages/backend/src/queues/flow.ts similarity index 70% rename from packages/backend/src/queues/processor.ts rename to packages/backend/src/queues/flow.ts index b9b13c37..9353d72e 100644 --- a/packages/backend/src/queues/processor.ts +++ b/packages/backend/src/queues/flow.ts @@ -9,18 +9,18 @@ const redisConnection = { connection: redisConfig, }; -const processorQueue = new Queue('processor', redisConnection); -const queueScheduler = new QueueScheduler('processor', redisConnection); +const flowQueue = new Queue('flow', redisConnection); +const queueScheduler = new QueueScheduler('flow', redisConnection); process.on('SIGTERM', async () => { await queueScheduler.close(); }); -processorQueue.on('error', (err) => { +flowQueue.on('error', (err) => { if ((err as any).code === CONNECTION_REFUSED) { logger.error('Make sure you have installed Redis and it is running.', err); process.exit(); } }); -export default processorQueue; +export default flowQueue; diff --git a/packages/backend/src/queues/trigger.ts b/packages/backend/src/queues/trigger.ts new file mode 100644 index 00000000..4535deb1 --- /dev/null +++ b/packages/backend/src/queues/trigger.ts @@ -0,0 +1,25 @@ +import process from 'process'; +import { Queue } from 'bullmq'; +import redisConfig from '../config/redis'; +import logger from '../helpers/logger'; + +const CONNECTION_REFUSED = 'ECONNREFUSED'; + +const redisConnection = { + connection: redisConfig, +}; + +const triggerQueue = new Queue('trigger', redisConnection); + +process.on('SIGTERM', async () => { + await triggerQueue.close(); +}); + +triggerQueue.on('error', (err) => { + if ((err as any).code === CONNECTION_REFUSED) { + logger.error('Make sure you have installed Redis and it is running.', err); + process.exit(); + } +}); + +export default triggerQueue; diff --git a/packages/backend/src/services/action.ts b/packages/backend/src/services/action.ts new file mode 100644 index 00000000..bbac7990 --- /dev/null +++ b/packages/backend/src/services/action.ts @@ -0,0 +1,55 @@ +import Step from '../models/step'; +import Flow from '../models/flow'; +import Execution from '../models/execution'; +import ExecutionStep from '../models/execution-step'; +import computeParameters from '../helpers/compute-parameters'; +import globalVariable from '../helpers/global-variable'; + +type ProcessActionOptions = { + flowId: string; + executionId: string; + stepId: string; +}; + +export const processAction = async (options: ProcessActionOptions) => { + const { flowId, stepId, executionId } = options; + + const step = await Step.query().findById(stepId).throwIfNotFound(); + const execution = await Execution.query() + .findById(executionId) + .throwIfNotFound(); + + const $ = await globalVariable({ + flow: await Flow.query().findById(flowId).throwIfNotFound(), + app: await step.getApp(), + step: step, + connection: await step.$relatedQuery('connection'), + execution: execution, + }); + + const priorExecutionSteps = await ExecutionStep.query().where({ + execution_id: $.execution.id, + }); + + const computedParameters = computeParameters( + $.step.parameters, + priorExecutionSteps + ); + + const actionCommand = await step.getActionCommand(); + + $.step.parameters = computedParameters; + const actionOutput = await actionCommand.run($); + + const executionStep = await execution + .$relatedQuery('executionSteps') + .insertAndFetch({ + stepId: $.step.id, + status: actionOutput.error ? 'failure' : 'success', + dataIn: computedParameters, + dataOut: actionOutput.error ? null : actionOutput.data.raw, + errorDetails: actionOutput.error, + }); + + return { flowId, stepId, executionId, executionStep }; +}; diff --git a/packages/backend/src/services/flow.ts b/packages/backend/src/services/flow.ts new file mode 100644 index 00000000..c11473a2 --- /dev/null +++ b/packages/backend/src/services/flow.ts @@ -0,0 +1,24 @@ +import Flow from '../models/flow'; +import globalVariable from '../helpers/global-variable'; + +type ProcessFlowOptions = { + flowId: string; + testRun?: boolean; +}; + +export const processFlow = async (options: ProcessFlowOptions) => { + const flow = await Flow.query().findById(options.flowId).throwIfNotFound(); + + const triggerStep = await flow.getTriggerStep(); + const triggerCommand = await triggerStep.getTriggerCommand(); + + const $ = await globalVariable({ + flow, + connection: await triggerStep.$relatedQuery('connection'), + app: await triggerStep.getApp(), + step: triggerStep, + testRun: options.testRun, + }); + + return await triggerCommand.run($); +}; diff --git a/packages/backend/src/services/processor.ts b/packages/backend/src/services/processor.ts deleted file mode 100644 index 896cbf38..00000000 --- a/packages/backend/src/services/processor.ts +++ /dev/null @@ -1,239 +0,0 @@ -import get from 'lodash.get'; -import { IJSONObject } from '@automatisch/types'; - -import App from '../models/app'; -import Flow from '../models/flow'; -import Step from '../models/step'; -import Execution from '../models/execution'; -import ExecutionStep from '../models/execution-step'; -import globalVariable from '../helpers/global-variable'; - -type ExecutionSteps = Record; - -type ProcessorOptions = { - untilStep?: Step; - testRun?: boolean; -}; - -class Processor { - flow: Flow; - untilStep?: Step; - testRun?: boolean; - - static variableRegExp = /({{step\.[\da-zA-Z-]+(?:\.[\da-zA-Z-]+)+}})/g; - - constructor(flow: Flow, processorOptions: ProcessorOptions) { - this.flow = flow; - this.untilStep = processorOptions.untilStep; - this.testRun = processorOptions.testRun; - } - - async run() { - const steps = await this.flow - .$relatedQuery('steps') - .withGraphFetched('connection') - .orderBy('position', 'asc'); - - const triggerStep = steps.find((step) => step.type === 'trigger'); - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const initialTriggerData = await this.getInitialTriggerData(triggerStep!); - - if (!initialTriggerData.error && initialTriggerData.data.length === 0) { - const lastInternalId = await this.flow.lastInternalId(); - - const executionData: Partial = { - flowId: this.flow.id, - testRun: this.testRun, - }; - - if (lastInternalId) { - executionData.internalId = lastInternalId; - } - - await Execution.query().insert(executionData); - - return; - } - - if (this.testRun && initialTriggerData.data.length > 0) { - initialTriggerData.data = [initialTriggerData.data[0]]; - } - - if (initialTriggerData.data.length > 1) { - initialTriggerData.data = initialTriggerData.data.sort( - (item: IJSONObject, nextItem: IJSONObject) => { - return (item.id as number) - (nextItem.id as number); - } - ); - } - - const executions: Execution[] = []; - - for await (const data of initialTriggerData.data) { - const execution = await Execution.query().insert({ - flowId: this.flow.id, - testRun: this.testRun, - internalId: data.id as string, - }); - - executions.push(execution); - - let previousExecutionStep: ExecutionStep; - const priorExecutionSteps: ExecutionSteps = {}; - - let fetchedActionData: { - data: IJSONObject | null; - error: IJSONObject | null; - } = { - data: null, - error: null, - }; - - for await (const step of steps) { - if (!step.appKey) continue; - - const { appKey, key, type, parameters: rawParameters = {}, id } = step; - - const isTrigger = type === 'trigger'; - const app = await App.findOneByKey(appKey); - - const computedParameters = Processor.computeParameters( - rawParameters, - priorExecutionSteps - ); - - const clonedStep = Object.assign({}, step); - clonedStep.parameters = computedParameters; - - const $ = await globalVariable( - step.connection, - app, - this.flow, - clonedStep - ); - - if (!isTrigger && key) { - const command = app.actions.find((action) => action.key === key); - fetchedActionData = await command.run($); - } - - if (!isTrigger && fetchedActionData.error) { - await execution.$relatedQuery('executionSteps').insertAndFetch({ - stepId: id, - status: 'failure', - dataIn: null, - dataOut: computedParameters, - errorDetails: fetchedActionData.error, - }); - - break; - } - - previousExecutionStep = await execution - .$relatedQuery('executionSteps') - .insertAndFetch({ - stepId: id, - status: 'success', - dataIn: isTrigger ? rawParameters : computedParameters, - dataOut: isTrigger ? data : fetchedActionData.data, - }); - - priorExecutionSteps[id] = previousExecutionStep; - - if (id === this.untilStep?.id) { - break; - } - } - } - - if (initialTriggerData.error) { - const executionWithError = await Execution.query().insert({ - flowId: this.flow.id, - testRun: this.testRun, - }); - - executions.push(executionWithError); - - await executionWithError.$relatedQuery('executionSteps').insertAndFetch({ - stepId: triggerStep.id, - status: 'failure', - dataIn: triggerStep.parameters, - errorDetails: initialTriggerData.error, - }); - } - - if (!this.testRun) return; - - const lastExecutionStepFromFirstExecution = await executions[0] - .$relatedQuery('executionSteps') - .orderBy('created_at', 'desc') - .first(); - - return lastExecutionStepFromFirstExecution; - } - - async getInitialTriggerData(step: Step) { - if (!step.appKey || !step.key) return null; - - const app = await App.findOneByKey(step.appKey); - const $ = await globalVariable( - step.connection, - app, - this.flow, - step, - ) - - const command = app.triggers.find((trigger) => trigger.key === step.key); - - let fetchedData; - - if (this.testRun) { - fetchedData = await command.testRun($); - } else { - fetchedData = await command.run($); - } - - return fetchedData; - } - - static computeParameters( - parameters: Step['parameters'], - executionSteps: ExecutionSteps - ): Step['parameters'] { - const entries = Object.entries(parameters); - return entries.reduce((result, [key, value]: [string, unknown]) => { - if (typeof value === 'string') { - const parts = value.split(Processor.variableRegExp); - - const computedValue = parts - .map((part: string) => { - const isVariable = part.match(Processor.variableRegExp); - if (isVariable) { - const stepIdAndKeyPath = part.replace( - /{{step.|}}/g, - '' - ) as string; - const [stepId, ...keyPaths] = stepIdAndKeyPath.split('.'); - const keyPath = keyPaths.join('.'); - const executionStep = executionSteps[stepId.toString() as string]; - const data = executionStep?.dataOut; - const dataValue = get(data, keyPath); - return dataValue; - } - - return part; - }) - .join(''); - - return { - ...result, - [key]: computedValue, - }; - } - - return result; - }, {}); - } -} - -export default Processor; diff --git a/packages/backend/src/services/test-run.ts b/packages/backend/src/services/test-run.ts new file mode 100644 index 00000000..950aecc3 --- /dev/null +++ b/packages/backend/src/services/test-run.ts @@ -0,0 +1,65 @@ +import Step from '../models/step'; +import { processFlow } from '../services/flow'; +import { processTrigger } from '../services/trigger'; +import { processAction } from '../services/action'; + +type TestRunOptions = { + stepId: string; +}; + +const testRun = async (options: TestRunOptions) => { + const untilStep = await Step.query() + .findById(options.stepId) + .throwIfNotFound(); + + const flow = await untilStep.$relatedQuery('flow'); + const [triggerStep, ...actionSteps] = await flow + .$relatedQuery('steps') + .withGraphFetched('connection') + .orderBy('position', 'asc'); + + const { data, error: triggerError } = await processFlow({ + flowId: flow.id, + testRun: true, + }); + + if (triggerError) { + const { executionStep: triggerExecutionStepWithError } = + await processTrigger({ + flowId: flow.id, + stepId: triggerStep.id, + error: triggerError, + testRun: true, + }); + + return { executionStep: triggerExecutionStepWithError }; + } + + const firstTriggerDataItem = data[0]; + + const { executionId, executionStep: triggerExecutionStep } = + await processTrigger({ + flowId: flow.id, + stepId: triggerStep.id, + triggerDataItem: firstTriggerDataItem, + testRun: true, + }); + + if (triggerStep.id === untilStep.id) { + return { executionStep: triggerExecutionStep }; + } + + for (const actionStep of actionSteps) { + const { executionStep: actionExecutionStep } = await processAction({ + flowId: flow.id, + stepId: actionStep.id, + executionId, + }); + + if (actionStep.id === untilStep.id || actionExecutionStep.isFailed) { + return { executionStep: actionExecutionStep }; + } + } +}; + +export default testRun; diff --git a/packages/backend/src/services/trigger.ts b/packages/backend/src/services/trigger.ts new file mode 100644 index 00000000..5bf92185 --- /dev/null +++ b/packages/backend/src/services/trigger.ts @@ -0,0 +1,46 @@ +import { IJSONObject, ITriggerDataItem } from '@automatisch/types'; +import Step from '../models/step'; +import Flow from '../models/flow'; +import Execution from '../models/execution'; +import globalVariable from '../helpers/global-variable'; + +type ProcessTriggerOptions = { + flowId: string; + stepId: string; + triggerDataItem?: ITriggerDataItem; + error?: IJSONObject; + testRun?: boolean; +}; + +export const processTrigger = async (options: ProcessTriggerOptions) => { + const { flowId, stepId, triggerDataItem, error, testRun } = options; + + const step = await Step.query().findById(stepId).throwIfNotFound(); + + const $ = await globalVariable({ + flow: await Flow.query().findById(flowId).throwIfNotFound(), + app: await step.getApp(), + step: step, + connection: await step.$relatedQuery('connection'), + }); + + // check if we already process this trigger data item or not! + + const execution = await Execution.query().insert({ + flowId: $.flow.id, + testRun, + internalId: triggerDataItem?.meta.internalId, + }); + + const executionStep = await execution + .$relatedQuery('executionSteps') + .insertAndFetch({ + stepId: $.step.id, + status: error ? 'failure' : 'success', + dataIn: $.step.parameters, + dataOut: !error ? triggerDataItem.raw : null, + errorDetails: error, + }); + + return { flowId, stepId, executionId: execution.id, executionStep }; +}; diff --git a/packages/backend/src/worker.ts b/packages/backend/src/worker.ts index 242562f0..a69b069b 100644 --- a/packages/backend/src/worker.ts +++ b/packages/backend/src/worker.ts @@ -1,5 +1,7 @@ import './config/orm'; -export { worker } from './workers/processor'; +import './workers/flow'; +import './workers/trigger'; +import './workers/action'; import telemetry from './helpers/telemetry'; telemetry.setServiceType('worker'); diff --git a/packages/backend/src/workers/action.ts b/packages/backend/src/workers/action.ts new file mode 100644 index 00000000..75d9bd86 --- /dev/null +++ b/packages/backend/src/workers/action.ts @@ -0,0 +1,51 @@ +import { Worker } from 'bullmq'; +import redisConfig from '../config/redis'; +import logger from '../helpers/logger'; +import Step from '../models/step'; +import actionQueue from '../queues/action'; +import { processAction } from '../services/action'; + +type JobData = { + flowId: string; + executionId: string; + stepId: string; +}; + +export const worker = new Worker( + 'action', + async (job) => { + const { stepId, flowId, executionId } = await processAction( + job.data as JobData + ); + + const step = await Step.query().findById(stepId).throwIfNotFound(); + const nextStep = await step.getNextStep(); + + if (!nextStep) return; + + const jobName = `${executionId}-${nextStep.id}`; + + const jobPayload = { + flowId, + executionId, + stepId: nextStep.id, + }; + + await actionQueue.add(jobName, jobPayload); + }, + { connection: redisConfig } +); + +worker.on('completed', (job) => { + logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); +}); + +worker.on('failed', (job, err) => { + logger.info( + `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed22 to start with ${err.message}` + ); +}); + +process.on('SIGTERM', async () => { + await worker.close(); +}); diff --git a/packages/backend/src/workers/flow.ts b/packages/backend/src/workers/flow.ts new file mode 100644 index 00000000..917583cf --- /dev/null +++ b/packages/backend/src/workers/flow.ts @@ -0,0 +1,57 @@ +import { Worker } from 'bullmq'; +import redisConfig from '../config/redis'; +import logger from '../helpers/logger'; +import triggerQueue from '../queues/trigger'; +import { processFlow } from '../services/flow'; +import Flow from '../models/flow'; + +export const worker = new Worker( + 'flow', + async (job) => { + const { flowId } = job.data; + + const flow = await Flow.query().findById(flowId).throwIfNotFound(); + const triggerStep = await flow.getTriggerStep(); + + const { data, error } = await processFlow({ flowId }); + + for (const triggerDataItem of data) { + const jobName = `${triggerStep.id}-${triggerDataItem.meta.internalId}`; + + const jobPayload = { + flowId, + stepId: triggerStep.id, + triggerDataItem, + }; + + await triggerQueue.add(jobName, jobPayload); + } + + if (error) { + const jobName = `${triggerStep.id}-error`; + + const jobPayload = { + flowId, + stepId: triggerStep.id, + error, + }; + + await triggerQueue.add(jobName, jobPayload); + } + }, + { connection: redisConfig } +); + +worker.on('completed', (job) => { + logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); +}); + +worker.on('failed', (job, err) => { + logger.info( + `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}` + ); +}); + +process.on('SIGTERM', async () => { + await worker.close(); +}); diff --git a/packages/backend/src/workers/processor.ts b/packages/backend/src/workers/processor.ts deleted file mode 100644 index 17c24e52..00000000 --- a/packages/backend/src/workers/processor.ts +++ /dev/null @@ -1,30 +0,0 @@ -import { Worker } from 'bullmq'; -import Processor from '../services/processor'; -import redisConfig from '../config/redis'; -import Flow from '../models/flow'; -import logger from '../helpers/logger'; - -export const worker = new Worker( - 'processor', - async (job) => { - const flow = await Flow.query().findById(job.data.flowId).throwIfNotFound(); - const data = await new Processor(flow, { testRun: false }).run(); - - return data; - }, - { connection: redisConfig } -); - -worker.on('completed', (job) => { - logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has completed!`); -}); - -worker.on('failed', (job, err) => { - logger.info( - `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed with ${err.message}` - ); -}); - -process.on('SIGTERM', async () => { - await worker.close(); -}); diff --git a/packages/backend/src/workers/trigger.ts b/packages/backend/src/workers/trigger.ts new file mode 100644 index 00000000..f82f568d --- /dev/null +++ b/packages/backend/src/workers/trigger.ts @@ -0,0 +1,52 @@ +import { Worker } from 'bullmq'; +import redisConfig from '../config/redis'; +import logger from '../helpers/logger'; +import { IJSONObject, ITriggerDataItem } from '@automatisch/types'; +import actionQueue from '../queues/action'; +import Step from '../models/step'; +import { processTrigger } from '../services/trigger'; + +type JobData = { + flowId: string; + stepId: string; + triggerDataItem?: ITriggerDataItem; + error?: IJSONObject; +}; + +export const worker = new Worker( + 'trigger', + async (job) => { + const { flowId, executionId, stepId, executionStep } = await processTrigger( + job.data as JobData + ); + + if (executionStep.isFailed) return; + + const step = await Step.query().findById(stepId).throwIfNotFound(); + const nextStep = await step.getNextStep(); + const jobName = `${executionId}-${nextStep.id}`; + + const jobPayload = { + flowId, + executionId, + stepId: nextStep.id, + }; + + await actionQueue.add(jobName, jobPayload); + }, + { connection: redisConfig } +); + +worker.on('completed', (job) => { + logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); +}); + +worker.on('failed', (job, err) => { + logger.info( + `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}` + ); +}); + +process.on('SIGTERM', async () => { + await worker.close(); +}); diff --git a/packages/types/index.d.ts b/packages/types/index.d.ts index 43898860..61452076 100644 --- a/packages/types/index.d.ts +++ b/packages/types/index.d.ts @@ -172,9 +172,9 @@ export interface IData { } export interface IAuth { - createAuthData($: IGlobalVariable): Promise, - verifyCredentials($: IGlobalVariable): Promise, - isStillVerified($: IGlobalVariable): Promise, + createAuthData($: IGlobalVariable): Promise; + verifyCredentials($: IGlobalVariable): Promise; + isStillVerified($: IGlobalVariable): Promise; fields: IField[]; authenticationSteps: IAuthenticationStep[]; reconnectionSteps: IAuthenticationStep[]; @@ -187,23 +187,47 @@ export interface IService { data?: any; } +export interface ITriggerOutput { + data: ITriggerDataItem[]; + error?: IJSONObject; +} + +export interface ITriggerDataItem { + raw: IJSONObject; + meta: { + internalId: string; + }; +} + export interface ITrigger { name: string; - key: string, + key: string; pollInterval: number; description: string; + dedupeStrategy: 'greatest' | 'unique' | 'last'; substeps: ISubstep[]; - getInterval(parameters: IGlobalVariable["db"]["step"]["parameters"]): string; - run($: IGlobalVariable): Promise<{ data: IJSONObject[], error: IJSONObject | null }>; - testRun($: IGlobalVariable, startTime?: Date): Promise<{ data: IJSONObject[], error: IJSONObject | null }>; + getInterval(parameters: IGlobalVariable['step']['parameters']): string; + run($: IGlobalVariable): Promise; + testRun($: IGlobalVariable): Promise; +} + +export interface IActionOutput { + data: IActionDataItem; + error?: IJSONObject; +} + +export interface IActionDataItem { + raw: { + data?: IJSONObject; + }; } export interface IAction { name: string; - key: string, + key: string; description: string; substeps: ISubstep[]; - run($: IGlobalVariable): Promise<{ data: IJSONObject, error: IJSONObject | null }>; + run($: IGlobalVariable): Promise; } export interface IAuthentication { @@ -229,14 +253,26 @@ export type IGlobalVariable = { }; app: IApp; http: IHttpClient; - db: { - flow: { - lastInternalId: string; - }; - step: { - parameters: IJSONObject; - } + flow?: { + id: string; + lastInternalId: string; + isAlreadyProcessed?: (internalId: string) => boolean; }; + step?: { + id: string; + appKey: string; + parameters: IJSONObject; + }; + nextStep?: { + id: string; + appKey: string; + parameters: IJSONObject; + }; + execution?: { + id: string; + testRun: boolean; + } + process?: (triggerDataItem: ITriggerDataItem) => Promise; }; declare module 'axios' {