mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-01 00:06:38 +00:00
@@ -1,6 +1,6 @@
|
|||||||
import { db, orgs, requestAuditLog } from "@server/db";
|
import { db, orgs, requestAuditLog } from "@server/db";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { and, eq, lt } from "drizzle-orm";
|
import { and, eq, lt, sql } from "drizzle-orm";
|
||||||
import cache from "@server/lib/cache";
|
import cache from "@server/lib/cache";
|
||||||
import { calculateCutoffTimestamp } from "@server/lib/cleanupLogs";
|
import { calculateCutoffTimestamp } from "@server/lib/cleanupLogs";
|
||||||
import { stripPortFromHost } from "@server/lib/ip";
|
import { stripPortFromHost } from "@server/lib/ip";
|
||||||
@@ -67,17 +67,27 @@ async function flushAuditLogs() {
|
|||||||
const logsToWrite = auditLogBuffer.splice(0, auditLogBuffer.length);
|
const logsToWrite = auditLogBuffer.splice(0, auditLogBuffer.length);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
// Batch insert logs in groups of 25 to avoid overwhelming the database
|
// Use a transaction to ensure all inserts succeed or fail together
|
||||||
const BATCH_DB_SIZE = 25;
|
// This prevents index corruption from partial writes
|
||||||
for (let i = 0; i < logsToWrite.length; i += BATCH_DB_SIZE) {
|
await db.transaction(async (tx) => {
|
||||||
const batch = logsToWrite.slice(i, i + BATCH_DB_SIZE);
|
// Batch insert logs in groups of 25 to avoid overwhelming the database
|
||||||
await db.insert(requestAuditLog).values(batch);
|
const BATCH_DB_SIZE = 25;
|
||||||
}
|
for (let i = 0; i < logsToWrite.length; i += BATCH_DB_SIZE) {
|
||||||
|
const batch = logsToWrite.slice(i, i + BATCH_DB_SIZE);
|
||||||
|
await tx.insert(requestAuditLog).values(batch);
|
||||||
|
}
|
||||||
|
});
|
||||||
logger.debug(`Flushed ${logsToWrite.length} audit logs to database`);
|
logger.debug(`Flushed ${logsToWrite.length} audit logs to database`);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error("Error flushing audit logs:", error);
|
logger.error("Error flushing audit logs:", error);
|
||||||
// On error, we lose these logs - consider a fallback strategy if needed
|
// On transaction error, put logs back at the front of the buffer to retry
|
||||||
// (e.g., write to file, or put back in buffer with retry limit)
|
// but only if buffer isn't too large
|
||||||
|
if (auditLogBuffer.length < MAX_BUFFER_SIZE - logsToWrite.length) {
|
||||||
|
auditLogBuffer.unshift(...logsToWrite);
|
||||||
|
logger.info(`Re-queued ${logsToWrite.length} audit logs for retry`);
|
||||||
|
} else {
|
||||||
|
logger.error(`Buffer full, dropped ${logsToWrite.length} audit logs`);
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
isFlushInProgress = false;
|
isFlushInProgress = false;
|
||||||
// If buffer filled up while we were flushing, flush again
|
// If buffer filled up while we were flushing, flush again
|
||||||
|
|||||||
Reference in New Issue
Block a user