diff --git a/management/server/account.go b/management/server/account.go index f217eadb3..b57550f14 100644 --- a/management/server/account.go +++ b/management/server/account.go @@ -104,6 +104,8 @@ type DefaultAccountManager struct { accountUpdateLocks sync.Map updateAccountPeersBufferInterval atomic.Int64 + loginFilter *loginFilter + disableDefaultPolicy bool } @@ -211,6 +213,7 @@ func BuildManager( proxyController: proxyController, settingsManager: settingsManager, permissionsManager: permissionsManager, + loginFilter: newLoginFilter(), disableDefaultPolicy: disableDefaultPolicy, } @@ -1612,6 +1615,10 @@ func domainIsUpToDate(domain string, domainCategory string, userAuth nbcontext.U return domainCategory == types.PrivateCategory || userAuth.DomainCategory != types.PrivateCategory || domain != userAuth.Domain } +func (am *DefaultAccountManager) AllowSync(wgPubKey string, metahash uint64) bool { + return am.loginFilter.allowLogin(wgPubKey, metahash) +} + func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) { start := time.Now() defer func() { @@ -1628,6 +1635,9 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID log.WithContext(ctx).Warnf("failed marking peer as connected %s %v", peerPubKey, err) } + metahash := metaHash(meta, realIP.String()) + am.loginFilter.addLogin(peerPubKey, metahash) + return peer, netMap, postureChecks, nil } @@ -1636,7 +1646,6 @@ func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, account if err != nil { log.WithContext(ctx).Warnf("failed marking peer as disconnected %s %v", peerPubKey, err) } - return nil } diff --git a/management/server/account/manager.go b/management/server/account/manager.go index c7a39004a..c7329a1da 100644 --- a/management/server/account/manager.go +++ b/management/server/account/manager.go @@ -123,4 +123,5 @@ type Manager interface { UpdateToPrimaryAccount(ctx context.Context, accountId string) error GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error) GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error) + AllowSync(string, uint64) bool } diff --git a/management/server/grpcserver.go b/management/server/grpcserver.go index 41d463ae3..95d4f9eee 100644 --- a/management/server/grpcserver.go +++ b/management/server/grpcserver.go @@ -2,9 +2,11 @@ package server import ( "context" + "errors" "fmt" "net" "net/netip" + "os" "strings" "sync" "time" @@ -38,20 +40,28 @@ import ( internalStatus "github.com/netbirdio/netbird/shared/management/status" ) +const ( + envLogBlockedPeers = "NB_LOG_BLOCKED_PEERS" + envBlockPeers = "NB_BLOCK_SAME_PEERS" +) + // GRPCServer an instance of a Management gRPC API server type GRPCServer struct { accountManager account.Manager settingsManager settings.Manager wgKey wgtypes.Key proto.UnimplementedManagementServiceServer - peersUpdateManager *PeersUpdateManager - config *nbconfig.Config - secretsManager SecretsManager - appMetrics telemetry.AppMetrics - ephemeralManager *EphemeralManager - peerLocks sync.Map - authManager auth.Manager - integratedPeerValidator integrated_validator.IntegratedValidator + peersUpdateManager *PeersUpdateManager + config *nbconfig.Config + secretsManager SecretsManager + appMetrics telemetry.AppMetrics + ephemeralManager *EphemeralManager + peerLocks sync.Map + authManager auth.Manager + + logBlockedPeers bool + blockPeersWithSameConfig bool + integratedPeerValidator integrated_validator.IntegratedValidator } // NewServer creates a new Management server @@ -82,18 +92,23 @@ func NewServer( } } + logBlockedPeers := strings.ToLower(os.Getenv(envLogBlockedPeers)) == "true" + blockPeersWithSameConfig := strings.ToLower(os.Getenv(envBlockPeers)) == "true" + return &GRPCServer{ wgKey: key, // peerKey -> event channel - peersUpdateManager: peersUpdateManager, - accountManager: accountManager, - settingsManager: settingsManager, - config: config, - secretsManager: secretsManager, - authManager: authManager, - appMetrics: appMetrics, - ephemeralManager: ephemeralManager, - integratedPeerValidator: integratedPeerValidator, + peersUpdateManager: peersUpdateManager, + accountManager: accountManager, + settingsManager: settingsManager, + config: config, + secretsManager: secretsManager, + authManager: authManager, + appMetrics: appMetrics, + ephemeralManager: ephemeralManager, + logBlockedPeers: logBlockedPeers, + blockPeersWithSameConfig: blockPeersWithSameConfig, + integratedPeerValidator: integratedPeerValidator, }, nil } @@ -136,9 +151,6 @@ func getRealIP(ctx context.Context) net.IP { // notifies the connected peer of any updates (e.g. new peers under the same account) func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error { reqStart := time.Now() - if s.appMetrics != nil { - s.appMetrics.GRPCMetrics().CountSyncRequest() - } ctx := srv.Context() @@ -147,6 +159,25 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi if err != nil { return err } + realIP := getRealIP(ctx) + sRealIP := realIP.String() + peerMeta := extractPeerMeta(ctx, syncReq.GetMeta()) + metahashed := metaHash(peerMeta, sRealIP) + if !s.accountManager.AllowSync(peerKey.String(), metahashed) { + if s.appMetrics != nil { + s.appMetrics.GRPCMetrics().CountSyncRequestBlocked() + } + if s.logBlockedPeers { + log.WithContext(ctx).Warnf("peer %s with meta hash %d is blocked from syncing", peerKey.String(), metahashed) + } + if s.blockPeersWithSameConfig { + return mapError(ctx, internalStatus.ErrPeerAlreadyLoggedIn) + } + } + + if s.appMetrics != nil { + s.appMetrics.GRPCMetrics().CountSyncRequest() + } // nolint:staticcheck ctx = context.WithValue(ctx, nbContext.PeerIDKey, peerKey.String()) @@ -172,14 +203,13 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi // nolint:staticcheck ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID) - realIP := getRealIP(ctx) - log.WithContext(ctx).Debugf("Sync request from peer [%s] [%s]", req.WgPubKey, realIP.String()) + log.WithContext(ctx).Debugf("Sync request from peer [%s] [%s]", req.WgPubKey, sRealIP) if syncReq.GetMeta() == nil { log.WithContext(ctx).Tracef("peer system meta has to be provided on sync. Peer %s, remote addr %s", peerKey.String(), realIP) } - peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), extractPeerMeta(ctx, syncReq.GetMeta()), realIP) + peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), peerMeta, realIP) if err != nil { log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err) return mapError(ctx, err) @@ -345,6 +375,9 @@ func mapError(ctx context.Context, err error) error { default: } } + if errors.Is(err, internalStatus.ErrPeerAlreadyLoggedIn) { + return status.Error(codes.PermissionDenied, internalStatus.ErrPeerAlreadyLoggedIn.Error()) + } log.WithContext(ctx).Errorf("got an unhandled error: %s", err) return status.Errorf(codes.Internal, "failed handling request") } @@ -436,12 +469,9 @@ func (s *GRPCServer) parseRequest(ctx context.Context, req *proto.EncryptedMessa // In case of the successful registration login is also successful func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) { reqStart := time.Now() - - if s.appMetrics != nil { - s.appMetrics.GRPCMetrics().CountLoginRequest() - } realIP := getRealIP(ctx) - log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, realIP.String()) + sRealIP := realIP.String() + log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, sRealIP) loginReq := &proto.LoginRequest{} peerKey, err := s.parseRequest(ctx, req, loginReq) @@ -449,6 +479,24 @@ func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*p return nil, err } + peerMeta := extractPeerMeta(ctx, loginReq.GetMeta()) + metahashed := metaHash(peerMeta, sRealIP) + if !s.accountManager.AllowSync(peerKey.String(), metahashed) { + if s.logBlockedPeers { + log.WithContext(ctx).Warnf("peer %s with meta hash %d is blocked from login", peerKey.String(), metahashed) + } + if s.appMetrics != nil { + s.appMetrics.GRPCMetrics().CountLoginRequestBlocked() + } + if s.blockPeersWithSameConfig { + return nil, internalStatus.ErrPeerAlreadyLoggedIn + } + } + + if s.appMetrics != nil { + s.appMetrics.GRPCMetrics().CountLoginRequest() + } + //nolint ctx = context.WithValue(ctx, nbContext.PeerIDKey, peerKey.String()) accountID, err := s.accountManager.GetAccountIDForPeerKey(ctx, peerKey.String()) @@ -485,7 +533,7 @@ func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*p peer, netMap, postureChecks, err := s.accountManager.LoginPeer(ctx, types.PeerLogin{ WireGuardPubKey: peerKey.String(), SSHKey: string(sshKey), - Meta: extractPeerMeta(ctx, loginReq.GetMeta()), + Meta: peerMeta, UserID: userID, SetupKey: loginReq.GetSetupKey(), ConnectionIP: realIP, diff --git a/management/server/loginfilter.go b/management/server/loginfilter.go new file mode 100644 index 000000000..8604af6e2 --- /dev/null +++ b/management/server/loginfilter.go @@ -0,0 +1,160 @@ +package server + +import ( + "hash/fnv" + "math" + "sync" + "time" + + nbpeer "github.com/netbirdio/netbird/management/server/peer" +) + +const ( + reconnThreshold = 5 * time.Minute + baseBlockDuration = 10 * time.Minute // Duration for which a peer is banned after exceeding the reconnection limit + reconnLimitForBan = 30 // Number of reconnections within the reconnTreshold that triggers a ban + metaChangeLimit = 3 // Number of reconnections with different metadata that triggers a ban of one peer +) + +type lfConfig struct { + reconnThreshold time.Duration + baseBlockDuration time.Duration + reconnLimitForBan int + metaChangeLimit int +} + +func initCfg() *lfConfig { + return &lfConfig{ + reconnThreshold: reconnThreshold, + baseBlockDuration: baseBlockDuration, + reconnLimitForBan: reconnLimitForBan, + metaChangeLimit: metaChangeLimit, + } +} + +type loginFilter struct { + mu sync.RWMutex + cfg *lfConfig + logged map[string]*peerState +} + +type peerState struct { + currentHash uint64 + sessionCounter int + sessionStart time.Time + lastSeen time.Time + isBanned bool + banLevel int + banExpiresAt time.Time + metaChangeCounter int + metaChangeWindowStart time.Time +} + +func newLoginFilter() *loginFilter { + return newLoginFilterWithCfg(initCfg()) +} + +func newLoginFilterWithCfg(cfg *lfConfig) *loginFilter { + return &loginFilter{ + logged: make(map[string]*peerState), + cfg: cfg, + } +} + +func (l *loginFilter) allowLogin(wgPubKey string, metaHash uint64) bool { + l.mu.RLock() + defer func() { + l.mu.RUnlock() + }() + state, ok := l.logged[wgPubKey] + if !ok { + return true + } + if state.isBanned && time.Now().Before(state.banExpiresAt) { + return false + } + if metaHash != state.currentHash { + if time.Now().Before(state.metaChangeWindowStart.Add(l.cfg.reconnThreshold)) && state.metaChangeCounter >= l.cfg.metaChangeLimit { + return false + } + } + return true +} + +func (l *loginFilter) addLogin(wgPubKey string, metaHash uint64) { + now := time.Now() + l.mu.Lock() + defer func() { + l.mu.Unlock() + }() + + state, ok := l.logged[wgPubKey] + + if !ok { + l.logged[wgPubKey] = &peerState{ + currentHash: metaHash, + sessionCounter: 1, + sessionStart: now, + lastSeen: now, + metaChangeWindowStart: now, + metaChangeCounter: 1, + } + return + } + + if state.isBanned && now.After(state.banExpiresAt) { + state.isBanned = false + } + + if state.banLevel > 0 && now.Sub(state.lastSeen) > (2*l.cfg.baseBlockDuration) { + state.banLevel = 0 + } + + if metaHash != state.currentHash { + if now.After(state.metaChangeWindowStart.Add(l.cfg.reconnThreshold)) { + state.metaChangeWindowStart = now + state.metaChangeCounter = 1 + } else { + state.metaChangeCounter++ + } + state.currentHash = metaHash + state.sessionCounter = 1 + state.sessionStart = now + state.lastSeen = now + return + } + + state.sessionCounter++ + if state.sessionCounter > l.cfg.reconnLimitForBan && now.Sub(state.sessionStart) < l.cfg.reconnThreshold { + state.isBanned = true + state.banLevel++ + + backoffFactor := math.Pow(2, float64(state.banLevel-1)) + duration := time.Duration(float64(l.cfg.baseBlockDuration) * backoffFactor) + state.banExpiresAt = now.Add(duration) + + state.sessionCounter = 0 + state.sessionStart = now + } + state.lastSeen = now +} + +func metaHash(meta nbpeer.PeerSystemMeta, pubip string) uint64 { + h := fnv.New64a() + + h.Write([]byte(meta.WtVersion)) + h.Write([]byte(meta.OSVersion)) + h.Write([]byte(meta.KernelVersion)) + h.Write([]byte(meta.Hostname)) + h.Write([]byte(meta.SystemSerialNumber)) + h.Write([]byte(pubip)) + + macs := uint64(0) + for _, na := range meta.NetworkAddresses { + for _, r := range na.Mac { + macs += uint64(r) + } + } + + return h.Sum64() + macs +} diff --git a/management/server/loginfilter_test.go b/management/server/loginfilter_test.go new file mode 100644 index 000000000..65782dd9d --- /dev/null +++ b/management/server/loginfilter_test.go @@ -0,0 +1,275 @@ +package server + +import ( + "hash/fnv" + "math" + "math/rand" + "strconv" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/suite" + + nbpeer "github.com/netbirdio/netbird/management/server/peer" +) + +func testAdvancedCfg() *lfConfig { + return &lfConfig{ + reconnThreshold: 50 * time.Millisecond, + baseBlockDuration: 100 * time.Millisecond, + reconnLimitForBan: 3, + metaChangeLimit: 2, + } +} + +type LoginFilterTestSuite struct { + suite.Suite + filter *loginFilter +} + +func (s *LoginFilterTestSuite) SetupTest() { + s.filter = newLoginFilterWithCfg(testAdvancedCfg()) +} + +func TestLoginFilterTestSuite(t *testing.T) { + suite.Run(t, new(LoginFilterTestSuite)) +} + +func (s *LoginFilterTestSuite) TestFirstLoginIsAlwaysAllowed() { + pubKey := "PUB_KEY_A" + meta := uint64(1) + + s.True(s.filter.allowLogin(pubKey, meta)) + + s.filter.addLogin(pubKey, meta) + s.Require().Contains(s.filter.logged, pubKey) + s.Equal(1, s.filter.logged[pubKey].sessionCounter) +} + +func (s *LoginFilterTestSuite) TestFlappingSameHashTriggersBan() { + pubKey := "PUB_KEY_A" + meta := uint64(1) + limit := s.filter.cfg.reconnLimitForBan + + for i := 0; i <= limit; i++ { + s.filter.addLogin(pubKey, meta) + } + + s.False(s.filter.allowLogin(pubKey, meta)) + s.Require().Contains(s.filter.logged, pubKey) + s.True(s.filter.logged[pubKey].isBanned) +} + +func (s *LoginFilterTestSuite) TestBanDurationIncreasesExponentially() { + pubKey := "PUB_KEY_A" + meta := uint64(1) + limit := s.filter.cfg.reconnLimitForBan + baseBan := s.filter.cfg.baseBlockDuration + + for i := 0; i <= limit; i++ { + s.filter.addLogin(pubKey, meta) + } + s.Require().Contains(s.filter.logged, pubKey) + s.True(s.filter.logged[pubKey].isBanned) + s.Equal(1, s.filter.logged[pubKey].banLevel) + firstBanDuration := s.filter.logged[pubKey].banExpiresAt.Sub(s.filter.logged[pubKey].lastSeen) + s.InDelta(baseBan, firstBanDuration, float64(time.Millisecond)) + + s.filter.logged[pubKey].banExpiresAt = time.Now().Add(-time.Second) + s.filter.logged[pubKey].isBanned = false + + for i := 0; i <= limit; i++ { + s.filter.addLogin(pubKey, meta) + } + s.True(s.filter.logged[pubKey].isBanned) + s.Equal(2, s.filter.logged[pubKey].banLevel) + secondBanDuration := s.filter.logged[pubKey].banExpiresAt.Sub(s.filter.logged[pubKey].lastSeen) + expectedSecondDuration := time.Duration(float64(baseBan) * math.Pow(2, 1)) + s.InDelta(expectedSecondDuration, secondBanDuration, float64(time.Millisecond)) +} + +func (s *LoginFilterTestSuite) TestPeerIsAllowedAfterBanExpires() { + pubKey := "PUB_KEY_A" + meta := uint64(1) + + s.filter.logged[pubKey] = &peerState{ + isBanned: true, + banExpiresAt: time.Now().Add(-(s.filter.cfg.baseBlockDuration + time.Second)), + } + + s.True(s.filter.allowLogin(pubKey, meta)) + + s.filter.addLogin(pubKey, meta) + s.Require().Contains(s.filter.logged, pubKey) + s.False(s.filter.logged[pubKey].isBanned) +} + +func (s *LoginFilterTestSuite) TestBanLevelResetsAfterGoodBehavior() { + pubKey := "PUB_KEY_A" + meta := uint64(1) + + s.filter.logged[pubKey] = &peerState{ + currentHash: meta, + banLevel: 3, + lastSeen: time.Now().Add(-3 * s.filter.cfg.baseBlockDuration), + } + + s.filter.addLogin(pubKey, meta) + s.Require().Contains(s.filter.logged, pubKey) + s.Equal(0, s.filter.logged[pubKey].banLevel) +} + +func (s *LoginFilterTestSuite) TestFlappingDifferentHashesTriggersBlock() { + pubKey := "PUB_KEY_A" + limit := s.filter.cfg.metaChangeLimit + + for i := range limit { + s.filter.addLogin(pubKey, uint64(i+1)) + } + + s.Require().Contains(s.filter.logged, pubKey) + s.Equal(limit, s.filter.logged[pubKey].metaChangeCounter) + + isAllowed := s.filter.allowLogin(pubKey, uint64(limit+1)) + + s.False(isAllowed, "should block new meta hash after limit is reached") +} + +func (s *LoginFilterTestSuite) TestMetaChangeIsAllowedAfterWindowResets() { + pubKey := "PUB_KEY_A" + meta1 := uint64(1) + meta2 := uint64(2) + meta3 := uint64(3) + + s.filter.addLogin(pubKey, meta1) + s.filter.addLogin(pubKey, meta2) + s.Require().Contains(s.filter.logged, pubKey) + s.Equal(s.filter.cfg.metaChangeLimit, s.filter.logged[pubKey].metaChangeCounter) + s.False(s.filter.allowLogin(pubKey, meta3), "should be blocked inside window") + + s.filter.logged[pubKey].metaChangeWindowStart = time.Now().Add(-(s.filter.cfg.reconnThreshold + time.Second)) + + s.True(s.filter.allowLogin(pubKey, meta3), "should be allowed after window expires") + + s.filter.addLogin(pubKey, meta3) + s.Equal(1, s.filter.logged[pubKey].metaChangeCounter, "meta change counter should reset") +} + +func BenchmarkHashingMethods(b *testing.B) { + meta := nbpeer.PeerSystemMeta{ + WtVersion: "1.25.1", + OSVersion: "Ubuntu 22.04.3 LTS", + KernelVersion: "5.15.0-76-generic", + Hostname: "prod-server-database-01", + SystemSerialNumber: "PC-1234567890", + NetworkAddresses: []nbpeer.NetworkAddress{{Mac: "00:1B:44:11:3A:B7"}, {Mac: "00:1B:44:11:3A:B8"}}, + } + pubip := "8.8.8.8" + + var resultString string + var resultUint uint64 + + b.Run("BuilderString", func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + resultString = builderString(meta, pubip) + } + }) + + b.Run("FnvHashToString", func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + resultString = fnvHashToString(meta, pubip) + } + }) + + b.Run("FnvHashToUint64 - used", func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + resultUint = metaHash(meta, pubip) + } + }) + + _ = resultString + _ = resultUint +} + +func fnvHashToString(meta nbpeer.PeerSystemMeta, pubip string) string { + h := fnv.New64a() + + if len(meta.NetworkAddresses) != 0 { + for _, na := range meta.NetworkAddresses { + h.Write([]byte(na.Mac)) + } + } + + h.Write([]byte(meta.WtVersion)) + h.Write([]byte(meta.OSVersion)) + h.Write([]byte(meta.KernelVersion)) + h.Write([]byte(meta.Hostname)) + h.Write([]byte(meta.SystemSerialNumber)) + h.Write([]byte(pubip)) + + return strconv.FormatUint(h.Sum64(), 16) +} + +func builderString(meta nbpeer.PeerSystemMeta, pubip string) string { + mac := getMacAddress(meta.NetworkAddresses) + estimatedSize := len(meta.WtVersion) + len(meta.OSVersion) + len(meta.KernelVersion) + len(meta.Hostname) + len(meta.SystemSerialNumber) + + len(pubip) + len(mac) + 6 + + var b strings.Builder + b.Grow(estimatedSize) + + b.WriteString(meta.WtVersion) + b.WriteByte('|') + b.WriteString(meta.OSVersion) + b.WriteByte('|') + b.WriteString(meta.KernelVersion) + b.WriteByte('|') + b.WriteString(meta.Hostname) + b.WriteByte('|') + b.WriteString(meta.SystemSerialNumber) + b.WriteByte('|') + b.WriteString(pubip) + + return b.String() +} + +func getMacAddress(nas []nbpeer.NetworkAddress) string { + if len(nas) == 0 { + return "" + } + macs := make([]string, 0, len(nas)) + for _, na := range nas { + macs = append(macs, na.Mac) + } + return strings.Join(macs, "/") +} + +func BenchmarkLoginFilter_ParallelLoad(b *testing.B) { + filter := newLoginFilterWithCfg(testAdvancedCfg()) + numKeys := 100000 + pubKeys := make([]string, numKeys) + for i := range numKeys { + pubKeys[i] = "PUB_KEY_" + strconv.Itoa(i) + } + + b.ResetTimer() + b.RunParallel(func(pb *testing.PB) { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + + for pb.Next() { + key := pubKeys[r.Intn(numKeys)] + meta := r.Uint64() + + if filter.allowLogin(key, meta) { + filter.addLogin(key, meta) + } + } + }) +} diff --git a/management/server/mock_server/account_mock.go b/management/server/mock_server/account_mock.go index 6f9c2696f..caba58c8b 100644 --- a/management/server/mock_server/account_mock.go +++ b/management/server/mock_server/account_mock.go @@ -121,8 +121,10 @@ type MockAccountManager struct { GetAccountOnboardingFunc func(ctx context.Context, accountID, userID string) (*types.AccountOnboarding, error) UpdateAccountOnboardingFunc func(ctx context.Context, accountID, userID string, onboarding *types.AccountOnboarding) (*types.AccountOnboarding, error) GetOrCreateAccountByPrivateDomainFunc func(ctx context.Context, initiatorId, domain string) (*types.Account, bool, error) - UpdateAccountPeersFunc func(ctx context.Context, accountID string) - BufferUpdateAccountPeersFunc func(ctx context.Context, accountID string) + + AllowSyncFunc func(string, uint64) bool + UpdateAccountPeersFunc func(ctx context.Context, accountID string) + BufferUpdateAccountPeersFunc func(ctx context.Context, accountID string) } func (am *MockAccountManager) CreateGroup(ctx context.Context, accountID, userID string, group *types.Group) error { @@ -953,3 +955,10 @@ func (am *MockAccountManager) GetCurrentUserInfo(ctx context.Context, userAuth n } return nil, status.Errorf(codes.Unimplemented, "method GetCurrentUserInfo is not implemented") } + +func (am *MockAccountManager) AllowSync(key string, hash uint64) bool { + if am.AllowSyncFunc != nil { + return am.AllowSyncFunc(key, hash) + } + return true +} diff --git a/management/server/peer_test.go b/management/server/peer_test.go index c4822aa62..df40014f0 100644 --- a/management/server/peer_test.go +++ b/management/server/peer_test.go @@ -1609,7 +1609,6 @@ func Test_LoginPeer(t *testing.T) { testCases := []struct { name string setupKey string - wireGuardPubKey string expectExtraDNSLabelsMismatch bool extraDNSLabels []string expectLoginError bool diff --git a/management/server/telemetry/grpc_metrics.go b/management/server/telemetry/grpc_metrics.go index 9840b4b32..d4301802f 100644 --- a/management/server/telemetry/grpc_metrics.go +++ b/management/server/telemetry/grpc_metrics.go @@ -15,8 +15,10 @@ const HighLatencyThreshold = time.Second * 7 type GRPCMetrics struct { meter metric.Meter syncRequestsCounter metric.Int64Counter + syncRequestsBlockedCounter metric.Int64Counter syncRequestHighLatencyCounter metric.Int64Counter loginRequestsCounter metric.Int64Counter + loginRequestsBlockedCounter metric.Int64Counter loginRequestHighLatencyCounter metric.Int64Counter getKeyRequestsCounter metric.Int64Counter activeStreamsGauge metric.Int64ObservableGauge @@ -36,6 +38,14 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro return nil, err } + syncRequestsBlockedCounter, err := meter.Int64Counter("management.grpc.sync.request.blocked.counter", + metric.WithUnit("1"), + metric.WithDescription("Number of sync gRPC requests from blocked peers"), + ) + if err != nil { + return nil, err + } + syncRequestHighLatencyCounter, err := meter.Int64Counter("management.grpc.sync.request.high.latency.counter", metric.WithUnit("1"), metric.WithDescription("Number of sync gRPC requests from the peers that took longer than the threshold to establish a connection and receive network map updates (update channel)"), @@ -52,6 +62,14 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro return nil, err } + loginRequestsBlockedCounter, err := meter.Int64Counter("management.grpc.login.request.blocked.counter", + metric.WithUnit("1"), + metric.WithDescription("Number of login gRPC requests from blocked peers"), + ) + if err != nil { + return nil, err + } + loginRequestHighLatencyCounter, err := meter.Int64Counter("management.grpc.login.request.high.latency.counter", metric.WithUnit("1"), metric.WithDescription("Number of login gRPC requests from the peers that took longer than the threshold to authenticate and receive initial configuration and relay credentials"), @@ -107,8 +125,10 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro return &GRPCMetrics{ meter: meter, syncRequestsCounter: syncRequestsCounter, + syncRequestsBlockedCounter: syncRequestsBlockedCounter, syncRequestHighLatencyCounter: syncRequestHighLatencyCounter, loginRequestsCounter: loginRequestsCounter, + loginRequestsBlockedCounter: loginRequestsBlockedCounter, loginRequestHighLatencyCounter: loginRequestHighLatencyCounter, getKeyRequestsCounter: getKeyRequestsCounter, activeStreamsGauge: activeStreamsGauge, @@ -124,6 +144,11 @@ func (grpcMetrics *GRPCMetrics) CountSyncRequest() { grpcMetrics.syncRequestsCounter.Add(grpcMetrics.ctx, 1) } +// CountSyncRequestBlocked counts the number of gRPC sync requests from blocked peers +func (grpcMetrics *GRPCMetrics) CountSyncRequestBlocked() { + grpcMetrics.syncRequestsBlockedCounter.Add(grpcMetrics.ctx, 1) +} + // CountGetKeyRequest counts the number of gRPC get server key requests coming to the gRPC API func (grpcMetrics *GRPCMetrics) CountGetKeyRequest() { grpcMetrics.getKeyRequestsCounter.Add(grpcMetrics.ctx, 1) @@ -134,6 +159,11 @@ func (grpcMetrics *GRPCMetrics) CountLoginRequest() { grpcMetrics.loginRequestsCounter.Add(grpcMetrics.ctx, 1) } +// CountLoginRequestBlocked counts the number of gRPC login requests from blocked peers +func (grpcMetrics *GRPCMetrics) CountLoginRequestBlocked() { + grpcMetrics.loginRequestsBlockedCounter.Add(grpcMetrics.ctx, 1) +} + // CountLoginRequestDuration counts the duration of the login gRPC requests func (grpcMetrics *GRPCMetrics) CountLoginRequestDuration(duration time.Duration, accountID string) { grpcMetrics.loginRequestDuration.Record(grpcMetrics.ctx, duration.Milliseconds()) diff --git a/shared/management/status/error.go b/shared/management/status/error.go index 7660174d6..52d27b062 100644 --- a/shared/management/status/error.go +++ b/shared/management/status/error.go @@ -42,7 +42,10 @@ const ( // Type is a type of the Error type Type int32 -var ErrExtraSettingsNotFound = fmt.Errorf("extra settings not found") +var ( + ErrExtraSettingsNotFound = errors.New("extra settings not found") + ErrPeerAlreadyLoggedIn = errors.New("peer with the same public key is already logged in") +) // Error is an internal error type Error struct {