Merge pull request #632 from automatisch/feature/extended-error-handler

Extend error handling logic to capture errors
This commit is contained in:
Ömer Faruk Aydın
2022-10-22 23:11:02 +02:00
committed by GitHub
40 changed files with 254 additions and 609 deletions

View File

@@ -27,25 +27,25 @@ export default defineAction({
arguments: [
{
name: 'key',
value: 'listRepos'
}
]
}
value: 'listRepos',
},
],
},
},
{
label: 'Title',
key: 'title',
type: 'string',
required: true,
variables: true
variables: true,
},
{
label: 'Body',
key: 'body',
type: 'string',
required: true,
variables: true
}
variables: true,
},
],
},
{
@@ -59,7 +59,7 @@ export default defineAction({
const title = $.step.parameters.title as string;
const body = $.step.parameters.body as string;
if (!repoParameter) throw new Error('A repo must be set!')
if (!repoParameter) throw new Error('A repo must be set!');
if (!title) throw new Error('A title must be set!');
const { repoOwner, repo } = getRepoOwnerAndRepo(repoParameter);
@@ -68,13 +68,6 @@ export default defineAction({
body,
});
const issue: IActionOutput = {
data: {
raw: response.data,
},
error: response?.integrationError,
};
return issue;
$.setActionItem({ raw: response.data });
},
});

View File

@@ -1,13 +1,16 @@
import { IGlobalVariable, IJSONObject } from "@automatisch/types";
import { IGlobalVariable, IJSONObject } from '@automatisch/types';
import type { AxiosResponse } from 'axios';
import parseLinkHeader from '../../../helpers/parse-header-link';
type TResponse = {
data: IJSONObject[],
error?: IJSONObject,
}
data: IJSONObject[];
error?: IJSONObject;
};
export default async function paginateAll($: IGlobalVariable, request: Promise<AxiosResponse>) {
export default async function paginateAll(
$: IGlobalVariable,
request: Promise<AxiosResponse>
) {
const response = await request;
const aggregatedResponse: TResponse = {
data: [...response.data],
@@ -21,15 +24,8 @@ export default async function paginateAll($: IGlobalVariable, request: Promise<A
url: links.next.uri,
});
if (nextPageResponse.integrationError) {
aggregatedResponse.error = nextPageResponse.integrationError;
links = null;
} else {
aggregatedResponse.data.push(...nextPageResponse.data);
links = parseLinkHeader(nextPageResponse.headers.link);
}
aggregatedResponse.data.push(...nextPageResponse.data);
links = parseLinkHeader(nextPageResponse.headers.link);
}
return aggregatedResponse;

View File

@@ -9,7 +9,7 @@ export default defineTrigger({
substeps: [
{
key: 'chooseConnection',
name: 'Choose connection'
name: 'Choose connection',
},
{
key: 'chooseTrigger',
@@ -27,10 +27,10 @@ export default defineTrigger({
arguments: [
{
name: 'key',
value: 'listRepos'
}
]
}
value: 'listRepos',
},
],
},
},
{
label: 'Which types of issues should this trigger on?',
@@ -43,25 +43,25 @@ export default defineTrigger({
options: [
{
label: 'Any issue you can see',
value: 'all'
value: 'all',
},
{
label: 'Only issues assigned to you',
value: 'assigned'
value: 'assigned',
},
{
label: 'Only issues created by you',
value: 'created'
value: 'created',
},
{
label: `Only issues you're mentioned in`,
value: 'mentioned'
value: 'mentioned',
},
{
label: `Only issues you're subscribed to`,
value: 'subscribed'
}
]
value: 'subscribed',
},
],
},
{
label: 'Label',
@@ -77,24 +77,24 @@ export default defineTrigger({
arguments: [
{
name: 'key',
value: 'listLabels'
value: 'listLabels',
},
{
name: 'parameters.repo',
value: '{parameters.repo}'
}
]
}
}
]
value: '{parameters.repo}',
},
],
},
},
],
},
{
key: 'testStep',
name: 'Test trigger'
}
name: 'Test trigger',
},
],
async run($) {
return await newIssues($);
await newIssues($);
},
});

View File

