From 50b58a682868851a3666a99662aeb00d7fbb3846 Mon Sep 17 00:00:00 2001 From: Viktor Liu <17948409+lixmal@users.noreply.github.com> Date: Mon, 4 May 2026 18:40:25 +0900 Subject: [PATCH 1/4] [client, relay] Advertise relay server IP via signal for foreign-relay fallback dial (#6004) --- client/internal/engine.go | 18 ++ client/internal/peer/handshaker.go | 8 +- client/internal/peer/signaler.go | 20 +- client/internal/peer/status.go | 2 +- client/internal/peer/worker_relay.go | 11 +- shared/relay/client/client.go | 116 +++++++- shared/relay/client/client_serverip_test.go | 280 ++++++++++++++++++ shared/relay/client/dialer/quic/quic.go | 15 +- shared/relay/client/dialer/race_dialer.go | 17 +- .../relay/client/dialer/race_dialer_test.go | 2 +- shared/relay/client/dialer/ws/conn.go | 16 +- .../client/dialer/ws/dialopts_generic.go | 10 +- shared/relay/client/dialer/ws/dialopts_js.go | 10 +- shared/relay/client/dialer/ws/ws.go | 21 +- shared/relay/client/manager.go | 37 ++- shared/relay/client/manager_serverip_test.go | 144 +++++++++ shared/relay/client/manager_test.go | 19 +- shared/signal/client/client.go | 69 +++-- shared/signal/proto/signalexchange.pb.go | 88 +++--- shared/signal/proto/signalexchange.proto | 10 +- 20 files changed, 789 insertions(+), 124 deletions(-) create mode 100644 shared/relay/client/client_serverip_test.go create mode 100644 shared/relay/client/manager_serverip_test.go diff --git a/client/internal/engine.go b/client/internal/engine.go index 8c9553e52..7f19e2d28 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -2454,6 +2454,8 @@ func convertToOfferAnswer(msg *sProto.Message) (*peer.OfferAnswer, error) { } } + relayIP := decodeRelayIP(msg.GetBody().GetRelayServerIP()) + offerAnswer := peer.OfferAnswer{ IceCredentials: peer.IceCredentials{ UFrag: remoteCred.UFrag, @@ -2464,7 +2466,23 @@ func convertToOfferAnswer(msg *sProto.Message) (*peer.OfferAnswer, error) { RosenpassPubKey: rosenpassPubKey, RosenpassAddr: rosenpassAddr, RelaySrvAddress: msg.GetBody().GetRelayServerAddress(), + RelaySrvIP: relayIP, SessionID: sessionID, } return &offerAnswer, nil } + +// decodeRelayIP decodes the proto relayServerIP bytes (4 or 16) into a +// netip.Addr. Returns the zero value for empty input and logs a warning +// for malformed payloads. +func decodeRelayIP(b []byte) netip.Addr { + if len(b) == 0 { + return netip.Addr{} + } + ip, ok := netip.AddrFromSlice(b) + if !ok { + log.Warnf("invalid relayServerIP in signal message (%d bytes), ignoring", len(b)) + return netip.Addr{} + } + return ip.Unmap() +} diff --git a/client/internal/peer/handshaker.go b/client/internal/peer/handshaker.go index 741dfce60..1d44096b6 100644 --- a/client/internal/peer/handshaker.go +++ b/client/internal/peer/handshaker.go @@ -3,6 +3,7 @@ package peer import ( "context" "errors" + "net/netip" "sync" "sync/atomic" @@ -40,6 +41,10 @@ type OfferAnswer struct { // relay server address RelaySrvAddress string + // RelaySrvIP is the IP the remote peer is connected to on its + // relay server. Used as a dial target if DNS for RelaySrvAddress + // fails. Zero value if the peer did not advertise an IP. + RelaySrvIP netip.Addr // SessionID is the unique identifier of the session, used to discard old messages SessionID *ICESessionID } @@ -217,8 +222,9 @@ func (h *Handshaker) buildOfferAnswer() OfferAnswer { answer.SessionID = &sid } - if addr, err := h.relay.RelayInstanceAddress(); err == nil { + if addr, ip, err := h.relay.RelayInstanceAddress(); err == nil { answer.RelaySrvAddress = addr + answer.RelaySrvIP = ip } return answer diff --git a/client/internal/peer/signaler.go b/client/internal/peer/signaler.go index f6eb87cca..5e437d96b 100644 --- a/client/internal/peer/signaler.go +++ b/client/internal/peer/signaler.go @@ -54,19 +54,19 @@ func (s *Signaler) signalOfferAnswer(offerAnswer OfferAnswer, remoteKey string, log.Warnf("failed to get session ID bytes: %v", err) } } - msg, err := signal.MarshalCredential( - s.wgPrivateKey, - offerAnswer.WgListenPort, - remoteKey, - &signal.Credential{ + msg, err := signal.MarshalCredential(s.wgPrivateKey, remoteKey, signal.CredentialPayload{ + Type: bodyType, + WgListenPort: offerAnswer.WgListenPort, + Credential: &signal.Credential{ UFrag: offerAnswer.IceCredentials.UFrag, Pwd: offerAnswer.IceCredentials.Pwd, }, - bodyType, - offerAnswer.RosenpassPubKey, - offerAnswer.RosenpassAddr, - offerAnswer.RelaySrvAddress, - sessionIDBytes) + RosenpassPubKey: offerAnswer.RosenpassPubKey, + RosenpassAddr: offerAnswer.RosenpassAddr, + RelaySrvAddress: offerAnswer.RelaySrvAddress, + RelaySrvIP: offerAnswer.RelaySrvIP, + SessionID: sessionIDBytes, + }) if err != nil { return err } diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index abedc208e..7bd19b0e1 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -919,7 +919,7 @@ func (d *Status) GetRelayStates() []relay.ProbeResult { // if the server connection is not established then we will use the general address // in case of connection we will use the instance specific address - instanceAddr, err := d.relayMgr.RelayInstanceAddress() + instanceAddr, _, err := d.relayMgr.RelayInstanceAddress() if err != nil { // TODO add their status for _, r := range d.relayMgr.ServerURLs() { diff --git a/client/internal/peer/worker_relay.go b/client/internal/peer/worker_relay.go index 06309fbaf..0402992c9 100644 --- a/client/internal/peer/worker_relay.go +++ b/client/internal/peer/worker_relay.go @@ -4,6 +4,7 @@ import ( "context" "errors" "net" + "net/netip" "sync" "sync/atomic" @@ -53,15 +54,19 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) { w.relaySupportedOnRemotePeer.Store(true) // the relayManager will return with error in case if the connection has lost with relay server - currentRelayAddress, err := w.relayManager.RelayInstanceAddress() + currentRelayAddress, _, err := w.relayManager.RelayInstanceAddress() if err != nil { w.log.Errorf("failed to handle new offer: %s", err) return } srv := w.preferredRelayServer(currentRelayAddress, remoteOfferAnswer.RelaySrvAddress) + var serverIP netip.Addr + if srv == remoteOfferAnswer.RelaySrvAddress { + serverIP = remoteOfferAnswer.RelaySrvIP + } - relayedConn, err := w.relayManager.OpenConn(w.peerCtx, srv, w.config.Key) + relayedConn, err := w.relayManager.OpenConn(w.peerCtx, srv, w.config.Key, serverIP) if err != nil { if errors.Is(err, relayClient.ErrConnAlreadyExists) { w.log.Debugf("handled offer by reusing existing relay connection") @@ -90,7 +95,7 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) { }) } -func (w *WorkerRelay) RelayInstanceAddress() (string, error) { +func (w *WorkerRelay) RelayInstanceAddress() (string, netip.Addr, error) { return w.relayManager.RelayInstanceAddress() } diff --git a/shared/relay/client/client.go b/shared/relay/client/client.go index b10b05617..1800bddb2 100644 --- a/shared/relay/client/client.go +++ b/shared/relay/client/client.go @@ -2,8 +2,12 @@ package client import ( "context" + "errors" "fmt" "net" + "net/netip" + "net/url" + "strings" "sync" "time" @@ -146,6 +150,7 @@ func (cc *connContainer) close() { type Client struct { log *log.Entry connectionURL string + serverIP netip.Addr authTokenStore *auth.TokenStore hashedID messages.PeerID @@ -170,13 +175,22 @@ type Client struct { } // NewClient creates a new client for the relay server. The client is not connected to the server until the Connect +// is called. func NewClient(serverURL string, authTokenStore *auth.TokenStore, peerID string, mtu uint16) *Client { + return NewClientWithServerIP(serverURL, netip.Addr{}, authTokenStore, peerID, mtu) +} + +// NewClientWithServerIP creates a new client for the relay server with a known server IP. serverIP, when valid, is +// dialed directly first; the FQDN is only attempted if the IP-based dial fails. TLS verification still uses the +// FQDN from serverURL via SNI. +func NewClientWithServerIP(serverURL string, serverIP netip.Addr, authTokenStore *auth.TokenStore, peerID string, mtu uint16) *Client { hashedID := messages.HashID(peerID) relayLog := log.WithFields(log.Fields{"relay": serverURL}) c := &Client{ log: relayLog, connectionURL: serverURL, + serverIP: serverIP, authTokenStore: authTokenStore, hashedID: hashedID, mtu: mtu, @@ -304,6 +318,23 @@ func (c *Client) ServerInstanceURL() (string, error) { return c.instanceURL.String(), nil } +// ConnectedIP returns the IP address of the live relay-server connection, +// extracted from the underlying socket's RemoteAddr. Zero value if not +// connected or if the address is not an IP literal. +func (c *Client) ConnectedIP() netip.Addr { + c.mu.Lock() + conn := c.relayConn + c.mu.Unlock() + if conn == nil { + return netip.Addr{} + } + addr := conn.RemoteAddr() + if addr == nil { + return netip.Addr{} + } + return extractIPLiteral(addr.String()) +} + // SetOnDisconnectListener sets a function that will be called when the connection to the relay server is closed. func (c *Client) SetOnDisconnectListener(fn func(string)) { c.listenerMutex.Lock() @@ -332,10 +363,23 @@ func (c *Client) Close() error { func (c *Client) connect(ctx context.Context) (*RelayAddr, error) { dialers := c.getDialers() - rd := dialer.NewRaceDial(c.log, dialer.DefaultConnectionTimeout, c.connectionURL, dialers...) - conn, err := rd.Dial(ctx) - if err != nil { - return nil, err + var conn net.Conn + if c.serverIP.IsValid() { + var err error + conn, err = c.dialRaceDirect(ctx, dialers) + if err != nil { + c.log.Infof("dial via server IP %s failed, falling back to FQDN: %v", c.serverIP, err) + conn = nil + } + } + + if conn == nil { + rd := dialer.NewRaceDial(c.log, dialer.DefaultConnectionTimeout, c.connectionURL, dialers...) + var err error + conn, err = rd.Dial(ctx) + if err != nil { + return nil, fmt.Errorf("dial via FQDN: %w", err) + } } c.relayConn = conn @@ -351,6 +395,52 @@ func (c *Client) connect(ctx context.Context) (*RelayAddr, error) { return instanceURL, nil } +// dialRaceDirect dials c.serverIP, preserving the original FQDN as the TLS ServerName for SNI. +func (c *Client) dialRaceDirect(ctx context.Context, dialers []dialer.DialeFn) (net.Conn, error) { + directURL, serverName, err := substituteHost(c.connectionURL, c.serverIP) + if err != nil { + return nil, fmt.Errorf("substitute host: %w", err) + } + + c.log.Debugf("dialing via server IP %s (SNI=%s)", c.serverIP, serverName) + + rd := dialer.NewRaceDial(c.log, dialer.DefaultConnectionTimeout, directURL, dialers...). + WithServerName(serverName) + return rd.Dial(ctx) +} + +// substituteHost replaces the host portion of a rel/rels URL with ip, +// preserving the scheme and port. Returns the rewritten URL and the +// original host to use as the TLS ServerName, or empty if the original +// host is itself an IP literal (SNI requires a DNS name). +func substituteHost(serverURL string, ip netip.Addr) (string, string, error) { + u, err := url.Parse(serverURL) + if err != nil { + return "", "", fmt.Errorf("parse %q: %w", serverURL, err) + } + if u.Scheme == "" || u.Host == "" { + return "", "", fmt.Errorf("invalid relay URL %q", serverURL) + } + if !ip.IsValid() { + return "", "", errors.New("invalid server IP") + } + origHost := u.Hostname() + if _, err := netip.ParseAddr(origHost); err == nil { + origHost = "" + } + ip = ip.Unmap() + newHost := ip.String() + if ip.Is6() { + newHost = "[" + newHost + "]" + } + if port := u.Port(); port != "" { + u.Host = newHost + ":" + port + } else { + u.Host = newHost + } + return u.String(), origHost, nil +} + func (c *Client) handShake(ctx context.Context) (*RelayAddr, error) { msg, err := messages.MarshalAuthMsg(c.hashedID, c.authTokenStore.TokenBinary()) if err != nil { @@ -716,3 +806,21 @@ func (c *Client) handlePeersWentOfflineMsg(buf []byte) { } c.stateSubscription.OnPeersWentOffline(peersID) } + +// extractIPLiteral returns the IP from address forms produced by the relay +// dialers (URL or host:port). Zero value if the host is not an IP. +func extractIPLiteral(s string) netip.Addr { + if u, err := url.Parse(s); err == nil && u.Host != "" { + s = u.Host + } + host, _, err := net.SplitHostPort(s) + if err != nil { + host = s + } + host = strings.Trim(host, "[]") + ip, err := netip.ParseAddr(host) + if err != nil { + return netip.Addr{} + } + return ip.Unmap() +} diff --git a/shared/relay/client/client_serverip_test.go b/shared/relay/client/client_serverip_test.go new file mode 100644 index 000000000..7e699e37d --- /dev/null +++ b/shared/relay/client/client_serverip_test.go @@ -0,0 +1,280 @@ +package client + +import ( + "context" + "fmt" + "net" + "net/netip" + "testing" + "time" + + "go.opentelemetry.io/otel" + + "github.com/netbirdio/netbird/client/iface" + "github.com/netbirdio/netbird/relay/server" + "github.com/netbirdio/netbird/shared/relay/auth/allow" +) + +// TestClient_ServerIPRecoversFromUnresolvableFQDN verifies that when the +// primary FQDN-based dial fails (unresolvable .invalid host), Connect +// recovers via the server IP and SNI still uses the FQDN. +func TestClient_ServerIPRecoversFromUnresolvableFQDN(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + listenAddr, port := freeAddr(t) + srvCfg := server.Config{ + Meter: otel.Meter(""), + ExposedAddress: fmt.Sprintf("rel://test-unresolvable-host.invalid:%d", port), + TLSSupport: false, + AuthValidator: &allow.Auth{}, + } + srv, err := server.NewServer(srvCfg) + if err != nil { + t.Fatalf("create server: %s", err) + } + + errChan := make(chan error, 1) + go func() { + if err := srv.Listen(server.ListenerConfig{Address: listenAddr}); err != nil { + errChan <- err + } + }() + t.Cleanup(func() { + if err := srv.Shutdown(context.Background()); err != nil { + t.Errorf("shutdown server: %s", err) + } + }) + if err := waitForServerToStart(errChan); err != nil { + t.Fatalf("server failed to start: %s", err) + } + + t.Run("no server IP, primary fails", func(t *testing.T) { + c := NewClient(srvCfg.ExposedAddress, hmacTokenStore, "alice-noip", iface.DefaultMTU) + err := c.Connect(ctx) + if err == nil { + _ = c.Close() + t.Fatalf("expected connect to fail without server IP, got nil") + } + }) + + t.Run("server IP recovers", func(t *testing.T) { + c := NewClientWithServerIP(srvCfg.ExposedAddress, netip.MustParseAddr("127.0.0.1"), hmacTokenStore, "alice-with-ip", iface.DefaultMTU) + if err := c.Connect(ctx); err != nil { + t.Fatalf("connect with server IP: %s", err) + } + t.Cleanup(func() { _ = c.Close() }) + + if !c.Ready() { + t.Fatalf("client not ready after connect") + } + if got := c.ConnectedIP(); got.String() != "127.0.0.1" { + t.Fatalf("ConnectedIP = %q, want 127.0.0.1", got) + } + }) +} + +// TestClient_ConnectedIPAfterFQDNDial verifies ConnectedIP returns the +// resolved IP after a successful FQDN-based dial. The underlying socket's +// RemoteAddr must be exposed through the dialer wrappers; if it returns +// the dial-time URL instead, ConnectedIP returns empty and the dial +// IP we advertise to peers is empty too. +func TestClient_ConnectedIPAfterFQDNDial(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + listenAddr, port := freeAddr(t) + srvCfg := server.Config{ + Meter: otel.Meter(""), + ExposedAddress: fmt.Sprintf("rel://localhost:%d", port), + TLSSupport: false, + AuthValidator: &allow.Auth{}, + } + srv, err := server.NewServer(srvCfg) + if err != nil { + t.Fatalf("create server: %s", err) + } + errChan := make(chan error, 1) + go func() { + if err := srv.Listen(server.ListenerConfig{Address: listenAddr}); err != nil { + errChan <- err + } + }() + t.Cleanup(func() { _ = srv.Shutdown(context.Background()) }) + if err := waitForServerToStart(errChan); err != nil { + t.Fatalf("server failed to start: %s", err) + } + + c := NewClient(srvCfg.ExposedAddress, hmacTokenStore, "alice-fqdn", iface.DefaultMTU) + if err := c.Connect(ctx); err != nil { + t.Fatalf("connect: %s", err) + } + t.Cleanup(func() { _ = c.Close() }) + + got := c.ConnectedIP().String() + if got != "127.0.0.1" && got != "::1" { + t.Fatalf("ConnectedIP after FQDN dial = %q, want 127.0.0.1 or ::1", got) + } +} + +func TestSubstituteHost(t *testing.T) { + tests := []struct { + name string + serverURL string + ip string + wantURL string + wantServerName string + wantErr bool + }{ + { + name: "rels with port", + serverURL: "rels://relay.netbird.io:443", + ip: "10.0.0.5", + wantURL: "rels://10.0.0.5:443", + wantServerName: "relay.netbird.io", + }, + { + name: "rel with port", + serverURL: "rel://relay.example.com:80", + ip: "192.0.2.1", + wantURL: "rel://192.0.2.1:80", + wantServerName: "relay.example.com", + }, + { + name: "ipv6 server IP bracketed", + serverURL: "rels://relay.example.com:443", + ip: "2001:db8::1", + wantURL: "rels://[2001:db8::1]:443", + wantServerName: "relay.example.com", + }, + { + name: "no port", + serverURL: "rels://relay.example.com", + ip: "10.0.0.5", + wantURL: "rels://10.0.0.5", + wantServerName: "relay.example.com", + }, + { + name: "ipv6 server with port returns empty SNI", + serverURL: "rels://[2001:db8::5]:443", + ip: "10.0.0.5", + wantURL: "rels://10.0.0.5:443", + wantServerName: "", + }, + { + name: "ipv4 server with port returns empty SNI", + serverURL: "rels://10.0.0.5:443", + ip: "10.0.0.6", + wantURL: "rels://10.0.0.6:443", + wantServerName: "", + }, + { + name: "ipv6 server IP no port", + serverURL: "rels://relay.example.com", + ip: "2001:db8::1", + wantURL: "rels://[2001:db8::1]", + wantServerName: "relay.example.com", + }, + { + name: "missing scheme", + serverURL: "relay.example.com:443", + ip: "10.0.0.5", + wantErr: true, + }, + { + name: "empty", + serverURL: "", + ip: "10.0.0.5", + wantErr: true, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var ip netip.Addr + if tt.ip != "" { + ip = netip.MustParseAddr(tt.ip) + } + gotURL, gotName, err := substituteHost(tt.serverURL, ip) + if tt.wantErr { + if err == nil { + t.Fatalf("expected error, got nil") + } + return + } + if err != nil { + t.Fatalf("unexpected error: %s", err) + } + if gotURL != tt.wantURL { + t.Errorf("URL = %q, want %q", gotURL, tt.wantURL) + } + if gotName != tt.wantServerName { + t.Errorf("ServerName = %q, want %q", gotName, tt.wantServerName) + } + }) + } +} + +func TestClient_ConnectedIPEmptyWhenNotConnected(t *testing.T) { + c := NewClient("rel://example.invalid:80", hmacTokenStore, "x", iface.DefaultMTU) + if got := c.ConnectedIP(); got.IsValid() { + t.Fatalf("ConnectedIP on disconnected client = %q, want zero", got) + } +} + +// staticAddr is a net.Addr that returns a fixed string. Used to verify +// ConnectedIP parses RemoteAddr correctly. +type staticAddr struct{ s string } + +func (a staticAddr) Network() string { return "tcp" } +func (a staticAddr) String() string { return a.s } + +type stubConn struct { + net.Conn + remote net.Addr +} + +func (s stubConn) RemoteAddr() net.Addr { return s.remote } + +func TestClient_ConnectedIPParsesRemoteAddr(t *testing.T) { + tests := []struct { + name string + s string + want string + }{ + {"hostport ipv4", "127.0.0.1:50301", "127.0.0.1"}, + {"hostport ipv6 bracketed", "[::1]:50301", "::1"}, + {"url with ipv4", "rel://127.0.0.1:50301", "127.0.0.1"}, + {"url with ipv6", "rels://[2001:db8::1]:443", "2001:db8::1"}, + {"fqdn url returns empty", "rel://relay.example.com:50301", ""}, + {"fqdn hostport returns empty", "relay.example.com:50301", ""}, + {"plain ipv4 no port", "10.0.0.1", "10.0.0.1"}, + {"empty", "", ""}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + c := &Client{relayConn: stubConn{remote: staticAddr{s: tt.s}}} + got := c.ConnectedIP() + var gotStr string + if got.IsValid() { + gotStr = got.String() + } + if gotStr != tt.want { + t.Errorf("ConnectedIP(%q) = %q, want %q", tt.s, gotStr, tt.want) + } + }) + } +} + +// freeAddr returns a 127.0.0.1 address with an OS-assigned port. The +// listener is closed before returning, so the port is briefly free for +// the caller to bind. Avoids hardcoded ports that can collide. +func freeAddr(t *testing.T) (string, int) { + t.Helper() + l, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + t.Fatalf("get free port: %s", err) + } + addr := l.Addr().(*net.TCPAddr) + _ = l.Close() + return addr.String(), addr.Port +} diff --git a/shared/relay/client/dialer/quic/quic.go b/shared/relay/client/dialer/quic/quic.go index 2d7b00a80..602803b19 100644 --- a/shared/relay/client/dialer/quic/quic.go +++ b/shared/relay/client/dialer/quic/quic.go @@ -23,7 +23,7 @@ func (d Dialer) Protocol() string { return Network } -func (d Dialer) Dial(ctx context.Context, address string) (net.Conn, error) { +func (d Dialer) Dial(ctx context.Context, address, serverName string) (net.Conn, error) { quicURL, err := prepareURL(address) if err != nil { return nil, err @@ -32,11 +32,14 @@ func (d Dialer) Dial(ctx context.Context, address string) (net.Conn, error) { // Get the base TLS config tlsClientConfig := quictls.ClientQUICTLSConfig() - // Set ServerName to hostname if not an IP address - host, _, splitErr := net.SplitHostPort(quicURL) - if splitErr == nil && net.ParseIP(host) == nil { - // It's a hostname, not an IP - modify directly - tlsClientConfig.ServerName = host + switch { + case serverName != "" && net.ParseIP(serverName) == nil: + tlsClientConfig.ServerName = serverName + default: + host, _, splitErr := net.SplitHostPort(quicURL) + if splitErr == nil && net.ParseIP(host) == nil { + tlsClientConfig.ServerName = host + } } quicConfig := &quic.Config{ diff --git a/shared/relay/client/dialer/race_dialer.go b/shared/relay/client/dialer/race_dialer.go index 34359d17e..15208b858 100644 --- a/shared/relay/client/dialer/race_dialer.go +++ b/shared/relay/client/dialer/race_dialer.go @@ -14,7 +14,9 @@ const ( ) type DialeFn interface { - Dial(ctx context.Context, address string) (net.Conn, error) + // Dial connects to address. serverName, when non-empty, overrides the TLS + // ServerName used for SNI/cert validation. Empty means derive from address. + Dial(ctx context.Context, address, serverName string) (net.Conn, error) Protocol() string } @@ -27,6 +29,7 @@ type dialResult struct { type RaceDial struct { log *log.Entry serverURL string + serverName string dialerFns []DialeFn connectionTimeout time.Duration } @@ -40,6 +43,16 @@ func NewRaceDial(log *log.Entry, connectionTimeout time.Duration, serverURL stri } } +// WithServerName sets a TLS SNI/cert validation override. Used when serverURL +// contains an IP literal but the cert is issued for a different hostname. +// +// Mutates the receiver and is not safe for concurrent reconfiguration; a +// RaceDial is intended to be constructed per dial and discarded. +func (r *RaceDial) WithServerName(serverName string) *RaceDial { + r.serverName = serverName + return r +} + func (r *RaceDial) Dial(ctx context.Context) (net.Conn, error) { connChan := make(chan dialResult, len(r.dialerFns)) winnerConn := make(chan net.Conn, 1) @@ -64,7 +77,7 @@ func (r *RaceDial) dial(dfn DialeFn, abortCtx context.Context, connChan chan dia defer cancel() r.log.Infof("dialing Relay server via %s", dfn.Protocol()) - conn, err := dfn.Dial(ctx, r.serverURL) + conn, err := dfn.Dial(ctx, r.serverURL, r.serverName) connChan <- dialResult{Conn: conn, Protocol: dfn.Protocol(), Err: err} } diff --git a/shared/relay/client/dialer/race_dialer_test.go b/shared/relay/client/dialer/race_dialer_test.go index aa18df578..a53edc00e 100644 --- a/shared/relay/client/dialer/race_dialer_test.go +++ b/shared/relay/client/dialer/race_dialer_test.go @@ -28,7 +28,7 @@ type MockDialer struct { protocolStr string } -func (m *MockDialer) Dial(ctx context.Context, address string) (net.Conn, error) { +func (m *MockDialer) Dial(ctx context.Context, address, _ string) (net.Conn, error) { return m.dialFunc(ctx, address) } diff --git a/shared/relay/client/dialer/ws/conn.go b/shared/relay/client/dialer/ws/conn.go index d5b719f51..9497fab89 100644 --- a/shared/relay/client/dialer/ws/conn.go +++ b/shared/relay/client/dialer/ws/conn.go @@ -12,14 +12,24 @@ import ( type Conn struct { ctx context.Context *websocket.Conn - remoteAddr WebsocketAddr + remoteAddr net.Addr } -func NewConn(wsConn *websocket.Conn, serverAddress string) net.Conn { +// NewConn builds a relay ws.Conn. underlying is the raw TCP/TLS conn captured +// from the http transport's DialContext; when set, RemoteAddr returns its +// peer address (an IP literal). When nil (e.g. wasm), RemoteAddr falls back +// to the dial-time URL. +func NewConn(wsConn *websocket.Conn, serverAddress string, underlying net.Conn) net.Conn { + var addr net.Addr = WebsocketAddr{serverAddress} + if underlying != nil { + if ra := underlying.RemoteAddr(); ra != nil { + addr = ra + } + } return &Conn{ ctx: context.Background(), Conn: wsConn, - remoteAddr: WebsocketAddr{serverAddress}, + remoteAddr: addr, } } diff --git a/shared/relay/client/dialer/ws/dialopts_generic.go b/shared/relay/client/dialer/ws/dialopts_generic.go index 9dfe698d0..8008d89d3 100644 --- a/shared/relay/client/dialer/ws/dialopts_generic.go +++ b/shared/relay/client/dialer/ws/dialopts_generic.go @@ -2,10 +2,14 @@ package ws -import "github.com/coder/websocket" +import ( + "net" -func createDialOptions() *websocket.DialOptions { + "github.com/coder/websocket" +) + +func createDialOptions(serverName string, underlyingOut *net.Conn) *websocket.DialOptions { return &websocket.DialOptions{ - HTTPClient: httpClientNbDialer(), + HTTPClient: httpClientNbDialer(serverName, underlyingOut), } } diff --git a/shared/relay/client/dialer/ws/dialopts_js.go b/shared/relay/client/dialer/ws/dialopts_js.go index 7eac27531..5b11fe765 100644 --- a/shared/relay/client/dialer/ws/dialopts_js.go +++ b/shared/relay/client/dialer/ws/dialopts_js.go @@ -2,9 +2,13 @@ package ws -import "github.com/coder/websocket" +import ( + "net" -func createDialOptions() *websocket.DialOptions { - // WASM version doesn't support HTTPClient + "github.com/coder/websocket" +) + +func createDialOptions(_ string, _ *net.Conn) *websocket.DialOptions { + // WASM version doesn't support HTTPClient or custom TLS config. return &websocket.DialOptions{} } diff --git a/shared/relay/client/dialer/ws/ws.go b/shared/relay/client/dialer/ws/ws.go index 37b189e05..301486514 100644 --- a/shared/relay/client/dialer/ws/ws.go +++ b/shared/relay/client/dialer/ws/ws.go @@ -26,13 +26,14 @@ func (d Dialer) Protocol() string { return "WS" } -func (d Dialer) Dial(ctx context.Context, address string) (net.Conn, error) { +func (d Dialer) Dial(ctx context.Context, address, serverName string) (net.Conn, error) { wsURL, err := prepareURL(address) if err != nil { return nil, err } - opts := createDialOptions() + var underlying net.Conn + opts := createDialOptions(serverName, &underlying) parsedURL, err := url.Parse(wsURL) if err != nil { @@ -52,7 +53,7 @@ func (d Dialer) Dial(ctx context.Context, address string) (net.Conn, error) { _ = resp.Body.Close() } - conn := NewConn(wsConn, address) + conn := NewConn(wsConn, address, underlying) return conn, nil } @@ -64,7 +65,10 @@ func prepareURL(address string) (string, error) { return strings.Replace(address, "rel", "ws", 1), nil } -func httpClientNbDialer() *http.Client { +// httpClientNbDialer builds the http client used by the websocket library. +// underlyingOut, when non-nil, is populated with the raw conn from the +// transport's DialContext so the caller can read its RemoteAddr. +func httpClientNbDialer(serverName string, underlyingOut *net.Conn) *http.Client { customDialer := nbnet.NewDialer() certPool, err := x509.SystemCertPool() @@ -75,10 +79,15 @@ func httpClientNbDialer() *http.Client { customTransport := &http.Transport{ DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { - return customDialer.DialContext(ctx, network, addr) + c, err := customDialer.DialContext(ctx, network, addr) + if err == nil && underlyingOut != nil { + *underlyingOut = c + } + return c, err }, TLSClientConfig: &tls.Config{ - RootCAs: certPool, + RootCAs: certPool, + ServerName: serverName, }, } diff --git a/shared/relay/client/manager.go b/shared/relay/client/manager.go index 37104bfe7..3858b3c83 100644 --- a/shared/relay/client/manager.go +++ b/shared/relay/client/manager.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "net" + "net/netip" "reflect" "sync" "time" @@ -75,6 +76,9 @@ type Manager struct { mtu uint16 maxBackoffInterval time.Duration + + cleanupInterval time.Duration + keepUnusedServerTime time.Duration } // NewManager creates a new manager instance. @@ -95,6 +99,8 @@ func NewManager(ctx context.Context, serverURLs []string, peerID string, mtu uin }, relayClients: make(map[string]*RelayTrack), onDisconnectedListeners: make(map[string]*list.List), + cleanupInterval: relayCleanupInterval, + keepUnusedServerTime: keepUnusedServerTime, } for _, opt := range opts { opt(m) @@ -130,7 +136,10 @@ func (m *Manager) Serve() error { // OpenConn opens a connection to the given peer key. If the peer is on the same relay server, the connection will be // established via the relay server. If the peer is on a different relay server, the manager will establish a new // connection to the relay server. It returns back with a net.Conn what represent the remote peer connection. -func (m *Manager) OpenConn(ctx context.Context, serverAddress, peerKey string) (net.Conn, error) { +// +// serverIP, when valid and serverAddress is foreign, is used as a dial target if the FQDN-based dial fails. +// Ignored for the local home-server path. TLS verification still uses the FQDN via SNI. +func (m *Manager) OpenConn(ctx context.Context, serverAddress, peerKey string, serverIP netip.Addr) (net.Conn, error) { m.relayClientMu.RLock() defer m.relayClientMu.RUnlock() @@ -151,7 +160,7 @@ func (m *Manager) OpenConn(ctx context.Context, serverAddress, peerKey string) ( netConn, err = m.relayClient.OpenConn(ctx, peerKey) } else { log.Debugf("open peer connection via foreign server: %s", serverAddress) - netConn, err = m.openConnVia(ctx, serverAddress, peerKey) + netConn, err = m.openConnVia(ctx, serverAddress, peerKey, serverIP) } if err != nil { return nil, err @@ -203,16 +212,22 @@ func (m *Manager) AddCloseListener(serverAddress string, onClosedListener OnServ return nil } -// RelayInstanceAddress returns the address of the permanent relay server. It could change if the network connection is -// lost. This address will be sent to the target peer to choose the common relay server for the communication. -func (m *Manager) RelayInstanceAddress() (string, error) { +// RelayInstanceAddress returns the address and resolved IP of the permanent relay server. It could change if the +// network connection is lost. The address is sent to the target peer to choose the common relay server for the +// communication; the IP is sent alongside so remote peers can dial directly without their own DNS lookup. Both +// values are read under the same lock so they cannot diverge across a reconnection. +func (m *Manager) RelayInstanceAddress() (string, netip.Addr, error) { m.relayClientMu.RLock() defer m.relayClientMu.RUnlock() if m.relayClient == nil { - return "", ErrRelayClientNotConnected + return "", netip.Addr{}, ErrRelayClientNotConnected } - return m.relayClient.ServerInstanceURL() + addr, err := m.relayClient.ServerInstanceURL() + if err != nil { + return "", netip.Addr{}, err + } + return addr, m.relayClient.ConnectedIP(), nil } // ServerURLs returns the addresses of the relay servers. @@ -236,7 +251,7 @@ func (m *Manager) UpdateToken(token *relayAuth.Token) error { return m.tokenStore.UpdateToken(token) } -func (m *Manager) openConnVia(ctx context.Context, serverAddress, peerKey string) (net.Conn, error) { +func (m *Manager) openConnVia(ctx context.Context, serverAddress, peerKey string, serverIP netip.Addr) (net.Conn, error) { // check if already has a connection to the desired relay server m.relayClientsMutex.RLock() rt, ok := m.relayClients[serverAddress] @@ -271,7 +286,7 @@ func (m *Manager) openConnVia(ctx context.Context, serverAddress, peerKey string m.relayClients[serverAddress] = rt m.relayClientsMutex.Unlock() - relayClient := NewClient(serverAddress, m.tokenStore, m.peerID, m.mtu) + relayClient := NewClientWithServerIP(serverAddress, serverIP, m.tokenStore, m.peerID, m.mtu) err := relayClient.Connect(m.ctx) if err != nil { rt.err = err @@ -364,7 +379,7 @@ func (m *Manager) isForeignServer(address string) (bool, error) { } func (m *Manager) startCleanupLoop() { - ticker := time.NewTicker(relayCleanupInterval) + ticker := time.NewTicker(m.cleanupInterval) defer ticker.Stop() for { select { @@ -389,7 +404,7 @@ func (m *Manager) cleanUpUnusedRelays() { continue } - if time.Since(rt.created) <= keepUnusedServerTime { + if time.Since(rt.created) <= m.keepUnusedServerTime { rt.Unlock() continue } diff --git a/shared/relay/client/manager_serverip_test.go b/shared/relay/client/manager_serverip_test.go new file mode 100644 index 000000000..a354beade --- /dev/null +++ b/shared/relay/client/manager_serverip_test.go @@ -0,0 +1,144 @@ +package client + +import ( + "context" + "io" + "net/netip" + "testing" + "time" + + "github.com/netbirdio/netbird/client/iface" + "github.com/netbirdio/netbird/relay/server" +) + +// TestManager_ForeignRelayServerIP exercises the foreign-relay path +// end-to-end through Manager.OpenConn. Alice and Bob register on different +// relay servers; Alice dials Bob's foreign relay using an unresolvable +// FQDN. Without a server IP the dial fails; with Bob's advertised IP it +// recovers and a payload round-trips between the peers. +func TestManager_ForeignRelayServerIP(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + + // Alice's home relay + homeCfg := server.ListenerConfig{Address: "127.0.0.1:52401"} + homeSrv, err := server.NewServer(newManagerTestServerConfig(homeCfg.Address)) + if err != nil { + t.Fatalf("create home server: %s", err) + } + homeErr := make(chan error, 1) + go func() { + if err := homeSrv.Listen(homeCfg); err != nil { + homeErr <- err + } + }() + t.Cleanup(func() { _ = homeSrv.Shutdown(context.Background()) }) + if err := waitForServerToStart(homeErr); err != nil { + t.Fatalf("home server: %s", err) + } + + // Bob's foreign relay + foreignCfg := server.ListenerConfig{Address: "127.0.0.1:52402"} + foreignSrv, err := server.NewServer(newManagerTestServerConfig(foreignCfg.Address)) + if err != nil { + t.Fatalf("create foreign server: %s", err) + } + foreignErr := make(chan error, 1) + go func() { + if err := foreignSrv.Listen(foreignCfg); err != nil { + foreignErr <- err + } + }() + t.Cleanup(func() { _ = foreignSrv.Shutdown(context.Background()) }) + if err := waitForServerToStart(foreignErr); err != nil { + t.Fatalf("foreign server: %s", err) + } + + mCtx, mCancel := context.WithCancel(ctx) + t.Cleanup(mCancel) + + mgrAlice := NewManager(mCtx, toURL(homeCfg), "alice", iface.DefaultMTU) + if err := mgrAlice.Serve(); err != nil { + t.Fatalf("alice manager serve: %s", err) + } + + mgrBob := NewManager(mCtx, toURL(foreignCfg), "bob", iface.DefaultMTU) + if err := mgrBob.Serve(); err != nil { + t.Fatalf("bob manager serve: %s", err) + } + + // Bob's real relay URL and the IP that would ride along in signal as relayServerIP. + bobRealAddr, bobAdvertisedIP, err := mgrBob.RelayInstanceAddress() + if err != nil { + t.Fatalf("bob relay address: %s", err) + } + if !bobAdvertisedIP.IsValid() { + t.Fatalf("expected valid RelayInstanceIP for bob, got zero") + } + + // .invalid is reserved (RFC 2606), so DNS resolution always fails. + const brokenFQDN = "rel://relay-bob-instance.invalid:52402" + if brokenFQDN == bobRealAddr { + t.Fatalf("broken FQDN must differ from bob's real address (%s)", bobRealAddr) + } + + t.Run("no server IP, dial fails", func(t *testing.T) { + dialCtx, dialCancel := context.WithTimeout(ctx, 5*time.Second) + defer dialCancel() + _, err := mgrAlice.OpenConn(dialCtx, brokenFQDN, "bob", netip.Addr{}) + if err == nil { + t.Fatalf("expected OpenConn to fail without server IP, got success") + } + }) + + t.Run("server IP recovers", func(t *testing.T) { + // Bob waits for Alice's incoming peer connection on his side. + bobSideCh := make(chan error, 1) + go func() { + conn, err := mgrBob.OpenConn(ctx, bobRealAddr, "alice", netip.Addr{}) + if err != nil { + bobSideCh <- err + return + } + buf := make([]byte, 1024) + n, err := conn.Read(buf) + if err != nil { + bobSideCh <- err + return + } + if _, err := conn.Write(buf[:n]); err != nil { + bobSideCh <- err + return + } + bobSideCh <- nil + }() + + aliceConn, err := mgrAlice.OpenConn(ctx, brokenFQDN, "bob", bobAdvertisedIP) + if err != nil { + t.Fatalf("alice OpenConn with server IP: %s", err) + } + t.Cleanup(func() { _ = aliceConn.Close() }) + + payload := []byte("alice-to-bob") + if _, err := aliceConn.Write(payload); err != nil { + t.Fatalf("alice write: %s", err) + } + + buf := make([]byte, len(payload)) + if _, err := io.ReadFull(aliceConn, buf); err != nil { + t.Fatalf("alice read echo: %s", err) + } + if string(buf) != string(payload) { + t.Fatalf("echo mismatch: got %q want %q", buf, payload) + } + + select { + case err := <-bobSideCh: + if err != nil { + t.Fatalf("bob side: %s", err) + } + case <-time.After(5 * time.Second): + t.Fatalf("timed out waiting for bob side") + } + }) +} diff --git a/shared/relay/client/manager_test.go b/shared/relay/client/manager_test.go index 5bbcad886..9e964f688 100644 --- a/shared/relay/client/manager_test.go +++ b/shared/relay/client/manager_test.go @@ -3,6 +3,7 @@ package client import ( "context" "fmt" + "net/netip" "testing" "time" @@ -101,15 +102,15 @@ func TestForeignConn(t *testing.T) { if err := clientBob.Serve(); err != nil { t.Fatalf("failed to serve manager: %s", err) } - bobsSrvAddr, err := clientBob.RelayInstanceAddress() + bobsSrvAddr, _, err := clientBob.RelayInstanceAddress() if err != nil { t.Fatalf("failed to get relay address: %s", err) } - connAliceToBob, err := clientAlice.OpenConn(ctx, bobsSrvAddr, "bob") + connAliceToBob, err := clientAlice.OpenConn(ctx, bobsSrvAddr, "bob", netip.Addr{}) if err != nil { t.Fatalf("failed to bind channel: %s", err) } - connBobToAlice, err := clientBob.OpenConn(ctx, bobsSrvAddr, "alice") + connBobToAlice, err := clientBob.OpenConn(ctx, bobsSrvAddr, "alice", netip.Addr{}) if err != nil { t.Fatalf("failed to bind channel: %s", err) } @@ -209,7 +210,7 @@ func TestForeginConnClose(t *testing.T) { if err != nil { t.Fatalf("failed to serve manager: %s", err) } - conn, err := mgr.OpenConn(ctx, toURL(srvCfg2)[0], "bob") + conn, err := mgr.OpenConn(ctx, toURL(srvCfg2)[0], "bob", netip.Addr{}) if err != nil { t.Fatalf("failed to bind channel: %s", err) } @@ -301,7 +302,7 @@ func TestForeignAutoClose(t *testing.T) { } t.Log("open connection to another peer") - if _, err = mgr.OpenConn(ctx, foreignServerURL, "anotherpeer"); err == nil { + if _, err = mgr.OpenConn(ctx, foreignServerURL, "anotherpeer", netip.Addr{}); err == nil { t.Fatalf("should have failed to open connection to another peer") } @@ -367,11 +368,11 @@ func TestAutoReconnect(t *testing.T) { if err != nil { t.Fatalf("failed to serve manager: %s", err) } - ra, err := clientAlice.RelayInstanceAddress() + ra, _, err := clientAlice.RelayInstanceAddress() if err != nil { t.Errorf("failed to get relay address: %s", err) } - conn, err := clientAlice.OpenConn(ctx, ra, "bob") + conn, err := clientAlice.OpenConn(ctx, ra, "bob", netip.Addr{}) if err != nil { t.Errorf("failed to bind channel: %s", err) } @@ -391,7 +392,7 @@ func TestAutoReconnect(t *testing.T) { } log.Infof("reopent the connection") - _, err = clientAlice.OpenConn(ctx, ra, "bob") + _, err = clientAlice.OpenConn(ctx, ra, "bob", netip.Addr{}) if err != nil { t.Errorf("failed to open channel: %s", err) } @@ -453,7 +454,7 @@ func TestNotifierDoubleAdd(t *testing.T) { t.Fatalf("failed to serve manager: %s", err) } - conn1, err := clientAlice.OpenConn(ctx, clientAlice.ServerURLs()[0], "bob") + conn1, err := clientAlice.OpenConn(ctx, clientAlice.ServerURLs()[0], "bob", netip.Addr{}) if err != nil { t.Fatalf("failed to bind channel: %s", err) } diff --git a/shared/signal/client/client.go b/shared/signal/client/client.go index 5347c80e9..9dc6ccd37 100644 --- a/shared/signal/client/client.go +++ b/shared/signal/client/client.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "net/netip" "strings" "github.com/netbirdio/netbird/shared/signal/proto" @@ -14,17 +15,17 @@ import ( // A set of tools to exchange connection details (Wireguard endpoints) with the remote peer. -// Status is the status of the client -type Status string - -const StreamConnected Status = "Connected" -const StreamDisconnected Status = "Disconnected" - const ( + StreamConnected Status = "Connected" + StreamDisconnected Status = "Disconnected" + // DirectCheck indicates support to direct mode checks DirectCheck uint32 = 1 ) +// Status is the status of the client +type Status string + type Client interface { io.Closer StreamConnected() bool @@ -38,6 +39,24 @@ type Client interface { SetOnReconnectedListener(func()) } +// Credential is an instance of a GrpcClient's Credential +type Credential struct { + UFrag string + Pwd string +} + +// CredentialPayload bundles the fields of a signal Body for MarshalCredential. +type CredentialPayload struct { + Type proto.Body_Type + WgListenPort int + Credential *Credential + RosenpassPubKey []byte + RosenpassAddr string + RelaySrvAddress string + RelaySrvIP netip.Addr + SessionID []byte +} + // UnMarshalCredential parses the credentials from the message and returns a Credential instance func UnMarshalCredential(msg *proto.Message) (*Credential, error) { @@ -52,27 +71,27 @@ func UnMarshalCredential(msg *proto.Message) (*Credential, error) { } // MarshalCredential marshal a Credential instance and returns a Message object -func MarshalCredential(myKey wgtypes.Key, myPort int, remoteKey string, credential *Credential, t proto.Body_Type, rosenpassPubKey []byte, rosenpassAddr string, relaySrvAddress string, sessionID []byte) (*proto.Message, error) { +func MarshalCredential(myKey wgtypes.Key, remoteKey string, p CredentialPayload) (*proto.Message, error) { + body := &proto.Body{ + Type: p.Type, + Payload: fmt.Sprintf("%s:%s", p.Credential.UFrag, p.Credential.Pwd), + WgListenPort: uint32(p.WgListenPort), + NetBirdVersion: version.NetbirdVersion(), + RosenpassConfig: &proto.RosenpassConfig{ + RosenpassPubKey: p.RosenpassPubKey, + RosenpassServerAddr: p.RosenpassAddr, + }, + SessionId: p.SessionID, + } + if p.RelaySrvAddress != "" { + body.RelayServerAddress = &p.RelaySrvAddress + } + if p.RelaySrvIP.IsValid() { + body.RelayServerIP = p.RelaySrvIP.Unmap().AsSlice() + } return &proto.Message{ Key: myKey.PublicKey().String(), RemoteKey: remoteKey, - Body: &proto.Body{ - Type: t, - Payload: fmt.Sprintf("%s:%s", credential.UFrag, credential.Pwd), - WgListenPort: uint32(myPort), - NetBirdVersion: version.NetbirdVersion(), - RosenpassConfig: &proto.RosenpassConfig{ - RosenpassPubKey: rosenpassPubKey, - RosenpassServerAddr: rosenpassAddr, - }, - RelayServerAddress: relaySrvAddress, - SessionId: sessionID, - }, + Body: body, }, nil } - -// Credential is an instance of a GrpcClient's Credential -type Credential struct { - UFrag string - Pwd string -} diff --git a/shared/signal/proto/signalexchange.pb.go b/shared/signal/proto/signalexchange.pb.go index d9c61a846..0c80fb489 100644 --- a/shared/signal/proto/signalexchange.pb.go +++ b/shared/signal/proto/signalexchange.pb.go @@ -229,8 +229,13 @@ type Body struct { // RosenpassConfig is a Rosenpass config of the remote peer our peer tries to connect to RosenpassConfig *RosenpassConfig `protobuf:"bytes,7,opt,name=rosenpassConfig,proto3" json:"rosenpassConfig,omitempty"` // relayServerAddress is url of the relay server - RelayServerAddress string `protobuf:"bytes,8,opt,name=relayServerAddress,proto3" json:"relayServerAddress,omitempty"` - SessionId []byte `protobuf:"bytes,10,opt,name=sessionId,proto3,oneof" json:"sessionId,omitempty"` + RelayServerAddress *string `protobuf:"bytes,8,opt,name=relayServerAddress,proto3,oneof" json:"relayServerAddress,omitempty"` + SessionId []byte `protobuf:"bytes,10,opt,name=sessionId,proto3,oneof" json:"sessionId,omitempty"` + // relayServerIP is the IP the sender is connected to on its relay server, + // encoded as 4 bytes (IPv4) or 16 bytes (IPv6). Receivers may use it as a + // fallback dial target when DNS resolution of relayServerAddress fails. + // SNI/TLS verification still uses relayServerAddress. + RelayServerIP []byte `protobuf:"bytes,11,opt,name=relayServerIP,proto3,oneof" json:"relayServerIP,omitempty"` } func (x *Body) Reset() { @@ -315,8 +320,8 @@ func (x *Body) GetRosenpassConfig() *RosenpassConfig { } func (x *Body) GetRelayServerAddress() string { - if x != nil { - return x.RelayServerAddress + if x != nil && x.RelayServerAddress != nil { + return *x.RelayServerAddress } return "" } @@ -328,6 +333,13 @@ func (x *Body) GetSessionId() []byte { return nil } +func (x *Body) GetRelayServerIP() []byte { + if x != nil { + return x.RelayServerIP + } + return nil +} + // Mode indicates a connection mode type Mode struct { state protoimpl.MessageState @@ -451,7 +463,7 @@ var file_signalexchange_proto_rawDesc = []byte{ 0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x52, - 0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xe4, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d, + 0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xc3, 0x04, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, @@ -471,40 +483,46 @@ var file_signalexchange_proto_rawDesc = []byte{ 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, - 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x2e, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, + 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x33, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x08, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, - 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x21, 0x0a, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, - 0x6e, 0x49, 0x64, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x09, 0x73, 0x65, 0x73, - 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x88, 0x01, 0x01, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, 0x70, + 0x28, 0x09, 0x48, 0x00, 0x52, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, + 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x88, 0x01, 0x01, 0x12, 0x21, 0x0a, 0x09, 0x73, + 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x01, + 0x52, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x88, 0x01, 0x01, 0x12, 0x29, + 0x0a, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x50, 0x18, + 0x0b, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x02, 0x52, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, + 0x72, 0x76, 0x65, 0x72, 0x49, 0x50, 0x88, 0x01, 0x01, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09, 0x0a, 0x05, 0x4f, 0x46, 0x46, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x4e, 0x53, 0x57, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x44, 0x49, 0x44, 0x41, 0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10, - 0x04, 0x12, 0x0b, 0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x42, 0x0c, - 0x0a, 0x0a, 0x5f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x2e, 0x0a, 0x04, - 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01, - 0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01, - 0x01, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f, - 0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, - 0x28, 0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, - 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, - 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73, - 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, - 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01, 0x0a, 0x0e, - 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c, - 0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, - 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, - 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, - 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, - 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x0d, - 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x20, 0x2e, - 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, - 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, - 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, - 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, - 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x04, 0x12, 0x0b, 0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x42, 0x15, + 0x0a, 0x13, 0x5f, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, + 0x64, 0x72, 0x65, 0x73, 0x73, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, + 0x6e, 0x49, 0x64, 0x42, 0x10, 0x0a, 0x0e, 0x5f, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, + 0x76, 0x65, 0x72, 0x49, 0x50, 0x4a, 0x04, 0x08, 0x09, 0x10, 0x0a, 0x22, 0x2e, 0x0a, 0x04, 0x4d, + 0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01, 0x01, + 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f, 0x52, + 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x28, + 0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, + 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, + 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73, 0x65, + 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, + 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01, 0x0a, 0x0e, 0x53, + 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c, 0x0a, + 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, + 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, + 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, + 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, + 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x0d, 0x43, + 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x20, 0x2e, 0x73, + 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, + 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, + 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, + 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, + 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/shared/signal/proto/signalexchange.proto b/shared/signal/proto/signalexchange.proto index 0a33ad78b..96a4001e3 100644 --- a/shared/signal/proto/signalexchange.proto +++ b/shared/signal/proto/signalexchange.proto @@ -63,9 +63,17 @@ message Body { RosenpassConfig rosenpassConfig = 7; // relayServerAddress is url of the relay server - string relayServerAddress = 8; + optional string relayServerAddress = 8; + + reserved 9; optional bytes sessionId = 10; + + // relayServerIP is the IP the sender is connected to on its relay server, + // encoded as 4 bytes (IPv4) or 16 bytes (IPv6). Receivers may use it as a + // fallback dial target when DNS resolution of relayServerAddress fails. + // SNI/TLS verification still uses relayServerAddress. + optional bytes relayServerIP = 11; } // Mode indicates a connection mode From 6262b0d841a5a4c1bd758d45332a6dba51cb09dd Mon Sep 17 00:00:00 2001 From: Bethuel Mmbaga Date: Mon, 4 May 2026 12:47:13 +0300 Subject: [PATCH 2/4] [management] Track pending approval in peer event metadata (#6040) --- management/server/peer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/management/server/peer.go b/management/server/peer.go index d1c52002e..25c6ecd8c 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -818,6 +818,9 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, accountID, setupKe if !addedByUser { opEvent.Meta["setup_key_name"] = peerAddConfig.SetupKeyName } + if newPeer.Status != nil && newPeer.Status.RequiresApproval { + opEvent.Meta["pending_approval"] = true + } if !temporary { am.StoreEvent(ctx, opEvent.InitiatorID, opEvent.TargetID, opEvent.AccountID, opEvent.Activity, opEvent.Meta) From a21f6ecb0a5d7ba45b4bf570a7af62ba1f66447d Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Mon, 4 May 2026 11:59:01 +0200 Subject: [PATCH 3/4] [client] release Status.mux before invoking notifier callbacks (#6039) The Status recorder used to fire notifier callbacks while holding d.mux: - notifyPeerListChanged / notifyPeerStateChangeListeners ran from inside the locked section of every Update*/AddPeerStateRoute/etc. - notifyAddressChanged ran from UpdateLocalPeerState and CleanLocalPeerState while d.mux was held. - onConnectionChanged was registered with a defer above defer d.mux.Unlock, so it executed before the mutex was released in the Mark*Connected/ Disconnected helpers. - notifyPeerStateChangeListeners did a blocking channel send under d.mux, so a slow subscriber stalled every other d.mux holder. A listener that re-enters the recorder (e.g. calls GetFullStatus from within a callback) deadlocks against d.mux, and any callback that takes longer than expected stalls every other state query for its duration. Capture the values needed for notification under the lock, release d.mux, then call the notifier. Build per-peer router-state snapshots inside the lock and dispatch them via dispatchRouterPeers afterwards. The router-peer channel send stays blocking, but now happens outside d.mux so a slow consumer cannot stall any other d.mux holder, and no peer state transitions are silently dropped. The notifier itself is unchanged: its internal state was already protected by its own locks, and the field d.notifier is set once in NewRecorder and never reassigned, so reading it without d.mux is safe. Also fix a pre-existing race in Test_notifier_RemoveListener / Test_notifier_SetListener: setListener spawns a goroutine that writes listener.peers, but the tests read listener.peers without waiting for it. --- client/internal/peer/notifier_test.go | 17 ++ client/internal/peer/status.go | 229 +++++++++++++++++--------- 2 files changed, 170 insertions(+), 76 deletions(-) diff --git a/client/internal/peer/notifier_test.go b/client/internal/peer/notifier_test.go index bbdc00e13..0b7722b0c 100644 --- a/client/internal/peer/notifier_test.go +++ b/client/internal/peer/notifier_test.go @@ -8,6 +8,7 @@ import ( type mocListener struct { lastState int wg sync.WaitGroup + peersWg sync.WaitGroup peers int } @@ -33,6 +34,7 @@ func (l *mocListener) OnAddressChanged(host, addr string) { } func (l *mocListener) OnPeersListChanged(size int) { l.peers = size + l.peersWg.Done() } func (l *mocListener) setWaiter() { @@ -43,6 +45,14 @@ func (l *mocListener) wait() { l.wg.Wait() } +func (l *mocListener) setPeersWaiter() { + l.peersWg.Add(1) +} + +func (l *mocListener) waitPeers() { + l.peersWg.Wait() +} + func Test_notifier_serverState(t *testing.T) { type scenario struct { @@ -72,11 +82,13 @@ func Test_notifier_serverState(t *testing.T) { func Test_notifier_SetListener(t *testing.T) { listener := &mocListener{} listener.setWaiter() + listener.setPeersWaiter() n := newNotifier() n.lastNotification = stateConnecting n.setListener(listener) listener.wait() + listener.waitPeers() if listener.lastState != n.lastNotification { t.Errorf("invalid state: %d, expected: %d", listener.lastState, n.lastNotification) } @@ -85,9 +97,14 @@ func Test_notifier_SetListener(t *testing.T) { func Test_notifier_RemoveListener(t *testing.T) { listener := &mocListener{} listener.setWaiter() + listener.setPeersWaiter() n := newNotifier() n.lastNotification = stateConnecting n.setListener(listener) + // setListener replays cached state on a goroutine; wait for both the state + // and peers callbacks to finish so we don't race on listener.peers. + listener.wait() + listener.waitPeers() n.removeListener() n.peerListChanged(1) diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index 7bd19b0e1..e8e61f660 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -320,10 +320,10 @@ func (d *Status) RemovePeer(peerPubKey string) error { // UpdatePeerState updates peer status func (d *Status) UpdatePeerState(receivedState State) error { d.mux.Lock() - defer d.mux.Unlock() peerState, ok := d.peers[receivedState.PubKey] if !ok { + d.mux.Unlock() return errors.New("peer doesn't exist") } @@ -343,23 +343,29 @@ func (d *Status) UpdatePeerState(receivedState State) error { d.peers[receivedState.PubKey] = peerState - if hasConnStatusChanged(oldState, receivedState.ConnStatus) { - d.notifyPeerListChanged() - } - + notifyList := hasConnStatusChanged(oldState, receivedState.ConnStatus) // when we close the connection we will not notify the router manager - if receivedState.ConnStatus == StatusIdle { - d.notifyPeerStateChangeListeners(receivedState.PubKey) + notifyRouter := receivedState.ConnStatus == StatusIdle + routerSnapshot := d.snapshotRouterPeersLocked(receivedState.PubKey, notifyRouter) + numPeers := d.numOfPeers() + + d.mux.Unlock() + + if notifyList { + d.notifier.peerListChanged(numPeers) + } + if notifyRouter { + d.dispatchRouterPeers(receivedState.PubKey, routerSnapshot) } return nil } func (d *Status) AddPeerStateRoute(peer string, route string, resourceId route.ResID) error { d.mux.Lock() - defer d.mux.Unlock() peerState, ok := d.peers[peer] if !ok { + d.mux.Unlock() return errors.New("peer doesn't exist") } @@ -371,17 +377,20 @@ func (d *Status) AddPeerStateRoute(peer string, route string, resourceId route.R d.routeIDLookup.AddRemoteRouteID(resourceId, pref) } + numPeers := d.numOfPeers() + d.mux.Unlock() + // todo: consider to make sense of this notification or not - d.notifyPeerListChanged() + d.notifier.peerListChanged(numPeers) return nil } func (d *Status) RemovePeerStateRoute(peer string, route string) error { d.mux.Lock() - defer d.mux.Unlock() peerState, ok := d.peers[peer] if !ok { + d.mux.Unlock() return errors.New("peer doesn't exist") } @@ -393,8 +402,11 @@ func (d *Status) RemovePeerStateRoute(peer string, route string) error { d.routeIDLookup.RemoveRemoteRouteID(pref) } + numPeers := d.numOfPeers() + d.mux.Unlock() + // todo: consider to make sense of this notification or not - d.notifyPeerListChanged() + d.notifier.peerListChanged(numPeers) return nil } @@ -410,10 +422,10 @@ func (d *Status) CheckRoutes(ip netip.Addr) ([]byte, bool) { func (d *Status) UpdatePeerICEState(receivedState State) error { d.mux.Lock() - defer d.mux.Unlock() peerState, ok := d.peers[receivedState.PubKey] if !ok { + d.mux.Unlock() return errors.New("peer doesn't exist") } @@ -431,22 +443,28 @@ func (d *Status) UpdatePeerICEState(receivedState State) error { d.peers[receivedState.PubKey] = peerState - if hasConnStatusChanged(oldState, receivedState.ConnStatus) { - d.notifyPeerListChanged() - } + notifyList := hasConnStatusChanged(oldState, receivedState.ConnStatus) + notifyRouter := hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) + routerSnapshot := d.snapshotRouterPeersLocked(receivedState.PubKey, notifyRouter) + numPeers := d.numOfPeers() - if hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) { - d.notifyPeerStateChangeListeners(receivedState.PubKey) + d.mux.Unlock() + + if notifyList { + d.notifier.peerListChanged(numPeers) + } + if notifyRouter { + d.dispatchRouterPeers(receivedState.PubKey, routerSnapshot) } return nil } func (d *Status) UpdatePeerRelayedState(receivedState State) error { d.mux.Lock() - defer d.mux.Unlock() peerState, ok := d.peers[receivedState.PubKey] if !ok { + d.mux.Unlock() return errors.New("peer doesn't exist") } @@ -461,22 +479,28 @@ func (d *Status) UpdatePeerRelayedState(receivedState State) error { d.peers[receivedState.PubKey] = peerState - if hasConnStatusChanged(oldState, receivedState.ConnStatus) { - d.notifyPeerListChanged() - } + notifyList := hasConnStatusChanged(oldState, receivedState.ConnStatus) + notifyRouter := hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) + routerSnapshot := d.snapshotRouterPeersLocked(receivedState.PubKey, notifyRouter) + numPeers := d.numOfPeers() - if hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) { - d.notifyPeerStateChangeListeners(receivedState.PubKey) + d.mux.Unlock() + + if notifyList { + d.notifier.peerListChanged(numPeers) + } + if notifyRouter { + d.dispatchRouterPeers(receivedState.PubKey, routerSnapshot) } return nil } func (d *Status) UpdatePeerRelayedStateToDisconnected(receivedState State) error { d.mux.Lock() - defer d.mux.Unlock() peerState, ok := d.peers[receivedState.PubKey] if !ok { + d.mux.Unlock() return errors.New("peer doesn't exist") } @@ -490,22 +514,28 @@ func (d *Status) UpdatePeerRelayedStateToDisconnected(receivedState State) error d.peers[receivedState.PubKey] = peerState - if hasConnStatusChanged(oldState, receivedState.ConnStatus) { - d.notifyPeerListChanged() - } + notifyList := hasConnStatusChanged(oldState, receivedState.ConnStatus) + notifyRouter := hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) + routerSnapshot := d.snapshotRouterPeersLocked(receivedState.PubKey, notifyRouter) + numPeers := d.numOfPeers() - if hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) { - d.notifyPeerStateChangeListeners(receivedState.PubKey) + d.mux.Unlock() + + if notifyList { + d.notifier.peerListChanged(numPeers) + } + if notifyRouter { + d.dispatchRouterPeers(receivedState.PubKey, routerSnapshot) } return nil } func (d *Status) UpdatePeerICEStateToDisconnected(receivedState State) error { d.mux.Lock() - defer d.mux.Unlock() peerState, ok := d.peers[receivedState.PubKey] if !ok { + d.mux.Unlock() return errors.New("peer doesn't exist") } @@ -522,12 +552,18 @@ func (d *Status) UpdatePeerICEStateToDisconnected(receivedState State) error { d.peers[receivedState.PubKey] = peerState - if hasConnStatusChanged(oldState, receivedState.ConnStatus) { - d.notifyPeerListChanged() - } + notifyList := hasConnStatusChanged(oldState, receivedState.ConnStatus) + notifyRouter := hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) + routerSnapshot := d.snapshotRouterPeersLocked(receivedState.PubKey, notifyRouter) + numPeers := d.numOfPeers() - if hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) { - d.notifyPeerStateChangeListeners(receivedState.PubKey) + d.mux.Unlock() + + if notifyList { + d.notifier.peerListChanged(numPeers) + } + if notifyRouter { + d.dispatchRouterPeers(receivedState.PubKey, routerSnapshot) } return nil } @@ -594,17 +630,33 @@ func (d *Status) UpdatePeerSSHHostKey(peerPubKey string, sshHostKey []byte) erro // FinishPeerListModifications this event invoke the notification func (d *Status) FinishPeerListModifications() { d.mux.Lock() - defer d.mux.Unlock() if !d.peerListChangedForNotification { + d.mux.Unlock() return } d.peerListChangedForNotification = false - d.notifyPeerListChanged() + numPeers := d.numOfPeers() + // snapshot per-peer router state to deliver after the lock is released + type routerDispatch struct { + peerID string + snapshot map[string]RouterState + } + dispatches := make([]routerDispatch, 0, len(d.peers)) for key := range d.peers { - d.notifyPeerStateChangeListeners(key) + snapshot := d.snapshotRouterPeersLocked(key, true) + if snapshot != nil { + dispatches = append(dispatches, routerDispatch{peerID: key, snapshot: snapshot}) + } + } + + d.mux.Unlock() + + d.notifier.peerListChanged(numPeers) + for _, rd := range dispatches { + d.dispatchRouterPeers(rd.peerID, rd.snapshot) } } @@ -655,10 +707,12 @@ func (d *Status) GetLocalPeerState() LocalPeerState { // UpdateLocalPeerState updates local peer status func (d *Status) UpdateLocalPeerState(localPeerState LocalPeerState) { d.mux.Lock() - defer d.mux.Unlock() - d.localPeer = localPeerState - d.notifyAddressChanged() + fqdn := d.localPeer.FQDN + ip := d.localPeer.IP + d.mux.Unlock() + + d.notifier.localAddressChanged(fqdn, ip) } // AddLocalPeerStateRoute adds a route to the local peer state @@ -721,30 +775,36 @@ func (d *Status) CleanLocalPeerStateRoutes() { // CleanLocalPeerState cleans local peer status func (d *Status) CleanLocalPeerState() { d.mux.Lock() - defer d.mux.Unlock() - d.localPeer = LocalPeerState{} - d.notifyAddressChanged() + fqdn := d.localPeer.FQDN + ip := d.localPeer.IP + d.mux.Unlock() + + d.notifier.localAddressChanged(fqdn, ip) } // MarkManagementDisconnected sets ManagementState to disconnected func (d *Status) MarkManagementDisconnected(err error) { d.mux.Lock() - defer d.mux.Unlock() - defer d.onConnectionChanged() - d.managementState = false d.managementError = err + mgm := d.managementState + sig := d.signalState + d.mux.Unlock() + + d.notifier.updateServerStates(mgm, sig) } // MarkManagementConnected sets ManagementState to connected func (d *Status) MarkManagementConnected() { d.mux.Lock() - defer d.mux.Unlock() - defer d.onConnectionChanged() - d.managementState = true d.managementError = nil + mgm := d.managementState + sig := d.signalState + d.mux.Unlock() + + d.notifier.updateServerStates(mgm, sig) } // UpdateSignalAddress update the address of the signal server @@ -778,21 +838,25 @@ func (d *Status) UpdateLazyConnection(enabled bool) { // MarkSignalDisconnected sets SignalState to disconnected func (d *Status) MarkSignalDisconnected(err error) { d.mux.Lock() - defer d.mux.Unlock() - defer d.onConnectionChanged() - d.signalState = false d.signalError = err + mgm := d.managementState + sig := d.signalState + d.mux.Unlock() + + d.notifier.updateServerStates(mgm, sig) } // MarkSignalConnected sets SignalState to connected func (d *Status) MarkSignalConnected() { d.mux.Lock() - defer d.mux.Unlock() - defer d.onConnectionChanged() - d.signalState = true d.signalError = nil + mgm := d.managementState + sig := d.signalState + d.mux.Unlock() + + d.notifier.updateServerStates(mgm, sig) } func (d *Status) UpdateRelayStates(relayResults []relay.ProbeResult) { @@ -1012,18 +1076,17 @@ func (d *Status) RemoveConnectionListener() { d.notifier.removeListener() } -func (d *Status) onConnectionChanged() { - d.notifier.updateServerStates(d.managementState, d.signalState) -} - -// notifyPeerStateChangeListeners notifies route manager about the change in peer state -func (d *Status) notifyPeerStateChangeListeners(peerID string) { - subs, ok := d.changeNotify[peerID] - if !ok { - return +// snapshotRouterPeersLocked builds the RouterState map for a peer's subscribers. +// Caller MUST hold d.mux. Returns nil when there are no subscribers for peerID +// or when notify is false. The snapshot is consumed later by dispatchRouterPeers +// outside the lock so the channel send cannot stall any d.mux holder. +func (d *Status) snapshotRouterPeersLocked(peerID string, notify bool) map[string]RouterState { + if !notify { + return nil + } + if _, ok := d.changeNotify[peerID]; !ok { + return nil } - - // collect the relevant data for router peers routerPeers := make(map[string]RouterState, len(d.changeNotify)) for pid := range d.changeNotify { s, ok := d.peers[pid] @@ -1031,13 +1094,35 @@ func (d *Status) notifyPeerStateChangeListeners(peerID string) { log.Warnf("router peer not found in peers list: %s", pid) continue } - routerPeers[pid] = RouterState{ Status: s.ConnStatus, Relayed: s.Relayed, Latency: s.Latency, } } + return routerPeers +} + +// dispatchRouterPeers delivers a previously snapshotted router-state map to +// the peer's subscribers. Caller MUST NOT hold d.mux. The method takes a +// fresh, short read of d.changeNotify under the lock to grab subscriber +// channels, then sends outside the lock so a slow consumer cannot block other +// d.mux holders. The send itself stays blocking (only short-circuited by the +// subscriber's context) so peer state transitions are not silently dropped. +func (d *Status) dispatchRouterPeers(peerID string, routerPeers map[string]RouterState) { + if routerPeers == nil { + return + } + + d.mux.Lock() + subsMap, ok := d.changeNotify[peerID] + subs := make([]*StatusChangeSubscription, 0, len(subsMap)) + if ok { + for _, sub := range subsMap { + subs = append(subs, sub) + } + } + d.mux.Unlock() for _, sub := range subs { select { @@ -1047,14 +1132,6 @@ func (d *Status) notifyPeerStateChangeListeners(peerID string) { } } -func (d *Status) notifyPeerListChanged() { - d.notifier.peerListChanged(d.numOfPeers()) -} - -func (d *Status) notifyAddressChanged() { - d.notifier.localAddressChanged(d.localPeer.FQDN, d.localPeer.IP) -} - func (d *Status) numOfPeers() int { return len(d.peers) + len(d.offlinePeers) } From a547fc74edd71268767d258a6ebe8513fa65f467 Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Mon, 4 May 2026 11:59:25 +0200 Subject: [PATCH 4/4] [client] Use ctx.Err() instead of gRPC codes.Canceled to detect shutdown (#6019) Detecting shutdown by inspecting the gRPC status code conflates a local context cancellation with a server- or proxy-sent codes.Canceled. When the latter occurs (e.g. an intermediary proxy resets the stream), the retry loop silently terminates and the client never reconnects. Switch to ctx.Err() in the signal Receive loop and management Sync/Job handlers, and stop matching gRPC Canceled/DeadlineExceeded in the flow client's isContextDone helper. With this change, a server-sent Canceled is treated as a transient error and the backoff retry loop continues. --- flow/client/client.go | 15 +++++------- shared/management/client/grpc.go | 39 ++++++++++++-------------------- shared/signal/client/grpc.go | 2 +- 3 files changed, 21 insertions(+), 35 deletions(-) diff --git a/flow/client/client.go b/flow/client/client.go index 8ad637974..180a4b441 100644 --- a/flow/client/client.go +++ b/flow/client/client.go @@ -13,11 +13,9 @@ import ( "github.com/cenkalti/backoff/v4" log "github.com/sirupsen/logrus" "google.golang.org/grpc" - "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/keepalive" - "google.golang.org/grpc/status" nbgrpc "github.com/netbirdio/netbird/client/grpc" "github.com/netbirdio/netbird/flow/proto" @@ -301,12 +299,11 @@ func defaultBackoff(ctx context.Context, interval time.Duration) backoff.BackOff }, ctx) } +// isContextDone reports whether the local context has been canceled or has +// exceeded its deadline. It deliberately does not inspect gRPC status codes: +// a server- or proxy-sent codes.Canceled / codes.DeadlineExceeded must not +// short-circuit our retry loop, since retrying is the correct response when +// the local context is still alive. func isContextDone(err error) bool { - if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { - return true - } - if s, ok := status.FromError(err); ok { - return s.Code() == codes.Canceled || s.Code() == codes.DeadlineExceeded - } - return false + return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) } diff --git a/shared/management/client/grpc.go b/shared/management/client/grpc.go index 2a51a777d..80625fe06 100644 --- a/shared/management/client/grpc.go +++ b/shared/management/client/grpc.go @@ -246,27 +246,23 @@ func (c *GrpcClient) handleJobStream( for { jobReq, err := c.receiveJobRequest(ctx, stream, serverPubKey) if err != nil { + if ctx.Err() != nil { + log.Debugf("job stream context has been canceled, this usually indicates shutdown") + return nil + } if s, ok := gstatus.FromError(err); ok { switch s.Code() { case codes.PermissionDenied: c.notifyDisconnected(err) return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer - case codes.Canceled: - log.Debugf("job stream context has been canceled, this usually indicates shutdown") - return err case codes.Unimplemented: log.Warn("Job feature is not supported by the current management server version. " + "Please update the management service to use this feature.") return nil - default: - log.Warnf("job stream disconnected, will retry silently. Reason: %v", err) - return err } - } else { - // non-gRPC error - log.Warnf("job stream disconnected, will retry silently. Reason: %v", err) - return err } + log.Warnf("job stream disconnected, will retry silently. Reason: %v", err) + return err } if jobReq == nil || len(jobReq.ID) == 0 { @@ -381,22 +377,15 @@ func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes. err = c.receiveUpdatesEvents(stream, serverPubKey, msgHandler) if err != nil { c.notifyDisconnected(err) - if s, ok := gstatus.FromError(err); ok { - switch s.Code() { - case codes.PermissionDenied: - return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer - case codes.Canceled: - log.Debugf("management connection context has been canceled, this usually indicates shutdown") - return nil - default: - log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err) - return err - } - } else { - // non-gRPC error - log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err) - return err + if ctx.Err() != nil { + log.Debugf("management connection context has been canceled, this usually indicates shutdown") + return nil } + if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied { + return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer + } + log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err) + return err } return nil diff --git a/shared/signal/client/grpc.go b/shared/signal/client/grpc.go index d0f598dd7..b245b2296 100644 --- a/shared/signal/client/grpc.go +++ b/shared/signal/client/grpc.go @@ -167,7 +167,7 @@ func (c *GrpcClient) Receive(ctx context.Context, msgHandler func(msg *proto.Mes // start receiving messages from the Signal stream (from other peers through signal) err = c.receive(stream) if err != nil { - if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { + if ctx.Err() != nil { log.Debugf("signal connection context has been canceled, this usually indicates shutdown") return nil }