From 5ddb5ab6fa145d984fd03fd9a93566ab88049384 Mon Sep 17 00:00:00 2001 From: Faruk AYDIN Date: Mon, 29 Aug 2022 23:11:28 +0300 Subject: [PATCH 1/5] feat: Implement draft version of pagination with user tweet trigger --- .../client/endpoints/get-user-by-username.ts | 2 +- .../client/endpoints/get-user-tweets.ts | 55 ++++++++++++++----- .../src/apps/twitter/triggers/user-tweet.ts | 15 ++--- ...823171017_add_internal_id_to_executions.ts | 13 +++++ packages/backend/src/models/execution.ts | 2 + packages/backend/src/models/flow.ts | 8 +++ packages/backend/src/services/processor.ts | 31 +++++++---- packages/types/index.d.ts | 1 + 8 files changed, 92 insertions(+), 35 deletions(-) create mode 100644 packages/backend/src/db/migrations/20220823171017_add_internal_id_to_executions.ts diff --git a/packages/backend/src/apps/twitter/client/endpoints/get-user-by-username.ts b/packages/backend/src/apps/twitter/client/endpoints/get-user-by-username.ts index e21e2fda..78c8da2d 100644 --- a/packages/backend/src/apps/twitter/client/endpoints/get-user-by-username.ts +++ b/packages/backend/src/apps/twitter/client/endpoints/get-user-by-username.ts @@ -39,6 +39,6 @@ export default class GetUserByUsername { ); } - return response; + return response.data.data; } } diff --git a/packages/backend/src/apps/twitter/client/endpoints/get-user-tweets.ts b/packages/backend/src/apps/twitter/client/endpoints/get-user-tweets.ts index 7bde7466..c9619dfb 100644 --- a/packages/backend/src/apps/twitter/client/endpoints/get-user-tweets.ts +++ b/packages/backend/src/apps/twitter/client/endpoints/get-user-tweets.ts @@ -1,5 +1,8 @@ import { IJSONObject } from '@automatisch/types'; +import { URLSearchParams } from 'url'; import TwitterClient from '../index'; +import omitBy from 'lodash/omitBy'; +import isEmpty from 'lodash/isEmpty'; export default class GetUserTweets { client: TwitterClient; @@ -8,26 +11,52 @@ export default class GetUserTweets { this.client = client; } - async run(userId: string) { + async run(userId: string, lastInternalId?: string) { const token = { key: this.client.connection.formattedData.accessToken as string, secret: this.client.connection.formattedData.accessSecret as string, }; - const requestPath = `/2/users/${userId}/tweets`; + let response; + const tweets: IJSONObject[] = []; - const requestData = { - url: `${TwitterClient.baseUrl}${requestPath}`, - method: 'GET', - }; + do { + const params: IJSONObject = { + since_id: lastInternalId, + pagination_token: response?.data?.meta?.next_token, + }; - const authHeader = this.client.oauthClient.toHeader( - this.client.oauthClient.authorize(requestData, token) - ); + const queryParams = new URLSearchParams({ + ...omitBy(params, isEmpty), + }); - const response = await this.client.httpClient.get(requestPath, { - headers: { ...authHeader }, - }); + const requestPath = `/2/users/${userId}/tweets${ + queryParams.toString() ? `?${queryParams.toString()}` : '' + }`; + + const requestData = { + url: `${TwitterClient.baseUrl}${requestPath}`, + method: 'GET', + }; + + const authHeader = this.client.oauthClient.toHeader( + this.client.oauthClient.authorize(requestData, token) + ); + + response = await this.client.httpClient.get(requestPath, { + headers: { ...authHeader }, + }); + + if (response.data.meta.result_count > 0) { + response.data.data.forEach((tweet: IJSONObject) => { + if (!lastInternalId || Number(tweet.id) > Number(lastInternalId)) { + tweets.push(tweet); + } else { + return; + } + }); + } + } while (response.data.meta.next_token && lastInternalId); if (response.data?.errors) { const errorMessages = response.data.errors @@ -39,6 +68,6 @@ export default class GetUserTweets { ); } - return response; + return tweets; } } diff --git a/packages/backend/src/apps/twitter/triggers/user-tweet.ts b/packages/backend/src/apps/twitter/triggers/user-tweet.ts index 44fcf058..dca22027 100644 --- a/packages/backend/src/apps/twitter/triggers/user-tweet.ts +++ b/packages/backend/src/apps/twitter/triggers/user-tweet.ts @@ -7,24 +7,19 @@ export default class UserTweet { this.client = client; } - async run() { - return this.getTweets(); + async run(lastInternalId: string) { + return this.getTweets(lastInternalId); } async testRun() { return this.getTweets(); } - async getTweets() { - const userResponse = await this.client.getUserByUsername.run( + async getTweets(lastInternalId?: string) { + const user = await this.client.getUserByUsername.run( this.client.step.parameters.username as string ); - const userId = userResponse.data.data.id; - - const tweetsResponse = await this.client.getUserTweets.run(userId); - const tweets = tweetsResponse.data.data; - - return tweets; + return await this.client.getUserTweets.run(user.id, lastInternalId); } } diff --git a/packages/backend/src/db/migrations/20220823171017_add_internal_id_to_executions.ts b/packages/backend/src/db/migrations/20220823171017_add_internal_id_to_executions.ts new file mode 100644 index 00000000..7dacb7c6 --- /dev/null +++ b/packages/backend/src/db/migrations/20220823171017_add_internal_id_to_executions.ts @@ -0,0 +1,13 @@ +import { Knex } from 'knex'; + +export async function up(knex: Knex): Promise { + return knex.schema.table('executions', (table) => { + table.string('internal_id'); + }); +} + +export async function down(knex: Knex): Promise { + return knex.schema.table('executions', (table) => { + table.dropColumn('internal_id'); + }); +} diff --git a/packages/backend/src/models/execution.ts b/packages/backend/src/models/execution.ts index 222f1802..e952e251 100644 --- a/packages/backend/src/models/execution.ts +++ b/packages/backend/src/models/execution.ts @@ -8,6 +8,7 @@ class Execution extends Base { id!: string; flowId!: string; testRun = false; + internalId!: string; executionSteps: ExecutionStep[] = []; static tableName = 'executions'; @@ -19,6 +20,7 @@ class Execution extends Base { id: { type: 'string', format: 'uuid' }, flowId: { type: 'string', format: 'uuid' }, testRun: { type: 'boolean' }, + internalId: { type: 'string' }, }, }; diff --git a/packages/backend/src/models/flow.ts b/packages/backend/src/models/flow.ts index 46169e8e..9cf31cad 100644 --- a/packages/backend/src/models/flow.ts +++ b/packages/backend/src/models/flow.ts @@ -49,6 +49,14 @@ class Flow extends Base { }, }); + async lastInternalId() { + const lastInternalIdFetched: any = await this.$relatedQuery( + 'executions' + ).max('internal_id'); + + return lastInternalIdFetched[0].max; + } + async $beforeUpdate( opt: ModelOptions, queryContext: QueryContext diff --git a/packages/backend/src/services/processor.ts b/packages/backend/src/services/processor.ts index f5c85554..d99fd5dc 100644 --- a/packages/backend/src/services/processor.ts +++ b/packages/backend/src/services/processor.ts @@ -3,6 +3,7 @@ import Flow from '../models/flow'; import Step from '../models/step'; import Execution from '../models/execution'; import ExecutionStep from '../models/execution-step'; +import { IJSONObject } from '@automatisch/types'; type ExecutionSteps = Record; @@ -34,16 +35,29 @@ class Processor { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion let initialTriggerData = await this.getInitialTriggerData(triggerStep!); + if (initialTriggerData.length === 0) { + return; + } + if (this.testRun) { initialTriggerData = [initialTriggerData[0]]; } + if (initialTriggerData.length > 1) { + initialTriggerData = initialTriggerData.sort( + (item: IJSONObject, nextItem: IJSONObject) => { + return (item.id as number) - (nextItem.id as number); + } + ); + } + const executions: Execution[] = []; for await (const data of initialTriggerData) { const execution = await Execution.query().insert({ flowId: this.flow.id, testRun: this.testRun, + internalId: data.id, }); executions.push(execution); @@ -65,6 +79,8 @@ class Processor { priorExecutionSteps ); + step.parameters = computedParameters; + const appInstance = new AppClass(step.connection, this.flow, step); if (!isTrigger && key) { @@ -105,23 +121,16 @@ class Processor { const AppClass = (await import(`../apps/${step.appKey}`)).default; const appInstance = new AppClass(step.connection, this.flow, step); - const lastExecutionStep = await step - .$relatedQuery('executionSteps') - .orderBy('created_at', 'desc') - .first(); - - const lastExecutionStepCreatedAt = lastExecutionStep?.createdAt as string; - const flow = (await step.$relatedQuery('flow')) as Flow; - const command = appInstance.triggers[step.key]; - const startTime = new Date(lastExecutionStepCreatedAt || flow.updatedAt); let fetchedData; + const lastInternalId = await this.flow.lastInternalId(); + if (this.testRun) { - fetchedData = await command.testRun(startTime); + fetchedData = await command.testRun(); } else { - fetchedData = await command.run(startTime); + fetchedData = await command.run(lastInternalId); } return fetchedData; diff --git a/packages/types/index.d.ts b/packages/types/index.d.ts index 6f06513e..3dd82317 100644 --- a/packages/types/index.d.ts +++ b/packages/types/index.d.ts @@ -69,6 +69,7 @@ export interface IFlow { steps: IStep[]; createdAt: string; updatedAt: string; + lastInternalId: () => Promise; } export interface IUser { From 29abf702bd19d06b3003a3b72d38bf8eb51165f9 Mon Sep 17 00:00:00 2001 From: Faruk AYDIN Date: Tue, 30 Aug 2022 14:18:35 +0300 Subject: [PATCH 2/5] chore: Use API Key and API Secret placeholders for twitter connection --- packages/backend/src/apps/twitter/info.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/backend/src/apps/twitter/info.json b/packages/backend/src/apps/twitter/info.json index dc3d0f3e..2b383999 100644 --- a/packages/backend/src/apps/twitter/info.json +++ b/packages/backend/src/apps/twitter/info.json @@ -20,7 +20,7 @@ }, { "key": "consumerKey", - "label": "Consumer Key", + "label": "API Key", "type": "string", "required": true, "readOnly": false, @@ -32,7 +32,7 @@ }, { "key": "consumerSecret", - "label": "Consumer Secret", + "label": "API Secret", "type": "string", "required": true, "readOnly": false, From fda957b1f6072653d94860537dd6eda534404bef Mon Sep 17 00:00:00 2001 From: Faruk AYDIN Date: Tue, 30 Aug 2022 14:26:56 +0300 Subject: [PATCH 3/5] fix: Adjust response types for twitter auth and endpoints --- packages/backend/src/apps/twitter/actions/create-tweet.ts | 4 ++-- .../src/apps/twitter/client/endpoints/create-tweet.ts | 4 +++- .../src/apps/twitter/client/endpoints/get-current-user.ts | 6 +++++- .../apps/twitter/client/endpoints/get-user-by-username.ts | 3 ++- packages/backend/src/apps/twitter/triggers/user-tweet.ts | 4 +++- 5 files changed, 15 insertions(+), 6 deletions(-) diff --git a/packages/backend/src/apps/twitter/actions/create-tweet.ts b/packages/backend/src/apps/twitter/actions/create-tweet.ts index 92dbc019..482ae0b4 100644 --- a/packages/backend/src/apps/twitter/actions/create-tweet.ts +++ b/packages/backend/src/apps/twitter/actions/create-tweet.ts @@ -8,10 +8,10 @@ export default class CreateTweet { } async run() { - const response = await this.client.createTweet.run( + const tweet = await this.client.createTweet.run( this.client.step.parameters.tweet as string ); - return response.data.data; + return tweet; } } diff --git a/packages/backend/src/apps/twitter/client/endpoints/create-tweet.ts b/packages/backend/src/apps/twitter/client/endpoints/create-tweet.ts index d8980159..52eade01 100644 --- a/packages/backend/src/apps/twitter/client/endpoints/create-tweet.ts +++ b/packages/backend/src/apps/twitter/client/endpoints/create-tweet.ts @@ -29,7 +29,9 @@ export default class CreateTweet { { headers: { ...authHeader } } ); - return response; + const tweet = response.data.data; + + return tweet; } catch (error) { const errorMessage = error.response.data.detail; throw new Error(`Error occured while creating a tweet: ${errorMessage}`); diff --git a/packages/backend/src/apps/twitter/client/endpoints/get-current-user.ts b/packages/backend/src/apps/twitter/client/endpoints/get-current-user.ts index f8cc3531..4ccaa575 100644 --- a/packages/backend/src/apps/twitter/client/endpoints/get-current-user.ts +++ b/packages/backend/src/apps/twitter/client/endpoints/get-current-user.ts @@ -24,8 +24,12 @@ export default class GetCurrentUser { this.client.oauthClient.authorize(requestData, token) ); - return await this.client.httpClient.get(requestPath, { + const response = await this.client.httpClient.get(requestPath, { headers: { ...authHeader }, }); + + const currentUser = response.data.data; + + return currentUser; } } diff --git a/packages/backend/src/apps/twitter/client/endpoints/get-user-by-username.ts b/packages/backend/src/apps/twitter/client/endpoints/get-user-by-username.ts index 78c8da2d..23ac8b83 100644 --- a/packages/backend/src/apps/twitter/client/endpoints/get-user-by-username.ts +++ b/packages/backend/src/apps/twitter/client/endpoints/get-user-by-username.ts @@ -39,6 +39,7 @@ export default class GetUserByUsername { ); } - return response.data.data; + const user = response.data.data; + return user; } } diff --git a/packages/backend/src/apps/twitter/triggers/user-tweet.ts b/packages/backend/src/apps/twitter/triggers/user-tweet.ts index dca22027..40bce65b 100644 --- a/packages/backend/src/apps/twitter/triggers/user-tweet.ts +++ b/packages/backend/src/apps/twitter/triggers/user-tweet.ts @@ -20,6 +20,8 @@ export default class UserTweet { this.client.step.parameters.username as string ); - return await this.client.getUserTweets.run(user.id, lastInternalId); + const tweets = await this.client.getUserTweets.run(user.id, lastInternalId); + + return tweets; } } From 9bd1447bcfd83c28ffb039c8a9fdf4b436edd723 Mon Sep 17 00:00:00 2001 From: Faruk AYDIN Date: Tue, 30 Aug 2022 15:14:19 +0300 Subject: [PATCH 4/5] feat: Insert execution even though there is no new data fetched --- packages/backend/src/services/processor.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/packages/backend/src/services/processor.ts b/packages/backend/src/services/processor.ts index d99fd5dc..fc9626fb 100644 --- a/packages/backend/src/services/processor.ts +++ b/packages/backend/src/services/processor.ts @@ -36,6 +36,14 @@ class Processor { let initialTriggerData = await this.getInitialTriggerData(triggerStep!); if (initialTriggerData.length === 0) { + const lastInternalId = await this.flow.lastInternalId(); + + await Execution.query().insert({ + flowId: this.flow.id, + testRun: this.testRun, + internalId: lastInternalId, + }); + return; } From db2c3556de597ed330dde055f271eab56c3e55eb Mon Sep 17 00:00:00 2001 From: Faruk AYDIN Date: Wed, 31 Aug 2022 12:47:45 +0300 Subject: [PATCH 5/5] refactor: Get last execution and find last internal ID --- .../src/apps/twitter/client/endpoints/get-user-tweets.ts | 4 +--- packages/backend/src/models/flow.ts | 9 +++++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/packages/backend/src/apps/twitter/client/endpoints/get-user-tweets.ts b/packages/backend/src/apps/twitter/client/endpoints/get-user-tweets.ts index c9619dfb..b6343ced 100644 --- a/packages/backend/src/apps/twitter/client/endpoints/get-user-tweets.ts +++ b/packages/backend/src/apps/twitter/client/endpoints/get-user-tweets.ts @@ -26,9 +26,7 @@ export default class GetUserTweets { pagination_token: response?.data?.meta?.next_token, }; - const queryParams = new URLSearchParams({ - ...omitBy(params, isEmpty), - }); + const queryParams = new URLSearchParams(omitBy(params, isEmpty)); const requestPath = `/2/users/${userId}/tweets${ queryParams.toString() ? `?${queryParams.toString()}` : '' diff --git a/packages/backend/src/models/flow.ts b/packages/backend/src/models/flow.ts index 9cf31cad..ca44c208 100644 --- a/packages/backend/src/models/flow.ts +++ b/packages/backend/src/models/flow.ts @@ -4,6 +4,7 @@ import Base from './base'; import Step from './step'; import Execution from './execution'; import Telemetry from '../helpers/telemetry'; +import { IExecution } from '@automatisch/types'; class Flow extends Base { id!: string; @@ -50,11 +51,11 @@ class Flow extends Base { }); async lastInternalId() { - const lastInternalIdFetched: any = await this.$relatedQuery( - 'executions' - ).max('internal_id'); + const lastExecution = await this.$relatedQuery('executions') + .orderBy('created_at', 'desc') + .first(); - return lastInternalIdFetched[0].max; + return (lastExecution as Execution).internalId; } async $beforeUpdate(