mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-19 00:36:38 +00:00
Compare commits
38 Commits
fix/androi
...
test/add-r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9b2351193c | ||
|
|
63fd508556 | ||
|
|
760d61c7a3 | ||
|
|
93a0315120 | ||
|
|
676f201c83 | ||
|
|
b7173ab956 | ||
|
|
26b418c42f | ||
|
|
0267cd1ddd | ||
|
|
6b86350b9d | ||
|
|
102384bfbb | ||
|
|
0735340a0b | ||
|
|
51e4b9aba6 | ||
|
|
62f9c8ace9 | ||
|
|
c57869aa78 | ||
|
|
abf6a1e08e | ||
|
|
673f441d6e | ||
|
|
1a12100790 | ||
|
|
3e963ffeba | ||
|
|
86fa1eaa16 | ||
|
|
1046342e2c | ||
|
|
89729d85df | ||
|
|
2c5dff2f89 | ||
|
|
779643463d | ||
|
|
22ac5ea0e8 | ||
|
|
cf60191bb5 | ||
|
|
8bfab0d6dd | ||
|
|
921b5606ce | ||
|
|
84126f9425 | ||
|
|
489f13031b | ||
|
|
c5b065aec1 | ||
|
|
b09bc6534c | ||
|
|
34f1a366b3 | ||
|
|
483edfcdc6 | ||
|
|
ef2eace033 | ||
|
|
1bddfa5b7b | ||
|
|
6ea7c665dc | ||
|
|
4a3c782a31 | ||
|
|
9359fea507 |
2
go.mod
2
go.mod
@@ -106,6 +106,7 @@ require (
|
|||||||
golang.org/x/oauth2 v0.24.0
|
golang.org/x/oauth2 v0.24.0
|
||||||
golang.org/x/sync v0.13.0
|
golang.org/x/sync v0.13.0
|
||||||
golang.org/x/term v0.31.0
|
golang.org/x/term v0.31.0
|
||||||
|
golang.org/x/time v0.5.0
|
||||||
google.golang.org/api v0.177.0
|
google.golang.org/api v0.177.0
|
||||||
gopkg.in/yaml.v3 v3.0.1
|
gopkg.in/yaml.v3 v3.0.1
|
||||||
gorm.io/driver/mysql v1.5.7
|
gorm.io/driver/mysql v1.5.7
|
||||||
@@ -240,7 +241,6 @@ require (
|
|||||||
golang.org/x/image v0.18.0 // indirect
|
golang.org/x/image v0.18.0 // indirect
|
||||||
golang.org/x/mod v0.17.0 // indirect
|
golang.org/x/mod v0.17.0 // indirect
|
||||||
golang.org/x/text v0.24.0 // indirect
|
golang.org/x/text v0.24.0 // indirect
|
||||||
golang.org/x/time v0.5.0 // indirect
|
|
||||||
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
|
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
|
||||||
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
|
golang.zx2c4.com/wintun v0.0.0-20230126152724-0fa3db229ce2 // indirect
|
||||||
google.golang.org/genproto/googleapis/api v0.0.0-20240509183442-62759503f434 // indirect
|
google.golang.org/genproto/googleapis/api v0.0.0-20240509183442-62759503f434 // indirect
|
||||||
|
|||||||
@@ -134,6 +134,7 @@ var (
|
|||||||
},
|
},
|
||||||
RunE: func(cmd *cobra.Command, args []string) error {
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
startPprof()
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(cmd.Context())
|
ctx, cancel := context.WithCancel(cmd.Context())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|||||||
@@ -2,9 +2,13 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
_ "net/http/pprof"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"runtime"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/management/server/types"
|
"github.com/netbirdio/netbird/management/server/types"
|
||||||
@@ -17,6 +21,17 @@ const (
|
|||||||
idpSignKeyRefreshEnabledFlagName = "idp-sign-key-refresh-enabled"
|
idpSignKeyRefreshEnabledFlagName = "idp-sign-key-refresh-enabled"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func startPprof() {
|
||||||
|
go func() {
|
||||||
|
runtime.SetBlockProfileRate(1)
|
||||||
|
runtime.SetMutexProfileFraction(1)
|
||||||
|
log.Debugf("Starting pprof server on 0.0.0.0:6060")
|
||||||
|
if err := http.ListenAndServe("0.0.0.0:6060", nil); err != nil {
|
||||||
|
log.Fatalf("pprof server failed: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
dnsDomain string
|
dnsDomain string
|
||||||
mgmtDataDir string
|
mgmtDataDir string
|
||||||
|
|||||||
@@ -202,7 +202,7 @@ func BuildManager(
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
initialInterval = 1
|
initialInterval = 1
|
||||||
} else {
|
} else {
|
||||||
initialInterval = int64(interval) * 10
|
initialInterval = int64(interval) * 2
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(30 * time.Second)
|
time.Sleep(30 * time.Second)
|
||||||
am.updateAccountPeersBufferInterval.Store(int64(time.Duration(interval) * time.Millisecond))
|
am.updateAccountPeersBufferInterval.Store(int64(time.Duration(interval) * time.Millisecond))
|
||||||
|
|||||||
@@ -117,4 +117,5 @@ type Manager interface {
|
|||||||
UpdateToPrimaryAccount(ctx context.Context, accountId string) (*types.Account, error)
|
UpdateToPrimaryAccount(ctx context.Context, accountId string) (*types.Account, error)
|
||||||
GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error)
|
GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error)
|
||||||
GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error)
|
GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error)
|
||||||
|
BufferUpdateAccountPeers(ctx context.Context, accountID string)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,8 +3,11 @@ package server
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math/rand/v2"
|
||||||
"net"
|
"net"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -13,6 +16,7 @@ import (
|
|||||||
"github.com/golang/protobuf/ptypes/timestamp"
|
"github.com/golang/protobuf/ptypes/timestamp"
|
||||||
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
|
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/peer"
|
"google.golang.org/grpc/peer"
|
||||||
@@ -47,6 +51,11 @@ type GRPCServer struct {
|
|||||||
ephemeralManager *EphemeralManager
|
ephemeralManager *EphemeralManager
|
||||||
peerLocks sync.Map
|
peerLocks sync.Map
|
||||||
authManager auth.Manager
|
authManager auth.Manager
|
||||||
|
|
||||||
|
syncLimiter *rate.Limiter
|
||||||
|
loginLimiter *rate.Limiter
|
||||||
|
loginLimiterStore sync.Map
|
||||||
|
loginPeerLimit rate.Limit
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer creates a new Management server
|
// NewServer creates a new Management server
|
||||||
@@ -76,6 +85,41 @@ func NewServer(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
multiplier := time.Second
|
||||||
|
d, e := time.ParseDuration(os.Getenv("NB_LOGIN_RATE"))
|
||||||
|
if e == nil {
|
||||||
|
multiplier = d
|
||||||
|
}
|
||||||
|
|
||||||
|
loginRatePerS, err := strconv.Atoi(os.Getenv("NB_LOGIN_RATE_PER_M"))
|
||||||
|
if loginRatePerS == 0 || err != nil {
|
||||||
|
loginRatePerS = 200
|
||||||
|
}
|
||||||
|
|
||||||
|
loginBurst, err := strconv.Atoi(os.Getenv("NB_LOGIN_BURST"))
|
||||||
|
if loginBurst == 0 || err != nil {
|
||||||
|
loginBurst = 200
|
||||||
|
}
|
||||||
|
log.WithContext(ctx).Infof("login burst limit set to %d", loginBurst)
|
||||||
|
|
||||||
|
loginPeerRatePerS, err := strconv.Atoi(os.Getenv("NB_LOGIN_PEER_RATE_PER_M"))
|
||||||
|
if loginPeerRatePerS == 0 || err != nil {
|
||||||
|
loginPeerRatePerS = 200
|
||||||
|
}
|
||||||
|
log.WithContext(ctx).Infof("login rate limit set to %d/min", loginRatePerS)
|
||||||
|
|
||||||
|
syncRatePerS, err := strconv.Atoi(os.Getenv("NB_SYNC_RATE_PER_M"))
|
||||||
|
if syncRatePerS == 0 || err != nil {
|
||||||
|
syncRatePerS = 200
|
||||||
|
}
|
||||||
|
log.WithContext(ctx).Infof("sync rate limit set to %d/min", syncRatePerS)
|
||||||
|
|
||||||
|
syncBurst, err := strconv.Atoi(os.Getenv("NB_SYNC_BURST"))
|
||||||
|
if syncBurst == 0 || err != nil {
|
||||||
|
syncBurst = 200
|
||||||
|
}
|
||||||
|
log.WithContext(ctx).Infof("sync burst limit set to %d", syncBurst)
|
||||||
|
|
||||||
return &GRPCServer{
|
return &GRPCServer{
|
||||||
wgKey: key,
|
wgKey: key,
|
||||||
// peerKey -> event channel
|
// peerKey -> event channel
|
||||||
@@ -87,6 +131,9 @@ func NewServer(
|
|||||||
authManager: authManager,
|
authManager: authManager,
|
||||||
appMetrics: appMetrics,
|
appMetrics: appMetrics,
|
||||||
ephemeralManager: ephemeralManager,
|
ephemeralManager: ephemeralManager,
|
||||||
|
syncLimiter: rate.NewLimiter(rate.Every(time.Minute/time.Duration(syncRatePerS)), syncBurst),
|
||||||
|
loginLimiter: rate.NewLimiter(rate.Every(multiplier/time.Duration(loginRatePerS)), loginBurst),
|
||||||
|
loginPeerLimit: rate.Every(time.Minute / time.Duration(loginPeerRatePerS)),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -128,11 +175,18 @@ func getRealIP(ctx context.Context) net.IP {
|
|||||||
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
|
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
|
||||||
// notifies the connected peer of any updates (e.g. new peers under the same account)
|
// notifies the connected peer of any updates (e.g. new peers under the same account)
|
||||||
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
|
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
|
||||||
reqStart := time.Now()
|
|
||||||
if s.appMetrics != nil {
|
if s.appMetrics != nil {
|
||||||
s.appMetrics.GRPCMetrics().CountSyncRequest()
|
s.appMetrics.GRPCMetrics().CountSyncRequest()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !s.syncLimiter.Allow() {
|
||||||
|
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
|
||||||
|
log.Warnf("sync rate limit exceeded for peer %s", req.WgPubKey)
|
||||||
|
return status.Errorf(codes.Internal, "temp rate limit reached")
|
||||||
|
}
|
||||||
|
|
||||||
|
reqStart := time.Now()
|
||||||
|
|
||||||
ctx := srv.Context()
|
ctx := srv.Context()
|
||||||
|
|
||||||
syncReq := &proto.SyncRequest{}
|
syncReq := &proto.SyncRequest{}
|
||||||
@@ -416,15 +470,58 @@ func (s *GRPCServer) parseRequest(ctx context.Context, req *proto.EncryptedMessa
|
|||||||
// In case it isn't, the endpoint checks whether setup key is provided within the request and tries to register a peer.
|
// In case it isn't, the endpoint checks whether setup key is provided within the request and tries to register a peer.
|
||||||
// In case of the successful registration login is also successful
|
// In case of the successful registration login is also successful
|
||||||
func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
|
func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
|
||||||
|
if s.appMetrics != nil {
|
||||||
|
s.appMetrics.GRPCMetrics().CountLoginRequest()
|
||||||
|
}
|
||||||
|
|
||||||
|
limiterIface, ok := s.loginLimiterStore.Load(req.WgPubKey)
|
||||||
|
if !ok {
|
||||||
|
// Check global limiter before allowing a new peer limiter
|
||||||
|
if !s.loginLimiter.Allow() {
|
||||||
|
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
|
||||||
|
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
|
||||||
|
return nil, fmt.Errorf("temp rate limit reached (global limit)")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create new limiter for this peer
|
||||||
|
newLimiter := rate.NewLimiter(s.loginPeerLimit, 1000)
|
||||||
|
s.loginLimiterStore.Store(req.WgPubKey, newLimiter)
|
||||||
|
|
||||||
|
if !newLimiter.Allow() {
|
||||||
|
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
|
||||||
|
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
|
||||||
|
return nil, fmt.Errorf("temp rate limit reached (new peer limit)")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// Use existing limiter for this peer
|
||||||
|
limiter := limiterIface.(*rate.Limiter)
|
||||||
|
if !limiter.Allow() {
|
||||||
|
time.Sleep(time.Second + (time.Millisecond * time.Duration(rand.IntN(20)*100)))
|
||||||
|
log.WithContext(ctx).Warnf("rate limit exceeded for peer %s", req.WgPubKey)
|
||||||
|
return nil, fmt.Errorf("temp rate limit reached (peer limit)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// limiter, _ := s.loginLimiterStore.LoadOrStore(req.WgPubKey, rate.NewLimiter(s.loginPeerLimit, 1))
|
||||||
|
// if !limiter.(*rate.Limiter).Allow() {
|
||||||
|
// time.Sleep(time.Millisecond * time.Duration(rand.IntN(10)*100))
|
||||||
|
// log.WithContext(ctx).Warnf("rate limit exceeded for %s", req.WgPubKey)
|
||||||
|
// return nil, status.Errorf(codes.Internal, "temp rate limit reached")
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// if os.Getenv("ENABLE_LOGIN_RATE_LIMIT") == "true" {
|
||||||
|
// if !s.loginLimiter.Allow() {
|
||||||
|
// return nil, status.Errorf(codes.Internal, "temp rate limit reached")
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
|
||||||
reqStart := time.Now()
|
reqStart := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
if s.appMetrics != nil {
|
if s.appMetrics != nil {
|
||||||
s.appMetrics.GRPCMetrics().CountLoginRequestDuration(time.Since(reqStart))
|
s.appMetrics.GRPCMetrics().CountLoginRequestDuration(time.Since(reqStart))
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
if s.appMetrics != nil {
|
|
||||||
s.appMetrics.GRPCMetrics().CountLoginRequest()
|
|
||||||
}
|
|
||||||
realIP := getRealIP(ctx)
|
realIP := getRealIP(ctx)
|
||||||
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, realIP.String())
|
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, realIP.String())
|
||||||
|
|
||||||
|
|||||||
@@ -4,10 +4,17 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
"golang.org/x/time/rate"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/management/server/account"
|
"github.com/netbirdio/netbird/management/server/account"
|
||||||
"github.com/netbirdio/netbird/management/server/activity"
|
"github.com/netbirdio/netbird/management/server/activity"
|
||||||
@@ -23,6 +30,9 @@ import (
|
|||||||
// Handler is a handler that returns peers of the account
|
// Handler is a handler that returns peers of the account
|
||||||
type Handler struct {
|
type Handler struct {
|
||||||
accountManager account.Manager
|
accountManager account.Manager
|
||||||
|
rateLimiter *rate.Limiter
|
||||||
|
limiterStore sync.Map
|
||||||
|
reqLimit rate.Limit
|
||||||
}
|
}
|
||||||
|
|
||||||
func AddEndpoints(accountManager account.Manager, router *mux.Router) {
|
func AddEndpoints(accountManager account.Manager, router *mux.Router) {
|
||||||
@@ -35,8 +45,15 @@ func AddEndpoints(accountManager account.Manager, router *mux.Router) {
|
|||||||
|
|
||||||
// NewHandler creates a new peers Handler
|
// NewHandler creates a new peers Handler
|
||||||
func NewHandler(accountManager account.Manager) *Handler {
|
func NewHandler(accountManager account.Manager) *Handler {
|
||||||
|
apiRatePerM, err := strconv.Atoi(os.Getenv("NB_API_RATE_PER_M"))
|
||||||
|
if apiRatePerM == 0 || err != nil {
|
||||||
|
apiRatePerM = 60
|
||||||
|
}
|
||||||
|
log.Infof("peers API rate limit set to %d/min", apiRatePerM)
|
||||||
return &Handler{
|
return &Handler{
|
||||||
accountManager: accountManager,
|
accountManager: accountManager,
|
||||||
|
rateLimiter: rate.NewLimiter(rate.Every(time.Minute/time.Duration(apiRatePerM)), 1),
|
||||||
|
reqLimit: rate.Every(time.Minute / time.Duration(apiRatePerM)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -54,6 +71,11 @@ func (h *Handler) checkPeerStatus(peer *nbpeer.Peer) (*nbpeer.Peer, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handler) getPeer(ctx context.Context, accountID, peerID, userID string, w http.ResponseWriter) {
|
func (h *Handler) getPeer(ctx context.Context, accountID, peerID, userID string, w http.ResponseWriter) {
|
||||||
|
if !h.rateLimiter.Allow() {
|
||||||
|
util.WriteError(ctx, fmt.Errorf("temp rate limit reached"), w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
peer, err := h.accountManager.GetPeer(ctx, accountID, peerID, userID)
|
peer, err := h.accountManager.GetPeer(ctx, accountID, peerID, userID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.WriteError(ctx, err, w)
|
util.WriteError(ctx, err, w)
|
||||||
@@ -91,7 +113,7 @@ func (h *Handler) updatePeer(ctx context.Context, accountID, userID, peerID stri
|
|||||||
req := &api.PeerRequest{}
|
req := &api.PeerRequest{}
|
||||||
err := json.NewDecoder(r.Body).Decode(&req)
|
err := json.NewDecoder(r.Body).Decode(&req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.WriteErrorResponse("couldn't parse JSON request", http.StatusBadRequest, w)
|
util.WriteErrorResponse("couldn't parse JSON request", http.StatusPreconditionRequired, w)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -184,9 +206,40 @@ func (h *Handler) HandlePeer(w http.ResponseWriter, r *http.Request) {
|
|||||||
util.WriteError(r.Context(), status.Errorf(status.NotFound, "unknown METHOD"), w)
|
util.WriteError(r.Context(), status.Errorf(status.NotFound, "unknown METHOD"), w)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
func getCallerIP(r *http.Request) string {
|
||||||
|
// Check X-Forwarded-For header first (can be a comma-separated list)
|
||||||
|
if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
|
||||||
|
// Use first IP in the list
|
||||||
|
parts := strings.Split(xff, ",")
|
||||||
|
return strings.TrimSpace(parts[0])
|
||||||
|
}
|
||||||
|
|
||||||
|
// Then check X-Real-IP
|
||||||
|
if xrip := r.Header.Get("X-Real-IP"); xrip != "" {
|
||||||
|
return xrip
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fallback to RemoteAddr
|
||||||
|
ip, _, err := net.SplitHostPort(r.RemoteAddr)
|
||||||
|
if err != nil {
|
||||||
|
return r.RemoteAddr // may be raw IP
|
||||||
|
}
|
||||||
|
return ip
|
||||||
|
}
|
||||||
|
|
||||||
// GetAllPeers returns a list of all peers associated with a provided account
|
// GetAllPeers returns a list of all peers associated with a provided account
|
||||||
func (h *Handler) GetAllPeers(w http.ResponseWriter, r *http.Request) {
|
func (h *Handler) GetAllPeers(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ip := getCallerIP(r)
|
||||||
|
limiter, _ := h.limiterStore.LoadOrStore(ip, rate.NewLimiter(h.reqLimit, 1))
|
||||||
|
if !limiter.(*rate.Limiter).Allow() {
|
||||||
|
log.WithContext(r.Context()).Errorf("rate limit exceeded for IP: %s", ip)
|
||||||
|
util.WriteError(r.Context(), status.Errorf(status.StatusTooManyRequests, "temp rate limit reached"), w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
//if !h.rateLimiter.Allow() {
|
||||||
|
// util.WriteError(r.Context(), status.Errorf(status.StatusTooManyRequests, "temp rate limit reached"), w)
|
||||||
|
// return
|
||||||
|
//}
|
||||||
userAuth, err := nbcontext.GetUserAuthFromContext(r.Context())
|
userAuth, err := nbcontext.GetUserAuthFromContext(r.Context())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
util.WriteError(r.Context(), err, w)
|
util.WriteError(r.Context(), err, w)
|
||||||
|
|||||||
@@ -106,6 +106,8 @@ func WriteError(ctx context.Context, err error, w http.ResponseWriter) {
|
|||||||
httpStatus = http.StatusUnauthorized
|
httpStatus = http.StatusUnauthorized
|
||||||
case status.BadRequest:
|
case status.BadRequest:
|
||||||
httpStatus = http.StatusBadRequest
|
httpStatus = http.StatusBadRequest
|
||||||
|
case status.StatusTooManyRequests:
|
||||||
|
httpStatus = http.StatusTooManyRequests
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
msg = strings.ToLower(err.Error())
|
msg = strings.ToLower(err.Error())
|
||||||
|
|||||||
@@ -3,12 +3,14 @@ package port_forwarding
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/management/server/peer"
|
||||||
nbtypes "github.com/netbirdio/netbird/management/server/types"
|
nbtypes "github.com/netbirdio/netbird/management/server/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Controller interface {
|
type Controller interface {
|
||||||
SendUpdate(ctx context.Context, accountID string, affectedProxyID string, affectedPeerIDs []string)
|
SendUpdate(ctx context.Context, accountID string, affectedProxyID string, affectedPeerIDs []string, accountPeers map[string]*peer.Peer)
|
||||||
GetProxyNetworkMaps(ctx context.Context, accountID string) (map[string]*nbtypes.NetworkMap, error)
|
GetProxyNetworkMaps(ctx context.Context, accountID, peerID string, accountPeers map[string]*peer.Peer) (map[string]*nbtypes.NetworkMap, error)
|
||||||
|
GetProxyNetworkMapsAll(ctx context.Context, accountID string, accountPeers map[string]*peer.Peer) (map[string]*nbtypes.NetworkMap, error)
|
||||||
IsPeerInIngressPorts(ctx context.Context, accountID, peerID string) (bool, error)
|
IsPeerInIngressPorts(ctx context.Context, accountID, peerID string) (bool, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -19,11 +21,15 @@ func NewControllerMock() *ControllerMock {
|
|||||||
return &ControllerMock{}
|
return &ControllerMock{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ControllerMock) SendUpdate(ctx context.Context, accountID string, affectedProxyID string, affectedPeerIDs []string) {
|
func (c *ControllerMock) SendUpdate(ctx context.Context, accountID string, affectedProxyID string, affectedPeerIDs []string, accountPeers map[string]*peer.Peer) {
|
||||||
// noop
|
// noop
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ControllerMock) GetProxyNetworkMaps(ctx context.Context, accountID string) (map[string]*nbtypes.NetworkMap, error) {
|
func (c *ControllerMock) GetProxyNetworkMaps(ctx context.Context, accountID, peerID string, accountPeers map[string]*peer.Peer) (map[string]*nbtypes.NetworkMap, error) {
|
||||||
|
return make(map[string]*nbtypes.NetworkMap), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ControllerMock) GetProxyNetworkMapsAll(ctx context.Context, accountID string, accountPeers map[string]*peer.Peer) (map[string]*nbtypes.NetworkMap, error) {
|
||||||
return make(map[string]*nbtypes.NetworkMap), nil
|
return make(map[string]*nbtypes.NetworkMap), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -889,3 +889,7 @@ func (am *MockAccountManager) GetCurrentUserInfo(ctx context.Context, userAuth n
|
|||||||
}
|
}
|
||||||
return nil, status.Errorf(codes.Unimplemented, "method GetCurrentUserInfo is not implemented")
|
return nil, status.Errorf(codes.Unimplemented, "method GetCurrentUserInfo is not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (am *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
|
||||||
|
// noop
|
||||||
|
}
|
||||||
|
|||||||
@@ -419,7 +419,7 @@ func (am *DefaultAccountManager) GetNetworkMap(ctx context.Context, peerID strin
|
|||||||
}
|
}
|
||||||
customZone := account.GetPeersCustomZone(ctx, am.GetDNSDomain(account.Settings))
|
customZone := account.GetPeersCustomZone(ctx, am.GetDNSDomain(account.Settings))
|
||||||
|
|
||||||
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, account.Id)
|
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, account.Id, peerID, account.Peers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
|
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -453,6 +453,7 @@ func (am *DefaultAccountManager) GetPeerNetwork(ctx context.Context, peerID stri
|
|||||||
// Each new Peer will be assigned a new next net.IP from the Account.Network and Account.Network.LastIP will be updated (IP's are not reused).
|
// Each new Peer will be assigned a new next net.IP from the Account.Network and Account.Network.LastIP will be updated (IP's are not reused).
|
||||||
// The peer property is just a placeholder for the Peer properties to pass further
|
// The peer property is just a placeholder for the Peer properties to pass further
|
||||||
func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID string, peer *nbpeer.Peer) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) {
|
func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID string, peer *nbpeer.Peer) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) {
|
||||||
|
startGlobal := time.Now()
|
||||||
if setupKey == "" && userID == "" {
|
if setupKey == "" && userID == "" {
|
||||||
// no auth method provided => reject access
|
// no auth method provided => reject access
|
||||||
return nil, nil, nil, status.Errorf(status.Unauthenticated, "no peer auth method provided, please use a setup key or interactive SSO login")
|
return nil, nil, nil, status.Errorf(status.Unauthenticated, "no peer auth method provided, please use a setup key or interactive SSO login")
|
||||||
@@ -505,6 +506,8 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
|||||||
var ephemeral bool
|
var ephemeral bool
|
||||||
var groupsToAdd []string
|
var groupsToAdd []string
|
||||||
var allowExtraDNSLabels bool
|
var allowExtraDNSLabels bool
|
||||||
|
|
||||||
|
start := time.Now()
|
||||||
if addedByUser {
|
if addedByUser {
|
||||||
user, err := transaction.GetUserByUserID(ctx, store.LockingStrengthUpdate, userID)
|
user, err := transaction.GetUserByUserID(ctx, store.LockingStrengthUpdate, userID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -537,6 +540,9 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithContext(ctx).Debugf("AddPeer: setup key get took %v", time.Since(start))
|
||||||
|
|
||||||
|
start = time.Now()
|
||||||
if (strings.ToLower(peer.Meta.Hostname) == "iphone" || strings.ToLower(peer.Meta.Hostname) == "ipad") && userID != "" {
|
if (strings.ToLower(peer.Meta.Hostname) == "iphone" || strings.ToLower(peer.Meta.Hostname) == "ipad") && userID != "" {
|
||||||
if am.idpManager != nil {
|
if am.idpManager != nil {
|
||||||
userdata, err := am.idpManager.GetUserDataByID(ctx, userID, idp.AppMetadata{WTAccountID: accountID})
|
userdata, err := am.idpManager.GetUserDataByID(ctx, userID, idp.AppMetadata{WTAccountID: accountID})
|
||||||
@@ -545,16 +551,21 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
log.WithContext(ctx).Debugf("AddPeer: idp took %v", time.Since(start))
|
||||||
|
|
||||||
|
start = time.Now()
|
||||||
freeLabel, err := am.getFreeDNSLabel(ctx, transaction, accountID, peer.Meta.Hostname)
|
freeLabel, err := am.getFreeDNSLabel(ctx, transaction, accountID, peer.Meta.Hostname)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get free DNS label: %w", err)
|
return fmt.Errorf("failed to get free DNS label: %w", err)
|
||||||
}
|
}
|
||||||
|
log.WithContext(ctx).Debugf("AddPeer: free label took %v", time.Since(start))
|
||||||
|
|
||||||
|
start = time.Now()
|
||||||
freeIP, err := getFreeIP(ctx, transaction, accountID)
|
freeIP, err := getFreeIP(ctx, transaction, accountID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get free IP: %w", err)
|
return fmt.Errorf("failed to get free IP: %w", err)
|
||||||
}
|
}
|
||||||
|
log.WithContext(ctx).Debugf("AddPeer: ip took %v", time.Since(start))
|
||||||
|
|
||||||
registrationTime := time.Now().UTC()
|
registrationTime := time.Now().UTC()
|
||||||
newPeer = &nbpeer.Peer{
|
newPeer = &nbpeer.Peer{
|
||||||
@@ -578,17 +589,22 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
|||||||
ExtraDNSLabels: peer.ExtraDNSLabels,
|
ExtraDNSLabels: peer.ExtraDNSLabels,
|
||||||
AllowExtraDNSLabels: allowExtraDNSLabels,
|
AllowExtraDNSLabels: allowExtraDNSLabels,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start = time.Now()
|
||||||
settings, err := transaction.GetAccountSettings(ctx, store.LockingStrengthShare, accountID)
|
settings, err := transaction.GetAccountSettings(ctx, store.LockingStrengthShare, accountID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get account settings: %w", err)
|
return fmt.Errorf("failed to get account settings: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithContext(ctx).Debugf("AddPeer: settings took %v", time.Since(start))
|
||||||
|
|
||||||
opEvent.TargetID = newPeer.ID
|
opEvent.TargetID = newPeer.ID
|
||||||
opEvent.Meta = newPeer.EventMeta(am.GetDNSDomain(settings))
|
opEvent.Meta = newPeer.EventMeta(am.GetDNSDomain(settings))
|
||||||
if !addedByUser {
|
if !addedByUser {
|
||||||
opEvent.Meta["setup_key_name"] = setupKeyName
|
opEvent.Meta["setup_key_name"] = setupKeyName
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start = time.Now()
|
||||||
if am.geo != nil && newPeer.Location.ConnectionIP != nil {
|
if am.geo != nil && newPeer.Location.ConnectionIP != nil {
|
||||||
location, err := am.geo.Lookup(newPeer.Location.ConnectionIP)
|
location, err := am.geo.Lookup(newPeer.Location.ConnectionIP)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -600,8 +616,11 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithContext(ctx).Debugf("AddPeer: geo took %v", time.Since(start))
|
||||||
|
|
||||||
newPeer = am.integratedPeerValidator.PreparePeer(ctx, accountID, newPeer, groupsToAdd, settings.Extra)
|
newPeer = am.integratedPeerValidator.PreparePeer(ctx, accountID, newPeer, groupsToAdd, settings.Extra)
|
||||||
|
|
||||||
|
start = time.Now()
|
||||||
err = transaction.AddPeerToAllGroup(ctx, store.LockingStrengthUpdate, accountID, newPeer.ID)
|
err = transaction.AddPeerToAllGroup(ctx, store.LockingStrengthUpdate, accountID, newPeer.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed adding peer to All group: %w", err)
|
return fmt.Errorf("failed adding peer to All group: %w", err)
|
||||||
@@ -616,11 +635,16 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithContext(ctx).Debugf("AddPeer: add peer to group took %v", time.Since(start))
|
||||||
|
|
||||||
|
start = time.Now()
|
||||||
err = transaction.AddPeerToAccount(ctx, store.LockingStrengthUpdate, newPeer)
|
err = transaction.AddPeerToAccount(ctx, store.LockingStrengthUpdate, newPeer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to add peer to account: %w", err)
|
return fmt.Errorf("failed to add peer to account: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithContext(ctx).Debugf("AddPeer: add peer to account took %v", time.Since(start))
|
||||||
|
|
||||||
err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
|
err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to increment network serial: %w", err)
|
return fmt.Errorf("failed to increment network serial: %w", err)
|
||||||
@@ -638,11 +662,14 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start = time.Now()
|
||||||
updateAccountPeers, err = isPeerInActiveGroup(ctx, transaction, accountID, newPeer.ID)
|
updateAccountPeers, err = isPeerInActiveGroup(ctx, transaction, accountID, newPeer.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithContext(ctx).Debugf("AddPeer: is peer in active group took %v", time.Since(start))
|
||||||
|
|
||||||
log.WithContext(ctx).Debugf("Peer %s added to account %s", newPeer.ID, accountID)
|
log.WithContext(ctx).Debugf("Peer %s added to account %s", newPeer.ID, accountID)
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
@@ -657,8 +684,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
|||||||
|
|
||||||
am.StoreEvent(ctx, opEvent.InitiatorID, opEvent.TargetID, opEvent.AccountID, opEvent.Activity, opEvent.Meta)
|
am.StoreEvent(ctx, opEvent.InitiatorID, opEvent.TargetID, opEvent.AccountID, opEvent.Activity, opEvent.Meta)
|
||||||
|
|
||||||
unlock()
|
log.WithContext(ctx).Debugf("AddPeer took %v", time.Since(startGlobal))
|
||||||
unlock = nil
|
|
||||||
|
|
||||||
if updateAccountPeers {
|
if updateAccountPeers {
|
||||||
am.BufferUpdateAccountPeers(ctx, accountID)
|
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||||
@@ -997,48 +1023,56 @@ func (am *DefaultAccountManager) checkIFPeerNeedsLoginWithoutLock(ctx context.Co
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (am *DefaultAccountManager) getValidatedPeerWithMap(ctx context.Context, isRequiresApproval bool, accountID string, peer *nbpeer.Peer) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) {
|
func (am *DefaultAccountManager) getValidatedPeerWithMap(ctx context.Context, isRequiresApproval bool, accountID string, peer *nbpeer.Peer) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) {
|
||||||
start := time.Now()
|
mstart := time.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
log.WithContext(ctx).Debugf("getValidatedPeerWithMap: took %s", time.Since(start))
|
log.WithContext(ctx).Debugf("getValidatedPeerWithMap: took %s", time.Since(mstart))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if isRequiresApproval {
|
if isRequiresApproval {
|
||||||
|
start := time.Now()
|
||||||
network, err := am.Store.GetAccountNetwork(ctx, store.LockingStrengthShare, accountID)
|
network, err := am.Store.GetAccountNetwork(ctx, store.LockingStrengthShare, accountID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithContext(ctx).Debugf("getValidatedPeerWithMap: took %s", time.Since(start))
|
||||||
|
|
||||||
emptyMap := &types.NetworkMap{
|
emptyMap := &types.NetworkMap{
|
||||||
Network: network.Copy(),
|
Network: network.Copy(),
|
||||||
}
|
}
|
||||||
return peer, emptyMap, nil, nil
|
return peer, emptyMap, nil, nil
|
||||||
}
|
}
|
||||||
|
start := time.Now()
|
||||||
account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
|
account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
log.WithContext(ctx).Debugf("GetAccountWithBackpressure: took %s", time.Since(start))
|
||||||
|
start = time.Now()
|
||||||
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
|
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
log.WithContext(ctx).Debugf("GetValidatedPeers: took %s", time.Since(start))
|
||||||
|
start = time.Now()
|
||||||
postureChecks, err := am.getPeerPostureChecks(account, peer.ID)
|
postureChecks, err := am.getPeerPostureChecks(account, peer.ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
log.WithContext(ctx).Debugf("getPeerPostureChecks: took %s", time.Since(start))
|
||||||
|
start = time.Now()
|
||||||
customZone := account.GetPeersCustomZone(ctx, am.GetDNSDomain(account.Settings))
|
customZone := account.GetPeersCustomZone(ctx, am.GetDNSDomain(account.Settings))
|
||||||
|
log.WithContext(ctx).Debugf("GetPeersCustomZone: took %s", time.Since(start))
|
||||||
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, account.Id)
|
start = time.Now()
|
||||||
|
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, account.Id, peer.ID, account.Peers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
|
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
|
||||||
return nil, nil, nil, err
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
log.WithContext(ctx).Debugf("GetProxyNetworkMaps: took %s", time.Since(start))
|
||||||
|
start = time.Now()
|
||||||
networkMap := account.GetPeerNetworkMap(ctx, peer.ID, customZone, approvedPeersMap, account.GetResourcePoliciesMap(), account.GetResourceRoutersMap(), am.metrics.AccountManagerMetrics())
|
networkMap := account.GetPeerNetworkMap(ctx, peer.ID, customZone, approvedPeersMap, account.GetResourcePoliciesMap(), account.GetResourceRoutersMap(), am.metrics.AccountManagerMetrics())
|
||||||
|
log.WithContext(ctx).Debugf("GetPeerNetworkMap: took %s", time.Since(start))
|
||||||
proxyNetworkMap, ok := proxyNetworkMaps[peer.ID]
|
proxyNetworkMap, ok := proxyNetworkMaps[peer.ID]
|
||||||
if ok {
|
if ok {
|
||||||
networkMap.Merge(proxyNetworkMap)
|
networkMap.Merge(proxyNetworkMap)
|
||||||
@@ -1166,13 +1200,16 @@ func (am *DefaultAccountManager) checkIfUserOwnsPeer(ctx context.Context, accoun
|
|||||||
// UpdateAccountPeers updates all peers that belong to an account.
|
// UpdateAccountPeers updates all peers that belong to an account.
|
||||||
// Should be called when changes have to be synced to peers.
|
// Should be called when changes have to be synced to peers.
|
||||||
func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, accountID string) {
|
func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, accountID string) {
|
||||||
|
globalStart := time.Now()
|
||||||
|
start := time.Now()
|
||||||
account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
|
account, err := am.requestBuffer.GetAccountWithBackpressure(ctx, accountID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithContext(ctx).Errorf("failed to send out updates to peers. failed to get account: %v", err)
|
log.WithContext(ctx).Errorf("failed to send out updates to peers. failed to get account: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
log.WithContext(ctx).Infof("updateAccountPeers: getAccount took %s", time.Since(start))
|
||||||
|
|
||||||
start := time.Now()
|
start = time.Now()
|
||||||
|
|
||||||
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
|
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -1180,6 +1217,8 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.WithContext(ctx).Infof("updateAccountPeers: validatePeers took %s", time.Since(start))
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
semaphore := make(chan struct{}, 10)
|
semaphore := make(chan struct{}, 10)
|
||||||
|
|
||||||
@@ -1189,11 +1228,21 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
|||||||
resourcePolicies := account.GetResourcePoliciesMap()
|
resourcePolicies := account.GetResourcePoliciesMap()
|
||||||
routers := account.GetResourceRoutersMap()
|
routers := account.GetResourceRoutersMap()
|
||||||
|
|
||||||
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, accountID)
|
start = time.Now()
|
||||||
|
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMapsAll(ctx, accountID, account.Peers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
|
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
log.WithContext(ctx).Infof("updateAccountPeers: getProxyNetworkMaps took %s", time.Since(start))
|
||||||
|
for _, id := range []string{"d07kd1ei389c73dq19gg", "d07kcaui389c73dq19g0", "d0e7uo6i389c73f040v0"} {
|
||||||
|
peerMap, ok := proxyNetworkMaps[id]
|
||||||
|
if !ok {
|
||||||
|
log.WithContext(ctx).Infof("updateAccountPeers xxx: proxy network map %s not found", id)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.WithContext(ctx).Infof("updateAccountPeers xxx: peer %s has %d peers, %d offline peers, %d, firewall rules, %d forwarding rules, %d routing rules", id, len(peerMap.Peers), len(peerMap.OfflinePeers), len(peerMap.FirewallRules), len(peerMap.ForwardingRules), len(peerMap.RoutesFirewallRules))
|
||||||
|
}
|
||||||
|
|
||||||
for _, peer := range account.Peers {
|
for _, peer := range account.Peers {
|
||||||
if !am.peersUpdateManager.HasChannel(peer.ID) {
|
if !am.peersUpdateManager.HasChannel(peer.ID) {
|
||||||
@@ -1226,16 +1275,22 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
start = time.Now()
|
||||||
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings.RoutingPeerDNSResolutionEnabled, extraSetting)
|
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings.RoutingPeerDNSResolutionEnabled, extraSetting)
|
||||||
|
log.WithContext(ctx).Infof("updateAccountPeers: toSyncResponse took %s", time.Since(start))
|
||||||
|
start = time.Now()
|
||||||
am.peersUpdateManager.SendUpdate(ctx, p.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap})
|
am.peersUpdateManager.SendUpdate(ctx, p.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap})
|
||||||
|
log.WithContext(ctx).Infof("updateAccountPeers: sending update toSyncResponse took %s", time.Since(start))
|
||||||
}(peer)
|
}(peer)
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
|
log.WithContext(ctx).Infof("updateAccountPeers: waiting for updates to complete took %s", time.Since(globalStart))
|
||||||
|
|
||||||
if am.metrics != nil {
|
if am.metrics != nil {
|
||||||
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(start))
|
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1292,7 +1347,7 @@ func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountI
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, accountId)
|
proxyNetworkMaps, err := am.proxyController.GetProxyNetworkMaps(ctx, accountId, peerId, account.Peers)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
|
log.WithContext(ctx).Errorf("failed to get proxy network maps: %v", err)
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ type Peer struct {
|
|||||||
// Meta is a Peer system meta data
|
// Meta is a Peer system meta data
|
||||||
Meta PeerSystemMeta `gorm:"embedded;embeddedPrefix:meta_"`
|
Meta PeerSystemMeta `gorm:"embedded;embeddedPrefix:meta_"`
|
||||||
// Name is peer's name (machine name)
|
// Name is peer's name (machine name)
|
||||||
Name string
|
Name string `gorm:"index"`
|
||||||
// DNSLabel is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's
|
// DNSLabel is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's
|
||||||
// domain to the peer label. e.g. peer-dns-label.netbird.cloud
|
// domain to the peer label. e.g. peer-dns-label.netbird.cloud
|
||||||
DNSLabel string
|
DNSLabel string
|
||||||
|
|||||||
@@ -37,6 +37,8 @@ const (
|
|||||||
|
|
||||||
// Unauthenticated indicates that user is not authenticated due to absence of valid credentials
|
// Unauthenticated indicates that user is not authenticated due to absence of valid credentials
|
||||||
Unauthenticated Type = 10
|
Unauthenticated Type = 10
|
||||||
|
|
||||||
|
StatusTooManyRequests = 11
|
||||||
)
|
)
|
||||||
|
|
||||||
// Type is a type of the Error
|
// Type is a type of the Error
|
||||||
|
|||||||
@@ -1311,7 +1311,7 @@ func (s *SqlStore) GetAccountPeers(ctx context.Context, lockStrength LockingStre
|
|||||||
query := s.db.Clauses(clause.Locking{Strength: string(lockStrength)}).Where(accountIDCondition, accountID)
|
query := s.db.Clauses(clause.Locking{Strength: string(lockStrength)}).Where(accountIDCondition, accountID)
|
||||||
|
|
||||||
if nameFilter != "" {
|
if nameFilter != "" {
|
||||||
query = query.Where("name LIKE ?", "%"+nameFilter+"%")
|
query = query.Where("name = ?", nameFilter)
|
||||||
}
|
}
|
||||||
if ipFilter != "" {
|
if ipFilter != "" {
|
||||||
query = query.Where("ip LIKE ?", "%"+ipFilter+"%")
|
query = query.Where("ip LIKE ?", "%"+ipFilter+"%")
|
||||||
|
|||||||
Reference in New Issue
Block a user