mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-25 03:56:38 +00:00
Compare commits
29 Commits
1.16.2-s.1
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ce59a8a52b | ||
|
|
62c63ddcaa | ||
|
|
dfd604c781 | ||
|
|
38d30b0214 | ||
|
|
c96c5e8ae8 | ||
|
|
6f71e9f0f2 | ||
|
|
d17ec6dc1f | ||
|
|
c36a019f5d | ||
|
|
cf2dfdea5b | ||
|
|
985e1bb9ab | ||
|
|
fff38aac85 | ||
|
|
5a2a97b23a | ||
|
|
5b894e8682 | ||
|
|
19f8c1772f | ||
|
|
37d331e813 | ||
|
|
c660df55cd | ||
|
|
7c8b865379 | ||
|
|
3cca0c09c0 | ||
|
|
85335bfecc | ||
|
|
7c2b4f422a | ||
|
|
ad2a0ae127 | ||
|
|
6c2c620c99 | ||
|
|
f643abf19a | ||
|
|
a1729033cf | ||
|
|
7311766512 | ||
|
|
b01fcc70fe | ||
|
|
35fed74e49 | ||
|
|
6cf1b9b010 | ||
|
|
dae169540b |
115
license.py
Normal file
115
license.py
Normal file
@@ -0,0 +1,115 @@
|
|||||||
|
import os
|
||||||
|
import sys
|
||||||
|
|
||||||
|
# --- Configuration ---

# License header prepended to proprietary TypeScript sources. Keep this text
# stable: header detection compares file contents against it verbatim, so any
# edit here will make existing headers look "missing" and get re-added.
HEADER_TEXT = """/*
 * This file is part of a proprietary work.
 *
 * Copyright (c) 2025 Fossorial, Inc.
 * All rights reserved.
 *
 * This file is licensed under the Fossorial Commercial License.
 * You may not use this file except in compliance with the License.
 * Unauthorized use, copying, modification, or distribution is strictly prohibited.
 *
 * This file is not licensed under the AGPLv3.
 */
"""
|
def should_add_header(file_path):
    """
    Decide whether a file should carry the commercial license header.

    Returns True only when the path contains the segment 'server/private'
    (case-insensitive). Path separators are normalized first so the check
    also matches Windows-style backslash paths.

    Content-based detection (scanning the file for 'private') was
    intentionally disabled; only the path decides.
    """
    # Normalize to forward slashes so 'server\\private' on Windows matches too.
    normalized = file_path.lower().replace("\\", "/")
    if "server/private" in normalized:
        return True

    return False
|
|
||||||
|
def process_directory(root_dir):
    """
    Recursively reconcile the commercial license header on every .ts/.tsx
    file under root_dir: add the header where should_add_header() says it
    belongs, remove it where it does not. 'node_modules' trees are skipped.

    Prints a per-file action log and a final summary with separate counts
    for headers added and headers removed.
    """
    print(f"Scanning directory: {root_dir}")
    files_processed = 0
    headers_added = 0
    headers_removed = 0

    # Hoist the header forms used for comparison out of the loop.
    header = HEADER_TEXT.strip()
    header_with_newlines = header + '\n\n'

    for root, dirs, files in os.walk(root_dir):
        # Prune node_modules in place so os.walk never descends into it.
        if 'node_modules' in dirs:
            dirs.remove('node_modules')

        for file in files:
            if not file.endswith(('.ts', '.tsx')):
                continue

            file_path = os.path.join(root, file)
            files_processed += 1

            try:
                with open(file_path, 'r+', encoding='utf-8') as f:
                    original_content = f.read()
                    has_header = original_content.startswith(header)

                    if should_add_header(file_path):
                        if has_header:
                            print(f"Header already exists in: {file_path}")
                        else:
                            # Prepend the header. The file only grows, so no
                            # truncate is needed after the rewrite.
                            f.seek(0, 0)
                            f.write(header_with_newlines + original_content)
                            print(f"Added header to: {file_path}")
                            headers_added += 1
                    elif has_header:
                        # Header present but not wanted: strip it plus any
                        # newlines that follow it.
                        if original_content.startswith(header_with_newlines):
                            content_without_header = original_content[
                                len(header_with_newlines):
                            ]
                        else:
                            # Different newline pattern after the header —
                            # skip whatever run of \n / \r follows it.
                            end = len(header)
                            while (
                                end < len(original_content)
                                and original_content[end] in '\n\r'
                            ):
                                end += 1
                            content_without_header = original_content[end:]

                        # Rewrite in place; truncate because content shrank.
                        f.seek(0)
                        f.write(content_without_header)
                        f.truncate()
                        print(f"Removed header from: {file_path}")
                        headers_removed += 1
            except Exception as e:
                print(f"Error processing file {file_path}: {e}")

    print("\n--- Scan Complete ---")
    print(f"Total .ts or .tsx files found: {files_processed}")
    print(
        f"Files modified (headers added/removed): "
        f"{headers_added + headers_removed}"
    )
|
|
||||||
|
if __name__ == "__main__":
    # Target directory comes from the first CLI argument; default to the
    # current directory when none is given.
    target_directory = sys.argv[1] if len(sys.argv) > 1 else '.'

    if not os.path.isdir(target_directory):
        print(f"Error: Directory '{target_directory}' not found.")
        sys.exit(1)

    process_directory(os.path.abspath(target_directory))
