Compare commits

..

7 Commits

Author SHA1 Message Date
Milo Schwartz
02dfeed3ce Update README.md 2026-04-13 13:03:53 -04:00
Owen Schwartz
3436105bec Merge pull request #2784 from fosrl/dev
Try to prevent deadlocks
2026-04-03 23:01:09 -04:00
Owen Schwartz
4b3375ab8e Merge pull request #2783 from fosrl/dev
Fix 1.17.0
2026-04-03 22:42:03 -04:00
Owen Schwartz
6ce165bfd5 Merge pull request #2780 from fosrl/dev
1.17.0
2026-04-03 18:19:40 -04:00
Owen Schwartz
035644eaf7 Merge pull request #2778 from fosrl/dev
1.17.0-s.2
2026-04-03 12:35:03 -04:00
Owen Schwartz
16e7233a3e Merge pull request #2777 from fosrl/dev
1.17.0-s.1
2026-04-03 12:19:23 -04:00
Owen Schwartz
1f74e1b320 Merge pull request #2776 from fosrl/dev
1.17.0-s.0
2026-04-03 11:39:35 -04:00
17 changed files with 136 additions and 300 deletions

1
.github/CODEOWNERS vendored
View File

@@ -1 +0,0 @@
* @oschwartz10612 @miloschwartz

View File

