diff --git a/management/server/auth/manager_test.go b/management/server/auth/manager_test.go index c8015eb37..81127bff1 100644 --- a/management/server/auth/manager_test.go +++ b/management/server/auth/manager_test.go @@ -231,7 +231,7 @@ func TestAuthManager_ValidateAndParseToken(t *testing.T) { return fmt.Sprintf("%s/%s", audience, name) } - lastLogin := time.Date(2025, 2, 12, 14, 25, 26, 0, time.UTC) //"2025-02-12T14:25:26.186Z" + lastLogin := time.Date(2025, 2, 12, 14, 25, 26, 0, time.UTC) // "2025-02-12T14:25:26.186Z" tests := []struct { name string diff --git a/management/server/grpcserver.go b/management/server/grpcserver.go index 12b59b691..6af911e9d 100644 --- a/management/server/grpcserver.go +++ b/management/server/grpcserver.go @@ -86,7 +86,7 @@ func NewServer( if appMetrics != nil { // update gauge based on number of connected peers which is equal to open gRPC streams err = appMetrics.GRPCMetrics().RegisterConnectedStreams(func() int64 { - return int64(len(peersUpdateManager.peerChannels)) + return int64(peersUpdateManager.GetChannelCount()) }) if err != nil { return nil, err @@ -241,14 +241,34 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi } // handleUpdates sends updates to the connected peer until the updates channel is closed. 
-func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *UpdateMessage, srv proto.ManagementService_SyncServer) error { +func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates *UpdateChannel, srv proto.ManagementService_SyncServer) error { log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String()) + + // Channel to receive network map updates + networkMapCh := make(chan *UpdateMessage) + + // Start goroutine to Pop from buffer + go func() { + for { + update, ok := updates.NetworkMap.Pop(ctx) + if !ok { + close(networkMapCh) + return + } + select { + case networkMapCh <- update: + case <-ctx.Done(): + return + } + } + }() + for { select { // condition when there are some updates - case update, open := <-updates: + case update, open := <-updates.Important: if s.appMetrics != nil { - s.appMetrics.GRPCMetrics().UpdateChannelQueueLength(len(updates) + 1) + s.appMetrics.GRPCMetrics().UpdateChannelQueueLength(len(updates.Important) + 1) } if !open { @@ -263,6 +283,17 @@ func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKe return err } + case update, ok := <-networkMapCh: + if !ok { + log.WithContext(ctx).Debugf("update buffer for peer %s closed", peerKey.String()) + s.cancelPeerRoutines(ctx, accountID, peer) + return nil + } + log.WithContext(ctx).Debugf("sending latest update to peer %s", peerKey.String()) + if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil { + return err + } + // condition when client <-> server connection has been terminated case <-srv.Context().Done(): // happens when connection drops, e.g. 
client disconnects diff --git a/management/server/peer.go b/management/server/peer.go index 2c96c10b5..029cb2316 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -1335,7 +1335,7 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting, maps.Keys(peerGroups), dnsFwdPort) am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start)) - am.peersUpdateManager.SendUpdate(ctx, p.ID, &UpdateMessage{Update: update}) + am.peersUpdateManager.SendNetworkMapUpdate(ctx, p.ID, &UpdateMessage{Update: update}) }(peer) } @@ -1452,7 +1452,7 @@ func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountI dnsFwdPort := computeForwarderPort(maps.Values(account.Peers), dnsForwarderPortMinVersion) update := toSyncResponse(ctx, nil, peer, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSettings, maps.Keys(peerGroups), dnsFwdPort) - am.peersUpdateManager.SendUpdate(ctx, peer.ID, &UpdateMessage{Update: update}) + am.peersUpdateManager.SendNetworkMapUpdate(ctx, peer.ID, &UpdateMessage{Update: update}) } // getNextPeerExpiration returns the minimum duration in which the next peer of the account will expire if it was found. 
@@ -1659,7 +1659,7 @@ func deletePeers(ctx context.Context, am *DefaultAccountManager, transaction sto return nil, err } - am.peersUpdateManager.SendUpdate(ctx, peer.ID, &UpdateMessage{ + am.peersUpdateManager.SendImportantUpdate(ctx, peer.ID, &UpdateMessage{ Update: &proto.SyncResponse{ RemotePeers: []*proto.RemotePeerConfig{}, RemotePeersIsEmpty: true, diff --git a/management/server/telemetry/updatechannel_metrics.go b/management/server/telemetry/updatechannel_metrics.go index 2b280b352..2840bd338 100644 --- a/management/server/telemetry/updatechannel_metrics.go +++ b/management/server/telemetry/updatechannel_metrics.go @@ -22,6 +22,9 @@ type UpdateChannelMetrics struct { calcPeerNetworkMapDurationMs metric.Int64Histogram mergeNetworkMapDurationMicro metric.Int64Histogram toSyncResponseDurationMicro metric.Int64Histogram + bufferPushCounter metric.Int64Counter + bufferOverwriteCounter metric.Int64Counter + bufferIgnoreCounter metric.Int64Counter ctx context.Context } @@ -125,6 +128,27 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh return nil, err } + bufferPushCounter, err := meter.Int64Counter("management.updatechannel.buffer.push.counter", + metric.WithUnit("1"), + metric.WithDescription("Number of updates pushed to an empty buffer")) + if err != nil { + return nil, err + } + + bufferOverwriteCounter, err := meter.Int64Counter("management.updatechannel.buffer.overwrite.counter", + metric.WithUnit("1"), + metric.WithDescription("Number of updates overwriting old unsent updates in the buffer")) + if err != nil { + return nil, err + } + + bufferIgnoreCounter, err := meter.Int64Counter("management.updatechannel.buffer.ignore.counter", + metric.WithUnit("1"), + metric.WithDescription("Number of updates being ignored due to old network serial")) + if err != nil { + return nil, err + } + return &UpdateChannelMetrics{ createChannelDurationMicro: createChannelDurationMicro, closeChannelDurationMicro: closeChannelDurationMicro, @@ 
-138,6 +162,9 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh calcPeerNetworkMapDurationMs: calcPeerNetworkMapDurationMs, mergeNetworkMapDurationMicro: mergeNetworkMapDurationMicro, toSyncResponseDurationMicro: toSyncResponseDurationMicro, + bufferPushCounter: bufferPushCounter, + bufferOverwriteCounter: bufferOverwriteCounter, + bufferIgnoreCounter: bufferIgnoreCounter, ctx: ctx, }, nil } @@ -193,3 +220,18 @@ func (metrics *UpdateChannelMetrics) CountMergeNetworkMapDuration(duration time. func (metrics *UpdateChannelMetrics) CountToSyncResponseDuration(duration time.Duration) { metrics.toSyncResponseDurationMicro.Record(metrics.ctx, duration.Microseconds()) } + +// CountBufferPush counts how many buffer push operations are happening on an empty buffer +func (metrics *UpdateChannelMetrics) CountBufferPush() { + metrics.bufferPushCounter.Add(metrics.ctx, 1) +} + +// CountBufferOverwrite counts how many buffer overwrite operations are happening on a non-empty buffer +func (metrics *UpdateChannelMetrics) CountBufferOverwrite() { + metrics.bufferOverwriteCounter.Add(metrics.ctx, 1) +} + +// CountBufferIgnore counts how many buffer ignore operations are happening when a new update is pushed +func (metrics *UpdateChannelMetrics) CountBufferIgnore() { + metrics.bufferIgnoreCounter.Add(metrics.ctx, 1) +} diff --git a/management/server/token_mgr.go b/management/server/token_mgr.go index f9293e7a8..e203c7d4c 100644 --- a/management/server/token_mgr.go +++ b/management/server/token_mgr.go @@ -227,7 +227,7 @@ func (m *TimeBasedAuthSecretsManager) pushNewTURNAndRelayTokens(ctx context.Cont m.extendNetbirdConfig(ctx, peerID, accountID, update) log.WithContext(ctx).Debugf("sending new TURN credentials to peer %s", peerID) - m.updateManager.SendUpdate(ctx, peerID, &UpdateMessage{Update: update}) + m.updateManager.SendImportantUpdate(ctx, peerID, &UpdateMessage{Update: update}) } func (m *TimeBasedAuthSecretsManager) pushNewRelayTokens(ctx 
context.Context, accountID, peerID string) {
@@ -251,7 +251,7 @@ func (m *TimeBasedAuthSecretsManager) pushNewRelayTokens(ctx context.Context, ac
 	m.extendNetbirdConfig(ctx, peerID, accountID, update)
 
 	log.WithContext(ctx).Debugf("sending new relay credentials to peer %s", peerID)
-	m.updateManager.SendUpdate(ctx, peerID, &UpdateMessage{Update: update})
+	m.updateManager.SendImportantUpdate(ctx, peerID, &UpdateMessage{Update: update})
 }
 
 func (m *TimeBasedAuthSecretsManager) extendNetbirdConfig(ctx context.Context, peerID, accountID string, update *proto.SyncResponse) {
diff --git a/management/server/update_buffer.go b/management/server/update_buffer.go
new file mode 100644
index 000000000..d31be56b0
--- /dev/null
+++ b/management/server/update_buffer.go
@@ -0,0 +1,86 @@
+package server
+
+import (
+	"context"
+	"sync"
+
+	"github.com/netbirdio/netbird/management/server/telemetry"
+)
+
+// UpdateBuffer buffers at most one pending network map update for a peer;
+// a newer update (higher serial) replaces an older unsent one.
+type UpdateBuffer struct {
+	mu      sync.Mutex
+	cond    *sync.Cond
+	update  *UpdateMessage
+	closed  bool
+	metrics *telemetry.UpdateChannelMetrics
+}
+
+// NewUpdateBuffer returns an empty, open buffer.
+func NewUpdateBuffer(metrics *telemetry.UpdateChannelMetrics) *UpdateBuffer {
+	ub := &UpdateBuffer{metrics: metrics}
+	ub.cond = sync.NewCond(&ub.mu)
+	return ub
+}
+
+// Push stores update as the pending update and wakes one waiting Pop.
+// An update whose serial is not newer than the buffered one is dropped,
+// unless the buffered serial is 0.
+func (b *UpdateBuffer) Push(update *UpdateMessage) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	if b.update == nil || update.Update.NetworkMap.Serial > b.update.Update.NetworkMap.Serial || b.update.Update.NetworkMap.Serial == 0 {
+		// Record whether we fill an empty slot or overwrite an unsent
+		// update. This must be captured BEFORE the assignment below —
+		// checking b.update == nil afterwards is always false, which
+		// made CountBufferPush unreachable.
+		wasEmpty := b.update == nil
+		b.update = update
+		b.cond.Signal()
+		if wasEmpty {
+			b.metrics.CountBufferPush()
+			return
+		}
+
+		b.metrics.CountBufferOverwrite()
+		return
+	}
+
+	b.metrics.CountBufferIgnore()
+}
+
+// Pop blocks until an update is available, the buffer is closed, or ctx is
+// cancelled; it returns (nil, false) on close or cancellation.
+func (b *UpdateBuffer) Pop(ctx context.Context) (*UpdateMessage, bool) {
+	b.mu.Lock()
+	defer b.mu.Unlock()
+
+	// The ctx.Err() guard stops the loop once ctx is cancelled; without it
+	// the watcher goroutine's Broadcast would wake Wait, the loop would
+	// re-enter, spawn a new watcher that immediately fires again, and Pop
+	// would busy-spin until Close.
+	for b.update == nil && !b.closed && ctx.Err() == nil {
+		waitCh := make(chan struct{})
+		go func() {
+			select {
+			case <-ctx.Done():
+				b.cond.Broadcast()
+			case <-waitCh:
+				// noop
+			}
+		}()
+		b.cond.Wait()
+		close(waitCh)
+	}
+
+	if b.closed || b.update == nil {
+		return nil, false
+	}
+
+	msg := b.update
+	b.update = nil
+	return msg, true
+}
+
+// Close marks the buffer closed and wakes all waiting Pop calls.
+func (b *UpdateBuffer) Close() {
+	b.mu.Lock()
+	b.closed = true
+	b.cond.Broadcast()
+	b.mu.Unlock()
+}
diff --git a/management/server/updatechannel.go b/management/server/updatechannel.go
index adf64592a..ad46e022c 100644
--- a/management/server/updatechannel.go
+++ b/management/server/updatechannel.go
@@ -13,13 +13,33 @@ import (
 
 const channelBufferSize = 100
 
+type UpdateChannel struct {
+	Important  chan *UpdateMessage
+	NetworkMap *UpdateBuffer
+}
+
+func NewUpdateChannel(metrics *telemetry.UpdateChannelMetrics) *UpdateChannel {
+	channel := make(chan *UpdateMessage, channelBufferSize)
+	buffer := NewUpdateBuffer(metrics)
+
+	return &UpdateChannel{
+		Important:  channel,
+		NetworkMap: buffer,
+	}
+}
+
+func (u *UpdateChannel) Close() {
+	close(u.Important)
+	u.NetworkMap.Close()
+}
+
 type UpdateMessage struct {
 	Update *proto.SyncResponse
 }
 
 type PeersUpdateManager struct {
 	// peerChannels is an update channel indexed by Peer.ID
-	peerChannels map[string]chan *UpdateMessage
+	peerChannels map[string]*UpdateChannel
 	// channelsMux keeps the mutex to access peerChannels
 	channelsMux *sync.RWMutex
 	// metrics provides method to collect application metrics
@@ -29,14 +49,14 @@ type PeersUpdateManager struct {
 // NewPeersUpdateManager returns a new instance of PeersUpdateManager
 func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager {
 	return &PeersUpdateManager{
-		peerChannels: make(map[string]chan *UpdateMessage),
+		peerChannels: make(map[string]*UpdateChannel),
 		channelsMux:  &sync.RWMutex{},
 		metrics:      metrics,
 	}
 }
 
-// SendUpdate sends update message to the peer's channel
-func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, update *UpdateMessage) {
+// SendImportantUpdate sends update message to the peer that needs to be received
+func (p *PeersUpdateManager) SendImportantUpdate(ctx context.Context, peerID string, update *UpdateMessage) {
 	start := time.Now()
 	var found,
dropped bool @@ -52,19 +72,41 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda if channel, ok := p.peerChannels[peerID]; ok { found = true select { - case channel <- update: - log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID) + case channel.Important <- update: + log.WithContext(ctx).Debugf("update was sent to important channel for peer %s", peerID) default: dropped = true - log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(channel)) + log.WithContext(ctx).Warnf("important channel for peer %s is %d full or closed", peerID, len(channel.Important)) } } else { - log.WithContext(ctx).Debugf("peer %s has no channel", peerID) + log.WithContext(ctx).Debugf("peer %s has no important channel", peerID) + } +} + +// SendNetworkMapUpdate sends a network map update to the peer's channel +func (p *PeersUpdateManager) SendNetworkMapUpdate(ctx context.Context, peerID string, update *UpdateMessage) { + start := time.Now() + var found, dropped bool + + p.channelsMux.RLock() + + defer func() { + p.channelsMux.RUnlock() + if p.metrics != nil { + p.metrics.UpdateChannelMetrics().CountSendUpdateDuration(time.Since(start), found, dropped) + } + }() + + if channel, ok := p.peerChannels[peerID]; ok { + found = true + channel.NetworkMap.Push(update) + } else { + log.WithContext(ctx).Debugf("peer %s has no networkmap buffer", peerID) } } // CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer. 
-func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) chan *UpdateMessage { +func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) *UpdateChannel { start := time.Now() closed := false @@ -80,21 +122,21 @@ func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) c if channel, ok := p.peerChannels[peerID]; ok { closed = true delete(p.peerChannels, peerID) - close(channel) + channel.Close() } - // mbragin: todo shouldn't it be more? or configurable? - channel := make(chan *UpdateMessage, channelBufferSize) - p.peerChannels[peerID] = channel + + newChannel := NewUpdateChannel(p.metrics.UpdateChannelMetrics()) + p.peerChannels[peerID] = newChannel log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID) - return channel + return newChannel } func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) { if channel, ok := p.peerChannels[peerID]; ok { delete(p.peerChannels, peerID) - close(channel) + channel.Close() log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID) return @@ -174,3 +216,10 @@ func (p *PeersUpdateManager) HasChannel(peerID string) bool { return ok } + +// GetChannelCount returns the number of active peer channels +func (p *PeersUpdateManager) GetChannelCount() int { + p.channelsMux.RLock() + defer p.channelsMux.RUnlock() + return len(p.peerChannels) +} diff --git a/management/server/updatechannel_test.go b/management/server/updatechannel_test.go index 0dc86563d..5e43070ff 100644 --- a/management/server/updatechannel_test.go +++ b/management/server/updatechannel_test.go @@ -33,15 +33,15 @@ func TestSendUpdate(t *testing.T) { if _, ok := peersUpdater.peerChannels[peer]; !ok { t.Error("Error creating the channel") } - peersUpdater.SendUpdate(context.Background(), peer, update1) + peersUpdater.SendImportantUpdate(context.Background(), peer, update1) select { - case <-peersUpdater.peerChannels[peer]: + case 
<-peersUpdater.peerChannels[peer].Important: default: t.Error("Update wasn't send") } for range [channelBufferSize]int{} { - peersUpdater.SendUpdate(context.Background(), peer, update1) + peersUpdater.SendImportantUpdate(context.Background(), peer, update1) } update2 := &UpdateMessage{Update: &proto.SyncResponse{ @@ -50,13 +50,13 @@ func TestSendUpdate(t *testing.T) { }, }} - peersUpdater.SendUpdate(context.Background(), peer, update2) + peersUpdater.SendImportantUpdate(context.Background(), peer, update2) timeout := time.After(5 * time.Second) for range [channelBufferSize]int{} { select { case <-timeout: t.Error("timed out reading previously sent updates") - case updateReader := <-peersUpdater.peerChannels[peer]: + case updateReader := <-peersUpdater.peerChannels[peer].Important: if updateReader.Update.NetworkMap.Serial == update2.Update.NetworkMap.Serial { t.Error("got the update that shouldn't have been sent") }