Merge pull request #655 from automatisch/refactor/integration-structure

Refactor integration structure
This commit is contained in:
Ömer Faruk Aydın
2022-10-28 20:14:18 +02:00
committed by GitHub
30 changed files with 103 additions and 145 deletions

View File

@@ -6,16 +6,15 @@ export default defineTrigger({
pollInterval: 15,
key: 'new-albums',
description: 'Triggers when you create a new album.',
dedupeStrategy: 'greatest',
substeps: [
{
key: 'chooseConnection',
name: 'Choose connection'
name: 'Choose connection',
},
{
key: 'testStep',
name: 'Test trigger'
}
name: 'Test trigger',
},
],
async run($) {

View File

@@ -19,7 +19,7 @@ const extraFields = [
'url_t',
'url_s',
'url_m',
'url_o'
'url_o',
].join(',');
const newAlbums = async ($: IGlobalVariable) => {
@@ -42,17 +42,14 @@ const newAlbums = async ($: IGlobalVariable) => {
pages = photosets.pages;
for (const photoset of photosets.photoset) {
if ($.flow.isAlreadyProcessed(photoset.id) && !$.execution.testRun)
return;
$.pushTriggerItem({
raw: photoset,
meta: {
internalId: photoset.id as string
}
})
internalId: photoset.id as string,
},
});
}
} while (page <= pages && !$.execution.testRun);
} while (page <= pages);
};
export default newAlbums;

View File

@@ -6,16 +6,15 @@ export default defineTrigger({
pollInterval: 15,
key: 'newFavoritePhotos',
description: 'Triggers when you favorite a photo.',
dedupeStrategy: 'unique',
substeps: [
{
key: 'chooseConnection',
name: 'Choose connection'
name: 'Choose connection',
},
{
key: 'testStep',
name: 'Test trigger'
}
name: 'Test trigger',
},
],
async run($) {

View File

@@ -48,17 +48,14 @@ const newPhotos = async ($: IGlobalVariable) => {
pages = photos.pages;
for (const photo of photos.photo) {
if ($.flow.isAlreadyProcessed(photo.date_faved) && !$.execution.testRun)
return;
$.pushTriggerItem({
raw: photo,
meta: {
internalId: photo.date_faved as string
}
})
internalId: photo.date_faved as string,
},
});
}
} while (page <= pages && !$.execution.testRun);
} while (page <= pages);
};
export default newPhotos;

View File

@@ -6,11 +6,10 @@ export default defineTrigger({
pollInterval: 15,
key: 'newPhotosInAlbum',
description: 'Triggers when you add a new photo in an album.',
dedupeStrategy: 'greatest',
substeps: [
{
key: 'chooseConnection',
name: 'Choose connection'
name: 'Choose connection',
},
{
key: 'chooseTrigger',
@@ -28,17 +27,17 @@ export default defineTrigger({
arguments: [
{
name: 'key',
value: 'listAlbums'
}
]
}
}
]
value: 'listAlbums',
},
],
},
},
],
},
{
key: 'testStep',
name: 'Test trigger'
}
name: 'Test trigger',
},
],
async run($) {

View File

@@ -43,17 +43,14 @@ const newPhotosInAlbum = async ($: IGlobalVariable) => {
pages = photoset.pages;
for (const photo of photoset.photo) {
if ($.flow.isAlreadyProcessed(photo.id) && !$.execution.testRun)
return;
$.pushTriggerItem({
raw: photo,
meta: {
internalId: photo.id as string
}
})
internalId: photo.id as string,
},
});
}
} while (page <= pages && !$.execution.testRun);
} while (page <= pages);
};
export default newPhotosInAlbum;

View File

