Compare commits

...

6 Commits

Author SHA1 Message Date
Pascal Fischer
1dca0ab404 increase grpc timeouts 2025-03-14 10:25:53 +01:00
Pascal Fischer
c262c1b252 cut getServerPublicKey 2025-03-13 18:38:18 +01:00
Pascal Fischer
cf330f4b45 limit update channel to 2 messages only 2025-03-13 17:56:48 +01:00
Pascal Fischer
8f4db28476 limit update channel to 2 messages only 2025-03-13 17:32:39 +01:00
Pascal Fischer
1b3471a354 add pprof 2025-03-13 15:53:35 +01:00
Pascal Fischer
2d82d2fb83 add buffer for update account peers 2025-03-13 14:58:47 +01:00
7 changed files with 70 additions and 50 deletions

View File

@@ -272,9 +272,9 @@ func (c *GrpcClient) GetServerPublicKey() (*wgtypes.Key, error) {
return nil, errors.New(errMsgNoMgmtConnection) return nil, errors.New(errMsgNoMgmtConnection)
} }
mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second) // mgmCtx, cancel := context.WithTimeout(c.ctx, 30*time.Second)
defer cancel() // defer cancel()
resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{}) resp, err := c.realClient.GetServerKey(c.ctx, &proto.Empty{})
if err != nil { if err != nil {
log.Errorf("failed while getting Management Service public key: %v", err) log.Errorf("failed while getting Management Service public key: %v", err)
return nil, fmt.Errorf("failed while getting Management Service public key") return nil, fmt.Errorf("failed while getting Management Service public key")
@@ -327,11 +327,9 @@ func (c *GrpcClient) login(serverKey wgtypes.Key, req *proto.LoginRequest) (*pro
var resp *proto.EncryptedMessage var resp *proto.EncryptedMessage
operation := func() error { operation := func() error {
mgmCtx, cancel := context.WithTimeout(context.Background(), ConnectTimeout)
defer cancel()
var err error var err error
resp, err = c.realClient.Login(mgmCtx, &proto.EncryptedMessage{ resp, err = c.realClient.Login(context.Background(), &proto.EncryptedMessage{
WgPubKey: c.key.PublicKey().String(), WgPubKey: c.key.PublicKey().String(),
Body: loginReq, Body: loginReq,
}) })

View File

@@ -11,6 +11,7 @@ import (
"io/fs" "io/fs"
"net" "net"
"net/http" "net/http"
_ "net/http/pprof"
"net/netip" "net/netip"
"net/url" "net/url"
"os" "os"
@@ -34,6 +35,7 @@ import (
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
"github.com/netbirdio/management-integrations/integrations" "github.com/netbirdio/management-integrations/integrations"
"github.com/netbirdio/netbird/management/server/peers" "github.com/netbirdio/netbird/management/server/peers"
"github.com/netbirdio/netbird/encryption" "github.com/netbirdio/netbird/encryption"
@@ -90,6 +92,11 @@ var (
PreRunE: func(cmd *cobra.Command, args []string) error { PreRunE: func(cmd *cobra.Command, args []string) error {
flag.Parse() flag.Parse()
go func() {
log.Infof("Starting pprof on :6060")
http.ListenAndServe("localhost:6060", nil)
}()
//nolint //nolint
ctx := context.WithValue(cmd.Context(), hook.ExecutionContextKey, hook.SystemSource) ctx := context.WithValue(cmd.Context(), hook.ExecutionContextKey, hook.SystemSource)
@@ -132,6 +139,7 @@ var (
return nil return nil
}, },
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
flag.Parse() flag.Parse()
ctx, cancel := context.WithCancel(cmd.Context()) ctx, cancel := context.WithCancel(cmd.Context())

View File

@@ -68,7 +68,8 @@ type DefaultAccountManager struct {
eventStore activity.Store eventStore activity.Store
geo geolocation.Geolocation geo geolocation.Geolocation
requestBuffer *AccountRequestBuffer accountUpdateLocks sync.Map
requestBuffer *AccountRequestBuffer
proxyController port_forwarding.Controller proxyController port_forwarding.Controller
settingsManager settings.Manager settingsManager settings.Manager
@@ -1222,7 +1223,7 @@ func (am *DefaultAccountManager) SyncUserJWTGroups(ctx context.Context, userAuth
if removedGroupAffectsPeers || newGroupsAffectsPeers { if removedGroupAffectsPeers || newGroupsAffectsPeers {
log.WithContext(ctx).Tracef("user %s: JWT group membership changed, updating account peers", userAuth.UserId) log.WithContext(ctx).Tracef("user %s: JWT group membership changed, updating account peers", userAuth.UserId)
am.UpdateAccountPeers(ctx, userAuth.AccountId) am.BufferUpdateAccountPeers(ctx, userAuth.AccountId)
} }
} }
@@ -1461,7 +1462,7 @@ func (am *DefaultAccountManager) GetDNSDomain() string {
func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) { func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) {
log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID) log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID)
am.UpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
func (am *DefaultAccountManager) FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error) { func (am *DefaultAccountManager) FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error) {

View File

@@ -10,13 +10,12 @@ import (
"time" "time"
pb "github.com/golang/protobuf/proto" // nolint pb "github.com/golang/protobuf/proto" // nolint
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip" "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
integrationsConfig "github.com/netbirdio/management-integrations/integrations/config" integrationsConfig "github.com/netbirdio/management-integrations/integrations/config"
"github.com/netbirdio/netbird/encryption" "github.com/netbirdio/netbird/encryption"
@@ -38,6 +37,7 @@ type GRPCServer struct {
accountManager account.Manager accountManager account.Manager
settingsManager settings.Manager settingsManager settings.Manager
wgKey wgtypes.Key wgKey wgtypes.Key
wgPubKeySting string
proto.UnimplementedManagementServiceServer proto.UnimplementedManagementServiceServer
peersUpdateManager *PeersUpdateManager peersUpdateManager *PeersUpdateManager
config *Config config *Config
@@ -76,7 +76,8 @@ func NewServer(
} }
return &GRPCServer{ return &GRPCServer{
wgKey: key, wgKey: key,
wgPubKeySting: key.PublicKey().String(),
// peerKey -> event channel // peerKey -> event channel
peersUpdateManager: peersUpdateManager, peersUpdateManager: peersUpdateManager,
accountManager: accountManager, accountManager: accountManager,
@@ -90,30 +91,16 @@ func NewServer(
} }
func (s *GRPCServer) GetServerKey(ctx context.Context, req *proto.Empty) (*proto.ServerKeyResponse, error) { func (s *GRPCServer) GetServerKey(ctx context.Context, req *proto.Empty) (*proto.ServerKeyResponse, error) {
ip := ""
p, ok := peer.FromContext(ctx)
if ok {
ip = p.Addr.String()
}
log.WithContext(ctx).Tracef("GetServerKey request from %s", ip)
start := time.Now()
defer func() {
log.WithContext(ctx).Tracef("GetServerKey from %s took %v", ip, time.Since(start))
}()
// todo introduce something more meaningful with the key expiration/rotation // todo introduce something more meaningful with the key expiration/rotation
if s.appMetrics != nil { if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountGetKeyRequest() s.appMetrics.GRPCMetrics().CountGetKeyRequest()
} }
now := time.Now().Add(24 * time.Hour)
secs := int64(now.Second()) expiresAt := time.Now().Add(24 * time.Hour)
nanos := int32(now.Nanosecond())
expiresAt := &timestamp.Timestamp{Seconds: secs, Nanos: nanos}
return &proto.ServerKeyResponse{ return &proto.ServerKeyResponse{
Key: s.wgKey.PublicKey().String(), Key: s.wgPubKeySting,
ExpiresAt: expiresAt, ExpiresAt: timestamppb.New(expiresAt),
}, nil }, nil
} }
@@ -242,7 +229,7 @@ func (s *GRPCServer) sendUpdate(ctx context.Context, accountID string, peerKey w
return status.Errorf(codes.Internal, "failed processing update message") return status.Errorf(codes.Internal, "failed processing update message")
} }
err = srv.SendMsg(&proto.EncryptedMessage{ err = srv.SendMsg(&proto.EncryptedMessage{
WgPubKey: s.wgKey.PublicKey().String(), WgPubKey: s.wgPubKeySting,
Body: encryptedResp, Body: encryptedResp,
}) })
if err != nil { if err != nil {
@@ -500,7 +487,7 @@ func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*p
} }
return &proto.EncryptedMessage{ return &proto.EncryptedMessage{
WgPubKey: s.wgKey.PublicKey().String(), WgPubKey: s.wgPubKeySting,
Body: encryptedResp, Body: encryptedResp,
}, nil }, nil
} }
@@ -713,7 +700,7 @@ func (s *GRPCServer) sendInitialSync(ctx context.Context, peerKey wgtypes.Key, p
} }
err = srv.Send(&proto.EncryptedMessage{ err = srv.Send(&proto.EncryptedMessage{
WgPubKey: s.wgKey.PublicKey().String(), WgPubKey: s.wgPubKeySting,
Body: encryptedResp, Body: encryptedResp,
}) })
@@ -778,7 +765,7 @@ func (s *GRPCServer) GetDeviceAuthorizationFlow(ctx context.Context, req *proto.
} }
return &proto.EncryptedMessage{ return &proto.EncryptedMessage{
WgPubKey: s.wgKey.PublicKey().String(), WgPubKey: s.wgPubKeySting,
Body: encryptedResp, Body: encryptedResp,
}, nil }, nil
} }
@@ -830,7 +817,7 @@ func (s *GRPCServer) GetPKCEAuthorizationFlow(ctx context.Context, req *proto.En
} }
return &proto.EncryptedMessage{ return &proto.EncryptedMessage{
WgPubKey: s.wgKey.PublicKey().String(), WgPubKey: s.wgPubKeySting,
Body: encryptedResp, Body: encryptedResp,
}, nil }, nil
} }

