Compare commits

...

2 Commits

Author SHA1 Message Date
Zoltan Papp
50a29c07ce [relay] Race home and foreign relay for peer connection setup
When a peer advertises a relay different from the home relay, open the
peer connection by racing the home relay and the remote peer's relay in
parallel: start the preferred one (controller prefers home, otherwise the
remote relay), fall back to the other after a delay, use whichever connects
first and drain the loser.

Expose the foreign relay cache as ForeignRelaysStore and the race as
ConnRacer. The manager passes the remote relay as a RelayServer{Addr, IP}.
OpenConn no longer returns the winning server address; the worker derives it
from the returned conn's RemoteAddr to register the close listener, matching
the previous main-branch behavior.
2026-06-30 18:26:13 +02:00
Zoltan Papp
7d8e20030b [relay] Extract foreign relay client cache into a dedicated type
Move the foreign-relay client cache out of Manager into a foreignRelays
type. Concurrent first-time connects to the same server are deduplicated
with singleflight, so the cache mutex is never held during a network
connect (removing the previous stall where a slow connect blocked all map
operations). A per-entry in-use refcount prevents the cleanup loop from
closing a client while a connection is being opened on it.

This drops RelayTrack, its per-track lock and the hand-over-hand locking
between the map lock and the track lock. The exported API is unchanged.
2026-06-29 00:41:47 +02:00
4 changed files with 335 additions and 179 deletions

View File

