Merge branch 'clients-user' into refactor/separate-tables

This commit is contained in:
Fred KISSIE
2025-12-03 17:01:50 +01:00
committed by GitHub
50 changed files with 1945 additions and 562 deletions

View File

@@ -1,8 +1,20 @@
import { clients, clientSitesAssociationsCache, db, olms, orgs, roleClients, roles, userClients, userOrgs, Transaction } from "@server/db";
import {
clients,
db,
olms,
orgs,
roleClients,
roles,
userClients,
userOrgs,
Transaction
} from "@server/db";
import { eq, and, notInArray } from "drizzle-orm";
import { listExitNodes } from "#dynamic/lib/exitNodes";
import { getNextAvailableClientSubnet } from "@server/lib/ip";
import logger from "@server/logger";
import { rebuildClientAssociationsFromClient } from "./rebuildClientAssociations";
import { sendTerminateClient } from "@server/routers/client/terminate";
export async function calculateUserClientsForOrgs(
userId: string,
@@ -88,7 +100,10 @@ export async function calculateUserClientsForOrgs(
.where(
and(
eq(roleClients.roleId, adminRole.roleId),
eq(roleClients.clientId, existingClient.clientId)
eq(
roleClients.clientId,
existingClient.clientId
)
)
)
.limit(1);
@@ -110,7 +125,10 @@ export async function calculateUserClientsForOrgs(
.where(
and(
eq(userClients.userId, userId),
eq(userClients.clientId, existingClient.clientId)
eq(
userClients.clientId,
existingClient.clientId
)
)
)
.limit(1);
@@ -172,6 +190,11 @@ export async function calculateUserClientsForOrgs(
})
.returning();
await rebuildClientAssociationsFromClient(
newClient,
transaction
);
// Grant admin role access to the client
await transaction.insert(roleClients).values({
roleId: adminRole.roleId,
@@ -225,15 +248,8 @@ async function cleanupOrphanedClients(
: and(eq(clients.userId, userId))
);
// Delete client-site associations first, then delete the clients
for (const client of clientsToDelete) {
await trx
.delete(clientSitesAssociationsCache)
.where(eq(clientSitesAssociationsCache.clientId, client.clientId));
}
if (clientsToDelete.length > 0) {
await trx
const deletedClients = await trx
.delete(clients)
.where(
userOrgIds.length > 0
@@ -242,7 +258,20 @@ async function cleanupOrphanedClients(
notInArray(clients.orgId, userOrgIds)
)
: and(eq(clients.userId, userId))
);
)
.returning();
// Rebuild associations for each deleted client to clean up related data
for (const deletedClient of deletedClients) {
await rebuildClientAssociationsFromClient(deletedClient, trx);
if (deletedClient.olmId) {
await sendTerminateClient(
deletedClient.clientId,
deletedClient.olmId
);
}
}
if (userOrgIds.length === 0) {
logger.debug(
@@ -255,4 +284,3 @@ async function cleanupOrphanedClients(
}
}
}

View File

@@ -18,6 +18,7 @@ import { defaultRoleAllowedActions } from "@server/routers/role";
import { FeatureId, limitsService, sandboxLimitSet } from "@server/lib/billing";
import { createCustomer } from "#dynamic/lib/billing";
import { usageService } from "@server/lib/billing/usageService";
import config from "@server/lib/config";
export async function createUserAccountOrg(
userId: string,
@@ -76,6 +77,8 @@ export async function createUserAccountOrg(
.from(domains)
.where(eq(domains.configManaged, true));
const utilitySubnet = config.getRawConfig().orgs.utility_subnet_group;
const newOrg = await trx
.insert(orgs)
.values({
@@ -83,6 +86,7 @@ export async function createUserAccountOrg(
name,
// subnet
subnet: "100.90.128.0/24", // TODO: this should not be hardcoded - or can it be the same in all orgs?
utilitySubnet: utilitySubnet,
createdAt: new Date().toISOString()
})
.returning();

View File

@@ -1,4 +1,10 @@
import { clientSitesAssociationsCache, db, SiteResource, Transaction } from "@server/db";
import {
clientSitesAssociationsCache,
db,
SiteResource,
siteResources,
Transaction
} from "@server/db";
import { clients, orgs, sites } from "@server/db";
import { and, eq, isNotNull } from "drizzle-orm";
import config from "@server/lib/config";
@@ -281,6 +287,56 @@ export async function getNextAvailableClientSubnet(
return subnet;
}
/**
 * Allocate the next free /32 address inside an org's utility subnet, for use
 * as a site-resource alias address.
 *
 * @param orgId - The organization to allocate from.
 * @returns The bare IP address (no CIDR suffix).
 * @throws Error if the org does not exist, has no subnet / utility subnet
 *         configured, or the utility subnet is exhausted.
 */
export async function getNextAvailableAliasAddress(
    orgId: string
): Promise<string> {
    const [org] = await db.select().from(orgs).where(eq(orgs.orgId, orgId));

    if (!org) {
        throw new Error(`Organization with ID ${orgId} not found`);
    }
    if (!org.subnet) {
        throw new Error(`Organization with ID ${orgId} has no subnet defined`);
    }
    if (!org.utilitySubnet) {
        throw new Error(
            `Organization with ID ${orgId} has no utility subnet defined`
        );
    }

    // All alias addresses currently in use within this org.
    const existingAddresses = await db
        .select({
            aliasAddress: siteResources.aliasAddress
        })
        .from(siteResources)
        .where(
            and(
                isNotNull(siteResources.aliasAddress),
                eq(siteResources.orgId, orgId)
            )
        );

    // Normalize each in-use alias to a /32. The isNotNull() predicate above
    // should guarantee non-null rows, but the column type is nullable, so
    // filter with a type guard BEFORE interpolating — the old optional-chain
    // version could produce the bogus literal "undefined/32", and a
    // null-filter on the already-built strings was dead code.
    const addresses: string[] = existingAddresses
        .map((row) => row.aliasAddress)
        .filter((address): address is string => address != null)
        .map((address) => `${address.split("/")[0]}/32`);

    // Reserve a /29 at the start of the utility subnet for the DNS server
    // and other infrastructure.
    addresses.push(`${org.utilitySubnet.split("/")[0]}/29`);

    const subnet = findNextAvailableCidr(addresses, 32, org.utilitySubnet);
    if (!subnet) {
        throw new Error("No available subnets remaining in space");
    }

    // Strip the CIDR suffix; callers expect a bare address.
    return subnet.split("/")[0];
}
export async function getNextAvailableOrgSubnet(): Promise<string> {
const existingAddresses = await db
.select({
@@ -327,9 +383,22 @@ export function generateRemoteSubnets(allSiteResources: SiteResource[]): string[
return Array.from(new Set(remoteSubnets));
}
/** Alias entry pushed to clients: a hostname alias and the address it maps to. */
export type Alias = { alias: string | null; aliasAddress: string | null };

/**
 * Build the alias configuration for a set of site resources.
 *
 * Only host-mode resources that have BOTH an alias and an allocated alias
 * address are included; everything else is silently skipped.
 *
 * @param allSiteResources - Resources to derive alias entries from.
 * @returns One Alias entry per qualifying resource.
 */
export function generateAliasConfig(allSiteResources: SiteResource[]): Alias[] {
    return allSiteResources
        .filter((sr) => sr.alias && sr.aliasAddress && sr.mode === "host")
        .map((sr) => ({
            alias: sr.alias,
            aliasAddress: sr.aliasAddress
        }));
}
export type SubnetProxyTarget = {
sourcePrefix: string;
destPrefix: string;
sourcePrefix: string; // must be a cidr
destPrefix: string; // must be a cidr
rewriteTo?: string; // must be a cidr
portRange?: {
min: number;
max: number;
@@ -372,6 +441,15 @@ export function generateSubnetProxyTargets(
destPrefix: `${siteResource.destination}/32`
});
}
if (siteResource.alias && siteResource.aliasAddress) {
// also push a match for the alias address
targets.push({
sourcePrefix: clientPrefix,
destPrefix: `${siteResource.aliasAddress}/32`,
rewriteTo: `${siteResource.destination}/32`
});
}
} else if (siteResource.mode == "cidr") {
targets.push({
sourcePrefix: clientPrefix,
@@ -386,4 +464,4 @@ export function generateSubnetProxyTargets(
);
return targets;
}
}

View File

@@ -229,6 +229,11 @@ export const configSchema = z
.default(51820)
.transform(stoi)
.pipe(portSchema),
clients_start_port: portSchema
.optional()
.default(21820)
.transform(stoi)
.pipe(portSchema),
base_endpoint: z
.string()
.optional()
@@ -249,12 +254,14 @@ export const configSchema = z
orgs: z
.object({
block_size: z.number().positive().gt(0).optional().default(24),
subnet_group: z.string().optional().default("100.90.128.0/24")
subnet_group: z.string().optional().default("100.90.128.0/24"),
utility_subnet_group: z.string().optional().default("100.96.128.0/24") //just hardcode this for now as well
})
.optional()
.default({
block_size: 24,
subnet_group: "100.90.128.0/24"
subnet_group: "100.90.128.0/24",
utility_subnet_group: "100.96.128.0/24"
}),
rate_limits: z
.object({

View File

@@ -25,20 +25,22 @@ import {
deletePeer as newtDeletePeer
} from "@server/routers/newt/peers";
import {
initPeerAddHandshake as holepunchSiteAdd,
addPeer as olmAddPeer,
deletePeer as olmDeletePeer
} from "@server/routers/olm/peers";
import { sendToExitNode } from "#dynamic/lib/exitNodes";
import logger from "@server/logger";
import {
generateAliasConfig,
generateRemoteSubnets,
generateSubnetProxyTargets,
SubnetProxyTarget
} from "@server/lib/ip";
import {
addRemoteSubnets,
addPeerData,
addTargets as addSubnetProxyTargets,
removeRemoteSubnets,
removePeerData,
removeTargets as removeSubnetProxyTargets
} from "@server/routers/client/targets";
@@ -128,7 +130,7 @@ export async function getClientSiteResourceAccess(
};
}
export async function rebuildClientAssociations(
export async function rebuildClientAssociationsFromSiteResource(
siteResource: SiteResource,
trx: Transaction | typeof db = db
): Promise<{
@@ -463,65 +465,17 @@ async function handleMessagesForSiteClients(
}
if (isAdd) {
// TODO: WE NEED TO HANDLE THIS BETTER. WE ARE DEFAULTING TO RELAYING FOR NEW SITES
// BUT REALLY WE NEED TO TRACK THE USERS PREFERENCE THAT THEY CHOSE IN THE CLIENTS
// AND TRIGGER A HOLEPUNCH OR SOMETHING TO GET THE ENDPOINT AND HP TO THE NEW SITES
const isRelayed = true;
newtJobs.push(
newtAddPeer(
await holepunchSiteAdd(
// this will kick off the add peer process for the client
client.clientId,
{
siteId,
{
publicKey: client.pubKey,
allowedIps: [`${client.subnet.split("/")[0]}/32`], // we want to only allow from that client
// endpoint: isRelayed ? "" : clientSite.endpoint
endpoint: isRelayed ? "" : "" // we are not HPing yet so no endpoint
},
newt.newtId
)
);
// TODO: should we have this here?
const allSiteResources = await db // only get the site resources that this client has access to
.select()
.from(siteResources)
.innerJoin(
clientSiteResourcesAssociationsCache,
eq(
siteResources.siteResourceId,
clientSiteResourcesAssociationsCache.siteResourceId
)
)
.where(
and(
eq(siteResources.siteId, site.siteId),
eq(
clientSiteResourcesAssociationsCache.clientId,
client.clientId
)
)
);
olmJobs.push(
olmAddPeer(
client.clientId,
{
siteId: site.siteId,
endpoint:
isRelayed || !site.endpoint
? `${exitNode.endpoint}:21820`
: site.endpoint,
publicKey: site.publicKey,
serverIP: site.address,
serverPort: site.listenPort,
remoteSubnets: generateRemoteSubnets(
allSiteResources.map(
({ siteResources }) => siteResources
)
)
},
olm.olmId
)
exitNode: {
publicKey: exitNode.publicKey,
endpoint: exitNode.endpoint
}
},
olm.olmId
);
}
@@ -703,10 +657,11 @@ async function handleSubnetProxyTargetUpdates(
for (const client of addedClients) {
olmJobs.push(
addRemoteSubnets(
addPeerData(
client.clientId,
siteResource.siteId,
generateRemoteSubnets([siteResource])
generateRemoteSubnets([siteResource]),
generateAliasConfig([siteResource])
)
);
}
@@ -738,10 +693,11 @@ async function handleSubnetProxyTargetUpdates(
for (const client of removedClients) {
olmJobs.push(
removeRemoteSubnets(
removePeerData(
client.clientId,
siteResource.siteId,
generateRemoteSubnets([siteResource])
generateRemoteSubnets([siteResource]),
generateAliasConfig([siteResource])
)
);
}
@@ -750,3 +706,511 @@ async function handleSubnetProxyTargetUpdates(
await Promise.all(proxyJobs);
}
/**
 * Recompute, from scratch, which site resources and sites a single client
 * should have access to, then diff against the cached association tables
 * (clientSiteResourcesAssociationsCache / clientSitesAssociationsCache),
 * apply the inserts/deletes, and push the resulting peer / proxy-target
 * updates out to the client's olm and the affected sites' newts.
 *
 * Access is the union of:
 *   1. direct client -> siteResource grants,
 *   2. direct user -> siteResource grants (when the client belongs to a user),
 *   3. role -> siteResource grants via the user's roles in this org.
 * User and role lookups are constrained to client.orgId to prevent cross-org
 * leakage.
 *
 * @param client - The client whose associations are rebuilt.
 * @param trx - Optional transaction; defaults to the global db handle.
 */
export async function rebuildClientAssociationsFromClient(
client: Client,
trx: Transaction | typeof db = db
): Promise<void> {
let newSiteResourceIds: number[] = [];

// 1. Direct client associations
const directSiteResources = await trx
.select({ siteResourceId: clientSiteResources.siteResourceId })
.from(clientSiteResources)
.where(eq(clientSiteResources.clientId, client.clientId));

newSiteResourceIds.push(
...directSiteResources.map((r) => r.siteResourceId)
);

// 2. User-based and role-based access (if client has a userId)
if (client.userId) {
// Direct user associations (join filters resources to this client's org)
const userSiteResourceIds = await trx
.select({ siteResourceId: userSiteResources.siteResourceId })
.from(userSiteResources)
.innerJoin(
siteResources,
eq(
siteResources.siteResourceId,
userSiteResources.siteResourceId
)
)
.where(
and(
eq(userSiteResources.userId, client.userId),
eq(siteResources.orgId, client.orgId)
)
); // this needs to be locked onto this org or else cross-org access could happen

newSiteResourceIds.push(
...userSiteResourceIds.map((r) => r.siteResourceId)
);

// Role-based access: the user's roles within this org only
const roleIds = await trx
.select({ roleId: userOrgs.roleId })
.from(userOrgs)
.where(
and(
eq(userOrgs.userId, client.userId),
eq(userOrgs.orgId, client.orgId)
)
) // this needs to be locked onto this org or else cross-org access could happen
.then((rows) => rows.map((row) => row.roleId));

if (roleIds.length > 0) {
const roleSiteResourceIds = await trx
.select({ siteResourceId: roleSiteResources.siteResourceId })
.from(roleSiteResources)
.where(inArray(roleSiteResources.roleId, roleIds));

newSiteResourceIds.push(
...roleSiteResourceIds.map((r) => r.siteResourceId)
);
}
}

// Remove duplicates (the three sources above can overlap)
newSiteResourceIds = Array.from(new Set(newSiteResourceIds));

// Get full siteResource details for the computed ids
const newSiteResources =
newSiteResourceIds.length > 0
? await trx
.select()
.from(siteResources)
.where(
inArray(siteResources.siteResourceId, newSiteResourceIds)
)
: [];

// Group by siteId for site-level associations
const newSiteIds = Array.from(
new Set(newSiteResources.map((sr) => sr.siteId))
);

/////////// Process client-siteResource associations ///////////

// Get existing resource associations from the cache table
const existingResourceAssociations = await trx
.select({
siteResourceId: clientSiteResourcesAssociationsCache.siteResourceId
})
.from(clientSiteResourcesAssociationsCache)
.where(
eq(clientSiteResourcesAssociationsCache.clientId, client.clientId)
);

const existingSiteResourceIds = existingResourceAssociations.map(
(r) => r.siteResourceId
);

// Diff desired vs cached: what to insert and what to delete
const resourcesToAdd = newSiteResourceIds.filter(
(id) => !existingSiteResourceIds.includes(id)
);
const resourcesToRemove = existingSiteResourceIds.filter(
(id) => !newSiteResourceIds.includes(id)
);

// Insert new associations
if (resourcesToAdd.length > 0) {
await trx.insert(clientSiteResourcesAssociationsCache).values(
resourcesToAdd.map((siteResourceId) => ({
clientId: client.clientId,
siteResourceId
}))
);
}

// Remove old associations
if (resourcesToRemove.length > 0) {
await trx
.delete(clientSiteResourcesAssociationsCache)
.where(
and(
eq(
clientSiteResourcesAssociationsCache.clientId,
client.clientId
),
inArray(
clientSiteResourcesAssociationsCache.siteResourceId,
resourcesToRemove
)
)
);
}

/////////// Process client-site associations ///////////

// Get existing site associations from the cache table
const existingSiteAssociations = await trx
.select({ siteId: clientSitesAssociationsCache.siteId })
.from(clientSitesAssociationsCache)
.where(eq(clientSitesAssociationsCache.clientId, client.clientId));

const existingSiteIds = existingSiteAssociations.map((s) => s.siteId);

// Diff desired vs cached site ids
const sitesToAdd = newSiteIds.filter((id) => !existingSiteIds.includes(id));
const sitesToRemove = existingSiteIds.filter(
(id) => !newSiteIds.includes(id)
);

// Insert new site associations
if (sitesToAdd.length > 0) {
await trx.insert(clientSitesAssociationsCache).values(
sitesToAdd.map((siteId) => ({
clientId: client.clientId,
siteId
}))
);
}

// Remove old site associations
if (sitesToRemove.length > 0) {
await trx
.delete(clientSitesAssociationsCache)
.where(
and(
eq(clientSitesAssociationsCache.clientId, client.clientId),
inArray(clientSitesAssociationsCache.siteId, sitesToRemove)
)
);
}

/////////// Send messages ///////////

// Get the olm for this client; without one there is nothing to notify
const [olm] = await trx
.select({ olmId: olms.olmId })
.from(olms)
.where(eq(olms.clientId, client.clientId))
.limit(1);

if (!olm) {
logger.warn(
`Olm not found for client ${client.clientId}, skipping peer updates`
);
return;
}

// Handle peer add/remove messages for sites gained/lost
await handleMessagesForClientSites(
client,
olm.olmId,
sitesToAdd,
sitesToRemove,
trx
);

// Handle subnet proxy target updates for resources gained/lost
await handleMessagesForClientResources(
client,
newSiteResources,
resourcesToAdd,
resourcesToRemove,
trx
);
}
/**
 * Propagate site membership changes for one client: delete peers for sites
 * the client lost, kick off the peer-add handshake for sites it gained, and
 * refresh exit-node destinations for every affected site.
 *
 * @param client - Minimal client shape (id, keys, subnet, owner, org).
 * @param olmId - The olm associated with this client.
 * @param sitesToAdd - Site ids the client gained access to.
 * @param sitesToRemove - Site ids the client lost access to.
 * @param trx - Optional transaction; defaults to the global db handle.
 */
async function handleMessagesForClientSites(
    client: {
        clientId: number;
        pubKey: string | null;
        subnet: string | null;
        userId: string | null;
        orgId: string;
    },
    olmId: string,
    sitesToAdd: number[],
    sitesToRemove: number[],
    trx: Transaction | typeof db = db
): Promise<void> {
    // Without a subnet and public key we cannot build peer entries at all.
    if (!client.subnet || !client.pubKey) {
        logger.warn(
            `Client ${client.clientId} missing subnet or pubKey, skipping peer updates`
        );
        return;
    }

    const allSiteIds = [...sitesToAdd, ...sitesToRemove];
    if (allSiteIds.length === 0) {
        return;
    }

    // Fetch site details (with exit node and newt) for all affected sites.
    const sitesData = await trx
        .select()
        .from(sites)
        .leftJoin(exitNodes, eq(sites.exitNodeId, exitNodes.exitNodeId))
        .leftJoin(newts, eq(sites.siteId, newts.siteId))
        .where(inArray(sites.siteId, allSiteIds));

    const newtJobs: Promise<any>[] = [];
    const olmJobs: Promise<any>[] = [];
    const exitNodeJobs: Promise<any>[] = [];

    for (const siteData of sitesData) {
        const site = siteData.sites;
        const exitNode = siteData.exitNodes;
        const newt = siteData.newt;

        if (!site.publicKey) {
            logger.warn(
                `Site ${site.siteId} missing publicKey, skipping peer updates`
            );
            continue;
        }

        if (!newt) {
            logger.warn(
                `Newt not found for site ${site.siteId}, skipping peer updates`
            );
            continue;
        }

        const isAdd = sitesToAdd.includes(site.siteId);
        const isRemove = sitesToRemove.includes(site.siteId);

        if (isRemove) {
            // Remove peer from newt
            newtJobs.push(
                newtDeletePeer(site.siteId, client.pubKey, newt.newtId)
            );

            // Remove peer from olm. A "not found" rejection just means the
            // olm does not exist anymore (or yet) and is safe to ignore.
            // BUGFIX: this was wrapped in a synchronous try/catch around
            // `push`, which can never observe an async rejection — the
            // handler must be attached to the promise itself.
            olmJobs.push(
                olmDeletePeer(
                    client.clientId,
                    site.siteId,
                    site.publicKey,
                    olmId
                ).catch((error: unknown) => {
                    if (
                        error instanceof Error &&
                        error.message.includes("not found")
                    ) {
                        logger.debug(
                            `Olm data not found for client ${client.clientId}, skipping removal`
                        );
                    } else {
                        throw error;
                    }
                })
            );
        }

        if (isAdd) {
            if (!exitNode) {
                logger.warn(
                    `Exit node not found for site ${site.siteId}, skipping peer add`
                );
                continue;
            }

            await holepunchSiteAdd(
                // this will kick off the add peer process for the client
                client.clientId,
                {
                    siteId: site.siteId,
                    exitNode: {
                        publicKey: exitNode.publicKey,
                        endpoint: exitNode.endpoint
                    }
                },
                olmId
            );
        }

        // Update exit node destinations for this client regardless of
        // whether the site was added or removed.
        exitNodeJobs.push(
            updateClientSiteDestinations(
                {
                    clientId: client.clientId,
                    pubKey: client.pubKey,
                    subnet: client.subnet
                },
                trx
            )
        );
    }

    await Promise.all(exitNodeJobs);
    await Promise.all(newtJobs);
    await Promise.all(olmJobs);
}
/**
 * Push subnet-proxy target and olm peer-data updates for one client whose
 * site-resource associations changed: add targets/peer data for gained
 * resources, remove them for lost resources.
 *
 * @param client - Minimal client shape (id, keys, subnet, owner, org).
 * @param allNewResources - The client's full desired resource set.
 * @param resourcesToAdd - siteResourceIds the client gained.
 * @param resourcesToRemove - siteResourceIds the client lost.
 * @param trx - Optional transaction; defaults to the global db handle.
 */
async function handleMessagesForClientResources(
    client: {
        clientId: number;
        pubKey: string | null;
        subnet: string | null;
        userId: string | null;
        orgId: string;
    },
    allNewResources: SiteResource[],
    resourcesToAdd: number[],
    resourcesToRemove: number[],
    trx: Transaction | typeof db = db
): Promise<void> {
    const proxyJobs: Promise<any>[] = [];
    const olmJobs: Promise<any>[] = [];

    // Bucket resources by the site that hosts them (proxy updates are
    // per-site because they go through that site's newt).
    const groupBySite = (resources: SiteResource[]) => {
        const bySite = new Map<number, SiteResource[]>();
        for (const resource of resources) {
            const bucket = bySite.get(resource.siteId);
            if (bucket) {
                bucket.push(resource);
            } else {
                bySite.set(resource.siteId, [resource]);
            }
        }
        return bySite;
    };

    // A "not found" rejection from an olm peer-data call just means the olm
    // does not exist anymore (or yet); swallow it, rethrow anything else.
    // BUGFIX: the original wrapped `push` in a synchronous try/catch, which
    // can never observe an async rejection — attach the handler to the
    // promise instead.
    const ignoreMissingOlm = (siteId: number) => (error: unknown) => {
        if (error instanceof Error && error.message.includes("not found")) {
            logger.debug(
                `Olm data not found for client ${client.clientId} and site ${siteId}, skipping removal`
            );
        } else {
            throw error;
        }
    };

    // Minimal client shape that generateSubnetProxyTargets needs.
    const clientPeer = {
        clientId: client.clientId,
        pubKey: client.pubKey,
        subnet: client.subnet
    };

    // Handle additions
    if (resourcesToAdd.length > 0) {
        const addedResources = allNewResources.filter((r) =>
            resourcesToAdd.includes(r.siteResourceId)
        );

        for (const [siteId, resources] of groupBySite(addedResources)) {
            const [newt] = await trx
                .select({ newtId: newts.newtId })
                .from(newts)
                .where(eq(newts.siteId, siteId))
                .limit(1);

            if (!newt) {
                logger.warn(
                    `Newt not found for site ${siteId}, skipping proxy updates`
                );
                continue;
            }

            for (const resource of resources) {
                const targets = generateSubnetProxyTargets(resource, [
                    clientPeer
                ]);
                if (targets.length > 0) {
                    proxyJobs.push(addSubnetProxyTargets(newt.newtId, targets));
                }

                // Add peer data (remote subnets + aliases) to the olm
                olmJobs.push(
                    addPeerData(
                        client.clientId,
                        resource.siteId,
                        generateRemoteSubnets([resource]),
                        generateAliasConfig([resource])
                    ).catch(ignoreMissingOlm(resource.siteId))
                );
            }
        }
    }

    // Handle removals — removed resources are no longer in allNewResources,
    // so look them up directly.
    if (resourcesToRemove.length > 0) {
        const removedResources = await trx
            .select()
            .from(siteResources)
            .where(inArray(siteResources.siteResourceId, resourcesToRemove));

        for (const [siteId, resources] of groupBySite(removedResources)) {
            const [newt] = await trx
                .select({ newtId: newts.newtId })
                .from(newts)
                .where(eq(newts.siteId, siteId))
                .limit(1);

            if (!newt) {
                logger.warn(
                    `Newt not found for site ${siteId}, skipping proxy updates`
                );
                continue;
            }

            for (const resource of resources) {
                const targets = generateSubnetProxyTargets(resource, [
                    clientPeer
                ]);
                if (targets.length > 0) {
                    proxyJobs.push(
                        removeSubnetProxyTargets(newt.newtId, targets)
                    );
                }

                // Remove peer data (remote subnets + aliases) from the olm
                olmJobs.push(
                    removePeerData(
                        client.clientId,
                        resource.siteId,
                        generateRemoteSubnets([resource]),
                        generateAliasConfig([resource])
                    ).catch(ignoreMissingOlm(resource.siteId))
                );
            }
        }
    }

    await Promise.all([...proxyJobs, ...olmJobs]);
}