From 3dc258da16c0dd05e2f0f53d5495769400b4f17b Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 31 Mar 2026 12:22:37 -0700 Subject: [PATCH] Log streaming manager pass 1 --- server/db/pg/schema/privateSchema.ts | 28 +- server/db/sqlite/schema/privateSchema.ts | 27 +- server/private/cleanup.ts | 2 + .../lib/logStreaming/LogStreamingManager.ts | 626 ++++++++++++++++++ server/private/lib/logStreaming/index.ts | 31 + .../providers/HttpLogDestination.ts | 260 ++++++++ .../providers/LogDestinationProvider.ts | 44 ++ server/private/lib/logStreaming/types.ts | 115 ++++ 8 files changed, 1131 insertions(+), 2 deletions(-) create mode 100644 server/private/lib/logStreaming/LogStreamingManager.ts create mode 100644 server/private/lib/logStreaming/index.ts create mode 100644 server/private/lib/logStreaming/providers/HttpLogDestination.ts create mode 100644 server/private/lib/logStreaming/providers/LogDestinationProvider.ts create mode 100644 server/private/lib/logStreaming/types.ts diff --git a/server/db/pg/schema/privateSchema.ts b/server/db/pg/schema/privateSchema.ts index 1b031636f..ed6301437 100644 --- a/server/db/pg/schema/privateSchema.ts +++ b/server/db/pg/schema/privateSchema.ts @@ -8,7 +8,8 @@ import { real, text, index, - primaryKey + primaryKey, + uniqueIndex } from "drizzle-orm/pg-core"; import { InferSelectModel } from "drizzle-orm"; import { @@ -470,3 +471,28 @@ export type SiteProvisioningKeyOrg = InferSelectModel< export type EventStreamingDestination = InferSelectModel< typeof eventStreamingDestinations >; + +export const eventStreamingCursors = pgTable( + "eventStreamingCursors", + { + cursorId: serial("cursorId").primaryKey(), + destinationId: integer("destinationId") + .notNull() + .references(() => eventStreamingDestinations.destinationId, { + onDelete: "cascade" + }), + logType: varchar("logType", { length: 50 }).notNull(), // "request" | "action" | "access" | "connection" + lastSentId: bigint("lastSentId", { mode: "number" }).notNull().default(0), + lastSentAt: bigint("lastSentAt", { mode: "number" }) // epoch milliseconds, null if never sent + }, + (table) => [ + uniqueIndex("idx_eventStreamingCursors_dest_type").on( + table.destinationId, + table.logType + ) + ] +); + +export type EventStreamingCursor = InferSelectModel< + typeof eventStreamingCursors +>; diff --git a/server/db/sqlite/schema/privateSchema.ts b/server/db/sqlite/schema/privateSchema.ts index 9bb994266..c1aa084a2 100644 --- a/server/db/sqlite/schema/privateSchema.ts +++ b/server/db/sqlite/schema/privateSchema.ts @@ -5,7 +5,8 @@ import { primaryKey, real, sqliteTable, - text + text, + uniqueIndex } from "drizzle-orm/sqlite-core"; import { clients, @@ -433,6 +434,27 @@ export const eventStreamingDestinations = sqliteTable( } ); +export const eventStreamingCursors = sqliteTable( + "eventStreamingCursors", + { + cursorId: integer("cursorId").primaryKey({ autoIncrement: true }), + destinationId: integer("destinationId") + .notNull() + .references(() => eventStreamingDestinations.destinationId, { + onDelete: "cascade" + }), + logType: text("logType").notNull(), // "request" | "action" | "access" | "connection" + lastSentId: integer("lastSentId").notNull().default(0), + lastSentAt: integer("lastSentAt") // epoch milliseconds, null if never sent + }, + (table) => [ + uniqueIndex("idx_eventStreamingCursors_dest_type").on( + table.destinationId, + table.logType + ) + ] +); + export type Approval = InferSelectModel; export type Limit = InferSelectModel; export type Account = InferSelectModel; @@ -461,3 +483,6 @@ export type SiteProvisioningKey = InferSelectModel; export type EventStreamingDestination = InferSelectModel< typeof eventStreamingDestinations >; +export type EventStreamingCursor = InferSelectModel< + typeof eventStreamingCursors +>; diff --git a/server/private/cleanup.ts b/server/private/cleanup.ts index f8032eb3b..af4238721 100644 --- a/server/private/cleanup.ts +++ b/server/private/cleanup.ts @@ -12,6 +12,7 @@ */ import { rateLimitService } from "#private/lib/rateLimit"; +import { logStreamingManager } from "#private/lib/logStreaming"; import { cleanup as wsCleanup } from "#private/routers/ws"; import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage"; import { flushConnectionLogToDb } from "#private/routers/newt"; @@ -25,6 +26,7 @@ async function cleanup() { await flushSiteBandwidthToDb(); await rateLimitService.cleanup(); await wsCleanup(); + await logStreamingManager.shutdown(); process.exit(0); } diff --git a/server/private/lib/logStreaming/LogStreamingManager.ts b/server/private/lib/logStreaming/LogStreamingManager.ts new file mode 100644 index 000000000..131cc8de0 --- /dev/null +++ b/server/private/lib/logStreaming/LogStreamingManager.ts @@ -0,0 +1,626 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + +import { + db, + logsDb, + eventStreamingDestinations, + eventStreamingCursors, + requestAuditLog, + actionAuditLog, + accessAuditLog, + connectionAuditLog +} from "@server/db"; +import logger from "@server/logger"; +import { and, eq, gt, desc, max, sql } from "drizzle-orm"; +import { + LogType, + LOG_TYPES, + LogEvent, + DestinationFailureState, + HttpConfig +} from "./types"; +import { LogDestinationProvider } from "./providers/LogDestinationProvider"; +import { HttpLogDestination } from "./providers/HttpLogDestination"; +import type { EventStreamingDestination } from "@server/db"; + +// --------------------------------------------------------------------------- +// Configuration +// --------------------------------------------------------------------------- + +/** + * How often (ms) the manager polls all destinations for new log records. + * Destinations that were behind (full batch returned) will be re-polled + * immediately without waiting for this interval. + */ +const POLL_INTERVAL_MS = 30_000; + +/** + * Maximum number of log records fetched from the DB in a single query. + * This also controls the maximum size of one HTTP POST body. + */ +const BATCH_SIZE = 250; + +/** + * Minimum delay (ms) between consecutive HTTP requests to the same destination + * during a catch-up run. Prevents bursting thousands of requests back-to-back + * when a destination has fallen behind. + */ +const INTER_BATCH_DELAY_MS = 100; + +/** + * Maximum number of consecutive back-to-back batches to process for a single + * destination per poll cycle. After this limit the destination will wait for + * the next scheduled poll before continuing, giving other destinations a turn. + */ +const MAX_CATCHUP_BATCHES = 20; + +/** + * Back-off schedule (ms) indexed by consecutive failure count. + * After the last entry the max value is re-used. + */ +const BACKOFF_SCHEDULE_MS = [ + 60_000, // 1 min (failure 1) + 2 * 60_000, // 2 min (failure 2) + 5 * 60_000, // 5 min (failure 3) + 10 * 60_000, // 10 min (failure 4) + 30 * 60_000 // 30 min (failure 5+) +]; + +// --------------------------------------------------------------------------- +// LogStreamingManager +// --------------------------------------------------------------------------- + +/** + * Orchestrates periodic polling of the four audit-log tables and forwards new + * records to every enabled event-streaming destination. + * + * ### Design + * - **Interval-based**: a timer fires every `POLL_INTERVAL_MS`. On each tick + * every enabled destination is processed in sequence. + * - **Cursor-based**: the last successfully forwarded row `id` is persisted in + * the `eventStreamingCursors` table so state survives restarts. + * - **Catch-up**: if a full batch is returned the destination is immediately + * re-queried (up to `MAX_CATCHUP_BATCHES` times) before yielding. + * - **Smoothing**: `INTER_BATCH_DELAY_MS` is inserted between consecutive + * catch-up batches to avoid hammering the remote endpoint. + * - **Back-off**: consecutive send failures trigger exponential back-off + * (tracked in-memory per destination). Successful sends reset the counter. + */ +export class LogStreamingManager { + private pollTimer: ReturnType | null = null; + private isRunning = false; + private isPolling = false; + + /** In-memory back-off state keyed by destinationId. */ + private readonly failures = new Map(); + + // ------------------------------------------------------------------------- + // Lifecycle + // ------------------------------------------------------------------------- + + start(): void { + if (this.isRunning) return; + this.isRunning = true; + logger.info("LogStreamingManager: started"); + this.schedulePoll(POLL_INTERVAL_MS); + } + + async shutdown(): Promise { + this.isRunning = false; + if (this.pollTimer !== null) { + clearTimeout(this.pollTimer); + this.pollTimer = null; + } + // Wait for any in-progress poll to finish before returning so that + // callers (graceful-shutdown handlers) can safely exit afterward. + const deadline = Date.now() + 15_000; + while (this.isPolling && Date.now() < deadline) { + await sleep(100); + } + logger.info("LogStreamingManager: stopped"); + } + + // ------------------------------------------------------------------------- + // Scheduling + // ------------------------------------------------------------------------- + + private schedulePoll(delayMs: number): void { + this.pollTimer = setTimeout(() => { + this.pollTimer = null; + this.runPoll() + .catch((err) => + logger.error("LogStreamingManager: unexpected poll error", err) + ) + .finally(() => { + if (this.isRunning) { + this.schedulePoll(POLL_INTERVAL_MS); + } + }); + }, delayMs); + + // Do not keep the event loop alive just for the poll timer – the + // graceful-shutdown path calls shutdown() explicitly. + this.pollTimer.unref?.(); + } + + // ------------------------------------------------------------------------- + // Poll cycle + // ------------------------------------------------------------------------- + + private async runPoll(): Promise { + if (this.isPolling) return; // previous poll still running – skip + this.isPolling = true; + + try { + const destinations = await this.loadEnabledDestinations(); + if (destinations.length === 0) return; + + for (const dest of destinations) { + if (!this.isRunning) break; + await this.processDestination(dest).catch((err) => { + // Individual destination errors must never abort the whole cycle + logger.error( + `LogStreamingManager: unhandled error for destination ${dest.destinationId}`, + err + ); + }); + } + } finally { + this.isPolling = false; + } + } + + // ------------------------------------------------------------------------- + // Per-destination processing + // ------------------------------------------------------------------------- + + private async processDestination( + dest: EventStreamingDestination + ): Promise { + // Check back-off + const failState = this.failures.get(dest.destinationId); + if (failState && Date.now() < failState.nextRetryAt) { + logger.debug( + `LogStreamingManager: destination ${dest.destinationId} in back-off, skipping` + ); + return; + } + + // Parse config – skip destination if config is unparseable + let config: HttpConfig; + try { + config = JSON.parse(dest.config) as HttpConfig; + } catch (err) { + logger.error( + `LogStreamingManager: destination ${dest.destinationId} has invalid JSON config`, + err + ); + return; + } + + const provider = this.createProvider(dest.type, config); + if (!provider) { + logger.warn( + `LogStreamingManager: unsupported destination type "${dest.type}" ` + + `for destination ${dest.destinationId} – skipping` + ); + return; + } + + const enabledTypes: LogType[] = []; + if (dest.sendRequestLogs) enabledTypes.push("request"); + if (dest.sendActionLogs) enabledTypes.push("action"); + if (dest.sendAccessLogs) enabledTypes.push("access"); + if (dest.sendConnectionLogs) enabledTypes.push("connection"); + + if (enabledTypes.length === 0) return; + + let anyFailure = false; + + for (const logType of enabledTypes) { + if (!this.isRunning) break; + try { + await this.processLogType(dest, provider, logType); + } catch (err) { + anyFailure = true; + logger.error( + `LogStreamingManager: failed to process "${logType}" logs ` + + `for destination ${dest.destinationId}`, + err + ); + } + } + + if (anyFailure) { + this.recordFailure(dest.destinationId); + } else { + // Any success resets the failure/back-off state + if (this.failures.has(dest.destinationId)) { + this.failures.delete(dest.destinationId); + logger.info( + `LogStreamingManager: destination ${dest.destinationId} recovered` + ); + } + } + } + + /** + * Forward all pending log records of a specific type for a destination. + * + * Fetches up to `BATCH_SIZE` records at a time. If the batch is full + * (indicating more records may exist) it loops immediately, inserting a + * short delay between consecutive requests to the remote endpoint. + * The loop is capped at `MAX_CATCHUP_BATCHES` to keep the poll cycle + * bounded. + */ + private async processLogType( + dest: EventStreamingDestination, + provider: LogDestinationProvider, + logType: LogType + ): Promise { + // Ensure a cursor row exists (creates one pointing at the current max + // id so we do not replay historical logs on first run) + const cursor = await this.getOrCreateCursor( + dest.destinationId, + logType, + dest.orgId + ); + + let lastSentId = cursor.lastSentId; + let batchCount = 0; + + while (batchCount < MAX_CATCHUP_BATCHES) { + const rows = await this.fetchLogs( + logType, + dest.orgId, + lastSentId, + BATCH_SIZE + ); + + if (rows.length === 0) break; + + const events = rows.map((row) => + this.rowToLogEvent(logType, row) + ); + + // Throws on failure – caught by the caller which applies back-off + await provider.send(events); + + lastSentId = rows[rows.length - 1].id; + await this.updateCursor(dest.destinationId, logType, lastSentId); + + batchCount++; + + if (rows.length < BATCH_SIZE) { + // Partial batch means we have caught up + break; + } + + // Full batch – there are likely more records; pause briefly before + // fetching the next batch to smooth out the HTTP request rate + if (batchCount < MAX_CATCHUP_BATCHES) { + await sleep(INTER_BATCH_DELAY_MS); + } + } + } + + // ------------------------------------------------------------------------- + // Cursor management + // ------------------------------------------------------------------------- + + private async getOrCreateCursor( + destinationId: number, + logType: LogType, + orgId: string + ): Promise<{ lastSentId: number }> { + // Try to read an existing cursor + const existing = await db + .select({ + lastSentId: eventStreamingCursors.lastSentId + }) + .from(eventStreamingCursors) + .where( + and( + eq(eventStreamingCursors.destinationId, destinationId), + eq(eventStreamingCursors.logType, logType) + ) + ) + .limit(1); + + if (existing.length > 0) { + return { lastSentId: existing[0].lastSentId }; + } + + // No cursor yet – initialise at the current maximum id of the log + // table for this org so we only forward *new* events going forward. + const initialId = await this.getCurrentMaxId(logType, orgId); + + await db.insert(eventStreamingCursors).values({ + destinationId, + logType, + lastSentId: initialId, + lastSentAt: null + }); + + logger.debug( + `LogStreamingManager: initialised cursor for destination ${destinationId} ` + + `logType="${logType}" at id=${initialId}` + ); + + return { lastSentId: initialId }; + } + + private async updateCursor( + destinationId: number, + logType: LogType, + lastSentId: number + ): Promise { + await db + .update(eventStreamingCursors) + .set({ + lastSentId, + lastSentAt: Date.now() + }) + .where( + and( + eq(eventStreamingCursors.destinationId, destinationId), + eq(eventStreamingCursors.logType, logType) + ) + ); + } + + /** + * Returns the current maximum `id` in the given log table for the org. + * Returns 0 when the table is empty. + */ + private async getCurrentMaxId( + logType: LogType, + orgId: string + ): Promise { + try { + switch (logType) { + case "request": { + const [row] = await logsDb + .select({ maxId: max(requestAuditLog.id) }) + .from(requestAuditLog) + .where(eq(requestAuditLog.orgId, orgId)); + return row?.maxId ?? 0; + } + case "action": { + const [row] = await logsDb + .select({ maxId: max(actionAuditLog.id) }) + .from(actionAuditLog) + .where(eq(actionAuditLog.orgId, orgId)); + return row?.maxId ?? 0; + } + case "access": { + const [row] = await logsDb + .select({ maxId: max(accessAuditLog.id) }) + .from(accessAuditLog) + .where(eq(accessAuditLog.orgId, orgId)); + return row?.maxId ?? 0; + } + case "connection": { + const [row] = await logsDb + .select({ maxId: max(connectionAuditLog.id) }) + .from(connectionAuditLog) + .where(eq(connectionAuditLog.orgId, orgId)); + return row?.maxId ?? 0; + } + } + } catch (err) { + logger.warn( + `LogStreamingManager: could not determine current max id for ` + + `logType="${logType}", defaulting to 0`, + err + ); + return 0; + } + } + + // ------------------------------------------------------------------------- + // Log fetching + // ------------------------------------------------------------------------- + + /** + * Fetch up to `limit` log rows with `id > afterId`, ordered by id ASC, + * filtered to the given organisation. + */ + private async fetchLogs( + logType: LogType, + orgId: string, + afterId: number, + limit: number + ): Promise & { id: number }>> { + switch (logType) { + case "request": + return (await logsDb + .select() + .from(requestAuditLog) + .where( + and( + eq(requestAuditLog.orgId, orgId), + gt(requestAuditLog.id, afterId) + ) + ) + .orderBy(requestAuditLog.id) + .limit(limit)) as Array< + Record & { id: number } + >; + + case "action": + return (await logsDb + .select() + .from(actionAuditLog) + .where( + and( + eq(actionAuditLog.orgId, orgId), + gt(actionAuditLog.id, afterId) + ) + ) + .orderBy(actionAuditLog.id) + .limit(limit)) as Array< + Record & { id: number } + >; + + case "access": + return (await logsDb + .select() + .from(accessAuditLog) + .where( + and( + eq(accessAuditLog.orgId, orgId), + gt(accessAuditLog.id, afterId) + ) + ) + .orderBy(accessAuditLog.id) + .limit(limit)) as Array< + Record & { id: number } + >; + + case "connection": + return (await logsDb + .select() + .from(connectionAuditLog) + .where( + and( + eq(connectionAuditLog.orgId, orgId), + gt(connectionAuditLog.id, afterId) + ) + ) + .orderBy(connectionAuditLog.id) + .limit(limit)) as Array< + Record & { id: number } + >; + } + } + + // ------------------------------------------------------------------------- + // Row → LogEvent conversion + // ------------------------------------------------------------------------- + + private rowToLogEvent( + logType: LogType, + row: Record & { id: number } + ): LogEvent { + // Determine the epoch-seconds timestamp for this row type + let timestamp: number; + switch (logType) { + case "request": + case "action": + case "access": + timestamp = + typeof row.timestamp === "number" ? row.timestamp : 0; + break; + case "connection": + timestamp = + typeof row.startedAt === "number" ? row.startedAt : 0; + break; + } + + const orgId = + typeof row.orgId === "string" ? row.orgId : ""; + + return { + id: row.id, + logType, + orgId, + timestamp, + data: row as Record + }; + } + + // ------------------------------------------------------------------------- + // Provider factory + // ------------------------------------------------------------------------- + + /** + * Instantiate the correct LogDestinationProvider for the given destination + * type string. Returns `null` for unknown types. + * + * To add a new provider: + * 1. Implement `LogDestinationProvider` in a new file under `providers/` + * 2. Add a `case` here + */ + private createProvider( + type: string, + config: unknown + ): LogDestinationProvider | null { + switch (type) { + case "http": + return new HttpLogDestination(config as HttpConfig); + // Future providers: + // case "datadog": return new DatadogLogDestination(config as DatadogConfig); + default: + return null; + } + } + + // ------------------------------------------------------------------------- + // Back-off tracking + // ------------------------------------------------------------------------- + + private recordFailure(destinationId: number): void { + const current = this.failures.get(destinationId) ?? { + consecutiveFailures: 0, + nextRetryAt: 0 + }; + + current.consecutiveFailures += 1; + + const scheduleIdx = Math.min( + current.consecutiveFailures - 1, + BACKOFF_SCHEDULE_MS.length - 1 + ); + const backoffMs = BACKOFF_SCHEDULE_MS[scheduleIdx]; + current.nextRetryAt = Date.now() + backoffMs; + + this.failures.set(destinationId, current); + + logger.warn( + `LogStreamingManager: destination ${destinationId} failed ` + + `(consecutive #${current.consecutiveFailures}), ` + + `backing off for ${backoffMs / 1000}s` + ); + } + + // ------------------------------------------------------------------------- + // DB helpers + // ------------------------------------------------------------------------- + + private async loadEnabledDestinations(): Promise< + EventStreamingDestination[] + > { + try { + return await db + .select() + .from(eventStreamingDestinations) + .where(eq(eventStreamingDestinations.enabled, true)); + } catch (err) { + logger.error( + "LogStreamingManager: failed to load destinations", + err + ); + return []; + } + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} \ No newline at end of file diff --git a/server/private/lib/logStreaming/index.ts b/server/private/lib/logStreaming/index.ts new file mode 100644 index 000000000..a4b8c569a --- /dev/null +++ b/server/private/lib/logStreaming/index.ts @@ -0,0 +1,31 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + +import { LogStreamingManager } from "./LogStreamingManager"; + +/** + * Module-level singleton. Importing this module is sufficient to start the + * streaming manager – no explicit init call required by the caller. + * + * The manager registers a non-blocking timer (unref'd) so it will not keep + * the Node.js event loop alive on its own. Call `logStreamingManager.shutdown()` + * during graceful shutdown to drain any in-progress poll and release resources. + */ +export const logStreamingManager = new LogStreamingManager(); + +logStreamingManager.start(); + +export { LogStreamingManager } from "./LogStreamingManager"; +export type { LogDestinationProvider } from "./providers/LogDestinationProvider"; +export { HttpLogDestination } from "./providers/HttpLogDestination"; +export * from "./types"; \ No newline at end of file diff --git a/server/private/lib/logStreaming/providers/HttpLogDestination.ts b/server/private/lib/logStreaming/providers/HttpLogDestination.ts new file mode 100644 index 000000000..eac3c42e2 --- /dev/null +++ b/server/private/lib/logStreaming/providers/HttpLogDestination.ts @@ -0,0 +1,260 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + +import logger from "@server/logger"; +import { LogEvent, HttpConfig } from "../types"; +import { LogDestinationProvider } from "./LogDestinationProvider"; + +// --------------------------------------------------------------------------- +// Constants +// --------------------------------------------------------------------------- + +/** Maximum time (ms) to wait for a single HTTP response. */ +const REQUEST_TIMEOUT_MS = 30_000; + +// --------------------------------------------------------------------------- +// HttpLogDestination +// --------------------------------------------------------------------------- + +/** + * Forwards a batch of log events to an arbitrary HTTP endpoint via a single + * POST request per batch. + * + * **Payload format** + * + * Without a body template the payload is a JSON array, one object per event: + * ```json + * [ + * { "event": "request", "timestamp": "2024-01-01T00:00:00.000Z", "data": { … } }, + * … + * ] + * ``` + * + * With a body template each event is rendered through the template and the + * resulting objects are wrapped in the same outer array. Template placeholders: + * - `{{event}}` → the LogType string ("request", "action", etc.) + * - `{{timestamp}}` → ISO-8601 UTC datetime string + * - `{{data}}` → raw inline JSON object (**no surrounding quotes**) + * + * Example template: + * ``` + * { "event": "{{event}}", "ts": "{{timestamp}}", "payload": {{data}} } + * ``` + */ +export class HttpLogDestination implements LogDestinationProvider { + readonly type = "http"; + + private readonly config: HttpConfig; + + constructor(config: HttpConfig) { + this.config = config; + } + + // ----------------------------------------------------------------------- + // LogDestinationProvider implementation + // ----------------------------------------------------------------------- + + async send(events: LogEvent[]): Promise { + if (events.length === 0) return; + + const headers = this.buildHeaders(); + const payload = this.buildPayload(events); + const body = JSON.stringify(payload); + + const controller = new AbortController(); + const timeoutHandle = setTimeout( + () => controller.abort(), + REQUEST_TIMEOUT_MS + ); + + let response: Response; + try { + response = await fetch(this.config.url, { + method: "POST", + headers, + body, + signal: controller.signal + }); + } catch (err: unknown) { + const isAbort = + err instanceof Error && err.name === "AbortError"; + if (isAbort) { + throw new Error( + `HttpLogDestination: request to "${this.config.url}" timed out after ${REQUEST_TIMEOUT_MS} ms` + ); + } + const msg = err instanceof Error ? err.message : String(err); + throw new Error( + `HttpLogDestination: request to "${this.config.url}" failed – ${msg}` + ); + } finally { + clearTimeout(timeoutHandle); + } + + if (!response.ok) { + // Try to include a snippet of the response body in the error so + // operators can diagnose auth or schema rejections. + let responseSnippet = ""; + try { + const text = await response.text(); + responseSnippet = text.slice(0, 300); + } catch { + // ignore – best effort + } + + throw new Error( + `HttpLogDestination: server at "${this.config.url}" returned ` + + `HTTP ${response.status} ${response.statusText}` + + (responseSnippet ? ` – ${responseSnippet}` : "") + ); + } + } + + // ----------------------------------------------------------------------- + // Header construction + // ----------------------------------------------------------------------- + + private buildHeaders(): Record { + const headers: Record = { + "Content-Type": "application/json" + }; + + // Authentication + switch (this.config.authType) { + case "bearer": { + const token = this.config.bearerToken?.trim(); + if (token) { + headers["Authorization"] = `Bearer ${token}`; + } + break; + } + case "basic": { + const creds = this.config.basicCredentials?.trim(); + if (creds) { + const encoded = Buffer.from(creds).toString("base64"); + headers["Authorization"] = `Basic ${encoded}`; + } + break; + } + case "custom": { + const name = this.config.customHeaderName?.trim(); + const value = this.config.customHeaderValue ?? ""; + if (name) { + headers[name] = value; + } + break; + } + case "none": + default: + // No Authorization header + break; + } + + // Additional static headers (user-defined; may override Content-Type + // if the operator explicitly sets it, which is intentional). + for (const { key, value } of this.config.headers ?? []) { + const trimmedKey = key?.trim(); + if (trimmedKey) { + headers[trimmedKey] = value ?? ""; + } + } + + return headers; + } + + // ----------------------------------------------------------------------- + // Payload construction + // ----------------------------------------------------------------------- + + /** + * Build the JSON-serialisable value that will be sent as the request body. + * + * - No template → `Array<{ event, timestamp, data }>` + * - With template → `Array` + */ + private buildPayload(events: LogEvent[]): unknown { + if (this.config.useBodyTemplate && this.config.bodyTemplate?.trim()) { + return events.map((event) => + this.renderTemplate(this.config.bodyTemplate!, event) + ); + } + + return events.map((event) => ({ + event: event.logType, + timestamp: epochSecondsToIso(event.timestamp), + data: event.data + })); + } + + /** + * Render a single event through the body template. + * + * The three placeholder tokens are replaced in a specific order to avoid + * accidental double-replacement: + * + * 1. `{{data}}` → raw JSON (may contain `{{` characters in values) + * 2. `{{event}}` → safe string + * 3. `{{timestamp}}` → safe ISO string + * + * If the rendered string is not valid JSON we fall back to returning it as + * a plain string so the batch still makes it out and the operator can + * inspect the template. + */ + private renderTemplate(template: string, event: LogEvent): unknown { + const isoTimestamp = epochSecondsToIso(event.timestamp); + const dataJson = JSON.stringify(event.data); + + // Replace {{data}} first because its JSON value might legitimately + // contain the substrings "{{event}}" or "{{timestamp}}" inside string + // fields – those should NOT be re-expanded. + const rendered = template + .replace(/\{\{data\}\}/g, dataJson) + .replace(/\{\{event\}\}/g, escapeJsonString(event.logType)) + .replace( + /\{\{timestamp\}\}/g, + escapeJsonString(isoTimestamp) + ); + + try { + return JSON.parse(rendered); + } catch { + logger.warn( + `HttpLogDestination: body template produced invalid JSON for ` + + `event type "${event.logType}" destined for "${this.config.url}". ` + + `Sending rendered template as a raw string. ` + + `Check your template syntax – specifically that {{data}} is ` + + `NOT wrapped in quotes.` + ); + return rendered; + } + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function epochSecondsToIso(epochSeconds: number): string { + return new Date(epochSeconds * 1000).toISOString(); +} + +/** + * Escape a string value so it can be safely substituted into the interior of + * a JSON string literal (i.e. between existing `"` quotes in the template). + * This prevents a crafted logType or timestamp from breaking out of its + * string context in the rendered template. + */ +function escapeJsonString(value: string): string { + // JSON.stringify produces `""` – strip the outer quotes. + return JSON.stringify(value).slice(1, -1); +} \ No newline at end of file diff --git a/server/private/lib/logStreaming/providers/LogDestinationProvider.ts b/server/private/lib/logStreaming/providers/LogDestinationProvider.ts new file mode 100644 index 000000000..d09be320b --- /dev/null +++ b/server/private/lib/logStreaming/providers/LogDestinationProvider.ts @@ -0,0 +1,44 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + +import { LogEvent } from "../types"; + +/** + * Common interface that every log-forwarding backend must implement. + * + * Adding a new destination type (e.g. Datadog, Splunk, Kafka) is as simple as + * creating a class that satisfies this interface and registering it inside + * LogStreamingManager.createProvider(). + */ +export interface LogDestinationProvider { + /** + * The string identifier that matches the `type` column in the + * `eventStreamingDestinations` table (e.g. "http", "datadog"). + */ + readonly type: string; + + /** + * Forward a batch of log events to the destination. + * + * Implementations should: + * - Treat the call as atomic: either all events are accepted or an error + * is thrown so the caller can retry / back off. + * - Respect the timeout contract expected by the manager (default 30 s). + * - NOT swallow errors – the manager relies on thrown exceptions to track + * failure state and apply exponential back-off. + * + * @param events A non-empty array of normalised log events to forward. + * @throws Any network, authentication, or serialisation error. + */ + send(events: LogEvent[]): Promise; +} \ No newline at end of file diff --git a/server/private/lib/logStreaming/types.ts b/server/private/lib/logStreaming/types.ts new file mode 100644 index 000000000..2bd25418d --- /dev/null +++ b/server/private/lib/logStreaming/types.ts @@ -0,0 +1,115 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + +// --------------------------------------------------------------------------- +// Log type identifiers +// --------------------------------------------------------------------------- + +export type LogType = "request" | "action" | "access" | "connection"; + +export const LOG_TYPES: LogType[] = [ + "request", + "action", + "access", + "connection" +]; + +// --------------------------------------------------------------------------- +// A normalised event ready to be forwarded to a destination +// --------------------------------------------------------------------------- + +export interface LogEvent { + /** The auto-increment primary key from the source table */ + id: number; + /** Which log table this event came from */ + logType: LogType; + /** The organisation that owns this event */ + orgId: string; + /** Unix epoch seconds – taken from the record's own timestamp field */ + timestamp: number; + /** Full row data from the source table, serialised as a plain object */ + data: Record; +} + +// --------------------------------------------------------------------------- +// A batch of events destined for a single streaming target +// --------------------------------------------------------------------------- + +export interface LogBatch { + destinationId: number; + logType: LogType; + events: LogEvent[]; +} + +// --------------------------------------------------------------------------- +// HTTP destination configuration (mirrors HttpConfig in the UI component) +// --------------------------------------------------------------------------- + +export type AuthType = "none" | "bearer" | "basic" | "custom"; + +export interface HttpConfig { + /** Human-readable label for the destination */ + name: string; + /** Target URL that will receive POST requests */ + url: string; + /** Authentication strategy to use */ + authType: AuthType; + /** Used when authType === "bearer" */ + bearerToken?: string; + /** Used when authType === "basic" – must be "username:password" */ + basicCredentials?: string; + /** Used when authType === "custom" – header name */ + customHeaderName?: string; + /** Used when authType === "custom" – header value */ + customHeaderValue?: string; + /** Additional static headers appended to every request */ + headers: Array<{ key: string; value: string }>; + /** Whether to render a custom body template instead of the default shape */ + useBodyTemplate: boolean; + /** + * Handlebars-style template for the JSON body of each event. + * + * Supported placeholders: + * {{event}} – the LogType string ("request", "action", etc.) + * {{timestamp}} – ISO-8601 UTC string derived from the event's timestamp + * {{data}} – raw JSON object (no surrounding quotes) of the full row + * + * Example: + * { "event": "{{event}}", "ts": "{{timestamp}}", "payload": {{data}} } + */ + bodyTemplate?: string; +} + +// --------------------------------------------------------------------------- +// Per-destination per-log-type cursor (reflects the DB table) +// --------------------------------------------------------------------------- + +export interface StreamingCursor { + destinationId: number; + logType: LogType; + /** The `id` of the last row that was successfully forwarded */ + lastSentId: number; + /** Epoch milliseconds of the last successful send (or null if never sent) */ + lastSentAt: number | null; +} + +// --------------------------------------------------------------------------- +// In-memory failure / back-off state tracked per destination +// --------------------------------------------------------------------------- + +export interface DestinationFailureState { + /** How many consecutive send failures have occurred */ + consecutiveFailures: number; + /** Date.now() value after which the destination may be retried */ + nextRetryAt: number; +} \ No newline at end of file