@@ -54,19 +54,15 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
w.relaySupportedOnRemotePeer.Store(true)
// the relayManager returns an error if the connection to the relay server has been lost
currentRelayAddress, _, err := w.relayManager.RelayInstanceAddress()
_, _, err := w.relayManager.RelayInstanceAddress()
if err != nil {
w.log.Errorf("failed to handle new offer: %s", err)
return
}
srv := w.preferredRelayServer(currentRelayAddress, remoteOfferAnswer.RelaySrvAddress)
var serverIP netip.Addr
if srv == remoteOfferAnswer.RelaySrvAddress {
serverIP = remoteOfferAnswer.RelaySrvIP
}
relayedConn, err := w.relayManager.OpenConn(w.peerCtx, srv, w.config.Key, serverIP)
preferForeign := !w.isController
remoteRelayServer := relayClient.RelayServer{Addr: remoteOfferAnswer.RelaySrvAddress, IP: remoteOfferAnswer.RelaySrvIP}
relayedConn, err := w.relayManager.OpenConn(w.peerCtx, remoteRelayServer, w.config.Key, preferForeign)
if err != nil {
if errors.Is(err, relayClient.ErrConnAlreadyExists) {
w.log.Debugf("handled offer by reusing existing relay connection")
@@ -80,14 +76,13 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
w.relayedConn = relayedConn
w.relayLock.Unlock()
err = w.relayManager.AddCloseListener(srv, w.onRelayClientDisconnected)
if err != nil {
log.Errorf("failed to add close listener: %s", err)
if err := w.relayManager.AddCloseListener(relayedConn.RemoteAddr().String(), w.onRelayClientDisconnected); err != nil {
w.log.Errorf("failed to add close listener: %s", err)
_ = relayedConn.Close()
return
}
w.log.Debugf("peer conn opened via Relay: %s", srv)
w.log.Debugf("peer conn opened via Relay: %s", relayedConn.RemoteAddr())
go w.conn.onRelayConnectionIsReady(RelayConnInfo{
relayedConn: relayedConn,
rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
@@ -126,13 +121,6 @@ func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool {
return answer.RelaySrvAddress != ""
}
func (w *WorkerRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress string) string {
if w.isController {
return myRelayAddress
}
return remoteRelayAddress
}
// onRelayClientDisconnected is the callback handed to the relay manager's
// AddCloseListener. It forwards the event to the peer connection by calling
// onRelayDisconnected on a new goroutine, so the caller is not blocked.
func (w *WorkerRelay) onRelayClientDisconnected() {
go w.conn.onRelayDisconnected()
}

View File

@@ -0,0 +1,158 @@
package client
import (
"context"
"errors"
"net"
"time"
log "github.com/sirupsen/logrus"
)
// Timing parameters of a relay connection race.
const (
// raceTotalTimeout bounds a whole race: Run derives the race context with
// this timeout.
raceTotalTimeout = 60 * time.Second
// raceFallbackDelay is the duration of the fallback timer; when it fires,
// Run starts the second attempt.
raceFallbackDelay = 10 * time.Second
)
// raceAttempt is the outcome of a single open attempt, as delivered on the
// race's results channel.
type raceAttempt struct {
// conn is the connection returned by the attempt.
conn net.Conn
// err is the error returned by the attempt.
err error
}
// connRace holds the state of one in-flight race started by ConnRacer.Run.
// Its fields are read and written by Run's select loop through startOther,
// handleResult, onTimeout and stop.
type connRace struct {
// racer provides open and drainLoser.
racer *ConnRacer
// peerKey and remoteRelayServer are passed unchanged to every open call.
peerKey string
remoteRelayServer RelayServer
// preferForeign selects the path of the first attempt; the second attempt
// uses the negated value.
preferForeign bool
// raceCtx is the context carrying the total race timeout.
raceCtx context.Context
// otherCtx is the context handed to the second attempt.
otherCtx context.Context
// cancelPreferred and cancelOther cancel the contexts of the first and the
// second attempt.
cancelPreferred context.CancelFunc
cancelOther context.CancelFunc
// results receives one raceAttempt per started attempt (created with
// capacity 2 in Run).
results chan raceAttempt
// fallbackTimer fires after raceFallbackDelay to start the second attempt.
fallbackTimer *time.Timer
// otherStarted reports whether the second attempt has been launched.
otherStarted bool
// settled is the count of attempt results recorded by handleResult; stop
// passes it to drainLoser.
settled int
// lastErr is the error of the most recent failed attempt.
lastErr error
}
// ConnRacer opens a peer connection by racing two paths: the home relay client
// and a foreign relay obtained from the foreign relay store.
type ConnRacer struct {
// home is the client used for the non-foreign attempt.
home *Client
// foreignStore is used for the foreign attempt.
foreignStore *ForeignRelaysStore
}
// NewConnRacer builds a ConnRacer that races the given home relay client
// against relays served by foreignStore.
func NewConnRacer(home *Client, foreignStore *ForeignRelaysStore) *ConnRacer {
	racer := new(ConnRacer)
	racer.home = home
	racer.foreignStore = foreignStore
	return racer
}
// Run opens a connection to peerKey by racing two relay paths.
//
// The first attempt starts immediately; preferForeign decides whether it goes
// through the foreign relay store (true) or the home client (false). The
// second attempt, over the opposite path, is started when the fallback timer
// fires or when handleResult decides to start it. The whole race is bounded by
// raceTotalTimeout and by ctx.
//
// It returns whatever handleResult reports once the race is decided, or the
// result of onTimeout when the race context ends first.
func (r *ConnRacer) Run(ctx context.Context, peerKey string, remoteRelayServer RelayServer, preferForeign bool) (net.Conn, error) {
	raceCtx, cancelRace := context.WithTimeout(ctx, raceTotalTimeout)
	defer cancelRace()

	// Each attempt gets its own child context so it can be cancelled separately.
	firstCtx, cancelFirst := context.WithCancel(raceCtx)
	secondCtx, cancelSecond := context.WithCancel(raceCtx)

	fallback := time.NewTimer(raceFallbackDelay)
	defer fallback.Stop()

	state := &connRace{
		racer:             r,
		peerKey:           peerKey,
		remoteRelayServer: remoteRelayServer,
		preferForeign:     preferForeign,
		raceCtx:           raceCtx,
		otherCtx:          secondCtx,
		cancelPreferred:   cancelFirst,
		cancelOther:       cancelSecond,
		// Capacity 2: one slot per attempt, so a sender never blocks.
		results:       make(chan raceAttempt, 2),
		fallbackTimer: fallback,
	}

	go func() {
		state.results <- r.open(firstCtx, peerKey, remoteRelayServer, preferForeign)
	}()

	for {
		select {
		case <-raceCtx.Done():
			return state.onTimeout()
		case <-fallback.C:
			state.startOther()
		case res := <-state.results:
			conn, err, done := state.handleResult(res)
			if !done {
				continue
			}
			return conn, err
		}
	}
}
// startOther launches the second attempt over the path opposite to the
// preferred one. It does nothing if the second attempt was already started.
func (c *connRace) startOther() {
	if !c.otherStarted {
		c.otherStarted = true
		// The fallback timer has no further purpose once the second attempt runs.
		c.fallbackTimer.Stop()

		go func() {
			c.results <- c.racer.open(c.otherCtx, c.peerKey, c.remoteRelayServer, !c.preferForeign)
		}()
	}
}
// handleResult processes one attempt result taken from the results channel.
//
// It returns (conn, err, true) when the race is decided and (nil, nil, false)
// when Run should keep waiting:
//   - an attempt that produced a connection, or that failed with
//     ErrConnAlreadyExists, wins: both attempts are cancelled, any outstanding
//     attempt is drained in the background, and the result is returned as is;
//   - a failed attempt while the second attempt is not running yet starts the
//     second attempt immediately instead of waiting for the fallback timer;
//   - once both attempts have failed, the last error is returned.
func (c *connRace) handleResult(res raceAttempt) (net.Conn, error, bool) {
	// Count every consumed result, the winning one included. stop() passes
	// settled to drainLoser, which waits for (started - settled) results; if
	// the winner were not counted the drain goroutine would wait for a result
	// that never arrives and block forever.
	c.settled++

	if (res.err == nil && res.conn != nil) || errors.Is(res.err, ErrConnAlreadyExists) {
		c.stop()
		return res.conn, res.err, true
	}

	c.lastErr = res.err
	if c.lastErr == nil {
		// An attempt that yields neither a connection nor an error is still a
		// failure; record an error so the caller never receives (nil, nil).
		c.lastErr = errors.New("relay open returned neither a connection nor an error")
	}

	if !c.otherStarted {
		c.startOther()
		return nil, nil, false
	}

	if c.settled < 2 {
		// The other attempt is still in flight.
		return nil, nil, false
	}

	// Both attempts failed; nothing is left to drain.
	c.cancelPreferred()
	c.cancelOther()
	return nil, c.lastErr, true
}
// onTimeout ends the race after the race context is done. It stops both
// attempts and returns the last attempt error if there is one, otherwise the
// race context's error.
func (c *connRace) onTimeout() (net.Conn, error) {
	c.stop()

	err := c.lastErr
	if err == nil {
		err = c.raceCtx.Err()
	}
	return nil, err
}
// stop cancels both attempt contexts and hands the results channel to a
// background drainLoser call, together with the current settled count and
// whether the second attempt was started.
func (c *connRace) stop() {
	for _, cancel := range []context.CancelFunc{c.cancelPreferred, c.cancelOther} {
		cancel()
	}
	go c.racer.drainLoser(c.results, c.settled, c.otherStarted)
}
// open performs a single attempt and wraps its outcome in a raceAttempt. With
// foreign set it opens the connection through the foreign relay store for
// remoteRelayServer; otherwise it uses the home client.
func (r *ConnRacer) open(ctx context.Context, peerKey string, remoteRelayServer RelayServer, foreign bool) raceAttempt {
	var attempt raceAttempt
	if foreign {
		attempt.conn, attempt.err = r.foreignStore.OpenConn(ctx, peerKey, remoteRelayServer)
	} else {
		attempt.conn, attempt.err = r.home.OpenConn(ctx, peerKey)
	}
	return attempt
}
// drainLoser receives the results that are still outstanding after a race has
// ended and closes any connection they carry. The number of receives is the
// number of started attempts (1, or 2 when otherStarted is set) minus settled.
// Each receive blocks until the corresponding attempt delivers its result.
func (r *ConnRacer) drainLoser(results chan raceAttempt, settled int, otherStarted bool) {
	pending := 1 - settled
	if otherStarted {
		pending = 2 - settled
	}

	for ; pending > 0; pending-- {
		loser := <-results
		if loser.conn == nil {
			continue
		}
		if err := loser.conn.Close(); err != nil {
			log.Debugf("failed to close losing relay connection: %v", err)
		}
	}
}

View File

@@ -0,0 +1,155 @@
package client
import (
"context"
"net"
"sync"
"time"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/singleflight"
relayAuth "github.com/netbirdio/netbird/shared/relay/auth/hmac"
)
// foreignRelay is one cached foreign relay client together with the
// bookkeeping the cleanup loop relies on.
type foreignRelay struct {
// client is the connected relay client.
client *Client
// created is the time the entry was stored in the cache.
created time.Time
// inUse is incremented by acquire and decremented by release; cleanupUnused
// skips entries whose count is above zero.
inUse int
}
// ForeignRelaysStore caches relay clients for foreign relay servers, keyed by
// server address.
type ForeignRelaysStore struct {
// mu guards clients and the inUse counter of every entry.
mu sync.RWMutex
// clients maps a relay server address to its cached entry.
clients map[string]*foreignRelay
// group deduplicates concurrent first-time connects to the same address.
group singleflight.Group
// ctx is the context passed to Connect for new relay clients.
ctx context.Context
// tokenStore, peerID and mtu are passed to every newly created relay client.
tokenStore *relayAuth.TokenStore
peerID string
mtu uint16
// transportFallback is set on every newly created relay client.
transportFallback *transportFallback
// onDisconnect is installed as the disconnect listener of every connected
// relay client.
onDisconnect func(string)
// keepUnusedServerTime is the minimum age of an entry before cleanupUnused
// may remove it.
keepUnusedServerTime time.Duration
}
// NewForeignRelaysStore returns an empty store. The arguments are kept as is
// and used later when relay clients are created (ctx, tokenStore, peerID, mtu,
// transportFallback, onDisconnect) and when unused ones are cleaned up
// (keepUnusedServerTime).
func NewForeignRelaysStore(ctx context.Context, tokenStore *relayAuth.TokenStore, peerID string, mtu uint16, transportFallback *transportFallback, onDisconnect func(string), keepUnusedServerTime time.Duration) *ForeignRelaysStore {
	store := new(ForeignRelaysStore)
	store.clients = make(map[string]*foreignRelay)
	store.ctx = ctx
	store.tokenStore = tokenStore
	store.peerID = peerID
	store.mtu = mtu
	store.transportFallback = transportFallback
	store.onDisconnect = onDisconnect
	store.keepUnusedServerTime = keepUnusedServerTime
	return store
}
// OpenConn opens a connection to peerKey through the relay client for
// remoteRelayServer, connecting to that relay first if it is not cached yet.
// The cache entry is marked in use for the duration of the call.
func (f *ForeignRelaysStore) OpenConn(ctx context.Context, peerKey string, remoteRelayServer RelayServer) (net.Conn, error) {
	entry, acquireErr := f.acquire(remoteRelayServer)
	if acquireErr != nil {
		return nil, acquireErr
	}
	// Drop the in-use mark once the open attempt has finished.
	defer f.release(entry)

	return entry.client.OpenConn(ctx, peerKey)
}
// acquire returns the cache entry for remoteRelayServer.Addr with its inUse
// counter incremented, creating and connecting a new relay client when no
// entry exists. The caller must pair a successful acquire with release.
//
// Connecting happens inside the singleflight group and outside the mutex, so
// concurrent callers for the same address share one Connect and the map lock
// is not held during the network operation. It returns the Connect error when
// the connection cannot be established.
func (f *ForeignRelaysStore) acquire(remoteRelayServer RelayServer) (*foreignRelay, error) {
// Fast path: the entry already exists.
f.mu.Lock()
if fr, ok := f.clients[remoteRelayServer.Addr]; ok {
fr.inUse++
f.mu.Unlock()
return fr, nil
}
f.mu.Unlock()
v, err, _ := f.group.Do(remoteRelayServer.Addr, func() (any, error) {
// Re-check: the entry may have been stored between the unlock above and
// this call.
f.mu.RLock()
fr, ok := f.clients[remoteRelayServer.Addr]
f.mu.RUnlock()
if ok {
return fr, nil
}
relayClient := NewClientWithServerIP(remoteRelayServer.Addr, remoteRelayServer.IP, f.tokenStore, f.peerID, f.mtu)
relayClient.SetTransportFallback(f.transportFallback)
// Connect uses the store's context, not a caller context.
if err := relayClient.Connect(f.ctx); err != nil {
return nil, err
}
// NOTE(review): a disconnect reported after the listener is set but before
// the entry is stored below would find nothing to evict — confirm whether
// that window matters.
relayClient.SetOnDisconnectListener(f.onDisconnect)
f.mu.Lock()
fr = &foreignRelay{client: relayClient, created: time.Now()}
f.clients[remoteRelayServer.Addr] = fr
f.mu.Unlock()
return fr, nil
})
if err != nil {
return nil, err
}
fr := v.(*foreignRelay)
f.mu.Lock()
// The entry may have been removed or replaced since the group call returned;
// in that case start over rather than hand out an entry that is no longer
// in the map.
if cur, ok := f.clients[remoteRelayServer.Addr]; !ok || cur != fr {
f.mu.Unlock()
return f.acquire(remoteRelayServer)
}
fr.inUse++
f.mu.Unlock()
return fr, nil
}
// release undoes one acquire by decrementing the entry's inUse counter under
// the store mutex.
func (f *ForeignRelaysStore) release(fr *foreignRelay) {
	f.mu.Lock()
	defer f.mu.Unlock()

	fr.inUse--
}
// evict removes the cache entry for serverAddress, if there is one. The relay
// client itself is not closed here.
func (f *ForeignRelaysStore) evict(serverAddress string) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if _, found := f.clients[serverAddress]; !found {
		return
	}
	delete(f.clients, serverAddress)
	log.Debugf("evicted disconnected foreign relay client: %s", serverAddress)
}
// cleanupUnused removes and closes every cached relay client that is older
// than keepUnusedServerTime, is not marked in use by an ongoing acquire, and
// reports no open connections. The disconnect listener is cleared before the
// client is closed, and the close itself runs in the background so the store
// mutex is not held while it completes.
func (f *ForeignRelaysStore) cleanupUnused() {
	f.mu.Lock()
	defer f.mu.Unlock()

	for addr, fr := range f.clients {
		if time.Since(fr.created) <= f.keepUnusedServerTime {
			continue
		}
		if fr.inUse > 0 || fr.client.HasConns() {
			continue
		}

		client := fr.client
		client.SetOnDisconnectListener(nil)
		// Hand the client to the goroutine as an argument so it always closes
		// this iteration's client, whatever loop-variable semantics the
		// module's Go version uses.
		go func(c *Client) {
			// Best effort: the entry is removed either way, so the close error
			// is deliberately ignored.
			_ = c.Close()
		}(client)

		log.Debugf("clean up unused relay server connection: %s", addr)
		delete(f.clients, addr)
	}
}
// states returns the connection state of every cached relay client. The
// clients are snapshotted under the read lock and queried after it has been
// released.
func (f *ForeignRelaysStore) states() []RelayConnState {
	snapshot := func() []*Client {
		f.mu.RLock()
		defer f.mu.RUnlock()

		list := make([]*Client, 0, len(f.clients))
		for _, entry := range f.clients {
			list = append(list, entry.client)
		}
		return list
	}()

	result := make([]RelayConnState, 0, len(snapshot))
	for i := range snapshot {
		result = append(result, relayConnState(snapshot[i]))
	}
	return result
}

View File

@@ -22,22 +22,6 @@ var (
ErrRelayClientNotConnected = fmt.Errorf("relay client not connected")
)
// RelayTrack hold the relay clients for the foreign relay servers.
// With the mutex can ensure we can open new connection in case the relay connection has been established with
// the relay server.
type RelayTrack struct {
sync.RWMutex
relayClient *Client
err error
created time.Time
}
func NewRelayTrack() *RelayTrack {
return &RelayTrack{
created: time.Now(),
}
}
type OnServerCloseListener func()
// ManagerOption configures a Manager at construction time.
@@ -54,6 +38,11 @@ type RelayConnState struct {
Err error
}
// RelayServer identifies a relay server advertised by a remote peer.
type RelayServer struct {
// Addr is the relay server address; it is compared against the home relay
// and used as the key of the foreign relay cache.
Addr string
// IP is the server IP handed to the relay client alongside Addr.
IP netip.Addr
}
// WithMaxBackoffInterval caps the exponential backoff between reconnect
// attempts to the home relay. A non-positive value keeps the default.
func WithMaxBackoffInterval(d time.Duration) ManagerOption {
@@ -78,8 +67,7 @@ type Manager struct {
relayClientMu sync.RWMutex
reconnectGuard *Guard
relayClients map[string]*RelayTrack
relayClientsMutex sync.RWMutex
foreign *ForeignRelaysStore
onDisconnectedListeners map[string]*list.List
onReconnectedListenerFn func()
@@ -115,7 +103,6 @@ func NewManager(ctx context.Context, serverURLs []string, peerID string, mtu uin
ConnectionTimeout: defaultConnectionTimeout,
TransportFallback: tf,
},
relayClients: make(map[string]*RelayTrack),
onDisconnectedListeners: make(map[string]*list.List),
cleanupInterval: relayCleanupInterval,
keepUnusedServerTime: keepUnusedServerTime,
@@ -123,6 +110,7 @@ func NewManager(ctx context.Context, serverURLs []string, peerID string, mtu uin
for _, opt := range opts {
opt(m)
}
m.foreign = NewForeignRelaysStore(ctx, tokenStore, peerID, mtu, tf, m.onServerDisconnected, m.keepUnusedServerTime)
m.serverPicker.ServerURLs.Store(serverURLs)
m.reconnectGuard = NewGuard(m.serverPicker, m.maxBackoffInterval)
return m
@@ -154,13 +142,7 @@ func (m *Manager) Serve() error {
return err
}
// OpenConn opens a connection to the given peer key. If the peer is on the same relay server, the connection will be
// established via the relay server. If the peer is on a different relay server, the manager will establish a new
// connection to the relay server. It returns back with a net.Conn what represent the remote peer connection.
//
// serverIP, when valid and serverAddress is foreign, is used as a dial target if the FQDN-based dial fails.
// Ignored for the local home-server path. TLS verification still uses the FQDN via SNI.
func (m *Manager) OpenConn(ctx context.Context, serverAddress, peerKey string, serverIP netip.Addr) (net.Conn, error) {
func (m *Manager) OpenConn(ctx context.Context, remoteRelayServer RelayServer, peerKey string, preferForeign bool) (net.Conn, error) {
m.relayClientMu.RLock()
defer m.relayClientMu.RUnlock()
@@ -168,26 +150,17 @@ func (m *Manager) OpenConn(ctx context.Context, serverAddress, peerKey string, s
return nil, ErrRelayClientNotConnected
}
foreign, err := m.isForeignServer(serverAddress)
foreign, err := m.isForeignServer(remoteRelayServer.Addr)
if err != nil {
return nil, err
}
var (
netConn net.Conn
)
if !foreign {
log.Debugf("open peer connection via permanent server: %s", peerKey)
netConn, err = m.relayClient.OpenConn(ctx, peerKey)
} else {
log.Debugf("open peer connection via foreign server: %s", serverAddress)
netConn, err = m.openConnVia(ctx, serverAddress, peerKey, serverIP)
}
if err != nil {
return nil, err
return m.relayClient.OpenConn(ctx, peerKey)
}
return netConn, err
racer := NewConnRacer(m.relayClient, m.foreign)
return racer.Run(ctx, peerKey, remoteRelayServer, preferForeign)
}
// Ready returns true if the home Relay client is connected to the relay server.
@@ -282,26 +255,7 @@ func (m *Manager) RelayStates() []RelayConnState {
states = append(states, st)
}
// Snapshot the tracks, then query each outside the map lock: a track can be
// held by an in-progress Connect, and blocking on it must not stall other
// relay operations.
m.relayClientsMutex.RLock()
tracks := make([]*RelayTrack, 0, len(m.relayClients))
for _, rt := range m.relayClients {
tracks = append(tracks, rt)
}
m.relayClientsMutex.RUnlock()
// Only connected foreign relays carry state; a failed connect is evicted
// immediately (openConnVia), so there is no error state to surface.
for _, rt := range tracks {
rt.RLock()
rc := rt.relayClient
rt.RUnlock()
if rc != nil {
states = append(states, relayConnState(rc))
}
}
states = append(states, m.foreign.states()...)
return states
}
@@ -322,64 +276,6 @@ func (m *Manager) UpdateToken(token *relayAuth.Token) error {
return m.tokenStore.UpdateToken(token)
}
func (m *Manager) openConnVia(ctx context.Context, serverAddress, peerKey string, serverIP netip.Addr) (net.Conn, error) {
// check if already has a connection to the desired relay server
m.relayClientsMutex.RLock()
rt, ok := m.relayClients[serverAddress]
if ok {
rt.RLock()
m.relayClientsMutex.RUnlock()
defer rt.RUnlock()
if rt.err != nil {
return nil, rt.err
}
return rt.relayClient.OpenConn(ctx, peerKey)
}
m.relayClientsMutex.RUnlock()
// if not, establish a new connection but check it again (because changed the lock type) before starting the
// connection
m.relayClientsMutex.Lock()
rt, ok = m.relayClients[serverAddress]
if ok {
rt.RLock()
m.relayClientsMutex.Unlock()
defer rt.RUnlock()
if rt.err != nil {
return nil, rt.err
}
return rt.relayClient.OpenConn(ctx, peerKey)
}
// create a new relay client and store it in the relayClients map
rt = NewRelayTrack()
rt.Lock()
m.relayClients[serverAddress] = rt
m.relayClientsMutex.Unlock()
relayClient := NewClientWithServerIP(serverAddress, serverIP, m.tokenStore, m.peerID, m.mtu)
relayClient.SetTransportFallback(m.transportFallback)
err := relayClient.Connect(m.ctx)
if err != nil {
rt.err = err
rt.Unlock()
m.relayClientsMutex.Lock()
delete(m.relayClients, serverAddress)
m.relayClientsMutex.Unlock()
return nil, err
}
// if connection closed then delete the relay client from the list
relayClient.SetOnDisconnectListener(m.onServerDisconnected)
rt.relayClient = relayClient
rt.Unlock()
conn, err := relayClient.OpenConn(ctx, peerKey)
if err != nil {
return nil, err
}
return conn, nil
}
func (m *Manager) onServerConnected() {
m.listenerLock.Lock()
defer m.listenerLock.Unlock()
@@ -405,21 +301,12 @@ func (m *Manager) onServerDisconnected(serverAddress string) {
m.relayClientMu.Unlock()
if !isHome {
m.evictForeignRelay(serverAddress)
m.foreign.evict(serverAddress)
}
m.notifyOnDisconnectListeners(serverAddress)
}
func (m *Manager) evictForeignRelay(serverAddress string) {
m.relayClientsMutex.Lock()
defer m.relayClientsMutex.Unlock()
if _, ok := m.relayClients[serverAddress]; ok {
delete(m.relayClients, serverAddress)
log.Debugf("evicted disconnected foreign relay client: %s", serverAddress)
}
}
func (m *Manager) listenGuardEvent(ctx context.Context) {
for {
select {
@@ -458,43 +345,11 @@ func (m *Manager) startCleanupLoop() {
case <-m.ctx.Done():
return
case <-ticker.C:
m.cleanUpUnusedRelays()
m.foreign.cleanupUnused()
}
}
}
func (m *Manager) cleanUpUnusedRelays() {
m.relayClientsMutex.Lock()
defer m.relayClientsMutex.Unlock()
for addr, rt := range m.relayClients {
rt.Lock()
// if the connection failed to the server the relay client will be nil
// but the instance will be kept in the relayClients until the next locking
if rt.err != nil {
rt.Unlock()
continue
}
if time.Since(rt.created) <= m.keepUnusedServerTime {
rt.Unlock()
continue
}
if rt.relayClient.HasConns() {
rt.Unlock()
continue
}
rt.relayClient.SetOnDisconnectListener(nil)
go func() {
_ = rt.relayClient.Close()
}()
log.Debugf("clean up unused relay server connection: %s", addr)
delete(m.relayClients, addr)
rt.Unlock()
}
}
func (m *Manager) addListener(serverAddress string, onClosedListener OnServerCloseListener) {
m.listenerLock.Lock()
defer m.listenerLock.Unlock()