diff --git a/server/private/lib/logConnectionAudit.ts b/server/private/lib/logConnectionAudit.ts new file mode 100644 index 000000000..c7e786280 --- /dev/null +++ b/server/private/lib/logConnectionAudit.ts @@ -0,0 +1,234 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + +import { logsDb, connectionAuditLog } from "@server/db"; +import logger from "@server/logger"; +import { and, eq, lt } from "drizzle-orm"; +import { calculateCutoffTimestamp } from "@server/lib/cleanupLogs"; + +// --------------------------------------------------------------------------- +// Retry configuration for deadlock handling +// --------------------------------------------------------------------------- + +const MAX_RETRIES = 3; +const BASE_DELAY_MS = 50; + +// --------------------------------------------------------------------------- +// Buffer / flush configuration +// --------------------------------------------------------------------------- + +/** How often to flush accumulated connection log data to the database. */ +const FLUSH_INTERVAL_MS = 30_000; // 30 seconds + +/** Maximum number of records to buffer before forcing a flush. */ +const MAX_BUFFERED_RECORDS = 500; + +/** Maximum number of records to insert in a single database batch. 
*/
+const INSERT_BATCH_SIZE = 100;
+
+// ---------------------------------------------------------------------------
+// Types
+// ---------------------------------------------------------------------------
+
+/** A connection session in the shape that is inserted into `connectionAuditLog`. */
+export interface ConnectionLogRecord {
+    sessionId: string;
+    siteResourceId: number;
+    orgId: string;
+    siteId: number;
+    clientId: number | null;
+    userId: string | null;
+    sourceAddr: string;
+    destAddr: string;
+    protocol: string;
+    startedAt: number; // epoch seconds
+    endedAt: number | null;
+    bytesTx: number | null;
+    bytesRx: number | null;
+}
+
+// ---------------------------------------------------------------------------
+// In-memory buffer
+// ---------------------------------------------------------------------------
+
+// Records waiting to be written; appended to by logConnectionAudit() and
+// swapped out wholesale by flushConnectionLogToDb().
+let buffer: ConnectionLogRecord[] = [];
+
+// ---------------------------------------------------------------------------
+// Deadlock helpers
+// ---------------------------------------------------------------------------
+
+/**
+ * Report whether a thrown value looks like a database deadlock.
+ *
+ * A value matches when its `code`, or the `code` of its `cause`, is the
+ * SQLSTATE "40P01" (PostgreSQL `deadlock_detected`), or when its string
+ * `message` contains "deadlock". Non-object values never match.
+ *
+ * @param error - The value caught from a failed database operation.
+ * @returns `true` for a deadlock, otherwise `false` (always a real boolean).
+ */
+function isDeadlockError(error: unknown): boolean {
+    if (typeof error !== "object" || error === null) {
+        return false;
+    }
+
+    const candidate = error as {
+        code?: unknown;
+        cause?: { code?: unknown } | null;
+        message?: unknown;
+    };
+
+    return (
+        candidate.code === "40P01" ||
+        candidate.cause?.code === "40P01" ||
+        (typeof candidate.message === "string" &&
+            candidate.message.includes("deadlock"))
+    );
+}
+
+/**
+ * Run `operation`, retrying it when it fails with a deadlock error.
+ *
+ * At most MAX_RETRIES retries are made. The wait before retry `n` is
+ * `2^(n-1) * BASE_DELAY_MS` plus a random jitter of up to the same amount.
+ * A non-deadlock error, or a deadlock once the retries are used up, is
+ * rethrown unchanged.
+ *
+ * @param operation - Async work to attempt; it is invoked again on each retry.
+ * @param context - Short description included in the retry warning log.
+ * @returns The value `operation` resolves with.
+ */
+async function withDeadlockRetry<T>(
+    operation: () => Promise<T>,
+    context: string
+): Promise<T> {
+    let attempt = 0;
+    while (true) {
+        try {
+            return await operation();
+        } catch (error: unknown) {
+            if (isDeadlockError(error) && attempt < MAX_RETRIES) {
+                attempt++;
+                // Exponential backoff with jitter so competing writers do
+                // not retry in lockstep.
+                const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
+                const jitter = Math.random() * baseDelay;
+                const delay = baseDelay + jitter;
+                logger.warn(
+                    `Deadlock detected in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`
+                );
+                await new Promise((resolve) => setTimeout(resolve, delay));
+                continue;
+            }
+            throw error;
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Flush
+// ---------------------------------------------------------------------------
+
+/**
+ * Flush all
buffered connection log records to the database.
+ *
+ * Swaps out the buffer before writing so that any records added during the
+ * flush are captured in the new buffer rather than being lost. Records are
+ * inserted in batches of INSERT_BATCH_SIZE. If a batch fails (after any
+ * deadlock retries), that batch and every later batch of the snapshot are
+ * re-queued at the front of the buffer in their original order, so they
+ * are retried on the next flush. The buffer is then capped at
+ * MAX_BUFFERED_RECORDS * 5 by discarding the earliest-buffered records.
+ *
+ * This function is exported so that the application's graceful-shutdown
+ * cleanup handler can call it before the process exits.
+ *
+ * @returns A promise that resolves once the flush attempt has finished;
+ *          insert failures are logged and re-queued, not rethrown.
+ */
+export async function flushConnectionLogToDb(): Promise<void> {
+    if (buffer.length === 0) {
+        return;
+    }
+
+    // Atomically swap out the buffer so new data keeps flowing in
+    const snapshot = buffer;
+    buffer = [];
+
+    logger.debug(
+        `Flushing ${snapshot.length} connection log record(s) to the database`
+    );
+
+    for (let i = 0; i < snapshot.length; i += INSERT_BATCH_SIZE) {
+        const batch = snapshot.slice(i, i + INSERT_BATCH_SIZE);
+
+        try {
+            await withDeadlockRetry(async () => {
+                await logsDb.insert(connectionAuditLog).values(batch);
+            }, `flush connection log batch (${batch.length} records)`);
+        } catch (error) {
+            logger.error(
+                `Failed to flush connection log batch of ${batch.length} records:`,
+                error
+            );
+
+            // Re-queue the failed batch together with every batch after it
+            // in a single step. This keeps the snapshot's original order
+            // and places these records ahead of anything buffered while
+            // the flush was running.
+            const unsent = snapshot.slice(i);
+            buffer = [...unsent, ...buffer];
+
+            // Cap the buffer only after everything has been re-queued, so
+            // the limit really bounds growth if the DB is unreachable. The
+            // front of the buffer holds the earliest-buffered records, so
+            // those are the ones discarded, matching the warning below.
+            // NOTE(review): a batch that can never be inserted is retried
+            // first on every flush and only leaves the buffer through this
+            // cap.
+            const hardLimit = MAX_BUFFERED_RECORDS * 5;
+            if (buffer.length > hardLimit) {
+                const dropped = buffer.length - hardLimit;
+                buffer = buffer.slice(dropped);
+                logger.warn(
+                    `Connection log buffer overflow, dropped ${dropped} oldest records`
+                );
+            }
+
+            // Stop processing this snapshot; the re-queued records will be
+            // picked up on the next flush.
+            break;
+        }
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Periodic flush timer
+// ---------------------------------------------------------------------------
+
+const flushTimer = setInterval(async () => {
+    try {
+        await flushConnectionLogToDb();
+    } catch (error) {
+        logger.error(
+            "Unexpected error during periodic connection log flush:",
+            error
+        );
+    }
+}, FLUSH_INTERVAL_MS);
+
+// Calling unref() means this timer will not keep the Node.js event loop alive
+// on its own — the process can still exit normally when there is no other work
+// left. The graceful-shutdown path will call flushConnectionLogToDb() explicitly
+// before process.exit(), so no data is lost.
+flushTimer.unref();
+
+// ---------------------------------------------------------------------------
+// Cleanup
+// ---------------------------------------------------------------------------
+
+/**
+ * Delete an organization's connection audit log rows that are older than
+ * its retention window.
+ *
+ * Rows are removed when their `startedAt` is earlier than the cutoff
+ * returned by `calculateCutoffTimestamp(retentionDays)`. A failed delete
+ * is logged and not rethrown.
+ *
+ * @param orgId - Organization whose rows are cleaned up.
+ * @param retentionDays - Retention window passed to `calculateCutoffTimestamp`.
+ */
+export async function cleanUpOldLogs(
+    orgId: string,
+    retentionDays: number
+): Promise<void> {
+    const cutoffTimestamp = calculateCutoffTimestamp(retentionDays);
+
+    try {
+        await logsDb
+            .delete(connectionAuditLog)
+            .where(
+                and(
+                    lt(connectionAuditLog.startedAt, cutoffTimestamp),
+                    eq(connectionAuditLog.orgId, orgId)
+                )
+            );
+    } catch (error) {
+        logger.error("Error cleaning up old connection audit logs:", error);
+    }
+}
+
+// ---------------------------------------------------------------------------
+// Public logging entry-point
+// ---------------------------------------------------------------------------
+
+/**
+ * Buffer a single connection log record for eventual persistence.
+ *
+ * Records are written to the database in batches either when the buffer
+ * reaches MAX_BUFFERED_RECORDS or when the periodic flush timer fires.
+ */
+export function logConnectionAudit(record: ConnectionLogRecord): void {
+    buffer.push(record);
+
+    // Below the threshold the periodic timer takes care of persistence.
+    const reachedFlushThreshold = buffer.length >= MAX_BUFFERED_RECORDS;
+    if (!reachedFlushThreshold) {
+        return;
+    }
+
+    // Kick off a flush without awaiting it; flushConnectionLogToDb handles
+    // insert failures itself, so this only reports unexpected rejections.
+    void flushConnectionLogToDb().catch((flushError) => {
+        logger.error(
+            "Unexpected error during size-triggered connection log flush:",
+            flushError
+        );
+    });
+}
\ No newline at end of file
diff --git a/server/private/routers/newt/handleConnectionLogMessage.ts b/server/private/routers/newt/handleConnectionLogMessage.ts
index 2ac7153b5..00fb5dada 100644
--- a/server/private/routers/newt/handleConnectionLogMessage.ts
+++ b/server/private/routers/newt/handleConnectionLogMessage.ts
@@ -1,27 +1,20 @@
-import { db, logsDb } from "@server/db";
+import { db } from "@server/db";
 import { MessageHandler } from "@server/routers/ws";
-import { connectionAuditLog, sites, Newt, clients, orgs } from "@server/db";
-import { and, eq, lt, inArray } from "drizzle-orm";
+import { sites, Newt, clients, orgs } from "@server/db";
+import { and, eq, inArray } from "drizzle-orm";
 import logger from "@server/logger";
 import { inflate } from "zlib";
 import { promisify } from "util";
-import { calculateCutoffTimestamp } from "@server/lib/cleanupLogs";
+import {
+    logConnectionAudit,
+    flushConnectionLogToDb,
+    cleanUpOldLogs
+} from "#private/lib/logConnectionAudit";
+
+export { flushConnectionLogToDb, cleanUpOldLogs };
 
 const zlibInflate = promisify(inflate);
 
-// Retry configuration for deadlock handling
-const MAX_RETRIES = 3;
-const BASE_DELAY_MS = 50;
-
-// How often to flush accumulated connection log data to the database
-const FLUSH_INTERVAL_MS = 30_000; // 30 seconds
-
-// Maximum number of records to buffer before forcing a flush
-const MAX_BUFFERED_RECORDS = 500;
-
-// Maximum number of records to insert in a single batch
-const INSERT_BATCH_SIZE = 100;
-
 interface ConnectionSessionData {
     sessionId: string;
     resourceId: number;
@@ -34,64 +27,6 @@ interface
ConnectionSessionData { bytesRx?: number; } -interface ConnectionLogRecord { - sessionId: string; - siteResourceId: number; - orgId: string; - siteId: number; - clientId: number | null; - userId: string | null; - sourceAddr: string; - destAddr: string; - protocol: string; - startedAt: number; // epoch seconds - endedAt: number | null; - bytesTx: number | null; - bytesRx: number | null; -} - -// In-memory buffer of records waiting to be flushed -let buffer: ConnectionLogRecord[] = []; - -/** - * Check if an error is a deadlock error - */ -function isDeadlockError(error: any): boolean { - return ( - error?.code === "40P01" || - error?.cause?.code === "40P01" || - (error?.message && error.message.includes("deadlock")) - ); -} - -/** - * Execute a function with retry logic for deadlock handling - */ -async function withDeadlockRetry( - operation: () => Promise, - context: string -): Promise { - let attempt = 0; - while (true) { - try { - return await operation(); - } catch (error: any) { - if (isDeadlockError(error) && attempt < MAX_RETRIES) { - attempt++; - const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS; - const jitter = Math.random() * baseDelay; - const delay = baseDelay + jitter; - logger.warn( - `Deadlock detected in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms` - ); - await new Promise((resolve) => setTimeout(resolve, delay)); - continue; - } - throw error; - } - } -} - /** * Decompress a base64-encoded zlib-compressed string into parsed JSON. */ @@ -125,105 +60,6 @@ function toEpochSeconds(isoString: string | undefined | null): number | null { return Math.floor(ms / 1000); } -/** - * Flush all buffered connection log records to the database. - * - * Swaps out the buffer before writing so that any records added during the - * flush are captured in the new buffer rather than being lost. Entries that - * fail to write are re-queued back into the buffer so they will be retried - * on the next flush. 
- * - * This function is exported so that the application's graceful-shutdown - * cleanup handler can call it before the process exits. - */ -export async function flushConnectionLogToDb(): Promise { - if (buffer.length === 0) { - return; - } - - // Atomically swap out the buffer so new data keeps flowing in - const snapshot = buffer; - buffer = []; - - logger.debug( - `Flushing ${snapshot.length} connection log record(s) to the database` - ); - - // Insert in batches to avoid overly large SQL statements - for (let i = 0; i < snapshot.length; i += INSERT_BATCH_SIZE) { - const batch = snapshot.slice(i, i + INSERT_BATCH_SIZE); - - try { - await withDeadlockRetry(async () => { - await logsDb.insert(connectionAuditLog).values(batch); - }, `flush connection log batch (${batch.length} records)`); - } catch (error) { - logger.error( - `Failed to flush connection log batch of ${batch.length} records:`, - error - ); - - // Re-queue the failed batch so it is retried on the next flush - buffer = [...batch, ...buffer]; - - // Cap buffer to prevent unbounded growth if DB is unreachable - if (buffer.length > MAX_BUFFERED_RECORDS * 5) { - const dropped = buffer.length - MAX_BUFFERED_RECORDS * 5; - buffer = buffer.slice(0, MAX_BUFFERED_RECORDS * 5); - logger.warn( - `Connection log buffer overflow, dropped ${dropped} oldest records` - ); - } - - // Stop trying further batches from this snapshot — they'll be - // picked up by the next flush via the re-queued records above - const remaining = snapshot.slice(i + INSERT_BATCH_SIZE); - if (remaining.length > 0) { - buffer = [...remaining, ...buffer]; - } - break; - } - } -} - -const flushTimer = setInterval(async () => { - try { - await flushConnectionLogToDb(); - } catch (error) { - logger.error( - "Unexpected error during periodic connection log flush:", - error - ); - } -}, FLUSH_INTERVAL_MS); - -// Calling unref() means this timer will not keep the Node.js event loop alive -// on its own — the process can still exit normally when 
there is no other work -// left. The graceful-shutdown path will call flushConnectionLogToDb() explicitly -// before process.exit(), so no data is lost. -flushTimer.unref(); - -export async function cleanUpOldLogs(orgId: string, retentionDays: number) { - const cutoffTimestamp = calculateCutoffTimestamp(retentionDays); - - try { - await logsDb - .delete(connectionAuditLog) - .where( - and( - lt(connectionAuditLog.startedAt, cutoffTimestamp), - eq(connectionAuditLog.orgId, orgId) - ) - ); - - // logger.debug( - // `Cleaned up connection audit logs older than ${retentionDays} days` - // ); - } catch (error) { - logger.error("Error cleaning up old connection audit logs:", error); - } -} - export const handleConnectionLogMessage: MessageHandler = async (context) => { const { message, client } = context; const newt = client as Newt; @@ -277,13 +113,16 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { return; } - logger.debug(`Sessions: ${JSON.stringify(sessions)}`) + logger.debug(`Sessions: ${JSON.stringify(sessions)}`); // Build a map from sourceAddr → { clientId, userId } by querying clients // whose subnet field matches exactly. Client subnets are stored with the // org's CIDR suffix (e.g. "100.90.128.5/16"), so we reconstruct that from // each unique sourceAddr + the org's CIDR suffix and do a targeted IN query. - const ipToClient = new Map(); + const ipToClient = new Map< + string, + { clientId: number; userId: string | null } + >(); if (cidrSuffix) { // Collect unique source addresses so we only query for what we need @@ -296,13 +135,11 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { if (uniqueSourceAddrs.size > 0) { // Construct the exact subnet strings as stored in the DB - const subnetQueries = Array.from(uniqueSourceAddrs).map( - (addr) => { - // Strip port if present (e.g. "100.90.128.1:38004" → "100.90.128.1") - const ip = addr.includes(":") ? 
addr.split(":")[0] : addr; - return `${ip}${cidrSuffix}`; - } - ); + const subnetQueries = Array.from(uniqueSourceAddrs).map((addr) => { + // Strip port if present (e.g. "100.90.128.1:38004" → "100.90.128.1") + const ip = addr.includes(":") ? addr.split(":")[0] : addr; + return `${ip}${cidrSuffix}`; + }); logger.debug(`Subnet queries: ${JSON.stringify(subnetQueries)}`); @@ -322,13 +159,18 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { for (const c of matchedClients) { const ip = c.subnet.split("/")[0]; - logger.debug(`Client ${c.clientId} subnet ${c.subnet} matches ${ip}`); - ipToClient.set(ip, { clientId: c.clientId, userId: c.userId }); + logger.debug( + `Client ${c.clientId} subnet ${c.subnet} matches ${ip}` + ); + ipToClient.set(ip, { + clientId: c.clientId, + userId: c.userId + }); } } } - // Convert to DB records and add to the buffer + // Convert to DB records and hand off to the audit logger for (const session of sessions) { // Validate required fields if ( @@ -356,11 +198,12 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { // client's IP on the WireGuard network, which corresponds to the IP // portion of the client's subnet CIDR (e.g. "100.90.128.5/24"). // Strip port if present (e.g. "100.90.128.1:38004" → "100.90.128.1") - const sourceIp = session.sourceAddr.includes(":") ? session.sourceAddr.split(":")[0] : session.sourceAddr; + const sourceIp = session.sourceAddr.includes(":") + ? session.sourceAddr.split(":")[0] + : session.sourceAddr; const clientInfo = ipToClient.get(sourceIp) ?? 
null; - - buffer.push({ + logConnectionAudit({ sessionId: session.sessionId, siteResourceId: session.resourceId, orgId, @@ -380,15 +223,4 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { logger.debug( `Buffered ${sessions.length} connection log session(s) from newt ${newt.newtId} (site ${newt.siteId})` ); - - // If the buffer has grown large enough, trigger an immediate flush - if (buffer.length >= MAX_BUFFERED_RECORDS) { - // Fire and forget — errors are handled inside flushConnectionLogToDb - flushConnectionLogToDb().catch((error) => { - logger.error( - "Unexpected error during size-triggered connection log flush:", - error - ); - }); - } };