From 039ae07b7b7b43d14a5a5fff4bed3b93d3e7544d Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 2 Mar 2026 18:11:20 -0800 Subject: [PATCH 01/28] Support prefixes sent from server --- clients/clients.go | 127 +++++++++++++++++++++++++-------------------- 1 file changed, 71 insertions(+), 56 deletions(-) diff --git a/clients/clients.go b/clients/clients.go index b7065fa..dff5025 100644 --- a/clients/clients.go +++ b/clients/clients.go @@ -37,11 +37,12 @@ type WgConfig struct { } type Target struct { - SourcePrefix string `json:"sourcePrefix"` - DestPrefix string `json:"destPrefix"` - RewriteTo string `json:"rewriteTo,omitempty"` - DisableIcmp bool `json:"disableIcmp,omitempty"` - PortRange []PortRange `json:"portRange,omitempty"` + SourcePrefix string `json:"sourcePrefix"` + SourcePrefixes []string `json:"sourcePrefixes"` + DestPrefix string `json:"destPrefix"` + RewriteTo string `json:"rewriteTo,omitempty"` + DisableIcmp bool `json:"disableIcmp,omitempty"` + PortRange []PortRange `json:"portRange,omitempty"` } type PortRange struct { @@ -277,7 +278,7 @@ func (s *WireGuardService) StartHolepunch(publicKey string, endpoint string, rel } if relayPort == 0 { - relayPort = 21820 + relayPort = 21820 } // Convert websocket.ExitNode to holepunch.ExitNode @@ -695,6 +696,19 @@ func (s *WireGuardService) ensureWireguardPeers(peers []Peer) error { return nil } +// resolveSourcePrefixes returns the effective list of source prefixes for a target, +// supporting both the legacy single SourcePrefix field and the new SourcePrefixes array. +// If SourcePrefixes is non-empty it takes precedence; otherwise SourcePrefix is used. +func resolveSourcePrefixes(target Target) []string { + if len(target.SourcePrefixes) > 0 { + return target.SourcePrefixes + } + if target.SourcePrefix != "" { + return []string{target.SourcePrefix} + } + return nil +} + func (s *WireGuardService) ensureTargets(targets []Target) error { if s.tnet == nil { // Native interface mode - proxy features not available, skip silently @@ -703,11 +717,6 @@ func (s *WireGuardService) ensureTargets(targets []Target) error { } for _, target := range targets { - sourcePrefix, err := netip.ParsePrefix(target.SourcePrefix) - if err != nil { - return fmt.Errorf("invalid CIDR %s: %v", target.SourcePrefix, err) - } - destPrefix, err := netip.ParsePrefix(target.DestPrefix) if err != nil { return fmt.Errorf("invalid CIDR %s: %v", target.DestPrefix, err) @@ -722,9 +731,14 @@ func (s *WireGuardService) ensureTargets(targets []Target) error { }) } - s.tnet.AddProxySubnetRule(sourcePrefix, destPrefix, target.RewriteTo, portRanges, target.DisableIcmp) - - logger.Info("Added target subnet from %s to %s rewrite to %s with port ranges: %v", target.SourcePrefix, target.DestPrefix, target.RewriteTo, target.PortRange) + for _, sp := range resolveSourcePrefixes(target) { + sourcePrefix, err := netip.ParsePrefix(sp) + if err != nil { + return fmt.Errorf("invalid CIDR %s: %v", sp, err) + } + s.tnet.AddProxySubnetRule(sourcePrefix, destPrefix, target.RewriteTo, portRanges, target.DisableIcmp) + logger.Info("Added target subnet from %s to %s rewrite to %s with port ranges: %v", sp, target.DestPrefix, target.RewriteTo, target.PortRange) + } } return nil @@ -1043,7 +1057,7 @@ func (s *WireGuardService) processPeerBandwidth(publicKey string, rxBytes, txByt BytesOut: bytesOutMB, } } - + return nil } } @@ -1094,12 +1108,6 @@ func (s *WireGuardService) handleAddTarget(msg websocket.WSMessage) { // Process all targets for _, target := range targets { - sourcePrefix, err := netip.ParsePrefix(target.SourcePrefix) - if err != nil { - logger.Info("Invalid CIDR %s: %v", target.SourcePrefix, err) - continue - } - destPrefix, err := netip.ParsePrefix(target.DestPrefix) if err != nil { logger.Info("Invalid CIDR %s: %v", target.DestPrefix, err) @@ -1109,15 +1117,21 @@ func (s *WireGuardService) handleAddTarget(msg websocket.WSMessage) { var portRanges []netstack2.PortRange for _, pr := range target.PortRange { portRanges = append(portRanges, netstack2.PortRange{ - Min: pr.Min, - Max: pr.Max, - Protocol: pr.Protocol, + Min: pr.Min, + Max: pr.Max, + Protocol: pr.Protocol, }) } - s.tnet.AddProxySubnetRule(sourcePrefix, destPrefix, target.RewriteTo, portRanges, target.DisableIcmp) - - logger.Info("Added target subnet from %s to %s rewrite to %s with port ranges: %v", target.SourcePrefix, target.DestPrefix, target.RewriteTo, target.PortRange) + for _, sp := range resolveSourcePrefixes(target) { + sourcePrefix, err := netip.ParsePrefix(sp) + if err != nil { + logger.Info("Invalid CIDR %s: %v", sp, err) + continue + } + s.tnet.AddProxySubnetRule(sourcePrefix, destPrefix, target.RewriteTo, portRanges, target.DisableIcmp) + logger.Info("Added target subnet from %s to %s rewrite to %s with port ranges: %v", sp, target.DestPrefix, target.RewriteTo, target.PortRange) + } } } @@ -1146,21 +1160,21 @@ func (s *WireGuardService) handleRemoveTarget(msg websocket.WSMessage) { // Process all targets for _, target := range targets { - sourcePrefix, err := netip.ParsePrefix(target.SourcePrefix) - if err != nil { - logger.Info("Invalid CIDR %s: %v", target.SourcePrefix, err) - continue - } - destPrefix, err := netip.ParsePrefix(target.DestPrefix) if err != nil { logger.Info("Invalid CIDR %s: %v", target.DestPrefix, err) continue } - s.tnet.RemoveProxySubnetRule(sourcePrefix, destPrefix) - - logger.Info("Removed target subnet %s with destination %s", target.SourcePrefix, target.DestPrefix) + for _, sp := range resolveSourcePrefixes(target) { + sourcePrefix, err := netip.ParsePrefix(sp) + if err != nil { + logger.Info("Invalid CIDR %s: %v", sp, err) + continue + } + s.tnet.RemoveProxySubnetRule(sourcePrefix, destPrefix) + logger.Info("Removed target subnet %s with destination %s", sp, target.DestPrefix) + } } } @@ -1194,30 +1208,24 @@ func (s *WireGuardService) handleUpdateTarget(msg websocket.WSMessage) { // Process all update requests for _, target := range requests.OldTargets { - sourcePrefix, err := netip.ParsePrefix(target.SourcePrefix) - if err != nil { - logger.Info("Invalid CIDR %s: %v", target.SourcePrefix, err) - continue - } - destPrefix, err := netip.ParsePrefix(target.DestPrefix) if err != nil { logger.Info("Invalid CIDR %s: %v", target.DestPrefix, err) continue } - s.tnet.RemoveProxySubnetRule(sourcePrefix, destPrefix) - logger.Info("Removed target subnet %s with destination %s", target.SourcePrefix, target.DestPrefix) + for _, sp := range resolveSourcePrefixes(target) { + sourcePrefix, err := netip.ParsePrefix(sp) + if err != nil { + logger.Info("Invalid CIDR %s: %v", sp, err) + continue + } + s.tnet.RemoveProxySubnetRule(sourcePrefix, destPrefix) + logger.Info("Removed target subnet %s with destination %s", sp, target.DestPrefix) + } } for _, target := range requests.NewTargets { - // Now add the new target - sourcePrefix, err := netip.ParsePrefix(target.SourcePrefix) - if err != nil { - logger.Info("Invalid CIDR %s: %v", target.SourcePrefix, err) - continue - } - destPrefix, err := netip.ParsePrefix(target.DestPrefix) if err != nil { logger.Info("Invalid CIDR %s: %v", target.DestPrefix, err) @@ -1227,14 +1235,21 @@ func (s *WireGuardService) handleUpdateTarget(msg websocket.WSMessage) { var portRanges []netstack2.PortRange for _, pr := range target.PortRange { portRanges = append(portRanges, netstack2.PortRange{ - Min: pr.Min, - Max: pr.Max, - Protocol: pr.Protocol, + Min: pr.Min, + Max: pr.Max, + Protocol: pr.Protocol, }) } - s.tnet.AddProxySubnetRule(sourcePrefix, destPrefix, target.RewriteTo, portRanges, target.DisableIcmp) - logger.Info("Added target subnet from %s to %s rewrite to %s with port ranges: %v", target.SourcePrefix, target.DestPrefix, target.RewriteTo, target.PortRange) + for _, sp := range resolveSourcePrefixes(target) { + sourcePrefix, err := netip.ParsePrefix(sp) + if err != nil { + logger.Info("Invalid CIDR %s: %v", sp, err) + continue + } + s.tnet.AddProxySubnetRule(sourcePrefix, destPrefix, target.RewriteTo, portRanges, target.DisableIcmp) + logger.Info("Added target subnet from %s to %s rewrite to %s with port ranges: %v", sp, target.DestPrefix, target.RewriteTo, target.PortRange) + } } } From 287eef0f446d6bf783e59664b4e1adc0a09a62d3 Mon Sep 17 00:00:00 2001 From: Owen Date: Fri, 19 Dec 2025 16:45:54 -0500 Subject: [PATCH 02/28] Add version and send it down --- websocket/client.go | 35 ++++++++++++++++++++++++++++++++++- websocket/types.go | 5 +++-- 2 files changed, 37 insertions(+), 3 deletions(-) diff --git a/websocket/client.go b/websocket/client.go index da1fa88..c0fea18 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -47,6 +47,8 @@ type Client struct { metricsCtx context.Context configNeedsSave bool // Flag to track if config needs to be saved serverVersion string + configVersion int64 // Latest config version received from server + configVersionMux sync.RWMutex } type ClientOption func(*Client) @@ -154,6 +156,22 @@ func (c *Client) GetServerVersion() string { return c.serverVersion } +// GetConfigVersion returns the latest config version received from server +func (c *Client) GetConfigVersion() int64 { + c.configVersionMux.RLock() + defer c.configVersionMux.RUnlock() + return c.configVersion +} + +// setConfigVersion updates the config version if the new version is higher +func (c *Client) setConfigVersion(version int64) { + c.configVersionMux.Lock() + defer c.configVersionMux.Unlock() + if version > c.configVersion { + c.configVersion = version + } +} + // Connect establishes the WebSocket connection func (c *Client) Connect() error { go c.connectWithRetry() @@ -653,12 +671,22 @@ func (c *Client) pingMonitor() { if c.conn == nil { return } + + // Send application-level ping with config version + pingMsg := WSMessage{ + Type: "ping", + Data: map[string]interface{}{ + "configVersion": c.GetConfigVersion(), + }, + } + c.writeMux.Lock() - err := c.conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(c.pingTimeout)) + err := c.conn.WriteJSON(pingMsg) if err == nil { telemetry.IncWSMessage(c.metricsContext(), "out", "ping") } c.writeMux.Unlock() + if err != nil { // Check if we're shutting down before logging error and reconnecting select { @@ -737,6 +765,11 @@ func (c *Client) readPumpWithDisconnectDetection(started time.Time) { } } + // Extract and update config version from message if present + if msg.ConfigVersion > 0 { + c.setConfigVersion(msg.ConfigVersion) + } + c.handlersMux.RLock() if handler, ok := c.handlers[msg.Type]; ok { handler(msg) diff --git a/websocket/types.go b/websocket/types.go index 1196d64..381f7a1 100644 --- a/websocket/types.go +++ b/websocket/types.go @@ -17,6 +17,7 @@ type TokenResponse struct { } type WSMessage struct { - Type string `json:"type"` - Data interface{} `json:"data"` + Type string `json:"type"` + Data interface{} `json:"data"` + ConfigVersion int64 `json:"configVersion,omitempty"` } From 4e854b5f961e322573a29d0586dbc1f5578060c1 Mon Sep 17 00:00:00 2001 From: Owen Date: Sun, 21 Dec 2025 20:57:10 -0500 Subject: [PATCH 03/28] Working on message versioning --- clients/clients.go | 178 +++++++++++++++++++++++++++++++++++++ healthcheck/healthcheck.go | 79 ++++++++++++++++ main.go | 145 ++++++++++++++++++++++++++++++ netstack2/proxy.go | 20 +++++ netstack2/tun.go | 9 ++ proxy/manager.go | 25 ++++++ websocket/client.go | 18 ++-- 7 files changed, 466 insertions(+), 8 deletions(-) diff --git a/clients/clients.go b/clients/clients.go index dff5025..4c64dbd 100644 --- a/clients/clients.go +++ b/clients/clients.go @@ -173,6 +173,7 @@ func NewWireGuardService(interfaceName string, port uint16, mtu int, host string wsClient.RegisterHandler("newt/wg/targets/add", service.handleAddTarget) wsClient.RegisterHandler("newt/wg/targets/remove", service.handleRemoveTarget) wsClient.RegisterHandler("newt/wg/targets/update", service.handleUpdateTarget) + wsClient.RegisterHandler("newt/wg/sync", service.handleSyncConfig) return service, nil } @@ -493,6 +494,183 @@ func (s *WireGuardService) handleConfig(msg websocket.WSMessage) { logger.Info("Client connectivity setup. Ready to accept connections from clients!") } +// SyncConfig represents the configuration sent from server for syncing +type SyncConfig struct { + Targets []Target `json:"targets"` + Peers []Peer `json:"peers"` +} + +func (s *WireGuardService) handleSyncConfig(msg websocket.WSMessage) { + var syncConfig SyncConfig + + logger.Debug("Received sync message: %v", msg) + logger.Info("Received sync configuration from remote server") + + jsonData, err := json.Marshal(msg.Data) + if err != nil { + logger.Error("Error marshaling sync data: %v", err) + return + } + + if err := json.Unmarshal(jsonData, &syncConfig); err != nil { + logger.Error("Error unmarshaling sync data: %v", err) + return + } + + // Sync peers + if err := s.syncPeers(syncConfig.Peers); err != nil { + logger.Error("Failed to sync peers: %v", err) + } + + // Sync targets + if err := s.syncTargets(syncConfig.Targets); err != nil { + logger.Error("Failed to sync targets: %v", err) + } +} + +// syncPeers synchronizes the current peers with the desired state +// It removes peers not in the desired list and adds missing ones +func (s *WireGuardService) syncPeers(desiredPeers []Peer) error { + if s.device == nil { + return fmt.Errorf("WireGuard device is not initialized") + } + + // Get current peers from the device + currentConfig, err := s.device.IpcGet() + if err != nil { + return fmt.Errorf("failed to get current device config: %v", err) + } + + // Parse current peer public keys + lines := strings.Split(currentConfig, "\n") + currentPeerKeys := make(map[string]bool) + for _, line := range lines { + if strings.HasPrefix(line, "public_key=") { + pubKey := strings.TrimPrefix(line, "public_key=") + currentPeerKeys[pubKey] = true + } + } + + // Build a map of desired peers by their public key (normalized) + desiredPeerMap := make(map[string]Peer) + for _, peer := range desiredPeers { + // Normalize the public key for comparison + pubKey, err := wgtypes.ParseKey(peer.PublicKey) + if err != nil { + logger.Warn("Invalid public key in desired peers: %s", peer.PublicKey) + continue + } + normalizedKey := util.FixKey(pubKey.String()) + desiredPeerMap[normalizedKey] = peer + } + + // Remove peers that are not in the desired list + for currentKey := range currentPeerKeys { + if _, exists := desiredPeerMap[currentKey]; !exists { + // Parse the key back to get the original format for removal + removeConfig := fmt.Sprintf("public_key=%s\nremove=true", currentKey) + if err := s.device.IpcSet(removeConfig); err != nil { + logger.Warn("Failed to remove peer %s during sync: %v", currentKey, err) + } else { + logger.Info("Removed peer %s during sync", currentKey) + } + } + } + + // Add peers that are missing + for normalizedKey, peer := range desiredPeerMap { + if _, exists := currentPeerKeys[normalizedKey]; !exists { + if err := s.addPeerToDevice(peer); err != nil { + logger.Warn("Failed to add peer %s during sync: %v", peer.PublicKey, err) + } else { + logger.Info("Added peer %s during sync", peer.PublicKey) + } + } + } + + return nil +} + +// syncTargets synchronizes the current targets with the desired state +// It removes targets not in the desired list and adds missing ones +func (s *WireGuardService) syncTargets(desiredTargets []Target) error { + if s.tnet == nil { + // Native interface mode - proxy features not available, skip silently + logger.Debug("Skipping target sync - using native interface (no proxy support)") + return nil + } + + // Get current rules from the proxy handler + currentRules := s.tnet.GetProxySubnetRules() + + // Build a map of current rules by source+dest prefix + type ruleKey struct { + sourcePrefix string + destPrefix string + } + currentRuleMap := make(map[ruleKey]bool) + for _, rule := range currentRules { + key := ruleKey{ + sourcePrefix: rule.SourcePrefix.String(), + destPrefix: rule.DestPrefix.String(), + } + currentRuleMap[key] = true + } + + // Build a map of desired targets + desiredTargetMap := make(map[ruleKey]Target) + for _, target := range desiredTargets { + key := ruleKey{ + sourcePrefix: target.SourcePrefix, + destPrefix: target.DestPrefix, + } + desiredTargetMap[key] = target + } + + // Remove targets that are not in the desired list + for _, rule := range currentRules { + key := ruleKey{ + sourcePrefix: rule.SourcePrefix.String(), + destPrefix: rule.DestPrefix.String(), + } + if _, exists := desiredTargetMap[key]; !exists { + s.tnet.RemoveProxySubnetRule(rule.SourcePrefix, rule.DestPrefix) + logger.Info("Removed target %s -> %s during sync", rule.SourcePrefix.String(), rule.DestPrefix.String()) + } + } + + // Add targets that are missing + for key, target := range desiredTargetMap { + if _, exists := currentRuleMap[key]; !exists { + sourcePrefix, err := netip.ParsePrefix(target.SourcePrefix) + if err != nil { + logger.Warn("Invalid source prefix %s during sync: %v", target.SourcePrefix, err) + continue + } + + destPrefix, err := netip.ParsePrefix(target.DestPrefix) + if err != nil { + logger.Warn("Invalid dest prefix %s during sync: %v", target.DestPrefix, err) + continue + } + + var portRanges []netstack2.PortRange + for _, pr := range target.PortRange { + portRanges = append(portRanges, netstack2.PortRange{ + Min: pr.Min, + Max: pr.Max, + Protocol: pr.Protocol, + }) + } + + s.tnet.AddProxySubnetRule(sourcePrefix, destPrefix, target.RewriteTo, portRanges, target.DisableIcmp) + logger.Info("Added target %s -> %s during sync", target.SourcePrefix, target.DestPrefix) + } + } + + return nil +} + func (s *WireGuardService) ensureWireguardInterface(wgconfig WgConfig) error { s.mu.Lock() diff --git a/healthcheck/healthcheck.go b/healthcheck/healthcheck.go index 9b23479..9889cc6 100644 --- a/healthcheck/healthcheck.go +++ b/healthcheck/healthcheck.go @@ -521,3 +521,82 @@ func (m *Monitor) DisableTarget(id int) error { return nil } + +// GetTargetIDs returns a slice of all current target IDs +func (m *Monitor) GetTargetIDs() []int { + m.mutex.RLock() + defer m.mutex.RUnlock() + + ids := make([]int, 0, len(m.targets)) + for id := range m.targets { + ids = append(ids, id) + } + return ids +} + +// SyncTargets synchronizes the current targets to match the desired set. +// It removes targets not in the desired set and adds targets that are missing. +func (m *Monitor) SyncTargets(desiredConfigs []Config) error { + m.mutex.Lock() + defer m.mutex.Unlock() + + logger.Info("Syncing health check targets: %d desired targets", len(desiredConfigs)) + + // Build a set of desired target IDs + desiredIDs := make(map[int]Config) + for _, config := range desiredConfigs { + desiredIDs[config.ID] = config + } + + // Find targets to remove (exist but not in desired set) + var toRemove []int + for id := range m.targets { + if _, exists := desiredIDs[id]; !exists { + toRemove = append(toRemove, id) + } + } + + // Remove targets that are not in the desired set + for _, id := range toRemove { + logger.Info("Sync: removing health check target %d", id) + if target, exists := m.targets[id]; exists { + target.cancel() + delete(m.targets, id) + } + } + + // Add or update targets from the desired set + var addedCount, updatedCount int + for id, config := range desiredIDs { + if existing, exists := m.targets[id]; exists { + // Target exists - check if config changed and update if needed + // For now, we'll replace it to ensure config is up to date + logger.Debug("Sync: updating health check target %d", id) + existing.cancel() + delete(m.targets, id) + if err := m.addTargetUnsafe(config); err != nil { + logger.Error("Sync: failed to update target %d: %v", id, err) + return fmt.Errorf("failed to update target %d: %v", id, err) + } + updatedCount++ + } else { + // Target doesn't exist - add it + logger.Debug("Sync: adding health check target %d", id) + if err := m.addTargetUnsafe(config); err != nil { + logger.Error("Sync: failed to add target %d: %v", id, err) + return fmt.Errorf("failed to add target %d: %v", id, err) + } + addedCount++ + } + } + + logger.Info("Sync complete: removed %d, added %d, updated %d targets", + len(toRemove), addedCount, updatedCount) + + // Notify callback if any changes were made + if (len(toRemove) > 0 || addedCount > 0 || updatedCount > 0) && m.callback != nil { + go m.callback(m.getAllTargetsUnsafe()) + } + + return nil +} diff --git a/main.go b/main.go index dee958a..b4175a2 100644 --- a/main.go +++ b/main.go @@ -1165,6 +1165,151 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey( } }) + // Register handler for syncing targets (TCP, UDP, and health checks) + client.RegisterHandler("newt/sync", func(msg websocket.WSMessage) { + logger.Info("Received sync message") + + // if there is no wgData or pm, we can't sync targets + if wgData.TunnelIP == "" || pm == nil { + logger.Info(msgNoTunnelOrProxy) + return + } + + // Define the sync data structure + type SyncData struct { + Targets TargetsByType `json:"targets"` + HealthCheckTargets []healthcheck.Config `json:"healthCheckTargets"` + } + + var syncData SyncData + jsonData, err := json.Marshal(msg.Data) + if err != nil { + logger.Error("Error marshaling sync data: %v", err) + return + } + + if err := json.Unmarshal(jsonData, &syncData); err != nil { + logger.Error("Error unmarshaling sync data: %v", err) + return + } + + logger.Debug("Sync data received: TCP targets=%d, UDP targets=%d, health check targets=%d", + len(syncData.Targets.TCP), len(syncData.Targets.UDP), len(syncData.HealthCheckTargets)) + + // Build sets of desired targets (port -> target string) + desiredTCP := make(map[int]string) + for _, t := range syncData.Targets.TCP { + parts := strings.Split(t, ":") + if len(parts) != 3 { + logger.Warn("Invalid TCP target format: %s", t) + continue + } + port := 0 + if _, err := fmt.Sscanf(parts[0], "%d", &port); err != nil { + logger.Warn("Invalid port in TCP target: %s", parts[0]) + continue + } + desiredTCP[port] = parts[1] + ":" + parts[2] + } + + desiredUDP := make(map[int]string) + for _, t := range syncData.Targets.UDP { + parts := strings.Split(t, ":") + if len(parts) != 3 { + logger.Warn("Invalid UDP target format: %s", t) + continue + } + port := 0 + if _, err := fmt.Sscanf(parts[0], "%d", &port); err != nil { + logger.Warn("Invalid port in UDP target: %s", parts[0]) + continue + } + desiredUDP[port] = parts[1] + ":" + parts[2] + } + + // Get current targets from proxy manager + currentTCP, currentUDP := pm.GetTargets() + + // Sync TCP targets + // Remove TCP targets not in desired set + if tcpForIP, ok := currentTCP[wgData.TunnelIP]; ok { + for port := range tcpForIP { + if _, exists := desiredTCP[port]; !exists { + logger.Info("Sync: removing TCP target on port %d", port) + targetStr := fmt.Sprintf("%d:%s", port, tcpForIP[port]) + updateTargets(pm, "remove", wgData.TunnelIP, "tcp", TargetData{Targets: []string{targetStr}}) + } + } + } + + // Add TCP targets that are missing + for port, target := range desiredTCP { + needsAdd := true + if tcpForIP, ok := currentTCP[wgData.TunnelIP]; ok { + if currentTarget, exists := tcpForIP[port]; exists { + // Check if target address changed + if currentTarget == target { + needsAdd = false + } else { + // Target changed, remove old one first + logger.Info("Sync: updating TCP target on port %d", port) + targetStr := fmt.Sprintf("%d:%s", port, currentTarget) + updateTargets(pm, "remove", wgData.TunnelIP, "tcp", TargetData{Targets: []string{targetStr}}) + } + } + } + if needsAdd { + logger.Info("Sync: adding TCP target on port %d -> %s", port, target) + targetStr := fmt.Sprintf("%d:%s", port, target) + updateTargets(pm, "add", wgData.TunnelIP, "tcp", TargetData{Targets: []string{targetStr}}) + } + } + + // Sync UDP targets + // Remove UDP targets not in desired set + if udpForIP, ok := currentUDP[wgData.TunnelIP]; ok { + for port := range udpForIP { + if _, exists := desiredUDP[port]; !exists { + logger.Info("Sync: removing UDP target on port %d", port) + targetStr := fmt.Sprintf("%d:%s", port, udpForIP[port]) + updateTargets(pm, "remove", wgData.TunnelIP, "udp", TargetData{Targets: []string{targetStr}}) + } + } + } + + // Add UDP targets that are missing + for port, target := range desiredUDP { + needsAdd := true + if udpForIP, ok := currentUDP[wgData.TunnelIP]; ok { + if currentTarget, exists := udpForIP[port]; exists { + // Check if target address changed + if currentTarget == target { + needsAdd = false + } else { + // Target changed, remove old one first + logger.Info("Sync: updating UDP target on port %d", port) + targetStr := fmt.Sprintf("%d:%s", port, currentTarget) + updateTargets(pm, "remove", wgData.TunnelIP, "udp", TargetData{Targets: []string{targetStr}}) + } + } + } + if needsAdd { + logger.Info("Sync: adding UDP target on port %d -> %s", port, target) + targetStr := fmt.Sprintf("%d:%s", port, target) + updateTargets(pm, "add", wgData.TunnelIP, "udp", TargetData{Targets: []string{targetStr}}) + } + } + + // Sync health check targets + if err := healthMonitor.SyncTargets(syncData.HealthCheckTargets); err != nil { + logger.Error("Failed to sync health check targets: %v", err) + } else { + logger.Info("Successfully synced health check targets") + } + + logger.Info("Sync complete") + }) + // Register handler for Docker socket check client.RegisterHandler("newt/socket/check", func(msg websocket.WSMessage) { logger.Debug("Received Docker socket check request") diff --git a/netstack2/proxy.go b/netstack2/proxy.go index 388a3d1..2e2d763 100644 --- a/netstack2/proxy.go +++ b/netstack2/proxy.go @@ -48,6 +48,18 @@ type SubnetRule struct { PortRanges []PortRange // empty slice means all ports allowed } +// GetAllRules returns a copy of all subnet rules +func (sl *SubnetLookup) GetAllRules() []SubnetRule { + sl.mu.RLock() + defer sl.mu.RUnlock() + + rules := make([]SubnetRule, 0, len(sl.rules)) + for _, rule := range sl.rules { + rules = append(rules, *rule) + } + return rules +} + // connKey uniquely identifies a connection for NAT tracking type connKey struct { srcIP string @@ -200,6 +212,14 @@ func (p *ProxyHandler) RemoveSubnetRule(sourcePrefix, destPrefix netip.Prefix) { p.subnetLookup.RemoveSubnet(sourcePrefix, destPrefix) } +// GetAllRules returns all subnet rules from the proxy handler +func (p *ProxyHandler) GetAllRules() []SubnetRule { + if p == nil || !p.enabled { + return nil + } + return p.subnetLookup.GetAllRules() +} + // LookupDestinationRewrite looks up the rewritten destination for a connection // This is used by TCP/UDP handlers to find the actual target address func (p *ProxyHandler) LookupDestinationRewrite(srcIP, dstIP string, dstPort uint16, proto uint8) (netip.Addr, bool) { diff --git a/netstack2/tun.go b/netstack2/tun.go index e743f1e..b00faea 100644 --- a/netstack2/tun.go +++ b/netstack2/tun.go @@ -369,6 +369,15 @@ func (net *Net) RemoveProxySubnetRule(sourcePrefix, destPrefix netip.Prefix) { } } +// GetProxySubnetRules returns all subnet rules from the proxy handler +func (net *Net) GetProxySubnetRules() []SubnetRule { + tun := (*netTun)(net) + if tun.proxyHandler != nil { + return tun.proxyHandler.GetAllRules() + } + return nil +} + // GetProxyHandler returns the proxy handler (for advanced use cases) // Returns nil if proxy is not enabled func (net *Net) GetProxyHandler() *ProxyHandler { diff --git a/proxy/manager.go b/proxy/manager.go index cef5fa6..0619e80 100644 --- a/proxy/manager.go +++ b/proxy/manager.go @@ -736,3 +736,28 @@ func (pm *ProxyManager) PrintTargets() { } } } + +// GetTargets returns a copy of the current TCP and UDP targets +// Returns map[listenIP]map[port]targetAddress for both TCP and UDP +func (pm *ProxyManager) GetTargets() (tcpTargets map[string]map[int]string, udpTargets map[string]map[int]string) { + pm.mutex.RLock() + defer pm.mutex.RUnlock() + + tcpTargets = make(map[string]map[int]string) + for listenIP, targets := range pm.tcpTargets { + tcpTargets[listenIP] = make(map[int]string) + for port, targetAddr := range targets { + tcpTargets[listenIP][port] = targetAddr + } + } + + udpTargets = make(map[string]map[int]string) + for listenIP, targets := range pm.udpTargets { + udpTargets[listenIP] = make(map[int]string) + for port, targetAddr := range targets { + udpTargets[listenIP][port] = targetAddr + } + } + + return tcpTargets, udpTargets +} diff --git a/websocket/client.go b/websocket/client.go index c0fea18..8703b51 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -671,22 +671,24 @@ func (c *Client) pingMonitor() { if c.conn == nil { return } - - // Send application-level ping with config version + + c.configVersionMux.RLock() + configVersion := c.configVersion + c.configVersionMux.RUnlock() + pingMsg := WSMessage{ - Type: "ping", - Data: map[string]interface{}{ - "configVersion": c.GetConfigVersion(), - }, + Type: "ping", + Data: map[string]interface{}{}, + ConfigVersion: configVersion, } - + c.writeMux.Lock() err := c.conn.WriteJSON(pingMsg) if err == nil { telemetry.IncWSMessage(c.metricsContext(), "out", "ping") } c.writeMux.Unlock() - + if err != nil { // Check if we're shutting down before logging error and reconnecting select { From 15ea631b966d2175b3d03df85a8a6cee69a42f81 Mon Sep 17 00:00:00 2001 From: Owen Date: Thu, 15 Jan 2026 21:33:11 -0800 Subject: [PATCH 04/28] Mutex on handlers, slight change to ping message and handler --- websocket/client.go | 40 ++++++++++++++++++++++++++++++---------- 1 file changed, 30 insertions(+), 10 deletions(-) diff --git a/websocket/client.go b/websocket/client.go index 8703b51..8b64e21 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -47,8 +47,11 @@ type Client struct { metricsCtx context.Context configNeedsSave bool // Flag to track if config needs to be saved serverVersion string - configVersion int64 // Latest config version received from server + configVersion int64 // Latest config version received from server configVersionMux sync.RWMutex + processingMessage bool // Flag to track if a message is currently being processed + processingMux sync.RWMutex // Protects processingMessage + processingWg sync.WaitGroup // WaitGroup to wait for message processing to complete } type ClientOption func(*Client) @@ -163,13 +166,11 @@ func (c *Client) GetConfigVersion() int64 { return c.configVersion } -// setConfigVersion updates the config version if the new version is higher +// setConfigVersion updates the config version func (c *Client) setConfigVersion(version int64) { c.configVersionMux.Lock() defer c.configVersionMux.Unlock() - if version > c.configVersion { - c.configVersion = version - } + c.configVersion = version } // Connect establishes the WebSocket connection @@ -672,12 +673,21 @@ func (c *Client) pingMonitor() { return } + // Skip ping if a message is currently being processed + c.processingMux.RLock() + isProcessing := c.processingMessage + c.processingMux.RUnlock() + if isProcessing { + logger.Debug("Skipping ping, message is being processed") + continue + } + c.configVersionMux.RLock() configVersion := c.configVersion c.configVersionMux.RUnlock() pingMsg := WSMessage{ - Type: "ping", + Type: "newt/ping", Data: map[string]interface{}{}, ConfigVersion: configVersion, } @@ -767,14 +777,24 @@ func (c *Client) readPumpWithDisconnectDetection(started time.Time) { } } - // Extract and update config version from message if present - if msg.ConfigVersion > 0 { - c.setConfigVersion(msg.ConfigVersion) - } + // Update config version from incoming message + c.setConfigVersion(msg.ConfigVersion) c.handlersMux.RLock() if handler, ok := c.handlers[msg.Type]; ok { + // Mark that we're processing a message + c.processingMux.Lock() + c.processingMessage = true + c.processingMux.Unlock() + c.processingWg.Add(1) + handler(msg) + + // Mark that we're done processing + c.processingWg.Done() + c.processingMux.Lock() + c.processingMessage = false + c.processingMux.Unlock() } c.handlersMux.RUnlock() } From 6371e980d2e85570ada7013e91d34e4140092491 Mon Sep 17 00:00:00 2001 From: Owen Date: Tue, 3 Mar 2026 16:11:32 -0800 Subject: [PATCH 05/28] Update the get all rules --- netstack2/proxy.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/netstack2/proxy.go b/netstack2/proxy.go index 2e2d763..1b34818 100644 --- a/netstack2/proxy.go +++ b/netstack2/proxy.go @@ -53,9 +53,14 @@ func (sl *SubnetLookup) GetAllRules() []SubnetRule { sl.mu.RLock() defer sl.mu.RUnlock() - rules := make([]SubnetRule, 0, len(sl.rules)) - for _, rule := range sl.rules { - rules = append(rules, *rule) + var rules []SubnetRule + for _, destTriePtr := range sl.sourceTrie.All() { + if destTriePtr == nil { + continue + } + for _, rule := range destTriePtr.rules { + rules = append(rules, *rule) + } } return rules } From e68b65683fefd48b10198cca954f03b220a99934 Mon Sep 17 00:00:00 2001 From: Owen Date: Fri, 6 Mar 2026 15:14:48 -0800 Subject: [PATCH 06/28] Temp lets ignore the sync messages --- main.go | 214 ++++++++++++++++++++++++++++---------------------------- 1 file changed, 108 insertions(+), 106 deletions(-) diff --git a/main.go b/main.go index 0f4acd7..0141b77 100644 --- a/main.go +++ b/main.go @@ -565,7 +565,7 @@ func runNewtMain(ctx context.Context) { id, // CLI arg takes precedence secret, // CLI arg takes precedence endpoint, - pingInterval, + 30*time.Second, pingTimeout, opt, ) @@ -959,7 +959,7 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey( "publicKey": publicKey.String(), "pingResults": pingResults, "newtVersion": newtVersion, - }, 1*time.Second) + }, 2*time.Second) return } @@ -1062,7 +1062,7 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey( "publicKey": publicKey.String(), "pingResults": pingResults, "newtVersion": newtVersion, - }, 1*time.Second) + }, 2*time.Second) logger.Debug("Sent exit node ping results to cloud for selection: pingResults=%+v", pingResults) }) @@ -1198,116 +1198,118 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey( logger.Debug("Sync data received: TCP targets=%d, UDP targets=%d, health check targets=%d", len(syncData.Targets.TCP), len(syncData.Targets.UDP), len(syncData.HealthCheckTargets)) - // Build sets of desired targets (port -> target string) - desiredTCP := make(map[int]string) - for _, t := range syncData.Targets.TCP { - parts := strings.Split(t, ":") - if len(parts) != 3 { - logger.Warn("Invalid TCP target format: %s", t) - continue - } - port := 0 - if _, err := fmt.Sscanf(parts[0], "%d", &port); err != nil { - logger.Warn("Invalid port in TCP target: %s", parts[0]) - continue - } - desiredTCP[port] = parts[1] + ":" + parts[2] - } + //TODO: TEST AND IMPLEMENT THIS + + // // Build sets of desired targets (port -> target string) + // desiredTCP := make(map[int]string) + // for _, t := range syncData.Targets.TCP { + // parts := strings.Split(t, ":") + // if len(parts) != 3 { + // logger.Warn("Invalid TCP target format: %s", t) + // continue + // } + // port := 0 + // if _, err := fmt.Sscanf(parts[0], "%d", &port); err != nil { + // logger.Warn("Invalid port in TCP target: %s", parts[0]) + // continue + // } + // desiredTCP[port] = parts[1] + ":" + parts[2] + // } - desiredUDP := make(map[int]string) - for _, t := range syncData.Targets.UDP { - parts := strings.Split(t, ":") - if len(parts) != 3 { - logger.Warn("Invalid UDP target format: %s", t) - continue - } - port := 0 - if _, err := fmt.Sscanf(parts[0], "%d", &port); err != nil { - logger.Warn("Invalid port in UDP target: %s", parts[0]) - continue - } - desiredUDP[port] = parts[1] + ":" + parts[2] - } + // desiredUDP := make(map[int]string) + // for _, t := range syncData.Targets.UDP { + // parts := strings.Split(t, ":") + // if len(parts) != 3 { + // logger.Warn("Invalid UDP target format: %s", t) + // continue + // } + // port := 0 + // if _, err := fmt.Sscanf(parts[0], "%d", &port); err != nil { + // logger.Warn("Invalid port in UDP target: %s", parts[0]) + // continue + // } + // desiredUDP[port] = parts[1] + ":" + parts[2] + // } - // Get current targets from proxy manager - currentTCP, currentUDP := pm.GetTargets() + // // Get current targets from proxy manager + // currentTCP, currentUDP := pm.GetTargets() - // Sync TCP targets - // Remove TCP targets not in desired set - if tcpForIP, ok := currentTCP[wgData.TunnelIP]; ok { - for port := range tcpForIP { - if _, exists := desiredTCP[port]; !exists { - logger.Info("Sync: removing TCP target on port %d", port) - targetStr := fmt.Sprintf("%d:%s", port, tcpForIP[port]) - updateTargets(pm, "remove", wgData.TunnelIP, "tcp", TargetData{Targets: []string{targetStr}}) - } - } - } + // // Sync TCP targets + // // Remove TCP targets not in desired set + // if tcpForIP, ok := currentTCP[wgData.TunnelIP]; ok { + // for port := range tcpForIP { + // if _, exists := desiredTCP[port]; !exists { + // logger.Info("Sync: removing TCP target on port %d", port) + // targetStr := fmt.Sprintf("%d:%s", port, tcpForIP[port]) + // updateTargets(pm, "remove", wgData.TunnelIP, "tcp", TargetData{Targets: []string{targetStr}}) + // } + // } + // } - // Add TCP targets that are missing - for port, target := range desiredTCP { - needsAdd := true - if tcpForIP, ok := currentTCP[wgData.TunnelIP]; ok { - if currentTarget, exists := tcpForIP[port]; exists { - // Check if target address changed - if currentTarget == target { - needsAdd = false - } else { - // Target changed, remove old one first - logger.Info("Sync: updating TCP target on port %d", port) - targetStr := fmt.Sprintf("%d:%s", port, currentTarget) - updateTargets(pm, "remove", wgData.TunnelIP, "tcp", TargetData{Targets: []string{targetStr}}) - } - } - } - if needsAdd { - logger.Info("Sync: adding TCP target on port %d -> %s", port, target) - targetStr := fmt.Sprintf("%d:%s", port, target) - updateTargets(pm, "add", wgData.TunnelIP, "tcp", TargetData{Targets: []string{targetStr}}) - } - } + // // Add TCP targets that are missing + // for port, target := range desiredTCP { + // needsAdd := true + // if tcpForIP, ok := currentTCP[wgData.TunnelIP]; ok { + // if currentTarget, exists := tcpForIP[port]; exists { + // // Check if target address changed + // if currentTarget == target { + // needsAdd = false + // } else { + // // Target changed, remove old one first + // logger.Info("Sync: updating TCP target on port %d", port) + // targetStr := fmt.Sprintf("%d:%s", port, currentTarget) + // updateTargets(pm, "remove", wgData.TunnelIP, "tcp", TargetData{Targets: []string{targetStr}}) + // } + // } + // } + // if needsAdd { + // logger.Info("Sync: adding TCP target on port %d -> %s", port, target) + // targetStr := fmt.Sprintf("%d:%s", port, target) + // updateTargets(pm, "add", wgData.TunnelIP, "tcp", TargetData{Targets: []string{targetStr}}) + // } + // } - // Sync UDP targets - // Remove UDP targets not in desired set - if udpForIP, ok := currentUDP[wgData.TunnelIP]; ok { - for port := range udpForIP { - if _, exists := desiredUDP[port]; !exists { - logger.Info("Sync: removing UDP target on port %d", port) - targetStr := fmt.Sprintf("%d:%s", port, udpForIP[port]) - updateTargets(pm, "remove", wgData.TunnelIP, "udp", TargetData{Targets: []string{targetStr}}) - } - } - } + // // Sync UDP targets + // // Remove UDP targets not in desired set + // if udpForIP, ok := currentUDP[wgData.TunnelIP]; ok { + // for port := range udpForIP { + // if _, exists := desiredUDP[port]; !exists { + // logger.Info("Sync: removing UDP target on port %d", port) + // targetStr := fmt.Sprintf("%d:%s", port, udpForIP[port]) + // updateTargets(pm, "remove", wgData.TunnelIP, "udp", TargetData{Targets: []string{targetStr}}) + // } + // } + // } - // Add UDP targets that are missing - for port, target := range desiredUDP { - needsAdd := true - if udpForIP, ok := currentUDP[wgData.TunnelIP]; ok { - if currentTarget, exists := udpForIP[port]; exists { - // Check if target address changed - if currentTarget == target { - needsAdd = false - } else { - // Target changed, remove old one first - logger.Info("Sync: updating UDP target on port %d", port) - targetStr := fmt.Sprintf("%d:%s", port, currentTarget) - updateTargets(pm, "remove", wgData.TunnelIP, "udp", TargetData{Targets: []string{targetStr}}) - } - } - } - if needsAdd { - logger.Info("Sync: adding UDP target on port %d -> %s", port, target) - targetStr := fmt.Sprintf("%d:%s", port, target) - updateTargets(pm, "add", wgData.TunnelIP, "udp", TargetData{Targets: []string{targetStr}}) - } - } + // // Add UDP targets that are missing + // for port, target := range desiredUDP { + // needsAdd := true + // if udpForIP, ok := currentUDP[wgData.TunnelIP]; ok { + // if currentTarget, exists := udpForIP[port]; exists { + // // Check if target address changed + // if currentTarget == target { + // needsAdd = false + // } else { + // // Target changed, remove old one first + // logger.Info("Sync: updating UDP target on port %d", port) + // targetStr := fmt.Sprintf("%d:%s", port, currentTarget) + // updateTargets(pm, "remove", wgData.TunnelIP, "udp", TargetData{Targets: []string{targetStr}}) + // } + // } + // } + // if needsAdd { + // logger.Info("Sync: adding UDP target on port %d -> %s", port, target) + // targetStr := fmt.Sprintf("%d:%s", port, target) + // updateTargets(pm, "add", wgData.TunnelIP, "udp", TargetData{Targets: []string{targetStr}}) + // } + // } - // Sync health check targets - if err := healthMonitor.SyncTargets(syncData.HealthCheckTargets); err != nil { - logger.Error("Failed to sync health check targets: %v", err) - } else { - logger.Info("Successfully synced health check targets") - } + // // Sync health check targets + // if err := healthMonitor.SyncTargets(syncData.HealthCheckTargets); err != nil { + // logger.Error("Failed to sync health check targets: %v", err) + // } else { + // logger.Info("Successfully synced health check targets") + // } logger.Info("Sync complete") }) From fac0f5b1978814bd98b2f8cf994efc79857c27fb Mon Sep 17 00:00:00 2001 From: Owen Date: Sat, 7 Mar 2026 10:17:14 -0800 Subject: [PATCH 07/28] Build full arn --- .github/workflows/cicd.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 815c400..4df4ba8 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -70,7 +70,7 @@ jobs: - name: Configure AWS credentials (OIDC) uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7 # v6.0.0 with: - role-to-assume: ${{ secrets.AWS_ROLE_ARN }} + role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/${{ secrets.AWS_ROLE_NAME }} role-duration-seconds: 3600 aws-region: ${{ secrets.AWS_REGION }} From 1bd1133ac2bbf54714de20baec5f4164d444e68a Mon Sep 17 00:00:00 2001 From: Owen Date: Sat, 7 Mar 2026 10:36:18 -0800 Subject: [PATCH 08/28] Make sure to skip prepare --- .github/workflows/cicd.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 4df4ba8..4d55ed4 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -136,7 +136,7 @@ jobs: build-amd: name: Build image (linux/amd64) needs: [pre-run, prepare] - if: ${{ needs.pre-run.result == 'success' && ((github.event_name == 'push' && github.actor != 'github-actions[bot]') || (github.event_name == 'workflow_dispatch' && needs.prepare.result == 'success')) }} + if: ${{ needs.pre-run.result == 'success' && ((github.event_name == 'push' && github.actor != 'github-actions[bot]') || (github.event_name == 'workflow_dispatch' && (needs.prepare.result == 'success' || needs.prepare.result == 'skipped'))) }} runs-on: [self-hosted, linux, x64] timeout-minutes: 120 env: @@ -293,7 +293,7 @@ jobs: build-arm: name: Build image (linux/arm64) needs: [pre-run, prepare] - if: ${{ needs.pre-run.result == 'success' && ((github.event_name == 'push' && github.actor != 'github-actions[bot]') || (github.event_name == 'workflow_dispatch' && needs.prepare.result == 'success')) }} + if: ${{ needs.pre-run.result == 'success' && ((github.event_name == 'push' && github.actor != 'github-actions[bot]') || (github.event_name == 'workflow_dispatch' && (needs.prepare.result == 'success' || needs.prepare.result == 'skipped'))) }} runs-on: [self-hosted, linux, arm64] # NOTE: ensure label exists on runner timeout-minutes: 120 env: @@ -417,7 +417,7 @@ jobs: build-armv7: name: Build image (linux/arm/v7) needs: [pre-run, prepare] - if: ${{ needs.pre-run.result == 'success' && ((github.event_name == 'push' && github.actor != 'github-actions[bot]') || (github.event_name == 'workflow_dispatch' && needs.prepare.result == 'success')) }} + if: ${{ needs.pre-run.result == 'success' && ((github.event_name == 'push' && github.actor != 'github-actions[bot]') || (github.event_name == 'workflow_dispatch' && (needs.prepare.result == 'success' || needs.prepare.result == 'skipped'))) }} runs-on: [self-hosted, linux, arm64] timeout-minutes: 120 env: @@ -919,7 +919,7 @@ jobs: - name: Configure AWS credentials (OIDC) uses: aws-actions/configure-aws-credentials@8df5847569e6427dd6c4fb1cf565c83acfa8afa7 # v6.0.0 with: - role-to-assume: ${{ secrets.AWS_ROLE_ARN }} + role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/${{ secrets.AWS_ROLE_NAME }} role-duration-seconds: 3600 aws-region: ${{ secrets.AWS_REGION }} From afdb1fc9772f376aefa4db05ed14faf083813176 Mon Sep 17 00:00:00 2001 From: Owen Date: Sat, 7 Mar 2026 12:32:49 -0800 Subject: [PATCH 09/28] Make sure to set version and fix prepare issue --- .github/workflows/cicd.yml | 8 ++++---- Makefile | 23 +++++++++++++---------- 2 files changed, 17 insertions(+), 14 deletions(-) diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 4d55ed4..d0af856 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -136,7 +136,7 @@ jobs: build-amd: name: Build image (linux/amd64) needs: [pre-run, prepare] - if: ${{ needs.pre-run.result == 'success' && ((github.event_name == 'push' && github.actor != 'github-actions[bot]') || (github.event_name == 'workflow_dispatch' && (needs.prepare.result == 'success' || needs.prepare.result == 'skipped'))) }} + if: ${{ needs.pre-run.result == 'success' && ((github.event_name == 'push' && github.actor != 'github-actions[bot]' && needs.prepare.result == 'skipped') || (github.event_name == 'workflow_dispatch' && (needs.prepare.result == 'success' || needs.prepare.result == 'skipped'))) }} runs-on: [self-hosted, linux, x64] timeout-minutes: 120 env: @@ -293,7 +293,7 @@ jobs: build-arm: name: Build image (linux/arm64) needs: [pre-run, prepare] - if: ${{ needs.pre-run.result == 'success' && ((github.event_name == 'push' && github.actor != 'github-actions[bot]') || (github.event_name == 'workflow_dispatch' && (needs.prepare.result == 'success' || needs.prepare.result == 'skipped'))) }} + if: ${{ needs.pre-run.result == 'success' && ((github.event_name == 'push' && github.actor != 'github-actions[bot]' && needs.prepare.result == 'skipped') || (github.event_name == 'workflow_dispatch' && (needs.prepare.result == 'success' || needs.prepare.result == 'skipped'))) }} runs-on: [self-hosted, linux, arm64] # NOTE: ensure label exists on runner timeout-minutes: 120 env: @@ -417,7 +417,7 @@ jobs: build-armv7: name: Build image (linux/arm/v7) needs: [pre-run, prepare] - if: ${{ needs.pre-run.result == 'success' && ((github.event_name == 'push' && github.actor != 'github-actions[bot]') || (github.event_name == 'workflow_dispatch' && (needs.prepare.result == 'success' || needs.prepare.result == 'skipped'))) }} + if: ${{ needs.pre-run.result == 'success' && ((github.event_name == 'push' && github.actor != 'github-actions[bot]' && needs.prepare.result == 'skipped') || (github.event_name == 'workflow_dispatch' && (needs.prepare.result == 'success' || needs.prepare.result == 'skipped'))) }} runs-on: [self-hosted, linux, arm64] timeout-minutes: 120 env: @@ -887,7 +887,7 @@ jobs: shell: bash run: | set -euo pipefail - make -j 10 go-build-release tag="${TAG}" + make -j 10 go-build-release VERSION="${TAG}" - name: Create GitHub Release (draft) uses: softprops/action-gh-release@5be0e66d93ac7ed76da52eca8bb058f665c3a5fe # v2.4.2 diff --git a/Makefile b/Makefile index e720189..c35bbbf 100644 --- a/Makefile +++ b/Makefile @@ -2,6 +2,9 @@ all: local +VERSION ?= dev +LDFLAGS = -X main.newtVersion=$(VERSION) + local: CGO_ENABLED=0 go build -o ./bin/newt @@ -40,31 +43,31 @@ go-build-release: \ go-build-release-freebsd-arm64 go-build-release-linux-arm64: - CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -o bin/newt_linux_arm64 + CGO_ENABLED=0 GOOS=linux GOARCH=arm64 go build -ldflags "$(LDFLAGS)" -o bin/newt_linux_arm64 go-build-release-linux-arm32-v7: - CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=7 go build -o bin/newt_linux_arm32 + CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=7 go build -ldflags "$(LDFLAGS)" -o bin/newt_linux_arm32 go-build-release-linux-arm32-v6: - CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=6 go build -o bin/newt_linux_arm32v6 + CGO_ENABLED=0 GOOS=linux GOARCH=arm GOARM=6 go build -ldflags "$(LDFLAGS)" -o bin/newt_linux_arm32v6 go-build-release-linux-amd64: - CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o bin/newt_linux_amd64 + CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o bin/newt_linux_amd64 go-build-release-linux-riscv64: - CGO_ENABLED=0 GOOS=linux GOARCH=riscv64 go build -o bin/newt_linux_riscv64 + CGO_ENABLED=0 GOOS=linux GOARCH=riscv64 go build -ldflags "$(LDFLAGS)" -o bin/newt_linux_riscv64 go-build-release-darwin-arm64: - CGO_ENABLED=0 GOOS=darwin GOARCH=arm64 go build -o bin/newt_darwin_arm64 + CGO_ENABLED=0 GOOS=darwin GOARCH=arm64 go build -ldflags "$(LDFLAGS)" -o bin/newt_darwin_arm64 go-build-release-darwin-amd64: - CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -o bin/newt_darwin_amd64 + CGO_ENABLED=0 GOOS=darwin GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o bin/newt_darwin_amd64 go-build-release-windows-amd64: - CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -o bin/newt_windows_amd64.exe + CGO_ENABLED=0 GOOS=windows GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o bin/newt_windows_amd64.exe go-build-release-freebsd-amd64: - CGO_ENABLED=0 GOOS=freebsd GOARCH=amd64 go build -o bin/newt_freebsd_amd64 + CGO_ENABLED=0 GOOS=freebsd GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o bin/newt_freebsd_amd64 go-build-release-freebsd-arm64: - CGO_ENABLED=0 GOOS=freebsd GOARCH=arm64 go build -o bin/newt_freebsd_arm64 + CGO_ENABLED=0 GOOS=freebsd GOARCH=arm64 go build -ldflags "$(LDFLAGS)" -o bin/newt_freebsd_arm64 \ No newline at end of file From 768415f90be5b8516eae674a325c8adaebdbfa85 Mon Sep 17 00:00:00 2001 From: Laurence Date: Thu, 5 Mar 2026 15:12:47 +0000 Subject: [PATCH 10/28] Parse target strings with IPv6 support and strict validation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add parseTargetString() for listenPort:host:targetPort using net.SplitHostPort/JoinHostPort. Replace manual split in updateTargets; fix err shadowing on remove. Validate listen port 1–65535 and reject empty host/port; use %w for errors. Add tests for IPv4, IPv6, hostnames, and invalid cases. --- common.go | 66 +++++++++++---- common_test.go | 212 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 262 insertions(+), 16 deletions(-) create mode 100644 common_test.go diff --git a/common.go b/common.go index 5fe0645..4701411 100644 --- a/common.go +++ b/common.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "net" "os" "os/exec" "strings" @@ -363,27 +364,62 @@ func parseTargetData(data interface{}) (TargetData, error) { return targetData, nil } +// parseTargetString parses a target string in the format "listenPort:host:targetPort" +// It properly handles IPv6 addresses which must be in brackets: "listenPort:[ipv6]:targetPort" +// Examples: +// - IPv4: "3001:192.168.1.1:80" +// - IPv6: "3001:[::1]:8080" or "3001:[fd70:1452:b736:4dd5:caca:7db9:c588:f5b3]:80" +// +// Returns listenPort, targetAddress (in host:port format suitable for net.Dial), and error +func parseTargetString(target string) (int, string, error) { + // Find the first colon to extract the listen port + firstColon := strings.Index(target, ":") + if firstColon == -1 { + return 0, "", fmt.Errorf("invalid target format, no colon found: %s", target) + } + + listenPortStr := target[:firstColon] + var listenPort int + _, err := fmt.Sscanf(listenPortStr, "%d", &listenPort) + if err != nil { + return 0, "", fmt.Errorf("invalid listen port: %s", listenPortStr) + } + if listenPort <= 0 || listenPort > 65535 { + return 0, "", fmt.Errorf("listen port out of range: %d", listenPort) + } + + // The remainder is host:targetPort - use net.SplitHostPort which handles IPv6 brackets + remainder := target[firstColon+1:] + host, targetPort, err := net.SplitHostPort(remainder) + if err != nil { + return 0, "", fmt.Errorf("invalid host:port format '%s': %w", remainder, err) + } + + // Reject empty host or target port + if host == "" { + return 0, "", fmt.Errorf("empty host in target: %s", target) + } + if targetPort == "" { + return 0, "", fmt.Errorf("empty target port in target: %s", target) + } + + // Reconstruct the target address using JoinHostPort (handles IPv6 properly) + targetAddr := net.JoinHostPort(host, targetPort) + + return listenPort, targetAddr, nil +} + func updateTargets(pm *proxy.ProxyManager, action string, tunnelIP string, proto string, targetData TargetData) error { for _, t := range targetData.Targets { - // Split the first number off of the target with : separator and use as the port - parts := strings.Split(t, ":") - if len(parts) != 3 { - logger.Info("Invalid target format: %s", t) - continue - } - - // Get the port as an int - port := 0 - _, err := fmt.Sscanf(parts[0], "%d", &port) + // Parse the target string, handling both IPv4 and IPv6 addresses + port, target, err := parseTargetString(t) if err != nil { - logger.Info("Invalid port: %s", parts[0]) + logger.Info("Invalid target format: %s (%v)", t, err) continue } switch action { case "add": - target := parts[1] + ":" + parts[2] - // Call updown script if provided processedTarget := target if updownScript != "" { @@ -410,8 +446,6 @@ func updateTargets(pm *proxy.ProxyManager, action string, tunnelIP string, proto case "remove": logger.Info("Removing target with port %d", port) - target := parts[1] + ":" + parts[2] - // Call updown script if provided if updownScript != "" { _, err := executeUpdownScript(action, proto, target) @@ -420,7 +454,7 @@ func updateTargets(pm *proxy.ProxyManager, action string, tunnelIP string, proto } } - err := pm.RemoveTarget(proto, tunnelIP, port) + err = pm.RemoveTarget(proto, tunnelIP, port) if err != nil { logger.Error("Failed to remove target: %v", err) return err diff --git a/common_test.go b/common_test.go new file mode 100644 index 0000000..a7e659a --- /dev/null +++ b/common_test.go @@ -0,0 +1,212 @@ +package main + +import ( + "net" + "testing" +) + +func TestParseTargetString(t *testing.T) { + tests := []struct { + name string + input string + wantListenPort int + wantTargetAddr string + wantErr bool + }{ + // IPv4 test cases + { + name: "valid IPv4 basic", + input: "3001:192.168.1.1:80", + wantListenPort: 3001, + wantTargetAddr: "192.168.1.1:80", + wantErr: false, + }, + { + name: "valid IPv4 localhost", + input: "8080:127.0.0.1:3000", + wantListenPort: 8080, + wantTargetAddr: "127.0.0.1:3000", + wantErr: false, + }, + { + name: "valid IPv4 same ports", + input: "443:10.0.0.1:443", + wantListenPort: 443, + wantTargetAddr: "10.0.0.1:443", + wantErr: false, + }, + + // IPv6 test cases + { + name: "valid IPv6 loopback", + input: "3001:[::1]:8080", + wantListenPort: 3001, + wantTargetAddr: "[::1]:8080", + wantErr: false, + }, + { + name: "valid IPv6 full address", + input: "80:[fd70:1452:b736:4dd5:caca:7db9:c588:f5b3]:8080", + wantListenPort: 80, + wantTargetAddr: "[fd70:1452:b736:4dd5:caca:7db9:c588:f5b3]:8080", + wantErr: false, + }, + { + name: "valid IPv6 link-local", + input: "443:[fe80::1]:443", + wantListenPort: 443, + wantTargetAddr: "[fe80::1]:443", + wantErr: false, + }, + { + name: "valid IPv6 all zeros compressed", + input: "8000:[::]:9000", + wantListenPort: 8000, + wantTargetAddr: "[::]:9000", + wantErr: false, + }, + { + name: "valid IPv6 mixed notation", + input: "5000:[::ffff:192.168.1.1]:6000", + wantListenPort: 5000, + wantTargetAddr: "[::ffff:192.168.1.1]:6000", + wantErr: false, + }, + + // Hostname test cases + { + name: "valid hostname", + input: "8080:example.com:80", + wantListenPort: 8080, + wantTargetAddr: "example.com:80", + wantErr: false, + }, + { + name: "valid hostname with subdomain", + input: "443:api.example.com:8443", + wantListenPort: 443, + wantTargetAddr: "api.example.com:8443", + wantErr: false, + }, + { + name: "valid localhost hostname", + input: "3000:localhost:3000", + wantListenPort: 3000, + wantTargetAddr: "localhost:3000", + wantErr: false, + }, + + // Error cases + { + name: "invalid - no colons", + input: "invalid", + wantErr: true, + }, + { + name: "invalid - empty string", + input: "", + wantErr: true, + }, + { + name: "invalid - non-numeric listen port", + input: "abc:192.168.1.1:80", + wantErr: true, + }, + { + name: "invalid - missing target port", + input: "3001:192.168.1.1", + wantErr: true, + }, + { + name: "invalid - IPv6 without brackets", + input: "3001:fd70:1452:b736:4dd5:caca:7db9:c588:f5b3:80", + wantErr: true, + }, + { + name: "invalid - only listen port", + input: "3001:", + wantErr: true, + }, + { + name: "invalid - missing host", + input: "3001::80", + wantErr: true, + }, + { + name: "invalid - IPv6 unclosed bracket", + input: "3001:[::1:80", + wantErr: true, + }, + { + name: "invalid - listen port zero", + input: "0:192.168.1.1:80", + wantErr: true, + }, + { + name: "invalid - listen port negative", + input: "-1:192.168.1.1:80", + wantErr: true, + }, + { + name: "invalid - listen port out of range", + input: "70000:192.168.1.1:80", + wantErr: true, + }, + { + name: "invalid - empty target port", + input: "3001:192.168.1.1:", + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + listenPort, targetAddr, err := parseTargetString(tt.input) + + if (err != nil) != tt.wantErr { + t.Errorf("parseTargetString(%q) error = %v, wantErr %v", tt.input, err, tt.wantErr) + return + } + + if tt.wantErr { + return // Don't check other values if we expected an error + } + + if listenPort != tt.wantListenPort { + t.Errorf("parseTargetString(%q) listenPort = %d, want %d", tt.input, listenPort, tt.wantListenPort) + } + + if targetAddr != tt.wantTargetAddr { + t.Errorf("parseTargetString(%q) targetAddr = %q, want %q", tt.input, targetAddr, tt.wantTargetAddr) + } + }) + } +} + +// TestParseTargetStringNetDialCompatibility verifies that the output is compatible with net.Dial +func TestParseTargetStringNetDialCompatibility(t *testing.T) { + tests := []struct { + name string + input string + }{ + {"IPv4", "8080:127.0.0.1:80"}, + {"IPv6 loopback", "8080:[::1]:80"}, + {"IPv6 full", "8080:[2001:db8::1]:80"}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + _, targetAddr, err := parseTargetString(tt.input) + if err != nil { + t.Fatalf("parseTargetString(%q) unexpected error: %v", tt.input, err) + } + + // Verify the format is valid for net.Dial by checking it can be split back + // This doesn't actually dial, just validates the format + _, _, err = net.SplitHostPort(targetAddr) + if err != nil { + t.Errorf("parseTargetString(%q) produced invalid net.Dial format %q: %v", tt.input, targetAddr, err) + } + }) + } +} From accac75a5320c9c03c0143fc136032ae3c63a973 Mon Sep 17 00:00:00 2001 From: Owen Date: Sun, 8 Mar 2026 11:26:22 -0700 Subject: [PATCH 11/28] Set newt version in dockerfile --- .github/workflows/cicd.yml | 3 +++ Dockerfile | 3 ++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index d0af856..3082333 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -269,6 +269,7 @@ jobs: context: . push: true platforms: linux/amd64 + build-args: VERSION=${{ env.TAG }} tags: | ${{ env.GHCR_IMAGE }}:amd64-${{ env.TAG }} ${{ env.DOCKERHUB_IMAGE }}:amd64-${{ env.TAG }} @@ -393,6 +394,7 @@ jobs: context: . push: true platforms: linux/arm64 + build-args: VERSION=${{ env.TAG }} tags: | ${{ env.GHCR_IMAGE }}:arm64-${{ env.TAG }} ${{ env.DOCKERHUB_IMAGE }}:arm64-${{ env.TAG }} @@ -509,6 +511,7 @@ jobs: context: . push: true platforms: linux/arm/v7 + build-args: VERSION=${{ env.TAG }} tags: | ${{ env.GHCR_IMAGE }}:armv7-${{ env.TAG }} ${{ env.DOCKERHUB_IMAGE }}:armv7-${{ env.TAG }} diff --git a/Dockerfile b/Dockerfile index 25113a9..ea870c2 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,7 +17,8 @@ RUN go mod download COPY . . # Build the application -RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w" -o /newt +ARG VERSION=dev +RUN CGO_ENABLED=0 GOOS=linux go build -ldflags="-s -w -X main.newtVersion=${VERSION}" -o /newt FROM public.ecr.aws/docker/library/alpine:3.23 AS runner From bc44ca1aba7188a8d59a61e014565221b7fba2bc Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 9 Mar 2026 10:28:02 +0000 Subject: [PATCH 12/28] chore(deps): bump docker/build-push-action from 6.19.2 to 7.0.0 Bumps [docker/build-push-action](https://github.com/docker/build-push-action) from 6.19.2 to 7.0.0. - [Release notes](https://github.com/docker/build-push-action/releases) - [Commits](https://github.com/docker/build-push-action/compare/10e90e3645eae34f1e60eeb005ba3a3d33f178e8...d08e5c354a6adb9ed34480a06d141179aa583294) --- updated-dependencies: - dependency-name: docker/build-push-action dependency-version: 7.0.0 dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] --- .github/workflows/cicd.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/cicd.yml b/.github/workflows/cicd.yml index 3082333..3f3eeb7 100644 --- a/.github/workflows/cicd.yml +++ b/.github/workflows/cicd.yml @@ -264,7 +264,7 @@ jobs: # Build ONLY amd64 and push arch-specific tag suffixes used later for manifest creation. - name: Build and push (amd64 -> *:amd64-TAG) id: build_amd - uses: docker/build-push-action@10e90e3645eae34f1e60eeb005ba3a3d33f178e8 # v6.19.2 + uses: docker/build-push-action@d08e5c354a6adb9ed34480a06d141179aa583294 # v7.0.0 with: context: . push: true @@ -389,7 +389,7 @@ jobs: # Build ONLY arm64 and push arch-specific tag suffixes used later for manifest creation. - name: Build and push (arm64 -> *:arm64-TAG) id: build_arm - uses: docker/build-push-action@10e90e3645eae34f1e60eeb005ba3a3d33f178e8 # v6.19.2 + uses: docker/build-push-action@d08e5c354a6adb9ed34480a06d141179aa583294 # v7.0.0 with: context: . push: true @@ -506,7 +506,7 @@ jobs: - name: Build and push (arm/v7 -> *:armv7-TAG) id: build_armv7 - uses: docker/build-push-action@10e90e3645eae34f1e60eeb005ba3a3d33f178e8 # v6.19.2 + uses: docker/build-push-action@d08e5c354a6adb9ed34480a06d141179aa583294 # v7.0.0 with: context: . push: true From d7741df514b0280b2885f1a3ee7d491c9a91bfb2 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" Date: Mon, 9 Mar 2026 10:29:50 +0000 Subject: [PATCH 13/28] chore(nix): fix hash for updated go dependencies --- flake.nix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flake.nix b/flake.nix index 6c071ad..8fed719 100644 --- a/flake.nix +++ b/flake.nix @@ -35,7 +35,7 @@ inherit version; src = pkgs.nix-gitignore.gitignoreSource [ ] ./.; - vendorHash = "sha256-kmQM8Yy5TuOiNpMpUme/2gfE+vrhUK+0AphN+p71wGs="; + vendorHash = "sha256-vy6Dqjek7pLdASbCrM9snq5Dt9lbwNJ0AuQboy1JWNQ="; nativeInstallCheckInputs = [ pkgs.versionCheckHook ]; From d68a13ea1fe50a4f36ac74f52cf093999c1aaf5f Mon Sep 17 00:00:00 2001 From: Laurence Date: Tue, 10 Mar 2026 13:53:39 +0000 Subject: [PATCH 14/28] feat(installer): prefer /usr/local/bin and improve POSIX compatibility - Always install to /usr/local/bin instead of ~/.local/bin - Use sudo automatically when write access is needed - Replace bash-specific syntax with POSIX equivalents: - Change shebang from #!/bin/bash to #!/bin/sh - Replace [[ == *pattern* ]] with case statements - Replace echo -e with printf for colored output - Script now works with dash, ash, busybox sh, and bash --- get-newt.sh | 119 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 73 insertions(+), 46 deletions(-) diff --git a/get-newt.sh b/get-newt.sh index d57f69a..d4ddd3f 100644 --- a/get-newt.sh +++ b/get-newt.sh @@ -1,7 +1,7 @@ -#!/bin/bash +#!/bin/sh # Get Newt - Cross-platform installation script -# Usage: curl -fsSL https://raw.githubusercontent.com/fosrl/newt/refs/heads/main/get-newt.sh | bash +# Usage: curl -fsSL https://raw.githubusercontent.com/fosrl/newt/refs/heads/main/get-newt.sh | sh set -e @@ -17,15 +17,15 @@ GITHUB_API_URL="https://api.github.com/repos/${REPO}/releases/latest" # Function to print colored output print_status() { - echo -e "${GREEN}[INFO]${NC} $1" + printf '%b[INFO]%b %s\n' "${GREEN}" "${NC}" "$1" } print_warning() { - echo -e "${YELLOW}[WARN]${NC} $1" + printf '%b[WARN]%b %s\n' "${YELLOW}" "${NC}" "$1" } print_error() { - echo -e "${RED}[ERROR]${NC} $1" + printf '%b[ERROR]%b %s\n' "${RED}" "${NC}" "$1" } # Function to get latest version from GitHub API @@ -113,16 +113,34 @@ get_install_dir() { if [ "$OS" = "windows" ]; then echo "$HOME/bin" else - # Try to use a directory in PATH, fallback to ~/.local/bin - if echo "$PATH" | grep -q "/usr/local/bin"; then - if [ -w "/usr/local/bin" ] 2>/dev/null; then - echo "/usr/local/bin" - else - echo "$HOME/.local/bin" - fi + # Prefer /usr/local/bin for system-wide installation + echo "/usr/local/bin" + fi +} + +# Check if we need sudo for installation +needs_sudo() { + local install_dir="$1" + if [ -w "$install_dir" ] 2>/dev/null; then + return 1 # No sudo needed + else + return 0 # Sudo needed + fi +} + +# Get the appropriate command prefix (sudo or empty) +get_sudo_cmd() { + local install_dir="$1" + if needs_sudo "$install_dir"; then + if command -v sudo >/dev/null 2>&1; then + echo "sudo" else - echo "$HOME/.local/bin" + print_error "Cannot write to ${install_dir} and sudo is not available." + print_error "Please run this script as root or install sudo." + exit 1 fi + else + echo "" fi } @@ -130,21 +148,24 @@ get_install_dir() { install_newt() { local platform="$1" local install_dir="$2" + local sudo_cmd="$3" local binary_name="newt_${platform}" local exe_suffix="" - + # Add .exe suffix for Windows - if [[ "$platform" == *"windows"* ]]; then - binary_name="${binary_name}.exe" - exe_suffix=".exe" - fi - + case "$platform" in + *windows*) + binary_name="${binary_name}.exe" + exe_suffix=".exe" + ;; + esac + local download_url="${BASE_URL}/${binary_name}" local temp_file="/tmp/newt${exe_suffix}" local final_path="${install_dir}/newt${exe_suffix}" - + print_status "Downloading newt from ${download_url}" - + # Download the binary if command -v curl >/dev/null 2>&1; then curl -fsSL "$download_url" -o "$temp_file" @@ -154,18 +175,22 @@ install_newt() { print_error "Neither curl nor wget is available. Please install one of them." exit 1 fi - + + # Make executable before moving + chmod +x "$temp_file" + # Create install directory if it doesn't exist - mkdir -p "$install_dir" - - # Move binary to install directory - mv "$temp_file" "$final_path" - - # Make executable (not needed on Windows, but doesn't hurt) - chmod +x "$final_path" - + if [ -n "$sudo_cmd" ]; then + $sudo_cmd mkdir -p "$install_dir" + print_status "Using sudo to install to ${install_dir}" + $sudo_cmd mv "$temp_file" "$final_path" + else + mkdir -p "$install_dir" + mv "$temp_file" "$final_path" + fi + print_status "newt installed to ${final_path}" - + # Check if install directory is in PATH if ! echo "$PATH" | grep -q "$install_dir"; then print_warning "Install directory ${install_dir} is not in your PATH." @@ -179,9 +204,9 @@ verify_installation() { local install_dir="$1" local exe_suffix="" - if [[ "$PLATFORM" == *"windows"* ]]; then - exe_suffix=".exe" - fi + case "$PLATFORM" in + *windows*) exe_suffix=".exe" ;; + esac local newt_path="${install_dir}/newt${exe_suffix}" @@ -198,34 +223,36 @@ verify_installation() { # Main installation process main() { print_status "Installing latest version of newt..." - + # Get latest version print_status "Fetching latest version from GitHub..." VERSION=$(get_latest_version) print_status "Latest version: v${VERSION}" - + # Set base URL with the fetched version BASE_URL="https://github.com/${REPO}/releases/download/${VERSION}" - + # Detect platform PLATFORM=$(detect_platform) print_status "Detected platform: ${PLATFORM}" - + # Get install directory INSTALL_DIR=$(get_install_dir) print_status "Install directory: ${INSTALL_DIR}" - + + # Check if we need sudo + SUDO_CMD=$(get_sudo_cmd "$INSTALL_DIR") + if [ -n "$SUDO_CMD" ]; then + print_status "Root privileges required for installation to ${INSTALL_DIR}" + fi + # Install newt - install_newt "$PLATFORM" "$INSTALL_DIR" - + install_newt "$PLATFORM" "$INSTALL_DIR" "$SUDO_CMD" + # Verify installation if verify_installation "$INSTALL_DIR"; then print_status "newt is ready to use!" - if [[ "$PLATFORM" == *"windows"* ]]; then - print_status "Run 'newt --help' to get started" - else - print_status "Run 'newt --help' to get started" - fi + print_status "Run 'newt --help' to get started" else exit 1 fi From 836144aebf9336143dfcb29bb1656df03385520f Mon Sep 17 00:00:00 2001 From: Laurence Date: Thu, 12 Mar 2026 09:22:50 +0000 Subject: [PATCH 15/28] feat(admin): Add pprof endpoints To aid us in debugging user issues with memory or leaks we need to be able for the user to configure pprof, wait and then provide us the output files to see where memory/leaks occur in actual runtimes --- main.go | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/main.go b/main.go index 9c373b0..9637377 100644 --- a/main.go +++ b/main.go @@ -10,6 +10,7 @@ import ( "fmt" "net" "net/http" + "net/http/pprof" "net/netip" "os" "os/signal" @@ -147,6 +148,7 @@ var ( adminAddr string region string metricsAsyncBytes bool + pprofEnabled bool blueprintFile string noCloud bool @@ -225,6 +227,7 @@ func runNewtMain(ctx context.Context) { adminAddrEnv := os.Getenv("NEWT_ADMIN_ADDR") regionEnv := os.Getenv("NEWT_REGION") asyncBytesEnv := os.Getenv("NEWT_METRICS_ASYNC_BYTES") + pprofEnabledEnv := os.Getenv("NEWT_PPROF_ENABLED") disableClientsEnv := os.Getenv("DISABLE_CLIENTS") disableClients = disableClientsEnv == "true" @@ -390,6 +393,14 @@ func runNewtMain(ctx context.Context) { metricsAsyncBytes = v } } + // pprof debug endpoint toggle + if pprofEnabledEnv == "" { + flag.BoolVar(&pprofEnabled, "pprof", false, "Enable pprof debug endpoints on admin server") + } else { + if v, err := strconv.ParseBool(pprofEnabledEnv); err == nil { + pprofEnabled = v + } + } // Optional region flag (resource attribute) if regionEnv == "" { flag.StringVar(®ion, "region", "", "Optional region resource attribute (also NEWT_REGION)") @@ -485,6 +496,14 @@ func runNewtMain(ctx context.Context) { if tel.PrometheusHandler != nil { mux.Handle("/metrics", tel.PrometheusHandler) } + if pprofEnabled { + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + logger.Info("pprof debugging enabled on %s/debug/pprof/", tcfg.AdminAddr) + } admin := &http.Server{ Addr: tcfg.AdminAddr, Handler: otelhttp.NewHandler(mux, "newt-admin"), From ef03b4566dbb923ad919569a43b3c8047cd8298a Mon Sep 17 00:00:00 2001 From: Owen Date: Wed, 11 Mar 2026 16:47:13 -0700 Subject: [PATCH 16/28] Allow passing public dns into resolve --- clients/clients.go | 5 +-- holepunch/holepunch.go | 40 ++++++++++++------ holepunch/tester.go | 20 ++++++--- util/util.go | 94 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 137 insertions(+), 22 deletions(-) diff --git a/clients/clients.go b/clients/clients.go index 05ed3cf..3e062b3 100644 --- a/clients/clients.go +++ b/clients/clients.go @@ -162,9 +162,8 @@ func NewWireGuardService(interfaceName string, port uint16, mtu int, host string useNativeInterface: useNativeInterface, } - // Create the holepunch manager with ResolveDomain function - // We'll need to pass a domain resolver function - service.holePunchManager = holepunch.NewManager(sharedBind, newtId, "newt", key.PublicKey().String()) + // Create the holepunch manager + service.holePunchManager = holepunch.NewManager(sharedBind, newtId, "newt", key.PublicKey().String(), nil) // Register websocket handlers wsClient.RegisterHandler("newt/wg/receive-config", service.handleConfig) diff --git a/holepunch/holepunch.go b/holepunch/holepunch.go index 85679a9..8000837 100644 --- a/holepunch/holepunch.go +++ b/holepunch/holepunch.go @@ -27,16 +27,17 @@ type ExitNode struct { // Manager handles UDP hole punching operations type Manager struct { - mu sync.Mutex - running bool - stopChan chan struct{} - sharedBind *bind.SharedBind - ID string - token string - publicKey string - clientType string - exitNodes map[string]ExitNode // key is endpoint - updateChan chan struct{} // signals the goroutine to refresh exit nodes + mu sync.Mutex + running bool + stopChan chan struct{} + sharedBind *bind.SharedBind + ID string + token string + publicKey string + clientType string + exitNodes map[string]ExitNode // key is endpoint + updateChan chan struct{} // signals the goroutine to refresh exit nodes + publicDNS []string sendHolepunchInterval time.Duration sendHolepunchIntervalMin time.Duration @@ -49,12 +50,13 @@ const defaultSendHolepunchIntervalMax = 60 * time.Second const defaultSendHolepunchIntervalMin = 1 * time.Second // NewManager creates a new hole punch manager -func NewManager(sharedBind *bind.SharedBind, ID string, clientType string, publicKey string) *Manager { +func NewManager(sharedBind *bind.SharedBind, ID string, clientType string, publicKey string, publicDNS []string) *Manager { return &Manager{ sharedBind: sharedBind, ID: ID, clientType: clientType, publicKey: publicKey, + publicDNS: publicDNS, exitNodes: make(map[string]ExitNode), sendHolepunchInterval: defaultSendHolepunchIntervalMin, sendHolepunchIntervalMin: defaultSendHolepunchIntervalMin, @@ -281,7 +283,13 @@ func (m *Manager) TriggerHolePunch() error { // Send hole punch to all exit nodes successCount := 0 for _, exitNode := range currentExitNodes { - host, err := util.ResolveDomain(exitNode.Endpoint) + var host string + var err error + if len(m.publicDNS) > 0 { + host, err = util.ResolveDomainUpstream(exitNode.Endpoint, m.publicDNS) + } else { + host, err = util.ResolveDomain(exitNode.Endpoint) + } if err != nil { logger.Warn("Failed to resolve endpoint %s: %v", exitNode.Endpoint, err) continue @@ -392,7 +400,13 @@ func (m *Manager) runMultipleExitNodes() { var resolvedNodes []resolvedExitNode for _, exitNode := range currentExitNodes { - host, err := util.ResolveDomain(exitNode.Endpoint) + var host string + var err error + if len(m.publicDNS) > 0 { + host, err = util.ResolveDomainUpstream(exitNode.Endpoint, m.publicDNS) + } else { + host, err = util.ResolveDomain(exitNode.Endpoint) + } if err != nil { logger.Warn("Failed to resolve endpoint %s: %v", exitNode.Endpoint, err) continue diff --git a/holepunch/tester.go b/holepunch/tester.go index 9fb83df..85b8c89 100644 --- a/holepunch/tester.go +++ b/holepunch/tester.go @@ -49,10 +49,11 @@ type cachedAddr struct { // HolepunchTester monitors holepunch connectivity using magic packets type HolepunchTester struct { - sharedBind *bind.SharedBind - mu sync.RWMutex - running bool - stopChan chan struct{} + sharedBind *bind.SharedBind + publicDNS []string + mu sync.RWMutex + running bool + stopChan chan struct{} // Pending requests waiting for responses (key: echo data as string) pendingRequests sync.Map // map[string]*pendingRequest @@ -84,9 +85,10 @@ type pendingRequest struct { } // NewHolepunchTester creates a new holepunch tester using the given SharedBind -func NewHolepunchTester(sharedBind *bind.SharedBind) *HolepunchTester { +func NewHolepunchTester(sharedBind *bind.SharedBind, publicDNS []string) *HolepunchTester { return &HolepunchTester{ sharedBind: sharedBind, + publicDNS: publicDNS, addrCache: make(map[string]*cachedAddr), addrCacheTTL: 5 * time.Minute, // Cache addresses for 5 minutes } @@ -169,7 +171,13 @@ func (t *HolepunchTester) resolveEndpoint(endpoint string) (*net.UDPAddr, error) } // Resolve the endpoint - host, err := util.ResolveDomain(endpoint) + var host string + var err error + if len(t.publicDNS) > 0 { + host, err = util.ResolveDomainUpstream(endpoint, t.publicDNS) + } else { + host, err = util.ResolveDomain(endpoint) + } if err != nil { host = endpoint } diff --git a/util/util.go b/util/util.go index 58221c4..0ce5dee 100644 --- a/util/util.go +++ b/util/util.go @@ -1,6 +1,7 @@ package util import ( + "context" "encoding/base64" "encoding/binary" "encoding/hex" @@ -14,6 +15,99 @@ import ( "golang.zx2c4.com/wireguard/device" ) +func ResolveDomainUpstream(domain string, publicDNS []string) (string, error) { + // trim whitespace + domain = strings.TrimSpace(domain) + + // Remove any protocol prefix if present (do this first, before splitting host/port) + domain = strings.TrimPrefix(domain, "http://") + domain = strings.TrimPrefix(domain, "https://") + + // if there are any trailing slashes, remove them + domain = strings.TrimSuffix(domain, "/") + + // Check if there's a port in the domain + host, port, err := net.SplitHostPort(domain) + if err != nil { + // No port found, use the domain as is + host = domain + port = "" + } + + // Check if host is already an IP address (IPv4 or IPv6) + // For IPv6, the host from SplitHostPort will already have brackets stripped + // but if there was no port, we need to handle bracketed IPv6 addresses + cleanHost := strings.TrimPrefix(strings.TrimSuffix(host, "]"), "[") + if ip := net.ParseIP(cleanHost); ip != nil { + // It's already an IP address, no need to resolve + ipAddr := ip.String() + if port != "" { + return net.JoinHostPort(ipAddr, port), nil + } + return ipAddr, nil + } + + // Lookup IP addresses using the upstream DNS servers if provided + var ips []net.IP + if len(publicDNS) > 0 { + var lastErr error + for _, server := range publicDNS { + // Ensure the upstream DNS address has a port + dnsAddr := server + if _, _, err := net.SplitHostPort(dnsAddr); err != nil { + // No port specified, default to 53 + dnsAddr = net.JoinHostPort(server, "53") + } + + resolver := &net.Resolver{ + PreferGo: true, + Dial: func(ctx context.Context, network, address string) (net.Conn, error) { + d := net.Dialer{} + return d.DialContext(ctx, "udp", dnsAddr) + }, + } + ips, lastErr = resolver.LookupIP(context.Background(), "ip", host) + if lastErr == nil { + break + } + } + if lastErr != nil { + return "", fmt.Errorf("DNS lookup failed using all upstream servers: %v", lastErr) + } + } else { + ips, err = net.LookupIP(host) + if err != nil { + return "", fmt.Errorf("DNS lookup failed: %v", err) + } + } + + if len(ips) == 0 { + return "", fmt.Errorf("no IP addresses found for domain %s", host) + } + + // Get the first IPv4 address if available + var ipAddr string + for _, ip := range ips { + if ipv4 := ip.To4(); ipv4 != nil { + ipAddr = ipv4.String() + break + } + } + + // If no IPv4 found, use the first IP (might be IPv6) + if ipAddr == "" { + ipAddr = ips[0].String() + } + + // Add port back if it existed + if port != "" { + ipAddr = net.JoinHostPort(ipAddr, port) + } + + return ipAddr, nil +} + + func ResolveDomain(domain string) (string, error) { // trim whitespace domain = strings.TrimSpace(domain) From 539e595c4821107b26b8e2ad390b547076bc81df Mon Sep 17 00:00:00 2001 From: Owen Date: Thu, 12 Mar 2026 17:49:05 -0700 Subject: [PATCH 17/28] Add optional compression --- websocket/client.go | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/websocket/client.go b/websocket/client.go index da1fa88..c4daf5f 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -2,6 +2,7 @@ package websocket import ( "bytes" + "compress/gzip" "crypto/tls" "crypto/x509" "encoding/json" @@ -709,10 +710,13 @@ func (c *Client) readPumpWithDisconnectDetection(started time.Time) { disconnectResult = "success" return default: - var msg WSMessage - err := c.conn.ReadJSON(&msg) + msgType, p, err := c.conn.ReadMessage() if err == nil { - telemetry.IncWSMessage(c.metricsContext(), "in", "text") + if msgType == websocket.BinaryMessage { + telemetry.IncWSMessage(c.metricsContext(), "in", "binary") + } else { + telemetry.IncWSMessage(c.metricsContext(), "in", "text") + } } if err != nil { // Check if we're shutting down before logging error @@ -737,6 +741,29 @@ func (c *Client) readPumpWithDisconnectDetection(started time.Time) { } } + var data []byte + if msgType == websocket.BinaryMessage { + gr, err := gzip.NewReader(bytes.NewReader(p)) + if err != nil { + logger.Error("WebSocket failed to create gzip reader: %v", err) + continue + } + data, err = io.ReadAll(gr) + gr.Close() + if err != nil { + logger.Error("WebSocket failed to decompress message: %v", err) + continue + } + } else { + data = p + } + + var msg WSMessage + if err = json.Unmarshal(data, &msg); err != nil { + logger.Error("WebSocket failed to parse message: %v", err) + continue + } + c.handlersMux.RLock() if handler, ok := c.handlers[msg.Type]; ok { handler(msg) From c7b01288e0ccd0228cb67a4dac2fb087df301b4f Mon Sep 17 00:00:00 2001 From: Owen Date: Fri, 13 Mar 2026 11:45:36 -0700 Subject: [PATCH 18/28] Clean up previous logging --- clients/clients.go | 2 -- main.go | 2 -- 2 files changed, 4 deletions(-) diff --git a/clients/clients.go b/clients/clients.go index 05ed3cf..537848d 100644 --- a/clients/clients.go +++ b/clients/clients.go @@ -112,8 +112,6 @@ func NewWireGuardService(interfaceName string, port uint16, mtu int, host string return nil, fmt.Errorf("failed to generate private key: %v", err) } - logger.Debug("+++++++++++++++++++++++++++++++= the port is %d", port) - if port == 0 { // Find an available port portRandom, err := util.FindAvailableUDPPort(49152, 65535) diff --git a/main.go b/main.go index 9c373b0..fc6a890 100644 --- a/main.go +++ b/main.go @@ -619,8 +619,6 @@ func runNewtMain(ctx context.Context) { var wgData WgData var dockerEventMonitor *docker.EventMonitor - logger.Debug("++++++++++++++++++++++ the port is %d", port) - if !disableClients { setupClients(client) } From bf029b7bb295d41a9b9119332ca92cfb6228800e Mon Sep 17 00:00:00 2001 From: Owen Date: Sat, 14 Mar 2026 11:57:37 -0700 Subject: [PATCH 19/28] Clean up to match olm --- websocket/client.go | 95 ++++++++++++++++++++++++--------------------- 1 file changed, 51 insertions(+), 44 deletions(-) diff --git a/websocket/client.go b/websocket/client.go index 8b8893a..dd3f39a 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -661,7 +661,57 @@ func (c *Client) setupPKCS12TLS() (*tls.Config, error) { } // pingMonitor sends pings at a short interval and triggers reconnect on failure +func (c *Client) sendPing() { + if c.conn == nil { + return + } + + // Skip ping if a message is currently being processed + c.processingMux.RLock() + isProcessing := c.processingMessage + c.processingMux.RUnlock() + if isProcessing { + logger.Debug("Skipping ping, message is being processed") + return + } + + c.configVersionMux.RLock() + configVersion := c.configVersion + c.configVersionMux.RUnlock() + + pingMsg := WSMessage{ + Type: "newt/ping", + Data: map[string]interface{}{}, + ConfigVersion: configVersion, + } + + c.writeMux.Lock() + err := c.conn.WriteJSON(pingMsg) + if err == nil { + telemetry.IncWSMessage(c.metricsContext(), "out", "ping") + } + c.writeMux.Unlock() + + if err != nil { + // Check if we're shutting down before logging error and reconnecting + select { + case <-c.done: + // Expected during shutdown + return + default: + logger.Error("Ping failed: %v", err) + telemetry.IncWSKeepaliveFailure(c.metricsContext(), "ping_write") + telemetry.IncWSReconnect(c.metricsContext(), "ping_write") + c.reconnect() + return + } + } +} + func (c *Client) pingMonitor() { + // Send an immediate ping as soon as we connect + c.sendPing() + ticker := time.NewTicker(c.pingInterval) defer ticker.Stop() @@ -670,50 +720,7 @@ func (c *Client) pingMonitor() { case <-c.done: return case <-ticker.C: - if c.conn == nil { - return - } - - // Skip ping if a message is currently being processed - c.processingMux.RLock() - isProcessing := c.processingMessage - c.processingMux.RUnlock() - if isProcessing { - logger.Debug("Skipping ping, message is being processed") - continue - } - - c.configVersionMux.RLock() - configVersion := c.configVersion - c.configVersionMux.RUnlock() - - pingMsg := WSMessage{ - Type: "newt/ping", - Data: map[string]interface{}{}, - ConfigVersion: configVersion, - } - - c.writeMux.Lock() - err := c.conn.WriteJSON(pingMsg) - if err == nil { - telemetry.IncWSMessage(c.metricsContext(), "out", "ping") - } - c.writeMux.Unlock() - - if err != nil { - // Check if we're shutting down before logging error and reconnecting - select { - case <-c.done: - // Expected during shutdown - return - default: - logger.Error("Ping failed: %v", err) - telemetry.IncWSKeepaliveFailure(c.metricsContext(), "ping_write") - telemetry.IncWSReconnect(c.metricsContext(), "ping_write") - c.reconnect() - return - } - } + c.sendPing() } } } From d4ebb3e2afcc50f5e12f62464de2f122d52897bd Mon Sep 17 00:00:00 2001 From: Owen Date: Sun, 15 Mar 2026 17:42:03 -0700 Subject: [PATCH 20/28] Send disconnecting message --- main.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 1bc1bce..b669af2 100644 --- a/main.go +++ b/main.go @@ -618,7 +618,7 @@ func runNewtMain(ctx context.Context) { var connected bool var wgData WgData var dockerEventMonitor *docker.EventMonitor - + if !disableClients { setupClients(client) } @@ -1197,7 +1197,7 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey( len(syncData.Targets.TCP), len(syncData.Targets.UDP), len(syncData.HealthCheckTargets)) //TODO: TEST AND IMPLEMENT THIS - + // // Build sets of desired targets (port -> target string) // desiredTCP := make(map[int]string) // for _, t := range syncData.Targets.TCP { @@ -1794,6 +1794,8 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey( pm.Stop() } + client.SendMessage("newt/disconnecting", map[string]any{}) + if client != nil { client.Close() } From 13448f76aa134c922c5111c80deccbe1f72bd11c Mon Sep 17 00:00:00 2001 From: Laurence Date: Mon, 16 Mar 2026 14:11:14 +0000 Subject: [PATCH 21/28] refactor(proxy): cleanup basics - constants, remove dead code, fix deprecated calls - Add maxUDPPacketSize constant to replace magic number 65507 - Remove commented-out code in Stop() - Replace deprecated ne.Temporary() with errors.Is(err, net.ErrClosed) - Use errors.As instead of type assertion for net.Error - Use errors.Is for closed connection checks instead of string matching - Handle closed connection gracefully when reading from UDP target --- proxy/manager.go | 35 +++++++++++++++-------------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/proxy/manager.go b/proxy/manager.go index 0619e80..5566589 100644 --- a/proxy/manager.go +++ b/proxy/manager.go @@ -21,7 +21,10 @@ import ( "gvisor.dev/gvisor/pkg/tcpip/adapters/gonet" ) -const errUnsupportedProtoFmt = "unsupported protocol: %s" +const ( + errUnsupportedProtoFmt = "unsupported protocol: %s" + maxUDPPacketSize = 65507 +) // Target represents a proxy target with its address and port type Target struct { @@ -105,13 +108,9 @@ func classifyProxyError(err error) string { if errors.Is(err, net.ErrClosed) { return "closed" } - if ne, ok := err.(net.Error); ok { - if ne.Timeout() { - return "timeout" - } - if ne.Temporary() { - return "temporary" - } + var ne net.Error + if errors.As(err, &ne) && ne.Timeout() { + return "timeout" } msg := strings.ToLower(err.Error()) switch { @@ -437,14 +436,6 @@ func (pm *ProxyManager) Stop() error { pm.udpConns = append(pm.udpConns[:i], pm.udpConns[i+1:]...) } - // // Clear the target maps - // for k := range pm.tcpTargets { - // delete(pm.tcpTargets, k) - // } - // for k := range pm.udpTargets { - // delete(pm.udpTargets, k) - // } - // Give active connections a chance to close gracefully time.Sleep(100 * time.Millisecond) @@ -498,7 +489,7 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) if !pm.running { return } - if ne, ok := err.(net.Error); ok && !ne.Temporary() { + if errors.Is(err, net.ErrClosed) { logger.Info("TCP listener closed, stopping proxy handler for %v", listener.Addr()) return } @@ -564,7 +555,7 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) } func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { - buffer := make([]byte, 65507) // Max UDP packet size + buffer := make([]byte, maxUDPPacketSize) // Max UDP packet size clientConns := make(map[string]*net.UDPConn) var clientsMutex sync.RWMutex @@ -583,7 +574,7 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { } // Check for connection closed conditions - if err == io.EOF || strings.Contains(err.Error(), "use of closed network connection") { + if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) { logger.Info("UDP connection closed, stopping proxy handler") // Clean up existing client connections @@ -662,10 +653,14 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "udp", telemetry.ProxyConnectionClosed) }() - buffer := make([]byte, 65507) + buffer := make([]byte, maxUDPPacketSize) for { n, _, err := targetConn.ReadFromUDP(buffer) if err != nil { + // Connection closed is normal during cleanup + if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) { + return // defer will handle cleanup, result stays "success" + } logger.Error("Error reading from target: %v", err) result = "failure" return // defer will handle cleanup From 24dfb3a8a2c7be7cad59e3f976609c1b86d3ec4c Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 16 Mar 2026 13:50:45 -0700 Subject: [PATCH 22/28] Remove redundant info --- main.go | 1 - websocket/client.go | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/main.go b/main.go index b669af2..d736153 100644 --- a/main.go +++ b/main.go @@ -566,7 +566,6 @@ func runNewtMain(ctx context.Context) { secret, // CLI arg takes precedence endpoint, 30*time.Second, - pingTimeout, opt, ) if err != nil { diff --git a/websocket/client.go b/websocket/client.go index dd3f39a..533771b 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -38,7 +38,6 @@ type Client struct { isConnected bool reconnectMux sync.RWMutex pingInterval time.Duration - pingTimeout time.Duration onConnect func() error onTokenUpdate func(token string) writeMux sync.Mutex @@ -117,7 +116,7 @@ func (c *Client) MetricsContext() context.Context { } // NewClient creates a new websocket client -func NewClient(clientType string, ID, secret string, endpoint string, pingInterval time.Duration, pingTimeout time.Duration, opts ...ClientOption) (*Client, error) { +func NewClient(clientType string, ID, secret string, endpoint string, pingInterval time.Duration, opts ...ClientOption) (*Client, error) { config := &Config{ ID: ID, Secret: secret, @@ -132,7 +131,6 @@ func NewClient(clientType string, ID, secret string, endpoint string, pingInterv reconnectInterval: 3 * time.Second, isConnected: false, pingInterval: pingInterval, - pingTimeout: pingTimeout, clientType: clientType, } From 8161fa6626a8c0f880ee0a2cf8c8bfe24b85a12f Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 16 Mar 2026 14:33:40 -0700 Subject: [PATCH 23/28] Bump ping interval up --- main.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/main.go b/main.go index d736153..c9e7d8d 100644 --- a/main.go +++ b/main.go @@ -302,10 +302,10 @@ func runNewtMain(ctx context.Context) { flag.StringVar(&dockerSocket, "docker-socket", "", "Path or address to Docker socket (typically unix:///var/run/docker.sock)") } if pingIntervalStr == "" { - flag.StringVar(&pingIntervalStr, "ping-interval", "3s", "Interval for pinging the server (default 3s)") + flag.StringVar(&pingIntervalStr, "ping-interval", "15s", "Interval for pinging the server (default 15s)") } if pingTimeoutStr == "" { - flag.StringVar(&pingTimeoutStr, "ping-timeout", "5s", " Timeout for each ping (default 5s)") + flag.StringVar(&pingTimeoutStr, "ping-timeout", "7s", " Timeout for each ping (default 7s)") } // load the prefer endpoint just as a flag flag.StringVar(&preferEndpoint, "prefer-endpoint", "", "Prefer this endpoint for the connection (if set, will override the endpoint from the server)") @@ -330,21 +330,21 @@ func runNewtMain(ctx context.Context) { if pingIntervalStr != "" { pingInterval, err = time.ParseDuration(pingIntervalStr) if err != nil { - fmt.Printf("Invalid PING_INTERVAL value: %s, using default 3 seconds\n", pingIntervalStr) - pingInterval = 3 * time.Second + fmt.Printf("Invalid PING_INTERVAL value: %s, using default 15 seconds\n", pingIntervalStr) + pingInterval = 15 * time.Second } } else { - pingInterval = 3 * time.Second + pingInterval = 15 * time.Second } if pingTimeoutStr != "" { pingTimeout, err = time.ParseDuration(pingTimeoutStr) if err != nil { - fmt.Printf("Invalid PING_TIMEOUT value: %s, using default 5 seconds\n", pingTimeoutStr) - pingTimeout = 5 * time.Second + fmt.Printf("Invalid PING_TIMEOUT value: %s, using default 7 seconds\n", pingTimeoutStr) + pingTimeout = 7 * time.Second } } else { - pingTimeout = 5 * time.Second + pingTimeout = 7 * time.Second } if dockerEnforceNetworkValidation == "" { From 8fda35db4f2ee08e217ff698a26e3796c80846d3 Mon Sep 17 00:00:00 2001 From: Laurence Date: Wed, 18 Mar 2026 13:37:31 +0000 Subject: [PATCH 24/28] fix(healthcheck): Support ipv6 healthchecks Currently we are doing fmt.sprintf on hostname and port which will not properly handle ipv6 addresses, instead of changing pangolin to send bracketed address a simply net.join can do this for us since we dont need to parse a formatted string --- healthcheck/healthcheck.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/healthcheck/healthcheck.go b/healthcheck/healthcheck.go index 9889cc6..f618803 100644 --- a/healthcheck/healthcheck.go +++ b/healthcheck/healthcheck.go @@ -5,7 +5,9 @@ import ( "crypto/tls" "encoding/json" "fmt" + "net" "net/http" + "strconv" "strings" "sync" "time" @@ -365,11 +367,12 @@ func (m *Monitor) performHealthCheck(target *Target) { target.LastCheck = time.Now() target.LastError = "" - // Build URL - url := fmt.Sprintf("%s://%s", target.Config.Scheme, target.Config.Hostname) + // Build URL (use net.JoinHostPort to properly handle IPv6 addresses with ports) + host := target.Config.Hostname if target.Config.Port > 0 { - url = fmt.Sprintf("%s:%d", url, target.Config.Port) + host = net.JoinHostPort(target.Config.Hostname, strconv.Itoa(target.Config.Port)) } + url := fmt.Sprintf("%s://%s", target.Config.Scheme, host) if target.Config.Path != "" { if !strings.HasPrefix(target.Config.Path, "/") { url += "/" From b045a0f5d43e0b54b091177f6b8a013507572464 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Thu, 19 Mar 2026 02:14:44 +0000 Subject: [PATCH 25/28] chore(deps): bump google.golang.org/grpc from 1.79.1 to 1.79.3 Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.79.1 to 1.79.3. - [Release notes](https://github.com/grpc/grpc-go/releases) - [Commits](https://github.com/grpc/grpc-go/compare/v1.79.1...v1.79.3) --- updated-dependencies: - dependency-name: google.golang.org/grpc dependency-version: 1.79.3 dependency-type: direct:production ... Signed-off-by: dependabot[bot] --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 2aa8f5e..e494319 100644 --- a/go.mod +++ b/go.mod @@ -24,7 +24,7 @@ require ( golang.zx2c4.com/wireguard v0.0.0-20250521234502-f333402bd9cb golang.zx2c4.com/wireguard/wgctrl v0.0.0-20241231184526-a9ab2273dd10 golang.zx2c4.com/wireguard/windows v0.5.3 - google.golang.org/grpc v1.79.1 + google.golang.org/grpc v1.79.3 gopkg.in/yaml.v3 v3.0.1 gvisor.dev/gvisor v0.0.0-20250503011706-39ed1f5ac29c software.sslmate.com/src/go-pkcs12 v0.7.0 diff --git a/go.sum b/go.sum index d345b1d..0b75184 100644 --- a/go.sum +++ b/go.sum @@ -159,8 +159,8 @@ google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 h1: google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:kSJwQxqmFXeo79zOmbrALdflXQeAYcUbgS7PbpMknCY= google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 h1:mWPCjDEyshlQYzBpMNHaEof6UX1PmHcaUODUywQ0uac= google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ= -google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY= -google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= +google.golang.org/grpc v1.79.3 h1:sybAEdRIEtvcD68Gx7dmnwjZKlyfuc61Dyo9pGXXkKE= +google.golang.org/grpc v1.79.3/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ= google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= From 212bdf765a592fa744d015ead611df79fea8d1f1 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" Date: Thu, 19 Mar 2026 02:16:03 +0000 Subject: [PATCH 26/28] chore(nix): fix hash for updated go dependencies --- flake.nix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flake.nix b/flake.nix index 6c071ad..ef1f52e 100644 --- a/flake.nix +++ b/flake.nix @@ -35,7 +35,7 @@ inherit version; src = pkgs.nix-gitignore.gitignoreSource [ ] ./.; - vendorHash = "sha256-kmQM8Yy5TuOiNpMpUme/2gfE+vrhUK+0AphN+p71wGs="; + vendorHash = "sha256-0eK4C42Upqpp01pfjW9+t3NKzadwVlGwwuWXhdpgDz4="; nativeInstallCheckInputs = [ pkgs.versionCheckHook ]; From 1057013b50a7a40e1347f24139669f98c5c792a4 Mon Sep 17 00:00:00 2001 From: Owen Date: Fri, 27 Mar 2026 11:55:34 -0700 Subject: [PATCH 27/28] Add chainId based dedup --- clients/clients.go | 27 ++++++++++++++++++++++++++- common.go | 3 +++ main.go | 38 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) diff --git a/clients/clients.go b/clients/clients.go index 4c64dbd..6130dcb 100644 --- a/clients/clients.go +++ b/clients/clients.go @@ -2,6 +2,8 @@ package clients import ( "context" + "crypto/rand" + "encoding/hex" "encoding/json" "fmt" "net" @@ -34,6 +36,7 @@ type WgConfig struct { IpAddress string `json:"ipAddress"` Peers []Peer `json:"peers"` Targets []Target `json:"targets"` + ChainId string `json:"chainId"` } type Target struct { @@ -82,7 +85,8 @@ type WireGuardService struct { host string serverPubKey string token string - stopGetConfig func() + stopGetConfig func() + pendingConfigChainId string // Netstack fields tun tun.Device tnet *netstack2.Net @@ -107,6 +111,13 @@ type WireGuardService struct { wgTesterServer *wgtester.Server } +// generateChainId generates a random chain ID for deduplicating round-trip messages. +func generateChainId() string { + b := make([]byte, 8) + _, _ = rand.Read(b) + return hex.EncodeToString(b) +} + func NewWireGuardService(interfaceName string, port uint16, mtu int, host string, newtId string, wsClient *websocket.Client, dns string, useNativeInterface bool) (*WireGuardService, error) { key, err := wgtypes.GeneratePrivateKey() if err != nil { @@ -442,9 +453,12 @@ func (s *WireGuardService) LoadRemoteConfig() error { s.stopGetConfig() s.stopGetConfig = nil } + chainId := generateChainId() + s.pendingConfigChainId = chainId s.stopGetConfig = s.client.SendMessageInterval("newt/wg/get-config", map[string]interface{}{ "publicKey": s.key.PublicKey().String(), "port": s.Port, + "chainId": chainId, }, 2*time.Second) logger.Debug("Requesting WireGuard configuration from remote server") @@ -469,6 +483,17 @@ func (s *WireGuardService) handleConfig(msg websocket.WSMessage) { logger.Info("Error unmarshaling target data: %v", err) return } + + // Deduplicate using chainId: discard responses that don't match the + // pending request, or that we have already processed. + if config.ChainId != "" { + if config.ChainId != s.pendingConfigChainId { + logger.Debug("Discarding duplicate/stale newt/wg/get-config response (chainId=%s, expected=%s)", config.ChainId, s.pendingConfigChainId) + return + } + s.pendingConfigChainId = "" // consume – further duplicates are rejected + } + s.config = config if s.stopGetConfig != nil { diff --git a/common.go b/common.go index 4701411..707eefa 100644 --- a/common.go +++ b/common.go @@ -287,9 +287,12 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien } stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second) // Send registration message to the server for backward compatibility + bcChainId := generateChainId() + pendingRegisterChainId = bcChainId err := client.SendMessage("newt/wg/register", map[string]interface{}{ "publicKey": publicKey.String(), "backwardsCompatible": true, + "chainId": bcChainId, }) if err != nil { logger.Error("Failed to send registration message: %v", err) diff --git a/main.go b/main.go index 3646a27..a79c70d 100644 --- a/main.go +++ b/main.go @@ -3,7 +3,9 @@ package main import ( "bytes" "context" + "crypto/rand" "crypto/tls" + "encoding/hex" "encoding/json" "errors" "flag" @@ -46,6 +48,7 @@ type WgData struct { TunnelIP string `json:"tunnelIP"` Targets TargetsByType `json:"targets"` HealthCheckTargets []healthcheck.Config `json:"healthCheckTargets"` + ChainId string `json:"chainId"` } type TargetsByType struct { @@ -128,6 +131,7 @@ var ( publicKey wgtypes.Key pingStopChan chan struct{} stopFunc func() + pendingRegisterChainId string healthFile string useNativeInterface bool authorizedKeysFile string @@ -161,6 +165,13 @@ var ( tlsPrivateKey string ) +// generateChainId generates a random chain ID for deduplicating round-trip messages. +func generateChainId() string { + b := make([]byte, 8) + _, _ = rand.Read(b) + return hex.EncodeToString(b) +} + func main() { // Check for subcommands first (only principals exits early) if len(os.Args) > 1 { @@ -706,6 +717,24 @@ func runNewtMain(ctx context.Context) { defer func() { telemetry.IncSiteRegistration(ctx, regResult) }() + + // Deduplicate using chainId: if the server echoes back a chainId we have + // already consumed (or one that doesn't match our current pending request), + // throw the message away to avoid setting up the tunnel twice. + var chainData struct { + ChainId string `json:"chainId"` + } + if jsonBytes, err := json.Marshal(msg.Data); err == nil { + _ = json.Unmarshal(jsonBytes, &chainData) + } + if chainData.ChainId != "" { + if chainData.ChainId != pendingRegisterChainId { + logger.Debug("Discarding duplicate/stale newt/wg/connect (chainId=%s, expected=%s)", chainData.ChainId, pendingRegisterChainId) + return + } + pendingRegisterChainId = "" // consume – further duplicates with this id are rejected + } + if stopFunc != nil { stopFunc() // stop the ws from sending more requests stopFunc = nil // reset stopFunc to nil to avoid double stopping @@ -971,10 +1000,13 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey( }, } + chainId := generateChainId() + pendingRegisterChainId = chainId stopFunc = client.SendMessageInterval(topicWGRegister, map[string]interface{}{ "publicKey": publicKey.String(), "pingResults": pingResults, "newtVersion": newtVersion, + "chainId": chainId, }, 2*time.Second) return @@ -1074,10 +1106,13 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey( } // Send the ping results to the cloud for selection + chainId := generateChainId() + pendingRegisterChainId = chainId stopFunc = client.SendMessageInterval(topicWGRegister, map[string]interface{}{ "publicKey": publicKey.String(), "pingResults": pingResults, "newtVersion": newtVersion, + "chainId": chainId, }, 2*time.Second) logger.Debug("Sent exit node ping results to cloud for selection: pingResults=%+v", pingResults) @@ -1740,10 +1775,13 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey( } // Send registration message to the server for backward compatibility + bcChainId := generateChainId() + pendingRegisterChainId = bcChainId err := client.SendMessage(topicWGRegister, map[string]interface{}{ "publicKey": publicKey.String(), "newtVersion": newtVersion, "backwardsCompatible": true, + "chainId": bcChainId, }) sendBlueprint(client) From cdaf4f7898f057826ba191a59a5e139b299bfa69 Mon Sep 17 00:00:00 2001 From: Owen Date: Sun, 29 Mar 2026 12:00:17 -0700 Subject: [PATCH 28/28] Add chain id to ping --- common.go | 6 +++++- main.go | 17 +++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/common.go b/common.go index 707eefa..c55909a 100644 --- a/common.go +++ b/common.go @@ -285,7 +285,11 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien if tunnelID != "" { telemetry.IncReconnect(context.Background(), tunnelID, "client", telemetry.ReasonTimeout) } - stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second) + pingChainId := generateChainId() + pendingPingChainId = pingChainId + stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{ + "chainId": pingChainId, + }, 3*time.Second) // Send registration message to the server for backward compatibility bcChainId := generateChainId() pendingRegisterChainId = bcChainId diff --git a/main.go b/main.go index a79c70d..e051450 100644 --- a/main.go +++ b/main.go @@ -62,6 +62,7 @@ type TargetData struct { type ExitNodeData struct { ExitNodes []ExitNode `json:"exitNodes"` + ChainId string `json:"chainId"` } // ExitNode represents an exit node with an ID, endpoint, and weight. @@ -132,6 +133,7 @@ var ( pingStopChan chan struct{} stopFunc func() pendingRegisterChainId string + pendingPingChainId string healthFile string useNativeInterface bool authorizedKeysFile string @@ -919,8 +921,11 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey( } // Request exit nodes from the server + pingChainId := generateChainId() + pendingPingChainId = pingChainId stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{ "noCloud": noCloud, + "chainId": pingChainId, }, 3*time.Second) logger.Info("Tunnel destroyed, ready for reconnection") @@ -949,6 +954,7 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey( client.RegisterHandler("newt/ping/exitNodes", func(msg websocket.WSMessage) { logger.Debug("Received ping message") + if stopFunc != nil { stopFunc() // stop the ws from sending more requests stopFunc = nil // reset stopFunc to nil to avoid double stopping @@ -968,6 +974,14 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey( } exitNodes := exitNodeData.ExitNodes + if exitNodeData.ChainId != "" { + if exitNodeData.ChainId != pendingPingChainId { + logger.Debug("Discarding duplicate/stale newt/ping/exitNodes (chainId=%s, expected=%s)", exitNodeData.ChainId, pendingPingChainId) + return + } + pendingPingChainId = "" // consume – further duplicates with this id are rejected + } + if len(exitNodes) == 0 { logger.Info("No exit nodes provided") return @@ -1762,8 +1776,11 @@ persistent_keepalive_interval=5`, util.FixKey(privateKey.String()), util.FixKey( stopFunc() } // request from the server the list of nodes to ping + pingChainId := generateChainId() + pendingPingChainId = pingChainId stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{ "noCloud": noCloud, + "chainId": pingChainId, }, 3*time.Second) logger.Debug("Requesting exit nodes from server")