Merge branch 'siem' into dev

This commit is contained in:
Owen
2026-03-31 14:20:46 -07:00
37 changed files with 3659 additions and 222 deletions

View File

@@ -2460,6 +2460,7 @@
"connectionLogs": "Connection Logs", "connectionLogs": "Connection Logs",
"connectionLogsDescription": "View connection logs for tunnels in this organization", "connectionLogsDescription": "View connection logs for tunnels in this organization",
"sidebarLogsConnection": "Connection Logs", "sidebarLogsConnection": "Connection Logs",
"sidebarLogsStreaming": "Streaming",
"sourceAddress": "Source Address", "sourceAddress": "Source Address",
"destinationAddress": "Destination Address", "destinationAddress": "Destination Address",
"duration": "Duration", "duration": "Duration",

BIN
public/third-party/dd.png vendored Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 73 KiB

BIN
public/third-party/s3.png vendored Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 13 KiB

View File

@@ -140,7 +140,11 @@ export enum ActionsEnum {
exportLogs = "exportLogs", exportLogs = "exportLogs",
listApprovals = "listApprovals", listApprovals = "listApprovals",
updateApprovals = "updateApprovals", updateApprovals = "updateApprovals",
signSshKey = "signSshKey" signSshKey = "signSshKey",
createEventStreamingDestination = "createEventStreamingDestination",
updateEventStreamingDestination = "updateEventStreamingDestination",
deleteEventStreamingDestination = "deleteEventStreamingDestination",
listEventStreamingDestinations = "listEventStreamingDestinations"
} }
export async function checkUserActionPermission( export async function checkUserActionPermission(

View File

@@ -8,7 +8,8 @@ import {
real, real,
text, text,
index, index,
primaryKey primaryKey,
uniqueIndex
} from "drizzle-orm/pg-core"; } from "drizzle-orm/pg-core";
import { InferSelectModel } from "drizzle-orm"; import { InferSelectModel } from "drizzle-orm";
import { import {
@@ -417,6 +418,46 @@ export const siteProvisioningKeyOrg = pgTable(
] ]
); );
export const eventStreamingDestinations = pgTable(
"eventStreamingDestinations",
{
destinationId: serial("destinationId").primaryKey(),
orgId: varchar("orgId", { length: 255 })
.notNull()
.references(() => orgs.orgId, { onDelete: "cascade" }),
sendConnectionLogs: boolean("sendConnectionLogs").notNull().default(false),
sendRequestLogs: boolean("sendRequestLogs").notNull().default(false),
sendActionLogs: boolean("sendActionLogs").notNull().default(false),
sendAccessLogs: boolean("sendAccessLogs").notNull().default(false),
type: varchar("type", { length: 50 }).notNull(), // e.g. "http", "kafka", etc.
config: text("config").notNull(), // JSON string with the configuration for the destination
enabled: boolean("enabled").notNull().default(true),
createdAt: bigint("createdAt", { mode: "number" }).notNull(),
updatedAt: bigint("updatedAt", { mode: "number" }).notNull()
}
);
export const eventStreamingCursors = pgTable(
"eventStreamingCursors",
{
cursorId: serial("cursorId").primaryKey(),
destinationId: integer("destinationId")
.notNull()
.references(() => eventStreamingDestinations.destinationId, {
onDelete: "cascade"
}),
logType: varchar("logType", { length: 50 }).notNull(), // "request" | "action" | "access" | "connection"
lastSentId: bigint("lastSentId", { mode: "number" }).notNull().default(0),
lastSentAt: bigint("lastSentAt", { mode: "number" }) // epoch milliseconds, null if never sent
},
(table) => [
uniqueIndex("idx_eventStreamingCursors_dest_type").on(
table.destinationId,
table.logType
)
]
);
export type Approval = InferSelectModel<typeof approvals>; export type Approval = InferSelectModel<typeof approvals>;
export type Limit = InferSelectModel<typeof limits>; export type Limit = InferSelectModel<typeof limits>;
export type Account = InferSelectModel<typeof account>; export type Account = InferSelectModel<typeof account>;
@@ -439,3 +480,18 @@ export type LoginPageBranding = InferSelectModel<typeof loginPageBranding>;
export type ActionAuditLog = InferSelectModel<typeof actionAuditLog>; export type ActionAuditLog = InferSelectModel<typeof actionAuditLog>;
export type AccessAuditLog = InferSelectModel<typeof accessAuditLog>; export type AccessAuditLog = InferSelectModel<typeof accessAuditLog>;
export type ConnectionAuditLog = InferSelectModel<typeof connectionAuditLog>; export type ConnectionAuditLog = InferSelectModel<typeof connectionAuditLog>;
export type SessionTransferToken = InferSelectModel<
typeof sessionTransferToken
>;
export type BannedEmail = InferSelectModel<typeof bannedEmails>;
export type BannedIp = InferSelectModel<typeof bannedIps>;
export type SiteProvisioningKey = InferSelectModel<typeof siteProvisioningKeys>;
export type SiteProvisioningKeyOrg = InferSelectModel<
typeof siteProvisioningKeyOrg
>;
export type EventStreamingDestination = InferSelectModel<
typeof eventStreamingDestinations
>;
export type EventStreamingCursor = InferSelectModel<
typeof eventStreamingCursors
>;

View File

@@ -5,9 +5,19 @@ import {
primaryKey, primaryKey,
real, real,
sqliteTable, sqliteTable,
text text,
uniqueIndex
} from "drizzle-orm/sqlite-core"; } from "drizzle-orm/sqlite-core";
import { clients, domains, exitNodes, orgs, sessions, siteResources, sites, users } from "./schema"; import {
clients,
domains,
exitNodes,
orgs,
sessions,
siteResources,
sites,
users
} from "./schema";
export const certificates = sqliteTable("certificates", { export const certificates = sqliteTable("certificates", {
certId: integer("certId").primaryKey({ autoIncrement: true }), certId: integer("certId").primaryKey({ autoIncrement: true }),
@@ -401,6 +411,50 @@ export const siteProvisioningKeyOrg = sqliteTable(
] ]
); );
export const eventStreamingDestinations = sqliteTable(
"eventStreamingDestinations",
{
destinationId: integer("destinationId").primaryKey({
autoIncrement: true
}),
orgId: text("orgId")
.notNull()
.references(() => orgs.orgId, { onDelete: "cascade" }),
sendConnectionLogs: integer("sendConnectionLogs", { mode: "boolean" }).notNull().default(false),
sendRequestLogs: integer("sendRequestLogs", { mode: "boolean" }).notNull().default(false),
sendActionLogs: integer("sendActionLogs", { mode: "boolean" }).notNull().default(false),
sendAccessLogs: integer("sendAccessLogs", { mode: "boolean" }).notNull().default(false),
type: text("type").notNull(), // e.g. "http", "kafka", etc.
config: text("config").notNull(), // JSON string with the configuration for the destination
enabled: integer("enabled", { mode: "boolean" })
.notNull()
.default(true),
createdAt: integer("createdAt").notNull(),
updatedAt: integer("updatedAt").notNull()
}
);
export const eventStreamingCursors = sqliteTable(
"eventStreamingCursors",
{
cursorId: integer("cursorId").primaryKey({ autoIncrement: true }),
destinationId: integer("destinationId")
.notNull()
.references(() => eventStreamingDestinations.destinationId, {
onDelete: "cascade"
}),
logType: text("logType").notNull(), // "request" | "action" | "access" | "connection"
lastSentId: integer("lastSentId").notNull().default(0),
lastSentAt: integer("lastSentAt") // epoch milliseconds, null if never sent
},
(table) => [
uniqueIndex("idx_eventStreamingCursors_dest_type").on(
table.destinationId,
table.logType
)
]
);
export type Approval = InferSelectModel<typeof approvals>; export type Approval = InferSelectModel<typeof approvals>;
export type Limit = InferSelectModel<typeof limits>; export type Limit = InferSelectModel<typeof limits>;
export type Account = InferSelectModel<typeof account>; export type Account = InferSelectModel<typeof account>;
@@ -423,3 +477,12 @@ export type LoginPageBranding = InferSelectModel<typeof loginPageBranding>;
export type ActionAuditLog = InferSelectModel<typeof actionAuditLog>; export type ActionAuditLog = InferSelectModel<typeof actionAuditLog>;
export type AccessAuditLog = InferSelectModel<typeof accessAuditLog>; export type AccessAuditLog = InferSelectModel<typeof accessAuditLog>;
export type ConnectionAuditLog = InferSelectModel<typeof connectionAuditLog>; export type ConnectionAuditLog = InferSelectModel<typeof connectionAuditLog>;
export type BannedEmail = InferSelectModel<typeof bannedEmails>;
export type BannedIp = InferSelectModel<typeof bannedIps>;
export type SiteProvisioningKey = InferSelectModel<typeof siteProvisioningKeys>;
export type EventStreamingDestination = InferSelectModel<
typeof eventStreamingDestinations
>;
export type EventStreamingCursor = InferSelectModel<
typeof eventStreamingCursors
>;

View File

@@ -18,7 +18,8 @@ export enum TierFeature {
AutoProvisioning = "autoProvisioning", // handle downgrade by disabling auto provisioning AutoProvisioning = "autoProvisioning", // handle downgrade by disabling auto provisioning
SshPam = "sshPam", SshPam = "sshPam",
FullRbac = "fullRbac", FullRbac = "fullRbac",
SiteProvisioningKeys = "siteProvisioningKeys" // handle downgrade by revoking keys if needed SiteProvisioningKeys = "siteProvisioningKeys", // handle downgrade by revoking keys if needed
SIEM = "siem" // handle downgrade by disabling SIEM integrations
} }
export const tierMatrix: Record<TierFeature, Tier[]> = { export const tierMatrix: Record<TierFeature, Tier[]> = {
@@ -54,5 +55,6 @@ export const tierMatrix: Record<TierFeature, Tier[]> = {
[TierFeature.AutoProvisioning]: ["tier1", "tier3", "enterprise"], [TierFeature.AutoProvisioning]: ["tier1", "tier3", "enterprise"],
[TierFeature.SshPam]: ["tier1", "tier3", "enterprise"], [TierFeature.SshPam]: ["tier1", "tier3", "enterprise"],
[TierFeature.FullRbac]: ["tier1", "tier2", "tier3", "enterprise"], [TierFeature.FullRbac]: ["tier1", "tier2", "tier3", "enterprise"],
[TierFeature.SiteProvisioningKeys]: ["enterprise"] [TierFeature.SiteProvisioningKeys]: ["tier3", "enterprise"],
[TierFeature.SIEM]: ["enterprise"]
}; };

View File

@@ -12,6 +12,7 @@
*/ */
import { rateLimitService } from "#private/lib/rateLimit"; import { rateLimitService } from "#private/lib/rateLimit";
import { logStreamingManager } from "#private/lib/logStreaming";
import { cleanup as wsCleanup } from "#private/routers/ws"; import { cleanup as wsCleanup } from "#private/routers/ws";
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage"; import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
import { flushConnectionLogToDb } from "#private/routers/newt"; import { flushConnectionLogToDb } from "#private/routers/newt";
@@ -25,6 +26,7 @@ async function cleanup() {
await flushSiteBandwidthToDb(); await flushSiteBandwidthToDb();
await rateLimitService.cleanup(); await rateLimitService.cleanup();
await wsCleanup(); await wsCleanup();
await logStreamingManager.shutdown();
process.exit(0); process.exit(0);
} }

View File

