mirror of
https://github.com/netbirdio/netbird.git
synced 2026-06-29 11:19:56 +00:00
Compare commits
2 Commits
client-jso
...
fix/signal
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4a4d506221 | ||
|
|
4cb2c62f2a |
@@ -78,6 +78,14 @@ type GrpcClient struct {
|
||||
// transport-alive but no longer delivering messages. It is the source of
|
||||
// truth IsHealthy reads, and is cleared once any frame is received again.
|
||||
receiveStalled atomic.Bool
|
||||
// receiveHandoffBlocked is set while the receive loop is parked handing a
|
||||
// message to a busy decryption worker. The loop stops calling Recv (and
|
||||
// markReceived) in that window, so the stream looks silent though it is
|
||||
// healthy. The watchdog reads this to avoid misreading self-inflicted
|
||||
// receive backpressure as a dead stream: reconnecting cannot help, since the
|
||||
// new stream feeds the same worker, and only triggers a reconnect storm.
|
||||
receiveHandoffBlocked atomic.Bool
|
||||
watchdogWg sync.WaitGroup
|
||||
}
|
||||
|
||||
// NewClient creates a new Signal client
|
||||
@@ -193,10 +201,18 @@ func (c *GrpcClient) Receive(ctx context.Context, msgHandler func(msg *proto.Mes
|
||||
// Guard the receive direction: the transport can stay healthy while the
|
||||
// server stops delivering messages. The watchdog reconnects via cancelStream.
|
||||
c.markReceived()
|
||||
go c.watchReceiveStream(streamCtx, cancelStream)
|
||||
c.watchdogWg.Add(1)
|
||||
go func() {
|
||||
defer c.watchdogWg.Done()
|
||||
c.watchReceiveStream(streamCtx, cancelStream)
|
||||
}()
|
||||
|
||||
// start receiving messages from the Signal stream (from other peers through signal)
|
||||
err = c.receive(stream)
|
||||
|
||||
cancelStream()
|
||||
c.watchdogWg.Wait()
|
||||
|
||||
if err != nil {
|
||||
// Check the parent context, not streamCtx: a watchdog-triggered
|
||||
// cancelStream must reconnect, only a parent cancel is shutdown.
|
||||
@@ -439,6 +455,16 @@ func (c *GrpcClient) idleSinceReceive() time.Duration {
|
||||
return time.Since(time.Unix(0, c.lastReceived.Load()))
|
||||
}
|
||||
|
||||
// receiveAlive reports whether the receive stream shows liveness: it delivered a
|
||||
// frame within the inactivity threshold, or the receive loop is currently parked
|
||||
// handing a message to a busy decryption worker. In the latter case the loop has
|
||||
// stopped calling Recv, so the stream looks silent while being healthy, and
|
||||
// reconnecting would not help, so the watchdog must treat it as alive.
|
||||
func (c *GrpcClient) receiveAlive() bool {
|
||||
return c.idleSinceReceive() < receiveInactivityThreshold ||
|
||||
c.receiveHandoffBlocked.Load()
|
||||
}
|
||||
|
||||
// watchReceiveStream guards against a receive stream that is transport-alive but
|
||||
// no longer delivering messages. While the stream is idle past
|
||||
// receiveInactivityThreshold it sends a self-addressed probe that the Signal
|
||||
@@ -455,7 +481,7 @@ func (c *GrpcClient) watchReceiveStream(ctx context.Context, cancelStream contex
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
if c.idleSinceReceive() < receiveInactivityThreshold {
|
||||
if c.receiveAlive() {
|
||||
probeSentAt = time.Time{}
|
||||
continue
|
||||
}
|
||||
@@ -517,9 +543,14 @@ func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient) er
|
||||
continue
|
||||
}
|
||||
|
||||
// The handoff blocks while the worker is busy, which parks this loop and
|
||||
// stops Recv. Flag it so the watchdog does not read the resulting silence
|
||||
// as a dead stream.
|
||||
c.receiveHandoffBlocked.Store(true)
|
||||
if err := c.decryptionWorker.AddMsg(c.ctx, msg); err != nil {
|
||||
log.Errorf("failed to add message to decryption worker: %v", err)
|
||||
}
|
||||
c.receiveHandoffBlocked.Store(false)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -82,3 +82,27 @@ func TestReceiveProbeRoundTrips(t *testing.T) {
|
||||
t.Fatal("self-addressed heartbeat did not round-trip back through the signal server")
|
||||
}
|
||||
}
|
||||
|
||||
// TestReceiveAliveTreatsHandoffBlockAsLiveness reproduces the false positive
|
||||
// where a busy decryption worker parks the receive loop on the worker handoff,
|
||||
// so Recv (and markReceived) stops firing even though the stream is healthy.
|
||||
// With the receive stream silent past the inactivity threshold but the loop
|
||||
// blocked on handoff, the watchdog must consider the stream alive rather than
|
||||
// tear it down (reconnecting feeds the same worker and would not help).
|
||||
func TestReceiveAliveTreatsHandoffBlockAsLiveness(t *testing.T) {
|
||||
c := &GrpcClient{}
|
||||
|
||||
// Receive stream silent and the loop not blocked on handoff: genuinely stalled.
|
||||
c.lastReceived.Store(time.Now().Add(-2 * receiveInactivityThreshold).UnixNano())
|
||||
require.False(t, c.receiveAlive(), "silent stream with the receive loop idle must be treated as stalled")
|
||||
|
||||
// Receive stream silent but the loop is parked handing a message to a busy
|
||||
// worker: self-inflicted backpressure, not a dead stream, must not tear down.
|
||||
c.receiveHandoffBlocked.Store(true)
|
||||
require.True(t, c.receiveAlive(), "a receive loop blocked on worker handoff must keep the stream alive")
|
||||
|
||||
// Handoff drained, loop back to reading, a frame just arrived: alive via the receive path.
|
||||
c.receiveHandoffBlocked.Store(false)
|
||||
c.markReceived()
|
||||
require.True(t, c.receiveAlive(), "a freshly received frame must keep the stream alive")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user