Support improved targets msg v2

This commit is contained in:
Owen
2026-03-02 18:51:48 -08:00
parent dae169540b
commit 6cf1b9b010
6 changed files with 326 additions and 79 deletions

View File

@@ -1,8 +1,15 @@
import { sendToClient } from "#dynamic/routers/ws";
import { db, olms, Transaction } from "@server/db";
import { Alias, SubnetProxyTarget } from "@server/lib/ip";
import { S } from "@faker-js/faker/dist/airline-Dz1uGqgJ";
import { db, newts, olms, Transaction } from "@server/db";
import {
Alias,
convertSubnetProxyTargetsV2ToV1,
SubnetProxyTarget,
SubnetProxyTargetV2
} from "@server/lib/ip";
import logger from "@server/logger";
import { eq } from "drizzle-orm";
import semver from "semver";
const BATCH_SIZE = 50;
const BATCH_DELAY_MS = 50;
@@ -19,57 +26,149 @@ function chunkArray<T>(array: T[], size: number): T[][] {
return chunks;
}
export async function addTargets(newtId: string, targets: SubnetProxyTarget[]) {
const batches = chunkArray(targets, BATCH_SIZE);
const NEWT_V2_TARGETS_VERSION = ">=1.11.0";
export async function convertTargetsIfNessicary(
newtId: string,
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[]
) {
// get the newt
const [newt] = await db
.select()
.from(newts)
.where(eq(newts.newtId, newtId));
if (!newt) {
throw new Error(`No newt found for id: ${newtId}`);
}
// check the semver
if (
newt.version &&
!semver.satisfies(newt.version, NEWT_V2_TARGETS_VERSION)
) {
logger.debug(
`addTargets Newt version ${newt.version} does not support targets v2 falling back`
);
targets = convertSubnetProxyTargetsV2ToV1(
targets as SubnetProxyTargetV2[]
);
}
return targets;
}
export async function addTargets(
newtId: string,
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[]
) {
targets = await convertTargetsIfNessicary(newtId, targets);
const batches = chunkArray<SubnetProxyTarget | SubnetProxyTargetV2>(
targets,
BATCH_SIZE
);
for (let i = 0; i < batches.length; i++) {
if (i > 0) {
await sleep(BATCH_DELAY_MS);
}
await sendToClient(newtId, {
type: `newt/wg/targets/add`,
data: batches[i]
}, { incrementConfigVersion: true });
await sendToClient(
newtId,
{
type: `newt/wg/targets/add`,
data: batches[i]
},
{ incrementConfigVersion: true }
);
}
}
export async function removeTargets(
newtId: string,
targets: SubnetProxyTarget[]
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[]
) {
const batches = chunkArray(targets, BATCH_SIZE);
targets = await convertTargetsIfNessicary(newtId, targets);
const batches = chunkArray<SubnetProxyTarget | SubnetProxyTargetV2>(
targets,
BATCH_SIZE
);
for (let i = 0; i < batches.length; i++) {
if (i > 0) {
await sleep(BATCH_DELAY_MS);
}
await sendToClient(newtId, {
type: `newt/wg/targets/remove`,
data: batches[i]
},{ incrementConfigVersion: true });
await sendToClient(
newtId,
{
type: `newt/wg/targets/remove`,
data: batches[i]
},
{ incrementConfigVersion: true }
);
}
}
export async function updateTargets(
newtId: string,
targets: {
oldTargets: SubnetProxyTarget[];
newTargets: SubnetProxyTarget[];
oldTargets: SubnetProxyTarget[] | SubnetProxyTargetV2[];
newTargets: SubnetProxyTarget[] | SubnetProxyTargetV2[];
}
) {
const oldBatches = chunkArray(targets.oldTargets, BATCH_SIZE);
const newBatches = chunkArray(targets.newTargets, BATCH_SIZE);
// get the newt
const [newt] = await db
.select()
.from(newts)
.where(eq(newts.newtId, newtId));
if (!newt) {
logger.error(`addTargetsL No newt found for id: ${newtId}`);
return;
}
// check the semver
if (
newt.version &&
!semver.satisfies(newt.version, NEWT_V2_TARGETS_VERSION)
) {
logger.debug(
`addTargets Newt version ${newt.version} does not support targets v2 falling back`
);
targets = {
oldTargets: convertSubnetProxyTargetsV2ToV1(
targets.oldTargets as SubnetProxyTargetV2[]
),
newTargets: convertSubnetProxyTargetsV2ToV1(
targets.newTargets as SubnetProxyTargetV2[]
)
};
}
const oldBatches = chunkArray<SubnetProxyTarget | SubnetProxyTargetV2>(
targets.oldTargets,
BATCH_SIZE
);
const newBatches = chunkArray<SubnetProxyTarget | SubnetProxyTargetV2>(
targets.newTargets,
BATCH_SIZE
);
const maxBatches = Math.max(oldBatches.length, newBatches.length);
for (let i = 0; i < maxBatches; i++) {
if (i > 0) {
await sleep(BATCH_DELAY_MS);
}
await sendToClient(newtId, {
type: `newt/wg/targets/update`,
data: {
oldTargets: oldBatches[i] || [],
newTargets: newBatches[i] || []
}
}, { incrementConfigVersion: true }).catch((error) => {
await sendToClient(
newtId,
{
type: `newt/wg/targets/update`,
data: {
oldTargets: oldBatches[i] || [],
newTargets: newBatches[i] || []
}
},
{ incrementConfigVersion: true }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});
}
@@ -94,14 +193,18 @@ export async function addPeerData(
olmId = olm.olmId;
}
await sendToClient(olmId, {
type: `olm/wg/peer/data/add`,
data: {
siteId: siteId,
remoteSubnets: remoteSubnets,
aliases: aliases
}
}, { incrementConfigVersion: true }).catch((error) => {
await sendToClient(
olmId,
{
type: `olm/wg/peer/data/add`,
data: {
siteId: siteId,
remoteSubnets: remoteSubnets,
aliases: aliases
}
},
{ incrementConfigVersion: true }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});
}
@@ -125,14 +228,18 @@ export async function removePeerData(
olmId = olm.olmId;
}
await sendToClient(olmId, {
type: `olm/wg/peer/data/remove`,
data: {
siteId: siteId,
remoteSubnets: remoteSubnets,
aliases: aliases
}
}, { incrementConfigVersion: true }).catch((error) => {
await sendToClient(
olmId,
{
type: `olm/wg/peer/data/remove`,
data: {
siteId: siteId,
remoteSubnets: remoteSubnets,
aliases: aliases
}
},
{ incrementConfigVersion: true }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});
}
@@ -166,14 +273,18 @@ export async function updatePeerData(
olmId = olm.olmId;
}
await sendToClient(olmId, {
type: `olm/wg/peer/data/update`,
data: {
siteId: siteId,
...remoteSubnets,
...aliases
}
}, { incrementConfigVersion: true }).catch((error) => {
await sendToClient(
olmId,
{
type: `olm/wg/peer/data/update`,
data: {
siteId: siteId,
...remoteSubnets,
...aliases
}
},
{ incrementConfigVersion: true }
).catch((error) => {
logger.warn(`Error sending message:`, error);
});
}

View File

@@ -1,9 +1,23 @@
import { clients, clientSiteResourcesAssociationsCache, clientSitesAssociationsCache, db, ExitNode, resources, Site, siteResources, targetHealthCheck, targets } from "@server/db";
import {
clients,
clientSiteResourcesAssociationsCache,
clientSitesAssociationsCache,
db,
ExitNode,
resources,
Site,
siteResources,
targetHealthCheck,
targets
} from "@server/db";
import logger from "@server/logger";
import { initPeerAddHandshake, updatePeer } from "../olm/peers";
import { eq, and } from "drizzle-orm";
import config from "@server/lib/config";
import { generateSubnetProxyTargets, SubnetProxyTarget } from "@server/lib/ip";
import {
generateSubnetProxyTargetV2,
SubnetProxyTargetV2
} from "@server/lib/ip";
export async function buildClientConfigurationForNewtClient(
site: Site,
@@ -126,7 +140,7 @@ export async function buildClientConfigurationForNewtClient(
.from(siteResources)
.where(eq(siteResources.siteId, siteId));
const targetsToSend: SubnetProxyTarget[] = [];
const targetsToSend: SubnetProxyTargetV2[] = [];
for (const resource of allSiteResources) {
// Get clients associated with this specific resource
@@ -151,12 +165,14 @@ export async function buildClientConfigurationForNewtClient(
)
);
const resourceTargets = generateSubnetProxyTargets(
const resourceTarget = generateSubnetProxyTargetV2(
resource,
resourceClients
);
targetsToSend.push(...resourceTargets);
if (resourceTarget) {
targetsToSend.push(resourceTarget);
}
}
return {

View File

@@ -6,6 +6,7 @@ import { db, ExitNode, exitNodes, Newt, sites } from "@server/db";
import { eq } from "drizzle-orm";
import { sendToExitNode } from "#dynamic/lib/exitNodes";
import { buildClientConfigurationForNewtClient } from "./buildConfiguration";
import { convertTargetsIfNessicary } from "../client/targets";
const inputSchema = z.object({
publicKey: z.string(),
@@ -126,13 +127,15 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
exitNode
);
const targetsToSend = await convertTargetsIfNessicary(newt.newtId, targets);
return {
message: {
type: "newt/wg/receive-config",
data: {
ipAddress: site.address,
peers,
targets
targets: targetsToSend
}
},
broadcast: false,

View File

@@ -24,7 +24,7 @@ import { updatePeerData, updateTargets } from "@server/routers/client/targets";
import {
generateAliasConfig,
generateRemoteSubnets,
generateSubnetProxyTargets,
generateSubnetProxyTargetV2,
isIpInCidr,
portRangeStringSchema
} from "@server/lib/ip";
@@ -608,18 +608,18 @@ export async function handleMessagingForUpdatedSiteResource(
// Only update targets on newt if destination changed
if (destinationChanged || portRangesChanged) {
const oldTargets = generateSubnetProxyTargets(
const oldTarget = generateSubnetProxyTargetV2(
existingSiteResource,
mergedAllClients
);
const newTargets = generateSubnetProxyTargets(
const newTarget = generateSubnetProxyTargetV2(
updatedSiteResource,
mergedAllClients
);
await updateTargets(newt.newtId, {
oldTargets: oldTargets,
newTargets: newTargets
oldTargets: [oldTarget],
newTargets: [newTarget]
});
}