mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-25 03:56:38 +00:00
Compare commits
14 Commits
dev
...
crowdin_de
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b6c9046405 | ||
|
|
ea2ec84f08 | ||
|
|
40a14ac5f6 | ||
|
|
78169f087f | ||
|
|
4021107d08 | ||
|
|
e78aaeb344 | ||
|
|
0087ca997d | ||
|
|
61d4d314d6 | ||
|
|
2e8b4ddbdb | ||
|
|
3996b86f14 | ||
|
|
7388ef8588 | ||
|
|
bb2edb23e5 | ||
|
|
4cad384476 | ||
|
|
ba07eb1303 |
115
license.py
115
license.py
@@ -1,115 +0,0 @@
|
|||||||
import os
|
|
||||||
import sys
|
|
||||||
|
|
||||||
# --- Configuration ---
|
|
||||||
# The header text to be added to the files.
|
|
||||||
HEADER_TEXT = """/*
|
|
||||||
* This file is part of a proprietary work.
|
|
||||||
*
|
|
||||||
* Copyright (c) 2025 Fossorial, Inc.
|
|
||||||
* All rights reserved.
|
|
||||||
*
|
|
||||||
* This file is licensed under the Fossorial Commercial License.
|
|
||||||
* You may not use this file except in compliance with the License.
|
|
||||||
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
|
|
||||||
*
|
|
||||||
* This file is not licensed under the AGPLv3.
|
|
||||||
*/
|
|
||||||
"""
|
|
||||||
|
|
||||||
def should_add_header(file_path):
|
|
||||||
"""
|
|
||||||
Checks if a file should receive the commercial license header.
|
|
||||||
Returns True if 'private' is in the path or file content.
|
|
||||||
"""
|
|
||||||
# Check if 'private' is in the file path (case-insensitive)
|
|
||||||
if 'server/private' in file_path.lower():
|
|
||||||
return True
|
|
||||||
|
|
||||||
# Check if 'private' is in the file content (case-insensitive)
|
|
||||||
# try:
|
|
||||||
# with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
|
|
||||||
# content = f.read()
|
|
||||||
# if 'private' in content.lower():
|
|
||||||
# return True
|
|
||||||
# except Exception as e:
|
|
||||||
# print(f"Could not read file {file_path}: {e}")
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
def process_directory(root_dir):
|
|
||||||
"""
|
|
||||||
Recursively scans a directory and adds headers to qualifying .ts or .tsx files,
|
|
||||||
skipping any 'node_modules' directories.
|
|
||||||
"""
|
|
||||||
print(f"Scanning directory: {root_dir}")
|
|
||||||
files_processed = 0
|
|
||||||
headers_added = 0
|
|
||||||
|
|
||||||
for root, dirs, files in os.walk(root_dir):
|
|
||||||
# --- MODIFICATION ---
|
|
||||||
# Exclude 'node_modules' directories from the scan to improve performance.
|
|
||||||
if 'node_modules' in dirs:
|
|
||||||
dirs.remove('node_modules')
|
|
||||||
|
|
||||||
for file in files:
|
|
||||||
if file.endswith('.ts') or file.endswith('.tsx'):
|
|
||||||
file_path = os.path.join(root, file)
|
|
||||||
files_processed += 1
|
|
||||||
|
|
||||||
try:
|
|
||||||
with open(file_path, 'r+', encoding='utf-8') as f:
|
|
||||||
original_content = f.read()
|
|
||||||
has_header = original_content.startswith(HEADER_TEXT.strip())
|
|
||||||
|
|
||||||
if should_add_header(file_path):
|
|
||||||
# Add header only if it's not already there
|
|
||||||
if not has_header:
|
|
||||||
f.seek(0, 0) # Go to the beginning of the file
|
|
||||||
f.write(HEADER_TEXT.strip() + '\n\n' + original_content)
|
|
||||||
print(f"Added header to: {file_path}")
|
|
||||||
headers_added += 1
|
|
||||||
else:
|
|
||||||
print(f"Header already exists in: {file_path}")
|
|
||||||
else:
|
|
||||||
# Remove header if it exists but shouldn't be there
|
|
||||||
if has_header:
|
|
||||||
# Find the end of the header and remove it (including following newlines)
|
|
||||||
header_with_newlines = HEADER_TEXT.strip() + '\n\n'
|
|
||||||
if original_content.startswith(header_with_newlines):
|
|
||||||
content_without_header = original_content[len(header_with_newlines):]
|
|
||||||
else:
|
|
||||||
# Handle case where there might be different newline patterns
|
|
||||||
header_end = len(HEADER_TEXT.strip())
|
|
||||||
# Skip any newlines after the header
|
|
||||||
while header_end < len(original_content) and original_content[header_end] in '\n\r':
|
|
||||||
header_end += 1
|
|
||||||
content_without_header = original_content[header_end:]
|
|
||||||
|
|
||||||
f.seek(0)
|
|
||||||
f.write(content_without_header)
|
|
||||||
f.truncate()
|
|
||||||
print(f"Removed header from: {file_path}")
|
|
||||||
headers_added += 1 # Reusing counter for modifications
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
print(f"Error processing file {file_path}: {e}")
|
|
||||||
|
|
||||||
print("\n--- Scan Complete ---")
|
|
||||||
print(f"Total .ts or .tsx files found: {files_processed}")
|
|
||||||
print(f"Files modified (headers added/removed): {headers_added}")
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
# Get the target directory from the command line arguments.
|
|
||||||
# If no directory is provided, it uses the current directory ('.').
|
|
||||||
if len(sys.argv) > 1:
|
|
||||||
target_directory = sys.argv[1]
|
|
||||||
else:
|
|
||||||
target_directory = '.' # Default to current directory
|
|
||||||
|
|
||||||
if not os.path.isdir(target_directory):
|
|
||||||
print(f"Error: Directory '{target_directory}' not found.")
|
|
||||||
sys.exit(1)
|
|
||||||
|
|
||||||
process_directory(os.path.abspath(target_directory))
|
|
||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Въведете конфигурационния токен от сървърната конзола.",
|
"setupTokenDescription": "Въведете конфигурационния токен от сървърната конзола.",
|
||||||
"setupTokenRequired": "Необходим е конфигурационен токен",
|
"setupTokenRequired": "Необходим е конфигурационен токен",
|
||||||
"actionUpdateSite": "Актуализиране на сайт",
|
"actionUpdateSite": "Актуализиране на сайт",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Изброяване на позволените роли за сайта",
|
"actionListSiteRoles": "Изброяване на позволените роли за сайта",
|
||||||
"actionCreateResource": "Създаване на ресурс",
|
"actionCreateResource": "Създаване на ресурс",
|
||||||
"actionDeleteResource": "Изтриване на ресурс",
|
"actionDeleteResource": "Изтриване на ресурс",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Zadejte nastavovací token z konzole serveru.",
|
"setupTokenDescription": "Zadejte nastavovací token z konzole serveru.",
|
||||||
"setupTokenRequired": "Je vyžadován token nastavení",
|
"setupTokenRequired": "Je vyžadován token nastavení",
|
||||||
"actionUpdateSite": "Aktualizovat stránku",
|
"actionUpdateSite": "Aktualizovat stránku",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Seznam povolených rolí webu",
|
"actionListSiteRoles": "Seznam povolených rolí webu",
|
||||||
"actionCreateResource": "Vytvořit zdroj",
|
"actionCreateResource": "Vytvořit zdroj",
|
||||||
"actionDeleteResource": "Odstranit dokument",
|
"actionDeleteResource": "Odstranit dokument",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Geben Sie das Setup-Token von der Serverkonsole ein.",
|
"setupTokenDescription": "Geben Sie das Setup-Token von der Serverkonsole ein.",
|
||||||
"setupTokenRequired": "Setup-Token ist erforderlich",
|
"setupTokenRequired": "Setup-Token ist erforderlich",
|
||||||
"actionUpdateSite": "Standorte aktualisieren",
|
"actionUpdateSite": "Standorte aktualisieren",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Erlaubte Standort-Rollen auflisten",
|
"actionListSiteRoles": "Erlaubte Standort-Rollen auflisten",
|
||||||
"actionCreateResource": "Ressource erstellen",
|
"actionCreateResource": "Ressource erstellen",
|
||||||
"actionDeleteResource": "Ressource löschen",
|
"actionDeleteResource": "Ressource löschen",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Ingrese el token de configuración desde la consola del servidor.",
|
"setupTokenDescription": "Ingrese el token de configuración desde la consola del servidor.",
|
||||||
"setupTokenRequired": "Se requiere el token de configuración",
|
"setupTokenRequired": "Se requiere el token de configuración",
|
||||||
"actionUpdateSite": "Actualizar sitio",
|
"actionUpdateSite": "Actualizar sitio",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Lista de roles permitidos del sitio",
|
"actionListSiteRoles": "Lista de roles permitidos del sitio",
|
||||||
"actionCreateResource": "Crear Recurso",
|
"actionCreateResource": "Crear Recurso",
|
||||||
"actionDeleteResource": "Eliminar Recurso",
|
"actionDeleteResource": "Eliminar Recurso",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Entrez le jeton de configuration depuis la console du serveur.",
|
"setupTokenDescription": "Entrez le jeton de configuration depuis la console du serveur.",
|
||||||
"setupTokenRequired": "Le jeton de configuration est requis.",
|
"setupTokenRequired": "Le jeton de configuration est requis.",
|
||||||
"actionUpdateSite": "Mettre à jour un site",
|
"actionUpdateSite": "Mettre à jour un site",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Lister les rôles autorisés du site",
|
"actionListSiteRoles": "Lister les rôles autorisés du site",
|
||||||
"actionCreateResource": "Créer une ressource",
|
"actionCreateResource": "Créer une ressource",
|
||||||
"actionDeleteResource": "Supprimer une ressource",
|
"actionDeleteResource": "Supprimer une ressource",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Inserisci il token di configurazione dalla console del server.",
|
"setupTokenDescription": "Inserisci il token di configurazione dalla console del server.",
|
||||||
"setupTokenRequired": "Il token di configurazione è richiesto",
|
"setupTokenRequired": "Il token di configurazione è richiesto",
|
||||||
"actionUpdateSite": "Aggiorna Sito",
|
"actionUpdateSite": "Aggiorna Sito",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Elenca Ruoli Sito Consentiti",
|
"actionListSiteRoles": "Elenca Ruoli Sito Consentiti",
|
||||||
"actionCreateResource": "Crea Risorsa",
|
"actionCreateResource": "Crea Risorsa",
|
||||||
"actionDeleteResource": "Elimina Risorsa",
|
"actionDeleteResource": "Elimina Risorsa",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "서버 콘솔에서 설정 토큰 입력.",
|
"setupTokenDescription": "서버 콘솔에서 설정 토큰 입력.",
|
||||||
"setupTokenRequired": "설정 토큰이 필요합니다",
|
"setupTokenRequired": "설정 토큰이 필요합니다",
|
||||||
"actionUpdateSite": "사이트 업데이트",
|
"actionUpdateSite": "사이트 업데이트",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "허용된 사이트 역할 목록",
|
"actionListSiteRoles": "허용된 사이트 역할 목록",
|
||||||
"actionCreateResource": "리소스 생성",
|
"actionCreateResource": "리소스 생성",
|
||||||
"actionDeleteResource": "리소스 삭제",
|
"actionDeleteResource": "리소스 삭제",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Skriv inn oppsetttoken fra serverkonsollen.",
|
"setupTokenDescription": "Skriv inn oppsetttoken fra serverkonsollen.",
|
||||||
"setupTokenRequired": "Oppsetttoken er nødvendig",
|
"setupTokenRequired": "Oppsetttoken er nødvendig",
|
||||||
"actionUpdateSite": "Oppdater område",
|
"actionUpdateSite": "Oppdater område",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "List opp tillatte områderoller",
|
"actionListSiteRoles": "List opp tillatte områderoller",
|
||||||
"actionCreateResource": "Opprett ressurs",
|
"actionCreateResource": "Opprett ressurs",
|
||||||
"actionDeleteResource": "Slett ressurs",
|
"actionDeleteResource": "Slett ressurs",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Voer het setup-token in vanaf de serverconsole.",
|
"setupTokenDescription": "Voer het setup-token in vanaf de serverconsole.",
|
||||||
"setupTokenRequired": "Setup-token is vereist",
|
"setupTokenRequired": "Setup-token is vereist",
|
||||||
"actionUpdateSite": "Site bijwerken",
|
"actionUpdateSite": "Site bijwerken",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Toon toegestane sitenollen",
|
"actionListSiteRoles": "Toon toegestane sitenollen",
|
||||||
"actionCreateResource": "Bron maken",
|
"actionCreateResource": "Bron maken",
|
||||||
"actionDeleteResource": "Document verwijderen",
|
"actionDeleteResource": "Document verwijderen",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Wprowadź token konfiguracji z konsoli serwera.",
|
"setupTokenDescription": "Wprowadź token konfiguracji z konsoli serwera.",
|
||||||
"setupTokenRequired": "Wymagany jest token konfiguracji",
|
"setupTokenRequired": "Wymagany jest token konfiguracji",
|
||||||
"actionUpdateSite": "Aktualizuj witrynę",
|
"actionUpdateSite": "Aktualizuj witrynę",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Lista dozwolonych ról witryny",
|
"actionListSiteRoles": "Lista dozwolonych ról witryny",
|
||||||
"actionCreateResource": "Utwórz zasób",
|
"actionCreateResource": "Utwórz zasób",
|
||||||
"actionDeleteResource": "Usuń zasób",
|
"actionDeleteResource": "Usuń zasób",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Digite o token de configuração do console do servidor.",
|
"setupTokenDescription": "Digite o token de configuração do console do servidor.",
|
||||||
"setupTokenRequired": "Token de configuração é necessário",
|
"setupTokenRequired": "Token de configuração é necessário",
|
||||||
"actionUpdateSite": "Atualizar Site",
|
"actionUpdateSite": "Atualizar Site",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Listar Funções Permitidas do Site",
|
"actionListSiteRoles": "Listar Funções Permitidas do Site",
|
||||||
"actionCreateResource": "Criar Recurso",
|
"actionCreateResource": "Criar Recurso",
|
||||||
"actionDeleteResource": "Eliminar Recurso",
|
"actionDeleteResource": "Eliminar Recurso",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Введите токен настройки из консоли сервера.",
|
"setupTokenDescription": "Введите токен настройки из консоли сервера.",
|
||||||
"setupTokenRequired": "Токен настройки обязателен",
|
"setupTokenRequired": "Токен настройки обязателен",
|
||||||
"actionUpdateSite": "Обновить сайт",
|
"actionUpdateSite": "Обновить сайт",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "Список разрешенных ролей сайта",
|
"actionListSiteRoles": "Список разрешенных ролей сайта",
|
||||||
"actionCreateResource": "Создать ресурс",
|
"actionCreateResource": "Создать ресурс",
|
||||||
"actionDeleteResource": "Удалить ресурс",
|
"actionDeleteResource": "Удалить ресурс",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "Sunucu konsolundan kurulum simgesini girin.",
|
"setupTokenDescription": "Sunucu konsolundan kurulum simgesini girin.",
|
||||||
"setupTokenRequired": "Kurulum simgesi gerekli",
|
"setupTokenRequired": "Kurulum simgesi gerekli",
|
||||||
"actionUpdateSite": "Siteyi Güncelle",
|
"actionUpdateSite": "Siteyi Güncelle",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "İzin Verilen Site Rolleri Listele",
|
"actionListSiteRoles": "İzin Verilen Site Rolleri Listele",
|
||||||
"actionCreateResource": "Kaynak Oluştur",
|
"actionCreateResource": "Kaynak Oluştur",
|
||||||
"actionDeleteResource": "Kaynağı Sil",
|
"actionDeleteResource": "Kaynağı Sil",
|
||||||
|
|||||||
@@ -1119,6 +1119,7 @@
|
|||||||
"setupTokenDescription": "从服务器控制台输入设置令牌。",
|
"setupTokenDescription": "从服务器控制台输入设置令牌。",
|
||||||
"setupTokenRequired": "需要设置令牌",
|
"setupTokenRequired": "需要设置令牌",
|
||||||
"actionUpdateSite": "更新站点",
|
"actionUpdateSite": "更新站点",
|
||||||
|
"actionResetSiteBandwidth": "Reset Organization Bandwidth",
|
||||||
"actionListSiteRoles": "允许站点角色列表",
|
"actionListSiteRoles": "允许站点角色列表",
|
||||||
"actionCreateResource": "创建资源",
|
"actionCreateResource": "创建资源",
|
||||||
"actionDeleteResource": "删除资源",
|
"actionDeleteResource": "删除资源",
|
||||||
|
|||||||
@@ -1,10 +1,8 @@
|
|||||||
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
||||||
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
||||||
import { stopPingAccumulator } from "@server/routers/newt/pingAccumulator";
|
|
||||||
import { cleanup as wsCleanup } from "#dynamic/routers/ws";
|
import { cleanup as wsCleanup } from "#dynamic/routers/ws";
|
||||||
|
|
||||||
async function cleanup() {
|
async function cleanup() {
|
||||||
await stopPingAccumulator();
|
|
||||||
await flushBandwidthToDb();
|
await flushBandwidthToDb();
|
||||||
await flushSiteBandwidthToDb();
|
await flushSiteBandwidthToDb();
|
||||||
await wsCleanup();
|
await wsCleanup();
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres";
|
import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres";
|
||||||
|
import { Pool } from "pg";
|
||||||
import { readConfigFile } from "@server/lib/readConfigFile";
|
import { readConfigFile } from "@server/lib/readConfigFile";
|
||||||
import { withReplicas } from "drizzle-orm/pg-core";
|
import { withReplicas } from "drizzle-orm/pg-core";
|
||||||
import { createPool } from "./poolConfig";
|
|
||||||
|
|
||||||
function createDb() {
|
function createDb() {
|
||||||
const config = readConfigFile();
|
const config = readConfigFile();
|
||||||
@@ -39,17 +39,12 @@ function createDb() {
|
|||||||
|
|
||||||
// Create connection pools instead of individual connections
|
// Create connection pools instead of individual connections
|
||||||
const poolConfig = config.postgres.pool;
|
const poolConfig = config.postgres.pool;
|
||||||
const maxConnections = poolConfig?.max_connections || 20;
|
const primaryPool = new Pool({
|
||||||
const idleTimeoutMs = poolConfig?.idle_timeout_ms || 30000;
|
|
||||||
const connectionTimeoutMs = poolConfig?.connection_timeout_ms || 5000;
|
|
||||||
|
|
||||||
const primaryPool = createPool(
|
|
||||||
connectionString,
|
connectionString,
|
||||||
maxConnections,
|
max: poolConfig?.max_connections || 20,
|
||||||
idleTimeoutMs,
|
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
|
||||||
connectionTimeoutMs,
|
connectionTimeoutMillis: poolConfig?.connection_timeout_ms || 5000
|
||||||
"primary"
|
});
|
||||||
);
|
|
||||||
|
|
||||||
const replicas = [];
|
const replicas = [];
|
||||||
|
|
||||||
@@ -60,16 +55,14 @@ function createDb() {
|
|||||||
})
|
})
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
const maxReplicaConnections =
|
|
||||||
poolConfig?.max_replica_connections || 20;
|
|
||||||
for (const conn of replicaConnections) {
|
for (const conn of replicaConnections) {
|
||||||
const replicaPool = createPool(
|
const replicaPool = new Pool({
|
||||||
conn.connection_string,
|
connectionString: conn.connection_string,
|
||||||
maxReplicaConnections,
|
max: poolConfig?.max_replica_connections || 20,
|
||||||
idleTimeoutMs,
|
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
|
||||||
connectionTimeoutMs,
|
connectionTimeoutMillis:
|
||||||
"replica"
|
poolConfig?.connection_timeout_ms || 5000
|
||||||
);
|
});
|
||||||
replicas.push(
|
replicas.push(
|
||||||
DrizzlePostgres(replicaPool, {
|
DrizzlePostgres(replicaPool, {
|
||||||
logger: process.env.QUERY_LOGGING == "true"
|
logger: process.env.QUERY_LOGGING == "true"
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres";
|
import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres";
|
||||||
|
import { Pool } from "pg";
|
||||||
import { readConfigFile } from "@server/lib/readConfigFile";
|
import { readConfigFile } from "@server/lib/readConfigFile";
|
||||||
import { withReplicas } from "drizzle-orm/pg-core";
|
import { withReplicas } from "drizzle-orm/pg-core";
|
||||||
import { build } from "@server/build";
|
import { build } from "@server/build";
|
||||||
import { db as mainDb, primaryDb as mainPrimaryDb } from "./driver";
|
import { db as mainDb, primaryDb as mainPrimaryDb } from "./driver";
|
||||||
import { createPool } from "./poolConfig";
|
|
||||||
|
|
||||||
function createLogsDb() {
|
function createLogsDb() {
|
||||||
// Only use separate logs database in SaaS builds
|
// Only use separate logs database in SaaS builds
|
||||||
@@ -42,17 +42,12 @@ function createLogsDb() {
|
|||||||
|
|
||||||
// Create separate connection pool for logs database
|
// Create separate connection pool for logs database
|
||||||
const poolConfig = logsConfig?.pool || config.postgres?.pool;
|
const poolConfig = logsConfig?.pool || config.postgres?.pool;
|
||||||
const maxConnections = poolConfig?.max_connections || 20;
|
const primaryPool = new Pool({
|
||||||
const idleTimeoutMs = poolConfig?.idle_timeout_ms || 30000;
|
|
||||||
const connectionTimeoutMs = poolConfig?.connection_timeout_ms || 5000;
|
|
||||||
|
|
||||||
const primaryPool = createPool(
|
|
||||||
connectionString,
|
connectionString,
|
||||||
maxConnections,
|
max: poolConfig?.max_connections || 20,
|
||||||
idleTimeoutMs,
|
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
|
||||||
connectionTimeoutMs,
|
connectionTimeoutMillis: poolConfig?.connection_timeout_ms || 5000
|
||||||
"logs-primary"
|
});
|
||||||
);
|
|
||||||
|
|
||||||
const replicas = [];
|
const replicas = [];
|
||||||
|
|
||||||
@@ -63,16 +58,14 @@ function createLogsDb() {
|
|||||||
})
|
})
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
const maxReplicaConnections =
|
|
||||||
poolConfig?.max_replica_connections || 20;
|
|
||||||
for (const conn of replicaConnections) {
|
for (const conn of replicaConnections) {
|
||||||
const replicaPool = createPool(
|
const replicaPool = new Pool({
|
||||||
conn.connection_string,
|
connectionString: conn.connection_string,
|
||||||
maxReplicaConnections,
|
max: poolConfig?.max_replica_connections || 20,
|
||||||
idleTimeoutMs,
|
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
|
||||||
connectionTimeoutMs,
|
connectionTimeoutMillis:
|
||||||
"logs-replica"
|
poolConfig?.connection_timeout_ms || 5000
|
||||||
);
|
});
|
||||||
replicas.push(
|
replicas.push(
|
||||||
DrizzlePostgres(replicaPool, {
|
DrizzlePostgres(replicaPool, {
|
||||||
logger: process.env.QUERY_LOGGING == "true"
|
logger: process.env.QUERY_LOGGING == "true"
|
||||||
|
|||||||
@@ -1,63 +0,0 @@
|
|||||||
import { Pool, PoolConfig } from "pg";
|
|
||||||
import logger from "@server/logger";
|
|
||||||
|
|
||||||
export function createPoolConfig(
|
|
||||||
connectionString: string,
|
|
||||||
maxConnections: number,
|
|
||||||
idleTimeoutMs: number,
|
|
||||||
connectionTimeoutMs: number
|
|
||||||
): PoolConfig {
|
|
||||||
return {
|
|
||||||
connectionString,
|
|
||||||
max: maxConnections,
|
|
||||||
idleTimeoutMillis: idleTimeoutMs,
|
|
||||||
connectionTimeoutMillis: connectionTimeoutMs,
|
|
||||||
// TCP keepalive to prevent silent connection drops by NAT gateways,
|
|
||||||
// load balancers, and other intermediate network devices (e.g. AWS
|
|
||||||
// NAT Gateway drops idle TCP connections after ~350s)
|
|
||||||
keepAlive: true,
|
|
||||||
keepAliveInitialDelayMillis: 10000, // send first keepalive after 10s of idle
|
|
||||||
// Allow connections to be released and recreated more aggressively
|
|
||||||
// to avoid stale connections building up
|
|
||||||
allowExitOnIdle: false
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
export function attachPoolErrorHandlers(pool: Pool, label: string): void {
|
|
||||||
pool.on("error", (err) => {
|
|
||||||
// This catches errors on idle clients in the pool. Without this
|
|
||||||
// handler an unexpected disconnect would crash the process.
|
|
||||||
logger.error(
|
|
||||||
`Unexpected error on idle ${label} database client: ${err.message}`
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
pool.on("connect", (client) => {
|
|
||||||
// Set a statement timeout on every new connection so a single slow
|
|
||||||
// query can't block the pool forever
|
|
||||||
client.query("SET statement_timeout = '30s'").catch((err: Error) => {
|
|
||||||
logger.warn(
|
|
||||||
`Failed to set statement_timeout on ${label} client: ${err.message}`
|
|
||||||
);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
export function createPool(
|
|
||||||
connectionString: string,
|
|
||||||
maxConnections: number,
|
|
||||||
idleTimeoutMs: number,
|
|
||||||
connectionTimeoutMs: number,
|
|
||||||
label: string
|
|
||||||
): Pool {
|
|
||||||
const pool = new Pool(
|
|
||||||
createPoolConfig(
|
|
||||||
connectionString,
|
|
||||||
maxConnections,
|
|
||||||
idleTimeoutMs,
|
|
||||||
connectionTimeoutMs
|
|
||||||
)
|
|
||||||
);
|
|
||||||
attachPoolErrorHandlers(pool, label);
|
|
||||||
return pool;
|
|
||||||
}
|
|
||||||
123
server/lib/ip.ts
123
server/lib/ip.ts
@@ -571,129 +571,6 @@ export function generateSubnetProxyTargets(
|
|||||||
return targets;
|
return targets;
|
||||||
}
|
}
|
||||||
|
|
||||||
export type SubnetProxyTargetV2 = {
|
|
||||||
sourcePrefixes: string[]; // must be cidrs
|
|
||||||
destPrefix: string; // must be a cidr
|
|
||||||
disableIcmp?: boolean;
|
|
||||||
rewriteTo?: string; // must be a cidr
|
|
||||||
portRange?: {
|
|
||||||
min: number;
|
|
||||||
max: number;
|
|
||||||
protocol: "tcp" | "udp";
|
|
||||||
}[];
|
|
||||||
};
|
|
||||||
|
|
||||||
export function generateSubnetProxyTargetV2(
|
|
||||||
siteResource: SiteResource,
|
|
||||||
clients: {
|
|
||||||
clientId: number;
|
|
||||||
pubKey: string | null;
|
|
||||||
subnet: string | null;
|
|
||||||
}[]
|
|
||||||
): SubnetProxyTargetV2 | undefined {
|
|
||||||
if (clients.length === 0) {
|
|
||||||
logger.debug(
|
|
||||||
`No clients have access to site resource ${siteResource.siteResourceId}, skipping target generation.`
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let target: SubnetProxyTargetV2 | null = null;
|
|
||||||
|
|
||||||
const portRange = [
|
|
||||||
...parsePortRangeString(siteResource.tcpPortRangeString, "tcp"),
|
|
||||||
...parsePortRangeString(siteResource.udpPortRangeString, "udp")
|
|
||||||
];
|
|
||||||
const disableIcmp = siteResource.disableIcmp ?? false;
|
|
||||||
|
|
||||||
if (siteResource.mode == "host") {
|
|
||||||
let destination = siteResource.destination;
|
|
||||||
// check if this is a valid ip
|
|
||||||
const ipSchema = z.union([z.ipv4(), z.ipv6()]);
|
|
||||||
if (ipSchema.safeParse(destination).success) {
|
|
||||||
destination = `${destination}/32`;
|
|
||||||
|
|
||||||
target = {
|
|
||||||
sourcePrefixes: [],
|
|
||||||
destPrefix: destination,
|
|
||||||
portRange,
|
|
||||||
disableIcmp
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
if (siteResource.alias && siteResource.aliasAddress) {
|
|
||||||
// also push a match for the alias address
|
|
||||||
target = {
|
|
||||||
sourcePrefixes: [],
|
|
||||||
destPrefix: `${siteResource.aliasAddress}/32`,
|
|
||||||
rewriteTo: destination,
|
|
||||||
portRange,
|
|
||||||
disableIcmp
|
|
||||||
};
|
|
||||||
}
|
|
||||||
} else if (siteResource.mode == "cidr") {
|
|
||||||
target = {
|
|
||||||
sourcePrefixes: [],
|
|
||||||
destPrefix: siteResource.destination,
|
|
||||||
portRange,
|
|
||||||
disableIcmp
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!target) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const clientSite of clients) {
|
|
||||||
if (!clientSite.subnet) {
|
|
||||||
logger.debug(
|
|
||||||
`Client ${clientSite.clientId} has no subnet, skipping for site resource ${siteResource.siteResourceId}.`
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
const clientPrefix = `${clientSite.subnet.split("/")[0]}/32`;
|
|
||||||
|
|
||||||
// add client prefix to source prefixes
|
|
||||||
target.sourcePrefixes.push(clientPrefix);
|
|
||||||
}
|
|
||||||
|
|
||||||
// print a nice representation of the targets
|
|
||||||
// logger.debug(
|
|
||||||
// `Generated subnet proxy targets for: ${JSON.stringify(targets, null, 2)}`
|
|
||||||
// );
|
|
||||||
|
|
||||||
return target;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Converts a SubnetProxyTargetV2 to an array of SubnetProxyTarget (v1)
|
|
||||||
* by expanding each source prefix into its own target entry.
|
|
||||||
* @param targetV2 - The v2 target to convert
|
|
||||||
* @returns Array of v1 SubnetProxyTarget objects
|
|
||||||
*/
|
|
||||||
export function convertSubnetProxyTargetsV2ToV1(
|
|
||||||
targetsV2: SubnetProxyTargetV2[]
|
|
||||||
): SubnetProxyTarget[] {
|
|
||||||
return targetsV2.flatMap((targetV2) =>
|
|
||||||
targetV2.sourcePrefixes.map((sourcePrefix) => ({
|
|
||||||
sourcePrefix,
|
|
||||||
destPrefix: targetV2.destPrefix,
|
|
||||||
...(targetV2.disableIcmp !== undefined && {
|
|
||||||
disableIcmp: targetV2.disableIcmp
|
|
||||||
}),
|
|
||||||
...(targetV2.rewriteTo !== undefined && {
|
|
||||||
rewriteTo: targetV2.rewriteTo
|
|
||||||
}),
|
|
||||||
...(targetV2.portRange !== undefined && {
|
|
||||||
portRange: targetV2.portRange
|
|
||||||
})
|
|
||||||
}))
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// Custom schema for validating port range strings
|
// Custom schema for validating port range strings
|
||||||
// Format: "80,443,8000-9000" or "*" for all ports, or empty string
|
// Format: "80,443,8000-9000" or "*" for all ports, or empty string
|
||||||
export const portRangeStringSchema = z
|
export const portRangeStringSchema = z
|
||||||
|
|||||||
@@ -302,8 +302,8 @@ export const configSchema = z
|
|||||||
.optional()
|
.optional()
|
||||||
.default({
|
.default({
|
||||||
block_size: 24,
|
block_size: 24,
|
||||||
subnet_group: "100.90.128.0/20",
|
subnet_group: "100.90.128.0/24",
|
||||||
utility_subnet_group: "100.96.128.0/20"
|
utility_subnet_group: "100.96.128.0/24"
|
||||||
}),
|
}),
|
||||||
rate_limits: z
|
rate_limits: z
|
||||||
.object({
|
.object({
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ import logger from "@server/logger";
|
|||||||
import {
|
import {
|
||||||
generateAliasConfig,
|
generateAliasConfig,
|
||||||
generateRemoteSubnets,
|
generateRemoteSubnets,
|
||||||
generateSubnetProxyTargetV2,
|
generateSubnetProxyTargets,
|
||||||
parseEndpoint,
|
parseEndpoint,
|
||||||
formatEndpoint
|
formatEndpoint
|
||||||
} from "@server/lib/ip";
|
} from "@server/lib/ip";
|
||||||
@@ -660,16 +660,19 @@ async function handleSubnetProxyTargetUpdates(
|
|||||||
);
|
);
|
||||||
|
|
||||||
if (addedClients.length > 0) {
|
if (addedClients.length > 0) {
|
||||||
const targetToAdd = generateSubnetProxyTargetV2(
|
const targetsToAdd = generateSubnetProxyTargets(
|
||||||
siteResource,
|
siteResource,
|
||||||
addedClients
|
addedClients
|
||||||
);
|
);
|
||||||
|
|
||||||
if (targetToAdd) {
|
if (targetsToAdd.length > 0) {
|
||||||
|
logger.info(
|
||||||
|
`Adding ${targetsToAdd.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}`
|
||||||
|
);
|
||||||
proxyJobs.push(
|
proxyJobs.push(
|
||||||
addSubnetProxyTargets(
|
addSubnetProxyTargets(
|
||||||
newt.newtId,
|
newt.newtId,
|
||||||
[targetToAdd],
|
targetsToAdd,
|
||||||
newt.version
|
newt.version
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
@@ -697,16 +700,19 @@ async function handleSubnetProxyTargetUpdates(
|
|||||||
);
|
);
|
||||||
|
|
||||||
if (removedClients.length > 0) {
|
if (removedClients.length > 0) {
|
||||||
const targetToRemove = generateSubnetProxyTargetV2(
|
const targetsToRemove = generateSubnetProxyTargets(
|
||||||
siteResource,
|
siteResource,
|
||||||
removedClients
|
removedClients
|
||||||
);
|
);
|
||||||
|
|
||||||
if (targetToRemove) {
|
if (targetsToRemove.length > 0) {
|
||||||
|
logger.info(
|
||||||
|
`Removing ${targetsToRemove.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}`
|
||||||
|
);
|
||||||
proxyJobs.push(
|
proxyJobs.push(
|
||||||
removeSubnetProxyTargets(
|
removeSubnetProxyTargets(
|
||||||
newt.newtId,
|
newt.newtId,
|
||||||
[targetToRemove],
|
targetsToRemove,
|
||||||
newt.version
|
newt.version
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
@@ -1163,7 +1169,7 @@ async function handleMessagesForClientResources(
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (const resource of resources) {
|
for (const resource of resources) {
|
||||||
const target = generateSubnetProxyTargetV2(resource, [
|
const targets = generateSubnetProxyTargets(resource, [
|
||||||
{
|
{
|
||||||
clientId: client.clientId,
|
clientId: client.clientId,
|
||||||
pubKey: client.pubKey,
|
pubKey: client.pubKey,
|
||||||
@@ -1171,11 +1177,11 @@ async function handleMessagesForClientResources(
|
|||||||
}
|
}
|
||||||
]);
|
]);
|
||||||
|
|
||||||
if (target) {
|
if (targets.length > 0) {
|
||||||
proxyJobs.push(
|
proxyJobs.push(
|
||||||
addSubnetProxyTargets(
|
addSubnetProxyTargets(
|
||||||
newt.newtId,
|
newt.newtId,
|
||||||
[target],
|
targets,
|
||||||
newt.version
|
newt.version
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
@@ -1240,7 +1246,7 @@ async function handleMessagesForClientResources(
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (const resource of resources) {
|
for (const resource of resources) {
|
||||||
const target = generateSubnetProxyTargetV2(resource, [
|
const targets = generateSubnetProxyTargets(resource, [
|
||||||
{
|
{
|
||||||
clientId: client.clientId,
|
clientId: client.clientId,
|
||||||
pubKey: client.pubKey,
|
pubKey: client.pubKey,
|
||||||
@@ -1248,11 +1254,11 @@ async function handleMessagesForClientResources(
|
|||||||
}
|
}
|
||||||
]);
|
]);
|
||||||
|
|
||||||
if (target) {
|
if (targets.length > 0) {
|
||||||
proxyJobs.push(
|
proxyJobs.push(
|
||||||
removeSubnetProxyTargets(
|
removeSubnetProxyTargets(
|
||||||
newt.newtId,
|
newt.newtId,
|
||||||
[target],
|
targets,
|
||||||
newt.version
|
newt.version
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -1,22 +0,0 @@
|
|||||||
/**
|
|
||||||
* Returns a cached plaintext token from Redis if one exists and decrypts
|
|
||||||
* cleanly, otherwise calls `createSession` to mint a fresh token, stores the
|
|
||||||
* encrypted value in Redis with the given TTL, and returns it.
|
|
||||||
*
|
|
||||||
* Failures at the Redis layer are non-fatal – the function always falls
|
|
||||||
* through to session creation so the caller is never blocked by a Redis outage.
|
|
||||||
*
|
|
||||||
* @param cacheKey Unique Redis key, e.g. `"newt:token_cache:abc123"`
|
|
||||||
* @param secret Server secret used for AES encryption/decryption
|
|
||||||
* @param ttlSeconds Cache TTL in seconds (should match session expiry)
|
|
||||||
* @param createSession Factory that mints a new session and returns its raw token
|
|
||||||
*/
|
|
||||||
export async function getOrCreateCachedToken(
|
|
||||||
cacheKey: string,
|
|
||||||
secret: string,
|
|
||||||
ttlSeconds: number,
|
|
||||||
createSession: () => Promise<string>
|
|
||||||
): Promise<string> {
|
|
||||||
const token = await createSession();
|
|
||||||
return token;
|
|
||||||
}
|
|
||||||
@@ -15,10 +15,8 @@ import { rateLimitService } from "#private/lib/rateLimit";
|
|||||||
import { cleanup as wsCleanup } from "#private/routers/ws";
|
import { cleanup as wsCleanup } from "#private/routers/ws";
|
||||||
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
||||||
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
||||||
import { stopPingAccumulator } from "@server/routers/newt/pingAccumulator";
|
|
||||||
|
|
||||||
async function cleanup() {
|
async function cleanup() {
|
||||||
await stopPingAccumulator();
|
|
||||||
await flushBandwidthToDb();
|
await flushBandwidthToDb();
|
||||||
await flushSiteBandwidthToDb();
|
await flushSiteBandwidthToDb();
|
||||||
await rateLimitService.cleanup();
|
await rateLimitService.cleanup();
|
||||||
|
|||||||
@@ -1,16 +1,3 @@
|
|||||||
/*
|
|
||||||
* This file is part of a proprietary work.
|
|
||||||
*
|
|
||||||
* Copyright (c) 2025 Fossorial, Inc.
|
|
||||||
* All rights reserved.
|
|
||||||
*
|
|
||||||
* This file is licensed under the Fossorial Commercial License.
|
|
||||||
* You may not use this file except in compliance with the License.
|
|
||||||
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
|
|
||||||
*
|
|
||||||
* This file is not licensed under the AGPLv3.
|
|
||||||
*/
|
|
||||||
|
|
||||||
import NodeCache from "node-cache";
|
import NodeCache from "node-cache";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { redisManager } from "@server/private/lib/redis";
|
import { redisManager } from "@server/private/lib/redis";
|
||||||
|
|||||||
@@ -57,10 +57,7 @@ export const privateConfigSchema = z.object({
|
|||||||
.object({
|
.object({
|
||||||
host: z.string(),
|
host: z.string(),
|
||||||
port: portSchema,
|
port: portSchema,
|
||||||
password: z
|
password: z.string().optional(),
|
||||||
.string()
|
|
||||||
.optional()
|
|
||||||
.transform(getEnvOrYaml("REDIS_PASSWORD")),
|
|
||||||
db: z.int().nonnegative().optional().default(0),
|
db: z.int().nonnegative().optional().default(0),
|
||||||
replicas: z
|
replicas: z
|
||||||
.array(
|
.array(
|
||||||
|
|||||||
@@ -1,77 +0,0 @@
|
|||||||
/*
|
|
||||||
* This file is part of a proprietary work.
|
|
||||||
*
|
|
||||||
* Copyright (c) 2025 Fossorial, Inc.
|
|
||||||
* All rights reserved.
|
|
||||||
*
|
|
||||||
* This file is licensed under the Fossorial Commercial License.
|
|
||||||
* You may not use this file except in compliance with the License.
|
|
||||||
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
|
|
||||||
*
|
|
||||||
* This file is not licensed under the AGPLv3.
|
|
||||||
*/
|
|
||||||
|
|
||||||
import redisManager from "#private/lib/redis";
|
|
||||||
import { encrypt, decrypt } from "@server/lib/crypto";
|
|
||||||
import logger from "@server/logger";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns a cached plaintext token from Redis if one exists and decrypts
|
|
||||||
* cleanly, otherwise calls `createSession` to mint a fresh token, stores the
|
|
||||||
* encrypted value in Redis with the given TTL, and returns it.
|
|
||||||
*
|
|
||||||
* Failures at the Redis layer are non-fatal – the function always falls
|
|
||||||
* through to session creation so the caller is never blocked by a Redis outage.
|
|
||||||
*
|
|
||||||
* @param cacheKey Unique Redis key, e.g. `"newt:token_cache:abc123"`
|
|
||||||
* @param secret Server secret used for AES encryption/decryption
|
|
||||||
* @param ttlSeconds Cache TTL in seconds (should match session expiry)
|
|
||||||
* @param createSession Factory that mints a new session and returns its raw token
|
|
||||||
*/
|
|
||||||
export async function getOrCreateCachedToken(
|
|
||||||
cacheKey: string,
|
|
||||||
secret: string,
|
|
||||||
ttlSeconds: number,
|
|
||||||
createSession: () => Promise<string>
|
|
||||||
): Promise<string> {
|
|
||||||
if (redisManager.isRedisEnabled()) {
|
|
||||||
try {
|
|
||||||
const cached = await redisManager.get(cacheKey);
|
|
||||||
if (cached) {
|
|
||||||
const token = decrypt(cached, secret);
|
|
||||||
if (token) {
|
|
||||||
logger.debug(`Token cache hit for key: ${cacheKey}`);
|
|
||||||
return token;
|
|
||||||
}
|
|
||||||
// Decryption produced an empty string – treat as a miss
|
|
||||||
logger.warn(
|
|
||||||
`Token cache decryption returned empty string for key: ${cacheKey}, treating as miss`
|
|
||||||
);
|
|
||||||
}
|
|
||||||
} catch (e) {
|
|
||||||
logger.warn(
|
|
||||||
`Token cache read/decrypt failed for key ${cacheKey}, falling through to session creation:`,
|
|
||||||
e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const token = await createSession();
|
|
||||||
|
|
||||||
if (redisManager.isRedisEnabled()) {
|
|
||||||
try {
|
|
||||||
const encrypted = encrypt(token, secret);
|
|
||||||
await redisManager.set(cacheKey, encrypted, ttlSeconds);
|
|
||||||
logger.debug(
|
|
||||||
`Token cached in Redis for key: ${cacheKey} (TTL ${ttlSeconds}s)`
|
|
||||||
);
|
|
||||||
} catch (e) {
|
|
||||||
logger.warn(
|
|
||||||
`Token cache write failed for key ${cacheKey} (session was still created):`,
|
|
||||||
e
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return token;
|
|
||||||
}
|
|
||||||
@@ -23,10 +23,8 @@ import { z } from "zod";
|
|||||||
import { fromError } from "zod-validation-error";
|
import { fromError } from "zod-validation-error";
|
||||||
import {
|
import {
|
||||||
createRemoteExitNodeSession,
|
createRemoteExitNodeSession,
|
||||||
validateRemoteExitNodeSessionToken,
|
validateRemoteExitNodeSessionToken
|
||||||
EXPIRES
|
|
||||||
} from "#private/auth/sessions/remoteExitNode";
|
} from "#private/auth/sessions/remoteExitNode";
|
||||||
import { getOrCreateCachedToken } from "@server/private/lib/tokenCache";
|
|
||||||
import { verifyPassword } from "@server/auth/password";
|
import { verifyPassword } from "@server/auth/password";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import config from "@server/lib/config";
|
import config from "@server/lib/config";
|
||||||
@@ -105,22 +103,13 @@ export async function getRemoteExitNodeToken(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return a cached token if one exists to prevent thundering herd on
|
const resToken = generateSessionToken();
|
||||||
// simultaneous restarts; falls back to creating a fresh session when
|
|
||||||
// Redis is unavailable or the cache has expired.
|
|
||||||
const resToken = await getOrCreateCachedToken(
|
|
||||||
`remote_exit_node:token_cache:${existingRemoteExitNode.remoteExitNodeId}`,
|
|
||||||
config.getRawConfig().server.secret!,
|
|
||||||
Math.floor(EXPIRES / 1000),
|
|
||||||
async () => {
|
|
||||||
const token = generateSessionToken();
|
|
||||||
await createRemoteExitNodeSession(
|
await createRemoteExitNodeSession(
|
||||||
token,
|
resToken,
|
||||||
existingRemoteExitNode.remoteExitNodeId
|
existingRemoteExitNode.remoteExitNodeId
|
||||||
);
|
);
|
||||||
return token;
|
|
||||||
}
|
// logger.debug(`Created RemoteExitNode token response: ${JSON.stringify(resToken)}`);
|
||||||
);
|
|
||||||
|
|
||||||
return response<{ token: string }>(res, {
|
return response<{ token: string }>(res, {
|
||||||
data: {
|
data: {
|
||||||
|
|||||||
@@ -24,7 +24,6 @@ import {
|
|||||||
sites,
|
sites,
|
||||||
userOrgs
|
userOrgs
|
||||||
} from "@server/db";
|
} from "@server/db";
|
||||||
import { logAccessAudit } from "#private/lib/logAccessAudit";
|
|
||||||
import { isLicensedOrSubscribed } from "#private/lib/isLicencedOrSubscribed";
|
import { isLicensedOrSubscribed } from "#private/lib/isLicencedOrSubscribed";
|
||||||
import { tierMatrix } from "@server/lib/billing/tierMatrix";
|
import { tierMatrix } from "@server/lib/billing/tierMatrix";
|
||||||
import response from "@server/lib/response";
|
import response from "@server/lib/response";
|
||||||
@@ -464,24 +463,6 @@ export async function signSshKey(
|
|||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
await logAccessAudit({
|
|
||||||
action: true,
|
|
||||||
type: "ssh",
|
|
||||||
orgId: orgId,
|
|
||||||
resourceId: resource.siteResourceId,
|
|
||||||
user: req.user
|
|
||||||
? { username: req.user.username ?? "", userId: req.user.userId }
|
|
||||||
: undefined,
|
|
||||||
metadata: {
|
|
||||||
resourceName: resource.name,
|
|
||||||
siteId: resource.siteId,
|
|
||||||
sshUsername: usernameToUse,
|
|
||||||
sshHost: sshHost
|
|
||||||
},
|
|
||||||
userAgent: req.headers["user-agent"],
|
|
||||||
requestIp: req.ip
|
|
||||||
});
|
|
||||||
|
|
||||||
return response<SignSshKeyResponse>(res, {
|
return response<SignSshKeyResponse>(res, {
|
||||||
data: {
|
data: {
|
||||||
certificate: cert.certificate,
|
certificate: cert.certificate,
|
||||||
|
|||||||
@@ -19,14 +19,17 @@ import { Socket } from "net";
|
|||||||
import {
|
import {
|
||||||
Newt,
|
Newt,
|
||||||
newts,
|
newts,
|
||||||
Olm,
|
NewtSession,
|
||||||
olms,
|
olms,
|
||||||
|
Olm,
|
||||||
|
OlmSession,
|
||||||
RemoteExitNode,
|
RemoteExitNode,
|
||||||
|
RemoteExitNodeSession,
|
||||||
remoteExitNodes,
|
remoteExitNodes,
|
||||||
|
sites
|
||||||
} from "@server/db";
|
} from "@server/db";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
import { db } from "@server/db";
|
import { db } from "@server/db";
|
||||||
import { recordPing } from "@server/routers/newt/pingAccumulator";
|
|
||||||
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
|
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
|
||||||
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
|
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
@@ -194,7 +197,11 @@ const connectedClients: Map<string, AuthenticatedWebSocket[]> = new Map();
|
|||||||
// Config version tracking map (local to this node, resets on server restart)
|
// Config version tracking map (local to this node, resets on server restart)
|
||||||
const clientConfigVersions: Map<string, number> = new Map();
|
const clientConfigVersions: Map<string, number> = new Map();
|
||||||
|
|
||||||
|
// Tracks the last Unix timestamp (seconds) at which a ping was flushed to the
|
||||||
|
// DB for a given siteId. Resets on server restart which is fine – the first
|
||||||
|
// ping after startup will always write, re-establishing the online state.
|
||||||
|
const lastPingDbWrite: Map<number, number> = new Map();
|
||||||
|
const PING_DB_WRITE_INTERVAL = 45; // seconds
|
||||||
|
|
||||||
// Recovery tracking
|
// Recovery tracking
|
||||||
let isRedisRecoveryInProgress = false;
|
let isRedisRecoveryInProgress = false;
|
||||||
@@ -846,16 +853,32 @@ const setupConnection = async (
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Handle WebSocket protocol-level pings from older newt clients that do
|
||||||
|
// not send application-level "newt/ping" messages. Update the site's
|
||||||
|
// online state and lastPing timestamp so the offline checker treats them
|
||||||
|
// the same as modern newt clients.
|
||||||
if (clientType === "newt") {
|
if (clientType === "newt") {
|
||||||
const newtClient = client as Newt;
|
const newtClient = client as Newt;
|
||||||
ws.on("ping", () => {
|
ws.on("ping", async () => {
|
||||||
if (!newtClient.siteId) return;
|
if (!newtClient.siteId) return;
|
||||||
// Record the ping in the accumulator instead of writing to the
|
const now = Math.floor(Date.now() / 1000);
|
||||||
// database on every WS ping frame. The accumulator flushes all
|
const lastWrite = lastPingDbWrite.get(newtClient.siteId) ?? 0;
|
||||||
// pending pings in a single batched UPDATE every ~10s, which
|
if (now - lastWrite < PING_DB_WRITE_INTERVAL) return;
|
||||||
// prevents connection pool exhaustion under load (especially
|
lastPingDbWrite.set(newtClient.siteId, now);
|
||||||
// with cross-region latency to the database).
|
try {
|
||||||
recordPing(newtClient.siteId);
|
await db
|
||||||
|
.update(sites)
|
||||||
|
.set({
|
||||||
|
online: true,
|
||||||
|
lastPing: now
|
||||||
|
})
|
||||||
|
.where(eq(sites.siteId, newtClient.siteId));
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
"Error updating newt site online state on WS ping",
|
||||||
|
{ error }
|
||||||
|
);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,54 +1,15 @@
|
|||||||
import { sendToClient } from "#dynamic/routers/ws";
|
import { sendToClient } from "#dynamic/routers/ws";
|
||||||
import { db, newts, olms } from "@server/db";
|
import { db, olms, Transaction } from "@server/db";
|
||||||
import {
|
|
||||||
Alias,
|
|
||||||
convertSubnetProxyTargetsV2ToV1,
|
|
||||||
SubnetProxyTarget,
|
|
||||||
SubnetProxyTargetV2
|
|
||||||
} from "@server/lib/ip";
|
|
||||||
import { canCompress } from "@server/lib/clientVersionChecks";
|
import { canCompress } from "@server/lib/clientVersionChecks";
|
||||||
|
import { Alias, SubnetProxyTarget } from "@server/lib/ip";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
import semver from "semver";
|
|
||||||
|
|
||||||
const NEWT_V2_TARGETS_VERSION = ">=1.10.3";
|
|
||||||
|
|
||||||
export async function convertTargetsIfNessicary(
|
|
||||||
newtId: string,
|
|
||||||
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[]
|
|
||||||
) {
|
|
||||||
// get the newt
|
|
||||||
const [newt] = await db
|
|
||||||
.select()
|
|
||||||
.from(newts)
|
|
||||||
.where(eq(newts.newtId, newtId));
|
|
||||||
if (!newt) {
|
|
||||||
throw new Error(`No newt found for id: ${newtId}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
// check the semver
|
|
||||||
if (
|
|
||||||
newt.version &&
|
|
||||||
!semver.satisfies(newt.version, NEWT_V2_TARGETS_VERSION)
|
|
||||||
) {
|
|
||||||
logger.debug(
|
|
||||||
`addTargets Newt version ${newt.version} does not support targets v2 falling back`
|
|
||||||
);
|
|
||||||
targets = convertSubnetProxyTargetsV2ToV1(
|
|
||||||
targets as SubnetProxyTargetV2[]
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
return targets;
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function addTargets(
|
export async function addTargets(
|
||||||
newtId: string,
|
newtId: string,
|
||||||
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[],
|
targets: SubnetProxyTarget[],
|
||||||
version?: string | null
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
targets = await convertTargetsIfNessicary(newtId, targets);
|
|
||||||
|
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
newtId,
|
newtId,
|
||||||
{
|
{
|
||||||
@@ -61,11 +22,9 @@ export async function addTargets(
|
|||||||
|
|
||||||
export async function removeTargets(
|
export async function removeTargets(
|
||||||
newtId: string,
|
newtId: string,
|
||||||
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[],
|
targets: SubnetProxyTarget[],
|
||||||
version?: string | null
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
targets = await convertTargetsIfNessicary(newtId, targets);
|
|
||||||
|
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
newtId,
|
newtId,
|
||||||
{
|
{
|
||||||
@@ -79,39 +38,11 @@ export async function removeTargets(
|
|||||||
export async function updateTargets(
|
export async function updateTargets(
|
||||||
newtId: string,
|
newtId: string,
|
||||||
targets: {
|
targets: {
|
||||||
oldTargets: SubnetProxyTarget[] | SubnetProxyTargetV2[];
|
oldTargets: SubnetProxyTarget[];
|
||||||
newTargets: SubnetProxyTarget[] | SubnetProxyTargetV2[];
|
newTargets: SubnetProxyTarget[];
|
||||||
},
|
},
|
||||||
version?: string | null
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
// get the newt
|
|
||||||
const [newt] = await db
|
|
||||||
.select()
|
|
||||||
.from(newts)
|
|
||||||
.where(eq(newts.newtId, newtId));
|
|
||||||
if (!newt) {
|
|
||||||
logger.error(`addTargetsL No newt found for id: ${newtId}`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// check the semver
|
|
||||||
if (
|
|
||||||
newt.version &&
|
|
||||||
!semver.satisfies(newt.version, NEWT_V2_TARGETS_VERSION)
|
|
||||||
) {
|
|
||||||
logger.debug(
|
|
||||||
`addTargets Newt version ${newt.version} does not support targets v2 falling back`
|
|
||||||
);
|
|
||||||
targets = {
|
|
||||||
oldTargets: convertSubnetProxyTargetsV2ToV1(
|
|
||||||
targets.oldTargets as SubnetProxyTargetV2[]
|
|
||||||
),
|
|
||||||
newTargets: convertSubnetProxyTargetsV2ToV1(
|
|
||||||
targets.newTargets as SubnetProxyTargetV2[]
|
|
||||||
)
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
newtId,
|
newtId,
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -16,8 +16,8 @@ import { eq, and } from "drizzle-orm";
|
|||||||
import config from "@server/lib/config";
|
import config from "@server/lib/config";
|
||||||
import {
|
import {
|
||||||
formatEndpoint,
|
formatEndpoint,
|
||||||
generateSubnetProxyTargetV2,
|
generateSubnetProxyTargets,
|
||||||
SubnetProxyTargetV2
|
SubnetProxyTarget
|
||||||
} from "@server/lib/ip";
|
} from "@server/lib/ip";
|
||||||
|
|
||||||
export async function buildClientConfigurationForNewtClient(
|
export async function buildClientConfigurationForNewtClient(
|
||||||
@@ -143,7 +143,7 @@ export async function buildClientConfigurationForNewtClient(
|
|||||||
.from(siteResources)
|
.from(siteResources)
|
||||||
.where(eq(siteResources.siteId, siteId));
|
.where(eq(siteResources.siteId, siteId));
|
||||||
|
|
||||||
const targetsToSend: SubnetProxyTargetV2[] = [];
|
const targetsToSend: SubnetProxyTarget[] = [];
|
||||||
|
|
||||||
for (const resource of allSiteResources) {
|
for (const resource of allSiteResources) {
|
||||||
// Get clients associated with this specific resource
|
// Get clients associated with this specific resource
|
||||||
@@ -168,14 +168,12 @@ export async function buildClientConfigurationForNewtClient(
|
|||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
const resourceTarget = generateSubnetProxyTargetV2(
|
const resourceTargets = generateSubnetProxyTargets(
|
||||||
resource,
|
resource,
|
||||||
resourceClients
|
resourceClients
|
||||||
);
|
);
|
||||||
|
|
||||||
if (resourceTarget) {
|
targetsToSend.push(...resourceTargets);
|
||||||
targetsToSend.push(resourceTarget);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -1,8 +1,6 @@
|
|||||||
import { generateSessionToken } from "@server/auth/sessions/app";
|
import { generateSessionToken } from "@server/auth/sessions/app";
|
||||||
import { db, newtSessions } from "@server/db";
|
import { db } from "@server/db";
|
||||||
import { newts } from "@server/db";
|
import { newts } from "@server/db";
|
||||||
import { getOrCreateCachedToken } from "#dynamic/lib/tokenCache";
|
|
||||||
import { EXPIRES } from "@server/auth/sessions/newt";
|
|
||||||
import HttpCode from "@server/types/HttpCode";
|
import HttpCode from "@server/types/HttpCode";
|
||||||
import response from "@server/lib/response";
|
import response from "@server/lib/response";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
@@ -94,19 +92,8 @@ export async function getNewtToken(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return a cached token if one exists to prevent thundering herd on
|
const resToken = generateSessionToken();
|
||||||
// simultaneous restarts; falls back to creating a fresh session when
|
await createNewtSession(resToken, existingNewt.newtId);
|
||||||
// Redis is unavailable or the cache has expired.
|
|
||||||
const resToken = await getOrCreateCachedToken(
|
|
||||||
`newt:token_cache:${existingNewt.newtId}`,
|
|
||||||
config.getRawConfig().server.secret!,
|
|
||||||
Math.floor(EXPIRES / 1000),
|
|
||||||
async () => {
|
|
||||||
const token = generateSessionToken();
|
|
||||||
await createNewtSession(token, existingNewt.newtId);
|
|
||||||
return token;
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
return response<{ token: string; serverVersion: string }>(res, {
|
return response<{ token: string; serverVersion: string }>(res, {
|
||||||
data: {
|
data: {
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import { db, ExitNode, exitNodes, Newt, sites } from "@server/db";
|
|||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
import { sendToExitNode } from "#dynamic/lib/exitNodes";
|
import { sendToExitNode } from "#dynamic/lib/exitNodes";
|
||||||
import { buildClientConfigurationForNewtClient } from "./buildConfiguration";
|
import { buildClientConfigurationForNewtClient } from "./buildConfiguration";
|
||||||
import { convertTargetsIfNessicary } from "../client/targets";
|
|
||||||
import { canCompress } from "@server/lib/clientVersionChecks";
|
import { canCompress } from "@server/lib/clientVersionChecks";
|
||||||
|
|
||||||
const inputSchema = z.object({
|
const inputSchema = z.object({
|
||||||
@@ -128,15 +127,13 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
|
|||||||
exitNode
|
exitNode
|
||||||
);
|
);
|
||||||
|
|
||||||
const targetsToSend = await convertTargetsIfNessicary(newt.newtId, targets);
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
message: {
|
message: {
|
||||||
type: "newt/wg/receive-config",
|
type: "newt/wg/receive-config",
|
||||||
data: {
|
data: {
|
||||||
ipAddress: site.address,
|
ipAddress: site.address,
|
||||||
peers,
|
peers,
|
||||||
targets: targetsToSend
|
targets
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
options: {
|
options: {
|
||||||
|
|||||||
@@ -5,7 +5,6 @@ import { Newt } from "@server/db";
|
|||||||
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { sendNewtSyncMessage } from "./sync";
|
import { sendNewtSyncMessage } from "./sync";
|
||||||
import { recordPing } from "./pingAccumulator";
|
|
||||||
|
|
||||||
// Track if the offline checker interval is running
|
// Track if the offline checker interval is running
|
||||||
let offlineCheckerInterval: NodeJS.Timeout | null = null;
|
let offlineCheckerInterval: NodeJS.Timeout | null = null;
|
||||||
@@ -115,12 +114,18 @@ export const handleNewtPingMessage: MessageHandler = async (context) => {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Record the ping in memory; it will be flushed to the database
|
try {
|
||||||
// periodically by the ping accumulator (every ~10s) in a single
|
// Mark the site as online and record the ping timestamp.
|
||||||
// batched UPDATE instead of one query per ping. This prevents
|
await db
|
||||||
// connection pool exhaustion under load, especially with
|
.update(sites)
|
||||||
// cross-region latency to the database.
|
.set({
|
||||||
recordPing(newt.siteId);
|
online: true,
|
||||||
|
lastPing: Math.floor(Date.now() / 1000)
|
||||||
|
})
|
||||||
|
.where(eq(sites.siteId, newt.siteId));
|
||||||
|
} catch (error) {
|
||||||
|
logger.error("Error updating online state on newt ping", { error });
|
||||||
|
}
|
||||||
|
|
||||||
// Check config version and sync if stale.
|
// Check config version and sync if stale.
|
||||||
const configVersion = await getClientConfigVersion(newt.newtId);
|
const configVersion = await getClientConfigVersion(newt.newtId);
|
||||||
|
|||||||
@@ -1,382 +0,0 @@
|
|||||||
import { db } from "@server/db";
|
|
||||||
import { sites, clients, olms } from "@server/db";
|
|
||||||
import { eq, inArray } from "drizzle-orm";
|
|
||||||
import logger from "@server/logger";
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Ping Accumulator
|
|
||||||
*
|
|
||||||
* Instead of writing to the database on every single newt/olm ping (which
|
|
||||||
* causes pool exhaustion under load, especially with cross-region latency),
|
|
||||||
* we accumulate pings in memory and flush them to the database periodically
|
|
||||||
* in a single batch.
|
|
||||||
*
|
|
||||||
* This is the same pattern used for bandwidth flushing in
|
|
||||||
* receiveBandwidth.ts and handleReceiveBandwidthMessage.ts.
|
|
||||||
*
|
|
||||||
* Supports two kinds of pings:
|
|
||||||
* - **Site pings** (from newts): update `sites.online` and `sites.lastPing`
|
|
||||||
* - **Client pings** (from OLMs): update `clients.online`, `clients.lastPing`,
|
|
||||||
* `clients.archived`, and optionally reset `olms.archived`
|
|
||||||
*/
|
|
||||||
|
|
||||||
const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds
|
|
||||||
const MAX_RETRIES = 2;
|
|
||||||
const BASE_DELAY_MS = 50;
|
|
||||||
|
|
||||||
// ── Site (newt) pings ──────────────────────────────────────────────────
|
|
||||||
// Map of siteId -> latest ping timestamp (unix seconds)
|
|
||||||
const pendingSitePings: Map<number, number> = new Map();
|
|
||||||
|
|
||||||
// ── Client (OLM) pings ────────────────────────────────────────────────
|
|
||||||
// Map of clientId -> latest ping timestamp (unix seconds)
|
|
||||||
const pendingClientPings: Map<number, number> = new Map();
|
|
||||||
// Set of olmIds whose `archived` flag should be reset to false
|
|
||||||
const pendingOlmArchiveResets: Set<string> = new Set();
|
|
||||||
|
|
||||||
let flushTimer: NodeJS.Timeout | null = null;
|
|
||||||
|
|
||||||
// ── Public API ─────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Record a ping for a newt site. This does NOT write to the database
|
|
||||||
* immediately. Instead it stores the latest ping timestamp in memory,
|
|
||||||
* to be flushed periodically by the background timer.
|
|
||||||
*/
|
|
||||||
export function recordSitePing(siteId: number): void {
|
|
||||||
const now = Math.floor(Date.now() / 1000);
|
|
||||||
pendingSitePings.set(siteId, now);
|
|
||||||
}
|
|
||||||
|
|
||||||
/** @deprecated Use `recordSitePing` instead. Alias kept for existing call-sites. */
|
|
||||||
export const recordPing = recordSitePing;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Record a ping for an OLM client. Batches the `clients` table update
|
|
||||||
* (`online`, `lastPing`, `archived`) and, when `olmArchived` is true,
|
|
||||||
* also queues an `olms` table update to clear the archived flag.
|
|
||||||
*/
|
|
||||||
export function recordClientPing(
|
|
||||||
clientId: number,
|
|
||||||
olmId: string,
|
|
||||||
olmArchived: boolean
|
|
||||||
): void {
|
|
||||||
const now = Math.floor(Date.now() / 1000);
|
|
||||||
pendingClientPings.set(clientId, now);
|
|
||||||
if (olmArchived) {
|
|
||||||
pendingOlmArchiveResets.add(olmId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Flush Logic ────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Flush all accumulated site pings to the database.
|
|
||||||
*/
|
|
||||||
async function flushSitePingsToDb(): Promise<void> {
|
|
||||||
if (pendingSitePings.size === 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Snapshot and clear so new pings arriving during the flush go into a
|
|
||||||
// fresh map for the next cycle.
|
|
||||||
const pingsToFlush = new Map(pendingSitePings);
|
|
||||||
pendingSitePings.clear();
|
|
||||||
|
|
||||||
// Sort by siteId for consistent lock ordering (prevents deadlocks)
|
|
||||||
const sortedEntries = Array.from(pingsToFlush.entries()).sort(
|
|
||||||
([a], [b]) => a - b
|
|
||||||
);
|
|
||||||
|
|
||||||
const BATCH_SIZE = 50;
|
|
||||||
for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
|
|
||||||
const batch = sortedEntries.slice(i, i + BATCH_SIZE);
|
|
||||||
|
|
||||||
try {
|
|
||||||
await withRetry(async () => {
|
|
||||||
// Group by timestamp for efficient bulk updates
|
|
||||||
const byTimestamp = new Map<number, number[]>();
|
|
||||||
for (const [siteId, timestamp] of batch) {
|
|
||||||
const group = byTimestamp.get(timestamp) || [];
|
|
||||||
group.push(siteId);
|
|
||||||
byTimestamp.set(timestamp, group);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (byTimestamp.size === 1) {
|
|
||||||
const [timestamp, siteIds] = Array.from(
|
|
||||||
byTimestamp.entries()
|
|
||||||
)[0];
|
|
||||||
await db
|
|
||||||
.update(sites)
|
|
||||||
.set({
|
|
||||||
online: true,
|
|
||||||
lastPing: timestamp
|
|
||||||
})
|
|
||||||
.where(inArray(sites.siteId, siteIds));
|
|
||||||
} else {
|
|
||||||
await db.transaction(async (tx) => {
|
|
||||||
for (const [timestamp, siteIds] of byTimestamp) {
|
|
||||||
await tx
|
|
||||||
.update(sites)
|
|
||||||
.set({
|
|
||||||
online: true,
|
|
||||||
lastPing: timestamp
|
|
||||||
})
|
|
||||||
.where(inArray(sites.siteId, siteIds));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}, "flushSitePingsToDb");
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
`Failed to flush site ping batch (${batch.length} sites), re-queuing for next cycle`,
|
|
||||||
{ error }
|
|
||||||
);
|
|
||||||
for (const [siteId, timestamp] of batch) {
|
|
||||||
const existing = pendingSitePings.get(siteId);
|
|
||||||
if (!existing || existing < timestamp) {
|
|
||||||
pendingSitePings.set(siteId, timestamp);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Flush all accumulated client (OLM) pings to the database.
|
|
||||||
*/
|
|
||||||
async function flushClientPingsToDb(): Promise<void> {
|
|
||||||
if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Snapshot and clear
|
|
||||||
const pingsToFlush = new Map(pendingClientPings);
|
|
||||||
pendingClientPings.clear();
|
|
||||||
|
|
||||||
const olmResetsToFlush = new Set(pendingOlmArchiveResets);
|
|
||||||
pendingOlmArchiveResets.clear();
|
|
||||||
|
|
||||||
// ── Flush client pings ─────────────────────────────────────────────
|
|
||||||
if (pingsToFlush.size > 0) {
|
|
||||||
const sortedEntries = Array.from(pingsToFlush.entries()).sort(
|
|
||||||
([a], [b]) => a - b
|
|
||||||
);
|
|
||||||
|
|
||||||
const BATCH_SIZE = 50;
|
|
||||||
for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
|
|
||||||
const batch = sortedEntries.slice(i, i + BATCH_SIZE);
|
|
||||||
|
|
||||||
try {
|
|
||||||
await withRetry(async () => {
|
|
||||||
const byTimestamp = new Map<number, number[]>();
|
|
||||||
for (const [clientId, timestamp] of batch) {
|
|
||||||
const group = byTimestamp.get(timestamp) || [];
|
|
||||||
group.push(clientId);
|
|
||||||
byTimestamp.set(timestamp, group);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (byTimestamp.size === 1) {
|
|
||||||
const [timestamp, clientIds] = Array.from(
|
|
||||||
byTimestamp.entries()
|
|
||||||
)[0];
|
|
||||||
await db
|
|
||||||
.update(clients)
|
|
||||||
.set({
|
|
||||||
lastPing: timestamp,
|
|
||||||
online: true,
|
|
||||||
archived: false
|
|
||||||
})
|
|
||||||
.where(inArray(clients.clientId, clientIds));
|
|
||||||
} else {
|
|
||||||
await db.transaction(async (tx) => {
|
|
||||||
for (const [timestamp, clientIds] of byTimestamp) {
|
|
||||||
await tx
|
|
||||||
.update(clients)
|
|
||||||
.set({
|
|
||||||
lastPing: timestamp,
|
|
||||||
online: true,
|
|
||||||
archived: false
|
|
||||||
})
|
|
||||||
.where(
|
|
||||||
inArray(clients.clientId, clientIds)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}, "flushClientPingsToDb");
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
`Failed to flush client ping batch (${batch.length} clients), re-queuing for next cycle`,
|
|
||||||
{ error }
|
|
||||||
);
|
|
||||||
for (const [clientId, timestamp] of batch) {
|
|
||||||
const existing = pendingClientPings.get(clientId);
|
|
||||||
if (!existing || existing < timestamp) {
|
|
||||||
pendingClientPings.set(clientId, timestamp);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Flush OLM archive resets ───────────────────────────────────────
|
|
||||||
if (olmResetsToFlush.size > 0) {
|
|
||||||
const olmIds = Array.from(olmResetsToFlush).sort();
|
|
||||||
|
|
||||||
const BATCH_SIZE = 50;
|
|
||||||
for (let i = 0; i < olmIds.length; i += BATCH_SIZE) {
|
|
||||||
const batch = olmIds.slice(i, i + BATCH_SIZE);
|
|
||||||
|
|
||||||
try {
|
|
||||||
await withRetry(async () => {
|
|
||||||
await db
|
|
||||||
.update(olms)
|
|
||||||
.set({ archived: false })
|
|
||||||
.where(inArray(olms.olmId, batch));
|
|
||||||
}, "flushOlmArchiveResets");
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
`Failed to flush OLM archive reset batch (${batch.length} olms), re-queuing for next cycle`,
|
|
||||||
{ error }
|
|
||||||
);
|
|
||||||
for (const olmId of batch) {
|
|
||||||
pendingOlmArchiveResets.add(olmId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Flush everything — called by the interval timer and during shutdown.
|
|
||||||
*/
|
|
||||||
export async function flushPingsToDb(): Promise<void> {
|
|
||||||
await flushSitePingsToDb();
|
|
||||||
await flushClientPingsToDb();
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Retry / Error Helpers ──────────────────────────────────────────────
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Simple retry wrapper with exponential backoff for transient errors
|
|
||||||
* (connection timeouts, unexpected disconnects).
|
|
||||||
*/
|
|
||||||
async function withRetry<T>(
|
|
||||||
operation: () => Promise<T>,
|
|
||||||
context: string
|
|
||||||
): Promise<T> {
|
|
||||||
let attempt = 0;
|
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
return await operation();
|
|
||||||
} catch (error: any) {
|
|
||||||
if (isTransientError(error) && attempt < MAX_RETRIES) {
|
|
||||||
attempt++;
|
|
||||||
const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
|
|
||||||
const jitter = Math.random() * baseDelay;
|
|
||||||
const delay = baseDelay + jitter;
|
|
||||||
logger.warn(
|
|
||||||
`Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`
|
|
||||||
);
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Detect transient connection errors that are safe to retry.
|
|
||||||
*/
|
|
||||||
function isTransientError(error: any): boolean {
|
|
||||||
if (!error) return false;
|
|
||||||
|
|
||||||
const message = (error.message || "").toLowerCase();
|
|
||||||
const causeMessage = (error.cause?.message || "").toLowerCase();
|
|
||||||
const code = error.code || "";
|
|
||||||
|
|
||||||
// Connection timeout / terminated
|
|
||||||
if (
|
|
||||||
message.includes("connection timeout") ||
|
|
||||||
message.includes("connection terminated") ||
|
|
||||||
message.includes("timeout exceeded when trying to connect") ||
|
|
||||||
causeMessage.includes("connection terminated unexpectedly") ||
|
|
||||||
causeMessage.includes("connection timeout")
|
|
||||||
) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// PostgreSQL deadlock
|
|
||||||
if (code === "40P01" || message.includes("deadlock")) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ECONNRESET, ECONNREFUSED, EPIPE
|
|
||||||
if (
|
|
||||||
code === "ECONNRESET" ||
|
|
||||||
code === "ECONNREFUSED" ||
|
|
||||||
code === "EPIPE" ||
|
|
||||||
code === "ETIMEDOUT"
|
|
||||||
) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ── Lifecycle ──────────────────────────────────────────────────────────
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Start the background flush timer. Call this once at server startup.
|
|
||||||
*/
|
|
||||||
export function startPingAccumulator(): void {
|
|
||||||
if (flushTimer) {
|
|
||||||
return; // Already running
|
|
||||||
}
|
|
||||||
|
|
||||||
flushTimer = setInterval(async () => {
|
|
||||||
try {
|
|
||||||
await flushPingsToDb();
|
|
||||||
} catch (error) {
|
|
||||||
logger.error("Unhandled error in ping accumulator flush", {
|
|
||||||
error
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}, FLUSH_INTERVAL_MS);
|
|
||||||
|
|
||||||
// Don't prevent the process from exiting
|
|
||||||
flushTimer.unref();
|
|
||||||
|
|
||||||
logger.info(
|
|
||||||
`Ping accumulator started (flush interval: ${FLUSH_INTERVAL_MS}ms)`
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Stop the background flush timer and perform a final flush.
|
|
||||||
* Call this during graceful shutdown.
|
|
||||||
*/
|
|
||||||
export async function stopPingAccumulator(): Promise<void> {
|
|
||||||
if (flushTimer) {
|
|
||||||
clearInterval(flushTimer);
|
|
||||||
flushTimer = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Final flush to persist any remaining pings
|
|
||||||
try {
|
|
||||||
await flushPingsToDb();
|
|
||||||
} catch (error) {
|
|
||||||
logger.error("Error during final ping accumulator flush", { error });
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.info("Ping accumulator stopped");
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get the number of pending (unflushed) pings. Useful for monitoring.
|
|
||||||
*/
|
|
||||||
export function getPendingPingCount(): number {
|
|
||||||
return pendingSitePings.size + pendingClientPings.size;
|
|
||||||
}
|
|
||||||
@@ -8,7 +8,7 @@ import {
|
|||||||
ExitNode,
|
ExitNode,
|
||||||
exitNodes,
|
exitNodes,
|
||||||
sites,
|
sites,
|
||||||
clientSitesAssociationsCache,
|
clientSitesAssociationsCache
|
||||||
} from "@server/db";
|
} from "@server/db";
|
||||||
import { olms } from "@server/db";
|
import { olms } from "@server/db";
|
||||||
import HttpCode from "@server/types/HttpCode";
|
import HttpCode from "@server/types/HttpCode";
|
||||||
@@ -20,10 +20,8 @@ import { z } from "zod";
|
|||||||
import { fromError } from "zod-validation-error";
|
import { fromError } from "zod-validation-error";
|
||||||
import {
|
import {
|
||||||
createOlmSession,
|
createOlmSession,
|
||||||
validateOlmSessionToken,
|
validateOlmSessionToken
|
||||||
EXPIRES
|
|
||||||
} from "@server/auth/sessions/olm";
|
} from "@server/auth/sessions/olm";
|
||||||
import { getOrCreateCachedToken } from "#dynamic/lib/tokenCache";
|
|
||||||
import { verifyPassword } from "@server/auth/password";
|
import { verifyPassword } from "@server/auth/password";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import config from "@server/lib/config";
|
import config from "@server/lib/config";
|
||||||
@@ -134,19 +132,8 @@ export async function getOlmToken(
|
|||||||
|
|
||||||
logger.debug("Creating new olm session token");
|
logger.debug("Creating new olm session token");
|
||||||
|
|
||||||
// Return a cached token if one exists to prevent thundering herd on
|
const resToken = generateSessionToken();
|
||||||
// simultaneous restarts; falls back to creating a fresh session when
|
await createOlmSession(resToken, existingOlm.olmId);
|
||||||
// Redis is unavailable or the cache has expired.
|
|
||||||
const resToken = await getOrCreateCachedToken(
|
|
||||||
`olm:token_cache:${existingOlm.olmId}`,
|
|
||||||
config.getRawConfig().server.secret!,
|
|
||||||
Math.floor(EXPIRES / 1000),
|
|
||||||
async () => {
|
|
||||||
const token = generateSessionToken();
|
|
||||||
await createOlmSession(token, existingOlm.olmId);
|
|
||||||
return token;
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
let clientIdToUse;
|
let clientIdToUse;
|
||||||
if (orgId) {
|
if (orgId) {
|
||||||
|
|||||||
@@ -3,7 +3,6 @@ import { db } from "@server/db";
|
|||||||
import { MessageHandler } from "@server/routers/ws";
|
import { MessageHandler } from "@server/routers/ws";
|
||||||
import { clients, olms, Olm } from "@server/db";
|
import { clients, olms, Olm } from "@server/db";
|
||||||
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
||||||
import { recordClientPing } from "@server/routers/newt/pingAccumulator";
|
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { validateSessionToken } from "@server/auth/sessions/app";
|
import { validateSessionToken } from "@server/auth/sessions/app";
|
||||||
import { checkOrgAccessPolicy } from "#dynamic/lib/checkOrgAccessPolicy";
|
import { checkOrgAccessPolicy } from "#dynamic/lib/checkOrgAccessPolicy";
|
||||||
@@ -202,12 +201,22 @@ export const handleOlmPingMessage: MessageHandler = async (context) => {
|
|||||||
await sendOlmSyncMessage(olm, client);
|
await sendOlmSyncMessage(olm, client);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Record the ping in memory; it will be flushed to the database
|
// Update the client's last ping timestamp
|
||||||
// periodically by the ping accumulator (every ~10s) in a single
|
await db
|
||||||
// batched UPDATE instead of one query per ping. This prevents
|
.update(clients)
|
||||||
// connection pool exhaustion under load, especially with
|
.set({
|
||||||
// cross-region latency to the database.
|
lastPing: Math.floor(Date.now() / 1000),
|
||||||
recordClientPing(olm.clientId, olm.olmId, !!olm.archived);
|
online: true,
|
||||||
|
archived: false
|
||||||
|
})
|
||||||
|
.where(eq(clients.clientId, olm.clientId));
|
||||||
|
|
||||||
|
if (olm.archived) {
|
||||||
|
await db
|
||||||
|
.update(olms)
|
||||||
|
.set({ archived: false })
|
||||||
|
.where(eq(olms.olmId, olm.olmId));
|
||||||
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error("Error handling ping message", { error });
|
logger.error("Error handling ping message", { error });
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -88,7 +88,7 @@ const createSiteResourceSchema = z
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
message:
|
message:
|
||||||
"Destination must be a valid IPV4 address or valid domain AND alias is required"
|
"Destination must be a valid IP address or valid domain AND alias is required"
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
.refine(
|
.refine(
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ import { updatePeerData, updateTargets } from "@server/routers/client/targets";
|
|||||||
import {
|
import {
|
||||||
generateAliasConfig,
|
generateAliasConfig,
|
||||||
generateRemoteSubnets,
|
generateRemoteSubnets,
|
||||||
generateSubnetProxyTargetV2,
|
generateSubnetProxyTargets,
|
||||||
isIpInCidr,
|
isIpInCidr,
|
||||||
portRangeStringSchema
|
portRangeStringSchema
|
||||||
} from "@server/lib/ip";
|
} from "@server/lib/ip";
|
||||||
@@ -608,18 +608,18 @@ export async function handleMessagingForUpdatedSiteResource(
|
|||||||
|
|
||||||
// Only update targets on newt if destination changed
|
// Only update targets on newt if destination changed
|
||||||
if (destinationChanged || portRangesChanged) {
|
if (destinationChanged || portRangesChanged) {
|
||||||
const oldTarget = generateSubnetProxyTargetV2(
|
const oldTargets = generateSubnetProxyTargets(
|
||||||
existingSiteResource,
|
existingSiteResource,
|
||||||
mergedAllClients
|
mergedAllClients
|
||||||
);
|
);
|
||||||
const newTarget = generateSubnetProxyTargetV2(
|
const newTargets = generateSubnetProxyTargets(
|
||||||
updatedSiteResource,
|
updatedSiteResource,
|
||||||
mergedAllClients
|
mergedAllClients
|
||||||
);
|
);
|
||||||
|
|
||||||
await updateTargets(newt.newtId, {
|
await updateTargets(newt.newtId, {
|
||||||
oldTargets: oldTarget ? [oldTarget] : [],
|
oldTargets: oldTargets,
|
||||||
newTargets: newTarget ? [newTarget] : []
|
newTargets: newTargets
|
||||||
}, newt.version);
|
}, newt.version);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -11,7 +11,6 @@ import {
|
|||||||
startNewtOfflineChecker,
|
startNewtOfflineChecker,
|
||||||
handleNewtDisconnectingMessage
|
handleNewtDisconnectingMessage
|
||||||
} from "../newt";
|
} from "../newt";
|
||||||
import { startPingAccumulator } from "../newt/pingAccumulator";
|
|
||||||
import {
|
import {
|
||||||
handleOlmRegisterMessage,
|
handleOlmRegisterMessage,
|
||||||
handleOlmRelayMessage,
|
handleOlmRelayMessage,
|
||||||
@@ -47,10 +46,6 @@ export const messageHandlers: Record<string, MessageHandler> = {
|
|||||||
"ws/round-trip/complete": handleRoundTripMessage
|
"ws/round-trip/complete": handleRoundTripMessage
|
||||||
};
|
};
|
||||||
|
|
||||||
// Start the ping accumulator for all builds — it batches per-site online/lastPing
|
|
||||||
// updates into periodic bulk writes, preventing connection pool exhaustion.
|
|
||||||
startPingAccumulator();
|
|
||||||
|
|
||||||
if (build != "saas") {
|
if (build != "saas") {
|
||||||
startOlmOfflineChecker(); // this is to handle the offline check for olms
|
startOlmOfflineChecker(); // this is to handle the offline check for olms
|
||||||
startNewtOfflineChecker(); // this is to handle the offline check for newts
|
startNewtOfflineChecker(); // this is to handle the offline check for newts
|
||||||
|
|||||||
@@ -6,7 +6,6 @@ import { Socket } from "net";
|
|||||||
import { Newt, newts, NewtSession, olms, Olm, OlmSession, sites } from "@server/db";
|
import { Newt, newts, NewtSession, olms, Olm, OlmSession, sites } from "@server/db";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
import { db } from "@server/db";
|
import { db } from "@server/db";
|
||||||
import { recordPing } from "@server/routers/newt/pingAccumulator";
|
|
||||||
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
|
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
|
||||||
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
|
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
|
||||||
import { messageHandlers } from "./messageHandlers";
|
import { messageHandlers } from "./messageHandlers";
|
||||||
@@ -387,14 +386,22 @@ const setupConnection = async (
|
|||||||
// the same as modern newt clients.
|
// the same as modern newt clients.
|
||||||
if (clientType === "newt") {
|
if (clientType === "newt") {
|
||||||
const newtClient = client as Newt;
|
const newtClient = client as Newt;
|
||||||
ws.on("ping", () => {
|
ws.on("ping", async () => {
|
||||||
if (!newtClient.siteId) return;
|
if (!newtClient.siteId) return;
|
||||||
// Record the ping in the accumulator instead of writing to the
|
try {
|
||||||
// database on every WS ping frame. The accumulator flushes all
|
await db
|
||||||
// pending pings in a single batched UPDATE every ~10s, which
|
.update(sites)
|
||||||
// prevents connection pool exhaustion under load (especially
|
.set({
|
||||||
// with cross-region latency to the database).
|
online: true,
|
||||||
recordPing(newtClient.siteId);
|
lastPing: Math.floor(Date.now() / 1000)
|
||||||
|
})
|
||||||
|
.where(eq(sites.siteId, newtClient.siteId));
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
"Error updating newt site online state on WS ping",
|
||||||
|
{ error }
|
||||||
|
);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -275,8 +275,6 @@ export default function Page() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
const disabled = !isPaidUser(tierMatrix.orgOidc);
|
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<>
|
<>
|
||||||
<div className="flex justify-between">
|
<div className="flex justify-between">
|
||||||
@@ -294,9 +292,6 @@ export default function Page() {
|
|||||||
</Button>
|
</Button>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<PaidFeaturesAlert tiers={tierMatrix.orgOidc} />
|
|
||||||
|
|
||||||
<fieldset disabled={disabled} className={disabled ? "opacity-50 pointer-events-none" : ""}>
|
|
||||||
<SettingsContainer>
|
<SettingsContainer>
|
||||||
<SettingsSection>
|
<SettingsSection>
|
||||||
<SettingsSectionHeader>
|
<SettingsSectionHeader>
|
||||||
@@ -817,10 +812,9 @@ export default function Page() {
|
|||||||
</Button>
|
</Button>
|
||||||
<Button
|
<Button
|
||||||
type="submit"
|
type="submit"
|
||||||
disabled={createLoading || disabled}
|
disabled={createLoading || !isPaidUser(tierMatrix.orgOidc)}
|
||||||
loading={createLoading}
|
loading={createLoading}
|
||||||
onClick={() => {
|
onClick={() => {
|
||||||
if (disabled) return;
|
|
||||||
// log any issues with the form
|
// log any issues with the form
|
||||||
console.log(form.formState.errors);
|
console.log(form.formState.errors);
|
||||||
form.handleSubmit(onSubmit)();
|
form.handleSubmit(onSubmit)();
|
||||||
@@ -829,7 +823,6 @@ export default function Page() {
|
|||||||
{t("idpSubmit")}
|
{t("idpSubmit")}
|
||||||
</Button>
|
</Button>
|
||||||
</div>
|
</div>
|
||||||
</fieldset>
|
|
||||||
</>
|
</>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -493,8 +493,7 @@ export default function GeneralPage() {
|
|||||||
{
|
{
|
||||||
value: "whitelistedEmail",
|
value: "whitelistedEmail",
|
||||||
label: "Whitelisted Email"
|
label: "Whitelisted Email"
|
||||||
},
|
}
|
||||||
{ value: "ssh", label: "SSH" }
|
|
||||||
]}
|
]}
|
||||||
selectedValue={filters.type}
|
selectedValue={filters.type}
|
||||||
onValueChange={(value) =>
|
onValueChange={(value) =>
|
||||||
@@ -508,12 +507,13 @@ export default function GeneralPage() {
|
|||||||
);
|
);
|
||||||
},
|
},
|
||||||
cell: ({ row }) => {
|
cell: ({ row }) => {
|
||||||
const typeLabel =
|
// should be capitalized first letter
|
||||||
row.original.type === "ssh"
|
return (
|
||||||
? "SSH"
|
<span>
|
||||||
: row.original.type.charAt(0).toUpperCase() +
|
{row.original.type.charAt(0).toUpperCase() +
|
||||||
row.original.type.slice(1);
|
row.original.type.slice(1) || "-"}
|
||||||
return <span>{typeLabel || "-"}</span>;
|
</span>
|
||||||
|
);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user