import { db } from "@server/db";
import { sites, clients, olms } from "@server/db";
import { eq, inArray } from "drizzle-orm";
import logger from "@server/logger";

/**
 * Ping Accumulator
 *
 * Instead of writing to the database on every single newt/olm ping (which
 * causes pool exhaustion under load, especially with cross-region latency),
 * we accumulate pings in memory and flush them to the database periodically
 * in batches.
 *
 * This is the same pattern used for bandwidth flushing in
 * receiveBandwidth.ts and handleReceiveBandwidthMessage.ts.
 *
 * Supports two kinds of pings:
 *   - **Site pings** (from newts): update `sites.online` and `sites.lastPing`
 *   - **Client pings** (from OLMs): update `clients.online`, `clients.lastPing`,
 *     `clients.archived`, and optionally reset `olms.archived`
 */

const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds
const MAX_RETRIES = 2; // Retries after the first attempt, transient errors only
const BASE_DELAY_MS = 50; // First retry delay; doubles on each further retry
const BATCH_SIZE = 50; // Maximum number of ids touched by one flush batch

/** Error codes (PostgreSQL deadlock + socket-level failures) that are safe to retry. */
const TRANSIENT_ERROR_CODES: ReadonlySet<string> = new Set([
    "40P01",
    "ECONNRESET",
    "ECONNREFUSED",
    "EPIPE",
    "ETIMEDOUT"
]);

/** Lower-case message fragments that identify a retryable connection problem. */
const TRANSIENT_MESSAGE_FRAGMENTS: readonly string[] = [
    "connection timeout",
    "connection terminated",
    "timeout exceeded when trying to connect",
    "deadlock"
];

/** An `[id, unixSeconds]` pair taken from one of the pending-ping maps. */
type PingEntry = readonly [id: number, timestamp: number];

// ── Site (newt) pings ──────────────────────────────────────────────────

// Map of siteId -> latest ping timestamp (unix seconds)
const pendingSitePings: Map<number, number> = new Map();

// ── Client (OLM) pings ────────────────────────────────────────────────

// Map of clientId -> latest ping timestamp (unix seconds)
const pendingClientPings: Map<number, number> = new Map();

// Set of olmIds whose `archived` flag should be reset to false
const pendingOlmArchiveResets: Set<string> = new Set();

let flushTimer: NodeJS.Timeout | null = null;

// The flush currently running, if any. Used to serialize flushes so the
// interval timer and the shutdown flush never overlap.
let activeFlush: Promise<void> | null = null;

// ── Public API ─────────────────────────────────────────────────────────

/**
 * Record a ping for a newt site. This does NOT write to the database
 * immediately. Instead it stores the latest ping timestamp in memory,
 * to be flushed periodically by the background timer.
 *
 * @param siteId - Id of the site that pinged.
 */
export function recordSitePing(siteId: number): void {
    const now = Math.floor(Date.now() / 1000);
    pendingSitePings.set(siteId, now);
}

/** @deprecated Use `recordSitePing` instead. Alias kept for existing call-sites. */
export const recordPing = recordSitePing;

/**
 * Record a ping for an OLM client. Batches the `clients` table update
 * (`online`, `lastPing`, `archived`) and, when `olmArchived` is true,
 * also queues an `olms` table update to clear the archived flag.
 *
 * @param clientId - Id of the client that pinged.
 * @param olmId - Id of the OLM the ping came from.
 * @param olmArchived - Whether the OLM is currently archived and must be reset.
 */
export function recordClientPing(
    clientId: number,
    olmId: string,
    olmArchived: boolean
): void {
    const now = Math.floor(Date.now() / 1000);
    pendingClientPings.set(clientId, now);
    if (olmArchived) {
        pendingOlmArchiveResets.add(olmId);
    }
}

// ── Flush Helpers ──────────────────────────────────────────────────────

/**
 * Snapshot a pending-ping map as entries sorted by id and clear the map, so
 * pings arriving during the flush land in a fresh map for the next cycle.
 * Sorting by id gives every flush the same lock ordering (prevents deadlocks).
 */
function drainSortedById(pending: Map<number, number>): PingEntry[] {
    const entries: PingEntry[] = Array.from(pending.entries()).sort(
        ([a], [b]) => a - b
    );
    pending.clear();
    return entries;
}

/** Group the ids of a batch by ping timestamp so each group is one bulk UPDATE. */
function groupIdsByTimestamp(
    batch: readonly PingEntry[]
): [timestamp: number, ids: number[]][] {
    const byTimestamp = new Map<number, number[]>();
    for (const [id, timestamp] of batch) {
        const group = byTimestamp.get(timestamp);
        if (group) {
            group.push(id);
        } else {
            byTimestamp.set(timestamp, [id]);
        }
    }
    return Array.from(byTimestamp.entries());
}

