mirror of
https://github.com/fosrl/pangolin.git
synced 2026-04-09 11:26:39 +00:00
Breakout sites tables
This commit is contained in:
@@ -1,11 +1,11 @@
|
||||
import { db, newts, sites, targetHealthCheck, targets } from "@server/db";
|
||||
import { db, newts, sites, targetHealthCheck, targets, sitePing, siteBandwidth } from "@server/db";
|
||||
import {
|
||||
hasActiveConnections,
|
||||
getClientConfigVersion
|
||||
} from "#dynamic/routers/ws";
|
||||
import { MessageHandler } from "@server/routers/ws";
|
||||
import { Newt } from "@server/db";
|
||||
import { eq, lt, isNull, and, or, ne, not } from "drizzle-orm";
|
||||
import { eq, lt, isNull, and, or, ne } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
import { sendNewtSyncMessage } from "./sync";
|
||||
import { recordPing } from "./pingAccumulator";
|
||||
@@ -41,17 +41,18 @@ export const startNewtOfflineChecker = (): void => {
|
||||
.select({
|
||||
siteId: sites.siteId,
|
||||
newtId: newts.newtId,
|
||||
lastPing: sites.lastPing
|
||||
lastPing: sitePing.lastPing
|
||||
})
|
||||
.from(sites)
|
||||
.innerJoin(newts, eq(newts.siteId, sites.siteId))
|
||||
.leftJoin(sitePing, eq(sitePing.siteId, sites.siteId))
|
||||
.where(
|
||||
and(
|
||||
eq(sites.online, true),
|
||||
eq(sites.type, "newt"),
|
||||
or(
|
||||
lt(sites.lastPing, twoMinutesAgo),
|
||||
isNull(sites.lastPing)
|
||||
lt(sitePing.lastPing, twoMinutesAgo),
|
||||
isNull(sitePing.lastPing)
|
||||
)
|
||||
)
|
||||
);
|
||||
@@ -112,15 +113,11 @@ export const startNewtOfflineChecker = (): void => {
|
||||
.select({
|
||||
siteId: sites.siteId,
|
||||
online: sites.online,
|
||||
lastBandwidthUpdate: sites.lastBandwidthUpdate
|
||||
lastBandwidthUpdate: siteBandwidth.lastBandwidthUpdate
|
||||
})
|
||||
.from(sites)
|
||||
.where(
|
||||
and(
|
||||
eq(sites.type, "wireguard"),
|
||||
not(isNull(sites.lastBandwidthUpdate))
|
||||
)
|
||||
);
|
||||
.innerJoin(siteBandwidth, eq(siteBandwidth.siteId, sites.siteId))
|
||||
.where(eq(sites.type, "wireguard"));
|
||||
|
||||
const wireguardOfflineThreshold = Math.floor(
|
||||
(Date.now() - OFFLINE_THRESHOLD_BANDWIDTH_MS) / 1000
|
||||
@@ -128,12 +125,7 @@ export const startNewtOfflineChecker = (): void => {
|
||||
|
||||
// Loop over each site. If it's offline and there is a recent bandwidth update, mark it online. If it's online and there is no recent update, mark it offline.
|
||||
for (const site of allWireguardSites) {
|
||||
const lastBandwidthUpdate =
|
||||
new Date(site.lastBandwidthUpdate!).getTime() / 1000;
|
||||
if (
|
||||
lastBandwidthUpdate < wireguardOfflineThreshold &&
|
||||
site.online
|
||||
) {
|
||||
if ((site.lastBandwidthUpdate ?? 0) < wireguardOfflineThreshold && site.online) {
|
||||
logger.info(
|
||||
`Marking wireguard site ${site.siteId} offline: no bandwidth update in over ${OFFLINE_THRESHOLD_BANDWIDTH_MS / 60000} minutes`
|
||||
);
|
||||
@@ -142,10 +134,7 @@ export const startNewtOfflineChecker = (): void => {
|
||||
.update(sites)
|
||||
.set({ online: false })
|
||||
.where(eq(sites.siteId, site.siteId));
|
||||
} else if (
|
||||
lastBandwidthUpdate >= wireguardOfflineThreshold &&
|
||||
!site.online
|
||||
) {
|
||||
} else if ((site.lastBandwidthUpdate ?? 0) >= wireguardOfflineThreshold && !site.online) {
|
||||
logger.info(
|
||||
`Marking wireguard site ${site.siteId} online: recent bandwidth update`
|
||||
);
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { db } from "@server/db";
|
||||
import { db, clients, clientBandwidth } from "@server/db";
|
||||
import { MessageHandler } from "@server/routers/ws";
|
||||
import { clients } from "@server/db";
|
||||
import { eq, sql } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
|
||||
@@ -85,7 +84,7 @@ export async function flushBandwidthToDb(): Promise<void> {
|
||||
const snapshot = accumulator;
|
||||
accumulator = new Map<string, BandwidthAccumulator>();
|
||||
|
||||
const currentTime = new Date().toISOString();
|
||||
const currentEpoch = Math.floor(Date.now() / 1000);
|
||||
|
||||
// Sort by publicKey for consistent lock ordering across concurrent
|
||||
// writers — this is the same deadlock-prevention strategy used in the
|
||||
@@ -101,19 +100,37 @@ export async function flushBandwidthToDb(): Promise<void> {
|
||||
for (const [publicKey, { bytesIn, bytesOut }] of sortedEntries) {
|
||||
try {
|
||||
await withDeadlockRetry(async () => {
|
||||
// Use atomic SQL increment to avoid the SELECT-then-UPDATE
|
||||
// anti-pattern and the races it would introduce.
|
||||
// Find clientId by pubKey
|
||||
const [clientRow] = await db
|
||||
.select({ clientId: clients.clientId })
|
||||
.from(clients)
|
||||
.where(eq(clients.pubKey, publicKey))
|
||||
.limit(1);
|
||||
|
||||
if (!clientRow) {
|
||||
logger.warn(`No client found for pubKey ${publicKey}, skipping`);
|
||||
return;
|
||||
}
|
||||
|
||||
await db
|
||||
.update(clients)
|
||||
.set({
|
||||
.insert(clientBandwidth)
|
||||
.values({
|
||||
clientId: clientRow.clientId,
|
||||
// Note: bytesIn from peer goes to megabytesOut (data
|
||||
// sent to client) and bytesOut from peer goes to
|
||||
// megabytesIn (data received from client).
|
||||
megabytesOut: sql`COALESCE(${clients.megabytesOut}, 0) + ${bytesIn}`,
|
||||
megabytesIn: sql`COALESCE(${clients.megabytesIn}, 0) + ${bytesOut}`,
|
||||
lastBandwidthUpdate: currentTime
|
||||
megabytesOut: bytesIn,
|
||||
megabytesIn: bytesOut,
|
||||
lastBandwidthUpdate: currentEpoch
|
||||
})
|
||||
.where(eq(clients.pubKey, publicKey));
|
||||
.onConflictDoUpdate({
|
||||
target: clientBandwidth.clientId,
|
||||
set: {
|
||||
megabytesOut: sql`COALESCE(${clientBandwidth.megabytesOut}, 0) + ${bytesIn}`,
|
||||
megabytesIn: sql`COALESCE(${clientBandwidth.megabytesIn}, 0) + ${bytesOut}`,
|
||||
lastBandwidthUpdate: currentEpoch
|
||||
}
|
||||
});
|
||||
}, `flush bandwidth for client ${publicKey}`);
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { db } from "@server/db";
|
||||
import { sites, clients, olms } from "@server/db";
|
||||
import { inArray } from "drizzle-orm";
|
||||
import { sites, clients, olms, sitePing, clientPing } from "@server/db";
|
||||
import { inArray, sql } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
|
||||
/**
|
||||
@@ -81,11 +81,8 @@ export function recordClientPing(
|
||||
/**
|
||||
* Flush all accumulated site pings to the database.
|
||||
*
|
||||
* Each batch of up to BATCH_SIZE rows is written with a **single** UPDATE
|
||||
* statement. We use the maximum timestamp across the batch so that `lastPing`
|
||||
* reflects the most recent ping seen for any site in the group. This avoids
|
||||
* the multi-statement transaction that previously created additional
|
||||
* row-lock ordering hazards.
|
||||
* For each batch: first upserts individual per-site timestamps into
|
||||
* `sitePing`, then bulk-updates `sites.online = true`.
|
||||
*/
|
||||
async function flushSitePingsToDb(): Promise<void> {
|
||||
if (pendingSitePings.size === 0) {
|
||||
@@ -103,20 +100,25 @@ async function flushSitePingsToDb(): Promise<void> {
|
||||
for (let i = 0; i < entries.length; i += BATCH_SIZE) {
|
||||
const batch = entries.slice(i, i + BATCH_SIZE);
|
||||
|
||||
// Use the latest timestamp in the batch so that `lastPing` always
|
||||
// moves forward. Using a single timestamp for the whole batch means
|
||||
// we only ever need one UPDATE statement (no transaction).
|
||||
const maxTimestamp = Math.max(...batch.map(([, ts]) => ts));
|
||||
const siteIds = batch.map(([id]) => id);
|
||||
|
||||
try {
|
||||
await withRetry(async () => {
|
||||
const rows = batch.map(([siteId, ts]) => ({ siteId, lastPing: ts }));
|
||||
|
||||
// Step 1: Upsert ping timestamps into sitePing
|
||||
await db
|
||||
.insert(sitePing)
|
||||
.values(rows)
|
||||
.onConflictDoUpdate({
|
||||
target: sitePing.siteId,
|
||||
set: { lastPing: sql`excluded."lastPing"` }
|
||||
});
|
||||
|
||||
// Step 2: Update online status on sites
|
||||
await db
|
||||
.update(sites)
|
||||
.set({
|
||||
online: true,
|
||||
lastPing: maxTimestamp
|
||||
})
|
||||
.set({ online: true })
|
||||
.where(inArray(sites.siteId, siteIds));
|
||||
}, "flushSitePingsToDb");
|
||||
} catch (error) {
|
||||
@@ -139,7 +141,8 @@ async function flushSitePingsToDb(): Promise<void> {
|
||||
/**
|
||||
* Flush all accumulated client (OLM) pings to the database.
|
||||
*
|
||||
* Same single-UPDATE-per-batch approach as `flushSitePingsToDb`.
|
||||
* For each batch: first upserts individual per-client timestamps into
|
||||
* `clientPing`, then bulk-updates `clients.online = true, archived = false`.
|
||||
*/
|
||||
async function flushClientPingsToDb(): Promise<void> {
|
||||
if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) {
|
||||
@@ -161,18 +164,25 @@ async function flushClientPingsToDb(): Promise<void> {
|
||||
for (let i = 0; i < entries.length; i += BATCH_SIZE) {
|
||||
const batch = entries.slice(i, i + BATCH_SIZE);
|
||||
|
||||
const maxTimestamp = Math.max(...batch.map(([, ts]) => ts));
|
||||
const clientIds = batch.map(([id]) => id);
|
||||
|
||||
try {
|
||||
await withRetry(async () => {
|
||||
const rows = batch.map(([clientId, ts]) => ({ clientId, lastPing: ts }));
|
||||
|
||||
// Step 1: Upsert ping timestamps into clientPing
|
||||
await db
|
||||
.insert(clientPing)
|
||||
.values(rows)
|
||||
.onConflictDoUpdate({
|
||||
target: clientPing.clientId,
|
||||
set: { lastPing: sql`excluded."lastPing"` }
|
||||
});
|
||||
|
||||
// Step 2: Update online + unarchive on clients
|
||||
await db
|
||||
.update(clients)
|
||||
.set({
|
||||
lastPing: maxTimestamp,
|
||||
online: true,
|
||||
archived: false
|
||||
})
|
||||
.set({ online: true, archived: false })
|
||||
.where(inArray(clients.clientId, clientIds));
|
||||
}, "flushClientPingsToDb");
|
||||
} catch (error) {
|
||||
|
||||
Reference in New Issue
Block a user