mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-26 12:36:41 +00:00
Compare commits
4 Commits
thundering
...
patch-1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6b3a6fa380 | ||
|
|
e2a65b4b74 | ||
|
|
1f01108b62 | ||
|
|
62c63ddcaa |
@@ -1,6 +1,5 @@
|
||||
import { Request, Response, NextFunction } from "express";
|
||||
import { eq, sql } from "drizzle-orm";
|
||||
import { sites } from "@server/db";
|
||||
import { sql } from "drizzle-orm";
|
||||
import { db } from "@server/db";
|
||||
import logger from "@server/logger";
|
||||
import createHttpError from "http-errors";
|
||||
@@ -31,7 +30,10 @@ const MAX_RETRIES = 3;
|
||||
const BASE_DELAY_MS = 50;
|
||||
|
||||
// How often to flush accumulated bandwidth data to the database
|
||||
const FLUSH_INTERVAL_MS = 30_000; // 30 seconds
|
||||
const FLUSH_INTERVAL_MS = 300_000; // 300 seconds
|
||||
|
||||
// Maximum number of sites to include in a single batch UPDATE statement
|
||||
const BATCH_CHUNK_SIZE = 250;
|
||||
|
||||
// In-memory accumulator: publicKey -> AccumulatorEntry
|
||||
let accumulator = new Map<string, AccumulatorEntry>();
|
||||
@@ -75,13 +77,33 @@ async function withDeadlockRetry<T>(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Execute a raw SQL query that returns rows, in a way that works across both
|
||||
* the PostgreSQL driver (which exposes `execute`) and the SQLite driver (which
|
||||
* exposes `all`). Drizzle's typed query builder doesn't support bulk
|
||||
* UPDATE … FROM (VALUES …) natively, so we drop to raw SQL here.
|
||||
*/
|
||||
async function dbQueryRows<T extends Record<string, unknown>>(
|
||||
query: Parameters<(typeof sql)["join"]>[0][number]
|
||||
): Promise<T[]> {
|
||||
const anyDb = db as any;
|
||||
if (typeof anyDb.execute === "function") {
|
||||
// PostgreSQL (node-postgres via Drizzle) — returns { rows: [...] } or an array
|
||||
const result = await anyDb.execute(query);
|
||||
return (Array.isArray(result) ? result : (result.rows ?? [])) as T[];
|
||||
}
|
||||
// SQLite (better-sqlite3 via Drizzle) — returns an array directly
|
||||
return (await anyDb.all(query)) as T[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush all accumulated site bandwidth data to the database.
|
||||
*
|
||||
* Swaps out the accumulator before writing so that any bandwidth messages
|
||||
* received during the flush are captured in the new accumulator rather than
|
||||
* being lost or causing contention. Entries that fail to write are re-queued
|
||||
* back into the accumulator so they will be retried on the next flush.
|
||||
* being lost or causing contention. Sites are updated in chunks via a single
|
||||
* batch UPDATE per chunk. Failed chunks are discarded — exact per-flush
|
||||
* accuracy is not critical and re-queuing is not worth the added complexity.
|
||||
*
|
||||
* This function is exported so that the application's graceful-shutdown
|
||||
* cleanup handler can call it before the process exits.
|
||||
@@ -108,76 +130,76 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
|
||||
`Flushing accumulated bandwidth data for ${sortedEntries.length} site(s) to the database`
|
||||
);
|
||||
|
||||
// Aggregate billing usage by org, collected during the DB update loop.
|
||||
// Build a lookup so post-processing can reach each entry by publicKey.
|
||||
const snapshotMap = new Map(sortedEntries);
|
||||
|
||||
// Aggregate billing usage by org across all chunks.
|
||||
const orgUsageMap = new Map<string, number>();
|
||||
|
||||
for (const [publicKey, { bytesIn, bytesOut, exitNodeId, calcUsage }] of sortedEntries) {
|
||||
// Process in chunks so individual queries stay at a reasonable size.
|
||||
for (let i = 0; i < sortedEntries.length; i += BATCH_CHUNK_SIZE) {
|
||||
const chunk = sortedEntries.slice(i, i + BATCH_CHUNK_SIZE);
|
||||
const chunkEnd = i + chunk.length - 1;
|
||||
|
||||
// Build a parameterised VALUES list: (pubKey, bytesIn, bytesOut), ...
|
||||
// Both PostgreSQL and SQLite (≥ 3.33.0, which better-sqlite3 bundles)
|
||||
// support UPDATE … FROM (VALUES …), letting us update the whole chunk
|
||||
// in a single query instead of N individual round-trips.
|
||||
const valuesList = chunk.map(([publicKey, { bytesIn, bytesOut }]) =>
|
||||
sql`(${publicKey}::text, ${bytesIn}::real, ${bytesOut}::real)`
|
||||
);
|
||||
const valuesClause = sql.join(valuesList, sql`, `);
|
||||
|
||||
let rows: { orgId: string; pubKey: string }[] = [];
|
||||
|
||||
try {
|
||||
const updatedSite = await withDeadlockRetry(async () => {
|
||||
const [result] = await db
|
||||
.update(sites)
|
||||
.set({
|
||||
megabytesOut: sql`COALESCE(${sites.megabytesOut}, 0) + ${bytesIn}`,
|
||||
megabytesIn: sql`COALESCE(${sites.megabytesIn}, 0) + ${bytesOut}`,
|
||||
lastBandwidthUpdate: currentTime,
|
||||
})
|
||||
.where(eq(sites.pubKey, publicKey))
|
||||
.returning({
|
||||
orgId: sites.orgId,
|
||||
siteId: sites.siteId
|
||||
});
|
||||
return result;
|
||||
}, `flush bandwidth for site ${publicKey}`);
|
||||
|
||||
if (updatedSite) {
|
||||
if (exitNodeId) {
|
||||
const notAllowed = await checkExitNodeOrg(
|
||||
exitNodeId,
|
||||
updatedSite.orgId
|
||||
);
|
||||
if (notAllowed) {
|
||||
logger.warn(
|
||||
`Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}`
|
||||
);
|
||||
// Skip usage tracking for this site but continue
|
||||
// processing the rest.
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (calcUsage) {
|
||||
const totalBandwidth = bytesIn + bytesOut;
|
||||
const current = orgUsageMap.get(updatedSite.orgId) ?? 0;
|
||||
orgUsageMap.set(updatedSite.orgId, current + totalBandwidth);
|
||||
}
|
||||
}
|
||||
rows = await withDeadlockRetry(async () => {
|
||||
return dbQueryRows<{ orgId: string; pubKey: string }>(sql`
|
||||
UPDATE sites
|
||||
SET
|
||||
"bytesOut" = COALESCE("bytesOut", 0) + v.bytes_in,
|
||||
"bytesIn" = COALESCE("bytesIn", 0) + v.bytes_out,
|
||||
"lastBandwidthUpdate" = ${currentTime}
|
||||
FROM (VALUES ${valuesClause}) AS v(pub_key, bytes_in, bytes_out)
|
||||
WHERE sites."pubKey" = v.pub_key
|
||||
RETURNING sites."orgId" AS "orgId", sites."pubKey" AS "pubKey"
|
||||
`);
|
||||
}, `flush bandwidth chunk [${i}–${chunkEnd}]`);
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to flush bandwidth for site ${publicKey}:`,
|
||||
`Failed to flush bandwidth chunk [${i}–${chunkEnd}], discarding ${chunk.length} site(s):`,
|
||||
error
|
||||
);
|
||||
// Discard the chunk — exact per-flush accuracy is not critical.
|
||||
continue;
|
||||
}
|
||||
|
||||
// Re-queue the failed entry so it is retried on the next flush
|
||||
// rather than silently dropped.
|
||||
const existing = accumulator.get(publicKey);
|
||||
if (existing) {
|
||||
existing.bytesIn += bytesIn;
|
||||
existing.bytesOut += bytesOut;
|
||||
} else {
|
||||
accumulator.set(publicKey, {
|
||||
bytesIn,
|
||||
bytesOut,
|
||||
exitNodeId,
|
||||
calcUsage
|
||||
});
|
||||
// Collect billing usage from the returned rows.
|
||||
for (const { orgId, pubKey } of rows) {
|
||||
const entry = snapshotMap.get(pubKey);
|
||||
if (!entry) continue;
|
||||
|
||||
const { bytesIn, bytesOut, exitNodeId, calcUsage } = entry;
|
||||
|
||||
if (exitNodeId) {
|
||||
const notAllowed = await checkExitNodeOrg(exitNodeId, orgId);
|
||||
if (notAllowed) {
|
||||
logger.warn(
|
||||
`Exit node ${exitNodeId} is not allowed for org ${orgId}`
|
||||
);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (calcUsage) {
|
||||
const current = orgUsageMap.get(orgId) ?? 0;
|
||||
orgUsageMap.set(orgId, current + bytesIn + bytesOut);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Process billing usage updates outside the site-update loop to keep
|
||||
// lock scope small and concerns separated.
|
||||
// Process billing usage updates after all chunks are written.
|
||||
if (orgUsageMap.size > 0) {
|
||||
// Sort org IDs for consistent lock ordering.
|
||||
const sortedOrgIds = [...orgUsageMap.keys()].sort();
|
||||
|
||||
for (const orgId of sortedOrgIds) {
|
||||
|
||||
Reference in New Issue
Block a user