feat: Implement draft version of pagination with user tweet trigger

This commit is contained in:
Faruk AYDIN
2022-08-29 23:11:28 +03:00
parent 6271cedc25
commit 5ddb5ab6fa
8 changed files with 92 additions and 35 deletions

View File

@@ -39,6 +39,6 @@ export default class GetUserByUsername {
); );
} }
return response; return response.data.data;
} }
} }

View File

@@ -1,5 +1,8 @@
import { IJSONObject } from '@automatisch/types'; import { IJSONObject } from '@automatisch/types';
import { URLSearchParams } from 'url';
import TwitterClient from '../index'; import TwitterClient from '../index';
import omitBy from 'lodash/omitBy';
import isEmpty from 'lodash/isEmpty';
export default class GetUserTweets { export default class GetUserTweets {
client: TwitterClient; client: TwitterClient;
@@ -8,26 +11,52 @@ export default class GetUserTweets {
this.client = client; this.client = client;
} }
async run(userId: string) { async run(userId: string, lastInternalId?: string) {
const token = { const token = {
key: this.client.connection.formattedData.accessToken as string, key: this.client.connection.formattedData.accessToken as string,
secret: this.client.connection.formattedData.accessSecret as string, secret: this.client.connection.formattedData.accessSecret as string,
}; };
const requestPath = `/2/users/${userId}/tweets`; let response;
const tweets: IJSONObject[] = [];
const requestData = { do {
url: `${TwitterClient.baseUrl}${requestPath}`, const params: IJSONObject = {
method: 'GET', since_id: lastInternalId,
}; pagination_token: response?.data?.meta?.next_token,
};
const authHeader = this.client.oauthClient.toHeader( const queryParams = new URLSearchParams({
this.client.oauthClient.authorize(requestData, token) ...omitBy(params, isEmpty),
); });
const response = await this.client.httpClient.get(requestPath, { const requestPath = `/2/users/${userId}/tweets${
headers: { ...authHeader }, queryParams.toString() ? `?${queryParams.toString()}` : ''
}); }`;
const requestData = {
url: `${TwitterClient.baseUrl}${requestPath}`,
method: 'GET',
};
const authHeader = this.client.oauthClient.toHeader(
this.client.oauthClient.authorize(requestData, token)
);
response = await this.client.httpClient.get(requestPath, {
headers: { ...authHeader },
});
if (response.data.meta.result_count > 0) {
response.data.data.forEach((tweet: IJSONObject) => {
if (!lastInternalId || Number(tweet.id) > Number(lastInternalId)) {
tweets.push(tweet);
} else {
return;
}
});
}
} while (response.data.meta.next_token && lastInternalId);
if (response.data?.errors) { if (response.data?.errors) {
const errorMessages = response.data.errors const errorMessages = response.data.errors
@@ -39,6 +68,6 @@ export default class GetUserTweets {
); );
} }
return response; return tweets;
} }
} }

View File

@@ -7,24 +7,19 @@ export default class UserTweet {
this.client = client; this.client = client;
} }
async run() { async run(lastInternalId: string) {
return this.getTweets(); return this.getTweets(lastInternalId);
} }
async testRun() { async testRun() {
return this.getTweets(); return this.getTweets();
} }
async getTweets() { async getTweets(lastInternalId?: string) {
const userResponse = await this.client.getUserByUsername.run( const user = await this.client.getUserByUsername.run(
this.client.step.parameters.username as string this.client.step.parameters.username as string
); );
const userId = userResponse.data.data.id; return await this.client.getUserTweets.run(user.id, lastInternalId);
const tweetsResponse = await this.client.getUserTweets.run(userId);
const tweets = tweetsResponse.data.data;
return tweets;
} }
} }

View File

@@ -0,0 +1,13 @@
import { Knex } from 'knex';
export async function up(knex: Knex): Promise<void> {
return knex.schema.table('executions', (table) => {
table.string('internal_id');
});
}
export async function down(knex: Knex): Promise<void> {
return knex.schema.table('executions', (table) => {
table.dropColumn('internal_id');
});
}

View File

