From 88a2bf582d81f577bf575acc6a8943324e84f619 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zolt=C3=A1n=20Papp?= Date: Thu, 30 Apr 2026 11:45:43 +0200 Subject: [PATCH] [client] Push-based status stream for the Wails UI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a SubscribeStatus gRPC RPC that pushes a fresh FullStatus snapshot on every peer-recorder state change, replacing the Wails UI's 2-second Status poll. The daemon's notifier already triggers on Connected / Disconnected / Connecting / management or signal flip / address change / peers-list change; we now coalesce those into ticks on a buffered chan and stream the resulting snapshots over gRPC. - Status recorder gains SubscribeToStateChanges / UnsubscribeFromStateChanges + a non-blocking notifyStateChange that drops ticks when a subscriber's 1-slot buffer is full (next snapshot the consumer pulls already reflects everything). - Server.Status handler split: the snapshot composition is shared with the new SubscribeStatus stream handler so unary and stream paths return identical bytes. - UI peers service: pollLoop replaced by statusStreamLoop. The local name of the existing SubscribeEvents loop is now toastStreamLoop so the two streams are easy to tell apart — the underlying RPC name is unchanged. - Tray applyStatus skips the icon refresh when connected/lastStatus hasn't changed; rapid SubscribeStatus bursts during health probes no longer churn Shell_NotifyIcon or the log. --- client/internal/peer/status.go | 59 ++++++++++++ client/proto/daemon.pb.go | 153 +++++++++++++++--------------- client/proto/daemon.proto | 6 ++ client/proto/daemon_grpc.pb.go | 75 ++++++++++++++- client/server/server.go | 7 ++ client/server/status_stream.go | 57 +++++++++++ client/ui-wails/services/peers.go | 90 ++++++++++++------ client/ui-wails/tray.go | 25 +++-- 8 files changed, 359 insertions(+), 113 deletions(-) create mode 100644 client/server/status_stream.go diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index abedc208e..be310b24b 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -215,6 +215,14 @@ type Status struct { eventStreams map[string]chan *proto.SystemEvent eventQueue *EventQueue + // stateChangeStreams fan-out connection-state changes (connected / + // disconnected / connecting / address change / peers list change) to + // every active SubscribeStatus gRPC stream. Each subscriber gets a + // buffered chan; the notifier non-blockingly pings them so a slow + // consumer can never stall the daemon. + stateChangeMux sync.Mutex + stateChangeStreams map[string]chan struct{} + ingressGwMgr *ingressgw.Manager routeIDLookup routeIDLookup @@ -228,6 +236,7 @@ func NewRecorder(mgmAddress string) *Status { changeNotify: make(map[string]map[string]*StatusChangeSubscription), eventStreams: make(map[string]chan *proto.SystemEvent), eventQueue: NewEventQueue(eventQueueSize), + stateChangeStreams: make(map[string]chan struct{}), offlinePeers: make([]State, 0), notifier: newNotifier(), mgmAddress: mgmAddress, @@ -990,16 +999,19 @@ func (d *Status) GetFullStatus() FullStatus { // ClientStart will notify all listeners about the new service state func (d *Status) ClientStart() { d.notifier.clientStart() + d.notifyStateChange() } // ClientStop will notify all listeners about the new service state func (d *Status) ClientStop() { d.notifier.clientStop() + d.notifyStateChange() } // ClientTeardown will notify all listeners about the service is under teardown func (d *Status) ClientTeardown() { d.notifier.clientTearDown() + d.notifyStateChange() } // SetConnectionListener set a listener to the notifier @@ -1014,6 +1026,7 @@ func (d *Status) RemoveConnectionListener() { func (d *Status) onConnectionChanged() { d.notifier.updateServerStates(d.managementState, d.signalState) + d.notifyStateChange() } // notifyPeerStateChangeListeners notifies route manager about the change in peer state @@ -1049,10 +1062,12 @@ func (d *Status) notifyPeerStateChangeListeners(peerID string) { func (d *Status) notifyPeerListChanged() { d.notifier.peerListChanged(d.numOfPeers()) + d.notifyStateChange() } func (d *Status) notifyAddressChanged() { d.notifier.localAddressChanged(d.localPeer.FQDN, d.localPeer.IP) + d.notifyStateChange() } func (d *Status) numOfPeers() int { @@ -1128,6 +1143,50 @@ func (d *Status) GetEventHistory() []*proto.SystemEvent { return d.eventQueue.GetAll() } +// SubscribeToStateChanges hands back a channel that receives a tick on +// every connection-state change (connected / disconnected / connecting / +// address change / peers-list change). The channel is buffered to one +// pending tick so a coalesced burst still wakes the consumer exactly +// once. Pass the returned id to UnsubscribeFromStateChanges to detach. +func (d *Status) SubscribeToStateChanges() (string, <-chan struct{}) { + d.stateChangeMux.Lock() + defer d.stateChangeMux.Unlock() + + id := uuid.New().String() + ch := make(chan struct{}, 1) + d.stateChangeStreams[id] = ch + return id, ch +} + +// UnsubscribeFromStateChanges releases a SubscribeToStateChanges channel +// and closes it so any consumer goroutine selecting on the channel +// unblocks cleanly. +func (d *Status) UnsubscribeFromStateChanges(id string) { + d.stateChangeMux.Lock() + defer d.stateChangeMux.Unlock() + + if ch, ok := d.stateChangeStreams[id]; ok { + close(ch) + delete(d.stateChangeStreams, id) + } +} + +// notifyStateChange wakes every SubscribeToStateChanges subscriber. Drops +// the tick if a subscriber's buffer is full — by definition the consumer +// is already going to fetch the latest snapshot, so multiple pending ticks +// would be redundant. +func (d *Status) notifyStateChange() { + d.stateChangeMux.Lock() + defer d.stateChangeMux.Unlock() + + for _, ch := range d.stateChangeStreams { + select { + case ch <- struct{}{}: + default: + } + } +} + func (d *Status) SetWgIface(wgInterface WGIfaceStatus) { d.mux.Lock() defer d.mux.Unlock() diff --git a/client/proto/daemon.pb.go b/client/proto/daemon.pb.go index 6506307d3..7de9f697b 100644 --- a/client/proto/daemon.pb.go +++ b/client/proto/daemon.pb.go @@ -1,7 +1,7 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: // protoc-gen-go v1.36.6 -// protoc v6.33.1 +// protoc v7.34.1 // source: daemon.proto package proto @@ -6566,12 +6566,13 @@ const file_daemon_proto_rawDesc = "" + "\n" + "EXPOSE_UDP\x10\x03\x12\x0e\n" + "\n" + - "EXPOSE_TLS\x10\x042\xfc\x15\n" + + "EXPOSE_TLS\x10\x042\xc2\x16\n" + "\rDaemonService\x126\n" + "\x05Login\x12\x14.daemon.LoginRequest\x1a\x15.daemon.LoginResponse\"\x00\x12K\n" + "\fWaitSSOLogin\x12\x1b.daemon.WaitSSOLoginRequest\x1a\x1c.daemon.WaitSSOLoginResponse\"\x00\x12-\n" + "\x02Up\x12\x11.daemon.UpRequest\x1a\x12.daemon.UpResponse\"\x00\x129\n" + - "\x06Status\x12\x15.daemon.StatusRequest\x1a\x16.daemon.StatusResponse\"\x00\x123\n" + + "\x06Status\x12\x15.daemon.StatusRequest\x1a\x16.daemon.StatusResponse\"\x00\x12D\n" + + "\x0fSubscribeStatus\x12\x15.daemon.StatusRequest\x1a\x16.daemon.StatusResponse\"\x000\x01\x123\n" + "\x04Down\x12\x13.daemon.DownRequest\x1a\x14.daemon.DownResponse\"\x00\x12B\n" + "\tGetConfig\x12\x18.daemon.GetConfigRequest\x1a\x19.daemon.GetConfigResponse\"\x00\x12K\n" + "\fListNetworks\x12\x1b.daemon.ListNetworksRequest\x1a\x1c.daemon.ListNetworksResponse\"\x00\x12Q\n" + @@ -6766,78 +6767,80 @@ var file_daemon_proto_depIdxs = []int32{ 10, // 37: daemon.DaemonService.WaitSSOLogin:input_type -> daemon.WaitSSOLoginRequest 12, // 38: daemon.DaemonService.Up:input_type -> daemon.UpRequest 14, // 39: daemon.DaemonService.Status:input_type -> daemon.StatusRequest - 16, // 40: daemon.DaemonService.Down:input_type -> daemon.DownRequest - 18, // 41: daemon.DaemonService.GetConfig:input_type -> daemon.GetConfigRequest - 29, // 42: daemon.DaemonService.ListNetworks:input_type -> daemon.ListNetworksRequest - 31, // 43: daemon.DaemonService.SelectNetworks:input_type -> daemon.SelectNetworksRequest - 31, // 44: daemon.DaemonService.DeselectNetworks:input_type -> daemon.SelectNetworksRequest - 5, // 45: daemon.DaemonService.ForwardingRules:input_type -> daemon.EmptyRequest - 38, // 46: daemon.DaemonService.DebugBundle:input_type -> daemon.DebugBundleRequest - 40, // 47: daemon.DaemonService.GetLogLevel:input_type -> daemon.GetLogLevelRequest - 42, // 48: daemon.DaemonService.SetLogLevel:input_type -> daemon.SetLogLevelRequest - 45, // 49: daemon.DaemonService.ListStates:input_type -> daemon.ListStatesRequest - 47, // 50: daemon.DaemonService.CleanState:input_type -> daemon.CleanStateRequest - 49, // 51: daemon.DaemonService.DeleteState:input_type -> daemon.DeleteStateRequest - 51, // 52: daemon.DaemonService.SetSyncResponsePersistence:input_type -> daemon.SetSyncResponsePersistenceRequest - 54, // 53: daemon.DaemonService.TracePacket:input_type -> daemon.TracePacketRequest - 57, // 54: daemon.DaemonService.SubscribeEvents:input_type -> daemon.SubscribeRequest - 59, // 55: daemon.DaemonService.GetEvents:input_type -> daemon.GetEventsRequest - 61, // 56: daemon.DaemonService.SwitchProfile:input_type -> daemon.SwitchProfileRequest - 63, // 57: daemon.DaemonService.SetConfig:input_type -> daemon.SetConfigRequest - 65, // 58: daemon.DaemonService.AddProfile:input_type -> daemon.AddProfileRequest - 67, // 59: daemon.DaemonService.RemoveProfile:input_type -> daemon.RemoveProfileRequest - 69, // 60: daemon.DaemonService.ListProfiles:input_type -> daemon.ListProfilesRequest - 72, // 61: daemon.DaemonService.GetActiveProfile:input_type -> daemon.GetActiveProfileRequest - 74, // 62: daemon.DaemonService.Logout:input_type -> daemon.LogoutRequest - 76, // 63: daemon.DaemonService.GetFeatures:input_type -> daemon.GetFeaturesRequest - 78, // 64: daemon.DaemonService.TriggerUpdate:input_type -> daemon.TriggerUpdateRequest - 80, // 65: daemon.DaemonService.GetPeerSSHHostKey:input_type -> daemon.GetPeerSSHHostKeyRequest - 82, // 66: daemon.DaemonService.RequestJWTAuth:input_type -> daemon.RequestJWTAuthRequest - 84, // 67: daemon.DaemonService.WaitJWTToken:input_type -> daemon.WaitJWTTokenRequest - 86, // 68: daemon.DaemonService.StartCPUProfile:input_type -> daemon.StartCPUProfileRequest - 88, // 69: daemon.DaemonService.StopCPUProfile:input_type -> daemon.StopCPUProfileRequest - 6, // 70: daemon.DaemonService.NotifyOSLifecycle:input_type -> daemon.OSLifecycleRequest - 90, // 71: daemon.DaemonService.GetInstallerResult:input_type -> daemon.InstallerResultRequest - 92, // 72: daemon.DaemonService.ExposeService:input_type -> daemon.ExposeServiceRequest - 9, // 73: daemon.DaemonService.Login:output_type -> daemon.LoginResponse - 11, // 74: daemon.DaemonService.WaitSSOLogin:output_type -> daemon.WaitSSOLoginResponse - 13, // 75: daemon.DaemonService.Up:output_type -> daemon.UpResponse - 15, // 76: daemon.DaemonService.Status:output_type -> daemon.StatusResponse - 17, // 77: daemon.DaemonService.Down:output_type -> daemon.DownResponse - 19, // 78: daemon.DaemonService.GetConfig:output_type -> daemon.GetConfigResponse - 30, // 79: daemon.DaemonService.ListNetworks:output_type -> daemon.ListNetworksResponse - 32, // 80: daemon.DaemonService.SelectNetworks:output_type -> daemon.SelectNetworksResponse - 32, // 81: daemon.DaemonService.DeselectNetworks:output_type -> daemon.SelectNetworksResponse - 37, // 82: daemon.DaemonService.ForwardingRules:output_type -> daemon.ForwardingRulesResponse - 39, // 83: daemon.DaemonService.DebugBundle:output_type -> daemon.DebugBundleResponse - 41, // 84: daemon.DaemonService.GetLogLevel:output_type -> daemon.GetLogLevelResponse - 43, // 85: daemon.DaemonService.SetLogLevel:output_type -> daemon.SetLogLevelResponse - 46, // 86: daemon.DaemonService.ListStates:output_type -> daemon.ListStatesResponse - 48, // 87: daemon.DaemonService.CleanState:output_type -> daemon.CleanStateResponse - 50, // 88: daemon.DaemonService.DeleteState:output_type -> daemon.DeleteStateResponse - 52, // 89: daemon.DaemonService.SetSyncResponsePersistence:output_type -> daemon.SetSyncResponsePersistenceResponse - 56, // 90: daemon.DaemonService.TracePacket:output_type -> daemon.TracePacketResponse - 58, // 91: daemon.DaemonService.SubscribeEvents:output_type -> daemon.SystemEvent - 60, // 92: daemon.DaemonService.GetEvents:output_type -> daemon.GetEventsResponse - 62, // 93: daemon.DaemonService.SwitchProfile:output_type -> daemon.SwitchProfileResponse - 64, // 94: daemon.DaemonService.SetConfig:output_type -> daemon.SetConfigResponse - 66, // 95: daemon.DaemonService.AddProfile:output_type -> daemon.AddProfileResponse - 68, // 96: daemon.DaemonService.RemoveProfile:output_type -> daemon.RemoveProfileResponse - 70, // 97: daemon.DaemonService.ListProfiles:output_type -> daemon.ListProfilesResponse - 73, // 98: daemon.DaemonService.GetActiveProfile:output_type -> daemon.GetActiveProfileResponse - 75, // 99: daemon.DaemonService.Logout:output_type -> daemon.LogoutResponse - 77, // 100: daemon.DaemonService.GetFeatures:output_type -> daemon.GetFeaturesResponse - 79, // 101: daemon.DaemonService.TriggerUpdate:output_type -> daemon.TriggerUpdateResponse - 81, // 102: daemon.DaemonService.GetPeerSSHHostKey:output_type -> daemon.GetPeerSSHHostKeyResponse - 83, // 103: daemon.DaemonService.RequestJWTAuth:output_type -> daemon.RequestJWTAuthResponse - 85, // 104: daemon.DaemonService.WaitJWTToken:output_type -> daemon.WaitJWTTokenResponse - 87, // 105: daemon.DaemonService.StartCPUProfile:output_type -> daemon.StartCPUProfileResponse - 89, // 106: daemon.DaemonService.StopCPUProfile:output_type -> daemon.StopCPUProfileResponse - 7, // 107: daemon.DaemonService.NotifyOSLifecycle:output_type -> daemon.OSLifecycleResponse - 91, // 108: daemon.DaemonService.GetInstallerResult:output_type -> daemon.InstallerResultResponse - 93, // 109: daemon.DaemonService.ExposeService:output_type -> daemon.ExposeServiceEvent - 73, // [73:110] is the sub-list for method output_type - 36, // [36:73] is the sub-list for method input_type + 14, // 40: daemon.DaemonService.SubscribeStatus:input_type -> daemon.StatusRequest + 16, // 41: daemon.DaemonService.Down:input_type -> daemon.DownRequest + 18, // 42: daemon.DaemonService.GetConfig:input_type -> daemon.GetConfigRequest + 29, // 43: daemon.DaemonService.ListNetworks:input_type -> daemon.ListNetworksRequest + 31, // 44: daemon.DaemonService.SelectNetworks:input_type -> daemon.SelectNetworksRequest + 31, // 45: daemon.DaemonService.DeselectNetworks:input_type -> daemon.SelectNetworksRequest + 5, // 46: daemon.DaemonService.ForwardingRules:input_type -> daemon.EmptyRequest + 38, // 47: daemon.DaemonService.DebugBundle:input_type -> daemon.DebugBundleRequest + 40, // 48: daemon.DaemonService.GetLogLevel:input_type -> daemon.GetLogLevelRequest + 42, // 49: daemon.DaemonService.SetLogLevel:input_type -> daemon.SetLogLevelRequest + 45, // 50: daemon.DaemonService.ListStates:input_type -> daemon.ListStatesRequest + 47, // 51: daemon.DaemonService.CleanState:input_type -> daemon.CleanStateRequest + 49, // 52: daemon.DaemonService.DeleteState:input_type -> daemon.DeleteStateRequest + 51, // 53: daemon.DaemonService.SetSyncResponsePersistence:input_type -> daemon.SetSyncResponsePersistenceRequest + 54, // 54: daemon.DaemonService.TracePacket:input_type -> daemon.TracePacketRequest + 57, // 55: daemon.DaemonService.SubscribeEvents:input_type -> daemon.SubscribeRequest + 59, // 56: daemon.DaemonService.GetEvents:input_type -> daemon.GetEventsRequest + 61, // 57: daemon.DaemonService.SwitchProfile:input_type -> daemon.SwitchProfileRequest + 63, // 58: daemon.DaemonService.SetConfig:input_type -> daemon.SetConfigRequest + 65, // 59: daemon.DaemonService.AddProfile:input_type -> daemon.AddProfileRequest + 67, // 60: daemon.DaemonService.RemoveProfile:input_type -> daemon.RemoveProfileRequest + 69, // 61: daemon.DaemonService.ListProfiles:input_type -> daemon.ListProfilesRequest + 72, // 62: daemon.DaemonService.GetActiveProfile:input_type -> daemon.GetActiveProfileRequest + 74, // 63: daemon.DaemonService.Logout:input_type -> daemon.LogoutRequest + 76, // 64: daemon.DaemonService.GetFeatures:input_type -> daemon.GetFeaturesRequest + 78, // 65: daemon.DaemonService.TriggerUpdate:input_type -> daemon.TriggerUpdateRequest + 80, // 66: daemon.DaemonService.GetPeerSSHHostKey:input_type -> daemon.GetPeerSSHHostKeyRequest + 82, // 67: daemon.DaemonService.RequestJWTAuth:input_type -> daemon.RequestJWTAuthRequest + 84, // 68: daemon.DaemonService.WaitJWTToken:input_type -> daemon.WaitJWTTokenRequest + 86, // 69: daemon.DaemonService.StartCPUProfile:input_type -> daemon.StartCPUProfileRequest + 88, // 70: daemon.DaemonService.StopCPUProfile:input_type -> daemon.StopCPUProfileRequest + 6, // 71: daemon.DaemonService.NotifyOSLifecycle:input_type -> daemon.OSLifecycleRequest + 90, // 72: daemon.DaemonService.GetInstallerResult:input_type -> daemon.InstallerResultRequest + 92, // 73: daemon.DaemonService.ExposeService:input_type -> daemon.ExposeServiceRequest + 9, // 74: daemon.DaemonService.Login:output_type -> daemon.LoginResponse + 11, // 75: daemon.DaemonService.WaitSSOLogin:output_type -> daemon.WaitSSOLoginResponse + 13, // 76: daemon.DaemonService.Up:output_type -> daemon.UpResponse + 15, // 77: daemon.DaemonService.Status:output_type -> daemon.StatusResponse + 15, // 78: daemon.DaemonService.SubscribeStatus:output_type -> daemon.StatusResponse + 17, // 79: daemon.DaemonService.Down:output_type -> daemon.DownResponse + 19, // 80: daemon.DaemonService.GetConfig:output_type -> daemon.GetConfigResponse + 30, // 81: daemon.DaemonService.ListNetworks:output_type -> daemon.ListNetworksResponse + 32, // 82: daemon.DaemonService.SelectNetworks:output_type -> daemon.SelectNetworksResponse + 32, // 83: daemon.DaemonService.DeselectNetworks:output_type -> daemon.SelectNetworksResponse + 37, // 84: daemon.DaemonService.ForwardingRules:output_type -> daemon.ForwardingRulesResponse + 39, // 85: daemon.DaemonService.DebugBundle:output_type -> daemon.DebugBundleResponse + 41, // 86: daemon.DaemonService.GetLogLevel:output_type -> daemon.GetLogLevelResponse + 43, // 87: daemon.DaemonService.SetLogLevel:output_type -> daemon.SetLogLevelResponse + 46, // 88: daemon.DaemonService.ListStates:output_type -> daemon.ListStatesResponse + 48, // 89: daemon.DaemonService.CleanState:output_type -> daemon.CleanStateResponse + 50, // 90: daemon.DaemonService.DeleteState:output_type -> daemon.DeleteStateResponse + 52, // 91: daemon.DaemonService.SetSyncResponsePersistence:output_type -> daemon.SetSyncResponsePersistenceResponse + 56, // 92: daemon.DaemonService.TracePacket:output_type -> daemon.TracePacketResponse + 58, // 93: daemon.DaemonService.SubscribeEvents:output_type -> daemon.SystemEvent + 60, // 94: daemon.DaemonService.GetEvents:output_type -> daemon.GetEventsResponse + 62, // 95: daemon.DaemonService.SwitchProfile:output_type -> daemon.SwitchProfileResponse + 64, // 96: daemon.DaemonService.SetConfig:output_type -> daemon.SetConfigResponse + 66, // 97: daemon.DaemonService.AddProfile:output_type -> daemon.AddProfileResponse + 68, // 98: daemon.DaemonService.RemoveProfile:output_type -> daemon.RemoveProfileResponse + 70, // 99: daemon.DaemonService.ListProfiles:output_type -> daemon.ListProfilesResponse + 73, // 100: daemon.DaemonService.GetActiveProfile:output_type -> daemon.GetActiveProfileResponse + 75, // 101: daemon.DaemonService.Logout:output_type -> daemon.LogoutResponse + 77, // 102: daemon.DaemonService.GetFeatures:output_type -> daemon.GetFeaturesResponse + 79, // 103: daemon.DaemonService.TriggerUpdate:output_type -> daemon.TriggerUpdateResponse + 81, // 104: daemon.DaemonService.GetPeerSSHHostKey:output_type -> daemon.GetPeerSSHHostKeyResponse + 83, // 105: daemon.DaemonService.RequestJWTAuth:output_type -> daemon.RequestJWTAuthResponse + 85, // 106: daemon.DaemonService.WaitJWTToken:output_type -> daemon.WaitJWTTokenResponse + 87, // 107: daemon.DaemonService.StartCPUProfile:output_type -> daemon.StartCPUProfileResponse + 89, // 108: daemon.DaemonService.StopCPUProfile:output_type -> daemon.StopCPUProfileResponse + 7, // 109: daemon.DaemonService.NotifyOSLifecycle:output_type -> daemon.OSLifecycleResponse + 91, // 110: daemon.DaemonService.GetInstallerResult:output_type -> daemon.InstallerResultResponse + 93, // 111: daemon.DaemonService.ExposeService:output_type -> daemon.ExposeServiceEvent + 74, // [74:112] is the sub-list for method output_type + 36, // [36:74] is the sub-list for method input_type 36, // [36:36] is the sub-list for extension type_name 36, // [36:36] is the sub-list for extension extendee 0, // [0:36] is the sub-list for field type_name diff --git a/client/proto/daemon.proto b/client/proto/daemon.proto index 19976660c..7c56b94f8 100644 --- a/client/proto/daemon.proto +++ b/client/proto/daemon.proto @@ -24,6 +24,12 @@ service DaemonService { // Status of the service. rpc Status(StatusRequest) returns (StatusResponse) {} + // SubscribeStatus pushes a fresh StatusResponse on connection state + // changes (Connected / Disconnected / Connecting / address change / + // peers list change). The first message on the stream is the current + // snapshot, so a freshly-subscribed UI doesn't need to also call Status. + rpc SubscribeStatus(StatusRequest) returns (stream StatusResponse) {} + // Down stops engine work in the daemon. rpc Down(DownRequest) returns (DownResponse) {} diff --git a/client/proto/daemon_grpc.pb.go b/client/proto/daemon_grpc.pb.go index e5bd89597..47dd2a26f 100644 --- a/client/proto/daemon_grpc.pb.go +++ b/client/proto/daemon_grpc.pb.go @@ -27,6 +27,11 @@ type DaemonServiceClient interface { Up(ctx context.Context, in *UpRequest, opts ...grpc.CallOption) (*UpResponse, error) // Status of the service. Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error) + // SubscribeStatus pushes a fresh StatusResponse on connection state + // changes (Connected / Disconnected / Connecting / address change / + // peers list change). The first message on the stream is the current + // snapshot, so a freshly-subscribed UI doesn't need to also call Status. + SubscribeStatus(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (DaemonService_SubscribeStatusClient, error) // Down stops engine work in the daemon. Down(ctx context.Context, in *DownRequest, opts ...grpc.CallOption) (*DownResponse, error) // GetConfig of the daemon. @@ -127,6 +132,38 @@ func (c *daemonServiceClient) Status(ctx context.Context, in *StatusRequest, opt return out, nil } +func (c *daemonServiceClient) SubscribeStatus(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (DaemonService_SubscribeStatusClient, error) { + stream, err := c.cc.NewStream(ctx, &DaemonService_ServiceDesc.Streams[0], "/daemon.DaemonService/SubscribeStatus", opts...) + if err != nil { + return nil, err + } + x := &daemonServiceSubscribeStatusClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type DaemonService_SubscribeStatusClient interface { + Recv() (*StatusResponse, error) + grpc.ClientStream +} + +type daemonServiceSubscribeStatusClient struct { + grpc.ClientStream +} + +func (x *daemonServiceSubscribeStatusClient) Recv() (*StatusResponse, error) { + m := new(StatusResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func (c *daemonServiceClient) Down(ctx context.Context, in *DownRequest, opts ...grpc.CallOption) (*DownResponse, error) { out := new(DownResponse) err := c.cc.Invoke(ctx, "/daemon.DaemonService/Down", in, out, opts...) @@ -254,7 +291,7 @@ func (c *daemonServiceClient) TracePacket(ctx context.Context, in *TracePacketRe } func (c *daemonServiceClient) SubscribeEvents(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (DaemonService_SubscribeEventsClient, error) { - stream, err := c.cc.NewStream(ctx, &DaemonService_ServiceDesc.Streams[0], "/daemon.DaemonService/SubscribeEvents", opts...) + stream, err := c.cc.NewStream(ctx, &DaemonService_ServiceDesc.Streams[1], "/daemon.DaemonService/SubscribeEvents", opts...) if err != nil { return nil, err } @@ -439,7 +476,7 @@ func (c *daemonServiceClient) GetInstallerResult(ctx context.Context, in *Instal } func (c *daemonServiceClient) ExposeService(ctx context.Context, in *ExposeServiceRequest, opts ...grpc.CallOption) (DaemonService_ExposeServiceClient, error) { - stream, err := c.cc.NewStream(ctx, &DaemonService_ServiceDesc.Streams[1], "/daemon.DaemonService/ExposeService", opts...) + stream, err := c.cc.NewStream(ctx, &DaemonService_ServiceDesc.Streams[2], "/daemon.DaemonService/ExposeService", opts...) if err != nil { return nil, err } @@ -483,6 +520,11 @@ type DaemonServiceServer interface { Up(context.Context, *UpRequest) (*UpResponse, error) // Status of the service. Status(context.Context, *StatusRequest) (*StatusResponse, error) + // SubscribeStatus pushes a fresh StatusResponse on connection state + // changes (Connected / Disconnected / Connecting / address change / + // peers list change). The first message on the stream is the current + // snapshot, so a freshly-subscribed UI doesn't need to also call Status. + SubscribeStatus(*StatusRequest, DaemonService_SubscribeStatusServer) error // Down stops engine work in the daemon. Down(context.Context, *DownRequest) (*DownResponse, error) // GetConfig of the daemon. @@ -556,6 +598,9 @@ func (UnimplementedDaemonServiceServer) Up(context.Context, *UpRequest) (*UpResp func (UnimplementedDaemonServiceServer) Status(context.Context, *StatusRequest) (*StatusResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Status not implemented") } +func (UnimplementedDaemonServiceServer) SubscribeStatus(*StatusRequest, DaemonService_SubscribeStatusServer) error { + return status.Errorf(codes.Unimplemented, "method SubscribeStatus not implemented") +} func (UnimplementedDaemonServiceServer) Down(context.Context, *DownRequest) (*DownResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method Down not implemented") } @@ -740,6 +785,27 @@ func _DaemonService_Status_Handler(srv interface{}, ctx context.Context, dec fun return interceptor(ctx, in, info, handler) } +func _DaemonService_SubscribeStatus_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(StatusRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(DaemonServiceServer).SubscribeStatus(m, &daemonServiceSubscribeStatusServer{stream}) +} + +type DaemonService_SubscribeStatusServer interface { + Send(*StatusResponse) error + grpc.ServerStream +} + +type daemonServiceSubscribeStatusServer struct { + grpc.ServerStream +} + +func (x *daemonServiceSubscribeStatusServer) Send(m *StatusResponse) error { + return x.ServerStream.SendMsg(m) +} + func _DaemonService_Down_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(DownRequest) if err := dec(in); err != nil { @@ -1489,6 +1555,11 @@ var DaemonService_ServiceDesc = grpc.ServiceDesc{ }, }, Streams: []grpc.StreamDesc{ + { + StreamName: "SubscribeStatus", + Handler: _DaemonService_SubscribeStatus_Handler, + ServerStreams: true, + }, { StreamName: "SubscribeEvents", Handler: _DaemonService_SubscribeEvents_Handler, diff --git a/client/server/server.go b/client/server/server.go index 70e4c342f..60dfb2633 100644 --- a/client/server/server.go +++ b/client/server/server.go @@ -1101,6 +1101,13 @@ func (s *Server) Status( } } + return s.buildStatusResponse(msg) +} + +// buildStatusResponse composes a StatusResponse from the current daemon +// state. Shared between the unary Status RPC and the SubscribeStatus +// stream so both paths return identical snapshots. +func (s *Server) buildStatusResponse(msg *proto.StatusRequest) (*proto.StatusResponse, error) { status, err := internal.CtxGetState(s.rootCtx).Status() if err != nil { return nil, err diff --git a/client/server/status_stream.go b/client/server/status_stream.go new file mode 100644 index 000000000..aa1dc508c --- /dev/null +++ b/client/server/status_stream.go @@ -0,0 +1,57 @@ +package server + +import ( + log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/client/proto" +) + +// SubscribeStatus pushes a fresh StatusResponse on every connection state +// change. The first message is the current snapshot, so a re-subscribing +// client doesn't need to also call Status. Subsequent messages fire when +// the peer recorder reports any of: connected/disconnected/connecting, +// management or signal flip, address change, or peers list change. +// +// The change channel coalesces bursts to a single tick. If the consumer +// is slow the daemon drops extras (not blocks), and the next snapshot +// the consumer pulls already reflects everything. +func (s *Server) SubscribeStatus(req *proto.StatusRequest, stream proto.DaemonService_SubscribeStatusServer) error { + subID, ch := s.statusRecorder.SubscribeToStateChanges() + defer func() { + s.statusRecorder.UnsubscribeFromStateChanges(subID) + log.Debug("client unsubscribed from status updates") + }() + + log.Debug("client subscribed to status updates") + + if err := s.sendStatusSnapshot(req, stream); err != nil { + return err + } + + for { + select { + case _, ok := <-ch: + if !ok { + return nil + } + if err := s.sendStatusSnapshot(req, stream); err != nil { + return err + } + case <-stream.Context().Done(): + return nil + } + } +} + +func (s *Server) sendStatusSnapshot(req *proto.StatusRequest, stream proto.DaemonService_SubscribeStatusServer) error { + resp, err := s.buildStatusResponse(req) + if err != nil { + log.Warnf("build status snapshot for stream: %v", err) + return err + } + if err := stream.Send(resp); err != nil { + log.Warnf("send status snapshot to stream: %v", err) + return err + } + return nil +} diff --git a/client/ui-wails/services/peers.go b/client/ui-wails/services/peers.go index 977cb4f59..8559f3bf1 100644 --- a/client/ui-wails/services/peers.go +++ b/client/ui-wails/services/peers.go @@ -15,10 +15,6 @@ import ( "github.com/netbirdio/netbird/client/proto" ) -// PollInterval is how often Watch falls back to Status polling when the -// SubscribeEvents stream is unavailable. Matches the Fyne UI's 2-second cadence. -const PollInterval = 2 * time.Second - const ( // EventStatus is emitted to the frontend whenever a fresh Status snapshot // is captured (from a poll or a stream-driven refresh). @@ -130,8 +126,18 @@ func NewPeers(conn DaemonConn, emitter Emitter) *Peers { return &Peers{conn: conn, emitter: emitter} } -// Watch starts the background loop: a poll-then-stream pair that runs until -// ctx (or the service shutdown) cancels it. Safe to call once at boot. +// Watch starts the background loops that feed the frontend: +// - statusStreamLoop: push-driven snapshots on connection-state change +// (Connected/Disconnected/Connecting, peer list, address). Drives the +// tray icon, Status page, and Peers page. +// - toastStreamLoop: DNS / network / auth / connectivity / update +// SystemEvent stream. Drives OS notifications, the Recent Events +// list, and the update-overlay flag. The daemon-side RPC is named +// SubscribeEvents — only the loop's local alias differs to keep the +// two streams distinguishable in this file. +// +// Safe to call once at boot; both loops self-restart on stream errors +// via exponential backoff. func (s *Peers) Watch(ctx context.Context) { s.mu.Lock() if s.cancel != nil { @@ -143,8 +149,8 @@ func (s *Peers) Watch(ctx context.Context) { s.mu.Unlock() s.streamWg.Add(2) - go s.pollLoop(ctx) - go s.streamLoop(ctx) + go s.statusStreamLoop(ctx) + go s.toastStreamLoop(ctx) } // ServiceShutdown is the Wails service hook fired on app exit. @@ -173,33 +179,61 @@ func (s *Peers) Get(ctx context.Context) (Status, error) { return statusFromProto(resp), nil } -func (s *Peers) pollLoop(ctx context.Context) { +// statusStreamLoop subscribes to the daemon's SubscribeStatus stream and +// re-emits each FullStatus snapshot on the Wails event bus. The first +// message is the current snapshot; subsequent messages fire on +// connection-state changes only — no fixed-interval polling, no idle +// chatter. Reconnects with exponential backoff if the stream drops +// (daemon restart, socket break). +func (s *Peers) statusStreamLoop(ctx context.Context) { defer s.streamWg.Done() - ticker := time.NewTicker(PollInterval) - defer ticker.Stop() - first := true - for { - st, err := s.Get(ctx) - if err == nil { - if first { - log.Infof("peers pollLoop: first status ok status=%q peers=%d", st.Status, len(st.Peers)) - first = false + bo := backoff.WithContext(&backoff.ExponentialBackOff{ + InitialInterval: time.Second, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + MaxInterval: 10 * time.Second, + MaxElapsedTime: 0, + Stop: backoff.Stop, + Clock: backoff.SystemClock, + }, ctx) + + op := func() error { + cli, err := s.conn.Client() + if err != nil { + return fmt.Errorf("get client: %w", err) + } + stream, err := cli.SubscribeStatus(ctx, &proto.StatusRequest{GetFullPeerStatus: true}) + if err != nil { + return fmt.Errorf("subscribe status: %w", err) + } + for { + resp, err := stream.Recv() + if err != nil { + if ctx.Err() != nil { + return ctx.Err() + } + return fmt.Errorf("status stream recv: %w", err) } + st := statusFromProto(resp) + log.Infof("backend event: status status=%q peers=%d", st.Status, len(st.Peers)) s.emitter.Emit(EventStatus, st) - } else if ctx.Err() == nil { - log.Warnf("peers pollLoop: status poll error: %v", err) } + } - select { - case <-ctx.Done(): - return - case <-ticker.C: - } + if err := backoff.Retry(op, bo); err != nil && ctx.Err() == nil { + log.Errorf("status stream ended: %v", err) } } -func (s *Peers) streamLoop(ctx context.Context) { +// toastStreamLoop subscribes to the daemon's SubscribeEvents RPC and +// re-emits every SystemEvent on the Wails event bus. The downstream +// consumers turn these into OS notifications, populate the Recent +// Events card on the Status page, and listen for the +// "new_version_available" metadata to flip the tray's update overlay. +// Local name differs from the RPC ("SubscribeEvents") so the file's +// two streams aren't both called streamLoop. +func (s *Peers) toastStreamLoop(ctx context.Context) { defer s.streamWg.Done() bo := backoff.WithContext(&backoff.ExponentialBackOff{ @@ -229,7 +263,9 @@ func (s *Peers) streamLoop(ctx context.Context) { } return fmt.Errorf("stream recv: %w", err) } - s.emitter.Emit(EventSystem, systemEventFromProto(ev)) + se := systemEventFromProto(ev) + log.Infof("backend event: system severity=%s category=%s msg=%q", se.Severity, se.Category, se.UserMessage) + s.emitter.Emit(EventSystem, se) s.fanOutUpdateEvents(ev) } } diff --git a/client/ui-wails/tray.go b/client/ui-wails/tray.go index 34d785209..5cd770150 100644 --- a/client/ui-wails/tray.go +++ b/client/ui-wails/tray.go @@ -343,9 +343,14 @@ func (t *Tray) onUpdateProgress(ev *application.CustomEvent) { // applyStatus updates the tray icon, status label, exit-node submenu, and // connect/disconnect enablement based on the latest daemon snapshot. +// Skips the icon refresh when none of the icon-relevant inputs +// (connected, hasUpdate, status label) changed — the daemon emits +// rapid SubscribeStatus bursts during health probes that would +// otherwise spam Shell_NotifyIcon and the log. func (t *Tray) applyStatus(st services.Status) { t.mu.Lock() connected := strings.EqualFold(st.Status, "Connected") + iconChanged := connected != t.connected || st.Status != t.lastStatus t.connected = connected t.lastStatus = st.Status @@ -354,15 +359,17 @@ func (t *Tray) applyStatus(st services.Status) { t.exitNodes = exitNodes t.mu.Unlock() - t.applyIcon() - if t.statusItem != nil { - t.statusItem.SetLabel(st.Status) - } - if t.upItem != nil { - t.upItem.SetEnabled(!connected) - } - if t.downItem != nil { - t.downItem.SetEnabled(connected) + if iconChanged { + t.applyIcon() + if t.statusItem != nil { + t.statusItem.SetLabel(st.Status) + } + if t.upItem != nil { + t.upItem.SetEnabled(!connected) + } + if t.downItem != nil { + t.downItem.SetEnabled(connected) + } } if exitNodesChanged { t.rebuildExitNodes(exitNodes)