@@ -1,12 +1,11 @@
import {
IGlobalVariable,
ITriggerOutput,
} from '@automatisch/types';
import { IGlobalVariable } from '@automatisch/types';
import getRepoOwnerAndRepo from '../../common/get-repo-owner-and-repo';
import parseLinkHeader from '../../../../helpers/parse-header-link';
function getPathname($: IGlobalVariable) {
const { repoOwner, repo } = getRepoOwnerAndRepo($.step.parameters.repo as string);
const { repoOwner, repo } = getRepoOwnerAndRepo(
$.step.parameters.repo as string
);
if (repoOwner && repo) {
return `/repos/${repoOwner}/${repo}/issues`;
@@ -26,25 +25,17 @@ const newIssues = async ($: IGlobalVariable) => {
per_page: 100,
};
const issues: ITriggerOutput = {
data: [],
};
let links;
do {
const response = await $.http.get(pathname, { params });
links = parseLinkHeader(response.headers.link);
if (response.integrationError) {
issues.error = response.integrationError;
return issues;
}
if (response.data.length) {
for (const issue of response.data) {
const issueId = issue.id;
if (issueId <= Number($.flow.lastInternalId) && !$.execution.testRun) return issues;
if (issueId <= Number($.flow.lastInternalId) && !$.execution.testRun)
return;
const dataItem = {
raw: issue,
@@ -53,12 +44,10 @@ const newIssues = async ($: IGlobalVariable) => {
},
};
issues.data.push(dataItem);
$.triggerOutput.data.push(dataItem);
}
}
} while (links.next && !$.execution.testRun);
return issues;
};
export default newIssues;

View File

@@ -9,7 +9,7 @@ export default defineTrigger({
substeps: [
{
key: 'chooseConnection',
name: 'Choose connection'
name: 'Choose connection',
},
{
key: 'chooseTrigger',
@@ -27,20 +27,20 @@ export default defineTrigger({
arguments: [
{
name: 'key',
value: 'listRepos'
}
]
}
}
]
value: 'listRepos',
},
],
},
},
],
},
{
key: 'testStep',
name: 'Test trigger'
}
name: 'Test trigger',
},
],
async run($) {
return await newPullRequests($);
await newPullRequests($);
},
});

View File

@@ -1,11 +1,8 @@
import {
IGlobalVariable,
ITriggerOutput,
} from '@automatisch/types';
import { IGlobalVariable } from '@automatisch/types';
import getRepoOwnerAndRepo from '../../common/get-repo-owner-and-repo';
import parseLinkHeader from '../../../../helpers/parse-header-link';
const fetchPullRequests = async ($: IGlobalVariable) => {
const newPullRequests = async ($: IGlobalVariable) => {
const repoParameter = $.step.parameters.repo as string;
if (!repoParameter) throw new Error('A repo must be set!');
@@ -20,25 +17,20 @@ const fetchPullRequests = async ($: IGlobalVariable) => {
per_page: 100,
};
const pullRequests: ITriggerOutput = {
data: [],
};
let links;
do {
const response = await $.http.get(pathname, { params });
links = parseLinkHeader(response.headers.link);
if (response.integrationError) {
pullRequests.error = response.integrationError;
return pullRequests;
}
if (response.data.length) {
for (const pullRequest of response.data) {
const pullRequestId = pullRequest.id;
if (pullRequestId <= Number($.flow.lastInternalId) && !$.execution.testRun) return pullRequests;
if (
pullRequestId <= Number($.flow.lastInternalId) &&
!$.execution.testRun
)
return;
const dataItem = {
raw: pullRequest,
@@ -47,20 +39,10 @@ const fetchPullRequests = async ($: IGlobalVariable) => {
},
};
pullRequests.data.push(dataItem);
$.pushTriggerItem(dataItem);
}
}
} while (links.next && !$.execution.testRun);
return pullRequests;
}
const newPullRequests = async ($: IGlobalVariable) => {
const pullRequests = await fetchPullRequests($);
pullRequests.data.reverse();
return pullRequests;
};
export default newPullRequests;

View File

@@ -9,7 +9,7 @@ export default defineTrigger({
substeps: [
{
key: 'chooseConnection',
name: 'Choose connection'
name: 'Choose connection',
},
{
key: 'chooseTrigger',
@@ -27,20 +27,26 @@ export default defineTrigger({
arguments: [
{
name: 'key',
value: 'listRepos'
}
]
}
value: 'listRepos',
},
],
},
},
]
],
},
{
key: 'testStep',
name: 'Test trigger'
}
name: 'Test trigger',
},
],
async run($) {
return await newStargazers($);
await newStargazers($);
},
sort(stargazerA, stargazerB) {
return (
Number(stargazerB.meta.internalId) - Number(stargazerA.meta.internalId)
);
},
});

View File