@@ -1,8 +1,10 @@
|
|||||||
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
||||||
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
||||||
|
import { stopPingAccumulator } from "@server/routers/newt/pingAccumulator";
|
||||||
import { cleanup as wsCleanup } from "#dynamic/routers/ws";
|
import { cleanup as wsCleanup } from "#dynamic/routers/ws";
|
||||||
|
|
||||||
async function cleanup() {
|
async function cleanup() {
|
||||||
|
await stopPingAccumulator();
|
||||||
await flushBandwidthToDb();
|
await flushBandwidthToDb();
|
||||||
await flushSiteBandwidthToDb();
|
await flushSiteBandwidthToDb();
|
||||||
await wsCleanup();
|
await wsCleanup();
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres";
|
import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres";
|
||||||
import { Pool } from "pg";
|
|
||||||
import { readConfigFile } from "@server/lib/readConfigFile";
|
import { readConfigFile } from "@server/lib/readConfigFile";
|
||||||
import { withReplicas } from "drizzle-orm/pg-core";
|
import { withReplicas } from "drizzle-orm/pg-core";
|
||||||
|
import { createPool } from "./poolConfig";
|
||||||
|
|
||||||
function createDb() {
|
function createDb() {
|
||||||
const config = readConfigFile();
|
const config = readConfigFile();
|
||||||
@@ -39,12 +39,17 @@ function createDb() {
|
|||||||
|
|
||||||
// Create connection pools instead of individual connections
|
// Create connection pools instead of individual connections
|
||||||
const poolConfig = config.postgres.pool;
|
const poolConfig = config.postgres.pool;
|
||||||
const primaryPool = new Pool({
|
const maxConnections = poolConfig?.max_connections || 20;
|
||||||
|
const idleTimeoutMs = poolConfig?.idle_timeout_ms || 30000;
|
||||||
|
const connectionTimeoutMs = poolConfig?.connection_timeout_ms || 5000;
|
||||||
|
|
||||||
|
const primaryPool = createPool(
|
||||||
connectionString,
|
connectionString,
|
||||||
max: poolConfig?.max_connections || 20,
|
maxConnections,
|
||||||
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
|
idleTimeoutMs,
|
||||||
connectionTimeoutMillis: poolConfig?.connection_timeout_ms || 5000
|
connectionTimeoutMs,
|
||||||
});
|
"primary"
|
||||||
|
);
|
||||||
|
|
||||||
const replicas = [];
|
const replicas = [];
|
||||||
|
|
||||||
@@ -55,14 +60,16 @@ function createDb() {
|
|||||||
})
|
})
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
|
const maxReplicaConnections =
|
||||||
|
poolConfig?.max_replica_connections || 20;
|
||||||
for (const conn of replicaConnections) {
|
for (const conn of replicaConnections) {
|
||||||
const replicaPool = new Pool({
|
const replicaPool = createPool(
|
||||||
connectionString: conn.connection_string,
|
conn.connection_string,
|
||||||
max: poolConfig?.max_replica_connections || 20,
|
maxReplicaConnections,
|
||||||
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
|
idleTimeoutMs,
|
||||||
connectionTimeoutMillis:
|
connectionTimeoutMs,
|
||||||
poolConfig?.connection_timeout_ms || 5000
|
"replica"
|
||||||
});
|
);
|
||||||
replicas.push(
|
replicas.push(
|
||||||
DrizzlePostgres(replicaPool, {
|
DrizzlePostgres(replicaPool, {
|
||||||
logger: process.env.QUERY_LOGGING == "true"
|
logger: process.env.QUERY_LOGGING == "true"
|
||||||
|
|||||||
@@ -1,9 +1,9 @@
|
|||||||
import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres";
|
import { drizzle as DrizzlePostgres } from "drizzle-orm/node-postgres";
|
||||||
import { Pool } from "pg";
|
|
||||||
import { readConfigFile } from "@server/lib/readConfigFile";
|
import { readConfigFile } from "@server/lib/readConfigFile";
|
||||||
import { withReplicas } from "drizzle-orm/pg-core";
|
import { withReplicas } from "drizzle-orm/pg-core";
|
||||||
import { build } from "@server/build";
|
import { build } from "@server/build";
|
||||||
import { db as mainDb, primaryDb as mainPrimaryDb } from "./driver";
|
import { db as mainDb, primaryDb as mainPrimaryDb } from "./driver";
|
||||||
|
import { createPool } from "./poolConfig";
|
||||||
|
|
||||||
function createLogsDb() {
|
function createLogsDb() {
|
||||||
// Only use separate logs database in SaaS builds
|
// Only use separate logs database in SaaS builds
|
||||||
@@ -42,12 +42,17 @@ function createLogsDb() {
|
|||||||
|
|
||||||
// Create separate connection pool for logs database
|
// Create separate connection pool for logs database
|
||||||
const poolConfig = logsConfig?.pool || config.postgres?.pool;
|
const poolConfig = logsConfig?.pool || config.postgres?.pool;
|
||||||
const primaryPool = new Pool({
|
const maxConnections = poolConfig?.max_connections || 20;
|
||||||
|
const idleTimeoutMs = poolConfig?.idle_timeout_ms || 30000;
|
||||||
|
const connectionTimeoutMs = poolConfig?.connection_timeout_ms || 5000;
|
||||||
|
|
||||||
|
const primaryPool = createPool(
|
||||||
connectionString,
|
connectionString,
|
||||||
max: poolConfig?.max_connections || 20,
|
maxConnections,
|
||||||
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
|
idleTimeoutMs,
|
||||||
connectionTimeoutMillis: poolConfig?.connection_timeout_ms || 5000
|
connectionTimeoutMs,
|
||||||
});
|
"logs-primary"
|
||||||
|
);
|
||||||
|
|
||||||
const replicas = [];
|
const replicas = [];
|
||||||
|
|
||||||
@@ -58,14 +63,16 @@ function createLogsDb() {
|
|||||||
})
|
})
|
||||||
);
|
);
|
||||||
} else {
|
} else {
|
||||||
|
const maxReplicaConnections =
|
||||||
|
poolConfig?.max_replica_connections || 20;
|
||||||
for (const conn of replicaConnections) {
|
for (const conn of replicaConnections) {
|
||||||
const replicaPool = new Pool({
|
const replicaPool = createPool(
|
||||||
connectionString: conn.connection_string,
|
conn.connection_string,
|
||||||
max: poolConfig?.max_replica_connections || 20,
|
maxReplicaConnections,
|
||||||
idleTimeoutMillis: poolConfig?.idle_timeout_ms || 30000,
|
idleTimeoutMs,
|
||||||
connectionTimeoutMillis:
|
connectionTimeoutMs,
|
||||||
poolConfig?.connection_timeout_ms || 5000
|
"logs-replica"
|
||||||
});
|
);
|
||||||
replicas.push(
|
replicas.push(
|
||||||
DrizzlePostgres(replicaPool, {
|
DrizzlePostgres(replicaPool, {
|
||||||
logger: process.env.QUERY_LOGGING == "true"
|
logger: process.env.QUERY_LOGGING == "true"
|
||||||
|
|||||||
63
server/db/pg/poolConfig.ts
Normal file
63
server/db/pg/poolConfig.ts
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
import { Pool, PoolConfig } from "pg";
|
||||||
|
import logger from "@server/logger";
|
||||||
|
|
||||||
|
export function createPoolConfig(
|
||||||
|
connectionString: string,
|
||||||
|
maxConnections: number,
|
||||||
|
idleTimeoutMs: number,
|
||||||
|
connectionTimeoutMs: number
|
||||||
|
): PoolConfig {
|
||||||
|
return {
|
||||||
|
connectionString,
|
||||||
|
max: maxConnections,
|
||||||
|
idleTimeoutMillis: idleTimeoutMs,
|
||||||
|
connectionTimeoutMillis: connectionTimeoutMs,
|
||||||
|
// TCP keepalive to prevent silent connection drops by NAT gateways,
|
||||||
|
// load balancers, and other intermediate network devices (e.g. AWS
|
||||||
|
// NAT Gateway drops idle TCP connections after ~350s)
|
||||||
|
keepAlive: true,
|
||||||
|
keepAliveInitialDelayMillis: 10000, // send first keepalive after 10s of idle
|
||||||
|
// Allow connections to be released and recreated more aggressively
|
||||||
|
// to avoid stale connections building up
|
||||||
|
allowExitOnIdle: false
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
export function attachPoolErrorHandlers(pool: Pool, label: string): void {
|
||||||
|
pool.on("error", (err) => {
|
||||||
|
// This catches errors on idle clients in the pool. Without this
|
||||||
|
// handler an unexpected disconnect would crash the process.
|
||||||
|
logger.error(
|
||||||
|
`Unexpected error on idle ${label} database client: ${err.message}`
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
pool.on("connect", (client) => {
|
||||||
|
// Set a statement timeout on every new connection so a single slow
|
||||||
|
// query can't block the pool forever
|
||||||
|
client.query("SET statement_timeout = '30s'").catch((err: Error) => {
|
||||||
|
logger.warn(
|
||||||
|
`Failed to set statement_timeout on ${label} client: ${err.message}`
|
||||||
|
);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
export function createPool(
|
||||||
|
connectionString: string,
|
||||||
|
maxConnections: number,
|
||||||
|
idleTimeoutMs: number,
|
||||||
|
connectionTimeoutMs: number,
|
||||||
|
label: string
|
||||||
|
): Pool {
|
||||||
|
const pool = new Pool(
|
||||||
|
createPoolConfig(
|
||||||
|
connectionString,
|
||||||
|
maxConnections,
|
||||||
|
idleTimeoutMs,
|
||||||
|
connectionTimeoutMs
|
||||||
|
)
|
||||||
|
);
|
||||||
|
attachPoolErrorHandlers(pool, label);
|
||||||
|
return pool;
|
||||||
|
}
|
||||||
123
server/lib/ip.ts
123
server/lib/ip.ts
@@ -571,6 +571,129 @@ export function generateSubnetProxyTargets(
|
|||||||
return targets;
|
return targets;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export type SubnetProxyTargetV2 = {
|
||||||
|
sourcePrefixes: string[]; // must be cidrs
|
||||||
|
destPrefix: string; // must be a cidr
|
||||||
|
disableIcmp?: boolean;
|
||||||
|
rewriteTo?: string; // must be a cidr
|
||||||
|
portRange?: {
|
||||||
|
min: number;
|
||||||
|
max: number;
|
||||||
|
protocol: "tcp" | "udp";
|
||||||
|
}[];
|
||||||
|
};
|
||||||
|
|
||||||
|
export function generateSubnetProxyTargetV2(
|
||||||
|
siteResource: SiteResource,
|
||||||
|
clients: {
|
||||||
|
clientId: number;
|
||||||
|
pubKey: string | null;
|
||||||
|
subnet: string | null;
|
||||||
|
}[]
|
||||||
|
): SubnetProxyTargetV2 | undefined {
|
||||||
|
if (clients.length === 0) {
|
||||||
|
logger.debug(
|
||||||
|
`No clients have access to site resource ${siteResource.siteResourceId}, skipping target generation.`
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let target: SubnetProxyTargetV2 | null = null;
|
||||||
|
|
||||||
|
const portRange = [
|
||||||
|
...parsePortRangeString(siteResource.tcpPortRangeString, "tcp"),
|
||||||
|
...parsePortRangeString(siteResource.udpPortRangeString, "udp")
|
||||||
|
];
|
||||||
|
const disableIcmp = siteResource.disableIcmp ?? false;
|
||||||
|
|
||||||
|
if (siteResource.mode == "host") {
|
||||||
|
let destination = siteResource.destination;
|
||||||
|
// check if this is a valid ip
|
||||||
|
const ipSchema = z.union([z.ipv4(), z.ipv6()]);
|
||||||
|
if (ipSchema.safeParse(destination).success) {
|
||||||
|
destination = `${destination}/32`;
|
||||||
|
|
||||||
|
target = {
|
||||||
|
sourcePrefixes: [],
|
||||||
|
destPrefix: destination,
|
||||||
|
portRange,
|
||||||
|
disableIcmp
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (siteResource.alias && siteResource.aliasAddress) {
|
||||||
|
// also push a match for the alias address
|
||||||
|
target = {
|
||||||
|
sourcePrefixes: [],
|
||||||
|
destPrefix: `${siteResource.aliasAddress}/32`,
|
||||||
|
rewriteTo: destination,
|
||||||
|
portRange,
|
||||||
|
disableIcmp
|
||||||
|
};
|
||||||
|
}
|
||||||
|
} else if (siteResource.mode == "cidr") {
|
||||||
|
target = {
|
||||||
|
sourcePrefixes: [],
|
||||||
|
destPrefix: siteResource.destination,
|
||||||
|
portRange,
|
||||||
|
disableIcmp
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!target) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const clientSite of clients) {
|
||||||
|
if (!clientSite.subnet) {
|
||||||
|
logger.debug(
|
||||||
|
`Client ${clientSite.clientId} has no subnet, skipping for site resource ${siteResource.siteResourceId}.`
|
||||||
|
);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
const clientPrefix = `${clientSite.subnet.split("/")[0]}/32`;
|
||||||
|
|
||||||
|
// add client prefix to source prefixes
|
||||||
|
target.sourcePrefixes.push(clientPrefix);
|
||||||
|
}
|
||||||
|
|
||||||
|
// print a nice representation of the targets
|
||||||
|
// logger.debug(
|
||||||
|
// `Generated subnet proxy targets for: ${JSON.stringify(targets, null, 2)}`
|
||||||
|
// );
|
||||||
|
|
||||||
|
return target;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Converts a SubnetProxyTargetV2 to an array of SubnetProxyTarget (v1)
|
||||||
|
* by expanding each source prefix into its own target entry.
|
||||||
|
* @param targetV2 - The v2 target to convert
|
||||||
|
* @returns Array of v1 SubnetProxyTarget objects
|
||||||
|
*/
|
||||||
|
export function convertSubnetProxyTargetsV2ToV1(
|
||||||
|
targetsV2: SubnetProxyTargetV2[]
|
||||||
|
): SubnetProxyTarget[] {
|
||||||
|
return targetsV2.flatMap((targetV2) =>
|
||||||
|
targetV2.sourcePrefixes.map((sourcePrefix) => ({
|
||||||
|
sourcePrefix,
|
||||||
|
destPrefix: targetV2.destPrefix,
|
||||||
|
...(targetV2.disableIcmp !== undefined && {
|
||||||
|
disableIcmp: targetV2.disableIcmp
|
||||||
|
}),
|
||||||
|
...(targetV2.rewriteTo !== undefined && {
|
||||||
|
rewriteTo: targetV2.rewriteTo
|
||||||
|
}),
|
||||||
|
...(targetV2.portRange !== undefined && {
|
||||||
|
portRange: targetV2.portRange
|
||||||
|
})
|
||||||
|
}))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// Custom schema for validating port range strings
|
// Custom schema for validating port range strings
|
||||||
// Format: "80,443,8000-9000" or "*" for all ports, or empty string
|
// Format: "80,443,8000-9000" or "*" for all ports, or empty string
|
||||||
export const portRangeStringSchema = z
|
export const portRangeStringSchema = z
|
||||||
|
|||||||
@@ -302,8 +302,8 @@ export const configSchema = z
|
|||||||
.optional()
|
.optional()
|
||||||
.default({
|
.default({
|
||||||
block_size: 24,
|
block_size: 24,
|
||||||
subnet_group: "100.90.128.0/24",
|
subnet_group: "100.90.128.0/20",
|
||||||
utility_subnet_group: "100.96.128.0/24"
|
utility_subnet_group: "100.96.128.0/20"
|
||||||
}),
|
}),
|
||||||
rate_limits: z
|
rate_limits: z
|
||||||
.object({
|
.object({
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ import logger from "@server/logger";
|
|||||||
import {
|
import {
|
||||||
generateAliasConfig,
|
generateAliasConfig,
|
||||||
generateRemoteSubnets,
|
generateRemoteSubnets,
|
||||||
generateSubnetProxyTargets,
|
generateSubnetProxyTargetV2,
|
||||||
parseEndpoint,
|
parseEndpoint,
|
||||||
formatEndpoint
|
formatEndpoint
|
||||||
} from "@server/lib/ip";
|
} from "@server/lib/ip";
|
||||||
@@ -660,19 +660,16 @@ async function handleSubnetProxyTargetUpdates(
|
|||||||
);
|
);
|
||||||
|
|
||||||
if (addedClients.length > 0) {
|
if (addedClients.length > 0) {
|
||||||
const targetsToAdd = generateSubnetProxyTargets(
|
const targetToAdd = generateSubnetProxyTargetV2(
|
||||||
siteResource,
|
siteResource,
|
||||||
addedClients
|
addedClients
|
||||||
);
|
);
|
||||||
|
|
||||||
if (targetsToAdd.length > 0) {
|
if (targetToAdd) {
|
||||||
logger.info(
|
|
||||||
`Adding ${targetsToAdd.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}`
|
|
||||||
);
|
|
||||||
proxyJobs.push(
|
proxyJobs.push(
|
||||||
addSubnetProxyTargets(
|
addSubnetProxyTargets(
|
||||||
newt.newtId,
|
newt.newtId,
|
||||||
targetsToAdd,
|
[targetToAdd],
|
||||||
newt.version
|
newt.version
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
@@ -700,19 +697,16 @@ async function handleSubnetProxyTargetUpdates(
|
|||||||
);
|
);
|
||||||
|
|
||||||
if (removedClients.length > 0) {
|
if (removedClients.length > 0) {
|
||||||
const targetsToRemove = generateSubnetProxyTargets(
|
const targetToRemove = generateSubnetProxyTargetV2(
|
||||||
siteResource,
|
siteResource,
|
||||||
removedClients
|
removedClients
|
||||||
);
|
);
|
||||||
|
|
||||||
if (targetsToRemove.length > 0) {
|
if (targetToRemove) {
|
||||||
logger.info(
|
|
||||||
`Removing ${targetsToRemove.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}`
|
|
||||||
);
|
|
||||||
proxyJobs.push(
|
proxyJobs.push(
|
||||||
removeSubnetProxyTargets(
|
removeSubnetProxyTargets(
|
||||||
newt.newtId,
|
newt.newtId,
|
||||||
targetsToRemove,
|
[targetToRemove],
|
||||||
newt.version
|
newt.version
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
@@ -1169,7 +1163,7 @@ async function handleMessagesForClientResources(
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (const resource of resources) {
|
for (const resource of resources) {
|
||||||
const targets = generateSubnetProxyTargets(resource, [
|
const target = generateSubnetProxyTargetV2(resource, [
|
||||||
{
|
{
|
||||||
clientId: client.clientId,
|
clientId: client.clientId,
|
||||||
pubKey: client.pubKey,
|
pubKey: client.pubKey,
|
||||||
@@ -1177,11 +1171,11 @@ async function handleMessagesForClientResources(
|
|||||||
}
|
}
|
||||||
]);
|
]);
|
||||||
|
|
||||||
if (targets.length > 0) {
|
if (target) {
|
||||||
proxyJobs.push(
|
proxyJobs.push(
|
||||||
addSubnetProxyTargets(
|
addSubnetProxyTargets(
|
||||||
newt.newtId,
|
newt.newtId,
|
||||||
targets,
|
[target],
|
||||||
newt.version
|
newt.version
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
@@ -1246,7 +1240,7 @@ async function handleMessagesForClientResources(
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (const resource of resources) {
|
for (const resource of resources) {
|
||||||
const targets = generateSubnetProxyTargets(resource, [
|
const target = generateSubnetProxyTargetV2(resource, [
|
||||||
{
|
{
|
||||||
clientId: client.clientId,
|
clientId: client.clientId,
|
||||||
pubKey: client.pubKey,
|
pubKey: client.pubKey,
|
||||||
@@ -1254,11 +1248,11 @@ async function handleMessagesForClientResources(
|
|||||||
}
|
}
|
||||||
]);
|
]);
|
||||||
|
|
||||||
if (targets.length > 0) {
|
if (target) {
|
||||||
proxyJobs.push(
|
proxyJobs.push(
|
||||||
removeSubnetProxyTargets(
|
removeSubnetProxyTargets(
|
||||||
newt.newtId,
|
newt.newtId,
|
||||||
targets,
|
[target],
|
||||||
newt.version
|
newt.version
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|||||||
22
server/lib/tokenCache.ts
Normal file
22
server/lib/tokenCache.ts
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
/**
|
||||||
|
* Returns a cached plaintext token from Redis if one exists and decrypts
|
||||||
|
* cleanly, otherwise calls `createSession` to mint a fresh token, stores the
|
||||||
|
* encrypted value in Redis with the given TTL, and returns it.
|
||||||
|
*
|
||||||
|
* Failures at the Redis layer are non-fatal – the function always falls
|
||||||
|
* through to session creation so the caller is never blocked by a Redis outage.
|
||||||
|
*
|
||||||
|
* @param cacheKey Unique Redis key, e.g. `"newt:token_cache:abc123"`
|
||||||
|
* @param secret Server secret used for AES encryption/decryption
|
||||||
|
* @param ttlSeconds Cache TTL in seconds (should match session expiry)
|
||||||
|
* @param createSession Factory that mints a new session and returns its raw token
|
||||||
|
*/
|
||||||
|
export async function getOrCreateCachedToken(
|
||||||
|
cacheKey: string,
|
||||||
|
secret: string,
|
||||||
|
ttlSeconds: number,
|
||||||
|
createSession: () => Promise<string>
|
||||||
|
): Promise<string> {
|
||||||
|
const token = await createSession();
|
||||||
|
return token;
|
||||||
|
}
|
||||||
@@ -15,8 +15,10 @@ import { rateLimitService } from "#private/lib/rateLimit";
|
|||||||
import { cleanup as wsCleanup } from "#private/routers/ws";
|
import { cleanup as wsCleanup } from "#private/routers/ws";
|
||||||
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
||||||
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
||||||
|
import { stopPingAccumulator } from "@server/routers/newt/pingAccumulator";
|
||||||
|
|
||||||
async function cleanup() {
|
async function cleanup() {
|
||||||
|
await stopPingAccumulator();
|
||||||
await flushBandwidthToDb();
|
await flushBandwidthToDb();
|
||||||
await flushSiteBandwidthToDb();
|
await flushSiteBandwidthToDb();
|
||||||
await rateLimitService.cleanup();
|
await rateLimitService.cleanup();
|
||||||
|
|||||||
@@ -1,3 +1,16 @@
|
|||||||
|
/*
|
||||||
|
* This file is part of a proprietary work.
|
||||||
|
*
|
||||||
|
* Copyright (c) 2025 Fossorial, Inc.
|
||||||
|
* All rights reserved.
|
||||||
|
*
|
||||||
|
* This file is licensed under the Fossorial Commercial License.
|
||||||
|
* You may not use this file except in compliance with the License.
|
||||||
|
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
|
||||||
|
*
|
||||||
|
* This file is not licensed under the AGPLv3.
|
||||||
|
*/
|
||||||
|
|
||||||
import NodeCache from "node-cache";
|
import NodeCache from "node-cache";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { redisManager } from "@server/private/lib/redis";
|
import { redisManager } from "@server/private/lib/redis";
|
||||||
@@ -24,23 +37,31 @@ setInterval(() => {
|
|||||||
*/
|
*/
|
||||||
class AdaptiveCache {
|
class AdaptiveCache {
|
||||||
private useRedis(): boolean {
|
private useRedis(): boolean {
|
||||||
return redisManager.isRedisEnabled() && redisManager.getHealthStatus().isHealthy;
|
return (
|
||||||
|
redisManager.isRedisEnabled() &&
|
||||||
|
redisManager.getHealthStatus().isHealthy
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set a value in the cache
|
* Set a value in the cache
|
||||||
* @param key - Cache key
|
* @param key - Cache key
|
||||||
* @param value - Value to cache (will be JSON stringified for Redis)
|
* @param value - Value to cache (will be JSON stringified for Redis)
|
||||||
* @param ttl - Time to live in seconds (0 = no expiration)
|
* @param ttl - Time to live in seconds (0 = no expiration; omit = 3600s for Redis)
|
||||||
* @returns boolean indicating success
|
* @returns boolean indicating success
|
||||||
*/
|
*/
|
||||||
async set(key: string, value: any, ttl?: number): Promise<boolean> {
|
async set(key: string, value: any, ttl?: number): Promise<boolean> {
|
||||||
const effectiveTtl = ttl === 0 ? undefined : ttl;
|
const effectiveTtl = ttl === 0 ? undefined : ttl;
|
||||||
|
const redisTtl = ttl === 0 ? undefined : (ttl ?? 3600);
|
||||||
|
|
||||||
if (this.useRedis()) {
|
if (this.useRedis()) {
|
||||||
try {
|
try {
|
||||||
const serialized = JSON.stringify(value);
|
const serialized = JSON.stringify(value);
|
||||||
const success = await redisManager.set(key, serialized, effectiveTtl);
|
const success = await redisManager.set(
|
||||||
|
key,
|
||||||
|
serialized,
|
||||||
|
redisTtl
|
||||||
|
);
|
||||||
|
|
||||||
if (success) {
|
if (success) {
|
||||||
logger.debug(`Set key in Redis: ${key}`);
|
logger.debug(`Set key in Redis: ${key}`);
|
||||||
@@ -48,7 +69,9 @@ class AdaptiveCache {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Redis failed, fall through to local cache
|
// Redis failed, fall through to local cache
|
||||||
logger.debug(`Redis set failed for key ${key}, falling back to local cache`);
|
logger.debug(
|
||||||
|
`Redis set failed for key ${key}, falling back to local cache`
|
||||||
|
);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(`Redis set error for key ${key}:`, error);
|
logger.error(`Redis set error for key ${key}:`, error);
|
||||||
// Fall through to local cache
|
// Fall through to local cache
|
||||||
@@ -120,9 +143,14 @@ class AdaptiveCache {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Some Redis deletes failed, fall through to local cache
|
// Some Redis deletes failed, fall through to local cache
|
||||||
logger.debug(`Some Redis deletes failed, falling back to local cache`);
|
logger.debug(
|
||||||
|
`Some Redis deletes failed, falling back to local cache`
|
||||||
|
);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(`Redis del error for keys ${keys.join(", ")}:`, error);
|
logger.error(
|
||||||
|
`Redis del error for keys ${keys.join(", ")}:`,
|
||||||
|
error
|
||||||
|
);
|
||||||
// Fall through to local cache
|
// Fall through to local cache
|
||||||
deletedCount = 0;
|
deletedCount = 0;
|
||||||
}
|
}
|
||||||
@@ -195,7 +223,9 @@ class AdaptiveCache {
|
|||||||
*/
|
*/
|
||||||
async flushAll(): Promise<void> {
|
async flushAll(): Promise<void> {
|
||||||
if (this.useRedis()) {
|
if (this.useRedis()) {
|
||||||
logger.warn("Adaptive cache flushAll called - Redis flush not implemented, only local cache will be flushed");
|
logger.warn(
|
||||||
|
"Adaptive cache flushAll called - Redis flush not implemented, only local cache will be flushed"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
localCache.flushAll();
|
localCache.flushAll();
|
||||||
@@ -239,7 +269,9 @@ class AdaptiveCache {
|
|||||||
getTtl(key: string): number {
|
getTtl(key: string): number {
|
||||||
// Note: This only works for local cache, Redis TTL is not supported
|
// Note: This only works for local cache, Redis TTL is not supported
|
||||||
if (this.useRedis()) {
|
if (this.useRedis()) {
|
||||||
logger.warn(`getTtl called for key ${key} but Redis TTL lookup is not implemented`);
|
logger.warn(
|
||||||
|
`getTtl called for key ${key} but Redis TTL lookup is not implemented`
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
const ttl = localCache.getTtl(key);
|
const ttl = localCache.getTtl(key);
|
||||||
@@ -255,7 +287,9 @@ class AdaptiveCache {
|
|||||||
*/
|
*/
|
||||||
keys(): string[] {
|
keys(): string[] {
|
||||||
if (this.useRedis()) {
|
if (this.useRedis()) {
|
||||||
logger.warn("keys() called but Redis keys are not included, only local cache keys returned");
|
logger.warn(
|
||||||
|
"keys() called but Redis keys are not included, only local cache keys returned"
|
||||||
|
);
|
||||||
}
|
}
|
||||||
return localCache.keys();
|
return localCache.keys();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -57,7 +57,10 @@ export const privateConfigSchema = z.object({
|
|||||||
.object({
|
.object({
|
||||||
host: z.string(),
|
host: z.string(),
|
||||||
port: portSchema,
|
port: portSchema,
|
||||||
password: z.string().optional(),
|
password: z
|
||||||
|
.string()
|
||||||
|
.optional()
|
||||||
|
.transform(getEnvOrYaml("REDIS_PASSWORD")),
|
||||||
db: z.int().nonnegative().optional().default(0),
|
db: z.int().nonnegative().optional().default(0),
|
||||||
replicas: z
|
replicas: z
|
||||||
.array(
|
.array(
|
||||||
|
|||||||
77
server/private/lib/tokenCache.ts
Normal file
77
server/private/lib/tokenCache.ts
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
/*
|
||||||
|
* This file is part of a proprietary work.
|
||||||
|
*
|
||||||
|
* Copyright (c) 2025 Fossorial, Inc.
|
||||||
|
* All rights reserved.
|
||||||
|
*
|
||||||
|
* This file is licensed under the Fossorial Commercial License.
|
||||||
|
* You may not use this file except in compliance with the License.
|
||||||
|
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
|
||||||
|
*
|
||||||
|
* This file is not licensed under the AGPLv3.
|
||||||
|
*/
|
||||||
|
|
||||||
|
import redisManager from "#private/lib/redis";
|
||||||
|
import { encrypt, decrypt } from "@server/lib/crypto";
|
||||||
|
import logger from "@server/logger";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns a cached plaintext token from Redis if one exists and decrypts
|
||||||
|
* cleanly, otherwise calls `createSession` to mint a fresh token, stores the
|
||||||
|
* encrypted value in Redis with the given TTL, and returns it.
|
||||||
|
*
|
||||||
|
* Failures at the Redis layer are non-fatal – the function always falls
|
||||||
|
* through to session creation so the caller is never blocked by a Redis outage.
|
||||||
|
*
|
||||||
|
* @param cacheKey Unique Redis key, e.g. `"newt:token_cache:abc123"`
|
||||||
|
* @param secret Server secret used for AES encryption/decryption
|
||||||
|
* @param ttlSeconds Cache TTL in seconds (should match session expiry)
|
||||||
|
* @param createSession Factory that mints a new session and returns its raw token
|
||||||
|
*/
|
||||||
|
export async function getOrCreateCachedToken(
|
||||||
|
cacheKey: string,
|
||||||
|
secret: string,
|
||||||
|
ttlSeconds: number,
|
||||||
|
createSession: () => Promise<string>
|
||||||
|
): Promise<string> {
|
||||||
|
if (redisManager.isRedisEnabled()) {
|
||||||
|
try {
|
||||||
|
const cached = await redisManager.get(cacheKey);
|
||||||
|
if (cached) {
|
||||||
|
const token = decrypt(cached, secret);
|
||||||
|
if (token) {
|
||||||
|
logger.debug(`Token cache hit for key: ${cacheKey}`);
|
||||||
|
return token;
|
||||||
|
}
|
||||||
|
// Decryption produced an empty string – treat as a miss
|
||||||
|
logger.warn(
|
||||||
|
`Token cache decryption returned empty string for key: ${cacheKey}, treating as miss`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
} catch (e) {
|
||||||
|
logger.warn(
|
||||||
|
`Token cache read/decrypt failed for key ${cacheKey}, falling through to session creation:`,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const token = await createSession();
|
||||||
|
|
||||||
|
if (redisManager.isRedisEnabled()) {
|
||||||
|
try {
|
||||||
|
const encrypted = encrypt(token, secret);
|
||||||
|
await redisManager.set(cacheKey, encrypted, ttlSeconds);
|
||||||
|
logger.debug(
|
||||||
|
`Token cached in Redis for key: ${cacheKey} (TTL ${ttlSeconds}s)`
|
||||||
|
);
|
||||||
|
} catch (e) {
|
||||||
|
logger.warn(
|
||||||
|
`Token cache write failed for key ${cacheKey} (session was still created):`,
|
||||||
|
e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return token;
|
||||||
|
}
|
||||||
@@ -15,6 +15,7 @@ import { verifySessionRemoteExitNodeMiddleware } from "#private/middlewares/veri
|
|||||||
import { Router } from "express";
|
import { Router } from "express";
|
||||||
import {
|
import {
|
||||||
db,
|
db,
|
||||||
|
logsDb,
|
||||||
exitNodes,
|
exitNodes,
|
||||||
Resource,
|
Resource,
|
||||||
ResourcePassword,
|
ResourcePassword,
|
||||||
@@ -1885,7 +1886,7 @@ hybridRouter.post(
|
|||||||
const batchSize = 100;
|
const batchSize = 100;
|
||||||
for (let i = 0; i < logEntries.length; i += batchSize) {
|
for (let i = 0; i < logEntries.length; i += batchSize) {
|
||||||
const batch = logEntries.slice(i, i + batchSize);
|
const batch = logEntries.slice(i, i + batchSize);
|
||||||
await db.insert(requestAuditLog).values(batch);
|
await logsDb.insert(requestAuditLog).values(batch);
|
||||||
}
|
}
|
||||||
|
|
||||||
return response(res, {
|
return response(res, {
|
||||||
|
|||||||
@@ -23,8 +23,10 @@ import { z } from "zod";
|
|||||||
import { fromError } from "zod-validation-error";
|
import { fromError } from "zod-validation-error";
|
||||||
import {
|
import {
|
||||||
createRemoteExitNodeSession,
|
createRemoteExitNodeSession,
|
||||||
validateRemoteExitNodeSessionToken
|
validateRemoteExitNodeSessionToken,
|
||||||
|
EXPIRES
|
||||||
} from "#private/auth/sessions/remoteExitNode";
|
} from "#private/auth/sessions/remoteExitNode";
|
||||||
|
import { getOrCreateCachedToken } from "@server/private/lib/tokenCache";
|
||||||
import { verifyPassword } from "@server/auth/password";
|
import { verifyPassword } from "@server/auth/password";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import config from "@server/lib/config";
|
import config from "@server/lib/config";
|
||||||
@@ -103,14 +105,23 @@ export async function getRemoteExitNodeToken(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
const resToken = generateSessionToken();
|
// Return a cached token if one exists to prevent thundering herd on
|
||||||
await createRemoteExitNodeSession(
|
// simultaneous restarts; falls back to creating a fresh session when
|
||||||
resToken,
|
// Redis is unavailable or the cache has expired.
|
||||||
existingRemoteExitNode.remoteExitNodeId
|
const resToken = await getOrCreateCachedToken(
|
||||||
|
`remote_exit_node:token_cache:${existingRemoteExitNode.remoteExitNodeId}`,
|
||||||
|
config.getRawConfig().server.secret!,
|
||||||
|
Math.floor(EXPIRES / 1000),
|
||||||
|
async () => {
|
||||||
|
const token = generateSessionToken();
|
||||||
|
await createRemoteExitNodeSession(
|
||||||
|
token,
|
||||||
|
existingRemoteExitNode.remoteExitNodeId
|
||||||
|
);
|
||||||
|
return token;
|
||||||
|
}
|
||||||
);
|
);
|
||||||
|
|
||||||
// logger.debug(`Created RemoteExitNode token response: ${JSON.stringify(resToken)}`);
|
|
||||||
|
|
||||||
return response<{ token: string }>(res, {
|
return response<{ token: string }>(res, {
|
||||||
data: {
|
data: {
|
||||||
token: resToken
|
token: resToken
|
||||||
|
|||||||
@@ -38,7 +38,7 @@ export const startRemoteExitNodeOfflineChecker = (): void => {
|
|||||||
);
|
);
|
||||||
|
|
||||||
// Find clients that haven't pinged in the last 2 minutes and mark them as offline
|
// Find clients that haven't pinged in the last 2 minutes and mark them as offline
|
||||||
const newlyOfflineNodes = await db
|
const offlineNodes = await db
|
||||||
.update(exitNodes)
|
.update(exitNodes)
|
||||||
.set({ online: false })
|
.set({ online: false })
|
||||||
.where(
|
.where(
|
||||||
@@ -53,32 +53,15 @@ export const startRemoteExitNodeOfflineChecker = (): void => {
|
|||||||
)
|
)
|
||||||
.returning();
|
.returning();
|
||||||
|
|
||||||
// Update the sites to offline if they have not pinged either
|
if (offlineNodes.length > 0) {
|
||||||
const exitNodeIds = newlyOfflineNodes.map(
|
logger.info(
|
||||||
(node) => node.exitNodeId
|
`checkRemoteExitNodeOffline: Marked ${offlineNodes.length} remoteExitNode client(s) offline due to inactivity`
|
||||||
);
|
|
||||||
|
|
||||||
const sitesOnNode = await db
|
|
||||||
.select()
|
|
||||||
.from(sites)
|
|
||||||
.where(
|
|
||||||
and(
|
|
||||||
eq(sites.online, true),
|
|
||||||
inArray(sites.exitNodeId, exitNodeIds)
|
|
||||||
)
|
|
||||||
);
|
);
|
||||||
|
|
||||||
// loop through the sites and process their lastBandwidthUpdate as an iso string and if its more than 1 minute old then mark the site offline
|
for (const offlineClient of offlineNodes) {
|
||||||
for (const site of sitesOnNode) {
|
logger.debug(
|
||||||
if (!site.lastBandwidthUpdate) {
|
`checkRemoteExitNodeOffline: Client ${offlineClient.exitNodeId} marked offline (lastPing: ${offlineClient.lastPing})`
|
||||||
continue;
|
);
|
||||||
}
|
|
||||||
const lastBandwidthUpdate = new Date(site.lastBandwidthUpdate);
|
|
||||||
if (Date.now() - lastBandwidthUpdate.getTime() > 60 * 1000) {
|
|
||||||
await db
|
|
||||||
.update(sites)
|
|
||||||
.set({ online: false })
|
|
||||||
.where(eq(sites.siteId, site.siteId));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ import {
|
|||||||
sites,
|
sites,
|
||||||
userOrgs
|
userOrgs
|
||||||
} from "@server/db";
|
} from "@server/db";
|
||||||
|
import { logAccessAudit } from "#private/lib/logAccessAudit";
|
||||||
import { isLicensedOrSubscribed } from "#private/lib/isLicencedOrSubscribed";
|
import { isLicensedOrSubscribed } from "#private/lib/isLicencedOrSubscribed";
|
||||||
import { tierMatrix } from "@server/lib/billing/tierMatrix";
|
import { tierMatrix } from "@server/lib/billing/tierMatrix";
|
||||||
import response from "@server/lib/response";
|
import response from "@server/lib/response";
|
||||||
@@ -463,6 +464,24 @@ export async function signSshKey(
|
|||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
|
await logAccessAudit({
|
||||||
|
action: true,
|
||||||
|
type: "ssh",
|
||||||
|
orgId: orgId,
|
||||||
|
resourceId: resource.siteResourceId,
|
||||||
|
user: req.user
|
||||||
|
? { username: req.user.username ?? "", userId: req.user.userId }
|
||||||
|
: undefined,
|
||||||
|
metadata: {
|
||||||
|
resourceName: resource.name,
|
||||||
|
siteId: resource.siteId,
|
||||||
|
sshUsername: usernameToUse,
|
||||||
|
sshHost: sshHost
|
||||||
|
},
|
||||||
|
userAgent: req.headers["user-agent"],
|
||||||
|
requestIp: req.ip
|
||||||
|
});
|
||||||
|
|
||||||
return response<SignSshKeyResponse>(res, {
|
return response<SignSshKeyResponse>(res, {
|
||||||
data: {
|
data: {
|
||||||
certificate: cert.certificate,
|
certificate: cert.certificate,
|
||||||
|
|||||||
@@ -19,17 +19,14 @@ import { Socket } from "net";
|
|||||||
import {
|
import {
|
||||||
Newt,
|
Newt,
|
||||||
newts,
|
newts,
|
||||||
NewtSession,
|
|
||||||
olms,
|
|
||||||
Olm,
|
Olm,
|
||||||
OlmSession,
|
olms,
|
||||||
RemoteExitNode,
|
RemoteExitNode,
|
||||||
RemoteExitNodeSession,
|
|
||||||
remoteExitNodes,
|
remoteExitNodes,
|
||||||
sites
|
|
||||||
} from "@server/db";
|
} from "@server/db";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
import { db } from "@server/db";
|
import { db } from "@server/db";
|
||||||
|
import { recordPing } from "@server/routers/newt/pingAccumulator";
|
||||||
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
|
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
|
||||||
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
|
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
@@ -197,11 +194,7 @@ const connectedClients: Map<string, AuthenticatedWebSocket[]> = new Map();
|
|||||||
// Config version tracking map (local to this node, resets on server restart)
|
// Config version tracking map (local to this node, resets on server restart)
|
||||||
const clientConfigVersions: Map<string, number> = new Map();
|
const clientConfigVersions: Map<string, number> = new Map();
|
||||||
|
|
||||||
// Tracks the last Unix timestamp (seconds) at which a ping was flushed to the
|
|
||||||
// DB for a given siteId. Resets on server restart which is fine – the first
|
|
||||||
// ping after startup will always write, re-establishing the online state.
|
|
||||||
const lastPingDbWrite: Map<number, number> = new Map();
|
|
||||||
const PING_DB_WRITE_INTERVAL = 45; // seconds
|
|
||||||
|
|
||||||
// Recovery tracking
|
// Recovery tracking
|
||||||
let isRedisRecoveryInProgress = false;
|
let isRedisRecoveryInProgress = false;
|
||||||
@@ -853,32 +846,16 @@ const setupConnection = async (
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Handle WebSocket protocol-level pings from older newt clients that do
|
|
||||||
// not send application-level "newt/ping" messages. Update the site's
|
|
||||||
// online state and lastPing timestamp so the offline checker treats them
|
|
||||||
// the same as modern newt clients.
|
|
||||||
if (clientType === "newt") {
|
if (clientType === "newt") {
|
||||||
const newtClient = client as Newt;
|
const newtClient = client as Newt;
|
||||||
ws.on("ping", async () => {
|
ws.on("ping", () => {
|
||||||
if (!newtClient.siteId) return;
|
if (!newtClient.siteId) return;
|
||||||
const now = Math.floor(Date.now() / 1000);
|
// Record the ping in the accumulator instead of writing to the
|
||||||
const lastWrite = lastPingDbWrite.get(newtClient.siteId) ?? 0;
|
// database on every WS ping frame. The accumulator flushes all
|
||||||
if (now - lastWrite < PING_DB_WRITE_INTERVAL) return;
|
// pending pings in a single batched UPDATE every ~10s, which
|
||||||
lastPingDbWrite.set(newtClient.siteId, now);
|
// prevents connection pool exhaustion under load (especially
|
||||||
try {
|
// with cross-region latency to the database).
|
||||||
await db
|
recordPing(newtClient.siteId);
|
||||||
.update(sites)
|
|
||||||
.set({
|
|
||||||
online: true,
|
|
||||||
lastPing: now
|
|
||||||
})
|
|
||||||
.where(eq(sites.siteId, newtClient.siteId));
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
"Error updating newt site online state on WS ping",
|
|
||||||
{ error }
|
|
||||||
);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -70,7 +70,7 @@ async function getLatestOlmVersion(): Promise<string | null> {
|
|||||||
tags = tags.filter((version) => !version.name.includes("rc"));
|
tags = tags.filter((version) => !version.name.includes("rc"));
|
||||||
const latestVersion = tags[0].name;
|
const latestVersion = tags[0].name;
|
||||||
|
|
||||||
olmVersionCache.set("latestOlmVersion", latestVersion);
|
olmVersionCache.set("latestOlmVersion", latestVersion, 3600);
|
||||||
|
|
||||||
return latestVersion;
|
return latestVersion;
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
|
|||||||
@@ -71,7 +71,7 @@ async function getLatestOlmVersion(): Promise<string | null> {
|
|||||||
tags = tags.filter((version) => !version.name.includes("rc"));
|
tags = tags.filter((version) => !version.name.includes("rc"));
|
||||||
const latestVersion = tags[0].name;
|
const latestVersion = tags[0].name;
|
||||||
|
|
||||||
olmVersionCache.set("latestOlmVersion", latestVersion);
|
olmVersionCache.set("latestOlmVersion", latestVersion, 3600);
|
||||||
|
|
||||||
return latestVersion;
|
return latestVersion;
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
|
|||||||
@@ -1,15 +1,54 @@
|
|||||||
import { sendToClient } from "#dynamic/routers/ws";
|
import { sendToClient } from "#dynamic/routers/ws";
|
||||||
import { db, olms, Transaction } from "@server/db";
|
import { db, newts, olms } from "@server/db";
|
||||||
|
import {
|
||||||
|
Alias,
|
||||||
|
convertSubnetProxyTargetsV2ToV1,
|
||||||
|
SubnetProxyTarget,
|
||||||
|
SubnetProxyTargetV2
|
||||||
|
} from "@server/lib/ip";
|
||||||
import { canCompress } from "@server/lib/clientVersionChecks";
|
import { canCompress } from "@server/lib/clientVersionChecks";
|
||||||
import { Alias, SubnetProxyTarget } from "@server/lib/ip";
|
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
|
import semver from "semver";
|
||||||
|
|
||||||
|
const NEWT_V2_TARGETS_VERSION = ">=1.10.3";
|
||||||
|
|
||||||
|
export async function convertTargetsIfNessicary(
|
||||||
|
newtId: string,
|
||||||
|
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[]
|
||||||
|
) {
|
||||||
|
// get the newt
|
||||||
|
const [newt] = await db
|
||||||
|
.select()
|
||||||
|
.from(newts)
|
||||||
|
.where(eq(newts.newtId, newtId));
|
||||||
|
if (!newt) {
|
||||||
|
throw new Error(`No newt found for id: ${newtId}`);
|
||||||
|
}
|
||||||
|
|
||||||
|
// check the semver
|
||||||
|
if (
|
||||||
|
newt.version &&
|
||||||
|
!semver.satisfies(newt.version, NEWT_V2_TARGETS_VERSION)
|
||||||
|
) {
|
||||||
|
logger.debug(
|
||||||
|
`addTargets Newt version ${newt.version} does not support targets v2 falling back`
|
||||||
|
);
|
||||||
|
targets = convertSubnetProxyTargetsV2ToV1(
|
||||||
|
targets as SubnetProxyTargetV2[]
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
return targets;
|
||||||
|
}
|
||||||
|
|
||||||
export async function addTargets(
|
export async function addTargets(
|
||||||
newtId: string,
|
newtId: string,
|
||||||
targets: SubnetProxyTarget[],
|
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[],
|
||||||
version?: string | null
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
|
targets = await convertTargetsIfNessicary(newtId, targets);
|
||||||
|
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
newtId,
|
newtId,
|
||||||
{
|
{
|
||||||
@@ -22,9 +61,11 @@ export async function addTargets(
|
|||||||
|
|
||||||
export async function removeTargets(
|
export async function removeTargets(
|
||||||
newtId: string,
|
newtId: string,
|
||||||
targets: SubnetProxyTarget[],
|
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[],
|
||||||
version?: string | null
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
|
targets = await convertTargetsIfNessicary(newtId, targets);
|
||||||
|
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
newtId,
|
newtId,
|
||||||
{
|
{
|
||||||
@@ -38,11 +79,39 @@ export async function removeTargets(
|
|||||||
export async function updateTargets(
|
export async function updateTargets(
|
||||||
newtId: string,
|
newtId: string,
|
||||||
targets: {
|
targets: {
|
||||||
oldTargets: SubnetProxyTarget[];
|
oldTargets: SubnetProxyTarget[] | SubnetProxyTargetV2[];
|
||||||
newTargets: SubnetProxyTarget[];
|
newTargets: SubnetProxyTarget[] | SubnetProxyTargetV2[];
|
||||||
},
|
},
|
||||||
version?: string | null
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
|
// get the newt
|
||||||
|
const [newt] = await db
|
||||||
|
.select()
|
||||||
|
.from(newts)
|
||||||
|
.where(eq(newts.newtId, newtId));
|
||||||
|
if (!newt) {
|
||||||
|
logger.error(`addTargetsL No newt found for id: ${newtId}`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// check the semver
|
||||||
|
if (
|
||||||
|
newt.version &&
|
||||||
|
!semver.satisfies(newt.version, NEWT_V2_TARGETS_VERSION)
|
||||||
|
) {
|
||||||
|
logger.debug(
|
||||||
|
`addTargets Newt version ${newt.version} does not support targets v2 falling back`
|
||||||
|
);
|
||||||
|
targets = {
|
||||||
|
oldTargets: convertSubnetProxyTargetsV2ToV1(
|
||||||
|
targets.oldTargets as SubnetProxyTargetV2[]
|
||||||
|
),
|
||||||
|
newTargets: convertSubnetProxyTargetsV2ToV1(
|
||||||
|
targets.newTargets as SubnetProxyTargetV2[]
|
||||||
|
)
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
newtId,
|
newtId,
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -16,8 +16,8 @@ import { eq, and } from "drizzle-orm";
|
|||||||
import config from "@server/lib/config";
|
import config from "@server/lib/config";
|
||||||
import {
|
import {
|
||||||
formatEndpoint,
|
formatEndpoint,
|
||||||
generateSubnetProxyTargets,
|
generateSubnetProxyTargetV2,
|
||||||
SubnetProxyTarget
|
SubnetProxyTargetV2
|
||||||
} from "@server/lib/ip";
|
} from "@server/lib/ip";
|
||||||
|
|
||||||
export async function buildClientConfigurationForNewtClient(
|
export async function buildClientConfigurationForNewtClient(
|
||||||
@@ -143,7 +143,7 @@ export async function buildClientConfigurationForNewtClient(
|
|||||||
.from(siteResources)
|
.from(siteResources)
|
||||||
.where(eq(siteResources.siteId, siteId));
|
.where(eq(siteResources.siteId, siteId));
|
||||||
|
|
||||||
const targetsToSend: SubnetProxyTarget[] = [];
|
const targetsToSend: SubnetProxyTargetV2[] = [];
|
||||||
|
|
||||||
for (const resource of allSiteResources) {
|
for (const resource of allSiteResources) {
|
||||||
// Get clients associated with this specific resource
|
// Get clients associated with this specific resource
|
||||||
@@ -168,12 +168,14 @@ export async function buildClientConfigurationForNewtClient(
|
|||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
const resourceTargets = generateSubnetProxyTargets(
|
const resourceTarget = generateSubnetProxyTargetV2(
|
||||||
resource,
|
resource,
|
||||||
resourceClients
|
resourceClients
|
||||||
);
|
);
|
||||||
|
|
||||||
targetsToSend.push(...resourceTargets);
|
if (resourceTarget) {
|
||||||
|
targetsToSend.push(resourceTarget);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
import { generateSessionToken } from "@server/auth/sessions/app";
|
import { generateSessionToken } from "@server/auth/sessions/app";
|
||||||
import { db } from "@server/db";
|
import { db, newtSessions } from "@server/db";
|
||||||
import { newts } from "@server/db";
|
import { newts } from "@server/db";
|
||||||
|
import { getOrCreateCachedToken } from "#dynamic/lib/tokenCache";
|
||||||
|
import { EXPIRES } from "@server/auth/sessions/newt";
|
||||||
import HttpCode from "@server/types/HttpCode";
|
import HttpCode from "@server/types/HttpCode";
|
||||||
import response from "@server/lib/response";
|
import response from "@server/lib/response";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
@@ -92,8 +94,19 @@ export async function getNewtToken(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
const resToken = generateSessionToken();
|
// Return a cached token if one exists to prevent thundering herd on
|
||||||
await createNewtSession(resToken, existingNewt.newtId);
|
// simultaneous restarts; falls back to creating a fresh session when
|
||||||
|
// Redis is unavailable or the cache has expired.
|
||||||
|
const resToken = await getOrCreateCachedToken(
|
||||||
|
`newt:token_cache:${existingNewt.newtId}`,
|
||||||
|
config.getRawConfig().server.secret!,
|
||||||
|
Math.floor(EXPIRES / 1000),
|
||||||
|
async () => {
|
||||||
|
const token = generateSessionToken();
|
||||||
|
await createNewtSession(token, existingNewt.newtId);
|
||||||
|
return token;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
return response<{ token: string; serverVersion: string }>(res, {
|
return response<{ token: string; serverVersion: string }>(res, {
|
||||||
data: {
|
data: {
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import { db, ExitNode, exitNodes, Newt, sites } from "@server/db";
|
|||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
import { sendToExitNode } from "#dynamic/lib/exitNodes";
|
import { sendToExitNode } from "#dynamic/lib/exitNodes";
|
||||||
import { buildClientConfigurationForNewtClient } from "./buildConfiguration";
|
import { buildClientConfigurationForNewtClient } from "./buildConfiguration";
|
||||||
|
import { convertTargetsIfNessicary } from "../client/targets";
|
||||||
import { canCompress } from "@server/lib/clientVersionChecks";
|
import { canCompress } from "@server/lib/clientVersionChecks";
|
||||||
|
|
||||||
const inputSchema = z.object({
|
const inputSchema = z.object({
|
||||||
@@ -127,13 +128,15 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
|
|||||||
exitNode
|
exitNode
|
||||||
);
|
);
|
||||||
|
|
||||||
|
const targetsToSend = await convertTargetsIfNessicary(newt.newtId, targets);
|
||||||
|
|
||||||
return {
|
return {
|
||||||
message: {
|
message: {
|
||||||
type: "newt/wg/receive-config",
|
type: "newt/wg/receive-config",
|
||||||
data: {
|
data: {
|
||||||
ipAddress: site.address,
|
ipAddress: site.address,
|
||||||
peers,
|
peers,
|
||||||
targets
|
targets: targetsToSend
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
options: {
|
options: {
|
||||||
|
|||||||
@@ -6,7 +6,9 @@ import logger from "@server/logger";
|
|||||||
/**
|
/**
|
||||||
* Handles disconnecting messages from sites to show disconnected in the ui
|
* Handles disconnecting messages from sites to show disconnected in the ui
|
||||||
*/
|
*/
|
||||||
export const handleNewtDisconnectingMessage: MessageHandler = async (context) => {
|
export const handleNewtDisconnectingMessage: MessageHandler = async (
|
||||||
|
context
|
||||||
|
) => {
|
||||||
const { message, client: c, sendToClient } = context;
|
const { message, client: c, sendToClient } = context;
|
||||||
const newt = c as Newt;
|
const newt = c as Newt;
|
||||||
|
|
||||||
@@ -27,7 +29,7 @@ export const handleNewtDisconnectingMessage: MessageHandler = async (context) =>
|
|||||||
.set({
|
.set({
|
||||||
online: false
|
online: false
|
||||||
})
|
})
|
||||||
.where(eq(sites.siteId, sites.siteId));
|
.where(eq(sites.siteId, newt.siteId));
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error("Error handling disconnecting message", { error });
|
logger.error("Error handling disconnecting message", { error });
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import { Newt } from "@server/db";
|
|||||||
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { sendNewtSyncMessage } from "./sync";
|
import { sendNewtSyncMessage } from "./sync";
|
||||||
|
import { recordPing } from "./pingAccumulator";
|
||||||
|
|
||||||
// Track if the offline checker interval is running
|
// Track if the offline checker interval is running
|
||||||
let offlineCheckerInterval: NodeJS.Timeout | null = null;
|
let offlineCheckerInterval: NodeJS.Timeout | null = null;
|
||||||
@@ -114,18 +115,12 @@ export const handleNewtPingMessage: MessageHandler = async (context) => {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
// Record the ping in memory; it will be flushed to the database
|
||||||
// Mark the site as online and record the ping timestamp.
|
// periodically by the ping accumulator (every ~10s) in a single
|
||||||
await db
|
// batched UPDATE instead of one query per ping. This prevents
|
||||||
.update(sites)
|
// connection pool exhaustion under load, especially with
|
||||||
.set({
|
// cross-region latency to the database.
|
||||||
online: true,
|
recordPing(newt.siteId);
|
||||||
lastPing: Math.floor(Date.now() / 1000)
|
|
||||||
})
|
|
||||||
.where(eq(sites.siteId, newt.siteId));
|
|
||||||
} catch (error) {
|
|
||||||
logger.error("Error updating online state on newt ping", { error });
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check config version and sync if stale.
|
// Check config version and sync if stale.
|
||||||
const configVersion = await getClientConfigVersion(newt.newtId);
|
const configVersion = await getClientConfigVersion(newt.newtId);
|
||||||
|
|||||||
382
server/routers/newt/pingAccumulator.ts
Normal file
382
server/routers/newt/pingAccumulator.ts
Normal file
@@ -0,0 +1,382 @@
|
|||||||
|
import { db } from "@server/db";
|
||||||
|
import { sites, clients, olms } from "@server/db";
|
||||||
|
import { eq, inArray } from "drizzle-orm";
|
||||||
|
import logger from "@server/logger";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ping Accumulator
|
||||||
|
*
|
||||||
|
* Instead of writing to the database on every single newt/olm ping (which
|
||||||
|
* causes pool exhaustion under load, especially with cross-region latency),
|
||||||
|
* we accumulate pings in memory and flush them to the database periodically
|
||||||
|
* in a single batch.
|
||||||
|
*
|
||||||
|
* This is the same pattern used for bandwidth flushing in
|
||||||
|
* receiveBandwidth.ts and handleReceiveBandwidthMessage.ts.
|
||||||
|
*
|
||||||
|
* Supports two kinds of pings:
|
||||||
|
* - **Site pings** (from newts): update `sites.online` and `sites.lastPing`
|
||||||
|
* - **Client pings** (from OLMs): update `clients.online`, `clients.lastPing`,
|
||||||
|
* `clients.archived`, and optionally reset `olms.archived`
|
||||||
|
*/
|
||||||
|
|
||||||
|
const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds
|
||||||
|
const MAX_RETRIES = 2;
|
||||||
|
const BASE_DELAY_MS = 50;
|
||||||
|
|
||||||
|
// ── Site (newt) pings ──────────────────────────────────────────────────
|
||||||
|
// Map of siteId -> latest ping timestamp (unix seconds)
|
||||||
|
const pendingSitePings: Map<number, number> = new Map();
|
||||||
|
|
||||||
|
// ── Client (OLM) pings ────────────────────────────────────────────────
|
||||||
|
// Map of clientId -> latest ping timestamp (unix seconds)
|
||||||
|
const pendingClientPings: Map<number, number> = new Map();
|
||||||
|
// Set of olmIds whose `archived` flag should be reset to false
|
||||||
|
const pendingOlmArchiveResets: Set<string> = new Set();
|
||||||
|
|
||||||
|
let flushTimer: NodeJS.Timeout | null = null;
|
||||||
|
|
||||||
|
// ── Public API ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record a ping for a newt site. This does NOT write to the database
|
||||||
|
* immediately. Instead it stores the latest ping timestamp in memory,
|
||||||
|
* to be flushed periodically by the background timer.
|
||||||
|
*/
|
||||||
|
export function recordSitePing(siteId: number): void {
|
||||||
|
const now = Math.floor(Date.now() / 1000);
|
||||||
|
pendingSitePings.set(siteId, now);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @deprecated Use `recordSitePing` instead. Alias kept for existing call-sites. */
|
||||||
|
export const recordPing = recordSitePing;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Record a ping for an OLM client. Batches the `clients` table update
|
||||||
|
* (`online`, `lastPing`, `archived`) and, when `olmArchived` is true,
|
||||||
|
* also queues an `olms` table update to clear the archived flag.
|
||||||
|
*/
|
||||||
|
export function recordClientPing(
|
||||||
|
clientId: number,
|
||||||
|
olmId: string,
|
||||||
|
olmArchived: boolean
|
||||||
|
): void {
|
||||||
|
const now = Math.floor(Date.now() / 1000);
|
||||||
|
pendingClientPings.set(clientId, now);
|
||||||
|
if (olmArchived) {
|
||||||
|
pendingOlmArchiveResets.add(olmId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Flush Logic ────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Flush all accumulated site pings to the database.
|
||||||
|
*/
|
||||||
|
async function flushSitePingsToDb(): Promise<void> {
|
||||||
|
if (pendingSitePings.size === 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Snapshot and clear so new pings arriving during the flush go into a
|
||||||
|
// fresh map for the next cycle.
|
||||||
|
const pingsToFlush = new Map(pendingSitePings);
|
||||||
|
pendingSitePings.clear();
|
||||||
|
|
||||||
|
// Sort by siteId for consistent lock ordering (prevents deadlocks)
|
||||||
|
const sortedEntries = Array.from(pingsToFlush.entries()).sort(
|
||||||
|
([a], [b]) => a - b
|
||||||
|
);
|
||||||
|
|
||||||
|
const BATCH_SIZE = 50;
|
||||||
|
for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
|
||||||
|
const batch = sortedEntries.slice(i, i + BATCH_SIZE);
|
||||||
|
|
||||||
|
try {
|
||||||
|
await withRetry(async () => {
|
||||||
|
// Group by timestamp for efficient bulk updates
|
||||||
|
const byTimestamp = new Map<number, number[]>();
|
||||||
|
for (const [siteId, timestamp] of batch) {
|
||||||
|
const group = byTimestamp.get(timestamp) || [];
|
||||||
|
group.push(siteId);
|
||||||
|
byTimestamp.set(timestamp, group);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (byTimestamp.size === 1) {
|
||||||
|
const [timestamp, siteIds] = Array.from(
|
||||||
|
byTimestamp.entries()
|
||||||
|
)[0];
|
||||||
|
await db
|
||||||
|
.update(sites)
|
||||||
|
.set({
|
||||||
|
online: true,
|
||||||
|
lastPing: timestamp
|
||||||
|
})
|
||||||
|
.where(inArray(sites.siteId, siteIds));
|
||||||
|
} else {
|
||||||
|
await db.transaction(async (tx) => {
|
||||||
|
for (const [timestamp, siteIds] of byTimestamp) {
|
||||||
|
await tx
|
||||||
|
.update(sites)
|
||||||
|
.set({
|
||||||
|
online: true,
|
||||||
|
lastPing: timestamp
|
||||||
|
})
|
||||||
|
.where(inArray(sites.siteId, siteIds));
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}, "flushSitePingsToDb");
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
`Failed to flush site ping batch (${batch.length} sites), re-queuing for next cycle`,
|
||||||
|
{ error }
|
||||||
|
);
|
||||||
|
for (const [siteId, timestamp] of batch) {
|
||||||
|
const existing = pendingSitePings.get(siteId);
|
||||||
|
if (!existing || existing < timestamp) {
|
||||||
|
pendingSitePings.set(siteId, timestamp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Flush all accumulated client (OLM) pings to the database.
|
||||||
|
*/
|
||||||
|
async function flushClientPingsToDb(): Promise<void> {
|
||||||
|
if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Snapshot and clear
|
||||||
|
const pingsToFlush = new Map(pendingClientPings);
|
||||||
|
pendingClientPings.clear();
|
||||||
|
|
||||||
|
const olmResetsToFlush = new Set(pendingOlmArchiveResets);
|
||||||
|
pendingOlmArchiveResets.clear();
|
||||||
|
|
||||||
|
// ── Flush client pings ─────────────────────────────────────────────
|
||||||
|
if (pingsToFlush.size > 0) {
|
||||||
|
const sortedEntries = Array.from(pingsToFlush.entries()).sort(
|
||||||
|
([a], [b]) => a - b
|
||||||
|
);
|
||||||
|
|
||||||
|
const BATCH_SIZE = 50;
|
||||||
|
for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
|
||||||
|
const batch = sortedEntries.slice(i, i + BATCH_SIZE);
|
||||||
|
|
||||||
|
try {
|
||||||
|
await withRetry(async () => {
|
||||||
|
const byTimestamp = new Map<number, number[]>();
|
||||||
|
for (const [clientId, timestamp] of batch) {
|
||||||
|
const group = byTimestamp.get(timestamp) || [];
|
||||||
|
group.push(clientId);
|
||||||
|
byTimestamp.set(timestamp, group);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (byTimestamp.size === 1) {
|
||||||
|
const [timestamp, clientIds] = Array.from(
|
||||||
|
byTimestamp.entries()
|
||||||
|
)[0];
|
||||||
|
await db
|
||||||
|
.update(clients)
|
||||||
|
.set({
|
||||||
|
lastPing: timestamp,
|
||||||
|
online: true,
|
||||||
|
archived: false
|
||||||
|
})
|
||||||
|
.where(inArray(clients.clientId, clientIds));
|
||||||
|
} else {
|
||||||
|
await db.transaction(async (tx) => {
|
||||||
|
for (const [timestamp, clientIds] of byTimestamp) {
|
||||||
|
await tx
|
||||||
|
.update(clients)
|
||||||
|
.set({
|
||||||
|
lastPing: timestamp,
|
||||||
|
online: true,
|
||||||
|
archived: false
|
||||||
|
})
|
||||||
|
.where(
|
||||||
|
inArray(clients.clientId, clientIds)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}, "flushClientPingsToDb");
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
`Failed to flush client ping batch (${batch.length} clients), re-queuing for next cycle`,
|
||||||
|
{ error }
|
||||||
|
);
|
||||||
|
for (const [clientId, timestamp] of batch) {
|
||||||
|
const existing = pendingClientPings.get(clientId);
|
||||||
|
if (!existing || existing < timestamp) {
|
||||||
|
pendingClientPings.set(clientId, timestamp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Flush OLM archive resets ───────────────────────────────────────
|
||||||
|
if (olmResetsToFlush.size > 0) {
|
||||||
|
const olmIds = Array.from(olmResetsToFlush).sort();
|
||||||
|
|
||||||
|
const BATCH_SIZE = 50;
|
||||||
|
for (let i = 0; i < olmIds.length; i += BATCH_SIZE) {
|
||||||
|
const batch = olmIds.slice(i, i + BATCH_SIZE);
|
||||||
|
|
||||||
|
try {
|
||||||
|
await withRetry(async () => {
|
||||||
|
await db
|
||||||
|
.update(olms)
|
||||||
|
.set({ archived: false })
|
||||||
|
.where(inArray(olms.olmId, batch));
|
||||||
|
}, "flushOlmArchiveResets");
|
||||||
|
} catch (error) {
|
||||||
|
logger.error(
|
||||||
|
`Failed to flush OLM archive reset batch (${batch.length} olms), re-queuing for next cycle`,
|
||||||
|
{ error }
|
||||||
|
);
|
||||||
|
for (const olmId of batch) {
|
||||||
|
pendingOlmArchiveResets.add(olmId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Flush everything — called by the interval timer and during shutdown.
|
||||||
|
*/
|
||||||
|
export async function flushPingsToDb(): Promise<void> {
|
||||||
|
await flushSitePingsToDb();
|
||||||
|
await flushClientPingsToDb();
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Retry / Error Helpers ──────────────────────────────────────────────
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Simple retry wrapper with exponential backoff for transient errors
|
||||||
|
* (connection timeouts, unexpected disconnects).
|
||||||
|
*/
|
||||||
|
async function withRetry<T>(
|
||||||
|
operation: () => Promise<T>,
|
||||||
|
context: string
|
||||||
|
): Promise<T> {
|
||||||
|
let attempt = 0;
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
return await operation();
|
||||||
|
} catch (error: any) {
|
||||||
|
if (isTransientError(error) && attempt < MAX_RETRIES) {
|
||||||
|
attempt++;
|
||||||
|
const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
|
||||||
|
const jitter = Math.random() * baseDelay;
|
||||||
|
const delay = baseDelay + jitter;
|
||||||
|
logger.warn(
|
||||||
|
`Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`
|
||||||
|
);
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
throw error;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Detect transient connection errors that are safe to retry.
|
||||||
|
*/
|
||||||
|
function isTransientError(error: any): boolean {
|
||||||
|
if (!error) return false;
|
||||||
|
|
||||||
|
const message = (error.message || "").toLowerCase();
|
||||||
|
const causeMessage = (error.cause?.message || "").toLowerCase();
|
||||||
|
const code = error.code || "";
|
||||||
|
|
||||||
|
// Connection timeout / terminated
|
||||||
|
if (
|
||||||
|
message.includes("connection timeout") ||
|
||||||
|
message.includes("connection terminated") ||
|
||||||
|
message.includes("timeout exceeded when trying to connect") ||
|
||||||
|
causeMessage.includes("connection terminated unexpectedly") ||
|
||||||
|
causeMessage.includes("connection timeout")
|
||||||
|
) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// PostgreSQL deadlock
|
||||||
|
if (code === "40P01" || message.includes("deadlock")) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ECONNRESET, ECONNREFUSED, EPIPE
|
||||||
|
if (
|
||||||
|
code === "ECONNRESET" ||
|
||||||
|
code === "ECONNREFUSED" ||
|
||||||
|
code === "EPIPE" ||
|
||||||
|
code === "ETIMEDOUT"
|
||||||
|
) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Lifecycle ──────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start the background flush timer. Call this once at server startup.
|
||||||
|
*/
|
||||||
|
export function startPingAccumulator(): void {
|
||||||
|
if (flushTimer) {
|
||||||
|
return; // Already running
|
||||||
|
}
|
||||||
|
|
||||||
|
flushTimer = setInterval(async () => {
|
||||||
|
try {
|
||||||
|
await flushPingsToDb();
|
||||||
|
} catch (error) {
|
||||||
|
logger.error("Unhandled error in ping accumulator flush", {
|
||||||
|
error
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}, FLUSH_INTERVAL_MS);
|
||||||
|
|
||||||
|
// Don't prevent the process from exiting
|
||||||
|
flushTimer.unref();
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
`Ping accumulator started (flush interval: ${FLUSH_INTERVAL_MS}ms)`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stop the background flush timer and perform a final flush.
|
||||||
|
* Call this during graceful shutdown.
|
||||||
|
*/
|
||||||
|
export async function stopPingAccumulator(): Promise<void> {
|
||||||
|
if (flushTimer) {
|
||||||
|
clearInterval(flushTimer);
|
||||||
|
flushTimer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Final flush to persist any remaining pings
|
||||||
|
try {
|
||||||
|
await flushPingsToDb();
|
||||||
|
} catch (error) {
|
||||||
|
logger.error("Error during final ping accumulator flush", { error });
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info("Ping accumulator stopped");
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the number of pending (unflushed) pings. Useful for monitoring.
|
||||||
|
*/
|
||||||
|
export function getPendingPingCount(): number {
|
||||||
|
return pendingSitePings.size + pendingClientPings.size;
|
||||||
|
}
|
||||||
@@ -8,7 +8,7 @@ import {
|
|||||||
ExitNode,
|
ExitNode,
|
||||||
exitNodes,
|
exitNodes,
|
||||||
sites,
|
sites,
|
||||||
clientSitesAssociationsCache
|
clientSitesAssociationsCache,
|
||||||
} from "@server/db";
|
} from "@server/db";
|
||||||
import { olms } from "@server/db";
|
import { olms } from "@server/db";
|
||||||
import HttpCode from "@server/types/HttpCode";
|
import HttpCode from "@server/types/HttpCode";
|
||||||
@@ -20,8 +20,10 @@ import { z } from "zod";
|
|||||||
import { fromError } from "zod-validation-error";
|
import { fromError } from "zod-validation-error";
|
||||||
import {
|
import {
|
||||||
createOlmSession,
|
createOlmSession,
|
||||||
validateOlmSessionToken
|
validateOlmSessionToken,
|
||||||
|
EXPIRES
|
||||||
} from "@server/auth/sessions/olm";
|
} from "@server/auth/sessions/olm";
|
||||||
|
import { getOrCreateCachedToken } from "#dynamic/lib/tokenCache";
|
||||||
import { verifyPassword } from "@server/auth/password";
|
import { verifyPassword } from "@server/auth/password";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import config from "@server/lib/config";
|
import config from "@server/lib/config";
|
||||||
@@ -132,8 +134,19 @@ export async function getOlmToken(
|
|||||||
|
|
||||||
logger.debug("Creating new olm session token");
|
logger.debug("Creating new olm session token");
|
||||||
|
|
||||||
const resToken = generateSessionToken();
|
// Return a cached token if one exists to prevent thundering herd on
|
||||||
await createOlmSession(resToken, existingOlm.olmId);
|
// simultaneous restarts; falls back to creating a fresh session when
|
||||||
|
// Redis is unavailable or the cache has expired.
|
||||||
|
const resToken = await getOrCreateCachedToken(
|
||||||
|
`olm:token_cache:${existingOlm.olmId}`,
|
||||||
|
config.getRawConfig().server.secret!,
|
||||||
|
Math.floor(EXPIRES / 1000),
|
||||||
|
async () => {
|
||||||
|
const token = generateSessionToken();
|
||||||
|
await createOlmSession(token, existingOlm.olmId);
|
||||||
|
return token;
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
let clientIdToUse;
|
let clientIdToUse;
|
||||||
if (orgId) {
|
if (orgId) {
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ import { db } from "@server/db";
|
|||||||
import { MessageHandler } from "@server/routers/ws";
|
import { MessageHandler } from "@server/routers/ws";
|
||||||
import { clients, olms, Olm } from "@server/db";
|
import { clients, olms, Olm } from "@server/db";
|
||||||
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
||||||
|
import { recordClientPing } from "@server/routers/newt/pingAccumulator";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { validateSessionToken } from "@server/auth/sessions/app";
|
import { validateSessionToken } from "@server/auth/sessions/app";
|
||||||
import { checkOrgAccessPolicy } from "#dynamic/lib/checkOrgAccessPolicy";
|
import { checkOrgAccessPolicy } from "#dynamic/lib/checkOrgAccessPolicy";
|
||||||
@@ -201,22 +202,12 @@ export const handleOlmPingMessage: MessageHandler = async (context) => {
|
|||||||
await sendOlmSyncMessage(olm, client);
|
await sendOlmSyncMessage(olm, client);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the client's last ping timestamp
|
// Record the ping in memory; it will be flushed to the database
|
||||||
await db
|
// periodically by the ping accumulator (every ~10s) in a single
|
||||||
.update(clients)
|
// batched UPDATE instead of one query per ping. This prevents
|
||||||
.set({
|
// connection pool exhaustion under load, especially with
|
||||||
lastPing: Math.floor(Date.now() / 1000),
|
// cross-region latency to the database.
|
||||||
online: true,
|
recordClientPing(olm.clientId, olm.olmId, !!olm.archived);
|
||||||
archived: false
|
|
||||||
})
|
|
||||||
.where(eq(clients.clientId, olm.clientId));
|
|
||||||
|
|
||||||
if (olm.archived) {
|
|
||||||
await db
|
|
||||||
.update(olms)
|
|
||||||
.set({ archived: false })
|
|
||||||
.where(eq(olms.olmId, olm.olmId));
|
|
||||||
}
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error("Error handling ping message", { error });
|
logger.error("Error handling ping message", { error });
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -55,7 +55,7 @@ async function getLatestNewtVersion(): Promise<string | null> {
|
|||||||
tags = tags.filter((version) => !version.name.includes("rc"));
|
tags = tags.filter((version) => !version.name.includes("rc"));
|
||||||
const latestVersion = tags[0].name;
|
const latestVersion = tags[0].name;
|
||||||
|
|
||||||
await cache.set("latestNewtVersion", latestVersion);
|
await cache.set("latestNewtVersion", latestVersion, 3600);
|
||||||
|
|
||||||
return latestVersion;
|
return latestVersion;
|
||||||
} catch (error: any) {
|
} catch (error: any) {
|
||||||
@@ -180,7 +180,7 @@ registry.registerPath({
|
|||||||
method: "get",
|
method: "get",
|
||||||
path: "/org/{orgId}/sites",
|
path: "/org/{orgId}/sites",
|
||||||
description: "List all sites in an organization",
|
description: "List all sites in an organization",
|
||||||
tags: [OpenAPITags.Site],
|
tags: [OpenAPITags.Org, OpenAPITags.Site],
|
||||||
request: {
|
request: {
|
||||||
params: listSitesParamsSchema,
|
params: listSitesParamsSchema,
|
||||||
query: listSitesSchema
|
query: listSitesSchema
|
||||||
|
|||||||
@@ -88,7 +88,7 @@ const createSiteResourceSchema = z
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
message:
|
message:
|
||||||
"Destination must be a valid IP address or valid domain AND alias is required"
|
"Destination must be a valid IPV4 address or valid domain AND alias is required"
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
.refine(
|
.refine(
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ import { updatePeerData, updateTargets } from "@server/routers/client/targets";
|
|||||||
import {
|
import {
|
||||||
generateAliasConfig,
|
generateAliasConfig,
|
||||||
generateRemoteSubnets,
|
generateRemoteSubnets,
|
||||||
generateSubnetProxyTargets,
|
generateSubnetProxyTargetV2,
|
||||||
isIpInCidr,
|
isIpInCidr,
|
||||||
portRangeStringSchema
|
portRangeStringSchema
|
||||||
} from "@server/lib/ip";
|
} from "@server/lib/ip";
|
||||||
@@ -608,18 +608,18 @@ export async function handleMessagingForUpdatedSiteResource(
|
|||||||
|
|
||||||
// Only update targets on newt if destination changed
|
// Only update targets on newt if destination changed
|
||||||
if (destinationChanged || portRangesChanged) {
|
if (destinationChanged || portRangesChanged) {
|
||||||
const oldTargets = generateSubnetProxyTargets(
|
const oldTarget = generateSubnetProxyTargetV2(
|
||||||
existingSiteResource,
|
existingSiteResource,
|
||||||
mergedAllClients
|
mergedAllClients
|
||||||
);
|
);
|
||||||
const newTargets = generateSubnetProxyTargets(
|
const newTarget = generateSubnetProxyTargetV2(
|
||||||
updatedSiteResource,
|
updatedSiteResource,
|
||||||
mergedAllClients
|
mergedAllClients
|
||||||
);
|
);
|
||||||
|
|
||||||
await updateTargets(newt.newtId, {
|
await updateTargets(newt.newtId, {
|
||||||
oldTargets: oldTargets,
|
oldTargets: oldTarget ? [oldTarget] : [],
|
||||||
newTargets: newTargets
|
newTargets: newTarget ? [newTarget] : []
|
||||||
}, newt.version);
|
}, newt.version);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -201,7 +201,7 @@ export async function inviteUser(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
await cache.set(email, attempts + 1);
|
await cache.set("regenerateInvite:" + email, attempts + 1, 3600);
|
||||||
|
|
||||||
const inviteId = existingInvite[0].inviteId; // Retrieve the original inviteId
|
const inviteId = existingInvite[0].inviteId; // Retrieve the original inviteId
|
||||||
const token = generateRandomString(
|
const token = generateRandomString(
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import {
|
|||||||
startNewtOfflineChecker,
|
startNewtOfflineChecker,
|
||||||
handleNewtDisconnectingMessage
|
handleNewtDisconnectingMessage
|
||||||
} from "../newt";
|
} from "../newt";
|
||||||
|
import { startPingAccumulator } from "../newt/pingAccumulator";
|
||||||
import {
|
import {
|
||||||
handleOlmRegisterMessage,
|
handleOlmRegisterMessage,
|
||||||
handleOlmRelayMessage,
|
handleOlmRelayMessage,
|
||||||
@@ -46,6 +47,10 @@ export const messageHandlers: Record<string, MessageHandler> = {
|
|||||||
"ws/round-trip/complete": handleRoundTripMessage
|
"ws/round-trip/complete": handleRoundTripMessage
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Start the ping accumulator for all builds — it batches per-site online/lastPing
|
||||||
|
// updates into periodic bulk writes, preventing connection pool exhaustion.
|
||||||
|
startPingAccumulator();
|
||||||
|
|
||||||
if (build != "saas") {
|
if (build != "saas") {
|
||||||
startOlmOfflineChecker(); // this is to handle the offline check for olms
|
startOlmOfflineChecker(); // this is to handle the offline check for olms
|
||||||
startNewtOfflineChecker(); // this is to handle the offline check for newts
|
startNewtOfflineChecker(); // this is to handle the offline check for newts
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import { Socket } from "net";
|
|||||||
import { Newt, newts, NewtSession, olms, Olm, OlmSession, sites } from "@server/db";
|
import { Newt, newts, NewtSession, olms, Olm, OlmSession, sites } from "@server/db";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
import { db } from "@server/db";
|
import { db } from "@server/db";
|
||||||
|
import { recordPing } from "@server/routers/newt/pingAccumulator";
|
||||||
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
|
import { validateNewtSessionToken } from "@server/auth/sessions/newt";
|
||||||
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
|
import { validateOlmSessionToken } from "@server/auth/sessions/olm";
|
||||||
import { messageHandlers } from "./messageHandlers";
|
import { messageHandlers } from "./messageHandlers";
|
||||||
@@ -386,22 +387,14 @@ const setupConnection = async (
|
|||||||
// the same as modern newt clients.
|
// the same as modern newt clients.
|
||||||
if (clientType === "newt") {
|
if (clientType === "newt") {
|
||||||
const newtClient = client as Newt;
|
const newtClient = client as Newt;
|
||||||
ws.on("ping", async () => {
|
ws.on("ping", () => {
|
||||||
if (!newtClient.siteId) return;
|
if (!newtClient.siteId) return;
|
||||||
try {
|
// Record the ping in the accumulator instead of writing to the
|
||||||
await db
|
// database on every WS ping frame. The accumulator flushes all
|
||||||
.update(sites)
|
// pending pings in a single batched UPDATE every ~10s, which
|
||||||
.set({
|
// prevents connection pool exhaustion under load (especially
|
||||||
online: true,
|
// with cross-region latency to the database).
|
||||||
lastPing: Math.floor(Date.now() / 1000)
|
recordPing(newtClient.siteId);
|
||||||
})
|
|
||||||
.where(eq(sites.siteId, newtClient.siteId));
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
"Error updating newt site online state on WS ping",
|
|
||||||
{ error }
|
|
||||||
);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -275,6 +275,8 @@ export default function Page() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const disabled = !isPaidUser(tierMatrix.orgOidc);
|
||||||
|
|
||||||
return (
|
return (
|
||||||
<>
|
<>
|
||||||
<div className="flex justify-between">
|
<div className="flex justify-between">
|
||||||
@@ -292,6 +294,9 @@ export default function Page() {
|
|||||||
</Button>
|
</Button>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
|
<PaidFeaturesAlert tiers={tierMatrix.orgOidc} />
|
||||||
|
|
||||||
|
<fieldset disabled={disabled} className={disabled ? "opacity-50 pointer-events-none" : ""}>
|
||||||
<SettingsContainer>
|
<SettingsContainer>
|
||||||
<SettingsSection>
|
<SettingsSection>
|
||||||
<SettingsSectionHeader>
|
<SettingsSectionHeader>
|
||||||
@@ -812,9 +817,10 @@ export default function Page() {
|
|||||||
</Button>
|
</Button>
|
||||||
<Button
|
<Button
|
||||||
type="submit"
|
type="submit"
|
||||||
disabled={createLoading || !isPaidUser(tierMatrix.orgOidc)}
|
disabled={createLoading || disabled}
|
||||||
loading={createLoading}
|
loading={createLoading}
|
||||||
onClick={() => {
|
onClick={() => {
|
||||||
|
if (disabled) return;
|
||||||
// log any issues with the form
|
// log any issues with the form
|
||||||
console.log(form.formState.errors);
|
console.log(form.formState.errors);
|
||||||
form.handleSubmit(onSubmit)();
|
form.handleSubmit(onSubmit)();
|
||||||
@@ -823,6 +829,7 @@ export default function Page() {
|
|||||||
{t("idpSubmit")}
|
{t("idpSubmit")}
|
||||||
</Button>
|
</Button>
|
||||||
</div>
|
</div>
|
||||||
|
</fieldset>
|
||||||
</>
|
</>
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -493,7 +493,8 @@ export default function GeneralPage() {
|
|||||||
{
|
{
|
||||||
value: "whitelistedEmail",
|
value: "whitelistedEmail",
|
||||||
label: "Whitelisted Email"
|
label: "Whitelisted Email"
|
||||||
}
|
},
|
||||||
|
{ value: "ssh", label: "SSH" }
|
||||||
]}
|
]}
|
||||||
selectedValue={filters.type}
|
selectedValue={filters.type}
|
||||||
onValueChange={(value) =>
|
onValueChange={(value) =>
|
||||||
@@ -507,13 +508,12 @@ export default function GeneralPage() {
|
|||||||
);
|
);
|
||||||
},
|
},
|
||||||
cell: ({ row }) => {
|
cell: ({ row }) => {
|
||||||
// should be capitalized first letter
|
const typeLabel =
|
||||||
return (
|
row.original.type === "ssh"
|
||||||
<span>
|
? "SSH"
|
||||||
{row.original.type.charAt(0).toUpperCase() +
|
: row.original.type.charAt(0).toUpperCase() +
|
||||||
row.original.type.slice(1) || "-"}
|
row.original.type.slice(1);
|
||||||
</span>
|
return <span>{typeLabel || "-"}</span>;
|
||||||
);
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -29,6 +29,7 @@ import { usePathname, useRouter } from "next/navigation";
|
|||||||
import { useMemo, useState } from "react";
|
import { useMemo, useState } from "react";
|
||||||
import { useUserContext } from "@app/hooks/useUserContext";
|
import { useUserContext } from "@app/hooks/useUserContext";
|
||||||
import { useTranslations } from "next-intl";
|
import { useTranslations } from "next-intl";
|
||||||
|
import { build } from "@server/build";
|
||||||
|
|
||||||
interface OrgSelectorProps {
|
interface OrgSelectorProps {
|
||||||
orgId?: string;
|
orgId?: string;
|
||||||
@@ -50,6 +51,11 @@ export function OrgSelector({
|
|||||||
|
|
||||||
const selectedOrg = orgs?.find((org) => org.orgId === orgId);
|
const selectedOrg = orgs?.find((org) => org.orgId === orgId);
|
||||||
|
|
||||||
|
let canCreateOrg = !env.flags.disableUserCreateOrg || user.serverAdmin;
|
||||||
|
if (build === "saas" && user.type !== "internal") {
|
||||||
|
canCreateOrg = false;
|
||||||
|
}
|
||||||
|
|
||||||
const sortedOrgs = useMemo(() => {
|
const sortedOrgs = useMemo(() => {
|
||||||
if (!orgs?.length) return orgs ?? [];
|
if (!orgs?.length) return orgs ?? [];
|
||||||
return [...orgs].sort((a, b) => {
|
return [...orgs].sort((a, b) => {
|
||||||
@@ -161,7 +167,7 @@ export function OrgSelector({
|
|||||||
</CommandGroup>
|
</CommandGroup>
|
||||||
</CommandList>
|
</CommandList>
|
||||||
</Command>
|
</Command>
|
||||||
{(!env.flags.disableUserCreateOrg || user.serverAdmin) && (
|
{canCreateOrg && (
|
||||||
<div className="p-2 border-t border-border">
|
<div className="p-2 border-t border-border">
|
||||||
<Button
|
<Button
|
||||||
variant="ghost"
|
variant="ghost"
|
||||||
|
|||||||
Reference in New Issue
Block a user