diff --git a/packages/backend/src/apps/twitter/actions/create-tweet.ts b/packages/backend/src/apps/twitter/actions/create-tweet.ts index 92dbc019..482ae0b4 100644 --- a/packages/backend/src/apps/twitter/actions/create-tweet.ts +++ b/packages/backend/src/apps/twitter/actions/create-tweet.ts @@ -8,10 +8,10 @@ export default class CreateTweet { } async run() { - const response = await this.client.createTweet.run( + const tweet = await this.client.createTweet.run( this.client.step.parameters.tweet as string ); - return response.data.data; + return tweet; } } diff --git a/packages/backend/src/apps/twitter/client/endpoints/create-tweet.ts b/packages/backend/src/apps/twitter/client/endpoints/create-tweet.ts index d8980159..52eade01 100644 --- a/packages/backend/src/apps/twitter/client/endpoints/create-tweet.ts +++ b/packages/backend/src/apps/twitter/client/endpoints/create-tweet.ts @@ -29,7 +29,9 @@ export default class CreateTweet { { headers: { ...authHeader } } ); - return response; + const tweet = response.data.data; + + return tweet; } catch (error) { const errorMessage = error.response.data.detail; throw new Error(`Error occured while creating a tweet: ${errorMessage}`); diff --git a/packages/backend/src/apps/twitter/client/endpoints/get-current-user.ts b/packages/backend/src/apps/twitter/client/endpoints/get-current-user.ts index f8cc3531..4ccaa575 100644 --- a/packages/backend/src/apps/twitter/client/endpoints/get-current-user.ts +++ b/packages/backend/src/apps/twitter/client/endpoints/get-current-user.ts @@ -24,8 +24,12 @@ export default class GetCurrentUser { this.client.oauthClient.authorize(requestData, token) ); - return await this.client.httpClient.get(requestPath, { + const response = await this.client.httpClient.get(requestPath, { headers: { ...authHeader }, }); + + const currentUser = response.data.data; + + return currentUser; } } diff --git a/packages/backend/src/apps/twitter/client/endpoints/get-user-by-username.ts b/packages/backend/src/apps/twitter/client/endpoints/get-user-by-username.ts index e21e2fda..23ac8b83 100644 --- a/packages/backend/src/apps/twitter/client/endpoints/get-user-by-username.ts +++ b/packages/backend/src/apps/twitter/client/endpoints/get-user-by-username.ts @@ -39,6 +39,7 @@ export default class GetUserByUsername { ); } - return response; + const user = response.data.data; + return user; } } diff --git a/packages/backend/src/apps/twitter/client/endpoints/get-user-tweets.ts b/packages/backend/src/apps/twitter/client/endpoints/get-user-tweets.ts index 7bde7466..b6343ced 100644 --- a/packages/backend/src/apps/twitter/client/endpoints/get-user-tweets.ts +++ b/packages/backend/src/apps/twitter/client/endpoints/get-user-tweets.ts @@ -1,5 +1,8 @@ import { IJSONObject } from '@automatisch/types'; +import { URLSearchParams } from 'url'; import TwitterClient from '../index'; +import omitBy from 'lodash/omitBy'; +import isEmpty from 'lodash/isEmpty'; export default class GetUserTweets { client: TwitterClient; @@ -8,26 +11,50 @@ export default class GetUserTweets { this.client = client; } - async run(userId: string) { + async run(userId: string, lastInternalId?: string) { const token = { key: this.client.connection.formattedData.accessToken as string, secret: this.client.connection.formattedData.accessSecret as string, }; - const requestPath = `/2/users/${userId}/tweets`; + let response; + const tweets: IJSONObject[] = []; - const requestData = { - url: `${TwitterClient.baseUrl}${requestPath}`, - method: 'GET', - }; + do { + const params: IJSONObject = { + since_id: lastInternalId, + pagination_token: response?.data?.meta?.next_token, + }; - const authHeader = this.client.oauthClient.toHeader( - this.client.oauthClient.authorize(requestData, token) - ); + const queryParams = new URLSearchParams(omitBy(params, isEmpty)); - const response = await this.client.httpClient.get(requestPath, { - headers: { ...authHeader }, - }); + const requestPath = `/2/users/${userId}/tweets${ + queryParams.toString() ? `?${queryParams.toString()}` : '' + }`; + + const requestData = { + url: `${TwitterClient.baseUrl}${requestPath}`, + method: 'GET', + }; + + const authHeader = this.client.oauthClient.toHeader( + this.client.oauthClient.authorize(requestData, token) + ); + + response = await this.client.httpClient.get(requestPath, { + headers: { ...authHeader }, + }); + + if (response.data.meta.result_count > 0) { + response.data.data.forEach((tweet: IJSONObject) => { + if (!lastInternalId || Number(tweet.id) > Number(lastInternalId)) { + tweets.push(tweet); + } else { + return; + } + }); + } + } while (response.data.meta.next_token && lastInternalId); if (response.data?.errors) { const errorMessages = response.data.errors @@ -39,6 +66,6 @@ export default class GetUserTweets { ); } - return response; + return tweets; } } diff --git a/packages/backend/src/apps/twitter/info.json b/packages/backend/src/apps/twitter/info.json index dc3d0f3e..2b383999 100644 --- a/packages/backend/src/apps/twitter/info.json +++ b/packages/backend/src/apps/twitter/info.json @@ -20,7 +20,7 @@ }, { "key": "consumerKey", - "label": "Consumer Key", + "label": "API Key", "type": "string", "required": true, "readOnly": false, @@ -32,7 +32,7 @@ }, { "key": "consumerSecret", - "label": "Consumer Secret", + "label": "API Secret", "type": "string", "required": true, "readOnly": false, diff --git a/packages/backend/src/apps/twitter/triggers/user-tweet.ts b/packages/backend/src/apps/twitter/triggers/user-tweet.ts index 44fcf058..40bce65b 100644 --- a/packages/backend/src/apps/twitter/triggers/user-tweet.ts +++ b/packages/backend/src/apps/twitter/triggers/user-tweet.ts @@ -7,23 +7,20 @@ export default class UserTweet { this.client = client; } - async run() { - return this.getTweets(); + async run(lastInternalId: string) { + return this.getTweets(lastInternalId); } async testRun() { return this.getTweets(); } - async getTweets() { - const userResponse = await this.client.getUserByUsername.run( + async getTweets(lastInternalId?: string) { + const user = await this.client.getUserByUsername.run( this.client.step.parameters.username as string ); - const userId = userResponse.data.data.id; - - const tweetsResponse = await this.client.getUserTweets.run(userId); - const tweets = tweetsResponse.data.data; + const tweets = await this.client.getUserTweets.run(user.id, lastInternalId); return tweets; } diff --git a/packages/backend/src/db/migrations/20220823171017_add_internal_id_to_executions.ts b/packages/backend/src/db/migrations/20220823171017_add_internal_id_to_executions.ts new file mode 100644 index 00000000..7dacb7c6 --- /dev/null +++ b/packages/backend/src/db/migrations/20220823171017_add_internal_id_to_executions.ts @@ -0,0 +1,13 @@ +import { Knex } from 'knex'; + +export async function up(knex: Knex): Promise { + return knex.schema.table('executions', (table) => { + table.string('internal_id'); + }); +} + +export async function down(knex: Knex): Promise { + return knex.schema.table('executions', (table) => { + table.dropColumn('internal_id'); + }); +} diff --git a/packages/backend/src/models/execution.ts b/packages/backend/src/models/execution.ts index 222f1802..e952e251 100644 --- a/packages/backend/src/models/execution.ts +++ b/packages/backend/src/models/execution.ts @@ -8,6 +8,7 @@ class Execution extends Base { id!: string; flowId!: string; testRun = false; + internalId!: string; executionSteps: ExecutionStep[] = []; static tableName = 'executions'; @@ -19,6 +20,7 @@ class Execution extends Base { id: { type: 'string', format: 'uuid' }, flowId: { type: 'string', format: 'uuid' }, testRun: { type: 'boolean' }, + internalId: { type: 'string' }, }, }; diff --git a/packages/backend/src/models/flow.ts b/packages/backend/src/models/flow.ts index 46169e8e..ca44c208 100644 --- a/packages/backend/src/models/flow.ts +++ b/packages/backend/src/models/flow.ts @@ -4,6 +4,7 @@ import Base from './base'; import Step from './step'; import Execution from './execution'; import Telemetry from '../helpers/telemetry'; +import { IExecution } from '@automatisch/types'; class Flow extends Base { id!: string; @@ -49,6 +50,14 @@ class Flow extends Base { }, }); + async lastInternalId() { + const lastExecution = await this.$relatedQuery('executions') + .orderBy('created_at', 'desc') + .first(); + + return (lastExecution as Execution).internalId; + } + async $beforeUpdate( opt: ModelOptions, queryContext: QueryContext diff --git a/packages/backend/src/services/processor.ts b/packages/backend/src/services/processor.ts index f5c85554..fc9626fb 100644 --- a/packages/backend/src/services/processor.ts +++ b/packages/backend/src/services/processor.ts @@ -3,6 +3,7 @@ import Flow from '../models/flow'; import Step from '../models/step'; import Execution from '../models/execution'; import ExecutionStep from '../models/execution-step'; +import { IJSONObject } from '@automatisch/types'; type ExecutionSteps = Record; @@ -34,16 +35,37 @@ class Processor { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion let initialTriggerData = await this.getInitialTriggerData(triggerStep!); + if (initialTriggerData.length === 0) { + const lastInternalId = await this.flow.lastInternalId(); + + await Execution.query().insert({ + flowId: this.flow.id, + testRun: this.testRun, + internalId: lastInternalId, + }); + + return; + } + if (this.testRun) { initialTriggerData = [initialTriggerData[0]]; } + if (initialTriggerData.length > 1) { + initialTriggerData = initialTriggerData.sort( + (item: IJSONObject, nextItem: IJSONObject) => { + return (item.id as number) - (nextItem.id as number); + } + ); + } + const executions: Execution[] = []; for await (const data of initialTriggerData) { const execution = await Execution.query().insert({ flowId: this.flow.id, testRun: this.testRun, + internalId: data.id, }); executions.push(execution); @@ -65,6 +87,8 @@ class Processor { priorExecutionSteps ); + step.parameters = computedParameters; + const appInstance = new AppClass(step.connection, this.flow, step); if (!isTrigger && key) { @@ -105,23 +129,16 @@ class Processor { const AppClass = (await import(`../apps/${step.appKey}`)).default; const appInstance = new AppClass(step.connection, this.flow, step); - const lastExecutionStep = await step - .$relatedQuery('executionSteps') - .orderBy('created_at', 'desc') - .first(); - - const lastExecutionStepCreatedAt = lastExecutionStep?.createdAt as string; - const flow = (await step.$relatedQuery('flow')) as Flow; - const command = appInstance.triggers[step.key]; - const startTime = new Date(lastExecutionStepCreatedAt || flow.updatedAt); let fetchedData; + const lastInternalId = await this.flow.lastInternalId(); + if (this.testRun) { - fetchedData = await command.testRun(startTime); + fetchedData = await command.testRun(); } else { - fetchedData = await command.run(startTime); + fetchedData = await command.run(lastInternalId); } return fetchedData; diff --git a/packages/types/index.d.ts b/packages/types/index.d.ts index 6f06513e..3dd82317 100644 --- a/packages/types/index.d.ts +++ b/packages/types/index.d.ts @@ -69,6 +69,7 @@ export interface IFlow { steps: IStep[]; createdAt: string; updatedAt: string; + lastInternalId: () => Promise; } export interface IUser {