mirror of
https://github.com/fosrl/pangolin.git
synced 2026-04-14 05:46:38 +00:00
Compare commits
7 Commits
breakout-s
...
miloschwar
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
02dfeed3ce | ||
|
|
3436105bec | ||
|
|
4b3375ab8e | ||
|
|
6ce165bfd5 | ||
|
|
035644eaf7 | ||
|
|
16e7233a3e | ||
|
|
1f74e1b320 |
1
.github/CODEOWNERS
vendored
1
.github/CODEOWNERS
vendored
@@ -1 +0,0 @@
|
||||
* @oschwartz10612 @miloschwartz
|
||||
22
README.md
22
README.md
@@ -35,19 +35,13 @@
|
||||
|
||||
</div>
|
||||
|
||||
<p align="center">
|
||||
<a href="https://docs.pangolin.net/careers/join-us">
|
||||
<img src="https://img.shields.io/badge/🚀_We're_Hiring!-Join_Our_Team-brightgreen?style=for-the-badge" alt="We're Hiring!" />
|
||||
</a>
|
||||
</p>
|
||||
|
||||
<p align="center">
|
||||
<strong>
|
||||
Get started with Pangolin at <a href="https://app.pangolin.net/auth/signup">app.pangolin.net</a>
|
||||
</strong>
|
||||
</p>
|
||||
|
||||
Pangolin is an open-source, identity-based remote access platform built on WireGuard that enables secure, seamless connectivity to private and public resources. Pangolin combines reverse proxy and VPN capabilities into one platform, providing browser-based access to web applications and client-based access to any private resources, all with zero-trust security and granular access control.
|
||||
Pangolin is an open-source, identity-based remote access platform built on WireGuard that enables secure, seamless connectivity to private and public resources. Pangolin combines reverse proxy and VPN capabilities into one platform, providing browser-based access to web applications and client-based access to any private resources with NAT traversal, all with granular access controls.
|
||||
|
||||
## Installation
|
||||
|
||||
@@ -60,16 +54,16 @@ Pangolin is an open-source, identity-based remote access platform built on WireG
|
||||
|
||||
| <img width=500 /> | Description |
|
||||
|-----------------|--------------|
|
||||
| **Pangolin Cloud** | Fully managed service with instant setup and pay-as-you-go pricing - no infrastructure required. Or, self-host your own [remote node](https://docs.pangolin.net/manage/remote-node/understanding-nodes) and connect to our control plane. |
|
||||
| **Pangolin Cloud** | Fully managed service - no infrastructure required. |
|
||||
| **Self-Host: Community Edition** | Free, open source, and licensed under AGPL-3. |
|
||||
| **Self-Host: Enterprise Edition** | Licensed under Fossorial Commercial License. Free for personal and hobbyist use, and for businesses earning under \$100K USD annually. |
|
||||
| **Self-Host: Enterprise Edition** | Licensed under Fossorial Commercial License. Free for personal and hobbyist use, and for businesses making less than \$100K USD gross annual revenue. |
|
||||
|
||||
## Key Features
|
||||
|
||||
| <img width=500 /> | <img width=500 /> |
|
||||
|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------|
|
||||
| **Connect remote networks with sites**<br /><br />Pangolin's lightweight site connectors create secure tunnels from remote networks without requiring public IP addresses or open ports. Sites make any network anywhere available for authorized access. | <img src="public/screenshots/sites.png" width=500 /><tr></tr> |
|
||||
| **Browser-based reverse proxy access**<br /><br />Expose web applications through identity and context-aware tunneled reverse proxies. Pangolin handles routing, load balancing, health checking, and automatic SSL certificates without exposing your network directly to the internet. Users access applications through any web browser with authentication and granular access control. | <img src="public/clip.gif" width=500 /><tr></tr> |
|
||||
| **Connect remote networks with sites**<br /><br />Pangolin's site connectors create secure tunnels from remote networks without requiring public IP addresses or open ports. Sites make any network anywhere available for authorized access. | <img src="public/screenshots/sites.png" width=500 /><tr></tr> |
|
||||
| **Browser-based reverse proxy access**<br /><br />Expose web applications through identity and context-aware tunneled reverse proxies. Users access applications through any web browser with authentication and granular access control. Pangolin handles routing, load balancing, health checking, and automatic SSL certificates without exposing your network directly to the internet. | <img src="public/clip.gif" width=500 /><tr></tr> |
|
||||
| **Client-based private resource access**<br /><br />Access private resources like SSH servers, databases, RDP, and entire network ranges through Pangolin clients. Intelligent NAT traversal enables connections even through restrictive firewalls, while DNS aliases provide friendly names and fast connections to resources across all your sites. | <img src="public/screenshots/private-resources.png" width=500 /><tr></tr> |
|
||||
| **Zero-trust granular access**<br /><br />Grant users access to specific resources, not entire networks. Unlike traditional VPNs that expose full network access, Pangolin's zero-trust model ensures users can only reach the applications and services you explicitly define, reducing security risk and attack surface. | <img src="public/screenshots/user-devices.png" width=500 /><tr></tr> |
|
||||
|
||||
@@ -87,7 +81,7 @@ Download the Pangolin client for your platform:
|
||||
|
||||
### Sign up now
|
||||
|
||||
Create an account at [app.pangolin.net](https://app.pangolin.net) to get started with Pangolin Cloud. A generous free tier is available.
|
||||
Create a free account at [app.pangolin.net](https://app.pangolin.net) to get started with Pangolin Cloud.
|
||||
|
||||
### Check out the docs
|
||||
|
||||
@@ -102,7 +96,3 @@ Pangolin is dual licensed under the AGPL-3 and the [Fossorial Commercial License
|
||||
## Contributions
|
||||
|
||||
Please see [CONTRIBUTING](./CONTRIBUTING.md) in the repository for guidelines and best practices.
|
||||
|
||||
---
|
||||
|
||||
WireGuard® is a registered trademark of Jason A. Donenfeld.
|
||||
|
||||
@@ -89,8 +89,12 @@ export const sites = pgTable("sites", {
|
||||
name: varchar("name").notNull(),
|
||||
pubKey: varchar("pubKey"),
|
||||
subnet: varchar("subnet"),
|
||||
megabytesIn: real("bytesIn").default(0),
|
||||
megabytesOut: real("bytesOut").default(0),
|
||||
lastBandwidthUpdate: varchar("lastBandwidthUpdate"),
|
||||
type: varchar("type").notNull(), // "newt" or "wireguard"
|
||||
online: boolean("online").notNull().default(false),
|
||||
lastPing: integer("lastPing"),
|
||||
address: varchar("address"),
|
||||
endpoint: varchar("endpoint"),
|
||||
publicKey: varchar("publicKey"),
|
||||
@@ -725,7 +729,10 @@ export const clients = pgTable("clients", {
|
||||
name: varchar("name").notNull(),
|
||||
pubKey: varchar("pubKey"),
|
||||
subnet: varchar("subnet").notNull(),
|
||||
|
||||
megabytesIn: real("bytesIn"),
|
||||
megabytesOut: real("bytesOut"),
|
||||
lastBandwidthUpdate: varchar("lastBandwidthUpdate"),
|
||||
lastPing: integer("lastPing"),
|
||||
type: varchar("type").notNull(), // "olm"
|
||||
online: boolean("online").notNull().default(false),
|
||||
// endpoint: varchar("endpoint"),
|
||||
@@ -738,42 +745,6 @@ export const clients = pgTable("clients", {
|
||||
>()
|
||||
});
|
||||
|
||||
export const sitePing = pgTable("sitePing", {
|
||||
siteId: integer("siteId")
|
||||
.primaryKey()
|
||||
.references(() => sites.siteId, { onDelete: "cascade" })
|
||||
.notNull(),
|
||||
lastPing: integer("lastPing")
|
||||
});
|
||||
|
||||
export const siteBandwidth = pgTable("siteBandwidth", {
|
||||
siteId: integer("siteId")
|
||||
.primaryKey()
|
||||
.references(() => sites.siteId, { onDelete: "cascade" })
|
||||
.notNull(),
|
||||
megabytesIn: real("bytesIn").default(0),
|
||||
megabytesOut: real("bytesOut").default(0),
|
||||
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
|
||||
});
|
||||
|
||||
export const clientPing = pgTable("clientPing", {
|
||||
clientId: integer("clientId")
|
||||
.primaryKey()
|
||||
.references(() => clients.clientId, { onDelete: "cascade" })
|
||||
.notNull(),
|
||||
lastPing: integer("lastPing")
|
||||
});
|
||||
|
||||
export const clientBandwidth = pgTable("clientBandwidth", {
|
||||
clientId: integer("clientId")
|
||||
.primaryKey()
|
||||
.references(() => clients.clientId, { onDelete: "cascade" })
|
||||
.notNull(),
|
||||
megabytesIn: real("bytesIn"),
|
||||
megabytesOut: real("bytesOut"),
|
||||
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
|
||||
});
|
||||
|
||||
export const clientSitesAssociationsCache = pgTable(
|
||||
"clientSitesAssociationsCache",
|
||||
{
|
||||
@@ -1135,7 +1106,3 @@ export type RequestAuditLog = InferSelectModel<typeof requestAuditLog>;
|
||||
export type RoundTripMessageTracker = InferSelectModel<
|
||||
typeof roundTripMessageTracker
|
||||
>;
|
||||
export type SitePing = typeof sitePing.$inferSelect;
|
||||
export type SiteBandwidth = typeof siteBandwidth.$inferSelect;
|
||||
export type ClientPing = typeof clientPing.$inferSelect;
|
||||
export type ClientBandwidth = typeof clientBandwidth.$inferSelect;
|
||||
|
||||
@@ -95,8 +95,12 @@ export const sites = sqliteTable("sites", {
|
||||
name: text("name").notNull(),
|
||||
pubKey: text("pubKey"),
|
||||
subnet: text("subnet"),
|
||||
megabytesIn: integer("bytesIn").default(0),
|
||||
megabytesOut: integer("bytesOut").default(0),
|
||||
lastBandwidthUpdate: text("lastBandwidthUpdate"),
|
||||
type: text("type").notNull(), // "newt" or "wireguard"
|
||||
online: integer("online", { mode: "boolean" }).notNull().default(false),
|
||||
lastPing: integer("lastPing"),
|
||||
|
||||
// exit node stuff that is how to connect to the site when it has a wg server
|
||||
address: text("address"), // this is the address of the wireguard interface in newt
|
||||
@@ -395,7 +399,10 @@ export const clients = sqliteTable("clients", {
|
||||
pubKey: text("pubKey"),
|
||||
olmId: text("olmId"), // to lock it to a specific olm optionally
|
||||
subnet: text("subnet").notNull(),
|
||||
|
||||
megabytesIn: integer("bytesIn"),
|
||||
megabytesOut: integer("bytesOut"),
|
||||
lastBandwidthUpdate: text("lastBandwidthUpdate"),
|
||||
lastPing: integer("lastPing"),
|
||||
type: text("type").notNull(), // "olm"
|
||||
online: integer("online", { mode: "boolean" }).notNull().default(false),
|
||||
// endpoint: text("endpoint"),
|
||||
@@ -407,42 +414,6 @@ export const clients = sqliteTable("clients", {
|
||||
>()
|
||||
});
|
||||
|
||||
export const sitePing = sqliteTable("sitePing", {
|
||||
siteId: integer("siteId")
|
||||
.primaryKey()
|
||||
.references(() => sites.siteId, { onDelete: "cascade" })
|
||||
.notNull(),
|
||||
lastPing: integer("lastPing")
|
||||
});
|
||||
|
||||
export const siteBandwidth = sqliteTable("siteBandwidth", {
|
||||
siteId: integer("siteId")
|
||||
.primaryKey()
|
||||
.references(() => sites.siteId, { onDelete: "cascade" })
|
||||
.notNull(),
|
||||
megabytesIn: integer("bytesIn").default(0),
|
||||
megabytesOut: integer("bytesOut").default(0),
|
||||
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
|
||||
});
|
||||
|
||||
export const clientPing = sqliteTable("clientPing", {
|
||||
clientId: integer("clientId")
|
||||
.primaryKey()
|
||||
.references(() => clients.clientId, { onDelete: "cascade" })
|
||||
.notNull(),
|
||||
lastPing: integer("lastPing")
|
||||
});
|
||||
|
||||
export const clientBandwidth = sqliteTable("clientBandwidth", {
|
||||
clientId: integer("clientId")
|
||||
.primaryKey()
|
||||
.references(() => clients.clientId, { onDelete: "cascade" })
|
||||
.notNull(),
|
||||
megabytesIn: integer("bytesIn"),
|
||||
megabytesOut: integer("bytesOut"),
|
||||
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
|
||||
});
|
||||
|
||||
export const clientSitesAssociationsCache = sqliteTable(
|
||||
"clientSitesAssociationsCache",
|
||||
{
|
||||
@@ -1238,7 +1209,3 @@ export type DeviceWebAuthCode = InferSelectModel<typeof deviceWebAuthCodes>;
|
||||
export type RoundTripMessageTracker = InferSelectModel<
|
||||
typeof roundTripMessageTracker
|
||||
>;
|
||||
export type SitePing = typeof sitePing.$inferSelect;
|
||||
export type SiteBandwidth = typeof siteBandwidth.$inferSelect;
|
||||
export type ClientPing = typeof clientPing.$inferSelect;
|
||||
export type ClientBandwidth = typeof clientBandwidth.$inferSelect;
|
||||
|
||||
@@ -3,7 +3,7 @@ import config from "./config";
|
||||
import { getHostMeta } from "./hostMeta";
|
||||
import logger from "@server/logger";
|
||||
import { apiKeys, db, roles, siteResources } from "@server/db";
|
||||
import { sites, users, orgs, resources, clients, idp, siteBandwidth } from "@server/db";
|
||||
import { sites, users, orgs, resources, clients, idp } from "@server/db";
|
||||
import { eq, count, notInArray, and, isNotNull, isNull } from "drizzle-orm";
|
||||
import { APP_VERSION } from "./consts";
|
||||
import crypto from "crypto";
|
||||
@@ -150,13 +150,12 @@ class TelemetryClient {
|
||||
const siteDetails = await db
|
||||
.select({
|
||||
siteName: sites.name,
|
||||
megabytesIn: siteBandwidth.megabytesIn,
|
||||
megabytesOut: siteBandwidth.megabytesOut,
|
||||
megabytesIn: sites.megabytesIn,
|
||||
megabytesOut: sites.megabytesOut,
|
||||
type: sites.type,
|
||||
online: sites.online
|
||||
})
|
||||
.from(sites)
|
||||
.leftJoin(siteBandwidth, eq(siteBandwidth.siteId, sites.siteId));
|
||||
.from(sites);
|
||||
|
||||
const supporterKey = config.getSupporterData();
|
||||
|
||||
|
||||
@@ -18,11 +18,10 @@ import {
|
||||
subscriptionItems,
|
||||
usage,
|
||||
sites,
|
||||
siteBandwidth,
|
||||
customers,
|
||||
orgs
|
||||
} from "@server/db";
|
||||
import { eq, and, inArray } from "drizzle-orm";
|
||||
import { eq, and } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
import { getFeatureIdByMetricId, getFeatureIdByPriceId } from "@server/lib/billing/features";
|
||||
import stripe from "#private/lib/stripe";
|
||||
@@ -254,19 +253,14 @@ export async function handleSubscriptionUpdated(
|
||||
);
|
||||
}
|
||||
|
||||
// Also reset the site bandwidth to 0
|
||||
// Also reset the sites to 0
|
||||
await trx
|
||||
.update(siteBandwidth)
|
||||
.update(sites)
|
||||
.set({
|
||||
megabytesIn: 0,
|
||||
megabytesOut: 0
|
||||
})
|
||||
.where(
|
||||
inArray(
|
||||
siteBandwidth.siteId,
|
||||
trx.select({ siteId: sites.siteId }).from(sites).where(eq(sites.orgId, orgId))
|
||||
)
|
||||
);
|
||||
.where(eq(sites.orgId, orgId));
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
import {
|
||||
clientBandwidth,
|
||||
clients,
|
||||
clientSitesAssociationsCache,
|
||||
currentFingerprint,
|
||||
@@ -181,8 +180,8 @@ function queryClientsBase() {
|
||||
name: clients.name,
|
||||
pubKey: clients.pubKey,
|
||||
subnet: clients.subnet,
|
||||
megabytesIn: clientBandwidth.megabytesIn,
|
||||
megabytesOut: clientBandwidth.megabytesOut,
|
||||
megabytesIn: clients.megabytesIn,
|
||||
megabytesOut: clients.megabytesOut,
|
||||
orgName: orgs.name,
|
||||
type: clients.type,
|
||||
online: clients.online,
|
||||
@@ -201,8 +200,7 @@ function queryClientsBase() {
|
||||
.leftJoin(orgs, eq(clients.orgId, orgs.orgId))
|
||||
.leftJoin(olms, eq(clients.clientId, olms.clientId))
|
||||
.leftJoin(users, eq(clients.userId, users.userId))
|
||||
.leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId))
|
||||
.leftJoin(clientBandwidth, eq(clientBandwidth.clientId, clients.clientId));
|
||||
.leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId));
|
||||
}
|
||||
|
||||
async function getSiteAssociations(clientIds: number[]) {
|
||||
@@ -369,15 +367,9 @@ export async function listClients(
|
||||
.offset(pageSize * (page - 1))
|
||||
.orderBy(
|
||||
sort_by
|
||||
? (() => {
|
||||
const field =
|
||||
sort_by === "megabytesIn"
|
||||
? clientBandwidth.megabytesIn
|
||||
: sort_by === "megabytesOut"
|
||||
? clientBandwidth.megabytesOut
|
||||
: clients.name;
|
||||
return order === "asc" ? asc(field) : desc(field);
|
||||
})()
|
||||
? order === "asc"
|
||||
? asc(clients[sort_by])
|
||||
: desc(clients[sort_by])
|
||||
: asc(clients.name)
|
||||
);
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { build } from "@server/build";
|
||||
import {
|
||||
clientBandwidth,
|
||||
clients,
|
||||
currentFingerprint,
|
||||
db,
|
||||
@@ -212,8 +211,8 @@ function queryUserDevicesBase() {
|
||||
name: clients.name,
|
||||
pubKey: clients.pubKey,
|
||||
subnet: clients.subnet,
|
||||
megabytesIn: clientBandwidth.megabytesIn,
|
||||
megabytesOut: clientBandwidth.megabytesOut,
|
||||
megabytesIn: clients.megabytesIn,
|
||||
megabytesOut: clients.megabytesOut,
|
||||
orgName: orgs.name,
|
||||
type: clients.type,
|
||||
online: clients.online,
|
||||
@@ -240,8 +239,7 @@ function queryUserDevicesBase() {
|
||||
.leftJoin(orgs, eq(clients.orgId, orgs.orgId))
|
||||
.leftJoin(olms, eq(clients.clientId, olms.clientId))
|
||||
.leftJoin(users, eq(clients.userId, users.userId))
|
||||
.leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId))
|
||||
.leftJoin(clientBandwidth, eq(clientBandwidth.clientId, clients.clientId));
|
||||
.leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId));
|
||||
}
|
||||
|
||||
type OlmWithUpdateAvailable = Awaited<
|
||||
@@ -429,15 +427,9 @@ export async function listUserDevices(
|
||||
.offset(pageSize * (page - 1))
|
||||
.orderBy(
|
||||
sort_by
|
||||
? (() => {
|
||||
const field =
|
||||
sort_by === "megabytesIn"
|
||||
? clientBandwidth.megabytesIn
|
||||
: sort_by === "megabytesOut"
|
||||
? clientBandwidth.megabytesOut
|
||||
: clients.name;
|
||||
return order === "asc" ? asc(field) : desc(field);
|
||||
})()
|
||||
? order === "asc"
|
||||
? asc(clients[sort_by])
|
||||
: desc(clients[sort_by])
|
||||
: asc(clients.clientId)
|
||||
);
|
||||
|
||||
|
||||
@@ -122,7 +122,7 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
|
||||
const snapshot = accumulator;
|
||||
accumulator = new Map<string, AccumulatorEntry>();
|
||||
|
||||
const currentEpoch = Math.floor(Date.now() / 1000);
|
||||
const currentTime = new Date().toISOString();
|
||||
|
||||
// Sort by publicKey for consistent lock ordering across concurrent
|
||||
// writers — deadlock-prevention strategy.
|
||||
@@ -157,52 +157,33 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
|
||||
orgId: string;
|
||||
pubKey: string;
|
||||
}>(sql`
|
||||
WITH upsert AS (
|
||||
INSERT INTO "siteBandwidth" ("siteId", "bytesIn", "bytesOut", "lastBandwidthUpdate")
|
||||
SELECT s."siteId", ${bytesIn}, ${bytesOut}, ${currentEpoch}
|
||||
FROM "sites" s WHERE s."pubKey" = ${publicKey}
|
||||
ON CONFLICT ("siteId") DO UPDATE SET
|
||||
"bytesIn" = COALESCE("siteBandwidth"."bytesIn", 0) + EXCLUDED."bytesIn",
|
||||
"bytesOut" = COALESCE("siteBandwidth"."bytesOut", 0) + EXCLUDED."bytesOut",
|
||||
"lastBandwidthUpdate" = EXCLUDED."lastBandwidthUpdate"
|
||||
RETURNING "siteId"
|
||||
)
|
||||
SELECT u."siteId", s."orgId", s."pubKey"
|
||||
FROM upsert u
|
||||
INNER JOIN "sites" s ON s."siteId" = u."siteId"
|
||||
UPDATE sites
|
||||
SET
|
||||
"bytesOut" = COALESCE("bytesOut", 0) + ${bytesIn},
|
||||
"bytesIn" = COALESCE("bytesIn", 0) + ${bytesOut},
|
||||
"lastBandwidthUpdate" = ${currentTime}
|
||||
WHERE "pubKey" = ${publicKey}
|
||||
RETURNING "orgId", "pubKey"
|
||||
`);
|
||||
results.push(...result);
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
// PostgreSQL: batch UPSERT via CTE — single round-trip per chunk.
|
||||
// PostgreSQL: batch UPDATE … FROM (VALUES …) — single round-trip per chunk.
|
||||
const valuesList = chunk.map(([publicKey, { bytesIn, bytesOut }]) =>
|
||||
sql`(${publicKey}::text, ${bytesIn}::real, ${bytesOut}::real)`
|
||||
);
|
||||
const valuesClause = sql.join(valuesList, sql`, `);
|
||||
return dbQueryRows<{ orgId: string; pubKey: string }>(sql`
|
||||
WITH vals(pub_key, bytes_in, bytes_out) AS (
|
||||
VALUES ${valuesClause}
|
||||
),
|
||||
site_lookup AS (
|
||||
SELECT s."siteId", s."orgId", s."pubKey", v.bytes_in, v.bytes_out
|
||||
FROM vals v
|
||||
INNER JOIN "sites" s ON s."pubKey" = v.pub_key
|
||||
),
|
||||
upsert AS (
|
||||
INSERT INTO "siteBandwidth" ("siteId", "bytesIn", "bytesOut", "lastBandwidthUpdate")
|
||||
SELECT sl."siteId", sl.bytes_in, sl.bytes_out, ${currentEpoch}::integer
|
||||
FROM site_lookup sl
|
||||
ON CONFLICT ("siteId") DO UPDATE SET
|
||||
"bytesIn" = COALESCE("siteBandwidth"."bytesIn", 0) + EXCLUDED."bytesIn",
|
||||
"bytesOut" = COALESCE("siteBandwidth"."bytesOut", 0) + EXCLUDED."bytesOut",
|
||||
"lastBandwidthUpdate" = EXCLUDED."lastBandwidthUpdate"
|
||||
RETURNING "siteId"
|
||||
)
|
||||
SELECT u."siteId", s."orgId", s."pubKey"
|
||||
FROM upsert u
|
||||
INNER JOIN "sites" s ON s."siteId" = u."siteId"
|
||||
UPDATE sites
|
||||
SET
|
||||
"bytesOut" = COALESCE("bytesOut", 0) + v.bytes_in,
|
||||
"bytesIn" = COALESCE("bytesIn", 0) + v.bytes_out,
|
||||
"lastBandwidthUpdate" = ${currentTime}
|
||||
FROM (VALUES ${valuesClause}) AS v(pub_key, bytes_in, bytes_out)
|
||||
WHERE sites."pubKey" = v.pub_key
|
||||
RETURNING sites."orgId" AS "orgId", sites."pubKey" AS "pubKey"
|
||||
`);
|
||||
}, `flush bandwidth chunk [${i}–${chunkEnd}]`);
|
||||
} catch (error) {
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
import { db, newts, sites, targetHealthCheck, targets, sitePing, siteBandwidth } from "@server/db";
|
||||
import { db, newts, sites, targetHealthCheck, targets } from "@server/db";
|
||||
import {
|
||||
hasActiveConnections,
|
||||
getClientConfigVersion
|
||||
} from "#dynamic/routers/ws";
|
||||
import { MessageHandler } from "@server/routers/ws";
|
||||
import { Newt } from "@server/db";
|
||||
import { eq, lt, isNull, and, or, ne } from "drizzle-orm";
|
||||
import { eq, lt, isNull, and, or, ne, not } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
import { sendNewtSyncMessage } from "./sync";
|
||||
import { recordPing } from "./pingAccumulator";
|
||||
@@ -41,18 +41,17 @@ export const startNewtOfflineChecker = (): void => {
|
||||
.select({
|
||||
siteId: sites.siteId,
|
||||
newtId: newts.newtId,
|
||||
lastPing: sitePing.lastPing
|
||||
lastPing: sites.lastPing
|
||||
})
|
||||
.from(sites)
|
||||
.innerJoin(newts, eq(newts.siteId, sites.siteId))
|
||||
.leftJoin(sitePing, eq(sitePing.siteId, sites.siteId))
|
||||
.where(
|
||||
and(
|
||||
eq(sites.online, true),
|
||||
eq(sites.type, "newt"),
|
||||
or(
|
||||
lt(sitePing.lastPing, twoMinutesAgo),
|
||||
isNull(sitePing.lastPing)
|
||||
lt(sites.lastPing, twoMinutesAgo),
|
||||
isNull(sites.lastPing)
|
||||
)
|
||||
)
|
||||
);
|
||||
@@ -113,11 +112,15 @@ export const startNewtOfflineChecker = (): void => {
|
||||
.select({
|
||||
siteId: sites.siteId,
|
||||
online: sites.online,
|
||||
lastBandwidthUpdate: siteBandwidth.lastBandwidthUpdate
|
||||
lastBandwidthUpdate: sites.lastBandwidthUpdate
|
||||
})
|
||||
.from(sites)
|
||||
.innerJoin(siteBandwidth, eq(siteBandwidth.siteId, sites.siteId))
|
||||
.where(eq(sites.type, "wireguard"));
|
||||
.where(
|
||||
and(
|
||||
eq(sites.type, "wireguard"),
|
||||
not(isNull(sites.lastBandwidthUpdate))
|
||||
)
|
||||
);
|
||||
|
||||
const wireguardOfflineThreshold = Math.floor(
|
||||
(Date.now() - OFFLINE_THRESHOLD_BANDWIDTH_MS) / 1000
|
||||
@@ -125,7 +128,12 @@ export const startNewtOfflineChecker = (): void => {
|
||||
|
||||
// loop over each one. If its offline and there is a new update then mark it online. If its online and there is no update then mark it offline
|
||||
for (const site of allWireguardSites) {
|
||||
if ((site.lastBandwidthUpdate ?? 0) < wireguardOfflineThreshold && site.online) {
|
||||
const lastBandwidthUpdate =
|
||||
new Date(site.lastBandwidthUpdate!).getTime() / 1000;
|
||||
if (
|
||||
lastBandwidthUpdate < wireguardOfflineThreshold &&
|
||||
site.online
|
||||
) {
|
||||
logger.info(
|
||||
`Marking wireguard site ${site.siteId} offline: no bandwidth update in over ${OFFLINE_THRESHOLD_BANDWIDTH_MS / 60000} minutes`
|
||||
);
|
||||
@@ -134,7 +142,10 @@ export const startNewtOfflineChecker = (): void => {
|
||||
.update(sites)
|
||||
.set({ online: false })
|
||||
.where(eq(sites.siteId, site.siteId));
|
||||
} else if ((site.lastBandwidthUpdate ?? 0) >= wireguardOfflineThreshold && !site.online) {
|
||||
} else if (
|
||||
lastBandwidthUpdate >= wireguardOfflineThreshold &&
|
||||
!site.online
|
||||
) {
|
||||
logger.info(
|
||||
`Marking wireguard site ${site.siteId} online: recent bandwidth update`
|
||||
);
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { db, clients, clientBandwidth } from "@server/db";
|
||||
import { db } from "@server/db";
|
||||
import { MessageHandler } from "@server/routers/ws";
|
||||
import { clients } from "@server/db";
|
||||
import { eq, sql } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
|
||||
@@ -84,7 +85,7 @@ export async function flushBandwidthToDb(): Promise<void> {
|
||||
const snapshot = accumulator;
|
||||
accumulator = new Map<string, BandwidthAccumulator>();
|
||||
|
||||
const currentEpoch = Math.floor(Date.now() / 1000);
|
||||
const currentTime = new Date().toISOString();
|
||||
|
||||
// Sort by publicKey for consistent lock ordering across concurrent
|
||||
// writers — this is the same deadlock-prevention strategy used in the
|
||||
@@ -100,37 +101,19 @@ export async function flushBandwidthToDb(): Promise<void> {
|
||||
for (const [publicKey, { bytesIn, bytesOut }] of sortedEntries) {
|
||||
try {
|
||||
await withDeadlockRetry(async () => {
|
||||
// Find clientId by pubKey
|
||||
const [clientRow] = await db
|
||||
.select({ clientId: clients.clientId })
|
||||
.from(clients)
|
||||
.where(eq(clients.pubKey, publicKey))
|
||||
.limit(1);
|
||||
|
||||
if (!clientRow) {
|
||||
logger.warn(`No client found for pubKey ${publicKey}, skipping`);
|
||||
return;
|
||||
}
|
||||
|
||||
// Use atomic SQL increment to avoid the SELECT-then-UPDATE
|
||||
// anti-pattern and the races it would introduce.
|
||||
await db
|
||||
.insert(clientBandwidth)
|
||||
.values({
|
||||
clientId: clientRow.clientId,
|
||||
.update(clients)
|
||||
.set({
|
||||
// Note: bytesIn from peer goes to megabytesOut (data
|
||||
// sent to client) and bytesOut from peer goes to
|
||||
// megabytesIn (data received from client).
|
||||
megabytesOut: bytesIn,
|
||||
megabytesIn: bytesOut,
|
||||
lastBandwidthUpdate: currentEpoch
|
||||
megabytesOut: sql`COALESCE(${clients.megabytesOut}, 0) + ${bytesIn}`,
|
||||
megabytesIn: sql`COALESCE(${clients.megabytesIn}, 0) + ${bytesOut}`,
|
||||
lastBandwidthUpdate: currentTime
|
||||
})
|
||||
.onConflictDoUpdate({
|
||||
target: clientBandwidth.clientId,
|
||||
set: {
|
||||
megabytesOut: sql`COALESCE(${clientBandwidth.megabytesOut}, 0) + ${bytesIn}`,
|
||||
megabytesIn: sql`COALESCE(${clientBandwidth.megabytesIn}, 0) + ${bytesOut}`,
|
||||
lastBandwidthUpdate: currentEpoch
|
||||
}
|
||||
});
|
||||
.where(eq(clients.pubKey, publicKey));
|
||||
}, `flush bandwidth for client ${publicKey}`);
|
||||
} catch (error) {
|
||||
logger.error(
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { db } from "@server/db";
|
||||
import { sites, clients, olms, sitePing, clientPing } from "@server/db";
|
||||
import { inArray, sql } from "drizzle-orm";
|
||||
import { sites, clients, olms } from "@server/db";
|
||||
import { inArray } from "drizzle-orm";
|
||||
import logger from "@server/logger";
|
||||
|
||||
/**
|
||||
@@ -81,8 +81,11 @@ export function recordClientPing(
|
||||
/**
|
||||
* Flush all accumulated site pings to the database.
|
||||
*
|
||||
* For each batch: first upserts individual per-site timestamps into
|
||||
* `sitePing`, then bulk-updates `sites.online = true`.
|
||||
* Each batch of up to BATCH_SIZE rows is written with a **single** UPDATE
|
||||
* statement. We use the maximum timestamp across the batch so that `lastPing`
|
||||
* reflects the most recent ping seen for any site in the group. This avoids
|
||||
* the multi-statement transaction that previously created additional
|
||||
* row-lock ordering hazards.
|
||||
*/
|
||||
async function flushSitePingsToDb(): Promise<void> {
|
||||
if (pendingSitePings.size === 0) {
|
||||
@@ -100,25 +103,20 @@ async function flushSitePingsToDb(): Promise<void> {
|
||||
for (let i = 0; i < entries.length; i += BATCH_SIZE) {
|
||||
const batch = entries.slice(i, i + BATCH_SIZE);
|
||||
|
||||
// Use the latest timestamp in the batch so that `lastPing` always
|
||||
// moves forward. Using a single timestamp for the whole batch means
|
||||
// we only ever need one UPDATE statement (no transaction).
|
||||
const maxTimestamp = Math.max(...batch.map(([, ts]) => ts));
|
||||
const siteIds = batch.map(([id]) => id);
|
||||
|
||||
try {
|
||||
await withRetry(async () => {
|
||||
const rows = batch.map(([siteId, ts]) => ({ siteId, lastPing: ts }));
|
||||
|
||||
// Step 1: Upsert ping timestamps into sitePing
|
||||
await db
|
||||
.insert(sitePing)
|
||||
.values(rows)
|
||||
.onConflictDoUpdate({
|
||||
target: sitePing.siteId,
|
||||
set: { lastPing: sql`excluded."lastPing"` }
|
||||
});
|
||||
|
||||
// Step 2: Update online status on sites
|
||||
await db
|
||||
.update(sites)
|
||||
.set({ online: true })
|
||||
.set({
|
||||
online: true,
|
||||
lastPing: maxTimestamp
|
||||
})
|
||||
.where(inArray(sites.siteId, siteIds));
|
||||
}, "flushSitePingsToDb");
|
||||
} catch (error) {
|
||||
@@ -141,8 +139,7 @@ async function flushSitePingsToDb(): Promise<void> {
|
||||
/**
|
||||
* Flush all accumulated client (OLM) pings to the database.
|
||||
*
|
||||
* For each batch: first upserts individual per-client timestamps into
|
||||
* `clientPing`, then bulk-updates `clients.online = true, archived = false`.
|
||||
* Same single-UPDATE-per-batch approach as `flushSitePingsToDb`.
|
||||
*/
|
||||
async function flushClientPingsToDb(): Promise<void> {
|
||||
if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) {
|
||||
@@ -164,25 +161,18 @@ async function flushClientPingsToDb(): Promise<void> {
|
||||
for (let i = 0; i < entries.length; i += BATCH_SIZE) {
|
||||
const batch = entries.slice(i, i + BATCH_SIZE);
|
||||
|
||||
const maxTimestamp = Math.max(...batch.map(([, ts]) => ts));
|
||||
const clientIds = batch.map(([id]) => id);
|
||||
|
||||
try {
|
||||
await withRetry(async () => {
|
||||
const rows = batch.map(([clientId, ts]) => ({ clientId, lastPing: ts }));
|
||||
|
||||
// Step 1: Upsert ping timestamps into clientPing
|
||||
await db
|
||||
.insert(clientPing)
|
||||
.values(rows)
|
||||
.onConflictDoUpdate({
|
||||
target: clientPing.clientId,
|
||||
set: { lastPing: sql`excluded."lastPing"` }
|
||||
});
|
||||
|
||||
// Step 2: Update online + unarchive on clients
|
||||
await db
|
||||
.update(clients)
|
||||
.set({ online: true, archived: false })
|
||||
.set({
|
||||
lastPing: maxTimestamp,
|
||||
online: true,
|
||||
archived: false
|
||||
})
|
||||
.where(inArray(clients.clientId, clientIds));
|
||||
}, "flushClientPingsToDb");
|
||||
} catch (error) {
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
import { disconnectClient, getClientConfigVersion } from "#dynamic/routers/ws";
|
||||
import { db } from "@server/db";
|
||||
import { MessageHandler } from "@server/routers/ws";
|
||||
import { clients, olms, Olm, clientPing } from "@server/db";
|
||||
import { eq, lt, isNull, and, or, inArray } from "drizzle-orm";
|
||||
import { clients, olms, Olm } from "@server/db";
|
||||
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
||||
import { recordClientPing } from "@server/routers/newt/pingAccumulator";
|
||||
import logger from "@server/logger";
|
||||
import { validateSessionToken } from "@server/auth/sessions/app";
|
||||
@@ -37,33 +37,21 @@ export const startOlmOfflineChecker = (): void => {
|
||||
// TODO: WE NEED TO MAKE SURE THIS WORKS WITH DISTRIBUTED NODES ALL DOING THE SAME THING
|
||||
|
||||
// Find clients that haven't pinged in the last 2 minutes and mark them as offline
|
||||
const staleClientRows = await db
|
||||
.select({
|
||||
clientId: clients.clientId,
|
||||
olmId: clients.olmId,
|
||||
lastPing: clientPing.lastPing
|
||||
})
|
||||
.from(clients)
|
||||
.leftJoin(clientPing, eq(clientPing.clientId, clients.clientId))
|
||||
const offlineClients = await db
|
||||
.update(clients)
|
||||
.set({ online: false })
|
||||
.where(
|
||||
and(
|
||||
eq(clients.online, true),
|
||||
or(
|
||||
lt(clientPing.lastPing, twoMinutesAgo),
|
||||
isNull(clientPing.lastPing)
|
||||
lt(clients.lastPing, twoMinutesAgo),
|
||||
isNull(clients.lastPing)
|
||||
)
|
||||
)
|
||||
);
|
||||
)
|
||||
.returning();
|
||||
|
||||
if (staleClientRows.length > 0) {
|
||||
const staleClientIds = staleClientRows.map((c) => c.clientId);
|
||||
await db
|
||||
.update(clients)
|
||||
.set({ online: false })
|
||||
.where(inArray(clients.clientId, staleClientIds));
|
||||
}
|
||||
|
||||
for (const offlineClient of staleClientRows) {
|
||||
for (const offlineClient of offlineClients) {
|
||||
logger.info(
|
||||
`Kicking offline olm client ${offlineClient.clientId} due to inactivity`
|
||||
);
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { NextFunction, Request, Response } from "express";
|
||||
import { z } from "zod";
|
||||
import { db, sites, siteBandwidth } from "@server/db";
|
||||
import { eq, inArray } from "drizzle-orm";
|
||||
import { db, sites } from "@server/db";
|
||||
import { eq } from "drizzle-orm";
|
||||
import response from "@server/lib/response";
|
||||
import HttpCode from "@server/types/HttpCode";
|
||||
import createHttpError from "http-errors";
|
||||
@@ -60,17 +60,12 @@ export async function resetOrgBandwidth(
|
||||
}
|
||||
|
||||
await db
|
||||
.update(siteBandwidth)
|
||||
.update(sites)
|
||||
.set({
|
||||
megabytesIn: 0,
|
||||
megabytesOut: 0
|
||||
})
|
||||
.where(
|
||||
inArray(
|
||||
siteBandwidth.siteId,
|
||||
db.select({ siteId: sites.siteId }).from(sites).where(eq(sites.orgId, orgId))
|
||||
)
|
||||
);
|
||||
.where(eq(sites.orgId, orgId));
|
||||
|
||||
return response(res, {
|
||||
data: {},
|
||||
|
||||
@@ -6,7 +6,6 @@ import {
|
||||
remoteExitNodes,
|
||||
roleSites,
|
||||
sites,
|
||||
siteBandwidth,
|
||||
userSites
|
||||
} from "@server/db";
|
||||
import cache from "#dynamic/lib/cache";
|
||||
@@ -156,8 +155,8 @@ function querySitesBase() {
|
||||
name: sites.name,
|
||||
pubKey: sites.pubKey,
|
||||
subnet: sites.subnet,
|
||||
megabytesIn: siteBandwidth.megabytesIn,
|
||||
megabytesOut: siteBandwidth.megabytesOut,
|
||||
megabytesIn: sites.megabytesIn,
|
||||
megabytesOut: sites.megabytesOut,
|
||||
orgName: orgs.name,
|
||||
type: sites.type,
|
||||
online: sites.online,
|
||||
@@ -176,8 +175,7 @@ function querySitesBase() {
|
||||
.leftJoin(
|
||||
remoteExitNodes,
|
||||
eq(remoteExitNodes.exitNodeId, sites.exitNodeId)
|
||||
)
|
||||
.leftJoin(siteBandwidth, eq(siteBandwidth.siteId, sites.siteId));
|
||||
);
|
||||
}
|
||||
|
||||
type SiteWithUpdateAvailable = Awaited<ReturnType<typeof querySitesBase>>[0] & {
|
||||
@@ -301,15 +299,9 @@ export async function listSites(
|
||||
.offset(pageSize * (page - 1))
|
||||
.orderBy(
|
||||
sort_by
|
||||
? (() => {
|
||||
const field =
|
||||
sort_by === "megabytesIn"
|
||||
? siteBandwidth.megabytesIn
|
||||
: sort_by === "megabytesOut"
|
||||
? siteBandwidth.megabytesOut
|
||||
: sites.name;
|
||||
return order === "asc" ? asc(field) : desc(field);
|
||||
})()
|
||||
? order === "asc"
|
||||
? asc(sites[sort_by])
|
||||
: desc(sites[sort_by])
|
||||
: asc(sites.name)
|
||||
);
|
||||
|
||||
|
||||
@@ -22,7 +22,6 @@ import m13 from "./scriptsPg/1.15.3";
|
||||
import m14 from "./scriptsPg/1.15.4";
|
||||
import m15 from "./scriptsPg/1.16.0";
|
||||
import m16 from "./scriptsPg/1.17.0";
|
||||
import m17 from "./scriptsPg/1.18.0";
|
||||
|
||||
// THIS CANNOT IMPORT ANYTHING FROM THE SERVER
|
||||
// EXCEPT FOR THE DATABASE AND THE SCHEMA
|
||||
@@ -44,8 +43,7 @@ const migrations = [
|
||||
{ version: "1.15.3", run: m13 },
|
||||
{ version: "1.15.4", run: m14 },
|
||||
{ version: "1.16.0", run: m15 },
|
||||
{ version: "1.17.0", run: m16 },
|
||||
{ version: "1.18.0", run: m17 }
|
||||
{ version: "1.17.0", run: m16 }
|
||||
// Add new migrations here as they are created
|
||||
] as {
|
||||
version: string;
|
||||
|
||||
@@ -40,7 +40,6 @@ import m34 from "./scriptsSqlite/1.15.3";
|
||||
import m35 from "./scriptsSqlite/1.15.4";
|
||||
import m36 from "./scriptsSqlite/1.16.0";
|
||||
import m37 from "./scriptsSqlite/1.17.0";
|
||||
import m38 from "./scriptsSqlite/1.18.0";
|
||||
|
||||
// THIS CANNOT IMPORT ANYTHING FROM THE SERVER
|
||||
// EXCEPT FOR THE DATABASE AND THE SCHEMA
|
||||
@@ -78,8 +77,7 @@ const migrations = [
|
||||
{ version: "1.15.3", run: m34 },
|
||||
{ version: "1.15.4", run: m35 },
|
||||
{ version: "1.16.0", run: m36 },
|
||||
{ version: "1.17.0", run: m37 },
|
||||
{ version: "1.18.0", run: m38 }
|
||||
{ version: "1.17.0", run: m37 }
|
||||
// Add new migrations here as they are created
|
||||
] as const;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user