Compare commits

..

7 Commits

Author SHA1 Message Date
Milo Schwartz
02dfeed3ce Update README.md 2026-04-13 13:03:53 -04:00
Owen Schwartz
3436105bec Merge pull request #2784 from fosrl/dev
Try to prevent deadlocks
2026-04-03 23:01:09 -04:00
Owen Schwartz
4b3375ab8e Merge pull request #2783 from fosrl/dev
Fix 1.17.0
2026-04-03 22:42:03 -04:00
Owen Schwartz
6ce165bfd5 Merge pull request #2780 from fosrl/dev
1.17.0
2026-04-03 18:19:40 -04:00
Owen Schwartz
035644eaf7 Merge pull request #2778 from fosrl/dev
1.17.0-s.2
2026-04-03 12:35:03 -04:00
Owen Schwartz
16e7233a3e Merge pull request #2777 from fosrl/dev
1.17.0-s.1
2026-04-03 12:19:23 -04:00
Owen Schwartz
1f74e1b320 Merge pull request #2776 from fosrl/dev
1.17.0-s.0
2026-04-03 11:39:35 -04:00
17 changed files with 136 additions and 300 deletions

1
.github/CODEOWNERS vendored
View File

@@ -1 +0,0 @@
* @oschwartz10612 @miloschwartz

View File

