refactor: Implement test run helper to work with services

This commit is contained in:
Faruk AYDIN
2022-10-14 20:18:58 +02:00
parent 56a9aeece7
commit 628f872180
11 changed files with 262 additions and 119 deletions

View File

@@ -62,15 +62,13 @@ const searchTweets = async (
tweets.data.push(dataItem);
});
}
} while (response.data.meta.next_token && options.lastInternalId);
} while (response.data.meta.next_token && !$.execution.testRun);
tweets.data.sort((tweet, nextTweet) => {
return (tweet.raw.id as number) - (nextTweet.raw.id as number);
});
for (const tweet of tweets.data) {
await $.process(tweet);
}
return tweets;
};
export default searchTweets;

View File

@@ -1,6 +1,5 @@
import Context from '../../types/express/context';
// eslint-disable-next-line @typescript-eslint/no-unused-vars
import flowQueue from '../../queues/flow';
import testRun from '../../services/test-run';
type Params = {
input: {
@@ -13,25 +12,22 @@ const executeFlow = async (
params: Params,
context: Context
) => {
// const untilStep = await context.currentUser
// .$relatedQuery('steps')
// .withGraphFetched('connection')
// .findOne({
// 'steps.id': params.input.stepId,
// })
// .throwIfNotFound();
// const flow = await untilStep.$relatedQuery('flow');
// const executionStep = await new Processor(flow, {
// untilStep,
// testRun: true,
// }).run();
// await untilStep.$query().patch({
// status: 'completed',
// });
// if (executionStep.errorDetails) {
// throw new Error(JSON.stringify(executionStep.errorDetails));
// }
// return { data: executionStep.dataOut, step: untilStep };
const { stepId } = params.input;
const { executionStep } = await testRun({ stepId });
const untilStep = await context.currentUser
.$relatedQuery('steps')
.findById(stepId);
await untilStep.$query().patch({
status: 'completed',
});
if (executionStep.errorDetails) {
throw new Error(JSON.stringify(executionStep.errorDetails));
}
return { data: executionStep.dataOut, step: untilStep };
};
export default executeFlow;

View File

@@ -8,8 +8,8 @@ type Params = {
};
};
const JOB_NAME = 'processorJob';
const EVERY_15_MINUTES_CRON = '*/1 * * * *';
const JOB_NAME = 'flow';
const EVERY_15_MINUTES_CRON = '*/15 * * * *';
const updateFlowStatus = async (
_parent: unknown,
@@ -43,8 +43,10 @@ const updateFlowStatus = async (
published_at: new Date().toISOString(),
});
const jobName = `${JOB_NAME}-${flow.id}`;
await flowQueue.add(
JOB_NAME,
jobName,
{ flowId: flow.id },
{
repeat: repeatOptions,

View File

@@ -0,0 +1,43 @@
import Step from '../models/step';
import ExecutionStep from '../models/execution-step';
import get from 'lodash.get';
const variableRegExp = /({{step\.[\da-zA-Z-]+(?:\.[\da-zA-Z-]+)+}})/g;
export default function computeParameters(
parameters: Step['parameters'],
executionSteps: ExecutionStep[]
): Step['parameters'] {
const entries = Object.entries(parameters);
return entries.reduce((result, [key, value]: [string, unknown]) => {
if (typeof value === 'string') {
const parts = value.split(variableRegExp);
const computedValue = parts
.map((part: string) => {
const isVariable = part.match(variableRegExp);
if (isVariable) {
const stepIdAndKeyPath = part.replace(/{{step.|}}/g, '') as string;
const [stepId, ...keyPaths] = stepIdAndKeyPath.split('.');
const keyPath = keyPaths.join('.');
const executionStep = executionSteps.find((executionStep) => {
return executionStep.stepId === stepId;
});
const data = executionStep?.dataOut;
const dataValue = get(data, keyPath);
return dataValue;
}
return part;
})
.join('');
return {
...result,
[key]: computedValue,
};
}
return result;
}, {});
}

View File

@@ -3,13 +3,7 @@ import Connection from '../models/connection';
import Flow from '../models/flow';
import Step from '../models/step';
import Execution from '../models/execution';
import {
IJSONObject,
IApp,
IGlobalVariable,
ITriggerDataItem,
} from '@automatisch/types';
import triggerQueue from '../queues/trigger';
import { IJSONObject, IApp, IGlobalVariable } from '@automatisch/types';
type GlobalVariableOptions = {
connection?: Connection;
@@ -17,12 +11,13 @@ type GlobalVariableOptions = {
flow?: Flow;
step?: Step;
execution?: Execution;
testRun?: boolean;
};
const globalVariable = async (
options: GlobalVariableOptions
): Promise<IGlobalVariable> => {
const { connection, app, flow, step, execution } = options;
const { connection, app, flow, step, execution, testRun = false } = options;
const lastInternalId = await flow?.lastInternalId();
@@ -66,19 +61,10 @@ const globalVariable = async (
},
execution: {
id: execution?.id,
testRun,
},
};
variable.process = async (triggerDataItem: ITriggerDataItem) => {
const jobName = `${step.appKey}-${triggerDataItem.meta.internalId}`;
const jobPayload = {
$: variable,
triggerDataItem,
};
await triggerQueue.add(jobName, jobPayload);
};
if (trigger && trigger.dedupeStrategy === 'unique') {
const lastInternalIds = await flow?.lastInternalIds();

View File

@@ -0,0 +1,55 @@
import Step from '../models/step';
import Flow from '../models/flow';
import Execution from '../models/execution';
import ExecutionStep from '../models/execution-step';
import computeParameters from '../helpers/compute-parameters';
import globalVariable from '../helpers/global-variable';
type ProcessActionOptions = {
flowId: string;
executionId: string;
stepId: string;
};
export const processAction = async (options: ProcessActionOptions) => {
const { flowId, stepId, executionId } = options;
const step = await Step.query().findById(stepId).throwIfNotFound();
const execution = await Execution.query()
.findById(executionId)
.throwIfNotFound();
const $ = await globalVariable({
flow: await Flow.query().findById(flowId).throwIfNotFound(),
app: await step.getApp(),
step: step,
connection: await step.$relatedQuery('connection'),
execution: execution,
});
const priorExecutionSteps = await ExecutionStep.query().where({
execution_id: $.execution.id,
});
const computedParameters = computeParameters(
$.step.parameters,
priorExecutionSteps
);
const actionCommand = await step.getActionCommand();
$.step.parameters = computedParameters;
const actionOutput = await actionCommand.run($);
const executionStep = await execution
.$relatedQuery('executionSteps')
.insertAndFetch({
stepId: $.step.id,
status: actionOutput.error ? 'failure' : 'success',
dataIn: computedParameters,
dataOut: actionOutput.error ? null : actionOutput.data.raw,
errorDetails: actionOutput.error,
});
return { flowId, stepId, executionId, executionStep };
};

View File

@@ -0,0 +1,24 @@
import Flow from '../models/flow';
import globalVariable from '../helpers/global-variable';
type ProcessFlowOptions = {
flowId: string;
testRun?: boolean;
};
export const processFlow = async (options: ProcessFlowOptions) => {
const flow = await Flow.query().findById(options.flowId).throwIfNotFound();
const triggerStep = await flow.getTriggerStep();
const triggerCommand = await triggerStep.getTriggerCommand();
const $ = await globalVariable({
flow,
connection: await triggerStep.$relatedQuery('connection'),
app: await triggerStep.getApp(),
step: triggerStep,
testRun: options.testRun,
});
return await triggerCommand.run($);
};

View File

@@ -1,73 +0,0 @@
import get from 'lodash.get';
import { IActionOutput } from '@automatisch/types';
import App from '../models/app';
import Flow from '../models/flow';
import Step from '../models/step';
import Execution from '../models/execution';
import ExecutionStep from '../models/execution-step';
import globalVariable from '../helpers/global-variable';
type ExecutionSteps = Record<string, ExecutionStep>;
type ProcessorOptions = {
untilStep?: Step;
testRun?: boolean;
};
class Processor {
flow: Flow;
untilStep?: Step;
testRun?: boolean;
static variableRegExp = /({{step\.[\da-zA-Z-]+(?:\.[\da-zA-Z-]+)+}})/g;
constructor(flow: Flow, processorOptions: ProcessorOptions) {
this.flow = flow;
this.untilStep = processorOptions.untilStep;
this.testRun = processorOptions.testRun;
}
static computeParameters(
parameters: Step['parameters'],
executionSteps: ExecutionStep[]
): Step['parameters'] {
const entries = Object.entries(parameters);
return entries.reduce((result, [key, value]: [string, unknown]) => {
if (typeof value === 'string') {
const parts = value.split(Processor.variableRegExp);
const computedValue = parts
.map((part: string) => {
const isVariable = part.match(Processor.variableRegExp);
if (isVariable) {
const stepIdAndKeyPath = part.replace(
/{{step.|}}/g,
''
) as string;
const [stepId, ...keyPaths] = stepIdAndKeyPath.split('.');
const keyPath = keyPaths.join('.');
const executionStep = executionSteps.find((executionStep) => {
return executionStep.stepId === stepId;
});
const data = executionStep?.dataOut;
const dataValue = get(data, keyPath);
return dataValue;
}
return part;
})
.join('');
return {
...result,
[key]: computedValue,
};
}
return result;
}, {});
}
}
export default Processor;

View File

@@ -0,0 +1,65 @@
import Step from '../models/step';
import { processFlow } from '../services/flow';
import { processTrigger } from '../services/trigger';
import { processAction } from '../services/action';
type TestRunOptions = {
stepId: string;
};
const testRun = async (options: TestRunOptions) => {
const untilStep = await Step.query()
.findById(options.stepId)
.throwIfNotFound();
const flow = await untilStep.$relatedQuery('flow');
const [triggerStep, ...actionSteps] = await flow
.$relatedQuery('steps')
.withGraphFetched('connection')
.orderBy('position', 'asc');
const { data, error: triggerError } = await processFlow({
flowId: flow.id,
testRun: true,
});
const firstTriggerDataItem = data[0];
const { executionId, executionStep: triggerExecutionStep } =
await processTrigger({
flowId: flow.id,
stepId: triggerStep.id,
triggerDataItem: firstTriggerDataItem,
testRun: true,
});
if (triggerError) {
const { executionStep: triggerExecutionStepWithError } =
await processTrigger({
flowId: flow.id,
stepId: triggerStep.id,
error: triggerError,
testRun: true,
});
return { executionStep: triggerExecutionStepWithError };
}
if (triggerStep.id === untilStep.id) {
return { executionStep: triggerExecutionStep };
}
for (const actionStep of actionSteps) {
const { executionStep: actionExecutionStep } = await processAction({
flowId: flow.id,
stepId: actionStep.id,
executionId,
});
if (actionStep.id === untilStep.id || actionExecutionStep.errorDetails) {
return { executionStep: actionExecutionStep };
}
}
};
export default testRun;

View File

@@ -0,0 +1,46 @@
import { IJSONObject, ITriggerDataItem } from '@automatisch/types';
import Step from '../models/step';
import Flow from '../models/flow';
import Execution from '../models/execution';
import globalVariable from '../helpers/global-variable';
type ProcessTriggerOptions = {
flowId: string;
stepId: string;
triggerDataItem?: ITriggerDataItem;
error?: IJSONObject;
testRun?: boolean;
};
export const processTrigger = async (options: ProcessTriggerOptions) => {
const { flowId, stepId, triggerDataItem, error, testRun } = options;
const step = await Step.query().findById(stepId).throwIfNotFound();
const $ = await globalVariable({
flow: await Flow.query().findById(flowId).throwIfNotFound(),
app: await step.getApp(),
step: step,
connection: await step.$relatedQuery('connection'),
});
// check if we already process this trigger data item or not!
const execution = await Execution.query().insert({
flowId: $.flow.id,
testRun,
internalId: triggerDataItem.meta.internalId,
});
const executionStep = await execution
.$relatedQuery('executionSteps')
.insertAndFetch({
stepId: $.step.id,
status: error ? 'failure' : 'success',
dataIn: $.step.parameters,
dataOut: !error ? triggerDataItem.raw : null,
errorDetails: error,
});
return { flowId, stepId, executionId: execution.id, executionStep };
};

View File

@@ -270,6 +270,7 @@ export type IGlobalVariable = {
};
execution?: {
id: string;
testRun: boolean;
}
process?: (triggerDataItem: ITriggerDataItem) => Promise<void>;
};