Merge branch 'ui-refactor' into ui-refactor-ui

This commit is contained in:
Eduard Gert
2026-04-30 14:57:32 +02:00
8 changed files with 359 additions and 113 deletions

View File

@@ -215,6 +215,14 @@ type Status struct {
eventStreams map[string]chan *proto.SystemEvent
eventQueue *EventQueue
// stateChangeStreams fan-out connection-state changes (connected /
// disconnected / connecting / address change / peers list change) to
// every active SubscribeStatus gRPC stream. Each subscriber gets a
// buffered chan; the notifier non-blockingly pings them so a slow
// consumer can never stall the daemon.
stateChangeMux sync.Mutex
stateChangeStreams map[string]chan struct{}
ingressGwMgr *ingressgw.Manager
routeIDLookup routeIDLookup
@@ -228,6 +236,7 @@ func NewRecorder(mgmAddress string) *Status {
changeNotify: make(map[string]map[string]*StatusChangeSubscription),
eventStreams: make(map[string]chan *proto.SystemEvent),
eventQueue: NewEventQueue(eventQueueSize),
stateChangeStreams: make(map[string]chan struct{}),
offlinePeers: make([]State, 0),
notifier: newNotifier(),
mgmAddress: mgmAddress,
@@ -990,16 +999,19 @@ func (d *Status) GetFullStatus() FullStatus {
// ClientStart will notify all listeners about the new service state
func (d *Status) ClientStart() {
d.notifier.clientStart()
d.notifyStateChange()
}
// ClientStop will notify all listeners about the new service state
func (d *Status) ClientStop() {
d.notifier.clientStop()
d.notifyStateChange()
}
// ClientTeardown will notify all listeners about the service is under teardown
func (d *Status) ClientTeardown() {
d.notifier.clientTearDown()
d.notifyStateChange()
}
// SetConnectionListener set a listener to the notifier
@@ -1014,6 +1026,7 @@ func (d *Status) RemoveConnectionListener() {
func (d *Status) onConnectionChanged() {
d.notifier.updateServerStates(d.managementState, d.signalState)
d.notifyStateChange()
}
// notifyPeerStateChangeListeners notifies route manager about the change in peer state
@@ -1049,10 +1062,12 @@ func (d *Status) notifyPeerStateChangeListeners(peerID string) {
func (d *Status) notifyPeerListChanged() {
d.notifier.peerListChanged(d.numOfPeers())
d.notifyStateChange()
}
func (d *Status) notifyAddressChanged() {
d.notifier.localAddressChanged(d.localPeer.FQDN, d.localPeer.IP)
d.notifyStateChange()
}
func (d *Status) numOfPeers() int {
@@ -1128,6 +1143,50 @@ func (d *Status) GetEventHistory() []*proto.SystemEvent {
return d.eventQueue.GetAll()
}
// SubscribeToStateChanges hands back a channel that receives a tick on
// every connection-state change (connected / disconnected / connecting /
// address change / peers-list change). The channel is buffered to one
// pending tick so a coalesced burst still wakes the consumer exactly
// once. Pass the returned id to UnsubscribeFromStateChanges to detach.
func (d *Status) SubscribeToStateChanges() (string, <-chan struct{}) {
d.stateChangeMux.Lock()
defer d.stateChangeMux.Unlock()
id := uuid.New().String()
ch := make(chan struct{}, 1)
d.stateChangeStreams[id] = ch
return id, ch
}
// UnsubscribeFromStateChanges releases a SubscribeToStateChanges channel
// and closes it so any consumer goroutine selecting on the channel
// unblocks cleanly.
func (d *Status) UnsubscribeFromStateChanges(id string) {
d.stateChangeMux.Lock()
defer d.stateChangeMux.Unlock()
if ch, ok := d.stateChangeStreams[id]; ok {
close(ch)
delete(d.stateChangeStreams, id)
}
}
// notifyStateChange wakes every SubscribeToStateChanges subscriber. Drops
// the tick if a subscriber's buffer is full — by definition the consumer
// is already going to fetch the latest snapshot, so multiple pending ticks
// would be redundant.
func (d *Status) notifyStateChange() {
d.stateChangeMux.Lock()
defer d.stateChangeMux.Unlock()
for _, ch := range d.stateChangeStreams {
select {
case ch <- struct{}{}:
default:
}
}
}
func (d *Status) SetWgIface(wgInterface WGIfaceStatus) {
d.mux.Lock()
defer d.mux.Unlock()

View File

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.36.6
// protoc v6.33.1
// protoc v7.34.1
// source: daemon.proto
package proto
@@ -6566,12 +6566,13 @@ const file_daemon_proto_rawDesc = "" +
"\n" +
"EXPOSE_UDP\x10\x03\x12\x0e\n" +
"\n" +
"EXPOSE_TLS\x10\x042\xfc\x15\n" +
"EXPOSE_TLS\x10\x042\xc2\x16\n" +
"\rDaemonService\x126\n" +
"\x05Login\x12\x14.daemon.LoginRequest\x1a\x15.daemon.LoginResponse\"\x00\x12K\n" +
"\fWaitSSOLogin\x12\x1b.daemon.WaitSSOLoginRequest\x1a\x1c.daemon.WaitSSOLoginResponse\"\x00\x12-\n" +
"\x02Up\x12\x11.daemon.UpRequest\x1a\x12.daemon.UpResponse\"\x00\x129\n" +
"\x06Status\x12\x15.daemon.StatusRequest\x1a\x16.daemon.StatusResponse\"\x00\x123\n" +
"\x06Status\x12\x15.daemon.StatusRequest\x1a\x16.daemon.StatusResponse\"\x00\x12D\n" +
"\x0fSubscribeStatus\x12\x15.daemon.StatusRequest\x1a\x16.daemon.StatusResponse\"\x000\x01\x123\n" +
"\x04Down\x12\x13.daemon.DownRequest\x1a\x14.daemon.DownResponse\"\x00\x12B\n" +
"\tGetConfig\x12\x18.daemon.GetConfigRequest\x1a\x19.daemon.GetConfigResponse\"\x00\x12K\n" +
"\fListNetworks\x12\x1b.daemon.ListNetworksRequest\x1a\x1c.daemon.ListNetworksResponse\"\x00\x12Q\n" +
@@ -6766,78 +6767,80 @@ var file_daemon_proto_depIdxs = []int32{
10, // 37: daemon.DaemonService.WaitSSOLogin:input_type -> daemon.WaitSSOLoginRequest
12, // 38: daemon.DaemonService.Up:input_type -> daemon.UpRequest
14, // 39: daemon.DaemonService.Status:input_type -> daemon.StatusRequest
16, // 40: daemon.DaemonService.Down:input_type -> daemon.DownRequest
18, // 41: daemon.DaemonService.GetConfig:input_type -> daemon.GetConfigRequest
29, // 42: daemon.DaemonService.ListNetworks:input_type -> daemon.ListNetworksRequest
31, // 43: daemon.DaemonService.SelectNetworks:input_type -> daemon.SelectNetworksRequest
31, // 44: daemon.DaemonService.DeselectNetworks:input_type -> daemon.SelectNetworksRequest
5, // 45: daemon.DaemonService.ForwardingRules:input_type -> daemon.EmptyRequest
38, // 46: daemon.DaemonService.DebugBundle:input_type -> daemon.DebugBundleRequest
40, // 47: daemon.DaemonService.GetLogLevel:input_type -> daemon.GetLogLevelRequest
42, // 48: daemon.DaemonService.SetLogLevel:input_type -> daemon.SetLogLevelRequest
45, // 49: daemon.DaemonService.ListStates:input_type -> daemon.ListStatesRequest
47, // 50: daemon.DaemonService.CleanState:input_type -> daemon.CleanStateRequest
49, // 51: daemon.DaemonService.DeleteState:input_type -> daemon.DeleteStateRequest
51, // 52: daemon.DaemonService.SetSyncResponsePersistence:input_type -> daemon.SetSyncResponsePersistenceRequest
54, // 53: daemon.DaemonService.TracePacket:input_type -> daemon.TracePacketRequest
57, // 54: daemon.DaemonService.SubscribeEvents:input_type -> daemon.SubscribeRequest
59, // 55: daemon.DaemonService.GetEvents:input_type -> daemon.GetEventsRequest
61, // 56: daemon.DaemonService.SwitchProfile:input_type -> daemon.SwitchProfileRequest
63, // 57: daemon.DaemonService.SetConfig:input_type -> daemon.SetConfigRequest
65, // 58: daemon.DaemonService.AddProfile:input_type -> daemon.AddProfileRequest
67, // 59: daemon.DaemonService.RemoveProfile:input_type -> daemon.RemoveProfileRequest
69, // 60: daemon.DaemonService.ListProfiles:input_type -> daemon.ListProfilesRequest
72, // 61: daemon.DaemonService.GetActiveProfile:input_type -> daemon.GetActiveProfileRequest
74, // 62: daemon.DaemonService.Logout:input_type -> daemon.LogoutRequest
76, // 63: daemon.DaemonService.GetFeatures:input_type -> daemon.GetFeaturesRequest
78, // 64: daemon.DaemonService.TriggerUpdate:input_type -> daemon.TriggerUpdateRequest
80, // 65: daemon.DaemonService.GetPeerSSHHostKey:input_type -> daemon.GetPeerSSHHostKeyRequest
82, // 66: daemon.DaemonService.RequestJWTAuth:input_type -> daemon.RequestJWTAuthRequest
84, // 67: daemon.DaemonService.WaitJWTToken:input_type -> daemon.WaitJWTTokenRequest
86, // 68: daemon.DaemonService.StartCPUProfile:input_type -> daemon.StartCPUProfileRequest
88, // 69: daemon.DaemonService.StopCPUProfile:input_type -> daemon.StopCPUProfileRequest
6, // 70: daemon.DaemonService.NotifyOSLifecycle:input_type -> daemon.OSLifecycleRequest
90, // 71: daemon.DaemonService.GetInstallerResult:input_type -> daemon.InstallerResultRequest
92, // 72: daemon.DaemonService.ExposeService:input_type -> daemon.ExposeServiceRequest
9, // 73: daemon.DaemonService.Login:output_type -> daemon.LoginResponse
11, // 74: daemon.DaemonService.WaitSSOLogin:output_type -> daemon.WaitSSOLoginResponse
13, // 75: daemon.DaemonService.Up:output_type -> daemon.UpResponse
15, // 76: daemon.DaemonService.Status:output_type -> daemon.StatusResponse
17, // 77: daemon.DaemonService.Down:output_type -> daemon.DownResponse
19, // 78: daemon.DaemonService.GetConfig:output_type -> daemon.GetConfigResponse
30, // 79: daemon.DaemonService.ListNetworks:output_type -> daemon.ListNetworksResponse
32, // 80: daemon.DaemonService.SelectNetworks:output_type -> daemon.SelectNetworksResponse
32, // 81: daemon.DaemonService.DeselectNetworks:output_type -> daemon.SelectNetworksResponse
37, // 82: daemon.DaemonService.ForwardingRules:output_type -> daemon.ForwardingRulesResponse
39, // 83: daemon.DaemonService.DebugBundle:output_type -> daemon.DebugBundleResponse
41, // 84: daemon.DaemonService.GetLogLevel:output_type -> daemon.GetLogLevelResponse
43, // 85: daemon.DaemonService.SetLogLevel:output_type -> daemon.SetLogLevelResponse
46, // 86: daemon.DaemonService.ListStates:output_type -> daemon.ListStatesResponse
48, // 87: daemon.DaemonService.CleanState:output_type -> daemon.CleanStateResponse
50, // 88: daemon.DaemonService.DeleteState:output_type -> daemon.DeleteStateResponse
52, // 89: daemon.DaemonService.SetSyncResponsePersistence:output_type -> daemon.SetSyncResponsePersistenceResponse
56, // 90: daemon.DaemonService.TracePacket:output_type -> daemon.TracePacketResponse
58, // 91: daemon.DaemonService.SubscribeEvents:output_type -> daemon.SystemEvent
60, // 92: daemon.DaemonService.GetEvents:output_type -> daemon.GetEventsResponse
62, // 93: daemon.DaemonService.SwitchProfile:output_type -> daemon.SwitchProfileResponse
64, // 94: daemon.DaemonService.SetConfig:output_type -> daemon.SetConfigResponse
66, // 95: daemon.DaemonService.AddProfile:output_type -> daemon.AddProfileResponse
68, // 96: daemon.DaemonService.RemoveProfile:output_type -> daemon.RemoveProfileResponse
70, // 97: daemon.DaemonService.ListProfiles:output_type -> daemon.ListProfilesResponse
73, // 98: daemon.DaemonService.GetActiveProfile:output_type -> daemon.GetActiveProfileResponse
75, // 99: daemon.DaemonService.Logout:output_type -> daemon.LogoutResponse
77, // 100: daemon.DaemonService.GetFeatures:output_type -> daemon.GetFeaturesResponse
79, // 101: daemon.DaemonService.TriggerUpdate:output_type -> daemon.TriggerUpdateResponse
81, // 102: daemon.DaemonService.GetPeerSSHHostKey:output_type -> daemon.GetPeerSSHHostKeyResponse
83, // 103: daemon.DaemonService.RequestJWTAuth:output_type -> daemon.RequestJWTAuthResponse
85, // 104: daemon.DaemonService.WaitJWTToken:output_type -> daemon.WaitJWTTokenResponse
87, // 105: daemon.DaemonService.StartCPUProfile:output_type -> daemon.StartCPUProfileResponse
89, // 106: daemon.DaemonService.StopCPUProfile:output_type -> daemon.StopCPUProfileResponse
7, // 107: daemon.DaemonService.NotifyOSLifecycle:output_type -> daemon.OSLifecycleResponse
91, // 108: daemon.DaemonService.GetInstallerResult:output_type -> daemon.InstallerResultResponse
93, // 109: daemon.DaemonService.ExposeService:output_type -> daemon.ExposeServiceEvent
73, // [73:110] is the sub-list for method output_type
36, // [36:73] is the sub-list for method input_type
14, // 40: daemon.DaemonService.SubscribeStatus:input_type -> daemon.StatusRequest
16, // 41: daemon.DaemonService.Down:input_type -> daemon.DownRequest
18, // 42: daemon.DaemonService.GetConfig:input_type -> daemon.GetConfigRequest
29, // 43: daemon.DaemonService.ListNetworks:input_type -> daemon.ListNetworksRequest
31, // 44: daemon.DaemonService.SelectNetworks:input_type -> daemon.SelectNetworksRequest
31, // 45: daemon.DaemonService.DeselectNetworks:input_type -> daemon.SelectNetworksRequest
5, // 46: daemon.DaemonService.ForwardingRules:input_type -> daemon.EmptyRequest
38, // 47: daemon.DaemonService.DebugBundle:input_type -> daemon.DebugBundleRequest
40, // 48: daemon.DaemonService.GetLogLevel:input_type -> daemon.GetLogLevelRequest
42, // 49: daemon.DaemonService.SetLogLevel:input_type -> daemon.SetLogLevelRequest
45, // 50: daemon.DaemonService.ListStates:input_type -> daemon.ListStatesRequest
47, // 51: daemon.DaemonService.CleanState:input_type -> daemon.CleanStateRequest
49, // 52: daemon.DaemonService.DeleteState:input_type -> daemon.DeleteStateRequest
51, // 53: daemon.DaemonService.SetSyncResponsePersistence:input_type -> daemon.SetSyncResponsePersistenceRequest
54, // 54: daemon.DaemonService.TracePacket:input_type -> daemon.TracePacketRequest
57, // 55: daemon.DaemonService.SubscribeEvents:input_type -> daemon.SubscribeRequest
59, // 56: daemon.DaemonService.GetEvents:input_type -> daemon.GetEventsRequest
61, // 57: daemon.DaemonService.SwitchProfile:input_type -> daemon.SwitchProfileRequest
63, // 58: daemon.DaemonService.SetConfig:input_type -> daemon.SetConfigRequest
65, // 59: daemon.DaemonService.AddProfile:input_type -> daemon.AddProfileRequest
67, // 60: daemon.DaemonService.RemoveProfile:input_type -> daemon.RemoveProfileRequest
69, // 61: daemon.DaemonService.ListProfiles:input_type -> daemon.ListProfilesRequest
72, // 62: daemon.DaemonService.GetActiveProfile:input_type -> daemon.GetActiveProfileRequest
74, // 63: daemon.DaemonService.Logout:input_type -> daemon.LogoutRequest
76, // 64: daemon.DaemonService.GetFeatures:input_type -> daemon.GetFeaturesRequest
78, // 65: daemon.DaemonService.TriggerUpdate:input_type -> daemon.TriggerUpdateRequest
80, // 66: daemon.DaemonService.GetPeerSSHHostKey:input_type -> daemon.GetPeerSSHHostKeyRequest
82, // 67: daemon.DaemonService.RequestJWTAuth:input_type -> daemon.RequestJWTAuthRequest
84, // 68: daemon.DaemonService.WaitJWTToken:input_type -> daemon.WaitJWTTokenRequest
86, // 69: daemon.DaemonService.StartCPUProfile:input_type -> daemon.StartCPUProfileRequest
88, // 70: daemon.DaemonService.StopCPUProfile:input_type -> daemon.StopCPUProfileRequest
6, // 71: daemon.DaemonService.NotifyOSLifecycle:input_type -> daemon.OSLifecycleRequest
90, // 72: daemon.DaemonService.GetInstallerResult:input_type -> daemon.InstallerResultRequest
92, // 73: daemon.DaemonService.ExposeService:input_type -> daemon.ExposeServiceRequest
9, // 74: daemon.DaemonService.Login:output_type -> daemon.LoginResponse
11, // 75: daemon.DaemonService.WaitSSOLogin:output_type -> daemon.WaitSSOLoginResponse
13, // 76: daemon.DaemonService.Up:output_type -> daemon.UpResponse
15, // 77: daemon.DaemonService.Status:output_type -> daemon.StatusResponse
15, // 78: daemon.DaemonService.SubscribeStatus:output_type -> daemon.StatusResponse
17, // 79: daemon.DaemonService.Down:output_type -> daemon.DownResponse
19, // 80: daemon.DaemonService.GetConfig:output_type -> daemon.GetConfigResponse
30, // 81: daemon.DaemonService.ListNetworks:output_type -> daemon.ListNetworksResponse
32, // 82: daemon.DaemonService.SelectNetworks:output_type -> daemon.SelectNetworksResponse
32, // 83: daemon.DaemonService.DeselectNetworks:output_type -> daemon.SelectNetworksResponse
37, // 84: daemon.DaemonService.ForwardingRules:output_type -> daemon.ForwardingRulesResponse
39, // 85: daemon.DaemonService.DebugBundle:output_type -> daemon.DebugBundleResponse
41, // 86: daemon.DaemonService.GetLogLevel:output_type -> daemon.GetLogLevelResponse
43, // 87: daemon.DaemonService.SetLogLevel:output_type -> daemon.SetLogLevelResponse
46, // 88: daemon.DaemonService.ListStates:output_type -> daemon.ListStatesResponse
48, // 89: daemon.DaemonService.CleanState:output_type -> daemon.CleanStateResponse
50, // 90: daemon.DaemonService.DeleteState:output_type -> daemon.DeleteStateResponse
52, // 91: daemon.DaemonService.SetSyncResponsePersistence:output_type -> daemon.SetSyncResponsePersistenceResponse
56, // 92: daemon.DaemonService.TracePacket:output_type -> daemon.TracePacketResponse
58, // 93: daemon.DaemonService.SubscribeEvents:output_type -> daemon.SystemEvent
60, // 94: daemon.DaemonService.GetEvents:output_type -> daemon.GetEventsResponse
62, // 95: daemon.DaemonService.SwitchProfile:output_type -> daemon.SwitchProfileResponse
64, // 96: daemon.DaemonService.SetConfig:output_type -> daemon.SetConfigResponse
66, // 97: daemon.DaemonService.AddProfile:output_type -> daemon.AddProfileResponse
68, // 98: daemon.DaemonService.RemoveProfile:output_type -> daemon.RemoveProfileResponse
70, // 99: daemon.DaemonService.ListProfiles:output_type -> daemon.ListProfilesResponse
73, // 100: daemon.DaemonService.GetActiveProfile:output_type -> daemon.GetActiveProfileResponse
75, // 101: daemon.DaemonService.Logout:output_type -> daemon.LogoutResponse
77, // 102: daemon.DaemonService.GetFeatures:output_type -> daemon.GetFeaturesResponse
79, // 103: daemon.DaemonService.TriggerUpdate:output_type -> daemon.TriggerUpdateResponse
81, // 104: daemon.DaemonService.GetPeerSSHHostKey:output_type -> daemon.GetPeerSSHHostKeyResponse
83, // 105: daemon.DaemonService.RequestJWTAuth:output_type -> daemon.RequestJWTAuthResponse
85, // 106: daemon.DaemonService.WaitJWTToken:output_type -> daemon.WaitJWTTokenResponse
87, // 107: daemon.DaemonService.StartCPUProfile:output_type -> daemon.StartCPUProfileResponse
89, // 108: daemon.DaemonService.StopCPUProfile:output_type -> daemon.StopCPUProfileResponse
7, // 109: daemon.DaemonService.NotifyOSLifecycle:output_type -> daemon.OSLifecycleResponse
91, // 110: daemon.DaemonService.GetInstallerResult:output_type -> daemon.InstallerResultResponse
93, // 111: daemon.DaemonService.ExposeService:output_type -> daemon.ExposeServiceEvent
74, // [74:112] is the sub-list for method output_type
36, // [36:74] is the sub-list for method input_type
36, // [36:36] is the sub-list for extension type_name
36, // [36:36] is the sub-list for extension extendee
0, // [0:36] is the sub-list for field type_name

View File

@@ -24,6 +24,12 @@ service DaemonService {
// Status of the service.
rpc Status(StatusRequest) returns (StatusResponse) {}
// SubscribeStatus pushes a fresh StatusResponse on connection state
// changes (Connected / Disconnected / Connecting / address change /
// peers list change). The first message on the stream is the current
// snapshot, so a freshly-subscribed UI doesn't need to also call Status.
rpc SubscribeStatus(StatusRequest) returns (stream StatusResponse) {}
// Down stops engine work in the daemon.
rpc Down(DownRequest) returns (DownResponse) {}

View File

@@ -27,6 +27,11 @@ type DaemonServiceClient interface {
Up(ctx context.Context, in *UpRequest, opts ...grpc.CallOption) (*UpResponse, error)
// Status of the service.
Status(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (*StatusResponse, error)
// SubscribeStatus pushes a fresh StatusResponse on connection state
// changes (Connected / Disconnected / Connecting / address change /
// peers list change). The first message on the stream is the current
// snapshot, so a freshly-subscribed UI doesn't need to also call Status.
SubscribeStatus(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (DaemonService_SubscribeStatusClient, error)
// Down stops engine work in the daemon.
Down(ctx context.Context, in *DownRequest, opts ...grpc.CallOption) (*DownResponse, error)
// GetConfig of the daemon.
@@ -127,6 +132,38 @@ func (c *daemonServiceClient) Status(ctx context.Context, in *StatusRequest, opt
return out, nil
}
func (c *daemonServiceClient) SubscribeStatus(ctx context.Context, in *StatusRequest, opts ...grpc.CallOption) (DaemonService_SubscribeStatusClient, error) {
stream, err := c.cc.NewStream(ctx, &DaemonService_ServiceDesc.Streams[0], "/daemon.DaemonService/SubscribeStatus", opts...)
if err != nil {
return nil, err
}
x := &daemonServiceSubscribeStatusClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type DaemonService_SubscribeStatusClient interface {
Recv() (*StatusResponse, error)
grpc.ClientStream
}
type daemonServiceSubscribeStatusClient struct {
grpc.ClientStream
}
func (x *daemonServiceSubscribeStatusClient) Recv() (*StatusResponse, error) {
m := new(StatusResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *daemonServiceClient) Down(ctx context.Context, in *DownRequest, opts ...grpc.CallOption) (*DownResponse, error) {
out := new(DownResponse)
err := c.cc.Invoke(ctx, "/daemon.DaemonService/Down", in, out, opts...)
@@ -254,7 +291,7 @@ func (c *daemonServiceClient) TracePacket(ctx context.Context, in *TracePacketRe
}
func (c *daemonServiceClient) SubscribeEvents(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (DaemonService_SubscribeEventsClient, error) {
stream, err := c.cc.NewStream(ctx, &DaemonService_ServiceDesc.Streams[0], "/daemon.DaemonService/SubscribeEvents", opts...)
stream, err := c.cc.NewStream(ctx, &DaemonService_ServiceDesc.Streams[1], "/daemon.DaemonService/SubscribeEvents", opts...)
if err != nil {
return nil, err
}
@@ -439,7 +476,7 @@ func (c *daemonServiceClient) GetInstallerResult(ctx context.Context, in *Instal
}
func (c *daemonServiceClient) ExposeService(ctx context.Context, in *ExposeServiceRequest, opts ...grpc.CallOption) (DaemonService_ExposeServiceClient, error) {
stream, err := c.cc.NewStream(ctx, &DaemonService_ServiceDesc.Streams[1], "/daemon.DaemonService/ExposeService", opts...)
stream, err := c.cc.NewStream(ctx, &DaemonService_ServiceDesc.Streams[2], "/daemon.DaemonService/ExposeService", opts...)
if err != nil {
return nil, err
}
@@ -483,6 +520,11 @@ type DaemonServiceServer interface {
Up(context.Context, *UpRequest) (*UpResponse, error)
// Status of the service.
Status(context.Context, *StatusRequest) (*StatusResponse, error)
// SubscribeStatus pushes a fresh StatusResponse on connection state
// changes (Connected / Disconnected / Connecting / address change /
// peers list change). The first message on the stream is the current
// snapshot, so a freshly-subscribed UI doesn't need to also call Status.
SubscribeStatus(*StatusRequest, DaemonService_SubscribeStatusServer) error
// Down stops engine work in the daemon.
Down(context.Context, *DownRequest) (*DownResponse, error)
// GetConfig of the daemon.
@@ -556,6 +598,9 @@ func (UnimplementedDaemonServiceServer) Up(context.Context, *UpRequest) (*UpResp
func (UnimplementedDaemonServiceServer) Status(context.Context, *StatusRequest) (*StatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Status not implemented")
}
func (UnimplementedDaemonServiceServer) SubscribeStatus(*StatusRequest, DaemonService_SubscribeStatusServer) error {
return status.Errorf(codes.Unimplemented, "method SubscribeStatus not implemented")
}
func (UnimplementedDaemonServiceServer) Down(context.Context, *DownRequest) (*DownResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Down not implemented")
}
@@ -740,6 +785,27 @@ func _DaemonService_Status_Handler(srv interface{}, ctx context.Context, dec fun
return interceptor(ctx, in, info, handler)
}
func _DaemonService_SubscribeStatus_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(StatusRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(DaemonServiceServer).SubscribeStatus(m, &daemonServiceSubscribeStatusServer{stream})
}
type DaemonService_SubscribeStatusServer interface {
Send(*StatusResponse) error
grpc.ServerStream
}
type daemonServiceSubscribeStatusServer struct {
grpc.ServerStream
}
func (x *daemonServiceSubscribeStatusServer) Send(m *StatusResponse) error {
return x.ServerStream.SendMsg(m)
}
func _DaemonService_Down_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DownRequest)
if err := dec(in); err != nil {
@@ -1489,6 +1555,11 @@ var DaemonService_ServiceDesc = grpc.ServiceDesc{
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "SubscribeStatus",
Handler: _DaemonService_SubscribeStatus_Handler,
ServerStreams: true,
},
{
StreamName: "SubscribeEvents",
Handler: _DaemonService_SubscribeEvents_Handler,

View File

@@ -1101,6 +1101,13 @@ func (s *Server) Status(
}
}
return s.buildStatusResponse(msg)
}
// buildStatusResponse composes a StatusResponse from the current daemon
// state. Shared between the unary Status RPC and the SubscribeStatus
// stream so both paths return identical snapshots.
func (s *Server) buildStatusResponse(msg *proto.StatusRequest) (*proto.StatusResponse, error) {
status, err := internal.CtxGetState(s.rootCtx).Status()
if err != nil {
return nil, err

View File

@@ -0,0 +1,57 @@
package server
import (
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/proto"
)
// SubscribeStatus pushes a fresh StatusResponse on every connection state
// change. The first message is the current snapshot, so a re-subscribing
// client doesn't need to also call Status. Subsequent messages fire when
// the peer recorder reports any of: connected/disconnected/connecting,
// management or signal flip, address change, or peers list change.
//
// The change channel coalesces bursts to a single tick. If the consumer
// is slow the daemon drops extras (not blocks), and the next snapshot
// the consumer pulls already reflects everything.
func (s *Server) SubscribeStatus(req *proto.StatusRequest, stream proto.DaemonService_SubscribeStatusServer) error {
subID, ch := s.statusRecorder.SubscribeToStateChanges()
defer func() {
s.statusRecorder.UnsubscribeFromStateChanges(subID)
log.Debug("client unsubscribed from status updates")
}()
log.Debug("client subscribed to status updates")
if err := s.sendStatusSnapshot(req, stream); err != nil {
return err
}
for {
select {
case _, ok := <-ch:
if !ok {
return nil
}
if err := s.sendStatusSnapshot(req, stream); err != nil {
return err
}
case <-stream.Context().Done():
return nil
}
}
}
func (s *Server) sendStatusSnapshot(req *proto.StatusRequest, stream proto.DaemonService_SubscribeStatusServer) error {
resp, err := s.buildStatusResponse(req)
if err != nil {
log.Warnf("build status snapshot for stream: %v", err)
return err
}
if err := stream.Send(resp); err != nil {
log.Warnf("send status snapshot to stream: %v", err)
return err
}
return nil
}

View File

@@ -15,10 +15,6 @@ import (
"github.com/netbirdio/netbird/client/proto"
)
// PollInterval is how often Watch falls back to Status polling when the
// SubscribeEvents stream is unavailable. Matches the Fyne UI's 2-second cadence.
const PollInterval = 2 * time.Second
const (
// EventStatus is emitted to the frontend whenever a fresh Status snapshot
// is captured (from a poll or a stream-driven refresh).
@@ -130,8 +126,18 @@ func NewPeers(conn DaemonConn, emitter Emitter) *Peers {
return &Peers{conn: conn, emitter: emitter}
}
// Watch starts the background loop: a poll-then-stream pair that runs until
// ctx (or the service shutdown) cancels it. Safe to call once at boot.
// Watch starts the background loops that feed the frontend:
// - statusStreamLoop: push-driven snapshots on connection-state change
// (Connected/Disconnected/Connecting, peer list, address). Drives the
// tray icon, Status page, and Peers page.
// - toastStreamLoop: DNS / network / auth / connectivity / update
// SystemEvent stream. Drives OS notifications, the Recent Events
// list, and the update-overlay flag. The daemon-side RPC is named
// SubscribeEvents — only the loop's local alias differs to keep the
// two streams distinguishable in this file.
//
// Safe to call once at boot; both loops self-restart on stream errors
// via exponential backoff.
func (s *Peers) Watch(ctx context.Context) {
s.mu.Lock()
if s.cancel != nil {
@@ -143,8 +149,8 @@ func (s *Peers) Watch(ctx context.Context) {
s.mu.Unlock()
s.streamWg.Add(2)
go s.pollLoop(ctx)
go s.streamLoop(ctx)
go s.statusStreamLoop(ctx)
go s.toastStreamLoop(ctx)
}
// ServiceShutdown is the Wails service hook fired on app exit.
@@ -173,33 +179,61 @@ func (s *Peers) Get(ctx context.Context) (Status, error) {
return statusFromProto(resp), nil
}
func (s *Peers) pollLoop(ctx context.Context) {
// statusStreamLoop subscribes to the daemon's SubscribeStatus stream and
// re-emits each FullStatus snapshot on the Wails event bus. The first
// message is the current snapshot; subsequent messages fire on
// connection-state changes only — no fixed-interval polling, no idle
// chatter. Reconnects with exponential backoff if the stream drops
// (daemon restart, socket break).
func (s *Peers) statusStreamLoop(ctx context.Context) {
defer s.streamWg.Done()
ticker := time.NewTicker(PollInterval)
defer ticker.Stop()
first := true
for {
st, err := s.Get(ctx)
if err == nil {
if first {
log.Infof("peers pollLoop: first status ok status=%q peers=%d", st.Status, len(st.Peers))
first = false
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: time.Second,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 10 * time.Second,
MaxElapsedTime: 0,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
op := func() error {
cli, err := s.conn.Client()
if err != nil {
return fmt.Errorf("get client: %w", err)
}
stream, err := cli.SubscribeStatus(ctx, &proto.StatusRequest{GetFullPeerStatus: true})
if err != nil {
return fmt.Errorf("subscribe status: %w", err)
}
for {
resp, err := stream.Recv()
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
}
return fmt.Errorf("status stream recv: %w", err)
}
st := statusFromProto(resp)
log.Infof("backend event: status status=%q peers=%d", st.Status, len(st.Peers))
s.emitter.Emit(EventStatus, st)
} else if ctx.Err() == nil {
log.Warnf("peers pollLoop: status poll error: %v", err)
}
}
select {
case <-ctx.Done():
return
case <-ticker.C:
}
if err := backoff.Retry(op, bo); err != nil && ctx.Err() == nil {
log.Errorf("status stream ended: %v", err)
}
}
func (s *Peers) streamLoop(ctx context.Context) {
// toastStreamLoop subscribes to the daemon's SubscribeEvents RPC and
// re-emits every SystemEvent on the Wails event bus. The downstream
// consumers turn these into OS notifications, populate the Recent
// Events card on the Status page, and listen for the
// "new_version_available" metadata to flip the tray's update overlay.
// Local name differs from the RPC ("SubscribeEvents") so the file's
// two streams aren't both called streamLoop.
func (s *Peers) toastStreamLoop(ctx context.Context) {
defer s.streamWg.Done()
bo := backoff.WithContext(&backoff.ExponentialBackOff{
@@ -229,7 +263,9 @@ func (s *Peers) streamLoop(ctx context.Context) {
}
return fmt.Errorf("stream recv: %w", err)
}
s.emitter.Emit(EventSystem, systemEventFromProto(ev))
se := systemEventFromProto(ev)
log.Infof("backend event: system severity=%s category=%s msg=%q", se.Severity, se.Category, se.UserMessage)
s.emitter.Emit(EventSystem, se)
s.fanOutUpdateEvents(ev)
}
}

View File

@@ -343,9 +343,14 @@ func (t *Tray) onUpdateProgress(ev *application.CustomEvent) {
// applyStatus updates the tray icon, status label, exit-node submenu, and
// connect/disconnect enablement based on the latest daemon snapshot.
// Skips the icon refresh when none of the icon-relevant inputs
// (connected, hasUpdate, status label) changed — the daemon emits
// rapid SubscribeStatus bursts during health probes that would
// otherwise spam Shell_NotifyIcon and the log.
func (t *Tray) applyStatus(st services.Status) {
t.mu.Lock()
connected := strings.EqualFold(st.Status, "Connected")
iconChanged := connected != t.connected || st.Status != t.lastStatus
t.connected = connected
t.lastStatus = st.Status
@@ -354,15 +359,17 @@ func (t *Tray) applyStatus(st services.Status) {
t.exitNodes = exitNodes
t.mu.Unlock()
t.applyIcon()
if t.statusItem != nil {
t.statusItem.SetLabel(st.Status)
}
if t.upItem != nil {
t.upItem.SetEnabled(!connected)
}
if t.downItem != nil {
t.downItem.SetEnabled(connected)
if iconChanged {
t.applyIcon()
if t.statusItem != nil {
t.statusItem.SetLabel(st.Status)
}
if t.upItem != nil {
t.upItem.SetEnabled(!connected)
}
if t.downItem != nil {
t.downItem.SetEnabled(connected)
}
}
if exitNodesChanged {
t.rebuildExitNodes(exitNodes)