mirror of
https://github.com/fosrl/pangolin.git
synced 2026-04-04 17:06:37 +00:00
@@ -1,6 +1,6 @@
|
|||||||
import { db } from "@server/db";
|
import { db } from "@server/db";
|
||||||
import { sites, clients, olms } from "@server/db";
|
import { sites, clients, olms } from "@server/db";
|
||||||
import { eq, inArray } from "drizzle-orm";
|
import { inArray } from "drizzle-orm";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -21,7 +21,7 @@ import logger from "@server/logger";
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds
|
const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds
|
||||||
const MAX_RETRIES = 2;
|
const MAX_RETRIES = 5;
|
||||||
const BASE_DELAY_MS = 50;
|
const BASE_DELAY_MS = 50;
|
||||||
|
|
||||||
// ── Site (newt) pings ──────────────────────────────────────────────────
|
// ── Site (newt) pings ──────────────────────────────────────────────────
|
||||||
@@ -36,6 +36,14 @@ const pendingOlmArchiveResets: Set<string> = new Set();
|
|||||||
|
|
||||||
let flushTimer: NodeJS.Timeout | null = null;
|
let flushTimer: NodeJS.Timeout | null = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Guard that prevents two flush cycles from running concurrently.
|
||||||
|
* setInterval does not await async callbacks, so without this a slow flush
|
||||||
|
* (e.g. due to DB latency) would overlap with the next scheduled cycle and
|
||||||
|
* the two concurrent bulk UPDATEs would deadlock each other.
|
||||||
|
*/
|
||||||
|
let isFlushing = false;
|
||||||
|
|
||||||
// ── Public API ─────────────────────────────────────────────────────────
|
// ── Public API ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -72,6 +80,12 @@ export function recordClientPing(
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Flush all accumulated site pings to the database.
|
* Flush all accumulated site pings to the database.
|
||||||
|
*
|
||||||
|
* Each batch of up to BATCH_SIZE rows is written with a **single** UPDATE
|
||||||
|
* statement. We use the maximum timestamp across the batch so that `lastPing`
|
||||||
|
* reflects the most recent ping seen for any site in the group. This avoids
|
||||||
|
* the multi-statement transaction that previously created additional
|
||||||
|
* row-lock ordering hazards.
|
||||||
*/
|
*/
|
||||||
async function flushSitePingsToDb(): Promise<void> {
|
async function flushSitePingsToDb(): Promise<void> {
|
||||||
if (pendingSitePings.size === 0) {
|
if (pendingSitePings.size === 0) {
|
||||||
@@ -83,55 +97,35 @@ async function flushSitePingsToDb(): Promise<void> {
|
|||||||
const pingsToFlush = new Map(pendingSitePings);
|
const pingsToFlush = new Map(pendingSitePings);
|
||||||
pendingSitePings.clear();
|
pendingSitePings.clear();
|
||||||
|
|
||||||
// Sort by siteId for consistent lock ordering (prevents deadlocks)
|
const entries = Array.from(pingsToFlush.entries());
|
||||||
const sortedEntries = Array.from(pingsToFlush.entries()).sort(
|
|
||||||
([a], [b]) => a - b
|
|
||||||
);
|
|
||||||
|
|
||||||
const BATCH_SIZE = 50;
|
const BATCH_SIZE = 50;
|
||||||
for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
|
for (let i = 0; i < entries.length; i += BATCH_SIZE) {
|
||||||
const batch = sortedEntries.slice(i, i + BATCH_SIZE);
|
const batch = entries.slice(i, i + BATCH_SIZE);
|
||||||
|
|
||||||
|
// Use the latest timestamp in the batch so that `lastPing` always
|
||||||
|
// moves forward. Using a single timestamp for the whole batch means
|
||||||
|
// we only ever need one UPDATE statement (no transaction).
|
||||||
|
const maxTimestamp = Math.max(...batch.map(([, ts]) => ts));
|
||||||
|
const siteIds = batch.map(([id]) => id);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await withRetry(async () => {
|
await withRetry(async () => {
|
||||||
// Group by timestamp for efficient bulk updates
|
await db
|
||||||
const byTimestamp = new Map<number, number[]>();
|
.update(sites)
|
||||||
for (const [siteId, timestamp] of batch) {
|
.set({
|
||||||
const group = byTimestamp.get(timestamp) || [];
|
online: true,
|
||||||
group.push(siteId);
|
lastPing: maxTimestamp
|
||||||
byTimestamp.set(timestamp, group);
|
})
|
||||||
}
|
.where(inArray(sites.siteId, siteIds));
|
||||||
|
|
||||||
if (byTimestamp.size === 1) {
|
|
||||||
const [timestamp, siteIds] = Array.from(
|
|
||||||
byTimestamp.entries()
|
|
||||||
)[0];
|
|
||||||
await db
|
|
||||||
.update(sites)
|
|
||||||
.set({
|
|
||||||
online: true,
|
|
||||||
lastPing: timestamp
|
|
||||||
})
|
|
||||||
.where(inArray(sites.siteId, siteIds));
|
|
||||||
} else {
|
|
||||||
await db.transaction(async (tx) => {
|
|
||||||
for (const [timestamp, siteIds] of byTimestamp) {
|
|
||||||
await tx
|
|
||||||
.update(sites)
|
|
||||||
.set({
|
|
||||||
online: true,
|
|
||||||
lastPing: timestamp
|
|
||||||
})
|
|
||||||
.where(inArray(sites.siteId, siteIds));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}, "flushSitePingsToDb");
|
}, "flushSitePingsToDb");
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(
|
logger.error(
|
||||||
`Failed to flush site ping batch (${batch.length} sites), re-queuing for next cycle`,
|
`Failed to flush site ping batch (${batch.length} sites), re-queuing for next cycle`,
|
||||||
{ error }
|
{ error }
|
||||||
);
|
);
|
||||||
|
// Re-queue only if the preserved timestamp is newer than any
|
||||||
|
// update that may have landed since we snapshotted.
|
||||||
for (const [siteId, timestamp] of batch) {
|
for (const [siteId, timestamp] of batch) {
|
||||||
const existing = pendingSitePings.get(siteId);
|
const existing = pendingSitePings.get(siteId);
|
||||||
if (!existing || existing < timestamp) {
|
if (!existing || existing < timestamp) {
|
||||||
@@ -144,6 +138,8 @@ async function flushSitePingsToDb(): Promise<void> {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Flush all accumulated client (OLM) pings to the database.
|
* Flush all accumulated client (OLM) pings to the database.
|
||||||
|
*
|
||||||
|
* Same single-UPDATE-per-batch approach as `flushSitePingsToDb`.
|
||||||
*/
|
*/
|
||||||
async function flushClientPingsToDb(): Promise<void> {
|
async function flushClientPingsToDb(): Promise<void> {
|
||||||
if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) {
|
if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) {
|
||||||
@@ -159,51 +155,25 @@ async function flushClientPingsToDb(): Promise<void> {
|
|||||||
|
|
||||||
// ── Flush client pings ─────────────────────────────────────────────
|
// ── Flush client pings ─────────────────────────────────────────────
|
||||||
if (pingsToFlush.size > 0) {
|
if (pingsToFlush.size > 0) {
|
||||||
const sortedEntries = Array.from(pingsToFlush.entries()).sort(
|
const entries = Array.from(pingsToFlush.entries());
|
||||||
([a], [b]) => a - b
|
|
||||||
);
|
|
||||||
|
|
||||||
const BATCH_SIZE = 50;
|
const BATCH_SIZE = 50;
|
||||||
for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
|
for (let i = 0; i < entries.length; i += BATCH_SIZE) {
|
||||||
const batch = sortedEntries.slice(i, i + BATCH_SIZE);
|
const batch = entries.slice(i, i + BATCH_SIZE);
|
||||||
|
|
||||||
|
const maxTimestamp = Math.max(...batch.map(([, ts]) => ts));
|
||||||
|
const clientIds = batch.map(([id]) => id);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await withRetry(async () => {
|
await withRetry(async () => {
|
||||||
const byTimestamp = new Map<number, number[]>();
|
await db
|
||||||
for (const [clientId, timestamp] of batch) {
|
.update(clients)
|
||||||
const group = byTimestamp.get(timestamp) || [];
|
.set({
|
||||||
group.push(clientId);
|
lastPing: maxTimestamp,
|
||||||
byTimestamp.set(timestamp, group);
|
online: true,
|
||||||
}
|
archived: false
|
||||||
|
})
|
||||||
if (byTimestamp.size === 1) {
|
.where(inArray(clients.clientId, clientIds));
|
||||||
const [timestamp, clientIds] = Array.from(
|
|
||||||
byTimestamp.entries()
|
|
||||||
)[0];
|
|
||||||
await db
|
|
||||||
.update(clients)
|
|
||||||
.set({
|
|
||||||
lastPing: timestamp,
|
|
||||||
online: true,
|
|
||||||
archived: false
|
|
||||||
})
|
|
||||||
.where(inArray(clients.clientId, clientIds));
|
|
||||||
} else {
|
|
||||||
await db.transaction(async (tx) => {
|
|
||||||
for (const [timestamp, clientIds] of byTimestamp) {
|
|
||||||
await tx
|
|
||||||
.update(clients)
|
|
||||||
.set({
|
|
||||||
lastPing: timestamp,
|
|
||||||
online: true,
|
|
||||||
archived: false
|
|
||||||
})
|
|
||||||
.where(
|
|
||||||
inArray(clients.clientId, clientIds)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}, "flushClientPingsToDb");
|
}, "flushClientPingsToDb");
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(
|
logger.error(
|
||||||
@@ -260,7 +230,12 @@ export async function flushPingsToDb(): Promise<void> {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Simple retry wrapper with exponential backoff for transient errors
|
* Simple retry wrapper with exponential backoff for transient errors
|
||||||
* (connection timeouts, unexpected disconnects).
|
* (deadlocks, connection timeouts, unexpected disconnects).
|
||||||
|
*
|
||||||
|
* PostgreSQL deadlocks (40P01) are always safe to retry: the database
|
||||||
|
* guarantees exactly one winner per deadlock pair, so the loser just needs
|
||||||
|
* to try again. MAX_RETRIES is intentionally higher than typical connection
|
||||||
|
* retry budgets to give deadlock victims enough chances to succeed.
|
||||||
*/
|
*/
|
||||||
async function withRetry<T>(
|
async function withRetry<T>(
|
||||||
operation: () => Promise<T>,
|
operation: () => Promise<T>,
|
||||||
@@ -277,7 +252,8 @@ async function withRetry<T>(
|
|||||||
const jitter = Math.random() * baseDelay;
|
const jitter = Math.random() * baseDelay;
|
||||||
const delay = baseDelay + jitter;
|
const delay = baseDelay + jitter;
|
||||||
logger.warn(
|
logger.warn(
|
||||||
`Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`
|
`Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`,
|
||||||
|
{ code: error?.code ?? error?.cause?.code }
|
||||||
);
|
);
|
||||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||||
continue;
|
continue;
|
||||||
@@ -288,14 +264,14 @@ async function withRetry<T>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Detect transient connection errors that are safe to retry.
|
* Detect transient errors that are safe to retry.
|
||||||
*/
|
*/
|
||||||
function isTransientError(error: any): boolean {
|
function isTransientError(error: any): boolean {
|
||||||
if (!error) return false;
|
if (!error) return false;
|
||||||
|
|
||||||
const message = (error.message || "").toLowerCase();
|
const message = (error.message || "").toLowerCase();
|
||||||
const causeMessage = (error.cause?.message || "").toLowerCase();
|
const causeMessage = (error.cause?.message || "").toLowerCase();
|
||||||
const code = error.code || "";
|
const code = error.code || error.cause?.code || "";
|
||||||
|
|
||||||
// Connection timeout / terminated
|
// Connection timeout / terminated
|
||||||
if (
|
if (
|
||||||
@@ -308,12 +284,17 @@ function isTransientError(error: any): boolean {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// PostgreSQL deadlock
|
// PostgreSQL deadlock detected — always safe to retry (one winner guaranteed)
|
||||||
if (code === "40P01" || message.includes("deadlock")) {
|
if (code === "40P01" || message.includes("deadlock")) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ECONNRESET, ECONNREFUSED, EPIPE
|
// PostgreSQL serialization failure
|
||||||
|
if (code === "40001") {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ECONNRESET, ECONNREFUSED, EPIPE, ETIMEDOUT
|
||||||
if (
|
if (
|
||||||
code === "ECONNRESET" ||
|
code === "ECONNRESET" ||
|
||||||
code === "ECONNREFUSED" ||
|
code === "ECONNREFUSED" ||
|
||||||
@@ -337,12 +318,26 @@ export function startPingAccumulator(): void {
|
|||||||
}
|
}
|
||||||
|
|
||||||
flushTimer = setInterval(async () => {
|
flushTimer = setInterval(async () => {
|
||||||
|
// Skip this tick if the previous flush is still in progress.
|
||||||
|
// setInterval does not await async callbacks, so without this guard
|
||||||
|
// two flush cycles can run concurrently and deadlock each other on
|
||||||
|
// overlapping bulk UPDATE statements.
|
||||||
|
if (isFlushing) {
|
||||||
|
logger.debug(
|
||||||
|
"Ping accumulator: previous flush still in progress, skipping cycle"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
isFlushing = true;
|
||||||
try {
|
try {
|
||||||
await flushPingsToDb();
|
await flushPingsToDb();
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error("Unhandled error in ping accumulator flush", {
|
logger.error("Unhandled error in ping accumulator flush", {
|
||||||
error
|
error
|
||||||
});
|
});
|
||||||
|
} finally {
|
||||||
|
isFlushing = false;
|
||||||
}
|
}
|
||||||
}, FLUSH_INTERVAL_MS);
|
}, FLUSH_INTERVAL_MS);
|
||||||
|
|
||||||
@@ -364,7 +359,22 @@ export async function stopPingAccumulator(): Promise<void> {
|
|||||||
flushTimer = null;
|
flushTimer = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Final flush to persist any remaining pings
|
// Final flush to persist any remaining pings.
|
||||||
|
// Wait for any in-progress flush to finish first so we don't race.
|
||||||
|
if (isFlushing) {
|
||||||
|
logger.debug(
|
||||||
|
"Ping accumulator: waiting for in-progress flush before stopping…"
|
||||||
|
);
|
||||||
|
await new Promise<void>((resolve) => {
|
||||||
|
const poll = setInterval(() => {
|
||||||
|
if (!isFlushing) {
|
||||||
|
clearInterval(poll);
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
}, 50);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await flushPingsToDb();
|
await flushPingsToDb();
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
@@ -379,4 +389,4 @@ export async function stopPingAccumulator(): Promise<void> {
|
|||||||
*/
|
*/
|
||||||
export function getPendingPingCount(): number {
|
export function getPendingPingCount(): number {
|
||||||
return pendingSitePings.size + pendingClientPings.size;
|
return pendingSitePings.size + pendingClientPings.size;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user