@@ -6,16 +6,15 @@ export default defineTrigger({
pollInterval: 15,
key: 'newPhotos',
description: 'Triggers when you add a new photo.',
dedupeStrategy: 'greatest',
substeps: [
{
key: 'chooseConnection',
name: 'Choose connection'
name: 'Choose connection',
},
{
key: 'testStep',
name: 'Test trigger'
}
name: 'Test trigger',
},
],
async run($) {

View File

@@ -48,17 +48,14 @@ const newPhotos = async ($: IGlobalVariable) => {
pages = photos.pages;
for (const photo of photos.photo) {
if ($.flow.isAlreadyProcessed(photo.id) && !$.execution.testRun)
return;
$.pushTriggerItem({
raw: photo,
meta: {
internalId: photo.id as string
}
})
internalId: photo.id as string,
},
});
}
} while (page <= pages && !$.execution.testRun);
} while (page <= pages);
};
export default newPhotos;

View File

@@ -34,9 +34,6 @@ const newIssues = async ($: IGlobalVariable) => {
for (const issue of response.data) {
const issueId = issue.id;
if (issueId <= Number($.flow.lastInternalId) && !$.execution.testRun)
return;
const dataItem = {
raw: issue,
meta: {
@@ -47,7 +44,7 @@ const newIssues = async ($: IGlobalVariable) => {
$.pushTriggerItem(dataItem);
}
}
} while (links.next && !$.execution.testRun);
} while (links.next);
};
export default newIssues;

View File

@@ -26,12 +26,6 @@ const newPullRequests = async ($: IGlobalVariable) => {
for (const pullRequest of response.data) {
const pullRequestId = pullRequest.id;
if (
pullRequestId <= Number($.flow.lastInternalId) &&
!$.execution.testRun
)
return;
const dataItem = {
raw: pullRequest,
meta: {
@@ -42,7 +36,7 @@ const newPullRequests = async ($: IGlobalVariable) => {
$.pushTriggerItem(dataItem);
}
}
} while (links.next && !$.execution.testRun);
} while (links.next);
};
export default newPullRequests;

View File

@@ -43,10 +43,4 @@ export default defineTrigger({
async run($) {
await newStargazers($);
},
sort(stargazerA, stargazerB) {
return (
Number(stargazerB.meta.internalId) - Number(stargazerA.meta.internalId)
);
},
});

View File

@@ -48,9 +48,6 @@ const newStargazers = async ($: IGlobalVariable) => {
const { starred_at, user } = starEntry;
const timestamp = DateTime.fromISO(starred_at).toMillis();
if (timestamp <= Number($.flow.lastInternalId) && !$.execution.testRun)
return;
const dataItem = {
raw: user,
meta: {
@@ -61,7 +58,7 @@ const newStargazers = async ($: IGlobalVariable) => {
$.pushTriggerItem(dataItem);
}
}
} while (pathname && !$.execution.testRun);
} while (pathname);
};
export default newStargazers;

View File

