diff --git a/client/cmd/testutil.go b/client/cmd/testutil.go index c90faa24a..28423776f 100644 --- a/client/cmd/testutil.go +++ b/client/cmd/testutil.go @@ -73,7 +73,7 @@ func startManagement(t *testing.T, config *mgmt.Config) (*grpc.Server, net.Liste t.Fatal(err) } - peersUpdateManager := mgmt.NewPeersUpdateManager() + peersUpdateManager := mgmt.NewPeersUpdateManager(nil) eventStore := &activity.InMemoryEventStore{} if err != nil { return nil, nil diff --git a/client/internal/engine_test.go b/client/internal/engine_test.go index 42012bd0a..a855cf051 100644 --- a/client/internal/engine_test.go +++ b/client/internal/engine_test.go @@ -1044,7 +1044,7 @@ func startManagement(dataDir string) (*grpc.Server, string, error) { return nil, "", err } - peersUpdateManager := server.NewPeersUpdateManager() + peersUpdateManager := server.NewPeersUpdateManager(nil) eventStore := &activity.InMemoryEventStore{} if err != nil { return nil, "", err diff --git a/management/client/client_test.go b/management/client/client_test.go index ab8df99df..c5e5b8140 100644 --- a/management/client/client_test.go +++ b/management/client/client_test.go @@ -58,7 +58,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) { t.Fatal(err) } - peersUpdateManager := mgmt.NewPeersUpdateManager() + peersUpdateManager := mgmt.NewPeersUpdateManager(nil) eventStore := &activity.InMemoryEventStore{} accountManager, err := mgmt.BuildManager(store, peersUpdateManager, nil, "", "", eventStore, false) diff --git a/management/cmd/management.go b/management/cmd/management.go index 1a00a0f57..f05de4e4e 100644 --- a/management/cmd/management.go +++ b/management/cmd/management.go @@ -130,7 +130,7 @@ var ( if err != nil { return fmt.Errorf("failed creating Store: %s: %v", config.Datadir, err) } - peersUpdateManager := server.NewPeersUpdateManager() + peersUpdateManager := server.NewPeersUpdateManager(appMetrics) var idpManager idp.Manager if config.IdpManagerConfig != nil { diff --git a/management/server/account_test.go b/management/server/account_test.go index 181d0447b..ad0ccfbce 100644 --- a/management/server/account_test.go +++ b/management/server/account_test.go @@ -2047,7 +2047,7 @@ func createManager(t *testing.T) (*DefaultAccountManager, error) { return nil, err } eventStore := &activity.InMemoryEventStore{} - return BuildManager(store, NewPeersUpdateManager(), nil, "", "netbird.cloud", eventStore, false) + return BuildManager(store, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, false) } func createStore(t *testing.T) (Store, error) { diff --git a/management/server/dns_test.go b/management/server/dns_test.go index 54044cf93..62408e4b3 100644 --- a/management/server/dns_test.go +++ b/management/server/dns_test.go @@ -192,7 +192,7 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) { return nil, err } eventStore := &activity.InMemoryEventStore{} - return BuildManager(store, NewPeersUpdateManager(), nil, "", "netbird.test", eventStore, false) + return BuildManager(store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, false) } func createDNSStore(t *testing.T) (Store, error) { diff --git a/management/server/management_proto_test.go b/management/server/management_proto_test.go index 000f6c9de..21e9862f0 100644 --- a/management/server/management_proto_test.go +++ b/management/server/management_proto_test.go @@ -410,7 +410,7 @@ func startManagement(t *testing.T, config *Config) (*grpc.Server, string, error) if err != nil { return nil, "", err } - peersUpdateManager := NewPeersUpdateManager() + peersUpdateManager := NewPeersUpdateManager(nil) eventStore := &activity.InMemoryEventStore{} accountManager, err := BuildManager(store, peersUpdateManager, nil, "", "", eventStore, false) diff --git a/management/server/management_test.go b/management/server/management_test.go index 375e7e634..ee9641a8c 100644 --- a/management/server/management_test.go +++ b/management/server/management_test.go @@ -501,7 +501,7 @@ func startServer(config *server.Config) (*grpc.Server, net.Listener) { if err != nil { log.Fatalf("failed creating a store: %s: %v", config.Datadir, err) } - peersUpdateManager := server.NewPeersUpdateManager() + peersUpdateManager := server.NewPeersUpdateManager(nil) eventStore := &activity.InMemoryEventStore{} accountManager, err := server.BuildManager(store, peersUpdateManager, nil, "", "", eventStore, false) diff --git a/management/server/nameserver_test.go b/management/server/nameserver_test.go index 6a08706ca..d1f4cd015 100644 --- a/management/server/nameserver_test.go +++ b/management/server/nameserver_test.go @@ -747,7 +747,7 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) { return nil, err } eventStore := &activity.InMemoryEventStore{} - return BuildManager(store, NewPeersUpdateManager(), nil, "", "", eventStore, false) + return BuildManager(store, NewPeersUpdateManager(nil), nil, "", "", eventStore, false) } func createNSStore(t *testing.T) (Store, error) { diff --git a/management/server/route_test.go b/management/server/route_test.go index e55e76928..5112a5780 100644 --- a/management/server/route_test.go +++ b/management/server/route_test.go @@ -1013,7 +1013,7 @@ func createRouterManager(t *testing.T) (*DefaultAccountManager, error) { return nil, err } eventStore := &activity.InMemoryEventStore{} - return BuildManager(store, NewPeersUpdateManager(), nil, "", "", eventStore, false) + return BuildManager(store, NewPeersUpdateManager(nil), nil, "", "", eventStore, false) } func createRouterStore(t *testing.T) (Store, error) { diff --git a/management/server/telemetry/app_metrics.go b/management/server/telemetry/app_metrics.go index de5d278b9..56f4fb9c8 100644 --- a/management/server/telemetry/app_metrics.go +++ b/management/server/telemetry/app_metrics.go @@ -20,13 +20,14 @@ const defaultEndpoint = "/metrics" // MockAppMetrics mocks the AppMetrics interface type MockAppMetrics struct { - GetMeterFunc func() metric2.Meter - CloseFunc func() error - ExposeFunc func(port int, endpoint string) error - IDPMetricsFunc func() *IDPMetrics - HTTPMiddlewareFunc func() *HTTPMiddleware - GRPCMetricsFunc func() *GRPCMetrics - StoreMetricsFunc func() *StoreMetrics + GetMeterFunc func() metric2.Meter + CloseFunc func() error + ExposeFunc func(port int, endpoint string) error + IDPMetricsFunc func() *IDPMetrics + HTTPMiddlewareFunc func() *HTTPMiddleware + GRPCMetricsFunc func() *GRPCMetrics + StoreMetricsFunc func() *StoreMetrics + UpdateChannelMetricsFunc func() *UpdateChannelMetrics } // GetMeter mocks the GetMeter function of the AppMetrics interface @@ -85,6 +86,14 @@ func (mock *MockAppMetrics) StoreMetrics() *StoreMetrics { return nil } +// UpdateChannelMetrics mocks the MockAppMetrics function of the UpdateChannelMetrics interface +func (mock *MockAppMetrics) UpdateChannelMetrics() *UpdateChannelMetrics { + if mock.UpdateChannelMetricsFunc != nil { + return mock.UpdateChannelMetricsFunc() + } + return nil +} + // AppMetrics is metrics interface type AppMetrics interface { GetMeter() metric2.Meter @@ -94,18 +103,20 @@ type AppMetrics interface { HTTPMiddleware() *HTTPMiddleware GRPCMetrics() *GRPCMetrics StoreMetrics() *StoreMetrics + UpdateChannelMetrics() *UpdateChannelMetrics } // defaultAppMetrics are core application metrics based on OpenTelemetry https://opentelemetry.io/ type defaultAppMetrics struct { // Meter can be used by different application parts to create counters and measure things - Meter metric2.Meter - listener net.Listener - ctx context.Context - idpMetrics *IDPMetrics - httpMiddleware *HTTPMiddleware - grpcMetrics *GRPCMetrics - storeMetrics *StoreMetrics + Meter metric2.Meter + listener net.Listener + ctx context.Context + idpMetrics *IDPMetrics + httpMiddleware *HTTPMiddleware + grpcMetrics *GRPCMetrics + storeMetrics *StoreMetrics + updateChannelMetrics *UpdateChannelMetrics } // IDPMetrics returns metrics for the idp package @@ -128,6 +139,11 @@ func (appMetrics *defaultAppMetrics) StoreMetrics() *StoreMetrics { return appMetrics.storeMetrics } +// UpdateChannelMetrics returns metrics for the updatechannel +func (appMetrics *defaultAppMetrics) UpdateChannelMetrics() *UpdateChannelMetrics { + return appMetrics.updateChannelMetrics +} + // Close stop application metrics HTTP handler and closes listener. func (appMetrics *defaultAppMetrics) Close() error { if appMetrics.listener == nil { @@ -199,6 +215,18 @@ func NewDefaultAppMetrics(ctx context.Context) (AppMetrics, error) { return nil, err } - return &defaultAppMetrics{Meter: meter, ctx: ctx, idpMetrics: idpMetrics, httpMiddleware: middleware, - grpcMetrics: grpcMetrics, storeMetrics: storeMetrics}, nil + updateChannelMetrics, err := NewUpdateChannelMetrics(ctx, meter) + if err != nil { + return nil, err + } + + return &defaultAppMetrics{ + Meter: meter, + ctx: ctx, + idpMetrics: idpMetrics, + httpMiddleware: middleware, + grpcMetrics: grpcMetrics, + storeMetrics: storeMetrics, + updateChannelMetrics: updateChannelMetrics, + }, nil } diff --git a/management/server/telemetry/store_metrics.go b/management/server/telemetry/store_metrics.go index 98c13f12a..6415f765e 100644 --- a/management/server/telemetry/store_metrics.go +++ b/management/server/telemetry/store_metrics.go @@ -9,7 +9,7 @@ import ( "go.opentelemetry.io/otel/metric/instrument/syncint64" ) -// StoreMetrics represents all metrics related to the FileStore +// StoreMetrics represents all metrics related to the Store type StoreMetrics struct { globalLockAcquisitionDurationMicro syncint64.Histogram globalLockAcquisitionDurationMs syncint64.Histogram diff --git a/management/server/telemetry/updatechannel_metrics.go b/management/server/telemetry/updatechannel_metrics.go new file mode 100644 index 000000000..ede1671fc --- /dev/null +++ b/management/server/telemetry/updatechannel_metrics.go @@ -0,0 +1,141 @@ +package telemetry + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/metric/instrument/syncint64" +) + +// UpdateChannelMetrics represents all metrics related to the UpdateChannel +type UpdateChannelMetrics struct { + createChannelDurationMs syncint64.Histogram + createChannelDurationMicro syncint64.Histogram + closeChannelDurationMs syncint64.Histogram + closeChannelDurationMicro syncint64.Histogram + closeChannelsDurationMs syncint64.Histogram + closeChannelsDurationMicro syncint64.Histogram + closeChannels syncint64.Histogram + sendUpdateDurationMs syncint64.Histogram + sendUpdateDurationMicro syncint64.Histogram + getAllConnectedPeersDurationMs syncint64.Histogram + getAllConnectedPeersDurationMicro syncint64.Histogram + getAllConnectedPeers syncint64.Histogram + ctx context.Context +} + +// NewUpdateChannelMetrics creates an instance of UpdateChannel +func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateChannelMetrics, error) { + createChannelDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.create.duration.ms") + if err != nil { + return nil, err + } + + createChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.create.duration.micro") + if err != nil { + return nil, err + } + + closeChannelDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.close.one.duration.ms") + if err != nil { + return nil, err + } + + closeChannelDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.one.duration.micro") + if err != nil { + return nil, err + } + + closeChannelsDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.duration.ms") + if err != nil { + return nil, err + } + + closeChannelsDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.duration.micro") + if err != nil { + return nil, err + } + + closeChannels, err := meter.SyncInt64().Histogram("management.updatechannel.close.multiple.channels") + if err != nil { + return nil, err + } + + sendUpdateDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.send.duration.ms") + if err != nil { + return nil, err + } + + sendUpdateDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.send.duration.micro") + if err != nil { + return nil, err + } + + getAllConnectedPeersDurationMs, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.ms") + if err != nil { + return nil, err + } + + getAllConnectedPeersDurationMicro, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.duration.micro") + if err != nil { + return nil, err + } + + getAllConnectedPeers, err := meter.SyncInt64().Histogram("management.updatechannel.get.all.peers") + if err != nil { + return nil, err + } + + return &UpdateChannelMetrics{ + createChannelDurationMs: createChannelDurationMs, + createChannelDurationMicro: createChannelDurationMicro, + closeChannelDurationMs: closeChannelDurationMs, + closeChannelDurationMicro: closeChannelDurationMicro, + closeChannelsDurationMs: closeChannelsDurationMs, + closeChannelsDurationMicro: closeChannelsDurationMicro, + closeChannels: closeChannels, + sendUpdateDurationMs: sendUpdateDurationMs, + sendUpdateDurationMicro: sendUpdateDurationMicro, + getAllConnectedPeersDurationMs: getAllConnectedPeersDurationMs, + getAllConnectedPeersDurationMicro: getAllConnectedPeersDurationMicro, + getAllConnectedPeers: getAllConnectedPeers, + ctx: ctx, + }, nil +} + +// CountCreateChannelDuration counts the duration of the CreateChannel method, +// closed indicates if existing channel was closed before creation of a new one +func (metrics *UpdateChannelMetrics) CountCreateChannelDuration(duration time.Duration, closed bool) { + metrics.createChannelDurationMs.Record(metrics.ctx, duration.Milliseconds(), attribute.Bool("closed", closed)) + metrics.createChannelDurationMicro.Record(metrics.ctx, duration.Microseconds(), attribute.Bool("closed", closed)) +} + +// CountCloseChannelDuration counts the duration of the CloseChannel method +func (metrics *UpdateChannelMetrics) CountCloseChannelDuration(duration time.Duration) { + metrics.closeChannelDurationMs.Record(metrics.ctx, duration.Milliseconds()) + metrics.closeChannelDurationMicro.Record(metrics.ctx, duration.Microseconds()) +} + +// CountCloseChannelsDuration counts the duration of the CloseChannels method and the number of channels have been closed +func (metrics *UpdateChannelMetrics) CountCloseChannelsDuration(duration time.Duration, channels int) { + metrics.closeChannelsDurationMs.Record(metrics.ctx, duration.Milliseconds()) + metrics.closeChannelsDurationMicro.Record(metrics.ctx, duration.Microseconds()) + metrics.closeChannels.Record(metrics.ctx, int64(channels)) +} + +// CountSendUpdateDuration counts the duration of the SendUpdate method +// found indicates if peer had channel, dropped indicates if the message was dropped due channel buffer overload +func (metrics *UpdateChannelMetrics) CountSendUpdateDuration(duration time.Duration, found, dropped bool) { + attrs := []attribute.KeyValue{attribute.Bool("found", found), attribute.Bool("dropped", dropped)} + metrics.sendUpdateDurationMs.Record(metrics.ctx, duration.Milliseconds(), attrs...) + metrics.sendUpdateDurationMicro.Record(metrics.ctx, duration.Microseconds(), attrs...) +} + +// CountGetAllConnectedPeersDuration counts the duration of the GetAllConnectedPeers method and the number of peers have been returned +func (metrics *UpdateChannelMetrics) CountGetAllConnectedPeersDuration(duration time.Duration, peers int) { + metrics.getAllConnectedPeersDurationMs.Record(metrics.ctx, duration.Milliseconds()) + metrics.getAllConnectedPeersDurationMicro.Record(metrics.ctx, duration.Microseconds()) + metrics.getAllConnectedPeers.Record(metrics.ctx, int64(peers)) +} diff --git a/management/server/turncredentials_test.go b/management/server/turncredentials_test.go index a8d282266..5066fdbe9 100644 --- a/management/server/turncredentials_test.go +++ b/management/server/turncredentials_test.go @@ -20,7 +20,7 @@ var TurnTestHost = &Host{ func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) { ttl := util.Duration{Duration: time.Hour} secret := "some_secret" - peersManager := NewPeersUpdateManager() + peersManager := NewPeersUpdateManager(nil) tested := NewTimeBasedAuthSecretsManager(peersManager, &TURNConfig{ CredentialsTTL: ttl, @@ -44,7 +44,7 @@ func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) { func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) { ttl := util.Duration{Duration: 2 * time.Second} secret := "some_secret" - peersManager := NewPeersUpdateManager() + peersManager := NewPeersUpdateManager(nil) peer := "some_peer" updateChannel := peersManager.CreateChannel(peer) @@ -93,7 +93,7 @@ loop: func TestTimeBasedAuthSecretsManager_CancelRefresh(t *testing.T) { ttl := util.Duration{Duration: time.Hour} secret := "some_secret" - peersManager := NewPeersUpdateManager() + peersManager := NewPeersUpdateManager(nil) peer := "some_peer" tested := NewTimeBasedAuthSecretsManager(peersManager, &TURNConfig{ diff --git a/management/server/updatechannel.go b/management/server/updatechannel.go index 5e6bcbb1c..0f14497fd 100644 --- a/management/server/updatechannel.go +++ b/management/server/updatechannel.go @@ -2,10 +2,12 @@ package server import ( "sync" + "time" log "github.com/sirupsen/logrus" "github.com/netbirdio/netbird/management/proto" + "github.com/netbirdio/netbird/management/server/telemetry" ) const channelBufferSize = 100 @@ -17,26 +19,41 @@ type UpdateMessage struct { type PeersUpdateManager struct { // peerChannels is an update channel indexed by Peer.ID peerChannels map[string]chan *UpdateMessage - channelsMux *sync.Mutex + // channelsMux keeps the mutex to access peerChannels + channelsMux *sync.Mutex + // metrics provides method to collect application metrics + metrics telemetry.AppMetrics } // NewPeersUpdateManager returns a new instance of PeersUpdateManager -func NewPeersUpdateManager() *PeersUpdateManager { +func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager { return &PeersUpdateManager{ peerChannels: make(map[string]chan *UpdateMessage), channelsMux: &sync.Mutex{}, + metrics: metrics, } } // SendUpdate sends update message to the peer's channel func (p *PeersUpdateManager) SendUpdate(peerID string, update *UpdateMessage) { + start := time.Now() + var found, dropped bool + p.channelsMux.Lock() - defer p.channelsMux.Unlock() + defer func() { + p.channelsMux.Unlock() + if p.metrics != nil { + p.metrics.UpdateChannelMetrics().CountSendUpdateDuration(time.Since(start), found, dropped) + } + }() + if channel, ok := p.peerChannels[peerID]; ok { + found = true select { case channel <- update: log.Debugf("update was sent to channel for peer %s", peerID) default: + dropped = true log.Warnf("channel for peer %s is %d full", peerID, len(channel)) } } else { @@ -46,10 +63,20 @@ func (p *PeersUpdateManager) SendUpdate(peerID string, update *UpdateMessage) { // CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer. func (p *PeersUpdateManager) CreateChannel(peerID string) chan *UpdateMessage { + start := time.Now() + + closed := false + p.channelsMux.Lock() - defer p.channelsMux.Unlock() + defer func() { + p.channelsMux.Unlock() + if p.metrics != nil { + p.metrics.UpdateChannelMetrics().CountCreateChannelDuration(time.Since(start), closed) + } + }() if channel, ok := p.peerChannels[peerID]; ok { + closed = true delete(p.peerChannels, peerID) close(channel) } @@ -58,6 +85,7 @@ func (p *PeersUpdateManager) CreateChannel(peerID string) chan *UpdateMessage { p.peerChannels[peerID] = channel log.Debugf("opened updates channel for a peer %s", peerID) + return channel } @@ -72,8 +100,16 @@ func (p *PeersUpdateManager) closeChannel(peerID string) { // CloseChannels closes updates channel for each given peer func (p *PeersUpdateManager) CloseChannels(peerIDs []string) { + start := time.Now() + p.channelsMux.Lock() - defer p.channelsMux.Unlock() + defer func() { + p.channelsMux.Unlock() + if p.metrics != nil { + p.metrics.UpdateChannelMetrics().CountCloseChannelsDuration(time.Since(start), len(peerIDs)) + } + }() + for _, id := range peerIDs { p.closeChannel(id) } @@ -81,18 +117,37 @@ func (p *PeersUpdateManager) CloseChannels(peerIDs []string) { // CloseChannel closes updates channel of a given peer func (p *PeersUpdateManager) CloseChannel(peerID string) { + start := time.Now() + p.channelsMux.Lock() - defer p.channelsMux.Unlock() + defer func() { + p.channelsMux.Unlock() + if p.metrics != nil { + p.metrics.UpdateChannelMetrics().CountCloseChannelDuration(time.Since(start)) + } + }() + p.closeChannel(peerID) } // GetAllConnectedPeers returns a copy of the connected peers map func (p *PeersUpdateManager) GetAllConnectedPeers() map[string]struct{} { + start := time.Now() + p.channelsMux.Lock() - defer p.channelsMux.Unlock() + m := make(map[string]struct{}) + + defer func() { + p.channelsMux.Unlock() + if p.metrics != nil { + p.metrics.UpdateChannelMetrics().CountGetAllConnectedPeersDuration(time.Since(start), len(m)) + } + }() + for ID := range p.peerChannels { m[ID] = struct{}{} } + return m } diff --git a/management/server/updatechannel_test.go b/management/server/updatechannel_test.go index 6cfb4d52f..187e404c5 100644 --- a/management/server/updatechannel_test.go +++ b/management/server/updatechannel_test.go @@ -1,16 +1,17 @@ package server import ( - "github.com/netbirdio/netbird/management/proto" "testing" "time" + + "github.com/netbirdio/netbird/management/proto" ) //var peersUpdater *PeersUpdateManager func TestCreateChannel(t *testing.T) { peer := "test-create" - peersUpdater := NewPeersUpdateManager() + peersUpdater := NewPeersUpdateManager(nil) defer peersUpdater.CloseChannel(peer) _ = peersUpdater.CreateChannel(peer) @@ -21,7 +22,7 @@ func TestCreateChannel(t *testing.T) { func TestSendUpdate(t *testing.T) { peer := "test-sendupdate" - peersUpdater := NewPeersUpdateManager() + peersUpdater := NewPeersUpdateManager(nil) update1 := &UpdateMessage{Update: &proto.SyncResponse{ NetworkMap: &proto.NetworkMap{ Serial: 0, @@ -65,7 +66,7 @@ func TestSendUpdate(t *testing.T) { func TestCloseChannel(t *testing.T) { peer := "test-close" - peersUpdater := NewPeersUpdateManager() + peersUpdater := NewPeersUpdateManager(nil) _ = peersUpdater.CreateChannel(peer) if _, ok := peersUpdater.peerChannels[peer]; !ok { t.Error("Error creating the channel")