mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-25 20:16:40 +00:00
Compare commits
17 Commits
1.16.2-s.1
...
thundering
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dfd604c781 | ||
|
|
c96c5e8ae8 | ||
|
|
6f71e9f0f2 | ||
|
|
d17ec6dc1f | ||
|
|
c36a019f5d | ||
|
|
cf2dfdea5b | ||
|
|
985e1bb9ab | ||
|
|
85335bfecc | ||
|
|
7c2b4f422a | ||
|
|
ad2a0ae127 | ||
|
|
6c2c620c99 | ||
|
|
f643abf19a | ||
|
|
a1729033cf | ||
|
|
7311766512 | ||
|
|
17105f3a51 | ||
|
|
edcfbd26e4 | ||
|
|
0c4d9ea164 |
@@ -1,8 +1,10 @@
|
||||
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
||||
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
||||
import { stopPingAccumulator } from "@server/routers/newt/pingAccumulator";
|
||||
import { cleanup as wsCleanup } from "#dynamic/routers/ws";
|
||||
|
||||
async function cleanup() {
|
||||
await stopPingAccumulator();
|
||||
await flushBandwidthToDb();
|
||||
await flushSiteBandwidthToDb();
|
||||
await wsCleanup();
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres";
|
||||
import { Pool } from "pg";
|
||||
import { readConfigFile } from "@server/lib/readConfigFile";
|
||||
import { withReplicas } from "drizzle-orm/pg-core";
|
||||
import { createPool } from "./poolConfig";
|
||||
|
||||
function createDb() {
|
||||
const config = readConfigFile();
|
||||
@@ -39,12 +39,17 @@ function createDb() {
|
||||
|
||||
// Create connection pools instead of individual connections
|
||||
const poolConfig = config.postgres.pool;
|
||||
const primaryPool = new Pool({
|
||||
const maxConnections = poolConfig?.max_connections || 20;
|
||||
const idleTimeoutMs = poolConfig?.idle_timeout_ms || 30000;
|
||||
const connectionTimeoutMs = poolConfig?.connection_timeout_ms || 5000;
|
||||
|
||||
const primaryPool = createPool(
|
||||
connectionString,
|
||||
max: poolConfig?.max_connections || 20,
|
||||
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
|
||||
connectionTimeoutMillis: poolConfig?.connection_timeout_ms || 5000
|
||||
});
|
||||
maxConnections,
|
||||
idleTimeoutMs,
|
||||
connectionTimeoutMs,
|
||||
"primary"
|
||||
);
|
||||
|
||||
const replicas = [];
|
||||
|
||||
@@ -55,14 +60,16 @@ function createDb() {
|
||||
})
|
||||
);
|
||||
} else {
|
||||
const maxReplicaConnections =
|
||||
poolConfig?.max_replica_connections || 20;
|
||||
for (const conn of replicaConnections) {
|
||||
const replicaPool = new Pool({
|
||||
connectionString: conn.connection_string,
|
||||
max: poolConfig?.max_replica_connections || 20,
|
||||
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
|
||||
connectionTimeoutMillis:
|
||||
poolConfig?.connection_timeout_ms || 5000
|
||||
});
|
||||
const replicaPool = createPool(
|
||||
conn.connection_string,
|
||||
maxReplicaConnections,
|
||||
idleTimeoutMs,
|
||||
connectionTimeoutMs,
|
||||
"replica"
|
||||
);
|
||||
replicas.push(
|
||||
DrizzlePostgres(replicaPool, {
|
||||
logger: process.env.QUERY_LOGGING == "true"
|
||||
|
||||
@@ -1,9 +1,9 @@
|
||||
import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres";
|
||||
import { Pool } from "pg";
|
||||
import { readConfigFile } from "@server/lib/readConfigFile";
|
||||
import { withReplicas } from "drizzle-orm/pg-core";
|
||||
import { build } from "@server/build";
|
||||
import { db as mainDb, primaryDb as mainPrimaryDb } from "./driver";
|
||||
import { createPool } from "./poolConfig";
|
||||
|
||||
function createLogsDb() {
|
||||
// Only use separate logs database in SaaS builds
|
||||
@@ -42,12 +42,17 @@ function createLogsDb() {
|
||||
|
||||
// Create separate connection pool for logs database
|
||||
const poolConfig = logsConfig?.pool || config.postgres?.pool;
|
||||
const primaryPool = new Pool({
|
||||
const maxConnections = poolConfig?.max_connections || 20;
|
||||
const idleTimeoutMs = poolConfig?.idle_timeout_ms || 30000;
|
||||
const connectionTimeoutMs = poolConfig?.connection_timeout_ms || 5000;
|
||||
|
||||
const primaryPool = createPool(
|
||||
connectionString,
|
||||
max: poolConfig?.max_connections || 20,
|
||||
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
|
||||
connectionTimeoutMillis: poolConfig?.connection_timeout_ms || 5000
|
||||
});
|
||||
maxConnections,
|
||||
idleTimeoutMs,
|
||||
connectionTimeoutMs,
|
||||
"logs-primary"
|
||||
);
|
||||
|
||||
const replicas = [];
|
||||
|
||||
@@ -58,14 +63,16 @@ function createLogsDb() {
|
||||
})
|
||||
);
|
||||
} else {
|
||||
const maxReplicaConnections =
|
||||
poolConfig?.max_replica_connections || 20;
|
||||
for (const conn of replicaConnections) {
|
||||
const replicaPool = new Pool({
|
||||
connectionString: conn.connection_string,
|
||||
max: poolConfig?.max_replica_connections || 20,
|
||||
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
|
||||
connectionTimeoutMillis:
|
||||
poolConfig?.connection_timeout_ms || 5000
|
||||
});
|
||||
const replicaPool = createPool(
|
||||
conn.connection_string,
|
||||
maxReplicaConnections,
|
||||
idleTimeoutMs,
|
||||
connectionTimeoutMs,
|
||||
"logs-replica"
|
||||
);
|
||||
replicas.push(
|
||||
DrizzlePostgres(replicaPool, {
|
||||
logger: process.env.QUERY_LOGGING == "true"
|
||||
|
||||
63
server/db/pg/poolConfig.ts
Normal file
63
server/db/pg/poolConfig.ts
Normal file
@@ -0,0 +1,63 @@
|
||||
import { Pool, PoolConfig } from "pg";
|
||||
import logger from "@server/logger";
|
||||
|
||||
export function createPoolConfig(
|
||||
connectionString: string,
|
||||
maxConnections: number,
|
||||
idleTimeoutMs: number,
|
||||
connectionTimeoutMs: number
|
||||
): PoolConfig {
|
||||
return {
|
||||
connectionString,
|
||||
max: maxConnections,
|
||||
idleTimeoutMillis: idleTimeoutMs,
|
||||
connectionTimeoutMillis: connectionTimeoutMs,
|
||||
// TCP keepalive to prevent silent connection drops by NAT gateways,
|
||||
// load balancers, and other intermediate network devices (e.g. AWS
|
||||
// NAT Gateway drops idle TCP connections after ~350s)
|
||||
keepAlive: true,
|
||||
keepAliveInitialDelayMillis: 10000, // send first keepalive after 10s of idle
|
||||
// Allow connections to be released and recreated more aggressively
|
||||
// to avoid stale connections building up
|
||||
allowExitOnIdle: false
|
||||
};
|
||||
}
|
||||
|
||||
export function attachPoolErrorHandlers(pool: Pool, label: string): void {
|
||||
pool.on("error", (err) => {
|
||||
// This catches errors on idle clients in the pool. Without this
|
||||
// handler an unexpected disconnect would crash the process.
|
||||
logger.error(
|
||||
`Unexpected error on idle ${label} database client: ${err.message}`
|
||||
);
|
||||
});
|
||||
|
||||
pool.on("connect", (client) => {
|
||||
// Set a statement timeout on every new connection so a single slow
|
||||
// query can't block the pool forever
|
||||
client.query("SET statement_timeout = '30s'").catch((err: Error) => {
|
||||
logger.warn(
|
||||
`Failed to set statement_timeout on ${label} client: ${err.message}`
|
||||
);
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
export function createPool(
|
||||
connectionString: string,
|
||||
maxConnections: number,
|
||||
idleTimeoutMs: number,
|
||||
connectionTimeoutMs: number,
|
||||
label: string
|
||||
): Pool {
|
||||
const pool = new Pool(
|
||||
createPoolConfig(
|
||||
connectionString,
|
||||
maxConnections,
|
||||
idleTimeoutMs,
|
||||
connectionTimeoutMs
|
||||
)
|
||||
);
|
||||
attachPoolErrorHandlers(pool, label);
|
||||
return pool;
|
||||
}
|
||||
40
server/lib/sanitize.ts
Normal file
40
server/lib/sanitize.ts
Normal file
@@ -0,0 +1,40 @@
|
||||
/**
|
||||
* Sanitize a string field before inserting into a database TEXT column.
|
||||
*
|
||||
* Two passes are applied:
|
||||
*
|
||||
* 1. Lone UTF-16 surrogates – JavaScript strings can hold unpaired surrogates
|
||||
* (e.g. \uD800 without a following \uDC00-\uDFFF codepoint). These are
|
||||
* valid in JS but cannot be encoded as UTF-8, triggering
|
||||
* `report_invalid_encoding` in SQLite / Postgres. They are replaced with
|
||||
* the Unicode replacement character U+FFFD so the data is preserved as a
|
||||
* visible signal that something was malformed.
|
||||
*
|
||||
* 2. Null bytes and C0 control characters – SQLite stores TEXT as
|
||||
* null-terminated C strings, so \x00 in a value causes
|
||||
* `report_invalid_encoding`. Bots and scanners routinely inject null bytes
|
||||
* into URLs (e.g. `/path\u0000.jpg`). All C0 control characters in the
|
||||
* range \x00-\x1F are stripped except for the three that are legitimate in
|
||||
* text payloads: HT (\x09), LF (\x0A), and CR (\x0D). DEL (\x7F) is also
|
||||
* stripped.
|
||||
*/
|
||||
export function sanitizeString(value: string): string;
|
||||
export function sanitizeString(
|
||||
value: string | null | undefined
|
||||
): string | undefined;
|
||||
export function sanitizeString(
|
||||
value: string | null | undefined
|
||||
): string | undefined {
|
||||
if (value == null) return undefined;
|
||||
return (
|
||||
value
|
||||
// Replace lone high surrogates (not followed by a low surrogate)
|
||||
// and lone low surrogates (not preceded by a high surrogate).
|
||||
.replace(
|
||||
/[\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?<![\uD800-\uDBFF])[\uDC00-\uDFFF]/g,
|
||||
"\uFFFD"
|
||||
)
|
||||
// Strip null bytes, C0 control chars (except HT/LF/CR), and DEL.
|
||||
.replace(/[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]/g, "")
|
||||
);
|
||||
}
|
||||
22
server/lib/tokenCache.ts
Normal file
22
server/lib/tokenCache.ts
Normal file
@@ -0,0 +1,22 @@
|
||||
/**
|
||||
* Returns a cached plaintext token from Redis if one exists and decrypts
|
||||
* cleanly, otherwise calls `createSession` to mint a fresh token, stores the
|
||||
* encrypted value in Redis with the given TTL, and returns it.
|
||||
*
|
||||
* Failures at the Redis layer are non-fatal – the function always falls
|
||||
* through to session creation so the caller is never blocked by a Redis outage.
|
||||
*
|
||||
* @param cacheKey Unique Redis key, e.g. `"newt:token_cache:abc123"`
|
||||
* @param secret Server secret used for AES encryption/decryption
|
||||
* @param ttlSeconds Cache TTL in seconds (should match session expiry)
|
||||
* @param createSession Factory that mints a new session and returns its raw token
|
||||
*/
|
||||
export async function getOrCreateCachedToken(
|
||||
cacheKey: string,
|
||||
secret: string,
|
||||
ttlSeconds: number,
|
||||
createSession: () => Promise<string>
|
||||
): Promise<string> {
|
||||
const token = await createSession();
|
||||
return token;
|
||||
}
|
||||
@@ -15,8 +15,10 @@ import { rateLimitService } from "#private/lib/rateLimit";
|
||||
import { cleanup as wsCleanup } from "#private/routers/ws";
|
||||
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
||||
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
||||
import { stopPingAccumulator } from "@server/routers/newt/pingAccumulator";
|
||||
|
||||
async function cleanup() {
|
||||
await stopPingAccumulator();
|
||||
await flushBandwidthToDb();
|
||||
await flushSiteBandwidthToDb();
|
||||
await rateLimitService.cleanup();
|
||||
|
||||
@@ -1,3 +1,16 @@
|
||||
/*
|
||||
* This file is part of a proprietary work.
|
||||
*
|
||||
* Copyright (c) 2025 Fossorial, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This file is licensed under the Fossorial Commercial License.
|
||||
* You may not use this file except in compliance with the License.
|
||||
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
|
||||
*
|
||||
* This file is not licensed under the AGPLv3.
|
||||
*/
|
||||
|
||||
import NodeCache from "node-cache";
|
||||
import logger from "@server/logger";
|
||||
import { redisManager } from "@server/private/lib/redis";
|
||||
@@ -24,23 +37,31 @@ setInterval(() => {
|
||||
*/
|
||||
class AdaptiveCache {
|
||||
private useRedis(): boolean {
|
||||
return redisManager.isRedisEnabled() && redisManager.getHealthStatus().isHealthy;
|
||||
return (
|
||||
redisManager.isRedisEnabled() &&
|
||||
redisManager.getHealthStatus().isHealthy
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set a value in the cache
|
||||
* @param key - Cache key
|
||||
* @param value - Value to cache (will be JSON stringified for Redis)
|
||||
* @param ttl - Time to live in seconds (0 = no expiration)
|
||||
* @param ttl - Time to live in seconds (0 = no expiration; omit = 3600s for Redis)
|
||||
* @returns boolean indicating success
|
||||
*/
|
||||
async set(key: string, value: any, ttl?: number): Promise<boolean> {
|
||||
const effectiveTtl = ttl === 0 ? undefined : ttl;
|
||||
const redisTtl = ttl === 0 ? undefined : (ttl ?? 3600);
|
||||
|
||||
if (this.useRedis()) {
|
||||
try {
|
||||
const serialized = JSON.stringify(value);
|
||||
const success = await redisManager.set(key, serialized, effectiveTtl);
|
||||
const success = await redisManager.set(
|
||||
key,
|
||||
serialized,
|
||||
redisTtl
|
||||
);
|
||||
|
||||
if (success) {
|
||||
logger.debug(`Set key in Redis: ${key}`);
|
||||
@@ -48,7 +69,9 @@ class AdaptiveCache {
|
||||
}
|
||||
|
||||
// Redis failed, fall through to local cache
|
||||
logger.debug(`Redis set failed for key ${key}, falling back to local cache`);
|
||||
logger.debug(
|
||||
`Redis set failed for key ${key}, falling back to local cache`
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error(`Redis set error for key ${key}:`, error);
|
||||
// Fall through to local cache
|
||||
@@ -120,9 +143,14 @@ class AdaptiveCache {
|
||||
}
|
||||
|
||||
// Some Redis deletes failed, fall through to local cache
|
||||
logger.debug(`Some Redis deletes failed, falling back to local cache`);
|
||||
logger.debug(
|
||||
`Some Redis deletes failed, falling back to local cache`
|
||||
);
|
||||
} catch (error) {
|
||||
logger.error(`Redis del error for keys ${keys.join(", ")}:`, error);
|
||||
logger.error(
|
||||
`Redis del error for keys ${keys.join(", ")}:`,
|
||||
error
|
||||
);
|
||||
// Fall through to local cache
|
||||
deletedCount = 0;
|
||||
}
|
||||
@@ -195,7 +223,9 @@ class AdaptiveCache {
|
||||
*/
|
||||
async flushAll(): Promise<void> {
|
||||
if (this.useRedis()) {
|
||||
logger.warn("Adaptive cache flushAll called - Redis flush not implemented, only local cache will be flushed");
|
||||
logger.warn(
|
||||
"Adaptive cache flushAll called - Redis flush not implemented, only local cache will be flushed"
|
||||
);
|
||||
}
|
||||
|
||||
localCache.flushAll();
|
||||
@@ -239,7 +269,9 @@ class AdaptiveCache {
|
||||
getTtl(key: string): number {
|
||||
// Note: This only works for local cache, Redis TTL is not supported
|
||||
if (this.useRedis()) {
|
||||
logger.warn(`getTtl called for key ${key} but Redis TTL lookup is not implemented`);
|
||||
logger.warn(
|
||||
`getTtl called for key ${key} but Redis TTL lookup is not implemented`
|
||||
);
|
||||
}
|
||||
|
||||
const ttl = localCache.getTtl(key);
|
||||
@@ -255,7 +287,9 @@ class AdaptiveCache {
|
||||
*/
|
||||
keys(): string[] {
|
||||
if (this.useRedis()) {
|
||||
logger.warn("keys() called but Redis keys are not included, only local cache keys returned");
|
||||
logger.warn(
|
||||
"keys() called but Redis keys are not included, only local cache keys returned"
|
||||
);
|
||||
}
|
||||
return localCache.keys();
|
||||
}
|
||||
|
||||
77
server/private/lib/tokenCache.ts
Normal file
77
server/private/lib/tokenCache.ts
Normal file
@@ -0,0 +1,77 @@
|
||||
/*
|
||||
* This file is part of a proprietary work.
|
||||
*
|
||||
* Copyright (c) 2025 Fossorial, Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* This file is licensed under the Fossorial Commercial License.
|
||||
* You may not use this file except in compliance with the License.
|
||||
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
|
||||
*
|
||||
* This file is not licensed under the AGPLv3.
|
||||
*/
|
||||
|
||||
import redisManager from "#private/lib/redis";
|
||||
import { encrypt, decrypt } from "@server/lib/crypto";
|
||||
import logger from "@server/logger";
|
||||
|
||||
/**
|
||||
* Returns a cached plaintext token from Redis if one exists and decrypts
|
||||
* cleanly, otherwise calls `createSession` to mint a fresh token, stores the
|
||||
* encrypted value in Redis with the given TTL, and returns it.
|
||||
*
|
||||
* Failures at the Redis layer are non-fatal – the function always falls
|
||||
* through to session creation so the caller is never blocked by a Redis outage.
|
||||
*
|
||||
* @param cacheKey Unique Redis key, e.g. `"newt:token_cache:abc123"`
|
||||
* @param secret Server secret used for AES encryption/decryption
|
||||
* @param ttlSeconds Cache TTL in seconds (should match session expiry)
|
||||
* @param createSession Factory that mints a new session and returns its raw token
|
||||
*/
|
||||
export async function getOrCreateCachedToken(
|
||||
cacheKey: string,
|
||||
secret: string,
|
||||
ttlSeconds: number,
|
||||
createSession: () => Promise<string>
|
||||
): Promise<string> {
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
try {
|
||||
const cached = await redisManager.get(cacheKey);
|
||||
if (cached) {
|
||||
const token = decrypt(cached, secret);
|
||||
if (token) {
|
||||
logger.debug(`Token cache hit for key: ${cacheKey}`);
|
||||
return token;
|
||||
}
|
||||
// Decryption produced an empty string – treat as a miss
|
||||
logger.warn(
|
||||
`Token cache decryption returned empty string for key: ${cacheKey}, treating as miss`
|
||||
);
|
||||
}
|
||||
} catch (e) {
|
||||
logger.warn(
|
||||
`Token cache read/decrypt failed for key ${cacheKey}, falling through to session creation:`,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
const token = await createSession();
|
||||
|
||||
if (redisManager.isRedisEnabled()) {
|
||||
try {
|
||||
const encrypted = encrypt(token, secret);
|
||||
await redisManager.set(cacheKey, encrypted, ttlSeconds);
|
||||
logger.debug(
|
||||
`Token cached in Redis for key: ${cacheKey} (TTL ${ttlSeconds}s)`
|
||||
);
|
||||
} catch (e) {
|
||||
logger.warn(
|
||||
`Token cache write failed for key ${cacheKey} (session was still created):`,
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return token;
|
||||
}
|
||||
@@ -15,6 +15,7 @@ import { verifySessionRemoteExitNodeMiddleware } from "#private/middlewares/veri
|
||||
import { Router } from "express";
|
||||
import {
|
||||
db,
|
||||
logsDb,
|
||||
exitNodes,
|
||||
Resource,
|
||||
ResourcePassword,
|
||||
@@ -81,6 +82,7 @@ import { verifyResourceAccessToken } from "@server/auth/verifyResourceAccessToke
|
||||
import semver from "semver";
|
||||
import { maxmindAsnLookup } from "@server/db/maxmindAsn";
|
||||
import { checkOrgAccessPolicy } from "@server/lib/checkOrgAccessPolicy";
|
||||
import { sanitizeString } from "@server/lib/sanitize";
|
||||
|
||||
// Zod schemas for request validation
|
||||
const getResourceByDomainParamsSchema = z.strictObject({
|
||||
@@ -1859,24 +1861,24 @@ hybridRouter.post(
|
||||
})
|
||||
.map((logEntry) => ({
|
||||
timestamp: logEntry.timestamp,
|
||||
orgId: logEntry.orgId,
|
||||
actorType: logEntry.actorType,
|
||||
actor: logEntry.actor,
|
||||
actorId: logEntry.actorId,
|
||||
metadata: logEntry.metadata,
|
||||
orgId: sanitizeString(logEntry.orgId),
|
||||
actorType: sanitizeString(logEntry.actorType),
|
||||
actor: sanitizeString(logEntry.actor),
|
||||
actorId: sanitizeString(logEntry.actorId),
|
||||
metadata: sanitizeString(logEntry.metadata),
|
||||
action: logEntry.action,
|
||||
resourceId: logEntry.resourceId,
|
||||
reason: logEntry.reason,
|
||||
location: logEntry.location,
|
||||
location: sanitizeString(logEntry.location),
|
||||
// userAgent: data.userAgent, // TODO: add this
|
||||
// headers: data.body.headers,
|
||||
// query: data.body.query,
|
||||
originalRequestURL: logEntry.originalRequestURL,
|
||||
scheme: logEntry.scheme,
|
||||
host: logEntry.host,
|
||||
path: logEntry.path,
|
||||
method: logEntry.method,
|
||||
ip: logEntry.ip,
|
||||
originalRequestURL: sanitizeString(logEntry.originalRequestURL) ?? "",
|
||||
scheme: sanitizeString(logEntry.scheme) ?? "",
|
||||
host: sanitizeString(logEntry.host) ?? "",
|
||||
path: sanitizeString(logEntry.path) ?? "",
|
||||
method: sanitizeString(logEntry.method) ?? "",
|
||||
ip: sanitizeString(logEntry.ip),
|
||||
tls: logEntry.tls
|
||||
}));
|
||||
|
||||
@@ -1884,7 +1886,7 @@ hybridRouter.post(
|
||||
const batchSize = 100;
|
||||
for (let i = 0; i < logEntries.length; i += batchSize) {
|
||||
const batch = logEntries.slice(i, i + batchSize);
|
||||
await db.insert(requestAuditLog).values(batch);
|
||||
await logsDb.insert(requestAuditLog).values(batch);
|
||||
}
|
||||
|
||||
return response(res, {
|
||||
|
||||
@@ -23,8 +23,10 @@ import { z } from "zod";
|
||||
import { fromError } from "zod-validation-error";
|
||||
import {
|
||||
createRemoteExitNodeSession,
|
||||
validateRemoteExitNodeSessionToken
|
||||
validateRemoteExitNodeSessionToken,
|
||||
EXPIRES
|
||||
} from "#private/auth/sessions/remoteExitNode";
|
||||
import { getOrCreateCachedToken } from "@server/private/lib/tokenCache";
|
||||
import { verifyPassword } from "@server/auth/password";
|
||||
import logger from "@server/logger";
|
||||
import config from "@server/lib/config";
|
||||
@@ -103,14 +105,23 @@ export async function getRemoteExitNodeToken(
|
||||
);
|
||||
}
|
||||
|
||||
const resToken = generateSessionToken();
|
||||
await createRemoteExitNodeSession(
|
||||
resToken,
|
||||
existingRemoteExitNode.remoteExitNodeId
|
||||
// Return a cached token if one exists to prevent thundering herd on
|
||||
// simultaneous restarts; falls back to creating a fresh session when
|
||||
// Redis is unavailable or the cache has expired.
|
||||
const resToken = await getOrCreateCachedToken(
|
||||
`remote_exit_node:token_cache:${existingRemoteExitNode.remoteExitNodeId}`,
|
||||
config.getRawConfig().server.secret!,
|
||||
Math.floor(EXPIRES / 1000),
|
||||
async () => {
|
||||
const token = generateSessionToken();
|
||||
await createRemoteExitNodeSession(
|
||||
token,
|
||||
existingRemoteExitNode.remoteExitNodeId
|
||||
);
|
||||
return token;
|
||||
}
|
||||
);
|
||||
|
||||
// logger.debug(`Created RemoteExitNode token response: ${JSON.stringify(resToken)}`);
|
||||
|
||||
return response<{ token: string }>(res, {
|
||||
data: {
|
||||
token: resToken
|
||||
|
||||
@@ -38,7 +38,7 @@ export const startRemoteExitNodeOfflineChecker = (): void => {
|
||||
);
|
||||
|
||||
// Find clients that haven't pinged in the last 2 minutes and mark them as offline
|
||||
const newlyOfflineNodes = await db
|
||||
const offlineNodes = await db
|
||||
.update(exitNodes)
|
||||
.set({ online: false })
|
||||
.where(
|
||||
@@ -53,32 +53,15 @@ export const startRemoteExitNodeOfflineChecker = (): void => {
|
||||
)
|
||||
.returning();
|
||||
|
||||
// Update the sites to offline if they have not pinged either
|
||||
const exitNodeIds = newlyOfflineNodes.map(
|
||||
(node) => node.exitNodeId
|
||||
);
|
||||
|
||||
const sitesOnNode = await db
|
||||
.select()
|
||||
.from(sites)
|
||||
.where(
|
||||
and(
|
||||
eq(sites.online, true),
|
||||
inArray(sites.exitNodeId, exitNodeIds)
|
||||
)
|
||||
if (offlineNodes.length > 0) {
|
||||
logger.info(
|
||||
`checkRemoteExitNodeOffline: Marked ${offlineNodes.length} remoteExitNode client(s) offline due to inactivity`
|
||||
);
|
||||
|
||||
// loop through the sites and process their lastBandwidthUpdate as an iso string and if its more than 1 minute old then mark the site offline
|
||||
for (const site of sitesOnNode) {
|
||||
if (!site.lastBandwidthUpdate) {
|
||||
continue;
|
||||
}
|
||||
const lastBandwidthUpdate = new Date(site.lastBandwidthUpdate);
|
||||
if (Date.now() - lastBandwidthUpdate.getTime() > 60 * 1000) {
|
||||
await db
|
||||
.update(sites)
|
||||
.set({ online: false })
|
||||
.where(eq(sites.siteId, site.siteId));
|
||||
for (const offlineClient of offlineNodes) {
|
||||
logger.debug(
|
||||
`checkRemoteExitNodeOffline: Client ${offlineClient.exitNodeId} marked offline (lastPing: ${offlineClient.lastPing})`
|
||||
);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
|
||||
@@ -19,17 +19,14 @@ import { Socket } from "net";
|
||||
import {
|
||||
Newt,
|
||||
newts,
|
||||
NewtSession,
|
||||
olms,
|
||||
Olm,
|
||||
OlmSession,
|
||||
olms,
|
||||
RemoteExitNode,
|
||||
RemoteExitNodeSession,
|
||||
remoteExitNodes,
|
||||
sites
|
||||
} from "@server/db";
|
||||
import { eq } from "drizzle-orm";
|
||||
import { db } from "@server/db";
|
||||
import { recordPing } from "@server/routers/newt/pingAccumulator";
|
||||
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
|
||||
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
|
||||
import logger from "@server/logger";
|
||||
@@ -197,11 +194,7 @@ const connectedClients: Map<string, AuthenticatedWebSocket[]> = new Map();
|
||||
// Config version tracking map (local to this node, resets on server restart)
|
||||
const clientConfigVersions: Map<string, number> = new Map();
|
||||
|
||||
// Tracks the last Unix timestamp (seconds) at which a ping was flushed to the
|
||||
// DB for a given siteId. Resets on server restart which is fine – the first
|
||||
// ping after startup will always write, re-establishing the online state.
|
||||
const lastPingDbWrite: Map<number, number> = new Map();
|
||||
const PING_DB_WRITE_INTERVAL = 45; // seconds
|
||||
|
||||
|
||||
// Recovery tracking
|
||||
let isRedisRecoveryInProgress = false;
|
||||
@@ -853,32 +846,16 @@ const setupConnection = async (
|
||||
);
|
||||
});
|
||||
|
||||
// Handle WebSocket protocol-level pings from older newt clients that do
|
||||
// not send application-level "newt/ping" messages. Update the site's
|
||||
// online state and lastPing timestamp so the offline checker treats them
|
||||
// the same as modern newt clients.
|
||||
if (clientType === "newt") {
|
||||
const newtClient = client as Newt;
|
||||
ws.on("ping", async () => {
|
||||
ws.on("ping", () => {
|
||||
if (!newtClient.siteId) return;
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
const lastWrite = lastPingDbWrite.get(newtClient.siteId) ?? 0;
|
||||
if (now - lastWrite < PING_DB_WRITE_INTERVAL) return;
|
||||
lastPingDbWrite.set(newtClient.siteId, now);
|
||||
try {
|
||||
await db
|
||||
.update(sites)
|
||||
.set({
|
||||
online: true,
|
||||
lastPing: now
|
||||
})
|
||||
.where(eq(sites.siteId, newtClient.siteId));
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
"Error updating newt site online state on WS ping",
|
||||
{ error }
|
||||
);
|
||||
}
|
||||
// Record the ping in the accumulator instead of writing to the
|
||||
// database on every WS ping frame. The accumulator flushes all
|
||||
// pending pings in a single batched UPDATE every ~10s, which
|
||||
// prevents connection pool exhaustion under load (especially
|
||||
// with cross-region latency to the database).
|
||||
recordPing(newtClient.siteId);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -5,25 +5,7 @@ import cache from "#dynamic/lib/cache";
|
||||
import { calculateCutoffTimestamp } from "@server/lib/cleanupLogs";
|
||||
import { stripPortFromHost } from "@server/lib/ip";
|
||||
|
||||
/**
|
||||
* Sanitize a string field by replacing lone UTF-16 surrogates (which cannot
|
||||
* be encoded as valid UTF-8) with the Unicode replacement character, and
|
||||
* stripping ASCII control characters that are invalid in most text columns.
|
||||
*/
|
||||
function sanitizeString(value: string | undefined | null): string | undefined {
|
||||
if (value == null) return undefined;
|
||||
return (
|
||||
value
|
||||
// Replace lone high surrogates (not followed by a low surrogate)
|
||||
// and lone low surrogates (not preceded by a high surrogate)
|
||||
.replace(
|
||||
/[\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?<![\uD800-\uDBFF])[\uDC00-\uDFFF]/g,
|
||||
"\uFFFD"
|
||||
)
|
||||
// Strip C0 control characters except HT (\x09), LF (\x0A), CR (\x0D)
|
||||
.replace(/[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]/g, "")
|
||||
);
|
||||
}
|
||||
import { sanitizeString } from "@server/lib/sanitize";
|
||||
|
||||
/**
|
||||
|
||||
|
||||
@@ -70,7 +70,7 @@ async function getLatestOlmVersion(): Promise<string | null> {
|
||||
tags = tags.filter((version) => !version.name.includes("rc"));
|
||||
const latestVersion = tags[0].name;
|
||||
|
||||
olmVersionCache.set("latestOlmVersion", latestVersion);
|
||||
olmVersionCache.set("latestOlmVersion", latestVersion, 3600);
|
||||
|
||||
return latestVersion;
|
||||
} catch (error: any) {
|
||||
|
||||
@@ -71,7 +71,7 @@ async function getLatestOlmVersion(): Promise<string | null> {
|
||||
tags = tags.filter((version) => !version.name.includes("rc"));
|
||||
const latestVersion = tags[0].name;
|
||||
|
||||
olmVersionCache.set("latestOlmVersion", latestVersion);
|
||||
olmVersionCache.set("latestOlmVersion", latestVersion, 3600);
|
||||
|
||||
return latestVersion;
|
||||
} catch (error: any) {
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import { generateSessionToken } from "@server/auth/sessions/app";
|
||||
import { db } from "@server/db";
|
||||
import { db, newtSessions } from "@server/db";
|
||||
import { newts } from "@server/db";
|
||||
import { getOrCreateCachedToken } from "#dynamic/lib/tokenCache";
|
||||
import { EXPIRES } from "@server/auth/sessions/newt";
|
||||
import HttpCode from "@server/types/HttpCode";
|
||||
import response from "@server/lib/response";
|
||||
import { eq } from "drizzle-orm";
|
||||
@@ -92,8 +94,19 @@ export async function getNewtToken(
|
||||
);
|
||||
}
|
||||
|
||||
const resToken = generateSessionToken();
|
||||
await createNewtSession(resToken, existingNewt.newtId);
|
||||
// Return a cached token if one exists to prevent thundering herd on
|
||||
// simultaneous restarts; falls back to creating a fresh session when
|
||||
// Redis is unavailable or the cache has expired.
|
||||
const resToken = await getOrCreateCachedToken(
|
||||
`newt:token_cache:${existingNewt.newtId}`,
|
||||
config.getRawConfig().server.secret!,
|
||||
Math.floor(EXPIRES / 1000),
|
||||
async () => {
|
||||
const token = generateSessionToken();
|
||||
await createNewtSession(token, existingNewt.newtId);
|
||||
return token;
|
||||
}
|
||||
);
|
||||
|
||||
return response<{ token: string; serverVersion: string }>(res, {
|
||||
data: {
|
||||
|
||||
@@ -6,7 +6,9 @@ import logger from "@server/logger";
|
||||
/**
|
||||
* Handles disconnecting messages from sites to show disconnected in the ui
|
||||
*/
|
||||
export const handleNewtDisconnectingMessage: MessageHandler = async (context) => {
|
||||
export const handleNewtDisconnectingMessage: MessageHandler = async (
|
||||
context
|
||||
) => {
|
||||
const { message, client: c, sendToClient } = context;
|
||||
const newt = c as Newt;
|
||||
|
||||
@@ -27,7 +29,7 @@ export const handleNewtDisconnectingMessage: MessageHandler = async (context) =>
|
||||
.set({
|
||||
online: false
|
||||
})
|
||||
.where(eq(sites.siteId, sites.siteId));
|
||||
.where(eq(sites.siteId, newt.siteId));
|
||||
} catch (error) {
|
||||
logger.error("Error handling disconnecting message", { error });
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import { Newt } from "@server/db";
|
||||
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
import { sendNewtSyncMessage } from "./sync";
|
||||
import { recordPing } from "./pingAccumulator";
|
||||
|
||||
// Track if the offline checker interval is running
|
||||
let offlineCheckerInterval: NodeJS.Timeout | null = null;
|
||||
@@ -114,18 +115,12 @@ export const handleNewtPingMessage: MessageHandler = async (context) => {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// Mark the site as online and record the ping timestamp.
|
||||
await db
|
||||
.update(sites)
|
||||
.set({
|
||||
online: true,
|
||||
lastPing: Math.floor(Date.now() / 1000)
|
||||
})
|
||||
.where(eq(sites.siteId, newt.siteId));
|
||||
} catch (error) {
|
||||
logger.error("Error updating online state on newt ping", { error });
|
||||
}
|
||||
// Record the ping in memory; it will be flushed to the database
|
||||
// periodically by the ping accumulator (every ~10s) in a single
|
||||
// batched UPDATE instead of one query per ping. This prevents
|
||||
// connection pool exhaustion under load, especially with
|
||||
// cross-region latency to the database.
|
||||
recordPing(newt.siteId);
|
||||
|
||||
// Check config version and sync if stale.
|
||||
const configVersion = await getClientConfigVersion(newt.newtId);
|
||||
|
||||
382
server/routers/newt/pingAccumulator.ts
Normal file
382
server/routers/newt/pingAccumulator.ts
Normal file
@@ -0,0 +1,382 @@
|
||||
import { db } from "@server/db";
|
||||
import { sites, clients, olms } from "@server/db";
|
||||
import { eq, inArray } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
|
||||
/**
|
||||
* Ping Accumulator
|
||||
*
|
||||
* Instead of writing to the database on every single newt/olm ping (which
|
||||
* causes pool exhaustion under load, especially with cross-region latency),
|
||||
* we accumulate pings in memory and flush them to the database periodically
|
||||
* in a single batch.
|
||||
*
|
||||
* This is the same pattern used for bandwidth flushing in
|
||||
* receiveBandwidth.ts and handleReceiveBandwidthMessage.ts.
|
||||
*
|
||||
* Supports two kinds of pings:
|
||||
* - **Site pings** (from newts): update `sites.online` and `sites.lastPing`
|
||||
* - **Client pings** (from OLMs): update `clients.online`, `clients.lastPing`,
|
||||
* `clients.archived`, and optionally reset `olms.archived`
|
||||
*/
|
||||
|
||||
const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds
|
||||
const MAX_RETRIES = 2;
|
||||
const BASE_DELAY_MS = 50;
|
||||
|
||||
// ── Site (newt) pings ──────────────────────────────────────────────────
|
||||
// Map of siteId -> latest ping timestamp (unix seconds)
|
||||
const pendingSitePings: Map<number, number> = new Map();
|
||||
|
||||
// ── Client (OLM) pings ────────────────────────────────────────────────
|
||||
// Map of clientId -> latest ping timestamp (unix seconds)
|
||||
const pendingClientPings: Map<number, number> = new Map();
|
||||
// Set of olmIds whose `archived` flag should be reset to false
|
||||
const pendingOlmArchiveResets: Set<string> = new Set();
|
||||
|
||||
let flushTimer: NodeJS.Timeout | null = null;
|
||||
|
||||
// ── Public API ─────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Record a ping for a newt site. This does NOT write to the database
|
||||
* immediately. Instead it stores the latest ping timestamp in memory,
|
||||
* to be flushed periodically by the background timer.
|
||||
*/
|
||||
export function recordSitePing(siteId: number): void {
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
pendingSitePings.set(siteId, now);
|
||||
}
|
||||
|
||||
/** @deprecated Use `recordSitePing` instead. Alias kept for existing call-sites. */
|
||||
export const recordPing = recordSitePing;
|
||||
|
||||
/**
|
||||
* Record a ping for an OLM client. Batches the `clients` table update
|
||||
* (`online`, `lastPing`, `archived`) and, when `olmArchived` is true,
|
||||
* also queues an `olms` table update to clear the archived flag.
|
||||
*/
|
||||
export function recordClientPing(
|
||||
clientId: number,
|
||||
olmId: string,
|
||||
olmArchived: boolean
|
||||
): void {
|
||||
const now = Math.floor(Date.now() / 1000);
|
||||
pendingClientPings.set(clientId, now);
|
||||
if (olmArchived) {
|
||||
pendingOlmArchiveResets.add(olmId);
|
||||
}
|
||||
}
|
||||
|
||||
// ── Flush Logic ────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Flush all accumulated site pings to the database.
|
||||
*/
|
||||
async function flushSitePingsToDb(): Promise<void> {
|
||||
if (pendingSitePings.size === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Snapshot and clear so new pings arriving during the flush go into a
|
||||
// fresh map for the next cycle.
|
||||
const pingsToFlush = new Map(pendingSitePings);
|
||||
pendingSitePings.clear();
|
||||
|
||||
// Sort by siteId for consistent lock ordering (prevents deadlocks)
|
||||
const sortedEntries = Array.from(pingsToFlush.entries()).sort(
|
||||
([a], [b]) => a - b
|
||||
);
|
||||
|
||||
const BATCH_SIZE = 50;
|
||||
for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
|
||||
const batch = sortedEntries.slice(i, i + BATCH_SIZE);
|
||||
|
||||
try {
|
||||
await withRetry(async () => {
|
||||
// Group by timestamp for efficient bulk updates
|
||||
const byTimestamp = new Map<number, number[]>();
|
||||
for (const [siteId, timestamp] of batch) {
|
||||
const group = byTimestamp.get(timestamp) || [];
|
||||
group.push(siteId);
|
||||
byTimestamp.set(timestamp, group);
|
||||
}
|
||||
|
||||
if (byTimestamp.size === 1) {
|
||||
const [timestamp, siteIds] = Array.from(
|
||||
byTimestamp.entries()
|
||||
)[0];
|
||||
await db
|
||||
.update(sites)
|
||||
.set({
|
||||
online: true,
|
||||
lastPing: timestamp
|
||||
})
|
||||
.where(inArray(sites.siteId, siteIds));
|
||||
} else {
|
||||
await db.transaction(async (tx) => {
|
||||
for (const [timestamp, siteIds] of byTimestamp) {
|
||||
await tx
|
||||
.update(sites)
|
||||
.set({
|
||||
online: true,
|
||||
lastPing: timestamp
|
||||
})
|
||||
.where(inArray(sites.siteId, siteIds));
|
||||
}
|
||||
});
|
||||
}
|
||||
}, "flushSitePingsToDb");
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to flush site ping batch (${batch.length} sites), re-queuing for next cycle`,
|
||||
{ error }
|
||||
);
|
||||
for (const [siteId, timestamp] of batch) {
|
||||
const existing = pendingSitePings.get(siteId);
|
||||
if (!existing || existing < timestamp) {
|
||||
pendingSitePings.set(siteId, timestamp);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush all accumulated client (OLM) pings to the database.
|
||||
*/
|
||||
async function flushClientPingsToDb(): Promise<void> {
|
||||
if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Snapshot and clear
|
||||
const pingsToFlush = new Map(pendingClientPings);
|
||||
pendingClientPings.clear();
|
||||
|
||||
const olmResetsToFlush = new Set(pendingOlmArchiveResets);
|
||||
pendingOlmArchiveResets.clear();
|
||||
|
||||
// ── Flush client pings ─────────────────────────────────────────────
|
||||
if (pingsToFlush.size > 0) {
|
||||
const sortedEntries = Array.from(pingsToFlush.entries()).sort(
|
||||
([a], [b]) => a - b
|
||||
);
|
||||
|
||||
const BATCH_SIZE = 50;
|
||||
for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
|
||||
const batch = sortedEntries.slice(i, i + BATCH_SIZE);
|
||||
|
||||
try {
|
||||
await withRetry(async () => {
|
||||
const byTimestamp = new Map<number, number[]>();
|
||||
for (const [clientId, timestamp] of batch) {
|
||||
const group = byTimestamp.get(timestamp) || [];
|
||||
group.push(clientId);
|
||||
byTimestamp.set(timestamp, group);
|
||||
}
|
||||
|
||||
if (byTimestamp.size === 1) {
|
||||
const [timestamp, clientIds] = Array.from(
|
||||
byTimestamp.entries()
|
||||
)[0];
|
||||
await db
|
||||
.update(clients)
|
||||
.set({
|
||||
lastPing: timestamp,
|
||||
online: true,
|
||||
archived: false
|
||||
})
|
||||
.where(inArray(clients.clientId, clientIds));
|
||||
} else {
|
||||
await db.transaction(async (tx) => {
|
||||
for (const [timestamp, clientIds] of byTimestamp) {
|
||||
await tx
|
||||
.update(clients)
|
||||
.set({
|
||||
lastPing: timestamp,
|
||||
online: true,
|
||||
archived: false
|
||||
})
|
||||
.where(
|
||||
inArray(clients.clientId, clientIds)
|
||||
);
|
||||
}
|
||||
});
|
||||
}
|
||||
}, "flushClientPingsToDb");
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to flush client ping batch (${batch.length} clients), re-queuing for next cycle`,
|
||||
{ error }
|
||||
);
|
||||
for (const [clientId, timestamp] of batch) {
|
||||
const existing = pendingClientPings.get(clientId);
|
||||
if (!existing || existing < timestamp) {
|
||||
pendingClientPings.set(clientId, timestamp);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Flush OLM archive resets ───────────────────────────────────────
|
||||
if (olmResetsToFlush.size > 0) {
|
||||
const olmIds = Array.from(olmResetsToFlush).sort();
|
||||
|
||||
const BATCH_SIZE = 50;
|
||||
for (let i = 0; i < olmIds.length; i += BATCH_SIZE) {
|
||||
const batch = olmIds.slice(i, i + BATCH_SIZE);
|
||||
|
||||
try {
|
||||
await withRetry(async () => {
|
||||
await db
|
||||
.update(olms)
|
||||
.set({ archived: false })
|
||||
.where(inArray(olms.olmId, batch));
|
||||
}, "flushOlmArchiveResets");
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
`Failed to flush OLM archive reset batch (${batch.length} olms), re-queuing for next cycle`,
|
||||
{ error }
|
||||
);
|
||||
for (const olmId of batch) {
|
||||
pendingOlmArchiveResets.add(olmId);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Flush everything — called by the interval timer and during shutdown.
|
||||
*/
|
||||
export async function flushPingsToDb(): Promise<void> {
|
||||
await flushSitePingsToDb();
|
||||
await flushClientPingsToDb();
|
||||
}
|
||||
|
||||
// ── Retry / Error Helpers ──────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Simple retry wrapper with exponential backoff for transient errors
|
||||
* (connection timeouts, unexpected disconnects).
|
||||
*/
|
||||
async function withRetry<T>(
|
||||
operation: () => Promise<T>,
|
||||
context: string
|
||||
): Promise<T> {
|
||||
let attempt = 0;
|
||||
while (true) {
|
||||
try {
|
||||
return await operation();
|
||||
} catch (error: any) {
|
||||
if (isTransientError(error) && attempt < MAX_RETRIES) {
|
||||
attempt++;
|
||||
const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
|
||||
const jitter = Math.random() * baseDelay;
|
||||
const delay = baseDelay + jitter;
|
||||
logger.warn(
|
||||
`Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`
|
||||
);
|
||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||
continue;
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Detect transient connection errors that are safe to retry.
|
||||
*/
|
||||
function isTransientError(error: any): boolean {
|
||||
if (!error) return false;
|
||||
|
||||
const message = (error.message || "").toLowerCase();
|
||||
const causeMessage = (error.cause?.message || "").toLowerCase();
|
||||
const code = error.code || "";
|
||||
|
||||
// Connection timeout / terminated
|
||||
if (
|
||||
message.includes("connection timeout") ||
|
||||
message.includes("connection terminated") ||
|
||||
message.includes("timeout exceeded when trying to connect") ||
|
||||
causeMessage.includes("connection terminated unexpectedly") ||
|
||||
causeMessage.includes("connection timeout")
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// PostgreSQL deadlock
|
||||
if (code === "40P01" || message.includes("deadlock")) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// ECONNRESET, ECONNREFUSED, EPIPE
|
||||
if (
|
||||
code === "ECONNRESET" ||
|
||||
code === "ECONNREFUSED" ||
|
||||
code === "EPIPE" ||
|
||||
code === "ETIMEDOUT"
|
||||
) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
// ── Lifecycle ──────────────────────────────────────────────────────────
|
||||
|
||||
/**
|
||||
* Start the background flush timer. Call this once at server startup.
|
||||
*/
|
||||
export function startPingAccumulator(): void {
|
||||
if (flushTimer) {
|
||||
return; // Already running
|
||||
}
|
||||
|
||||
flushTimer = setInterval(async () => {
|
||||
try {
|
||||
await flushPingsToDb();
|
||||
} catch (error) {
|
||||
logger.error("Unhandled error in ping accumulator flush", {
|
||||
error
|
||||
});
|
||||
}
|
||||
}, FLUSH_INTERVAL_MS);
|
||||
|
||||
// Don't prevent the process from exiting
|
||||
flushTimer.unref();
|
||||
|
||||
logger.info(
|
||||
`Ping accumulator started (flush interval: ${FLUSH_INTERVAL_MS}ms)`
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stop the background flush timer and perform a final flush.
|
||||
* Call this during graceful shutdown.
|
||||
*/
|
||||
export async function stopPingAccumulator(): Promise<void> {
|
||||
if (flushTimer) {
|
||||
clearInterval(flushTimer);
|
||||
flushTimer = null;
|
||||
}
|
||||
|
||||
// Final flush to persist any remaining pings
|
||||
try {
|
||||
await flushPingsToDb();
|
||||
} catch (error) {
|
||||
logger.error("Error during final ping accumulator flush", { error });
|
||||
}
|
||||
|
||||
logger.info("Ping accumulator stopped");
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of pending (unflushed) pings. Useful for monitoring.
|
||||
*/
|
||||
export function getPendingPingCount(): number {
|
||||
return pendingSitePings.size + pendingClientPings.size;
|
||||
}
|
||||
@@ -8,7 +8,7 @@ import {
|
||||
ExitNode,
|
||||
exitNodes,
|
||||
sites,
|
||||
clientSitesAssociationsCache
|
||||
clientSitesAssociationsCache,
|
||||
} from "@server/db";
|
||||
import { olms } from "@server/db";
|
||||
import HttpCode from "@server/types/HttpCode";
|
||||
@@ -20,8 +20,10 @@ import { z } from "zod";
|
||||
import { fromError } from "zod-validation-error";
|
||||
import {
|
||||
createOlmSession,
|
||||
validateOlmSessionToken
|
||||
validateOlmSessionToken,
|
||||
EXPIRES
|
||||
} from "@server/auth/sessions/olm";
|
||||
import { getOrCreateCachedToken } from "#dynamic/lib/tokenCache";
|
||||
import { verifyPassword } from "@server/auth/password";
|
||||
import logger from "@server/logger";
|
||||
import config from "@server/lib/config";
|
||||
@@ -132,8 +134,19 @@ export async function getOlmToken(
|
||||
|
||||
logger.debug("Creating new olm session token");
|
||||
|
||||
const resToken = generateSessionToken();
|
||||
await createOlmSession(resToken, existingOlm.olmId);
|
||||
// Return a cached token if one exists to prevent thundering herd on
|
||||
// simultaneous restarts; falls back to creating a fresh session when
|
||||
// Redis is unavailable or the cache has expired.
|
||||
const resToken = await getOrCreateCachedToken(
|
||||
`olm:token_cache:${existingOlm.olmId}`,
|
||||
config.getRawConfig().server.secret!,
|
||||
Math.floor(EXPIRES / 1000),
|
||||
async () => {
|
||||
const token = generateSessionToken();
|
||||
await createOlmSession(token, existingOlm.olmId);
|
||||
return token;
|
||||
}
|
||||
);
|
||||
|
||||
let clientIdToUse;
|
||||
if (orgId) {
|
||||
|
||||
@@ -3,6 +3,7 @@ import { db } from "@server/db";
|
||||
import { MessageHandler } from "@server/routers/ws";
|
||||
import { clients, olms, Olm } from "@server/db";
|
||||
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
||||
import { recordClientPing } from "@server/routers/newt/pingAccumulator";
|
||||
import logger from "@server/logger";
|
||||
import { validateSessionToken } from "@server/auth/sessions/app";
|
||||
import { checkOrgAccessPolicy } from "#dynamic/lib/checkOrgAccessPolicy";
|
||||
@@ -201,22 +202,12 @@ export const handleOlmPingMessage: MessageHandler = async (context) => {
|
||||
await sendOlmSyncMessage(olm, client);
|
||||
}
|
||||
|
||||
// Update the client's last ping timestamp
|
||||
await db
|
||||
.update(clients)
|
||||
.set({
|
||||
lastPing: Math.floor(Date.now() / 1000),
|
||||
online: true,
|
||||
archived: false
|
||||
})
|
||||
.where(eq(clients.clientId, olm.clientId));
|
||||
|
||||
if (olm.archived) {
|
||||
await db
|
||||
.update(olms)
|
||||
.set({ archived: false })
|
||||
.where(eq(olms.olmId, olm.olmId));
|
||||
}
|
||||
// Record the ping in memory; it will be flushed to the database
|
||||
// periodically by the ping accumulator (every ~10s) in a single
|
||||
// batched UPDATE instead of one query per ping. This prevents
|
||||
// connection pool exhaustion under load, especially with
|
||||
// cross-region latency to the database.
|
||||
recordClientPing(olm.clientId, olm.olmId, !!olm.archived);
|
||||
} catch (error) {
|
||||
logger.error("Error handling ping message", { error });
|
||||
}
|
||||
|
||||
@@ -55,7 +55,7 @@ async function getLatestNewtVersion(): Promise<string | null> {
|
||||
tags = tags.filter((version) => !version.name.includes("rc"));
|
||||
const latestVersion = tags[0].name;
|
||||
|
||||
await cache.set("latestNewtVersion", latestVersion);
|
||||
await cache.set("latestNewtVersion", latestVersion, 3600);
|
||||
|
||||
return latestVersion;
|
||||
} catch (error: any) {
|
||||
@@ -180,7 +180,7 @@ registry.registerPath({
|
||||
method: "get",
|
||||
path: "/org/{orgId}/sites",
|
||||
description: "List all sites in an organization",
|
||||
tags: [OpenAPITags.Site],
|
||||
tags: [OpenAPITags.Org, OpenAPITags.Site],
|
||||
request: {
|
||||
params: listSitesParamsSchema,
|
||||
query: listSitesSchema
|
||||
|
||||
@@ -201,7 +201,7 @@ export async function inviteUser(
|
||||
);
|
||||
}
|
||||
|
||||
await cache.set(email, attempts + 1);
|
||||
await cache.set("regenerateInvite:" + email, attempts + 1, 3600);
|
||||
|
||||
const inviteId = existingInvite[0].inviteId; // Retrieve the original inviteId
|
||||
const token = generateRandomString(
|
||||
|
||||
@@ -11,6 +11,7 @@ import {
|
||||
startNewtOfflineChecker,
|
||||
handleNewtDisconnectingMessage
|
||||
} from "../newt";
|
||||
import { startPingAccumulator } from "../newt/pingAccumulator";
|
||||
import {
|
||||
handleOlmRegisterMessage,
|
||||
handleOlmRelayMessage,
|
||||
@@ -46,6 +47,10 @@ export const messageHandlers: Record<string, MessageHandler> = {
|
||||
"ws/round-trip/complete": handleRoundTripMessage
|
||||
};
|
||||
|
||||
// Start the ping accumulator for all builds — it batches per-site online/lastPing
|
||||
// updates into periodic bulk writes, preventing connection pool exhaustion.
|
||||
startPingAccumulator();
|
||||
|
||||
if (build != "saas") {
|
||||
startOlmOfflineChecker(); // this is to handle the offline check for olms
|
||||
startNewtOfflineChecker(); // this is to handle the offline check for newts
|
||||
|
||||
@@ -6,6 +6,7 @@ import { Socket } from "net";
|
||||
import { Newt, newts, NewtSession, olms, Olm, OlmSession, sites } from "@server/db";
|
||||
import { eq } from "drizzle-orm";
|
||||
import { db } from "@server/db";
|
||||
import { recordPing } from "@server/routers/newt/pingAccumulator";
|
||||
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
|
||||
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
|
||||
import { messageHandlers } from "./messageHandlers";
|
||||
@@ -386,22 +387,14 @@ const setupConnection = async (
|
||||
// the same as modern newt clients.
|
||||
if (clientType === "newt") {
|
||||
const newtClient = client as Newt;
|
||||
ws.on("ping", async () => {
|
||||
ws.on("ping", () => {
|
||||
if (!newtClient.siteId) return;
|
||||
try {
|
||||
await db
|
||||
.update(sites)
|
||||
.set({
|
||||
online: true,
|
||||
lastPing: Math.floor(Date.now() / 1000)
|
||||
})
|
||||
.where(eq(sites.siteId, newtClient.siteId));
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
"Error updating newt site online state on WS ping",
|
||||
{ error }
|
||||
);
|
||||
}
|
||||
// Record the ping in the accumulator instead of writing to the
|
||||
// database on every WS ping frame. The accumulator flushes all
|
||||
// pending pings in a single batched UPDATE every ~10s, which
|
||||
// prevents connection pool exhaustion under load (especially
|
||||
// with cross-region latency to the database).
|
||||
recordPing(newtClient.siteId);
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -275,6 +275,8 @@ export default function Page() {
|
||||
}
|
||||
}
|
||||
|
||||
const disabled = !isPaidUser(tierMatrix.orgOidc);
|
||||
|
||||
return (
|
||||
<>
|
||||
<div className="flex justify-between">
|
||||
@@ -292,6 +294,9 @@ export default function Page() {
|
||||
</Button>
|
||||
</div>
|
||||
|
||||
<PaidFeaturesAlert tiers={tierMatrix.orgOidc} />
|
||||
|
||||
<fieldset disabled={disabled} className={disabled ? "opacity-50 pointer-events-none" : ""}>
|
||||
<SettingsContainer>
|
||||
<SettingsSection>
|
||||
<SettingsSectionHeader>
|
||||
@@ -812,9 +817,10 @@ export default function Page() {
|
||||
</Button>
|
||||
<Button
|
||||
type="submit"
|
||||
disabled={createLoading || !isPaidUser(tierMatrix.orgOidc)}
|
||||
disabled={createLoading || disabled}
|
||||
loading={createLoading}
|
||||
onClick={() => {
|
||||
if (disabled) return;
|
||||
// log any issues with the form
|
||||
console.log(form.formState.errors);
|
||||
form.handleSubmit(onSubmit)();
|
||||
@@ -823,6 +829,7 @@ export default function Page() {
|
||||
{t("idpSubmit")}
|
||||
</Button>
|
||||
</div>
|
||||
</fieldset>
|
||||
</>
|
||||
);
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ import { usePathname, useRouter } from "next/navigation";
|
||||
import { useMemo, useState } from "react";
|
||||
import { useUserContext } from "@app/hooks/useUserContext";
|
||||
import { useTranslations } from "next-intl";
|
||||
import { build } from "@server/build";
|
||||
|
||||
interface OrgSelectorProps {
|
||||
orgId?: string;
|
||||
@@ -50,6 +51,11 @@ export function OrgSelector({
|
||||
|
||||
const selectedOrg = orgs?.find((org) => org.orgId === orgId);
|
||||
|
||||
let canCreateOrg = !env.flags.disableUserCreateOrg || user.serverAdmin;
|
||||
if (build === "saas" && user.type !== "internal") {
|
||||
canCreateOrg = false;
|
||||
}
|
||||
|
||||
const sortedOrgs = useMemo(() => {
|
||||
if (!orgs?.length) return orgs ?? [];
|
||||
return [...orgs].sort((a, b) => {
|
||||
@@ -161,7 +167,7 @@ export function OrgSelector({
|
||||
</CommandGroup>
|
||||
</CommandList>
|
||||
</Command>
|
||||
{(!env.flags.disableUserCreateOrg || user.serverAdmin) && (
|
||||
{canCreateOrg && (
|
||||
<div className="p-2 border-t border-border">
|
||||
<Button
|
||||
variant="ghost"
|
||||
|
||||
Reference in New Issue
Block a user