mirror of https://github.com/fosrl/pangolin.git

Merge branch 'dev' into private-site-ha
@@ -18,8 +18,8 @@ import { eq, and } from "drizzle-orm";
 import config from "@server/lib/config";
 import {
     formatEndpoint,
-    generateSubnetProxyTargets,
-    SubnetProxyTarget
+    generateSubnetProxyTargetV2,
+    SubnetProxyTargetV2
 } from "@server/lib/ip";
 
 export async function buildClientConfigurationForNewtClient(
@@ -148,7 +148,7 @@ export async function buildClientConfigurationForNewtClient(
         .where(eq(siteNetworks.siteId, siteId))
         .then((rows) => rows.map((r) => r.siteResources));
 
-    const targetsToSend: SubnetProxyTarget[] = [];
+    const targetsToSend: SubnetProxyTargetV2[] = [];
 
     for (const resource of allSiteResources) {
         // Get clients associated with this specific resource
@@ -173,12 +173,14 @@ export async function buildClientConfigurationForNewtClient(
             )
         );
 
-        const resourceTargets = generateSubnetProxyTargets(
+        const resourceTarget = generateSubnetProxyTargetV2(
             resource,
             resourceClients
         );
 
-        targetsToSend.push(...resourceTargets);
+        if (resourceTarget) {
+            targetsToSend.push(resourceTarget);
+        }
     }
 
     return {
@@ -46,7 +46,7 @@ export async function createNewt(
 
         const { newtId, secret } = parsedBody.data;
 
-        if (req.user && !req.userOrgRoleId) {
+        if (req.user && (!req.userOrgRoleIds || req.userOrgRoleIds.length === 0)) {
             return next(
                 createHttpError(HttpCode.FORBIDDEN, "User does not have a role")
             );
@@ -1,6 +1,8 @@
 import { generateSessionToken } from "@server/auth/sessions/app";
-import { db } from "@server/db";
+import { db, newtSessions } from "@server/db";
 import { newts } from "@server/db";
+import { getOrCreateCachedToken } from "#dynamic/lib/tokenCache";
+import { EXPIRES } from "@server/auth/sessions/newt";
 import HttpCode from "@server/types/HttpCode";
 import response from "@server/lib/response";
 import { eq } from "drizzle-orm";
@@ -92,8 +94,19 @@ export async function getNewtToken(
             );
         }
 
-        const resToken = generateSessionToken();
-        await createNewtSession(resToken, existingNewt.newtId);
+        // Return a cached token if one exists to prevent thundering herd on
+        // simultaneous restarts; falls back to creating a fresh session when
+        // Redis is unavailable or the cache has expired.
+        const resToken = await getOrCreateCachedToken(
+            `newt:token_cache:${existingNewt.newtId}`,
+            config.getRawConfig().server.secret!,
+            Math.floor(EXPIRES / 1000),
+            async () => {
+                const token = generateSessionToken();
+                await createNewtSession(token, existingNewt.newtId);
+                return token;
+            }
+        );
 
         return response<{ token: string; serverVersion: string }>(res, {
             data: {
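The helper getOrCreateCachedToken is imported from #dynamic/lib/tokenCache, which is not included in this diff. Below is a minimal sketch of what such a Redis-backed get-or-create helper could look like; the ioredis client and the handling of the secret parameter (passed through untouched here, presumably used to encrypt the cached token in the real module) are assumptions, not the shipped implementation.

import Redis from "ioredis";

const redis = new Redis(process.env.REDIS_URL ?? "redis://localhost:6379");

// Hypothetical sketch of #dynamic/lib/tokenCache — not the code in this commit.
// Returns a cached token if present; otherwise builds one with `factory`,
// caches it for `ttlSeconds`, and returns it.
export async function getOrCreateCachedToken(
    key: string,
    secret: string, // assumed: used for encrypting the cached value in the real helper
    ttlSeconds: number,
    factory: () => Promise<string>
): Promise<string> {
    try {
        const cached = await redis.get(key);
        if (cached) {
            return cached;
        }
    } catch {
        // Redis unavailable: fall through and mint a fresh session token.
    }

    const token = await factory();

    try {
        // NX: if a concurrent caller won the race, keep their value; the token
        // created here is still a valid session either way.
        await redis.set(key, token, "EX", ttlSeconds, "NX");
    } catch {
        // Best effort — a caching failure must not block token issuance.
    }

    return token;
}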
server/routers/newt/handleConnectionLogMessage.ts (new file, 13 lines)
@@ -0,0 +1,13 @@
+import { MessageHandler } from "@server/routers/ws";
+
+export async function flushConnectionLogToDb(): Promise<void> {
+    return;
+}
+
+export async function cleanUpOldLogs(orgId: string, retentionDays: number) {
+    return;
+}
+
+export const handleConnectionLogMessage: MessageHandler = async (context) => {
+    return;
+};
@@ -6,14 +6,9 @@ import { db, ExitNode, exitNodes, Newt, sites } from "@server/db";
 import { eq } from "drizzle-orm";
 import { sendToExitNode } from "#dynamic/lib/exitNodes";
 import { buildClientConfigurationForNewtClient } from "./buildConfiguration";
+import { convertTargetsIfNessicary } from "../client/targets";
 import { canCompress } from "@server/lib/clientVersionChecks";
-
-const inputSchema = z.object({
-    publicKey: z.string(),
-    port: z.int().positive()
-});
-
-type Input = z.infer<typeof inputSchema>;
+import config from "@server/lib/config";
 
 export const handleGetConfigMessage: MessageHandler = async (context) => {
     const { message, client, sendToClient } = context;
@@ -33,16 +28,7 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
         return;
     }
 
-    const parsed = inputSchema.safeParse(message.data);
-    if (!parsed.success) {
-        logger.error(
-            "handleGetConfigMessage: Invalid input: " +
-                fromError(parsed.error).toString()
-        );
-        return;
-    }
-
-    const { publicKey, port } = message.data as Input;
+    const { publicKey, port, chainId } = message.data;
     const siteId = newt.siteId;
 
     // Get the current site data
@@ -70,7 +56,7 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
 
     if (existingSite.lastHolePunch && now - existingSite.lastHolePunch > 5) {
         logger.warn(
-            `handleGetConfigMessage: Site ${existingSite.siteId} last hole punch is too old, skipping`
+            `Site last hole punch is too old; skipping this register. The site is failing to hole punch and identify its network address with the server. Can the client reach the server on UDP port ${config.getRawConfig().gerbil.clients_start_port}?`
         );
         return;
     }
@@ -127,13 +113,16 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
         exitNode
     );
 
+    const targetsToSend = await convertTargetsIfNessicary(newt.newtId, targets);
+
     return {
         message: {
             type: "newt/wg/receive-config",
             data: {
                 ipAddress: site.address,
                 peers,
-                targets
+                targets: targetsToSend,
+                chainId: chainId
             }
         },
         options: {
@@ -6,7 +6,9 @@ import logger from "@server/logger";
 /**
  * Handles disconnecting messages from sites to show disconnected in the ui
  */
-export const handleNewtDisconnectingMessage: MessageHandler = async (context) => {
+export const handleNewtDisconnectingMessage: MessageHandler = async (
+    context
+) => {
     const { message, client: c, sendToClient } = context;
     const newt = c as Newt;
 
@@ -27,7 +29,7 @@ export const handleNewtDisconnectingMessage: MessageHandler = async (context) =>
             .set({
                 online: false
             })
-            .where(eq(sites.siteId, sites.siteId));
+            .where(eq(sites.siteId, newt.siteId));
     } catch (error) {
         logger.error("Error handling disconnecting message", { error });
     }
@@ -1,15 +1,20 @@
-import { db, newts, sites } from "@server/db";
-import { hasActiveConnections, getClientConfigVersion } from "#dynamic/routers/ws";
+import { db, newts, sites, targetHealthCheck, targets } from "@server/db";
+import {
+    hasActiveConnections,
+    getClientConfigVersion
+} from "#dynamic/routers/ws";
 import { MessageHandler } from "@server/routers/ws";
 import { Newt } from "@server/db";
-import { eq, lt, isNull, and, or } from "drizzle-orm";
+import { eq, lt, isNull, and, or, ne, not } from "drizzle-orm";
 import logger from "@server/logger";
 import { sendNewtSyncMessage } from "./sync";
+import { recordPing } from "./pingAccumulator";
 
 // Track if the offline checker interval is running
 let offlineCheckerInterval: NodeJS.Timeout | null = null;
 const OFFLINE_CHECK_INTERVAL = 30 * 1000; // Check every 30 seconds
 const OFFLINE_THRESHOLD_MS = 2 * 60 * 1000; // 2 minutes
+const OFFLINE_THRESHOLD_BANDWIDTH_MS = 8 * 60 * 1000; // 8 minutes
 
 /**
  * Starts the background interval that checks for newt sites that haven't
@@ -55,7 +60,9 @@ export const startNewtOfflineChecker = (): void => {
                 // Backward-compatibility check: if the newt still has an
                 // active WebSocket connection (older clients that don't send
                 // pings), keep the site online.
-                const isConnected = await hasActiveConnections(staleSite.newtId);
+                const isConnected = await hasActiveConnections(
+                    staleSite.newtId
+                );
                 if (isConnected) {
                     logger.debug(
                         `Newt ${staleSite.newtId} has not pinged recently but is still connected via WebSocket — keeping site ${staleSite.siteId} online`
@@ -71,6 +78,83 @@ export const startNewtOfflineChecker = (): void => {
                     .update(sites)
                     .set({ online: false })
                     .where(eq(sites.siteId, staleSite.siteId));
+
+                const healthChecksOnSite = await db
+                    .select()
+                    .from(targetHealthCheck)
+                    .innerJoin(
+                        targets,
+                        eq(targets.targetId, targetHealthCheck.targetId)
+                    )
+                    .innerJoin(sites, eq(sites.siteId, targets.siteId))
+                    .where(eq(sites.siteId, staleSite.siteId));
+
+                for (const healthCheck of healthChecksOnSite) {
+                    logger.info(
+                        `Marking health check ${healthCheck.targetHealthCheck.targetHealthCheckId} offline due to site ${staleSite.siteId} being marked offline`
+                    );
+                    await db
+                        .update(targetHealthCheck)
+                        .set({ hcHealth: "unknown" })
+                        .where(
+                            eq(
+                                targetHealthCheck.targetHealthCheckId,
+                                healthCheck.targetHealthCheck
+                                    .targetHealthCheckId
+                            )
+                        );
+                }
             }
+
+            // this part only effects self hosted. Its not efficient but we dont expect people to have very many wireguard sites
+            // select all of the wireguard sites to evaluate if they need to be offline due to the last bandwidth update
+            const allWireguardSites = await db
+                .select({
+                    siteId: sites.siteId,
+                    online: sites.online,
+                    lastBandwidthUpdate: sites.lastBandwidthUpdate
+                })
+                .from(sites)
+                .where(
+                    and(
+                        eq(sites.type, "wireguard"),
+                        not(isNull(sites.lastBandwidthUpdate))
+                    )
+                );
+
+            const wireguardOfflineThreshold = Math.floor(
+                (Date.now() - OFFLINE_THRESHOLD_BANDWIDTH_MS) / 1000
+            );
+
+            // loop over each one. If its offline and there is a new update then mark it online. If its online and there is no update then mark it offline
+            for (const site of allWireguardSites) {
+                const lastBandwidthUpdate =
+                    new Date(site.lastBandwidthUpdate!).getTime() / 1000;
+                if (
+                    lastBandwidthUpdate < wireguardOfflineThreshold &&
+                    site.online
+                ) {
+                    logger.info(
+                        `Marking wireguard site ${site.siteId} offline: no bandwidth update in over ${OFFLINE_THRESHOLD_BANDWIDTH_MS / 60000} minutes`
+                    );
+
+                    await db
+                        .update(sites)
+                        .set({ online: false })
+                        .where(eq(sites.siteId, site.siteId));
+                } else if (
+                    lastBandwidthUpdate >= wireguardOfflineThreshold &&
+                    !site.online
+                ) {
+                    logger.info(
+                        `Marking wireguard site ${site.siteId} online: recent bandwidth update`
+                    );
+
+                    await db
+                        .update(sites)
+                        .set({ online: true })
+                        .where(eq(sites.siteId, site.siteId));
+                }
+            }
         } catch (error) {
             logger.error("Error in newt offline checker interval", { error });
@@ -114,18 +198,12 @@ export const handleNewtPingMessage: MessageHandler = async (context) => {
         return;
     }
 
-    try {
-        // Mark the site as online and record the ping timestamp.
-        await db
-            .update(sites)
-            .set({
-                online: true,
-                lastPing: Math.floor(Date.now() / 1000)
-            })
-            .where(eq(sites.siteId, newt.siteId));
-    } catch (error) {
-        logger.error("Error updating online state on newt ping", { error });
-    }
+    // Record the ping in memory; it will be flushed to the database
+    // periodically by the ping accumulator (every ~10s) in a single
+    // batched UPDATE instead of one query per ping. This prevents
+    // connection pool exhaustion under load, especially with
+    // cross-region latency to the database.
+    recordPing(newt.siteId);
 
     // Check config version and sync if stale.
     const configVersion = await getClientConfigVersion(newt.newtId);
@@ -33,7 +33,7 @@ export const handleNewtPingRequestMessage: MessageHandler = async (context) => {
         return;
     }
 
-    const { noCloud } = message.data;
+    const { noCloud, chainId } = message.data;
 
     const exitNodesList = await listExitNodes(
         site.orgId,
@@ -98,7 +98,8 @@ export const handleNewtPingRequestMessage: MessageHandler = async (context) => {
         message: {
             type: "newt/ping/exitNodes",
             data: {
-                exitNodes: filteredExitNodes
+                exitNodes: filteredExitNodes,
+                chainId: chainId
            }
        },
        broadcast: false, // Send to all clients
@@ -43,7 +43,7 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
 
     const siteId = newt.siteId;
 
-    const { publicKey, pingResults, newtVersion, backwardsCompatible } =
+    const { publicKey, pingResults, newtVersion, backwardsCompatible, chainId } =
         message.data;
     if (!publicKey) {
         logger.warn("Public key not provided");
@@ -211,7 +211,8 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
                 udp: udpTargets,
                 tcp: tcpTargets
             },
-            healthCheckTargets: validHealthCheckTargets
+            healthCheckTargets: validHealthCheckTargets,
+            chainId: chainId
         }
     },
     options: {
@@ -8,3 +8,5 @@ export * from "./handleNewtPingRequestMessage";
 export * from "./handleApplyBlueprintMessage";
 export * from "./handleNewtPingMessage";
 export * from "./handleNewtDisconnectingMessage";
+export * from "./handleConnectionLogMessage";
+export * from "./registerNewt";
server/routers/newt/pingAccumulator.ts (new file, 392 lines)
@@ -0,0 +1,392 @@
+import { db } from "@server/db";
+import { sites, clients, olms } from "@server/db";
+import { inArray } from "drizzle-orm";
+import logger from "@server/logger";
+
+/**
+ * Ping Accumulator
+ *
+ * Instead of writing to the database on every single newt/olm ping (which
+ * causes pool exhaustion under load, especially with cross-region latency),
+ * we accumulate pings in memory and flush them to the database periodically
+ * in a single batch.
+ *
+ * This is the same pattern used for bandwidth flushing in
+ * receiveBandwidth.ts and handleReceiveBandwidthMessage.ts.
+ *
+ * Supports two kinds of pings:
+ * - **Site pings** (from newts): update `sites.online` and `sites.lastPing`
+ * - **Client pings** (from OLMs): update `clients.online`, `clients.lastPing`,
+ *   `clients.archived`, and optionally reset `olms.archived`
+ */
+
+const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds
+const MAX_RETRIES = 5;
+const BASE_DELAY_MS = 50;
+
+// ── Site (newt) pings ──────────────────────────────────────────────────
+// Map of siteId -> latest ping timestamp (unix seconds)
+const pendingSitePings: Map<number, number> = new Map();
+
+// ── Client (OLM) pings ────────────────────────────────────────────────
+// Map of clientId -> latest ping timestamp (unix seconds)
+const pendingClientPings: Map<number, number> = new Map();
+// Set of olmIds whose `archived` flag should be reset to false
+const pendingOlmArchiveResets: Set<string> = new Set();
+
+let flushTimer: NodeJS.Timeout | null = null;
+
+/**
+ * Guard that prevents two flush cycles from running concurrently.
+ * setInterval does not await async callbacks, so without this a slow flush
+ * (e.g. due to DB latency) would overlap with the next scheduled cycle and
+ * the two concurrent bulk UPDATEs would deadlock each other.
+ */
+let isFlushing = false;
+
+// ── Public API ─────────────────────────────────────────────────────────
+
+/**
+ * Record a ping for a newt site. This does NOT write to the database
+ * immediately. Instead it stores the latest ping timestamp in memory,
+ * to be flushed periodically by the background timer.
+ */
+export function recordSitePing(siteId: number): void {
+    const now = Math.floor(Date.now() / 1000);
+    pendingSitePings.set(siteId, now);
+}
+
+/** @deprecated Use `recordSitePing` instead. Alias kept for existing call-sites. */
+export const recordPing = recordSitePing;
+
+/**
+ * Record a ping for an OLM client. Batches the `clients` table update
+ * (`online`, `lastPing`, `archived`) and, when `olmArchived` is true,
+ * also queues an `olms` table update to clear the archived flag.
+ */
+export function recordClientPing(
+    clientId: number,
+    olmId: string,
+    olmArchived: boolean
+): void {
+    const now = Math.floor(Date.now() / 1000);
+    pendingClientPings.set(clientId, now);
+    if (olmArchived) {
+        pendingOlmArchiveResets.add(olmId);
+    }
+}
+
+// ── Flush Logic ────────────────────────────────────────────────────────
+
+/**
+ * Flush all accumulated site pings to the database.
+ *
+ * Each batch of up to BATCH_SIZE rows is written with a **single** UPDATE
+ * statement. We use the maximum timestamp across the batch so that `lastPing`
+ * reflects the most recent ping seen for any site in the group. This avoids
+ * the multi-statement transaction that previously created additional
+ * row-lock ordering hazards.
+ */
+async function flushSitePingsToDb(): Promise<void> {
+    if (pendingSitePings.size === 0) {
+        return;
+    }
+
+    // Snapshot and clear so new pings arriving during the flush go into a
+    // fresh map for the next cycle.
+    const pingsToFlush = new Map(pendingSitePings);
+    pendingSitePings.clear();
+
+    const entries = Array.from(pingsToFlush.entries());
+
+    const BATCH_SIZE = 50;
+    for (let i = 0; i < entries.length; i += BATCH_SIZE) {
+        const batch = entries.slice(i, i + BATCH_SIZE);
+
+        // Use the latest timestamp in the batch so that `lastPing` always
+        // moves forward. Using a single timestamp for the whole batch means
+        // we only ever need one UPDATE statement (no transaction).
+        const maxTimestamp = Math.max(...batch.map(([, ts]) => ts));
+        const siteIds = batch.map(([id]) => id);
+
+        try {
+            await withRetry(async () => {
+                await db
+                    .update(sites)
+                    .set({
+                        online: true,
+                        lastPing: maxTimestamp
+                    })
+                    .where(inArray(sites.siteId, siteIds));
+            }, "flushSitePingsToDb");
+        } catch (error) {
+            logger.error(
+                `Failed to flush site ping batch (${batch.length} sites), re-queuing for next cycle`,
+                { error }
+            );
+            // Re-queue only if the preserved timestamp is newer than any
+            // update that may have landed since we snapshotted.
+            for (const [siteId, timestamp] of batch) {
+                const existing = pendingSitePings.get(siteId);
+                if (!existing || existing < timestamp) {
+                    pendingSitePings.set(siteId, timestamp);
+                }
+            }
+        }
+    }
+}
+
+/**
+ * Flush all accumulated client (OLM) pings to the database.
+ *
+ * Same single-UPDATE-per-batch approach as `flushSitePingsToDb`.
+ */
+async function flushClientPingsToDb(): Promise<void> {
+    if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) {
+        return;
+    }
+
+    // Snapshot and clear
+    const pingsToFlush = new Map(pendingClientPings);
+    pendingClientPings.clear();
+
+    const olmResetsToFlush = new Set(pendingOlmArchiveResets);
+    pendingOlmArchiveResets.clear();
+
+    // ── Flush client pings ─────────────────────────────────────────────
+    if (pingsToFlush.size > 0) {
+        const entries = Array.from(pingsToFlush.entries());
+
+        const BATCH_SIZE = 50;
+        for (let i = 0; i < entries.length; i += BATCH_SIZE) {
+            const batch = entries.slice(i, i + BATCH_SIZE);
+
+            const maxTimestamp = Math.max(...batch.map(([, ts]) => ts));
+            const clientIds = batch.map(([id]) => id);
+
+            try {
+                await withRetry(async () => {
+                    await db
+                        .update(clients)
+                        .set({
+                            lastPing: maxTimestamp,
+                            online: true,
+                            archived: false
+                        })
+                        .where(inArray(clients.clientId, clientIds));
+                }, "flushClientPingsToDb");
+            } catch (error) {
+                logger.error(
+                    `Failed to flush client ping batch (${batch.length} clients), re-queuing for next cycle`,
+                    { error }
+                );
+                for (const [clientId, timestamp] of batch) {
+                    const existing = pendingClientPings.get(clientId);
+                    if (!existing || existing < timestamp) {
+                        pendingClientPings.set(clientId, timestamp);
+                    }
+                }
+            }
+        }
+    }
+
+    // ── Flush OLM archive resets ───────────────────────────────────────
+    if (olmResetsToFlush.size > 0) {
+        const olmIds = Array.from(olmResetsToFlush).sort();
+
+        const BATCH_SIZE = 50;
+        for (let i = 0; i < olmIds.length; i += BATCH_SIZE) {
+            const batch = olmIds.slice(i, i + BATCH_SIZE);
+
+            try {
+                await withRetry(async () => {
+                    await db
+                        .update(olms)
+                        .set({ archived: false })
+                        .where(inArray(olms.olmId, batch));
+                }, "flushOlmArchiveResets");
+            } catch (error) {
+                logger.error(
+                    `Failed to flush OLM archive reset batch (${batch.length} olms), re-queuing for next cycle`,
+                    { error }
+                );
+                for (const olmId of batch) {
+                    pendingOlmArchiveResets.add(olmId);
+                }
+            }
+        }
+    }
+}
+
+/**
+ * Flush everything — called by the interval timer and during shutdown.
+ */
+export async function flushPingsToDb(): Promise<void> {
+    await flushSitePingsToDb();
+    await flushClientPingsToDb();
+}
+
+// ── Retry / Error Helpers ──────────────────────────────────────────────
+
+/**
+ * Simple retry wrapper with exponential backoff for transient errors
+ * (deadlocks, connection timeouts, unexpected disconnects).
+ *
+ * PostgreSQL deadlocks (40P01) are always safe to retry: the database
+ * guarantees exactly one winner per deadlock pair, so the loser just needs
+ * to try again. MAX_RETRIES is intentionally higher than typical connection
+ * retry budgets to give deadlock victims enough chances to succeed.
+ */
+async function withRetry<T>(
+    operation: () => Promise<T>,
+    context: string
+): Promise<T> {
+    let attempt = 0;
+    while (true) {
+        try {
+            return await operation();
+        } catch (error: any) {
+            if (isTransientError(error) && attempt < MAX_RETRIES) {
+                attempt++;
+                const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
+                const jitter = Math.random() * baseDelay;
+                const delay = baseDelay + jitter;
+                logger.warn(
+                    `Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`,
+                    { code: error?.code ?? error?.cause?.code }
+                );
+                await new Promise((resolve) => setTimeout(resolve, delay));
+                continue;
+            }
+            throw error;
+        }
+    }
+}
+
+/**
+ * Detect transient errors that are safe to retry.
+ */
+function isTransientError(error: any): boolean {
+    if (!error) return false;
+
+    const message = (error.message || "").toLowerCase();
+    const causeMessage = (error.cause?.message || "").toLowerCase();
+    const code = error.code || error.cause?.code || "";
+
+    // Connection timeout / terminated
+    if (
+        message.includes("connection timeout") ||
+        message.includes("connection terminated") ||
+        message.includes("timeout exceeded when trying to connect") ||
+        causeMessage.includes("connection terminated unexpectedly") ||
+        causeMessage.includes("connection timeout")
+    ) {
+        return true;
+    }
+
+    // PostgreSQL deadlock detected — always safe to retry (one winner guaranteed)
+    if (code === "40P01" || message.includes("deadlock")) {
+        return true;
+    }
+
+    // PostgreSQL serialization failure
+    if (code === "40001") {
+        return true;
+    }
+
+    // ECONNRESET, ECONNREFUSED, EPIPE, ETIMEDOUT
+    if (
+        code === "ECONNRESET" ||
+        code === "ECONNREFUSED" ||
+        code === "EPIPE" ||
+        code === "ETIMEDOUT"
+    ) {
+        return true;
+    }
+
+    return false;
+}
+
+// ── Lifecycle ──────────────────────────────────────────────────────────
+
+/**
+ * Start the background flush timer. Call this once at server startup.
+ */
+export function startPingAccumulator(): void {
+    if (flushTimer) {
+        return; // Already running
+    }
+
+    flushTimer = setInterval(async () => {
+        // Skip this tick if the previous flush is still in progress.
+        // setInterval does not await async callbacks, so without this guard
+        // two flush cycles can run concurrently and deadlock each other on
+        // overlapping bulk UPDATE statements.
+        if (isFlushing) {
+            logger.debug(
+                "Ping accumulator: previous flush still in progress, skipping cycle"
+            );
+            return;
+        }
+
+        isFlushing = true;
+        try {
+            await flushPingsToDb();
+        } catch (error) {
+            logger.error("Unhandled error in ping accumulator flush", {
+                error
+            });
+        } finally {
+            isFlushing = false;
+        }
+    }, FLUSH_INTERVAL_MS);
+
+    // Don't prevent the process from exiting
+    flushTimer.unref();
+
+    logger.info(
+        `Ping accumulator started (flush interval: ${FLUSH_INTERVAL_MS}ms)`
+    );
+}
+
+/**
+ * Stop the background flush timer and perform a final flush.
+ * Call this during graceful shutdown.
+ */
+export async function stopPingAccumulator(): Promise<void> {
+    if (flushTimer) {
+        clearInterval(flushTimer);
+        flushTimer = null;
+    }
+
+    // Final flush to persist any remaining pings.
+    // Wait for any in-progress flush to finish first so we don't race.
+    if (isFlushing) {
+        logger.debug(
+            "Ping accumulator: waiting for in-progress flush before stopping…"
+        );
+        await new Promise<void>((resolve) => {
+            const poll = setInterval(() => {
+                if (!isFlushing) {
+                    clearInterval(poll);
+                    resolve();
+                }
+            }, 50);
+        });
+    }
+
+    try {
+        await flushPingsToDb();
+    } catch (error) {
+        logger.error("Error during final ping accumulator flush", { error });
+    }
+
+    logger.info("Ping accumulator stopped");
+}
+
+/**
+ * Get the number of pending (unflushed) pings. Useful for monitoring.
+ */
+export function getPendingPingCount(): number {
+    return pendingSitePings.size + pendingClientPings.size;
+}
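The accumulator only helps if the flush timer is wired into the server lifecycle. A short usage sketch follows; the entry-point location, shutdown hook, and relative import path are assumptions, while the exported functions themselves come from the file above.

import {
    startPingAccumulator,
    stopPingAccumulator,
    getPendingPingCount
} from "./routers/newt/pingAccumulator";

// At server startup: begin the 10-second background flush loop.
startPingAccumulator();

// On graceful shutdown: stop the timer and flush whatever is still pending.
process.on("SIGTERM", async () => {
    await stopPingAccumulator();
    process.exit(0);
});

// Optional: surface the in-memory backlog size for monitoring.
setInterval(() => {
    console.log(`pending pings: ${getPendingPingCount()}`);
}, 60_000);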
server/routers/newt/registerNewt.ts (new file, 289 lines)
@@ -0,0 +1,289 @@
+import { Request, Response, NextFunction } from "express";
+import { z } from "zod";
+import { db } from "@server/db";
+import {
+    siteProvisioningKeys,
+    siteProvisioningKeyOrg,
+    newts,
+    orgs,
+    roles,
+    roleSites,
+    sites
+} from "@server/db";
+import response from "@server/lib/response";
+import HttpCode from "@server/types/HttpCode";
+import createHttpError from "http-errors";
+import logger from "@server/logger";
+import { eq, and, sql } from "drizzle-orm";
+import { fromError } from "zod-validation-error";
+import { verifyPassword, hashPassword } from "@server/auth/password";
+import {
+    generateId,
+    generateIdFromEntropySize
+} from "@server/auth/sessions/app";
+import { getUniqueSiteName } from "@server/db/names";
+import moment from "moment";
+import { build } from "@server/build";
+import { usageService } from "@server/lib/billing/usageService";
+import { FeatureId } from "@server/lib/billing";
+import { INSPECT_MAX_BYTES } from "buffer";
+import { getNextAvailableClientSubnet } from "@server/lib/ip";
+
+const bodySchema = z.object({
+    provisioningKey: z.string().nonempty(),
+    name: z.string().optional()
+});
+
+export type RegisterNewtBody = z.infer<typeof bodySchema>;
+
+export type RegisterNewtResponse = {
+    newtId: string;
+    secret: string;
+};
+
+export async function registerNewt(
+    req: Request,
+    res: Response,
+    next: NextFunction
+): Promise<any> {
+    try {
+        const parsedBody = bodySchema.safeParse(req.body);
+        if (!parsedBody.success) {
+            return next(
+                createHttpError(
+                    HttpCode.BAD_REQUEST,
+                    fromError(parsedBody.error).toString()
+                )
+            );
+        }
+
+        const { provisioningKey, name } = parsedBody.data;
+
+        // Keys are in the format "siteProvisioningKeyId.secret"
+        const dotIndex = provisioningKey.indexOf(".");
+        if (dotIndex === -1) {
+            return next(
+                createHttpError(
+                    HttpCode.BAD_REQUEST,
+                    "Invalid provisioning key format"
+                )
+            );
+        }
+
+        const provisioningKeyId = provisioningKey.substring(0, dotIndex);
+        const provisioningKeySecret = provisioningKey.substring(dotIndex + 1);
+
+        // Look up the provisioning key by ID, joining to get the orgId
+        const [keyRecord] = await db
+            .select({
+                siteProvisioningKeyId:
+                    siteProvisioningKeys.siteProvisioningKeyId,
+                siteProvisioningKeyHash:
+                    siteProvisioningKeys.siteProvisioningKeyHash,
+                orgId: siteProvisioningKeyOrg.orgId,
+                maxBatchSize: siteProvisioningKeys.maxBatchSize,
+                numUsed: siteProvisioningKeys.numUsed,
+                validUntil: siteProvisioningKeys.validUntil,
+                approveNewSites: siteProvisioningKeys.approveNewSites,
+            })
+            .from(siteProvisioningKeys)
+            .innerJoin(
+                siteProvisioningKeyOrg,
+                eq(
+                    siteProvisioningKeys.siteProvisioningKeyId,
+                    siteProvisioningKeyOrg.siteProvisioningKeyId
+                )
+            )
+            .where(
+                eq(
+                    siteProvisioningKeys.siteProvisioningKeyId,
+                    provisioningKeyId
+                )
+            )
+            .limit(1);
+
+        if (!keyRecord) {
+            return next(
+                createHttpError(
+                    HttpCode.UNAUTHORIZED,
+                    "Invalid provisioning key"
+                )
+            );
+        }
+
+        // Verify the secret portion against the stored hash
+        const validSecret = await verifyPassword(
+            provisioningKeySecret,
+            keyRecord.siteProvisioningKeyHash
+        );
+        if (!validSecret) {
+            return next(
+                createHttpError(
+                    HttpCode.UNAUTHORIZED,
+                    "Invalid provisioning key"
+                )
+            );
+        }
+
+        if (keyRecord.maxBatchSize && keyRecord.numUsed >= keyRecord.maxBatchSize) {
+            return next(
+                createHttpError(
+                    HttpCode.UNAUTHORIZED,
+                    "Provisioning key has reached its maximum usage"
+                )
+            );
+        }
+
+        if (keyRecord.validUntil && new Date(keyRecord.validUntil) < new Date()) {
+            return next(
+                createHttpError(
+                    HttpCode.UNAUTHORIZED,
+                    "Provisioning key has expired"
+                )
+            );
+        }
+
+        const { orgId } = keyRecord;
+
+        // Verify the org exists
+        const [org] = await db.select().from(orgs).where(eq(orgs.orgId, orgId));
+        if (!org) {
+            return next(
+                createHttpError(HttpCode.NOT_FOUND, "Organization not found")
+            );
+        }
+        if (!org.subnet) {
+            return next(
+                createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "Organization subnet not found")
+            );
+        }
+
+        // SaaS billing check
+        if (build == "saas") {
+            const usage = await usageService.getUsage(orgId, FeatureId.SITES);
+            if (!usage) {
+                return next(
+                    createHttpError(
+                        HttpCode.NOT_FOUND,
+                        "No usage data found for this organization"
+                    )
+                );
+            }
+            const rejectSites = await usageService.checkLimitSet(
+                orgId,
+                FeatureId.SITES,
+                {
+                    ...usage,
+                    instantaneousValue: (usage.instantaneousValue || 0) + 1
+                }
+            );
+            if (rejectSites) {
+                return next(
+                    createHttpError(
+                        HttpCode.FORBIDDEN,
+                        "Site limit exceeded. Please upgrade your plan."
+                    )
+                );
+            }
+        }
+
+        const niceId = await getUniqueSiteName(orgId);
+        const newtId = generateId(15);
+        const newtSecret = generateIdFromEntropySize(25);
+        const secretHash = await hashPassword(newtSecret);
+
+        let newSiteId: number | undefined;
+
+        await db.transaction(async (trx) => {
+
+            const newClientAddress = await getNextAvailableClientSubnet(orgId);
+            if (!newClientAddress) {
+                return next(
+                    createHttpError(
+                        HttpCode.INTERNAL_SERVER_ERROR,
+                        "No available subnet found"
+                    )
+                );
+            }
+
+            let clientAddress = newClientAddress.split("/")[0];
+            clientAddress = `${clientAddress}/${org.subnet!.split("/")[1]}`; // we want the block size of the whole org
+
+            // Create the site (type "newt", name = niceId)
+            const [newSite] = await trx
+                .insert(sites)
+                .values({
+                    orgId,
+                    name: name || niceId,
+                    niceId,
+                    address: clientAddress,
+                    type: "newt",
+                    dockerSocketEnabled: true,
+                    status: keyRecord.approveNewSites ? "approved" : "pending",
+                })
+                .returning();
+
+            newSiteId = newSite.siteId;
+
+            // Grant admin role access to the new site
+            const [adminRole] = await trx
+                .select()
+                .from(roles)
+                .where(and(eq(roles.isAdmin, true), eq(roles.orgId, orgId)))
+                .limit(1);
+
+            if (!adminRole) {
+                throw new Error(`Admin role not found for org ${orgId}`);
+            }
+
+            await trx.insert(roleSites).values({
+                roleId: adminRole.roleId,
+                siteId: newSite.siteId
+            });
+
+            // Create the newt for this site
+            await trx.insert(newts).values({
+                newtId,
+                secretHash,
+                siteId: newSite.siteId,
+                dateCreated: moment().toISOString()
+            });
+
+            // Consume the provisioning key — cascade removes siteProvisioningKeyOrg
+            await trx
+                .update(siteProvisioningKeys)
+                .set({
+                    lastUsed: moment().toISOString(),
+                    numUsed: sql`${siteProvisioningKeys.numUsed} + 1`
+                })
+                .where(
+                    eq(
+                        siteProvisioningKeys.siteProvisioningKeyId,
+                        provisioningKeyId
+                    )
+                );
+
+            await usageService.add(orgId, FeatureId.SITES, 1, trx);
+        });
+
+        logger.info(
+            `Provisioned new site (ID: ${newSiteId}) and newt (ID: ${newtId}) for org ${orgId} via provisioning key ${provisioningKeyId}`
+        );
+
+        return response<RegisterNewtResponse>(res, {
+            data: {
+                newtId,
+                secret: newtSecret
+            },
+            success: true,
+            error: false,
+            message: "Newt registered successfully",
+            status: HttpCode.CREATED
+        });
+    } catch (error) {
+        logger.error(error);
+        return next(
+            createHttpError(HttpCode.INTERNAL_SERVER_ERROR, "An error occurred")
+        );
+    }
+}
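registerNewt is an Express handler, so a newt can self-provision its site with a single POST. The route it is mounted on is not shown in this diff, so the path below is only a placeholder; the request body and response shape come from the bodySchema and RegisterNewtResponse types above.

// Illustrative client call — the "/api/v1/newt/register" path is a guess.
const res = await fetch("https://pangolin.example.com/api/v1/newt/register", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({
        provisioningKey: "<siteProvisioningKeyId>.<secret>",
        name: "my-first-site" // optional; defaults to a generated nice name
    })
});

const { data } = await res.json();
// data.newtId and data.secret are the credentials the newt then uses to
// authenticate (e.g. when requesting a session via getNewtToken).
console.log(data.newtId, data.secret);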