Compare commits

..

1 Commits

Author SHA1 Message Date
Viktor Liu
9fe4804a33 Expose relay transport and connection errors in status and metrics 2026-06-10 11:00:58 +02:00
22 changed files with 317 additions and 116 deletions

View File

@@ -880,25 +880,62 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
e.handleAutoUpdateVersion(update.NetworkMap.PeerConfig.AutoUpdate)
}
if err := e.updateNetbirdConfig(update.GetNetbirdConfig()); err != nil {
return err
}
if update.GetNetbirdConfig() != nil {
wCfg := update.GetNetbirdConfig()
err := e.updateTURNs(wCfg.GetTurns())
if err != nil {
return fmt.Errorf("update TURNs: %w", err)
}
// Posture checks are bound to the network map presence:
// NetworkMap != nil, checks present -> apply the received checks
// NetworkMap != nil, checks nil -> posture checks were removed, clear them
// NetworkMap == nil -> config-only update (e.g. relay token rotation),
// leave the previously applied checks untouched
nm := update.GetNetworkMap()
if nm == nil {
return nil
err = e.updateSTUNs(wCfg.GetStuns())
if err != nil {
return fmt.Errorf("update STUNs: %w", err)
}
var stunTurn []*stun.URI
stunTurn = append(stunTurn, e.STUNs...)
stunTurn = append(stunTurn, e.TURNs...)
e.stunTurn.Store(stunTurn)
err = e.handleRelayUpdate(wCfg.GetRelay())
if err != nil {
return err
}
err = e.handleFlowUpdate(wCfg.GetFlow())
if err != nil {
return fmt.Errorf("handle the flow configuration: %w", err)
}
if err := e.PopulateNetbirdConfig(wCfg, nil); err != nil {
log.Warnf("Failed to update DNS server config: %v", err)
}
// todo update signal
}
if err := e.updateChecksIfNew(update.Checks); err != nil {
return err
}
e.persistSyncResponse(update)
nm := update.GetNetworkMap()
if nm == nil {
return nil
}
// Persist sync response under the dedicated lock (syncRespMux), not under syncMsgMux.
// A non-nil syncStore is what marks persistence as enabled. Hold the lock for
// the whole Set so the store cannot be cleared (disabled / engine close)
// mid-call and have this write resurrect a file that was just removed.
e.syncRespMux.RLock()
if e.syncStore != nil {
if err := e.syncStore.Set(update); err != nil {
log.Errorf("failed to persist sync response: %v", err)
} else {
log.Debugf("sync response persisted with serial %d", nm.GetSerial())
}
}
e.syncRespMux.RUnlock()
// only apply new changes and ignore old ones
if err := e.updateNetworkMap(nm); err != nil {
@@ -910,64 +947,6 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
return nil
}
// updateNetbirdConfig applies the management-provided NetBird configuration:
// STUN/TURN and relay servers, flow logging and DNS settings. A nil config is a no-op,
// which is the case for sync updates carrying only a network map.
func (e *Engine) updateNetbirdConfig(wCfg *mgmProto.NetbirdConfig) error {
if wCfg == nil {
return nil
}
if err := e.updateTURNs(wCfg.GetTurns()); err != nil {
return fmt.Errorf("update TURNs: %w", err)
}
if err := e.updateSTUNs(wCfg.GetStuns()); err != nil {
return fmt.Errorf("update STUNs: %w", err)
}
var stunTurn []*stun.URI
stunTurn = append(stunTurn, e.STUNs...)
stunTurn = append(stunTurn, e.TURNs...)
e.stunTurn.Store(stunTurn)
if err := e.handleRelayUpdate(wCfg.GetRelay()); err != nil {
return err
}
if err := e.handleFlowUpdate(wCfg.GetFlow()); err != nil {
return fmt.Errorf("handle the flow configuration: %w", err)
}
if err := e.PopulateNetbirdConfig(wCfg, nil); err != nil {
log.Warnf("Failed to update DNS server config: %v", err)
}
// todo update signal
return nil
}
// persistSyncResponse stores the full sync response so it can be restored on the next
// startup. Persistence is enabled only when syncStore is set. The dedicated syncRespMux
// (not syncMsgMux) is held for the whole Set so the store cannot be cleared (disabled /
// engine close) mid-call and have this write resurrect a file that was just removed.
func (e *Engine) persistSyncResponse(update *mgmProto.SyncResponse) {
e.syncRespMux.RLock()
defer e.syncRespMux.RUnlock()
if e.syncStore == nil {
return
}
if err := e.syncStore.Set(update); err != nil {
log.Errorf("failed to persist sync response: %v", err)
return
}
log.Debugf("sync response persisted with serial %d", update.GetNetworkMap().GetSerial())
}
func (e *Engine) handleRelayUpdate(update *mgmProto.RelayConfig) error {
if update != nil {
// when we receive token we expect valid address list too

View File

@@ -1016,14 +1016,17 @@ func (d *Status) GetRelayStates() []relay.ProbeResult {
return d.relayStates
}
// extend the list of stun, turn servers with relay address
// extend the list of stun, turn servers with the relay server connections
relayStates := slices.Clone(d.relayStates)
// if the server connection is not established then we will use the general address
// in case of connection we will use the instance specific address
instanceAddr, _, err := d.relayMgr.RelayInstanceAddress()
if err != nil {
// TODO add their status
states := d.relayMgr.RelayStates()
if len(states) == 0 {
// no relay connection tracked yet; surface configured servers as
// unavailable with the real reconnect error when known
err := relayClient.ErrRelayClientNotConnected
if connErr := d.relayMgr.RelayConnectError(); connErr != nil {
err = connErr
}
for _, r := range d.relayMgr.ServerURLs() {
relayStates = append(relayStates, relay.ProbeResult{
URI: r,
@@ -1033,10 +1036,14 @@ func (d *Status) GetRelayStates() []relay.ProbeResult {
return relayStates
}
relayState := relay.ProbeResult{
URI: instanceAddr,
for _, rs := range states {
relayStates = append(relayStates, relay.ProbeResult{
URI: rs.URL,
Err: rs.Err,
Transport: rs.Transport,
})
}
return append(relayStates, relayState)
return relayStates
}
func (d *Status) ForwardingRules() []firewall.ForwardRule {
@@ -1397,6 +1404,7 @@ func (fs FullStatus) ToProto() *proto.FullStatus {
pbRelayState := &proto.RelayState{
URI: relayState.URI,
Available: relayState.Err == nil,
Transport: relayState.Transport,
}
if err := relayState.Err; err != nil {
pbRelayState.Error = err.Error()

View File

@@ -32,6 +32,9 @@ type ProbeResult struct {
URI string
Err error
Addr string
// Transport is the negotiated relay transport, empty
// for stun/turn probes or when not connected.
Transport string
}
type StunTurnProbe struct {

View File

@@ -1840,6 +1840,7 @@ type RelayState struct {
URI string `protobuf:"bytes,1,opt,name=URI,proto3" json:"URI,omitempty"`
Available bool `protobuf:"varint,2,opt,name=available,proto3" json:"available,omitempty"`
Error string `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"`
Transport string `protobuf:"bytes,4,opt,name=transport,proto3" json:"transport,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
@@ -1895,6 +1896,13 @@ func (x *RelayState) GetError() string {
return ""
}
func (x *RelayState) GetTransport() string {
if x != nil {
return x.Transport
}
return ""
}
type NSGroupState struct {
state protoimpl.MessageState `protogen:"open.v1"`
Servers []string `protobuf:"bytes,1,rep,name=servers,proto3" json:"servers,omitempty"`
@@ -6423,12 +6431,13 @@ const file_daemon_proto_rawDesc = "" +
"\x0fManagementState\x12\x10\n" +
"\x03URL\x18\x01 \x01(\tR\x03URL\x12\x1c\n" +
"\tconnected\x18\x02 \x01(\bR\tconnected\x12\x14\n" +
"\x05error\x18\x03 \x01(\tR\x05error\"R\n" +
"\x05error\x18\x03 \x01(\tR\x05error\"p\n" +
"\n" +
"RelayState\x12\x10\n" +
"\x03URI\x18\x01 \x01(\tR\x03URI\x12\x1c\n" +
"\tavailable\x18\x02 \x01(\bR\tavailable\x12\x14\n" +
"\x05error\x18\x03 \x01(\tR\x05error\"r\n" +
"\x05error\x18\x03 \x01(\tR\x05error\x12\x1c\n" +
"\ttransport\x18\x04 \x01(\tR\ttransport\"r\n" +
"\fNSGroupState\x12\x18\n" +
"\aservers\x18\x01 \x03(\tR\aservers\x12\x18\n" +
"\adomains\x18\x02 \x03(\tR\adomains\x12\x18\n" +

View File

@@ -371,6 +371,7 @@ message RelayState {
string URI = 1;
bool available = 2;
string error = 3;
string transport = 4;
}
message NSGroupState {

View File

@@ -98,6 +98,7 @@ type RelayStateOutputDetail struct {
URI string `json:"uri" yaml:"uri"`
Available bool `json:"available" yaml:"available"`
Error string `json:"error" yaml:"error"`
Transport string `json:"transport,omitempty" yaml:"transport,omitempty"`
}
type RelayStateOutput struct {
@@ -219,7 +220,8 @@ func mapRelays(relays []*proto.RelayState) RelayStateOutput {
RelayStateOutputDetail{
URI: relay.URI,
Available: available,
Error: relay.GetError(),
Error: relayErrorString(relay.GetError()),
Transport: relay.GetTransport(),
},
)
@@ -235,6 +237,12 @@ func mapRelays(relays []*proto.RelayState) RelayStateOutput {
}
}
// relayErrorString flattens a newline-joined aggregated relay error onto a
// single line for status output.
func relayErrorString(s string) string {
return strings.ReplaceAll(s, "\n", "; ")
}
func mapNSGroups(servers []*proto.NSGroupState) []NsServerGroupStateOutput {
mappedNSGroups := make([]NsServerGroupStateOutput, 0, len(servers))
for _, pbNsGroupServer := range servers {
@@ -441,6 +449,8 @@ func (o *OutputOverview) GeneralSummary(showURL bool, showRelays bool, showNameS
available = "Unavailable"
reason = fmt.Sprintf(", reason: %s", relay.Error)
}
} else if relay.Transport != "" {
available = fmt.Sprintf("%s via %s", available, relay.Transport)
}
relaysString += fmt.Sprintf("\n [%s] is %s%s", relay.URI, available, reason)

View File

@@ -647,3 +647,13 @@ func TestTimeAgo(t *testing.T) {
})
}
}
func TestMapRelaysTransport(t *testing.T) {
out := mapRelays([]*proto.RelayState{
{URI: "rels://relay.example:443", Available: true, Transport: "quic"},
{URI: "rels://relay2.example:443", Available: true, Transport: "ws"},
})
require.Len(t, out.Details, 2)
assert.Equal(t, "quic", out.Details[0].Transport)
assert.Equal(t, "ws", out.Details[1].Transport)
}

View File

@@ -6,6 +6,7 @@ import (
"time"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
@@ -119,8 +120,8 @@ func NewMetrics(ctx context.Context, meter metric.Meter) (*Metrics, error) {
}
// PeerConnected increments the number of connected peers and increments number of idle connections
func (m *Metrics) PeerConnected(id string) {
m.peers.Add(m.ctx, 1)
func (m *Metrics) PeerConnected(id, transport string) {
m.peers.Add(m.ctx, 1, metric.WithAttributes(attribute.String("transport", transport)))
m.mutexActivity.Lock()
defer m.mutexActivity.Unlock()
@@ -138,8 +139,8 @@ func (m *Metrics) RecordPeerStoreTime(duration time.Duration) {
}
// PeerDisconnected decrements the number of connected peers and decrements number of idle or active connections
func (m *Metrics) PeerDisconnected(id string) {
m.peers.Add(m.ctx, -1)
func (m *Metrics) PeerDisconnected(id, transport string) {
m.peers.Add(m.ctx, -1, metric.WithAttributes(attribute.String("transport", transport)))
m.mutexActivity.Lock()
defer m.mutexActivity.Unlock()

View File

@@ -11,4 +11,6 @@ type Conn interface {
Write(ctx context.Context, b []byte) (n int, err error)
RemoteAddr() net.Addr
Close() error
// Protocol returns the transport name.
Protocol() string
}

View File

@@ -42,6 +42,11 @@ func (c *Conn) RemoteAddr() net.Addr {
return c.session.RemoteAddr()
}
// Protocol returns the transport name for this connection.
func (c *Conn) Protocol() string {
return "quic"
}
func (c *Conn) Close() error {
c.closedMu.Lock()
if c.closed {

View File

@@ -64,6 +64,11 @@ func (c *Conn) RemoteAddr() net.Addr {
return c.rAddr
}
// Protocol returns the transport name for this connection.
func (c *Conn) Protocol() string {
return "ws"
}
func (c *Conn) Close() error {
c.closedMu.Lock()
c.closed = true

View File

@@ -154,15 +154,16 @@ func (r *Relay) Accept(conn listener.Conn) {
}
r.notifier.PeerCameOnline(peer.ID())
transport := conn.Protocol()
r.metrics.RecordPeerStoreTime(time.Since(storeTime))
r.metrics.PeerConnected(peer.String())
r.metrics.PeerConnected(peer.String(), transport)
go func() {
peer.Work()
if deleted := r.store.DeletePeer(peer); deleted {
r.notifier.PeerWentOffline(peer.ID())
}
peer.log.Debugf("relay connection closed")
r.metrics.PeerDisconnected(peer.String())
r.metrics.PeerDisconnected(peer.String(), transport)
}()
if err := h.handshakeResponse(hsCtx); err != nil {

View File

@@ -145,6 +145,11 @@ func (cc *connContainer) close() {
}
}
// transportConn is implemented by relay connections that know their transport.
type transportConn interface {
Protocol() string
}
// Client is a client for the relay server. It is responsible for establishing a connection to the relay server and
// managing connections to other peers. All exported functions are safe to call concurrently. After close the connection,
// the client can be reused by calling Connect again. When the client is closed, all connections are closed too.
@@ -182,6 +187,18 @@ type Client struct {
// datagramFallbackTriggered guards a single fallback per connection so a
// burst of oversized datagrams triggers one reconnect, not many.
datagramFallbackTriggered atomic.Bool
// transport is the negotiated relay transport of the
// current connection, guarded by mu.
transport string
}
// Transport returns the negotiated relay transport of the current connection,
// or an empty string when not connected.
func (c *Client) Transport() string {
c.mu.Lock()
defer c.mu.Unlock()
return c.transport
}
// SetTransportFallback wires the shared datagram-transport fallback tracker.
@@ -402,6 +419,9 @@ func (c *Client) connect(ctx context.Context) (*RelayAddr, error) {
}
c.relayConn = conn
c.datagramFallbackTriggered.Store(false)
if tc, ok := conn.(transportConn); ok {
c.transport = tc.Protocol()
}
instanceURL, err := c.handShake(ctx)
if err != nil {
@@ -792,6 +812,7 @@ func (c *Client) close(gracefullyExit bool) error {
return nil
}
c.serviceIsRunning = false
c.transport = ""
c.muInstanceURL.Lock()
c.instanceURL = nil

View File

@@ -57,6 +57,11 @@ func (c *Conn) Write(b []byte) (int, error) {
return len(b), nil
}
// Protocol returns the transport name for this connection.
func (c *Conn) Protocol() string {
return Network
}
func (c *Conn) RemoteAddr() net.Addr {
return c.session.RemoteAddr()
}

View File

@@ -2,7 +2,6 @@ package quic
import (
"context"
"errors"
"fmt"
"net"
"strings"
@@ -59,22 +58,16 @@ func (d Dialer) Dial(ctx context.Context, address, serverName string) (net.Conn,
udpConn, err := nbnet.ListenUDP("udp", &net.UDPAddr{Port: 0})
if err != nil {
log.Errorf("failed to listen on UDP: %s", err)
return nil, err
return nil, fmt.Errorf("listen udp: %w", err)
}
udpAddr, err := net.ResolveUDPAddr("udp", quicURL)
if err != nil {
log.Errorf("failed to resolve UDP address: %s", err)
return nil, err
return nil, fmt.Errorf("resolve %s: %w", quicURL, err)
}
session, err := quic.Dial(ctx, udpConn, udpAddr, tlsClientConfig, quicConfig)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil, err
}
log.Errorf("failed to dial to Relay server via QUIC '%s': %s", quicURL, err)
return nil, err
}

View File

@@ -3,6 +3,7 @@ package dialer
import (
"context"
"errors"
"fmt"
"net"
"time"
@@ -71,6 +72,7 @@ func (r *RaceDial) Dial(ctx context.Context) (net.Conn, error) {
connChan := make(chan dialResult, len(r.dialerFns))
winnerConn := make(chan net.Conn, 1)
errChan := make(chan error, 1)
abortCtx, abort := context.WithCancel(ctx)
defer abort()
@@ -78,11 +80,11 @@ func (r *RaceDial) Dial(ctx context.Context) (net.Conn, error) {
go r.dial(dfn, abortCtx, connChan)
}
go r.processResults(connChan, winnerConn, abort)
go r.processResults(connChan, winnerConn, errChan, abort)
conn, ok := <-winnerConn
if !ok {
return nil, errors.New("failed to dial to Relay server on any protocol")
return nil, <-errChan
}
return conn, nil
}
@@ -90,6 +92,7 @@ func (r *RaceDial) Dial(ctx context.Context) (net.Conn, error) {
// dialSequential tries each dialer in order, returning the first connection and
// falling back to the next on failure.
func (r *RaceDial) dialSequential(ctx context.Context) (net.Conn, error) {
var errs []error
for _, dfn := range r.dialerFns {
if err := ctx.Err(); err != nil {
return nil, err
@@ -103,12 +106,13 @@ func (r *RaceDial) dialSequential(ctx context.Context) (net.Conn, error) {
return nil, err
}
r.log.Errorf("failed to dial via %s: %s", dfn.Protocol(), err)
errs = append(errs, fmt.Errorf("%s: %w", dfn.Protocol(), err))
continue
}
r.log.Infof("successfully dialed via: %s", dfn.Protocol())
return conn, nil
}
return nil, errors.New("failed to dial to Relay server on any protocol")
return nil, dialErr(errs)
}
func (r *RaceDial) dial(dfn DialeFn, abortCtx context.Context, connChan chan dialResult) {
@@ -120,8 +124,9 @@ func (r *RaceDial) dial(dfn DialeFn, abortCtx context.Context, connChan chan dia
connChan <- dialResult{Conn: conn, Protocol: dfn.Protocol(), Err: err}
}
func (r *RaceDial) processResults(connChan chan dialResult, winnerConn chan net.Conn, abort context.CancelFunc) {
func (r *RaceDial) processResults(connChan chan dialResult, winnerConn chan net.Conn, errChan chan error, abort context.CancelFunc) {
var hasWinner bool
errsByProtocol := make(map[string]error)
for i := 0; i < len(r.dialerFns); i++ {
dr := <-connChan
if dr.Err != nil {
@@ -129,6 +134,7 @@ func (r *RaceDial) processResults(connChan chan dialResult, winnerConn chan net.
r.log.Infof("connection attempt aborted via: %s", dr.Protocol)
} else {
r.log.Errorf("failed to dial via %s: %s", dr.Protocol, dr.Err)
errsByProtocol[dr.Protocol] = fmt.Errorf("%s: %w", dr.Protocol, dr.Err)
}
continue
}
@@ -146,5 +152,29 @@ func (r *RaceDial) processResults(connChan chan dialResult, winnerConn chan net.
hasWinner = true
winnerConn <- dr.Conn
}
if !hasWinner {
errChan <- dialErr(r.orderedErrs(errsByProtocol))
}
close(winnerConn)
}
// orderedErrs returns the per-protocol errors in dialer order, so the combined
// error is stable regardless of which attempt failed first.
func (r *RaceDial) orderedErrs(byProtocol map[string]error) []error {
errs := make([]error, 0, len(byProtocol))
for _, dfn := range r.dialerFns {
if err, ok := byProtocol[dfn.Protocol()]; ok {
errs = append(errs, err)
}
}
return errs
}
// dialErr combines per-dialer failures, preserving the underlying reasons
// (e.g. "connection refused") rather than a generic message.
func dialErr(errs []error) error {
if len(errs) == 0 {
return errors.New("no relay transport available")
}
return errors.Join(errs...)
}

View File

@@ -33,6 +33,11 @@ func NewConn(wsConn *websocket.Conn, serverAddress string, underlying net.Conn)
}
}
// Protocol returns the transport name for this connection.
func (c *Conn) Protocol() string {
return Network
}
func (c *Conn) Read(b []byte) (n int, err error) {
t, ioReader, err := c.Conn.Reader(c.ctx)
if err != nil {

View File

@@ -22,7 +22,7 @@ type Dialer struct {
}
func (d Dialer) Protocol() string {
return "WS"
return Network
}
func (d Dialer) Dial(ctx context.Context, address, serverName string) (net.Conn, error) {
@@ -39,7 +39,12 @@ func (d Dialer) Dial(ctx context.Context, address, serverName string) (net.Conn,
if errors.Is(err, context.Canceled) {
return nil, err
}
log.Errorf("failed to dial to Relay server '%s': %s", wsURL, err)
// websocket.Dial wraps the cause in verbose layers; surface the
// underlying network error when present.
var opErr *net.OpError
if errors.As(err, &opErr) {
return nil, opErr
}
return nil, err
}
if resp.Body != nil {

View File

@@ -41,14 +41,14 @@ func TestGetDialers(t *testing.T) {
preferWS bool
want []string
}{
{name: "auto races quic and ws", mode: "auto", mtu: iface.DefaultMTU, want: []string{"quic", "WS"}},
{name: "ws pinned", mode: "ws", mtu: iface.DefaultMTU, want: []string{"WS"}},
{name: "auto races quic and ws", mode: "auto", mtu: iface.DefaultMTU, want: []string{"quic", "ws"}},
{name: "ws pinned", mode: "ws", mtu: iface.DefaultMTU, want: []string{"ws"}},
{name: "quic pinned", mode: "quic", mtu: iface.DefaultMTU, want: []string{"quic"}},
{name: "prefer-quic orders quic first", mode: "prefer-quic", mtu: iface.DefaultMTU, want: []string{"quic", "WS"}},
{name: "prefer-ws orders ws first", mode: "prefer-ws", mtu: iface.DefaultMTU, want: []string{"WS", "quic"}},
{name: "mtu above default forces ws", mode: "auto", mtu: iface.DefaultMTU + 100, want: []string{"WS"}},
{name: "sticky fallback forces ws in auto", mode: "auto", mtu: iface.DefaultMTU, preferWS: true, want: []string{"WS"}},
{name: "sticky fallback forces ws in prefer-quic", mode: "prefer-quic", mtu: iface.DefaultMTU, preferWS: true, want: []string{"WS"}},
{name: "prefer-quic orders quic first", mode: "prefer-quic", mtu: iface.DefaultMTU, want: []string{"quic", "ws"}},
{name: "prefer-ws orders ws first", mode: "prefer-ws", mtu: iface.DefaultMTU, want: []string{"ws", "quic"}},
{name: "mtu above default forces ws", mode: "auto", mtu: iface.DefaultMTU + 100, want: []string{"ws"}},
{name: "sticky fallback forces ws in auto", mode: "auto", mtu: iface.DefaultMTU, preferWS: true, want: []string{"ws"}},
{name: "sticky fallback forces ws in prefer-quic", mode: "prefer-quic", mtu: iface.DefaultMTU, preferWS: true, want: []string{"ws"}},
{name: "quic pin overrides sticky fallback", mode: "quic", mtu: iface.DefaultMTU, preferWS: true, want: []string{"quic"}},
}
@@ -91,11 +91,11 @@ func TestStickyFallbackAfterDatagramTooLarge(t *testing.T) {
}
// First dial races both transports.
assert.Equal(t, []string{"quic", "WS"}, protocols(c.getDialers(transportModeFromEnv())))
assert.Equal(t, []string{"quic", "ws"}, protocols(c.getDialers(transportModeFromEnv())))
// An oversized datagram records the fallback for this server.
c.onDatagramTooLarge(&closeTrackingConn{}, netErr.ErrDatagramTooLarge)
// The reconnect now sticks to WebSocket.
assert.Equal(t, []string{"WS"}, protocols(c.getDialers(transportModeFromEnv())))
assert.Equal(t, []string{"ws"}, protocols(c.getDialers(transportModeFromEnv())))
}

View File

@@ -2,6 +2,7 @@ package client
import (
"context"
"sync/atomic"
"time"
"github.com/cenkalti/backoff/v4"
@@ -20,6 +21,10 @@ type Guard struct {
// maxBackoffInterval caps the exponential backoff between reconnect
// attempts.
maxBackoffInterval time.Duration
// lastErr is the error from the most recent failed reconnect attempt,
// surfaced as the home relay status while disconnected.
lastErr atomic.Pointer[error]
}
// NewGuard creates a new guard for the relay client. A non-positive
@@ -37,6 +42,19 @@ func NewGuard(sp *ServerPicker, maxBackoffInterval time.Duration) *Guard {
return g
}
// LastError returns the error from the most recent failed reconnect attempt, or
// nil if reconnection last succeeded.
func (g *Guard) LastError() error {
if p := g.lastErr.Load(); p != nil {
return *p
}
return nil
}
func (g *Guard) setLastError(err error) {
g.lastErr.Store(&err)
}
// StartReconnectTrys is called when the relay client is disconnected from the relay server.
// It attempts to reconnect to the relay server. The function first tries a quick reconnect
// to the same server that was used before, if the server URL is still valid. If the quick
@@ -63,6 +81,7 @@ func (g *Guard) StartReconnectTrys(ctx context.Context, relayClient *Client) {
case <-ticker.C:
if err := g.retry(ctx); err != nil {
log.Errorf("failed to pick new Relay server: %s", err)
g.setLastError(err)
continue
}
return
@@ -89,6 +108,7 @@ func (g *Guard) tryToQuickReconnect(parentCtx context.Context, rc *Client) bool
if err := rc.Connect(parentCtx); err != nil {
log.Errorf("failed to reconnect to relay server: %s", err)
g.setLastError(err)
return false
}
return true
@@ -100,6 +120,7 @@ func (g *Guard) retry(ctx context.Context) error {
if err != nil {
return err
}
g.setLastError(nil)
// prevent to work with a deprecated Relay client instance
g.drainRelayClientChan()
@@ -125,6 +146,7 @@ func (g *Guard) isServerURLStillValid(rc *Client) bool {
}
func (g *Guard) notifyReconnected() {
g.setLastError(nil)
select {
case g.OnReconnected <- struct{}{}:
default:

View File

@@ -130,6 +130,9 @@ func (m *Manager) Serve() error {
client, err := m.serverPicker.PickServer(m.ctx)
if err != nil {
// record the initial failure so status shows the real reason before
// the guard's first retry tick
m.reconnectGuard.setLastError(err)
go m.reconnectGuard.StartReconnectTrys(m.ctx, nil)
} else {
m.storeClient(client)
@@ -242,6 +245,67 @@ func (m *Manager) ServerURLs() []string {
return m.serverPicker.ServerURLs.Load().([]string)
}
// RelayConnectError returns the error from the most recent failed home relay
// reconnect attempt, or nil if the relay last connected successfully.
func (m *Manager) RelayConnectError() error {
return m.reconnectGuard.LastError()
}
// RelayConnState is the connection state of a single relay server.
type RelayConnState struct {
// URL is the server's instance address when connected, otherwise the
// configured server URL.
URL string
// Transport is the negotiated transport, empty if not connected.
Transport string
// Err is set when the relay is not connected.
Err error
}
// RelayStates returns the connection state of the home relay and every foreign
// relay the manager currently tracks.
func (m *Manager) RelayStates() []RelayConnState {
var states []RelayConnState
m.relayClientMu.RLock()
home := m.relayClient
m.relayClientMu.RUnlock()
if home != nil {
st := relayConnState(home)
// The home relay reconnects through the guard, so the real failure
// reason lives there rather than on the (stale) client.
if st.Err != nil {
if gErr := m.reconnectGuard.LastError(); gErr != nil {
st.Err = gErr
}
}
states = append(states, st)
}
// Snapshot the tracks, then query each outside the map lock: a track can be
// held by an in-progress Connect, and blocking on it must not stall other
// relay operations.
m.relayClientsMutex.RLock()
tracks := make([]*RelayTrack, 0, len(m.relayClients))
for _, rt := range m.relayClients {
tracks = append(tracks, rt)
}
m.relayClientsMutex.RUnlock()
// Only connected foreign relays carry state; a failed connect is evicted
// immediately (openConnVia), so there is no error state to surface.
for _, rt := range tracks {
rt.RLock()
rc := rt.relayClient
rt.RUnlock()
if rc != nil {
states = append(states, relayConnState(rc))
}
}
return states
}
// HasRelayAddress returns true if the manager is serving. With this method can check if the peer can communicate with
// Relay service.
func (m *Manager) HasRelayAddress() bool {
@@ -460,3 +524,11 @@ func (m *Manager) notifyOnDisconnectListeners(serverAddress string) {
}
delete(m.onDisconnectedListeners, serverAddress)
}
func relayConnState(c *Client) RelayConnState {
addr, err := c.ServerInstanceURL()
if err != nil {
return RelayConnState{URL: c.connectionURL, Err: err}
}
return RelayConnState{URL: addr, Transport: c.Transport()}
}

View File

@@ -40,6 +40,7 @@ func (sp *ServerPicker) PickServer(parentCtx context.Context) (*Client, error) {
connResultChan := make(chan connResult, totalServers)
successChan := make(chan connResult, 1)
errChan := make(chan error, 1)
concurrentLimiter := make(chan struct{}, maxConcurrentServers)
log.Debugf("pick server from list: %v", sp.ServerURLs.Load().([]string))
@@ -54,17 +55,17 @@ func (sp *ServerPicker) PickServer(parentCtx context.Context) (*Client, error) {
}(url)
}
go sp.processConnResults(connResultChan, successChan)
go sp.processConnResults(connResultChan, successChan, errChan)
select {
case cr, ok := <-successChan:
if !ok {
return nil, errors.New("failed to connect to any relay server: all attempts failed")
return nil, <-errChan
}
log.Infof("chosen home Relay server: %s", cr.Url)
return cr.RelayClient, nil
case <-ctx.Done():
return nil, fmt.Errorf("failed to connect to any relay server: %w", ctx.Err())
return nil, fmt.Errorf("connect to relay server: %w", ctx.Err())
}
}
@@ -80,12 +81,14 @@ func (sp *ServerPicker) startConnection(ctx context.Context, resultChan chan con
}
}
func (sp *ServerPicker) processConnResults(resultChan chan connResult, successChan chan connResult) {
func (sp *ServerPicker) processConnResults(resultChan chan connResult, successChan chan connResult, errChan chan error) {
var hasSuccess bool
var errs []error
for numOfResults := 0; numOfResults < cap(resultChan); numOfResults++ {
cr := <-resultChan
if cr.Err != nil {
log.Tracef("failed to connect to Relay server: %s: %v", cr.Url, cr.Err)
errs = append(errs, cr.Err)
continue
}
log.Infof("connected to Relay server: %s", cr.Url)
@@ -101,5 +104,16 @@ func (sp *ServerPicker) processConnResults(resultChan chan connResult, successCh
hasSuccess = true
successChan <- cr
}
if !hasSuccess {
errChan <- pickErr(errs)
}
close(successChan)
}
// pickErr combines per-server connection failures into a single error.
func pickErr(errs []error) error {
if len(errs) == 0 {
return errors.New("no relay server available")
}
return errors.Join(errs...)
}