diff --git a/packages/backend/src/apps/github/actions/create-issue/index.ts b/packages/backend/src/apps/github/actions/create-issue/index.ts index 6a53da50..43ee8a12 100644 --- a/packages/backend/src/apps/github/actions/create-issue/index.ts +++ b/packages/backend/src/apps/github/actions/create-issue/index.ts @@ -27,25 +27,25 @@ export default defineAction({ arguments: [ { name: 'key', - value: 'listRepos' - } - ] - } + value: 'listRepos', + }, + ], + }, }, { label: 'Title', key: 'title', type: 'string', required: true, - variables: true + variables: true, }, { label: 'Body', key: 'body', type: 'string', required: true, - variables: true - } + variables: true, + }, ], }, { @@ -59,7 +59,7 @@ export default defineAction({ const title = $.step.parameters.title as string; const body = $.step.parameters.body as string; - if (!repoParameter) throw new Error('A repo must be set!') + if (!repoParameter) throw new Error('A repo must be set!'); if (!title) throw new Error('A title must be set!'); const { repoOwner, repo } = getRepoOwnerAndRepo(repoParameter); @@ -68,13 +68,6 @@ export default defineAction({ body, }); - const issue: IActionOutput = { - data: { - raw: response.data, - }, - error: response?.integrationError, - }; - - return issue; + $.setActionItem({ raw: response.data }); }, }); diff --git a/packages/backend/src/apps/github/triggers/new-pull-requests/index.ts b/packages/backend/src/apps/github/triggers/new-pull-requests/index.ts index 828b4fce..a6d262e2 100644 --- a/packages/backend/src/apps/github/triggers/new-pull-requests/index.ts +++ b/packages/backend/src/apps/github/triggers/new-pull-requests/index.ts @@ -9,7 +9,7 @@ export default defineTrigger({ substeps: [ { key: 'chooseConnection', - name: 'Choose connection' + name: 'Choose connection', }, { key: 'chooseTrigger', @@ -27,20 +27,20 @@ export default defineTrigger({ arguments: [ { name: 'key', - value: 'listRepos' - } - ] - } - } - ] + value: 'listRepos', + }, + ], + }, + }, + ], }, { key: 'testStep', - name: 'Test trigger' - } + name: 'Test trigger', + }, ], async run($) { - return await newPullRequests($); + await newPullRequests($); }, }); diff --git a/packages/backend/src/apps/github/triggers/new-pull-requests/new-pull-requests.ts b/packages/backend/src/apps/github/triggers/new-pull-requests/new-pull-requests.ts index 03c168d2..e827b546 100644 --- a/packages/backend/src/apps/github/triggers/new-pull-requests/new-pull-requests.ts +++ b/packages/backend/src/apps/github/triggers/new-pull-requests/new-pull-requests.ts @@ -1,7 +1,4 @@ -import { - IGlobalVariable, - ITriggerOutput, -} from '@automatisch/types'; +import { IGlobalVariable, ITriggerOutput } from '@automatisch/types'; import getRepoOwnerAndRepo from '../../common/get-repo-owner-and-repo'; import parseLinkHeader from '../../../../helpers/parse-header-link'; @@ -29,16 +26,15 @@ const fetchPullRequests = async ($: IGlobalVariable) => { const response = await $.http.get(pathname, { params }); links = parseLinkHeader(response.headers.link); - if (response.integrationError) { - pullRequests.error = response.integrationError; - return pullRequests; - } - if (response.data.length) { for (const pullRequest of response.data) { const pullRequestId = pullRequest.id; - if (pullRequestId <= Number($.flow.lastInternalId) && !$.execution.testRun) return pullRequests; + if ( + pullRequestId <= Number($.flow.lastInternalId) && + !$.execution.testRun + ) + return pullRequests; const dataItem = { raw: pullRequest, @@ -53,7 +49,7 @@ const fetchPullRequests = async ($: IGlobalVariable) => { } while (links.next && !$.execution.testRun); return pullRequests; -} +}; const newPullRequests = async ($: IGlobalVariable) => { const pullRequests = await fetchPullRequests($); diff --git a/packages/backend/src/apps/github/triggers/new-stargazers/index.ts b/packages/backend/src/apps/github/triggers/new-stargazers/index.ts index f4393f73..cf033f96 100644 --- a/packages/backend/src/apps/github/triggers/new-stargazers/index.ts +++ b/packages/backend/src/apps/github/triggers/new-stargazers/index.ts @@ -44,11 +44,9 @@ export default defineTrigger({ await newStargazers($); }, - sort($) { - $.triggerOutput.data.sort((stargazerA, stargazerB) => { - return ( - Number(stargazerA.meta.internalId) - Number(stargazerB.meta.internalId) - ); - }); + sort(stargazerA, stargazerB) { + return ( + Number(stargazerA.meta.internalId) - Number(stargazerB.meta.internalId) + ); }, }); diff --git a/packages/backend/src/apps/github/triggers/new-watchers/index.ts b/packages/backend/src/apps/github/triggers/new-watchers/index.ts index cffd8f2f..a7f744d1 100644 --- a/packages/backend/src/apps/github/triggers/new-watchers/index.ts +++ b/packages/backend/src/apps/github/triggers/new-watchers/index.ts @@ -10,7 +10,7 @@ export default defineTrigger({ substeps: [ { key: 'chooseConnection', - name: 'Choose connection' + name: 'Choose connection', }, { key: 'chooseTrigger', @@ -28,20 +28,24 @@ export default defineTrigger({ arguments: [ { name: 'key', - value: 'listRepos' - } - ] - } + value: 'listRepos', + }, + ], + }, }, - ] + ], }, { key: 'testStep', - name: 'Test trigger' - } + name: 'Test trigger', + }, ], async run($) { - return await newWatchers($); + await newWatchers($); + }, + + sort() { + return -1; }, }); diff --git a/packages/backend/src/apps/github/triggers/new-watchers/new-watchers.ts b/packages/backend/src/apps/github/triggers/new-watchers/new-watchers.ts index 255ab493..fb4684b3 100644 --- a/packages/backend/src/apps/github/triggers/new-watchers/new-watchers.ts +++ b/packages/backend/src/apps/github/triggers/new-watchers/new-watchers.ts @@ -1,11 +1,8 @@ -import { - IGlobalVariable, - ITriggerOutput, -} from '@automatisch/types'; +import { IGlobalVariable, ITriggerOutput } from '@automatisch/types'; import getRepoOwnerAndRepo from '../../common/get-repo-owner-and-repo'; import parseLinkHeader from '../../../../helpers/parse-header-link'; -const fetchWatchers = async ($: IGlobalVariable) => { +const newWatchers = async ($: IGlobalVariable) => { const repoParameter = $.step.parameters.repo as string; if (!repoParameter) throw new Error('A repo must be set!'); @@ -15,9 +12,9 @@ const fetchWatchers = async ($: IGlobalVariable) => { const firstPagePathname = `/repos/${repoOwner}/${repo}/subscribers`; const requestConfig = { params: { - per_page: 100 + per_page: 100, }, - } + }; const firstPageResponse = await $.http.get(firstPagePathname, requestConfig); const firstPageLinks = parseLinkHeader(firstPageResponse.headers.link); @@ -25,20 +22,11 @@ const fetchWatchers = async ($: IGlobalVariable) => { // in case there is only single page to fetch let pathname = firstPageLinks.last?.uri || firstPagePathname; - const watchers: ITriggerOutput = { - data: [], - }; - do { const response = await $.http.get(pathname, requestConfig); const links = parseLinkHeader(response.headers.link); pathname = links.prev?.uri; - if (response.integrationError) { - watchers.error = response.integrationError; - return watchers; - } - if (response.data.length) { // to iterate reverse-chronologically response.data.reverse(); @@ -46,7 +34,8 @@ const fetchWatchers = async ($: IGlobalVariable) => { for (const watcher of response.data) { const watcherId = watcher.id.toString(); - if ($.flow.isAlreadyProcessed(watcherId) && !$.execution.testRun) return watchers; + if ($.flow.isAlreadyProcessed(watcherId) && !$.execution.testRun) + return; const dataItem = { raw: watcher, @@ -55,21 +44,10 @@ const fetchWatchers = async ($: IGlobalVariable) => { }, }; - watchers.data.push(dataItem); + $.pushTriggerItem(dataItem); } } } while (pathname && !$.execution.testRun === false); - - return watchers; -} - -const newWatchers = async ($: IGlobalVariable) => { - const watchers = await fetchWatchers($); - - // to process chronologically - watchers.data.reverse(); - - return watchers; }; export default newWatchers; diff --git a/packages/backend/src/apps/twitter/common/get-user-followers.ts b/packages/backend/src/apps/twitter/common/get-user-followers.ts index 45d47d3f..cc174311 100644 --- a/packages/backend/src/apps/twitter/common/get-user-followers.ts +++ b/packages/backend/src/apps/twitter/common/get-user-followers.ts @@ -1,8 +1,4 @@ -import { - IGlobalVariable, - IJSONObject, - ITriggerOutput, -} from '@automatisch/types'; +import { IGlobalVariable, IJSONObject } from '@automatisch/types'; import { URLSearchParams } from 'url'; import { omitBy, isEmpty } from 'lodash'; @@ -16,10 +12,6 @@ const getUserFollowers = async ( ) => { let response; - const followers: ITriggerOutput = { - data: [], - }; - do { const params: IJSONObject = { pagination_token: response?.data?.meta?.next_token, @@ -40,18 +32,16 @@ const getUserFollowers = async ( if (response.data.meta.result_count > 0) { for (const follower of response.data.data) { if ($.flow.isAlreadyProcessed(follower.id as string)) { - return followers; + return; } - followers.data.push({ + $.pushTriggerItem({ raw: follower, meta: { internalId: follower.id as string }, }); } } } while (response.data.meta.next_token && !$.execution.testRun); - - return followers; }; export default getUserFollowers; diff --git a/packages/backend/src/apps/twitter/triggers/my-tweets/index.ts b/packages/backend/src/apps/twitter/triggers/my-tweets/index.ts index 8f9f4475..cfcd13f2 100644 --- a/packages/backend/src/apps/twitter/triggers/my-tweets/index.ts +++ b/packages/backend/src/apps/twitter/triggers/my-tweets/index.ts @@ -21,9 +21,7 @@ export default defineTrigger({ await getUserTweets($, { currentUser: true }); }, - sort($) { - $.triggerOutput.data.sort((tweet, nextTweet) => { - return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId); - }); + sort(tweet, nextTweet) { + return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId); }, }); diff --git a/packages/backend/src/apps/twitter/triggers/new-follower-of-me/index.ts b/packages/backend/src/apps/twitter/triggers/new-follower-of-me/index.ts index 8f5fe4f6..19adfd6c 100644 --- a/packages/backend/src/apps/twitter/triggers/new-follower-of-me/index.ts +++ b/packages/backend/src/apps/twitter/triggers/new-follower-of-me/index.ts @@ -19,6 +19,6 @@ export default defineTrigger({ ], async run($) { - return await myFollowers($); + await myFollowers($); }, }); diff --git a/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts b/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts index 3e69afd2..506bbe83 100644 --- a/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts +++ b/packages/backend/src/apps/twitter/triggers/search-tweets/index.ts @@ -34,9 +34,7 @@ export default defineTrigger({ await searchTweets($); }, - sort($) { - $.triggerOutput.data.sort((tweet, nextTweet) => { - return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId); - }); + sort(tweet, nextTweet) { + return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId); }, }); diff --git a/packages/backend/src/apps/twitter/triggers/user-tweets/index.ts b/packages/backend/src/apps/twitter/triggers/user-tweets/index.ts index 783fe028..dab530ae 100644 --- a/packages/backend/src/apps/twitter/triggers/user-tweets/index.ts +++ b/packages/backend/src/apps/twitter/triggers/user-tweets/index.ts @@ -33,9 +33,7 @@ export default defineTrigger({ await getUserTweets($, { currentUser: false }); }, - sort($) { - $.triggerOutput.data.sort((tweet, nextTweet) => { - return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId); - }); + sort(tweet, nextTweet) { + return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId); }, }); diff --git a/packages/backend/src/helpers/global-variable.ts b/packages/backend/src/helpers/global-variable.ts index 26291fa9..1db67650 100644 --- a/packages/backend/src/helpers/global-variable.ts +++ b/packages/backend/src/helpers/global-variable.ts @@ -3,7 +3,13 @@ import Connection from '../models/connection'; import Flow from '../models/flow'; import Step from '../models/step'; import Execution from '../models/execution'; -import { IJSONObject, IApp, IGlobalVariable } from '@automatisch/types'; +import { + IJSONObject, + IApp, + IGlobalVariable, + ITriggerItem, + IActionItem, +} from '@automatisch/types'; type GlobalVariableOptions = { connection?: Connection; @@ -61,11 +67,15 @@ const globalVariable = async ( }, triggerOutput: { data: [], - error: null, }, actionOutput: { data: null, - error: null, + }, + pushTriggerItem: (triggerItem: ITriggerItem) => { + $.triggerOutput.data.push(triggerItem); + }, + setActionItem: (actionItem: IActionItem) => { + $.actionOutput.data = actionItem; }, }; diff --git a/packages/backend/src/services/action.ts b/packages/backend/src/services/action.ts index b4c3f66c..aefdeaf3 100644 --- a/packages/backend/src/services/action.ts +++ b/packages/backend/src/services/action.ts @@ -39,16 +39,29 @@ export const processAction = async (options: ProcessActionOptions) => { const actionCommand = await step.getActionCommand(); $.step.parameters = computedParameters; - const actionOutput = await actionCommand.run($); + + try { + await actionCommand.run($); + } catch (error) { + if (error?.response?.httpError) { + $.actionOutput.error = error.response.httpError; + } else { + try { + $.actionOutput.error = JSON.parse(error.message); + } catch { + $.actionOutput.error = { error: error.message }; + } + } + } const executionStep = await execution .$relatedQuery('executionSteps') .insertAndFetch({ stepId: $.step.id, - status: actionOutput.error ? 'failure' : 'success', + status: $.actionOutput.error ? 'failure' : 'success', dataIn: computedParameters, - dataOut: actionOutput.error ? null : actionOutput.data?.raw, - errorDetails: actionOutput.error ? actionOutput.error : null, + dataOut: $.actionOutput.error ? null : $.actionOutput.data?.raw, + errorDetails: $.actionOutput.error ? $.actionOutput.error : null, }); return { flowId, stepId, executionId, executionStep }; diff --git a/packages/backend/src/services/flow.ts b/packages/backend/src/services/flow.ts index 1a333872..4cde0fd7 100644 --- a/packages/backend/src/services/flow.ts +++ b/packages/backend/src/services/flow.ts @@ -35,7 +35,7 @@ export const processFlow = async (options: ProcessFlowOptions) => { } if (triggerCommand?.sort) { - triggerCommand.sort($); + $.triggerOutput.data.sort(triggerCommand.sort); } return $.triggerOutput; diff --git a/packages/backend/src/services/test-run.ts b/packages/backend/src/services/test-run.ts index 950aecc3..4d18028d 100644 --- a/packages/backend/src/services/test-run.ts +++ b/packages/backend/src/services/test-run.ts @@ -35,13 +35,13 @@ const testRun = async (options: TestRunOptions) => { return { executionStep: triggerExecutionStepWithError }; } - const firstTriggerDataItem = data[0]; + const firstTriggerItem = data[0]; const { executionId, executionStep: triggerExecutionStep } = await processTrigger({ flowId: flow.id, stepId: triggerStep.id, - triggerDataItem: firstTriggerDataItem, + triggerItem: firstTriggerItem, testRun: true, }); diff --git a/packages/backend/src/services/trigger.ts b/packages/backend/src/services/trigger.ts index 265f2bb6..8cfdc974 100644 --- a/packages/backend/src/services/trigger.ts +++ b/packages/backend/src/services/trigger.ts @@ -1,4 +1,4 @@ -import { IJSONObject, ITriggerDataItem } from '@automatisch/types'; +import { IJSONObject, ITriggerItem } from '@automatisch/types'; import Step from '../models/step'; import Flow from '../models/flow'; import Execution from '../models/execution'; @@ -7,13 +7,13 @@ import globalVariable from '../helpers/global-variable'; type ProcessTriggerOptions = { flowId: string; stepId: string; - triggerDataItem?: ITriggerDataItem; + triggerItem?: ITriggerItem; error?: IJSONObject; testRun?: boolean; }; export const processTrigger = async (options: ProcessTriggerOptions) => { - const { flowId, stepId, triggerDataItem, error, testRun } = options; + const { flowId, stepId, triggerItem, error, testRun } = options; const step = await Step.query().findById(stepId).throwIfNotFound(); @@ -29,7 +29,7 @@ export const processTrigger = async (options: ProcessTriggerOptions) => { const execution = await Execution.query().insert({ flowId: $.flow.id, testRun, - internalId: triggerDataItem?.meta.internalId, + internalId: triggerItem?.meta.internalId, }); const executionStep = await execution @@ -38,7 +38,7 @@ export const processTrigger = async (options: ProcessTriggerOptions) => { stepId: $.step.id, status: error ? 'failure' : 'success', dataIn: $.step.parameters, - dataOut: !error ? triggerDataItem?.raw : null, + dataOut: !error ? triggerItem?.raw : null, errorDetails: error, }); diff --git a/packages/backend/src/workers/flow.ts b/packages/backend/src/workers/flow.ts index 917583cf..d8dc670a 100644 --- a/packages/backend/src/workers/flow.ts +++ b/packages/backend/src/workers/flow.ts @@ -15,13 +15,15 @@ export const worker = new Worker( const { data, error } = await processFlow({ flowId }); - for (const triggerDataItem of data) { - const jobName = `${triggerStep.id}-${triggerDataItem.meta.internalId}`; + const reversedData = data.reverse(); + + for (const triggerItem of reversedData) { + const jobName = `${triggerStep.id}-${triggerItem.meta.internalId}`; const jobPayload = { flowId, stepId: triggerStep.id, - triggerDataItem, + triggerItem, }; await triggerQueue.add(jobName, jobPayload); diff --git a/packages/backend/src/workers/trigger.ts b/packages/backend/src/workers/trigger.ts index f82f568d..8d64e032 100644 --- a/packages/backend/src/workers/trigger.ts +++ b/packages/backend/src/workers/trigger.ts @@ -1,7 +1,7 @@ import { Worker } from 'bullmq'; import redisConfig from '../config/redis'; import logger from '../helpers/logger'; -import { IJSONObject, ITriggerDataItem } from '@automatisch/types'; +import { IJSONObject, ITriggerItem } from '@automatisch/types'; import actionQueue from '../queues/action'; import Step from '../models/step'; import { processTrigger } from '../services/trigger'; @@ -9,7 +9,7 @@ import { processTrigger } from '../services/trigger'; type JobData = { flowId: string; stepId: string; - triggerDataItem?: ITriggerDataItem; + triggerItem?: ITriggerItem; error?: IJSONObject; }; diff --git a/packages/types/index.d.ts b/packages/types/index.d.ts index 35808c4e..c1580321 100644 --- a/packages/types/index.d.ts +++ b/packages/types/index.d.ts @@ -194,11 +194,11 @@ export interface IService { } export interface ITriggerOutput { - data: ITriggerDataItem[]; + data: ITriggerItem[]; error?: IJSONObject; } -export interface ITriggerDataItem { +export interface ITriggerItem { raw: IJSONObject; meta: { internalId: string; @@ -212,17 +212,17 @@ export interface ITrigger { description: string; dedupeStrategy?: 'greatest' | 'unique' | 'last'; substeps: ISubstep[]; - getInterval?(parameters: IGlobalVariable['step']['parameters']): string; - run($: IGlobalVariable): Promise; - sort?($: IGlobalVariable): void | ITriggerOutput; + getInterval?(parameters: IStep['parameters']): string; + run($: IGlobalVariable): Promise; + sort?(item: ITriggerItem, nextItem: ITriggerItem): number; } export interface IActionOutput { - data: IActionDataItem; + data: IActionItem; error?: IJSONObject; } -export interface IActionDataItem { +export interface IActionItem { raw: { data?: IJSONObject; }; @@ -233,7 +233,7 @@ export interface IAction { key: string; description: string; substeps: ISubstep[]; - run($: IGlobalVariable): Promise; + run($: IGlobalVariable): Promise; } export interface IAuthentication { @@ -282,7 +282,8 @@ export type IGlobalVariable = { }; triggerOutput?: ITriggerOutput; actionOutput?: IActionOutput; - process?: (triggerDataItem: ITriggerDataItem) => Promise; + pushTriggerItem?: (triggerItem: ITriggerItem) => void; + setActionItem?: (actionItem: IActionItem) => void; }; declare module 'axios' {