mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-26 12:36:41 +00:00
Compare commits
6 Commits
1.16.2-s.1
...
batch-band
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1f01108b62 | ||
|
|
62c63ddcaa | ||
|
|
dfd604c781 | ||
|
|
c96c5e8ae8 | ||
|
|
6f71e9f0f2 | ||
|
|
d17ec6dc1f |
@@ -1,8 +1,10 @@
|
||||
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
||||
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
||||
import { stopPingAccumulator } from "@server/routers/newt/pingAccumulator";
|
||||
import { cleanup as wsCleanup } from "#dynamic/routers/ws";
|
||||
|
||||
async function cleanup() {
|
||||
await stopPingAccumulator();
|
||||
await flushBandwidthToDb();
|
||||
await flushSiteBandwidthToDb();
|
||||
await wsCleanup();
|
||||
|
||||
22
server/lib/tokenCache.ts
Normal file
22
server/lib/tokenCache.ts
Normal file
@@ -0,0 +1,22 @@
|
||||
/**
|
||||
* Returns a cached plaintext token from Redis if one exists and decrypts
|
||||
* cleanly, otherwise calls `createSession` to mint a fresh token, stores the
|
||||
* encrypted value in Redis with the given TTL, and returns it.
|
||||
*
|
||||
* Failures at the Redis layer are non-fatal – the function always falls
|
||||
* through to session creation so the caller is never blocked by a Redis outage.
|
||||
*
|
||||
* @param cacheKey Unique Redis key, e.g. `"newt:token_cache:abc123"`
|
||||
* @param secret Server secret used for AES encryption/decryption
|
||||
* @param ttlSeconds Cache TTL in seconds (should match session expiry)
|
||||
* @param createSession Factory that mints a new session and returns its raw token
|
||||
*/
|
||||
export async function getOrCreateCachedToken(
|
||||
cacheKey: string,
|
||||
secret: string,
|
||||
ttlSeconds: number,
|
||||
createSession: () => Promise<string>
|
||||
): Promise<string> {
|
||||
const token = await createSession();
|
||||
return token;
|
||||
}
|
||||
@@ -15,8 +15,10 @@ import { rateLimitService } from "#private/lib/rateLimit";
|
||||
import { cleanup as wsCleanup } from "#private/routers/ws";
|
||||
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
||||
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
||||
import { stopPingAccumulator } from "@server/routers/newt/pingAccumulator";
|
||||
|
||||
async function cleanup() {
|
||||
await stopPingAccumulator();
|
||||
await flushBandwidthToDb();
|
||||
await flushSiteBandwidthToDb();
|
||||
await rateLimitService.cleanup();
|
||||
|
||||
@@ -1,3 +1,16 @@
|
||||
/*
|
||||
* This file is part of a proprietary work.
|
||||
*
|
||||
* Copyright (c) 2025 Fossorial, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This file is licensed under the Fossorial Commercial License.
|
||||
* You may not use this file except in compliance with the License.
|
||||
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
|
||||
*
|
||||
* This file is not licensed under the AGPLv3.
|
||||
*/
|
||||
|
||||
import NodeCache from "node-cache";
|
||||
import logger from "@server/logger";
|
||||
import { redisManager } from "@server/private/lib/redis";
|
||||
|
||||
77
server/private/lib/tokenCache.ts
Normal file
77
server/private/lib/tokenCache.ts
Normal file
@@ -0,0 +1,77 @@
|
||||
/*
|
||||
* This file is part of a proprietary work.
|
||||
*
|
||||
* Copyright (c) 2025 Fossorial, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This file is licensed under the Fossorial Commercial License.
|
||||
* You may not use this file except in compliance with the License.
|
||||
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
|
||||
*
|
||||
* This file is not licensed under the AGPLv3.
|
||||
*/
|
||||
|
||||
import redisManager from "#private/lib/redis";
|
||||
import { encrypt, decrypt } from "@server/lib/crypto";
|
||||
import logger from "@server/logger";
|
||||
|
||||
/**
|
||||
* Returns a cached plaintext token from Redis if one exists and decrypts
|
||||
* cleanly, otherwise calls `createSession` to mint a fresh token, stores the
|
||||
* encrypted value in Redis with the given TTL, and returns it.
|
||||
*
|
||||
* Failures at the Redis layer are non-fatal – the function always falls
|
||||
* through to session creation so the caller is never blocked by a Redis outage.
|
||||
*
|
||||
* @param cacheKey Unique Redis key, e.g. `"newt:token_cache:abc123"`
|
||||
* @param secret Server secret used for AES encryption/decryption
|
||||
* @param ttlSeconds Cache TTL in seconds (should match session expiry)
|
||||
* @param createSession Factory that mints a new session and returns its raw token
|
||||
*/
|
||||
export async function getOrCreateCachedToken(
|
||||
cacheKey: string,
|
||||
secret: string,
|
||||
ttlSeconds: number,
|
||||
createSession: () => Promise<string>
|
||||
): Promise<string> {
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
try {
|
||||
const cached = await redisManager.get(cacheKey);
|
||||
if (cached) {
|
||||
const token = decrypt(cached, secret);
|
||||
if (token) {
|
||||
logger.debug(`Token cache hit for key: ${cacheKey}`);
|
||||
return token;
|
||||
}
|
||||
// Decryption produced an empty string – treat as a miss
|
||||
logger.warn(
|
||||
`Token cache decryption returned empty string for key: ${cacheKey}, treating as miss`
|
||||
);
|
||||
}
|
||||
} catch (e) {
|
||||
logger.warn(
|
||||
`Token cache read/decrypt failed for key ${cacheKey}, falling through to session creation:`,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const token = await createSession();
|
||||
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
try {
|
||||
const encrypted = encrypt(token, secret);
|
||||
await redisManager.set(cacheKey, encrypted, ttlSeconds);
|
||||
logger.debug(
|
||||
`Token cached in Redis for key: ${cacheKey} (TTL ${ttlSeconds}s)`
|
||||
);
|
||||
} catch (e) {
|
||||
logger.warn(
|
||||
`Token cache write failed for key ${cacheKey} (session was still created):`,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return token;
|
||||
}
|
||||
@@ -23,8 +23,10 @@ import { z } from "zod";
|
||||
import { fromError } from "zod-validation-error";
|
||||
import {
|
||||
createRemoteExitNodeSession,
|
||||
validateRemoteExitNodeSessionToken
|
||||
validateRemoteExitNodeSessionToken,
|
||||
EXPIRES
|
||||
} from "#private/auth/sessions/remoteExitNode";
|
||||
import { getOrCreateCachedToken } from "@server/private/lib/tokenCache";
|
||||
import { verifyPassword } from "@server/auth/password";
|
||||
import logger from "@server/logger";
|
||||
import config from "@server/lib/config";
|
||||
@@ -103,14 +105,23 @@ export async function getRemoteExitNodeToken(
|
||||
);
|
||||
}
|
||||
|
||||
const resToken = generateSessionToken();
|
||||
await createRemoteExitNodeSession(
|
||||
resToken,
|
||||
existingRemoteExitNode.remoteExitNodeId
|
||||
// Return a cached token if one exists to prevent thundering herd on
|
||||
// simultaneous restarts; falls back to creating a fresh session when
|
||||
// Redis is unavailable or the cache has expired.
|
||||
const resToken = await getOrCreateCachedToken(
|
||||
`remote_exit_node:token_cache:${existingRemoteExitNode.remoteExitNodeId}`,
|
||||
config.getRawConfig().server.secret!,
|
||||
Math.floor(EXPIRES / 1000),
|
||||
async () => {
|
||||
const token = generateSessionToken();
|
||||
await createRemoteExitNodeSession(
|
||||
token,
|
||||
existingRemoteExitNode.remoteExitNodeId
|
||||
);
|
||||
return token;
|
||||
}
|
||||
);
|
||||
|
||||
// logger.debug(`Created RemoteExitNode token response: ${JSON.stringify(resToken)}`);
|
||||
|
||||
return response<{ token: string }>(res, {
|
||||
data: {
|
||||
token: resToken
|
||||
|
||||
@@ -19,17 +19,14 @@ import { Socket } from "net";
|
||||
import {
|
||||
Newt,
|
||||
newts,
|
||||
NewtSession,
|
||||
olms,
|
||||
Olm,
|
||||
OlmSession,
|
||||
olms,
|
||||
RemoteExitNode,
|
||||
RemoteExitNodeSession,
|
||||
remoteExitNodes,
|
||||
sites
|
||||
} from "@server/db";
|
||||
import { eq } from "drizzle-orm";
|
||||
import { db } from "@server/db";
|
||||
import { recordPing } from "@server/routers/newt/pingAccumulator";
|
||||
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
|
||||
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
|
||||
import logger from "@server/logger";
|
||||
@@ -197,11 +194,7 @@ const connectedClients: Map<string, AuthenticatedWebSocket[]> = new Map();
|
||||
// Config version tracking map (local to this node, resets on server restart)
|
||||
const clientConfigVersions: Map<string, number> = new Map();
|
||||
|
||||
// Tracks the last Unix timestamp (seconds) at which a ping was flushed to the
|
||||
// DB for a given siteId. Resets on server restart which is fine – the first
|
||||
// ping after startup will always write, re-establishing the online state.
|
||||
const lastPingDbWrite: Map<number, number> = new Map();
|
||||
const PING_DB_WRITE_INTERVAL = 45; // seconds
|
||||
|
||||
|
||||
// Recovery tracking
|
||||
let isRedisRecoveryInProgress = false;
|
||||
@@ -853,32 +846,16 @@ const setupConnection = async (
|
||||
);
|
||||
});
|
||||
|
||||
// Handle WebSocket protocol-level pings from older newt clients that do
|
||||
// not send application-level "newt/ping" messages. Update the site's
|
||||
// online state and lastPing timestamp so the offline checker treats them
|
||||
// the same as modern newt clients.
|
||||
if (clientType === "newt") {
|
||||
const newtClient = client as Newt;
|
||||
ws.on("ping", async () => {
|
||||
ws.on("ping", () => {
|
||||
if (!newtClient.siteId) return;
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
const lastWrite = lastPingDbWrite.get(newtClient.siteId) ?? 0;
|
||||
if (now - lastWrite < PING_DB_WRITE_INTERVAL) return;
|
||||
lastPingDbWrite.set(newtClient.siteId, now);
|
||||
try {
|
||||
await db
|
||||
.update(sites)
|
||||
.set({
|
||||
online: true,
|
||||
lastPing: now
|
||||
})
|
||||
.where(eq(sites.siteId, newtClient.siteId));
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
"Error updating newt site online state on WS ping",
|
||||
{ error }
|
||||
);
|
||||
}
|
||||
// Record the ping in the accumulator instead of writing to the
|
||||
// database on every WS ping frame. The accumulator flushes all
|
||||
// pending pings in a single batched UPDATE every ~10s, which
|
||||
// prevents connection pool exhaustion under load (especially
|
||||
// with cross-region latency to the database).
|
||||
recordPing(newtClient.siteId);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { Request, Response, NextFunction } from "express";
|
||||
import { eq, sql } from "drizzle-orm";
|
||||
import { sites } from "@server/db";
|
||||
import { sql } from "drizzle-orm";
|
||||
import { db } from "@server/db";
|
||||
import logger from "@server/logger";
|
||||
import createHttpError from "http-errors";
|
||||
@@ -31,7 +30,10 @@ const MAX_RETRIES = 3;
|
||||
const BASE_DELAY_MS = 50;
|
||||
|
||||
// How often to flush accumulated bandwidth data to the database
|
||||
const FLUSH_INTERVAL_MS = 30_000; // 30 seconds
|
||||
const FLUSH_INTERVAL_MS = 300_000; // 300 seconds
|
||||
|
||||
// Maximum number of sites to include in a single batch UPDATE statement
|
||||
const BATCH_CHUNK_SIZE = 250;
|
||||
|
||||
// In-memory accumulator: publicKey -> AccumulatorEntry
|
||||
let accumulator = new Map<string, AccumulatorEntry>();
|
||||
@@ -75,13 +77,33 @@ async function withDeadlockRetry<T>(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a raw SQL query that returns rows, in a way that works across both
|
||||
* the PostgreSQL driver (which exposes `execute`) and the SQLite driver (which
|
||||
* exposes `all`). Drizzle's typed query builder doesn't support bulk
|
||||
* UPDATE … FROM (VALUES …) natively, so we drop to raw SQL here.
|
||||
*/
|
||||
async function dbQueryRows<T extends Record<string, unknown>>(
|
||||
query: Parameters<(typeof sql)["join"]>[0][number]
|
||||
): Promise<T[]> {
|
||||
const anyDb = db as any;
|
||||
if (typeof anyDb.execute === "function") {
|
||||
// PostgreSQL (node-postgres via Drizzle) — returns { rows: [...] } or an array
|
||||
const result = await anyDb.execute(query);
|
||||
return (Array.isArray(result) ? result : (result.rows ?? [])) as T[];
|
||||
}
|
||||
// SQLite (better-sqlite3 via Drizzle) — returns an array directly
|
||||
return (await anyDb.all(query)) as T[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush all accumulated site bandwidth data to the database.
|
||||
*
|
||||
* Swaps out the accumulator before writing so that any bandwidth messages
|
||||
* received during the flush are captured in the new accumulator rather than
|
||||
* being lost or causing contention. Entries that fail to write are re-queued
|
||||
* back into the accumulator so they will be retried on the next flush.
|
||||
* being lost or causing contention. Sites are updated in chunks via a single
|
||||
* batch UPDATE per chunk. Failed chunks are discarded — exact per-flush
|
||||
* accuracy is not critical and re-queuing is not worth the added complexity.
|
||||
*
|
||||
* This function is exported so that the application's graceful-shutdown
|
||||
* cleanup handler can call it before the process exits.
|
||||
@@ -108,76 +130,76 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
|
||||
`Flushing accumulated bandwidth data for ${sortedEntries.length} site(s) to the database`
|
||||
);
|
||||
|
||||
// Aggregate billing usage by org, collected during the DB update loop.
|
||||
// Build a lookup so post-processing can reach each entry by publicKey.
|
||||
const snapshotMap = new Map(sortedEntries);
|
||||
|
||||
// Aggregate billing usage by org across all chunks.
|
||||
const orgUsageMap = new Map<string, number>();
|
||||
|
||||
for (const [publicKey, { bytesIn, bytesOut, exitNodeId, calcUsage }] of sortedEntries) {
|
||||
// Process in chunks so individual queries stay at a reasonable size.
|
||||
for (let i = 0; i < sortedEntries.length; i += BATCH_CHUNK_SIZE) {
|
||||
const chunk = sortedEntries.slice(i, i + BATCH_CHUNK_SIZE);
|
||||
const chunkEnd = i + chunk.length - 1;
|
||||
|
||||
// Build a parameterised VALUES list: (pubKey, bytesIn, bytesOut), ...
|
||||
// Both PostgreSQL and SQLite (≥ 3.33.0, which better-sqlite3 bundles)
|
||||
// support UPDATE … FROM (VALUES …), letting us update the whole chunk
|
||||
// in a single query instead of N individual round-trips.
|
||||
const valuesList = chunk.map(([publicKey, { bytesIn, bytesOut }]) =>
|
||||
sql`(${publicKey}, ${bytesIn}, ${bytesOut})`
|
||||
);
|
||||
const valuesClause = sql.join(valuesList, sql`, `);
|
||||
|
||||
let rows: { orgId: string; pubKey: string }[] = [];
|
||||
|
||||
try {
|
||||
const updatedSite = await withDeadlockRetry(async () => {
|
||||
const [result] = await db
|
||||
.update(sites)
|
||||
.set({
|
||||
megabytesOut: sql`COALESCE(${sites.megabytesOut}, 0) + ${bytesIn}`,
|
||||
megabytesIn: sql`COALESCE(${sites.megabytesIn}, 0) + ${bytesOut}`,
|
||||
lastBandwidthUpdate: currentTime,
|
||||
})
|
||||
.where(eq(sites.pubKey, publicKey))
|
||||
.returning({
|
||||
orgId: sites.orgId,
|
||||
siteId: sites.siteId
|
||||
});
|
||||
return result;
|
||||
}, `flush bandwidth for site ${publicKey}`);
|
||||
|
||||
if (updatedSite) {
|
||||
if (exitNodeId) {
|
||||
const notAllowed = await checkExitNodeOrg(
|
||||
exitNodeId,
|
||||
updatedSite.orgId
|
||||
);
|
||||
if (notAllowed) {
|
||||
logger.warn(
|
||||
`Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}`
|
||||
);
|
||||
// Skip usage tracking for this site but continue
|
||||
// processing the rest.
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (calcUsage) {
|
||||
const totalBandwidth = bytesIn + bytesOut;
|
||||
const current = orgUsageMap.get(updatedSite.orgId) ?? 0;
|
||||
orgUsageMap.set(updatedSite.orgId, current + totalBandwidth);
|
||||
}
|
||||
}
|
||||
rows = await withDeadlockRetry(async () => {
|
||||
return dbQueryRows<{ orgId: string; pubKey: string }>(sql`
|
||||
UPDATE sites
|
||||
SET
|
||||
"bytesOut" = COALESCE("bytesOut", 0) + v.bytes_in,
|
||||
"bytesIn" = COALESCE("bytesIn", 0) + v.bytes_out,
|
||||
"lastBandwidthUpdate" = ${currentTime}
|
||||
FROM (VALUES ${valuesClause}) AS v(pub_key, bytes_in, bytes_out)
|
||||
WHERE sites."pubKey" = v.pub_key
|
||||
RETURNING sites."orgId" AS "orgId", sites."pubKey" AS "pubKey"
|
||||
`);
|
||||
}, `flush bandwidth chunk [${i}–${chunkEnd}]`);
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to flush bandwidth for site ${publicKey}:`,
|
||||
`Failed to flush bandwidth chunk [${i}–${chunkEnd}], discarding ${chunk.length} site(s):`,
|
||||
error
|
||||
);
|
||||
// Discard the chunk — exact per-flush accuracy is not critical.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Re-queue the failed entry so it is retried on the next flush
|
||||
// rather than silently dropped.
|
||||
const existing = accumulator.get(publicKey);
|
||||
if (existing) {
|
||||
existing.bytesIn += bytesIn;
|
||||
existing.bytesOut += bytesOut;
|
||||
} else {
|
||||
accumulator.set(publicKey, {
|
||||
bytesIn,
|
||||
bytesOut,
|
||||
exitNodeId,
|
||||
calcUsage
|
||||
});
|
||||
// Collect billing usage from the returned rows.
|
||||
for (const { orgId, pubKey } of rows) {
|
||||
const entry = snapshotMap.get(pubKey);
|
||||
if (!entry) continue;
|
||||
|
||||
const { bytesIn, bytesOut, exitNodeId, calcUsage } = entry;
|
||||
|
||||
if (exitNodeId) {
|
||||
const notAllowed = await checkExitNodeOrg(exitNodeId, orgId);
|
||||
if (notAllowed) {
|
||||
logger.warn(
|
||||
`Exit node ${exitNodeId} is not allowed for org ${orgId}`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (calcUsage) {
|
||||
const current = orgUsageMap.get(orgId) ?? 0;
|
||||
orgUsageMap.set(orgId, current + bytesIn + bytesOut);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process billing usage updates outside the site-update loop to keep
|
||||
// lock scope small and concerns separated.
|
||||
// Process billing usage updates after all chunks are written.
|
||||
if (orgUsageMap.size > 0) {
|
||||
// Sort org IDs for consistent lock ordering.
|
||||
const sortedOrgIds = [...orgUsageMap.keys()].sort();
|
||||
|
||||
for (const orgId of sortedOrgIds) {
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import { generateSessionToken } from "@server/auth/sessions/app";
|
||||
import { db } from "@server/db";
|
||||
import { db, newtSessions } from "@server/db";
|
||||
import { newts } from "@server/db";
|
||||
import { getOrCreateCachedToken } from "#dynamic/lib/tokenCache";
|
||||
import { EXPIRES } from "@server/auth/sessions/newt";
|
||||
import HttpCode from "@server/types/HttpCode";
|
||||
import response from "@server/lib/response";
|
||||
import { eq } from "drizzle-orm";
|
||||
@@ -92,8 +94,19 @@ export async function getNewtToken(
|
||||
);
|
||||
}
|
||||
|
||||
const resToken = generateSessionToken();
|
||||
await createNewtSession(resToken, existingNewt.newtId);
|
||||
// Return a cached token if one exists to prevent thundering herd on
|
||||
// simultaneous restarts; falls back to creating a fresh session when
|
||||
// Redis is unavailable or the cache has expired.
|
||||
const resToken = await getOrCreateCachedToken(
|
||||
`newt:token_cache:${existingNewt.newtId}`,
|
||||
config.getRawConfig().server.secret!,
|
||||
Math.floor(EXPIRES / 1000),
|
||||
async () => {
|
||||
const token = generateSessionToken();
|
||||
await createNewtSession(token, existingNewt.newtId);
|
||||
return token;
|
||||
}
|
||||
);
|
||||
|
||||
return response<{ token: string; serverVersion: string }>(res, {
|
||||
data: {
|
||||
|
||||
@@ -5,6 +5,7 @@ import { Newt } from "@server/db";
|
||||
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
import { sendNewtSyncMessage } from "./sync";
|
||||
import { recordPing } from "./pingAccumulator";
|
||||
|
||||
// Track if the offline checker interval is running
|
||||
let offlineCheckerInterval: NodeJS.Timeout | null = null;
|
||||
@@ -114,18 +115,12 @@ export const handleNewtPingMessage: MessageHandler = async (context) => {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// Mark the site as online and record the ping timestamp.
|
||||
await db
|
||||
.update(sites)
|
||||
.set({
|
||||
online: true,
|
||||
lastPing: Math.floor(Date.now() / 1000)
|
||||
})
|
||||
.where(eq(sites.siteId, newt.siteId));
|
||||
} catch (error) {
|
||||
logger.error("Error updating online state on newt ping", { error });
|
||||
}
|
||||
// Record the ping in memory; it will be flushed to the database
|
||||
// periodically by the ping accumulator (every ~10s) in a single
|
||||
// batched UPDATE instead of one query per ping. This prevents
|
||||
// connection pool exhaustion under load, especially with
|
||||
// cross-region latency to the database.
|
||||
recordPing(newt.siteId);
|
||||
|
||||
// Check config version and sync if stale.
|
||||
const configVersion = await getClientConfigVersion(newt.newtId);
|
||||
|
||||
382
server/routers/newt/pingAccumulator.ts
Normal file
382
server/routers/newt/pingAccumulator.ts
Normal file
@@ -0,0 +1,382 @@
|
||||
import { db } from "@server/db";
|
||||
import { sites, clients, olms } from "@server/db";
|
||||
import { eq, inArray } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
|
||||
/**
|
||||
* Ping Accumulator
|
||||
*
|
||||
* Instead of writing to the database on every single newt/olm ping (which
|
||||
* causes pool exhaustion under load, especially with cross-region latency),
|
||||
* we accumulate pings in memory and flush them to the database periodically
|
||||
* in a single batch.
|
||||
*
|
||||
* This is the same pattern used for bandwidth flushing in
|
||||
* receiveBandwidth.ts and handleReceiveBandwidthMessage.ts.
|
||||
*
|
||||
* Supports two kinds of pings:
|
||||
* - **Site pings** (from newts): update `sites.online` and `sites.lastPing`
|
||||
* - **Client pings** (from OLMs): update `clients.online`, `clients.lastPing`,
|
||||
* `clients.archived`, and optionally reset `olms.archived`
|
||||
*/
|
||||
|
||||
const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds
|
||||
const MAX_RETRIES = 2;
|
||||
const BASE_DELAY_MS = 50;
|
||||
|
||||
// ── Site (newt) pings ──────────────────────────────────────────────────
|
||||
// Map of siteId -> latest ping timestamp (unix seconds)
|
||||
const pendingSitePings: Map<number, number> = new Map();
|
||||
|
||||
// ── Client (OLM) pings ────────────────────────────────────────────────
|
||||
// Map of clientId -> latest ping timestamp (unix seconds)
|
||||
const pendingClientPings: Map<number, number> = new Map();
|
||||
// Set of olmIds whose `archived` flag should be reset to false
|
||||
const pendingOlmArchiveResets: Set<string> = new Set();
|
||||
|
||||
let flushTimer: NodeJS.Timeout | null = null;
|
||||
|
||||
// ── Public API ─────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Record a ping for a newt site. This does NOT write to the database
|
||||
* immediately. Instead it stores the latest ping timestamp in memory,
|
||||
* to be flushed periodically by the background timer.
|
||||
*/
|
||||
export function recordSitePing(siteId: number): void {
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
pendingSitePings.set(siteId, now);
|
||||
}
|
||||
|
||||
/** @deprecated Use `recordSitePing` instead. Alias kept for existing call-sites. */
|
||||
export const recordPing = recordSitePing;
|
||||
|
||||
/**
|
||||
* Record a ping for an OLM client. Batches the `clients` table update
|
||||
* (`online`, `lastPing`, `archived`) and, when `olmArchived` is true,
|
||||
* also queues an `olms` table update to clear the archived flag.
|
||||
*/
|
||||
export function recordClientPing(
|
||||
clientId: number,
|
||||
olmId: string,
|
||||
olmArchived: boolean
|
||||
): void {
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
pendingClientPings.set(clientId, now);
|
||||
if (olmArchived) {
|
||||
pendingOlmArchiveResets.add(olmId);
|
||||
}
|
||||
}
|
||||
|
||||
// ── Flush Logic ────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Flush all accumulated site pings to the database.
|
||||
*/
|
||||
async function flushSitePingsToDb(): Promise<void> {
|
||||
if (pendingSitePings.size === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Snapshot and clear so new pings arriving during the flush go into a
|
||||
// fresh map for the next cycle.
|
||||
const pingsToFlush = new Map(pendingSitePings);
|
||||
pendingSitePings.clear();
|
||||
|
||||
// Sort by siteId for consistent lock ordering (prevents deadlocks)
|
||||
const sortedEntries = Array.from(pingsToFlush.entries()).sort(
|
||||
([a], [b]) => a - b
|
||||
);
|
||||
|
||||
const BATCH_SIZE = 50;
|
||||
for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
|
||||
const batch = sortedEntries.slice(i, i + BATCH_SIZE);
|
||||
|
||||
try {
|
||||
await withRetry(async () => {
|
||||
// Group by timestamp for efficient bulk updates
|
||||
const byTimestamp = new Map<number, number[]>();
|
||||
for (const [siteId, timestamp] of batch) {
|
||||
const group = byTimestamp.get(timestamp) || [];
|
||||
group.push(siteId);
|
||||
byTimestamp.set(timestamp, group);
|
||||
}
|
||||
|
||||
if (byTimestamp.size === 1) {
|
||||
const [timestamp, siteIds] = Array.from(
|
||||
byTimestamp.entries()
|
||||
)[0];
|
||||
await db
|
||||
.update(sites)
|
||||
.set({
|
||||
online: true,
|
||||
lastPing: timestamp
|
||||
})
|
||||
.where(inArray(sites.siteId, siteIds));
|
||||
} else {
|
||||
await db.transaction(async (tx) => {
|
||||
for (const [timestamp, siteIds] of byTimestamp) {
|
||||
await tx
|
||||
.update(sites)
|
||||
.set({
|
||||
online: true,
|
||||
lastPing: timestamp
|
||||
})
|
||||
.where(inArray(sites.siteId, siteIds));
|
||||
}
|
||||
});
|
||||
}
|
||||
}, "flushSitePingsToDb");
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to flush site ping batch (${batch.length} sites), re-queuing for next cycle`,
|
||||
{ error }
|
||||
);
|
||||
for (const [siteId, timestamp] of batch) {
|
||||
const existing = pendingSitePings.get(siteId);
|
||||
if (!existing || existing < timestamp) {
|
||||
pendingSitePings.set(siteId, timestamp);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush all accumulated client (OLM) pings to the database.
|
||||
*/
|
||||
async function flushClientPingsToDb(): Promise<void> {
|
||||
if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Snapshot and clear
|
||||
const pingsToFlush = new Map(pendingClientPings);
|
||||
pendingClientPings.clear();
|
||||
|
||||
const olmResetsToFlush = new Set(pendingOlmArchiveResets);
|
||||
pendingOlmArchiveResets.clear();
|
||||
|
||||
// ── Flush client pings ─────────────────────────────────────────────
|
||||
if (pingsToFlush.size > 0) {
|
||||
const sortedEntries = Array.from(pingsToFlush.entries()).sort(
|
||||
([a], [b]) => a - b
|
||||
);
|
||||
|
||||
const BATCH_SIZE = 50;
|
||||
for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
|
||||
const batch = sortedEntries.slice(i, i + BATCH_SIZE);
|
||||
|
||||
try {
|
||||
await withRetry(async () => {
|
||||
const byTimestamp = new Map<number, number[]>();
|
||||
for (const [clientId, timestamp] of batch) {
|
||||
const group = byTimestamp.get(timestamp) || [];
|
||||
group.push(clientId);
|
||||
byTimestamp.set(timestamp, group);
|
||||
}
|
||||
|
||||
if (byTimestamp.size === 1) {
|
||||
const [timestamp, clientIds] = Array.from(
|
||||
byTimestamp.entries()
|
||||
)[0];
|
||||
await db
|
||||
.update(clients)
|
||||
.set({
|
||||
lastPing: timestamp,
|
||||
online: true,
|
||||
archived: false
|
||||
})
|
||||
.where(inArray(clients.clientId, clientIds));
|
||||
} else {
|
||||
await db.transaction(async (tx) => {
|
||||
for (const [timestamp, clientIds] of byTimestamp) {
|
||||
await tx
|
||||
.update(clients)
|
||||
.set({
|
||||
lastPing: timestamp,
|
||||
online: true,
|
||||
archived: false
|
||||
})
|
||||
.where(
|
||||
inArray(clients.clientId, clientIds)
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
}, "flushClientPingsToDb");
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to flush client ping batch (${batch.length} clients), re-queuing for next cycle`,
|
||||
{ error }
|
||||
);
|
||||
for (const [clientId, timestamp] of batch) {
|
||||
const existing = pendingClientPings.get(clientId);
|
||||
if (!existing || existing < timestamp) {
|
||||
pendingClientPings.set(clientId, timestamp);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Flush OLM archive resets ───────────────────────────────────────
|
||||
if (olmResetsToFlush.size > 0) {
|
||||
const olmIds = Array.from(olmResetsToFlush).sort();
|
||||
|
||||
const BATCH_SIZE = 50;
|
||||
for (let i = 0; i < olmIds.length; i += BATCH_SIZE) {
|
||||
const batch = olmIds.slice(i, i + BATCH_SIZE);
|
||||
|
||||
try {
|
||||
await withRetry(async () => {
|
||||
await db
|
||||
.update(olms)
|
||||
.set({ archived: false })
|
||||
.where(inArray(olms.olmId, batch));
|
||||
}, "flushOlmArchiveResets");
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to flush OLM archive reset batch (${batch.length} olms), re-queuing for next cycle`,
|
||||
{ error }
|
||||
);
|
||||
for (const olmId of batch) {
|
||||
pendingOlmArchiveResets.add(olmId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush everything — called by the interval timer and during shutdown.
|
||||
*/
|
||||
export async function flushPingsToDb(): Promise<void> {
|
||||
await flushSitePingsToDb();
|
||||
await flushClientPingsToDb();
|
||||
}
|
||||
|
||||
// ── Retry / Error Helpers ──────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Simple retry wrapper with exponential backoff for transient errors
|
||||
* (connection timeouts, unexpected disconnects).
|
||||
*/
|
||||
async function withRetry<T>(
|
||||
operation: () => Promise<T>,
|
||||
context: string
|
||||
): Promise<T> {
|
||||
let attempt = 0;
|
||||
while (true) {
|
||||
try {
|
||||
return await operation();
|
||||
} catch (error: any) {
|
||||
if (isTransientError(error) && attempt < MAX_RETRIES) {
|
||||
attempt++;
|
||||
const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
|
||||
const jitter = Math.random() * baseDelay;
|
||||
const delay = baseDelay + jitter;
|
||||
logger.warn(
|
||||
`Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`
|
||||
);
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
continue;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Detect transient connection errors that are safe to retry.
|
||||
*/
|
||||
function isTransientError(error: any): boolean {
|
||||
if (!error) return false;
|
||||
|
||||
const message = (error.message || "").toLowerCase();
|
||||
const causeMessage = (error.cause?.message || "").toLowerCase();
|
||||
const code = error.code || "";
|
||||
|
||||
// Connection timeout / terminated
|
||||
if (
|
||||
message.includes("connection timeout") ||
|
||||
message.includes("connection terminated") ||
|
||||
message.includes("timeout exceeded when trying to connect") ||
|
||||
causeMessage.includes("connection terminated unexpectedly") ||
|
||||
causeMessage.includes("connection timeout")
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// PostgreSQL deadlock
|
||||
if (code === "40P01" || message.includes("deadlock")) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// ECONNRESET, ECONNREFUSED, EPIPE
|
||||
if (
|
||||
code === "ECONNRESET" ||
|
||||
code === "ECONNREFUSED" ||
|
||||
code === "EPIPE" ||
|
||||
code === "ETIMEDOUT"
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// ── Lifecycle ──────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Start the background flush timer. Call this once at server startup.
|
||||
*/
|
||||
export function startPingAccumulator(): void {
|
||||
if (flushTimer) {
|
||||
return; // Already running
|
||||
}
|
||||
|
||||
flushTimer = setInterval(async () => {
|
||||
try {
|
||||
await flushPingsToDb();
|
||||
} catch (error) {
|
||||
logger.error("Unhandled error in ping accumulator flush", {
|
||||
error
|
||||
});
|
||||
}
|
||||
}, FLUSH_INTERVAL_MS);
|
||||
|
||||
// Don't prevent the process from exiting
|
||||
flushTimer.unref();
|
||||
|
||||
logger.info(
|
||||
`Ping accumulator started (flush interval: ${FLUSH_INTERVAL_MS}ms)`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the background flush timer and perform a final flush.
|
||||
* Call this during graceful shutdown.
|
||||
*/
|
||||
export async function stopPingAccumulator(): Promise<void> {
|
||||
if (flushTimer) {
|
||||
clearInterval(flushTimer);
|
||||
flushTimer = null;
|
||||
}
|
||||
|
||||
// Final flush to persist any remaining pings
|
||||
try {
|
||||
await flushPingsToDb();
|
||||
} catch (error) {
|
||||
logger.error("Error during final ping accumulator flush", { error });
|
||||
}
|
||||
|
||||
logger.info("Ping accumulator stopped");
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of pending (unflushed) pings. Useful for monitoring.
|
||||
*/
|
||||
export function getPendingPingCount(): number {
|
||||
return pendingSitePings.size + pendingClientPings.size;
|
||||
}
|
||||
@@ -8,7 +8,7 @@ import {
|
||||
ExitNode,
|
||||
exitNodes,
|
||||
sites,
|
||||
clientSitesAssociationsCache
|
||||
clientSitesAssociationsCache,
|
||||
} from "@server/db";
|
||||
import { olms } from "@server/db";
|
||||
import HttpCode from "@server/types/HttpCode";
|
||||
@@ -20,8 +20,10 @@ import { z } from "zod";
|
||||
import { fromError } from "zod-validation-error";
|
||||
import {
|
||||
createOlmSession,
|
||||
validateOlmSessionToken
|
||||
validateOlmSessionToken,
|
||||
EXPIRES
|
||||
} from "@server/auth/sessions/olm";
|
||||
import { getOrCreateCachedToken } from "#dynamic/lib/tokenCache";
|
||||
import { verifyPassword } from "@server/auth/password";
|
||||
import logger from "@server/logger";
|
||||
import config from "@server/lib/config";
|
||||
@@ -132,8 +134,19 @@ export async function getOlmToken(
|
||||
|
||||
logger.debug("Creating new olm session token");
|
||||
|
||||
const resToken = generateSessionToken();
|
||||
await createOlmSession(resToken, existingOlm.olmId);
|
||||
// Return a cached token if one exists to prevent thundering herd on
|
||||
// simultaneous restarts; falls back to creating a fresh session when
|
||||
// Redis is unavailable or the cache has expired.
|
||||
const resToken = await getOrCreateCachedToken(
|
||||
`olm:token_cache:${existingOlm.olmId}`,
|
||||
config.getRawConfig().server.secret!,
|
||||
Math.floor(EXPIRES / 1000),
|
||||
async () => {
|
||||
const token = generateSessionToken();
|
||||
await createOlmSession(token, existingOlm.olmId);
|
||||
return token;
|
||||
}
|
||||
);
|
||||
|
||||
let clientIdToUse;
|
||||
if (orgId) {
|
||||
|
||||
@@ -3,6 +3,7 @@ import { db } from "@server/db";
|
||||
import { MessageHandler } from "@server/routers/ws";
|
||||
import { clients, olms, Olm } from "@server/db";
|
||||
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
||||
import { recordClientPing } from "@server/routers/newt/pingAccumulator";
|
||||
import logger from "@server/logger";
|
||||
import { validateSessionToken } from "@server/auth/sessions/app";
|
||||
import { checkOrgAccessPolicy } from "#dynamic/lib/checkOrgAccessPolicy";
|
||||
@@ -201,22 +202,12 @@ export const handleOlmPingMessage: MessageHandler = async (context) => {
|
||||
await sendOlmSyncMessage(olm, client);
|
||||
}
|
||||
|
||||
// Update the client's last ping timestamp
|
||||
await db
|
||||
.update(clients)
|
||||
.set({
|
||||
lastPing: Math.floor(Date.now() / 1000),
|
||||
online: true,
|
||||
archived: false
|
||||
})
|
||||
.where(eq(clients.clientId, olm.clientId));
|
||||
|
||||
if (olm.archived) {
|
||||
await db
|
||||
.update(olms)
|
||||
.set({ archived: false })
|
||||
.where(eq(olms.olmId, olm.olmId));
|
||||
}
|
||||
// Record the ping in memory; it will be flushed to the database
|
||||
// periodically by the ping accumulator (every ~10s) in a single
|
||||
// batched UPDATE instead of one query per ping. This prevents
|
||||
// connection pool exhaustion under load, especially with
|
||||
// cross-region latency to the database.
|
||||
recordClientPing(olm.clientId, olm.olmId, !!olm.archived);
|
||||
} catch (error) {
|
||||
logger.error("Error handling ping message", { error });
|
||||
}
|
||||
|
||||
@@ -11,6 +11,7 @@ import {
|
||||
startNewtOfflineChecker,
|
||||
handleNewtDisconnectingMessage
|
||||
} from "../newt";
|
||||
import { startPingAccumulator } from "../newt/pingAccumulator";
|
||||
import {
|
||||
handleOlmRegisterMessage,
|
||||
handleOlmRelayMessage,
|
||||
@@ -46,6 +47,10 @@ export const messageHandlers: Record<string, MessageHandler> = {
|
||||
"ws/round-trip/complete": handleRoundTripMessage
|
||||
};
|
||||
|
||||
// Start the ping accumulator for all builds — it batches per-site online/lastPing
|
||||
// updates into periodic bulk writes, preventing connection pool exhaustion.
|
||||
startPingAccumulator();
|
||||
|
||||
if (build != "saas") {
|
||||
startOlmOfflineChecker(); // this is to handle the offline check for olms
|
||||
startNewtOfflineChecker(); // this is to handle the offline check for newts
|
||||
|
||||
@@ -6,6 +6,7 @@ import { Socket } from "net";
|
||||
import { Newt, newts, NewtSession, olms, Olm, OlmSession, sites } from "@server/db";
|
||||
import { eq } from "drizzle-orm";
|
||||
import { db } from "@server/db";
|
||||
import { recordPing } from "@server/routers/newt/pingAccumulator";
|
||||
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
|
||||
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
|
||||
import { messageHandlers } from "./messageHandlers";
|
||||
@@ -386,22 +387,14 @@ const setupConnection = async (
|
||||
// the same as modern newt clients.
|
||||
if (clientType === "newt") {
|
||||
const newtClient = client as Newt;
|
||||
ws.on("ping", async () => {
|
||||
ws.on("ping", () => {
|
||||
if (!newtClient.siteId) return;
|
||||
try {
|
||||
await db
|
||||
.update(sites)
|
||||
.set({
|
||||
online: true,
|
||||
lastPing: Math.floor(Date.now() / 1000)
|
||||
})
|
||||
.where(eq(sites.siteId, newtClient.siteId));
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
"Error updating newt site online state on WS ping",
|
||||
{ error }
|
||||
);
|
||||
}
|
||||
// Record the ping in the accumulator instead of writing to the
|
||||
// database on every WS ping frame. The accumulator flushes all
|
||||
// pending pings in a single batched UPDATE every ~10s, which
|
||||
// prevents connection pool exhaustion under load (especially
|
||||
// with cross-region latency to the database).
|
||||
recordPing(newtClient.siteId);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user