From b622aca2214e293c94ceee62621bc6a8c6f3648f Mon Sep 17 00:00:00 2001
From: Owen
Date: Fri, 20 Feb 2026 17:20:01 -0800
Subject: [PATCH] Try to route logs requests to a different database

---
 server/db/pg/index.ts                         |  1 +
 server/db/pg/logsDriver.ts                    | 89 +++++++++++++++++
 server/db/sqlite/index.ts                     |  1 +
 server/db/sqlite/logsDriver.ts                |  7 ++
 server/lib/readConfigFile.ts                  | 40 ++++++++
 server/private/lib/logAccessAudit.ts          |  6 +-
 server/private/lib/readConfigFile.ts          | 40 ++++++++
 server/private/middlewares/logActionAudit.ts  |  6 +-
 .../routers/auditLogs/queryAccessAuditLog.ts  | 90 +++++++++++++----
 .../routers/auditLogs/queryActionAuditLog.ts  | 10 +-
 .../auditLogs/queryRequestAnalytics.ts        | 10 +-
 .../routers/auditLogs/queryRequestAuditLog.ts | 98 ++++++++++++++----
 server/routers/badger/logRequestAudit.ts      |  6 +-
 13 files changed, 338 insertions(+), 66 deletions(-)
 create mode 100644 server/db/pg/logsDriver.ts
 create mode 100644 server/db/sqlite/logsDriver.ts

diff --git a/server/db/pg/index.ts b/server/db/pg/index.ts
index 43e2650f..97257502 100644
--- a/server/db/pg/index.ts
+++ b/server/db/pg/index.ts
@@ -1,4 +1,5 @@
 export * from "./driver";
+export * from "./logsDriver";
 export * from "./schema/schema";
 export * from "./schema/privateSchema";
 export * from "./migrate";
diff --git a/server/db/pg/logsDriver.ts b/server/db/pg/logsDriver.ts
new file mode 100644
index 00000000..3e4c2b7e
--- /dev/null
+++ b/server/db/pg/logsDriver.ts
@@ -0,0 +1,89 @@
+import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres";
+import { Pool } from "pg";
+import { readConfigFile } from "@server/lib/readConfigFile";
+import { readPrivateConfigFile } from "@server/private/lib/readConfigFile";
+import { withReplicas } from "drizzle-orm/pg-core";
+import { build } from "@server/build";
+import { db as mainDb, primaryDb as mainPrimaryDb } from "./driver";
+
+function createLogsDb() {
+    // Only use separate logs database in SaaS builds
+    if (build !== "saas") {
+        return mainDb;
+    }
+
+    const config = readConfigFile();
+    const privateConfig = readPrivateConfigFile();
+
+    // Merge configs, prioritizing private config
+    const logsConfig = privateConfig.postgres_logs || config.postgres_logs;
+
+    // Check environment variable first
+    let connectionString = process.env.POSTGRES_LOGS_CONNECTION_STRING;
+    let replicaConnections: Array<{ connection_string: string }> = [];
+
+    if (!connectionString && logsConfig) {
+        connectionString = logsConfig.connection_string;
+        replicaConnections = logsConfig.replicas || [];
+    }
+
+    // If POSTGRES_LOGS_REPLICA_CONNECTION_STRINGS is set, use it
+    if (process.env.POSTGRES_LOGS_REPLICA_CONNECTION_STRINGS) {
+        replicaConnections =
+            process.env.POSTGRES_LOGS_REPLICA_CONNECTION_STRINGS.split(",").map(
+                (conn) => ({
+                    connection_string: conn.trim()
+                })
+            );
+    }
+
+    // If no logs database is configured, fall back to main database
+    if (!connectionString) {
+        return mainDb;
+    }
+
+    // Create separate connection pool for logs database
+    const poolConfig = logsConfig?.pool || config.postgres?.pool;
+    const primaryPool = new Pool({
+        connectionString,
+        max: poolConfig?.max_connections || 20,
+        idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
+        connectionTimeoutMillis: poolConfig?.connection_timeout_ms || 5000
+    });
+
+    const replicas = [];
+
+    if (!replicaConnections.length) {
+        replicas.push(
+            DrizzlePostgres(primaryPool, {
+                logger: process.env.QUERY_LOGGING == "true"
+            })
+        );
+    } else {
+        for (const conn of replicaConnections) {
+            const replicaPool = new Pool({
connectionString: conn.connection_string, + max: poolConfig?.max_replica_connections || 20, + idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000, + connectionTimeoutMillis: + poolConfig?.connection_timeout_ms || 5000 + }); + replicas.push( + DrizzlePostgres(replicaPool, { + logger: process.env.QUERY_LOGGING == "true" + }) + ); + } + } + + return withReplicas( + DrizzlePostgres(primaryPool, { + logger: process.env.QUERY_LOGGING == "true" + }), + replicas as any + ); +} + +export const logsDb = createLogsDb(); +export default logsDb; +export const primaryLogsDb = logsDb.$primary; \ No newline at end of file diff --git a/server/db/sqlite/index.ts b/server/db/sqlite/index.ts index 43e2650f..97257502 100644 --- a/server/db/sqlite/index.ts +++ b/server/db/sqlite/index.ts @@ -1,4 +1,5 @@ export * from "./driver"; +export * from "./logsDriver"; export * from "./schema/schema"; export * from "./schema/privateSchema"; export * from "./migrate"; diff --git a/server/db/sqlite/logsDriver.ts b/server/db/sqlite/logsDriver.ts new file mode 100644 index 00000000..f70c79fc --- /dev/null +++ b/server/db/sqlite/logsDriver.ts @@ -0,0 +1,7 @@ +import { db as mainDb } from "./driver"; + +// SQLite doesn't support separate databases for logs in the same way as Postgres +// Always use the main database connection for SQLite +export const logsDb = mainDb; +export default logsDb; +export const primaryLogsDb = logsDb; \ No newline at end of file diff --git a/server/lib/readConfigFile.ts b/server/lib/readConfigFile.ts index bfca5970..cca0aa6a 100644 --- a/server/lib/readConfigFile.ts +++ b/server/lib/readConfigFile.ts @@ -189,6 +189,46 @@ export const configSchema = z .prefault({}) }) .optional(), + postgres_logs: z + .object({ + connection_string: z + .string() + .optional() + .transform(getEnvOrYaml("POSTGRES_LOGS_CONNECTION_STRING")), + replicas: z + .array( + z.object({ + connection_string: z.string() + }) + ) + .optional(), + pool: z + .object({ + max_connections: z + .number() + .positive() + .optional() + .default(20), + max_replica_connections: z + .number() + .positive() + .optional() + .default(10), + idle_timeout_ms: z + .number() + .positive() + .optional() + .default(30000), + connection_timeout_ms: z + .number() + .positive() + .optional() + .default(5000) + }) + .optional() + .prefault({}) + }) + .optional(), traefik: z .object({ http_entrypoint: z.string().optional().default("web"), diff --git a/server/private/lib/logAccessAudit.ts b/server/private/lib/logAccessAudit.ts index 33dcaf1f..3024283b 100644 --- a/server/private/lib/logAccessAudit.ts +++ b/server/private/lib/logAccessAudit.ts @@ -11,7 +11,7 @@ * This file is not licensed under the AGPLv3. */ -import { accessAuditLog, db, orgs } from "@server/db"; +import { accessAuditLog, logsDb, db, orgs } from "@server/db"; import { getCountryCodeForIp } from "@server/lib/geoip"; import logger from "@server/logger"; import { and, eq, lt } from "drizzle-orm"; @@ -52,7 +52,7 @@ export async function cleanUpOldLogs(orgId: string, retentionDays: number) { const cutoffTimestamp = calculateCutoffTimestamp(retentionDays); try { - await db + await logsDb .delete(accessAuditLog) .where( and( @@ -124,7 +124,7 @@ export async function logAccessAudit(data: { ? 
await getCountryCodeFromIp(data.requestIp) : undefined; - await db.insert(accessAuditLog).values({ + await logsDb.insert(accessAuditLog).values({ timestamp: timestamp, orgId: data.orgId, actorType, diff --git a/server/private/lib/readConfigFile.ts b/server/private/lib/readConfigFile.ts index e5efa498..18bfe811 100644 --- a/server/private/lib/readConfigFile.ts +++ b/server/private/lib/readConfigFile.ts @@ -83,6 +83,46 @@ export const privateConfigSchema = z.object({ // .optional() }) .optional(), + postgres_logs: z + .object({ + connection_string: z + .string() + .optional() + .transform(getEnvOrYaml("POSTGRES_LOGS_CONNECTION_STRING")), + replicas: z + .array( + z.object({ + connection_string: z.string() + }) + ) + .optional(), + pool: z + .object({ + max_connections: z + .number() + .positive() + .optional() + .default(20), + max_replica_connections: z + .number() + .positive() + .optional() + .default(10), + idle_timeout_ms: z + .number() + .positive() + .optional() + .default(30000), + connection_timeout_ms: z + .number() + .positive() + .optional() + .default(5000) + }) + .optional() + .prefault({}) + }) + .optional(), gerbil: z .object({ local_exit_node_reachable_at: z diff --git a/server/private/middlewares/logActionAudit.ts b/server/private/middlewares/logActionAudit.ts index 17cc67c0..ee74725c 100644 --- a/server/private/middlewares/logActionAudit.ts +++ b/server/private/middlewares/logActionAudit.ts @@ -12,7 +12,7 @@ */ import { ActionsEnum } from "@server/auth/actions"; -import { actionAuditLog, db, orgs } from "@server/db"; +import { actionAuditLog, logsDb, db, orgs } from "@server/db"; import logger from "@server/logger"; import HttpCode from "@server/types/HttpCode"; import { Request, Response, NextFunction } from "express"; @@ -54,7 +54,7 @@ export async function cleanUpOldLogs(orgId: string, retentionDays: number) { const cutoffTimestamp = calculateCutoffTimestamp(retentionDays); try { - await db + await logsDb .delete(actionAuditLog) .where( and( @@ -123,7 +123,7 @@ export function logActionAudit(action: ActionsEnum) { metadata = JSON.stringify(req.params); } - await db.insert(actionAuditLog).values({ + await logsDb.insert(actionAuditLog).values({ timestamp, orgId, actorType, diff --git a/server/private/routers/auditLogs/queryAccessAuditLog.ts b/server/private/routers/auditLogs/queryAccessAuditLog.ts index 96d241fb..57bdf1b8 100644 --- a/server/private/routers/auditLogs/queryAccessAuditLog.ts +++ b/server/private/routers/auditLogs/queryAccessAuditLog.ts @@ -11,11 +11,11 @@ * This file is not licensed under the AGPLv3. 
*/ -import { accessAuditLog, db, resources } from "@server/db"; +import { accessAuditLog, logsDb, resources, db, primaryDb } from "@server/db"; import { registry } from "@server/openApi"; import { NextFunction } from "express"; import { Request, Response } from "express"; -import { eq, gt, lt, and, count, desc } from "drizzle-orm"; +import { eq, gt, lt, and, count, desc, inArray } from "drizzle-orm"; import { OpenAPITags } from "@server/openApi"; import { z } from "zod"; import createHttpError from "http-errors"; @@ -115,7 +115,7 @@ function getWhere(data: Q) { } export function queryAccess(data: Q) { - return db + return logsDb .select({ orgId: accessAuditLog.orgId, action: accessAuditLog.action, @@ -133,16 +133,46 @@ export function queryAccess(data: Q) { actor: accessAuditLog.actor }) .from(accessAuditLog) - .leftJoin( - resources, - eq(accessAuditLog.resourceId, resources.resourceId) - ) .where(getWhere(data)) .orderBy(desc(accessAuditLog.timestamp), desc(accessAuditLog.id)); } +async function enrichWithResourceDetails(logs: Awaited>) { + // If logs database is the same as main database, we can do a join + // Otherwise, we need to fetch resource details separately + const resourceIds = logs + .map(log => log.resourceId) + .filter((id): id is number => id !== null && id !== undefined); + + if (resourceIds.length === 0) { + return logs.map(log => ({ ...log, resourceName: null, resourceNiceId: null })); + } + + // Fetch resource details from main database + const resourceDetails = await primaryDb + .select({ + resourceId: resources.resourceId, + name: resources.name, + niceId: resources.niceId + }) + .from(resources) + .where(inArray(resources.resourceId, resourceIds)); + + // Create a map for quick lookup + const resourceMap = new Map( + resourceDetails.map(r => [r.resourceId, { name: r.name, niceId: r.niceId }]) + ); + + // Enrich logs with resource details + return logs.map(log => ({ + ...log, + resourceName: log.resourceId ? resourceMap.get(log.resourceId)?.name ?? null : null, + resourceNiceId: log.resourceId ? resourceMap.get(log.resourceId)?.niceId ?? 
null : null + })); +} + export function countAccessQuery(data: Q) { - const countQuery = db + const countQuery = logsDb .select({ count: count() }) .from(accessAuditLog) .where(getWhere(data)); @@ -161,7 +191,7 @@ async function queryUniqueFilterAttributes( ); // Get unique actors - const uniqueActors = await db + const uniqueActors = await logsDb .selectDistinct({ actor: accessAuditLog.actor }) @@ -169,7 +199,7 @@ async function queryUniqueFilterAttributes( .where(baseConditions); // Get unique locations - const uniqueLocations = await db + const uniqueLocations = await logsDb .selectDistinct({ locations: accessAuditLog.location }) @@ -177,25 +207,40 @@ async function queryUniqueFilterAttributes( .where(baseConditions); // Get unique resources with names - const uniqueResources = await db + const uniqueResources = await logsDb .selectDistinct({ - id: accessAuditLog.resourceId, - name: resources.name + id: accessAuditLog.resourceId }) .from(accessAuditLog) - .leftJoin( - resources, - eq(accessAuditLog.resourceId, resources.resourceId) - ) .where(baseConditions); + // Fetch resource names from main database for the unique resource IDs + const resourceIds = uniqueResources + .map(row => row.id) + .filter((id): id is number => id !== null); + + let resourcesWithNames: Array<{ id: number; name: string | null }> = []; + + if (resourceIds.length > 0) { + const resourceDetails = await primaryDb + .select({ + resourceId: resources.resourceId, + name: resources.name + }) + .from(resources) + .where(inArray(resources.resourceId, resourceIds)); + + resourcesWithNames = resourceDetails.map(r => ({ + id: r.resourceId, + name: r.name + })); + } + return { actors: uniqueActors .map((row) => row.actor) .filter((actor): actor is string => actor !== null), - resources: uniqueResources.filter( - (row): row is { id: number; name: string | null } => row.id !== null - ), + resources: resourcesWithNames, locations: uniqueLocations .map((row) => row.locations) .filter((location): location is string => location !== null) @@ -243,7 +288,10 @@ export async function queryAccessAuditLogs( const baseQuery = queryAccess(data); - const log = await baseQuery.limit(data.limit).offset(data.offset); + const logsRaw = await baseQuery.limit(data.limit).offset(data.offset); + + // Enrich with resource details (handles cross-database scenario) + const log = await enrichWithResourceDetails(logsRaw); const totalCountResult = await countAccessQuery(data); const totalCount = totalCountResult[0].count; diff --git a/server/private/routers/auditLogs/queryActionAuditLog.ts b/server/private/routers/auditLogs/queryActionAuditLog.ts index 7eed741b..bd636dee 100644 --- a/server/private/routers/auditLogs/queryActionAuditLog.ts +++ b/server/private/routers/auditLogs/queryActionAuditLog.ts @@ -11,7 +11,7 @@ * This file is not licensed under the AGPLv3. 
*/ -import { actionAuditLog, db } from "@server/db"; +import { actionAuditLog, logsDb } from "@server/db"; import { registry } from "@server/openApi"; import { NextFunction } from "express"; import { Request, Response } from "express"; @@ -97,7 +97,7 @@ function getWhere(data: Q) { } export function queryAction(data: Q) { - return db + return logsDb .select({ orgId: actionAuditLog.orgId, action: actionAuditLog.action, @@ -113,7 +113,7 @@ export function queryAction(data: Q) { } export function countActionQuery(data: Q) { - const countQuery = db + const countQuery = logsDb .select({ count: count() }) .from(actionAuditLog) .where(getWhere(data)); @@ -132,14 +132,14 @@ async function queryUniqueFilterAttributes( ); // Get unique actors - const uniqueActors = await db + const uniqueActors = await logsDb .selectDistinct({ actor: actionAuditLog.actor }) .from(actionAuditLog) .where(baseConditions); - const uniqueActions = await db + const uniqueActions = await logsDb .selectDistinct({ action: actionAuditLog.action }) diff --git a/server/routers/auditLogs/queryRequestAnalytics.ts b/server/routers/auditLogs/queryRequestAnalytics.ts index a6f9cb76..e838c5f5 100644 --- a/server/routers/auditLogs/queryRequestAnalytics.ts +++ b/server/routers/auditLogs/queryRequestAnalytics.ts @@ -1,4 +1,4 @@ -import { db, requestAuditLog, driver, primaryDb } from "@server/db"; +import { logsDb, requestAuditLog, driver, primaryLogsDb } from "@server/db"; import { registry } from "@server/openApi"; import { NextFunction } from "express"; import { Request, Response } from "express"; @@ -74,12 +74,12 @@ async function query(query: Q) { ); } - const [all] = await primaryDb + const [all] = await primaryLogsDb .select({ total: count() }) .from(requestAuditLog) .where(baseConditions); - const [blocked] = await primaryDb + const [blocked] = await primaryLogsDb .select({ total: count() }) .from(requestAuditLog) .where(and(baseConditions, eq(requestAuditLog.action, false))); @@ -90,7 +90,7 @@ async function query(query: Q) { const DISTINCT_LIMIT = 500; - const requestsPerCountry = await primaryDb + const requestsPerCountry = await primaryLogsDb .selectDistinct({ code: requestAuditLog.location, count: totalQ @@ -118,7 +118,7 @@ async function query(query: Q) { const booleanTrue = driver === "pg" ? sql`true` : sql`1`; const booleanFalse = driver === "pg" ? 
sql`false` : sql`0`; - const requestsPerDay = await primaryDb + const requestsPerDay = await primaryLogsDb .select({ day: groupByDayFunction.as("day"), allowedCount: diff --git a/server/routers/auditLogs/queryRequestAuditLog.ts b/server/routers/auditLogs/queryRequestAuditLog.ts index 98c23721..3b598e03 100644 --- a/server/routers/auditLogs/queryRequestAuditLog.ts +++ b/server/routers/auditLogs/queryRequestAuditLog.ts @@ -1,8 +1,8 @@ -import { db, primaryDb, requestAuditLog, resources } from "@server/db"; +import { logsDb, primaryLogsDb, requestAuditLog, resources, db, primaryDb } from "@server/db"; import { registry } from "@server/openApi"; import { NextFunction } from "express"; import { Request, Response } from "express"; -import { eq, gt, lt, and, count, desc } from "drizzle-orm"; +import { eq, gt, lt, and, count, desc, inArray } from "drizzle-orm"; import { OpenAPITags } from "@server/openApi"; import { z } from "zod"; import createHttpError from "http-errors"; @@ -107,7 +107,7 @@ function getWhere(data: Q) { } export function queryRequest(data: Q) { - return primaryDb + return primaryLogsDb .select({ id: requestAuditLog.id, timestamp: requestAuditLog.timestamp, @@ -129,21 +129,49 @@ export function queryRequest(data: Q) { host: requestAuditLog.host, path: requestAuditLog.path, method: requestAuditLog.method, - tls: requestAuditLog.tls, - resourceName: resources.name, - resourceNiceId: resources.niceId + tls: requestAuditLog.tls }) .from(requestAuditLog) - .leftJoin( - resources, - eq(requestAuditLog.resourceId, resources.resourceId) - ) // TODO: Is this efficient? .where(getWhere(data)) .orderBy(desc(requestAuditLog.timestamp)); } +async function enrichWithResourceDetails(logs: Awaited>) { + // If logs database is the same as main database, we can do a join + // Otherwise, we need to fetch resource details separately + const resourceIds = logs + .map(log => log.resourceId) + .filter((id): id is number => id !== null && id !== undefined); + + if (resourceIds.length === 0) { + return logs.map(log => ({ ...log, resourceName: null, resourceNiceId: null })); + } + + // Fetch resource details from main database + const resourceDetails = await primaryDb + .select({ + resourceId: resources.resourceId, + name: resources.name, + niceId: resources.niceId + }) + .from(resources) + .where(inArray(resources.resourceId, resourceIds)); + + // Create a map for quick lookup + const resourceMap = new Map( + resourceDetails.map(r => [r.resourceId, { name: r.name, niceId: r.niceId }]) + ); + + // Enrich logs with resource details + return logs.map(log => ({ + ...log, + resourceName: log.resourceId ? resourceMap.get(log.resourceId)?.name ?? null : null, + resourceNiceId: log.resourceId ? resourceMap.get(log.resourceId)?.niceId ?? 
null : null + })); +} + export function countRequestQuery(data: Q) { - const countQuery = primaryDb + const countQuery = primaryLogsDb .select({ count: count() }) .from(requestAuditLog) .where(getWhere(data)); @@ -185,36 +213,31 @@ async function queryUniqueFilterAttributes( uniquePaths, uniqueResources ] = await Promise.all([ - primaryDb + primaryLogsDb .selectDistinct({ actor: requestAuditLog.actor }) .from(requestAuditLog) .where(baseConditions) .limit(DISTINCT_LIMIT + 1), - primaryDb + primaryLogsDb .selectDistinct({ locations: requestAuditLog.location }) .from(requestAuditLog) .where(baseConditions) .limit(DISTINCT_LIMIT + 1), - primaryDb + primaryLogsDb .selectDistinct({ hosts: requestAuditLog.host }) .from(requestAuditLog) .where(baseConditions) .limit(DISTINCT_LIMIT + 1), - primaryDb + primaryLogsDb .selectDistinct({ paths: requestAuditLog.path }) .from(requestAuditLog) .where(baseConditions) .limit(DISTINCT_LIMIT + 1), - primaryDb + primaryLogsDb .selectDistinct({ - id: requestAuditLog.resourceId, - name: resources.name + id: requestAuditLog.resourceId }) .from(requestAuditLog) - .leftJoin( - resources, - eq(requestAuditLog.resourceId, resources.resourceId) - ) .where(baseConditions) .limit(DISTINCT_LIMIT + 1) ]); @@ -231,13 +254,33 @@ async function queryUniqueFilterAttributes( // throw new Error("Too many distinct filter attributes to retrieve. Please refine your time range."); // } + // Fetch resource names from main database for the unique resource IDs + const resourceIds = uniqueResources + .map(row => row.id) + .filter((id): id is number => id !== null); + + let resourcesWithNames: Array<{ id: number; name: string | null }> = []; + + if (resourceIds.length > 0) { + const resourceDetails = await primaryDb + .select({ + resourceId: resources.resourceId, + name: resources.name + }) + .from(resources) + .where(inArray(resources.resourceId, resourceIds)); + + resourcesWithNames = resourceDetails.map(r => ({ + id: r.resourceId, + name: r.name + })); + } + return { actors: uniqueActors .map((row) => row.actor) .filter((actor): actor is string => actor !== null), - resources: uniqueResources.filter( - (row): row is { id: number; name: string | null } => row.id !== null - ), + resources: resourcesWithNames, locations: uniqueLocations .map((row) => row.locations) .filter((location): location is string => location !== null), @@ -280,7 +323,10 @@ export async function queryRequestAuditLogs( const baseQuery = queryRequest(data); - const log = await baseQuery.limit(data.limit).offset(data.offset); + const logsRaw = await baseQuery.limit(data.limit).offset(data.offset); + + // Enrich with resource details (handles cross-database scenario) + const log = await enrichWithResourceDetails(logsRaw); const totalCountResult = await countRequestQuery(data); const totalCount = totalCountResult[0].count; diff --git a/server/routers/badger/logRequestAudit.ts b/server/routers/badger/logRequestAudit.ts index 5975d8f3..4075e526 100644 --- a/server/routers/badger/logRequestAudit.ts +++ b/server/routers/badger/logRequestAudit.ts @@ -1,4 +1,4 @@ -import { db, orgs, requestAuditLog } from "@server/db"; +import { logsDb, primaryLogsDb, db, orgs, requestAuditLog } from "@server/db"; import logger from "@server/logger"; import { and, eq, lt, sql } from "drizzle-orm"; import cache from "@server/lib/cache"; @@ -69,7 +69,7 @@ async function flushAuditLogs() { try { // Use a transaction to ensure all inserts succeed or fail together // This prevents index corruption from partial writes - await db.transaction(async 
(tx) => { + await logsDb.transaction(async (tx) => { // Batch insert logs in groups of 25 to avoid overwhelming the database const BATCH_DB_SIZE = 25; for (let i = 0; i < logsToWrite.length; i += BATCH_DB_SIZE) { @@ -162,7 +162,7 @@ export async function cleanUpOldLogs(orgId: string, retentionDays: number) { const cutoffTimestamp = calculateCutoffTimestamp(retentionDays); try { - await db + await logsDb .delete(requestAuditLog) .where( and(
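
Reviewer note (not part of the patch): the sketch below illustrates how the new
postgres_logs section could look in the YAML config, assuming the keys mirror the
zod schema added to server/lib/readConfigFile.ts and server/private/lib/readConfigFile.ts
above; host names, database names, and credentials are placeholders, not values taken
from this change. Per createLogsDb() in server/db/pg/logsDriver.ts, the section only
takes effect on "saas" builds; if it is absent and POSTGRES_LOGS_CONNECTION_STRING is
unset, logsDb falls back to the main database, and the SQLite driver always reuses the
main connection.

    # postgres_logs is read from the main config and, with higher priority,
    # from the private config (privateConfig.postgres_logs || config.postgres_logs).
    postgres_logs:
      connection_string: "postgresql://pangolin:secret@logs-db:5432/pangolin_logs"  # placeholder
      replicas:
        - connection_string: "postgresql://pangolin:secret@logs-db-ro-1:5432/pangolin_logs"  # placeholder
      pool:
        max_connections: 20          # schema default: 20
        max_replica_connections: 10  # schema default: 10
        idle_timeout_ms: 30000       # schema default: 30000
        connection_timeout_ms: 5000  # schema default: 5000

    # Environment variables take precedence over the YAML values:
    #   POSTGRES_LOGS_CONNECTION_STRING=postgresql://...
    #   POSTGRES_LOGS_REPLICA_CONNECTION_STRINGS=postgresql://ro-1/...,postgresql://ro-2/...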