Author: Owen
Date: 2025-10-04 18:36:44 -07:00
Parent: 3123f858bb
Commit: c2c907852d
320 changed files with 35785 additions and 2984 deletions

@@ -1,6 +1,7 @@
-import { db, newts } from "@server/db";
+import { db, exitNodeOrgs, newts } from "@server/db";
 import { MessageHandler } from "../ws";
 import { exitNodes, Newt, resources, sites, Target, targets } from "@server/db";
+import { targetHealthCheck } from "@server/db";
 import { eq, and, sql, inArray } from "drizzle-orm";
 import { addPeer, deletePeer } from "../gerbil/peers";
 import logger from "@server/logger";
@@ -9,7 +10,12 @@ import {
     findNextAvailableCidr,
     getNextAvailableClientSubnet
 } from "@server/lib/ip";
-import { selectBestExitNode, verifyExitNodeOrgAccess } from "@server/lib/exitNodes";
+import { usageService } from "@server/lib/private/billing/usageService";
+import { FeatureId } from "@server/lib/private/billing";
+import {
+    selectBestExitNode,
+    verifyExitNodeOrgAccess
+} from "@server/lib/exitNodes";
 import { fetchContainers } from "./dockerSocket";
 
 export type ExitNodePingResult = {
@@ -22,6 +28,8 @@ export type ExitNodePingResult = {
     wasPreviouslyConnected: boolean;
 };
 
+let numTimesLimitExceededForId: Record<string, number> = {};
+
 export const handleNewtRegisterMessage: MessageHandler = async (context) => {
     const { message, client, sendToClient } = context;
     const newt = client as Newt;
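One caveat with the new module-level map: entries in numTimesLimitExceededForId are only ever incremented (see the gate in the next hunk), never cleared, so the map grows for the lifetime of the process. A minimal sketch of a cleanup step, assuming the same keying by newtId; this helper is hypothetical and not part of the commit:

// Hypothetical cleanup, not in this commit: resetting the counter after a
// successful registration would keep the module-level map bounded.
function clearLimitExceededCounter(newtId: string): void {
    delete numTimesLimitExceededForId[newtId];
}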
@@ -86,13 +94,52 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
         fetchContainers(newt.newtId);
     }
 
+    const rejectSiteUptime = await usageService.checkLimitSet(
+        oldSite.orgId,
+        false,
+        FeatureId.SITE_UPTIME
+    );
+    const rejectEgressDataMb = await usageService.checkLimitSet(
+        oldSite.orgId,
+        false,
+        FeatureId.EGRESS_DATA_MB
+    );
+
+    // Do we need to check the users and domains daily limits here?
+    // const rejectUsers = await usageService.checkLimitSet(oldSite.orgId, false, FeatureId.USERS);
+    // const rejectDomains = await usageService.checkLimitSet(oldSite.orgId, false, FeatureId.DOMAINS);
+
+    // if (rejectEgressDataMb || rejectSiteUptime || rejectUsers || rejectDomains) {
+    if (rejectEgressDataMb || rejectSiteUptime) {
+        logger.info(
+            `Usage limits exceeded for org ${oldSite.orgId}. Rejecting newt registration.`
+        );
+
+        // PREVENT FURTHER REGISTRATION ATTEMPTS SO WE DON'T SPAM
+        // Increment the limit exceeded count for this newt
+        numTimesLimitExceededForId[newt.newtId] =
+            (numTimesLimitExceededForId[newt.newtId] || 0) + 1;
+        if (numTimesLimitExceededForId[newt.newtId] > 15) {
+            logger.debug(
+                `Newt ${newt.newtId} has exceeded usage limits 15 times. Terminating...`
+            );
+        }
+        return;
+    }
+
     let siteSubnet = oldSite.subnet;
     let exitNodeIdToQuery = oldSite.exitNodeId;
 
     if (exitNodeId && (oldSite.exitNodeId !== exitNodeId || !oldSite.subnet)) {
         // This effectively moves the exit node to the new one
         exitNodeIdToQuery = exitNodeId; // Use the provided exitNodeId if it differs from the site's exitNodeId
-        const { exitNode, hasAccess } = await verifyExitNodeOrgAccess(exitNodeIdToQuery, oldSite.orgId);
+        const { exitNode, hasAccess } = await verifyExitNodeOrgAccess(
+            exitNodeIdToQuery,
+            oldSite.orgId
+        );
 
         if (!exitNode) {
             logger.warn("Exit node not found");
@@ -114,6 +161,10 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
         const blockSize = config.getRawConfig().gerbil.site_block_size;
         const subnets = sitesQuery
             .map((site) => site.subnet)
+            .filter(
+                (subnet) =>
+                    subnet && /^(\d{1,3}\.){3}\d{1,3}\/\d{1,2}$/.test(subnet)
+            )
             .filter((subnet) => subnet !== null);
         subnets.push(exitNode.address.replace(/\/\d+$/, `/${blockSize}`));
         const newSubnet = findNextAvailableCidr(
@@ -122,7 +173,9 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
             exitNode.address
         );
 
         if (!newSubnet) {
-            logger.error("No available subnets found for the new exit node");
+            logger.error(
+                `No available subnets found for the new exit node id ${exitNodeId} and site id ${siteId}`
+            );
             return;
         }
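findNextAvailableCidr comes from @server/lib/ip and its body is not in this diff. A rough sketch of the idea it implements, assuming every used subnet is an aligned /blockSize block carved from the exit node's range (illustrative only, with no handling of overlapping prefixes):

// Illustrative only: pick the first /blockSize block inside `base` whose
// network address is not already taken. The real findNextAvailableCidr
// lives in @server/lib/ip.
function ipToInt(ip: string): number {
    return ip.split(".").reduce((acc, o) => (acc << 8) + Number(o), 0) >>> 0;
}

function intToIp(n: number): string {
    return [24, 16, 8, 0].map((s) => (n >>> s) & 255).join(".");
}

function nextAvailableCidrSketch(
    used: string[],
    blockSize: number,
    base: string
): string | null {
    const [baseIp, basePrefix] = base.split("/");
    const start = ipToInt(baseIp);
    const step = 2 ** (32 - blockSize);
    const total = 2 ** (32 - Number(basePrefix));
    const taken = new Set(used.map((c) => ipToInt(c.split("/")[0])));
    for (let offset = 0; offset < total; offset += step) {
        const candidate = (start + offset) >>> 0;
        if (!taken.has(candidate)) {
            return `${intToIp(candidate)}/${blockSize}`;
        }
    }
    return null; // mirrors the "No available subnets" branch above
}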
@@ -168,11 +221,25 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
             return;
         }
 
-    // add the peer to the exit node
-    await addPeer(exitNodeIdToQuery, {
-        publicKey: publicKey,
-        allowedIps: [siteSubnet]
-    });
+    try {
+        // add the peer to the exit node
+        await addPeer(exitNodeIdToQuery, {
+            publicKey: publicKey,
+            allowedIps: [siteSubnet]
+        });
+    } catch (error) {
+        logger.error(`Failed to add peer to exit node: ${error}`);
+    }
+
+    if (newtVersion && newtVersion !== newt.version) {
+        // update the newt version in the database
+        await db
+            .update(newts)
+            .set({
+                version: newtVersion as string
+            })
+            .where(eq(newts.newtId, newt.newtId));
+    }
 
     if (newtVersion && newtVersion !== newt.version) {
         // update the newt version in the database
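Note that this hunk adds a version-update block immediately above an identical pre-existing one (visible here as the trailing context lines), so after this commit the same UPDATE can run twice per registration. A sketch of a single helper that would keep the write in one place, reusing the db, newts, and eq imports already at the top of the file:

// Sketch: one place to persist a newly reported newt version.
async function persistNewtVersion(
    newtId: string,
    reportedVersion: string | undefined,
    currentVersion: string | undefined
): Promise<void> {
    if (!reportedVersion || reportedVersion === currentVersion) {
        return; // nothing new to record
    }
    await db
        .update(newts)
        .set({ version: reportedVersion })
        .where(eq(newts.newtId, newtId));
}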
@@ -194,10 +261,25 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
             port: targets.port,
             internalPort: targets.internalPort,
             enabled: targets.enabled,
-            protocol: resources.protocol
+            protocol: resources.protocol,
+            hcEnabled: targetHealthCheck.hcEnabled,
+            hcPath: targetHealthCheck.hcPath,
+            hcScheme: targetHealthCheck.hcScheme,
+            hcMode: targetHealthCheck.hcMode,
+            hcHostname: targetHealthCheck.hcHostname,
+            hcPort: targetHealthCheck.hcPort,
+            hcInterval: targetHealthCheck.hcInterval,
+            hcUnhealthyInterval: targetHealthCheck.hcUnhealthyInterval,
+            hcTimeout: targetHealthCheck.hcTimeout,
+            hcHeaders: targetHealthCheck.hcHeaders,
+            hcMethod: targetHealthCheck.hcMethod
         })
         .from(targets)
         .innerJoin(resources, eq(targets.resourceId, resources.resourceId))
+        .leftJoin(
+            targetHealthCheck,
+            eq(targets.targetId, targetHealthCheck.targetId)
+        )
         .where(and(eq(targets.siteId, siteId), eq(targets.enabled, true)));
 
     const { tcpTargets, udpTargets } = allTargets.reduce(
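Because targetHealthCheck is attached with a LEFT JOIN rather than an inner join, targets that have no health check row still come back, with every hc* column null; that is what the null checks in the next hunk guard against. An illustrative row type, limited to the columns visible in this diff (the concrete column types are assumptions):

// Illustrative shape of one allTargets row after the LEFT JOIN; the hc*
// columns are nullable because a target may have no targetHealthCheck row.
type TargetRowSketch = {
    targetId: number; // used below; selected outside the visible hunk
    port: number;
    internalPort: number | null;
    enabled: boolean;
    protocol: string;
    hcEnabled: boolean | null;
    hcPath: string | null;
    hcScheme: string | null;
    hcMode: string | null;
    hcHostname: string | null;
    hcPort: number | null;
    hcInterval: number | null;
    hcUnhealthyInterval: number | null;
    hcTimeout: number | null;
    hcHeaders: string | null; // JSON string of { name, value } pairs
    hcMethod: string | null;
};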
@@ -222,6 +304,59 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
         { tcpTargets: [] as string[], udpTargets: [] as string[] }
     );
 
+    const healthCheckTargets = allTargets.map((target) => {
+        // make sure the required health check fields are defined
+        if (
+            !target.hcPath ||
+            !target.hcHostname ||
+            !target.hcPort ||
+            !target.hcInterval ||
+            !target.hcMethod
+        ) {
+            logger.debug(
+                `Skipping target ${target.targetId} due to missing health check fields`
+            );
+            return null; // Skip targets with missing health check fields
+        }
+
+        // parse headers
+        const hcHeadersParse = target.hcHeaders
+            ? JSON.parse(target.hcHeaders)
+            : null;
+        let hcHeadersSend: { [key: string]: string } = {};
+        if (hcHeadersParse) {
+            hcHeadersParse.forEach(
+                (header: { name: string; value: string }) => {
+                    hcHeadersSend[header.name] = header.value;
+                }
+            );
+        }
+
+        return {
+            id: target.targetId,
+            hcEnabled: target.hcEnabled,
+            hcPath: target.hcPath,
+            hcScheme: target.hcScheme,
+            hcMode: target.hcMode,
+            hcHostname: target.hcHostname,
+            hcPort: target.hcPort,
+            hcInterval: target.hcInterval, // in seconds
+            hcUnhealthyInterval: target.hcUnhealthyInterval, // in seconds
+            hcTimeout: target.hcTimeout, // in seconds
+            hcHeaders: hcHeadersSend,
+            hcMethod: target.hcMethod
+        };
+    });
+
+    // Filter out any null values from health check targets
+    const validHealthCheckTargets = healthCheckTargets.filter(
+        (target) => target !== null
+    );
+
+    logger.debug(
+        `Sending health check targets to newt ${newt.newtId}: ${JSON.stringify(validHealthCheckTargets)}`
+    );
 
     return {
         message: {
             type: "newt/wg/connect",
@@ -233,10 +368,11 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
                 targets: {
                     udp: udpTargets,
                     tcp: tcpTargets
-                }
+                },
+                healthCheckTargets: validHealthCheckTargets
             }
         },
         broadcast: false, // Whether to send to all connected clients
         excludeSender: false // Whether to exclude the sender when broadcasting
     };
 };
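For reference, the overall shape of the handler's return value as far as this diff shows it; the data wrapper and the exact MessageHandler contract in ../ws are assumptions reconstructed from the brace nesting:

// Reconstructed from this diff only; not the authoritative type.
type NewtRegisterResponseSketch = {
    message: {
        type: "newt/wg/connect";
        data: {
            // ...site networking fields assembled earlier in the handler
            targets: { udp: string[]; tcp: string[] };
            healthCheckTargets: unknown[]; // validHealthCheckTargets above
        };
    };
    broadcast: boolean; // whether to send to all connected clients
    excludeSender: boolean; // whether to skip the sender when broadcasting
};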