refactor: Restructure apps with new data pushing logic

This commit is contained in:
Faruk AYDIN
2022-10-22 19:29:02 +02:00
parent bcff9f5a9e
commit a56135ca57
19 changed files with 115 additions and 136 deletions

View File

@@ -27,25 +27,25 @@ export default defineAction({
arguments: [ arguments: [
{ {
name: 'key', name: 'key',
value: 'listRepos' value: 'listRepos',
} },
] ],
} },
}, },
{ {
label: 'Title', label: 'Title',
key: 'title', key: 'title',
type: 'string', type: 'string',
required: true, required: true,
variables: true variables: true,
}, },
{ {
label: 'Body', label: 'Body',
key: 'body', key: 'body',
type: 'string', type: 'string',
required: true, required: true,
variables: true variables: true,
} },
], ],
}, },
{ {
@@ -59,7 +59,7 @@ export default defineAction({
const title = $.step.parameters.title as string; const title = $.step.parameters.title as string;
const body = $.step.parameters.body as string; const body = $.step.parameters.body as string;
if (!repoParameter) throw new Error('A repo must be set!') if (!repoParameter) throw new Error('A repo must be set!');
if (!title) throw new Error('A title must be set!'); if (!title) throw new Error('A title must be set!');
const { repoOwner, repo } = getRepoOwnerAndRepo(repoParameter); const { repoOwner, repo } = getRepoOwnerAndRepo(repoParameter);
@@ -68,13 +68,6 @@ export default defineAction({
body, body,
}); });
const issue: IActionOutput = { $.setActionItem({ raw: response.data });
data: {
raw: response.data,
},
error: response?.integrationError,
};
return issue;
}, },
}); });

View File

@@ -9,7 +9,7 @@ export default defineTrigger({
substeps: [ substeps: [
{ {
key: 'chooseConnection', key: 'chooseConnection',
name: 'Choose connection' name: 'Choose connection',
}, },
{ {
key: 'chooseTrigger', key: 'chooseTrigger',
@@ -27,20 +27,20 @@ export default defineTrigger({
arguments: [ arguments: [
{ {
name: 'key', name: 'key',
value: 'listRepos' value: 'listRepos',
} },
] ],
} },
} },
] ],
}, },
{ {
key: 'testStep', key: 'testStep',
name: 'Test trigger' name: 'Test trigger',
} },
], ],
async run($) { async run($) {
return await newPullRequests($); await newPullRequests($);
}, },
}); });

View File

@@ -1,7 +1,4 @@
import { import { IGlobalVariable, ITriggerOutput } from '@automatisch/types';
IGlobalVariable,
ITriggerOutput,
} from '@automatisch/types';
import getRepoOwnerAndRepo from '../../common/get-repo-owner-and-repo'; import getRepoOwnerAndRepo from '../../common/get-repo-owner-and-repo';
import parseLinkHeader from '../../../../helpers/parse-header-link'; import parseLinkHeader from '../../../../helpers/parse-header-link';
@@ -29,16 +26,15 @@ const fetchPullRequests = async ($: IGlobalVariable) => {
const response = await $.http.get(pathname, { params }); const response = await $.http.get(pathname, { params });
links = parseLinkHeader(response.headers.link); links = parseLinkHeader(response.headers.link);
if (response.integrationError) {
pullRequests.error = response.integrationError;
return pullRequests;
}
if (response.data.length) { if (response.data.length) {
for (const pullRequest of response.data) { for (const pullRequest of response.data) {
const pullRequestId = pullRequest.id; const pullRequestId = pullRequest.id;
if (pullRequestId <= Number($.flow.lastInternalId) && !$.execution.testRun) return pullRequests; if (
pullRequestId <= Number($.flow.lastInternalId) &&
!$.execution.testRun
)
return pullRequests;
const dataItem = { const dataItem = {
raw: pullRequest, raw: pullRequest,
@@ -53,7 +49,7 @@ const fetchPullRequests = async ($: IGlobalVariable) => {
} while (links.next && !$.execution.testRun); } while (links.next && !$.execution.testRun);
return pullRequests; return pullRequests;
} };
const newPullRequests = async ($: IGlobalVariable) => { const newPullRequests = async ($: IGlobalVariable) => {
const pullRequests = await fetchPullRequests($); const pullRequests = await fetchPullRequests($);

View File

@@ -44,11 +44,9 @@ export default defineTrigger({
await newStargazers($); await newStargazers($);
}, },
sort($) { sort(stargazerA, stargazerB) {
$.triggerOutput.data.sort((stargazerA, stargazerB) => { return (
return ( Number(stargazerA.meta.internalId) - Number(stargazerB.meta.internalId)
Number(stargazerA.meta.internalId) - Number(stargazerB.meta.internalId) );
);
});
}, },
}); });

View File

@@ -10,7 +10,7 @@ export default defineTrigger({
substeps: [ substeps: [
{ {
key: 'chooseConnection', key: 'chooseConnection',
name: 'Choose connection' name: 'Choose connection',
}, },
{ {
key: 'chooseTrigger', key: 'chooseTrigger',
@@ -28,20 +28,24 @@ export default defineTrigger({
arguments: [ arguments: [
{ {
name: 'key', name: 'key',
value: 'listRepos' value: 'listRepos',
} },
] ],
} },
}, },
] ],
}, },
{ {
key: 'testStep', key: 'testStep',
name: 'Test trigger' name: 'Test trigger',
} },
], ],
async run($) { async run($) {
return await newWatchers($); await newWatchers($);
},
sort() {
return -1;
}, },
}); });

