mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 08:16:39 +00:00
[management] monitoring updates (#4937)
This commit is contained in:
@@ -171,8 +171,6 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
|
|||||||
}
|
}
|
||||||
s.syncSem.Add(1)
|
s.syncSem.Add(1)
|
||||||
|
|
||||||
reqStart := time.Now()
|
|
||||||
|
|
||||||
ctx := srv.Context()
|
ctx := srv.Context()
|
||||||
|
|
||||||
syncReq := &proto.SyncRequest{}
|
syncReq := &proto.SyncRequest{}
|
||||||
@@ -190,7 +188,7 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
|
|||||||
s.appMetrics.GRPCMetrics().CountSyncRequestBlocked()
|
s.appMetrics.GRPCMetrics().CountSyncRequestBlocked()
|
||||||
}
|
}
|
||||||
if s.logBlockedPeers {
|
if s.logBlockedPeers {
|
||||||
log.WithContext(ctx).Warnf("peer %s with meta hash %d is blocked from syncing", peerKey.String(), metahashed)
|
log.WithContext(ctx).Tracef("peer %s with meta hash %d is blocked from syncing", peerKey.String(), metahashed)
|
||||||
}
|
}
|
||||||
if s.blockPeersWithSameConfig {
|
if s.blockPeersWithSameConfig {
|
||||||
s.syncSem.Add(-1)
|
s.syncSem.Add(-1)
|
||||||
@@ -263,10 +261,6 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
|
|||||||
|
|
||||||
s.secretsManager.SetupRefresh(ctx, accountID, peer.ID)
|
s.secretsManager.SetupRefresh(ctx, accountID, peer.ID)
|
||||||
|
|
||||||
if s.appMetrics != nil {
|
|
||||||
s.appMetrics.GRPCMetrics().CountSyncRequestDuration(time.Since(reqStart), accountID)
|
|
||||||
}
|
|
||||||
|
|
||||||
unlock()
|
unlock()
|
||||||
unlock = nil
|
unlock = nil
|
||||||
|
|
||||||
@@ -518,7 +512,6 @@ func (s *Server) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto
|
|||||||
reqStart := time.Now()
|
reqStart := time.Now()
|
||||||
realIP := getRealIP(ctx)
|
realIP := getRealIP(ctx)
|
||||||
sRealIP := realIP.String()
|
sRealIP := realIP.String()
|
||||||
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, sRealIP)
|
|
||||||
|
|
||||||
loginReq := &proto.LoginRequest{}
|
loginReq := &proto.LoginRequest{}
|
||||||
peerKey, err := s.parseRequest(ctx, req, loginReq)
|
peerKey, err := s.parseRequest(ctx, req, loginReq)
|
||||||
@@ -530,7 +523,7 @@ func (s *Server) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto
|
|||||||
metahashed := metaHash(peerMeta, sRealIP)
|
metahashed := metaHash(peerMeta, sRealIP)
|
||||||
if !s.loginFilter.allowLogin(peerKey.String(), metahashed) {
|
if !s.loginFilter.allowLogin(peerKey.String(), metahashed) {
|
||||||
if s.logBlockedPeers {
|
if s.logBlockedPeers {
|
||||||
log.WithContext(ctx).Warnf("peer %s with meta hash %d is blocked from login", peerKey.String(), metahashed)
|
log.WithContext(ctx).Tracef("peer %s with meta hash %d is blocked from login", peerKey.String(), metahashed)
|
||||||
}
|
}
|
||||||
if s.appMetrics != nil {
|
if s.appMetrics != nil {
|
||||||
s.appMetrics.GRPCMetrics().CountLoginRequestBlocked()
|
s.appMetrics.GRPCMetrics().CountLoginRequestBlocked()
|
||||||
@@ -554,6 +547,8 @@ func (s *Server) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto
|
|||||||
//nolint
|
//nolint
|
||||||
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
|
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
|
||||||
|
|
||||||
|
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, sRealIP)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if s.appMetrics != nil {
|
if s.appMetrics != nil {
|
||||||
s.appMetrics.GRPCMetrics().CountLoginRequestDuration(time.Since(reqStart), accountID)
|
s.appMetrics.GRPCMetrics().CountLoginRequestDuration(time.Since(reqStart), accountID)
|
||||||
|
|||||||
@@ -787,6 +787,13 @@ func (am *DefaultAccountManager) loadAccount(ctx context.Context, accountID any)
|
|||||||
log.WithContext(ctx).Debugf("account %s not found in cache, reloading", accountID)
|
log.WithContext(ctx).Debugf("account %s not found in cache, reloading", accountID)
|
||||||
accountIDString := fmt.Sprintf("%v", accountID)
|
accountIDString := fmt.Sprintf("%v", accountID)
|
||||||
|
|
||||||
|
if ctx == nil {
|
||||||
|
ctx = context.Background()
|
||||||
|
}
|
||||||
|
|
||||||
|
// nolint:staticcheck
|
||||||
|
ctx = context.WithValue(ctx, nbcontext.AccountIDKey, accountID)
|
||||||
|
|
||||||
accountUsers, err := am.Store.GetAccountUsers(ctx, store.LockingStrengthNone, accountIDString)
|
accountUsers, err := am.Store.GetAccountUsers(ctx, store.LockingStrengthNone, accountIDString)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
|
|||||||
@@ -16,7 +16,6 @@ type GRPCMetrics struct {
|
|||||||
meter metric.Meter
|
meter metric.Meter
|
||||||
syncRequestsCounter metric.Int64Counter
|
syncRequestsCounter metric.Int64Counter
|
||||||
syncRequestsBlockedCounter metric.Int64Counter
|
syncRequestsBlockedCounter metric.Int64Counter
|
||||||
syncRequestHighLatencyCounter metric.Int64Counter
|
|
||||||
loginRequestsCounter metric.Int64Counter
|
loginRequestsCounter metric.Int64Counter
|
||||||
loginRequestsBlockedCounter metric.Int64Counter
|
loginRequestsBlockedCounter metric.Int64Counter
|
||||||
loginRequestHighLatencyCounter metric.Int64Counter
|
loginRequestHighLatencyCounter metric.Int64Counter
|
||||||
@@ -46,14 +45,6 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
syncRequestHighLatencyCounter, err := meter.Int64Counter("management.grpc.sync.request.high.latency.counter",
|
|
||||||
metric.WithUnit("1"),
|
|
||||||
metric.WithDescription("Number of sync gRPC requests from the peers that took longer than the threshold to establish a connection and receive network map updates (update channel)"),
|
|
||||||
)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
loginRequestsCounter, err := meter.Int64Counter("management.grpc.login.request.counter",
|
loginRequestsCounter, err := meter.Int64Counter("management.grpc.login.request.counter",
|
||||||
metric.WithUnit("1"),
|
metric.WithUnit("1"),
|
||||||
metric.WithDescription("Number of login gRPC requests from the peers to authenticate and receive initial configuration and relay credentials"),
|
metric.WithDescription("Number of login gRPC requests from the peers to authenticate and receive initial configuration and relay credentials"),
|
||||||
@@ -126,7 +117,6 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro
|
|||||||
meter: meter,
|
meter: meter,
|
||||||
syncRequestsCounter: syncRequestsCounter,
|
syncRequestsCounter: syncRequestsCounter,
|
||||||
syncRequestsBlockedCounter: syncRequestsBlockedCounter,
|
syncRequestsBlockedCounter: syncRequestsBlockedCounter,
|
||||||
syncRequestHighLatencyCounter: syncRequestHighLatencyCounter,
|
|
||||||
loginRequestsCounter: loginRequestsCounter,
|
loginRequestsCounter: loginRequestsCounter,
|
||||||
loginRequestsBlockedCounter: loginRequestsBlockedCounter,
|
loginRequestsBlockedCounter: loginRequestsBlockedCounter,
|
||||||
loginRequestHighLatencyCounter: loginRequestHighLatencyCounter,
|
loginRequestHighLatencyCounter: loginRequestHighLatencyCounter,
|
||||||
@@ -172,14 +162,6 @@ func (grpcMetrics *GRPCMetrics) CountLoginRequestDuration(duration time.Duration
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// CountSyncRequestDuration counts the duration of the sync gRPC requests
|
|
||||||
func (grpcMetrics *GRPCMetrics) CountSyncRequestDuration(duration time.Duration, accountID string) {
|
|
||||||
grpcMetrics.syncRequestDuration.Record(grpcMetrics.ctx, duration.Milliseconds())
|
|
||||||
if duration > HighLatencyThreshold {
|
|
||||||
grpcMetrics.syncRequestHighLatencyCounter.Add(grpcMetrics.ctx, 1, metric.WithAttributes(attribute.String(AccountIDLabel, accountID)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// RegisterConnectedStreams registers a function that collects number of active streams and feeds it to the metrics gauge.
|
// RegisterConnectedStreams registers a function that collects number of active streams and feeds it to the metrics gauge.
|
||||||
func (grpcMetrics *GRPCMetrics) RegisterConnectedStreams(producer func() int64) error {
|
func (grpcMetrics *GRPCMetrics) RegisterConnectedStreams(producer func() int64) error {
|
||||||
_, err := grpcMetrics.meter.RegisterCallback(
|
_, err := grpcMetrics.meter.RegisterCallback(
|
||||||
|
|||||||
@@ -185,6 +185,18 @@ func (m *HTTPMiddleware) Handler(h http.Handler) http.Handler {
|
|||||||
|
|
||||||
h.ServeHTTP(w, r.WithContext(ctx))
|
h.ServeHTTP(w, r.WithContext(ctx))
|
||||||
|
|
||||||
|
userAuth, err := nbContext.GetUserAuthFromContext(r.Context())
|
||||||
|
if err == nil {
|
||||||
|
if userAuth.AccountId != "" {
|
||||||
|
//nolint
|
||||||
|
ctx = context.WithValue(ctx, nbContext.AccountIDKey, userAuth.AccountId)
|
||||||
|
}
|
||||||
|
if userAuth.UserId != "" {
|
||||||
|
//nolint
|
||||||
|
ctx = context.WithValue(ctx, nbContext.UserIDKey, userAuth.UserId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if w.Status() > 399 {
|
if w.Status() > 399 {
|
||||||
log.WithContext(ctx).Errorf("HTTP response %v: %v %v status %v", reqID, r.Method, r.URL, w.Status())
|
log.WithContext(ctx).Errorf("HTTP response %v: %v %v status %v", reqID, r.Method, r.URL, w.Status())
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Reference in New Issue
Block a user