Compare commits

...

3 Commits

5 changed files with 185 additions and 11 deletions

View File

@@ -14,6 +14,7 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/go-multierror"
@@ -88,6 +89,13 @@ var ErrResetConnection = fmt.Errorf("reset connection")
var ErrEngineAlreadyStarted = errors.New("engine already started")
// engineRestartCount and engineLastRestart track client-restart cadence across
// engine recreations so a restart loop is distinguishable from rare restarts.
var (
engineRestartCount atomic.Int64
engineLastRestart atomic.Int64
)
type EngineConfig struct {
WgPort int
WgIfaceName string
@@ -909,14 +917,23 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
if e.ctx.Err() != nil {
return e.ctx.Err()
}
serial := update.GetNetworkMap().GetSerial()
if nm := update.GetNetworkMap(); nm != nil {
log.Infof("sync update: serial=%d remotePeers=%d offlinePeers=%d routes=%d firewallRules=%d checks=%d configPresent=%v remotePeersEmpty=%v",
nm.GetSerial(), len(nm.GetRemotePeers()), len(nm.GetOfflinePeers()), len(nm.GetRoutes()),
len(nm.GetFirewallRules()), len(update.GetChecks()), update.GetNetbirdConfig() != nil, nm.GetRemotePeersIsEmpty())
} else {
log.Infof("sync update: config-only (no network map), configPresent=%v", update.GetNetbirdConfig() != nil)
}
if update.NetworkMap != nil && update.NetworkMap.PeerConfig != nil {
e.handleAutoUpdateVersion(update.NetworkMap.PeerConfig.AutoUpdate)
}
startTime := time.Now()
if err := e.updateNetbirdConfig(update.GetNetbirdConfig()); err != nil {
return err
}
log.Infof("netbird config updated in %s, serial=%d", time.Since(startTime), serial)
// Posture checks are bound to the network map presence:
// NetworkMap != nil, checks present -> apply the received checks
@@ -927,17 +944,21 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
if nm == nil {
return nil
}
startTime = time.Now()
if err := e.updateChecksIfNew(update.Checks); err != nil {
return err
}
log.Infof("checks updated in %s, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
e.persistSyncResponse(update)
log.Infof("sync response persisted in %s, serial=%d", time.Since(startTime), serial)
// only apply new changes and ignore old ones
startTime = time.Now()
if err := e.updateNetworkMap(nm); err != nil {
return err
}
log.Infof("network map updated in %s, serial=%d", time.Since(startTime), serial)
e.statusRecorder.PublishEvent(cProto.SystemEvent_INFO, cProto.SystemEvent_SYSTEM, "Network map updated", "", nil)
@@ -1357,44 +1378,56 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
dnsConfig := toDNSConfig(protoDNSConfig, e.wgInterface.Address())
startTime := time.Now()
if err := e.dnsServer.UpdateDNSServer(serial, dnsConfig); err != nil {
log.Errorf("failed to update dns server, err: %v", err)
}
log.Infof("updated dns server in %v, serial=%d", time.Since(startTime), serial)
e.routeManager.SetDNSForwarderPort(dnsConfig.ForwarderPort)
// apply routes first, route related actions might depend on routing being enabled
startTime = time.Now()
routes := toRoutes(networkMap.GetRoutes())
serverRoutes, clientRoutes := e.routeManager.ClassifyRoutes(routes)
log.Infof("updated routes in %v, serial=%d", time.Since(startTime), serial)
// lazy mgr needs to be aware of which routes are available before they are applied
if e.connMgr != nil {
e.connMgr.UpdateRouteHAMap(clientRoutes)
log.Debugf("updated lazy connection manager with %d HA groups", len(clientRoutes))
}
startTime = time.Now()
dnsRouteFeatureFlag := toDNSFeatureFlag(networkMap)
if err := e.routeManager.UpdateRoutes(serial, serverRoutes, clientRoutes, dnsRouteFeatureFlag); err != nil {
log.Errorf("failed to update routes: %v", err)
}
log.Infof("updated routes in %v, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
if e.acl != nil {
e.acl.ApplyFiltering(networkMap, dnsRouteFeatureFlag)
}
log.Infof("updated filtering in %v, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
fwdEntries := toRouteDomains(e.config.WgPrivateKey.PublicKey().String(), routes)
e.updateDNSForwarder(dnsRouteFeatureFlag, fwdEntries)
log.Infof("updated DNS forwarder in %v, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
// Ingress forward rules
forwardingRules, err := e.updateForwardRules(networkMap.GetForwardingRules())
if err != nil {
log.Errorf("failed to update forward rules, err: %v", err)
}
log.Infof("updated forward rules in %v, serial=%d", time.Since(startTime), serial)
log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(networkMap.GetRemotePeers()))
startTime = time.Now()
e.updateOfflinePeers(networkMap.GetOfflinePeers())
log.Infof("updated offline peers in %v, serial=%d", time.Since(startTime), serial)
// Filter out own peer from the remote peers list
localPubKey := e.config.WgPrivateKey.PublicKey().String()
remotePeers := make([]*mgmProto.RemotePeerConfig, 0, len(networkMap.GetRemotePeers()))
@@ -1412,20 +1445,24 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
return err
}
} else {
startTime = time.Now()
err := e.removePeers(remotePeers)
if err != nil {
return err
}
log.Infof("removed peers in %v, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
err = e.modifyPeers(remotePeers)
if err != nil {
return err
}
log.Infof("modified peers in %v, serial=%d", time.Since(startTime), serial)
startTime = time.Now()
err = e.addNewPeers(remotePeers)
if err != nil {
return err
}
log.Infof("added peers in %v, serial=%d", time.Since(startTime), serial)
e.statusRecorder.FinishPeerListModifications()
@@ -1438,9 +1475,11 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
e.updateSSHServerAuth(networkMap.GetSshAuth())
}
startTime = time.Now()
// must set the exclude list after the peers are added. Without it the manager can not figure out the peers parameters from the store
excludedLazyPeers := e.toExcludedLazyPeers(forwardingRules, remotePeers)
e.connMgr.SetExcludeList(e.ctx, excludedLazyPeers)
log.Infof("updated lazy connection manager exclude list in %v, serial=%d", time.Since(startTime), serial)
e.networkSerial = serial
@@ -2171,7 +2210,14 @@ func (e *Engine) triggerClientRestart() {
return
}
log.Info("restarting engine")
// Cadence survives engine recreation (package-level), so a restart loop shows
// as a fast-climbing count with a short gap, distinct from rare intentional restarts.
n := engineRestartCount.Add(1)
var sinceLast time.Duration
if prev := engineLastRestart.Swap(time.Now().UnixNano()); prev != 0 {
sinceLast = time.Since(time.Unix(0, prev))
}
log.Infof("restarting engine (restart #%d, %s since previous)", n, sinceLast.Round(time.Second))
CtxGetState(e.ctx).Set(StatusConnecting)
_ = CtxGetState(e.ctx).Wrap(ErrResetConnection)
log.Infof("cancelling client context, engine will be recreated")

View File

@@ -461,7 +461,7 @@ func (w *WorkerICE) createForwardedCandidate(srflxCandidate ice.Candidate, mappi
}
func (w *WorkerICE) onICESelectedCandidatePair(agent *icemaker.ThreadSafeAgent, c1, c2 ice.Candidate) {
w.log.Debugf("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(),
w.log.Infof("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(),
w.config.Key)
pairStat, ok := agent.GetSelectedCandidatePairStats()

View File

@@ -78,6 +78,23 @@ type GrpcClient struct {
// transport-alive but no longer delivering messages. It is the source of
// truth IsHealthy reads, and is cleared once any frame is received again.
receiveStalled atomic.Bool
// receiveHandoffBlocked is set while the receive loop is parked handing a
// message to a busy decryption worker. The loop stops calling Recv (and
// markReceived) in that window, so the stream looks silent though it is
// healthy. The watchdog reads this to avoid misreading self-inflicted
// receive backpressure as a dead stream: reconnecting cannot help, since the
// new stream feeds the same worker, and only triggers a reconnect storm.
receiveHandoffBlocked atomic.Bool
// lastDecrypt holds the Unix-nano timestamp of the last message the decryption
// worker pulled off its queue. Diagnostic only: it lets a stall log show
// whether the worker was draining (busy) or idle when the stream went silent.
lastDecrypt atomic.Int64
// handoffWaitTotal, handoffWaitMax (nanos) and handoffWaitCount accumulate the
// time the receive loop spent blocked handing messages to the worker. This is
// time not spent reading the stream, so it quantifies receive backpressure.
handoffWaitTotal atomic.Int64
handoffWaitMax atomic.Int64
handoffWaitCount atomic.Int64
}
// NewClient creates a new Signal client
@@ -353,6 +370,8 @@ func (c *GrpcClient) SendToStream(msg *proto.EncryptedMessage) error {
// decryptMessage decrypts the body of the msg using Wireguard private key and Remote peer's public key
func (c *GrpcClient) decryptMessage(msg *proto.EncryptedMessage) (*proto.Message, error) {
c.lastDecrypt.Store(time.Now().UnixNano())
remoteKey, err := wgtypes.ParseKey(msg.GetKey())
if err != nil {
return nil, err
@@ -439,6 +458,22 @@ func (c *GrpcClient) idleSinceReceive() time.Duration {
return time.Since(time.Unix(0, c.lastReceived.Load()))
}
// idleSinceDecrypt returns how long since the worker last pulled a message.
// Diagnostic only: distinguishes a busy/wedged worker from an idle one.
func (c *GrpcClient) idleSinceDecrypt() time.Duration {
return time.Since(time.Unix(0, c.lastDecrypt.Load()))
}
// receiveAlive reports whether the receive stream shows liveness: it delivered a
// frame within the inactivity threshold, or the receive loop is currently parked
// handing a message to a busy decryption worker. In the latter case the loop has
// stopped calling Recv, so the stream looks silent while being healthy, and
// reconnecting would not help, so the watchdog must treat it as alive.
func (c *GrpcClient) receiveAlive() bool {
return c.idleSinceReceive() < receiveInactivityThreshold ||
c.receiveHandoffBlocked.Load()
}
// watchReceiveStream guards against a receive stream that is transport-alive but
// no longer delivering messages. While the stream is idle past
// receiveInactivityThreshold it sends a self-addressed probe that the Signal
@@ -450,18 +485,55 @@ func (c *GrpcClient) watchReceiveStream(ctx context.Context, cancelStream contex
defer ticker.Stop()
var probeSentAt time.Time
var holdLogged bool
var statTicks int
var lastStatTotal int64
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if c.idleSinceReceive() < receiveInactivityThreshold {
// Periodic backpressure summary so time lost to the worker handoff is
// visible even when no stall fires. Emitted ~once a minute and only
// when the wait grew, to stay quiet on a healthy stream.
if statTicks++; statTicks >= int(time.Minute/receiveWatchdogInterval) {
statTicks = 0
if total, max, count := c.handoffWaitStats(); int64(total) > lastStatTotal {
log.Infof("signal receive backpressure: handoffWaitTotal=%s (+%s last min) handoffWaitMax=%s handoffMsgs=%d",
total.Round(time.Second), (total - time.Duration(lastStatTotal)).Round(time.Millisecond),
max.Round(time.Millisecond), count)
lastStatTotal = int64(total)
}
}
if c.receiveAlive() {
// Attribute the case that matters in the field: silent past the
// threshold but held because the receive loop is parked on the
// worker handoff (backpressure), not a dead stream. Log once per
// hold episode so a persistent worker stall is visible at info.
if c.idleSinceReceive() >= receiveInactivityThreshold && c.receiveHandoffBlocked.Load() {
if !holdLogged {
total, max, count := c.handoffWaitStats()
log.Infof("signal receive idle %s, loop blocked on worker handoff (idleDecrypt=%s queueDepth=%d connState=%s handoffWaitTotal=%s handoffWaitMax=%s handoffMsgs=%d); holding stream",
c.idleSinceReceive().Round(time.Second), c.idleSinceDecrypt().Round(time.Second),
c.decryptionWorker.QueueLen(), c.signalConn.GetState(),
total.Round(time.Second), max.Round(time.Millisecond), count)
holdLogged = true
}
} else {
holdLogged = false
}
probeSentAt = time.Time{}
continue
}
holdLogged = false
if !probeSentAt.IsZero() && time.Since(probeSentAt) >= receiveProbeTimeout {
log.Warnf("signal receive stream stalled: no messages for %s and probe did not return, reconnecting", c.idleSinceReceive().Round(time.Second))
total, max, count := c.handoffWaitStats()
log.Warnf("signal receive stream stalled, reconnecting: idleRecv=%s idleDecrypt=%s handoffBlocked=%v queueDepth=%d connState=%s handoffWaitTotal=%s handoffWaitMax=%s handoffMsgs=%d probe did not return",
c.idleSinceReceive().Round(time.Second), c.idleSinceDecrypt().Round(time.Second),
c.receiveHandoffBlocked.Load(), c.decryptionWorker.QueueLen(), c.signalConn.GetState(),
total.Round(time.Second), max.Round(time.Millisecond), count)
c.receiveStalled.Store(true)
c.notifyDisconnected(errReceiveStreamStalled)
cancelStream()
@@ -517,12 +589,37 @@ func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient) er
continue
}
// The handoff blocks while the worker is busy, which parks this loop and
// stops Recv. Flag it so the watchdog does not read the resulting silence
// as a dead stream, and account the wait as receive backpressure.
handoffStart := time.Now()
c.receiveHandoffBlocked.Store(true)
if err := c.decryptionWorker.AddMsg(c.ctx, msg); err != nil {
log.Errorf("failed to add message to decryption worker: %v", err)
}
c.receiveHandoffBlocked.Store(false)
c.recordHandoffWait(time.Since(handoffStart))
}
}
// recordHandoffWait accumulates the time the receive loop was blocked handing a
// message to the worker.
func (c *GrpcClient) recordHandoffWait(d time.Duration) {
c.handoffWaitTotal.Add(int64(d))
c.handoffWaitCount.Add(1)
for {
cur := c.handoffWaitMax.Load()
if int64(d) <= cur || c.handoffWaitMax.CompareAndSwap(cur, int64(d)) {
break
}
}
}
// handoffWaitStats returns cumulative receive-loop handoff backpressure.
func (c *GrpcClient) handoffWaitStats() (total, max time.Duration, count int64) {
return time.Duration(c.handoffWaitTotal.Load()), time.Duration(c.handoffWaitMax.Load()), c.handoffWaitCount.Load()
}
func (c *GrpcClient) startEncryptionWorker(handler func(msg *proto.Message) error) {
if c.decryptionWorker != nil {
return

View File

@@ -82,3 +82,27 @@ func TestReceiveProbeRoundTrips(t *testing.T) {
t.Fatal("self-addressed heartbeat did not round-trip back through the signal server")
}
}
// TestReceiveAliveTreatsHandoffBlockAsLiveness reproduces the false positive
// where a busy decryption worker parks the receive loop on the worker handoff,
// so Recv (and markReceived) stops firing even though the stream is healthy.
// With the receive stream silent past the inactivity threshold but the loop
// blocked on handoff, the watchdog must consider the stream alive rather than
// tear it down (reconnecting feeds the same worker and would not help).
func TestReceiveAliveTreatsHandoffBlockAsLiveness(t *testing.T) {
c := &GrpcClient{}
// Receive stream silent and the loop not blocked on handoff: genuinely stalled.
c.lastReceived.Store(time.Now().Add(-2 * receiveInactivityThreshold).UnixNano())
require.False(t, c.receiveAlive(), "silent stream with the receive loop idle must be treated as stalled")
// Receive stream silent but the loop is parked handing a message to a busy
// worker: self-inflicted backpressure, not a dead stream, must not tear down.
c.receiveHandoffBlocked.Store(true)
require.True(t, c.receiveAlive(), "a receive loop blocked on worker handoff must keep the stream alive")
// Handoff drained, loop back to reading, a frame just arrived: alive via the receive path.
c.receiveHandoffBlocked.Store(false)
c.markReceived()
require.True(t, c.receiveAlive(), "a freshly received frame must keep the stream alive")
}

View File

@@ -32,6 +32,13 @@ func (w *Worker) AddMsg(ctx context.Context, msg *proto.EncryptedMessage) error
return nil
}
// QueueLen returns the number of messages buffered for decryption. Diagnostic
// only: a non-empty queue while the receive stream is silent indicates the
// receive loop is parked on the handoff rather than the stream being dead.
func (w *Worker) QueueLen() int {
return len(w.encryptedMsgPool)
}
func (w *Worker) Work(ctx context.Context) {
for {
select {