mirror of
https://github.com/fosrl/pangolin.git
synced 2026-03-24 03:26:39 +00:00
Compare commits
1 Commits
logging-pr
...
1.16.2-s.1
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
85335bfecc |
@@ -1,11 +1,9 @@
|
|||||||
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
||||||
import { flushConnectionLogToDb } from "#dynamic/routers/newt";
|
|
||||||
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
||||||
import { cleanup as wsCleanup } from "#dynamic/routers/ws";
|
import { cleanup as wsCleanup } from "#dynamic/routers/ws";
|
||||||
|
|
||||||
async function cleanup() {
|
async function cleanup() {
|
||||||
await flushBandwidthToDb();
|
await flushBandwidthToDb();
|
||||||
await flushConnectionLogToDb();
|
|
||||||
await flushSiteBandwidthToDb();
|
await flushSiteBandwidthToDb();
|
||||||
await wsCleanup();
|
await wsCleanup();
|
||||||
|
|
||||||
@@ -16,4 +14,4 @@ export async function initCleanup() {
|
|||||||
// Handle process termination
|
// Handle process termination
|
||||||
process.on("SIGTERM", () => cleanup());
|
process.on("SIGTERM", () => cleanup());
|
||||||
process.on("SIGINT", () => cleanup());
|
process.on("SIGINT", () => cleanup());
|
||||||
}
|
}
|
||||||
@@ -17,9 +17,7 @@ import {
|
|||||||
users,
|
users,
|
||||||
exitNodes,
|
exitNodes,
|
||||||
sessions,
|
sessions,
|
||||||
clients,
|
clients
|
||||||
siteResources,
|
|
||||||
sites
|
|
||||||
} from "./schema";
|
} from "./schema";
|
||||||
|
|
||||||
export const certificates = pgTable("certificates", {
|
export const certificates = pgTable("certificates", {
|
||||||
@@ -304,39 +302,6 @@ export const accessAuditLog = pgTable(
|
|||||||
]
|
]
|
||||||
);
|
);
|
||||||
|
|
||||||
export const connectionAuditLog = pgTable(
|
|
||||||
"connectionAuditLog",
|
|
||||||
{
|
|
||||||
id: serial("id").primaryKey(),
|
|
||||||
sessionId: text("sessionId").notNull(),
|
|
||||||
siteResourceId: integer("siteResourceId").references(
|
|
||||||
() => siteResources.siteResourceId,
|
|
||||||
{ onDelete: "cascade" }
|
|
||||||
),
|
|
||||||
orgId: text("orgId").references(() => orgs.orgId, {
|
|
||||||
onDelete: "cascade"
|
|
||||||
}),
|
|
||||||
siteId: integer("siteId").references(() => sites.siteId, {
|
|
||||||
onDelete: "cascade"
|
|
||||||
}),
|
|
||||||
sourceAddr: text("sourceAddr").notNull(),
|
|
||||||
destAddr: text("destAddr").notNull(),
|
|
||||||
protocol: text("protocol").notNull(),
|
|
||||||
startedAt: integer("startedAt").notNull(),
|
|
||||||
endedAt: integer("endedAt"),
|
|
||||||
bytesTx: integer("bytesTx"),
|
|
||||||
bytesRx: integer("bytesRx")
|
|
||||||
},
|
|
||||||
(table) => [
|
|
||||||
index("idx_accessAuditLog_startedAt").on(table.startedAt),
|
|
||||||
index("idx_accessAuditLog_org_startedAt").on(
|
|
||||||
table.orgId,
|
|
||||||
table.startedAt
|
|
||||||
),
|
|
||||||
index("idx_accessAuditLog_siteResourceId").on(table.siteResourceId)
|
|
||||||
]
|
|
||||||
);
|
|
||||||
|
|
||||||
export const approvals = pgTable("approvals", {
|
export const approvals = pgTable("approvals", {
|
||||||
approvalId: serial("approvalId").primaryKey(),
|
approvalId: serial("approvalId").primaryKey(),
|
||||||
timestamp: integer("timestamp").notNull(), // this is EPOCH time in seconds
|
timestamp: integer("timestamp").notNull(), // this is EPOCH time in seconds
|
||||||
@@ -392,4 +357,3 @@ export type LoginPage = InferSelectModel<typeof loginPage>;
|
|||||||
export type LoginPageBranding = InferSelectModel<typeof loginPageBranding>;
|
export type LoginPageBranding = InferSelectModel<typeof loginPageBranding>;
|
||||||
export type ActionAuditLog = InferSelectModel<typeof actionAuditLog>;
|
export type ActionAuditLog = InferSelectModel<typeof actionAuditLog>;
|
||||||
export type AccessAuditLog = InferSelectModel<typeof accessAuditLog>;
|
export type AccessAuditLog = InferSelectModel<typeof accessAuditLog>;
|
||||||
export type ConnectionAuditLog = InferSelectModel<typeof connectionAuditLog>;
|
|
||||||
|
|||||||
@@ -55,9 +55,6 @@ export const orgs = pgTable("orgs", {
|
|||||||
settingsLogRetentionDaysAction: integer("settingsLogRetentionDaysAction") // where 0 = dont keep logs and -1 = keep forever and 9001 = end of the following year
|
settingsLogRetentionDaysAction: integer("settingsLogRetentionDaysAction") // where 0 = dont keep logs and -1 = keep forever and 9001 = end of the following year
|
||||||
.notNull()
|
.notNull()
|
||||||
.default(0),
|
.default(0),
|
||||||
settingsLogRetentionDaysConnection: integer("settingsLogRetentionDaysConnection") // where 0 = dont keep logs and -1 = keep forever and 9001 = end of the following year
|
|
||||||
.notNull()
|
|
||||||
.default(0),
|
|
||||||
sshCaPrivateKey: text("sshCaPrivateKey"), // Encrypted SSH CA private key (PEM format)
|
sshCaPrivateKey: text("sshCaPrivateKey"), // Encrypted SSH CA private key (PEM format)
|
||||||
sshCaPublicKey: text("sshCaPublicKey"), // SSH CA public key (OpenSSH format)
|
sshCaPublicKey: text("sshCaPublicKey"), // SSH CA public key (OpenSSH format)
|
||||||
isBillingOrg: boolean("isBillingOrg"),
|
isBillingOrg: boolean("isBillingOrg"),
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ import {
|
|||||||
sqliteTable,
|
sqliteTable,
|
||||||
text
|
text
|
||||||
} from "drizzle-orm/sqlite-core";
|
} from "drizzle-orm/sqlite-core";
|
||||||
import { clients, domains, exitNodes, orgs, sessions, siteResources, sites, users } from "./schema";
|
import { clients, domains, exitNodes, orgs, sessions, users } from "./schema";
|
||||||
|
|
||||||
export const certificates = sqliteTable("certificates", {
|
export const certificates = sqliteTable("certificates", {
|
||||||
certId: integer("certId").primaryKey({ autoIncrement: true }),
|
certId: integer("certId").primaryKey({ autoIncrement: true }),
|
||||||
@@ -294,39 +294,6 @@ export const accessAuditLog = sqliteTable(
|
|||||||
]
|
]
|
||||||
);
|
);
|
||||||
|
|
||||||
export const connectionAuditLog = sqliteTable(
|
|
||||||
"connectionAuditLog",
|
|
||||||
{
|
|
||||||
id: integer("id").primaryKey({ autoIncrement: true }),
|
|
||||||
sessionId: text("sessionId").notNull(),
|
|
||||||
siteResourceId: integer("siteResourceId").references(
|
|
||||||
() => siteResources.siteResourceId,
|
|
||||||
{ onDelete: "cascade" }
|
|
||||||
),
|
|
||||||
orgId: text("orgId").references(() => orgs.orgId, {
|
|
||||||
onDelete: "cascade"
|
|
||||||
}),
|
|
||||||
siteId: integer("siteId").references(() => sites.siteId, {
|
|
||||||
onDelete: "cascade"
|
|
||||||
}),
|
|
||||||
sourceAddr: text("sourceAddr").notNull(),
|
|
||||||
destAddr: text("destAddr").notNull(),
|
|
||||||
protocol: text("protocol").notNull(),
|
|
||||||
startedAt: integer("startedAt").notNull(),
|
|
||||||
endedAt: integer("endedAt"),
|
|
||||||
bytesTx: integer("bytesTx"),
|
|
||||||
bytesRx: integer("bytesRx")
|
|
||||||
},
|
|
||||||
(table) => [
|
|
||||||
index("idx_accessAuditLog_startedAt").on(table.startedAt),
|
|
||||||
index("idx_accessAuditLog_org_startedAt").on(
|
|
||||||
table.orgId,
|
|
||||||
table.startedAt
|
|
||||||
),
|
|
||||||
index("idx_accessAuditLog_siteResourceId").on(table.siteResourceId)
|
|
||||||
]
|
|
||||||
);
|
|
||||||
|
|
||||||
export const approvals = sqliteTable("approvals", {
|
export const approvals = sqliteTable("approvals", {
|
||||||
approvalId: integer("approvalId").primaryKey({ autoIncrement: true }),
|
approvalId: integer("approvalId").primaryKey({ autoIncrement: true }),
|
||||||
timestamp: integer("timestamp").notNull(), // this is EPOCH time in seconds
|
timestamp: integer("timestamp").notNull(), // this is EPOCH time in seconds
|
||||||
@@ -381,4 +348,3 @@ export type LoginPage = InferSelectModel<typeof loginPage>;
|
|||||||
export type LoginPageBranding = InferSelectModel<typeof loginPageBranding>;
|
export type LoginPageBranding = InferSelectModel<typeof loginPageBranding>;
|
||||||
export type ActionAuditLog = InferSelectModel<typeof actionAuditLog>;
|
export type ActionAuditLog = InferSelectModel<typeof actionAuditLog>;
|
||||||
export type AccessAuditLog = InferSelectModel<typeof accessAuditLog>;
|
export type AccessAuditLog = InferSelectModel<typeof accessAuditLog>;
|
||||||
export type ConnectionAuditLog = InferSelectModel<typeof connectionAuditLog>;
|
|
||||||
|
|||||||
@@ -47,9 +47,6 @@ export const orgs = sqliteTable("orgs", {
|
|||||||
settingsLogRetentionDaysAction: integer("settingsLogRetentionDaysAction") // where 0 = dont keep logs and -1 = keep forever and 9001 = end of the following year
|
settingsLogRetentionDaysAction: integer("settingsLogRetentionDaysAction") // where 0 = dont keep logs and -1 = keep forever and 9001 = end of the following year
|
||||||
.notNull()
|
.notNull()
|
||||||
.default(0),
|
.default(0),
|
||||||
settingsLogRetentionDaysConnection: integer("settingsLogRetentionDaysConnection") // where 0 = dont keep logs and -1 = keep forever and 9001 = end of the following year
|
|
||||||
.notNull()
|
|
||||||
.default(0),
|
|
||||||
sshCaPrivateKey: text("sshCaPrivateKey"), // Encrypted SSH CA private key (PEM format)
|
sshCaPrivateKey: text("sshCaPrivateKey"), // Encrypted SSH CA private key (PEM format)
|
||||||
sshCaPublicKey: text("sshCaPublicKey"), // SSH CA public key (OpenSSH format)
|
sshCaPublicKey: text("sshCaPublicKey"), // SSH CA public key (OpenSSH format)
|
||||||
isBillingOrg: integer("isBillingOrg", { mode: "boolean" }),
|
isBillingOrg: integer("isBillingOrg", { mode: "boolean" }),
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ import { db, orgs } from "@server/db";
|
|||||||
import { cleanUpOldLogs as cleanUpOldAccessLogs } from "#dynamic/lib/logAccessAudit";
|
import { cleanUpOldLogs as cleanUpOldAccessLogs } from "#dynamic/lib/logAccessAudit";
|
||||||
import { cleanUpOldLogs as cleanUpOldActionLogs } from "#dynamic/middlewares/logActionAudit";
|
import { cleanUpOldLogs as cleanUpOldActionLogs } from "#dynamic/middlewares/logActionAudit";
|
||||||
import { cleanUpOldLogs as cleanUpOldRequestLogs } from "@server/routers/badger/logRequestAudit";
|
import { cleanUpOldLogs as cleanUpOldRequestLogs } from "@server/routers/badger/logRequestAudit";
|
||||||
import { cleanUpOldLogs as cleanUpOldConnectionLogs } from "#dynamic/routers/newt";
|
|
||||||
import { gt, or } from "drizzle-orm";
|
import { gt, or } from "drizzle-orm";
|
||||||
import { cleanUpOldFingerprintSnapshots } from "@server/routers/olm/fingerprintingUtils";
|
import { cleanUpOldFingerprintSnapshots } from "@server/routers/olm/fingerprintingUtils";
|
||||||
import { build } from "@server/build";
|
import { build } from "@server/build";
|
||||||
@@ -21,17 +20,14 @@ export function initLogCleanupInterval() {
|
|||||||
settingsLogRetentionDaysAccess:
|
settingsLogRetentionDaysAccess:
|
||||||
orgs.settingsLogRetentionDaysAccess,
|
orgs.settingsLogRetentionDaysAccess,
|
||||||
settingsLogRetentionDaysRequest:
|
settingsLogRetentionDaysRequest:
|
||||||
orgs.settingsLogRetentionDaysRequest,
|
orgs.settingsLogRetentionDaysRequest
|
||||||
settingsLogRetentionDaysConnection:
|
|
||||||
orgs.settingsLogRetentionDaysConnection
|
|
||||||
})
|
})
|
||||||
.from(orgs)
|
.from(orgs)
|
||||||
.where(
|
.where(
|
||||||
or(
|
or(
|
||||||
gt(orgs.settingsLogRetentionDaysAction, 0),
|
gt(orgs.settingsLogRetentionDaysAction, 0),
|
||||||
gt(orgs.settingsLogRetentionDaysAccess, 0),
|
gt(orgs.settingsLogRetentionDaysAccess, 0),
|
||||||
gt(orgs.settingsLogRetentionDaysRequest, 0),
|
gt(orgs.settingsLogRetentionDaysRequest, 0)
|
||||||
gt(orgs.settingsLogRetentionDaysConnection, 0)
|
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -41,8 +37,7 @@ export function initLogCleanupInterval() {
|
|||||||
orgId,
|
orgId,
|
||||||
settingsLogRetentionDaysAction,
|
settingsLogRetentionDaysAction,
|
||||||
settingsLogRetentionDaysAccess,
|
settingsLogRetentionDaysAccess,
|
||||||
settingsLogRetentionDaysRequest,
|
settingsLogRetentionDaysRequest
|
||||||
settingsLogRetentionDaysConnection
|
|
||||||
} = org;
|
} = org;
|
||||||
|
|
||||||
if (settingsLogRetentionDaysAction > 0) {
|
if (settingsLogRetentionDaysAction > 0) {
|
||||||
@@ -65,13 +60,6 @@ export function initLogCleanupInterval() {
|
|||||||
settingsLogRetentionDaysRequest
|
settingsLogRetentionDaysRequest
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (settingsLogRetentionDaysConnection > 0) {
|
|
||||||
await cleanUpOldConnectionLogs(
|
|
||||||
orgId,
|
|
||||||
settingsLogRetentionDaysConnection
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
await cleanUpOldFingerprintSnapshots(365);
|
await cleanUpOldFingerprintSnapshots(365);
|
||||||
|
|||||||
123
server/lib/ip.ts
123
server/lib/ip.ts
@@ -571,129 +571,6 @@ export function generateSubnetProxyTargets(
|
|||||||
return targets;
|
return targets;
|
||||||
}
|
}
|
||||||
|
|
||||||
export type SubnetProxyTargetV2 = {
|
|
||||||
sourcePrefixes: string[]; // must be cidrs
|
|
||||||
destPrefix: string; // must be a cidr
|
|
||||||
disableIcmp?: boolean;
|
|
||||||
rewriteTo?: string; // must be a cidr
|
|
||||||
portRange?: {
|
|
||||||
min: number;
|
|
||||||
max: number;
|
|
||||||
protocol: "tcp" | "udp";
|
|
||||||
}[];
|
|
||||||
};
|
|
||||||
|
|
||||||
export function generateSubnetProxyTargetV2(
|
|
||||||
siteResource: SiteResource,
|
|
||||||
clients: {
|
|
||||||
clientId: number;
|
|
||||||
pubKey: string | null;
|
|
||||||
subnet: string | null;
|
|
||||||
}[]
|
|
||||||
): SubnetProxyTargetV2 | undefined {
|
|
||||||
if (clients.length === 0) {
|
|
||||||
logger.debug(
|
|
||||||
`No clients have access to site resource ${siteResource.siteResourceId}, skipping target generation.`
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let target: SubnetProxyTargetV2 | null = null;
|
|
||||||
|
|
||||||
const portRange = [
|
|
||||||
...parsePortRangeString(siteResource.tcpPortRangeString, "tcp"),
|
|
||||||
...parsePortRangeString(siteResource.udpPortRangeString, "udp")
|
|
||||||
];
|
|
||||||
const disableIcmp = siteResource.disableIcmp ?? false;
|
|
||||||
|
|
||||||
if (siteResource.mode == "host") {
|
|
||||||
let destination = siteResource.destination;
|
|
||||||
// check if this is a valid ip
|
|
||||||
const ipSchema = z.union([z.ipv4(), z.ipv6()]);
|
|
||||||
if (ipSchema.safeParse(destination).success) {
|
|
||||||
destination = `${destination}/32`;
|
|
||||||
|
|
||||||
target = {
|
|
||||||
sourcePrefixes: [],
|
|
||||||
destPrefix: destination,
|
|
||||||
portRange,
|
|
||||||
disableIcmp
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
if (siteResource.alias && siteResource.aliasAddress) {
|
|
||||||
// also push a match for the alias address
|
|
||||||
target = {
|
|
||||||
sourcePrefixes: [],
|
|
||||||
destPrefix: `${siteResource.aliasAddress}/32`,
|
|
||||||
rewriteTo: destination,
|
|
||||||
portRange,
|
|
||||||
disableIcmp
|
|
||||||
};
|
|
||||||
}
|
|
||||||
} else if (siteResource.mode == "cidr") {
|
|
||||||
target = {
|
|
||||||
sourcePrefixes: [],
|
|
||||||
destPrefix: siteResource.destination,
|
|
||||||
portRange,
|
|
||||||
disableIcmp
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!target) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (const clientSite of clients) {
|
|
||||||
if (!clientSite.subnet) {
|
|
||||||
logger.debug(
|
|
||||||
`Client ${clientSite.clientId} has no subnet, skipping for site resource ${siteResource.siteResourceId}.`
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
const clientPrefix = `${clientSite.subnet.split("/")[0]}/32`;
|
|
||||||
|
|
||||||
// add client prefix to source prefixes
|
|
||||||
target.sourcePrefixes.push(clientPrefix);
|
|
||||||
}
|
|
||||||
|
|
||||||
// print a nice representation of the targets
|
|
||||||
// logger.debug(
|
|
||||||
// `Generated subnet proxy targets for: ${JSON.stringify(targets, null, 2)}`
|
|
||||||
// );
|
|
||||||
|
|
||||||
return target;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Converts a SubnetProxyTargetV2 to an array of SubnetProxyTarget (v1)
|
|
||||||
* by expanding each source prefix into its own target entry.
|
|
||||||
* @param targetV2 - The v2 target to convert
|
|
||||||
* @returns Array of v1 SubnetProxyTarget objects
|
|
||||||
*/
|
|
||||||
export function convertSubnetProxyTargetsV2ToV1(
|
|
||||||
targetsV2: SubnetProxyTargetV2[]
|
|
||||||
): SubnetProxyTarget[] {
|
|
||||||
return targetsV2.flatMap((targetV2) =>
|
|
||||||
targetV2.sourcePrefixes.map((sourcePrefix) => ({
|
|
||||||
sourcePrefix,
|
|
||||||
destPrefix: targetV2.destPrefix,
|
|
||||||
...(targetV2.disableIcmp !== undefined && {
|
|
||||||
disableIcmp: targetV2.disableIcmp
|
|
||||||
}),
|
|
||||||
...(targetV2.rewriteTo !== undefined && {
|
|
||||||
rewriteTo: targetV2.rewriteTo
|
|
||||||
}),
|
|
||||||
...(targetV2.portRange !== undefined && {
|
|
||||||
portRange: targetV2.portRange
|
|
||||||
})
|
|
||||||
}))
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
// Custom schema for validating port range strings
|
// Custom schema for validating port range strings
|
||||||
// Format: "80,443,8000-9000" or "*" for all ports, or empty string
|
// Format: "80,443,8000-9000" or "*" for all ports, or empty string
|
||||||
export const portRangeStringSchema = z
|
export const portRangeStringSchema = z
|
||||||
|
|||||||
@@ -302,8 +302,8 @@ export const configSchema = z
|
|||||||
.optional()
|
.optional()
|
||||||
.default({
|
.default({
|
||||||
block_size: 24,
|
block_size: 24,
|
||||||
subnet_group: "100.90.128.0/20",
|
subnet_group: "100.90.128.0/24",
|
||||||
utility_subnet_group: "100.96.128.0/20"
|
utility_subnet_group: "100.96.128.0/24"
|
||||||
}),
|
}),
|
||||||
rate_limits: z
|
rate_limits: z
|
||||||
.object({
|
.object({
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ import logger from "@server/logger";
|
|||||||
import {
|
import {
|
||||||
generateAliasConfig,
|
generateAliasConfig,
|
||||||
generateRemoteSubnets,
|
generateRemoteSubnets,
|
||||||
generateSubnetProxyTargetV2,
|
generateSubnetProxyTargets,
|
||||||
parseEndpoint,
|
parseEndpoint,
|
||||||
formatEndpoint
|
formatEndpoint
|
||||||
} from "@server/lib/ip";
|
} from "@server/lib/ip";
|
||||||
@@ -660,16 +660,19 @@ async function handleSubnetProxyTargetUpdates(
|
|||||||
);
|
);
|
||||||
|
|
||||||
if (addedClients.length > 0) {
|
if (addedClients.length > 0) {
|
||||||
const targetToAdd = generateSubnetProxyTargetV2(
|
const targetsToAdd = generateSubnetProxyTargets(
|
||||||
siteResource,
|
siteResource,
|
||||||
addedClients
|
addedClients
|
||||||
);
|
);
|
||||||
|
|
||||||
if (targetToAdd) {
|
if (targetsToAdd.length > 0) {
|
||||||
|
logger.info(
|
||||||
|
`Adding ${targetsToAdd.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}`
|
||||||
|
);
|
||||||
proxyJobs.push(
|
proxyJobs.push(
|
||||||
addSubnetProxyTargets(
|
addSubnetProxyTargets(
|
||||||
newt.newtId,
|
newt.newtId,
|
||||||
[targetToAdd],
|
targetsToAdd,
|
||||||
newt.version
|
newt.version
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
@@ -697,16 +700,19 @@ async function handleSubnetProxyTargetUpdates(
|
|||||||
);
|
);
|
||||||
|
|
||||||
if (removedClients.length > 0) {
|
if (removedClients.length > 0) {
|
||||||
const targetToRemove = generateSubnetProxyTargetV2(
|
const targetsToRemove = generateSubnetProxyTargets(
|
||||||
siteResource,
|
siteResource,
|
||||||
removedClients
|
removedClients
|
||||||
);
|
);
|
||||||
|
|
||||||
if (targetToRemove) {
|
if (targetsToRemove.length > 0) {
|
||||||
|
logger.info(
|
||||||
|
`Removing ${targetsToRemove.length} subnet proxy targets for siteResource ${siteResource.siteResourceId}`
|
||||||
|
);
|
||||||
proxyJobs.push(
|
proxyJobs.push(
|
||||||
removeSubnetProxyTargets(
|
removeSubnetProxyTargets(
|
||||||
newt.newtId,
|
newt.newtId,
|
||||||
[targetToRemove],
|
targetsToRemove,
|
||||||
newt.version
|
newt.version
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
@@ -1163,7 +1169,7 @@ async function handleMessagesForClientResources(
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (const resource of resources) {
|
for (const resource of resources) {
|
||||||
const target = generateSubnetProxyTargetV2(resource, [
|
const targets = generateSubnetProxyTargets(resource, [
|
||||||
{
|
{
|
||||||
clientId: client.clientId,
|
clientId: client.clientId,
|
||||||
pubKey: client.pubKey,
|
pubKey: client.pubKey,
|
||||||
@@ -1171,11 +1177,11 @@ async function handleMessagesForClientResources(
|
|||||||
}
|
}
|
||||||
]);
|
]);
|
||||||
|
|
||||||
if (target) {
|
if (targets.length > 0) {
|
||||||
proxyJobs.push(
|
proxyJobs.push(
|
||||||
addSubnetProxyTargets(
|
addSubnetProxyTargets(
|
||||||
newt.newtId,
|
newt.newtId,
|
||||||
[target],
|
targets,
|
||||||
newt.version
|
newt.version
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
@@ -1240,7 +1246,7 @@ async function handleMessagesForClientResources(
|
|||||||
}
|
}
|
||||||
|
|
||||||
for (const resource of resources) {
|
for (const resource of resources) {
|
||||||
const target = generateSubnetProxyTargetV2(resource, [
|
const targets = generateSubnetProxyTargets(resource, [
|
||||||
{
|
{
|
||||||
clientId: client.clientId,
|
clientId: client.clientId,
|
||||||
pubKey: client.pubKey,
|
pubKey: client.pubKey,
|
||||||
@@ -1248,11 +1254,11 @@ async function handleMessagesForClientResources(
|
|||||||
}
|
}
|
||||||
]);
|
]);
|
||||||
|
|
||||||
if (target) {
|
if (targets.length > 0) {
|
||||||
proxyJobs.push(
|
proxyJobs.push(
|
||||||
removeSubnetProxyTargets(
|
removeSubnetProxyTargets(
|
||||||
newt.newtId,
|
newt.newtId,
|
||||||
[target],
|
targets,
|
||||||
newt.version
|
newt.version
|
||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|||||||
@@ -14,12 +14,10 @@
|
|||||||
import { rateLimitService } from "#private/lib/rateLimit";
|
import { rateLimitService } from "#private/lib/rateLimit";
|
||||||
import { cleanup as wsCleanup } from "#private/routers/ws";
|
import { cleanup as wsCleanup } from "#private/routers/ws";
|
||||||
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
import { flushBandwidthToDb } from "@server/routers/newt/handleReceiveBandwidthMessage";
|
||||||
import { flushConnectionLogToDb } from "#dynamic/routers/newt";
|
|
||||||
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
import { flushSiteBandwidthToDb } from "@server/routers/gerbil/receiveBandwidth";
|
||||||
|
|
||||||
async function cleanup() {
|
async function cleanup() {
|
||||||
await flushBandwidthToDb();
|
await flushBandwidthToDb();
|
||||||
await flushConnectionLogToDb();
|
|
||||||
await flushSiteBandwidthToDb();
|
await flushSiteBandwidthToDb();
|
||||||
await rateLimitService.cleanup();
|
await rateLimitService.cleanup();
|
||||||
await wsCleanup();
|
await wsCleanup();
|
||||||
@@ -31,4 +29,4 @@ export async function initCleanup() {
|
|||||||
// Handle process termination
|
// Handle process termination
|
||||||
process.on("SIGTERM", () => cleanup());
|
process.on("SIGTERM", () => cleanup());
|
||||||
process.on("SIGINT", () => cleanup());
|
process.on("SIGINT", () => cleanup());
|
||||||
}
|
}
|
||||||
@@ -57,10 +57,7 @@ export const privateConfigSchema = z.object({
|
|||||||
.object({
|
.object({
|
||||||
host: z.string(),
|
host: z.string(),
|
||||||
port: portSchema,
|
port: portSchema,
|
||||||
password: z
|
password: z.string().optional(),
|
||||||
.string()
|
|
||||||
.optional()
|
|
||||||
.transform(getEnvOrYaml("REDIS_PASSWORD")),
|
|
||||||
db: z.int().nonnegative().optional().default(0),
|
db: z.int().nonnegative().optional().default(0),
|
||||||
replicas: z
|
replicas: z
|
||||||
.array(
|
.array(
|
||||||
|
|||||||
@@ -1,324 +0,0 @@
|
|||||||
import { db, logsDb } from "@server/db";
|
|
||||||
import { MessageHandler } from "@server/routers/ws";
|
|
||||||
import { connectionAuditLog, sites, Newt } from "@server/db";
|
|
||||||
import { and, eq, lt } from "drizzle-orm";
|
|
||||||
import logger from "@server/logger";
|
|
||||||
import { inflate } from "zlib";
|
|
||||||
import { promisify } from "util";
|
|
||||||
import { calculateCutoffTimestamp } from "@server/lib/cleanupLogs";
|
|
||||||
|
|
||||||
const zlibInflate = promisify(inflate);
|
|
||||||
|
|
||||||
// Retry configuration for deadlock handling
|
|
||||||
const MAX_RETRIES = 3;
|
|
||||||
const BASE_DELAY_MS = 50;
|
|
||||||
|
|
||||||
// How often to flush accumulated connection log data to the database
|
|
||||||
const FLUSH_INTERVAL_MS = 30_000; // 30 seconds
|
|
||||||
|
|
||||||
// Maximum number of records to buffer before forcing a flush
|
|
||||||
const MAX_BUFFERED_RECORDS = 500;
|
|
||||||
|
|
||||||
// Maximum number of records to insert in a single batch
|
|
||||||
const INSERT_BATCH_SIZE = 100;
|
|
||||||
|
|
||||||
interface ConnectionSessionData {
|
|
||||||
sessionId: string;
|
|
||||||
resourceId: number;
|
|
||||||
sourceAddr: string;
|
|
||||||
destAddr: string;
|
|
||||||
protocol: string;
|
|
||||||
startedAt: string; // ISO 8601 timestamp
|
|
||||||
endedAt?: string; // ISO 8601 timestamp
|
|
||||||
bytesTx?: number;
|
|
||||||
bytesRx?: number;
|
|
||||||
}
|
|
||||||
|
|
||||||
interface ConnectionLogRecord {
|
|
||||||
sessionId: string;
|
|
||||||
siteResourceId: number;
|
|
||||||
orgId: string;
|
|
||||||
siteId: number;
|
|
||||||
sourceAddr: string;
|
|
||||||
destAddr: string;
|
|
||||||
protocol: string;
|
|
||||||
startedAt: number; // epoch seconds
|
|
||||||
endedAt: number | null;
|
|
||||||
bytesTx: number | null;
|
|
||||||
bytesRx: number | null;
|
|
||||||
}
|
|
||||||
|
|
||||||
// In-memory buffer of records waiting to be flushed
|
|
||||||
let buffer: ConnectionLogRecord[] = [];
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Check if an error is a deadlock error
|
|
||||||
*/
|
|
||||||
function isDeadlockError(error: any): boolean {
|
|
||||||
return (
|
|
||||||
error?.code === "40P01" ||
|
|
||||||
error?.cause?.code === "40P01" ||
|
|
||||||
(error?.message && error.message.includes("deadlock"))
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Execute a function with retry logic for deadlock handling
|
|
||||||
*/
|
|
||||||
async function withDeadlockRetry<T>(
|
|
||||||
operation: () => Promise<T>,
|
|
||||||
context: string
|
|
||||||
): Promise<T> {
|
|
||||||
let attempt = 0;
|
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
return await operation();
|
|
||||||
} catch (error: any) {
|
|
||||||
if (isDeadlockError(error) && attempt < MAX_RETRIES) {
|
|
||||||
attempt++;
|
|
||||||
const baseDelay = Math.pow(2, attempt - 1) * BASE_DELAY_MS;
|
|
||||||
const jitter = Math.random() * baseDelay;
|
|
||||||
const delay = baseDelay + jitter;
|
|
||||||
logger.warn(
|
|
||||||
`Deadlock detected in ${context}, retrying attempt ${attempt}/${MAX_RETRIES} after ${delay.toFixed(0)}ms`
|
|
||||||
);
|
|
||||||
await new Promise((resolve) => setTimeout(resolve, delay));
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Decompress a base64-encoded zlib-compressed string into parsed JSON.
|
|
||||||
*/
|
|
||||||
async function decompressConnectionLog(
|
|
||||||
compressed: string
|
|
||||||
): Promise<ConnectionSessionData[]> {
|
|
||||||
const compressedBuffer = Buffer.from(compressed, "base64");
|
|
||||||
const decompressed = await zlibInflate(compressedBuffer);
|
|
||||||
const jsonString = decompressed.toString("utf-8");
|
|
||||||
const parsed = JSON.parse(jsonString);
|
|
||||||
|
|
||||||
if (!Array.isArray(parsed)) {
|
|
||||||
throw new Error("Decompressed connection log data is not an array");
|
|
||||||
}
|
|
||||||
|
|
||||||
return parsed;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Convert an ISO 8601 timestamp string to epoch seconds.
|
|
||||||
* Returns null if the input is falsy.
|
|
||||||
*/
|
|
||||||
function toEpochSeconds(isoString: string | undefined | null): number | null {
|
|
||||||
if (!isoString) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
const ms = new Date(isoString).getTime();
|
|
||||||
if (isNaN(ms)) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
return Math.floor(ms / 1000);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Flush all buffered connection log records to the database.
|
|
||||||
*
|
|
||||||
* Swaps out the buffer before writing so that any records added during the
|
|
||||||
* flush are captured in the new buffer rather than being lost. Entries that
|
|
||||||
* fail to write are re-queued back into the buffer so they will be retried
|
|
||||||
* on the next flush.
|
|
||||||
*
|
|
||||||
* This function is exported so that the application's graceful-shutdown
|
|
||||||
* cleanup handler can call it before the process exits.
|
|
||||||
*/
|
|
||||||
export async function flushConnectionLogToDb(): Promise<void> {
|
|
||||||
if (buffer.length === 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Atomically swap out the buffer so new data keeps flowing in
|
|
||||||
const snapshot = buffer;
|
|
||||||
buffer = [];
|
|
||||||
|
|
||||||
logger.debug(
|
|
||||||
`Flushing ${snapshot.length} connection log record(s) to the database`
|
|
||||||
);
|
|
||||||
|
|
||||||
// Insert in batches to avoid overly large SQL statements
|
|
||||||
for (let i = 0; i < snapshot.length; i += INSERT_BATCH_SIZE) {
|
|
||||||
const batch = snapshot.slice(i, i + INSERT_BATCH_SIZE);
|
|
||||||
|
|
||||||
try {
|
|
||||||
await withDeadlockRetry(async () => {
|
|
||||||
await logsDb.insert(connectionAuditLog).values(batch);
|
|
||||||
}, `flush connection log batch (${batch.length} records)`);
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
`Failed to flush connection log batch of ${batch.length} records:`,
|
|
||||||
error
|
|
||||||
);
|
|
||||||
|
|
||||||
// Re-queue the failed batch so it is retried on the next flush
|
|
||||||
buffer = [...batch, ...buffer];
|
|
||||||
|
|
||||||
// Cap buffer to prevent unbounded growth if DB is unreachable
|
|
||||||
if (buffer.length > MAX_BUFFERED_RECORDS * 5) {
|
|
||||||
const dropped = buffer.length - MAX_BUFFERED_RECORDS * 5;
|
|
||||||
buffer = buffer.slice(0, MAX_BUFFERED_RECORDS * 5);
|
|
||||||
logger.warn(
|
|
||||||
`Connection log buffer overflow, dropped ${dropped} oldest records`
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Stop trying further batches from this snapshot — they'll be
|
|
||||||
// picked up by the next flush via the re-queued records above
|
|
||||||
const remaining = snapshot.slice(i + INSERT_BATCH_SIZE);
|
|
||||||
if (remaining.length > 0) {
|
|
||||||
buffer = [...remaining, ...buffer];
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const flushTimer = setInterval(async () => {
|
|
||||||
try {
|
|
||||||
await flushConnectionLogToDb();
|
|
||||||
} catch (error) {
|
|
||||||
logger.error(
|
|
||||||
"Unexpected error during periodic connection log flush:",
|
|
||||||
error
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}, FLUSH_INTERVAL_MS);
|
|
||||||
|
|
||||||
// Calling unref() means this timer will not keep the Node.js event loop alive
|
|
||||||
// on its own — the process can still exit normally when there is no other work
|
|
||||||
// left. The graceful-shutdown path will call flushConnectionLogToDb() explicitly
|
|
||||||
// before process.exit(), so no data is lost.
|
|
||||||
flushTimer.unref();
|
|
||||||
|
|
||||||
export async function cleanUpOldLogs(orgId: string, retentionDays: number) {
|
|
||||||
const cutoffTimestamp = calculateCutoffTimestamp(retentionDays);
|
|
||||||
|
|
||||||
try {
|
|
||||||
await logsDb
|
|
||||||
.delete(connectionAuditLog)
|
|
||||||
.where(
|
|
||||||
and(
|
|
||||||
lt(connectionAuditLog.startedAt, cutoffTimestamp),
|
|
||||||
eq(connectionAuditLog.orgId, orgId)
|
|
||||||
)
|
|
||||||
);
|
|
||||||
|
|
||||||
// logger.debug(
|
|
||||||
// `Cleaned up connection audit logs older than ${retentionDays} days`
|
|
||||||
// );
|
|
||||||
} catch (error) {
|
|
||||||
logger.error("Error cleaning up old connection audit logs:", error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
export const handleConnectionLogMessage: MessageHandler = async (context) => {
|
|
||||||
const { message, client } = context;
|
|
||||||
const newt = client as Newt;
|
|
||||||
|
|
||||||
if (!newt) {
|
|
||||||
logger.warn("Connection log received but no newt client in context");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!newt.siteId) {
|
|
||||||
logger.warn("Connection log received but newt has no siteId");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!message.data?.compressed) {
|
|
||||||
logger.warn("Connection log message missing compressed data");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Look up the org for this site
|
|
||||||
const [site] = await db
|
|
||||||
.select({ orgId: sites.orgId })
|
|
||||||
.from(sites)
|
|
||||||
.where(eq(sites.siteId, newt.siteId));
|
|
||||||
|
|
||||||
if (!site) {
|
|
||||||
logger.warn(
|
|
||||||
`Connection log received but site ${newt.siteId} not found in database`
|
|
||||||
);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
const orgId = site.orgId;
|
|
||||||
|
|
||||||
let sessions: ConnectionSessionData[];
|
|
||||||
try {
|
|
||||||
sessions = await decompressConnectionLog(message.data.compressed);
|
|
||||||
} catch (error) {
|
|
||||||
logger.error("Failed to decompress connection log data:", error);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (sessions.length === 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Convert to DB records and add to the buffer
|
|
||||||
for (const session of sessions) {
|
|
||||||
// Validate required fields
|
|
||||||
if (
|
|
||||||
!session.sessionId ||
|
|
||||||
!session.resourceId ||
|
|
||||||
!session.sourceAddr ||
|
|
||||||
!session.destAddr ||
|
|
||||||
!session.protocol
|
|
||||||
) {
|
|
||||||
logger.debug(
|
|
||||||
`Skipping connection log session with missing required fields: ${JSON.stringify(session)}`
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
const startedAt = toEpochSeconds(session.startedAt);
|
|
||||||
if (startedAt === null) {
|
|
||||||
logger.debug(
|
|
||||||
`Skipping connection log session with invalid startedAt: ${session.startedAt}`
|
|
||||||
);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
buffer.push({
|
|
||||||
sessionId: session.sessionId,
|
|
||||||
siteResourceId: session.resourceId,
|
|
||||||
orgId,
|
|
||||||
siteId: newt.siteId,
|
|
||||||
sourceAddr: session.sourceAddr,
|
|
||||||
destAddr: session.destAddr,
|
|
||||||
protocol: session.protocol,
|
|
||||||
startedAt,
|
|
||||||
endedAt: toEpochSeconds(session.endedAt),
|
|
||||||
bytesTx: session.bytesTx ?? null,
|
|
||||||
bytesRx: session.bytesRx ?? null
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
logger.debug(
|
|
||||||
`Buffered ${sessions.length} connection log session(s) from newt ${newt.newtId} (site ${newt.siteId})`
|
|
||||||
);
|
|
||||||
|
|
||||||
// If the buffer has grown large enough, trigger an immediate flush
|
|
||||||
if (buffer.length >= MAX_BUFFERED_RECORDS) {
|
|
||||||
// Fire and forget — errors are handled inside flushConnectionLogToDb
|
|
||||||
flushConnectionLogToDb().catch((error) => {
|
|
||||||
logger.error(
|
|
||||||
"Unexpected error during size-triggered connection log flush:",
|
|
||||||
error
|
|
||||||
);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
};
|
|
||||||
@@ -1 +0,0 @@
|
|||||||
export * from "./handleConnectionLogMessage";
|
|
||||||
@@ -18,12 +18,10 @@ import {
|
|||||||
} from "#private/routers/remoteExitNode";
|
} from "#private/routers/remoteExitNode";
|
||||||
import { MessageHandler } from "@server/routers/ws";
|
import { MessageHandler } from "@server/routers/ws";
|
||||||
import { build } from "@server/build";
|
import { build } from "@server/build";
|
||||||
import { handleConnectionLogMessage } from "#dynamic/routers/newt";
|
|
||||||
|
|
||||||
export const messageHandlers: Record<string, MessageHandler> = {
|
export const messageHandlers: Record<string, MessageHandler> = {
|
||||||
"remoteExitNode/register": handleRemoteExitNodeRegisterMessage,
|
"remoteExitNode/register": handleRemoteExitNodeRegisterMessage,
|
||||||
"remoteExitNode/ping": handleRemoteExitNodePingMessage,
|
"remoteExitNode/ping": handleRemoteExitNodePingMessage
|
||||||
"newt/access-log": handleConnectionLogMessage,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if (build != "saas") {
|
if (build != "saas") {
|
||||||
|
|||||||
@@ -1,54 +1,15 @@
|
|||||||
import { sendToClient } from "#dynamic/routers/ws";
|
import { sendToClient } from "#dynamic/routers/ws";
|
||||||
import { db, newts, olms } from "@server/db";
|
import { db, olms, Transaction } from "@server/db";
|
||||||
import {
|
|
||||||
Alias,
|
|
||||||
convertSubnetProxyTargetsV2ToV1,
|
|
||||||
SubnetProxyTarget,
|
|
||||||
SubnetProxyTargetV2
|
|
||||||
} from "@server/lib/ip";
|
|
||||||
import { canCompress } from "@server/lib/clientVersionChecks";
|
import { canCompress } from "@server/lib/clientVersionChecks";
|
||||||
|
import { Alias, SubnetProxyTarget } from "@server/lib/ip";
|
||||||
import logger from "@server/logger";
|
import logger from "@server/logger";
|
||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
import semver from "semver";
|
|
||||||
|
|
||||||
const NEWT_V2_TARGETS_VERSION = ">=1.10.3";
|
|
||||||
|
|
||||||
export async function convertTargetsIfNessicary(
|
|
||||||
newtId: string,
|
|
||||||
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[]
|
|
||||||
) {
|
|
||||||
// get the newt
|
|
||||||
const [newt] = await db
|
|
||||||
.select()
|
|
||||||
.from(newts)
|
|
||||||
.where(eq(newts.newtId, newtId));
|
|
||||||
if (!newt) {
|
|
||||||
throw new Error(`No newt found for id: ${newtId}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
// check the semver
|
|
||||||
if (
|
|
||||||
newt.version &&
|
|
||||||
!semver.satisfies(newt.version, NEWT_V2_TARGETS_VERSION)
|
|
||||||
) {
|
|
||||||
logger.debug(
|
|
||||||
`addTargets Newt version ${newt.version} does not support targets v2 falling back`
|
|
||||||
);
|
|
||||||
targets = convertSubnetProxyTargetsV2ToV1(
|
|
||||||
targets as SubnetProxyTargetV2[]
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
return targets;
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function addTargets(
|
export async function addTargets(
|
||||||
newtId: string,
|
newtId: string,
|
||||||
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[],
|
targets: SubnetProxyTarget[],
|
||||||
version?: string | null
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
targets = await convertTargetsIfNessicary(newtId, targets);
|
|
||||||
|
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
newtId,
|
newtId,
|
||||||
{
|
{
|
||||||
@@ -61,11 +22,9 @@ export async function addTargets(
|
|||||||
|
|
||||||
export async function removeTargets(
|
export async function removeTargets(
|
||||||
newtId: string,
|
newtId: string,
|
||||||
targets: SubnetProxyTarget[] | SubnetProxyTargetV2[],
|
targets: SubnetProxyTarget[],
|
||||||
version?: string | null
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
targets = await convertTargetsIfNessicary(newtId, targets);
|
|
||||||
|
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
newtId,
|
newtId,
|
||||||
{
|
{
|
||||||
@@ -79,39 +38,11 @@ export async function removeTargets(
|
|||||||
export async function updateTargets(
|
export async function updateTargets(
|
||||||
newtId: string,
|
newtId: string,
|
||||||
targets: {
|
targets: {
|
||||||
oldTargets: SubnetProxyTarget[] | SubnetProxyTargetV2[];
|
oldTargets: SubnetProxyTarget[];
|
||||||
newTargets: SubnetProxyTarget[] | SubnetProxyTargetV2[];
|
newTargets: SubnetProxyTarget[];
|
||||||
},
|
},
|
||||||
version?: string | null
|
version?: string | null
|
||||||
) {
|
) {
|
||||||
// get the newt
|
|
||||||
const [newt] = await db
|
|
||||||
.select()
|
|
||||||
.from(newts)
|
|
||||||
.where(eq(newts.newtId, newtId));
|
|
||||||
if (!newt) {
|
|
||||||
logger.error(`addTargetsL No newt found for id: ${newtId}`);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// check the semver
|
|
||||||
if (
|
|
||||||
newt.version &&
|
|
||||||
!semver.satisfies(newt.version, NEWT_V2_TARGETS_VERSION)
|
|
||||||
) {
|
|
||||||
logger.debug(
|
|
||||||
`addTargets Newt version ${newt.version} does not support targets v2 falling back`
|
|
||||||
);
|
|
||||||
targets = {
|
|
||||||
oldTargets: convertSubnetProxyTargetsV2ToV1(
|
|
||||||
targets.oldTargets as SubnetProxyTargetV2[]
|
|
||||||
),
|
|
||||||
newTargets: convertSubnetProxyTargetsV2ToV1(
|
|
||||||
targets.newTargets as SubnetProxyTargetV2[]
|
|
||||||
)
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
await sendToClient(
|
await sendToClient(
|
||||||
newtId,
|
newtId,
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -16,8 +16,8 @@ import { eq, and } from "drizzle-orm";
|
|||||||
import config from "@server/lib/config";
|
import config from "@server/lib/config";
|
||||||
import {
|
import {
|
||||||
formatEndpoint,
|
formatEndpoint,
|
||||||
generateSubnetProxyTargetV2,
|
generateSubnetProxyTargets,
|
||||||
SubnetProxyTargetV2
|
SubnetProxyTarget
|
||||||
} from "@server/lib/ip";
|
} from "@server/lib/ip";
|
||||||
|
|
||||||
export async function buildClientConfigurationForNewtClient(
|
export async function buildClientConfigurationForNewtClient(
|
||||||
@@ -143,7 +143,7 @@ export async function buildClientConfigurationForNewtClient(
|
|||||||
.from(siteResources)
|
.from(siteResources)
|
||||||
.where(eq(siteResources.siteId, siteId));
|
.where(eq(siteResources.siteId, siteId));
|
||||||
|
|
||||||
const targetsToSend: SubnetProxyTargetV2[] = [];
|
const targetsToSend: SubnetProxyTarget[] = [];
|
||||||
|
|
||||||
for (const resource of allSiteResources) {
|
for (const resource of allSiteResources) {
|
||||||
// Get clients associated with this specific resource
|
// Get clients associated with this specific resource
|
||||||
@@ -168,14 +168,12 @@ export async function buildClientConfigurationForNewtClient(
|
|||||||
)
|
)
|
||||||
);
|
);
|
||||||
|
|
||||||
const resourceTarget = generateSubnetProxyTargetV2(
|
const resourceTargets = generateSubnetProxyTargets(
|
||||||
resource,
|
resource,
|
||||||
resourceClients
|
resourceClients
|
||||||
);
|
);
|
||||||
|
|
||||||
if (resourceTarget) {
|
targetsToSend.push(...resourceTargets);
|
||||||
targetsToSend.push(resourceTarget);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
|||||||
@@ -1,13 +0,0 @@
|
|||||||
import { MessageHandler } from "@server/routers/ws";
|
|
||||||
|
|
||||||
export async function flushConnectionLogToDb(): Promise<void> {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function cleanUpOldLogs(orgId: string, retentionDays: number) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
export const handleConnectionLogMessage: MessageHandler = async (context) => {
|
|
||||||
return;
|
|
||||||
};
|
|
||||||
@@ -6,7 +6,6 @@ import { db, ExitNode, exitNodes, Newt, sites } from "@server/db";
|
|||||||
import { eq } from "drizzle-orm";
|
import { eq } from "drizzle-orm";
|
||||||
import { sendToExitNode } from "#dynamic/lib/exitNodes";
|
import { sendToExitNode } from "#dynamic/lib/exitNodes";
|
||||||
import { buildClientConfigurationForNewtClient } from "./buildConfiguration";
|
import { buildClientConfigurationForNewtClient } from "./buildConfiguration";
|
||||||
import { convertTargetsIfNessicary } from "../client/targets";
|
|
||||||
import { canCompress } from "@server/lib/clientVersionChecks";
|
import { canCompress } from "@server/lib/clientVersionChecks";
|
||||||
|
|
||||||
const inputSchema = z.object({
|
const inputSchema = z.object({
|
||||||
@@ -128,15 +127,13 @@ export const handleGetConfigMessage: MessageHandler = async (context) => {
|
|||||||
exitNode
|
exitNode
|
||||||
);
|
);
|
||||||
|
|
||||||
const targetsToSend = await convertTargetsIfNessicary(newt.newtId, targets);
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
message: {
|
message: {
|
||||||
type: "newt/wg/receive-config",
|
type: "newt/wg/receive-config",
|
||||||
data: {
|
data: {
|
||||||
ipAddress: site.address,
|
ipAddress: site.address,
|
||||||
peers,
|
peers,
|
||||||
targets: targetsToSend
|
targets
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
options: {
|
options: {
|
||||||
|
|||||||
@@ -8,4 +8,3 @@ export * from "./handleNewtPingRequestMessage";
|
|||||||
export * from "./handleApplyBlueprintMessage";
|
export * from "./handleApplyBlueprintMessage";
|
||||||
export * from "./handleNewtPingMessage";
|
export * from "./handleNewtPingMessage";
|
||||||
export * from "./handleNewtDisconnectingMessage";
|
export * from "./handleNewtDisconnectingMessage";
|
||||||
export * from "./handleConnectionLogMessage";
|
|
||||||
|
|||||||
@@ -88,7 +88,7 @@ const createSiteResourceSchema = z
|
|||||||
},
|
},
|
||||||
{
|
{
|
||||||
message:
|
message:
|
||||||
"Destination must be a valid IPV4 address or valid domain AND alias is required"
|
"Destination must be a valid IP address or valid domain AND alias is required"
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
.refine(
|
.refine(
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ import { updatePeerData, updateTargets } from "@server/routers/client/targets";
|
|||||||
import {
|
import {
|
||||||
generateAliasConfig,
|
generateAliasConfig,
|
||||||
generateRemoteSubnets,
|
generateRemoteSubnets,
|
||||||
generateSubnetProxyTargetV2,
|
generateSubnetProxyTargets,
|
||||||
isIpInCidr,
|
isIpInCidr,
|
||||||
portRangeStringSchema
|
portRangeStringSchema
|
||||||
} from "@server/lib/ip";
|
} from "@server/lib/ip";
|
||||||
@@ -608,18 +608,18 @@ export async function handleMessagingForUpdatedSiteResource(
|
|||||||
|
|
||||||
// Only update targets on newt if destination changed
|
// Only update targets on newt if destination changed
|
||||||
if (destinationChanged || portRangesChanged) {
|
if (destinationChanged || portRangesChanged) {
|
||||||
const oldTarget = generateSubnetProxyTargetV2(
|
const oldTargets = generateSubnetProxyTargets(
|
||||||
existingSiteResource,
|
existingSiteResource,
|
||||||
mergedAllClients
|
mergedAllClients
|
||||||
);
|
);
|
||||||
const newTarget = generateSubnetProxyTargetV2(
|
const newTargets = generateSubnetProxyTargets(
|
||||||
updatedSiteResource,
|
updatedSiteResource,
|
||||||
mergedAllClients
|
mergedAllClients
|
||||||
);
|
);
|
||||||
|
|
||||||
await updateTargets(newt.newtId, {
|
await updateTargets(newt.newtId, {
|
||||||
oldTargets: oldTarget ? [oldTarget] : [],
|
oldTargets: oldTargets,
|
||||||
newTargets: newTarget ? [newTarget] : []
|
newTargets: newTargets
|
||||||
}, newt.version);
|
}, newt.version);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user