/**
 * Put a failed batch back into its pending map, never overwriting a newer
 * ping that was recorded while the flush was running.
 */
function requeueLatest(
    pending: Map<number, number>,
    batch: readonly PingEntry[]
): void {
    for (const [id, timestamp] of batch) {
        const existing = pending.get(id);
        if (existing === undefined || existing < timestamp) {
            pending.set(id, timestamp);
        }
    }
}

// ── Flush Logic ────────────────────────────────────────────────────────

/**
 * Flush all accumulated site pings to the database.
 *
 * Pings are written in batches of `BATCH_SIZE`. A batch that still fails
 * after retries is logged and re-queued for the next cycle; it does not
 * abort the remaining batches.
 */
async function flushSitePingsToDb(): Promise<void> {
    if (pendingSitePings.size === 0) {
        return;
    }

    const sortedEntries = drainSortedById(pendingSitePings);

    for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
        const batch = sortedEntries.slice(i, i + BATCH_SIZE);
        // Grouping is pure, so compute it once rather than on every retry.
        const groups = groupIdsByTimestamp(batch);

        try {
            await withRetry(async () => {
                if (groups.length === 1) {
                    // Single timestamp: one statement, no transaction needed.
                    const [timestamp, siteIds] = groups[0];
                    await db
                        .update(sites)
                        .set({ online: true, lastPing: timestamp })
                        .where(inArray(sites.siteId, siteIds));
                } else {
                    await db.transaction(async (tx) => {
                        for (const [timestamp, siteIds] of groups) {
                            await tx
                                .update(sites)
                                .set({ online: true, lastPing: timestamp })
                                .where(inArray(sites.siteId, siteIds));
                        }
                    });
                }
            }, "flushSitePingsToDb");
        } catch (error) {
            logger.error(
                `Failed to flush site ping batch (${batch.length} sites), re-queuing for next cycle`,
                { error }
            );
            requeueLatest(pendingSitePings, batch);
        }
    }
}

/**
 * Flush all accumulated client (OLM) pings to the database, followed by the
 * queued OLM archive resets.
 *
 * Both queues are snapshotted and cleared up front. Batches that still fail
 * after retries are logged and re-queued for the next cycle.
 */
async function flushClientPingsToDb(): Promise<void> {
    if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) {
        return;
    }

    // Snapshot and clear both queues before any await.
    const sortedEntries = drainSortedById(pendingClientPings);
    const olmIds = Array.from(pendingOlmArchiveResets).sort();
    pendingOlmArchiveResets.clear();

    // ── Flush client pings ─────────────────────────────────────────────
    for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
        const batch = sortedEntries.slice(i, i + BATCH_SIZE);
        // Grouping is pure, so compute it once rather than on every retry.
        const groups = groupIdsByTimestamp(batch);

        try {
            await withRetry(async () => {
                if (groups.length === 1) {
                    // Single timestamp: one statement, no transaction needed.
                    const [timestamp, clientIds] = groups[0];
                    await db
                        .update(clients)
                        .set({
                            lastPing: timestamp,
                            online: true,
                            archived: false
                        })
                        .where(inArray(clients.clientId, clientIds));
                } else {
                    await db.transaction(async (tx) => {
                        for (const [timestamp, clientIds] of groups) {
                            await tx
                                .update(clients)
                                .set({
                                    lastPing: timestamp,
                                    online: true,
                                    archived: false
                                })
                                .where(inArray(clients.clientId, clientIds));
                        }
                    });
                }
            }, "flushClientPingsToDb");
        } catch (error) {
            logger.error(
                `Failed to flush client ping batch (${batch.length} clients), re-queuing for next cycle`,
                { error }
            );
            requeueLatest(pendingClientPings, batch);
        }
    }

    // ── Flush OLM archive resets ───────────────────────────────────────
    for (let i = 0; i < olmIds.length; i += BATCH_SIZE) {
        const batch = olmIds.slice(i, i + BATCH_SIZE);

        try {
            await withRetry(async () => {
                await db
                    .update(olms)
                    .set({ archived: false })
                    .where(inArray(olms.olmId, batch));
            }, "flushOlmArchiveResets");
        } catch (error) {
            logger.error(
                `Failed to flush OLM archive reset batch (${batch.length} olms), re-queuing for next cycle`,
                { error }
            );
            for (const olmId of batch) {
                pendingOlmArchiveResets.add(olmId);
            }
        }
    }
}

/**
 * Flush everything — called by the interval timer and during shutdown.
 *
 * Flushes are serialized: if one is already running, this call waits for it
 * and then flushes whatever accumulated in the meantime. That keeps a slow
 * timer-driven flush from overlapping the next tick, and guarantees the
 * shutdown flush does not resolve while an earlier flush is still writing.
 */
