Mirror of https://github.com/fosrl/pangolin.git (synced 2026-02-22 12:56:37 +00:00)

Compare commits: k8s...logs-datab (6 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | b622aca221 |  |
|  | eedf57af89 |  |
|  | 756f3f32ca |  |
|  | 5987f6b2cd |  |
|  | 09a9457021 |  |
|  | ca4643ec36 |  |

@@ -1,4 +1,5 @@
 export * from "./driver";
+export * from "./logsDriver";
 export * from "./schema/schema";
 export * from "./schema/privateSchema";
 export * from "./migrate";

server/db/pg/logsDriver.ts (new file, 89 lines)
@@ -0,0 +1,89 @@
+import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres";
+import { Pool } from "pg";
+import { readConfigFile } from "@server/lib/readConfigFile";
+import { readPrivateConfigFile } from "@server/private/lib/readConfigFile";
+import { withReplicas } from "drizzle-orm/pg-core";
+import { build } from "@server/build";
+import { db as mainDb, primaryDb as mainPrimaryDb } from "./driver";
+
+function createLogsDb() {
+    // Only use separate logs database in SaaS builds
+    if (build !== "saas") {
+        return mainDb;
+    }
+
+    const config = readConfigFile();
+    const privateConfig = readPrivateConfigFile();
+
+    // Merge configs, prioritizing private config
+    const logsConfig = privateConfig.postgres_logs || config.postgres_logs;
+
+    // Check environment variable first
+    let connectionString = process.env.POSTGRES_LOGS_CONNECTION_STRING;
+    let replicaConnections: Array<{ connection_string: string }> = [];
+
+    if (!connectionString && logsConfig) {
+        connectionString = logsConfig.connection_string;
+        replicaConnections = logsConfig.replicas || [];
+    }
+
+    // If POSTGRES_LOGS_REPLICA_CONNECTION_STRINGS is set, use it
+    if (process.env.POSTGRES_LOGS_REPLICA_CONNECTION_STRINGS) {
+        replicaConnections =
+            process.env.POSTGRES_LOGS_REPLICA_CONNECTION_STRINGS.split(",").map(
+                (conn) => ({
+                    connection_string: conn.trim()
+                })
+            );
+    }
+
+    // If no logs database is configured, fall back to main database
+    if (!connectionString) {
+        return mainDb;
+    }
+
+    // Create separate connection pool for logs database
+    const poolConfig = logsConfig?.pool || config.postgres?.pool;
+    const primaryPool = new Pool({
+        connectionString,
+        max: poolConfig?.max_connections || 20,
+        idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
+        connectionTimeoutMillis: poolConfig?.connection_timeout_ms || 5000
+    });
+
+    const replicas = [];
+
+    if (!replicaConnections.length) {
+        replicas.push(
+            DrizzlePostgres(primaryPool, {
+                logger: process.env.QUERY_LOGGING == "true"
+            })
+        );
+    } else {
+        for (const conn of replicaConnections) {
+            const replicaPool = new Pool({
+                connectionString: conn.connection_string,
+                max: poolConfig?.max_replica_connections || 20,
+                idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
+                connectionTimeoutMillis:
+                    poolConfig?.connection_timeout_ms || 5000
+            });
+            replicas.push(
+                DrizzlePostgres(replicaPool, {
+                    logger: process.env.QUERY_LOGGING == "true"
+                })
+            );
+        }
+    }
+
+    return withReplicas(
+        DrizzlePostgres(primaryPool, {
+            logger: process.env.QUERY_LOGGING == "true"
+        }),
+        replicas as any
+    );
+}
+
+export const logsDb = createLogsDb();
+export default logsDb;
+export const primaryLogsDb = logsDb.$primary;

@@ -1,4 +1,5 @@
 export * from "./driver";
+export * from "./logsDriver";
 export * from "./schema/schema";
 export * from "./schema/privateSchema";
 export * from "./migrate";

server/db/sqlite/logsDriver.ts (new file, 7 lines)
@@ -0,0 +1,7 @@
+import { db as mainDb } from "./driver";
+
+// SQLite doesn't support separate databases for logs in the same way as Postgres
+// Always use the main database connection for SQLite
+export const logsDb = mainDb;
+export default logsDb;
+export const primaryLogsDb = logsDb;

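Both drivers export the same surface (`logsDb`, a default export, and `primaryLogsDb`), re-exported through the barrel files above, so callers import them from `@server/db` regardless of driver. A minimal sketch of the usage pattern the audit-log diffs below follow; it is not code from this changeset, the retention window is a placeholder, and the real cleanUpOldLogs combines more conditions in its where clause:

// Sketch only: illustrates the intended split between logsDb and primaryLogsDb.
import { logsDb, primaryLogsDb, accessAuditLog, requestAuditLog } from "@server/db";
import { count, lt } from "drizzle-orm";

async function sketch() {
    // Writes, deletes, and row queries go through logsDb: the separate logs
    // pool in SaaS builds, or the main database otherwise.
    const cutoffTimestamp = Date.now() - 30 * 24 * 60 * 60 * 1000; // placeholder cutoff
    await logsDb
        .delete(accessAuditLog)
        .where(lt(accessAuditLog.timestamp, cutoffTimestamp));

    // Aggregations in the stats endpoints go through primaryLogsDb, the
    // primary of the logs pool (or the main primary when no logs db is set).
    const [all] = await primaryLogsDb
        .select({ total: count() })
        .from(requestAuditLog);
    return all.total;
}
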
@@ -46,8 +46,6 @@ export class UsageService {
             return null;
         }
 
-        let orgIdToUse = await this.getBillingOrg(orgId, transaction);
-
         // Truncate value to 11 decimal places
         value = this.truncateValue(value);
 
@@ -59,6 +57,7 @@ export class UsageService {
         try {
             let usage;
             if (transaction) {
+                const orgIdToUse = await this.getBillingOrg(orgId, transaction);
                 usage = await this.internalAddUsage(
                     orgIdToUse,
                     featureId,
@@ -67,6 +66,7 @@ export class UsageService {
                 );
             } else {
                 await db.transaction(async (trx) => {
+                    const orgIdToUse = await this.getBillingOrg(orgId, trx);
                     usage = await this.internalAddUsage(
                         orgIdToUse,
                         featureId,
@@ -92,7 +92,7 @@ export class UsageService {
                     const delay = baseDelay + jitter;
 
                     logger.warn(
-                        `Deadlock detected for ${orgIdToUse}/${featureId}, retrying attempt ${attempt}/${maxRetries} after ${delay.toFixed(0)}ms`
+                        `Deadlock detected for ${orgId}/${featureId}, retrying attempt ${attempt}/${maxRetries} after ${delay.toFixed(0)}ms`
                     );
 
                     await new Promise((resolve) => setTimeout(resolve, delay));
@@ -100,7 +100,7 @@ export class UsageService {
                 }
 
                 logger.error(
-                    `Failed to add usage for ${orgIdToUse}/${featureId} after ${attempt} attempts:`,
+                    `Failed to add usage for ${orgId}/${featureId} after ${attempt} attempts:`,
                     error
                 );
                 break;
@@ -169,7 +169,7 @@ export class UsageService {
             return;
         }
 
-        let orgIdToUse = await this.getBillingOrg(orgId);
+        const orgIdToUse = await this.getBillingOrg(orgId);
 
         try {
             // Truncate value to 11 decimal places if provided
@@ -227,7 +227,7 @@ export class UsageService {
         orgId: string,
         featureId: FeatureId
     ): Promise<string | null> {
-        let orgIdToUse = await this.getBillingOrg(orgId);
+        const orgIdToUse = await this.getBillingOrg(orgId);
 
         const cacheKey = `customer_${orgIdToUse}_${featureId}`;
         const cached = cache.get<string>(cacheKey);
@@ -274,7 +274,7 @@ export class UsageService {
             return null;
         }
 
-        let orgIdToUse = await this.getBillingOrg(orgId, trx);
+        const orgIdToUse = await this.getBillingOrg(orgId, trx);
 
         const usageId = `${orgIdToUse}-${featureId}`;
 
@@ -382,7 +382,7 @@ export class UsageService {
             return false;
         }
 
-        let orgIdToUse = await this.getBillingOrg(orgId, trx);
+        const orgIdToUse = await this.getBillingOrg(orgId, trx);
 
         // This method should check the current usage against the limits set for the organization
         // and kick out all of the sites on the org

@@ -189,6 +189,46 @@ export const configSchema = z
             .prefault({})
         })
         .optional(),
+    postgres_logs: z
+        .object({
+            connection_string: z
+                .string()
+                .optional()
+                .transform(getEnvOrYaml("POSTGRES_LOGS_CONNECTION_STRING")),
+            replicas: z
+                .array(
+                    z.object({
+                        connection_string: z.string()
+                    })
+                )
+                .optional(),
+            pool: z
+                .object({
+                    max_connections: z
+                        .number()
+                        .positive()
+                        .optional()
+                        .default(20),
+                    max_replica_connections: z
+                        .number()
+                        .positive()
+                        .optional()
+                        .default(10),
+                    idle_timeout_ms: z
+                        .number()
+                        .positive()
+                        .optional()
+                        .default(30000),
+                    connection_timeout_ms: z
+                        .number()
+                        .positive()
+                        .optional()
+                        .default(5000)
+                })
+                .optional()
+                .prefault({})
+        })
+        .optional(),
     traefik: z
         .object({
             http_entrypoint: z.string().optional().default("web"),

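For reference, a minimal sketch of a value this new `postgres_logs` block would accept, written as a TypeScript literal; the hosts and credentials are made-up placeholders, and the same settings can instead come from `POSTGRES_LOGS_CONNECTION_STRING` / `POSTGRES_LOGS_REPLICA_CONNECTION_STRINGS` in the environment, which `createLogsDb` above checks first:

// Sketch only: placeholder hosts and credentials, shaped to satisfy the schema above.
const examplePostgresLogsConfig = {
    postgres_logs: {
        connection_string:
            "postgresql://pangolin:secret@logs-db.internal:5432/pangolin_logs",
        replicas: [
            {
                connection_string:
                    "postgresql://pangolin:secret@logs-db-replica.internal:5432/pangolin_logs"
            }
        ],
        pool: {
            max_connections: 20,         // primary pool size (schema default: 20)
            max_replica_connections: 10, // per-replica pool size (schema default: 10)
            idle_timeout_ms: 30000,
            connection_timeout_ms: 5000
        }
    }
};
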
@@ -78,7 +78,8 @@ export async function getOrgTierData(
         if (
             subscription.type === "tier1" ||
             subscription.type === "tier2" ||
-            subscription.type === "tier3"
+            subscription.type === "tier3" ||
+            subscription.type === "enterprise"
         ) {
             tier = subscription.type;
             active = true;

@@ -11,7 +11,7 @@
  * This file is not licensed under the AGPLv3.
  */
 
-import { accessAuditLog, db, orgs } from "@server/db";
+import { accessAuditLog, logsDb, db, orgs } from "@server/db";
 import { getCountryCodeForIp } from "@server/lib/geoip";
 import logger from "@server/logger";
 import { and, eq, lt } from "drizzle-orm";
@@ -52,7 +52,7 @@ export async function cleanUpOldLogs(orgId: string, retentionDays: number) {
     const cutoffTimestamp = calculateCutoffTimestamp(retentionDays);
 
     try {
-        await db
+        await logsDb
             .delete(accessAuditLog)
             .where(
                 and(
@@ -124,7 +124,7 @@ export async function logAccessAudit(data: {
             ? await getCountryCodeFromIp(data.requestIp)
             : undefined;
 
-    await db.insert(accessAuditLog).values({
+    await logsDb.insert(accessAuditLog).values({
         timestamp: timestamp,
         orgId: data.orgId,
         actorType,

@@ -83,6 +83,46 @@ export const privateConfigSchema = z.object({
             // .optional()
         })
         .optional(),
+    postgres_logs: z
+        .object({
+            connection_string: z
+                .string()
+                .optional()
+                .transform(getEnvOrYaml("POSTGRES_LOGS_CONNECTION_STRING")),
+            replicas: z
+                .array(
+                    z.object({
+                        connection_string: z.string()
+                    })
+                )
+                .optional(),
+            pool: z
+                .object({
+                    max_connections: z
+                        .number()
+                        .positive()
+                        .optional()
+                        .default(20),
+                    max_replica_connections: z
+                        .number()
+                        .positive()
+                        .optional()
+                        .default(10),
+                    idle_timeout_ms: z
+                        .number()
+                        .positive()
+                        .optional()
+                        .default(30000),
+                    connection_timeout_ms: z
+                        .number()
+                        .positive()
+                        .optional()
+                        .default(5000)
+                })
+                .optional()
+                .prefault({})
+        })
+        .optional(),
     gerbil: z
         .object({
             local_exit_node_reachable_at: z

@@ -12,7 +12,7 @@
  */
 
 import { ActionsEnum } from "@server/auth/actions";
-import { actionAuditLog, db, orgs } from "@server/db";
+import { actionAuditLog, logsDb, db, orgs } from "@server/db";
 import logger from "@server/logger";
 import HttpCode from "@server/types/HttpCode";
 import { Request, Response, NextFunction } from "express";
@@ -54,7 +54,7 @@ export async function cleanUpOldLogs(orgId: string, retentionDays: number) {
     const cutoffTimestamp = calculateCutoffTimestamp(retentionDays);
 
     try {
-        await db
+        await logsDb
             .delete(actionAuditLog)
             .where(
                 and(
@@ -123,7 +123,7 @@ export function logActionAudit(action: ActionsEnum) {
                 metadata = JSON.stringify(req.params);
             }
 
-            await db.insert(actionAuditLog).values({
+            await logsDb.insert(actionAuditLog).values({
                 timestamp,
                 orgId,
                 actorType,

@@ -11,11 +11,11 @@
  * This file is not licensed under the AGPLv3.
  */
 
-import { accessAuditLog, db, resources } from "@server/db";
+import { accessAuditLog, logsDb, resources, db, primaryDb } from "@server/db";
 import { registry } from "@server/openApi";
 import { NextFunction } from "express";
 import { Request, Response } from "express";
-import { eq, gt, lt, and, count, desc } from "drizzle-orm";
+import { eq, gt, lt, and, count, desc, inArray } from "drizzle-orm";
 import { OpenAPITags } from "@server/openApi";
 import { z } from "zod";
 import createHttpError from "http-errors";
@@ -115,7 +115,7 @@ function getWhere(data: Q) {
 }
 
 export function queryAccess(data: Q) {
-    return db
+    return logsDb
         .select({
             orgId: accessAuditLog.orgId,
             action: accessAuditLog.action,
@@ -133,16 +133,46 @@ export function queryAccess(data: Q) {
             actor: accessAuditLog.actor
         })
         .from(accessAuditLog)
-        .leftJoin(
-            resources,
-            eq(accessAuditLog.resourceId, resources.resourceId)
-        )
         .where(getWhere(data))
         .orderBy(desc(accessAuditLog.timestamp), desc(accessAuditLog.id));
 }
 
+async function enrichWithResourceDetails(logs: Awaited<ReturnType<typeof queryAccess>>) {
+    // If logs database is the same as main database, we can do a join
+    // Otherwise, we need to fetch resource details separately
+    const resourceIds = logs
+        .map(log => log.resourceId)
+        .filter((id): id is number => id !== null && id !== undefined);
+
+    if (resourceIds.length === 0) {
+        return logs.map(log => ({ ...log, resourceName: null, resourceNiceId: null }));
+    }
+
+    // Fetch resource details from main database
+    const resourceDetails = await primaryDb
+        .select({
+            resourceId: resources.resourceId,
+            name: resources.name,
+            niceId: resources.niceId
+        })
+        .from(resources)
+        .where(inArray(resources.resourceId, resourceIds));
+
+    // Create a map for quick lookup
+    const resourceMap = new Map(
+        resourceDetails.map(r => [r.resourceId, { name: r.name, niceId: r.niceId }])
+    );
+
+    // Enrich logs with resource details
+    return logs.map(log => ({
+        ...log,
+        resourceName: log.resourceId ? resourceMap.get(log.resourceId)?.name ?? null : null,
+        resourceNiceId: log.resourceId ? resourceMap.get(log.resourceId)?.niceId ?? null : null
+    }));
+}
+
 export function countAccessQuery(data: Q) {
-    const countQuery = db
+    const countQuery = logsDb
         .select({ count: count() })
         .from(accessAuditLog)
         .where(getWhere(data));
@@ -161,7 +191,7 @@ async function queryUniqueFilterAttributes(
     );
 
     // Get unique actors
-    const uniqueActors = await db
+    const uniqueActors = await logsDb
         .selectDistinct({
             actor: accessAuditLog.actor
         })
@@ -169,7 +199,7 @@ async function queryUniqueFilterAttributes(
         .where(baseConditions);
 
     // Get unique locations
-    const uniqueLocations = await db
+    const uniqueLocations = await logsDb
         .selectDistinct({
             locations: accessAuditLog.location
         })
@@ -177,25 +207,40 @@ async function queryUniqueFilterAttributes(
         .where(baseConditions);
 
     // Get unique resources with names
-    const uniqueResources = await db
+    const uniqueResources = await logsDb
         .selectDistinct({
-            id: accessAuditLog.resourceId,
-            name: resources.name
+            id: accessAuditLog.resourceId
         })
         .from(accessAuditLog)
-        .leftJoin(
-            resources,
-            eq(accessAuditLog.resourceId, resources.resourceId)
-        )
         .where(baseConditions);
 
+    // Fetch resource names from main database for the unique resource IDs
+    const resourceIds = uniqueResources
+        .map(row => row.id)
+        .filter((id): id is number => id !== null);
+
+    let resourcesWithNames: Array<{ id: number; name: string | null }> = [];
+
+    if (resourceIds.length > 0) {
+        const resourceDetails = await primaryDb
+            .select({
+                resourceId: resources.resourceId,
+                name: resources.name
+            })
+            .from(resources)
+            .where(inArray(resources.resourceId, resourceIds));
+
+        resourcesWithNames = resourceDetails.map(r => ({
+            id: r.resourceId,
+            name: r.name
+        }));
+    }
+
     return {
         actors: uniqueActors
             .map((row) => row.actor)
            .filter((actor): actor is string => actor !== null),
-        resources: uniqueResources.filter(
-            (row): row is { id: number; name: string | null } => row.id !== null
-        ),
+        resources: resourcesWithNames,
         locations: uniqueLocations
             .map((row) => row.locations)
             .filter((location): location is string => location !== null)
@@ -243,7 +288,10 @@ export async function queryAccessAuditLogs(
 
     const baseQuery = queryAccess(data);
 
-    const log = await baseQuery.limit(data.limit).offset(data.offset);
+    const logsRaw = await baseQuery.limit(data.limit).offset(data.offset);
+
+    // Enrich with resource details (handles cross-database scenario)
+    const log = await enrichWithResourceDetails(logsRaw);
 
     const totalCountResult = await countAccessQuery(data);
     const totalCount = totalCountResult[0].count;

@@ -11,7 +11,7 @@
  * This file is not licensed under the AGPLv3.
  */
 
-import { actionAuditLog, db } from "@server/db";
+import { actionAuditLog, logsDb } from "@server/db";
 import { registry } from "@server/openApi";
 import { NextFunction } from "express";
 import { Request, Response } from "express";
@@ -97,7 +97,7 @@ function getWhere(data: Q) {
 }
 
 export function queryAction(data: Q) {
-    return db
+    return logsDb
         .select({
             orgId: actionAuditLog.orgId,
             action: actionAuditLog.action,
@@ -113,7 +113,7 @@ export function queryAction(data: Q) {
 }
 
 export function countActionQuery(data: Q) {
-    const countQuery = db
+    const countQuery = logsDb
         .select({ count: count() })
         .from(actionAuditLog)
         .where(getWhere(data));
@@ -132,14 +132,14 @@ async function queryUniqueFilterAttributes(
     );
 
     // Get unique actors
-    const uniqueActors = await db
+    const uniqueActors = await logsDb
         .selectDistinct({
             actor: actionAuditLog.actor
         })
         .from(actionAuditLog)
         .where(baseConditions);
 
-    const uniqueActions = await db
+    const uniqueActions = await logsDb
         .selectDistinct({
             action: actionAuditLog.action
         })

@@ -1,4 +1,4 @@
-import { db, requestAuditLog, driver, primaryDb } from "@server/db";
+import { logsDb, requestAuditLog, driver, primaryLogsDb } from "@server/db";
 import { registry } from "@server/openApi";
 import { NextFunction } from "express";
 import { Request, Response } from "express";
@@ -74,12 +74,12 @@ async function query(query: Q) {
         );
     }
 
-    const [all] = await primaryDb
+    const [all] = await primaryLogsDb
         .select({ total: count() })
         .from(requestAuditLog)
         .where(baseConditions);
 
-    const [blocked] = await primaryDb
+    const [blocked] = await primaryLogsDb
         .select({ total: count() })
         .from(requestAuditLog)
         .where(and(baseConditions, eq(requestAuditLog.action, false)));
@@ -90,7 +90,7 @@ async function query(query: Q) {
 
     const DISTINCT_LIMIT = 500;
 
-    const requestsPerCountry = await primaryDb
+    const requestsPerCountry = await primaryLogsDb
         .selectDistinct({
             code: requestAuditLog.location,
             count: totalQ
@@ -118,7 +118,7 @@ async function query(query: Q) {
     const booleanTrue = driver === "pg" ? sql`true` : sql`1`;
     const booleanFalse = driver === "pg" ? sql`false` : sql`0`;
 
-    const requestsPerDay = await primaryDb
+    const requestsPerDay = await primaryLogsDb
         .select({
             day: groupByDayFunction.as("day"),
             allowedCount:

@@ -1,8 +1,8 @@
-import { db, primaryDb, requestAuditLog, resources } from "@server/db";
+import { logsDb, primaryLogsDb, requestAuditLog, resources, db, primaryDb } from "@server/db";
 import { registry } from "@server/openApi";
 import { NextFunction } from "express";
 import { Request, Response } from "express";
-import { eq, gt, lt, and, count, desc } from "drizzle-orm";
+import { eq, gt, lt, and, count, desc, inArray } from "drizzle-orm";
 import { OpenAPITags } from "@server/openApi";
 import { z } from "zod";
 import createHttpError from "http-errors";
@@ -107,7 +107,7 @@ function getWhere(data: Q) {
 }
 
 export function queryRequest(data: Q) {
-    return primaryDb
+    return primaryLogsDb
         .select({
             id: requestAuditLog.id,
             timestamp: requestAuditLog.timestamp,
@@ -129,21 +129,49 @@ export function queryRequest(data: Q) {
             host: requestAuditLog.host,
             path: requestAuditLog.path,
             method: requestAuditLog.method,
-            tls: requestAuditLog.tls,
-            resourceName: resources.name,
-            resourceNiceId: resources.niceId
+            tls: requestAuditLog.tls
         })
         .from(requestAuditLog)
-        .leftJoin(
-            resources,
-            eq(requestAuditLog.resourceId, resources.resourceId)
-        ) // TODO: Is this efficient?
         .where(getWhere(data))
         .orderBy(desc(requestAuditLog.timestamp));
 }
 
+async function enrichWithResourceDetails(logs: Awaited<ReturnType<typeof queryRequest>>) {
+    // If logs database is the same as main database, we can do a join
+    // Otherwise, we need to fetch resource details separately
+    const resourceIds = logs
+        .map(log => log.resourceId)
+        .filter((id): id is number => id !== null && id !== undefined);
+
+    if (resourceIds.length === 0) {
+        return logs.map(log => ({ ...log, resourceName: null, resourceNiceId: null }));
+    }
+
+    // Fetch resource details from main database
+    const resourceDetails = await primaryDb
+        .select({
+            resourceId: resources.resourceId,
+            name: resources.name,
+            niceId: resources.niceId
+        })
+        .from(resources)
+        .where(inArray(resources.resourceId, resourceIds));
+
+    // Create a map for quick lookup
+    const resourceMap = new Map(
+        resourceDetails.map(r => [r.resourceId, { name: r.name, niceId: r.niceId }])
+    );
+
+    // Enrich logs with resource details
+    return logs.map(log => ({
+        ...log,
+        resourceName: log.resourceId ? resourceMap.get(log.resourceId)?.name ?? null : null,
+        resourceNiceId: log.resourceId ? resourceMap.get(log.resourceId)?.niceId ?? null : null
+    }));
+}
+
 export function countRequestQuery(data: Q) {
-    const countQuery = primaryDb
+    const countQuery = primaryLogsDb
         .select({ count: count() })
         .from(requestAuditLog)
         .where(getWhere(data));
@@ -185,36 +213,31 @@ async function queryUniqueFilterAttributes(
         uniquePaths,
         uniqueResources
     ] = await Promise.all([
-        primaryDb
+        primaryLogsDb
            .selectDistinct({ actor: requestAuditLog.actor })
            .from(requestAuditLog)
            .where(baseConditions)
            .limit(DISTINCT_LIMIT + 1),
-        primaryDb
+        primaryLogsDb
            .selectDistinct({ locations: requestAuditLog.location })
            .from(requestAuditLog)
            .where(baseConditions)
            .limit(DISTINCT_LIMIT + 1),
-        primaryDb
+        primaryLogsDb
            .selectDistinct({ hosts: requestAuditLog.host })
            .from(requestAuditLog)
            .where(baseConditions)
            .limit(DISTINCT_LIMIT + 1),
-        primaryDb
+        primaryLogsDb
            .selectDistinct({ paths: requestAuditLog.path })
            .from(requestAuditLog)
            .where(baseConditions)
            .limit(DISTINCT_LIMIT + 1),
-        primaryDb
+        primaryLogsDb
            .selectDistinct({
-                id: requestAuditLog.resourceId,
-                name: resources.name
+                id: requestAuditLog.resourceId
            })
            .from(requestAuditLog)
-            .leftJoin(
-                resources,
-                eq(requestAuditLog.resourceId, resources.resourceId)
-            )
            .where(baseConditions)
            .limit(DISTINCT_LIMIT + 1)
     ]);
@@ -231,13 +254,33 @@ async function queryUniqueFilterAttributes(
     //     throw new Error("Too many distinct filter attributes to retrieve. Please refine your time range.");
     // }
 
+    // Fetch resource names from main database for the unique resource IDs
+    const resourceIds = uniqueResources
+        .map(row => row.id)
+        .filter((id): id is number => id !== null);
+
+    let resourcesWithNames: Array<{ id: number; name: string | null }> = [];
+
+    if (resourceIds.length > 0) {
+        const resourceDetails = await primaryDb
+            .select({
+                resourceId: resources.resourceId,
+                name: resources.name
+            })
+            .from(resources)
+            .where(inArray(resources.resourceId, resourceIds));
+
+        resourcesWithNames = resourceDetails.map(r => ({
+            id: r.resourceId,
+            name: r.name
+        }));
+    }
+
     return {
         actors: uniqueActors
             .map((row) => row.actor)
             .filter((actor): actor is string => actor !== null),
-        resources: uniqueResources.filter(
-            (row): row is { id: number; name: string | null } => row.id !== null
-        ),
+        resources: resourcesWithNames,
         locations: uniqueLocations
             .map((row) => row.locations)
             .filter((location): location is string => location !== null),
@@ -280,7 +323,10 @@ export async function queryRequestAuditLogs(
 
     const baseQuery = queryRequest(data);
 
-    const log = await baseQuery.limit(data.limit).offset(data.offset);
+    const logsRaw = await baseQuery.limit(data.limit).offset(data.offset);
+
+    // Enrich with resource details (handles cross-database scenario)
+    const log = await enrichWithResourceDetails(logsRaw);
 
     const totalCountResult = await countRequestQuery(data);
     const totalCount = totalCountResult[0].count;

@@ -1,4 +1,4 @@
-import { db, orgs, requestAuditLog } from "@server/db";
+import { logsDb, primaryLogsDb, db, orgs, requestAuditLog } from "@server/db";
 import logger from "@server/logger";
 import { and, eq, lt, sql } from "drizzle-orm";
 import cache from "@server/lib/cache";
@@ -69,7 +69,7 @@ async function flushAuditLogs() {
     try {
         // Use a transaction to ensure all inserts succeed or fail together
         // This prevents index corruption from partial writes
-        await db.transaction(async (tx) => {
+        await logsDb.transaction(async (tx) => {
            // Batch insert logs in groups of 25 to avoid overwhelming the database
            const BATCH_DB_SIZE = 25;
            for (let i = 0; i < logsToWrite.length; i += BATCH_DB_SIZE) {
@@ -162,7 +162,7 @@ export async function cleanUpOldLogs(orgId: string, retentionDays: number) {
     const cutoffTimestamp = calculateCutoffTimestamp(retentionDays);
 
     try {
-        await db
+        await logsDb
             .delete(requestAuditLog)
             .where(
                 and(

@@ -197,7 +197,6 @@ export async function updateSiteBandwidth(
         usageService
             .checkLimitSet(
                 orgId,
-
                 FeatureId.EGRESS_DATA_MB,
                 bandwidthUsage
             )

@@ -82,13 +82,13 @@ export default async function RootLayout({
             <body className={`${font.className} h-screen-safe overflow-hidden`}>
                 <StoreInternalRedirect />
                 <TopLoader />
-                {build === "saas" && (
+                {/* build === "saas" && (
                     <Script
                         src="https://rybbit.fossorial.io/api/script.js"
                         data-site-id="fe1ff2a33287"
                         strategy="afterInteractive"
                     />
-                )}
+                )*/}
                 <ViewportHeightFix />
                 <NextIntlClientProvider>
                     <ThemeProvider

@@ -20,7 +20,7 @@ export const isOrgSubscribed = cache(async (orgId: string) => {
         try {
             const subRes = await getCachedSubscription(orgId);
             subscribed =
-                (subRes.data.data.tier == "tier1" || subRes.data.data.tier == "tier2" || subRes.data.data.tier == "tier3") &&
+                (subRes.data.data.tier == "tier1" || subRes.data.data.tier == "tier2" || subRes.data.data.tier == "tier3" || subRes.data.data.tier == "enterprise") &&
                 subRes.data.data.active;
         } catch {}
     }

@@ -42,7 +42,8 @@ export function SubscriptionStatusProvider({
         if (
             subscription.type == "tier1" ||
             subscription.type == "tier2" ||
-            subscription.type == "tier3"
+            subscription.type == "tier3" ||
+            subscription.type == "enterprise"
         ) {
             return {
                 tier: subscription.type,
@@ -61,7 +62,7 @@ export function SubscriptionStatusProvider({
     const isSubscribed = () => {
         const { tier, active } = getTier();
         return (
-            (tier == "tier1" || tier == "tier2" || tier == "tier3") &&
+            (tier == "tier1" || tier == "tier2" || tier == "tier3" || tier == "enterprise") &&
             active
         );
     };