mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-03 09:16:40 +00:00
Force big queries onto primary db to prevent 40001
This commit is contained in:
@@ -20,6 +20,7 @@ function createDb() {
|
|||||||
|
|
||||||
export const db = createDb();
|
export const db = createDb();
|
||||||
export default db;
|
export default db;
|
||||||
|
export const driver: "pg" | "sqlite" = "sqlite";
|
||||||
export type Transaction = Parameters<
|
export type Transaction = Parameters<
|
||||||
Parameters<(typeof db)["transaction"]>[0]
|
Parameters<(typeof db)["transaction"]>[0]
|
||||||
>[0];
|
>[0];
|
||||||
|
|||||||
@@ -12,6 +12,11 @@ import response from "@server/lib/response";
|
|||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { getSevenDaysAgo } from "@app/lib/getSevenDaysAgo";
|
import { getSevenDaysAgo } from "@app/lib/getSevenDaysAgo";
|
||||||
|
|
||||||
|
let primaryDb = db;
|
||||||
|
if (driver == "pg") {
|
||||||
|
primaryDb = db.$primary as typeof db; // select the primary instance in a replicated setup
|
||||||
|
}
|
||||||
|
|
||||||
const queryAccessAuditLogsQuery = z.object({
|
const queryAccessAuditLogsQuery = z.object({
|
||||||
// iso string just validate its a parseable date
|
// iso string just validate its a parseable date
|
||||||
timeStart: z
|
timeStart: z
|
||||||
@@ -74,12 +79,12 @@ async function query(query: Q) {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
const [all] = await db
|
const [all] = await primaryDb
|
||||||
.select({ total: count() })
|
.select({ total: count() })
|
||||||
.from(requestAuditLog)
|
.from(requestAuditLog)
|
||||||
.where(baseConditions);
|
.where(baseConditions);
|
||||||
|
|
||||||
const [blocked] = await db
|
const [blocked] = await primaryDb
|
||||||
.select({ total: count() })
|
.select({ total: count() })
|
||||||
.from(requestAuditLog)
|
.from(requestAuditLog)
|
||||||
.where(and(baseConditions, eq(requestAuditLog.action, false)));
|
.where(and(baseConditions, eq(requestAuditLog.action, false)));
|
||||||
@@ -88,7 +93,9 @@ async function query(query: Q) {
|
|||||||
.mapWith(Number)
|
.mapWith(Number)
|
||||||
.as("total");
|
.as("total");
|
||||||
|
|
||||||
const requestsPerCountry = await db
|
const DISTINCT_LIMIT = 500;
|
||||||
|
|
||||||
|
const requestsPerCountry = await primaryDb
|
||||||
.selectDistinct({
|
.selectDistinct({
|
||||||
code: requestAuditLog.location,
|
code: requestAuditLog.location,
|
||||||
count: totalQ
|
count: totalQ
|
||||||
@@ -96,7 +103,16 @@ async function query(query: Q) {
|
|||||||
.from(requestAuditLog)
|
.from(requestAuditLog)
|
||||||
.where(and(baseConditions, not(isNull(requestAuditLog.location))))
|
.where(and(baseConditions, not(isNull(requestAuditLog.location))))
|
||||||
.groupBy(requestAuditLog.location)
|
.groupBy(requestAuditLog.location)
|
||||||
.orderBy(desc(totalQ));
|
.orderBy(desc(totalQ))
|
||||||
|
.limit(DISTINCT_LIMIT+1);
|
||||||
|
|
||||||
|
if (requestsPerCountry.length > DISTINCT_LIMIT) {
|
||||||
|
// throw an error
|
||||||
|
throw createHttpError(
|
||||||
|
HttpCode.BAD_REQUEST,
|
||||||
|
`Too many distinct countries. Please narrow your query.`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
const groupByDayFunction =
|
const groupByDayFunction =
|
||||||
driver === "pg"
|
driver === "pg"
|
||||||
@@ -106,7 +122,7 @@ async function query(query: Q) {
|
|||||||
const booleanTrue = driver === "pg" ? sql`true` : sql`1`;
|
const booleanTrue = driver === "pg" ? sql`true` : sql`1`;
|
||||||
const booleanFalse = driver === "pg" ? sql`false` : sql`0`;
|
const booleanFalse = driver === "pg" ? sql`false` : sql`0`;
|
||||||
|
|
||||||
const requestsPerDay = await db
|
const requestsPerDay = await primaryDb
|
||||||
.select({
|
.select({
|
||||||
day: groupByDayFunction.as("day"),
|
day: groupByDayFunction.as("day"),
|
||||||
allowedCount:
|
allowedCount:
|
||||||
|
|||||||
@@ -1,4 +1,4 @@
|
|||||||
import { db, requestAuditLog, resources } from "@server/db";
|
import { db, driver, requestAuditLog, resources } from "@server/db";
|
||||||
import { registry } from "@server/openApi";
|
import { registry } from "@server/openApi";
|
||||||
import { NextFunction } from "express";
|
import { NextFunction } from "express";
|
||||||
import { Request, Response } from "express";
|
import { Request, Response } from "express";
|
||||||
@@ -13,6 +13,11 @@ import response from "@server/lib/response";
|
|||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { getSevenDaysAgo } from "@app/lib/getSevenDaysAgo";
|
import { getSevenDaysAgo } from "@app/lib/getSevenDaysAgo";
|
||||||
|
|
||||||
|
let primaryDb = db;
|
||||||
|
if (driver == "pg") {
|
||||||
|
primaryDb = db.$primary as typeof db; // select the primary instance in a replicated setup
|
||||||
|
}
|
||||||
|
|
||||||
export const queryAccessAuditLogsQuery = z.object({
|
export const queryAccessAuditLogsQuery = z.object({
|
||||||
// iso string just validate its a parseable date
|
// iso string just validate its a parseable date
|
||||||
timeStart: z
|
timeStart: z
|
||||||
@@ -107,7 +112,7 @@ function getWhere(data: Q) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export function queryRequest(data: Q) {
|
export function queryRequest(data: Q) {
|
||||||
return db
|
return primaryDb
|
||||||
.select({
|
.select({
|
||||||
id: requestAuditLog.id,
|
id: requestAuditLog.id,
|
||||||
timestamp: requestAuditLog.timestamp,
|
timestamp: requestAuditLog.timestamp,
|
||||||
@@ -143,7 +148,7 @@ export function queryRequest(data: Q) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
export function countRequestQuery(data: Q) {
|
export function countRequestQuery(data: Q) {
|
||||||
const countQuery = db
|
const countQuery = primaryDb
|
||||||
.select({ count: count() })
|
.select({ count: count() })
|
||||||
.from(requestAuditLog)
|
.from(requestAuditLog)
|
||||||
.where(getWhere(data));
|
.where(getWhere(data));
|
||||||
@@ -173,50 +178,61 @@ async function queryUniqueFilterAttributes(
|
|||||||
eq(requestAuditLog.orgId, orgId)
|
eq(requestAuditLog.orgId, orgId)
|
||||||
);
|
);
|
||||||
|
|
||||||
// Get unique actors
|
const DISTINCT_LIMIT = 500;
|
||||||
const uniqueActors = await db
|
|
||||||
.selectDistinct({
|
|
||||||
actor: requestAuditLog.actor
|
|
||||||
})
|
|
||||||
.from(requestAuditLog)
|
|
||||||
.where(baseConditions);
|
|
||||||
|
|
||||||
// Get unique locations
|
// TODO: SOMEONE PLEASE OPTIMIZE THIS!!!!!
|
||||||
const uniqueLocations = await db
|
|
||||||
.selectDistinct({
|
|
||||||
locations: requestAuditLog.location
|
|
||||||
})
|
|
||||||
.from(requestAuditLog)
|
|
||||||
.where(baseConditions);
|
|
||||||
|
|
||||||
// Get unique actors
|
// Run all queries in parallel
|
||||||
const uniqueHosts = await db
|
const [
|
||||||
.selectDistinct({
|
uniqueActors,
|
||||||
hosts: requestAuditLog.host
|
uniqueLocations,
|
||||||
})
|
uniqueHosts,
|
||||||
.from(requestAuditLog)
|
uniquePaths,
|
||||||
.where(baseConditions);
|
uniqueResources
|
||||||
|
] = await Promise.all([
|
||||||
|
primaryDb
|
||||||
|
.selectDistinct({ actor: requestAuditLog.actor })
|
||||||
|
.from(requestAuditLog)
|
||||||
|
.where(baseConditions)
|
||||||
|
.limit(DISTINCT_LIMIT+1),
|
||||||
|
primaryDb
|
||||||
|
.selectDistinct({ locations: requestAuditLog.location })
|
||||||
|
.from(requestAuditLog)
|
||||||
|
.where(baseConditions)
|
||||||
|
.limit(DISTINCT_LIMIT+1),
|
||||||
|
primaryDb
|
||||||
|
.selectDistinct({ hosts: requestAuditLog.host })
|
||||||
|
.from(requestAuditLog)
|
||||||
|
.where(baseConditions)
|
||||||
|
.limit(DISTINCT_LIMIT+1),
|
||||||
|
primaryDb
|
||||||
|
.selectDistinct({ paths: requestAuditLog.path })
|
||||||
|
.from(requestAuditLog)
|
||||||
|
.where(baseConditions)
|
||||||
|
.limit(DISTINCT_LIMIT+1),
|
||||||
|
primaryDb
|
||||||
|
.selectDistinct({
|
||||||
|
id: requestAuditLog.resourceId,
|
||||||
|
name: resources.name
|
||||||
|
})
|
||||||
|
.from(requestAuditLog)
|
||||||
|
.leftJoin(
|
||||||
|
resources,
|
||||||
|
eq(requestAuditLog.resourceId, resources.resourceId)
|
||||||
|
)
|
||||||
|
.where(baseConditions)
|
||||||
|
.limit(DISTINCT_LIMIT+1)
|
||||||
|
]);
|
||||||
|
|
||||||
// Get unique actors
|
if (
|
||||||
const uniquePaths = await db
|
uniqueActors.length > DISTINCT_LIMIT ||
|
||||||
.selectDistinct({
|
uniqueLocations.length > DISTINCT_LIMIT ||
|
||||||
paths: requestAuditLog.path
|
uniqueHosts.length > DISTINCT_LIMIT ||
|
||||||
})
|
uniquePaths.length > DISTINCT_LIMIT ||
|
||||||
.from(requestAuditLog)
|
uniqueResources.length > DISTINCT_LIMIT
|
||||||
.where(baseConditions);
|
) {
|
||||||
|
throw new Error("Too many distinct filter attributes to retrieve. Please refine your time range.");
|
||||||
// Get unique resources with names
|
}
|
||||||
const uniqueResources = await db
|
|
||||||
.selectDistinct({
|
|
||||||
id: requestAuditLog.resourceId,
|
|
||||||
name: resources.name
|
|
||||||
})
|
|
||||||
.from(requestAuditLog)
|
|
||||||
.leftJoin(
|
|
||||||
resources,
|
|
||||||
eq(requestAuditLog.resourceId, resources.resourceId)
|
|
||||||
)
|
|
||||||
.where(baseConditions);
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
actors: uniqueActors
|
actors: uniqueActors
|
||||||
@@ -295,6 +311,12 @@ export async function queryRequestAuditLogs(
|
|||||||
});
|
});
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(error);
|
logger.error(error);
|
||||||
|
// if the message is "Too many distinct filter attributes to retrieve. Please refine your time range.", return a 400 and the message
|
||||||
|
if (error instanceof Error && error.message === "Too many distinct filter attributes to retrieve. Please refine your time range.") {
|
||||||
|
return next(
|
||||||
|
createHttpError(HttpCode.BAD_REQUEST, error.message)
|
||||||
|
);
|
||||||
|
}
|
||||||
return next(
|
return next(
|
||||||
createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred")
|
createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred")
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -194,11 +194,23 @@ export async function getOlmToken(
|
|||||||
.where(inArray(exitNodes.exitNodeId, exitNodeIds));
|
.where(inArray(exitNodes.exitNodeId, exitNodeIds));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Map exitNodeId to siteIds
|
||||||
|
const exitNodeIdToSiteIds: Record<number, number[]> = {};
|
||||||
|
for (const { sites: site } of clientSites) {
|
||||||
|
if (site.exitNodeId !== null) {
|
||||||
|
if (!exitNodeIdToSiteIds[site.exitNodeId]) {
|
||||||
|
exitNodeIdToSiteIds[site.exitNodeId] = [];
|
||||||
|
}
|
||||||
|
exitNodeIdToSiteIds[site.exitNodeId].push(site.siteId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const exitNodesHpData = allExitNodes.map((exitNode: ExitNode) => {
|
const exitNodesHpData = allExitNodes.map((exitNode: ExitNode) => {
|
||||||
return {
|
return {
|
||||||
publicKey: exitNode.publicKey,
|
publicKey: exitNode.publicKey,
|
||||||
relayPort: config.getRawConfig().gerbil.clients_start_port,
|
relayPort: config.getRawConfig().gerbil.clients_start_port,
|
||||||
endpoint: exitNode.endpoint
|
endpoint: exitNode.endpoint,
|
||||||
|
siteIds: exitNodeIdToSiteIds[exitNode.exitNodeId] ?? []
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user