diff --git a/management/server/telemetry/updatechannel_metrics.go b/management/server/telemetry/updatechannel_metrics.go index 21b4a6085..6486f0283 100644 --- a/management/server/telemetry/updatechannel_metrics.go +++ b/management/server/telemetry/updatechannel_metrics.go @@ -22,6 +22,9 @@ type UpdateChannelMetrics struct { calcPeerNetworkMapDurationMicro metric.Int64Histogram mergeNetworkMapDurationMicro metric.Int64Histogram toSyncResponseDurationMicro metric.Int64Histogram + bufferPushCounter metric.Int64Counter + bufferOverwriteCounter metric.Int64Counter + bufferIgnoreCounter metric.Int64Counter ctx context.Context } @@ -125,6 +128,27 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh return nil, err } + bufferPushCounter, err := meter.Int64Counter("management.updatechannel.buffer.push.counter", + metric.WithUnit("1"), + metric.WithDescription("Number of updates pushed to an empty buffer")) + if err != nil { + return nil, err + } + + bufferOverwriteCounter, err := meter.Int64Counter("management.updatechannel.buffer.overwrite.counter", + metric.WithUnit("1"), + metric.WithDescription("Number of updates overwriting old unsent updates in the buffer")) + if err != nil { + return nil, err + } + + bufferIgnoreCounter, err := meter.Int64Counter("management.updatechannel.buffer.ignore.counter", + metric.WithUnit("1"), + metric.WithDescription("Number of updates being ignored due to old network serial")) + if err != nil { + return nil, err + } + return &UpdateChannelMetrics{ createChannelDurationMicro: createChannelDurationMicro, closeChannelDurationMicro: closeChannelDurationMicro, @@ -138,6 +162,9 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh calcPeerNetworkMapDurationMicro: calcPeerNetworkMapDurationMicro, mergeNetworkMapDurationMicro: mergeNetworkMapDurationMicro, toSyncResponseDurationMicro: toSyncResponseDurationMicro, + bufferPushCounter: bufferPushCounter, + bufferOverwriteCounter: bufferOverwriteCounter, + bufferIgnoreCounter: bufferIgnoreCounter, ctx: ctx, }, nil } @@ -193,3 +220,18 @@ func (metrics *UpdateChannelMetrics) CountMergeNetworkMapDuration(duration time. func (metrics *UpdateChannelMetrics) CountToSyncResponseDuration(duration time.Duration) { metrics.toSyncResponseDurationMicro.Record(metrics.ctx, duration.Microseconds()) } + +// CountBufferPush counts how many buffer push operations are happening on an empty buffer +func (metrics *UpdateChannelMetrics) CountBufferPush() { + metrics.bufferPushCounter.Add(metrics.ctx, 1) +} + +// CountBufferOverwrite counts how many buffer overwrite operations are happening on a non-empty buffer +func (metrics *UpdateChannelMetrics) CountBufferOverwrite() { + metrics.bufferOverwriteCounter.Add(metrics.ctx, 1) +} + +// CountBufferIgnore counts how many buffer ignore operations are happening when a new update is pushed +func (metrics *UpdateChannelMetrics) CountBufferIgnore() { + metrics.bufferIgnoreCounter.Add(metrics.ctx, 1) +} diff --git a/management/server/update_buffer.go b/management/server/update_buffer.go index c6ce37152..9f4de6ed0 100644 --- a/management/server/update_buffer.go +++ b/management/server/update_buffer.go @@ -3,17 +3,20 @@ package server import ( "context" "sync" + + "github.com/netbirdio/netbird/management/server/telemetry" ) type UpdateBuffer struct { - mu sync.Mutex - cond *sync.Cond - update *UpdateMessage - closed bool + mu sync.Mutex + cond *sync.Cond + update *UpdateMessage + closed bool + metrics *telemetry.UpdateChannelMetrics } -func NewUpdateBuffer() *UpdateBuffer { - ub := &UpdateBuffer{} +func NewUpdateBuffer(metrics *telemetry.UpdateChannelMetrics) *UpdateBuffer { + ub := &UpdateBuffer{metrics: metrics} ub.cond = sync.NewCond(&ub.mu) return ub } @@ -22,11 +25,22 @@ func (b *UpdateBuffer) Push(update *UpdateMessage) { b.mu.Lock() defer b.mu.Unlock() - // the equal case we need because we don't always increment the serial number - if b.update == nil || update.NetworkMap.Network.Serial >= b.update.NetworkMap.Network.Serial { + if b.update == nil { b.update = update b.cond.Signal() + b.metrics.CountBufferPush() + return } + + // the equal case we need because we don't always increment the serial number + if update.NetworkMap.Network.Serial >= b.update.NetworkMap.Network.Serial { + b.update = update + b.cond.Signal() + b.metrics.CountBufferOverwrite() + return + } + + b.metrics.CountBufferIgnore() } func (b *UpdateBuffer) Pop(ctx context.Context) (*UpdateMessage, bool) { diff --git a/management/server/updatechannel.go b/management/server/updatechannel.go index 8938d52ce..0d0eb64fd 100644 --- a/management/server/updatechannel.go +++ b/management/server/updatechannel.go @@ -79,7 +79,7 @@ func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) * delete(p.peerChannels, peerID) } // mbragin: todo shouldn't it be more? or configurable? - buffer := NewUpdateBuffer() + buffer := NewUpdateBuffer(p.metrics.UpdateChannelMetrics()) log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID)