From d17ec6dc1fe0b20fe92ebabe1dd64db72e8d2707 Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 24 Mar 2026 17:39:43 -0700 Subject: [PATCH] Try to solve the problem --- server/cleanup.ts | 2 + server/private/cleanup.ts | 2 + server/private/routers/ws/ws.ts | 37 +- server/routers/newt/getNewtToken.ts | 22 +- server/routers/newt/handleNewtPingMessage.ts | 19 +- server/routers/newt/pingAccumulator.ts | 382 +++++++++++++++++++ server/routers/olm/getOlmToken.ts | 4 +- server/routers/olm/handleOlmPingMessage.ts | 23 +- server/routers/ws/messageHandlers.ts | 5 + server/routers/ws/ws.ts | 23 +- 10 files changed, 446 insertions(+), 73 deletions(-) create mode 100644 server/routers/newt/pingAccumulator.ts diff --git a/server/cleanup.ts b/server/cleanup.ts index 137654827..3c462f3f2 100644 --- a/server/cleanup.ts +++ b/server/cleanup.ts @@ -1,8 +1,10 @@ import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage"; import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth"; +import { stopPingAccumulator } from "@server/routers/newt/pingAccumulator"; import { cleanup as wsCleanup } from "#dynamic/routers/ws"; async function cleanup() { + await stopPingAccumulator(); await flushBandwidthToDb(); await flushSiteBandwidthToDb(); await wsCleanup(); diff --git a/server/private/cleanup.ts b/server/private/cleanup.ts index 0bd9822dd..5321fbc9e 100644 --- a/server/private/cleanup.ts +++ b/server/private/cleanup.ts @@ -15,8 +15,10 @@ import { rateLimitService } from "#private/lib/rateLimit"; import { cleanup as wsCleanup } from "#private/routers/ws"; import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage"; import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth"; +import { stopPingAccumulator } from "@server/routers/newt/pingAccumulator"; async function cleanup() { + await stopPingAccumulator(); await flushBandwidthToDb(); await flushSiteBandwidthToDb(); await rateLimitService.cleanup(); 
diff --git a/server/private/routers/ws/ws.ts b/server/private/routers/ws/ws.ts index 4bfda5da8..d96c55c91 100644 --- a/server/private/routers/ws/ws.ts +++ b/server/private/routers/ws/ws.ts @@ -30,6 +30,7 @@ import { } from "@server/db"; import { eq } from "drizzle-orm"; import { db } from "@server/db"; +import { recordPing } from "@server/routers/newt/pingAccumulator"; import { validateNewtSessionToken } from "@server/auth/sessions/newt"; import { validateOlmSessionToken } from "@server/auth/sessions/olm"; import logger from "@server/logger"; @@ -197,11 +198,7 @@ const connectedClients: Map = new Map(); // Config version tracking map (local to this node, resets on server restart) const clientConfigVersions: Map = new Map(); -// Tracks the last Unix timestamp (seconds) at which a ping was flushed to the -// DB for a given siteId. Resets on server restart which is fine – the first -// ping after startup will always write, re-establishing the online state. -const lastPingDbWrite: Map = new Map(); -const PING_DB_WRITE_INTERVAL = 45; // seconds + // Recovery tracking let isRedisRecoveryInProgress = false; @@ -853,32 +850,16 @@ const setupConnection = async ( ); }); - // Handle WebSocket protocol-level pings from older newt clients that do - // not send application-level "newt/ping" messages. Update the site's - // online state and lastPing timestamp so the offline checker treats them - // the same as modern newt clients. if (clientType === "newt") { const newtClient = client as Newt; - ws.on("ping", async () => { + ws.on("ping", () => { if (!newtClient.siteId) return; - const now = Math.floor(Date.now() / 1000); - const lastWrite = lastPingDbWrite.get(newtClient.siteId) ?? 
0; - if (now - lastWrite < PING_DB_WRITE_INTERVAL) return; - lastPingDbWrite.set(newtClient.siteId, now); - try { - await db - .update(sites) - .set({ - online: true, - lastPing: now - }) - .where(eq(sites.siteId, newtClient.siteId)); - } catch (error) { - logger.error( - "Error updating newt site online state on WS ping", - { error } - ); - } + // Record the ping in the accumulator instead of writing to the + // database on every WS ping frame. The accumulator flushes all + // pending pings in a single batched UPDATE every ~10s, which + // prevents connection pool exhaustion under load (especially + // with cross-region latency to the database). + recordPing(newtClient.siteId); }); } diff --git a/server/routers/newt/getNewtToken.ts b/server/routers/newt/getNewtToken.ts index 637973582..bc3cca9fc 100644 --- a/server/routers/newt/getNewtToken.ts +++ b/server/routers/newt/getNewtToken.ts @@ -1,5 +1,5 @@ import { generateSessionToken } from "@server/auth/sessions/app"; -import { db } from "@server/db"; +import { db, newtSessions } from "@server/db"; import { newts } from "@server/db"; import HttpCode from "@server/types/HttpCode"; import response from "@server/lib/response"; @@ -92,6 +92,26 @@ export async function getNewtToken( ); } + const [existingSession] = await db + .select() + .from(newtSessions) + .where(eq(newtSessions.newtId, existingNewt.newtId)); + + // if the session still has time in the expires, reuse it + if (existingSession && (existingSession.expiresAt - 30 * 60 * 1000) > Date.now()) { + return response<{ token: string; serverVersion: string }>(res, { + data: { + token: existingSession.sessionId, + serverVersion: APP_VERSION + }, + success: true, + error: false, + message: "Token created successfully", + status: HttpCode.OK + }); + } + + // otherwise generate a new one const resToken = generateSessionToken(); await createNewtSession(resToken, existingNewt.newtId); diff --git a/server/routers/newt/handleNewtPingMessage.ts 
b/server/routers/newt/handleNewtPingMessage.ts index 319647b83..da25852a0 100644 --- a/server/routers/newt/handleNewtPingMessage.ts +++ b/server/routers/newt/handleNewtPingMessage.ts @@ -5,6 +5,7 @@ import { Newt } from "@server/db"; import { eq, lt, isNull, and, or } from "drizzle-orm"; import logger from "@server/logger"; import { sendNewtSyncMessage } from "./sync"; +import { recordPing } from "./pingAccumulator"; // Track if the offline checker interval is running let offlineCheckerInterval: NodeJS.Timeout | null = null; @@ -114,18 +115,12 @@ export const handleNewtPingMessage: MessageHandler = async (context) => { return; } - try { - // Mark the site as online and record the ping timestamp. - await db - .update(sites) - .set({ - online: true, - lastPing: Math.floor(Date.now() / 1000) - }) - .where(eq(sites.siteId, newt.siteId)); - } catch (error) { - logger.error("Error updating online state on newt ping", { error }); - } + // Record the ping in memory; it will be flushed to the database + // periodically by the ping accumulator (every ~10s) in a single + // batched UPDATE instead of one query per ping. This prevents + // connection pool exhaustion under load, especially with + // cross-region latency to the database. + recordPing(newt.siteId); // Check config version and sync if stale. 
const configVersion = await getClientConfigVersion(newt.newtId); diff --git a/server/routers/newt/pingAccumulator.ts b/server/routers/newt/pingAccumulator.ts new file mode 100644 index 000000000..83afd613e --- /dev/null +++ b/server/routers/newt/pingAccumulator.ts @@ -0,0 +1,382 @@ +import { db } from "@server/db"; +import { sites, clients, olms } from "@server/db"; +import { eq, inArray } from "drizzle-orm"; +import logger from "@server/logger"; + +/** + * Ping Accumulator + * + * Instead of writing to the database on every single newt/olm ping (which + * causes pool exhaustion under load, especially with cross-region latency), + * we accumulate pings in memory and flush them to the database periodically + * in a single batch. + * + * This is the same pattern used for bandwidth flushing in + * receiveBandwidth.ts and handleReceiveBandwidthMessage.ts. + * + * Supports two kinds of pings: + * - **Site pings** (from newts): update `sites.online` and `sites.lastPing` + * - **Client pings** (from OLMs): update `clients.online`, `clients.lastPing`, + * `clients.archived`, and optionally reset `olms.archived` + */ + +const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds +const MAX_RETRIES = 2; +const BASE_DELAY_MS = 50; + +// ── Site (newt) pings ────────────────────────────────────────────────── +// Map of siteId -> latest ping timestamp (unix seconds) +const pendingSitePings: Map<number, number> = new Map(); + +// ── Client (OLM) pings ──────────────────────────────────────────────── +// Map of clientId -> latest ping timestamp (unix seconds) +const pendingClientPings: Map<number, number> = new Map(); +// Set of olmIds whose `archived` flag should be reset to false +const pendingOlmArchiveResets: Set<string> = new Set(); + +let flushTimer: NodeJS.Timeout | null = null; + +// ── Public API ───────────────────────────────────────────────────────── + +/** + * Record a ping for a newt site. This does NOT write to the database + * immediately. 
Instead it stores the latest ping timestamp in memory, + * to be flushed periodically by the background timer. + */ +export function recordSitePing(siteId: number): void { + const now = Math.floor(Date.now() / 1000); + pendingSitePings.set(siteId, now); +} + +/** @deprecated Use `recordSitePing` instead. Alias kept for existing call-sites. */ +export const recordPing = recordSitePing; + +/** + * Record a ping for an OLM client. Batches the `clients` table update + * (`online`, `lastPing`, `archived`) and, when `olmArchived` is true, + * also queues an `olms` table update to clear the archived flag. + */ +export function recordClientPing( + clientId: number, + olmId: string, + olmArchived: boolean +): void { + const now = Math.floor(Date.now() / 1000); + pendingClientPings.set(clientId, now); + if (olmArchived) { + pendingOlmArchiveResets.add(olmId); + } +} + +// ── Flush Logic ──────────────────────────────────────────────────────── + +/** + * Flush all accumulated site pings to the database. + */ +async function flushSitePingsToDb(): Promise<void> { + if (pendingSitePings.size === 0) { + return; + } + + // Snapshot and clear so new pings arriving during the flush go into a + // fresh map for the next cycle. 
+ const pingsToFlush = new Map(pendingSitePings); + pendingSitePings.clear(); + + // Sort by siteId for consistent lock ordering (prevents deadlocks) + const sortedEntries = Array.from(pingsToFlush.entries()).sort( + ([a], [b]) => a - b + ); + + const BATCH_SIZE = 50; + for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) { + const batch = sortedEntries.slice(i, i + BATCH_SIZE); + + try { + await withRetry(async () => { + // Group by timestamp for efficient bulk updates + const byTimestamp = new Map(); + for (const [siteId, timestamp] of batch) { + const group = byTimestamp.get(timestamp) || []; + group.push(siteId); + byTimestamp.set(timestamp, group); + } + + if (byTimestamp.size === 1) { + const [timestamp, siteIds] = Array.from( + byTimestamp.entries() + )[0]; + await db + .update(sites) + .set({ + online: true, + lastPing: timestamp + }) + .where(inArray(sites.siteId, siteIds)); + } else { + await db.transaction(async (tx) => { + for (const [timestamp, siteIds] of byTimestamp) { + await tx + .update(sites) + .set({ + online: true, + lastPing: timestamp + }) + .where(inArray(sites.siteId, siteIds)); + } + }); + } + }, "flushSitePingsToDb"); + } catch (error) { + logger.error( + `Failed to flush site ping batch (${batch.length} sites), re-queuing for next cycle`, + { error } + ); + for (const [siteId, timestamp] of batch) { + const existing = pendingSitePings.get(siteId); + if (!existing || existing < timestamp) { + pendingSitePings.set(siteId, timestamp); + } + } + } + } +} + +/** + * Flush all accumulated client (OLM) pings to the database. 
+ */ +async function flushClientPingsToDb(): Promise<void> { + if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) { + return; + } + + // Snapshot and clear + const pingsToFlush = new Map(pendingClientPings); + pendingClientPings.clear(); + + const olmResetsToFlush = new Set(pendingOlmArchiveResets); + pendingOlmArchiveResets.clear(); + + // ── Flush client pings ───────────────────────────────────────────── + if (pingsToFlush.size > 0) { + const sortedEntries = Array.from(pingsToFlush.entries()).sort( + ([a], [b]) => a - b + ); + + const BATCH_SIZE = 50; + for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) { + const batch = sortedEntries.slice(i, i + BATCH_SIZE); + + try { + await withRetry(async () => { + const byTimestamp = new Map(); + for (const [clientId, timestamp] of batch) { + const group = byTimestamp.get(timestamp) || []; + group.push(clientId); + byTimestamp.set(timestamp, group); + } + + if (byTimestamp.size === 1) { + const [timestamp, clientIds] = Array.from( + byTimestamp.entries() + )[0]; + await db + .update(clients) + .set({ + lastPing: timestamp, + online: true, + archived: false + }) + .where(inArray(clients.clientId, clientIds)); + } else { + await db.transaction(async (tx) => { + for (const [timestamp, clientIds] of byTimestamp) { + await tx + .update(clients) + .set({ + lastPing: timestamp, + online: true, + archived: false + }) + .where( + inArray(clients.clientId, clientIds) + ); + } + }); + } + }, "flushClientPingsToDb"); + } catch (error) { + logger.error( + `Failed to flush client ping batch (${batch.length} clients), re-queuing for next cycle`, + { error } + ); + for (const [clientId, timestamp] of batch) { + const existing = pendingClientPings.get(clientId); + if (!existing || existing < timestamp) { + pendingClientPings.set(clientId, timestamp); + } + } + } + } + } + + // ── Flush OLM archive resets ─────────────────────────────────────── + if (olmResetsToFlush.size > 0) { + const olmIds = 
Array.from(olmResetsToFlush).sort(); + + const BATCH_SIZE = 50; + for (let i = 0; i < olmIds.length; i += BATCH_SIZE) { + const batch = olmIds.slice(i, i + BATCH_SIZE); + + try { + await withRetry(async () => { + await db + .update(olms) + .set({ archived: false }) + .where(inArray(olms.olmId, batch)); + }, "flushOlmArchiveResets"); + } catch (error) { + logger.error( + `Failed to flush OLM archive reset batch (${batch.length} olms), re-queuing for next cycle`, + { error } + ); + for (const olmId of batch) { + pendingOlmArchiveResets.add(olmId); + } + } + } + } +} + +/** + * Flush everything — called by the interval timer and during shutdown. + */ +export async function flushPingsToDb(): Promise<void> { + await flushSitePingsToDb(); + await flushClientPingsToDb(); +} + +// ── Retry / Error Helpers ────────────────────────────────────────────── + +/** + * Simple retry wrapper with exponential backoff for transient errors + * (connection timeouts, unexpected disconnects). + */ +async function withRetry<T>( + operation: () => Promise<T>, + context: string +): Promise<T> { + let attempt = 0; + while (true) { + try { + return await operation(); + } catch (error: any) { + if (isTransientError(error) && attempt < MAX_RETRIES) { + attempt++; + const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS; + const jitter = Math.random() * baseDelay; + const delay = baseDelay + jitter; + logger.warn( + `Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms` + ); + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } + throw error; + } + } +} + +/** + * Detect transient connection errors that are safe to retry. 
+ */ +function isTransientError(error: any): boolean { + if (!error) return false; + + const message = (error.message || "").toLowerCase(); + const causeMessage = (error.cause?.message || "").toLowerCase(); + const code = error.code || ""; + + // Connection timeout / terminated + if ( + message.includes("connection timeout") || + message.includes("connection terminated") || + message.includes("timeout exceeded when trying to connect") || + causeMessage.includes("connection terminated unexpectedly") || + causeMessage.includes("connection timeout") + ) { + return true; + } + + // PostgreSQL deadlock + if (code === "40P01" || message.includes("deadlock")) { + return true; + } + + // ECONNRESET, ECONNREFUSED, EPIPE + if ( + code === "ECONNRESET" || + code === "ECONNREFUSED" || + code === "EPIPE" || + code === "ETIMEDOUT" + ) { + return true; + } + + return false; +} + +// ── Lifecycle ────────────────────────────────────────────────────────── + +/** + * Start the background flush timer. Call this once at server startup. + */ +export function startPingAccumulator(): void { + if (flushTimer) { + return; // Already running + } + + flushTimer = setInterval(async () => { + try { + await flushPingsToDb(); + } catch (error) { + logger.error("Unhandled error in ping accumulator flush", { + error + }); + } + }, FLUSH_INTERVAL_MS); + + // Don't prevent the process from exiting + flushTimer.unref(); + + logger.info( + `Ping accumulator started (flush interval: ${FLUSH_INTERVAL_MS}ms)` + ); +} + +/** + * Stop the background flush timer and perform a final flush. + * Call this during graceful shutdown. 
+ */ +export async function stopPingAccumulator(): Promise<void> { + if (flushTimer) { + clearInterval(flushTimer); + flushTimer = null; + } + + // Final flush to persist any remaining pings + try { + await flushPingsToDb(); + } catch (error) { + logger.error("Error during final ping accumulator flush", { error }); + } + + logger.info("Ping accumulator stopped"); +} + +/** + * Get the number of pending (unflushed) pings. Useful for monitoring. + */ +export function getPendingPingCount(): number { + return pendingSitePings.size + pendingClientPings.size; +} \ No newline at end of file diff --git a/server/routers/olm/getOlmToken.ts b/server/routers/olm/getOlmToken.ts index 2734a63bc..027e7ec15 100644 --- a/server/routers/olm/getOlmToken.ts +++ b/server/routers/olm/getOlmToken.ts @@ -8,7 +8,8 @@ import { ExitNode, exitNodes, sites, - clientSitesAssociationsCache + clientSitesAssociationsCache, + olmSessions } from "@server/db"; import { olms } from "@server/db"; import HttpCode from "@server/types/HttpCode"; import response from "@server/lib/response"; diff --git a/server/routers/olm/handleOlmPingMessage.ts b/server/routers/olm/handleOlmPingMessage.ts index efcbf1696..0f520b234 100644 --- a/server/routers/olm/handleOlmPingMessage.ts +++ b/server/routers/olm/handleOlmPingMessage.ts @@ -3,6 +3,7 @@ import { db } from "@server/db"; import { MessageHandler } from "@server/routers/ws"; import { clients, olms, Olm } from "@server/db"; import { eq, lt, isNull, and, or } from "drizzle-orm"; +import { recordClientPing } from "@server/routers/newt/pingAccumulator"; import logger from "@server/logger"; import { validateSessionToken } from "@server/auth/sessions/app"; import { checkOrgAccessPolicy } from "#dynamic/lib/checkOrgAccessPolicy"; @@ -201,22 +202,12 @@ export const handleOlmPingMessage: MessageHandler = async (context) => { await sendOlmSyncMessage(olm, client); } - // Update the client's last ping timestamp - await db - .update(clients) - .set({ - lastPing: Math.floor(Date.now() / 1000), - online: true, 
- archived: false - }) - .where(eq(clients.clientId, olm.clientId)); - - if (olm.archived) { - await db - .update(olms) - .set({ archived: false }) - .where(eq(olms.olmId, olm.olmId)); - } + // Record the ping in memory; it will be flushed to the database + // periodically by the ping accumulator (every ~10s) in a single + // batched UPDATE instead of one query per ping. This prevents + // connection pool exhaustion under load, especially with + // cross-region latency to the database. + recordClientPing(olm.clientId, olm.olmId, !!olm.archived); } catch (error) { logger.error("Error handling ping message", { error }); } diff --git a/server/routers/ws/messageHandlers.ts b/server/routers/ws/messageHandlers.ts index 628caafd5..143e4d516 100644 --- a/server/routers/ws/messageHandlers.ts +++ b/server/routers/ws/messageHandlers.ts @@ -11,6 +11,7 @@ import { startNewtOfflineChecker, handleNewtDisconnectingMessage } from "../newt"; +import { startPingAccumulator } from "../newt/pingAccumulator"; import { handleOlmRegisterMessage, handleOlmRelayMessage, @@ -46,6 +47,10 @@ export const messageHandlers: Record = { "ws/round-trip/complete": handleRoundTripMessage }; +// Start the ping accumulator for all builds — it batches per-site online/lastPing +// updates into periodic bulk writes, preventing connection pool exhaustion. 
+startPingAccumulator(); + if (build != "saas") { startOlmOfflineChecker(); // this is to handle the offline check for olms startNewtOfflineChecker(); // this is to handle the offline check for newts diff --git a/server/routers/ws/ws.ts b/server/routers/ws/ws.ts index 08a7dbd4c..6e6312715 100644 --- a/server/routers/ws/ws.ts +++ b/server/routers/ws/ws.ts @@ -6,6 +6,7 @@ import { Socket } from "net"; import { Newt, newts, NewtSession, olms, Olm, OlmSession, sites } from "@server/db"; import { eq } from "drizzle-orm"; import { db } from "@server/db"; +import { recordPing } from "@server/routers/newt/pingAccumulator"; import { validateNewtSessionToken } from "@server/auth/sessions/newt"; import { validateOlmSessionToken } from "@server/auth/sessions/olm"; import { messageHandlers } from "./messageHandlers"; @@ -386,22 +387,14 @@ const setupConnection = async ( // the same as modern newt clients. if (clientType === "newt") { const newtClient = client as Newt; - ws.on("ping", async () => { + ws.on("ping", () => { if (!newtClient.siteId) return; - try { - await db - .update(sites) - .set({ - online: true, - lastPing: Math.floor(Date.now() / 1000) - }) - .where(eq(sites.siteId, newtClient.siteId)); - } catch (error) { - logger.error( - "Error updating newt site online state on WS ping", - { error } - ); - } + // Record the ping in the accumulator instead of writing to the + // database on every WS ping frame. The accumulator flushes all + // pending pings in a single batched UPDATE every ~10s, which + // prevents connection pool exhaustion under load (especially + // with cross-region latency to the database). + recordPing(newtClient.siteId); }); }