diff --git a/messages/en-US.json b/messages/en-US.json index 6a137e2ba..e8c7cb47d 100644 --- a/messages/en-US.json +++ b/messages/en-US.json @@ -2460,6 +2460,7 @@ "connectionLogs": "Connection Logs", "connectionLogsDescription": "View connection logs for tunnels in this organization", "sidebarLogsConnection": "Connection Logs", + "sidebarLogsStreaming": "Streaming", "sourceAddress": "Source Address", "destinationAddress": "Destination Address", "duration": "Duration", diff --git a/public/third-party/dd.png b/public/third-party/dd.png new file mode 100644 index 000000000..598771157 Binary files /dev/null and b/public/third-party/dd.png differ diff --git a/public/third-party/s3.png b/public/third-party/s3.png new file mode 100644 index 000000000..f86959a93 Binary files /dev/null and b/public/third-party/s3.png differ diff --git a/server/auth/actions.ts b/server/auth/actions.ts index fc5daa4f8..213dab9d3 100644 --- a/server/auth/actions.ts +++ b/server/auth/actions.ts @@ -140,7 +140,11 @@ export enum ActionsEnum { exportLogs = "exportLogs", listApprovals = "listApprovals", updateApprovals = "updateApprovals", - signSshKey = "signSshKey" + signSshKey = "signSshKey", + createEventStreamingDestination = "createEventStreamingDestination", + updateEventStreamingDestination = "updateEventStreamingDestination", + deleteEventStreamingDestination = "deleteEventStreamingDestination", + listEventStreamingDestinations = "listEventStreamingDestinations" } export async function checkUserActionPermission( diff --git a/server/db/pg/schema/privateSchema.ts b/server/db/pg/schema/privateSchema.ts index 9d5955d51..4122fb5b5 100644 --- a/server/db/pg/schema/privateSchema.ts +++ b/server/db/pg/schema/privateSchema.ts @@ -8,7 +8,8 @@ import { real, text, index, - primaryKey + primaryKey, + uniqueIndex } from "drizzle-orm/pg-core"; import { InferSelectModel } from "drizzle-orm"; import { @@ -417,6 +418,46 @@ export const siteProvisioningKeyOrg = pgTable( ] ); +export const 
eventStreamingDestinations = pgTable( + "eventStreamingDestinations", + { + destinationId: serial("destinationId").primaryKey(), + orgId: varchar("orgId", { length: 255 }) + .notNull() + .references(() => orgs.orgId, { onDelete: "cascade" }), + sendConnectionLogs: boolean("sendConnectionLogs").notNull().default(false), + sendRequestLogs: boolean("sendRequestLogs").notNull().default(false), + sendActionLogs: boolean("sendActionLogs").notNull().default(false), + sendAccessLogs: boolean("sendAccessLogs").notNull().default(false), + type: varchar("type", { length: 50 }).notNull(), // e.g. "http", "kafka", etc. + config: text("config").notNull(), // JSON string with the configuration for the destination + enabled: boolean("enabled").notNull().default(true), + createdAt: bigint("createdAt", { mode: "number" }).notNull(), + updatedAt: bigint("updatedAt", { mode: "number" }).notNull() + } +); + +export const eventStreamingCursors = pgTable( + "eventStreamingCursors", + { + cursorId: serial("cursorId").primaryKey(), + destinationId: integer("destinationId") + .notNull() + .references(() => eventStreamingDestinations.destinationId, { + onDelete: "cascade" + }), + logType: varchar("logType", { length: 50 }).notNull(), // "request" | "action" | "access" | "connection" + lastSentId: bigint("lastSentId", { mode: "number" }).notNull().default(0), + lastSentAt: bigint("lastSentAt", { mode: "number" }) // epoch milliseconds, null if never sent + }, + (table) => [ + uniqueIndex("idx_eventStreamingCursors_dest_type").on( + table.destinationId, + table.logType + ) + ] +); + export type Approval = InferSelectModel; export type Limit = InferSelectModel; export type Account = InferSelectModel; @@ -439,3 +480,18 @@ export type LoginPageBranding = InferSelectModel; export type ActionAuditLog = InferSelectModel; export type AccessAuditLog = InferSelectModel; export type ConnectionAuditLog = InferSelectModel; +export type SessionTransferToken = InferSelectModel< + typeof sessionTransferToken 
+>; +export type BannedEmail = InferSelectModel; +export type BannedIp = InferSelectModel; +export type SiteProvisioningKey = InferSelectModel; +export type SiteProvisioningKeyOrg = InferSelectModel< + typeof siteProvisioningKeyOrg +>; +export type EventStreamingDestination = InferSelectModel< + typeof eventStreamingDestinations +>; +export type EventStreamingCursor = InferSelectModel< + typeof eventStreamingCursors +>; diff --git a/server/db/sqlite/schema/privateSchema.ts b/server/db/sqlite/schema/privateSchema.ts index 809c0c45d..c1aa084a2 100644 --- a/server/db/sqlite/schema/privateSchema.ts +++ b/server/db/sqlite/schema/privateSchema.ts @@ -5,9 +5,19 @@ import { primaryKey, real, sqliteTable, - text + text, + uniqueIndex } from "drizzle-orm/sqlite-core"; -import { clients, domains, exitNodes, orgs, sessions, siteResources, sites, users } from "./schema"; +import { + clients, + domains, + exitNodes, + orgs, + sessions, + siteResources, + sites, + users +} from "./schema"; export const certificates = sqliteTable("certificates", { certId: integer("certId").primaryKey({ autoIncrement: true }), @@ -401,6 +411,50 @@ export const siteProvisioningKeyOrg = sqliteTable( ] ); +export const eventStreamingDestinations = sqliteTable( + "eventStreamingDestinations", + { + destinationId: integer("destinationId").primaryKey({ + autoIncrement: true + }), + orgId: text("orgId") + .notNull() + .references(() => orgs.orgId, { onDelete: "cascade" }), + sendConnectionLogs: integer("sendConnectionLogs", { mode: "boolean" }).notNull().default(false), + sendRequestLogs: integer("sendRequestLogs", { mode: "boolean" }).notNull().default(false), + sendActionLogs: integer("sendActionLogs", { mode: "boolean" }).notNull().default(false), + sendAccessLogs: integer("sendAccessLogs", { mode: "boolean" }).notNull().default(false), + type: text("type").notNull(), // e.g. "http", "kafka", etc. 
+ config: text("config").notNull(), // JSON string with the configuration for the destination + enabled: integer("enabled", { mode: "boolean" }) + .notNull() + .default(true), + createdAt: integer("createdAt").notNull(), + updatedAt: integer("updatedAt").notNull() + } +); + +export const eventStreamingCursors = sqliteTable( + "eventStreamingCursors", + { + cursorId: integer("cursorId").primaryKey({ autoIncrement: true }), + destinationId: integer("destinationId") + .notNull() + .references(() => eventStreamingDestinations.destinationId, { + onDelete: "cascade" + }), + logType: text("logType").notNull(), // "request" | "action" | "access" | "connection" + lastSentId: integer("lastSentId").notNull().default(0), + lastSentAt: integer("lastSentAt") // epoch milliseconds, null if never sent + }, + (table) => [ + uniqueIndex("idx_eventStreamingCursors_dest_type").on( + table.destinationId, + table.logType + ) + ] +); + export type Approval = InferSelectModel; export type Limit = InferSelectModel; export type Account = InferSelectModel; @@ -423,3 +477,12 @@ export type LoginPageBranding = InferSelectModel; export type ActionAuditLog = InferSelectModel; export type AccessAuditLog = InferSelectModel; export type ConnectionAuditLog = InferSelectModel; +export type BannedEmail = InferSelectModel; +export type BannedIp = InferSelectModel; +export type SiteProvisioningKey = InferSelectModel; +export type EventStreamingDestination = InferSelectModel< + typeof eventStreamingDestinations +>; +export type EventStreamingCursor = InferSelectModel< + typeof eventStreamingCursors +>; diff --git a/server/lib/billing/tierMatrix.ts b/server/lib/billing/tierMatrix.ts index 2aa38e1ef..c76dcd95b 100644 --- a/server/lib/billing/tierMatrix.ts +++ b/server/lib/billing/tierMatrix.ts @@ -18,7 +18,8 @@ export enum TierFeature { AutoProvisioning = "autoProvisioning", // handle downgrade by disabling auto provisioning SshPam = "sshPam", FullRbac = "fullRbac", - SiteProvisioningKeys = 
"siteProvisioningKeys" // handle downgrade by revoking keys if needed + SiteProvisioningKeys = "siteProvisioningKeys", // handle downgrade by revoking keys if needed + SIEM = "siem" // handle downgrade by disabling SIEM integrations } export const tierMatrix: Record = { @@ -54,5 +55,6 @@ export const tierMatrix: Record = { [TierFeature.AutoProvisioning]: ["tier1", "tier3", "enterprise"], [TierFeature.SshPam]: ["tier1", "tier3", "enterprise"], [TierFeature.FullRbac]: ["tier1", "tier2", "tier3", "enterprise"], - [TierFeature.SiteProvisioningKeys]: ["enterprise"] + [TierFeature.SiteProvisioningKeys]: ["tier3", "enterprise"], + [TierFeature.SIEM]: ["enterprise"] }; diff --git a/server/private/cleanup.ts b/server/private/cleanup.ts index f8032eb3b..af4238721 100644 --- a/server/private/cleanup.ts +++ b/server/private/cleanup.ts @@ -12,6 +12,7 @@ */ import { rateLimitService } from "#private/lib/rateLimit"; +import { logStreamingManager } from "#private/lib/logStreaming"; import { cleanup as wsCleanup } from "#private/routers/ws"; import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage"; import { flushConnectionLogToDb } from "#private/routers/newt"; @@ -25,6 +26,7 @@ async function cleanup() { await flushSiteBandwidthToDb(); await rateLimitService.cleanup(); await wsCleanup(); + await logStreamingManager.shutdown(); process.exit(0); } diff --git a/server/private/lib/logConnectionAudit.ts b/server/private/lib/logConnectionAudit.ts new file mode 100644 index 000000000..c7e786280 --- /dev/null +++ b/server/private/lib/logConnectionAudit.ts @@ -0,0 +1,234 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. 
+ */ + +import { logsDb, connectionAuditLog } from "@server/db"; +import logger from "@server/logger"; +import { and, eq, lt } from "drizzle-orm"; +import { calculateCutoffTimestamp } from "@server/lib/cleanupLogs"; + +// --------------------------------------------------------------------------- +// Retry configuration for deadlock handling +// --------------------------------------------------------------------------- + +const MAX_RETRIES = 3; +const BASE_DELAY_MS = 50; + +// --------------------------------------------------------------------------- +// Buffer / flush configuration +// --------------------------------------------------------------------------- + +/** How often to flush accumulated connection log data to the database. */ +const FLUSH_INTERVAL_MS = 30_000; // 30 seconds + +/** Maximum number of records to buffer before forcing a flush. */ +const MAX_BUFFERED_RECORDS = 500; + +/** Maximum number of records to insert in a single database batch. */ +const INSERT_BATCH_SIZE = 100; + +// --------------------------------------------------------------------------- +// Types +// --------------------------------------------------------------------------- + +export interface ConnectionLogRecord { + sessionId: string; + siteResourceId: number; + orgId: string; + siteId: number; + clientId: number | null; + userId: string | null; + sourceAddr: string; + destAddr: string; + protocol: string; + startedAt: number; // epoch seconds + endedAt: number | null; + bytesTx: number | null; + bytesRx: number | null; +} + +// --------------------------------------------------------------------------- +// In-memory buffer +// --------------------------------------------------------------------------- + +let buffer: ConnectionLogRecord[] = []; + +// --------------------------------------------------------------------------- +// Deadlock helpers +// --------------------------------------------------------------------------- + +function isDeadlockError(error: any): boolean { 
+ return ( + error?.code === "40P01" || + error?.cause?.code === "40P01" || + (error?.message && error.message.includes("deadlock")) + ); +} + +async function withDeadlockRetry( + operation: () => Promise, + context: string +): Promise { + let attempt = 0; + while (true) { + try { + return await operation(); + } catch (error: any) { + if (isDeadlockError(error) && attempt < MAX_RETRIES) { + attempt++; + const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS; + const jitter = Math.random() * baseDelay; + const delay = baseDelay + jitter; + logger.warn( + `Deadlock detected in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms` + ); + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } + throw error; + } + } +} + +// --------------------------------------------------------------------------- +// Flush +// --------------------------------------------------------------------------- + +/** + * Flush all buffered connection log records to the database. + * + * Swaps out the buffer before writing so that any records added during the + * flush are captured in the new buffer rather than being lost. Entries that + * fail to write are re-queued back into the buffer so they will be retried + * on the next flush. + * + * This function is exported so that the application's graceful-shutdown + * cleanup handler can call it before the process exits. 
+ */ +export async function flushConnectionLogToDb(): Promise { + if (buffer.length === 0) { + return; + } + + // Atomically swap out the buffer so new data keeps flowing in + const snapshot = buffer; + buffer = []; + + logger.debug( + `Flushing ${snapshot.length} connection log record(s) to the database` + ); + + for (let i = 0; i < snapshot.length; i += INSERT_BATCH_SIZE) { + const batch = snapshot.slice(i, i + INSERT_BATCH_SIZE); + + try { + await withDeadlockRetry(async () => { + await logsDb.insert(connectionAuditLog).values(batch); + }, `flush connection log batch (${batch.length} records)`); + } catch (error) { + logger.error( + `Failed to flush connection log batch of ${batch.length} records:`, + error + ); + + // Re-queue the failed batch so it is retried on the next flush + buffer = [...batch, ...buffer]; + + // Cap buffer to prevent unbounded growth if the DB is unreachable + const hardLimit = MAX_BUFFERED_RECORDS * 5; + if (buffer.length > hardLimit) { + const dropped = buffer.length - hardLimit; + buffer = buffer.slice(0, hardLimit); + logger.warn( + `Connection log buffer overflow, dropped ${dropped} oldest records` + ); + } + + // Stop processing further batches from this snapshot — they will + // be picked up via the re-queued records on the next flush. 
+ const remaining = snapshot.slice(i + INSERT_BATCH_SIZE); + if (remaining.length > 0) { + buffer = [...remaining, ...buffer]; + } + break; + } + } +} + +// --------------------------------------------------------------------------- +// Periodic flush timer +// --------------------------------------------------------------------------- + +const flushTimer = setInterval(async () => { + try { + await flushConnectionLogToDb(); + } catch (error) { + logger.error( + "Unexpected error during periodic connection log flush:", + error + ); + } +}, FLUSH_INTERVAL_MS); + +// Calling unref() means this timer will not keep the Node.js event loop alive +// on its own — the process can still exit normally when there is no other work +// left. The graceful-shutdown path will call flushConnectionLogToDb() explicitly +// before process.exit(), so no data is lost. +flushTimer.unref(); + +// --------------------------------------------------------------------------- +// Cleanup +// --------------------------------------------------------------------------- + +export async function cleanUpOldLogs( + orgId: string, + retentionDays: number +): Promise { + const cutoffTimestamp = calculateCutoffTimestamp(retentionDays); + + try { + await logsDb + .delete(connectionAuditLog) + .where( + and( + lt(connectionAuditLog.startedAt, cutoffTimestamp), + eq(connectionAuditLog.orgId, orgId) + ) + ); + } catch (error) { + logger.error("Error cleaning up old connection audit logs:", error); + } +} + +// --------------------------------------------------------------------------- +// Public logging entry-point +// --------------------------------------------------------------------------- + +/** + * Buffer a single connection log record for eventual persistence. + * + * Records are written to the database in batches either when the buffer + * reaches MAX_BUFFERED_RECORDS or when the periodic flush timer fires. 
+ */ +export function logConnectionAudit(record: ConnectionLogRecord): void { + buffer.push(record); + + if (buffer.length >= MAX_BUFFERED_RECORDS) { + // Fire and forget — errors are handled inside flushConnectionLogToDb + flushConnectionLogToDb().catch((error) => { + logger.error( + "Unexpected error during size-triggered connection log flush:", + error + ); + }); + } +} \ No newline at end of file diff --git a/server/private/lib/logStreaming/LogStreamingManager.ts b/server/private/lib/logStreaming/LogStreamingManager.ts new file mode 100644 index 000000000..04e35ad00 --- /dev/null +++ b/server/private/lib/logStreaming/LogStreamingManager.ts @@ -0,0 +1,773 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + +import { + db, + logsDb, + eventStreamingDestinations, + eventStreamingCursors, + requestAuditLog, + actionAuditLog, + accessAuditLog, + connectionAuditLog +} from "@server/db"; +import logger from "@server/logger"; +import { and, eq, gt, desc, max, sql } from "drizzle-orm"; +import { + LogType, + LOG_TYPES, + LogEvent, + DestinationFailureState, + HttpConfig +} from "./types"; +import { LogDestinationProvider } from "./providers/LogDestinationProvider"; +import { HttpLogDestination } from "./providers/HttpLogDestination"; +import type { EventStreamingDestination } from "@server/db"; + +// --------------------------------------------------------------------------- +// Configuration +// --------------------------------------------------------------------------- + +/** + * How often (ms) the manager polls all destinations for new log records. 
+ * Destinations that were behind (full batch returned) will be re-polled + * immediately without waiting for this interval. + */ +const POLL_INTERVAL_MS = 30_000; + +/** + * Maximum number of log records fetched from the DB in a single query. + * This also controls the maximum size of one HTTP POST body. + */ +const BATCH_SIZE = 250; + +/** + * Minimum delay (ms) between consecutive HTTP requests to the same destination + * during a catch-up run. Prevents bursting thousands of requests back-to-back + * when a destination has fallen behind. + */ +const INTER_BATCH_DELAY_MS = 100; + +/** + * Maximum number of consecutive back-to-back batches to process for a single + * destination per poll cycle. After this limit the destination will wait for + * the next scheduled poll before continuing, giving other destinations a turn. + */ +const MAX_CATCHUP_BATCHES = 20; + +/** + * Back-off schedule (ms) indexed by consecutive failure count. + * After the last entry the max value is re-used. + */ +const BACKOFF_SCHEDULE_MS = [ + 60_000, // 1 min (failure 1) + 2 * 60_000, // 2 min (failure 2) + 5 * 60_000, // 5 min (failure 3) + 10 * 60_000, // 10 min (failure 4) + 30 * 60_000 // 30 min (failure 5+) +]; + +/** + * If a destination has been continuously unreachable for this long, its + * cursors are advanced to the current max row id and the backlog is silently + * discarded. This prevents unbounded queue growth when a webhook endpoint is + * down for an extended period. A prominent warning is logged so operators are + * aware logs were dropped. + * + * Default: 24 hours. + */ +const MAX_BACKLOG_DURATION_MS = 24 * 60 * 60_000; + +// --------------------------------------------------------------------------- +// LogStreamingManager +// --------------------------------------------------------------------------- + +/** + * Orchestrates periodic polling of the four audit-log tables and forwards new + * records to every enabled event-streaming destination. 
+ * + * ### Design + * - **Interval-based**: a timer fires every `POLL_INTERVAL_MS`. On each tick + * every enabled destination is processed in sequence. + * - **Cursor-based**: the last successfully forwarded row `id` is persisted in + * the `eventStreamingCursors` table so state survives restarts. + * - **Catch-up**: if a full batch is returned the destination is immediately + * re-queried (up to `MAX_CATCHUP_BATCHES` times) before yielding. + * - **Smoothing**: `INTER_BATCH_DELAY_MS` is inserted between consecutive + * catch-up batches to avoid hammering the remote endpoint. + * - **Back-off**: consecutive send failures trigger exponential back-off + * (tracked in-memory per destination). Successful sends reset the counter. + * - **Backlog abandonment**: if a destination remains unreachable for longer + * than `MAX_BACKLOG_DURATION_MS`, all cursors for that destination are + * advanced to the current max id so the backlog is discarded and streaming + * resumes from the present moment on recovery. + */ +export class LogStreamingManager { + private pollTimer: ReturnType | null = null; + private isRunning = false; + private isPolling = false; + + /** In-memory back-off state keyed by destinationId. */ + private readonly failures = new Map(); + + // ------------------------------------------------------------------------- + // Lifecycle + // ------------------------------------------------------------------------- + + start(): void { + if (this.isRunning) return; + this.isRunning = true; + logger.info("LogStreamingManager: started"); + this.schedulePoll(POLL_INTERVAL_MS); + } + + // ------------------------------------------------------------------------- + // Cursor initialisation (call this when a destination is first created) + // ------------------------------------------------------------------------- + + /** + * Eagerly seed cursors for every log type at the **current** max row id of + * each table, scoped to the destination's org. 
+ * + * Call this immediately after inserting a new row into + * `eventStreamingDestinations` so the destination only receives events + * that were written *after* it was created. If a cursor row already exists + * (e.g. the method is called twice) it is left untouched. + * + * The manager also has a lazy fallback inside `getOrCreateCursor` for + * destinations that existed before this method was introduced. + */ + async initializeCursorsForDestination( + destinationId: number, + orgId: string + ): Promise { + for (const logType of LOG_TYPES) { + const currentMaxId = await this.getCurrentMaxId(logType, orgId); + try { + await db + .insert(eventStreamingCursors) + .values({ + destinationId, + logType, + lastSentId: currentMaxId, + lastSentAt: null + }) + .onConflictDoNothing(); + } catch (err) { + logger.warn( + `LogStreamingManager: could not initialise cursor for ` + + `destination ${destinationId} logType="${logType}"`, + err + ); + } + } + + logger.debug( + `LogStreamingManager: cursors initialised for destination ${destinationId} ` + + `(org=${orgId})` + ); + } + + async shutdown(): Promise { + this.isRunning = false; + if (this.pollTimer !== null) { + clearTimeout(this.pollTimer); + this.pollTimer = null; + } + // Wait for any in-progress poll to finish before returning so that + // callers (graceful-shutdown handlers) can safely exit afterward. 
+ const deadline = Date.now() + 15_000; + while (this.isPolling && Date.now() < deadline) { + await sleep(100); + } + logger.info("LogStreamingManager: stopped"); + } + + // ------------------------------------------------------------------------- + // Scheduling + // ------------------------------------------------------------------------- + + private schedulePoll(delayMs: number): void { + this.pollTimer = setTimeout(() => { + this.pollTimer = null; + this.runPoll() + .catch((err) => + logger.error("LogStreamingManager: unexpected poll error", err) + ) + .finally(() => { + if (this.isRunning) { + this.schedulePoll(POLL_INTERVAL_MS); + } + }); + }, delayMs); + + // Do not keep the event loop alive just for the poll timer – the + // graceful-shutdown path calls shutdown() explicitly. + this.pollTimer.unref?.(); + } + + // ------------------------------------------------------------------------- + // Poll cycle + // ------------------------------------------------------------------------- + + private async runPoll(): Promise { + if (this.isPolling) return; // previous poll still running – skip + this.isPolling = true; + + try { + const destinations = await this.loadEnabledDestinations(); + if (destinations.length === 0) return; + + for (const dest of destinations) { + if (!this.isRunning) break; + await this.processDestination(dest).catch((err) => { + // Individual destination errors must never abort the whole cycle + logger.error( + `LogStreamingManager: unhandled error for destination ${dest.destinationId}`, + err + ); + }); + } + } finally { + this.isPolling = false; + } + } + + // ------------------------------------------------------------------------- + // Per-destination processing + // ------------------------------------------------------------------------- + + private async processDestination( + dest: EventStreamingDestination + ): Promise { + const failState = this.failures.get(dest.destinationId); + + // Check whether this destination has been 
unreachable long enough that + // we should give up on the accumulated backlog. + if (failState) { + const failingForMs = Date.now() - failState.firstFailedAt; + if (failingForMs >= MAX_BACKLOG_DURATION_MS) { + await this.abandonBacklog(dest, failState); + this.failures.delete(dest.destinationId); + // Cursors now point to the current head – retry on next poll. + return; + } + } + + // Check regular exponential back-off window + if (failState && Date.now() < failState.nextRetryAt) { + logger.debug( + `LogStreamingManager: destination ${dest.destinationId} in back-off, skipping` + ); + return; + } + + // Parse config – skip destination if config is unparseable + let config: HttpConfig; + try { + config = JSON.parse(dest.config) as HttpConfig; + } catch (err) { + logger.error( + `LogStreamingManager: destination ${dest.destinationId} has invalid JSON config`, + err + ); + return; + } + + const provider = this.createProvider(dest.type, config); + if (!provider) { + logger.warn( + `LogStreamingManager: unsupported destination type "${dest.type}" ` + + `for destination ${dest.destinationId} – skipping` + ); + return; + } + + const enabledTypes: LogType[] = []; + if (dest.sendRequestLogs) enabledTypes.push("request"); + if (dest.sendActionLogs) enabledTypes.push("action"); + if (dest.sendAccessLogs) enabledTypes.push("access"); + if (dest.sendConnectionLogs) enabledTypes.push("connection"); + + if (enabledTypes.length === 0) return; + + let anyFailure = false; + + for (const logType of enabledTypes) { + if (!this.isRunning) break; + try { + await this.processLogType(dest, provider, logType); + } catch (err) { + anyFailure = true; + logger.error( + `LogStreamingManager: failed to process "${logType}" logs ` + + `for destination ${dest.destinationId}`, + err + ); + } + } + + if (anyFailure) { + this.recordFailure(dest.destinationId); + } else { + // Any success resets the failure/back-off state + if (this.failures.has(dest.destinationId)) { + 
this.failures.delete(dest.destinationId); + logger.info( + `LogStreamingManager: destination ${dest.destinationId} recovered` + ); + } + } + } + + /** + * Advance every cursor for the destination to the current max row id, + * effectively discarding the accumulated backlog. Called when the + * destination has been unreachable for longer than MAX_BACKLOG_DURATION_MS. + */ + private async abandonBacklog( + dest: EventStreamingDestination, + failState: DestinationFailureState + ): Promise { + const failingForHours = ( + (Date.now() - failState.firstFailedAt) / + 3_600_000 + ).toFixed(1); + + let totalDropped = 0; + + for (const logType of LOG_TYPES) { + try { + const currentMaxId = await this.getCurrentMaxId( + logType, + dest.orgId + ); + + // Find out how many rows are being skipped for this type + const cursor = await db + .select({ lastSentId: eventStreamingCursors.lastSentId }) + .from(eventStreamingCursors) + .where( + and( + eq(eventStreamingCursors.destinationId, dest.destinationId), + eq(eventStreamingCursors.logType, logType) + ) + ) + .limit(1); + + const prevId = cursor[0]?.lastSentId ?? currentMaxId; + totalDropped += Math.max(0, currentMaxId - prevId); + + await this.updateCursor( + dest.destinationId, + logType, + currentMaxId + ); + } catch (err) { + logger.error( + `LogStreamingManager: failed to advance cursor for ` + + `destination ${dest.destinationId} logType="${logType}" ` + + `during backlog abandonment`, + err + ); + } + } + + logger.warn( + `LogStreamingManager: destination ${dest.destinationId} has been ` + + `unreachable for ${failingForHours}h ` + + `(${failState.consecutiveFailures} consecutive failures). ` + + `Discarding backlog of ~${totalDropped} log event(s) and ` + + `resuming from the current position. ` + + `Verify the destination URL and credentials.` + ); + } + + /** + * Forward all pending log records of a specific type for a destination. + * + * Fetches up to `BATCH_SIZE` records at a time. 
If the batch is full + * (indicating more records may exist) it loops immediately, inserting a + * short delay between consecutive requests to the remote endpoint. + * The loop is capped at `MAX_CATCHUP_BATCHES` to keep the poll cycle + * bounded. + */ + private async processLogType( + dest: EventStreamingDestination, + provider: LogDestinationProvider, + logType: LogType + ): Promise { + // Ensure a cursor row exists (creates one pointing at the current max + // id so we do not replay historical logs on first run) + const cursor = await this.getOrCreateCursor( + dest.destinationId, + logType, + dest.orgId + ); + + let lastSentId = cursor.lastSentId; + let batchCount = 0; + + while (batchCount < MAX_CATCHUP_BATCHES) { + const rows = await this.fetchLogs( + logType, + dest.orgId, + lastSentId, + BATCH_SIZE + ); + + if (rows.length === 0) break; + + const events = rows.map((row) => + this.rowToLogEvent(logType, row) + ); + + // Throws on failure – caught by the caller which applies back-off + await provider.send(events); + + lastSentId = rows[rows.length - 1].id; + await this.updateCursor(dest.destinationId, logType, lastSentId); + + batchCount++; + + if (rows.length < BATCH_SIZE) { + // Partial batch means we have caught up + break; + } + + // Full batch – there are likely more records; pause briefly before + // fetching the next batch to smooth out the HTTP request rate + if (batchCount < MAX_CATCHUP_BATCHES) { + await sleep(INTER_BATCH_DELAY_MS); + } + } + } + + // ------------------------------------------------------------------------- + // Cursor management + // ------------------------------------------------------------------------- + + private async getOrCreateCursor( + destinationId: number, + logType: LogType, + orgId: string + ): Promise<{ lastSentId: number }> { + // Try to read an existing cursor + const existing = await db + .select({ + lastSentId: eventStreamingCursors.lastSentId + }) + .from(eventStreamingCursors) + .where( + and( + 
eq(eventStreamingCursors.destinationId, destinationId), + eq(eventStreamingCursors.logType, logType) + ) + ) + .limit(1); + + if (existing.length > 0) { + return { lastSentId: existing[0].lastSentId }; + } + + // No cursor yet – this destination pre-dates the eager initialisation + // path (initializeCursorsForDestination). Seed at the current max id + // so we do not replay historical logs. + const initialId = await this.getCurrentMaxId(logType, orgId); + + // Use onConflictDoNothing in case of a rare race between two poll + // cycles both hitting this branch simultaneously. + await db + .insert(eventStreamingCursors) + .values({ + destinationId, + logType, + lastSentId: initialId, + lastSentAt: null + }) + .onConflictDoNothing(); + + logger.debug( + `LogStreamingManager: lazily initialised cursor for destination ${destinationId} ` + + `logType="${logType}" at id=${initialId} ` + + `(prefer initializeCursorsForDestination at creation time)` + ); + + return { lastSentId: initialId }; + } + + private async updateCursor( + destinationId: number, + logType: LogType, + lastSentId: number + ): Promise { + await db + .update(eventStreamingCursors) + .set({ + lastSentId, + lastSentAt: Date.now() + }) + .where( + and( + eq(eventStreamingCursors.destinationId, destinationId), + eq(eventStreamingCursors.logType, logType) + ) + ); + } + + /** + * Returns the current maximum `id` in the given log table for the org. + * Returns 0 when the table is empty. + */ + private async getCurrentMaxId( + logType: LogType, + orgId: string + ): Promise { + try { + switch (logType) { + case "request": { + const [row] = await logsDb + .select({ maxId: max(requestAuditLog.id) }) + .from(requestAuditLog) + .where(eq(requestAuditLog.orgId, orgId)); + return row?.maxId ?? 0; + } + case "action": { + const [row] = await logsDb + .select({ maxId: max(actionAuditLog.id) }) + .from(actionAuditLog) + .where(eq(actionAuditLog.orgId, orgId)); + return row?.maxId ?? 
0; + } + case "access": { + const [row] = await logsDb + .select({ maxId: max(accessAuditLog.id) }) + .from(accessAuditLog) + .where(eq(accessAuditLog.orgId, orgId)); + return row?.maxId ?? 0; + } + case "connection": { + const [row] = await logsDb + .select({ maxId: max(connectionAuditLog.id) }) + .from(connectionAuditLog) + .where(eq(connectionAuditLog.orgId, orgId)); + return row?.maxId ?? 0; + } + } + } catch (err) { + logger.warn( + `LogStreamingManager: could not determine current max id for ` + + `logType="${logType}", defaulting to 0`, + err + ); + return 0; + } + } + + // ------------------------------------------------------------------------- + // Log fetching + // ------------------------------------------------------------------------- + + /** + * Fetch up to `limit` log rows with `id > afterId`, ordered by id ASC, + * filtered to the given organisation. + */ + private async fetchLogs( + logType: LogType, + orgId: string, + afterId: number, + limit: number + ): Promise & { id: number }>> { + switch (logType) { + case "request": + return (await logsDb + .select() + .from(requestAuditLog) + .where( + and( + eq(requestAuditLog.orgId, orgId), + gt(requestAuditLog.id, afterId) + ) + ) + .orderBy(requestAuditLog.id) + .limit(limit)) as Array< + Record & { id: number } + >; + + case "action": + return (await logsDb + .select() + .from(actionAuditLog) + .where( + and( + eq(actionAuditLog.orgId, orgId), + gt(actionAuditLog.id, afterId) + ) + ) + .orderBy(actionAuditLog.id) + .limit(limit)) as Array< + Record & { id: number } + >; + + case "access": + return (await logsDb + .select() + .from(accessAuditLog) + .where( + and( + eq(accessAuditLog.orgId, orgId), + gt(accessAuditLog.id, afterId) + ) + ) + .orderBy(accessAuditLog.id) + .limit(limit)) as Array< + Record & { id: number } + >; + + case "connection": + return (await logsDb + .select() + .from(connectionAuditLog) + .where( + and( + eq(connectionAuditLog.orgId, orgId), + gt(connectionAuditLog.id, afterId) + 
) + ) + .orderBy(connectionAuditLog.id) + .limit(limit)) as Array< + Record & { id: number } + >; + } + } + + // ------------------------------------------------------------------------- + // Row → LogEvent conversion + // ------------------------------------------------------------------------- + + private rowToLogEvent( + logType: LogType, + row: Record & { id: number } + ): LogEvent { + // Determine the epoch-seconds timestamp for this row type + let timestamp: number; + switch (logType) { + case "request": + case "action": + case "access": + timestamp = + typeof row.timestamp === "number" ? row.timestamp : 0; + break; + case "connection": + timestamp = + typeof row.startedAt === "number" ? row.startedAt : 0; + break; + } + + const orgId = + typeof row.orgId === "string" ? row.orgId : ""; + + return { + id: row.id, + logType, + orgId, + timestamp, + data: row as Record + }; + } + + // ------------------------------------------------------------------------- + // Provider factory + // ------------------------------------------------------------------------- + + /** + * Instantiate the correct LogDestinationProvider for the given destination + * type string. Returns `null` for unknown types. + * + * To add a new provider: + * 1. Implement `LogDestinationProvider` in a new file under `providers/` + * 2. Add a `case` here + */ + private createProvider( + type: string, + config: unknown + ): LogDestinationProvider | null { + switch (type) { + case "http": + return new HttpLogDestination(config as HttpConfig); + // Future providers: + // case "datadog": return new DatadogLogDestination(config as DatadogConfig); + default: + return null; + } + } + + // ------------------------------------------------------------------------- + // Back-off tracking + // ------------------------------------------------------------------------- + + private recordFailure(destinationId: number): void { + const current = this.failures.get(destinationId) ?? 
{ + consecutiveFailures: 0, + nextRetryAt: 0, + // Stamp the very first failure so we can measure total outage duration + firstFailedAt: Date.now() + }; + + current.consecutiveFailures += 1; + + const scheduleIdx = Math.min( + current.consecutiveFailures - 1, + BACKOFF_SCHEDULE_MS.length - 1 + ); + const backoffMs = BACKOFF_SCHEDULE_MS[scheduleIdx]; + current.nextRetryAt = Date.now() + backoffMs; + + this.failures.set(destinationId, current); + + logger.warn( + `LogStreamingManager: destination ${destinationId} failed ` + + `(consecutive #${current.consecutiveFailures}), ` + + `backing off for ${backoffMs / 1000}s` + ); + } + + // ------------------------------------------------------------------------- + // DB helpers + // ------------------------------------------------------------------------- + + private async loadEnabledDestinations(): Promise< + EventStreamingDestination[] + > { + try { + return await db + .select() + .from(eventStreamingDestinations) + .where(eq(eventStreamingDestinations.enabled, true)); + } catch (err) { + logger.error( + "LogStreamingManager: failed to load destinations", + err + ); + return []; + } + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} \ No newline at end of file diff --git a/server/private/lib/logStreaming/index.ts b/server/private/lib/logStreaming/index.ts new file mode 100644 index 000000000..619809771 --- /dev/null +++ b/server/private/lib/logStreaming/index.ts @@ -0,0 +1,34 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. 
+ * Unauthorized use, copying, modification, or distribution is strictly prohibited.
+ *
+ * This file is not licensed under the AGPLv3.
+ */
+
+import { build } from "@server/build";
+import { LogStreamingManager } from "./LogStreamingManager";
+
+/**
+ * Module-level singleton. Importing this module is sufficient to start the
+ * streaming manager – no explicit init call required by the caller.
+ *
+ * The manager registers a non-blocking timer (unref'd) so it will not keep
+ * the Node.js event loop alive on its own. Call `logStreamingManager.shutdown()`
+ * during graceful shutdown to drain any in-progress poll and release resources.
+ */
+export const logStreamingManager = new LogStreamingManager();
+
+if (build !== "saas") { // this is handled separately in the saas build, so we don't want to start it here
+    logStreamingManager.start();
+}
+
+export { LogStreamingManager } from "./LogStreamingManager";
+export type { LogDestinationProvider } from "./providers/LogDestinationProvider";
+export { HttpLogDestination } from "./providers/HttpLogDestination";
+export * from "./types";
diff --git a/server/private/lib/logStreaming/providers/HttpLogDestination.ts b/server/private/lib/logStreaming/providers/HttpLogDestination.ts
new file mode 100644
index 000000000..5e149f814
--- /dev/null
+++ b/server/private/lib/logStreaming/providers/HttpLogDestination.ts
@@ -0,0 +1,322 @@
+/*
+ * This file is part of a proprietary work.
+ *
+ * Copyright (c) 2025 Fossorial, Inc.
+ * All rights reserved.
+ *
+ * This file is licensed under the Fossorial Commercial License.
+ * You may not use this file except in compliance with the License.
+ * Unauthorized use, copying, modification, or distribution is strictly prohibited.
+ *
+ * This file is not licensed under the AGPLv3.
+ */
+
+import logger from "@server/logger";
+import { LogEvent, HttpConfig, PayloadFormat } from "../types";
+import { LogDestinationProvider } from "./LogDestinationProvider";
+
+// ---------------------------------------------------------------------------
+// Constants
+// ---------------------------------------------------------------------------
+
+/** Maximum time (ms) to wait for a single HTTP response. */
+const REQUEST_TIMEOUT_MS = 30_000;
+
+/** Default payload format when none is specified in the config. */
+const DEFAULT_FORMAT: PayloadFormat = "json_array";
+
+// ---------------------------------------------------------------------------
+// HttpLogDestination
+// ---------------------------------------------------------------------------
+
+/**
+ * Forwards a batch of log events to an arbitrary HTTP endpoint. One POST
+ * request is issued per batch (or per event when using `json_single`).
+ *
+ * **Payload formats**
+ *
+ * Controlled by `config.format`:
+ *
+ * - `json_array` (default) — one POST per batch, body is a JSON array:
+ *   ```json
+ *   [
+ *     { "event": "request", "timestamp": "2024-01-01T00:00:00.000Z", "data": { … } },
+ *     …
+ *   ]
+ *   ```
+ *   `Content-Type: application/json`
+ *
+ * - `ndjson` — one POST per batch, body is newline-delimited JSON (one object
+ *   per line, no outer array). Required by Splunk HEC, Elastic/OpenSearch,
+ *   and Grafana Loki:
+ *   ```
+ *   {"event":"request","timestamp":"…","data":{…}}
+ *   {"event":"action","timestamp":"…","data":{…}}
+ *   ```
+ *   `Content-Type: application/x-ndjson`
+ *
+ * - `json_single` — one POST **per event**, body is a plain JSON object.
+ *   Use only for endpoints that cannot handle batches at all.
+ *
+ * With a body template each event is rendered through the template before
+ * serialisation. Template placeholders:
+ * - `{{event}}` → the LogType string ("request", "action", etc.)
+ * - `{{timestamp}}` → ISO-8601 UTC datetime string + * - `{{data}}` → raw inline JSON object (**no surrounding quotes**) + * + * Example template: + * ``` + * { "event": "{{event}}", "ts": "{{timestamp}}", "payload": {{data}} } + * ``` + */ +export class HttpLogDestination implements LogDestinationProvider { + readonly type = "http"; + + private readonly config: HttpConfig; + + constructor(config: HttpConfig) { + this.config = config; + } + + // ----------------------------------------------------------------------- + // LogDestinationProvider implementation + // ----------------------------------------------------------------------- + + async send(events: LogEvent[]): Promise { + if (events.length === 0) return; + + const format = this.config.format ?? DEFAULT_FORMAT; + + if (format === "json_single") { + // One HTTP POST per event – send sequentially so a failure on one + // event throws and lets the manager retry the whole batch from the + // same cursor position. + for (const event of events) { + await this.postRequest( + this.buildSingleBody(event), + "application/json" + ); + } + return; + } + + if (format === "ndjson") { + const body = this.buildNdjsonBody(events); + await this.postRequest(body, "application/x-ndjson"); + return; + } + + // json_array (default) + const body = JSON.stringify(this.buildArrayPayload(events)); + await this.postRequest(body, "application/json"); + } + + // ----------------------------------------------------------------------- + // Internal HTTP sender + // ----------------------------------------------------------------------- + + private async postRequest( + body: string, + contentType: string + ): Promise { + const headers = this.buildHeaders(contentType); + + const controller = new AbortController(); + const timeoutHandle = setTimeout( + () => controller.abort(), + REQUEST_TIMEOUT_MS + ); + + let response: Response; + try { + response = await fetch(this.config.url, { + method: "POST", + headers, + body, + signal: 
controller.signal + }); + } catch (err: unknown) { + const isAbort = + err instanceof Error && err.name === "AbortError"; + if (isAbort) { + throw new Error( + `HttpLogDestination: request to "${this.config.url}" timed out after ${REQUEST_TIMEOUT_MS} ms` + ); + } + const msg = err instanceof Error ? err.message : String(err); + throw new Error( + `HttpLogDestination: request to "${this.config.url}" failed – ${msg}` + ); + } finally { + clearTimeout(timeoutHandle); + } + + if (!response.ok) { + // Try to include a snippet of the response body in the error so + // operators can diagnose auth or schema rejections. + let responseSnippet = ""; + try { + const text = await response.text(); + responseSnippet = text.slice(0, 300); + } catch { + // ignore – best effort + } + + throw new Error( + `HttpLogDestination: server at "${this.config.url}" returned ` + + `HTTP ${response.status} ${response.statusText}` + + (responseSnippet ? ` – ${responseSnippet}` : "") + ); + } + } + + // ----------------------------------------------------------------------- + // Header construction + // ----------------------------------------------------------------------- + + private buildHeaders(contentType: string): Record { + const headers: Record = { + "Content-Type": contentType + }; + + // Authentication + switch (this.config.authType) { + case "bearer": { + const token = this.config.bearerToken?.trim(); + if (token) { + headers["Authorization"] = `Bearer ${token}`; + } + break; + } + case "basic": { + const creds = this.config.basicCredentials?.trim(); + if (creds) { + const encoded = Buffer.from(creds).toString("base64"); + headers["Authorization"] = `Basic ${encoded}`; + } + break; + } + case "custom": { + const name = this.config.customHeaderName?.trim(); + const value = this.config.customHeaderValue ?? 
""; + if (name) { + headers[name] = value; + } + break; + } + case "none": + default: + // No Authorization header + break; + } + + // Additional static headers (user-defined; may override Content-Type + // if the operator explicitly sets it, which is intentional). + for (const { key, value } of this.config.headers ?? []) { + const trimmedKey = key?.trim(); + if (trimmedKey) { + headers[trimmedKey] = value ?? ""; + } + } + + return headers; + } + + // ----------------------------------------------------------------------- + // Payload construction + // ----------------------------------------------------------------------- + + /** Single default event object (no surrounding array). */ + private buildEventObject(event: LogEvent): unknown { + if (this.config.useBodyTemplate && this.config.bodyTemplate?.trim()) { + return this.renderTemplate(this.config.bodyTemplate!, event); + } + return { + event: event.logType, + timestamp: epochSecondsToIso(event.timestamp), + data: event.data + }; + } + + /** JSON array payload – used for `json_array` format. */ + private buildArrayPayload(events: LogEvent[]): unknown[] { + return events.map((e) => this.buildEventObject(e)); + } + + /** + * NDJSON payload – one JSON object per line, no outer array. + * Each line must be a complete, valid JSON object. + */ + private buildNdjsonBody(events: LogEvent[]): string { + return events + .map((e) => JSON.stringify(this.buildEventObject(e))) + .join("\n"); + } + + /** Single-event body – used for `json_single` format. */ + private buildSingleBody(event: LogEvent): string { + return JSON.stringify(this.buildEventObject(event)); + } + + /** + * Render a single event through the body template. + * + * The three placeholder tokens are replaced in a specific order to avoid + * accidental double-replacement: + * + * 1. `{{data}}` → raw JSON (may contain `{{` characters in values) + * 2. `{{event}}` → safe string + * 3. 
`{{timestamp}}` → safe ISO string + * + * If the rendered string is not valid JSON we fall back to returning it as + * a plain string so the batch still makes it out and the operator can + * inspect the template. + */ + private renderTemplate(template: string, event: LogEvent): unknown { + const isoTimestamp = epochSecondsToIso(event.timestamp); + const dataJson = JSON.stringify(event.data); + + // Replace {{data}} first because its JSON value might legitimately + // contain the substrings "{{event}}" or "{{timestamp}}" inside string + // fields – those should NOT be re-expanded. + const rendered = template + .replace(/\{\{data\}\}/g, dataJson) + .replace(/\{\{event\}\}/g, escapeJsonString(event.logType)) + .replace( + /\{\{timestamp\}\}/g, + escapeJsonString(isoTimestamp) + ); + + try { + return JSON.parse(rendered); + } catch { + logger.warn( + `HttpLogDestination: body template produced invalid JSON for ` + + `event type "${event.logType}" destined for "${this.config.url}". ` + + `Sending rendered template as a raw string. ` + + `Check your template syntax – specifically that {{data}} is ` + + `NOT wrapped in quotes.` + ); + return rendered; + } + } +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function epochSecondsToIso(epochSeconds: number): string { + return new Date(epochSeconds * 1000).toISOString(); +} + +/** + * Escape a string value so it can be safely substituted into the interior of + * a JSON string literal (i.e. between existing `"` quotes in the template). + * This prevents a crafted logType or timestamp from breaking out of its + * string context in the rendered template. + */ +function escapeJsonString(value: string): string { + // JSON.stringify produces `""` – strip the outer quotes. 
+ return JSON.stringify(value).slice(1, -1); +} \ No newline at end of file diff --git a/server/private/lib/logStreaming/providers/LogDestinationProvider.ts b/server/private/lib/logStreaming/providers/LogDestinationProvider.ts new file mode 100644 index 000000000..d09be320b --- /dev/null +++ b/server/private/lib/logStreaming/providers/LogDestinationProvider.ts @@ -0,0 +1,44 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + +import { LogEvent } from "../types"; + +/** + * Common interface that every log-forwarding backend must implement. + * + * Adding a new destination type (e.g. Datadog, Splunk, Kafka) is as simple as + * creating a class that satisfies this interface and registering it inside + * LogStreamingManager.createProvider(). + */ +export interface LogDestinationProvider { + /** + * The string identifier that matches the `type` column in the + * `eventStreamingDestinations` table (e.g. "http", "datadog"). + */ + readonly type: string; + + /** + * Forward a batch of log events to the destination. + * + * Implementations should: + * - Treat the call as atomic: either all events are accepted or an error + * is thrown so the caller can retry / back off. + * - Respect the timeout contract expected by the manager (default 30 s). + * - NOT swallow errors – the manager relies on thrown exceptions to track + * failure state and apply exponential back-off. + * + * @param events A non-empty array of normalised log events to forward. + * @throws Any network, authentication, or serialisation error. 
+ */ + send(events: LogEvent[]): Promise; +} \ No newline at end of file diff --git a/server/private/lib/logStreaming/types.ts b/server/private/lib/logStreaming/types.ts new file mode 100644 index 000000000..03fe88cad --- /dev/null +++ b/server/private/lib/logStreaming/types.ts @@ -0,0 +1,134 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + +// --------------------------------------------------------------------------- +// Log type identifiers +// --------------------------------------------------------------------------- + +export type LogType = "request" | "action" | "access" | "connection"; + +export const LOG_TYPES: LogType[] = [ + "request", + "action", + "access", + "connection" +]; + +// --------------------------------------------------------------------------- +// A normalised event ready to be forwarded to a destination +// --------------------------------------------------------------------------- + +export interface LogEvent { + /** The auto-increment primary key from the source table */ + id: number; + /** Which log table this event came from */ + logType: LogType; + /** The organisation that owns this event */ + orgId: string; + /** Unix epoch seconds – taken from the record's own timestamp field */ + timestamp: number; + /** Full row data from the source table, serialised as a plain object */ + data: Record; +} + +// --------------------------------------------------------------------------- +// A batch of events destined for a single streaming target +// --------------------------------------------------------------------------- + +export interface LogBatch { + destinationId: number; + logType: 
LogType;
+    events: LogEvent[];
+}
+
+// ---------------------------------------------------------------------------
+// HTTP destination configuration (mirrors HttpConfig in the UI component)
+// ---------------------------------------------------------------------------
+
+export type AuthType = "none" | "bearer" | "basic" | "custom";
+
+/**
+ * Controls how the batch of events is serialised into the HTTP request body.
+ *
+ * - `json_array` – `[{…}, {…}]` — default; one POST per batch wrapped in a
+ *   JSON array. Works with most generic webhooks and Datadog.
+ * - `ndjson` – `{…}\n{…}` — newline-delimited JSON, one object per
+ *   line. Required by Splunk HEC, Elastic/OpenSearch, Loki.
+ * - `json_single` – one HTTP POST per event, body is a plain JSON object.
+ *   Use only for endpoints that cannot handle batches at all.
+ */
+export type PayloadFormat = "json_array" | "ndjson" | "json_single";
+
+export interface HttpConfig {
+    /** Human-readable label for the destination */
+    name: string;
+    /** Target URL that will receive POST requests */
+    url: string;
+    /** Authentication strategy to use */
+    authType: AuthType;
+    /** Used when authType === "bearer" */
+    bearerToken?: string;
+    /** Used when authType === "basic" – must be "username:password" */
+    basicCredentials?: string;
+    /** Used when authType === "custom" – header name */
+    customHeaderName?: string;
+    /** Used when authType === "custom" – header value */
+    customHeaderValue?: string;
+    /** Additional static headers appended to every request */
+    headers: Array<{ key: string; value: string }>;
+    /**
+     * How events are serialised into the request body.
+     * Defaults to `"json_array"` when absent.
+     */
+    format?: PayloadFormat;
+    /** Whether to render a custom body template instead of the default shape */
+    useBodyTemplate: boolean;
+    /**
+     * Handlebars-style template for the JSON body of each event.
+     *
+     * Supported placeholders:
+     *   {{event}} – the LogType string ("request", "action", etc.)
+ * {{timestamp}} – ISO-8601 UTC string derived from the event's timestamp + * {{data}} – raw JSON object (no surrounding quotes) of the full row + * + * Example: + * { "event": "{{event}}", "ts": "{{timestamp}}", "payload": {{data}} } + */ + bodyTemplate?: string; +} + +// --------------------------------------------------------------------------- +// Per-destination per-log-type cursor (reflects the DB table) +// --------------------------------------------------------------------------- + +export interface StreamingCursor { + destinationId: number; + logType: LogType; + /** The `id` of the last row that was successfully forwarded */ + lastSentId: number; + /** Epoch milliseconds of the last successful send (or null if never sent) */ + lastSentAt: number | null; +} + +// --------------------------------------------------------------------------- +// In-memory failure / back-off state tracked per destination +// --------------------------------------------------------------------------- + +export interface DestinationFailureState { + /** How many consecutive send failures have occurred */ + consecutiveFailures: number; + /** Date.now() value after which the destination may be retried */ + nextRetryAt: number; + /** Date.now() value of the very first failure in the current streak */ + firstFailedAt: number; +} \ No newline at end of file diff --git a/server/private/routers/eventStreamingDestination/createEventStreamingDestination.ts b/server/private/routers/eventStreamingDestination/createEventStreamingDestination.ts new file mode 100644 index 000000000..623f2d9e0 --- /dev/null +++ b/server/private/routers/eventStreamingDestination/createEventStreamingDestination.ts @@ -0,0 +1,138 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. 
+ * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + +import { Request, Response, NextFunction } from "express"; +import { z } from "zod"; +import { db } from "@server/db"; +import { eventStreamingDestinations } from "@server/db"; +import { logStreamingManager } from "#private/lib/logStreaming"; +import response from "@server/lib/response"; +import HttpCode from "@server/types/HttpCode"; +import createHttpError from "http-errors"; +import logger from "@server/logger"; +import { fromError } from "zod-validation-error"; +import { OpenAPITags, registry } from "@server/openApi"; + +const paramsSchema = z.strictObject({ + orgId: z.string().nonempty() +}); + +const bodySchema = z.strictObject({ + type: z.string().nonempty(), + config: z.string().nonempty(), + enabled: z.boolean().optional().default(true), + sendConnectionLogs: z.boolean().optional().default(false), + sendRequestLogs: z.boolean().optional().default(false), + sendActionLogs: z.boolean().optional().default(false), + sendAccessLogs: z.boolean().optional().default(false) +}); + +export type CreateEventStreamingDestinationResponse = { + destinationId: number; +}; + +registry.registerPath({ + method: "put", + path: "/org/{orgId}/event-streaming-destination", + description: "Create an event streaming destination for a specific organization.", + tags: [OpenAPITags.Org], + request: { + params: paramsSchema, + body: { + content: { + "application/json": { + schema: bodySchema + } + } + } + }, + responses: {} +}); + +export async function createEventStreamingDestination( + req: Request, + res: Response, + next: NextFunction +): Promise { + try { + const parsedParams = paramsSchema.safeParse(req.params); + if (!parsedParams.success) { + return next( + createHttpError( + HttpCode.BAD_REQUEST, + fromError(parsedParams.error).toString() + ) + ); + } + + const { orgId } = parsedParams.data; + + const parsedBody = 
bodySchema.safeParse(req.body); + if (!parsedBody.success) { + return next( + createHttpError( + HttpCode.BAD_REQUEST, + fromError(parsedBody.error).toString() + ) + ); + } + + const { type, config, enabled } = parsedBody.data; + + const now = Date.now(); + + const [destination] = await db + .insert(eventStreamingDestinations) + .values({ + orgId, + type, + config, + enabled, + createdAt: now, + updatedAt: now, + sendAccessLogs: parsedBody.data.sendAccessLogs, + sendActionLogs: parsedBody.data.sendActionLogs, + sendConnectionLogs: parsedBody.data.sendConnectionLogs, + sendRequestLogs: parsedBody.data.sendRequestLogs + }) + .returning(); + + // Seed cursors at the current max row id for every log type so this + // destination only receives events written *after* it was created. + // Fire-and-forget: a failure here is non-fatal; the manager has a lazy + // fallback that will seed at the next poll if these rows are missing. + logStreamingManager + .initializeCursorsForDestination(destination.destinationId, orgId) + .catch((err) => + logger.error( + "createEventStreamingDestination: failed to initialise streaming cursors", + err + ) + ); + + return response(res, { + data: { + destinationId: destination.destinationId + }, + success: true, + error: false, + message: "Event streaming destination created successfully", + status: HttpCode.CREATED + }); + } catch (error) { + logger.error(error); + return next( + createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred") + ); + } +} diff --git a/server/private/routers/eventStreamingDestination/deleteEventStreamingDestination.ts b/server/private/routers/eventStreamingDestination/deleteEventStreamingDestination.ts new file mode 100644 index 000000000..d93bc4405 --- /dev/null +++ b/server/private/routers/eventStreamingDestination/deleteEventStreamingDestination.ts @@ -0,0 +1,103 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. 
+ * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + +import { Request, Response, NextFunction } from "express"; +import { z } from "zod"; +import { db } from "@server/db"; +import { eventStreamingDestinations } from "@server/db"; +import response from "@server/lib/response"; +import HttpCode from "@server/types/HttpCode"; +import createHttpError from "http-errors"; +import logger from "@server/logger"; +import { fromError } from "zod-validation-error"; +import { OpenAPITags, registry } from "@server/openApi"; +import { and, eq } from "drizzle-orm"; + +const paramsSchema = z + .object({ + orgId: z.string().nonempty(), + destinationId: z.coerce.number() + }) + .strict(); + +registry.registerPath({ + method: "delete", + path: "/org/{orgId}/event-streaming-destination/{destinationId}", + description: "Delete an event streaming destination for a specific organization.", + tags: [OpenAPITags.Org], + request: { + params: paramsSchema + }, + responses: {} +}); + +export async function deleteEventStreamingDestination( + req: Request, + res: Response, + next: NextFunction +): Promise { + try { + const parsedParams = paramsSchema.safeParse(req.params); + if (!parsedParams.success) { + return next( + createHttpError( + HttpCode.BAD_REQUEST, + fromError(parsedParams.error).toString() + ) + ); + } + + const { orgId, destinationId } = parsedParams.data; + + const [existing] = await db + .select() + .from(eventStreamingDestinations) + .where( + and( + eq(eventStreamingDestinations.destinationId, destinationId), + eq(eventStreamingDestinations.orgId, orgId) + ) + ); + + if (!existing) { + return next( + createHttpError( + HttpCode.NOT_FOUND, + "Event streaming destination not found" + ) + ); + } + + await db + .delete(eventStreamingDestinations) + .where( 
+ and( + eq(eventStreamingDestinations.destinationId, destinationId), + eq(eventStreamingDestinations.orgId, orgId) + ) + ); + + return response(res, { + data: null, + success: true, + error: false, + message: "Event streaming destination deleted successfully", + status: HttpCode.OK + }); + } catch (error) { + logger.error(error); + return next( + createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred") + ); + } +} \ No newline at end of file diff --git a/server/private/routers/eventStreamingDestination/index.ts b/server/private/routers/eventStreamingDestination/index.ts new file mode 100644 index 000000000..595e9595b --- /dev/null +++ b/server/private/routers/eventStreamingDestination/index.ts @@ -0,0 +1,17 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + +export * from "./createEventStreamingDestination"; +export * from "./updateEventStreamingDestination"; +export * from "./deleteEventStreamingDestination"; +export * from "./listEventStreamingDestinations"; \ No newline at end of file diff --git a/server/private/routers/eventStreamingDestination/listEventStreamingDestinations.ts b/server/private/routers/eventStreamingDestination/listEventStreamingDestinations.ts new file mode 100644 index 000000000..b3f5ff149 --- /dev/null +++ b/server/private/routers/eventStreamingDestination/listEventStreamingDestinations.ts @@ -0,0 +1,144 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. 
+ * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + +import { Request, Response, NextFunction } from "express"; +import { z } from "zod"; +import { db } from "@server/db"; +import { eventStreamingDestinations } from "@server/db"; +import response from "@server/lib/response"; +import HttpCode from "@server/types/HttpCode"; +import createHttpError from "http-errors"; +import logger from "@server/logger"; +import { fromError } from "zod-validation-error"; +import { OpenAPITags, registry } from "@server/openApi"; +import { eq, sql } from "drizzle-orm"; + +const paramsSchema = z.strictObject({ + orgId: z.string().nonempty() +}); + +const querySchema = z.strictObject({ + limit: z + .string() + .optional() + .default("1000") + .transform(Number) + .pipe(z.int().nonnegative()), + offset: z + .string() + .optional() + .default("0") + .transform(Number) + .pipe(z.int().nonnegative()) +}); + +export type ListEventStreamingDestinationsResponse = { + destinations: { + destinationId: number; + orgId: string; + type: string; + config: string; + enabled: boolean; + createdAt: number; + updatedAt: number; + sendConnectionLogs: boolean; + sendRequestLogs: boolean; + sendActionLogs: boolean; + sendAccessLogs: boolean; + }[]; + pagination: { + total: number; + limit: number; + offset: number; + }; +}; + +async function query(orgId: string, limit: number, offset: number) { + const res = await db + .select() + .from(eventStreamingDestinations) + .where(eq(eventStreamingDestinations.orgId, orgId)) + .orderBy(sql`${eventStreamingDestinations.createdAt} DESC`) + .limit(limit) + .offset(offset); + return res; +} + +registry.registerPath({ + method: "get", + path: "/org/{orgId}/event-streaming-destination", + description: "List all event streaming destinations for a specific organization.", + tags: [OpenAPITags.Org], + request: { + query: querySchema, + params: paramsSchema + }, + responses: {} +}); + 
+export async function listEventStreamingDestinations( + req: Request, + res: Response, + next: NextFunction +): Promise { + try { + const parsedParams = paramsSchema.safeParse(req.params); + if (!parsedParams.success) { + return next( + createHttpError( + HttpCode.BAD_REQUEST, + fromError(parsedParams.error).toString() + ) + ); + } + const { orgId } = parsedParams.data; + + const parsedQuery = querySchema.safeParse(req.query); + if (!parsedQuery.success) { + return next( + createHttpError( + HttpCode.BAD_REQUEST, + fromError(parsedQuery.error).toString() + ) + ); + } + const { limit, offset } = parsedQuery.data; + + const list = await query(orgId, limit, offset); + + const [{ count }] = await db + .select({ count: sql`count(*)` }) + .from(eventStreamingDestinations) + .where(eq(eventStreamingDestinations.orgId, orgId)); + + return response(res, { + data: { + destinations: list, + pagination: { + total: count, + limit, + offset + } + }, + success: true, + error: false, + message: "Event streaming destinations retrieved successfully", + status: HttpCode.OK + }); + } catch (error) { + logger.error(error); + return next( + createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred") + ); + } +} diff --git a/server/private/routers/eventStreamingDestination/updateEventStreamingDestination.ts b/server/private/routers/eventStreamingDestination/updateEventStreamingDestination.ts new file mode 100644 index 000000000..3d3321824 --- /dev/null +++ b/server/private/routers/eventStreamingDestination/updateEventStreamingDestination.ts @@ -0,0 +1,153 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. 
+ */ + +import { Request, Response, NextFunction } from "express"; +import { z } from "zod"; +import { db } from "@server/db"; +import { eventStreamingDestinations } from "@server/db"; +import response from "@server/lib/response"; +import HttpCode from "@server/types/HttpCode"; +import createHttpError from "http-errors"; +import logger from "@server/logger"; +import { fromError } from "zod-validation-error"; +import { OpenAPITags, registry } from "@server/openApi"; +import { and, eq } from "drizzle-orm"; + + +const paramsSchema = z + .object({ + orgId: z.string().nonempty(), + destinationId: z.coerce.number() + }) + .strict(); + +const bodySchema = z.strictObject({ + type: z.string().optional(), + config: z.string().optional(), + enabled: z.boolean().optional(), + sendConnectionLogs: z.boolean().optional(), + sendRequestLogs: z.boolean().optional(), + sendActionLogs: z.boolean().optional(), + sendAccessLogs: z.boolean().optional() +}); + +export type UpdateEventStreamingDestinationResponse = { + destinationId: number; +}; + +registry.registerPath({ + method: "post", + path: "/org/{orgId}/event-streaming-destination/{destinationId}", + description: "Update an event streaming destination for a specific organization.", + tags: [OpenAPITags.Org], + request: { + params: paramsSchema, + body: { + content: { + "application/json": { + schema: bodySchema + } + } + } + }, + responses: {} +}); + +export async function updateEventStreamingDestination( + req: Request, + res: Response, + next: NextFunction +): Promise { + try { + const parsedParams = paramsSchema.safeParse(req.params); + if (!parsedParams.success) { + return next( + createHttpError( + HttpCode.BAD_REQUEST, + fromError(parsedParams.error).toString() + ) + ); + } + + const { orgId, destinationId } = parsedParams.data; + + const parsedBody = bodySchema.safeParse(req.body); + if (!parsedBody.success) { + return next( + createHttpError( + HttpCode.BAD_REQUEST, + fromError(parsedBody.error).toString() + ) + ); + } + + 
const [existing] = await db + .select() + .from(eventStreamingDestinations) + .where( + and( + eq(eventStreamingDestinations.destinationId, destinationId), + eq(eventStreamingDestinations.orgId, orgId) + ) + ); + + if (!existing) { + return next( + createHttpError( + HttpCode.NOT_FOUND, + "Event streaming destination not found" + ) + ); + } + + const { type, config, enabled, sendAccessLogs, sendActionLogs, sendConnectionLogs, sendRequestLogs } = parsedBody.data; + + const updateData: Record = { + updatedAt: Date.now() + }; + + if (type !== undefined) updateData.type = type; + if (config !== undefined) updateData.config = config; + if (enabled !== undefined) updateData.enabled = enabled; + if (sendAccessLogs !== undefined) updateData.sendAccessLogs = sendAccessLogs; + if (sendActionLogs !== undefined) updateData.sendActionLogs = sendActionLogs; + if (sendConnectionLogs !== undefined) updateData.sendConnectionLogs = sendConnectionLogs; + if (sendRequestLogs !== undefined) updateData.sendRequestLogs = sendRequestLogs; + + await db + .update(eventStreamingDestinations) + .set(updateData) + .where( + and( + eq(eventStreamingDestinations.destinationId, destinationId), + eq(eventStreamingDestinations.orgId, orgId) + ) + ); + + + return response(res, { + data: { + destinationId + }, + success: true, + error: false, + message: "Event streaming destination updated successfully", + status: HttpCode.OK + }); + } catch (error) { + logger.error(error); + return next( + createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred") + ); + } +} diff --git a/server/private/routers/external.ts b/server/private/routers/external.ts index 412895a41..4410a44c8 100644 --- a/server/private/routers/external.ts +++ b/server/private/routers/external.ts @@ -28,6 +28,7 @@ import * as approval from "#private/routers/approvals"; import * as ssh from "#private/routers/ssh"; import * as user from "#private/routers/user"; import * as siteProvisioning from "#private/routers/siteProvisioning"; 
+import * as eventStreamingDestination from "#private/routers/eventStreamingDestination"; import { verifyOrgAccess, @@ -615,3 +616,39 @@ authenticated.patch( logActionAudit(ActionsEnum.updateSiteProvisioningKey), siteProvisioning.updateSiteProvisioningKey ); + +authenticated.put( + "/org/:orgId/event-streaming-destination", + verifyValidLicense, + verifyOrgAccess, + verifyLimits, + verifyUserHasAction(ActionsEnum.createEventStreamingDestination), + logActionAudit(ActionsEnum.createEventStreamingDestination), + eventStreamingDestination.createEventStreamingDestination +); + +authenticated.post( + "/org/:orgId/event-streaming-destination/:destinationId", + verifyValidLicense, + verifyOrgAccess, + verifyLimits, + verifyUserHasAction(ActionsEnum.updateEventStreamingDestination), + logActionAudit(ActionsEnum.updateEventStreamingDestination), + eventStreamingDestination.updateEventStreamingDestination +); + +authenticated.delete( + "/org/:orgId/event-streaming-destination/:destinationId", + verifyValidLicense, + verifyOrgAccess, + verifyUserHasAction(ActionsEnum.deleteEventStreamingDestination), + logActionAudit(ActionsEnum.deleteEventStreamingDestination), + eventStreamingDestination.deleteEventStreamingDestination +); + +authenticated.get( + "/org/:orgId/event-streaming-destinations", + verifyOrgAccess, + verifyUserHasAction(ActionsEnum.listEventStreamingDestinations), + eventStreamingDestination.listEventStreamingDestinations +); diff --git a/server/private/routers/newt/handleConnectionLogMessage.ts b/server/private/routers/newt/handleConnectionLogMessage.ts index 2ac7153b5..e980f85c9 100644 --- a/server/private/routers/newt/handleConnectionLogMessage.ts +++ b/server/private/routers/newt/handleConnectionLogMessage.ts @@ -1,27 +1,33 @@ -import { db, logsDb } from "@server/db"; +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. 
+ * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + +import { db } from "@server/db"; import { MessageHandler } from "@server/routers/ws"; -import { connectionAuditLog, sites, Newt, clients, orgs } from "@server/db"; -import { and, eq, lt, inArray } from "drizzle-orm"; +import { sites, Newt, clients, orgs } from "@server/db"; +import { and, eq, inArray } from "drizzle-orm"; import logger from "@server/logger"; import { inflate } from "zlib"; import { promisify } from "util"; -import { calculateCutoffTimestamp } from "@server/lib/cleanupLogs"; +import { + logConnectionAudit, + flushConnectionLogToDb, + cleanUpOldLogs +} from "#private/lib/logConnectionAudit"; + +export { flushConnectionLogToDb, cleanUpOldLogs }; const zlibInflate = promisify(inflate); -// Retry configuration for deadlock handling -const MAX_RETRIES = 3; -const BASE_DELAY_MS = 50; - -// How often to flush accumulated connection log data to the database -const FLUSH_INTERVAL_MS = 30_000; // 30 seconds - -// Maximum number of records to buffer before forcing a flush -const MAX_BUFFERED_RECORDS = 500; - -// Maximum number of records to insert in a single batch -const INSERT_BATCH_SIZE = 100; - interface ConnectionSessionData { sessionId: string; resourceId: number; @@ -34,64 +40,6 @@ interface ConnectionSessionData { bytesRx?: number; } -interface ConnectionLogRecord { - sessionId: string; - siteResourceId: number; - orgId: string; - siteId: number; - clientId: number | null; - userId: string | null; - sourceAddr: string; - destAddr: string; - protocol: string; - startedAt: number; // epoch seconds - endedAt: number | null; - bytesTx: number | null; - bytesRx: number | null; -} - -// In-memory buffer of records waiting to be flushed -let buffer: ConnectionLogRecord[] = []; - -/** - * Check if an error is a deadlock error - */ -function 
isDeadlockError(error: any): boolean { - return ( - error?.code === "40P01" || - error?.cause?.code === "40P01" || - (error?.message && error.message.includes("deadlock")) - ); -} - -/** - * Execute a function with retry logic for deadlock handling - */ -async function withDeadlockRetry( - operation: () => Promise, - context: string -): Promise { - let attempt = 0; - while (true) { - try { - return await operation(); - } catch (error: any) { - if (isDeadlockError(error) && attempt < MAX_RETRIES) { - attempt++; - const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS; - const jitter = Math.random() * baseDelay; - const delay = baseDelay + jitter; - logger.warn( - `Deadlock detected in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms` - ); - await new Promise((resolve) => setTimeout(resolve, delay)); - continue; - } - throw error; - } - } -} - /** * Decompress a base64-encoded zlib-compressed string into parsed JSON. */ @@ -125,105 +73,6 @@ function toEpochSeconds(isoString: string | undefined | null): number | null { return Math.floor(ms / 1000); } -/** - * Flush all buffered connection log records to the database. - * - * Swaps out the buffer before writing so that any records added during the - * flush are captured in the new buffer rather than being lost. Entries that - * fail to write are re-queued back into the buffer so they will be retried - * on the next flush. - * - * This function is exported so that the application's graceful-shutdown - * cleanup handler can call it before the process exits. 
- */ -export async function flushConnectionLogToDb(): Promise { - if (buffer.length === 0) { - return; - } - - // Atomically swap out the buffer so new data keeps flowing in - const snapshot = buffer; - buffer = []; - - logger.debug( - `Flushing ${snapshot.length} connection log record(s) to the database` - ); - - // Insert in batches to avoid overly large SQL statements - for (let i = 0; i < snapshot.length; i += INSERT_BATCH_SIZE) { - const batch = snapshot.slice(i, i + INSERT_BATCH_SIZE); - - try { - await withDeadlockRetry(async () => { - await logsDb.insert(connectionAuditLog).values(batch); - }, `flush connection log batch (${batch.length} records)`); - } catch (error) { - logger.error( - `Failed to flush connection log batch of ${batch.length} records:`, - error - ); - - // Re-queue the failed batch so it is retried on the next flush - buffer = [...batch, ...buffer]; - - // Cap buffer to prevent unbounded growth if DB is unreachable - if (buffer.length > MAX_BUFFERED_RECORDS * 5) { - const dropped = buffer.length - MAX_BUFFERED_RECORDS * 5; - buffer = buffer.slice(0, MAX_BUFFERED_RECORDS * 5); - logger.warn( - `Connection log buffer overflow, dropped ${dropped} oldest records` - ); - } - - // Stop trying further batches from this snapshot — they'll be - // picked up by the next flush via the re-queued records above - const remaining = snapshot.slice(i + INSERT_BATCH_SIZE); - if (remaining.length > 0) { - buffer = [...remaining, ...buffer]; - } - break; - } - } -} - -const flushTimer = setInterval(async () => { - try { - await flushConnectionLogToDb(); - } catch (error) { - logger.error( - "Unexpected error during periodic connection log flush:", - error - ); - } -}, FLUSH_INTERVAL_MS); - -// Calling unref() means this timer will not keep the Node.js event loop alive -// on its own — the process can still exit normally when there is no other work -// left. 
The graceful-shutdown path will call flushConnectionLogToDb() explicitly -// before process.exit(), so no data is lost. -flushTimer.unref(); - -export async function cleanUpOldLogs(orgId: string, retentionDays: number) { - const cutoffTimestamp = calculateCutoffTimestamp(retentionDays); - - try { - await logsDb - .delete(connectionAuditLog) - .where( - and( - lt(connectionAuditLog.startedAt, cutoffTimestamp), - eq(connectionAuditLog.orgId, orgId) - ) - ); - - // logger.debug( - // `Cleaned up connection audit logs older than ${retentionDays} days` - // ); - } catch (error) { - logger.error("Error cleaning up old connection audit logs:", error); - } -} - export const handleConnectionLogMessage: MessageHandler = async (context) => { const { message, client } = context; const newt = client as Newt; @@ -277,13 +126,16 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { return; } - logger.debug(`Sessions: ${JSON.stringify(sessions)}`) + logger.debug(`Sessions: ${JSON.stringify(sessions)}`); // Build a map from sourceAddr → { clientId, userId } by querying clients // whose subnet field matches exactly. Client subnets are stored with the // org's CIDR suffix (e.g. "100.90.128.5/16"), so we reconstruct that from // each unique sourceAddr + the org's CIDR suffix and do a targeted IN query. - const ipToClient = new Map(); + const ipToClient = new Map< + string, + { clientId: number; userId: string | null } + >(); if (cidrSuffix) { // Collect unique source addresses so we only query for what we need @@ -296,13 +148,11 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { if (uniqueSourceAddrs.size > 0) { // Construct the exact subnet strings as stored in the DB - const subnetQueries = Array.from(uniqueSourceAddrs).map( - (addr) => { - // Strip port if present (e.g. "100.90.128.1:38004" → "100.90.128.1") - const ip = addr.includes(":") ? 
addr.split(":")[0] : addr; - return `${ip}${cidrSuffix}`; - } - ); + const subnetQueries = Array.from(uniqueSourceAddrs).map((addr) => { + // Strip port if present (e.g. "100.90.128.1:38004" → "100.90.128.1") + const ip = addr.includes(":") ? addr.split(":")[0] : addr; + return `${ip}${cidrSuffix}`; + }); logger.debug(`Subnet queries: ${JSON.stringify(subnetQueries)}`); @@ -322,13 +172,18 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { for (const c of matchedClients) { const ip = c.subnet.split("/")[0]; - logger.debug(`Client ${c.clientId} subnet ${c.subnet} matches ${ip}`); - ipToClient.set(ip, { clientId: c.clientId, userId: c.userId }); + logger.debug( + `Client ${c.clientId} subnet ${c.subnet} matches ${ip}` + ); + ipToClient.set(ip, { + clientId: c.clientId, + userId: c.userId + }); } } } - // Convert to DB records and add to the buffer + // Convert to DB records and hand off to the audit logger for (const session of sessions) { // Validate required fields if ( @@ -356,11 +211,12 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { // client's IP on the WireGuard network, which corresponds to the IP // portion of the client's subnet CIDR (e.g. "100.90.128.5/24"). // Strip port if present (e.g. "100.90.128.1:38004" → "100.90.128.1") - const sourceIp = session.sourceAddr.includes(":") ? session.sourceAddr.split(":")[0] : session.sourceAddr; + const sourceIp = session.sourceAddr.includes(":") + ? session.sourceAddr.split(":")[0] + : session.sourceAddr; const clientInfo = ipToClient.get(sourceIp) ?? 
null; - - buffer.push({ + logConnectionAudit({ sessionId: session.sessionId, siteResourceId: session.resourceId, orgId, @@ -380,15 +236,4 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { logger.debug( `Buffered ${sessions.length} connection log session(s) from newt ${newt.newtId} (site ${newt.siteId})` ); - - // If the buffer has grown large enough, trigger an immediate flush - if (buffer.length >= MAX_BUFFERED_RECORDS) { - // Fire and forget — errors are handled inside flushConnectionLogToDb - flushConnectionLogToDb().catch((error) => { - logger.error( - "Unexpected error during size-triggered connection log flush:", - error - ); - }); - } }; diff --git a/server/private/routers/newt/index.ts b/server/private/routers/newt/index.ts index cc182cf7d..256d19cb7 100644 --- a/server/private/routers/newt/index.ts +++ b/server/private/routers/newt/index.ts @@ -1 +1,14 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. 
+ */ + export * from "./handleConnectionLogMessage"; diff --git a/src/app/[orgId]/settings/access/invitations/page.tsx b/src/app/[orgId]/settings/access/invitations/page.tsx index 00cb0ffc8..ae37c3752 100644 --- a/src/app/[orgId]/settings/access/invitations/page.tsx +++ b/src/app/[orgId]/settings/access/invitations/page.tsx @@ -3,13 +3,12 @@ import { authCookieHeader } from "@app/lib/api/cookies"; import { AxiosResponse } from "axios"; import InvitationsTable, { InvitationRow -} from "../../../../../components/InvitationsTable"; +} from "@app/components/InvitationsTable"; import { GetOrgResponse } from "@server/routers/org"; import { cache } from "react"; import OrgProvider from "@app/providers/OrgProvider"; import UserProvider from "@app/providers/UserProvider"; import { verifySession } from "@app/lib/auth/verifySession"; -import AccessPageHeaderAndNav from "../../../../../components/AccessPageHeaderAndNav"; import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; import { getTranslations } from "next-intl/server"; diff --git a/src/app/[orgId]/settings/access/users/page.tsx b/src/app/[orgId]/settings/access/users/page.tsx index 812ac2b64..84685cc04 100644 --- a/src/app/[orgId]/settings/access/users/page.tsx +++ b/src/app/[orgId]/settings/access/users/page.tsx @@ -3,13 +3,12 @@ import { authCookieHeader } from "@app/lib/api/cookies"; import { getUserDisplayName } from "@app/lib/getUserDisplayName"; import { ListUsersResponse } from "@server/routers/user"; import { AxiosResponse } from "axios"; -import UsersTable, { UserRow } from "../../../../../components/UsersTable"; +import UsersTable, { UserRow } from "@app/components/UsersTable"; import { GetOrgResponse } from "@server/routers/org"; import { cache } from "react"; import OrgProvider from "@app/providers/OrgProvider"; import UserProvider from "@app/providers/UserProvider"; import { verifySession } from "@app/lib/auth/verifySession"; -import AccessPageHeaderAndNav from 
"../../../../../components/AccessPageHeaderAndNav"; import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; import { getTranslations } from "next-intl/server"; diff --git a/src/app/[orgId]/settings/api-keys/page.tsx b/src/app/[orgId]/settings/api-keys/page.tsx index 2973bb542..0ed9553af 100644 --- a/src/app/[orgId]/settings/api-keys/page.tsx +++ b/src/app/[orgId]/settings/api-keys/page.tsx @@ -4,7 +4,7 @@ import { AxiosResponse } from "axios"; import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; import OrgApiKeysTable, { OrgApiKeyRow -} from "../../../../components/OrgApiKeysTable"; +} from "@app/components/OrgApiKeysTable"; import { ListOrgApiKeysResponse } from "@server/routers/apiKeys"; import { getTranslations } from "next-intl/server"; diff --git a/src/app/[orgId]/settings/domains/page.tsx b/src/app/[orgId]/settings/domains/page.tsx index 04db84b38..d1325d32b 100644 --- a/src/app/[orgId]/settings/domains/page.tsx +++ b/src/app/[orgId]/settings/domains/page.tsx @@ -2,7 +2,7 @@ import { internal } from "@app/lib/api"; import { authCookieHeader } from "@app/lib/api/cookies"; import { AxiosResponse } from "axios"; import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; -import DomainsTable, { DomainRow } from "../../../../components/DomainsTable"; +import DomainsTable, { DomainRow } from "@app/components/DomainsTable"; import { getTranslations } from "next-intl/server"; import { cache } from "react"; import { GetOrgResponse } from "@server/routers/org"; diff --git a/src/app/[orgId]/settings/logs/streaming/page.tsx b/src/app/[orgId]/settings/logs/streaming/page.tsx new file mode 100644 index 000000000..843f1ddb7 --- /dev/null +++ b/src/app/[orgId]/settings/logs/streaming/page.tsx @@ -0,0 +1,478 @@ +"use client"; + +import { useState, useEffect, useCallback } from "react"; +import { useParams } from "next/navigation"; +import { createApiClient, formatAxiosError } from "@app/lib/api"; +import { useEnvContext } 
from "@app/hooks/useEnvContext"; +import { toast } from "@app/hooks/useToast"; +import { usePaidStatus } from "@app/hooks/usePaidStatus"; +import { PaidFeaturesAlert } from "@app/components/PaidFeaturesAlert"; +import ConfirmDeleteDialog from "@app/components/ConfirmDeleteDialog"; +import { tierMatrix, TierFeature } from "@server/lib/billing/tierMatrix"; +import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; +import { + Credenza, + CredenzaBody, + CredenzaClose, + CredenzaContent, + CredenzaDescription, + CredenzaFooter, + CredenzaHeader, + CredenzaTitle +} from "@app/components/Credenza"; +import { Button } from "@app/components/ui/button"; +import { Switch } from "@app/components/ui/switch"; +import { Globe, MoreHorizontal, Plus } from "lucide-react"; +import { AxiosResponse } from "axios"; +import { build } from "@server/build"; +import Image from "next/image"; +import { StrategySelect, StrategyOption } from "@app/components/StrategySelect"; +import { + DropdownMenu, + DropdownMenuContent, + DropdownMenuItem, + DropdownMenuTrigger +} from "@app/components/ui/dropdown-menu"; +import { + Destination, + HttpDestinationCredenza, + parseHttpConfig +} from "@app/components/HttpDestinationCredenza"; + +// ── Re-export Destination so the rest of the file can use it ────────────────── + +interface ListDestinationsResponse { + destinations: Destination[]; + pagination: { + total: number; + limit: number; + offset: number; + }; +} + +// ── Destination card ─────────────────────────────────────────────────────────── + +interface DestinationCardProps { + destination: Destination; + onToggle: (id: number, enabled: boolean) => void; + onEdit: (destination: Destination) => void; + onDelete: (destination: Destination) => void; + isToggling: boolean; + disabled?: boolean; +} + +function DestinationCard({ + destination, + onToggle, + onEdit, + onDelete, + isToggling, + disabled = false +}: DestinationCardProps) { + const cfg = 
parseHttpConfig(destination.config); + + return ( +
+ {/* Top row: icon + name/type + toggle */} +
+
+ {/* Squirkle icon: gray outer → white inner → black globe */} +
+
+ +
+
+
+

+ {cfg.name || "Unnamed destination"} +

+

+ HTTP +

+
+
+ + onToggle(destination.destinationId, v) + } + disabled={isToggling || disabled} + className="shrink-0 mt-0.5" + /> +
+ + {/* URL preview */} +

+ {cfg.url || ( + No URL configured + )} +

+ + {/* Footer: edit button + three-dots menu */} +
+ + + + + + + onDelete(destination)} + > + Delete + + + +
+
+ ); +} + +// ── Add destination card ─────────────────────────────────────────────────────── + +function AddDestinationCard({ onClick }: { onClick: () => void }) { + return ( + + ); +} + +// ── Destination type picker ──────────────────────────────────────────────────── + +type DestinationType = "http" | "s3" | "datadog"; + +const destinationTypeOptions: ReadonlyArray> = [ + { + id: "http", + title: "HTTP Webhook", + description: + "Send events to any HTTP endpoint with flexible authentication and templating.", + icon: + }, + { + id: "s3", + title: "Amazon S3", + description: + "Stream events to an S3-compatible object storage bucket. Coming soon.", + disabled: true, + icon: ( + Amazon S3 + ) + }, + { + id: "datadog", + title: "Datadog", + description: + "Forward events directly to your Datadog account. Coming soon.", + disabled: true, + icon: ( + Datadog + ) + } +]; + +interface DestinationTypePickerProps { + open: boolean; + onOpenChange: (open: boolean) => void; + onSelect: (type: DestinationType) => void; + isPaywalled?: boolean; +} + +function DestinationTypePicker({ + open, + onOpenChange, + onSelect, + isPaywalled = false +}: DestinationTypePickerProps) { + const [selected, setSelected] = useState("http"); + + useEffect(() => { + if (open) setSelected("http"); + }, [open]); + + return ( + + + + Add Destination + + Choose a destination type to get started. + + + +
+ +
+
+ + + + + + +
+
+ ); +} + +// ── Main page ────────────────────────────────────────────────────────────────── + +export default function StreamingDestinationsPage() { + const { orgId } = useParams() as { orgId: string }; + const api = createApiClient(useEnvContext()); + const { isPaidUser } = usePaidStatus(); + const isEnterprise = isPaidUser(tierMatrix[TierFeature.SIEM]); + + const [destinations, setDestinations] = useState([]); + const [loading, setLoading] = useState(true); + const [modalOpen, setModalOpen] = useState(false); + const [typePickerOpen, setTypePickerOpen] = useState(false); + const [editingDestination, setEditingDestination] = + useState(null); + const [togglingIds, setTogglingIds] = useState>(new Set()); + + // Delete state + const [deleteTarget, setDeleteTarget] = useState(null); + const [deleteDialogOpen, setDeleteDialogOpen] = useState(false); + const [deleting, setDeleting] = useState(false); + + const loadDestinations = useCallback(async () => { + if (build == "oss") { + setDestinations([]); + setLoading(false); + return; + } + try { + const res = await api.get>( + `/org/${orgId}/event-streaming-destinations` + ); + setDestinations(res.data.data.destinations ?? []); + } catch (e) { + toast({ + variant: "destructive", + title: "Failed to load destinations", + description: formatAxiosError( + e, + "An unexpected error occurred." + ) + }); + } finally { + setLoading(false); + } + }, [orgId]); + + useEffect(() => { + loadDestinations(); + }, [loadDestinations]); + + const handleToggle = async (destinationId: number, enabled: boolean) => { + // Optimistic update + setDestinations((prev) => + prev.map((d) => + d.destinationId === destinationId ? 
{ ...d, enabled } : d + ) + ); + setTogglingIds((prev) => new Set(prev).add(destinationId)); + + try { + await api.post( + `/org/${orgId}/event-streaming-destination/${destinationId}`, + { enabled } + ); + } catch (e) { + // Revert on failure + setDestinations((prev) => + prev.map((d) => + d.destinationId === destinationId + ? { ...d, enabled: !enabled } + : d + ) + ); + toast({ + variant: "destructive", + title: "Failed to update destination", + description: formatAxiosError( + e, + "An unexpected error occurred." + ) + }); + } finally { + setTogglingIds((prev) => { + const next = new Set(prev); + next.delete(destinationId); + return next; + }); + } + }; + + const handleDeleteCard = (destination: Destination) => { + setDeleteTarget(destination); + setDeleteDialogOpen(true); + }; + + const handleDeleteConfirm = async () => { + if (!deleteTarget) return; + setDeleting(true); + try { + await api.delete( + `/org/${orgId}/event-streaming-destination/${deleteTarget.destinationId}` + ); + toast({ title: "Destination deleted successfully" }); + setDeleteDialogOpen(false); + setDeleteTarget(null); + loadDestinations(); + } catch (e) { + toast({ + variant: "destructive", + title: "Failed to delete destination", + description: formatAxiosError( + e, + "An unexpected error occurred." + ) + }); + } finally { + setDeleting(false); + } + }; + + const openCreate = () => { + setTypePickerOpen(true); + }; + + const handleTypePicked = (_type: DestinationType) => { + setTypePickerOpen(false); + setEditingDestination(null); + setModalOpen(true); + }; + + const openEdit = (destination: Destination) => { + setEditingDestination(destination); + setModalOpen(true); + }; + + return ( + <> + + + + + {loading ? ( +
+ {Array.from({ length: 4 }).map((_, i) => ( +
+ ))} +
+ ) : ( +
+ {destinations.map((dest) => ( + + ))} + {/* Add card is always clickable — paywall is enforced inside the picker */} + +
+ )} + + + + + + {deleteTarget && ( + { + setDeleteDialogOpen(v); + if (!v) setDeleteTarget(null); + }} + string={ + parseHttpConfig(deleteTarget.config).name || "delete" + } + title="Delete Destination" + dialog={ +

+ Are you sure you want to delete{" "} + + {parseHttpConfig(deleteTarget.config).name || + "this destination"} + + ? All configuration will be permanently removed. +

+ } + buttonText="Delete Destination" + onConfirm={handleDeleteConfirm} + /> + )} + + ); +} diff --git a/src/app/[orgId]/settings/provisioning/keys/page.tsx b/src/app/[orgId]/settings/provisioning/keys/page.tsx index 9637b03b3..32a06706d 100644 --- a/src/app/[orgId]/settings/provisioning/keys/page.tsx +++ b/src/app/[orgId]/settings/provisioning/keys/page.tsx @@ -4,7 +4,7 @@ import { AxiosResponse } from "axios"; import { PaidFeaturesAlert } from "@app/components/PaidFeaturesAlert"; import SiteProvisioningKeysTable, { SiteProvisioningKeyRow -} from "../../../../../components/SiteProvisioningKeysTable"; +} from "@app/components/SiteProvisioningKeysTable"; import { ListSiteProvisioningKeysResponse } from "@server/routers/siteProvisioning/types"; import { getTranslations } from "next-intl/server"; import { TierFeature, tierMatrix } from "@server/lib/billing/tierMatrix"; diff --git a/src/app/[orgId]/settings/share-links/page.tsx b/src/app/[orgId]/settings/share-links/page.tsx index 3b52393cf..b41a3d1ce 100644 --- a/src/app/[orgId]/settings/share-links/page.tsx +++ b/src/app/[orgId]/settings/share-links/page.tsx @@ -9,7 +9,7 @@ import OrgProvider from "@app/providers/OrgProvider"; import { ListAccessTokensResponse } from "@server/routers/accessToken"; import ShareLinksTable, { ShareLinkRow -} from "../../../../components/ShareLinksTable"; +} from "@app/components/ShareLinksTable"; import { getTranslations } from "next-intl/server"; type ShareLinksPageProps = { diff --git a/src/app/[orgId]/settings/sites/[niceId]/layout.tsx b/src/app/[orgId]/settings/sites/[niceId]/layout.tsx index 2554403ea..aa02bb667 100644 --- a/src/app/[orgId]/settings/sites/[niceId]/layout.tsx +++ b/src/app/[orgId]/settings/sites/[niceId]/layout.tsx @@ -6,9 +6,9 @@ import { redirect } from "next/navigation"; import { authCookieHeader } from "@app/lib/api/cookies"; import { HorizontalTabs } from "@app/components/HorizontalTabs"; import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; 
-import SiteInfoCard from "../../../../../components/SiteInfoCard"; +import SiteInfoCard from "@app/components/SiteInfoCard"; import { getTranslations } from "next-intl/server"; -import { build } from "@server/build"; + interface SettingsLayoutProps { children: React.ReactNode; diff --git a/src/app/admin/api-keys/page.tsx b/src/app/admin/api-keys/page.tsx index 60195c4aa..ce468dd39 100644 --- a/src/app/admin/api-keys/page.tsx +++ b/src/app/admin/api-keys/page.tsx @@ -3,7 +3,7 @@ import { authCookieHeader } from "@app/lib/api/cookies"; import { AxiosResponse } from "axios"; import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; import { ListRootApiKeysResponse } from "@server/routers/apiKeys"; -import ApiKeysTable, { ApiKeyRow } from "../../../components/ApiKeysTable"; +import ApiKeysTable, { ApiKeyRow } from "@app/components/ApiKeysTable"; import { getTranslations } from "next-intl/server"; type ApiKeyPageProps = {}; diff --git a/src/app/admin/idp/[idpId]/policies/page.tsx b/src/app/admin/idp/[idpId]/policies/page.tsx index 57ee3cf7b..60e8a094a 100644 --- a/src/app/admin/idp/[idpId]/policies/page.tsx +++ b/src/app/admin/idp/[idpId]/policies/page.tsx @@ -31,7 +31,7 @@ import { zodResolver } from "@hookform/resolvers/zod"; import { z } from "zod"; import { Alert, AlertDescription, AlertTitle } from "@app/components/ui/alert"; import { InfoIcon, ExternalLink, CheckIcon } from "lucide-react"; -import PolicyTable, { PolicyRow } from "../../../../../components/PolicyTable"; +import PolicyTable, { PolicyRow } from "@app/components/PolicyTable"; import { AxiosResponse } from "axios"; import { ListOrgsResponse } from "@server/routers/org"; import { ListRolesResponse } from "@server/routers/role"; diff --git a/src/app/admin/idp/page.tsx b/src/app/admin/idp/page.tsx index a341c0469..01aac2a1e 100644 --- a/src/app/admin/idp/page.tsx +++ b/src/app/admin/idp/page.tsx @@ -2,7 +2,7 @@ import { internal } from "@app/lib/api"; import { authCookieHeader } from 
"@app/lib/api/cookies"; import { AxiosResponse } from "axios"; import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; -import IdpTable, { IdpRow } from "../../../components/AdminIdpTable"; +import IdpTable, { IdpRow } from "@app/components/AdminIdpTable"; import { getTranslations } from "next-intl/server"; export default async function IdpPage() { diff --git a/src/app/admin/license/page.tsx b/src/app/admin/license/page.tsx index 3c444debb..b2322ec3b 100644 --- a/src/app/admin/license/page.tsx +++ b/src/app/admin/license/page.tsx @@ -6,7 +6,7 @@ import { createApiClient } from "@app/lib/api"; import { useEnvContext } from "@app/hooks/useEnvContext"; import { toast } from "@app/hooks/useToast"; import { formatAxiosError } from "@app/lib/api"; -import { LicenseKeysDataTable } from "../../../components/LicenseKeysDataTable"; +import { LicenseKeysDataTable } from "@app/components/LicenseKeysDataTable"; import { AxiosResponse } from "axios"; import { Button } from "@app/components/ui/button"; import { @@ -45,7 +45,7 @@ import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; import { Check, Heart, InfoIcon } from "lucide-react"; import CopyTextBox from "@app/components/CopyTextBox"; import ConfirmDeleteDialog from "@app/components/ConfirmDeleteDialog"; -import { SitePriceCalculator } from "../../../components/SitePriceCalculator"; +import { SitePriceCalculator } from "@app/components/SitePriceCalculator"; import { Checkbox } from "@app/components/ui/checkbox"; import { Alert, AlertDescription, AlertTitle } from "@app/components/ui/alert"; import { useSupporterStatusContext } from "@app/hooks/useSupporterStatusContext"; diff --git a/src/app/admin/users/page.tsx b/src/app/admin/users/page.tsx index 7368cb253..2a000b34b 100644 --- a/src/app/admin/users/page.tsx +++ b/src/app/admin/users/page.tsx @@ -3,7 +3,7 @@ import { authCookieHeader } from "@app/lib/api/cookies"; import { AxiosResponse } from "axios"; import SettingsSectionTitle from 
"@app/components/SettingsSectionTitle"; import { AdminListUsersResponse } from "@server/routers/user/adminListUsers"; -import UsersTable, { GlobalUserRow } from "../../../components/AdminUsersTable"; +import UsersTable, { GlobalUserRow } from "@app/components/AdminUsersTable"; import { Alert, AlertDescription, AlertTitle } from "@app/components/ui/alert"; import { InfoIcon } from "lucide-react"; import { getTranslations } from "next-intl/server"; diff --git a/src/app/navigation.tsx b/src/app/navigation.tsx index 66e6cdad0..ac7a4a10f 100644 --- a/src/app/navigation.tsx +++ b/src/app/navigation.tsx @@ -23,6 +23,7 @@ import { Settings, SquareMousePointer, TicketCheck, + Unplug, User, UserCog, Users, @@ -196,6 +197,11 @@ export const orgNavSections = ( title: "sidebarLogsConnection", href: "/{orgId}/settings/logs/connection", icon: + }, + { + title: "sidebarLogsStreaming", + href: "/{orgId}/settings/logs/streaming", + icon: } ] : []) diff --git a/src/components/HttpDestinationCredenza.tsx b/src/components/HttpDestinationCredenza.tsx new file mode 100644 index 000000000..205c17c84 --- /dev/null +++ b/src/components/HttpDestinationCredenza.tsx @@ -0,0 +1,836 @@ +"use client"; + +import { useState, useEffect } from "react"; +import { + Credenza, + CredenzaBody, + CredenzaClose, + CredenzaContent, + CredenzaDescription, + CredenzaFooter, + CredenzaHeader, + CredenzaTitle +} from "@app/components/Credenza"; +import { Button } from "@app/components/ui/button"; +import { Input } from "@app/components/ui/input"; +import { Label } from "@app/components/ui/label"; +import { Switch } from "@app/components/ui/switch"; +import { HorizontalTabs } from "@app/components/HorizontalTabs"; +import { RadioGroup, RadioGroupItem } from "@app/components/ui/radio-group"; +import { Textarea } from "@app/components/ui/textarea"; +import { Checkbox } from "@app/components/ui/checkbox"; +import { Plus, X } from "lucide-react"; +import { createApiClient, formatAxiosError } from "@app/lib/api"; 
+import { useEnvContext } from "@app/hooks/useEnvContext"; +import { toast } from "@app/hooks/useToast"; +import { build } from "@server/build"; + +// ── Types ────────────────────────────────────────────────────────────────────── + +export type AuthType = "none" | "bearer" | "basic" | "custom"; + +export type PayloadFormat = "json_array" | "ndjson" | "json_single"; + +export interface HttpConfig { + name: string; + url: string; + authType: AuthType; + bearerToken?: string; + basicCredentials?: string; + customHeaderName?: string; + customHeaderValue?: string; + headers: Array<{ key: string; value: string }>; + format: PayloadFormat; + useBodyTemplate: boolean; + bodyTemplate?: string; +} + +export interface Destination { + destinationId: number; + orgId: string; + type: string; + config: string; + enabled: boolean; + sendAccessLogs: boolean; + sendActionLogs: boolean; + sendConnectionLogs: boolean; + sendRequestLogs: boolean; + createdAt: number; + updatedAt: number; +} + +// ── Helpers ──────────────────────────────────────────────────────────────────── + +export const defaultHttpConfig = (): HttpConfig => ({ + name: "", + url: "", + authType: "none", + bearerToken: "", + basicCredentials: "", + customHeaderName: "", + customHeaderValue: "", + headers: [], + format: "json_array", + useBodyTemplate: false, + bodyTemplate: "" +}); + +export function parseHttpConfig(raw: string): HttpConfig { + try { + return { ...defaultHttpConfig(), ...JSON.parse(raw) }; + } catch { + return defaultHttpConfig(); + } +} + +// ── Headers editor ───────────────────────────────────────────────────────────── + +interface HeadersEditorProps { + headers: Array<{ key: string; value: string }>; + onChange: (headers: Array<{ key: string; value: string }>) => void; +} + +function HeadersEditor({ headers, onChange }: HeadersEditorProps) { + const addRow = () => onChange([...headers, { key: "", value: "" }]); + + const removeRow = (i: number) => + onChange(headers.filter((_, idx) => idx !== 
i)); + + const updateRow = (i: number, field: "key" | "value", val: string) => { + const next = [...headers]; + next[i] = { ...next[i], [field]: val }; + onChange(next); + }; + + return ( +
+ {headers.length === 0 && ( +

+ No custom headers configured. Click "Add Header" to add + one. +

+ )} + {headers.map((h, i) => ( +
+ updateRow(i, "key", e.target.value)} + placeholder="Header name" + className="flex-1" + /> + + updateRow(i, "value", e.target.value) + } + placeholder="Value" + className="flex-1" + /> + +
+ ))} + +
+ ); +} + +// ── Component ────────────────────────────────────────────────────────────────── + +export interface HttpDestinationCredenzaProps { + open: boolean; + onOpenChange: (open: boolean) => void; + editing: Destination | null; + orgId: string; + onSaved: () => void; +} + +export function HttpDestinationCredenza({ + open, + onOpenChange, + editing, + orgId, + onSaved +}: HttpDestinationCredenzaProps) { + const api = createApiClient(useEnvContext()); + + const [saving, setSaving] = useState(false); + const [cfg, setCfg] = useState(defaultHttpConfig()); + const [sendAccessLogs, setSendAccessLogs] = useState(false); + const [sendActionLogs, setSendActionLogs] = useState(false); + const [sendConnectionLogs, setSendConnectionLogs] = useState(false); + const [sendRequestLogs, setSendRequestLogs] = useState(false); + + useEffect(() => { + if (open) { + setCfg( + editing ? parseHttpConfig(editing.config) : defaultHttpConfig() + ); + setSendAccessLogs(editing?.sendAccessLogs ?? false); + setSendActionLogs(editing?.sendActionLogs ?? false); + setSendConnectionLogs(editing?.sendConnectionLogs ?? false); + setSendRequestLogs(editing?.sendRequestLogs ?? false); + } + }, [open, editing]); + + const update = (patch: Partial) => + setCfg((prev) => ({ ...prev, ...patch })); + + const urlError: string | null = (() => { + const raw = cfg.url.trim(); + if (!raw) return null; + try { + const parsed = new URL(raw); + if ( + parsed.protocol !== "http:" && + parsed.protocol !== "https:" + ) { + return "URL must use http or https"; + } + if (build === "saas" && parsed.protocol !== "https:") { + return "HTTPS is required on cloud deployments"; + } + return null; + } catch { + return "Enter a valid URL (e.g. 
https://example.com/webhook)"; + } + })(); + + const isValid = + cfg.name.trim() !== "" && + cfg.url.trim() !== "" && + urlError === null; + + async function handleSave() { + if (!isValid) return; + setSaving(true); + try { + const payload = { + type: "http", + config: JSON.stringify(cfg), + sendAccessLogs, + sendActionLogs, + sendConnectionLogs, + sendRequestLogs + }; + if (editing) { + await api.post( + `/org/${orgId}/event-streaming-destination/${editing.destinationId}`, + payload + ); + toast({ title: "Destination updated successfully" }); + } else { + await api.put( + `/org/${orgId}/event-streaming-destination`, + payload + ); + toast({ title: "Destination created successfully" }); + } + onSaved(); + onOpenChange(false); + } catch (e) { + toast({ + variant: "destructive", + title: editing + ? "Failed to update destination" + : "Failed to create destination", + description: formatAxiosError( + e, + "An unexpected error occurred." + ) + }); + } finally { + setSaving(false); + } + } + + return ( + + + + + {editing + ? "Edit Destination" + : "Add HTTP Destination"} + + + {editing + ? "Update the configuration for this HTTP event streaming destination." + : "Configure a new HTTP endpoint to receive your organization's events."} + + + + + + {/* ── Settings tab ────────────────────────────── */} +
+ {/* Name */} +
+ + + update({ name: e.target.value }) + } + /> +
+ + {/* URL */} +
+ + + update({ url: e.target.value }) + } + /> + {urlError && ( +

+ {urlError} +

+ )} +
+ + {/* Authentication */} +
+
+ +

