From 4d33567888fe704fb75f0c0a74ed8a27fd131052 Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Wed, 8 Oct 2025 03:12:16 +0200 Subject: [PATCH 1/4] [client] Remove endpoint address on peer disconnect, retain status for activity recording (#4228) * When a peer disconnects, remove the endpoint address to avoid sending traffic to a non-existent address, but retain the status for the activity recorder. --- client/iface/configurer/kernel_unix.go | 38 ++++++++++++++++ client/iface/configurer/usp.go | 61 ++++++++++++++++++++++++++ client/iface/device/interface.go | 1 + client/iface/iface.go | 11 +++++ client/internal/engine_test.go | 4 ++ client/internal/iface_common.go | 1 + client/internal/peer/conn.go | 6 +++ client/internal/peer/iface.go | 1 + 8 files changed, 123 insertions(+) diff --git a/client/iface/configurer/kernel_unix.go b/client/iface/configurer/kernel_unix.go index 84afc38f5..96b286175 100644 --- a/client/iface/configurer/kernel_unix.go +++ b/client/iface/configurer/kernel_unix.go @@ -73,6 +73,44 @@ func (c *KernelConfigurer) UpdatePeer(peerKey string, allowedIps []netip.Prefix, return nil } +func (c *KernelConfigurer) RemoveEndpointAddress(peerKey string) error { + peerKeyParsed, err := wgtypes.ParseKey(peerKey) + if err != nil { + return err + } + + // Get the existing peer to preserve its allowed IPs + existingPeer, err := c.getPeer(c.deviceName, peerKey) + if err != nil { + return fmt.Errorf("get peer: %w", err) + } + + removePeerCfg := wgtypes.PeerConfig{ + PublicKey: peerKeyParsed, + Remove: true, + } + + if err := c.configure(wgtypes.Config{Peers: []wgtypes.PeerConfig{removePeerCfg}}); err != nil { + return fmt.Errorf(`error removing peer %s from interface %s: %w`, peerKey, c.deviceName, err) + } + + //Re-add the peer without the endpoint but same AllowedIPs + reAddPeerCfg := wgtypes.PeerConfig{ + PublicKey: peerKeyParsed, + AllowedIPs: existingPeer.AllowedIPs, + ReplaceAllowedIPs: true, + } + + if err := c.configure(wgtypes.Config{Peers: []wgtypes.PeerConfig{reAddPeerCfg}}); err != nil { + return fmt.Errorf( + `error re-adding peer %s to interface %s with allowed IPs %v: %w`, + peerKey, c.deviceName, existingPeer.AllowedIPs, err, + ) + } + + return nil +} + func (c *KernelConfigurer) RemovePeer(peerKey string) error { peerKeyParsed, err := wgtypes.ParseKey(peerKey) if err != nil { diff --git a/client/iface/configurer/usp.go b/client/iface/configurer/usp.go index f744e0127..bc875b73c 100644 --- a/client/iface/configurer/usp.go +++ b/client/iface/configurer/usp.go @@ -106,6 +106,67 @@ func (c *WGUSPConfigurer) UpdatePeer(peerKey string, allowedIps []netip.Prefix, return nil } +func (c *WGUSPConfigurer) RemoveEndpointAddress(peerKey string) error { + peerKeyParsed, err := wgtypes.ParseKey(peerKey) + if err != nil { + return fmt.Errorf("parse peer key: %w", err) + } + + ipcStr, err := c.device.IpcGet() + if err != nil { + return fmt.Errorf("get IPC config: %w", err) + } + + // Parse current status to get allowed IPs for the peer + stats, err := parseStatus(c.deviceName, ipcStr) + if err != nil { + return fmt.Errorf("parse IPC config: %w", err) + } + + var allowedIPs []net.IPNet + found := false + for _, peer := range stats.Peers { + if peer.PublicKey == peerKey { + allowedIPs = peer.AllowedIPs + found = true + break + } + } + if !found { + return fmt.Errorf("peer %s not found", peerKey) + } + + // remove the peer from the WireGuard configuration + peer := wgtypes.PeerConfig{ + PublicKey: peerKeyParsed, + Remove: true, + } + + config := wgtypes.Config{ + Peers: []wgtypes.PeerConfig{peer}, + } + if ipcErr := c.device.IpcSet(toWgUserspaceString(config)); ipcErr != nil { + return fmt.Errorf("failed to remove peer: %s", ipcErr) + } + + // Build the peer config + peer = wgtypes.PeerConfig{ + PublicKey: peerKeyParsed, + ReplaceAllowedIPs: true, + AllowedIPs: allowedIPs, + } + + config = wgtypes.Config{ + Peers: []wgtypes.PeerConfig{peer}, + } + + if err := c.device.IpcSet(toWgUserspaceString(config)); err != nil { + return fmt.Errorf("remove endpoint address: %w", err) + } + + return nil +} + func (c *WGUSPConfigurer) RemovePeer(peerKey string) error { peerKeyParsed, err := wgtypes.ParseKey(peerKey) if err != nil { diff --git a/client/iface/device/interface.go b/client/iface/device/interface.go index 1f40b0d46..db53d9c3a 100644 --- a/client/iface/device/interface.go +++ b/client/iface/device/interface.go @@ -21,4 +21,5 @@ type WGConfigurer interface { GetStats() (map[string]configurer.WGStats, error) FullStats() (*configurer.Stats, error) LastActivities() map[string]monotime.Time + RemoveEndpointAddress(peerKey string) error } diff --git a/client/iface/iface.go b/client/iface/iface.go index 609572561..158672160 100644 --- a/client/iface/iface.go +++ b/client/iface/iface.go @@ -148,6 +148,17 @@ func (w *WGIface) UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAliv return w.configurer.UpdatePeer(peerKey, allowedIps, keepAlive, endpoint, preSharedKey) } +func (w *WGIface) RemoveEndpointAddress(peerKey string) error { + w.mu.Lock() + defer w.mu.Unlock() + if w.configurer == nil { + return ErrIfaceNotFound + } + + log.Debugf("Removing endpoint address: %s", peerKey) + return w.configurer.RemoveEndpointAddress(peerKey) +} + // RemovePeer removes a Wireguard Peer from the interface iface func (w *WGIface) RemovePeer(peerKey string) error { w.mu.Lock() diff --git a/client/internal/engine_test.go b/client/internal/engine_test.go index 344104405..2f1098100 100644 --- a/client/internal/engine_test.go +++ b/client/internal/engine_test.go @@ -105,6 +105,10 @@ type MockWGIface struct { LastActivitiesFunc func() map[string]monotime.Time } +func (m *MockWGIface) RemoveEndpointAddress(_ string) error { + return nil +} + func (m *MockWGIface) FullStats() (*configurer.Stats, error) { return nil, fmt.Errorf("not implemented") } diff --git a/client/internal/iface_common.go b/client/internal/iface_common.go index 690fdb7cc..98fe01912 100644 --- a/client/internal/iface_common.go +++ b/client/internal/iface_common.go @@ -28,6 +28,7 @@ type wgIfaceBase interface { UpdateAddr(newAddr string) error GetProxy() wgproxy.Proxy UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error + RemoveEndpointAddress(key string) error RemovePeer(peerKey string) error AddAllowedIP(peerKey string, allowedIP netip.Prefix) error RemoveAllowedIP(peerKey string, allowedIP netip.Prefix) error diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 8db9e58f4..ded9aa479 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -430,6 +430,9 @@ func (conn *Conn) onICEStateDisconnected() { } else { conn.Log.Infof("ICE disconnected, do not switch to Relay. Reset priority to: %s", conntype.None.String()) conn.currentConnPriority = conntype.None + if err := conn.config.WgConfig.WgInterface.RemoveEndpointAddress(conn.config.WgConfig.RemoteKey); err != nil { + conn.Log.Errorf("failed to remove wg endpoint: %v", err) + } } changed := conn.statusICE.Get() != worker.StatusDisconnected @@ -523,6 +526,9 @@ func (conn *Conn) onRelayDisconnected() { if conn.currentConnPriority == conntype.Relay { conn.Log.Debugf("clean up WireGuard config") conn.currentConnPriority = conntype.None + if err := conn.config.WgConfig.WgInterface.RemoveEndpointAddress(conn.config.WgConfig.RemoteKey); err != nil { + conn.Log.Errorf("failed to remove wg endpoint: %v", err) + } } if conn.wgProxyRelay != nil { diff --git a/client/internal/peer/iface.go b/client/internal/peer/iface.go index 0bcc7a68e..678396e61 100644 --- a/client/internal/peer/iface.go +++ b/client/internal/peer/iface.go @@ -18,4 +18,5 @@ type WGIface interface { GetStats() (map[string]configurer.WGStats, error) GetProxy() wgproxy.Proxy Address() wgaddr.Address + RemoveEndpointAddress(key string) error } From 229c65ffa1d783c0f1b56070c0718897f69072b0 Mon Sep 17 00:00:00 2001 From: hakansa <43675540+hakansa@users.noreply.github.com> Date: Wed, 8 Oct 2025 17:42:15 +0700 Subject: [PATCH 2/4] Enhance showLoginURL to include connection status check and auto-close functionality (#4525) --- client/ui/client_ui.go | 42 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 41 insertions(+), 1 deletion(-) diff --git a/client/ui/client_ui.go b/client/ui/client_ui.go index 25d7380a9..7c2000a9d 100644 --- a/client/ui/client_ui.go +++ b/client/ui/client_ui.go @@ -1354,7 +1354,13 @@ func (s *serviceClient) updateConfig() error { } // showLoginURL creates a borderless window styled like a pop-up in the top-right corner using s.wLoginURL. -func (s *serviceClient) showLoginURL() { +// It also starts a background goroutine that periodically checks if the client is already connected +// and closes the window if so. The goroutine can be cancelled by the returned CancelFunc, and it is +// also cancelled when the window is closed. +func (s *serviceClient) showLoginURL() context.CancelFunc { + + // create a cancellable context for the background check goroutine + ctx, cancel := context.WithCancel(s.ctx) resIcon := fyne.NewStaticResource("netbird.png", iconAbout) @@ -1363,6 +1369,8 @@ func (s *serviceClient) showLoginURL() { s.wLoginURL.Resize(fyne.NewSize(400, 200)) s.wLoginURL.SetIcon(resIcon) } + // ensure goroutine is cancelled when the window is closed + s.wLoginURL.SetOnClosed(func() { cancel() }) // add a description label label := widget.NewLabel("Your NetBird session has expired.\nPlease re-authenticate to continue using NetBird.") @@ -1443,7 +1451,39 @@ func (s *serviceClient) showLoginURL() { ) s.wLoginURL.SetContent(container.NewCenter(content)) + // start a goroutine to check connection status and close the window if connected + go func() { + ticker := time.NewTicker(5 * time.Second) + defer ticker.Stop() + + conn, err := s.getSrvClient(failFastTimeout) + if err != nil { + return + } + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + status, err := conn.Status(s.ctx, &proto.StatusRequest{}) + if err != nil { + continue + } + if status.Status == string(internal.StatusConnected) { + if s.wLoginURL != nil { + s.wLoginURL.Close() + } + return + } + } + } + }() + s.wLoginURL.Show() + + // return cancel func so callers can stop the background goroutine if desired + return cancel } func openURL(url string) error { From 768332820e1e6e211325332a1350b0b2a34429b3 Mon Sep 17 00:00:00 2001 From: hakansa <43675540+hakansa@users.noreply.github.com> Date: Wed, 8 Oct 2025 21:54:27 +0700 Subject: [PATCH 3/4] [client] Implement DNS query caching in DNSForwarder (#4574) implements DNS query caching in the DNSForwarder to improve performance and provide fallback responses when upstream DNS servers fail. The cache stores successful DNS query results and serves them when upstream resolution fails. - Added a new cache component to store DNS query results by domain and query type - Integrated cache storage after successful DNS resolutions - Enhanced error handling to serve cached responses as fallback when upstream DNS fails --- client/internal/dnsfwd/cache.go | 78 +++++++++++++++++ client/internal/dnsfwd/cache_test.go | 86 +++++++++++++++++++ client/internal/dnsfwd/forwarder.go | 104 +++++++++++++++++++---- client/internal/dnsfwd/forwarder_test.go | 89 +++++++++++++++++++ 4 files changed, 341 insertions(+), 16 deletions(-) create mode 100644 client/internal/dnsfwd/cache.go create mode 100644 client/internal/dnsfwd/cache_test.go diff --git a/client/internal/dnsfwd/cache.go b/client/internal/dnsfwd/cache.go new file mode 100644 index 000000000..43fe2d020 --- /dev/null +++ b/client/internal/dnsfwd/cache.go @@ -0,0 +1,78 @@ +package dnsfwd + +import ( + "net/netip" + "slices" + "strings" + "sync" + + "github.com/miekg/dns" +) + +type cache struct { + mu sync.RWMutex + records map[string]*cacheEntry +} + +type cacheEntry struct { + ip4Addrs []netip.Addr + ip6Addrs []netip.Addr +} + +func newCache() *cache { + return &cache{ + records: make(map[string]*cacheEntry), + } +} + +func (c *cache) get(domain string, reqType uint16) ([]netip.Addr, bool) { + c.mu.RLock() + defer c.mu.RUnlock() + + entry, exists := c.records[normalizeDomain(domain)] + if !exists { + return nil, false + } + + switch reqType { + case dns.TypeA: + return slices.Clone(entry.ip4Addrs), true + case dns.TypeAAAA: + return slices.Clone(entry.ip6Addrs), true + default: + return nil, false + } + +} + +func (c *cache) set(domain string, reqType uint16, addrs []netip.Addr) { + c.mu.Lock() + defer c.mu.Unlock() + norm := normalizeDomain(domain) + entry, exists := c.records[norm] + if !exists { + entry = &cacheEntry{} + c.records[norm] = entry + } + + switch reqType { + case dns.TypeA: + entry.ip4Addrs = slices.Clone(addrs) + case dns.TypeAAAA: + entry.ip6Addrs = slices.Clone(addrs) + } +} + +// unset removes cached entries for the given domain and request type. +func (c *cache) unset(domain string) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.records, normalizeDomain(domain)) +} + +// normalizeDomain converts an input domain into a canonical form used as cache key: +// lowercase and fully-qualified (with trailing dot). +func normalizeDomain(domain string) string { + // dns.Fqdn ensures trailing dot; ToLower for consistent casing + return dns.Fqdn(strings.ToLower(domain)) +} diff --git a/client/internal/dnsfwd/cache_test.go b/client/internal/dnsfwd/cache_test.go new file mode 100644 index 000000000..c23f0f31d --- /dev/null +++ b/client/internal/dnsfwd/cache_test.go @@ -0,0 +1,86 @@ +package dnsfwd + +import ( + "net/netip" + "testing" +) + +func mustAddr(t *testing.T, s string) netip.Addr { + t.Helper() + a, err := netip.ParseAddr(s) + if err != nil { + t.Fatalf("parse addr %s: %v", s, err) + } + return a +} + +func TestCacheNormalization(t *testing.T) { + c := newCache() + + // Mixed case, without trailing dot + domainInput := "ExAmPlE.CoM" + ipv4 := []netip.Addr{mustAddr(t, "1.2.3.4")} + c.set(domainInput, 1 /* dns.TypeA */, ipv4) + + // Lookup with lower, with trailing dot + if got, ok := c.get("example.com.", 1); !ok || len(got) != 1 || got[0].String() != "1.2.3.4" { + t.Fatalf("expected cached IPv4 result via normalized key, got=%v ok=%v", got, ok) + } + + // Lookup with different casing again + if got, ok := c.get("EXAMPLE.COM", 1); !ok || len(got) != 1 || got[0].String() != "1.2.3.4" { + t.Fatalf("expected cached IPv4 result via different casing, got=%v ok=%v", got, ok) + } +} + +func TestCacheSeparateTypes(t *testing.T) { + c := newCache() + + domain := "test.local" + ipv4 := []netip.Addr{mustAddr(t, "10.0.0.1")} + ipv6 := []netip.Addr{mustAddr(t, "2001:db8::1")} + + c.set(domain, 1 /* A */, ipv4) + c.set(domain, 28 /* AAAA */, ipv6) + + got4, ok4 := c.get(domain, 1) + if !ok4 || len(got4) != 1 || got4[0] != ipv4[0] { + t.Fatalf("expected A record from cache, got=%v ok=%v", got4, ok4) + } + + got6, ok6 := c.get(domain, 28) + if !ok6 || len(got6) != 1 || got6[0] != ipv6[0] { + t.Fatalf("expected AAAA record from cache, got=%v ok=%v", got6, ok6) + } +} + +func TestCacheCloneOnGetAndSet(t *testing.T) { + c := newCache() + domain := "clone.test" + + src := []netip.Addr{mustAddr(t, "8.8.8.8")} + c.set(domain, 1, src) + + // Mutate source slice; cache should be unaffected + src[0] = mustAddr(t, "9.9.9.9") + + got, ok := c.get(domain, 1) + if !ok || len(got) != 1 || got[0].String() != "8.8.8.8" { + t.Fatalf("expected cached value to be independent of source slice, got=%v ok=%v", got, ok) + } + + // Mutate returned slice; internal cache should remain unchanged + got[0] = mustAddr(t, "4.4.4.4") + got2, ok2 := c.get(domain, 1) + if !ok2 || len(got2) != 1 || got2[0].String() != "8.8.8.8" { + t.Fatalf("expected returned slice to be a clone, got=%v ok=%v", got2, ok2) + } +} + +func TestCacheMiss(t *testing.T) { + c := newCache() + if got, ok := c.get("missing.example", 1); ok || got != nil { + t.Fatalf("expected cache miss, got=%v ok=%v", got, ok) + } +} + diff --git a/client/internal/dnsfwd/forwarder.go b/client/internal/dnsfwd/forwarder.go index d912919a1..7a262fa4c 100644 --- a/client/internal/dnsfwd/forwarder.go +++ b/client/internal/dnsfwd/forwarder.go @@ -46,6 +46,7 @@ type DNSForwarder struct { fwdEntries []*ForwarderEntry firewall firewaller resolver resolver + cache *cache } func NewDNSForwarder(listenAddress string, ttl uint32, firewall firewaller, statusRecorder *peer.Status) *DNSForwarder { @@ -56,6 +57,7 @@ func NewDNSForwarder(listenAddress string, ttl uint32, firewall firewaller, stat firewall: firewall, statusRecorder: statusRecorder, resolver: net.DefaultResolver, + cache: newCache(), } } @@ -103,10 +105,39 @@ func (f *DNSForwarder) UpdateDomains(entries []*ForwarderEntry) { f.mutex.Lock() defer f.mutex.Unlock() + // remove cache entries for domains that no longer appear + f.removeStaleCacheEntries(f.fwdEntries, entries) + f.fwdEntries = entries log.Debugf("Updated DNS forwarder with %d domains", len(entries)) } +// removeStaleCacheEntries unsets cache items for domains that were present +// in the old list but not present in the new list. +func (f *DNSForwarder) removeStaleCacheEntries(oldEntries, newEntries []*ForwarderEntry) { + if f.cache == nil { + return + } + + newSet := make(map[string]struct{}, len(newEntries)) + for _, e := range newEntries { + if e == nil { + continue + } + newSet[e.Domain.PunycodeString()] = struct{}{} + } + + for _, e := range oldEntries { + if e == nil { + continue + } + pattern := e.Domain.PunycodeString() + if _, ok := newSet[pattern]; !ok { + f.cache.unset(pattern) + } + } +} + func (f *DNSForwarder) Close(ctx context.Context) error { var result *multierror.Error @@ -171,6 +202,7 @@ func (f *DNSForwarder) handleDNSQuery(w dns.ResponseWriter, query *dns.Msg) *dns f.updateInternalState(ips, mostSpecificResId, matchingEntries) f.addIPsToResponse(resp, domain, ips) + f.cache.set(domain, question.Qtype, ips) return resp } @@ -282,29 +314,69 @@ func (f *DNSForwarder) setResponseCodeForNotFound(ctx context.Context, resp *dns resp.Rcode = dns.RcodeSuccess } -// handleDNSError processes DNS lookup errors and sends an appropriate error response -func (f *DNSForwarder) handleDNSError(ctx context.Context, w dns.ResponseWriter, question dns.Question, resp *dns.Msg, domain string, err error) { +// handleDNSError processes DNS lookup errors and sends an appropriate error response. +func (f *DNSForwarder) handleDNSError( + ctx context.Context, + w dns.ResponseWriter, + question dns.Question, + resp *dns.Msg, + domain string, + err error, +) { + // Default to SERVFAIL; override below when appropriate. + resp.Rcode = dns.RcodeServerFailure + + qType := question.Qtype + qTypeName := dns.TypeToString[qType] + + // Prefer typed DNS errors; fall back to generic logging otherwise. var dnsErr *net.DNSError - - switch { - case errors.As(err, &dnsErr): - resp.Rcode = dns.RcodeServerFailure - if dnsErr.IsNotFound { - f.setResponseCodeForNotFound(ctx, resp, domain, question.Qtype) + if !errors.As(err, &dnsErr) { + log.Warnf(errResolveFailed, domain, err) + if writeErr := w.WriteMsg(resp); writeErr != nil { + log.Errorf("failed to write failure DNS response: %v", writeErr) } + return + } - if dnsErr.Server != "" { - log.Warnf("failed to resolve query for type=%s domain=%s server=%s: %v", dns.TypeToString[question.Qtype], domain, dnsErr.Server, err) - } else { - log.Warnf(errResolveFailed, domain, err) + // NotFound: set NXDOMAIN / appropriate code via helper. + if dnsErr.IsNotFound { + f.setResponseCodeForNotFound(ctx, resp, domain, qType) + if writeErr := w.WriteMsg(resp); writeErr != nil { + log.Errorf("failed to write failure DNS response: %v", writeErr) } - default: - resp.Rcode = dns.RcodeServerFailure + f.cache.set(domain, question.Qtype, nil) + return + } + + // Upstream failed but we might have a cached answer—serve it if present. + if ips, ok := f.cache.get(domain, qType); ok { + if len(ips) > 0 { + log.Debugf("serving cached DNS response after upstream failure: domain=%s type=%s", domain, qTypeName) + f.addIPsToResponse(resp, domain, ips) + resp.Rcode = dns.RcodeSuccess + if writeErr := w.WriteMsg(resp); writeErr != nil { + log.Errorf("failed to write cached DNS response: %v", writeErr) + } + } else { // send NXDOMAIN / appropriate code if cache is empty + f.setResponseCodeForNotFound(ctx, resp, domain, qType) + if writeErr := w.WriteMsg(resp); writeErr != nil { + log.Errorf("failed to write failure DNS response: %v", writeErr) + } + } + return + } + + // No cache. Log with or without the server field for more context. + if dnsErr.Server != "" { + log.Warnf("failed to resolve: type=%s domain=%s server=%s: %v", qTypeName, domain, dnsErr.Server, err) + } else { log.Warnf(errResolveFailed, domain, err) } - if err := w.WriteMsg(resp); err != nil { - log.Errorf("failed to write failure DNS response: %v", err) + // Write final failure response. + if writeErr := w.WriteMsg(resp); writeErr != nil { + log.Errorf("failed to write failure DNS response: %v", writeErr) } } diff --git a/client/internal/dnsfwd/forwarder_test.go b/client/internal/dnsfwd/forwarder_test.go index 57085e19a..c1c95a2c1 100644 --- a/client/internal/dnsfwd/forwarder_test.go +++ b/client/internal/dnsfwd/forwarder_test.go @@ -648,6 +648,95 @@ func TestDNSForwarder_TCPTruncation(t *testing.T) { assert.LessOrEqual(t, writtenResp.Len(), dns.MinMsgSize, "Response should fit in minimum UDP size") } +// Ensures that when the first query succeeds and populates the cache, +// a subsequent upstream failure still returns a successful response from cache. +func TestDNSForwarder_ServeFromCacheOnUpstreamFailure(t *testing.T) { + mockResolver := &MockResolver{} + forwarder := NewDNSForwarder("127.0.0.1:0", 300, nil, &peer.Status{}) + forwarder.resolver = mockResolver + + d, err := domain.FromString("example.com") + require.NoError(t, err) + entries := []*ForwarderEntry{{Domain: d, ResID: "res-cache"}} + forwarder.UpdateDomains(entries) + + ip := netip.MustParseAddr("1.2.3.4") + + // First call resolves successfully and populates cache + mockResolver.On("LookupNetIP", mock.Anything, "ip4", dns.Fqdn("example.com")). + Return([]netip.Addr{ip}, nil).Once() + + // Second call fails upstream; forwarder should serve from cache + mockResolver.On("LookupNetIP", mock.Anything, "ip4", dns.Fqdn("example.com")). + Return([]netip.Addr{}, &net.DNSError{Err: "temporary failure"}).Once() + + // First query: populate cache + q1 := &dns.Msg{} + q1.SetQuestion(dns.Fqdn("example.com"), dns.TypeA) + w1 := &test.MockResponseWriter{} + resp1 := forwarder.handleDNSQuery(w1, q1) + require.NotNil(t, resp1) + require.Equal(t, dns.RcodeSuccess, resp1.Rcode) + require.Len(t, resp1.Answer, 1) + + // Second query: serve from cache after upstream failure + q2 := &dns.Msg{} + q2.SetQuestion(dns.Fqdn("example.com"), dns.TypeA) + var writtenResp *dns.Msg + w2 := &test.MockResponseWriter{WriteMsgFunc: func(m *dns.Msg) error { writtenResp = m; return nil }} + _ = forwarder.handleDNSQuery(w2, q2) + + require.NotNil(t, writtenResp, "expected response to be written") + require.Equal(t, dns.RcodeSuccess, writtenResp.Rcode) + require.Len(t, writtenResp.Answer, 1) + + mockResolver.AssertExpectations(t) +} + +// Verifies that cache normalization works across casing and trailing dot variations. +func TestDNSForwarder_CacheNormalizationCasingAndDot(t *testing.T) { + mockResolver := &MockResolver{} + forwarder := NewDNSForwarder("127.0.0.1:0", 300, nil, &peer.Status{}) + forwarder.resolver = mockResolver + + d, err := domain.FromString("ExAmPlE.CoM") + require.NoError(t, err) + entries := []*ForwarderEntry{{Domain: d, ResID: "res-norm"}} + forwarder.UpdateDomains(entries) + + ip := netip.MustParseAddr("9.8.7.6") + + // Initial resolution with mixed case to populate cache + mixedQuery := "ExAmPlE.CoM" + mockResolver.On("LookupNetIP", mock.Anything, "ip4", dns.Fqdn(strings.ToLower(mixedQuery))). + Return([]netip.Addr{ip}, nil).Once() + + q1 := &dns.Msg{} + q1.SetQuestion(mixedQuery+".", dns.TypeA) + w1 := &test.MockResponseWriter{} + resp1 := forwarder.handleDNSQuery(w1, q1) + require.NotNil(t, resp1) + require.Equal(t, dns.RcodeSuccess, resp1.Rcode) + require.Len(t, resp1.Answer, 1) + + // Subsequent query without dot and upper case should hit cache even if upstream fails + // Forwarder lowercases and uses the question name as-is (no trailing dot here) + mockResolver.On("LookupNetIP", mock.Anything, "ip4", strings.ToLower("EXAMPLE.COM")). + Return([]netip.Addr{}, &net.DNSError{Err: "temporary failure"}).Once() + + q2 := &dns.Msg{} + q2.SetQuestion("EXAMPLE.COM", dns.TypeA) + var writtenResp *dns.Msg + w2 := &test.MockResponseWriter{WriteMsgFunc: func(m *dns.Msg) error { writtenResp = m; return nil }} + _ = forwarder.handleDNSQuery(w2, q2) + + require.NotNil(t, writtenResp) + require.Equal(t, dns.RcodeSuccess, writtenResp.Rcode) + require.Len(t, writtenResp.Answer, 1) + + mockResolver.AssertExpectations(t) +} + func TestDNSForwarder_MultipleOverlappingPatterns(t *testing.T) { // Test complex overlapping pattern scenarios mockFirewall := &MockFirewall{} From 9021bb512bf7f88157a4c06aa1d9fba6ac80587f Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Wed, 8 Oct 2025 17:14:24 +0200 Subject: [PATCH 4/4] [client] Recreate agent when receive new session id (#4564) When an ICE agent connection was in progress, new offers were being ignored. This was incorrect logic because the remote agent could be restarted at any time. In this change, whenever a new session ID is received, the ongoing handshake is closed and a new one is started. --- client/internal/peer/conn.go | 4 +- client/internal/peer/conn_test.go | 16 +++--- client/internal/peer/guard/env.go | 20 ++++++++ client/internal/peer/guard/ice_monitor.go | 16 ++++-- client/internal/peer/guard/sr_watcher.go | 2 +- client/internal/peer/handshaker.go | 51 ++++++++++++------- client/internal/peer/handshaker_listener.go | 8 +-- .../internal/peer/handshaker_listener_test.go | 2 +- client/internal/peer/worker_ice.go | 47 +++++++++-------- 9 files changed, 107 insertions(+), 59 deletions(-) create mode 100644 client/internal/peer/guard/env.go diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index ded9aa479..68afe986a 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -171,9 +171,9 @@ func (conn *Conn) Open(engineCtx context.Context) error { conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay) - conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer) + conn.handshaker.AddRelayListener(conn.workerRelay.OnNewOffer) if !isForceRelayed() { - conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer) + conn.handshaker.AddICEListener(conn.workerICE.OnNewOffer) } conn.guard = guard.NewGuard(conn.Log, conn.isConnectedOnAllWay, conn.config.Timeout, conn.srWatcher) diff --git a/client/internal/peer/conn_test.go b/client/internal/peer/conn_test.go index c839ab147..6b47f95eb 100644 --- a/client/internal/peer/conn_test.go +++ b/client/internal/peer/conn_test.go @@ -79,10 +79,10 @@ func TestConn_OnRemoteOffer(t *testing.T) { return } - onNewOffeChan := make(chan struct{}) + onNewOfferChan := make(chan struct{}) - conn.handshaker.AddOnNewOfferListener(func(remoteOfferAnswer *OfferAnswer) { - onNewOffeChan <- struct{}{} + conn.handshaker.AddRelayListener(func(remoteOfferAnswer *OfferAnswer) { + onNewOfferChan <- struct{}{} }) conn.OnRemoteOffer(OfferAnswer{ @@ -98,7 +98,7 @@ func TestConn_OnRemoteOffer(t *testing.T) { defer cancel() select { - case <-onNewOffeChan: + case <-onNewOfferChan: // success case <-ctx.Done(): t.Error("expected to receive a new offer notification, but timed out") @@ -118,10 +118,10 @@ func TestConn_OnRemoteAnswer(t *testing.T) { return } - onNewOffeChan := make(chan struct{}) + onNewOfferChan := make(chan struct{}) - conn.handshaker.AddOnNewOfferListener(func(remoteOfferAnswer *OfferAnswer) { - onNewOffeChan <- struct{}{} + conn.handshaker.AddRelayListener(func(remoteOfferAnswer *OfferAnswer) { + onNewOfferChan <- struct{}{} }) conn.OnRemoteAnswer(OfferAnswer{ @@ -136,7 +136,7 @@ func TestConn_OnRemoteAnswer(t *testing.T) { defer cancel() select { - case <-onNewOffeChan: + case <-onNewOfferChan: // success case <-ctx.Done(): t.Error("expected to receive a new offer notification, but timed out") diff --git a/client/internal/peer/guard/env.go b/client/internal/peer/guard/env.go new file mode 100644 index 000000000..1ea2d21be --- /dev/null +++ b/client/internal/peer/guard/env.go @@ -0,0 +1,20 @@ +package guard + +import ( + "os" + "strconv" + "time" +) + +const ( + envICEMonitorPeriod = "NB_ICE_MONITOR_PERIOD" +) + +func GetICEMonitorPeriod() time.Duration { + if envVal := os.Getenv(envICEMonitorPeriod); envVal != "" { + if seconds, err := strconv.Atoi(envVal); err == nil && seconds > 0 { + return time.Duration(seconds) * time.Second + } + } + return defaultCandidatesMonitorPeriod +} diff --git a/client/internal/peer/guard/ice_monitor.go b/client/internal/peer/guard/ice_monitor.go index 09cf9ae63..0f22ee7b0 100644 --- a/client/internal/peer/guard/ice_monitor.go +++ b/client/internal/peer/guard/ice_monitor.go @@ -16,8 +16,8 @@ import ( ) const ( - candidatesMonitorPeriod = 5 * time.Minute - candidateGatheringTimeout = 5 * time.Second + defaultCandidatesMonitorPeriod = 5 * time.Minute + candidateGatheringTimeout = 5 * time.Second ) type ICEMonitor struct { @@ -25,16 +25,19 @@ type ICEMonitor struct { iFaceDiscover stdnet.ExternalIFaceDiscover iceConfig icemaker.Config + tickerPeriod time.Duration currentCandidatesAddress []string candidatesMu sync.Mutex } -func NewICEMonitor(iFaceDiscover stdnet.ExternalIFaceDiscover, config icemaker.Config) *ICEMonitor { +func NewICEMonitor(iFaceDiscover stdnet.ExternalIFaceDiscover, config icemaker.Config, period time.Duration) *ICEMonitor { + log.Debugf("prepare ICE monitor with period: %s", period) cm := &ICEMonitor{ ReconnectCh: make(chan struct{}, 1), iFaceDiscover: iFaceDiscover, iceConfig: config, + tickerPeriod: period, } return cm } @@ -46,7 +49,12 @@ func (cm *ICEMonitor) Start(ctx context.Context, onChanged func()) { return } - ticker := time.NewTicker(candidatesMonitorPeriod) + // Initial check to populate the candidates for later comparison + if _, err := cm.handleCandidateTick(ctx, ufrag, pwd); err != nil { + log.Warnf("Failed to check initial ICE candidates: %v", err) + } + + ticker := time.NewTicker(cm.tickerPeriod) defer ticker.Stop() for { diff --git a/client/internal/peer/guard/sr_watcher.go b/client/internal/peer/guard/sr_watcher.go index 90e45426f..686430752 100644 --- a/client/internal/peer/guard/sr_watcher.go +++ b/client/internal/peer/guard/sr_watcher.go @@ -51,7 +51,7 @@ func (w *SRWatcher) Start() { ctx, cancel := context.WithCancel(context.Background()) w.cancelIceMonitor = cancel - iceMonitor := NewICEMonitor(w.iFaceDiscover, w.iceConfig) + iceMonitor := NewICEMonitor(w.iFaceDiscover, w.iceConfig, GetICEMonitorPeriod()) go iceMonitor.Start(ctx, w.onICEChanged) w.signalClient.SetOnReconnectedListener(w.onReconnected) w.relayManager.SetOnReconnectedListener(w.onReconnected) diff --git a/client/internal/peer/handshaker.go b/client/internal/peer/handshaker.go index 42eaea683..aff26f847 100644 --- a/client/internal/peer/handshaker.go +++ b/client/internal/peer/handshaker.go @@ -44,13 +44,19 @@ type OfferAnswer struct { } type Handshaker struct { - mu sync.Mutex - log *log.Entry - config ConnConfig - signaler *Signaler - ice *WorkerICE - relay *WorkerRelay - onNewOfferListeners []*OfferListener + mu sync.Mutex + log *log.Entry + config ConnConfig + signaler *Signaler + ice *WorkerICE + relay *WorkerRelay + // relayListener is not blocking because the listener is using a goroutine to process the messages + // and it will only keep the latest message if multiple offers are received in a short time + // this is to avoid blocking the handshaker if the listener is doing some heavy processing + // and also to avoid processing old offers if multiple offers are received in a short time + // the listener will always process the latest offer + relayListener *AsyncOfferListener + iceListener func(remoteOfferAnswer *OfferAnswer) // remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection remoteOffersCh chan OfferAnswer @@ -70,28 +76,39 @@ func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *W } } -func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAnswer)) { - l := NewOfferListener(offer) - h.onNewOfferListeners = append(h.onNewOfferListeners, l) +func (h *Handshaker) AddRelayListener(offer func(remoteOfferAnswer *OfferAnswer)) { + h.relayListener = NewAsyncOfferListener(offer) +} + +func (h *Handshaker) AddICEListener(offer func(remoteOfferAnswer *OfferAnswer)) { + h.iceListener = offer } func (h *Handshaker) Listen(ctx context.Context) { for { select { case remoteOfferAnswer := <-h.remoteOffersCh: - // received confirmation from the remote peer -> ready to proceed + h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString()) + if h.relayListener != nil { + h.relayListener.Notify(&remoteOfferAnswer) + } + + if h.iceListener != nil { + h.iceListener(&remoteOfferAnswer) + } + if err := h.sendAnswer(); err != nil { h.log.Errorf("failed to send remote offer confirmation: %s", err) continue } - for _, listener := range h.onNewOfferListeners { - listener.Notify(&remoteOfferAnswer) - } - h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString()) case remoteOfferAnswer := <-h.remoteAnswerCh: h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString()) - for _, listener := range h.onNewOfferListeners { - listener.Notify(&remoteOfferAnswer) + if h.relayListener != nil { + h.relayListener.Notify(&remoteOfferAnswer) + } + + if h.iceListener != nil { + h.iceListener(&remoteOfferAnswer) } case <-ctx.Done(): h.log.Infof("stop listening for remote offers and answers") diff --git a/client/internal/peer/handshaker_listener.go b/client/internal/peer/handshaker_listener.go index e2d3f3f38..772e2777f 100644 --- a/client/internal/peer/handshaker_listener.go +++ b/client/internal/peer/handshaker_listener.go @@ -13,20 +13,20 @@ func (oa *OfferAnswer) SessionIDString() string { return oa.SessionID.String() } -type OfferListener struct { +type AsyncOfferListener struct { fn callbackFunc running bool latest *OfferAnswer mu sync.Mutex } -func NewOfferListener(fn callbackFunc) *OfferListener { - return &OfferListener{ +func NewAsyncOfferListener(fn callbackFunc) *AsyncOfferListener { + return &AsyncOfferListener{ fn: fn, } } -func (o *OfferListener) Notify(remoteOfferAnswer *OfferAnswer) { +func (o *AsyncOfferListener) Notify(remoteOfferAnswer *OfferAnswer) { o.mu.Lock() defer o.mu.Unlock() diff --git a/client/internal/peer/handshaker_listener_test.go b/client/internal/peer/handshaker_listener_test.go index 8363741a5..1a7156d10 100644 --- a/client/internal/peer/handshaker_listener_test.go +++ b/client/internal/peer/handshaker_listener_test.go @@ -14,7 +14,7 @@ func Test_newOfferListener(t *testing.T) { runChan <- struct{}{} } - hl := NewOfferListener(longRunningFn) + hl := NewAsyncOfferListener(longRunningFn) hl.Notify(dummyOfferAnswer) hl.Notify(dummyOfferAnswer) diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index eb886a4d3..3675f0157 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -92,23 +92,16 @@ func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, conn * func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { w.log.Debugf("OnNewOffer for ICE, serial: %s", remoteOfferAnswer.SessionIDString()) w.muxAgent.Lock() + defer w.muxAgent.Unlock() - if w.agentConnecting { - w.log.Debugf("agent connection is in progress, skipping the offer") - w.muxAgent.Unlock() - return - } - - if w.agent != nil { + if w.agent != nil || w.agentConnecting { // backward compatibility with old clients that do not send session ID if remoteOfferAnswer.SessionID == nil { w.log.Debugf("agent already exists, skipping the offer") - w.muxAgent.Unlock() return } if w.remoteSessionID == *remoteOfferAnswer.SessionID { w.log.Debugf("agent already exists and session ID matches, skipping the offer: %s", remoteOfferAnswer.SessionIDString()) - w.muxAgent.Unlock() return } w.log.Debugf("agent already exists, recreate the connection") @@ -116,6 +109,12 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { if err := w.agent.Close(); err != nil { w.log.Warnf("failed to close ICE agent: %s", err) } + + sessionID, err := NewICESessionID() + if err != nil { + w.log.Errorf("failed to create new session ID: %s", err) + } + w.sessionID = sessionID w.agent = nil } @@ -126,18 +125,23 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { preferredCandidateTypes = icemaker.CandidateTypes() } - w.log.Debugf("recreate ICE agent") + if remoteOfferAnswer.SessionID != nil { + w.log.Debugf("recreate ICE agent: %s / %s", w.sessionID, *remoteOfferAnswer.SessionID) + } dialerCtx, dialerCancel := context.WithCancel(w.ctx) agent, err := w.reCreateAgent(dialerCancel, preferredCandidateTypes) if err != nil { w.log.Errorf("failed to recreate ICE Agent: %s", err) - w.muxAgent.Unlock() return } w.agent = agent w.agentDialerCancel = dialerCancel w.agentConnecting = true - w.muxAgent.Unlock() + if remoteOfferAnswer.SessionID != nil { + w.remoteSessionID = *remoteOfferAnswer.SessionID + } else { + w.remoteSessionID = "" + } go w.connect(dialerCtx, agent, remoteOfferAnswer) } @@ -293,9 +297,6 @@ func (w *WorkerICE) connect(ctx context.Context, agent *icemaker.ThreadSafeAgent w.muxAgent.Lock() w.agentConnecting = false w.lastSuccess = time.Now() - if remoteOfferAnswer.SessionID != nil { - w.remoteSessionID = *remoteOfferAnswer.SessionID - } w.muxAgent.Unlock() // todo: the potential problem is a race between the onConnectionStateChange @@ -309,16 +310,17 @@ func (w *WorkerICE) closeAgent(agent *icemaker.ThreadSafeAgent, cancel context.C } w.muxAgent.Lock() - // todo review does it make sense to generate new session ID all the time when w.agent==agent - sessionID, err := NewICESessionID() - if err != nil { - w.log.Errorf("failed to create new session ID: %s", err) - } - w.sessionID = sessionID if w.agent == agent { + // consider to remove from here and move to the OnNewOffer + sessionID, err := NewICESessionID() + if err != nil { + w.log.Errorf("failed to create new session ID: %s", err) + } + w.sessionID = sessionID w.agent = nil w.agentConnecting = false + w.remoteSessionID = "" } w.muxAgent.Unlock() } @@ -395,11 +397,12 @@ func (w *WorkerICE) onConnectionStateChange(agent *icemaker.ThreadSafeAgent, dia // ice.ConnectionStateClosed happens when we recreate the agent. For the P2P to TURN switch important to // notify the conn.onICEStateDisconnected changes to update the current used priority + w.closeAgent(agent, dialerCancel) + if w.lastKnownState == ice.ConnectionStateConnected { w.lastKnownState = ice.ConnectionStateDisconnected w.conn.onICEStateDisconnected() } - w.closeAgent(agent, dialerCancel) default: return }