Add guard switch

This commit is contained in:
Zoltán Papp
2025-09-25 20:31:26 +02:00
parent 316fc15701
commit f17ea2aa57
3 changed files with 172 additions and 3 deletions

View File

@@ -112,10 +112,14 @@ type Conn struct {
wgProxyRelay wgproxy.Proxy
handshaker *Handshaker
guard *guard.Guard
guard Guard
semaphore *semaphoregroup.SemaphoreGroup
wg sync.WaitGroup
// used to replace the old guard with a new one in a thread-safe way
guardCtxCancel context.CancelFunc
wgGuard sync.WaitGroup
// debug purpose
dumpState *stateDump
}
@@ -198,13 +202,18 @@ func (conn *Conn) Open(engineCtx context.Context) error {
}
conn.wg.Add(1)
conn.wgGuard.Add(1)
guardCtx, cancel := context.WithCancel(conn.ctx)
conn.guardCtxCancel = cancel
go func() {
defer conn.wg.Done()
defer conn.wgGuard.Done()
defer cancel()
conn.waitInitialRandomSleepTime(conn.ctx)
conn.semaphore.Done(conn.ctx)
conn.guard.Start(conn.ctx, conn.onGuardEvent)
conn.guard.Start(guardCtx, conn.onGuardEvent)
}()
// both peer send offer
@@ -214,7 +223,7 @@ func (conn *Conn) Open(engineCtx context.Context) error {
case ErrPeerNotAvailable:
conn.guard.FailedToSendOffer()
case ErrSignalNotSupportDeliveryCheck:
// todo replace guard with the original connection logic
conn.switchGuard()
}
}
@@ -796,6 +805,17 @@ func (conn *Conn) rosenpassDetermKey() (*wgtypes.Key, error) {
return &key, nil
}
// switchGuard tears down the currently running guard and replaces it
// with the legacy retry-based guard (guard.GuardRetry). It is invoked
// when the signal service does not support delivery checks.
//
// Sequence: cancel the running guard's context, wait for its goroutine
// to exit (wgGuard), then swap conn.guard and start the replacement.
// The field is assigned before the new goroutine is spawned so the
// replacement is never performed from inside the goroutine — the
// original wrote conn.guard from the goroutine, which races with any
// concurrent reader of the field.
// NOTE(review): conn.guard may still be read concurrently elsewhere
// (e.g. the event forwarding paths); consider guarding it with a mutex — TODO confirm.
func (conn *Conn) switchGuard() {
	conn.guardCtxCancel()
	conn.wgGuard.Wait()

	// Build and publish the replacement before starting it.
	newGuard := guard.NewGuardRetry(conn.Log, conn.isConnectedOnAllWay, conn.config.Timeout, conn.srWatcher)
	conn.guard = newGuard

	conn.wg.Add(1)
	go func() {
		defer conn.wg.Done()
		newGuard.Start(conn.ctx, conn.onGuardEvent)
	}()
}
// isController reports whether the local peer takes the controller role
// for this connection: the side whose key compares greater wins.
func isController(config ConnConfig) bool {
	local, remote := config.LocalKey, config.Key
	return local > remote
}

View File

@@ -0,0 +1,10 @@
package peer
import "context"
// Guard abstracts the per-peer reconnection watchdog so Conn can swap
// implementations at runtime (e.g. guard.Guard vs guard.GuardRetry).
type Guard interface {
	// Start runs the guard loop until ctx is cancelled; eventCallback is
	// invoked whenever the guard decides an offer should be (re)sent.
	Start(ctx context.Context, eventCallback func())
	// SetRelayedConnDisconnected signals that the relayed connection dropped.
	SetRelayedConnDisconnected()
	// SetICEConnDisconnected signals that the ICE connection dropped.
	SetICEConnDisconnected()
	// FailedToSendOffer reports that sending an offer to the peer failed.
	FailedToSendOffer()
}

View File

