add additional buffer channel

This commit is contained in:
Pascal Fischer
2025-10-08 20:42:19 +02:00
parent 8d47175cdb
commit 26f5aee4b9
8 changed files with 226 additions and 30 deletions

View File

@@ -13,13 +13,33 @@ import (
const channelBufferSize = 100
type UpdateChannel struct {
Important chan *UpdateMessage
NetworkMap *UpdateBuffer
}
func NewUpdateChannel(metrics *telemetry.UpdateChannelMetrics) *UpdateChannel {
channel := make(chan *UpdateMessage, channelBufferSize)
buffer := NewUpdateBuffer(metrics)
return &UpdateChannel{
Important: channel,
NetworkMap: buffer,
}
}
func (u *UpdateChannel) Close() {
close(u.Important)
u.NetworkMap.Close()
}
type UpdateMessage struct {
Update *proto.SyncResponse
}
type PeersUpdateManager struct {
// peerChannels is an update channel indexed by Peer.ID
peerChannels map[string]chan *UpdateMessage
peerChannels map[string]*UpdateChannel
// channelsMux keeps the mutex to access peerChannels
channelsMux *sync.RWMutex
// metrics provides method to collect application metrics
@@ -29,14 +49,14 @@ type PeersUpdateManager struct {
// NewPeersUpdateManager returns a new instance of PeersUpdateManager
func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager {
return &PeersUpdateManager{
peerChannels: make(map[string]chan *UpdateMessage),
peerChannels: make(map[string]*UpdateChannel),
channelsMux: &sync.RWMutex{},
metrics: metrics,
}
}
// SendUpdate sends update message to the peer's channel
func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, update *UpdateMessage) {
// SendImportantUpdate sends update message to the peer that needs to be received
func (p *PeersUpdateManager) SendImportantUpdate(ctx context.Context, peerID string, update *UpdateMessage) {
start := time.Now()
var found, dropped bool
@@ -52,19 +72,41 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda
if channel, ok := p.peerChannels[peerID]; ok {
found = true
select {
case channel <- update:
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
case channel.Important <- update:
log.WithContext(ctx).Debugf("update was sent to important channel for peer %s", peerID)
default:
dropped = true
log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(channel))
log.WithContext(ctx).Warnf("important channel for peer %s is %d full or closed", peerID, len(channel.Important))
}
} else {
log.WithContext(ctx).Debugf("peer %s has no channel", peerID)
log.WithContext(ctx).Debugf("peer %s has no important channel", peerID)
}
}
// SendNetworkMapUpdate sends a network map update to the peer's channel
func (p *PeersUpdateManager) SendNetworkMapUpdate(ctx context.Context, peerID string, update *UpdateMessage) {
start := time.Now()
var found, dropped bool
p.channelsMux.RLock()
defer func() {
p.channelsMux.RUnlock()
if p.metrics != nil {
p.metrics.UpdateChannelMetrics().CountSendUpdateDuration(time.Since(start), found, dropped)
}
}()
if channel, ok := p.peerChannels[peerID]; ok {
found = true
channel.NetworkMap.Push(update)
} else {
log.WithContext(ctx).Debugf("peer %s has no networkmap buffer", peerID)
}
}
// CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer.
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) chan *UpdateMessage {
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) *UpdateChannel {
start := time.Now()
closed := false
@@ -80,21 +122,21 @@ func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) c
if channel, ok := p.peerChannels[peerID]; ok {
closed = true
delete(p.peerChannels, peerID)
close(channel)
channel.Close()
}
// mbragin: todo shouldn't it be more? or configurable?
channel := make(chan *UpdateMessage, channelBufferSize)
p.peerChannels[peerID] = channel
newChannel := NewUpdateChannel(p.metrics.UpdateChannelMetrics())
p.peerChannels[peerID] = newChannel
log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID)
return channel
return newChannel
}
func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) {
if channel, ok := p.peerChannels[peerID]; ok {
delete(p.peerChannels, peerID)
close(channel)
channel.Close()
log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID)
return
@@ -174,3 +216,10 @@ func (p *PeersUpdateManager) HasChannel(peerID string) bool {
return ok
}
// GetChannelCount returns the number of active peer channels
func (p *PeersUpdateManager) GetChannelCount() int {
p.channelsMux.RLock()
defer p.channelsMux.RUnlock()
return len(p.peerChannels)
}