wip: Restructure twitter integration
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import qs from 'qs';
|
||||
import { IGlobalVariableForConnection } from '../../../helpers/global-variable/connection';
|
||||
import { IGlobalVariableForConnection } from '@automatisch/types';
|
||||
|
||||
const verifyCredentials = async ($: IGlobalVariableForConnection) => {
|
||||
const headers = {
|
||||
|
@@ -7,7 +7,7 @@ export default class MyTweets {
|
||||
this.client = client;
|
||||
}
|
||||
|
||||
async run(lastInternalId: string) {
|
||||
async run() {
|
||||
return this.getTweets(lastInternalId);
|
||||
}
|
||||
|
||||
|
@@ -1,8 +1,12 @@
|
||||
import { IJSONObject, IField } from '@automatisch/types';
|
||||
import oauthClient from '../common/oauth-client';
|
||||
import generateRequest from '../common/generate-request';
|
||||
import {
|
||||
IJSONObject,
|
||||
IField,
|
||||
IGlobalVariableForConnection,
|
||||
} from '@automatisch/types';
|
||||
import { URLSearchParams } from 'url';
|
||||
|
||||
export default async function createAuthData($: any) {
|
||||
export default async function createAuthData($: IGlobalVariableForConnection) {
|
||||
try {
|
||||
const oauthRedirectUrlField = $.app.fields.find(
|
||||
(field: IField) => field.key == 'oAuthRedirectUrl'
|
||||
@@ -10,18 +14,10 @@ export default async function createAuthData($: any) {
|
||||
|
||||
const callbackUrl = oauthRedirectUrlField.value;
|
||||
|
||||
const requestData = {
|
||||
url: `${$.app.baseUrl}/oauth/request_token`,
|
||||
const response = await generateRequest($, {
|
||||
requestPath: '/oauth/request_token',
|
||||
method: 'POST',
|
||||
data: { oauth_callback: callbackUrl },
|
||||
};
|
||||
|
||||
const authHeader = oauthClient($).toHeader(
|
||||
oauthClient($).authorize(requestData)
|
||||
);
|
||||
|
||||
const response = await $.http.post(`/oauth/request_token`, null, {
|
||||
headers: { ...authHeader },
|
||||
});
|
||||
|
||||
const responseData = Object.fromEntries(new URLSearchParams(response.data));
|
||||
|
@@ -1,6 +1,7 @@
|
||||
import { IGlobalVariableForConnection } from '@automatisch/types';
|
||||
import getCurrentUser from '../common/get-current-user';
|
||||
|
||||
const isStillVerified = async ($: any) => {
|
||||
const isStillVerified = async ($: IGlobalVariableForConnection) => {
|
||||
try {
|
||||
await getCurrentUser($);
|
||||
return true;
|
||||
|
@@ -1,7 +1,10 @@
|
||||
const verifyCredentials = async ($: any) => {
|
||||
import { IGlobalVariableForConnection } from '@automatisch/types';
|
||||
import { URLSearchParams } from 'url';
|
||||
|
||||
const verifyCredentials = async ($: IGlobalVariableForConnection) => {
|
||||
try {
|
||||
const response = await $.http.post(
|
||||
`/oauth/access_token?oauth_verifier=${$.auth.oauthVerifier}&oauth_token=${$.auth.accessToken}`,
|
||||
`/oauth/access_token?oauth_verifier=${$.auth.data.oauthVerifier}&oauth_token=${$.auth.data.accessToken}`,
|
||||
null
|
||||
);
|
||||
|
||||
|
@@ -0,0 +1,39 @@
|
||||
import { IGlobalVariableForConnection, IJSONObject } from '@automatisch/types';
|
||||
import oauthClient from './oauth-client';
|
||||
import { Token } from 'oauth-1.0a';
|
||||
|
||||
type IGenereateRequestOptons = {
|
||||
requestPath: string;
|
||||
method: string;
|
||||
data?: IJSONObject;
|
||||
};
|
||||
|
||||
const generateRequest = async (
|
||||
$: IGlobalVariableForConnection,
|
||||
options: IGenereateRequestOptons
|
||||
) => {
|
||||
const { requestPath, method, data } = options;
|
||||
|
||||
const token: Token = {
|
||||
key: $.auth.data.accessToken as string,
|
||||
secret: $.auth.data.accessSecret as string,
|
||||
};
|
||||
|
||||
const requestData = {
|
||||
url: `${$.app.baseUrl}${requestPath}`,
|
||||
method,
|
||||
data,
|
||||
};
|
||||
|
||||
const authHeader = oauthClient($).toHeader(
|
||||
oauthClient($).authorize(requestData, token)
|
||||
);
|
||||
|
||||
const response = await $.http.post(`/oauth/request_token`, null, {
|
||||
headers: { ...authHeader },
|
||||
});
|
||||
|
||||
return response;
|
||||
};
|
||||
|
||||
export default generateRequest;
|
@@ -0,0 +1,14 @@
|
||||
import { IGlobalVariableForConnection } from '@automatisch/types';
|
||||
import generateRequest from './generate-request';
|
||||
|
||||
const getCurrentUser = async ($: IGlobalVariableForConnection) => {
|
||||
const response = await generateRequest($, {
|
||||
requestPath: '/2/users/me',
|
||||
method: 'GET',
|
||||
});
|
||||
|
||||
const currentUser = response.data.data;
|
||||
return currentUser;
|
||||
};
|
||||
|
||||
export default getCurrentUser;
|
@@ -0,0 +1,25 @@
|
||||
import { IGlobalVariableForConnection, IJSONObject } from '@automatisch/types';
|
||||
import generateRequest from './generate-request';
|
||||
|
||||
const getUserByUsername = async (
|
||||
$: IGlobalVariableForConnection,
|
||||
username: string
|
||||
) => {
|
||||
const response = await generateRequest($, {
|
||||
requestPath: `/2/users/by/username/${username}`,
|
||||
method: 'GET',
|
||||
});
|
||||
|
||||
if (response.data.errors) {
|
||||
const errorMessages = response.data.errors
|
||||
.map((error: IJSONObject) => error.detail)
|
||||
.join(' ');
|
||||
|
||||
throw new Error(`Error occured while fetching user data: ${errorMessages}`);
|
||||
}
|
||||
|
||||
const user = response.data.data;
|
||||
return user;
|
||||
};
|
||||
|
||||
export default getUserByUsername;
|
54
packages/backend/src/apps/twitter2/common/get-user-tweets.ts
Normal file
54
packages/backend/src/apps/twitter2/common/get-user-tweets.ts
Normal file
@@ -0,0 +1,54 @@
|
||||
import { IGlobalVariableForConnection, IJSONObject } from '@automatisch/types';
|
||||
import { URLSearchParams } from 'url';
|
||||
import omitBy from 'lodash/omitBy';
|
||||
import isEmpty from 'lodash/isEmpty';
|
||||
import generateRequest from './generate-request';
|
||||
|
||||
const getUserTweets = async (
|
||||
$: IGlobalVariableForConnection,
|
||||
userId: string,
|
||||
lastInternalId?: string
|
||||
) => {
|
||||
let response;
|
||||
const tweets: IJSONObject[] = [];
|
||||
|
||||
do {
|
||||
const params: IJSONObject = {
|
||||
since_id: lastInternalId,
|
||||
pagination_token: response?.data?.meta?.next_token,
|
||||
};
|
||||
|
||||
const queryParams = new URLSearchParams(omitBy(params, isEmpty));
|
||||
|
||||
const requestPath = `/2/users/${userId}/tweets${
|
||||
queryParams.toString() ? `?${queryParams.toString()}` : ''
|
||||
}`;
|
||||
|
||||
response = await generateRequest($, {
|
||||
requestPath,
|
||||
method: 'GET',
|
||||
});
|
||||
|
||||
if (response.data.meta.result_count > 0) {
|
||||
response.data.data.forEach((tweet: IJSONObject) => {
|
||||
if (!lastInternalId || Number(tweet.id) > Number(lastInternalId)) {
|
||||
tweets.push(tweet);
|
||||
} else {
|
||||
return;
|
||||
}
|
||||
});
|
||||
}
|
||||
} while (response.data.meta.next_token && lastInternalId);
|
||||
|
||||
if (response.data.errors) {
|
||||
const errorMessages = response.data.errors
|
||||
.map((error: IJSONObject) => error.detail)
|
||||
.join(' ');
|
||||
|
||||
throw new Error(`Error occured while fetching user data: ${errorMessages}`);
|
||||
}
|
||||
|
||||
return tweets;
|
||||
};
|
||||
|
||||
export default getUserTweets;
|
@@ -1,10 +1,11 @@
|
||||
import { IGlobalVariableForConnection } from '@automatisch/types';
|
||||
import crypto from 'crypto';
|
||||
import OAuth from 'oauth-1.0a';
|
||||
|
||||
const oauthClient = ($: any) => {
|
||||
const oauthClient = ($: IGlobalVariableForConnection) => {
|
||||
const consumerData = {
|
||||
key: $.auth.consumerKey as string,
|
||||
secret: $.auth.consumerSecret as string,
|
||||
key: $.auth.data.consumerKey as string,
|
||||
secret: $.auth.data.consumerSecret as string,
|
||||
};
|
||||
|
||||
return new OAuth({
|
@@ -0,0 +1,37 @@
|
||||
import { IGlobalVariableForConnection } from '@automatisch/types';
|
||||
import getCurrentUser from '../../common/get-current-user';
|
||||
import getUserByUsername from '../../common/get-user-by-username';
|
||||
import getUserTweets from '../../common/get-user-tweets';
|
||||
|
||||
export default {
|
||||
name: 'My Tweets',
|
||||
key: 'myTweets',
|
||||
pollInterval: 15,
|
||||
description: 'Will be triggered when you tweet something new.',
|
||||
substeps: [
|
||||
{
|
||||
key: 'chooseConnection',
|
||||
name: 'Choose connection',
|
||||
},
|
||||
{
|
||||
key: 'testStep',
|
||||
name: 'Test trigger',
|
||||
},
|
||||
],
|
||||
|
||||
async run($: IGlobalVariableForConnection) {
|
||||
return this.getTweets($, await $.db.flow.lastInternalId());
|
||||
},
|
||||
|
||||
async testRun($: IGlobalVariableForConnection) {
|
||||
return this.getTweets($);
|
||||
},
|
||||
|
||||
async getTweets($: IGlobalVariableForConnection, lastInternalId?: string) {
|
||||
const { username } = await getCurrentUser($);
|
||||
const user = await getUserByUsername($, username);
|
||||
|
||||
const tweets = await getUserTweets($, user.id, lastInternalId);
|
||||
return tweets;
|
||||
},
|
||||
};
|
@@ -1,5 +1,6 @@
|
||||
import createHttpClient from '../http-client';
|
||||
import Connection from '../../models/connection';
|
||||
import Flow from '../../models/flow';
|
||||
import {
|
||||
IJSONObject,
|
||||
IApp,
|
||||
@@ -8,7 +9,8 @@ import {
|
||||
|
||||
const prepareGlobalVariableForConnection = (
|
||||
connection: Connection,
|
||||
appData: IApp
|
||||
appData: IApp,
|
||||
flow?: Flow
|
||||
): IGlobalVariableForConnection => {
|
||||
return {
|
||||
auth: {
|
||||
@@ -24,6 +26,9 @@ const prepareGlobalVariableForConnection = (
|
||||
},
|
||||
app: appData,
|
||||
http: createHttpClient({ baseURL: appData.baseUrl }),
|
||||
db: {
|
||||
flow: flow,
|
||||
},
|
||||
};
|
||||
};
|
||||
|
||||
|
@@ -3,14 +3,15 @@ import Base from './base';
|
||||
import Execution from './execution';
|
||||
import Step from './step';
|
||||
import Telemetry from '../helpers/telemetry';
|
||||
import { IJSONObject } from '@automatisch/types';
|
||||
|
||||
class ExecutionStep extends Base {
|
||||
id!: string;
|
||||
executionId!: string;
|
||||
stepId!: string;
|
||||
dataIn!: Record<string, unknown>;
|
||||
dataOut!: Record<string, unknown>;
|
||||
errorDetails: Record<string, unknown>;
|
||||
dataIn!: IJSONObject;
|
||||
dataOut!: IJSONObject;
|
||||
errorDetails: IJSONObject;
|
||||
status = 'failure';
|
||||
step: Step;
|
||||
|
||||
@@ -23,7 +24,7 @@ class ExecutionStep extends Base {
|
||||
id: { type: 'string', format: 'uuid' },
|
||||
executionId: { type: 'string', format: 'uuid' },
|
||||
stepId: { type: 'string' },
|
||||
dataIn: { type: 'object' },
|
||||
dataIn: { type: ['object', 'null'] },
|
||||
dataOut: { type: ['object', 'null'] },
|
||||
status: { type: 'string', enum: ['success', 'failure'] },
|
||||
errorDetails: { type: ['object', 'null'] },
|
||||
|
@@ -10,7 +10,7 @@ class Flow extends Base {
|
||||
name!: string;
|
||||
userId!: string;
|
||||
active: boolean;
|
||||
steps?: [Step];
|
||||
steps: Step[];
|
||||
published_at: string;
|
||||
|
||||
static tableName = 'flows';
|
||||
|
@@ -20,7 +20,7 @@ class Step extends Base {
|
||||
parameters: Record<string, unknown>;
|
||||
connection?: Connection;
|
||||
flow: Flow;
|
||||
executionSteps?: [ExecutionStep];
|
||||
executionSteps: ExecutionStep[];
|
||||
|
||||
static tableName = 'steps';
|
||||
|
||||
|
@@ -10,9 +10,9 @@ class User extends Base {
|
||||
id!: string;
|
||||
email!: string;
|
||||
password!: string;
|
||||
connections?: [Connection];
|
||||
flows?: [Flow];
|
||||
steps?: [Step];
|
||||
connections?: Connection[];
|
||||
flows?: Flow[];
|
||||
steps?: Step[];
|
||||
|
||||
static tableName = 'users';
|
||||
|
||||
|
@@ -112,8 +112,8 @@ class Processor {
|
||||
await execution.$relatedQuery('executionSteps').insertAndFetch({
|
||||
stepId: id,
|
||||
status: 'failure',
|
||||
dataIn: computedParameters,
|
||||
dataOut: null,
|
||||
dataIn: null,
|
||||
dataOut: computedParameters,
|
||||
errorDetails: fetchedActionData.error,
|
||||
});
|
||||
|
||||
|
Reference in New Issue
Block a user