Compare commits

...

19 Commits

Author SHA1 Message Date
Pedro Costa
87416ce185 Merge branch 'main' into fix/merge-main 2025-07-09 10:12:08 +01:00
Pedro Costa
9bb8134ef3 always suffix ephemeral peer name 2025-07-08 11:12:09 +01:00
Pedro Costa
67a4989cbb Merge branch 'add-account-onboarding' into fix/merge-main 2025-07-08 10:07:36 +01:00
Pedro Costa
2e18d77d40 further optimization to ensure db roundtrip and calculations only occur if there are peers to update 2025-07-08 09:56:15 +01:00
Pedro Costa
3a8a6fcb76 further test fixes 2025-07-08 09:36:16 +01:00
Pedro Costa
c57a8fdd68 Merge branch 'add-account-onboarding' into fix/merge-main 2025-07-08 09:11:02 +01:00
Pedro Costa
49f083a372 fix dns and client tests 2025-07-08 09:07:26 +01:00
Pedro Costa
7d9ca73f6c fix ephemeral test 2025-07-08 08:58:31 +01:00
Pedro Costa
470b80c1b8 get extra settings only once per updateaccountpeers 2025-07-08 08:42:45 +01:00
Maycon Santos
ad0b78a7ac add request id to ephemeral cleanup 2025-07-08 02:04:05 +02:00
Maycon Santos
7aa2ca87f2 split call to BufferUpdateAccountPeers when system user initiates 2025-07-08 01:41:07 +02:00
Maycon Santos
4c58088311 fix go mod 2025-07-07 19:38:02 +02:00
Maycon Santos
bcccd65008 add rate limit 2025-07-07 19:35:48 +02:00
Maycon Santos
1ffc8933de add rate limit 2025-07-07 19:15:54 +02:00
Maycon Santos
ad22e9eea1 Merge branch 'main' into add-account-onboarding 2025-07-02 02:59:03 +02:00
Maycon Santos
d806fc4a03 handle empty onboard to avoid breaking clients and dashboard 2025-07-02 01:38:40 +02:00
Maycon Santos
7a5edb3894 create accounts with pending onboarding 2025-07-01 23:25:52 +02:00
Maycon Santos
2dc230ab9a add store and account manager methods
add store tests
2025-07-01 19:54:53 +02:00
Maycon Santos
432dc42bf5 add account onboarding 2025-07-01 11:51:46 +02:00
13 changed files with 191 additions and 39 deletions

View File