View File

@@ -1,11 +1,8 @@
import { import { IGlobalVariable, ITriggerOutput } from '@automatisch/types';
IGlobalVariable,
ITriggerOutput,
} from '@automatisch/types';
import getRepoOwnerAndRepo from '../../common/get-repo-owner-and-repo'; import getRepoOwnerAndRepo from '../../common/get-repo-owner-and-repo';
import parseLinkHeader from '../../../../helpers/parse-header-link'; import parseLinkHeader from '../../../../helpers/parse-header-link';
const fetchWatchers = async ($: IGlobalVariable) => { const newWatchers = async ($: IGlobalVariable) => {
const repoParameter = $.step.parameters.repo as string; const repoParameter = $.step.parameters.repo as string;
if (!repoParameter) throw new Error('A repo must be set!'); if (!repoParameter) throw new Error('A repo must be set!');
@@ -15,9 +12,9 @@ const fetchWatchers = async ($: IGlobalVariable) => {
const firstPagePathname = `/repos/${repoOwner}/${repo}/subscribers`; const firstPagePathname = `/repos/${repoOwner}/${repo}/subscribers`;
const requestConfig = { const requestConfig = {
params: { params: {
per_page: 100 per_page: 100,
}, },
} };
const firstPageResponse = await $.http.get(firstPagePathname, requestConfig); const firstPageResponse = await $.http.get(firstPagePathname, requestConfig);
const firstPageLinks = parseLinkHeader(firstPageResponse.headers.link); const firstPageLinks = parseLinkHeader(firstPageResponse.headers.link);
@@ -25,20 +22,11 @@ const fetchWatchers = async ($: IGlobalVariable) => {
// in case there is only single page to fetch // in case there is only single page to fetch
let pathname = firstPageLinks.last?.uri || firstPagePathname; let pathname = firstPageLinks.last?.uri || firstPagePathname;
const watchers: ITriggerOutput = {
data: [],
};
do { do {
const response = await $.http.get(pathname, requestConfig); const response = await $.http.get(pathname, requestConfig);
const links = parseLinkHeader(response.headers.link); const links = parseLinkHeader(response.headers.link);
pathname = links.prev?.uri; pathname = links.prev?.uri;
if (response.integrationError) {
watchers.error = response.integrationError;
return watchers;
}
if (response.data.length) { if (response.data.length) {
// to iterate reverse-chronologically // to iterate reverse-chronologically
response.data.reverse(); response.data.reverse();
@@ -46,7 +34,8 @@ const fetchWatchers = async ($: IGlobalVariable) => {
for (const watcher of response.data) { for (const watcher of response.data) {
const watcherId = watcher.id.toString(); const watcherId = watcher.id.toString();
if ($.flow.isAlreadyProcessed(watcherId) && !$.execution.testRun) return watchers; if ($.flow.isAlreadyProcessed(watcherId) && !$.execution.testRun)
return;
const dataItem = { const dataItem = {
raw: watcher, raw: watcher,
@@ -55,21 +44,10 @@ const fetchWatchers = async ($: IGlobalVariable) => {
}, },
}; };
watchers.data.push(dataItem); $.pushTriggerItem(dataItem);
} }
} }
} while (pathname && !$.execution.testRun === false); } while (pathname && !$.execution.testRun === false);
return watchers;
}
const newWatchers = async ($: IGlobalVariable) => {
const watchers = await fetchWatchers($);
// to process chronologically
watchers.data.reverse();
return watchers;
}; };
export default newWatchers; export default newWatchers;

View File

@@ -1,8 +1,4 @@
import { import { IGlobalVariable, IJSONObject } from '@automatisch/types';
IGlobalVariable,
IJSONObject,
ITriggerOutput,
} from '@automatisch/types';
import { URLSearchParams } from 'url'; import { URLSearchParams } from 'url';
import { omitBy, isEmpty } from 'lodash'; import { omitBy, isEmpty } from 'lodash';
@@ -16,10 +12,6 @@ const getUserFollowers = async (
) => { ) => {
let response; let response;
const followers: ITriggerOutput = {
data: [],
};
do { do {
const params: IJSONObject = { const params: IJSONObject = {
pagination_token: response?.data?.meta?.next_token, pagination_token: response?.data?.meta?.next_token,
@@ -40,18 +32,16 @@ const getUserFollowers = async (
if (response.data.meta.result_count > 0) { if (response.data.meta.result_count > 0) {
for (const follower of response.data.data) { for (const follower of response.data.data) {
if ($.flow.isAlreadyProcessed(follower.id as string)) { if ($.flow.isAlreadyProcessed(follower.id as string)) {
return followers; return;
} }
followers.data.push({ $.pushTriggerItem({
raw: follower, raw: follower,
meta: { internalId: follower.id as string }, meta: { internalId: follower.id as string },
}); });
} }
} }
} while (response.data.meta.next_token && !$.execution.testRun); } while (response.data.meta.next_token && !$.execution.testRun);
return followers;
}; };
export default getUserFollowers; export default getUserFollowers;

View File

@@ -21,9 +21,7 @@ export default defineTrigger({
await getUserTweets($, { currentUser: true }); await getUserTweets($, { currentUser: true });
}, },
sort($) { sort(tweet, nextTweet) {
$.triggerOutput.data.sort((tweet, nextTweet) => { return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId);
return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId);
});
}, },
}); });

