mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-26 12:36:41 +00:00
Compare commits
14 Commits
1.16.2-s.2
...
crowdin_de
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b6c9046405 | ||
|
|
ea2ec84f08 | ||
|
|
40a14ac5f6 | ||
|
|
78169f087f | ||
|
|
4021107d08 | ||
|
|
e78aaeb344 | ||
|
|
0087ca997d | ||
|
|
61d4d314d6 | ||
|
|
2e8b4ddbdb | ||
|
|
3996b86f14 | ||
|
|
7388ef8588 | ||
|
|
bb2edb23e5 | ||
|
|
4cad384476 | ||
|
|
ba07eb1303 |
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Въведете конфигурационния токен от сървърната конзола.",
|
"setupTokenDescription": "Въведете конфигурационния токен от сървърната конзола.",
|
||||||
"setupTokenRequired": "Необходим е конфигурационен токен",
|
"setupTokenRequired": "Необходим е конфигурационен токен",
|
||||||
"actionUpdateSite": "Актуализиране на сайт",
|
"actionUpdateSite": "Актуализиране на сайт",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Изброяване на позволените роли за сайта",
|
"actionListSiteRoles": "Изброяване на позволените роли за сайта",
|
||||||
"actionCreateResource": "Създаване на ресурс",
|
"actionCreateResource": "Създаване на ресурс",
|
||||||
"actionDeleteResource": "Изтриване на ресурс",
|
"actionDeleteResource": "Изтриване на ресурс",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Zadejte nastavovací token z konzole serveru.",
|
"setupTokenDescription": "Zadejte nastavovací token z konzole serveru.",
|
||||||
"setupTokenRequired": "Je vyžadován token nastavení",
|
"setupTokenRequired": "Je vyžadován token nastavení",
|
||||||
"actionUpdateSite": "Aktualizovat stránku",
|
"actionUpdateSite": "Aktualizovat stránku",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Seznam povolených rolí webu",
|
"actionListSiteRoles": "Seznam povolených rolí webu",
|
||||||
"actionCreateResource": "Vytvořit zdroj",
|
"actionCreateResource": "Vytvořit zdroj",
|
||||||
"actionDeleteResource": "Odstranit dokument",
|
"actionDeleteResource": "Odstranit dokument",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Geben Sie das Setup-Token von der Serverkonsole ein.",
|
"setupTokenDescription": "Geben Sie das Setup-Token von der Serverkonsole ein.",
|
||||||
"setupTokenRequired": "Setup-Token ist erforderlich",
|
"setupTokenRequired": "Setup-Token ist erforderlich",
|
||||||
"actionUpdateSite": "Standorte aktualisieren",
|
"actionUpdateSite": "Standorte aktualisieren",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Erlaubte Standort-Rollen auflisten",
|
"actionListSiteRoles": "Erlaubte Standort-Rollen auflisten",
|
||||||
"actionCreateResource": "Ressource erstellen",
|
"actionCreateResource": "Ressource erstellen",
|
||||||
"actionDeleteResource": "Ressource löschen",
|
"actionDeleteResource": "Ressource löschen",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Ingrese el token de configuración desde la consola del servidor.",
|
"setupTokenDescription": "Ingrese el token de configuración desde la consola del servidor.",
|
||||||
"setupTokenRequired": "Se requiere el token de configuración",
|
"setupTokenRequired": "Se requiere el token de configuración",
|
||||||
"actionUpdateSite": "Actualizar sitio",
|
"actionUpdateSite": "Actualizar sitio",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Lista de roles permitidos del sitio",
|
"actionListSiteRoles": "Lista de roles permitidos del sitio",
|
||||||
"actionCreateResource": "Crear Recurso",
|
"actionCreateResource": "Crear Recurso",
|
||||||
"actionDeleteResource": "Eliminar Recurso",
|
"actionDeleteResource": "Eliminar Recurso",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Entrez le jeton de configuration depuis la console du serveur.",
|
"setupTokenDescription": "Entrez le jeton de configuration depuis la console du serveur.",
|
||||||
"setupTokenRequired": "Le jeton de configuration est requis.",
|
"setupTokenRequired": "Le jeton de configuration est requis.",
|
||||||
"actionUpdateSite": "Mettre à jour un site",
|
"actionUpdateSite": "Mettre à jour un site",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Lister les rôles autorisés du site",
|
"actionListSiteRoles": "Lister les rôles autorisés du site",
|
||||||
"actionCreateResource": "Créer une ressource",
|
"actionCreateResource": "Créer une ressource",
|
||||||
"actionDeleteResource": "Supprimer une ressource",
|
"actionDeleteResource": "Supprimer une ressource",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Inserisci il token di configurazione dalla console del server.",
|
"setupTokenDescription": "Inserisci il token di configurazione dalla console del server.",
|
||||||
"setupTokenRequired": "Il token di configurazione è richiesto",
|
"setupTokenRequired": "Il token di configurazione è richiesto",
|
||||||
"actionUpdateSite": "Aggiorna Sito",
|
"actionUpdateSite": "Aggiorna Sito",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Elenca Ruoli Sito Consentiti",
|
"actionListSiteRoles": "Elenca Ruoli Sito Consentiti",
|
||||||
"actionCreateResource": "Crea Risorsa",
|
"actionCreateResource": "Crea Risorsa",
|
||||||
"actionDeleteResource": "Elimina Risorsa",
|
"actionDeleteResource": "Elimina Risorsa",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "서버 콘솔에서 설정 토큰 입력.",
|
"setupTokenDescription": "서버 콘솔에서 설정 토큰 입력.",
|
||||||
"setupTokenRequired": "설정 토큰이 필요합니다",
|
"setupTokenRequired": "설정 토큰이 필요합니다",
|
||||||
"actionUpdateSite": "사이트 업데이트",
|
"actionUpdateSite": "사이트 업데이트",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "허용된 사이트 역할 목록",
|
"actionListSiteRoles": "허용된 사이트 역할 목록",
|
||||||
"actionCreateResource": "리소스 생성",
|
"actionCreateResource": "리소스 생성",
|
||||||
"actionDeleteResource": "리소스 삭제",
|
"actionDeleteResource": "리소스 삭제",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Skriv inn oppsetttoken fra serverkonsollen.",
|
"setupTokenDescription": "Skriv inn oppsetttoken fra serverkonsollen.",
|
||||||
"setupTokenRequired": "Oppsetttoken er nødvendig",
|
"setupTokenRequired": "Oppsetttoken er nødvendig",
|
||||||
"actionUpdateSite": "Oppdater område",
|
"actionUpdateSite": "Oppdater område",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "List opp tillatte områderoller",
|
"actionListSiteRoles": "List opp tillatte områderoller",
|
||||||
"actionCreateResource": "Opprett ressurs",
|
"actionCreateResource": "Opprett ressurs",
|
||||||
"actionDeleteResource": "Slett ressurs",
|
"actionDeleteResource": "Slett ressurs",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Voer het setup-token in vanaf de serverconsole.",
|
"setupTokenDescription": "Voer het setup-token in vanaf de serverconsole.",
|
||||||
"setupTokenRequired": "Setup-token is vereist",
|
"setupTokenRequired": "Setup-token is vereist",
|
||||||
"actionUpdateSite": "Site bijwerken",
|
"actionUpdateSite": "Site bijwerken",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Toon toegestane sitenollen",
|
"actionListSiteRoles": "Toon toegestane sitenollen",
|
||||||
"actionCreateResource": "Bron maken",
|
"actionCreateResource": "Bron maken",
|
||||||
"actionDeleteResource": "Document verwijderen",
|
"actionDeleteResource": "Document verwijderen",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Wprowadź token konfiguracji z konsoli serwera.",
|
"setupTokenDescription": "Wprowadź token konfiguracji z konsoli serwera.",
|
||||||
"setupTokenRequired": "Wymagany jest token konfiguracji",
|
"setupTokenRequired": "Wymagany jest token konfiguracji",
|
||||||
"actionUpdateSite": "Aktualizuj witrynę",
|
"actionUpdateSite": "Aktualizuj witrynę",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Lista dozwolonych ról witryny",
|
"actionListSiteRoles": "Lista dozwolonych ról witryny",
|
||||||
"actionCreateResource": "Utwórz zasób",
|
"actionCreateResource": "Utwórz zasób",
|
||||||
"actionDeleteResource": "Usuń zasób",
|
"actionDeleteResource": "Usuń zasób",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Digite o token de configuração do console do servidor.",
|
"setupTokenDescription": "Digite o token de configuração do console do servidor.",
|
||||||
"setupTokenRequired": "Token de configuração é necessário",
|
"setupTokenRequired": "Token de configuração é necessário",
|
||||||
"actionUpdateSite": "Atualizar Site",
|
"actionUpdateSite": "Atualizar Site",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Listar Funções Permitidas do Site",
|
"actionListSiteRoles": "Listar Funções Permitidas do Site",
|
||||||
"actionCreateResource": "Criar Recurso",
|
"actionCreateResource": "Criar Recurso",
|
||||||
"actionDeleteResource": "Eliminar Recurso",
|
"actionDeleteResource": "Eliminar Recurso",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Введите токен настройки из консоли сервера.",
|
"setupTokenDescription": "Введите токен настройки из консоли сервера.",
|
||||||
"setupTokenRequired": "Токен настройки обязателен",
|
"setupTokenRequired": "Токен настройки обязателен",
|
||||||
"actionUpdateSite": "Обновить сайт",
|
"actionUpdateSite": "Обновить сайт",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Список разрешенных ролей сайта",
|
"actionListSiteRoles": "Список разрешенных ролей сайта",
|
||||||
"actionCreateResource": "Создать ресурс",
|
"actionCreateResource": "Создать ресурс",
|
||||||
"actionDeleteResource": "Удалить ресурс",
|
"actionDeleteResource": "Удалить ресурс",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Sunucu konsolundan kurulum simgesini girin.",
|
"setupTokenDescription": "Sunucu konsolundan kurulum simgesini girin.",
|
||||||
"setupTokenRequired": "Kurulum simgesi gerekli",
|
"setupTokenRequired": "Kurulum simgesi gerekli",
|
||||||
"actionUpdateSite": "Siteyi Güncelle",
|
"actionUpdateSite": "Siteyi Güncelle",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "İzin Verilen Site Rolleri Listele",
|
"actionListSiteRoles": "İzin Verilen Site Rolleri Listele",
|
||||||
"actionCreateResource": "Kaynak Oluştur",
|
"actionCreateResource": "Kaynak Oluştur",
|
||||||
"actionDeleteResource": "Kaynağı Sil",
|
"actionDeleteResource": "Kaynağı Sil",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "从服务器控制台输入设置令牌。",
|
"setupTokenDescription": "从服务器控制台输入设置令牌。",
|
||||||
"setupTokenRequired": "需要设置令牌",
|
"setupTokenRequired": "需要设置令牌",
|
||||||
"actionUpdateSite": "更新站点",
|
"actionUpdateSite": "更新站点",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "允许站点角色列表",
|
"actionListSiteRoles": "允许站点角色列表",
|
||||||
"actionCreateResource": "创建资源",
|
"actionCreateResource": "创建资源",
|
||||||
"actionDeleteResource": "删除资源",
|
"actionDeleteResource": "删除资源",
|
||||||
|
|||||||
@@ -1,10 +1,8 @@
|
|||||||
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
||||||
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
||||||
import { stopPingAccumulator } from "@server/routers/newt/pingAccumulator";
|
|
||||||
import { cleanup as wsCleanup } from "#dynamic/routers/ws";
|
import { cleanup as wsCleanup } from "#dynamic/routers/ws";
|
||||||
|
|
||||||
async function cleanup() {
|
async function cleanup() {
|
||||||
await stopPingAccumulator();
|
|
||||||
await flushBandwidthToDb();
|
await flushBandwidthToDb();
|
||||||
await flushSiteBandwidthToDb();
|
await flushSiteBandwidthToDb();
|
||||||
await wsCleanup();
|
await wsCleanup();
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres";
|
import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres";
|
||||||
|
import { Pool } from "pg";
|
||||||
import { readConfigFile } from "@server/lib/readConfigFile";
|
import { readConfigFile } from "@server/lib/readConfigFile";
|
||||||
import { withReplicas } from "drizzle-orm/pg-core";
|
import { withReplicas } from "drizzle-orm/pg-core";
|
||||||
import { createPool } from "./poolConfig";
|
|
||||||
|
|
||||||
function createDb() {
|
function createDb() {
|
||||||
const config = readConfigFile();
|
const config = readConfigFile();
|
||||||
@@ -39,17 +39,12 @@ function createDb() {
|
|||||||
|
|
||||||
// Create connection pools instead of individual connections
|
// Create connection pools instead of individual connections
|
||||||
const poolConfig = config.postgres.pool;
|
const poolConfig = config.postgres.pool;
|
||||||
const maxConnections = poolConfig?.max_connections || 20;
|
const primaryPool = new Pool({
|
||||||
const idleTimeoutMs = poolConfig?.idle_timeout_ms || 30000;
|
|
||||||
const connectionTimeoutMs = poolConfig?.connection_timeout_ms || 5000;
|
|
||||||
|
|
||||||
const primaryPool = createPool(
|
|
||||||
connectionString,
|
connectionString,
|
||||||
maxConnections,
|
max: poolConfig?.max_connections || 20,
|
||||||
idleTimeoutMs,
|
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
|
||||||
connectionTimeoutMs,
|
connectionTimeoutMillis: poolConfig?.connection_timeout_ms || 5000
|
||||||
"primary"
|
});
|
||||||
);
|
|
||||||
|
|
||||||
const replicas = [];
|
const replicas = [];
|
||||||
|
|
||||||
@@ -60,16 +55,14 @@ function createDb() {
|
|||||||
})
|
})
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
const maxReplicaConnections =
|
|
||||||
poolConfig?.max_replica_connections || 20;
|
|
||||||
for (const conn of replicaConnections) {
|
for (const conn of replicaConnections) {
|
||||||
const replicaPool = createPool(
|
const replicaPool = new Pool({
|
||||||
conn.connection_string,
|
connectionString: conn.connection_string,
|
||||||
maxReplicaConnections,
|
max: poolConfig?.max_replica_connections || 20,
|
||||||
idleTimeoutMs,
|
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
|
||||||
connectionTimeoutMs,
|
connectionTimeoutMillis:
|
||||||
"replica"
|
poolConfig?.connection_timeout_ms || 5000
|
||||||
);
|
});
|
||||||
replicas.push(
|
replicas.push(
|
||||||
DrizzlePostgres(replicaPool, {
|
DrizzlePostgres(replicaPool, {
|
||||||
logger: process.env.QUERY_LOGGING == "true"
|
logger: process.env.QUERY_LOGGING == "true"
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres";
|
import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres";
|
||||||
|
import { Pool } from "pg";
|
||||||
import { readConfigFile } from "@server/lib/readConfigFile";
|
import { readConfigFile } from "@server/lib/readConfigFile";
|
||||||
import { withReplicas } from "drizzle-orm/pg-core";
|
import { withReplicas } from "drizzle-orm/pg-core";
|
||||||
import { build } from "@server/build";
|
import { build } from "@server/build";
|
||||||
import { db as mainDb, primaryDb as mainPrimaryDb } from "./driver";
|
import { db as mainDb, primaryDb as mainPrimaryDb } from "./driver";
|
||||||
import { createPool } from "./poolConfig";
|
|
||||||
|
|
||||||
function createLogsDb() {
|
function createLogsDb() {
|
||||||
// Only use separate logs database in SaaS builds
|
// Only use separate logs database in SaaS builds
|
||||||
@@ -42,17 +42,12 @@ function createLogsDb() {
|
|||||||
|
|
||||||
// Create separate connection pool for logs database
|
// Create separate connection pool for logs database
|
||||||
const poolConfig = logsConfig?.pool || config.postgres?.pool;
|
const poolConfig = logsConfig?.pool || config.postgres?.pool;
|
||||||
const maxConnections = poolConfig?.max_connections || 20;
|
const primaryPool = new Pool({
|
||||||
const idleTimeoutMs = poolConfig?.idle_timeout_ms || 30000;
|
|
||||||
const connectionTimeoutMs = poolConfig?.connection_timeout_ms || 5000;
|
|
||||||
|
|
||||||
const primaryPool = createPool(
|
|
||||||
connectionString,
|
connectionString,
|
||||||
maxConnections,
|
max: poolConfig?.max_connections || 20,
|
||||||
idleTimeoutMs,
|
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
|
||||||
connectionTimeoutMs,
|
connectionTimeoutMillis: poolConfig?.connection_timeout_ms || 5000
|
||||||
"logs-primary"
|
});
|
||||||
);
|
|
||||||
|
|
||||||
const replicas = [];
|
const replicas = [];
|
||||||
|
|
||||||
@@ -63,16 +58,14 @@ function createLogsDb() {
|
|||||||
})
|
})
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
const maxReplicaConnections =
|
|
||||||
poolConfig?.max_replica_connections || 20;
|
|
||||||
for (const conn of replicaConnections) {
|
for (const conn of replicaConnections) {
|
||||||
const replicaPool = createPool(
|
const replicaPool = new Pool({
|
||||||
conn.connection_string,
|
connectionString: conn.connection_string,
|
||||||
maxReplicaConnections,
|
max: poolConfig?.max_replica_connections || 20,
|
||||||
idleTimeoutMs,
|
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
|
||||||
connectionTimeoutMs,
|
connectionTimeoutMillis:
|
||||||
"logs-replica"
|
poolConfig?.connection_timeout_ms || 5000
|
||||||
);
|
});
|
||||||
replicas.push(
|
replicas.push(
|
||||||
DrizzlePostgres(replicaPool, {
|
DrizzlePostgres(replicaPool, {
|
||||||
logger: process.env.QUERY_LOGGING == "true"
|
logger: process.env.QUERY_LOGGING == "true"
|
||||||
|
|||||||
@@ -1,63 +0,0 @@
|
|||||||
import { Pool, PoolConfig } from "pg";
|
|
||||||
import logger from "@server/logger";
|
|
||||||
|
|
||||||
export function createPoolConfig(
|
|
||||||
connectionString: string,
|
|
||||||
maxConnections: number,
|
|
||||||
idleTimeoutMs: number,
|
|
||||||
connectionTimeoutMs: number
|
|
||||||
): PoolConfig {
|
|
||||||
return {
|
|
||||||
connectionString,
|
|
||||||
max: maxConnections,
|
|
||||||
idleTimeoutMillis: idleTimeoutMs,
|
|
||||||
connectionTimeoutMillis: connectionTimeoutMs,
|
|
||||||
// TCP keepalive to prevent silent connection drops by NAT gateways,
|
|
||||||
// load balancers, and other intermediate network devices (e.g. AWS
|
|
||||||
// NAT Gateway drops idle TCP connections after ~350s)
|
|
||||||
keepAlive: true,
|
|
||||||
keepAliveInitialDelayMillis: 10000, // send first keepalive after 10s of idle
|
|
||||||
// Allow connections to be released and recreated more aggressively
|
|
||||||
// to avoid stale connections building up
|
|
||||||
allowExitOnIdle: false
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
export function attachPoolErrorHandlers(pool: Pool, label: string): void {
|
|
||||||
pool.on("error", (err) => {
|
|
||||||
// This catches errors on idle clients in the pool. Without this
|
|
||||||
// handler an unexpected disconnect would crash the process.
|
|
||||||
logger.error(
|
|
||||||
`Unexpected error on idle ${label} database client: ${err.message}`
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
pool.on("connect", (client) => {
|
|
||||||
// Set a statement timeout on every new connection so a single slow
|
|
||||||
// query can't block the pool forever
|
|
||||||
client.query("SET statement_timeout = '30s'").catch((err: Error) => {
|
|
||||||
logger.warn(
|
|
||||||
`Failed to set statement_timeout on ${label} client: ${err.message}`
|
|
||||||
);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
export function createPool(
|
|
||||||
connectionString: string,
|
|
||||||
maxConnections: number,
|
|
||||||
idleTimeoutMs: number,
|
|
||||||
connectionTimeoutMs: number,
|
|
||||||
label: string
|
|
||||||
): Pool {
|
|
||||||
const pool = new Pool(
|
|
||||||
createPoolConfig(
|
|
||||||
connectionString,
|
|
||||||
maxConnections,
|
|
||||||
idleTimeoutMs,
|
|
||||||
connectionTimeoutMs
|
|
||||||
)
|
|
||||||
);
|
|
||||||
attachPoolErrorHandlers(pool, label);
|
|
||||||
return pool;
|
|
||||||
}
|
|
||||||
@@ -1,22 +0,0 @@
|
|||||||
/**
|
|
||||||
* Returns a cached plaintext token from Redis if one exists and decrypts
|
|
||||||
* cleanly, otherwise calls `createSession` to mint a fresh token, stores the
|
|
||||||
* encrypted value in Redis with the given TTL, and returns it.
|
|
||||||
*
|
|
||||||
* Failures at the Redis layer are non-fatal – the function always falls
|
|
||||||
* through to session creation so the caller is never blocked by a Redis outage.
|
|
||||||
*
|
|
||||||
* @param cacheKey Unique Redis key, e.g. `"newt:token_cache:abc123"`
|
|
||||||
* @param secret Server secret used for AES encryption/decryption
|
|
||||||
* @param ttlSeconds Cache TTL in seconds (should match session expiry)
|
|
||||||
* @param createSession Factory that mints a new session and returns its raw token
|
|
||||||
*/
|
|
||||||
export async function getOrCreateCachedToken(
|
|
||||||
cacheKey: string,
|
|
||||||
secret: string,
|
|
||||||
ttlSeconds: number,
|
|
||||||
createSession: () => Promise<string>
|
|
||||||
): Promise<string> {
|
|
||||||
const token = await createSession();
|
|
||||||
return token;
|
|
||||||
}
|
|
||||||
@@ -15,10 +15,8 @@ import { rateLimitService } from "#private/lib/rateLimit";
|
|||||||
import { cleanup as wsCleanup } from "#private/routers/ws";
|
import { cleanup as wsCleanup } from "#private/routers/ws";
|
||||||
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
||||||
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
||||||
import { stopPingAccumulator } from "@server/routers/newt/pingAccumulator";
|
|
||||||
|
|
||||||
async function cleanup() {
|
async function cleanup() {
|
||||||
await stopPingAccumulator();
|
|
||||||
await flushBandwidthToDb();
|
await flushBandwidthToDb();
|
||||||
await flushSiteBandwidthToDb();
|
await flushSiteBandwidthToDb();
|
||||||
await rateLimitService.cleanup();
|
await rateLimitService.cleanup();
|
||||||
|
|||||||
@@ -1,16 +1,3 @@
|
|||||||
/*
|
|
||||||
* This file is part of a proprietary work.
|
|
||||||
*
|
|
||||||
* Copyright (c) 2025 Fossorial, Inc.
|
|
||||||
* All rights reserved.
|
|
||||||
*
|
|
||||||
* This file is licensed under the Fossorial Commercial License.
|
|
||||||
* You may not use this file except in compliance with the License.
|
|
||||||
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
|
|
||||||
*
|
|
||||||
* This file is not licensed under the AGPLv3.
|
|
||||||
*/
|
|
||||||
|
|
||||||
import NodeCache from "node-cache";
|
import NodeCache from "node-cache";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { redisManager } from "@server/private/lib/redis";
|
import { redisManager } from "@server/private/lib/redis";
|
||||||
|
|||||||
@@ -1,77 +0,0 @@
|
|||||||
/*
|
|
||||||
* This file is part of a proprietary work.
|
|
||||||
*
|
|
||||||
* Copyright (c) 2025 Fossorial, Inc.
|
|
||||||
* All rights reserved.
|
|
||||||
*
|
|
||||||
* This file is licensed under the Fossorial Commercial License.
|
|
||||||
* You may not use this file except in compliance with the License.
|
|
||||||
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
|
|
||||||
*
|
|
||||||
* This file is not licensed under the AGPLv3.
|
|
||||||
*/
|
|
||||||
|
|
||||||
import redisManager from "#private/lib/redis";
|
|
||||||
import { encrypt, decrypt } from "@server/lib/crypto";
|
|
||||||
import logger from "@server/logger";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a cached plaintext token from Redis if one exists and decrypts
|
|
||||||
* cleanly, otherwise calls `createSession` to mint a fresh token, stores the
|
|
||||||
* encrypted value in Redis with the given TTL, and returns it.
|
|
||||||
*
|
|
||||||
* Failures at the Redis layer are non-fatal – the function always falls
|
|
||||||
* through to session creation so the caller is never blocked by a Redis outage.
|
|
||||||
*
|
|
||||||
* @param cacheKey Unique Redis key, e.g. `"newt:token_cache:abc123"`
|
|
||||||
* @param secret Server secret used for AES encryption/decryption
|
|
||||||
* @param ttlSeconds Cache TTL in seconds (should match session expiry)
|
|
||||||
* @param createSession Factory that mints a new session and returns its raw token
|
|
||||||
*/
|
|
||||||
export async function getOrCreateCachedToken(
|
|
||||||
cacheKey: string,
|
|
||||||
secret: string,
|
|
||||||
ttlSeconds: number,
|
|
||||||
createSession: () => Promise<string>
|
|
||||||
): Promise<string> {
|
|
||||||
if (redisManager.isRedisEnabled()) {
|
|
||||||
try {
|
|
||||||
const cached = await redisManager.get(cacheKey);
|
|
||||||
if (cached) {
|
|
||||||
const token = decrypt(cached, secret);
|
|
||||||
if (token) {
|
|
||||||
logger.debug(`Token cache hit for key: ${cacheKey}`);
|
|
||||||
return token;
|
|
||||||
}
|
|
||||||
// Decryption produced an empty string – treat as a miss
|
|
||||||
logger.warn(
|
|
||||||
`Token cache decryption returned empty string for key: ${cacheKey}, treating as miss`
|
|
||||||
);
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
logger.warn(
|
|
||||||
`Token cache read/decrypt failed for key ${cacheKey}, falling through to session creation:`,
|
|
||||||
e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const token = await createSession();
|
|
||||||
|
|
||||||
if (redisManager.isRedisEnabled()) {
|
|
||||||
try {
|
|
||||||
const encrypted = encrypt(token, secret);
|
|
||||||
await redisManager.set(cacheKey, encrypted, ttlSeconds);
|
|
||||||
logger.debug(
|
|
||||||
`Token cached in Redis for key: ${cacheKey} (TTL ${ttlSeconds}s)`
|
|
||||||
);
|
|
||||||
} catch (e) {
|
|
||||||
logger.warn(
|
|
||||||
`Token cache write failed for key ${cacheKey} (session was still created):`,
|
|
||||||
e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return token;
|
|
||||||
}
|
|
||||||
@@ -23,10 +23,8 @@ import { z } from "zod";
|
|||||||
import { fromError } from "zod-validation-error";
|
import { fromError } from "zod-validation-error";
|
||||||
import {
|
import {
|
||||||
createRemoteExitNodeSession,
|
createRemoteExitNodeSession,
|
||||||
validateRemoteExitNodeSessionToken,
|
validateRemoteExitNodeSessionToken
|
||||||
EXPIRES
|
|
||||||
} from "#private/auth/sessions/remoteExitNode";
|
} from "#private/auth/sessions/remoteExitNode";
|
||||||
import { getOrCreateCachedToken } from "@server/private/lib/tokenCache";
|
|
||||||
import { verifyPassword } from "@server/auth/password";
|
import { verifyPassword } from "@server/auth/password";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import config from "@server/lib/config";
|
import config from "@server/lib/config";
|
||||||
@@ -105,22 +103,13 @@ export async function getRemoteExitNodeToken(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return a cached token if one exists to prevent thundering herd on
|
const resToken = generateSessionToken();
|
||||||
// simultaneous restarts; falls back to creating a fresh session when
|
|
||||||
// Redis is unavailable or the cache has expired.
|
|
||||||
const resToken = await getOrCreateCachedToken(
|
|
||||||
`remote_exit_node:token_cache:${existingRemoteExitNode.remoteExitNodeId}`,
|
|
||||||
config.getRawConfig().server.secret!,
|
|
||||||
Math.floor(EXPIRES / 1000),
|
|
||||||
async () => {
|
|
||||||
const token = generateSessionToken();
|
|
||||||
await createRemoteExitNodeSession(
|
await createRemoteExitNodeSession(
|
||||||
token,
|
resToken,
|
||||||
existingRemoteExitNode.remoteExitNodeId
|
existingRemoteExitNode.remoteExitNodeId
|
||||||
);
|
);
|
||||||
return token;
|
|
||||||
}
|
// logger.debug(`Created RemoteExitNode token response: ${JSON.stringify(resToken)}`);
|
||||||
);
|
|
||||||
|
|
||||||
return response<{ token: string }>(res, {
|
return response<{ token: string }>(res, {
|
||||||
data: {
|
data: {
|
||||||
|
|||||||
@@ -19,14 +19,17 @@ import { Socket } from "net";
|
|||||||
import {
|
import {
|
||||||
Newt,
|
Newt,
|
||||||
newts,
|
newts,
|
||||||
Olm,
|
NewtSession,
|
||||||
olms,
|
olms,
|
||||||
|
Olm,
|
||||||
|
OlmSession,
|
||||||
RemoteExitNode,
|
RemoteExitNode,
|
||||||
|
RemoteExitNodeSession,
|
||||||
remoteExitNodes,
|
remoteExitNodes,
|
||||||
|
sites
|
||||||
} from "@server/db";
|
} from "@server/db";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
import { db } from "@server/db";
|
import { db } from "@server/db";
|
||||||
import { recordPing } from "@server/routers/newt/pingAccumulator";
|
|
||||||
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
|
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
|
||||||
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
|
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
@@ -194,7 +197,11 @@ const connectedClients: Map<string, AuthenticatedWebSocket[]> = new Map();
|
|||||||
// Config version tracking map (local to this node, resets on server restart)
|
// Config version tracking map (local to this node, resets on server restart)
|
||||||
const clientConfigVersions: Map<string, number> = new Map();
|
const clientConfigVersions: Map<string, number> = new Map();
|
||||||
|
|
||||||
|
// Tracks the last Unix timestamp (seconds) at which a ping was flushed to the
|
||||||
|
// DB for a given siteId. Resets on server restart which is fine – the first
|
||||||
|
// ping after startup will always write, re-establishing the online state.
|
||||||
|
const lastPingDbWrite: Map<number, number> = new Map();
|
||||||
|
const PING_DB_WRITE_INTERVAL = 45; // seconds
|
||||||
|
|
||||||
// Recovery tracking
|
// Recovery tracking
|
||||||
let isRedisRecoveryInProgress = false;
|
let isRedisRecoveryInProgress = false;
|
||||||
@@ -846,16 +853,32 @@ const setupConnection = async (
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Handle WebSocket protocol-level pings from older newt clients that do
|
||||||
|
// not send application-level "newt/ping" messages. Update the site's
|
||||||
|
// online state and lastPing timestamp so the offline checker treats them
|
||||||
|
// the same as modern newt clients.
|
||||||
if (clientType === "newt") {
|
if (clientType === "newt") {
|
||||||
const newtClient = client as Newt;
|
const newtClient = client as Newt;
|
||||||
ws.on("ping", () => {
|
ws.on("ping", async () => {
|
||||||
if (!newtClient.siteId) return;
|
if (!newtClient.siteId) return;
|
||||||
// Record the ping in the accumulator instead of writing to the
|
const now = Math.floor(Date.now() / 1000);
|
||||||
// database on every WS ping frame. The accumulator flushes all
|
const lastWrite = lastPingDbWrite.get(newtClient.siteId) ?? 0;
|
||||||
// pending pings in a single batched UPDATE every ~10s, which
|
if (now - lastWrite < PING_DB_WRITE_INTERVAL) return;
|
||||||
// prevents connection pool exhaustion under load (especially
|
lastPingDbWrite.set(newtClient.siteId, now);
|
||||||
// with cross-region latency to the database).
|
try {
|
||||||
recordPing(newtClient.siteId);
|
await db
|
||||||
|
.update(sites)
|
||||||
|
.set({
|
||||||
|
online: true,
|
||||||
|
lastPing: now
|
||||||
|
})
|
||||||
|
.where(eq(sites.siteId, newtClient.siteId));
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
"Error updating newt site online state on WS ping",
|
||||||
|
{ error }
|
||||||
|
);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import { Request, Response, NextFunction } from "express";
|
import { Request, Response, NextFunction } from "express";
|
||||||
import { sql } from "drizzle-orm";
|
import { eq, sql } from "drizzle-orm";
|
||||||
|
import { sites } from "@server/db";
|
||||||
import { db } from "@server/db";
|
import { db } from "@server/db";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import createHttpError from "http-errors";
|
import createHttpError from "http-errors";
|
||||||
@@ -30,10 +31,7 @@ const MAX_RETRIES = 3;
|
|||||||
const BASE_DELAY_MS = 50;
|
const BASE_DELAY_MS = 50;
|
||||||
|
|
||||||
// How often to flush accumulated bandwidth data to the database
|
// How often to flush accumulated bandwidth data to the database
|
||||||
const FLUSH_INTERVAL_MS = 300_000; // 300 seconds
|
const FLUSH_INTERVAL_MS = 30_000; // 30 seconds
|
||||||
|
|
||||||
// Maximum number of sites to include in a single batch UPDATE statement
|
|
||||||
const BATCH_CHUNK_SIZE = 250;
|
|
||||||
|
|
||||||
// In-memory accumulator: publicKey -> AccumulatorEntry
|
// In-memory accumulator: publicKey -> AccumulatorEntry
|
||||||
let accumulator = new Map<string, AccumulatorEntry>();
|
let accumulator = new Map<string, AccumulatorEntry>();
|
||||||
@@ -77,33 +75,13 @@ async function withDeadlockRetry<T>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Execute a raw SQL query that returns rows, in a way that works across both
|
|
||||||
* the PostgreSQL driver (which exposes `execute`) and the SQLite driver (which
|
|
||||||
* exposes `all`). Drizzle's typed query builder doesn't support bulk
|
|
||||||
* UPDATE … FROM (VALUES …) natively, so we drop to raw SQL here.
|
|
||||||
*/
|
|
||||||
async function dbQueryRows<T extends Record<string, unknown>>(
|
|
||||||
query: Parameters<(typeof sql)["join"]>[0][number]
|
|
||||||
): Promise<T[]> {
|
|
||||||
const anyDb = db as any;
|
|
||||||
if (typeof anyDb.execute === "function") {
|
|
||||||
// PostgreSQL (node-postgres via Drizzle) — returns { rows: [...] } or an array
|
|
||||||
const result = await anyDb.execute(query);
|
|
||||||
return (Array.isArray(result) ? result : (result.rows ?? [])) as T[];
|
|
||||||
}
|
|
||||||
// SQLite (better-sqlite3 via Drizzle) — returns an array directly
|
|
||||||
return (await anyDb.all(query)) as T[];
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Flush all accumulated site bandwidth data to the database.
|
* Flush all accumulated site bandwidth data to the database.
|
||||||
*
|
*
|
||||||
* Swaps out the accumulator before writing so that any bandwidth messages
|
* Swaps out the accumulator before writing so that any bandwidth messages
|
||||||
* received during the flush are captured in the new accumulator rather than
|
* received during the flush are captured in the new accumulator rather than
|
||||||
* being lost or causing contention. Sites are updated in chunks via a single
|
* being lost or causing contention. Entries that fail to write are re-queued
|
||||||
* batch UPDATE per chunk. Failed chunks are discarded — exact per-flush
|
* back into the accumulator so they will be retried on the next flush.
|
||||||
* accuracy is not critical and re-queuing is not worth the added complexity.
|
|
||||||
*
|
*
|
||||||
* This function is exported so that the application's graceful-shutdown
|
* This function is exported so that the application's graceful-shutdown
|
||||||
* cleanup handler can call it before the process exits.
|
* cleanup handler can call it before the process exits.
|
||||||
@@ -130,76 +108,76 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
|
|||||||
`Flushing accumulated bandwidth data for ${sortedEntries.length} site(s) to the database`
|
`Flushing accumulated bandwidth data for ${sortedEntries.length} site(s) to the database`
|
||||||
);
|
);
|
||||||
|
|
||||||
// Build a lookup so post-processing can reach each entry by publicKey.
|
// Aggregate billing usage by org, collected during the DB update loop.
|
||||||
const snapshotMap = new Map(sortedEntries);
|
|
||||||
|
|
||||||
// Aggregate billing usage by org across all chunks.
|
|
||||||
const orgUsageMap = new Map<string, number>();
|
const orgUsageMap = new Map<string, number>();
|
||||||
|
|
||||||
// Process in chunks so individual queries stay at a reasonable size.
|
for (const [publicKey, { bytesIn, bytesOut, exitNodeId, calcUsage }] of sortedEntries) {
|
||||||
for (let i = 0; i < sortedEntries.length; i += BATCH_CHUNK_SIZE) {
|
|
||||||
const chunk = sortedEntries.slice(i, i + BATCH_CHUNK_SIZE);
|
|
||||||
const chunkEnd = i + chunk.length - 1;
|
|
||||||
|
|
||||||
// Build a parameterised VALUES list: (pubKey, bytesIn, bytesOut), ...
|
|
||||||
// Both PostgreSQL and SQLite (≥ 3.33.0, which better-sqlite3 bundles)
|
|
||||||
// support UPDATE … FROM (VALUES …), letting us update the whole chunk
|
|
||||||
// in a single query instead of N individual round-trips.
|
|
||||||
const valuesList = chunk.map(([publicKey, { bytesIn, bytesOut }]) =>
|
|
||||||
sql`(${publicKey}::text, ${bytesIn}::real, ${bytesOut}::real)`
|
|
||||||
);
|
|
||||||
const valuesClause = sql.join(valuesList, sql`, `);
|
|
||||||
|
|
||||||
let rows: { orgId: string; pubKey: string }[] = [];
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
rows = await withDeadlockRetry(async () => {
|
const updatedSite = await withDeadlockRetry(async () => {
|
||||||
return dbQueryRows<{ orgId: string; pubKey: string }>(sql`
|
const [result] = await db
|
||||||
UPDATE sites
|
.update(sites)
|
||||||
SET
|
.set({
|
||||||
"bytesOut" = COALESCE("bytesOut", 0) + v.bytes_in,
|
megabytesOut: sql`COALESCE(${sites.megabytesOut}, 0) + ${bytesIn}`,
|
||||||
"bytesIn" = COALESCE("bytesIn", 0) + v.bytes_out,
|
megabytesIn: sql`COALESCE(${sites.megabytesIn}, 0) + ${bytesOut}`,
|
||||||
"lastBandwidthUpdate" = ${currentTime}
|
lastBandwidthUpdate: currentTime,
|
||||||
FROM (VALUES ${valuesClause}) AS v(pub_key, bytes_in, bytes_out)
|
})
|
||||||
WHERE sites."pubKey" = v.pub_key
|
.where(eq(sites.pubKey, publicKey))
|
||||||
RETURNING sites."orgId" AS "orgId", sites."pubKey" AS "pubKey"
|
.returning({
|
||||||
`);
|
orgId: sites.orgId,
|
||||||
}, `flush bandwidth chunk [${i}–${chunkEnd}]`);
|
siteId: sites.siteId
|
||||||
} catch (error) {
|
});
|
||||||
logger.error(
|
return result;
|
||||||
`Failed to flush bandwidth chunk [${i}–${chunkEnd}], discarding ${chunk.length} site(s):`,
|
}, `flush bandwidth for site ${publicKey}`);
|
||||||
error
|
|
||||||
);
|
|
||||||
// Discard the chunk — exact per-flush accuracy is not critical.
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Collect billing usage from the returned rows.
|
|
||||||
for (const { orgId, pubKey } of rows) {
|
|
||||||
const entry = snapshotMap.get(pubKey);
|
|
||||||
if (!entry) continue;
|
|
||||||
|
|
||||||
const { bytesIn, bytesOut, exitNodeId, calcUsage } = entry;
|
|
||||||
|
|
||||||
|
if (updatedSite) {
|
||||||
if (exitNodeId) {
|
if (exitNodeId) {
|
||||||
const notAllowed = await checkExitNodeOrg(exitNodeId, orgId);
|
const notAllowed = await checkExitNodeOrg(
|
||||||
|
exitNodeId,
|
||||||
|
updatedSite.orgId
|
||||||
|
);
|
||||||
if (notAllowed) {
|
if (notAllowed) {
|
||||||
logger.warn(
|
logger.warn(
|
||||||
`Exit node ${exitNodeId} is not allowed for org ${orgId}`
|
`Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}`
|
||||||
);
|
);
|
||||||
|
// Skip usage tracking for this site but continue
|
||||||
|
// processing the rest.
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (calcUsage) {
|
if (calcUsage) {
|
||||||
const current = orgUsageMap.get(orgId) ?? 0;
|
const totalBandwidth = bytesIn + bytesOut;
|
||||||
orgUsageMap.set(orgId, current + bytesIn + bytesOut);
|
const current = orgUsageMap.get(updatedSite.orgId) ?? 0;
|
||||||
|
orgUsageMap.set(updatedSite.orgId, current + totalBandwidth);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
`Failed to flush bandwidth for site ${publicKey}:`,
|
||||||
|
error
|
||||||
|
);
|
||||||
|
|
||||||
|
// Re-queue the failed entry so it is retried on the next flush
|
||||||
|
// rather than silently dropped.
|
||||||
|
const existing = accumulator.get(publicKey);
|
||||||
|
if (existing) {
|
||||||
|
existing.bytesIn += bytesIn;
|
||||||
|
existing.bytesOut += bytesOut;
|
||||||
|
} else {
|
||||||
|
accumulator.set(publicKey, {
|
||||||
|
bytesIn,
|
||||||
|
bytesOut,
|
||||||
|
exitNodeId,
|
||||||
|
calcUsage
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process billing usage updates after all chunks are written.
|
// Process billing usage updates outside the site-update loop to keep
|
||||||
|
// lock scope small and concerns separated.
|
||||||
if (orgUsageMap.size > 0) {
|
if (orgUsageMap.size > 0) {
|
||||||
|
// Sort org IDs for consistent lock ordering.
|
||||||
const sortedOrgIds = [...orgUsageMap.keys()].sort();
|
const sortedOrgIds = [...orgUsageMap.keys()].sort();
|
||||||
|
|
||||||
for (const orgId of sortedOrgIds) {
|
for (const orgId of sortedOrgIds) {
|
||||||
|
|||||||
@@ -1,8 +1,6 @@
|
|||||||
import { generateSessionToken } from "@server/auth/sessions/app";
|
import { generateSessionToken } from "@server/auth/sessions/app";
|
||||||
import { db, newtSessions } from "@server/db";
|
import { db } from "@server/db";
|
||||||
import { newts } from "@server/db";
|
import { newts } from "@server/db";
|
||||||
import { getOrCreateCachedToken } from "#dynamic/lib/tokenCache";
|
|
||||||
import { EXPIRES } from "@server/auth/sessions/newt";
|
|
||||||
import HttpCode from "@server/types/HttpCode";
|
import HttpCode from "@server/types/HttpCode";
|
||||||
import response from "@server/lib/response";
|
import response from "@server/lib/response";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
@@ -94,19 +92,8 @@ export async function getNewtToken(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return a cached token if one exists to prevent thundering herd on
|
const resToken = generateSessionToken();
|
||||||
// simultaneous restarts; falls back to creating a fresh session when
|
await createNewtSession(resToken, existingNewt.newtId);
|
||||||
// Redis is unavailable or the cache has expired.
|
|
||||||
const resToken = await getOrCreateCachedToken(
|
|
||||||
`newt:token_cache:${existingNewt.newtId}`,
|
|
||||||
config.getRawConfig().server.secret!,
|
|
||||||
Math.floor(EXPIRES / 1000),
|
|
||||||
async () => {
|
|
||||||
const token = generateSessionToken();
|
|
||||||
await createNewtSession(token, existingNewt.newtId);
|
|
||||||
return token;
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
return response<{ token: string; serverVersion: string }>(res, {
|
return response<{ token: string; serverVersion: string }>(res, {
|
||||||
data: {
|
data: {
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ import { Newt } from "@server/db";
|
|||||||
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { sendNewtSyncMessage } from "./sync";
|
import { sendNewtSyncMessage } from "./sync";
|
||||||
import { recordPing } from "./pingAccumulator";
|
|
||||||
|
|
||||||
// Track if the offline checker interval is running
|
// Track if the offline checker interval is running
|
||||||
let offlineCheckerInterval: NodeJS.Timeout | null = null;
|
let offlineCheckerInterval: NodeJS.Timeout | null = null;
|
||||||
@@ -115,12 +114,18 @@ export const handleNewtPingMessage: MessageHandler = async (context) => {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Record the ping in memory; it will be flushed to the database
|
try {
|
||||||
// periodically by the ping accumulator (every ~10s) in a single
|
// Mark the site as online and record the ping timestamp.
|
||||||
// batched UPDATE instead of one query per ping. This prevents
|
await db
|
||||||
// connection pool exhaustion under load, especially with
|
.update(sites)
|
||||||
// cross-region latency to the database.
|
.set({
|
||||||
recordPing(newt.siteId);
|
online: true,
|
||||||
|
lastPing: Math.floor(Date.now() / 1000)
|
||||||
|
})
|
||||||
|
.where(eq(sites.siteId, newt.siteId));
|
||||||
|
} catch (error) {
|
||||||
|
logger.error("Error updating online state on newt ping", { error });
|
||||||
|
}
|
||||||
|
|
||||||
// Check config version and sync if stale.
|
// Check config version and sync if stale.
|
||||||
const configVersion = await getClientConfigVersion(newt.newtId);
|
const configVersion = await getClientConfigVersion(newt.newtId);
|
||||||
|
|||||||
@@ -1,382 +0,0 @@
|
|||||||
import { db } from "@server/db";
|
|
||||||
import { sites, clients, olms } from "@server/db";
|
|
||||||
import { eq, inArray } from "drizzle-orm";
|
|
||||||
import logger from "@server/logger";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Ping Accumulator
|
|
||||||
*
|
|
||||||
* Instead of writing to the database on every single newt/olm ping (which
|
|
||||||
* causes pool exhaustion under load, especially with cross-region latency),
|
|
||||||
* we accumulate pings in memory and flush them to the database periodically
|
|
||||||
* in a single batch.
|
|
||||||
*
|
|
||||||
* This is the same pattern used for bandwidth flushing in
|
|
||||||
* receiveBandwidth.ts and handleReceiveBandwidthMessage.ts.
|
|
||||||
*
|
|
||||||
* Supports two kinds of pings:
|
|
||||||
* - **Site pings** (from newts): update `sites.online` and `sites.lastPing`
|
|
||||||
* - **Client pings** (from OLMs): update `clients.online`, `clients.lastPing`,
|
|
||||||
* `clients.archived`, and optionally reset `olms.archived`
|
|
||||||
*/
|
|
||||||
|
|
||||||
const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds
|
|
||||||
const MAX_RETRIES = 2;
|
|
||||||
const BASE_DELAY_MS = 50;
|
|
||||||
|
|
||||||
// ── Site (newt) pings ──────────────────────────────────────────────────
|
|
||||||
// Map of siteId -> latest ping timestamp (unix seconds)
|
|
||||||
const pendingSitePings: Map<number, number> = new Map();
|
|
||||||
|
|
||||||
// ── Client (OLM) pings ────────────────────────────────────────────────
|
|
||||||
// Map of clientId -> latest ping timestamp (unix seconds)
|
|
||||||
const pendingClientPings: Map<number, number> = new Map();
|
|
||||||
// Set of olmIds whose `archived` flag should be reset to false
|
|
||||||
const pendingOlmArchiveResets: Set<string> = new Set();
|
|
||||||
|
|
||||||
let flushTimer: NodeJS.Timeout | null = null;
|
|
||||||
|
|
||||||
// ── Public API ─────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Record a ping for a newt site. This does NOT write to the database
|
|
||||||
* immediately. Instead it stores the latest ping timestamp in memory,
|
|
||||||
* to be flushed periodically by the background timer.
|
|
||||||
*/
|
|
||||||
export function recordSitePing(siteId: number): void {
|
|
||||||
const now = Math.floor(Date.now() / 1000);
|
|
||||||
pendingSitePings.set(siteId, now);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** @deprecated Use `recordSitePing` instead. Alias kept for existing call-sites. */
|
|
||||||
export const recordPing = recordSitePing;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Record a ping for an OLM client. Batches the `clients` table update
|
|
||||||
* (`online`, `lastPing`, `archived`) and, when `olmArchived` is true,
|
|
||||||
* also queues an `olms` table update to clear the archived flag.
|
|
||||||
*/
|
|
||||||
export function recordClientPing(
|
|
||||||
clientId: number,
|
|
||||||
olmId: string,
|
|
||||||
olmArchived: boolean
|
|
||||||
): void {
|
|
||||||
const now = Math.floor(Date.now() / 1000);
|
|
||||||
pendingClientPings.set(clientId, now);
|
|
||||||
if (olmArchived) {
|
|
||||||
pendingOlmArchiveResets.add(olmId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Flush Logic ────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Flush all accumulated site pings to the database.
|
|
||||||
*/
|
|
||||||
async function flushSitePingsToDb(): Promise<void> {
|
|
||||||
if (pendingSitePings.size === 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Snapshot and clear so new pings arriving during the flush go into a
|
|
||||||
// fresh map for the next cycle.
|
|
||||||
const pingsToFlush = new Map(pendingSitePings);
|
|
||||||
pendingSitePings.clear();
|
|
||||||
|
|
||||||
// Sort by siteId for consistent lock ordering (prevents deadlocks)
|
|
||||||
const sortedEntries = Array.from(pingsToFlush.entries()).sort(
|
|
||||||
([a], [b]) => a - b
|
|
||||||
);
|
|
||||||
|
|
||||||
const BATCH_SIZE = 50;
|
|
||||||
for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
|
|
||||||
const batch = sortedEntries.slice(i, i + BATCH_SIZE);
|
|
||||||
|
|
||||||
try {
|
|
||||||
await withRetry(async () => {
|
|
||||||
// Group by timestamp for efficient bulk updates
|
|
||||||
const byTimestamp = new Map<number, number[]>();
|
|
||||||
for (const [siteId, timestamp] of batch) {
|
|
||||||
const group = byTimestamp.get(timestamp) || [];
|
|
||||||
group.push(siteId);
|
|
||||||
byTimestamp.set(timestamp, group);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (byTimestamp.size === 1) {
|
|
||||||
const [timestamp, siteIds] = Array.from(
|
|
||||||
byTimestamp.entries()
|
|
||||||
)[0];
|
|
||||||
await db
|
|
||||||
.update(sites)
|
|
||||||
.set({
|
|
||||||
online: true,
|
|
||||||
lastPing: timestamp
|
|
||||||
})
|
|
||||||
.where(inArray(sites.siteId, siteIds));
|
|
||||||
} else {
|
|
||||||
await db.transaction(async (tx) => {
|
|
||||||
for (const [timestamp, siteIds] of byTimestamp) {
|
|
||||||
await tx
|
|
||||||
.update(sites)
|
|
||||||
.set({
|
|
||||||
online: true,
|
|
||||||
lastPing: timestamp
|
|
||||||
})
|
|
||||||
.where(inArray(sites.siteId, siteIds));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}, "flushSitePingsToDb");
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
`Failed to flush site ping batch (${batch.length} sites), re-queuing for next cycle`,
|
|
||||||
{ error }
|
|
||||||
);
|
|
||||||
for (const [siteId, timestamp] of batch) {
|
|
||||||
const existing = pendingSitePings.get(siteId);
|
|
||||||
if (!existing || existing < timestamp) {
|
|
||||||
pendingSitePings.set(siteId, timestamp);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Flush all accumulated client (OLM) pings to the database.
|
|
||||||
*/
|
|
||||||
async function flushClientPingsToDb(): Promise<void> {
|
|
||||||
if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Snapshot and clear
|
|
||||||
const pingsToFlush = new Map(pendingClientPings);
|
|
||||||
pendingClientPings.clear();
|
|
||||||
|
|
||||||
const olmResetsToFlush = new Set(pendingOlmArchiveResets);
|
|
||||||
pendingOlmArchiveResets.clear();
|
|
||||||
|
|
||||||
// ── Flush client pings ─────────────────────────────────────────────
|
|
||||||
if (pingsToFlush.size > 0) {
|
|
||||||
const sortedEntries = Array.from(pingsToFlush.entries()).sort(
|
|
||||||
([a], [b]) => a - b
|
|
||||||
);
|
|
||||||
|
|
||||||
const BATCH_SIZE = 50;
|
|
||||||
for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
|
|
||||||
const batch = sortedEntries.slice(i, i + BATCH_SIZE);
|
|
||||||
|
|
||||||
try {
|
|
||||||
await withRetry(async () => {
|
|
||||||
const byTimestamp = new Map<number, number[]>();
|
|
||||||
for (const [clientId, timestamp] of batch) {
|
|
||||||
const group = byTimestamp.get(timestamp) || [];
|
|
||||||
group.push(clientId);
|
|
||||||
byTimestamp.set(timestamp, group);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (byTimestamp.size === 1) {
|
|
||||||
const [timestamp, clientIds] = Array.from(
|
|
||||||
byTimestamp.entries()
|
|
||||||
)[0];
|
|
||||||
await db
|
|
||||||
.update(clients)
|
|
||||||
.set({
|
|
||||||
lastPing: timestamp,
|
|
||||||
online: true,
|
|
||||||
archived: false
|
|
||||||
})
|
|
||||||
.where(inArray(clients.clientId, clientIds));
|
|
||||||
} else {
|
|
||||||
await db.transaction(async (tx) => {
|
|
||||||
for (const [timestamp, clientIds] of byTimestamp) {
|
|
||||||
await tx
|
|
||||||
.update(clients)
|
|
||||||
.set({
|
|
||||||
lastPing: timestamp,
|
|
||||||
online: true,
|
|
||||||
archived: false
|
|
||||||
})
|
|
||||||
.where(
|
|
||||||
inArray(clients.clientId, clientIds)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}, "flushClientPingsToDb");
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
`Failed to flush client ping batch (${batch.length} clients), re-queuing for next cycle`,
|
|
||||||
{ error }
|
|
||||||
);
|
|
||||||
for (const [clientId, timestamp] of batch) {
|
|
||||||
const existing = pendingClientPings.get(clientId);
|
|
||||||
if (!existing || existing < timestamp) {
|
|
||||||
pendingClientPings.set(clientId, timestamp);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Flush OLM archive resets ───────────────────────────────────────
|
|
||||||
if (olmResetsToFlush.size > 0) {
|
|
||||||
const olmIds = Array.from(olmResetsToFlush).sort();
|
|
||||||
|
|
||||||
const BATCH_SIZE = 50;
|
|
||||||
for (let i = 0; i < olmIds.length; i += BATCH_SIZE) {
|
|
||||||
const batch = olmIds.slice(i, i + BATCH_SIZE);
|
|
||||||
|
|
||||||
try {
|
|
||||||
await withRetry(async () => {
|
|
||||||
await db
|
|
||||||
.update(olms)
|
|
||||||
.set({ archived: false })
|
|
||||||
.where(inArray(olms.olmId, batch));
|
|
||||||
}, "flushOlmArchiveResets");
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
`Failed to flush OLM archive reset batch (${batch.length} olms), re-queuing for next cycle`,
|
|
||||||
{ error }
|
|
||||||
);
|
|
||||||
for (const olmId of batch) {
|
|
||||||
pendingOlmArchiveResets.add(olmId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Flush everything — called by the interval timer and during shutdown.
|
|
||||||
*/
|
|
||||||
export async function flushPingsToDb(): Promise<void> {
|
|
||||||
await flushSitePingsToDb();
|
|
||||||
await flushClientPingsToDb();
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Retry / Error Helpers ──────────────────────────────────────────────
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Simple retry wrapper with exponential backoff for transient errors
|
|
||||||
* (connection timeouts, unexpected disconnects).
|
|
||||||
*/
|
|
||||||
async function withRetry<T>(
|
|
||||||
operation: () => Promise<T>,
|
|
||||||
context: string
|
|
||||||
): Promise<T> {
|
|
||||||
let attempt = 0;
|
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
return await operation();
|
|
||||||
} catch (error: any) {
|
|
||||||
if (isTransientError(error) && attempt < MAX_RETRIES) {
|
|
||||||
attempt++;
|
|
||||||
const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
|
|
||||||
const jitter = Math.random() * baseDelay;
|
|
||||||
const delay = baseDelay + jitter;
|
|
||||||
logger.warn(
|
|
||||||
`Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`
|
|
||||||
);
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Detect transient connection errors that are safe to retry.
|
|
||||||
*/
|
|
||||||
function isTransientError(error: any): boolean {
|
|
||||||
if (!error) return false;
|
|
||||||
|
|
||||||
const message = (error.message || "").toLowerCase();
|
|
||||||
const causeMessage = (error.cause?.message || "").toLowerCase();
|
|
||||||
const code = error.code || "";
|
|
||||||
|
|
||||||
// Connection timeout / terminated
|
|
||||||
if (
|
|
||||||
message.includes("connection timeout") ||
|
|
||||||
message.includes("connection terminated") ||
|
|
||||||
message.includes("timeout exceeded when trying to connect") ||
|
|
||||||
causeMessage.includes("connection terminated unexpectedly") ||
|
|
||||||
causeMessage.includes("connection timeout")
|
|
||||||
) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// PostgreSQL deadlock
|
|
||||||
if (code === "40P01" || message.includes("deadlock")) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ECONNRESET, ECONNREFUSED, EPIPE
|
|
||||||
if (
|
|
||||||
code === "ECONNRESET" ||
|
|
||||||
code === "ECONNREFUSED" ||
|
|
||||||
code === "EPIPE" ||
|
|
||||||
code === "ETIMEDOUT"
|
|
||||||
) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Lifecycle ──────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Start the background flush timer. Call this once at server startup.
|
|
||||||
*/
|
|
||||||
export function startPingAccumulator(): void {
|
|
||||||
if (flushTimer) {
|
|
||||||
return; // Already running
|
|
||||||
}
|
|
||||||
|
|
||||||
flushTimer = setInterval(async () => {
|
|
||||||
try {
|
|
||||||
await flushPingsToDb();
|
|
||||||
} catch (error) {
|
|
||||||
logger.error("Unhandled error in ping accumulator flush", {
|
|
||||||
error
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}, FLUSH_INTERVAL_MS);
|
|
||||||
|
|
||||||
// Don't prevent the process from exiting
|
|
||||||
flushTimer.unref();
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
`Ping accumulator started (flush interval: ${FLUSH_INTERVAL_MS}ms)`
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Stop the background flush timer and perform a final flush.
|
|
||||||
* Call this during graceful shutdown.
|
|
||||||
*/
|
|
||||||
export async function stopPingAccumulator(): Promise<void> {
|
|
||||||
if (flushTimer) {
|
|
||||||
clearInterval(flushTimer);
|
|
||||||
flushTimer = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Final flush to persist any remaining pings
|
|
||||||
try {
|
|
||||||
await flushPingsToDb();
|
|
||||||
} catch (error) {
|
|
||||||
logger.error("Error during final ping accumulator flush", { error });
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info("Ping accumulator stopped");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the number of pending (unflushed) pings. Useful for monitoring.
|
|
||||||
*/
|
|
||||||
export function getPendingPingCount(): number {
|
|
||||||
return pendingSitePings.size + pendingClientPings.size;
|
|
||||||
}
|
|
||||||
@@ -8,7 +8,7 @@ import {
|
|||||||
ExitNode,
|
ExitNode,
|
||||||
exitNodes,
|
exitNodes,
|
||||||
sites,
|
sites,
|
||||||
clientSitesAssociationsCache,
|
clientSitesAssociationsCache
|
||||||
} from "@server/db";
|
} from "@server/db";
|
||||||
import { olms } from "@server/db";
|
import { olms } from "@server/db";
|
||||||
import HttpCode from "@server/types/HttpCode";
|
import HttpCode from "@server/types/HttpCode";
|
||||||
@@ -20,10 +20,8 @@ import { z } from "zod";
|
|||||||
import { fromError } from "zod-validation-error";
|
import { fromError } from "zod-validation-error";
|
||||||
import {
|
import {
|
||||||
createOlmSession,
|
createOlmSession,
|
||||||
validateOlmSessionToken,
|
validateOlmSessionToken
|
||||||
EXPIRES
|
|
||||||
} from "@server/auth/sessions/olm";
|
} from "@server/auth/sessions/olm";
|
||||||
import { getOrCreateCachedToken } from "#dynamic/lib/tokenCache";
|
|
||||||
import { verifyPassword } from "@server/auth/password";
|
import { verifyPassword } from "@server/auth/password";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import config from "@server/lib/config";
|
import config from "@server/lib/config";
|
||||||
@@ -134,19 +132,8 @@ export async function getOlmToken(
|
|||||||
|
|
||||||
logger.debug("Creating new olm session token");
|
logger.debug("Creating new olm session token");
|
||||||
|
|
||||||
// Return a cached token if one exists to prevent thundering herd on
|
const resToken = generateSessionToken();
|
||||||
// simultaneous restarts; falls back to creating a fresh session when
|
await createOlmSession(resToken, existingOlm.olmId);
|
||||||
// Redis is unavailable or the cache has expired.
|
|
||||||
const resToken = await getOrCreateCachedToken(
|
|
||||||
`olm:token_cache:${existingOlm.olmId}`,
|
|
||||||
config.getRawConfig().server.secret!,
|
|
||||||
Math.floor(EXPIRES / 1000),
|
|
||||||
async () => {
|
|
||||||
const token = generateSessionToken();
|
|
||||||
await createOlmSession(token, existingOlm.olmId);
|
|
||||||
return token;
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
let clientIdToUse;
|
let clientIdToUse;
|
||||||
if (orgId) {
|
if (orgId) {
|
||||||
|
|||||||
@@ -3,7 +3,6 @@ import { db } from "@server/db";
|
|||||||
import { MessageHandler } from "@server/routers/ws";
|
import { MessageHandler } from "@server/routers/ws";
|
||||||
import { clients, olms, Olm } from "@server/db";
|
import { clients, olms, Olm } from "@server/db";
|
||||||
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
||||||
import { recordClientPing } from "@server/routers/newt/pingAccumulator";
|
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { validateSessionToken } from "@server/auth/sessions/app";
|
import { validateSessionToken } from "@server/auth/sessions/app";
|
||||||
import { checkOrgAccessPolicy } from "#dynamic/lib/checkOrgAccessPolicy";
|
import { checkOrgAccessPolicy } from "#dynamic/lib/checkOrgAccessPolicy";
|
||||||
@@ -202,12 +201,22 @@ export const handleOlmPingMessage: MessageHandler = async (context) => {
|
|||||||
await sendOlmSyncMessage(olm, client);
|
await sendOlmSyncMessage(olm, client);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Record the ping in memory; it will be flushed to the database
|
// Update the client's last ping timestamp
|
||||||
// periodically by the ping accumulator (every ~10s) in a single
|
await db
|
||||||
// batched UPDATE instead of one query per ping. This prevents
|
.update(clients)
|
||||||
// connection pool exhaustion under load, especially with
|
.set({
|
||||||
// cross-region latency to the database.
|
lastPing: Math.floor(Date.now() / 1000),
|
||||||
recordClientPing(olm.clientId, olm.olmId, !!olm.archived);
|
online: true,
|
||||||
|
archived: false
|
||||||
|
})
|
||||||
|
.where(eq(clients.clientId, olm.clientId));
|
||||||
|
|
||||||
|
if (olm.archived) {
|
||||||
|
await db
|
||||||
|
.update(olms)
|
||||||
|
.set({ archived: false })
|
||||||
|
.where(eq(olms.olmId, olm.olmId));
|
||||||
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error("Error handling ping message", { error });
|
logger.error("Error handling ping message", { error });
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,7 +11,6 @@ import {
|
|||||||
startNewtOfflineChecker,
|
startNewtOfflineChecker,
|
||||||
handleNewtDisconnectingMessage
|
handleNewtDisconnectingMessage
|
||||||
} from "../newt";
|
} from "../newt";
|
||||||
import { startPingAccumulator } from "../newt/pingAccumulator";
|
|
||||||
import {
|
import {
|
||||||
handleOlmRegisterMessage,
|
handleOlmRegisterMessage,
|
||||||
handleOlmRelayMessage,
|
handleOlmRelayMessage,
|
||||||
@@ -47,10 +46,6 @@ export const messageHandlers: Record<string, MessageHandler> = {
|
|||||||
"ws/round-trip/complete": handleRoundTripMessage
|
"ws/round-trip/complete": handleRoundTripMessage
|
||||||
};
|
};
|
||||||
|
|
||||||
// Start the ping accumulator for all builds — it batches per-site online/lastPing
|
|
||||||
// updates into periodic bulk writes, preventing connection pool exhaustion.
|
|
||||||
startPingAccumulator();
|
|
||||||
|
|
||||||
if (build != "saas") {
|
if (build != "saas") {
|
||||||
startOlmOfflineChecker(); // this is to handle the offline check for olms
|
startOlmOfflineChecker(); // this is to handle the offline check for olms
|
||||||
startNewtOfflineChecker(); // this is to handle the offline check for newts
|
startNewtOfflineChecker(); // this is to handle the offline check for newts
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import { Socket } from "net";
|
|||||||
import { Newt, newts, NewtSession, olms, Olm, OlmSession, sites } from "@server/db";
|
import { Newt, newts, NewtSession, olms, Olm, OlmSession, sites } from "@server/db";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
import { db } from "@server/db";
|
import { db } from "@server/db";
|
||||||
import { recordPing } from "@server/routers/newt/pingAccumulator";
|
|
||||||
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
|
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
|
||||||
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
|
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
|
||||||
import { messageHandlers } from "./messageHandlers";
|
import { messageHandlers } from "./messageHandlers";
|
||||||
@@ -387,14 +386,22 @@ const setupConnection = async (
|
|||||||
// the same as modern newt clients.
|
// the same as modern newt clients.
|
||||||
if (clientType === "newt") {
|
if (clientType === "newt") {
|
||||||
const newtClient = client as Newt;
|
const newtClient = client as Newt;
|
||||||
ws.on("ping", () => {
|
ws.on("ping", async () => {
|
||||||
if (!newtClient.siteId) return;
|
if (!newtClient.siteId) return;
|
||||||
// Record the ping in the accumulator instead of writing to the
|
try {
|
||||||
// database on every WS ping frame. The accumulator flushes all
|
await db
|
||||||
// pending pings in a single batched UPDATE every ~10s, which
|
.update(sites)
|
||||||
// prevents connection pool exhaustion under load (especially
|
.set({
|
||||||
// with cross-region latency to the database).
|
online: true,
|
||||||
recordPing(newtClient.siteId);
|
lastPing: Math.floor(Date.now() / 1000)
|
||||||
|
})
|
||||||
|
.where(eq(sites.siteId, newtClient.siteId));
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
"Error updating newt site online state on WS ping",
|
||||||
|
{ error }
|
||||||
|
);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -275,8 +275,6 @@ export default function Page() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const disabled = !isPaidUser(tierMatrix.orgOidc);
|
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<>
|
<>
|
||||||
<div className="flex justify-between">
|
<div className="flex justify-between">
|
||||||
@@ -294,9 +292,6 @@ export default function Page() {
|
|||||||
</Button>
|
</Button>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<PaidFeaturesAlert tiers={tierMatrix.orgOidc} />
|
|
||||||
|
|
||||||
<fieldset disabled={disabled} className={disabled ? "opacity-50 pointer-events-none" : ""}>
|
|
||||||
<SettingsContainer>
|
<SettingsContainer>
|
||||||
<SettingsSection>
|
<SettingsSection>
|
||||||
<SettingsSectionHeader>
|
<SettingsSectionHeader>
|
||||||
@@ -817,10 +812,9 @@ export default function Page() {
|
|||||||
</Button>
|
</Button>
|
||||||
<Button
|
<Button
|
||||||
type="submit"
|
type="submit"
|
||||||
disabled={createLoading || disabled}
|
disabled={createLoading || !isPaidUser(tierMatrix.orgOidc)}
|
||||||
loading={createLoading}
|
loading={createLoading}
|
||||||
onClick={() => {
|
onClick={() => {
|
||||||
if (disabled) return;
|
|
||||||
// log any issues with the form
|
// log any issues with the form
|
||||||
console.log(form.formState.errors);
|
console.log(form.formState.errors);
|
||||||
form.handleSubmit(onSubmit)();
|
form.handleSubmit(onSubmit)();
|
||||||
@@ -829,7 +823,6 @@ export default function Page() {
|
|||||||
{t("idpSubmit")}
|
{t("idpSubmit")}
|
||||||
</Button>
|
</Button>
|
||||||
</div>
|
</div>
|
||||||
</fieldset>
|
|
||||||
</>
|
</>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user