First pass at HA

This commit is contained in:
Owen
2026-03-23 11:44:02 -07:00
parent 1366901e24
commit 02033f611f

View File

@@ -11,6 +11,7 @@ import {
roleSiteResources, roleSiteResources,
Site, Site,
SiteResource, SiteResource,
siteNetworks,
siteResources, siteResources,
sites, sites,
Transaction, Transaction,
@@ -47,15 +48,23 @@ export async function getClientSiteResourceAccess(
siteResource: SiteResource, siteResource: SiteResource,
trx: Transaction | typeof db = db trx: Transaction | typeof db = db
) { ) {
// get the site // get all sites associated with this siteResource via its network
const [site] = await trx const sitesList = siteResource.networkId
? await trx
.select() .select()
.from(sites) .from(sites)
.where(eq(sites.siteId, siteResource.siteId)) .innerJoin(
.limit(1); siteNetworks,
eq(siteNetworks.siteId, sites.siteId)
)
.where(eq(siteNetworks.networkId, siteResource.networkId))
.then((rows) => rows.map((row) => row.sites))
: [];
if (!site) { if (sitesList.length === 0) {
throw new Error(`Site with ID ${siteResource.siteId} not found`); logger.warn(
`No sites found for siteResource ${siteResource.siteResourceId} with networkId ${siteResource.networkId}`
);
} }
const roleIds = await trx const roleIds = await trx
@@ -136,7 +145,7 @@ export async function getClientSiteResourceAccess(
const mergedAllClientIds = mergedAllClients.map((c) => c.clientId); const mergedAllClientIds = mergedAllClients.map((c) => c.clientId);
return { return {
site, sitesList,
mergedAllClients, mergedAllClients,
mergedAllClientIds mergedAllClientIds
}; };
@@ -152,17 +161,18 @@ export async function rebuildClientAssociationsFromSiteResource(
subnet: string | null; subnet: string | null;
}[]; }[];
}> { }> {
const siteId = siteResource.siteId; const { sitesList, mergedAllClients, mergedAllClientIds } =
const { site, mergedAllClients, mergedAllClientIds } =
await getClientSiteResourceAccess(siteResource, trx); await getClientSiteResourceAccess(siteResource, trx);
/////////// process the client-siteResource associations /////////// /////////// process the client-siteResource associations ///////////
// get all of the clients associated with other resources on this site // get all of the clients associated with other resources in the same network,
const allUpdatedClientsFromOtherResourcesOnThisSite = await trx // joined through siteNetworks so we know which siteId each client belongs to
const allUpdatedClientsFromOtherResourcesOnThisSite = siteResource.networkId
? await trx
.select({ .select({
clientId: clientSiteResourcesAssociationsCache.clientId clientId: clientSiteResourcesAssociationsCache.clientId,
siteId: siteNetworks.siteId
}) })
.from(clientSiteResourcesAssociationsCache) .from(clientSiteResourcesAssociationsCache)
.innerJoin( .innerJoin(
@@ -172,20 +182,30 @@ export async function rebuildClientAssociationsFromSiteResource(
siteResources.siteResourceId siteResources.siteResourceId
) )
) )
.innerJoin(
siteNetworks,
eq(siteNetworks.networkId, siteResources.networkId)
)
.where( .where(
and( and(
eq(siteResources.siteId, siteId), eq(siteResources.networkId, siteResource.networkId),
ne(siteResources.siteResourceId, siteResource.siteResourceId) ne(
siteResources.siteResourceId,
siteResource.siteResourceId
) )
); )
)
: [];
const allClientIdsFromOtherResourcesOnThisSite = Array.from( // Build a per-site map so the loop below can check by siteId rather than
new Set( // across the entire network.
allUpdatedClientsFromOtherResourcesOnThisSite.map( const clientsFromOtherResourcesBySite = new Map<number, Set<number>>();
(row) => row.clientId for (const row of allUpdatedClientsFromOtherResourcesOnThisSite) {
) if (!clientsFromOtherResourcesBySite.has(row.siteId)) {
) clientsFromOtherResourcesBySite.set(row.siteId, new Set());
); }
clientsFromOtherResourcesBySite.get(row.siteId)!.add(row.clientId);
}
const existingClientSiteResources = await trx const existingClientSiteResources = await trx
.select({ .select({
@@ -259,31 +279,39 @@ export async function rebuildClientAssociationsFromSiteResource(
/////////// process the client-site associations /////////// /////////// process the client-site associations ///////////
for (const site of sitesList) {
const siteId = site.siteId;
const existingClientSites = await trx const existingClientSites = await trx
.select({ .select({
clientId: clientSitesAssociationsCache.clientId clientId: clientSitesAssociationsCache.clientId
}) })
.from(clientSitesAssociationsCache) .from(clientSitesAssociationsCache)
.where(eq(clientSitesAssociationsCache.siteId, siteResource.siteId)); .where(eq(clientSitesAssociationsCache.siteId, siteId));
const existingClientSiteIds = existingClientSites.map( const existingClientSiteIds = existingClientSites.map(
(row) => row.clientId (row) => row.clientId
); );
// Get full client details for existing clients (needed for sending delete messages) // Get full client details for existing clients (needed for sending delete messages)
const existingClients = await trx const existingClients =
existingClientSiteIds.length > 0
? await trx
.select({ .select({
clientId: clients.clientId, clientId: clients.clientId,
pubKey: clients.pubKey, pubKey: clients.pubKey,
subnet: clients.subnet subnet: clients.subnet
}) })
.from(clients) .from(clients)
.where(inArray(clients.clientId, existingClientSiteIds)); .where(inArray(clients.clientId, existingClientSiteIds))
: [];
const otherResourceClientIds = clientsFromOtherResourcesBySite.get(siteId) ?? new Set<number>();
const clientSitesToAdd = mergedAllClientIds.filter( const clientSitesToAdd = mergedAllClientIds.filter(
(clientId) => (clientId) =>
!existingClientSiteIds.includes(clientId) && !existingClientSiteIds.includes(clientId) &&
!allClientIdsFromOtherResourcesOnThisSite.includes(clientId) // dont remove if there is still another connection for another site resource !otherResourceClientIds.has(clientId) // dont add if already connected via another site resource
); );
const clientSitesToInsert = clientSitesToAdd.map((clientId) => ({ const clientSitesToInsert = clientSitesToAdd.map((clientId) => ({
@@ -302,7 +330,7 @@ export async function rebuildClientAssociationsFromSiteResource(
const clientSitesToRemove = existingClientSiteIds.filter( const clientSitesToRemove = existingClientSiteIds.filter(
(clientId) => (clientId) =>
!mergedAllClientIds.includes(clientId) && !mergedAllClientIds.includes(clientId) &&
!allClientIdsFromOtherResourcesOnThisSite.includes(clientId) // dont remove if there is still another connection for another site resource !otherResourceClientIds.has(clientId) // dont remove if there is still another connection for another site resource
); );
if (clientSitesToRemove.length > 0) { if (clientSitesToRemove.length > 0) {
@@ -319,8 +347,6 @@ export async function rebuildClientAssociationsFromSiteResource(
); );
} }
/////////// send the messages ///////////
// Now handle the messages to add/remove peers on both the newt and olm sides // Now handle the messages to add/remove peers on both the newt and olm sides
await handleMessagesForSiteClients( await handleMessagesForSiteClients(
site, site,
@@ -331,10 +357,12 @@ export async function rebuildClientAssociationsFromSiteResource(
clientSitesToRemove, clientSitesToRemove,
trx trx
); );
}
// Handle subnet proxy target updates for the resource associations // Handle subnet proxy target updates for the resource associations
await handleSubnetProxyTargetUpdates( await handleSubnetProxyTargetUpdates(
siteResource, siteResource,
sitesList,
mergedAllClients, mergedAllClients,
existingResourceClients, existingResourceClients,
clientSiteResourcesToAdd, clientSiteResourcesToAdd,
@@ -623,6 +651,7 @@ export async function updateClientSiteDestinations(
async function handleSubnetProxyTargetUpdates( async function handleSubnetProxyTargetUpdates(
siteResource: SiteResource, siteResource: SiteResource,
sitesList: Site[],
allClients: { allClients: {
clientId: number; clientId: number;
pubKey: string | null; pubKey: string | null;
@@ -637,22 +666,26 @@ async function handleSubnetProxyTargetUpdates(
clientSiteResourcesToRemove: number[], clientSiteResourcesToRemove: number[],
trx: Transaction | typeof db = db trx: Transaction | typeof db = db
): Promise<void> { ): Promise<void> {
const proxyJobs: Promise<any>[] = [];
const olmJobs: Promise<any>[] = [];
for (const siteData of sitesList) {
const siteId = siteData.siteId;
// Get the newt for this site // Get the newt for this site
const [newt] = await trx const [newt] = await trx
.select() .select()
.from(newts) .from(newts)
.where(eq(newts.siteId, siteResource.siteId)) .where(eq(newts.siteId, siteId))
.limit(1); .limit(1);
if (!newt) { if (!newt) {
logger.warn( logger.warn(
`Newt not found for site ${siteResource.siteId}, skipping subnet proxy target updates` `Newt not found for site ${siteId}, skipping subnet proxy target updates`
); );
return; continue;
} }
const proxyJobs = [];
const olmJobs = [];
// Generate targets for added associations // Generate targets for added associations
if (clientSiteResourcesToAdd.length > 0) { if (clientSiteResourcesToAdd.length > 0) {
const addedClients = allClients.filter((client) => const addedClients = allClients.filter((client) =>
@@ -667,7 +700,7 @@ async function handleSubnetProxyTargetUpdates(
if (targetsToAdd.length > 0) { if (targetsToAdd.length > 0) {
logger.info( logger.info(
`Adding ${targetsToAdd.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}` `Adding ${targetsToAdd.length} subnet proxy targets for siteResource ${siteResource.siteResourceId} on site ${siteId}`
); );
proxyJobs.push( proxyJobs.push(
addSubnetProxyTargets( addSubnetProxyTargets(
@@ -682,7 +715,7 @@ async function handleSubnetProxyTargetUpdates(
olmJobs.push( olmJobs.push(
addPeerData( addPeerData(
client.clientId, client.clientId,
siteResource.siteId, siteId,
generateRemoteSubnets([siteResource]), generateRemoteSubnets([siteResource]),
generateAliasConfig([siteResource]) generateAliasConfig([siteResource])
) )
@@ -707,7 +740,7 @@ async function handleSubnetProxyTargetUpdates(
if (targetsToRemove.length > 0) { if (targetsToRemove.length > 0) {
logger.info( logger.info(
`Removing ${targetsToRemove.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}` `Removing ${targetsToRemove.length} subnet proxy targets for siteResource ${siteResource.siteResourceId} on site ${siteId}`
); );
proxyJobs.push( proxyJobs.push(
removeSubnetProxyTargets( removeSubnetProxyTargets(
@@ -719,7 +752,11 @@ async function handleSubnetProxyTargetUpdates(
} }
for (const client of removedClients) { for (const client of removedClients) {
// Check if this client still has access to another resource on this site with the same destination // Check if this client still has access to another resource
// on this specific site with the same destination. We scope
// by siteId (via siteNetworks) rather than networkId because
// removePeerData operates per-site — a resource on a different
// site sharing the same network should not block removal here.
const destinationStillInUse = await trx const destinationStillInUse = await trx
.select() .select()
.from(siteResources) .from(siteResources)
@@ -730,13 +767,17 @@ async function handleSubnetProxyTargetUpdates(
siteResources.siteResourceId siteResources.siteResourceId
) )
) )
.innerJoin(
siteNetworks,
eq(siteNetworks.networkId, siteResources.networkId)
)
.where( .where(
and( and(
eq( eq(
clientSiteResourcesAssociationsCache.clientId, clientSiteResourcesAssociationsCache.clientId,
client.clientId client.clientId
), ),
eq(siteResources.siteId, siteResource.siteId), eq(siteNetworks.siteId, siteId),
eq( eq(
siteResources.destination, siteResources.destination,
siteResource.destination siteResource.destination
@@ -757,7 +798,7 @@ async function handleSubnetProxyTargetUpdates(
olmJobs.push( olmJobs.push(
removePeerData( removePeerData(
client.clientId, client.clientId,
siteResource.siteId, siteId,
remoteSubnetsToRemove, remoteSubnetsToRemove,
generateAliasConfig([siteResource]) generateAliasConfig([siteResource])
) )
@@ -765,6 +806,7 @@ async function handleSubnetProxyTargetUpdates(
} }
} }
} }
}
await Promise.all(proxyJobs); await Promise.all(proxyJobs);
} }
@@ -868,10 +910,25 @@ export async function rebuildClientAssociationsFromClient(
) )
: []; : [];
// Group by siteId for site-level associations // Group by siteId for site-level associations — look up via siteNetworks since
const newSiteIds = Array.from( // siteResources no longer carries a direct siteId column.
new Set(newSiteResources.map((sr) => sr.siteId)) const networkIds = Array.from(
new Set(
newSiteResources
.map((sr) => sr.networkId)
.filter((id): id is number => id !== null)
)
); );
const newSiteIds =
networkIds.length > 0
? await trx
.select({ siteId: siteNetworks.siteId })
.from(siteNetworks)
.where(inArray(siteNetworks.networkId, networkIds))
.then((rows) =>
Array.from(new Set(rows.map((r) => r.siteId)))
)
: [];
/////////// Process client-siteResource associations /////////// /////////// Process client-siteResource associations ///////////
@@ -1144,13 +1201,45 @@ async function handleMessagesForClientResources(
resourcesToAdd.includes(r.siteResourceId) resourcesToAdd.includes(r.siteResourceId)
); );
// Build (resource, siteId) pairs by looking up siteNetworks for each resource's networkId
const addedNetworkIds = Array.from(
new Set(
addedResources
.map((r) => r.networkId)
.filter((id): id is number => id !== null)
)
);
const addedSiteNetworkRows =
addedNetworkIds.length > 0
? await trx
.select({
networkId: siteNetworks.networkId,
siteId: siteNetworks.siteId
})
.from(siteNetworks)
.where(inArray(siteNetworks.networkId, addedNetworkIds))
: [];
const addedNetworkToSites = new Map<number, number[]>();
for (const row of addedSiteNetworkRows) {
if (!addedNetworkToSites.has(row.networkId)) {
addedNetworkToSites.set(row.networkId, []);
}
addedNetworkToSites.get(row.networkId)!.push(row.siteId);
}
// Group by site for proxy updates // Group by site for proxy updates
const addedBySite = new Map<number, SiteResource[]>(); const addedBySite = new Map<number, SiteResource[]>();
for (const resource of addedResources) { for (const resource of addedResources) {
if (!addedBySite.has(resource.siteId)) { const siteIds =
addedBySite.set(resource.siteId, []); resource.networkId != null
? (addedNetworkToSites.get(resource.networkId) ?? [])
: [];
for (const siteId of siteIds) {
if (!addedBySite.has(siteId)) {
addedBySite.set(siteId, []);
}
addedBySite.get(siteId)!.push(resource);
} }
addedBySite.get(resource.siteId)!.push(resource);
} }
// Add subnet proxy targets for each site // Add subnet proxy targets for each site
@@ -1192,7 +1281,7 @@ async function handleMessagesForClientResources(
olmJobs.push( olmJobs.push(
addPeerData( addPeerData(
client.clientId, client.clientId,
resource.siteId, siteId,
generateRemoteSubnets([resource]), generateRemoteSubnets([resource]),
generateAliasConfig([resource]) generateAliasConfig([resource])
) )
@@ -1204,7 +1293,7 @@ async function handleMessagesForClientResources(
error.message.includes("not found") error.message.includes("not found")
) { ) {
logger.debug( logger.debug(
`Olm data not found for client ${client.clientId} and site ${resource.siteId}, skipping removal` `Olm data not found for client ${client.clientId} and site ${siteId}, skipping addition`
); );
} else { } else {
throw error; throw error;
@@ -1221,13 +1310,45 @@ async function handleMessagesForClientResources(
.from(siteResources) .from(siteResources)
.where(inArray(siteResources.siteResourceId, resourcesToRemove)); .where(inArray(siteResources.siteResourceId, resourcesToRemove));
// Build (resource, siteId) pairs via siteNetworks
const removedNetworkIds = Array.from(
new Set(
removedResources
.map((r) => r.networkId)
.filter((id): id is number => id !== null)
)
);
const removedSiteNetworkRows =
removedNetworkIds.length > 0
? await trx
.select({
networkId: siteNetworks.networkId,
siteId: siteNetworks.siteId
})
.from(siteNetworks)
.where(inArray(siteNetworks.networkId, removedNetworkIds))
: [];
const removedNetworkToSites = new Map<number, number[]>();
for (const row of removedSiteNetworkRows) {
if (!removedNetworkToSites.has(row.networkId)) {
removedNetworkToSites.set(row.networkId, []);
}
removedNetworkToSites.get(row.networkId)!.push(row.siteId);
}
// Group by site for proxy updates // Group by site for proxy updates
const removedBySite = new Map<number, SiteResource[]>(); const removedBySite = new Map<number, SiteResource[]>();
for (const resource of removedResources) { for (const resource of removedResources) {
if (!removedBySite.has(resource.siteId)) { const siteIds =
removedBySite.set(resource.siteId, []); resource.networkId != null
? (removedNetworkToSites.get(resource.networkId) ?? [])
: [];
for (const siteId of siteIds) {
if (!removedBySite.has(siteId)) {
removedBySite.set(siteId, []);
}
removedBySite.get(siteId)!.push(resource);
} }
removedBySite.get(resource.siteId)!.push(resource);
} }
// Remove subnet proxy targets for each site // Remove subnet proxy targets for each site
@@ -1265,7 +1386,11 @@ async function handleMessagesForClientResources(
} }
try { try {
// Check if this client still has access to another resource on this site with the same destination // Check if this client still has access to another resource
// on this specific site with the same destination. We scope
// by siteId (via siteNetworks) rather than networkId because
// removePeerData operates per-site — a resource on a different
// site sharing the same network should not block removal here.
const destinationStillInUse = await trx const destinationStillInUse = await trx
.select() .select()
.from(siteResources) .from(siteResources)
@@ -1276,13 +1401,17 @@ async function handleMessagesForClientResources(
siteResources.siteResourceId siteResources.siteResourceId
) )
) )
.innerJoin(
siteNetworks,
eq(siteNetworks.networkId, siteResources.networkId)
)
.where( .where(
and( and(
eq( eq(
clientSiteResourcesAssociationsCache.clientId, clientSiteResourcesAssociationsCache.clientId,
client.clientId client.clientId
), ),
eq(siteResources.siteId, resource.siteId), eq(siteNetworks.siteId, siteId),
eq( eq(
siteResources.destination, siteResources.destination,
resource.destination resource.destination
@@ -1304,7 +1433,7 @@ async function handleMessagesForClientResources(
olmJobs.push( olmJobs.push(
removePeerData( removePeerData(
client.clientId, client.clientId,
resource.siteId, siteId,
remoteSubnetsToRemove, remoteSubnetsToRemove,
generateAliasConfig([resource]) generateAliasConfig([resource])
) )
@@ -1316,7 +1445,7 @@ async function handleMessagesForClientResources(
error.message.includes("not found") error.message.includes("not found")
) { ) {
logger.debug( logger.debug(
`Olm data not found for client ${client.clientId} and site ${resource.siteId}, skipping removal` `Olm data not found for client ${client.clientId} and site ${siteId}, skipping removal`
); );
} else { } else {
throw error; throw error;