mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-16 15:26:40 +00:00
[management] login filter to fix multiple peers connected with the same pub key (#3986)
This commit is contained in:
@@ -104,6 +104,8 @@ type DefaultAccountManager struct {
|
||||
accountUpdateLocks sync.Map
|
||||
updateAccountPeersBufferInterval atomic.Int64
|
||||
|
||||
loginFilter *loginFilter
|
||||
|
||||
disableDefaultPolicy bool
|
||||
}
|
||||
|
||||
@@ -211,6 +213,7 @@ func BuildManager(
|
||||
proxyController: proxyController,
|
||||
settingsManager: settingsManager,
|
||||
permissionsManager: permissionsManager,
|
||||
loginFilter: newLoginFilter(),
|
||||
disableDefaultPolicy: disableDefaultPolicy,
|
||||
}
|
||||
|
||||
@@ -1612,6 +1615,10 @@ func domainIsUpToDate(domain string, domainCategory string, userAuth nbcontext.U
|
||||
return domainCategory == types.PrivateCategory || userAuth.DomainCategory != types.PrivateCategory || domain != userAuth.Domain
|
||||
}
|
||||
|
||||
func (am *DefaultAccountManager) AllowSync(wgPubKey string, metahash uint64) bool {
|
||||
return am.loginFilter.allowLogin(wgPubKey, metahash)
|
||||
}
|
||||
|
||||
func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) {
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
@@ -1628,6 +1635,9 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID
|
||||
log.WithContext(ctx).Warnf("failed marking peer as connected %s %v", peerPubKey, err)
|
||||
}
|
||||
|
||||
metahash := metaHash(meta, realIP.String())
|
||||
am.loginFilter.addLogin(peerPubKey, metahash)
|
||||
|
||||
return peer, netMap, postureChecks, nil
|
||||
}
|
||||
|
||||
@@ -1636,7 +1646,6 @@ func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, account
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Warnf("failed marking peer as disconnected %s %v", peerPubKey, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -123,4 +123,5 @@ type Manager interface {
|
||||
UpdateToPrimaryAccount(ctx context.Context, accountId string) error
|
||||
GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error)
|
||||
GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error)
|
||||
AllowSync(string, uint64) bool
|
||||
}
|
||||
|
||||
@@ -2,9 +2,11 @@ package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/netip"
|
||||
"os"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -38,20 +40,28 @@ import (
|
||||
internalStatus "github.com/netbirdio/netbird/shared/management/status"
|
||||
)
|
||||
|
||||
const (
|
||||
envLogBlockedPeers = "NB_LOG_BLOCKED_PEERS"
|
||||
envBlockPeers = "NB_BLOCK_SAME_PEERS"
|
||||
)
|
||||
|
||||
// GRPCServer an instance of a Management gRPC API server
|
||||
type GRPCServer struct {
|
||||
accountManager account.Manager
|
||||
settingsManager settings.Manager
|
||||
wgKey wgtypes.Key
|
||||
proto.UnimplementedManagementServiceServer
|
||||
peersUpdateManager *PeersUpdateManager
|
||||
config *nbconfig.Config
|
||||
secretsManager SecretsManager
|
||||
appMetrics telemetry.AppMetrics
|
||||
ephemeralManager *EphemeralManager
|
||||
peerLocks sync.Map
|
||||
authManager auth.Manager
|
||||
integratedPeerValidator integrated_validator.IntegratedValidator
|
||||
peersUpdateManager *PeersUpdateManager
|
||||
config *nbconfig.Config
|
||||
secretsManager SecretsManager
|
||||
appMetrics telemetry.AppMetrics
|
||||
ephemeralManager *EphemeralManager
|
||||
peerLocks sync.Map
|
||||
authManager auth.Manager
|
||||
|
||||
logBlockedPeers bool
|
||||
blockPeersWithSameConfig bool
|
||||
integratedPeerValidator integrated_validator.IntegratedValidator
|
||||
}
|
||||
|
||||
// NewServer creates a new Management server
|
||||
@@ -82,18 +92,23 @@ func NewServer(
|
||||
}
|
||||
}
|
||||
|
||||
logBlockedPeers := strings.ToLower(os.Getenv(envLogBlockedPeers)) == "true"
|
||||
blockPeersWithSameConfig := strings.ToLower(os.Getenv(envBlockPeers)) == "true"
|
||||
|
||||
return &GRPCServer{
|
||||
wgKey: key,
|
||||
// peerKey -> event channel
|
||||
peersUpdateManager: peersUpdateManager,
|
||||
accountManager: accountManager,
|
||||
settingsManager: settingsManager,
|
||||
config: config,
|
||||
secretsManager: secretsManager,
|
||||
authManager: authManager,
|
||||
appMetrics: appMetrics,
|
||||
ephemeralManager: ephemeralManager,
|
||||
integratedPeerValidator: integratedPeerValidator,
|
||||
peersUpdateManager: peersUpdateManager,
|
||||
accountManager: accountManager,
|
||||
settingsManager: settingsManager,
|
||||
config: config,
|
||||
secretsManager: secretsManager,
|
||||
authManager: authManager,
|
||||
appMetrics: appMetrics,
|
||||
ephemeralManager: ephemeralManager,
|
||||
logBlockedPeers: logBlockedPeers,
|
||||
blockPeersWithSameConfig: blockPeersWithSameConfig,
|
||||
integratedPeerValidator: integratedPeerValidator,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -136,9 +151,6 @@ func getRealIP(ctx context.Context) net.IP {
|
||||
// notifies the connected peer of any updates (e.g. new peers under the same account)
|
||||
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
|
||||
reqStart := time.Now()
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.GRPCMetrics().CountSyncRequest()
|
||||
}
|
||||
|
||||
ctx := srv.Context()
|
||||
|
||||
@@ -147,6 +159,25 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
realIP := getRealIP(ctx)
|
||||
sRealIP := realIP.String()
|
||||
peerMeta := extractPeerMeta(ctx, syncReq.GetMeta())
|
||||
metahashed := metaHash(peerMeta, sRealIP)
|
||||
if !s.accountManager.AllowSync(peerKey.String(), metahashed) {
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.GRPCMetrics().CountSyncRequestBlocked()
|
||||
}
|
||||
if s.logBlockedPeers {
|
||||
log.WithContext(ctx).Warnf("peer %s with meta hash %d is blocked from syncing", peerKey.String(), metahashed)
|
||||
}
|
||||
if s.blockPeersWithSameConfig {
|
||||
return mapError(ctx, internalStatus.ErrPeerAlreadyLoggedIn)
|
||||
}
|
||||
}
|
||||
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.GRPCMetrics().CountSyncRequest()
|
||||
}
|
||||
|
||||
// nolint:staticcheck
|
||||
ctx = context.WithValue(ctx, nbContext.PeerIDKey, peerKey.String())
|
||||
@@ -172,14 +203,13 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
|
||||
// nolint:staticcheck
|
||||
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
|
||||
|
||||
realIP := getRealIP(ctx)
|
||||
log.WithContext(ctx).Debugf("Sync request from peer [%s] [%s]", req.WgPubKey, realIP.String())
|
||||
log.WithContext(ctx).Debugf("Sync request from peer [%s] [%s]", req.WgPubKey, sRealIP)
|
||||
|
||||
if syncReq.GetMeta() == nil {
|
||||
log.WithContext(ctx).Tracef("peer system meta has to be provided on sync. Peer %s, remote addr %s", peerKey.String(), realIP)
|
||||
}
|
||||
|
||||
peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), extractPeerMeta(ctx, syncReq.GetMeta()), realIP)
|
||||
peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), peerMeta, realIP)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err)
|
||||
return mapError(ctx, err)
|
||||
@@ -345,6 +375,9 @@ func mapError(ctx context.Context, err error) error {
|
||||
default:
|
||||
}
|
||||
}
|
||||
if errors.Is(err, internalStatus.ErrPeerAlreadyLoggedIn) {
|
||||
return status.Error(codes.PermissionDenied, internalStatus.ErrPeerAlreadyLoggedIn.Error())
|
||||
}
|
||||
log.WithContext(ctx).Errorf("got an unhandled error: %s", err)
|
||||
return status.Errorf(codes.Internal, "failed handling request")
|
||||
}
|
||||
@@ -436,12 +469,9 @@ func (s *GRPCServer) parseRequest(ctx context.Context, req *proto.EncryptedMessa
|
||||
// In case of the successful registration login is also successful
|
||||
func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
|
||||
reqStart := time.Now()
|
||||
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.GRPCMetrics().CountLoginRequest()
|
||||
}
|
||||
realIP := getRealIP(ctx)
|
||||
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, realIP.String())
|
||||
sRealIP := realIP.String()
|
||||
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, sRealIP)
|
||||
|
||||
loginReq := &proto.LoginRequest{}
|
||||
peerKey, err := s.parseRequest(ctx, req, loginReq)
|
||||
@@ -449,6 +479,24 @@ func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*p
|
||||
return nil, err
|
||||
}
|
||||
|
||||
peerMeta := extractPeerMeta(ctx, loginReq.GetMeta())
|
||||
metahashed := metaHash(peerMeta, sRealIP)
|
||||
if !s.accountManager.AllowSync(peerKey.String(), metahashed) {
|
||||
if s.logBlockedPeers {
|
||||
log.WithContext(ctx).Warnf("peer %s with meta hash %d is blocked from login", peerKey.String(), metahashed)
|
||||
}
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.GRPCMetrics().CountLoginRequestBlocked()
|
||||
}
|
||||
if s.blockPeersWithSameConfig {
|
||||
return nil, internalStatus.ErrPeerAlreadyLoggedIn
|
||||
}
|
||||
}
|
||||
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.GRPCMetrics().CountLoginRequest()
|
||||
}
|
||||
|
||||
//nolint
|
||||
ctx = context.WithValue(ctx, nbContext.PeerIDKey, peerKey.String())
|
||||
accountID, err := s.accountManager.GetAccountIDForPeerKey(ctx, peerKey.String())
|
||||
@@ -485,7 +533,7 @@ func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*p
|
||||
peer, netMap, postureChecks, err := s.accountManager.LoginPeer(ctx, types.PeerLogin{
|
||||
WireGuardPubKey: peerKey.String(),
|
||||
SSHKey: string(sshKey),
|
||||
Meta: extractPeerMeta(ctx, loginReq.GetMeta()),
|
||||
Meta: peerMeta,
|
||||
UserID: userID,
|
||||
SetupKey: loginReq.GetSetupKey(),
|
||||
ConnectionIP: realIP,
|
||||
|
||||
160
management/server/loginfilter.go
Normal file
160
management/server/loginfilter.go
Normal file
@@ -0,0 +1,160 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"hash/fnv"
|
||||
"math"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||
)
|
||||
|
||||
const (
|
||||
reconnThreshold = 5 * time.Minute
|
||||
baseBlockDuration = 10 * time.Minute // Duration for which a peer is banned after exceeding the reconnection limit
|
||||
reconnLimitForBan = 30 // Number of reconnections within the reconnTreshold that triggers a ban
|
||||
metaChangeLimit = 3 // Number of reconnections with different metadata that triggers a ban of one peer
|
||||
)
|
||||
|
||||
type lfConfig struct {
|
||||
reconnThreshold time.Duration
|
||||
baseBlockDuration time.Duration
|
||||
reconnLimitForBan int
|
||||
metaChangeLimit int
|
||||
}
|
||||
|
||||
func initCfg() *lfConfig {
|
||||
return &lfConfig{
|
||||
reconnThreshold: reconnThreshold,
|
||||
baseBlockDuration: baseBlockDuration,
|
||||
reconnLimitForBan: reconnLimitForBan,
|
||||
metaChangeLimit: metaChangeLimit,
|
||||
}
|
||||
}
|
||||
|
||||
type loginFilter struct {
|
||||
mu sync.RWMutex
|
||||
cfg *lfConfig
|
||||
logged map[string]*peerState
|
||||
}
|
||||
|
||||
type peerState struct {
|
||||
currentHash uint64
|
||||
sessionCounter int
|
||||
sessionStart time.Time
|
||||
lastSeen time.Time
|
||||
isBanned bool
|
||||
banLevel int
|
||||
banExpiresAt time.Time
|
||||
metaChangeCounter int
|
||||
metaChangeWindowStart time.Time
|
||||
}
|
||||
|
||||
func newLoginFilter() *loginFilter {
|
||||
return newLoginFilterWithCfg(initCfg())
|
||||
}
|
||||
|
||||
func newLoginFilterWithCfg(cfg *lfConfig) *loginFilter {
|
||||
return &loginFilter{
|
||||
logged: make(map[string]*peerState),
|
||||
cfg: cfg,
|
||||
}
|
||||
}
|
||||
|
||||
func (l *loginFilter) allowLogin(wgPubKey string, metaHash uint64) bool {
|
||||
l.mu.RLock()
|
||||
defer func() {
|
||||
l.mu.RUnlock()
|
||||
}()
|
||||
state, ok := l.logged[wgPubKey]
|
||||
if !ok {
|
||||
return true
|
||||
}
|
||||
if state.isBanned && time.Now().Before(state.banExpiresAt) {
|
||||
return false
|
||||
}
|
||||
if metaHash != state.currentHash {
|
||||
if time.Now().Before(state.metaChangeWindowStart.Add(l.cfg.reconnThreshold)) && state.metaChangeCounter >= l.cfg.metaChangeLimit {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (l *loginFilter) addLogin(wgPubKey string, metaHash uint64) {
|
||||
now := time.Now()
|
||||
l.mu.Lock()
|
||||
defer func() {
|
||||
l.mu.Unlock()
|
||||
}()
|
||||
|
||||
state, ok := l.logged[wgPubKey]
|
||||
|
||||
if !ok {
|
||||
l.logged[wgPubKey] = &peerState{
|
||||
currentHash: metaHash,
|
||||
sessionCounter: 1,
|
||||
sessionStart: now,
|
||||
lastSeen: now,
|
||||
metaChangeWindowStart: now,
|
||||
metaChangeCounter: 1,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
if state.isBanned && now.After(state.banExpiresAt) {
|
||||
state.isBanned = false
|
||||
}
|
||||
|
||||
if state.banLevel > 0 && now.Sub(state.lastSeen) > (2*l.cfg.baseBlockDuration) {
|
||||
state.banLevel = 0
|
||||
}
|
||||
|
||||
if metaHash != state.currentHash {
|
||||
if now.After(state.metaChangeWindowStart.Add(l.cfg.reconnThreshold)) {
|
||||
state.metaChangeWindowStart = now
|
||||
state.metaChangeCounter = 1
|
||||
} else {
|
||||
state.metaChangeCounter++
|
||||
}
|
||||
state.currentHash = metaHash
|
||||
state.sessionCounter = 1
|
||||
state.sessionStart = now
|
||||
state.lastSeen = now
|
||||
return
|
||||
}
|
||||
|
||||
state.sessionCounter++
|
||||
if state.sessionCounter > l.cfg.reconnLimitForBan && now.Sub(state.sessionStart) < l.cfg.reconnThreshold {
|
||||
state.isBanned = true
|
||||
state.banLevel++
|
||||
|
||||
backoffFactor := math.Pow(2, float64(state.banLevel-1))
|
||||
duration := time.Duration(float64(l.cfg.baseBlockDuration) * backoffFactor)
|
||||
state.banExpiresAt = now.Add(duration)
|
||||
|
||||
state.sessionCounter = 0
|
||||
state.sessionStart = now
|
||||
}
|
||||
state.lastSeen = now
|
||||
}
|
||||
|
||||
func metaHash(meta nbpeer.PeerSystemMeta, pubip string) uint64 {
|
||||
h := fnv.New64a()
|
||||
|
||||
h.Write([]byte(meta.WtVersion))
|
||||
h.Write([]byte(meta.OSVersion))
|
||||
h.Write([]byte(meta.KernelVersion))
|
||||
h.Write([]byte(meta.Hostname))
|
||||
h.Write([]byte(meta.SystemSerialNumber))
|
||||
h.Write([]byte(pubip))
|
||||
|
||||
macs := uint64(0)
|
||||
for _, na := range meta.NetworkAddresses {
|
||||
for _, r := range na.Mac {
|
||||
macs += uint64(r)
|
||||
}
|
||||
}
|
||||
|
||||
return h.Sum64() + macs
|
||||
}
|
||||
275
management/server/loginfilter_test.go
Normal file
275
management/server/loginfilter_test.go
Normal file
@@ -0,0 +1,275 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"hash/fnv"
|
||||
"math"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/suite"
|
||||
|
||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||
)
|
||||
|
||||
func testAdvancedCfg() *lfConfig {
|
||||
return &lfConfig{
|
||||
reconnThreshold: 50 * time.Millisecond,
|
||||
baseBlockDuration: 100 * time.Millisecond,
|
||||
reconnLimitForBan: 3,
|
||||
metaChangeLimit: 2,
|
||||
}
|
||||
}
|
||||
|
||||
type LoginFilterTestSuite struct {
|
||||
suite.Suite
|
||||
filter *loginFilter
|
||||
}
|
||||
|
||||
func (s *LoginFilterTestSuite) SetupTest() {
|
||||
s.filter = newLoginFilterWithCfg(testAdvancedCfg())
|
||||
}
|
||||
|
||||
func TestLoginFilterTestSuite(t *testing.T) {
|
||||
suite.Run(t, new(LoginFilterTestSuite))
|
||||
}
|
||||
|
||||
func (s *LoginFilterTestSuite) TestFirstLoginIsAlwaysAllowed() {
|
||||
pubKey := "PUB_KEY_A"
|
||||
meta := uint64(1)
|
||||
|
||||
s.True(s.filter.allowLogin(pubKey, meta))
|
||||
|
||||
s.filter.addLogin(pubKey, meta)
|
||||
s.Require().Contains(s.filter.logged, pubKey)
|
||||
s.Equal(1, s.filter.logged[pubKey].sessionCounter)
|
||||
}
|
||||
|
||||
func (s *LoginFilterTestSuite) TestFlappingSameHashTriggersBan() {
|
||||
pubKey := "PUB_KEY_A"
|
||||
meta := uint64(1)
|
||||
limit := s.filter.cfg.reconnLimitForBan
|
||||
|
||||
for i := 0; i <= limit; i++ {
|
||||
s.filter.addLogin(pubKey, meta)
|
||||
}
|
||||
|
||||
s.False(s.filter.allowLogin(pubKey, meta))
|
||||
s.Require().Contains(s.filter.logged, pubKey)
|
||||
s.True(s.filter.logged[pubKey].isBanned)
|
||||
}
|
||||
|
||||
func (s *LoginFilterTestSuite) TestBanDurationIncreasesExponentially() {
|
||||
pubKey := "PUB_KEY_A"
|
||||
meta := uint64(1)
|
||||
limit := s.filter.cfg.reconnLimitForBan
|
||||
baseBan := s.filter.cfg.baseBlockDuration
|
||||
|
||||
for i := 0; i <= limit; i++ {
|
||||
s.filter.addLogin(pubKey, meta)
|
||||
}
|
||||
s.Require().Contains(s.filter.logged, pubKey)
|
||||
s.True(s.filter.logged[pubKey].isBanned)
|
||||
s.Equal(1, s.filter.logged[pubKey].banLevel)
|
||||
firstBanDuration := s.filter.logged[pubKey].banExpiresAt.Sub(s.filter.logged[pubKey].lastSeen)
|
||||
s.InDelta(baseBan, firstBanDuration, float64(time.Millisecond))
|
||||
|
||||
s.filter.logged[pubKey].banExpiresAt = time.Now().Add(-time.Second)
|
||||
s.filter.logged[pubKey].isBanned = false
|
||||
|
||||
for i := 0; i <= limit; i++ {
|
||||
s.filter.addLogin(pubKey, meta)
|
||||
}
|
||||
s.True(s.filter.logged[pubKey].isBanned)
|
||||
s.Equal(2, s.filter.logged[pubKey].banLevel)
|
||||
secondBanDuration := s.filter.logged[pubKey].banExpiresAt.Sub(s.filter.logged[pubKey].lastSeen)
|
||||
expectedSecondDuration := time.Duration(float64(baseBan) * math.Pow(2, 1))
|
||||
s.InDelta(expectedSecondDuration, secondBanDuration, float64(time.Millisecond))
|
||||
}
|
||||
|
||||
func (s *LoginFilterTestSuite) TestPeerIsAllowedAfterBanExpires() {
|
||||
pubKey := "PUB_KEY_A"
|
||||
meta := uint64(1)
|
||||
|
||||
s.filter.logged[pubKey] = &peerState{
|
||||
isBanned: true,
|
||||
banExpiresAt: time.Now().Add(-(s.filter.cfg.baseBlockDuration + time.Second)),
|
||||
}
|
||||
|
||||
s.True(s.filter.allowLogin(pubKey, meta))
|
||||
|
||||
s.filter.addLogin(pubKey, meta)
|
||||
s.Require().Contains(s.filter.logged, pubKey)
|
||||
s.False(s.filter.logged[pubKey].isBanned)
|
||||
}
|
||||
|
||||
func (s *LoginFilterTestSuite) TestBanLevelResetsAfterGoodBehavior() {
|
||||
pubKey := "PUB_KEY_A"
|
||||
meta := uint64(1)
|
||||
|
||||
s.filter.logged[pubKey] = &peerState{
|
||||
currentHash: meta,
|
||||
banLevel: 3,
|
||||
lastSeen: time.Now().Add(-3 * s.filter.cfg.baseBlockDuration),
|
||||
}
|
||||
|
||||
s.filter.addLogin(pubKey, meta)
|
||||
s.Require().Contains(s.filter.logged, pubKey)
|
||||
s.Equal(0, s.filter.logged[pubKey].banLevel)
|
||||
}
|
||||
|
||||
func (s *LoginFilterTestSuite) TestFlappingDifferentHashesTriggersBlock() {
|
||||
pubKey := "PUB_KEY_A"
|
||||
limit := s.filter.cfg.metaChangeLimit
|
||||
|
||||
for i := range limit {
|
||||
s.filter.addLogin(pubKey, uint64(i+1))
|
||||
}
|
||||
|
||||
s.Require().Contains(s.filter.logged, pubKey)
|
||||
s.Equal(limit, s.filter.logged[pubKey].metaChangeCounter)
|
||||
|
||||
isAllowed := s.filter.allowLogin(pubKey, uint64(limit+1))
|
||||
|
||||
s.False(isAllowed, "should block new meta hash after limit is reached")
|
||||
}
|
||||
|
||||
func (s *LoginFilterTestSuite) TestMetaChangeIsAllowedAfterWindowResets() {
|
||||
pubKey := "PUB_KEY_A"
|
||||
meta1 := uint64(1)
|
||||
meta2 := uint64(2)
|
||||
meta3 := uint64(3)
|
||||
|
||||
s.filter.addLogin(pubKey, meta1)
|
||||
s.filter.addLogin(pubKey, meta2)
|
||||
s.Require().Contains(s.filter.logged, pubKey)
|
||||
s.Equal(s.filter.cfg.metaChangeLimit, s.filter.logged[pubKey].metaChangeCounter)
|
||||
s.False(s.filter.allowLogin(pubKey, meta3), "should be blocked inside window")
|
||||
|
||||
s.filter.logged[pubKey].metaChangeWindowStart = time.Now().Add(-(s.filter.cfg.reconnThreshold + time.Second))
|
||||
|
||||
s.True(s.filter.allowLogin(pubKey, meta3), "should be allowed after window expires")
|
||||
|
||||
s.filter.addLogin(pubKey, meta3)
|
||||
s.Equal(1, s.filter.logged[pubKey].metaChangeCounter, "meta change counter should reset")
|
||||
}
|
||||
|
||||
func BenchmarkHashingMethods(b *testing.B) {
|
||||
meta := nbpeer.PeerSystemMeta{
|
||||
WtVersion: "1.25.1",
|
||||
OSVersion: "Ubuntu 22.04.3 LTS",
|
||||
KernelVersion: "5.15.0-76-generic",
|
||||
Hostname: "prod-server-database-01",
|
||||
SystemSerialNumber: "PC-1234567890",
|
||||
NetworkAddresses: []nbpeer.NetworkAddress{{Mac: "00:1B:44:11:3A:B7"}, {Mac: "00:1B:44:11:3A:B8"}},
|
||||
}
|
||||
pubip := "8.8.8.8"
|
||||
|
||||
var resultString string
|
||||
var resultUint uint64
|
||||
|
||||
b.Run("BuilderString", func(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
resultString = builderString(meta, pubip)
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("FnvHashToString", func(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
resultString = fnvHashToString(meta, pubip)
|
||||
}
|
||||
})
|
||||
|
||||
b.Run("FnvHashToUint64 - used", func(b *testing.B) {
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
resultUint = metaHash(meta, pubip)
|
||||
}
|
||||
})
|
||||
|
||||
_ = resultString
|
||||
_ = resultUint
|
||||
}
|
||||
|
||||
func fnvHashToString(meta nbpeer.PeerSystemMeta, pubip string) string {
|
||||
h := fnv.New64a()
|
||||
|
||||
if len(meta.NetworkAddresses) != 0 {
|
||||
for _, na := range meta.NetworkAddresses {
|
||||
h.Write([]byte(na.Mac))
|
||||
}
|
||||
}
|
||||
|
||||
h.Write([]byte(meta.WtVersion))
|
||||
h.Write([]byte(meta.OSVersion))
|
||||
h.Write([]byte(meta.KernelVersion))
|
||||
h.Write([]byte(meta.Hostname))
|
||||
h.Write([]byte(meta.SystemSerialNumber))
|
||||
h.Write([]byte(pubip))
|
||||
|
||||
return strconv.FormatUint(h.Sum64(), 16)
|
||||
}
|
||||
|
||||
func builderString(meta nbpeer.PeerSystemMeta, pubip string) string {
|
||||
mac := getMacAddress(meta.NetworkAddresses)
|
||||
estimatedSize := len(meta.WtVersion) + len(meta.OSVersion) + len(meta.KernelVersion) + len(meta.Hostname) + len(meta.SystemSerialNumber) +
|
||||
len(pubip) + len(mac) + 6
|
||||
|
||||
var b strings.Builder
|
||||
b.Grow(estimatedSize)
|
||||
|
||||
b.WriteString(meta.WtVersion)
|
||||
b.WriteByte('|')
|
||||
b.WriteString(meta.OSVersion)
|
||||
b.WriteByte('|')
|
||||
b.WriteString(meta.KernelVersion)
|
||||
b.WriteByte('|')
|
||||
b.WriteString(meta.Hostname)
|
||||
b.WriteByte('|')
|
||||
b.WriteString(meta.SystemSerialNumber)
|
||||
b.WriteByte('|')
|
||||
b.WriteString(pubip)
|
||||
|
||||
return b.String()
|
||||
}
|
||||
|
||||
func getMacAddress(nas []nbpeer.NetworkAddress) string {
|
||||
if len(nas) == 0 {
|
||||
return ""
|
||||
}
|
||||
macs := make([]string, 0, len(nas))
|
||||
for _, na := range nas {
|
||||
macs = append(macs, na.Mac)
|
||||
}
|
||||
return strings.Join(macs, "/")
|
||||
}
|
||||
|
||||
func BenchmarkLoginFilter_ParallelLoad(b *testing.B) {
|
||||
filter := newLoginFilterWithCfg(testAdvancedCfg())
|
||||
numKeys := 100000
|
||||
pubKeys := make([]string, numKeys)
|
||||
for i := range numKeys {
|
||||
pubKeys[i] = "PUB_KEY_" + strconv.Itoa(i)
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
r := rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
for pb.Next() {
|
||||
key := pubKeys[r.Intn(numKeys)]
|
||||
meta := r.Uint64()
|
||||
|
||||
if filter.allowLogin(key, meta) {
|
||||
filter.addLogin(key, meta)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
@@ -121,8 +121,10 @@ type MockAccountManager struct {
|
||||
GetAccountOnboardingFunc func(ctx context.Context, accountID, userID string) (*types.AccountOnboarding, error)
|
||||
UpdateAccountOnboardingFunc func(ctx context.Context, accountID, userID string, onboarding *types.AccountOnboarding) (*types.AccountOnboarding, error)
|
||||
GetOrCreateAccountByPrivateDomainFunc func(ctx context.Context, initiatorId, domain string) (*types.Account, bool, error)
|
||||
UpdateAccountPeersFunc func(ctx context.Context, accountID string)
|
||||
BufferUpdateAccountPeersFunc func(ctx context.Context, accountID string)
|
||||
|
||||
AllowSyncFunc func(string, uint64) bool
|
||||
UpdateAccountPeersFunc func(ctx context.Context, accountID string)
|
||||
BufferUpdateAccountPeersFunc func(ctx context.Context, accountID string)
|
||||
}
|
||||
|
||||
func (am *MockAccountManager) CreateGroup(ctx context.Context, accountID, userID string, group *types.Group) error {
|
||||
@@ -953,3 +955,10 @@ func (am *MockAccountManager) GetCurrentUserInfo(ctx context.Context, userAuth n
|
||||
}
|
||||
return nil, status.Errorf(codes.Unimplemented, "method GetCurrentUserInfo is not implemented")
|
||||
}
|
||||
|
||||
func (am *MockAccountManager) AllowSync(key string, hash uint64) bool {
|
||||
if am.AllowSyncFunc != nil {
|
||||
return am.AllowSyncFunc(key, hash)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -1609,7 +1609,6 @@ func Test_LoginPeer(t *testing.T) {
|
||||
testCases := []struct {
|
||||
name string
|
||||
setupKey string
|
||||
wireGuardPubKey string
|
||||
expectExtraDNSLabelsMismatch bool
|
||||
extraDNSLabels []string
|
||||
expectLoginError bool
|
||||
|
||||
@@ -15,8 +15,10 @@ const HighLatencyThreshold = time.Second * 7
|
||||
type GRPCMetrics struct {
|
||||
meter metric.Meter
|
||||
syncRequestsCounter metric.Int64Counter
|
||||
syncRequestsBlockedCounter metric.Int64Counter
|
||||
syncRequestHighLatencyCounter metric.Int64Counter
|
||||
loginRequestsCounter metric.Int64Counter
|
||||
loginRequestsBlockedCounter metric.Int64Counter
|
||||
loginRequestHighLatencyCounter metric.Int64Counter
|
||||
getKeyRequestsCounter metric.Int64Counter
|
||||
activeStreamsGauge metric.Int64ObservableGauge
|
||||
@@ -36,6 +38,14 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro
|
||||
return nil, err
|
||||
}
|
||||
|
||||
syncRequestsBlockedCounter, err := meter.Int64Counter("management.grpc.sync.request.blocked.counter",
|
||||
metric.WithUnit("1"),
|
||||
metric.WithDescription("Number of sync gRPC requests from blocked peers"),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
syncRequestHighLatencyCounter, err := meter.Int64Counter("management.grpc.sync.request.high.latency.counter",
|
||||
metric.WithUnit("1"),
|
||||
metric.WithDescription("Number of sync gRPC requests from the peers that took longer than the threshold to establish a connection and receive network map updates (update channel)"),
|
||||
@@ -52,6 +62,14 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro
|
||||
return nil, err
|
||||
}
|
||||
|
||||
loginRequestsBlockedCounter, err := meter.Int64Counter("management.grpc.login.request.blocked.counter",
|
||||
metric.WithUnit("1"),
|
||||
metric.WithDescription("Number of login gRPC requests from blocked peers"),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
loginRequestHighLatencyCounter, err := meter.Int64Counter("management.grpc.login.request.high.latency.counter",
|
||||
metric.WithUnit("1"),
|
||||
metric.WithDescription("Number of login gRPC requests from the peers that took longer than the threshold to authenticate and receive initial configuration and relay credentials"),
|
||||
@@ -107,8 +125,10 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro
|
||||
return &GRPCMetrics{
|
||||
meter: meter,
|
||||
syncRequestsCounter: syncRequestsCounter,
|
||||
syncRequestsBlockedCounter: syncRequestsBlockedCounter,
|
||||
syncRequestHighLatencyCounter: syncRequestHighLatencyCounter,
|
||||
loginRequestsCounter: loginRequestsCounter,
|
||||
loginRequestsBlockedCounter: loginRequestsBlockedCounter,
|
||||
loginRequestHighLatencyCounter: loginRequestHighLatencyCounter,
|
||||
getKeyRequestsCounter: getKeyRequestsCounter,
|
||||
activeStreamsGauge: activeStreamsGauge,
|
||||
@@ -124,6 +144,11 @@ func (grpcMetrics *GRPCMetrics) CountSyncRequest() {
|
||||
grpcMetrics.syncRequestsCounter.Add(grpcMetrics.ctx, 1)
|
||||
}
|
||||
|
||||
// CountSyncRequestBlocked counts the number of gRPC sync requests from blocked peers
|
||||
func (grpcMetrics *GRPCMetrics) CountSyncRequestBlocked() {
|
||||
grpcMetrics.syncRequestsBlockedCounter.Add(grpcMetrics.ctx, 1)
|
||||
}
|
||||
|
||||
// CountGetKeyRequest counts the number of gRPC get server key requests coming to the gRPC API
|
||||
func (grpcMetrics *GRPCMetrics) CountGetKeyRequest() {
|
||||
grpcMetrics.getKeyRequestsCounter.Add(grpcMetrics.ctx, 1)
|
||||
@@ -134,6 +159,11 @@ func (grpcMetrics *GRPCMetrics) CountLoginRequest() {
|
||||
grpcMetrics.loginRequestsCounter.Add(grpcMetrics.ctx, 1)
|
||||
}
|
||||
|
||||
// CountLoginRequestBlocked counts the number of gRPC login requests from blocked peers
|
||||
func (grpcMetrics *GRPCMetrics) CountLoginRequestBlocked() {
|
||||
grpcMetrics.loginRequestsBlockedCounter.Add(grpcMetrics.ctx, 1)
|
||||
}
|
||||
|
||||
// CountLoginRequestDuration counts the duration of the login gRPC requests
|
||||
func (grpcMetrics *GRPCMetrics) CountLoginRequestDuration(duration time.Duration, accountID string) {
|
||||
grpcMetrics.loginRequestDuration.Record(grpcMetrics.ctx, duration.Milliseconds())
|
||||
|
||||
@@ -42,7 +42,10 @@ const (
|
||||
// Type is a type of the Error
|
||||
type Type int32
|
||||
|
||||
var ErrExtraSettingsNotFound = fmt.Errorf("extra settings not found")
|
||||
var (
|
||||
ErrExtraSettingsNotFound = errors.New("extra settings not found")
|
||||
ErrPeerAlreadyLoggedIn = errors.New("peer with the same public key is already logged in")
|
||||
)
|
||||
|
||||
// Error is an internal error
|
||||
type Error struct {
|
||||
|
||||
Reference in New Issue
Block a user