From ba2b5afe2bc02076c82aeda4c1ff41f8c5221deb Mon Sep 17 00:00:00 2001 From: Faruk AYDIN Date: Sun, 20 Feb 2022 17:34:28 +0300 Subject: [PATCH] feat: Implement initial version of processor --- .../src/apps/twitter/actions/create-tweet.ts | 19 +++--- packages/backend/src/apps/twitter/index.ts | 10 +-- .../src/graphql/mutations/execute-flow.ts | 6 +- packages/backend/src/models/execution.ts | 10 +++ packages/backend/src/services/processor.ts | 62 +++++++++++++++++++ 5 files changed, 88 insertions(+), 19 deletions(-) create mode 100644 packages/backend/src/services/processor.ts diff --git a/packages/backend/src/apps/twitter/actions/create-tweet.ts b/packages/backend/src/apps/twitter/actions/create-tweet.ts index cfaeb1aa..2cb60b76 100644 --- a/packages/backend/src/apps/twitter/actions/create-tweet.ts +++ b/packages/backend/src/apps/twitter/actions/create-tweet.ts @@ -1,27 +1,24 @@ import TwitterApi from 'twitter-api-v2'; export default class CreateTweet { - client: any - parameters: any + client: any; + parameters: any; constructor(connectionData: any, parameters: any) { this.client = new TwitterApi({ appKey: connectionData.consumerKey, appSecret: connectionData.consumerSecret, accessToken: connectionData.accessToken, - accessSecret: connectionData.accessSecret + accessSecret: connectionData.accessSecret, }); - this.parameters = parameters; + if (parameters) { + this.parameters = JSON.parse(parameters); + } } async run() { - const response = await this.client.currentUser(); - const username = response.screen_name; - - const userTimeline = await this.client.v1.userTimelineByUsername(username); - const fetchedTweets = userTimeline.tweets; - - return fetchedTweets[0]; + const tweet = await this.client.v1.tweet(this.parameters.tweet); + return tweet; } } diff --git a/packages/backend/src/apps/twitter/index.ts b/packages/backend/src/apps/twitter/index.ts index 97f19535..048889c3 100644 --- a/packages/backend/src/apps/twitter/index.ts +++ b/packages/backend/src/apps/twitter/index.ts @@ -3,13 +3,13 @@ import Triggers from './triggers'; import Actions from './actions'; export default class Twitter { - authenticationClient: any - triggers: any - actions: any + authenticationClient: any; + triggers: any; + actions: any; - constructor(connectionData: any) { + constructor(connectionData: any, parameters: any) { this.authenticationClient = new Authentication(connectionData); this.triggers = new Triggers(connectionData); - this.actions = new Actions(connectionData, {}); + this.actions = new Actions(connectionData, parameters); } } diff --git a/packages/backend/src/graphql/mutations/execute-flow.ts b/packages/backend/src/graphql/mutations/execute-flow.ts index ca0c3542..de64ccea 100644 --- a/packages/backend/src/graphql/mutations/execute-flow.ts +++ b/packages/backend/src/graphql/mutations/execute-flow.ts @@ -1,6 +1,7 @@ import { GraphQLString, GraphQLNonNull } from 'graphql'; import RequestWithCurrentUser from '../../types/express/request-with-current-user'; import executeFlowType from '../types/execute-flow'; +import Processor from '../../services/processor'; type Params = { stepId: string; @@ -18,9 +19,8 @@ const executeFlowResolver = async ( }) .throwIfNotFound(); - const appClass = (await import(`../../apps/${step.appKey}`)).default; - const appInstance = new appClass(step.connection.data); - const data = await appInstance.triggers[step.key].run(); + const flow = await step.$relatedQuery('flow'); + const data = await new Processor(flow, step).run(); await step.$query().patch({ status: 'completed', diff --git a/packages/backend/src/models/execution.ts b/packages/backend/src/models/execution.ts index eaa74c08..79f11f56 100644 --- a/packages/backend/src/models/execution.ts +++ b/packages/backend/src/models/execution.ts @@ -1,10 +1,12 @@ import Base from './base'; import Flow from './flow'; +import ExecutionStep from './execution-step'; class Execution extends Base { id!: string; flowId!: number; testRun: boolean; + executionSteps: ExecutionStep[]; static tableName = 'executions'; @@ -27,6 +29,14 @@ class Execution extends Base { to: 'flows.id', }, }, + executionSteps: { + relation: Base.HasManyRelation, + modelClass: ExecutionStep, + join: { + from: 'executions.id', + to: 'execution_steps.execution_id', + }, + }, }); } diff --git a/packages/backend/src/services/processor.ts b/packages/backend/src/services/processor.ts new file mode 100644 index 00000000..0a9b1d03 --- /dev/null +++ b/packages/backend/src/services/processor.ts @@ -0,0 +1,62 @@ +import Flow from '../models/flow'; +import Step from '../models/step'; +import Execution from '../models/execution'; +import ExecutionStep from '../models/execution-step'; + +class Processor { + flow: Flow; + untilStep: Step; + + constructor(flow: Flow, untilStep: Step) { + this.flow = flow; + this.untilStep = untilStep; + } + + async run() { + const steps = await this.flow + .$relatedQuery('steps') + .withGraphFetched('connection') + .orderBy('position', 'asc'); + + const execution = await Execution.query().insert({ + flowId: this.flow.id, + testRun: true, + }); + + let previousExecutionStep: ExecutionStep; + let fetchedActionData; + + for await (const step of steps) { + if (step.type.toString() === 'trigger') { + const appClass = (await import(`../apps/${step.appKey}`)).default; + const appInstance = new appClass(step.connection.data); + const fetchedTriggerData = await appInstance.triggers[step.key].run(); + + previousExecutionStep = await execution + .$relatedQuery('executionSteps') + .insertAndFetch({ + stepId: step.id, + status: 'success', + dataOut: fetchedTriggerData, + }); + } else { + const appClass = (await import(`../apps/${step.appKey}`)).default; + const appInstance = new appClass(step.connection.data, step.parameters); + fetchedActionData = await appInstance.actions[step.key].run(); + + previousExecutionStep = await execution + .$relatedQuery('executionSteps') + .insertAndFetch({ + stepId: step.id, + status: 'success', + dataIn: previousExecutionStep.dataOut, + dataOut: fetchedActionData, + }); + } + } + + return fetchedActionData; + } +} + +export default Processor;