@@ -35,19 +35,13 @@
</div>
<p align="center">
<a href="https://docs.pangolin.net/careers/join-us">
<img src="https://img.shields.io/badge/🚀_We're_Hiring!-Join_Our_Team-brightgreen?style=for-the-badge" alt="We're Hiring!" />
</a>
</p>
<p align="center">
<strong>
Get started with Pangolin at <a href="https://app.pangolin.net/auth/signup">app.pangolin.net</a>
</strong>
</p>
Pangolin is an open-source, identity-based remote access platform built on WireGuard that enables secure, seamless connectivity to private and public resources. Pangolin combines reverse proxy and VPN capabilities into one platform, providing browser-based access to web applications and client-based access to any private resources, all with zero-trust security and granular access control.
Pangolin is an open-source, identity-based remote access platform built on WireGuard that enables secure, seamless connectivity to private and public resources. Pangolin combines reverse proxy and VPN capabilities into one platform, providing browser-based access to web applications and client-based access to any private resources with NAT traversal, all with granular access controls.
## Installation
@@ -60,16 +54,16 @@ Pangolin is an open-source, identity-based remote access platform built on WireG
| <img width=500 /> | Description |
|-----------------|--------------|
| **Pangolin Cloud** | Fully managed service with instant setup and pay-as-you-go pricing - no infrastructure required. Or, self-host your own [remote node](https://docs.pangolin.net/manage/remote-node/understanding-nodes) and connect to our control plane. |
| **Pangolin Cloud** | Fully managed service - no infrastructure required. |
| **Self-Host: Community Edition** | Free, open source, and licensed under AGPL-3. |
| **Self-Host: Enterprise Edition** | Licensed under Fossorial Commercial License. Free for personal and hobbyist use, and for businesses earning under \$100K USD annually. |
| **Self-Host: Enterprise Edition** | Licensed under Fossorial Commercial License. Free for personal and hobbyist use, and for businesses making less than \$100K USD gross annual revenue. |
## Key Features
| <img width=500 /> | <img width=500 /> |
|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------|
| **Connect remote networks with sites**<br /><br />Pangolin's lightweight site connectors create secure tunnels from remote networks without requiring public IP addresses or open ports. Sites make any network anywhere available for authorized access. | <img src="public/screenshots/sites.png" width=500 /><tr></tr> |
| **Browser-based reverse proxy access**<br /><br />Expose web applications through identity and context-aware tunneled reverse proxies. Pangolin handles routing, load balancing, health checking, and automatic SSL certificates without exposing your network directly to the internet. Users access applications through any web browser with authentication and granular access control. | <img src="public/clip.gif" width=500 /><tr></tr> |
| **Connect remote networks with sites**<br /><br />Pangolin's site connectors create secure tunnels from remote networks without requiring public IP addresses or open ports. Sites make any network anywhere available for authorized access. | <img src="public/screenshots/sites.png" width=500 /><tr></tr> |
| **Browser-based reverse proxy access**<br /><br />Expose web applications through identity and context-aware tunneled reverse proxies. Users access applications through any web browser with authentication and granular access control. Pangolin handles routing, load balancing, health checking, and automatic SSL certificates without exposing your network directly to the internet. | <img src="public/clip.gif" width=500 /><tr></tr> |
| **Client-based private resource access**<br /><br />Access private resources like SSH servers, databases, RDP, and entire network ranges through Pangolin clients. Intelligent NAT traversal enables connections even through restrictive firewalls, while DNS aliases provide friendly names and fast connections to resources across all your sites. | <img src="public/screenshots/private-resources.png" width=500 /><tr></tr> |
| **Zero-trust granular access**<br /><br />Grant users access to specific resources, not entire networks. Unlike traditional VPNs that expose full network access, Pangolin's zero-trust model ensures users can only reach the applications and services you explicitly define, reducing security risk and attack surface. | <img src="public/screenshots/user-devices.png" width=500 /><tr></tr> |
@@ -87,7 +81,7 @@ Download the Pangolin client for your platform:
### Sign up now
Create an account at [app.pangolin.net](https://app.pangolin.net) to get started with Pangolin Cloud. A generous free tier is available.
Create a free account at [app.pangolin.net](https://app.pangolin.net) to get started with Pangolin Cloud.
### Check out the docs
@@ -102,7 +96,3 @@ Pangolin is dual licensed under the AGPL-3 and the [Fossorial Commercial License
## Contributions
Please see [CONTRIBUTING](./CONTRIBUTING.md) in the repository for guidelines and best practices.
---
WireGuard® is a registered trademark of Jason A. Donenfeld.

View File

@@ -89,8 +89,12 @@ export const sites = pgTable("sites", {
name: varchar("name").notNull(),
pubKey: varchar("pubKey"),
subnet: varchar("subnet"),
megabytesIn: real("bytesIn").default(0),
megabytesOut: real("bytesOut").default(0),
lastBandwidthUpdate: varchar("lastBandwidthUpdate"),
type: varchar("type").notNull(), // "newt" or "wireguard"
online: boolean("online").notNull().default(false),
lastPing: integer("lastPing"),
address: varchar("address"),
endpoint: varchar("endpoint"),
publicKey: varchar("publicKey"),
@@ -725,7 +729,10 @@ export const clients = pgTable("clients", {
name: varchar("name").notNull(),
pubKey: varchar("pubKey"),
subnet: varchar("subnet").notNull(),
megabytesIn: real("bytesIn"),
megabytesOut: real("bytesOut"),
lastBandwidthUpdate: varchar("lastBandwidthUpdate"),
lastPing: integer("lastPing"),
type: varchar("type").notNull(), // "olm"
online: boolean("online").notNull().default(false),
// endpoint: varchar("endpoint"),
@@ -738,42 +745,6 @@ export const clients = pgTable("clients", {
>()
});
export const sitePing = pgTable("sitePing", {
siteId: integer("siteId")
.primaryKey()
.references(() => sites.siteId, { onDelete: "cascade" })
.notNull(),
lastPing: integer("lastPing")
});
export const siteBandwidth = pgTable("siteBandwidth", {
siteId: integer("siteId")
.primaryKey()
.references(() => sites.siteId, { onDelete: "cascade" })
.notNull(),
megabytesIn: real("bytesIn").default(0),
megabytesOut: real("bytesOut").default(0),
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
});
export const clientPing = pgTable("clientPing", {
clientId: integer("clientId")
.primaryKey()
.references(() => clients.clientId, { onDelete: "cascade" })
.notNull(),
lastPing: integer("lastPing")
});
export const clientBandwidth = pgTable("clientBandwidth", {
clientId: integer("clientId")
.primaryKey()
.references(() => clients.clientId, { onDelete: "cascade" })
.notNull(),
megabytesIn: real("bytesIn"),
megabytesOut: real("bytesOut"),
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
});
export const clientSitesAssociationsCache = pgTable(
"clientSitesAssociationsCache",
{
@@ -1135,7 +1106,3 @@ export type RequestAuditLog = InferSelectModel<typeof requestAuditLog>;
export type RoundTripMessageTracker = InferSelectModel<
typeof roundTripMessageTracker
>;
export type SitePing = typeof sitePing.$inferSelect;
export type SiteBandwidth = typeof siteBandwidth.$inferSelect;
export type ClientPing = typeof clientPing.$inferSelect;
export type ClientBandwidth = typeof clientBandwidth.$inferSelect;

View File

@@ -95,8 +95,12 @@ export const sites = sqliteTable("sites", {
name: text("name").notNull(),
pubKey: text("pubKey"),
subnet: text("subnet"),
megabytesIn: integer("bytesIn").default(0),
megabytesOut: integer("bytesOut").default(0),
lastBandwidthUpdate: text("lastBandwidthUpdate"),
type: text("type").notNull(), // "newt" or "wireguard"
online: integer("online", { mode: "boolean" }).notNull().default(false),
lastPing: integer("lastPing"),
// exit node stuff that is how to connect to the site when it has a wg server
address: text("address"), // this is the address of the wireguard interface in newt
@@ -395,7 +399,10 @@ export const clients = sqliteTable("clients", {
pubKey: text("pubKey"),
olmId: text("olmId"), // to lock it to a specific olm optionally
subnet: text("subnet").notNull(),
megabytesIn: integer("bytesIn"),
megabytesOut: integer("bytesOut"),
lastBandwidthUpdate: text("lastBandwidthUpdate"),
lastPing: integer("lastPing"),
type: text("type").notNull(), // "olm"
online: integer("online", { mode: "boolean" }).notNull().default(false),
// endpoint: text("endpoint"),
@@ -407,42 +414,6 @@ export const clients = sqliteTable("clients", {
>()
});
export const sitePing = sqliteTable("sitePing", {
siteId: integer("siteId")
.primaryKey()
.references(() => sites.siteId, { onDelete: "cascade" })
.notNull(),
lastPing: integer("lastPing")
});
export const siteBandwidth = sqliteTable("siteBandwidth", {
siteId: integer("siteId")
.primaryKey()
.references(() => sites.siteId, { onDelete: "cascade" })
.notNull(),
megabytesIn: integer("bytesIn").default(0),
megabytesOut: integer("bytesOut").default(0),
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
});
export const clientPing = sqliteTable("clientPing", {
clientId: integer("clientId")
.primaryKey()
.references(() => clients.clientId, { onDelete: "cascade" })
.notNull(),
lastPing: integer("lastPing")
});
export const clientBandwidth = sqliteTable("clientBandwidth", {
clientId: integer("clientId")
.primaryKey()
.references(() => clients.clientId, { onDelete: "cascade" })
.notNull(),
megabytesIn: integer("bytesIn"),
megabytesOut: integer("bytesOut"),
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
});
export const clientSitesAssociationsCache = sqliteTable(
"clientSitesAssociationsCache",
{
@@ -1238,7 +1209,3 @@ export type DeviceWebAuthCode = InferSelectModel<typeof deviceWebAuthCodes>;
export type RoundTripMessageTracker = InferSelectModel<
typeof roundTripMessageTracker
>;
export type SitePing = typeof sitePing.$inferSelect;
export type SiteBandwidth = typeof siteBandwidth.$inferSelect;
export type ClientPing = typeof clientPing.$inferSelect;
export type ClientBandwidth = typeof clientBandwidth.$inferSelect;

View File

@@ -3,7 +3,7 @@ import config from "./config";
import { getHostMeta } from "./hostMeta";
import logger from "@server/logger";
import { apiKeys, db, roles, siteResources } from "@server/db";
import { sites, users, orgs, resources, clients, idp, siteBandwidth } from "@server/db";
import { sites, users, orgs, resources, clients, idp } from "@server/db";
import { eq, count, notInArray, and, isNotNull, isNull } from "drizzle-orm";
import { APP_VERSION } from "./consts";
import crypto from "crypto";
@@ -150,13 +150,12 @@ class TelemetryClient {
const siteDetails = await db
.select({
siteName: sites.name,
megabytesIn: siteBandwidth.megabytesIn,
megabytesOut: siteBandwidth.megabytesOut,
megabytesIn: sites.megabytesIn,
megabytesOut: sites.megabytesOut,
type: sites.type,
online: sites.online
})
.from(sites)
.leftJoin(siteBandwidth, eq(siteBandwidth.siteId, sites.siteId));
.from(sites);
const supporterKey = config.getSupporterData();

View File

@@ -18,11 +18,10 @@ import {
subscriptionItems,
usage,
sites,
siteBandwidth,
customers,
orgs
} from "@server/db";
import { eq, and, inArray } from "drizzle-orm";
import { eq, and } from "drizzle-orm";
import logger from "@server/logger";
import { getFeatureIdByMetricId, getFeatureIdByPriceId } from "@server/lib/billing/features";
import stripe from "#private/lib/stripe";
@@ -254,19 +253,14 @@ export async function handleSubscriptionUpdated(
);
}
// Also reset the site bandwidth to 0
// Also reset the sites to 0
await trx
.update(siteBandwidth)
.update(sites)
.set({
megabytesIn: 0,
megabytesOut: 0
})
.where(
inArray(
siteBandwidth.siteId,
trx.select({ siteId: sites.siteId }).from(sites).where(eq(sites.orgId, orgId))
)
);
.where(eq(sites.orgId, orgId));
});
}
}

View File

@@ -1,5 +1,4 @@
import {
clientBandwidth,
clients,
clientSitesAssociationsCache,
currentFingerprint,
@@ -181,8 +180,8 @@ function queryClientsBase() {
name: clients.name,
pubKey: clients.pubKey,
subnet: clients.subnet,
megabytesIn: clientBandwidth.megabytesIn,
megabytesOut: clientBandwidth.megabytesOut,
megabytesIn: clients.megabytesIn,
megabytesOut: clients.megabytesOut,
orgName: orgs.name,
type: clients.type,
online: clients.online,
@@ -201,8 +200,7 @@ function queryClientsBase() {
.leftJoin(orgs, eq(clients.orgId, orgs.orgId))
.leftJoin(olms, eq(clients.clientId, olms.clientId))
.leftJoin(users, eq(clients.userId, users.userId))
.leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId))
.leftJoin(clientBandwidth, eq(clientBandwidth.clientId, clients.clientId));
.leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId));
}
async function getSiteAssociations(clientIds: number[]) {
@@ -369,15 +367,9 @@ export async function listClients(
.offset(pageSize * (page - 1))
.orderBy(
sort_by
? (() => {
const field =
sort_by === "megabytesIn"
? clientBandwidth.megabytesIn
: sort_by === "megabytesOut"
? clientBandwidth.megabytesOut
: clients.name;
return order === "asc" ? asc(field) : desc(field);
})()
? order === "asc"
? asc(clients[sort_by])
: desc(clients[sort_by])
: asc(clients.name)
);

View File

@@ -1,6 +1,5 @@
import { build } from "@server/build";
import {
clientBandwidth,
clients,
currentFingerprint,
db,
@@ -212,8 +211,8 @@ function queryUserDevicesBase() {
name: clients.name,
pubKey: clients.pubKey,
subnet: clients.subnet,
megabytesIn: clientBandwidth.megabytesIn,
megabytesOut: clientBandwidth.megabytesOut,
megabytesIn: clients.megabytesIn,
megabytesOut: clients.megabytesOut,
orgName: orgs.name,
type: clients.type,
online: clients.online,
@@ -240,8 +239,7 @@ function queryUserDevicesBase() {
.leftJoin(orgs, eq(clients.orgId, orgs.orgId))
.leftJoin(olms, eq(clients.clientId, olms.clientId))
.leftJoin(users, eq(clients.userId, users.userId))
.leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId))
.leftJoin(clientBandwidth, eq(clientBandwidth.clientId, clients.clientId));
.leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId));
}
type OlmWithUpdateAvailable = Awaited<
@@ -429,15 +427,9 @@ export async function listUserDevices(
.offset(pageSize * (page - 1))
.orderBy(
sort_by
? (() => {
const field =
sort_by === "megabytesIn"
? clientBandwidth.megabytesIn
: sort_by === "megabytesOut"
? clientBandwidth.megabytesOut
: clients.name;
return order === "asc" ? asc(field) : desc(field);
})()
? order === "asc"
? asc(clients[sort_by])
: desc(clients[sort_by])
: asc(clients.clientId)
);

View File

@@ -122,7 +122,7 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
const snapshot = accumulator;
accumulator = new Map<string, AccumulatorEntry>();
const currentEpoch = Math.floor(Date.now() / 1000);
const currentTime = new Date().toISOString();
// Sort by publicKey for consistent lock ordering across concurrent
// writers — deadlock-prevention strategy.
@@ -157,52 +157,33 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
orgId: string;
pubKey: string;
}>(sql`
WITH upsert AS (
INSERT INTO "siteBandwidth" ("siteId", "bytesIn", "bytesOut", "lastBandwidthUpdate")
SELECT s."siteId", ${bytesIn}, ${bytesOut}, ${currentEpoch}
FROM "sites" s WHERE s."pubKey" = ${publicKey}
ON CONFLICT ("siteId") DO UPDATE SET
"bytesIn" = COALESCE("siteBandwidth"."bytesIn", 0) + EXCLUDED."bytesIn",
"bytesOut" = COALESCE("siteBandwidth"."bytesOut", 0) + EXCLUDED."bytesOut",
"lastBandwidthUpdate" = EXCLUDED."lastBandwidthUpdate"
RETURNING "siteId"
)
SELECT u."siteId", s."orgId", s."pubKey"
FROM upsert u
INNER JOIN "sites" s ON s."siteId" = u."siteId"
UPDATE sites
SET
"bytesOut" = COALESCE("bytesOut", 0) + ${bytesIn},
"bytesIn" = COALESCE("bytesIn", 0) + ${bytesOut},
"lastBandwidthUpdate" = ${currentTime}
WHERE "pubKey" = ${publicKey}
RETURNING "orgId", "pubKey"
`);
results.push(...result);
}
return results;
}
// PostgreSQL: batch UPSERT via CTE — single round-trip per chunk.
// PostgreSQL: batch UPDATE … FROM (VALUES …) — single round-trip per chunk.
const valuesList = chunk.map(([publicKey, { bytesIn, bytesOut }]) =>
sql`(${publicKey}::text, ${bytesIn}::real, ${bytesOut}::real)`
);
const valuesClause = sql.join(valuesList, sql`, `);
return dbQueryRows<{ orgId: string; pubKey: string }>(sql`
WITH vals(pub_key, bytes_in, bytes_out) AS (
VALUES ${valuesClause}
),
site_lookup AS (
SELECT s."siteId", s."orgId", s."pubKey", v.bytes_in, v.bytes_out
FROM vals v
INNER JOIN "sites" s ON s."pubKey" = v.pub_key
),
upsert AS (
INSERT INTO "siteBandwidth" ("siteId", "bytesIn", "bytesOut", "lastBandwidthUpdate")
SELECT sl."siteId", sl.bytes_in, sl.bytes_out, ${currentEpoch}::integer
FROM site_lookup sl
ON CONFLICT ("siteId") DO UPDATE SET
"bytesIn" = COALESCE("siteBandwidth"."bytesIn", 0) + EXCLUDED."bytesIn",
"bytesOut" = COALESCE("siteBandwidth"."bytesOut", 0) + EXCLUDED."bytesOut",
"lastBandwidthUpdate" = EXCLUDED."lastBandwidthUpdate"
RETURNING "siteId"
)
SELECT u."siteId", s."orgId", s."pubKey"
FROM upsert u
INNER JOIN "sites" s ON s."siteId" = u."siteId"
UPDATE sites
SET
"bytesOut" = COALESCE("bytesOut", 0) + v.bytes_in,
"bytesIn" = COALESCE("bytesIn", 0) + v.bytes_out,
"lastBandwidthUpdate" = ${currentTime}
FROM (VALUES ${valuesClause}) AS v(pub_key, bytes_in, bytes_out)
WHERE sites."pubKey" = v.pub_key
RETURNING sites."orgId" AS "orgId", sites."pubKey" AS "pubKey"
`);
}, `flush bandwidth chunk [${i}${chunkEnd}]`);
} catch (error) {

View File

@@ -1,11 +1,11 @@
import { db, newts, sites, targetHealthCheck, targets, sitePing, siteBandwidth } from "@server/db";
import { db, newts, sites, targetHealthCheck, targets } from "@server/db";
import {
hasActiveConnections,
getClientConfigVersion
} from "#dynamic/routers/ws";
import { MessageHandler } from "@server/routers/ws";
import { Newt } from "@server/db";
import { eq, lt, isNull, and, or, ne } from "drizzle-orm";
import { eq, lt, isNull, and, or, ne, not } from "drizzle-orm";
import logger from "@server/logger";
import { sendNewtSyncMessage } from "./sync";
import { recordPing } from "./pingAccumulator";
@@ -41,18 +41,17 @@ export const startNewtOfflineChecker = (): void => {
.select({
siteId: sites.siteId,
newtId: newts.newtId,
lastPing: sitePing.lastPing
lastPing: sites.lastPing
})
.from(sites)
.innerJoin(newts, eq(newts.siteId, sites.siteId))
.leftJoin(sitePing, eq(sitePing.siteId, sites.siteId))
.where(
and(
eq(sites.online, true),
eq(sites.type, "newt"),
or(
lt(sitePing.lastPing, twoMinutesAgo),
isNull(sitePing.lastPing)
lt(sites.lastPing, twoMinutesAgo),
isNull(sites.lastPing)
)
)
);
@@ -113,11 +112,15 @@ export const startNewtOfflineChecker = (): void => {
.select({
siteId: sites.siteId,
online: sites.online,
lastBandwidthUpdate: siteBandwidth.lastBandwidthUpdate
lastBandwidthUpdate: sites.lastBandwidthUpdate
})
.from(sites)
.innerJoin(siteBandwidth, eq(siteBandwidth.siteId, sites.siteId))
.where(eq(sites.type, "wireguard"));
.where(
and(
eq(sites.type, "wireguard"),
not(isNull(sites.lastBandwidthUpdate))
)
);
const wireguardOfflineThreshold = Math.floor(
(Date.now() - OFFLINE_THRESHOLD_BANDWIDTH_MS) / 1000
@@ -125,7 +128,12 @@ export const startNewtOfflineChecker = (): void => {
// loop over each one. If its offline and there is a new update then mark it online. If its online and there is no update then mark it offline
for (const site of allWireguardSites) {
if ((site.lastBandwidthUpdate ?? 0) < wireguardOfflineThreshold && site.online) {
const lastBandwidthUpdate =
new Date(site.lastBandwidthUpdate!).getTime() / 1000;
if (
lastBandwidthUpdate < wireguardOfflineThreshold &&
site.online
) {
logger.info(
`Marking wireguard site ${site.siteId} offline: no bandwidth update in over ${OFFLINE_THRESHOLD_BANDWIDTH_MS / 60000} minutes`
);
@@ -134,7 +142,10 @@ export const startNewtOfflineChecker = (): void => {
.update(sites)
.set({ online: false })
.where(eq(sites.siteId, site.siteId));
} else if ((site.lastBandwidthUpdate ?? 0) >= wireguardOfflineThreshold && !site.online) {
} else if (
lastBandwidthUpdate >= wireguardOfflineThreshold &&
!site.online
) {
logger.info(
`Marking wireguard site ${site.siteId} online: recent bandwidth update`
);

View File

@@ -1,5 +1,6 @@
import { db, clients, clientBandwidth } from "@server/db";
import { db } from "@server/db";
import { MessageHandler } from "@server/routers/ws";
import { clients } from "@server/db";
import { eq, sql } from "drizzle-orm";
import logger from "@server/logger";
@@ -84,7 +85,7 @@ export async function flushBandwidthToDb(): Promise<void> {
const snapshot = accumulator;
accumulator = new Map<string, BandwidthAccumulator>();
const currentEpoch = Math.floor(Date.now() / 1000);
const currentTime = new Date().toISOString();
// Sort by publicKey for consistent lock ordering across concurrent
// writers — this is the same deadlock-prevention strategy used in the
@@ -100,37 +101,19 @@ export async function flushBandwidthToDb(): Promise<void> {
for (const [publicKey, { bytesIn, bytesOut }] of sortedEntries) {
try {
await withDeadlockRetry(async () => {
// Find clientId by pubKey
const [clientRow] = await db
.select({ clientId: clients.clientId })
.from(clients)
.where(eq(clients.pubKey, publicKey))
.limit(1);
if (!clientRow) {
logger.warn(`No client found for pubKey ${publicKey}, skipping`);
return;
}
// Use atomic SQL increment to avoid the SELECT-then-UPDATE
// anti-pattern and the races it would introduce.
await db
.insert(clientBandwidth)
.values({
clientId: clientRow.clientId,
.update(clients)
.set({
// Note: bytesIn from peer goes to megabytesOut (data
// sent to client) and bytesOut from peer goes to
// megabytesIn (data received from client).
megabytesOut: bytesIn,
megabytesIn: bytesOut,
lastBandwidthUpdate: currentEpoch
megabytesOut: sql`COALESCE(${clients.megabytesOut}, 0) + ${bytesIn}`,
megabytesIn: sql`COALESCE(${clients.megabytesIn}, 0) + ${bytesOut}`,
lastBandwidthUpdate: currentTime
})
.onConflictDoUpdate({
target: clientBandwidth.clientId,
set: {
megabytesOut: sql`COALESCE(${clientBandwidth.megabytesOut}, 0) + ${bytesIn}`,
megabytesIn: sql`COALESCE(${clientBandwidth.megabytesIn}, 0) + ${bytesOut}`,
lastBandwidthUpdate: currentEpoch
}
});
.where(eq(clients.pubKey, publicKey));
}, `flush bandwidth for client ${publicKey}`);
} catch (error) {
logger.error(

View File

@@ -1,6 +1,6 @@
import { db } from "@server/db";
import { sites, clients, olms, sitePing, clientPing } from "@server/db";
import { inArray, sql } from "drizzle-orm";
import { sites, clients, olms } from "@server/db";
import { inArray } from "drizzle-orm";
import logger from "@server/logger";
/**
@@ -81,8 +81,11 @@ export function recordClientPing(
/**
* Flush all accumulated site pings to the database.
*
* For each batch: first upserts individual per-site timestamps into
* `sitePing`, then bulk-updates `sites.online = true`.
* Each batch of up to BATCH_SIZE rows is written with a **single** UPDATE
* statement. We use the maximum timestamp across the batch so that `lastPing`
* reflects the most recent ping seen for any site in the group. This avoids
* the multi-statement transaction that previously created additional
* row-lock ordering hazards.
*/
async function flushSitePingsToDb(): Promise<void> {
if (pendingSitePings.size === 0) {
@@ -100,25 +103,20 @@ async function flushSitePingsToDb(): Promise<void> {
for (let i = 0; i < entries.length; i += BATCH_SIZE) {
const batch = entries.slice(i, i + BATCH_SIZE);
// Use the latest timestamp in the batch so that `lastPing` always
// moves forward. Using a single timestamp for the whole batch means
// we only ever need one UPDATE statement (no transaction).
const maxTimestamp = Math.max(...batch.map(([, ts]) => ts));
const siteIds = batch.map(([id]) => id);
try {
await withRetry(async () => {
const rows = batch.map(([siteId, ts]) => ({ siteId, lastPing: ts }));
// Step 1: Upsert ping timestamps into sitePing
await db
.insert(sitePing)
.values(rows)
.onConflictDoUpdate({
target: sitePing.siteId,
set: { lastPing: sql`excluded."lastPing"` }
});
// Step 2: Update online status on sites
await db
.update(sites)
.set({ online: true })
.set({
online: true,
lastPing: maxTimestamp
})
.where(inArray(sites.siteId, siteIds));
}, "flushSitePingsToDb");
} catch (error) {
@@ -141,8 +139,7 @@ async function flushSitePingsToDb(): Promise<void> {
/**
* Flush all accumulated client (OLM) pings to the database.
*
* For each batch: first upserts individual per-client timestamps into
* `clientPing`, then bulk-updates `clients.online = true, archived = false`.
* Same single-UPDATE-per-batch approach as `flushSitePingsToDb`.
*/
async function flushClientPingsToDb(): Promise<void> {
if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) {
@@ -164,25 +161,18 @@ async function flushClientPingsToDb(): Promise<void> {
for (let i = 0; i < entries.length; i += BATCH_SIZE) {
const batch = entries.slice(i, i + BATCH_SIZE);
const maxTimestamp = Math.max(...batch.map(([, ts]) => ts));
const clientIds = batch.map(([id]) => id);
try {
await withRetry(async () => {
const rows = batch.map(([clientId, ts]) => ({ clientId, lastPing: ts }));
// Step 1: Upsert ping timestamps into clientPing
await db
.insert(clientPing)
.values(rows)
.onConflictDoUpdate({
target: clientPing.clientId,
set: { lastPing: sql`excluded."lastPing"` }
});
// Step 2: Update online + unarchive on clients
await db
.update(clients)
.set({ online: true, archived: false })
.set({
lastPing: maxTimestamp,
online: true,
archived: false
})
.where(inArray(clients.clientId, clientIds));
}, "flushClientPingsToDb");
} catch (error) {

View File

@@ -1,8 +1,8 @@
import { disconnectClient, getClientConfigVersion } from "#dynamic/routers/ws";
import { db } from "@server/db";
import { MessageHandler } from "@server/routers/ws";
import { clients, olms, Olm, clientPing } from "@server/db";
import { eq, lt, isNull, and, or, inArray } from "drizzle-orm";
import { clients, olms, Olm } from "@server/db";
import { eq, lt, isNull, and, or } from "drizzle-orm";
import { recordClientPing } from "@server/routers/newt/pingAccumulator";
import logger from "@server/logger";
import { validateSessionToken } from "@server/auth/sessions/app";
@@ -37,33 +37,21 @@ export const startOlmOfflineChecker = (): void => {
// TODO: WE NEED TO MAKE SURE THIS WORKS WITH DISTRIBUTED NODES ALL DOING THE SAME THING
// Find clients that haven't pinged in the last 2 minutes and mark them as offline
const staleClientRows = await db
.select({
clientId: clients.clientId,
olmId: clients.olmId,
lastPing: clientPing.lastPing
})
.from(clients)
.leftJoin(clientPing, eq(clientPing.clientId, clients.clientId))
const offlineClients = await db
.update(clients)
.set({ online: false })
.where(
and(
eq(clients.online, true),
or(
lt(clientPing.lastPing, twoMinutesAgo),
isNull(clientPing.lastPing)
lt(clients.lastPing, twoMinutesAgo),
isNull(clients.lastPing)
)
)
);
)
.returning();
if (staleClientRows.length > 0) {
const staleClientIds = staleClientRows.map((c) => c.clientId);
await db
.update(clients)
.set({ online: false })
.where(inArray(clients.clientId, staleClientIds));
}
for (const offlineClient of staleClientRows) {
for (const offlineClient of offlineClients) {
logger.info(
`Kicking offline olm client ${offlineClient.clientId} due to inactivity`
);

View File

@@ -1,7 +1,7 @@
import { NextFunction, Request, Response } from "express";
import { z } from "zod";
import { db, sites, siteBandwidth } from "@server/db";
import { eq, inArray } from "drizzle-orm";
import { db, sites } from "@server/db";
import { eq } from "drizzle-orm";
import response from "@server/lib/response";
import HttpCode from "@server/types/HttpCode";
import createHttpError from "http-errors";
@@ -60,17 +60,12 @@ export async function resetOrgBandwidth(
}
await db
.update(siteBandwidth)
.update(sites)
.set({
megabytesIn: 0,
megabytesOut: 0
})
.where(
inArray(
siteBandwidth.siteId,
db.select({ siteId: sites.siteId }).from(sites).where(eq(sites.orgId, orgId))
)
);
.where(eq(sites.orgId, orgId));
return response(res, {
data: {},

View File

@@ -6,7 +6,6 @@ import {
remoteExitNodes,
roleSites,
sites,
siteBandwidth,
userSites
} from "@server/db";
import cache from "#dynamic/lib/cache";
@@ -156,8 +155,8 @@ function querySitesBase() {
name: sites.name,
pubKey: sites.pubKey,
subnet: sites.subnet,
megabytesIn: siteBandwidth.megabytesIn,
megabytesOut: siteBandwidth.megabytesOut,
megabytesIn: sites.megabytesIn,
megabytesOut: sites.megabytesOut,
orgName: orgs.name,
type: sites.type,
online: sites.online,
@@ -176,8 +175,7 @@ function querySitesBase() {
.leftJoin(
remoteExitNodes,
eq(remoteExitNodes.exitNodeId, sites.exitNodeId)
)
.leftJoin(siteBandwidth, eq(siteBandwidth.siteId, sites.siteId));
);
}
type SiteWithUpdateAvailable = Awaited<ReturnType<typeof querySitesBase>>[0] & {
@@ -301,15 +299,9 @@ export async function listSites(
.offset(pageSize * (page - 1))
.orderBy(
sort_by
? (() => {
const field =
sort_by === "megabytesIn"
? siteBandwidth.megabytesIn
: sort_by === "megabytesOut"
? siteBandwidth.megabytesOut
: sites.name;
return order === "asc" ? asc(field) : desc(field);
})()
? order === "asc"
? asc(sites[sort_by])
: desc(sites[sort_by])
: asc(sites.name)
);

View File

@@ -22,7 +22,6 @@ import m13 from "./scriptsPg/1.15.3";
import m14 from "./scriptsPg/1.15.4";
import m15 from "./scriptsPg/1.16.0";
import m16 from "./scriptsPg/1.17.0";
import m17 from "./scriptsPg/1.18.0";
// THIS CANNOT IMPORT ANYTHING FROM THE SERVER
// EXCEPT FOR THE DATABASE AND THE SCHEMA
@@ -44,8 +43,7 @@ const migrations = [
{ version: "1.15.3", run: m13 },
{ version: "1.15.4", run: m14 },
{ version: "1.16.0", run: m15 },
{ version: "1.17.0", run: m16 },
{ version: "1.18.0", run: m17 }
{ version: "1.17.0", run: m16 }
// Add new migrations here as they are created
] as {
version: string;

View File

@@ -40,7 +40,6 @@ import m34 from "./scriptsSqlite/1.15.3";
import m35 from "./scriptsSqlite/1.15.4";
import m36 from "./scriptsSqlite/1.16.0";
import m37 from "./scriptsSqlite/1.17.0";
import m38 from "./scriptsSqlite/1.18.0";
// THIS CANNOT IMPORT ANYTHING FROM THE SERVER
// EXCEPT FOR THE DATABASE AND THE SCHEMA
@@ -78,8 +77,7 @@ const migrations = [
{ version: "1.15.3", run: m34 },
{ version: "1.15.4", run: m35 },
{ version: "1.16.0", run: m36 },
{ version: "1.17.0", run: m37 },
{ version: "1.18.0", run: m38 }
{ version: "1.17.0", run: m37 }
// Add new migrations here as they are created
] as const;