From 5ddb5ab6fa145d984fd03fd9a93566ab88049384 Mon Sep 17 00:00:00 2001 From: Faruk AYDIN Date: Mon, 29 Aug 2022 23:11:28 +0300 Subject: [PATCH] feat: Implement draft version of pagination with user tweet trigger --- .../client/endpoints/get-user-by-username.ts | 2 +- .../client/endpoints/get-user-tweets.ts | 55 ++++++++++++++----- .../src/apps/twitter/triggers/user-tweet.ts | 15 ++--- ...823171017_add_internal_id_to_executions.ts | 13 +++++ packages/backend/src/models/execution.ts | 2 + packages/backend/src/models/flow.ts | 8 +++ packages/backend/src/services/processor.ts | 31 +++++++---- packages/types/index.d.ts | 1 + 8 files changed, 92 insertions(+), 35 deletions(-) create mode 100644 packages/backend/src/db/migrations/20220823171017_add_internal_id_to_executions.ts diff --git a/packages/backend/src/apps/twitter/client/endpoints/get-user-by-username.ts b/packages/backend/src/apps/twitter/client/endpoints/get-user-by-username.ts index e21e2fda..78c8da2d 100644 --- a/packages/backend/src/apps/twitter/client/endpoints/get-user-by-username.ts +++ b/packages/backend/src/apps/twitter/client/endpoints/get-user-by-username.ts @@ -39,6 +39,6 @@ export default class GetUserByUsername { ); } - return response; + return response.data.data; } } diff --git a/packages/backend/src/apps/twitter/client/endpoints/get-user-tweets.ts b/packages/backend/src/apps/twitter/client/endpoints/get-user-tweets.ts index 7bde7466..c9619dfb 100644 --- a/packages/backend/src/apps/twitter/client/endpoints/get-user-tweets.ts +++ b/packages/backend/src/apps/twitter/client/endpoints/get-user-tweets.ts @@ -1,5 +1,8 @@ import { IJSONObject } from '@automatisch/types'; +import { URLSearchParams } from 'url'; import TwitterClient from '../index'; +import omitBy from 'lodash/omitBy'; +import isEmpty from 'lodash/isEmpty'; export default class GetUserTweets { client: TwitterClient; @@ -8,26 +11,52 @@ export default class GetUserTweets { this.client = client; } - async run(userId: string) { + async run(userId: string, lastInternalId?: string) { const token = { key: this.client.connection.formattedData.accessToken as string, secret: this.client.connection.formattedData.accessSecret as string, }; - const requestPath = `/2/users/${userId}/tweets`; + let response; + const tweets: IJSONObject[] = []; - const requestData = { - url: `${TwitterClient.baseUrl}${requestPath}`, - method: 'GET', - }; + do { + const params: IJSONObject = { + since_id: lastInternalId, + pagination_token: response?.data?.meta?.next_token, + }; - const authHeader = this.client.oauthClient.toHeader( - this.client.oauthClient.authorize(requestData, token) - ); + const queryParams = new URLSearchParams({ + ...omitBy(params, isEmpty), + }); - const response = await this.client.httpClient.get(requestPath, { - headers: { ...authHeader }, - }); + const requestPath = `/2/users/${userId}/tweets${ + queryParams.toString() ? `?${queryParams.toString()}` : '' + }`; + + const requestData = { + url: `${TwitterClient.baseUrl}${requestPath}`, + method: 'GET', + }; + + const authHeader = this.client.oauthClient.toHeader( + this.client.oauthClient.authorize(requestData, token) + ); + + response = await this.client.httpClient.get(requestPath, { + headers: { ...authHeader }, + }); + + if (response.data.meta.result_count > 0) { + response.data.data.forEach((tweet: IJSONObject) => { + if (!lastInternalId || Number(tweet.id) > Number(lastInternalId)) { + tweets.push(tweet); + } else { + return; + } + }); + } + } while (response.data.meta.next_token && lastInternalId); if (response.data?.errors) { const errorMessages = response.data.errors @@ -39,6 +68,6 @@ export default class GetUserTweets { ); } - return response; + return tweets; } } diff --git a/packages/backend/src/apps/twitter/triggers/user-tweet.ts b/packages/backend/src/apps/twitter/triggers/user-tweet.ts index 44fcf058..dca22027 100644 --- a/packages/backend/src/apps/twitter/triggers/user-tweet.ts +++ b/packages/backend/src/apps/twitter/triggers/user-tweet.ts @@ -7,24 +7,19 @@ export default class UserTweet { this.client = client; } - async run() { - return this.getTweets(); + async run(lastInternalId: string) { + return this.getTweets(lastInternalId); } async testRun() { return this.getTweets(); } - async getTweets() { - const userResponse = await this.client.getUserByUsername.run( + async getTweets(lastInternalId?: string) { + const user = await this.client.getUserByUsername.run( this.client.step.parameters.username as string ); - const userId = userResponse.data.data.id; - - const tweetsResponse = await this.client.getUserTweets.run(userId); - const tweets = tweetsResponse.data.data; - - return tweets; + return await this.client.getUserTweets.run(user.id, lastInternalId); } } diff --git a/packages/backend/src/db/migrations/20220823171017_add_internal_id_to_executions.ts b/packages/backend/src/db/migrations/20220823171017_add_internal_id_to_executions.ts new file mode 100644 index 00000000..7dacb7c6 --- /dev/null +++ b/packages/backend/src/db/migrations/20220823171017_add_internal_id_to_executions.ts @@ -0,0 +1,13 @@ +import { Knex } from 'knex'; + +export async function up(knex: Knex): Promise { + return knex.schema.table('executions', (table) => { + table.string('internal_id'); + }); +} + +export async function down(knex: Knex): Promise { + return knex.schema.table('executions', (table) => { + table.dropColumn('internal_id'); + }); +} diff --git a/packages/backend/src/models/execution.ts b/packages/backend/src/models/execution.ts index 222f1802..e952e251 100644 --- a/packages/backend/src/models/execution.ts +++ b/packages/backend/src/models/execution.ts @@ -8,6 +8,7 @@ class Execution extends Base { id!: string; flowId!: string; testRun = false; + internalId!: string; executionSteps: ExecutionStep[] = []; static tableName = 'executions'; @@ -19,6 +20,7 @@ class Execution extends Base { id: { type: 'string', format: 'uuid' }, flowId: { type: 'string', format: 'uuid' }, testRun: { type: 'boolean' }, + internalId: { type: 'string' }, }, }; diff --git a/packages/backend/src/models/flow.ts b/packages/backend/src/models/flow.ts index 46169e8e..9cf31cad 100644 --- a/packages/backend/src/models/flow.ts +++ b/packages/backend/src/models/flow.ts @@ -49,6 +49,14 @@ class Flow extends Base { }, }); + async lastInternalId() { + const lastInternalIdFetched: any = await this.$relatedQuery( + 'executions' + ).max('internal_id'); + + return lastInternalIdFetched[0].max; + } + async $beforeUpdate( opt: ModelOptions, queryContext: QueryContext diff --git a/packages/backend/src/services/processor.ts b/packages/backend/src/services/processor.ts index f5c85554..d99fd5dc 100644 --- a/packages/backend/src/services/processor.ts +++ b/packages/backend/src/services/processor.ts @@ -3,6 +3,7 @@ import Flow from '../models/flow'; import Step from '../models/step'; import Execution from '../models/execution'; import ExecutionStep from '../models/execution-step'; +import { IJSONObject } from '@automatisch/types'; type ExecutionSteps = Record; @@ -34,16 +35,29 @@ class Processor { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion let initialTriggerData = await this.getInitialTriggerData(triggerStep!); + if (initialTriggerData.length === 0) { + return; + } + if (this.testRun) { initialTriggerData = [initialTriggerData[0]]; } + if (initialTriggerData.length > 1) { + initialTriggerData = initialTriggerData.sort( + (item: IJSONObject, nextItem: IJSONObject) => { + return (item.id as number) - (nextItem.id as number); + } + ); + } + const executions: Execution[] = []; for await (const data of initialTriggerData) { const execution = await Execution.query().insert({ flowId: this.flow.id, testRun: this.testRun, + internalId: data.id, }); executions.push(execution); @@ -65,6 +79,8 @@ class Processor { priorExecutionSteps ); + step.parameters = computedParameters; + const appInstance = new AppClass(step.connection, this.flow, step); if (!isTrigger && key) { @@ -105,23 +121,16 @@ class Processor { const AppClass = (await import(`../apps/${step.appKey}`)).default; const appInstance = new AppClass(step.connection, this.flow, step); - const lastExecutionStep = await step - .$relatedQuery('executionSteps') - .orderBy('created_at', 'desc') - .first(); - - const lastExecutionStepCreatedAt = lastExecutionStep?.createdAt as string; - const flow = (await step.$relatedQuery('flow')) as Flow; - const command = appInstance.triggers[step.key]; - const startTime = new Date(lastExecutionStepCreatedAt || flow.updatedAt); let fetchedData; + const lastInternalId = await this.flow.lastInternalId(); + if (this.testRun) { - fetchedData = await command.testRun(startTime); + fetchedData = await command.testRun(); } else { - fetchedData = await command.run(startTime); + fetchedData = await command.run(lastInternalId); } return fetchedData; diff --git a/packages/types/index.d.ts b/packages/types/index.d.ts index 6f06513e..3dd82317 100644 --- a/packages/types/index.d.ts +++ b/packages/types/index.d.ts @@ -69,6 +69,7 @@ export interface IFlow { steps: IStep[]; createdAt: string; updatedAt: string; + lastInternalId: () => Promise; } export interface IUser {