import { sendToClient } from "#dynamic/routers/ws";
// NOTE(review): `S` is not referenced in the code visible here and is imported
// from a hashed faker build chunk — this looks like an accidental auto-import.
// Confirm and remove it in a follow-up.
import { S } from "@faker-js/faker/dist/airline-Dz1uGqgJ";
import { db, newts, olms, Transaction } from "@server/db";
import {
    Alias,
    convertSubnetProxyTargetsV2ToV1,
    SubnetProxyTarget,
    SubnetProxyTargetV2
} from "@server/lib/ip";
import logger from "@server/logger";
import { eq } from "drizzle-orm";
import semver from "semver";

/** Maximum number of targets sent to a newt in a single message. */
const BATCH_SIZE = 50;
/** Pause, in milliseconds, inserted between consecutive batch messages. */
const BATCH_DELAY_MS = 50;

/** Semver range of newt versions that accept v2 targets as-is. */
const NEWT_V2_TARGETS_VERSION = ">=1.11.0";

/** Either generation of subnet proxy target handled by this module. */
type AnyTarget = SubnetProxyTarget | SubnetProxyTargetV2;

/** Resolves after `ms` milliseconds. */
function sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms));
}

/**
 * Splits `array` into consecutive slices of at most `size` elements.
 *
 * @param array - Items to split; not mutated.
 * @param size - Positive integer slice length.
 * @returns The slices in order; an empty array when `array` is empty.
 * @throws RangeError when `size` is not a positive integer (a zero or
 *   negative step would otherwise never terminate the loop).
 */
function chunkArray<T>(array: T[], size: number): T[][] {
    if (!Number.isInteger(size) || size <= 0) {
        throw new RangeError(
            `chunkArray size must be a positive integer, got ${size}`
        );
    }
    const chunks: T[][] = [];
    for (let i = 0; i < array.length; i += size) {
        chunks.push(array.slice(i, i + size));
    }
    return chunks;
}

/**
 * Sends `targets` to the newt in batches of `BATCH_SIZE`, waiting
 * `BATCH_DELAY_MS` between batches. Send failures propagate to the caller.
 */
async function sendTargetBatches(
    newtId: string,
    type: string,
    targets: SubnetProxyTarget[] | SubnetProxyTargetV2[]
): Promise<void> {
    const batches = chunkArray<AnyTarget>(targets, BATCH_SIZE);
    for (let i = 0; i < batches.length; i++) {
        if (i > 0) {
            await sleep(BATCH_DELAY_MS);
        }
        await sendToClient(
            newtId,
            {
                type,
                data: batches[i]
            },
            { incrementConfigVersion: true }
        );
    }
}

/**
 * Returns `olmId` when one is supplied; otherwise looks up the first olm row
 * for `clientId` and returns its id, or `undefined` when no row exists.
 */
async function resolveOlmId(
    clientId: number,
    olmId?: string
): Promise<string | undefined> {
    if (olmId) {
        return olmId;
    }
    const [olm] = await db
        .select()
        .from(olms)
        .where(eq(olms.clientId, clientId))
        .limit(1);
    // An olm might not be associated with the client anymore; callers ignore this.
    return olm ? olm.olmId : undefined;
}

/**
 * Sends one peer-data message to an olm. Rejections from `sendToClient` are
 * logged as warnings and not rethrown.
 */
async function sendPeerDataMessage(
    olmId: string,
    type: string,
    data: Record<string, unknown>
): Promise<void> {
    await sendToClient(
        olmId,
        { type, data },
        { incrementConfigVersion: true }
    ).catch((error) => {
        logger.warn(`Error sending message:`, error);
    });
}

/**
 * Converts `targets` to the v1 format when the newt reports a version outside
 * `NEWT_V2_TARGETS_VERSION`. Targets are returned unchanged when the version
 * satisfies the range or when no version is recorded.
 *
 * @param newtId - Id of the newt the targets are destined for.
 * @param targets - Targets to send.
 * @returns The targets in the format to send to that newt.
 * @throws Error when no newt row exists for `newtId`.
 */
export async function convertTargetsIfNessicary(
    newtId: string,
    targets: SubnetProxyTarget[] | SubnetProxyTargetV2[]
): Promise<SubnetProxyTarget[] | SubnetProxyTargetV2[]> {
    // get the newt
    const [newt] = await db
        .select()
        .from(newts)
        .where(eq(newts.newtId, newtId));
    if (!newt) {
        throw new Error(`No newt found for id: ${newtId}`);
    }

    // check the semver
    if (
        newt.version &&
        !semver.satisfies(newt.version, NEWT_V2_TARGETS_VERSION)
    ) {
        logger.debug(
            `addTargets Newt version ${newt.version} does not support targets v2 falling back`
        );
        targets = convertSubnetProxyTargetsV2ToV1(
            targets as SubnetProxyTargetV2[]
        );
    }

    return targets;
}

/**
 * Sends `newt/wg/targets/add` messages for `targets`, batched and converted to
 * v1 if the newt requires it.
 *
 * @throws Error when the newt does not exist; send failures also propagate.
 */
export async function addTargets(
    newtId: string,
    targets: SubnetProxyTarget[] | SubnetProxyTargetV2[]
): Promise<void> {
    targets = await convertTargetsIfNessicary(newtId, targets);
    await sendTargetBatches(newtId, `newt/wg/targets/add`, targets);
}

