From dae169540bd3430f698725f9d8c830f441e5be96 Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 2 Mar 2026 16:49:17 -0800 Subject: [PATCH 01/16] Fix defaults for orgs --- server/lib/readConfigFile.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/lib/readConfigFile.ts b/server/lib/readConfigFile.ts index cca0aa6aa..f6c8592e9 100644 --- a/server/lib/readConfigFile.ts +++ b/server/lib/readConfigFile.ts @@ -302,8 +302,8 @@ export const configSchema = z .optional() .default({ block_size: 24, - subnet_group: "100.90.128.0/24", - utility_subnet_group: "100.96.128.0/24" + subnet_group: "100.90.128.0/20", + utility_subnet_group: "100.96.128.0/20" }), rate_limits: z .object({ From 6cf1b9b0108d5f4eda8e22f716f91e8874a4c558 Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 2 Mar 2026 18:51:48 -0800 Subject: [PATCH 02/16] Support improved targets msg v2 --- server/lib/ip.ts | 123 +++++++++++ server/lib/rebuildClientAssociations.ts | 32 ++- server/routers/client/targets.ts | 209 ++++++++++++++---- server/routers/newt/buildConfiguration.ts | 26 ++- server/routers/newt/handleGetConfigMessage.ts | 5 +- .../siteResource/updateSiteResource.ts | 10 +- 6 files changed, 326 insertions(+), 79 deletions(-) diff --git a/server/lib/ip.ts b/server/lib/ip.ts index 21ec78c1b..3a29b8661 100644 --- a/server/lib/ip.ts +++ b/server/lib/ip.ts @@ -571,6 +571,129 @@ export function generateSubnetProxyTargets( return targets; } +export type SubnetProxyTargetV2 = { + sourcePrefixes: string[]; // must be cidrs + destPrefix: string; // must be a cidr + disableIcmp?: boolean; + rewriteTo?: string; // must be a cidr + portRange?: { + min: number; + max: number; + protocol: "tcp" | "udp"; + }[]; +}; + +export function generateSubnetProxyTargetV2( + siteResource: SiteResource, + clients: { + clientId: number; + pubKey: string | null; + subnet: string | null; + }[] +): SubnetProxyTargetV2 | undefined { + if (clients.length === 0) { + logger.debug( + `No clients have access to site 
resource ${siteResource.siteResourceId}, skipping target generation.` + ); + return; + } + + let target: SubnetProxyTargetV2 | null = null; + + const portRange = [ + ...parsePortRangeString(siteResource.tcpPortRangeString, "tcp"), + ...parsePortRangeString(siteResource.udpPortRangeString, "udp") + ]; + const disableIcmp = siteResource.disableIcmp ?? false; + + if (siteResource.mode == "host") { + let destination = siteResource.destination; + // check if this is a valid ip + const ipSchema = z.union([z.ipv4(), z.ipv6()]); + if (ipSchema.safeParse(destination).success) { + destination = `${destination}/32`; + + target = { + sourcePrefixes: [], + destPrefix: destination, + portRange, + disableIcmp + }; + } + + if (siteResource.alias && siteResource.aliasAddress) { + // also push a match for the alias address + target = { + sourcePrefixes: [], + destPrefix: `${siteResource.aliasAddress}/32`, + rewriteTo: destination, + portRange, + disableIcmp + }; + } + } else if (siteResource.mode == "cidr") { + target = { + sourcePrefixes: [], + destPrefix: siteResource.destination, + portRange, + disableIcmp + }; + } + + if (!target) { + return; + } + + for (const clientSite of clients) { + if (!clientSite.subnet) { + logger.debug( + `Client ${clientSite.clientId} has no subnet, skipping for site resource ${siteResource.siteResourceId}.` + ); + continue; + } + + const clientPrefix = `${clientSite.subnet.split("/")[0]}/32`; + + // add client prefix to source prefixes + target.sourcePrefixes.push(clientPrefix); + } + + // print a nice representation of the targets + // logger.debug( + // `Generated subnet proxy targets for: ${JSON.stringify(targets, null, 2)}` + // ); + + return target; +} + + +/** + * Converts a SubnetProxyTargetV2 to an array of SubnetProxyTarget (v1) + * by expanding each source prefix into its own target entry. 
+ * @param targetV2 - The v2 target to convert + * @returns Array of v1 SubnetProxyTarget objects + */ + export function convertSubnetProxyTargetsV2ToV1( + targetsV2: SubnetProxyTargetV2[] + ): SubnetProxyTarget[] { + return targetsV2.flatMap((targetV2) => + targetV2.sourcePrefixes.map((sourcePrefix) => ({ + sourcePrefix, + destPrefix: targetV2.destPrefix, + ...(targetV2.disableIcmp !== undefined && { + disableIcmp: targetV2.disableIcmp + }), + ...(targetV2.rewriteTo !== undefined && { + rewriteTo: targetV2.rewriteTo + }), + ...(targetV2.portRange !== undefined && { + portRange: targetV2.portRange + }) + })) + ); + } + + // Custom schema for validating port range strings // Format: "80,443,8000-9000" or "*" for all ports, or empty string export const portRangeStringSchema = z diff --git a/server/lib/rebuildClientAssociations.ts b/server/lib/rebuildClientAssociations.ts index 625e57935..915c3648d 100644 --- a/server/lib/rebuildClientAssociations.ts +++ b/server/lib/rebuildClientAssociations.ts @@ -32,7 +32,7 @@ import logger from "@server/logger"; import { generateAliasConfig, generateRemoteSubnets, - generateSubnetProxyTargets, + generateSubnetProxyTargetV2, parseEndpoint, formatEndpoint } from "@server/lib/ip"; @@ -659,17 +659,14 @@ async function handleSubnetProxyTargetUpdates( ); if (addedClients.length > 0) { - const targetsToAdd = generateSubnetProxyTargets( + const targetToAdd = generateSubnetProxyTargetV2( siteResource, addedClients ); - if (targetsToAdd.length > 0) { - logger.info( - `Adding ${targetsToAdd.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}` - ); + if (targetToAdd) { proxyJobs.push( - addSubnetProxyTargets(newt.newtId, targetsToAdd) + addSubnetProxyTargets(newt.newtId, [targetToAdd]) ); } @@ -695,17 +692,14 @@ async function handleSubnetProxyTargetUpdates( ); if (removedClients.length > 0) { - const targetsToRemove = generateSubnetProxyTargets( + const targetToRemove = generateSubnetProxyTargetV2( siteResource, 
removedClients ); - if (targetsToRemove.length > 0) { - logger.info( - `Removing ${targetsToRemove.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}` - ); + if (targetToRemove) { proxyJobs.push( - removeSubnetProxyTargets(newt.newtId, targetsToRemove) + removeSubnetProxyTargets(newt.newtId, [targetToRemove]) ); } @@ -1159,7 +1153,7 @@ async function handleMessagesForClientResources( } for (const resource of resources) { - const targets = generateSubnetProxyTargets(resource, [ + const target = generateSubnetProxyTargetV2(resource, [ { clientId: client.clientId, pubKey: client.pubKey, @@ -1167,8 +1161,8 @@ async function handleMessagesForClientResources( } ]); - if (targets.length > 0) { - proxyJobs.push(addSubnetProxyTargets(newt.newtId, targets)); + if (target) { + proxyJobs.push(addSubnetProxyTargets(newt.newtId, [target])); } try { @@ -1230,7 +1224,7 @@ async function handleMessagesForClientResources( } for (const resource of resources) { - const targets = generateSubnetProxyTargets(resource, [ + const target = generateSubnetProxyTargetV2(resource, [ { clientId: client.clientId, pubKey: client.pubKey, @@ -1238,9 +1232,9 @@ async function handleMessagesForClientResources( } ]); - if (targets.length > 0) { + if (target) { proxyJobs.push( - removeSubnetProxyTargets(newt.newtId, targets) + removeSubnetProxyTargets(newt.newtId, [target]) ); } diff --git a/server/routers/client/targets.ts b/server/routers/client/targets.ts index bf612d352..48b7e216d 100644 --- a/server/routers/client/targets.ts +++ b/server/routers/client/targets.ts @@ -1,8 +1,15 @@ import { sendToClient } from "#dynamic/routers/ws"; -import { db, olms, Transaction } from "@server/db"; -import { Alias, SubnetProxyTarget } from "@server/lib/ip"; +import { S } from "@faker-js/faker/dist/airline-Dz1uGqgJ"; +import { db, newts, olms, Transaction } from "@server/db"; +import { + Alias, + convertSubnetProxyTargetsV2ToV1, + SubnetProxyTarget, + SubnetProxyTargetV2 +} from 
"@server/lib/ip"; import logger from "@server/logger"; import { eq } from "drizzle-orm"; +import semver from "semver"; const BATCH_SIZE = 50; const BATCH_DELAY_MS = 50; @@ -19,57 +26,149 @@ function chunkArray(array: T[], size: number): T[][] { return chunks; } -export async function addTargets(newtId: string, targets: SubnetProxyTarget[]) { - const batches = chunkArray(targets, BATCH_SIZE); +const NEWT_V2_TARGETS_VERSION = ">=1.11.0"; + +export async function convertTargetsIfNessicary( + newtId: string, + targets: SubnetProxyTarget[] | SubnetProxyTargetV2[] +) { + // get the newt + const [newt] = await db + .select() + .from(newts) + .where(eq(newts.newtId, newtId)); + if (!newt) { + throw new Error(`No newt found for id: ${newtId}`); + } + + // check the semver + if ( + newt.version && + !semver.satisfies(newt.version, NEWT_V2_TARGETS_VERSION) + ) { + logger.debug( + `addTargets Newt version ${newt.version} does not support targets v2 falling back` + ); + targets = convertSubnetProxyTargetsV2ToV1( + targets as SubnetProxyTargetV2[] + ); + } + + return targets; +} + +export async function addTargets( + newtId: string, + targets: SubnetProxyTarget[] | SubnetProxyTargetV2[] +) { + targets = await convertTargetsIfNessicary(newtId, targets); + + const batches = chunkArray( + targets, + BATCH_SIZE + ); + for (let i = 0; i < batches.length; i++) { if (i > 0) { await sleep(BATCH_DELAY_MS); } - await sendToClient(newtId, { - type: `newt/wg/targets/add`, - data: batches[i] - }, { incrementConfigVersion: true }); + await sendToClient( + newtId, + { + type: `newt/wg/targets/add`, + data: batches[i] + }, + { incrementConfigVersion: true } + ); } } export async function removeTargets( newtId: string, - targets: SubnetProxyTarget[] + targets: SubnetProxyTarget[] | SubnetProxyTargetV2[] ) { - const batches = chunkArray(targets, BATCH_SIZE); + targets = await convertTargetsIfNessicary(newtId, targets); + + const batches = chunkArray( + targets, + BATCH_SIZE + ); for (let i = 0; i 
< batches.length; i++) { if (i > 0) { await sleep(BATCH_DELAY_MS); } - await sendToClient(newtId, { - type: `newt/wg/targets/remove`, - data: batches[i] - },{ incrementConfigVersion: true }); + await sendToClient( + newtId, + { + type: `newt/wg/targets/remove`, + data: batches[i] + }, + { incrementConfigVersion: true } + ); } } export async function updateTargets( newtId: string, targets: { - oldTargets: SubnetProxyTarget[]; - newTargets: SubnetProxyTarget[]; + oldTargets: SubnetProxyTarget[] | SubnetProxyTargetV2[]; + newTargets: SubnetProxyTarget[] | SubnetProxyTargetV2[]; } ) { - const oldBatches = chunkArray(targets.oldTargets, BATCH_SIZE); - const newBatches = chunkArray(targets.newTargets, BATCH_SIZE); + // get the newt + const [newt] = await db + .select() + .from(newts) + .where(eq(newts.newtId, newtId)); + if (!newt) { + logger.error(`addTargetsL No newt found for id: ${newtId}`); + return; + } + + // check the semver + if ( + newt.version && + !semver.satisfies(newt.version, NEWT_V2_TARGETS_VERSION) + ) { + logger.debug( + `addTargets Newt version ${newt.version} does not support targets v2 falling back` + ); + targets = { + oldTargets: convertSubnetProxyTargetsV2ToV1( + targets.oldTargets as SubnetProxyTargetV2[] + ), + newTargets: convertSubnetProxyTargetsV2ToV1( + targets.newTargets as SubnetProxyTargetV2[] + ) + }; + } + + const oldBatches = chunkArray( + targets.oldTargets, + BATCH_SIZE + ); + const newBatches = chunkArray( + targets.newTargets, + BATCH_SIZE + ); + const maxBatches = Math.max(oldBatches.length, newBatches.length); for (let i = 0; i < maxBatches; i++) { if (i > 0) { await sleep(BATCH_DELAY_MS); } - await sendToClient(newtId, { - type: `newt/wg/targets/update`, - data: { - oldTargets: oldBatches[i] || [], - newTargets: newBatches[i] || [] - } - }, { incrementConfigVersion: true }).catch((error) => { + await sendToClient( + newtId, + { + type: `newt/wg/targets/update`, + data: { + oldTargets: oldBatches[i] || [], + newTargets: 
newBatches[i] || [] + } + }, + { incrementConfigVersion: true } + ).catch((error) => { logger.warn(`Error sending message:`, error); }); } @@ -94,14 +193,18 @@ export async function addPeerData( olmId = olm.olmId; } - await sendToClient(olmId, { - type: `olm/wg/peer/data/add`, - data: { - siteId: siteId, - remoteSubnets: remoteSubnets, - aliases: aliases - } - }, { incrementConfigVersion: true }).catch((error) => { + await sendToClient( + olmId, + { + type: `olm/wg/peer/data/add`, + data: { + siteId: siteId, + remoteSubnets: remoteSubnets, + aliases: aliases + } + }, + { incrementConfigVersion: true } + ).catch((error) => { logger.warn(`Error sending message:`, error); }); } @@ -125,14 +228,18 @@ export async function removePeerData( olmId = olm.olmId; } - await sendToClient(olmId, { - type: `olm/wg/peer/data/remove`, - data: { - siteId: siteId, - remoteSubnets: remoteSubnets, - aliases: aliases - } - }, { incrementConfigVersion: true }).catch((error) => { + await sendToClient( + olmId, + { + type: `olm/wg/peer/data/remove`, + data: { + siteId: siteId, + remoteSubnets: remoteSubnets, + aliases: aliases + } + }, + { incrementConfigVersion: true } + ).catch((error) => { logger.warn(`Error sending message:`, error); }); } @@ -166,14 +273,18 @@ export async function updatePeerData( olmId = olm.olmId; } - await sendToClient(olmId, { - type: `olm/wg/peer/data/update`, - data: { - siteId: siteId, - ...remoteSubnets, - ...aliases - } - }, { incrementConfigVersion: true }).catch((error) => { + await sendToClient( + olmId, + { + type: `olm/wg/peer/data/update`, + data: { + siteId: siteId, + ...remoteSubnets, + ...aliases + } + }, + { incrementConfigVersion: true } + ).catch((error) => { logger.warn(`Error sending message:`, error); }); } diff --git a/server/routers/newt/buildConfiguration.ts b/server/routers/newt/buildConfiguration.ts index e349f24e8..c20e713f6 100644 --- a/server/routers/newt/buildConfiguration.ts +++ b/server/routers/newt/buildConfiguration.ts @@ -1,9 
+1,23 @@ -import { clients, clientSiteResourcesAssociationsCache, clientSitesAssociationsCache, db, ExitNode, resources, Site, siteResources, targetHealthCheck, targets } from "@server/db"; +import { + clients, + clientSiteResourcesAssociationsCache, + clientSitesAssociationsCache, + db, + ExitNode, + resources, + Site, + siteResources, + targetHealthCheck, + targets +} from "@server/db"; import logger from "@server/logger"; import { initPeerAddHandshake, updatePeer } from "../olm/peers"; import { eq, and } from "drizzle-orm"; import config from "@server/lib/config"; -import { generateSubnetProxyTargets, SubnetProxyTarget } from "@server/lib/ip"; +import { + generateSubnetProxyTargetV2, + SubnetProxyTargetV2 +} from "@server/lib/ip"; export async function buildClientConfigurationForNewtClient( site: Site, @@ -126,7 +140,7 @@ export async function buildClientConfigurationForNewtClient( .from(siteResources) .where(eq(siteResources.siteId, siteId)); - const targetsToSend: SubnetProxyTarget[] = []; + const targetsToSend: SubnetProxyTargetV2[] = []; for (const resource of allSiteResources) { // Get clients associated with this specific resource @@ -151,12 +165,14 @@ export async function buildClientConfigurationForNewtClient( ) ); - const resourceTargets = generateSubnetProxyTargets( + const resourceTarget = generateSubnetProxyTargetV2( resource, resourceClients ); - targetsToSend.push(...resourceTargets); + if (resourceTarget) { + targetsToSend.push(resourceTarget); + } } return { diff --git a/server/routers/newt/handleGetConfigMessage.ts b/server/routers/newt/handleGetConfigMessage.ts index 801c8b65a..d17a37e48 100644 --- a/server/routers/newt/handleGetConfigMessage.ts +++ b/server/routers/newt/handleGetConfigMessage.ts @@ -6,6 +6,7 @@ import { db, ExitNode, exitNodes, Newt, sites } from "@server/db"; import { eq } from "drizzle-orm"; import { sendToExitNode } from "#dynamic/lib/exitNodes"; import { buildClientConfigurationForNewtClient } from "./buildConfiguration"; 
+import { convertTargetsIfNessicary } from "../client/targets"; const inputSchema = z.object({ publicKey: z.string(), @@ -126,13 +127,15 @@ export const handleGetConfigMessage: MessageHandler = async (context) => { exitNode ); + const targetsToSend = await convertTargetsIfNessicary(newt.newtId, targets); + return { message: { type: "newt/wg/receive-config", data: { ipAddress: site.address, peers, - targets + targets: targetsToSend } }, broadcast: false, diff --git a/server/routers/siteResource/updateSiteResource.ts b/server/routers/siteResource/updateSiteResource.ts index 242b92265..c8119840f 100644 --- a/server/routers/siteResource/updateSiteResource.ts +++ b/server/routers/siteResource/updateSiteResource.ts @@ -24,7 +24,7 @@ import { updatePeerData, updateTargets } from "@server/routers/client/targets"; import { generateAliasConfig, generateRemoteSubnets, - generateSubnetProxyTargets, + generateSubnetProxyTargetV2, isIpInCidr, portRangeStringSchema } from "@server/lib/ip"; @@ -608,18 +608,18 @@ export async function handleMessagingForUpdatedSiteResource( // Only update targets on newt if destination changed if (destinationChanged || portRangesChanged) { - const oldTargets = generateSubnetProxyTargets( + const oldTarget = generateSubnetProxyTargetV2( existingSiteResource, mergedAllClients ); - const newTargets = generateSubnetProxyTargets( + const newTarget = generateSubnetProxyTargetV2( updatedSiteResource, mergedAllClients ); await updateTargets(newt.newtId, { - oldTargets: oldTargets, - newTargets: newTargets + oldTargets: [oldTarget], + newTargets: [newTarget] }); } From b01fcc70feab3d86939c5ae6fd43e8bcb4ee50db Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 3 Mar 2026 14:45:18 -0800 Subject: [PATCH 03/16] Fix ts and add note about ipv4 --- server/routers/siteResource/createSiteResource.ts | 2 +- server/routers/siteResource/updateSiteResource.ts | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git 
a/server/routers/siteResource/createSiteResource.ts b/server/routers/siteResource/createSiteResource.ts index bbdc3638d..b12b8d100 100644 --- a/server/routers/siteResource/createSiteResource.ts +++ b/server/routers/siteResource/createSiteResource.ts @@ -88,7 +88,7 @@ const createSiteResourceSchema = z }, { message: - "Destination must be a valid IP address or valid domain AND alias is required" + "Destination must be a valid IPV4 address or valid domain AND alias is required" } ) .refine( diff --git a/server/routers/siteResource/updateSiteResource.ts b/server/routers/siteResource/updateSiteResource.ts index c8119840f..bc5daa55f 100644 --- a/server/routers/siteResource/updateSiteResource.ts +++ b/server/routers/siteResource/updateSiteResource.ts @@ -618,8 +618,8 @@ export async function handleMessagingForUpdatedSiteResource( ); await updateTargets(newt.newtId, { - oldTargets: [oldTarget], - newTargets: [newTarget] + oldTargets: oldTarget ? [oldTarget] : [], + newTargets: newTarget ? [newTarget] : [] }); } From 3cca0c09c06e1a928c5cae9b8b5ad10ab231ee88 Mon Sep 17 00:00:00 2001 From: Noe Charmet Date: Mon, 23 Mar 2026 11:09:19 +0100 Subject: [PATCH 04/16] Allow setting Redis password from env --- server/private/lib/readConfigFile.ts | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/private/lib/readConfigFile.ts b/server/private/lib/readConfigFile.ts index 0ce6d0272..54260009b 100644 --- a/server/private/lib/readConfigFile.ts +++ b/server/private/lib/readConfigFile.ts @@ -57,7 +57,10 @@ export const privateConfigSchema = z.object({ .object({ host: z.string(), port: portSchema, - password: z.string().optional(), + password: z + .string() + .optional() + .transform(getEnvOrYaml("REDIS_PASSWORD")), db: z.int().nonnegative().optional().default(0), replicas: z .array( From 37d331e813a67274c22bfcd7839bbdd8bc692541 Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 23 Mar 2026 16:05:05 -0700 Subject: [PATCH 05/16] Update version --- 
server/routers/client/targets.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/routers/client/targets.ts b/server/routers/client/targets.ts index 7cd59133c..b2d49db4c 100644 --- a/server/routers/client/targets.ts +++ b/server/routers/client/targets.ts @@ -11,7 +11,7 @@ import logger from "@server/logger"; import { eq } from "drizzle-orm"; import semver from "semver"; -const NEWT_V2_TARGETS_VERSION = ">=1.11.0"; +const NEWT_V2_TARGETS_VERSION = ">=1.10.3"; export async function convertTargetsIfNessicary( newtId: string, From 5b894e8682a0aa4db18c714da06db19cd19abe87 Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 24 Mar 2026 16:01:54 -0700 Subject: [PATCH 06/16] Disable everything if not paid --- src/app/[orgId]/settings/(private)/idp/create/page.tsx | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/app/[orgId]/settings/(private)/idp/create/page.tsx b/src/app/[orgId]/settings/(private)/idp/create/page.tsx index 4c783e9b2..fc2c6c382 100644 --- a/src/app/[orgId]/settings/(private)/idp/create/page.tsx +++ b/src/app/[orgId]/settings/(private)/idp/create/page.tsx @@ -275,6 +275,8 @@ export default function Page() { } } + const disabled = !isPaidUser(tierMatrix.orgOidc); + return ( <>
@@ -292,6 +294,9 @@ export default function Page() {
+ + +
@@ -812,9 +817,10 @@ export default function Page() { +
); } From 5a2a97b23a5034c92e57179571d5485888fe001a Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 24 Mar 2026 16:12:13 -0700 Subject: [PATCH 07/16] Add better pooling controls --- server/db/pg/driver.ts | 35 ++++++++++++--------- server/db/pg/logsDriver.ts | 35 ++++++++++++--------- server/db/pg/poolConfig.ts | 63 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 105 insertions(+), 28 deletions(-) create mode 100644 server/db/pg/poolConfig.ts diff --git a/server/db/pg/driver.ts b/server/db/pg/driver.ts index 5b357d060..9366e32e1 100644 --- a/server/db/pg/driver.ts +++ b/server/db/pg/driver.ts @@ -1,7 +1,7 @@ import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres"; -import { Pool } from "pg"; import { readConfigFile } from "@server/lib/readConfigFile"; import { withReplicas } from "drizzle-orm/pg-core"; +import { createPool } from "./poolConfig"; function createDb() { const config = readConfigFile(); @@ -39,12 +39,17 @@ function createDb() { // Create connection pools instead of individual connections const poolConfig = config.postgres.pool; - const primaryPool = new Pool({ + const maxConnections = poolConfig?.max_connections || 20; + const idleTimeoutMs = poolConfig?.idle_timeout_ms || 30000; + const connectionTimeoutMs = poolConfig?.connection_timeout_ms || 5000; + + const primaryPool = createPool( connectionString, - max: poolConfig?.max_connections || 20, - idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000, - connectionTimeoutMillis: poolConfig?.connection_timeout_ms || 5000 - }); + maxConnections, + idleTimeoutMs, + connectionTimeoutMs, + "primary" + ); const replicas = []; @@ -55,14 +60,16 @@ function createDb() { }) ); } else { + const maxReplicaConnections = + poolConfig?.max_replica_connections || 20; for (const conn of replicaConnections) { - const replicaPool = new Pool({ - connectionString: conn.connection_string, - max: poolConfig?.max_replica_connections || 20, - idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000, - 
connectionTimeoutMillis: - poolConfig?.connection_timeout_ms || 5000 - }); + const replicaPool = createPool( + conn.connection_string, + maxReplicaConnections, + idleTimeoutMs, + connectionTimeoutMs, + "replica" + ); replicas.push( DrizzlePostgres(replicaPool, { logger: process.env.QUERY_LOGGING == "true" @@ -84,4 +91,4 @@ export default db; export const primaryDb = db.$primary; export type Transaction = Parameters< Parameters<(typeof db)["transaction"]>[0] ->[0]; +>[0]; \ No newline at end of file diff --git a/server/db/pg/logsDriver.ts b/server/db/pg/logsDriver.ts index 49e26f89f..146b8fb2f 100644 --- a/server/db/pg/logsDriver.ts +++ b/server/db/pg/logsDriver.ts @@ -1,9 +1,9 @@ import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres"; -import { Pool } from "pg"; import { readConfigFile } from "@server/lib/readConfigFile"; import { withReplicas } from "drizzle-orm/pg-core"; import { build } from "@server/build"; import { db as mainDb, primaryDb as mainPrimaryDb } from "./driver"; +import { createPool } from "./poolConfig"; function createLogsDb() { // Only use separate logs database in SaaS builds @@ -42,12 +42,17 @@ function createLogsDb() { // Create separate connection pool for logs database const poolConfig = logsConfig?.pool || config.postgres?.pool; - const primaryPool = new Pool({ + const maxConnections = poolConfig?.max_connections || 20; + const idleTimeoutMs = poolConfig?.idle_timeout_ms || 30000; + const connectionTimeoutMs = poolConfig?.connection_timeout_ms || 5000; + + const primaryPool = createPool( connectionString, - max: poolConfig?.max_connections || 20, - idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000, - connectionTimeoutMillis: poolConfig?.connection_timeout_ms || 5000 - }); + maxConnections, + idleTimeoutMs, + connectionTimeoutMs, + "logs-primary" + ); const replicas = []; @@ -58,14 +63,16 @@ function createLogsDb() { }) ); } else { + const maxReplicaConnections = + poolConfig?.max_replica_connections || 20; for (const 
conn of replicaConnections) { - const replicaPool = new Pool({ - connectionString: conn.connection_string, - max: poolConfig?.max_replica_connections || 20, - idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000, - connectionTimeoutMillis: - poolConfig?.connection_timeout_ms || 5000 - }); + const replicaPool = createPool( + conn.connection_string, + maxReplicaConnections, + idleTimeoutMs, + connectionTimeoutMs, + "logs-replica" + ); replicas.push( DrizzlePostgres(replicaPool, { logger: process.env.QUERY_LOGGING == "true" @@ -84,4 +91,4 @@ function createLogsDb() { export const logsDb = createLogsDb(); export default logsDb; -export const primaryLogsDb = logsDb.$primary; +export const primaryLogsDb = logsDb.$primary; \ No newline at end of file diff --git a/server/db/pg/poolConfig.ts b/server/db/pg/poolConfig.ts new file mode 100644 index 000000000..f753121c1 --- /dev/null +++ b/server/db/pg/poolConfig.ts @@ -0,0 +1,63 @@ +import { Pool, PoolConfig } from "pg"; +import logger from "@server/logger"; + +export function createPoolConfig( + connectionString: string, + maxConnections: number, + idleTimeoutMs: number, + connectionTimeoutMs: number +): PoolConfig { + return { + connectionString, + max: maxConnections, + idleTimeoutMillis: idleTimeoutMs, + connectionTimeoutMillis: connectionTimeoutMs, + // TCP keepalive to prevent silent connection drops by NAT gateways, + // load balancers, and other intermediate network devices (e.g. AWS + // NAT Gateway drops idle TCP connections after ~350s) + keepAlive: true, + keepAliveInitialDelayMillis: 10000, // send first keepalive after 10s of idle + // Allow connections to be released and recreated more aggressively + // to avoid stale connections building up + allowExitOnIdle: false + }; +} + +export function attachPoolErrorHandlers(pool: Pool, label: string): void { + pool.on("error", (err) => { + // This catches errors on idle clients in the pool. Without this + // handler an unexpected disconnect would crash the process. 
+ logger.error( + `Unexpected error on idle ${label} database client: ${err.message}` + ); + }); + + pool.on("connect", (client) => { + // Set a statement timeout on every new connection so a single slow + // query can't block the pool forever + client.query("SET statement_timeout = '30s'").catch((err: Error) => { + logger.warn( + `Failed to set statement_timeout on ${label} client: ${err.message}` + ); + }); + }); +} + +export function createPool( + connectionString: string, + maxConnections: number, + idleTimeoutMs: number, + connectionTimeoutMs: number, + label: string +): Pool { + const pool = new Pool( + createPoolConfig( + connectionString, + maxConnections, + idleTimeoutMs, + connectionTimeoutMs + ) + ); + attachPoolErrorHandlers(pool, label); + return pool; +} \ No newline at end of file From fff38aac85eae7205417e302c7a16aeff2f7a1ce Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 24 Mar 2026 16:26:56 -0700 Subject: [PATCH 08/16] Add ssh access log --- server/private/routers/ssh/signSshKey.ts | 19 +++++++++++++++++++ src/app/[orgId]/settings/logs/access/page.tsx | 16 ++++++++-------- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/server/private/routers/ssh/signSshKey.ts b/server/private/routers/ssh/signSshKey.ts index 5cffb4a34..c39d2e0ae 100644 --- a/server/private/routers/ssh/signSshKey.ts +++ b/server/private/routers/ssh/signSshKey.ts @@ -24,6 +24,7 @@ import { sites, userOrgs } from "@server/db"; +import { logAccessAudit } from "#private/lib/logAccessAudit"; import { isLicensedOrSubscribed } from "#private/lib/isLicencedOrSubscribed"; import { tierMatrix } from "@server/lib/billing/tierMatrix"; import response from "@server/lib/response"; @@ -463,6 +464,24 @@ export async function signSshKey( }) }); + await logAccessAudit({ + action: true, + type: "ssh", + orgId: orgId, + resourceId: resource.siteResourceId, + user: req.user + ? { username: req.user.username ?? 
"", userId: req.user.userId } + : undefined, + metadata: { + resourceName: resource.name, + siteId: resource.siteId, + sshUsername: usernameToUse, + sshHost: sshHost + }, + userAgent: req.headers["user-agent"], + requestIp: req.ip + }); + return response(res, { data: { certificate: cert.certificate, diff --git a/src/app/[orgId]/settings/logs/access/page.tsx b/src/app/[orgId]/settings/logs/access/page.tsx index 810022b98..dbb7b6708 100644 --- a/src/app/[orgId]/settings/logs/access/page.tsx +++ b/src/app/[orgId]/settings/logs/access/page.tsx @@ -493,7 +493,8 @@ export default function GeneralPage() { { value: "whitelistedEmail", label: "Whitelisted Email" - } + }, + { value: "ssh", label: "SSH" } ]} selectedValue={filters.type} onValueChange={(value) => @@ -507,13 +508,12 @@ export default function GeneralPage() { ); }, cell: ({ row }) => { - // should be capitalized first letter - return ( - - {row.original.type.charAt(0).toUpperCase() + - row.original.type.slice(1) || "-"} - - ); + const typeLabel = + row.original.type === "ssh" + ? "SSH" + : row.original.type.charAt(0).toUpperCase() + + row.original.type.slice(1); + return {typeLabel || "-"}; } }, { From 985e1bb9abae7ca24d746431b3d2f2d5418f4a6a Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 24 Mar 2026 16:01:54 -0700 Subject: [PATCH 09/16] Disable everything if not paid --- src/app/[orgId]/settings/(private)/idp/create/page.tsx | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/app/[orgId]/settings/(private)/idp/create/page.tsx b/src/app/[orgId]/settings/(private)/idp/create/page.tsx index 4c783e9b2..fc2c6c382 100644 --- a/src/app/[orgId]/settings/(private)/idp/create/page.tsx +++ b/src/app/[orgId]/settings/(private)/idp/create/page.tsx @@ -275,6 +275,8 @@ export default function Page() { } } + const disabled = !isPaidUser(tierMatrix.orgOidc); + return ( <>
@@ -292,6 +294,9 @@ export default function Page() {
+ + +
@@ -812,9 +817,10 @@ export default function Page() { +
); } From cf2dfdea5b667090f3034a5a5bbbb9ad212d7843 Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 24 Mar 2026 16:12:13 -0700 Subject: [PATCH 10/16] Add better pooling controls --- server/db/pg/driver.ts | 35 ++++++++++++--------- server/db/pg/logsDriver.ts | 35 ++++++++++++--------- server/db/pg/poolConfig.ts | 63 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 105 insertions(+), 28 deletions(-) create mode 100644 server/db/pg/poolConfig.ts diff --git a/server/db/pg/driver.ts b/server/db/pg/driver.ts index 5b357d060..9366e32e1 100644 --- a/server/db/pg/driver.ts +++ b/server/db/pg/driver.ts @@ -1,7 +1,7 @@ import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres"; -import { Pool } from "pg"; import { readConfigFile } from "@server/lib/readConfigFile"; import { withReplicas } from "drizzle-orm/pg-core"; +import { createPool } from "./poolConfig"; function createDb() { const config = readConfigFile(); @@ -39,12 +39,17 @@ function createDb() { // Create connection pools instead of individual connections const poolConfig = config.postgres.pool; - const primaryPool = new Pool({ + const maxConnections = poolConfig?.max_connections || 20; + const idleTimeoutMs = poolConfig?.idle_timeout_ms || 30000; + const connectionTimeoutMs = poolConfig?.connection_timeout_ms || 5000; + + const primaryPool = createPool( connectionString, - max: poolConfig?.max_connections || 20, - idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000, - connectionTimeoutMillis: poolConfig?.connection_timeout_ms || 5000 - }); + maxConnections, + idleTimeoutMs, + connectionTimeoutMs, + "primary" + ); const replicas = []; @@ -55,14 +60,16 @@ function createDb() { }) ); } else { + const maxReplicaConnections = + poolConfig?.max_replica_connections || 20; for (const conn of replicaConnections) { - const replicaPool = new Pool({ - connectionString: conn.connection_string, - max: poolConfig?.max_replica_connections || 20, - idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000, - 
connectionTimeoutMillis: - poolConfig?.connection_timeout_ms || 5000 - }); + const replicaPool = createPool( + conn.connection_string, + maxReplicaConnections, + idleTimeoutMs, + connectionTimeoutMs, + "replica" + ); replicas.push( DrizzlePostgres(replicaPool, { logger: process.env.QUERY_LOGGING == "true" @@ -84,4 +91,4 @@ export default db; export const primaryDb = db.$primary; export type Transaction = Parameters< Parameters<(typeof db)["transaction"]>[0] ->[0]; +>[0]; \ No newline at end of file diff --git a/server/db/pg/logsDriver.ts b/server/db/pg/logsDriver.ts index 49e26f89f..146b8fb2f 100644 --- a/server/db/pg/logsDriver.ts +++ b/server/db/pg/logsDriver.ts @@ -1,9 +1,9 @@ import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres"; -import { Pool } from "pg"; import { readConfigFile } from "@server/lib/readConfigFile"; import { withReplicas } from "drizzle-orm/pg-core"; import { build } from "@server/build"; import { db as mainDb, primaryDb as mainPrimaryDb } from "./driver"; +import { createPool } from "./poolConfig"; function createLogsDb() { // Only use separate logs database in SaaS builds @@ -42,12 +42,17 @@ function createLogsDb() { // Create separate connection pool for logs database const poolConfig = logsConfig?.pool || config.postgres?.pool; - const primaryPool = new Pool({ + const maxConnections = poolConfig?.max_connections || 20; + const idleTimeoutMs = poolConfig?.idle_timeout_ms || 30000; + const connectionTimeoutMs = poolConfig?.connection_timeout_ms || 5000; + + const primaryPool = createPool( connectionString, - max: poolConfig?.max_connections || 20, - idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000, - connectionTimeoutMillis: poolConfig?.connection_timeout_ms || 5000 - }); + maxConnections, + idleTimeoutMs, + connectionTimeoutMs, + "logs-primary" + ); const replicas = []; @@ -58,14 +63,16 @@ function createLogsDb() { }) ); } else { + const maxReplicaConnections = + poolConfig?.max_replica_connections || 20; for (const 
conn of replicaConnections) { - const replicaPool = new Pool({ - connectionString: conn.connection_string, - max: poolConfig?.max_replica_connections || 20, - idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000, - connectionTimeoutMillis: - poolConfig?.connection_timeout_ms || 5000 - }); + const replicaPool = createPool( + conn.connection_string, + maxReplicaConnections, + idleTimeoutMs, + connectionTimeoutMs, + "logs-replica" + ); replicas.push( DrizzlePostgres(replicaPool, { logger: process.env.QUERY_LOGGING == "true" @@ -84,4 +91,4 @@ function createLogsDb() { export const logsDb = createLogsDb(); export default logsDb; -export const primaryLogsDb = logsDb.$primary; +export const primaryLogsDb = logsDb.$primary; \ No newline at end of file diff --git a/server/db/pg/poolConfig.ts b/server/db/pg/poolConfig.ts new file mode 100644 index 000000000..f753121c1 --- /dev/null +++ b/server/db/pg/poolConfig.ts @@ -0,0 +1,63 @@ +import { Pool, PoolConfig } from "pg"; +import logger from "@server/logger"; + +export function createPoolConfig( + connectionString: string, + maxConnections: number, + idleTimeoutMs: number, + connectionTimeoutMs: number +): PoolConfig { + return { + connectionString, + max: maxConnections, + idleTimeoutMillis: idleTimeoutMs, + connectionTimeoutMillis: connectionTimeoutMs, + // TCP keepalive to prevent silent connection drops by NAT gateways, + // load balancers, and other intermediate network devices (e.g. AWS + // NAT Gateway drops idle TCP connections after ~350s) + keepAlive: true, + keepAliveInitialDelayMillis: 10000, // send first keepalive after 10s of idle + // Allow connections to be released and recreated more aggressively + // to avoid stale connections building up + allowExitOnIdle: false + }; +} + +export function attachPoolErrorHandlers(pool: Pool, label: string): void { + pool.on("error", (err) => { + // This catches errors on idle clients in the pool. Without this + // handler an unexpected disconnect would crash the process. 
+ logger.error( + `Unexpected error on idle ${label} database client: ${err.message}` + ); + }); + + pool.on("connect", (client) => { + // Set a statement timeout on every new connection so a single slow + // query can't block the pool forever + client.query("SET statement_timeout = '30s'").catch((err: Error) => { + logger.warn( + `Failed to set statement_timeout on ${label} client: ${err.message}` + ); + }); + }); +} + +export function createPool( + connectionString: string, + maxConnections: number, + idleTimeoutMs: number, + connectionTimeoutMs: number, + label: string +): Pool { + const pool = new Pool( + createPoolConfig( + connectionString, + maxConnections, + idleTimeoutMs, + connectionTimeoutMs + ) + ); + attachPoolErrorHandlers(pool, label); + return pool; +} \ No newline at end of file From d17ec6dc1fe0b20fe92ebabe1dd64db72e8d2707 Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 24 Mar 2026 17:39:43 -0700 Subject: [PATCH 11/16] Try to solve th problem --- server/cleanup.ts | 2 + server/private/cleanup.ts | 2 + server/private/routers/ws/ws.ts | 37 +- server/routers/newt/getNewtToken.ts | 22 +- server/routers/newt/handleNewtPingMessage.ts | 19 +- server/routers/newt/pingAccumulator.ts | 382 +++++++++++++++++++ server/routers/olm/getOlmToken.ts | 4 +- server/routers/olm/handleOlmPingMessage.ts | 23 +- server/routers/ws/messageHandlers.ts | 5 + server/routers/ws/ws.ts | 23 +- 10 files changed, 446 insertions(+), 73 deletions(-) create mode 100644 server/routers/newt/pingAccumulator.ts diff --git a/server/cleanup.ts b/server/cleanup.ts index 137654827..3c462f3f2 100644 --- a/server/cleanup.ts +++ b/server/cleanup.ts @@ -1,8 +1,10 @@ import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage"; import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth"; +import { stopPingAccumulator } from "@server/routers/newt/pingAccumulator"; import { cleanup as wsCleanup } from "#dynamic/routers/ws"; async function cleanup() { 
+ await stopPingAccumulator(); await flushBandwidthToDb(); await flushSiteBandwidthToDb(); await wsCleanup(); diff --git a/server/private/cleanup.ts b/server/private/cleanup.ts index 0bd9822dd..5321fbc9e 100644 --- a/server/private/cleanup.ts +++ b/server/private/cleanup.ts @@ -15,8 +15,10 @@ import { rateLimitService } from "#private/lib/rateLimit"; import { cleanup as wsCleanup } from "#private/routers/ws"; import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage"; import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth"; +import { stopPingAccumulator } from "@server/routers/newt/pingAccumulator"; async function cleanup() { + await stopPingAccumulator(); await flushBandwidthToDb(); await flushSiteBandwidthToDb(); await rateLimitService.cleanup(); diff --git a/server/private/routers/ws/ws.ts b/server/private/routers/ws/ws.ts index 4bfda5da8..d96c55c91 100644 --- a/server/private/routers/ws/ws.ts +++ b/server/private/routers/ws/ws.ts @@ -30,6 +30,7 @@ import { } from "@server/db"; import { eq } from "drizzle-orm"; import { db } from "@server/db"; +import { recordPing } from "@server/routers/newt/pingAccumulator"; import { validateNewtSessionToken } from "@server/auth/sessions/newt"; import { validateOlmSessionToken } from "@server/auth/sessions/olm"; import logger from "@server/logger"; @@ -197,11 +198,7 @@ const connectedClients: Map = new Map(); // Config version tracking map (local to this node, resets on server restart) const clientConfigVersions: Map = new Map(); -// Tracks the last Unix timestamp (seconds) at which a ping was flushed to the -// DB for a given siteId. Resets on server restart which is fine – the first -// ping after startup will always write, re-establishing the online state. 
-const lastPingDbWrite: Map = new Map(); -const PING_DB_WRITE_INTERVAL = 45; // seconds + // Recovery tracking let isRedisRecoveryInProgress = false; @@ -853,32 +850,16 @@ const setupConnection = async ( ); }); - // Handle WebSocket protocol-level pings from older newt clients that do - // not send application-level "newt/ping" messages. Update the site's - // online state and lastPing timestamp so the offline checker treats them - // the same as modern newt clients. if (clientType === "newt") { const newtClient = client as Newt; - ws.on("ping", async () => { + ws.on("ping", () => { if (!newtClient.siteId) return; - const now = Math.floor(Date.now() / 1000); - const lastWrite = lastPingDbWrite.get(newtClient.siteId) ?? 0; - if (now - lastWrite < PING_DB_WRITE_INTERVAL) return; - lastPingDbWrite.set(newtClient.siteId, now); - try { - await db - .update(sites) - .set({ - online: true, - lastPing: now - }) - .where(eq(sites.siteId, newtClient.siteId)); - } catch (error) { - logger.error( - "Error updating newt site online state on WS ping", - { error } - ); - } + // Record the ping in the accumulator instead of writing to the + // database on every WS ping frame. The accumulator flushes all + // pending pings in a single batched UPDATE every ~10s, which + // prevents connection pool exhaustion under load (especially + // with cross-region latency to the database). 
+ recordPing(newtClient.siteId); }); } diff --git a/server/routers/newt/getNewtToken.ts b/server/routers/newt/getNewtToken.ts index 637973582..bc3cca9fc 100644 --- a/server/routers/newt/getNewtToken.ts +++ b/server/routers/newt/getNewtToken.ts @@ -1,5 +1,5 @@ import { generateSessionToken } from "@server/auth/sessions/app"; -import { db } from "@server/db"; +import { db, newtSessions } from "@server/db"; import { newts } from "@server/db"; import HttpCode from "@server/types/HttpCode"; import response from "@server/lib/response"; @@ -92,6 +92,26 @@ export async function getNewtToken( ); } + const [existingSession] = await db + .select() + .from(newtSessions) + .where(eq(newtSessions.newtId, existingNewt.newtId)); + + // if the session still has time in the expires, reuse it + if (existingSession && (existingSession.expiresAt + 30 * 60 * 1000) > Date.now()) { + return response<{ token: string; serverVersion: string }>(res, { + data: { + token: existingSession.sessionId, + serverVersion: APP_VERSION + }, + success: true, + error: false, + message: "Token created successfully", + status: HttpCode.OK + }); + } + + // otherwise generate a new one const resToken = generateSessionToken(); await createNewtSession(resToken, existingNewt.newtId); diff --git a/server/routers/newt/handleNewtPingMessage.ts b/server/routers/newt/handleNewtPingMessage.ts index 319647b83..da25852a0 100644 --- a/server/routers/newt/handleNewtPingMessage.ts +++ b/server/routers/newt/handleNewtPingMessage.ts @@ -5,6 +5,7 @@ import { Newt } from "@server/db"; import { eq, lt, isNull, and, or } from "drizzle-orm"; import logger from "@server/logger"; import { sendNewtSyncMessage } from "./sync"; +import { recordPing } from "./pingAccumulator"; // Track if the offline checker interval is running let offlineCheckerInterval: NodeJS.Timeout | null = null; @@ -114,18 +115,12 @@ export const handleNewtPingMessage: MessageHandler = async (context) => { return; } - try { - // Mark the site as online and record 
the ping timestamp. - await db - .update(sites) - .set({ - online: true, - lastPing: Math.floor(Date.now() / 1000) - }) - .where(eq(sites.siteId, newt.siteId)); - } catch (error) { - logger.error("Error updating online state on newt ping", { error }); - } + // Record the ping in memory; it will be flushed to the database + // periodically by the ping accumulator (every ~10s) in a single + // batched UPDATE instead of one query per ping. This prevents + // connection pool exhaustion under load, especially with + // cross-region latency to the database. + recordPing(newt.siteId); // Check config version and sync if stale. const configVersion = await getClientConfigVersion(newt.newtId); diff --git a/server/routers/newt/pingAccumulator.ts b/server/routers/newt/pingAccumulator.ts new file mode 100644 index 000000000..83afd613e --- /dev/null +++ b/server/routers/newt/pingAccumulator.ts @@ -0,0 +1,382 @@ +import { db } from "@server/db"; +import { sites, clients, olms } from "@server/db"; +import { eq, inArray } from "drizzle-orm"; +import logger from "@server/logger"; + +/** + * Ping Accumulator + * + * Instead of writing to the database on every single newt/olm ping (which + * causes pool exhaustion under load, especially with cross-region latency), + * we accumulate pings in memory and flush them to the database periodically + * in a single batch. + * + * This is the same pattern used for bandwidth flushing in + * receiveBandwidth.ts and handleReceiveBandwidthMessage.ts. 
+ * + * Supports two kinds of pings: + * - **Site pings** (from newts): update `sites.online` and `sites.lastPing` + * - **Client pings** (from OLMs): update `clients.online`, `clients.lastPing`, + * `clients.archived`, and optionally reset `olms.archived` + */ + +const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds +const MAX_RETRIES = 2; +const BASE_DELAY_MS = 50; + +// ── Site (newt) pings ────────────────────────────────────────────────── +// Map of siteId -> latest ping timestamp (unix seconds) +const pendingSitePings: Map = new Map(); + +// ── Client (OLM) pings ──────────────────────────────────────────────── +// Map of clientId -> latest ping timestamp (unix seconds) +const pendingClientPings: Map = new Map(); +// Set of olmIds whose `archived` flag should be reset to false +const pendingOlmArchiveResets: Set = new Set(); + +let flushTimer: NodeJS.Timeout | null = null; + +// ── Public API ───────────────────────────────────────────────────────── + +/** + * Record a ping for a newt site. This does NOT write to the database + * immediately. Instead it stores the latest ping timestamp in memory, + * to be flushed periodically by the background timer. + */ +export function recordSitePing(siteId: number): void { + const now = Math.floor(Date.now() / 1000); + pendingSitePings.set(siteId, now); +} + +/** @deprecated Use `recordSitePing` instead. Alias kept for existing call-sites. */ +export const recordPing = recordSitePing; + +/** + * Record a ping for an OLM client. Batches the `clients` table update + * (`online`, `lastPing`, `archived`) and, when `olmArchived` is true, + * also queues an `olms` table update to clear the archived flag. 
+ */ +export function recordClientPing( + clientId: number, + olmId: string, + olmArchived: boolean +): void { + const now = Math.floor(Date.now() / 1000); + pendingClientPings.set(clientId, now); + if (olmArchived) { + pendingOlmArchiveResets.add(olmId); + } +} + +// ── Flush Logic ──────────────────────────────────────────────────────── + +/** + * Flush all accumulated site pings to the database. + */ +async function flushSitePingsToDb(): Promise { + if (pendingSitePings.size === 0) { + return; + } + + // Snapshot and clear so new pings arriving during the flush go into a + // fresh map for the next cycle. + const pingsToFlush = new Map(pendingSitePings); + pendingSitePings.clear(); + + // Sort by siteId for consistent lock ordering (prevents deadlocks) + const sortedEntries = Array.from(pingsToFlush.entries()).sort( + ([a], [b]) => a - b + ); + + const BATCH_SIZE = 50; + for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) { + const batch = sortedEntries.slice(i, i + BATCH_SIZE); + + try { + await withRetry(async () => { + // Group by timestamp for efficient bulk updates + const byTimestamp = new Map(); + for (const [siteId, timestamp] of batch) { + const group = byTimestamp.get(timestamp) || []; + group.push(siteId); + byTimestamp.set(timestamp, group); + } + + if (byTimestamp.size === 1) { + const [timestamp, siteIds] = Array.from( + byTimestamp.entries() + )[0]; + await db + .update(sites) + .set({ + online: true, + lastPing: timestamp + }) + .where(inArray(sites.siteId, siteIds)); + } else { + await db.transaction(async (tx) => { + for (const [timestamp, siteIds] of byTimestamp) { + await tx + .update(sites) + .set({ + online: true, + lastPing: timestamp + }) + .where(inArray(sites.siteId, siteIds)); + } + }); + } + }, "flushSitePingsToDb"); + } catch (error) { + logger.error( + `Failed to flush site ping batch (${batch.length} sites), re-queuing for next cycle`, + { error } + ); + for (const [siteId, timestamp] of batch) { + const existing = 
pendingSitePings.get(siteId); + if (!existing || existing < timestamp) { + pendingSitePings.set(siteId, timestamp); + } + } + } + } +} + +/** + * Flush all accumulated client (OLM) pings to the database. + */ +async function flushClientPingsToDb(): Promise { + if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) { + return; + } + + // Snapshot and clear + const pingsToFlush = new Map(pendingClientPings); + pendingClientPings.clear(); + + const olmResetsToFlush = new Set(pendingOlmArchiveResets); + pendingOlmArchiveResets.clear(); + + // ── Flush client pings ───────────────────────────────────────────── + if (pingsToFlush.size > 0) { + const sortedEntries = Array.from(pingsToFlush.entries()).sort( + ([a], [b]) => a - b + ); + + const BATCH_SIZE = 50; + for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) { + const batch = sortedEntries.slice(i, i + BATCH_SIZE); + + try { + await withRetry(async () => { + const byTimestamp = new Map(); + for (const [clientId, timestamp] of batch) { + const group = byTimestamp.get(timestamp) || []; + group.push(clientId); + byTimestamp.set(timestamp, group); + } + + if (byTimestamp.size === 1) { + const [timestamp, clientIds] = Array.from( + byTimestamp.entries() + )[0]; + await db + .update(clients) + .set({ + lastPing: timestamp, + online: true, + archived: false + }) + .where(inArray(clients.clientId, clientIds)); + } else { + await db.transaction(async (tx) => { + for (const [timestamp, clientIds] of byTimestamp) { + await tx + .update(clients) + .set({ + lastPing: timestamp, + online: true, + archived: false + }) + .where( + inArray(clients.clientId, clientIds) + ); + } + }); + } + }, "flushClientPingsToDb"); + } catch (error) { + logger.error( + `Failed to flush client ping batch (${batch.length} clients), re-queuing for next cycle`, + { error } + ); + for (const [clientId, timestamp] of batch) { + const existing = pendingClientPings.get(clientId); + if (!existing || existing < timestamp) { + 
pendingClientPings.set(clientId, timestamp); + } + } + } + } + } + + // ── Flush OLM archive resets ─────────────────────────────────────── + if (olmResetsToFlush.size > 0) { + const olmIds = Array.from(olmResetsToFlush).sort(); + + const BATCH_SIZE = 50; + for (let i = 0; i < olmIds.length; i += BATCH_SIZE) { + const batch = olmIds.slice(i, i + BATCH_SIZE); + + try { + await withRetry(async () => { + await db + .update(olms) + .set({ archived: false }) + .where(inArray(olms.olmId, batch)); + }, "flushOlmArchiveResets"); + } catch (error) { + logger.error( + `Failed to flush OLM archive reset batch (${batch.length} olms), re-queuing for next cycle`, + { error } + ); + for (const olmId of batch) { + pendingOlmArchiveResets.add(olmId); + } + } + } + } +} + +/** + * Flush everything — called by the interval timer and during shutdown. + */ +export async function flushPingsToDb(): Promise { + await flushSitePingsToDb(); + await flushClientPingsToDb(); +} + +// ── Retry / Error Helpers ────────────────────────────────────────────── + +/** + * Simple retry wrapper with exponential backoff for transient errors + * (connection timeouts, unexpected disconnects). + */ +async function withRetry( + operation: () => Promise, + context: string +): Promise { + let attempt = 0; + while (true) { + try { + return await operation(); + } catch (error: any) { + if (isTransientError(error) && attempt < MAX_RETRIES) { + attempt++; + const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS; + const jitter = Math.random() * baseDelay; + const delay = baseDelay + jitter; + logger.warn( + `Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms` + ); + await new Promise((resolve) => setTimeout(resolve, delay)); + continue; + } + throw error; + } + } +} + +/** + * Detect transient connection errors that are safe to retry. 
+ */ +function isTransientError(error: any): boolean { + if (!error) return false; + + const message = (error.message || "").toLowerCase(); + const causeMessage = (error.cause?.message || "").toLowerCase(); + const code = error.code || ""; + + // Connection timeout / terminated + if ( + message.includes("connection timeout") || + message.includes("connection terminated") || + message.includes("timeout exceeded when trying to connect") || + causeMessage.includes("connection terminated unexpectedly") || + causeMessage.includes("connection timeout") + ) { + return true; + } + + // PostgreSQL deadlock + if (code === "40P01" || message.includes("deadlock")) { + return true; + } + + // ECONNRESET, ECONNREFUSED, EPIPE + if ( + code === "ECONNRESET" || + code === "ECONNREFUSED" || + code === "EPIPE" || + code === "ETIMEDOUT" + ) { + return true; + } + + return false; +} + +// ── Lifecycle ────────────────────────────────────────────────────────── + +/** + * Start the background flush timer. Call this once at server startup. + */ +export function startPingAccumulator(): void { + if (flushTimer) { + return; // Already running + } + + flushTimer = setInterval(async () => { + try { + await flushPingsToDb(); + } catch (error) { + logger.error("Unhandled error in ping accumulator flush", { + error + }); + } + }, FLUSH_INTERVAL_MS); + + // Don't prevent the process from exiting + flushTimer.unref(); + + logger.info( + `Ping accumulator started (flush interval: ${FLUSH_INTERVAL_MS}ms)` + ); +} + +/** + * Stop the background flush timer and perform a final flush. + * Call this during graceful shutdown. 
+ */ +export async function stopPingAccumulator(): Promise { + if (flushTimer) { + clearInterval(flushTimer); + flushTimer = null; + } + + // Final flush to persist any remaining pings + try { + await flushPingsToDb(); + } catch (error) { + logger.error("Error during final ping accumulator flush", { error }); + } + + logger.info("Ping accumulator stopped"); +} + +/** + * Get the number of pending (unflushed) pings. Useful for monitoring. + */ +export function getPendingPingCount(): number { + return pendingSitePings.size + pendingClientPings.size; +} \ No newline at end of file diff --git a/server/routers/olm/getOlmToken.ts b/server/routers/olm/getOlmToken.ts index 2734a63bc..027e7ec15 100644 --- a/server/routers/olm/getOlmToken.ts +++ b/server/routers/olm/getOlmToken.ts @@ -8,7 +8,9 @@ import { ExitNode, exitNodes, sites, - clientSitesAssociationsCache + clientSitesAssociationsCache, + olmSessions, + olmSessions } from "@server/db"; import { olms } from "@server/db"; import HttpCode from "@server/types/HttpCode"; diff --git a/server/routers/olm/handleOlmPingMessage.ts b/server/routers/olm/handleOlmPingMessage.ts index efcbf1696..0f520b234 100644 --- a/server/routers/olm/handleOlmPingMessage.ts +++ b/server/routers/olm/handleOlmPingMessage.ts @@ -3,6 +3,7 @@ import { db } from "@server/db"; import { MessageHandler } from "@server/routers/ws"; import { clients, olms, Olm } from "@server/db"; import { eq, lt, isNull, and, or } from "drizzle-orm"; +import { recordClientPing } from "@server/routers/newt/pingAccumulator"; import logger from "@server/logger"; import { validateSessionToken } from "@server/auth/sessions/app"; import { checkOrgAccessPolicy } from "#dynamic/lib/checkOrgAccessPolicy"; @@ -201,22 +202,12 @@ export const handleOlmPingMessage: MessageHandler = async (context) => { await sendOlmSyncMessage(olm, client); } - // Update the client's last ping timestamp - await db - .update(clients) - .set({ - lastPing: Math.floor(Date.now() / 1000), - online: true, 
- archived: false - }) - .where(eq(clients.clientId, olm.clientId)); - - if (olm.archived) { - await db - .update(olms) - .set({ archived: false }) - .where(eq(olms.olmId, olm.olmId)); - } + // Record the ping in memory; it will be flushed to the database + // periodically by the ping accumulator (every ~10s) in a single + // batched UPDATE instead of one query per ping. This prevents + // connection pool exhaustion under load, especially with + // cross-region latency to the database. + recordClientPing(olm.clientId, olm.olmId, !!olm.archived); } catch (error) { logger.error("Error handling ping message", { error }); } diff --git a/server/routers/ws/messageHandlers.ts b/server/routers/ws/messageHandlers.ts index 628caafd5..143e4d516 100644 --- a/server/routers/ws/messageHandlers.ts +++ b/server/routers/ws/messageHandlers.ts @@ -11,6 +11,7 @@ import { startNewtOfflineChecker, handleNewtDisconnectingMessage } from "../newt"; +import { startPingAccumulator } from "../newt/pingAccumulator"; import { handleOlmRegisterMessage, handleOlmRelayMessage, @@ -46,6 +47,10 @@ export const messageHandlers: Record = { "ws/round-trip/complete": handleRoundTripMessage }; +// Start the ping accumulator for all builds — it batches per-site online/lastPing +// updates into periodic bulk writes, preventing connection pool exhaustion. 
+startPingAccumulator(); + if (build != "saas") { startOlmOfflineChecker(); // this is to handle the offline check for olms startNewtOfflineChecker(); // this is to handle the offline check for newts diff --git a/server/routers/ws/ws.ts b/server/routers/ws/ws.ts index 08a7dbd4c..6e6312715 100644 --- a/server/routers/ws/ws.ts +++ b/server/routers/ws/ws.ts @@ -6,6 +6,7 @@ import { Socket } from "net"; import { Newt, newts, NewtSession, olms, Olm, OlmSession, sites } from "@server/db"; import { eq } from "drizzle-orm"; import { db } from "@server/db"; +import { recordPing } from "@server/routers/newt/pingAccumulator"; import { validateNewtSessionToken } from "@server/auth/sessions/newt"; import { validateOlmSessionToken } from "@server/auth/sessions/olm"; import { messageHandlers } from "./messageHandlers"; @@ -386,22 +387,14 @@ const setupConnection = async ( // the same as modern newt clients. if (clientType === "newt") { const newtClient = client as Newt; - ws.on("ping", async () => { + ws.on("ping", () => { if (!newtClient.siteId) return; - try { - await db - .update(sites) - .set({ - online: true, - lastPing: Math.floor(Date.now() / 1000) - }) - .where(eq(sites.siteId, newtClient.siteId)); - } catch (error) { - logger.error( - "Error updating newt site online state on WS ping", - { error } - ); - } + // Record the ping in the accumulator instead of writing to the + // database on every WS ping frame. The accumulator flushes all + // pending pings in a single batched UPDATE every ~10s, which + // prevents connection pool exhaustion under load (especially + // with cross-region latency to the database). 
+ recordPing(newtClient.siteId); }); } From 6f71e9f0f21719e050cd9a48da278f402bef4c5c Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 24 Mar 2026 17:54:51 -0700 Subject: [PATCH 12/16] Clean up --- server/private/routers/ws/ws.ts | 5 ----- server/routers/newt/getNewtToken.ts | 19 ------------------- server/routers/olm/getOlmToken.ts | 2 -- 3 files changed, 26 deletions(-) diff --git a/server/private/routers/ws/ws.ts b/server/private/routers/ws/ws.ts index d96c55c91..67b83f931 100644 --- a/server/private/routers/ws/ws.ts +++ b/server/private/routers/ws/ws.ts @@ -19,14 +19,9 @@ import { Socket } from "net"; import { Newt, newts, - NewtSession, - olms, Olm, - OlmSession, RemoteExitNode, - RemoteExitNodeSession, remoteExitNodes, - sites } from "@server/db"; import { eq } from "drizzle-orm"; import { db } from "@server/db"; diff --git a/server/routers/newt/getNewtToken.ts b/server/routers/newt/getNewtToken.ts index bc3cca9fc..9d3da7e97 100644 --- a/server/routers/newt/getNewtToken.ts +++ b/server/routers/newt/getNewtToken.ts @@ -92,25 +92,6 @@ export async function getNewtToken( ); } - const [existingSession] = await db - .select() - .from(newtSessions) - .where(eq(newtSessions.newtId, existingNewt.newtId)); - - // if the session still has time in the expires, reuse it - if (existingSession && (existingSession.expiresAt + 30 * 60 * 1000) > Date.now()) { - return response<{ token: string; serverVersion: string }>(res, { - data: { - token: existingSession.sessionId, - serverVersion: APP_VERSION - }, - success: true, - error: false, - message: "Token created successfully", - status: HttpCode.OK - }); - } - // otherwise generate a new one const resToken = generateSessionToken(); await createNewtSession(resToken, existingNewt.newtId); diff --git a/server/routers/olm/getOlmToken.ts b/server/routers/olm/getOlmToken.ts index 027e7ec15..741b29f0a 100644 --- a/server/routers/olm/getOlmToken.ts +++ b/server/routers/olm/getOlmToken.ts @@ -9,8 +9,6 @@ import { exitNodes, sites, 
clientSitesAssociationsCache, - olmSessions, - olmSessions } from "@server/db"; import { olms } from "@server/db"; import HttpCode from "@server/types/HttpCode"; From c96c5e8ae8f556f541be24bb7755f93b2897428a Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 24 Mar 2026 18:12:51 -0700 Subject: [PATCH 13/16] Cache token for thundering hurd --- server/lib/tokenCache.ts | 22 ++++++ server/private/lib/cache.ts | 13 ++++ server/private/lib/tokenCache.ts | 77 +++++++++++++++++++ .../remoteExitNode/getRemoteExitNodeToken.ts | 25 ++++-- server/routers/newt/getNewtToken.ts | 18 ++++- server/routers/olm/getOlmToken.ts | 19 ++++- 6 files changed, 161 insertions(+), 13 deletions(-) create mode 100644 server/lib/tokenCache.ts create mode 100644 server/private/lib/tokenCache.ts diff --git a/server/lib/tokenCache.ts b/server/lib/tokenCache.ts new file mode 100644 index 000000000..022f46c15 --- /dev/null +++ b/server/lib/tokenCache.ts @@ -0,0 +1,22 @@ +/** + * Returns a cached plaintext token from Redis if one exists and decrypts + * cleanly, otherwise calls `createSession` to mint a fresh token, stores the + * encrypted value in Redis with the given TTL, and returns it. + * + * Failures at the Redis layer are non-fatal – the function always falls + * through to session creation so the caller is never blocked by a Redis outage. + * + * @param cacheKey Unique Redis key, e.g. 
`"newt:token_cache:abc123"` + * @param secret Server secret used for AES encryption/decryption + * @param ttlSeconds Cache TTL in seconds (should match session expiry) + * @param createSession Factory that mints a new session and returns its raw token + */ +export async function getOrCreateCachedToken( + cacheKey: string, + secret: string, + ttlSeconds: number, + createSession: () => Promise +): Promise { + const token = await createSession(); + return token; +} diff --git a/server/private/lib/cache.ts b/server/private/lib/cache.ts index e8c03ba3d..1a2006d46 100644 --- a/server/private/lib/cache.ts +++ b/server/private/lib/cache.ts @@ -1,3 +1,16 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ + import NodeCache from "node-cache"; import logger from "@server/logger"; import { redisManager } from "@server/private/lib/redis"; diff --git a/server/private/lib/tokenCache.ts b/server/private/lib/tokenCache.ts new file mode 100644 index 000000000..bb6645688 --- /dev/null +++ b/server/private/lib/tokenCache.ts @@ -0,0 +1,77 @@ +/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. 
+ */
+
+import redisManager from "#dynamic/lib/redis";
+import { encrypt, decrypt } from "@server/lib/crypto";
+import logger from "@server/logger";
+
+/**
+ * Returns a cached plaintext token from Redis if one exists and decrypts
+ * cleanly, otherwise calls `createSession` to mint a fresh token, stores the
+ * encrypted value in Redis with the given TTL, and returns it.
+ *
+ * Failures at the Redis layer are non-fatal – the function always falls
+ * through to session creation so the caller is never blocked by a Redis outage.
+ *
+ * @param cacheKey Unique Redis key, e.g. `"newt:token_cache:abc123"`
+ * @param secret Server secret used for AES encryption/decryption
+ * @param ttlSeconds Cache TTL in seconds (should match session expiry)
+ * @param createSession Factory that mints a new session and returns its raw token
+ */
+export async function getOrCreateCachedToken(
+    cacheKey: string,
+    secret: string,
+    ttlSeconds: number,
+    createSession: () => Promise<string>
+): Promise<string> {
+    if (redisManager.isRedisEnabled()) {
+        try {
+            const cached = await redisManager.get(cacheKey);
+            if (cached) {
+                const token = decrypt(cached, secret);
+                if (token) {
+                    logger.debug(`Token cache hit for key: ${cacheKey}`);
+                    return token;
+                }
+                // Decryption produced an empty string – treat as a miss
+                logger.warn(
+                    `Token cache decryption returned empty string for key: ${cacheKey}, treating as miss`
+                );
+            }
+        } catch (e) {
+            logger.warn(
+                `Token cache read/decrypt failed for key ${cacheKey}, falling through to session creation:`,
+                e
+            );
+        }
+    }
+
+    const token = await createSession();
+
+    if (redisManager.isRedisEnabled()) {
+        try {
+            const encrypted = encrypt(token, secret);
+            await redisManager.set(cacheKey, encrypted, ttlSeconds);
+            logger.debug(
+                `Token cached in Redis for key: ${cacheKey} (TTL ${ttlSeconds}s)`
+            );
+        } catch (e) {
+            logger.warn(
+                `Token cache write failed for key ${cacheKey} (session was still created):`,
+                e
+            );
+        }
+    }
+
+    return token;
+}
diff --git
a/server/private/routers/remoteExitNode/getRemoteExitNodeToken.ts b/server/private/routers/remoteExitNode/getRemoteExitNodeToken.ts index 24f0de159..025e2d34e 100644 --- a/server/private/routers/remoteExitNode/getRemoteExitNodeToken.ts +++ b/server/private/routers/remoteExitNode/getRemoteExitNodeToken.ts @@ -23,8 +23,10 @@ import { z } from "zod"; import { fromError } from "zod-validation-error"; import { createRemoteExitNodeSession, - validateRemoteExitNodeSessionToken + validateRemoteExitNodeSessionToken, + EXPIRES } from "#private/auth/sessions/remoteExitNode"; +import { getOrCreateCachedToken } from "@server/private/lib/tokenCache"; import { verifyPassword } from "@server/auth/password"; import logger from "@server/logger"; import config from "@server/lib/config"; @@ -103,14 +105,23 @@ export async function getRemoteExitNodeToken( ); } - const resToken = generateSessionToken(); - await createRemoteExitNodeSession( - resToken, - existingRemoteExitNode.remoteExitNodeId + // Return a cached token if one exists to prevent thundering herd on + // simultaneous restarts; falls back to creating a fresh session when + // Redis is unavailable or the cache has expired. 
+ const resToken = await getOrCreateCachedToken( + `remote_exit_node:token_cache:${existingRemoteExitNode.remoteExitNodeId}`, + config.getRawConfig().server.secret!, + Math.floor(EXPIRES / 1000), + async () => { + const token = generateSessionToken(); + await createRemoteExitNodeSession( + token, + existingRemoteExitNode.remoteExitNodeId + ); + return token; + } ); - // logger.debug(`Created RemoteExitNode token response: ${JSON.stringify(resToken)}`); - return response<{ token: string }>(res, { data: { token: resToken diff --git a/server/routers/newt/getNewtToken.ts b/server/routers/newt/getNewtToken.ts index 9d3da7e97..c5abb9968 100644 --- a/server/routers/newt/getNewtToken.ts +++ b/server/routers/newt/getNewtToken.ts @@ -1,6 +1,8 @@ import { generateSessionToken } from "@server/auth/sessions/app"; import { db, newtSessions } from "@server/db"; import { newts } from "@server/db"; +import { getOrCreateCachedToken } from "#dynamic/lib/tokenCache"; +import { EXPIRES } from "@server/auth/sessions/newt"; import HttpCode from "@server/types/HttpCode"; import response from "@server/lib/response"; import { eq } from "drizzle-orm"; @@ -92,9 +94,19 @@ export async function getNewtToken( ); } - // otherwise generate a new one - const resToken = generateSessionToken(); - await createNewtSession(resToken, existingNewt.newtId); + // Return a cached token if one exists to prevent thundering herd on + // simultaneous restarts; falls back to creating a fresh session when + // Redis is unavailable or the cache has expired. 
+ const resToken = await getOrCreateCachedToken( + `newt:token_cache:${existingNewt.newtId}`, + config.getRawConfig().server.secret!, + Math.floor(EXPIRES / 1000), + async () => { + const token = generateSessionToken(); + await createNewtSession(token, existingNewt.newtId); + return token; + } + ); return response<{ token: string; serverVersion: string }>(res, { data: { diff --git a/server/routers/olm/getOlmToken.ts b/server/routers/olm/getOlmToken.ts index 741b29f0a..5b8411eb7 100644 --- a/server/routers/olm/getOlmToken.ts +++ b/server/routers/olm/getOlmToken.ts @@ -20,8 +20,10 @@ import { z } from "zod"; import { fromError } from "zod-validation-error"; import { createOlmSession, - validateOlmSessionToken + validateOlmSessionToken, + EXPIRES } from "@server/auth/sessions/olm"; +import { getOrCreateCachedToken } from "#dynamic/lib/tokenCache"; import { verifyPassword } from "@server/auth/password"; import logger from "@server/logger"; import config from "@server/lib/config"; @@ -132,8 +134,19 @@ export async function getOlmToken( logger.debug("Creating new olm session token"); - const resToken = generateSessionToken(); - await createOlmSession(resToken, existingOlm.olmId); + // Return a cached token if one exists to prevent thundering herd on + // simultaneous restarts; falls back to creating a fresh session when + // Redis is unavailable or the cache has expired. 
+ const resToken = await getOrCreateCachedToken( + `olm:token_cache:${existingOlm.olmId}`, + config.getRawConfig().server.secret!, + Math.floor(EXPIRES / 1000), + async () => { + const token = generateSessionToken(); + await createOlmSession(token, existingOlm.olmId); + return token; + } + ); let clientIdToUse; if (orgId) { From 38d30b0214d25a77c731b2ddd72dba611692371e Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 24 Mar 2026 18:13:57 -0700 Subject: [PATCH 14/16] Add license script --- license.py | 115 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100644 license.py diff --git a/license.py b/license.py new file mode 100644 index 000000000..865dfad7a --- /dev/null +++ b/license.py @@ -0,0 +1,115 @@ +import os +import sys + +# --- Configuration --- +# The header text to be added to the files. +HEADER_TEXT = """/* + * This file is part of a proprietary work. + * + * Copyright (c) 2025 Fossorial, Inc. + * All rights reserved. + * + * This file is licensed under the Fossorial Commercial License. + * You may not use this file except in compliance with the License. + * Unauthorized use, copying, modification, or distribution is strictly prohibited. + * + * This file is not licensed under the AGPLv3. + */ +""" + +def should_add_header(file_path): + """ + Checks if a file should receive the commercial license header. + Returns True if 'private' is in the path or file content. 
+ """ + # Check if 'private' is in the file path (case-insensitive) + if 'server/private' in file_path.lower(): + return True + + # Check if 'private' is in the file content (case-insensitive) + # try: + # with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: + # content = f.read() + # if 'private' in content.lower(): + # return True + # except Exception as e: + # print(f"Could not read file {file_path}: {e}") + + return False + +def process_directory(root_dir): + """ + Recursively scans a directory and adds headers to qualifying .ts or .tsx files, + skipping any 'node_modules' directories. + """ + print(f"Scanning directory: {root_dir}") + files_processed = 0 + headers_added = 0 + + for root, dirs, files in os.walk(root_dir): + # --- MODIFICATION --- + # Exclude 'node_modules' directories from the scan to improve performance. + if 'node_modules' in dirs: + dirs.remove('node_modules') + + for file in files: + if file.endswith('.ts') or file.endswith('.tsx'): + file_path = os.path.join(root, file) + files_processed += 1 + + try: + with open(file_path, 'r+', encoding='utf-8') as f: + original_content = f.read() + has_header = original_content.startswith(HEADER_TEXT.strip()) + + if should_add_header(file_path): + # Add header only if it's not already there + if not has_header: + f.seek(0, 0) # Go to the beginning of the file + f.write(HEADER_TEXT.strip() + '\n\n' + original_content) + print(f"Added header to: {file_path}") + headers_added += 1 + else: + print(f"Header already exists in: {file_path}") + else: + # Remove header if it exists but shouldn't be there + if has_header: + # Find the end of the header and remove it (including following newlines) + header_with_newlines = HEADER_TEXT.strip() + '\n\n' + if original_content.startswith(header_with_newlines): + content_without_header = original_content[len(header_with_newlines):] + else: + # Handle case where there might be different newline patterns + header_end = len(HEADER_TEXT.strip()) + # Skip any 
newlines after the header + while header_end < len(original_content) and original_content[header_end] in '\n\r': + header_end += 1 + content_without_header = original_content[header_end:] + + f.seek(0) + f.write(content_without_header) + f.truncate() + print(f"Removed header from: {file_path}") + headers_added += 1 # Reusing counter for modifications + + except Exception as e: + print(f"Error processing file {file_path}: {e}") + + print("\n--- Scan Complete ---") + print(f"Total .ts or .tsx files found: {files_processed}") + print(f"Files modified (headers added/removed): {headers_added}") + + +if __name__ == "__main__": + # Get the target directory from the command line arguments. + # If no directory is provided, it uses the current directory ('.'). + if len(sys.argv) > 1: + target_directory = sys.argv[1] + else: + target_directory = '.' # Default to current directory + + if not os.path.isdir(target_directory): + print(f"Error: Directory '{target_directory}' not found.") + sys.exit(1) + + process_directory(os.path.abspath(target_directory)) From dfd604c78105e2bdf0ed8943e2fd2f34365e7192 Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 24 Mar 2026 20:27:34 -0700 Subject: [PATCH 15/16] Fix import problems --- server/private/lib/tokenCache.ts | 2 +- server/private/routers/ws/ws.ts | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/server/private/lib/tokenCache.ts b/server/private/lib/tokenCache.ts index bb6645688..284f1d698 100644 --- a/server/private/lib/tokenCache.ts +++ b/server/private/lib/tokenCache.ts @@ -11,7 +11,7 @@ * This file is not licensed under the AGPLv3. 
*/ -import redisManager from "#dynamic/lib/redis"; +import redisManager from "#private/lib/redis"; import { encrypt, decrypt } from "@server/lib/crypto"; import logger from "@server/logger"; diff --git a/server/private/routers/ws/ws.ts b/server/private/routers/ws/ws.ts index 67b83f931..21f4fad37 100644 --- a/server/private/routers/ws/ws.ts +++ b/server/private/routers/ws/ws.ts @@ -20,6 +20,7 @@ import { Newt, newts, Olm, + olms, RemoteExitNode, remoteExitNodes, } from "@server/db"; From 395cab795c3d23f182651cd5d3bbe1316cb2ff5b Mon Sep 17 00:00:00 2001 From: Owen Date: Wed, 25 Mar 2026 20:33:58 -0700 Subject: [PATCH 16/16] Batch set bandwidth --- server/routers/gerbil/receiveBandwidth.ts | 146 +++++++++++++--------- 1 file changed, 84 insertions(+), 62 deletions(-) diff --git a/server/routers/gerbil/receiveBandwidth.ts b/server/routers/gerbil/receiveBandwidth.ts index b73ce986d..042c844aa 100644 --- a/server/routers/gerbil/receiveBandwidth.ts +++ b/server/routers/gerbil/receiveBandwidth.ts @@ -1,6 +1,5 @@ import { Request, Response, NextFunction } from "express"; -import { eq, sql } from "drizzle-orm"; -import { sites } from "@server/db"; +import { sql } from "drizzle-orm"; import { db } from "@server/db"; import logger from "@server/logger"; import createHttpError from "http-errors"; @@ -31,7 +30,10 @@ const MAX_RETRIES = 3; const BASE_DELAY_MS = 50; // How often to flush accumulated bandwidth data to the database -const FLUSH_INTERVAL_MS = 30_000; // 30 seconds +const FLUSH_INTERVAL_MS = 300_000; // 300 seconds + +// Maximum number of sites to include in a single batch UPDATE statement +const BATCH_CHUNK_SIZE = 250; // In-memory accumulator: publicKey -> AccumulatorEntry let accumulator = new Map(); @@ -75,13 +77,33 @@ async function withDeadlockRetry( } } +/** + * Execute a raw SQL query that returns rows, in a way that works across both + * the PostgreSQL driver (which exposes `execute`) and the SQLite driver (which + * exposes `all`). 
Drizzle's typed query builder doesn't support bulk
+ * UPDATE … FROM (VALUES …) natively, so we drop to raw SQL here.
+ */
+async function dbQueryRows<T extends Record<string, unknown>>(
+    query: Parameters<(typeof sql)["join"]>[0][number]
+): Promise<T[]> {
+    const anyDb = db as any;
+    if (typeof anyDb.execute === "function") {
+        // PostgreSQL (node-postgres via Drizzle) — returns { rows: [...] } or an array
+        const result = await anyDb.execute(query);
+        return (Array.isArray(result) ? result : (result.rows ?? [])) as T[];
+    }
+    // SQLite (better-sqlite3 via Drizzle) — returns an array directly
+    return (await anyDb.all(query)) as T[];
+}
+
 /**
  * Flush all accumulated site bandwidth data to the database.
  *
  * Swaps out the accumulator before writing so that any bandwidth messages
  * received during the flush are captured in the new accumulator rather than
- * being lost or causing contention. Entries that fail to write are re-queued
- * back into the accumulator so they will be retried on the next flush.
+ * being lost or causing contention. Sites are updated in chunks via a single
+ * batch UPDATE per chunk. Failed chunks are discarded — exact per-flush
+ * accuracy is not critical and re-queuing is not worth the added complexity.
  *
  * This function is exported so that the application's graceful-shutdown
  * cleanup handler can call it before the process exits.
@@ -108,76 +130,76 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
         `Flushing accumulated bandwidth data for ${sortedEntries.length} site(s) to the database`
     );

-    // Aggregate billing usage by org, collected during the DB update loop.
+    // Build a lookup so post-processing can reach each entry by publicKey.
+    const snapshotMap = new Map(sortedEntries);
+
+    // Aggregate billing usage by org across all chunks.
     const orgUsageMap = new Map();

-    for (const [publicKey, { bytesIn, bytesOut, exitNodeId, calcUsage }] of sortedEntries) {
+    // Process in chunks so individual queries stay at a reasonable size.
+ for (let i = 0; i < sortedEntries.length; i += BATCH_CHUNK_SIZE) { + const chunk = sortedEntries.slice(i, i + BATCH_CHUNK_SIZE); + const chunkEnd = i + chunk.length - 1; + + // Build a parameterised VALUES list: (pubKey, bytesIn, bytesOut), ... + // Both PostgreSQL and SQLite (≥ 3.33.0, which better-sqlite3 bundles) + // support UPDATE … FROM (VALUES …), letting us update the whole chunk + // in a single query instead of N individual round-trips. + const valuesList = chunk.map(([publicKey, { bytesIn, bytesOut }]) => + sql`(${publicKey}, ${bytesIn}, ${bytesOut})` + ); + const valuesClause = sql.join(valuesList, sql`, `); + + let rows: { orgId: string; pubKey: string }[] = []; + try { - const updatedSite = await withDeadlockRetry(async () => { - const [result] = await db - .update(sites) - .set({ - megabytesOut: sql`COALESCE(${sites.megabytesOut}, 0) + ${bytesIn}`, - megabytesIn: sql`COALESCE(${sites.megabytesIn}, 0) + ${bytesOut}`, - lastBandwidthUpdate: currentTime, - }) - .where(eq(sites.pubKey, publicKey)) - .returning({ - orgId: sites.orgId, - siteId: sites.siteId - }); - return result; - }, `flush bandwidth for site ${publicKey}`); - - if (updatedSite) { - if (exitNodeId) { - const notAllowed = await checkExitNodeOrg( - exitNodeId, - updatedSite.orgId - ); - if (notAllowed) { - logger.warn( - `Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}` - ); - // Skip usage tracking for this site but continue - // processing the rest. - continue; - } - } - - if (calcUsage) { - const totalBandwidth = bytesIn + bytesOut; - const current = orgUsageMap.get(updatedSite.orgId) ?? 
0; - orgUsageMap.set(updatedSite.orgId, current + totalBandwidth); - } - } + rows = await withDeadlockRetry(async () => { + return dbQueryRows<{ orgId: string; pubKey: string }>(sql` + UPDATE sites + SET + "bytesOut" = COALESCE("bytesOut", 0) + v.bytes_in, + "bytesIn" = COALESCE("bytesIn", 0) + v.bytes_out, + "lastBandwidthUpdate" = ${currentTime} + FROM (VALUES ${valuesClause}) AS v(pub_key, bytes_in, bytes_out) + WHERE sites."pubKey" = v.pub_key + RETURNING sites."orgId" AS "orgId", sites."pubKey" AS "pubKey" + `); + }, `flush bandwidth chunk [${i}–${chunkEnd}]`); } catch (error) { logger.error( - `Failed to flush bandwidth for site ${publicKey}:`, + `Failed to flush bandwidth chunk [${i}–${chunkEnd}], discarding ${chunk.length} site(s):`, error ); + // Discard the chunk — exact per-flush accuracy is not critical. + continue; + } - // Re-queue the failed entry so it is retried on the next flush - // rather than silently dropped. - const existing = accumulator.get(publicKey); - if (existing) { - existing.bytesIn += bytesIn; - existing.bytesOut += bytesOut; - } else { - accumulator.set(publicKey, { - bytesIn, - bytesOut, - exitNodeId, - calcUsage - }); + // Collect billing usage from the returned rows. + for (const { orgId, pubKey } of rows) { + const entry = snapshotMap.get(pubKey); + if (!entry) continue; + + const { bytesIn, bytesOut, exitNodeId, calcUsage } = entry; + + if (exitNodeId) { + const notAllowed = await checkExitNodeOrg(exitNodeId, orgId); + if (notAllowed) { + logger.warn( + `Exit node ${exitNodeId} is not allowed for org ${orgId}` + ); + continue; + } + } + + if (calcUsage) { + const current = orgUsageMap.get(orgId) ?? 0; + orgUsageMap.set(orgId, current + bytesIn + bytesOut); } } } - // Process billing usage updates outside the site-update loop to keep - // lock scope small and concerns separated. + // Process billing usage updates after all chunks are written. if (orgUsageMap.size > 0) { - // Sort org IDs for consistent lock ordering. 
const sortedOrgIds = [...orgUsageMap.keys()].sort(); for (const orgId of sortedOrgIds) {