Merge pull request #462 from automatisch/feature/user-tweet-pagination

feat: Implement draft version of pagination with user tweet trigger
This commit is contained in:
Ömer Faruk Aydın
2022-08-31 12:57:26 +03:00
committed by GitHub
12 changed files with 112 additions and 39 deletions

View File

@@ -8,10 +8,10 @@ export default class CreateTweet {
}
async run() {
const response = await this.client.createTweet.run(
const tweet = await this.client.createTweet.run(
this.client.step.parameters.tweet as string
);
return response.data.data;
return tweet;
}
}

View File

@@ -29,7 +29,9 @@ export default class CreateTweet {
{ headers: { ...authHeader } }
);
return response;
const tweet = response.data.data;
return tweet;
} catch (error) {
const errorMessage = error.response.data.detail;
throw new Error(`Error occured while creating a tweet: ${errorMessage}`);

View File

@@ -24,8 +24,12 @@ export default class GetCurrentUser {
this.client.oauthClient.authorize(requestData, token)
);
return await this.client.httpClient.get(requestPath, {
const response = await this.client.httpClient.get(requestPath, {
headers: { ...authHeader },
});
const currentUser = response.data.data;
return currentUser;
}
}

View File

@@ -39,6 +39,7 @@ export default class GetUserByUsername {
);
}
return response;
const user = response.data.data;
return user;
}
}

View File

