feat: add shared connection capability

This commit is contained in:
Ali BARIN
2023-09-08 11:32:22 +00:00
parent aefff5c861
commit 099a8ea2cf
19 changed files with 315 additions and 103 deletions

View File

@@ -59,8 +59,8 @@
"http-proxy-agent": "^7.0.0",
"https-proxy-agent": "^7.0.1",
"jsonwebtoken": "^9.0.0",
"knex": "^2.4.0",
"libphonenumber-js": "^1.10.48",
"knex": "^2.5.1",
"lodash.get": "^4.4.2",
"luxon": "2.5.2",
"memory-cache": "^0.2.0",
@@ -69,7 +69,7 @@
"node-html-markdown": "^1.3.0",
"nodemailer": "6.7.0",
"oauth-1.0a": "^2.2.6",
"objection": "^3.0.0",
"objection": "^3.1.1",
"passport": "^0.6.0",
"pg": "^8.7.1",
"php-serialize": "^4.0.2",

View File

@@ -0,0 +1,15 @@
import { Knex } from 'knex';
export async function up(knex: Knex): Promise<void> {
return knex.schema.createTable('shared_connections', (table) => {
table.uuid('id').primary().defaultTo(knex.raw('gen_random_uuid()'));
table.uuid('connection_id').notNullable().references('id').inTable('connections');
table.uuid('role_id').notNullable().references('id').inTable('roles');
table.timestamps(true, true);
});
}
export async function down(knex: Knex): Promise<void> {
return knex.schema.dropTable('shared_connections');
}

View File

