From d948d2ec335f92f3498c642b25d08cbffb497f77 Mon Sep 17 00:00:00 2001 From: Owen Date: Fri, 3 Apr 2026 22:55:04 -0400 Subject: [PATCH] Try to prevent deadlocks --- server/routers/newt/pingAccumulator.ts | 186 +++++++++++++------------ 1 file changed, 98 insertions(+), 88 deletions(-) diff --git a/server/routers/newt/pingAccumulator.ts b/server/routers/newt/pingAccumulator.ts index 83afd613e..fe2cde216 100644 --- a/server/routers/newt/pingAccumulator.ts +++ b/server/routers/newt/pingAccumulator.ts @@ -1,6 +1,6 @@ import { db } from "@server/db"; import { sites, clients, olms } from "@server/db"; -import { eq, inArray } from "drizzle-orm"; +import { inArray } from "drizzle-orm"; import logger from "@server/logger"; /** @@ -21,7 +21,7 @@ import logger from "@server/logger"; */ const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds -const MAX_RETRIES = 2; +const MAX_RETRIES = 5; const BASE_DELAY_MS = 50; // ── Site (newt) pings ────────────────────────────────────────────────── @@ -36,6 +36,14 @@ const pendingOlmArchiveResets: Set<number> = new Set(); let flushTimer: NodeJS.Timeout | null = null; +/** + * Guard that prevents two flush cycles from running concurrently. + * setInterval does not await async callbacks, so without this a slow flush + * (e.g. due to DB latency) would overlap with the next scheduled cycle and + * the two concurrent bulk UPDATEs would deadlock each other. + */ +let isFlushing = false; + // ── Public API ───────────────────────────────────────────────────────── /** @@ -72,6 +80,12 @@ export function recordClientPing( /** * Flush all accumulated site pings to the database. + * + * Each batch of up to BATCH_SIZE rows is written with a **single** UPDATE + * statement. We use the maximum timestamp across the batch so that `lastPing` + * reflects the most recent ping seen for any site in the group. This avoids + * the multi-statement transaction that previously created additional + * row-lock ordering hazards. 
*/ async function flushSitePingsToDb(): Promise<void> { if (pendingSitePings.size === 0) { @@ -83,55 +97,35 @@ async function flushSitePingsToDb(): Promise<void> { const pingsToFlush = new Map(pendingSitePings); pendingSitePings.clear(); - // Sort by siteId for consistent lock ordering (prevents deadlocks) - const sortedEntries = Array.from(pingsToFlush.entries()).sort( - ([a], [b]) => a - b - ); + const entries = Array.from(pingsToFlush.entries()); const BATCH_SIZE = 50; - for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) { - const batch = sortedEntries.slice(i, i + BATCH_SIZE); + for (let i = 0; i < entries.length; i += BATCH_SIZE) { + const batch = entries.slice(i, i + BATCH_SIZE); + + // Use the latest timestamp in the batch so that `lastPing` always + // moves forward. Using a single timestamp for the whole batch means + // we only ever need one UPDATE statement (no transaction). + const maxTimestamp = Math.max(...batch.map(([, ts]) => ts)); + const siteIds = batch.map(([id]) => id); try { await withRetry(async () => { - // Group by timestamp for efficient bulk updates - const byTimestamp = new Map<number, number[]>(); - for (const [siteId, timestamp] of batch) { - const group = byTimestamp.get(timestamp) || []; - group.push(siteId); - byTimestamp.set(timestamp, group); - } - - if (byTimestamp.size === 1) { - const [timestamp, siteIds] = Array.from( - byTimestamp.entries() - )[0]; - await db - .update(sites) - .set({ - online: true, - lastPing: timestamp - }) - .where(inArray(sites.siteId, siteIds)); - } else { - await db.transaction(async (tx) => { - for (const [timestamp, siteIds] of byTimestamp) { - await tx - .update(sites) - .set({ - online: true, - lastPing: timestamp - }) - .where(inArray(sites.siteId, siteIds)); - } - }); - } + await db + .update(sites) + .set({ + online: true, + lastPing: maxTimestamp + }) + .where(inArray(sites.siteId, siteIds)); }, "flushSitePingsToDb"); } catch (error) { logger.error( `Failed to flush site ping batch (${batch.length} sites), re-queuing 
for next cycle`, { error } ); + // Re-queue only if the preserved timestamp is newer than any + // update that may have landed since we snapshotted. for (const [siteId, timestamp] of batch) { const existing = pendingSitePings.get(siteId); if (!existing || existing < timestamp) { @@ -144,6 +138,8 @@ async function flushSitePingsToDb(): Promise<void> { /** * Flush all accumulated client (OLM) pings to the database. + * + * Same single-UPDATE-per-batch approach as `flushSitePingsToDb`. */ async function flushClientPingsToDb(): Promise<void> { if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) { @@ -159,51 +155,25 @@ async function flushClientPingsToDb(): Promise<void> { // ── Flush client pings ───────────────────────────────────────────── if (pingsToFlush.size > 0) { - const sortedEntries = Array.from(pingsToFlush.entries()).sort( - ([a], [b]) => a - b - ); + const entries = Array.from(pingsToFlush.entries()); const BATCH_SIZE = 50; - for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) { - const batch = sortedEntries.slice(i, i + BATCH_SIZE); + for (let i = 0; i < entries.length; i += BATCH_SIZE) { + const batch = entries.slice(i, i + BATCH_SIZE); + + const maxTimestamp = Math.max(...batch.map(([, ts]) => ts)); + const clientIds = batch.map(([id]) => id); try { await withRetry(async () => { - const byTimestamp = new Map<number, number[]>(); - for (const [clientId, timestamp] of batch) { - const group = byTimestamp.get(timestamp) || []; - group.push(clientId); - byTimestamp.set(timestamp, group); - } - - if (byTimestamp.size === 1) { - const [timestamp, clientIds] = Array.from( - byTimestamp.entries() - )[0]; - await db - .update(clients) - .set({ - lastPing: timestamp, - online: true, - archived: false - }) - .where(inArray(clients.clientId, clientIds)); - } else { - await db.transaction(async (tx) => { - for (const [timestamp, clientIds] of byTimestamp) { - await tx - .update(clients) - .set({ - lastPing: timestamp, - online: true, - archived: false - }) - .where( - 
inArray(clients.clientId, clientIds) - ); - } - }); - } + await db + .update(clients) + .set({ + lastPing: maxTimestamp, + online: true, + archived: false + }) + .where(inArray(clients.clientId, clientIds)); }, "flushClientPingsToDb"); } catch (error) { logger.error( @@ -260,7 +230,12 @@ export async function flushPingsToDb(): Promise<void> { /** * Simple retry wrapper with exponential backoff for transient errors - * (connection timeouts, unexpected disconnects). + * (deadlocks, connection timeouts, unexpected disconnects). + * + * PostgreSQL deadlocks (40P01) are always safe to retry: the database + * guarantees exactly one winner per deadlock pair, so the loser just needs + * to try again. MAX_RETRIES is intentionally higher than typical connection + * retry budgets to give deadlock victims enough chances to succeed. */ async function withRetry<T>( operation: () => Promise<T>, @@ -277,7 +252,8 @@ const jitter = Math.random() * baseDelay; const delay = baseDelay + jitter; logger.warn( - `Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms` + `Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`, + { code: error?.code ?? error?.cause?.code } ); await new Promise((resolve) => setTimeout(resolve, delay)); continue; @@ -288,14 +264,14 @@ } /** - * Detect transient connection errors that are safe to retry. + * Detect transient errors that are safe to retry. 
*/ function isTransientError(error: any): boolean { if (!error) return false; const message = (error.message || "").toLowerCase(); const causeMessage = (error.cause?.message || "").toLowerCase(); - const code = error.code || ""; + const code = error.code || error.cause?.code || ""; // Connection timeout / terminated if ( @@ -308,12 +284,17 @@ function isTransientError(error: any): boolean { return true; } - // PostgreSQL deadlock + // PostgreSQL deadlock detected — always safe to retry (one winner guaranteed) if (code === "40P01" || message.includes("deadlock")) { return true; } - // ECONNRESET, ECONNREFUSED, EPIPE + // PostgreSQL serialization failure + if (code === "40001") { + return true; + } + + // ECONNRESET, ECONNREFUSED, EPIPE, ETIMEDOUT if ( code === "ECONNRESET" || code === "ECONNREFUSED" || @@ -337,12 +318,26 @@ export function startPingAccumulator(): void { } flushTimer = setInterval(async () => { + // Skip this tick if the previous flush is still in progress. + // setInterval does not await async callbacks, so without this guard + // two flush cycles can run concurrently and deadlock each other on + // overlapping bulk UPDATE statements. + if (isFlushing) { + logger.debug( + "Ping accumulator: previous flush still in progress, skipping cycle" + ); + return; + } + + isFlushing = true; try { await flushPingsToDb(); } catch (error) { logger.error("Unhandled error in ping accumulator flush", { error }); + } finally { + isFlushing = false; } }, FLUSH_INTERVAL_MS); @@ -364,7 +359,22 @@ export async function stopPingAccumulator(): Promise<void> { flushTimer = null; } - // Final flush to persist any remaining pings + // Final flush to persist any remaining pings. + // Wait for any in-progress flush to finish first so we don't race. 
+ if (isFlushing) { + logger.debug( + "Ping accumulator: waiting for in-progress flush before stopping…" + ); + await new Promise<void>((resolve) => { + const poll = setInterval(() => { + if (!isFlushing) { + clearInterval(poll); + resolve(); + } + }, 50); + }); + } + try { await flushPingsToDb(); } catch (error) { @@ -379,4 +389,4 @@ export async function stopPingAccumulator(): Promise<void> { */ export function getPendingPingCount(): number { return pendingSitePings.size + pendingClientPings.size; -} \ No newline at end of file +}