mirror of
https://github.com/fosrl/pangolin.git
synced 2026-04-09 11:26:39 +00:00
Compare commits
5 Commits
1.17.0-s.3
...
breakout-s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
48abb9e98c | ||
|
|
28ef5238c9 | ||
|
|
d948d2ec33 | ||
|
|
6b8a3c8d77 | ||
|
|
ba9794c067 |
1
.github/CODEOWNERS
vendored
Normal file
1
.github/CODEOWNERS
vendored
Normal file
@@ -0,0 +1 @@
|
|||||||
|
* @oschwartz10612 @miloschwartz
|
||||||
@@ -86,6 +86,8 @@ entryPoints:
|
|||||||
http:
|
http:
|
||||||
tls:
|
tls:
|
||||||
certResolver: "letsencrypt"
|
certResolver: "letsencrypt"
|
||||||
|
middlewares:
|
||||||
|
- crowdsec@file
|
||||||
encodedCharacters:
|
encodedCharacters:
|
||||||
allowEncodedSlash: true
|
allowEncodedSlash: true
|
||||||
allowEncodedQuestionMark: true
|
allowEncodedQuestionMark: true
|
||||||
|
|||||||
@@ -89,12 +89,8 @@ export const sites = pgTable("sites", {
|
|||||||
name: varchar("name").notNull(),
|
name: varchar("name").notNull(),
|
||||||
pubKey: varchar("pubKey"),
|
pubKey: varchar("pubKey"),
|
||||||
subnet: varchar("subnet"),
|
subnet: varchar("subnet"),
|
||||||
megabytesIn: real("bytesIn").default(0),
|
|
||||||
megabytesOut: real("bytesOut").default(0),
|
|
||||||
lastBandwidthUpdate: varchar("lastBandwidthUpdate"),
|
|
||||||
type: varchar("type").notNull(), // "newt" or "wireguard"
|
type: varchar("type").notNull(), // "newt" or "wireguard"
|
||||||
online: boolean("online").notNull().default(false),
|
online: boolean("online").notNull().default(false),
|
||||||
lastPing: integer("lastPing"),
|
|
||||||
address: varchar("address"),
|
address: varchar("address"),
|
||||||
endpoint: varchar("endpoint"),
|
endpoint: varchar("endpoint"),
|
||||||
publicKey: varchar("publicKey"),
|
publicKey: varchar("publicKey"),
|
||||||
@@ -729,10 +725,7 @@ export const clients = pgTable("clients", {
|
|||||||
name: varchar("name").notNull(),
|
name: varchar("name").notNull(),
|
||||||
pubKey: varchar("pubKey"),
|
pubKey: varchar("pubKey"),
|
||||||
subnet: varchar("subnet").notNull(),
|
subnet: varchar("subnet").notNull(),
|
||||||
megabytesIn: real("bytesIn"),
|
|
||||||
megabytesOut: real("bytesOut"),
|
|
||||||
lastBandwidthUpdate: varchar("lastBandwidthUpdate"),
|
|
||||||
lastPing: integer("lastPing"),
|
|
||||||
type: varchar("type").notNull(), // "olm"
|
type: varchar("type").notNull(), // "olm"
|
||||||
online: boolean("online").notNull().default(false),
|
online: boolean("online").notNull().default(false),
|
||||||
// endpoint: varchar("endpoint"),
|
// endpoint: varchar("endpoint"),
|
||||||
@@ -745,6 +738,42 @@ export const clients = pgTable("clients", {
|
|||||||
>()
|
>()
|
||||||
});
|
});
|
||||||
|
|
||||||
|
export const sitePing = pgTable("sitePing", {
|
||||||
|
siteId: integer("siteId")
|
||||||
|
.primaryKey()
|
||||||
|
.references(() => sites.siteId, { onDelete: "cascade" })
|
||||||
|
.notNull(),
|
||||||
|
lastPing: integer("lastPing")
|
||||||
|
});
|
||||||
|
|
||||||
|
export const siteBandwidth = pgTable("siteBandwidth", {
|
||||||
|
siteId: integer("siteId")
|
||||||
|
.primaryKey()
|
||||||
|
.references(() => sites.siteId, { onDelete: "cascade" })
|
||||||
|
.notNull(),
|
||||||
|
megabytesIn: real("bytesIn").default(0),
|
||||||
|
megabytesOut: real("bytesOut").default(0),
|
||||||
|
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
|
||||||
|
});
|
||||||
|
|
||||||
|
export const clientPing = pgTable("clientPing", {
|
||||||
|
clientId: integer("clientId")
|
||||||
|
.primaryKey()
|
||||||
|
.references(() => clients.clientId, { onDelete: "cascade" })
|
||||||
|
.notNull(),
|
||||||
|
lastPing: integer("lastPing")
|
||||||
|
});
|
||||||
|
|
||||||
|
export const clientBandwidth = pgTable("clientBandwidth", {
|
||||||
|
clientId: integer("clientId")
|
||||||
|
.primaryKey()
|
||||||
|
.references(() => clients.clientId, { onDelete: "cascade" })
|
||||||
|
.notNull(),
|
||||||
|
megabytesIn: real("bytesIn"),
|
||||||
|
megabytesOut: real("bytesOut"),
|
||||||
|
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
|
||||||
|
});
|
||||||
|
|
||||||
export const clientSitesAssociationsCache = pgTable(
|
export const clientSitesAssociationsCache = pgTable(
|
||||||
"clientSitesAssociationsCache",
|
"clientSitesAssociationsCache",
|
||||||
{
|
{
|
||||||
@@ -1106,3 +1135,7 @@ export type RequestAuditLog = InferSelectModel<typeof requestAuditLog>;
|
|||||||
export type RoundTripMessageTracker = InferSelectModel<
|
export type RoundTripMessageTracker = InferSelectModel<
|
||||||
typeof roundTripMessageTracker
|
typeof roundTripMessageTracker
|
||||||
>;
|
>;
|
||||||
|
export type SitePing = typeof sitePing.$inferSelect;
|
||||||
|
export type SiteBandwidth = typeof siteBandwidth.$inferSelect;
|
||||||
|
export type ClientPing = typeof clientPing.$inferSelect;
|
||||||
|
export type ClientBandwidth = typeof clientBandwidth.$inferSelect;
|
||||||
|
|||||||
@@ -95,12 +95,8 @@ export const sites = sqliteTable("sites", {
|
|||||||
name: text("name").notNull(),
|
name: text("name").notNull(),
|
||||||
pubKey: text("pubKey"),
|
pubKey: text("pubKey"),
|
||||||
subnet: text("subnet"),
|
subnet: text("subnet"),
|
||||||
megabytesIn: integer("bytesIn").default(0),
|
|
||||||
megabytesOut: integer("bytesOut").default(0),
|
|
||||||
lastBandwidthUpdate: text("lastBandwidthUpdate"),
|
|
||||||
type: text("type").notNull(), // "newt" or "wireguard"
|
type: text("type").notNull(), // "newt" or "wireguard"
|
||||||
online: integer("online", { mode: "boolean" }).notNull().default(false),
|
online: integer("online", { mode: "boolean" }).notNull().default(false),
|
||||||
lastPing: integer("lastPing"),
|
|
||||||
|
|
||||||
// exit node stuff that is how to connect to the site when it has a wg server
|
// exit node stuff that is how to connect to the site when it has a wg server
|
||||||
address: text("address"), // this is the address of the wireguard interface in newt
|
address: text("address"), // this is the address of the wireguard interface in newt
|
||||||
@@ -399,10 +395,7 @@ export const clients = sqliteTable("clients", {
|
|||||||
pubKey: text("pubKey"),
|
pubKey: text("pubKey"),
|
||||||
olmId: text("olmId"), // to lock it to a specific olm optionally
|
olmId: text("olmId"), // to lock it to a specific olm optionally
|
||||||
subnet: text("subnet").notNull(),
|
subnet: text("subnet").notNull(),
|
||||||
megabytesIn: integer("bytesIn"),
|
|
||||||
megabytesOut: integer("bytesOut"),
|
|
||||||
lastBandwidthUpdate: text("lastBandwidthUpdate"),
|
|
||||||
lastPing: integer("lastPing"),
|
|
||||||
type: text("type").notNull(), // "olm"
|
type: text("type").notNull(), // "olm"
|
||||||
online: integer("online", { mode: "boolean" }).notNull().default(false),
|
online: integer("online", { mode: "boolean" }).notNull().default(false),
|
||||||
// endpoint: text("endpoint"),
|
// endpoint: text("endpoint"),
|
||||||
@@ -414,6 +407,42 @@ export const clients = sqliteTable("clients", {
|
|||||||
>()
|
>()
|
||||||
});
|
});
|
||||||
|
|
||||||
|
export const sitePing = sqliteTable("sitePing", {
|
||||||
|
siteId: integer("siteId")
|
||||||
|
.primaryKey()
|
||||||
|
.references(() => sites.siteId, { onDelete: "cascade" })
|
||||||
|
.notNull(),
|
||||||
|
lastPing: integer("lastPing")
|
||||||
|
});
|
||||||
|
|
||||||
|
export const siteBandwidth = sqliteTable("siteBandwidth", {
|
||||||
|
siteId: integer("siteId")
|
||||||
|
.primaryKey()
|
||||||
|
.references(() => sites.siteId, { onDelete: "cascade" })
|
||||||
|
.notNull(),
|
||||||
|
megabytesIn: integer("bytesIn").default(0),
|
||||||
|
megabytesOut: integer("bytesOut").default(0),
|
||||||
|
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
|
||||||
|
});
|
||||||
|
|
||||||
|
export const clientPing = sqliteTable("clientPing", {
|
||||||
|
clientId: integer("clientId")
|
||||||
|
.primaryKey()
|
||||||
|
.references(() => clients.clientId, { onDelete: "cascade" })
|
||||||
|
.notNull(),
|
||||||
|
lastPing: integer("lastPing")
|
||||||
|
});
|
||||||
|
|
||||||
|
export const clientBandwidth = sqliteTable("clientBandwidth", {
|
||||||
|
clientId: integer("clientId")
|
||||||
|
.primaryKey()
|
||||||
|
.references(() => clients.clientId, { onDelete: "cascade" })
|
||||||
|
.notNull(),
|
||||||
|
megabytesIn: integer("bytesIn"),
|
||||||
|
megabytesOut: integer("bytesOut"),
|
||||||
|
lastBandwidthUpdate: integer("lastBandwidthUpdate") // unix epoch
|
||||||
|
});
|
||||||
|
|
||||||
export const clientSitesAssociationsCache = sqliteTable(
|
export const clientSitesAssociationsCache = sqliteTable(
|
||||||
"clientSitesAssociationsCache",
|
"clientSitesAssociationsCache",
|
||||||
{
|
{
|
||||||
@@ -1209,3 +1238,7 @@ export type DeviceWebAuthCode = InferSelectModel<typeof deviceWebAuthCodes>;
|
|||||||
export type RoundTripMessageTracker = InferSelectModel<
|
export type RoundTripMessageTracker = InferSelectModel<
|
||||||
typeof roundTripMessageTracker
|
typeof roundTripMessageTracker
|
||||||
>;
|
>;
|
||||||
|
export type SitePing = typeof sitePing.$inferSelect;
|
||||||
|
export type SiteBandwidth = typeof siteBandwidth.$inferSelect;
|
||||||
|
export type ClientPing = typeof clientPing.$inferSelect;
|
||||||
|
export type ClientBandwidth = typeof clientBandwidth.$inferSelect;
|
||||||
|
|||||||
@@ -3,7 +3,7 @@ import config from "./config";
|
|||||||
import { getHostMeta } from "./hostMeta";
|
import { getHostMeta } from "./hostMeta";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { apiKeys, db, roles, siteResources } from "@server/db";
|
import { apiKeys, db, roles, siteResources } from "@server/db";
|
||||||
import { sites, users, orgs, resources, clients, idp } from "@server/db";
|
import { sites, users, orgs, resources, clients, idp, siteBandwidth } from "@server/db";
|
||||||
import { eq, count, notInArray, and, isNotNull, isNull } from "drizzle-orm";
|
import { eq, count, notInArray, and, isNotNull, isNull } from "drizzle-orm";
|
||||||
import { APP_VERSION } from "./consts";
|
import { APP_VERSION } from "./consts";
|
||||||
import crypto from "crypto";
|
import crypto from "crypto";
|
||||||
@@ -150,12 +150,13 @@ class TelemetryClient {
|
|||||||
const siteDetails = await db
|
const siteDetails = await db
|
||||||
.select({
|
.select({
|
||||||
siteName: sites.name,
|
siteName: sites.name,
|
||||||
megabytesIn: sites.megabytesIn,
|
megabytesIn: siteBandwidth.megabytesIn,
|
||||||
megabytesOut: sites.megabytesOut,
|
megabytesOut: siteBandwidth.megabytesOut,
|
||||||
type: sites.type,
|
type: sites.type,
|
||||||
online: sites.online
|
online: sites.online
|
||||||
})
|
})
|
||||||
.from(sites);
|
.from(sites)
|
||||||
|
.leftJoin(siteBandwidth, eq(siteBandwidth.siteId, sites.siteId));
|
||||||
|
|
||||||
const supporterKey = config.getSupporterData();
|
const supporterKey = config.getSupporterData();
|
||||||
|
|
||||||
|
|||||||
@@ -479,10 +479,7 @@ export async function getTraefikConfig(
|
|||||||
|
|
||||||
// TODO: HOW TO HANDLE ^^^^^^ BETTER
|
// TODO: HOW TO HANDLE ^^^^^^ BETTER
|
||||||
const anySitesOnline = targets.some(
|
const anySitesOnline = targets.some(
|
||||||
(target) =>
|
(target) => target.site.online
|
||||||
target.site.online ||
|
|
||||||
target.site.type === "local" ||
|
|
||||||
target.site.type === "wireguard"
|
|
||||||
);
|
);
|
||||||
|
|
||||||
return (
|
return (
|
||||||
@@ -610,10 +607,7 @@ export async function getTraefikConfig(
|
|||||||
servers: (() => {
|
servers: (() => {
|
||||||
// Check if any sites are online
|
// Check if any sites are online
|
||||||
const anySitesOnline = targets.some(
|
const anySitesOnline = targets.some(
|
||||||
(target) =>
|
(target) => target.site.online
|
||||||
target.site.online ||
|
|
||||||
target.site.type === "local" ||
|
|
||||||
target.site.type === "wireguard"
|
|
||||||
);
|
);
|
||||||
|
|
||||||
return targets
|
return targets
|
||||||
|
|||||||
@@ -671,10 +671,7 @@ export async function getTraefikConfig(
|
|||||||
|
|
||||||
// TODO: HOW TO HANDLE ^^^^^^ BETTER
|
// TODO: HOW TO HANDLE ^^^^^^ BETTER
|
||||||
const anySitesOnline = targets.some(
|
const anySitesOnline = targets.some(
|
||||||
(target) =>
|
(target) => target.site.online
|
||||||
target.site.online ||
|
|
||||||
target.site.type === "local" ||
|
|
||||||
target.site.type === "wireguard"
|
|
||||||
);
|
);
|
||||||
|
|
||||||
return (
|
return (
|
||||||
@@ -802,10 +799,7 @@ export async function getTraefikConfig(
|
|||||||
servers: (() => {
|
servers: (() => {
|
||||||
// Check if any sites are online
|
// Check if any sites are online
|
||||||
const anySitesOnline = targets.some(
|
const anySitesOnline = targets.some(
|
||||||
(target) =>
|
(target) => target.site.online
|
||||||
target.site.online ||
|
|
||||||
target.site.type === "local" ||
|
|
||||||
target.site.type === "wireguard"
|
|
||||||
);
|
);
|
||||||
|
|
||||||
return targets
|
return targets
|
||||||
|
|||||||
@@ -18,10 +18,11 @@ import {
|
|||||||
subscriptionItems,
|
subscriptionItems,
|
||||||
usage,
|
usage,
|
||||||
sites,
|
sites,
|
||||||
|
siteBandwidth,
|
||||||
customers,
|
customers,
|
||||||
orgs
|
orgs
|
||||||
} from "@server/db";
|
} from "@server/db";
|
||||||
import { eq, and } from "drizzle-orm";
|
import { eq, and, inArray } from "drizzle-orm";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { getFeatureIdByMetricId, getFeatureIdByPriceId } from "@server/lib/billing/features";
|
import { getFeatureIdByMetricId, getFeatureIdByPriceId } from "@server/lib/billing/features";
|
||||||
import stripe from "#private/lib/stripe";
|
import stripe from "#private/lib/stripe";
|
||||||
@@ -253,14 +254,19 @@ export async function handleSubscriptionUpdated(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Also reset the sites to 0
|
// Also reset the site bandwidth to 0
|
||||||
await trx
|
await trx
|
||||||
.update(sites)
|
.update(siteBandwidth)
|
||||||
.set({
|
.set({
|
||||||
megabytesIn: 0,
|
megabytesIn: 0,
|
||||||
megabytesOut: 0
|
megabytesOut: 0
|
||||||
})
|
})
|
||||||
.where(eq(sites.orgId, orgId));
|
.where(
|
||||||
|
inArray(
|
||||||
|
siteBandwidth.siteId,
|
||||||
|
trx.select({ siteId: sites.siteId }).from(sites).where(eq(sites.orgId, orgId))
|
||||||
|
)
|
||||||
|
);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
import {
|
import {
|
||||||
|
clientBandwidth,
|
||||||
clients,
|
clients,
|
||||||
clientSitesAssociationsCache,
|
clientSitesAssociationsCache,
|
||||||
currentFingerprint,
|
currentFingerprint,
|
||||||
@@ -180,8 +181,8 @@ function queryClientsBase() {
|
|||||||
name: clients.name,
|
name: clients.name,
|
||||||
pubKey: clients.pubKey,
|
pubKey: clients.pubKey,
|
||||||
subnet: clients.subnet,
|
subnet: clients.subnet,
|
||||||
megabytesIn: clients.megabytesIn,
|
megabytesIn: clientBandwidth.megabytesIn,
|
||||||
megabytesOut: clients.megabytesOut,
|
megabytesOut: clientBandwidth.megabytesOut,
|
||||||
orgName: orgs.name,
|
orgName: orgs.name,
|
||||||
type: clients.type,
|
type: clients.type,
|
||||||
online: clients.online,
|
online: clients.online,
|
||||||
@@ -200,7 +201,8 @@ function queryClientsBase() {
|
|||||||
.leftJoin(orgs, eq(clients.orgId, orgs.orgId))
|
.leftJoin(orgs, eq(clients.orgId, orgs.orgId))
|
||||||
.leftJoin(olms, eq(clients.clientId, olms.clientId))
|
.leftJoin(olms, eq(clients.clientId, olms.clientId))
|
||||||
.leftJoin(users, eq(clients.userId, users.userId))
|
.leftJoin(users, eq(clients.userId, users.userId))
|
||||||
.leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId));
|
.leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId))
|
||||||
|
.leftJoin(clientBandwidth, eq(clientBandwidth.clientId, clients.clientId));
|
||||||
}
|
}
|
||||||
|
|
||||||
async function getSiteAssociations(clientIds: number[]) {
|
async function getSiteAssociations(clientIds: number[]) {
|
||||||
@@ -367,9 +369,15 @@ export async function listClients(
|
|||||||
.offset(pageSize * (page - 1))
|
.offset(pageSize * (page - 1))
|
||||||
.orderBy(
|
.orderBy(
|
||||||
sort_by
|
sort_by
|
||||||
? order === "asc"
|
? (() => {
|
||||||
? asc(clients[sort_by])
|
const field =
|
||||||
: desc(clients[sort_by])
|
sort_by === "megabytesIn"
|
||||||
|
? clientBandwidth.megabytesIn
|
||||||
|
: sort_by === "megabytesOut"
|
||||||
|
? clientBandwidth.megabytesOut
|
||||||
|
: clients.name;
|
||||||
|
return order === "asc" ? asc(field) : desc(field);
|
||||||
|
})()
|
||||||
: asc(clients.name)
|
: asc(clients.name)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import { build } from "@server/build";
|
import { build } from "@server/build";
|
||||||
import {
|
import {
|
||||||
|
clientBandwidth,
|
||||||
clients,
|
clients,
|
||||||
currentFingerprint,
|
currentFingerprint,
|
||||||
db,
|
db,
|
||||||
@@ -211,8 +212,8 @@ function queryUserDevicesBase() {
|
|||||||
name: clients.name,
|
name: clients.name,
|
||||||
pubKey: clients.pubKey,
|
pubKey: clients.pubKey,
|
||||||
subnet: clients.subnet,
|
subnet: clients.subnet,
|
||||||
megabytesIn: clients.megabytesIn,
|
megabytesIn: clientBandwidth.megabytesIn,
|
||||||
megabytesOut: clients.megabytesOut,
|
megabytesOut: clientBandwidth.megabytesOut,
|
||||||
orgName: orgs.name,
|
orgName: orgs.name,
|
||||||
type: clients.type,
|
type: clients.type,
|
||||||
online: clients.online,
|
online: clients.online,
|
||||||
@@ -239,7 +240,8 @@ function queryUserDevicesBase() {
|
|||||||
.leftJoin(orgs, eq(clients.orgId, orgs.orgId))
|
.leftJoin(orgs, eq(clients.orgId, orgs.orgId))
|
||||||
.leftJoin(olms, eq(clients.clientId, olms.clientId))
|
.leftJoin(olms, eq(clients.clientId, olms.clientId))
|
||||||
.leftJoin(users, eq(clients.userId, users.userId))
|
.leftJoin(users, eq(clients.userId, users.userId))
|
||||||
.leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId));
|
.leftJoin(currentFingerprint, eq(olms.olmId, currentFingerprint.olmId))
|
||||||
|
.leftJoin(clientBandwidth, eq(clientBandwidth.clientId, clients.clientId));
|
||||||
}
|
}
|
||||||
|
|
||||||
type OlmWithUpdateAvailable = Awaited<
|
type OlmWithUpdateAvailable = Awaited<
|
||||||
@@ -427,9 +429,15 @@ export async function listUserDevices(
|
|||||||
.offset(pageSize * (page - 1))
|
.offset(pageSize * (page - 1))
|
||||||
.orderBy(
|
.orderBy(
|
||||||
sort_by
|
sort_by
|
||||||
? order === "asc"
|
? (() => {
|
||||||
? asc(clients[sort_by])
|
const field =
|
||||||
: desc(clients[sort_by])
|
sort_by === "megabytesIn"
|
||||||
|
? clientBandwidth.megabytesIn
|
||||||
|
: sort_by === "megabytesOut"
|
||||||
|
? clientBandwidth.megabytesOut
|
||||||
|
: clients.name;
|
||||||
|
return order === "asc" ? asc(field) : desc(field);
|
||||||
|
})()
|
||||||
: asc(clients.clientId)
|
: asc(clients.clientId)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -122,7 +122,7 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
|
|||||||
const snapshot = accumulator;
|
const snapshot = accumulator;
|
||||||
accumulator = new Map<string, AccumulatorEntry>();
|
accumulator = new Map<string, AccumulatorEntry>();
|
||||||
|
|
||||||
const currentTime = new Date().toISOString();
|
const currentEpoch = Math.floor(Date.now() / 1000);
|
||||||
|
|
||||||
// Sort by publicKey for consistent lock ordering across concurrent
|
// Sort by publicKey for consistent lock ordering across concurrent
|
||||||
// writers — deadlock-prevention strategy.
|
// writers — deadlock-prevention strategy.
|
||||||
@@ -157,33 +157,52 @@ export async function flushSiteBandwidthToDb(): Promise<void> {
|
|||||||
orgId: string;
|
orgId: string;
|
||||||
pubKey: string;
|
pubKey: string;
|
||||||
}>(sql`
|
}>(sql`
|
||||||
UPDATE sites
|
WITH upsert AS (
|
||||||
SET
|
INSERT INTO "siteBandwidth" ("siteId", "bytesIn", "bytesOut", "lastBandwidthUpdate")
|
||||||
"bytesOut" = COALESCE("bytesOut", 0) + ${bytesIn},
|
SELECT s."siteId", ${bytesIn}, ${bytesOut}, ${currentEpoch}
|
||||||
"bytesIn" = COALESCE("bytesIn", 0) + ${bytesOut},
|
FROM "sites" s WHERE s."pubKey" = ${publicKey}
|
||||||
"lastBandwidthUpdate" = ${currentTime}
|
ON CONFLICT ("siteId") DO UPDATE SET
|
||||||
WHERE "pubKey" = ${publicKey}
|
"bytesIn" = COALESCE("siteBandwidth"."bytesIn", 0) + EXCLUDED."bytesIn",
|
||||||
RETURNING "orgId", "pubKey"
|
"bytesOut" = COALESCE("siteBandwidth"."bytesOut", 0) + EXCLUDED."bytesOut",
|
||||||
|
"lastBandwidthUpdate" = EXCLUDED."lastBandwidthUpdate"
|
||||||
|
RETURNING "siteId"
|
||||||
|
)
|
||||||
|
SELECT u."siteId", s."orgId", s."pubKey"
|
||||||
|
FROM upsert u
|
||||||
|
INNER JOIN "sites" s ON s."siteId" = u."siteId"
|
||||||
`);
|
`);
|
||||||
results.push(...result);
|
results.push(...result);
|
||||||
}
|
}
|
||||||
return results;
|
return results;
|
||||||
}
|
}
|
||||||
|
|
||||||
// PostgreSQL: batch UPDATE … FROM (VALUES …) — single round-trip per chunk.
|
// PostgreSQL: batch UPSERT via CTE — single round-trip per chunk.
|
||||||
const valuesList = chunk.map(([publicKey, { bytesIn, bytesOut }]) =>
|
const valuesList = chunk.map(([publicKey, { bytesIn, bytesOut }]) =>
|
||||||
sql`(${publicKey}::text, ${bytesIn}::real, ${bytesOut}::real)`
|
sql`(${publicKey}::text, ${bytesIn}::real, ${bytesOut}::real)`
|
||||||
);
|
);
|
||||||
const valuesClause = sql.join(valuesList, sql`, `);
|
const valuesClause = sql.join(valuesList, sql`, `);
|
||||||
return dbQueryRows<{ orgId: string; pubKey: string }>(sql`
|
return dbQueryRows<{ orgId: string; pubKey: string }>(sql`
|
||||||
UPDATE sites
|
WITH vals(pub_key, bytes_in, bytes_out) AS (
|
||||||
SET
|
VALUES ${valuesClause}
|
||||||
"bytesOut" = COALESCE("bytesOut", 0) + v.bytes_in,
|
),
|
||||||
"bytesIn" = COALESCE("bytesIn", 0) + v.bytes_out,
|
site_lookup AS (
|
||||||
"lastBandwidthUpdate" = ${currentTime}
|
SELECT s."siteId", s."orgId", s."pubKey", v.bytes_in, v.bytes_out
|
||||||
FROM (VALUES ${valuesClause}) AS v(pub_key, bytes_in, bytes_out)
|
FROM vals v
|
||||||
WHERE sites."pubKey" = v.pub_key
|
INNER JOIN "sites" s ON s."pubKey" = v.pub_key
|
||||||
RETURNING sites."orgId" AS "orgId", sites."pubKey" AS "pubKey"
|
),
|
||||||
|
upsert AS (
|
||||||
|
INSERT INTO "siteBandwidth" ("siteId", "bytesIn", "bytesOut", "lastBandwidthUpdate")
|
||||||
|
SELECT sl."siteId", sl.bytes_in, sl.bytes_out, ${currentEpoch}::integer
|
||||||
|
FROM site_lookup sl
|
||||||
|
ON CONFLICT ("siteId") DO UPDATE SET
|
||||||
|
"bytesIn" = COALESCE("siteBandwidth"."bytesIn", 0) + EXCLUDED."bytesIn",
|
||||||
|
"bytesOut" = COALESCE("siteBandwidth"."bytesOut", 0) + EXCLUDED."bytesOut",
|
||||||
|
"lastBandwidthUpdate" = EXCLUDED."lastBandwidthUpdate"
|
||||||
|
RETURNING "siteId"
|
||||||
|
)
|
||||||
|
SELECT u."siteId", s."orgId", s."pubKey"
|
||||||
|
FROM upsert u
|
||||||
|
INNER JOIN "sites" s ON s."siteId" = u."siteId"
|
||||||
`);
|
`);
|
||||||
}, `flush bandwidth chunk [${i}–${chunkEnd}]`);
|
}, `flush bandwidth chunk [${i}–${chunkEnd}]`);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
|||||||
@@ -1,11 +1,11 @@
|
|||||||
import { db, newts, sites, targetHealthCheck, targets } from "@server/db";
|
import { db, newts, sites, targetHealthCheck, targets, sitePing, siteBandwidth } from "@server/db";
|
||||||
import {
|
import {
|
||||||
hasActiveConnections,
|
hasActiveConnections,
|
||||||
getClientConfigVersion
|
getClientConfigVersion
|
||||||
} from "#dynamic/routers/ws";
|
} from "#dynamic/routers/ws";
|
||||||
import { MessageHandler } from "@server/routers/ws";
|
import { MessageHandler } from "@server/routers/ws";
|
||||||
import { Newt } from "@server/db";
|
import { Newt } from "@server/db";
|
||||||
import { eq, lt, isNull, and, or, ne, not } from "drizzle-orm";
|
import { eq, lt, isNull, and, or, ne } from "drizzle-orm";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { sendNewtSyncMessage } from "./sync";
|
import { sendNewtSyncMessage } from "./sync";
|
||||||
import { recordPing } from "./pingAccumulator";
|
import { recordPing } from "./pingAccumulator";
|
||||||
@@ -41,17 +41,18 @@ export const startNewtOfflineChecker = (): void => {
|
|||||||
.select({
|
.select({
|
||||||
siteId: sites.siteId,
|
siteId: sites.siteId,
|
||||||
newtId: newts.newtId,
|
newtId: newts.newtId,
|
||||||
lastPing: sites.lastPing
|
lastPing: sitePing.lastPing
|
||||||
})
|
})
|
||||||
.from(sites)
|
.from(sites)
|
||||||
.innerJoin(newts, eq(newts.siteId, sites.siteId))
|
.innerJoin(newts, eq(newts.siteId, sites.siteId))
|
||||||
|
.leftJoin(sitePing, eq(sitePing.siteId, sites.siteId))
|
||||||
.where(
|
.where(
|
||||||
and(
|
and(
|
||||||
eq(sites.online, true),
|
eq(sites.online, true),
|
||||||
eq(sites.type, "newt"),
|
eq(sites.type, "newt"),
|
||||||
or(
|
or(
|
||||||
lt(sites.lastPing, twoMinutesAgo),
|
lt(sitePing.lastPing, twoMinutesAgo),
|
||||||
isNull(sites.lastPing)
|
isNull(sitePing.lastPing)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
@@ -112,15 +113,11 @@ export const startNewtOfflineChecker = (): void => {
|
|||||||
.select({
|
.select({
|
||||||
siteId: sites.siteId,
|
siteId: sites.siteId,
|
||||||
online: sites.online,
|
online: sites.online,
|
||||||
lastBandwidthUpdate: sites.lastBandwidthUpdate
|
lastBandwidthUpdate: siteBandwidth.lastBandwidthUpdate
|
||||||
})
|
})
|
||||||
.from(sites)
|
.from(sites)
|
||||||
.where(
|
.innerJoin(siteBandwidth, eq(siteBandwidth.siteId, sites.siteId))
|
||||||
and(
|
.where(eq(sites.type, "wireguard"));
|
||||||
eq(sites.type, "wireguard"),
|
|
||||||
not(isNull(sites.lastBandwidthUpdate))
|
|
||||||
)
|
|
||||||
);
|
|
||||||
|
|
||||||
const wireguardOfflineThreshold = Math.floor(
|
const wireguardOfflineThreshold = Math.floor(
|
||||||
(Date.now() - OFFLINE_THRESHOLD_BANDWIDTH_MS) / 1000
|
(Date.now() - OFFLINE_THRESHOLD_BANDWIDTH_MS) / 1000
|
||||||
@@ -128,12 +125,7 @@ export const startNewtOfflineChecker = (): void => {
|
|||||||
|
|
||||||
// loop over each one. If its offline and there is a new update then mark it online. If its online and there is no update then mark it offline
|
// loop over each one. If its offline and there is a new update then mark it online. If its online and there is no update then mark it offline
|
||||||
for (const site of allWireguardSites) {
|
for (const site of allWireguardSites) {
|
||||||
const lastBandwidthUpdate =
|
if ((site.lastBandwidthUpdate ?? 0) < wireguardOfflineThreshold && site.online) {
|
||||||
new Date(site.lastBandwidthUpdate!).getTime() / 1000;
|
|
||||||
if (
|
|
||||||
lastBandwidthUpdate < wireguardOfflineThreshold &&
|
|
||||||
site.online
|
|
||||||
) {
|
|
||||||
logger.info(
|
logger.info(
|
||||||
`Marking wireguard site ${site.siteId} offline: no bandwidth update in over ${OFFLINE_THRESHOLD_BANDWIDTH_MS / 60000} minutes`
|
`Marking wireguard site ${site.siteId} offline: no bandwidth update in over ${OFFLINE_THRESHOLD_BANDWIDTH_MS / 60000} minutes`
|
||||||
);
|
);
|
||||||
@@ -142,10 +134,7 @@ export const startNewtOfflineChecker = (): void => {
|
|||||||
.update(sites)
|
.update(sites)
|
||||||
.set({ online: false })
|
.set({ online: false })
|
||||||
.where(eq(sites.siteId, site.siteId));
|
.where(eq(sites.siteId, site.siteId));
|
||||||
} else if (
|
} else if ((site.lastBandwidthUpdate ?? 0) >= wireguardOfflineThreshold && !site.online) {
|
||||||
lastBandwidthUpdate >= wireguardOfflineThreshold &&
|
|
||||||
!site.online
|
|
||||||
) {
|
|
||||||
logger.info(
|
logger.info(
|
||||||
`Marking wireguard site ${site.siteId} online: recent bandwidth update`
|
`Marking wireguard site ${site.siteId} online: recent bandwidth update`
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
import { db } from "@server/db";
|
import { db, clients, clientBandwidth } from "@server/db";
|
||||||
import { MessageHandler } from "@server/routers/ws";
|
import { MessageHandler } from "@server/routers/ws";
|
||||||
import { clients } from "@server/db";
|
|
||||||
import { eq, sql } from "drizzle-orm";
|
import { eq, sql } from "drizzle-orm";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
|
|
||||||
@@ -85,7 +84,7 @@ export async function flushBandwidthToDb(): Promise<void> {
|
|||||||
const snapshot = accumulator;
|
const snapshot = accumulator;
|
||||||
accumulator = new Map<string, BandwidthAccumulator>();
|
accumulator = new Map<string, BandwidthAccumulator>();
|
||||||
|
|
||||||
const currentTime = new Date().toISOString();
|
const currentEpoch = Math.floor(Date.now() / 1000);
|
||||||
|
|
||||||
// Sort by publicKey for consistent lock ordering across concurrent
|
// Sort by publicKey for consistent lock ordering across concurrent
|
||||||
// writers — this is the same deadlock-prevention strategy used in the
|
// writers — this is the same deadlock-prevention strategy used in the
|
||||||
@@ -101,19 +100,37 @@ export async function flushBandwidthToDb(): Promise<void> {
|
|||||||
for (const [publicKey, { bytesIn, bytesOut }] of sortedEntries) {
|
for (const [publicKey, { bytesIn, bytesOut }] of sortedEntries) {
|
||||||
try {
|
try {
|
||||||
await withDeadlockRetry(async () => {
|
await withDeadlockRetry(async () => {
|
||||||
// Use atomic SQL increment to avoid the SELECT-then-UPDATE
|
// Find clientId by pubKey
|
||||||
// anti-pattern and the races it would introduce.
|
const [clientRow] = await db
|
||||||
|
.select({ clientId: clients.clientId })
|
||||||
|
.from(clients)
|
||||||
|
.where(eq(clients.pubKey, publicKey))
|
||||||
|
.limit(1);
|
||||||
|
|
||||||
|
if (!clientRow) {
|
||||||
|
logger.warn(`No client found for pubKey ${publicKey}, skipping`);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
await db
|
await db
|
||||||
.update(clients)
|
.insert(clientBandwidth)
|
||||||
.set({
|
.values({
|
||||||
|
clientId: clientRow.clientId,
|
||||||
// Note: bytesIn from peer goes to megabytesOut (data
|
// Note: bytesIn from peer goes to megabytesOut (data
|
||||||
// sent to client) and bytesOut from peer goes to
|
// sent to client) and bytesOut from peer goes to
|
||||||
// megabytesIn (data received from client).
|
// megabytesIn (data received from client).
|
||||||
megabytesOut: sql`COALESCE(${clients.megabytesOut}, 0) + ${bytesIn}`,
|
megabytesOut: bytesIn,
|
||||||
megabytesIn: sql`COALESCE(${clients.megabytesIn}, 0) + ${bytesOut}`,
|
megabytesIn: bytesOut,
|
||||||
lastBandwidthUpdate: currentTime
|
lastBandwidthUpdate: currentEpoch
|
||||||
})
|
})
|
||||||
.where(eq(clients.pubKey, publicKey));
|
.onConflictDoUpdate({
|
||||||
|
target: clientBandwidth.clientId,
|
||||||
|
set: {
|
||||||
|
megabytesOut: sql`COALESCE(${clientBandwidth.megabytesOut}, 0) + ${bytesIn}`,
|
||||||
|
megabytesIn: sql`COALESCE(${clientBandwidth.megabytesIn}, 0) + ${bytesOut}`,
|
||||||
|
lastBandwidthUpdate: currentEpoch
|
||||||
|
}
|
||||||
|
});
|
||||||
}, `flush bandwidth for client ${publicKey}`);
|
}, `flush bandwidth for client ${publicKey}`);
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(
|
logger.error(
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
import { db } from "@server/db";
|
import { db } from "@server/db";
|
||||||
import { sites, clients, olms } from "@server/db";
|
import { sites, clients, olms, sitePing, clientPing } from "@server/db";
|
||||||
import { eq, inArray } from "drizzle-orm";
|
import { inArray, sql } from "drizzle-orm";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -21,7 +21,7 @@ import logger from "@server/logger";
|
|||||||
*/
|
*/
|
||||||
|
|
||||||
const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds
|
const FLUSH_INTERVAL_MS = 10_000; // Flush every 10 seconds
|
||||||
const MAX_RETRIES = 2;
|
const MAX_RETRIES = 5;
|
||||||
const BASE_DELAY_MS = 50;
|
const BASE_DELAY_MS = 50;
|
||||||
|
|
||||||
// ── Site (newt) pings ──────────────────────────────────────────────────
|
// ── Site (newt) pings ──────────────────────────────────────────────────
|
||||||
@@ -36,6 +36,14 @@ const pendingOlmArchiveResets: Set<string> = new Set();
|
|||||||
|
|
||||||
let flushTimer: NodeJS.Timeout | null = null;
|
let flushTimer: NodeJS.Timeout | null = null;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Guard that prevents two flush cycles from running concurrently.
|
||||||
|
* setInterval does not await async callbacks, so without this a slow flush
|
||||||
|
* (e.g. due to DB latency) would overlap with the next scheduled cycle and
|
||||||
|
* the two concurrent bulk UPDATEs would deadlock each other.
|
||||||
|
*/
|
||||||
|
let isFlushing = false;
|
||||||
|
|
||||||
// ── Public API ─────────────────────────────────────────────────────────
|
// ── Public API ─────────────────────────────────────────────────────────
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -72,6 +80,9 @@ export function recordClientPing(
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Flush all accumulated site pings to the database.
|
* Flush all accumulated site pings to the database.
|
||||||
|
*
|
||||||
|
* For each batch: first upserts individual per-site timestamps into
|
||||||
|
* `sitePing`, then bulk-updates `sites.online = true`.
|
||||||
*/
|
*/
|
||||||
async function flushSitePingsToDb(): Promise<void> {
|
async function flushSitePingsToDb(): Promise<void> {
|
||||||
if (pendingSitePings.size === 0) {
|
if (pendingSitePings.size === 0) {
|
||||||
@@ -83,55 +94,40 @@ async function flushSitePingsToDb(): Promise<void> {
|
|||||||
const pingsToFlush = new Map(pendingSitePings);
|
const pingsToFlush = new Map(pendingSitePings);
|
||||||
pendingSitePings.clear();
|
pendingSitePings.clear();
|
||||||
|
|
||||||
// Sort by siteId for consistent lock ordering (prevents deadlocks)
|
const entries = Array.from(pingsToFlush.entries());
|
||||||
const sortedEntries = Array.from(pingsToFlush.entries()).sort(
|
|
||||||
([a], [b]) => a - b
|
|
||||||
);
|
|
||||||
|
|
||||||
const BATCH_SIZE = 50;
|
const BATCH_SIZE = 50;
|
||||||
for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
|
for (let i = 0; i < entries.length; i += BATCH_SIZE) {
|
||||||
const batch = sortedEntries.slice(i, i + BATCH_SIZE);
|
const batch = entries.slice(i, i + BATCH_SIZE);
|
||||||
|
|
||||||
|
const siteIds = batch.map(([id]) => id);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await withRetry(async () => {
|
await withRetry(async () => {
|
||||||
// Group by timestamp for efficient bulk updates
|
const rows = batch.map(([siteId, ts]) => ({ siteId, lastPing: ts }));
|
||||||
const byTimestamp = new Map<number, number[]>();
|
|
||||||
for (const [siteId, timestamp] of batch) {
|
|
||||||
const group = byTimestamp.get(timestamp) || [];
|
|
||||||
group.push(siteId);
|
|
||||||
byTimestamp.set(timestamp, group);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (byTimestamp.size === 1) {
|
// Step 1: Upsert ping timestamps into sitePing
|
||||||
const [timestamp, siteIds] = Array.from(
|
await db
|
||||||
byTimestamp.entries()
|
.insert(sitePing)
|
||||||
)[0];
|
.values(rows)
|
||||||
|
.onConflictDoUpdate({
|
||||||
|
target: sitePing.siteId,
|
||||||
|
set: { lastPing: sql`excluded."lastPing"` }
|
||||||
|
});
|
||||||
|
|
||||||
|
// Step 2: Update online status on sites
|
||||||
await db
|
await db
|
||||||
.update(sites)
|
.update(sites)
|
||||||
.set({
|
.set({ online: true })
|
||||||
online: true,
|
|
||||||
lastPing: timestamp
|
|
||||||
})
|
|
||||||
.where(inArray(sites.siteId, siteIds));
|
.where(inArray(sites.siteId, siteIds));
|
||||||
} else {
|
|
||||||
await db.transaction(async (tx) => {
|
|
||||||
for (const [timestamp, siteIds] of byTimestamp) {
|
|
||||||
await tx
|
|
||||||
.update(sites)
|
|
||||||
.set({
|
|
||||||
online: true,
|
|
||||||
lastPing: timestamp
|
|
||||||
})
|
|
||||||
.where(inArray(sites.siteId, siteIds));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}, "flushSitePingsToDb");
|
}, "flushSitePingsToDb");
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(
|
logger.error(
|
||||||
`Failed to flush site ping batch (${batch.length} sites), re-queuing for next cycle`,
|
`Failed to flush site ping batch (${batch.length} sites), re-queuing for next cycle`,
|
||||||
{ error }
|
{ error }
|
||||||
);
|
);
|
||||||
|
// Re-queue only if the preserved timestamp is newer than any
|
||||||
|
// update that may have landed since we snapshotted.
|
||||||
for (const [siteId, timestamp] of batch) {
|
for (const [siteId, timestamp] of batch) {
|
||||||
const existing = pendingSitePings.get(siteId);
|
const existing = pendingSitePings.get(siteId);
|
||||||
if (!existing || existing < timestamp) {
|
if (!existing || existing < timestamp) {
|
||||||
@@ -144,6 +140,9 @@ async function flushSitePingsToDb(): Promise<void> {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Flush all accumulated client (OLM) pings to the database.
|
* Flush all accumulated client (OLM) pings to the database.
|
||||||
|
*
|
||||||
|
* For each batch: first upserts individual per-client timestamps into
|
||||||
|
* `clientPing`, then bulk-updates `clients.online = true, archived = false`.
|
||||||
*/
|
*/
|
||||||
async function flushClientPingsToDb(): Promise<void> {
|
async function flushClientPingsToDb(): Promise<void> {
|
||||||
if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) {
|
if (pendingClientPings.size === 0 && pendingOlmArchiveResets.size === 0) {
|
||||||
@@ -159,51 +158,32 @@ async function flushClientPingsToDb(): Promise<void> {
|
|||||||
|
|
||||||
// ── Flush client pings ─────────────────────────────────────────────
|
// ── Flush client pings ─────────────────────────────────────────────
|
||||||
if (pingsToFlush.size > 0) {
|
if (pingsToFlush.size > 0) {
|
||||||
const sortedEntries = Array.from(pingsToFlush.entries()).sort(
|
const entries = Array.from(pingsToFlush.entries());
|
||||||
([a], [b]) => a - b
|
|
||||||
);
|
|
||||||
|
|
||||||
const BATCH_SIZE = 50;
|
const BATCH_SIZE = 50;
|
||||||
for (let i = 0; i < sortedEntries.length; i += BATCH_SIZE) {
|
for (let i = 0; i < entries.length; i += BATCH_SIZE) {
|
||||||
const batch = sortedEntries.slice(i, i + BATCH_SIZE);
|
const batch = entries.slice(i, i + BATCH_SIZE);
|
||||||
|
|
||||||
|
const clientIds = batch.map(([id]) => id);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await withRetry(async () => {
|
await withRetry(async () => {
|
||||||
const byTimestamp = new Map<number, number[]>();
|
const rows = batch.map(([clientId, ts]) => ({ clientId, lastPing: ts }));
|
||||||
for (const [clientId, timestamp] of batch) {
|
|
||||||
const group = byTimestamp.get(timestamp) || [];
|
|
||||||
group.push(clientId);
|
|
||||||
byTimestamp.set(timestamp, group);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (byTimestamp.size === 1) {
|
// Step 1: Upsert ping timestamps into clientPing
|
||||||
const [timestamp, clientIds] = Array.from(
|
await db
|
||||||
byTimestamp.entries()
|
.insert(clientPing)
|
||||||
)[0];
|
.values(rows)
|
||||||
|
.onConflictDoUpdate({
|
||||||
|
target: clientPing.clientId,
|
||||||
|
set: { lastPing: sql`excluded."lastPing"` }
|
||||||
|
});
|
||||||
|
|
||||||
|
// Step 2: Update online + unarchive on clients
|
||||||
await db
|
await db
|
||||||
.update(clients)
|
.update(clients)
|
||||||
.set({
|
.set({ online: true, archived: false })
|
||||||
lastPing: timestamp,
|
|
||||||
online: true,
|
|
||||||
archived: false
|
|
||||||
})
|
|
||||||
.where(inArray(clients.clientId, clientIds));
|
.where(inArray(clients.clientId, clientIds));
|
||||||
} else {
|
|
||||||
await db.transaction(async (tx) => {
|
|
||||||
for (const [timestamp, clientIds] of byTimestamp) {
|
|
||||||
await tx
|
|
||||||
.update(clients)
|
|
||||||
.set({
|
|
||||||
lastPing: timestamp,
|
|
||||||
online: true,
|
|
||||||
archived: false
|
|
||||||
})
|
|
||||||
.where(
|
|
||||||
inArray(clients.clientId, clientIds)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}, "flushClientPingsToDb");
|
}, "flushClientPingsToDb");
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error(
|
logger.error(
|
||||||
@@ -260,7 +240,12 @@ export async function flushPingsToDb(): Promise<void> {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Simple retry wrapper with exponential backoff for transient errors
|
* Simple retry wrapper with exponential backoff for transient errors
|
||||||
* (connection timeouts, unexpected disconnects).
|
* (deadlocks, connection timeouts, unexpected disconnects).
|
||||||
|
*
|
||||||
|
* PostgreSQL deadlocks (40P01) are always safe to retry: the database
|
||||||
|
* guarantees exactly one winner per deadlock pair, so the loser just needs
|
||||||
|
* to try again. MAX_RETRIES is intentionally higher than typical connection
|
||||||
|
* retry budgets to give deadlock victims enough chances to succeed.
|
||||||
*/
|
*/
|
||||||
async function withRetry<T>(
|
async function withRetry<T>(
|
||||||
operation: () => Promise<T>,
|
operation: () => Promise<T>,
|
||||||
@@ -277,7 +262,8 @@ async function withRetry<T>(
|
|||||||
const jitter = Math.random() * baseDelay;
|
const jitter = Math.random() * baseDelay;
|
||||||
const delay = baseDelay + jitter;
|
const delay = baseDelay + jitter;
|
||||||
logger.warn(
|
logger.warn(
|
||||||
`Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`
|
`Transient DB error in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`,
|
||||||
|
{ code: error?.code ?? error?.cause?.code }
|
||||||
);
|
);
|
||||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
await new Promise((resolve) => setTimeout(resolve, delay));
|
||||||
continue;
|
continue;
|
||||||
@@ -288,14 +274,14 @@ async function withRetry<T>(
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Detect transient connection errors that are safe to retry.
|
* Detect transient errors that are safe to retry.
|
||||||
*/
|
*/
|
||||||
function isTransientError(error: any): boolean {
|
function isTransientError(error: any): boolean {
|
||||||
if (!error) return false;
|
if (!error) return false;
|
||||||
|
|
||||||
const message = (error.message || "").toLowerCase();
|
const message = (error.message || "").toLowerCase();
|
||||||
const causeMessage = (error.cause?.message || "").toLowerCase();
|
const causeMessage = (error.cause?.message || "").toLowerCase();
|
||||||
const code = error.code || "";
|
const code = error.code || error.cause?.code || "";
|
||||||
|
|
||||||
// Connection timeout / terminated
|
// Connection timeout / terminated
|
||||||
if (
|
if (
|
||||||
@@ -308,12 +294,17 @@ function isTransientError(error: any): boolean {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// PostgreSQL deadlock
|
// PostgreSQL deadlock detected — always safe to retry (one winner guaranteed)
|
||||||
if (code === "40P01" || message.includes("deadlock")) {
|
if (code === "40P01" || message.includes("deadlock")) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// ECONNRESET, ECONNREFUSED, EPIPE
|
// PostgreSQL serialization failure
|
||||||
|
if (code === "40001") {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ECONNRESET, ECONNREFUSED, EPIPE, ETIMEDOUT
|
||||||
if (
|
if (
|
||||||
code === "ECONNRESET" ||
|
code === "ECONNRESET" ||
|
||||||
code === "ECONNREFUSED" ||
|
code === "ECONNREFUSED" ||
|
||||||
@@ -337,12 +328,26 @@ export function startPingAccumulator(): void {
|
|||||||
}
|
}
|
||||||
|
|
||||||
flushTimer = setInterval(async () => {
|
flushTimer = setInterval(async () => {
|
||||||
|
// Skip this tick if the previous flush is still in progress.
|
||||||
|
// setInterval does not await async callbacks, so without this guard
|
||||||
|
// two flush cycles can run concurrently and deadlock each other on
|
||||||
|
// overlapping bulk UPDATE statements.
|
||||||
|
if (isFlushing) {
|
||||||
|
logger.debug(
|
||||||
|
"Ping accumulator: previous flush still in progress, skipping cycle"
|
||||||
|
);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
isFlushing = true;
|
||||||
try {
|
try {
|
||||||
await flushPingsToDb();
|
await flushPingsToDb();
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error("Unhandled error in ping accumulator flush", {
|
logger.error("Unhandled error in ping accumulator flush", {
|
||||||
error
|
error
|
||||||
});
|
});
|
||||||
|
} finally {
|
||||||
|
isFlushing = false;
|
||||||
}
|
}
|
||||||
}, FLUSH_INTERVAL_MS);
|
}, FLUSH_INTERVAL_MS);
|
||||||
|
|
||||||
@@ -364,7 +369,22 @@ export async function stopPingAccumulator(): Promise<void> {
|
|||||||
flushTimer = null;
|
flushTimer = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Final flush to persist any remaining pings
|
// Final flush to persist any remaining pings.
|
||||||
|
// Wait for any in-progress flush to finish first so we don't race.
|
||||||
|
if (isFlushing) {
|
||||||
|
logger.debug(
|
||||||
|
"Ping accumulator: waiting for in-progress flush before stopping…"
|
||||||
|
);
|
||||||
|
await new Promise<void>((resolve) => {
|
||||||
|
const poll = setInterval(() => {
|
||||||
|
if (!isFlushing) {
|
||||||
|
clearInterval(poll);
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
}, 50);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await flushPingsToDb();
|
await flushPingsToDb();
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
|
|||||||
@@ -1,8 +1,8 @@
|
|||||||
import { disconnectClient, getClientConfigVersion } from "#dynamic/routers/ws";
|
import { disconnectClient, getClientConfigVersion } from "#dynamic/routers/ws";
|
||||||
import { db } from "@server/db";
|
import { db } from "@server/db";
|
||||||
import { MessageHandler } from "@server/routers/ws";
|
import { MessageHandler } from "@server/routers/ws";
|
||||||
import { clients, olms, Olm } from "@server/db";
|
import { clients, olms, Olm, clientPing } from "@server/db";
|
||||||
import { eq, lt, isNull, and, or } from "drizzle-orm";
|
import { eq, lt, isNull, and, or, inArray } from "drizzle-orm";
|
||||||
import { recordClientPing } from "@server/routers/newt/pingAccumulator";
|
import { recordClientPing } from "@server/routers/newt/pingAccumulator";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { validateSessionToken } from "@server/auth/sessions/app";
|
import { validateSessionToken } from "@server/auth/sessions/app";
|
||||||
@@ -37,21 +37,33 @@ export const startOlmOfflineChecker = (): void => {
|
|||||||
// TODO: WE NEED TO MAKE SURE THIS WORKS WITH DISTRIBUTED NODES ALL DOING THE SAME THING
|
// TODO: WE NEED TO MAKE SURE THIS WORKS WITH DISTRIBUTED NODES ALL DOING THE SAME THING
|
||||||
|
|
||||||
// Find clients that haven't pinged in the last 2 minutes and mark them as offline
|
// Find clients that haven't pinged in the last 2 minutes and mark them as offline
|
||||||
const offlineClients = await db
|
const staleClientRows = await db
|
||||||
.update(clients)
|
.select({
|
||||||
.set({ online: false })
|
clientId: clients.clientId,
|
||||||
|
olmId: clients.olmId,
|
||||||
|
lastPing: clientPing.lastPing
|
||||||
|
})
|
||||||
|
.from(clients)
|
||||||
|
.leftJoin(clientPing, eq(clientPing.clientId, clients.clientId))
|
||||||
.where(
|
.where(
|
||||||
and(
|
and(
|
||||||
eq(clients.online, true),
|
eq(clients.online, true),
|
||||||
or(
|
or(
|
||||||
lt(clients.lastPing, twoMinutesAgo),
|
lt(clientPing.lastPing, twoMinutesAgo),
|
||||||
isNull(clients.lastPing)
|
isNull(clientPing.lastPing)
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
)
|
);
|
||||||
.returning();
|
|
||||||
|
|
||||||
for (const offlineClient of offlineClients) {
|
if (staleClientRows.length > 0) {
|
||||||
|
const staleClientIds = staleClientRows.map((c) => c.clientId);
|
||||||
|
await db
|
||||||
|
.update(clients)
|
||||||
|
.set({ online: false })
|
||||||
|
.where(inArray(clients.clientId, staleClientIds));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (const offlineClient of staleClientRows) {
|
||||||
logger.info(
|
logger.info(
|
||||||
`Kicking offline olm client ${offlineClient.clientId} due to inactivity`
|
`Kicking offline olm client ${offlineClient.clientId} due to inactivity`
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
import { NextFunction, Request, Response } from "express";
|
import { NextFunction, Request, Response } from "express";
|
||||||
import { z } from "zod";
|
import { z } from "zod";
|
||||||
import { db, sites } from "@server/db";
|
import { db, sites, siteBandwidth } from "@server/db";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq, inArray } from "drizzle-orm";
|
||||||
import response from "@server/lib/response";
|
import response from "@server/lib/response";
|
||||||
import HttpCode from "@server/types/HttpCode";
|
import HttpCode from "@server/types/HttpCode";
|
||||||
import createHttpError from "http-errors";
|
import createHttpError from "http-errors";
|
||||||
@@ -60,12 +60,17 @@ export async function resetOrgBandwidth(
|
|||||||
}
|
}
|
||||||
|
|
||||||
await db
|
await db
|
||||||
.update(sites)
|
.update(siteBandwidth)
|
||||||
.set({
|
.set({
|
||||||
megabytesIn: 0,
|
megabytesIn: 0,
|
||||||
megabytesOut: 0
|
megabytesOut: 0
|
||||||
})
|
})
|
||||||
.where(eq(sites.orgId, orgId));
|
.where(
|
||||||
|
inArray(
|
||||||
|
siteBandwidth.siteId,
|
||||||
|
db.select({ siteId: sites.siteId }).from(sites).where(eq(sites.orgId, orgId))
|
||||||
|
)
|
||||||
|
);
|
||||||
|
|
||||||
return response(res, {
|
return response(res, {
|
||||||
data: {},
|
data: {},
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import {
|
|||||||
remoteExitNodes,
|
remoteExitNodes,
|
||||||
roleSites,
|
roleSites,
|
||||||
sites,
|
sites,
|
||||||
|
siteBandwidth,
|
||||||
userSites
|
userSites
|
||||||
} from "@server/db";
|
} from "@server/db";
|
||||||
import cache from "#dynamic/lib/cache";
|
import cache from "#dynamic/lib/cache";
|
||||||
@@ -155,8 +156,8 @@ function querySitesBase() {
|
|||||||
name: sites.name,
|
name: sites.name,
|
||||||
pubKey: sites.pubKey,
|
pubKey: sites.pubKey,
|
||||||
subnet: sites.subnet,
|
subnet: sites.subnet,
|
||||||
megabytesIn: sites.megabytesIn,
|
megabytesIn: siteBandwidth.megabytesIn,
|
||||||
megabytesOut: sites.megabytesOut,
|
megabytesOut: siteBandwidth.megabytesOut,
|
||||||
orgName: orgs.name,
|
orgName: orgs.name,
|
||||||
type: sites.type,
|
type: sites.type,
|
||||||
online: sites.online,
|
online: sites.online,
|
||||||
@@ -175,7 +176,8 @@ function querySitesBase() {
|
|||||||
.leftJoin(
|
.leftJoin(
|
||||||
remoteExitNodes,
|
remoteExitNodes,
|
||||||
eq(remoteExitNodes.exitNodeId, sites.exitNodeId)
|
eq(remoteExitNodes.exitNodeId, sites.exitNodeId)
|
||||||
);
|
)
|
||||||
|
.leftJoin(siteBandwidth, eq(siteBandwidth.siteId, sites.siteId));
|
||||||
}
|
}
|
||||||
|
|
||||||
type SiteWithUpdateAvailable = Awaited<ReturnType<typeof querySitesBase>>[0] & {
|
type SiteWithUpdateAvailable = Awaited<ReturnType<typeof querySitesBase>>[0] & {
|
||||||
@@ -299,9 +301,15 @@ export async function listSites(
|
|||||||
.offset(pageSize * (page - 1))
|
.offset(pageSize * (page - 1))
|
||||||
.orderBy(
|
.orderBy(
|
||||||
sort_by
|
sort_by
|
||||||
? order === "asc"
|
? (() => {
|
||||||
? asc(sites[sort_by])
|
const field =
|
||||||
: desc(sites[sort_by])
|
sort_by === "megabytesIn"
|
||||||
|
? siteBandwidth.megabytesIn
|
||||||
|
: sort_by === "megabytesOut"
|
||||||
|
? siteBandwidth.megabytesOut
|
||||||
|
: sites.name;
|
||||||
|
return order === "asc" ? asc(field) : desc(field);
|
||||||
|
})()
|
||||||
: asc(sites.name)
|
: asc(sites.name)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ import m13 from "./scriptsPg/1.15.3";
|
|||||||
import m14 from "./scriptsPg/1.15.4";
|
import m14 from "./scriptsPg/1.15.4";
|
||||||
import m15 from "./scriptsPg/1.16.0";
|
import m15 from "./scriptsPg/1.16.0";
|
||||||
import m16 from "./scriptsPg/1.17.0";
|
import m16 from "./scriptsPg/1.17.0";
|
||||||
|
import m17 from "./scriptsPg/1.18.0";
|
||||||
|
|
||||||
// THIS CANNOT IMPORT ANYTHING FROM THE SERVER
|
// THIS CANNOT IMPORT ANYTHING FROM THE SERVER
|
||||||
// EXCEPT FOR THE DATABASE AND THE SCHEMA
|
// EXCEPT FOR THE DATABASE AND THE SCHEMA
|
||||||
@@ -43,7 +44,8 @@ const migrations = [
|
|||||||
{ version: "1.15.3", run: m13 },
|
{ version: "1.15.3", run: m13 },
|
||||||
{ version: "1.15.4", run: m14 },
|
{ version: "1.15.4", run: m14 },
|
||||||
{ version: "1.16.0", run: m15 },
|
{ version: "1.16.0", run: m15 },
|
||||||
{ version: "1.17.0", run: m16 }
|
{ version: "1.17.0", run: m16 },
|
||||||
|
{ version: "1.18.0", run: m17 }
|
||||||
// Add new migrations here as they are created
|
// Add new migrations here as they are created
|
||||||
] as {
|
] as {
|
||||||
version: string;
|
version: string;
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ import m34 from "./scriptsSqlite/1.15.3";
|
|||||||
import m35 from "./scriptsSqlite/1.15.4";
|
import m35 from "./scriptsSqlite/1.15.4";
|
||||||
import m36 from "./scriptsSqlite/1.16.0";
|
import m36 from "./scriptsSqlite/1.16.0";
|
||||||
import m37 from "./scriptsSqlite/1.17.0";
|
import m37 from "./scriptsSqlite/1.17.0";
|
||||||
|
import m38 from "./scriptsSqlite/1.18.0";
|
||||||
|
|
||||||
// THIS CANNOT IMPORT ANYTHING FROM THE SERVER
|
// THIS CANNOT IMPORT ANYTHING FROM THE SERVER
|
||||||
// EXCEPT FOR THE DATABASE AND THE SCHEMA
|
// EXCEPT FOR THE DATABASE AND THE SCHEMA
|
||||||
@@ -77,7 +78,8 @@ const migrations = [
|
|||||||
{ version: "1.15.3", run: m34 },
|
{ version: "1.15.3", run: m34 },
|
||||||
{ version: "1.15.4", run: m35 },
|
{ version: "1.15.4", run: m35 },
|
||||||
{ version: "1.16.0", run: m36 },
|
{ version: "1.16.0", run: m36 },
|
||||||
{ version: "1.17.0", run: m37 }
|
{ version: "1.17.0", run: m37 },
|
||||||
|
{ version: "1.18.0", run: m38 }
|
||||||
// Add new migrations here as they are created
|
// Add new migrations here as they are created
|
||||||
] as const;
|
] as const;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user