feat(filter): add filter app
This commit is contained in:
79
packages/backend/src/apps/filter/actions/continue/index.ts
Normal file
79
packages/backend/src/apps/filter/actions/continue/index.ts
Normal file
@@ -0,0 +1,79 @@
|
||||
import defineAction from '../../../../helpers/define-action';
|
||||
|
||||
type TGroupItem = {
|
||||
key: string;
|
||||
operator: keyof TOperators;
|
||||
value: string;
|
||||
id: string;
|
||||
}
|
||||
|
||||
type TGroup = Record<'and', TGroupItem[]>;
|
||||
|
||||
const isEqual = (a: string, b: string) => a === b;
|
||||
const isNotEqual = (a: string, b: string) => !isEqual(a, b)
|
||||
const isGreaterThan = (a: string, b: string) => Number(a) > Number(b);
|
||||
const isLessThan = (a: string, b: string) => Number(a) < Number(b);
|
||||
const isGreaterThanOrEqual = (a: string, b: string) => Number(a) >= Number(b);
|
||||
const isLessThanOrEqual = (a: string, b: string) => Number(a) <= Number(b);
|
||||
const contains = (a: string, b: string) => a.includes(b);
|
||||
const doesNotContain = (a: string, b: string) => !contains(a, b);
|
||||
|
||||
type TOperatorFunc = (a: string, b: string) => boolean;
|
||||
|
||||
type TOperators = {
|
||||
equal: TOperatorFunc;
|
||||
not_equal: TOperatorFunc;
|
||||
greater_than: TOperatorFunc;
|
||||
less_than: TOperatorFunc;
|
||||
greater_than_or_equal: TOperatorFunc;
|
||||
less_than_or_equal: TOperatorFunc;
|
||||
contains: TOperatorFunc;
|
||||
not_contains: TOperatorFunc;
|
||||
};
|
||||
|
||||
const operators: TOperators = {
|
||||
'equal': isEqual,
|
||||
'not_equal': isNotEqual,
|
||||
'greater_than': isGreaterThan,
|
||||
'less_than': isLessThan,
|
||||
'greater_than_or_equal': isGreaterThanOrEqual,
|
||||
'less_than_or_equal': isLessThanOrEqual,
|
||||
'contains': contains,
|
||||
'not_contains': doesNotContain,
|
||||
};
|
||||
|
||||
const operate = (operation: keyof TOperators, a: string, b: string) => {
|
||||
return operators[operation](a, b);
|
||||
};
|
||||
|
||||
export default defineAction({
|
||||
name: 'Continue if conditions match',
|
||||
key: 'continueIfMatches',
|
||||
description: 'Let the execution continue if the conditions match',
|
||||
arguments: [],
|
||||
|
||||
async run($) {
|
||||
const orGroups = $.step.parameters.or as TGroup[];
|
||||
|
||||
const matchingGroups = orGroups.reduce((groups, group) => {
|
||||
const matchingConditions = group.and
|
||||
.filter((condition) => operate(condition.operator, condition.key, condition.value));
|
||||
|
||||
if (matchingConditions.length) {
|
||||
return groups.concat([{ and: matchingConditions }]);
|
||||
}
|
||||
|
||||
return groups;
|
||||
}, []);
|
||||
|
||||
if (matchingGroups.length === 0) {
|
||||
$.execution.exit();
|
||||
}
|
||||
|
||||
$.setActionItem({
|
||||
raw: {
|
||||
or: matchingGroups,
|
||||
}
|
||||
});
|
||||
},
|
||||
});
|
3
packages/backend/src/apps/filter/actions/index.ts
Normal file
3
packages/backend/src/apps/filter/actions/index.ts
Normal file
@@ -0,0 +1,3 @@
|
||||
import continueIfMatches from './continue';
|
||||
|
||||
export default [continueIfMatches];
|
8
packages/backend/src/apps/filter/assets/favicon.svg
Normal file
8
packages/backend/src/apps/filter/assets/favicon.svg
Normal file
@@ -0,0 +1,8 @@
|
||||
<svg width="800px" height="800px" viewBox="0 0 512 512" version="1.1" xmlns="http://www.w3.org/2000/svg" xmlns:xlink="http://www.w3.org/1999/xlink">
|
||||
<g id="Page-1" stroke="none" stroke-width="1" fill="none" fill-rule="evenodd">
|
||||
<g id="Shape" fill="#000000" transform="translate(42.666667, 85.333333)">
|
||||
<path d="M3.55271368e-14,1.42108547e-14 L191.565013,234.666667 L192,234.666667 L192,384 L234.666667,384 L234.666667,234.666667 L426.666667,1.42108547e-14 L3.55271368e-14,1.42108547e-14 Z M214.448,192 L211.81248,192 L89.9076267,42.6666667 L336.630187,42.6666667 L214.448,192 Z">
|
||||
</path>
|
||||
</g>
|
||||
</g>
|
||||
</svg>
|
After Width: | Height: | Size: 628 B |
0
packages/backend/src/apps/filter/index.d.ts
vendored
Normal file
0
packages/backend/src/apps/filter/index.d.ts
vendored
Normal file
14
packages/backend/src/apps/filter/index.ts
Normal file
14
packages/backend/src/apps/filter/index.ts
Normal file
@@ -0,0 +1,14 @@
|
||||
import defineApp from '../../helpers/define-app';
|
||||
import actions from './actions';
|
||||
|
||||
export default defineApp({
|
||||
name: 'Filter',
|
||||
key: 'filter',
|
||||
iconUrl: '{BASE_URL}/apps/filter/assets/favicon.svg',
|
||||
authDocUrl: 'https://automatisch.io/docs/apps/filter/connection',
|
||||
supportsConnections: false,
|
||||
baseUrl: '',
|
||||
apiBaseUrl: '',
|
||||
primaryColor: '001F52',
|
||||
actions,
|
||||
});
|
3
packages/backend/src/errors/already-processed.ts
Normal file
3
packages/backend/src/errors/already-processed.ts
Normal file
@@ -0,0 +1,3 @@
|
||||
import BaseError from './base';
|
||||
|
||||
export default class AlreadyProcessedError extends BaseError { }
|
@@ -38,6 +38,13 @@ export default function computeParameters(
|
||||
};
|
||||
}
|
||||
|
||||
if (Array.isArray(value)) {
|
||||
return {
|
||||
...result,
|
||||
[key]: value.map(item => computeParameters(item, executionSteps)),
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
...result,
|
||||
[key]: value,
|
||||
|
@@ -13,6 +13,7 @@ import {
|
||||
IRequest,
|
||||
} from '@automatisch/types';
|
||||
import EarlyExitError from '../errors/early-exit';
|
||||
import AlreadyProcessedError from '../errors/already-processed';
|
||||
|
||||
type GlobalVariableOptions = {
|
||||
connection?: Connection;
|
||||
@@ -77,6 +78,9 @@ const globalVariable = async (
|
||||
execution: {
|
||||
id: execution?.id,
|
||||
testRun,
|
||||
exit: () => {
|
||||
throw new EarlyExitError();
|
||||
}
|
||||
},
|
||||
lastExecutionStep: (await step?.getLastExecutionStep())?.toJSON(),
|
||||
triggerOutput: {
|
||||
@@ -93,7 +97,7 @@ const globalVariable = async (
|
||||
!$.execution.testRun
|
||||
) {
|
||||
// early exit as we do not want to process duplicate items in actual executions
|
||||
throw new EarlyExitError();
|
||||
throw new AlreadyProcessedError();
|
||||
}
|
||||
|
||||
$.triggerOutput.data.push(triggerItem);
|
||||
|
@@ -5,6 +5,8 @@ import ExecutionStep from '../models/execution-step';
|
||||
import computeParameters from '../helpers/compute-parameters';
|
||||
import globalVariable from '../helpers/global-variable';
|
||||
import HttpError from '../errors/http';
|
||||
import EarlyExitError from '../errors/early-exit';
|
||||
import AlreadyProcessedError from '../errors/already-processed';
|
||||
|
||||
type ProcessActionOptions = {
|
||||
flowId: string;
|
||||
@@ -44,13 +46,19 @@ export const processAction = async (options: ProcessActionOptions) => {
|
||||
try {
|
||||
await actionCommand.run($);
|
||||
} catch (error) {
|
||||
if (error instanceof HttpError) {
|
||||
$.actionOutput.error = error.details;
|
||||
} else {
|
||||
try {
|
||||
$.actionOutput.error = JSON.parse(error.message);
|
||||
} catch {
|
||||
$.actionOutput.error = { error: error.message };
|
||||
const shouldEarlyExit = error instanceof EarlyExitError;
|
||||
const shouldNotProcess = error instanceof AlreadyProcessedError;
|
||||
const shouldNotConsiderAsError = shouldEarlyExit || shouldNotProcess;
|
||||
|
||||
if (!shouldNotConsiderAsError) {
|
||||
if (error instanceof HttpError) {
|
||||
$.actionOutput.error = error.details;
|
||||
} else {
|
||||
try {
|
||||
$.actionOutput.error = JSON.parse(error.message);
|
||||
} catch {
|
||||
$.actionOutput.error = { error: error.message };
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@@ -1,6 +1,7 @@
|
||||
import Flow from '../models/flow';
|
||||
import globalVariable from '../helpers/global-variable';
|
||||
import EarlyExitError from '../errors/early-exit';
|
||||
import AlreadyProcessedError from '../errors/already-processed';
|
||||
import HttpError from '../errors/http';
|
||||
|
||||
type ProcessFlowOptions = {
|
||||
@@ -29,7 +30,11 @@ export const processFlow = async (options: ProcessFlowOptions) => {
|
||||
await triggerCommand.run($);
|
||||
}
|
||||
} catch (error) {
|
||||
if (error instanceof EarlyExitError === false) {
|
||||
const shouldEarlyExit = error instanceof EarlyExitError;
|
||||
const shouldNotProcess = error instanceof AlreadyProcessedError;
|
||||
const shouldNotConsiderAsError = shouldEarlyExit || shouldNotProcess;
|
||||
|
||||
if (!shouldNotConsiderAsError) {
|
||||
if (error instanceof HttpError) {
|
||||
$.triggerOutput.error = error.details;
|
||||
} else {
|
||||
|
@@ -21,7 +21,7 @@ const DEFAULT_DELAY_DURATION = 0;
|
||||
export const worker = new Worker(
|
||||
'action',
|
||||
async (job) => {
|
||||
const { stepId, flowId, executionId, computedParameters } = await processAction(
|
||||
const { stepId, flowId, executionId, computedParameters, executionStep } = await processAction(
|
||||
job.data as JobData
|
||||
);
|
||||
|
||||
@@ -48,6 +48,10 @@ export const worker = new Worker(
|
||||
jobOptions.delay = delayAsMilliseconds(step.key, computedParameters);
|
||||
}
|
||||
|
||||
if (step.appKey === 'filter' && !executionStep.dataOut) {
|
||||
return;
|
||||
}
|
||||
|
||||
await actionQueue.add(jobName, jobPayload, jobOptions);
|
||||
},
|
||||
{ connection: redisConfig }
|
||||
|
Reference in New Issue
Block a user