fix: Make processor to work with multiple trigger objects

This commit is contained in:
Faruk AYDIN
2022-03-12 14:20:45 +03:00
committed by Ömer Faruk Aydın
parent fec55d698a
commit 3f7a888429
3 changed files with 89 additions and 45 deletions

View File

@@ -20,6 +20,6 @@ export default class MyTweet {
const userTimeline = await this.client.v1.userTimelineByUsername(username); const userTimeline = await this.client.v1.userTimelineByUsername(username);
const fetchedTweets = userTimeline.tweets; const fetchedTweets = userTimeline.tweets;
return fetchedTweets[0]; return fetchedTweets;
} }
} }

View File

@@ -23,7 +23,7 @@ const executeFlow = async (
.throwIfNotFound(); .throwIfNotFound();
const flow = await step.$relatedQuery('flow'); const flow = await step.$relatedQuery('flow');
const data = await new Processor(flow, step).run(); const data = await new Processor(flow, step, { testRun: true }).run();
// TODO: Use this snippet to execute flows with the background job. // TODO: Use this snippet to execute flows with the background job.
// const data = processorQueue.add('processorJob', { // const data = processorQueue.add('processorJob', {

View File

@@ -10,12 +10,14 @@ type ExecutionSteps = Record<string, ExecutionStep>;
class Processor { class Processor {
flow: Flow; flow: Flow;
untilStep: Step; untilStep: Step;
testRun: boolean;
static variableRegExp = /({{step\..+\..+}})/g; static variableRegExp = /({{step\..+\..+}})/g;
constructor(flow: Flow, untilStep: Step) { constructor(flow: Flow, untilStep: Step, { testRun = false }) {
this.flow = flow; this.flow = flow;
this.untilStep = untilStep; this.untilStep = untilStep;
this.testRun = testRun;
} }
async run() { async run() {
@@ -24,56 +26,98 @@ class Processor {
.withGraphFetched('connection') .withGraphFetched('connection')
.orderBy('position', 'asc'); .orderBy('position', 'asc');
const execution = await Execution.query().insert({ const triggerStep = steps.find((step) => step.type === 'trigger');
flowId: this.flow.id, let initialTriggerData = await this.getInitialTriggerData(triggerStep);
testRun: true,
});
let previousExecutionStep: ExecutionStep; if (this.testRun) {
let fetchedData; initialTriggerData = [initialTriggerData[0]];
const priorExecutionSteps: ExecutionSteps = {}; }
for await (const step of steps) { const executions: Execution[] = [];
const appData = App.findOneByKey(step.appKey);
const {
appKey,
connection,
key,
type,
parameters: rawParameters = {},
id,
} = step;
const isTrigger = type === 'trigger';
const AppClass = (await import(`../apps/${appKey}`)).default;
const computedParameters = Processor.computeParameters(
rawParameters,
priorExecutionSteps
);
const appInstance = new AppClass(
appData,
connection.formattedData,
computedParameters
);
const commands = isTrigger ? appInstance.triggers : appInstance.actions;
const command = commands[key];
fetchedData = await command.run();
previousExecutionStep = await execution for await (const data of initialTriggerData) {
.$relatedQuery('executionSteps') const execution = await Execution.query().insert({
.insertAndFetch({ flowId: this.flow.id,
stepId: id, testRun: this.testRun,
status: 'success', });
dataIn: previousExecutionStep?.dataOut,
dataOut: fetchedData,
});
priorExecutionSteps[id] = previousExecutionStep; executions.push(execution);
if (id === this.untilStep.id) { let previousExecutionStep: ExecutionStep;
return fetchedData; const priorExecutionSteps: ExecutionSteps = {};
let fetchedActionData = {};
for await (const step of steps) {
const appData = App.findOneByKey(step.appKey);
const {
appKey,
connection,
key,
type,
parameters: rawParameters = {},
id,
} = step;
const isTrigger = type === 'trigger';
const AppClass = (await import(`../apps/${appKey}`)).default;
const computedParameters = Processor.computeParameters(
rawParameters,
priorExecutionSteps
);
const appInstance = new AppClass(
appData,
connection.formattedData,
computedParameters
);
if (!isTrigger) {
const command = appInstance.actions[key];
fetchedActionData = await command.run();
}
previousExecutionStep = await execution
.$relatedQuery('executionSteps')
.insertAndFetch({
stepId: id,
status: 'success',
dataIn: isTrigger ? rawParameters : previousExecutionStep?.dataOut,
dataOut: isTrigger ? data : fetchedActionData,
});
priorExecutionSteps[id] = previousExecutionStep;
if (id === this.untilStep.id) {
break;
}
} }
} }
if (!this.testRun) return;
const lastExecutionStepFromFirstExecution = await executions[0]
.$relatedQuery('executionSteps')
.orderBy('created_at', 'desc')
.first();
return lastExecutionStepFromFirstExecution.dataOut;
}
async getInitialTriggerData(step: Step) {
const appData = App.findOneByKey(step.appKey);
const { appKey, connection, key, parameters: rawParameters = {} } = step;
const AppClass = (await import(`../apps/${appKey}`)).default;
const appInstance = new AppClass(
appData,
connection.formattedData,
rawParameters
);
const command = appInstance.triggers[key];
const fetchedData = await command.run();
return fetchedData; return fetchedData;
} }