@@ -0,0 +1,139 @@
package guard
import (
"context"
"time"
"github.com/cenkalti/backoff/v4"
log "github.com/sirupsen/logrus"
)
// GuardRetry is responsible for the reconnection logic.
// It triggers sending an offer to the peer when the connection has issues.
// It watches these events:
// - Relay client reconnected to home server
// - Signal server connection state changed
// - ICE connection disconnected
// - Relayed connection disconnected
// - ICE candidate changes
type GuardRetry struct {
	log                 *log.Entry
	isConnectedOnAllWay isConnectedFunc // reports whether the peer is connected on every path
	timeout             time.Duration   // upper bound (MaxInterval) for the retry backoff
	srWatcher           *SRWatcher      // emits network-change events via NewListener

	// Buffered(1) signal channels; senders are non-blocking, so repeated
	// signals coalesce while one is already pending.
	relayedConnDisconnected chan struct{}
	iCEConnDisconnected     chan struct{}
}
// FailedToSendOffer is part of the Guard interface; GuardRetry relies on
// its periodic ticker instead of explicit failure notifications, so this
// is a no-op apart from logging.
// Fix: use the peer-scoped logger g.log (carries peer context) instead of
// the package-level logrus logger, matching every other GuardRetry method.
func (g *GuardRetry) FailedToSendOffer() {
	g.log.Errorf("FailedToSendOffer is not implemented in GuardRetry")
}
// NewGuardRetry constructs a retry-based guard for a single peer.
// timeout caps the exponential backoff interval; srWatcher supplies
// network-change notifications.
func NewGuardRetry(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *GuardRetry {
	g := &GuardRetry{
		log:                 log,
		isConnectedOnAllWay: isConnectedFn,
		timeout:             timeout,
		srWatcher:           srWatcher,
	}
	// Capacity 1: a single pending notification is enough, the
	// non-blocking senders drop anything beyond that.
	g.relayedConnDisconnected = make(chan struct{}, 1)
	g.iCEConnDisconnected = make(chan struct{}, 1)
	return g
}
// Start runs the reconnection loop and blocks until ctx is cancelled.
// eventCallback is invoked (from the loop goroutine) whenever an offer
// should be (re)sent to the peer.
func (g *GuardRetry) Start(ctx context.Context, eventCallback func()) {
	g.log.Infof("starting guard for reconnection with MaxInterval: %s", g.timeout)
	g.reconnectLoopWithRetry(ctx, eventCallback)
}
// SetRelayedConnDisconnected signals the reconnect loop that the relayed
// connection dropped. Non-blocking: the buffered(1) channel coalesces
// repeated signals while one is already pending.
func (g *GuardRetry) SetRelayedConnDisconnected() {
	select {
	case g.relayedConnDisconnected <- struct{}{}:
	default:
	}
}
// SetICEConnDisconnected signals the reconnect loop that the ICE
// connection dropped. Non-blocking: the buffered(1) channel coalesces
// repeated signals while one is already pending.
func (g *GuardRetry) SetICEConnDisconnected() {
	select {
	case g.iCEConnDisconnected <- struct{}{}:
	default:
	}
}
// reconnectLoopWithRetry periodically checks the connection status and
// invokes callback (to resend an offer) while the connection is not
// established on all paths. Any disconnect or network-change signal
// restarts the exponential backoff ticker, so offers go out quickly
// right after a change and then back off up to g.timeout.
func (g *GuardRetry) reconnectLoopWithRetry(ctx context.Context, callback func()) {
	srReconnectedChan := g.srWatcher.NewListener()
	defer g.srWatcher.RemoveListener(srReconnectedChan)

	ticker := g.initialTicker(ctx)
	defer ticker.Stop()
	tickerChannel := ticker.C

	// resetTicker swaps in a fresh exponential ticker after a
	// connectivity-related event.
	resetTicker := func() {
		ticker.Stop()
		ticker = g.prepareExponentTicker(ctx)
		tickerChannel = ticker.C
	}

	for {
		select {
		case t := <-tickerChannel:
			if t.IsZero() {
				g.log.Infof("retry timed out, stop periodic offer sending")
				// After the backoff gives up, ticker.C is closed and would
				// fire on every loop iteration. A nil channel blocks forever
				// in select, silencing it without spinning (and without
				// allocating a dummy channel as the previous version did).
				tickerChannel = nil
				continue
			}
			if !g.isConnectedOnAllWay() {
				callback()
			}
		case <-g.relayedConnDisconnected:
			g.log.Debugf("Relay connection changed, reset reconnection ticker")
			resetTicker()
		case <-g.iCEConnDisconnected:
			g.log.Debugf("ICE connection changed, reset reconnection ticker")
			resetTicker()
		case <-srReconnectedChan:
			g.log.Debugf("has network changes, reset reconnection ticker")
			resetTicker()
		case <-ctx.Done():
			g.log.Debugf("context is done, stop reconnect loop")
			return
		}
	}
}
// newExponentTicker builds the exponential-backoff ticker shared by
// initialTicker and prepareExponentTicker; only the initial interval
// differs between the two (the full ExponentialBackOff literal was
// previously duplicated in both functions).
func (g *GuardRetry) newExponentTicker(ctx context.Context, initialInterval time.Duration) *backoff.Ticker {
	bo := backoff.WithContext(&backoff.ExponentialBackOff{
		InitialInterval:     initialInterval,
		RandomizationFactor: 0.1,
		Multiplier:          2,
		MaxInterval:         g.timeout,
		Stop:                backoff.Stop,
		Clock:               backoff.SystemClock,
	}, ctx)
	return backoff.NewTicker(bo)
}

// initialTicker gives the peer a chance to establish the initial
// connection before offers are resent: it uses a longer (3s) initial
// interval and keeps the ticker's immediate first tick.
func (g *GuardRetry) initialTicker(ctx context.Context) *backoff.Ticker {
	return g.newExponentTicker(ctx, 3*time.Second)
}

// prepareExponentTicker creates the post-event retry ticker, starting
// fast (800ms) and backing off exponentially up to g.timeout.
func (g *GuardRetry) prepareExponentTicker(ctx context.Context) *backoff.Ticker {
	ticker := g.newExponentTicker(ctx, 800*time.Millisecond)
	<-ticker.C // consume the initial tick that fires right after the ticker is created
	return ticker
}