From 395cab795c3d23f182651cd5d3bbe1316cb2ff5b Mon Sep 17 00:00:00 2001 From: Owen Date: Wed, 25 Mar 2026 20:33:58 -0700 Subject: [PATCH] Batch set bandwidth --- server/routers/gerbil/receiveBandwidth.ts | 146 +++++++++++++--------- 1 file changed, 84 insertions(+), 62 deletions(-) diff --git a/server/routers/gerbil/receiveBandwidth.ts b/server/routers/gerbil/receiveBandwidth.ts index b73ce986d..042c844aa 100644 --- a/server/routers/gerbil/receiveBandwidth.ts +++ b/server/routers/gerbil/receiveBandwidth.ts @@ -1,6 +1,5 @@ import { Request, Response, NextFunction } from "express"; -import { eq, sql } from "drizzle-orm"; -import { sites } from "@server/db"; +import { sql } from "drizzle-orm"; import { db } from "@server/db"; import logger from "@server/logger"; import createHttpError from "http-errors"; @@ -31,7 +30,10 @@ const MAX_RETRIES = 3; const BASE_DELAY_MS = 50; // How often to flush accumulated bandwidth data to the database -const FLUSH_INTERVAL_MS = 30_000; // 30 seconds +const FLUSH_INTERVAL_MS = 300_000; // 300 seconds + +// Maximum number of sites to include in a single batch UPDATE statement +const BATCH_CHUNK_SIZE = 250; // In-memory accumulator: publicKey -> AccumulatorEntry let accumulator = new Map(); @@ -75,13 +77,33 @@ async function withDeadlockRetry( } } +/** + * Execute a raw SQL query that returns rows, in a way that works across both + * the PostgreSQL driver (which exposes `execute`) and the SQLite driver (which + * exposes `all`). Drizzle's typed query builder doesn't support bulk + * UPDATE … FROM (VALUES …) natively, so we drop to raw SQL here. + */ +async function dbQueryRows>( + query: Parameters<(typeof sql)["join"]>[0][number] +): Promise { + const anyDb = db as any; + if (typeof anyDb.execute === "function") { + // PostgreSQL (node-postgres via Drizzle) — returns { rows: [...] } or an array + const result = await anyDb.execute(query); + return (Array.isArray(result) ? result : (result.rows ?? [])) as T[]; + } + // SQLite (better-sqlite3 via Drizzle) — returns an array directly + return (await anyDb.all(query)) as T[]; +} + /** * Flush all accumulated site bandwidth data to the database. * * Swaps out the accumulator before writing so that any bandwidth messages * received during the flush are captured in the new accumulator rather than - * being lost or causing contention. Entries that fail to write are re-queued - * back into the accumulator so they will be retried on the next flush. + * being lost or causing contention. Sites are updated in chunks via a single + * batch UPDATE per chunk. Failed chunks are discarded — exact per-flush + * accuracy is not critical and re-queuing is not worth the added complexity. * * This function is exported so that the application's graceful-shutdown * cleanup handler can call it before the process exits. @@ -108,76 +130,76 @@ export async function flushSiteBandwidthToDb(): Promise { `Flushing accumulated bandwidth data for ${sortedEntries.length} site(s) to the database` ); - // Aggregate billing usage by org, collected during the DB update loop. + // Build a lookup so post-processing can reach each entry by publicKey. + const snapshotMap = new Map(sortedEntries); + + // Aggregate billing usage by org across all chunks. const orgUsageMap = new Map(); - for (const [publicKey, { bytesIn, bytesOut, exitNodeId, calcUsage }] of sortedEntries) { + // Process in chunks so individual queries stay at a reasonable size. + for (let i = 0; i < sortedEntries.length; i += BATCH_CHUNK_SIZE) { + const chunk = sortedEntries.slice(i, i + BATCH_CHUNK_SIZE); + const chunkEnd = i + chunk.length - 1; + + // Build a parameterised VALUES list: (pubKey, bytesIn, bytesOut), ... + // Both PostgreSQL and SQLite (≥ 3.33.0, which better-sqlite3 bundles) + // support UPDATE … FROM (VALUES …), letting us update the whole chunk + // in a single query instead of N individual round-trips. + const valuesList = chunk.map(([publicKey, { bytesIn, bytesOut }]) => + sql`(${publicKey}, ${bytesIn}, ${bytesOut})` + ); + const valuesClause = sql.join(valuesList, sql`, `); + + let rows: { orgId: string; pubKey: string }[] = []; + try { - const updatedSite = await withDeadlockRetry(async () => { - const [result] = await db - .update(sites) - .set({ - megabytesOut: sql`COALESCE(${sites.megabytesOut}, 0) + ${bytesIn}`, - megabytesIn: sql`COALESCE(${sites.megabytesIn}, 0) + ${bytesOut}`, - lastBandwidthUpdate: currentTime, - }) - .where(eq(sites.pubKey, publicKey)) - .returning({ - orgId: sites.orgId, - siteId: sites.siteId - }); - return result; - }, `flush bandwidth for site ${publicKey}`); - - if (updatedSite) { - if (exitNodeId) { - const notAllowed = await checkExitNodeOrg( - exitNodeId, - updatedSite.orgId - ); - if (notAllowed) { - logger.warn( - `Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}` - ); - // Skip usage tracking for this site but continue - // processing the rest. - continue; - } - } - - if (calcUsage) { - const totalBandwidth = bytesIn + bytesOut; - const current = orgUsageMap.get(updatedSite.orgId) ?? 0; - orgUsageMap.set(updatedSite.orgId, current + totalBandwidth); - } - } + rows = await withDeadlockRetry(async () => { + return dbQueryRows<{ orgId: string; pubKey: string }>(sql` + UPDATE sites + SET + "bytesOut" = COALESCE("bytesOut", 0) + v.bytes_in, + "bytesIn" = COALESCE("bytesIn", 0) + v.bytes_out, + "lastBandwidthUpdate" = ${currentTime} + FROM (VALUES ${valuesClause}) AS v(pub_key, bytes_in, bytes_out) + WHERE sites."pubKey" = v.pub_key + RETURNING sites."orgId" AS "orgId", sites."pubKey" AS "pubKey" + `); + }, `flush bandwidth chunk [${i}–${chunkEnd}]`); } catch (error) { logger.error( - `Failed to flush bandwidth for site ${publicKey}:`, + `Failed to flush bandwidth chunk [${i}–${chunkEnd}], discarding ${chunk.length} site(s):`, error ); + // Discard the chunk — exact per-flush accuracy is not critical. + continue; + } - // Re-queue the failed entry so it is retried on the next flush - // rather than silently dropped. - const existing = accumulator.get(publicKey); - if (existing) { - existing.bytesIn += bytesIn; - existing.bytesOut += bytesOut; - } else { - accumulator.set(publicKey, { - bytesIn, - bytesOut, - exitNodeId, - calcUsage - }); + // Collect billing usage from the returned rows. + for (const { orgId, pubKey } of rows) { + const entry = snapshotMap.get(pubKey); + if (!entry) continue; + + const { bytesIn, bytesOut, exitNodeId, calcUsage } = entry; + + if (exitNodeId) { + const notAllowed = await checkExitNodeOrg(exitNodeId, orgId); + if (notAllowed) { + logger.warn( + `Exit node ${exitNodeId} is not allowed for org ${orgId}` + ); + continue; + } + } + + if (calcUsage) { + const current = orgUsageMap.get(orgId) ?? 0; + orgUsageMap.set(orgId, current + bytesIn + bytesOut); } } } - // Process billing usage updates outside the site-update loop to keep - // lock scope small and concerns separated. + // Process billing usage updates after all chunks are written. if (orgUsageMap.size > 0) { - // Sort org IDs for consistent lock ordering. const sortedOrgIds = [...orgUsageMap.keys()].sort(); for (const orgId of sortedOrgIds) {