View File

@@ -19,6 +19,6 @@ export default defineTrigger({
], ],
async run($) { async run($) {
return await myFollowers($); await myFollowers($);
}, },
}); });

View File

@@ -34,9 +34,7 @@ export default defineTrigger({
await searchTweets($); await searchTweets($);
}, },
sort($) { sort(tweet, nextTweet) {
$.triggerOutput.data.sort((tweet, nextTweet) => { return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId);
return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId);
});
}, },
}); });

View File

@@ -33,9 +33,7 @@ export default defineTrigger({
await getUserTweets($, { currentUser: false }); await getUserTweets($, { currentUser: false });
}, },
sort($) { sort(tweet, nextTweet) {
$.triggerOutput.data.sort((tweet, nextTweet) => { return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId);
return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId);
});
}, },
}); });

View File

@@ -3,7 +3,13 @@ import Connection from '../models/connection';
import Flow from '../models/flow'; import Flow from '../models/flow';
import Step from '../models/step'; import Step from '../models/step';
import Execution from '../models/execution'; import Execution from '../models/execution';
import { IJSONObject, IApp, IGlobalVariable } from '@automatisch/types'; import {
IJSONObject,
IApp,
IGlobalVariable,
ITriggerItem,
IActionItem,
} from '@automatisch/types';
type GlobalVariableOptions = { type GlobalVariableOptions = {
connection?: Connection; connection?: Connection;
@@ -61,11 +67,15 @@ const globalVariable = async (
}, },
triggerOutput: { triggerOutput: {
data: [], data: [],
error: null,
}, },
actionOutput: { actionOutput: {
data: null, data: null,
error: null, },
pushTriggerItem: (triggerItem: ITriggerItem) => {
$.triggerOutput.data.push(triggerItem);
},
setActionItem: (actionItem: IActionItem) => {
$.actionOutput.data = actionItem;
}, },
}; };

