mirror of
https://github.com/fosrl/newt.git
synced 2026-03-01 16:26:40 +00:00
Working on message versioning
This commit is contained in:
145
main.go
145
main.go
@@ -1106,6 +1106,151 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey(
|
||||
}
|
||||
})
|
||||
|
||||
// Register handler for syncing targets (TCP, UDP, and health checks)
|
||||
client.RegisterHandler("newt/sync", func(msg websocket.WSMessage) {
|
||||
logger.Info("Received sync message")
|
||||
|
||||
// if there is no wgData or pm, we can't sync targets
|
||||
if wgData.TunnelIP == "" || pm == nil {
|
||||
logger.Info(msgNoTunnelOrProxy)
|
||||
return
|
||||
}
|
||||
|
||||
// Define the sync data structure
|
||||
type SyncData struct {
|
||||
Targets TargetsByType `json:"targets"`
|
||||
HealthCheckTargets []healthcheck.Config `json:"healthCheckTargets"`
|
||||
}
|
||||
|
||||
var syncData SyncData
|
||||
jsonData, err := json.Marshal(msg.Data)
|
||||
if err != nil {
|
||||
logger.Error("Error marshaling sync data: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(jsonData, &syncData); err != nil {
|
||||
logger.Error("Error unmarshaling sync data: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
logger.Debug("Sync data received: TCP targets=%d, UDP targets=%d, health check targets=%d",
|
||||
len(syncData.Targets.TCP), len(syncData.Targets.UDP), len(syncData.HealthCheckTargets))
|
||||
|
||||
// Build sets of desired targets (port -> target string)
|
||||
desiredTCP := make(map[int]string)
|
||||
for _, t := range syncData.Targets.TCP {
|
||||
parts := strings.Split(t, ":")
|
||||
if len(parts) != 3 {
|
||||
logger.Warn("Invalid TCP target format: %s", t)
|
||||
continue
|
||||
}
|
||||
port := 0
|
||||
if _, err := fmt.Sscanf(parts[0], "%d", &port); err != nil {
|
||||
logger.Warn("Invalid port in TCP target: %s", parts[0])
|
||||
continue
|
||||
}
|
||||
desiredTCP[port] = parts[1] + ":" + parts[2]
|
||||
}
|
||||
|
||||
desiredUDP := make(map[int]string)
|
||||
for _, t := range syncData.Targets.UDP {
|
||||
parts := strings.Split(t, ":")
|
||||
if len(parts) != 3 {
|
||||
logger.Warn("Invalid UDP target format: %s", t)
|
||||
continue
|
||||
}
|
||||
port := 0
|
||||
if _, err := fmt.Sscanf(parts[0], "%d", &port); err != nil {
|
||||
logger.Warn("Invalid port in UDP target: %s", parts[0])
|
||||
continue
|
||||
}
|
||||
desiredUDP[port] = parts[1] + ":" + parts[2]
|
||||
}
|
||||
|
||||
// Get current targets from proxy manager
|
||||
currentTCP, currentUDP := pm.GetTargets()
|
||||
|
||||
// Sync TCP targets
|
||||
// Remove TCP targets not in desired set
|
||||
if tcpForIP, ok := currentTCP[wgData.TunnelIP]; ok {
|
||||
for port := range tcpForIP {
|
||||
if _, exists := desiredTCP[port]; !exists {
|
||||
logger.Info("Sync: removing TCP target on port %d", port)
|
||||
targetStr := fmt.Sprintf("%d:%s", port, tcpForIP[port])
|
||||
updateTargets(pm, "remove", wgData.TunnelIP, "tcp", TargetData{Targets: []string{targetStr}})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add TCP targets that are missing
|
||||
for port, target := range desiredTCP {
|
||||
needsAdd := true
|
||||
if tcpForIP, ok := currentTCP[wgData.TunnelIP]; ok {
|
||||
if currentTarget, exists := tcpForIP[port]; exists {
|
||||
// Check if target address changed
|
||||
if currentTarget == target {
|
||||
needsAdd = false
|
||||
} else {
|
||||
// Target changed, remove old one first
|
||||
logger.Info("Sync: updating TCP target on port %d", port)
|
||||
targetStr := fmt.Sprintf("%d:%s", port, currentTarget)
|
||||
updateTargets(pm, "remove", wgData.TunnelIP, "tcp", TargetData{Targets: []string{targetStr}})
|
||||
}
|
||||
}
|
||||
}
|
||||
if needsAdd {
|
||||
logger.Info("Sync: adding TCP target on port %d -> %s", port, target)
|
||||
targetStr := fmt.Sprintf("%d:%s", port, target)
|
||||
updateTargets(pm, "add", wgData.TunnelIP, "tcp", TargetData{Targets: []string{targetStr}})
|
||||
}
|
||||
}
|
||||
|
||||
// Sync UDP targets
|
||||
// Remove UDP targets not in desired set
|
||||
if udpForIP, ok := currentUDP[wgData.TunnelIP]; ok {
|
||||
for port := range udpForIP {
|
||||
if _, exists := desiredUDP[port]; !exists {
|
||||
logger.Info("Sync: removing UDP target on port %d", port)
|
||||
targetStr := fmt.Sprintf("%d:%s", port, udpForIP[port])
|
||||
updateTargets(pm, "remove", wgData.TunnelIP, "udp", TargetData{Targets: []string{targetStr}})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add UDP targets that are missing
|
||||
for port, target := range desiredUDP {
|
||||
needsAdd := true
|
||||
if udpForIP, ok := currentUDP[wgData.TunnelIP]; ok {
|
||||
if currentTarget, exists := udpForIP[port]; exists {
|
||||
// Check if target address changed
|
||||
if currentTarget == target {
|
||||
needsAdd = false
|
||||
} else {
|
||||
// Target changed, remove old one first
|
||||
logger.Info("Sync: updating UDP target on port %d", port)
|
||||
targetStr := fmt.Sprintf("%d:%s", port, currentTarget)
|
||||
updateTargets(pm, "remove", wgData.TunnelIP, "udp", TargetData{Targets: []string{targetStr}})
|
||||
}
|
||||
}
|
||||
}
|
||||
if needsAdd {
|
||||
logger.Info("Sync: adding UDP target on port %d -> %s", port, target)
|
||||
targetStr := fmt.Sprintf("%d:%s", port, target)
|
||||
updateTargets(pm, "add", wgData.TunnelIP, "udp", TargetData{Targets: []string{targetStr}})
|
||||
}
|
||||
}
|
||||
|
||||
// Sync health check targets
|
||||
if err := healthMonitor.SyncTargets(syncData.HealthCheckTargets); err != nil {
|
||||
logger.Error("Failed to sync health check targets: %v", err)
|
||||
} else {
|
||||
logger.Info("Successfully synced health check targets")
|
||||
}
|
||||
|
||||
logger.Info("Sync complete")
|
||||
})
|
||||
|
||||
// Register handler for Docker socket check
|
||||
client.RegisterHandler("newt/socket/check", func(msg websocket.WSMessage) {
|
||||
logger.Debug("Received Docker socket check request")
|
||||
|
||||
Reference in New Issue
Block a user