@@ -28,11 +28,11 @@ const createFlow = async (
});
if (connectionId) {
const hasConnection = await context.currentUser
.$relatedQuery('connections')
const connection = await context.currentUser
.relatedConnectionsQuery()
.findById(connectionId);
if (!hasConnection) {
if (!connection) {
throw new Error('The connection does not exist!');
}
}

View File

@@ -1,4 +1,5 @@
import Context from '../../types/express/context';
import Connection from '../../models/connection';
type Params = {
input: {
@@ -11,10 +12,13 @@ const deleteConnection = async (
params: Params,
context: Context
) => {
context.currentUser.can('delete', 'Connection');
const conditions = context.currentUser.can('delete', 'Connection');
const userConnections = context.currentUser.$relatedQuery('connections');
const allConnections = Connection.query();
const baseQuery = conditions.isCreator ? userConnections : allConnections;
await context.currentUser
.$relatedQuery('connections')
await baseQuery
.clone()
.delete()
.findOne({
id: params.input.id,

View File

@@ -4,6 +4,7 @@ import deleteUserQueue from '../../queues/delete-user.ee';
import flowQueue from '../../queues/flow';
import Flow from '../../models/flow';
import Execution from '../../models/execution';
import User from '../../models/user';
import ExecutionStep from '../../models/execution-step';
import appConfig from '../../config/app';
@@ -14,51 +15,87 @@ const deleteCurrentUser = async (
) => {
const id = context.currentUser.id;
const flows = await context.currentUser.$relatedQuery('flows').where({
active: true,
});
try {
await User.transaction(async (trx) => {
const flows = await context.currentUser
.$relatedQuery('flows', trx)
.where({
active: true,
});
const repeatableJobs = await flowQueue.getRepeatableJobs();
const { count } = await context.currentUser
.$relatedQuery('connections', trx)
.joinRelated('sharedConnections')
.joinRelated('steps')
.join('flows', function () {
this
.on(
'flows.id', '=', 'steps.flow_id'
)
.andOnVal(
'flows.user_id', '<>', id
)
.andOnVal(
'flows.active', '=', true
)
})
.count()
.first();
for (const flow of flows) {
const job = repeatableJobs.find((job) => job.id === flow.id);
if (count) {
throw new Error('The shared connections must be removed first!');
}
if (job) {
await flowQueue.removeRepeatableByKey(job.key);
const executionIds = (
await context.currentUser
.$relatedQuery('executions', trx)
.select('executions.id')
).map((execution: Execution) => execution.id);
const flowIds = flows.map((flow) => flow.id);
await ExecutionStep.query(trx).delete().whereIn('execution_id', executionIds);
await context.currentUser.$relatedQuery('executions', trx).delete();
await context.currentUser.$relatedQuery('steps', trx).delete();
await Flow.query(trx).whereIn('id', flowIds).delete();
await context.currentUser.$relatedQuery('connections', trx).delete();
await context.currentUser.$relatedQuery('identities', trx).delete();
if (appConfig.isCloud) {
await context.currentUser.$relatedQuery('subscriptions', trx).delete();
await context.currentUser.$relatedQuery('usageData', trx).delete();
}
await context.currentUser.$query(trx).delete();
const jobName = `Delete user - ${id}`;
const jobPayload = { id };
const millisecondsFor30Days = Duration.fromObject({ days: 30 }).toMillis();
const jobOptions = {
delay: millisecondsFor30Days,
};
// must be done as the last action as this cannot be reverted via the transaction!
const repeatableJobs = await flowQueue.getRepeatableJobs();
for (const flow of flows) {
const job = repeatableJobs.find((job) => job.id === flow.id);
if (job) {
await flowQueue.removeRepeatableByKey(job.key);
}
}
await deleteUserQueue.add(jobName, jobPayload, jobOptions);
});
return true;
} catch (err) {
if (err instanceof Error) {
throw err;
}
throw new Error('The user deletion has failed!');
}
const executionIds = (
await context.currentUser
.$relatedQuery('executions')
.select('executions.id')
).map((execution: Execution) => execution.id);
const flowIds = flows.map((flow) => flow.id);
await ExecutionStep.query().delete().whereIn('execution_id', executionIds);
await context.currentUser.$relatedQuery('executions').delete();
await context.currentUser.$relatedQuery('steps').delete();
await Flow.query().whereIn('id', flowIds).delete();
await context.currentUser.$relatedQuery('connections').delete();
await context.currentUser.$relatedQuery('identities').delete();
if (appConfig.isCloud) {
await context.currentUser.$relatedQuery('subscriptions').delete();
await context.currentUser.$relatedQuery('usageData').delete();
}
await context.currentUser.$query().delete();
const jobName = `Delete user - ${id}`;
const jobPayload = { id };
const millisecondsFor30Days = Duration.fromObject({ days: 30 }).toMillis();
const jobOptions = {
delay: millisecondsFor30Days,
};
await deleteUserQueue.add(jobName, jobPayload, jobOptions);
return true;
};
export default deleteCurrentUser;

View File

@@ -1,4 +1,5 @@
import Context from '../../types/express/context';
import Connection from '../../models/connection';
type Params = {
input: {
@@ -11,10 +12,13 @@ const resetConnection = async (
params: Params,
context: Context
) => {
context.currentUser.can('create', 'Connection');
const conditions = context.currentUser.can('update', 'Connection');
const userConnections = context.currentUser.$relatedQuery('connections');
const allConnections = Connection.query();
const baseQuery = conditions.isCreator ? userConnections : allConnections;
let connection = await context.currentUser
.$relatedQuery('connections')
let connection = await baseQuery
.clone()
.findOne({
id: params.input.id,
})

View File

@@ -1,6 +1,7 @@
import { IJSONObject } from '@automatisch/types';
import Context from '../../types/express/context';
import AppAuthClient from '../../models/app-auth-client';
import Connection from '../../models/connection';
type Params = {
input: {
@@ -15,10 +16,13 @@ const updateConnection = async (
params: Params,
context: Context
) => {
context.currentUser.can('create', 'Connection');
const conditions = context.currentUser.can('update', 'Connection');
const userConnections = context.currentUser.$relatedQuery('connections');
const allConnections = Connection.query();
const baseQuery = conditions.isCreator ? userConnections : allConnections;
let connection = await context.currentUser
.$relatedQuery('connections')
let connection = await baseQuery
.clone()
.findOne({
id: params.input.id,
})

View File

@@ -45,10 +45,11 @@ const updateStep = async (
canSeeAllConnections = !conditions.isCreator;
} catch {
// void
// The user does not have permission to read any connections!
throw new Error('The connection does not exist!');
}
const userConnections = context.currentUser.$relatedQuery('connections');
const userConnections = context.currentUser.relatedConnectionsQuery();
const allConnections = Connection.query();
const baseConnectionsQuery = canSeeAllConnections ? allConnections : userConnections;

View File

@@ -9,28 +9,55 @@ type Params = {
const getApp = async (_parent: unknown, params: Params, context: Context) => {
const conditions = context.currentUser.can('read', 'Connection');
const userConnections = context.currentUser.$relatedQuery('connections');
const allConnections = Connection.query();
const connectionBaseQuery = conditions.isCreator ? userConnections : allConnections;
const app = await App.findOneByKey(params.key);
if (context.currentUser) {
const connections = await connectionBaseQuery
.clone()
.select('connections.*')
const userConnections = context.currentUser.relatedConnectionsQuery();
const allConnections = Connection.query();
const connectionBaseQuery = conditions.isCreator ? userConnections : allConnections;
const connections = await Connection.query()
.with('connections', connectionBaseQuery)
.with(
'connections_with_flow_count',
Connection.query()
.clearSelect()
.select('connections.id')
.leftJoinRelated('steps')
.leftJoin('flows', function () {
this
.on(
'flows.id',
'=',
'steps.flow_id',
)
if (conditions.isCreator) {
this.andOnVal(
'flows.user_id',
'=',
context.currentUser.id
)
}
})
.where({
'connections.key': params.key,
'connections.draft': false,
})
.countDistinct('steps.flow_id as flowCount')
.groupBy('connections.id')
)
.select(
'connections.*',
'connections_with_flow_count.flowCount as flowCount'
)
.from('connections')
.withGraphFetched({
appConfig: true,
appAuthClient: true
})
.fullOuterJoinRelated('steps')
.where({
'connections.key': params.key,
'connections.draft': false,
})
.countDistinct('steps.flow_id as flowCount')
.groupBy('connections.id')
.orderBy('created_at', 'desc');
.joinRaw('join connections_with_flow_count on connections.id = connections_with_flow_count.id')
.orderBy('connections.created_at', 'desc');
return {
...app,

View File

@@ -15,7 +15,7 @@ const getConnectedApps = async (
) => {
const conditions = context.currentUser.can('read', 'Connection');
const userConnections = context.currentUser.$relatedQuery('connections');
const userConnections = context.currentUser.relatedConnectionsQuery();
const allConnections = Connection.query();
const connectionBaseQuery = conditions.isCreator ? userConnections : allConnections;
@@ -25,8 +25,9 @@ const getConnectedApps = async (
let apps = await App.findAll(params.name);
const connections = await connectionBaseQuery
.clone()
const connections = await Connection
.query()
.with('connections', connectionBaseQuery)
.select('connections.key')
.where({ draft: false })
.count('connections.id as count')

View File

@@ -13,15 +13,15 @@ const testConnection = async (
params: Params,
context: Context
) => {
const conditions = context.currentUser.can('update', 'Connection');
const userConnections = context.currentUser.$relatedQuery('connections');
const conditions = context.currentUser.can('read', 'Connection');
const userConnections = context.currentUser.relatedConnectionsQuery();
const allConnections = Connection.query();
const connectionBaseQuery = conditions.isCreator ? userConnections : allConnections;
let connection = await connectionBaseQuery
.clone()
.findOne({
id: params.id,
'connections.id': params.id,
})
.throwIfNotFound();

View File

@@ -244,6 +244,7 @@ type AuthLink {
type Connection {
id: String
key: String
shared: Boolean
reconnectable: Boolean
appAuthClientId: String
formattedData: ConnectionData

View File

@@ -1,18 +1,18 @@
import { QueryContext, ModelOptions } from 'objection';
import type { RelationMappings } from 'objection';
import { IJSONObject, IRequest } from '@automatisch/types';
import { AES, enc } from 'crypto-js';
import { IRequest } from '@automatisch/types';
import App from './app';
import AppConfig from './app-config';
import AppAuthClient from './app-auth-client';
import Base from './base';
import User from './user';
import Step from './step';
import ExtendedQueryBuilder from './query-builder';
import type { RelationMappings } from 'objection';
import { ModelOptions, QueryContext } from 'objection';
import appConfig from '../config/app';
import { IJSONObject } from '@automatisch/types';
import Telemetry from '../helpers/telemetry';
import globalVariable from '../helpers/global-variable';
import Telemetry from '../helpers/telemetry';
import App from './app';
import AppAuthClient from './app-auth-client';
import AppConfig from './app-config';
import Base from './base';
import ExtendedQueryBuilder from './query-builder';
import SharedConnection from './shared-connection';
import Step from './step';
import User from './user';
class Connection extends Base {
id!: string;
@@ -24,6 +24,9 @@ class Connection extends Base {
draft: boolean;
count?: number;
flowCount?: number;
sharedConnections?: SharedConnection[];
// computed via `User.relevantConnectionsQuery`
shared?: boolean;
user?: User;
steps?: Step[];
triggerSteps?: Step[];
@@ -46,6 +49,7 @@ class Connection extends Base {
appAuthClientId: { type: 'string', format: 'uuid' },
verified: { type: 'boolean', default: false },
draft: { type: 'boolean' },
shared: { type: 'boolean', readOnly: true, },
deletedAt: { type: 'string' },
createdAt: { type: 'string' },
updatedAt: { type: 'string' },
@@ -100,6 +104,14 @@ class Connection extends Base {
to: 'app_auth_clients.id',
},
},
sharedConnections: {
relation: Base.HasManyRelation,
modelClass: SharedConnection,
join: {
from: 'connections.id',
to: 'shared_connections.connection_id',
},
},
});
get reconnectable() {

View File

@@ -0,0 +1,45 @@
import Base from './base';
import Role from './role';
import User from './user';
class SharedConnection extends Base {
id!: string;
roleId!: string;
connectionId!: string;
static tableName = 'shared_connections';
static jsonSchema = {
type: 'object',
required: ['name', 'key'],
properties: {
id: { type: 'string', format: 'uuid' },
roleId: { type: 'string', format: 'uuid' },
connectionId: { type: 'string', format: 'uuid' },
createdAt: { type: 'string' },
updatedAt: { type: 'string' },
},
};
static relationMappings = () => ({
roles: {
relation: Base.HasManyRelation,
modelClass: Role,
join: {
from: 'shared_connections.role_id',
to: 'roles.id',
},
},
users: {
relation: Base.HasManyRelation,
modelClass: User,
join: {
from: 'shared_connections.role_id',
to: 'users.role_id',
},
},
});
}
export default SharedConnection;

View File

@@ -1,7 +1,7 @@
import bcrypt from 'bcrypt';
import { DateTime } from 'luxon';
import crypto from 'node:crypto';
import { ModelOptions, QueryContext } from 'objection';
import { raw, ModelOptions, QueryContext } from 'objection';
import appConfig from '../config/app';
import { hasValidLicense } from '../helpers/license.ee';
@@ -28,6 +28,7 @@ class User extends Base {
resetPasswordTokenSentAt: string;
trialExpiryDate: string;
connections?: Connection[];
sharedConnections?: Connection[];
flows?: Flow[];
steps?: Step[];
executions?: Execution[];
@@ -69,6 +70,18 @@ class User extends Base {
to: 'connections.user_id',
},
},
sharedConnections: {
relation: Base.ManyToManyRelation,
modelClass: Connection,
join: {
from: 'users.role_id',
through: {
from: 'shared_connections.role_id',
to: 'shared_connections.connection_id',
},
to: 'connections.id',
},
},
flows: {
relation: Base.HasManyRelation,
modelClass: Flow,
@@ -165,6 +178,40 @@ class User extends Base {
},
});
relatedConnectionsQuery() {
return Connection
.query()
.select('connections.*', raw('shared_connections.role_id IS NOT NULL as shared'))
.leftJoin(
'shared_connections',
'connections.id',
'=',
'shared_connections.connection_id'
)
.join(
'users',
function () {
this
.on(
'users.id',
'=',
'connections.user_id',
)
.orOn(
'users.role_id',
'=',
'shared_connections.role_id'
)
},
)
.where(
'users.id',
'=',
this.id
)
.groupBy('connections.id', 'shared_connections.role_id');
}
login(password: string) {
return bcrypt.compare(password, this.password);
}