@@ -1,19 +1,17 @@
import { DateTime } from 'luxon';
import {
IGlobalVariable,
IJSONObject,
ITriggerOutput,
} from '@automatisch/types';
import { IGlobalVariable, IJSONObject } from '@automatisch/types';
import getRepoOwnerAndRepo from '../../common/get-repo-owner-and-repo';
import parseLinkHeader from '../../../../helpers/parse-header-link';
type TResponseDataItem = {
starred_at: string;
user: IJSONObject;
}
};
const fetchStargazers = async ($: IGlobalVariable) => {
const { repoOwner, repo } = getRepoOwnerAndRepo($.step.parameters.repo as string);
const newStargazers = async ($: IGlobalVariable) => {
const { repoOwner, repo } = getRepoOwnerAndRepo(
$.step.parameters.repo as string
);
const firstPagePathname = `/repos/${repoOwner}/${repo}/stargazers`;
const requestConfig = {
params: {
@@ -22,35 +20,33 @@ const fetchStargazers = async ($: IGlobalVariable) => {
headers: {
// needed to get `starred_at` time
Accept: 'application/vnd.github.star+json',
}
}
},
};
const firstPageResponse = await $.http.get<TResponseDataItem[]>(firstPagePathname, requestConfig);
const firstPageResponse = await $.http.get<TResponseDataItem[]>(
firstPagePathname,
requestConfig
);
const firstPageLinks = parseLinkHeader(firstPageResponse.headers.link);
// in case there is only single page to fetch
let pathname = firstPageLinks.last?.uri || firstPagePathname;
const stargazers: ITriggerOutput = {
data: [],
};
do {
const response = await $.http.get<TResponseDataItem[]>(pathname, requestConfig);
const response = await $.http.get<TResponseDataItem[]>(
pathname,
requestConfig
);
const links = parseLinkHeader(response.headers.link);
pathname = links.prev?.uri;
if (response.integrationError) {
stargazers.error = response.integrationError;
return stargazers;
}
if (response.data.length) {
for (const starEntry of response.data) {
const { starred_at, user } = starEntry;
const timestamp = DateTime.fromISO(starred_at).toMillis();
if (timestamp <= Number($.flow.lastInternalId) && !$.execution.testRun) return stargazers;
if (timestamp <= Number($.flow.lastInternalId) && !$.execution.testRun)
return;
const dataItem = {
raw: user,
@@ -59,22 +55,10 @@ const fetchStargazers = async ($: IGlobalVariable) => {
},
};
stargazers.data.push(dataItem);
$.triggerOutput.data.push(dataItem);
}
}
} while (pathname && !$.execution.testRun);
return stargazers;
}
const newStargazers = async ($: IGlobalVariable) => {
const stargazers = await fetchStargazers($);
stargazers.data.sort((stargazerA, stargazerB) => {
return Number(stargazerA.meta.internalId) - Number(stargazerB.meta.internalId);
});
return stargazers;
};
export default newStargazers;

View File

@@ -10,7 +10,7 @@ export default defineTrigger({
substeps: [
{
key: 'chooseConnection',
name: 'Choose connection'
name: 'Choose connection',
},
{
key: 'chooseTrigger',
@@ -28,20 +28,24 @@ export default defineTrigger({
arguments: [
{
name: 'key',
value: 'listRepos'
}
]
}
value: 'listRepos',
},
],
},
},
]
],
},
{
key: 'testStep',
name: 'Test trigger'
}
name: 'Test trigger',
},
],
async run($) {
return await newWatchers($);
await newWatchers($);
},
sort() {
return -1;
},
});

View File

