feat: Fetch data while considering previously persisted records
This commit is contained in:

committed by
Ömer Faruk Aydın

parent
782dba1f5e
commit
3340bdff4c
@@ -16,12 +16,43 @@ export default class SearchTweet {
|
|||||||
this.parameters = parameters;
|
this.parameters = parameters;
|
||||||
}
|
}
|
||||||
|
|
||||||
async run() {
|
async run(startTime: Date) {
|
||||||
const response = await this.client.v2.get('tweets/search/recent', {
|
const tweets = [];
|
||||||
query: this.parameters.searchTerm as string,
|
|
||||||
max_results: 10,
|
|
||||||
});
|
|
||||||
|
|
||||||
return response.data;
|
const response = await this.client.v2.search(
|
||||||
|
this.parameters.searchTerm as string,
|
||||||
|
{
|
||||||
|
max_results: 50,
|
||||||
|
'tweet.fields': 'created_at',
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
for await (const tweet of response.data.data) {
|
||||||
|
if (new Date(tweet.created_at).getTime() <= startTime.getTime()) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
tweets.push(tweet);
|
||||||
|
|
||||||
|
if (response.data.meta.next_token) {
|
||||||
|
await response.fetchNext();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return tweets;
|
||||||
|
}
|
||||||
|
|
||||||
|
async testRun() {
|
||||||
|
const response = await this.client.v2.search(
|
||||||
|
this.parameters.searchTerm as string,
|
||||||
|
{
|
||||||
|
max_results: 10,
|
||||||
|
'tweet.fields': 'created_at',
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
const mostRecentTweet = response.data.data[0];
|
||||||
|
|
||||||
|
return [mostRecentTweet];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@@ -1,5 +1,5 @@
|
|||||||
import { ValidationError } from 'objection';
|
import { ValidationError } from 'objection';
|
||||||
import type { QueryContext, ModelOptions } from 'objection';
|
import type { ModelOptions } from 'objection';
|
||||||
import Base from './base';
|
import Base from './base';
|
||||||
import Step from './step';
|
import Step from './step';
|
||||||
import Execution from './execution';
|
import Execution from './execution';
|
||||||
|
@@ -121,8 +121,26 @@ class Processor {
|
|||||||
rawParameters
|
rawParameters
|
||||||
);
|
);
|
||||||
|
|
||||||
|
const lastExecutionStep = await step
|
||||||
|
.$relatedQuery('executionSteps')
|
||||||
|
.orderBy('created_at', 'desc')
|
||||||
|
.first();
|
||||||
|
|
||||||
|
const lastExecutionStepCratedAt = lastExecutionStep?.dataOut
|
||||||
|
?.created_at as string;
|
||||||
|
const flow = (await step.$relatedQuery('flow')) as Flow;
|
||||||
|
|
||||||
const command = appInstance.triggers[key];
|
const command = appInstance.triggers[key];
|
||||||
const fetchedData = await command.run();
|
|
||||||
|
const startTime = new Date(lastExecutionStepCratedAt || flow.updatedAt);
|
||||||
|
let fetchedData;
|
||||||
|
|
||||||
|
if (this.testRun) {
|
||||||
|
fetchedData = await command.testRun(startTime);
|
||||||
|
} else {
|
||||||
|
fetchedData = await command.run(startTime);
|
||||||
|
}
|
||||||
|
|
||||||
return fetchedData;
|
return fetchedData;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user