Compare commits

...

6 Commits

Author SHA1 Message Date
Pascal Fischer
1dca0ab404 increase grpc timeouts 2025-03-14 10:25:53 +01:00
Pascal Fischer
c262c1b252 cut getServerPublicKey 2025-03-13 18:38:18 +01:00
Pascal Fischer
cf330f4b45 limit update channel to 2 messages only 2025-03-13 17:56:48 +01:00
Pascal Fischer
8f4db28476 limit update channel to 2 messages only 2025-03-13 17:32:39 +01:00
Pascal Fischer
1b3471a354 add pprof 2025-03-13 15:53:35 +01:00
Pascal Fischer
2d82d2fb83 add buffer for update account peers 2025-03-13 14:58:47 +01:00
7 changed files with 70 additions and 50 deletions

View File

@@ -272,9 +272,9 @@ func (c *GrpcClient) GetServerPublicKey() (*wgtypes.Key, error) {
return nil, errors.New(errMsgNoMgmtConnection)
}
mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
defer cancel()
resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{})
// mgmCtx, cancel := context.WithTimeout(c.ctx, 30*time.Second)
// defer cancel()
resp, err := c.realClient.GetServerKey(c.ctx, &proto.Empty{})
if err != nil {
log.Errorf("failed while getting Management Service public key: %v", err)
return nil, fmt.Errorf("failed while getting Management Service public key")
@@ -327,11 +327,9 @@ func (c *GrpcClient) login(serverKey wgtypes.Key, req *proto.LoginRequest) (*pro
var resp *proto.EncryptedMessage
operation := func() error {
mgmCtx, cancel := context.WithTimeout(context.Background(), ConnectTimeout)
defer cancel()
var err error
resp, err = c.realClient.Login(mgmCtx, &proto.EncryptedMessage{
resp, err = c.realClient.Login(context.Background(), &proto.EncryptedMessage{
WgPubKey: c.key.PublicKey().String(),
Body: loginReq,
})

View File

@@ -11,6 +11,7 @@ import (
"io/fs"
"net"
"net/http"
_ "net/http/pprof"
"net/netip"
"net/url"
"os"
@@ -34,6 +35,7 @@ import (
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
"github.com/netbirdio/management-integrations/integrations"
"github.com/netbirdio/netbird/management/server/peers"
"github.com/netbirdio/netbird/encryption"
@@ -90,6 +92,11 @@ var (
PreRunE: func(cmd *cobra.Command, args []string) error {
flag.Parse()
go func() {
log.Infof("Starting pprof on :6060")
http.ListenAndServe("localhost:6060", nil)
}()
//nolint
ctx := context.WithValue(cmd.Context(), hook.ExecutionContextKey, hook.SystemSource)
@@ -132,6 +139,7 @@ var (
return nil
},
RunE: func(cmd *cobra.Command, args []string) error {
flag.Parse()
ctx, cancel := context.WithCancel(cmd.Context())

View File

@@ -68,7 +68,8 @@ type DefaultAccountManager struct {
eventStore activity.Store
geo geolocation.Geolocation
requestBuffer *AccountRequestBuffer
accountUpdateLocks sync.Map
requestBuffer *AccountRequestBuffer
proxyController port_forwarding.Controller
settingsManager settings.Manager
@@ -1222,7 +1223,7 @@ func (am *DefaultAccountManager) SyncUserJWTGroups(ctx context.Context, userAuth
if removedGroupAffectsPeers || newGroupsAffectsPeers {
log.WithContext(ctx).Tracef("user %s: JWT group membership changed, updating account peers", userAuth.UserId)
am.UpdateAccountPeers(ctx, userAuth.AccountId)
am.BufferUpdateAccountPeers(ctx, userAuth.AccountId)
}
}
@@ -1461,7 +1462,7 @@ func (am *DefaultAccountManager) GetDNSDomain() string {
// onPeersInvalidated logs that the validated peers of the given account have been
// invalidated and triggers a peers update for that account.
func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) {
log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID)
am.UpdateAccountPeers(ctx, accountID)
am.BufferUpdateAccountPeers(ctx, accountID)
}
func (am *DefaultAccountManager) FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error) {

View File

@@ -10,13 +10,12 @@ import (
"time"
pb "github.com/golang/protobuf/proto" // nolint
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
integrationsConfig "github.com/netbirdio/management-integrations/integrations/config"
"github.com/netbirdio/netbird/encryption"
@@ -38,6 +37,7 @@ type GRPCServer struct {
accountManager account.Manager
settingsManager settings.Manager
wgKey wgtypes.Key
wgPubKeySting string
proto.UnimplementedManagementServiceServer
peersUpdateManager *PeersUpdateManager
config *Config
@@ -76,7 +76,8 @@ func NewServer(
}
return &GRPCServer{
wgKey: key,
wgKey: key,
wgPubKeySting: key.PublicKey().String(),
// peerKey -> event channel
peersUpdateManager: peersUpdateManager,
accountManager: accountManager,
@@ -90,30 +91,16 @@ func NewServer(
}
// GetServerKey returns the server's WireGuard public key together with an expiration
// timestamp set 24 hours from the time of the call. When application metrics are
// configured, the request is counted.
func (s *GRPCServer) GetServerKey(ctx context.Context, req *proto.Empty) (*proto.ServerKeyResponse, error) {
ip := ""
p, ok := peer.FromContext(ctx)
if ok {
ip = p.Addr.String()
}
log.WithContext(ctx).Tracef("GetServerKey request from %s", ip)
start := time.Now()
defer func() {
log.WithContext(ctx).Tracef("GetServerKey from %s took %v", ip, time.Since(start))
}()
// todo introduce something more meaningful with the key expiration/rotation
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountGetKeyRequest()
}
now := time.Now().Add(24 * time.Hour)
// NOTE(review): Time.Second() is the seconds-within-the-minute field (0-59), not seconds
// since the Unix epoch, so a Timestamp built from it does not represent "now + 24h".
secs := int64(now.Second())
nanos := int32(now.Nanosecond())
expiresAt := &timestamp.Timestamp{Seconds: secs, Nanos: nanos}
expiresAt := time.Now().Add(24 * time.Hour)
return &proto.ServerKeyResponse{
Key: s.wgKey.PublicKey().String(),
ExpiresAt: expiresAt,
Key: s.wgPubKeySting,
ExpiresAt: timestamppb.New(expiresAt),
}, nil
}
@@ -242,7 +229,7 @@ func (s *GRPCServer) sendUpdate(ctx context.Context, accountID string, peerKey w
return status.Errorf(codes.Internal, "failed processing update message")
}
err = srv.SendMsg(&proto.EncryptedMessage{
WgPubKey: s.wgKey.PublicKey().String(),
WgPubKey: s.wgPubKeySting,
Body: encryptedResp,
})
if err != nil {
@@ -500,7 +487,7 @@ func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*p
}
return &proto.EncryptedMessage{
WgPubKey: s.wgKey.PublicKey().String(),
WgPubKey: s.wgPubKeySting,
Body: encryptedResp,
}, nil
}
@@ -713,7 +700,7 @@ func (s *GRPCServer) sendInitialSync(ctx context.Context, peerKey wgtypes.Key, p
}
err = srv.Send(&proto.EncryptedMessage{
WgPubKey: s.wgKey.PublicKey().String(),
WgPubKey: s.wgPubKeySting,
Body: encryptedResp,
})
@@ -778,7 +765,7 @@ func (s *GRPCServer) GetDeviceAuthorizationFlow(ctx context.Context, req *proto.
}
return &proto.EncryptedMessage{
WgPubKey: s.wgKey.PublicKey().String(),
WgPubKey: s.wgPubKeySting,
Body: encryptedResp,
}, nil
}
@@ -830,7 +817,7 @@ func (s *GRPCServer) GetPKCEAuthorizationFlow(ctx context.Context, req *proto.En
}
return &proto.EncryptedMessage{
WgPubKey: s.wgKey.PublicKey().String(),
WgPubKey: s.wgPubKeySting,
Body: encryptedResp,
}, nil
}

View File

@@ -6,7 +6,9 @@ import (
b64 "encoding/base64"
"fmt"
"net"
"os"
"slices"
"strconv"
"strings"
"sync"
"time"
@@ -135,7 +137,7 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
if expired {
// we need to update other peers because when peer login expires all other peers are notified to disconnect from
// the expired one. Here we notify them that connection is now allowed again.
am.UpdateAccountPeers(ctx, accountID)
am.BufferUpdateAccountPeers(ctx, accountID)
}
return nil
@@ -302,7 +304,7 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
}
if peerLabelChanged || requiresPeerUpdates {
am.UpdateAccountPeers(ctx, accountID)
am.BufferUpdateAccountPeers(ctx, accountID)
} else if sshChanged {
am.UpdateAccountPeer(ctx, accountID, peer.ID)
}
@@ -383,7 +385,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
}
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.BufferUpdateAccountPeers(ctx, accountID)
}
return nil
@@ -653,7 +655,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
unlock = nil
if updateAccountPeers {
am.UpdateAccountPeers(ctx, accountID)
am.BufferUpdateAccountPeers(ctx, accountID)
}
return am.getValidatedPeerWithMap(ctx, false, accountID, newPeer)
@@ -748,7 +750,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
}
if isStatusChanged || sync.UpdateAccountPeers || (updated && len(postureChecks) > 0) {
am.UpdateAccountPeers(ctx, accountID)
am.BufferUpdateAccountPeers(ctx, accountID)
}
return am.getValidatedPeerWithMap(ctx, peerNotValid, accountID, peer)
@@ -893,7 +895,7 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
unlockPeer = nil
if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) {
am.UpdateAccountPeers(ctx, accountID)
am.BufferUpdateAccountPeers(ctx, accountID)
}
return am.getValidatedPeerWithMap(ctx, isRequiresApproval, accountID, peer)
@@ -1226,6 +1228,28 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
}
}
// BufferUpdateAccountPeers schedules a delayed UpdateAccountPeers call for the account and
// coalesces calls that arrive while one is already pending.
//
// The first call for an account takes a per-account lock and starts a goroutine that waits
// PEER_UPDATE_INTERVAL_MS milliseconds (10000 when the variable is unset or not an integer)
// before running the update. Calls made while that lock is held return immediately, because
// the pending update has not run yet and will run after their change.
//
// The function never blocks the caller; the update itself runs asynchronously.
func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
	mu, _ := am.accountUpdateLocks.LoadOrStore(accountID, &sync.Mutex{})
	lock := mu.(*sync.Mutex)

	// A held lock means an update is already pending for this account.
	if !lock.TryLock() {
		return
	}

	// The goroutine outlives the caller, whose context may be cancelled long before the
	// interval elapses. Keep the context values but detach from its cancellation.
	updateCtx := context.WithoutCancel(ctx)

	go func() {
		interval, err := strconv.Atoi(os.Getenv("PEER_UPDATE_INTERVAL_MS"))
		if err != nil {
			interval = 10000
		}

		time.Sleep(time.Duration(interval) * time.Millisecond)

		// Release the slot BEFORE running the update. If the lock were held while the
		// update runs, a change arriving mid-update would be dropped by the TryLock above
		// and never propagated. Releasing first lets such a change schedule a follow-up.
		am.accountUpdateLocks.Delete(accountID)
		lock.Unlock()

		am.UpdateAccountPeers(updateCtx, accountID)
	}()
}
// UpdateAccountPeer updates a single peer that belongs to an account.
// Should be called when changes need to be synced to a specific peer only.
func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountId string, peerId string) {

View File

@@ -57,8 +57,13 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda
case channel <- update:
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
default:
dropped = true
log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(channel))
select {
case <-channel:
log.WithContext(ctx).Trace("dropped oldest message from channel for peer %s", peerID)
default:
channel <- update
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
}
}
} else {
log.WithContext(ctx).Debugf("peer %s has no channel", peerID)
@@ -85,7 +90,7 @@ func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) c
close(channel)
}
// mbragin: todo shouldn't it be more? or configurable?
channel := make(chan *UpdateMessage, channelBufferSize)
channel := make(chan *UpdateMessage, 2)
p.peerChannels[peerID] = channel
log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID)

View File

@@ -71,18 +71,15 @@ func CreateConnection(addr string, tlsEnabled bool) (*grpc.ClientConn, error) {
}))
}
connCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
conn, err := grpc.DialContext(
connCtx,
context.Background(),
addr,
transportOption,
WithCustomDialer(),
grpc.WithBlock(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 30 * time.Second,
Timeout: 10 * time.Second,
Time: 10 * time.Minute,
Timeout: 5 * time.Minute,
}),
)
if err != nil {