diff --git a/server/routers/badger/logRequestAudit.ts b/server/routers/badger/logRequestAudit.ts index 026ee4bb..5975d8f3 100644 --- a/server/routers/badger/logRequestAudit.ts +++ b/server/routers/badger/logRequestAudit.ts @@ -1,6 +1,6 @@ import { db, orgs, requestAuditLog } from "@server/db"; import logger from "@server/logger"; -import { and, eq, lt } from "drizzle-orm"; +import { and, eq, lt, sql } from "drizzle-orm"; import cache from "@server/lib/cache"; import { calculateCutoffTimestamp } from "@server/lib/cleanupLogs"; import { stripPortFromHost } from "@server/lib/ip"; @@ -67,17 +67,27 @@ async function flushAuditLogs() { const logsToWrite = auditLogBuffer.splice(0, auditLogBuffer.length); try { - // Batch insert logs in groups of 25 to avoid overwhelming the database - const BATCH_DB_SIZE = 25; - for (let i = 0; i < logsToWrite.length; i += BATCH_DB_SIZE) { - const batch = logsToWrite.slice(i, i + BATCH_DB_SIZE); - await db.insert(requestAuditLog).values(batch); - } + // Use a transaction to ensure all inserts succeed or fail together + // This prevents index corruption from partial writes + await db.transaction(async (tx) => { + // Batch insert logs in groups of 25 to avoid overwhelming the database + const BATCH_DB_SIZE = 25; + for (let i = 0; i < logsToWrite.length; i += BATCH_DB_SIZE) { + const batch = logsToWrite.slice(i, i + BATCH_DB_SIZE); + await tx.insert(requestAuditLog).values(batch); + } + }); logger.debug(`Flushed ${logsToWrite.length} audit logs to database`); } catch (error) { logger.error("Error flushing audit logs:", error); - // On error, we lose these logs - consider a fallback strategy if needed - // (e.g., write to file, or put back in buffer with retry limit) + // On transaction error, put logs back at the front of the buffer to retry + // but only if buffer isn't too large + if (auditLogBuffer.length < MAX_BUFFER_SIZE - logsToWrite.length) { + auditLogBuffer.unshift(...logsToWrite); + logger.info(`Re-queued ${logsToWrite.length} audit logs for retry`); + } else { + logger.error(`Buffer full, dropped ${logsToWrite.length} audit logs`); + } } finally { isFlushInProgress = false; // If buffer filled up while we were flushing, flush again