Compare commits

..

2 Commits

Author SHA1 Message Date
Owen
48abb9e98c Breakout sites tables 2026-04-08 22:04:12 -04:00
Owen
28ef5238c9 Add CODEOWNERS 2026-04-07 11:36:02 -04:00
17 changed files with 300 additions and 136 deletions

1
.github/CODEOWNERS vendored Normal file
View File

@@ -0,0 +1 @@
* @oschwartz10612 @miloschwartz

View File

@@ -35,13 +35,19 @@
</div> </div>
<p align="center">
<a href="https://docs.pangolin.net/careers/join-us">
<img src="https://img.shields.io/badge/🚀_We're_Hiring!-Join_Our_Team-brightgreen?style=for-the-badge" alt="We're Hiring!" />
</a>
</p>
<p align="center"> <p align="center">
<strong> <strong>
Get started with Pangolin at <a href="https://app.pangolin.net/auth/signup">app.pangolin.net</a> Get started with Pangolin at <a href="https://app.pangolin.net/auth/signup">app.pangolin.net</a>
</strong> </strong>
</p> </p>
Pangolin is an open-source, identity-based remote access platform built on WireGuard that enables secure, seamless connectivity to private and public resources. Pangolin combines reverse proxy and VPN capabilities into one platform, providing browser-based access to web applications and client-based access to any private resources with NAT traversal, all with granular access controls. Pangolin is an open-source, identity-based remote access platform built on WireGuard that enables secure, seamless connectivity to private and public resources. Pangolin combines reverse proxy and VPN capabilities into one platform, providing browser-based access to web applications and client-based access to any private resources, all with zero-trust security and granular access control.
## Installation ## Installation
@@ -54,16 +60,16 @@ Pangolin is an open-source, identity-based remote access platform built on WireG
| <img width=500 /> | Description | | <img width=500 /> | Description |
|-----------------|--------------| |-----------------|--------------|
| **Pangolin Cloud** | Fully managed service - no infrastructure required. | | **Pangolin Cloud** | Fully managed service with instant setup and pay-as-you-go pricing - no infrastructure required. Or, self-host your own [remote node](https://docs.pangolin.net/manage/remote-node/understanding-nodes) and connect to our control plane. |
| **Self-Host: Community Edition** | Free, open source, and licensed under AGPL-3. | | **Self-Host: Community Edition** | Free, open source, and licensed under AGPL-3. |
| **Self-Host: Enterprise Edition** | Licensed under Fossorial Commercial License. Free for personal and hobbyist use, and for businesses making less than \$100K USD gross annual revenue. | | **Self-Host: Enterprise Edition** | Licensed under Fossorial Commercial License. Free for personal and hobbyist use, and for businesses earning under \$100K USD annually. |
## Key Features ## Key Features
| <img width=500 /> | <img width=500 /> | | <img width=500 /> | <img width=500 /> |
|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------| |----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------|
| **Connect remote networks with sites**<br /><br />Pangolin's site connectors create secure tunnels from remote networks without requiring public IP addresses or open ports. Sites make any network anywhere available for authorized access. | <img src="public/screenshots/sites.png" width=500 /><tr></tr> | | **Connect remote networks with sites**<br /><br />Pangolin's lightweight site connectors create secure tunnels from remote networks without requiring public IP addresses or open ports. Sites make any network anywhere available for authorized access. | <img src="public/screenshots/sites.png" width=500 /><tr></tr> |
| **Browser-based reverse proxy access**<br /><br />Expose web applications through identity and context-aware tunneled reverse proxies. Users access applications through any web browser with authentication and granular access control. Pangolin handles routing, load balancing, health checking, and automatic SSL certificates without exposing your network directly to the internet. | <img src="public/clip.gif" width=500 /><tr></tr> | | **Browser-based reverse proxy access**<br /><br />Expose web applications through identity and context-aware tunneled reverse proxies. Pangolin handles routing, load balancing, health checking, and automatic SSL certificates without exposing your network directly to the internet. Users access applications through any web browser with authentication and granular access control. | <img src="public/clip.gif" width=500 /><tr></tr> |
| **Client-based private resource access**<br /><br />Access private resources like SSH servers, databases, RDP, and entire network ranges through Pangolin clients. Intelligent NAT traversal enables connections even through restrictive firewalls, while DNS aliases provide friendly names and fast connections to resources across all your sites. | <img src="public/screenshots/private-resources.png" width=500 /><tr></tr> | | **Client-based private resource access**<br /><br />Access private resources like SSH servers, databases, RDP, and entire network ranges through Pangolin clients. Intelligent NAT traversal enables connections even through restrictive firewalls, while DNS aliases provide friendly names and fast connections to resources across all your sites. | <img src="public/screenshots/private-resources.png" width=500 /><tr></tr> |
| **Zero-trust granular access**<br /><br />Grant users access to specific resources, not entire networks. Unlike traditional VPNs that expose full network access, Pangolin's zero-trust model ensures users can only reach the applications and services you explicitly define, reducing security risk and attack surface. | <img src="public/screenshots/user-devices.png" width=500 /><tr></tr> | | **Zero-trust granular access**<br /><br />Grant users access to specific resources, not entire networks. Unlike traditional VPNs that expose full network access, Pangolin's zero-trust model ensures users can only reach the applications and services you explicitly define, reducing security risk and attack surface. | <img src="public/screenshots/user-devices.png" width=500 /><tr></tr> |
@@ -81,7 +87,7 @@ Download the Pangolin client for your platform:
### Sign up now ### Sign up now
Create a free account at [app.pangolin.net](https://app.pangolin.net) to get started with Pangolin Cloud. Create an account at [app.pangolin.net](https://app.pangolin.net) to get started with Pangolin Cloud. A generous free tier is available.
### Check out the docs ### Check out the docs
@@ -96,3 +102,7 @@ Pangolin is dual licensed under the AGPL-3 and the [Fossorial Commercial License
## Contributions ## Contributions
Please see [CONTRIBUTING](./CONTRIBUTING.md) in the repository for guidelines and best practices. Please see [CONTRIBUTING](./CONTRIBUTING.md) in the repository for guidelines and best practices.
---
WireGuard® is a registered trademark of Jason A. Donenfeld.

View File

@@ -89,12 +89,8 @@ export const sites = pgTable("sites", {
name: varchar("name").notNull(), name: varchar("name").notNull(),
pubKey: varchar("pubKey"), pubKey: varchar("pubKey"),
subnet: varchar("subnet"), subnet: varchar("subnet"),
megabytesIn: real("bytesIn").default(0),
megabytesOut: real("bytesOut").default(0),
lastBandwidthUpdate: varchar("lastBandwidthUpdate"),
type: varchar("type").notNull(), // "newt" or "wireguard" type: varchar("type").notNull(), // "newt" or "wireguard"
online: boolean("online").notNull().default(false), online: boolean("online").notNull().default(false),
lastPing: integer("lastPing"),
address: varchar("address"), address: varchar("address"),
endpoint: varchar("endpoint"), endpoint: varchar("endpoint"),
publicKey: varchar("publicKey"), publicKey: varchar("publicKey"),
@@ -729,10 +725,7 @@ export const clients = pgTable("clients", {
name: varchar("name").notNull(), name: varchar("name").notNull(),
pubKey: varchar("pubKey"), pubKey: varchar("pubKey"),
subnet: varchar("subnet").notNull(), subnet: varchar("subnet").notNull(),
megabytesIn: real("bytesIn"),
megabytesOut: real("bytesOut"),
lastBandwidthUpdate: varchar("lastBandwidthUpdate"),
lastPing: integer("lastPing"),
type: varchar("type").notNull(), // "olm" type: varchar("type").notNull(), // "olm"
online: boolean("online").notNull().default(false), online: boolean("online").notNull().default(false),
// endpoint: varchar("endpoint"), // endpoint: varchar("endpoint"),
@@ -745,6 +738,42 @@ export const clients = pgTable("clients", {
>() >()
}); });
export const sitePing = pgTable("sitePing", {
siteId: integer("siteId")
.primaryKey()
.references(() => sites.siteId, { onDelete: "cascade" })
.notNull(),
lastPing: integer("lastPing")
});
export const siteBandwidth = pgTable("siteBandwidth", {
siteId: integer("siteId")
.primaryKey()
.references(() => sites.siteId, { onDelete: "cascade" })
.notNull(),
megabytesIn: real("bytesIn").default(0),
megabytesOut: real("bytesOut").default(0),
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
});
export const clientPing = pgTable("clientPing", {
clientId: integer("clientId")
.primaryKey()
.references(() => clients.clientId, { onDelete: "cascade" })
.notNull(),
lastPing: integer("lastPing")
});
export const clientBandwidth = pgTable("clientBandwidth", {
clientId: integer("clientId")
.primaryKey()
.references(() => clients.clientId, { onDelete: "cascade" })
.notNull(),
megabytesIn: real("bytesIn"),
megabytesOut: real("bytesOut"),
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
});
export const clientSitesAssociationsCache = pgTable( export const clientSitesAssociationsCache = pgTable(
"clientSitesAssociationsCache", "clientSitesAssociationsCache",
{ {
@@ -1106,3 +1135,7 @@ export type RequestAuditLog = InferSelectModel<typeof requestAuditLog>;
export type RoundTripMessageTracker = InferSelectModel< export type RoundTripMessageTracker = InferSelectModel<
typeof roundTripMessageTracker typeof roundTripMessageTracker
>; >;
export type SitePing = typeof sitePing.$inferSelect;
export type SiteBandwidth = typeof siteBandwidth.$inferSelect;
export type ClientPing = typeof clientPing.$inferSelect;
export type ClientBandwidth = typeof clientBandwidth.$inferSelect;

View File

@@ -95,12 +95,8 @@ export const sites = sqliteTable("sites", {
name: text("name").notNull(), name: text("name").notNull(),
pubKey: text("pubKey"), pubKey: text("pubKey"),
subnet: text("subnet"), subnet: text("subnet"),
megabytesIn: integer("bytesIn").default(0),
megabytesOut: integer("bytesOut").default(0),
lastBandwidthUpdate: text("lastBandwidthUpdate"),
type: text("type").notNull(), // "newt" or "wireguard" type: text("type").notNull(), // "newt" or "wireguard"
online: integer("online", { mode: "boolean" }).notNull().default(false), online: integer("online", { mode: "boolean" }).notNull().default(false),
lastPing: integer("lastPing"),
// exit node stuff that is how to connect to the site when it has a wg server // exit node stuff that is how to connect to the site when it has a wg server
address: text("address"), // this is the address of the wireguard interface in newt address: text("address"), // this is the address of the wireguard interface in newt
@@ -399,10 +395,7 @@ export const clients = sqliteTable("clients", {
pubKey: text("pubKey"), pubKey: text("pubKey"),
olmId: text("olmId"), // to lock it to a specific olm optionally olmId: text("olmId"), // to lock it to a specific olm optionally
subnet: text("subnet").notNull(), subnet: text("subnet").notNull(),
megabytesIn: integer("bytesIn"),
megabytesOut: integer("bytesOut"),
lastBandwidthUpdate: text("lastBandwidthUpdate"),
lastPing: integer("lastPing"),
type: text("type").notNull(), // "olm" type: text("type").notNull(), // "olm"
online: integer("online", { mode: "boolean" }).notNull().default(false), online: integer("online", { mode: "boolean" }).notNull().default(false),
// endpoint: text("endpoint"), // endpoint: text("endpoint"),
@@ -414,6 +407,42 @@ export const clients = sqliteTable("clients", {
>() >()
}); });
export const sitePing = sqliteTable("sitePing", {
siteId: integer("siteId")
.primaryKey()
.references(() => sites.siteId, { onDelete: "cascade" })
.notNull(),
lastPing: integer("lastPing")
});
export const siteBandwidth = sqliteTable("siteBandwidth", {
siteId: integer("siteId")
.primaryKey()
.references(() => sites.siteId, { onDelete: "cascade" })
.notNull(),
megabytesIn: integer("bytesIn").default(0),
megabytesOut: integer("bytesOut").default(0),
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
});
export const clientPing = sqliteTable("clientPing", {
clientId: integer("clientId")
.primaryKey()
.references(() => clients.clientId, { onDelete: "cascade" })
.notNull(),
lastPing: integer("lastPing")
});
export const clientBandwidth = sqliteTable("clientBandwidth", {
clientId: integer("clientId")
.primaryKey()
.references(() => clients.clientId, { onDelete: "cascade" })
.notNull(),
megabytesIn: integer("bytesIn"),
megabytesOut: integer("bytesOut"),
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
});
export const clientSitesAssociationsCache = sqliteTable( export const clientSitesAssociationsCache = sqliteTable(
"clientSitesAssociationsCache", "clientSitesAssociationsCache",
{ {
@@ -1209,3 +1238,7 @@ export type DeviceWebAuthCode = InferSelectModel<typeof deviceWebAuthCodes>;
export type RoundTripMessageTracker = InferSelectModel< export type RoundTripMessageTracker = InferSelectModel<
typeof roundTripMessageTracker typeof roundTripMessageTracker
>; >;
export type SitePing = typeof sitePing.$inferSelect;
export type SiteBandwidth = typeof siteBandwidth.$inferSelect;
export type ClientPing = typeof clientPing.$inferSelect;
export type ClientBandwidth = typeof clientBandwidth.$inferSelect;

View File

@@ -3,7 +3,7 @@ import config from "./config";
import { getHostMeta } from "./hostMeta"; import { getHostMeta } from "./hostMeta";
import logger from "@server/logger"; import logger from "@server/logger";
import { apiKeys, db, roles, siteResources } from "@server/db"; import { apiKeys, db, roles, siteResources } from "@server/db";
import { sites, users, orgs, resources, clients, idp } from "@server/db"; import { sites, users, orgs, resources, clients, idp, siteBandwidth } from "@server/db";
import { eq, count, notInArray, and, isNotNull, isNull } from "drizzle-orm"; import { eq, count, notInArray, and, isNotNull, isNull } from "drizzle-orm";
import { APP_VERSION } from "./consts"; import { APP_VERSION } from "./consts";
import crypto from "crypto"; import crypto from "crypto";
@@ -150,12 +150,13 @@ class TelemetryClient {
const siteDetails = await db const siteDetails = await db
.select({ .select({
siteName: sites.name, siteName: sites.name,
megabytesIn: sites.megabytesIn, megabytesIn: siteBandwidth.megabytesIn,
megabytesOut: sites.megabytesOut, megabytesOut: siteBandwidth.megabytesOut,
type: sites.type, type: sites.type,
online: sites.online online: sites.online
}) })
.from(sites); .from(sites)
.leftJoin(siteBandwidth, eq(siteBandwidth.siteId, sites.siteId));
const supporterKey = config.getSupporterData(); const supporterKey = config.getSupporterData();

View File

@@ -18,10 +18,11 @@ import {
subscriptionItems, subscriptionItems,
usage, usage,
sites, sites,
siteBandwidth,
customers, customers,
orgs orgs
} from "@server/db"; } from "@server/db";
import { eq, and } from "drizzle-orm"; import { eq, and, inArray } from "drizzle-orm";
import logger from "@server/logger"; import logger from "@server/logger";
import { getFeatureIdByMetricId, getFeatureIdByPriceId } from "@server/lib/billing/features"; import { getFeatureIdByMetricId, getFeatureIdByPriceId } from "@server/lib/billing/features";
import stripe from "#private/lib/stripe"; import stripe from "#private/lib/stripe";
@@ -253,14 +254,19 @@ export async function handleSubscriptionUpdated(
); );
} }
// Also reset the sites to 0 // Also reset the site bandwidth to 0
await trx await trx
.update(sites) .update(siteBandwidth)
.set({ .set({
megabytesIn: 0, megabytesIn: 0,
megabytesOut: 0 megabytesOut: 0
}) })
.where(eq(sites.orgId, orgId)); .where(
inArray(
siteBandwidth.siteId,
trx.select({ siteId: sites.siteId }).from(sites).where(eq(sites.orgId, orgId))
)
);
}); });
} }
} }

View File

@@ -1,4 +1,5 @@
import { import {
clientBandwidth,
clients, clients,
clientSitesAssociationsCache, clientSitesAssociationsCache,
currentFingerprint, currentFingerprint,
@@ -180,8 +181,8 @@ function queryClientsBase() {
name: clients.name, name: clients.name,
pubKey: clients.pubKey, pubKey: clients.pubKey,
subnet: clients.subnet, subnet: clients.subnet,
megabytesIn: clients.megabytesIn, megabytesIn: clientBandwidth.megabytesIn,
megabytesOut: clients.megabytesOut, megabytesOut: clientBandwidth.megabytesOut,
orgName: orgs.name, orgName: orgs.name,
type: clients.type, type: clients.type,
online: clients.online, online: clients.online,
@@ -200,7 +201,8 @@ function queryClientsBase() {
.leftJoin(orgs, eq(clients.orgId, orgs.orgId)) .leftJoin(orgs, eq(clients.orgId, orgs.orgId))
.leftJoin(olms, eq(clients.clientId, olms.clientId)) .leftJoin(olms, eq(clients.clientId, olms.clientId))
.leftJoin(users, eq(clients.userId, users.userId)) .leftJoin(users, eq(clients.userId, users.userId))
.leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId)); .leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId))
.leftJoin(clientBandwidth, eq(clientBandwidth.clientId, clients.clientId));
} }
async function getSiteAssociations(clientIds: number[]) { async function getSiteAssociations(clientIds: number[]) {
@@ -367,9 +369,15 @@ export async function listClients(
.offset(pageSize * (page - 1)) .offset(pageSize * (page - 1))
.orderBy( .orderBy(
sort_by sort_by
? order === "asc" ? (() => {
? asc(clients[sort_by]) const field =
: desc(clients[sort_by]) sort_by === "megabytesIn"
? clientBandwidth.megabytesIn
: sort_by === "megabytesOut"
? clientBandwidth.megabytesOut
: clients.name;
return order === "asc" ? asc(field) : desc(field);
})()
: asc(clients.name) : asc(clients.name)
); );

View File

@@ -1,5 +1,6 @@
import { build } from "@server/build"; import { build } from "@server/build";
import { import {
clientBandwidth,
clients, clients,
currentFingerprint, currentFingerprint,
db, db,
@@ -211,8 +212,8 @@ function queryUserDevicesBase() {
name: clients.name, name: clients.name,
pubKey: clients.pubKey, pubKey: clients.pubKey,
subnet: clients.subnet, subnet: clients.subnet,
megabytesIn: clients.megabytesIn, megabytesIn: clientBandwidth.megabytesIn,
megabytesOut: clients.megabytesOut, megabytesOut: clientBandwidth.megabytesOut,
orgName: orgs.name, orgName: orgs.name,
type: clients.type, type: clients.type,
online: clients.online, online: clients.online,
@@ -239,7 +240,8 @@ function queryUserDevicesBase() {
.leftJoin(orgs, eq(clients.orgId, orgs.orgId)) .leftJoin(orgs, eq(clients.orgId, orgs.orgId))
.leftJoin(olms, eq(clients.clientId, olms.clientId)) .leftJoin(olms, eq(clients.clientId, olms.clientId))
.leftJoin(users, eq(clients.userId, users.userId)) .leftJoin(users, eq(clients.userId, users.userId))
.leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId)); .leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId))
.leftJoin(clientBandwidth, eq(clientBandwidth.clientId, clients.clientId));
} }
type OlmWithUpdateAvailable = Awaited< type OlmWithUpdateAvailable = Awaited<
@@ -427,9 +429,15 @@ export async function listUserDevices(
.offset(pageSize * (page - 1)) .offset(pageSize * (page - 1))
.orderBy( .orderBy(
sort_by sort_by
? order === "asc" ? (() => {
? asc(clients[sort_by]) const field =
: desc(clients[sort_by]) sort_by === "megabytesIn"
? clientBandwidth.megabytesIn
: sort_by === "megabytesOut"
? clientBandwidth.megabytesOut
: clients.name;
return order === "asc" ? asc(field) : desc(field);
})()
: asc(clients.clientId) : asc(clients.clientId)
); );

View File

@@ -122,7 +122,7 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
const snapshot = accumulator; const snapshot = accumulator;
accumulator = new Map<string, AccumulatorEntry>(); accumulator = new Map<string, AccumulatorEntry>();
const currentTime = new Date().toISOString(); const currentEpoch = Math.floor(Date.now() / 1000);
// Sort by publicKey for consistent lock ordering across concurrent // Sort by publicKey for consistent lock ordering across concurrent
// writers — deadlock-prevention strategy. // writers — deadlock-prevention strategy.
@@ -157,33 +157,52 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
orgId: string; orgId: string;
pubKey: string; pubKey: string;
}>(sql` }>(sql`
UPDATE sites WITH upsert AS (
SET INSERT INTO "siteBandwidth" ("siteId", "bytesIn", "bytesOut", "lastBandwidthUpdate")
"bytesOut" = COALESCE("bytesOut", 0) + ${bytesIn}, SELECT s."siteId", ${bytesIn}, ${bytesOut}, ${currentEpoch}
"bytesIn" = COALESCE("bytesIn", 0) + ${bytesOut}, FROM "sites" s WHERE s."pubKey" = ${publicKey}
"lastBandwidthUpdate" = ${currentTime} ON CONFLICT ("siteId") DO UPDATE SET
WHERE "pubKey" = ${publicKey} "bytesIn" = COALESCE("siteBandwidth"."bytesIn", 0) + EXCLUDED."bytesIn",
RETURNING "orgId", "pubKey" "bytesOut" = COALESCE("siteBandwidth"."bytesOut", 0) + EXCLUDED."bytesOut",
"lastBandwidthUpdate" = EXCLUDED."lastBandwidthUpdate"
RETURNING "siteId"
)
SELECT u."siteId", s."orgId", s."pubKey"
FROM upsert u
INNER JOIN "sites" s ON s."siteId" = u."siteId"
`); `);
results.push(...result); results.push(...result);
} }
return results; return results;
} }
// PostgreSQL: batch UPDATE … FROM (VALUES …) — single round-trip per chunk. // PostgreSQL: batch UPSERT via CTE — single round-trip per chunk.
const valuesList = chunk.map(([publicKey, { bytesIn, bytesOut }]) => const valuesList = chunk.map(([publicKey, { bytesIn, bytesOut }]) =>
sql`(${publicKey}::text, ${bytesIn}::real, ${bytesOut}::real)` sql`(${publicKey}::text, ${bytesIn}::real, ${bytesOut}::real)`
); );
const valuesClause = sql.join(valuesList, sql`, `); const valuesClause = sql.join(valuesList, sql`, `);
return dbQueryRows<{ orgId: string; pubKey: string }>(sql` return dbQueryRows<{ orgId: string; pubKey: string }>(sql`
UPDATE sites WITH vals(pub_key, bytes_in, bytes_out) AS (
SET VALUES ${valuesClause}
"bytesOut" = COALESCE("bytesOut", 0) + v.bytes_in, ),
"bytesIn" = COALESCE("bytesIn", 0) + v.bytes_out, site_lookup AS (
"lastBandwidthUpdate" = ${currentTime} SELECT s."siteId", s."orgId", s."pubKey", v.bytes_in, v.bytes_out
FROM (VALUES ${valuesClause}) AS v(pub_key, bytes_in, bytes_out) FROM vals v
WHERE sites."pubKey" = v.pub_key INNER JOIN "sites" s ON s."pubKey" = v.pub_key
RETURNING sites."orgId" AS "orgId", sites."pubKey" AS "pubKey" ),
upsert AS (
INSERT INTO "siteBandwidth" ("siteId", "bytesIn", "bytesOut", "lastBandwidthUpdate")
SELECT sl."siteId", sl.bytes_in, sl.bytes_out, ${currentEpoch}::integer
FROM site_lookup sl
ON CONFLICT ("siteId") DO UPDATE SET
"bytesIn" = COALESCE("siteBandwidth"."bytesIn", 0) + EXCLUDED."bytesIn",
"bytesOut" = COALESCE("siteBandwidth"."bytesOut", 0) + EXCLUDED."bytesOut",
"lastBandwidthUpdate" = EXCLUDED."lastBandwidthUpdate"
RETURNING "siteId"
)
SELECT u."siteId", s."orgId", s."pubKey"
FROM upsert u
INNER JOIN "sites" s ON s."siteId" = u."siteId"
`); `);
}, `flush bandwidth chunk [${i}${chunkEnd}]`); }, `flush bandwidth chunk [${i}${chunkEnd}]`);
} catch (error) { } catch (error) {

View File

@@ -1,11 +1,11 @@
import { db, newts, sites, targetHealthCheck, targets } from "@server/db"; import { db, newts, sites, targetHealthCheck, targets, sitePing, siteBandwidth } from "@server/db";
import { import {
hasActiveConnections, hasActiveConnections,
getClientConfigVersion getClientConfigVersion
} from "#dynamic/routers/ws"; } from "#dynamic/routers/ws";
import { MessageHandler } from "@server/routers/ws"; import { MessageHandler } from "@server/routers/ws";
import { Newt } from "@server/db"; import { Newt } from "@server/db";
import { eq, lt, isNull, and, or, ne, not } from "drizzle-orm"; import { eq, lt, isNull, and, or, ne } from "drizzle-orm";
import logger from "@server/logger"; import logger from "@server/logger";
import { sendNewtSyncMessage } from "./sync"; import { sendNewtSyncMessage } from "./sync";
import { recordPing } from "./pingAccumulator"; import { recordPing } from "./pingAccumulator";
@@ -41,17 +41,18 @@ export const startNewtOfflineChecker = (): void => {
.select({ .select({
siteId: sites.siteId, siteId: sites.siteId,
newtId: newts.newtId, newtId: newts.newtId,
lastPing: sites.lastPing lastPing: sitePing.lastPing
}) })
.from(sites) .from(sites)
.innerJoin(newts, eq(newts.siteId, sites.siteId)) .innerJoin(newts, eq(newts.siteId, sites.siteId))
.leftJoin(sitePing, eq(sitePing.siteId, sites.siteId))
.where( .where(
and( and(
eq(sites.online, true), eq(sites.online, true),
eq(sites.type, "newt"), eq(sites.type, "newt"),
or( or(
lt(sites.lastPing, twoMinutesAgo), lt(sitePing.lastPing, twoMinutesAgo),
isNull(sites.lastPing) isNull(sitePing.lastPing)
) )
) )
); );
@@ -112,15 +113,11 @@ export const startNewtOfflineChecker = (): void => {
.select({ .select({
siteId: sites.siteId, siteId: sites.siteId,
online: sites.online, online: sites.online,
lastBandwidthUpdate: sites.lastBandwidthUpdate lastBandwidthUpdate: siteBandwidth.lastBandwidthUpdate
}) })
.from(sites) .from(sites)
.where( .innerJoin(siteBandwidth, eq(siteBandwidth.siteId, sites.siteId))
and( .where(eq(sites.type, "wireguard"));
eq(sites.type, "wireguard"),
not(isNull(sites.lastBandwidthUpdate))
)
);
const wireguardOfflineThreshold = Math.floor( const wireguardOfflineThreshold = Math.floor(
(Date.now() - OFFLINE_THRESHOLD_BANDWIDTH_MS) / 1000 (Date.now() - OFFLINE_THRESHOLD_BANDWIDTH_MS) / 1000
@@ -128,12 +125,7 @@ export const startNewtOfflineChecker = (): void => {
// loop over each one. If its offline and there is a new update then mark it online. If its online and there is no update then mark it offline // loop over each one. If its offline and there is a new update then mark it online. If its online and there is no update then mark it offline
for (const site of allWireguardSites) { for (const site of allWireguardSites) {
const lastBandwidthUpdate = if ((site.lastBandwidthUpdate ?? 0) < wireguardOfflineThreshold && site.online) {
new Date(site.lastBandwidthUpdate!).getTime() / 1000;
if (
lastBandwidthUpdate < wireguardOfflineThreshold &&
site.online
) {
logger.info( logger.info(
`Marking wireguard site ${site.siteId} offline: no bandwidth update in over ${OFFLINE_THRESHOLD_BANDWIDTH_MS / 60000} minutes` `Marking wireguard site ${site.siteId} offline: no bandwidth update in over ${OFFLINE_THRESHOLD_BANDWIDTH_MS / 60000} minutes`
); );
@@ -142,10 +134,7 @@ export const startNewtOfflineChecker = (): void => {
.update(sites) .update(sites)
.set({ online: false }) .set({ online: false })
.where(eq(sites.siteId, site.siteId)); .where(eq(sites.siteId, site.siteId));
} else if ( } else if ((site.lastBandwidthUpdate ?? 0) >= wireguardOfflineThreshold && !site.online) {
lastBandwidthUpdate >= wireguardOfflineThreshold &&
!site.online
) {
logger.info( logger.info(
`Marking wireguard site ${site.siteId} online: recent bandwidth update` `Marking wireguard site ${site.siteId} online: recent bandwidth update`
); );

View File

@@ -1,6 +1,5 @@
import { db } from "@server/db"; import { db, clients, clientBandwidth } from "@server/db";
import { MessageHandler } from "@server/routers/ws"; import { MessageHandler } from "@server/routers/ws";
import { clients } from "@server/db";
import { eq, sql } from "drizzle-orm"; import { eq, sql } from "drizzle-orm";
import logger from "@server/logger"; import logger from "@server/logger";
@@ -85,7 +84,7 @@ export async function flushBandwidthToDb(): Promise<void> {
const snapshot = accumulator; const snapshot = accumulator;
accumulator = new Map<string, BandwidthAccumulator>(); accumulator = new Map<string, BandwidthAccumulator>();
const currentTime = new Date().toISOString(); const currentEpoch = Math.floor(Date.now() / 1000);
// Sort by publicKey for consistent lock ordering across concurrent // Sort by publicKey for consistent lock ordering across concurrent
// writers — this is the same deadlock-prevention strategy used in the // writers — this is the same deadlock-prevention strategy used in the
@@ -101,19 +100,37 @@ export async function flushBandwidthToDb(): Promise<void> {
for (const [publicKey, { bytesIn, bytesOut }] of sortedEntries) { for (const [publicKey, { bytesIn, bytesOut }] of sortedEntries) {
try { try {
await withDeadlockRetry(async () => { await withDeadlockRetry(async () => {
// Use atomic SQL increment to avoid the SELECT-then-UPDATE // Find clientId by pubKey
// anti-pattern and the races it would introduce. const [clientRow] = await db
.select({ clientId: clients.clientId })
.from(clients)
.where(eq(clients.pubKey, publicKey))
.limit(1);
if (!clientRow) {
logger.warn(`No client found for pubKey ${publicKey}, skipping`);
return;
}
await db await db
.update(clients) .insert(clientBandwidth)
.set({ .values({
clientId: clientRow.clientId,
// Note: bytesIn from peer goes to megabytesOut (data // Note: bytesIn from peer goes to megabytesOut (data
// sent to client) and bytesOut from peer goes to // sent to client) and bytesOut from peer goes to
// megabytesIn (data received from client). // megabytesIn (data received from client).
megabytesOut: sql`COALESCE(${clients.megabytesOut}, 0) + ${bytesIn}`, megabytesOut: bytesIn,
megabytesIn: sql`COALESCE(${clients.megabytesIn}, 0) + ${bytesOut}`, megabytesIn: bytesOut,
lastBandwidthUpdate: currentTime lastBandwidthUpdate: currentEpoch
}) })
.where(eq(clients.pubKey, publicKey)); .onConflictDoUpdate({
target: clientBandwidth.clientId,
set: {
megabytesOut: sql`COALESCE(${clientBandwidth.megabytesOut}, 0) + ${bytesIn}`,
megabytesIn: sql`COALESCE(${clientBandwidth.megabytesIn}, 0) + ${bytesOut}`,
lastBandwidthUpdate: currentEpoch
}
});
}, `flush bandwidth for client ${publicKey}`); }, `flush bandwidth for client ${publicKey}`);
} catch (error) { } catch (error) {
logger.error( logger.error(

View File

@@ -1,6 +1,6 @@
import { db } from "@server/db"; import { db } from "@server/db";
import { sites, clients, olms } from "@server/db"; import { sites, clients, olms, sitePing, clientPing } from "@server/db";
import { inArray } from "drizzle-orm"; import { inArray, sql } from "drizzle-orm";
import logger from "@server/logger"; import logger from "@server/logger";
/** /**
@@ -81,11 +81,8 @@ export function recordClientPing(
/** /**
* Flush all accumulated site pings to the database. * Flush all accumulated site pings to the database.
* *
* Each batch of up to BATCH_SIZE rows is written with a **single** UPDATE * For each batch: first upserts individual per-site timestamps into
* statement. We use the maximum timestamp across the batch so that `lastPing` * `sitePing`, then bulk-updates `sites.online = true`.
* reflects the most recent ping seen for any site in the group. This avoids
* the multi-statement transaction that previously created additional
* row-lock ordering hazards.
*/ */
async function flushSitePingsToDb(): Promise<void> { async function flushSitePingsToDb(): Promise<void> {
if (pendingSitePings.size === 0) { if (pendingSitePings.size === 0) {
@@ -103,20 +100,25 @@ async function flushSitePingsToDb(): Promise<void> {
for (let i = 0; i < entries.length; i += BATCH_SIZE) { for (let i = 0; i < entries.length; i += BATCH_SIZE) {
const batch = entries.slice(i, i + BATCH_SIZE); const batch = entries.slice(i, i + BATCH_SIZE);
// Use the latest timestamp in the batch so that `lastPing` always
// moves forward. Using a single timestamp for the whole batch means
// we only ever need one UPDATE statement (no transaction).
const maxTimestamp = Math.max(...batch.map(([, ts]) => ts));
const siteIds = batch.map(([id]) => id); const siteIds = batch.map(([id]) => id);
try { try {
await withRetry(async () => { await withRetry(async () => {
const rows = batch.map(([siteId, ts]) => ({ siteId, lastPing: ts }));
// Step 1: Upsert ping timestamps into sitePing
await db
.insert(sitePing)
.values(rows)
.onConflictDoUpdate({
target: sitePing.siteId,
set: { lastPing: sql`excluded."lastPing"` }
});
// Step 2: Update online status on sites
await db await db
.update(sites) .update(sites)
.set({ .set({ online: true })
online: true,
lastPing: maxTimestamp
})
.where(inArray(sites.siteId, siteIds)); .where(inArray(sites.siteId, siteIds));
}, "flushSitePingsToDb"); }, "flushSitePingsToDb");
} catch (error) { } catch (error) {
@@ -139,7 +141,8 @@ async function flushSitePingsToDb(): Promise<void> {
/** /**
* Flush all accumulated client (OLM) pings to the database. * Flush all accumulated client (OLM) pings to the database.
* *
* Same single-UPDATE-per-batch approach as `flushSitePingsToDb`. * For each batch: first upserts individual per-client timestamps into
* `clientPing`, then bulk-updates `clients.online = true, archived = false`.
*/ */
async function flushClientPingsToDb(): Promise<void> { async function flushClientPingsToDb(): Promise<void> {
if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) { if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) {
@@ -161,18 +164,25 @@ async function flushClientPingsToDb(): Promise<void> {
for (let i = 0; i < entries.length; i += BATCH_SIZE) { for (let i = 0; i < entries.length; i += BATCH_SIZE) {
const batch = entries.slice(i, i + BATCH_SIZE); const batch = entries.slice(i, i + BATCH_SIZE);
const maxTimestamp = Math.max(...batch.map(([, ts]) => ts));
const clientIds = batch.map(([id]) => id); const clientIds = batch.map(([id]) => id);
try { try {
await withRetry(async () => { await withRetry(async () => {
const rows = batch.map(([clientId, ts]) => ({ clientId, lastPing: ts }));
// Step 1: Upsert ping timestamps into clientPing
await db
.insert(clientPing)
.values(rows)
.onConflictDoUpdate({
target: clientPing.clientId,
set: { lastPing: sql`excluded."lastPing"` }
});
// Step 2: Update online + unarchive on clients
await db await db
.update(clients) .update(clients)
.set({ .set({ online: true, archived: false })
lastPing: maxTimestamp,
online: true,
archived: false
})
.where(inArray(clients.clientId, clientIds)); .where(inArray(clients.clientId, clientIds));
}, "flushClientPingsToDb"); }, "flushClientPingsToDb");
} catch (error) { } catch (error) {

View File

@@ -1,8 +1,8 @@
import { disconnectClient, getClientConfigVersion } from "#dynamic/routers/ws"; import { disconnectClient, getClientConfigVersion } from "#dynamic/routers/ws";
import { db } from "@server/db"; import { db } from "@server/db";
import { MessageHandler } from "@server/routers/ws"; import { MessageHandler } from "@server/routers/ws";
import { clients, olms, Olm } from "@server/db"; import { clients, olms, Olm, clientPing } from "@server/db";
import { eq, lt, isNull, and, or } from "drizzle-orm"; import { eq, lt, isNull, and, or, inArray } from "drizzle-orm";
import { recordClientPing } from "@server/routers/newt/pingAccumulator"; import { recordClientPing } from "@server/routers/newt/pingAccumulator";
import logger from "@server/logger"; import logger from "@server/logger";
import { validateSessionToken } from "@server/auth/sessions/app"; import { validateSessionToken } from "@server/auth/sessions/app";
@@ -37,21 +37,33 @@ export const startOlmOfflineChecker = (): void => {
// TODO: WE NEED TO MAKE SURE THIS WORKS WITH DISTRIBUTED NODES ALL DOING THE SAME THING // TODO: WE NEED TO MAKE SURE THIS WORKS WITH DISTRIBUTED NODES ALL DOING THE SAME THING
// Find clients that haven't pinged in the last 2 minutes and mark them as offline // Find clients that haven't pinged in the last 2 minutes and mark them as offline
const offlineClients = await db const staleClientRows = await db
.update(clients) .select({
.set({ online: false }) clientId: clients.clientId,
olmId: clients.olmId,
lastPing: clientPing.lastPing
})
.from(clients)
.leftJoin(clientPing, eq(clientPing.clientId, clients.clientId))
.where( .where(
and( and(
eq(clients.online, true), eq(clients.online, true),
or( or(
lt(clients.lastPing, twoMinutesAgo), lt(clientPing.lastPing, twoMinutesAgo),
isNull(clients.lastPing) isNull(clientPing.lastPing)
) )
) )
) );
.returning();
for (const offlineClient of offlineClients) { if (staleClientRows.length > 0) {
const staleClientIds = staleClientRows.map((c) => c.clientId);
await db
.update(clients)
.set({ online: false })
.where(inArray(clients.clientId, staleClientIds));
}
for (const offlineClient of staleClientRows) {
logger.info( logger.info(
`Kicking offline olm client ${offlineClient.clientId} due to inactivity` `Kicking offline olm client ${offlineClient.clientId} due to inactivity`
); );

View File

@@ -1,7 +1,7 @@
import { NextFunction, Request, Response } from "express"; import { NextFunction, Request, Response } from "express";
import { z } from "zod"; import { z } from "zod";
import { db, sites } from "@server/db"; import { db, sites, siteBandwidth } from "@server/db";
import { eq } from "drizzle-orm"; import { eq, inArray } from "drizzle-orm";
import response from "@server/lib/response"; import response from "@server/lib/response";
import HttpCode from "@server/types/HttpCode"; import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors"; import createHttpError from "http-errors";
@@ -60,12 +60,17 @@ export async function resetOrgBandwidth(
} }
await db await db
.update(sites) .update(siteBandwidth)
.set({ .set({
megabytesIn: 0, megabytesIn: 0,
megabytesOut: 0 megabytesOut: 0
}) })
.where(eq(sites.orgId, orgId)); .where(
inArray(
siteBandwidth.siteId,
db.select({ siteId: sites.siteId }).from(sites).where(eq(sites.orgId, orgId))
)
);
return response(res, { return response(res, {
data: {}, data: {},

View File

@@ -6,6 +6,7 @@ import {
remoteExitNodes, remoteExitNodes,
roleSites, roleSites,
sites, sites,
siteBandwidth,
userSites userSites
} from "@server/db"; } from "@server/db";
import cache from "#dynamic/lib/cache"; import cache from "#dynamic/lib/cache";
@@ -155,8 +156,8 @@ function querySitesBase() {
name: sites.name, name: sites.name,
pubKey: sites.pubKey, pubKey: sites.pubKey,
subnet: sites.subnet, subnet: sites.subnet,
megabytesIn: sites.megabytesIn, megabytesIn: siteBandwidth.megabytesIn,
megabytesOut: sites.megabytesOut, megabytesOut: siteBandwidth.megabytesOut,
orgName: orgs.name, orgName: orgs.name,
type: sites.type, type: sites.type,
online: sites.online, online: sites.online,
@@ -175,7 +176,8 @@ function querySitesBase() {
.leftJoin( .leftJoin(
remoteExitNodes, remoteExitNodes,
eq(remoteExitNodes.exitNodeId, sites.exitNodeId) eq(remoteExitNodes.exitNodeId, sites.exitNodeId)
); )
.leftJoin(siteBandwidth, eq(siteBandwidth.siteId, sites.siteId));
} }
type SiteWithUpdateAvailable = Awaited<ReturnType<typeof querySitesBase>>[0] & { type SiteWithUpdateAvailable = Awaited<ReturnType<typeof querySitesBase>>[0] & {
@@ -299,9 +301,15 @@ export async function listSites(
.offset(pageSize * (page - 1)) .offset(pageSize * (page - 1))
.orderBy( .orderBy(
sort_by sort_by
? order === "asc" ? (() => {
? asc(sites[sort_by]) const field =
: desc(sites[sort_by]) sort_by === "megabytesIn"
? siteBandwidth.megabytesIn
: sort_by === "megabytesOut"
? siteBandwidth.megabytesOut
: sites.name;
return order === "asc" ? asc(field) : desc(field);
})()
: asc(sites.name) : asc(sites.name)
); );

View File

@@ -22,6 +22,7 @@ import m13 from "./scriptsPg/1.15.3";
import m14 from "./scriptsPg/1.15.4"; import m14 from "./scriptsPg/1.15.4";
import m15 from "./scriptsPg/1.16.0"; import m15 from "./scriptsPg/1.16.0";
import m16 from "./scriptsPg/1.17.0"; import m16 from "./scriptsPg/1.17.0";
import m17 from "./scriptsPg/1.18.0";
// THIS CANNOT IMPORT ANYTHING FROM THE SERVER // THIS CANNOT IMPORT ANYTHING FROM THE SERVER
// EXCEPT FOR THE DATABASE AND THE SCHEMA // EXCEPT FOR THE DATABASE AND THE SCHEMA
@@ -43,7 +44,8 @@ const migrations = [
{ version: "1.15.3", run: m13 }, { version: "1.15.3", run: m13 },
{ version: "1.15.4", run: m14 }, { version: "1.15.4", run: m14 },
{ version: "1.16.0", run: m15 }, { version: "1.16.0", run: m15 },
{ version: "1.17.0", run: m16 } { version: "1.17.0", run: m16 },
{ version: "1.18.0", run: m17 }
// Add new migrations here as they are created // Add new migrations here as they are created
] as { ] as {
version: string; version: string;

View File

@@ -40,6 +40,7 @@ import m34 from "./scriptsSqlite/1.15.3";
import m35 from "./scriptsSqlite/1.15.4"; import m35 from "./scriptsSqlite/1.15.4";
import m36 from "./scriptsSqlite/1.16.0"; import m36 from "./scriptsSqlite/1.16.0";
import m37 from "./scriptsSqlite/1.17.0"; import m37 from "./scriptsSqlite/1.17.0";
import m38 from "./scriptsSqlite/1.18.0";
// THIS CANNOT IMPORT ANYTHING FROM THE SERVER // THIS CANNOT IMPORT ANYTHING FROM THE SERVER
// EXCEPT FOR THE DATABASE AND THE SCHEMA // EXCEPT FOR THE DATABASE AND THE SCHEMA
@@ -77,7 +78,8 @@ const migrations = [
{ version: "1.15.3", run: m34 }, { version: "1.15.3", run: m34 },
{ version: "1.15.4", run: m35 }, { version: "1.15.4", run: m35 },
{ version: "1.16.0", run: m36 }, { version: "1.16.0", run: m36 },
{ version: "1.17.0", run: m37 } { version: "1.17.0", run: m37 },
{ version: "1.18.0", run: m38 }
// Add new migrations here as they are created // Add new migrations here as they are created
] as const; ] as const;