Compare commits

...

38 Commits

Author SHA1 Message Date
Maycon Santos
9b2351193c sync mock 2025-05-09 08:26:00 +02:00
Maycon Santos
63fd508556 print map stats for selected peers 2025-05-09 06:52:20 +02:00
Maycon Santos
760d61c7a3 enable pprof 2025-05-09 06:05:04 +02:00
Pascal Fischer
93a0315120 update logs and metrics 2025-05-08 20:55:06 +02:00
Pascal Fischer
676f201c83 add more logs 2025-05-08 20:08:42 +02:00
Pascal Fischer
b7173ab956 reuse account peers map 2025-05-08 19:55:00 +02:00
Pascal Fischer
26b418c42f move sync counter metrics 2025-05-08 16:32:33 +02:00
Pascal Fischer
0267cd1ddd move login counter metrics 2025-05-08 16:30:52 +02:00
Pascal Fischer
6b86350b9d configurable burst 2025-05-08 14:57:21 +02:00
Pascal Fischer
102384bfbb configurable burst 2025-05-08 14:53:41 +02:00
Pascal Fischer
0735340a0b add burst 2025-05-08 14:43:11 +02:00
Pascal Fischer
51e4b9aba6 remove write lock 2025-05-08 14:19:47 +02:00
Pascal Fischer
62f9c8ace9 add riming logs to addPeer 2025-05-08 14:11:44 +02:00
Pascal Fischer
c57869aa78 use proxyController AllProxyMaps 2025-05-08 13:57:26 +02:00
Pascal Fischer
abf6a1e08e add timing logs 2025-05-08 13:26:01 +02:00
Pascal Fischer
673f441d6e export accountUpdateBuffer 2025-05-08 12:52:57 +02:00
Maycon Santos
1a12100790 backoff on sync and reduce starting multiplier 2025-05-08 11:30:41 +02:00
Maycon Santos
3e963ffeba sleep on global limiter 2025-05-08 10:45:56 +02:00
Maycon Santos
86fa1eaa16 check global limiter before create peer limiter 2025-05-08 10:43:51 +02:00
Maycon Santos
1046342e2c add sleep on login and log 2025-05-08 10:12:58 +02:00
Maycon Santos
89729d85df rate limit per ip on API 2025-05-08 09:46:22 +02:00
Maycon Santos
2c5dff2f89 use peer id in controller call 2025-05-08 09:10:40 +02:00
Maycon Santos
779643463d fix log ids 2025-05-08 03:20:48 +02:00
Maycon Santos
22ac5ea0e8 add some logs 2025-05-08 02:48:29 +02:00
Maycon Santos
cf60191bb5 allow defining rating dimension 2025-05-08 01:57:23 +02:00
Maycon Santos
8bfab0d6dd add peer key rate limit 2025-05-08 00:17:10 +02:00
Maycon Santos
921b5606ce add too many requests status 2025-05-08 00:01:27 +02:00
Maycon Santos
84126f9425 add too many requests status 2025-05-08 00:01:20 +02:00
Maycon Santos
489f13031b add limiter to the get all peers 2025-05-07 23:21:00 +02:00
Pascal Fischer
c5b065aec1 remove like on name filter 2025-05-07 21:54:53 +02:00
Pascal Fischer
b09bc6534c add index on peer name 2025-05-07 21:52:22 +02:00
Pascal Fischer
34f1a366b3 limiter on api 2025-05-07 21:51:17 +02:00
Pascal Fischer
483edfcdc6 add log info about rate 2025-05-07 21:14:29 +02:00
Pascal Fischer
ef2eace033 configurable rate limit 2025-05-07 20:52:20 +02:00
Pascal Fischer
1bddfa5b7b configurable rate limit 2025-05-07 20:50:29 +02:00
Pascal Fischer
6ea7c665dc update error message 2025-05-07 20:36:53 +02:00
Pascal Fischer
4a3c782a31 add rate limiter to login and sync on grpc 2025-05-07 20:35:41 +02:00
Pascal Fischer
9359fea507 add rate limiter 2025-05-07 20:22:08 +02:00
14 changed files with 266 additions and 30 deletions

2
go.mod
View File

