From f17ea2aa574af4593931ddc837772a14f6a089cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Thu, 25 Sep 2025 20:31:26 +0200 Subject: [PATCH] Add guard switch --- client/internal/peer/conn.go | 26 +++- client/internal/peer/guard.go | 10 ++ client/internal/peer/guard/guard_retry.go | 139 ++++++++++++++++++++++ 3 files changed, 172 insertions(+), 3 deletions(-) create mode 100644 client/internal/peer/guard.go create mode 100644 client/internal/peer/guard/guard_retry.go diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index bb0c9dc9a..4620cbac1 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -112,10 +112,14 @@ type Conn struct { wgProxyRelay wgproxy.Proxy handshaker *Handshaker - guard *guard.Guard + guard Guard semaphore *semaphoregroup.SemaphoreGroup wg sync.WaitGroup + // used to replace the old guard with the new one in a thread-safe way + guardCtxCancel context.CancelFunc + wgGuard sync.WaitGroup + // debug purpose dumpState *stateDump } @@ -198,13 +202,18 @@ func (conn *Conn) Open(engineCtx context.Context) error { } conn.wg.Add(1) + conn.wgGuard.Add(1) + guardCtx, cancel := context.WithCancel(conn.ctx) + conn.guardCtxCancel = cancel go func() { defer conn.wg.Done() + defer conn.wgGuard.Done() + defer cancel() conn.waitInitialRandomSleepTime(conn.ctx) conn.semaphore.Done(conn.ctx) - conn.guard.Start(conn.ctx, conn.onGuardEvent) + conn.guard.Start(guardCtx, conn.onGuardEvent) }() // both peer send offer @@ -214,7 +223,7 @@ case ErrPeerNotAvailable: conn.guard.FailedToSendOffer() case ErrSignalNotSupportDeliveryCheck: - // todo replace guard with the original connection logic + conn.switchGuard() } } @@ -796,6 +805,17 @@ func (conn *Conn) rosenpassDetermKey() (*wgtypes.Key, error) { return &key, nil } +func (conn *Conn) switchGuard() { + conn.guardCtxCancel() + conn.wgGuard.Wait() + conn.wg.Add(1) + go func() { + defer 
conn.wg.Done() + conn.guard = guard.NewGuardRetry(conn.Log, conn.isConnectedOnAllWay, conn.config.Timeout, conn.srWatcher) + conn.guard.Start(conn.ctx, conn.onGuardEvent) + }() +} + func isController(config ConnConfig) bool { return config.LocalKey > config.Key } diff --git a/client/internal/peer/guard.go b/client/internal/peer/guard.go new file mode 100644 index 000000000..11ce297aa --- /dev/null +++ b/client/internal/peer/guard.go @@ -0,0 +1,10 @@ +package peer + +import "context" + +type Guard interface { + Start(ctx context.Context, eventCallback func()) + SetRelayedConnDisconnected() + SetICEConnDisconnected() + FailedToSendOffer() +} diff --git a/client/internal/peer/guard/guard_retry.go b/client/internal/peer/guard/guard_retry.go new file mode 100644 index 000000000..6b87311c6 --- /dev/null +++ b/client/internal/peer/guard/guard_retry.go @@ -0,0 +1,139 @@ +package guard + +import ( + "context" + "time" + + "github.com/cenkalti/backoff/v4" + log "github.com/sirupsen/logrus" +) + +// GuardRetry is responsible for the reconnection logic. +// It will trigger sending an offer to the peer when it has connection issues. 
+// Watch these events: +// - Relay client reconnected to home server +// - Signal server connection state changed +// - ICE connection disconnected +// - Relayed connection disconnected +// - ICE candidate changes +type GuardRetry struct { + log *log.Entry + isConnectedOnAllWay isConnectedFunc + timeout time.Duration + srWatcher *SRWatcher + relayedConnDisconnected chan struct{} + iCEConnDisconnected chan struct{} +} + +func (g *GuardRetry) FailedToSendOffer() { + log.Errorf("FailedToSendOffer is not implemented in GuardRetry") +} + +func NewGuardRetry(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *GuardRetry { + return &GuardRetry{ + log: log, + isConnectedOnAllWay: isConnectedFn, + timeout: timeout, + srWatcher: srWatcher, + relayedConnDisconnected: make(chan struct{}, 1), + iCEConnDisconnected: make(chan struct{}, 1), + } +} + +func (g *GuardRetry) Start(ctx context.Context, eventCallback func()) { + g.log.Infof("starting guard for reconnection with MaxInterval: %s", g.timeout) + g.reconnectLoopWithRetry(ctx, eventCallback) +} + +func (g *GuardRetry) SetRelayedConnDisconnected() { + select { + case g.relayedConnDisconnected <- struct{}{}: + default: + } +} + +func (g *GuardRetry) SetICEConnDisconnected() { + select { + case g.iCEConnDisconnected <- struct{}{}: + default: + } +} + +// reconnectLoopWithRetry periodically checks the connection status. +// Try to send an offer while the P2P connection is not established or while the Relay is not connected, if it is supported +func (g *GuardRetry) reconnectLoopWithRetry(ctx context.Context, callback func()) { + srReconnectedChan := g.srWatcher.NewListener() + defer g.srWatcher.RemoveListener(srReconnectedChan) + + ticker := g.initialTicker(ctx) + defer ticker.Stop() + + tickerChannel := ticker.C + + for { + select { + case t := <-tickerChannel: + if t.IsZero() { + g.log.Infof("retry timed out, stop periodic offer sending") + // after backoff timeout the ticker.C will be closed. 
We need a dummy channel to avoid a busy loop + tickerChannel = make(<-chan time.Time) + continue + } + + if !g.isConnectedOnAllWay() { + callback() + } + case <-g.relayedConnDisconnected: + g.log.Debugf("Relay connection changed, reset reconnection ticker") + ticker.Stop() + ticker = g.prepareExponentTicker(ctx) + tickerChannel = ticker.C + + case <-g.iCEConnDisconnected: + g.log.Debugf("ICE connection changed, reset reconnection ticker") + ticker.Stop() + ticker = g.prepareExponentTicker(ctx) + tickerChannel = ticker.C + + case <-srReconnectedChan: + g.log.Debugf("has network changes, reset reconnection ticker") + ticker.Stop() + ticker = g.prepareExponentTicker(ctx) + tickerChannel = ticker.C + + case <-ctx.Done(): + g.log.Debugf("context is done, stop reconnect loop") + return + } + } +} + +// initialTicker gives the peer a chance to establish the initial connection. +func (g *GuardRetry) initialTicker(ctx context.Context) *backoff.Ticker { + bo := backoff.WithContext(&backoff.ExponentialBackOff{ + InitialInterval: 3 * time.Second, + RandomizationFactor: 0.1, + Multiplier: 2, + MaxInterval: g.timeout, + Stop: backoff.Stop, + Clock: backoff.SystemClock, + }, ctx) + 
 + return backoff.NewTicker(bo) +} + +func (g *GuardRetry) prepareExponentTicker(ctx context.Context) *backoff.Ticker { + bo := backoff.WithContext(&backoff.ExponentialBackOff{ + InitialInterval: 800 * time.Millisecond, + RandomizationFactor: 0.1, + Multiplier: 2, + MaxInterval: g.timeout, + Stop: backoff.Stop, + Clock: backoff.SystemClock, + }, ctx) + + ticker := backoff.NewTicker(bo) + <-ticker.C // consume the initial tick that happens right after the ticker has been created + + return ticker +}