diff --git a/server/cleanup.ts b/server/cleanup.ts index 137654827..7366bb876 100644 --- a/server/cleanup.ts +++ b/server/cleanup.ts @@ -1,9 +1,11 @@ import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage"; +import { flushConnectionLogToDb } from "@server/routers/newt/handleConnectionLogMessage"; import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth"; import { cleanup as wsCleanup } from "#dynamic/routers/ws"; async function cleanup() { await flushBandwidthToDb(); + await flushConnectionLogToDb(); await flushSiteBandwidthToDb(); await wsCleanup(); @@ -14,4 +16,4 @@ export async function initCleanup() { // Handle process termination process.on("SIGTERM", () => cleanup()); process.on("SIGINT", () => cleanup()); -} \ No newline at end of file +} diff --git a/server/db/pg/schema/privateSchema.ts b/server/db/pg/schema/privateSchema.ts index c9d7cc907..8fed6462a 100644 --- a/server/db/pg/schema/privateSchema.ts +++ b/server/db/pg/schema/privateSchema.ts @@ -17,7 +17,9 @@ import { users, exitNodes, sessions, - clients + clients, + siteResources, + sites } from "./schema"; export const certificates = pgTable("certificates", { @@ -302,6 +304,39 @@ export const accessAuditLog = pgTable( ] ); +export const connectionAuditLog = pgTable( + "connectionAuditLog", + { + id: serial("id").primaryKey(), + sessionId: text("sessionId").notNull(), + siteResourceId: integer("siteResourceId").references( + () => siteResources.siteResourceId, + { onDelete: "cascade" } + ), + orgId: text("orgId").references(() => orgs.orgId, { + onDelete: "cascade" + }), + siteId: integer("siteId").references(() => sites.siteId, { + onDelete: "cascade" + }), + sourceAddr: text("sourceAddr").notNull(), + destAddr: text("destAddr").notNull(), + protocol: text("protocol").notNull(), + startedAt: integer("startedAt").notNull(), + endedAt: integer("endedAt"), + bytesTx: integer("bytesTx"), + bytesRx: integer("bytesRx") + }, + (table) => [ + index("idx_accessAuditLog_startedAt").on(table.startedAt), + index("idx_accessAuditLog_org_startedAt").on( + table.orgId, + table.startedAt + ), + index("idx_accessAuditLog_siteResourceId").on(table.siteResourceId) + ] +); + export const approvals = pgTable("approvals", { approvalId: serial("approvalId").primaryKey(), timestamp: integer("timestamp").notNull(), // this is EPOCH time in seconds @@ -357,3 +392,4 @@ export type LoginPage = InferSelectModel; export type LoginPageBranding = InferSelectModel; export type ActionAuditLog = InferSelectModel; export type AccessAuditLog = InferSelectModel; +export type ConnectionAuditLog = InferSelectModel; diff --git a/server/db/sqlite/schema/privateSchema.ts b/server/db/sqlite/schema/privateSchema.ts index 8baeb5220..ecc386ea6 100644 --- a/server/db/sqlite/schema/privateSchema.ts +++ b/server/db/sqlite/schema/privateSchema.ts @@ -6,7 +6,7 @@ import { sqliteTable, text } from "drizzle-orm/sqlite-core"; -import { clients, domains, exitNodes, orgs, sessions, users } from "./schema"; +import { clients, domains, exitNodes, orgs, sessions, siteResources, sites, users } from "./schema"; export const certificates = sqliteTable("certificates", { certId: integer("certId").primaryKey({ autoIncrement: true }), @@ -294,6 +294,39 @@ export const accessAuditLog = sqliteTable( ] ); +export const connectionAuditLog = sqliteTable( + "connectionAuditLog", + { + id: integer("id").primaryKey({ autoIncrement: true }), + sessionId: text("sessionId").notNull(), + siteResourceId: integer("siteResourceId").references( + () => siteResources.siteResourceId, + { onDelete: "cascade" } + ), + orgId: text("orgId").references(() => orgs.orgId, { + onDelete: "cascade" + }), + siteId: integer("siteId").references(() => sites.siteId, { + onDelete: "cascade" + }), + sourceAddr: text("sourceAddr").notNull(), + destAddr: text("destAddr").notNull(), + protocol: text("protocol").notNull(), + startedAt: integer("startedAt").notNull(), + endedAt: integer("endedAt"), + bytesTx: integer("bytesTx"), + bytesRx: integer("bytesRx") + }, + (table) => [ + index("idx_accessAuditLog_startedAt").on(table.startedAt), + index("idx_accessAuditLog_org_startedAt").on( + table.orgId, + table.startedAt + ), + index("idx_accessAuditLog_siteResourceId").on(table.siteResourceId) + ] +); + export const approvals = sqliteTable("approvals", { approvalId: integer("approvalId").primaryKey({ autoIncrement: true }), timestamp: integer("timestamp").notNull(), // this is EPOCH time in seconds @@ -348,3 +381,4 @@ export type LoginPage = InferSelectModel; export type LoginPageBranding = InferSelectModel; export type ActionAuditLog = InferSelectModel; export type AccessAuditLog = InferSelectModel; +export type ConnectionAuditLog = InferSelectModel; diff --git a/server/private/cleanup.ts b/server/private/cleanup.ts index 0bd9822dd..933c943ed 100644 --- a/server/private/cleanup.ts +++ b/server/private/cleanup.ts @@ -14,10 +14,12 @@ import { rateLimitService } from "#private/lib/rateLimit"; import { cleanup as wsCleanup } from "#private/routers/ws"; import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage"; +import { flushConnectionLogToDb } from "@server/routers/newt/handleConnectionLogMessage"; import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth"; async function cleanup() { await flushBandwidthToDb(); + await flushConnectionLogToDb(); await flushSiteBandwidthToDb(); await rateLimitService.cleanup(); await wsCleanup(); @@ -29,4 +31,4 @@ export async function initCleanup() { // Handle process termination process.on("SIGTERM", () => cleanup()); process.on("SIGINT", () => cleanup()); -} \ No newline at end of file +} diff --git a/server/private/routers/ws/messageHandlers.ts b/server/private/routers/ws/messageHandlers.ts index d388ce40a..9e4622e2d 100644 --- a/server/private/routers/ws/messageHandlers.ts +++ b/server/private/routers/ws/messageHandlers.ts @@ -18,10 +18,12 @@ import { } from "#private/routers/remoteExitNode"; import { MessageHandler } from "@server/routers/ws"; import { build } from "@server/build"; +import { handleConnectionLogMessage } from "@server/routers/newt"; export const messageHandlers: Record = { "remoteExitNode/register": handleRemoteExitNodeRegisterMessage, - "remoteExitNode/ping": handleRemoteExitNodePingMessage + "remoteExitNode/ping": handleRemoteExitNodePingMessage, + "newt/access-log": handleConnectionLogMessage, }; if (build != "saas") { diff --git a/server/routers/newt/handleConnectionLogMessage.ts b/server/routers/newt/handleConnectionLogMessage.ts new file mode 100644 index 000000000..458470af7 --- /dev/null +++ b/server/routers/newt/handleConnectionLogMessage.ts @@ -0,0 +1,302 @@ +import { db } from "@server/db"; +import { MessageHandler } from "@server/routers/ws"; +import { connectionAuditLog, sites, Newt } from "@server/db"; +import { eq } from "drizzle-orm"; +import logger from "@server/logger"; +import { inflate } from "zlib"; +import { promisify } from "util"; + +const zlibInflate = promisify(inflate); + +// Retry configuration for deadlock handling +const MAX_RETRIES = 3; +const BASE_DELAY_MS = 50; + +// How often to flush accumulated connection log data to the database +const FLUSH_INTERVAL_MS = 30_000; // 30 seconds + +// Maximum number of records to buffer before forcing a flush +const MAX_BUFFERED_RECORDS = 500; + +// Maximum number of records to insert in a single batch +const INSERT_BATCH_SIZE = 100; + +interface ConnectionSessionData { + sessionId: string; + resourceId: number; + sourceAddr: string; + destAddr: string; + protocol: string; + startedAt: string; // ISO 8601 timestamp + endedAt?: string; // ISO 8601 timestamp + bytesTx?: number; + bytesRx?: number; +} + +interface ConnectionLogRecord { + sessionId: string; + siteResourceId: number; + orgId: string; + siteId: number; + sourceAddr: string; + destAddr: string; + protocol: string; + startedAt: number; // epoch seconds + endedAt: number | null; + bytesTx: number | null; + bytesRx: number | null; +} + +// In-memory buffer of records waiting to be flushed +let buffer: ConnectionLogRecord[] = []; + +/** + * Check if an error is a deadlock error + */ +function isDeadlockError(error: any): boolean { + return ( + error?.code === "40P01" || + error?.cause?.code === "40P01" || + (error?.message && error.message.includes("deadlock")) + ); +} + +/** + * Execute a function with retry logic for deadlock handling + */ +async function withDeadlockRetry( + operation: () => Promise, + context: string +): Promise { + let attempt = 0; + while (true) { + try { + return await operation(); + } catch (error: any) { + if (isDeadlockError(error) && attempt < MAX_RETRIES) { + attempt++; + const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS; + const jitter = Math.random() * baseDelay; + const delay = baseDelay + jitter; + logger.warn( + `Deadlock detected in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms` + ); + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } + throw error; + } + } +} + +/** + * Decompress a base64-encoded zlib-compressed string into parsed JSON. + */ +async function decompressConnectionLog( + compressed: string +): Promise { + const compressedBuffer = Buffer.from(compressed, "base64"); + const decompressed = await zlibInflate(compressedBuffer); + const jsonString = decompressed.toString("utf-8"); + const parsed = JSON.parse(jsonString); + + if (!Array.isArray(parsed)) { + throw new Error("Decompressed connection log data is not an array"); + } + + return parsed; +} + +/** + * Convert an ISO 8601 timestamp string to epoch seconds. + * Returns null if the input is falsy. + */ +function toEpochSeconds(isoString: string | undefined | null): number | null { + if (!isoString) { + return null; + } + const ms = new Date(isoString).getTime(); + if (isNaN(ms)) { + return null; + } + return Math.floor(ms / 1000); +} + +/** + * Flush all buffered connection log records to the database. + * + * Swaps out the buffer before writing so that any records added during the + * flush are captured in the new buffer rather than being lost. Entries that + * fail to write are re-queued back into the buffer so they will be retried + * on the next flush. + * + * This function is exported so that the application's graceful-shutdown + * cleanup handler can call it before the process exits. + */ +export async function flushConnectionLogToDb(): Promise { + if (buffer.length === 0) { + return; + } + + // Atomically swap out the buffer so new data keeps flowing in + const snapshot = buffer; + buffer = []; + + logger.debug( + `Flushing ${snapshot.length} connection log record(s) to the database` + ); + + // Insert in batches to avoid overly large SQL statements + for (let i = 0; i < snapshot.length; i += INSERT_BATCH_SIZE) { + const batch = snapshot.slice(i, i + INSERT_BATCH_SIZE); + + try { + await withDeadlockRetry(async () => { + await db.insert(connectionAuditLog).values(batch); + }, `flush connection log batch (${batch.length} records)`); + } catch (error) { + logger.error( + `Failed to flush connection log batch of ${batch.length} records:`, + error + ); + + // Re-queue the failed batch so it is retried on the next flush + buffer = [...batch, ...buffer]; + + // Cap buffer to prevent unbounded growth if DB is unreachable + if (buffer.length > MAX_BUFFERED_RECORDS * 5) { + const dropped = buffer.length - MAX_BUFFERED_RECORDS * 5; + buffer = buffer.slice(0, MAX_BUFFERED_RECORDS * 5); + logger.warn( + `Connection log buffer overflow, dropped ${dropped} oldest records` + ); + } + + // Stop trying further batches from this snapshot — they'll be + // picked up by the next flush via the re-queued records above + const remaining = snapshot.slice(i + INSERT_BATCH_SIZE); + if (remaining.length > 0) { + buffer = [...remaining, ...buffer]; + } + break; + } + } +} + +const flushTimer = setInterval(async () => { + try { + await flushConnectionLogToDb(); + } catch (error) { + logger.error( + "Unexpected error during periodic connection log flush:", + error + ); + } +}, FLUSH_INTERVAL_MS); + +// Calling unref() means this timer will not keep the Node.js event loop alive +// on its own — the process can still exit normally when there is no other work +// left. The graceful-shutdown path will call flushConnectionLogToDb() explicitly +// before process.exit(), so no data is lost. +flushTimer.unref(); + +export const handleConnectionLogMessage: MessageHandler = async (context) => { + const { message, client } = context; + const newt = client as Newt; + + if (!newt) { + logger.warn("Connection log received but no newt client in context"); + return; + } + + if (!newt.siteId) { + logger.warn("Connection log received but newt has no siteId"); + return; + } + + if (!message.data?.compressed) { + logger.warn("Connection log message missing compressed data"); + return; + } + + // Look up the org for this site + const [site] = await db + .select({ orgId: sites.orgId }) + .from(sites) + .where(eq(sites.siteId, newt.siteId)); + + if (!site) { + logger.warn( + `Connection log received but site ${newt.siteId} not found in database` + ); + return; + } + + const orgId = site.orgId; + + let sessions: ConnectionSessionData[]; + try { + sessions = await decompressConnectionLog(message.data.compressed); + } catch (error) { + logger.error("Failed to decompress connection log data:", error); + return; + } + + if (sessions.length === 0) { + return; + } + + // Convert to DB records and add to the buffer + for (const session of sessions) { + // Validate required fields + if ( + !session.sessionId || + !session.resourceId || + !session.sourceAddr || + !session.destAddr || + !session.protocol + ) { + logger.debug( + `Skipping connection log session with missing required fields: ${JSON.stringify(session)}` + ); + continue; + } + + const startedAt = toEpochSeconds(session.startedAt); + if (startedAt === null) { + logger.debug( + `Skipping connection log session with invalid startedAt: ${session.startedAt}` + ); + continue; + } + + buffer.push({ + sessionId: session.sessionId, + siteResourceId: session.resourceId, + orgId, + siteId: newt.siteId, + sourceAddr: session.sourceAddr, + destAddr: session.destAddr, + protocol: session.protocol, + startedAt, + endedAt: toEpochSeconds(session.endedAt), + bytesTx: session.bytesTx ?? null, + bytesRx: session.bytesRx ?? null + }); + } + + logger.debug( + `Buffered ${sessions.length} connection log session(s) from newt ${newt.newtId} (site ${newt.siteId})` + ); + + // If the buffer has grown large enough, trigger an immediate flush + if (buffer.length >= MAX_BUFFERED_RECORDS) { + // Fire and forget — errors are handled inside flushConnectionLogToDb + flushConnectionLogToDb().catch((error) => { + logger.error( + "Unexpected error during size-triggered connection log flush:", + error + ); + }); + } +}; diff --git a/server/routers/newt/index.ts b/server/routers/newt/index.ts index f31cd753b..63d1e1068 100644 --- a/server/routers/newt/index.ts +++ b/server/routers/newt/index.ts @@ -8,3 +8,4 @@ export * from "./handleNewtPingRequestMessage"; export * from "./handleApplyBlueprintMessage"; export * from "./handleNewtPingMessage"; export * from "./handleNewtDisconnectingMessage"; +export * from "./handleConnectionLogMessage";