From 02033f611f102bd1dfd1bab99b82b1cacea2a1c3 Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 23 Mar 2026 11:44:02 -0700 Subject: [PATCH] First pass at HA --- server/lib/rebuildClientAssociations.ts | 581 +++++++++++++++--------- 1 file changed, 355 insertions(+), 226 deletions(-) diff --git a/server/lib/rebuildClientAssociations.ts b/server/lib/rebuildClientAssociations.ts index 121e2c7f0..ece603916 100644 --- a/server/lib/rebuildClientAssociations.ts +++ b/server/lib/rebuildClientAssociations.ts @@ -11,6 +11,7 @@ import { roleSiteResources, Site, SiteResource, + siteNetworks, siteResources, sites, Transaction, @@ -47,15 +48,23 @@ export async function getClientSiteResourceAccess( siteResource: SiteResource, trx: Transaction | typeof db = db ) { - // get the site - const [site] = await trx - .select() - .from(sites) - .where(eq(sites.siteId, siteResource.siteId)) - .limit(1); + // get all sites associated with this siteResource via its network + const sitesList = siteResource.networkId + ? await trx + .select() + .from(sites) + .innerJoin( + siteNetworks, + eq(siteNetworks.siteId, sites.siteId) + ) + .where(eq(siteNetworks.networkId, siteResource.networkId)) + .then((rows) => rows.map((row) => row.sites)) + : []; - if (!site) { - throw new Error(`Site with ID ${siteResource.siteId} not found`); + if (sitesList.length === 0) { + logger.warn( + `No sites found for siteResource ${siteResource.siteResourceId} with networkId ${siteResource.networkId}` + ); } const roleIds = await trx @@ -136,7 +145,7 @@ export async function getClientSiteResourceAccess( const mergedAllClientIds = mergedAllClients.map((c) => c.clientId); return { - site, + sitesList, mergedAllClients, mergedAllClientIds }; @@ -152,40 +161,51 @@ export async function rebuildClientAssociationsFromSiteResource( subnet: string | null; }[]; }> { - const siteId = siteResource.siteId; - - const { site, mergedAllClients, mergedAllClientIds } = + const { sitesList, mergedAllClients, mergedAllClientIds } = await getClientSiteResourceAccess(siteResource, trx); /////////// process the client-siteResource associations /////////// - // get all of the clients associated with other resources on this site - const allUpdatedClientsFromOtherResourcesOnThisSite = await trx - .select({ - clientId: clientSiteResourcesAssociationsCache.clientId - }) - .from(clientSiteResourcesAssociationsCache) - .innerJoin( - siteResources, - eq( - clientSiteResourcesAssociationsCache.siteResourceId, - siteResources.siteResourceId - ) - ) - .where( - and( - eq(siteResources.siteId, siteId), - ne(siteResources.siteResourceId, siteResource.siteResourceId) - ) - ); + // get all of the clients associated with other resources in the same network, + // joined through siteNetworks so we know which siteId each client belongs to + const allUpdatedClientsFromOtherResourcesOnThisSite = siteResource.networkId + ? await trx + .select({ + clientId: clientSiteResourcesAssociationsCache.clientId, + siteId: siteNetworks.siteId + }) + .from(clientSiteResourcesAssociationsCache) + .innerJoin( + siteResources, + eq( + clientSiteResourcesAssociationsCache.siteResourceId, + siteResources.siteResourceId + ) + ) + .innerJoin( + siteNetworks, + eq(siteNetworks.networkId, siteResources.networkId) + ) + .where( + and( + eq(siteResources.networkId, siteResource.networkId), + ne( + siteResources.siteResourceId, + siteResource.siteResourceId + ) + ) + ) + : []; - const allClientIdsFromOtherResourcesOnThisSite = Array.from( - new Set( - allUpdatedClientsFromOtherResourcesOnThisSite.map( - (row) => row.clientId - ) - ) - ); + // Build a per-site map so the loop below can check by siteId rather than + // across the entire network. + const clientsFromOtherResourcesBySite = new Map>(); + for (const row of allUpdatedClientsFromOtherResourcesOnThisSite) { + if (!clientsFromOtherResourcesBySite.has(row.siteId)) { + clientsFromOtherResourcesBySite.set(row.siteId, new Set()); + } + clientsFromOtherResourcesBySite.get(row.siteId)!.add(row.clientId); + } const existingClientSiteResources = await trx .select({ @@ -259,82 +279,90 @@ export async function rebuildClientAssociationsFromSiteResource( /////////// process the client-site associations /////////// - const existingClientSites = await trx - .select({ - clientId: clientSitesAssociationsCache.clientId - }) - .from(clientSitesAssociationsCache) - .where(eq(clientSitesAssociationsCache.siteId, siteResource.siteId)); + for (const site of sitesList) { + const siteId = site.siteId; - const existingClientSiteIds = existingClientSites.map( - (row) => row.clientId - ); + const existingClientSites = await trx + .select({ + clientId: clientSitesAssociationsCache.clientId + }) + .from(clientSitesAssociationsCache) + .where(eq(clientSitesAssociationsCache.siteId, siteId)); - // Get full client details for existing clients (needed for sending delete messages) - const existingClients = await trx - .select({ - clientId: clients.clientId, - pubKey: clients.pubKey, - subnet: clients.subnet - }) - .from(clients) - .where(inArray(clients.clientId, existingClientSiteIds)); + const existingClientSiteIds = existingClientSites.map( + (row) => row.clientId + ); - const clientSitesToAdd = mergedAllClientIds.filter( - (clientId) => - !existingClientSiteIds.includes(clientId) && - !allClientIdsFromOtherResourcesOnThisSite.includes(clientId) // dont remove if there is still another connection for another site resource - ); + // Get full client details for existing clients (needed for sending delete messages) + const existingClients = + existingClientSiteIds.length > 0 + ? await trx + .select({ + clientId: clients.clientId, + pubKey: clients.pubKey, + subnet: clients.subnet + }) + .from(clients) + .where(inArray(clients.clientId, existingClientSiteIds)) + : []; - const clientSitesToInsert = clientSitesToAdd.map((clientId) => ({ - clientId, - siteId - })); + const otherResourceClientIds = clientsFromOtherResourcesBySite.get(siteId) ?? new Set(); - if (clientSitesToInsert.length > 0) { - await trx - .insert(clientSitesAssociationsCache) - .values(clientSitesToInsert) - .returning(); - } + const clientSitesToAdd = mergedAllClientIds.filter( + (clientId) => + !existingClientSiteIds.includes(clientId) && + !otherResourceClientIds.has(clientId) // dont add if already connected via another site resource + ); - // Now remove any client-site associations that should no longer exist - const clientSitesToRemove = existingClientSiteIds.filter( - (clientId) => - !mergedAllClientIds.includes(clientId) && - !allClientIdsFromOtherResourcesOnThisSite.includes(clientId) // dont remove if there is still another connection for another site resource - ); + const clientSitesToInsert = clientSitesToAdd.map((clientId) => ({ + clientId, + siteId + })); - if (clientSitesToRemove.length > 0) { - await trx - .delete(clientSitesAssociationsCache) - .where( - and( - eq(clientSitesAssociationsCache.siteId, siteId), - inArray( - clientSitesAssociationsCache.clientId, - clientSitesToRemove + if (clientSitesToInsert.length > 0) { + await trx + .insert(clientSitesAssociationsCache) + .values(clientSitesToInsert) + .returning(); + } + + // Now remove any client-site associations that should no longer exist + const clientSitesToRemove = existingClientSiteIds.filter( + (clientId) => + !mergedAllClientIds.includes(clientId) && + !otherResourceClientIds.has(clientId) // dont remove if there is still another connection for another site resource + ); + + if (clientSitesToRemove.length > 0) { + await trx + .delete(clientSitesAssociationsCache) + .where( + and( + eq(clientSitesAssociationsCache.siteId, siteId), + inArray( + clientSitesAssociationsCache.clientId, + clientSitesToRemove + ) ) - ) - ); + ); + } + + // Now handle the messages to add/remove peers on both the newt and olm sides + await handleMessagesForSiteClients( + site, + siteId, + mergedAllClients, + existingClients, + clientSitesToAdd, + clientSitesToRemove, + trx + ); } - /////////// send the messages /////////// - - // Now handle the messages to add/remove peers on both the newt and olm sides - await handleMessagesForSiteClients( - site, - siteId, - mergedAllClients, - existingClients, - clientSitesToAdd, - clientSitesToRemove, - trx - ); - // Handle subnet proxy target updates for the resource associations await handleSubnetProxyTargetUpdates( siteResource, + sitesList, mergedAllClients, existingResourceClients, clientSiteResourcesToAdd, @@ -623,6 +651,7 @@ export async function updateClientSiteDestinations( async function handleSubnetProxyTargetUpdates( siteResource: SiteResource, + sitesList: Site[], allClients: { clientId: number; pubKey: string | null; @@ -637,131 +666,144 @@ async function handleSubnetProxyTargetUpdates( clientSiteResourcesToRemove: number[], trx: Transaction | typeof db = db ): Promise { - // Get the newt for this site - const [newt] = await trx - .select() - .from(newts) - .where(eq(newts.siteId, siteResource.siteId)) - .limit(1); + const proxyJobs: Promise[] = []; + const olmJobs: Promise[] = []; - if (!newt) { - logger.warn( - `Newt not found for site ${siteResource.siteId}, skipping subnet proxy target updates` - ); - return; - } + for (const siteData of sitesList) { + const siteId = siteData.siteId; - const proxyJobs = []; - const olmJobs = []; - // Generate targets for added associations - if (clientSiteResourcesToAdd.length > 0) { - const addedClients = allClients.filter((client) => - clientSiteResourcesToAdd.includes(client.clientId) - ); + // Get the newt for this site + const [newt] = await trx + .select() + .from(newts) + .where(eq(newts.siteId, siteId)) + .limit(1); - if (addedClients.length > 0) { - const targetsToAdd = generateSubnetProxyTargets( - siteResource, - addedClients + if (!newt) { + logger.warn( + `Newt not found for site ${siteId}, skipping subnet proxy target updates` ); - - if (targetsToAdd.length > 0) { - logger.info( - `Adding ${targetsToAdd.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}` - ); - proxyJobs.push( - addSubnetProxyTargets( - newt.newtId, - targetsToAdd, - newt.version - ) - ); - } - - for (const client of addedClients) { - olmJobs.push( - addPeerData( - client.clientId, - siteResource.siteId, - generateRemoteSubnets([siteResource]), - generateAliasConfig([siteResource]) - ) - ); - } + continue; } - } - // here we use the existingSiteResource from BEFORE we updated the destination so we dont need to worry about updating destinations here - - // Generate targets for removed associations - if (clientSiteResourcesToRemove.length > 0) { - const removedClients = existingClients.filter((client) => - clientSiteResourcesToRemove.includes(client.clientId) - ); - - if (removedClients.length > 0) { - const targetsToRemove = generateSubnetProxyTargets( - siteResource, - removedClients + // Generate targets for added associations + if (clientSiteResourcesToAdd.length > 0) { + const addedClients = allClients.filter((client) => + clientSiteResourcesToAdd.includes(client.clientId) ); - if (targetsToRemove.length > 0) { - logger.info( - `Removing ${targetsToRemove.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}` + if (addedClients.length > 0) { + const targetsToAdd = generateSubnetProxyTargets( + siteResource, + addedClients ); - proxyJobs.push( - removeSubnetProxyTargets( - newt.newtId, - targetsToRemove, - newt.version - ) - ); - } - for (const client of removedClients) { - // Check if this client still has access to another resource on this site with the same destination - const destinationStillInUse = await trx - .select() - .from(siteResources) - .innerJoin( - clientSiteResourcesAssociationsCache, - eq( - clientSiteResourcesAssociationsCache.siteResourceId, - siteResources.siteResourceId - ) - ) - .where( - and( - eq( - clientSiteResourcesAssociationsCache.clientId, - client.clientId - ), - eq(siteResources.siteId, siteResource.siteId), - eq( - siteResources.destination, - siteResource.destination - ), - ne( - siteResources.siteResourceId, - siteResource.siteResourceId - ) + if (targetsToAdd.length > 0) { + logger.info( + `Adding ${targetsToAdd.length} subnet proxy targets for siteResource ${siteResource.siteResourceId} on site ${siteId}` + ); + proxyJobs.push( + addSubnetProxyTargets( + newt.newtId, + targetsToAdd, + newt.version ) ); + } - // Only remove remote subnet if no other resource uses the same destination - const remoteSubnetsToRemove = - destinationStillInUse.length > 0 - ? [] - : generateRemoteSubnets([siteResource]); + for (const client of addedClients) { + olmJobs.push( + addPeerData( + client.clientId, + siteId, + generateRemoteSubnets([siteResource]), + generateAliasConfig([siteResource]) + ) + ); + } + } + } - olmJobs.push( - removePeerData( - client.clientId, - siteResource.siteId, - remoteSubnetsToRemove, - generateAliasConfig([siteResource]) - ) + // here we use the existingSiteResource from BEFORE we updated the destination so we dont need to worry about updating destinations here + + // Generate targets for removed associations + if (clientSiteResourcesToRemove.length > 0) { + const removedClients = existingClients.filter((client) => + clientSiteResourcesToRemove.includes(client.clientId) + ); + + if (removedClients.length > 0) { + const targetsToRemove = generateSubnetProxyTargets( + siteResource, + removedClients ); + + if (targetsToRemove.length > 0) { + logger.info( + `Removing ${targetsToRemove.length} subnet proxy targets for siteResource ${siteResource.siteResourceId} on site ${siteId}` + ); + proxyJobs.push( + removeSubnetProxyTargets( + newt.newtId, + targetsToRemove, + newt.version + ) + ); + } + + for (const client of removedClients) { + // Check if this client still has access to another resource + // on this specific site with the same destination. We scope + // by siteId (via siteNetworks) rather than networkId because + // removePeerData operates per-site — a resource on a different + // site sharing the same network should not block removal here. + const destinationStillInUse = await trx + .select() + .from(siteResources) + .innerJoin( + clientSiteResourcesAssociationsCache, + eq( + clientSiteResourcesAssociationsCache.siteResourceId, + siteResources.siteResourceId + ) + ) + .innerJoin( + siteNetworks, + eq(siteNetworks.networkId, siteResources.networkId) + ) + .where( + and( + eq( + clientSiteResourcesAssociationsCache.clientId, + client.clientId + ), + eq(siteNetworks.siteId, siteId), + eq( + siteResources.destination, + siteResource.destination + ), + ne( + siteResources.siteResourceId, + siteResource.siteResourceId + ) + ) + ); + + // Only remove remote subnet if no other resource uses the same destination + const remoteSubnetsToRemove = + destinationStillInUse.length > 0 + ? [] + : generateRemoteSubnets([siteResource]); + + olmJobs.push( + removePeerData( + client.clientId, + siteId, + remoteSubnetsToRemove, + generateAliasConfig([siteResource]) + ) + ); + } } } } @@ -868,10 +910,25 @@ export async function rebuildClientAssociationsFromClient( ) : []; - // Group by siteId for site-level associations - const newSiteIds = Array.from( - new Set(newSiteResources.map((sr) => sr.siteId)) + // Group by siteId for site-level associations — look up via siteNetworks since + // siteResources no longer carries a direct siteId column. + const networkIds = Array.from( + new Set( + newSiteResources + .map((sr) => sr.networkId) + .filter((id): id is number => id !== null) + ) ); + const newSiteIds = + networkIds.length > 0 + ? await trx + .select({ siteId: siteNetworks.siteId }) + .from(siteNetworks) + .where(inArray(siteNetworks.networkId, networkIds)) + .then((rows) => + Array.from(new Set(rows.map((r) => r.siteId))) + ) + : []; /////////// Process client-siteResource associations /////////// @@ -1144,13 +1201,45 @@ async function handleMessagesForClientResources( resourcesToAdd.includes(r.siteResourceId) ); + // Build (resource, siteId) pairs by looking up siteNetworks for each resource's networkId + const addedNetworkIds = Array.from( + new Set( + addedResources + .map((r) => r.networkId) + .filter((id): id is number => id !== null) + ) + ); + const addedSiteNetworkRows = + addedNetworkIds.length > 0 + ? await trx + .select({ + networkId: siteNetworks.networkId, + siteId: siteNetworks.siteId + }) + .from(siteNetworks) + .where(inArray(siteNetworks.networkId, addedNetworkIds)) + : []; + const addedNetworkToSites = new Map(); + for (const row of addedSiteNetworkRows) { + if (!addedNetworkToSites.has(row.networkId)) { + addedNetworkToSites.set(row.networkId, []); + } + addedNetworkToSites.get(row.networkId)!.push(row.siteId); + } + // Group by site for proxy updates const addedBySite = new Map(); for (const resource of addedResources) { - if (!addedBySite.has(resource.siteId)) { - addedBySite.set(resource.siteId, []); + const siteIds = + resource.networkId != null + ? (addedNetworkToSites.get(resource.networkId) ?? []) + : []; + for (const siteId of siteIds) { + if (!addedBySite.has(siteId)) { + addedBySite.set(siteId, []); + } + addedBySite.get(siteId)!.push(resource); } - addedBySite.get(resource.siteId)!.push(resource); } // Add subnet proxy targets for each site @@ -1192,7 +1281,7 @@ async function handleMessagesForClientResources( olmJobs.push( addPeerData( client.clientId, - resource.siteId, + siteId, generateRemoteSubnets([resource]), generateAliasConfig([resource]) ) @@ -1204,7 +1293,7 @@ async function handleMessagesForClientResources( error.message.includes("not found") ) { logger.debug( - `Olm data not found for client ${client.clientId} and site ${resource.siteId}, skipping removal` + `Olm data not found for client ${client.clientId} and site ${siteId}, skipping addition` ); } else { throw error; @@ -1221,13 +1310,45 @@ async function handleMessagesForClientResources( .from(siteResources) .where(inArray(siteResources.siteResourceId, resourcesToRemove)); + // Build (resource, siteId) pairs via siteNetworks + const removedNetworkIds = Array.from( + new Set( + removedResources + .map((r) => r.networkId) + .filter((id): id is number => id !== null) + ) + ); + const removedSiteNetworkRows = + removedNetworkIds.length > 0 + ? await trx + .select({ + networkId: siteNetworks.networkId, + siteId: siteNetworks.siteId + }) + .from(siteNetworks) + .where(inArray(siteNetworks.networkId, removedNetworkIds)) + : []; + const removedNetworkToSites = new Map(); + for (const row of removedSiteNetworkRows) { + if (!removedNetworkToSites.has(row.networkId)) { + removedNetworkToSites.set(row.networkId, []); + } + removedNetworkToSites.get(row.networkId)!.push(row.siteId); + } + // Group by site for proxy updates const removedBySite = new Map(); for (const resource of removedResources) { - if (!removedBySite.has(resource.siteId)) { - removedBySite.set(resource.siteId, []); + const siteIds = + resource.networkId != null + ? (removedNetworkToSites.get(resource.networkId) ?? []) + : []; + for (const siteId of siteIds) { + if (!removedBySite.has(siteId)) { + removedBySite.set(siteId, []); + } + removedBySite.get(siteId)!.push(resource); } - removedBySite.get(resource.siteId)!.push(resource); } // Remove subnet proxy targets for each site @@ -1265,7 +1386,11 @@ async function handleMessagesForClientResources( } try { - // Check if this client still has access to another resource on this site with the same destination + // Check if this client still has access to another resource + // on this specific site with the same destination. We scope + // by siteId (via siteNetworks) rather than networkId because + // removePeerData operates per-site — a resource on a different + // site sharing the same network should not block removal here. const destinationStillInUse = await trx .select() .from(siteResources) @@ -1276,13 +1401,17 @@ async function handleMessagesForClientResources( siteResources.siteResourceId ) ) + .innerJoin( + siteNetworks, + eq(siteNetworks.networkId, siteResources.networkId) + ) .where( and( eq( clientSiteResourcesAssociationsCache.clientId, client.clientId ), - eq(siteResources.siteId, resource.siteId), + eq(siteNetworks.siteId, siteId), eq( siteResources.destination, resource.destination @@ -1304,7 +1433,7 @@ async function handleMessagesForClientResources( olmJobs.push( removePeerData( client.clientId, - resource.siteId, + siteId, remoteSubnetsToRemove, generateAliasConfig([resource]) ) @@ -1316,7 +1445,7 @@ async function handleMessagesForClientResources( error.message.includes("not found") ) { logger.debug( - `Olm data not found for client ${client.clientId} and site ${resource.siteId}, skipping removal` + `Olm data not found for client ${client.clientId} and site ${siteId}, skipping removal` ); } else { throw error;