add session id to update channel

This commit is contained in:
Pascal Fischer
2024-11-06 20:29:59 +01:00
parent b952d8693d
commit 22126d0484
15 changed files with 202 additions and 133 deletions

View File

@@ -5,6 +5,7 @@ import (
"sync"
"time"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/management/proto"
@@ -12,15 +13,22 @@ import (
)
const channelBufferSize = 100
const SessionIdForceOverwrite = "FORCE"
type UpdateMessage struct {
Update *proto.SyncResponse
NetworkMap *NetworkMap
}
type PeerUpdateChannel struct {
peerID string
sessionID string
channel chan *UpdateMessage
}
type PeersUpdateManager struct {
// peerChannels is an update channel indexed by Peer.ID
peerChannels map[string]chan *UpdateMessage
// peerChannels is a map of peerID to the channel used to deliver updates relevant to the peer
peerChannels map[string]*PeerUpdateChannel
// channelsMux keeps the mutex to access peerChannels
channelsMux *sync.RWMutex
// metrics provides method to collect application metrics
@@ -30,7 +38,7 @@ type PeersUpdateManager struct {
// NewPeersUpdateManager returns a new instance of PeersUpdateManager
func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager {
return &PeersUpdateManager{
peerChannels: make(map[string]chan *UpdateMessage),
peerChannels: make(map[string]*PeerUpdateChannel),
channelsMux: &sync.RWMutex{},
metrics: metrics,
}
@@ -50,14 +58,14 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda
}
}()
if channel, ok := p.peerChannels[peerID]; ok {
if peerUpdates, ok := p.peerChannels[peerID]; ok {
found = true
select {
case channel <- update:
case peerUpdates.channel <- update:
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
default:
dropped = true
log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(channel))
log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(peerUpdates.channel))
}
} else {
log.WithContext(ctx).Debugf("peer %s has no channel", peerID)
@@ -65,7 +73,7 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda
}
// CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer.
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) chan *UpdateMessage {
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) *PeerUpdateChannel {
start := time.Now()
closed := false
@@ -81,24 +89,39 @@ func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) c
if channel, ok := p.peerChannels[peerID]; ok {
closed = true
delete(p.peerChannels, peerID)
close(channel)
close(channel.channel)
log.WithContext(ctx).Debugf("overwriting existing channel for peer %s", peerID)
}
// mbragin: todo shouldn't it be more? or configurable?
channel := make(chan *UpdateMessage, channelBufferSize)
p.peerChannels[peerID] = channel
log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID)
peerUpdateChannel := &PeerUpdateChannel{
peerID: peerID,
sessionID: uuid.New().String(),
// mbragin: todo shouldn't it be more? or configurable?
channel: make(chan *UpdateMessage, channelBufferSize),
}
return channel
p.peerChannels[peerID] = peerUpdateChannel
log.WithContext(ctx).Debugf("opened updates channel for a peer %s and session %s", peerID, peerUpdateChannel.sessionID)
return peerUpdateChannel
}
func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) {
if channel, ok := p.peerChannels[peerID]; ok {
delete(p.peerChannels, peerID)
close(channel)
func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string, sessionID string) bool {
if peerUpdates, ok := p.peerChannels[peerID]; ok {
if peerUpdates.sessionID == sessionID || sessionID == SessionIdForceOverwrite {
delete(p.peerChannels, peerID)
close(peerUpdates.channel)
log.WithContext(ctx).Debugf("closed updates channel of a peer %s and session %s", peerID, sessionID)
return true
}
log.WithContext(ctx).Warnf("tried to close updates channel of a peer %s with session %s, but current session is %s", peerID, sessionID, peerUpdates.sessionID)
return false
}
log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID)
log.WithContext(ctx).Warnf("tried to close updates channel of a peer %s with session %s, but no channel found", peerID, sessionID)
return true
}
// CloseChannels closes updates channel for each given peer
@@ -114,12 +137,12 @@ func (p *PeersUpdateManager) CloseChannels(ctx context.Context, peerIDs []string
}()
for _, id := range peerIDs {
p.closeChannel(ctx, id)
p.closeChannel(ctx, id, SessionIdForceOverwrite)
}
}
// CloseChannel closes updates channel of a given peer
func (p *PeersUpdateManager) CloseChannel(ctx context.Context, peerID string) {
func (p *PeersUpdateManager) CloseChannel(ctx context.Context, peerID string, sessionID string) bool {
start := time.Now()
p.channelsMux.Lock()
@@ -130,7 +153,7 @@ func (p *PeersUpdateManager) CloseChannel(ctx context.Context, peerID string) {
}
}()
p.closeChannel(ctx, peerID)
return p.closeChannel(ctx, peerID, sessionID)
}
// GetAllConnectedPeers returns a copy of the connected peers map