/*
 * This file is part of a proprietary work.
 *
 * Copyright (c) 2025 Fossorial, Inc.
 * All rights reserved.
 *
 * This file is licensed under the Fossorial Commercial License.
 * You may not use this file except in compliance with the License.
 * Unauthorized use, copying, modification, or distribution is strictly prohibited.
 *
 * This file is not licensed under the AGPLv3.
 */
import {
    db,
    logsDb,
    eventStreamingDestinations,
    eventStreamingCursors,
    requestAuditLog,
    actionAuditLog,
    accessAuditLog,
    connectionAuditLog
} from "@server/db";
import logger from "@server/logger";
import { and, eq, gt, desc, max, sql } from "drizzle-orm";
import {
    LogType,
    LOG_TYPES,
    LogEvent,
    DestinationFailureState,
    HttpConfig
} from "./types";
import { LogDestinationProvider } from "./providers/LogDestinationProvider";
import { HttpLogDestination } from "./providers/HttpLogDestination";
import type { EventStreamingDestination } from "@server/db";

// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------

/**
 * How often (ms) the manager polls all destinations for new log records.
 * Destinations that were behind (full batch returned) will be re-polled
 * immediately without waiting for this interval.
 */
const POLL_INTERVAL_MS = 30_000;

/**
 * Maximum number of log records fetched from the DB in a single query.
 * This also controls the maximum size of one HTTP POST body.
 */
const BATCH_SIZE = 250;

/**
 * Minimum delay (ms) between consecutive HTTP requests to the same destination
 * during a catch-up run. Prevents bursting thousands of requests back-to-back
 * when a destination has fallen behind.
 */
const INTER_BATCH_DELAY_MS = 100;

