From 14a99a8693ec4265ce5d6cf54986edb63cd59c9c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Wed, 29 Jan 2025 13:25:03 +0100 Subject: [PATCH] Remove the on reconnect callback from client layer. This event will be managed by guard. --- relay/client/client.go | 18 ---------- relay/client/guard.go | 79 +++++++++++++++++++++++++++++------------ relay/client/manager.go | 10 +++++- 3 files changed, 65 insertions(+), 42 deletions(-) diff --git a/relay/client/client.go b/relay/client/client.go index 3c23b70d2..9e7e54393 100644 --- a/relay/client/client.go +++ b/relay/client/client.go @@ -141,7 +141,6 @@ type Client struct { muInstanceURL sync.Mutex onDisconnectListener func(string) - onConnectedListener func() listenerMutex sync.Mutex } @@ -190,7 +189,6 @@ func (c *Client) Connect() error { c.wgReadLoop.Add(1) go c.readLoop(c.relayConn) - go c.notifyConnected() return nil } @@ -238,12 +236,6 @@ func (c *Client) SetOnDisconnectListener(fn func(string)) { c.onDisconnectListener = fn } -func (c *Client) SetOnConnectedListener(fn func()) { - c.listenerMutex.Lock() - defer c.listenerMutex.Unlock() - c.onConnectedListener = fn -} - // HasConns returns true if there are connections. func (c *Client) HasConns() bool { c.mu.Lock() @@ -559,16 +551,6 @@ func (c *Client) notifyDisconnected() { go c.onDisconnectListener(c.connectionURL) } -func (c *Client) notifyConnected() { - c.listenerMutex.Lock() - defer c.listenerMutex.Unlock() - - if c.onConnectedListener == nil { - return - } - go c.onConnectedListener() -} - func (c *Client) writeCloseMsg() { msg := messages.MarshalCloseMsg() _, err := c.relayConn.Write(msg) diff --git a/relay/client/guard.go b/relay/client/guard.go index b971363a8..457c5d3ae 100644 --- a/relay/client/guard.go +++ b/relay/client/guard.go @@ -14,8 +14,9 @@ var ( // Guard manage the reconnection tries to the Relay server in case of disconnection event. type Guard struct { - // OnNewRelayClient is a channel that is used to notify the relay client about a new relay client instance. + // OnNewRelayClient is a channel that is used to notify the relay manager about a new relay client instance. OnNewRelayClient chan *Client + OnReconnected chan struct{} serverPicker *ServerPicker } @@ -23,6 +24,7 @@ type Guard struct { func NewGuard(sp *ServerPicker) *Guard { g := &Guard{ OnNewRelayClient: make(chan *Client, 1), + OnReconnected: make(chan struct{}, 1), serverPicker: sp, } return g @@ -39,14 +41,13 @@ func NewGuard(sp *ServerPicker) *Guard { // - relayClient: The relay client instance that was disconnected. // todo prevent multiple reconnection instances. In the current usage it should not happen, but it is better to prevent func (g *Guard) StartReconnectTrys(ctx context.Context, relayClient *Client) { - if relayClient == nil { - goto RETRY - } - if g.isServerURLStillValid(relayClient) && g.quickReconnect(ctx, relayClient) { + // try to reconnect to the same server + if ok := g.tryToQuickReconnect(ctx, relayClient); ok { + g.notifyReconnected() return } -RETRY: + // start a ticker to pick a new server ticker := exponentTicker(ctx) defer ticker.Stop() @@ -64,6 +65,28 @@ RETRY: } } +func (g *Guard) tryToQuickReconnect(parentCtx context.Context, rc *Client) bool { + if rc == nil { + return false + } + + if !g.isServerURLStillValid(rc) { + return false + } + + if cancelled := waiteBeforeRetry(parentCtx); !cancelled { + return false + } + + log.Infof("try to reconnect to Relay server: %s", rc.connectionURL) + + if err := rc.Connect(); err != nil { + log.Errorf("failed to reconnect to relay server: %s", err) + return false + } + return true +} + func (g *Guard) retry(ctx context.Context) error { log.Infof("try to pick up a new Relay server") relayClient, err := g.serverPicker.PickServer(ctx) @@ -78,23 +101,6 @@ func (g *Guard) retry(ctx context.Context) error { return nil } -func (g *Guard) quickReconnect(parentCtx context.Context, rc *Client) bool { - ctx, cancel := context.WithTimeout(parentCtx, 1500*time.Millisecond) - defer cancel() - <-ctx.Done() - - if parentCtx.Err() != nil { - return false - } - log.Infof("try to reconnect to Relay server: %s", rc.connectionURL) - - if err := rc.Connect(); err != nil { - log.Errorf("failed to reconnect to relay server: %s", err) - return false - } - return true -} - func (g *Guard) drainRelayClientChan() { select { case <-g.OnNewRelayClient: @@ -111,6 +117,21 @@ func (g *Guard) isServerURLStillValid(rc *Client) bool { return false } +func (g *Guard) notifyReconnected() { + select { + case g.OnReconnected <- struct{}{}: + default: + } +} + +func (g *Guard) isReadyToQuickReconnect(relayClient *Client) bool { + if relayClient == nil { + return false + } + + return g.isServerURLStillValid(relayClient) +} + func exponentTicker(ctx context.Context) *backoff.Ticker { bo := backoff.WithContext(&backoff.ExponentialBackOff{ InitialInterval: 2 * time.Second, @@ -121,3 +142,15 @@ func exponentTicker(ctx context.Context) *backoff.Ticker { return backoff.NewTicker(bo) } + +func waiteBeforeRetry(ctx context.Context) bool { + timer := time.NewTimer(1500 * time.Millisecond) + defer timer.Stop() + + select { + case <-timer.C: + return true + case <-ctx.Done(): + return false + } +} diff --git a/relay/client/manager.go b/relay/client/manager.go index d847bb879..26b113050 100644 --- a/relay/client/manager.go +++ b/relay/client/manager.go @@ -165,6 +165,9 @@ func (m *Manager) Ready() bool { } func (m *Manager) SetOnReconnectedListener(f func()) { + m.listenerLock.Lock() + defer m.listenerLock.Unlock() + m.onReconnectedListenerFn = f } @@ -284,6 +287,9 @@ func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) { } func (m *Manager) onServerConnected() { + m.listenerLock.Lock() + defer m.listenerLock.Unlock() + if m.onReconnectedListenerFn == nil { return } @@ -304,8 +310,11 @@ func (m *Manager) onServerDisconnected(serverAddress string) { func (m *Manager) listenGuardEvent(ctx context.Context) { for { select { + case <-m.reconnectGuard.OnReconnected: + m.onServerConnected() case rc := <-m.reconnectGuard.OnNewRelayClient: m.storeClient(rc) + m.onServerConnected() case <-ctx.Done(): return } @@ -317,7 +326,6 @@ func (m *Manager) storeClient(client *Client) { defer m.relayClientMu.Unlock() m.relayClient = client - m.relayClient.SetOnConnectedListener(m.onServerConnected) m.relayClient.SetOnDisconnectListener(m.onServerDisconnected) }