@@ -102,6 +102,11 @@ func startManagement(t *testing.T, config *types.Config, testFile string) (*grpc
GetSettings(gomock.Any(), gomock.Any(), gomock.Any()). GetSettings(gomock.Any(), gomock.Any(), gomock.Any()).
Return(&types.Settings{}, nil). Return(&types.Settings{}, nil).
AnyTimes() AnyTimes()
settingsMockManager.
EXPECT().
GetExtraSettings(gomock.Any(), gomock.Any()).
Return(&types.ExtraSettings{}, nil).
AnyTimes()
accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, iv, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false) accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, iv, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
if err != nil { if err != nil {

2
go.mod
View File

@@ -105,6 +105,7 @@ require (
golang.org/x/oauth2 v0.24.0 golang.org/x/oauth2 v0.24.0
golang.org/x/sync v0.13.0 golang.org/x/sync v0.13.0
golang.org/x/term v0.31.0 golang.org/x/term v0.31.0
golang.org/x/time v0.5.0
google.golang.org/api v0.177.0 google.golang.org/api v0.177.0
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
gorm.io/driver/mysql v1.5.7 gorm.io/driver/mysql v1.5.7
@@ -240,7 +241,6 @@ require (
golang.org/x/image v0.18.0 // indirect golang.org/x/image v0.18.0 // indirect
golang.org/x/mod v0.17.0 // indirect golang.org/x/mod v0.17.0 // indirect
golang.org/x/text v0.24.0 // indirect golang.org/x/text v0.24.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240509183442-62759503f434 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240509183442-62759503f434 // indirect

View File

@@ -87,6 +87,12 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
). ).
Return(&types.Settings{}, nil). Return(&types.Settings{}, nil).
AnyTimes() AnyTimes()
settingsMockManager.
EXPECT().
GetExtraSettings(gomock.Any(), gomock.Any()).
Return(&types.ExtraSettings{}, nil).
AnyTimes()
permissionsManagerMock := permissions.NewMockManager(ctrl) permissionsManagerMock := permissions.NewMockManager(ctrl)
permissionsManagerMock. permissionsManagerMock.
EXPECT(). EXPECT().

View File

@@ -112,6 +112,7 @@ type Manager interface {
GetAccountSettings(ctx context.Context, accountID string, userID string) (*types.Settings, error) GetAccountSettings(ctx context.Context, accountID string, userID string) (*types.Settings, error)
DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error
UpdateAccountPeers(ctx context.Context, accountID string) UpdateAccountPeers(ctx context.Context, accountID string)
BufferUpdateAccountPeers(ctx context.Context, accountID string)
BuildUserInfosForAccount(ctx context.Context, accountID, initiatorUserID string, accountUsers []*types.User) (map[string]*types.UserInfo, error) BuildUserInfosForAccount(ctx context.Context, accountID, initiatorUserID string, accountUsers []*types.User) (map[string]*types.UserInfo, error)
SyncUserJWTGroups(ctx context.Context, userAuth nbcontext.UserAuth) error SyncUserJWTGroups(ctx context.Context, userAuth nbcontext.UserAuth) error
GetStore() store.Store GetStore() store.Store

View File

@@ -216,6 +216,8 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) {
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)
// return empty extra settings for expected calls to UpdateAccountPeers
settingsMockManager.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(&types.ExtraSettings{}, nil).AnyTimes()
permissionsManager := permissions.NewManager(store) permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
} }

View File

