mirror of
https://github.com/netbirdio/netbird.git
synced 2026-06-26 09:49:56 +00:00
Compare commits
3 Commits
client-jso
...
signal-wat
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8446713d28 | ||
|
|
12f2e69af2 | ||
|
|
4cb2c62f2a |
@@ -14,6 +14,7 @@ import (
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-multierror"
|
||||
@@ -88,6 +89,13 @@ var ErrResetConnection = fmt.Errorf("reset connection")
|
||||
|
||||
var ErrEngineAlreadyStarted = errors.New("engine already started")
|
||||
|
||||
// engineRestartCount and engineLastRestart track client-restart cadence across
|
||||
// engine recreations so a restart loop is distinguishable from rare restarts.
|
||||
var (
|
||||
engineRestartCount atomic.Int64
|
||||
engineLastRestart atomic.Int64
|
||||
)
|
||||
|
||||
type EngineConfig struct {
|
||||
WgPort int
|
||||
WgIfaceName string
|
||||
@@ -909,14 +917,23 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
if e.ctx.Err() != nil {
|
||||
return e.ctx.Err()
|
||||
}
|
||||
serial := update.GetNetworkMap().GetSerial()
|
||||
if nm := update.GetNetworkMap(); nm != nil {
|
||||
log.Infof("sync update: serial=%d remotePeers=%d offlinePeers=%d routes=%d firewallRules=%d checks=%d configPresent=%v remotePeersEmpty=%v",
|
||||
nm.GetSerial(), len(nm.GetRemotePeers()), len(nm.GetOfflinePeers()), len(nm.GetRoutes()),
|
||||
len(nm.GetFirewallRules()), len(update.GetChecks()), update.GetNetbirdConfig() != nil, nm.GetRemotePeersIsEmpty())
|
||||
} else {
|
||||
log.Infof("sync update: config-only (no network map), configPresent=%v", update.GetNetbirdConfig() != nil)
|
||||
}
|
||||
|
||||
if update.NetworkMap != nil && update.NetworkMap.PeerConfig != nil {
|
||||
e.handleAutoUpdateVersion(update.NetworkMap.PeerConfig.AutoUpdate)
|
||||
}
|
||||
|
||||
startTime := time.Now()
|
||||
if err := e.updateNetbirdConfig(update.GetNetbirdConfig()); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("netbird config updated in %s, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
// Posture checks are bound to the network map presence:
|
||||
// NetworkMap != nil, checks present -> apply the received checks
|
||||
@@ -927,17 +944,21 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
if nm == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
startTime = time.Now()
|
||||
if err := e.updateChecksIfNew(update.Checks); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("checks updated in %s, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
startTime = time.Now()
|
||||
e.persistSyncResponse(update)
|
||||
|
||||
log.Infof("sync response persisted in %s, serial=%d", time.Since(startTime), serial)
|
||||
// only apply new changes and ignore old ones
|
||||
startTime = time.Now()
|
||||
if err := e.updateNetworkMap(nm); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("network map updated in %s, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
e.statusRecorder.PublishEvent(cProto.SystemEvent_INFO, cProto.SystemEvent_SYSTEM, "Network map updated", "", nil)
|
||||
|
||||
@@ -1357,44 +1378,56 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
|
||||
dnsConfig := toDNSConfig(protoDNSConfig, e.wgInterface.Address())
|
||||
|
||||
startTime := time.Now()
|
||||
if err := e.dnsServer.UpdateDNSServer(serial, dnsConfig); err != nil {
|
||||
log.Errorf("failed to update dns server, err: %v", err)
|
||||
}
|
||||
log.Infof("updated dns server in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
e.routeManager.SetDNSForwarderPort(dnsConfig.ForwarderPort)
|
||||
|
||||
// apply routes first, route related actions might depend on routing being enabled
|
||||
startTime = time.Now()
|
||||
routes := toRoutes(networkMap.GetRoutes())
|
||||
serverRoutes, clientRoutes := e.routeManager.ClassifyRoutes(routes)
|
||||
|
||||
log.Infof("updated routes in %v, serial=%d", time.Since(startTime), serial)
|
||||
// lazy mgr needs to be aware of which routes are available before they are applied
|
||||
if e.connMgr != nil {
|
||||
e.connMgr.UpdateRouteHAMap(clientRoutes)
|
||||
log.Debugf("updated lazy connection manager with %d HA groups", len(clientRoutes))
|
||||
}
|
||||
|
||||
startTime = time.Now()
|
||||
dnsRouteFeatureFlag := toDNSFeatureFlag(networkMap)
|
||||
if err := e.routeManager.UpdateRoutes(serial, serverRoutes, clientRoutes, dnsRouteFeatureFlag); err != nil {
|
||||
log.Errorf("failed to update routes: %v", err)
|
||||
}
|
||||
log.Infof("updated routes in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
startTime = time.Now()
|
||||
if e.acl != nil {
|
||||
e.acl.ApplyFiltering(networkMap, dnsRouteFeatureFlag)
|
||||
}
|
||||
log.Infof("updated filtering in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
startTime = time.Now()
|
||||
fwdEntries := toRouteDomains(e.config.WgPrivateKey.PublicKey().String(), routes)
|
||||
e.updateDNSForwarder(dnsRouteFeatureFlag, fwdEntries)
|
||||
log.Infof("updated DNS forwarder in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
startTime = time.Now()
|
||||
// Ingress forward rules
|
||||
forwardingRules, err := e.updateForwardRules(networkMap.GetForwardingRules())
|
||||
if err != nil {
|
||||
log.Errorf("failed to update forward rules, err: %v", err)
|
||||
}
|
||||
log.Infof("updated forward rules in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(networkMap.GetRemotePeers()))
|
||||
|
||||
startTime = time.Now()
|
||||
e.updateOfflinePeers(networkMap.GetOfflinePeers())
|
||||
|
||||
log.Infof("updated offline peers in %v, serial=%d", time.Since(startTime), serial)
|
||||
// Filter out own peer from the remote peers list
|
||||
localPubKey := e.config.WgPrivateKey.PublicKey().String()
|
||||
remotePeers := make([]*mgmProto.RemotePeerConfig, 0, len(networkMap.GetRemotePeers()))
|
||||
@@ -1412,20 +1445,24 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
startTime = time.Now()
|
||||
err := e.removePeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("removed peers in %v, serial=%d", time.Since(startTime), serial)
|
||||
startTime = time.Now()
|
||||
err = e.modifyPeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("modified peers in %v, serial=%d", time.Since(startTime), serial)
|
||||
startTime = time.Now()
|
||||
err = e.addNewPeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Infof("added peers in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
e.statusRecorder.FinishPeerListModifications()
|
||||
|
||||
@@ -1438,9 +1475,11 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
e.updateSSHServerAuth(networkMap.GetSshAuth())
|
||||
}
|
||||
|
||||
startTime = time.Now()
|
||||
// must set the exclude list after the peers are added. Without it the manager can not figure out the peers parameters from the store
|
||||
excludedLazyPeers := e.toExcludedLazyPeers(forwardingRules, remotePeers)
|
||||
e.connMgr.SetExcludeList(e.ctx, excludedLazyPeers)
|
||||
log.Infof("updated lazy connection manager exclude list in %v, serial=%d", time.Since(startTime), serial)
|
||||
|
||||
e.networkSerial = serial
|
||||
|
||||
@@ -2171,7 +2210,14 @@ func (e *Engine) triggerClientRestart() {
|
||||
return
|
||||
}
|
||||
|
||||
log.Info("restarting engine")
|
||||
// Cadence survives engine recreation (package-level), so a restart loop shows
|
||||
// as a fast-climbing count with a short gap, distinct from rare intentional restarts.
|
||||
n := engineRestartCount.Add(1)
|
||||
var sinceLast time.Duration
|
||||
if prev := engineLastRestart.Swap(time.Now().UnixNano()); prev != 0 {
|
||||
sinceLast = time.Since(time.Unix(0, prev))
|
||||
}
|
||||
log.Infof("restarting engine (restart #%d, %s since previous)", n, sinceLast.Round(time.Second))
|
||||
CtxGetState(e.ctx).Set(StatusConnecting)
|
||||
_ = CtxGetState(e.ctx).Wrap(ErrResetConnection)
|
||||
log.Infof("cancelling client context, engine will be recreated")
|
||||
|
||||
@@ -461,7 +461,7 @@ func (w *WorkerICE) createForwardedCandidate(srflxCandidate ice.Candidate, mappi
|
||||
}
|
||||
|
||||
func (w *WorkerICE) onICESelectedCandidatePair(agent *icemaker.ThreadSafeAgent, c1, c2 ice.Candidate) {
|
||||
w.log.Debugf("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(),
|
||||
w.log.Infof("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(),
|
||||
w.config.Key)
|
||||
|
||||
pairStat, ok := agent.GetSelectedCandidatePairStats()
|
||||
|
||||
@@ -78,6 +78,23 @@ type GrpcClient struct {
|
||||
// transport-alive but no longer delivering messages. It is the source of
|
||||
// truth IsHealthy reads, and is cleared once any frame is received again.
|
||||
receiveStalled atomic.Bool
|
||||
// receiveHandoffBlocked is set while the receive loop is parked handing a
|
||||
// message to a busy decryption worker. The loop stops calling Recv (and
|
||||
// markReceived) in that window, so the stream looks silent though it is
|
||||
// healthy. The watchdog reads this to avoid misreading self-inflicted
|
||||
// receive backpressure as a dead stream: reconnecting cannot help, since the
|
||||
// new stream feeds the same worker, and only triggers a reconnect storm.
|
||||
receiveHandoffBlocked atomic.Bool
|
||||
// lastDecrypt holds the Unix-nano timestamp of the last message the decryption
|
||||
// worker pulled off its queue. Diagnostic only: it lets a stall log show
|
||||
// whether the worker was draining (busy) or idle when the stream went silent.
|
||||
lastDecrypt atomic.Int64
|
||||
// handoffWaitTotal, handoffWaitMax (nanos) and handoffWaitCount accumulate the
|
||||
// time the receive loop spent blocked handing messages to the worker. This is
|
||||
// time not spent reading the stream, so it quantifies receive backpressure.
|
||||
handoffWaitTotal atomic.Int64
|
||||
handoffWaitMax atomic.Int64
|
||||
handoffWaitCount atomic.Int64
|
||||
}
|
||||
|
||||
// NewClient creates a new Signal client
|
||||
@@ -353,6 +370,8 @@ func (c *GrpcClient) SendToStream(msg *proto.EncryptedMessage) error {
|
||||
|
||||
// decryptMessage decrypts the body of the msg using Wireguard private key and Remote peer's public key
|
||||
func (c *GrpcClient) decryptMessage(msg *proto.EncryptedMessage) (*proto.Message, error) {
|
||||
c.lastDecrypt.Store(time.Now().UnixNano())
|
||||
|
||||
remoteKey, err := wgtypes.ParseKey(msg.GetKey())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -439,6 +458,22 @@ func (c *GrpcClient) idleSinceReceive() time.Duration {
|
||||
return time.Since(time.Unix(0, c.lastReceived.Load()))
|
||||
}
|
||||
|
||||
// idleSinceDecrypt returns how long since the worker last pulled a message.
|
||||
// Diagnostic only: distinguishes a busy/wedged worker from an idle one.
|
||||
func (c *GrpcClient) idleSinceDecrypt() time.Duration {
|
||||
return time.Since(time.Unix(0, c.lastDecrypt.Load()))
|
||||
}
|
||||
|
||||
// receiveAlive reports whether the receive stream shows liveness: it delivered a
|
||||
// frame within the inactivity threshold, or the receive loop is currently parked
|
||||
// handing a message to a busy decryption worker. In the latter case the loop has
|
||||
// stopped calling Recv, so the stream looks silent while being healthy, and
|
||||
// reconnecting would not help, so the watchdog must treat it as alive.
|
||||
func (c *GrpcClient) receiveAlive() bool {
|
||||
return c.idleSinceReceive() < receiveInactivityThreshold ||
|
||||
c.receiveHandoffBlocked.Load()
|
||||
}
|
||||
|
||||
// watchReceiveStream guards against a receive stream that is transport-alive but
|
||||
// no longer delivering messages. While the stream is idle past
|
||||
// receiveInactivityThreshold it sends a self-addressed probe that the Signal
|
||||
@@ -450,18 +485,55 @@ func (c *GrpcClient) watchReceiveStream(ctx context.Context, cancelStream contex
|
||||
defer ticker.Stop()
|
||||
|
||||
var probeSentAt time.Time
|
||||
var holdLogged bool
|
||||
var statTicks int
|
||||
var lastStatTotal int64
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if c.idleSinceReceive() < receiveInactivityThreshold {
|
||||
// Periodic backpressure summary so time lost to the worker handoff is
|
||||
// visible even when no stall fires. Emitted ~once a minute and only
|
||||
// when the wait grew, to stay quiet on a healthy stream.
|
||||
if statTicks++; statTicks >= int(time.Minute/receiveWatchdogInterval) {
|
||||
statTicks = 0
|
||||
if total, max, count := c.handoffWaitStats(); int64(total) > lastStatTotal {
|
||||
log.Infof("signal receive backpressure: handoffWaitTotal=%s (+%s last min) handoffWaitMax=%s handoffMsgs=%d",
|
||||
total.Round(time.Second), (total - time.Duration(lastStatTotal)).Round(time.Millisecond),
|
||||
max.Round(time.Millisecond), count)
|
||||
lastStatTotal = int64(total)
|
||||
}
|
||||
}
|
||||
|
||||
if c.receiveAlive() {
|
||||
// Attribute the case that matters in the field: silent past the
|
||||
// threshold but held because the receive loop is parked on the
|
||||
// worker handoff (backpressure), not a dead stream. Log once per
|
||||
// hold episode so a persistent worker stall is visible at info.
|
||||
if c.idleSinceReceive() >= receiveInactivityThreshold && c.receiveHandoffBlocked.Load() {
|
||||
if !holdLogged {
|
||||
total, max, count := c.handoffWaitStats()
|
||||
log.Infof("signal receive idle %s, loop blocked on worker handoff (idleDecrypt=%s queueDepth=%d connState=%s handoffWaitTotal=%s handoffWaitMax=%s handoffMsgs=%d); holding stream",
|
||||
c.idleSinceReceive().Round(time.Second), c.idleSinceDecrypt().Round(time.Second),
|
||||
c.decryptionWorker.QueueLen(), c.signalConn.GetState(),
|
||||
total.Round(time.Second), max.Round(time.Millisecond), count)
|
||||
holdLogged = true
|
||||
}
|
||||
} else {
|
||||
holdLogged = false
|
||||
}
|
||||
probeSentAt = time.Time{}
|
||||
continue
|
||||
}
|
||||
holdLogged = false
|
||||
|
||||
if !probeSentAt.IsZero() && time.Since(probeSentAt) >= receiveProbeTimeout {
|
||||
log.Warnf("signal receive stream stalled: no messages for %s and probe did not return, reconnecting", c.idleSinceReceive().Round(time.Second))
|
||||
total, max, count := c.handoffWaitStats()
|
||||
log.Warnf("signal receive stream stalled, reconnecting: idleRecv=%s idleDecrypt=%s handoffBlocked=%v queueDepth=%d connState=%s handoffWaitTotal=%s handoffWaitMax=%s handoffMsgs=%d probe did not return",
|
||||
c.idleSinceReceive().Round(time.Second), c.idleSinceDecrypt().Round(time.Second),
|
||||
c.receiveHandoffBlocked.Load(), c.decryptionWorker.QueueLen(), c.signalConn.GetState(),
|
||||
total.Round(time.Second), max.Round(time.Millisecond), count)
|
||||
c.receiveStalled.Store(true)
|
||||
c.notifyDisconnected(errReceiveStreamStalled)
|
||||
cancelStream()
|
||||
@@ -517,12 +589,37 @@ func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient) er
|
||||
continue
|
||||
}
|
||||
|
||||
// The handoff blocks while the worker is busy, which parks this loop and
|
||||
// stops Recv. Flag it so the watchdog does not read the resulting silence
|
||||
// as a dead stream, and account the wait as receive backpressure.
|
||||
handoffStart := time.Now()
|
||||
c.receiveHandoffBlocked.Store(true)
|
||||
if err := c.decryptionWorker.AddMsg(c.ctx, msg); err != nil {
|
||||
log.Errorf("failed to add message to decryption worker: %v", err)
|
||||
}
|
||||
c.receiveHandoffBlocked.Store(false)
|
||||
c.recordHandoffWait(time.Since(handoffStart))
|
||||
}
|
||||
}
|
||||
|
||||
// recordHandoffWait accumulates the time the receive loop was blocked handing a
|
||||
// message to the worker.
|
||||
func (c *GrpcClient) recordHandoffWait(d time.Duration) {
|
||||
c.handoffWaitTotal.Add(int64(d))
|
||||
c.handoffWaitCount.Add(1)
|
||||
for {
|
||||
cur := c.handoffWaitMax.Load()
|
||||
if int64(d) <= cur || c.handoffWaitMax.CompareAndSwap(cur, int64(d)) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handoffWaitStats returns cumulative receive-loop handoff backpressure.
|
||||
func (c *GrpcClient) handoffWaitStats() (total, max time.Duration, count int64) {
|
||||
return time.Duration(c.handoffWaitTotal.Load()), time.Duration(c.handoffWaitMax.Load()), c.handoffWaitCount.Load()
|
||||
}
|
||||
|
||||
func (c *GrpcClient) startEncryptionWorker(handler func(msg *proto.Message) error) {
|
||||
if c.decryptionWorker != nil {
|
||||
return
|
||||
|
||||
@@ -82,3 +82,27 @@ func TestReceiveProbeRoundTrips(t *testing.T) {
|
||||
t.Fatal("self-addressed heartbeat did not round-trip back through the signal server")
|
||||
}
|
||||
}
|
||||
|
||||
// TestReceiveAliveTreatsHandoffBlockAsLiveness reproduces the false positive
|
||||
// where a busy decryption worker parks the receive loop on the worker handoff,
|
||||
// so Recv (and markReceived) stops firing even though the stream is healthy.
|
||||
// With the receive stream silent past the inactivity threshold but the loop
|
||||
// blocked on handoff, the watchdog must consider the stream alive rather than
|
||||
// tear it down (reconnecting feeds the same worker and would not help).
|
||||
func TestReceiveAliveTreatsHandoffBlockAsLiveness(t *testing.T) {
|
||||
c := &GrpcClient{}
|
||||
|
||||
// Receive stream silent and the loop not blocked on handoff: genuinely stalled.
|
||||
c.lastReceived.Store(time.Now().Add(-2 * receiveInactivityThreshold).UnixNano())
|
||||
require.False(t, c.receiveAlive(), "silent stream with the receive loop idle must be treated as stalled")
|
||||
|
||||
// Receive stream silent but the loop is parked handing a message to a busy
|
||||
// worker: self-inflicted backpressure, not a dead stream, must not tear down.
|
||||
c.receiveHandoffBlocked.Store(true)
|
||||
require.True(t, c.receiveAlive(), "a receive loop blocked on worker handoff must keep the stream alive")
|
||||
|
||||
// Handoff drained, loop back to reading, a frame just arrived: alive via the receive path.
|
||||
c.receiveHandoffBlocked.Store(false)
|
||||
c.markReceived()
|
||||
require.True(t, c.receiveAlive(), "a freshly received frame must keep the stream alive")
|
||||
}
|
||||
|
||||
@@ -32,6 +32,13 @@ func (w *Worker) AddMsg(ctx context.Context, msg *proto.EncryptedMessage) error
|
||||
return nil
|
||||
}
|
||||
|
||||
// QueueLen returns the number of messages buffered for decryption. Diagnostic
|
||||
// only: a non-empty queue while the receive stream is silent indicates the
|
||||
// receive loop is parked on the handoff rather than the stream being dead.
|
||||
func (w *Worker) QueueLen() int {
|
||||
return len(w.encryptedMsgPool)
|
||||
}
|
||||
|
||||
func (w *Worker) Work(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
|
||||
Reference in New Issue
Block a user