@@ -35,19 +35,13 @@
</div> </div>
<p align="center">
<a href="https://docs.pangolin.net/careers/join-us">
<img src="https://img.shields.io/badge/🚀_We're_Hiring!-Join_Our_Team-brightgreen?style=for-the-badge" alt="We're Hiring!" />
</a>
</p>
<p align="center"> <p align="center">
<strong> <strong>
Get started with Pangolin at <a href="https://app.pangolin.net/auth/signup">app.pangolin.net</a> Get started with Pangolin at <a href="https://app.pangolin.net/auth/signup">app.pangolin.net</a>
</strong> </strong>
</p> </p>
Pangolin is an open-source, identity-based remote access platform built on WireGuard that enables secure, seamless connectivity to private and public resources. Pangolin combines reverse proxy and VPN capabilities into one platform, providing browser-based access to web applications and client-based access to any private resources, all with zero-trust security and granular access control. Pangolin is an open-source, identity-based remote access platform built on WireGuard that enables secure, seamless connectivity to private and public resources. Pangolin combines reverse proxy and VPN capabilities into one platform, providing browser-based access to web applications and client-based access to any private resources with NAT traversal, all with granular access controls.
## Installation ## Installation
@@ -60,16 +54,16 @@ Pangolin is an open-source, identity-based remote access platform built on WireG
| <img width=500 /> | Description | | <img width=500 /> | Description |
|-----------------|--------------| |-----------------|--------------|
| **Pangolin Cloud** | Fully managed service with instant setup and pay-as-you-go pricing - no infrastructure required. Or, self-host your own [remote node](https://docs.pangolin.net/manage/remote-node/understanding-nodes) and connect to our control plane. | | **Pangolin Cloud** | Fully managed service - no infrastructure required. |
| **Self-Host: Community Edition** | Free, open source, and licensed under AGPL-3. | | **Self-Host: Community Edition** | Free, open source, and licensed under AGPL-3. |
| **Self-Host: Enterprise Edition** | Licensed under Fossorial Commercial License. Free for personal and hobbyist use, and for businesses earning under \$100K USD annually. | | **Self-Host: Enterprise Edition** | Licensed under Fossorial Commercial License. Free for personal and hobbyist use, and for businesses making less than \$100K USD gross annual revenue. |
## Key Features ## Key Features
| <img width=500 /> | <img width=500 /> | | <img width=500 /> | <img width=500 /> |
|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------| |----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------|
| **Connect remote networks with sites**<br /><br />Pangolin's lightweight site connectors create secure tunnels from remote networks without requiring public IP addresses or open ports. Sites make any network anywhere available for authorized access. | <img src="public/screenshots/sites.png" width=500 /><tr></tr> | | **Connect remote networks with sites**<br /><br />Pangolin's site connectors create secure tunnels from remote networks without requiring public IP addresses or open ports. Sites make any network anywhere available for authorized access. | <img src="public/screenshots/sites.png" width=500 /><tr></tr> |
| **Browser-based reverse proxy access**<br /><br />Expose web applications through identity and context-aware tunneled reverse proxies. Pangolin handles routing, load balancing, health checking, and automatic SSL certificates without exposing your network directly to the internet. Users access applications through any web browser with authentication and granular access control. | <img src="public/clip.gif" width=500 /><tr></tr> | | **Browser-based reverse proxy access**<br /><br />Expose web applications through identity and context-aware tunneled reverse proxies. Users access applications through any web browser with authentication and granular access control. Pangolin handles routing, load balancing, health checking, and automatic SSL certificates without exposing your network directly to the internet. | <img src="public/clip.gif" width=500 /><tr></tr> |
| **Client-based private resource access**<br /><br />Access private resources like SSH servers, databases, RDP, and entire network ranges through Pangolin clients. Intelligent NAT traversal enables connections even through restrictive firewalls, while DNS aliases provide friendly names and fast connections to resources across all your sites. | <img src="public/screenshots/private-resources.png" width=500 /><tr></tr> | | **Client-based private resource access**<br /><br />Access private resources like SSH servers, databases, RDP, and entire network ranges through Pangolin clients. Intelligent NAT traversal enables connections even through restrictive firewalls, while DNS aliases provide friendly names and fast connections to resources across all your sites. | <img src="public/screenshots/private-resources.png" width=500 /><tr></tr> |
| **Zero-trust granular access**<br /><br />Grant users access to specific resources, not entire networks. Unlike traditional VPNs that expose full network access, Pangolin's zero-trust model ensures users can only reach the applications and services you explicitly define, reducing security risk and attack surface. | <img src="public/screenshots/user-devices.png" width=500 /><tr></tr> | | **Zero-trust granular access**<br /><br />Grant users access to specific resources, not entire networks. Unlike traditional VPNs that expose full network access, Pangolin's zero-trust model ensures users can only reach the applications and services you explicitly define, reducing security risk and attack surface. | <img src="public/screenshots/user-devices.png" width=500 /><tr></tr> |
@@ -87,7 +81,7 @@ Download the Pangolin client for your platform:
### Sign up now ### Sign up now
Create an account at [app.pangolin.net](https://app.pangolin.net) to get started with Pangolin Cloud. A generous free tier is available. Create a free account at [app.pangolin.net](https://app.pangolin.net) to get started with Pangolin Cloud.
### Check out the docs ### Check out the docs
@@ -102,7 +96,3 @@ Pangolin is dual licensed under the AGPL-3 and the [Fossorial Commercial License
## Contributions ## Contributions
Please see [CONTRIBUTING](./CONTRIBUTING.md) in the repository for guidelines and best practices. Please see [CONTRIBUTING](./CONTRIBUTING.md) in the repository for guidelines and best practices.
---
WireGuard® is a registered trademark of Jason A. Donenfeld.

View File

@@ -89,8 +89,12 @@ export const sites = pgTable("sites", {
name: varchar("name").notNull(), name: varchar("name").notNull(),
pubKey: varchar("pubKey"), pubKey: varchar("pubKey"),
subnet: varchar("subnet"), subnet: varchar("subnet"),
megabytesIn: real("bytesIn").default(0),
megabytesOut: real("bytesOut").default(0),
lastBandwidthUpdate: varchar("lastBandwidthUpdate"),
type: varchar("type").notNull(), // "newt" or "wireguard" type: varchar("type").notNull(), // "newt" or "wireguard"
online: boolean("online").notNull().default(false), online: boolean("online").notNull().default(false),
lastPing: integer("lastPing"),
address: varchar("address"), address: varchar("address"),
endpoint: varchar("endpoint"), endpoint: varchar("endpoint"),
publicKey: varchar("publicKey"), publicKey: varchar("publicKey"),
@@ -725,7 +729,10 @@ export const clients = pgTable("clients", {
name: varchar("name").notNull(), name: varchar("name").notNull(),
pubKey: varchar("pubKey"), pubKey: varchar("pubKey"),
subnet: varchar("subnet").notNull(), subnet: varchar("subnet").notNull(),
megabytesIn: real("bytesIn"),
megabytesOut: real("bytesOut"),
lastBandwidthUpdate: varchar("lastBandwidthUpdate"),
lastPing: integer("lastPing"),
type: varchar("type").notNull(), // "olm" type: varchar("type").notNull(), // "olm"
online: boolean("online").notNull().default(false), online: boolean("online").notNull().default(false),
// endpoint: varchar("endpoint"), // endpoint: varchar("endpoint"),
@@ -738,42 +745,6 @@ export const clients = pgTable("clients", {
>() >()
}); });
export const sitePing = pgTable("sitePing", {
siteId: integer("siteId")
.primaryKey()
.references(() => sites.siteId, { onDelete: "cascade" })
.notNull(),
lastPing: integer("lastPing")
});
export const siteBandwidth = pgTable("siteBandwidth", {
siteId: integer("siteId")
.primaryKey()
.references(() => sites.siteId, { onDelete: "cascade" })
.notNull(),
megabytesIn: real("bytesIn").default(0),
megabytesOut: real("bytesOut").default(0),
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
});
export const clientPing = pgTable("clientPing", {
clientId: integer("clientId")
.primaryKey()
.references(() => clients.clientId, { onDelete: "cascade" })
.notNull(),
lastPing: integer("lastPing")
});
export const clientBandwidth = pgTable("clientBandwidth", {
clientId: integer("clientId")
.primaryKey()
.references(() => clients.clientId, { onDelete: "cascade" })
.notNull(),
megabytesIn: real("bytesIn"),
megabytesOut: real("bytesOut"),
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
});
export const clientSitesAssociationsCache = pgTable( export const clientSitesAssociationsCache = pgTable(
"clientSitesAssociationsCache", "clientSitesAssociationsCache",
{ {
@@ -1135,7 +1106,3 @@ export type RequestAuditLog = InferSelectModel<typeof requestAuditLog>;
export type RoundTripMessageTracker = InferSelectModel< export type RoundTripMessageTracker = InferSelectModel<
typeof roundTripMessageTracker typeof roundTripMessageTracker
>; >;
export type SitePing = typeof sitePing.$inferSelect;
export type SiteBandwidth = typeof siteBandwidth.$inferSelect;
export type ClientPing = typeof clientPing.$inferSelect;
export type ClientBandwidth = typeof clientBandwidth.$inferSelect;

View File

@@ -95,8 +95,12 @@ export const sites = sqliteTable("sites", {
name: text("name").notNull(), name: text("name").notNull(),
pubKey: text("pubKey"), pubKey: text("pubKey"),
subnet: text("subnet"), subnet: text("subnet"),
megabytesIn: integer("bytesIn").default(0),
megabytesOut: integer("bytesOut").default(0),
lastBandwidthUpdate: text("lastBandwidthUpdate"),
type: text("type").notNull(), // "newt" or "wireguard" type: text("type").notNull(), // "newt" or "wireguard"
online: integer("online", { mode: "boolean" }).notNull().default(false), online: integer("online", { mode: "boolean" }).notNull().default(false),
lastPing: integer("lastPing"),
// exit node stuff that is how to connect to the site when it has a wg server // exit node stuff that is how to connect to the site when it has a wg server
address: text("address"), // this is the address of the wireguard interface in newt address: text("address"), // this is the address of the wireguard interface in newt
@@ -395,7 +399,10 @@ export const clients = sqliteTable("clients", {
pubKey: text("pubKey"), pubKey: text("pubKey"),
olmId: text("olmId"), // to lock it to a specific olm optionally olmId: text("olmId"), // to lock it to a specific olm optionally
subnet: text("subnet").notNull(), subnet: text("subnet").notNull(),
megabytesIn: integer("bytesIn"),
megabytesOut: integer("bytesOut"),
lastBandwidthUpdate: text("lastBandwidthUpdate"),
lastPing: integer("lastPing"),
type: text("type").notNull(), // "olm" type: text("type").notNull(), // "olm"
online: integer("online", { mode: "boolean" }).notNull().default(false), online: integer("online", { mode: "boolean" }).notNull().default(false),
// endpoint: text("endpoint"), // endpoint: text("endpoint"),
@@ -407,42 +414,6 @@ export const clients = sqliteTable("clients", {
>() >()
}); });
export const sitePing = sqliteTable("sitePing", {
siteId: integer("siteId")
.primaryKey()
.references(() => sites.siteId, { onDelete: "cascade" })
.notNull(),
lastPing: integer("lastPing")
});
export const siteBandwidth = sqliteTable("siteBandwidth", {
siteId: integer("siteId")
.primaryKey()
.references(() => sites.siteId, { onDelete: "cascade" })
.notNull(),
megabytesIn: integer("bytesIn").default(0),
megabytesOut: integer("bytesOut").default(0),
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
});
export const clientPing = sqliteTable("clientPing", {
clientId: integer("clientId")
.primaryKey()
.references(() => clients.clientId, { onDelete: "cascade" })
.notNull(),
lastPing: integer("lastPing")
});
export const clientBandwidth = sqliteTable("clientBandwidth", {
clientId: integer("clientId")
.primaryKey()
.references(() => clients.clientId, { onDelete: "cascade" })
.notNull(),
megabytesIn: integer("bytesIn"),
megabytesOut: integer("bytesOut"),
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
});
export const clientSitesAssociationsCache = sqliteTable( export const clientSitesAssociationsCache = sqliteTable(
"clientSitesAssociationsCache", "clientSitesAssociationsCache",
{ {
@@ -1238,7 +1209,3 @@ export type DeviceWebAuthCode = InferSelectModel<typeof deviceWebAuthCodes>;
export type RoundTripMessageTracker = InferSelectModel< export type RoundTripMessageTracker = InferSelectModel<
typeof roundTripMessageTracker typeof roundTripMessageTracker
>; >;
export type SitePing = typeof sitePing.$inferSelect;
export type SiteBandwidth = typeof siteBandwidth.$inferSelect;
export type ClientPing = typeof clientPing.$inferSelect;
export type ClientBandwidth = typeof clientBandwidth.$inferSelect;

View File

@@ -3,7 +3,7 @@ import config from "./config";
import { getHostMeta } from "./hostMeta"; import { getHostMeta } from "./hostMeta";
import logger from "@server/logger"; import logger from "@server/logger";
import { apiKeys, db, roles, siteResources } from "@server/db"; import { apiKeys, db, roles, siteResources } from "@server/db";
import { sites, users, orgs, resources, clients, idp, siteBandwidth } from "@server/db"; import { sites, users, orgs, resources, clients, idp } from "@server/db";
import { eq, count, notInArray, and, isNotNull, isNull } from "drizzle-orm"; import { eq, count, notInArray, and, isNotNull, isNull } from "drizzle-orm";
import { APP_VERSION } from "./consts"; import { APP_VERSION } from "./consts";
import crypto from "crypto"; import crypto from "crypto";
@@ -150,13 +150,12 @@ class TelemetryClient {
const siteDetails = await db const siteDetails = await db
.select({ .select({
siteName: sites.name, siteName: sites.name,
megabytesIn: siteBandwidth.megabytesIn, megabytesIn: sites.megabytesIn,
megabytesOut: siteBandwidth.megabytesOut, megabytesOut: sites.megabytesOut,
type: sites.type, type: sites.type,
online: sites.online online: sites.online
}) })
.from(sites) .from(sites);
.leftJoin(siteBandwidth, eq(siteBandwidth.siteId, sites.siteId));
const supporterKey = config.getSupporterData(); const supporterKey = config.getSupporterData();

View File

@@ -18,11 +18,10 @@ import {
subscriptionItems, subscriptionItems,
usage, usage,
sites, sites,
siteBandwidth,
customers, customers,
orgs orgs
} from "@server/db"; } from "@server/db";
import { eq, and, inArray } from "drizzle-orm"; import { eq, and } from "drizzle-orm";
import logger from "@server/logger"; import logger from "@server/logger";
import { getFeatureIdByMetricId, getFeatureIdByPriceId } from "@server/lib/billing/features"; import { getFeatureIdByMetricId, getFeatureIdByPriceId } from "@server/lib/billing/features";
import stripe from "#private/lib/stripe"; import stripe from "#private/lib/stripe";
@@ -254,19 +253,14 @@ export async function handleSubscriptionUpdated(
); );
} }
// Also reset the site bandwidth to 0 // Also reset the sites to 0
await trx await trx
.update(siteBandwidth) .update(sites)
.set({ .set({
megabytesIn: 0, megabytesIn: 0,
megabytesOut: 0 megabytesOut: 0
}) })
.where( .where(eq(sites.orgId, orgId));
inArray(
siteBandwidth.siteId,
trx.select({ siteId: sites.siteId }).from(sites).where(eq(sites.orgId, orgId))
)
);
}); });
} }
} }

View File

@@ -1,5 +1,4 @@
import { import {
clientBandwidth,
clients, clients,
clientSitesAssociationsCache, clientSitesAssociationsCache,
currentFingerprint, currentFingerprint,
@@ -181,8 +180,8 @@ function queryClientsBase() {
name: clients.name, name: clients.name,
pubKey: clients.pubKey, pubKey: clients.pubKey,
subnet: clients.subnet, subnet: clients.subnet,
megabytesIn: clientBandwidth.megabytesIn, megabytesIn: clients.megabytesIn,
megabytesOut: clientBandwidth.megabytesOut, megabytesOut: clients.megabytesOut,
orgName: orgs.name, orgName: orgs.name,
type: clients.type, type: clients.type,
online: clients.online, online: clients.online,
@@ -201,8 +200,7 @@ function queryClientsBase() {
.leftJoin(orgs, eq(clients.orgId, orgs.orgId)) .leftJoin(orgs, eq(clients.orgId, orgs.orgId))
.leftJoin(olms, eq(clients.clientId, olms.clientId)) .leftJoin(olms, eq(clients.clientId, olms.clientId))
.leftJoin(users, eq(clients.userId, users.userId)) .leftJoin(users, eq(clients.userId, users.userId))
.leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId)) .leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId));
.leftJoin(clientBandwidth, eq(clientBandwidth.clientId, clients.clientId));
} }
async function getSiteAssociations(clientIds: number[]) { async function getSiteAssociations(clientIds: number[]) {
@@ -369,15 +367,9 @@ export async function listClients(
.offset(pageSize * (page - 1)) .offset(pageSize * (page - 1))
.orderBy( .orderBy(
sort_by sort_by
? (() => { ? order === "asc"
const field = ? asc(clients[sort_by])
sort_by === "megabytesIn" : desc(clients[sort_by])
? clientBandwidth.megabytesIn
: sort_by === "megabytesOut"
? clientBandwidth.megabytesOut
: clients.name;
return order === "asc" ? asc(field) : desc(field);
})()
: asc(clients.name) : asc(clients.name)
); );

View File

@@ -1,6 +1,5 @@
import { build } from "@server/build"; import { build } from "@server/build";
import { import {
clientBandwidth,
clients, clients,
currentFingerprint, currentFingerprint,
db, db,
@@ -212,8 +211,8 @@ function queryUserDevicesBase() {
name: clients.name, name: clients.name,
pubKey: clients.pubKey, pubKey: clients.pubKey,
subnet: clients.subnet, subnet: clients.subnet,
megabytesIn: clientBandwidth.megabytesIn, megabytesIn: clients.megabytesIn,
megabytesOut: clientBandwidth.megabytesOut, megabytesOut: clients.megabytesOut,
orgName: orgs.name, orgName: orgs.name,
type: clients.type, type: clients.type,
online: clients.online, online: clients.online,
@@ -240,8 +239,7 @@ function queryUserDevicesBase() {
.leftJoin(orgs, eq(clients.orgId, orgs.orgId)) .leftJoin(orgs, eq(clients.orgId, orgs.orgId))
.leftJoin(olms, eq(clients.clientId, olms.clientId)) .leftJoin(olms, eq(clients.clientId, olms.clientId))
.leftJoin(users, eq(clients.userId, users.userId)) .leftJoin(users, eq(clients.userId, users.userId))
.leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId)) .leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId));
.leftJoin(clientBandwidth, eq(clientBandwidth.clientId, clients.clientId));
} }
type OlmWithUpdateAvailable = Awaited< type OlmWithUpdateAvailable = Awaited<
@@ -429,15 +427,9 @@ export async function listUserDevices(
.offset(pageSize * (page - 1)) .offset(pageSize * (page - 1))
.orderBy( .orderBy(
sort_by sort_by
? (() => { ? order === "asc"
const field = ? asc(clients[sort_by])
sort_by === "megabytesIn" : desc(clients[sort_by])
? clientBandwidth.megabytesIn
: sort_by === "megabytesOut"
? clientBandwidth.megabytesOut
: clients.name;
return order === "asc" ? asc(field) : desc(field);
})()
: asc(clients.clientId) : asc(clients.clientId)
); );

View File

@@ -122,7 +122,7 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
const snapshot = accumulator; const snapshot = accumulator;
accumulator = new Map<string, AccumulatorEntry>(); accumulator = new Map<string, AccumulatorEntry>();
const currentEpoch = Math.floor(Date.now() / 1000); const currentTime = new Date().toISOString();
// Sort by publicKey for consistent lock ordering across concurrent // Sort by publicKey for consistent lock ordering across concurrent
// writers — deadlock-prevention strategy. // writers — deadlock-prevention strategy.
@@ -157,52 +157,33 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
orgId: string; orgId: string;
pubKey: string; pubKey: string;
}>(sql` }>(sql`
WITH upsert AS ( UPDATE sites
INSERT INTO "siteBandwidth" ("siteId", "bytesIn", "bytesOut", "lastBandwidthUpdate") SET
SELECT s."siteId", ${bytesIn}, ${bytesOut}, ${currentEpoch} "bytesOut" = COALESCE("bytesOut", 0) + ${bytesIn},
FROM "sites" s WHERE s."pubKey" = ${publicKey} "bytesIn" = COALESCE("bytesIn", 0) + ${bytesOut},
ON CONFLICT ("siteId") DO UPDATE SET "lastBandwidthUpdate" = ${currentTime}
"bytesIn" = COALESCE("siteBandwidth"."bytesIn", 0) + EXCLUDED."bytesIn", WHERE "pubKey" = ${publicKey}
"bytesOut" = COALESCE("siteBandwidth"."bytesOut", 0) + EXCLUDED."bytesOut", RETURNING "orgId", "pubKey"
"lastBandwidthUpdate" = EXCLUDED."lastBandwidthUpdate"
RETURNING "siteId"
)
SELECT u."siteId", s."orgId", s."pubKey"
FROM upsert u
INNER JOIN "sites" s ON s."siteId" = u."siteId"
`); `);
results.push(...result); results.push(...result);
} }
return results; return results;
} }
// PostgreSQL: batch UPSERT via CTE — single round-trip per chunk. // PostgreSQL: batch UPDATE … FROM (VALUES …) — single round-trip per chunk.
const valuesList = chunk.map(([publicKey, { bytesIn, bytesOut }]) => const valuesList = chunk.map(([publicKey, { bytesIn, bytesOut }]) =>
sql`(${publicKey}::text, ${bytesIn}::real, ${bytesOut}::real)` sql`(${publicKey}::text, ${bytesIn}::real, ${bytesOut}::real)`
); );
const valuesClause = sql.join(valuesList, sql`, `); const valuesClause = sql.join(valuesList, sql`, `);
return dbQueryRows<{ orgId: string; pubKey: string }>(sql` return dbQueryRows<{ orgId: string; pubKey: string }>(sql`
WITH vals(pub_key, bytes_in, bytes_out) AS ( UPDATE sites
VALUES ${valuesClause} SET
), "bytesOut" = COALESCE("bytesOut", 0) + v.bytes_in,
site_lookup AS ( "bytesIn" = COALESCE("bytesIn", 0) + v.bytes_out,
SELECT s."siteId", s."orgId", s."pubKey", v.bytes_in, v.bytes_out "lastBandwidthUpdate" = ${currentTime}
FROM vals v FROM (VALUES ${valuesClause}) AS v(pub_key, bytes_in, bytes_out)
INNER JOIN "sites" s ON s."pubKey" = v.pub_key WHERE sites."pubKey" = v.pub_key
), RETURNING sites."orgId" AS "orgId", sites."pubKey" AS "pubKey"
upsert AS (
INSERT INTO "siteBandwidth" ("siteId", "bytesIn", "bytesOut", "lastBandwidthUpdate")
SELECT sl."siteId", sl.bytes_in, sl.bytes_out, ${currentEpoch}::integer
FROM site_lookup sl
ON CONFLICT ("siteId") DO UPDATE SET
"bytesIn" = COALESCE("siteBandwidth"."bytesIn", 0) + EXCLUDED."bytesIn",
"bytesOut" = COALESCE("siteBandwidth"."bytesOut", 0) + EXCLUDED."bytesOut",
"lastBandwidthUpdate" = EXCLUDED."lastBandwidthUpdate"
RETURNING "siteId"
)
SELECT u."siteId", s."orgId", s."pubKey"
FROM upsert u
INNER JOIN "sites" s ON s."siteId" = u."siteId"
`); `);
}, `flush bandwidth chunk [${i}${chunkEnd}]`); }, `flush bandwidth chunk [${i}${chunkEnd}]`);
} catch (error) { } catch (error) {

View File

@@ -1,11 +1,11 @@
import { db, newts, sites, targetHealthCheck, targets, sitePing, siteBandwidth } from "@server/db"; import { db, newts, sites, targetHealthCheck, targets } from "@server/db";
import { import {
hasActiveConnections, hasActiveConnections,
getClientConfigVersion getClientConfigVersion
} from "#dynamic/routers/ws"; } from "#dynamic/routers/ws";
import { MessageHandler } from "@server/routers/ws"; import { MessageHandler } from "@server/routers/ws";
import { Newt } from "@server/db"; import { Newt } from "@server/db";
import { eq, lt, isNull, and, or, ne } from "drizzle-orm"; import { eq, lt, isNull, and, or, ne, not } from "drizzle-orm";
import logger from "@server/logger"; import logger from "@server/logger";
import { sendNewtSyncMessage } from "./sync"; import { sendNewtSyncMessage } from "./sync";
import { recordPing } from "./pingAccumulator"; import { recordPing } from "./pingAccumulator";
@@ -41,18 +41,17 @@ export const startNewtOfflineChecker = (): void => {
.select({ .select({
siteId: sites.siteId, siteId: sites.siteId,
newtId: newts.newtId, newtId: newts.newtId,
lastPing: sitePing.lastPing lastPing: sites.lastPing
}) })
.from(sites) .from(sites)
.innerJoin(newts, eq(newts.siteId, sites.siteId)) .innerJoin(newts, eq(newts.siteId, sites.siteId))
.leftJoin(sitePing, eq(sitePing.siteId, sites.siteId))
.where( .where(
and( and(
eq(sites.online, true), eq(sites.online, true),
eq(sites.type, "newt"), eq(sites.type, "newt"),
or( or(
lt(sitePing.lastPing, twoMinutesAgo), lt(sites.lastPing, twoMinutesAgo),
isNull(sitePing.lastPing) isNull(sites.lastPing)
) )
) )
); );
@@ -113,11 +112,15 @@ export const startNewtOfflineChecker = (): void => {
.select({ .select({
siteId: sites.siteId, siteId: sites.siteId,
online: sites.online, online: sites.online,
lastBandwidthUpdate: siteBandwidth.lastBandwidthUpdate lastBandwidthUpdate: sites.lastBandwidthUpdate
}) })
.from(sites) .from(sites)
.innerJoin(siteBandwidth, eq(siteBandwidth.siteId, sites.siteId)) .where(
.where(eq(sites.type, "wireguard")); and(
eq(sites.type, "wireguard"),
not(isNull(sites.lastBandwidthUpdate))
)
);
const wireguardOfflineThreshold = Math.floor( const wireguardOfflineThreshold = Math.floor(
(Date.now() - OFFLINE_THRESHOLD_BANDWIDTH_MS) / 1000 (Date.now() - OFFLINE_THRESHOLD_BANDWIDTH_MS) / 1000
@@ -125,7 +128,12 @@ export const startNewtOfflineChecker = (): void => {
// loop over each one. If its offline and there is a new update then mark it online. If its online and there is no update then mark it offline // loop over each one. If its offline and there is a new update then mark it online. If its online and there is no update then mark it offline
for (const site of allWireguardSites) { for (const site of allWireguardSites) {
if ((site.lastBandwidthUpdate ?? 0) < wireguardOfflineThreshold && site.online) { const lastBandwidthUpdate =
new Date(site.lastBandwidthUpdate!).getTime() / 1000;
if (
lastBandwidthUpdate < wireguardOfflineThreshold &&
site.online
) {
logger.info( logger.info(
`Marking wireguard site ${site.siteId} offline: no bandwidth update in over ${OFFLINE_THRESHOLD_BANDWIDTH_MS / 60000} minutes` `Marking wireguard site ${site.siteId} offline: no bandwidth update in over ${OFFLINE_THRESHOLD_BANDWIDTH_MS / 60000} minutes`
); );
@@ -134,7 +142,10 @@ export const startNewtOfflineChecker = (): void => {
.update(sites) .update(sites)
.set({ online: false }) .set({ online: false })
.where(eq(sites.siteId, site.siteId)); .where(eq(sites.siteId, site.siteId));
} else if ((site.lastBandwidthUpdate ?? 0) >= wireguardOfflineThreshold && !site.online) { } else if (
lastBandwidthUpdate >= wireguardOfflineThreshold &&
!site.online
) {
logger.info( logger.info(
`Marking wireguard site ${site.siteId} online: recent bandwidth update` `Marking wireguard site ${site.siteId} online: recent bandwidth update`
); );

View File

@@ -1,5 +1,6 @@
import { db, clients, clientBandwidth } from "@server/db"; import { db } from "@server/db";
import { MessageHandler } from "@server/routers/ws"; import { MessageHandler } from "@server/routers/ws";
import { clients } from "@server/db";
import { eq, sql } from "drizzle-orm"; import { eq, sql } from "drizzle-orm";
import logger from "@server/logger"; import logger from "@server/logger";
@@ -84,7 +85,7 @@ export async function flushBandwidthToDb(): Promise<void> {
const snapshot = accumulator; const snapshot = accumulator;
accumulator = new Map<string, BandwidthAccumulator>(); accumulator = new Map<string, BandwidthAccumulator>();
const currentEpoch = Math.floor(Date.now() / 1000); const currentTime = new Date().toISOString();
// Sort by publicKey for consistent lock ordering across concurrent // Sort by publicKey for consistent lock ordering across concurrent
// writers — this is the same deadlock-prevention strategy used in the // writers — this is the same deadlock-prevention strategy used in the
@@ -100,37 +101,19 @@ export async function flushBandwidthToDb(): Promise<void> {
for (const [publicKey, { bytesIn, bytesOut }] of sortedEntries) { for (const [publicKey, { bytesIn, bytesOut }] of sortedEntries) {
try { try {
await withDeadlockRetry(async () => { await withDeadlockRetry(async () => {
// Find clientId by pubKey // Use atomic SQL increment to avoid the SELECT-then-UPDATE
const [clientRow] = await db // anti-pattern and the races it would introduce.
.select({ clientId: clients.clientId })
.from(clients)
.where(eq(clients.pubKey, publicKey))
.limit(1);
if (!clientRow) {
logger.warn(`No client found for pubKey ${publicKey}, skipping`);
return;
}
await db await db
.insert(clientBandwidth) .update(clients)
.values({ .set({
clientId: clientRow.clientId,
// Note: bytesIn from peer goes to megabytesOut (data // Note: bytesIn from peer goes to megabytesOut (data
// sent to client) and bytesOut from peer goes to // sent to client) and bytesOut from peer goes to
// megabytesIn (data received from client). // megabytesIn (data received from client).
megabytesOut: bytesIn, megabytesOut: sql`COALESCE(${clients.megabytesOut}, 0) + ${bytesIn}`,
megabytesIn: bytesOut, megabytesIn: sql`COALESCE(${clients.megabytesIn}, 0) + ${bytesOut}`,
lastBandwidthUpdate: currentEpoch lastBandwidthUpdate: currentTime
}) })
.onConflictDoUpdate({ .where(eq(clients.pubKey, publicKey));
target: clientBandwidth.clientId,
set: {
megabytesOut: sql`COALESCE(${clientBandwidth.megabytesOut}, 0) + ${bytesIn}`,
megabytesIn: sql`COALESCE(${clientBandwidth.megabytesIn}, 0) + ${bytesOut}`,
lastBandwidthUpdate: currentEpoch
}
});
}, `flush bandwidth for client ${publicKey}`); }, `flush bandwidth for client ${publicKey}`);
} catch (error) { } catch (error) {
logger.error( logger.error(

View File

@@ -1,6 +1,6 @@
import { db } from "@server/db"; import { db } from "@server/db";
import { sites, clients, olms, sitePing, clientPing } from "@server/db"; import { sites, clients, olms } from "@server/db";
import { inArray, sql } from "drizzle-orm"; import { inArray } from "drizzle-orm";
import logger from "@server/logger"; import logger from "@server/logger";
/** /**
@@ -81,8 +81,11 @@ export function recordClientPing(
/** /**
* Flush all accumulated site pings to the database. * Flush all accumulated site pings to the database.
* *
* For each batch: first upserts individual per-site timestamps into * Each batch of up to BATCH_SIZE rows is written with a **single** UPDATE
* `sitePing`, then bulk-updates `sites.online = true`. * statement. We use the maximum timestamp across the batch so that `lastPing`
* reflects the most recent ping seen for any site in the group. This avoids
* the multi-statement transaction that previously created additional
* row-lock ordering hazards.
*/ */
async function flushSitePingsToDb(): Promise<void> { async function flushSitePingsToDb(): Promise<void> {
if (pendingSitePings.size === 0) { if (pendingSitePings.size === 0) {
@@ -100,25 +103,20 @@ async function flushSitePingsToDb(): Promise<void> {
for (let i = 0; i < entries.length; i += BATCH_SIZE) { for (let i = 0; i < entries.length; i += BATCH_SIZE) {
const batch = entries.slice(i, i + BATCH_SIZE); const batch = entries.slice(i, i + BATCH_SIZE);
// Use the latest timestamp in the batch so that `lastPing` always
// moves forward. Using a single timestamp for the whole batch means
// we only ever need one UPDATE statement (no transaction).
const maxTimestamp = Math.max(...batch.map(([, ts]) => ts));
const siteIds = batch.map(([id]) => id); const siteIds = batch.map(([id]) => id);
try { try {
await withRetry(async () => { await withRetry(async () => {
const rows = batch.map(([siteId, ts]) => ({ siteId, lastPing: ts }));
// Step 1: Upsert ping timestamps into sitePing
await db
.insert(sitePing)
.values(rows)
.onConflictDoUpdate({
target: sitePing.siteId,
set: { lastPing: sql`excluded."lastPing"` }
});
// Step 2: Update online status on sites
await db await db
.update(sites) .update(sites)
.set({ online: true }) .set({
online: true,
lastPing: maxTimestamp
})
.where(inArray(sites.siteId, siteIds)); .where(inArray(sites.siteId, siteIds));
}, "flushSitePingsToDb"); }, "flushSitePingsToDb");
} catch (error) { } catch (error) {
@@ -141,8 +139,7 @@ async function flushSitePingsToDb(): Promise<void> {
/** /**
* Flush all accumulated client (OLM) pings to the database. * Flush all accumulated client (OLM) pings to the database.
* *
* For each batch: first upserts individual per-client timestamps into * Same single-UPDATE-per-batch approach as `flushSitePingsToDb`.
* `clientPing`, then bulk-updates `clients.online = true, archived = false`.
*/ */
async function flushClientPingsToDb(): Promise<void> { async function flushClientPingsToDb(): Promise<void> {
if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) { if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) {
@@ -164,25 +161,18 @@ async function flushClientPingsToDb(): Promise<void> {
for (let i = 0; i < entries.length; i += BATCH_SIZE) { for (let i = 0; i < entries.length; i += BATCH_SIZE) {
const batch = entries.slice(i, i + BATCH_SIZE); const batch = entries.slice(i, i + BATCH_SIZE);
const maxTimestamp = Math.max(...batch.map(([, ts]) => ts));
const clientIds = batch.map(([id]) => id); const clientIds = batch.map(([id]) => id);
try { try {
await withRetry(async () => { await withRetry(async () => {
const rows = batch.map(([clientId, ts]) => ({ clientId, lastPing: ts }));
// Step 1: Upsert ping timestamps into clientPing
await db
.insert(clientPing)
.values(rows)
.onConflictDoUpdate({
target: clientPing.clientId,
set: { lastPing: sql`excluded."lastPing"` }
});
// Step 2: Update online + unarchive on clients
await db await db
.update(clients) .update(clients)
.set({ online: true, archived: false }) .set({
lastPing: maxTimestamp,
online: true,
archived: false
})
.where(inArray(clients.clientId, clientIds)); .where(inArray(clients.clientId, clientIds));
}, "flushClientPingsToDb"); }, "flushClientPingsToDb");
} catch (error) { } catch (error) {

View File

@@ -1,8 +1,8 @@
import { disconnectClient, getClientConfigVersion } from "#dynamic/routers/ws"; import { disconnectClient, getClientConfigVersion } from "#dynamic/routers/ws";
import { db } from "@server/db"; import { db } from "@server/db";
import { MessageHandler } from "@server/routers/ws"; import { MessageHandler } from "@server/routers/ws";
import { clients, olms, Olm, clientPing } from "@server/db"; import { clients, olms, Olm } from "@server/db";
import { eq, lt, isNull, and, or, inArray } from "drizzle-orm"; import { eq, lt, isNull, and, or } from "drizzle-orm";
import { recordClientPing } from "@server/routers/newt/pingAccumulator"; import { recordClientPing } from "@server/routers/newt/pingAccumulator";
import logger from "@server/logger"; import logger from "@server/logger";
import { validateSessionToken } from "@server/auth/sessions/app"; import { validateSessionToken } from "@server/auth/sessions/app";
@@ -37,33 +37,21 @@ export const startOlmOfflineChecker = (): void => {
// TODO: WE NEED TO MAKE SURE THIS WORKS WITH DISTRIBUTED NODES ALL DOING THE SAME THING // TODO: WE NEED TO MAKE SURE THIS WORKS WITH DISTRIBUTED NODES ALL DOING THE SAME THING
// Find clients that haven't pinged in the last 2 minutes and mark them as offline // Find clients that haven't pinged in the last 2 minutes and mark them as offline
const staleClientRows = await db const offlineClients = await db
.select({ .update(clients)
clientId: clients.clientId, .set({ online: false })
olmId: clients.olmId,
lastPing: clientPing.lastPing
})
.from(clients)
.leftJoin(clientPing, eq(clientPing.clientId, clients.clientId))
.where( .where(
and( and(
eq(clients.online, true), eq(clients.online, true),
or( or(
lt(clientPing.lastPing, twoMinutesAgo), lt(clients.lastPing, twoMinutesAgo),
isNull(clientPing.lastPing) isNull(clients.lastPing)
) )
) )
); )
.returning();
if (staleClientRows.length > 0) { for (const offlineClient of offlineClients) {
const staleClientIds = staleClientRows.map((c) => c.clientId);
await db
.update(clients)
.set({ online: false })
.where(inArray(clients.clientId, staleClientIds));
}
for (const offlineClient of staleClientRows) {
logger.info( logger.info(
`Kicking offline olm client ${offlineClient.clientId} due to inactivity` `Kicking offline olm client ${offlineClient.clientId} due to inactivity`
); );

View File

@@ -1,7 +1,7 @@
import { NextFunction, Request, Response } from "express"; import { NextFunction, Request, Response } from "express";
import { z } from "zod"; import { z } from "zod";
import { db, sites, siteBandwidth } from "@server/db"; import { db, sites } from "@server/db";
import { eq, inArray } from "drizzle-orm"; import { eq } from "drizzle-orm";
import response from "@server/lib/response"; import response from "@server/lib/response";
import HttpCode from "@server/types/HttpCode"; import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors"; import createHttpError from "http-errors";
@@ -60,17 +60,12 @@ export async function resetOrgBandwidth(
} }
await db await db
.update(siteBandwidth) .update(sites)
.set({ .set({
megabytesIn: 0, megabytesIn: 0,
megabytesOut: 0 megabytesOut: 0
}) })
.where( .where(eq(sites.orgId, orgId));
inArray(
siteBandwidth.siteId,
db.select({ siteId: sites.siteId }).from(sites).where(eq(sites.orgId, orgId))
)
);
return response(res, { return response(res, {
data: {}, data: {},

View File

@@ -6,7 +6,6 @@ import {
remoteExitNodes, remoteExitNodes,
roleSites, roleSites,
sites, sites,
siteBandwidth,
userSites userSites
} from "@server/db"; } from "@server/db";
import cache from "#dynamic/lib/cache"; import cache from "#dynamic/lib/cache";
@@ -156,8 +155,8 @@ function querySitesBase() {
name: sites.name, name: sites.name,
pubKey: sites.pubKey, pubKey: sites.pubKey,
subnet: sites.subnet, subnet: sites.subnet,
megabytesIn: siteBandwidth.megabytesIn, megabytesIn: sites.megabytesIn,
megabytesOut: siteBandwidth.megabytesOut, megabytesOut: sites.megabytesOut,
orgName: orgs.name, orgName: orgs.name,
type: sites.type, type: sites.type,
online: sites.online, online: sites.online,
@@ -176,8 +175,7 @@ function querySitesBase() {
.leftJoin( .leftJoin(
remoteExitNodes, remoteExitNodes,
eq(remoteExitNodes.exitNodeId, sites.exitNodeId) eq(remoteExitNodes.exitNodeId, sites.exitNodeId)
) );
.leftJoin(siteBandwidth, eq(siteBandwidth.siteId, sites.siteId));
} }
type SiteWithUpdateAvailable = Awaited<ReturnType<typeof querySitesBase>>[0] & { type SiteWithUpdateAvailable = Awaited<ReturnType<typeof querySitesBase>>[0] & {
@@ -301,15 +299,9 @@ export async function listSites(
.offset(pageSize * (page - 1)) .offset(pageSize * (page - 1))
.orderBy( .orderBy(
sort_by sort_by
? (() => { ? order === "asc"
const field = ? asc(sites[sort_by])
sort_by === "megabytesIn" : desc(sites[sort_by])
? siteBandwidth.megabytesIn
: sort_by === "megabytesOut"
? siteBandwidth.megabytesOut
: sites.name;
return order === "asc" ? asc(field) : desc(field);
})()
: asc(sites.name) : asc(sites.name)
); );

View File

@@ -22,7 +22,6 @@ import m13 from "./scriptsPg/1.15.3";
import m14 from "./scriptsPg/1.15.4"; import m14 from "./scriptsPg/1.15.4";
import m15 from "./scriptsPg/1.16.0"; import m15 from "./scriptsPg/1.16.0";
import m16 from "./scriptsPg/1.17.0"; import m16 from "./scriptsPg/1.17.0";
import m17 from "./scriptsPg/1.18.0";
// THIS CANNOT IMPORT ANYTHING FROM THE SERVER // THIS CANNOT IMPORT ANYTHING FROM THE SERVER
// EXCEPT FOR THE DATABASE AND THE SCHEMA // EXCEPT FOR THE DATABASE AND THE SCHEMA
@@ -44,8 +43,7 @@ const migrations = [
{ version: "1.15.3", run: m13 }, { version: "1.15.3", run: m13 },
{ version: "1.15.4", run: m14 }, { version: "1.15.4", run: m14 },
{ version: "1.16.0", run: m15 }, { version: "1.16.0", run: m15 },
{ version: "1.17.0", run: m16 }, { version: "1.17.0", run: m16 }
{ version: "1.18.0", run: m17 }
// Add new migrations here as they are created // Add new migrations here as they are created
] as { ] as {
version: string; version: string;

View File

@@ -40,7 +40,6 @@ import m34 from "./scriptsSqlite/1.15.3";
import m35 from "./scriptsSqlite/1.15.4"; import m35 from "./scriptsSqlite/1.15.4";
import m36 from "./scriptsSqlite/1.16.0"; import m36 from "./scriptsSqlite/1.16.0";
import m37 from "./scriptsSqlite/1.17.0"; import m37 from "./scriptsSqlite/1.17.0";
import m38 from "./scriptsSqlite/1.18.0";
// THIS CANNOT IMPORT ANYTHING FROM THE SERVER // THIS CANNOT IMPORT ANYTHING FROM THE SERVER
// EXCEPT FOR THE DATABASE AND THE SCHEMA // EXCEPT FOR THE DATABASE AND THE SCHEMA
@@ -78,8 +77,7 @@ const migrations = [
{ version: "1.15.3", run: m34 }, { version: "1.15.3", run: m34 },
{ version: "1.15.4", run: m35 }, { version: "1.15.4", run: m35 },
{ version: "1.16.0", run: m36 }, { version: "1.16.0", run: m36 },
{ version: "1.17.0", run: m37 }, { version: "1.17.0", run: m37 }
{ version: "1.18.0", run: m38 }
// Add new migrations here as they are created // Add new migrations here as they are created
] as const; ] as const;