/**
 * Maximum number of consecutive back-to-back batches to process for a single
 * destination per poll cycle. After this limit the destination will wait for
 * the next scheduled poll before continuing, giving other destinations a turn.
*/ const MAX_CATCHUP_BATCHES = 20; /** * Back-off schedule (ms) indexed by consecutive failure count. * After the last entry the max value is re-used. */ const BACKOFF_SCHEDULE_MS = [ 60_000, // 1 min (failure 1) 2 * 60_000, // 2 min (failure 2) 5 * 60_000, // 5 min (failure 3) 10 * 60_000, // 10 min (failure 4) 30 * 60_000 // 30 min (failure 5+) ]; // --------------------------------------------------------------------------- // LogStreamingManager // --------------------------------------------------------------------------- /** * Orchestrates periodic polling of the four audit-log tables and forwards new * records to every enabled event-streaming destination. * * ### Design * - **Interval-based**: a timer fires every `POLL_INTERVAL_MS`. On each tick * every enabled destination is processed in sequence. * - **Cursor-based**: the last successfully forwarded row `id` is persisted in * the `eventStreamingCursors` table so state survives restarts. * - **Catch-up**: if a full batch is returned the destination is immediately * re-queried (up to `MAX_CATCHUP_BATCHES` times) before yielding. * - **Smoothing**: `INTER_BATCH_DELAY_MS` is inserted between consecutive * catch-up batches to avoid hammering the remote endpoint. * - **Back-off**: consecutive send failures trigger exponential back-off * (tracked in-memory per destination). Successful sends reset the counter. */ export class LogStreamingManager { private pollTimer: ReturnType | null = null; private isRunning = false; private isPolling = false; /** In-memory back-off state keyed by destinationId. 
*/ private readonly failures = new Map(); // ------------------------------------------------------------------------- // Lifecycle // ------------------------------------------------------------------------- start(): void { if (this.isRunning) return; this.isRunning = true; logger.info("LogStreamingManager: started"); this.schedulePoll(POLL_INTERVAL_MS); } async shutdown(): Promise { this.isRunning = false; if (this.pollTimer !== null) { clearTimeout(this.pollTimer); this.pollTimer = null; } // Wait for any in-progress poll to finish before returning so that // callers (graceful-shutdown handlers) can safely exit afterward. const deadline = Date.now() + 15_000; while (this.isPolling && Date.now() < deadline) { await sleep(100); } logger.info("LogStreamingManager: stopped"); } // ------------------------------------------------------------------------- // Scheduling // ------------------------------------------------------------------------- private schedulePoll(delayMs: number): void { this.pollTimer = setTimeout(() => { this.pollTimer = null; this.runPoll() .catch((err) => logger.error("LogStreamingManager: unexpected poll error", err) ) .finally(() => { if (this.isRunning) { this.schedulePoll(POLL_INTERVAL_MS); } }); }, delayMs); // Do not keep the event loop alive just for the poll timer – the // graceful-shutdown path calls shutdown() explicitly. 
this.pollTimer.unref?.(); } // ------------------------------------------------------------------------- // Poll cycle // ------------------------------------------------------------------------- private async runPoll(): Promise { if (this.isPolling) return; // previous poll still running – skip this.isPolling = true; try { const destinations = await this.loadEnabledDestinations(); if (destinations.length === 0) return; for (const dest of destinations) { if (!this.isRunning) break; await this.processDestination(dest).catch((err) => { // Individual destination errors must never abort the whole cycle logger.error( `LogStreamingManager: unhandled error for destination ${dest.destinationId}`, err ); }); } } finally { this.isPolling = false; } } // ------------------------------------------------------------------------- // Per-destination processing // ------------------------------------------------------------------------- private async processDestination( dest: EventStreamingDestination ): Promise { // Check back-off const failState = this.failures.get(dest.destinationId); if (failState && Date.now() < failState.nextRetryAt) { logger.debug( `LogStreamingManager: destination ${dest.destinationId} in back-off, skipping` ); return; } // Parse config – skip destination if config is unparseable let config: HttpConfig; try { config = JSON.parse(dest.config) as HttpConfig; } catch (err) { logger.error( `LogStreamingManager: destination ${dest.destinationId} has invalid JSON config`, err ); return; } const provider = this.createProvider(dest.type, config); if (!provider) { logger.warn( `LogStreamingManager: unsupported destination type "${dest.type}" ` + `for destination ${dest.destinationId} – skipping` ); return; } const enabledTypes: LogType[] = []; if (dest.sendRequestLogs) enabledTypes.push("request"); if (dest.sendActionLogs) enabledTypes.push("action"); if (dest.sendAccessLogs) enabledTypes.push("access"); if (dest.sendConnectionLogs) enabledTypes.push("connection"); 
if (enabledTypes.length === 0) return; let anyFailure = false; for (const logType of enabledTypes) { if (!this.isRunning) break; try { await this.processLogType(dest, provider, logType); } catch (err) { anyFailure = true; logger.error( `LogStreamingManager: failed to process "${logType}" logs ` + `for destination ${dest.destinationId}`, err ); } } if (anyFailure) { this.recordFailure(dest.destinationId); } else { // Any success resets the failure/back-off state if (this.failures.has(dest.destinationId)) { this.failures.delete(dest.destinationId); logger.info( `LogStreamingManager: destination ${dest.destinationId} recovered` ); } } } /** * Forward all pending log records of a specific type for a destination. * * Fetches up to `BATCH_SIZE` records at a time. If the batch is full * (indicating more records may exist) it loops immediately, inserting a * short delay between consecutive requests to the remote endpoint. * The loop is capped at `MAX_CATCHUP_BATCHES` to keep the poll cycle * bounded. 
*/ private async processLogType( dest: EventStreamingDestination, provider: LogDestinationProvider, logType: LogType ): Promise { // Ensure a cursor row exists (creates one pointing at the current max // id so we do not replay historical logs on first run) const cursor = await this.getOrCreateCursor( dest.destinationId, logType, dest.orgId ); let lastSentId = cursor.lastSentId; let batchCount = 0; while (batchCount < MAX_CATCHUP_BATCHES) { const rows = await this.fetchLogs( logType, dest.orgId, lastSentId, BATCH_SIZE ); if (rows.length === 0) break; const events = rows.map((row) => this.rowToLogEvent(logType, row) ); // Throws on failure – caught by the caller which applies back-off await provider.send(events); lastSentId = rows[rows.length - 1].id; await this.updateCursor(dest.destinationId, logType, lastSentId); batchCount++; if (rows.length < BATCH_SIZE) { // Partial batch means we have caught up break; } // Full batch – there are likely more records; pause briefly before // fetching the next batch to smooth out the HTTP request rate if (batchCount < MAX_CATCHUP_BATCHES) { await sleep(INTER_BATCH_DELAY_MS); } } } // ------------------------------------------------------------------------- // Cursor management // ------------------------------------------------------------------------- private async getOrCreateCursor( destinationId: number, logType: LogType, orgId: string ): Promise<{ lastSentId: number }> { // Try to read an existing cursor const existing = await db .select({ lastSentId: eventStreamingCursors.lastSentId }) .from(eventStreamingCursors) .where( and( eq(eventStreamingCursors.destinationId, destinationId), eq(eventStreamingCursors.logType, logType) ) ) .limit(1); if (existing.length > 0) { return { lastSentId: existing[0].lastSentId }; } // No cursor yet – initialise at the current maximum id of the log // table for this org so we only forward *new* events going forward. 
const initialId = await this.getCurrentMaxId(logType, orgId); await db.insert(eventStreamingCursors).values({ destinationId, logType, lastSentId: initialId, lastSentAt: null }); logger.debug( `LogStreamingManager: initialised cursor for destination ${destinationId} ` + `logType="${logType}" at id=${initialId}` ); return { lastSentId: initialId }; } private async updateCursor( destinationId: number, logType: LogType, lastSentId: number ): Promise { await db .update(eventStreamingCursors) .set({ lastSentId, lastSentAt: Date.now() }) .where( and( eq(eventStreamingCursors.destinationId, destinationId), eq(eventStreamingCursors.logType, logType) ) ); } /** * Returns the current maximum `id` in the given log table for the org. * Returns 0 when the table is empty. */ private async getCurrentMaxId( logType: LogType, orgId: string ): Promise { try { switch (logType) { case "request": { const [row] = await logsDb .select({ maxId: max(requestAuditLog.id) }) .from(requestAuditLog) .where(eq(requestAuditLog.orgId, orgId)); return row?.maxId ?? 0; } case "action": { const [row] = await logsDb .select({ maxId: max(actionAuditLog.id) }) .from(actionAuditLog) .where(eq(actionAuditLog.orgId, orgId)); return row?.maxId ?? 0; } case "access": { const [row] = await logsDb .select({ maxId: max(accessAuditLog.id) }) .from(accessAuditLog) .where(eq(accessAuditLog.orgId, orgId)); return row?.maxId ?? 0; } case "connection": { const [row] = await logsDb .select({ maxId: max(connectionAuditLog.id) }) .from(connectionAuditLog) .where(eq(connectionAuditLog.orgId, orgId)); return row?.maxId ?? 
0; } } } catch (err) { logger.warn( `LogStreamingManager: could not determine current max id for ` + `logType="${logType}", defaulting to 0`, err ); return 0; } } // ------------------------------------------------------------------------- // Log fetching // ------------------------------------------------------------------------- /** * Fetch up to `limit` log rows with `id > afterId`, ordered by id ASC, * filtered to the given organisation. */ private async fetchLogs( logType: LogType, orgId: string, afterId: number, limit: number ): Promise & { id: number }>> { switch (logType) { case "request": return (await logsDb .select() .from(requestAuditLog) .where( and( eq(requestAuditLog.orgId, orgId), gt(requestAuditLog.id, afterId) ) ) .orderBy(requestAuditLog.id) .limit(limit)) as Array< Record & { id: number } >; case "action": return (await logsDb .select() .from(actionAuditLog) .where( and( eq(actionAuditLog.orgId, orgId), gt(actionAuditLog.id, afterId) ) ) .orderBy(actionAuditLog.id) .limit(limit)) as Array< Record & { id: number } >; case "access": return (await logsDb .select() .from(accessAuditLog) .where( and( eq(accessAuditLog.orgId, orgId), gt(accessAuditLog.id, afterId) ) ) .orderBy(accessAuditLog.id) .limit(limit)) as Array< Record & { id: number } >; case "connection": return (await logsDb .select() .from(connectionAuditLog) .where( and( eq(connectionAuditLog.orgId, orgId), gt(connectionAuditLog.id, afterId) ) ) .orderBy(connectionAuditLog.id) .limit(limit)) as Array< Record & { id: number } >; } } // ------------------------------------------------------------------------- // Row → LogEvent conversion // ------------------------------------------------------------------------- private rowToLogEvent( logType: LogType, row: Record & { id: number } ): LogEvent { // Determine the epoch-seconds timestamp for this row type let timestamp: number; switch (logType) { case "request": case "action": case "access": timestamp = typeof row.timestamp === "number" ? 
row.timestamp : 0; break; case "connection": timestamp = typeof row.startedAt === "number" ? row.startedAt : 0; break; } const orgId = typeof row.orgId === "string" ? row.orgId : ""; return { id: row.id, logType, orgId, timestamp, data: row as Record }; } // ------------------------------------------------------------------------- // Provider factory // ------------------------------------------------------------------------- /** * Instantiate the correct LogDestinationProvider for the given destination * type string. Returns `null` for unknown types. * * To add a new provider: * 1. Implement `LogDestinationProvider` in a new file under `providers/` * 2. Add a `case` here */ private createProvider( type: string, config: unknown ): LogDestinationProvider | null { switch (type) { case "http": return new HttpLogDestination(config as HttpConfig); // Future providers: // case "datadog": return new DatadogLogDestination(config as DatadogConfig); default: return null; } } // ------------------------------------------------------------------------- // Back-off tracking // ------------------------------------------------------------------------- private recordFailure(destinationId: number): void { const current = this.failures.get(destinationId) ?? 
{ consecutiveFailures: 0, nextRetryAt: 0 }; current.consecutiveFailures += 1; const scheduleIdx = Math.min( current.consecutiveFailures - 1, BACKOFF_SCHEDULE_MS.length - 1 ); const backoffMs = BACKOFF_SCHEDULE_MS[scheduleIdx]; current.nextRetryAt = Date.now() + backoffMs; this.failures.set(destinationId, current); logger.warn( `LogStreamingManager: destination ${destinationId} failed ` + `(consecutive #${current.consecutiveFailures}), ` + `backing off for ${backoffMs / 1000}s` ); } // ------------------------------------------------------------------------- // DB helpers // ------------------------------------------------------------------------- private async loadEnabledDestinations(): Promise< EventStreamingDestination[] > { try { return await db .select() .from(eventStreamingDestinations) .where(eq(eventStreamingDestinations.enabled, true)); } catch (err) { logger.error( "LogStreamingManager: failed to load destinations", err ); return []; } } } // --------------------------------------------------------------------------- // Helpers // --------------------------------------------------------------------------- function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); }