Working on orchestration

This commit is contained in:
Owen
2025-11-20 10:31:09 -05:00
parent fa5facdf33
commit 3750c36aa7
10 changed files with 177 additions and 90 deletions

View File

@@ -88,7 +88,7 @@ export const sites = pgTable("sites", {
publicKey: varchar("publicKey"), publicKey: varchar("publicKey"),
lastHolePunch: bigint("lastHolePunch", { mode: "number" }), lastHolePunch: bigint("lastHolePunch", { mode: "number" }),
listenPort: integer("listenPort"), listenPort: integer("listenPort"),
dockerSocketEnabled: boolean("dockerSocketEnabled").notNull().default(true), dockerSocketEnabled: boolean("dockerSocketEnabled").notNull().default(true)
}); });
export const resources = pgTable("resources", { export const resources = pgTable("resources", {
@@ -206,7 +206,7 @@ export const siteResources = pgTable("siteResources", {
mode: varchar("mode").notNull(), // "host" | "cidr" | "port" mode: varchar("mode").notNull(), // "host" | "cidr" | "port"
protocol: varchar("protocol"), // only for port mode protocol: varchar("protocol"), // only for port mode
proxyPort: integer("proxyPort"), // only for port mode proxyPort: integer("proxyPort"), // only for port mode
destinationPort: integer("destinationPort"), // only for port mode destinationPort: integer("destinationPort"), // only for port mode
destination: varchar("destination").notNull(), // ip, cidr, hostname; validate against the mode destination: varchar("destination").notNull(), // ip, cidr, hostname; validate against the mode
enabled: boolean("enabled").notNull().default(true), enabled: boolean("enabled").notNull().default(true),
alias: varchar("alias") alias: varchar("alias")
@@ -654,25 +654,25 @@ export const clients = pgTable("clients", {
maxConnections: integer("maxConnections") maxConnections: integer("maxConnections")
}); });
export const clientSitesAssociationsCache = pgTable("clientSitesAssociationsCache", { export const clientSitesAssociationsCache = pgTable(
clientId: integer("clientId") "clientSitesAssociationsCache",
.notNull() {
.references(() => clients.clientId, { onDelete: "cascade" }), clientId: integer("clientId") // not a foreign key here so after its deleted the rebuild function can delete it and send the message
siteId: integer("siteId") .notNull(),
.notNull() siteId: integer("siteId").notNull(),
.references(() => sites.siteId, { onDelete: "cascade" }), isRelayed: boolean("isRelayed").notNull().default(false),
isRelayed: boolean("isRelayed").notNull().default(false), endpoint: varchar("endpoint")
endpoint: varchar("endpoint") }
}); );
export const clientSiteResourcesAssociationsCache = pgTable("clientSiteResourcesAssociationsCache", { export const clientSiteResourcesAssociationsCache = pgTable(
clientId: integer("clientId") "clientSiteResourcesAssociationsCache",
.notNull() {
.references(() => clients.clientId, { onDelete: "cascade" }), clientId: integer("clientId") // not a foreign key here so after its deleted the rebuild function can delete it and send the message
siteResourceId: integer("siteResourceId") .notNull(),
.notNull() siteResourceId: integer("siteResourceId").notNull()
.references(() => siteResources.siteResourceId, { onDelete: "cascade" }) }
}); );
export const olms = pgTable("olms", { export const olms = pgTable("olms", {
olmId: varchar("id").primaryKey(), olmId: varchar("id").primaryKey(),

View File

@@ -361,27 +361,27 @@ export const clients = sqliteTable("clients", {
lastHolePunch: integer("lastHolePunch") lastHolePunch: integer("lastHolePunch")
}); });
export const clientSitesAssociationsCache = sqliteTable("clientSitesAssociationsCache", { export const clientSitesAssociationsCache = sqliteTable(
clientId: integer("clientId") "clientSitesAssociationsCache",
.notNull() {
.references(() => clients.clientId, { onDelete: "cascade" }), clientId: integer("clientId") // not a foreign key here so after its deleted the rebuild function can delete it and send the message
siteId: integer("siteId") .notNull(),
.notNull() siteId: integer("siteId").notNull(),
.references(() => sites.siteId, { onDelete: "cascade" }), isRelayed: integer("isRelayed", { mode: "boolean" })
isRelayed: integer("isRelayed", { mode: "boolean" }) .notNull()
.notNull() .default(false),
.default(false), endpoint: text("endpoint")
endpoint: text("endpoint") }
}); );
export const clientSiteResourcesAssociationsCache = sqliteTable("clientSiteResourcesAssociationsCache", { export const clientSiteResourcesAssociationsCache = sqliteTable(
clientId: integer("clientId") "clientSiteResourcesAssociationsCache",
.notNull() {
.references(() => clients.clientId, { onDelete: "cascade" }), clientId: integer("clientId") // not a foreign key here so after its deleted the rebuild function can delete it and send the message
siteResourceId: integer("siteResourceId") .notNull(),
.notNull() siteResourceId: integer("siteResourceId").notNull()
.references(() => siteResources.siteResourceId, { onDelete: "cascade" }) }
}); );
export const olms = sqliteTable("olms", { export const olms = sqliteTable("olms", {
olmId: text("id").primaryKey(), olmId: text("id").primaryKey(),

View File

@@ -304,7 +304,7 @@ export async function getNextAvailableOrgSubnet(): Promise<string> {
return subnet; return subnet;
} }
export function generateRemoteSubnetsStr(allSiteResources: SiteResource[]) { export function generateRemoteSubnets(allSiteResources: SiteResource[]): string[] {
let remoteSubnets = allSiteResources let remoteSubnets = allSiteResources
.filter((sr) => { .filter((sr) => {
if (sr.mode === "cidr") return true; if (sr.mode === "cidr") return true;
@@ -321,12 +321,11 @@ export function generateRemoteSubnetsStr(allSiteResources: SiteResource[]) {
if (sr.mode === "host") { if (sr.mode === "host") {
return `${sr.destination}/32`; return `${sr.destination}/32`;
} }
}); return ""; // This should never be reached due to filtering, but satisfies TypeScript
})
.filter((subnet) => subnet !== ""); // Remove empty strings just to be safe
// remove duplicates // remove duplicates
remoteSubnets = Array.from(new Set(remoteSubnets)); return Array.from(new Set(remoteSubnets));
const remoteSubnetsStr =
remoteSubnets.length > 0 ? remoteSubnets.join(",") : null;
return remoteSubnetsStr;
} }
export type SubnetProxyTarget = { export type SubnetProxyTarget = {

View File

@@ -2,6 +2,7 @@ import {
Client, Client,
clients, clients,
clientSiteResources, clientSiteResources,
clientSiteResourcesAssociationsCache,
clientSitesAssociationsCache, clientSitesAssociationsCache,
db, db,
exitNodes, exitNodes,
@@ -30,7 +31,7 @@ import {
import { sendToExitNode } from "#dynamic/lib/exitNodes"; import { sendToExitNode } from "#dynamic/lib/exitNodes";
import logger from "@server/logger"; import logger from "@server/logger";
import { import {
generateRemoteSubnetsStr, generateRemoteSubnets,
generateSubnetProxyTargets, generateSubnetProxyTargets,
SubnetProxyTarget SubnetProxyTarget
} from "@server/lib/ip"; } from "@server/lib/ip";
@@ -204,17 +205,35 @@ export async function rebuildClientAssociations(
const existingClientSiteResources = await trx const existingClientSiteResources = await trx
.select({ .select({
clientId: clientSiteResources.clientId clientId: clientSiteResourcesAssociationsCache.clientId
}) })
.from(clientSiteResources) .from(clientSiteResourcesAssociationsCache)
.where( .where(
eq(clientSiteResources.siteResourceId, siteResource.siteResourceId) eq(
clientSiteResourcesAssociationsCache.siteResourceId,
siteResource.siteResourceId
)
); );
const existingClientSiteResourceIds = existingClientSiteResources.map( const existingClientSiteResourceIds = existingClientSiteResources.map(
(row) => row.clientId (row) => row.clientId
); );
// Get full client details for existing resource clients (needed for sending delete messages)
const existingResourceClients =
existingClientSiteResourceIds.length > 0
? await trx
.select({
clientId: clients.clientId,
pubKey: clients.pubKey,
subnet: clients.subnet
})
.from(clients)
.where(
inArray(clients.clientId, existingClientSiteResourceIds)
)
: [];
const clientSiteResourcesToAdd = mergedAllClientIds.filter( const clientSiteResourcesToAdd = mergedAllClientIds.filter(
(clientId) => !existingClientSiteResourceIds.includes(clientId) (clientId) => !existingClientSiteResourceIds.includes(clientId)
); );
@@ -228,7 +247,7 @@ export async function rebuildClientAssociations(
if (clientSiteResourcesToInsert.length > 0) { if (clientSiteResourcesToInsert.length > 0) {
await trx await trx
.insert(clientSiteResources) .insert(clientSiteResourcesAssociationsCache)
.values(clientSiteResourcesToInsert) .values(clientSiteResourcesToInsert)
.returning(); .returning();
} }
@@ -239,15 +258,15 @@ export async function rebuildClientAssociations(
if (clientSiteResourcesToRemove.length > 0) { if (clientSiteResourcesToRemove.length > 0) {
await trx await trx
.delete(clientSiteResources) .delete(clientSiteResourcesAssociationsCache)
.where( .where(
and( and(
eq( eq(
clientSiteResources.siteResourceId, clientSiteResourcesAssociationsCache.siteResourceId,
siteResource.siteResourceId siteResource.siteResourceId
), ),
inArray( inArray(
clientSiteResources.clientId, clientSiteResourcesAssociationsCache.clientId,
clientSiteResourcesToRemove clientSiteResourcesToRemove
) )
) )
@@ -269,7 +288,7 @@ export async function rebuildClientAssociations(
await handleSubnetProxyTargetUpdates( await handleSubnetProxyTargetUpdates(
siteResource, siteResource,
mergedAllClients, mergedAllClients,
existingClients, existingResourceClients,
clientSiteResourcesToAdd, clientSiteResourcesToAdd,
clientSiteResourcesToRemove, clientSiteResourcesToRemove,
trx trx
@@ -277,7 +296,7 @@ export async function rebuildClientAssociations(
return { return {
mergedAllClients mergedAllClients
} };
} }
async function handleMessagesForSiteClients( async function handleMessagesForSiteClients(
@@ -429,10 +448,25 @@ async function handleMessagesForSiteClients(
); );
// TODO: should we have this here? // TODO: should we have this here?
const allSiteResources = await trx const allSiteResources = await db // only get the site resources that this client has access to
.select() .select()
.from(siteResources) .from(siteResources)
.where(eq(siteResources.siteId, site.siteId)); .innerJoin(
clientSiteResourcesAssociationsCache,
eq(
siteResources.siteResourceId,
clientSiteResourcesAssociationsCache.siteResourceId
)
)
.where(
and(
eq(siteResources.siteId, site.siteId),
eq(
clientSiteResourcesAssociationsCache.clientId,
client.clientId
)
)
);
olmJobs.push( olmJobs.push(
olmAddPeer( olmAddPeer(
@@ -446,8 +480,11 @@ async function handleMessagesForSiteClients(
publicKey: site.publicKey, publicKey: site.publicKey,
serverIP: site.address, serverIP: site.address,
serverPort: site.listenPort, serverPort: site.listenPort,
remoteSubnets: remoteSubnets: generateRemoteSubnets(
generateRemoteSubnetsStr(allSiteResources) allSiteResources.map(
({ siteResources }) => siteResources
)
)
}, },
olm.olmId olm.olmId
) )
@@ -518,9 +555,13 @@ export async function updateClientSiteDestinations(
exitNodeId: site.exitNodes?.exitNodeId || 0, exitNodeId: site.exitNodes?.exitNodeId || 0,
type: site.exitNodes?.type || "", type: site.exitNodes?.type || "",
name: site.exitNodes?.name || "", name: site.exitNodes?.name || "",
sourceIp: site.clientSitesAssociationsCache.endpoint.split(":")[0] || "", sourceIp:
site.clientSitesAssociationsCache.endpoint.split(":")[0] ||
"",
sourcePort: sourcePort:
parseInt(site.clientSitesAssociationsCache.endpoint.split(":")[1]) || 0, parseInt(
site.clientSitesAssociationsCache.endpoint.split(":")[1]
) || 0,
destinations: [ destinations: [
{ {
destinationIP: site.sites.subnet.split("/")[0], destinationIP: site.sites.subnet.split("/")[0],

View File

@@ -7,16 +7,16 @@ import {
ExitNode, ExitNode,
exitNodes, exitNodes,
siteResources, siteResources,
clientSiteResourcesAssociationsCache, clientSiteResourcesAssociationsCache
} from "@server/db"; } from "@server/db";
import { clients, clientSitesAssociationsCache, Newt, sites } from "@server/db"; import { clients, clientSitesAssociationsCache, Newt, sites } from "@server/db";
import { eq, and, inArray } from "drizzle-orm"; import { eq, and, inArray } from "drizzle-orm";
import { updatePeer } from "../olm/peers"; import { updatePeer } from "../olm/peers";
import { sendToExitNode } from "#dynamic/lib/exitNodes"; import { sendToExitNode } from "#dynamic/lib/exitNodes";
import { import {
generateRemoteSubnetsStr, generateRemoteSubnets,
generateSubnetProxyTargets, generateSubnetProxyTargets,
SubnetProxyTarget, SubnetProxyTarget
} from "@server/lib/ip"; } from "@server/lib/ip";
const inputSchema = z.object({ const inputSchema = z.object({
@@ -137,7 +137,10 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
const clientsRes = await db const clientsRes = await db
.select() .select()
.from(clients) .from(clients)
.innerJoin(clientSitesAssociationsCache, eq(clients.clientId, clientSitesAssociationsCache.clientId)) .innerJoin(
clientSitesAssociationsCache,
eq(clients.clientId, clientSitesAssociationsCache.clientId)
)
.where(eq(clientSitesAssociationsCache.siteId, siteId)); .where(eq(clientSitesAssociationsCache.siteId, siteId));
// Prepare peers data for the response // Prepare peers data for the response
@@ -186,10 +189,25 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
return null; return null;
} }
const allSiteResources = await db const allSiteResources = await db // only get the site resources that this client has access to
.select() .select()
.from(siteResources) .from(siteResources)
.where(eq(siteResources.siteId, site.siteId)); .innerJoin(
clientSiteResourcesAssociationsCache,
eq(
siteResources.siteResourceId,
clientSiteResourcesAssociationsCache.siteResourceId
)
)
.where(
and(
eq(siteResources.siteId, site.siteId),
eq(
clientSiteResourcesAssociationsCache.clientId,
client.clients.clientId
)
)
);
await updatePeer(client.clients.clientId, { await updatePeer(client.clients.clientId, {
siteId: site.siteId, siteId: site.siteId,
@@ -197,8 +215,11 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
publicKey: site.publicKey, publicKey: site.publicKey,
serverIP: site.address, serverIP: site.address,
serverPort: site.listenPort, serverPort: site.listenPort,
remoteSubnets: remoteSubnets: generateRemoteSubnets(
generateRemoteSubnetsStr(allSiteResources) allSiteResources.map(
({ siteResources }) => siteResources
)
)
}); });
} catch (error) { } catch (error) {
logger.error( logger.error(
@@ -238,7 +259,10 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
.from(clients) .from(clients)
.innerJoin( .innerJoin(
clientSiteResourcesAssociationsCache, clientSiteResourcesAssociationsCache,
eq(clients.clientId, clientSiteResourcesAssociationsCache.clientId) eq(
clients.clientId,
clientSiteResourcesAssociationsCache.clientId
)
) )
.where( .where(
eq( eq(
@@ -247,7 +271,10 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
) )
); );
const resourceTargets = generateSubnetProxyTargets(resource, resourceClients); const resourceTargets = generateSubnetProxyTargets(
resource,
resourceClients
);
targetsToSend.push(...resourceTargets); targetsToSend.push(...resourceTargets);
} }

View File

@@ -1,5 +1,6 @@
import { import {
Client, Client,
clientSiteResourcesAssociationsCache,
db, db,
ExitNode, ExitNode,
orgs, orgs,
@@ -12,13 +13,20 @@ import {
users users
} from "@server/db"; } from "@server/db";
import { MessageHandler } from "@server/routers/ws"; import { MessageHandler } from "@server/routers/ws";
import { clients, clientSitesAssociationsCache, exitNodes, Olm, olms, sites } from "@server/db"; import {
clients,
clientSitesAssociationsCache,
exitNodes,
Olm,
olms,
sites
} from "@server/db";
import { and, eq, inArray, isNull } from "drizzle-orm"; import { and, eq, inArray, isNull } from "drizzle-orm";
import { addPeer, deletePeer } from "../newt/peers"; import { addPeer, deletePeer } from "../newt/peers";
import logger from "@server/logger"; import logger from "@server/logger";
import { listExitNodes } from "#dynamic/lib/exitNodes"; import { listExitNodes } from "#dynamic/lib/exitNodes";
import { getNextAvailableClientSubnet } from "@server/lib/ip"; import { getNextAvailableClientSubnet } from "@server/lib/ip";
import { generateRemoteSubnetsStr } from "@server/lib/ip"; import { generateRemoteSubnets } from "@server/lib/ip";
export const handleOlmRegisterMessage: MessageHandler = async (context) => { export const handleOlmRegisterMessage: MessageHandler = async (context) => {
logger.info("Handling register olm message!"); logger.info("Handling register olm message!");
@@ -170,7 +178,10 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
const sitesData = await db const sitesData = await db
.select() .select()
.from(sites) .from(sites)
.innerJoin(clientSitesAssociationsCache, eq(sites.siteId, clientSitesAssociationsCache.siteId)) .innerJoin(
clientSitesAssociationsCache,
eq(sites.siteId, clientSitesAssociationsCache.siteId)
)
.where(eq(clientSitesAssociationsCache.clientId, client.clientId)); .where(eq(clientSitesAssociationsCache.clientId, client.clientId));
// Prepare an array to store site configurations // Prepare an array to store site configurations
@@ -234,11 +245,6 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
) )
.limit(1); .limit(1);
const allSiteResources = await db
.select()
.from(siteResources)
.where(eq(siteResources.siteId, site.siteId));
// Add the peer to the exit node for this site // Add the peer to the exit node for this site
if (clientSite.endpoint) { if (clientSite.endpoint) {
logger.info( logger.info(
@@ -269,6 +275,26 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
endpoint = `${exitNode.endpoint}:21820`; endpoint = `${exitNode.endpoint}:21820`;
} }
const allSiteResources = await db // only get the site resources that this client has access to
.select()
.from(siteResources)
.innerJoin(
clientSiteResourcesAssociationsCache,
eq(
siteResources.siteResourceId,
clientSiteResourcesAssociationsCache.siteResourceId
)
)
.where(
and(
eq(siteResources.siteId, site.siteId),
eq(
clientSiteResourcesAssociationsCache.clientId,
client.clientId
)
)
);
// Add site configuration to the array // Add site configuration to the array
siteConfigurations.push({ siteConfigurations.push({
siteId: site.siteId, siteId: site.siteId,
@@ -276,7 +302,7 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
publicKey: site.publicKey, publicKey: site.publicKey,
serverIP: site.address, serverIP: site.address,
serverPort: site.listenPort, serverPort: site.listenPort,
remoteSubnets: generateRemoteSubnetsStr(allSiteResources) remoteSubnets: generateRemoteSubnets(allSiteResources.map(({ siteResources }) => siteResources))
}); });
} }

View File

@@ -12,7 +12,7 @@ export async function addPeer(
endpoint: string; endpoint: string;
serverIP: string | null; serverIP: string | null;
serverPort: number | null; serverPort: number | null;
remoteSubnets: string | null; // optional, comma-separated list of subnets that this site can access remoteSubnets: string[] | null; // optional, comma-separated list of subnets that this site can access
}, },
olmId?: string olmId?: string
) { ) {
@@ -80,7 +80,7 @@ export async function updatePeer(
endpoint: string; endpoint: string;
serverIP: string | null; serverIP: string | null;
serverPort: number | null; serverPort: number | null;
remoteSubnets?: string | null; // optional, comma-separated list of subnets that remoteSubnets?: string[] | null; // optional, comma-separated list of subnets that
}, },
olmId?: string olmId?: string
) { ) {

View File

@@ -272,9 +272,6 @@ export async function createSiteResource(
); );
} }
// const targets = await generateSubnetProxyTargets([newSiteResource], trx);
// await addTargets(newt.newtId, targets);
await rebuildClientAssociations(newSiteResource, trx); // we need to call this because we added to the admin role await rebuildClientAssociations(newSiteResource, trx); // we need to call this because we added to the admin role
}); });

View File

@@ -106,10 +106,7 @@ export async function deleteSiteResource(
); );
} }
// const targets = await generateSubnetProxyTargets([removedSiteResource], trx); await rebuildClientAssociations(removedSiteResource, trx);
// await removeTargets(newt.newtId, targets);
await rebuildClientAssociations(existingSiteResource, trx);
}); });
logger.info( logger.info(

0
statement-breakpoint Normal file
View File