@@ -5,7 +5,6 @@ export default defineTrigger({
name: 'New watchers',
key: 'newWatchers',
pollInterval: 15,
dedupeStrategy: 'unique',
description: 'Triggers when a user watches a repository',
substeps: [
{
@@ -44,8 +43,4 @@ export default defineTrigger({
async run($) {
await newWatchers($);
},
sort() {
return -1;
},
});

View File

@@ -1,4 +1,4 @@
import { IGlobalVariable, ITriggerOutput } from '@automatisch/types';
import { IGlobalVariable } from '@automatisch/types';
import getRepoOwnerAndRepo from '../../common/get-repo-owner-and-repo';
import parseLinkHeader from '../../../../helpers/parse-header-link';
@@ -34,9 +34,6 @@ const newWatchers = async ($: IGlobalVariable) => {
for (const watcher of response.data) {
const watcherId = watcher.id.toString();
if ($.flow.isAlreadyProcessed(watcherId) && !$.execution.testRun)
return;
const dataItem = {
raw: watcher,
meta: {
@@ -47,7 +44,7 @@ const newWatchers = async ($: IGlobalVariable) => {
$.pushTriggerItem(dataItem);
}
}
} while (pathname && !$.execution.testRun);
} while (pathname);
};
export default newWatchers;

View File

@@ -1,5 +1,5 @@
import { IGlobalVariable } from '@automatisch/types';
import { XMLParser } from 'fast-xml-parser';[]
import { XMLParser } from 'fast-xml-parser';
const newItemsInFeed = async ($: IGlobalVariable) => {
const { data } = await $.http.get($.step.parameters.feedUrl as string);
@@ -7,15 +7,12 @@ const newItemsInFeed = async ($: IGlobalVariable) => {
const parsedData = parser.parse(data);
for (const item of parsedData.rss.channel.item) {
if ($.flow.isAlreadyProcessed(item.guid))
return;
const dataItem = {
raw: item,
meta: {
internalId: item.guid
}
}
internalId: item.guid,
},
};
$.pushTriggerItem(dataItem);
}

View File

@@ -1,4 +1,4 @@
import { IGlobalVariable, IActionOutput } from '@automatisch/types';
import { IGlobalVariable } from '@automatisch/types';
type FindMessageOptions = {
query: string;

View File

@@ -31,17 +31,13 @@ const getUserFollowers = async (
if (response.data.meta.result_count > 0) {
for (const follower of response.data.data) {
if ($.flow.isAlreadyProcessed(follower.id as string)) {
return;
}
$.pushTriggerItem({
raw: follower,
meta: { internalId: follower.id as string },
});
}
}
} while (response.data.meta.next_token && !$.execution.testRun);
} while (response.data.meta.next_token);
};
export default getUserFollowers;

View File

@@ -16,7 +16,7 @@ const fetchTweets = async ($: IGlobalVariable, username: string) => {
do {
const params: IJSONObject = {
since_id: $.execution.testRun ? null : $.flow.lastInternalId,
since_id: $.flow.lastInternalId,
pagination_token: response?.data?.meta?.next_token,
};
@@ -40,7 +40,7 @@ const fetchTweets = async ($: IGlobalVariable, username: string) => {
$.pushTriggerItem(dataItem);
});
}
} while (response.data.meta.next_token && !$.execution.testRun);
} while (response.data.meta.next_token);
return $.triggerOutput;
};

View File

@@ -20,8 +20,4 @@ export default defineTrigger({
async run($) {
await getUserTweets($, { currentUser: true });
},
sort(tweet, nextTweet) {
return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId);
},
});

View File

@@ -6,7 +6,6 @@ export default defineTrigger({
key: 'myFollowers',
pollInterval: 15,
description: 'Will be triggered when you have a new follower.',
dedupeStrategy: 'unique',
substeps: [
{
key: 'chooseConnection',

View File

@@ -33,8 +33,4 @@ export default defineTrigger({
async run($) {
await searchTweets($);
},
sort(tweet, nextTweet) {
return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId);
},
});

View File

@@ -10,7 +10,7 @@ const searchTweets = async ($: IGlobalVariable) => {
do {
const params: IJSONObject = {
query: searchTerm,
since_id: $.execution.testRun ? null : $.flow.lastInternalId,
since_id: $.flow.lastInternalId,
pagination_token: response?.data?.meta?.next_token,
};
@@ -38,7 +38,7 @@ const searchTweets = async ($: IGlobalVariable) => {
$.pushTriggerItem(dataItem);
});
}
} while (response.data.meta.next_token && !$.execution.testRun);
} while (response.data.meta.next_token);
};
export default searchTweets;

View File

@@ -32,8 +32,4 @@ export default defineTrigger({
async run($) {
await getUserTweets($, { currentUser: false });
},
sort(tweet, nextTweet) {
return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId);
},
});

View File

@@ -0,0 +1,17 @@
import { IJSONObject } from '@automatisch/types';
export default class BaseError extends Error {
error = {};
constructor(error?: string | IJSONObject) {
super();
try {
this.error = JSON.parse(error as string);
} catch {
this.error = typeof error === 'string' ? { error } : error;
}
this.name = this.constructor.name;
}
}

View File

@@ -0,0 +1,3 @@
import BaseError from './base';
export default class EarlyExitError extends BaseError {}

View File

@@ -0,0 +1,3 @@
import BaseError from './base';
export default class HttpError extends BaseError {}

View File

