mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-25 12:06:37 +00:00
382 lines
14 KiB
TypeScript
382 lines
14 KiB
TypeScript
import { db } from "@server/db";
|
|
import { sites, clients, olms } from "@server/db";
|
|
import { eq, inArray } from "drizzle-orm";
|
|
import logger from "@server/logger";
|
|
|
|
/**
|
|
* Ping Accumulator
|
|
*
|
|
* Instead of writing to the database on every single newt/olm ping (which
|
|
* causes pool exhaustion under load, especially with cross-region latency),
|
|
* we accumulate pings in memory and flush them to the database periodically
|
|
* in a single batch.
|
|
*
|
|
* This is the same pattern used for bandwidth flushing in
|
|
* receiveBandwidth.ts and handleReceiveBandwidthMessage.ts.
|
|
*
|
|
* Supports two kinds of pings:
|
|
* - **Site pings** (from newts): update `sites.online` and `sites.lastPing`
|
|
* - **Client pings** (from OLMs): update `clients.online`, `clients.lastPing`,
|
|
* `clients.archived`, and optionally reset `olms.archived`
|
|
*/
|
|
|
|
const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds
|
|
const MAX_RETRIES = 2;
|
|
const BASE_DELAY_MS = 50;
|
|
|
|
// ── Site (newt) pings ──────────────────────────────────────────────────
|
|
// Map of siteId -> latest ping timestamp (unix seconds)
|
|
const pendingSitePings: Map<number, number> = new Map();
|
|
|
|
// ── Client (OLM) pings ────────────────────────────────────────────────
|
|
// Map of clientId -> latest ping timestamp (unix seconds)
|
|
const pendingClientPings: Map<number, number> = new Map();
|
|
// Set of olmIds whose `archived` flag should be reset to false
|
|
const pendingOlmArchiveResets: Set<string> = new Set();
|
|
|
|
let flushTimer: NodeJS.Timeout | null = null;
|
|
|
|
// ── Public API ─────────────────────────────────────────────────────────
|
|
|
|
/**
|
|
* Record a ping for a newt site. This does NOT write to the database
|
|
* immediately. Instead it stores the latest ping timestamp in memory,
|
|
* to be flushed periodically by the background timer.
|
|
*/
|
|
export function recordSitePing(siteId: number): void {
|
|
const now = Math.floor(Date.now() / 1000);
|
|
pendingSitePings.set(siteId, now);
|
|
}
|
|
|
|
/** @deprecated Use `recordSitePing` instead. Alias kept for existing call-sites. */
|
|
export const recordPing = recordSitePing;
|
|
|
|
/**
|
|
* Record a ping for an OLM client. Batches the `clients` table update
|
|
* (`online`, `lastPing`, `archived`) and, when `olmArchived` is true,
|
|
* also queues an `olms` table update to clear the archived flag.
|
|
*/
|
|
export function recordClientPing(
|
|
clientId: number,
|
|
olmId: string,
|
|
olmArchived: boolean
|
|
): void {
|
|
const now = Math.floor(Date.now() / 1000);
|
|
pendingClientPings.set(clientId, now);
|
|
if (olmArchived) {
|
|
pendingOlmArchiveResets.add(olmId);
|
|
}
|
|
}
|
|
|
|
// ── Flush Logic ────────────────────────────────────────────────────────
|
|
|
|
/**
|
|
* Flush all accumulated site pings to the database.
|
|
*/
|
|
async function flushSitePingsToDb(): Promise<void> {
|
|
if (pendingSitePings.size === 0) {
|
|
return;
|
|
}
|
|
|
|
// Snapshot and clear so new pings arriving during the flush go into a
|
|
// fresh map for the next cycle.
|
|
const pingsToFlush = new Map(pendingSitePings);
|
|
pendingSitePings.clear();
|
|
|
|
// Sort by siteId for consistent lock ordering (prevents deadlocks)
|
|
const sortedEntries = Array.from(pingsToFlush.entries()).sort(
|
|
([a], [b]) => a - b
|
|
);
|
|
|
|
const BATCH_SIZE = 50;
|
|
for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
|
|
const batch = sortedEntries.slice(i, i + BATCH_SIZE);
|
|
|
|
try {
|
|
await withRetry(async () => {
|
|
// Group by timestamp for efficient bulk updates
|
|
const byTimestamp = new Map<number, number[]>();
|
|
for (const [siteId, timestamp] of batch) {
|
|
const group = byTimestamp.get(timestamp) || [];
|
|
group.push(siteId);
|
|
byTimestamp.set(timestamp, group);
|
|
}
|
|
|
|
if (byTimestamp.size === 1) {
|
|
const [timestamp, siteIds] = Array.from(
|
|
byTimestamp.entries()
|
|
)[0];
|
|
await db
|
|
.update(sites)
|
|
.set({
|
|
online: true,
|
|
lastPing: timestamp
|
|
})
|
|
.where(inArray(sites.siteId, siteIds));
|
|
} else {
|
|
await db.transaction(async (tx) => {
|
|
for (const [timestamp, siteIds] of byTimestamp) {
|
|
await tx
|
|
.update(sites)
|
|
.set({
|
|
online: true,
|
|
lastPing: timestamp
|
|
})
|
|
.where(inArray(sites.siteId, siteIds));
|
|
}
|
|
});
|
|
}
|
|
}, "flushSitePingsToDb");
|
|
} catch (error) {
|
|
logger.error(
|
|
`Failed to flush site ping batch (${batch.length} sites), re-queuing for next cycle`,
|
|
{ error }
|
|
);
|
|
for (const [siteId, timestamp] of batch) {
|
|
const existing = pendingSitePings.get(siteId);
|
|
if (!existing || existing < timestamp) {
|
|
pendingSitePings.set(siteId, timestamp);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Flush all accumulated client (OLM) pings to the database.
|
|
*/
|
|
async function flushClientPingsToDb(): Promise<void> {
|
|
if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) {
|
|
return;
|
|
}
|
|
|
|
// Snapshot and clear
|
|
const pingsToFlush = new Map(pendingClientPings);
|
|
pendingClientPings.clear();
|
|
|
|
const olmResetsToFlush = new Set(pendingOlmArchiveResets);
|
|
pendingOlmArchiveResets.clear();
|
|
|
|
// ── Flush client pings ─────────────────────────────────────────────
|
|
if (pingsToFlush.size > 0) {
|
|
const sortedEntries = Array.from(pingsToFlush.entries()).sort(
|
|
([a], [b]) => a - b
|
|
);
|
|
|
|
const BATCH_SIZE = 50;
|
|
for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
|
|
const batch = sortedEntries.slice(i, i + BATCH_SIZE);
|
|
|
|
try {
|
|
await withRetry(async () => {
|
|
const byTimestamp = new Map<number, number[]>();
|
|
for (const [clientId, timestamp] of batch) {
|
|
const group = byTimestamp.get(timestamp) || [];
|
|
group.push(clientId);
|
|
byTimestamp.set(timestamp, group);
|
|
}
|
|
|
|
if (byTimestamp.size === 1) {
|
|
const [timestamp, clientIds] = Array.from(
|
|
byTimestamp.entries()
|
|
)[0];
|
|
await db
|
|
.update(clients)
|
|
.set({
|
|
lastPing: timestamp,
|
|
online: true,
|
|
archived: false
|
|
})
|
|
.where(inArray(clients.clientId, clientIds));
|
|
} else {
|
|
await db.transaction(async (tx) => {
|
|
for (const [timestamp, clientIds] of byTimestamp) {
|
|
await tx
|
|
.update(clients)
|
|
.set({
|
|
lastPing: timestamp,
|
|
online: true,
|
|
archived: false
|
|
})
|
|
.where(
|
|
inArray(clients.clientId, clientIds)
|
|
);
|
|
}
|
|
});
|
|
}
|
|
}, "flushClientPingsToDb");
|
|
} catch (error) {
|
|
logger.error(
|
|
`Failed to flush client ping batch (${batch.length} clients), re-queuing for next cycle`,
|
|
{ error }
|
|
);
|
|
for (const [clientId, timestamp] of batch) {
|
|
const existing = pendingClientPings.get(clientId);
|
|
if (!existing || existing < timestamp) {
|
|
pendingClientPings.set(clientId, timestamp);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// ── Flush OLM archive resets ───────────────────────────────────────
|
|
if (olmResetsToFlush.size > 0) {
|
|
const olmIds = Array.from(olmResetsToFlush).sort();
|
|
|
|
const BATCH_SIZE = 50;
|
|
for (let i = 0; i < olmIds.length; i += BATCH_SIZE) {
|
|
const batch = olmIds.slice(i, i + BATCH_SIZE);
|
|
|
|
try {
|
|
await withRetry(async () => {
|
|
await db
|
|
.update(olms)
|
|
.set({ archived: false })
|
|
.where(inArray(olms.olmId, batch));
|
|
}, "flushOlmArchiveResets");
|
|
} catch (error) {
|
|
logger.error(
|
|
`Failed to flush OLM archive reset batch (${batch.length} olms), re-queuing for next cycle`,
|
|
{ error }
|
|
);
|
|
for (const olmId of batch) {
|
|
pendingOlmArchiveResets.add(olmId);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Flush everything — called by the interval timer and during shutdown.
|
|
*/
|
|
export async function flushPingsToDb(): Promise<void> {
|
|
await flushSitePingsToDb();
|
|
await flushClientPingsToDb();
|
|
}
|
|
|
|
// ── Retry / Error Helpers ──────────────────────────────────────────────
|
|
|
|
/**
|
|
* Simple retry wrapper with exponential backoff for transient errors
|
|
* (connection timeouts, unexpected disconnects).
|
|
*/
|
|
async function withRetry<T>(
|
|
operation: () => Promise<T>,
|
|
context: string
|
|
): Promise<T> {
|
|
let attempt = 0;
|
|
while (true) {
|
|
try {
|
|
return await operation();
|
|
} catch (error: any) {
|
|
if (isTransientError(error) && attempt < MAX_RETRIES) {
|
|
attempt++;
|
|
const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
|
|
const jitter = Math.random() * baseDelay;
|
|
const delay = baseDelay + jitter;
|
|
logger.warn(
|
|
`Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`
|
|
);
|
|
await new Promise((resolve) => setTimeout(resolve, delay));
|
|
continue;
|
|
}
|
|
throw error;
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Detect transient connection errors that are safe to retry.
|
|
*/
|
|
function isTransientError(error: any): boolean {
|
|
if (!error) return false;
|
|
|
|
const message = (error.message || "").toLowerCase();
|
|
const causeMessage = (error.cause?.message || "").toLowerCase();
|
|
const code = error.code || "";
|
|
|
|
// Connection timeout / terminated
|
|
if (
|
|
message.includes("connection timeout") ||
|
|
message.includes("connection terminated") ||
|
|
message.includes("timeout exceeded when trying to connect") ||
|
|
causeMessage.includes("connection terminated unexpectedly") ||
|
|
causeMessage.includes("connection timeout")
|
|
) {
|
|
return true;
|
|
}
|
|
|
|
// PostgreSQL deadlock
|
|
if (code === "40P01" || message.includes("deadlock")) {
|
|
return true;
|
|
}
|
|
|
|
// ECONNRESET, ECONNREFUSED, EPIPE
|
|
if (
|
|
code === "ECONNRESET" ||
|
|
code === "ECONNREFUSED" ||
|
|
code === "EPIPE" ||
|
|
code === "ETIMEDOUT"
|
|
) {
|
|
return true;
|
|
}
|
|
|
|
return false;
|
|
}
|
|
|
|
// ── Lifecycle ──────────────────────────────────────────────────────────
|
|
|
|
/**
|
|
* Start the background flush timer. Call this once at server startup.
|
|
*/
|
|
export function startPingAccumulator(): void {
|
|
if (flushTimer) {
|
|
return; // Already running
|
|
}
|
|
|
|
flushTimer = setInterval(async () => {
|
|
try {
|
|
await flushPingsToDb();
|
|
} catch (error) {
|
|
logger.error("Unhandled error in ping accumulator flush", {
|
|
error
|
|
});
|
|
}
|
|
}, FLUSH_INTERVAL_MS);
|
|
|
|
// Don't prevent the process from exiting
|
|
flushTimer.unref();
|
|
|
|
logger.info(
|
|
`Ping accumulator started (flush interval: ${FLUSH_INTERVAL_MS}ms)`
|
|
);
|
|
}
|
|
|
|
/**
|
|
* Stop the background flush timer and perform a final flush.
|
|
* Call this during graceful shutdown.
|
|
*/
|
|
export async function stopPingAccumulator(): Promise<void> {
|
|
if (flushTimer) {
|
|
clearInterval(flushTimer);
|
|
flushTimer = null;
|
|
}
|
|
|
|
// Final flush to persist any remaining pings
|
|
try {
|
|
await flushPingsToDb();
|
|
} catch (error) {
|
|
logger.error("Error during final ping accumulator flush", { error });
|
|
}
|
|
|
|
logger.info("Ping accumulator stopped");
|
|
}
|
|
|
|
/**
|
|
* Get the number of pending (unflushed) pings. Useful for monitoring.
|
|
*/
|
|
export function getPendingPingCount(): number {
|
|
return pendingSitePings.size + pendingClientPings.size;
|
|
} |