Merge branch 'dev' into msg-opt

This commit is contained in:
Owen
2026-03-23 16:00:50 -07:00
203 changed files with 4955 additions and 4932 deletions

View File

@@ -19,6 +19,7 @@ export enum ActionsEnum {
getSite = "getSite",
listSites = "listSites",
updateSite = "updateSite",
resetSiteBandwidth = "resetSiteBandwidth",
reGenerateSecret = "reGenerateSecret",
createResource = "createResource",
deleteResource = "deleteResource",

View File

@@ -1,6 +1,10 @@
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
import { cleanup as wsCleanup } from "#dynamic/routers/ws";
async function cleanup() {
await flushBandwidthToDb();
await flushSiteBandwidthToDb();
await wsCleanup();
process.exit(0);
@@ -10,4 +14,4 @@ export async function initCleanup() {
// Handle process termination
process.on("SIGTERM", () => cleanup());
process.on("SIGINT", () => cleanup());
}
}

View File

@@ -328,6 +328,14 @@ export const approvals = pgTable("approvals", {
.notNull()
});
export const bannedEmails = pgTable("bannedEmails", {
email: varchar("email", { length: 255 }).primaryKey(),
});
export const bannedIps = pgTable("bannedIps", {
ip: varchar("ip", { length: 255 }).primaryKey(),
});
export type Approval = InferSelectModel<typeof approvals>;
export type Limit = InferSelectModel<typeof limits>;
export type Account = InferSelectModel<typeof account>;

View File

@@ -22,7 +22,8 @@ export const domains = pgTable("domains", {
tries: integer("tries").notNull().default(0),
certResolver: varchar("certResolver"),
customCertResolver: varchar("customCertResolver"),
preferWildcardCert: boolean("preferWildcardCert")
preferWildcardCert: boolean("preferWildcardCert"),
errorMessage: text("errorMessage")
});
export const dnsRecords = pgTable("dnsRecords", {
@@ -88,6 +89,7 @@ export const sites = pgTable("sites", {
lastBandwidthUpdate: varchar("lastBandwidthUpdate"),
type: varchar("type").notNull(), // "newt" or "wireguard"
online: boolean("online").notNull().default(false),
lastPing: integer("lastPing"),
address: varchar("address"),
endpoint: varchar("endpoint"),
publicKey: varchar("publicKey"),
@@ -283,6 +285,7 @@ export const users = pgTable("user", {
dateCreated: varchar("dateCreated").notNull(),
termsAcceptedTimestamp: varchar("termsAcceptedTimestamp"),
termsVersion: varchar("termsVersion"),
marketingEmailConsent: boolean("marketingEmailConsent").default(false),
serverAdmin: boolean("serverAdmin").notNull().default(false),
lastPasswordChange: bigint("lastPasswordChange", { mode: "number" })
});
@@ -719,6 +722,7 @@ export const clientSitesAssociationsCache = pgTable(
.notNull(),
siteId: integer("siteId").notNull(),
isRelayed: boolean("isRelayed").notNull().default(false),
isJitMode: boolean("isJitMode").notNull().default(false),
endpoint: varchar("endpoint"),
publicKey: varchar("publicKey") // this will act as the session's public key for hole punching so we can track when it changes
}

View File

@@ -318,6 +318,15 @@ export const approvals = sqliteTable("approvals", {
.notNull()
});
export const bannedEmails = sqliteTable("bannedEmails", {
email: text("email").primaryKey()
});
export const bannedIps = sqliteTable("bannedIps", {
ip: text("ip").primaryKey()
});
export type Approval = InferSelectModel<typeof approvals>;
export type Limit = InferSelectModel<typeof limits>;
export type Account = InferSelectModel<typeof account>;

View File

@@ -13,7 +13,8 @@ export const domains = sqliteTable("domains", {
failed: integer("failed", { mode: "boolean" }).notNull().default(false),
tries: integer("tries").notNull().default(0),
certResolver: text("certResolver"),
preferWildcardCert: integer("preferWildcardCert", { mode: "boolean" })
preferWildcardCert: integer("preferWildcardCert", { mode: "boolean" }),
errorMessage: text("errorMessage")
});
export const dnsRecords = sqliteTable("dnsRecords", {
@@ -89,6 +90,7 @@ export const sites = sqliteTable("sites", {
lastBandwidthUpdate: text("lastBandwidthUpdate"),
type: text("type").notNull(), // "newt" or "wireguard"
online: integer("online", { mode: "boolean" }).notNull().default(false),
lastPing: integer("lastPing"),
// exit node stuff that is how to connect to the site when it has a wg server
address: text("address"), // this is the address of the wireguard interface in newt
@@ -314,6 +316,9 @@ export const users = sqliteTable("user", {
dateCreated: text("dateCreated").notNull(),
termsAcceptedTimestamp: text("termsAcceptedTimestamp"),
termsVersion: text("termsVersion"),
marketingEmailConsent: integer("marketingEmailConsent", {
mode: "boolean"
}).default(false),
serverAdmin: integer("serverAdmin", { mode: "boolean" })
.notNull()
.default(false),
@@ -406,6 +411,9 @@ export const clientSitesAssociationsCache = sqliteTable(
isRelayed: integer("isRelayed", { mode: "boolean" })
.notNull()
.default(false),
isJitMode: integer("isJitMode", { mode: "boolean" })
.notNull()
.default(false),
endpoint: text("endpoint"),
publicKey: text("publicKey") // this will act as the session's public key for hole punching so we can track when it changes
}

View File

@@ -17,6 +17,7 @@ import fs from "fs";
import path from "path";
import { APP_PATH } from "./lib/consts";
import yaml from "js-yaml";
import { z } from "zod";
const dev = process.env.ENVIRONMENT !== "prod";
const externalPort = config.getRawConfig().server.integration_port;
@@ -38,12 +39,24 @@ export function createIntegrationApiServer() {
apiServer.use(cookieParser());
apiServer.use(express.json());
const openApiDocumentation = getOpenApiDocumentation();
apiServer.use(
"/v1/docs",
swaggerUi.serve,
swaggerUi.setup(getOpenApiDocumentation())
swaggerUi.setup(openApiDocumentation)
);
// Unauthenticated OpenAPI spec endpoints
apiServer.get("/v1/openapi.json", (_req, res) => {
res.json(openApiDocumentation);
});
apiServer.get("/v1/openapi.yaml", (_req, res) => {
const yamlOutput = yaml.dump(openApiDocumentation);
res.type("application/yaml").send(yamlOutput);
});
// API routes
const prefix = `/v1`;
apiServer.use(logIncomingMiddleware);
@@ -75,16 +88,6 @@ function getOpenApiDocumentation() {
}
);
for (const def of registry.definitions) {
if (def.type === "route") {
def.route.security = [
{
[bearerAuth.name]: []
}
];
}
}
registry.registerPath({
method: "get",
path: "/",
@@ -94,6 +97,74 @@ function getOpenApiDocumentation() {
responses: {}
});
registry.registerPath({
method: "get",
path: "/openapi.json",
description: "Get OpenAPI specification as JSON",
tags: [],
request: {},
responses: {
"200": {
description: "OpenAPI specification as JSON",
content: {
"application/json": {
schema: {
type: "object"
}
}
}
}
}
});
registry.registerPath({
method: "get",
path: "/openapi.yaml",
description: "Get OpenAPI specification as YAML",
tags: [],
request: {},
responses: {
"200": {
description: "OpenAPI specification as YAML",
content: {
"application/yaml": {
schema: {
type: "string"
}
}
}
}
}
});
for (const def of registry.definitions) {
if (def.type === "route") {
def.route.security = [
{
[bearerAuth.name]: []
}
];
// Ensure every route has a generic JSON response schema so Swagger UI can render responses
const existingResponses = def.route.responses;
const hasExistingResponses =
existingResponses && Object.keys(existingResponses).length > 0;
if (!hasExistingResponses) {
def.route.responses = {
"*": {
description: "",
content: {
"application/json": {
schema: z.object({})
}
}
}
};
}
}
}
const generator = new OpenApiGeneratorV3(registry.definitions);
const generated = generator.generateDocument({

View File

@@ -107,7 +107,7 @@ export async function applyBlueprint({
[target],
matchingHealthcheck ? [matchingHealthcheck] : [],
result.proxyResource.protocol,
result.proxyResource.proxyPort
site.newt.version
);
}
}

View File

@@ -4,8 +4,12 @@ import { cleanUpOldLogs as cleanUpOldActionLogs } from "#dynamic/middlewares/log
import { cleanUpOldLogs as cleanUpOldRequestLogs } from "@server/routers/badger/logRequestAudit";
import { gt, or } from "drizzle-orm";
import { cleanUpOldFingerprintSnapshots } from "@server/routers/olm/fingerprintingUtils";
import { build } from "@server/build";
export function initLogCleanupInterval() {
if (build == "saas") { // skip log cleanup for saas builds
return null;
}
return setInterval(
async () => {
const orgsToClean = await db

View File

@@ -0,0 +1,20 @@
import semver from "semver";
export function canCompress(
clientVersion: string | null | undefined,
type: "newt" | "olm"
): boolean {
try {
if (!clientVersion) return false;
// check if it is a valid semver
if (!semver.valid(clientVersion)) return false;
if (type === "newt") {
return semver.gte(clientVersion, "1.10.3");
} else if (type === "olm") {
return semver.gte(clientVersion, "1.4.3");
}
return false;
} catch {
return false;
}
}

View File

@@ -85,9 +85,7 @@ export async function deleteOrgById(
deletedNewtIds.push(deletedNewt.newtId);
await trx
.delete(newtSessions)
.where(
eq(newtSessions.newtId, deletedNewt.newtId)
);
.where(eq(newtSessions.newtId, deletedNewt.newtId));
}
}
}
@@ -121,33 +119,38 @@ export async function deleteOrgById(
eq(clientSitesAssociationsCache.clientId, client.clientId)
);
}
await trx.delete(resources).where(eq(resources.orgId, orgId));
const allOrgDomains = await trx
.select()
.from(orgDomains)
.innerJoin(domains, eq(domains.domainId, orgDomains.domainId))
.innerJoin(domains, eq(orgDomains.domainId, domains.domainId))
.where(
and(
eq(orgDomains.orgId, orgId),
eq(domains.configManaged, false)
)
);
logger.info(`Found ${allOrgDomains.length} domains to delete`);
const domainIdsToDelete: string[] = [];
for (const orgDomain of allOrgDomains) {
const domainId = orgDomain.domains.domainId;
const orgCount = await trx
.select({ count: sql<number>`count(*)` })
const [orgCount] = await trx
.select({ count: count() })
.from(orgDomains)
.where(eq(orgDomains.domainId, domainId));
if (orgCount[0].count === 1) {
logger.info(`Found ${orgCount.count} orgs using domain ${domainId}`);
if (orgCount.count === 1) {
domainIdsToDelete.push(domainId);
}
}
logger.info(`Found ${domainIdsToDelete.length} domains to delete`);
if (domainIdsToDelete.length > 0) {
await trx
.delete(domains)
.where(inArray(domains.domainId, domainIdsToDelete));
}
await trx.delete(resources).where(eq(resources.orgId, orgId));
await usageService.add(orgId, FeatureId.ORGINIZATIONS, -1, trx); // here we are decreasing the org count BEFORE deleting the org because we need to still be able to get the org to get the billing org inside of here
@@ -231,15 +234,13 @@ export function sendTerminationMessages(result: DeleteOrgByIdResult): void {
);
}
for (const olmId of result.olmsToTerminate) {
sendTerminateClient(
0,
OlmErrorCodes.TERMINATED_REKEYED,
olmId
).catch((error) => {
logger.error(
"Failed to send termination message to olm:",
error
);
});
sendTerminateClient(0, OlmErrorCodes.TERMINATED_REKEYED, olmId).catch(
(error) => {
logger.error(
"Failed to send termination message to olm:",
error
);
}
);
}
}

View File

@@ -477,6 +477,7 @@ async function handleMessagesForSiteClients(
}
if (isAdd) {
// TODO: if we are in jit mode here should we really be sending this?
await initPeerAddHandshake(
// this will kick off the add peer process for the client
client.clientId,
@@ -571,7 +572,7 @@ export async function updateClientSiteDestinations(
destinations: [
{
destinationIP: site.sites.subnet.split("/")[0],
destinationPort: site.sites.listenPort || 0
destinationPort: site.sites.listenPort || 1 // this satisfies gerbil for now but should be reevaluated
}
]
};
@@ -579,7 +580,7 @@ export async function updateClientSiteDestinations(
// add to the existing destinations
destinations.destinations.push({
destinationIP: site.sites.subnet.split("/")[0],
destinationPort: site.sites.listenPort || 0
destinationPort: site.sites.listenPort || 1 // this satisfies gerbil for now but should be reevaluated
});
}
@@ -666,7 +667,11 @@ async function handleSubnetProxyTargetUpdates(
if (targetToAdd) {
proxyJobs.push(
addSubnetProxyTargets(newt.newtId, [targetToAdd])
addSubnetProxyTargets(
newt.newtId,
[targetToAdd],
newt.version
)
);
}
@@ -699,7 +704,11 @@ async function handleSubnetProxyTargetUpdates(
if (targetToRemove) {
proxyJobs.push(
removeSubnetProxyTargets(newt.newtId, [targetToRemove])
removeSubnetProxyTargets(
newt.newtId,
[targetToRemove],
newt.version
)
);
}
@@ -1074,6 +1083,7 @@ async function handleMessagesForClientSites(
continue;
}
// TODO: if we are in jit mode here should we really be sending this?
await initPeerAddHandshake(
// this will kick off the add peer process for the client
client.clientId,
@@ -1140,7 +1150,7 @@ async function handleMessagesForClientResources(
// Add subnet proxy targets for each site
for (const [siteId, resources] of addedBySite.entries()) {
const [newt] = await trx
.select({ newtId: newts.newtId })
.select({ newtId: newts.newtId, version: newts.version })
.from(newts)
.where(eq(newts.siteId, siteId))
.limit(1);
@@ -1162,7 +1172,13 @@ async function handleMessagesForClientResources(
]);
if (target) {
proxyJobs.push(addSubnetProxyTargets(newt.newtId, [target]));
proxyJobs.push(
addSubnetProxyTargets(
newt.newtId,
[target],
newt.version
)
);
}
try {
@@ -1211,7 +1227,7 @@ async function handleMessagesForClientResources(
// Remove subnet proxy targets for each site
for (const [siteId, resources] of removedBySite.entries()) {
const [newt] = await trx
.select({ newtId: newts.newtId })
.select({ newtId: newts.newtId, version: newts.version })
.from(newts)
.where(eq(newts.siteId, siteId))
.limit(1);
@@ -1234,7 +1250,11 @@ async function handleMessagesForClientResources(
if (target) {
proxyJobs.push(
removeSubnetProxyTargets(newt.newtId, [target])
removeSubnetProxyTargets(
newt.newtId,
[target],
newt.version
)
);
}

View File

@@ -1,16 +0,0 @@
export enum AudienceIds {
SignUps = "",
Subscribed = "",
Churned = "",
Newsletter = ""
}
let resend;
export default resend;
export async function moveEmailToAudience(
email: string,
audienceId: AudienceIds
) {
return;
}

40
server/lib/sanitize.ts Normal file
View File

@@ -0,0 +1,40 @@
/**
* Sanitize a string field before inserting into a database TEXT column.
*
* Two passes are applied:
*
* 1. Lone UTF-16 surrogates JavaScript strings can hold unpaired surrogates
* (e.g. \uD800 without a following \uDC00-\uDFFF codepoint). These are
* valid in JS but cannot be encoded as UTF-8, triggering
* `report_invalid_encoding` in SQLite / Postgres. They are replaced with
* the Unicode replacement character U+FFFD so the data is preserved as a
* visible signal that something was malformed.
*
* 2. Null bytes and C0 control characters SQLite stores TEXT as
* null-terminated C strings, so \x00 in a value causes
* `report_invalid_encoding`. Bots and scanners routinely inject null bytes
* into URLs (e.g. `/path\u0000.jpg`). All C0 control characters in the
* range \x00-\x1F are stripped except for the three that are legitimate in
* text payloads: HT (\x09), LF (\x0A), and CR (\x0D). DEL (\x7F) is also
* stripped.
*/
export function sanitizeString(value: string): string;
export function sanitizeString(
value: string | null | undefined
): string | undefined;
export function sanitizeString(
value: string | null | undefined
): string | undefined {
if (value == null) return undefined;
return (
value
// Replace lone high surrogates (not followed by a low surrogate)
// and lone low surrogates (not preceded by a high surrogate).
.replace(
/[\uD800-\uDBFF](?![\uDC00-\uDFFF])|(?<![\uD800-\uDBFF])[\uDC00-\uDFFF]/g,
"\uFFFD"
)
// Strip null bytes, C0 control chars (except HT/LF/CR), and DEL.
.replace(/[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]/g, "")
);
}

View File

@@ -218,10 +218,11 @@ export class TraefikConfigManager {
return true;
}
// Fetch if it's been more than 24 hours (for renewals)
const dayInMs = 24 * 60 * 60 * 1000;
const timeSinceLastFetch =
Date.now() - this.lastCertificateFetch.getTime();
// Fetch if it's been more than 24 hours (daily routine check)
if (timeSinceLastFetch > dayInMs) {
logger.info("Fetching certificates due to 24-hour renewal check");
return true;
@@ -265,7 +266,7 @@ export class TraefikConfigManager {
return true;
}
// Check if any local certificates are missing or appear to be outdated
// Check if any local certificates are missing (needs immediate fetch)
for (const domain of domainsNeedingCerts) {
const localState = this.lastLocalCertificateState.get(domain);
if (!localState || !localState.exists) {
@@ -274,17 +275,46 @@ export class TraefikConfigManager {
);
return true;
}
}
// Check if certificate is expiring soon (within 30 days)
if (localState.expiresAt) {
const nowInSeconds = Math.floor(Date.now() / 1000);
const secondsUntilExpiry = localState.expiresAt - nowInSeconds;
const daysUntilExpiry = secondsUntilExpiry / (60 * 60 * 24);
if (daysUntilExpiry < 30) {
logger.info(
`Fetching certificates due to upcoming expiry for ${domain} (${Math.round(daysUntilExpiry)} days remaining)`
);
return true;
// For expiry checks, throttle to every 6 hours to avoid querying the
// API/DB on every monitor loop. The certificate-service renews certs
// 45 days before expiry, so checking every 6 hours is plenty frequent
// to pick up renewed certs promptly.
const renewalCheckIntervalMs = 6 * 60 * 60 * 1000; // 6 hours
if (timeSinceLastFetch > renewalCheckIntervalMs) {
// Check non-wildcard certs for expiry (within 45 days to match
// the server-side renewal window in certificate-service)
for (const domain of domainsNeedingCerts) {
const localState = this.lastLocalCertificateState.get(domain);
if (localState?.expiresAt) {
const nowInSeconds = Math.floor(Date.now() / 1000);
const secondsUntilExpiry =
localState.expiresAt - nowInSeconds;
const daysUntilExpiry = secondsUntilExpiry / (60 * 60 * 24);
if (daysUntilExpiry < 45) {
logger.info(
`Fetching certificates due to upcoming expiry for ${domain} (${Math.round(daysUntilExpiry)} days remaining)`
);
return true;
}
}
}
// Also check wildcard certificates for expiry. These are not
// included in domainsNeedingCerts since their subdomains are
// filtered out, so we must check them separately.
for (const [certDomain, state] of this.lastLocalCertificateState) {
if (state.exists && state.wildcard && state.expiresAt) {
const nowInSeconds = Math.floor(Date.now() / 1000);
const secondsUntilExpiry = state.expiresAt - nowInSeconds;
const daysUntilExpiry = secondsUntilExpiry / (60 * 60 * 24);
if (daysUntilExpiry < 45) {
logger.info(
`Fetching certificates due to upcoming expiry for wildcard cert ${certDomain} (${Math.round(daysUntilExpiry)} days remaining)`
);
return true;
}
}
}
}
@@ -361,6 +391,26 @@ export class TraefikConfigManager {
}
}
// Also include wildcard cert base domains that are
// expiring or expired so they get re-fetched even though
// their subdomains were filtered out above.
for (const [certDomain, state] of this
.lastLocalCertificateState) {
if (state.exists && state.wildcard && state.expiresAt) {
const nowInSeconds = Math.floor(Date.now() / 1000);
const secondsUntilExpiry =
state.expiresAt - nowInSeconds;
const daysUntilExpiry =
secondsUntilExpiry / (60 * 60 * 24);
if (daysUntilExpiry < 45) {
domainsToFetch.add(certDomain);
logger.info(
`Including expiring wildcard cert domain ${certDomain} in fetch (${Math.round(daysUntilExpiry)} days remaining)`
);
}
}
}
if (domainsToFetch.size > 0) {
// Get valid certificates for domains not covered by wildcards
validCertificates =
@@ -507,11 +557,18 @@ export class TraefikConfigManager {
config.getRawConfig().server
.session_cookie_name,
// deprecated
accessTokenQueryParam:
config.getRawConfig().server
.resource_access_token_param,
accessTokenIdHeader:
config.getRawConfig().server
.resource_access_token_headers.id,
accessTokenHeader:
config.getRawConfig().server
.resource_access_token_headers.token,
resourceSessionRequestParam:
config.getRawConfig().server
.resource_session_request_param

View File

@@ -14,7 +14,7 @@ import logger from "@server/logger";
import config from "@server/lib/config";
import { resources, sites, Target, targets } from "@server/db";
import createPathRewriteMiddleware from "./middleware";
import { sanitize, validatePathRewriteConfig } from "./utils";
import { sanitize, encodePath, validatePathRewriteConfig } from "./utils";
const redirectHttpsMiddlewareName = "redirect-to-https";
const badgerMiddlewareName = "badger";
@@ -44,7 +44,7 @@ export async function getTraefikConfig(
filterOutNamespaceDomains = false, // UNUSED BUT USED IN PRIVATE
generateLoginPageRouters = false, // UNUSED BUT USED IN PRIVATE
allowRawResources = true,
allowMaintenancePage = true, // UNUSED BUT USED IN PRIVATE
allowMaintenancePage = true // UNUSED BUT USED IN PRIVATE
): Promise<any> {
// Get resources with their targets and sites in a single optimized query
// Start from sites on this exit node, then join to targets and resources
@@ -127,7 +127,7 @@ export async function getTraefikConfig(
resourcesWithTargetsAndSites.forEach((row) => {
const resourceId = row.resourceId;
const resourceName = sanitize(row.resourceName) || "";
const targetPath = sanitize(row.path) || ""; // Handle null/undefined paths
const targetPath = encodePath(row.path); // Use encodePath to avoid collisions (e.g. "/a/b" vs "/a-b")
const pathMatchType = row.pathMatchType || "";
const rewritePath = row.rewritePath || "";
const rewritePathType = row.rewritePathType || "";
@@ -145,7 +145,7 @@ export async function getTraefikConfig(
const mapKey = [resourceId, pathKey].filter(Boolean).join("-");
const key = sanitize(mapKey);
if (!resourcesMap.has(key)) {
if (!resourcesMap.has(mapKey)) {
const validation = validatePathRewriteConfig(
row.path,
row.pathMatchType,
@@ -160,9 +160,10 @@ export async function getTraefikConfig(
return;
}
resourcesMap.set(key, {
resourcesMap.set(mapKey, {
resourceId: row.resourceId,
name: resourceName,
key: key,
fullDomain: row.fullDomain,
ssl: row.ssl,
http: row.http,
@@ -190,7 +191,7 @@ export async function getTraefikConfig(
});
}
resourcesMap.get(key).targets.push({
resourcesMap.get(mapKey).targets.push({
resourceId: row.resourceId,
targetId: row.targetId,
ip: row.ip,
@@ -227,8 +228,9 @@ export async function getTraefikConfig(
};
// get the key and the resource
for (const [key, resource] of resourcesMap.entries()) {
for (const [, resource] of resourcesMap.entries()) {
const targets = resource.targets as TargetWithSite[];
const key = resource.key;
const routerName = `${key}-${resource.name}-router`;
const serviceName = `${key}-${resource.name}-service`;

View File

@@ -0,0 +1,323 @@
import { assertEquals } from "../../../test/assert";
// ── Pure function copies (inlined to avoid pulling in server dependencies) ──
function sanitize(input: string | null | undefined): string | undefined {
if (!input) return undefined;
if (input.length > 50) {
input = input.substring(0, 50);
}
return input
.replace(/[^a-zA-Z0-9-]/g, "-")
.replace(/-+/g, "-")
.replace(/^-|-$/g, "");
}
function encodePath(path: string | null | undefined): string {
if (!path) return "";
return path.replace(/[^a-zA-Z0-9]/g, (ch) => {
return ch.charCodeAt(0).toString(16);
});
}
// ── Helpers ──────────────────────────────────────────────────────────
/**
* Exact replica of the OLD key computation from upstream main.
* Uses sanitize() for paths — this is what had the collision bug.
*/
function oldKeyComputation(
resourceId: number,
path: string | null,
pathMatchType: string | null,
rewritePath: string | null,
rewritePathType: string | null
): string {
const targetPath = sanitize(path) || "";
const pmt = pathMatchType || "";
const rp = rewritePath || "";
const rpt = rewritePathType || "";
const pathKey = [targetPath, pmt, rp, rpt].filter(Boolean).join("-");
const mapKey = [resourceId, pathKey].filter(Boolean).join("-");
return sanitize(mapKey) || "";
}
/**
* Replica of the NEW key computation from our fix.
* Uses encodePath() for paths — collision-free.
*/
function newKeyComputation(
resourceId: number,
path: string | null,
pathMatchType: string | null,
rewritePath: string | null,
rewritePathType: string | null
): string {
const targetPath = encodePath(path);
const pmt = pathMatchType || "";
const rp = rewritePath || "";
const rpt = rewritePathType || "";
const pathKey = [targetPath, pmt, rp, rpt].filter(Boolean).join("-");
const mapKey = [resourceId, pathKey].filter(Boolean).join("-");
return sanitize(mapKey) || "";
}
// ── Tests ────────────────────────────────────────────────────────────
function runTests() {
console.log("Running path encoding tests...\n");
let passed = 0;
// ── encodePath unit tests ────────────────────────────────────────
// Test 1: null/undefined/empty
{
assertEquals(encodePath(null), "", "null should return empty");
assertEquals(
encodePath(undefined),
"",
"undefined should return empty"
);
assertEquals(encodePath(""), "", "empty string should return empty");
console.log(" PASS: encodePath handles null/undefined/empty");
passed++;
}
// Test 2: root path
{
assertEquals(encodePath("/"), "2f", "/ should encode to 2f");
console.log(" PASS: encodePath encodes root path");
passed++;
}
// Test 3: alphanumeric passthrough
{
assertEquals(encodePath("/api"), "2fapi", "/api encodes slash only");
assertEquals(encodePath("/v1"), "2fv1", "/v1 encodes slash only");
assertEquals(encodePath("abc"), "abc", "plain alpha passes through");
console.log(" PASS: encodePath preserves alphanumeric chars");
passed++;
}
// Test 4: all special chars produce unique hex
{
const paths = ["/a/b", "/a-b", "/a.b", "/a_b", "/a b"];
const results = paths.map((p) => encodePath(p));
const unique = new Set(results);
assertEquals(
unique.size,
paths.length,
"all special-char paths must produce unique encodings"
);
console.log(
" PASS: encodePath produces unique output for different special chars"
);
passed++;
}
// Test 5: output is always alphanumeric (safe for Traefik names)
{
const paths = [
"/",
"/api",
"/a/b",
"/a-b",
"/a.b",
"/complex/path/here"
];
for (const p of paths) {
const e = encodePath(p);
assertEquals(
/^[a-zA-Z0-9]+$/.test(e),
true,
`encodePath("${p}") = "${e}" must be alphanumeric`
);
}
console.log(" PASS: encodePath output is always alphanumeric");
passed++;
}
// Test 6: deterministic
{
assertEquals(
encodePath("/api"),
encodePath("/api"),
"same input same output"
);
assertEquals(
encodePath("/a/b/c"),
encodePath("/a/b/c"),
"same input same output"
);
console.log(" PASS: encodePath is deterministic");
passed++;
}
// Test 7: many distinct paths never collide
{
const paths = [
"/",
"/api",
"/api/v1",
"/api/v2",
"/a/b",
"/a-b",
"/a.b",
"/a_b",
"/health",
"/health/check",
"/admin",
"/admin/users",
"/api/v1/users",
"/api/v1/posts",
"/app",
"/app/dashboard"
];
const encoded = new Set(paths.map((p) => encodePath(p)));
assertEquals(
encoded.size,
paths.length,
`expected ${paths.length} unique encodings, got ${encoded.size}`
);
console.log(" PASS: 16 realistic paths all produce unique encodings");
passed++;
}
// ── Collision fix: the actual bug we're fixing ───────────────────
// Test 8: /a/b and /a-b now have different keys (THE BUG FIX)
{
const keyAB = newKeyComputation(1, "/a/b", "prefix", null, null);
const keyDash = newKeyComputation(1, "/a-b", "prefix", null, null);
assertEquals(
keyAB !== keyDash,
true,
"/a/b and /a-b MUST have different keys"
);
console.log(" PASS: collision fix — /a/b vs /a-b have different keys");
passed++;
}
// Test 9: demonstrate the old bug — old code maps /a/b and /a-b to same key
{
const oldKeyAB = oldKeyComputation(1, "/a/b", "prefix", null, null);
const oldKeyDash = oldKeyComputation(1, "/a-b", "prefix", null, null);
assertEquals(
oldKeyAB,
oldKeyDash,
"old code MUST have this collision (confirms the bug exists)"
);
console.log(" PASS: confirmed old code bug — /a/b and /a-b collided");
passed++;
}
// Test 10: /api/v1 and /api-v1 — old code collision, new code fixes it
{
const oldKey1 = oldKeyComputation(1, "/api/v1", "prefix", null, null);
const oldKey2 = oldKeyComputation(1, "/api-v1", "prefix", null, null);
assertEquals(
oldKey1,
oldKey2,
"old code collision for /api/v1 vs /api-v1"
);
const newKey1 = newKeyComputation(1, "/api/v1", "prefix", null, null);
const newKey2 = newKeyComputation(1, "/api-v1", "prefix", null, null);
assertEquals(
newKey1 !== newKey2,
true,
"new code must separate /api/v1 and /api-v1"
);
console.log(" PASS: collision fix — /api/v1 vs /api-v1");
passed++;
}
// Test 11: /app.v2 and /app/v2 and /app-v2 — three-way collision fixed
{
const a = newKeyComputation(1, "/app.v2", "prefix", null, null);
const b = newKeyComputation(1, "/app/v2", "prefix", null, null);
const c = newKeyComputation(1, "/app-v2", "prefix", null, null);
const keys = new Set([a, b, c]);
assertEquals(
keys.size,
3,
"three paths must produce three unique keys"
);
console.log(
" PASS: collision fix — three-way /app.v2, /app/v2, /app-v2"
);
passed++;
}
// ── Edge cases ───────────────────────────────────────────────────
// Test 12: same path in different resources — always separate
{
const key1 = newKeyComputation(1, "/api", "prefix", null, null);
const key2 = newKeyComputation(2, "/api", "prefix", null, null);
assertEquals(
key1 !== key2,
true,
"different resources with same path must have different keys"
);
console.log(" PASS: edge case — same path, different resources");
passed++;
}
// Test 13: same resource, different pathMatchType — separate keys
{
const exact = newKeyComputation(1, "/api", "exact", null, null);
const prefix = newKeyComputation(1, "/api", "prefix", null, null);
assertEquals(
exact !== prefix,
true,
"exact vs prefix must have different keys"
);
console.log(" PASS: edge case — same path, different match types");
passed++;
}
// Test 14: same resource and path, different rewrite config — separate keys
{
const noRewrite = newKeyComputation(1, "/api", "prefix", null, null);
const withRewrite = newKeyComputation(
1,
"/api",
"prefix",
"/backend",
"prefix"
);
assertEquals(
noRewrite !== withRewrite,
true,
"with vs without rewrite must have different keys"
);
console.log(" PASS: edge case — same path, different rewrite config");
passed++;
}
// Test 15: paths with special URL characters
{
const paths = ["/api?foo", "/api#bar", "/api%20baz", "/api+qux"];
const keys = new Set(
paths.map((p) => newKeyComputation(1, p, "prefix", null, null))
);
assertEquals(
keys.size,
paths.length,
"special URL chars must produce unique keys"
);
console.log(" PASS: edge case — special URL characters in paths");
passed++;
}
console.log(`\nAll ${passed} tests passed!`);
}
try {
runTests();
} catch (error) {
console.error("Test failed:", error);
process.exit(1);
}

View File

@@ -13,6 +13,26 @@ export function sanitize(input: string | null | undefined): string | undefined {
.replace(/^-|-$/g, "");
}
/**
* Encode a URL path into a collision-free alphanumeric string suitable for use
* in Traefik map keys.
*
* Unlike sanitize(), this preserves uniqueness by encoding each non-alphanumeric
* character as its hex code. Different paths always produce different outputs.
*
* encodePath("/api") => "2fapi"
* encodePath("/a/b") => "2fa2fb"
* encodePath("/a-b") => "2fa2db" (different from /a/b)
* encodePath("/") => "2f"
* encodePath(null) => ""
*/
export function encodePath(path: string | null | undefined): string {
if (!path) return "";
return path.replace(/[^a-zA-Z0-9]/g, (ch) => {
return ch.charCodeAt(0).toString(16);
});
}
export function validatePathRewriteConfig(
path: string | null,
pathMatchType: string | null,

View File

@@ -5,17 +5,20 @@ export const registry = new OpenAPIRegistry();
export enum OpenAPITags {
Site = "Site",
Org = "Organization",
Resource = "Resource",
PublicResource = "Public Resource",
PrivateResource = "Private Resource",
Role = "Role",
User = "User",
Invitation = "Invitation",
Target = "Target",
Invitation = "User Invitation",
Target = "Resource Target",
Rule = "Rule",
AccessToken = "Access Token",
Idp = "Identity Provider",
GlobalIdp = "Identity Provider (Global)",
OrgIdp = "Identity Provider (Organization Only)",
Client = "Client",
ApiKey = "API Key",
Domain = "Domain",
Blueprint = "Blueprint",
Ssh = "SSH"
Ssh = "SSH",
Logs = "Logs"
}

View File

@@ -13,8 +13,12 @@
import { rateLimitService } from "#private/lib/rateLimit";
import { cleanup as wsCleanup } from "#private/routers/ws";
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
async function cleanup() {
await flushBandwidthToDb();
await flushSiteBandwidthToDb();
await rateLimitService.cleanup();
await wsCleanup();
@@ -25,4 +29,4 @@ export async function initCleanup() {
// Handle process termination
process.on("SIGTERM", () => cleanup());
process.on("SIGINT", () => cleanup());
}
}

View File

@@ -24,23 +24,31 @@ setInterval(() => {
*/
class AdaptiveCache {
private useRedis(): boolean {
return redisManager.isRedisEnabled() && redisManager.getHealthStatus().isHealthy;
return (
redisManager.isRedisEnabled() &&
redisManager.getHealthStatus().isHealthy
);
}
/**
* Set a value in the cache
* @param key - Cache key
* @param value - Value to cache (will be JSON stringified for Redis)
* @param ttl - Time to live in seconds (0 = no expiration)
* @param ttl - Time to live in seconds (0 = no expiration; omit = 3600s for Redis)
* @returns boolean indicating success
*/
async set(key: string, value: any, ttl?: number): Promise<boolean> {
const effectiveTtl = ttl === 0 ? undefined : ttl;
const redisTtl = ttl === 0 ? undefined : (ttl ?? 3600);
if (this.useRedis()) {
try {
const serialized = JSON.stringify(value);
const success = await redisManager.set(key, serialized, effectiveTtl);
const success = await redisManager.set(
key,
serialized,
redisTtl
);
if (success) {
logger.debug(`Set key in Redis: ${key}`);
@@ -48,7 +56,9 @@ class AdaptiveCache {
}
// Redis failed, fall through to local cache
logger.debug(`Redis set failed for key ${key}, falling back to local cache`);
logger.debug(
`Redis set failed for key ${key}, falling back to local cache`
);
} catch (error) {
logger.error(`Redis set error for key ${key}:`, error);
// Fall through to local cache
@@ -120,9 +130,14 @@ class AdaptiveCache {
}
// Some Redis deletes failed, fall through to local cache
logger.debug(`Some Redis deletes failed, falling back to local cache`);
logger.debug(
`Some Redis deletes failed, falling back to local cache`
);
} catch (error) {
logger.error(`Redis del error for keys ${keys.join(", ")}:`, error);
logger.error(
`Redis del error for keys ${keys.join(", ")}:`,
error
);
// Fall through to local cache
deletedCount = 0;
}
@@ -195,7 +210,9 @@ class AdaptiveCache {
*/
async flushAll(): Promise<void> {
if (this.useRedis()) {
logger.warn("Adaptive cache flushAll called - Redis flush not implemented, only local cache will be flushed");
logger.warn(
"Adaptive cache flushAll called - Redis flush not implemented, only local cache will be flushed"
);
}
localCache.flushAll();
@@ -239,7 +256,9 @@ class AdaptiveCache {
getTtl(key: string): number {
// Note: This only works for local cache, Redis TTL is not supported
if (this.useRedis()) {
logger.warn(`getTtl called for key ${key} but Redis TTL lookup is not implemented`);
logger.warn(
`getTtl called for key ${key} but Redis TTL lookup is not implemented`
);
}
const ttl = localCache.getTtl(key);
@@ -255,7 +274,9 @@ class AdaptiveCache {
*/
keys(): string[] {
if (this.useRedis()) {
logger.warn("keys() called but Redis keys are not included, only local cache keys returned");
logger.warn(
"keys() called but Redis keys are not included, only local cache keys returned"
);
}
return localCache.keys();
}

View File

@@ -38,10 +38,6 @@ export const privateConfigSchema = z.object({
.string()
.optional()
.transform(getEnvOrYaml("SERVER_ENCRYPTION_KEY")),
resend_api_key: z
.string()
.optional()
.transform(getEnvOrYaml("RESEND_API_KEY")),
reo_client_id: z
.string()
.optional()
@@ -61,7 +57,10 @@ export const privateConfigSchema = z.object({
.object({
host: z.string(),
port: portSchema,
password: z.string().optional(),
password: z
.string()
.optional()
.transform(getEnvOrYaml("REDIS_PASSWORD")),
db: z.int().nonnegative().optional().default(0),
replicas: z
.array(

View File

@@ -1,127 +0,0 @@
/*
* This file is part of a proprietary work.
*
* Copyright (c) 2025 Fossorial, Inc.
* All rights reserved.
*
* This file is licensed under the Fossorial Commercial License.
* You may not use this file except in compliance with the License.
* Unauthorized use, copying, modification, or distribution is strictly prohibited.
*
* This file is not licensed under the AGPLv3.
*/
import { Resend } from "resend";
import privateConfig from "#private/lib/config";
import logger from "@server/logger";
export enum AudienceIds {
SignUps = "6c4e77b2-0851-4bd6-bac8-f51f91360f1a",
Subscribed = "870b43fd-387f-44de-8fc1-707335f30b20",
Churned = "f3ae92bd-2fdb-4d77-8746-2118afd62549",
Newsletter = "5500c431-191c-42f0-a5d4-8b6d445b4ea0"
}
const resend = new Resend(
privateConfig.getRawPrivateConfig().server.resend_api_key || "missing"
);
export default resend;
export async function moveEmailToAudience(
email: string,
audienceId: AudienceIds
) {
if (process.env.ENVIRONMENT !== "prod") {
logger.debug(
`Skipping moving email ${email} to audience ${audienceId} in non-prod environment`
);
return;
}
const { error, data } = await retryWithBackoff(async () => {
const { data, error } = await resend.contacts.create({
email,
unsubscribed: false,
audienceId
});
if (error) {
throw new Error(
`Error adding email ${email} to audience ${audienceId}: ${error}`
);
}
return { error, data };
});
if (error) {
logger.error(
`Error adding email ${email} to audience ${audienceId}: ${error}`
);
return;
}
if (data) {
logger.debug(
`Added email ${email} to audience ${audienceId} with contact ID ${data.id}`
);
}
const otherAudiences = Object.values(AudienceIds).filter(
(id) => id !== audienceId
);
for (const otherAudienceId of otherAudiences) {
const { error, data } = await retryWithBackoff(async () => {
const { data, error } = await resend.contacts.remove({
email,
audienceId: otherAudienceId
});
if (error) {
throw new Error(
`Error removing email ${email} from audience ${otherAudienceId}: ${error}`
);
}
return { error, data };
});
if (error) {
logger.error(
`Error removing email ${email} from audience ${otherAudienceId}: ${error}`
);
}
if (data) {
logger.info(
`Removed email ${email} from audience ${otherAudienceId}`
);
}
}
}
type RetryOptions = {
retries?: number;
initialDelayMs?: number;
factor?: number;
};
export async function retryWithBackoff<T>(
fn: () => Promise<T>,
options: RetryOptions = {}
): Promise<T> {
const { retries = 5, initialDelayMs = 500, factor = 2 } = options;
let attempt = 0;
let delay = initialDelayMs;
while (true) {
try {
return await fn();
} catch (err) {
attempt++;
if (attempt > retries) throw err;
await new Promise((resolve) => setTimeout(resolve, delay));
delay *= factor;
}
}
}

View File

@@ -34,7 +34,11 @@ import {
import logger from "@server/logger";
import config from "@server/lib/config";
import { orgs, resources, sites, Target, targets } from "@server/db";
import { sanitize, validatePathRewriteConfig } from "@server/lib/traefik/utils";
import {
sanitize,
encodePath,
validatePathRewriteConfig
} from "@server/lib/traefik/utils";
import privateConfig from "#private/lib/config";
import createPathRewriteMiddleware from "@server/lib/traefik/middleware";
import {
@@ -170,7 +174,7 @@ export async function getTraefikConfig(
resourcesWithTargetsAndSites.forEach((row) => {
const resourceId = row.resourceId;
const resourceName = sanitize(row.resourceName) || "";
const targetPath = sanitize(row.path) || ""; // Handle null/undefined paths
const targetPath = encodePath(row.path); // Use encodePath to avoid collisions (e.g. "/a/b" vs "/a-b")
const pathMatchType = row.pathMatchType || "";
const rewritePath = row.rewritePath || "";
const rewritePathType = row.rewritePathType || "";
@@ -192,7 +196,7 @@ export async function getTraefikConfig(
const mapKey = [resourceId, pathKey].filter(Boolean).join("-");
const key = sanitize(mapKey);
if (!resourcesMap.has(key)) {
if (!resourcesMap.has(mapKey)) {
const validation = validatePathRewriteConfig(
row.path,
row.pathMatchType,
@@ -207,9 +211,10 @@ export async function getTraefikConfig(
return;
}
resourcesMap.set(key, {
resourcesMap.set(mapKey, {
resourceId: row.resourceId,
name: resourceName,
key: key,
fullDomain: row.fullDomain,
ssl: row.ssl,
http: row.http,
@@ -243,7 +248,7 @@ export async function getTraefikConfig(
}
// Add target with its associated site data
resourcesMap.get(key).targets.push({
resourcesMap.get(mapKey).targets.push({
resourceId: row.resourceId,
targetId: row.targetId,
ip: row.ip,
@@ -296,8 +301,9 @@ export async function getTraefikConfig(
};
// get the key and the resource
for (const [key, resource] of resourcesMap.entries()) {
for (const [, resource] of resourcesMap.entries()) {
const targets = resource.targets as TargetWithSite[];
const key = resource.key;
const routerName = `${key}-${resource.name}-router`;
const serviceName = `${key}-${resource.name}-service`;

View File

@@ -32,7 +32,7 @@ registry.registerPath({
method: "get",
path: "/org/{orgId}/logs/access/export",
description: "Export the access audit log for an organization as CSV",
tags: [OpenAPITags.Org],
tags: [OpenAPITags.Logs],
request: {
query: queryAccessAuditLogsQuery,
params: queryAccessAuditLogsParams

View File

@@ -32,7 +32,7 @@ registry.registerPath({
method: "get",
path: "/org/{orgId}/logs/action/export",
description: "Export the action audit log for an organization as CSV",
tags: [OpenAPITags.Org],
tags: [OpenAPITags.Logs],
request: {
query: queryActionAuditLogsQuery,
params: queryActionAuditLogsParams

View File

@@ -249,7 +249,7 @@ registry.registerPath({
method: "get",
path: "/org/{orgId}/logs/access",
description: "Query the access audit log for an organization",
tags: [OpenAPITags.Org],
tags: [OpenAPITags.Logs],
request: {
query: queryAccessAuditLogsQuery,
params: queryAccessAuditLogsParams

View File

@@ -160,7 +160,7 @@ registry.registerPath({
method: "get",
path: "/org/{orgId}/logs/action",
description: "Query the action audit log for an organization",
tags: [OpenAPITags.Org],
tags: [OpenAPITags.Logs],
request: {
query: queryActionAuditLogsQuery,
params: queryActionAuditLogsParams

View File

@@ -31,16 +31,16 @@ const getOrgSchema = z.strictObject({
orgId: z.string()
});
registry.registerPath({
method: "get",
path: "/org/{orgId}/billing/usage",
description: "Get an organization's billing usage",
tags: [OpenAPITags.Org],
request: {
params: getOrgSchema
},
responses: {}
});
// registry.registerPath({
// method: "get",
// path: "/org/{orgId}/billing/usage",
// description: "Get an organization's billing usage",
// tags: [OpenAPITags.Org],
// request: {
// params: getOrgSchema
// },
// responses: {}
// });
export async function getOrgUsage(
req: Request,

View File

@@ -24,7 +24,6 @@ import { eq, and } from "drizzle-orm";
import logger from "@server/logger";
import stripe from "#private/lib/stripe";
import { handleSubscriptionLifesycle } from "../subscriptionLifecycle";
import { AudienceIds, moveEmailToAudience } from "#private/lib/resend";
import { getSubType } from "./getSubType";
import privateConfig from "#private/lib/config";
import { getLicensePriceSet, LicenseId } from "@server/lib/billing/licenses";
@@ -172,7 +171,7 @@ export async function handleSubscriptionCreated(
const email = orgUserRes.user.email;
if (email) {
moveEmailToAudience(email, AudienceIds.Subscribed);
// TODO: update user in Sendy
}
}
} else if (type === "license") {

View File

@@ -23,7 +23,6 @@ import {
import { eq, and } from "drizzle-orm";
import logger from "@server/logger";
import { handleSubscriptionLifesycle } from "../subscriptionLifecycle";
import { AudienceIds, moveEmailToAudience } from "#private/lib/resend";
import { getSubType } from "./getSubType";
import stripe from "#private/lib/stripe";
import privateConfig from "#private/lib/config";
@@ -109,7 +108,7 @@ export async function handleSubscriptionDeleted(
const email = orgUserRes.user.email;
if (email) {
moveEmailToAudience(email, AudienceIds.Churned);
// TODO: update user in Sendy
}
}
} else if (type === "license") {

View File

@@ -515,6 +515,6 @@ authenticated.post(
verifyOrgAccess,
verifyLimits,
verifyUserHasAction(ActionsEnum.signSshKey),
logActionAudit(ActionsEnum.signSshKey),
// logActionAudit(ActionsEnum.signSshKey), // it is handled inside of the function below so we can include more metadata
ssh.signSshKey
);

View File

@@ -15,6 +15,7 @@ import { verifySessionRemoteExitNodeMiddleware } from "#private/middlewares/veri
import { Router } from "express";
import {
db,
logsDb,
exitNodes,
Resource,
ResourcePassword,
@@ -81,6 +82,7 @@ import { verifyResourceAccessToken } from "@server/auth/verifyResourceAccessToke
import semver from "semver";
import { maxmindAsnLookup } from "@server/db/maxmindAsn";
import { checkOrgAccessPolicy } from "@server/lib/checkOrgAccessPolicy";
import { sanitizeString } from "@server/lib/sanitize";
// Zod schemas for request validation
const getResourceByDomainParamsSchema = z.strictObject({
@@ -1859,24 +1861,24 @@ hybridRouter.post(
})
.map((logEntry) => ({
timestamp: logEntry.timestamp,
orgId: logEntry.orgId,
actorType: logEntry.actorType,
actor: logEntry.actor,
actorId: logEntry.actorId,
metadata: logEntry.metadata,
orgId: sanitizeString(logEntry.orgId),
actorType: sanitizeString(logEntry.actorType),
actor: sanitizeString(logEntry.actor),
actorId: sanitizeString(logEntry.actorId),
metadata: sanitizeString(logEntry.metadata),
action: logEntry.action,
resourceId: logEntry.resourceId,
reason: logEntry.reason,
location: logEntry.location,
location: sanitizeString(logEntry.location),
// userAgent: data.userAgent, // TODO: add this
// headers: data.body.headers,
// query: data.body.query,
originalRequestURL: logEntry.originalRequestURL,
scheme: logEntry.scheme,
host: logEntry.host,
path: logEntry.path,
method: logEntry.method,
ip: logEntry.ip,
originalRequestURL: sanitizeString(logEntry.originalRequestURL) ?? "",
scheme: sanitizeString(logEntry.scheme) ?? "",
host: sanitizeString(logEntry.host) ?? "",
path: sanitizeString(logEntry.path) ?? "",
method: sanitizeString(logEntry.method) ?? "",
ip: sanitizeString(logEntry.ip),
tls: logEntry.tls
}));
@@ -1884,7 +1886,7 @@ hybridRouter.post(
const batchSize = 100;
for (let i = 0; i < logEntries.length; i += batchSize) {
const batch = logEntries.slice(i, i + batchSize);
await db.insert(requestAuditLog).values(batch);
await logsDb.insert(requestAuditLog).values(batch);
}
return response(res, {

View File

@@ -52,7 +52,7 @@ registry.registerPath({
method: "put",
path: "/org/{orgId}/idp/oidc",
description: "Create an OIDC IdP for a specific organization.",
tags: [OpenAPITags.Idp, OpenAPITags.Org],
tags: [OpenAPITags.OrgIdp],
request: {
params: paramsSchema,
body: {

View File

@@ -35,7 +35,7 @@ registry.registerPath({
method: "delete",
path: "/org/{orgId}/idp/{idpId}",
description: "Delete IDP for a specific organization.",
tags: [OpenAPITags.Idp, OpenAPITags.Org],
tags: [OpenAPITags.OrgIdp],
request: {
params: paramsSchema
},

View File

@@ -50,9 +50,9 @@ async function query(idpId: number, orgId: string) {
registry.registerPath({
method: "get",
path: "/org/:orgId/idp/:idpId",
path: "/org/{orgId}/idp/{idpId}",
description: "Get an IDP by its IDP ID for a specific organization.",
tags: [OpenAPITags.Idp, OpenAPITags.Org],
tags: [OpenAPITags.OrgIdp],
request: {
params: paramsSchema
},

View File

@@ -67,7 +67,7 @@ registry.registerPath({
method: "get",
path: "/org/{orgId}/idp",
description: "List all IDP for a specific organization.",
tags: [OpenAPITags.Idp, OpenAPITags.Org],
tags: [OpenAPITags.OrgIdp],
request: {
query: querySchema,
params: paramsSchema

View File

@@ -59,7 +59,7 @@ registry.registerPath({
method: "post",
path: "/org/{orgId}/idp/{idpId}/oidc",
description: "Update an OIDC IdP for a specific organization.",
tags: [OpenAPITags.Idp, OpenAPITags.Org],
tags: [OpenAPITags.OrgIdp],
request: {
params: paramsSchema,
body: {

View File

@@ -38,7 +38,7 @@ export const startRemoteExitNodeOfflineChecker = (): void => {
);
// Find clients that haven't pinged in the last 2 minutes and mark them as offline
const newlyOfflineNodes = await db
const offlineNodes = await db
.update(exitNodes)
.set({ online: false })
.where(
@@ -53,32 +53,15 @@ export const startRemoteExitNodeOfflineChecker = (): void => {
)
.returning();
// Update the sites to offline if they have not pinged either
const exitNodeIds = newlyOfflineNodes.map(
(node) => node.exitNodeId
);
const sitesOnNode = await db
.select()
.from(sites)
.where(
and(
eq(sites.online, true),
inArray(sites.exitNodeId, exitNodeIds)
)
if (offlineNodes.length > 0) {
logger.info(
`checkRemoteExitNodeOffline: Marked ${offlineNodes.length} remoteExitNode client(s) offline due to inactivity`
);
// loop through the sites and process their lastBandwidthUpdate as an iso string and if its more than 1 minute old then mark the site offline
for (const site of sitesOnNode) {
if (!site.lastBandwidthUpdate) {
continue;
}
const lastBandwidthUpdate = new Date(site.lastBandwidthUpdate);
if (Date.now() - lastBandwidthUpdate.getTime() > 60 * 1000) {
await db
.update(sites)
.set({ online: false })
.where(eq(sites.siteId, site.siteId));
for (const offlineClient of offlineNodes) {
logger.debug(
`checkRemoteExitNodeOffline: Client ${offlineClient.exitNodeId} marked offline (lastPing: ${offlineClient.lastPing})`
);
}
}
} catch (error) {

View File

@@ -52,7 +52,7 @@ registry.registerPath({
method: "get",
path: "/maintenance/info",
description: "Get maintenance information for a resource by domain.",
tags: [OpenAPITags.Resource],
tags: [OpenAPITags.PublicResource],
request: {
query: z.object({
fullDomain: z.string()

View File

@@ -14,7 +14,9 @@
import { Request, Response, NextFunction } from "express";
import { z } from "zod";
import {
actionAuditLog,
db,
logsDb,
newts,
roles,
roundTripMessageTracker,
@@ -29,12 +31,12 @@ import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors";
import logger from "@server/logger";
import { fromError } from "zod-validation-error";
import { OpenAPITags, registry } from "@server/openApi";
import { eq, or, and } from "drizzle-orm";
import { canUserAccessSiteResource } from "@server/auth/canUserAccessSiteResource";
import { signPublicKey, getOrgCAKeys } from "@server/lib/sshCA";
import config from "@server/lib/config";
import { sendToClient } from "#private/routers/ws";
import { ActionsEnum } from "@server/auth/actions";
const paramsSchema = z.strictObject({
orgId: z.string().nonempty()
@@ -64,6 +66,7 @@ export type SignSshKeyResponse = {
sshUsername: string;
sshHost: string;
resourceId: number;
siteId: number;
keyId: string;
validPrincipals: string[];
validAfter: string;
@@ -446,6 +449,20 @@ export async function signSshKey(
sshHost = resource.destination;
}
await logsDb.insert(actionAuditLog).values({
timestamp: Math.floor(Date.now() / 1000),
orgId: orgId,
actorType: "user",
actor: req.user?.username ?? "",
actorId: req.user?.userId ?? "",
action: ActionsEnum.signSshKey,
metadata: JSON.stringify({
resourceId: resource.siteResourceId,
resource: resource.name,
siteId: resource.siteId,
})
});
return response<SignSshKeyResponse>(res, {
data: {
certificate: cert.certificate,
@@ -453,6 +470,7 @@ export async function signSshKey(
sshUsername: usernameToUse,
sshHost: sshHost,
resourceId: resource.siteResourceId,
siteId: resource.siteId,
keyId: cert.keyId,
validPrincipals: cert.validPrincipals,
validAfter: cert.validAfter.toISOString(),

View File

@@ -17,10 +17,13 @@ import {
startRemoteExitNodeOfflineChecker
} from "#private/routers/remoteExitNode";
import { MessageHandler } from "@server/routers/ws";
import { build } from "@server/build";
export const messageHandlers: Record<string, MessageHandler> = {
"remoteExitNode/register": handleRemoteExitNodeRegisterMessage,
"remoteExitNode/ping": handleRemoteExitNodePingMessage
};
startRemoteExitNodeOfflineChecker(); // this is to handle the offline check for remote exit nodes
if (build != "saas") {
startRemoteExitNodeOfflineChecker(); // this is to handle the offline check for remote exit nodes
}

View File

@@ -12,6 +12,7 @@
*/
import { Router, Request, Response } from "express";
import zlib from "zlib";
import { Server as HttpServer } from "http";
import { WebSocket, WebSocketServer } from "ws";
import { Socket } from "net";
@@ -24,7 +25,8 @@ import {
OlmSession,
RemoteExitNode,
RemoteExitNodeSession,
remoteExitNodes
remoteExitNodes,
sites
} from "@server/db";
import { eq } from "drizzle-orm";
import { db } from "@server/db";
@@ -57,11 +59,13 @@ const MAX_PENDING_MESSAGES = 50; // Maximum messages to queue during connection
const processMessage = async (
ws: AuthenticatedWebSocket,
data: Buffer,
isBinary: boolean,
clientId: string,
clientType: ClientType
): Promise<void> => {
try {
const message: WSMessage = JSON.parse(data.toString());
const messageBuffer = isBinary ? zlib.gunzipSync(data) : data;
const message: WSMessage = JSON.parse(messageBuffer.toString());
// logger.debug(
// `Processing message from ${clientType.toUpperCase()} ID: ${clientId}, type: ${message.type}`
@@ -76,7 +80,7 @@ const processMessage = async (
clientId,
message.type, // Pass message type for granular limiting
100, // max requests per window
20, // max requests per message type per window
100, // max requests per message type per window
60 * 1000 // window in milliseconds
);
if (rateLimitResult.isLimited) {
@@ -163,8 +167,16 @@ const processPendingMessages = async (
);
const jobs = [];
for (const messageData of ws.pendingMessages) {
jobs.push(processMessage(ws, messageData, clientId, clientType));
for (const pending of ws.pendingMessages) {
jobs.push(
processMessage(
ws,
pending.data,
pending.isBinary,
clientId,
clientType
)
);
}
await Promise.all(jobs);
@@ -185,6 +197,12 @@ const connectedClients: Map<string, AuthenticatedWebSocket[]> = new Map();
// Config version tracking map (local to this node, resets on server restart)
const clientConfigVersions: Map<string, number> = new Map();
// Tracks the last Unix timestamp (seconds) at which a ping was flushed to the
// DB for a given siteId. Resets on server restart which is fine the first
// ping after startup will always write, re-establishing the online state.
const lastPingDbWrite: Map<number, number> = new Map();
const PING_DB_WRITE_INTERVAL = 45; // seconds
// Recovery tracking
let isRedisRecoveryInProgress = false;
@@ -325,7 +343,9 @@ const addClient = async (
// Check Redis first if enabled
if (redisManager.isRedisEnabled()) {
try {
const redisVersion = await redisManager.get(getConfigVersionKey(clientId));
const redisVersion = await redisManager.get(
getConfigVersionKey(clientId)
);
if (redisVersion !== null) {
configVersion = parseInt(redisVersion, 10);
// Sync to local cache
@@ -337,7 +357,10 @@ const addClient = async (
} else {
// Use local cache version and sync to Redis
configVersion = clientConfigVersions.get(clientId) || 0;
await redisManager.set(getConfigVersionKey(clientId), configVersion.toString());
await redisManager.set(
getConfigVersionKey(clientId),
configVersion.toString()
);
}
} catch (error) {
logger.error("Failed to get/set config version in Redis:", error);
@@ -432,7 +455,9 @@ const removeClient = async (
};
// Helper to get the current config version for a client
const getClientConfigVersion = async (clientId: string): Promise<number | undefined> => {
const getClientConfigVersion = async (
clientId: string
): Promise<number | undefined> => {
// Try Redis first if available
if (redisManager.isRedisEnabled()) {
try {
@@ -502,11 +527,26 @@ const sendToClientLocal = async (
};
const messageString = JSON.stringify(messageWithVersion);
clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(messageString);
}
});
if (options.compress) {
logger.debug(
`Message size before compression: ${messageString.length} bytes`
);
const compressed = zlib.gzipSync(Buffer.from(messageString, "utf8"));
logger.debug(
`Message size after compression: ${compressed.length} bytes`
);
clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(compressed);
}
});
} else {
clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(messageString);
}
});
}
return true;
};
@@ -532,11 +572,22 @@ const broadcastToAllExceptLocal = async (
configVersion
};
clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(messageWithVersion));
}
});
if (options.compress) {
const compressed = zlib.gzipSync(
Buffer.from(JSON.stringify(messageWithVersion), "utf8")
);
clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(compressed);
}
});
} else {
clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(JSON.stringify(messageWithVersion));
}
});
}
}
}
};
@@ -762,7 +813,7 @@ const setupConnection = async (
}
// Set up message handler FIRST to prevent race condition
ws.on("message", async (data) => {
ws.on("message", async (data, isBinary) => {
if (!ws.isFullyConnected) {
// Queue message for later processing with limits
ws.pendingMessages = ws.pendingMessages || [];
@@ -777,11 +828,17 @@ const setupConnection = async (
logger.debug(
`Queueing message from ${clientType.toUpperCase()} ID: ${clientId} (connection not fully established)`
);
ws.pendingMessages.push(data as Buffer);
ws.pendingMessages.push({ data: data as Buffer, isBinary });
return;
}
await processMessage(ws, data as Buffer, clientId, clientType);
await processMessage(
ws,
data as Buffer,
isBinary,
clientId,
clientType
);
});
// Set up other event handlers before async operations
@@ -796,6 +853,35 @@ const setupConnection = async (
);
});
// Handle WebSocket protocol-level pings from older newt clients that do
// not send application-level "newt/ping" messages. Update the site's
// online state and lastPing timestamp so the offline checker treats them
// the same as modern newt clients.
if (clientType === "newt") {
const newtClient = client as Newt;
ws.on("ping", async () => {
if (!newtClient.siteId) return;
const now = Math.floor(Date.now() / 1000);
const lastWrite = lastPingDbWrite.get(newtClient.siteId) ?? 0;
if (now - lastWrite < PING_DB_WRITE_INTERVAL) return;
lastPingDbWrite.set(newtClient.siteId, now);
try {
await db
.update(sites)
.set({
online: true,
lastPing: now
})
.where(eq(sites.siteId, newtClient.siteId));
} catch (error) {
logger.error(
"Error updating newt site online state on WS ping",
{ error }
);
}
});
}
ws.on("error", (error: Error) => {
logger.error(
`WebSocket error for ${clientType.toUpperCase()} ID ${clientId}:`,

View File

@@ -43,7 +43,7 @@ registry.registerPath({
method: "post",
path: "/resource/{resourceId}/access-token",
description: "Generate a new access token for a resource.",
tags: [OpenAPITags.Resource, OpenAPITags.AccessToken],
tags: [OpenAPITags.PublicResource, OpenAPITags.AccessToken],
request: {
params: generateAccssTokenParamsSchema,
body: {

View File

@@ -122,7 +122,7 @@ registry.registerPath({
method: "get",
path: "/org/{orgId}/access-tokens",
description: "List all access tokens in an organization.",
tags: [OpenAPITags.Org, OpenAPITags.AccessToken],
tags: [OpenAPITags.AccessToken],
request: {
params: z.object({
orgId: z.string()
@@ -135,8 +135,8 @@ registry.registerPath({
registry.registerPath({
method: "get",
path: "/resource/{resourceId}/access-tokens",
description: "List all access tokens in an organization.",
tags: [OpenAPITags.Resource, OpenAPITags.AccessToken],
description: "List all access tokens for a resource.",
tags: [OpenAPITags.PublicResource, OpenAPITags.AccessToken],
request: {
params: z.object({
resourceId: z.number()

View File

@@ -37,7 +37,7 @@ registry.registerPath({
method: "put",
path: "/org/{orgId}/api-key",
description: "Create a new API key scoped to the organization.",
tags: [OpenAPITags.Org, OpenAPITags.ApiKey],
tags: [OpenAPITags.ApiKey],
request: {
params: paramsSchema,
body: {

View File

@@ -18,7 +18,7 @@ registry.registerPath({
method: "delete",
path: "/org/{orgId}/api-key/{apiKeyId}",
description: "Delete an API key.",
tags: [OpenAPITags.Org, OpenAPITags.ApiKey],
tags: [OpenAPITags.ApiKey],
request: {
params: paramsSchema
},

View File

@@ -48,7 +48,7 @@ registry.registerPath({
method: "get",
path: "/org/{orgId}/api-key/{apiKeyId}/actions",
description: "List all actions set for an API key.",
tags: [OpenAPITags.Org, OpenAPITags.ApiKey],
tags: [OpenAPITags.ApiKey],
request: {
params: paramsSchema,
query: querySchema

View File

@@ -52,7 +52,7 @@ registry.registerPath({
method: "get",
path: "/org/{orgId}/api-keys",
description: "List all API keys for an organization",
tags: [OpenAPITags.Org, OpenAPITags.ApiKey],
tags: [OpenAPITags.ApiKey],
request: {
params: paramsSchema,
query: querySchema

View File

@@ -25,7 +25,7 @@ registry.registerPath({
path: "/org/{orgId}/api-key/{apiKeyId}/actions",
description:
"Set actions for an API key. This will replace any existing actions.",
tags: [OpenAPITags.Org, OpenAPITags.ApiKey],
tags: [OpenAPITags.ApiKey],
request: {
params: paramsSchema,
body: {

View File

@@ -20,7 +20,7 @@ registry.registerPath({
method: "get",
path: "/org/{orgId}/logs/request",
description: "Query the request audit log for an organization",
tags: [OpenAPITags.Org],
tags: [OpenAPITags.Logs],
request: {
query: queryAccessAuditLogsQuery.omit({
limit: true,

View File

@@ -151,7 +151,7 @@ registry.registerPath({
method: "get",
path: "/org/{orgId}/logs/analytics",
description: "Query the request audit analytics for an organization",
tags: [OpenAPITags.Org],
tags: [OpenAPITags.Logs],
request: {
query: queryAccessAuditLogsQuery,
params: queryRequestAuditLogsParams

View File

@@ -182,7 +182,7 @@ registry.registerPath({
method: "get",
path: "/org/{orgId}/logs/request",
description: "Query the request audit log for an organization",
tags: [OpenAPITags.Org],
tags: [OpenAPITags.Logs],
request: {
query: queryAccessAuditLogsQuery,
params: queryRequestAuditLogsParams

View File

@@ -1,5 +1,5 @@
import { NextFunction, Request, Response } from "express";
import { db, users } from "@server/db";
import { bannedEmails, bannedIps, db, users } from "@server/db";
import HttpCode from "@server/types/HttpCode";
import { email, z } from "zod";
import { fromError } from "zod-validation-error";
@@ -22,7 +22,6 @@ import { checkValidInvite } from "@server/auth/checkValidInvite";
import { passwordSchema } from "@server/auth/passwordSchema";
import { UserType } from "@server/types/UserTypes";
import { build } from "@server/build";
import resend, { AudienceIds, moveEmailToAudience } from "#dynamic/lib/resend";
export const signupBodySchema = z.object({
email: z.email().toLowerCase(),
@@ -66,6 +65,30 @@ export async function signup(
skipVerificationEmail
} = parsedBody.data;
const [bannedEmail] = await db
.select()
.from(bannedEmails)
.where(eq(bannedEmails.email, email))
.limit(1);
if (bannedEmail) {
return next(
createHttpError(HttpCode.FORBIDDEN, "Signup blocked. Do not attempt to continue to use this service.")
);
}
if (req.ip) {
const [bannedIp] = await db
.select()
.from(bannedIps)
.where(eq(bannedIps.ip, req.ip))
.limit(1);
if (bannedIp) {
return next(
createHttpError(HttpCode.FORBIDDEN, "Signup blocked. Do not attempt to continue to use this service.")
);
}
}
const passwordHash = await hashPassword(password);
const userId = generateId(15);
@@ -189,6 +212,7 @@ export async function signup(
dateCreated: moment().toISOString(),
termsAcceptedTimestamp: termsAcceptedTimestamp || null,
termsVersion: "1",
marketingEmailConsent: marketingEmailConsent ?? false,
lastPasswordChange: new Date().getTime()
});
@@ -212,7 +236,7 @@ export async function signup(
logger.debug(
`User ${email} opted in to marketing emails during signup.`
);
moveEmailToAudience(email, AudienceIds.SignUps);
// TODO: update user in Sendy
}
if (config.getRawConfig().flags?.require_email_verification) {

View File

@@ -5,6 +5,8 @@ import cache from "#dynamic/lib/cache";
import { calculateCutoffTimestamp } from "@server/lib/cleanupLogs";
import { stripPortFromHost } from "@server/lib/ip";
import { sanitizeString } from "@server/lib/sanitize";
/**
Reasons:
@@ -253,24 +255,23 @@ export async function logRequestAudit(
// Add to buffer instead of writing directly to DB
auditLogBuffer.push({
timestamp,
orgId: data.orgId,
actorType,
actor,
actorId,
metadata,
orgId: sanitizeString(data.orgId),
actorType: sanitizeString(actorType),
actor: sanitizeString(actor),
actorId: sanitizeString(actorId),
metadata: sanitizeString(metadata),
action: data.action,
resourceId: data.resourceId,
reason: data.reason,
location: data.location,
originalRequestURL: body.originalRequestURL,
scheme: body.scheme,
host: body.host,
path: body.path,
method: body.method,
ip: clientIp,
location: sanitizeString(data.location),
originalRequestURL: sanitizeString(body.originalRequestURL) ?? "",
scheme: sanitizeString(body.scheme) ?? "",
host: sanitizeString(body.host) ?? "",
path: sanitizeString(body.path) ?? "",
method: sanitizeString(body.method) ?? "",
ip: sanitizeString(clientIp),
tls: body.tls
});
// Flush immediately if buffer is full, otherwise schedule a flush
if (auditLogBuffer.length >= BATCH_SIZE) {
// Fire and forget - don't block the caller

View File

@@ -20,7 +20,7 @@ registry.registerPath({
method: "put",
path: "/org/{orgId}/blueprint",
description: "Apply a base64 encoded JSON blueprint to an organization",
tags: [OpenAPITags.Org, OpenAPITags.Blueprint],
tags: [OpenAPITags.Blueprint],
request: {
params: applyBlueprintParamsSchema,
body: {

View File

@@ -43,7 +43,7 @@ registry.registerPath({
method: "put",
path: "/org/{orgId}/blueprint",
description: "Create and apply a YAML blueprint to an organization",
tags: [OpenAPITags.Org, OpenAPITags.Blueprint],
tags: [OpenAPITags.Blueprint],
request: {
params: applyBlueprintParamsSchema,
body: {

View File

@@ -53,7 +53,7 @@ registry.registerPath({
method: "get",
path: "/org/{orgId}/blueprint/{blueprintId}",
description: "Get a blueprint by its blueprint ID.",
tags: [OpenAPITags.Org, OpenAPITags.Blueprint],
tags: [OpenAPITags.Blueprint],
request: {
params: getBlueprintSchema
},

View File

@@ -67,7 +67,7 @@ registry.registerPath({
method: "get",
path: "/org/{orgId}/blueprints",
description: "List all blueprints for a organization.",
tags: [OpenAPITags.Org, OpenAPITags.Blueprint],
tags: [OpenAPITags.Blueprint],
request: {
params: z.object({
orgId: z.string()

View File

@@ -48,7 +48,7 @@ registry.registerPath({
method: "put",
path: "/org/{orgId}/client",
description: "Create a new client for an organization.",
tags: [OpenAPITags.Client, OpenAPITags.Org],
tags: [OpenAPITags.Client],
request: {
params: createClientParamsSchema,
body: {

View File

@@ -49,7 +49,7 @@ registry.registerPath({
path: "/org/{orgId}/user/{userId}/client",
description:
"Create a new client for a user and associate it with an existing olm.",
tags: [OpenAPITags.Client, OpenAPITags.Org, OpenAPITags.User],
tags: [OpenAPITags.Client],
request: {
params: paramsSchema,
body: {

View File

@@ -243,7 +243,7 @@ registry.registerPath({
path: "/org/{orgId}/client/{niceId}",
description:
"Get a client by orgId and niceId. NiceId is a readable ID for the site and unique on a per org basis.",
tags: [OpenAPITags.Org, OpenAPITags.Site],
tags: [OpenAPITags.Site],
request: {
params: z.object({
orgId: z.string(),

View File

@@ -70,7 +70,7 @@ async function getLatestOlmVersion(): Promise<string | null> {
tags = tags.filter((version) => !version.name.includes("rc"));
const latestVersion = tags[0].name;
olmVersionCache.set("latestOlmVersion", latestVersion);
olmVersionCache.set("latestOlmVersion", latestVersion, 3600);
return latestVersion;
} catch (error: any) {
@@ -237,7 +237,7 @@ registry.registerPath({
method: "get",
path: "/org/{orgId}/clients",
description: "List all clients for an organization.",
tags: [OpenAPITags.Client, OpenAPITags.Org],
tags: [OpenAPITags.Client],
request: {
query: listClientsSchema,
params: listClientsParamsSchema

View File

@@ -71,7 +71,7 @@ async function getLatestOlmVersion(): Promise<string | null> {
tags = tags.filter((version) => !version.name.includes("rc"));
const latestVersion = tags[0].name;
olmVersionCache.set("latestOlmVersion", latestVersion);
olmVersionCache.set("latestOlmVersion", latestVersion, 3600);
return latestVersion;
} catch (error: any) {
@@ -256,7 +256,7 @@ registry.registerPath({
method: "get",
path: "/org/{orgId}/user-devices",
description: "List all user devices for an organization.",
tags: [OpenAPITags.Client, OpenAPITags.Org],
tags: [OpenAPITags.Client],
request: {
query: listUserDevicesSchema,
params: listUserDevicesParamsSchema

View File

@@ -23,7 +23,7 @@ registry.registerPath({
method: "get",
path: "/org/{orgId}/pick-client-defaults",
description: "Return pre-requisite data for creating a client.",
tags: [OpenAPITags.Client, OpenAPITags.Site],
tags: [OpenAPITags.Client],
request: {
params: pickClientDefaultsSchema
},

View File

@@ -1,31 +1,16 @@
import { sendToClient } from "#dynamic/routers/ws";
import { S } from "@faker-js/faker/dist/airline-Dz1uGqgJ";
import { db, newts, olms, Transaction } from "@server/db";
import { db, newts, olms } from "@server/db";
import {
Alias,
convertSubnetProxyTargetsV2ToV1,
SubnetProxyTarget,
SubnetProxyTargetV2
} from "@server/lib/ip";
import { canCompress } from "@server/lib/clientVersionChecks";
import logger from "@server/logger";
import { eq } from "drizzle-orm";
import semver from "semver";
const BATCH_SIZE = 50;
const BATCH_DELAY_MS = 50;
function sleep(ms: number): Promise<void> {
return new Promise((resolve) => setTimeout(resolve, ms));
}
function chunkArray<T>(array: T[], size: number): T[][] {
const chunks: T[][] = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
}
const NEWT_V2_TARGETS_VERSION = ">=1.11.0";
export async function convertTargetsIfNessicary(
@@ -59,53 +44,36 @@ export async function convertTargetsIfNessicary(
export async function addTargets(
newtId: string,
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[]
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[],
version?: string | null
) {
targets = await convertTargetsIfNessicary(newtId, targets);
const batches = chunkArray<SubnetProxyTarget | SubnetProxyTargetV2>(
targets,
BATCH_SIZE
await sendToClient(
newtId,
{
type: `newt/wg/targets/add`,
data: targets
},
{ incrementConfigVersion: true, compress: canCompress(version, "newt") }
);
for (let i = 0; i < batches.length; i++) {
if (i > 0) {
await sleep(BATCH_DELAY_MS);
}
await sendToClient(
newtId,
{
type: `newt/wg/targets/add`,
data: batches[i]
},
{ incrementConfigVersion: true }
);
}
}
export async function removeTargets(
newtId: string,
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[]
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[],
version?: string | null
) {
targets = await convertTargetsIfNessicary(newtId, targets);
const batches = chunkArray<SubnetProxyTarget | SubnetProxyTargetV2>(
targets,
BATCH_SIZE
await sendToClient(
newtId,
{
type: `newt/wg/targets/remove`,
data: targets
},
{ incrementConfigVersion: true, compress: canCompress(version, "newt") }
);
for (let i = 0; i < batches.length; i++) {
if (i > 0) {
await sleep(BATCH_DELAY_MS);
}
await sendToClient(
newtId,
{
type: `newt/wg/targets/remove`,
data: batches[i]
},
{ incrementConfigVersion: true }
);
}
}
export async function updateTargets(
@@ -113,7 +81,8 @@ export async function updateTargets(
targets: {
oldTargets: SubnetProxyTarget[] | SubnetProxyTargetV2[];
newTargets: SubnetProxyTarget[] | SubnetProxyTargetV2[];
}
},
version?: string | null
) {
// get the newt
const [newt] = await db
@@ -143,35 +112,19 @@ export async function updateTargets(
};
}
const oldBatches = chunkArray<SubnetProxyTarget | SubnetProxyTargetV2>(
targets.oldTargets,
BATCH_SIZE
);
const newBatches = chunkArray<SubnetProxyTarget | SubnetProxyTargetV2>(
targets.newTargets,
BATCH_SIZE
);
const maxBatches = Math.max(oldBatches.length, newBatches.length);
for (let i = 0; i < maxBatches; i++) {
if (i > 0) {
await sleep(BATCH_DELAY_MS);
}
await sendToClient(
newtId,
{
type: `newt/wg/targets/update`,
data: {
oldTargets: oldBatches[i] || [],
newTargets: newBatches[i] || []
}
},
{ incrementConfigVersion: true }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});
}
await sendToClient(
newtId,
{
type: `newt/wg/targets/update`,
data: {
oldTargets: targets.oldTargets,
newTargets: targets.newTargets
}
},
{ incrementConfigVersion: true, compress: canCompress(version, "newt") }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});
}
export async function addPeerData(
@@ -179,7 +132,8 @@ export async function addPeerData(
siteId: number,
remoteSubnets: string[],
aliases: Alias[],
olmId?: string
olmId?: string,
version?: string | null
) {
if (!olmId) {
const [olm] = await db
@@ -191,6 +145,7 @@ export async function addPeerData(
return; // ignore this because an olm might not be associated with the client anymore
}
olmId = olm.olmId;
version = olm.version;
}
await sendToClient(
@@ -203,7 +158,7 @@ export async function addPeerData(
aliases: aliases
}
},
{ incrementConfigVersion: true }
{ incrementConfigVersion: true, compress: canCompress(version, "olm") }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});
@@ -214,7 +169,8 @@ export async function removePeerData(
siteId: number,
remoteSubnets: string[],
aliases: Alias[],
olmId?: string
olmId?: string,
version?: string | null
) {
if (!olmId) {
const [olm] = await db
@@ -226,6 +182,7 @@ export async function removePeerData(
return;
}
olmId = olm.olmId;
version = olm.version;
}
await sendToClient(
@@ -238,7 +195,7 @@ export async function removePeerData(
aliases: aliases
}
},
{ incrementConfigVersion: true }
{ incrementConfigVersion: true, compress: canCompress(version, "olm") }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});
@@ -259,7 +216,8 @@ export async function updatePeerData(
newAliases: Alias[];
}
| undefined,
olmId?: string
olmId?: string,
version?: string | null
) {
if (!olmId) {
const [olm] = await db
@@ -271,6 +229,7 @@ export async function updatePeerData(
return;
}
olmId = olm.olmId;
version = olm.version;
}
await sendToClient(
@@ -283,7 +242,7 @@ export async function updatePeerData(
...aliases
}
},
{ incrementConfigVersion: true }
{ incrementConfigVersion: true, compress: canCompress(version, "olm") }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});

View File

@@ -40,7 +40,8 @@ async function queryDomains(orgId: string, limit: number, offset: number) {
tries: domains.tries,
configManaged: domains.configManaged,
certResolver: domains.certResolver,
preferWildcardCert: domains.preferWildcardCert
preferWildcardCert: domains.preferWildcardCert,
errorMessage: domains.errorMessage
})
.from(orgDomains)
.where(eq(orgDomains.orgId, orgId))
@@ -59,7 +60,7 @@ registry.registerPath({
method: "get",
path: "/org/{orgId}/domains",
description: "List all domains for a organization.",
tags: [OpenAPITags.Org],
tags: [OpenAPITags.Domain],
request: {
params: z.object({
orgId: z.string()

View File

@@ -125,7 +125,7 @@ export async function generateRelayMappings(exitNode: ExitNode) {
// Add site as a destination for this client
const destination: PeerDestination = {
destinationIP: site.subnet.split("/")[0],
destinationPort: site.listenPort
destinationPort: site.listenPort || 1 // this satisfies gerbil for now but should be reevaluated
};
// Check if this destination is already in the array to avoid duplicates
@@ -165,7 +165,7 @@ export async function generateRelayMappings(exitNode: ExitNode) {
const destination: PeerDestination = {
destinationIP: peer.subnet.split("/")[0],
destinationPort: peer.listenPort
destinationPort: peer.listenPort || 1 // this satisfies gerbil for now but should be reevaluated
};
// Check for duplicates

View File

@@ -1,5 +1,5 @@
import { Request, Response, NextFunction } from "express";
import { eq, and, lt, inArray, sql } from "drizzle-orm";
import { eq, sql } from "drizzle-orm";
import { sites } from "@server/db";
import { db } from "@server/db";
import logger from "@server/logger";
@@ -11,19 +11,31 @@ import { FeatureId } from "@server/lib/billing/features";
import { checkExitNodeOrg } from "#dynamic/lib/exitNodes";
import { build } from "@server/build";
// Track sites that are already offline to avoid unnecessary queries
const offlineSites = new Set<string>();
// Retry configuration for deadlock handling
const MAX_RETRIES = 3;
const BASE_DELAY_MS = 50;
interface PeerBandwidth {
publicKey: string;
bytesIn: number;
bytesOut: number;
}
interface AccumulatorEntry {
bytesIn: number;
bytesOut: number;
/** Present when the update came through a remote exit node. */
exitNodeId?: number;
/** Whether to record egress usage for billing purposes. */
calcUsage: boolean;
}
// Retry configuration for deadlock handling
const MAX_RETRIES = 3;
const BASE_DELAY_MS = 50;
// How often to flush accumulated bandwidth data to the database
const FLUSH_INTERVAL_MS = 30_000; // 30 seconds
// In-memory accumulator: publicKey -> AccumulatorEntry
let accumulator = new Map<string, AccumulatorEntry>();
/**
* Check if an error is a deadlock error
*/
@@ -63,6 +75,220 @@ async function withDeadlockRetry<T>(
}
}
/**
* Flush all accumulated site bandwidth data to the database.
*
* Swaps out the accumulator before writing so that any bandwidth messages
* received during the flush are captured in the new accumulator rather than
* being lost or causing contention. Entries that fail to write are re-queued
* back into the accumulator so they will be retried on the next flush.
*
* This function is exported so that the application's graceful-shutdown
* cleanup handler can call it before the process exits.
*/
export async function flushSiteBandwidthToDb(): Promise<void> {
if (accumulator.size === 0) {
return;
}
// Atomically swap out the accumulator so new data keeps flowing in
// while we write the snapshot to the database.
const snapshot = accumulator;
accumulator = new Map<string, AccumulatorEntry>();
const currentTime = new Date().toISOString();
// Sort by publicKey for consistent lock ordering across concurrent
// writers — deadlock-prevention strategy.
const sortedEntries = [...snapshot.entries()].sort(([a], [b]) =>
a.localeCompare(b)
);
logger.debug(
`Flushing accumulated bandwidth data for ${sortedEntries.length} site(s) to the database`
);
// Aggregate billing usage by org, collected during the DB update loop.
const orgUsageMap = new Map<string, number>();
for (const [publicKey, { bytesIn, bytesOut, exitNodeId, calcUsage }] of sortedEntries) {
try {
const updatedSite = await withDeadlockRetry(async () => {
const [result] = await db
.update(sites)
.set({
megabytesOut: sql`COALESCE(${sites.megabytesOut}, 0) + ${bytesIn}`,
megabytesIn: sql`COALESCE(${sites.megabytesIn}, 0) + ${bytesOut}`,
lastBandwidthUpdate: currentTime,
})
.where(eq(sites.pubKey, publicKey))
.returning({
orgId: sites.orgId,
siteId: sites.siteId
});
return result;
}, `flush bandwidth for site ${publicKey}`);
if (updatedSite) {
if (exitNodeId) {
const notAllowed = await checkExitNodeOrg(
exitNodeId,
updatedSite.orgId
);
if (notAllowed) {
logger.warn(
`Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}`
);
// Skip usage tracking for this site but continue
// processing the rest.
continue;
}
}
if (calcUsage) {
const totalBandwidth = bytesIn + bytesOut;
const current = orgUsageMap.get(updatedSite.orgId) ?? 0;
orgUsageMap.set(updatedSite.orgId, current + totalBandwidth);
}
}
} catch (error) {
logger.error(
`Failed to flush bandwidth for site ${publicKey}:`,
error
);
// Re-queue the failed entry so it is retried on the next flush
// rather than silently dropped.
const existing = accumulator.get(publicKey);
if (existing) {
existing.bytesIn += bytesIn;
existing.bytesOut += bytesOut;
} else {
accumulator.set(publicKey, {
bytesIn,
bytesOut,
exitNodeId,
calcUsage
});
}
}
}
// Process billing usage updates outside the site-update loop to keep
// lock scope small and concerns separated.
if (orgUsageMap.size > 0) {
// Sort org IDs for consistent lock ordering.
const sortedOrgIds = [...orgUsageMap.keys()].sort();
for (const orgId of sortedOrgIds) {
try {
const totalBandwidth = orgUsageMap.get(orgId)!;
const bandwidthUsage = await usageService.add(
orgId,
FeatureId.EGRESS_DATA_MB,
totalBandwidth
);
if (bandwidthUsage) {
// Fire-and-forget — don't block the flush on limit checking.
usageService
.checkLimitSet(
orgId,
FeatureId.EGRESS_DATA_MB,
bandwidthUsage
)
.catch((error: any) => {
logger.error(
`Error checking bandwidth limits for org ${orgId}:`,
error
);
});
}
} catch (error) {
logger.error(
`Error processing usage for org ${orgId}:`,
error
);
// Continue with other orgs.
}
}
}
}
// ---------------------------------------------------------------------------
// Periodic flush timer
// ---------------------------------------------------------------------------
const flushTimer = setInterval(async () => {
try {
await flushSiteBandwidthToDb();
} catch (error) {
logger.error(
"Unexpected error during periodic site bandwidth flush:",
error
);
}
}, FLUSH_INTERVAL_MS);
// Allow the process to exit normally even while the timer is pending.
// The graceful-shutdown path (see server/cleanup.ts) will call
// flushSiteBandwidthToDb() explicitly before process.exit(), so no data
// is lost.
flushTimer.unref();
// ---------------------------------------------------------------------------
// Public API
// ---------------------------------------------------------------------------
/**
* Accumulate bandwidth data reported by a gerbil or remote exit node.
*
* Only peers that actually transferred data (bytesIn > 0) are added to the
* accumulator; peers with no activity are silently ignored, which means the
* flush will only write rows that have genuinely changed.
*
* The function is intentionally synchronous in its fast path so that the
* HTTP handler can respond immediately without waiting for any I/O.
*/
export async function updateSiteBandwidth(
bandwidthData: PeerBandwidth[],
calcUsageAndLimits: boolean,
exitNodeId?: number
): Promise<void> {
for (const { publicKey, bytesIn, bytesOut } of bandwidthData) {
// Skip peers that haven't transferred any data — writing zeros to the
// database would be a no-op anyway.
if (bytesIn <= 0 && bytesOut <= 0) {
continue;
}
const existing = accumulator.get(publicKey);
if (existing) {
existing.bytesIn += bytesIn;
existing.bytesOut += bytesOut;
// Retain the most-recent exitNodeId for this peer.
if (exitNodeId !== undefined) {
existing.exitNodeId = exitNodeId;
}
// Once calcUsage has been requested for a peer, keep it set for
// the lifetime of this flush window.
if (calcUsageAndLimits) {
existing.calcUsage = true;
}
} else {
accumulator.set(publicKey, {
bytesIn,
bytesOut,
exitNodeId,
calcUsage: calcUsageAndLimits
});
}
}
}
// ---------------------------------------------------------------------------
// HTTP handler
// ---------------------------------------------------------------------------
export const receiveBandwidth = async (
req: Request,
res: Response,
@@ -75,7 +301,9 @@ export const receiveBandwidth = async (
throw new Error("Invalid bandwidth data");
}
await updateSiteBandwidth(bandwidthData, build == "saas"); // we are checking the usage on saas only
// Accumulate in memory; the periodic timer (and the shutdown hook)
// will write to the database.
await updateSiteBandwidth(bandwidthData, build == "saas");
return response(res, {
data: {},
@@ -94,201 +322,3 @@ export const receiveBandwidth = async (
);
}
};
export async function updateSiteBandwidth(
bandwidthData: PeerBandwidth[],
calcUsageAndLimits: boolean,
exitNodeId?: number
) {
const currentTime = new Date();
const oneMinuteAgo = new Date(currentTime.getTime() - 60000); // 1 minute ago
// Sort bandwidth data by publicKey to ensure consistent lock ordering across all instances
// This is critical for preventing deadlocks when multiple instances update the same sites
const sortedBandwidthData = [...bandwidthData].sort((a, b) =>
a.publicKey.localeCompare(b.publicKey)
);
// First, handle sites that are actively reporting bandwidth
const activePeers = sortedBandwidthData.filter((peer) => peer.bytesIn > 0);
// Aggregate usage data by organization (collected outside transaction)
const orgUsageMap = new Map<string, number>();
if (activePeers.length > 0) {
// Remove any active peers from offline tracking since they're sending data
activePeers.forEach((peer) => offlineSites.delete(peer.publicKey));
// Update each active site individually with retry logic
// This reduces transaction scope and allows retries per-site
for (const peer of activePeers) {
try {
const updatedSite = await withDeadlockRetry(async () => {
const [result] = await db
.update(sites)
.set({
megabytesOut: sql`${sites.megabytesOut} + ${peer.bytesIn}`,
megabytesIn: sql`${sites.megabytesIn} + ${peer.bytesOut}`,
lastBandwidthUpdate: currentTime.toISOString(),
online: true
})
.where(eq(sites.pubKey, peer.publicKey))
.returning({
online: sites.online,
orgId: sites.orgId,
siteId: sites.siteId,
lastBandwidthUpdate: sites.lastBandwidthUpdate
});
return result;
}, `update active site ${peer.publicKey}`);
if (updatedSite) {
if (exitNodeId) {
const notAllowed = await checkExitNodeOrg(
exitNodeId,
updatedSite.orgId
);
if (notAllowed) {
logger.warn(
`Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}`
);
// Skip this site but continue processing others
continue;
}
}
// Aggregate bandwidth usage for the org
const totalBandwidth = peer.bytesIn + peer.bytesOut;
const currentOrgUsage =
orgUsageMap.get(updatedSite.orgId) || 0;
orgUsageMap.set(
updatedSite.orgId,
currentOrgUsage + totalBandwidth
);
}
} catch (error) {
logger.error(
`Failed to update bandwidth for site ${peer.publicKey}:`,
error
);
// Continue with other sites
}
}
}
// Process usage updates outside of site update transactions
// This separates the concerns and reduces lock contention
if (calcUsageAndLimits && orgUsageMap.size > 0) {
// Sort org IDs to ensure consistent lock ordering
const allOrgIds = [...new Set([...orgUsageMap.keys()])].sort();
for (const orgId of allOrgIds) {
try {
// Process bandwidth usage for this org
const totalBandwidth = orgUsageMap.get(orgId);
if (totalBandwidth) {
const bandwidthUsage = await usageService.add(
orgId,
FeatureId.EGRESS_DATA_MB,
totalBandwidth
);
if (bandwidthUsage) {
// Fire and forget - don't block on limit checking
usageService
.checkLimitSet(
orgId,
FeatureId.EGRESS_DATA_MB,
bandwidthUsage
)
.catch((error: any) => {
logger.error(
`Error checking bandwidth limits for org ${orgId}:`,
error
);
});
}
}
} catch (error) {
logger.error(`Error processing usage for org ${orgId}:`, error);
// Continue with other orgs
}
}
}
// Handle sites that reported zero bandwidth but need online status updated
const zeroBandwidthPeers = sortedBandwidthData.filter(
(peer) => peer.bytesIn === 0 && !offlineSites.has(peer.publicKey)
);
if (zeroBandwidthPeers.length > 0) {
// Fetch all zero bandwidth sites in one query
const zeroBandwidthSites = await db
.select()
.from(sites)
.where(
inArray(
sites.pubKey,
zeroBandwidthPeers.map((p) => p.publicKey)
)
);
// Sort by siteId to ensure consistent lock ordering
const sortedZeroBandwidthSites = zeroBandwidthSites.sort(
(a, b) => a.siteId - b.siteId
);
for (const site of sortedZeroBandwidthSites) {
let newOnlineStatus = site.online;
// Check if site should go offline based on last bandwidth update WITH DATA
if (site.lastBandwidthUpdate) {
const lastUpdateWithData = new Date(site.lastBandwidthUpdate);
if (lastUpdateWithData < oneMinuteAgo) {
newOnlineStatus = false;
}
} else {
// No previous data update recorded, set to offline
newOnlineStatus = false;
}
// Only update online status if it changed
if (site.online !== newOnlineStatus) {
try {
const updatedSite = await withDeadlockRetry(async () => {
const [result] = await db
.update(sites)
.set({
online: newOnlineStatus
})
.where(eq(sites.siteId, site.siteId))
.returning();
return result;
}, `update offline status for site ${site.siteId}`);
if (updatedSite && exitNodeId) {
const notAllowed = await checkExitNodeOrg(
exitNodeId,
updatedSite.orgId
);
if (notAllowed) {
logger.warn(
`Exit node ${exitNodeId} is not allowed for org ${updatedSite.orgId}`
);
}
}
// If site went offline, add it to our tracking set
if (!newOnlineStatus && site.pubKey) {
offlineSites.add(site.pubKey);
}
} catch (error) {
logger.error(
`Failed to update offline status for site ${site.siteId}:`,
error
);
// Continue with other sites
}
}
}
}
}

View File

@@ -112,7 +112,7 @@ export async function updateHolePunch(
destinations: destinations
});
} catch (error) {
// logger.error(error); // FIX THIS
logger.error(error);
return next(
createHttpError(
HttpCode.INTERNAL_SERVER_ERROR,
@@ -262,7 +262,7 @@ export async function updateAndGenerateEndpointDestinations(
if (site.subnet && site.listenPort) {
destinations.push({
destinationIP: site.subnet.split("/")[0],
destinationPort: site.listenPort
destinationPort: site.listenPort || 1 // this satisfies gerbil for now but should be reevaluated
});
}
}
@@ -339,10 +339,10 @@ export async function updateAndGenerateEndpointDestinations(
handleSiteEndpointChange(newt.siteId, updatedSite.endpoint!);
}
if (!updatedSite || !updatedSite.subnet) {
logger.warn(`Site not found: ${newt.siteId}`);
throw new Error("Site not found");
}
// if (!updatedSite || !updatedSite.subnet) {
// logger.warn(`Site not found: ${newt.siteId}`);
// throw new Error("Site not found");
// }
// Find all clients that connect to this site
// const sitesClientPairs = await db

View File

@@ -27,7 +27,7 @@ registry.registerPath({
method: "put",
path: "/idp/{idpId}/org/{orgId}",
description: "Create an IDP policy for an existing IDP on an organization.",
tags: [OpenAPITags.Idp],
tags: [OpenAPITags.GlobalIdp],
request: {
params: paramsSchema,
body: {

View File

@@ -37,7 +37,7 @@ registry.registerPath({
method: "put",
path: "/idp/oidc",
description: "Create an OIDC IdP.",
tags: [OpenAPITags.Idp],
tags: [OpenAPITags.GlobalIdp],
request: {
body: {
content: {

View File

@@ -21,7 +21,7 @@ registry.registerPath({
method: "delete",
path: "/idp/{idpId}",
description: "Delete IDP.",
tags: [OpenAPITags.Idp],
tags: [OpenAPITags.GlobalIdp],
request: {
params: paramsSchema
},

View File

@@ -19,7 +19,7 @@ registry.registerPath({
method: "delete",
path: "/idp/{idpId}/org/{orgId}",
description: "Create an OIDC IdP for an organization.",
tags: [OpenAPITags.Idp],
tags: [OpenAPITags.GlobalIdp],
request: {
params: paramsSchema
},

View File

@@ -34,7 +34,7 @@ registry.registerPath({
method: "get",
path: "/idp/{idpId}",
description: "Get an IDP by its IDP ID.",
tags: [OpenAPITags.Idp],
tags: [OpenAPITags.GlobalIdp],
request: {
params: paramsSchema
},

View File

@@ -48,7 +48,7 @@ registry.registerPath({
method: "get",
path: "/idp/{idpId}/org",
description: "List all org policies on an IDP.",
tags: [OpenAPITags.Idp],
tags: [OpenAPITags.GlobalIdp],
request: {
params: paramsSchema,
query: querySchema

View File

@@ -58,7 +58,7 @@ registry.registerPath({
method: "get",
path: "/idp",
description: "List all IDP in the system.",
tags: [OpenAPITags.Idp],
tags: [OpenAPITags.GlobalIdp],
request: {
query: querySchema
},

View File

@@ -26,7 +26,7 @@ registry.registerPath({
method: "post",
path: "/idp/{idpId}/org/{orgId}",
description: "Update an IDP org policy.",
tags: [OpenAPITags.Idp],
tags: [OpenAPITags.GlobalIdp],
request: {
params: paramsSchema,
body: {

View File

@@ -42,7 +42,7 @@ registry.registerPath({
method: "post",
path: "/idp/{idpId}/oidc",
description: "Update an OIDC IdP.",
tags: [OpenAPITags.Idp],
tags: [OpenAPITags.GlobalIdp],
request: {
params: paramsSchema,
body: {

View File

@@ -135,6 +135,13 @@ authenticated.post(
logActionAudit(ActionsEnum.updateSite),
site.updateSite
);
authenticated.post(
"/org/:orgId/reset-bandwidth",
verifyApiKeyOrgAccess,
verifyApiKeyHasAction(ActionsEnum.resetSiteBandwidth),
logActionAudit(ActionsEnum.resetSiteBandwidth),
org.resetOrgBandwidth
);
authenticated.delete(
"/site/:siteId",
@@ -309,6 +316,14 @@ authenticated.post(
siteResource.removeClientFromSiteResource
);
authenticated.post(
"/client/:clientId/site-resources",
verifyLimits,
verifyApiKeyHasAction(ActionsEnum.setResourceUsers),
logActionAudit(ActionsEnum.setResourceUsers),
siteResource.batchAddClientToSiteResources
);
authenticated.put(
"/org/:orgId/resource",
verifyApiKeyOrgAccess,

View File

@@ -15,6 +15,7 @@ import { initPeerAddHandshake, updatePeer } from "../olm/peers";
import { eq, and } from "drizzle-orm";
import config from "@server/lib/config";
import {
formatEndpoint,
generateSubnetProxyTargetV2,
SubnetProxyTargetV2
} from "@server/lib/ip";
@@ -83,40 +84,42 @@ export async function buildClientConfigurationForNewtClient(
// )
// );
// update the peer info on the olm
// if the peer has not been added yet this will be a no-op
await updatePeer(client.clients.clientId, {
siteId: site.siteId,
endpoint: site.endpoint!,
relayEndpoint: `${exitNode.endpoint}:${config.getRawConfig().gerbil.clients_start_port}`,
publicKey: site.publicKey!,
serverIP: site.address,
serverPort: site.listenPort
// remoteSubnets: generateRemoteSubnets(
// allSiteResources.map(
// ({ siteResources }) => siteResources
// )
// ),
// aliases: generateAliasConfig(
// allSiteResources.map(
// ({ siteResources }) => siteResources
// )
// )
});
if (!client.clientSitesAssociationsCache.isJitMode) { // if we are adding sites through jit then dont add the site to the olm
// update the peer info on the olm
// if the peer has not been added yet this will be a no-op
await updatePeer(client.clients.clientId, {
siteId: site.siteId,
endpoint: site.endpoint!,
relayEndpoint: `${exitNode.endpoint}:${config.getRawConfig().gerbil.clients_start_port}`,
publicKey: site.publicKey!,
serverIP: site.address,
serverPort: site.listenPort
// remoteSubnets: generateRemoteSubnets(
// allSiteResources.map(
// ({ siteResources }) => siteResources
// )
// ),
// aliases: generateAliasConfig(
// allSiteResources.map(
// ({ siteResources }) => siteResources
// )
// )
});
// also trigger the peer add handshake in case the peer was not already added to the olm and we need to hole punch
// if it has already been added this will be a no-op
await initPeerAddHandshake(
// this will kick off the add peer process for the client
client.clients.clientId,
{
siteId,
exitNode: {
publicKey: exitNode.publicKey,
endpoint: exitNode.endpoint
// also trigger the peer add handshake in case the peer was not already added to the olm and we need to hole punch
// if it has already been added this will be a no-op
await initPeerAddHandshake(
// this will kick off the add peer process for the client
client.clients.clientId,
{
siteId,
exitNode: {
publicKey: exitNode.publicKey,
endpoint: exitNode.endpoint
}
}
}
);
);
}
return {
publicKey: client.clients.pubKey!,
@@ -204,7 +207,8 @@ export async function buildTargetConfigurationForNewtClient(siteId: number) {
hcTimeout: targetHealthCheck.hcTimeout,
hcHeaders: targetHealthCheck.hcHeaders,
hcMethod: targetHealthCheck.hcMethod,
hcTlsServerName: targetHealthCheck.hcTlsServerName
hcTlsServerName: targetHealthCheck.hcTlsServerName,
hcStatus: targetHealthCheck.hcStatus
})
.from(targets)
.innerJoin(resources, eq(targets.resourceId, resources.resourceId))
@@ -221,8 +225,8 @@ export async function buildTargetConfigurationForNewtClient(siteId: number) {
return acc;
}
// Format target into string
const formattedTarget = `${target.internalPort}:${target.ip}:${target.port}`;
// Format target into string (handles IPv6 bracketing)
const formattedTarget = `${target.internalPort}:${formatEndpoint(target.ip, target.port)}`;
// Add to the appropriate protocol array
if (target.protocol === "tcp") {
@@ -245,9 +249,9 @@ export async function buildTargetConfigurationForNewtClient(siteId: number) {
!target.hcInterval ||
!target.hcMethod
) {
logger.debug(
`Skipping target ${target.targetId} due to missing health check fields`
);
// logger.debug(
// `Skipping adding target health check ${target.targetId} due to missing health check fields`
// );
return null; // Skip targets with missing health check fields
}
@@ -277,7 +281,8 @@ export async function buildTargetConfigurationForNewtClient(siteId: number) {
hcTimeout: target.hcTimeout, // in seconds
hcHeaders: hcHeadersSend,
hcMethod: target.hcMethod,
hcTlsServerName: target.hcTlsServerName
hcTlsServerName: target.hcTlsServerName,
hcStatus: target.hcStatus
};
});

View File

@@ -7,6 +7,7 @@ import { eq } from "drizzle-orm";
import { sendToExitNode } from "#dynamic/lib/exitNodes";
import { buildClientConfigurationForNewtClient } from "./buildConfiguration";
import { convertTargetsIfNessicary } from "../client/targets";
import { canCompress } from "@server/lib/clientVersionChecks";
const inputSchema = z.object({
publicKey: z.string(),
@@ -105,11 +106,11 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
const payload = {
oldDestination: {
destinationIP: existingSite.subnet?.split("/")[0],
destinationPort: existingSite.listenPort
destinationPort: existingSite.listenPort || 1 // this satisfies gerbil for now but should be reevaluated
},
newDestination: {
destinationIP: site.subnet?.split("/")[0],
destinationPort: site.listenPort
destinationPort: site.listenPort || 1 // this satisfies gerbil for now but should be reevaluated
}
};
@@ -138,6 +139,9 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
targets: targetsToSend
}
},
options: {
compress: canCompress(newt.version, "newt")
},
broadcast: false,
excludeSender: false
};

View File

@@ -0,0 +1,36 @@
import { MessageHandler } from "@server/routers/ws";
import { db, Newt, sites } from "@server/db";
import { eq } from "drizzle-orm";
import logger from "@server/logger";
/**
* Handles disconnecting messages from sites to show disconnected in the ui
*/
export const handleNewtDisconnectingMessage: MessageHandler = async (
context
) => {
const { message, client: c, sendToClient } = context;
const newt = c as Newt;
if (!newt) {
logger.warn("Newt not found");
return;
}
if (!newt.siteId) {
logger.warn("Newt has no client ID!");
return;
}
try {
// Update the client's last ping timestamp
await db
.update(sites)
.set({
online: false
})
.where(eq(sites.siteId, newt.siteId));
} catch (error) {
logger.error("Error handling disconnecting message", { error });
}
};

View File

@@ -1,105 +1,107 @@
import { db, sites } from "@server/db";
import { disconnectClient, getClientConfigVersion } from "#dynamic/routers/ws";
import { db, newts, sites } from "@server/db";
import { hasActiveConnections, getClientConfigVersion } from "#dynamic/routers/ws";
import { MessageHandler } from "@server/routers/ws";
import { clients, Newt } from "@server/db";
import { Newt } from "@server/db";
import { eq, lt, isNull, and, or } from "drizzle-orm";
import logger from "@server/logger";
import { validateSessionToken } from "@server/auth/sessions/app";
import { checkOrgAccessPolicy } from "#dynamic/lib/checkOrgAccessPolicy";
import { sendTerminateClient } from "../client/terminate";
import { encodeHexLowerCase } from "@oslojs/encoding";
import { sha256 } from "@oslojs/crypto/sha2";
import { sendNewtSyncMessage } from "./sync";
// Track if the offline checker interval is running
// let offlineCheckerInterval: NodeJS.Timeout | null = null;
// const OFFLINE_CHECK_INTERVAL = 30 * 1000; // Check every 30 seconds
// const OFFLINE_THRESHOLD_MS = 2 * 60 * 1000; // 2 minutes
let offlineCheckerInterval: NodeJS.Timeout | null = null;
const OFFLINE_CHECK_INTERVAL = 30 * 1000; // Check every 30 seconds
const OFFLINE_THRESHOLD_MS = 2 * 60 * 1000; // 2 minutes
/**
* Starts the background interval that checks for clients that haven't pinged recently
* and marks them as offline
* Starts the background interval that checks for newt sites that haven't
* pinged recently and marks them as offline. For backward compatibility,
* a site is only marked offline when there is no active WebSocket connection
* either — so older newt versions that don't send pings but remain connected
* continue to be treated as online.
*/
// export const startNewtOfflineChecker = (): void => {
// if (offlineCheckerInterval) {
// return; // Already running
// }
export const startNewtOfflineChecker = (): void => {
if (offlineCheckerInterval) {
return; // Already running
}
// offlineCheckerInterval = setInterval(async () => {
// try {
// const twoMinutesAgo = Math.floor(
// (Date.now() - OFFLINE_THRESHOLD_MS) / 1000
// );
offlineCheckerInterval = setInterval(async () => {
try {
const twoMinutesAgo = Math.floor(
(Date.now() - OFFLINE_THRESHOLD_MS) / 1000
);
// // TODO: WE NEED TO MAKE SURE THIS WORKS WITH DISTRIBUTED NODES ALL DOING THE SAME THING
// Find all online newt-type sites that haven't pinged recently
// (or have never pinged at all). Join newts to obtain the newtId
// needed for the WebSocket connection check.
const staleSites = await db
.select({
siteId: sites.siteId,
newtId: newts.newtId,
lastPing: sites.lastPing
})
.from(sites)
.innerJoin(newts, eq(newts.siteId, sites.siteId))
.where(
and(
eq(sites.online, true),
eq(sites.type, "newt"),
or(
lt(sites.lastPing, twoMinutesAgo),
isNull(sites.lastPing)
)
)
);
// // Find clients that haven't pinged in the last 2 minutes and mark them as offline
// const offlineClients = await db
// .update(clients)
// .set({ online: false })
// .where(
// and(
// eq(clients.online, true),
// or(
// lt(clients.lastPing, twoMinutesAgo),
// isNull(clients.lastPing)
// )
// )
// )
// .returning();
for (const staleSite of staleSites) {
// Backward-compatibility check: if the newt still has an
// active WebSocket connection (older clients that don't send
// pings), keep the site online.
const isConnected = await hasActiveConnections(staleSite.newtId);
if (isConnected) {
logger.debug(
`Newt ${staleSite.newtId} has not pinged recently but is still connected via WebSocket — keeping site ${staleSite.siteId} online`
);
continue;
}
// for (const offlineClient of offlineClients) {
// logger.info(
// `Kicking offline newt client ${offlineClient.clientId} due to inactivity`
// );
logger.info(
`Marking site ${staleSite.siteId} offline: newt ${staleSite.newtId} has no recent ping and no active WebSocket connection`
);
// if (!offlineClient.newtId) {
// logger.warn(
// `Offline client ${offlineClient.clientId} has no newtId, cannot disconnect`
// );
// continue;
// }
await db
.update(sites)
.set({ online: false })
.where(eq(sites.siteId, staleSite.siteId));
}
} catch (error) {
logger.error("Error in newt offline checker interval", { error });
}
}, OFFLINE_CHECK_INTERVAL);
// // Send a disconnect message to the client if connected
// try {
// await sendTerminateClient(
// offlineClient.clientId,
// offlineClient.newtId
// ); // terminate first
// // wait a moment to ensure the message is sent
// await new Promise((resolve) => setTimeout(resolve, 1000));
// await disconnectClient(offlineClient.newtId);
// } catch (error) {
// logger.error(
// `Error sending disconnect to offline newt ${offlineClient.clientId}`,
// { error }
// );
// }
// }
// } catch (error) {
// logger.error("Error in offline checker interval", { error });
// }
// }, OFFLINE_CHECK_INTERVAL);
// logger.debug("Started offline checker interval");
// };
logger.debug("Started newt offline checker interval");
};
/**
* Stops the background interval that checks for offline clients
* Stops the background interval that checks for offline newt sites.
*/
// export const stopNewtOfflineChecker = (): void => {
// if (offlineCheckerInterval) {
// clearInterval(offlineCheckerInterval);
// offlineCheckerInterval = null;
// logger.info("Stopped offline checker interval");
// }
// };
export const stopNewtOfflineChecker = (): void => {
if (offlineCheckerInterval) {
clearInterval(offlineCheckerInterval);
offlineCheckerInterval = null;
logger.info("Stopped newt offline checker interval");
}
};
/**
* Handles ping messages from clients and responds with pong
* Handles ping messages from newt clients.
*
* On each ping:
* - Marks the associated site as online.
* - Records the current timestamp as the newt's last-ping time.
* - Triggers a config sync if the newt is running an outdated config version.
* - Responds with a pong message.
*/
export const handleNewtPingMessage: MessageHandler = async (context) => {
const { message, client: c, sendToClient } = context;
const { message, client: c } = context;
const newt = c as Newt;
if (!newt) {
@@ -112,15 +114,31 @@ export const handleNewtPingMessage: MessageHandler = async (context) => {
return;
}
// get the version
try {
// Mark the site as online and record the ping timestamp.
await db
.update(sites)
.set({
online: true,
lastPing: Math.floor(Date.now() / 1000)
})
.where(eq(sites.siteId, newt.siteId));
} catch (error) {
logger.error("Error updating online state on newt ping", { error });
}
// Check config version and sync if stale.
const configVersion = await getClientConfigVersion(newt.newtId);
if (message.configVersion && configVersion != null && configVersion != message.configVersion) {
if (
message.configVersion != null &&
configVersion != null &&
configVersion !== message.configVersion
) {
logger.warn(
`Newt ping with outdated config version: ${message.configVersion} (current: ${configVersion})`
);
// get the site
const [site] = await db
.select()
.from(sites)
@@ -137,19 +155,6 @@ export const handleNewtPingMessage: MessageHandler = async (context) => {
await sendNewtSyncMessage(newt, site);
}
// try {
// // Update the client's last ping timestamp
// await db
// .update(clients)
// .set({
// lastPing: Math.floor(Date.now() / 1000),
// online: true
// })
// .where(eq(clients.clientId, newt.clientId));
// } catch (error) {
// logger.error("Error handling ping message", { error });
// }
return {
message: {
type: "pong",

View File

@@ -5,9 +5,7 @@ import { eq } from "drizzle-orm";
import { addPeer, deletePeer } from "../gerbil/peers";
import logger from "@server/logger";
import config from "@server/lib/config";
import {
findNextAvailableCidr,
} from "@server/lib/ip";
import { findNextAvailableCidr } from "@server/lib/ip";
import {
selectBestExitNode,
verifyExitNodeOrgAccess
@@ -15,6 +13,7 @@ import {
import { fetchContainers } from "./dockerSocket";
import { lockManager } from "#dynamic/lib/lock";
import { buildTargetConfigurationForNewtClient } from "./buildConfiguration";
import { canCompress } from "@server/lib/clientVersionChecks";
export type ExitNodePingResult = {
exitNodeId: number;
@@ -215,6 +214,9 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
healthCheckTargets: validHealthCheckTargets
}
},
options: {
compress: canCompress(newt.version, "newt")
},
broadcast: false, // Send to all clients
excludeSender: false // Include sender in broadcast
};

View File

@@ -10,10 +10,21 @@ interface PeerBandwidth {
bytesOut: number;
}
interface BandwidthAccumulator {
bytesIn: number;
bytesOut: number;
}
// Retry configuration for deadlock handling
const MAX_RETRIES = 3;
const BASE_DELAY_MS = 50;
// How often to flush accumulated bandwidth data to the database
const FLUSH_INTERVAL_MS = 120_000; // 120 seconds
// In-memory accumulator: publicKey -> { bytesIn, bytesOut }
let accumulator = new Map<string, BandwidthAccumulator>();
/**
* Check if an error is a deadlock error
*/
@@ -53,6 +64,90 @@ async function withDeadlockRetry<T>(
}
}
/**
* Flush all accumulated bandwidth data to the database.
*
* Swaps out the accumulator before writing so that any bandwidth messages
* received during the flush are captured in the new accumulator rather than
* being lost or causing contention. Entries that fail to write are re-queued
* back into the accumulator so they will be retried on the next flush.
*
* This function is exported so that the application's graceful-shutdown
* cleanup handler can call it before the process exits.
*/
export async function flushBandwidthToDb(): Promise<void> {
if (accumulator.size === 0) {
return;
}
// Atomically swap out the accumulator so new data keeps flowing in
// while we write the snapshot to the database.
const snapshot = accumulator;
accumulator = new Map<string, BandwidthAccumulator>();
const currentTime = new Date().toISOString();
// Sort by publicKey for consistent lock ordering across concurrent
// writers — this is the same deadlock-prevention strategy used in the
// original per-message implementation.
const sortedEntries = [...snapshot.entries()].sort(([a], [b]) =>
a.localeCompare(b)
);
logger.debug(
`Flushing accumulated bandwidth data for ${sortedEntries.length} client(s) to the database`
);
for (const [publicKey, { bytesIn, bytesOut }] of sortedEntries) {
try {
await withDeadlockRetry(async () => {
// Use atomic SQL increment to avoid the SELECT-then-UPDATE
// anti-pattern and the races it would introduce.
await db
.update(clients)
.set({
// Note: bytesIn from peer goes to megabytesOut (data
// sent to client) and bytesOut from peer goes to
// megabytesIn (data received from client).
megabytesOut: sql`COALESCE(${clients.megabytesOut}, 0) + ${bytesIn}`,
megabytesIn: sql`COALESCE(${clients.megabytesIn}, 0) + ${bytesOut}`,
lastBandwidthUpdate: currentTime
})
.where(eq(clients.pubKey, publicKey));
}, `flush bandwidth for client ${publicKey}`);
} catch (error) {
logger.error(
`Failed to flush bandwidth for client ${publicKey}:`,
error
);
// Re-queue the failed entry so it is retried on the next flush
// rather than silently dropped.
const existing = accumulator.get(publicKey);
if (existing) {
existing.bytesIn += bytesIn;
existing.bytesOut += bytesOut;
} else {
accumulator.set(publicKey, { bytesIn, bytesOut });
}
}
}
}
const flushTimer = setInterval(async () => {
try {
await flushBandwidthToDb();
} catch (error) {
logger.error("Unexpected error during periodic bandwidth flush:", error);
}
}, FLUSH_INTERVAL_MS);
// Calling unref() means this timer will not keep the Node.js event loop alive
// on its own — the process can still exit normally when there is no other work
// left. The graceful-shutdown path (see server/cleanup.ts) will call
// flushBandwidthToDb() explicitly before process.exit(), so no data is lost.
flushTimer.unref();
export const handleReceiveBandwidthMessage: MessageHandler = async (
context
) => {
@@ -69,40 +164,21 @@ export const handleReceiveBandwidthMessage: MessageHandler = async (
throw new Error("Invalid bandwidth data");
}
// Sort bandwidth data by publicKey to ensure consistent lock ordering across all instances
// This is critical for preventing deadlocks when multiple instances update the same clients
const sortedBandwidthData = [...bandwidthData].sort((a, b) =>
a.publicKey.localeCompare(b.publicKey)
);
// Accumulate the incoming data in memory; the periodic timer (and the
// shutdown hook) will take care of writing it to the database.
for (const { publicKey, bytesIn, bytesOut } of bandwidthData) {
// Skip peers that haven't transferred any data — writing zeros to the
// database would be a no-op anyway.
if (bytesIn <= 0 && bytesOut <= 0) {
continue;
}
const currentTime = new Date().toISOString();
// Update each client individually with retry logic
// This reduces transaction scope and allows retries per-client
for (const peer of sortedBandwidthData) {
const { publicKey, bytesIn, bytesOut } = peer;
try {
await withDeadlockRetry(async () => {
// Use atomic SQL increment to avoid SELECT then UPDATE pattern
// This eliminates the need to read the current value first
await db
.update(clients)
.set({
// Note: bytesIn from peer goes to megabytesOut (data sent to client)
// and bytesOut from peer goes to megabytesIn (data received from client)
megabytesOut: sql`COALESCE(${clients.megabytesOut}, 0) + ${bytesIn}`,
megabytesIn: sql`COALESCE(${clients.megabytesIn}, 0) + ${bytesOut}`,
lastBandwidthUpdate: currentTime
})
.where(eq(clients.pubKey, publicKey));
}, `update client bandwidth ${publicKey}`);
} catch (error) {
logger.error(
`Failed to update bandwidth for client ${publicKey}:`,
error
);
// Continue with other clients even if one fails
const existing = accumulator.get(publicKey);
if (existing) {
existing.bytesIn += bytesIn;
existing.bytesOut += bytesOut;
} else {
accumulator.set(publicKey, { bytesIn, bytesOut });
}
}
};

View File

@@ -7,3 +7,4 @@ export * from "./handleSocketMessages";
export * from "./handleNewtPingRequestMessage";
export * from "./handleApplyBlueprintMessage";
export * from "./handleNewtPingMessage";
export * from "./handleNewtDisconnectingMessage";

View File

@@ -6,6 +6,7 @@ import {
buildClientConfigurationForNewtClient,
buildTargetConfigurationForNewtClient
} from "./buildConfiguration";
import { canCompress } from "@server/lib/clientVersionChecks";
export async function sendNewtSyncMessage(newt: Newt, site: Site) {
const { tcpTargets, udpTargets, validHealthCheckTargets } =
@@ -24,18 +25,24 @@ export async function sendNewtSyncMessage(newt: Newt, site: Site) {
exitNode
);
await sendToClient(newt.newtId, {
type: "newt/sync",
data: {
proxyTargets: {
udp: udpTargets,
tcp: tcpTargets
},
healthCheckTargets: validHealthCheckTargets,
peers: peers,
clientTargets: targets
await sendToClient(
newt.newtId,
{
type: "newt/sync",
data: {
proxyTargets: {
udp: udpTargets,
tcp: tcpTargets
},
healthCheckTargets: validHealthCheckTargets,
peers: peers,
clientTargets: targets
}
},
{
compress: canCompress(newt.version, "newt")
}
}).catch((error) => {
).catch((error) => {
logger.warn(`Error sending newt sync message:`, error);
});
}

View File

@@ -2,13 +2,14 @@ import { Target, TargetHealthCheck, db, targetHealthCheck } from "@server/db";
import { sendToClient } from "#dynamic/routers/ws";
import logger from "@server/logger";
import { eq, inArray } from "drizzle-orm";
import { canCompress } from "@server/lib/clientVersionChecks";
export async function addTargets(
newtId: string,
targets: Target[],
healthCheckData: TargetHealthCheck[],
protocol: string,
port: number | null = null
version?: string | null
) {
//create a list of udp and tcp targets
const payloadTargets = targets.map((target) => {
@@ -22,7 +23,7 @@ export async function addTargets(
data: {
targets: payloadTargets
}
}, { incrementConfigVersion: true });
}, { incrementConfigVersion: true, compress: canCompress(version, "newt") });
// Create a map for quick lookup
const healthCheckMap = new Map<number, TargetHealthCheck>();
@@ -103,14 +104,14 @@ export async function addTargets(
data: {
targets: validHealthCheckTargets
}
}, { incrementConfigVersion: true });
}, { incrementConfigVersion: true, compress: canCompress(version, "newt") });
}
export async function removeTargets(
newtId: string,
targets: Target[],
protocol: string,
port: number | null = null
version?: string | null
) {
//create a list of udp and tcp targets
const payloadTargets = targets.map((target) => {
@@ -135,5 +136,5 @@ export async function removeTargets(
data: {
ids: healthCheckTargets
}
}, { incrementConfigVersion: true });
}, { incrementConfigVersion: true, compress: canCompress(version, "newt") });
}

View File

@@ -1,5 +1,17 @@
import { Client, clientSiteResourcesAssociationsCache, clientSitesAssociationsCache, db, exitNodes, siteResources, sites } from "@server/db";
import { generateAliasConfig, generateRemoteSubnets } from "@server/lib/ip";
import {
Client,
clientSiteResourcesAssociationsCache,
clientSitesAssociationsCache,
db,
exitNodes,
siteResources,
sites
} from "@server/db";
import {
Alias,
generateAliasConfig,
generateRemoteSubnets
} from "@server/lib/ip";
import logger from "@server/logger";
import { and, eq } from "drizzle-orm";
import { addPeer, deletePeer } from "../newt/peers";
@@ -8,9 +20,19 @@ import config from "@server/lib/config";
export async function buildSiteConfigurationForOlmClient(
client: Client,
publicKey: string | null,
relay: boolean
relay: boolean,
jitMode: boolean = false
) {
const siteConfigurations = [];
const siteConfigurations: {
siteId: number;
name?: string
endpoint?: string
publicKey?: string
serverIP?: string | null
serverPort?: number | null
remoteSubnets?: string[];
aliases: Alias[];
}[] = [];
// Get all sites data
const sitesData = await db
@@ -27,6 +49,40 @@ export async function buildSiteConfigurationForOlmClient(
sites: site,
clientSitesAssociationsCache: association
} of sitesData) {
const allSiteResources = await db // only get the site resources that this client has access to
.select()
.from(siteResources)
.innerJoin(
clientSiteResourcesAssociationsCache,
eq(
siteResources.siteResourceId,
clientSiteResourcesAssociationsCache.siteResourceId
)
)
.where(
and(
eq(siteResources.siteId, site.siteId),
eq(
clientSiteResourcesAssociationsCache.clientId,
client.clientId
)
)
);
if (jitMode) {
// Add site configuration to the array
siteConfigurations.push({
siteId: site.siteId,
// remoteSubnets: generateRemoteSubnets(
// allSiteResources.map(({ siteResources }) => siteResources)
// ),
aliases: generateAliasConfig(
allSiteResources.map(({ siteResources }) => siteResources)
)
});
continue;
}
if (!site.exitNodeId) {
logger.warn(
`Site ${site.siteId} does not have exit node, skipping`
@@ -42,6 +98,13 @@ export async function buildSiteConfigurationForOlmClient(
continue;
}
if (!site.publicKey || site.publicKey == "") { // the site is not ready to accept new peers
logger.warn(
`Site ${site.siteId} has no public key, skipping`
);
continue;
}
// if (site.lastHolePunch && now - site.lastHolePunch > 6 && relay) {
// logger.warn(
// `Site ${site.siteId} last hole punch is too old, skipping`
@@ -103,26 +166,6 @@ export async function buildSiteConfigurationForOlmClient(
relayEndpoint = `${exitNode.endpoint}:${config.getRawConfig().gerbil.clients_start_port}`;
}
const allSiteResources = await db // only get the site resources that this client has access to
.select()
.from(siteResources)
.innerJoin(
clientSiteResourcesAssociationsCache,
eq(
siteResources.siteResourceId,
clientSiteResourcesAssociationsCache.siteResourceId
)
)
.where(
and(
eq(siteResources.siteId, site.siteId),
eq(
clientSiteResourcesAssociationsCache.clientId,
client.clientId
)
)
);
// Add site configuration to the array
siteConfigurations.push({
siteId: site.siteId,

View File

@@ -6,7 +6,7 @@ import logger from "@server/logger";
/**
* Handles disconnecting messages from clients to show disconnected in the ui
*/
export const handleOlmDisconnecingMessage: MessageHandler = async (context) => {
export const handleOlmDisconnectingMessage: MessageHandler = async (context) => {
const { message, client: c, sendToClient } = context;
const olm = c as Olm;

View File

@@ -17,6 +17,9 @@ import { getUserDeviceName } from "@server/db/names";
import { buildSiteConfigurationForOlmClient } from "./buildConfiguration";
import { OlmErrorCodes, sendOlmError } from "./error";
import { handleFingerprintInsertion } from "./fingerprintingUtils";
import { Alias } from "@server/lib/ip";
import { build } from "@server/build";
import { canCompress } from "@server/lib/clientVersionChecks";
export const handleOlmRegisterMessage: MessageHandler = async (context) => {
logger.info("Handling register olm message!");
@@ -207,6 +210,32 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
}
}
// Get all sites data
const sitesCountResult = await db
.select({ count: count() })
.from(sites)
.innerJoin(
clientSitesAssociationsCache,
eq(sites.siteId, clientSitesAssociationsCache.siteId)
)
.where(eq(clientSitesAssociationsCache.clientId, client.clientId));
// Extract the count value from the result array
const sitesCount =
sitesCountResult.length > 0 ? sitesCountResult[0].count : 0;
// Prepare an array to store site configurations
logger.debug(`Found ${sitesCount} sites for client ${client.clientId}`);
let jitMode = false;
if (sitesCount > 250 && build == "saas") {
// THIS IS THE MAX ON THE BUSINESS TIER
// we have too many sites
// If we have too many sites we need to drop into fully JIT mode by not sending any of the sites
logger.info("Too many sites (%d), dropping into JIT mode", sitesCount);
jitMode = true;
}
logger.debug(
`Olm client ID: ${client.clientId}, Public Key: ${publicKey}, Relay: ${relay}`
);
@@ -233,28 +262,12 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
await db
.update(clientSitesAssociationsCache)
.set({
isRelayed: relay == true
isRelayed: relay == true,
isJitMode: jitMode
})
.where(eq(clientSitesAssociationsCache.clientId, client.clientId));
}
// Get all sites data
const sitesCountResult = await db
.select({ count: count() })
.from(sites)
.innerJoin(
clientSitesAssociationsCache,
eq(sites.siteId, clientSitesAssociationsCache.siteId)
)
.where(eq(clientSitesAssociationsCache.clientId, client.clientId));
// Extract the count value from the result array
const sitesCount =
sitesCountResult.length > 0 ? sitesCountResult[0].count : 0;
// Prepare an array to store site configurations
logger.debug(`Found ${sitesCount} sites for client ${client.clientId}`);
// this prevents us from accepting a register from an olm that has not hole punched yet.
// the olm will pump the register so we can keep checking
// TODO: I still think there is a better way to do this rather than locking it out here but ???
@@ -265,19 +278,14 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
return;
}
// NOTE: its important that the client here is the old client and the public key is the new key
// NOTE: its important that the client here is the old client and the public key is the new key
const siteConfigurations = await buildSiteConfigurationForOlmClient(
client,
publicKey,
relay
relay,
jitMode
);
// REMOVED THIS SO IT CREATES THE INTERFACE AND JUST WAITS FOR THE SITES
// if (siteConfigurations.length === 0) {
// logger.warn("No valid site configurations found");
// return;
// }
// Return connect message with all site configurations
return {
message: {
@@ -288,6 +296,9 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
utilitySubnet: org.utilitySubnet
}
},
options: {
compress: canCompress(olm.version, "olm")
},
broadcast: false,
excludeSender: false
};

View File

@@ -18,7 +18,7 @@ export const handleOlmRelayMessage: MessageHandler = async (context) => {
}
if (!olm.clientId) {
logger.warn("Olm has no site!"); // TODO: Maybe we create the site here?
logger.warn("Olm has no client!");
return;
}
@@ -41,7 +41,7 @@ export const handleOlmRelayMessage: MessageHandler = async (context) => {
return;
}
const { siteId } = message.data;
const { siteId, chainId } = message.data;
// Get the site
const [site] = await db
@@ -90,7 +90,8 @@ export const handleOlmRelayMessage: MessageHandler = async (context) => {
data: {
siteId: siteId,
relayEndpoint: exitNode.endpoint,
relayPort: config.getRawConfig().gerbil.clients_start_port
relayPort: config.getRawConfig().gerbil.clients_start_port,
chainId
}
},
broadcast: false,

View File

@@ -0,0 +1,241 @@
import {
clientSiteResourcesAssociationsCache,
clientSitesAssociationsCache,
db,
exitNodes,
Site,
siteResources
} from "@server/db";
import { MessageHandler } from "@server/routers/ws";
import { clients, Olm, sites } from "@server/db";
import { and, eq, or } from "drizzle-orm";
import logger from "@server/logger";
import { initPeerAddHandshake } from "./peers";
export const handleOlmServerInitAddPeerHandshake: MessageHandler = async (
context
) => {
logger.info("Handling register olm message!");
const { message, client: c, sendToClient } = context;
const olm = c as Olm;
if (!olm) {
logger.warn("Olm not found");
return;
}
if (!olm.clientId) {
logger.warn("Olm has no client!"); // TODO: Maybe we create the site here?
return;
}
const clientId = olm.clientId;
const [client] = await db
.select()
.from(clients)
.where(eq(clients.clientId, clientId))
.limit(1);
if (!client) {
logger.warn("Client not found");
return;
}
const { siteId, resourceId, chainId } = message.data;
let site: Site | null = null;
if (siteId) {
// get the site
const [siteRes] = await db
.select()
.from(sites)
.where(eq(sites.siteId, siteId))
.limit(1);
if (siteRes) {
site = siteRes;
}
}
if (resourceId && !site) {
const resources = await db
.select()
.from(siteResources)
.where(
and(
or(
eq(siteResources.niceId, resourceId),
eq(siteResources.alias, resourceId)
),
eq(siteResources.orgId, client.orgId)
)
);
if (!resources || resources.length === 0) {
logger.error(`handleOlmServerPeerAddMessage: Resource not found`);
// cancel the request from the olm side to not keep doing this
await sendToClient(
olm.olmId,
{
type: "olm/wg/peer/chain/cancel",
data: {
chainId
}
},
{ incrementConfigVersion: false }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});
return;
}
if (resources.length > 1) {
// error but this should not happen because the nice id cant contain a dot and the alias has to have a dot and both have to be unique within the org so there should never be multiple matches
logger.error(
`handleOlmServerPeerAddMessage: Multiple resources found matching the criteria`
);
return;
}
const resource = resources[0];
const currentResourceAssociationCaches = await db
.select()
.from(clientSiteResourcesAssociationsCache)
.where(
and(
eq(
clientSiteResourcesAssociationsCache.siteResourceId,
resource.siteResourceId
),
eq(
clientSiteResourcesAssociationsCache.clientId,
client.clientId
)
)
);
if (currentResourceAssociationCaches.length === 0) {
logger.error(
`handleOlmServerPeerAddMessage: Client ${client.clientId} does not have access to resource ${resource.siteResourceId}`
);
// cancel the request from the olm side to not keep doing this
await sendToClient(
olm.olmId,
{
type: "olm/wg/peer/chain/cancel",
data: {
chainId
}
},
{ incrementConfigVersion: false }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});
return;
}
const siteIdFromResource = resource.siteId;
// get the site
const [siteRes] = await db
.select()
.from(sites)
.where(eq(sites.siteId, siteIdFromResource));
if (!siteRes) {
logger.error(
`handleOlmServerPeerAddMessage: Site with ID ${site} not found`
);
return;
}
site = siteRes;
}
if (!site) {
logger.error(`handleOlmServerPeerAddMessage: Site not found`);
return;
}
// check if the client can access this site using the cache
const currentSiteAssociationCaches = await db
.select()
.from(clientSitesAssociationsCache)
.where(
and(
eq(clientSitesAssociationsCache.clientId, client.clientId),
eq(clientSitesAssociationsCache.siteId, site.siteId)
)
);
if (currentSiteAssociationCaches.length === 0) {
logger.error(
`handleOlmServerPeerAddMessage: Client ${client.clientId} does not have access to site ${site.siteId}`
);
// cancel the request from the olm side to not keep doing this
await sendToClient(
olm.olmId,
{
type: "olm/wg/peer/chain/cancel",
data: {
chainId
}
},
{ incrementConfigVersion: false }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});
return;
}
if (!site.exitNodeId) {
logger.error(
`handleOlmServerPeerAddMessage: Site with ID ${site.siteId} has no exit node`
);
// cancel the request from the olm side to not keep doing this
await sendToClient(
olm.olmId,
{
type: "olm/wg/peer/chain/cancel",
data: {
chainId
}
},
{ incrementConfigVersion: false }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});
return;
}
// get the exit node from the side
const [exitNode] = await db
.select()
.from(exitNodes)
.where(eq(exitNodes.exitNodeId, site.exitNodeId));
if (!exitNode) {
logger.error(
`handleOlmServerPeerAddMessage: Site with ID ${site.siteId} has no exit node`
);
return;
}
// also trigger the peer add handshake in case the peer was not already added to the olm and we need to hole punch
// if it has already been added this will be a no-op
await initPeerAddHandshake(
// this will kick off the add peer process for the client
client.clientId,
{
siteId: site.siteId,
exitNode: {
publicKey: exitNode.publicKey,
endpoint: exitNode.endpoint
}
},
olm.olmId,
chainId
);
return;
};

View File

@@ -54,7 +54,7 @@ export const handleOlmServerPeerAddMessage: MessageHandler = async (
return;
}
const { siteId } = message.data;
const { siteId, chainId } = message.data;
// get the site
const [site] = await db
@@ -179,7 +179,8 @@ export const handleOlmServerPeerAddMessage: MessageHandler = async (
),
aliases: generateAliasConfig(
allSiteResources.map(({ siteResources }) => siteResources)
)
),
chainId: chainId,
}
},
broadcast: false,

View File

@@ -17,7 +17,7 @@ export const handleOlmUnRelayMessage: MessageHandler = async (context) => {
}
if (!olm.clientId) {
logger.warn("Olm has no site!"); // TODO: Maybe we create the site here?
logger.warn("Olm has no client!");
return;
}
@@ -40,7 +40,7 @@ export const handleOlmUnRelayMessage: MessageHandler = async (context) => {
return;
}
const { siteId } = message.data;
const { siteId, chainId } = message.data;
// Get the site
const [site] = await db
@@ -87,7 +87,8 @@ export const handleOlmUnRelayMessage: MessageHandler = async (context) => {
type: "olm/wg/peer/unrelay",
data: {
siteId: siteId,
endpoint: site.endpoint
endpoint: site.endpoint,
chainId
}
},
broadcast: false,

View File

@@ -11,3 +11,4 @@ export * from "./handleOlmServerPeerAddMessage";
export * from "./handleOlmUnRelayMessage";
export * from "./recoverOlmWithFingerprint";
export * from "./handleOlmDisconnectingMessage";
export * from "./handleOlmServerInitAddPeerHandshake";

View File

@@ -1,8 +1,9 @@
import { sendToClient } from "#dynamic/routers/ws";
import { db, olms } from "@server/db";
import { clientSitesAssociationsCache, db, olms } from "@server/db";
import { canCompress } from "@server/lib/clientVersionChecks";
import config from "@server/lib/config";
import logger from "@server/logger";
import { eq } from "drizzle-orm";
import { and, eq } from "drizzle-orm";
import { Alias } from "yaml";
export async function addPeer(
@@ -18,7 +19,8 @@ export async function addPeer(
remoteSubnets: string[] | null; // optional, comma-separated list of subnets that this site can access
aliases: Alias[];
},
olmId?: string
olmId?: string,
version?: string | null
) {
if (!olmId) {
const [olm] = await db
@@ -30,6 +32,7 @@ export async function addPeer(
return; // ignore this because an olm might not be associated with the client anymore
}
olmId = olm.olmId;
version = olm.version;
}
await sendToClient(
@@ -48,7 +51,7 @@ export async function addPeer(
aliases: peer.aliases
}
},
{ incrementConfigVersion: true }
{ incrementConfigVersion: true, compress: canCompress(version, "olm") }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});
@@ -60,7 +63,8 @@ export async function deletePeer(
clientId: number,
siteId: number,
publicKey: string,
olmId?: string
olmId?: string,
version?: string | null
) {
if (!olmId) {
const [olm] = await db
@@ -72,6 +76,7 @@ export async function deletePeer(
return;
}
olmId = olm.olmId;
version = olm.version;
}
await sendToClient(
@@ -83,7 +88,7 @@ export async function deletePeer(
siteId: siteId
}
},
{ incrementConfigVersion: true }
{ incrementConfigVersion: true, compress: canCompress(version, "olm") }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});
@@ -103,7 +108,8 @@ export async function updatePeer(
remoteSubnets?: string[] | null; // optional, comma-separated list of subnets that
aliases?: Alias[] | null;
},
olmId?: string
olmId?: string,
version?: string | null
) {
if (!olmId) {
const [olm] = await db
@@ -115,6 +121,7 @@ export async function updatePeer(
return;
}
olmId = olm.olmId;
version = olm.version;
}
await sendToClient(
@@ -132,7 +139,7 @@ export async function updatePeer(
aliases: peer.aliases
}
},
{ incrementConfigVersion: true }
{ incrementConfigVersion: true, compress: canCompress(version, "olm") }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});
@@ -149,7 +156,8 @@ export async function initPeerAddHandshake(
endpoint: string;
};
},
olmId?: string
olmId?: string,
chainId?: string
) {
if (!olmId) {
const [olm] = await db
@@ -173,7 +181,8 @@ export async function initPeerAddHandshake(
publicKey: peer.exitNode.publicKey,
relayPort: config.getRawConfig().gerbil.clients_start_port,
endpoint: peer.exitNode.endpoint
}
},
chainId
}
},
{ incrementConfigVersion: true }
@@ -181,6 +190,17 @@ export async function initPeerAddHandshake(
logger.warn(`Error sending message:`, error);
});
// update the clientSiteAssociationsCache to make the isJitMode flag false so that JIT mode is disabled for this site if it restarts or something after the connection
await db
.update(clientSitesAssociationsCache)
.set({ isJitMode: false })
.where(
and(
eq(clientSitesAssociationsCache.clientId, clientId),
eq(clientSitesAssociationsCache.siteId, peer.siteId)
)
);
logger.info(
`Initiated peer add handshake for site ${peer.siteId} to olm ${olmId}`
);

View File

@@ -1,9 +1,17 @@
import { Client, db, exitNodes, Olm, sites, clientSitesAssociationsCache } from "@server/db";
import {
Client,
db,
exitNodes,
Olm,
sites,
clientSitesAssociationsCache
} from "@server/db";
import { buildSiteConfigurationForOlmClient } from "./buildConfiguration";
import { sendToClient } from "#dynamic/routers/ws";
import logger from "@server/logger";
import { eq, inArray } from "drizzle-orm";
import config from "@server/lib/config";
import { canCompress } from "@server/lib/clientVersionChecks";
export async function sendOlmSyncMessage(olm: Olm, client: Client) {
// NOTE: WE ARE HARDCODING THE RELAY PARAMETER TO FALSE HERE BUT IN THE REGISTER MESSAGE ITS DEFINED BY THE CLIENT
@@ -17,10 +25,7 @@ export async function sendOlmSyncMessage(olm: Olm, client: Client) {
const clientSites = await db
.select()
.from(clientSitesAssociationsCache)
.innerJoin(
sites,
eq(sites.siteId, clientSitesAssociationsCache.siteId)
)
.innerJoin(sites, eq(sites.siteId, clientSitesAssociationsCache.siteId))
.where(eq(clientSitesAssociationsCache.clientId, client.clientId));
// Extract unique exit node IDs
@@ -68,13 +73,20 @@ export async function sendOlmSyncMessage(olm: Olm, client: Client) {
logger.debug("sendOlmSyncMessage: sending sync message");
await sendToClient(olm.olmId, {
type: "olm/sync",
data: {
sites: siteConfigurations,
exitNodes: exitNodesData
await sendToClient(
olm.olmId,
{
type: "olm/sync",
data: {
sites: siteConfigurations,
exitNodes: exitNodesData
}
},
{
compress: canCompress(olm.version, "olm")
}
}).catch((error) => {
).catch((error) => {
logger.warn(`Error sending olm sync message:`, error);
});
}

View File

@@ -8,3 +8,4 @@ export * from "./getOrgOverview";
export * from "./listOrgs";
export * from "./pickOrgDefaults";
export * from "./checkOrgUserAccess";
export * from "./resetOrgBandwidth";

Some files were not shown because too many files have changed in this diff Show More