@@ -106,6 +106,7 @@ require (
golang.org/x/oauth2 v0.24.0 golang.org/x/oauth2 v0.24.0
golang.org/x/sync v0.13.0 golang.org/x/sync v0.13.0
golang.org/x/term v0.31.0 golang.org/x/term v0.31.0
golang.org/x/time v0.5.0
google.golang.org/api v0.177.0 google.golang.org/api v0.177.0
gopkg.in/yaml.v3 v3.0.1 gopkg.in/yaml.v3 v3.0.1
gorm.io/driver/mysql v1.5.7 gorm.io/driver/mysql v1.5.7
@@ -240,7 +241,6 @@ require (
golang.org/x/image v0.18.0 // indirect golang.org/x/image v0.18.0 // indirect
golang.org/x/mod v0.17.0 // indirect golang.org/x/mod v0.17.0 // indirect
golang.org/x/text v0.24.0 // indirect golang.org/x/text v0.24.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240509183442-62759503f434 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240509183442-62759503f434 // indirect

View File

@@ -134,6 +134,7 @@ var (
}, },
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
flag.Parse() flag.Parse()
startPprof()
ctx, cancel := context.WithCancel(cmd.Context()) ctx, cancel := context.WithCancel(cmd.Context())
defer cancel() defer cancel()

View File

@@ -2,9 +2,13 @@ package cmd
import ( import (
"fmt" "fmt"
"net/http"
_ "net/http/pprof"
"os" "os"
"os/signal" "os/signal"
"runtime"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/netbirdio/netbird/management/server/types" "github.com/netbirdio/netbird/management/server/types"
@@ -17,6 +21,17 @@ const (
idpSignKeyRefreshEnabledFlagName = "idp-sign-key-refresh-enabled" idpSignKeyRefreshEnabledFlagName = "idp-sign-key-refresh-enabled"
) )
func startPprof() {
go func() {
runtime.SetBlockProfileRate(1)
runtime.SetMutexProfileFraction(1)
log.Debugf("Starting pprof server on 0.0.0.0:6060")
if err := http.ListenAndServe("0.0.0.0:6060", nil); err != nil {
log.Fatalf("pprof server failed: %v", err)
}
}()
}
var ( var (
dnsDomain string dnsDomain string
mgmtDataDir string mgmtDataDir string

View File

@@ -202,7 +202,7 @@ func BuildManager(
if err != nil { if err != nil {
initialInterval = 1 initialInterval = 1
} else { } else {
initialInterval = int64(interval) * 10 initialInterval = int64(interval) * 2
go func() { go func() {
time.Sleep(30 * time.Second) time.Sleep(30 * time.Second)
am.updateAccountPeersBufferInterval.Store(int64(time.Duration(interval) * time.Millisecond)) am.updateAccountPeersBufferInterval.Store(int64(time.Duration(interval) * time.Millisecond))

View File

@@ -117,4 +117,5 @@ type Manager interface {
UpdateToPrimaryAccount(ctx context.Context, accountId string) (*types.Account, error) UpdateToPrimaryAccount(ctx context.Context, accountId string) (*types.Account, error)
GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error) GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error)
GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error) GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error)
BufferUpdateAccountPeers(ctx context.Context, accountID string)
} }

View File

@@ -3,8 +3,11 @@ package server
import ( import (
"context" "context"
"fmt" "fmt"
"math/rand/v2"
"net" "net"
"net/netip" "net/netip"
"os"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -13,6 +16,7 @@ import (
"github.com/golang/protobuf/ptypes/timestamp" "github.com/golang/protobuf/ptypes/timestamp"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/peer" "google.golang.org/grpc/peer"
@@ -47,6 +51,11 @@ type GRPCServer struct {
ephemeralManager *EphemeralManager ephemeralManager *EphemeralManager
peerLocks sync.Map peerLocks sync.Map
authManager auth.Manager authManager auth.Manager
syncLimiter *rate.Limiter
loginLimiter *rate.Limiter
loginLimiterStore sync.Map
loginPeerLimit rate.Limit
} }
// NewServer creates a new Management server // NewServer creates a new Management server
@@ -76,6 +85,41 @@ func NewServer(
} }
} }
multiplier := time.Second
d, e := time.ParseDuration(os.Getenv("NB_LOGIN_RATE"))
if e == nil {
multiplier = d
}
loginRatePerS, err := strconv.Atoi(os.Getenv("NB_LOGIN_RATE_PER_M"))
if loginRatePerS == 0 || err != nil {
loginRatePerS = 200
}
loginBurst, err := strconv.Atoi(os.Getenv("NB_LOGIN_BURST"))
if loginBurst == 0 || err != nil {
loginBurst = 200
}
log.WithContext(ctx).Infof("login burst limit set to %d", loginBurst)
loginPeerRatePerS, err := strconv.Atoi(os.Getenv("NB_LOGIN_PEER_RATE_PER_M"))
if loginPeerRatePerS == 0 || err != nil {
loginPeerRatePerS = 200
}
log.WithContext(ctx).Infof("login rate limit set to %d/min", loginRatePerS)
syncRatePerS, err := strconv.Atoi(os.Getenv("NB_SYNC_RATE_PER_M"))
if syncRatePerS == 0 || err != nil {
syncRatePerS = 200
}
log.WithContext(ctx).Infof("sync rate limit set to %d/min", syncRatePerS)
syncBurst, err := strconv.Atoi(os.Getenv("NB_SYNC_BURST"))
if syncBurst == 0 || err != nil {
syncBurst = 200
}
log.WithContext(ctx).Infof("sync burst limit set to %d", syncBurst)
return &GRPCServer{ return &GRPCServer{
wgKey: key, wgKey: key,
// peerKey -> event channel // peerKey -> event channel
@@ -87,6 +131,9 @@ func NewServer(
authManager: authManager, authManager: authManager,
appMetrics: appMetrics, appMetrics: appMetrics,
ephemeralManager: ephemeralManager, ephemeralManager: ephemeralManager,
syncLimiter: rate.NewLimiter(rate.Every(time.Minute/time.Duration(syncRatePerS)), syncBurst),
loginLimiter: rate.NewLimiter(rate.Every(multiplier/time.Duration(loginRatePerS)), loginBurst),
loginPeerLimit: rate.Every(time.Minute / time.Duration(loginPeerRatePerS)),
}, nil }, nil
} }
@@ -128,11 +175,18 @@ func getRealIP(ctx context.Context) net.IP {
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and // Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
// notifies the connected peer of any updates (e.g. new peers under the same account) // notifies the connected peer of any updates (e.g. new peers under the same account)
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error { func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
reqStart := time.Now()
if s.appMetrics != nil { if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountSyncRequest() s.appMetrics.GRPCMetrics().CountSyncRequest()
} }
if !s.syncLimiter.Allow() {
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
log.Warnf("sync rate limit exceeded for peer %s", req.WgPubKey)
return status.Errorf(codes.Internal, "temp rate limit reached")
}
reqStart := time.Now()
ctx := srv.Context() ctx := srv.Context()
syncReq := &proto.SyncRequest{} syncReq := &proto.SyncRequest{}
@@ -416,15 +470,58 @@ func (s *GRPCServer) parseRequest(ctx context.Context, req *proto.EncryptedMessa
// In case it isn't, the endpoint checks whether setup key is provided within the request and tries to register a peer. // In case it isn't, the endpoint checks whether setup key is provided within the request and tries to register a peer.
// In case of the successful registration login is also successful // In case of the successful registration login is also successful
func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) { func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountLoginRequest()
}
limiterIface, ok := s.loginLimiterStore.Load(req.WgPubKey)
if !ok {
// Check global limiter before allowing a new peer limiter
if !s.loginLimiter.Allow() {
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
return nil, fmt.Errorf("temp rate limit reached (global limit)")
}
// Create new limiter for this peer
newLimiter := rate.NewLimiter(s.loginPeerLimit, 1000)
s.loginLimiterStore.Store(req.WgPubKey, newLimiter)
if !newLimiter.Allow() {
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
return nil, fmt.Errorf("temp rate limit reached (new peer limit)")
}
} else {
// Use existing limiter for this peer
limiter := limiterIface.(*rate.Limiter)
if !limiter.Allow() {
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
return nil, fmt.Errorf("temp rate limit reached (peer limit)")
}
}
// limiter, _ := s.loginLimiterStore.LoadOrStore(req.WgPubKey, rate.NewLimiter(s.loginPeerLimit, 1))
// if !limiter.(*rate.Limiter).Allow() {
// time.Sleep(time.Millisecond * time.Duration(rand.IntN(10)*100))
// log.WithContext(ctx).Warnf("rate limit exceeded for %s", req.WgPubKey)
// return nil, status.Errorf(codes.Internal, "temp rate limit reached")
// }
//
// if os.Getenv("ENABLE_LOGIN_RATE_LIMIT") == "true" {
// if !s.loginLimiter.Allow() {
// return nil, status.Errorf(codes.Internal, "temp rate limit reached")
// }
// }
reqStart := time.Now() reqStart := time.Now()
defer func() { defer func() {
if s.appMetrics != nil { if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountLoginRequestDuration(time.Since(reqStart)) s.appMetrics.GRPCMetrics().CountLoginRequestDuration(time.Since(reqStart))
} }
}() }()
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountLoginRequest()
}
realIP := getRealIP(ctx) realIP := getRealIP(ctx)
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, realIP.String()) log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, realIP.String())

