mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-26 12:36:41 +00:00
Compare commits
3 Commits
thundering
...
1.16.2-s.2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e2a65b4b74 | ||
|
|
1f01108b62 | ||
|
|
62c63ddcaa |
@@ -1,6 +1,5 @@
|
|||||||
import { Request, Response, NextFunction } from "express";
|
import { Request, Response, NextFunction } from "express";
|
||||||
import { eq, sql } from "drizzle-orm";
|
import { sql } from "drizzle-orm";
|
||||||
import { sites } from "@server/db";
|
|
||||||
import { db } from "@server/db";
|
import { db } from "@server/db";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import createHttpError from "http-errors";
|
import createHttpError from "http-errors";
|
||||||
@@ -31,7 +30,10 @@ const MAX_RETRIES = 3;
|
|||||||
const BASE_DELAY_MS = 50;
|
const BASE_DELAY_MS = 50;
|
||||||
|
|
||||||
// How often to flush accumulated bandwidth data to the database
|
// How often to flush accumulated bandwidth data to the database
|
||||||
const FLUSH_INTERVAL_MS = 30_000; // 30 seconds
|
const FLUSH_INTERVAL_MS = 300_000; // 300 seconds
|
||||||
|
|
||||||
|
// Maximum number of sites to include in a single batch UPDATE statement
|
||||||
|
const BATCH_CHUNK_SIZE = 250;
|
||||||
|
|
||||||
// In-memory accumulator: publicKey -> AccumulatorEntry
|
// In-memory accumulator: publicKey -> AccumulatorEntry
|
||||||
let accumulator = new Map<string, AccumulatorEntry>();
|
let accumulator = new Map<string, AccumulatorEntry>();
|
||||||
@@ -75,13 +77,33 @@ async function withDeadlockRetry<T>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute a raw SQL query that returns rows, in a way that works across both
|
||||||
|
* the PostgreSQL driver (which exposes `execute`) and the SQLite driver (which
|
||||||
|
* exposes `all`). Drizzle's typed query builder doesn't support bulk
|
||||||
|
* UPDATE … FROM (VALUES …) natively, so we drop to raw SQL here.
|
||||||
|
*/
|
||||||
|
async function dbQueryRows<T extends Record<string, unknown>>(
|
||||||
|
query: Parameters<(typeof sql)["join"]>[0][number]
|
||||||
|
): Promise<T[]> {
|
||||||
|
const anyDb = db as any;
|
||||||
|
if (typeof anyDb.execute === "function") {
|
||||||
|
// PostgreSQL (node-postgres via Drizzle) — returns { rows: [...] } or an array
|
||||||
|
const result = await anyDb.execute(query);
|
||||||
|
return (Array.isArray(result) ? result : (result.rows ?? [])) as T[];
|
||||||
|
}
|
||||||
|
// SQLite (better-sqlite3 via Drizzle) — returns an array directly
|
||||||
|
return (await anyDb.all(query)) as T[];
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Flush all accumulated site bandwidth data to the database.
|
* Flush all accumulated site bandwidth data to the database.
|
||||||
*
|
*
|
||||||
* Swaps out the accumulator before writing so that any bandwidth messages
|
* Swaps out the accumulator before writing so that any bandwidth messages
|
||||||
* received during the flush are captured in the new accumulator rather than
|
* received during the flush are captured in the new accumulator rather than
|
||||||
* being lost or causing contention. Entries that fail to write are re-queued
|
* being lost or causing contention. Sites are updated in chunks via a single
|
||||||
* back into the accumulator so they will be retried on the next flush.
|
* batch UPDATE per chunk. Failed chunks are discarded — exact per-flush
|
||||||
|
* accuracy is not critical and re-queuing is not worth the added complexity.
|
||||||
*
|
*
|
||||||
* This function is exported so that the application's graceful-shutdown
|
* This function is exported so that the application's graceful-shutdown
|
||||||
* cleanup handler can call it before the process exits.
|
* cleanup handler can call it before the process exits.
|
||||||
@@ -108,76 +130,76 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
|
|||||||
`Flushing accumulated bandwidth data for ${sortedEntries.length} site(s) to the database`
|
`Flushing accumulated bandwidth data for ${sortedEntries.length} site(s) to the database`
|
||||||
);
|
);
|
||||||
|
|
||||||
// Aggregate billing usage by org, collected during the DB update loop.
|
// Build a lookup so post-processing can reach each entry by publicKey.
|
||||||
|
const snapshotMap = new Map(sortedEntries);
|
||||||
|
|
||||||
|
// Aggregate billing usage by org across all chunks.
|
||||||
const orgUsageMap = new Map<string, number>();
|
const orgUsageMap = new Map<string, number>();
|
||||||
|
|
||||||
for (const [publicKey, { bytesIn, bytesOut, exitNodeId, calcUsage }] of sortedEntries) {
|
// Process in chunks so individual queries stay at a reasonable size.
|
||||||
try {
|
for (let i = 0; i < sortedEntries.length; i += BATCH_CHUNK_SIZE) {
|
||||||
const updatedSite = await withDeadlockRetry(async () => {
|
const chunk = sortedEntries.slice(i, i + BATCH_CHUNK_SIZE);
|
||||||
const [result] = await db
|
const chunkEnd = i + chunk.length - 1;
|
||||||
.update(sites)
|
|
||||||
.set({
|
|
||||||
megabytesOut: sql`COALESCE(${sites.megabytesOut}, 0) + ${bytesIn}`,
|
|
||||||
megabytesIn: sql`COALESCE(${sites.megabytesIn}, 0) + ${bytesOut}`,
|
|
||||||
lastBandwidthUpdate: currentTime,
|
|
||||||
})
|
|
||||||
.where(eq(sites.pubKey, publicKey))
|
|
||||||
.returning({
|
|
||||||
orgId: sites.orgId,
|
|
||||||
siteId: sites.siteId
|
|
||||||
});
|
|
||||||
return result;
|
|
||||||
}, `flush bandwidth for site ${publicKey}`);
|
|
||||||
|
|
||||||
if (updatedSite) {
|
// Build a parameterised VALUES list: (pubKey, bytesIn, bytesOut), ...
|
||||||
if (exitNodeId) {
|
// Both PostgreSQL and SQLite (≥ 3.33.0, which better-sqlite3 bundles)
|
||||||
const notAllowed = await checkExitNodeOrg(
|
// support UPDATE … FROM (VALUES …), letting us update the whole chunk
|
||||||
exitNodeId,
|
// in a single query instead of N individual round-trips.
|
||||||
updatedSite.orgId
|
const valuesList = chunk.map(([publicKey, { bytesIn, bytesOut }]) =>
|
||||||
|
sql`(${publicKey}, ${bytesIn}, ${bytesOut})`
|
||||||
);
|
);
|
||||||
|
const valuesClause = sql.join(valuesList, sql`, `);
|
||||||
|
|
||||||
|
let rows: { orgId: string; pubKey: string }[] = [];
|
||||||
|
|
||||||
|
try {
|
||||||
|
rows = await withDeadlockRetry(async () => {
|
||||||
|
return dbQueryRows<{ orgId: string; pubKey: string }>(sql`
|
||||||
|
UPDATE sites
|
||||||
|
SET
|
||||||
|
"bytesOut" = COALESCE("bytesOut", 0) + v.bytes_in,
|
||||||
|
"bytesIn" = COALESCE("bytesIn", 0) + v.bytes_out,
|
||||||
|
"lastBandwidthUpdate" = ${currentTime}
|
||||||
|
FROM (VALUES ${valuesClause}) AS v(pub_key, bytes_in, bytes_out)
|
||||||
|
WHERE sites."pubKey" = v.pub_key
|
||||||
|
RETURNING sites."orgId" AS "orgId", sites."pubKey" AS "pubKey"
|
||||||
|
`);
|
||||||
|
}, `flush bandwidth chunk [${i}–${chunkEnd}]`);
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
`Failed to flush bandwidth chunk [${i}–${chunkEnd}], discarding ${chunk.length} site(s):`,
|
||||||
|
error
|
||||||
|
);
|
||||||
|
// Discard the chunk — exact per-flush accuracy is not critical.
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Collect billing usage from the returned rows.
|
||||||
|
for (const { orgId, pubKey } of rows) {
|
||||||
|
const entry = snapshotMap.get(pubKey);
|
||||||
|
if (!entry) continue;
|
||||||
|
|
||||||
|
const { bytesIn, bytesOut, exitNodeId, calcUsage } = entry;
|
||||||
|
|
||||||
|
if (exitNodeId) {
|
||||||
|
const notAllowed = await checkExitNodeOrg(exitNodeId, orgId);
|
||||||
if (notAllowed) {
|
if (notAllowed) {
|
||||||
logger.warn(
|
logger.warn(
|
||||||
`Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}`
|
`Exit node ${exitNodeId} is not allowed for org ${orgId}`
|
||||||
);
|
);
|
||||||
// Skip usage tracking for this site but continue
|
|
||||||
// processing the rest.
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (calcUsage) {
|
if (calcUsage) {
|
||||||
const totalBandwidth = bytesIn + bytesOut;
|
const current = orgUsageMap.get(orgId) ?? 0;
|
||||||
const current = orgUsageMap.get(updatedSite.orgId) ?? 0;
|
orgUsageMap.set(orgId, current + bytesIn + bytesOut);
|
||||||
orgUsageMap.set(updatedSite.orgId, current + totalBandwidth);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
`Failed to flush bandwidth for site ${publicKey}:`,
|
|
||||||
error
|
|
||||||
);
|
|
||||||
|
|
||||||
// Re-queue the failed entry so it is retried on the next flush
|
|
||||||
// rather than silently dropped.
|
|
||||||
const existing = accumulator.get(publicKey);
|
|
||||||
if (existing) {
|
|
||||||
existing.bytesIn += bytesIn;
|
|
||||||
existing.bytesOut += bytesOut;
|
|
||||||
} else {
|
|
||||||
accumulator.set(publicKey, {
|
|
||||||
bytesIn,
|
|
||||||
bytesOut,
|
|
||||||
exitNodeId,
|
|
||||||
calcUsage
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process billing usage updates outside the site-update loop to keep
|
// Process billing usage updates after all chunks are written.
|
||||||
// lock scope small and concerns separated.
|
|
||||||
if (orgUsageMap.size > 0) {
|
if (orgUsageMap.size > 0) {
|
||||||
// Sort org IDs for consistent lock ordering.
|
|
||||||
const sortedOrgIds = [...orgUsageMap.keys()].sort();
|
const sortedOrgIds = [...orgUsageMap.keys()].sort();
|
||||||
|
|
||||||
for (const orgId of sortedOrgIds) {
|
for (const orgId of sortedOrgIds) {
|
||||||
|
|||||||
Reference in New Issue
Block a user