mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-20 09:16:40 +00:00
Compare commits
19 Commits
handle-exi
...
fix/merge-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
87416ce185 | ||
|
|
9bb8134ef3 | ||
|
|
67a4989cbb | ||
|
|
2e18d77d40 | ||
|
|
3a8a6fcb76 | ||
|
|
c57a8fdd68 | ||
|
|
49f083a372 | ||
|
|
7d9ca73f6c | ||
|
|
470b80c1b8 | ||
|
|
ad0b78a7ac | ||
|
|
7aa2ca87f2 | ||
|
|
4c58088311 | ||
|
|
bcccd65008 | ||
|
|
1ffc8933de | ||
|
|
ad22e9eea1 | ||
|
|
d806fc4a03 | ||
|
|
7a5edb3894 | ||
|
|
2dc230ab9a | ||
|
|
432dc42bf5 |
@@ -102,6 +102,11 @@ func startManagement(t *testing.T, config *types.Config, testFile string) (*grpc
|
|||||||
GetSettings(gomock.Any(), gomock.Any(), gomock.Any()).
|
GetSettings(gomock.Any(), gomock.Any(), gomock.Any()).
|
||||||
Return(&types.Settings{}, nil).
|
Return(&types.Settings{}, nil).
|
||||||
AnyTimes()
|
AnyTimes()
|
||||||
|
settingsMockManager.
|
||||||
|
EXPECT().
|
||||||
|
GetExtraSettings(gomock.Any(), gomock.Any()).
|
||||||
|
Return(&types.ExtraSettings{}, nil).
|
||||||
|
AnyTimes()
|
||||||
|
|
||||||
accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, iv, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
|
accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, iv, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -105,6 +105,7 @@ require (
|
|||||||
golang.org/x/oauth2 v0.24.0
|
golang.org/x/oauth2 v0.24.0
|
||||||
golang.org/x/sync v0.13.0
|
golang.org/x/sync v0.13.0
|
||||||
golang.org/x/term v0.31.0
|
golang.org/x/term v0.31.0
|
||||||
|
golang.org/x/time v0.5.0
|
||||||
google.golang.org/api v0.177.0
|
google.golang.org/api v0.177.0
|
||||||
gopkg.in/yaml.v3 v3.0.1
|
gopkg.in/yaml.v3 v3.0.1
|
||||||
gorm.io/driver/mysql v1.5.7
|
gorm.io/driver/mysql v1.5.7
|
||||||
@@ -240,7 +241,6 @@ require (
|
|||||||
golang.org/x/image v0.18.0 // indirect
|
golang.org/x/image v0.18.0 // indirect
|
||||||
golang.org/x/mod v0.17.0 // indirect
|
golang.org/x/mod v0.17.0 // indirect
|
||||||
golang.org/x/text v0.24.0 // indirect
|
golang.org/x/text v0.24.0 // indirect
|
||||||
golang.org/x/time v0.5.0 // indirect
|
|
||||||
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
|
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
|
||||||
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
|
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
|
||||||
google.golang.org/genproto/googleapis/api v0.0.0-20240509183442-62759503f434 // indirect
|
google.golang.org/genproto/googleapis/api v0.0.0-20240509183442-62759503f434 // indirect
|
||||||
|
|||||||
@@ -87,6 +87,12 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
|
|||||||
).
|
).
|
||||||
Return(&types.Settings{}, nil).
|
Return(&types.Settings{}, nil).
|
||||||
AnyTimes()
|
AnyTimes()
|
||||||
|
settingsMockManager.
|
||||||
|
EXPECT().
|
||||||
|
GetExtraSettings(gomock.Any(), gomock.Any()).
|
||||||
|
Return(&types.ExtraSettings{}, nil).
|
||||||
|
AnyTimes()
|
||||||
|
|
||||||
permissionsManagerMock := permissions.NewMockManager(ctrl)
|
permissionsManagerMock := permissions.NewMockManager(ctrl)
|
||||||
permissionsManagerMock.
|
permissionsManagerMock.
|
||||||
EXPECT().
|
EXPECT().
|
||||||
|
|||||||
@@ -112,6 +112,7 @@ type Manager interface {
|
|||||||
GetAccountSettings(ctx context.Context, accountID string, userID string) (*types.Settings, error)
|
GetAccountSettings(ctx context.Context, accountID string, userID string) (*types.Settings, error)
|
||||||
DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error
|
DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error
|
||||||
UpdateAccountPeers(ctx context.Context, accountID string)
|
UpdateAccountPeers(ctx context.Context, accountID string)
|
||||||
|
BufferUpdateAccountPeers(ctx context.Context, accountID string)
|
||||||
BuildUserInfosForAccount(ctx context.Context, accountID, initiatorUserID string, accountUsers []*types.User) (map[string]*types.UserInfo, error)
|
BuildUserInfosForAccount(ctx context.Context, accountID, initiatorUserID string, accountUsers []*types.User) (map[string]*types.UserInfo, error)
|
||||||
SyncUserJWTGroups(ctx context.Context, userAuth nbcontext.UserAuth) error
|
SyncUserJWTGroups(ctx context.Context, userAuth nbcontext.UserAuth) error
|
||||||
GetStore() store.Store
|
GetStore() store.Store
|
||||||
|
|||||||
@@ -216,6 +216,8 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) {
|
|||||||
t.Cleanup(ctrl.Finish)
|
t.Cleanup(ctrl.Finish)
|
||||||
|
|
||||||
settingsMockManager := settings.NewMockManager(ctrl)
|
settingsMockManager := settings.NewMockManager(ctrl)
|
||||||
|
// return empty extra settings for expected calls to UpdateAccountPeers
|
||||||
|
settingsMockManager.EXPECT().GetExtraSettings(gomock.Any(), gomock.Any()).Return(&types.ExtraSettings{}, nil).AnyTimes()
|
||||||
permissionsManager := permissions.NewManager(store)
|
permissionsManager := permissions.NewManager(store)
|
||||||
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,10 +5,12 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
nbAccount "github.com/netbirdio/netbird/management/server/account"
|
nbAccount "github.com/netbirdio/netbird/management/server/account"
|
||||||
"github.com/netbirdio/netbird/management/server/activity"
|
"github.com/netbirdio/netbird/management/server/activity"
|
||||||
|
nbContext "github.com/netbirdio/netbird/management/server/context"
|
||||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||||
"github.com/netbirdio/netbird/management/server/store"
|
"github.com/netbirdio/netbird/management/server/store"
|
||||||
)
|
)
|
||||||
@@ -138,6 +140,9 @@ func (e *EphemeralManager) loadEphemeralPeers(ctx context.Context) {
|
|||||||
|
|
||||||
func (e *EphemeralManager) cleanup(ctx context.Context) {
|
func (e *EphemeralManager) cleanup(ctx context.Context) {
|
||||||
log.Tracef("on ephemeral cleanup")
|
log.Tracef("on ephemeral cleanup")
|
||||||
|
reqID := uuid.New().String()
|
||||||
|
//nolint
|
||||||
|
ctx = context.WithValue(ctx, nbContext.RequestIDKey, reqID)
|
||||||
deletePeers := make(map[string]*ephemeralPeer)
|
deletePeers := make(map[string]*ephemeralPeer)
|
||||||
|
|
||||||
e.peersLock.Lock()
|
e.peersLock.Lock()
|
||||||
@@ -164,13 +169,21 @@ func (e *EphemeralManager) cleanup(ctx context.Context) {
|
|||||||
|
|
||||||
e.peersLock.Unlock()
|
e.peersLock.Unlock()
|
||||||
|
|
||||||
|
bufferAccountCall := make(map[string]struct{})
|
||||||
|
|
||||||
for id, p := range deletePeers {
|
for id, p := range deletePeers {
|
||||||
log.WithContext(ctx).Debugf("delete ephemeral peer: %s", id)
|
log.WithContext(ctx).Debugf("delete ephemeral peer: %s", id)
|
||||||
err := e.accountManager.DeletePeer(ctx, p.accountID, id, activity.SystemInitiator)
|
err := e.accountManager.DeletePeer(ctx, p.accountID, id, activity.SystemInitiator)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithContext(ctx).Errorf("failed to delete ephemeral peer: %s", err)
|
log.WithContext(ctx).Errorf("failed to delete ephemeral peer: %s", err)
|
||||||
|
} else {
|
||||||
|
bufferAccountCall[p.accountID] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
for accountID := range bufferAccountCall {
|
||||||
|
log.WithContext(ctx).Debugf("ephemeral - buffer update account peers for account: %s", accountID)
|
||||||
|
e.accountManager.BufferUpdateAccountPeers(ctx, accountID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *EphemeralManager) addPeer(accountID string, peerID string, deadline time.Time) {
|
func (e *EphemeralManager) addPeer(accountID string, peerID string, deadline time.Time) {
|
||||||
|
|||||||
@@ -37,6 +37,10 @@ func (a MocAccountManager) DeletePeer(_ context.Context, accountID, peerID, user
|
|||||||
return nil //nolint:nil
|
return nil //nolint:nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a MocAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
|
||||||
|
// noop
|
||||||
|
}
|
||||||
|
|
||||||
func (a MocAccountManager) GetStore() store.Store {
|
func (a MocAccountManager) GetStore() store.Store {
|
||||||
return a.store
|
return a.store
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -13,6 +15,7 @@ import (
|
|||||||
"github.com/golang/protobuf/ptypes/timestamp"
|
"github.com/golang/protobuf/ptypes/timestamp"
|
||||||
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
|
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/peer"
|
"google.golang.org/grpc/peer"
|
||||||
@@ -47,6 +50,10 @@ type GRPCServer struct {
|
|||||||
ephemeralManager *EphemeralManager
|
ephemeralManager *EphemeralManager
|
||||||
peerLocks sync.Map
|
peerLocks sync.Map
|
||||||
authManager auth.Manager
|
authManager auth.Manager
|
||||||
|
syncLimiter *rate.Limiter
|
||||||
|
loginLimiterStore sync.Map
|
||||||
|
loginPeerBooster int
|
||||||
|
loginPeerLimit rate.Limit
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer creates a new Management server
|
// NewServer creates a new Management server
|
||||||
@@ -76,6 +83,41 @@ func NewServer(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
multiplier := time.Minute
|
||||||
|
d, e := time.ParseDuration(os.Getenv("NB_LOGIN_RATE"))
|
||||||
|
if e == nil {
|
||||||
|
multiplier = d
|
||||||
|
}
|
||||||
|
|
||||||
|
loginRatePerS, err := strconv.Atoi(os.Getenv("NB_LOGIN_RATE_PER_M"))
|
||||||
|
if loginRatePerS == 0 || err != nil {
|
||||||
|
loginRatePerS = 200
|
||||||
|
}
|
||||||
|
|
||||||
|
loginBurst, err := strconv.Atoi(os.Getenv("NB_LOGIN_BURST"))
|
||||||
|
if loginBurst == 0 || err != nil {
|
||||||
|
loginBurst = 200
|
||||||
|
}
|
||||||
|
log.WithContext(ctx).Infof("login burst limit set to %d", loginBurst)
|
||||||
|
|
||||||
|
loginPeerRatePerS, err := strconv.Atoi(os.Getenv("NB_LOGIN_PEER_RATE_PER_M"))
|
||||||
|
if loginPeerRatePerS == 0 || err != nil {
|
||||||
|
loginPeerRatePerS = 200
|
||||||
|
}
|
||||||
|
log.WithContext(ctx).Infof("login rate limit set to %d/min", loginRatePerS)
|
||||||
|
|
||||||
|
syncRatePerS, err := strconv.Atoi(os.Getenv("NB_SYNC_RATE_PER_M"))
|
||||||
|
if syncRatePerS == 0 || err != nil {
|
||||||
|
syncRatePerS = 20000
|
||||||
|
}
|
||||||
|
log.WithContext(ctx).Infof("sync rate limit set to %d/min", syncRatePerS)
|
||||||
|
|
||||||
|
syncBurst, err := strconv.Atoi(os.Getenv("NB_SYNC_BURST"))
|
||||||
|
if syncBurst == 0 || err != nil {
|
||||||
|
syncBurst = 30000
|
||||||
|
}
|
||||||
|
log.WithContext(ctx).Infof("sync burst limit set to %d", syncBurst)
|
||||||
|
|
||||||
return &GRPCServer{
|
return &GRPCServer{
|
||||||
wgKey: key,
|
wgKey: key,
|
||||||
// peerKey -> event channel
|
// peerKey -> event channel
|
||||||
@@ -87,6 +129,9 @@ func NewServer(
|
|||||||
authManager: authManager,
|
authManager: authManager,
|
||||||
appMetrics: appMetrics,
|
appMetrics: appMetrics,
|
||||||
ephemeralManager: ephemeralManager,
|
ephemeralManager: ephemeralManager,
|
||||||
|
syncLimiter: rate.NewLimiter(rate.Every(time.Minute/time.Duration(syncRatePerS)), syncBurst),
|
||||||
|
loginPeerLimit: rate.Every(multiplier / time.Duration(loginPeerRatePerS)),
|
||||||
|
loginPeerBooster: loginBurst,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -128,11 +173,17 @@ func getRealIP(ctx context.Context) net.IP {
|
|||||||
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
|
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
|
||||||
// notifies the connected peer of any updates (e.g. new peers under the same account)
|
// notifies the connected peer of any updates (e.g. new peers under the same account)
|
||||||
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
|
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
|
||||||
reqStart := time.Now()
|
|
||||||
if s.appMetrics != nil {
|
if s.appMetrics != nil {
|
||||||
s.appMetrics.GRPCMetrics().CountSyncRequest()
|
s.appMetrics.GRPCMetrics().CountSyncRequest()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !s.syncLimiter.Allow() {
|
||||||
|
log.Warnf("sync rate limit exceeded for peer %s", req.WgPubKey)
|
||||||
|
return status.Errorf(codes.Internal, "temp rate limit reached")
|
||||||
|
}
|
||||||
|
|
||||||
|
reqStart := time.Now()
|
||||||
|
|
||||||
ctx := srv.Context()
|
ctx := srv.Context()
|
||||||
|
|
||||||
syncReq := &proto.SyncRequest{}
|
syncReq := &proto.SyncRequest{}
|
||||||
@@ -428,15 +479,39 @@ func (s *GRPCServer) parseRequest(ctx context.Context, req *proto.EncryptedMessa
|
|||||||
// In case it isn't, the endpoint checks whether setup key is provided within the request and tries to register a peer.
|
// In case it isn't, the endpoint checks whether setup key is provided within the request and tries to register a peer.
|
||||||
// In case of the successful registration login is also successful
|
// In case of the successful registration login is also successful
|
||||||
func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
|
func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
|
||||||
|
if s.appMetrics != nil {
|
||||||
|
s.appMetrics.GRPCMetrics().CountLoginRequest()
|
||||||
|
}
|
||||||
|
|
||||||
|
limiterIface, ok := s.loginLimiterStore.Load(req.WgPubKey)
|
||||||
|
if !ok {
|
||||||
|
// Create new limiter for this peer
|
||||||
|
newLimiter := rate.NewLimiter(s.loginPeerLimit, s.loginPeerBooster)
|
||||||
|
s.loginLimiterStore.Store(req.WgPubKey, newLimiter)
|
||||||
|
|
||||||
|
if !newLimiter.Allow() {
|
||||||
|
//time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
|
||||||
|
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
|
||||||
|
return nil, fmt.Errorf("temp rate limit reached (new peer limit)")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Use existing limiter for this peer
|
||||||
|
limiter := limiterIface.(*rate.Limiter)
|
||||||
|
if !limiter.Allow() {
|
||||||
|
//time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
|
||||||
|
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
|
||||||
|
return nil, fmt.Errorf("temp rate limit reached (peer limit)")
|
||||||
|
}
|
||||||
|
}
|
||||||
reqStart := time.Now()
|
reqStart := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
if s.appMetrics != nil {
|
if s.appMetrics != nil {
|
||||||
s.appMetrics.GRPCMetrics().CountLoginRequestDuration(time.Since(reqStart))
|
s.appMetrics.GRPCMetrics().CountLoginRequestDuration(time.Since(reqStart))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
if s.appMetrics != nil {
|
//if s.appMetrics != nil {
|
||||||
s.appMetrics.GRPCMetrics().CountLoginRequest()
|
// s.appMetrics.GRPCMetrics().CountLoginRequest()
|
||||||
}
|
//}
|
||||||
realIP := getRealIP(ctx)
|
realIP := getRealIP(ctx)
|
||||||
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, realIP.String())
|
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, realIP.String())
|
||||||
|
|
||||||
|
|||||||
@@ -440,7 +440,11 @@ func startManagementForTest(t *testing.T, testFile string, config *types.Config)
|
|||||||
GetSettings(gomock.Any(), gomock.Any(), gomock.Any()).
|
GetSettings(gomock.Any(), gomock.Any(), gomock.Any()).
|
||||||
AnyTimes().
|
AnyTimes().
|
||||||
Return(&types.Settings{}, nil)
|
Return(&types.Settings{}, nil)
|
||||||
|
settingsMockManager.
|
||||||
|
EXPECT().
|
||||||
|
GetExtraSettings(gomock.Any(), gomock.Any()).
|
||||||
|
Return(&types.ExtraSettings{}, nil).
|
||||||
|
AnyTimes()
|
||||||
permissionsManager := permissions.NewManager(store)
|
permissionsManager := permissions.NewManager(store)
|
||||||
|
|
||||||
accountManager, err := BuildManager(ctx, store, peersUpdateManager, nil, "", "netbird.selfhosted",
|
accountManager, err := BuildManager(ctx, store, peersUpdateManager, nil, "", "netbird.selfhosted",
|
||||||
|
|||||||
@@ -126,6 +126,10 @@ func (am *MockAccountManager) UpdateAccountPeers(ctx context.Context, accountID
|
|||||||
// do nothing
|
// do nothing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (am *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
|
||||||
|
// do nothing
|
||||||
|
}
|
||||||
|
|
||||||
func (am *MockAccountManager) DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error {
|
func (am *MockAccountManager) DeleteSetupKey(ctx context.Context, accountID, userID, keyID string) error {
|
||||||
if am.DeleteSetupKeyFunc != nil {
|
if am.DeleteSetupKeyFunc != nil {
|
||||||
return am.DeleteSetupKeyFunc(ctx, accountID, userID, keyID)
|
return am.DeleteSetupKeyFunc(ctx, accountID, userID, keyID)
|
||||||
|
|||||||
@@ -778,6 +778,12 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) {
|
|||||||
ctrl := gomock.NewController(t)
|
ctrl := gomock.NewController(t)
|
||||||
t.Cleanup(ctrl.Finish)
|
t.Cleanup(ctrl.Finish)
|
||||||
settingsMockManager := settings.NewMockManager(ctrl)
|
settingsMockManager := settings.NewMockManager(ctrl)
|
||||||
|
settingsMockManager.
|
||||||
|
EXPECT().
|
||||||
|
GetExtraSettings(gomock.Any(), gomock.Any()).
|
||||||
|
Return(&types.ExtraSettings{}, nil).
|
||||||
|
AnyTimes()
|
||||||
|
|
||||||
permissionsManager := permissions.NewManager(store)
|
permissionsManager := permissions.NewManager(store)
|
||||||
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -236,11 +236,23 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
|
|||||||
|
|
||||||
if peer.Name != update.Name {
|
if peer.Name != update.Name {
|
||||||
var newLabel string
|
var newLabel string
|
||||||
newLabel, err = getPeerIPDNSLabel(ctx, transaction, peer.IP, accountID, update.Name)
|
|
||||||
|
newLabel, err = nbdns.GetParsedDomainLabel(update.Name)
|
||||||
|
if err != nil {
|
||||||
|
newLabel = ""
|
||||||
|
} else {
|
||||||
|
_, err := transaction.GetPeerIdByLabel(ctx, store.LockingStrengthNone, accountID, update.Name)
|
||||||
|
if err == nil {
|
||||||
|
newLabel = ""
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if newLabel == "" {
|
||||||
|
newLabel, err = getPeerIPDNSLabel(peer.IP, update.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get free DNS label: %w", err)
|
return fmt.Errorf("failed to get free DNS label: %w", err)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
peer.Name = update.Name
|
peer.Name = update.Name
|
||||||
peer.DNSLabel = newLabel
|
peer.DNSLabel = newLabel
|
||||||
peerLabelChanged = true
|
peerLabelChanged = true
|
||||||
@@ -375,7 +387,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
|
|||||||
storeEvent()
|
storeEvent()
|
||||||
}
|
}
|
||||||
|
|
||||||
if updateAccountPeers {
|
if updateAccountPeers && userID != activity.SystemInitiator {
|
||||||
am.BufferUpdateAccountPeers(ctx, accountID)
|
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -472,6 +484,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
|||||||
var groupsToAdd []string
|
var groupsToAdd []string
|
||||||
var allowExtraDNSLabels bool
|
var allowExtraDNSLabels bool
|
||||||
var accountID string
|
var accountID string
|
||||||
|
var isEphemeral bool
|
||||||
if addedByUser {
|
if addedByUser {
|
||||||
user, err := am.Store.GetUserByUserID(ctx, store.LockingStrengthNone, userID)
|
user, err := am.Store.GetUserByUserID(ctx, store.LockingStrengthNone, userID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -501,7 +514,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
|||||||
setupKeyName = sk.Name
|
setupKeyName = sk.Name
|
||||||
allowExtraDNSLabels = sk.AllowExtraDNSLabels
|
allowExtraDNSLabels = sk.AllowExtraDNSLabels
|
||||||
accountID = sk.AccountID
|
accountID = sk.AccountID
|
||||||
|
isEphemeral = sk.Ephemeral
|
||||||
if !sk.AllowExtraDNSLabels && len(peer.ExtraDNSLabels) > 0 {
|
if !sk.AllowExtraDNSLabels && len(peer.ExtraDNSLabels) > 0 {
|
||||||
return nil, nil, nil, status.Errorf(status.PreconditionFailed, "couldn't add peer: setup key doesn't allow extra DNS labels")
|
return nil, nil, nil, status.Errorf(status.PreconditionFailed, "couldn't add peer: setup key doesn't allow extra DNS labels")
|
||||||
}
|
}
|
||||||
@@ -573,11 +586,17 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
|||||||
}
|
}
|
||||||
|
|
||||||
var freeLabel string
|
var freeLabel string
|
||||||
freeLabel, err = getPeerIPDNSLabel(ctx, am.Store, freeIP, accountID, peer.Meta.Hostname)
|
if isEphemeral || attempt > 1 {
|
||||||
|
freeLabel, err = getPeerIPDNSLabel(freeIP, peer.Meta.Hostname)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, fmt.Errorf("failed to get free DNS label: %w", err)
|
return nil, nil, nil, fmt.Errorf("failed to get free DNS label: %w", err)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
freeLabel, err = nbdns.GetParsedDomainLabel(peer.Meta.Hostname)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, nil, fmt.Errorf("failed to get free DNS label: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
newPeer.DNSLabel = freeLabel
|
newPeer.DNSLabel = freeLabel
|
||||||
newPeer.IP = freeIP
|
newPeer.IP = freeIP
|
||||||
|
|
||||||
@@ -647,7 +666,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
|||||||
if isUniqueConstraintError(err) {
|
if isUniqueConstraintError(err) {
|
||||||
unlock()
|
unlock()
|
||||||
unlock = nil
|
unlock = nil
|
||||||
log.WithContext(ctx).Debugf("Failed to add peer in attempt %d, retrying: %v", attempt, err)
|
log.WithContext(ctx).WithFields(log.Fields{"dns_label": freeLabel, "ip": freeIP}).Tracef("Failed to add peer in attempt %d, retrying: %v", attempt, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -681,7 +700,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
|||||||
return am.getValidatedPeerWithMap(ctx, false, accountID, newPeer)
|
return am.getValidatedPeerWithMap(ctx, false, accountID, newPeer)
|
||||||
}
|
}
|
||||||
|
|
||||||
func getPeerIPDNSLabel(ctx context.Context, tx store.Store, ip net.IP, accountID, peerHostName string) (string, error) {
|
func getPeerIPDNSLabel(ip net.IP, peerHostName string) (string, error) {
|
||||||
ip = ip.To4()
|
ip = ip.To4()
|
||||||
|
|
||||||
dnsName, err := nbdns.GetParsedDomainLabel(peerHostName)
|
dnsName, err := nbdns.GetParsedDomainLabel(peerHostName)
|
||||||
@@ -689,12 +708,6 @@ func getPeerIPDNSLabel(ctx context.Context, tx store.Store, ip net.IP, accountID
|
|||||||
return "", fmt.Errorf("failed to parse peer host name %s: %w", peerHostName, err)
|
return "", fmt.Errorf("failed to parse peer host name %s: %w", peerHostName, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = tx.GetPeerIdByLabel(ctx, store.LockingStrengthNone, accountID, dnsName)
|
|
||||||
if err != nil {
|
|
||||||
//nolint:nilerr
|
|
||||||
return dnsName, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
return fmt.Sprintf("%s-%d-%d", dnsName, ip[2], ip[3]), nil
|
return fmt.Sprintf("%s-%d-%d", dnsName, ip[2], ip[3]), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1175,7 +1188,26 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if am.metrics != nil {
|
||||||
globalStart := time.Now()
|
globalStart := time.Now()
|
||||||
|
defer func() {
|
||||||
|
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart))
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
peersToUpdate := []*nbpeer.Peer{}
|
||||||
|
for _, peer := range account.Peers {
|
||||||
|
if !am.peersUpdateManager.HasChannel(peer.ID) {
|
||||||
|
log.WithContext(ctx).Tracef("peer %s doesn't have a channel, skipping network map update", peer.ID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
peersToUpdate = append(peersToUpdate, peer)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(peersToUpdate) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
|
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -1198,12 +1230,13 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, peer := range account.Peers {
|
extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID)
|
||||||
if !am.peersUpdateManager.HasChannel(peer.ID) {
|
if err != nil {
|
||||||
log.WithContext(ctx).Tracef("peer %s doesn't have a channel, skipping network map update", peer.ID)
|
log.WithContext(ctx).Errorf("failed to get flow enabled status: %v", err)
|
||||||
continue
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, peer := range peersToUpdate {
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
semaphore <- struct{}{}
|
semaphore <- struct{}{}
|
||||||
go func(p *nbpeer.Peer) {
|
go func(p *nbpeer.Peer) {
|
||||||
@@ -1232,12 +1265,6 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
|||||||
}
|
}
|
||||||
am.metrics.UpdateChannelMetrics().CountMergeNetworkMapDuration(time.Since(start))
|
am.metrics.UpdateChannelMetrics().CountMergeNetworkMapDuration(time.Since(start))
|
||||||
|
|
||||||
extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID)
|
|
||||||
if err != nil {
|
|
||||||
log.WithContext(ctx).Errorf("failed to get flow enabled status: %v", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
start = time.Now()
|
start = time.Now()
|
||||||
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting)
|
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting)
|
||||||
am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start))
|
am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start))
|
||||||
@@ -1246,12 +1273,7 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
|||||||
}(peer)
|
}(peer)
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
if am.metrics != nil {
|
|
||||||
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
|
func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
|
||||||
|
|||||||
@@ -1344,6 +1344,11 @@ func Test_RegisterPeerBySetupKey(t *testing.T) {
|
|||||||
ctrl := gomock.NewController(t)
|
ctrl := gomock.NewController(t)
|
||||||
t.Cleanup(ctrl.Finish)
|
t.Cleanup(ctrl.Finish)
|
||||||
settingsMockManager := settings.NewMockManager(ctrl)
|
settingsMockManager := settings.NewMockManager(ctrl)
|
||||||
|
settingsMockManager.
|
||||||
|
EXPECT().
|
||||||
|
GetExtraSettings(gomock.Any(), gomock.Any()).
|
||||||
|
Return(&types.ExtraSettings{}, nil).
|
||||||
|
AnyTimes()
|
||||||
permissionsManager := permissions.NewManager(s)
|
permissionsManager := permissions.NewManager(s)
|
||||||
|
|
||||||
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||||
@@ -1556,6 +1561,11 @@ func Test_LoginPeer(t *testing.T) {
|
|||||||
ctrl := gomock.NewController(t)
|
ctrl := gomock.NewController(t)
|
||||||
t.Cleanup(ctrl.Finish)
|
t.Cleanup(ctrl.Finish)
|
||||||
settingsMockManager := settings.NewMockManager(ctrl)
|
settingsMockManager := settings.NewMockManager(ctrl)
|
||||||
|
settingsMockManager.
|
||||||
|
EXPECT().
|
||||||
|
GetExtraSettings(gomock.Any(), gomock.Any()).
|
||||||
|
Return(&types.ExtraSettings{}, nil).
|
||||||
|
AnyTimes()
|
||||||
permissionsManager := permissions.NewManager(s)
|
permissionsManager := permissions.NewManager(s)
|
||||||
|
|
||||||
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||||
|
|||||||
Reference in New Issue
Block a user