From dae169540bd3430f698725f9d8c830f441e5be96 Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 2 Mar 2026 16:49:17 -0800 Subject: [PATCH 01/12] Fix defaults for orgs --- server/lib/readConfigFile.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/lib/readConfigFile.ts b/server/lib/readConfigFile.ts index cca0aa6aa..f6c8592e9 100644 --- a/server/lib/readConfigFile.ts +++ b/server/lib/readConfigFile.ts @@ -302,8 +302,8 @@ export const configSchema = z .optional() .default({ block_size: 24, - subnet_group: "100.90.128.0/24", - utility_subnet_group: "100.96.128.0/24" + subnet_group: "100.90.128.0/20", + utility_subnet_group: "100.96.128.0/20" }), rate_limits: z .object({ From 6cf1b9b0108d5f4eda8e22f716f91e8874a4c558 Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 2 Mar 2026 18:51:48 -0800 Subject: [PATCH 02/12] Support improved targets msg v2 --- server/lib/ip.ts | 123 +++++++++++ server/lib/rebuildClientAssociations.ts | 32 ++- server/routers/client/targets.ts | 209 ++++++++++++++---- server/routers/newt/buildConfiguration.ts | 26 ++- server/routers/newt/handleGetConfigMessage.ts | 5 +- .../siteResource/updateSiteResource.ts | 10 +- 6 files changed, 326 insertions(+), 79 deletions(-) diff --git a/server/lib/ip.ts b/server/lib/ip.ts index 21ec78c1b..3a29b8661 100644 --- a/server/lib/ip.ts +++ b/server/lib/ip.ts @@ -571,6 +571,129 @@ export function generateSubnetProxyTargets( return targets; } +export type SubnetProxyTargetV2 = { + sourcePrefixes: string[]; // must be cidrs + destPrefix: string; // must be a cidr + disableIcmp?: boolean; + rewriteTo?: string; // must be a cidr + portRange?: { + min: number; + max: number; + protocol: "tcp" | "udp"; + }[]; +}; + +export function generateSubnetProxyTargetV2( + siteResource: SiteResource, + clients: { + clientId: number; + pubKey: string | null; + subnet: string | null; + }[] +): SubnetProxyTargetV2 | undefined { + if (clients.length === 0) { + logger.debug( + `No clients have access to site resource ${siteResource.siteResourceId}, skipping target generation.` + ); + return; + } + + let target: SubnetProxyTargetV2 | null = null; + + const portRange = [ + ...parsePortRangeString(siteResource.tcpPortRangeString, "tcp"), + ...parsePortRangeString(siteResource.udpPortRangeString, "udp") + ]; + const disableIcmp = siteResource.disableIcmp ?? false; + + if (siteResource.mode == "host") { + let destination = siteResource.destination; + // check if this is a valid ip + const ipSchema = z.union([z.ipv4(), z.ipv6()]); + if (ipSchema.safeParse(destination).success) { + destination = `${destination}/32`; + + target = { + sourcePrefixes: [], + destPrefix: destination, + portRange, + disableIcmp + }; + } + + if (siteResource.alias && siteResource.aliasAddress) { + // also push a match for the alias address + target = { + sourcePrefixes: [], + destPrefix: `${siteResource.aliasAddress}/32`, + rewriteTo: destination, + portRange, + disableIcmp + }; + } + } else if (siteResource.mode == "cidr") { + target = { + sourcePrefixes: [], + destPrefix: siteResource.destination, + portRange, + disableIcmp + }; + } + + if (!target) { + return; + } + + for (const clientSite of clients) { + if (!clientSite.subnet) { + logger.debug( + `Client ${clientSite.clientId} has no subnet, skipping for site resource ${siteResource.siteResourceId}.` + ); + continue; + } + + const clientPrefix = `${clientSite.subnet.split("/")[0]}/32`; + + // add client prefix to source prefixes + target.sourcePrefixes.push(clientPrefix); + } + + // print a nice representation of the targets + // logger.debug( + // `Generated subnet proxy targets for: ${JSON.stringify(targets, null, 2)}` + // ); + + return target; +} + + +/** + * Converts a SubnetProxyTargetV2 to an array of SubnetProxyTarget (v1) + * by expanding each source prefix into its own target entry. + * @param targetV2 - The v2 target to convert + * @returns Array of v1 SubnetProxyTarget objects + */ + export function convertSubnetProxyTargetsV2ToV1( + targetsV2: SubnetProxyTargetV2[] + ): SubnetProxyTarget[] { + return targetsV2.flatMap((targetV2) => + targetV2.sourcePrefixes.map((sourcePrefix) => ({ + sourcePrefix, + destPrefix: targetV2.destPrefix, + ...(targetV2.disableIcmp !== undefined && { + disableIcmp: targetV2.disableIcmp + }), + ...(targetV2.rewriteTo !== undefined && { + rewriteTo: targetV2.rewriteTo + }), + ...(targetV2.portRange !== undefined && { + portRange: targetV2.portRange + }) + })) + ); + } + + // Custom schema for validating port range strings // Format: "80,443,8000-9000" or "*" for all ports, or empty string export const portRangeStringSchema = z diff --git a/server/lib/rebuildClientAssociations.ts b/server/lib/rebuildClientAssociations.ts index 625e57935..915c3648d 100644 --- a/server/lib/rebuildClientAssociations.ts +++ b/server/lib/rebuildClientAssociations.ts @@ -32,7 +32,7 @@ import logger from "@server/logger"; import { generateAliasConfig, generateRemoteSubnets, - generateSubnetProxyTargets, + generateSubnetProxyTargetV2, parseEndpoint, formatEndpoint } from "@server/lib/ip"; @@ -659,17 +659,14 @@ async function handleSubnetProxyTargetUpdates( ); if (addedClients.length > 0) { - const targetsToAdd = generateSubnetProxyTargets( + const targetToAdd = generateSubnetProxyTargetV2( siteResource, addedClients ); - if (targetsToAdd.length > 0) { - logger.info( - `Adding ${targetsToAdd.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}` - ); + if (targetToAdd) { proxyJobs.push( - addSubnetProxyTargets(newt.newtId, targetsToAdd) + addSubnetProxyTargets(newt.newtId, [targetToAdd]) ); } @@ -695,17 +692,14 @@ async function handleSubnetProxyTargetUpdates( ); if (removedClients.length > 0) { - const targetsToRemove = generateSubnetProxyTargets( + const targetToRemove = generateSubnetProxyTargetV2( siteResource, removedClients ); - if (targetsToRemove.length > 0) { - logger.info( - `Removing ${targetsToRemove.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}` - ); + if (targetToRemove) { proxyJobs.push( - removeSubnetProxyTargets(newt.newtId, targetsToRemove) + removeSubnetProxyTargets(newt.newtId, [targetToRemove]) ); } @@ -1159,7 +1153,7 @@ async function handleMessagesForClientResources( } for (const resource of resources) { - const targets = generateSubnetProxyTargets(resource, [ + const target = generateSubnetProxyTargetV2(resource, [ { clientId: client.clientId, pubKey: client.pubKey, @@ -1167,8 +1161,8 @@ async function handleMessagesForClientResources( } ]); - if (targets.length > 0) { - proxyJobs.push(addSubnetProxyTargets(newt.newtId, targets)); + if (target) { + proxyJobs.push(addSubnetProxyTargets(newt.newtId, [target])); } try { @@ -1230,7 +1224,7 @@ async function handleMessagesForClientResources( } for (const resource of resources) { - const targets = generateSubnetProxyTargets(resource, [ + const target = generateSubnetProxyTargetV2(resource, [ { clientId: client.clientId, pubKey: client.pubKey, @@ -1238,9 +1232,9 @@ async function handleMessagesForClientResources( } ]); - if (targets.length > 0) { + if (target) { proxyJobs.push( - removeSubnetProxyTargets(newt.newtId, targets) + removeSubnetProxyTargets(newt.newtId, [target]) ); } diff --git a/server/routers/client/targets.ts b/server/routers/client/targets.ts index bf612d352..48b7e216d 100644 --- a/server/routers/client/targets.ts +++ b/server/routers/client/targets.ts @@ -1,8 +1,15 @@ import { sendToClient } from "#dynamic/routers/ws"; -import { db, olms, Transaction } from "@server/db"; -import { Alias, SubnetProxyTarget } from "@server/lib/ip"; +import { S } from "@faker-js/faker/dist/airline-Dz1uGqgJ"; +import { db, newts, olms, Transaction } from "@server/db"; +import { + Alias, + convertSubnetProxyTargetsV2ToV1, + SubnetProxyTarget, + SubnetProxyTargetV2 +} from "@server/lib/ip"; import logger from "@server/logger"; import { eq } from "drizzle-orm"; +import semver from "semver"; const BATCH_SIZE = 50; const BATCH_DELAY_MS = 50; @@ -19,57 +26,149 @@ function chunkArray(array: T[], size: number): T[][] { return chunks; } -export async function addTargets(newtId: string, targets: SubnetProxyTarget[]) { - const batches = chunkArray(targets, BATCH_SIZE); +const NEWT_V2_TARGETS_VERSION = ">=1.11.0"; + +export async function convertTargetsIfNessicary( + newtId: string, + targets: SubnetProxyTarget[] | SubnetProxyTargetV2[] +) { + // get the newt + const [newt] = await db + .select() + .from(newts) + .where(eq(newts.newtId, newtId)); + if (!newt) { + throw new Error(`No newt found for id: ${newtId}`); + } + + // check the semver + if ( + newt.version && + !semver.satisfies(newt.version, NEWT_V2_TARGETS_VERSION) + ) { + logger.debug( + `addTargets Newt version ${newt.version} does not support targets v2 falling back` + ); + targets = convertSubnetProxyTargetsV2ToV1( + targets as SubnetProxyTargetV2[] + ); + } + + return targets; +} + +export async function addTargets( + newtId: string, + targets: SubnetProxyTarget[] | SubnetProxyTargetV2[] +) { + targets = await convertTargetsIfNessicary(newtId, targets); + + const batches = chunkArray( + targets, + BATCH_SIZE + ); + for (let i = 0; i < batches.length; i++) { if (i > 0) { await sleep(BATCH_DELAY_MS); } - await sendToClient(newtId, { - type: `newt/wg/targets/add`, - data: batches[i] - }, { incrementConfigVersion: true }); + await sendToClient( + newtId, + { + type: `newt/wg/targets/add`, + data: batches[i] + }, + { incrementConfigVersion: true } + ); } } export async function removeTargets( newtId: string, - targets: SubnetProxyTarget[] + targets: SubnetProxyTarget[] | SubnetProxyTargetV2[] ) { - const batches = chunkArray(targets, BATCH_SIZE); + targets = await convertTargetsIfNessicary(newtId, targets); + + const batches = chunkArray( + targets, + BATCH_SIZE + ); for (let i = 0; i < batches.length; i++) { if (i > 0) { await sleep(BATCH_DELAY_MS); } - await sendToClient(newtId, { - type: `newt/wg/targets/remove`, - data: batches[i] - },{ incrementConfigVersion: true }); + await sendToClient( + newtId, + { + type: `newt/wg/targets/remove`, + data: batches[i] + }, + { incrementConfigVersion: true } + ); } } export async function updateTargets( newtId: string, targets: { - oldTargets: SubnetProxyTarget[]; - newTargets: SubnetProxyTarget[]; + oldTargets: SubnetProxyTarget[] | SubnetProxyTargetV2[]; + newTargets: SubnetProxyTarget[] | SubnetProxyTargetV2[]; } ) { - const oldBatches = chunkArray(targets.oldTargets, BATCH_SIZE); - const newBatches = chunkArray(targets.newTargets, BATCH_SIZE); + // get the newt + const [newt] = await db + .select() + .from(newts) + .where(eq(newts.newtId, newtId)); + if (!newt) { + logger.error(`addTargetsL No newt found for id: ${newtId}`); + return; + } + + // check the semver + if ( + newt.version && + !semver.satisfies(newt.version, NEWT_V2_TARGETS_VERSION) + ) { + logger.debug( + `addTargets Newt version ${newt.version} does not support targets v2 falling back` + ); + targets = { + oldTargets: convertSubnetProxyTargetsV2ToV1( + targets.oldTargets as SubnetProxyTargetV2[] + ), + newTargets: convertSubnetProxyTargetsV2ToV1( + targets.newTargets as SubnetProxyTargetV2[] + ) + }; + } + + const oldBatches = chunkArray( + targets.oldTargets, + BATCH_SIZE + ); + const newBatches = chunkArray( + targets.newTargets, + BATCH_SIZE + ); + const maxBatches = Math.max(oldBatches.length, newBatches.length); for (let i = 0; i < maxBatches; i++) { if (i > 0) { await sleep(BATCH_DELAY_MS); } - await sendToClient(newtId, { - type: `newt/wg/targets/update`, - data: { - oldTargets: oldBatches[i] || [], - newTargets: newBatches[i] || [] - } - }, { incrementConfigVersion: true }).catch((error) => { + await sendToClient( + newtId, + { + type: `newt/wg/targets/update`, + data: { + oldTargets: oldBatches[i] || [], + newTargets: newBatches[i] || [] + } + }, + { incrementConfigVersion: true } + ).catch((error) => { logger.warn(`Error sending message:`, error); }); } @@ -94,14 +193,18 @@ export async function addPeerData( olmId = olm.olmId; } - await sendToClient(olmId, { - type: `olm/wg/peer/data/add`, - data: { - siteId: siteId, - remoteSubnets: remoteSubnets, - aliases: aliases - } - }, { incrementConfigVersion: true }).catch((error) => { + await sendToClient( + olmId, + { + type: `olm/wg/peer/data/add`, + data: { + siteId: siteId, + remoteSubnets: remoteSubnets, + aliases: aliases + } + }, + { incrementConfigVersion: true } + ).catch((error) => { logger.warn(`Error sending message:`, error); }); } @@ -125,14 +228,18 @@ export async function removePeerData( olmId = olm.olmId; } - await sendToClient(olmId, { - type: `olm/wg/peer/data/remove`, - data: { - siteId: siteId, - remoteSubnets: remoteSubnets, - aliases: aliases - } - }, { incrementConfigVersion: true }).catch((error) => { + await sendToClient( + olmId, + { + type: `olm/wg/peer/data/remove`, + data: { + siteId: siteId, + remoteSubnets: remoteSubnets, + aliases: aliases + } + }, + { incrementConfigVersion: true } + ).catch((error) => { logger.warn(`Error sending message:`, error); }); } @@ -166,14 +273,18 @@ export async function updatePeerData( olmId = olm.olmId; } - await sendToClient(olmId, { - type: `olm/wg/peer/data/update`, - data: { - siteId: siteId, - ...remoteSubnets, - ...aliases - } - }, { incrementConfigVersion: true }).catch((error) => { + await sendToClient( + olmId, + { + type: `olm/wg/peer/data/update`, + data: { + siteId: siteId, + ...remoteSubnets, + ...aliases + } + }, + { incrementConfigVersion: true } + ).catch((error) => { logger.warn(`Error sending message:`, error); }); } diff --git a/server/routers/newt/buildConfiguration.ts b/server/routers/newt/buildConfiguration.ts index e349f24e8..c20e713f6 100644 --- a/server/routers/newt/buildConfiguration.ts +++ b/server/routers/newt/buildConfiguration.ts @@ -1,9 +1,23 @@ -import { clients, clientSiteResourcesAssociationsCache, clientSitesAssociationsCache, db, ExitNode, resources, Site, siteResources, targetHealthCheck, targets } from "@server/db"; +import { + clients, + clientSiteResourcesAssociationsCache, + clientSitesAssociationsCache, + db, + ExitNode, + resources, + Site, + siteResources, + targetHealthCheck, + targets +} from "@server/db"; import logger from "@server/logger"; import { initPeerAddHandshake, updatePeer } from "../olm/peers"; import { eq, and } from "drizzle-orm"; import config from "@server/lib/config"; -import { generateSubnetProxyTargets, SubnetProxyTarget } from "@server/lib/ip"; +import { + generateSubnetProxyTargetV2, + SubnetProxyTargetV2 +} from "@server/lib/ip"; export async function buildClientConfigurationForNewtClient( site: Site, @@ -126,7 +140,7 @@ export async function buildClientConfigurationForNewtClient( .from(siteResources) .where(eq(siteResources.siteId, siteId)); - const targetsToSend: SubnetProxyTarget[] = []; + const targetsToSend: SubnetProxyTargetV2[] = []; for (const resource of allSiteResources) { // Get clients associated with this specific resource @@ -151,12 +165,14 @@ export async function buildClientConfigurationForNewtClient( ) ); - const resourceTargets = generateSubnetProxyTargets( + const resourceTarget = generateSubnetProxyTargetV2( resource, resourceClients ); - targetsToSend.push(...resourceTargets); + if (resourceTarget) { + targetsToSend.push(resourceTarget); + } } return { diff --git a/server/routers/newt/handleGetConfigMessage.ts b/server/routers/newt/handleGetConfigMessage.ts index 801c8b65a..d17a37e48 100644 --- a/server/routers/newt/handleGetConfigMessage.ts +++ b/server/routers/newt/handleGetConfigMessage.ts @@ -6,6 +6,7 @@ import { db, ExitNode, exitNodes, Newt, sites } from "@server/db"; import { eq } from "drizzle-orm"; import { sendToExitNode } from "#dynamic/lib/exitNodes"; import { buildClientConfigurationForNewtClient } from "./buildConfiguration"; +import { convertTargetsIfNessicary } from "../client/targets"; const inputSchema = z.object({ publicKey: z.string(), @@ -126,13 +127,15 @@ export const handleGetConfigMessage: MessageHandler = async (context) => { exitNode ); + const targetsToSend = await convertTargetsIfNessicary(newt.newtId, targets); + return { message: { type: "newt/wg/receive-config", data: { ipAddress: site.address, peers, - targets + targets: targetsToSend } }, broadcast: false, diff --git a/server/routers/siteResource/updateSiteResource.ts b/server/routers/siteResource/updateSiteResource.ts index 242b92265..c8119840f 100644 --- a/server/routers/siteResource/updateSiteResource.ts +++ b/server/routers/siteResource/updateSiteResource.ts @@ -24,7 +24,7 @@ import { updatePeerData, updateTargets } from "@server/routers/client/targets"; import { generateAliasConfig, generateRemoteSubnets, - generateSubnetProxyTargets, + generateSubnetProxyTargetV2, isIpInCidr, portRangeStringSchema } from "@server/lib/ip"; @@ -608,18 +608,18 @@ export async function handleMessagingForUpdatedSiteResource( // Only update targets on newt if destination changed if (destinationChanged || portRangesChanged) { - const oldTargets = generateSubnetProxyTargets( + const oldTarget = generateSubnetProxyTargetV2( existingSiteResource, mergedAllClients ); - const newTargets = generateSubnetProxyTargets( + const newTarget = generateSubnetProxyTargetV2( updatedSiteResource, mergedAllClients ); await updateTargets(newt.newtId, { - oldTargets: oldTargets, - newTargets: newTargets + oldTargets: [oldTarget], + newTargets: [newTarget] }); } From b01fcc70feab3d86939c5ae6fd43e8bcb4ee50db Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 3 Mar 2026 14:45:18 -0800 Subject: [PATCH 03/12] Fix ts and add note about ipv4 --- server/routers/siteResource/createSiteResource.ts | 2 +- server/routers/siteResource/updateSiteResource.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/routers/siteResource/createSiteResource.ts b/server/routers/siteResource/createSiteResource.ts index bbdc3638d..b12b8d100 100644 --- a/server/routers/siteResource/createSiteResource.ts +++ b/server/routers/siteResource/createSiteResource.ts @@ -88,7 +88,7 @@ const createSiteResourceSchema = z }, { message: - "Destination must be a valid IP address or valid domain AND alias is required" + "Destination must be a valid IPV4 address or valid domain AND alias is required" } ) .refine( diff --git a/server/routers/siteResource/updateSiteResource.ts b/server/routers/siteResource/updateSiteResource.ts index c8119840f..bc5daa55f 100644 --- a/server/routers/siteResource/updateSiteResource.ts +++ b/server/routers/siteResource/updateSiteResource.ts @@ -618,8 +618,8 @@ export async function handleMessagingForUpdatedSiteResource( ); await updateTargets(newt.newtId, { - oldTargets: [oldTarget], - newTargets: [newTarget] + oldTargets: oldTarget ? [oldTarget] : [], + newTargets: newTarget ? [newTarget] : [] }); } From 3cca0c09c06e1a928c5cae9b8b5ad10ab231ee88 Mon Sep 17 00:00:00 2001 From: Noe Charmet Date: Mon, 23 Mar 2026 11:09:19 +0100 Subject: [PATCH 04/12] Allow setting Redis password from env --- server/private/lib/readConfigFile.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/private/lib/readConfigFile.ts b/server/private/lib/readConfigFile.ts index 0ce6d0272..54260009b 100644 --- a/server/private/lib/readConfigFile.ts +++ b/server/private/lib/readConfigFile.ts @@ -57,7 +57,10 @@ export const privateConfigSchema = z.object({ .object({ host: z.string(), port: portSchema, - password: z.string().optional(), + password: z + .string() + .optional() + .transform(getEnvOrYaml("REDIS_PASSWORD")), db: z.int().nonnegative().optional().default(0), replicas: z .array( From 37d331e813a67274c22bfcd7839bbdd8bc692541 Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 23 Mar 2026 16:05:05 -0700 Subject: [PATCH 05/12] Update version --- server/routers/client/targets.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/routers/client/targets.ts b/server/routers/client/targets.ts index 7cd59133c..b2d49db4c 100644 --- a/server/routers/client/targets.ts +++ b/server/routers/client/targets.ts @@ -11,7 +11,7 @@ import logger from "@server/logger"; import { eq } from "drizzle-orm"; import semver from "semver"; -const NEWT_V2_TARGETS_VERSION = ">=1.11.0"; +const NEWT_V2_TARGETS_VERSION = ">=1.10.3"; export async function convertTargetsIfNessicary( newtId: string, From 7d8797840a1d9cc8d455fe858ff97d4f3b206e15 Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 23 Mar 2026 17:01:34 -0700 Subject: [PATCH 06/12] Add connection log --- server/cleanup.ts | 4 +- server/db/pg/schema/privateSchema.ts | 38 ++- server/db/sqlite/schema/privateSchema.ts | 36 ++- server/private/cleanup.ts | 4 +- server/private/routers/ws/messageHandlers.ts | 4 +- .../newt/handleConnectionLogMessage.ts | 302 ++++++++++++++++++ server/routers/newt/index.ts | 1 + 7 files changed, 384 insertions(+), 5 deletions(-) create mode 100644 server/routers/newt/handleConnectionLogMessage.ts diff --git a/server/cleanup.ts b/server/cleanup.ts index 137654827..7366bb876 100644 --- a/server/cleanup.ts +++ b/server/cleanup.ts @@ -1,9 +1,11 @@ import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage"; +import { flushConnectionLogToDb } from "@server/routers/newt/handleConnectionLogMessage"; import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth"; import { cleanup as wsCleanup } from "#dynamic/routers/ws"; async function cleanup() { await flushBandwidthToDb(); + await flushConnectionLogToDb(); await flushSiteBandwidthToDb(); await wsCleanup(); @@ -14,4 +16,4 @@ export async function initCleanup() { // Handle process termination process.on("SIGTERM", () => cleanup()); process.on("SIGINT", () => cleanup()); -} \ No newline at end of file +} diff --git a/server/db/pg/schema/privateSchema.ts b/server/db/pg/schema/privateSchema.ts index c9d7cc907..8fed6462a 100644 --- a/server/db/pg/schema/privateSchema.ts +++ b/server/db/pg/schema/privateSchema.ts @@ -17,7 +17,9 @@ import { users, exitNodes, sessions, - clients + clients, + siteResources, + sites } from "./schema"; export const certificates = pgTable("certificates", { @@ -302,6 +304,39 @@ export const accessAuditLog = pgTable( ] ); +export const connectionAuditLog = pgTable( + "connectionAuditLog", + { + id: serial("id").primaryKey(), + sessionId: text("sessionId").notNull(), + siteResourceId: integer("siteResourceId").references( + () => siteResources.siteResourceId, + { onDelete: "cascade" } + ), + orgId: text("orgId").references(() => orgs.orgId, { + onDelete: "cascade" + }), + siteId: integer("siteId").references(() => sites.siteId, { + onDelete: "cascade" + }), + sourceAddr: text("sourceAddr").notNull(), + destAddr: text("destAddr").notNull(), + protocol: text("protocol").notNull(), + startedAt: integer("startedAt").notNull(), + endedAt: integer("endedAt"), + bytesTx: integer("bytesTx"), + bytesRx: integer("bytesRx") + }, + (table) => [ + index("idx_accessAuditLog_startedAt").on(table.startedAt), + index("idx_accessAuditLog_org_startedAt").on( + table.orgId, + table.startedAt + ), + index("idx_accessAuditLog_siteResourceId").on(table.siteResourceId) + ] +); + export const approvals = pgTable("approvals", { approvalId: serial("approvalId").primaryKey(), timestamp: integer("timestamp").notNull(), // this is EPOCH time in seconds @@ -357,3 +392,4 @@ export type LoginPage = InferSelectModel; export type LoginPageBranding = InferSelectModel; export type ActionAuditLog = InferSelectModel; export type AccessAuditLog = InferSelectModel; +export type ConnectionAuditLog = InferSelectModel; diff --git a/server/db/sqlite/schema/privateSchema.ts b/server/db/sqlite/schema/privateSchema.ts index 8baeb5220..ecc386ea6 100644 --- a/server/db/sqlite/schema/privateSchema.ts +++ b/server/db/sqlite/schema/privateSchema.ts @@ -6,7 +6,7 @@ import { sqliteTable, text } from "drizzle-orm/sqlite-core"; -import { clients, domains, exitNodes, orgs, sessions, users } from "./schema"; +import { clients, domains, exitNodes, orgs, sessions, siteResources, sites, users } from "./schema"; export const certificates = sqliteTable("certificates", { certId: integer("certId").primaryKey({ autoIncrement: true }), @@ -294,6 +294,39 @@ export const accessAuditLog = sqliteTable( ] ); +export const connectionAuditLog = sqliteTable( + "connectionAuditLog", + { + id: integer("id").primaryKey({ autoIncrement: true }), + sessionId: text("sessionId").notNull(), + siteResourceId: integer("siteResourceId").references( + () => siteResources.siteResourceId, + { onDelete: "cascade" } + ), + orgId: text("orgId").references(() => orgs.orgId, { + onDelete: "cascade" + }), + siteId: integer("siteId").references(() => sites.siteId, { + onDelete: "cascade" + }), + sourceAddr: text("sourceAddr").notNull(), + destAddr: text("destAddr").notNull(), + protocol: text("protocol").notNull(), + startedAt: integer("startedAt").notNull(), + endedAt: integer("endedAt"), + bytesTx: integer("bytesTx"), + bytesRx: integer("bytesRx") + }, + (table) => [ + index("idx_accessAuditLog_startedAt").on(table.startedAt), + index("idx_accessAuditLog_org_startedAt").on( + table.orgId, + table.startedAt + ), + index("idx_accessAuditLog_siteResourceId").on(table.siteResourceId) + ] +); + export const approvals = sqliteTable("approvals", { approvalId: integer("approvalId").primaryKey({ autoIncrement: true }), timestamp: integer("timestamp").notNull(), // this is EPOCH time in seconds @@ -348,3 +381,4 @@ export type LoginPage = InferSelectModel; export type LoginPageBranding = InferSelectModel; export type ActionAuditLog = InferSelectModel; export type AccessAuditLog = InferSelectModel; +export type ConnectionAuditLog = InferSelectModel; diff --git a/server/private/cleanup.ts b/server/private/cleanup.ts index 0bd9822dd..933c943ed 100644 --- a/server/private/cleanup.ts +++ b/server/private/cleanup.ts @@ -14,10 +14,12 @@ import { rateLimitService } from "#private/lib/rateLimit"; import { cleanup as wsCleanup } from "#private/routers/ws"; import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage"; +import { flushConnectionLogToDb } from "@server/routers/newt/handleConnectionLogMessage"; import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth"; async function cleanup() { await flushBandwidthToDb(); + await flushConnectionLogToDb(); await flushSiteBandwidthToDb(); await rateLimitService.cleanup(); await wsCleanup(); @@ -29,4 +31,4 @@ export async function initCleanup() { // Handle process termination process.on("SIGTERM", () => cleanup()); process.on("SIGINT", () => cleanup()); -} \ No newline at end of file +} diff --git a/server/private/routers/ws/messageHandlers.ts b/server/private/routers/ws/messageHandlers.ts index d388ce40a..9e4622e2d 100644 --- a/server/private/routers/ws/messageHandlers.ts +++ b/server/private/routers/ws/messageHandlers.ts @@ -18,10 +18,12 @@ import { } from "#private/routers/remoteExitNode"; import { MessageHandler } from "@server/routers/ws"; import { build } from "@server/build"; +import { handleConnectionLogMessage } from "@server/routers/newt"; export const messageHandlers: Record = { "remoteExitNode/register": handleRemoteExitNodeRegisterMessage, - "remoteExitNode/ping": handleRemoteExitNodePingMessage + "remoteExitNode/ping": handleRemoteExitNodePingMessage, + "newt/access-log": handleConnectionLogMessage, }; if (build != "saas") { diff --git a/server/routers/newt/handleConnectionLogMessage.ts b/server/routers/newt/handleConnectionLogMessage.ts new file mode 100644 index 000000000..458470af7 --- /dev/null +++ b/server/routers/newt/handleConnectionLogMessage.ts @@ -0,0 +1,302 @@ +import { db } from "@server/db"; +import { MessageHandler } from "@server/routers/ws"; +import { connectionAuditLog, sites, Newt } from "@server/db"; +import { eq } from "drizzle-orm"; +import logger from "@server/logger"; +import { inflate } from "zlib"; +import { promisify } from "util"; + +const zlibInflate = promisify(inflate); + +// Retry configuration for deadlock handling +const MAX_RETRIES = 3; +const BASE_DELAY_MS = 50; + +// How often to flush accumulated connection log data to the database +const FLUSH_INTERVAL_MS = 30_000; // 30 seconds + +// Maximum number of records to buffer before forcing a flush +const MAX_BUFFERED_RECORDS = 500; + +// Maximum number of records to insert in a single batch +const INSERT_BATCH_SIZE = 100; + +interface ConnectionSessionData { + sessionId: string; + resourceId: number; + sourceAddr: string; + destAddr: string; + protocol: string; + startedAt: string; // ISO 8601 timestamp + endedAt?: string; // ISO 8601 timestamp + bytesTx?: number; + bytesRx?: number; +} + +interface ConnectionLogRecord { + sessionId: string; + siteResourceId: number; + orgId: string; + siteId: number; + sourceAddr: string; + destAddr: string; + protocol: string; + startedAt: number; // epoch seconds + endedAt: number | null; + bytesTx: number | null; + bytesRx: number | null; +} + +// In-memory buffer of records waiting to be flushed +let buffer: ConnectionLogRecord[] = []; + +/** + * Check if an error is a deadlock error + */ +function isDeadlockError(error: any): boolean { + return ( + error?.code === "40P01" || + error?.cause?.code === "40P01" || + (error?.message && error.message.includes("deadlock")) + ); +} + +/** + * Execute a function with retry logic for deadlock handling + */ +async function withDeadlockRetry( + operation: () => Promise, + context: string +): Promise { + let attempt = 0; + while (true) { + try { + return await operation(); + } catch (error: any) { + if (isDeadlockError(error) && attempt < MAX_RETRIES) { + attempt++; + const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS; + const jitter = Math.random() * baseDelay; + const delay = baseDelay + jitter; + logger.warn( + `Deadlock detected in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms` + ); + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } + throw error; + } + } +} + +/** + * Decompress a base64-encoded zlib-compressed string into parsed JSON. + */ +async function decompressConnectionLog( + compressed: string +): Promise { + const compressedBuffer = Buffer.from(compressed, "base64"); + const decompressed = await zlibInflate(compressedBuffer); + const jsonString = decompressed.toString("utf-8"); + const parsed = JSON.parse(jsonString); + + if (!Array.isArray(parsed)) { + throw new Error("Decompressed connection log data is not an array"); + } + + return parsed; +} + +/** + * Convert an ISO 8601 timestamp string to epoch seconds. + * Returns null if the input is falsy. + */ +function toEpochSeconds(isoString: string | undefined | null): number | null { + if (!isoString) { + return null; + } + const ms = new Date(isoString).getTime(); + if (isNaN(ms)) { + return null; + } + return Math.floor(ms / 1000); +} + +/** + * Flush all buffered connection log records to the database. + * + * Swaps out the buffer before writing so that any records added during the + * flush are captured in the new buffer rather than being lost. Entries that + * fail to write are re-queued back into the buffer so they will be retried + * on the next flush. + * + * This function is exported so that the application's graceful-shutdown + * cleanup handler can call it before the process exits. + */ +export async function flushConnectionLogToDb(): Promise { + if (buffer.length === 0) { + return; + } + + // Atomically swap out the buffer so new data keeps flowing in + const snapshot = buffer; + buffer = []; + + logger.debug( + `Flushing ${snapshot.length} connection log record(s) to the database` + ); + + // Insert in batches to avoid overly large SQL statements + for (let i = 0; i < snapshot.length; i += INSERT_BATCH_SIZE) { + const batch = snapshot.slice(i, i + INSERT_BATCH_SIZE); + + try { + await withDeadlockRetry(async () => { + await db.insert(connectionAuditLog).values(batch); + }, `flush connection log batch (${batch.length} records)`); + } catch (error) { + logger.error( + `Failed to flush connection log batch of ${batch.length} records:`, + error + ); + + // Re-queue the failed batch so it is retried on the next flush + buffer = [...batch, ...buffer]; + + // Cap buffer to prevent unbounded growth if DB is unreachable + if (buffer.length > MAX_BUFFERED_RECORDS * 5) { + const dropped = buffer.length - MAX_BUFFERED_RECORDS * 5; + buffer = buffer.slice(0, MAX_BUFFERED_RECORDS * 5); + logger.warn( + `Connection log buffer overflow, dropped ${dropped} oldest records` + ); + } + + // Stop trying further batches from this snapshot — they'll be + // picked up by the next flush via the re-queued records above + const remaining = snapshot.slice(i + INSERT_BATCH_SIZE); + if (remaining.length > 0) { + buffer = [...remaining, ...buffer]; + } + break; + } + } +} + +const flushTimer = setInterval(async () => { + try { + await flushConnectionLogToDb(); + } catch (error) { + logger.error( + "Unexpected error during periodic connection log flush:", + error + ); + } +}, FLUSH_INTERVAL_MS); + +// Calling unref() means this timer will not keep the Node.js event loop alive +// on its own — the process can still exit normally when there is no other work +// left. The graceful-shutdown path will call flushConnectionLogToDb() explicitly +// before process.exit(), so no data is lost. +flushTimer.unref(); + +export const handleConnectionLogMessage: MessageHandler = async (context) => { + const { message, client } = context; + const newt = client as Newt; + + if (!newt) { + logger.warn("Connection log received but no newt client in context"); + return; + } + + if (!newt.siteId) { + logger.warn("Connection log received but newt has no siteId"); + return; + } + + if (!message.data?.compressed) { + logger.warn("Connection log message missing compressed data"); + return; + } + + // Look up the org for this site + const [site] = await db + .select({ orgId: sites.orgId }) + .from(sites) + .where(eq(sites.siteId, newt.siteId)); + + if (!site) { + logger.warn( + `Connection log received but site ${newt.siteId} not found in database` + ); + return; + } + + const orgId = site.orgId; + + let sessions: ConnectionSessionData[]; + try { + sessions = await decompressConnectionLog(message.data.compressed); + } catch (error) { + logger.error("Failed to decompress connection log data:", error); + return; + } + + if (sessions.length === 0) { + return; + } + + // Convert to DB records and add to the buffer + for (const session of sessions) { + // Validate required fields + if ( + !session.sessionId || + !session.resourceId || + !session.sourceAddr || + !session.destAddr || + !session.protocol + ) { + logger.debug( + `Skipping connection log session with missing required fields: ${JSON.stringify(session)}` + ); + continue; + } + + const startedAt = toEpochSeconds(session.startedAt); + if (startedAt === null) { + logger.debug( + `Skipping connection log session with invalid startedAt: ${session.startedAt}` + ); + continue; + } + + buffer.push({ + sessionId: session.sessionId, + siteResourceId: session.resourceId, + orgId, + siteId: newt.siteId, + sourceAddr: session.sourceAddr, + destAddr: session.destAddr, + protocol: session.protocol, + startedAt, + endedAt: toEpochSeconds(session.endedAt), + bytesTx: session.bytesTx ?? null, + bytesRx: session.bytesRx ?? null + }); + } + + logger.debug( + `Buffered ${sessions.length} connection log session(s) from newt ${newt.newtId} (site ${newt.siteId})` + ); + + // If the buffer has grown large enough, trigger an immediate flush + if (buffer.length >= MAX_BUFFERED_RECORDS) { + // Fire and forget — errors are handled inside flushConnectionLogToDb + flushConnectionLogToDb().catch((error) => { + logger.error( + "Unexpected error during size-triggered connection log flush:", + error + ); + }); + } +}; diff --git a/server/routers/newt/index.ts b/server/routers/newt/index.ts index f31cd753b..63d1e1068 100644 --- a/server/routers/newt/index.ts +++ b/server/routers/newt/index.ts @@ -8,3 +8,4 @@ export * from "./handleNewtPingRequestMessage"; export * from "./handleApplyBlueprintMessage"; export * from "./handleNewtPingMessage"; export * from "./handleNewtDisconnectingMessage"; +export * from "./handleConnectionLogMessage"; From 0d4edcd1c79219f65f223cbdce179c3cf0cfcf1b Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 23 Mar 2026 17:23:51 -0700 Subject: [PATCH 07/12] make private --- server/cleanup.ts | 2 +- server/db/pg/schema/schema.ts | 3 + server/db/sqlite/schema/schema.ts | 3 + server/lib/cleanupLogs.ts | 18 +- server/private/cleanup.ts | 2 +- .../newt/handleConnectionLogMessage.ts | 324 ++++++++++++++++++ server/private/routers/newt/index.ts | 1 + server/private/routers/ws/messageHandlers.ts | 2 +- .../newt/handleConnectionLogMessage.ts | 299 +--------------- 9 files changed, 354 insertions(+), 300 deletions(-) create mode 100644 server/private/routers/newt/handleConnectionLogMessage.ts create mode 100644 server/private/routers/newt/index.ts diff --git a/server/cleanup.ts b/server/cleanup.ts index 7366bb876..81cc31692 100644 --- a/server/cleanup.ts +++ b/server/cleanup.ts @@ -1,5 +1,5 @@ import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage"; -import { flushConnectionLogToDb } from "@server/routers/newt/handleConnectionLogMessage"; +import { flushConnectionLogToDb } from "#dynamic/routers/newt"; import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth"; import { cleanup as wsCleanup } from "#dynamic/routers/ws"; diff --git a/server/db/pg/schema/schema.ts b/server/db/pg/schema/schema.ts index b93c21fd6..de423945d 100644 --- a/server/db/pg/schema/schema.ts +++ b/server/db/pg/schema/schema.ts @@ -55,6 +55,9 @@ export const orgs = pgTable("orgs", { settingsLogRetentionDaysAction: integer("settingsLogRetentionDaysAction") // where 0 = dont keep logs and -1 = keep forever and 9001 = end of the following year .notNull() .default(0), + settingsLogRetentionDaysConnection: integer("settingsLogRetentionDaysConnection") // where 0 = dont keep logs and -1 = keep forever and 9001 = end of the following year + .notNull() + .default(0), sshCaPrivateKey: text("sshCaPrivateKey"), // Encrypted SSH CA private key (PEM format) sshCaPublicKey: text("sshCaPublicKey"), // SSH CA public key (OpenSSH format) isBillingOrg: boolean("isBillingOrg"), diff --git a/server/db/sqlite/schema/schema.ts b/server/db/sqlite/schema/schema.ts index 188caac2b..ba02cfb76 100644 --- a/server/db/sqlite/schema/schema.ts +++ b/server/db/sqlite/schema/schema.ts @@ -47,6 +47,9 @@ export const orgs = sqliteTable("orgs", { settingsLogRetentionDaysAction: integer("settingsLogRetentionDaysAction") // where 0 = dont keep logs and -1 = keep forever and 9001 = end of the following year .notNull() .default(0), + settingsLogRetentionDaysConnection: integer("settingsLogRetentionDaysConnection") // where 0 = dont keep logs and -1 = keep forever and 9001 = end of the following year + .notNull() + .default(0), sshCaPrivateKey: text("sshCaPrivateKey"), // Encrypted SSH CA private key (PEM format) sshCaPublicKey: text("sshCaPublicKey"), // SSH CA public key (OpenSSH format) isBillingOrg: integer("isBillingOrg", { mode: "boolean" }), diff --git a/server/lib/cleanupLogs.ts b/server/lib/cleanupLogs.ts index 8eb4ca77f..f5b6d8b2f 100644 --- a/server/lib/cleanupLogs.ts +++ b/server/lib/cleanupLogs.ts @@ -2,6 +2,7 @@ import { db, orgs } from "@server/db"; import { cleanUpOldLogs as cleanUpOldAccessLogs } from "#dynamic/lib/logAccessAudit"; import { cleanUpOldLogs as cleanUpOldActionLogs } from "#dynamic/middlewares/logActionAudit"; import { cleanUpOldLogs as cleanUpOldRequestLogs } from "@server/routers/badger/logRequestAudit"; +import { cleanUpOldLogs as cleanUpOldConnectionLogs } from "#dynamic/routers/newt"; import { gt, or } from "drizzle-orm"; import { cleanUpOldFingerprintSnapshots } from "@server/routers/olm/fingerprintingUtils"; import { build } from "@server/build"; @@ -20,14 +21,17 @@ export function initLogCleanupInterval() { settingsLogRetentionDaysAccess: orgs.settingsLogRetentionDaysAccess, settingsLogRetentionDaysRequest: - orgs.settingsLogRetentionDaysRequest + orgs.settingsLogRetentionDaysRequest, + settingsLogRetentionDaysConnection: + orgs.settingsLogRetentionDaysConnection }) .from(orgs) .where( or( gt(orgs.settingsLogRetentionDaysAction, 0), gt(orgs.settingsLogRetentionDaysAccess, 0), - gt(orgs.settingsLogRetentionDaysRequest, 0) + gt(orgs.settingsLogRetentionDaysRequest, 0), + gt(orgs.settingsLogRetentionDaysConnection, 0) ) ); @@ -37,7 +41,8 @@ export function initLogCleanupInterval() { orgId, settingsLogRetentionDaysAction, settingsLogRetentionDaysAccess, - settingsLogRetentionDaysRequest + settingsLogRetentionDaysRequest, + settingsLogRetentionDaysConnection } = org; if (settingsLogRetentionDaysAction > 0) { @@ -60,6 +65,13 @@ export function initLogCleanupInterval() { settingsLogRetentionDaysRequest ); } + + if (settingsLogRetentionDaysConnection > 0) { + await cleanUpOldConnectionLogs( + orgId, + settingsLogRetentionDaysConnection + ); + } } await cleanUpOldFingerprintSnapshots(365); diff --git a/server/private/cleanup.ts b/server/private/cleanup.ts index 933c943ed..4b12f1b3c 100644 --- a/server/private/cleanup.ts +++ b/server/private/cleanup.ts @@ -14,7 +14,7 @@ import { rateLimitService } from "#private/lib/rateLimit"; import { cleanup as wsCleanup } from "#private/routers/ws"; import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage"; -import { flushConnectionLogToDb } from "@server/routers/newt/handleConnectionLogMessage"; +import { flushConnectionLogToDb } from "#dynamic/routers/newt"; import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth"; async function cleanup() { diff --git a/server/private/routers/newt/handleConnectionLogMessage.ts b/server/private/routers/newt/handleConnectionLogMessage.ts new file mode 100644 index 000000000..7549ab37f --- /dev/null +++ b/server/private/routers/newt/handleConnectionLogMessage.ts @@ -0,0 +1,324 @@ +import { db, logsDb } from "@server/db"; +import { MessageHandler } from "@server/routers/ws"; +import { connectionAuditLog, sites, Newt } from "@server/db"; +import { and, eq, lt } from "drizzle-orm"; +import logger from "@server/logger"; +import { inflate } from "zlib"; +import { promisify } from "util"; +import { calculateCutoffTimestamp } from "@server/lib/cleanupLogs"; + +const zlibInflate = promisify(inflate); + +// Retry configuration for deadlock handling +const MAX_RETRIES = 3; +const BASE_DELAY_MS = 50; + +// How often to flush accumulated connection log data to the database +const FLUSH_INTERVAL_MS = 30_000; // 30 seconds + +// Maximum number of records to buffer before forcing a flush +const MAX_BUFFERED_RECORDS = 500; + +// Maximum number of records to insert in a single batch +const INSERT_BATCH_SIZE = 100; + +interface ConnectionSessionData { + sessionId: string; + resourceId: number; + sourceAddr: string; + destAddr: string; + protocol: string; + startedAt: string; // ISO 8601 timestamp + endedAt?: string; // ISO 8601 timestamp + bytesTx?: number; + bytesRx?: number; +} + +interface ConnectionLogRecord { + sessionId: string; + siteResourceId: number; + orgId: string; + siteId: number; + sourceAddr: string; + destAddr: string; + protocol: string; + startedAt: number; // epoch seconds + endedAt: number | null; + bytesTx: number | null; + bytesRx: number | null; +} + +// In-memory buffer of records waiting to be flushed +let buffer: ConnectionLogRecord[] = []; + +/** + * Check if an error is a deadlock error + */ +function isDeadlockError(error: any): boolean { + return ( + error?.code === "40P01" || + error?.cause?.code === "40P01" || + (error?.message && error.message.includes("deadlock")) + ); +} + +/** + * Execute a function with retry logic for deadlock handling + */ +async function withDeadlockRetry( + operation: () => Promise, + context: string +): Promise { + let attempt = 0; + while (true) { + try { + return await operation(); + } catch (error: any) { + if (isDeadlockError(error) && attempt < MAX_RETRIES) { + attempt++; + const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS; + const jitter = Math.random() * baseDelay; + const delay = baseDelay + jitter; + logger.warn( + `Deadlock detected in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms` + ); + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } + throw error; + } + } +} + +/** + * Decompress a base64-encoded zlib-compressed string into parsed JSON. + */ +async function decompressConnectionLog( + compressed: string +): Promise { + const compressedBuffer = Buffer.from(compressed, "base64"); + const decompressed = await zlibInflate(compressedBuffer); + const jsonString = decompressed.toString("utf-8"); + const parsed = JSON.parse(jsonString); + + if (!Array.isArray(parsed)) { + throw new Error("Decompressed connection log data is not an array"); + } + + return parsed; +} + +/** + * Convert an ISO 8601 timestamp string to epoch seconds. + * Returns null if the input is falsy. + */ +function toEpochSeconds(isoString: string | undefined | null): number | null { + if (!isoString) { + return null; + } + const ms = new Date(isoString).getTime(); + if (isNaN(ms)) { + return null; + } + return Math.floor(ms / 1000); +} + +/** + * Flush all buffered connection log records to the database. + * + * Swaps out the buffer before writing so that any records added during the + * flush are captured in the new buffer rather than being lost. Entries that + * fail to write are re-queued back into the buffer so they will be retried + * on the next flush. + * + * This function is exported so that the application's graceful-shutdown + * cleanup handler can call it before the process exits. + */ +export async function flushConnectionLogToDb(): Promise { + if (buffer.length === 0) { + return; + } + + // Atomically swap out the buffer so new data keeps flowing in + const snapshot = buffer; + buffer = []; + + logger.debug( + `Flushing ${snapshot.length} connection log record(s) to the database` + ); + + // Insert in batches to avoid overly large SQL statements + for (let i = 0; i < snapshot.length; i += INSERT_BATCH_SIZE) { + const batch = snapshot.slice(i, i + INSERT_BATCH_SIZE); + + try { + await withDeadlockRetry(async () => { + await logsDb.insert(connectionAuditLog).values(batch); + }, `flush connection log batch (${batch.length} records)`); + } catch (error) { + logger.error( + `Failed to flush connection log batch of ${batch.length} records:`, + error + ); + + // Re-queue the failed batch so it is retried on the next flush + buffer = [...batch, ...buffer]; + + // Cap buffer to prevent unbounded growth if DB is unreachable + if (buffer.length > MAX_BUFFERED_RECORDS * 5) { + const dropped = buffer.length - MAX_BUFFERED_RECORDS * 5; + buffer = buffer.slice(0, MAX_BUFFERED_RECORDS * 5); + logger.warn( + `Connection log buffer overflow, dropped ${dropped} oldest records` + ); + } + + // Stop trying further batches from this snapshot — they'll be + // picked up by the next flush via the re-queued records above + const remaining = snapshot.slice(i + INSERT_BATCH_SIZE); + if (remaining.length > 0) { + buffer = [...remaining, ...buffer]; + } + break; + } + } +} + +const flushTimer = setInterval(async () => { + try { + await flushConnectionLogToDb(); + } catch (error) { + logger.error( + "Unexpected error during periodic connection log flush:", + error + ); + } +}, FLUSH_INTERVAL_MS); + +// Calling unref() means this timer will not keep the Node.js event loop alive +// on its own — the process can still exit normally when there is no other work +// left. The graceful-shutdown path will call flushConnectionLogToDb() explicitly +// before process.exit(), so no data is lost. +flushTimer.unref(); + +export async function cleanUpOldLogs(orgId: string, retentionDays: number) { + const cutoffTimestamp = calculateCutoffTimestamp(retentionDays); + + try { + await logsDb + .delete(connectionAuditLog) + .where( + and( + lt(connectionAuditLog.startedAt, cutoffTimestamp), + eq(connectionAuditLog.orgId, orgId) + ) + ); + + // logger.debug( + // `Cleaned up connection audit logs older than ${retentionDays} days` + // ); + } catch (error) { + logger.error("Error cleaning up old connection audit logs:", error); + } +} + +export const handleConnectionLogMessage: MessageHandler = async (context) => { + const { message, client } = context; + const newt = client as Newt; + + if (!newt) { + logger.warn("Connection log received but no newt client in context"); + return; + } + + if (!newt.siteId) { + logger.warn("Connection log received but newt has no siteId"); + return; + } + + if (!message.data?.compressed) { + logger.warn("Connection log message missing compressed data"); + return; + } + + // Look up the org for this site + const [site] = await db + .select({ orgId: sites.orgId }) + .from(sites) + .where(eq(sites.siteId, newt.siteId)); + + if (!site) { + logger.warn( + `Connection log received but site ${newt.siteId} not found in database` + ); + return; + } + + const orgId = site.orgId; + + let sessions: ConnectionSessionData[]; + try { + sessions = await decompressConnectionLog(message.data.compressed); + } catch (error) { + logger.error("Failed to decompress connection log data:", error); + return; + } + + if (sessions.length === 0) { + return; + } + + // Convert to DB records and add to the buffer + for (const session of sessions) { + // Validate required fields + if ( + !session.sessionId || + !session.resourceId || + !session.sourceAddr || + !session.destAddr || + !session.protocol + ) { + logger.debug( + `Skipping connection log session with missing required fields: ${JSON.stringify(session)}` + ); + continue; + } + + const startedAt = toEpochSeconds(session.startedAt); + if (startedAt === null) { + logger.debug( + `Skipping connection log session with invalid startedAt: ${session.startedAt}` + ); + continue; + } + + buffer.push({ + sessionId: session.sessionId, + siteResourceId: session.resourceId, + orgId, + siteId: newt.siteId, + sourceAddr: session.sourceAddr, + destAddr: session.destAddr, + protocol: session.protocol, + startedAt, + endedAt: toEpochSeconds(session.endedAt), + bytesTx: session.bytesTx ?? null, + bytesRx: session.bytesRx ?? null + }); + } + + logger.debug( + `Buffered ${sessions.length} connection log session(s) from newt ${newt.newtId} (site ${newt.siteId})` + ); + + // If the buffer has grown large enough, trigger an immediate flush + if (buffer.length >= MAX_BUFFERED_RECORDS) { + // Fire and forget — errors are handled inside flushConnectionLogToDb + flushConnectionLogToDb().catch((error) => { + logger.error( + "Unexpected error during size-triggered connection log flush:", + error + ); + }); + } +}; diff --git a/server/private/routers/newt/index.ts b/server/private/routers/newt/index.ts new file mode 100644 index 000000000..cc182cf7d --- /dev/null +++ b/server/private/routers/newt/index.ts @@ -0,0 +1 @@ +export * from "./handleConnectionLogMessage"; diff --git a/server/private/routers/ws/messageHandlers.ts b/server/private/routers/ws/messageHandlers.ts index 9e4622e2d..a3c9c5bdb 100644 --- a/server/private/routers/ws/messageHandlers.ts +++ b/server/private/routers/ws/messageHandlers.ts @@ -18,7 +18,7 @@ import { } from "#private/routers/remoteExitNode"; import { MessageHandler } from "@server/routers/ws"; import { build } from "@server/build"; -import { handleConnectionLogMessage } from "@server/routers/newt"; +import { handleConnectionLogMessage } from "#dynamic/routers/newt"; export const messageHandlers: Record = { "remoteExitNode/register": handleRemoteExitNodeRegisterMessage, diff --git a/server/routers/newt/handleConnectionLogMessage.ts b/server/routers/newt/handleConnectionLogMessage.ts index 458470af7..ca1b129d2 100644 --- a/server/routers/newt/handleConnectionLogMessage.ts +++ b/server/routers/newt/handleConnectionLogMessage.ts @@ -1,302 +1,13 @@ -import { db } from "@server/db"; import { MessageHandler } from "@server/routers/ws"; -import { connectionAuditLog, sites, Newt } from "@server/db"; -import { eq } from "drizzle-orm"; -import logger from "@server/logger"; -import { inflate } from "zlib"; -import { promisify } from "util"; -const zlibInflate = promisify(inflate); - -// Retry configuration for deadlock handling -const MAX_RETRIES = 3; -const BASE_DELAY_MS = 50; - -// How often to flush accumulated connection log data to the database -const FLUSH_INTERVAL_MS = 30_000; // 30 seconds - -// Maximum number of records to buffer before forcing a flush -const MAX_BUFFERED_RECORDS = 500; - -// Maximum number of records to insert in a single batch -const INSERT_BATCH_SIZE = 100; - -interface ConnectionSessionData { - sessionId: string; - resourceId: number; - sourceAddr: string; - destAddr: string; - protocol: string; - startedAt: string; // ISO 8601 timestamp - endedAt?: string; // ISO 8601 timestamp - bytesTx?: number; - bytesRx?: number; -} - -interface ConnectionLogRecord { - sessionId: string; - siteResourceId: number; - orgId: string; - siteId: number; - sourceAddr: string; - destAddr: string; - protocol: string; - startedAt: number; // epoch seconds - endedAt: number | null; - bytesTx: number | null; - bytesRx: number | null; -} - -// In-memory buffer of records waiting to be flushed -let buffer: ConnectionLogRecord[] = []; - -/** - * Check if an error is a deadlock error - */ -function isDeadlockError(error: any): boolean { - return ( - error?.code === "40P01" || - error?.cause?.code === "40P01" || - (error?.message && error.message.includes("deadlock")) - ); -} - -/** - * Execute a function with retry logic for deadlock handling - */ -async function withDeadlockRetry( - operation: () => Promise, - context: string -): Promise { - let attempt = 0; - while (true) { - try { - return await operation(); - } catch (error: any) { - if (isDeadlockError(error) && attempt < MAX_RETRIES) { - attempt++; - const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS; - const jitter = Math.random() * baseDelay; - const delay = baseDelay + jitter; - logger.warn( - `Deadlock detected in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms` - ); - await new Promise((resolve) => setTimeout(resolve, delay)); - continue; - } - throw error; - } - } -} - -/** - * Decompress a base64-encoded zlib-compressed string into parsed JSON. - */ -async function decompressConnectionLog( - compressed: string -): Promise { - const compressedBuffer = Buffer.from(compressed, "base64"); - const decompressed = await zlibInflate(compressedBuffer); - const jsonString = decompressed.toString("utf-8"); - const parsed = JSON.parse(jsonString); - - if (!Array.isArray(parsed)) { - throw new Error("Decompressed connection log data is not an array"); - } - - return parsed; -} - -/** - * Convert an ISO 8601 timestamp string to epoch seconds. - * Returns null if the input is falsy. - */ -function toEpochSeconds(isoString: string | undefined | null): number | null { - if (!isoString) { - return null; - } - const ms = new Date(isoString).getTime(); - if (isNaN(ms)) { - return null; - } - return Math.floor(ms / 1000); -} - -/** - * Flush all buffered connection log records to the database. - * - * Swaps out the buffer before writing so that any records added during the - * flush are captured in the new buffer rather than being lost. Entries that - * fail to write are re-queued back into the buffer so they will be retried - * on the next flush. - * - * This function is exported so that the application's graceful-shutdown - * cleanup handler can call it before the process exits. - */ export async function flushConnectionLogToDb(): Promise { - if (buffer.length === 0) { - return; - } - - // Atomically swap out the buffer so new data keeps flowing in - const snapshot = buffer; - buffer = []; - - logger.debug( - `Flushing ${snapshot.length} connection log record(s) to the database` - ); - - // Insert in batches to avoid overly large SQL statements - for (let i = 0; i < snapshot.length; i += INSERT_BATCH_SIZE) { - const batch = snapshot.slice(i, i + INSERT_BATCH_SIZE); - - try { - await withDeadlockRetry(async () => { - await db.insert(connectionAuditLog).values(batch); - }, `flush connection log batch (${batch.length} records)`); - } catch (error) { - logger.error( - `Failed to flush connection log batch of ${batch.length} records:`, - error - ); - - // Re-queue the failed batch so it is retried on the next flush - buffer = [...batch, ...buffer]; - - // Cap buffer to prevent unbounded growth if DB is unreachable - if (buffer.length > MAX_BUFFERED_RECORDS * 5) { - const dropped = buffer.length - MAX_BUFFERED_RECORDS * 5; - buffer = buffer.slice(0, MAX_BUFFERED_RECORDS * 5); - logger.warn( - `Connection log buffer overflow, dropped ${dropped} oldest records` - ); - } - - // Stop trying further batches from this snapshot — they'll be - // picked up by the next flush via the re-queued records above - const remaining = snapshot.slice(i + INSERT_BATCH_SIZE); - if (remaining.length > 0) { - buffer = [...remaining, ...buffer]; - } - break; - } - } + return; } -const flushTimer = setInterval(async () => { - try { - await flushConnectionLogToDb(); - } catch (error) { - logger.error( - "Unexpected error during periodic connection log flush:", - error - ); - } -}, FLUSH_INTERVAL_MS); - -// Calling unref() means this timer will not keep the Node.js event loop alive -// on its own — the process can still exit normally when there is no other work -// left. The graceful-shutdown path will call flushConnectionLogToDb() explicitly -// before process.exit(), so no data is lost. -flushTimer.unref(); +export async function cleanUpOldLogs(orgId: string, retentionDays: number) { + return; +} export const handleConnectionLogMessage: MessageHandler = async (context) => { - const { message, client } = context; - const newt = client as Newt; - - if (!newt) { - logger.warn("Connection log received but no newt client in context"); - return; - } - - if (!newt.siteId) { - logger.warn("Connection log received but newt has no siteId"); - return; - } - - if (!message.data?.compressed) { - logger.warn("Connection log message missing compressed data"); - return; - } - - // Look up the org for this site - const [site] = await db - .select({ orgId: sites.orgId }) - .from(sites) - .where(eq(sites.siteId, newt.siteId)); - - if (!site) { - logger.warn( - `Connection log received but site ${newt.siteId} not found in database` - ); - return; - } - - const orgId = site.orgId; - - let sessions: ConnectionSessionData[]; - try { - sessions = await decompressConnectionLog(message.data.compressed); - } catch (error) { - logger.error("Failed to decompress connection log data:", error); - return; - } - - if (sessions.length === 0) { - return; - } - - // Convert to DB records and add to the buffer - for (const session of sessions) { - // Validate required fields - if ( - !session.sessionId || - !session.resourceId || - !session.sourceAddr || - !session.destAddr || - !session.protocol - ) { - logger.debug( - `Skipping connection log session with missing required fields: ${JSON.stringify(session)}` - ); - continue; - } - - const startedAt = toEpochSeconds(session.startedAt); - if (startedAt === null) { - logger.debug( - `Skipping connection log session with invalid startedAt: ${session.startedAt}` - ); - continue; - } - - buffer.push({ - sessionId: session.sessionId, - siteResourceId: session.resourceId, - orgId, - siteId: newt.siteId, - sourceAddr: session.sourceAddr, - destAddr: session.destAddr, - protocol: session.protocol, - startedAt, - endedAt: toEpochSeconds(session.endedAt), - bytesTx: session.bytesTx ?? null, - bytesRx: session.bytesRx ?? null - }); - } - - logger.debug( - `Buffered ${sessions.length} connection log session(s) from newt ${newt.newtId} (site ${newt.siteId})` - ); - - // If the buffer has grown large enough, trigger an immediate flush - if (buffer.length >= MAX_BUFFERED_RECORDS) { - // Fire and forget — errors are handled inside flushConnectionLogToDb - flushConnectionLogToDb().catch((error) => { - logger.error( - "Unexpected error during size-triggered connection log flush:", - error - ); - }); - } + return; }; From fe40ea58c14a5eae05efdbdd64f6460d12566b4d Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 23 Mar 2026 20:05:54 -0700 Subject: [PATCH 08/12] Source client info into schema --- server/db/pg/schema/privateSchema.ts | 6 ++ server/db/sqlite/schema/privateSchema.ts | 6 ++ .../newt/handleConnectionLogMessage.ts | 64 ++++++++++++++++++- 3 files changed, 73 insertions(+), 3 deletions(-) diff --git a/server/db/pg/schema/privateSchema.ts b/server/db/pg/schema/privateSchema.ts index 8fed6462a..8d4e663df 100644 --- a/server/db/pg/schema/privateSchema.ts +++ b/server/db/pg/schema/privateSchema.ts @@ -319,6 +319,12 @@ export const connectionAuditLog = pgTable( siteId: integer("siteId").references(() => sites.siteId, { onDelete: "cascade" }), + clientId: integer("clientId").references(() => clients.clientId, { + onDelete: "cascade" + }), + userId: text("userId").references(() => users.userId, { + onDelete: "cascade" + }), sourceAddr: text("sourceAddr").notNull(), destAddr: text("destAddr").notNull(), protocol: text("protocol").notNull(), diff --git a/server/db/sqlite/schema/privateSchema.ts b/server/db/sqlite/schema/privateSchema.ts index ecc386ea6..f58b5cd18 100644 --- a/server/db/sqlite/schema/privateSchema.ts +++ b/server/db/sqlite/schema/privateSchema.ts @@ -309,6 +309,12 @@ export const connectionAuditLog = sqliteTable( siteId: integer("siteId").references(() => sites.siteId, { onDelete: "cascade" }), + clientId: integer("clientId").references(() => clients.clientId, { + onDelete: "cascade" + }), + userId: text("userId").references(() => users.userId, { + onDelete: "cascade" + }), sourceAddr: text("sourceAddr").notNull(), destAddr: text("destAddr").notNull(), protocol: text("protocol").notNull(), diff --git a/server/private/routers/newt/handleConnectionLogMessage.ts b/server/private/routers/newt/handleConnectionLogMessage.ts index 7549ab37f..164c14488 100644 --- a/server/private/routers/newt/handleConnectionLogMessage.ts +++ b/server/private/routers/newt/handleConnectionLogMessage.ts @@ -1,7 +1,7 @@ import { db, logsDb } from "@server/db"; import { MessageHandler } from "@server/routers/ws"; -import { connectionAuditLog, sites, Newt } from "@server/db"; -import { and, eq, lt } from "drizzle-orm"; +import { connectionAuditLog, sites, Newt, clients, orgs } from "@server/db"; +import { and, eq, lt, inArray } from "drizzle-orm"; import logger from "@server/logger"; import { inflate } from "zlib"; import { promisify } from "util"; @@ -39,6 +39,8 @@ interface ConnectionLogRecord { siteResourceId: number; orgId: string; siteId: number; + clientId: number | null; + userId: string | null; sourceAddr: string; destAddr: string; protocol: string; @@ -243,8 +245,9 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { // Look up the org for this site const [site] = await db - .select({ orgId: sites.orgId }) + .select({ orgId: sites.orgId, orgSubnet: orgs.subnet }) .from(sites) + .innerJoin(orgs, eq(sites.orgId, orgs.orgId)) .where(eq(sites.siteId, newt.siteId)); if (!site) { @@ -256,6 +259,12 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { const orgId = site.orgId; + // Extract the CIDR suffix (e.g. "/16") from the org subnet so we can + // reconstruct the exact subnet string stored on each client record. + const cidrSuffix = site.orgSubnet?.includes("/") + ? site.orgSubnet.substring(site.orgSubnet.indexOf("/")) + : null; + let sessions: ConnectionSessionData[]; try { sessions = await decompressConnectionLog(message.data.compressed); @@ -268,6 +277,48 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { return; } + // Build a map from sourceAddr → { clientId, userId } by querying clients + // whose subnet field matches exactly. Client subnets are stored with the + // org's CIDR suffix (e.g. "100.90.128.5/16"), so we reconstruct that from + // each unique sourceAddr + the org's CIDR suffix and do a targeted IN query. + const ipToClient = new Map(); + + if (cidrSuffix) { + // Collect unique source addresses so we only query for what we need + const uniqueSourceAddrs = new Set(); + for (const session of sessions) { + if (session.sourceAddr) { + uniqueSourceAddrs.add(session.sourceAddr); + } + } + + if (uniqueSourceAddrs.size > 0) { + // Construct the exact subnet strings as stored in the DB + const subnetQueries = Array.from(uniqueSourceAddrs).map( + (addr) => `${addr}${cidrSuffix}` + ); + + const matchedClients = await db + .select({ + clientId: clients.clientId, + userId: clients.userId, + subnet: clients.subnet + }) + .from(clients) + .where( + and( + eq(clients.orgId, orgId), + inArray(clients.subnet, subnetQueries) + ) + ); + + for (const c of matchedClients) { + const ip = c.subnet.split("/")[0]; + ipToClient.set(ip, { clientId: c.clientId, userId: c.userId }); + } + } + } + // Convert to DB records and add to the buffer for (const session of sessions) { // Validate required fields @@ -292,11 +343,18 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { continue; } + // Match the source address to a client. The sourceAddr is the + // client's IP on the WireGuard network, which corresponds to the IP + // portion of the client's subnet CIDR (e.g. "100.90.128.5/24"). + const clientInfo = ipToClient.get(session.sourceAddr) ?? null; + buffer.push({ sessionId: session.sessionId, siteResourceId: session.resourceId, orgId, siteId: newt.siteId, + clientId: clientInfo?.clientId ?? null, + userId: clientInfo?.userId ?? null, sourceAddr: session.sourceAddr, destAddr: session.destAddr, protocol: session.protocol, From 6471571bc66798a97cfd3a87fe54e509f8a81822 Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 23 Mar 2026 20:18:03 -0700 Subject: [PATCH 09/12] Add ui for connection logs --- messages/en-US.json | 6 + server/lib/billing/tierMatrix.ts | 2 + .../auditLogs/exportConnectionAuditLog.ts | 99 +++ server/private/routers/auditLogs/index.ts | 2 + .../auditLogs/queryConnectionAuditLog.ts | 378 +++++++++++ server/private/routers/external.ts | 19 + server/private/routers/integration.ts | 19 + server/routers/auditLogs/types.ts | 31 + .../[orgId]/settings/logs/connection/page.tsx | 630 ++++++++++++++++++ src/app/navigation.tsx | 6 + 10 files changed, 1192 insertions(+) create mode 100644 server/private/routers/auditLogs/exportConnectionAuditLog.ts create mode 100644 server/private/routers/auditLogs/queryConnectionAuditLog.ts create mode 100644 src/app/[orgId]/settings/logs/connection/page.tsx diff --git a/messages/en-US.json b/messages/en-US.json index 895ee1332..3be427ee0 100644 --- a/messages/en-US.json +++ b/messages/en-US.json @@ -2345,6 +2345,12 @@ "logRetentionEndOfFollowingYear": "End of following year", "actionLogsDescription": "View a history of actions performed in this organization", "accessLogsDescription": "View access auth requests for resources in this organization", + "connectionLogs": "Connection Logs", + "connectionLogsDescription": "View connection logs for tunnels in this organization", + "sidebarLogsConnection": "Connection Logs", + "sourceAddress": "Source Address", + "destinationAddress": "Destination Address", + "duration": "Duration", "licenseRequiredToUse": "An Enterprise Edition license or Pangolin Cloud is required to use this feature. Book a demo or POC trial.", "ossEnterpriseEditionRequired": "The Enterprise Edition is required to use this feature. This feature is also available in Pangolin Cloud. Book a demo or POC trial.", "certResolver": "Certificate Resolver", diff --git a/server/lib/billing/tierMatrix.ts b/server/lib/billing/tierMatrix.ts index c08bcea71..f8a0cd2f5 100644 --- a/server/lib/billing/tierMatrix.ts +++ b/server/lib/billing/tierMatrix.ts @@ -8,6 +8,7 @@ export enum TierFeature { LogExport = "logExport", AccessLogs = "accessLogs", // set the retention period to none on downgrade ActionLogs = "actionLogs", // set the retention period to none on downgrade + ConnectionLogs = "connectionLogs", RotateCredentials = "rotateCredentials", MaintencePage = "maintencePage", // handle downgrade DevicePosture = "devicePosture", @@ -26,6 +27,7 @@ export const tierMatrix: Record = { [TierFeature.LogExport]: ["tier3", "enterprise"], [TierFeature.AccessLogs]: ["tier2", "tier3", "enterprise"], [TierFeature.ActionLogs]: ["tier2", "tier3", "enterprise"], + [TierFeature.ConnectionLogs]: ["tier2", "tier3", "enterprise"], [TierFeature.RotateCredentials]: ["tier1", "tier2", "tier3", "enterprise"], [TierFeature.MaintencePage]: ["tier1", "tier2", "tier3", "enterprise"], [TierFeature.DevicePosture]: ["tier2", "tier3", "enterprise"], diff --git a/server/private/routers/auditLogs/exportConnectionAuditLog.ts b/server/private/routers/auditLogs/exportConnectionAuditLog.ts new file mode 100644 index 000000000..9349528ad --- /dev/null +++ b/server/private/routers/auditLogs/exportConnectionAuditLog.ts @@ -0,0 +1,99 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + +import { registry } from "@server/openApi"; +import { NextFunction } from "express"; +import { Request, Response } from "express"; +import { OpenAPITags } from "@server/openApi"; +import createHttpError from "http-errors"; +import HttpCode from "@server/types/HttpCode"; +import { fromError } from "zod-validation-error"; +import logger from "@server/logger"; +import { + queryConnectionAuditLogsParams, + queryConnectionAuditLogsQuery, + queryConnection, + countConnectionQuery +} from "./queryConnectionAuditLog"; +import { generateCSV } from "@server/routers/auditLogs/generateCSV"; +import { MAX_EXPORT_LIMIT } from "@server/routers/auditLogs"; + +registry.registerPath({ + method: "get", + path: "/org/{orgId}/logs/connection/export", + description: "Export the connection audit log for an organization as CSV", + tags: [OpenAPITags.Logs], + request: { + query: queryConnectionAuditLogsQuery, + params: queryConnectionAuditLogsParams + }, + responses: {} +}); + +export async function exportConnectionAuditLogs( + req: Request, + res: Response, + next: NextFunction +): Promise { + try { + const parsedQuery = queryConnectionAuditLogsQuery.safeParse(req.query); + if (!parsedQuery.success) { + return next( + createHttpError( + HttpCode.BAD_REQUEST, + fromError(parsedQuery.error) + ) + ); + } + + const parsedParams = queryConnectionAuditLogsParams.safeParse(req.params); + if (!parsedParams.success) { + return next( + createHttpError( + HttpCode.BAD_REQUEST, + fromError(parsedParams.error) + ) + ); + } + + const data = { ...parsedQuery.data, ...parsedParams.data }; + const [{ count }] = await countConnectionQuery(data); + if (count > MAX_EXPORT_LIMIT) { + return next( + createHttpError( + HttpCode.BAD_REQUEST, + `Export limit exceeded. Your selection contains ${count} rows, but the maximum is ${MAX_EXPORT_LIMIT} rows. Please select a shorter time range to reduce the data.` + ) + ); + } + + const baseQuery = queryConnection(data); + + const log = await baseQuery.limit(data.limit).offset(data.offset); + + const csvData = generateCSV(log); + + res.setHeader("Content-Type", "text/csv"); + res.setHeader( + "Content-Disposition", + `attachment; filename="connection-audit-logs-${data.orgId}-${Date.now()}.csv"` + ); + + return res.send(csvData); + } catch (error) { + logger.error(error); + return next( + createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred") + ); + } +} \ No newline at end of file diff --git a/server/private/routers/auditLogs/index.ts b/server/private/routers/auditLogs/index.ts index e1849a617..122455fea 100644 --- a/server/private/routers/auditLogs/index.ts +++ b/server/private/routers/auditLogs/index.ts @@ -15,3 +15,5 @@ export * from "./queryActionAuditLog"; export * from "./exportActionAuditLog"; export * from "./queryAccessAuditLog"; export * from "./exportAccessAuditLog"; +export * from "./queryConnectionAuditLog"; +export * from "./exportConnectionAuditLog"; diff --git a/server/private/routers/auditLogs/queryConnectionAuditLog.ts b/server/private/routers/auditLogs/queryConnectionAuditLog.ts new file mode 100644 index 000000000..f321444cd --- /dev/null +++ b/server/private/routers/auditLogs/queryConnectionAuditLog.ts @@ -0,0 +1,378 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + +import { + connectionAuditLog, + logsDb, + siteResources, + sites, + clients, + primaryDb +} from "@server/db"; +import { registry } from "@server/openApi"; +import { NextFunction } from "express"; +import { Request, Response } from "express"; +import { eq, gt, lt, and, count, desc, inArray } from "drizzle-orm"; +import { OpenAPITags } from "@server/openApi"; +import { z } from "zod"; +import createHttpError from "http-errors"; +import HttpCode from "@server/types/HttpCode"; +import { fromError } from "zod-validation-error"; +import { QueryConnectionAuditLogResponse } from "@server/routers/auditLogs/types"; +import response from "@server/lib/response"; +import logger from "@server/logger"; +import { getSevenDaysAgo } from "@app/lib/getSevenDaysAgo"; + +export const queryConnectionAuditLogsQuery = z.object({ + // iso string just validate its a parseable date + timeStart: z + .string() + .refine((val) => !isNaN(Date.parse(val)), { + error: "timeStart must be a valid ISO date string" + }) + .transform((val) => Math.floor(new Date(val).getTime() / 1000)) + .prefault(() => getSevenDaysAgo().toISOString()) + .openapi({ + type: "string", + format: "date-time", + description: + "Start time as ISO date string (defaults to 7 days ago)" + }), + timeEnd: z + .string() + .refine((val) => !isNaN(Date.parse(val)), { + error: "timeEnd must be a valid ISO date string" + }) + .transform((val) => Math.floor(new Date(val).getTime() / 1000)) + .optional() + .prefault(() => new Date().toISOString()) + .openapi({ + type: "string", + format: "date-time", + description: + "End time as ISO date string (defaults to current time)" + }), + protocol: z.string().optional(), + sourceAddr: z.string().optional(), + destAddr: z.string().optional(), + clientId: z + .string() + .optional() + .transform(Number) + .pipe(z.int().positive()) + .optional(), + siteId: z + .string() + .optional() + .transform(Number) + .pipe(z.int().positive()) + .optional(), + siteResourceId: z + .string() + .optional() + .transform(Number) + .pipe(z.int().positive()) + .optional(), + userId: z.string().optional(), + limit: z + .string() + .optional() + .default("1000") + .transform(Number) + .pipe(z.int().positive()), + offset: z + .string() + .optional() + .default("0") + .transform(Number) + .pipe(z.int().nonnegative()) +}); + +export const queryConnectionAuditLogsParams = z.object({ + orgId: z.string() +}); + +export const queryConnectionAuditLogsCombined = + queryConnectionAuditLogsQuery.merge(queryConnectionAuditLogsParams); +type Q = z.infer; + +function getWhere(data: Q) { + return and( + gt(connectionAuditLog.startedAt, data.timeStart), + lt(connectionAuditLog.startedAt, data.timeEnd), + eq(connectionAuditLog.orgId, data.orgId), + data.protocol + ? eq(connectionAuditLog.protocol, data.protocol) + : undefined, + data.sourceAddr + ? eq(connectionAuditLog.sourceAddr, data.sourceAddr) + : undefined, + data.destAddr + ? eq(connectionAuditLog.destAddr, data.destAddr) + : undefined, + data.clientId + ? eq(connectionAuditLog.clientId, data.clientId) + : undefined, + data.siteId + ? eq(connectionAuditLog.siteId, data.siteId) + : undefined, + data.siteResourceId + ? eq(connectionAuditLog.siteResourceId, data.siteResourceId) + : undefined, + data.userId + ? eq(connectionAuditLog.userId, data.userId) + : undefined + ); +} + +export function queryConnection(data: Q) { + return logsDb + .select({ + sessionId: connectionAuditLog.sessionId, + siteResourceId: connectionAuditLog.siteResourceId, + orgId: connectionAuditLog.orgId, + siteId: connectionAuditLog.siteId, + clientId: connectionAuditLog.clientId, + userId: connectionAuditLog.userId, + sourceAddr: connectionAuditLog.sourceAddr, + destAddr: connectionAuditLog.destAddr, + protocol: connectionAuditLog.protocol, + startedAt: connectionAuditLog.startedAt, + endedAt: connectionAuditLog.endedAt, + bytesTx: connectionAuditLog.bytesTx, + bytesRx: connectionAuditLog.bytesRx + }) + .from(connectionAuditLog) + .where(getWhere(data)) + .orderBy( + desc(connectionAuditLog.startedAt), + desc(connectionAuditLog.id) + ); +} + +export function countConnectionQuery(data: Q) { + const countQuery = logsDb + .select({ count: count() }) + .from(connectionAuditLog) + .where(getWhere(data)); + return countQuery; +} + +async function enrichWithDetails( + logs: Awaited> +) { + // Collect unique IDs from logs + const siteResourceIds = [ + ...new Set( + logs + .map((log) => log.siteResourceId) + .filter((id): id is number => id !== null && id !== undefined) + ) + ]; + const siteIds = [ + ...new Set( + logs + .map((log) => log.siteId) + .filter((id): id is number => id !== null && id !== undefined) + ) + ]; + const clientIds = [ + ...new Set( + logs + .map((log) => log.clientId) + .filter((id): id is number => id !== null && id !== undefined) + ) + ]; + + // Fetch resource details from main database + const resourceMap = new Map< + number, + { name: string; niceId: string } + >(); + if (siteResourceIds.length > 0) { + const resourceDetails = await primaryDb + .select({ + siteResourceId: siteResources.siteResourceId, + name: siteResources.name, + niceId: siteResources.niceId + }) + .from(siteResources) + .where(inArray(siteResources.siteResourceId, siteResourceIds)); + + for (const r of resourceDetails) { + resourceMap.set(r.siteResourceId, { + name: r.name, + niceId: r.niceId + }); + } + } + + // Fetch site details from main database + const siteMap = new Map(); + if (siteIds.length > 0) { + const siteDetails = await primaryDb + .select({ + siteId: sites.siteId, + name: sites.name, + niceId: sites.niceId + }) + .from(sites) + .where(inArray(sites.siteId, siteIds)); + + for (const s of siteDetails) { + siteMap.set(s.siteId, { name: s.name, niceId: s.niceId }); + } + } + + // Fetch client details from main database + const clientMap = new Map(); + if (clientIds.length > 0) { + const clientDetails = await primaryDb + .select({ + clientId: clients.clientId, + name: clients.name + }) + .from(clients) + .where(inArray(clients.clientId, clientIds)); + + for (const c of clientDetails) { + clientMap.set(c.clientId, { name: c.name }); + } + } + + // Enrich logs with details + return logs.map((log) => ({ + ...log, + resourceName: log.siteResourceId + ? resourceMap.get(log.siteResourceId)?.name ?? null + : null, + resourceNiceId: log.siteResourceId + ? resourceMap.get(log.siteResourceId)?.niceId ?? null + : null, + siteName: log.siteId + ? siteMap.get(log.siteId)?.name ?? null + : null, + siteNiceId: log.siteId + ? siteMap.get(log.siteId)?.niceId ?? null + : null, + clientName: log.clientId + ? clientMap.get(log.clientId)?.name ?? null + : null + })); +} + +async function queryUniqueFilterAttributes( + timeStart: number, + timeEnd: number, + orgId: string +) { + const baseConditions = and( + gt(connectionAuditLog.startedAt, timeStart), + lt(connectionAuditLog.startedAt, timeEnd), + eq(connectionAuditLog.orgId, orgId) + ); + + // Get unique protocols + const uniqueProtocols = await logsDb + .selectDistinct({ + protocol: connectionAuditLog.protocol + }) + .from(connectionAuditLog) + .where(baseConditions); + + return { + protocols: uniqueProtocols + .map((row) => row.protocol) + .filter((protocol): protocol is string => protocol !== null) + }; +} + +registry.registerPath({ + method: "get", + path: "/org/{orgId}/logs/connection", + description: "Query the connection audit log for an organization", + tags: [OpenAPITags.Logs], + request: { + query: queryConnectionAuditLogsQuery, + params: queryConnectionAuditLogsParams + }, + responses: {} +}); + +export async function queryConnectionAuditLogs( + req: Request, + res: Response, + next: NextFunction +): Promise { + try { + const parsedQuery = queryConnectionAuditLogsQuery.safeParse(req.query); + if (!parsedQuery.success) { + return next( + createHttpError( + HttpCode.BAD_REQUEST, + fromError(parsedQuery.error) + ) + ); + } + const parsedParams = queryConnectionAuditLogsParams.safeParse( + req.params + ); + if (!parsedParams.success) { + return next( + createHttpError( + HttpCode.BAD_REQUEST, + fromError(parsedParams.error) + ) + ); + } + + const data = { ...parsedQuery.data, ...parsedParams.data }; + + const baseQuery = queryConnection(data); + + const logsRaw = await baseQuery.limit(data.limit).offset(data.offset); + + // Enrich with resource, site, and client details + const log = await enrichWithDetails(logsRaw); + + const totalCountResult = await countConnectionQuery(data); + const totalCount = totalCountResult[0].count; + + const filterAttributes = await queryUniqueFilterAttributes( + data.timeStart, + data.timeEnd, + data.orgId + ); + + return response(res, { + data: { + log: log, + pagination: { + total: totalCount, + limit: data.limit, + offset: data.offset + }, + filterAttributes + }, + success: true, + error: false, + message: "Connection audit logs retrieved successfully", + status: HttpCode.OK + }); + } catch (error) { + logger.error(error); + return next( + createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred") + ); + } +} \ No newline at end of file diff --git a/server/private/routers/external.ts b/server/private/routers/external.ts index df8ea8cbb..f06ad4517 100644 --- a/server/private/routers/external.ts +++ b/server/private/routers/external.ts @@ -478,6 +478,25 @@ authenticated.get( logs.exportAccessAuditLogs ); +authenticated.get( + "/org/:orgId/logs/connection", + verifyValidLicense, + verifyValidSubscription(tierMatrix.connectionLogs), + verifyOrgAccess, + verifyUserHasAction(ActionsEnum.exportLogs), + logs.queryConnectionAuditLogs +); + +authenticated.get( + "/org/:orgId/logs/connection/export", + verifyValidLicense, + verifyValidSubscription(tierMatrix.logExport), + verifyOrgAccess, + verifyUserHasAction(ActionsEnum.exportLogs), + logActionAudit(ActionsEnum.exportLogs), + logs.exportConnectionAuditLogs +); + authenticated.post( "/re-key/:clientId/regenerate-client-secret", verifyClientAccess, // this is first to set the org id diff --git a/server/private/routers/integration.ts b/server/private/routers/integration.ts index 97b1adade..c17835025 100644 --- a/server/private/routers/integration.ts +++ b/server/private/routers/integration.ts @@ -91,6 +91,25 @@ authenticated.get( logs.exportAccessAuditLogs ); +authenticated.get( + "/org/:orgId/logs/connection", + verifyValidLicense, + verifyValidSubscription(tierMatrix.connectionLogs), + verifyApiKeyOrgAccess, + verifyApiKeyHasAction(ActionsEnum.exportLogs), + logs.queryConnectionAuditLogs +); + +authenticated.get( + "/org/:orgId/logs/connection/export", + verifyValidLicense, + verifyValidSubscription(tierMatrix.logExport), + verifyApiKeyOrgAccess, + verifyApiKeyHasAction(ActionsEnum.exportLogs), + logActionAudit(ActionsEnum.exportLogs), + logs.exportConnectionAuditLogs +); + authenticated.put( "/org/:orgId/idp/oidc", verifyValidLicense, diff --git a/server/routers/auditLogs/types.ts b/server/routers/auditLogs/types.ts index 474aa9261..20e11e17b 100644 --- a/server/routers/auditLogs/types.ts +++ b/server/routers/auditLogs/types.ts @@ -91,3 +91,34 @@ export type QueryAccessAuditLogResponse = { locations: string[]; }; }; + +export type QueryConnectionAuditLogResponse = { + log: { + sessionId: string; + siteResourceId: number | null; + orgId: string | null; + siteId: number | null; + clientId: number | null; + userId: string | null; + sourceAddr: string; + destAddr: string; + protocol: string; + startedAt: number; + endedAt: number | null; + bytesTx: number | null; + bytesRx: number | null; + resourceName: string | null; + resourceNiceId: string | null; + siteName: string | null; + siteNiceId: string | null; + clientName: string | null; + }[]; + pagination: { + total: number; + limit: number; + offset: number; + }; + filterAttributes: { + protocols: string[]; + }; +}; diff --git a/src/app/[orgId]/settings/logs/connection/page.tsx b/src/app/[orgId]/settings/logs/connection/page.tsx new file mode 100644 index 000000000..737b1efd7 --- /dev/null +++ b/src/app/[orgId]/settings/logs/connection/page.tsx @@ -0,0 +1,630 @@ +"use client"; +import { ColumnFilter } from "@app/components/ColumnFilter"; +import { DateTimeValue } from "@app/components/DateTimePicker"; +import { LogDataTable } from "@app/components/LogDataTable"; +import { PaidFeaturesAlert } from "@app/components/PaidFeaturesAlert"; +import SettingsSectionTitle from "@app/components/SettingsSectionTitle"; +import { useEnvContext } from "@app/hooks/useEnvContext"; +import { usePaidStatus } from "@app/hooks/usePaidStatus"; +import { useStoredPageSize } from "@app/hooks/useStoredPageSize"; +import { toast } from "@app/hooks/useToast"; +import { createApiClient } from "@app/lib/api"; +import { getSevenDaysAgo } from "@app/lib/getSevenDaysAgo"; +import { build } from "@server/build"; +import { tierMatrix } from "@server/lib/billing/tierMatrix"; +import { ColumnDef } from "@tanstack/react-table"; +import axios from "axios"; +import { Cable, Monitor, Server } from "lucide-react"; +import { useTranslations } from "next-intl"; +import { useParams, useRouter, useSearchParams } from "next/navigation"; +import { useEffect, useState, useTransition } from "react"; + +function formatBytes(bytes: number | null): string { + if (bytes === null || bytes === undefined) return "—"; + if (bytes === 0) return "0 B"; + const units = ["B", "KB", "MB", "GB", "TB"]; + const i = Math.floor(Math.log(bytes) / Math.log(1024)); + const value = bytes / Math.pow(1024, i); + return `${value.toFixed(i === 0 ? 0 : 1)} ${units[i]}`; +} + +function formatDuration(startedAt: number, endedAt: number | null): string { + if (endedAt === null || endedAt === undefined) return "Active"; + const durationSec = endedAt - startedAt; + if (durationSec < 0) return "—"; + if (durationSec < 60) return `${durationSec}s`; + if (durationSec < 3600) { + const m = Math.floor(durationSec / 60); + const s = durationSec % 60; + return `${m}m ${s}s`; + } + const h = Math.floor(durationSec / 3600); + const m = Math.floor((durationSec % 3600) / 60); + return `${h}h ${m}m`; +} + +export default function ConnectionLogsPage() { + const router = useRouter(); + const api = createApiClient(useEnvContext()); + const t = useTranslations(); + const { orgId } = useParams(); + const searchParams = useSearchParams(); + + const { isPaidUser } = usePaidStatus(); + + const [rows, setRows] = useState([]); + const [isRefreshing, setIsRefreshing] = useState(false); + const [isExporting, startTransition] = useTransition(); + const [filterAttributes, setFilterAttributes] = useState<{ + protocols: string[]; + }>({ + protocols: [] + }); + + // Filter states - unified object for all filters + const [filters, setFilters] = useState<{ + protocol?: string; + }>({ + protocol: searchParams.get("protocol") || undefined + }); + + // Pagination state + const [totalCount, setTotalCount] = useState(0); + const [currentPage, setCurrentPage] = useState(0); + const [isLoading, setIsLoading] = useState(false); + + // Initialize page size from storage or default + const [pageSize, setPageSize] = useStoredPageSize( + "connection-audit-logs", + 20 + ); + + // Set default date range to last 7 days + const getDefaultDateRange = () => { + // if the time is in the url params, use that instead + const startParam = searchParams.get("start"); + const endParam = searchParams.get("end"); + if (startParam && endParam) { + return { + startDate: { + date: new Date(startParam) + }, + endDate: { + date: new Date(endParam) + } + }; + } + + const now = new Date(); + const lastWeek = getSevenDaysAgo(); + + return { + startDate: { + date: lastWeek + }, + endDate: { + date: now + } + }; + }; + + const [dateRange, setDateRange] = useState<{ + startDate: DateTimeValue; + endDate: DateTimeValue; + }>(getDefaultDateRange()); + + // Trigger search with default values on component mount + useEffect(() => { + if (build === "oss") { + return; + } + const defaultRange = getDefaultDateRange(); + queryDateTime( + defaultRange.startDate, + defaultRange.endDate, + 0, + pageSize + ); + }, [orgId]); // Re-run if orgId changes + + const handleDateRangeChange = ( + startDate: DateTimeValue, + endDate: DateTimeValue + ) => { + setDateRange({ startDate, endDate }); + setCurrentPage(0); // Reset to first page when filtering + // put the search params in the url for the time + updateUrlParamsForAllFilters({ + start: startDate.date?.toISOString() || "", + end: endDate.date?.toISOString() || "" + }); + + queryDateTime(startDate, endDate, 0, pageSize); + }; + + // Handle page changes + const handlePageChange = (newPage: number) => { + setCurrentPage(newPage); + queryDateTime( + dateRange.startDate, + dateRange.endDate, + newPage, + pageSize + ); + }; + + // Handle page size changes + const handlePageSizeChange = (newPageSize: number) => { + setPageSize(newPageSize); + setCurrentPage(0); // Reset to first page when changing page size + queryDateTime(dateRange.startDate, dateRange.endDate, 0, newPageSize); + }; + + // Handle filter changes generically + const handleFilterChange = ( + filterType: keyof typeof filters, + value: string | undefined + ) => { + // Create new filters object with updated value + const newFilters = { + ...filters, + [filterType]: value + }; + + setFilters(newFilters); + setCurrentPage(0); // Reset to first page when filtering + + // Update URL params + updateUrlParamsForAllFilters(newFilters); + + // Trigger new query with updated filters (pass directly to avoid async state issues) + queryDateTime( + dateRange.startDate, + dateRange.endDate, + 0, + pageSize, + newFilters + ); + }; + + const updateUrlParamsForAllFilters = ( + newFilters: + | typeof filters + | { + start: string; + end: string; + } + ) => { + const params = new URLSearchParams(searchParams); + Object.entries(newFilters).forEach(([key, value]) => { + if (value) { + params.set(key, value); + } else { + params.delete(key); + } + }); + router.replace(`?${params.toString()}`, { scroll: false }); + }; + + const queryDateTime = async ( + startDate: DateTimeValue, + endDate: DateTimeValue, + page: number = currentPage, + size: number = pageSize, + filtersParam?: { + protocol?: string; + } + ) => { + console.log("Date range changed:", { startDate, endDate, page, size }); + if (!isPaidUser(tierMatrix.connectionLogs)) { + console.log( + "Access denied: subscription inactive or license locked" + ); + return; + } + setIsLoading(true); + + try { + // Use the provided filters or fall back to current state + const activeFilters = filtersParam || filters; + + // Convert the date/time values to API parameters + const params: any = { + limit: size, + offset: page * size, + ...activeFilters + }; + + if (startDate?.date) { + const startDateTime = new Date(startDate.date); + if (startDate.time) { + const [hours, minutes, seconds] = startDate.time + .split(":") + .map(Number); + startDateTime.setHours(hours, minutes, seconds || 0); + } + params.timeStart = startDateTime.toISOString(); + } + + if (endDate?.date) { + const endDateTime = new Date(endDate.date); + if (endDate.time) { + const [hours, minutes, seconds] = endDate.time + .split(":") + .map(Number); + endDateTime.setHours(hours, minutes, seconds || 0); + } else { + // If no time is specified, set to NOW + const now = new Date(); + endDateTime.setHours( + now.getHours(), + now.getMinutes(), + now.getSeconds(), + now.getMilliseconds() + ); + } + params.timeEnd = endDateTime.toISOString(); + } + + const res = await api.get(`/org/${orgId}/logs/connection`, { + params + }); + if (res.status === 200) { + setRows(res.data.data.log || []); + setTotalCount(res.data.data.pagination?.total || 0); + setFilterAttributes(res.data.data.filterAttributes); + console.log("Fetched connection logs:", res.data); + } + } catch (error) { + toast({ + title: t("error"), + description: t("Failed to filter logs"), + variant: "destructive" + }); + } finally { + setIsLoading(false); + } + }; + + const refreshData = async () => { + console.log("Data refreshed"); + setIsRefreshing(true); + try { + // Refresh data with current date range and pagination + await queryDateTime( + dateRange.startDate, + dateRange.endDate, + currentPage, + pageSize + ); + } catch (error) { + toast({ + title: t("error"), + description: t("refreshError"), + variant: "destructive" + }); + } finally { + setIsRefreshing(false); + } + }; + + const exportData = async () => { + try { + // Prepare query params for export + const params: any = { + timeStart: dateRange.startDate?.date + ? new Date(dateRange.startDate.date).toISOString() + : undefined, + timeEnd: dateRange.endDate?.date + ? new Date(dateRange.endDate.date).toISOString() + : undefined, + ...filters + }; + + const response = await api.get( + `/org/${orgId}/logs/connection/export`, + { + responseType: "blob", + params + } + ); + + // Create a URL for the blob and trigger a download + const url = window.URL.createObjectURL(new Blob([response.data])); + const link = document.createElement("a"); + link.href = url; + const epoch = Math.floor(Date.now() / 1000); + link.setAttribute( + "download", + `connection-audit-logs-${orgId}-${epoch}.csv` + ); + document.body.appendChild(link); + link.click(); + link.parentNode?.removeChild(link); + } catch (error) { + let apiErrorMessage: string | null = null; + if (axios.isAxiosError(error) && error.response) { + const data = error.response.data; + + if (data instanceof Blob && data.type === "application/json") { + // Parse the Blob as JSON + const text = await data.text(); + const errorData = JSON.parse(text); + apiErrorMessage = errorData.message; + } + } + toast({ + title: t("error"), + description: apiErrorMessage ?? t("exportError"), + variant: "destructive" + }); + } + }; + + const columns: ColumnDef[] = [ + { + accessorKey: "startedAt", + header: ({ column }) => { + return t("timestamp"); + }, + cell: ({ row }) => { + return ( +
+ {new Date( + row.original.startedAt * 1000 + ).toLocaleString()} +
+ ); + } + }, + { + accessorKey: "protocol", + header: ({ column }) => { + return ( +
+ {t("protocol")} + ({ + label: protocol.toUpperCase(), + value: protocol + }) + )} + selectedValue={filters.protocol} + onValueChange={(value) => + handleFilterChange("protocol", value) + } + searchPlaceholder="Search..." + emptyMessage="None found" + /> +
+ ); + }, + cell: ({ row }) => { + return ( + + {row.original.protocol?.toUpperCase()} + + ); + } + }, + { + accessorKey: "resourceName", + header: ({ column }) => { + return t("resource"); + }, + cell: ({ row }) => { + return ( + + {row.original.resourceName ?? "—"} + + ); + } + }, + { + accessorKey: "sourceAddr", + header: ({ column }) => { + return t("sourceAddress"); + }, + cell: ({ row }) => { + return ( + + {row.original.sourceAddr} + + ); + } + }, + { + accessorKey: "destAddr", + header: ({ column }) => { + return t("destinationAddress"); + }, + cell: ({ row }) => { + return ( + + {row.original.destAddr} + + ); + } + }, + { + accessorKey: "duration", + header: ({ column }) => { + return t("duration"); + }, + cell: ({ row }) => { + return ( + + {formatDuration( + row.original.startedAt, + row.original.endedAt + )} + + ); + } + } + ]; + + const renderExpandedRow = (row: any) => { + return ( +
+
+
+
+ + Connection Details +
+
+ Session ID:{" "} + + {row.sessionId ?? "—"} + +
+
+ Protocol:{" "} + {row.protocol?.toUpperCase() ?? "—"} +
+
+ Source:{" "} + + {row.sourceAddr ?? "—"} + +
+
+ Destination:{" "} + + {row.destAddr ?? "—"} + +
+
+ Started At:{" "} + {row.startedAt + ? new Date( + row.startedAt * 1000 + ).toLocaleString() + : "—"} +
+
+ Ended At:{" "} + {row.endedAt + ? new Date( + row.endedAt * 1000 + ).toLocaleString() + : "Active"} +
+
+ Duration:{" "} + {formatDuration(row.startedAt, row.endedAt)} +
+
+
+
+ + Resource & Site +
+
+ Resource:{" "} + {row.resourceName ?? "—"} + {row.resourceNiceId && ( + + ({row.resourceNiceId}) + + )} +
+
+ Site: {row.siteName ?? "—"} + {row.siteNiceId && ( + + ({row.siteNiceId}) + + )} +
+
+ Site ID: {row.siteId ?? "—"} +
+
+ Resource ID:{" "} + {row.siteResourceId ?? "—"} +
+
+
+
+ + Client & Transfer +
+
+ Client: {row.clientName ?? "—"} + {row.clientId && ( + + (ID: {row.clientId}) + + )} +
+
+ User ID: {row.userId ?? "—"} +
+
+ Bytes Sent (TX):{" "} + {formatBytes(row.bytesTx)} +
+
+ Bytes Received (RX):{" "} + {formatBytes(row.bytesRx)} +
+
+ Total Transfer:{" "} + {formatBytes( + (row.bytesTx ?? 0) + (row.bytesRx ?? 0) + )} +
+
+
+
+ ); + }; + + return ( + <> + + + + + startTransition(exportData)} + isExporting={isExporting} + onDateRangeChange={handleDateRangeChange} + dateRange={{ + start: dateRange.startDate, + end: dateRange.endDate + }} + defaultSort={{ + id: "startedAt", + desc: true + }} + // Server-side pagination props + totalCount={totalCount} + currentPage={currentPage} + pageSize={pageSize} + onPageChange={handlePageChange} + onPageSizeChange={handlePageSizeChange} + isLoading={isLoading} + // Row expansion props + expandable={true} + renderExpandedRow={renderExpandedRow} + disabled={ + !isPaidUser(tierMatrix.connectionLogs) || build === "oss" + } + /> + + ); +} \ No newline at end of file diff --git a/src/app/navigation.tsx b/src/app/navigation.tsx index 0066721db..0a09214e3 100644 --- a/src/app/navigation.tsx +++ b/src/app/navigation.tsx @@ -3,6 +3,7 @@ import { Env } from "@app/lib/types/env"; import { build } from "@server/build"; import { Building2, + Cable, ChartLine, Combine, CreditCard, @@ -189,6 +190,11 @@ export const orgNavSections = ( title: "sidebarLogsAction", href: "/{orgId}/settings/logs/action", icon: + }, + { + title: "sidebarLogsConnection", + href: "/{orgId}/settings/logs/connection", + icon: } ] : []) From 2c6e9507b55efdedcf88f77a054475b8dd9daa51 Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 23 Mar 2026 21:41:53 -0700 Subject: [PATCH 10/12] Connection log page working --- server/lib/ip.ts | 10 +- .../auditLogs/queryConnectionAuditLog.ts | 156 ++++++++++++++- .../newt/handleConnectionLogMessage.ts | 16 +- server/routers/auditLogs/types.ts | 16 ++ .../[orgId]/settings/logs/connection/page.tsx | 180 ++++++++++++++++-- 5 files changed, 349 insertions(+), 29 deletions(-) diff --git a/server/lib/ip.ts b/server/lib/ip.ts index 3a29b8661..7f829bcef 100644 --- a/server/lib/ip.ts +++ b/server/lib/ip.ts @@ -581,6 +581,7 @@ export type SubnetProxyTargetV2 = { max: number; protocol: "tcp" | "udp"; }[]; + resourceId?: number; }; export function generateSubnetProxyTargetV2( @@ -617,7 +618,8 @@ export function generateSubnetProxyTargetV2( sourcePrefixes: [], destPrefix: destination, portRange, - disableIcmp + disableIcmp, + resourceId: siteResource.siteResourceId, }; } @@ -628,7 +630,8 @@ export function generateSubnetProxyTargetV2( destPrefix: `${siteResource.aliasAddress}/32`, rewriteTo: destination, portRange, - disableIcmp + disableIcmp, + resourceId: siteResource.siteResourceId, }; } } else if (siteResource.mode == "cidr") { @@ -636,7 +639,8 @@ export function generateSubnetProxyTargetV2( sourcePrefixes: [], destPrefix: siteResource.destination, portRange, - disableIcmp + disableIcmp, + resourceId: siteResource.siteResourceId, }; } diff --git a/server/private/routers/auditLogs/queryConnectionAuditLog.ts b/server/private/routers/auditLogs/queryConnectionAuditLog.ts index f321444cd..b638ed488 100644 --- a/server/private/routers/auditLogs/queryConnectionAuditLog.ts +++ b/server/private/routers/auditLogs/queryConnectionAuditLog.ts @@ -17,6 +17,7 @@ import { siteResources, sites, clients, + users, primaryDb } from "@server/db"; import { registry } from "@server/openApi"; @@ -193,6 +194,13 @@ async function enrichWithDetails( .filter((id): id is number => id !== null && id !== undefined) ) ]; + const userIds = [ + ...new Set( + logs + .map((log) => log.userId) + .filter((id): id is string => id !== null && id !== undefined) + ) + ]; // Fetch resource details from main database const resourceMap = new Map< @@ -235,18 +243,46 @@ async function enrichWithDetails( } // Fetch client details from main database - const clientMap = new Map(); + const clientMap = new Map< + number, + { name: string; niceId: string; type: string } + >(); if (clientIds.length > 0) { const clientDetails = await primaryDb .select({ clientId: clients.clientId, - name: clients.name + name: clients.name, + niceId: clients.niceId, + type: clients.type }) .from(clients) .where(inArray(clients.clientId, clientIds)); for (const c of clientDetails) { - clientMap.set(c.clientId, { name: c.name }); + clientMap.set(c.clientId, { + name: c.name, + niceId: c.niceId, + type: c.type + }); + } + } + + // Fetch user details from main database + const userMap = new Map< + string, + { email: string | null } + >(); + if (userIds.length > 0) { + const userDetails = await primaryDb + .select({ + userId: users.userId, + email: users.email + }) + .from(users) + .where(inArray(users.userId, userIds)); + + for (const u of userDetails) { + userMap.set(u.userId, { email: u.email }); } } @@ -267,6 +303,15 @@ async function enrichWithDetails( : null, clientName: log.clientId ? clientMap.get(log.clientId)?.name ?? null + : null, + clientNiceId: log.clientId + ? clientMap.get(log.clientId)?.niceId ?? null + : null, + clientType: log.clientId + ? clientMap.get(log.clientId)?.type ?? null + : null, + userEmail: log.userId + ? userMap.get(log.userId)?.email ?? null : null })); } @@ -290,10 +335,111 @@ async function queryUniqueFilterAttributes( .from(connectionAuditLog) .where(baseConditions); + // Get unique destination addresses + const uniqueDestAddrs = await logsDb + .selectDistinct({ + destAddr: connectionAuditLog.destAddr + }) + .from(connectionAuditLog) + .where(baseConditions); + + // Get unique client IDs + const uniqueClients = await logsDb + .selectDistinct({ + clientId: connectionAuditLog.clientId + }) + .from(connectionAuditLog) + .where(baseConditions); + + // Get unique resource IDs + const uniqueResources = await logsDb + .selectDistinct({ + siteResourceId: connectionAuditLog.siteResourceId + }) + .from(connectionAuditLog) + .where(baseConditions); + + // Get unique user IDs + const uniqueUsers = await logsDb + .selectDistinct({ + userId: connectionAuditLog.userId + }) + .from(connectionAuditLog) + .where(baseConditions); + + // Enrich client IDs with names from main database + const clientIds = uniqueClients + .map((row) => row.clientId) + .filter((id): id is number => id !== null); + + let clientsWithNames: Array<{ id: number; name: string }> = []; + if (clientIds.length > 0) { + const clientDetails = await primaryDb + .select({ + clientId: clients.clientId, + name: clients.name + }) + .from(clients) + .where(inArray(clients.clientId, clientIds)); + + clientsWithNames = clientDetails.map((c) => ({ + id: c.clientId, + name: c.name + })); + } + + // Enrich resource IDs with names from main database + const resourceIds = uniqueResources + .map((row) => row.siteResourceId) + .filter((id): id is number => id !== null); + + let resourcesWithNames: Array<{ id: number; name: string | null }> = []; + if (resourceIds.length > 0) { + const resourceDetails = await primaryDb + .select({ + siteResourceId: siteResources.siteResourceId, + name: siteResources.name + }) + .from(siteResources) + .where(inArray(siteResources.siteResourceId, resourceIds)); + + resourcesWithNames = resourceDetails.map((r) => ({ + id: r.siteResourceId, + name: r.name + })); + } + + // Enrich user IDs with emails from main database + const userIdsList = uniqueUsers + .map((row) => row.userId) + .filter((id): id is string => id !== null); + + let usersWithEmails: Array<{ id: string; email: string | null }> = []; + if (userIdsList.length > 0) { + const userDetails = await primaryDb + .select({ + userId: users.userId, + email: users.email + }) + .from(users) + .where(inArray(users.userId, userIdsList)); + + usersWithEmails = userDetails.map((u) => ({ + id: u.userId, + email: u.email + })); + } + return { protocols: uniqueProtocols .map((row) => row.protocol) - .filter((protocol): protocol is string => protocol !== null) + .filter((protocol): protocol is string => protocol !== null), + destAddrs: uniqueDestAddrs + .map((row) => row.destAddr) + .filter((addr): addr is string => addr !== null), + clients: clientsWithNames, + resources: resourcesWithNames, + users: usersWithEmails }; } @@ -342,7 +488,7 @@ export async function queryConnectionAuditLogs( const logsRaw = await baseQuery.limit(data.limit).offset(data.offset); - // Enrich with resource, site, and client details + // Enrich with resource, site, client, and user details const log = await enrichWithDetails(logsRaw); const totalCountResult = await countConnectionQuery(data); diff --git a/server/private/routers/newt/handleConnectionLogMessage.ts b/server/private/routers/newt/handleConnectionLogMessage.ts index 164c14488..2ac7153b5 100644 --- a/server/private/routers/newt/handleConnectionLogMessage.ts +++ b/server/private/routers/newt/handleConnectionLogMessage.ts @@ -277,6 +277,8 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { return; } + logger.debug(`Sessions: ${JSON.stringify(sessions)}`) + // Build a map from sourceAddr → { clientId, userId } by querying clients // whose subnet field matches exactly. Client subnets are stored with the // org's CIDR suffix (e.g. "100.90.128.5/16"), so we reconstruct that from @@ -295,9 +297,15 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { if (uniqueSourceAddrs.size > 0) { // Construct the exact subnet strings as stored in the DB const subnetQueries = Array.from(uniqueSourceAddrs).map( - (addr) => `${addr}${cidrSuffix}` + (addr) => { + // Strip port if present (e.g. "100.90.128.1:38004" → "100.90.128.1") + const ip = addr.includes(":") ? addr.split(":")[0] : addr; + return `${ip}${cidrSuffix}`; + } ); + logger.debug(`Subnet queries: ${JSON.stringify(subnetQueries)}`); + const matchedClients = await db .select({ clientId: clients.clientId, @@ -314,6 +322,7 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { for (const c of matchedClients) { const ip = c.subnet.split("/")[0]; + logger.debug(`Client ${c.clientId} subnet ${c.subnet} matches ${ip}`); ipToClient.set(ip, { clientId: c.clientId, userId: c.userId }); } } @@ -346,7 +355,10 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { // Match the source address to a client. The sourceAddr is the // client's IP on the WireGuard network, which corresponds to the IP // portion of the client's subnet CIDR (e.g. "100.90.128.5/24"). - const clientInfo = ipToClient.get(session.sourceAddr) ?? null; + // Strip port if present (e.g. "100.90.128.1:38004" → "100.90.128.1") + const sourceIp = session.sourceAddr.includes(":") ? session.sourceAddr.split(":")[0] : session.sourceAddr; + const clientInfo = ipToClient.get(sourceIp) ?? null; + buffer.push({ sessionId: session.sessionId, diff --git a/server/routers/auditLogs/types.ts b/server/routers/auditLogs/types.ts index 20e11e17b..4c278cba5 100644 --- a/server/routers/auditLogs/types.ts +++ b/server/routers/auditLogs/types.ts @@ -112,6 +112,9 @@ export type QueryConnectionAuditLogResponse = { siteName: string | null; siteNiceId: string | null; clientName: string | null; + clientNiceId: string | null; + clientType: string | null; + userEmail: string | null; }[]; pagination: { total: number; @@ -120,5 +123,18 @@ export type QueryConnectionAuditLogResponse = { }; filterAttributes: { protocols: string[]; + destAddrs: string[]; + clients: { + id: number; + name: string; + }[]; + resources: { + id: number; + name: string | null; + }[]; + users: { + id: string; + email: string | null; + }[]; }; }; diff --git a/src/app/[orgId]/settings/logs/connection/page.tsx b/src/app/[orgId]/settings/logs/connection/page.tsx index 737b1efd7..7ac137467 100644 --- a/src/app/[orgId]/settings/logs/connection/page.tsx +++ b/src/app/[orgId]/settings/logs/connection/page.tsx @@ -1,4 +1,5 @@ "use client"; +import { Button } from "@app/components/ui/button"; import { ColumnFilter } from "@app/components/ColumnFilter"; import { DateTimeValue } from "@app/components/DateTimePicker"; import { LogDataTable } from "@app/components/LogDataTable"; @@ -14,7 +15,8 @@ import { build } from "@server/build"; import { tierMatrix } from "@server/lib/billing/tierMatrix"; import { ColumnDef } from "@tanstack/react-table"; import axios from "axios"; -import { Cable, Monitor, Server } from "lucide-react"; +import { ArrowUpRight, Laptop, User } from "lucide-react"; +import Link from "next/link"; import { useTranslations } from "next-intl"; import { useParams, useRouter, useSearchParams } from "next/navigation"; import { useEffect, useState, useTransition } from "react"; @@ -57,15 +59,31 @@ export default function ConnectionLogsPage() { const [isExporting, startTransition] = useTransition(); const [filterAttributes, setFilterAttributes] = useState<{ protocols: string[]; + destAddrs: string[]; + clients: { id: number; name: string }[]; + resources: { id: number; name: string | null }[]; + users: { id: string; email: string | null }[]; }>({ - protocols: [] + protocols: [], + destAddrs: [], + clients: [], + resources: [], + users: [] }); // Filter states - unified object for all filters const [filters, setFilters] = useState<{ protocol?: string; + destAddr?: string; + clientId?: string; + siteResourceId?: string; + userId?: string; }>({ - protocol: searchParams.get("protocol") || undefined + protocol: searchParams.get("protocol") || undefined, + destAddr: searchParams.get("destAddr") || undefined, + clientId: searchParams.get("clientId") || undefined, + siteResourceId: searchParams.get("siteResourceId") || undefined, + userId: searchParams.get("userId") || undefined }); // Pagination state @@ -211,9 +229,7 @@ export default function ConnectionLogsPage() { endDate: DateTimeValue, page: number = currentPage, size: number = pageSize, - filtersParam?: { - protocol?: string; - } + filtersParam?: typeof filters ) => { console.log("Date range changed:", { startDate, endDate, page, size }); if (!isPaidUser(tierMatrix.connectionLogs)) { @@ -411,9 +427,41 @@ export default function ConnectionLogsPage() { { accessorKey: "resourceName", header: ({ column }) => { - return t("resource"); + return ( +
+ {t("resource")} + ({ + value: res.id.toString(), + label: res.name || "Unnamed Resource" + }))} + selectedValue={filters.siteResourceId} + onValueChange={(value) => + handleFilterChange("siteResourceId", value) + } + searchPlaceholder="Search..." + emptyMessage="None found" + /> +
+ ); }, cell: ({ row }) => { + if (row.original.resourceName && row.original.resourceNiceId) { + return ( + + + + ); + } return ( {row.original.resourceName ?? "—"} @@ -421,6 +469,86 @@ export default function ConnectionLogsPage() { ); } }, + { + accessorKey: "clientName", + header: ({ column }) => { + return ( +
+ {t("client")} + ({ + value: c.id.toString(), + label: c.name + }))} + selectedValue={filters.clientId} + onValueChange={(value) => + handleFilterChange("clientId", value) + } + searchPlaceholder="Search..." + emptyMessage="None found" + /> +
+ ); + }, + cell: ({ row }) => { + const clientType = row.original.clientType === "olm" ? "machine" : "user"; + if (row.original.clientName && row.original.clientNiceId) { + return ( + + + + ); + } + return ( + + {row.original.clientName ?? "—"} + + ); + } + }, + { + accessorKey: "userEmail", + header: ({ column }) => { + return ( +
+ {t("user")} + ({ + value: u.id, + label: u.email || u.id + }))} + selectedValue={filters.userId} + onValueChange={(value) => + handleFilterChange("userId", value) + } + searchPlaceholder="Search..." + emptyMessage="None found" + /> +
+ ); + }, + cell: ({ row }) => { + if (row.original.userEmail || row.original.userId) { + return ( + + + {row.original.userEmail ?? row.original.userId} + + ); + } + return ; + } + }, { accessorKey: "sourceAddr", header: ({ column }) => { @@ -437,7 +565,23 @@ export default function ConnectionLogsPage() { { accessorKey: "destAddr", header: ({ column }) => { - return t("destinationAddress"); + return ( +
+ {t("destinationAddress")} + ({ + value: addr, + label: addr + }))} + selectedValue={filters.destAddr} + onValueChange={(value) => + handleFilterChange("destAddr", value) + } + searchPlaceholder="Search..." + emptyMessage="None found" + /> +
+ ); }, cell: ({ row }) => { return ( @@ -470,10 +614,9 @@ export default function ConnectionLogsPage() {
-
- + {/*
Connection Details -
+
*/}
Session ID:{" "} @@ -518,10 +661,9 @@ export default function ConnectionLogsPage() {
-
- + {/*
Resource & Site -
+
*/}
Resource:{" "} {row.resourceName ?? "—"} @@ -548,10 +690,9 @@ export default function ConnectionLogsPage() {
-
- + {/*
Client & Transfer -
+
*/}
Client: {row.clientName ?? "—"} {row.clientId && ( @@ -561,7 +702,8 @@ export default function ConnectionLogsPage() { )}
- User ID: {row.userId ?? "—"} + User:{" "} + {row.userEmail ?? row.userId ?? "—"}
Bytes Sent (TX):{" "} @@ -627,4 +769,4 @@ export default function ConnectionLogsPage() { /> ); -} \ No newline at end of file +} From f9bff5954f190ce697b9d4ed6ed3b3992343e854 Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 23 Mar 2026 21:49:22 -0700 Subject: [PATCH 11/12] Add filters and refine table and query --- .../[orgId]/settings/logs/connection/page.tsx | 78 ++++++++----------- 1 file changed, 33 insertions(+), 45 deletions(-) diff --git a/src/app/[orgId]/settings/logs/connection/page.tsx b/src/app/[orgId]/settings/logs/connection/page.tsx index 7ac137467..1d93e0658 100644 --- a/src/app/[orgId]/settings/logs/connection/page.tsx +++ b/src/app/[orgId]/settings/logs/connection/page.tsx @@ -639,6 +639,31 @@ export default function ConnectionLogsPage() { {row.destAddr ?? "—"}
+
+
+ {/*
+ Resource & Site +
*/} + {/*
+ Resource:{" "} + {row.resourceName ?? "—"} + {row.resourceNiceId && ( + + ({row.resourceNiceId}) + + )} +
*/} +
+ Site: {row.siteName ?? "—"} + {row.siteNiceId && ( + + ({row.siteNiceId}) + + )} +
+
+ Site ID: {row.siteId ?? "—"} +
Started At:{" "} {row.startedAt @@ -659,66 +684,29 @@ export default function ConnectionLogsPage() { Duration:{" "} {formatDuration(row.startedAt, row.endedAt)}
-
-
- {/*
- Resource & Site -
*/} -
- Resource:{" "} - {row.resourceName ?? "—"} - {row.resourceNiceId && ( - - ({row.resourceNiceId}) - - )} -
-
- Site: {row.siteName ?? "—"} - {row.siteNiceId && ( - - ({row.siteNiceId}) - - )} -
-
- Site ID: {row.siteId ?? "—"} -
-
+ {/*
Resource ID:{" "} {row.siteResourceId ?? "—"} -
+
*/}
{/*
Client & Transfer
*/} -
- Client: {row.clientName ?? "—"} - {row.clientId && ( - - (ID: {row.clientId}) - - )} -
-
- User:{" "} - {row.userEmail ?? row.userId ?? "—"} -
-
+ {/*
Bytes Sent (TX):{" "} {formatBytes(row.bytesTx)} -
-
+
*/} + {/*
Bytes Received (RX):{" "} {formatBytes(row.bytesRx)} -
-
+
*/} + {/*
Total Transfer:{" "} {formatBytes( (row.bytesTx ?? 0) + (row.bytesRx ?? 0) )} -
+
*/}
From 7b78b914493cb1bf2f5066a05296b787f1dd7517 Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 23 Mar 2026 22:00:53 -0700 Subject: [PATCH 12/12] Fix resource link --- src/app/[orgId]/settings/logs/connection/page.tsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/app/[orgId]/settings/logs/connection/page.tsx b/src/app/[orgId]/settings/logs/connection/page.tsx index 1d93e0658..dff42faac 100644 --- a/src/app/[orgId]/settings/logs/connection/page.tsx +++ b/src/app/[orgId]/settings/logs/connection/page.tsx @@ -449,7 +449,7 @@ export default function ConnectionLogsPage() { if (row.original.resourceName && row.original.resourceNiceId) { return (