@@ -1,11 +1,8 @@
import {
IGlobalVariable,
ITriggerOutput,
} from '@automatisch/types';
import { IGlobalVariable, ITriggerOutput } from '@automatisch/types';
import getRepoOwnerAndRepo from '../../common/get-repo-owner-and-repo';
import parseLinkHeader from '../../../../helpers/parse-header-link';
const fetchWatchers = async ($: IGlobalVariable) => {
const newWatchers = async ($: IGlobalVariable) => {
const repoParameter = $.step.parameters.repo as string;
if (!repoParameter) throw new Error('A repo must be set!');
@@ -15,9 +12,9 @@ const fetchWatchers = async ($: IGlobalVariable) => {
const firstPagePathname = `/repos/${repoOwner}/${repo}/subscribers`;
const requestConfig = {
params: {
per_page: 100
per_page: 100,
},
}
};
const firstPageResponse = await $.http.get(firstPagePathname, requestConfig);
const firstPageLinks = parseLinkHeader(firstPageResponse.headers.link);
@@ -25,20 +22,11 @@ const fetchWatchers = async ($: IGlobalVariable) => {
// in case there is only single page to fetch
let pathname = firstPageLinks.last?.uri || firstPagePathname;
const watchers: ITriggerOutput = {
data: [],
};
do {
const response = await $.http.get(pathname, requestConfig);
const links = parseLinkHeader(response.headers.link);
pathname = links.prev?.uri;
if (response.integrationError) {
watchers.error = response.integrationError;
return watchers;
}
if (response.data.length) {
// to iterate reverse-chronologically
response.data.reverse();
@@ -46,7 +34,8 @@ const fetchWatchers = async ($: IGlobalVariable) => {
for (const watcher of response.data) {
const watcherId = watcher.id.toString();
if ($.flow.isAlreadyProcessed(watcherId) && !$.execution.testRun) return watchers;
if ($.flow.isAlreadyProcessed(watcherId) && !$.execution.testRun)
return;
const dataItem = {
raw: watcher,
@@ -55,21 +44,10 @@ const fetchWatchers = async ($: IGlobalVariable) => {
},
};
watchers.data.push(dataItem);
$.pushTriggerItem(dataItem);
}
}
} while (pathname && !$.execution.testRun === false);
return watchers;
}
const newWatchers = async ($: IGlobalVariable) => {
const watchers = await fetchWatchers($);
// to process chronologically
watchers.data.reverse();
return watchers;
};
export default newWatchers;

File diff suppressed because one or more lines are too long

Before

Width:  |  Height:  |  Size: 13 KiB

View File

@@ -1,39 +0,0 @@
import type { IAuthentication, IApp, IJSONObject } from '@automatisch/types';
import { Client } from 'pg';
export default class Authentication implements IAuthentication {
appData: IApp;
connectionData: IJSONObject;
client: Client;
constructor(appData: IApp, connectionData: IJSONObject) {
this.client = new Client({
host: connectionData.host as string,
port: connectionData.port as number,
database: connectionData.database as string,
user: connectionData.username as string,
password: connectionData.password as string,
ssl: connectionData.ssl as boolean,
});
this.connectionData = connectionData;
this.appData = appData;
}
async verifyCredentials() {
await this.client.connect();
return {
screenName: this.connectionData.database,
};
}
async isStillVerified() {
try {
await this.client.connect();
return true;
} catch (error) {
return false;
}
}
}

View File

@@ -1,15 +0,0 @@
import Authentication from './authentication';
import {
IService,
IAuthentication,
IApp,
IJSONObject,
} from '@automatisch/types';
export default class PostgreSQL implements IService {
authenticationClient: IAuthentication;
constructor(appData: IApp, connectionData: IJSONObject) {
this.authenticationClient = new Authentication(appData, connectionData);
}
}

View File

@@ -1,201 +0,0 @@
{
"name": "PostgreSQL",
"key": "postgresql",
"iconUrl": "{BASE_URL}/apps/postgresql/assets/favicon.svg",
"docUrl": "https://automatisch.io/docs/postgresql",
"primaryColor": "2DAAE1",
"supportsConnections": true,
"fields": [
{
"key": "host",
"label": "Host",
"type": "string",
"required": true,
"readOnly": false,
"value": null,
"placeholder": null,
"description": "The host information Automatisch will connect to.",
"docUrl": "https://automatisch.io/docs/postgresql#host",
"clickToCopy": false
},
{
"key": "port",
"label": "Port",
"type": "integer",
"required": true,
"readOnly": false,
"value": 5432,
"placeholder": null,
"description": null,
"docUrl": "https://automatisch.io/docs/postgresql#port",
"clickToCopy": false
},
{
"key": "database",
"label": "Database",
"type": "string",
"required": true,
"readOnly": false,
"value": null,
"placeholder": null,
"description": "The name of the database.",
"docUrl": "https://automatisch.io/docs/postgresql#password",
"clickToCopy": false
},
{
"key": "username",
"label": "Username",
"type": "string",
"required": true,
"readOnly": false,
"value": null,
"placeholder": null,
"description": null,
"docUrl": "https://automatisch.io/docs/postgresql#username",
"clickToCopy": false
},
{
"key": "password",
"label": "Password",
"type": "string",
"required": false,
"readOnly": false,
"value": null,
"placeholder": null,
"description": null,
"docUrl": "https://automatisch.io/docs/postgresql#password",
"clickToCopy": false
},
{
"key": "ssl",
"label": "Use SSL?",
"type": "boolean",
"required": true,
"readOnly": false,
"value": false,
"placeholder": null,
"description": null,
"docUrl": "https://automatisch.io/docs/postgresql#ssl",
"clickToCopy": false
}
],
"authenticationSteps": [
{
"step": 1,
"type": "mutation",
"name": "createConnection",
"arguments": [
{
"name": "key",
"value": "{key}"
},
{
"name": "formattedData",
"value": null,
"properties": [
{
"name": "host",
"value": "{fields.host}"
},
{
"name": "port",
"value": "{fields.port}"
},
{
"name": "database",
"value": "{fields.database}"
},
{
"name": "username",
"value": "{fields.username}"
},
{
"name": "password",
"value": "{fields.password}"
},
{
"name": "ssl",
"value": "{fields.ssl}"
}
]
}
]
},
{
"step": 2,
"type": "mutation",
"name": "verifyConnection",
"arguments": [
{
"name": "id",
"value": "{createConnection.id}"
}
]
}
],
"reconnectionSteps": [
{
"step": 1,
"type": "mutation",
"name": "resetConnection",
"arguments": [
{
"name": "id",
"value": "{connection.id}"
}
]
},
{
"step": 2,
"type": "mutation",
"name": "updateConnection",
"arguments": [
{
"name": "id",
"value": "{connection.id}"
},
{
"name": "formattedData",
"value": null,
"properties": [
{
"name": "host",
"value": "{fields.host}"
},
{
"name": "port",
"value": "{fields.port}"
},
{
"name": "database",
"value": "{fields.database}"
},
{
"name": "username",
"value": "{fields.username}"
},
{
"name": "password",
"value": "{fields.password}"
},
{
"name": "ssl",
"value": "{fields.ssl}"
}
]
}
]
},
{
"step": 3,
"type": "mutation",
"name": "verifyConnection",
"arguments": [
{
"name": "id",
"value": "{connection.id}"
}
]
}
]
}

View File

@@ -171,6 +171,6 @@ export default defineTrigger({
},
};
return { data: [dataItem] };
$.triggerOutput.data.push(dataItem);
},
});