View File

@@ -6,7 +6,9 @@ import (
b64 "encoding/base64" b64 "encoding/base64"
"fmt" "fmt"
"net" "net"
"os"
"slices" "slices"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -135,7 +137,7 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
if expired { if expired {
// we need to update other peers because when peer login expires all other peers are notified to disconnect from // we need to update other peers because when peer login expires all other peers are notified to disconnect from
// the expired one. Here we notify them that connection is now allowed again. // the expired one. Here we notify them that connection is now allowed again.
am.UpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
return nil return nil
@@ -302,7 +304,7 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
} }
if peerLabelChanged || requiresPeerUpdates { if peerLabelChanged || requiresPeerUpdates {
am.UpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} else if sshChanged { } else if sshChanged {
am.UpdateAccountPeer(ctx, accountID, peer.ID) am.UpdateAccountPeer(ctx, accountID, peer.ID)
} }
@@ -383,7 +385,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
} }
if updateAccountPeers { if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
return nil return nil
@@ -653,7 +655,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
unlock = nil unlock = nil
if updateAccountPeers { if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
return am.getValidatedPeerWithMap(ctx, false, accountID, newPeer) return am.getValidatedPeerWithMap(ctx, false, accountID, newPeer)
@@ -748,7 +750,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
} }
if isStatusChanged || sync.UpdateAccountPeers || (updated && len(postureChecks) > 0) { if isStatusChanged || sync.UpdateAccountPeers || (updated && len(postureChecks) > 0) {
am.UpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
return am.getValidatedPeerWithMap(ctx, peerNotValid, accountID, peer) return am.getValidatedPeerWithMap(ctx, peerNotValid, accountID, peer)
@@ -893,7 +895,7 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
unlockPeer = nil unlockPeer = nil
if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) { if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) {
am.UpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
return am.getValidatedPeerWithMap(ctx, isRequiresApproval, accountID, peer) return am.getValidatedPeerWithMap(ctx, isRequiresApproval, accountID, peer)
@@ -1226,6 +1228,28 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
} }
} }
func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
mu, _ := am.accountUpdateLocks.LoadOrStore(accountID, &sync.Mutex{})
lock := mu.(*sync.Mutex)
if !lock.TryLock() {
return
}
go func() {
intervalStr := os.Getenv("PEER_UPDATE_INTERVAL_MS")
interval, err := strconv.Atoi(intervalStr)
if err != nil {
interval = 10000
}
time.Sleep(time.Duration(interval) * time.Millisecond)
am.UpdateAccountPeers(ctx, accountID)
am.accountUpdateLocks.Delete(accountID)
lock.Unlock()
}()
}
// UpdateAccountPeer updates a single peer that belongs to an account. // UpdateAccountPeer updates a single peer that belongs to an account.
// Should be called when changes need to be synced to a specific peer only. // Should be called when changes need to be synced to a specific peer only.
func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountId string, peerId string) { func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountId string, peerId string) {

View File

@@ -57,8 +57,13 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda
case channel <- update: case channel <- update:
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID) log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
default: default:
dropped = true select {
log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(channel)) case <-channel:
log.WithContext(ctx).Trace("dropped oldest message from channel for peer %s", peerID)
default:
channel <- update
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
}
} }
} else { } else {
log.WithContext(ctx).Debugf("peer %s has no channel", peerID) log.WithContext(ctx).Debugf("peer %s has no channel", peerID)
@@ -85,7 +90,7 @@ func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) c
close(channel) close(channel)
} }
// mbragin: todo shouldn't it be more? or configurable? // mbragin: todo shouldn't it be more? or configurable?
channel := make(chan *UpdateMessage, channelBufferSize) channel := make(chan *UpdateMessage, 2)
p.peerChannels[peerID] = channel p.peerChannels[peerID] = channel
log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID) log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID)

View File

@@ -71,18 +71,15 @@ func CreateConnection(addr string, tlsEnabled bool) (*grpc.ClientConn, error) {
})) }))
} }
connCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
conn, err := grpc.DialContext( conn, err := grpc.DialContext(
connCtx, context.Background(),
addr, addr,
transportOption, transportOption,
WithCustomDialer(), WithCustomDialer(),
grpc.WithBlock(), grpc.WithBlock(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{ grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second, Time: 10 * time.Minute,
Timeout: 10 * time.Second, Timeout: 5 * time.Minute,
}), }),
) )
if err != nil { if err != nil {