@@ -5,10 +5,12 @@ import (
"sync" "sync"
"time" "time"
"github.com/google/uuid"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
nbAccount "github.com/netbirdio/netbird/management/server/account" nbAccount "github.com/netbirdio/netbird/management/server/account"
"github.com/netbirdio/netbird/management/server/activity" "github.com/netbirdio/netbird/management/server/activity"
nbContext "github.com/netbirdio/netbird/management/server/context"
nbpeer "github.com/netbirdio/netbird/management/server/peer" nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/store" "github.com/netbirdio/netbird/management/server/store"
) )
@@ -138,6 +140,9 @@ func (e *EphemeralManager) loadEphemeralPeers(ctx context.Context) {
func (e *EphemeralManager) cleanup(ctx context.Context) { func (e *EphemeralManager) cleanup(ctx context.Context) {
log.Tracef("on ephemeral cleanup") log.Tracef("on ephemeral cleanup")
reqID := uuid.New().String()
//nolint
ctx = context.WithValue(ctx, nbContext.RequestIDKey, reqID)
deletePeers := make(map[string]*ephemeralPeer) deletePeers := make(map[string]*ephemeralPeer)
e.peersLock.Lock() e.peersLock.Lock()
@@ -164,13 +169,21 @@ func (e *EphemeralManager) cleanup(ctx context.Context) {
e.peersLock.Unlock() e.peersLock.Unlock()
bufferAccountCall := make(map[string]struct{})
for id, p := range deletePeers { for id, p := range deletePeers {
log.WithContext(ctx).Debugf("delete ephemeral peer: %s", id) log.WithContext(ctx).Debugf("delete ephemeral peer: %s", id)
err := e.accountManager.DeletePeer(ctx, p.accountID, id, activity.SystemInitiator) err := e.accountManager.DeletePeer(ctx, p.accountID, id, activity.SystemInitiator)
if err != nil { if err != nil {
log.WithContext(ctx).Errorf("failed to delete ephemeral peer: %s", err) log.WithContext(ctx).Errorf("failed to delete ephemeral peer: %s", err)
} else {
bufferAccountCall[p.accountID] = struct{}{}
} }
} }
for accountID := range bufferAccountCall {
log.WithContext(ctx).Debugf("ephemeral - buffer update account peers for account: %s", accountID)
e.accountManager.BufferUpdateAccountPeers(ctx, accountID)
}
} }
func (e *EphemeralManager) addPeer(accountID string, peerID string, deadline time.Time) { func (e *EphemeralManager) addPeer(accountID string, peerID string, deadline time.Time) {

View File

@@ -37,6 +37,10 @@ func (a MocAccountManager) DeletePeer(_ context.Context, accountID, peerID, user
return nil //nolint:nil return nil //nolint:nil
} }
func (a MocAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
// noop
}
func (a MocAccountManager) GetStore() store.Store { func (a MocAccountManager) GetStore() store.Store {
return a.store return a.store
} }

View File

@@ -5,6 +5,8 @@ import (
"fmt" "fmt"
"net" "net"
"net/netip" "net/netip"
"os"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -13,6 +15,7 @@ import (
"github.com/golang/protobuf/ptypes/timestamp" "github.com/golang/protobuf/ptypes/timestamp"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/peer" "google.golang.org/grpc/peer"
@@ -47,6 +50,10 @@ type GRPCServer struct {
ephemeralManager *EphemeralManager ephemeralManager *EphemeralManager
peerLocks sync.Map peerLocks sync.Map
authManager auth.Manager authManager auth.Manager
syncLimiter *rate.Limiter
loginLimiterStore sync.Map
loginPeerBooster int
loginPeerLimit rate.Limit
} }
// NewServer creates a new Management server // NewServer creates a new Management server
@@ -76,6 +83,41 @@ func NewServer(
} }
} }
multiplier := time.Minute
d, e := time.ParseDuration(os.Getenv("NB_LOGIN_RATE"))
if e == nil {
multiplier = d
}
loginRatePerS, err := strconv.Atoi(os.Getenv("NB_LOGIN_RATE_PER_M"))
if loginRatePerS == 0 || err != nil {
loginRatePerS = 200
}
loginBurst, err := strconv.Atoi(os.Getenv("NB_LOGIN_BURST"))
if loginBurst == 0 || err != nil {
loginBurst = 200
}
log.WithContext(ctx).Infof("login burst limit set to %d", loginBurst)
loginPeerRatePerS, err := strconv.Atoi(os.Getenv("NB_LOGIN_PEER_RATE_PER_M"))
if loginPeerRatePerS == 0 || err != nil {
loginPeerRatePerS = 200
}
log.WithContext(ctx).Infof("login rate limit set to %d/min", loginRatePerS)
syncRatePerS, err := strconv.Atoi(os.Getenv("NB_SYNC_RATE_PER_M"))
if syncRatePerS == 0 || err != nil {
syncRatePerS = 20000
}
log.WithContext(ctx).Infof("sync rate limit set to %d/min", syncRatePerS)
syncBurst, err := strconv.Atoi(os.Getenv("NB_SYNC_BURST"))
if syncBurst == 0 || err != nil {
syncBurst = 30000
}
log.WithContext(ctx).Infof("sync burst limit set to %d", syncBurst)
return &GRPCServer{ return &GRPCServer{
wgKey: key, wgKey: key,
// peerKey -> event channel // peerKey -> event channel
@@ -87,6 +129,9 @@ func NewServer(
authManager: authManager, authManager: authManager,
appMetrics: appMetrics, appMetrics: appMetrics,
ephemeralManager: ephemeralManager, ephemeralManager: ephemeralManager,
syncLimiter: rate.NewLimiter(rate.Every(time.Minute/time.Duration(syncRatePerS)), syncBurst),
loginPeerLimit: rate.Every(multiplier / time.Duration(loginPeerRatePerS)),
loginPeerBooster: loginBurst,
}, nil }, nil
} }
@@ -128,11 +173,17 @@ func getRealIP(ctx context.Context) net.IP {
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and // Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
// notifies the connected peer of any updates (e.g. new peers under the same account) // notifies the connected peer of any updates (e.g. new peers under the same account)
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error { func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
reqStart := time.Now()
if s.appMetrics != nil { if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountSyncRequest() s.appMetrics.GRPCMetrics().CountSyncRequest()
} }
if !s.syncLimiter.Allow() {
log.Warnf("sync rate limit exceeded for peer %s", req.WgPubKey)
return status.Errorf(codes.Internal, "temp rate limit reached")
}
reqStart := time.Now()
ctx := srv.Context() ctx := srv.Context()
syncReq := &proto.SyncRequest{} syncReq := &proto.SyncRequest{}
@@ -428,15 +479,39 @@ func (s *GRPCServer) parseRequest(ctx context.Context, req *proto.EncryptedMessa
// In case it isn't, the endpoint checks whether setup key is provided within the request and tries to register a peer. // In case it isn't, the endpoint checks whether setup key is provided within the request and tries to register a peer.
// In case of the successful registration login is also successful // In case of the successful registration login is also successful
func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) { func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountLoginRequest()
}
limiterIface, ok := s.loginLimiterStore.Load(req.WgPubKey)
if !ok {
// Create new limiter for this peer
newLimiter := rate.NewLimiter(s.loginPeerLimit, s.loginPeerBooster)
s.loginLimiterStore.Store(req.WgPubKey, newLimiter)
if !newLimiter.Allow() {
//time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
return nil, fmt.Errorf("temp rate limit reached (new peer limit)")
}
} else {
// Use existing limiter for this peer
limiter := limiterIface.(*rate.Limiter)
if !limiter.Allow() {
//time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
return nil, fmt.Errorf("temp rate limit reached (peer limit)")
}
}
reqStart := time.Now() reqStart := time.Now()
defer func() { defer func() {
if s.appMetrics != nil { if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountLoginRequestDuration(time.Since(reqStart)) s.appMetrics.GRPCMetrics().CountLoginRequestDuration(time.Since(reqStart))
} }
}() }()
if s.appMetrics != nil { //if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountLoginRequest() // s.appMetrics.GRPCMetrics().CountLoginRequest()
} //}
realIP := getRealIP(ctx) realIP := getRealIP(ctx)
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, realIP.String()) log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, realIP.String())