@@ -8,6 +8,7 @@ class Execution extends Base {
id!: string; id!: string;
flowId!: string; flowId!: string;
testRun = false; testRun = false;
internalId!: string;
executionSteps: ExecutionStep[] = []; executionSteps: ExecutionStep[] = [];
static tableName = 'executions'; static tableName = 'executions';
@@ -19,6 +20,7 @@ class Execution extends Base {
id: { type: 'string', format: 'uuid' }, id: { type: 'string', format: 'uuid' },
flowId: { type: 'string', format: 'uuid' }, flowId: { type: 'string', format: 'uuid' },
testRun: { type: 'boolean' }, testRun: { type: 'boolean' },
internalId: { type: 'string' },
}, },
}; };

View File

@@ -49,6 +49,14 @@ class Flow extends Base {
}, },
}); });
async lastInternalId() {
const lastInternalIdFetched: any = await this.$relatedQuery(
'executions'
).max('internal_id');
return lastInternalIdFetched[0].max;
}
async $beforeUpdate( async $beforeUpdate(
opt: ModelOptions, opt: ModelOptions,
queryContext: QueryContext queryContext: QueryContext

View File

@@ -3,6 +3,7 @@ import Flow from '../models/flow';
import Step from '../models/step'; import Step from '../models/step';
import Execution from '../models/execution'; import Execution from '../models/execution';
import ExecutionStep from '../models/execution-step'; import ExecutionStep from '../models/execution-step';
import { IJSONObject } from '@automatisch/types';
type ExecutionSteps = Record<string, ExecutionStep>; type ExecutionSteps = Record<string, ExecutionStep>;
@@ -34,16 +35,29 @@ class Processor {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
let initialTriggerData = await this.getInitialTriggerData(triggerStep!); let initialTriggerData = await this.getInitialTriggerData(triggerStep!);
if (initialTriggerData.length === 0) {
return;
}
if (this.testRun) { if (this.testRun) {
initialTriggerData = [initialTriggerData[0]]; initialTriggerData = [initialTriggerData[0]];
} }
if (initialTriggerData.length > 1) {
initialTriggerData = initialTriggerData.sort(
(item: IJSONObject, nextItem: IJSONObject) => {
return (item.id as number) - (nextItem.id as number);
}
);
}
const executions: Execution[] = []; const executions: Execution[] = [];
for await (const data of initialTriggerData) { for await (const data of initialTriggerData) {
const execution = await Execution.query().insert({ const execution = await Execution.query().insert({
flowId: this.flow.id, flowId: this.flow.id,
testRun: this.testRun, testRun: this.testRun,
internalId: data.id,
}); });
executions.push(execution); executions.push(execution);
@@ -65,6 +79,8 @@ class Processor {
priorExecutionSteps priorExecutionSteps
); );
step.parameters = computedParameters;
const appInstance = new AppClass(step.connection, this.flow, step); const appInstance = new AppClass(step.connection, this.flow, step);
if (!isTrigger && key) { if (!isTrigger && key) {
@@ -105,23 +121,16 @@ class Processor {
const AppClass = (await import(`../apps/${step.appKey}`)).default; const AppClass = (await import(`../apps/${step.appKey}`)).default;
const appInstance = new AppClass(step.connection, this.flow, step); const appInstance = new AppClass(step.connection, this.flow, step);
const lastExecutionStep = await step
.$relatedQuery('executionSteps')
.orderBy('created_at', 'desc')
.first();
const lastExecutionStepCreatedAt = lastExecutionStep?.createdAt as string;
const flow = (await step.$relatedQuery('flow')) as Flow;
const command = appInstance.triggers[step.key]; const command = appInstance.triggers[step.key];
const startTime = new Date(lastExecutionStepCreatedAt || flow.updatedAt);
let fetchedData; let fetchedData;
const lastInternalId = await this.flow.lastInternalId();
if (this.testRun) { if (this.testRun) {
fetchedData = await command.testRun(startTime); fetchedData = await command.testRun();
} else { } else {
fetchedData = await command.run(startTime); fetchedData = await command.run(lastInternalId);
} }
return fetchedData; return fetchedData;

View File

@@ -69,6 +69,7 @@ export interface IFlow {
steps: IStep[]; steps: IStep[];
createdAt: string; createdAt: string;
updatedAt: string; updatedAt: string;
lastInternalId: () => Promise<string>;
} }
export interface IUser { export interface IUser {