mirror of
https://github.com/netbirdio/netbird.git
synced 2026-06-10 09:59:55 +00:00
Compare commits
1 Commits
main
...
relay-tran
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9fe4804a33 |
@@ -880,25 +880,62 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
e.handleAutoUpdateVersion(update.NetworkMap.PeerConfig.AutoUpdate)
|
||||
}
|
||||
|
||||
if err := e.updateNetbirdConfig(update.GetNetbirdConfig()); err != nil {
|
||||
return err
|
||||
}
|
||||
if update.GetNetbirdConfig() != nil {
|
||||
wCfg := update.GetNetbirdConfig()
|
||||
err := e.updateTURNs(wCfg.GetTurns())
|
||||
if err != nil {
|
||||
return fmt.Errorf("update TURNs: %w", err)
|
||||
}
|
||||
|
||||
// Posture checks are bound to the network map presence:
|
||||
// NetworkMap != nil, checks present -> apply the received checks
|
||||
// NetworkMap != nil, checks nil -> posture checks were removed, clear them
|
||||
// NetworkMap == nil -> config-only update (e.g. relay token rotation),
|
||||
// leave the previously applied checks untouched
|
||||
nm := update.GetNetworkMap()
|
||||
if nm == nil {
|
||||
return nil
|
||||
err = e.updateSTUNs(wCfg.GetStuns())
|
||||
if err != nil {
|
||||
return fmt.Errorf("update STUNs: %w", err)
|
||||
}
|
||||
|
||||
var stunTurn []*stun.URI
|
||||
stunTurn = append(stunTurn, e.STUNs...)
|
||||
stunTurn = append(stunTurn, e.TURNs...)
|
||||
e.stunTurn.Store(stunTurn)
|
||||
|
||||
err = e.handleRelayUpdate(wCfg.GetRelay())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = e.handleFlowUpdate(wCfg.GetFlow())
|
||||
if err != nil {
|
||||
return fmt.Errorf("handle the flow configuration: %w", err)
|
||||
}
|
||||
|
||||
if err := e.PopulateNetbirdConfig(wCfg, nil); err != nil {
|
||||
log.Warnf("Failed to update DNS server config: %v", err)
|
||||
}
|
||||
|
||||
// todo update signal
|
||||
}
|
||||
|
||||
if err := e.updateChecksIfNew(update.Checks); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
e.persistSyncResponse(update)
|
||||
nm := update.GetNetworkMap()
|
||||
if nm == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Persist sync response under the dedicated lock (syncRespMux), not under syncMsgMux.
|
||||
// A non-nil syncStore is what marks persistence as enabled. Hold the lock for
|
||||
// the whole Set so the store cannot be cleared (disabled / engine close)
|
||||
// mid-call and have this write resurrect a file that was just removed.
|
||||
e.syncRespMux.RLock()
|
||||
if e.syncStore != nil {
|
||||
if err := e.syncStore.Set(update); err != nil {
|
||||
log.Errorf("failed to persist sync response: %v", err)
|
||||
} else {
|
||||
log.Debugf("sync response persisted with serial %d", nm.GetSerial())
|
||||
}
|
||||
}
|
||||
e.syncRespMux.RUnlock()
|
||||
|
||||
// only apply new changes and ignore old ones
|
||||
if err := e.updateNetworkMap(nm); err != nil {
|
||||
@@ -910,64 +947,6 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateNetbirdConfig applies the management-provided NetBird configuration:
|
||||
// STUN/TURN and relay servers, flow logging and DNS settings. A nil config is a no-op,
|
||||
// which is the case for sync updates carrying only a network map.
|
||||
func (e *Engine) updateNetbirdConfig(wCfg *mgmProto.NetbirdConfig) error {
|
||||
if wCfg == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := e.updateTURNs(wCfg.GetTurns()); err != nil {
|
||||
return fmt.Errorf("update TURNs: %w", err)
|
||||
}
|
||||
|
||||
if err := e.updateSTUNs(wCfg.GetStuns()); err != nil {
|
||||
return fmt.Errorf("update STUNs: %w", err)
|
||||
}
|
||||
|
||||
var stunTurn []*stun.URI
|
||||
stunTurn = append(stunTurn, e.STUNs...)
|
||||
stunTurn = append(stunTurn, e.TURNs...)
|
||||
e.stunTurn.Store(stunTurn)
|
||||
|
||||
if err := e.handleRelayUpdate(wCfg.GetRelay()); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := e.handleFlowUpdate(wCfg.GetFlow()); err != nil {
|
||||
return fmt.Errorf("handle the flow configuration: %w", err)
|
||||
}
|
||||
|
||||
if err := e.PopulateNetbirdConfig(wCfg, nil); err != nil {
|
||||
log.Warnf("Failed to update DNS server config: %v", err)
|
||||
}
|
||||
|
||||
// todo update signal
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// persistSyncResponse stores the full sync response so it can be restored on the next
|
||||
// startup. Persistence is enabled only when syncStore is set. The dedicated syncRespMux
|
||||
// (not syncMsgMux) is held for the whole Set so the store cannot be cleared (disabled /
|
||||
// engine close) mid-call and have this write resurrect a file that was just removed.
|
||||
func (e *Engine) persistSyncResponse(update *mgmProto.SyncResponse) {
|
||||
e.syncRespMux.RLock()
|
||||
defer e.syncRespMux.RUnlock()
|
||||
|
||||
if e.syncStore == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if err := e.syncStore.Set(update); err != nil {
|
||||
log.Errorf("failed to persist sync response: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
log.Debugf("sync response persisted with serial %d", update.GetNetworkMap().GetSerial())
|
||||
}
|
||||
|
||||
func (e *Engine) handleRelayUpdate(update *mgmProto.RelayConfig) error {
|
||||
if update != nil {
|
||||
// when we receive token we expect valid address list too
|
||||
|
||||
@@ -1016,14 +1016,17 @@ func (d *Status) GetRelayStates() []relay.ProbeResult {
|
||||
return d.relayStates
|
||||
}
|
||||
|
||||
// extend the list of stun, turn servers with relay address
|
||||
// extend the list of stun, turn servers with the relay server connections
|
||||
relayStates := slices.Clone(d.relayStates)
|
||||
|
||||
// if the server connection is not established then we will use the general address
|
||||
// in case of connection we will use the instance specific address
|
||||
instanceAddr, _, err := d.relayMgr.RelayInstanceAddress()
|
||||
if err != nil {
|
||||
// TODO add their status
|
||||
states := d.relayMgr.RelayStates()
|
||||
if len(states) == 0 {
|
||||
// no relay connection tracked yet; surface configured servers as
|
||||
// unavailable with the real reconnect error when known
|
||||
err := relayClient.ErrRelayClientNotConnected
|
||||
if connErr := d.relayMgr.RelayConnectError(); connErr != nil {
|
||||
err = connErr
|
||||
}
|
||||
for _, r := range d.relayMgr.ServerURLs() {
|
||||
relayStates = append(relayStates, relay.ProbeResult{
|
||||
URI: r,
|
||||
@@ -1033,10 +1036,14 @@ func (d *Status) GetRelayStates() []relay.ProbeResult {
|
||||
return relayStates
|
||||
}
|
||||
|
||||
relayState := relay.ProbeResult{
|
||||
URI: instanceAddr,
|
||||
for _, rs := range states {
|
||||
relayStates = append(relayStates, relay.ProbeResult{
|
||||
URI: rs.URL,
|
||||
Err: rs.Err,
|
||||
Transport: rs.Transport,
|
||||
})
|
||||
}
|
||||
return append(relayStates, relayState)
|
||||
return relayStates
|
||||
}
|
||||
|
||||
func (d *Status) ForwardingRules() []firewall.ForwardRule {
|
||||
@@ -1397,6 +1404,7 @@ func (fs FullStatus) ToProto() *proto.FullStatus {
|
||||
pbRelayState := &proto.RelayState{
|
||||
URI: relayState.URI,
|
||||
Available: relayState.Err == nil,
|
||||
Transport: relayState.Transport,
|
||||
}
|
||||
if err := relayState.Err; err != nil {
|
||||
pbRelayState.Error = err.Error()
|
||||
|
||||
@@ -32,6 +32,9 @@ type ProbeResult struct {
|
||||
URI string
|
||||
Err error
|
||||
Addr string
|
||||
// Transport is the negotiated relay transport, empty
|
||||
// for stun/turn probes or when not connected.
|
||||
Transport string
|
||||
}
|
||||
|
||||
type StunTurnProbe struct {
|
||||
|
||||
@@ -1840,6 +1840,7 @@ type RelayState struct {
|
||||
URI string `protobuf:"bytes,1,opt,name=URI,proto3" json:"URI,omitempty"`
|
||||
Available bool `protobuf:"varint,2,opt,name=available,proto3" json:"available,omitempty"`
|
||||
Error string `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"`
|
||||
Transport string `protobuf:"bytes,4,opt,name=transport,proto3" json:"transport,omitempty"`
|
||||
unknownFields protoimpl.UnknownFields
|
||||
sizeCache protoimpl.SizeCache
|
||||
}
|
||||
@@ -1895,6 +1896,13 @@ func (x *RelayState) GetError() string {
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *RelayState) GetTransport() string {
|
||||
if x != nil {
|
||||
return x.Transport
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type NSGroupState struct {
|
||||
state protoimpl.MessageState `protogen:"open.v1"`
|
||||
Servers []string `protobuf:"bytes,1,rep,name=servers,proto3" json:"servers,omitempty"`
|
||||
@@ -6423,12 +6431,13 @@ const file_daemon_proto_rawDesc = "" +
|
||||
"\x0fManagementState\x12\x10\n" +
|
||||
"\x03URL\x18\x01 \x01(\tR\x03URL\x12\x1c\n" +
|
||||
"\tconnected\x18\x02 \x01(\bR\tconnected\x12\x14\n" +
|
||||
"\x05error\x18\x03 \x01(\tR\x05error\"R\n" +
|
||||
"\x05error\x18\x03 \x01(\tR\x05error\"p\n" +
|
||||
"\n" +
|
||||
"RelayState\x12\x10\n" +
|
||||
"\x03URI\x18\x01 \x01(\tR\x03URI\x12\x1c\n" +
|
||||
"\tavailable\x18\x02 \x01(\bR\tavailable\x12\x14\n" +
|
||||
"\x05error\x18\x03 \x01(\tR\x05error\"r\n" +
|
||||
"\x05error\x18\x03 \x01(\tR\x05error\x12\x1c\n" +
|
||||
"\ttransport\x18\x04 \x01(\tR\ttransport\"r\n" +
|
||||
"\fNSGroupState\x12\x18\n" +
|
||||
"\aservers\x18\x01 \x03(\tR\aservers\x12\x18\n" +
|
||||
"\adomains\x18\x02 \x03(\tR\adomains\x12\x18\n" +
|
||||
|
||||
@@ -371,6 +371,7 @@ message RelayState {
|
||||
string URI = 1;
|
||||
bool available = 2;
|
||||
string error = 3;
|
||||
string transport = 4;
|
||||
}
|
||||
|
||||
message NSGroupState {
|
||||
|
||||
@@ -98,6 +98,7 @@ type RelayStateOutputDetail struct {
|
||||
URI string `json:"uri" yaml:"uri"`
|
||||
Available bool `json:"available" yaml:"available"`
|
||||
Error string `json:"error" yaml:"error"`
|
||||
Transport string `json:"transport,omitempty" yaml:"transport,omitempty"`
|
||||
}
|
||||
|
||||
type RelayStateOutput struct {
|
||||
@@ -219,7 +220,8 @@ func mapRelays(relays []*proto.RelayState) RelayStateOutput {
|
||||
RelayStateOutputDetail{
|
||||
URI: relay.URI,
|
||||
Available: available,
|
||||
Error: relay.GetError(),
|
||||
Error: relayErrorString(relay.GetError()),
|
||||
Transport: relay.GetTransport(),
|
||||
},
|
||||
)
|
||||
|
||||
@@ -235,6 +237,12 @@ func mapRelays(relays []*proto.RelayState) RelayStateOutput {
|
||||
}
|
||||
}
|
||||
|
||||
// relayErrorString flattens a newline-joined aggregated relay error onto a
|
||||
// single line for status output.
|
||||
func relayErrorString(s string) string {
|
||||
return strings.ReplaceAll(s, "\n", "; ")
|
||||
}
|
||||
|
||||
func mapNSGroups(servers []*proto.NSGroupState) []NsServerGroupStateOutput {
|
||||
mappedNSGroups := make([]NsServerGroupStateOutput, 0, len(servers))
|
||||
for _, pbNsGroupServer := range servers {
|
||||
@@ -441,6 +449,8 @@ func (o *OutputOverview) GeneralSummary(showURL bool, showRelays bool, showNameS
|
||||
available = "Unavailable"
|
||||
reason = fmt.Sprintf(", reason: %s", relay.Error)
|
||||
}
|
||||
} else if relay.Transport != "" {
|
||||
available = fmt.Sprintf("%s via %s", available, relay.Transport)
|
||||
}
|
||||
|
||||
relaysString += fmt.Sprintf("\n [%s] is %s%s", relay.URI, available, reason)
|
||||
|
||||
@@ -647,3 +647,13 @@ func TestTimeAgo(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestMapRelaysTransport(t *testing.T) {
|
||||
out := mapRelays([]*proto.RelayState{
|
||||
{URI: "rels://relay.example:443", Available: true, Transport: "quic"},
|
||||
{URI: "rels://relay2.example:443", Available: true, Transport: "ws"},
|
||||
})
|
||||
require.Len(t, out.Details, 2)
|
||||
assert.Equal(t, "quic", out.Details[0].Transport)
|
||||
assert.Equal(t, "ws", out.Details[1].Transport)
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
@@ -119,8 +120,8 @@ func NewMetrics(ctx context.Context, meter metric.Meter) (*Metrics, error) {
|
||||
}
|
||||
|
||||
// PeerConnected increments the number of connected peers and increments number of idle connections
|
||||
func (m *Metrics) PeerConnected(id string) {
|
||||
m.peers.Add(m.ctx, 1)
|
||||
func (m *Metrics) PeerConnected(id, transport string) {
|
||||
m.peers.Add(m.ctx, 1, metric.WithAttributes(attribute.String("transport", transport)))
|
||||
m.mutexActivity.Lock()
|
||||
defer m.mutexActivity.Unlock()
|
||||
|
||||
@@ -138,8 +139,8 @@ func (m *Metrics) RecordPeerStoreTime(duration time.Duration) {
|
||||
}
|
||||
|
||||
// PeerDisconnected decrements the number of connected peers and decrements number of idle or active connections
|
||||
func (m *Metrics) PeerDisconnected(id string) {
|
||||
m.peers.Add(m.ctx, -1)
|
||||
func (m *Metrics) PeerDisconnected(id, transport string) {
|
||||
m.peers.Add(m.ctx, -1, metric.WithAttributes(attribute.String("transport", transport)))
|
||||
m.mutexActivity.Lock()
|
||||
defer m.mutexActivity.Unlock()
|
||||
|
||||
|
||||
@@ -11,4 +11,6 @@ type Conn interface {
|
||||
Write(ctx context.Context, b []byte) (n int, err error)
|
||||
RemoteAddr() net.Addr
|
||||
Close() error
|
||||
// Protocol returns the transport name.
|
||||
Protocol() string
|
||||
}
|
||||
|
||||
@@ -42,6 +42,11 @@ func (c *Conn) RemoteAddr() net.Addr {
|
||||
return c.session.RemoteAddr()
|
||||
}
|
||||
|
||||
// Protocol returns the transport name for this connection.
|
||||
func (c *Conn) Protocol() string {
|
||||
return "quic"
|
||||
}
|
||||
|
||||
func (c *Conn) Close() error {
|
||||
c.closedMu.Lock()
|
||||
if c.closed {
|
||||
|
||||
@@ -64,6 +64,11 @@ func (c *Conn) RemoteAddr() net.Addr {
|
||||
return c.rAddr
|
||||
}
|
||||
|
||||
// Protocol returns the transport name for this connection.
|
||||
func (c *Conn) Protocol() string {
|
||||
return "ws"
|
||||
}
|
||||
|
||||
func (c *Conn) Close() error {
|
||||
c.closedMu.Lock()
|
||||
c.closed = true
|
||||
|
||||
@@ -154,15 +154,16 @@ func (r *Relay) Accept(conn listener.Conn) {
|
||||
}
|
||||
r.notifier.PeerCameOnline(peer.ID())
|
||||
|
||||
transport := conn.Protocol()
|
||||
r.metrics.RecordPeerStoreTime(time.Since(storeTime))
|
||||
r.metrics.PeerConnected(peer.String())
|
||||
r.metrics.PeerConnected(peer.String(), transport)
|
||||
go func() {
|
||||
peer.Work()
|
||||
if deleted := r.store.DeletePeer(peer); deleted {
|
||||
r.notifier.PeerWentOffline(peer.ID())
|
||||
}
|
||||
peer.log.Debugf("relay connection closed")
|
||||
r.metrics.PeerDisconnected(peer.String())
|
||||
r.metrics.PeerDisconnected(peer.String(), transport)
|
||||
}()
|
||||
|
||||
if err := h.handshakeResponse(hsCtx); err != nil {
|
||||
|
||||
@@ -145,6 +145,11 @@ func (cc *connContainer) close() {
|
||||
}
|
||||
}
|
||||
|
||||
// transportConn is implemented by relay connections that know their transport.
|
||||
type transportConn interface {
|
||||
Protocol() string
|
||||
}
|
||||
|
||||
// Client is a client for the relay server. It is responsible for establishing a connection to the relay server and
|
||||
// managing connections to other peers. All exported functions are safe to call concurrently. After close the connection,
|
||||
// the client can be reused by calling Connect again. When the client is closed, all connections are closed too.
|
||||
@@ -182,6 +187,18 @@ type Client struct {
|
||||
// datagramFallbackTriggered guards a single fallback per connection so a
|
||||
// burst of oversized datagrams triggers one reconnect, not many.
|
||||
datagramFallbackTriggered atomic.Bool
|
||||
|
||||
// transport is the negotiated relay transport of the
|
||||
// current connection, guarded by mu.
|
||||
transport string
|
||||
}
|
||||
|
||||
// Transport returns the negotiated relay transport of the current connection,
|
||||
// or an empty string when not connected.
|
||||
func (c *Client) Transport() string {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.transport
|
||||
}
|
||||
|
||||
// SetTransportFallback wires the shared datagram-transport fallback tracker.
|
||||
@@ -402,6 +419,9 @@ func (c *Client) connect(ctx context.Context) (*RelayAddr, error) {
|
||||
}
|
||||
c.relayConn = conn
|
||||
c.datagramFallbackTriggered.Store(false)
|
||||
if tc, ok := conn.(transportConn); ok {
|
||||
c.transport = tc.Protocol()
|
||||
}
|
||||
|
||||
instanceURL, err := c.handShake(ctx)
|
||||
if err != nil {
|
||||
@@ -792,6 +812,7 @@ func (c *Client) close(gracefullyExit bool) error {
|
||||
return nil
|
||||
}
|
||||
c.serviceIsRunning = false
|
||||
c.transport = ""
|
||||
|
||||
c.muInstanceURL.Lock()
|
||||
c.instanceURL = nil
|
||||
|
||||
@@ -57,6 +57,11 @@ func (c *Conn) Write(b []byte) (int, error) {
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
// Protocol returns the transport name for this connection.
|
||||
func (c *Conn) Protocol() string {
|
||||
return Network
|
||||
}
|
||||
|
||||
func (c *Conn) RemoteAddr() net.Addr {
|
||||
return c.session.RemoteAddr()
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package quic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
@@ -59,22 +58,16 @@ func (d Dialer) Dial(ctx context.Context, address, serverName string) (net.Conn,
|
||||
|
||||
udpConn, err := nbnet.ListenUDP("udp", &net.UDPAddr{Port: 0})
|
||||
if err != nil {
|
||||
log.Errorf("failed to listen on UDP: %s", err)
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("listen udp: %w", err)
|
||||
}
|
||||
|
||||
udpAddr, err := net.ResolveUDPAddr("udp", quicURL)
|
||||
if err != nil {
|
||||
log.Errorf("failed to resolve UDP address: %s", err)
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("resolve %s: %w", quicURL, err)
|
||||
}
|
||||
|
||||
session, err := quic.Dial(ctx, udpConn, udpAddr, tlsClientConfig, quicConfig)
|
||||
if err != nil {
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return nil, err
|
||||
}
|
||||
log.Errorf("failed to dial to Relay server via QUIC '%s': %s", quicURL, err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package dialer
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
@@ -71,6 +72,7 @@ func (r *RaceDial) Dial(ctx context.Context) (net.Conn, error) {
|
||||
|
||||
connChan := make(chan dialResult, len(r.dialerFns))
|
||||
winnerConn := make(chan net.Conn, 1)
|
||||
errChan := make(chan error, 1)
|
||||
abortCtx, abort := context.WithCancel(ctx)
|
||||
defer abort()
|
||||
|
||||
@@ -78,11 +80,11 @@ func (r *RaceDial) Dial(ctx context.Context) (net.Conn, error) {
|
||||
go r.dial(dfn, abortCtx, connChan)
|
||||
}
|
||||
|
||||
go r.processResults(connChan, winnerConn, abort)
|
||||
go r.processResults(connChan, winnerConn, errChan, abort)
|
||||
|
||||
conn, ok := <-winnerConn
|
||||
if !ok {
|
||||
return nil, errors.New("failed to dial to Relay server on any protocol")
|
||||
return nil, <-errChan
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
@@ -90,6 +92,7 @@ func (r *RaceDial) Dial(ctx context.Context) (net.Conn, error) {
|
||||
// dialSequential tries each dialer in order, returning the first connection and
|
||||
// falling back to the next on failure.
|
||||
func (r *RaceDial) dialSequential(ctx context.Context) (net.Conn, error) {
|
||||
var errs []error
|
||||
for _, dfn := range r.dialerFns {
|
||||
if err := ctx.Err(); err != nil {
|
||||
return nil, err
|
||||
@@ -103,12 +106,13 @@ func (r *RaceDial) dialSequential(ctx context.Context) (net.Conn, error) {
|
||||
return nil, err
|
||||
}
|
||||
r.log.Errorf("failed to dial via %s: %s", dfn.Protocol(), err)
|
||||
errs = append(errs, fmt.Errorf("%s: %w", dfn.Protocol(), err))
|
||||
continue
|
||||
}
|
||||
r.log.Infof("successfully dialed via: %s", dfn.Protocol())
|
||||
return conn, nil
|
||||
}
|
||||
return nil, errors.New("failed to dial to Relay server on any protocol")
|
||||
return nil, dialErr(errs)
|
||||
}
|
||||
|
||||
func (r *RaceDial) dial(dfn DialeFn, abortCtx context.Context, connChan chan dialResult) {
|
||||
@@ -120,8 +124,9 @@ func (r *RaceDial) dial(dfn DialeFn, abortCtx context.Context, connChan chan dia
|
||||
connChan <- dialResult{Conn: conn, Protocol: dfn.Protocol(), Err: err}
|
||||
}
|
||||
|
||||
func (r *RaceDial) processResults(connChan chan dialResult, winnerConn chan net.Conn, abort context.CancelFunc) {
|
||||
func (r *RaceDial) processResults(connChan chan dialResult, winnerConn chan net.Conn, errChan chan error, abort context.CancelFunc) {
|
||||
var hasWinner bool
|
||||
errsByProtocol := make(map[string]error)
|
||||
for i := 0; i < len(r.dialerFns); i++ {
|
||||
dr := <-connChan
|
||||
if dr.Err != nil {
|
||||
@@ -129,6 +134,7 @@ func (r *RaceDial) processResults(connChan chan dialResult, winnerConn chan net.
|
||||
r.log.Infof("connection attempt aborted via: %s", dr.Protocol)
|
||||
} else {
|
||||
r.log.Errorf("failed to dial via %s: %s", dr.Protocol, dr.Err)
|
||||
errsByProtocol[dr.Protocol] = fmt.Errorf("%s: %w", dr.Protocol, dr.Err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
@@ -146,5 +152,29 @@ func (r *RaceDial) processResults(connChan chan dialResult, winnerConn chan net.
|
||||
hasWinner = true
|
||||
winnerConn <- dr.Conn
|
||||
}
|
||||
if !hasWinner {
|
||||
errChan <- dialErr(r.orderedErrs(errsByProtocol))
|
||||
}
|
||||
close(winnerConn)
|
||||
}
|
||||
|
||||
// orderedErrs returns the per-protocol errors in dialer order, so the combined
|
||||
// error is stable regardless of which attempt failed first.
|
||||
func (r *RaceDial) orderedErrs(byProtocol map[string]error) []error {
|
||||
errs := make([]error, 0, len(byProtocol))
|
||||
for _, dfn := range r.dialerFns {
|
||||
if err, ok := byProtocol[dfn.Protocol()]; ok {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
return errs
|
||||
}
|
||||
|
||||
// dialErr combines per-dialer failures, preserving the underlying reasons
|
||||
// (e.g. "connection refused") rather than a generic message.
|
||||
func dialErr(errs []error) error {
|
||||
if len(errs) == 0 {
|
||||
return errors.New("no relay transport available")
|
||||
}
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
@@ -33,6 +33,11 @@ func NewConn(wsConn *websocket.Conn, serverAddress string, underlying net.Conn)
|
||||
}
|
||||
}
|
||||
|
||||
// Protocol returns the transport name for this connection.
|
||||
func (c *Conn) Protocol() string {
|
||||
return Network
|
||||
}
|
||||
|
||||
func (c *Conn) Read(b []byte) (n int, err error) {
|
||||
t, ioReader, err := c.Conn.Reader(c.ctx)
|
||||
if err != nil {
|
||||
|
||||
@@ -22,7 +22,7 @@ type Dialer struct {
|
||||
}
|
||||
|
||||
func (d Dialer) Protocol() string {
|
||||
return "WS"
|
||||
return Network
|
||||
}
|
||||
|
||||
func (d Dialer) Dial(ctx context.Context, address, serverName string) (net.Conn, error) {
|
||||
@@ -39,7 +39,12 @@ func (d Dialer) Dial(ctx context.Context, address, serverName string) (net.Conn,
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return nil, err
|
||||
}
|
||||
log.Errorf("failed to dial to Relay server '%s': %s", wsURL, err)
|
||||
// websocket.Dial wraps the cause in verbose layers; surface the
|
||||
// underlying network error when present.
|
||||
var opErr *net.OpError
|
||||
if errors.As(err, &opErr) {
|
||||
return nil, opErr
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
if resp.Body != nil {
|
||||
|
||||
@@ -41,14 +41,14 @@ func TestGetDialers(t *testing.T) {
|
||||
preferWS bool
|
||||
want []string
|
||||
}{
|
||||
{name: "auto races quic and ws", mode: "auto", mtu: iface.DefaultMTU, want: []string{"quic", "WS"}},
|
||||
{name: "ws pinned", mode: "ws", mtu: iface.DefaultMTU, want: []string{"WS"}},
|
||||
{name: "auto races quic and ws", mode: "auto", mtu: iface.DefaultMTU, want: []string{"quic", "ws"}},
|
||||
{name: "ws pinned", mode: "ws", mtu: iface.DefaultMTU, want: []string{"ws"}},
|
||||
{name: "quic pinned", mode: "quic", mtu: iface.DefaultMTU, want: []string{"quic"}},
|
||||
{name: "prefer-quic orders quic first", mode: "prefer-quic", mtu: iface.DefaultMTU, want: []string{"quic", "WS"}},
|
||||
{name: "prefer-ws orders ws first", mode: "prefer-ws", mtu: iface.DefaultMTU, want: []string{"WS", "quic"}},
|
||||
{name: "mtu above default forces ws", mode: "auto", mtu: iface.DefaultMTU + 100, want: []string{"WS"}},
|
||||
{name: "sticky fallback forces ws in auto", mode: "auto", mtu: iface.DefaultMTU, preferWS: true, want: []string{"WS"}},
|
||||
{name: "sticky fallback forces ws in prefer-quic", mode: "prefer-quic", mtu: iface.DefaultMTU, preferWS: true, want: []string{"WS"}},
|
||||
{name: "prefer-quic orders quic first", mode: "prefer-quic", mtu: iface.DefaultMTU, want: []string{"quic", "ws"}},
|
||||
{name: "prefer-ws orders ws first", mode: "prefer-ws", mtu: iface.DefaultMTU, want: []string{"ws", "quic"}},
|
||||
{name: "mtu above default forces ws", mode: "auto", mtu: iface.DefaultMTU + 100, want: []string{"ws"}},
|
||||
{name: "sticky fallback forces ws in auto", mode: "auto", mtu: iface.DefaultMTU, preferWS: true, want: []string{"ws"}},
|
||||
{name: "sticky fallback forces ws in prefer-quic", mode: "prefer-quic", mtu: iface.DefaultMTU, preferWS: true, want: []string{"ws"}},
|
||||
{name: "quic pin overrides sticky fallback", mode: "quic", mtu: iface.DefaultMTU, preferWS: true, want: []string{"quic"}},
|
||||
}
|
||||
|
||||
@@ -91,11 +91,11 @@ func TestStickyFallbackAfterDatagramTooLarge(t *testing.T) {
|
||||
}
|
||||
|
||||
// First dial races both transports.
|
||||
assert.Equal(t, []string{"quic", "WS"}, protocols(c.getDialers(transportModeFromEnv())))
|
||||
assert.Equal(t, []string{"quic", "ws"}, protocols(c.getDialers(transportModeFromEnv())))
|
||||
|
||||
// An oversized datagram records the fallback for this server.
|
||||
c.onDatagramTooLarge(&closeTrackingConn{}, netErr.ErrDatagramTooLarge)
|
||||
|
||||
// The reconnect now sticks to WebSocket.
|
||||
assert.Equal(t, []string{"WS"}, protocols(c.getDialers(transportModeFromEnv())))
|
||||
assert.Equal(t, []string{"ws"}, protocols(c.getDialers(transportModeFromEnv())))
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
@@ -20,6 +21,10 @@ type Guard struct {
|
||||
// maxBackoffInterval caps the exponential backoff between reconnect
|
||||
// attempts.
|
||||
maxBackoffInterval time.Duration
|
||||
|
||||
// lastErr is the error from the most recent failed reconnect attempt,
|
||||
// surfaced as the home relay status while disconnected.
|
||||
lastErr atomic.Pointer[error]
|
||||
}
|
||||
|
||||
// NewGuard creates a new guard for the relay client. A non-positive
|
||||
@@ -37,6 +42,19 @@ func NewGuard(sp *ServerPicker, maxBackoffInterval time.Duration) *Guard {
|
||||
return g
|
||||
}
|
||||
|
||||
// LastError returns the error from the most recent failed reconnect attempt, or
|
||||
// nil if reconnection last succeeded.
|
||||
func (g *Guard) LastError() error {
|
||||
if p := g.lastErr.Load(); p != nil {
|
||||
return *p
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (g *Guard) setLastError(err error) {
|
||||
g.lastErr.Store(&err)
|
||||
}
|
||||
|
||||
// StartReconnectTrys is called when the relay client is disconnected from the relay server.
|
||||
// It attempts to reconnect to the relay server. The function first tries a quick reconnect
|
||||
// to the same server that was used before, if the server URL is still valid. If the quick
|
||||
@@ -63,6 +81,7 @@ func (g *Guard) StartReconnectTrys(ctx context.Context, relayClient *Client) {
|
||||
case <-ticker.C:
|
||||
if err := g.retry(ctx); err != nil {
|
||||
log.Errorf("failed to pick new Relay server: %s", err)
|
||||
g.setLastError(err)
|
||||
continue
|
||||
}
|
||||
return
|
||||
@@ -89,6 +108,7 @@ func (g *Guard) tryToQuickReconnect(parentCtx context.Context, rc *Client) bool
|
||||
|
||||
if err := rc.Connect(parentCtx); err != nil {
|
||||
log.Errorf("failed to reconnect to relay server: %s", err)
|
||||
g.setLastError(err)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
@@ -100,6 +120,7 @@ func (g *Guard) retry(ctx context.Context) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
g.setLastError(nil)
|
||||
|
||||
// prevent to work with a deprecated Relay client instance
|
||||
g.drainRelayClientChan()
|
||||
@@ -125,6 +146,7 @@ func (g *Guard) isServerURLStillValid(rc *Client) bool {
|
||||
}
|
||||
|
||||
func (g *Guard) notifyReconnected() {
|
||||
g.setLastError(nil)
|
||||
select {
|
||||
case g.OnReconnected <- struct{}{}:
|
||||
default:
|
||||
|
||||
@@ -130,6 +130,9 @@ func (m *Manager) Serve() error {
|
||||
|
||||
client, err := m.serverPicker.PickServer(m.ctx)
|
||||
if err != nil {
|
||||
// record the initial failure so status shows the real reason before
|
||||
// the guard's first retry tick
|
||||
m.reconnectGuard.setLastError(err)
|
||||
go m.reconnectGuard.StartReconnectTrys(m.ctx, nil)
|
||||
} else {
|
||||
m.storeClient(client)
|
||||
@@ -242,6 +245,67 @@ func (m *Manager) ServerURLs() []string {
|
||||
return m.serverPicker.ServerURLs.Load().([]string)
|
||||
}
|
||||
|
||||
// RelayConnectError returns the error from the most recent failed home relay
|
||||
// reconnect attempt, or nil if the relay last connected successfully.
|
||||
func (m *Manager) RelayConnectError() error {
|
||||
return m.reconnectGuard.LastError()
|
||||
}
|
||||
|
||||
// RelayConnState is the connection state of a single relay server.
|
||||
type RelayConnState struct {
|
||||
// URL is the server's instance address when connected, otherwise the
|
||||
// configured server URL.
|
||||
URL string
|
||||
// Transport is the negotiated transport, empty if not connected.
|
||||
Transport string
|
||||
// Err is set when the relay is not connected.
|
||||
Err error
|
||||
}
|
||||
|
||||
// RelayStates returns the connection state of the home relay and every foreign
|
||||
// relay the manager currently tracks.
|
||||
func (m *Manager) RelayStates() []RelayConnState {
|
||||
var states []RelayConnState
|
||||
|
||||
m.relayClientMu.RLock()
|
||||
home := m.relayClient
|
||||
m.relayClientMu.RUnlock()
|
||||
if home != nil {
|
||||
st := relayConnState(home)
|
||||
// The home relay reconnects through the guard, so the real failure
|
||||
// reason lives there rather than on the (stale) client.
|
||||
if st.Err != nil {
|
||||
if gErr := m.reconnectGuard.LastError(); gErr != nil {
|
||||
st.Err = gErr
|
||||
}
|
||||
}
|
||||
states = append(states, st)
|
||||
}
|
||||
|
||||
// Snapshot the tracks, then query each outside the map lock: a track can be
|
||||
// held by an in-progress Connect, and blocking on it must not stall other
|
||||
// relay operations.
|
||||
m.relayClientsMutex.RLock()
|
||||
tracks := make([]*RelayTrack, 0, len(m.relayClients))
|
||||
for _, rt := range m.relayClients {
|
||||
tracks = append(tracks, rt)
|
||||
}
|
||||
m.relayClientsMutex.RUnlock()
|
||||
|
||||
// Only connected foreign relays carry state; a failed connect is evicted
|
||||
// immediately (openConnVia), so there is no error state to surface.
|
||||
for _, rt := range tracks {
|
||||
rt.RLock()
|
||||
rc := rt.relayClient
|
||||
rt.RUnlock()
|
||||
if rc != nil {
|
||||
states = append(states, relayConnState(rc))
|
||||
}
|
||||
}
|
||||
|
||||
return states
|
||||
}
|
||||
|
||||
// HasRelayAddress returns true if the manager is serving. With this method can check if the peer can communicate with
|
||||
// Relay service.
|
||||
func (m *Manager) HasRelayAddress() bool {
|
||||
@@ -460,3 +524,11 @@ func (m *Manager) notifyOnDisconnectListeners(serverAddress string) {
|
||||
}
|
||||
delete(m.onDisconnectedListeners, serverAddress)
|
||||
}
|
||||
|
||||
func relayConnState(c *Client) RelayConnState {
|
||||
addr, err := c.ServerInstanceURL()
|
||||
if err != nil {
|
||||
return RelayConnState{URL: c.connectionURL, Err: err}
|
||||
}
|
||||
return RelayConnState{URL: addr, Transport: c.Transport()}
|
||||
}
|
||||
|
||||
@@ -40,6 +40,7 @@ func (sp *ServerPicker) PickServer(parentCtx context.Context) (*Client, error) {
|
||||
|
||||
connResultChan := make(chan connResult, totalServers)
|
||||
successChan := make(chan connResult, 1)
|
||||
errChan := make(chan error, 1)
|
||||
concurrentLimiter := make(chan struct{}, maxConcurrentServers)
|
||||
|
||||
log.Debugf("pick server from list: %v", sp.ServerURLs.Load().([]string))
|
||||
@@ -54,17 +55,17 @@ func (sp *ServerPicker) PickServer(parentCtx context.Context) (*Client, error) {
|
||||
}(url)
|
||||
}
|
||||
|
||||
go sp.processConnResults(connResultChan, successChan)
|
||||
go sp.processConnResults(connResultChan, successChan, errChan)
|
||||
|
||||
select {
|
||||
case cr, ok := <-successChan:
|
||||
if !ok {
|
||||
return nil, errors.New("failed to connect to any relay server: all attempts failed")
|
||||
return nil, <-errChan
|
||||
}
|
||||
log.Infof("chosen home Relay server: %s", cr.Url)
|
||||
return cr.RelayClient, nil
|
||||
case <-ctx.Done():
|
||||
return nil, fmt.Errorf("failed to connect to any relay server: %w", ctx.Err())
|
||||
return nil, fmt.Errorf("connect to relay server: %w", ctx.Err())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,12 +81,14 @@ func (sp *ServerPicker) startConnection(ctx context.Context, resultChan chan con
|
||||
}
|
||||
}
|
||||
|
||||
func (sp *ServerPicker) processConnResults(resultChan chan connResult, successChan chan connResult) {
|
||||
func (sp *ServerPicker) processConnResults(resultChan chan connResult, successChan chan connResult, errChan chan error) {
|
||||
var hasSuccess bool
|
||||
var errs []error
|
||||
for numOfResults := 0; numOfResults < cap(resultChan); numOfResults++ {
|
||||
cr := <-resultChan
|
||||
if cr.Err != nil {
|
||||
log.Tracef("failed to connect to Relay server: %s: %v", cr.Url, cr.Err)
|
||||
errs = append(errs, cr.Err)
|
||||
continue
|
||||
}
|
||||
log.Infof("connected to Relay server: %s", cr.Url)
|
||||
@@ -101,5 +104,16 @@ func (sp *ServerPicker) processConnResults(resultChan chan connResult, successCh
|
||||
hasSuccess = true
|
||||
successChan <- cr
|
||||
}
|
||||
if !hasSuccess {
|
||||
errChan <- pickErr(errs)
|
||||
}
|
||||
close(successChan)
|
||||
}
|
||||
|
||||
// pickErr combines per-server connection failures into a single error.
|
||||
func pickErr(errs []error) error {
|
||||
if len(errs) == 0 {
|
||||
return errors.New("no relay server available")
|
||||
}
|
||||
return errors.Join(errs...)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user