View File

@@ -65,6 +65,6 @@ export default defineTrigger({
},
};
return { data: [dataItem] };
$.triggerOutput.data.push(dataItem);
},
});

View File

@@ -287,6 +287,6 @@ export default defineTrigger({
},
};
return { data: [dataItem] };
$.triggerOutput.data.push(dataItem);
},
});

View File

@@ -191,6 +191,6 @@ export default defineTrigger({
},
};
return { data: [dataItem] };
$.triggerOutput.data.push(dataItem);
},
});

View File

@@ -21,14 +21,11 @@ const findMessage = async ($: IGlobalVariable, options: FindMessageOptions) => {
const data = response.data;
const message: IActionOutput = {
data: {
raw: data?.messages.matches[0],
},
error: response?.integrationError || (!data.ok && data),
};
if (!data.ok && data) {
throw new Error(JSON.stringify(response.data));
}
return message;
$.actionOutput.data.raw = data?.messages.matches[0];
};
export default findMessage;

View File

@@ -1,4 +1,4 @@
import { IGlobalVariable, IActionOutput } from '@automatisch/types';
import { IGlobalVariable } from '@automatisch/types';
const postMessage = async (
$: IGlobalVariable,
@@ -12,18 +12,15 @@ const postMessage = async (
const response = await $.http.post('/chat.postMessage', params);
const message: IActionOutput = {
data: {
raw: response?.data?.message,
},
error: response?.integrationError,
};
if (response.data.ok === false) {
message.error = response.data;
throw new Error(JSON.stringify(response.data));
}
return message;
const message = {
raw: response?.data?.message,
};
$.setActionItem(message);
};
export default postMessage;

View File

@@ -15,14 +15,8 @@ export default {
const response = await $.http.get('/conversations.list');
if (response.integrationError) {
channels.error = response.integrationError;
return channels;
}
if (response.data.ok === false) {
channels.error = response.data;
return channels;
throw new Error(response.data);
}
channels.data = response.data.channels.map((channel: IJSONObject) => {

View File

@@ -36,13 +36,6 @@ export default defineAction({
text,
});
const tweet: IActionOutput = {
data: {
raw: response.data,
},
error: response?.integrationError,
};
return tweet;
$.actionOutput.data.raw = response.data;
},
});

View File

@@ -1,8 +1,4 @@
import {
IGlobalVariable,
IJSONObject,
ITriggerOutput,
} from '@automatisch/types';
import { IGlobalVariable, IJSONObject } from '@automatisch/types';
import { URLSearchParams } from 'url';
import { omitBy, isEmpty } from 'lodash';
@@ -16,10 +12,6 @@ const getUserFollowers = async (
) => {
let response;
const followers: ITriggerOutput = {
data: [],
};
do {
const params: IJSONObject = {
pagination_token: response?.data?.meta?.next_token,
@@ -33,31 +25,23 @@ const getUserFollowers = async (
response = await $.http.get(requestPath);
if (response.integrationError) {
followers.error = response.integrationError;
return followers;
}
if (response.data?.errors) {
followers.error = response.data.errors;
return followers;
throw new Error(response.data.errors);
}
if (response.data.meta.result_count > 0) {
for (const follower of response.data.data) {
if ($.flow.isAlreadyProcessed(follower.id as string)) {
return followers;
return;
}
followers.data.push({
$.pushTriggerItem({
raw: follower,
meta: { internalId: follower.id as string },
});
}
}
} while (response.data.meta.next_token && !$.execution.testRun);
return followers;
};
export default getUserFollowers;

View File

@@ -1,8 +1,4 @@
import {
IGlobalVariable,
IJSONObject,
ITriggerOutput,
} from '@automatisch/types';
import { IGlobalVariable, IJSONObject } from '@automatisch/types';
import { URLSearchParams } from 'url';
import omitBy from 'lodash/omitBy';
import isEmpty from 'lodash/isEmpty';
@@ -18,10 +14,6 @@ const fetchTweets = async ($: IGlobalVariable, username: string) => {
let response;
const tweets: ITriggerOutput = {
data: [],
};
do {
const params: IJSONObject = {
since_id: $.execution.testRun ? null : $.flow.lastInternalId,
@@ -36,14 +28,9 @@ const fetchTweets = async ($: IGlobalVariable, username: string) => {
response = await $.http.get(requestPath);
if (response.integrationError) {
tweets.error = response.integrationError;
return tweets;
}
if (response.data.meta.result_count > 0) {
response.data.data.forEach((tweet: IJSONObject) => {
tweets.data.push({
$.triggerOutput.data.push({
raw: tweet,
meta: {
internalId: tweet.id as string,
@@ -53,7 +40,7 @@ const fetchTweets = async ($: IGlobalVariable, username: string) => {
}
} while (response.data.meta.next_token && !$.execution.testRun);
return tweets;
return $.triggerOutput;
};
const getUserTweets = async (
@@ -69,13 +56,7 @@ const getUserTweets = async (
username = $.step.parameters.username as string;
}
const tweets = await fetchTweets($, username);
tweets.data.sort((tweet, nextTweet) => {
return Number(tweet.meta.internalId) - Number(nextTweet.meta.internalId);
});
return tweets;
await fetchTweets($, username);
};
export default getUserTweets;

View File

@@ -18,8 +18,10 @@ export default defineTrigger({
],
async run($) {
return await getUserTweets($, {
currentUser: true,
});
await getUserTweets($, { currentUser: true });
},
sort(tweet, nextTweet) {
return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId);
},
});

View File

@@ -19,6 +19,6 @@ export default defineTrigger({
],
async run($) {
return await myFollowers($);
await myFollowers($);
},
});

View File

@@ -31,6 +31,10 @@ export default defineTrigger({
],
async run($) {
return await searchTweets($);
await searchTweets($);
},
sort(tweet, nextTweet) {
return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId);
},
});

View File

@@ -1,20 +1,12 @@
import {
IGlobalVariable,
IJSONObject,
ITriggerOutput,
} from '@automatisch/types';
import { IGlobalVariable, IJSONObject } from '@automatisch/types';
import qs from 'qs';
import { omitBy, isEmpty } from 'lodash';
const fetchTweets = async ($: IGlobalVariable) => {
const searchTweets = async ($: IGlobalVariable) => {
const searchTerm = $.step.parameters.searchTerm as string;
let response;
const tweets: ITriggerOutput = {
data: [],
};
do {
const params: IJSONObject = {
query: searchTerm,
@@ -30,14 +22,8 @@ const fetchTweets = async ($: IGlobalVariable) => {
response = await $.http.get(requestPath);
if (response.integrationError) {
tweets.error = response.integrationError;
return tweets;
}
if (response.data.errors) {
tweets.error = response.data.errors;
return tweets;
throw new Error(JSON.stringify(response.data.errors));
}
if (response.data.meta.result_count > 0) {
@@ -49,22 +35,10 @@ const fetchTweets = async ($: IGlobalVariable) => {
},
};
tweets.data.push(dataItem);
$.triggerOutput.data.push(dataItem);
});
}
} while (response.data.meta.next_token && !$.execution.testRun);
return tweets;
};
const searchTweets = async ($: IGlobalVariable) => {
const tweets = await fetchTweets($);
tweets.data.sort((tweet, nextTweet) => {
return Number(tweet.meta.internalId) - Number(nextTweet.meta.internalId);
});
return tweets;
};
export default searchTweets;

View File

@@ -30,8 +30,10 @@ export default defineTrigger({
],
async run($) {
return await getUserTweets($, {
currentUser: false,
});
await getUserTweets($, { currentUser: false });
},
sort(tweet, nextTweet) {
return Number(nextTweet.meta.internalId) - Number(tweet.meta.internalId);
},
});

View File

@@ -3,7 +3,13 @@ import Connection from '../models/connection';
import Flow from '../models/flow';
import Step from '../models/step';
import Execution from '../models/execution';
import { IJSONObject, IApp, IGlobalVariable } from '@automatisch/types';
import {
IJSONObject,
IApp,
IGlobalVariable,
ITriggerItem,
IActionItem,
} from '@automatisch/types';
type GlobalVariableOptions = {
connection?: Connection;
@@ -59,6 +65,20 @@ const globalVariable = async (
id: execution?.id,
testRun,
},
triggerOutput: {
data: [],
},
actionOutput: {
data: {
raw: null,
},
},
pushTriggerItem: (triggerItem: ITriggerItem) => {
$.triggerOutput.data.push(triggerItem);
},
setActionItem: (actionItem: IActionItem) => {
$.actionOutput.data = actionItem;
},
};
$.http = createHttpClient({

View File

@@ -39,8 +39,8 @@ export default function createHttpClient({
instance.interceptors.response.use(
(response) => response,
(error) => {
error.response.integrationError = error.response.data;
return error.response;
error.response.httpError = error.response.data;
throw error;
}
);

View File

@@ -10,12 +10,7 @@ class App {
// Temporaryly restrict the apps we expose until
// their actions/triggers are implemented!
static temporaryList = [
'github',
'scheduler',
'slack',
'twitter',
];
static temporaryList = ['github', 'scheduler', 'slack', 'twitter'];
static async findAll(name?: string, stripFuncs = true): Promise<IApp[]> {
if (!name)

View File

@@ -39,16 +39,29 @@ export const processAction = async (options: ProcessActionOptions) => {
const actionCommand = await step.getActionCommand();
$.step.parameters = computedParameters;
const actionOutput = await actionCommand.run($);
try {
await actionCommand.run($);
} catch (error) {
if (error?.response?.httpError) {
$.actionOutput.error = error.response.httpError;
} else {
try {
$.actionOutput.error = JSON.parse(error.message);
} catch {
$.actionOutput.error = { error: error.message };
}
}
}
const executionStep = await execution
.$relatedQuery('executionSteps')
.insertAndFetch({
stepId: $.step.id,
status: actionOutput.error ? 'failure' : 'success',
status: $.actionOutput.error ? 'failure' : 'success',
dataIn: computedParameters,
dataOut: actionOutput.error ? null : actionOutput.data?.raw,
errorDetails: actionOutput.error ? actionOutput.error : null,
dataOut: $.actionOutput.error ? null : $.actionOutput.data?.raw,
errorDetails: $.actionOutput.error ? $.actionOutput.error : null,
});
return { flowId, stepId, executionId, executionStep };

View File

@@ -20,5 +20,23 @@ export const processFlow = async (options: ProcessFlowOptions) => {
testRun: options.testRun,
});
return await triggerCommand.run($);
try {
await triggerCommand.run($);
} catch (error) {
if (error?.response?.httpError) {
$.triggerOutput.error = error.response.httpError;
} else {
try {
$.triggerOutput.error = JSON.parse(error.message);
} catch {
$.triggerOutput.error = { error: error.message };
}
}
}
if (triggerCommand?.sort) {
$.triggerOutput.data.sort(triggerCommand.sort);
}
return $.triggerOutput;
};

View File

@@ -35,13 +35,13 @@ const testRun = async (options: TestRunOptions) => {
return { executionStep: triggerExecutionStepWithError };
}
const firstTriggerDataItem = data[0];
const firstTriggerItem = data[0];
const { executionId, executionStep: triggerExecutionStep } =
await processTrigger({
flowId: flow.id,
stepId: triggerStep.id,
triggerDataItem: firstTriggerDataItem,
triggerItem: firstTriggerItem,
testRun: true,
});

View File

@@ -1,4 +1,4 @@
import { IJSONObject, ITriggerDataItem } from '@automatisch/types';
import { IJSONObject, ITriggerItem } from '@automatisch/types';
import Step from '../models/step';
import Flow from '../models/flow';
import Execution from '../models/execution';
@@ -7,13 +7,13 @@ import globalVariable from '../helpers/global-variable';
type ProcessTriggerOptions = {
flowId: string;
stepId: string;
triggerDataItem?: ITriggerDataItem;
triggerItem?: ITriggerItem;
error?: IJSONObject;
testRun?: boolean;
};
export const processTrigger = async (options: ProcessTriggerOptions) => {
const { flowId, stepId, triggerDataItem, error, testRun } = options;
const { flowId, stepId, triggerItem, error, testRun } = options;
const step = await Step.query().findById(stepId).throwIfNotFound();
@@ -29,7 +29,7 @@ export const processTrigger = async (options: ProcessTriggerOptions) => {
const execution = await Execution.query().insert({
flowId: $.flow.id,
testRun,
internalId: triggerDataItem?.meta.internalId,
internalId: triggerItem?.meta.internalId,
});
const executionStep = await execution
@@ -38,7 +38,7 @@ export const processTrigger = async (options: ProcessTriggerOptions) => {
stepId: $.step.id,
status: error ? 'failure' : 'success',
dataIn: $.step.parameters,
dataOut: !error ? triggerDataItem?.raw : null,
dataOut: !error ? triggerItem?.raw : null,
errorDetails: error,
});

View File

@@ -15,13 +15,15 @@ export const worker = new Worker(
const { data, error } = await processFlow({ flowId });
for (const triggerDataItem of data) {
const jobName = `${triggerStep.id}-${triggerDataItem.meta.internalId}`;
const reversedData = data.reverse();
for (const triggerItem of reversedData) {
const jobName = `${triggerStep.id}-${triggerItem.meta.internalId}`;
const jobPayload = {
flowId,
stepId: triggerStep.id,
triggerDataItem,
triggerItem,
};
await triggerQueue.add(jobName, jobPayload);

View File

@@ -1,7 +1,7 @@
import { Worker } from 'bullmq';
import redisConfig from '../config/redis';
import logger from '../helpers/logger';
import { IJSONObject, ITriggerDataItem } from '@automatisch/types';
import { IJSONObject, ITriggerItem } from '@automatisch/types';
import actionQueue from '../queues/action';
import Step from '../models/step';
import { processTrigger } from '../services/trigger';
@@ -9,7 +9,7 @@ import { processTrigger } from '../services/trigger';
type JobData = {
flowId: string;
stepId: string;
triggerDataItem?: ITriggerDataItem;
triggerItem?: ITriggerItem;
error?: IJSONObject;
};

View File

@@ -171,7 +171,7 @@ export interface IApp {
export type TBeforeRequest = {
($: IGlobalVariable, requestConfig: AxiosRequestConfig): AxiosRequestConfig;
}
};
export interface IData {
[index: string]: any;
@@ -194,11 +194,11 @@ export interface IService {
}
export interface ITriggerOutput {
data: ITriggerDataItem[];
data: ITriggerItem[];
error?: IJSONObject;
}
export interface ITriggerDataItem {
export interface ITriggerItem {
raw: IJSONObject;
meta: {
internalId: string;
@@ -212,19 +212,18 @@ export interface ITrigger {
description: string;
dedupeStrategy?: 'greatest' | 'unique' | 'last';
substeps: ISubstep[];
getInterval?(parameters: IGlobalVariable['step']['parameters']): string;
run($: IGlobalVariable): Promise<ITriggerOutput>;
getInterval?(parameters: IStep['parameters']): string;
run($: IGlobalVariable): Promise<void>;
sort?(item: ITriggerItem, nextItem: ITriggerItem): number;
}
export interface IActionOutput {
data: IActionDataItem;
data: IActionItem;
error?: IJSONObject;
}
export interface IActionDataItem {
raw: {
data?: IJSONObject;
};
export interface IActionItem {
raw: IJSONObject;
}
export interface IAction {
@@ -232,7 +231,7 @@ export interface IAction {
key: string;
description: string;
substeps: ISubstep[];
run($: IGlobalVariable): Promise<IActionOutput>;
run($: IGlobalVariable): Promise<void>;
}
export interface IAuthentication {
@@ -279,11 +278,14 @@ export type IGlobalVariable = {
id: string;
testRun: boolean;
};
process?: (triggerDataItem: ITriggerDataItem) => Promise<void>;
triggerOutput?: ITriggerOutput;
actionOutput?: IActionOutput;
pushTriggerItem?: (triggerItem: ITriggerItem) => void;
setActionItem?: (actionItem: IActionItem) => void;
};
declare module 'axios' {
interface AxiosResponse {
integrationError?: IJSONObject;
httpError?: IJSONObject;
}
}