mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-19 08:46:38 +00:00
Compare commits
2 Commits
fix/androi
...
feat/sync-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b8d9386466 | ||
|
|
9914212ce5 |
@@ -7,8 +7,10 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
"os"
|
"os"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
pb "github.com/golang/protobuf/proto" // nolint
|
pb "github.com/golang/protobuf/proto" // nolint
|
||||||
@@ -44,6 +46,9 @@ import (
|
|||||||
const (
|
const (
|
||||||
envLogBlockedPeers = "NB_LOG_BLOCKED_PEERS"
|
envLogBlockedPeers = "NB_LOG_BLOCKED_PEERS"
|
||||||
envBlockPeers = "NB_BLOCK_SAME_PEERS"
|
envBlockPeers = "NB_BLOCK_SAME_PEERS"
|
||||||
|
envConcurrentSyncs = "NB_MAX_CONCURRENT_SYNCS"
|
||||||
|
|
||||||
|
defaultSyncLim = 1000
|
||||||
)
|
)
|
||||||
|
|
||||||
// GRPCServer an instance of a Management gRPC API server
|
// GRPCServer an instance of a Management gRPC API server
|
||||||
@@ -63,6 +68,9 @@ type GRPCServer struct {
|
|||||||
logBlockedPeers bool
|
logBlockedPeers bool
|
||||||
blockPeersWithSameConfig bool
|
blockPeersWithSameConfig bool
|
||||||
integratedPeerValidator integrated_validator.IntegratedValidator
|
integratedPeerValidator integrated_validator.IntegratedValidator
|
||||||
|
|
||||||
|
syncSem atomic.Int32
|
||||||
|
syncLim int32
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer creates a new Management server
|
// NewServer creates a new Management server
|
||||||
@@ -96,6 +104,16 @@ func NewServer(
|
|||||||
logBlockedPeers := strings.ToLower(os.Getenv(envLogBlockedPeers)) == "true"
|
logBlockedPeers := strings.ToLower(os.Getenv(envLogBlockedPeers)) == "true"
|
||||||
blockPeersWithSameConfig := strings.ToLower(os.Getenv(envBlockPeers)) == "true"
|
blockPeersWithSameConfig := strings.ToLower(os.Getenv(envBlockPeers)) == "true"
|
||||||
|
|
||||||
|
syncLim := int32(defaultSyncLim)
|
||||||
|
if syncLimStr := os.Getenv(envConcurrentSyncs); syncLimStr != "" {
|
||||||
|
syncLimParsed, err := strconv.Atoi(syncLimStr)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("invalid value for %s: %v using %d", envConcurrentSyncs, err, defaultSyncLim)
|
||||||
|
} else {
|
||||||
|
syncLim = int32(syncLimParsed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return &GRPCServer{
|
return &GRPCServer{
|
||||||
wgKey: key,
|
wgKey: key,
|
||||||
// peerKey -> event channel
|
// peerKey -> event channel
|
||||||
@@ -110,6 +128,8 @@ func NewServer(
|
|||||||
logBlockedPeers: logBlockedPeers,
|
logBlockedPeers: logBlockedPeers,
|
||||||
blockPeersWithSameConfig: blockPeersWithSameConfig,
|
blockPeersWithSameConfig: blockPeersWithSameConfig,
|
||||||
integratedPeerValidator: integratedPeerValidator,
|
integratedPeerValidator: integratedPeerValidator,
|
||||||
|
|
||||||
|
syncLim: syncLim,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -151,6 +171,11 @@ func getRealIP(ctx context.Context) net.IP {
|
|||||||
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
|
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
|
||||||
// notifies the connected peer of any updates (e.g. new peers under the same account)
|
// notifies the connected peer of any updates (e.g. new peers under the same account)
|
||||||
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
|
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
|
||||||
|
if s.syncSem.Load() >= s.syncLim {
|
||||||
|
return status.Errorf(codes.ResourceExhausted, "too many concurrent sync requests, please try again later")
|
||||||
|
}
|
||||||
|
s.syncSem.Add(1)
|
||||||
|
|
||||||
reqStart := time.Now()
|
reqStart := time.Now()
|
||||||
|
|
||||||
ctx := srv.Context()
|
ctx := srv.Context()
|
||||||
@@ -158,6 +183,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
|
|||||||
syncReq := &proto.SyncRequest{}
|
syncReq := &proto.SyncRequest{}
|
||||||
peerKey, err := s.parseRequest(ctx, req, syncReq)
|
peerKey, err := s.parseRequest(ctx, req, syncReq)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
s.syncSem.Add(-1)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
realIP := getRealIP(ctx)
|
realIP := getRealIP(ctx)
|
||||||
@@ -172,6 +198,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
|
|||||||
log.WithContext(ctx).Warnf("peer %s with meta hash %d is blocked from syncing", peerKey.String(), metahashed)
|
log.WithContext(ctx).Warnf("peer %s with meta hash %d is blocked from syncing", peerKey.String(), metahashed)
|
||||||
}
|
}
|
||||||
if s.blockPeersWithSameConfig {
|
if s.blockPeersWithSameConfig {
|
||||||
|
s.syncSem.Add(-1)
|
||||||
return mapError(ctx, internalStatus.ErrPeerAlreadyLoggedIn)
|
return mapError(ctx, internalStatus.ErrPeerAlreadyLoggedIn)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -196,8 +223,10 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
|
|||||||
ctx = context.WithValue(ctx, nbContext.AccountIDKey, "UNKNOWN")
|
ctx = context.WithValue(ctx, nbContext.AccountIDKey, "UNKNOWN")
|
||||||
log.WithContext(ctx).Tracef("peer %s is not registered", peerKey.String())
|
log.WithContext(ctx).Tracef("peer %s is not registered", peerKey.String())
|
||||||
if errStatus, ok := internalStatus.FromError(err); ok && errStatus.Type() == internalStatus.NotFound {
|
if errStatus, ok := internalStatus.FromError(err); ok && errStatus.Type() == internalStatus.NotFound {
|
||||||
|
s.syncSem.Add(-1)
|
||||||
return status.Errorf(codes.PermissionDenied, "peer is not registered")
|
return status.Errorf(codes.PermissionDenied, "peer is not registered")
|
||||||
}
|
}
|
||||||
|
s.syncSem.Add(-1)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -213,12 +242,14 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
|
|||||||
peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), peerMeta, realIP)
|
peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), peerMeta, realIP)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err)
|
log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err)
|
||||||
|
s.syncSem.Add(-1)
|
||||||
return mapError(ctx, err)
|
return mapError(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.sendInitialSync(ctx, peerKey, peer, netMap, postureChecks, srv)
|
err = s.sendInitialSync(ctx, peerKey, peer, netMap, postureChecks, srv)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithContext(ctx).Debugf("error while sending initial sync for %s: %v", peerKey.String(), err)
|
log.WithContext(ctx).Debugf("error while sending initial sync for %s: %v", peerKey.String(), err)
|
||||||
|
s.syncSem.Add(-1)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -237,6 +268,8 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
|
|||||||
|
|
||||||
log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart))
|
log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart))
|
||||||
|
|
||||||
|
s.syncSem.Add(-1)
|
||||||
|
|
||||||
return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv)
|
return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user