@@ -0,0 +1,234 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { logsDb, connectionAuditLog } from "@server/db";
import logger from "@server/logger";
import { and, eq, lt } from "drizzle-orm";
import { calculateCutoffTimestamp } from "@server/lib/cleanupLogs";
// ---------------------------------------------------------------------------
// Retry configuration for deadlock handling
// ---------------------------------------------------------------------------
const MAX_RETRIES = 3;
const BASE_DELAY_MS = 50;
// ---------------------------------------------------------------------------
// Buffer / flush configuration
// ---------------------------------------------------------------------------
/** How often to flush accumulated connection log data to the database. */
const FLUSH_INTERVAL_MS = 30_000; // 30 seconds
/** Maximum number of records to buffer before forcing a flush. */
const MAX_BUFFERED_RECORDS = 500;
/** Maximum number of records to insert in a single database batch. */
const INSERT_BATCH_SIZE = 100;
// ---------------------------------------------------------------------------
// Types
// ---------------------------------------------------------------------------
export interface ConnectionLogRecord {
sessionId: string;
siteResourceId: number;
orgId: string;
siteId: number;
clientId: number | null;
userId: string | null;
sourceAddr: string;
destAddr: string;
protocol: string;
startedAt: number; // epoch seconds
endedAt: number | null;
bytesTx: number | null;
bytesRx: number | null;
}
// ---------------------------------------------------------------------------
// In-memory buffer
// ---------------------------------------------------------------------------
let buffer: ConnectionLogRecord[] = [];
// ---------------------------------------------------------------------------
// Deadlock helpers
// ---------------------------------------------------------------------------
function isDeadlockError(error: any): boolean {
return (
error?.code === "40P01" ||
error?.cause?.code === "40P01" ||
(error?.message && error.message.includes("deadlock"))
);
}
async function withDeadlockRetry<T>(
operation: () => Promise<T>,
context: string
): Promise<T> {
let attempt = 0;
while (true) {
try {
return await operation();
} catch (error: any) {
if (isDeadlockError(error) && attempt < MAX_RETRIES) {
attempt++;
const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
const jitter = Math.random() * baseDelay;
const delay = baseDelay + jitter;
logger.warn(
`Deadlock detected in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue;
}
throw error;
}
}
}
// ---------------------------------------------------------------------------
// Flush
// ---------------------------------------------------------------------------
/**
* Flush all buffered connection log records to the database.
*
* Swaps out the buffer before writing so that any records added during the
* flush are captured in the new buffer rather than being lost. Entries that
* fail to write are re-queued back into the buffer so they will be retried
* on the next flush.
*
* This function is exported so that the application's graceful-shutdown
* cleanup handler can call it before the process exits.
*/
export async function flushConnectionLogToDb(): Promise<void> {
if (buffer.length === 0) {
return;
}
// Atomically swap out the buffer so new data keeps flowing in
const snapshot = buffer;
buffer = [];
logger.debug(
`Flushing ${snapshot.length} connection log record(s) to the database`
);
for (let i = 0; i < snapshot.length; i += INSERT_BATCH_SIZE) {
const batch = snapshot.slice(i, i + INSERT_BATCH_SIZE);
try {
await withDeadlockRetry(async () => {
await logsDb.insert(connectionAuditLog).values(batch);
}, `flush connection log batch (${batch.length} records)`);
} catch (error) {
logger.error(
`Failed to flush connection log batch of ${batch.length} records:`,
error
);
// Re-queue the failed batch so it is retried on the next flush
buffer = [...batch, ...buffer];
// Cap buffer to prevent unbounded growth if the DB is unreachable
const hardLimit = MAX_BUFFERED_RECORDS * 5;
if (buffer.length > hardLimit) {
const dropped = buffer.length - hardLimit;
buffer = buffer.slice(0, hardLimit);
logger.warn(
`Connection log buffer overflow, dropped ${dropped} oldest records`
);
}
// Stop processing further batches from this snapshot — they will
// be picked up via the re-queued records on the next flush.
const remaining = snapshot.slice(i + INSERT_BATCH_SIZE);
if (remaining.length > 0) {
buffer = [...remaining, ...buffer];
}
break;
}
}
}
// ---------------------------------------------------------------------------
// Periodic flush timer
// ---------------------------------------------------------------------------
const flushTimer = setInterval(async () => {
try {
await flushConnectionLogToDb();
} catch (error) {
logger.error(
"Unexpected error during periodic connection log flush:",
error
);
}
}, FLUSH_INTERVAL_MS);
// Calling unref() means this timer will not keep the Node.js event loop alive
// on its own — the process can still exit normally when there is no other work
// left. The graceful-shutdown path will call flushConnectionLogToDb() explicitly
// before process.exit(), so no data is lost.
flushTimer.unref();
// ---------------------------------------------------------------------------
// Cleanup
// ---------------------------------------------------------------------------
export async function cleanUpOldLogs(
orgId: string,
retentionDays: number
): Promise<void> {
const cutoffTimestamp = calculateCutoffTimestamp(retentionDays);
try {
await logsDb
.delete(connectionAuditLog)
.where(
and(
lt(connectionAuditLog.startedAt, cutoffTimestamp),
eq(connectionAuditLog.orgId, orgId)
)
);
} catch (error) {
logger.error("Error cleaning up old connection audit logs:", error);
}
}
// ---------------------------------------------------------------------------
// Public logging entry-point
// ---------------------------------------------------------------------------
/**
* Buffer a single connection log record for eventual persistence.
*
* Records are written to the database in batches either when the buffer
* reaches MAX_BUFFERED_RECORDS or when the periodic flush timer fires.
*/
export function logConnectionAudit(record: ConnectionLogRecord): void {
buffer.push(record);
if (buffer.length >= MAX_BUFFERED_RECORDS) {
// Fire and forget — errors are handled inside flushConnectionLogToDb
flushConnectionLogToDb().catch((error) => {
logger.error(
"Unexpected error during size-triggered connection log flush:",
error
);
});
}
}

View File

@@ -0,0 +1,773 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import {
db,
logsDb,
eventStreamingDestinations,
eventStreamingCursors,
requestAuditLog,
actionAuditLog,
accessAuditLog,
connectionAuditLog
} from "@server/db";
import logger from "@server/logger";
import { and, eq, gt, desc, max, sql } from "drizzle-orm";
import {
LogType,
LOG_TYPES,
LogEvent,
DestinationFailureState,
HttpConfig
} from "./types";
import { LogDestinationProvider } from "./providers/LogDestinationProvider";
import { HttpLogDestination } from "./providers/HttpLogDestination";
import type { EventStreamingDestination } from "@server/db";
// ---------------------------------------------------------------------------
// Configuration
// ---------------------------------------------------------------------------
/**
* How often (ms) the manager polls all destinations for new log records.
* Destinations that were behind (full batch returned) will be re-polled
* immediately without waiting for this interval.
*/
const POLL_INTERVAL_MS = 30_000;
/**
* Maximum number of log records fetched from the DB in a single query.
* This also controls the maximum size of one HTTP POST body.
*/
const BATCH_SIZE = 250;
/**
* Minimum delay (ms) between consecutive HTTP requests to the same destination
* during a catch-up run. Prevents bursting thousands of requests back-to-back
* when a destination has fallen behind.
*/
const INTER_BATCH_DELAY_MS = 100;
/**
* Maximum number of consecutive back-to-back batches to process for a single
* destination per poll cycle. After this limit the destination will wait for
* the next scheduled poll before continuing, giving other destinations a turn.
*/
const MAX_CATCHUP_BATCHES = 20;
/**
* Back-off schedule (ms) indexed by consecutive failure count.
* After the last entry the max value is re-used.
*/
const BACKOFF_SCHEDULE_MS = [
60_000, // 1 min (failure 1)
2 * 60_000, // 2 min (failure 2)
5 * 60_000, // 5 min (failure 3)
10 * 60_000, // 10 min (failure 4)
30 * 60_000 // 30 min (failure 5+)
];
/**
* If a destination has been continuously unreachable for this long, its
* cursors are advanced to the current max row id and the backlog is silently
* discarded. This prevents unbounded queue growth when a webhook endpoint is
* down for an extended period. A prominent warning is logged so operators are
* aware logs were dropped.
*
* Default: 24 hours.
*/
const MAX_BACKLOG_DURATION_MS = 24 * 60 * 60_000;
// ---------------------------------------------------------------------------
// LogStreamingManager
// ---------------------------------------------------------------------------
/**
* Orchestrates periodic polling of the four audit-log tables and forwards new
* records to every enabled event-streaming destination.
*
* ### Design
* - **Interval-based**: a timer fires every `POLL_INTERVAL_MS`. On each tick
* every enabled destination is processed in sequence.
* - **Cursor-based**: the last successfully forwarded row `id` is persisted in
* the `eventStreamingCursors` table so state survives restarts.
* - **Catch-up**: if a full batch is returned the destination is immediately
* re-queried (up to `MAX_CATCHUP_BATCHES` times) before yielding.
* - **Smoothing**: `INTER_BATCH_DELAY_MS` is inserted between consecutive
* catch-up batches to avoid hammering the remote endpoint.
* - **Back-off**: consecutive send failures trigger exponential back-off
* (tracked in-memory per destination). Successful sends reset the counter.
* - **Backlog abandonment**: if a destination remains unreachable for longer
* than `MAX_BACKLOG_DURATION_MS`, all cursors for that destination are
* advanced to the current max id so the backlog is discarded and streaming
* resumes from the present moment on recovery.
*/
export class LogStreamingManager {
private pollTimer: ReturnType<typeof setTimeout> | null = null;
private isRunning = false;
private isPolling = false;
/** In-memory back-off state keyed by destinationId. */
private readonly failures = new Map<number, DestinationFailureState>();
// -------------------------------------------------------------------------
// Lifecycle
// -------------------------------------------------------------------------
start(): void {
if (this.isRunning) return;
this.isRunning = true;
logger.info("LogStreamingManager: started");
this.schedulePoll(POLL_INTERVAL_MS);
}
// -------------------------------------------------------------------------
// Cursor initialisation (call this when a destination is first created)
// -------------------------------------------------------------------------
/**
* Eagerly seed cursors for every log type at the **current** max row id of
* each table, scoped to the destination's org.
*
* Call this immediately after inserting a new row into
* `eventStreamingDestinations` so the destination only receives events
* that were written *after* it was created. If a cursor row already exists
* (e.g. the method is called twice) it is left untouched.
*
* The manager also has a lazy fallback inside `getOrCreateCursor` for
* destinations that existed before this method was introduced.
*/
async initializeCursorsForDestination(
destinationId: number,
orgId: string
): Promise<void> {
for (const logType of LOG_TYPES) {
const currentMaxId = await this.getCurrentMaxId(logType, orgId);
try {
await db
.insert(eventStreamingCursors)
.values({
destinationId,
logType,
lastSentId: currentMaxId,
lastSentAt: null
})
.onConflictDoNothing();
} catch (err) {
logger.warn(
`LogStreamingManager: could not initialise cursor for ` +
`destination ${destinationId} logType="${logType}"`,
err
);
}
}
logger.debug(
`LogStreamingManager: cursors initialised for destination ${destinationId} ` +
`(org=${orgId})`
);
}
async shutdown(): Promise<void> {
this.isRunning = false;
if (this.pollTimer !== null) {
clearTimeout(this.pollTimer);
this.pollTimer = null;
}
// Wait for any in-progress poll to finish before returning so that
// callers (graceful-shutdown handlers) can safely exit afterward.
const deadline = Date.now() + 15_000;
while (this.isPolling && Date.now() < deadline) {
await sleep(100);
}
logger.info("LogStreamingManager: stopped");
}
// -------------------------------------------------------------------------
// Scheduling
// -------------------------------------------------------------------------
private schedulePoll(delayMs: number): void {
this.pollTimer = setTimeout(() => {
this.pollTimer = null;
this.runPoll()
.catch((err) =>
logger.error("LogStreamingManager: unexpected poll error", err)
)
.finally(() => {
if (this.isRunning) {
this.schedulePoll(POLL_INTERVAL_MS);
}
});
}, delayMs);
// Do not keep the event loop alive just for the poll timer the
// graceful-shutdown path calls shutdown() explicitly.
this.pollTimer.unref?.();
}
// -------------------------------------------------------------------------
// Poll cycle
// -------------------------------------------------------------------------
private async runPoll(): Promise<void> {
if (this.isPolling) return; // previous poll still running skip
this.isPolling = true;
try {
const destinations = await this.loadEnabledDestinations();
if (destinations.length === 0) return;
for (const dest of destinations) {
if (!this.isRunning) break;
await this.processDestination(dest).catch((err) => {
// Individual destination errors must never abort the whole cycle
logger.error(
`LogStreamingManager: unhandled error for destination ${dest.destinationId}`,
err
);
});
}
} finally {
this.isPolling = false;
}
}
// -------------------------------------------------------------------------
// Per-destination processing
// -------------------------------------------------------------------------
private async processDestination(
dest: EventStreamingDestination
): Promise<void> {
const failState = this.failures.get(dest.destinationId);
// Check whether this destination has been unreachable long enough that
// we should give up on the accumulated backlog.
if (failState) {
const failingForMs = Date.now() - failState.firstFailedAt;
if (failingForMs >= MAX_BACKLOG_DURATION_MS) {
await this.abandonBacklog(dest, failState);
this.failures.delete(dest.destinationId);
// Cursors now point to the current head retry on next poll.
return;
}
}
// Check regular exponential back-off window
if (failState && Date.now() < failState.nextRetryAt) {
logger.debug(
`LogStreamingManager: destination ${dest.destinationId} in back-off, skipping`
);
return;
}
// Parse config skip destination if config is unparseable
let config: HttpConfig;
try {
config = JSON.parse(dest.config) as HttpConfig;
} catch (err) {
logger.error(
`LogStreamingManager: destination ${dest.destinationId} has invalid JSON config`,
err
);
return;
}
const provider = this.createProvider(dest.type, config);
if (!provider) {
logger.warn(
`LogStreamingManager: unsupported destination type "${dest.type}" ` +
`for destination ${dest.destinationId} skipping`
);
return;
}
const enabledTypes: LogType[] = [];
if (dest.sendRequestLogs) enabledTypes.push("request");
if (dest.sendActionLogs) enabledTypes.push("action");
if (dest.sendAccessLogs) enabledTypes.push("access");
if (dest.sendConnectionLogs) enabledTypes.push("connection");
if (enabledTypes.length === 0) return;
let anyFailure = false;
for (const logType of enabledTypes) {
if (!this.isRunning) break;
try {
await this.processLogType(dest, provider, logType);
} catch (err) {
anyFailure = true;
logger.error(
`LogStreamingManager: failed to process "${logType}" logs ` +
`for destination ${dest.destinationId}`,
err
);
}
}
if (anyFailure) {
this.recordFailure(dest.destinationId);
} else {
// Any success resets the failure/back-off state
if (this.failures.has(dest.destinationId)) {
this.failures.delete(dest.destinationId);
logger.info(
`LogStreamingManager: destination ${dest.destinationId} recovered`
);
}
}
}
/**
* Advance every cursor for the destination to the current max row id,
* effectively discarding the accumulated backlog. Called when the
* destination has been unreachable for longer than MAX_BACKLOG_DURATION_MS.
*/
private async abandonBacklog(
dest: EventStreamingDestination,
failState: DestinationFailureState
): Promise<void> {
const failingForHours = (
(Date.now() - failState.firstFailedAt) /
3_600_000
).toFixed(1);
let totalDropped = 0;
for (const logType of LOG_TYPES) {
try {
const currentMaxId = await this.getCurrentMaxId(
logType,
dest.orgId
);
// Find out how many rows are being skipped for this type
const cursor = await db
.select({ lastSentId: eventStreamingCursors.lastSentId })
.from(eventStreamingCursors)
.where(
and(
eq(eventStreamingCursors.destinationId, dest.destinationId),
eq(eventStreamingCursors.logType, logType)
)
)
.limit(1);
const prevId = cursor[0]?.lastSentId ?? currentMaxId;
totalDropped += Math.max(0, currentMaxId - prevId);
await this.updateCursor(
dest.destinationId,
logType,
currentMaxId
);
} catch (err) {
logger.error(
`LogStreamingManager: failed to advance cursor for ` +
`destination ${dest.destinationId} logType="${logType}" ` +
`during backlog abandonment`,
err
);
}
}
logger.warn(
`LogStreamingManager: destination ${dest.destinationId} has been ` +
`unreachable for ${failingForHours}h ` +
`(${failState.consecutiveFailures} consecutive failures). ` +
`Discarding backlog of ~${totalDropped} log event(s) and ` +
`resuming from the current position. ` +
`Verify the destination URL and credentials.`
);
}
/**
* Forward all pending log records of a specific type for a destination.
*
* Fetches up to `BATCH_SIZE` records at a time. If the batch is full
* (indicating more records may exist) it loops immediately, inserting a
* short delay between consecutive requests to the remote endpoint.
* The loop is capped at `MAX_CATCHUP_BATCHES` to keep the poll cycle
* bounded.
*/
private async processLogType(
dest: EventStreamingDestination,
provider: LogDestinationProvider,
logType: LogType
): Promise<void> {
// Ensure a cursor row exists (creates one pointing at the current max
// id so we do not replay historical logs on first run)
const cursor = await this.getOrCreateCursor(
dest.destinationId,
logType,
dest.orgId
);
let lastSentId = cursor.lastSentId;
let batchCount = 0;
while (batchCount < MAX_CATCHUP_BATCHES) {
const rows = await this.fetchLogs(
logType,
dest.orgId,
lastSentId,
BATCH_SIZE
);
if (rows.length === 0) break;
const events = rows.map((row) =>
this.rowToLogEvent(logType, row)
);
// Throws on failure caught by the caller which applies back-off
await provider.send(events);
lastSentId = rows[rows.length - 1].id;
await this.updateCursor(dest.destinationId, logType, lastSentId);
batchCount++;
if (rows.length < BATCH_SIZE) {
// Partial batch means we have caught up
break;
}
// Full batch there are likely more records; pause briefly before
// fetching the next batch to smooth out the HTTP request rate
if (batchCount < MAX_CATCHUP_BATCHES) {
await sleep(INTER_BATCH_DELAY_MS);
}
}
}
// -------------------------------------------------------------------------
// Cursor management
// -------------------------------------------------------------------------
private async getOrCreateCursor(
destinationId: number,
logType: LogType,
orgId: string
): Promise<{ lastSentId: number }> {
// Try to read an existing cursor
const existing = await db
.select({
lastSentId: eventStreamingCursors.lastSentId
})
.from(eventStreamingCursors)
.where(
and(
eq(eventStreamingCursors.destinationId, destinationId),
eq(eventStreamingCursors.logType, logType)
)
)
.limit(1);
if (existing.length > 0) {
return { lastSentId: existing[0].lastSentId };
}
// No cursor yet this destination pre-dates the eager initialisation
// path (initializeCursorsForDestination). Seed at the current max id
// so we do not replay historical logs.
const initialId = await this.getCurrentMaxId(logType, orgId);
// Use onConflictDoNothing in case of a rare race between two poll
// cycles both hitting this branch simultaneously.
await db
.insert(eventStreamingCursors)
.values({
destinationId,
logType,
lastSentId: initialId,
lastSentAt: null
})
.onConflictDoNothing();
logger.debug(
`LogStreamingManager: lazily initialised cursor for destination ${destinationId} ` +
`logType="${logType}" at id=${initialId} ` +
`(prefer initializeCursorsForDestination at creation time)`
);
return { lastSentId: initialId };
}
private async updateCursor(
destinationId: number,
logType: LogType,
lastSentId: number
): Promise<void> {
await db
.update(eventStreamingCursors)
.set({
lastSentId,
lastSentAt: Date.now()
})
.where(
and(
eq(eventStreamingCursors.destinationId, destinationId),
eq(eventStreamingCursors.logType, logType)
)
);
}
/**
* Returns the current maximum `id` in the given log table for the org.
* Returns 0 when the table is empty.
*/
private async getCurrentMaxId(
logType: LogType,
orgId: string
): Promise<number> {
try {
switch (logType) {
case "request": {
const [row] = await logsDb
.select({ maxId: max(requestAuditLog.id) })
.from(requestAuditLog)
.where(eq(requestAuditLog.orgId, orgId));
return row?.maxId ?? 0;
}
case "action": {
const [row] = await logsDb
.select({ maxId: max(actionAuditLog.id) })
.from(actionAuditLog)
.where(eq(actionAuditLog.orgId, orgId));
return row?.maxId ?? 0;
}
case "access": {
const [row] = await logsDb
.select({ maxId: max(accessAuditLog.id) })
.from(accessAuditLog)
.where(eq(accessAuditLog.orgId, orgId));
return row?.maxId ?? 0;
}
case "connection": {
const [row] = await logsDb
.select({ maxId: max(connectionAuditLog.id) })
.from(connectionAuditLog)
.where(eq(connectionAuditLog.orgId, orgId));
return row?.maxId ?? 0;
}
}
} catch (err) {
logger.warn(
`LogStreamingManager: could not determine current max id for ` +
`logType="${logType}", defaulting to 0`,
err
);
return 0;
}
}
// -------------------------------------------------------------------------
// Log fetching
// -------------------------------------------------------------------------
/**
* Fetch up to `limit` log rows with `id > afterId`, ordered by id ASC,
* filtered to the given organisation.
*/
private async fetchLogs(
logType: LogType,
orgId: string,
afterId: number,
limit: number
): Promise<Array<Record<string, unknown> & { id: number }>> {
switch (logType) {
case "request":
return (await logsDb
.select()
.from(requestAuditLog)
.where(
and(
eq(requestAuditLog.orgId, orgId),
gt(requestAuditLog.id, afterId)
)
)
.orderBy(requestAuditLog.id)
.limit(limit)) as Array<
Record<string, unknown> & { id: number }
>;
case "action":
return (await logsDb
.select()
.from(actionAuditLog)
.where(
and(
eq(actionAuditLog.orgId, orgId),
gt(actionAuditLog.id, afterId)
)
)
.orderBy(actionAuditLog.id)
.limit(limit)) as Array<
Record<string, unknown> & { id: number }
>;
case "access":
return (await logsDb
.select()
.from(accessAuditLog)
.where(
and(
eq(accessAuditLog.orgId, orgId),
gt(accessAuditLog.id, afterId)
)
)
.orderBy(accessAuditLog.id)
.limit(limit)) as Array<
Record<string, unknown> & { id: number }
>;
case "connection":
return (await logsDb
.select()
.from(connectionAuditLog)
.where(
and(
eq(connectionAuditLog.orgId, orgId),
gt(connectionAuditLog.id, afterId)
)
)
.orderBy(connectionAuditLog.id)
.limit(limit)) as Array<
Record<string, unknown> & { id: number }
>;
}
}
// -------------------------------------------------------------------------
// Row → LogEvent conversion
// -------------------------------------------------------------------------
private rowToLogEvent(
logType: LogType,
row: Record<string, unknown> & { id: number }
): LogEvent {
// Determine the epoch-seconds timestamp for this row type
let timestamp: number;
switch (logType) {
case "request":
case "action":
case "access":
timestamp =
typeof row.timestamp === "number" ? row.timestamp : 0;
break;
case "connection":
timestamp =
typeof row.startedAt === "number" ? row.startedAt : 0;
break;
}
const orgId =
typeof row.orgId === "string" ? row.orgId : "";
return {
id: row.id,
logType,
orgId,
timestamp,
data: row as Record<string, unknown>
};
}
// -------------------------------------------------------------------------
// Provider factory
// -------------------------------------------------------------------------
/**
* Instantiate the correct LogDestinationProvider for the given destination
* type string. Returns `null` for unknown types.
*
* To add a new provider:
* 1. Implement `LogDestinationProvider` in a new file under `providers/`
* 2. Add a `case` here
*/
private createProvider(
type: string,
config: unknown
): LogDestinationProvider | null {
switch (type) {
case "http":
return new HttpLogDestination(config as HttpConfig);
// Future providers:
// case "datadog": return new DatadogLogDestination(config as DatadogConfig);
default:
return null;
}
}
// -------------------------------------------------------------------------
// Back-off tracking
// -------------------------------------------------------------------------
private recordFailure(destinationId: number): void {
const current = this.failures.get(destinationId) ?? {
consecutiveFailures: 0,
nextRetryAt: 0,
// Stamp the very first failure so we can measure total outage duration
firstFailedAt: Date.now()
};
current.consecutiveFailures += 1;
const scheduleIdx = Math.min(
current.consecutiveFailures - 1,
BACKOFF_SCHEDULE_MS.length - 1
);
const backoffMs = BACKOFF_SCHEDULE_MS[scheduleIdx];
current.nextRetryAt = Date.now() + backoffMs;
this.failures.set(destinationId, current);
logger.warn(
`LogStreamingManager: destination ${destinationId} failed ` +
`(consecutive #${current.consecutiveFailures}), ` +
`backing off for ${backoffMs / 1000}s`
);
}
// -------------------------------------------------------------------------
// DB helpers
// -------------------------------------------------------------------------
private async loadEnabledDestinations(): Promise<
EventStreamingDestination[]
> {
try {
return await db
.select()
.from(eventStreamingDestinations)
.where(eq(eventStreamingDestinations.enabled, true));
} catch (err) {
logger.error(
"LogStreamingManager: failed to load destinations",
err
);
return [];
}
}
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}

View File

@@ -0,0 +1,34 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { build } from "@server/build";
import { LogStreamingManager } from "./LogStreamingManager";
/**
* Module-level singleton. Importing this module is sufficient to start the
* streaming manager no explicit init call required by the caller.
*
* The manager registers a non-blocking timer (unref'd) so it will not keep
* the Node.js event loop alive on its own. Call `logStreamingManager.shutdown()`
* during graceful shutdown to drain any in-progress poll and release resources.
*/
export const logStreamingManager = new LogStreamingManager();
if (build != "saas") { // this is handled separately in the saas build, so we don't want to start it here
logStreamingManager.start();
}
export { LogStreamingManager } from "./LogStreamingManager";
export type { LogDestinationProvider } from "./providers/LogDestinationProvider";
export { HttpLogDestination } from "./providers/HttpLogDestination";
export * from "./types";

View File

@@ -0,0 +1,322 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import logger from "@server/logger";
import { LogEvent, HttpConfig, PayloadFormat } from "../types";
import { LogDestinationProvider } from "./LogDestinationProvider";
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
/** Maximum time (ms) to wait for a single HTTP response. */
const REQUEST_TIMEOUT_MS = 30_000;
/** Default payload format when none is specified in the config. */
const DEFAULT_FORMAT: PayloadFormat = "json_array";
// ---------------------------------------------------------------------------
// HttpLogDestination
// ---------------------------------------------------------------------------
/**
* Forwards a batch of log events to an arbitrary HTTP endpoint via a single
* POST request per batch.
*
* **Payload format**
*
* **Payload formats** (controlled by `config.format`):
*
* - `json_array` (default) — one POST per batch, body is a JSON array:
* ```json
* [
* { "event": "request", "timestamp": "2024-01-01T00:00:00.000Z", "data": { … } },
* …
* ]
* ```
* `Content-Type: application/json`
*
* - `ndjson` — one POST per batch, body is newline-delimited JSON (one object
* per line, no outer array). Required by Splunk HEC, Elastic/OpenSearch,
* and Grafana Loki:
* ```
* {"event":"request","timestamp":"…","data":{…}}
* {"event":"action","timestamp":"…","data":{…}}
* ```
* `Content-Type: application/x-ndjson`
*
* - `json_single` — one POST **per event**, body is a plain JSON object.
* Use only for endpoints that cannot handle batches at all.
*
* With a body template each event is rendered through the template before
* serialisation. Template placeholders:
* - `{{event}}` → the LogType string ("request", "action", etc.)
* - `{{timestamp}}` → ISO-8601 UTC datetime string
* - `{{data}}` → raw inline JSON object (**no surrounding quotes**)
*
* Example template:
* ```
* { "event": "{{event}}", "ts": "{{timestamp}}", "payload": {{data}} }
* ```
*/
export class HttpLogDestination implements LogDestinationProvider {
readonly type = "http";
private readonly config: HttpConfig;
constructor(config: HttpConfig) {
this.config = config;
}
// -----------------------------------------------------------------------
// LogDestinationProvider implementation
// -----------------------------------------------------------------------
async send(events: LogEvent[]): Promise<void> {
if (events.length === 0) return;
const format = this.config.format ?? DEFAULT_FORMAT;
if (format === "json_single") {
// One HTTP POST per event send sequentially so a failure on one
// event throws and lets the manager retry the whole batch from the
// same cursor position.
for (const event of events) {
await this.postRequest(
this.buildSingleBody(event),
"application/json"
);
}
return;
}
if (format === "ndjson") {
const body = this.buildNdjsonBody(events);
await this.postRequest(body, "application/x-ndjson");
return;
}
// json_array (default)
const body = JSON.stringify(this.buildArrayPayload(events));
await this.postRequest(body, "application/json");
}
// -----------------------------------------------------------------------
// Internal HTTP sender
// -----------------------------------------------------------------------
private async postRequest(
body: string,
contentType: string
): Promise<void> {
const headers = this.buildHeaders(contentType);
const controller = new AbortController();
const timeoutHandle = setTimeout(
() => controller.abort(),
REQUEST_TIMEOUT_MS
);
let response: Response;
try {
response = await fetch(this.config.url, {
method: "POST",
headers,
body,
signal: controller.signal
});
} catch (err: unknown) {
const isAbort =
err instanceof Error && err.name === "AbortError";
if (isAbort) {
throw new Error(
`HttpLogDestination: request to "${this.config.url}" timed out after ${REQUEST_TIMEOUT_MS} ms`
);
}
const msg = err instanceof Error ? err.message : String(err);
throw new Error(
`HttpLogDestination: request to "${this.config.url}" failed ${msg}`
);
} finally {
clearTimeout(timeoutHandle);
}
if (!response.ok) {
// Try to include a snippet of the response body in the error so
// operators can diagnose auth or schema rejections.
let responseSnippet = "";
try {
const text = await response.text();
responseSnippet = text.slice(0, 300);
} catch {
// ignore best effort
}
throw new Error(
`HttpLogDestination: server at "${this.config.url}" returned ` +
`HTTP ${response.status} ${response.statusText}` +
(responseSnippet ? ` ${responseSnippet}` : "")
);
}
}
// -----------------------------------------------------------------------
// Header construction
// -----------------------------------------------------------------------
private buildHeaders(contentType: string): Record<string, string> {
const headers: Record<string, string> = {
"Content-Type": contentType
};
// Authentication
switch (this.config.authType) {
case "bearer": {
const token = this.config.bearerToken?.trim();
if (token) {
headers["Authorization"] = `Bearer ${token}`;
}
break;
}
case "basic": {
const creds = this.config.basicCredentials?.trim();
if (creds) {
const encoded = Buffer.from(creds).toString("base64");
headers["Authorization"] = `Basic ${encoded}`;
}
break;
}
case "custom": {
const name = this.config.customHeaderName?.trim();
const value = this.config.customHeaderValue ?? "";
if (name) {
headers[name] = value;
}
break;
}
case "none":
default:
// No Authorization header
break;
}
// Additional static headers (user-defined; may override Content-Type
// if the operator explicitly sets it, which is intentional).
for (const { key, value } of this.config.headers ?? []) {
const trimmedKey = key?.trim();
if (trimmedKey) {
headers[trimmedKey] = value ?? "";
}
}
return headers;
}
// -----------------------------------------------------------------------
// Payload construction
// -----------------------------------------------------------------------
/** Single default event object (no surrounding array). */
private buildEventObject(event: LogEvent): unknown {
if (this.config.useBodyTemplate && this.config.bodyTemplate?.trim()) {
return this.renderTemplate(this.config.bodyTemplate!, event);
}
return {
event: event.logType,
timestamp: epochSecondsToIso(event.timestamp),
data: event.data
};
}
/** JSON array payload used for `json_array` format. */
private buildArrayPayload(events: LogEvent[]): unknown[] {
return events.map((e) => this.buildEventObject(e));
}
/**
* NDJSON payload one JSON object per line, no outer array.
* Each line must be a complete, valid JSON object.
*/
private buildNdjsonBody(events: LogEvent[]): string {
return events
.map((e) => JSON.stringify(this.buildEventObject(e)))
.join("\n");
}
/** Single-event body used for `json_single` format. */
private buildSingleBody(event: LogEvent): string {
return JSON.stringify(this.buildEventObject(event));
}
/**
* Render a single event through the body template.
*
* The three placeholder tokens are replaced in a specific order to avoid
* accidental double-replacement:
*
* 1. `{{data}}` → raw JSON (may contain `{{` characters in values)
* 2. `{{event}}` → safe string
* 3. `{{timestamp}}` → safe ISO string
*
* If the rendered string is not valid JSON we fall back to returning it as
* a plain string so the batch still makes it out and the operator can
* inspect the template.
*/
private renderTemplate(template: string, event: LogEvent): unknown {
const isoTimestamp = epochSecondsToIso(event.timestamp);
const dataJson = JSON.stringify(event.data);
// Replace {{data}} first because its JSON value might legitimately
// contain the substrings "{{event}}" or "{{timestamp}}" inside string
// fields those should NOT be re-expanded.
const rendered = template
.replace(/\{\{data\}\}/g, dataJson)
.replace(/\{\{event\}\}/g, escapeJsonString(event.logType))
.replace(
/\{\{timestamp\}\}/g,
escapeJsonString(isoTimestamp)
);
try {
return JSON.parse(rendered);
} catch {
logger.warn(
`HttpLogDestination: body template produced invalid JSON for ` +
`event type "${event.logType}" destined for "${this.config.url}". ` +
`Sending rendered template as a raw string. ` +
`Check your template syntax specifically that {{data}} is ` +
`NOT wrapped in quotes.`
);
return rendered;
}
}
}
// ---------------------------------------------------------------------------
// Helpers
// ---------------------------------------------------------------------------
function epochSecondsToIso(epochSeconds: number): string {
return new Date(epochSeconds * 1000).toISOString();
}
/**
* Escape a string value so it can be safely substituted into the interior of
* a JSON string literal (i.e. between existing `"` quotes in the template).
* This prevents a crafted logType or timestamp from breaking out of its
* string context in the rendered template.
*/
function escapeJsonString(value: string): string {
// JSON.stringify produces `"<escaped>"` strip the outer quotes.
return JSON.stringify(value).slice(1, -1);
}

View File

@@ -0,0 +1,44 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { LogEvent } from "../types";
/**
* Common interface that every log-forwarding backend must implement.
*
* Adding a new destination type (e.g. Datadog, Splunk, Kafka) is as simple as
* creating a class that satisfies this interface and registering it inside
* LogStreamingManager.createProvider().
*/
export interface LogDestinationProvider {
/**
* The string identifier that matches the `type` column in the
* `eventStreamingDestinations` table (e.g. "http", "datadog").
*/
readonly type: string;
/**
* Forward a batch of log events to the destination.
*
* Implementations should:
* - Treat the call as atomic: either all events are accepted or an error
* is thrown so the caller can retry / back off.
* - Respect the timeout contract expected by the manager (default 30 s).
* - NOT swallow errors the manager relies on thrown exceptions to track
* failure state and apply exponential back-off.
*
* @param events A non-empty array of normalised log events to forward.
* @throws Any network, authentication, or serialisation error.
*/
send(events: LogEvent[]): Promise<void>;
}

View File

@@ -0,0 +1,134 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
// ---------------------------------------------------------------------------
// Log type identifiers
// ---------------------------------------------------------------------------
export type LogType = "request" | "action" | "access" | "connection";
export const LOG_TYPES: LogType[] = [
"request",
"action",
"access",
"connection"
];
// ---------------------------------------------------------------------------
// A normalised event ready to be forwarded to a destination
// ---------------------------------------------------------------------------
export interface LogEvent {
/** The auto-increment primary key from the source table */
id: number;
/** Which log table this event came from */
logType: LogType;
/** The organisation that owns this event */
orgId: string;
/** Unix epoch seconds taken from the record's own timestamp field */
timestamp: number;
/** Full row data from the source table, serialised as a plain object */
data: Record<string, unknown>;
}
// ---------------------------------------------------------------------------
// A batch of events destined for a single streaming target
// ---------------------------------------------------------------------------
export interface LogBatch {
destinationId: number;
logType: LogType;
events: LogEvent[];
}
// ---------------------------------------------------------------------------
// HTTP destination configuration (mirrors HttpConfig in the UI component)
// ---------------------------------------------------------------------------
export type AuthType = "none" | "bearer" | "basic" | "custom";
/**
* Controls how the batch of events is serialised into the HTTP request body.
*
* - `json_array` `[{…}, {…}]` — default; one POST per batch wrapped in a
* JSON array. Works with most generic webhooks and Datadog.
* - `ndjson` `{…}\n{…}` — newline-delimited JSON, one object per
* line. Required by Splunk HEC, Elastic/OpenSearch, Loki.
* - `json_single` one HTTP POST per event, body is a plain JSON object.
* Use only for endpoints that cannot handle batches at all.
*/
export type PayloadFormat = "json_array" | "ndjson" | "json_single";
export interface HttpConfig {
/** Human-readable label for the destination */
name: string;
/** Target URL that will receive POST requests */
url: string;
/** Authentication strategy to use */
authType: AuthType;
/** Used when authType === "bearer" */
bearerToken?: string;
/** Used when authType === "basic" must be "username:password" */
basicCredentials?: string;
/** Used when authType === "custom" header name */
customHeaderName?: string;
/** Used when authType === "custom" header value */
customHeaderValue?: string;
/** Additional static headers appended to every request */
headers: Array<{ key: string; value: string }>;
/** Whether to render a custom body template instead of the default shape */
/**
* How events are serialised into the request body.
* Defaults to `"json_array"` when absent.
*/
format?: PayloadFormat;
useBodyTemplate: boolean;
/**
* Handlebars-style template for the JSON body of each event.
*
* Supported placeholders:
* {{event}} the LogType string ("request", "action", etc.)
* {{timestamp}} ISO-8601 UTC string derived from the event's timestamp
* {{data}} raw JSON object (no surrounding quotes) of the full row
*
* Example:
* { "event": "{{event}}", "ts": "{{timestamp}}", "payload": {{data}} }
*/
bodyTemplate?: string;
}
// ---------------------------------------------------------------------------
// Per-destination per-log-type cursor (reflects the DB table)
// ---------------------------------------------------------------------------
export interface StreamingCursor {
destinationId: number;
logType: LogType;
/** The `id` of the last row that was successfully forwarded */
lastSentId: number;
/** Epoch milliseconds of the last successful send (or null if never sent) */
lastSentAt: number | null;
}
// ---------------------------------------------------------------------------
// In-memory failure / back-off state tracked per destination
// ---------------------------------------------------------------------------
export interface DestinationFailureState {
/** How many consecutive send failures have occurred */
consecutiveFailures: number;
/** Date.now() value after which the destination may be retried */
nextRetryAt: number;
/** Date.now() value of the very first failure in the current streak */
firstFailedAt: number;
}

View File

@@ -0,0 +1,138 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { Request, Response, NextFunction } from "express";
import { z } from "zod";
import { db } from "@server/db";
import { eventStreamingDestinations } from "@server/db";
import { logStreamingManager } from "#private/lib/logStreaming";
import response from "@server/lib/response";
import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors";
import logger from "@server/logger";
import { fromError } from "zod-validation-error";
import { OpenAPITags, registry } from "@server/openApi";
const paramsSchema = z.strictObject({
orgId: z.string().nonempty()
});
const bodySchema = z.strictObject({
type: z.string().nonempty(),
config: z.string().nonempty(),
enabled: z.boolean().optional().default(true),
sendConnectionLogs: z.boolean().optional().default(false),
sendRequestLogs: z.boolean().optional().default(false),
sendActionLogs: z.boolean().optional().default(false),
sendAccessLogs: z.boolean().optional().default(false)
});
export type CreateEventStreamingDestinationResponse = {
destinationId: number;
};
registry.registerPath({
method: "put",
path: "/org/{orgId}/event-streaming-destination",
description: "Create an event streaming destination for a specific organization.",
tags: [OpenAPITags.Org],
request: {
params: paramsSchema,
body: {
content: {
"application/json": {
schema: bodySchema
}
}
}
},
responses: {}
});
export async function createEventStreamingDestination(
req: Request,
res: Response,
next: NextFunction
): Promise<any> {
try {
const parsedParams = paramsSchema.safeParse(req.params);
if (!parsedParams.success) {
return next(
createHttpError(
HttpCode.BAD_REQUEST,
fromError(parsedParams.error).toString()
)
);
}
const { orgId } = parsedParams.data;
const parsedBody = bodySchema.safeParse(req.body);
if (!parsedBody.success) {
return next(
createHttpError(
HttpCode.BAD_REQUEST,
fromError(parsedBody.error).toString()
)
);
}
const { type, config, enabled } = parsedBody.data;
const now = Date.now();
const [destination] = await db
.insert(eventStreamingDestinations)
.values({
orgId,
type,
config,
enabled,
createdAt: now,
updatedAt: now,
sendAccessLogs: parsedBody.data.sendAccessLogs,
sendActionLogs: parsedBody.data.sendActionLogs,
sendConnectionLogs: parsedBody.data.sendConnectionLogs,
sendRequestLogs: parsedBody.data.sendRequestLogs
})
.returning();
// Seed cursors at the current max row id for every log type so this
// destination only receives events written *after* it was created.
// Fire-and-forget: a failure here is non-fatal; the manager has a lazy
// fallback that will seed at the next poll if these rows are missing.
logStreamingManager
.initializeCursorsForDestination(destination.destinationId, orgId)
.catch((err) =>
logger.error(
"createEventStreamingDestination: failed to initialise streaming cursors",
err
)
);
return response<CreateEventStreamingDestinationResponse>(res, {
data: {
destinationId: destination.destinationId
},
success: true,
error: false,
message: "Event streaming destination created successfully",
status: HttpCode.CREATED
});
} catch (error) {
logger.error(error);
return next(
createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred")
);
}
}

View File

@@ -0,0 +1,103 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { Request, Response, NextFunction } from "express";
import { z } from "zod";
import { db } from "@server/db";
import { eventStreamingDestinations } from "@server/db";
import response from "@server/lib/response";
import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors";
import logger from "@server/logger";
import { fromError } from "zod-validation-error";
import { OpenAPITags, registry } from "@server/openApi";
import { and, eq } from "drizzle-orm";
const paramsSchema = z
.object({
orgId: z.string().nonempty(),
destinationId: z.coerce.number<number>()
})
.strict();
registry.registerPath({
method: "delete",
path: "/org/{orgId}/event-streaming-destination/{destinationId}",
description: "Delete an event streaming destination for a specific organization.",
tags: [OpenAPITags.Org],
request: {
params: paramsSchema
},
responses: {}
});
export async function deleteEventStreamingDestination(
req: Request,
res: Response,
next: NextFunction
): Promise<any> {
try {
const parsedParams = paramsSchema.safeParse(req.params);
if (!parsedParams.success) {
return next(
createHttpError(
HttpCode.BAD_REQUEST,
fromError(parsedParams.error).toString()
)
);
}
const { orgId, destinationId } = parsedParams.data;
const [existing] = await db
.select()
.from(eventStreamingDestinations)
.where(
and(
eq(eventStreamingDestinations.destinationId, destinationId),
eq(eventStreamingDestinations.orgId, orgId)
)
);
if (!existing) {
return next(
createHttpError(
HttpCode.NOT_FOUND,
"Event streaming destination not found"
)
);
}
await db
.delete(eventStreamingDestinations)
.where(
and(
eq(eventStreamingDestinations.destinationId, destinationId),
eq(eventStreamingDestinations.orgId, orgId)
)
);
return response<null>(res, {
data: null,
success: true,
error: false,
message: "Event streaming destination deleted successfully",
status: HttpCode.OK
});
} catch (error) {
logger.error(error);
return next(
createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred")
);
}
}

View File

@@ -0,0 +1,17 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
export * from "./createEventStreamingDestination";
export * from "./updateEventStreamingDestination";
export * from "./deleteEventStreamingDestination";
export * from "./listEventStreamingDestinations";

View File

@@ -0,0 +1,144 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { Request, Response, NextFunction } from "express";
import { z } from "zod";
import { db } from "@server/db";
import { eventStreamingDestinations } from "@server/db";
import response from "@server/lib/response";
import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors";
import logger from "@server/logger";
import { fromError } from "zod-validation-error";
import { OpenAPITags, registry } from "@server/openApi";
import { eq, sql } from "drizzle-orm";
const paramsSchema = z.strictObject({
orgId: z.string().nonempty()
});
const querySchema = z.strictObject({
limit: z
.string()
.optional()
.default("1000")
.transform(Number)
.pipe(z.int().nonnegative()),
offset: z
.string()
.optional()
.default("0")
.transform(Number)
.pipe(z.int().nonnegative())
});
export type ListEventStreamingDestinationsResponse = {
destinations: {
destinationId: number;
orgId: string;
type: string;
config: string;
enabled: boolean;
createdAt: number;
updatedAt: number;
sendConnectionLogs: boolean;
sendRequestLogs: boolean;
sendActionLogs: boolean;
sendAccessLogs: boolean;
}[];
pagination: {
total: number;
limit: number;
offset: number;
};
};
async function query(orgId: string, limit: number, offset: number) {
const res = await db
.select()
.from(eventStreamingDestinations)
.where(eq(eventStreamingDestinations.orgId, orgId))
.orderBy(sql`${eventStreamingDestinations.createdAt} DESC`)
.limit(limit)
.offset(offset);
return res;
}
registry.registerPath({
method: "get",
path: "/org/{orgId}/event-streaming-destination",
description: "List all event streaming destinations for a specific organization.",
tags: [OpenAPITags.Org],
request: {
query: querySchema,
params: paramsSchema
},
responses: {}
});
export async function listEventStreamingDestinations(
req: Request,
res: Response,
next: NextFunction
): Promise<any> {
try {
const parsedParams = paramsSchema.safeParse(req.params);
if (!parsedParams.success) {
return next(
createHttpError(
HttpCode.BAD_REQUEST,
fromError(parsedParams.error).toString()
)
);
}
const { orgId } = parsedParams.data;
const parsedQuery = querySchema.safeParse(req.query);
if (!parsedQuery.success) {
return next(
createHttpError(
HttpCode.BAD_REQUEST,
fromError(parsedQuery.error).toString()
)
);
}
const { limit, offset } = parsedQuery.data;
const list = await query(orgId, limit, offset);
const [{ count }] = await db
.select({ count: sql<number>`count(*)` })
.from(eventStreamingDestinations)
.where(eq(eventStreamingDestinations.orgId, orgId));
return response<ListEventStreamingDestinationsResponse>(res, {
data: {
destinations: list,
pagination: {
total: count,
limit,
offset
}
},
success: true,
error: false,
message: "Event streaming destinations retrieved successfully",
status: HttpCode.OK
});
} catch (error) {
logger.error(error);
return next(
createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred")
);
}
}

View File

@@ -0,0 +1,153 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { Request, Response, NextFunction } from "express";
import { z } from "zod";
import { db } from "@server/db";
import { eventStreamingDestinations } from "@server/db";
import response from "@server/lib/response";
import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors";
import logger from "@server/logger";
import { fromError } from "zod-validation-error";
import { OpenAPITags, registry } from "@server/openApi";
import { and, eq } from "drizzle-orm";
const paramsSchema = z
.object({
orgId: z.string().nonempty(),
destinationId: z.coerce.number<number>()
})
.strict();
const bodySchema = z.strictObject({
type: z.string().optional(),
config: z.string().optional(),
enabled: z.boolean().optional(),
sendConnectionLogs: z.boolean().optional(),
sendRequestLogs: z.boolean().optional(),
sendActionLogs: z.boolean().optional(),
sendAccessLogs: z.boolean().optional()
});
export type UpdateEventStreamingDestinationResponse = {
destinationId: number;
};
registry.registerPath({
method: "post",
path: "/org/{orgId}/event-streaming-destination/{destinationId}",
description: "Update an event streaming destination for a specific organization.",
tags: [OpenAPITags.Org],
request: {
params: paramsSchema,
body: {
content: {
"application/json": {
schema: bodySchema
}
}
}
},
responses: {}
});
export async function updateEventStreamingDestination(
req: Request,
res: Response,
next: NextFunction
): Promise<any> {
try {
const parsedParams = paramsSchema.safeParse(req.params);
if (!parsedParams.success) {
return next(
createHttpError(
HttpCode.BAD_REQUEST,
fromError(parsedParams.error).toString()
)
);
}
const { orgId, destinationId } = parsedParams.data;
const parsedBody = bodySchema.safeParse(req.body);
if (!parsedBody.success) {
return next(
createHttpError(
HttpCode.BAD_REQUEST,
fromError(parsedBody.error).toString()
)
);
}
const [existing] = await db
.select()
.from(eventStreamingDestinations)
.where(
and(
eq(eventStreamingDestinations.destinationId, destinationId),
eq(eventStreamingDestinations.orgId, orgId)
)
);
if (!existing) {
return next(
createHttpError(
HttpCode.NOT_FOUND,
"Event streaming destination not found"
)
);
}
const { type, config, enabled, sendAccessLogs, sendActionLogs, sendConnectionLogs, sendRequestLogs } = parsedBody.data;
const updateData: Record<string, unknown> = {
updatedAt: Date.now()
};
if (type !== undefined) updateData.type = type;
if (config !== undefined) updateData.config = config;
if (enabled !== undefined) updateData.enabled = enabled;
if (sendAccessLogs !== undefined) updateData.sendAccessLogs = sendAccessLogs;
if (sendActionLogs !== undefined) updateData.sendActionLogs = sendActionLogs;
if (sendConnectionLogs !== undefined) updateData.sendConnectionLogs = sendConnectionLogs;
if (sendRequestLogs !== undefined) updateData.sendRequestLogs = sendRequestLogs;
await db
.update(eventStreamingDestinations)
.set(updateData)
.where(
and(
eq(eventStreamingDestinations.destinationId, destinationId),
eq(eventStreamingDestinations.orgId, orgId)
)
);
return response<UpdateEventStreamingDestinationResponse>(res, {
data: {
destinationId
},
success: true,
error: false,
message: "Event streaming destination updated successfully",
status: HttpCode.OK
});
} catch (error) {
logger.error(error);
return next(
createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred")
);
}
}

View File

@@ -28,6 +28,7 @@ import * as approval from "#private/routers/approvals";
import * as ssh from "#private/routers/ssh"; import * as ssh from "#private/routers/ssh";
import * as user from "#private/routers/user"; import * as user from "#private/routers/user";
import * as siteProvisioning from "#private/routers/siteProvisioning"; import * as siteProvisioning from "#private/routers/siteProvisioning";
import * as eventStreamingDestination from "#private/routers/eventStreamingDestination";
import { import {
verifyOrgAccess, verifyOrgAccess,
@@ -615,3 +616,39 @@ authenticated.patch(
logActionAudit(ActionsEnum.updateSiteProvisioningKey), logActionAudit(ActionsEnum.updateSiteProvisioningKey),
siteProvisioning.updateSiteProvisioningKey siteProvisioning.updateSiteProvisioningKey
); );
authenticated.put(
"/org/:orgId/event-streaming-destination",
verifyValidLicense,
verifyOrgAccess,
verifyLimits,
verifyUserHasAction(ActionsEnum.createEventStreamingDestination),
logActionAudit(ActionsEnum.createEventStreamingDestination),
eventStreamingDestination.createEventStreamingDestination
);
authenticated.post(
"/org/:orgId/event-streaming-destination/:destinationId",
verifyValidLicense,
verifyOrgAccess,
verifyLimits,
verifyUserHasAction(ActionsEnum.updateEventStreamingDestination),
logActionAudit(ActionsEnum.updateEventStreamingDestination),
eventStreamingDestination.updateEventStreamingDestination
);
authenticated.delete(
"/org/:orgId/event-streaming-destination/:destinationId",
verifyValidLicense,
verifyOrgAccess,
verifyUserHasAction(ActionsEnum.deleteEventStreamingDestination),
logActionAudit(ActionsEnum.deleteEventStreamingDestination),
eventStreamingDestination.deleteEventStreamingDestination
);
authenticated.get(
"/org/:orgId/event-streaming-destinations",
verifyOrgAccess,
verifyUserHasAction(ActionsEnum.listEventStreamingDestinations),
eventStreamingDestination.listEventStreamingDestinations
);

View File

@@ -1,27 +1,33 @@
import { db, logsDb } from "@server/db"; /*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { db } from "@server/db";
import { MessageHandler } from "@server/routers/ws"; import { MessageHandler } from "@server/routers/ws";
import { connectionAuditLog, sites, Newt, clients, orgs } from "@server/db"; import { sites, Newt, clients, orgs } from "@server/db";
import { and, eq, lt, inArray } from "drizzle-orm"; import { and, eq, inArray } from "drizzle-orm";
import logger from "@server/logger"; import logger from "@server/logger";
import { inflate } from "zlib"; import { inflate } from "zlib";
import { promisify } from "util"; import { promisify } from "util";
import { calculateCutoffTimestamp } from "@server/lib/cleanupLogs"; import {
logConnectionAudit,
flushConnectionLogToDb,
cleanUpOldLogs
} from "#private/lib/logConnectionAudit";
export { flushConnectionLogToDb, cleanUpOldLogs };
const zlibInflate = promisify(inflate); const zlibInflate = promisify(inflate);
// Retry configuration for deadlock handling
const MAX_RETRIES = 3;
const BASE_DELAY_MS = 50;
// How often to flush accumulated connection log data to the database
const FLUSH_INTERVAL_MS = 30_000; // 30 seconds
// Maximum number of records to buffer before forcing a flush
const MAX_BUFFERED_RECORDS = 500;
// Maximum number of records to insert in a single batch
const INSERT_BATCH_SIZE = 100;
interface ConnectionSessionData { interface ConnectionSessionData {
sessionId: string; sessionId: string;
resourceId: number; resourceId: number;
@@ -34,64 +40,6 @@ interface ConnectionSessionData {
bytesRx?: number; bytesRx?: number;
} }
interface ConnectionLogRecord {
sessionId: string;
siteResourceId: number;
orgId: string;
siteId: number;
clientId: number | null;
userId: string | null;
sourceAddr: string;
destAddr: string;
protocol: string;
startedAt: number; // epoch seconds
endedAt: number | null;
bytesTx: number | null;
bytesRx: number | null;
}
// In-memory buffer of records waiting to be flushed
let buffer: ConnectionLogRecord[] = [];
/**
* Check if an error is a deadlock error
*/
function isDeadlockError(error: any): boolean {
return (
error?.code === "40P01" ||
error?.cause?.code === "40P01" ||
(error?.message && error.message.includes("deadlock"))
);
}
/**
* Execute a function with retry logic for deadlock handling
*/
async function withDeadlockRetry<T>(
operation: () => Promise<T>,
context: string
): Promise<T> {
let attempt = 0;
while (true) {
try {
return await operation();
} catch (error: any) {
if (isDeadlockError(error) && attempt < MAX_RETRIES) {
attempt++;
const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
const jitter = Math.random() * baseDelay;
const delay = baseDelay + jitter;
logger.warn(
`Deadlock detected in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`
);
await new Promise((resolve) => setTimeout(resolve, delay));
continue;
}
throw error;
}
}
}
/** /**
* Decompress a base64-encoded zlib-compressed string into parsed JSON. * Decompress a base64-encoded zlib-compressed string into parsed JSON.
*/ */
@@ -125,105 +73,6 @@ function toEpochSeconds(isoString: string | undefined | null): number | null {
return Math.floor(ms / 1000); return Math.floor(ms / 1000);
} }
/**
* Flush all buffered connection log records to the database.
*
* Swaps out the buffer before writing so that any records added during the
* flush are captured in the new buffer rather than being lost. Entries that
* fail to write are re-queued back into the buffer so they will be retried
* on the next flush.
*
* This function is exported so that the application's graceful-shutdown
* cleanup handler can call it before the process exits.
*/
export async function flushConnectionLogToDb(): Promise<void> {
if (buffer.length === 0) {
return;
}
// Atomically swap out the buffer so new data keeps flowing in
const snapshot = buffer;
buffer = [];
logger.debug(
`Flushing ${snapshot.length} connection log record(s) to the database`
);
// Insert in batches to avoid overly large SQL statements
for (let i = 0; i < snapshot.length; i += INSERT_BATCH_SIZE) {
const batch = snapshot.slice(i, i + INSERT_BATCH_SIZE);
try {
await withDeadlockRetry(async () => {
await logsDb.insert(connectionAuditLog).values(batch);
}, `flush connection log batch (${batch.length} records)`);
} catch (error) {
logger.error(
`Failed to flush connection log batch of ${batch.length} records:`,
error
);
// Re-queue the failed batch so it is retried on the next flush
buffer = [...batch, ...buffer];
// Cap buffer to prevent unbounded growth if DB is unreachable
if (buffer.length > MAX_BUFFERED_RECORDS * 5) {
const dropped = buffer.length - MAX_BUFFERED_RECORDS * 5;
buffer = buffer.slice(0, MAX_BUFFERED_RECORDS * 5);
logger.warn(
`Connection log buffer overflow, dropped ${dropped} oldest records`
);
}
// Stop trying further batches from this snapshot — they'll be
// picked up by the next flush via the re-queued records above
const remaining = snapshot.slice(i + INSERT_BATCH_SIZE);
if (remaining.length > 0) {
buffer = [...remaining, ...buffer];
}
break;
}
}
}
const flushTimer = setInterval(async () => {
try {
await flushConnectionLogToDb();
} catch (error) {
logger.error(
"Unexpected error during periodic connection log flush:",
error
);
}
}, FLUSH_INTERVAL_MS);
// Calling unref() means this timer will not keep the Node.js event loop alive
// on its own — the process can still exit normally when there is no other work
// left. The graceful-shutdown path will call flushConnectionLogToDb() explicitly
// before process.exit(), so no data is lost.
flushTimer.unref();
export async function cleanUpOldLogs(orgId: string, retentionDays: number) {
const cutoffTimestamp = calculateCutoffTimestamp(retentionDays);
try {
await logsDb
.delete(connectionAuditLog)
.where(
and(
lt(connectionAuditLog.startedAt, cutoffTimestamp),
eq(connectionAuditLog.orgId, orgId)
)
);
// logger.debug(
// `Cleaned up connection audit logs older than ${retentionDays} days`
// );
} catch (error) {
logger.error("Error cleaning up old connection audit logs:", error);
}
}
export const handleConnectionLogMessage: MessageHandler = async (context) => { export const handleConnectionLogMessage: MessageHandler = async (context) => {
const { message, client } = context; const { message, client } = context;
const newt = client as Newt; const newt = client as Newt;
@@ -277,13 +126,16 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => {
return; return;
} }
logger.debug(`Sessions: ${JSON.stringify(sessions)}`) logger.debug(`Sessions: ${JSON.stringify(sessions)}`);
// Build a map from sourceAddr → { clientId, userId } by querying clients // Build a map from sourceAddr → { clientId, userId } by querying clients
// whose subnet field matches exactly. Client subnets are stored with the // whose subnet field matches exactly. Client subnets are stored with the
// org's CIDR suffix (e.g. "100.90.128.5/16"), so we reconstruct that from // org's CIDR suffix (e.g. "100.90.128.5/16"), so we reconstruct that from
// each unique sourceAddr + the org's CIDR suffix and do a targeted IN query. // each unique sourceAddr + the org's CIDR suffix and do a targeted IN query.
const ipToClient = new Map<string, { clientId: number; userId: string | null }>(); const ipToClient = new Map<
string,
{ clientId: number; userId: string | null }
>();
if (cidrSuffix) { if (cidrSuffix) {
// Collect unique source addresses so we only query for what we need // Collect unique source addresses so we only query for what we need
@@ -296,13 +148,11 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => {
if (uniqueSourceAddrs.size > 0) { if (uniqueSourceAddrs.size > 0) {
// Construct the exact subnet strings as stored in the DB // Construct the exact subnet strings as stored in the DB
const subnetQueries = Array.from(uniqueSourceAddrs).map( const subnetQueries = Array.from(uniqueSourceAddrs).map((addr) => {
(addr) => {
// Strip port if present (e.g. "100.90.128.1:38004" → "100.90.128.1") // Strip port if present (e.g. "100.90.128.1:38004" → "100.90.128.1")
const ip = addr.includes(":") ? addr.split(":")[0] : addr; const ip = addr.includes(":") ? addr.split(":")[0] : addr;
return `${ip}${cidrSuffix}`; return `${ip}${cidrSuffix}`;
} });
);
logger.debug(`Subnet queries: ${JSON.stringify(subnetQueries)}`); logger.debug(`Subnet queries: ${JSON.stringify(subnetQueries)}`);
@@ -322,13 +172,18 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => {
for (const c of matchedClients) { for (const c of matchedClients) {
const ip = c.subnet.split("/")[0]; const ip = c.subnet.split("/")[0];
logger.debug(`Client ${c.clientId} subnet ${c.subnet} matches ${ip}`); logger.debug(
ipToClient.set(ip, { clientId: c.clientId, userId: c.userId }); `Client ${c.clientId} subnet ${c.subnet} matches ${ip}`
);
ipToClient.set(ip, {
clientId: c.clientId,
userId: c.userId
});
} }
} }
} }
// Convert to DB records and add to the buffer // Convert to DB records and hand off to the audit logger
for (const session of sessions) { for (const session of sessions) {
// Validate required fields // Validate required fields
if ( if (
@@ -356,11 +211,12 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => {
// client's IP on the WireGuard network, which corresponds to the IP // client's IP on the WireGuard network, which corresponds to the IP
// portion of the client's subnet CIDR (e.g. "100.90.128.5/24"). // portion of the client's subnet CIDR (e.g. "100.90.128.5/24").
// Strip port if present (e.g. "100.90.128.1:38004" → "100.90.128.1") // Strip port if present (e.g. "100.90.128.1:38004" → "100.90.128.1")
const sourceIp = session.sourceAddr.includes(":") ? session.sourceAddr.split(":")[0] : session.sourceAddr; const sourceIp = session.sourceAddr.includes(":")
? session.sourceAddr.split(":")[0]
: session.sourceAddr;
const clientInfo = ipToClient.get(sourceIp) ?? null; const clientInfo = ipToClient.get(sourceIp) ?? null;
logConnectionAudit({
buffer.push({
sessionId: session.sessionId, sessionId: session.sessionId,
siteResourceId: session.resourceId, siteResourceId: session.resourceId,
orgId, orgId,
@@ -380,15 +236,4 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => {
logger.debug( logger.debug(
`Buffered ${sessions.length} connection log session(s) from newt ${newt.newtId} (site ${newt.siteId})` `Buffered ${sessions.length} connection log session(s) from newt ${newt.newtId} (site ${newt.siteId})`
); );
// If the buffer has grown large enough, trigger an immediate flush
if (buffer.length >= MAX_BUFFERED_RECORDS) {
// Fire and forget — errors are handled inside flushConnectionLogToDb
flushConnectionLogToDb().catch((error) => {
logger.error(
"Unexpected error during size-triggered connection log flush:",
error
);
});
}
}; };

View File

@@ -1 +1,14 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
export * from "./handleConnectionLogMessage"; export * from "./handleConnectionLogMessage";

View File

@@ -3,13 +3,12 @@ import { authCookieHeader } from "@app/lib/api/cookies";
import { AxiosResponse } from "axios"; import { AxiosResponse } from "axios";
import InvitationsTable, { import InvitationsTable, {
InvitationRow InvitationRow
} from "../../../../../components/InvitationsTable"; } from "@app/components/InvitationsTable";
import { GetOrgResponse } from "@server/routers/org"; import { GetOrgResponse } from "@server/routers/org";
import { cache } from "react"; import { cache } from "react";
import OrgProvider from "@app/providers/OrgProvider"; import OrgProvider from "@app/providers/OrgProvider";
import UserProvider from "@app/providers/UserProvider"; import UserProvider from "@app/providers/UserProvider";
import { verifySession } from "@app/lib/auth/verifySession"; import { verifySession } from "@app/lib/auth/verifySession";
import AccessPageHeaderAndNav from "../../../../../components/AccessPageHeaderAndNav";
import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; import SettingsSectionTitle from "@app/components/SettingsSectionTitle";
import { getTranslations } from "next-intl/server"; import { getTranslations } from "next-intl/server";

View File

@@ -3,13 +3,12 @@ import { authCookieHeader } from "@app/lib/api/cookies";
import { getUserDisplayName } from "@app/lib/getUserDisplayName"; import { getUserDisplayName } from "@app/lib/getUserDisplayName";
import { ListUsersResponse } from "@server/routers/user"; import { ListUsersResponse } from "@server/routers/user";
import { AxiosResponse } from "axios"; import { AxiosResponse } from "axios";
import UsersTable, { UserRow } from "../../../../../components/UsersTable"; import UsersTable, { UserRow } from "@app/components/UsersTable";
import { GetOrgResponse } from "@server/routers/org"; import { GetOrgResponse } from "@server/routers/org";
import { cache } from "react"; import { cache } from "react";
import OrgProvider from "@app/providers/OrgProvider"; import OrgProvider from "@app/providers/OrgProvider";
import UserProvider from "@app/providers/UserProvider"; import UserProvider from "@app/providers/UserProvider";
import { verifySession } from "@app/lib/auth/verifySession"; import { verifySession } from "@app/lib/auth/verifySession";
import AccessPageHeaderAndNav from "../../../../../components/AccessPageHeaderAndNav";
import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; import SettingsSectionTitle from "@app/components/SettingsSectionTitle";
import { getTranslations } from "next-intl/server"; import { getTranslations } from "next-intl/server";

View File

@@ -4,7 +4,7 @@ import { AxiosResponse } from "axios";
import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; import SettingsSectionTitle from "@app/components/SettingsSectionTitle";
import OrgApiKeysTable, { import OrgApiKeysTable, {
OrgApiKeyRow OrgApiKeyRow
} from "../../../../components/OrgApiKeysTable"; } from "@app/components/OrgApiKeysTable";
import { ListOrgApiKeysResponse } from "@server/routers/apiKeys"; import { ListOrgApiKeysResponse } from "@server/routers/apiKeys";
import { getTranslations } from "next-intl/server"; import { getTranslations } from "next-intl/server";

View File

@@ -2,7 +2,7 @@ import { internal } from "@app/lib/api";
import { authCookieHeader } from "@app/lib/api/cookies"; import { authCookieHeader } from "@app/lib/api/cookies";
import { AxiosResponse } from "axios"; import { AxiosResponse } from "axios";
import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; import SettingsSectionTitle from "@app/components/SettingsSectionTitle";
import DomainsTable, { DomainRow } from "../../../../components/DomainsTable"; import DomainsTable, { DomainRow } from "@app/components/DomainsTable";
import { getTranslations } from "next-intl/server"; import { getTranslations } from "next-intl/server";
import { cache } from "react"; import { cache } from "react";
import { GetOrgResponse } from "@server/routers/org"; import { GetOrgResponse } from "@server/routers/org";

View File

@@ -0,0 +1,478 @@
"use client";
import { useState, useEffect, useCallback } from "react";
import { useParams } from "next/navigation";
import { createApiClient, formatAxiosError } from "@app/lib/api";
import { useEnvContext } from "@app/hooks/useEnvContext";
import { toast } from "@app/hooks/useToast";
import { usePaidStatus } from "@app/hooks/usePaidStatus";
import { PaidFeaturesAlert } from "@app/components/PaidFeaturesAlert";
import ConfirmDeleteDialog from "@app/components/ConfirmDeleteDialog";
import { tierMatrix, TierFeature } from "@server/lib/billing/tierMatrix";
import SettingsSectionTitle from "@app/components/SettingsSectionTitle";
import {
Credenza,
CredenzaBody,
CredenzaClose,
CredenzaContent,
CredenzaDescription,
CredenzaFooter,
CredenzaHeader,
CredenzaTitle
} from "@app/components/Credenza";
import { Button } from "@app/components/ui/button";
import { Switch } from "@app/components/ui/switch";
import { Globe, MoreHorizontal, Plus } from "lucide-react";
import { AxiosResponse } from "axios";
import { build } from "@server/build";
import Image from "next/image";
import { StrategySelect, StrategyOption } from "@app/components/StrategySelect";
import {
DropdownMenu,
DropdownMenuContent,
DropdownMenuItem,
DropdownMenuTrigger
} from "@app/components/ui/dropdown-menu";
import {
Destination,
HttpDestinationCredenza,
parseHttpConfig
} from "@app/components/HttpDestinationCredenza";
// ── Re-export Destination so the rest of the file can use it ──────────────────
interface ListDestinationsResponse {
destinations: Destination[];
pagination: {
total: number;
limit: number;
offset: number;
};
}
// ── Destination card ───────────────────────────────────────────────────────────
interface DestinationCardProps {
destination: Destination;
onToggle: (id: number, enabled: boolean) => void;
onEdit: (destination: Destination) => void;
onDelete: (destination: Destination) => void;
isToggling: boolean;
disabled?: boolean;
}
function DestinationCard({
destination,
onToggle,
onEdit,
onDelete,
isToggling,
disabled = false
}: DestinationCardProps) {
const cfg = parseHttpConfig(destination.config);
return (
<div className="relative flex flex-col rounded-lg border bg-card text-card-foreground p-5 gap-3">
{/* Top row: icon + name/type + toggle */}
<div className="flex items-start justify-between gap-3">
<div className="flex items-center gap-3 min-w-0">
{/* Squirkle icon: gray outer → white inner → black globe */}
<div className="shrink-0 flex items-center justify-center w-10 h-10 rounded-2xl bg-muted">
<div className="flex items-center justify-center w-6 h-6 rounded-xl bg-white shadow-sm">
<Globe className="h-3.5 w-3.5 text-black" />
</div>
</div>
<div className="min-w-0">
<p className="font-semibold text-sm leading-tight truncate">
{cfg.name || "Unnamed destination"}
</p>
<p className="text-xs text-muted-foreground truncate mt-0.5">
HTTP
</p>
</div>
</div>
<Switch
checked={destination.enabled}
onCheckedChange={(v) =>
onToggle(destination.destinationId, v)
}
disabled={isToggling || disabled}
className="shrink-0 mt-0.5"
/>
</div>
{/* URL preview */}
<p className="text-xs text-muted-foreground truncate">
{cfg.url || (
<span className="italic">No URL configured</span>
)}
</p>
{/* Footer: edit button + three-dots menu */}
<div className="mt-auto pt-5 flex gap-2">
<Button
variant="outline"
onClick={() => onEdit(destination)}
disabled={disabled}
className="flex-1"
>
Edit
</Button>
<DropdownMenu>
<DropdownMenuTrigger asChild>
<Button
variant="outline"
size="icon"
className="h-9 w-9 shrink-0"
disabled={disabled}
>
<MoreHorizontal className="h-4 w-4" />
</Button>
</DropdownMenuTrigger>
<DropdownMenuContent align="end">
<DropdownMenuItem
className="text-destructive focus:text-destructive"
onClick={() => onDelete(destination)}
>
Delete
</DropdownMenuItem>
</DropdownMenuContent>
</DropdownMenu>
</div>
</div>
);
}
// ── Add destination card ───────────────────────────────────────────────────────
function AddDestinationCard({ onClick }: { onClick: () => void }) {
return (
<button
type="button"
onClick={onClick}
className="flex flex-col items-center justify-center rounded-lg border-2 border-dashed border-border bg-transparent transition-colors p-5 min-h-35 w-full text-muted-foreground hover:border-primary hover:text-primary hover:bg-primary/5 cursor-pointer"
>
<div className="flex flex-col items-center gap-2">
<div className="flex items-center justify-center w-9 h-9 rounded-md border-2 border-dashed border-current">
<Plus className="h-4 w-4" />
</div>
<span className="text-sm font-medium">Add Destination</span>
</div>
</button>
);
}
// ── Destination type picker ────────────────────────────────────────────────────
type DestinationType = "http" | "s3" | "datadog";
const destinationTypeOptions: ReadonlyArray<StrategyOption<DestinationType>> = [
{
id: "http",
title: "HTTP Webhook",
description:
"Send events to any HTTP endpoint with flexible authentication and templating.",
icon: <Globe className="h-6 w-6" />
},
{
id: "s3",
title: "Amazon S3",
description:
"Stream events to an S3-compatible object storage bucket. Coming soon.",
disabled: true,
icon: (
<Image
src="/third-party/s3.png"
alt="Amazon S3"
width={24}
height={24}
className="rounded-sm"
/>
)
},
{
id: "datadog",
title: "Datadog",
description:
"Forward events directly to your Datadog account. Coming soon.",
disabled: true,
icon: (
<Image
src="/third-party/dd.png"
alt="Datadog"
width={24}
height={24}
className="rounded-sm"
/>
)
}
];
interface DestinationTypePickerProps {
open: boolean;
onOpenChange: (open: boolean) => void;
onSelect: (type: DestinationType) => void;
isPaywalled?: boolean;
}
function DestinationTypePicker({
open,
onOpenChange,
onSelect,
isPaywalled = false
}: DestinationTypePickerProps) {
const [selected, setSelected] = useState<DestinationType>("http");
useEffect(() => {
if (open) setSelected("http");
}, [open]);
return (
<Credenza open={open} onOpenChange={onOpenChange}>
<CredenzaContent className="sm:max-w-lg">
<CredenzaHeader>
<CredenzaTitle>Add Destination</CredenzaTitle>
<CredenzaDescription>
Choose a destination type to get started.
</CredenzaDescription>
</CredenzaHeader>
<CredenzaBody>
<div className={isPaywalled ? "pointer-events-none opacity-50" : ""}>
<StrategySelect
options={destinationTypeOptions}
value={selected}
onChange={setSelected}
cols={1}
/>
</div>
</CredenzaBody>
<CredenzaFooter>
<CredenzaClose asChild>
<Button variant="outline">Cancel</Button>
</CredenzaClose>
<Button
onClick={() => onSelect(selected)}
disabled={isPaywalled}
>
Continue
</Button>
</CredenzaFooter>
</CredenzaContent>
</Credenza>
);
}
// ── Main page ──────────────────────────────────────────────────────────────────
export default function StreamingDestinationsPage() {
const { orgId } = useParams() as { orgId: string };
const api = createApiClient(useEnvContext());
const { isPaidUser } = usePaidStatus();
const isEnterprise = isPaidUser(tierMatrix[TierFeature.SIEM]);
const [destinations, setDestinations] = useState<Destination[]>([]);
const [loading, setLoading] = useState(true);
const [modalOpen, setModalOpen] = useState(false);
const [typePickerOpen, setTypePickerOpen] = useState(false);
const [editingDestination, setEditingDestination] =
useState<Destination | null>(null);
const [togglingIds, setTogglingIds] = useState<Set<number>>(new Set());
// Delete state
const [deleteTarget, setDeleteTarget] = useState<Destination | null>(null);
const [deleteDialogOpen, setDeleteDialogOpen] = useState(false);
const [deleting, setDeleting] = useState(false);
const loadDestinations = useCallback(async () => {
if (build == "oss") {
setDestinations([]);
setLoading(false);
return;
}
try {
const res = await api.get<AxiosResponse<ListDestinationsResponse>>(
`/org/${orgId}/event-streaming-destinations`
);
setDestinations(res.data.data.destinations ?? []);
} catch (e) {
toast({
variant: "destructive",
title: "Failed to load destinations",
description: formatAxiosError(
e,
"An unexpected error occurred."
)
});
} finally {
setLoading(false);
}
}, [orgId]);
useEffect(() => {
loadDestinations();
}, [loadDestinations]);
const handleToggle = async (destinationId: number, enabled: boolean) => {
// Optimistic update
setDestinations((prev) =>
prev.map((d) =>
d.destinationId === destinationId ? { ...d, enabled } : d
)
);
setTogglingIds((prev) => new Set(prev).add(destinationId));
try {
await api.post(
`/org/${orgId}/event-streaming-destination/${destinationId}`,
{ enabled }
);
} catch (e) {
// Revert on failure
setDestinations((prev) =>
prev.map((d) =>
d.destinationId === destinationId
? { ...d, enabled: !enabled }
: d
)
);
toast({
variant: "destructive",
title: "Failed to update destination",
description: formatAxiosError(
e,
"An unexpected error occurred."
)
});
} finally {
setTogglingIds((prev) => {
const next = new Set(prev);
next.delete(destinationId);
return next;
});
}
};
const handleDeleteCard = (destination: Destination) => {
setDeleteTarget(destination);
setDeleteDialogOpen(true);
};
const handleDeleteConfirm = async () => {
if (!deleteTarget) return;
setDeleting(true);
try {
await api.delete(
`/org/${orgId}/event-streaming-destination/${deleteTarget.destinationId}`
);
toast({ title: "Destination deleted successfully" });
setDeleteDialogOpen(false);
setDeleteTarget(null);
loadDestinations();
} catch (e) {
toast({
variant: "destructive",
title: "Failed to delete destination",
description: formatAxiosError(
e,
"An unexpected error occurred."
)
});
} finally {
setDeleting(false);
}
};
const openCreate = () => {
setTypePickerOpen(true);
};
const handleTypePicked = (_type: DestinationType) => {
setTypePickerOpen(false);
setEditingDestination(null);
setModalOpen(true);
};
const openEdit = (destination: Destination) => {
setEditingDestination(destination);
setModalOpen(true);
};
return (
<>
<SettingsSectionTitle
title="Event Streaming"
description="Stream events from your organization to external destinations in real time."
/>
<PaidFeaturesAlert tiers={tierMatrix[TierFeature.SIEM]} />
{loading ? (
<div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 xl:grid-cols-4 gap-4">
{Array.from({ length: 4 }).map((_, i) => (
<div
key={i}
className="rounded-lg border bg-card p-5 min-h-36 animate-pulse"
/>
))}
</div>
) : (
<div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-3 xl:grid-cols-4 gap-4">
{destinations.map((dest) => (
<DestinationCard
key={dest.destinationId}
destination={dest}
onToggle={handleToggle}
onEdit={openEdit}
onDelete={handleDeleteCard}
isToggling={togglingIds.has(dest.destinationId)}
disabled={!isEnterprise}
/>
))}
{/* Add card is always clickable — paywall is enforced inside the picker */}
<AddDestinationCard onClick={openCreate} />
</div>
)}
<DestinationTypePicker
open={typePickerOpen}
onOpenChange={setTypePickerOpen}
onSelect={handleTypePicked}
isPaywalled={!isEnterprise}
/>
<HttpDestinationCredenza
open={modalOpen}
onOpenChange={setModalOpen}
editing={editingDestination}
orgId={orgId}
onSaved={loadDestinations}
/>
{deleteTarget && (
<ConfirmDeleteDialog
open={deleteDialogOpen}
setOpen={(v) => {
setDeleteDialogOpen(v);
if (!v) setDeleteTarget(null);
}}
string={
parseHttpConfig(deleteTarget.config).name || "delete"
}
title="Delete Destination"
dialog={
<p className="text-sm text-muted-foreground">
Are you sure you want to delete{" "}
<span className="font-semibold text-foreground">
{parseHttpConfig(deleteTarget.config).name ||
"this destination"}
</span>
? All configuration will be permanently removed.
</p>
}
buttonText="Delete Destination"
onConfirm={handleDeleteConfirm}
/>
)}
</>
);
}

View File

@@ -4,7 +4,7 @@ import { AxiosResponse } from "axios";
import { PaidFeaturesAlert } from "@app/components/PaidFeaturesAlert"; import { PaidFeaturesAlert } from "@app/components/PaidFeaturesAlert";
import SiteProvisioningKeysTable, { import SiteProvisioningKeysTable, {
SiteProvisioningKeyRow SiteProvisioningKeyRow
} from "../../../../../components/SiteProvisioningKeysTable"; } from "@app/components/SiteProvisioningKeysTable";
import { ListSiteProvisioningKeysResponse } from "@server/routers/siteProvisioning/types"; import { ListSiteProvisioningKeysResponse } from "@server/routers/siteProvisioning/types";
import { getTranslations } from "next-intl/server"; import { getTranslations } from "next-intl/server";
import { TierFeature, tierMatrix } from "@server/lib/billing/tierMatrix"; import { TierFeature, tierMatrix } from "@server/lib/billing/tierMatrix";

View File

@@ -9,7 +9,7 @@ import OrgProvider from "@app/providers/OrgProvider";
import { ListAccessTokensResponse } from "@server/routers/accessToken"; import { ListAccessTokensResponse } from "@server/routers/accessToken";
import ShareLinksTable, { import ShareLinksTable, {
ShareLinkRow ShareLinkRow
} from "../../../../components/ShareLinksTable"; } from "@app/components/ShareLinksTable";
import { getTranslations } from "next-intl/server"; import { getTranslations } from "next-intl/server";
type ShareLinksPageProps = { type ShareLinksPageProps = {

View File

@@ -6,9 +6,9 @@ import { redirect } from "next/navigation";
import { authCookieHeader } from "@app/lib/api/cookies"; import { authCookieHeader } from "@app/lib/api/cookies";
import { HorizontalTabs } from "@app/components/HorizontalTabs"; import { HorizontalTabs } from "@app/components/HorizontalTabs";
import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; import SettingsSectionTitle from "@app/components/SettingsSectionTitle";
import SiteInfoCard from "../../../../../components/SiteInfoCard"; import SiteInfoCard from "@app/components/SiteInfoCard";
import { getTranslations } from "next-intl/server"; import { getTranslations } from "next-intl/server";
import { build } from "@server/build";
interface SettingsLayoutProps { interface SettingsLayoutProps {
children: React.ReactNode; children: React.ReactNode;

View File

@@ -3,7 +3,7 @@ import { authCookieHeader } from "@app/lib/api/cookies";
import { AxiosResponse } from "axios"; import { AxiosResponse } from "axios";
import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; import SettingsSectionTitle from "@app/components/SettingsSectionTitle";
import { ListRootApiKeysResponse } from "@server/routers/apiKeys"; import { ListRootApiKeysResponse } from "@server/routers/apiKeys";
import ApiKeysTable, { ApiKeyRow } from "../../../components/ApiKeysTable"; import ApiKeysTable, { ApiKeyRow } from "@app/components/ApiKeysTable";
import { getTranslations } from "next-intl/server"; import { getTranslations } from "next-intl/server";
type ApiKeyPageProps = {}; type ApiKeyPageProps = {};

View File

@@ -31,7 +31,7 @@ import { zodResolver } from "@hookform/resolvers/zod";
import { z } from "zod"; import { z } from "zod";
import { Alert, AlertDescription, AlertTitle } from "@app/components/ui/alert"; import { Alert, AlertDescription, AlertTitle } from "@app/components/ui/alert";
import { InfoIcon, ExternalLink, CheckIcon } from "lucide-react"; import { InfoIcon, ExternalLink, CheckIcon } from "lucide-react";
import PolicyTable, { PolicyRow } from "../../../../../components/PolicyTable"; import PolicyTable, { PolicyRow } from "@app/components/PolicyTable";
import { AxiosResponse } from "axios"; import { AxiosResponse } from "axios";
import { ListOrgsResponse } from "@server/routers/org"; import { ListOrgsResponse } from "@server/routers/org";
import { ListRolesResponse } from "@server/routers/role"; import { ListRolesResponse } from "@server/routers/role";

View File

@@ -2,7 +2,7 @@ import { internal } from "@app/lib/api";
import { authCookieHeader } from "@app/lib/api/cookies"; import { authCookieHeader } from "@app/lib/api/cookies";
import { AxiosResponse } from "axios"; import { AxiosResponse } from "axios";
import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; import SettingsSectionTitle from "@app/components/SettingsSectionTitle";
import IdpTable, { IdpRow } from "../../../components/AdminIdpTable"; import IdpTable, { IdpRow } from "@app/components/AdminIdpTable";
import { getTranslations } from "next-intl/server"; import { getTranslations } from "next-intl/server";
export default async function IdpPage() { export default async function IdpPage() {

View File

@@ -6,7 +6,7 @@ import { createApiClient } from "@app/lib/api";
import { useEnvContext } from "@app/hooks/useEnvContext"; import { useEnvContext } from "@app/hooks/useEnvContext";
import { toast } from "@app/hooks/useToast"; import { toast } from "@app/hooks/useToast";
import { formatAxiosError } from "@app/lib/api"; import { formatAxiosError } from "@app/lib/api";
import { LicenseKeysDataTable } from "../../../components/LicenseKeysDataTable"; import { LicenseKeysDataTable } from "@app/components/LicenseKeysDataTable";
import { AxiosResponse } from "axios"; import { AxiosResponse } from "axios";
import { Button } from "@app/components/ui/button"; import { Button } from "@app/components/ui/button";
import { import {
@@ -45,7 +45,7 @@ import SettingsSectionTitle from "@app/components/SettingsSectionTitle";
import { Check, Heart, InfoIcon } from "lucide-react"; import { Check, Heart, InfoIcon } from "lucide-react";
import CopyTextBox from "@app/components/CopyTextBox"; import CopyTextBox from "@app/components/CopyTextBox";
import ConfirmDeleteDialog from "@app/components/ConfirmDeleteDialog"; import ConfirmDeleteDialog from "@app/components/ConfirmDeleteDialog";
import { SitePriceCalculator } from "../../../components/SitePriceCalculator"; import { SitePriceCalculator } from "@app/components/SitePriceCalculator";
import { Checkbox } from "@app/components/ui/checkbox"; import { Checkbox } from "@app/components/ui/checkbox";
import { Alert, AlertDescription, AlertTitle } from "@app/components/ui/alert"; import { Alert, AlertDescription, AlertTitle } from "@app/components/ui/alert";
import { useSupporterStatusContext } from "@app/hooks/useSupporterStatusContext"; import { useSupporterStatusContext } from "@app/hooks/useSupporterStatusContext";

View File

@@ -3,7 +3,7 @@ import { authCookieHeader } from "@app/lib/api/cookies";
import { AxiosResponse } from "axios"; import { AxiosResponse } from "axios";
import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; import SettingsSectionTitle from "@app/components/SettingsSectionTitle";
import { AdminListUsersResponse } from "@server/routers/user/adminListUsers"; import { AdminListUsersResponse } from "@server/routers/user/adminListUsers";
import UsersTable, { GlobalUserRow } from "../../../components/AdminUsersTable"; import UsersTable, { GlobalUserRow } from "@app/components/AdminUsersTable";
import { Alert, AlertDescription, AlertTitle } from "@app/components/ui/alert"; import { Alert, AlertDescription, AlertTitle } from "@app/components/ui/alert";
import { InfoIcon } from "lucide-react"; import { InfoIcon } from "lucide-react";
import { getTranslations } from "next-intl/server"; import { getTranslations } from "next-intl/server";

View File

@@ -23,6 +23,7 @@ import {
Settings, Settings,
SquareMousePointer, SquareMousePointer,
TicketCheck, TicketCheck,
Unplug,
User, User,
UserCog, UserCog,
Users, Users,
@@ -196,6 +197,11 @@ export const orgNavSections = (
title: "sidebarLogsConnection", title: "sidebarLogsConnection",
href: "/{orgId}/settings/logs/connection", href: "/{orgId}/settings/logs/connection",
icon: <Cable className="size-4 flex-none" /> icon: <Cable className="size-4 flex-none" />
},
{
title: "sidebarLogsStreaming",
href: "/{orgId}/settings/logs/streaming",
icon: <Unplug className="size-4 flex-none" />
} }
] ]
: []) : [])

View File

@@ -0,0 +1,836 @@
"use client";
import { useState, useEffect } from "react";
import {
Credenza,
CredenzaBody,
CredenzaClose,
CredenzaContent,
CredenzaDescription,
CredenzaFooter,
CredenzaHeader,
CredenzaTitle
} from "@app/components/Credenza";
import { Button } from "@app/components/ui/button";
import { Input } from "@app/components/ui/input";
import { Label } from "@app/components/ui/label";
import { Switch } from "@app/components/ui/switch";
import { HorizontalTabs } from "@app/components/HorizontalTabs";
import { RadioGroup, RadioGroupItem } from "@app/components/ui/radio-group";
import { Textarea } from "@app/components/ui/textarea";
import { Checkbox } from "@app/components/ui/checkbox";
import { Plus, X } from "lucide-react";
import { createApiClient, formatAxiosError } from "@app/lib/api";
import { useEnvContext } from "@app/hooks/useEnvContext";
import { toast } from "@app/hooks/useToast";
import { build } from "@server/build";
// ── Types ──────────────────────────────────────────────────────────────────────
export type AuthType = "none" | "bearer" | "basic" | "custom";
export type PayloadFormat = "json_array" | "ndjson" | "json_single";
export interface HttpConfig {
name: string;
url: string;
authType: AuthType;
bearerToken?: string;
basicCredentials?: string;
customHeaderName?: string;
customHeaderValue?: string;
headers: Array<{ key: string; value: string }>;
format: PayloadFormat;
useBodyTemplate: boolean;
bodyTemplate?: string;
}
export interface Destination {
destinationId: number;
orgId: string;
type: string;
config: string;
enabled: boolean;
sendAccessLogs: boolean;
sendActionLogs: boolean;
sendConnectionLogs: boolean;
sendRequestLogs: boolean;
createdAt: number;
updatedAt: number;
}
// ── Helpers ────────────────────────────────────────────────────────────────────
export const defaultHttpConfig = (): HttpConfig => ({
name: "",
url: "",
authType: "none",
bearerToken: "",
basicCredentials: "",
customHeaderName: "",
customHeaderValue: "",
headers: [],
format: "json_array",
useBodyTemplate: false,
bodyTemplate: ""
});
export function parseHttpConfig(raw: string): HttpConfig {
try {
return { ...defaultHttpConfig(), ...JSON.parse(raw) };
} catch {
return defaultHttpConfig();
}
}
// ── Headers editor ─────────────────────────────────────────────────────────────
interface HeadersEditorProps {
headers: Array<{ key: string; value: string }>;
onChange: (headers: Array<{ key: string; value: string }>) => void;
}
function HeadersEditor({ headers, onChange }: HeadersEditorProps) {
const addRow = () => onChange([...headers, { key: "", value: "" }]);
const removeRow = (i: number) =>
onChange(headers.filter((_, idx) => idx !== i));
const updateRow = (i: number, field: "key" | "value", val: string) => {
const next = [...headers];
next[i] = { ...next[i], [field]: val };
onChange(next);
};
return (
<div className="space-y-3">
{headers.length === 0 && (
<p className="text-xs text-muted-foreground">
No custom headers configured. Click "Add Header" to add
one.
</p>
)}
{headers.map((h, i) => (
<div key={i} className="flex gap-2 items-center">
<Input
value={h.key}
onChange={(e) => updateRow(i, "key", e.target.value)}
placeholder="Header name"
className="flex-1"
/>
<Input
value={h.value}
onChange={(e) =>
updateRow(i, "value", e.target.value)
}
placeholder="Value"
className="flex-1"
/>
<Button
type="button"
variant="ghost"
size="icon"
onClick={() => removeRow(i)}
className="shrink-0 h-9 w-9"
>
<X className="h-4 w-4" />
</Button>
</div>
))}
<Button
type="button"
variant="outline"
size="sm"
onClick={addRow}
className="gap-1.5"
>
<Plus className="h-3.5 w-3.5" />
Add Header
</Button>
</div>
);
}
// ── Component ──────────────────────────────────────────────────────────────────
export interface HttpDestinationCredenzaProps {
open: boolean;
onOpenChange: (open: boolean) => void;
editing: Destination | null;
orgId: string;
onSaved: () => void;
}
export function HttpDestinationCredenza({
open,
onOpenChange,
editing,
orgId,
onSaved
}: HttpDestinationCredenzaProps) {
const api = createApiClient(useEnvContext());
const [saving, setSaving] = useState(false);
const [cfg, setCfg] = useState<HttpConfig>(defaultHttpConfig());
const [sendAccessLogs, setSendAccessLogs] = useState(false);
const [sendActionLogs, setSendActionLogs] = useState(false);
const [sendConnectionLogs, setSendConnectionLogs] = useState(false);
const [sendRequestLogs, setSendRequestLogs] = useState(false);
useEffect(() => {
if (open) {
setCfg(
editing ? parseHttpConfig(editing.config) : defaultHttpConfig()
);
setSendAccessLogs(editing?.sendAccessLogs ?? false);
setSendActionLogs(editing?.sendActionLogs ?? false);
setSendConnectionLogs(editing?.sendConnectionLogs ?? false);
setSendRequestLogs(editing?.sendRequestLogs ?? false);
}
}, [open, editing]);
const update = (patch: Partial<HttpConfig>) =>
setCfg((prev) => ({ ...prev, ...patch }));
const urlError: string | null = (() => {
const raw = cfg.url.trim();
if (!raw) return null;
try {
const parsed = new URL(raw);
if (
parsed.protocol !== "http:" &&
parsed.protocol !== "https:"
) {
return "URL must use http or https";
}
if (build === "saas" && parsed.protocol !== "https:") {
return "HTTPS is required on cloud deployments";
}
return null;
} catch {
return "Enter a valid URL (e.g. https://example.com/webhook)";
}
})();
const isValid =
cfg.name.trim() !== "" &&
cfg.url.trim() !== "" &&
urlError === null;
async function handleSave() {
if (!isValid) return;
setSaving(true);
try {
const payload = {
type: "http",
config: JSON.stringify(cfg),
sendAccessLogs,
sendActionLogs,
sendConnectionLogs,
sendRequestLogs
};
if (editing) {
await api.post(
`/org/${orgId}/event-streaming-destination/${editing.destinationId}`,
payload
);
toast({ title: "Destination updated successfully" });
} else {
await api.put(
`/org/${orgId}/event-streaming-destination`,
payload
);
toast({ title: "Destination created successfully" });
}
onSaved();
onOpenChange(false);
} catch (e) {
toast({
variant: "destructive",
title: editing
? "Failed to update destination"
: "Failed to create destination",
description: formatAxiosError(
e,
"An unexpected error occurred."
)
});
} finally {
setSaving(false);
}
}
return (
<Credenza open={open} onOpenChange={onOpenChange}>
<CredenzaContent className="sm:max-w-2xl">
<CredenzaHeader>
<CredenzaTitle>
{editing
? "Edit Destination"
: "Add HTTP Destination"}
</CredenzaTitle>
<CredenzaDescription>
{editing
? "Update the configuration for this HTTP event streaming destination."
: "Configure a new HTTP endpoint to receive your organization's events."}
</CredenzaDescription>
</CredenzaHeader>
<CredenzaBody>
<HorizontalTabs
clientSide
items={[
{ title: "Settings", href: "" },
{ title: "Headers", href: "" },
{ title: "Body", href: "" },
{ title: "Logs", href: "" }
]}
>
{/* ── Settings tab ────────────────────────────── */}
<div className="space-y-6 mt-4 p-1">
{/* Name */}
<div className="space-y-2">
<Label htmlFor="dest-name">Name</Label>
<Input
id="dest-name"
placeholder="My HTTP destination"
value={cfg.name}
onChange={(e) =>
update({ name: e.target.value })
}
/>
</div>
{/* URL */}
<div className="space-y-2">
<Label htmlFor="dest-url">
Destination URL
</Label>
<Input
id="dest-url"
placeholder="https://example.com/webhook"
value={cfg.url}
onChange={(e) =>
update({ url: e.target.value })
}
/>
{urlError && (
<p className="text-xs text-destructive">
{urlError}
</p>
)}
</div>
{/* Authentication */}
<div className="space-y-3">
<div>
<label className="font-medium block">
Authentication
</label>
<p className="text-sm text-muted-foreground mt-0.5">
Choose how requests to your endpoint
are authenticated.
</p>
</div>
<RadioGroup
value={cfg.authType}
onValueChange={(v) =>
update({ authType: v as AuthType })
}
className="gap-2"
>
{/* None */}
<div className="flex items-start gap-3 rounded-md border p-3 transition-colors">
<RadioGroupItem
value="none"
id="auth-none"
className="mt-0.5"
/>
<div>
<Label
htmlFor="auth-none"
className="cursor-pointer font-medium"
>
No Authentication
</Label>
<p className="text-xs text-muted-foreground mt-0.5">
Sends requests without an{" "}
<code className="bg-muted px-1 py-0.5 rounded text-xs">
Authorization
</code>{" "}
header.
</p>
</div>
</div>
{/* Bearer */}
<div className="flex items-start gap-3 rounded-md border p-3">
<RadioGroupItem
value="bearer"
id="auth-bearer"
className="mt-0.5"
/>
<div className="flex-1 space-y-3">
<div>
<Label
htmlFor="auth-bearer"
className="cursor-pointer font-medium"
>
Bearer Token
</Label>
<p className="text-xs text-muted-foreground mt-0.5">
Adds an{" "}
<code className="bg-muted px-1 py-0.5 rounded text-xs">
Authorization: Bearer
&lt;token&gt;
</code>{" "}
header to each request.
</p>
</div>
{cfg.authType === "bearer" && (
<Input
placeholder="Your API key or token"
value={
cfg.bearerToken ?? ""
}
onChange={(e) =>
update({
bearerToken:
e.target.value
})
}
/>
)}
</div>
</div>
{/* Basic */}
<div className="flex items-start gap-3 rounded-md border p-3">
<RadioGroupItem
value="basic"
id="auth-basic"
className="mt-0.5"
/>
<div className="flex-1 space-y-3">
<div>
<Label
htmlFor="auth-basic"
className="cursor-pointer font-medium"
>
Basic Auth
</Label>
<p className="text-xs text-muted-foreground mt-0.5">
Adds an{" "}
<code className="bg-muted px-1 py-0.5 rounded text-xs">
Authorization: Basic
&lt;credentials&gt;
</code>{" "}
header. Provide credentials
as{" "}
<code className="bg-muted px-1 py-0.5 rounded text-xs">
username:password
</code>
.
</p>
</div>
{cfg.authType === "basic" && (
<Input
placeholder="username:password"
value={
cfg.basicCredentials ??
""
}
onChange={(e) =>
update({
basicCredentials:
e.target.value
})
}
/>
)}
</div>
</div>
{/* Custom */}
<div className="flex items-start gap-3 rounded-md border p-3">
<RadioGroupItem
value="custom"
id="auth-custom"
className="mt-0.5"
/>
<div className="flex-1 space-y-3">
<div>
<Label
htmlFor="auth-custom"
className="cursor-pointer font-medium"
>
Custom Header
</Label>
<p className="text-xs text-muted-foreground mt-0.5">
Specify a custom HTTP
header name and value for
authentication (e.g.{" "}
<code className="bg-muted px-1 py-0.5 rounded text-xs">
X-API-Key
</code>
).
</p>
</div>
{cfg.authType === "custom" && (
<div className="flex gap-2">
<Input
placeholder="Header name (e.g. X-API-Key)"
value={
cfg.customHeaderName ??
""
}
onChange={(e) =>
update({
customHeaderName:
e.target
.value
})
}
className="flex-1"
/>
<Input
placeholder="Header value"
value={
cfg.customHeaderValue ??
""
}
onChange={(e) =>
update({
customHeaderValue:
e.target
.value
})
}
className="flex-1"
/>
</div>
)}
</div>
</div>
</RadioGroup>
</div>
</div>
{/* ── Headers tab ──────────────────────────────── */}
<div className="space-y-6 mt-4 p-1">
<div>
<label className="font-medium block">
Custom HTTP Headers
</label>
<p className="text-sm text-muted-foreground mt-0.5">
Add custom headers to every outgoing
request. Useful for static tokens or
custom{" "}
<code className="bg-muted px-1 py-0.5 rounded text-xs">
Content-Type
</code>
. By default,{" "}
<code className="bg-muted px-1 py-0.5 rounded text-xs">
Content-Type: application/json
</code>{" "}
is sent.
</p>
</div>
<HeadersEditor
headers={cfg.headers}
onChange={(headers) => update({ headers })}
/>
</div>
{/* ── Body tab ─────────────────────────── */}
<div className="space-y-6 mt-4 p-1">
<div>
<label className="font-medium block">
Custom Body Template
</label>
<p className="text-sm text-muted-foreground mt-0.5">
Control the JSON payload structure sent to
your endpoint. If disabled, a default JSON
object is sent for each event.
</p>
</div>
<div className="flex items-center gap-3">
<Switch
id="use-body-template"
checked={cfg.useBodyTemplate}
onCheckedChange={(v) =>
update({ useBodyTemplate: v })
}
/>
<Label
htmlFor="use-body-template"
className="cursor-pointer"
>
Enable custom body template
</Label>
</div>
{cfg.useBodyTemplate && (
<div className="space-y-2">
<Label htmlFor="body-template">
Body Template (JSON)
</Label>
<Textarea
id="body-template"
placeholder={
'{\n "event": "{{event}}",\n "timestamp": "{{timestamp}}",\n "data": {{data}}\n}'
}
value={cfg.bodyTemplate ?? ""}
onChange={(e) =>
update({
bodyTemplate: e.target.value
})
}
className="font-mono text-xs min-h-45 resize-y"
/>
<p className="text-xs text-muted-foreground">
Use template variables to reference
event fields in your payload.
</p>
</div>
)}
{/* Payload Format */}
<div className="space-y-3">
<div>
<label className="font-medium block">
Payload Format
</label>
<p className="text-sm text-muted-foreground mt-0.5">
How events are serialised into each
request body.
</p>
</div>
<RadioGroup
value={cfg.format ?? "json_array"}
onValueChange={(v) =>
update({
format: v as PayloadFormat
})
}
className="gap-2"
>
{/* JSON Array */}
<div className="flex items-start gap-3 rounded-md border p-3 transition-colors">
<RadioGroupItem
value="json_array"
id="fmt-json-array"
className="mt-0.5"
/>
<div>
<Label
htmlFor="fmt-json-array"
className="cursor-pointer font-medium"
>
JSON Array
</Label>
<p className="text-xs text-muted-foreground mt-0.5">
One request per batch, body is
a JSON array{" "}
<code className="bg-muted px-1 py-0.5 rounded text-xs">
[{"{...}"}, {"{...}"}]
</code>
. Compatible with most generic
webhooks and Datadog.
</p>
</div>
</div>
{/* NDJSON */}
<div className="flex items-start gap-3 rounded-md border p-3 transition-colors">
<RadioGroupItem
value="ndjson"
id="fmt-ndjson"
className="mt-0.5"
/>
<div>
<Label
htmlFor="fmt-ndjson"
className="cursor-pointer font-medium"
>
NDJSON
</Label>
<p className="text-xs text-muted-foreground mt-0.5">
One request per batch, body is
newline-delimited JSON one
object per line, no outer
array. Required by{" "}
<strong>Splunk HEC</strong>,{" "}
<strong>
Elastic / OpenSearch
</strong>
, and{" "}
<strong>Grafana Loki</strong>.
</p>
</div>
</div>
{/* Single event per request */}
<div className="flex items-start gap-3 rounded-md border p-3 transition-colors">
<RadioGroupItem
value="json_single"
id="fmt-json-single"
className="mt-0.5"
/>
<div>
<Label
htmlFor="fmt-json-single"
className="cursor-pointer font-medium"
>
One Event Per Request
</Label>
<p className="text-xs text-muted-foreground mt-0.5">
Sends a separate HTTP POST for
each individual event. Use only
for endpoints that cannot
handle batches.
</p>
</div>
</div>
</RadioGroup>
</div>
</div>
{/* ── Logs tab ──────────────────────────────────── */}
<div className="space-y-6 mt-4 p-1">
<div>
<label className="font-medium block">
Log Types
</label>
<p className="text-sm text-muted-foreground mt-0.5">
Choose which log types are forwarded to
this destination. Only enabled log types
will be streamed.
</p>
</div>
<div className="space-y-3">
<div className="flex items-start gap-3 rounded-md border p-3">
<Checkbox
id="log-access"
checked={sendAccessLogs}
onCheckedChange={(v) =>
setSendAccessLogs(v === true)
}
className="mt-0.5"
/>
<div>
<label
htmlFor="log-access"
className="text-sm font-medium cursor-pointer"
>
Access Logs
</label>
<p className="text-xs text-muted-foreground mt-0.5">
Resource access attempts, including
authenticated and denied requests.
</p>
</div>
</div>
<div className="flex items-start gap-3 rounded-md border p-3">
<Checkbox
id="log-action"
checked={sendActionLogs}
onCheckedChange={(v) =>
setSendActionLogs(v === true)
}
className="mt-0.5"
/>
<div>
<label
htmlFor="log-action"
className="text-sm font-medium cursor-pointer"
>
Action Logs
</label>
<p className="text-xs text-muted-foreground mt-0.5">
Administrative actions performed by
users within the organization.
</p>
</div>
</div>
<div className="flex items-start gap-3 rounded-md border p-3">
<Checkbox
id="log-connection"
checked={sendConnectionLogs}
onCheckedChange={(v) =>
setSendConnectionLogs(v === true)
}
className="mt-0.5"
/>
<div>
<label
htmlFor="log-connection"
className="text-sm font-medium cursor-pointer"
>
Connection Logs
</label>
<p className="text-xs text-muted-foreground mt-0.5">
Site and tunnel connection events,
including connects and
disconnects.
</p>
</div>
</div>
<div className="flex items-start gap-3 rounded-md border p-3">
<Checkbox
id="log-request"
checked={sendRequestLogs}
onCheckedChange={(v) =>
setSendRequestLogs(v === true)
}
className="mt-0.5"
/>
<div>
<label
htmlFor="log-request"
className="text-sm font-medium cursor-pointer"
>
Request Logs
</label>
<p className="text-xs text-muted-foreground mt-0.5">
HTTP request logs for proxied
resources, including method, path,
and response code.
</p>
</div>
</div>
</div>
</div>
</HorizontalTabs>
</CredenzaBody>
<CredenzaFooter>
<CredenzaClose asChild>
<Button
type="button"
variant="outline"
disabled={saving}
>
Cancel
</Button>
</CredenzaClose>
<Button
type="button"
onClick={handleSave}
loading={saving}
disabled={!isValid || saving}
>
{editing ? "Save Changes" : "Create Destination"}
</Button>
</CredenzaFooter>
</CredenzaContent>
</Credenza>
);
}