export async function flushPingsToDb(): Promise<void> {
    while (activeFlush) {
        // A failure of the other flush belongs to its own caller.
        await activeFlush.catch(() => undefined);
    }

    const run = (async () => {
        await flushSitePingsToDb();
        await flushClientPingsToDb();
    })();
    activeFlush = run;

    try {
        await run;
    } finally {
        if (activeFlush === run) {
            activeFlush = null;
        }
    }
}

// ── Retry / Error Helpers ──────────────────────────────────────────────

/**
 * Simple retry wrapper with exponential backoff for transient errors
 * (connection timeouts, unexpected disconnects).
 *
 * @param operation - The async work to attempt; re-invoked on each retry.
 * @param context - Label included in the retry warning log.
 * @returns Whatever `operation` resolves to.
 * @throws The last error, once it is non-transient or `MAX_RETRIES` is used up.
 */
async function withRetry<T>(
    operation: () => Promise<T>,
    context: string
): Promise<T> {
    let attempt = 0;
    while (true) {
        try {
            return await operation();
        } catch (error: unknown) {
            if (!isTransientError(error) || attempt >= MAX_RETRIES) {
                throw error;
            }
            attempt++;
            const baseDelay = 2 ** (attempt - 1) * BASE_DELAY_MS;
            // Up to 100% jitter so concurrent retries do not collide again.
            const jitter = Math.random() * baseDelay;
            const delay = baseDelay + jitter;
            logger.warn(
                `Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`
            );
            await new Promise((resolve) => setTimeout(resolve, delay));
        }
    }
}

/**
 * Pull the fields used for transient-error detection out of an unknown
 * thrown value. Non-objects and missing/non-string fields yield empty strings.
 */
function readErrorFields(value: unknown): {
    message: string;
    code: string;
    cause: unknown;
} {
    if (typeof value !== "object" || value === null) {
        return { message: "", code: "", cause: undefined };
    }
    const { message, code, cause } = value as {
        message?: unknown;
        code?: unknown;
        cause?: unknown;
    };
    return {
        message: typeof message === "string" ? message.toLowerCase() : "",
        code: typeof code === "string" ? code : "",
        cause
    };
}

/**
 * Detect transient connection errors that are safe to retry.
 *
 * The same checks are applied to the error and to its direct `cause`, so a
 * driver error wrapped by a higher layer is still recognised by its code
 * (e.g. `40P01`, `ECONNRESET`) or message.
 *
 * @param error - The value caught from a failed database operation.
 * @returns `true` when the error or its cause matches a known transient
 *   code or message fragment.
 */
function isTransientError(error: unknown): boolean {
    const outer = readErrorFields(error);
    const inner = readErrorFields(outer.cause);

    return [outer, inner].some(
        ({ message, code }) =>
            TRANSIENT_ERROR_CODES.has(code) ||
            TRANSIENT_MESSAGE_FRAGMENTS.some((fragment) =>
                message.includes(fragment)
            )
    );
}

// ── Lifecycle ──────────────────────────────────────────────────────────

/**
 * Start the background flush timer. Call this once at server startup.
 * Calling it again while the timer is running is a no-op.
 */
export function startPingAccumulator(): void {
    if (flushTimer) {
        return; // Already running
    }

    flushTimer = setInterval(async () => {
        try {
            await flushPingsToDb();
        } catch (error) {
            logger.error("Unhandled error in ping accumulator flush", {
                error
            });
        }
    }, FLUSH_INTERVAL_MS);

    // Don't prevent the process from exiting
    flushTimer.unref();

    logger.info(
        `Ping accumulator started (flush interval: ${FLUSH_INTERVAL_MS}ms)`
    );
}

/**
 * Stop the background flush timer and perform a final flush.
 * Call this during graceful shutdown.
 *
 * The final flush waits for any flush already in progress, so this resolves
 * only after every flush has finished. Flush errors are logged, not thrown.
 */
export async function stopPingAccumulator(): Promise<void> {
    if (flushTimer) {
        clearInterval(flushTimer);
        flushTimer = null;
    }

    // Final flush to persist any remaining pings
    try {
        await flushPingsToDb();
    } catch (error) {
        logger.error("Error during final ping accumulator flush", { error });
    }

    logger.info("Ping accumulator stopped");
}

/**
 * Get the number of pending (unflushed) pings. Useful for monitoring.
 *
 * @returns Pending site pings plus pending client pings. Queued OLM archive
 *   resets are not counted.
 */
export function getPendingPingCount(): number {
    return pendingSitePings.size + pendingClientPings.size;
}