Try to fix deadlocks again

Fixes FOU-284
This commit is contained in:
Owen
2025-12-08 21:25:46 -05:00
parent e011580b96
commit 0a9b19ecfc
2 changed files with 225 additions and 182 deletions

View File

@@ -1,13 +0,0 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/

View File

@@ -14,12 +14,55 @@ import { build } from "@server/build";
// Track sites that are already offline to avoid unnecessary queries // Track sites that are already offline to avoid unnecessary queries
const offlineSites = new Set<string>(); const offlineSites = new Set<string>();
// Retry configuration for deadlock handling
// Maximum number of retries after the initial attempt (so up to 4 tries total).
const MAX_RETRIES = 3;
// Base backoff delay in milliseconds; doubled on each retry and extended
// with random jitter to desynchronize competing instances.
const BASE_DELAY_MS = 50;
interface PeerBandwidth { interface PeerBandwidth {
publicKey: string; publicKey: string;
bytesIn: number; bytesIn: number;
bytesOut: number; bytesOut: number;
} }
/**
 * Check if an error is a deadlock error.
 *
 * Matches the PostgreSQL `deadlock_detected` SQLSTATE ("40P01") on the error
 * itself or on its `cause` (drivers/ORMs often wrap the underlying error), and
 * falls back to a case-insensitive message match so messages such as MySQL's
 * "Deadlock found when trying to get lock" are also recognized.
 *
 * @param error - The caught error (unknown shape, hence `any`).
 * @returns true if the error indicates a database deadlock.
 */
