Merge branch 'add-account-onboarding' into fix/merge-main

This commit is contained in:
Pedro Costa
2025-07-08 09:11:02 +01:00
9 changed files with 117 additions and 12 deletions

View File

@@ -112,6 +112,7 @@ type Manager interface {
GetAccountSettings(ctx context.Context, accountID string, userID string) (*types.Settings, error)
DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error
UpdateAccountPeers(ctx context.Context, accountID string)
BufferUpdateAccountPeers(ctx context.Context, accountID string)
BuildUserInfosForAccount(ctx context.Context, accountID, initiatorUserID string, accountUsers []*types.User) (map[string]*types.UserInfo, error)
SyncUserJWTGroups(ctx context.Context, userAuth nbcontext.UserAuth) error
GetStore() store.Store

View File

@@ -216,6 +216,8 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) {
t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl)
// return empty extra settings for expected calls to UpdateAccountPeers
settingsMockManager.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(&types.ExtraSettings{}, nil).AnyTimes()
permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
}

View File

@@ -5,10 +5,12 @@ import (
"sync"
"time"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
nbAccount "github.com/netbirdio/netbird/management/server/account"
"github.com/netbirdio/netbird/management/server/activity"
nbContext "github.com/netbirdio/netbird/management/server/context"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/store"
)
@@ -138,6 +140,9 @@ func (e *EphemeralManager) loadEphemeralPeers(ctx context.Context) {
func (e *EphemeralManager) cleanup(ctx context.Context) {
log.Tracef("on ephemeral cleanup")
reqID := uuid.New().String()
//nolint
ctx = context.WithValue(ctx, nbContext.RequestIDKey, reqID)
deletePeers := make(map[string]*ephemeralPeer)
e.peersLock.Lock()
@@ -164,13 +169,21 @@ func (e *EphemeralManager) cleanup(ctx context.Context) {
e.peersLock.Unlock()
bufferAccountCall := make(map[string]struct{})
for id, p := range deletePeers {
log.WithContext(ctx).Debugf("delete ephemeral peer: %s", id)
err := e.accountManager.DeletePeer(ctx, p.accountID, id, activity.SystemInitiator)
if err != nil {
log.WithContext(ctx).Errorf("failed to delete ephemeral peer: %s", err)
} else {
bufferAccountCall[p.accountID] = struct{}{}
}
}
for accountID := range bufferAccountCall {
log.WithContext(ctx).Debugf("ephemeral - buffer update account peers for account: %s", accountID)
e.accountManager.BufferUpdateAccountPeers(ctx, accountID)
}
}
func (e *EphemeralManager) addPeer(accountID string, peerID string, deadline time.Time) {

View File

@@ -37,6 +37,10 @@ func (a MocAccountManager) DeletePeer(_ context.Context, accountID, peerID, user
return nil //nolint:nil
}
func (a MocAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
// noop
}
func (a MocAccountManager) GetStore() store.Store {
return a.store
}

View File

@@ -5,6 +5,8 @@ import (
"fmt"
"net"
"net/netip"
"os"
"strconv"
"strings"
"sync"
"time"
@@ -13,6 +15,7 @@ import (
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
@@ -47,6 +50,10 @@ type GRPCServer struct {
ephemeralManager *EphemeralManager
peerLocks sync.Map
authManager auth.Manager
syncLimiter *rate.Limiter
loginLimiterStore sync.Map
loginPeerBooster int
loginPeerLimit rate.Limit
}
// NewServer creates a new Management server
@@ -76,6 +83,41 @@ func NewServer(
}
}
multiplier := time.Minute
d, e := time.ParseDuration(os.Getenv("NB_LOGIN_RATE"))
if e == nil {
multiplier = d
}
loginRatePerS, err := strconv.Atoi(os.Getenv("NB_LOGIN_RATE_PER_M"))
if loginRatePerS == 0 || err != nil {
loginRatePerS = 200
}
loginBurst, err := strconv.Atoi(os.Getenv("NB_LOGIN_BURST"))
if loginBurst == 0 || err != nil {
loginBurst = 200
}
log.WithContext(ctx).Infof("login burst limit set to %d", loginBurst)
loginPeerRatePerS, err := strconv.Atoi(os.Getenv("NB_LOGIN_PEER_RATE_PER_M"))
if loginPeerRatePerS == 0 || err != nil {
loginPeerRatePerS = 200
}
log.WithContext(ctx).Infof("login rate limit set to %d/min", loginRatePerS)
syncRatePerS, err := strconv.Atoi(os.Getenv("NB_SYNC_RATE_PER_M"))
if syncRatePerS == 0 || err != nil {
syncRatePerS = 20000
}
log.WithContext(ctx).Infof("sync rate limit set to %d/min", syncRatePerS)
syncBurst, err := strconv.Atoi(os.Getenv("NB_SYNC_BURST"))
if syncBurst == 0 || err != nil {
syncBurst = 30000
}
log.WithContext(ctx).Infof("sync burst limit set to %d", syncBurst)
return &GRPCServer{
wgKey: key,
// peerKey -> event channel
@@ -87,6 +129,9 @@ func NewServer(
authManager: authManager,
appMetrics: appMetrics,
ephemeralManager: ephemeralManager,
syncLimiter: rate.NewLimiter(rate.Every(time.Minute/time.Duration(syncRatePerS)), syncBurst),
loginPeerLimit: rate.Every(multiplier / time.Duration(loginPeerRatePerS)),
loginPeerBooster: loginBurst,
}, nil
}
@@ -128,11 +173,17 @@ func getRealIP(ctx context.Context) net.IP {
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
// notifies the connected peer of any updates (e.g. new peers under the same account)
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
reqStart := time.Now()
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountSyncRequest()
}
if !s.syncLimiter.Allow() {
log.Warnf("sync rate limit exceeded for peer %s", req.WgPubKey)
return status.Errorf(codes.Internal, "temp rate limit reached")
}
reqStart := time.Now()
ctx := srv.Context()
syncReq := &proto.SyncRequest{}
@@ -428,15 +479,39 @@ func (s *GRPCServer) parseRequest(ctx context.Context, req *proto.EncryptedMessa
// In case it isn't, the endpoint checks whether setup key is provided within the request and tries to register a peer.
// In case of the successful registration login is also successful
func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountLoginRequest()
}
limiterIface, ok := s.loginLimiterStore.Load(req.WgPubKey)
if !ok {
// Create new limiter for this peer
newLimiter := rate.NewLimiter(s.loginPeerLimit, s.loginPeerBooster)
s.loginLimiterStore.Store(req.WgPubKey, newLimiter)
if !newLimiter.Allow() {
//time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
return nil, fmt.Errorf("temp rate limit reached (new peer limit)")
}
} else {
// Use existing limiter for this peer
limiter := limiterIface.(*rate.Limiter)
if !limiter.Allow() {
//time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
return nil, fmt.Errorf("temp rate limit reached (peer limit)")
}
}
reqStart := time.Now()
defer func() {
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountLoginRequestDuration(time.Since(reqStart))
}
}()
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountLoginRequest()
}
//if s.appMetrics != nil {
// s.appMetrics.GRPCMetrics().CountLoginRequest()
//}
realIP := getRealIP(ctx)
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, realIP.String())

View File

@@ -126,6 +126,10 @@ func (am *MockAccountManager) UpdateAccountPeers(ctx context.Context, accountID
// do nothing
}
func (am *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
// do nothing
}
func (am *MockAccountManager) DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error {
if am.DeleteSetupKeyFunc != nil {
return am.DeleteSetupKeyFunc(ctx, accountID, userID, keyID)

View File

@@ -388,7 +388,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
storeEvent()
}
if updateAccountPeers {
if updateAccountPeers && userID != activity.SystemInitiator {
am.BufferUpdateAccountPeers(ctx, accountID)
}
@@ -1211,6 +1211,12 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
return
}
extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID)
if err != nil {
log.WithContext(ctx).Errorf("failed to get flow enabled status: %v", err)
return
}
for _, peer := range account.Peers {
if !am.peersUpdateManager.HasChannel(peer.ID) {
log.WithContext(ctx).Tracef("peer %s doesn't have a channel, skipping network map update", peer.ID)
@@ -1245,12 +1251,6 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
}
am.metrics.UpdateChannelMetrics().CountMergeNetworkMapDuration(time.Since(start))
extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID)
if err != nil {
log.WithContext(ctx).Errorf("failed to get flow enabled status: %v", err)
return
}
start = time.Now()
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting)
am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start))