View File

@@ -4,10 +4,17 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net"
"net/http" "net/http"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/gorilla/mux" "github.com/gorilla/mux"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.org/x/time/rate"
"github.com/netbirdio/netbird/management/server/account" "github.com/netbirdio/netbird/management/server/account"
"github.com/netbirdio/netbird/management/server/activity" "github.com/netbirdio/netbird/management/server/activity"
@@ -23,6 +30,9 @@ import (
// Handler is a handler that returns peers of the account // Handler is a handler that returns peers of the account
type Handler struct { type Handler struct {
accountManager account.Manager accountManager account.Manager
rateLimiter *rate.Limiter
limiterStore sync.Map
reqLimit rate.Limit
} }
func AddEndpoints(accountManager account.Manager, router *mux.Router) { func AddEndpoints(accountManager account.Manager, router *mux.Router) {
@@ -35,8 +45,15 @@ func AddEndpoints(accountManager account.Manager, router *mux.Router) {
// NewHandler creates a new peers Handler // NewHandler creates a new peers Handler
func NewHandler(accountManager account.Manager) *Handler { func NewHandler(accountManager account.Manager) *Handler {
apiRatePerM, err := strconv.Atoi(os.Getenv("NB_API_RATE_PER_M"))
if apiRatePerM == 0 || err != nil {
apiRatePerM = 60
}
log.Infof("peers API rate limit set to %d/min", apiRatePerM)
return &Handler{ return &Handler{
accountManager: accountManager, accountManager: accountManager,
rateLimiter: rate.NewLimiter(rate.Every(time.Minute/time.Duration(apiRatePerM)), 1),
reqLimit: rate.Every(time.Minute / time.Duration(apiRatePerM)),
} }
} }
@@ -54,6 +71,11 @@ func (h *Handler) checkPeerStatus(peer *nbpeer.Peer) (*nbpeer.Peer, error) {
} }
func (h *Handler) getPeer(ctx context.Context, accountID, peerID, userID string, w http.ResponseWriter) { func (h *Handler) getPeer(ctx context.Context, accountID, peerID, userID string, w http.ResponseWriter) {
if !h.rateLimiter.Allow() {
util.WriteError(ctx, fmt.Errorf("temp rate limit reached"), w)
return
}
peer, err := h.accountManager.GetPeer(ctx, accountID, peerID, userID) peer, err := h.accountManager.GetPeer(ctx, accountID, peerID, userID)
if err != nil { if err != nil {
util.WriteError(ctx, err, w) util.WriteError(ctx, err, w)
@@ -91,7 +113,7 @@ func (h *Handler) updatePeer(ctx context.Context, accountID, userID, peerID stri
req := &api.PeerRequest{} req := &api.PeerRequest{}
err := json.NewDecoder(r.Body).Decode(&req) err := json.NewDecoder(r.Body).Decode(&req)
if err != nil { if err != nil {
util.WriteErrorResponse("couldn't parse JSON request", http.StatusBadRequest, w) util.WriteErrorResponse("couldn't parse JSON request", http.StatusPreconditionRequired, w)
return return
} }
@@ -184,9 +206,40 @@ func (h *Handler) HandlePeer(w http.ResponseWriter, r *http.Request) {
util.WriteError(r.Context(), status.Errorf(status.NotFound, "unknown METHOD"), w) util.WriteError(r.Context(), status.Errorf(status.NotFound, "unknown METHOD"), w)
} }
} }
func getCallerIP(r *http.Request) string {
// Check X-Forwarded-For header first (can be a comma-separated list)
if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
// Use first IP in the list
parts := strings.Split(xff, ",")
return strings.TrimSpace(parts[0])
}
// Then check X-Real-IP
if xrip := r.Header.Get("X-Real-IP"); xrip != "" {
return xrip
}
// Fallback to RemoteAddr
ip, _, err := net.SplitHostPort(r.RemoteAddr)
if err != nil {
return r.RemoteAddr // may be raw IP
}
return ip
}
// GetAllPeers returns a list of all peers associated with a provided account // GetAllPeers returns a list of all peers associated with a provided account
func (h *Handler) GetAllPeers(w http.ResponseWriter, r *http.Request) { func (h *Handler) GetAllPeers(w http.ResponseWriter, r *http.Request) {
ip := getCallerIP(r)
limiter, _ := h.limiterStore.LoadOrStore(ip, rate.NewLimiter(h.reqLimit, 1))
if !limiter.(*rate.Limiter).Allow() {
log.WithContext(r.Context()).Errorf("rate limit exceeded for IP: %s", ip)
util.WriteError(r.Context(), status.Errorf(status.StatusTooManyRequests, "temp rate limit reached"), w)
return
}
//if !h.rateLimiter.Allow() {
// util.WriteError(r.Context(), status.Errorf(status.StatusTooManyRequests, "temp rate limit reached"), w)
// return
//}
userAuth, err := nbcontext.GetUserAuthFromContext(r.Context()) userAuth, err := nbcontext.GetUserAuthFromContext(r.Context())
if err != nil { if err != nil {
util.WriteError(r.Context(), err, w) util.WriteError(r.Context(), err, w)

View File

@@ -106,6 +106,8 @@ func WriteError(ctx context.Context, err error, w http.ResponseWriter) {
httpStatus = http.StatusUnauthorized httpStatus = http.StatusUnauthorized
case status.BadRequest: case status.BadRequest:
httpStatus = http.StatusBadRequest httpStatus = http.StatusBadRequest
case status.StatusTooManyRequests:
httpStatus = http.StatusTooManyRequests
default: default:
} }
msg = strings.ToLower(err.Error()) msg = strings.ToLower(err.Error())

View File

@@ -3,12 +3,14 @@ package port_forwarding
import ( import (
"context" "context"
"github.com/netbirdio/netbird/management/server/peer"
nbtypes "github.com/netbirdio/netbird/management/server/types" nbtypes "github.com/netbirdio/netbird/management/server/types"
) )
type Controller interface { type Controller interface {
SendUpdate(ctx context.Context, accountID string, affectedProxyID string, affectedPeerIDs []string) SendUpdate(ctx context.Context, accountID string, affectedProxyID string, affectedPeerIDs []string, accountPeers map[string]*peer.Peer)
GetProxyNetworkMaps(ctx context.Context, accountID string) (map[string]*nbtypes.NetworkMap, error) GetProxyNetworkMaps(ctx context.Context, accountID, peerID string, accountPeers map[string]*peer.Peer) (map[string]*nbtypes.NetworkMap, error)
GetProxyNetworkMapsAll(ctx context.Context, accountID string, accountPeers map[string]*peer.Peer) (map[string]*nbtypes.NetworkMap, error)
IsPeerInIngressPorts(ctx context.Context, accountID, peerID string) (bool, error) IsPeerInIngressPorts(ctx context.Context, accountID, peerID string) (bool, error)
} }
@@ -19,11 +21,15 @@ func NewControllerMock() *ControllerMock {
return &ControllerMock{} return &ControllerMock{}
} }
func (c *ControllerMock) SendUpdate(ctx context.Context, accountID string, affectedProxyID string, affectedPeerIDs []string) { func (c *ControllerMock) SendUpdate(ctx context.Context, accountID string, affectedProxyID string, affectedPeerIDs []string, accountPeers map[string]*peer.Peer) {
// noop // noop
} }
func (c *ControllerMock) GetProxyNetworkMaps(ctx context.Context, accountID string) (map[string]*nbtypes.NetworkMap, error) { func (c *ControllerMock) GetProxyNetworkMaps(ctx context.Context, accountID, peerID string, accountPeers map[string]*peer.Peer) (map[string]*nbtypes.NetworkMap, error) {
return make(map[string]*nbtypes.NetworkMap), nil
}
func (c *ControllerMock) GetProxyNetworkMapsAll(ctx context.Context, accountID string, accountPeers map[string]*peer.Peer) (map[string]*nbtypes.NetworkMap, error) {
return make(map[string]*nbtypes.NetworkMap), nil return make(map[string]*nbtypes.NetworkMap), nil
} }

View File

@@ -889,3 +889,7 @@ func (am *MockAccountManager) GetCurrentUserInfo(ctx context.Context, userAuth n
} }
return nil, status.Errorf(codes.Unimplemented, "method GetCurrentUserInfo is not implemented") return nil, status.Errorf(codes.Unimplemented, "method GetCurrentUserInfo is not implemented")
} }
func (am *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
// noop
}

View File

@@ -419,7 +419,7 @@ func (am *DefaultAccountManager) GetNetworkMap(ctx context.Context, peerID strin
} }
customZone := account.GetPeersCustomZone(ctx, am.GetDNSDomain(account.Settings)) customZone := account.GetPeersCustomZone(ctx, am.GetDNSDomain(account.Settings))
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, account.Id) proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, account.Id, peerID, account.Peers)
if err != nil { if err != nil {
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err) log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
return nil, err return nil, err
@@ -453,6 +453,7 @@ func (am *DefaultAccountManager) GetPeerNetwork(ctx context.Context, peerID stri
// Each new Peer will be assigned a new next net.IP from the Account.Network and Account.Network.LastIP will be updated (IP's are not reused). // Each new Peer will be assigned a new next net.IP from the Account.Network and Account.Network.LastIP will be updated (IP's are not reused).
// The peer property is just a placeholder for the Peer properties to pass further // The peer property is just a placeholder for the Peer properties to pass further
func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID string, peer *nbpeer.Peer) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) { func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID string, peer *nbpeer.Peer) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) {
startGlobal := time.Now()
if setupKey == "" && userID == "" { if setupKey == "" && userID == "" {
// no auth method provided => reject access // no auth method provided => reject access
return nil, nil, nil, status.Errorf(status.Unauthenticated, "no peer auth method provided, please use a setup key or interactive SSO login") return nil, nil, nil, status.Errorf(status.Unauthenticated, "no peer auth method provided, please use a setup key or interactive SSO login")
@@ -505,6 +506,8 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
var ephemeral bool var ephemeral bool
var groupsToAdd []string var groupsToAdd []string
var allowExtraDNSLabels bool var allowExtraDNSLabels bool
start := time.Now()
if addedByUser { if addedByUser {
user, err := transaction.GetUserByUserID(ctx, store.LockingStrengthUpdate, userID) user, err := transaction.GetUserByUserID(ctx, store.LockingStrengthUpdate, userID)
if err != nil { if err != nil {
@@ -537,6 +540,9 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
} }
} }
log.WithContext(ctx).Debugf("AddPeer: setup key get took %v", time.Since(start))
start = time.Now()
if (strings.ToLower(peer.Meta.Hostname) == "iphone" || strings.ToLower(peer.Meta.Hostname) == "ipad") && userID != "" { if (strings.ToLower(peer.Meta.Hostname) == "iphone" || strings.ToLower(peer.Meta.Hostname) == "ipad") && userID != "" {
if am.idpManager != nil { if am.idpManager != nil {
userdata, err := am.idpManager.GetUserDataByID(ctx, userID, idp.AppMetadata{WTAccountID: accountID}) userdata, err := am.idpManager.GetUserDataByID(ctx, userID, idp.AppMetadata{WTAccountID: accountID})
@@ -545,16 +551,21 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
} }
} }
} }
log.WithContext(ctx).Debugf("AddPeer: idp took %v", time.Since(start))
start = time.Now()
freeLabel, err := am.getFreeDNSLabel(ctx, transaction, accountID, peer.Meta.Hostname) freeLabel, err := am.getFreeDNSLabel(ctx, transaction, accountID, peer.Meta.Hostname)
if err != nil { if err != nil {
return fmt.Errorf("failed to get free DNS label: %w", err) return fmt.Errorf("failed to get free DNS label: %w", err)
} }
log.WithContext(ctx).Debugf("AddPeer: free label took %v", time.Since(start))
start = time.Now()
freeIP, err := getFreeIP(ctx, transaction, accountID) freeIP, err := getFreeIP(ctx, transaction, accountID)
if err != nil { if err != nil {
return fmt.Errorf("failed to get free IP: %w", err) return fmt.Errorf("failed to get free IP: %w", err)
} }
log.WithContext(ctx).Debugf("AddPeer: ip took %v", time.Since(start))
registrationTime := time.Now().UTC() registrationTime := time.Now().UTC()
newPeer = &nbpeer.Peer{ newPeer = &nbpeer.Peer{
@@ -578,17 +589,22 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
ExtraDNSLabels: peer.ExtraDNSLabels, ExtraDNSLabels: peer.ExtraDNSLabels,
AllowExtraDNSLabels: allowExtraDNSLabels, AllowExtraDNSLabels: allowExtraDNSLabels,
} }
start = time.Now()
settings, err := transaction.GetAccountSettings(ctx, store.LockingStrengthShare, accountID) settings, err := transaction.GetAccountSettings(ctx, store.LockingStrengthShare, accountID)
if err != nil { if err != nil {
return fmt.Errorf("failed to get account settings: %w", err) return fmt.Errorf("failed to get account settings: %w", err)
} }
log.WithContext(ctx).Debugf("AddPeer: settings took %v", time.Since(start))
opEvent.TargetID = newPeer.ID opEvent.TargetID = newPeer.ID
opEvent.Meta = newPeer.EventMeta(am.GetDNSDomain(settings)) opEvent.Meta = newPeer.EventMeta(am.GetDNSDomain(settings))
if !addedByUser { if !addedByUser {
opEvent.Meta["setup_key_name"] = setupKeyName opEvent.Meta["setup_key_name"] = setupKeyName
} }
start = time.Now()
if am.geo != nil && newPeer.Location.ConnectionIP != nil { if am.geo != nil && newPeer.Location.ConnectionIP != nil {
location, err := am.geo.Lookup(newPeer.Location.ConnectionIP) location, err := am.geo.Lookup(newPeer.Location.ConnectionIP)
if err != nil { if err != nil {
@@ -600,8 +616,11 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
} }
} }
log.WithContext(ctx).Debugf("AddPeer: geo took %v", time.Since(start))
newPeer = am.integratedPeerValidator.PreparePeer(ctx, accountID, newPeer, groupsToAdd, settings.Extra) newPeer = am.integratedPeerValidator.PreparePeer(ctx, accountID, newPeer, groupsToAdd, settings.Extra)
start = time.Now()
err = transaction.AddPeerToAllGroup(ctx, store.LockingStrengthUpdate, accountID, newPeer.ID) err = transaction.AddPeerToAllGroup(ctx, store.LockingStrengthUpdate, accountID, newPeer.ID)
if err != nil { if err != nil {
return fmt.Errorf("failed adding peer to All group: %w", err) return fmt.Errorf("failed adding peer to All group: %w", err)
@@ -616,11 +635,16 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
} }
} }
log.WithContext(ctx).Debugf("AddPeer: add peer to group took %v", time.Since(start))
start = time.Now()
err = transaction.AddPeerToAccount(ctx, store.LockingStrengthUpdate, newPeer) err = transaction.AddPeerToAccount(ctx, store.LockingStrengthUpdate, newPeer)
if err != nil { if err != nil {
return fmt.Errorf("failed to add peer to account: %w", err) return fmt.Errorf("failed to add peer to account: %w", err)
} }
log.WithContext(ctx).Debugf("AddPeer: add peer to account took %v", time.Since(start))
err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID) err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil { if err != nil {
return fmt.Errorf("failed to increment network serial: %w", err) return fmt.Errorf("failed to increment network serial: %w", err)
@@ -638,11 +662,14 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
} }
} }
start = time.Now()
updateAccountPeers, err = isPeerInActiveGroup(ctx, transaction, accountID, newPeer.ID) updateAccountPeers, err = isPeerInActiveGroup(ctx, transaction, accountID, newPeer.ID)
if err != nil { if err != nil {
return err return err
} }
log.WithContext(ctx).Debugf("AddPeer: is peer in active group took %v", time.Since(start))
log.WithContext(ctx).Debugf("Peer %s added to account %s", newPeer.ID, accountID) log.WithContext(ctx).Debugf("Peer %s added to account %s", newPeer.ID, accountID)
return nil return nil
}) })
@@ -657,8 +684,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
am.StoreEvent(ctx, opEvent.InitiatorID, opEvent.TargetID, opEvent.AccountID, opEvent.Activity, opEvent.Meta) am.StoreEvent(ctx, opEvent.InitiatorID, opEvent.TargetID, opEvent.AccountID, opEvent.Activity, opEvent.Meta)
unlock() log.WithContext(ctx).Debugf("AddPeer took %v", time.Since(startGlobal))
unlock = nil
if updateAccountPeers { if updateAccountPeers {
am.BufferUpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
@@ -997,48 +1023,56 @@ func (am *DefaultAccountManager) checkIFPeerNeedsLoginWithoutLock(ctx context.Co
} }
func (am *DefaultAccountManager) getValidatedPeerWithMap(ctx context.Context, isRequiresApproval bool, accountID string, peer *nbpeer.Peer) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) { func (am *DefaultAccountManager) getValidatedPeerWithMap(ctx context.Context, isRequiresApproval bool, accountID string, peer *nbpeer.Peer) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) {
start := time.Now() mstart := time.Now()
defer func() { defer func() {
log.WithContext(ctx).Debugf("getValidatedPeerWithMap: took %s", time.Since(start)) log.WithContext(ctx).Debugf("getValidatedPeerWithMap: took %s", time.Since(mstart))
}() }()
if isRequiresApproval { if isRequiresApproval {
start := time.Now()
network, err := am.Store.GetAccountNetwork(ctx, store.LockingStrengthShare, accountID) network, err := am.Store.GetAccountNetwork(ctx, store.LockingStrengthShare, accountID)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
log.WithContext(ctx).Debugf("getValidatedPeerWithMap: took %s", time.Since(start))
emptyMap := &types.NetworkMap{ emptyMap := &types.NetworkMap{
Network: network.Copy(), Network: network.Copy(),
} }
return peer, emptyMap, nil, nil return peer, emptyMap, nil, nil
} }
start := time.Now()
account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID) account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
log.WithContext(ctx).Debugf("GetAccountWithBackpressure: took %s", time.Since(start))
start = time.Now()
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra) approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
log.WithContext(ctx).Debugf("GetValidatedPeers: took %s", time.Since(start))
start = time.Now()
postureChecks, err := am.getPeerPostureChecks(account, peer.ID) postureChecks, err := am.getPeerPostureChecks(account, peer.ID)
if err != nil { if err != nil {
return nil, nil, nil, err return nil, nil, nil, err
} }
log.WithContext(ctx).Debugf("getPeerPostureChecks: took %s", time.Since(start))
start = time.Now()
customZone := account.GetPeersCustomZone(ctx, am.GetDNSDomain(account.Settings)) customZone := account.GetPeersCustomZone(ctx, am.GetDNSDomain(account.Settings))
log.WithContext(ctx).Debugf("GetPeersCustomZone: took %s", time.Since(start))
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, account.Id) start = time.Now()
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, account.Id, peer.ID, account.Peers)
if err != nil { if err != nil {
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err) log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
return nil, nil, nil, err return nil, nil, nil, err
} }
log.WithContext(ctx).Debugf("GetProxyNetworkMaps: took %s", time.Since(start))
start = time.Now()
networkMap := account.GetPeerNetworkMap(ctx, peer.ID, customZone, approvedPeersMap, account.GetResourcePoliciesMap(), account.GetResourceRoutersMap(), am.metrics.AccountManagerMetrics()) networkMap := account.GetPeerNetworkMap(ctx, peer.ID, customZone, approvedPeersMap, account.GetResourcePoliciesMap(), account.GetResourceRoutersMap(), am.metrics.AccountManagerMetrics())
log.WithContext(ctx).Debugf("GetPeerNetworkMap: took %s", time.Since(start))
proxyNetworkMap, ok := proxyNetworkMaps[peer.ID] proxyNetworkMap, ok := proxyNetworkMaps[peer.ID]
if ok { if ok {
networkMap.Merge(proxyNetworkMap) networkMap.Merge(proxyNetworkMap)
@@ -1166,13 +1200,16 @@ func (am *DefaultAccountManager) checkIfUserOwnsPeer(ctx context.Context, accoun
// UpdateAccountPeers updates all peers that belong to an account. // UpdateAccountPeers updates all peers that belong to an account.
// Should be called when changes have to be synced to peers. // Should be called when changes have to be synced to peers.
func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, accountID string) { func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, accountID string) {
globalStart := time.Now()
start := time.Now()
account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID) account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
if err != nil { if err != nil {
log.WithContext(ctx).Errorf("failed to send out updates to peers. failed to get account: %v", err) log.WithContext(ctx).Errorf("failed to send out updates to peers. failed to get account: %v", err)
return return
} }
log.WithContext(ctx).Infof("updateAccountPeers: getAccount took %s", time.Since(start))
start := time.Now() start = time.Now()
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra) approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
if err != nil { if err != nil {
@@ -1180,6 +1217,8 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
return return
} }
log.WithContext(ctx).Infof("updateAccountPeers: validatePeers took %s", time.Since(start))
var wg sync.WaitGroup var wg sync.WaitGroup
semaphore := make(chan struct{}, 10) semaphore := make(chan struct{}, 10)
@@ -1189,11 +1228,21 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
resourcePolicies := account.GetResourcePoliciesMap() resourcePolicies := account.GetResourcePoliciesMap()
routers := account.GetResourceRoutersMap() routers := account.GetResourceRoutersMap()
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, accountID) start = time.Now()
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMapsAll(ctx, accountID, account.Peers)
if err != nil { if err != nil {
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err) log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
return return
} }
log.WithContext(ctx).Infof("updateAccountPeers: getProxyNetworkMaps took %s", time.Since(start))
for _, id := range []string{"d07kd1ei389c73dq19gg", "d07kcaui389c73dq19g0", "d0e7uo6i389c73f040v0"} {
peerMap, ok := proxyNetworkMaps[id]
if !ok {
log.WithContext(ctx).Infof("updateAccountPeers xxx: proxy network map %s not found", id)
continue
}
log.WithContext(ctx).Infof("updateAccountPeers xxx: peer %s has %d peers, %d offline peers, %d, firewall rules, %d forwarding rules, %d routing rules", id, len(peerMap.Peers), len(peerMap.OfflinePeers), len(peerMap.FirewallRules), len(peerMap.ForwardingRules), len(peerMap.RoutesFirewallRules))
}
for _, peer := range account.Peers { for _, peer := range account.Peers {
if !am.peersUpdateManager.HasChannel(peer.ID) { if !am.peersUpdateManager.HasChannel(peer.ID) {
@@ -1226,16 +1275,22 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
return return
} }
start = time.Now()
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings.RoutingPeerDNSResolutionEnabled, extraSetting) update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings.RoutingPeerDNSResolutionEnabled, extraSetting)
log.WithContext(ctx).Infof("updateAccountPeers: toSyncResponse took %s", time.Since(start))
start = time.Now()
am.peersUpdateManager.SendUpdate(ctx, p.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap}) am.peersUpdateManager.SendUpdate(ctx, p.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap})
log.WithContext(ctx).Infof("updateAccountPeers: sending update toSyncResponse took %s", time.Since(start))
}(peer) }(peer)
} }
// //
wg.Wait() wg.Wait()
log.WithContext(ctx).Infof("updateAccountPeers: waiting for updates to complete took %s", time.Since(globalStart))
if am.metrics != nil { if am.metrics != nil {
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(start)) am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart))
} }
} }
@@ -1292,7 +1347,7 @@ func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountI
return return
} }
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, accountId) proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, accountId, peerId, account.Peers)
if err != nil { if err != nil {
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err) log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
return return

View File

@@ -24,7 +24,7 @@ type Peer struct {
// Meta is a Peer system meta data // Meta is a Peer system meta data
Meta PeerSystemMeta `gorm:"embedded;embeddedPrefix:meta_"` Meta PeerSystemMeta `gorm:"embedded;embeddedPrefix:meta_"`
// Name is peer's name (machine name) // Name is peer's name (machine name)
Name string Name string `gorm:"index"`
// DNSLabel is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's // DNSLabel is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's
// domain to the peer label. e.g. peer-dns-label.netbird.cloud // domain to the peer label. e.g. peer-dns-label.netbird.cloud
DNSLabel string DNSLabel string

View File

@@ -37,6 +37,8 @@ const (
// Unauthenticated indicates that user is not authenticated due to absence of valid credentials // Unauthenticated indicates that user is not authenticated due to absence of valid credentials
Unauthenticated Type = 10 Unauthenticated Type = 10
StatusTooManyRequests = 11
) )
// Type is a type of the Error // Type is a type of the Error

View File

@@ -1311,7 +1311,7 @@ func (s *SqlStore) GetAccountPeers(ctx context.Context, lockStrength LockingStre
query := s.db.Clauses(clause.Locking{Strength: string(lockStrength)}).Where(accountIDCondition, accountID) query := s.db.Clauses(clause.Locking{Strength: string(lockStrength)}).Where(accountIDCondition, accountID)
if nameFilter != "" { if nameFilter != "" {
query = query.Where("name LIKE ?", "%"+nameFilter+"%") query = query.Where("name = ?", nameFilter)
} }
if ipFilter != "" { if ipFilter != "" {
query = query.Where("ip LIKE ?", "%"+ipFilter+"%") query = query.Where("ip LIKE ?", "%"+ipFilter+"%")