+ Choose how requests to your endpoint + are authenticated. +

+
+ + + update({ authType: v as AuthType }) + } + className="gap-2" + > + {/* None */} +
+ +
+ +

+ Sends requests without an{" "} + + Authorization + {" "} + header. +

+
+
+ + {/* Bearer */} +
+ +
+
+ +

+ Adds an{" "} + + Authorization: Bearer + <token> + {" "} + header to each request. +

+
+ {cfg.authType === "bearer" && ( + + update({ + bearerToken: + e.target.value + }) + } + /> + )} +
+
+ + {/* Basic */} +
+ +
+
+ +

+ Adds an{" "} + + Authorization: Basic + <credentials> + {" "} + header. Provide credentials + as{" "} + + username:password + + . +

+
+ {cfg.authType === "basic" && ( + + update({ + basicCredentials: + e.target.value + }) + } + /> + )} +
+
+ + {/* Custom */} +
+ +
+
+ +

+ Specify a custom HTTP + header name and value for + authentication (e.g.{" "} + + X-API-Key + + ). +

+
+ {cfg.authType === "custom" && ( +
+ + update({ + customHeaderName: + e.target + .value + }) + } + className="flex-1" + /> + + update({ + customHeaderValue: + e.target + .value + }) + } + className="flex-1" + /> +
+ )} +
+
+
+
+
+ + {/* ── Headers tab ──────────────────────────────── */} +
+
+ +

+ Add custom headers to every outgoing + request. Useful for static tokens or + custom{" "} + + Content-Type + + . By default,{" "} + + Content-Type: application/json + {" "} + is sent. +

+
+ update({ headers })} + /> +
+ + {/* ── Body tab ─────────────────────────── */} +
+
+ +

+ Control the JSON payload structure sent to + your endpoint. If disabled, a default JSON + object is sent for each event. +

+
+ +
+ + update({ useBodyTemplate: v }) + } + /> + +
+ + {cfg.useBodyTemplate && ( +
+ +