mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-25 12:06:37 +00:00
Source client info into schema
This commit is contained in:
@@ -319,6 +319,12 @@ export const connectionAuditLog = pgTable(
|
|||||||
siteId: integer("siteId").references(() => sites.siteId, {
|
siteId: integer("siteId").references(() => sites.siteId, {
|
||||||
onDelete: "cascade"
|
onDelete: "cascade"
|
||||||
}),
|
}),
|
||||||
|
clientId: integer("clientId").references(() => clients.clientId, {
|
||||||
|
onDelete: "cascade"
|
||||||
|
}),
|
||||||
|
userId: text("userId").references(() => users.userId, {
|
||||||
|
onDelete: "cascade"
|
||||||
|
}),
|
||||||
sourceAddr: text("sourceAddr").notNull(),
|
sourceAddr: text("sourceAddr").notNull(),
|
||||||
destAddr: text("destAddr").notNull(),
|
destAddr: text("destAddr").notNull(),
|
||||||
protocol: text("protocol").notNull(),
|
protocol: text("protocol").notNull(),
|
||||||
|
|||||||
@@ -309,6 +309,12 @@ export const connectionAuditLog = sqliteTable(
|
|||||||
siteId: integer("siteId").references(() => sites.siteId, {
|
siteId: integer("siteId").references(() => sites.siteId, {
|
||||||
onDelete: "cascade"
|
onDelete: "cascade"
|
||||||
}),
|
}),
|
||||||
|
clientId: integer("clientId").references(() => clients.clientId, {
|
||||||
|
onDelete: "cascade"
|
||||||
|
}),
|
||||||
|
userId: text("userId").references(() => users.userId, {
|
||||||
|
onDelete: "cascade"
|
||||||
|
}),
|
||||||
sourceAddr: text("sourceAddr").notNull(),
|
sourceAddr: text("sourceAddr").notNull(),
|
||||||
destAddr: text("destAddr").notNull(),
|
destAddr: text("destAddr").notNull(),
|
||||||
protocol: text("protocol").notNull(),
|
protocol: text("protocol").notNull(),
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import { db, logsDb } from "@server/db";
|
import { db, logsDb } from "@server/db";
|
||||||
import { MessageHandler } from "@server/routers/ws";
|
import { MessageHandler } from "@server/routers/ws";
|
||||||
import { connectionAuditLog, sites, Newt } from "@server/db";
|
import { connectionAuditLog, sites, Newt, clients, orgs } from "@server/db";
|
||||||
import { and, eq, lt } from "drizzle-orm";
|
import { and, eq, lt, inArray } from "drizzle-orm";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { inflate } from "zlib";
|
import { inflate } from "zlib";
|
||||||
import { promisify } from "util";
|
import { promisify } from "util";
|
||||||
@@ -39,6 +39,8 @@ interface ConnectionLogRecord {
|
|||||||
siteResourceId: number;
|
siteResourceId: number;
|
||||||
orgId: string;
|
orgId: string;
|
||||||
siteId: number;
|
siteId: number;
|
||||||
|
clientId: number | null;
|
||||||
|
userId: string | null;
|
||||||
sourceAddr: string;
|
sourceAddr: string;
|
||||||
destAddr: string;
|
destAddr: string;
|
||||||
protocol: string;
|
protocol: string;
|
||||||
@@ -243,8 +245,9 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => {
|
|||||||
|
|
||||||
// Look up the org for this site
|
// Look up the org for this site
|
||||||
const [site] = await db
|
const [site] = await db
|
||||||
.select({ orgId: sites.orgId })
|
.select({ orgId: sites.orgId, orgSubnet: orgs.subnet })
|
||||||
.from(sites)
|
.from(sites)
|
||||||
|
.innerJoin(orgs, eq(sites.orgId, orgs.orgId))
|
||||||
.where(eq(sites.siteId, newt.siteId));
|
.where(eq(sites.siteId, newt.siteId));
|
||||||
|
|
||||||
if (!site) {
|
if (!site) {
|
||||||
@@ -256,6 +259,12 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => {
|
|||||||
|
|
||||||
const orgId = site.orgId;
|
const orgId = site.orgId;
|
||||||
|
|
||||||
|
// Extract the CIDR suffix (e.g. "/16") from the org subnet so we can
|
||||||
|
// reconstruct the exact subnet string stored on each client record.
|
||||||
|
const cidrSuffix = site.orgSubnet?.includes("/")
|
||||||
|
? site.orgSubnet.substring(site.orgSubnet.indexOf("/"))
|
||||||
|
: null;
|
||||||
|
|
||||||
let sessions: ConnectionSessionData[];
|
let sessions: ConnectionSessionData[];
|
||||||
try {
|
try {
|
||||||
sessions = await decompressConnectionLog(message.data.compressed);
|
sessions = await decompressConnectionLog(message.data.compressed);
|
||||||
@@ -268,6 +277,48 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Build a map from sourceAddr → { clientId, userId } by querying clients
|
||||||
|
// whose subnet field matches exactly. Client subnets are stored with the
|
||||||
|
// org's CIDR suffix (e.g. "100.90.128.5/16"), so we reconstruct that from
|
||||||
|
// each unique sourceAddr + the org's CIDR suffix and do a targeted IN query.
|
||||||
|
const ipToClient = new Map<string, { clientId: number; userId: string | null }>();
|
||||||
|
|
||||||
|
if (cidrSuffix) {
|
||||||
|
// Collect unique source addresses so we only query for what we need
|
||||||
|
const uniqueSourceAddrs = new Set<string>();
|
||||||
|
for (const session of sessions) {
|
||||||
|
if (session.sourceAddr) {
|
||||||
|
uniqueSourceAddrs.add(session.sourceAddr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (uniqueSourceAddrs.size > 0) {
|
||||||
|
// Construct the exact subnet strings as stored in the DB
|
||||||
|
const subnetQueries = Array.from(uniqueSourceAddrs).map(
|
||||||
|
(addr) => `${addr}${cidrSuffix}`
|
||||||
|
);
|
||||||
|
|
||||||
|
const matchedClients = await db
|
||||||
|
.select({
|
||||||
|
clientId: clients.clientId,
|
||||||
|
userId: clients.userId,
|
||||||
|
subnet: clients.subnet
|
||||||
|
})
|
||||||
|
.from(clients)
|
||||||
|
.where(
|
||||||
|
and(
|
||||||
|
eq(clients.orgId, orgId),
|
||||||
|
inArray(clients.subnet, subnetQueries)
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
|
for (const c of matchedClients) {
|
||||||
|
const ip = c.subnet.split("/")[0];
|
||||||
|
ipToClient.set(ip, { clientId: c.clientId, userId: c.userId });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Convert to DB records and add to the buffer
|
// Convert to DB records and add to the buffer
|
||||||
for (const session of sessions) {
|
for (const session of sessions) {
|
||||||
// Validate required fields
|
// Validate required fields
|
||||||
@@ -292,11 +343,18 @@ export const handleConnectionLogMessage: MessageHandler = async (context) => {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Match the source address to a client. The sourceAddr is the
|
||||||
|
// client's IP on the WireGuard network, which corresponds to the IP
|
||||||
|
// portion of the client's subnet CIDR (e.g. "100.90.128.5/24").
|
||||||
|
const clientInfo = ipToClient.get(session.sourceAddr) ?? null;
|
||||||
|
|
||||||
buffer.push({
|
buffer.push({
|
||||||
sessionId: session.sessionId,
|
sessionId: session.sessionId,
|
||||||
siteResourceId: session.resourceId,
|
siteResourceId: session.resourceId,
|
||||||
orgId,
|
orgId,
|
||||||
siteId: newt.siteId,
|
siteId: newt.siteId,
|
||||||
|
clientId: clientInfo?.clientId ?? null,
|
||||||
|
userId: clientInfo?.userId ?? null,
|
||||||
sourceAddr: session.sourceAddr,
|
sourceAddr: session.sourceAddr,
|
||||||
destAddr: session.destAddr,
|
destAddr: session.destAddr,
|
||||||
protocol: session.protocol,
|
protocol: session.protocol,
|
||||||
|
|||||||
Reference in New Issue
Block a user