View File

@@ -39,16 +39,29 @@ export const processAction = async (options: ProcessActionOptions) => {
const actionCommand = await step.getActionCommand(); const actionCommand = await step.getActionCommand();
$.step.parameters = computedParameters; $.step.parameters = computedParameters;
const actionOutput = await actionCommand.run($);
try {
await actionCommand.run($);
} catch (error) {
if (error?.response?.httpError) {
$.actionOutput.error = error.response.httpError;
} else {
try {
$.actionOutput.error = JSON.parse(error.message);
} catch {
$.actionOutput.error = { error: error.message };
}
}
}
const executionStep = await execution const executionStep = await execution
.$relatedQuery('executionSteps') .$relatedQuery('executionSteps')
.insertAndFetch({ .insertAndFetch({
stepId: $.step.id, stepId: $.step.id,
status: actionOutput.error ? 'failure' : 'success', status: $.actionOutput.error ? 'failure' : 'success',
dataIn: computedParameters, dataIn: computedParameters,
dataOut: actionOutput.error ? null : actionOutput.data?.raw, dataOut: $.actionOutput.error ? null : $.actionOutput.data?.raw,
errorDetails: actionOutput.error ? actionOutput.error : null, errorDetails: $.actionOutput.error ? $.actionOutput.error : null,
}); });
return { flowId, stepId, executionId, executionStep }; return { flowId, stepId, executionId, executionStep };

View File

@@ -35,7 +35,7 @@ export const processFlow = async (options: ProcessFlowOptions) => {
} }
if (triggerCommand?.sort) { if (triggerCommand?.sort) {
triggerCommand.sort($); $.triggerOutput.data.sort(triggerCommand.sort);
} }
return $.triggerOutput; return $.triggerOutput;

View File

@@ -35,13 +35,13 @@ const testRun = async (options: TestRunOptions) => {
return { executionStep: triggerExecutionStepWithError }; return { executionStep: triggerExecutionStepWithError };
} }
const firstTriggerDataItem = data[0]; const firstTriggerItem = data[0];
const { executionId, executionStep: triggerExecutionStep } = const { executionId, executionStep: triggerExecutionStep } =
await processTrigger({ await processTrigger({
flowId: flow.id, flowId: flow.id,
stepId: triggerStep.id, stepId: triggerStep.id,
triggerDataItem: firstTriggerDataItem, triggerItem: firstTriggerItem,
testRun: true, testRun: true,
}); });

View File

@@ -1,4 +1,4 @@
import { IJSONObject, ITriggerDataItem } from '@automatisch/types'; import { IJSONObject, ITriggerItem } from '@automatisch/types';
import Step from '../models/step'; import Step from '../models/step';
import Flow from '../models/flow'; import Flow from '../models/flow';
import Execution from '../models/execution'; import Execution from '../models/execution';
@@ -7,13 +7,13 @@ import globalVariable from '../helpers/global-variable';
type ProcessTriggerOptions = { type ProcessTriggerOptions = {
flowId: string; flowId: string;
stepId: string; stepId: string;
triggerDataItem?: ITriggerDataItem; triggerItem?: ITriggerItem;
error?: IJSONObject; error?: IJSONObject;
testRun?: boolean; testRun?: boolean;
}; };
export const processTrigger = async (options: ProcessTriggerOptions) => { export const processTrigger = async (options: ProcessTriggerOptions) => {
const { flowId, stepId, triggerDataItem, error, testRun } = options; const { flowId, stepId, triggerItem, error, testRun } = options;
const step = await Step.query().findById(stepId).throwIfNotFound(); const step = await Step.query().findById(stepId).throwIfNotFound();
@@ -29,7 +29,7 @@ export const processTrigger = async (options: ProcessTriggerOptions) => {
const execution = await Execution.query().insert({ const execution = await Execution.query().insert({
flowId: $.flow.id, flowId: $.flow.id,
testRun, testRun,
internalId: triggerDataItem?.meta.internalId, internalId: triggerItem?.meta.internalId,
}); });
const executionStep = await execution const executionStep = await execution
@@ -38,7 +38,7 @@ export const processTrigger = async (options: ProcessTriggerOptions) => {
stepId: $.step.id, stepId: $.step.id,
status: error ? 'failure' : 'success', status: error ? 'failure' : 'success',
dataIn: $.step.parameters, dataIn: $.step.parameters,
dataOut: !error ? triggerDataItem?.raw : null, dataOut: !error ? triggerItem?.raw : null,
errorDetails: error, errorDetails: error,
}); });

