Compare commits

...

2 Commits

Author SHA1 Message Date
Pascal Fischer
ca46fe215a use single latest message buf 2025-10-08 17:23:28 +02:00
Pascal Fischer
e5f926fa6d remove additional network map object in update message 2025-10-08 17:11:25 +02:00
4 changed files with 122 additions and 70 deletions

View File

@@ -86,7 +86,7 @@ func NewServer(
if appMetrics != nil { if appMetrics != nil {
// update gauge based on number of connected peers which is equal to open gRPC streams // update gauge based on number of connected peers which is equal to open gRPC streams
err = appMetrics.GRPCMetrics().RegisterConnectedStreams(func() int64 { err = appMetrics.GRPCMetrics().RegisterConnectedStreams(func() int64 {
return int64(len(peersUpdateManager.peerChannels)) return int64(peersUpdateManager.GetChannelCount())
}) })
if err != nil { if err != nil {
return nil, err return nil, err

View File

@@ -1270,12 +1270,10 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting, maps.Keys(peerGroups), dnsFwdPort) update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting, maps.Keys(peerGroups), dnsFwdPort)
am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start)) am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start))
am.peersUpdateManager.SendUpdate(ctx, p.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap}) am.peersUpdateManager.SendUpdate(ctx, p.ID, &UpdateMessage{Update: update})
}(peer) }(peer)
} }
//
wg.Wait() wg.Wait()
if am.metrics != nil { if am.metrics != nil {
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart)) am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart))
@@ -1381,7 +1379,7 @@ func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountI
dnsFwdPort := computeForwarderPort(maps.Values(account.Peers), dnsForwarderPortMinVersion) dnsFwdPort := computeForwarderPort(maps.Values(account.Peers), dnsForwarderPortMinVersion)
update := toSyncResponse(ctx, nil, peer, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSettings, maps.Keys(peerGroups), dnsFwdPort) update := toSyncResponse(ctx, nil, peer, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSettings, maps.Keys(peerGroups), dnsFwdPort)
am.peersUpdateManager.SendUpdate(ctx, peer.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap}) am.peersUpdateManager.SendUpdate(ctx, peer.ID, &UpdateMessage{Update: update})
} }
// getNextPeerExpiration returns the minimum duration in which the next peer of the account will expire if it was found. // getNextPeerExpiration returns the minimum duration in which the next peer of the account will expire if it was found.
@@ -1603,7 +1601,6 @@ func deletePeers(ctx context.Context, am *DefaultAccountManager, transaction sto
}, },
}, },
}, },
NetworkMap: &types.NetworkMap{},
}) })
am.peersUpdateManager.CloseChannel(ctx, peer.ID) am.peersUpdateManager.CloseChannel(ctx, peer.ID)
peerDeletedEvents = append(peerDeletedEvents, func() { peerDeletedEvents = append(peerDeletedEvents, func() {

View File

@@ -1043,8 +1043,8 @@ func TestUpdateAccountPeers(t *testing.T) {
for _, channel := range peerChannels { for _, channel := range peerChannels {
update := <-channel update := <-channel
assert.Nil(t, update.Update.NetbirdConfig) assert.Nil(t, update.Update.NetbirdConfig)
assert.Equal(t, tc.peers, len(update.NetworkMap.Peers)) // assert.Equal(t, tc.peers, len(update.NetworkMap.Peers))
assert.Equal(t, tc.peers*2, len(update.NetworkMap.FirewallRules)) // assert.Equal(t, tc.peers*2, len(update.NetworkMap.FirewallRules))
} }
}) })
} }

View File