function isDeadlockError(error: any): boolean {
    return (
        error?.code === "40P01" ||
        error?.cause?.code === "40P01" ||
        // typeof guard ensures a real boolean is returned (the original
        // `msg && msg.includes(...)` could yield `undefined`), and
        // lower-casing catches "Deadlock"/"deadlock" variants.
        (typeof error?.message === "string" &&
            error.message.toLowerCase().includes("deadlock"))
    );
}
/**
* Execute a function with retry logic for deadlock handling
*/
async function withDeadlockRetry<T>(
operation: () => Promise<T>,
context: string
): Promise<T> {
let attempt = 0;
while (true) {
try {
return await operation();
} catch (error: any) {
if (isDeadlockError(error) && attempt < MAX_RETRIES) {
attempt++;
const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
const jitter = Math.random() * baseDelay;
const delay = baseDelay + jitter;
logger.warn(
`Deadlock detected in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue;
}
throw error;
}
}
}
export const receiveBandwidth = async ( export const receiveBandwidth = async (
req: Request, req: Request,
res: Response, res: Response,
@@ -60,24 +103,29 @@ export async function updateSiteBandwidth(
const currentTime = new Date(); const currentTime = new Date();
const oneMinuteAgo = new Date(currentTime.getTime() - 60000); // 1 minute ago const oneMinuteAgo = new Date(currentTime.getTime() - 60000); // 1 minute ago
// logger.debug(`Received data: ${JSON.stringify(bandwidthData)}`); // Sort bandwidth data by publicKey to ensure consistent lock ordering across all instances
// This is critical for preventing deadlocks when multiple instances update the same sites
const sortedBandwidthData = [...bandwidthData].sort((a, b) =>
a.publicKey.localeCompare(b.publicKey)
);
await db.transaction(async (trx) => {
// First, handle sites that are actively reporting bandwidth // First, handle sites that are actively reporting bandwidth
const activePeers = bandwidthData.filter((peer) => peer.bytesIn > 0); // Bytesout will have data as it tries to send keep alive messages const activePeers = sortedBandwidthData.filter((peer) => peer.bytesIn > 0);
// Aggregate usage data by organization (collected outside transaction)
const orgUsageMap = new Map<string, number>();
const orgUptimeMap = new Map<string, number>();
if (activePeers.length > 0) { if (activePeers.length > 0) {
// Remove any active peers from offline tracking since they're sending data // Remove any active peers from offline tracking since they're sending data
activePeers.forEach((peer) => offlineSites.delete(peer.publicKey)); activePeers.forEach((peer) => offlineSites.delete(peer.publicKey));
// Aggregate usage data by organization // Update each active site individually with retry logic
const orgUsageMap = new Map<string, number>(); // This reduces transaction scope and allows retries per-site
const orgUptimeMap = new Map<string, number>();
// Update all active sites with bandwidth data and get the site data in one operation
const updatedSites = [];
for (const peer of activePeers) { for (const peer of activePeers) {
const [updatedSite] = await trx try {
const updatedSite = await withDeadlockRetry(async () => {
const [result] = await db
.update(sites) .update(sites)
.set({ .set({
megabytesOut: sql`${sites.megabytesOut} + ${peer.bytesIn}`, megabytesOut: sql`${sites.megabytesOut} + ${peer.bytesIn}`,
@@ -92,45 +140,48 @@ export async function updateSiteBandwidth(
siteId: sites.siteId, siteId: sites.siteId,
lastBandwidthUpdate: sites.lastBandwidthUpdate lastBandwidthUpdate: sites.lastBandwidthUpdate
}); });
return result;
}, `update active site ${peer.publicKey}`);
if (updatedSite) { if (updatedSite) {
if (exitNodeId) { if (exitNodeId) {
if ( const notAllowed = await checkExitNodeOrg(
await checkExitNodeOrg(
exitNodeId, exitNodeId,
updatedSite.orgId, updatedSite.orgId
trx );
) if (notAllowed) {
) {
// not allowed
logger.warn( logger.warn(
`Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}` `Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}`
); );
// THIS SHOULD TRIGGER THE TRANSACTION TO FAIL? // Skip this site but continue processing others
throw new Error("Exit node not allowed"); continue;
} }
} }
updatedSites.push({ ...updatedSite, peer });
}
}
// Calculate org usage aggregations using the updated site data
for (const { peer, ...site } of updatedSites) {
// Aggregate bandwidth usage for the org // Aggregate bandwidth usage for the org
const totalBandwidth = peer.bytesIn + peer.bytesOut; const totalBandwidth = peer.bytesIn + peer.bytesOut;
const currentOrgUsage = orgUsageMap.get(site.orgId) || 0; const currentOrgUsage = orgUsageMap.get(updatedSite.orgId) || 0;
orgUsageMap.set(site.orgId, currentOrgUsage + totalBandwidth); orgUsageMap.set(updatedSite.orgId, currentOrgUsage + totalBandwidth);
// Add 10 seconds of uptime for each active site // Add 10 seconds of uptime for each active site
const currentOrgUptime = orgUptimeMap.get(site.orgId) || 0; const currentOrgUptime = orgUptimeMap.get(updatedSite.orgId) || 0;
orgUptimeMap.set(site.orgId, currentOrgUptime + 10 / 60); // Store in minutes and just add 10 seconds orgUptimeMap.set(updatedSite.orgId, currentOrgUptime + 10 / 60);
}
} catch (error) {
logger.error(
`Failed to update bandwidth for site ${peer.publicKey}:`,
error
);
// Continue with other sites
}
}
} }
if (calcUsageAndLimits) { // Process usage updates outside of site update transactions
// REMOTE EXIT NODES DO NOT COUNT TOWARDS USAGE // This separates the concerns and reduces lock contention
// Process all usage updates sequentially by organization to reduce deadlock risk if (calcUsageAndLimits && (orgUsageMap.size > 0 || orgUptimeMap.size > 0)) {
const allOrgIds = new Set([...orgUsageMap.keys(), ...orgUptimeMap.keys()]); // Sort org IDs to ensure consistent lock ordering
const allOrgIds = [...new Set([...orgUsageMap.keys(), ...orgUptimeMap.keys()])].sort();
for (const orgId of allOrgIds) { for (const orgId of allOrgIds) {
try { try {
@@ -140,17 +191,16 @@ export async function updateSiteBandwidth(
const bandwidthUsage = await usageService.add( const bandwidthUsage = await usageService.add(
orgId, orgId,
FeatureId.EGRESS_DATA_MB, FeatureId.EGRESS_DATA_MB,
totalBandwidth, totalBandwidth
trx
); );
if (bandwidthUsage) { if (bandwidthUsage) {
// Fire and forget - don't block on limit checking
usageService usageService
.checkLimitSet( .checkLimitSet(
orgId, orgId,
true, true,
FeatureId.EGRESS_DATA_MB, FeatureId.EGRESS_DATA_MB,
bandwidthUsage, bandwidthUsage
trx
) )
.catch((error: any) => { .catch((error: any) => {
logger.error( logger.error(
@@ -167,17 +217,16 @@ export async function updateSiteBandwidth(
const uptimeUsage = await usageService.add( const uptimeUsage = await usageService.add(
orgId, orgId,
FeatureId.SITE_UPTIME, FeatureId.SITE_UPTIME,
totalUptime, totalUptime
trx
); );
if (uptimeUsage) { if (uptimeUsage) {
// Fire and forget - don't block on limit checking
usageService usageService
.checkLimitSet( .checkLimitSet(
orgId, orgId,
true, true,
FeatureId.SITE_UPTIME, FeatureId.SITE_UPTIME,
uptimeUsage, uptimeUsage
trx
) )
.catch((error: any) => { .catch((error: any) => {
logger.error( logger.error(
@@ -192,19 +241,19 @@ export async function updateSiteBandwidth(
`Error processing usage for org ${orgId}:`, `Error processing usage for org ${orgId}:`,
error error
); );
// Don't break the loop, continue with other orgs // Continue with other orgs
}
} }
} }
} }
// Handle sites that reported zero bandwidth but need online status updated // Handle sites that reported zero bandwidth but need online status updated
const zeroBandwidthPeers = bandwidthData.filter( const zeroBandwidthPeers = sortedBandwidthData.filter(
(peer) => peer.bytesIn === 0 && !offlineSites.has(peer.publicKey) // Bytesout will have data as it tries to send keep alive messages (peer) => peer.bytesIn === 0 && !offlineSites.has(peer.publicKey)
); );
if (zeroBandwidthPeers.length > 0) { if (zeroBandwidthPeers.length > 0) {
const zeroBandwidthSites = await trx // Fetch all zero bandwidth sites in one query
const zeroBandwidthSites = await db
.select() .select()
.from(sites) .from(sites)
.where( .where(
@@ -214,14 +263,17 @@ export async function updateSiteBandwidth(
) )
); );
for (const site of zeroBandwidthSites) { // Sort by siteId to ensure consistent lock ordering
const sortedZeroBandwidthSites = zeroBandwidthSites.sort(
(a, b) => a.siteId - b.siteId
);
for (const site of sortedZeroBandwidthSites) {
let newOnlineStatus = site.online; let newOnlineStatus = site.online;
// Check if site should go offline based on last bandwidth update WITH DATA // Check if site should go offline based on last bandwidth update WITH DATA
if (site.lastBandwidthUpdate) { if (site.lastBandwidthUpdate) {
const lastUpdateWithData = new Date( const lastUpdateWithData = new Date(site.lastBandwidthUpdate);
site.lastBandwidthUpdate
);
if (lastUpdateWithData < oneMinuteAgo) { if (lastUpdateWithData < oneMinuteAgo) {
newOnlineStatus = false; newOnlineStatus = false;
} }
@@ -230,31 +282,29 @@ export async function updateSiteBandwidth(
newOnlineStatus = false; newOnlineStatus = false;
} }
// Always update lastBandwidthUpdate to show this instance is receiving reports
// Only update online status if it changed // Only update online status if it changed
if (site.online !== newOnlineStatus) { if (site.online !== newOnlineStatus) {
const [updatedSite] = await trx try {
const updatedSite = await withDeadlockRetry(async () => {
const [result] = await db
.update(sites) .update(sites)
.set({ .set({
online: newOnlineStatus online: newOnlineStatus
}) })
.where(eq(sites.siteId, site.siteId)) .where(eq(sites.siteId, site.siteId))
.returning(); .returning();
return result;
}, `update offline status for site ${site.siteId}`);
if (updatedSite && exitNodeId) { if (updatedSite && exitNodeId) {
if ( const notAllowed = await checkExitNodeOrg(
await checkExitNodeOrg(
exitNodeId, exitNodeId,
updatedSite.orgId, updatedSite.orgId
trx );
) if (notAllowed) {
) {
// not allowed
logger.warn( logger.warn(
`Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}` `Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}`
); );
// THIS SHOULD TRIGGER THE TRANSACTION TO FAIL?
throw new Error("Exit node not allowed");
} }
} }
@@ -262,8 +312,14 @@ export async function updateSiteBandwidth(
if (!newOnlineStatus && site.pubKey) { if (!newOnlineStatus && site.pubKey) {
offlineSites.add(site.pubKey); offlineSites.add(site.pubKey);
} }
} catch (error) {
logger.error(
`Failed to update offline status for site ${site.siteId}:`,
error
);
// Continue with other sites
}
} }
} }
} }
});
} }