Compare commits

..

2 Commits

Author SHA1 Message Date
Owen
b622aca221 Try to route logs requests to a different database 2026-02-20 17:20:01 -08:00
miloschwartz
eedf57af89 disable rybbit in saas 2026-02-19 17:54:40 -08:00
16 changed files with 569 additions and 314 deletions

469
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -33,7 +33,7 @@
},
"dependencies": {
"@asteasolutions/zod-to-openapi": "8.4.0",
"@aws-sdk/client-s3": "3.993.0",
"@aws-sdk/client-s3": "3.989.0",
"@faker-js/faker": "10.3.0",
"@headlessui/react": "2.2.9",
"@hookform/resolvers": "5.2.2",
@@ -89,7 +89,7 @@
"jmespath": "0.16.0",
"js-yaml": "4.1.1",
"jsonwebtoken": "9.0.3",
"lucide-react": "0.574.0",
"lucide-react": "0.563.0",
"maxmind": "5.0.5",
"moment": "2.30.1",
"next": "15.5.12",
@@ -115,7 +115,7 @@
"sshpk": "^1.18.0",
"stripe": "20.3.1",
"swagger-ui-express": "5.0.1",
"tailwind-merge": "3.5.0",
"tailwind-merge": "3.4.0",
"topojson-client": "3.1.0",
"tw-animate-css": "1.4.0",
"use-debounce": "^10.1.0",

View File

@@ -1,4 +1,5 @@
export * from "./driver";
export * from "./logsDriver";
export * from "./schema/schema";
export * from "./schema/privateSchema";
export * from "./migrate";

View File

@@ -0,0 +1,89 @@
import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres";
import { Pool } from "pg";
import { readConfigFile } from "@server/lib/readConfigFile";
import { readPrivateConfigFile } from "@server/private/lib/readConfigFile";
import { withReplicas } from "drizzle-orm/pg-core";
import { build } from "@server/build";
import { db as mainDb, primaryDb as mainPrimaryDb } from "./driver";
function createLogsDb() {
// Only use separate logs database in SaaS builds
if (build !== "saas") {
return mainDb;
}
const config = readConfigFile();
const privateConfig = readPrivateConfigFile();
// Merge configs, prioritizing private config
const logsConfig = privateConfig.postgres_logs || config.postgres_logs;
// Check environment variable first
let connectionString = process.env.POSTGRES_LOGS_CONNECTION_STRING;
let replicaConnections: Array<{ connection_string: string }> = [];
if (!connectionString && logsConfig) {
connectionString = logsConfig.connection_string;
replicaConnections = logsConfig.replicas || [];
}
// If POSTGRES_LOGS_REPLICA_CONNECTION_STRINGS is set, use it
if (process.env.POSTGRES_LOGS_REPLICA_CONNECTION_STRINGS) {
replicaConnections =
process.env.POSTGRES_LOGS_REPLICA_CONNECTION_STRINGS.split(",").map(
(conn) => ({
connection_string: conn.trim()
})
);
}
// If no logs database is configured, fall back to main database
if (!connectionString) {
return mainDb;
}
// Create separate connection pool for logs database
const poolConfig = logsConfig?.pool || config.postgres?.pool;
const primaryPool = new Pool({
connectionString,
max: poolConfig?.max_connections || 20,
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
connectionTimeoutMillis: poolConfig?.connection_timeout_ms || 5000
});
const replicas = [];
if (!replicaConnections.length) {
replicas.push(
DrizzlePostgres(primaryPool, {
logger: process.env.QUERY_LOGGING == "true"
})
);
} else {
for (const conn of replicaConnections) {
const replicaPool = new Pool({
connectionString: conn.connection_string,
max: poolConfig?.max_replica_connections || 20,
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
connectionTimeoutMillis:
poolConfig?.connection_timeout_ms || 5000
});
replicas.push(
DrizzlePostgres(replicaPool, {
logger: process.env.QUERY_LOGGING == "true"
})
);
}
}
return withReplicas(
DrizzlePostgres(primaryPool, {
logger: process.env.QUERY_LOGGING == "true"
}),
replicas as any
);
}
export const logsDb = createLogsDb();
export default logsDb;
export const primaryLogsDb = logsDb.$primary;

View File

@@ -1,4 +1,5 @@
export * from "./driver";
export * from "./logsDriver";
export * from "./schema/schema";
export * from "./schema/privateSchema";
export * from "./migrate";

View File

@@ -0,0 +1,7 @@
import { db as mainDb } from "./driver";
// SQLite doesn't support separate databases for logs in the same way as Postgres
// Always use the main database connection for SQLite
export const logsDb = mainDb;
export default logsDb;
export const primaryLogsDb = logsDb;

View File

@@ -189,6 +189,46 @@ export const configSchema = z
.prefault({})
})
.optional(),
postgres_logs: z
.object({
connection_string: z
.string()
.optional()
.transform(getEnvOrYaml("POSTGRES_LOGS_CONNECTION_STRING")),
replicas: z
.array(
z.object({
connection_string: z.string()
})
)
.optional(),
pool: z
.object({
max_connections: z
.number()
.positive()
.optional()
.default(20),
max_replica_connections: z
.number()
.positive()
.optional()
.default(10),
idle_timeout_ms: z
.number()
.positive()
.optional()
.default(30000),
connection_timeout_ms: z
.number()
.positive()
.optional()
.default(5000)
})
.optional()
.prefault({})
})
.optional(),
traefik: z
.object({
http_entrypoint: z.string().optional().default("web"),

View File

@@ -11,7 +11,7 @@
* This file is not licensed under the AGPLv3.
*/
import { accessAuditLog, db, orgs } from "@server/db";
import { accessAuditLog, logsDb, db, orgs } from "@server/db";
import { getCountryCodeForIp } from "@server/lib/geoip";
import logger from "@server/logger";
import { and, eq, lt } from "drizzle-orm";
@@ -52,7 +52,7 @@ export async function cleanUpOldLogs(orgId: string, retentionDays: number) {
const cutoffTimestamp = calculateCutoffTimestamp(retentionDays);
try {
await db
await logsDb
.delete(accessAuditLog)
.where(
and(
@@ -124,7 +124,7 @@ export async function logAccessAudit(data: {
? await getCountryCodeFromIp(data.requestIp)
: undefined;
await db.insert(accessAuditLog).values({
await logsDb.insert(accessAuditLog).values({
timestamp: timestamp,
orgId: data.orgId,
actorType,

View File

@@ -83,6 +83,46 @@ export const privateConfigSchema = z.object({
// .optional()
})
.optional(),
postgres_logs: z
.object({
connection_string: z
.string()
.optional()
.transform(getEnvOrYaml("POSTGRES_LOGS_CONNECTION_STRING")),
replicas: z
.array(
z.object({
connection_string: z.string()
})
)
.optional(),
pool: z
.object({
max_connections: z
.number()
.positive()
.optional()
.default(20),
max_replica_connections: z
.number()
.positive()
.optional()
.default(10),
idle_timeout_ms: z
.number()
.positive()
.optional()
.default(30000),
connection_timeout_ms: z
.number()
.positive()
.optional()
.default(5000)
})
.optional()
.prefault({})
})
.optional(),
gerbil: z
.object({
local_exit_node_reachable_at: z

View File

@@ -12,7 +12,7 @@
*/
import { ActionsEnum } from "@server/auth/actions";
import { actionAuditLog, db, orgs } from "@server/db";
import { actionAuditLog, logsDb, db, orgs } from "@server/db";
import logger from "@server/logger";
import HttpCode from "@server/types/HttpCode";
import { Request, Response, NextFunction } from "express";
@@ -54,7 +54,7 @@ export async function cleanUpOldLogs(orgId: string, retentionDays: number) {
const cutoffTimestamp = calculateCutoffTimestamp(retentionDays);
try {
await db
await logsDb
.delete(actionAuditLog)
.where(
and(
@@ -123,7 +123,7 @@ export function logActionAudit(action: ActionsEnum) {
metadata = JSON.stringify(req.params);
}
await db.insert(actionAuditLog).values({
await logsDb.insert(actionAuditLog).values({
timestamp,
orgId,
actorType,

View File

@@ -11,11 +11,11 @@
* This file is not licensed under the AGPLv3.
*/
import { accessAuditLog, db, resources } from "@server/db";
import { accessAuditLog, logsDb, resources, db, primaryDb } from "@server/db";
import { registry } from "@server/openApi";
import { NextFunction } from "express";
import { Request, Response } from "express";
import { eq, gt, lt, and, count, desc } from "drizzle-orm";
import { eq, gt, lt, and, count, desc, inArray } from "drizzle-orm";
import { OpenAPITags } from "@server/openApi";
import { z } from "zod";
import createHttpError from "http-errors";
@@ -115,7 +115,7 @@ function getWhere(data: Q) {
}
export function queryAccess(data: Q) {
return db
return logsDb
.select({
orgId: accessAuditLog.orgId,
action: accessAuditLog.action,
@@ -133,16 +133,46 @@ export function queryAccess(data: Q) {
actor: accessAuditLog.actor
})
.from(accessAuditLog)
.leftJoin(
resources,
eq(accessAuditLog.resourceId, resources.resourceId)
)
.where(getWhere(data))
.orderBy(desc(accessAuditLog.timestamp), desc(accessAuditLog.id));
}
async function enrichWithResourceDetails(logs: Awaited<ReturnType<typeof queryAccess>>) {
// If logs database is the same as main database, we can do a join
// Otherwise, we need to fetch resource details separately
const resourceIds = logs
.map(log => log.resourceId)
.filter((id): id is number => id !== null && id !== undefined);
if (resourceIds.length === 0) {
return logs.map(log => ({ ...log, resourceName: null, resourceNiceId: null }));
}
// Fetch resource details from main database
const resourceDetails = await primaryDb
.select({
resourceId: resources.resourceId,
name: resources.name,
niceId: resources.niceId
})
.from(resources)
.where(inArray(resources.resourceId, resourceIds));
// Create a map for quick lookup
const resourceMap = new Map(
resourceDetails.map(r => [r.resourceId, { name: r.name, niceId: r.niceId }])
);
// Enrich logs with resource details
return logs.map(log => ({
...log,
resourceName: log.resourceId ? resourceMap.get(log.resourceId)?.name ?? null : null,
resourceNiceId: log.resourceId ? resourceMap.get(log.resourceId)?.niceId ?? null : null
}));
}
export function countAccessQuery(data: Q) {
const countQuery = db
const countQuery = logsDb
.select({ count: count() })
.from(accessAuditLog)
.where(getWhere(data));
@@ -161,7 +191,7 @@ async function queryUniqueFilterAttributes(
);
// Get unique actors
const uniqueActors = await db
const uniqueActors = await logsDb
.selectDistinct({
actor: accessAuditLog.actor
})
@@ -169,7 +199,7 @@ async function queryUniqueFilterAttributes(
.where(baseConditions);
// Get unique locations
const uniqueLocations = await db
const uniqueLocations = await logsDb
.selectDistinct({
locations: accessAuditLog.location
})
@@ -177,25 +207,40 @@ async function queryUniqueFilterAttributes(
.where(baseConditions);
// Get unique resources with names
const uniqueResources = await db
const uniqueResources = await logsDb
.selectDistinct({
id: accessAuditLog.resourceId,
name: resources.name
id: accessAuditLog.resourceId
})
.from(accessAuditLog)
.leftJoin(
resources,
eq(accessAuditLog.resourceId, resources.resourceId)
)
.where(baseConditions);
// Fetch resource names from main database for the unique resource IDs
const resourceIds = uniqueResources
.map(row => row.id)
.filter((id): id is number => id !== null);
let resourcesWithNames: Array<{ id: number; name: string | null }> = [];
if (resourceIds.length > 0) {
const resourceDetails = await primaryDb
.select({
resourceId: resources.resourceId,
name: resources.name
})
.from(resources)
.where(inArray(resources.resourceId, resourceIds));
resourcesWithNames = resourceDetails.map(r => ({
id: r.resourceId,
name: r.name
}));
}
return {
actors: uniqueActors
.map((row) => row.actor)
.filter((actor): actor is string => actor !== null),
resources: uniqueResources.filter(
(row): row is { id: number; name: string | null } => row.id !== null
),
resources: resourcesWithNames,
locations: uniqueLocations
.map((row) => row.locations)
.filter((location): location is string => location !== null)
@@ -243,7 +288,10 @@ export async function queryAccessAuditLogs(
const baseQuery = queryAccess(data);
const log = await baseQuery.limit(data.limit).offset(data.offset);
const logsRaw = await baseQuery.limit(data.limit).offset(data.offset);
// Enrich with resource details (handles cross-database scenario)
const log = await enrichWithResourceDetails(logsRaw);
const totalCountResult = await countAccessQuery(data);
const totalCount = totalCountResult[0].count;

View File

@@ -11,7 +11,7 @@
* This file is not licensed under the AGPLv3.
*/
import { actionAuditLog, db } from "@server/db";
import { actionAuditLog, logsDb } from "@server/db";
import { registry } from "@server/openApi";
import { NextFunction } from "express";
import { Request, Response } from "express";
@@ -97,7 +97,7 @@ function getWhere(data: Q) {
}
export function queryAction(data: Q) {
return db
return logsDb
.select({
orgId: actionAuditLog.orgId,
action: actionAuditLog.action,
@@ -113,7 +113,7 @@ export function queryAction(data: Q) {
}
export function countActionQuery(data: Q) {
const countQuery = db
const countQuery = logsDb
.select({ count: count() })
.from(actionAuditLog)
.where(getWhere(data));
@@ -132,14 +132,14 @@ async function queryUniqueFilterAttributes(
);
// Get unique actors
const uniqueActors = await db
const uniqueActors = await logsDb
.selectDistinct({
actor: actionAuditLog.actor
})
.from(actionAuditLog)
.where(baseConditions);
const uniqueActions = await db
const uniqueActions = await logsDb
.selectDistinct({
action: actionAuditLog.action
})

View File

@@ -1,4 +1,4 @@
import { db, requestAuditLog, driver, primaryDb } from "@server/db";
import { logsDb, requestAuditLog, driver, primaryLogsDb } from "@server/db";
import { registry } from "@server/openApi";
import { NextFunction } from "express";
import { Request, Response } from "express";
@@ -74,12 +74,12 @@ async function query(query: Q) {
);
}
const [all] = await primaryDb
const [all] = await primaryLogsDb
.select({ total: count() })
.from(requestAuditLog)
.where(baseConditions);
const [blocked] = await primaryDb
const [blocked] = await primaryLogsDb
.select({ total: count() })
.from(requestAuditLog)
.where(and(baseConditions, eq(requestAuditLog.action, false)));
@@ -90,7 +90,7 @@ async function query(query: Q) {
const DISTINCT_LIMIT = 500;
const requestsPerCountry = await primaryDb
const requestsPerCountry = await primaryLogsDb
.selectDistinct({
code: requestAuditLog.location,
count: totalQ
@@ -118,7 +118,7 @@ async function query(query: Q) {
const booleanTrue = driver === "pg" ? sql`true` : sql`1`;
const booleanFalse = driver === "pg" ? sql`false` : sql`0`;
const requestsPerDay = await primaryDb
const requestsPerDay = await primaryLogsDb
.select({
day: groupByDayFunction.as("day"),
allowedCount:

View File

@@ -1,8 +1,8 @@
import { db, primaryDb, requestAuditLog, resources } from "@server/db";
import { logsDb, primaryLogsDb, requestAuditLog, resources, db, primaryDb } from "@server/db";
import { registry } from "@server/openApi";
import { NextFunction } from "express";
import { Request, Response } from "express";
import { eq, gt, lt, and, count, desc } from "drizzle-orm";
import { eq, gt, lt, and, count, desc, inArray } from "drizzle-orm";
import { OpenAPITags } from "@server/openApi";
import { z } from "zod";
import createHttpError from "http-errors";
@@ -107,7 +107,7 @@ function getWhere(data: Q) {
}
export function queryRequest(data: Q) {
return primaryDb
return primaryLogsDb
.select({
id: requestAuditLog.id,
timestamp: requestAuditLog.timestamp,
@@ -129,21 +129,49 @@ export function queryRequest(data: Q) {
host: requestAuditLog.host,
path: requestAuditLog.path,
method: requestAuditLog.method,
tls: requestAuditLog.tls,
resourceName: resources.name,
resourceNiceId: resources.niceId
tls: requestAuditLog.tls
})
.from(requestAuditLog)
.leftJoin(
resources,
eq(requestAuditLog.resourceId, resources.resourceId)
) // TODO: Is this efficient?
.where(getWhere(data))
.orderBy(desc(requestAuditLog.timestamp));
}
async function enrichWithResourceDetails(logs: Awaited<ReturnType<typeof queryRequest>>) {
// If logs database is the same as main database, we can do a join
// Otherwise, we need to fetch resource details separately
const resourceIds = logs
.map(log => log.resourceId)
.filter((id): id is number => id !== null && id !== undefined);
if (resourceIds.length === 0) {
return logs.map(log => ({ ...log, resourceName: null, resourceNiceId: null }));
}
// Fetch resource details from main database
const resourceDetails = await primaryDb
.select({
resourceId: resources.resourceId,
name: resources.name,
niceId: resources.niceId
})
.from(resources)
.where(inArray(resources.resourceId, resourceIds));
// Create a map for quick lookup
const resourceMap = new Map(
resourceDetails.map(r => [r.resourceId, { name: r.name, niceId: r.niceId }])
);
// Enrich logs with resource details
return logs.map(log => ({
...log,
resourceName: log.resourceId ? resourceMap.get(log.resourceId)?.name ?? null : null,
resourceNiceId: log.resourceId ? resourceMap.get(log.resourceId)?.niceId ?? null : null
}));
}
export function countRequestQuery(data: Q) {
const countQuery = primaryDb
const countQuery = primaryLogsDb
.select({ count: count() })
.from(requestAuditLog)
.where(getWhere(data));
@@ -185,36 +213,31 @@ async function queryUniqueFilterAttributes(
uniquePaths,
uniqueResources
] = await Promise.all([
primaryDb
primaryLogsDb
.selectDistinct({ actor: requestAuditLog.actor })
.from(requestAuditLog)
.where(baseConditions)
.limit(DISTINCT_LIMIT + 1),
primaryDb
primaryLogsDb
.selectDistinct({ locations: requestAuditLog.location })
.from(requestAuditLog)
.where(baseConditions)
.limit(DISTINCT_LIMIT + 1),
primaryDb
primaryLogsDb
.selectDistinct({ hosts: requestAuditLog.host })
.from(requestAuditLog)
.where(baseConditions)
.limit(DISTINCT_LIMIT + 1),
primaryDb
primaryLogsDb
.selectDistinct({ paths: requestAuditLog.path })
.from(requestAuditLog)
.where(baseConditions)
.limit(DISTINCT_LIMIT + 1),
primaryDb
primaryLogsDb
.selectDistinct({
id: requestAuditLog.resourceId,
name: resources.name
id: requestAuditLog.resourceId
})
.from(requestAuditLog)
.leftJoin(
resources,
eq(requestAuditLog.resourceId, resources.resourceId)
)
.where(baseConditions)
.limit(DISTINCT_LIMIT + 1)
]);
@@ -231,13 +254,33 @@ async function queryUniqueFilterAttributes(
// throw new Error("Too many distinct filter attributes to retrieve. Please refine your time range.");
// }
// Fetch resource names from main database for the unique resource IDs
const resourceIds = uniqueResources
.map(row => row.id)
.filter((id): id is number => id !== null);
let resourcesWithNames: Array<{ id: number; name: string | null }> = [];
if (resourceIds.length > 0) {
const resourceDetails = await primaryDb
.select({
resourceId: resources.resourceId,
name: resources.name
})
.from(resources)
.where(inArray(resources.resourceId, resourceIds));
resourcesWithNames = resourceDetails.map(r => ({
id: r.resourceId,
name: r.name
}));
}
return {
actors: uniqueActors
.map((row) => row.actor)
.filter((actor): actor is string => actor !== null),
resources: uniqueResources.filter(
(row): row is { id: number; name: string | null } => row.id !== null
),
resources: resourcesWithNames,
locations: uniqueLocations
.map((row) => row.locations)
.filter((location): location is string => location !== null),
@@ -280,7 +323,10 @@ export async function queryRequestAuditLogs(
const baseQuery = queryRequest(data);
const log = await baseQuery.limit(data.limit).offset(data.offset);
const logsRaw = await baseQuery.limit(data.limit).offset(data.offset);
// Enrich with resource details (handles cross-database scenario)
const log = await enrichWithResourceDetails(logsRaw);
const totalCountResult = await countRequestQuery(data);
const totalCount = totalCountResult[0].count;

View File

@@ -1,4 +1,4 @@
import { db, orgs, requestAuditLog } from "@server/db";
import { logsDb, primaryLogsDb, db, orgs, requestAuditLog } from "@server/db";
import logger from "@server/logger";
import { and, eq, lt, sql } from "drizzle-orm";
import cache from "@server/lib/cache";
@@ -69,7 +69,7 @@ async function flushAuditLogs() {
try {
// Use a transaction to ensure all inserts succeed or fail together
// This prevents index corruption from partial writes
await db.transaction(async (tx) => {
await logsDb.transaction(async (tx) => {
// Batch insert logs in groups of 25 to avoid overwhelming the database
const BATCH_DB_SIZE = 25;
for (let i = 0; i < logsToWrite.length; i += BATCH_DB_SIZE) {
@@ -162,7 +162,7 @@ export async function cleanUpOldLogs(orgId: string, retentionDays: number) {
const cutoffTimestamp = calculateCutoffTimestamp(retentionDays);
try {
await db
await logsDb
.delete(requestAuditLog)
.where(
and(

View File

@@ -82,13 +82,13 @@ export default async function RootLayout({
<body className={`${font.className} h-screen-safe overflow-hidden`}>
<StoreInternalRedirect />
<TopLoader />
{build === "saas" && (
{/* build === "saas" && (
<Script
src="https://rybbit.fossorial.io/api/script.js"
data-site-id="fe1ff2a33287"
strategy="afterInteractive"
/>
)}
)*/}
<ViewportHeightFix />
<NextIntlClientProvider>
<ThemeProvider