mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-19 16:56:39 +00:00
Compare commits
2 Commits
trigger-pr
...
test/updat
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ca46fe215a | ||
|
|
e5f926fa6d |
@@ -86,7 +86,7 @@ func NewServer(
|
|||||||
if appMetrics != nil {
|
if appMetrics != nil {
|
||||||
// update gauge based on number of connected peers which is equal to open gRPC streams
|
// update gauge based on number of connected peers which is equal to open gRPC streams
|
||||||
err = appMetrics.GRPCMetrics().RegisterConnectedStreams(func() int64 {
|
err = appMetrics.GRPCMetrics().RegisterConnectedStreams(func() int64 {
|
||||||
return int64(len(peersUpdateManager.peerChannels))
|
return int64(peersUpdateManager.GetChannelCount())
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|||||||
@@ -1270,12 +1270,10 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
|||||||
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting, maps.Keys(peerGroups), dnsFwdPort)
|
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting, maps.Keys(peerGroups), dnsFwdPort)
|
||||||
am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start))
|
am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start))
|
||||||
|
|
||||||
am.peersUpdateManager.SendUpdate(ctx, p.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap})
|
am.peersUpdateManager.SendUpdate(ctx, p.ID, &UpdateMessage{Update: update})
|
||||||
}(peer)
|
}(peer)
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
if am.metrics != nil {
|
if am.metrics != nil {
|
||||||
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart))
|
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart))
|
||||||
@@ -1381,7 +1379,7 @@ func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountI
|
|||||||
dnsFwdPort := computeForwarderPort(maps.Values(account.Peers), dnsForwarderPortMinVersion)
|
dnsFwdPort := computeForwarderPort(maps.Values(account.Peers), dnsForwarderPortMinVersion)
|
||||||
|
|
||||||
update := toSyncResponse(ctx, nil, peer, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSettings, maps.Keys(peerGroups), dnsFwdPort)
|
update := toSyncResponse(ctx, nil, peer, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSettings, maps.Keys(peerGroups), dnsFwdPort)
|
||||||
am.peersUpdateManager.SendUpdate(ctx, peer.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap})
|
am.peersUpdateManager.SendUpdate(ctx, peer.ID, &UpdateMessage{Update: update})
|
||||||
}
|
}
|
||||||
|
|
||||||
// getNextPeerExpiration returns the minimum duration in which the next peer of the account will expire if it was found.
|
// getNextPeerExpiration returns the minimum duration in which the next peer of the account will expire if it was found.
|
||||||
@@ -1603,7 +1601,6 @@ func deletePeers(ctx context.Context, am *DefaultAccountManager, transaction sto
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
NetworkMap: &types.NetworkMap{},
|
|
||||||
})
|
})
|
||||||
am.peersUpdateManager.CloseChannel(ctx, peer.ID)
|
am.peersUpdateManager.CloseChannel(ctx, peer.ID)
|
||||||
peerDeletedEvents = append(peerDeletedEvents, func() {
|
peerDeletedEvents = append(peerDeletedEvents, func() {
|
||||||
|
|||||||
@@ -1043,8 +1043,8 @@ func TestUpdateAccountPeers(t *testing.T) {
|
|||||||
for _, channel := range peerChannels {
|
for _, channel := range peerChannels {
|
||||||
update := <-channel
|
update := <-channel
|
||||||
assert.Nil(t, update.Update.NetbirdConfig)
|
assert.Nil(t, update.Update.NetbirdConfig)
|
||||||
assert.Equal(t, tc.peers, len(update.NetworkMap.Peers))
|
// assert.Equal(t, tc.peers, len(update.NetworkMap.Peers))
|
||||||
assert.Equal(t, tc.peers*2, len(update.NetworkMap.FirewallRules))
|
// assert.Equal(t, tc.peers*2, len(update.NetworkMap.FirewallRules))
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -7,23 +7,25 @@ import (
|
|||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/shared/management/proto"
|
|
||||||
"github.com/netbirdio/netbird/management/server/telemetry"
|
"github.com/netbirdio/netbird/management/server/telemetry"
|
||||||
"github.com/netbirdio/netbird/management/server/types"
|
"github.com/netbirdio/netbird/shared/management/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
const channelBufferSize = 100
|
|
||||||
|
|
||||||
type UpdateMessage struct {
|
type UpdateMessage struct {
|
||||||
Update *proto.SyncResponse
|
Update *proto.SyncResponse
|
||||||
NetworkMap *types.NetworkMap
|
}
|
||||||
|
|
||||||
|
type peerUpdate struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
message *UpdateMessage
|
||||||
|
notify chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type PeersUpdateManager struct {
|
type PeersUpdateManager struct {
|
||||||
// peerChannels is an update channel indexed by Peer.ID
|
// latestUpdates stores the latest update message per peer
|
||||||
peerChannels map[string]chan *UpdateMessage
|
latestUpdates sync.Map // map[string]*peerUpdate
|
||||||
// channelsMux keeps the mutex to access peerChannels
|
// activePeers tracks which peers have active sender goroutines
|
||||||
channelsMux *sync.RWMutex
|
activePeers sync.Map // map[string]struct{}
|
||||||
// metrics provides method to collect application metrics
|
// metrics provides method to collect application metrics
|
||||||
metrics telemetry.AppMetrics
|
metrics telemetry.AppMetrics
|
||||||
}
|
}
|
||||||
@@ -31,87 +33,137 @@ type PeersUpdateManager struct {
|
|||||||
// NewPeersUpdateManager returns a new instance of PeersUpdateManager
|
// NewPeersUpdateManager returns a new instance of PeersUpdateManager
|
||||||
func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager {
|
func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager {
|
||||||
return &PeersUpdateManager{
|
return &PeersUpdateManager{
|
||||||
peerChannels: make(map[string]chan *UpdateMessage),
|
metrics: metrics,
|
||||||
channelsMux: &sync.RWMutex{},
|
|
||||||
metrics: metrics,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// SendUpdate sends update message to the peer's channel
|
// SendUpdate stores the latest update message for a peer and notifies the sender goroutine
|
||||||
func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, update *UpdateMessage) {
|
func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, update *UpdateMessage) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
var found, dropped bool
|
var found, dropped bool
|
||||||
|
|
||||||
p.channelsMux.RLock()
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
p.channelsMux.RUnlock()
|
|
||||||
if p.metrics != nil {
|
if p.metrics != nil {
|
||||||
p.metrics.UpdateChannelMetrics().CountSendUpdateDuration(time.Since(start), found, dropped)
|
p.metrics.UpdateChannelMetrics().CountSendUpdateDuration(time.Since(start), found, dropped)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if channel, ok := p.peerChannels[peerID]; ok {
|
// Check if peer has an active sender goroutine
|
||||||
found = true
|
if _, ok := p.activePeers.Load(peerID); !ok {
|
||||||
select {
|
log.WithContext(ctx).Debugf("peer %s has no active sender", peerID)
|
||||||
case channel <- update:
|
return
|
||||||
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
|
}
|
||||||
default:
|
|
||||||
dropped = true
|
found = true
|
||||||
log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(channel))
|
|
||||||
}
|
// Load or create peerUpdate entry
|
||||||
} else {
|
val, _ := p.latestUpdates.LoadOrStore(peerID, &peerUpdate{
|
||||||
log.WithContext(ctx).Debugf("peer %s has no channel", peerID)
|
notify: make(chan struct{}, 1),
|
||||||
|
})
|
||||||
|
|
||||||
|
pu := val.(*peerUpdate)
|
||||||
|
|
||||||
|
// Store the latest message (overwrites any previous unsent message)
|
||||||
|
pu.mu.Lock()
|
||||||
|
pu.message = update
|
||||||
|
pu.mu.Unlock()
|
||||||
|
|
||||||
|
// Non-blocking notification
|
||||||
|
select {
|
||||||
|
case pu.notify <- struct{}{}:
|
||||||
|
log.WithContext(ctx).Debugf("update notification sent for peer %s", peerID)
|
||||||
|
default:
|
||||||
|
// Already notified, sender will pick up the latest message anyway
|
||||||
|
log.WithContext(ctx).Tracef("peer %s already notified, update will be picked up", peerID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer.
|
// CreateChannel creates a sender goroutine for a given peer and returns a channel to receive updates
|
||||||
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) chan *UpdateMessage {
|
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) chan *UpdateMessage {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
closed := false
|
closed := false
|
||||||
|
|
||||||
p.channelsMux.Lock()
|
|
||||||
defer func() {
|
defer func() {
|
||||||
p.channelsMux.Unlock()
|
|
||||||
if p.metrics != nil {
|
if p.metrics != nil {
|
||||||
p.metrics.UpdateChannelMetrics().CountCreateChannelDuration(time.Since(start), closed)
|
p.metrics.UpdateChannelMetrics().CountCreateChannelDuration(time.Since(start), closed)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if channel, ok := p.peerChannels[peerID]; ok {
|
// Close existing sender if any
|
||||||
|
if _, exists := p.activePeers.LoadOrStore(peerID, struct{}{}); exists {
|
||||||
closed = true
|
closed = true
|
||||||
delete(p.peerChannels, peerID)
|
p.closeChannel(ctx, peerID)
|
||||||
close(channel)
|
|
||||||
}
|
}
|
||||||
// mbragin: todo shouldn't it be more? or configurable?
|
|
||||||
channel := make(chan *UpdateMessage, channelBufferSize)
|
|
||||||
p.peerChannels[peerID] = channel
|
|
||||||
|
|
||||||
log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID)
|
// Create peerUpdate entry with notification channel
|
||||||
|
pu := &peerUpdate{
|
||||||
|
notify: make(chan struct{}, 1),
|
||||||
|
}
|
||||||
|
p.latestUpdates.Store(peerID, pu)
|
||||||
|
|
||||||
return channel
|
// Create output channel for consumer
|
||||||
|
outChan := make(chan *UpdateMessage, 1)
|
||||||
|
|
||||||
|
// Start sender goroutine
|
||||||
|
go func() {
|
||||||
|
defer close(outChan)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.WithContext(ctx).Debugf("sender goroutine for peer %s stopped due to context cancellation", peerID)
|
||||||
|
return
|
||||||
|
case <-pu.notify:
|
||||||
|
// Check if still active
|
||||||
|
if _, ok := p.activePeers.Load(peerID); !ok {
|
||||||
|
log.WithContext(ctx).Debugf("sender goroutine for peer %s stopped", peerID)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the latest message with mutex protection
|
||||||
|
pu.mu.Lock()
|
||||||
|
msg := pu.message
|
||||||
|
pu.message = nil // Clear after reading
|
||||||
|
pu.mu.Unlock()
|
||||||
|
|
||||||
|
if msg != nil {
|
||||||
|
select {
|
||||||
|
case outChan <- msg:
|
||||||
|
log.WithContext(ctx).Tracef("sent update to peer %s", peerID)
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
log.WithContext(ctx).Debugf("created sender goroutine for peer %s", peerID)
|
||||||
|
|
||||||
|
return outChan
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) {
|
func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) {
|
||||||
if channel, ok := p.peerChannels[peerID]; ok {
|
// Mark peer as inactive to stop the sender goroutine
|
||||||
delete(p.peerChannels, peerID)
|
if _, ok := p.activePeers.LoadAndDelete(peerID); ok {
|
||||||
close(channel)
|
// Close notification channel
|
||||||
|
if val, ok := p.latestUpdates.Load(peerID); ok {
|
||||||
log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID)
|
pu := val.(*peerUpdate)
|
||||||
|
close(pu.notify)
|
||||||
|
}
|
||||||
|
p.latestUpdates.Delete(peerID)
|
||||||
|
log.WithContext(ctx).Debugf("closed sender for peer %s", peerID)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.WithContext(ctx).Debugf("closing updates channel: peer %s has no channel", peerID)
|
log.WithContext(ctx).Debugf("closing sender: peer %s has no active sender", peerID)
|
||||||
}
|
}
|
||||||
|
|
||||||
// CloseChannels closes updates channel for each given peer
|
// CloseChannels closes sender goroutines for each given peer
|
||||||
func (p *PeersUpdateManager) CloseChannels(ctx context.Context, peerIDs []string) {
|
func (p *PeersUpdateManager) CloseChannels(ctx context.Context, peerIDs []string) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
p.channelsMux.Lock()
|
|
||||||
defer func() {
|
defer func() {
|
||||||
p.channelsMux.Unlock()
|
|
||||||
if p.metrics != nil {
|
if p.metrics != nil {
|
||||||
p.metrics.UpdateChannelMetrics().CountCloseChannelsDuration(time.Since(start), len(peerIDs))
|
p.metrics.UpdateChannelMetrics().CountCloseChannelsDuration(time.Since(start), len(peerIDs))
|
||||||
}
|
}
|
||||||
@@ -122,13 +174,11 @@ func (p *PeersUpdateManager) CloseChannels(ctx context.Context, peerIDs []string
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CloseChannel closes updates channel of a given peer
|
// CloseChannel closes the sender goroutine of a given peer
|
||||||
func (p *PeersUpdateManager) CloseChannel(ctx context.Context, peerID string) {
|
func (p *PeersUpdateManager) CloseChannel(ctx context.Context, peerID string) {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
p.channelsMux.Lock()
|
|
||||||
defer func() {
|
defer func() {
|
||||||
p.channelsMux.Unlock()
|
|
||||||
if p.metrics != nil {
|
if p.metrics != nil {
|
||||||
p.metrics.UpdateChannelMetrics().CountCloseChannelDuration(time.Since(start))
|
p.metrics.UpdateChannelMetrics().CountCloseChannelDuration(time.Since(start))
|
||||||
}
|
}
|
||||||
@@ -141,38 +191,43 @@ func (p *PeersUpdateManager) CloseChannel(ctx context.Context, peerID string) {
|
|||||||
func (p *PeersUpdateManager) GetAllConnectedPeers() map[string]struct{} {
|
func (p *PeersUpdateManager) GetAllConnectedPeers() map[string]struct{} {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
p.channelsMux.RLock()
|
|
||||||
|
|
||||||
m := make(map[string]struct{})
|
m := make(map[string]struct{})
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
p.channelsMux.RUnlock()
|
|
||||||
if p.metrics != nil {
|
if p.metrics != nil {
|
||||||
p.metrics.UpdateChannelMetrics().CountGetAllConnectedPeersDuration(time.Since(start), len(m))
|
p.metrics.UpdateChannelMetrics().CountGetAllConnectedPeersDuration(time.Since(start), len(m))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
for ID := range p.peerChannels {
|
p.activePeers.Range(func(key, value interface{}) bool {
|
||||||
m[ID] = struct{}{}
|
m[key.(string)] = struct{}{}
|
||||||
}
|
return true
|
||||||
|
})
|
||||||
|
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
|
|
||||||
// HasChannel returns true if peers has channel in update manager, otherwise false
|
// HasChannel returns true if peer has an active sender goroutine, otherwise false
|
||||||
func (p *PeersUpdateManager) HasChannel(peerID string) bool {
|
func (p *PeersUpdateManager) HasChannel(peerID string) bool {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
p.channelsMux.RLock()
|
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
p.channelsMux.RUnlock()
|
|
||||||
if p.metrics != nil {
|
if p.metrics != nil {
|
||||||
p.metrics.UpdateChannelMetrics().CountHasChannelDuration(time.Since(start))
|
p.metrics.UpdateChannelMetrics().CountHasChannelDuration(time.Since(start))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
_, ok := p.peerChannels[peerID]
|
_, ok := p.activePeers.Load(peerID)
|
||||||
|
|
||||||
return ok
|
return ok
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GetChannelCount returns the number of active peer channels
|
||||||
|
func (p *PeersUpdateManager) GetChannelCount() int {
|
||||||
|
count := 0
|
||||||
|
p.activePeers.Range(func(key, value interface{}) bool {
|
||||||
|
count++
|
||||||
|
return true
|
||||||
|
})
|
||||||
|
return count
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user