@@ -7,23 +7,25 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/management/server/telemetry" "github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/types" "github.com/netbirdio/netbird/shared/management/proto"
) )
const channelBufferSize = 100
type UpdateMessage struct { type UpdateMessage struct {
Update *proto.SyncResponse Update *proto.SyncResponse
NetworkMap *types.NetworkMap }
type peerUpdate struct {
mu sync.Mutex
message *UpdateMessage
notify chan struct{}
} }
type PeersUpdateManager struct { type PeersUpdateManager struct {
// peerChannels is an update channel indexed by Peer.ID // latestUpdates stores the latest update message per peer
peerChannels map[string]chan *UpdateMessage latestUpdates sync.Map // map[string]*peerUpdate
// channelsMux keeps the mutex to access peerChannels // activePeers tracks which peers have active sender goroutines
channelsMux *sync.RWMutex activePeers sync.Map // map[string]struct{}
// metrics provides method to collect application metrics // metrics provides method to collect application metrics
metrics telemetry.AppMetrics metrics telemetry.AppMetrics
} }
@@ -31,87 +33,137 @@ type PeersUpdateManager struct {
// NewPeersUpdateManager returns a new instance of PeersUpdateManager // NewPeersUpdateManager returns a new instance of PeersUpdateManager
func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager { func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager {
return &PeersUpdateManager{ return &PeersUpdateManager{
peerChannels: make(map[string]chan *UpdateMessage), metrics: metrics,
channelsMux: &sync.RWMutex{},
metrics: metrics,
} }
} }
// SendUpdate sends update message to the peer's channel // SendUpdate stores the latest update message for a peer and notifies the sender goroutine
func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, update *UpdateMessage) { func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, update *UpdateMessage) {
start := time.Now() start := time.Now()
var found, dropped bool var found, dropped bool
p.channelsMux.RLock()
defer func() { defer func() {
p.channelsMux.RUnlock()
if p.metrics != nil { if p.metrics != nil {
p.metrics.UpdateChannelMetrics().CountSendUpdateDuration(time.Since(start), found, dropped) p.metrics.UpdateChannelMetrics().CountSendUpdateDuration(time.Since(start), found, dropped)
} }
}() }()
if channel, ok := p.peerChannels[peerID]; ok { // Check if peer has an active sender goroutine
found = true if _, ok := p.activePeers.Load(peerID); !ok {
select { log.WithContext(ctx).Debugf("peer %s has no active sender", peerID)
case channel <- update: return
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID) }
default:
dropped = true found = true
log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(channel))
} // Load or create peerUpdate entry
} else { val, _ := p.latestUpdates.LoadOrStore(peerID, &peerUpdate{
log.WithContext(ctx).Debugf("peer %s has no channel", peerID) notify: make(chan struct{}, 1),
})
pu := val.(*peerUpdate)
// Store the latest message (overwrites any previous unsent message)
pu.mu.Lock()
pu.message = update
pu.mu.Unlock()
// Non-blocking notification
select {
case pu.notify <- struct{}{}:
log.WithContext(ctx).Debugf("update notification sent for peer %s", peerID)
default:
// Already notified, sender will pick up the latest message anyway
log.WithContext(ctx).Tracef("peer %s already notified, update will be picked up", peerID)
} }
} }
// CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer. // CreateChannel creates a sender goroutine for a given peer and returns a channel to receive updates
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) chan *UpdateMessage { func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) chan *UpdateMessage {
start := time.Now() start := time.Now()
closed := false closed := false
p.channelsMux.Lock()
defer func() { defer func() {
p.channelsMux.Unlock()
if p.metrics != nil { if p.metrics != nil {
p.metrics.UpdateChannelMetrics().CountCreateChannelDuration(time.Since(start), closed) p.metrics.UpdateChannelMetrics().CountCreateChannelDuration(time.Since(start), closed)
} }
}() }()
if channel, ok := p.peerChannels[peerID]; ok { // Close existing sender if any
if _, exists := p.activePeers.LoadOrStore(peerID, struct{}{}); exists {
closed = true closed = true
delete(p.peerChannels, peerID) p.closeChannel(ctx, peerID)
close(channel)
} }
// mbragin: todo shouldn't it be more? or configurable?
channel := make(chan *UpdateMessage, channelBufferSize)
p.peerChannels[peerID] = channel
log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID) // Create peerUpdate entry with notification channel
pu := &peerUpdate{
notify: make(chan struct{}, 1),
}
p.latestUpdates.Store(peerID, pu)
return channel // Create output channel for consumer
outChan := make(chan *UpdateMessage, 1)
// Start sender goroutine
go func() {
defer close(outChan)
for {
select {
case <-ctx.Done():
log.WithContext(ctx).Debugf("sender goroutine for peer %s stopped due to context cancellation", peerID)
return
case <-pu.notify:
// Check if still active
if _, ok := p.activePeers.Load(peerID); !ok {
log.WithContext(ctx).Debugf("sender goroutine for peer %s stopped", peerID)
return
}
// Get the latest message with mutex protection
pu.mu.Lock()
msg := pu.message
pu.message = nil // Clear after reading
pu.mu.Unlock()
if msg != nil {
select {
case outChan <- msg:
log.WithContext(ctx).Tracef("sent update to peer %s", peerID)
case <-ctx.Done():
return
}
}
}
}
}()
log.WithContext(ctx).Debugf("created sender goroutine for peer %s", peerID)
return outChan
} }
func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) { func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) {
if channel, ok := p.peerChannels[peerID]; ok { // Mark peer as inactive to stop the sender goroutine
delete(p.peerChannels, peerID) if _, ok := p.activePeers.LoadAndDelete(peerID); ok {
close(channel) // Close notification channel
if val, ok := p.latestUpdates.Load(peerID); ok {
log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID) pu := val.(*peerUpdate)
close(pu.notify)
}
p.latestUpdates.Delete(peerID)
log.WithContext(ctx).Debugf("closed sender for peer %s", peerID)
return return
} }
log.WithContext(ctx).Debugf("closing updates channel: peer %s has no channel", peerID) log.WithContext(ctx).Debugf("closing sender: peer %s has no active sender", peerID)
} }
// CloseChannels closes updates channel for each given peer // CloseChannels closes sender goroutines for each given peer
func (p *PeersUpdateManager) CloseChannels(ctx context.Context, peerIDs []string) { func (p *PeersUpdateManager) CloseChannels(ctx context.Context, peerIDs []string) {
start := time.Now() start := time.Now()
p.channelsMux.Lock()
defer func() { defer func() {
p.channelsMux.Unlock()
if p.metrics != nil { if p.metrics != nil {
p.metrics.UpdateChannelMetrics().CountCloseChannelsDuration(time.Since(start), len(peerIDs)) p.metrics.UpdateChannelMetrics().CountCloseChannelsDuration(time.Since(start), len(peerIDs))
} }
@@ -122,13 +174,11 @@ func (p *PeersUpdateManager) CloseChannels(ctx context.Context, peerIDs []string
} }
} }
// CloseChannel closes updates channel of a given peer // CloseChannel closes the sender goroutine of a given peer
func (p *PeersUpdateManager) CloseChannel(ctx context.Context, peerID string) { func (p *PeersUpdateManager) CloseChannel(ctx context.Context, peerID string) {
start := time.Now() start := time.Now()
p.channelsMux.Lock()
defer func() { defer func() {
p.channelsMux.Unlock()
if p.metrics != nil { if p.metrics != nil {
p.metrics.UpdateChannelMetrics().CountCloseChannelDuration(time.Since(start)) p.metrics.UpdateChannelMetrics().CountCloseChannelDuration(time.Since(start))
} }
@@ -141,38 +191,43 @@ func (p *PeersUpdateManager) CloseChannel(ctx context.Context, peerID string) {
func (p *PeersUpdateManager) GetAllConnectedPeers() map[string]struct{} { func (p *PeersUpdateManager) GetAllConnectedPeers() map[string]struct{} {
start := time.Now() start := time.Now()
p.channelsMux.RLock()
m := make(map[string]struct{}) m := make(map[string]struct{})
defer func() { defer func() {
p.channelsMux.RUnlock()
if p.metrics != nil { if p.metrics != nil {
p.metrics.UpdateChannelMetrics().CountGetAllConnectedPeersDuration(time.Since(start), len(m)) p.metrics.UpdateChannelMetrics().CountGetAllConnectedPeersDuration(time.Since(start), len(m))
} }
}() }()
for ID := range p.peerChannels { p.activePeers.Range(func(key, value interface{}) bool {
m[ID] = struct{}{} m[key.(string)] = struct{}{}
} return true
})
return m return m
} }
// HasChannel returns true if peers has channel in update manager, otherwise false // HasChannel returns true if peer has an active sender goroutine, otherwise false
func (p *PeersUpdateManager) HasChannel(peerID string) bool { func (p *PeersUpdateManager) HasChannel(peerID string) bool {
start := time.Now() start := time.Now()
p.channelsMux.RLock()
defer func() { defer func() {
p.channelsMux.RUnlock()
if p.metrics != nil { if p.metrics != nil {
p.metrics.UpdateChannelMetrics().CountHasChannelDuration(time.Since(start)) p.metrics.UpdateChannelMetrics().CountHasChannelDuration(time.Since(start))
} }
}() }()
_, ok := p.peerChannels[peerID] _, ok := p.activePeers.Load(peerID)
return ok return ok
} }
// GetChannelCount returns the number of active peer channels
func (p *PeersUpdateManager) GetChannelCount() int {
count := 0
p.activePeers.Range(func(key, value interface{}) bool {
count++
return true
})
return count
}