From d9192f6e6b47ba48190d061d1cf8e8a75591b639 Mon Sep 17 00:00:00 2001 From: Faruk AYDIN Date: Wed, 12 Oct 2022 21:10:44 +0200 Subject: [PATCH 1/5] refactor: Introduce IActionOutput and ITriggerOutput types --- .../scheduler/triggers/every-day/index.ts | 82 +++++----- .../scheduler/triggers/every-hour/index.ts | 32 ++-- .../scheduler/triggers/every-month/index.ts | 145 ++++++++++-------- .../scheduler/triggers/every-week/index.ts | 97 ++++++------ .../actions/find-message/find-message.ts | 25 +-- .../apps/slack/actions/find-message/index.ts | 2 +- .../send-a-message-to-channel/index.ts | 4 +- .../send-a-message-to-channel/post-message.ts | 18 +-- .../apps/twitter/common/get-user-followers.ts | 17 +- .../apps/twitter/common/get-user-tweets.ts | 19 ++- .../apps/twitter/triggers/my-tweets/index.ts | 2 +- .../triggers/new-follower-of-me/index.ts | 2 +- .../twitter/triggers/search-tweets/index.ts | 6 +- .../triggers/search-tweets/search-tweets.ts | 21 ++- .../twitter/triggers/user-tweets/index.ts | 6 +- .../src/graphql/mutations/create-auth-data.ts | 2 +- .../graphql/mutations/verify-connection.ts | 2 +- .../backend/src/graphql/queries/get-data.ts | 2 +- .../src/graphql/queries/test-connection.ts | 5 +- .../backend/src/helpers/global-variable.ts | 30 ++-- packages/backend/src/services/processor.ts | 34 ++-- packages/types/index.d.ts | 53 +++++-- 22 files changed, 327 insertions(+), 279 deletions(-) diff --git a/packages/backend/src/apps/scheduler/triggers/every-day/index.ts b/packages/backend/src/apps/scheduler/triggers/every-day/index.ts index b765cb75..b92de092 100644 --- a/packages/backend/src/apps/scheduler/triggers/every-day/index.ts +++ b/packages/backend/src/apps/scheduler/triggers/every-day/index.ts @@ -24,13 +24,13 @@ export default { options: [ { label: 'Yes', - value: true + value: true, }, { label: 'No', - value: false - } - ] + value: false, + }, + ], }, { label: 'Time of day', @@ -42,111 +42,111 @@ export default { options: [ { label: '00:00', - value: 0 + value: 0, }, { label: '01:00', - value: 1 + value: 1, }, { label: '02:00', - value: 2 + value: 2, }, { label: '03:00', - value: 3 + value: 3, }, { label: '04:00', - value: 4 + value: 4, }, { label: '05:00', - value: 5 + value: 5, }, { label: '06:00', - value: 6 + value: 6, }, { label: '07:00', - value: 7 + value: 7, }, { label: '08:00', - value: 8 + value: 8, }, { label: '09:00', - value: 9 + value: 9, }, { label: '10:00', - value: 10 + value: 10, }, { label: '11:00', - value: 11 + value: 11, }, { label: '12:00', - value: 12 + value: 12, }, { label: '13:00', - value: 13 + value: 13, }, { label: '14:00', - value: 14 + value: 14, }, { label: '15:00', - value: 15 + value: 15, }, { label: '16:00', - value: 16 + value: 16, }, { label: '17:00', - value: 17 + value: 17, }, { label: '18:00', - value: 18 + value: 18, }, { label: '19:00', - value: 19 + value: 19, }, { label: '20:00', - value: 20 + value: 20, }, { label: '21:00', - value: 21 + value: 21, }, { label: '22:00', - value: 22 + value: 22, }, { label: '23:00', - value: 23 - } - ] - } - ] + value: 23, + }, + ], + }, + ], }, { key: 'testStep', - name: 'Test trigger' - } + name: 'Test trigger', + }, ], - getInterval(parameters: IGlobalVariable["db"]["step"]["parameters"]) { + getInterval(parameters: IGlobalVariable['step']['parameters']) { if (parameters.triggersOnWeekend as boolean) { return cronTimes.everyDayAt(parameters.hour as number); } @@ -156,14 +156,20 @@ export default { async run($: IGlobalVariable, startDateTime: Date) { const dateTime = DateTime.fromJSDate(startDateTime); - const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(dateTime) as IJSONValue; + const dateTimeObjectRepresentation = getDateTimeObjectRepresentation( + dateTime + ) as IJSONValue; return { data: [dateTimeObjectRepresentation] }; }, async testRun($: IGlobalVariable) { - const nextCronDateTime = getNextCronDateTime(this.getInterval($.db.step.parameters)); - const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(nextCronDateTime) as IJSONValue; + const nextCronDateTime = getNextCronDateTime( + this.getInterval($.step.parameters) + ); + const dateTimeObjectRepresentation = getDateTimeObjectRepresentation( + nextCronDateTime + ) as IJSONValue; return { data: [dateTimeObjectRepresentation] }; }, diff --git a/packages/backend/src/apps/scheduler/triggers/every-hour/index.ts b/packages/backend/src/apps/scheduler/triggers/every-hour/index.ts index 063e81d1..b0c11cd8 100644 --- a/packages/backend/src/apps/scheduler/triggers/every-hour/index.ts +++ b/packages/backend/src/apps/scheduler/triggers/every-hour/index.ts @@ -24,25 +24,25 @@ export default { options: [ { label: 'Yes', - value: true + value: true, }, { label: 'No', - value: false - } - ] - } - ] + value: false, + }, + ], + }, + ], }, { key: 'testStep', - name: 'Test trigger' - } + name: 'Test trigger', + }, ], - getInterval(parameters: IGlobalVariable["db"]["step"]["parameters"]) { + getInterval(parameters: IGlobalVariable['step']['parameters']) { if (parameters.triggersOnWeekend) { - return cronTimes.everyHour + return cronTimes.everyHour; } return cronTimes.everyHourExcludingWeekends; @@ -50,14 +50,20 @@ export default { async run($: IGlobalVariable, startDateTime: Date) { const dateTime = DateTime.fromJSDate(startDateTime); - const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(dateTime) as IJSONValue; + const dateTimeObjectRepresentation = getDateTimeObjectRepresentation( + dateTime + ) as IJSONValue; return { data: [dateTimeObjectRepresentation] }; }, async testRun($: IGlobalVariable) { - const nextCronDateTime = getNextCronDateTime(this.getInterval($.db.step.parameters)); - const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(nextCronDateTime) as IJSONValue; + const nextCronDateTime = getNextCronDateTime( + this.getInterval($.step.parameters) + ); + const dateTimeObjectRepresentation = getDateTimeObjectRepresentation( + nextCronDateTime + ) as IJSONValue; return { data: [dateTimeObjectRepresentation] }; }, diff --git a/packages/backend/src/apps/scheduler/triggers/every-month/index.ts b/packages/backend/src/apps/scheduler/triggers/every-month/index.ts index d09a3935..5850503b 100644 --- a/packages/backend/src/apps/scheduler/triggers/every-month/index.ts +++ b/packages/backend/src/apps/scheduler/triggers/every-month/index.ts @@ -23,129 +23,129 @@ export default { options: [ { label: 1, - value: 1 + value: 1, }, { label: 2, - value: 2 + value: 2, }, { label: 3, - value: 3 + value: 3, }, { label: 4, - value: 4 + value: 4, }, { label: 5, - value: 5 + value: 5, }, { label: 6, - value: 6 + value: 6, }, { label: 7, - value: 7 + value: 7, }, { label: 8, - value: 8 + value: 8, }, { label: 9, - value: 9 + value: 9, }, { label: 10, - value: 10 + value: 10, }, { label: 11, - value: 11 + value: 11, }, { label: 12, - value: 12 + value: 12, }, { label: 13, - value: 13 + value: 13, }, { label: 14, - value: 14 + value: 14, }, { label: 15, - value: 15 + value: 15, }, { label: 16, - value: 16 + value: 16, }, { label: 17, - value: 17 + value: 17, }, { label: 18, - value: 18 + value: 18, }, { label: 19, - value: 19 + value: 19, }, { label: 20, - value: 20 + value: 20, }, { label: 21, - value: 21 + value: 21, }, { label: 22, - value: 22 + value: 22, }, { label: 23, - value: 23 + value: 23, }, { label: 24, - value: 24 + value: 24, }, { label: 25, - value: 25 + value: 25, }, { label: 26, - value: 26 + value: 26, }, { label: 27, - value: 27 + value: 27, }, { label: 28, - value: 28 + value: 28, }, { label: 29, - value: 29 + value: 29, }, { label: 30, - value: 30 + value: 30, }, { label: 31, - value: 31 - } - ] + value: 31, + }, + ], }, { label: 'Time of day', @@ -157,126 +157,135 @@ export default { options: [ { label: '00:00', - value: 0 + value: 0, }, { label: '01:00', - value: 1 + value: 1, }, { label: '02:00', - value: 2 + value: 2, }, { label: '03:00', - value: 3 + value: 3, }, { label: '04:00', - value: 4 + value: 4, }, { label: '05:00', - value: 5 + value: 5, }, { label: '06:00', - value: 6 + value: 6, }, { label: '07:00', - value: 7 + value: 7, }, { label: '08:00', - value: 8 + value: 8, }, { label: '09:00', - value: 9 + value: 9, }, { label: '10:00', - value: 10 + value: 10, }, { label: '11:00', - value: 11 + value: 11, }, { label: '12:00', - value: 12 + value: 12, }, { label: '13:00', - value: 13 + value: 13, }, { label: '14:00', - value: 14 + value: 14, }, { label: '15:00', - value: 15 + value: 15, }, { label: '16:00', - value: 16 + value: 16, }, { label: '17:00', - value: 17 + value: 17, }, { label: '18:00', - value: 18 + value: 18, }, { label: '19:00', - value: 19 + value: 19, }, { label: '20:00', - value: 20 + value: 20, }, { label: '21:00', - value: 21 + value: 21, }, { label: '22:00', - value: 22 + value: 22, }, { label: '23:00', - value: 23 - } - ] - } - ] + value: 23, + }, + ], + }, + ], }, { key: 'testStep', - name: 'Test trigger' - } + name: 'Test trigger', + }, ], - getInterval(parameters: IGlobalVariable["db"]["step"]["parameters"]) { - const interval = cronTimes.everyMonthOnAndAt(parameters.day as number, parameters.hour as number); + getInterval(parameters: IGlobalVariable['step']['parameters']) { + const interval = cronTimes.everyMonthOnAndAt( + parameters.day as number, + parameters.hour as number + ); return interval; }, async run($: IGlobalVariable, startDateTime: Date) { const dateTime = DateTime.fromJSDate(startDateTime); - const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(dateTime) as IJSONValue; + const dateTimeObjectRepresentation = getDateTimeObjectRepresentation( + dateTime + ) as IJSONValue; return { data: [dateTimeObjectRepresentation] }; }, async testRun($: IGlobalVariable) { - const nextCronDateTime = getNextCronDateTime(this.getInterval($.db.step.parameters)); - const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(nextCronDateTime) as IJSONValue; + const nextCronDateTime = getNextCronDateTime( + this.getInterval($.step.parameters) + ); + const dateTimeObjectRepresentation = getDateTimeObjectRepresentation( + nextCronDateTime + ) as IJSONValue; return { data: [dateTimeObjectRepresentation] }; }, diff --git a/packages/backend/src/apps/scheduler/triggers/every-week/index.ts b/packages/backend/src/apps/scheduler/triggers/every-week/index.ts index 0fa3b4ab..878b005b 100644 --- a/packages/backend/src/apps/scheduler/triggers/every-week/index.ts +++ b/packages/backend/src/apps/scheduler/triggers/every-week/index.ts @@ -23,33 +23,33 @@ export default { options: [ { label: 'Monday', - value: 1 + value: 1, }, { label: 'Tuesday', - value: 2 + value: 2, }, { label: 'Wednesday', - value: 3 + value: 3, }, { label: 'Thursday', - value: 4 + value: 4, }, { label: 'Friday', - value: 5 + value: 5, }, { label: 'Saturday', - value: 6 + value: 6, }, { label: 'Sunday', - value: 0 - } - ] + value: 0, + }, + ], }, { label: 'Time of day', @@ -61,126 +61,135 @@ export default { options: [ { label: '00:00', - value: 0 + value: 0, }, { label: '01:00', - value: 1 + value: 1, }, { label: '02:00', - value: 2 + value: 2, }, { label: '03:00', - value: 3 + value: 3, }, { label: '04:00', - value: 4 + value: 4, }, { label: '05:00', - value: 5 + value: 5, }, { label: '06:00', - value: 6 + value: 6, }, { label: '07:00', - value: 7 + value: 7, }, { label: '08:00', - value: 8 + value: 8, }, { label: '09:00', - value: 9 + value: 9, }, { label: '10:00', - value: 10 + value: 10, }, { label: '11:00', - value: 11 + value: 11, }, { label: '12:00', - value: 12 + value: 12, }, { label: '13:00', - value: 13 + value: 13, }, { label: '14:00', - value: 14 + value: 14, }, { label: '15:00', - value: 15 + value: 15, }, { label: '16:00', - value: 16 + value: 16, }, { label: '17:00', - value: 17 + value: 17, }, { label: '18:00', - value: 18 + value: 18, }, { label: '19:00', - value: 19 + value: 19, }, { label: '20:00', - value: 20 + value: 20, }, { label: '21:00', - value: 21 + value: 21, }, { label: '22:00', - value: 22 + value: 22, }, { label: '23:00', - value: 23 - } - ] - } - ] + value: 23, + }, + ], + }, + ], }, { key: 'testStep', - name: 'Test trigger' - } + name: 'Test trigger', + }, ], - getInterval(parameters: IGlobalVariable["db"]["step"]["parameters"]) { - const interval = cronTimes.everyWeekOnAndAt(parameters.weekday as number, parameters.hour as number); + getInterval(parameters: IGlobalVariable['step']['parameters']) { + const interval = cronTimes.everyWeekOnAndAt( + parameters.weekday as number, + parameters.hour as number + ); return interval; }, async run($: IGlobalVariable, startDateTime: Date) { const dateTime = DateTime.fromJSDate(startDateTime); - const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(dateTime) as IJSONValue; + const dateTimeObjectRepresentation = getDateTimeObjectRepresentation( + dateTime + ) as IJSONValue; return { data: [dateTimeObjectRepresentation] }; }, async testRun($: IGlobalVariable) { - const nextCronDateTime = getNextCronDateTime(this.getInterval($.db.step.parameters)); - const dateTimeObjectRepresentation = getDateTimeObjectRepresentation(nextCronDateTime) as IJSONValue; + const nextCronDateTime = getNextCronDateTime( + this.getInterval($.step.parameters) + ); + const dateTimeObjectRepresentation = getDateTimeObjectRepresentation( + nextCronDateTime + ) as IJSONValue; return { data: [dateTimeObjectRepresentation] }; }, diff --git a/packages/backend/src/apps/slack/actions/find-message/find-message.ts b/packages/backend/src/apps/slack/actions/find-message/find-message.ts index 57c93041..31206bb6 100644 --- a/packages/backend/src/apps/slack/actions/find-message/find-message.ts +++ b/packages/backend/src/apps/slack/actions/find-message/find-message.ts @@ -1,4 +1,4 @@ -import { IGlobalVariable, IJSONObject } from '@automatisch/types'; +import { IGlobalVariable, IActionOutput } from '@automatisch/types'; type FindMessageOptions = { query: string; @@ -8,11 +8,6 @@ type FindMessageOptions = { }; const findMessage = async ($: IGlobalVariable, options: FindMessageOptions) => { - const message: { - data?: IJSONObject; - error?: IJSONObject; - } = {}; - const headers = { Authorization: `Bearer ${$.auth.data.accessToken}`, }; @@ -29,20 +24,14 @@ const findMessage = async ($: IGlobalVariable, options: FindMessageOptions) => { params, }); - if (response.integrationError) { - message.error = response.integrationError; - return message; - } - const data = response.data; - if (!data.ok) { - message.error = data; - return message; - } - - const messages = data.messages.matches; - message.data = messages?.[0]; + const message: IActionOutput = { + data: { + raw: data?.data?.messages.matches[0], + }, + error: response?.integrationError || (!data.ok && data), + }; return message; }; diff --git a/packages/backend/src/apps/slack/actions/find-message/index.ts b/packages/backend/src/apps/slack/actions/find-message/index.ts index c6132e2c..26d7a333 100644 --- a/packages/backend/src/apps/slack/actions/find-message/index.ts +++ b/packages/backend/src/apps/slack/actions/find-message/index.ts @@ -72,7 +72,7 @@ export default { ], async run($: IGlobalVariable) { - const parameters = $.db.step.parameters; + const parameters = $.step.parameters; const query = parameters.query as string; const sortBy = parameters.sortBy as string; const sortDirection = parameters.sortDirection as string; diff --git a/packages/backend/src/apps/slack/actions/send-a-message-to-channel/index.ts b/packages/backend/src/apps/slack/actions/send-a-message-to-channel/index.ts index d2033c1e..d5806108 100644 --- a/packages/backend/src/apps/slack/actions/send-a-message-to-channel/index.ts +++ b/packages/backend/src/apps/slack/actions/send-a-message-to-channel/index.ts @@ -49,8 +49,8 @@ export default { ], async run($: IGlobalVariable) { - const channelId = $.db.step.parameters.channel as string; - const text = $.db.step.parameters.message as string; + const channelId = $.step.parameters.channel as string; + const text = $.step.parameters.message as string; const message = await postMessage($, channelId, text); diff --git a/packages/backend/src/apps/slack/actions/send-a-message-to-channel/post-message.ts b/packages/backend/src/apps/slack/actions/send-a-message-to-channel/post-message.ts index 8a499767..40d9969e 100644 --- a/packages/backend/src/apps/slack/actions/send-a-message-to-channel/post-message.ts +++ b/packages/backend/src/apps/slack/actions/send-a-message-to-channel/post-message.ts @@ -1,18 +1,10 @@ -import { IGlobalVariable, IJSONObject } from '@automatisch/types'; +import { IGlobalVariable, IActionOutput } from '@automatisch/types'; const postMessage = async ( $: IGlobalVariable, channelId: string, text: string ) => { - const message: { - data: IJSONObject | null | undefined; - error: IJSONObject | null | undefined; - } = { - data: null, - error: null, - }; - const headers = { Authorization: `Bearer ${$.auth.data.accessToken}`, }; @@ -24,8 +16,12 @@ const postMessage = async ( const response = await $.http.post('/chat.postMessage', params, { headers }); - message.error = response?.integrationError; - message.data = response?.data?.message; + const message: IActionOutput = { + data: { + raw: response?.data?.message, + }, + error: response?.integrationError, + }; if (response.data.ok === false) { message.error = response.data; diff --git a/packages/backend/src/apps/twitter/common/get-user-followers.ts b/packages/backend/src/apps/twitter/common/get-user-followers.ts index 0d4e1e64..8f2563fe 100644 --- a/packages/backend/src/apps/twitter/common/get-user-followers.ts +++ b/packages/backend/src/apps/twitter/common/get-user-followers.ts @@ -1,4 +1,8 @@ -import { IGlobalVariable, IJSONObject } from '@automatisch/types'; +import { + IGlobalVariable, + IJSONObject, + ITriggerOutput, +} from '@automatisch/types'; import { URLSearchParams } from 'url'; import { omitBy, isEmpty } from 'lodash'; import generateRequest from './generate-request'; @@ -14,12 +18,8 @@ const getUserFollowers = async ( ) => { let response; - const followers: { - data: IJSONObject[]; - error: IJSONObject | null; - } = { + const followers: ITriggerOutput = { data: [], - error: null, }; do { @@ -54,7 +54,10 @@ const getUserFollowers = async ( !options.lastInternalId || Number(tweet.id) > Number(options.lastInternalId) ) { - followers.data.push(tweet); + followers.data.push({ + raw: tweet, + meta: { internalId: tweet.id as string }, + }); } else { return; } diff --git a/packages/backend/src/apps/twitter/common/get-user-tweets.ts b/packages/backend/src/apps/twitter/common/get-user-tweets.ts index 901d22a3..67cfcade 100644 --- a/packages/backend/src/apps/twitter/common/get-user-tweets.ts +++ b/packages/backend/src/apps/twitter/common/get-user-tweets.ts @@ -1,4 +1,8 @@ -import { IGlobalVariable, IJSONObject } from '@automatisch/types'; +import { + IGlobalVariable, + IJSONObject, + ITriggerOutput, +} from '@automatisch/types'; import { URLSearchParams } from 'url'; import omitBy from 'lodash/omitBy'; import isEmpty from 'lodash/isEmpty'; @@ -22,19 +26,15 @@ const getUserTweets = async ( const currentUser = await getCurrentUser($); username = currentUser.username as string; } else { - username = $.db.step.parameters.username as string; + username = $.step.parameters.username as string; } const user = await getUserByUsername($, username); let response; - const tweets: { - data: IJSONObject[]; - error: IJSONObject | null; - } = { + const tweets: ITriggerOutput = { data: [], - error: null, }; do { @@ -65,7 +65,10 @@ const getUserTweets = async ( !options.lastInternalId || Number(tweet.id) > Number(options.lastInternalId) ) { - tweets.data.push(tweet); + tweets.data.push({ + raw: tweet, + meta: { internalId: tweet.id as string }, + }); } else { return; } diff --git a/packages/backend/src/apps/twitter/triggers/my-tweets/index.ts b/packages/backend/src/apps/twitter/triggers/my-tweets/index.ts index e6cb95a0..591e6a28 100644 --- a/packages/backend/src/apps/twitter/triggers/my-tweets/index.ts +++ b/packages/backend/src/apps/twitter/triggers/my-tweets/index.ts @@ -20,7 +20,7 @@ export default { async run($: IGlobalVariable) { return await getUserTweets($, { currentUser: true, - lastInternalId: $.db.flow.lastInternalId, + lastInternalId: $.flow.lastInternalId, }); }, diff --git a/packages/backend/src/apps/twitter/triggers/new-follower-of-me/index.ts b/packages/backend/src/apps/twitter/triggers/new-follower-of-me/index.ts index c9b4be84..3bc6335d 100644 --- a/packages/backend/src/apps/twitter/triggers/new-follower-of-me/index.ts +++ b/packages/backend/src/apps/twitter/triggers/new-follower-of-me/index.ts @@ -18,7 +18,7 @@ export default { ], async run($: IGlobalVariable) { - return await myFollowers($, $.db.flow.lastInternalId); + return await myFollowers($, $.flow.lastInternalId); }, async testRun($: IGlobalVariable) { diff --git a/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts b/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts index 0f67d908..67f4e5a6 100644 --- a/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts +++ b/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts @@ -32,14 +32,14 @@ export default { async run($: IGlobalVariable) { return await searchTweets($, { - searchTerm: $.db.step.parameters.searchTerm as string, - lastInternalId: $.db.flow.lastInternalId, + searchTerm: $.step.parameters.searchTerm as string, + lastInternalId: $.flow.lastInternalId, }); }, async testRun($: IGlobalVariable) { return await searchTweets($, { - searchTerm: $.db.step.parameters.searchTerm as string, + searchTerm: $.step.parameters.searchTerm as string, }); }, }; diff --git a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts index 8e3886c6..a7559bb3 100644 --- a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts +++ b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts @@ -1,4 +1,8 @@ -import { IGlobalVariable, IJSONObject } from '@automatisch/types'; +import { + IGlobalVariable, + IJSONObject, + ITriggerOutput, +} from '@automatisch/types'; import qs from 'qs'; import generateRequest from '../../common/generate-request'; import { omitBy, isEmpty } from 'lodash'; @@ -14,12 +18,8 @@ const searchTweets = async ( ) => { let response; - const tweets: { - data: IJSONObject[]; - error: IJSONObject | null; - } = { + const tweets: ITriggerOutput = { data: [], - error: null, }; do { @@ -56,7 +56,14 @@ const searchTweets = async ( !options.lastInternalId || Number(tweet.id) > Number(options.lastInternalId) ) { - tweets.data.push(tweet); + const dataItem = { + raw: tweet, + meta: { + internalId: tweet.id as string, + }, + }; + + tweets.data.push(dataItem); } else { return; } diff --git a/packages/backend/src/apps/twitter/triggers/user-tweets/index.ts b/packages/backend/src/apps/twitter/triggers/user-tweets/index.ts index 23517c9e..a116e6fe 100644 --- a/packages/backend/src/apps/twitter/triggers/user-tweets/index.ts +++ b/packages/backend/src/apps/twitter/triggers/user-tweets/index.ts @@ -32,15 +32,15 @@ export default { async run($: IGlobalVariable) { return await getUserTweets($, { currentUser: false, - userId: $.db.step.parameters.username as string, - lastInternalId: $.db.flow.lastInternalId, + userId: $.step.parameters.username as string, + lastInternalId: $.flow.lastInternalId, }); }, async testRun($: IGlobalVariable) { return await getUserTweets($, { currentUser: false, - userId: $.db.step.parameters.username as string, + userId: $.step.parameters.username as string, }); }, }; diff --git a/packages/backend/src/graphql/mutations/create-auth-data.ts b/packages/backend/src/graphql/mutations/create-auth-data.ts index 91ff8d60..fd3e020b 100644 --- a/packages/backend/src/graphql/mutations/create-auth-data.ts +++ b/packages/backend/src/graphql/mutations/create-auth-data.ts @@ -29,7 +29,7 @@ const createAuthData = async ( .default; const app = await App.findOneByKey(connection.key); - const $ = await globalVariable(connection, app); + const $ = await globalVariable({ connection, app }); await authInstance.createAuthData($); try { diff --git a/packages/backend/src/graphql/mutations/verify-connection.ts b/packages/backend/src/graphql/mutations/verify-connection.ts index 081752b9..61f119d3 100644 --- a/packages/backend/src/graphql/mutations/verify-connection.ts +++ b/packages/backend/src/graphql/mutations/verify-connection.ts @@ -21,7 +21,7 @@ const verifyConnection = async ( .throwIfNotFound(); const app = await App.findOneByKey(connection.key); - const $ = await globalVariable(connection, app); + const $ = await globalVariable({ connection, app }); await app.auth.verifyCredentials($); connection = await connection.$query().patchAndFetch({ diff --git a/packages/backend/src/graphql/queries/get-data.ts b/packages/backend/src/graphql/queries/get-data.ts index 5b158869..e8ea2acd 100644 --- a/packages/backend/src/graphql/queries/get-data.ts +++ b/packages/backend/src/graphql/queries/get-data.ts @@ -25,7 +25,7 @@ const getData = async (_parent: unknown, params: Params, context: Context) => { if (!connection || !step.appKey) return null; const app = await App.findOneByKey(step.appKey); - const $ = await globalVariable(connection, app, step.flow, step); + const $ = await globalVariable({ connection, app, flow: step.flow, step }); const command = app.data.find((data: IData) => data.key === params.key); diff --git a/packages/backend/src/graphql/queries/test-connection.ts b/packages/backend/src/graphql/queries/test-connection.ts index 5aa90fd1..1fa4d036 100644 --- a/packages/backend/src/graphql/queries/test-connection.ts +++ b/packages/backend/src/graphql/queries/test-connection.ts @@ -20,10 +20,9 @@ const testConnection = async ( .throwIfNotFound(); const app = await App.findOneByKey(connection.key, false); - const $ = await globalVariable(connection, app); + const $ = await globalVariable({ connection, app }); - const isStillVerified = - await app.auth.isStillVerified($); + const isStillVerified = await app.auth.isStillVerified($); connection = await connection.$query().patchAndFetch({ formattedData: connection.formattedData, diff --git a/packages/backend/src/helpers/global-variable.ts b/packages/backend/src/helpers/global-variable.ts index 30a8f6be..7077d914 100644 --- a/packages/backend/src/helpers/global-variable.ts +++ b/packages/backend/src/helpers/global-variable.ts @@ -4,12 +4,18 @@ import Flow from '../models/flow'; import Step from '../models/step'; import { IJSONObject, IApp, IGlobalVariable } from '@automatisch/types'; +type GlobalVariableOptions = { + connection?: Connection; + app: IApp; + flow?: Flow; + step?: Step; +}; + const globalVariable = async ( - connection: Connection, - appData: IApp, - flow?: Flow, - currentStep?: Step + options: GlobalVariableOptions ): Promise => { + const { connection, app, flow, step } = options; + const lastInternalId = await flow?.lastInternalId(); return { @@ -28,15 +34,13 @@ const globalVariable = async ( }, data: connection?.formattedData, }, - app: appData, - http: createHttpClient({ baseURL: appData.baseUrl }), - db: { - flow: { - lastInternalId, - }, - step: { - parameters: currentStep?.parameters || {}, - }, + app: app, + http: createHttpClient({ baseURL: app.baseUrl }), + flow: { + lastInternalId, + }, + step: { + parameters: step?.parameters || {}, }, }; }; diff --git a/packages/backend/src/services/processor.ts b/packages/backend/src/services/processor.ts index 896cbf38..92bf2805 100644 --- a/packages/backend/src/services/processor.ts +++ b/packages/backend/src/services/processor.ts @@ -1,5 +1,5 @@ import get from 'lodash.get'; -import { IJSONObject } from '@automatisch/types'; +import { IActionOutput } from '@automatisch/types'; import App from '../models/app'; import Flow from '../models/flow'; @@ -61,8 +61,8 @@ class Processor { if (initialTriggerData.data.length > 1) { initialTriggerData.data = initialTriggerData.data.sort( - (item: IJSONObject, nextItem: IJSONObject) => { - return (item.id as number) - (nextItem.id as number); + (item, nextItem) => { + return (item.raw.id as number) - (nextItem.raw.id as number); } ); } @@ -73,7 +73,7 @@ class Processor { const execution = await Execution.query().insert({ flowId: this.flow.id, testRun: this.testRun, - internalId: data.id as string, + internalId: data.meta.internalId as string, }); executions.push(execution); @@ -81,12 +81,8 @@ class Processor { let previousExecutionStep: ExecutionStep; const priorExecutionSteps: ExecutionSteps = {}; - let fetchedActionData: { - data: IJSONObject | null; - error: IJSONObject | null; - } = { + let fetchedActionData: IActionOutput = { data: null, - error: null, }; for await (const step of steps) { @@ -105,12 +101,12 @@ class Processor { const clonedStep = Object.assign({}, step); clonedStep.parameters = computedParameters; - const $ = await globalVariable( - step.connection, + const $ = await globalVariable({ + connection: step.connection, app, - this.flow, - clonedStep - ); + flow: this.flow, + step: clonedStep, + }); if (!isTrigger && key) { const command = app.actions.find((action) => action.key === key); @@ -135,7 +131,7 @@ class Processor { stepId: id, status: 'success', dataIn: isTrigger ? rawParameters : computedParameters, - dataOut: isTrigger ? data : fetchedActionData.data, + dataOut: isTrigger ? data.raw : fetchedActionData.data.raw, }); priorExecutionSteps[id] = previousExecutionStep; @@ -176,12 +172,12 @@ class Processor { if (!step.appKey || !step.key) return null; const app = await App.findOneByKey(step.appKey); - const $ = await globalVariable( - step.connection, + const $ = await globalVariable({ + connection: step.connection, app, - this.flow, + flow: this.flow, step, - ) + }); const command = app.triggers.find((trigger) => trigger.key === step.key); diff --git a/packages/types/index.d.ts b/packages/types/index.d.ts index 43898860..8925fa8d 100644 --- a/packages/types/index.d.ts +++ b/packages/types/index.d.ts @@ -172,9 +172,9 @@ export interface IData { } export interface IAuth { - createAuthData($: IGlobalVariable): Promise, - verifyCredentials($: IGlobalVariable): Promise, - isStillVerified($: IGlobalVariable): Promise, + createAuthData($: IGlobalVariable): Promise; + verifyCredentials($: IGlobalVariable): Promise; + isStillVerified($: IGlobalVariable): Promise; fields: IField[]; authenticationSteps: IAuthenticationStep[]; reconnectionSteps: IAuthenticationStep[]; @@ -187,23 +187,46 @@ export interface IService { data?: any; } +export interface ITriggerOutput { + data: ITriggerDataItem[]; + error?: IJSONObject; +} + +export interface ITriggerDataItem { + raw: IJSONObject; + meta: { + internalId: string; + }; +} + export interface ITrigger { name: string; - key: string, + key: string; pollInterval: number; description: string; substeps: ISubstep[]; - getInterval(parameters: IGlobalVariable["db"]["step"]["parameters"]): string; - run($: IGlobalVariable): Promise<{ data: IJSONObject[], error: IJSONObject | null }>; - testRun($: IGlobalVariable, startTime?: Date): Promise<{ data: IJSONObject[], error: IJSONObject | null }>; + getInterval(parameters: IGlobalVariable['step']['parameters']): string; + run($: IGlobalVariable): Promise; + testRun($: IGlobalVariable): Promise; +} + +export interface IActionOutput { + data: IActionDataItem; + error?: IJSONObject; +} + +export interface IActionDataItem { + raw: { + data?: IJSONObject; + }; } export interface IAction { name: string; - key: string, + key: string; description: string; substeps: ISubstep[]; - run($: IGlobalVariable): Promise<{ data: IJSONObject, error: IJSONObject | null }>; + run($: IGlobalVariable): Promise; } export interface IAuthentication { @@ -229,13 +252,11 @@ export type IGlobalVariable = { }; app: IApp; http: IHttpClient; - db: { - flow: { - lastInternalId: string; - }; - step: { - parameters: IJSONObject; - } + flow: { + lastInternalId: string; + }; + step: { + parameters: IJSONObject; }; }; From 3c3bb82e97976ed5e297cc68334a7b060e765eff Mon Sep 17 00:00:00 2001 From: Faruk AYDIN Date: Wed, 12 Oct 2022 23:16:25 +0200 Subject: [PATCH 2/5] refactor: Remove early exit strategy for twitter triggers --- .../apps/twitter/common/get-user-followers.ts | 21 +++++++--------- .../apps/twitter/common/get-user-tweets.ts | 19 ++++++-------- .../triggers/search-tweets/search-tweets.ts | 25 ++++++++----------- packages/backend/src/services/processor.ts | 8 ------ 4 files changed, 28 insertions(+), 45 deletions(-) diff --git a/packages/backend/src/apps/twitter/common/get-user-followers.ts b/packages/backend/src/apps/twitter/common/get-user-followers.ts index 8f2563fe..1874f61a 100644 --- a/packages/backend/src/apps/twitter/common/get-user-followers.ts +++ b/packages/backend/src/apps/twitter/common/get-user-followers.ts @@ -49,22 +49,19 @@ const getUserFollowers = async ( } if (response.data.meta.result_count > 0) { - response.data.data.forEach((tweet: IJSONObject) => { - if ( - !options.lastInternalId || - Number(tweet.id) > Number(options.lastInternalId) - ) { - followers.data.push({ - raw: tweet, - meta: { internalId: tweet.id as string }, - }); - } else { - return; - } + response.data.data.forEach((follower: IJSONObject) => { + followers.data.push({ + raw: follower, + meta: { internalId: follower.id as string }, + }); }); } } while (response.data.meta.next_token && options.lastInternalId); + followers.data.sort((follower, nextFollower) => { + return (follower.raw.id as number) - (nextFollower.raw.id as number); + }); + return followers; }; diff --git a/packages/backend/src/apps/twitter/common/get-user-tweets.ts b/packages/backend/src/apps/twitter/common/get-user-tweets.ts index 67cfcade..4cce058f 100644 --- a/packages/backend/src/apps/twitter/common/get-user-tweets.ts +++ b/packages/backend/src/apps/twitter/common/get-user-tweets.ts @@ -61,21 +61,18 @@ const getUserTweets = async ( if (response.data.meta.result_count > 0) { response.data.data.forEach((tweet: IJSONObject) => { - if ( - !options.lastInternalId || - Number(tweet.id) > Number(options.lastInternalId) - ) { - tweets.data.push({ - raw: tweet, - meta: { internalId: tweet.id as string }, - }); - } else { - return; - } + tweets.data.push({ + raw: tweet, + meta: { internalId: tweet.id as string }, + }); }); } } while (response.data.meta.next_token && options.lastInternalId); + tweets.data.sort((tweet, nextTweet) => { + return (tweet.raw.id as number) - (nextTweet.raw.id as number); + }); + return tweets; }; diff --git a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts index a7559bb3..1c8125d7 100644 --- a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts +++ b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts @@ -52,25 +52,22 @@ const searchTweets = async ( if (response.data.meta.result_count > 0) { response.data.data.forEach((tweet: IJSONObject) => { - if ( - !options.lastInternalId || - Number(tweet.id) > Number(options.lastInternalId) - ) { - const dataItem = { - raw: tweet, - meta: { - internalId: tweet.id as string, - }, - }; + const dataItem = { + raw: tweet, + meta: { + internalId: tweet.id as string, + }, + }; - tweets.data.push(dataItem); - } else { - return; - } + tweets.data.push(dataItem); }); } } while (response.data.meta.next_token && options.lastInternalId); + tweets.data.sort((tweet, nextTweet) => { + return (tweet.raw.id as number) - (nextTweet.raw.id as number); + }); + return tweets; }; diff --git a/packages/backend/src/services/processor.ts b/packages/backend/src/services/processor.ts index 92bf2805..0656cceb 100644 --- a/packages/backend/src/services/processor.ts +++ b/packages/backend/src/services/processor.ts @@ -59,14 +59,6 @@ class Processor { initialTriggerData.data = [initialTriggerData.data[0]]; } - if (initialTriggerData.data.length > 1) { - initialTriggerData.data = initialTriggerData.data.sort( - (item, nextItem) => { - return (item.raw.id as number) - (nextItem.raw.id as number); - } - ); - } - const executions: Execution[] = []; for await (const data of initialTriggerData.data) { From 56a9aeece78b527fdda201f38f64fd8f3beacd1a Mon Sep 17 00:00:00 2001 From: Faruk AYDIN Date: Thu, 13 Oct 2022 18:45:01 +0200 Subject: [PATCH 3/5] refactor: Extract processor job into separate background jobs --- .../twitter/triggers/search-tweets/index.ts | 1 - .../triggers/search-tweets/search-tweets.ts | 4 +- .../src/graphql/mutations/execute-flow.ts | 46 +++-- .../graphql/mutations/update-flow-status.ts | 12 +- .../src/helpers/create-bull-board-handler.ts | 10 +- .../backend/src/helpers/global-variable.ts | 53 +++++- packages/backend/src/models/flow.ts | 10 ++ packages/backend/src/models/step.ts | 25 ++- packages/backend/src/queues/action.ts | 25 +++ .../src/queues/{processor.ts => flow.ts} | 8 +- packages/backend/src/queues/trigger.ts | 25 +++ packages/backend/src/services/processor.ts | 162 +----------------- packages/backend/src/worker.ts | 4 +- packages/backend/src/workers/action.ts | 99 +++++++++++ .../src/workers/{processor.ts => flow.ts} | 21 ++- packages/backend/src/workers/trigger.ts | 64 +++++++ packages/types/index.d.ts | 18 +- 17 files changed, 374 insertions(+), 213 deletions(-) create mode 100644 packages/backend/src/queues/action.ts rename packages/backend/src/queues/{processor.ts => flow.ts} (70%) create mode 100644 packages/backend/src/queues/trigger.ts create mode 100644 packages/backend/src/workers/action.ts rename packages/backend/src/workers/{processor.ts => flow.ts} (57%) create mode 100644 packages/backend/src/workers/trigger.ts diff --git a/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts b/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts index 67f4e5a6..2eb450aa 100644 --- a/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts +++ b/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts @@ -33,7 +33,6 @@ export default { async run($: IGlobalVariable) { return await searchTweets($, { searchTerm: $.step.parameters.searchTerm as string, - lastInternalId: $.flow.lastInternalId, }); }, diff --git a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts index 1c8125d7..d3fc6857 100644 --- a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts +++ b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts @@ -68,7 +68,9 @@ const searchTweets = async ( return (tweet.raw.id as number) - (nextTweet.raw.id as number); }); - return tweets; + for (const tweet of tweets.data) { + await $.process(tweet); + } }; export default searchTweets; diff --git a/packages/backend/src/graphql/mutations/execute-flow.ts b/packages/backend/src/graphql/mutations/execute-flow.ts index 063259af..39756abf 100644 --- a/packages/backend/src/graphql/mutations/execute-flow.ts +++ b/packages/backend/src/graphql/mutations/execute-flow.ts @@ -1,7 +1,6 @@ import Context from '../../types/express/context'; -import Processor from '../../services/processor'; // eslint-disable-next-line @typescript-eslint/no-unused-vars -import processorQueue from '../../queues/processor'; +import flowQueue from '../../queues/flow'; type Params = { input: { @@ -14,30 +13,25 @@ const executeFlow = async ( params: Params, context: Context ) => { - const untilStep = await context.currentUser - .$relatedQuery('steps') - .withGraphFetched('connection') - .findOne({ - 'steps.id': params.input.stepId, - }) - .throwIfNotFound(); - - const flow = await untilStep.$relatedQuery('flow'); - - const executionStep = await new Processor(flow, { - untilStep, - testRun: true, - }).run(); - - await untilStep.$query().patch({ - status: 'completed', - }); - - if (executionStep.errorDetails) { - throw new Error(JSON.stringify(executionStep.errorDetails)); - } - - return { data: executionStep.dataOut, step: untilStep }; + // const untilStep = await context.currentUser + // .$relatedQuery('steps') + // .withGraphFetched('connection') + // .findOne({ + // 'steps.id': params.input.stepId, + // }) + // .throwIfNotFound(); + // const flow = await untilStep.$relatedQuery('flow'); + // const executionStep = await new Processor(flow, { + // untilStep, + // testRun: true, + // }).run(); + // await untilStep.$query().patch({ + // status: 'completed', + // }); + // if (executionStep.errorDetails) { + // throw new Error(JSON.stringify(executionStep.errorDetails)); + // } + // return { data: executionStep.dataOut, step: untilStep }; }; export default executeFlow; diff --git a/packages/backend/src/graphql/mutations/update-flow-status.ts b/packages/backend/src/graphql/mutations/update-flow-status.ts index 97e6a69e..711a4029 100644 --- a/packages/backend/src/graphql/mutations/update-flow-status.ts +++ b/packages/backend/src/graphql/mutations/update-flow-status.ts @@ -1,5 +1,5 @@ import Context from '../../types/express/context'; -import processorQueue from '../../queues/processor'; +import flowQueue from '../../queues/flow'; type Params = { input: { @@ -9,7 +9,7 @@ type Params = { }; const JOB_NAME = 'processorJob'; -const EVERY_15_MINUTES_CRON = '*/15 * * * *'; +const EVERY_15_MINUTES_CRON = '*/1 * * * *'; const updateFlowStatus = async ( _parent: unknown, @@ -32,7 +32,7 @@ const updateFlowStatus = async ( }); const triggerStep = await flow.getTriggerStep(); - const trigger = await triggerStep.getTrigger(); + const trigger = await triggerStep.getTriggerCommand(); const interval = trigger.getInterval?.(triggerStep.parameters); const repeatOptions = { cron: interval || EVERY_15_MINUTES_CRON, @@ -43,7 +43,7 @@ const updateFlowStatus = async ( published_at: new Date().toISOString(), }); - await processorQueue.add( + await flowQueue.add( JOB_NAME, { flowId: flow.id }, { @@ -52,10 +52,10 @@ const updateFlowStatus = async ( } ); } else { - const repeatableJobs = await processorQueue.getRepeatableJobs(); + const repeatableJobs = await flowQueue.getRepeatableJobs(); const job = repeatableJobs.find((job) => job.id === flow.id); - await processorQueue.removeRepeatableByKey(job.key); + await flowQueue.removeRepeatableByKey(job.key); } return flow; diff --git a/packages/backend/src/helpers/create-bull-board-handler.ts b/packages/backend/src/helpers/create-bull-board-handler.ts index 7525aaa9..c74199e7 100644 --- a/packages/backend/src/helpers/create-bull-board-handler.ts +++ b/packages/backend/src/helpers/create-bull-board-handler.ts @@ -1,13 +1,19 @@ import { ExpressAdapter } from '@bull-board/express'; import { createBullBoard } from '@bull-board/api'; import { BullMQAdapter } from '@bull-board/api/bullMQAdapter'; -import processorQueue from '../queues/processor'; +import flowQueue from '../queues/flow'; +import triggerQueue from '../queues/trigger'; +import actionQueue from '../queues/action'; const serverAdapter = new ExpressAdapter(); const createBullBoardHandler = async (serverAdapter: ExpressAdapter) => { createBullBoard({ - queues: [new BullMQAdapter(processorQueue)], + queues: [ + new BullMQAdapter(flowQueue), + new BullMQAdapter(triggerQueue), + new BullMQAdapter(actionQueue), + ], serverAdapter: serverAdapter, }); }; diff --git a/packages/backend/src/helpers/global-variable.ts b/packages/backend/src/helpers/global-variable.ts index 7077d914..37783f8c 100644 --- a/packages/backend/src/helpers/global-variable.ts +++ b/packages/backend/src/helpers/global-variable.ts @@ -2,23 +2,37 @@ import createHttpClient from './http-client'; import Connection from '../models/connection'; import Flow from '../models/flow'; import Step from '../models/step'; -import { IJSONObject, IApp, IGlobalVariable } from '@automatisch/types'; +import Execution from '../models/execution'; +import { + IJSONObject, + IApp, + IGlobalVariable, + ITriggerDataItem, +} from '@automatisch/types'; +import triggerQueue from '../queues/trigger'; type GlobalVariableOptions = { connection?: Connection; app: IApp; flow?: Flow; step?: Step; + execution?: Execution; }; const globalVariable = async ( options: GlobalVariableOptions ): Promise => { - const { connection, app, flow, step } = options; + const { connection, app, flow, step, execution } = options; const lastInternalId = await flow?.lastInternalId(); - return { + const trigger = await step?.getTriggerCommand(); + const nextStep = await flow + ?.$relatedQuery('steps') + .where({ position: step.position + 1 }) + .first(); + + const variable: IGlobalVariable = { auth: { set: async (args: IJSONObject) => { if (connection) { @@ -37,12 +51,45 @@ const globalVariable = async ( app: app, http: createHttpClient({ baseURL: app.baseUrl }), flow: { + id: flow?.id, lastInternalId, }, step: { + id: step?.id, + appKey: step?.appKey, parameters: step?.parameters || {}, }, + nextStep: { + id: nextStep?.id, + appKey: nextStep?.appKey, + parameters: nextStep?.parameters || {}, + }, + execution: { + id: execution?.id, + }, }; + + variable.process = async (triggerDataItem: ITriggerDataItem) => { + const jobName = `${step.appKey}-${triggerDataItem.meta.internalId}`; + const jobPayload = { + $: variable, + triggerDataItem, + }; + + await triggerQueue.add(jobName, jobPayload); + }; + + if (trigger && trigger.dedupeStrategy === 'unique') { + const lastInternalIds = await flow?.lastInternalIds(); + + const isAlreadyProcessed = (internalId: string) => { + return lastInternalIds?.includes(internalId); + }; + + variable.flow.isAlreadyProcessed = isAlreadyProcessed; + } + + return variable; }; export default globalVariable; diff --git a/packages/backend/src/models/flow.ts b/packages/backend/src/models/flow.ts index db536431..fea848e1 100644 --- a/packages/backend/src/models/flow.ts +++ b/packages/backend/src/models/flow.ts @@ -12,6 +12,7 @@ class Flow extends Base { active: boolean; steps: Step[]; published_at: string; + executions?: Execution[]; static tableName = 'flows'; @@ -57,6 +58,15 @@ class Flow extends Base { return lastExecution ? (lastExecution as Execution).internalId : null; } + async lastInternalIds(itemCount = 50) { + const lastExecutions = await this.$relatedQuery('executions') + .select('internal_id') + .orderBy('created_at', 'desc') + .limit(itemCount); + + return lastExecutions.map((execution) => execution.internalId); + } + async $beforeUpdate( opt: ModelOptions, queryContext: QueryContext diff --git a/packages/backend/src/models/step.ts b/packages/backend/src/models/step.ts index b69f3d20..78c0d31d 100644 --- a/packages/backend/src/models/step.ts +++ b/packages/backend/src/models/step.ts @@ -92,16 +92,35 @@ class Step extends Base { return this.type === 'trigger'; } - async getTrigger() { - if (!this.isTrigger) return null; + get isAction(): boolean { + return this.type === 'action'; + } - const { appKey, key } = this; + async getApp() { + if (!this.appKey) return null; + + return await App.findOneByKey(this.appKey); + } + + async getTriggerCommand() { + const { appKey, key, isTrigger } = this; + if (!isTrigger || !appKey || !key) return null; const app = await App.findOneByKey(appKey); const command = app.triggers.find((trigger) => trigger.key === key); return command; } + + async getActionCommand() { + const { appKey, key, isAction } = this; + if (!isAction || !appKey || !key) return null; + + const app = await App.findOneByKey(appKey); + const command = app.actions.find((action) => action.key === key); + + return command; + } } export default Step; diff --git a/packages/backend/src/queues/action.ts b/packages/backend/src/queues/action.ts new file mode 100644 index 00000000..44d8b023 --- /dev/null +++ b/packages/backend/src/queues/action.ts @@ -0,0 +1,25 @@ +import process from 'process'; +import { Queue } from 'bullmq'; +import redisConfig from '../config/redis'; +import logger from '../helpers/logger'; + +const CONNECTION_REFUSED = 'ECONNREFUSED'; + +const redisConnection = { + connection: redisConfig, +}; + +const actionQueue = new Queue('action', redisConnection); + +process.on('SIGTERM', async () => { + await actionQueue.close(); +}); + +actionQueue.on('error', (err) => { + if ((err as any).code === CONNECTION_REFUSED) { + logger.error('Make sure you have installed Redis and it is running.', err); + process.exit(); + } +}); + +export default actionQueue; diff --git a/packages/backend/src/queues/processor.ts b/packages/backend/src/queues/flow.ts similarity index 70% rename from packages/backend/src/queues/processor.ts rename to packages/backend/src/queues/flow.ts index b9b13c37..9353d72e 100644 --- a/packages/backend/src/queues/processor.ts +++ b/packages/backend/src/queues/flow.ts @@ -9,18 +9,18 @@ const redisConnection = { connection: redisConfig, }; -const processorQueue = new Queue('processor', redisConnection); -const queueScheduler = new QueueScheduler('processor', redisConnection); +const flowQueue = new Queue('flow', redisConnection); +const queueScheduler = new QueueScheduler('flow', redisConnection); process.on('SIGTERM', async () => { await queueScheduler.close(); }); -processorQueue.on('error', (err) => { +flowQueue.on('error', (err) => { if ((err as any).code === CONNECTION_REFUSED) { logger.error('Make sure you have installed Redis and it is running.', err); process.exit(); } }); -export default processorQueue; +export default flowQueue; diff --git a/packages/backend/src/queues/trigger.ts b/packages/backend/src/queues/trigger.ts new file mode 100644 index 00000000..4535deb1 --- /dev/null +++ b/packages/backend/src/queues/trigger.ts @@ -0,0 +1,25 @@ +import process from 'process'; +import { Queue } from 'bullmq'; +import redisConfig from '../config/redis'; +import logger from '../helpers/logger'; + +const CONNECTION_REFUSED = 'ECONNREFUSED'; + +const redisConnection = { + connection: redisConfig, +}; + +const triggerQueue = new Queue('trigger', redisConnection); + +process.on('SIGTERM', async () => { + await triggerQueue.close(); +}); + +triggerQueue.on('error', (err) => { + if ((err as any).code === CONNECTION_REFUSED) { + logger.error('Make sure you have installed Redis and it is running.', err); + process.exit(); + } +}); + +export default triggerQueue; diff --git a/packages/backend/src/services/processor.ts b/packages/backend/src/services/processor.ts index 0656cceb..0942cc78 100644 --- a/packages/backend/src/services/processor.ts +++ b/packages/backend/src/services/processor.ts @@ -28,165 +28,9 @@ class Processor { this.testRun = processorOptions.testRun; } - async run() { - const steps = await this.flow - .$relatedQuery('steps') - .withGraphFetched('connection') - .orderBy('position', 'asc'); - - const triggerStep = steps.find((step) => step.type === 'trigger'); - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - const initialTriggerData = await this.getInitialTriggerData(triggerStep!); - - if (!initialTriggerData.error && initialTriggerData.data.length === 0) { - const lastInternalId = await this.flow.lastInternalId(); - - const executionData: Partial = { - flowId: this.flow.id, - testRun: this.testRun, - }; - - if (lastInternalId) { - executionData.internalId = lastInternalId; - } - - await Execution.query().insert(executionData); - - return; - } - - if (this.testRun && initialTriggerData.data.length > 0) { - initialTriggerData.data = [initialTriggerData.data[0]]; - } - - const executions: Execution[] = []; - - for await (const data of initialTriggerData.data) { - const execution = await Execution.query().insert({ - flowId: this.flow.id, - testRun: this.testRun, - internalId: data.meta.internalId as string, - }); - - executions.push(execution); - - let previousExecutionStep: ExecutionStep; - const priorExecutionSteps: ExecutionSteps = {}; - - let fetchedActionData: IActionOutput = { - data: null, - }; - - for await (const step of steps) { - if (!step.appKey) continue; - - const { appKey, key, type, parameters: rawParameters = {}, id } = step; - - const isTrigger = type === 'trigger'; - const app = await App.findOneByKey(appKey); - - const computedParameters = Processor.computeParameters( - rawParameters, - priorExecutionSteps - ); - - const clonedStep = Object.assign({}, step); - clonedStep.parameters = computedParameters; - - const $ = await globalVariable({ - connection: step.connection, - app, - flow: this.flow, - step: clonedStep, - }); - - if (!isTrigger && key) { - const command = app.actions.find((action) => action.key === key); - fetchedActionData = await command.run($); - } - - if (!isTrigger && fetchedActionData.error) { - await execution.$relatedQuery('executionSteps').insertAndFetch({ - stepId: id, - status: 'failure', - dataIn: null, - dataOut: computedParameters, - errorDetails: fetchedActionData.error, - }); - - break; - } - - previousExecutionStep = await execution - .$relatedQuery('executionSteps') - .insertAndFetch({ - stepId: id, - status: 'success', - dataIn: isTrigger ? rawParameters : computedParameters, - dataOut: isTrigger ? data.raw : fetchedActionData.data.raw, - }); - - priorExecutionSteps[id] = previousExecutionStep; - - if (id === this.untilStep?.id) { - break; - } - } - } - - if (initialTriggerData.error) { - const executionWithError = await Execution.query().insert({ - flowId: this.flow.id, - testRun: this.testRun, - }); - - executions.push(executionWithError); - - await executionWithError.$relatedQuery('executionSteps').insertAndFetch({ - stepId: triggerStep.id, - status: 'failure', - dataIn: triggerStep.parameters, - errorDetails: initialTriggerData.error, - }); - } - - if (!this.testRun) return; - - const lastExecutionStepFromFirstExecution = await executions[0] - .$relatedQuery('executionSteps') - .orderBy('created_at', 'desc') - .first(); - - return lastExecutionStepFromFirstExecution; - } - - async getInitialTriggerData(step: Step) { - if (!step.appKey || !step.key) return null; - - const app = await App.findOneByKey(step.appKey); - const $ = await globalVariable({ - connection: step.connection, - app, - flow: this.flow, - step, - }); - - const command = app.triggers.find((trigger) => trigger.key === step.key); - - let fetchedData; - - if (this.testRun) { - fetchedData = await command.testRun($); - } else { - fetchedData = await command.run($); - } - - return fetchedData; - } - static computeParameters( parameters: Step['parameters'], - executionSteps: ExecutionSteps + executionSteps: ExecutionStep[] ): Step['parameters'] { const entries = Object.entries(parameters); return entries.reduce((result, [key, value]: [string, unknown]) => { @@ -203,7 +47,9 @@ class Processor { ) as string; const [stepId, ...keyPaths] = stepIdAndKeyPath.split('.'); const keyPath = keyPaths.join('.'); - const executionStep = executionSteps[stepId.toString() as string]; + const executionStep = executionSteps.find((executionStep) => { + return executionStep.stepId === stepId; + }); const data = executionStep?.dataOut; const dataValue = get(data, keyPath); return dataValue; diff --git a/packages/backend/src/worker.ts b/packages/backend/src/worker.ts index 242562f0..a69b069b 100644 --- a/packages/backend/src/worker.ts +++ b/packages/backend/src/worker.ts @@ -1,5 +1,7 @@ import './config/orm'; -export { worker } from './workers/processor'; +import './workers/flow'; +import './workers/trigger'; +import './workers/action'; import telemetry from './helpers/telemetry'; telemetry.setServiceType('worker'); diff --git a/packages/backend/src/workers/action.ts b/packages/backend/src/workers/action.ts new file mode 100644 index 00000000..f1022e1a --- /dev/null +++ b/packages/backend/src/workers/action.ts @@ -0,0 +1,99 @@ +import { Worker } from 'bullmq'; +import redisConfig from '../config/redis'; +import Flow from '../models/flow'; +import logger from '../helpers/logger'; +import globalVariable from '../helpers/global-variable'; +import { IGlobalVariable } from '@automatisch/types'; +import Execution from '../models/execution'; +import Processor from '../services/processor'; +import ExecutionStep from '../models/execution-step'; +import Step from '../models/step'; +import actionQueue from '../queues/action'; + +type JobData = { + flowId: string; + executionId: string; + stepId: string; +}; + +export const worker = new Worker( + 'action', + async (job) => { + const { flowId, stepId, executionId } = job.data as JobData; + + const step = await Step.query().findById(stepId).throwIfNotFound(); + const execution = await Execution.query() + .findById(executionId) + .throwIfNotFound(); + + const $ = await globalVariable({ + flow: await Flow.query().findById(flowId).throwIfNotFound(), + app: await step.getApp(), + step: step, + connection: await step.$relatedQuery('connection'), + execution: execution, + }); + + const priorExecutionSteps = await ExecutionStep.query().where({ + execution_id: $.execution.id, + }); + + const computedParameters = Processor.computeParameters( + $.step.parameters, + priorExecutionSteps + ); + + const actionCommand = await step.getActionCommand(); + + $.step.parameters = computedParameters; + const actionDataItem = await actionCommand.run($); + + await execution.$relatedQuery('executionSteps').insertAndFetch({ + stepId: $.step.id, + status: 'success', + dataIn: computedParameters, + dataOut: actionDataItem.data.raw, + }); + + // TODO: Add until step id logic here! + // TODO: Change job name for the action data item! + const jobName = `${$.step.appKey}-sample`; + + if (!$.nextStep.id) return; + + const nextStep = await Step.query() + .findById($.nextStep.id) + .throwIfNotFound(); + + console.log('hello world'); + + const variable = await globalVariable({ + flow: await Flow.query().findById($.flow.id), + app: await nextStep.getApp(), + step: nextStep, + connection: await nextStep.$relatedQuery('connection'), + execution: execution, + }); + + const jobPayload = { + $: variable, + }; + + await actionQueue.add(jobName, jobPayload); + }, + { connection: redisConfig } +); + +worker.on('completed', (job) => { + logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); +}); + +worker.on('failed', (job, err) => { + logger.info( + `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed22 to start with ${err.message}` + ); +}); + +process.on('SIGTERM', async () => { + await worker.close(); +}); diff --git a/packages/backend/src/workers/processor.ts b/packages/backend/src/workers/flow.ts similarity index 57% rename from packages/backend/src/workers/processor.ts rename to packages/backend/src/workers/flow.ts index 17c24e52..cd315fa3 100644 --- a/packages/backend/src/workers/processor.ts +++ b/packages/backend/src/workers/flow.ts @@ -1,27 +1,36 @@ import { Worker } from 'bullmq'; -import Processor from '../services/processor'; import redisConfig from '../config/redis'; import Flow from '../models/flow'; import logger from '../helpers/logger'; +import globalVariable from '../helpers/global-variable'; export const worker = new Worker( - 'processor', + 'flow', async (job) => { const flow = await Flow.query().findById(job.data.flowId).throwIfNotFound(); - const data = await new Processor(flow, { testRun: false }).run(); - return data; + const triggerStep = await flow.getTriggerStep(); + const triggerCommand = await triggerStep.getTriggerCommand(); + + const $ = await globalVariable({ + flow, + connection: await triggerStep.$relatedQuery('connection'), + app: await triggerStep.getApp(), + step: triggerStep, + }); + + await triggerCommand.run($); }, { connection: redisConfig } ); worker.on('completed', (job) => { - logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has completed!`); + logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); }); worker.on('failed', (job, err) => { logger.info( - `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed with ${err.message}` + `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}` ); }); diff --git a/packages/backend/src/workers/trigger.ts b/packages/backend/src/workers/trigger.ts new file mode 100644 index 00000000..29d07fde --- /dev/null +++ b/packages/backend/src/workers/trigger.ts @@ -0,0 +1,64 @@ +import { Worker } from 'bullmq'; +import redisConfig from '../config/redis'; +import Flow from '../models/flow'; +import logger from '../helpers/logger'; +import globalVariable from '../helpers/global-variable'; +import { ITriggerDataItem, IGlobalVariable } from '@automatisch/types'; +import Execution from '../models/execution'; +import actionQueue from '../queues/action'; +import Step from '../models/step'; + +type JobData = { + $: IGlobalVariable; + triggerDataItem: ITriggerDataItem; +}; + +export const worker = new Worker( + 'trigger', + async (job) => { + const { $, triggerDataItem } = job.data as JobData; + + // check if we already process this trigger data item or not! + + const execution = await Execution.query().insert({ + flowId: $.flow.id, + // TODO: Check the testRun logic and adjust following line! + testRun: true, + internalId: triggerDataItem.meta.internalId, + }); + + await execution.$relatedQuery('executionSteps').insertAndFetch({ + stepId: $.step.id, + status: 'success', + dataIn: $.step.parameters, + dataOut: triggerDataItem.raw, + }); + + const jobName = `${$.step.appKey}-${triggerDataItem.meta.internalId}`; + + const nextStep = await Step.query().findById($.nextStep.id); + + const jobPayload = { + flowId: $.flow.id, + executionId: execution.id, + stepId: nextStep.id, + }; + + await actionQueue.add(jobName, jobPayload); + }, + { connection: redisConfig } +); + +worker.on('completed', (job) => { + logger.info(`JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has started!`); +}); + +worker.on('failed', (job, err) => { + logger.info( + `JOB ID: ${job.id} - FLOW ID: ${job.data.flowId} has failed to start with ${err.message}` + ); +}); + +process.on('SIGTERM', async () => { + await worker.close(); +}); diff --git a/packages/types/index.d.ts b/packages/types/index.d.ts index 8925fa8d..c63a751f 100644 --- a/packages/types/index.d.ts +++ b/packages/types/index.d.ts @@ -204,6 +204,7 @@ export interface ITrigger { key: string; pollInterval: number; description: string; + dedupeStrategy: 'greatest' | 'unique' | 'last'; substeps: ISubstep[]; getInterval(parameters: IGlobalVariable['step']['parameters']): string; run($: IGlobalVariable): Promise; @@ -252,12 +253,25 @@ export type IGlobalVariable = { }; app: IApp; http: IHttpClient; - flow: { + flow?: { + id: string; lastInternalId: string; + isAlreadyProcessed?: (internalId: string) => boolean; }; - step: { + step?: { + id: string; + appKey: string; parameters: IJSONObject; }; + nextStep?: { + id: string; + appKey: string; + parameters: IJSONObject; + }; + execution?: { + id: string; + } + process?: (triggerDataItem: ITriggerDataItem) => Promise; }; declare module 'axios' { From 628f8721809b57ab16217f197e4a052075198319 Mon Sep 17 00:00:00 2001 From: Faruk AYDIN Date: Fri, 14 Oct 2022 20:18:58 +0200 Subject: [PATCH 4/5] refactor: Implement test run helper to work with services --- .../triggers/search-tweets/search-tweets.ts | 6 +- .../src/graphql/mutations/execute-flow.ts | 38 +++++----- .../graphql/mutations/update-flow-status.ts | 8 +- .../backend/src/helpers/compute-parameters.ts | 43 +++++++++++ .../backend/src/helpers/global-variable.ts | 22 +----- packages/backend/src/services/action.ts | 55 ++++++++++++++ packages/backend/src/services/flow.ts | 24 ++++++ packages/backend/src/services/processor.ts | 73 ------------------- packages/backend/src/services/test-run.ts | 65 +++++++++++++++++ packages/backend/src/services/trigger.ts | 46 ++++++++++++ packages/types/index.d.ts | 1 + 11 files changed, 262 insertions(+), 119 deletions(-) create mode 100644 packages/backend/src/helpers/compute-parameters.ts create mode 100644 packages/backend/src/services/action.ts create mode 100644 packages/backend/src/services/flow.ts delete mode 100644 packages/backend/src/services/processor.ts create mode 100644 packages/backend/src/services/test-run.ts create mode 100644 packages/backend/src/services/trigger.ts diff --git a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts index d3fc6857..e0d25027 100644 --- a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts +++ b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts @@ -62,15 +62,13 @@ const searchTweets = async ( tweets.data.push(dataItem); }); } - } while (response.data.meta.next_token && options.lastInternalId); + } while (response.data.meta.next_token && !$.execution.testRun); tweets.data.sort((tweet, nextTweet) => { return (tweet.raw.id as number) - (nextTweet.raw.id as number); }); - for (const tweet of tweets.data) { - await $.process(tweet); - } + return tweets; }; export default searchTweets; diff --git a/packages/backend/src/graphql/mutations/execute-flow.ts b/packages/backend/src/graphql/mutations/execute-flow.ts index 39756abf..c9ac3344 100644 --- a/packages/backend/src/graphql/mutations/execute-flow.ts +++ b/packages/backend/src/graphql/mutations/execute-flow.ts @@ -1,6 +1,5 @@ import Context from '../../types/express/context'; -// eslint-disable-next-line @typescript-eslint/no-unused-vars -import flowQueue from '../../queues/flow'; +import testRun from '../../services/test-run'; type Params = { input: { @@ -13,25 +12,22 @@ const executeFlow = async ( params: Params, context: Context ) => { - // const untilStep = await context.currentUser - // .$relatedQuery('steps') - // .withGraphFetched('connection') - // .findOne({ - // 'steps.id': params.input.stepId, - // }) - // .throwIfNotFound(); - // const flow = await untilStep.$relatedQuery('flow'); - // const executionStep = await new Processor(flow, { - // untilStep, - // testRun: true, - // }).run(); - // await untilStep.$query().patch({ - // status: 'completed', - // }); - // if (executionStep.errorDetails) { - // throw new Error(JSON.stringify(executionStep.errorDetails)); - // } - // return { data: executionStep.dataOut, step: untilStep }; + const { stepId } = params.input; + const { executionStep } = await testRun({ stepId }); + + const untilStep = await context.currentUser + .$relatedQuery('steps') + .findById(stepId); + + await untilStep.$query().patch({ + status: 'completed', + }); + + if (executionStep.errorDetails) { + throw new Error(JSON.stringify(executionStep.errorDetails)); + } + + return { data: executionStep.dataOut, step: untilStep }; }; export default executeFlow; diff --git a/packages/backend/src/graphql/mutations/update-flow-status.ts b/packages/backend/src/graphql/mutations/update-flow-status.ts index 711a4029..a81b31e0 100644 --- a/packages/backend/src/graphql/mutations/update-flow-status.ts +++ b/packages/backend/src/graphql/mutations/update-flow-status.ts @@ -8,8 +8,8 @@ type Params = { }; }; -const JOB_NAME = 'processorJob'; -const EVERY_15_MINUTES_CRON = '*/1 * * * *'; +const JOB_NAME = 'flow'; +const EVERY_15_MINUTES_CRON = '*/15 * * * *'; const updateFlowStatus = async ( _parent: unknown, @@ -43,8 +43,10 @@ const updateFlowStatus = async ( published_at: new Date().toISOString(), }); + const jobName = `${JOB_NAME}-${flow.id}`; + await flowQueue.add( - JOB_NAME, + jobName, { flowId: flow.id }, { repeat: repeatOptions, diff --git a/packages/backend/src/helpers/compute-parameters.ts b/packages/backend/src/helpers/compute-parameters.ts new file mode 100644 index 00000000..c37328bd --- /dev/null +++ b/packages/backend/src/helpers/compute-parameters.ts @@ -0,0 +1,43 @@ +import Step from '../models/step'; +import ExecutionStep from '../models/execution-step'; +import get from 'lodash.get'; + +const variableRegExp = /({{step\.[\da-zA-Z-]+(?:\.[\da-zA-Z-]+)+}})/g; + +export default function computeParameters( + parameters: Step['parameters'], + executionSteps: ExecutionStep[] +): Step['parameters'] { + const entries = Object.entries(parameters); + return entries.reduce((result, [key, value]: [string, unknown]) => { + if (typeof value === 'string') { + const parts = value.split(variableRegExp); + + const computedValue = parts + .map((part: string) => { + const isVariable = part.match(variableRegExp); + if (isVariable) { + const stepIdAndKeyPath = part.replace(/{{step.|}}/g, '') as string; + const [stepId, ...keyPaths] = stepIdAndKeyPath.split('.'); + const keyPath = keyPaths.join('.'); + const executionStep = executionSteps.find((executionStep) => { + return executionStep.stepId === stepId; + }); + const data = executionStep?.dataOut; + const dataValue = get(data, keyPath); + return dataValue; + } + + return part; + }) + .join(''); + + return { + ...result, + [key]: computedValue, + }; + } + + return result; + }, {}); +} diff --git a/packages/backend/src/helpers/global-variable.ts b/packages/backend/src/helpers/global-variable.ts index 37783f8c..934dbf94 100644 --- a/packages/backend/src/helpers/global-variable.ts +++ b/packages/backend/src/helpers/global-variable.ts @@ -3,13 +3,7 @@ import Connection from '../models/connection'; import Flow from '../models/flow'; import Step from '../models/step'; import Execution from '../models/execution'; -import { - IJSONObject, - IApp, - IGlobalVariable, - ITriggerDataItem, -} from '@automatisch/types'; -import triggerQueue from '../queues/trigger'; +import { IJSONObject, IApp, IGlobalVariable } from '@automatisch/types'; type GlobalVariableOptions = { connection?: Connection; @@ -17,12 +11,13 @@ type GlobalVariableOptions = { flow?: Flow; step?: Step; execution?: Execution; + testRun?: boolean; }; const globalVariable = async ( options: GlobalVariableOptions ): Promise => { - const { connection, app, flow, step, execution } = options; + const { connection, app, flow, step, execution, testRun = false } = options; const lastInternalId = await flow?.lastInternalId(); @@ -66,19 +61,10 @@ const globalVariable = async ( }, execution: { id: execution?.id, + testRun, }, }; - variable.process = async (triggerDataItem: ITriggerDataItem) => { - const jobName = `${step.appKey}-${triggerDataItem.meta.internalId}`; - const jobPayload = { - $: variable, - triggerDataItem, - }; - - await triggerQueue.add(jobName, jobPayload); - }; - if (trigger && trigger.dedupeStrategy === 'unique') { const lastInternalIds = await flow?.lastInternalIds(); diff --git a/packages/backend/src/services/action.ts b/packages/backend/src/services/action.ts new file mode 100644 index 00000000..bbac7990 --- /dev/null +++ b/packages/backend/src/services/action.ts @@ -0,0 +1,55 @@ +import Step from '../models/step'; +import Flow from '../models/flow'; +import Execution from '../models/execution'; +import ExecutionStep from '../models/execution-step'; +import computeParameters from '../helpers/compute-parameters'; +import globalVariable from '../helpers/global-variable'; + +type ProcessActionOptions = { + flowId: string; + executionId: string; + stepId: string; +}; + +export const processAction = async (options: ProcessActionOptions) => { + const { flowId, stepId, executionId } = options; + + const step = await Step.query().findById(stepId).throwIfNotFound(); + const execution = await Execution.query() + .findById(executionId) + .throwIfNotFound(); + + const $ = await globalVariable({ + flow: await Flow.query().findById(flowId).throwIfNotFound(), + app: await step.getApp(), + step: step, + connection: await step.$relatedQuery('connection'), + execution: execution, + }); + + const priorExecutionSteps = await ExecutionStep.query().where({ + execution_id: $.execution.id, + }); + + const computedParameters = computeParameters( + $.step.parameters, + priorExecutionSteps + ); + + const actionCommand = await step.getActionCommand(); + + $.step.parameters = computedParameters; + const actionOutput = await actionCommand.run($); + + const executionStep = await execution + .$relatedQuery('executionSteps') + .insertAndFetch({ + stepId: $.step.id, + status: actionOutput.error ? 'failure' : 'success', + dataIn: computedParameters, + dataOut: actionOutput.error ? null : actionOutput.data.raw, + errorDetails: actionOutput.error, + }); + + return { flowId, stepId, executionId, executionStep }; +}; diff --git a/packages/backend/src/services/flow.ts b/packages/backend/src/services/flow.ts new file mode 100644 index 00000000..c11473a2 --- /dev/null +++ b/packages/backend/src/services/flow.ts @@ -0,0 +1,24 @@ +import Flow from '../models/flow'; +import globalVariable from '../helpers/global-variable'; + +type ProcessFlowOptions = { + flowId: string; + testRun?: boolean; +}; + +export const processFlow = async (options: ProcessFlowOptions) => { + const flow = await Flow.query().findById(options.flowId).throwIfNotFound(); + + const triggerStep = await flow.getTriggerStep(); + const triggerCommand = await triggerStep.getTriggerCommand(); + + const $ = await globalVariable({ + flow, + connection: await triggerStep.$relatedQuery('connection'), + app: await triggerStep.getApp(), + step: triggerStep, + testRun: options.testRun, + }); + + return await triggerCommand.run($); +}; diff --git a/packages/backend/src/services/processor.ts b/packages/backend/src/services/processor.ts deleted file mode 100644 index 0942cc78..00000000 --- a/packages/backend/src/services/processor.ts +++ /dev/null @@ -1,73 +0,0 @@ -import get from 'lodash.get'; -import { IActionOutput } from '@automatisch/types'; - -import App from '../models/app'; -import Flow from '../models/flow'; -import Step from '../models/step'; -import Execution from '../models/execution'; -import ExecutionStep from '../models/execution-step'; -import globalVariable from '../helpers/global-variable'; - -type ExecutionSteps = Record; - -type ProcessorOptions = { - untilStep?: Step; - testRun?: boolean; -}; - -class Processor { - flow: Flow; - untilStep?: Step; - testRun?: boolean; - - static variableRegExp = /({{step\.[\da-zA-Z-]+(?:\.[\da-zA-Z-]+)+}})/g; - - constructor(flow: Flow, processorOptions: ProcessorOptions) { - this.flow = flow; - this.untilStep = processorOptions.untilStep; - this.testRun = processorOptions.testRun; - } - - static computeParameters( - parameters: Step['parameters'], - executionSteps: ExecutionStep[] - ): Step['parameters'] { - const entries = Object.entries(parameters); - return entries.reduce((result, [key, value]: [string, unknown]) => { - if (typeof value === 'string') { - const parts = value.split(Processor.variableRegExp); - - const computedValue = parts - .map((part: string) => { - const isVariable = part.match(Processor.variableRegExp); - if (isVariable) { - const stepIdAndKeyPath = part.replace( - /{{step.|}}/g, - '' - ) as string; - const [stepId, ...keyPaths] = stepIdAndKeyPath.split('.'); - const keyPath = keyPaths.join('.'); - const executionStep = executionSteps.find((executionStep) => { - return executionStep.stepId === stepId; - }); - const data = executionStep?.dataOut; - const dataValue = get(data, keyPath); - return dataValue; - } - - return part; - }) - .join(''); - - return { - ...result, - [key]: computedValue, - }; - } - - return result; - }, {}); - } -} - -export default Processor; diff --git a/packages/backend/src/services/test-run.ts b/packages/backend/src/services/test-run.ts new file mode 100644 index 00000000..debc011d --- /dev/null +++ b/packages/backend/src/services/test-run.ts @@ -0,0 +1,65 @@ +import Step from '../models/step'; +import { processFlow } from '../services/flow'; +import { processTrigger } from '../services/trigger'; +import { processAction } from '../services/action'; + +type TestRunOptions = { + stepId: string; +}; + +const testRun = async (options: TestRunOptions) => { + const untilStep = await Step.query() + .findById(options.stepId) + .throwIfNotFound(); + + const flow = await untilStep.$relatedQuery('flow'); + const [triggerStep, ...actionSteps] = await flow + .$relatedQuery('steps') + .withGraphFetched('connection') + .orderBy('position', 'asc'); + + const { data, error: triggerError } = await processFlow({ + flowId: flow.id, + testRun: true, + }); + + const firstTriggerDataItem = data[0]; + + const { executionId, executionStep: triggerExecutionStep } = + await processTrigger({ + flowId: flow.id, + stepId: triggerStep.id, + triggerDataItem: firstTriggerDataItem, + testRun: true, + }); + + if (triggerError) { + const { executionStep: triggerExecutionStepWithError } = + await processTrigger({ + flowId: flow.id, + stepId: triggerStep.id, + error: triggerError, + testRun: true, + }); + + return { executionStep: triggerExecutionStepWithError }; + } + + if (triggerStep.id === untilStep.id) { + return { executionStep: triggerExecutionStep }; + } + + for (const actionStep of actionSteps) { + const { executionStep: actionExecutionStep } = await processAction({ + flowId: flow.id, + stepId: actionStep.id, + executionId, + }); + + if (actionStep.id === untilStep.id || actionExecutionStep.errorDetails) { + return { executionStep: actionExecutionStep }; + } + } +}; + +export default testRun; diff --git a/packages/backend/src/services/trigger.ts b/packages/backend/src/services/trigger.ts new file mode 100644 index 00000000..61ee8c2e --- /dev/null +++ b/packages/backend/src/services/trigger.ts @@ -0,0 +1,46 @@ +import { IJSONObject, ITriggerDataItem } from '@automatisch/types'; +import Step from '../models/step'; +import Flow from '../models/flow'; +import Execution from '../models/execution'; +import globalVariable from '../helpers/global-variable'; + +type ProcessTriggerOptions = { + flowId: string; + stepId: string; + triggerDataItem?: ITriggerDataItem; + error?: IJSONObject; + testRun?: boolean; +}; + +export const processTrigger = async (options: ProcessTriggerOptions) => { + const { flowId, stepId, triggerDataItem, error, testRun } = options; + + const step = await Step.query().findById(stepId).throwIfNotFound(); + + const $ = await globalVariable({ + flow: await Flow.query().findById(flowId).throwIfNotFound(), + app: await step.getApp(), + step: step, + connection: await step.$relatedQuery('connection'), + }); + + // check if we already process this trigger data item or not! + + const execution = await Execution.query().insert({ + flowId: $.flow.id, + testRun, + internalId: triggerDataItem.meta.internalId, + }); + + const executionStep = await execution + .$relatedQuery('executionSteps') + .insertAndFetch({ + stepId: $.step.id, + status: error ? 'failure' : 'success', + dataIn: $.step.parameters, + dataOut: !error ? triggerDataItem.raw : null, + errorDetails: error, + }); + + return { flowId, stepId, executionId: execution.id, executionStep }; +}; diff --git a/packages/types/index.d.ts b/packages/types/index.d.ts index c63a751f..61452076 100644 --- a/packages/types/index.d.ts +++ b/packages/types/index.d.ts @@ -270,6 +270,7 @@ export type IGlobalVariable = { }; execution?: { id: string; + testRun: boolean; } process?: (triggerDataItem: ITriggerDataItem) => Promise; }; From 237ab48d330653bb4572098b75b283e1d65a8398 Mon Sep 17 00:00:00 2001 From: Faruk AYDIN Date: Fri, 14 Oct 2022 22:33:26 +0200 Subject: [PATCH 5/5] refactor: Restructure workers to work with services --- .../triggers/search-tweets/search-tweets.ts | 2 +- .../src/graphql/mutations/execute-flow.ts | 2 +- .../backend/src/helpers/global-variable.ts | 5 +- packages/backend/src/models/execution-step.ts | 4 ++ packages/backend/src/models/step.ts | 8 +++ packages/backend/src/services/test-run.ts | 22 +++--- packages/backend/src/services/trigger.ts | 2 +- packages/backend/src/workers/action.ts | 68 +++---------------- packages/backend/src/workers/flow.ts | 40 ++++++++--- packages/backend/src/workers/trigger.ts | 42 ++++-------- 10 files changed, 81 insertions(+), 114 deletions(-) diff --git a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts index e0d25027..a9d38636 100644 --- a/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts +++ b/packages/backend/src/apps/twitter/triggers/search-tweets/search-tweets.ts @@ -25,7 +25,7 @@ const searchTweets = async ( do { const params: IJSONObject = { query: options.searchTerm, - since_id: options.lastInternalId, + since_id: $.execution.testRun ? null : $.flow.lastInternalId, pagination_token: response?.data?.meta?.next_token, }; diff --git a/packages/backend/src/graphql/mutations/execute-flow.ts b/packages/backend/src/graphql/mutations/execute-flow.ts index c9ac3344..b35e4a82 100644 --- a/packages/backend/src/graphql/mutations/execute-flow.ts +++ b/packages/backend/src/graphql/mutations/execute-flow.ts @@ -23,7 +23,7 @@ const executeFlow = async ( status: 'completed', }); - if (executionStep.errorDetails) { + if (executionStep.isFailed) { throw new Error(JSON.stringify(executionStep.errorDetails)); } diff --git a/packages/backend/src/helpers/global-variable.ts b/packages/backend/src/helpers/global-variable.ts index 934dbf94..e30d212e 100644 --- a/packages/backend/src/helpers/global-variable.ts +++ b/packages/backend/src/helpers/global-variable.ts @@ -22,10 +22,7 @@ const globalVariable = async ( const lastInternalId = await flow?.lastInternalId(); const trigger = await step?.getTriggerCommand(); - const nextStep = await flow - ?.$relatedQuery('steps') - .where({ position: step.position + 1 }) - .first(); + const nextStep = await step?.getNextStep(); const variable: IGlobalVariable = { auth: { diff --git a/packages/backend/src/models/execution-step.ts b/packages/backend/src/models/execution-step.ts index 2d01e7fb..e063d344 100644 --- a/packages/backend/src/models/execution-step.ts +++ b/packages/backend/src/models/execution-step.ts @@ -50,6 +50,10 @@ class ExecutionStep extends Base { }, }); + get isFailed() { + return this.status === 'failure'; + } + async $afterInsert(queryContext: QueryContext) { await super.$afterInsert(queryContext); Telemetry.executionStepCreated(this); diff --git a/packages/backend/src/models/step.ts b/packages/backend/src/models/step.ts index 78c0d31d..1885c7ad 100644 --- a/packages/backend/src/models/step.ts +++ b/packages/backend/src/models/step.ts @@ -102,6 +102,14 @@ class Step extends Base { return await App.findOneByKey(this.appKey); } + async getNextStep() { + const flow = await this.$relatedQuery('flow'); + + return await flow + .$relatedQuery('steps') + .findOne({ position: this.position + 1 }); + } + async getTriggerCommand() { const { appKey, key, isTrigger } = this; if (!isTrigger || !appKey || !key) return null; diff --git a/packages/backend/src/services/test-run.ts b/packages/backend/src/services/test-run.ts index debc011d..950aecc3 100644 --- a/packages/backend/src/services/test-run.ts +++ b/packages/backend/src/services/test-run.ts @@ -23,16 +23,6 @@ const testRun = async (options: TestRunOptions) => { testRun: true, }); - const firstTriggerDataItem = data[0]; - - const { executionId, executionStep: triggerExecutionStep } = - await processTrigger({ - flowId: flow.id, - stepId: triggerStep.id, - triggerDataItem: firstTriggerDataItem, - testRun: true, - }); - if (triggerError) { const { executionStep: triggerExecutionStepWithError } = await processTrigger({ @@ -45,6 +35,16 @@ const testRun = async (options: TestRunOptions) => { return { executionStep: triggerExecutionStepWithError }; } + const firstTriggerDataItem = data[0]; + + const { executionId, executionStep: triggerExecutionStep } = + await processTrigger({ + flowId: flow.id, + stepId: triggerStep.id, + triggerDataItem: firstTriggerDataItem, + testRun: true, + }); + if (triggerStep.id === untilStep.id) { return { executionStep: triggerExecutionStep }; } @@ -56,7 +56,7 @@ const testRun = async (options: TestRunOptions) => { executionId, }); - if (actionStep.id === untilStep.id || actionExecutionStep.errorDetails) { + if (actionStep.id === untilStep.id || actionExecutionStep.isFailed) { return { executionStep: actionExecutionStep }; } } diff --git a/packages/backend/src/services/trigger.ts b/packages/backend/src/services/trigger.ts index 61ee8c2e..5bf92185 100644 --- a/packages/backend/src/services/trigger.ts +++ b/packages/backend/src/services/trigger.ts @@ -29,7 +29,7 @@ export const processTrigger = async (options: ProcessTriggerOptions) => { const execution = await Execution.query().insert({ flowId: $.flow.id, testRun, - internalId: triggerDataItem.meta.internalId, + internalId: triggerDataItem?.meta.internalId, }); const executionStep = await execution diff --git a/packages/backend/src/workers/action.ts b/packages/backend/src/workers/action.ts index f1022e1a..75d9bd86 100644 --- a/packages/backend/src/workers/action.ts +++ b/packages/backend/src/workers/action.ts @@ -1,14 +1,9 @@ import { Worker } from 'bullmq'; import redisConfig from '../config/redis'; -import Flow from '../models/flow'; import logger from '../helpers/logger'; -import globalVariable from '../helpers/global-variable'; -import { IGlobalVariable } from '@automatisch/types'; -import Execution from '../models/execution'; -import Processor from '../services/processor'; -import ExecutionStep from '../models/execution-step'; import Step from '../models/step'; import actionQueue from '../queues/action'; +import { processAction } from '../services/action'; type JobData = { flowId: string; @@ -19,64 +14,21 @@ type JobData = { export const worker = new Worker( 'action', async (job) => { - const { flowId, stepId, executionId } = job.data as JobData; - - const step = await Step.query().findById(stepId).throwIfNotFound(); - const execution = await Execution.query() - .findById(executionId) - .throwIfNotFound(); - - const $ = await globalVariable({ - flow: await Flow.query().findById(flowId).throwIfNotFound(), - app: await step.getApp(), - step: step, - connection: await step.$relatedQuery('connection'), - execution: execution, - }); - - const priorExecutionSteps = await ExecutionStep.query().where({ - execution_id: $.execution.id, - }); - - const computedParameters = Processor.computeParameters( - $.step.parameters, - priorExecutionSteps + const { stepId, flowId, executionId } = await processAction( + job.data as JobData ); - const actionCommand = await step.getActionCommand(); + const step = await Step.query().findById(stepId).throwIfNotFound(); + const nextStep = await step.getNextStep(); - $.step.parameters = computedParameters; - const actionDataItem = await actionCommand.run($); + if (!nextStep) return; - await execution.$relatedQuery('executionSteps').insertAndFetch({ - stepId: $.step.id, - status: 'success', - dataIn: computedParameters, - dataOut: actionDataItem.data.raw, - }); - - // TODO: Add until step id logic here! - // TODO: Change job name for the action data item! - const jobName = `${$.step.appKey}-sample`; - - if (!$.nextStep.id) return; - - const nextStep = await Step.query() - .findById($.nextStep.id) - .throwIfNotFound(); - - console.log('hello world'); - - const variable = await globalVariable({ - flow: await Flow.query().findById($.flow.id), - app: await nextStep.getApp(), - step: nextStep, - connection: await nextStep.$relatedQuery('connection'), - execution: execution, - }); + const jobName = `${executionId}-${nextStep.id}`; const jobPayload = { - $: variable, + flowId, + executionId, + stepId: nextStep.id, }; await actionQueue.add(jobName, jobPayload); diff --git a/packages/backend/src/workers/flow.ts b/packages/backend/src/workers/flow.ts index cd315fa3..917583cf 100644 --- a/packages/backend/src/workers/flow.ts +++ b/packages/backend/src/workers/flow.ts @@ -1,25 +1,43 @@ import { Worker } from 'bullmq'; import redisConfig from '../config/redis'; -import Flow from '../models/flow'; import logger from '../helpers/logger'; -import globalVariable from '../helpers/global-variable'; +import triggerQueue from '../queues/trigger'; +import { processFlow } from '../services/flow'; +import Flow from '../models/flow'; export const worker = new Worker( 'flow', async (job) => { - const flow = await Flow.query().findById(job.data.flowId).throwIfNotFound(); + const { flowId } = job.data; + const flow = await Flow.query().findById(flowId).throwIfNotFound(); const triggerStep = await flow.getTriggerStep(); - const triggerCommand = await triggerStep.getTriggerCommand(); - const $ = await globalVariable({ - flow, - connection: await triggerStep.$relatedQuery('connection'), - app: await triggerStep.getApp(), - step: triggerStep, - }); + const { data, error } = await processFlow({ flowId }); - await triggerCommand.run($); + for (const triggerDataItem of data) { + const jobName = `${triggerStep.id}-${triggerDataItem.meta.internalId}`; + + const jobPayload = { + flowId, + stepId: triggerStep.id, + triggerDataItem, + }; + + await triggerQueue.add(jobName, jobPayload); + } + + if (error) { + const jobName = `${triggerStep.id}-error`; + + const jobPayload = { + flowId, + stepId: triggerStep.id, + error, + }; + + await triggerQueue.add(jobName, jobPayload); + } }, { connection: redisConfig } ); diff --git a/packages/backend/src/workers/trigger.ts b/packages/backend/src/workers/trigger.ts index 29d07fde..f82f568d 100644 --- a/packages/backend/src/workers/trigger.ts +++ b/packages/backend/src/workers/trigger.ts @@ -1,46 +1,34 @@ import { Worker } from 'bullmq'; import redisConfig from '../config/redis'; -import Flow from '../models/flow'; import logger from '../helpers/logger'; -import globalVariable from '../helpers/global-variable'; -import { ITriggerDataItem, IGlobalVariable } from '@automatisch/types'; -import Execution from '../models/execution'; +import { IJSONObject, ITriggerDataItem } from '@automatisch/types'; import actionQueue from '../queues/action'; import Step from '../models/step'; +import { processTrigger } from '../services/trigger'; type JobData = { - $: IGlobalVariable; - triggerDataItem: ITriggerDataItem; + flowId: string; + stepId: string; + triggerDataItem?: ITriggerDataItem; + error?: IJSONObject; }; export const worker = new Worker( 'trigger', async (job) => { - const { $, triggerDataItem } = job.data as JobData; + const { flowId, executionId, stepId, executionStep } = await processTrigger( + job.data as JobData + ); - // check if we already process this trigger data item or not! + if (executionStep.isFailed) return; - const execution = await Execution.query().insert({ - flowId: $.flow.id, - // TODO: Check the testRun logic and adjust following line! - testRun: true, - internalId: triggerDataItem.meta.internalId, - }); - - await execution.$relatedQuery('executionSteps').insertAndFetch({ - stepId: $.step.id, - status: 'success', - dataIn: $.step.parameters, - dataOut: triggerDataItem.raw, - }); - - const jobName = `${$.step.appKey}-${triggerDataItem.meta.internalId}`; - - const nextStep = await Step.query().findById($.nextStep.id); + const step = await Step.query().findById(stepId).throwIfNotFound(); + const nextStep = await step.getNextStep(); + const jobName = `${executionId}-${nextStep.id}`; const jobPayload = { - flowId: $.flow.id, - executionId: execution.id, + flowId, + executionId, stepId: nextStep.id, };