@@ -10,6 +10,7 @@ import {
ITriggerItem,
IActionItem,
} from '@automatisch/types';
import EarlyExitError from '../errors/early-exit';
type GlobalVariableOptions = {
connection?: Connection;
@@ -25,9 +26,7 @@ const globalVariable = async (
): Promise<IGlobalVariable> => {
const { connection, app, flow, step, execution, testRun = false } = options;
const lastInternalId = await flow?.lastInternalId();
const trigger = await step?.getTriggerCommand();
const lastInternalId = testRun ? undefined : await flow?.lastInternalId();
const nextStep = await step?.getNextStep();
const $: IGlobalVariable = {
@@ -74,7 +73,17 @@ const globalVariable = async (
},
},
pushTriggerItem: (triggerItem: ITriggerItem) => {
if (isAlreadyProcessed(triggerItem.meta.internalId) && !$.execution.testRun) {
// early exit as we do not want to process duplicate items in actual executions
throw new EarlyExitError();
}
$.triggerOutput.data.push(triggerItem);
if ($.execution.testRun) {
// early exit after receiving one item as it is enough for test execution
throw new EarlyExitError();
}
},
setActionItem: (actionItem: IActionItem) => {
$.actionOutput.data = actionItem;
@@ -87,27 +96,12 @@ const globalVariable = async (
beforeRequest: app.beforeRequest,
});
if (trigger) {
if (trigger.dedupeStrategy === 'unique') {
const lastInternalIds = testRun ? [] : await flow?.lastInternalIds();
const lastInternalIds =
testRun || (flow && step.isAction) ? [] : await flow?.lastInternalIds(2000);
const isAlreadyProcessed = (internalId: string) => {
if (testRun) return false;
return lastInternalIds?.includes(internalId);
};
$.flow.isAlreadyProcessed = isAlreadyProcessed;
} else if (trigger.dedupeStrategy === 'greatest') {
const isAlreadyProcessed = (internalId: string) => {
if (testRun) return false;
return Number(internalId) <= Number($.flow.lastInternalId);
};
$.flow.isAlreadyProcessed = isAlreadyProcessed;
}
}
const isAlreadyProcessed = (internalId: string) => {
return lastInternalIds?.includes(internalId);
};
return $;
};

View File

@@ -2,6 +2,7 @@ import axios, { AxiosRequestConfig } from 'axios';
export { AxiosInstance as IHttpClient } from 'axios';
import { IHttpClientParams } from '@automatisch/types';
import { URL } from 'url';
import HttpError from '../../errors/http-error';
const removeBaseUrlForAbsoluteUrls = (
requestConfig: AxiosRequestConfig
@@ -39,8 +40,7 @@ export default function createHttpClient({
instance.interceptors.response.use(
(response) => response,
(error) => {
error.response.httpError = error.response.data;
throw error;
throw new HttpError(error.response.data);
}
);

View File

@@ -1,5 +1,7 @@
import Flow from '../models/flow';
import globalVariable from '../helpers/global-variable';
import EarlyExitError from '../errors/early-exit';
import HttpError from '../errors/http-error';
type ProcessFlowOptions = {
flowId: string;
@@ -23,13 +25,15 @@ export const processFlow = async (options: ProcessFlowOptions) => {
try {
await triggerCommand.run($);
} catch (error) {
if (error?.response?.httpError) {
$.triggerOutput.error = error.response.httpError;
} else {
try {
$.triggerOutput.error = JSON.parse(error.message);
} catch {
$.triggerOutput.error = { error: error.message };
if (error instanceof EarlyExitError === false) {
if (error instanceof HttpError) {
$.triggerOutput.error = error.error;
} else {
try {
$.triggerOutput.error = JSON.parse(error.message);
} catch {
$.triggerOutput.error = { error: error.message };
}
}
}
}

View File

@@ -210,7 +210,6 @@ export interface ITrigger {
key: string;
pollInterval?: number;
description: string;
dedupeStrategy?: 'greatest' | 'unique' | 'last';
substeps: ISubstep[];
getInterval?(parameters: IStep['parameters']): string;
run($: IGlobalVariable): Promise<void>;