Compare commits

...

2 Commits

Author SHA1 Message Date
Owen Schwartz
e2a65b4b74 Merge pull request #2715 from fosrl/batch-band
Batch set bandwidth
2026-03-25 21:54:44 -07:00
Owen
1f01108b62 Batch set bandwidth 2026-03-25 21:53:20 -07:00

View File

@@ -1,6 +1,5 @@
import { Request, Response, NextFunction } from "express"; import { Request, Response, NextFunction } from "express";
import { eq, sql } from "drizzle-orm"; import { sql } from "drizzle-orm";
import { sites } from "@server/db";
import { db } from "@server/db"; import { db } from "@server/db";
import logger from "@server/logger"; import logger from "@server/logger";
import createHttpError from "http-errors"; import createHttpError from "http-errors";
@@ -31,7 +30,10 @@ const MAX_RETRIES = 3;
const BASE_DELAY_MS = 50; const BASE_DELAY_MS = 50;
// How often to flush accumulated bandwidth data to the database // How often to flush accumulated bandwidth data to the database
const FLUSH_INTERVAL_MS = 30_000; // 30 seconds const FLUSH_INTERVAL_MS = 300_000; // 300 seconds
// Maximum number of sites to include in a single batch UPDATE statement
const BATCH_CHUNK_SIZE = 250;
// In-memory accumulator: publicKey -> AccumulatorEntry // In-memory accumulator: publicKey -> AccumulatorEntry
let accumulator = new Map<string, AccumulatorEntry>(); let accumulator = new Map<string, AccumulatorEntry>();
@@ -75,13 +77,33 @@ async function withDeadlockRetry<T>(
} }
} }
/**
 * Run a raw SQL statement and return its result rows, abstracting over the
 * two Drizzle drivers this codebase can be backed by: the PostgreSQL driver
 * exposes `execute`, while the SQLite (better-sqlite3) driver exposes `all`.
 * Raw SQL is needed because Drizzle's typed query builder has no native
 * support for bulk UPDATE … FROM (VALUES …).
 *
 * @param query - a drizzle `sql` template fragment to execute
 * @returns the result rows, cast to the caller-supplied row shape
 */
