diff --git a/go.mod b/go.mod index a12058278..6ddd0dac5 100644 --- a/go.mod +++ b/go.mod @@ -105,6 +105,7 @@ require ( golang.org/x/oauth2 v0.24.0 golang.org/x/sync v0.13.0 golang.org/x/term v0.31.0 + golang.org/x/time v0.5.0 google.golang.org/api v0.177.0 gopkg.in/yaml.v3 v3.0.1 gorm.io/driver/mysql v1.5.7 @@ -240,7 +241,6 @@ require ( golang.org/x/image v0.18.0 // indirect golang.org/x/mod v0.17.0 // indirect golang.org/x/text v0.24.0 // indirect - golang.org/x/time v0.5.0 // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240509183442-62759503f434 // indirect diff --git a/management/client/client_test.go b/management/client/client_test.go index c163d1833..1847af73e 100644 --- a/management/client/client_test.go +++ b/management/client/client_test.go @@ -87,6 +87,12 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) { ). Return(&types.Settings{}, nil). AnyTimes() + settingsMockManager. + EXPECT(). + GetExtraSettings(gomock.Any(), gomock.Any()). + Return(&types.ExtraSettings{}, nil). + AnyTimes() + permissionsManagerMock := permissions.NewMockManager(ctrl) permissionsManagerMock. EXPECT(). 
diff --git a/management/server/account/manager.go b/management/server/account/manager.go index ed17fa5ec..f8aa2756a 100644 --- a/management/server/account/manager.go +++ b/management/server/account/manager.go @@ -112,6 +112,7 @@ type Manager interface { GetAccountSettings(ctx context.Context, accountID string, userID string) (*types.Settings, error) DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error UpdateAccountPeers(ctx context.Context, accountID string) + BufferUpdateAccountPeers(ctx context.Context, accountID string) BuildUserInfosForAccount(ctx context.Context, accountID, initiatorUserID string, accountUsers []*types.User) (map[string]*types.UserInfo, error) SyncUserJWTGroups(ctx context.Context, userAuth nbcontext.UserAuth) error GetStore() store.Store diff --git a/management/server/dns_test.go b/management/server/dns_test.go index 02bb042d7..31c944a25 100644 --- a/management/server/dns_test.go +++ b/management/server/dns_test.go @@ -216,6 +216,8 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) { t.Cleanup(ctrl.Finish) settingsMockManager := settings.NewMockManager(ctrl) + // return empty extra settings for expected calls to UpdateAccountPeers + settingsMockManager.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(&types.ExtraSettings{}, nil).AnyTimes() permissionsManager := permissions.NewManager(store) return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) } diff --git a/management/server/ephemeral.go b/management/server/ephemeral.go index 3cb9b7536..4bf7dba2b 100644 --- a/management/server/ephemeral.go +++ b/management/server/ephemeral.go @@ -5,10 +5,12 @@ import ( "sync" "time" + "github.com/google/uuid" log "github.com/sirupsen/logrus" nbAccount "github.com/netbirdio/netbird/management/server/account" 
"github.com/netbirdio/netbird/management/server/activity" + nbContext "github.com/netbirdio/netbird/management/server/context" nbpeer "github.com/netbirdio/netbird/management/server/peer" "github.com/netbirdio/netbird/management/server/store" ) @@ -138,6 +140,9 @@ func (e *EphemeralManager) loadEphemeralPeers(ctx context.Context) { func (e *EphemeralManager) cleanup(ctx context.Context) { log.Tracef("on ephemeral cleanup") + reqID := uuid.New().String() + //nolint + ctx = context.WithValue(ctx, nbContext.RequestIDKey, reqID) deletePeers := make(map[string]*ephemeralPeer) e.peersLock.Lock() @@ -164,13 +169,21 @@ func (e *EphemeralManager) cleanup(ctx context.Context) { e.peersLock.Unlock() + bufferAccountCall := make(map[string]struct{}) + for id, p := range deletePeers { log.WithContext(ctx).Debugf("delete ephemeral peer: %s", id) err := e.accountManager.DeletePeer(ctx, p.accountID, id, activity.SystemInitiator) if err != nil { log.WithContext(ctx).Errorf("failed to delete ephemeral peer: %s", err) + } else { + bufferAccountCall[p.accountID] = struct{}{} } } + for accountID := range bufferAccountCall { + log.WithContext(ctx).Debugf("ephemeral - buffer update account peers for account: %s", accountID) + e.accountManager.BufferUpdateAccountPeers(ctx, accountID) + } } func (e *EphemeralManager) addPeer(accountID string, peerID string, deadline time.Time) { diff --git a/management/server/ephemeral_test.go b/management/server/ephemeral_test.go index 3cf6ae7f3..7dbecf2da 100644 --- a/management/server/ephemeral_test.go +++ b/management/server/ephemeral_test.go @@ -37,6 +37,10 @@ func (a MocAccountManager) DeletePeer(_ context.Context, accountID, peerID, user return nil //nolint:nil } +func (a MocAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) { + // noop +} + func (a MocAccountManager) GetStore() store.Store { return a.store } diff --git a/management/server/grpcserver.go b/management/server/grpcserver.go index 2b27f9e0f..a9fb60c21 100644 
--- a/management/server/grpcserver.go
+++ b/management/server/grpcserver.go
@@ -5,6 +5,8 @@ import (
 	"fmt"
 	"net"
 	"net/netip"
+	"os"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -13,6 +15,7 @@ import (
 	"github.com/golang/protobuf/ptypes/timestamp"
 	"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
 	log "github.com/sirupsen/logrus"
+	"golang.org/x/time/rate"
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/peer"
@@ -47,6 +50,10 @@ type GRPCServer struct {
 	ephemeralManager  *EphemeralManager
 	peerLocks         sync.Map
 	authManager       auth.Manager
+	syncLimiter       *rate.Limiter
+	loginLimiterStore sync.Map
+	loginPeerBooster  int
+	loginPeerLimit    rate.Limit
 }
 
 // NewServer creates a new Management server
@@ -76,6 +83,26 @@ func NewServer(
 		}
 	}
 
+	// Rate limits are configured through the environment. Unset, malformed or
+	// non-positive values fall back to the defaults below.
+	loginPeerInterval := time.Minute
+	if d, parseErr := time.ParseDuration(os.Getenv("NB_LOGIN_RATE")); parseErr == nil && d > 0 {
+		loginPeerInterval = d
+	}
+
+	loginBurst := positiveIntFromEnv("NB_LOGIN_BURST", 200)
+	log.WithContext(ctx).Infof("login burst limit set to %d", loginBurst)
+
+	// The per-peer login rate is counted per NB_LOGIN_RATE interval (one minute by default).
+	loginPeerRate := positiveIntFromEnv("NB_LOGIN_PEER_RATE_PER_M", 200)
+	log.WithContext(ctx).Infof("login rate limit per peer set to %d per %s", loginPeerRate, loginPeerInterval)
+
+	syncRatePerM := positiveIntFromEnv("NB_SYNC_RATE_PER_M", 20000)
+	log.WithContext(ctx).Infof("sync rate limit set to %d/min", syncRatePerM)
+
+	syncBurst := positiveIntFromEnv("NB_SYNC_BURST", 30000)
+	log.WithContext(ctx).Infof("sync burst limit set to %d", syncBurst)
+
 	return &GRPCServer{
 		wgKey: key,
 		// peerKey -> event channel
@@ -87,6 +114,19 @@ func NewServer(
 		authManager:      authManager,
 		appMetrics:       appMetrics,
 		ephemeralManager: ephemeralManager,
+		syncLimiter:      rate.NewLimiter(rate.Every(time.Minute/time.Duration(syncRatePerM)), syncBurst),
+		loginPeerLimit:   rate.Every(loginPeerInterval / time.Duration(loginPeerRate)),
+		loginPeerBooster: loginBurst,
 	}, nil
 }
 
+// positiveIntFromEnv returns the integer held by the environment variable key,
+// or def when the variable is unset, is not a valid integer, or is not positive.
+func positiveIntFromEnv(key string, def int) int {
+	v, err := strconv.Atoi(os.Getenv(key))
+	if err != nil || v <= 0 {
+		return def
+	}
+	return v
+}
+
@@ -128,11 +168,19 @@ func getRealIP(ctx context.Context) net.IP {
 // Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
 // notifies the connected peer of any updates (e.g. new peers under the same account)
 func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
-	reqStart := time.Now()
 	if s.appMetrics != nil {
 		s.appMetrics.GRPCMetrics().CountSyncRequest()
 	}
 
+	// Reject the call before any further work once the server-wide sync budget is
+	// used up; ResourceExhausted is the gRPC status code meant for rate limiting.
+	if !s.syncLimiter.Allow() {
+		log.WithContext(srv.Context()).Warnf("sync rate limit exceeded for peer %s", req.WgPubKey)
+		return status.Errorf(codes.ResourceExhausted, "temp rate limit reached")
+	}
+
+	reqStart := time.Now()
+
 	ctx := srv.Context()
 
 	syncReq := &proto.SyncRequest{}
@@ -428,15 +476,32 @@ func (s *GRPCServer) parseRequest(ctx context.Context, req *proto.EncryptedMessa
 // In case it isn't, the endpoint checks whether setup key is provided within the request and tries to register a peer.
 // In case of the successful registration login is also successful
 func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
+	if s.appMetrics != nil {
+		s.appMetrics.GRPCMetrics().CountLoginRequest()
+	}
+
+	// Throttle login attempts per WireGuard public key before the request is
+	// processed any further.
+	// NOTE(review): entries are never evicted and the key comes straight from the
+	// request, so this map can grow without bound; consider a TTL or size-based
+	// cleanup before relying on it against hostile clients.
+	limiterIface, ok := s.loginLimiterStore.Load(req.WgPubKey)
+	if !ok {
+		// LoadOrStore keeps exactly one limiter per key even when several first
+		// logins for the same key race each other.
+		limiterIface, _ = s.loginLimiterStore.LoadOrStore(req.WgPubKey, rate.NewLimiter(s.loginPeerLimit, s.loginPeerBooster))
+	}
+	if !limiterIface.(*rate.Limiter).Allow() {
+		log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
+		return nil, status.Errorf(codes.ResourceExhausted, "temp rate limit reached (peer limit)")
+	}
+
 	reqStart := time.Now()
 	defer func() {
 		if s.appMetrics != nil {
 			s.appMetrics.GRPCMetrics().CountLoginRequestDuration(time.Since(reqStart))
 		}
 	}()
-	if s.appMetrics != nil {
-		s.appMetrics.GRPCMetrics().CountLoginRequest()
-	}
 
 	realIP := getRealIP(ctx)
 	log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, realIP.String())
diff --git a/management/server/mock_server/account_mock.go b/management/server/mock_server/account_mock.go
index 8837f9f50..4004f1b57 100644
--- a/management/server/mock_server/account_mock.go
+++ b/management/server/mock_server/account_mock.go
@@ -126,6 +126,10 @@ func (am *MockAccountManager) UpdateAccountPeers(ctx context.Context, accountID
 	// do nothing
 }
 
+func (am *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string)
{ + // do nothing +} + func (am *MockAccountManager) DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error { if am.DeleteSetupKeyFunc != nil { return am.DeleteSetupKeyFunc(ctx, accountID, userID, keyID) diff --git a/management/server/peer.go b/management/server/peer.go index 1dd390dd9..f64b16bb0 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -388,7 +388,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer storeEvent() } - if updateAccountPeers { + if updateAccountPeers && userID != activity.SystemInitiator { am.BufferUpdateAccountPeers(ctx, accountID) } @@ -1211,6 +1211,12 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account return } + extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID) + if err != nil { + log.WithContext(ctx).Errorf("failed to get flow enabled status: %v", err) + return + } + for _, peer := range account.Peers { if !am.peersUpdateManager.HasChannel(peer.ID) { log.WithContext(ctx).Tracef("peer %s doesn't have a channel, skipping network map update", peer.ID) @@ -1245,12 +1251,6 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account } am.metrics.UpdateChannelMetrics().CountMergeNetworkMapDuration(time.Since(start)) - extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID) - if err != nil { - log.WithContext(ctx).Errorf("failed to get flow enabled status: %v", err) - return - } - start = time.Now() update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting) am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start))