mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-19 00:36:38 +00:00
Compare commits
6 Commits
test/netwo
...
test/incre
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1dca0ab404 | ||
|
|
c262c1b252 | ||
|
|
cf330f4b45 | ||
|
|
8f4db28476 | ||
|
|
1b3471a354 | ||
|
|
2d82d2fb83 |
@@ -272,9 +272,9 @@ func (c *GrpcClient) GetServerPublicKey() (*wgtypes.Key, error) {
|
||||
return nil, errors.New(errMsgNoMgmtConnection)
|
||||
}
|
||||
|
||||
mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
|
||||
defer cancel()
|
||||
resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{})
|
||||
// mgmCtx, cancel := context.WithTimeout(c.ctx, 30*time.Second)
|
||||
// defer cancel()
|
||||
resp, err := c.realClient.GetServerKey(c.ctx, &proto.Empty{})
|
||||
if err != nil {
|
||||
log.Errorf("failed while getting Management Service public key: %v", err)
|
||||
return nil, fmt.Errorf("failed while getting Management Service public key")
|
||||
@@ -327,11 +327,9 @@ func (c *GrpcClient) login(serverKey wgtypes.Key, req *proto.LoginRequest) (*pro
|
||||
|
||||
var resp *proto.EncryptedMessage
|
||||
operation := func() error {
|
||||
mgmCtx, cancel := context.WithTimeout(context.Background(), ConnectTimeout)
|
||||
defer cancel()
|
||||
|
||||
var err error
|
||||
resp, err = c.realClient.Login(mgmCtx, &proto.EncryptedMessage{
|
||||
resp, err = c.realClient.Login(context.Background(), &proto.EncryptedMessage{
|
||||
WgPubKey: c.key.PublicKey().String(),
|
||||
Body: loginReq,
|
||||
})
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
"io/fs"
|
||||
"net"
|
||||
"net/http"
|
||||
_ "net/http/pprof"
|
||||
"net/netip"
|
||||
"net/url"
|
||||
"os"
|
||||
@@ -34,6 +35,7 @@ import (
|
||||
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
|
||||
|
||||
"github.com/netbirdio/management-integrations/integrations"
|
||||
|
||||
"github.com/netbirdio/netbird/management/server/peers"
|
||||
|
||||
"github.com/netbirdio/netbird/encryption"
|
||||
@@ -90,6 +92,11 @@ var (
|
||||
PreRunE: func(cmd *cobra.Command, args []string) error {
|
||||
flag.Parse()
|
||||
|
||||
go func() {
|
||||
log.Infof("Starting pprof on :6060")
|
||||
http.ListenAndServe("localhost:6060", nil)
|
||||
}()
|
||||
|
||||
//nolint
|
||||
ctx := context.WithValue(cmd.Context(), hook.ExecutionContextKey, hook.SystemSource)
|
||||
|
||||
@@ -132,6 +139,7 @@ var (
|
||||
return nil
|
||||
},
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
|
||||
flag.Parse()
|
||||
|
||||
ctx, cancel := context.WithCancel(cmd.Context())
|
||||
|
||||
@@ -68,7 +68,8 @@ type DefaultAccountManager struct {
|
||||
eventStore activity.Store
|
||||
geo geolocation.Geolocation
|
||||
|
||||
requestBuffer *AccountRequestBuffer
|
||||
accountUpdateLocks sync.Map
|
||||
requestBuffer *AccountRequestBuffer
|
||||
|
||||
proxyController port_forwarding.Controller
|
||||
settingsManager settings.Manager
|
||||
@@ -1222,7 +1223,7 @@ func (am *DefaultAccountManager) SyncUserJWTGroups(ctx context.Context, userAuth
|
||||
|
||||
if removedGroupAffectsPeers || newGroupsAffectsPeers {
|
||||
log.WithContext(ctx).Tracef("user %s: JWT group membership changed, updating account peers", userAuth.UserId)
|
||||
am.UpdateAccountPeers(ctx, userAuth.AccountId)
|
||||
am.BufferUpdateAccountPeers(ctx, userAuth.AccountId)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1461,7 +1462,7 @@ func (am *DefaultAccountManager) GetDNSDomain() string {
|
||||
|
||||
func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) {
|
||||
log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID)
|
||||
am.UpdateAccountPeers(ctx, accountID)
|
||||
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||
}
|
||||
|
||||
func (am *DefaultAccountManager) FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error) {
|
||||
|
||||
@@ -10,13 +10,12 @@ import (
|
||||
"time"
|
||||
|
||||
pb "github.com/golang/protobuf/proto" // nolint
|
||||
"github.com/golang/protobuf/ptypes/timestamp"
|
||||
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/status"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
integrationsConfig "github.com/netbirdio/management-integrations/integrations/config"
|
||||
"github.com/netbirdio/netbird/encryption"
|
||||
@@ -38,6 +37,7 @@ type GRPCServer struct {
|
||||
accountManager account.Manager
|
||||
settingsManager settings.Manager
|
||||
wgKey wgtypes.Key
|
||||
wgPubKeySting string
|
||||
proto.UnimplementedManagementServiceServer
|
||||
peersUpdateManager *PeersUpdateManager
|
||||
config *Config
|
||||
@@ -76,7 +76,8 @@ func NewServer(
|
||||
}
|
||||
|
||||
return &GRPCServer{
|
||||
wgKey: key,
|
||||
wgKey: key,
|
||||
wgPubKeySting: key.PublicKey().String(),
|
||||
// peerKey -> event channel
|
||||
peersUpdateManager: peersUpdateManager,
|
||||
accountManager: accountManager,
|
||||
@@ -90,30 +91,16 @@ func NewServer(
|
||||
}
|
||||
|
||||
func (s *GRPCServer) GetServerKey(ctx context.Context, req *proto.Empty) (*proto.ServerKeyResponse, error) {
|
||||
ip := ""
|
||||
p, ok := peer.FromContext(ctx)
|
||||
if ok {
|
||||
ip = p.Addr.String()
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Tracef("GetServerKey request from %s", ip)
|
||||
start := time.Now()
|
||||
defer func() {
|
||||
log.WithContext(ctx).Tracef("GetServerKey from %s took %v", ip, time.Since(start))
|
||||
}()
|
||||
|
||||
// todo introduce something more meaningful with the key expiration/rotation
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.GRPCMetrics().CountGetKeyRequest()
|
||||
}
|
||||
now := time.Now().Add(24 * time.Hour)
|
||||
secs := int64(now.Second())
|
||||
nanos := int32(now.Nanosecond())
|
||||
expiresAt := ×tamp.Timestamp{Seconds: secs, Nanos: nanos}
|
||||
|
||||
expiresAt := time.Now().Add(24 * time.Hour)
|
||||
|
||||
return &proto.ServerKeyResponse{
|
||||
Key: s.wgKey.PublicKey().String(),
|
||||
ExpiresAt: expiresAt,
|
||||
Key: s.wgPubKeySting,
|
||||
ExpiresAt: timestamppb.New(expiresAt),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -242,7 +229,7 @@ func (s *GRPCServer) sendUpdate(ctx context.Context, accountID string, peerKey w
|
||||
return status.Errorf(codes.Internal, "failed processing update message")
|
||||
}
|
||||
err = srv.SendMsg(&proto.EncryptedMessage{
|
||||
WgPubKey: s.wgKey.PublicKey().String(),
|
||||
WgPubKey: s.wgPubKeySting,
|
||||
Body: encryptedResp,
|
||||
})
|
||||
if err != nil {
|
||||
@@ -500,7 +487,7 @@ func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*p
|
||||
}
|
||||
|
||||
return &proto.EncryptedMessage{
|
||||
WgPubKey: s.wgKey.PublicKey().String(),
|
||||
WgPubKey: s.wgPubKeySting,
|
||||
Body: encryptedResp,
|
||||
}, nil
|
||||
}
|
||||
@@ -713,7 +700,7 @@ func (s *GRPCServer) sendInitialSync(ctx context.Context, peerKey wgtypes.Key, p
|
||||
}
|
||||
|
||||
err = srv.Send(&proto.EncryptedMessage{
|
||||
WgPubKey: s.wgKey.PublicKey().String(),
|
||||
WgPubKey: s.wgPubKeySting,
|
||||
Body: encryptedResp,
|
||||
})
|
||||
|
||||
@@ -778,7 +765,7 @@ func (s *GRPCServer) GetDeviceAuthorizationFlow(ctx context.Context, req *proto.
|
||||
}
|
||||
|
||||
return &proto.EncryptedMessage{
|
||||
WgPubKey: s.wgKey.PublicKey().String(),
|
||||
WgPubKey: s.wgPubKeySting,
|
||||
Body: encryptedResp,
|
||||
}, nil
|
||||
}
|
||||
@@ -830,7 +817,7 @@ func (s *GRPCServer) GetPKCEAuthorizationFlow(ctx context.Context, req *proto.En
|
||||
}
|
||||
|
||||
return &proto.EncryptedMessage{
|
||||
WgPubKey: s.wgKey.PublicKey().String(),
|
||||
WgPubKey: s.wgPubKeySting,
|
||||
Body: encryptedResp,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -6,7 +6,9 @@ import (
|
||||
b64 "encoding/base64"
|
||||
"fmt"
|
||||
"net"
|
||||
"os"
|
||||
"slices"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -135,7 +137,7 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
|
||||
if expired {
|
||||
// we need to update other peers because when peer login expires all other peers are notified to disconnect from
|
||||
// the expired one. Here we notify them that connection is now allowed again.
|
||||
am.UpdateAccountPeers(ctx, accountID)
|
||||
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -302,7 +304,7 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
|
||||
}
|
||||
|
||||
if peerLabelChanged || requiresPeerUpdates {
|
||||
am.UpdateAccountPeers(ctx, accountID)
|
||||
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||
} else if sshChanged {
|
||||
am.UpdateAccountPeer(ctx, accountID, peer.ID)
|
||||
}
|
||||
@@ -383,7 +385,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
|
||||
}
|
||||
|
||||
if updateAccountPeers {
|
||||
am.UpdateAccountPeers(ctx, accountID)
|
||||
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -653,7 +655,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
||||
unlock = nil
|
||||
|
||||
if updateAccountPeers {
|
||||
am.UpdateAccountPeers(ctx, accountID)
|
||||
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||
}
|
||||
|
||||
return am.getValidatedPeerWithMap(ctx, false, accountID, newPeer)
|
||||
@@ -748,7 +750,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
|
||||
}
|
||||
|
||||
if isStatusChanged || sync.UpdateAccountPeers || (updated && len(postureChecks) > 0) {
|
||||
am.UpdateAccountPeers(ctx, accountID)
|
||||
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||
}
|
||||
|
||||
return am.getValidatedPeerWithMap(ctx, peerNotValid, accountID, peer)
|
||||
@@ -893,7 +895,7 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
|
||||
unlockPeer = nil
|
||||
|
||||
if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) {
|
||||
am.UpdateAccountPeers(ctx, accountID)
|
||||
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||
}
|
||||
|
||||
return am.getValidatedPeerWithMap(ctx, isRequiresApproval, accountID, peer)
|
||||
@@ -1226,6 +1228,28 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
||||
}
|
||||
}
|
||||
|
||||
func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
|
||||
mu, _ := am.accountUpdateLocks.LoadOrStore(accountID, &sync.Mutex{})
|
||||
lock := mu.(*sync.Mutex)
|
||||
|
||||
if !lock.TryLock() {
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
intervalStr := os.Getenv("PEER_UPDATE_INTERVAL_MS")
|
||||
interval, err := strconv.Atoi(intervalStr)
|
||||
if err != nil {
|
||||
interval = 10000
|
||||
}
|
||||
time.Sleep(time.Duration(interval) * time.Millisecond)
|
||||
am.UpdateAccountPeers(ctx, accountID)
|
||||
|
||||
am.accountUpdateLocks.Delete(accountID)
|
||||
lock.Unlock()
|
||||
}()
|
||||
}
|
||||
|
||||
// UpdateAccountPeer updates a single peer that belongs to an account.
|
||||
// Should be called when changes need to be synced to a specific peer only.
|
||||
func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountId string, peerId string) {
|
||||
|
||||
@@ -57,8 +57,13 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda
|
||||
case channel <- update:
|
||||
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
|
||||
default:
|
||||
dropped = true
|
||||
log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(channel))
|
||||
select {
|
||||
case <-channel:
|
||||
log.WithContext(ctx).Trace("dropped oldest message from channel for peer %s", peerID)
|
||||
default:
|
||||
channel <- update
|
||||
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.WithContext(ctx).Debugf("peer %s has no channel", peerID)
|
||||
@@ -85,7 +90,7 @@ func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) c
|
||||
close(channel)
|
||||
}
|
||||
// mbragin: todo shouldn't it be more? or configurable?
|
||||
channel := make(chan *UpdateMessage, channelBufferSize)
|
||||
channel := make(chan *UpdateMessage, 2)
|
||||
p.peerChannels[peerID] = channel
|
||||
|
||||
log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID)
|
||||
|
||||
@@ -71,18 +71,15 @@ func CreateConnection(addr string, tlsEnabled bool) (*grpc.ClientConn, error) {
|
||||
}))
|
||||
}
|
||||
|
||||
connCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
conn, err := grpc.DialContext(
|
||||
connCtx,
|
||||
context.Background(),
|
||||
addr,
|
||||
transportOption,
|
||||
WithCustomDialer(),
|
||||
grpc.WithBlock(),
|
||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||
Time: 30 * time.Second,
|
||||
Timeout: 10 * time.Second,
|
||||
Time: 10 * time.Minute,
|
||||
Timeout: 5 * time.Minute,
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user