make private

Owen
2026-03-23 17:23:51 -07:00
parent 7d8797840a
commit 0d4edcd1c7
9 changed files with 354 additions and 300 deletions

View File

@@ -1,5 +1,5 @@
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
- import { flushConnectionLogToDb } from "@server/routers/newt/handleConnectionLogMessage";
+ import { flushConnectionLogToDb } from "#dynamic/routers/newt";
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
import { cleanup as wsCleanup } from "#dynamic/routers/ws";
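For context, the import swap above only changes where flushConnectionLogToDb comes from; the shutdown path that consumes it is unchanged. A hedged sketch of that cleanup flow (the bodies and exact signatures of the other flush helpers are not part of this diff):

// Hedged sketch, assuming each flush helper takes no arguments, as the imports suggest.
async function cleanup() {
    await flushBandwidthToDb();     // newt bandwidth counters
    await flushSiteBandwidthToDb(); // gerbil site bandwidth counters
    await flushConnectionLogToDb(); // buffered connection audit logs (see the new handler below)
    await wsCleanup();              // close websocket resources
    process.exit(0);                // illustrative; the real handler may exit differently
}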

View File

@@ -55,6 +55,9 @@ export const orgs = pgTable("orgs", {
    settingsLogRetentionDaysAction: integer("settingsLogRetentionDaysAction") // where 0 = dont keep logs and -1 = keep forever and 9001 = end of the following year
        .notNull()
        .default(0),
+   settingsLogRetentionDaysConnection: integer("settingsLogRetentionDaysConnection") // where 0 = dont keep logs and -1 = keep forever and 9001 = end of the following year
+       .notNull()
+       .default(0),
    sshCaPrivateKey: text("sshCaPrivateKey"), // Encrypted SSH CA private key (PEM format)
    sshCaPublicKey: text("sshCaPublicKey"), // SSH CA public key (OpenSSH format)
    isBillingOrg: boolean("isBillingOrg"),

View File

@@ -47,6 +47,9 @@ export const orgs = sqliteTable("orgs", {
    settingsLogRetentionDaysAction: integer("settingsLogRetentionDaysAction") // where 0 = dont keep logs and -1 = keep forever and 9001 = end of the following year
        .notNull()
        .default(0),
+   settingsLogRetentionDaysConnection: integer("settingsLogRetentionDaysConnection") // where 0 = dont keep logs and -1 = keep forever and 9001 = end of the following year
+       .notNull()
+       .default(0),
    sshCaPrivateKey: text("sshCaPrivateKey"), // Encrypted SSH CA private key (PEM format)
    sshCaPublicKey: text("sshCaPublicKey"), // SSH CA public key (OpenSSH format)
    isBillingOrg: integer("isBillingOrg", { mode: "boolean" }),
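Both schema variants add the same settingsLogRetentionDaysConnection column with a default of 0 (keep no logs). A minimal sketch of how an org's retention window could be written with drizzle (the helper name and the assumption that orgs.orgId is the key column are illustrative, not part of this commit):

import { db, orgs } from "@server/db";
import { eq } from "drizzle-orm";

// Hypothetical helper: set how many days of connection audit logs an org keeps.
// 0 = don't keep logs, -1 = keep forever, matching the column comment above.
async function setConnectionLogRetention(orgId: string, days: number) {
    await db
        .update(orgs)
        .set({ settingsLogRetentionDaysConnection: days })
        .where(eq(orgs.orgId, orgId));
}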

View File

@@ -2,6 +2,7 @@ import { db, orgs } from "@server/db";
import { cleanUpOldLogs as cleanUpOldAccessLogs } from "#dynamic/lib/logAccessAudit";
import { cleanUpOldLogs as cleanUpOldActionLogs } from "#dynamic/middlewares/logActionAudit";
import { cleanUpOldLogs as cleanUpOldRequestLogs } from "@server/routers/badger/logRequestAudit";
+ import { cleanUpOldLogs as cleanUpOldConnectionLogs } from "#dynamic/routers/newt";
import { gt, or } from "drizzle-orm";
import { cleanUpOldFingerprintSnapshots } from "@server/routers/olm/fingerprintingUtils";
import { build } from "@server/build";
@@ -20,14 +21,17 @@ export function initLogCleanupInterval() {
            settingsLogRetentionDaysAccess:
                orgs.settingsLogRetentionDaysAccess,
            settingsLogRetentionDaysRequest:
-               orgs.settingsLogRetentionDaysRequest
+               orgs.settingsLogRetentionDaysRequest,
+           settingsLogRetentionDaysConnection:
+               orgs.settingsLogRetentionDaysConnection
        })
        .from(orgs)
        .where(
            or(
                gt(orgs.settingsLogRetentionDaysAction, 0),
                gt(orgs.settingsLogRetentionDaysAccess, 0),
-               gt(orgs.settingsLogRetentionDaysRequest, 0)
+               gt(orgs.settingsLogRetentionDaysRequest, 0),
+               gt(orgs.settingsLogRetentionDaysConnection, 0)
            )
        );
@@ -37,7 +41,8 @@ export function initLogCleanupInterval() {
            orgId,
            settingsLogRetentionDaysAction,
            settingsLogRetentionDaysAccess,
-           settingsLogRetentionDaysRequest
+           settingsLogRetentionDaysRequest,
+           settingsLogRetentionDaysConnection
        } = org;
        if (settingsLogRetentionDaysAction > 0) {
@@ -60,6 +65,13 @@ export function initLogCleanupInterval() {
                settingsLogRetentionDaysRequest
            );
        }
+       if (settingsLogRetentionDaysConnection > 0) {
+           await cleanUpOldConnectionLogs(
+               orgId,
+               settingsLogRetentionDaysConnection
+           );
+       }
    }
    await cleanUpOldFingerprintSnapshots(365);
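The new branch calls cleanUpOldConnectionLogs with the same orgId/retention-days shape as the other log types. The cutoff itself comes from calculateCutoffTimestamp in this module (imported by the new handler below but not shown in these hunks); a plausible sketch, assuming epoch-second timestamps as used by the connection log records:

// Hedged sketch of the cutoff helper the connection-log cleanup relies on:
// convert "keep N days" into the oldest startedAt value (epoch seconds) to retain.
export function calculateCutoffTimestamp(retentionDays: number): number {
    const nowSeconds = Math.floor(Date.now() / 1000);
    return nowSeconds - retentionDays * 24 * 60 * 60;
}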

View File

@@ -14,7 +14,7 @@
import { rateLimitService } from "#private/lib/rateLimit";
import { cleanup as wsCleanup } from "#private/routers/ws";
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
- import { flushConnectionLogToDb } from "@server/routers/newt/handleConnectionLogMessage";
+ import { flushConnectionLogToDb } from "#dynamic/routers/newt";
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
async function cleanup() {

View File

@@ -0,0 +1,324 @@
import { db, logsDb } from "@server/db";
import { MessageHandler } from "@server/routers/ws";
import { connectionAuditLog, sites, Newt } from "@server/db";
import { and, eq, lt } from "drizzle-orm";
import logger from "@server/logger";
import { inflate } from "zlib";
import { promisify } from "util";
import { calculateCutoffTimestamp } from "@server/lib/cleanupLogs";
const zlibInflate = promisify(inflate);
// Retry configuration for deadlock handling
const MAX_RETRIES = 3;
const BASE_DELAY_MS = 50;
// How often to flush accumulated connection log data to the database
const FLUSH_INTERVAL_MS = 30_000; // 30 seconds
// Maximum number of records to buffer before forcing a flush
const MAX_BUFFERED_RECORDS = 500;
// Maximum number of records to insert in a single batch
const INSERT_BATCH_SIZE = 100;
interface ConnectionSessionData {
sessionId: string;
resourceId: number;
sourceAddr: string;
destAddr: string;
protocol: string;
startedAt: string; // ISO 8601 timestamp
endedAt?: string; // ISO 8601 timestamp
bytesTx?: number;
bytesRx?: number;
}
interface ConnectionLogRecord {
sessionId: string;
siteResourceId: number;
orgId: string;
siteId: number;
sourceAddr: string;
destAddr: string;
protocol: string;
startedAt: number; // epoch seconds
endedAt: number | null;
bytesTx: number | null;
bytesRx: number | null;
}
// In-memory buffer of records waiting to be flushed
let buffer: ConnectionLogRecord[] = [];
/**
* Check if an error is a deadlock error
*/
function isDeadlockError(error: any): boolean {
return (
error?.code === "40P01" ||
error?.cause?.code === "40P01" ||
(error?.message && error.message.includes("deadlock"))
);
}
/**
* Execute a function with retry logic for deadlock handling
*/
async function withDeadlockRetry<T>(
operation: () => Promise<T>,
context: string
): Promise<T> {
let attempt = 0;
while (true) {
try {
return await operation();
} catch (error: any) {
if (isDeadlockError(error) && attempt < MAX_RETRIES) {
attempt++;
const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
const jitter = Math.random() * baseDelay;
const delay = baseDelay + jitter;
logger.warn(
`Deadlock detected in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue;
}
throw error;
}
}
}
/**
* Decompress a base64-encoded zlib-compressed string into parsed JSON.
*/
async function decompressConnectionLog(
compressed: string
): Promise<ConnectionSessionData[]> {
const compressedBuffer = Buffer.from(compressed, "base64");
const decompressed = await zlibInflate(compressedBuffer);
const jsonString = decompressed.toString("utf-8");
const parsed = JSON.parse(jsonString);
if (!Array.isArray(parsed)) {
throw new Error("Decompressed connection log data is not an array");
}
return parsed;
}
/**
* Convert an ISO 8601 timestamp string to epoch seconds.
* Returns null if the input is falsy.
*/
function toEpochSeconds(isoString: string | undefined | null): number | null {
if (!isoString) {
return null;
}
const ms = new Date(isoString).getTime();
if (isNaN(ms)) {
return null;
}
return Math.floor(ms / 1000);
}
/**
* Flush all buffered connection log records to the database.
*
* Swaps out the buffer before writing so that any records added during the
* flush are captured in the new buffer rather than being lost. Entries that
* fail to write are re-queued back into the buffer so they will be retried
* on the next flush.
*
* This function is exported so that the application's graceful-shutdown
* cleanup handler can call it before the process exits.
*/
export async function flushConnectionLogToDb(): Promise<void> {
if (buffer.length === 0) {
return;
}
// Atomically swap out the buffer so new data keeps flowing in
const snapshot = buffer;
buffer = [];
logger.debug(
`Flushing ${snapshot.length} connection log record(s) to the database`
);
// Insert in batches to avoid overly large SQL statements
for (let i = 0; i < snapshot.length; i += INSERT_BATCH_SIZE) {
const batch = snapshot.slice(i, i + INSERT_BATCH_SIZE);
try {
await withDeadlockRetry(async () => {
await logsDb.insert(connectionAuditLog).values(batch);
}, `flush connection log batch (${batch.length} records)`);
} catch (error) {
logger.error(
`Failed to flush connection log batch of ${batch.length} records:`,
error
);
// Re-queue the failed batch so it is retried on the next flush
buffer = [...batch, ...buffer];
// Cap buffer to prevent unbounded growth if DB is unreachable
if (buffer.length > MAX_BUFFERED_RECORDS * 5) {
const dropped = buffer.length - MAX_BUFFERED_RECORDS * 5;
buffer = buffer.slice(0, MAX_BUFFERED_RECORDS * 5);
logger.warn(
`Connection log buffer overflow, dropped ${dropped} oldest records`
);
}
// Stop trying further batches from this snapshot — they'll be
// picked up by the next flush via the re-queued records above
const remaining = snapshot.slice(i + INSERT_BATCH_SIZE);
if (remaining.length > 0) {
buffer = [...remaining, ...buffer];
}
break;
}
}
}
const flushTimer = setInterval(async () => {
try {
await flushConnectionLogToDb();
} catch (error) {
logger.error(
"Unexpected error during periodic connection log flush:",
error
);
}
}, FLUSH_INTERVAL_MS);
// Calling unref() means this timer will not keep the Node.js event loop alive
// on its own — the process can still exit normally when there is no other work
// left. The graceful-shutdown path will call flushConnectionLogToDb() explicitly
// before process.exit(), so no data is lost.
flushTimer.unref();
export async function cleanUpOldLogs(orgId: string, retentionDays: number) {
const cutoffTimestamp = calculateCutoffTimestamp(retentionDays);
try {
await logsDb
.delete(connectionAuditLog)
.where(
and(
lt(connectionAuditLog.startedAt, cutoffTimestamp),
eq(connectionAuditLog.orgId, orgId)
)
);
// logger.debug(
// `Cleaned up connection audit logs older than ${retentionDays} days`
// );
} catch (error) {
logger.error("Error cleaning up old connection audit logs:", error);
}
}
export const handleConnectionLogMessage: MessageHandler = async (context) => {
const { message, client } = context;
const newt = client as Newt;
if (!newt) {
logger.warn("Connection log received but no newt client in context");
return;
}
if (!newt.siteId) {
logger.warn("Connection log received but newt has no siteId");
return;
}
if (!message.data?.compressed) {
logger.warn("Connection log message missing compressed data");
return;
}
// Look up the org for this site
const [site] = await db
.select({ orgId: sites.orgId })
.from(sites)
.where(eq(sites.siteId, newt.siteId));
if (!site) {
logger.warn(
`Connection log received but site ${newt.siteId} not found in database`
);
return;
}
const orgId = site.orgId;
let sessions: ConnectionSessionData[];
try {
sessions = await decompressConnectionLog(message.data.compressed);
} catch (error) {
logger.error("Failed to decompress connection log data:", error);
return;
}
if (sessions.length === 0) {
return;
}
// Convert to DB records and add to the buffer
for (const session of sessions) {
// Validate required fields
if (
!session.sessionId ||
!session.resourceId ||
!session.sourceAddr ||
!session.destAddr ||
!session.protocol
) {
logger.debug(
`Skipping connection log session with missing required fields: ${JSON.stringify(session)}`
);
continue;
}
const startedAt = toEpochSeconds(session.startedAt);
if (startedAt === null) {
logger.debug(
`Skipping connection log session with invalid startedAt: ${session.startedAt}`
);
continue;
}
buffer.push({
sessionId: session.sessionId,
siteResourceId: session.resourceId,
orgId,
siteId: newt.siteId,
sourceAddr: session.sourceAddr,
destAddr: session.destAddr,
protocol: session.protocol,
startedAt,
endedAt: toEpochSeconds(session.endedAt),
bytesTx: session.bytesTx ?? null,
bytesRx: session.bytesRx ?? null
});
}
logger.debug(
`Buffered ${sessions.length} connection log session(s) from newt ${newt.newtId} (site ${newt.siteId})`
);
// If the buffer has grown large enough, trigger an immediate flush
if (buffer.length >= MAX_BUFFERED_RECORDS) {
// Fire and forget — errors are handled inside flushConnectionLogToDb
flushConnectionLogToDb().catch((error) => {
logger.error(
"Unexpected error during size-triggered connection log flush:",
error
);
});
}
};
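For context, the handler above expects message.data.compressed to be a base64 string of zlib-deflated JSON describing an array of sessions. A minimal sketch of the producing side, reusing the ConnectionSessionData shape defined above (the message type string is an assumption; only the payload format is taken from the handler):

import { deflate } from "zlib";
import { promisify } from "util";

const zlibDeflate = promisify(deflate);

// Hedged sketch: build a payload the handler above can decode.
async function buildConnectionLogMessage(sessions: ConnectionSessionData[]) {
    const json = JSON.stringify(sessions);
    const compressed = (await zlibDeflate(Buffer.from(json, "utf-8"))).toString("base64");
    return {
        type: "newt/connection/log", // hypothetical message type, not confirmed by this diff
        data: { compressed }
    };
}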

View File

@@ -0,0 +1 @@
export * from "./handleConnectionLogMessage";

View File

@@ -18,7 +18,7 @@ import {
} from "#private/routers/remoteExitNode";
import { MessageHandler } from "@server/routers/ws";
import { build } from "@server/build";
- import { handleConnectionLogMessage } from "@server/routers/newt";
+ import { handleConnectionLogMessage } from "#dynamic/routers/newt";
export const messageHandlers: Record<string, MessageHandler> = {
    "remoteExitNode/register": handleRemoteExitNodeRegisterMessage,

View File

@@ -1,302 +1,13 @@
import { db } from "@server/db";
import { MessageHandler } from "@server/routers/ws";
import { connectionAuditLog, sites, Newt } from "@server/db";
import { eq } from "drizzle-orm";
import logger from "@server/logger";
import { inflate } from "zlib";
import { promisify } from "util";
const zlibInflate = promisify(inflate);
// Retry configuration for deadlock handling
const MAX_RETRIES = 3;
const BASE_DELAY_MS = 50;
// How often to flush accumulated connection log data to the database
const FLUSH_INTERVAL_MS = 30_000; // 30 seconds
// Maximum number of records to buffer before forcing a flush
const MAX_BUFFERED_RECORDS = 500;
// Maximum number of records to insert in a single batch
const INSERT_BATCH_SIZE = 100;
interface ConnectionSessionData {
sessionId: string;
resourceId: number;
sourceAddr: string;
destAddr: string;
protocol: string;
startedAt: string; // ISO 8601 timestamp
endedAt?: string; // ISO 8601 timestamp
bytesTx?: number;
bytesRx?: number;
}
interface ConnectionLogRecord {
sessionId: string;
siteResourceId: number;
orgId: string;
siteId: number;
sourceAddr: string;
destAddr: string;
protocol: string;
startedAt: number; // epoch seconds
endedAt: number | null;
bytesTx: number | null;
bytesRx: number | null;
}
// In-memory buffer of records waiting to be flushed
let buffer: ConnectionLogRecord[] = [];
/**
* Check if an error is a deadlock error
*/
function isDeadlockError(error: any): boolean {
return (
error?.code === "40P01" ||
error?.cause?.code === "40P01" ||
(error?.message && error.message.includes("deadlock"))
);
}
/**
* Execute a function with retry logic for deadlock handling
*/
async function withDeadlockRetry<T>(
operation: () => Promise<T>,
context: string
): Promise<T> {
let attempt = 0;
while (true) {
try {
return await operation();
} catch (error: any) {
if (isDeadlockError(error) && attempt < MAX_RETRIES) {
attempt++;
const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
const jitter = Math.random() * baseDelay;
const delay = baseDelay + jitter;
logger.warn(
`Deadlock detected in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue;
}
throw error;
}
}
}
/**
* Decompress a base64-encoded zlib-compressed string into parsed JSON.
*/
async function decompressConnectionLog(
compressed: string
): Promise<ConnectionSessionData[]> {
const compressedBuffer = Buffer.from(compressed, "base64");
const decompressed = await zlibInflate(compressedBuffer);
const jsonString = decompressed.toString("utf-8");
const parsed = JSON.parse(jsonString);
if (!Array.isArray(parsed)) {
throw new Error("Decompressed connection log data is not an array");
}
return parsed;
}
/**
* Convert an ISO 8601 timestamp string to epoch seconds.
* Returns null if the input is falsy.
*/
function toEpochSeconds(isoString: string | undefined | null): number | null {
if (!isoString) {
return null;
}
const ms = new Date(isoString).getTime();
if (isNaN(ms)) {
return null;
}
return Math.floor(ms / 1000);
}
/**
* Flush all buffered connection log records to the database.
*
* Swaps out the buffer before writing so that any records added during the
* flush are captured in the new buffer rather than being lost. Entries that
* fail to write are re-queued back into the buffer so they will be retried
* on the next flush.
*
* This function is exported so that the application's graceful-shutdown
* cleanup handler can call it before the process exits.
*/
export async function flushConnectionLogToDb(): Promise<void> {
-     if (buffer.length === 0) {
+     return;
return;
}
// Atomically swap out the buffer so new data keeps flowing in
const snapshot = buffer;
buffer = [];
logger.debug(
`Flushing ${snapshot.length} connection log record(s) to the database`
);
// Insert in batches to avoid overly large SQL statements
for (let i = 0; i < snapshot.length; i += INSERT_BATCH_SIZE) {
const batch = snapshot.slice(i, i + INSERT_BATCH_SIZE);
try {
await withDeadlockRetry(async () => {
await db.insert(connectionAuditLog).values(batch);
}, `flush connection log batch (${batch.length} records)`);
} catch (error) {
logger.error(
`Failed to flush connection log batch of ${batch.length} records:`,
error
);
// Re-queue the failed batch so it is retried on the next flush
buffer = [...batch, ...buffer];
// Cap buffer to prevent unbounded growth if DB is unreachable
if (buffer.length > MAX_BUFFERED_RECORDS * 5) {
const dropped = buffer.length - MAX_BUFFERED_RECORDS * 5;
buffer = buffer.slice(0, MAX_BUFFERED_RECORDS * 5);
logger.warn(
`Connection log buffer overflow, dropped ${dropped} oldest records`
);
}
// Stop trying further batches from this snapshot — they'll be
// picked up by the next flush via the re-queued records above
const remaining = snapshot.slice(i + INSERT_BATCH_SIZE);
if (remaining.length > 0) {
buffer = [...remaining, ...buffer];
}
break;
}
}
}
- const flushTimer = setInterval(async () => {
-     try {
-         await flushConnectionLogToDb();
+ export async function cleanUpOldLogs(orgId: string, retentionDays: number) {
+     return;
+ }
} catch (error) {
logger.error(
"Unexpected error during periodic connection log flush:",
error
);
}
}, FLUSH_INTERVAL_MS);
// Calling unref() means this timer will not keep the Node.js event loop alive
// on its own — the process can still exit normally when there is no other work
// left. The graceful-shutdown path will call flushConnectionLogToDb() explicitly
// before process.exit(), so no data is lost.
flushTimer.unref();
export const handleConnectionLogMessage: MessageHandler = async (context) => {
-     const { message, client } = context;
+     return;
const newt = client as Newt;
if (!newt) {
logger.warn("Connection log received but no newt client in context");
return;
}
if (!newt.siteId) {
logger.warn("Connection log received but newt has no siteId");
return;
}
if (!message.data?.compressed) {
logger.warn("Connection log message missing compressed data");
return;
}
// Look up the org for this site
const [site] = await db
.select({ orgId: sites.orgId })
.from(sites)
.where(eq(sites.siteId, newt.siteId));
if (!site) {
logger.warn(
`Connection log received but site ${newt.siteId} not found in database`
);
return;
}
const orgId = site.orgId;
let sessions: ConnectionSessionData[];
try {
sessions = await decompressConnectionLog(message.data.compressed);
} catch (error) {
logger.error("Failed to decompress connection log data:", error);
return;
}
if (sessions.length === 0) {
return;
}
// Convert to DB records and add to the buffer
for (const session of sessions) {
// Validate required fields
if (
!session.sessionId ||
!session.resourceId ||
!session.sourceAddr ||
!session.destAddr ||
!session.protocol
) {
logger.debug(
`Skipping connection log session with missing required fields: ${JSON.stringify(session)}`
);
continue;
}
const startedAt = toEpochSeconds(session.startedAt);
if (startedAt === null) {
logger.debug(
`Skipping connection log session with invalid startedAt: ${session.startedAt}`
);
continue;
}
buffer.push({
sessionId: session.sessionId,
siteResourceId: session.resourceId,
orgId,
siteId: newt.siteId,
sourceAddr: session.sourceAddr,
destAddr: session.destAddr,
protocol: session.protocol,
startedAt,
endedAt: toEpochSeconds(session.endedAt),
bytesTx: session.bytesTx ?? null,
bytesRx: session.bytesRx ?? null
});
}
logger.debug(
`Buffered ${sessions.length} connection log session(s) from newt ${newt.newtId} (site ${newt.siteId})`
);
// If the buffer has grown large enough, trigger an immediate flush
if (buffer.length >= MAX_BUFFERED_RECORDS) {
// Fire and forget — errors are handled inside flushConnectionLogToDb
flushConnectionLogToDb().catch((error) => {
logger.error(
"Unexpected error during size-triggered connection log flush:",
error
);
});
}
};
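The net effect of this commit: the public copy of the handler is reduced to no-op stubs, while the real implementation moves behind the #dynamic/routers/newt specifier, so callers pick up whichever build is present. A hedged sketch of the Node.js subpath-imports idea this relies on (the actual package.json mapping and condition names are not shown in this diff):

// Hypothetical package.json "imports" entry, for illustration only:
//   "#dynamic/routers/newt": {
//       "private": "./server/private/routers/newt/index.ts",
//       "default": "./server/routers/newt/handleConnectionLogMessage.ts"
//   }
//
// Callers are unchanged either way:
import {
    handleConnectionLogMessage,
    flushConnectionLogToDb,
    cleanUpOldLogs
} from "#dynamic/routers/newt";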