[client] Push-based status stream for the Wails UI

Adds a SubscribeStatus gRPC RPC that pushes a fresh FullStatus snapshot
on every peer-recorder state change, replacing the Wails UI's 2-second
Status poll. The daemon's notifier already triggers on Connected /
Disconnected / Connecting / management or signal flip / address
change / peers-list change; we now coalesce those into ticks on a
buffered chan and stream the resulting snapshots over gRPC.

- Status recorder gains SubscribeToStateChanges /
  UnsubscribeFromStateChanges + a non-blocking notifyStateChange that
  drops ticks when a subscriber's 1-slot buffer is full (next snapshot
  the consumer pulls already reflects everything).
- Server.Status handler split: the snapshot composition is shared
  with the new SubscribeStatus stream handler so unary and stream
  paths return identical bytes.
- UI peers service: pollLoop replaced by statusStreamLoop. The local
  name of the existing SubscribeEvents loop is now toastStreamLoop so
  the two streams are easy to tell apart — the underlying RPC name is
  unchanged.
- Tray applyStatus skips the icon refresh when connected/lastStatus
  hasn't changed; rapid SubscribeStatus bursts during health probes
  no longer churn Shell_NotifyIcon or the log.
This commit is contained in:
Zoltán Papp
2026-04-30 11:45:43 +02:00
parent 0148d926d5
commit 88a2bf582d
8 changed files with 359 additions and 113 deletions

View File

@@ -15,10 +15,6 @@ import (
"github.com/netbirdio/netbird/client/proto"
)
// PollInterval is how often Watch falls back to Status polling when the
// SubscribeEvents stream is unavailable. Matches the Fyne UI's 2-second cadence.
const PollInterval = 2 * time.Second
const (
// EventStatus is emitted to the frontend whenever a fresh Status snapshot
// is captured (from a poll or a stream-driven refresh).
@@ -130,8 +126,18 @@ func NewPeers(conn DaemonConn, emitter Emitter) *Peers {
return &Peers{conn: conn, emitter: emitter}
}
// Watch starts the background loop: a poll-then-stream pair that runs until
// ctx (or the service shutdown) cancels it. Safe to call once at boot.
// Watch starts the background loops that feed the frontend:
// - statusStreamLoop: push-driven snapshots on connection-state change
// (Connected/Disconnected/Connecting, peer list, address). Drives the
// tray icon, Status page, and Peers page.
// - toastStreamLoop: DNS / network / auth / connectivity / update
// SystemEvent stream. Drives OS notifications, the Recent Events
// list, and the update-overlay flag. The daemon-side RPC is named
// SubscribeEvents — only the loop's local alias differs to keep the
// two streams distinguishable in this file.
//
// Safe to call once at boot; both loops self-restart on stream errors
// via exponential backoff.
func (s *Peers) Watch(ctx context.Context) {
s.mu.Lock()
if s.cancel != nil {
@@ -143,8 +149,8 @@ func (s *Peers) Watch(ctx context.Context) {
s.mu.Unlock()
s.streamWg.Add(2)
go s.pollLoop(ctx)
go s.streamLoop(ctx)
go s.statusStreamLoop(ctx)
go s.toastStreamLoop(ctx)
}
// ServiceShutdown is the Wails service hook fired on app exit.
@@ -173,33 +179,61 @@ func (s *Peers) Get(ctx context.Context) (Status, error) {
return statusFromProto(resp), nil
}
func (s *Peers) pollLoop(ctx context.Context) {
// statusStreamLoop subscribes to the daemon's SubscribeStatus stream and
// re-emits each FullStatus snapshot on the Wails event bus. The first
// message is the current snapshot; subsequent messages fire on
// connection-state changes only — no fixed-interval polling, no idle
// chatter. Reconnects with exponential backoff if the stream drops
// (daemon restart, socket break).
func (s *Peers) statusStreamLoop(ctx context.Context) {
defer s.streamWg.Done()
ticker := time.NewTicker(PollInterval)
defer ticker.Stop()
first := true
for {
st, err := s.Get(ctx)
if err == nil {
if first {
log.Infof("peers pollLoop: first status ok status=%q peers=%d", st.Status, len(st.Peers))
first = false
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: time.Second,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 10 * time.Second,
MaxElapsedTime: 0,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
op := func() error {
cli, err := s.conn.Client()
if err != nil {
return fmt.Errorf("get client: %w", err)
}
stream, err := cli.SubscribeStatus(ctx, &proto.StatusRequest{GetFullPeerStatus: true})
if err != nil {
return fmt.Errorf("subscribe status: %w", err)
}
for {
resp, err := stream.Recv()
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
return fmt.Errorf("status stream recv: %w", err)
}
st := statusFromProto(resp)
log.Infof("backend event: status status=%q peers=%d", st.Status, len(st.Peers))
s.emitter.Emit(EventStatus, st)
} else if ctx.Err() == nil {
log.Warnf("peers pollLoop: status poll error: %v", err)
}
}
select {
case <-ctx.Done():
return
case <-ticker.C:
}
if err := backoff.Retry(op, bo); err != nil && ctx.Err() == nil {
log.Errorf("status stream ended: %v", err)
}
}
func (s *Peers) streamLoop(ctx context.Context) {
// toastStreamLoop subscribes to the daemon's SubscribeEvents RPC and
// re-emits every SystemEvent on the Wails event bus. The downstream
// consumers turn these into OS notifications, populate the Recent
// Events card on the Status page, and listen for the
// "new_version_available" metadata to flip the tray's update overlay.
// Local name differs from the RPC ("SubscribeEvents") so the file's
// two streams aren't both called streamLoop.
func (s *Peers) toastStreamLoop(ctx context.Context) {
defer s.streamWg.Done()
bo := backoff.WithContext(&backoff.ExponentialBackOff{
@@ -229,7 +263,9 @@ func (s *Peers) streamLoop(ctx context.Context) {
}
return fmt.Errorf("stream recv: %w", err)
}
s.emitter.Emit(EventSystem, systemEventFromProto(ev))
se := systemEventFromProto(ev)
log.Infof("backend event: system severity=%s category=%s msg=%q", se.Severity, se.Category, se.UserMessage)
s.emitter.Emit(EventSystem, se)
s.fanOutUpdateEvents(ev)
}
}