mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-16 23:56:39 +00:00
Add message compression for large messages
This commit is contained in:
@@ -107,7 +107,7 @@ export async function applyBlueprint({
|
|||||||
[target],
|
[target],
|
||||||
matchingHealthcheck ? [matchingHealthcheck] : [],
|
matchingHealthcheck ? [matchingHealthcheck] : [],
|
||||||
result.proxyResource.protocol,
|
result.proxyResource.protocol,
|
||||||
result.proxyResource.proxyPort
|
site.newt.version
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
20
server/lib/clientVersionChecks.ts
Normal file
20
server/lib/clientVersionChecks.ts
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
import semver from "semver";
|
||||||
|
|
||||||
|
export function canCompress(
|
||||||
|
clientVersion: string | null | undefined,
|
||||||
|
type: "newt" | "olm"
|
||||||
|
): boolean {
|
||||||
|
try {
|
||||||
|
if (!clientVersion) return false;
|
||||||
|
// check if it is a valid semver
|
||||||
|
if (!semver.valid(clientVersion)) return false;
|
||||||
|
if (type === "newt") {
|
||||||
|
return semver.gte(clientVersion, "1.10.3");
|
||||||
|
} else if (type === "olm") {
|
||||||
|
return semver.gte(clientVersion, "1.4.3");
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
} catch {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -670,7 +670,11 @@ async function handleSubnetProxyTargetUpdates(
|
|||||||
`Adding ${targetsToAdd.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}`
|
`Adding ${targetsToAdd.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}`
|
||||||
);
|
);
|
||||||
proxyJobs.push(
|
proxyJobs.push(
|
||||||
addSubnetProxyTargets(newt.newtId, targetsToAdd)
|
addSubnetProxyTargets(
|
||||||
|
newt.newtId,
|
||||||
|
targetsToAdd,
|
||||||
|
newt.version
|
||||||
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -706,7 +710,11 @@ async function handleSubnetProxyTargetUpdates(
|
|||||||
`Removing ${targetsToRemove.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}`
|
`Removing ${targetsToRemove.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}`
|
||||||
);
|
);
|
||||||
proxyJobs.push(
|
proxyJobs.push(
|
||||||
removeSubnetProxyTargets(newt.newtId, targetsToRemove)
|
removeSubnetProxyTargets(
|
||||||
|
newt.newtId,
|
||||||
|
targetsToRemove,
|
||||||
|
newt.version
|
||||||
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1148,7 +1156,7 @@ async function handleMessagesForClientResources(
|
|||||||
// Add subnet proxy targets for each site
|
// Add subnet proxy targets for each site
|
||||||
for (const [siteId, resources] of addedBySite.entries()) {
|
for (const [siteId, resources] of addedBySite.entries()) {
|
||||||
const [newt] = await trx
|
const [newt] = await trx
|
||||||
.select({ newtId: newts.newtId })
|
.select({ newtId: newts.newtId, version: newts.version })
|
||||||
.from(newts)
|
.from(newts)
|
||||||
.where(eq(newts.siteId, siteId))
|
.where(eq(newts.siteId, siteId))
|
||||||
.limit(1);
|
.limit(1);
|
||||||
@@ -1170,7 +1178,13 @@ async function handleMessagesForClientResources(
|
|||||||
]);
|
]);
|
||||||
|
|
||||||
if (targets.length > 0) {
|
if (targets.length > 0) {
|
||||||
proxyJobs.push(addSubnetProxyTargets(newt.newtId, targets));
|
proxyJobs.push(
|
||||||
|
addSubnetProxyTargets(
|
||||||
|
newt.newtId,
|
||||||
|
targets,
|
||||||
|
newt.version
|
||||||
|
)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
@@ -1219,7 +1233,7 @@ async function handleMessagesForClientResources(
|
|||||||
// Remove subnet proxy targets for each site
|
// Remove subnet proxy targets for each site
|
||||||
for (const [siteId, resources] of removedBySite.entries()) {
|
for (const [siteId, resources] of removedBySite.entries()) {
|
||||||
const [newt] = await trx
|
const [newt] = await trx
|
||||||
.select({ newtId: newts.newtId })
|
.select({ newtId: newts.newtId, version: newts.version })
|
||||||
.from(newts)
|
.from(newts)
|
||||||
.where(eq(newts.siteId, siteId))
|
.where(eq(newts.siteId, siteId))
|
||||||
.limit(1);
|
.limit(1);
|
||||||
@@ -1242,7 +1256,11 @@ async function handleMessagesForClientResources(
|
|||||||
|
|
||||||
if (targets.length > 0) {
|
if (targets.length > 0) {
|
||||||
proxyJobs.push(
|
proxyJobs.push(
|
||||||
removeSubnetProxyTargets(newt.newtId, targets)
|
removeSubnetProxyTargets(
|
||||||
|
newt.newtId,
|
||||||
|
targets,
|
||||||
|
newt.version
|
||||||
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -168,7 +168,13 @@ const processPendingMessages = async (
|
|||||||
const jobs = [];
|
const jobs = [];
|
||||||
for (const pending of ws.pendingMessages) {
|
for (const pending of ws.pendingMessages) {
|
||||||
jobs.push(
|
jobs.push(
|
||||||
processMessage(ws, pending.data, pending.isBinary, clientId, clientType)
|
processMessage(
|
||||||
|
ws,
|
||||||
|
pending.data,
|
||||||
|
pending.isBinary,
|
||||||
|
clientId,
|
||||||
|
clientType
|
||||||
|
)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -330,7 +336,9 @@ const addClient = async (
|
|||||||
// Check Redis first if enabled
|
// Check Redis first if enabled
|
||||||
if (redisManager.isRedisEnabled()) {
|
if (redisManager.isRedisEnabled()) {
|
||||||
try {
|
try {
|
||||||
const redisVersion = await redisManager.get(getConfigVersionKey(clientId));
|
const redisVersion = await redisManager.get(
|
||||||
|
getConfigVersionKey(clientId)
|
||||||
|
);
|
||||||
if (redisVersion !== null) {
|
if (redisVersion !== null) {
|
||||||
configVersion = parseInt(redisVersion, 10);
|
configVersion = parseInt(redisVersion, 10);
|
||||||
// Sync to local cache
|
// Sync to local cache
|
||||||
@@ -342,7 +350,10 @@ const addClient = async (
|
|||||||
} else {
|
} else {
|
||||||
// Use local cache version and sync to Redis
|
// Use local cache version and sync to Redis
|
||||||
configVersion = clientConfigVersions.get(clientId) || 0;
|
configVersion = clientConfigVersions.get(clientId) || 0;
|
||||||
await redisManager.set(getConfigVersionKey(clientId), configVersion.toString());
|
await redisManager.set(
|
||||||
|
getConfigVersionKey(clientId),
|
||||||
|
configVersion.toString()
|
||||||
|
);
|
||||||
}
|
}
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
logger.error("Failed to get/set config version in Redis:", error);
|
logger.error("Failed to get/set config version in Redis:", error);
|
||||||
@@ -437,7 +448,9 @@ const removeClient = async (
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Helper to get the current config version for a client
|
// Helper to get the current config version for a client
|
||||||
const getClientConfigVersion = async (clientId: string): Promise<number | undefined> => {
|
const getClientConfigVersion = async (
|
||||||
|
clientId: string
|
||||||
|
): Promise<number | undefined> => {
|
||||||
// Try Redis first if available
|
// Try Redis first if available
|
||||||
if (redisManager.isRedisEnabled()) {
|
if (redisManager.isRedisEnabled()) {
|
||||||
try {
|
try {
|
||||||
@@ -508,7 +521,13 @@ const sendToClientLocal = async (
|
|||||||
|
|
||||||
const messageString = JSON.stringify(messageWithVersion);
|
const messageString = JSON.stringify(messageWithVersion);
|
||||||
if (options.compress) {
|
if (options.compress) {
|
||||||
|
logger.debug(
|
||||||
|
`Message size before compression: ${messageString.length} bytes`
|
||||||
|
);
|
||||||
const compressed = zlib.gzipSync(Buffer.from(messageString, "utf8"));
|
const compressed = zlib.gzipSync(Buffer.from(messageString, "utf8"));
|
||||||
|
logger.debug(
|
||||||
|
`Message size after compression: ${compressed.length} bytes`
|
||||||
|
);
|
||||||
clients.forEach((client) => {
|
clients.forEach((client) => {
|
||||||
if (client.readyState === WebSocket.OPEN) {
|
if (client.readyState === WebSocket.OPEN) {
|
||||||
client.send(compressed);
|
client.send(compressed);
|
||||||
@@ -806,7 +825,13 @@ const setupConnection = async (
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
await processMessage(ws, data as Buffer, isBinary, clientId, clientType);
|
await processMessage(
|
||||||
|
ws,
|
||||||
|
data as Buffer,
|
||||||
|
isBinary,
|
||||||
|
clientId,
|
||||||
|
clientType
|
||||||
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
// Set up other event handlers before async operations
|
// Set up other event handlers before async operations
|
||||||
|
|||||||
@@ -1,23 +1,29 @@
|
|||||||
import { sendToClient } from "#dynamic/routers/ws";
|
import { sendToClient } from "#dynamic/routers/ws";
|
||||||
import { db, olms, Transaction } from "@server/db";
|
import { db, olms, Transaction } from "@server/db";
|
||||||
|
import { canCompress } from "@server/lib/clientVersionChecks";
|
||||||
import { Alias, SubnetProxyTarget } from "@server/lib/ip";
|
import { Alias, SubnetProxyTarget } from "@server/lib/ip";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
|
|
||||||
export async function addTargets(newtId: string, targets: SubnetProxyTarget[]) {
|
export async function addTargets(
|
||||||
|
newtId: string,
|
||||||
|
targets: SubnetProxyTarget[],
|
||||||
|
version?: string | null
|
||||||
|
) {
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
newtId,
|
newtId,
|
||||||
{
|
{
|
||||||
type: `newt/wg/targets/add`,
|
type: `newt/wg/targets/add`,
|
||||||
data: targets
|
data: targets
|
||||||
},
|
},
|
||||||
{ incrementConfigVersion: true }
|
{ incrementConfigVersion: true, compress: canCompress(version, "newt") }
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function removeTargets(
|
export async function removeTargets(
|
||||||
newtId: string,
|
newtId: string,
|
||||||
targets: SubnetProxyTarget[]
|
targets: SubnetProxyTarget[],
|
||||||
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
newtId,
|
newtId,
|
||||||
@@ -25,7 +31,7 @@ export async function removeTargets(
|
|||||||
type: `newt/wg/targets/remove`,
|
type: `newt/wg/targets/remove`,
|
||||||
data: targets
|
data: targets
|
||||||
},
|
},
|
||||||
{ incrementConfigVersion: true }
|
{ incrementConfigVersion: true, compress: canCompress(version, "newt") }
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -34,7 +40,8 @@ export async function updateTargets(
|
|||||||
targets: {
|
targets: {
|
||||||
oldTargets: SubnetProxyTarget[];
|
oldTargets: SubnetProxyTarget[];
|
||||||
newTargets: SubnetProxyTarget[];
|
newTargets: SubnetProxyTarget[];
|
||||||
}
|
},
|
||||||
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
newtId,
|
newtId,
|
||||||
@@ -45,7 +52,7 @@ export async function updateTargets(
|
|||||||
newTargets: targets.newTargets
|
newTargets: targets.newTargets
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{ incrementConfigVersion: true }
|
{ incrementConfigVersion: true, compress: canCompress(version, "newt") }
|
||||||
).catch((error) => {
|
).catch((error) => {
|
||||||
logger.warn(`Error sending message:`, error);
|
logger.warn(`Error sending message:`, error);
|
||||||
});
|
});
|
||||||
@@ -56,7 +63,8 @@ export async function addPeerData(
|
|||||||
siteId: number,
|
siteId: number,
|
||||||
remoteSubnets: string[],
|
remoteSubnets: string[],
|
||||||
aliases: Alias[],
|
aliases: Alias[],
|
||||||
olmId?: string
|
olmId?: string,
|
||||||
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
if (!olmId) {
|
if (!olmId) {
|
||||||
const [olm] = await db
|
const [olm] = await db
|
||||||
@@ -68,6 +76,7 @@ export async function addPeerData(
|
|||||||
return; // ignore this because an olm might not be associated with the client anymore
|
return; // ignore this because an olm might not be associated with the client anymore
|
||||||
}
|
}
|
||||||
olmId = olm.olmId;
|
olmId = olm.olmId;
|
||||||
|
version = olm.version;
|
||||||
}
|
}
|
||||||
|
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
@@ -80,7 +89,7 @@ export async function addPeerData(
|
|||||||
aliases: aliases
|
aliases: aliases
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{ incrementConfigVersion: true }
|
{ incrementConfigVersion: true, compress: canCompress(version, "olm") }
|
||||||
).catch((error) => {
|
).catch((error) => {
|
||||||
logger.warn(`Error sending message:`, error);
|
logger.warn(`Error sending message:`, error);
|
||||||
});
|
});
|
||||||
@@ -91,7 +100,8 @@ export async function removePeerData(
|
|||||||
siteId: number,
|
siteId: number,
|
||||||
remoteSubnets: string[],
|
remoteSubnets: string[],
|
||||||
aliases: Alias[],
|
aliases: Alias[],
|
||||||
olmId?: string
|
olmId?: string,
|
||||||
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
if (!olmId) {
|
if (!olmId) {
|
||||||
const [olm] = await db
|
const [olm] = await db
|
||||||
@@ -103,6 +113,7 @@ export async function removePeerData(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
olmId = olm.olmId;
|
olmId = olm.olmId;
|
||||||
|
version = olm.version;
|
||||||
}
|
}
|
||||||
|
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
@@ -115,7 +126,7 @@ export async function removePeerData(
|
|||||||
aliases: aliases
|
aliases: aliases
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{ incrementConfigVersion: true }
|
{ incrementConfigVersion: true, compress: canCompress(version, "olm") }
|
||||||
).catch((error) => {
|
).catch((error) => {
|
||||||
logger.warn(`Error sending message:`, error);
|
logger.warn(`Error sending message:`, error);
|
||||||
});
|
});
|
||||||
@@ -136,7 +147,8 @@ export async function updatePeerData(
|
|||||||
newAliases: Alias[];
|
newAliases: Alias[];
|
||||||
}
|
}
|
||||||
| undefined,
|
| undefined,
|
||||||
olmId?: string
|
olmId?: string,
|
||||||
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
if (!olmId) {
|
if (!olmId) {
|
||||||
const [olm] = await db
|
const [olm] = await db
|
||||||
@@ -148,6 +160,7 @@ export async function updatePeerData(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
olmId = olm.olmId;
|
olmId = olm.olmId;
|
||||||
|
version = olm.version;
|
||||||
}
|
}
|
||||||
|
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
@@ -160,7 +173,7 @@ export async function updatePeerData(
|
|||||||
...aliases
|
...aliases
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{ incrementConfigVersion: true }
|
{ incrementConfigVersion: true, compress: canCompress(version, "olm") }
|
||||||
).catch((error) => {
|
).catch((error) => {
|
||||||
logger.warn(`Error sending message:`, error);
|
logger.warn(`Error sending message:`, error);
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -243,9 +243,9 @@ export async function buildTargetConfigurationForNewtClient(siteId: number) {
|
|||||||
!target.hcInterval ||
|
!target.hcInterval ||
|
||||||
!target.hcMethod
|
!target.hcMethod
|
||||||
) {
|
) {
|
||||||
logger.debug(
|
// logger.debug(
|
||||||
`Skipping adding target health check ${target.targetId} due to missing health check fields`
|
// `Skipping adding target health check ${target.targetId} due to missing health check fields`
|
||||||
);
|
// );
|
||||||
return null; // Skip targets with missing health check fields
|
return null; // Skip targets with missing health check fields
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import { db, ExitNode, exitNodes, Newt, sites } from "@server/db";
|
|||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
import { sendToExitNode } from "#dynamic/lib/exitNodes";
|
import { sendToExitNode } from "#dynamic/lib/exitNodes";
|
||||||
import { buildClientConfigurationForNewtClient } from "./buildConfiguration";
|
import { buildClientConfigurationForNewtClient } from "./buildConfiguration";
|
||||||
|
import { canCompress } from "@server/lib/clientVersionChecks";
|
||||||
|
|
||||||
const inputSchema = z.object({
|
const inputSchema = z.object({
|
||||||
publicKey: z.string(),
|
publicKey: z.string(),
|
||||||
@@ -135,6 +136,9 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
|
|||||||
targets
|
targets
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
options: {
|
||||||
|
compress: canCompress(newt.version, "newt")
|
||||||
|
},
|
||||||
broadcast: false,
|
broadcast: false,
|
||||||
excludeSender: false
|
excludeSender: false
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -5,9 +5,7 @@ import { eq } from "drizzle-orm";
|
|||||||
import { addPeer, deletePeer } from "../gerbil/peers";
|
import { addPeer, deletePeer } from "../gerbil/peers";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import config from "@server/lib/config";
|
import config from "@server/lib/config";
|
||||||
import {
|
import { findNextAvailableCidr } from "@server/lib/ip";
|
||||||
findNextAvailableCidr,
|
|
||||||
} from "@server/lib/ip";
|
|
||||||
import {
|
import {
|
||||||
selectBestExitNode,
|
selectBestExitNode,
|
||||||
verifyExitNodeOrgAccess
|
verifyExitNodeOrgAccess
|
||||||
@@ -15,6 +13,7 @@ import {
|
|||||||
import { fetchContainers } from "./dockerSocket";
|
import { fetchContainers } from "./dockerSocket";
|
||||||
import { lockManager } from "#dynamic/lib/lock";
|
import { lockManager } from "#dynamic/lib/lock";
|
||||||
import { buildTargetConfigurationForNewtClient } from "./buildConfiguration";
|
import { buildTargetConfigurationForNewtClient } from "./buildConfiguration";
|
||||||
|
import { canCompress } from "@server/lib/clientVersionChecks";
|
||||||
|
|
||||||
export type ExitNodePingResult = {
|
export type ExitNodePingResult = {
|
||||||
exitNodeId: number;
|
exitNodeId: number;
|
||||||
@@ -215,6 +214,9 @@ export const handleNewtRegisterMessage: MessageHandler = async (context) => {
|
|||||||
healthCheckTargets: validHealthCheckTargets
|
healthCheckTargets: validHealthCheckTargets
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
options: {
|
||||||
|
compress: canCompress(newt.version, "newt")
|
||||||
|
},
|
||||||
broadcast: false, // Send to all clients
|
broadcast: false, // Send to all clients
|
||||||
excludeSender: false // Include sender in broadcast
|
excludeSender: false // Include sender in broadcast
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import {
|
|||||||
buildClientConfigurationForNewtClient,
|
buildClientConfigurationForNewtClient,
|
||||||
buildTargetConfigurationForNewtClient
|
buildTargetConfigurationForNewtClient
|
||||||
} from "./buildConfiguration";
|
} from "./buildConfiguration";
|
||||||
|
import { canCompress } from "@server/lib/clientVersionChecks";
|
||||||
|
|
||||||
export async function sendNewtSyncMessage(newt: Newt, site: Site) {
|
export async function sendNewtSyncMessage(newt: Newt, site: Site) {
|
||||||
const { tcpTargets, udpTargets, validHealthCheckTargets } =
|
const { tcpTargets, udpTargets, validHealthCheckTargets } =
|
||||||
@@ -24,18 +25,24 @@ export async function sendNewtSyncMessage(newt: Newt, site: Site) {
|
|||||||
exitNode
|
exitNode
|
||||||
);
|
);
|
||||||
|
|
||||||
await sendToClient(newt.newtId, {
|
await sendToClient(
|
||||||
type: "newt/sync",
|
newt.newtId,
|
||||||
data: {
|
{
|
||||||
proxyTargets: {
|
type: "newt/sync",
|
||||||
udp: udpTargets,
|
data: {
|
||||||
tcp: tcpTargets
|
proxyTargets: {
|
||||||
},
|
udp: udpTargets,
|
||||||
healthCheckTargets: validHealthCheckTargets,
|
tcp: tcpTargets
|
||||||
peers: peers,
|
},
|
||||||
clientTargets: targets
|
healthCheckTargets: validHealthCheckTargets,
|
||||||
|
peers: peers,
|
||||||
|
clientTargets: targets
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
compress: canCompress(newt.version, "newt")
|
||||||
}
|
}
|
||||||
}).catch((error) => {
|
).catch((error) => {
|
||||||
logger.warn(`Error sending newt sync message:`, error);
|
logger.warn(`Error sending newt sync message:`, error);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,13 +2,14 @@ import { Target, TargetHealthCheck, db, targetHealthCheck } from "@server/db";
|
|||||||
import { sendToClient } from "#dynamic/routers/ws";
|
import { sendToClient } from "#dynamic/routers/ws";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { eq, inArray } from "drizzle-orm";
|
import { eq, inArray } from "drizzle-orm";
|
||||||
|
import { canCompress } from "@server/lib/clientVersionChecks";
|
||||||
|
|
||||||
export async function addTargets(
|
export async function addTargets(
|
||||||
newtId: string,
|
newtId: string,
|
||||||
targets: Target[],
|
targets: Target[],
|
||||||
healthCheckData: TargetHealthCheck[],
|
healthCheckData: TargetHealthCheck[],
|
||||||
protocol: string,
|
protocol: string,
|
||||||
port: number | null = null
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
//create a list of udp and tcp targets
|
//create a list of udp and tcp targets
|
||||||
const payloadTargets = targets.map((target) => {
|
const payloadTargets = targets.map((target) => {
|
||||||
@@ -22,7 +23,7 @@ export async function addTargets(
|
|||||||
data: {
|
data: {
|
||||||
targets: payloadTargets
|
targets: payloadTargets
|
||||||
}
|
}
|
||||||
}, { incrementConfigVersion: true });
|
}, { incrementConfigVersion: true, compress: canCompress(version, "newt") });
|
||||||
|
|
||||||
// Create a map for quick lookup
|
// Create a map for quick lookup
|
||||||
const healthCheckMap = new Map<number, TargetHealthCheck>();
|
const healthCheckMap = new Map<number, TargetHealthCheck>();
|
||||||
@@ -103,14 +104,14 @@ export async function addTargets(
|
|||||||
data: {
|
data: {
|
||||||
targets: validHealthCheckTargets
|
targets: validHealthCheckTargets
|
||||||
}
|
}
|
||||||
}, { incrementConfigVersion: true });
|
}, { incrementConfigVersion: true, compress: canCompress(version, "newt") });
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function removeTargets(
|
export async function removeTargets(
|
||||||
newtId: string,
|
newtId: string,
|
||||||
targets: Target[],
|
targets: Target[],
|
||||||
protocol: string,
|
protocol: string,
|
||||||
port: number | null = null
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
//create a list of udp and tcp targets
|
//create a list of udp and tcp targets
|
||||||
const payloadTargets = targets.map((target) => {
|
const payloadTargets = targets.map((target) => {
|
||||||
@@ -135,5 +136,5 @@ export async function removeTargets(
|
|||||||
data: {
|
data: {
|
||||||
ids: healthCheckTargets
|
ids: healthCheckTargets
|
||||||
}
|
}
|
||||||
}, { incrementConfigVersion: true });
|
}, { incrementConfigVersion: true, compress: canCompress(version, "newt") });
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ import { OlmErrorCodes, sendOlmError } from "./error";
|
|||||||
import { handleFingerprintInsertion } from "./fingerprintingUtils";
|
import { handleFingerprintInsertion } from "./fingerprintingUtils";
|
||||||
import { Alias } from "@server/lib/ip";
|
import { Alias } from "@server/lib/ip";
|
||||||
import { build } from "@server/build";
|
import { build } from "@server/build";
|
||||||
|
import { canCompress } from "@server/lib/clientVersionChecks";
|
||||||
|
|
||||||
export const handleOlmRegisterMessage: MessageHandler = async (context) => {
|
export const handleOlmRegisterMessage: MessageHandler = async (context) => {
|
||||||
logger.info("Handling register olm message!");
|
logger.info("Handling register olm message!");
|
||||||
@@ -295,6 +296,9 @@ export const handleOlmRegisterMessage: MessageHandler = async (context) => {
|
|||||||
utilitySubnet: org.utilitySubnet
|
utilitySubnet: org.utilitySubnet
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
options: {
|
||||||
|
compress: canCompress(olm.version, "olm")
|
||||||
|
},
|
||||||
broadcast: false,
|
broadcast: false,
|
||||||
excludeSender: false
|
excludeSender: false
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
import { sendToClient } from "#dynamic/routers/ws";
|
import { sendToClient } from "#dynamic/routers/ws";
|
||||||
import { clientSitesAssociationsCache, db, olms } from "@server/db";
|
import { clientSitesAssociationsCache, db, olms } from "@server/db";
|
||||||
|
import { canCompress } from "@server/lib/clientVersionChecks";
|
||||||
import config from "@server/lib/config";
|
import config from "@server/lib/config";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { and, eq } from "drizzle-orm";
|
import { and, eq } from "drizzle-orm";
|
||||||
@@ -18,7 +19,8 @@ export async function addPeer(
|
|||||||
remoteSubnets: string[] | null; // optional, comma-separated list of subnets that this site can access
|
remoteSubnets: string[] | null; // optional, comma-separated list of subnets that this site can access
|
||||||
aliases: Alias[];
|
aliases: Alias[];
|
||||||
},
|
},
|
||||||
olmId?: string
|
olmId?: string,
|
||||||
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
if (!olmId) {
|
if (!olmId) {
|
||||||
const [olm] = await db
|
const [olm] = await db
|
||||||
@@ -30,6 +32,7 @@ export async function addPeer(
|
|||||||
return; // ignore this because an olm might not be associated with the client anymore
|
return; // ignore this because an olm might not be associated with the client anymore
|
||||||
}
|
}
|
||||||
olmId = olm.olmId;
|
olmId = olm.olmId;
|
||||||
|
version = olm.version;
|
||||||
}
|
}
|
||||||
|
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
@@ -48,7 +51,7 @@ export async function addPeer(
|
|||||||
aliases: peer.aliases
|
aliases: peer.aliases
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{ incrementConfigVersion: true }
|
{ incrementConfigVersion: true, compress: canCompress(version, "olm") }
|
||||||
).catch((error) => {
|
).catch((error) => {
|
||||||
logger.warn(`Error sending message:`, error);
|
logger.warn(`Error sending message:`, error);
|
||||||
});
|
});
|
||||||
@@ -60,7 +63,8 @@ export async function deletePeer(
|
|||||||
clientId: number,
|
clientId: number,
|
||||||
siteId: number,
|
siteId: number,
|
||||||
publicKey: string,
|
publicKey: string,
|
||||||
olmId?: string
|
olmId?: string,
|
||||||
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
if (!olmId) {
|
if (!olmId) {
|
||||||
const [olm] = await db
|
const [olm] = await db
|
||||||
@@ -72,6 +76,7 @@ export async function deletePeer(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
olmId = olm.olmId;
|
olmId = olm.olmId;
|
||||||
|
version = olm.version;
|
||||||
}
|
}
|
||||||
|
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
@@ -83,7 +88,7 @@ export async function deletePeer(
|
|||||||
siteId: siteId
|
siteId: siteId
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{ incrementConfigVersion: true }
|
{ incrementConfigVersion: true, compress: canCompress(version, "olm") }
|
||||||
).catch((error) => {
|
).catch((error) => {
|
||||||
logger.warn(`Error sending message:`, error);
|
logger.warn(`Error sending message:`, error);
|
||||||
});
|
});
|
||||||
@@ -103,7 +108,8 @@ export async function updatePeer(
|
|||||||
remoteSubnets?: string[] | null; // optional, comma-separated list of subnets that
|
remoteSubnets?: string[] | null; // optional, comma-separated list of subnets that
|
||||||
aliases?: Alias[] | null;
|
aliases?: Alias[] | null;
|
||||||
},
|
},
|
||||||
olmId?: string
|
olmId?: string,
|
||||||
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
if (!olmId) {
|
if (!olmId) {
|
||||||
const [olm] = await db
|
const [olm] = await db
|
||||||
@@ -115,6 +121,7 @@ export async function updatePeer(
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
olmId = olm.olmId;
|
olmId = olm.olmId;
|
||||||
|
version = olm.version;
|
||||||
}
|
}
|
||||||
|
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
@@ -132,7 +139,7 @@ export async function updatePeer(
|
|||||||
aliases: peer.aliases
|
aliases: peer.aliases
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
{ incrementConfigVersion: true }
|
{ incrementConfigVersion: true, compress: canCompress(version, "olm") }
|
||||||
).catch((error) => {
|
).catch((error) => {
|
||||||
logger.warn(`Error sending message:`, error);
|
logger.warn(`Error sending message:`, error);
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -1,9 +1,17 @@
|
|||||||
import { Client, db, exitNodes, Olm, sites, clientSitesAssociationsCache } from "@server/db";
|
import {
|
||||||
|
Client,
|
||||||
|
db,
|
||||||
|
exitNodes,
|
||||||
|
Olm,
|
||||||
|
sites,
|
||||||
|
clientSitesAssociationsCache
|
||||||
|
} from "@server/db";
|
||||||
import { buildSiteConfigurationForOlmClient } from "./buildConfiguration";
|
import { buildSiteConfigurationForOlmClient } from "./buildConfiguration";
|
||||||
import { sendToClient } from "#dynamic/routers/ws";
|
import { sendToClient } from "#dynamic/routers/ws";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { eq, inArray } from "drizzle-orm";
|
import { eq, inArray } from "drizzle-orm";
|
||||||
import config from "@server/lib/config";
|
import config from "@server/lib/config";
|
||||||
|
import { canCompress } from "@server/lib/clientVersionChecks";
|
||||||
|
|
||||||
export async function sendOlmSyncMessage(olm: Olm, client: Client) {
|
export async function sendOlmSyncMessage(olm: Olm, client: Client) {
|
||||||
// NOTE: WE ARE HARDCODING THE RELAY PARAMETER TO FALSE HERE BUT IN THE REGISTER MESSAGE ITS DEFINED BY THE CLIENT
|
// NOTE: WE ARE HARDCODING THE RELAY PARAMETER TO FALSE HERE BUT IN THE REGISTER MESSAGE ITS DEFINED BY THE CLIENT
|
||||||
@@ -17,10 +25,7 @@ export async function sendOlmSyncMessage(olm: Olm, client: Client) {
|
|||||||
const clientSites = await db
|
const clientSites = await db
|
||||||
.select()
|
.select()
|
||||||
.from(clientSitesAssociationsCache)
|
.from(clientSitesAssociationsCache)
|
||||||
.innerJoin(
|
.innerJoin(sites, eq(sites.siteId, clientSitesAssociationsCache.siteId))
|
||||||
sites,
|
|
||||||
eq(sites.siteId, clientSitesAssociationsCache.siteId)
|
|
||||||
)
|
|
||||||
.where(eq(clientSitesAssociationsCache.clientId, client.clientId));
|
.where(eq(clientSitesAssociationsCache.clientId, client.clientId));
|
||||||
|
|
||||||
// Extract unique exit node IDs
|
// Extract unique exit node IDs
|
||||||
@@ -68,13 +73,20 @@ export async function sendOlmSyncMessage(olm: Olm, client: Client) {
|
|||||||
|
|
||||||
logger.debug("sendOlmSyncMessage: sending sync message");
|
logger.debug("sendOlmSyncMessage: sending sync message");
|
||||||
|
|
||||||
await sendToClient(olm.olmId, {
|
await sendToClient(
|
||||||
type: "olm/sync",
|
olm.olmId,
|
||||||
data: {
|
{
|
||||||
sites: siteConfigurations,
|
type: "olm/sync",
|
||||||
exitNodes: exitNodesData
|
data: {
|
||||||
|
sites: siteConfigurations,
|
||||||
|
exitNodes: exitNodesData
|
||||||
|
}
|
||||||
|
},
|
||||||
|
|
||||||
|
{
|
||||||
|
compress: canCompress(olm.version, "olm")
|
||||||
}
|
}
|
||||||
}).catch((error) => {
|
).catch((error) => {
|
||||||
logger.warn(`Error sending olm sync message:`, error);
|
logger.warn(`Error sending olm sync message:`, error);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -620,7 +620,7 @@ export async function handleMessagingForUpdatedSiteResource(
|
|||||||
await updateTargets(newt.newtId, {
|
await updateTargets(newt.newtId, {
|
||||||
oldTargets: oldTargets,
|
oldTargets: oldTargets,
|
||||||
newTargets: newTargets
|
newTargets: newTargets
|
||||||
});
|
}, newt.version);
|
||||||
}
|
}
|
||||||
|
|
||||||
const olmJobs: Promise<void>[] = [];
|
const olmJobs: Promise<void>[] = [];
|
||||||
|
|||||||
@@ -264,7 +264,7 @@ export async function createTarget(
|
|||||||
newTarget,
|
newTarget,
|
||||||
healthCheck,
|
healthCheck,
|
||||||
resource.protocol,
|
resource.protocol,
|
||||||
resource.proxyPort
|
newt.version
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -262,7 +262,7 @@ export async function updateTarget(
|
|||||||
[updatedTarget],
|
[updatedTarget],
|
||||||
[updatedHc],
|
[updatedHc],
|
||||||
resource.protocol,
|
resource.protocol,
|
||||||
resource.proxyPort
|
newt.version
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user