feat: Implement initial version of processor
This commit is contained in:

committed by
Ömer Faruk Aydın

parent
4a8c3988c2
commit
ba2b5afe2b
@@ -1,27 +1,24 @@
|
||||
import TwitterApi from 'twitter-api-v2';
|
||||
|
||||
export default class CreateTweet {
|
||||
client: any
|
||||
parameters: any
|
||||
client: any;
|
||||
parameters: any;
|
||||
|
||||
constructor(connectionData: any, parameters: any) {
|
||||
this.client = new TwitterApi({
|
||||
appKey: connectionData.consumerKey,
|
||||
appSecret: connectionData.consumerSecret,
|
||||
accessToken: connectionData.accessToken,
|
||||
accessSecret: connectionData.accessSecret
|
||||
accessSecret: connectionData.accessSecret,
|
||||
});
|
||||
|
||||
this.parameters = parameters;
|
||||
if (parameters) {
|
||||
this.parameters = JSON.parse(parameters);
|
||||
}
|
||||
}
|
||||
|
||||
async run() {
|
||||
const response = await this.client.currentUser();
|
||||
const username = response.screen_name;
|
||||
|
||||
const userTimeline = await this.client.v1.userTimelineByUsername(username);
|
||||
const fetchedTweets = userTimeline.tweets;
|
||||
|
||||
return fetchedTweets[0];
|
||||
const tweet = await this.client.v1.tweet(this.parameters.tweet);
|
||||
return tweet;
|
||||
}
|
||||
}
|
||||
|
@@ -3,13 +3,13 @@ import Triggers from './triggers';
|
||||
import Actions from './actions';
|
||||
|
||||
export default class Twitter {
|
||||
authenticationClient: any
|
||||
triggers: any
|
||||
actions: any
|
||||
authenticationClient: any;
|
||||
triggers: any;
|
||||
actions: any;
|
||||
|
||||
constructor(connectionData: any) {
|
||||
constructor(connectionData: any, parameters: any) {
|
||||
this.authenticationClient = new Authentication(connectionData);
|
||||
this.triggers = new Triggers(connectionData);
|
||||
this.actions = new Actions(connectionData, {});
|
||||
this.actions = new Actions(connectionData, parameters);
|
||||
}
|
||||
}
|
||||
|
@@ -1,6 +1,7 @@
|
||||
import { GraphQLString, GraphQLNonNull } from 'graphql';
|
||||
import RequestWithCurrentUser from '../../types/express/request-with-current-user';
|
||||
import executeFlowType from '../types/execute-flow';
|
||||
import Processor from '../../services/processor';
|
||||
|
||||
type Params = {
|
||||
stepId: string;
|
||||
@@ -18,9 +19,8 @@ const executeFlowResolver = async (
|
||||
})
|
||||
.throwIfNotFound();
|
||||
|
||||
const appClass = (await import(`../../apps/${step.appKey}`)).default;
|
||||
const appInstance = new appClass(step.connection.data);
|
||||
const data = await appInstance.triggers[step.key].run();
|
||||
const flow = await step.$relatedQuery('flow');
|
||||
const data = await new Processor(flow, step).run();
|
||||
|
||||
await step.$query().patch({
|
||||
status: 'completed',
|
||||
|
@@ -1,10 +1,12 @@
|
||||
import Base from './base';
|
||||
import Flow from './flow';
|
||||
import ExecutionStep from './execution-step';
|
||||
|
||||
class Execution extends Base {
|
||||
id!: string;
|
||||
flowId!: number;
|
||||
testRun: boolean;
|
||||
executionSteps: ExecutionStep[];
|
||||
|
||||
static tableName = 'executions';
|
||||
|
||||
@@ -27,6 +29,14 @@ class Execution extends Base {
|
||||
to: 'flows.id',
|
||||
},
|
||||
},
|
||||
executionSteps: {
|
||||
relation: Base.HasManyRelation,
|
||||
modelClass: ExecutionStep,
|
||||
join: {
|
||||
from: 'executions.id',
|
||||
to: 'execution_steps.execution_id',
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
|
62
packages/backend/src/services/processor.ts
Normal file
62
packages/backend/src/services/processor.ts
Normal file
@@ -0,0 +1,62 @@
|
||||
import Flow from '../models/flow';
|
||||
import Step from '../models/step';
|
||||
import Execution from '../models/execution';
|
||||
import ExecutionStep from '../models/execution-step';
|
||||
|
||||
class Processor {
|
||||
flow: Flow;
|
||||
untilStep: Step;
|
||||
|
||||
constructor(flow: Flow, untilStep: Step) {
|
||||
this.flow = flow;
|
||||
this.untilStep = untilStep;
|
||||
}
|
||||
|
||||
async run() {
|
||||
const steps = await this.flow
|
||||
.$relatedQuery('steps')
|
||||
.withGraphFetched('connection')
|
||||
.orderBy('position', 'asc');
|
||||
|
||||
const execution = await Execution.query().insert({
|
||||
flowId: this.flow.id,
|
||||
testRun: true,
|
||||
});
|
||||
|
||||
let previousExecutionStep: ExecutionStep;
|
||||
let fetchedActionData;
|
||||
|
||||
for await (const step of steps) {
|
||||
if (step.type.toString() === 'trigger') {
|
||||
const appClass = (await import(`../apps/${step.appKey}`)).default;
|
||||
const appInstance = new appClass(step.connection.data);
|
||||
const fetchedTriggerData = await appInstance.triggers[step.key].run();
|
||||
|
||||
previousExecutionStep = await execution
|
||||
.$relatedQuery('executionSteps')
|
||||
.insertAndFetch({
|
||||
stepId: step.id,
|
||||
status: 'success',
|
||||
dataOut: fetchedTriggerData,
|
||||
});
|
||||
} else {
|
||||
const appClass = (await import(`../apps/${step.appKey}`)).default;
|
||||
const appInstance = new appClass(step.connection.data, step.parameters);
|
||||
fetchedActionData = await appInstance.actions[step.key].run();
|
||||
|
||||
previousExecutionStep = await execution
|
||||
.$relatedQuery('executionSteps')
|
||||
.insertAndFetch({
|
||||
stepId: step.id,
|
||||
status: 'success',
|
||||
dataIn: previousExecutionStep.dataOut,
|
||||
dataOut: fetchedActionData,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return fetchedActionData;
|
||||
}
|
||||
}
|
||||
|
||||
export default Processor;
|
Reference in New Issue
Block a user