mirror of
https://github.com/netbirdio/netbird.git
synced 2026-05-31 04:59:54 +00:00
ui: extract subscribeAndStreamStatus to cut statusStreamLoop complexity
Move the status backoff op closure body into a method so the nested closure no longer carries the stream loop and its conditionals, bringing statusStreamLoop under the 20 cognitive-complexity limit. No behavior change.
This commit is contained in:
@@ -334,26 +334,7 @@ func (s *DaemonFeed) statusStreamLoop(ctx context.Context) {
|
||||
}
|
||||
|
||||
op := func() error {
|
||||
cli, err := s.conn.Client()
|
||||
if err != nil {
|
||||
emitUnavailable()
|
||||
return fmt.Errorf("get client: %w", err)
|
||||
}
|
||||
stream, err := cli.SubscribeStatus(ctx, &proto.StatusRequest{GetFullPeerStatus: true})
|
||||
if err != nil {
|
||||
if isDaemonUnreachable(err) {
|
||||
emitUnavailable()
|
||||
}
|
||||
return fmt.Errorf("subscribe status: %w", err)
|
||||
}
|
||||
for {
|
||||
resp, err := stream.Recv()
|
||||
if err != nil {
|
||||
return s.handleStatusRecvErr(ctx, err, emitUnavailable)
|
||||
}
|
||||
unavailable = false
|
||||
s.emitStatus(statusFromProto(resp))
|
||||
}
|
||||
return s.subscribeAndStreamStatus(ctx, &unavailable, emitUnavailable)
|
||||
}
|
||||
|
||||
if err := backoff.Retry(op, bo); err != nil && ctx.Err() == nil {
|
||||
@@ -361,6 +342,33 @@ func (s *DaemonFeed) statusStreamLoop(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// subscribeAndStreamStatus is one attempt of the status backoff loop: open the
|
||||
// SubscribeStatus stream and re-emit every snapshot until it errors. Returns a
|
||||
// wrapped error so backoff retries; a daemon-unreachable failure also flips the
|
||||
// synthetic-unavailable signal (once per outage, guarded by *unavailable).
|
||||
func (s *DaemonFeed) subscribeAndStreamStatus(ctx context.Context, unavailable *bool, emitUnavailable func()) error {
|
||||
cli, err := s.conn.Client()
|
||||
if err != nil {
|
||||
emitUnavailable()
|
||||
return fmt.Errorf("get client: %w", err)
|
||||
}
|
||||
stream, err := cli.SubscribeStatus(ctx, &proto.StatusRequest{GetFullPeerStatus: true})
|
||||
if err != nil {
|
||||
if isDaemonUnreachable(err) {
|
||||
emitUnavailable()
|
||||
}
|
||||
return fmt.Errorf("subscribe status: %w", err)
|
||||
}
|
||||
for {
|
||||
resp, err := stream.Recv()
|
||||
if err != nil {
|
||||
return s.handleStatusRecvErr(ctx, err, emitUnavailable)
|
||||
}
|
||||
*unavailable = false
|
||||
s.emitStatus(statusFromProto(resp))
|
||||
}
|
||||
}
|
||||
|
||||
// handleStatusRecvErr maps a SubscribeStatus stream.Recv error into the
|
||||
// backoff loop's return value: ctx cancellation stops the loop, an
|
||||
// unreachable socket flips the synthetic-unavailable signal, everything
|
||||
|
||||
Reference in New Issue
Block a user