From 2a5cb1649402d42f588d374f6a775c62e92a5522 Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Fri, 22 Nov 2024 18:12:34 +0100 Subject: [PATCH] [relay] Refactor initial Relay connection (#2800) Can support firewalls with restricted WS rules allow to run engine without Relay servers keep up to date Relay address changes --- client/internal/connect.go | 3 +- client/internal/engine.go | 10 ++- client/internal/peer/status.go | 20 ++--- client/internal/peer/worker_ice.go | 14 +-- relay/client/client.go | 6 +- relay/client/client_test.go | 2 +- relay/client/guard.go | 91 +++++++++++++++---- relay/client/manager.go | 140 +++++++++++++++++++---------- relay/client/picker.go | 16 ++-- relay/client/picker_test.go | 5 +- 10 files changed, 211 insertions(+), 96 deletions(-) diff --git a/client/internal/connect.go b/client/internal/connect.go index f76aa066b..8c2ad4aa1 100644 --- a/client/internal/connect.go +++ b/client/internal/connect.go @@ -232,6 +232,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHold relayURLs, token := parseRelayInfo(loginResp) relayManager := relayClient.NewManager(engineCtx, relayURLs, myPrivateKey.PublicKey().String()) + c.statusRecorder.SetRelayMgr(relayManager) if len(relayURLs) > 0 { if token != nil { if err := relayManager.UpdateToken(token); err != nil { @@ -242,9 +243,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, probes *ProbeHold log.Infof("connecting to the Relay service(s): %s", strings.Join(relayURLs, ", ")) if err = relayManager.Serve(); err != nil { log.Error(err) - return wrapErr(err) } - c.statusRecorder.SetRelayMgr(relayManager) } peerConfig := loginResp.GetPeerConfig() diff --git a/client/internal/engine.go b/client/internal/engine.go index 1c912220c..dc4499e17 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -538,6 +538,7 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error { relayMsg := wCfg.GetRelay() if relayMsg != nil { + // when we receive token we expect valid address list too c := &auth.Token{ Payload: relayMsg.GetTokenPayload(), Signature: relayMsg.GetTokenSignature(), @@ -546,9 +547,16 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error { log.Errorf("failed to update relay token: %v", err) return fmt.Errorf("update relay token: %w", err) } + + e.relayManager.UpdateServerURLs(relayMsg.Urls) + + // Just in case the agent started with an MGM server where the relay was disabled but was later enabled. + // We can ignore all errors because the guard will manage the reconnection retries. + _ = e.relayManager.Serve() + } else { + e.relayManager.UpdateServerURLs(nil) } - // todo update relay address in the relay manager // todo update signal } diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index 0444dc60b..74e2ee82c 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -676,25 +676,23 @@ func (d *Status) GetRelayStates() []relay.ProbeResult { // extend the list of stun, turn servers with relay address relayStates := slices.Clone(d.relayStates) - var relayState relay.ProbeResult - // if the server connection is not established then we will use the general address // in case of connection we will use the instance specific address instanceAddr, err := d.relayMgr.RelayInstanceAddress() if err != nil { // TODO add their status - if errors.Is(err, relayClient.ErrRelayClientNotConnected) { - for _, r := range d.relayMgr.ServerURLs() { - relayStates = append(relayStates, relay.ProbeResult{ - URI: r, - }) - } - return relayStates + for _, r := range d.relayMgr.ServerURLs() { + relayStates = append(relayStates, relay.ProbeResult{ + URI: r, + Err: err, + }) } - relayState.Err = err + return relayStates } - relayState.URI = instanceAddr + relayState := relay.ProbeResult{ + URI: instanceAddr, + } return append(relayStates, relayState) } diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index 4c67cb781..7ce4797c3 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -46,8 +46,6 @@ type WorkerICE struct { hasRelayOnLocally bool conn WorkerICECallbacks - selectedPriority ConnPriority - agent *ice.Agent muxAgent sync.Mutex @@ -95,10 +93,8 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { var preferredCandidateTypes []ice.CandidateType if w.hasRelayOnLocally && remoteOfferAnswer.RelaySrvAddress != "" { - w.selectedPriority = connPriorityICEP2P preferredCandidateTypes = icemaker.CandidateTypesP2P() } else { - w.selectedPriority = connPriorityICETurn preferredCandidateTypes = icemaker.CandidateTypes() } @@ -159,7 +155,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { RelayedOnLocal: isRelayCandidate(pair.Local), } w.log.Debugf("on ICE conn read to use ready") - go w.conn.OnConnReady(w.selectedPriority, ci) + go w.conn.OnConnReady(selectedPriority(pair), ci) } // OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer. @@ -394,3 +390,11 @@ func isRelayed(pair *ice.CandidatePair) bool { } return false } + +func selectedPriority(pair *ice.CandidatePair) ConnPriority { + if isRelayed(pair) { + return connPriorityICETurn + } else { + return connPriorityICEP2P + } +} diff --git a/relay/client/client.go b/relay/client/client.go index 154c1787f..db5252f50 100644 --- a/relay/client/client.go +++ b/relay/client/client.go @@ -140,7 +140,7 @@ type Client struct { instanceURL *RelayAddr muInstanceURL sync.Mutex - onDisconnectListener func() + onDisconnectListener func(string) onConnectedListener func() listenerMutex sync.Mutex } @@ -233,7 +233,7 @@ func (c *Client) ServerInstanceURL() (string, error) { } // SetOnDisconnectListener sets a function that will be called when the connection to the relay server is closed. -func (c *Client) SetOnDisconnectListener(fn func()) { +func (c *Client) SetOnDisconnectListener(fn func(string)) { c.listenerMutex.Lock() defer c.listenerMutex.Unlock() c.onDisconnectListener = fn @@ -554,7 +554,7 @@ func (c *Client) notifyDisconnected() { if c.onDisconnectListener == nil { return } - go c.onDisconnectListener() + go c.onDisconnectListener(c.connectionURL) } func (c *Client) notifyConnected() { diff --git a/relay/client/client_test.go b/relay/client/client_test.go index ef28203e9..7ddfba4c6 100644 --- a/relay/client/client_test.go +++ b/relay/client/client_test.go @@ -551,7 +551,7 @@ func TestCloseByServer(t *testing.T) { } disconnected := make(chan struct{}) - relayClient.SetOnDisconnectListener(func() { + relayClient.SetOnDisconnectListener(func(_ string) { log.Infof("client disconnected") close(disconnected) }) diff --git a/relay/client/guard.go b/relay/client/guard.go index d6b6b0da5..b971363a8 100644 --- a/relay/client/guard.go +++ b/relay/client/guard.go @@ -4,65 +4,120 @@ import ( "context" "time" + "github.com/cenkalti/backoff/v4" log "github.com/sirupsen/logrus" ) var ( - reconnectingTimeout = 5 * time.Second + reconnectingTimeout = 60 * time.Second ) // Guard manage the reconnection tries to the Relay server in case of disconnection event. type Guard struct { - ctx context.Context - relayClient *Client + // OnNewRelayClient is a channel that is used to notify the relay client about a new relay client instance. + OnNewRelayClient chan *Client + serverPicker *ServerPicker } // NewGuard creates a new guard for the relay client. -func NewGuard(context context.Context, relayClient *Client) *Guard { +func NewGuard(sp *ServerPicker) *Guard { g := &Guard{ - ctx: context, - relayClient: relayClient, + OnNewRelayClient: make(chan *Client, 1), + serverPicker: sp, } return g } -// OnDisconnected is called when the relay client is disconnected from the relay server. It will trigger the reconnection +// StartReconnectTrys is called when the relay client is disconnected from the relay server. +// It attempts to reconnect to the relay server. The function first tries a quick reconnect +// to the same server that was used before, if the server URL is still valid. If the quick +// reconnect fails, it starts a ticker to periodically attempt server picking until it +// succeeds or the context is done. +// +// Parameters: +// - ctx: The context to control the lifecycle of the reconnection attempts. +// - relayClient: The relay client instance that was disconnected. // todo prevent multiple reconnection instances. In the current usage it should not happen, but it is better to prevent -func (g *Guard) OnDisconnected() { - if g.quickReconnect() { +func (g *Guard) StartReconnectTrys(ctx context.Context, relayClient *Client) { + if relayClient == nil { + goto RETRY + } + if g.isServerURLStillValid(relayClient) && g.quickReconnect(ctx, relayClient) { return } - ticker := time.NewTicker(reconnectingTimeout) +RETRY: + ticker := exponentTicker(ctx) defer ticker.Stop() for { select { case <-ticker.C: - err := g.relayClient.Connect() - if err != nil { - log.Errorf("failed to reconnect to relay server: %s", err) + if err := g.retry(ctx); err != nil { + log.Errorf("failed to pick new Relay server: %s", err) continue } return - case <-g.ctx.Done(): + case <-ctx.Done(): return } } } -func (g *Guard) quickReconnect() bool { - ctx, cancel := context.WithTimeout(g.ctx, 1500*time.Millisecond) +func (g *Guard) retry(ctx context.Context) error { + log.Infof("try to pick up a new Relay server") + relayClient, err := g.serverPicker.PickServer(ctx) + if err != nil { + return err + } + + // prevent to work with a deprecated Relay client instance + g.drainRelayClientChan() + + g.OnNewRelayClient <- relayClient + return nil +} + +func (g *Guard) quickReconnect(parentCtx context.Context, rc *Client) bool { + ctx, cancel := context.WithTimeout(parentCtx, 1500*time.Millisecond) defer cancel() <-ctx.Done() - if g.ctx.Err() != nil { + if parentCtx.Err() != nil { return false } + log.Infof("try to reconnect to Relay server: %s", rc.connectionURL) - if err := g.relayClient.Connect(); err != nil { + if err := rc.Connect(); err != nil { log.Errorf("failed to reconnect to relay server: %s", err) return false } return true } + +func (g *Guard) drainRelayClientChan() { + select { + case <-g.OnNewRelayClient: + default: + } +} + +func (g *Guard) isServerURLStillValid(rc *Client) bool { + for _, url := range g.serverPicker.ServerURLs.Load().([]string) { + if url == rc.connectionURL { + return true + } + } + return false +} + +func exponentTicker(ctx context.Context) *backoff.Ticker { + bo := backoff.WithContext(&backoff.ExponentialBackOff{ + InitialInterval: 2 * time.Second, + Multiplier: 2, + MaxInterval: reconnectingTimeout, + Clock: backoff.SystemClock, + }, ctx) + + return backoff.NewTicker(bo) +} diff --git a/relay/client/manager.go b/relay/client/manager.go index b14a7701b..d847bb879 100644 --- a/relay/client/manager.go +++ b/relay/client/manager.go @@ -57,12 +57,15 @@ type ManagerService interface { // relay servers will be closed if there is no active connection. Periodically the manager will check if there is any // unused relay connection and close it. type Manager struct { - ctx context.Context - serverURLs []string - peerID string - tokenStore *relayAuth.TokenStore + ctx context.Context + peerID string + running bool + tokenStore *relayAuth.TokenStore + serverPicker *ServerPicker - relayClient *Client + relayClient *Client + // the guard logic can overwrite the relayClient variable, this mutex protect the usage of the variable + relayClientMu sync.Mutex reconnectGuard *Guard relayClients map[string]*RelayTrack @@ -76,48 +79,54 @@ type Manager struct { // NewManager creates a new manager instance. // The serverURL address can be empty. In this case, the manager will not serve. func NewManager(ctx context.Context, serverURLs []string, peerID string) *Manager { - return &Manager{ - ctx: ctx, - serverURLs: serverURLs, - peerID: peerID, - tokenStore: &relayAuth.TokenStore{}, + tokenStore := &relayAuth.TokenStore{} + + m := &Manager{ + ctx: ctx, + peerID: peerID, + tokenStore: tokenStore, + serverPicker: &ServerPicker{ + TokenStore: tokenStore, + PeerID: peerID, + }, relayClients: make(map[string]*RelayTrack), onDisconnectedListeners: make(map[string]*list.List), } + m.serverPicker.ServerURLs.Store(serverURLs) + m.reconnectGuard = NewGuard(m.serverPicker) + return m } -// Serve starts the manager. It will establish a connection to the relay server and start the relay cleanup loop for -// the unused relay connections. The manager will automatically reconnect to the relay server in case of disconnection. +// Serve starts the manager, attempting to establish a connection with the relay server. +// If the connection fails, it will keep trying to reconnect in the background. +// Additionally, it starts a cleanup loop to remove unused relay connections. +// The manager will automatically reconnect to the relay server in case of disconnection. func (m *Manager) Serve() error { - if m.relayClient != nil { + if m.running { return fmt.Errorf("manager already serving") } - log.Debugf("starting relay client manager with %v relay servers", m.serverURLs) + m.running = true + log.Debugf("starting relay client manager with %v relay servers", m.serverPicker.ServerURLs.Load()) - sp := ServerPicker{ - TokenStore: m.tokenStore, - PeerID: m.peerID, - } - - client, err := sp.PickServer(m.ctx, m.serverURLs) + client, err := m.serverPicker.PickServer(m.ctx) if err != nil { - return err + go m.reconnectGuard.StartReconnectTrys(m.ctx, nil) + } else { + m.storeClient(client) } - m.relayClient = client - m.reconnectGuard = NewGuard(m.ctx, m.relayClient) - m.relayClient.SetOnConnectedListener(m.onServerConnected) - m.relayClient.SetOnDisconnectListener(func() { - m.onServerDisconnected(client.connectionURL) - }) - m.startCleanupLoop() - return nil + go m.listenGuardEvent(m.ctx) + go m.startCleanupLoop() + return err } // OpenConn opens a connection to the given peer key. If the peer is on the same relay server, the connection will be // established via the relay server. If the peer is on a different relay server, the manager will establish a new // connection to the relay server. It returns back with a net.Conn what represent the remote peer connection. func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) { + m.relayClientMu.Lock() + defer m.relayClientMu.Unlock() + if m.relayClient == nil { return nil, ErrRelayClientNotConnected } @@ -146,6 +155,9 @@ func (m *Manager) OpenConn(serverAddress, peerKey string) (net.Conn, error) { // Ready returns true if the home Relay client is connected to the relay server. func (m *Manager) Ready() bool { + m.relayClientMu.Lock() + defer m.relayClientMu.Unlock() + if m.relayClient == nil { return false } @@ -159,6 +171,13 @@ func (m *Manager) SetOnReconnectedListener(f func()) { // AddCloseListener adds a listener to the given server instance address. The listener will be called if the connection // closed. func (m *Manager) AddCloseListener(serverAddress string, onClosedListener OnServerCloseListener) error { + m.relayClientMu.Lock() + defer m.relayClientMu.Unlock() + + if m.relayClient == nil { + return ErrRelayClientNotConnected + } + foreign, err := m.isForeignServer(serverAddress) if err != nil { return err @@ -177,6 +196,9 @@ func (m *Manager) AddCloseListener(serverAddress string, onClosedListener OnServ // RelayInstanceAddress returns the address of the permanent relay server. It could change if the network connection is // lost. This address will be sent to the target peer to choose the common relay server for the communication. func (m *Manager) RelayInstanceAddress() (string, error) { + m.relayClientMu.Lock() + defer m.relayClientMu.Unlock() + if m.relayClient == nil { return "", ErrRelayClientNotConnected } @@ -185,13 +207,18 @@ func (m *Manager) RelayInstanceAddress() (string, error) { // ServerURLs returns the addresses of the relay servers. func (m *Manager) ServerURLs() []string { - return m.serverURLs + return m.serverPicker.ServerURLs.Load().([]string) } // HasRelayAddress returns true if the manager is serving. With this method can check if the peer can communicate with // Relay service. func (m *Manager) HasRelayAddress() bool { - return len(m.serverURLs) > 0 + return len(m.serverPicker.ServerURLs.Load().([]string)) > 0 +} + +func (m *Manager) UpdateServerURLs(serverURLs []string) { + log.Infof("update relay server URLs: %v", serverURLs) + m.serverPicker.ServerURLs.Store(serverURLs) } // UpdateToken updates the token in the token store. @@ -245,9 +272,7 @@ func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) { return nil, err } // if connection closed then delete the relay client from the list - relayClient.SetOnDisconnectListener(func() { - m.onServerDisconnected(serverAddress) - }) + relayClient.SetOnDisconnectListener(m.onServerDisconnected) rt.relayClient = relayClient rt.Unlock() @@ -265,14 +290,37 @@ func (m *Manager) onServerConnected() { go m.onReconnectedListenerFn() } +// onServerDisconnected start to reconnection for home server only func (m *Manager) onServerDisconnected(serverAddress string) { + m.relayClientMu.Lock() if serverAddress == m.relayClient.connectionURL { - go m.reconnectGuard.OnDisconnected() + go m.reconnectGuard.StartReconnectTrys(m.ctx, m.relayClient) } + m.relayClientMu.Unlock() m.notifyOnDisconnectListeners(serverAddress) } +func (m *Manager) listenGuardEvent(ctx context.Context) { + for { + select { + case rc := <-m.reconnectGuard.OnNewRelayClient: + m.storeClient(rc) + case <-ctx.Done(): + return + } + } +} + +func (m *Manager) storeClient(client *Client) { + m.relayClientMu.Lock() + defer m.relayClientMu.Unlock() + + m.relayClient = client + m.relayClient.SetOnConnectedListener(m.onServerConnected) + m.relayClient.SetOnDisconnectListener(m.onServerDisconnected) +} + func (m *Manager) isForeignServer(address string) (bool, error) { rAddr, err := m.relayClient.ServerInstanceURL() if err != nil { @@ -282,22 +330,16 @@ func (m *Manager) isForeignServer(address string) (bool, error) { } func (m *Manager) startCleanupLoop() { - if m.ctx.Err() != nil { - return - } - ticker := time.NewTicker(relayCleanupInterval) - go func() { - defer ticker.Stop() - for { - select { - case <-m.ctx.Done(): - return - case <-ticker.C: - m.cleanUpUnusedRelays() - } + defer ticker.Stop() + for { + select { + case <-m.ctx.Done(): + return + case <-ticker.C: + m.cleanUpUnusedRelays() } - }() + } } func (m *Manager) cleanUpUnusedRelays() { diff --git a/relay/client/picker.go b/relay/client/picker.go index 13b0547aa..eb5062dbb 100644 --- a/relay/client/picker.go +++ b/relay/client/picker.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "sync/atomic" "time" log "github.com/sirupsen/logrus" @@ -12,10 +13,13 @@ import ( ) const ( - connectionTimeout = 30 * time.Second maxConcurrentServers = 7 ) +var ( + connectionTimeout = 30 * time.Second +) + type connResult struct { RelayClient *Client Url string @@ -24,20 +28,22 @@ type connResult struct { type ServerPicker struct { TokenStore *auth.TokenStore + ServerURLs atomic.Value PeerID string } -func (sp *ServerPicker) PickServer(parentCtx context.Context, urls []string) (*Client, error) { +func (sp *ServerPicker) PickServer(parentCtx context.Context) (*Client, error) { ctx, cancel := context.WithTimeout(parentCtx, connectionTimeout) defer cancel() - totalServers := len(urls) + totalServers := len(sp.ServerURLs.Load().([]string)) connResultChan := make(chan connResult, totalServers) successChan := make(chan connResult, 1) concurrentLimiter := make(chan struct{}, maxConcurrentServers) - for _, url := range urls { + log.Debugf("pick server from list: %v", sp.ServerURLs.Load().([]string)) + for _, url := range sp.ServerURLs.Load().([]string) { // todo check if we have a successful connection so we do not need to connect to other servers concurrentLimiter <- struct{}{} go func(url string) { @@ -78,7 +84,7 @@ func (sp *ServerPicker) processConnResults(resultChan chan connResult, successCh for numOfResults := 0; numOfResults < cap(resultChan); numOfResults++ { cr := <-resultChan if cr.Err != nil { - log.Debugf("failed to connect to Relay server: %s: %v", cr.Url, cr.Err) + log.Tracef("failed to connect to Relay server: %s: %v", cr.Url, cr.Err) continue } log.Infof("connected to Relay server: %s", cr.Url) diff --git a/relay/client/picker_test.go b/relay/client/picker_test.go index 4800e05ba..20a03e64d 100644 --- a/relay/client/picker_test.go +++ b/relay/client/picker_test.go @@ -7,16 +7,19 @@ import ( ) func TestServerPicker_UnavailableServers(t *testing.T) { + connectionTimeout = 5 * time.Second + sp := ServerPicker{ TokenStore: nil, PeerID: "test", } + sp.ServerURLs.Store([]string{"rel://dummy1", "rel://dummy2"}) ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout+1) defer cancel() go func() { - _, err := sp.PickServer(ctx, []string{"rel://dummy1", "rel://dummy2"}) + _, err := sp.PickServer(ctx) if err == nil { t.Error(err) }