@@ -1,5 +1,8 @@
import { IJSONObject } from '@automatisch/types';
import { URLSearchParams } from 'url';
import TwitterClient from '../index';
import omitBy from 'lodash/omitBy';
import isEmpty from 'lodash/isEmpty';
export default class GetUserTweets {
client: TwitterClient;
@@ -8,13 +11,26 @@ export default class GetUserTweets {
this.client = client;
}
async run(userId: string) {
async run(userId: string, lastInternalId?: string) {
const token = {
key: this.client.connection.formattedData.accessToken as string,
secret: this.client.connection.formattedData.accessSecret as string,
};
const requestPath = `/2/users/${userId}/tweets`;
let response;
const tweets: IJSONObject[] = [];
do {
const params: IJSONObject = {
since_id: lastInternalId,
pagination_token: response?.data?.meta?.next_token,
};
const queryParams = new URLSearchParams(omitBy(params, isEmpty));
const requestPath = `/2/users/${userId}/tweets${
queryParams.toString() ? `?${queryParams.toString()}` : ''
}`;
const requestData = {
url: `${TwitterClient.baseUrl}${requestPath}`,
@@ -25,10 +41,21 @@ export default class GetUserTweets {
this.client.oauthClient.authorize(requestData, token)
);
const response = await this.client.httpClient.get(requestPath, {
response = await this.client.httpClient.get(requestPath, {
headers: { ...authHeader },
});
if (response.data.meta.result_count > 0) {
response.data.data.forEach((tweet: IJSONObject) => {
if (!lastInternalId || Number(tweet.id) > Number(lastInternalId)) {
tweets.push(tweet);
} else {
return;
}
});
}
} while (response.data.meta.next_token && lastInternalId);
if (response.data?.errors) {
const errorMessages = response.data.errors
.map((error: IJSONObject) => error.detail)
@@ -39,6 +66,6 @@ export default class GetUserTweets {
);
}
return response;
return tweets;
}
}

View File

@@ -20,7 +20,7 @@
},
{
"key": "consumerKey",
"label": "Consumer Key",
"label": "API Key",
"type": "string",
"required": true,
"readOnly": false,
@@ -32,7 +32,7 @@
},
{
"key": "consumerSecret",
"label": "Consumer Secret",
"label": "API Secret",
"type": "string",
"required": true,
"readOnly": false,

View File

@@ -7,23 +7,20 @@ export default class UserTweet {
this.client = client;
}
async run() {
return this.getTweets();
async run(lastInternalId: string) {
return this.getTweets(lastInternalId);
}
async testRun() {
return this.getTweets();
}
async getTweets() {
const userResponse = await this.client.getUserByUsername.run(
async getTweets(lastInternalId?: string) {
const user = await this.client.getUserByUsername.run(
this.client.step.parameters.username as string
);
const userId = userResponse.data.data.id;
const tweetsResponse = await this.client.getUserTweets.run(userId);
const tweets = tweetsResponse.data.data;
const tweets = await this.client.getUserTweets.run(user.id, lastInternalId);
return tweets;
}

View File

@@ -0,0 +1,13 @@
import { Knex } from 'knex';
export async function up(knex: Knex): Promise<void> {
return knex.schema.table('executions', (table) => {
table.string('internal_id');
});
}
export async function down(knex: Knex): Promise<void> {
return knex.schema.table('executions', (table) => {
table.dropColumn('internal_id');
});
}

View File

@@ -8,6 +8,7 @@ class Execution extends Base {
id!: string;
flowId!: string;
testRun = false;
internalId!: string;
executionSteps: ExecutionStep[] = [];
static tableName = 'executions';
@@ -19,6 +20,7 @@ class Execution extends Base {
id: { type: 'string', format: 'uuid' },
flowId: { type: 'string', format: 'uuid' },
testRun: { type: 'boolean' },
internalId: { type: 'string' },
},
};

View File

@@ -4,6 +4,7 @@ import Base from './base';
import Step from './step';
import Execution from './execution';
import Telemetry from '../helpers/telemetry';
import { IExecution } from '@automatisch/types';
class Flow extends Base {
id!: string;
@@ -49,6 +50,14 @@ class Flow extends Base {
},
});
async lastInternalId() {
const lastExecution = await this.$relatedQuery('executions')
.orderBy('created_at', 'desc')
.first();
return (lastExecution as Execution).internalId;
}
async $beforeUpdate(
opt: ModelOptions,
queryContext: QueryContext

View File

@@ -3,6 +3,7 @@ import Flow from '../models/flow';
import Step from '../models/step';
import Execution from '../models/execution';
import ExecutionStep from '../models/execution-step';
import { IJSONObject } from '@automatisch/types';
type ExecutionSteps = Record<string, ExecutionStep>;
@@ -34,16 +35,37 @@ class Processor {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
let initialTriggerData = await this.getInitialTriggerData(triggerStep!);
if (initialTriggerData.length === 0) {
const lastInternalId = await this.flow.lastInternalId();
await Execution.query().insert({
flowId: this.flow.id,
testRun: this.testRun,
internalId: lastInternalId,
});
return;
}
if (this.testRun) {
initialTriggerData = [initialTriggerData[0]];
}
if (initialTriggerData.length > 1) {
initialTriggerData = initialTriggerData.sort(
(item: IJSONObject, nextItem: IJSONObject) => {
return (item.id as number) - (nextItem.id as number);
}
);
}
const executions: Execution[] = [];
for await (const data of initialTriggerData) {
const execution = await Execution.query().insert({
flowId: this.flow.id,
testRun: this.testRun,
internalId: data.id,
});
executions.push(execution);
@@ -65,6 +87,8 @@ class Processor {
priorExecutionSteps
);
step.parameters = computedParameters;
const appInstance = new AppClass(step.connection, this.flow, step);
if (!isTrigger && key) {
@@ -105,23 +129,16 @@ class Processor {
const AppClass = (await import(`../apps/${step.appKey}`)).default;
const appInstance = new AppClass(step.connection, this.flow, step);
const lastExecutionStep = await step
.$relatedQuery('executionSteps')
.orderBy('created_at', 'desc')
.first();
const lastExecutionStepCreatedAt = lastExecutionStep?.createdAt as string;
const flow = (await step.$relatedQuery('flow')) as Flow;
const command = appInstance.triggers[step.key];
const startTime = new Date(lastExecutionStepCreatedAt || flow.updatedAt);
let fetchedData;
const lastInternalId = await this.flow.lastInternalId();
if (this.testRun) {
fetchedData = await command.testRun(startTime);
fetchedData = await command.testRun();
} else {
fetchedData = await command.run(startTime);
fetchedData = await command.run(lastInternalId);
}
return fetchedData;

View File

@@ -69,6 +69,7 @@ export interface IFlow {
steps: IStep[];
createdAt: string;
updatedAt: string;
lastInternalId: () => Promise<string>;
}
export interface IUser {