mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 08:16:39 +00:00
Remove the on reconnect callback from client layer. This event will be managed by guard.
This commit is contained in:
@@ -141,7 +141,6 @@ type Client struct {
|
|||||||
muInstanceURL sync.Mutex
|
muInstanceURL sync.Mutex
|
||||||
|
|
||||||
onDisconnectListener func(string)
|
onDisconnectListener func(string)
|
||||||
onConnectedListener func()
|
|
||||||
listenerMutex sync.Mutex
|
listenerMutex sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -190,7 +189,6 @@ func (c *Client) Connect() error {
|
|||||||
|
|
||||||
c.wgReadLoop.Add(1)
|
c.wgReadLoop.Add(1)
|
||||||
go c.readLoop(c.relayConn)
|
go c.readLoop(c.relayConn)
|
||||||
go c.notifyConnected()
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -238,12 +236,6 @@ func (c *Client) SetOnDisconnectListener(fn func(string)) {
|
|||||||
c.onDisconnectListener = fn
|
c.onDisconnectListener = fn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) SetOnConnectedListener(fn func()) {
|
|
||||||
c.listenerMutex.Lock()
|
|
||||||
defer c.listenerMutex.Unlock()
|
|
||||||
c.onConnectedListener = fn
|
|
||||||
}
|
|
||||||
|
|
||||||
// HasConns returns true if there are connections.
|
// HasConns returns true if there are connections.
|
||||||
func (c *Client) HasConns() bool {
|
func (c *Client) HasConns() bool {
|
||||||
c.mu.Lock()
|
c.mu.Lock()
|
||||||
@@ -559,16 +551,6 @@ func (c *Client) notifyDisconnected() {
|
|||||||
go c.onDisconnectListener(c.connectionURL)
|
go c.onDisconnectListener(c.connectionURL)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) notifyConnected() {
|
|
||||||
c.listenerMutex.Lock()
|
|
||||||
defer c.listenerMutex.Unlock()
|
|
||||||
|
|
||||||
if c.onConnectedListener == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
go c.onConnectedListener()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) writeCloseMsg() {
|
func (c *Client) writeCloseMsg() {
|
||||||
msg := messages.MarshalCloseMsg()
|
msg := messages.MarshalCloseMsg()
|
||||||
_, err := c.relayConn.Write(msg)
|
_, err := c.relayConn.Write(msg)
|
||||||
|
|||||||
@@ -14,8 +14,9 @@ var (
|
|||||||
|
|
||||||
// Guard manage the reconnection tries to the Relay server in case of disconnection event.
|
// Guard manage the reconnection tries to the Relay server in case of disconnection event.
|
||||||
type Guard struct {
|
type Guard struct {
|
||||||
// OnNewRelayClient is a channel that is used to notify the relay client about a new relay client instance.
|
// OnNewRelayClient is a channel that is used to notify the relay manager about a new relay client instance.
|
||||||
OnNewRelayClient chan *Client
|
OnNewRelayClient chan *Client
|
||||||
|
OnReconnected chan struct{}
|
||||||
serverPicker *ServerPicker
|
serverPicker *ServerPicker
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -23,6 +24,7 @@ type Guard struct {
|
|||||||
func NewGuard(sp *ServerPicker) *Guard {
|
func NewGuard(sp *ServerPicker) *Guard {
|
||||||
g := &Guard{
|
g := &Guard{
|
||||||
OnNewRelayClient: make(chan *Client, 1),
|
OnNewRelayClient: make(chan *Client, 1),
|
||||||
|
OnReconnected: make(chan struct{}, 1),
|
||||||
serverPicker: sp,
|
serverPicker: sp,
|
||||||
}
|
}
|
||||||
return g
|
return g
|
||||||
@@ -39,14 +41,13 @@ func NewGuard(sp *ServerPicker) *Guard {
|
|||||||
// - relayClient: The relay client instance that was disconnected.
|
// - relayClient: The relay client instance that was disconnected.
|
||||||
// todo prevent multiple reconnection instances. In the current usage it should not happen, but it is better to prevent
|
// todo prevent multiple reconnection instances. In the current usage it should not happen, but it is better to prevent
|
||||||
func (g *Guard) StartReconnectTrys(ctx context.Context, relayClient *Client) {
|
func (g *Guard) StartReconnectTrys(ctx context.Context, relayClient *Client) {
|
||||||
if relayClient == nil {
|
// try to reconnect to the same server
|
||||||
goto RETRY
|
if ok := g.tryToQuickReconnect(ctx, relayClient); ok {
|
||||||
}
|
g.notifyReconnected()
|
||||||
if g.isServerURLStillValid(relayClient) && g.quickReconnect(ctx, relayClient) {
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
RETRY:
|
// start a ticker to pick a new server
|
||||||
ticker := exponentTicker(ctx)
|
ticker := exponentTicker(ctx)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
@@ -64,6 +65,28 @@ RETRY:
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *Guard) tryToQuickReconnect(parentCtx context.Context, rc *Client) bool {
|
||||||
|
if rc == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if !g.isServerURLStillValid(rc) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if cancelled := waiteBeforeRetry(parentCtx); !cancelled {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("try to reconnect to Relay server: %s", rc.connectionURL)
|
||||||
|
|
||||||
|
if err := rc.Connect(); err != nil {
|
||||||
|
log.Errorf("failed to reconnect to relay server: %s", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func (g *Guard) retry(ctx context.Context) error {
|
func (g *Guard) retry(ctx context.Context) error {
|
||||||
log.Infof("try to pick up a new Relay server")
|
log.Infof("try to pick up a new Relay server")
|
||||||
relayClient, err := g.serverPicker.PickServer(ctx)
|
relayClient, err := g.serverPicker.PickServer(ctx)
|
||||||
@@ -78,23 +101,6 @@ func (g *Guard) retry(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Guard) quickReconnect(parentCtx context.Context, rc *Client) bool {
|
|
||||||
ctx, cancel := context.WithTimeout(parentCtx, 1500*time.Millisecond)
|
|
||||||
defer cancel()
|
|
||||||
<-ctx.Done()
|
|
||||||
|
|
||||||
if parentCtx.Err() != nil {
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
log.Infof("try to reconnect to Relay server: %s", rc.connectionURL)
|
|
||||||
|
|
||||||
if err := rc.Connect(); err != nil {
|
|
||||||
log.Errorf("failed to reconnect to relay server: %s", err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *Guard) drainRelayClientChan() {
|
func (g *Guard) drainRelayClientChan() {
|
||||||
select {
|
select {
|
||||||
case <-g.OnNewRelayClient:
|
case <-g.OnNewRelayClient:
|
||||||
@@ -111,6 +117,21 @@ func (g *Guard) isServerURLStillValid(rc *Client) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (g *Guard) notifyReconnected() {
|
||||||
|
select {
|
||||||
|
case g.OnReconnected <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (g *Guard) isReadyToQuickReconnect(relayClient *Client) bool {
|
||||||
|
if relayClient == nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return g.isServerURLStillValid(relayClient)
|
||||||
|
}
|
||||||
|
|
||||||
func exponentTicker(ctx context.Context) *backoff.Ticker {
|
func exponentTicker(ctx context.Context) *backoff.Ticker {
|
||||||
bo := backoff.WithContext(&backoff.ExponentialBackOff{
|
bo := backoff.WithContext(&backoff.ExponentialBackOff{
|
||||||
InitialInterval: 2 * time.Second,
|
InitialInterval: 2 * time.Second,
|
||||||
@@ -121,3 +142,15 @@ func exponentTicker(ctx context.Context) *backoff.Ticker {
|
|||||||
|
|
||||||
return backoff.NewTicker(bo)
|
return backoff.NewTicker(bo)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func waiteBeforeRetry(ctx context.Context) bool {
|
||||||
|
timer := time.NewTimer(1500 * time.Millisecond)
|
||||||
|
defer timer.Stop()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
return true
|
||||||
|
case <-ctx.Done():
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -165,6 +165,9 @@ func (m *Manager) Ready() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) SetOnReconnectedListener(f func()) {
|
func (m *Manager) SetOnReconnectedListener(f func()) {
|
||||||
|
m.listenerLock.Lock()
|
||||||
|
defer m.listenerLock.Unlock()
|
||||||
|
|
||||||
m.onReconnectedListenerFn = f
|
m.onReconnectedListenerFn = f
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -284,6 +287,9 @@ func (m *Manager) openConnVia(serverAddress, peerKey string) (net.Conn, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) onServerConnected() {
|
func (m *Manager) onServerConnected() {
|
||||||
|
m.listenerLock.Lock()
|
||||||
|
defer m.listenerLock.Unlock()
|
||||||
|
|
||||||
if m.onReconnectedListenerFn == nil {
|
if m.onReconnectedListenerFn == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -304,8 +310,11 @@ func (m *Manager) onServerDisconnected(serverAddress string) {
|
|||||||
func (m *Manager) listenGuardEvent(ctx context.Context) {
|
func (m *Manager) listenGuardEvent(ctx context.Context) {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
case <-m.reconnectGuard.OnReconnected:
|
||||||
|
m.onServerConnected()
|
||||||
case rc := <-m.reconnectGuard.OnNewRelayClient:
|
case rc := <-m.reconnectGuard.OnNewRelayClient:
|
||||||
m.storeClient(rc)
|
m.storeClient(rc)
|
||||||
|
m.onServerConnected()
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -317,7 +326,6 @@ func (m *Manager) storeClient(client *Client) {
|
|||||||
defer m.relayClientMu.Unlock()
|
defer m.relayClientMu.Unlock()
|
||||||
|
|
||||||
m.relayClient = client
|
m.relayClient = client
|
||||||
m.relayClient.SetOnConnectedListener(m.onServerConnected)
|
|
||||||
m.relayClient.SetOnDisconnectListener(m.onServerDisconnected)
|
m.relayClient.SetOnDisconnectListener(m.onServerDisconnected)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user