mirror of
https://github.com/netbirdio/netbird.git
synced 2026-07-04 13:49:56 +00:00
Compare commits
1 Commits
wg_watcher
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3aa6c02b93 |
@@ -195,6 +195,7 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
|
||||
statusICE: worker.NewAtomicStatus(),
|
||||
dumpState: dumpState,
|
||||
endpointUpdater: NewEndpointUpdater(connLog, config.WgConfig, isController(config)),
|
||||
wgWatcher: NewWGWatcher(connLog, config.WgConfig.WgInterface, config.Key, dumpState),
|
||||
metricsRecorder: services.MetricsRecorder,
|
||||
}
|
||||
|
||||
@@ -662,12 +663,11 @@ func (conn *Conn) onGuardEvent() {
|
||||
}
|
||||
}
|
||||
|
||||
func (conn *Conn) onWGDisconnected(watcherCtx context.Context) {
|
||||
func (conn *Conn) onWGDisconnected() {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
|
||||
// watcherCtx guards against a stale watcher tearing down a connection that already superseded it.
|
||||
if conn.ctx.Err() != nil || watcherCtx.Err() != nil {
|
||||
if conn.ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -802,39 +802,25 @@ func (conn *Conn) isConnectedOnAllWay() (status guard.ConnStatus) {
|
||||
})
|
||||
}
|
||||
|
||||
// enableWgWatcherIfNeeded starts a fresh watcher instance per connection attempt, so its
|
||||
// lifecycle stays bound to conn.mu and enable/disable can't race an old goroutine's shutdown.
|
||||
// Caller must hold conn.mu.
|
||||
func (conn *Conn) enableWgWatcherIfNeeded(enabledTime time.Time) {
|
||||
if conn.wgWatcher != nil {
|
||||
if !conn.wgWatcher.PrepareInitialHandshake() {
|
||||
return
|
||||
}
|
||||
|
||||
watcher := NewWGWatcher(conn.Log, conn.config.WgConfig.WgInterface, conn.config.Key, conn.dumpState)
|
||||
watcher.PrepareInitialHandshake()
|
||||
|
||||
wgWatcherCtx, wgWatcherCancel := context.WithCancel(conn.ctx)
|
||||
conn.wgWatcher = watcher
|
||||
conn.wgWatcherCancel = wgWatcherCancel
|
||||
|
||||
conn.wgWatcherWg.Add(1)
|
||||
go func() {
|
||||
defer conn.wgWatcherWg.Done()
|
||||
onDisconnected := func() { conn.onWGDisconnected(wgWatcherCtx) }
|
||||
watcher.EnableWgWatcher(wgWatcherCtx, enabledTime, onDisconnected, conn.onWGHandshakeSuccess)
|
||||
conn.wgWatcher.EnableWgWatcher(wgWatcherCtx, enabledTime, conn.onWGDisconnected, conn.onWGHandshakeSuccess)
|
||||
}()
|
||||
}
|
||||
|
||||
// disableWgWatcherIfNeeded cancels and drops the watcher once no transport is active. It never
|
||||
// waits for the goroutine: the timeout path reentrantly calls back here under conn.mu, so
|
||||
// blocking would deadlock. Caller must hold conn.mu.
|
||||
func (conn *Conn) disableWgWatcherIfNeeded() {
|
||||
if conn.currentConnPriority != conntype.None || conn.wgWatcher == nil {
|
||||
return
|
||||
if conn.currentConnPriority == conntype.None && conn.wgWatcherCancel != nil {
|
||||
conn.wgWatcherCancel()
|
||||
conn.wgWatcherCancel = nil
|
||||
}
|
||||
conn.wgWatcherCancel()
|
||||
conn.wgWatcher = nil
|
||||
conn.wgWatcherCancel = nil
|
||||
}
|
||||
|
||||
func (conn *Conn) newProxy(remoteConn net.Conn) (wgproxy.Proxy, error) {
|
||||
@@ -857,9 +843,7 @@ func (conn *Conn) resetEndpoint() {
|
||||
return
|
||||
}
|
||||
conn.Log.Infof("reset wg endpoint")
|
||||
if conn.wgWatcher != nil {
|
||||
conn.wgWatcher.Reset()
|
||||
}
|
||||
conn.wgWatcher.Reset()
|
||||
if err := conn.endpointUpdater.RemoveEndpointAddress(); err != nil {
|
||||
conn.Log.Warnf("failed to remove endpoint address before update: %v", err)
|
||||
}
|
||||
|
||||
@@ -85,7 +85,11 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context, callback func()) {
|
||||
defer g.srWatcher.RemoveListener(srReconnectedChan)
|
||||
|
||||
ticker := g.initialTicker(ctx)
|
||||
defer ticker.Stop()
|
||||
defer func() {
|
||||
// If backoff.Ticker.send is blocked, context.Done will not close the Ticker goroutine.
|
||||
// We have to explicitly call Stop, even if we use backoff.WithContext.
|
||||
ticker.Stop()
|
||||
}()
|
||||
|
||||
tickerChannel := ticker.C
|
||||
|
||||
|
||||
92
client/internal/peer/guard/guard_leak_test.go
Normal file
92
client/internal/peer/guard/guard_leak_test.go
Normal file
@@ -0,0 +1,92 @@
|
||||
package guard
|
||||
|
||||
import (
|
||||
"context"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
|
||||
"github.com/netbirdio/netbird/client/internal/peer/ice"
|
||||
)
|
||||
|
||||
func newTestGuard(status connStatusFunc) *Guard {
|
||||
srw := NewSRWatcher(nil, nil, nil, ice.Config{})
|
||||
return NewGuard(log.WithField("test", "guard"), status, 50*time.Millisecond, srw)
|
||||
}
|
||||
|
||||
// countBackoffTickerGoroutines returns how many goroutines are currently sitting
|
||||
// in backoff/v4.(*Ticker).run (a ticker goroutine that has not exited).
|
||||
func countBackoffTickerGoroutines() int {
|
||||
buf := make([]byte, 1<<25) // 32MB
|
||||
n := runtime.Stack(buf, true)
|
||||
return strings.Count(string(buf[:n]), "backoff/v4.(*Ticker).run")
|
||||
}
|
||||
|
||||
// TestGuard_ReconnectTicker_NoGoroutineLeakOnShutdown reproduces a observed
|
||||
// leak: after a shutdown burst, ticker run/send goroutines stay parked
|
||||
// forever even though every reconnect loop has exited.
|
||||
func TestGuard_ReconnectTicker_NoGoroutineLeakOnShutdown(t *testing.T) {
|
||||
before := countBackoffTickerGoroutines()
|
||||
|
||||
const peers = 6000
|
||||
cancels := make([]context.CancelFunc, 0, peers)
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// A status check slower than the tick cadence. This models the real
|
||||
// isConnectedOnAllWay/callback doing work: while the loop is busy in the
|
||||
// handler, the ticker fires the next tick and parks in send(), because
|
||||
// send() never selects on ctx.
|
||||
slowStatus := func() ConnStatus {
|
||||
time.Sleep(70 * time.Millisecond)
|
||||
return ConnStatusConnected
|
||||
}
|
||||
|
||||
for range peers {
|
||||
g := newTestGuard(slowStatus)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
cancels = append(cancels, cancel)
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
g.Start(ctx, func() {})
|
||||
}()
|
||||
// Force the live ticker to be a newReconnectTicker.
|
||||
g.SetRelayedConnDisconnected()
|
||||
}
|
||||
|
||||
// Let the replacement tickers get past their 800ms initial interval, so
|
||||
// many are parked in send() waiting on the (slow) consumer when we tear
|
||||
// everything down.
|
||||
time.Sleep(1500 * time.Millisecond)
|
||||
|
||||
// Shutdown burst: cancel every peer at once, like engine teardown.
|
||||
for _, c := range cancels {
|
||||
c()
|
||||
}
|
||||
|
||||
// Every reconnect loop must return
|
||||
waitCh := make(chan struct{})
|
||||
go func() { wg.Wait(); close(waitCh) }()
|
||||
select {
|
||||
case <-waitCh:
|
||||
case <-time.After(30 * time.Second):
|
||||
t.Fatal("not all reconnect loops returned after ctx cancel")
|
||||
}
|
||||
|
||||
// Give any correctly-stopped ticker goroutines time to unwind.
|
||||
for range 50 {
|
||||
runtime.Gosched()
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
}
|
||||
|
||||
leaked := countBackoffTickerGoroutines() - before
|
||||
t.Logf("backoff Ticker.run goroutines still parked after teardown of %d peers: %d", peers, leaked)
|
||||
if leaked > 0 {
|
||||
t.Errorf("LEAK: %d backoff ticker goroutines parked after all reconnect loops exited "+
|
||||
"(defer ticker.Stop() stops the initial ticker, not the live replacement)", leaked)
|
||||
}
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package peer
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
@@ -23,14 +24,14 @@ type WGInterfaceStater interface {
|
||||
GetStats() (map[string]configurer.WGStats, error)
|
||||
}
|
||||
|
||||
// WGWatcher is single-shot: one instance per connection attempt, run once, then discarded.
|
||||
// Lifecycle is owned by Conn under conn.mu, so it keeps no "enabled" state to go stale.
|
||||
type WGWatcher struct {
|
||||
log *log.Entry
|
||||
wgIfaceStater WGInterfaceStater
|
||||
peerKey string
|
||||
stateDump *stateDump
|
||||
|
||||
enabled bool
|
||||
muEnabled sync.Mutex
|
||||
// initialHandshake is not thread-safe; never call PrepareInitialHandshake and EnableWgWatcher concurrently.
|
||||
initialHandshake time.Time
|
||||
|
||||
@@ -47,14 +48,25 @@ func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey strin
|
||||
}
|
||||
}
|
||||
|
||||
// PrepareInitialHandshake reads the peer's current WireGuard handshake time. It must be
|
||||
// called before the peer is (re)configured on the WireGuard interface, so the captured
|
||||
// baseline reflects the state prior to this connection attempt instead of racing with
|
||||
// that configuration.
|
||||
func (w *WGWatcher) PrepareInitialHandshake() {
|
||||
// PrepareInitialHandshake reserves the watcher and reads the peer's current WireGuard
|
||||
// handshake time. It must be called before the peer is (re)configured on the WireGuard
|
||||
// interface, so the captured baseline reflects the state prior to this connection attempt
|
||||
// instead of racing with that configuration. Returns ok=false if the watcher is already
|
||||
// running, in which case EnableWgWatcher must not be called.
|
||||
func (w *WGWatcher) PrepareInitialHandshake() (ok bool) {
|
||||
w.muEnabled.Lock()
|
||||
if w.enabled {
|
||||
w.muEnabled.Unlock()
|
||||
return false
|
||||
}
|
||||
|
||||
w.log.Debugf("enable WireGuard watcher")
|
||||
w.enabled = true
|
||||
w.muEnabled.Unlock()
|
||||
|
||||
handshake, _ := w.wgState()
|
||||
w.initialHandshake = handshake
|
||||
return true
|
||||
}
|
||||
|
||||
// EnableWgWatcher runs the WireGuard watcher loop using the handshake baseline captured by
|
||||
@@ -62,6 +74,10 @@ func (w *WGWatcher) PrepareInitialHandshake() {
|
||||
// for context lifecycle management.
|
||||
func (w *WGWatcher) EnableWgWatcher(ctx context.Context, enabledTime time.Time, onDisconnectedFn func(), onHandshakeSuccessFn func(when time.Time)) {
|
||||
w.periodicHandshakeCheck(ctx, onDisconnectedFn, onHandshakeSuccessFn, enabledTime, w.initialHandshake)
|
||||
|
||||
w.muEnabled.Lock()
|
||||
w.enabled = false
|
||||
w.muEnabled.Unlock()
|
||||
}
|
||||
|
||||
// Reset signals the watcher that the WireGuard peer has been reset and a new
|
||||
@@ -87,7 +103,6 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn
|
||||
case <-timer.C:
|
||||
handshake, ok := w.handshakeCheck(lastHandshake)
|
||||
if !ok {
|
||||
// early ctx cancel check return
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
@@ -132,9 +147,9 @@ func (w *WGWatcher) handshakeCheck(lastHandshake time.Time) (*time.Time, bool) {
|
||||
|
||||
w.log.Tracef("previous handshake, handshake: %v, %v", lastHandshake, handshake)
|
||||
|
||||
// the current known handshake did not change
|
||||
// the current know handshake did not change
|
||||
if handshake.Equal(lastHandshake) {
|
||||
w.log.Warnf("WireGuard handshake not updated: %v", handshake)
|
||||
w.log.Warnf("WireGuard handshake timed out: %v", handshake)
|
||||
return nil, false
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/netbirdio/netbird/client/iface/configurer"
|
||||
)
|
||||
@@ -34,7 +35,8 @@ func TestWGWatcher_EnableWgWatcher(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
watcher.PrepareInitialHandshake()
|
||||
ok := watcher.PrepareInitialHandshake()
|
||||
require.True(t, ok, "watcher should not be enabled yet")
|
||||
|
||||
onDisconnected := make(chan struct{}, 1)
|
||||
go watcher.EnableWgWatcher(ctx, time.Now(), func() {
|
||||
@@ -64,7 +66,8 @@ func TestWGWatcher_ReEnable(t *testing.T) {
|
||||
watcher := NewWGWatcher(mlog, mocWgIface, "", newStateDump("peer", mlog, &Status{}))
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
watcher.PrepareInitialHandshake()
|
||||
ok := watcher.PrepareInitialHandshake()
|
||||
require.True(t, ok, "watcher should not be enabled yet")
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
@@ -80,7 +83,8 @@ func TestWGWatcher_ReEnable(t *testing.T) {
|
||||
ctx, cancel = context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
watcher.PrepareInitialHandshake()
|
||||
ok = watcher.PrepareInitialHandshake()
|
||||
require.True(t, ok, "watcher should be re-enabled after the previous run stopped")
|
||||
|
||||
onDisconnected := make(chan struct{}, 1)
|
||||
go watcher.EnableWgWatcher(ctx, time.Now(), func() {
|
||||
|
||||
Reference in New Issue
Block a user