From fe40ea58c14a5eae05efdbdd64f6460d12566b4d Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 23 Mar 2026 20:05:54 -0700 Subject: [PATCH] Source client info into schema --- server/db/pg/schema/privateSchema.ts | 6 ++ server/db/sqlite/schema/privateSchema.ts | 6 ++ .../newt/handleConnectionLogMessage.ts | 64 ++++++++++++++++++- 3 files changed, 73 insertions(+), 3 deletions(-) diff --git a/server/db/pg/schema/privateSchema.ts b/server/db/pg/schema/privateSchema.ts index 8fed6462a..8d4e663df 100644 --- a/server/db/pg/schema/privateSchema.ts +++ b/server/db/pg/schema/privateSchema.ts @@ -319,6 +319,12 @@ export const connectionAuditLog = pgTable( siteId: integer("siteId").references(() => sites.siteId, { onDelete: "cascade" }), + clientId: integer("clientId").references(() => clients.clientId, { + onDelete: "cascade" + }), + userId: text("userId").references(() => users.userId, { + onDelete: "cascade" + }), sourceAddr: text("sourceAddr").notNull(), destAddr: text("destAddr").notNull(), protocol: text("protocol").notNull(), diff --git a/server/db/sqlite/schema/privateSchema.ts b/server/db/sqlite/schema/privateSchema.ts index ecc386ea6..f58b5cd18 100644 --- a/server/db/sqlite/schema/privateSchema.ts +++ b/server/db/sqlite/schema/privateSchema.ts @@ -309,6 +309,12 @@ export const connectionAuditLog = sqliteTable( siteId: integer("siteId").references(() => sites.siteId, { onDelete: "cascade" }), + clientId: integer("clientId").references(() => clients.clientId, { + onDelete: "cascade" + }), + userId: text("userId").references(() => users.userId, { + onDelete: "cascade" + }), sourceAddr: text("sourceAddr").notNull(), destAddr: text("destAddr").notNull(), protocol: text("protocol").notNull(), diff --git a/server/private/routers/newt/handleConnectionLogMessage.ts b/server/private/routers/newt/handleConnectionLogMessage.ts index 7549ab37f..164c14488 100644 --- a/server/private/routers/newt/handleConnectionLogMessage.ts +++ b/server/private/routers/newt/handleConnectionLogMessage.ts @@ -1,7 +1,7 @@ import { db, logsDb } from "@server/db"; import { MessageHandler } from "@server/routers/ws"; -import { connectionAuditLog, sites, Newt } from "@server/db"; -import { and, eq, lt } from "drizzle-orm"; +import { connectionAuditLog, sites, Newt, clients, orgs } from "@server/db"; +import { and, eq, lt, inArray } from "drizzle-orm"; import logger from "@server/logger"; import { inflate } from "zlib"; import { promisify } from "util"; @@ -39,6 +39,8 @@ interface ConnectionLogRecord { siteResourceId: number; orgId: string; siteId: number; + clientId: number | null; + userId: string | null; sourceAddr: string; destAddr: string; protocol: string; @@ -243,8 +245,9 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { // Look up the org for this site const [site] = await db - .select({ orgId: sites.orgId }) + .select({ orgId: sites.orgId, orgSubnet: orgs.subnet }) .from(sites) + .innerJoin(orgs, eq(sites.orgId, orgs.orgId)) .where(eq(sites.siteId, newt.siteId)); if (!site) { @@ -256,6 +259,12 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { const orgId = site.orgId; + // Extract the CIDR suffix (e.g. "/16") from the org subnet so we can + // reconstruct the exact subnet string stored on each client record. + const cidrSuffix = site.orgSubnet?.includes("/") + ? site.orgSubnet.substring(site.orgSubnet.indexOf("/")) + : null; + let sessions: ConnectionSessionData[]; try { sessions = await decompressConnectionLog(message.data.compressed); @@ -268,6 +277,48 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { return; } + // Build a map from sourceAddr → { clientId, userId } by querying clients + // whose subnet field matches exactly. Client subnets are stored with the + // org's CIDR suffix (e.g. "100.90.128.5/16"), so we reconstruct that from + // each unique sourceAddr + the org's CIDR suffix and do a targeted IN query. + const ipToClient = new Map(); + + if (cidrSuffix) { + // Collect unique source addresses so we only query for what we need + const uniqueSourceAddrs = new Set(); + for (const session of sessions) { + if (session.sourceAddr) { + uniqueSourceAddrs.add(session.sourceAddr); + } + } + + if (uniqueSourceAddrs.size > 0) { + // Construct the exact subnet strings as stored in the DB + const subnetQueries = Array.from(uniqueSourceAddrs).map( + (addr) => `${addr}${cidrSuffix}` + ); + + const matchedClients = await db + .select({ + clientId: clients.clientId, + userId: clients.userId, + subnet: clients.subnet + }) + .from(clients) + .where( + and( + eq(clients.orgId, orgId), + inArray(clients.subnet, subnetQueries) + ) + ); + + for (const c of matchedClients) { + const ip = c.subnet.split("/")[0]; + ipToClient.set(ip, { clientId: c.clientId, userId: c.userId }); + } + } + } + // Convert to DB records and add to the buffer for (const session of sessions) { // Validate required fields @@ -292,11 +343,18 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => { continue; } + // Match the source address to a client. The sourceAddr is the + // client's IP on the WireGuard network, which corresponds to the IP + // portion of the client's subnet CIDR (e.g. "100.90.128.5/24"). + const clientInfo = ipToClient.get(session.sourceAddr) ?? null; + buffer.push({ sessionId: session.sessionId, siteResourceId: session.resourceId, orgId, siteId: newt.siteId, + clientId: clientInfo?.clientId ?? null, + userId: clientInfo?.userId ?? null, sourceAddr: session.sourceAddr, destAddr: session.destAddr, protocol: session.protocol,