Compare commits

...

1 Commits

Author SHA1 Message Date
Viktor Liu
4cb2c62f2a Keep signal stream alive while receive loop is blocked on worker handoff 2026-06-24 12:44:04 +02:00
2 changed files with 47 additions and 1 deletions

View File

@@ -78,6 +78,13 @@ type GrpcClient struct {
// transport-alive but no longer delivering messages. It is the source of
// truth IsHealthy reads, and is cleared once any frame is received again.
receiveStalled atomic.Bool
// receiveHandoffBlocked is set while the receive loop is parked handing a
// message to a busy decryption worker. The loop stops calling Recv (and
// markReceived) in that window, so the stream looks silent though it is
// healthy. The watchdog reads this to avoid misreading self-inflicted
// receive backpressure as a dead stream: reconnecting cannot help, since the
// new stream feeds the same worker, and only triggers a reconnect storm.
receiveHandoffBlocked atomic.Bool
}
// NewClient creates a new Signal client
@@ -439,6 +446,16 @@ func (c *GrpcClient) idleSinceReceive() time.Duration {
return time.Since(time.Unix(0, c.lastReceived.Load()))
}
// receiveAlive reports whether the receive stream shows liveness: it delivered a
// frame within the inactivity threshold, or the receive loop is currently parked
// handing a message to a busy decryption worker. In the latter case the loop has
// stopped calling Recv, so the stream looks silent while being healthy, and
// reconnecting would not help, so the watchdog must treat it as alive.
func (c *GrpcClient) receiveAlive() bool {
return c.idleSinceReceive() < receiveInactivityThreshold ||
c.receiveHandoffBlocked.Load()
}
// watchReceiveStream guards against a receive stream that is transport-alive but
// no longer delivering messages. While the stream is idle past
// receiveInactivityThreshold it sends a self-addressed probe that the Signal
@@ -455,7 +472,7 @@ func (c *GrpcClient) watchReceiveStream(ctx context.Context, cancelStream contex
case <-ctx.Done():
return
case <-ticker.C:
if c.idleSinceReceive() < receiveInactivityThreshold {
if c.receiveAlive() {
probeSentAt = time.Time{}
continue
}
@@ -517,9 +534,14 @@ func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient) er
continue
}
// The handoff blocks while the worker is busy, which parks this loop and
// stops Recv. Flag it so the watchdog does not read the resulting silence
// as a dead stream.
c.receiveHandoffBlocked.Store(true)
if err := c.decryptionWorker.AddMsg(c.ctx, msg); err != nil {
log.Errorf("failed to add message to decryption worker: %v", err)
}
c.receiveHandoffBlocked.Store(false)
}
}

View File

@@ -82,3 +82,27 @@ func TestReceiveProbeRoundTrips(t *testing.T) {
t.Fatal("self-addressed heartbeat did not round-trip back through the signal server")
}
}
// TestReceiveAliveTreatsHandoffBlockAsLiveness reproduces the false positive
// where a busy decryption worker parks the receive loop on the worker handoff,
// so Recv (and markReceived) stops firing even though the stream is healthy.
// With the receive stream silent past the inactivity threshold but the loop
// blocked on handoff, the watchdog must consider the stream alive rather than
// tear it down (reconnecting feeds the same worker and would not help).
func TestReceiveAliveTreatsHandoffBlockAsLiveness(t *testing.T) {
c := &GrpcClient{}
// Receive stream silent and the loop not blocked on handoff: genuinely stalled.
c.lastReceived.Store(time.Now().Add(-2 * receiveInactivityThreshold).UnixNano())
require.False(t, c.receiveAlive(), "silent stream with the receive loop idle must be treated as stalled")
// Receive stream silent but the loop is parked handing a message to a busy
// worker: self-inflicted backpressure, not a dead stream, must not tear down.
c.receiveHandoffBlocked.Store(true)
require.True(t, c.receiveAlive(), "a receive loop blocked on worker handoff must keep the stream alive")
// Handoff drained, loop back to reading, a frame just arrived: alive via the receive path.
c.receiveHandoffBlocked.Store(false)
c.markReceived()
require.True(t, c.receiveAlive(), "a freshly received frame must keep the stream alive")
}