View File

@@ -15,13 +15,15 @@ export const worker = new Worker(
const { data, error } = await processFlow({ flowId }); const { data, error } = await processFlow({ flowId });
for (const triggerDataItem of data) { const reversedData = data.reverse();
const jobName = `${triggerStep.id}-${triggerDataItem.meta.internalId}`;
for (const triggerItem of reversedData) {
const jobName = `${triggerStep.id}-${triggerItem.meta.internalId}`;
const jobPayload = { const jobPayload = {
flowId, flowId,
stepId: triggerStep.id, stepId: triggerStep.id,
triggerDataItem, triggerItem,
}; };
await triggerQueue.add(jobName, jobPayload); await triggerQueue.add(jobName, jobPayload);

View File

@@ -1,7 +1,7 @@
import { Worker } from 'bullmq'; import { Worker } from 'bullmq';
import redisConfig from '../config/redis'; import redisConfig from '../config/redis';
import logger from '../helpers/logger'; import logger from '../helpers/logger';
import { IJSONObject, ITriggerDataItem } from '@automatisch/types'; import { IJSONObject, ITriggerItem } from '@automatisch/types';
import actionQueue from '../queues/action'; import actionQueue from '../queues/action';
import Step from '../models/step'; import Step from '../models/step';
import { processTrigger } from '../services/trigger'; import { processTrigger } from '../services/trigger';
@@ -9,7 +9,7 @@ import { processTrigger } from '../services/trigger';
type JobData = { type JobData = {
flowId: string; flowId: string;
stepId: string; stepId: string;
triggerDataItem?: ITriggerDataItem; triggerItem?: ITriggerItem;
error?: IJSONObject; error?: IJSONObject;
}; };

View File

@@ -194,11 +194,11 @@ export interface IService {
} }
export interface ITriggerOutput { export interface ITriggerOutput {
data: ITriggerDataItem[]; data: ITriggerItem[];
error?: IJSONObject; error?: IJSONObject;
} }
export interface ITriggerDataItem { export interface ITriggerItem {
raw: IJSONObject; raw: IJSONObject;
meta: { meta: {
internalId: string; internalId: string;
@@ -212,17 +212,17 @@ export interface ITrigger {
description: string; description: string;
dedupeStrategy?: 'greatest' | 'unique' | 'last'; dedupeStrategy?: 'greatest' | 'unique' | 'last';
substeps: ISubstep[]; substeps: ISubstep[];
getInterval?(parameters: IGlobalVariable['step']['parameters']): string; getInterval?(parameters: IStep['parameters']): string;
run($: IGlobalVariable): Promise<void | ITriggerOutput>; run($: IGlobalVariable): Promise<void>;
sort?($: IGlobalVariable): void | ITriggerOutput; sort?(item: ITriggerItem, nextItem: ITriggerItem): number;
} }
export interface IActionOutput { export interface IActionOutput {
data: IActionDataItem; data: IActionItem;
error?: IJSONObject; error?: IJSONObject;
} }
export interface IActionDataItem { export interface IActionItem {
raw: { raw: {
data?: IJSONObject; data?: IJSONObject;
}; };
@@ -233,7 +233,7 @@ export interface IAction {
key: string; key: string;
description: string; description: string;
substeps: ISubstep[]; substeps: ISubstep[];
run($: IGlobalVariable): Promise<void | IActionOutput>; run($: IGlobalVariable): Promise<void>;
} }
export interface IAuthentication { export interface IAuthentication {
@@ -282,7 +282,8 @@ export type IGlobalVariable = {
}; };
triggerOutput?: ITriggerOutput; triggerOutput?: ITriggerOutput;
actionOutput?: IActionOutput; actionOutput?: IActionOutput;
process?: (triggerDataItem: ITriggerDataItem) => Promise<void>; pushTriggerItem?: (triggerItem: ITriggerItem) => void;
setActionItem?: (actionItem: IActionItem) => void;
}; };
declare module 'axios' { declare module 'axios' {