/**
 * Sends `newt/wg/targets/remove` messages for `targets`, batched and converted
 * to v1 if the newt requires it.
 *
 * @throws Error when the newt does not exist; send failures also propagate.
 */
export async function removeTargets(
    newtId: string,
    targets: SubnetProxyTarget[] | SubnetProxyTargetV2[]
): Promise<void> {
    targets = await convertTargetsIfNessicary(newtId, targets);
    await sendTargetBatches(newtId, `newt/wg/targets/remove`, targets);
}

/**
 * Sends `newt/wg/targets/update` messages pairing old and new targets batch by
 * batch. When one list yields fewer batches than the other, the missing side
 * is sent as an empty array.
 *
 * Unlike `addTargets`/`removeTargets`, a missing newt is logged and the call
 * returns without throwing, and per-batch send failures are logged as
 * warnings rather than propagated.
 */
export async function updateTargets(
    newtId: string,
    targets: {
        oldTargets: SubnetProxyTarget[] | SubnetProxyTargetV2[];
        newTargets: SubnetProxyTarget[] | SubnetProxyTargetV2[];
    }
): Promise<void> {
    // get the newt
    const [newt] = await db
        .select()
        .from(newts)
        .where(eq(newts.newtId, newtId));
    if (!newt) {
        logger.error(`updateTargets: No newt found for id: ${newtId}`);
        return;
    }

    // check the semver
    if (
        newt.version &&
        !semver.satisfies(newt.version, NEWT_V2_TARGETS_VERSION)
    ) {
        logger.debug(
            `updateTargets Newt version ${newt.version} does not support targets v2 falling back`
        );
        targets = {
            oldTargets: convertSubnetProxyTargetsV2ToV1(
                targets.oldTargets as SubnetProxyTargetV2[]
            ),
            newTargets: convertSubnetProxyTargetsV2ToV1(
                targets.newTargets as SubnetProxyTargetV2[]
            )
        };
    }

    const oldBatches = chunkArray<AnyTarget>(targets.oldTargets, BATCH_SIZE);
    const newBatches = chunkArray<AnyTarget>(targets.newTargets, BATCH_SIZE);
    const maxBatches = Math.max(oldBatches.length, newBatches.length);

    for (let i = 0; i < maxBatches; i++) {
        if (i > 0) {
            await sleep(BATCH_DELAY_MS);
        }
        await sendToClient(
            newtId,
            {
                type: `newt/wg/targets/update`,
                data: {
                    oldTargets: oldBatches[i] ?? [],
                    newTargets: newBatches[i] ?? []
                }
            },
            { incrementConfigVersion: true }
        ).catch((error) => {
            logger.warn(`Error sending message:`, error);
        });
    }
}

/**
 * Sends an `olm/wg/peer/data/add` message for `siteId` to the client's olm.
 * Does nothing when `olmId` is omitted and no olm row exists for `clientId`.
 */
export async function addPeerData(
    clientId: number,
    siteId: number,
    remoteSubnets: string[],
    aliases: Alias[],
    olmId?: string
): Promise<void> {
    const resolvedOlmId = await resolveOlmId(clientId, olmId);
    if (!resolvedOlmId) {
        return; // ignore this because an olm might not be associated with the client anymore
    }
    await sendPeerDataMessage(resolvedOlmId, `olm/wg/peer/data/add`, {
        siteId: siteId,
        remoteSubnets: remoteSubnets,
        aliases: aliases
    });
}

/**
 * Sends an `olm/wg/peer/data/remove` message for `siteId` to the client's olm.
 * Does nothing when `olmId` is omitted and no olm row exists for `clientId`.
 */
export async function removePeerData(
    clientId: number,
    siteId: number,
    remoteSubnets: string[],
    aliases: Alias[],
    olmId?: string
): Promise<void> {
    const resolvedOlmId = await resolveOlmId(clientId, olmId);
    if (!resolvedOlmId) {
        return;
    }
    await sendPeerDataMessage(resolvedOlmId, `olm/wg/peer/data/remove`, {
        siteId: siteId,
        remoteSubnets: remoteSubnets,
        aliases: aliases
    });
}

/**
 * Sends an `olm/wg/peer/data/update` message for `siteId` to the client's olm.
 * The fields of `remoteSubnets` and `aliases` are spread into the message data
 * next to `siteId`; either argument may be `undefined`, in which case it
 * contributes no fields. Does nothing when `olmId` is omitted and no olm row
 * exists for `clientId`.
 */
export async function updatePeerData(
    clientId: number,
    siteId: number,
    remoteSubnets:
        | {
              oldRemoteSubnets: string[];
              newRemoteSubnets: string[];
          }
        | undefined,
    aliases:
        | {
              oldAliases: Alias[];
              newAliases: Alias[];
          }
        | undefined,
    olmId?: string
): Promise<void> {
    const resolvedOlmId = await resolveOlmId(clientId, olmId);
    if (!resolvedOlmId) {
        return;
    }
    await sendPeerDataMessage(resolvedOlmId, `olm/wg/peer/data/update`, {
        siteId: siteId,
        ...remoteSubnets,
        ...aliases
    });
}