View File

@@ -440,7 +440,11 @@ func startManagementForTest(t *testing.T, testFile string, config *types.Config)
GetSettings(gomock.Any(), gomock.Any(), gomock.Any()). GetSettings(gomock.Any(), gomock.Any(), gomock.Any()).
AnyTimes(). AnyTimes().
Return(&types.Settings{}, nil) Return(&types.Settings{}, nil)
settingsMockManager.
EXPECT().
GetExtraSettings(gomock.Any(), gomock.Any()).
Return(&types.ExtraSettings{}, nil).
AnyTimes()
permissionsManager := permissions.NewManager(store) permissionsManager := permissions.NewManager(store)
accountManager, err := BuildManager(ctx, store, peersUpdateManager, nil, "", "netbird.selfhosted", accountManager, err := BuildManager(ctx, store, peersUpdateManager, nil, "", "netbird.selfhosted",

View File

@@ -126,6 +126,10 @@ func (am *MockAccountManager) UpdateAccountPeers(ctx context.Context, accountID
// do nothing // do nothing
} }
func (am *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
// do nothing
}
func (am *MockAccountManager) DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error { func (am *MockAccountManager) DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error {
if am.DeleteSetupKeyFunc != nil { if am.DeleteSetupKeyFunc != nil {
return am.DeleteSetupKeyFunc(ctx, accountID, userID, keyID) return am.DeleteSetupKeyFunc(ctx, accountID, userID, keyID)

View File

@@ -778,6 +778,12 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)
settingsMockManager.
EXPECT().
GetExtraSettings(gomock.Any(), gomock.Any()).
Return(&types.ExtraSettings{}, nil).
AnyTimes()
permissionsManager := permissions.NewManager(store) permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
} }

View File

@@ -236,11 +236,23 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
if peer.Name != update.Name { if peer.Name != update.Name {
var newLabel string var newLabel string
newLabel, err = getPeerIPDNSLabel(ctx, transaction, peer.IP, accountID, update.Name)
newLabel, err = nbdns.GetParsedDomainLabel(update.Name)
if err != nil { if err != nil {
return fmt.Errorf("failed to get free DNS label: %w", err) newLabel = ""
} else {
_, err := transaction.GetPeerIdByLabel(ctx, store.LockingStrengthNone, accountID, update.Name)
if err == nil {
newLabel = ""
}
} }
if newLabel == "" {
newLabel, err = getPeerIPDNSLabel(peer.IP, update.Name)
if err != nil {
return fmt.Errorf("failed to get free DNS label: %w", err)
}
}
peer.Name = update.Name peer.Name = update.Name
peer.DNSLabel = newLabel peer.DNSLabel = newLabel
peerLabelChanged = true peerLabelChanged = true
@@ -375,7 +387,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
storeEvent() storeEvent()
} }
if updateAccountPeers { if updateAccountPeers && userID != activity.SystemInitiator {
am.BufferUpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
@@ -472,6 +484,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
var groupsToAdd []string var groupsToAdd []string
var allowExtraDNSLabels bool var allowExtraDNSLabels bool
var accountID string var accountID string
var isEphemeral bool
if addedByUser { if addedByUser {
user, err := am.Store.GetUserByUserID(ctx, store.LockingStrengthNone, userID) user, err := am.Store.GetUserByUserID(ctx, store.LockingStrengthNone, userID)
if err != nil { if err != nil {
@@ -501,7 +514,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
setupKeyName = sk.Name setupKeyName = sk.Name
allowExtraDNSLabels = sk.AllowExtraDNSLabels allowExtraDNSLabels = sk.AllowExtraDNSLabels
accountID = sk.AccountID accountID = sk.AccountID
isEphemeral = sk.Ephemeral
if !sk.AllowExtraDNSLabels && len(peer.ExtraDNSLabels) > 0 { if !sk.AllowExtraDNSLabels && len(peer.ExtraDNSLabels) > 0 {
return nil, nil, nil, status.Errorf(status.PreconditionFailed, "couldn't add peer: setup key doesn't allow extra DNS labels") return nil, nil, nil, status.Errorf(status.PreconditionFailed, "couldn't add peer: setup key doesn't allow extra DNS labels")
} }
@@ -573,11 +586,17 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
} }
var freeLabel string var freeLabel string
freeLabel, err = getPeerIPDNSLabel(ctx, am.Store, freeIP, accountID, peer.Meta.Hostname) if isEphemeral || attempt > 1 {
if err != nil { freeLabel, err = getPeerIPDNSLabel(freeIP, peer.Meta.Hostname)
return nil, nil, nil, fmt.Errorf("failed to get free DNS label: %w", err) if err != nil {
return nil, nil, nil, fmt.Errorf("failed to get free DNS label: %w", err)
}
} else {
freeLabel, err = nbdns.GetParsedDomainLabel(peer.Meta.Hostname)
if err != nil {
return nil, nil, nil, fmt.Errorf("failed to get free DNS label: %w", err)
}
} }
newPeer.DNSLabel = freeLabel newPeer.DNSLabel = freeLabel
newPeer.IP = freeIP newPeer.IP = freeIP
@@ -647,7 +666,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
if isUniqueConstraintError(err) { if isUniqueConstraintError(err) {
unlock() unlock()
unlock = nil unlock = nil
log.WithContext(ctx).Debugf("Failed to add peer in attempt %d, retrying: %v", attempt, err) log.WithContext(ctx).WithFields(log.Fields{"dns_label": freeLabel, "ip": freeIP}).Tracef("Failed to add peer in attempt %d, retrying: %v", attempt, err)
continue continue
} }
@@ -681,7 +700,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
return am.getValidatedPeerWithMap(ctx, false, accountID, newPeer) return am.getValidatedPeerWithMap(ctx, false, accountID, newPeer)
} }
func getPeerIPDNSLabel(ctx context.Context, tx store.Store, ip net.IP, accountID, peerHostName string) (string, error) { func getPeerIPDNSLabel(ip net.IP, peerHostName string) (string, error) {
ip = ip.To4() ip = ip.To4()
dnsName, err := nbdns.GetParsedDomainLabel(peerHostName) dnsName, err := nbdns.GetParsedDomainLabel(peerHostName)
@@ -689,12 +708,6 @@ func getPeerIPDNSLabel(ctx context.Context, tx store.Store, ip net.IP, accountID
return "", fmt.Errorf("failed to parse peer host name %s: %w", peerHostName, err) return "", fmt.Errorf("failed to parse peer host name %s: %w", peerHostName, err)
} }
_, err = tx.GetPeerIdByLabel(ctx, store.LockingStrengthNone, accountID, dnsName)
if err != nil {
//nolint:nilerr
return dnsName, nil
}
return fmt.Sprintf("%s-%d-%d", dnsName, ip[2], ip[3]), nil return fmt.Sprintf("%s-%d-%d", dnsName, ip[2], ip[3]), nil
} }
@@ -1175,7 +1188,26 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
return return
} }
globalStart := time.Now() if am.metrics != nil {
globalStart := time.Now()
defer func() {
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart))
}()
}
peersToUpdate := []*nbpeer.Peer{}
for _, peer := range account.Peers {
if !am.peersUpdateManager.HasChannel(peer.ID) {
log.WithContext(ctx).Tracef("peer %s doesn't have a channel, skipping network map update", peer.ID)
continue
}
peersToUpdate = append(peersToUpdate, peer)
}
if len(peersToUpdate) == 0 {
return
}
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra) approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
if err != nil { if err != nil {
@@ -1198,12 +1230,13 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
return return
} }
for _, peer := range account.Peers { extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID)
if !am.peersUpdateManager.HasChannel(peer.ID) { if err != nil {
log.WithContext(ctx).Tracef("peer %s doesn't have a channel, skipping network map update", peer.ID) log.WithContext(ctx).Errorf("failed to get flow enabled status: %v", err)
continue return
} }
for _, peer := range peersToUpdate {
wg.Add(1) wg.Add(1)
semaphore <- struct{}{} semaphore <- struct{}{}
go func(p *nbpeer.Peer) { go func(p *nbpeer.Peer) {
@@ -1232,12 +1265,6 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
} }
am.metrics.UpdateChannelMetrics().CountMergeNetworkMapDuration(time.Since(start)) am.metrics.UpdateChannelMetrics().CountMergeNetworkMapDuration(time.Since(start))
extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID)
if err != nil {
log.WithContext(ctx).Errorf("failed to get flow enabled status: %v", err)
return
}
start = time.Now() start = time.Now()
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting) update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting)
am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start)) am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start))
@@ -1246,12 +1273,7 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
}(peer) }(peer)
} }
//
wg.Wait() wg.Wait()
if am.metrics != nil {
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart))
}
} }
func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) { func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {

View File

@@ -1344,6 +1344,11 @@ func Test_RegisterPeerBySetupKey(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)
settingsMockManager.
EXPECT().
GetExtraSettings(gomock.Any(), gomock.Any()).
Return(&types.ExtraSettings{}, nil).
AnyTimes()
permissionsManager := permissions.NewManager(s) permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
@@ -1556,6 +1561,11 @@ func Test_LoginPeer(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)
settingsMockManager.
EXPECT().
GetExtraSettings(gomock.Any(), gomock.Any()).
Return(&types.ExtraSettings{}, nil).
AnyTimes()
permissionsManager := permissions.NewManager(s) permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false) am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)