mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-04 01:36:39 +00:00
Try to remove deadlocks on client updates
This commit is contained in:
@@ -1,7 +1,7 @@
|
|||||||
import { db } from "@server/db";
|
import { db } from "@server/db";
|
||||||
import { MessageHandler } from "@server/routers/ws";
|
import { MessageHandler } from "@server/routers/ws";
|
||||||
import { clients, Newt } from "@server/db";
|
import { clients } from "@server/db";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq, sql } from "drizzle-orm";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
|
|
||||||
interface PeerBandwidth {
|
interface PeerBandwidth {
|
||||||
@@ -10,13 +10,57 @@ interface PeerBandwidth {
|
|||||||
bytesOut: number;
|
bytesOut: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Retry configuration for deadlock handling (read by withDeadlockRetry)
|
||||||
|
// Number of retries allowed after the initial attempt before the error is rethrown
const MAX_RETRIES = 3;
|
||||||
|
// Backoff unit in milliseconds: the wait before retry k is 2^(k-1) times this, plus random jitter of up to the same amount
const BASE_DELAY_MS = 50;
|
||||||
|
|
||||||
|
/**
 * Check if an error is a deadlock error.
 *
 * An error counts as a deadlock when any of the following holds:
 *  - its `code` is "40P01" (PostgreSQL SQLSTATE `deadlock_detected`),
 *  - its `cause.code` is "40P01" (driver error wrapped by another error),
 *  - its `message` is a string containing "deadlock".
 *
 * Always returns a strict boolean: non-object inputs (null, undefined,
 * thrown strings or numbers) and non-string `message` values yield `false`
 * instead of leaking a falsy non-boolean or throwing.
 *
 * @param error - The value caught from a failed database operation.
 * @returns `true` if the error looks like a deadlock, otherwise `false`.
 */
function isDeadlockError(error: unknown): boolean {
    if (typeof error !== "object" || error === null) {
        return false;
    }

    const fields = error as Record<string, unknown>;
    if (fields.code === "40P01") {
        return true;
    }

    // Some wrappers attach the original driver error as `cause`.
    const cause = fields.cause;
    if (
        typeof cause === "object" &&
        cause !== null &&
        (cause as Record<string, unknown>).code === "40P01"
    ) {
        return true;
    }

    // Fall back to message sniffing, but only when the message is a string.
    const message = fields.message;
    return typeof message === "string" && message.includes("deadlock");
}
|
||||||
|
|
||||||
|
/**
 * Run `operation`, re-running it when it fails with a deadlock error.
 *
 * After the initial attempt, up to MAX_RETRIES retries are made. Before
 * retry k (1-based) the function sleeps for 2^(k-1) * BASE_DELAY_MS
 * milliseconds plus a random jitter of up to that same amount. An error that
 * is not a deadlock, or a deadlock raised once the retries are exhausted, is
 * rethrown unchanged.
 *
 * @param operation - Async work to execute; invoked once per attempt.
 * @param context - Label included in the retry warning log line.
 * @returns The value resolved by the first successful attempt.
 * @throws Whatever `operation` threw when the failure is not retried.
 */
async function withDeadlockRetry<T>(
    operation: () => Promise<T>,
    context: string
): Promise<T> {
    let retriesUsed = 0;

    for (;;) {
        try {
            return await operation();
        } catch (failure: unknown) {
            const shouldRetry =
                isDeadlockError(failure) && retriesUsed < MAX_RETRIES;
            if (!shouldRetry) {
                throw failure;
            }

            retriesUsed += 1;

            // Exponential backoff: the base doubles each retry, then jitter
            // adds up to the same amount again.
            const backoffMs = 2 ** (retriesUsed - 1) * BASE_DELAY_MS;
            const waitMs = backoffMs + Math.random() * backoffMs;

            logger.warn(
                `Deadlock detected in ${context}, retrying attempt ${retriesUsed}/${MAX_RETRIES} after ${waitMs.toFixed(0)}ms`
            );

            await new Promise((wake) => setTimeout(wake, waitMs));
        }
    }
}
|
||||||
|
|
||||||
export const handleReceiveBandwidthMessage: MessageHandler = async (
|
export const handleReceiveBandwidthMessage: MessageHandler = async (
|
||||||
context
|
context
|
||||||
) => {
|
) => {
|
||||||
const { message, client, sendToClient } = context;
|
const { message } = context;
|
||||||
|
|
||||||
if (!message.data.bandwidthData) {
|
if (!message.data.bandwidthData) {
|
||||||
logger.warn("No bandwidth data provided");
|
logger.warn("No bandwidth data provided");
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
const bandwidthData: PeerBandwidth[] = message.data.bandwidthData;
|
const bandwidthData: PeerBandwidth[] = message.data.bandwidthData;
|
||||||
@@ -25,30 +69,40 @@ export const handleReceiveBandwidthMessage: MessageHandler = async (
|
|||||||
throw new Error("Invalid bandwidth data");
|
throw new Error("Invalid bandwidth data");
|
||||||
}
|
}
|
||||||
|
|
||||||
await db.transaction(async (trx) => {
|
// Sort bandwidth data by publicKey to ensure consistent lock ordering across all instances
|
||||||
for (const peer of bandwidthData) {
|
// This is critical for preventing deadlocks when multiple instances update the same clients
|
||||||
const { publicKey, bytesIn, bytesOut } = peer;
|
const sortedBandwidthData = [...bandwidthData].sort((a, b) =>
|
||||||
|
a.publicKey.localeCompare(b.publicKey)
|
||||||
|
);
|
||||||
|
|
||||||
// Find the client by public key
|
const currentTime = new Date().toISOString();
|
||||||
const [client] = await trx
|
|
||||||
.select()
|
|
||||||
.from(clients)
|
|
||||||
.where(eq(clients.pubKey, publicKey))
|
|
||||||
.limit(1);
|
|
||||||
|
|
||||||
if (!client) {
|
// Update each client individually with retry logic
|
||||||
continue;
|
// This reduces transaction scope and allows retries per-client
|
||||||
}
|
for (const peer of sortedBandwidthData) {
|
||||||
|
const { publicKey, bytesIn, bytesOut } = peer;
|
||||||
|
|
||||||
// Update the client's bandwidth usage
|
try {
|
||||||
await trx
|
await withDeadlockRetry(async () => {
|
||||||
.update(clients)
|
// Use atomic SQL increment to avoid SELECT then UPDATE pattern
|
||||||
.set({
|
// This eliminates the need to read the current value first
|
||||||
megabytesOut: (client.megabytesIn || 0) + bytesIn,
|
await db
|
||||||
megabytesIn: (client.megabytesOut || 0) + bytesOut,
|
.update(clients)
|
||||||
lastBandwidthUpdate: new Date().toISOString()
|
.set({
|
||||||
})
|
// Note: bytesIn from peer goes to megabytesOut (data sent to client)
|
||||||
.where(eq(clients.clientId, client.clientId));
|
// and bytesOut from peer goes to megabytesIn (data received from client)
|
||||||
|
megabytesOut: sql`COALESCE(${clients.megabytesOut}, 0) + ${bytesIn}`,
|
||||||
|
megabytesIn: sql`COALESCE(${clients.megabytesIn}, 0) + ${bytesOut}`,
|
||||||
|
lastBandwidthUpdate: currentTime
|
||||||
|
})
|
||||||
|
.where(eq(clients.pubKey, publicKey));
|
||||||
|
}, `update client bandwidth ${publicKey}`);
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
`Failed to update bandwidth for client ${publicKey}:`,
|
||||||
|
error
|
||||||
|
);
|
||||||
|
// Continue with other clients even if one fails
|
||||||
}
|
}
|
||||||
});
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
Reference in New Issue
Block a user