async function dbQueryRows<T extends Record<string, unknown>>(
    query: Parameters<(typeof sql)["join"]>[0][number]
): Promise<T[]> {
    // The driver-specific surface (`execute` vs `all`) is not part of the
    // shared `db` type, so feature-detect at runtime.
    // eslint-disable-next-line @typescript-eslint/no-explicit-any
    const driver = db as any;

    if (typeof driver.execute !== "function") {
        // SQLite (better-sqlite3 via Drizzle) — yields rows as a plain array.
        return (await driver.all(query)) as T[];
    }

    // PostgreSQL (node-postgres via Drizzle) — depending on driver version
    // this yields either a bare array or a { rows: [...] } result object.
    const outcome = await driver.execute(query);
    if (Array.isArray(outcome)) {
        return outcome as T[];
    }
    return (outcome.rows ?? []) as T[];
}
/** /**
* Flush all accumulated site bandwidth data to the database. * Flush all accumulated site bandwidth data to the database.
* *
* Swaps out the accumulator before writing so that any bandwidth messages * Swaps out the accumulator before writing so that any bandwidth messages
* received during the flush are captured in the new accumulator rather than * received during the flush are captured in the new accumulator rather than
* being lost or causing contention. Entries that fail to write are re-queued * being lost or causing contention. Sites are updated in chunks via a single
* back into the accumulator so they will be retried on the next flush. * batch UPDATE per chunk. Failed chunks are discarded — exact per-flush
* accuracy is not critical and re-queuing is not worth the added complexity.
* *
* This function is exported so that the application's graceful-shutdown * This function is exported so that the application's graceful-shutdown
* cleanup handler can call it before the process exits. * cleanup handler can call it before the process exits.
@@ -108,76 +130,76 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
`Flushing accumulated bandwidth data for ${sortedEntries.length} site(s) to the database` `Flushing accumulated bandwidth data for ${sortedEntries.length} site(s) to the database`
); );
// Aggregate billing usage by org, collected during the DB update loop. // Build a lookup so post-processing can reach each entry by publicKey.
const snapshotMap = new Map(sortedEntries);
// Aggregate billing usage by org across all chunks.
const orgUsageMap = new Map<string, number>(); const orgUsageMap = new Map<string, number>();
for (const [publicKey, { bytesIn, bytesOut, exitNodeId, calcUsage }] of sortedEntries) { // Process in chunks so individual queries stay at a reasonable size.
for (let i = 0; i < sortedEntries.length; i += BATCH_CHUNK_SIZE) {
const chunk = sortedEntries.slice(i, i + BATCH_CHUNK_SIZE);
const chunkEnd = i + chunk.length - 1;
// Build a parameterised VALUES list: (pubKey, bytesIn, bytesOut), ...
// Both PostgreSQL and SQLite (≥ 3.33.0, which better-sqlite3 bundles)
// support UPDATE … FROM (VALUES …), letting us update the whole chunk
// in a single query instead of N individual round-trips.
const valuesList = chunk.map(([publicKey, { bytesIn, bytesOut }]) =>
sql`(${publicKey}, ${bytesIn}, ${bytesOut})`
);
const valuesClause = sql.join(valuesList, sql`, `);
let rows: { orgId: string; pubKey: string }[] = [];
try { try {
const updatedSite = await withDeadlockRetry(async () => { rows = await withDeadlockRetry(async () => {
const [result] = await db return dbQueryRows<{ orgId: string; pubKey: string }>(sql`
.update(sites) UPDATE sites
.set({ SET
megabytesOut: sql`COALESCE(${sites.megabytesOut}, 0) + ${bytesIn}`, "bytesOut" = COALESCE("bytesOut", 0) + v.bytes_in,
megabytesIn: sql`COALESCE(${sites.megabytesIn}, 0) + ${bytesOut}`, "bytesIn" = COALESCE("bytesIn", 0) + v.bytes_out,
lastBandwidthUpdate: currentTime, "lastBandwidthUpdate" = ${currentTime}
}) FROM (VALUES ${valuesClause}) AS v(pub_key, bytes_in, bytes_out)
.where(eq(sites.pubKey, publicKey)) WHERE sites."pubKey" = v.pub_key
.returning({ RETURNING sites."orgId" AS "orgId", sites."pubKey" AS "pubKey"
orgId: sites.orgId, `);
siteId: sites.siteId }, `flush bandwidth chunk [${i}${chunkEnd}]`);
});
return result;
}, `flush bandwidth for site ${publicKey}`);
if (updatedSite) {
if (exitNodeId) {
const notAllowed = await checkExitNodeOrg(
exitNodeId,
updatedSite.orgId
);
if (notAllowed) {
logger.warn(
`Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}`
);
// Skip usage tracking for this site but continue
// processing the rest.
continue;
}
}
if (calcUsage) {
const totalBandwidth = bytesIn + bytesOut;
const current = orgUsageMap.get(updatedSite.orgId) ?? 0;
orgUsageMap.set(updatedSite.orgId, current + totalBandwidth);
}
}
} catch (error) { } catch (error) {
logger.error( logger.error(
`Failed to flush bandwidth for site ${publicKey}:`, `Failed to flush bandwidth chunk [${i}${chunkEnd}], discarding ${chunk.length} site(s):`,
error error
); );
// Discard the chunk — exact per-flush accuracy is not critical.
continue;
}
// Re-queue the failed entry so it is retried on the next flush // Collect billing usage from the returned rows.
// rather than silently dropped. for (const { orgId, pubKey } of rows) {
const existing = accumulator.get(publicKey); const entry = snapshotMap.get(pubKey);
if (existing) { if (!entry) continue;
existing.bytesIn += bytesIn;
existing.bytesOut += bytesOut; const { bytesIn, bytesOut, exitNodeId, calcUsage } = entry;
} else {
accumulator.set(publicKey, { if (exitNodeId) {
bytesIn, const notAllowed = await checkExitNodeOrg(exitNodeId, orgId);
bytesOut, if (notAllowed) {
exitNodeId, logger.warn(
calcUsage `Exit node ${exitNodeId} is not allowed for org ${orgId}`
}); );
continue;
}
}
if (calcUsage) {
const current = orgUsageMap.get(orgId) ?? 0;
orgUsageMap.set(orgId, current + bytesIn + bytesOut);
} }
} }
} }
// Process billing usage updates outside the site-update loop to keep // Process billing usage updates after all chunks are written.
// lock scope small and concerns separated.
if (orgUsageMap.size > 0) { if (orgUsageMap.size > 0) {
// Sort org IDs for consistent lock ordering.
const sortedOrgIds = [...orgUsageMap.keys()].sort(); const sortedOrgIds = [...orgUsageMap.keys()].sort();
for (const orgId of sortedOrgIds) { for (const orgId of sortedOrgIds) {