diff --git a/server/routers/gerbil/updateHolePunch.ts b/server/routers/gerbil/updateHolePunch.ts index 810c44ff7..ac3ff3945 100644 --- a/server/routers/gerbil/updateHolePunch.ts +++ b/server/routers/gerbil/updateHolePunch.ts @@ -11,7 +11,7 @@ import { ExitNode } from "@server/db"; import { db } from "@server/db"; -import { eq, and } from "drizzle-orm"; +import { eq, and, inArray } from "drizzle-orm"; import HttpCode from "@server/types/HttpCode"; import createHttpError from "http-errors"; import logger from "@server/logger"; @@ -202,24 +202,29 @@ export async function updateAndGenerateEndpointDestinations( ) ); - // Update clientSites for each site on this exit node + // Format the endpoint properly for both IPv4 and IPv6 + const formattedEndpoint = formatEndpoint(ip, port); + + // Determine which rows actually need updating and whether the endpoint + // (as opposed to only the publicKey) changed for any of them. + const siteIdsToUpdate: number[] = []; + let endpointChanged = false; for (const site of sitesOnExitNode) { - // logger.debug( - // `Updating site ${site.siteId} on exit node ${exitNode.exitNodeId}` - // ); - - // Format the endpoint properly for both IPv4 and IPv6 - const formattedEndpoint = formatEndpoint(ip, port); - - // if the public key or endpoint has changed, update it otherwise continue if ( site.endpoint === formattedEndpoint && site.publicKey === publicKey ) { continue; } + siteIdsToUpdate.push(site.siteId); + if (site.endpoint !== formattedEndpoint) { + endpointChanged = true; + } + } - const [updatedClientSitesAssociationsCache] = await db + if (siteIdsToUpdate.length > 0) { + // Single bulk update for all affected rows for this client on this exit node + await db .update(clientSitesAssociationsCache) .set({ endpoint: formattedEndpoint, @@ -228,24 +233,22 @@ export async function updateAndGenerateEndpointDestinations( .where( and( eq(clientSitesAssociationsCache.clientId, olm.clientId), - eq(clientSitesAssociationsCache.siteId, site.siteId) + inArray( + clientSitesAssociationsCache.siteId, + siteIdsToUpdate + ) ) - ) - .returning(); + ); - if ( - updatedClientSitesAssociationsCache.endpoint !== - site.endpoint && // this is the endpoint from the join table not the site - updatedClient.pubKey === publicKey // only trigger if the client's public key matches the current public key which means it has registered so we dont prematurely send the update - ) { + // Only trigger downstream peer updates once per hole punch: the + // endpoint is the same for every site on this exit node, and + // handleClientEndpointChange already fans out to all connected + // sites for this client. + if (endpointChanged && updatedClient.pubKey === publicKey) { logger.info( - `ClientSitesAssociationsCache for client ${olm.clientId} and site ${site.siteId} endpoint changed from ${site.endpoint} to ${updatedClientSitesAssociationsCache.endpoint}` - ); - // Handle any additional logic for endpoint change - handleClientEndpointChange( - olm.clientId, - updatedClientSitesAssociationsCache.endpoint! + `ClientSitesAssociationsCache for client ${olm.clientId} endpoint changed to ${formattedEndpoint} for ${siteIdsToUpdate.length} site(s) on exit node ${exitNode.exitNodeId}` ); + handleClientEndpointChange(olm.clientId, formattedEndpoint); } } @@ -456,11 +459,11 @@ async function handleSiteEndpointChange(siteId: number, newEndpoint: string) { } } -async function handleClientEndpointChange( +async function handleClientEndpointChange( // TODO: I THINK WE DONT NEED TO HIT EVERY SITE HERE BECAUSE WE ONLY NEED TO UPDATE THE SITES CONNECTED TO THIS NODE WHICH WE ALREADY HAVE FROM ABOVE clientId: number, newEndpoint: string ) { - // Alert all sites connected to this client that the endpoint has changed (only if NOT relayed) + // Alert all sites connected to this client that the endpoint has changed (only if NOT relayed and NOT JIT MODE) try { // Get client details const [client] = await db @@ -480,6 +483,7 @@ async function handleClientEndpointChange( siteId: sites.siteId, newtId: newts.newtId, isRelayed: clientSitesAssociationsCache.isRelayed, + isJitMode: clientSitesAssociationsCache.isJitMode, subnet: clients.subnet }) .from(clientSitesAssociationsCache) @@ -495,37 +499,47 @@ async function handleClientEndpointChange( .where( and( eq(clientSitesAssociationsCache.clientId, clientId), - eq(clientSitesAssociationsCache.isRelayed, false) + eq(clientSitesAssociationsCache.isRelayed, false), + eq(clientSitesAssociationsCache.isJitMode, false) ) ); - // Update each non-relayed site with the new client endpoint - for (const siteData of connectedSites) { - try { - if (!siteData.subnet) { + if (connectedSites.length > 250) { + logger.warn( + `Client ${clientId} has ${connectedSites.length} connected sites so the client will be in jit mode anyway, skipping endpoint updates` + ); + return; + } + + // Update each non-relayed site with the new client endpoint (in parallel) + await Promise.allSettled( + connectedSites.map(async (siteData) => { + if (!siteData.subnet || !client.pubKey) { logger.warn( - `Client ${clientId} has no subnet, skipping update for site ${siteData.siteId}` + `Client ${clientId} has no subnet or public key, skipping update for site ${siteData.siteId}` ); - continue; + return; } - await updateNewtPeer( - siteData.siteId, - client.pubKey, - { - endpoint: newEndpoint - }, - siteData.newtId - ); - logger.debug( - `Updated site ${siteData.siteId} with new client ${clientId} endpoint: ${newEndpoint}` - ); - } catch (error) { - logger.error( - `Failed to update site ${siteData.siteId} with new client endpoint: ${error}` - ); - } - } + try { + await updateNewtPeer( + siteData.siteId, + client.pubKey, + { + endpoint: newEndpoint + }, + siteData.newtId + ); + logger.debug( + `Updated site ${siteData.siteId} with new client ${clientId} endpoint: ${newEndpoint}` + ); + } catch (error) { + logger.error( + `Failed to update site ${siteData.siteId} with new client endpoint: ${error}` + ); + } + }) + ); } catch (error) { logger.error( `Error handling client endpoint change for client ${clientId}: ${error}`