mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-20 01:06:45 +00:00
Compare commits
6 Commits
test/netwo
...
test/incre
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1dca0ab404 | ||
|
|
c262c1b252 | ||
|
|
cf330f4b45 | ||
|
|
8f4db28476 | ||
|
|
1b3471a354 | ||
|
|
2d82d2fb83 |
@@ -272,9 +272,9 @@ func (c *GrpcClient) GetServerPublicKey() (*wgtypes.Key, error) {
|
|||||||
return nil, errors.New(errMsgNoMgmtConnection)
|
return nil, errors.New(errMsgNoMgmtConnection)
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second)
|
// mgmCtx, cancel := context.WithTimeout(c.ctx, 30*time.Second)
|
||||||
defer cancel()
|
// defer cancel()
|
||||||
resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{})
|
resp, err := c.realClient.GetServerKey(c.ctx, &proto.Empty{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed while getting Management Service public key: %v", err)
|
log.Errorf("failed while getting Management Service public key: %v", err)
|
||||||
return nil, fmt.Errorf("failed while getting Management Service public key")
|
return nil, fmt.Errorf("failed while getting Management Service public key")
|
||||||
@@ -327,11 +327,9 @@ func (c *GrpcClient) login(serverKey wgtypes.Key, req *proto.LoginRequest) (*pro
|
|||||||
|
|
||||||
var resp *proto.EncryptedMessage
|
var resp *proto.EncryptedMessage
|
||||||
operation := func() error {
|
operation := func() error {
|
||||||
mgmCtx, cancel := context.WithTimeout(context.Background(), ConnectTimeout)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
var err error
|
var err error
|
||||||
resp, err = c.realClient.Login(mgmCtx, &proto.EncryptedMessage{
|
resp, err = c.realClient.Login(context.Background(), &proto.EncryptedMessage{
|
||||||
WgPubKey: c.key.PublicKey().String(),
|
WgPubKey: c.key.PublicKey().String(),
|
||||||
Body: loginReq,
|
Body: loginReq,
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"io/fs"
|
"io/fs"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
_ "net/http/pprof"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
"net/url"
|
"net/url"
|
||||||
"os"
|
"os"
|
||||||
@@ -34,6 +35,7 @@ import (
|
|||||||
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
|
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
|
||||||
|
|
||||||
"github.com/netbirdio/management-integrations/integrations"
|
"github.com/netbirdio/management-integrations/integrations"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/management/server/peers"
|
"github.com/netbirdio/netbird/management/server/peers"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/encryption"
|
"github.com/netbirdio/netbird/encryption"
|
||||||
@@ -90,6 +92,11 @@ var (
|
|||||||
PreRunE: func(cmd *cobra.Command, args []string) error {
|
PreRunE: func(cmd *cobra.Command, args []string) error {
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
log.Infof("Starting pprof on :6060")
|
||||||
|
http.ListenAndServe("localhost:6060", nil)
|
||||||
|
}()
|
||||||
|
|
||||||
//nolint
|
//nolint
|
||||||
ctx := context.WithValue(cmd.Context(), hook.ExecutionContextKey, hook.SystemSource)
|
ctx := context.WithValue(cmd.Context(), hook.ExecutionContextKey, hook.SystemSource)
|
||||||
|
|
||||||
@@ -132,6 +139,7 @@ var (
|
|||||||
return nil
|
return nil
|
||||||
},
|
},
|
||||||
RunE: func(cmd *cobra.Command, args []string) error {
|
RunE: func(cmd *cobra.Command, args []string) error {
|
||||||
|
|
||||||
flag.Parse()
|
flag.Parse()
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(cmd.Context())
|
ctx, cancel := context.WithCancel(cmd.Context())
|
||||||
|
|||||||
@@ -68,6 +68,7 @@ type DefaultAccountManager struct {
|
|||||||
eventStore activity.Store
|
eventStore activity.Store
|
||||||
geo geolocation.Geolocation
|
geo geolocation.Geolocation
|
||||||
|
|
||||||
|
accountUpdateLocks sync.Map
|
||||||
requestBuffer *AccountRequestBuffer
|
requestBuffer *AccountRequestBuffer
|
||||||
|
|
||||||
proxyController port_forwarding.Controller
|
proxyController port_forwarding.Controller
|
||||||
@@ -1222,7 +1223,7 @@ func (am *DefaultAccountManager) SyncUserJWTGroups(ctx context.Context, userAuth
|
|||||||
|
|
||||||
if removedGroupAffectsPeers || newGroupsAffectsPeers {
|
if removedGroupAffectsPeers || newGroupsAffectsPeers {
|
||||||
log.WithContext(ctx).Tracef("user %s: JWT group membership changed, updating account peers", userAuth.UserId)
|
log.WithContext(ctx).Tracef("user %s: JWT group membership changed, updating account peers", userAuth.UserId)
|
||||||
am.UpdateAccountPeers(ctx, userAuth.AccountId)
|
am.BufferUpdateAccountPeers(ctx, userAuth.AccountId)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1461,7 +1462,7 @@ func (am *DefaultAccountManager) GetDNSDomain() string {
|
|||||||
|
|
||||||
func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) {
|
func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) {
|
||||||
log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID)
|
log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID)
|
||||||
am.UpdateAccountPeers(ctx, accountID)
|
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (am *DefaultAccountManager) FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error) {
|
func (am *DefaultAccountManager) FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error) {
|
||||||
|
|||||||
@@ -10,13 +10,12 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
pb "github.com/golang/protobuf/proto" // nolint
|
pb "github.com/golang/protobuf/proto" // nolint
|
||||||
"github.com/golang/protobuf/ptypes/timestamp"
|
|
||||||
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
|
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/realip"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/peer"
|
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
|
|
||||||
integrationsConfig "github.com/netbirdio/management-integrations/integrations/config"
|
integrationsConfig "github.com/netbirdio/management-integrations/integrations/config"
|
||||||
"github.com/netbirdio/netbird/encryption"
|
"github.com/netbirdio/netbird/encryption"
|
||||||
@@ -38,6 +37,7 @@ type GRPCServer struct {
|
|||||||
accountManager account.Manager
|
accountManager account.Manager
|
||||||
settingsManager settings.Manager
|
settingsManager settings.Manager
|
||||||
wgKey wgtypes.Key
|
wgKey wgtypes.Key
|
||||||
|
wgPubKeySting string
|
||||||
proto.UnimplementedManagementServiceServer
|
proto.UnimplementedManagementServiceServer
|
||||||
peersUpdateManager *PeersUpdateManager
|
peersUpdateManager *PeersUpdateManager
|
||||||
config *Config
|
config *Config
|
||||||
@@ -77,6 +77,7 @@ func NewServer(
|
|||||||
|
|
||||||
return &GRPCServer{
|
return &GRPCServer{
|
||||||
wgKey: key,
|
wgKey: key,
|
||||||
|
wgPubKeySting: key.PublicKey().String(),
|
||||||
// peerKey -> event channel
|
// peerKey -> event channel
|
||||||
peersUpdateManager: peersUpdateManager,
|
peersUpdateManager: peersUpdateManager,
|
||||||
accountManager: accountManager,
|
accountManager: accountManager,
|
||||||
@@ -90,30 +91,16 @@ func NewServer(
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *GRPCServer) GetServerKey(ctx context.Context, req *proto.Empty) (*proto.ServerKeyResponse, error) {
|
func (s *GRPCServer) GetServerKey(ctx context.Context, req *proto.Empty) (*proto.ServerKeyResponse, error) {
|
||||||
ip := ""
|
|
||||||
p, ok := peer.FromContext(ctx)
|
|
||||||
if ok {
|
|
||||||
ip = p.Addr.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
log.WithContext(ctx).Tracef("GetServerKey request from %s", ip)
|
|
||||||
start := time.Now()
|
|
||||||
defer func() {
|
|
||||||
log.WithContext(ctx).Tracef("GetServerKey from %s took %v", ip, time.Since(start))
|
|
||||||
}()
|
|
||||||
|
|
||||||
// todo introduce something more meaningful with the key expiration/rotation
|
// todo introduce something more meaningful with the key expiration/rotation
|
||||||
if s.appMetrics != nil {
|
if s.appMetrics != nil {
|
||||||
s.appMetrics.GRPCMetrics().CountGetKeyRequest()
|
s.appMetrics.GRPCMetrics().CountGetKeyRequest()
|
||||||
}
|
}
|
||||||
now := time.Now().Add(24 * time.Hour)
|
|
||||||
secs := int64(now.Second())
|
expiresAt := time.Now().Add(24 * time.Hour)
|
||||||
nanos := int32(now.Nanosecond())
|
|
||||||
expiresAt := ×tamp.Timestamp{Seconds: secs, Nanos: nanos}
|
|
||||||
|
|
||||||
return &proto.ServerKeyResponse{
|
return &proto.ServerKeyResponse{
|
||||||
Key: s.wgKey.PublicKey().String(),
|
Key: s.wgPubKeySting,
|
||||||
ExpiresAt: expiresAt,
|
ExpiresAt: timestamppb.New(expiresAt),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -242,7 +229,7 @@ func (s *GRPCServer) sendUpdate(ctx context.Context, accountID string, peerKey w
|
|||||||
return status.Errorf(codes.Internal, "failed processing update message")
|
return status.Errorf(codes.Internal, "failed processing update message")
|
||||||
}
|
}
|
||||||
err = srv.SendMsg(&proto.EncryptedMessage{
|
err = srv.SendMsg(&proto.EncryptedMessage{
|
||||||
WgPubKey: s.wgKey.PublicKey().String(),
|
WgPubKey: s.wgPubKeySting,
|
||||||
Body: encryptedResp,
|
Body: encryptedResp,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -500,7 +487,7 @@ func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*p
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &proto.EncryptedMessage{
|
return &proto.EncryptedMessage{
|
||||||
WgPubKey: s.wgKey.PublicKey().String(),
|
WgPubKey: s.wgPubKeySting,
|
||||||
Body: encryptedResp,
|
Body: encryptedResp,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
@@ -713,7 +700,7 @@ func (s *GRPCServer) sendInitialSync(ctx context.Context, peerKey wgtypes.Key, p
|
|||||||
}
|
}
|
||||||
|
|
||||||
err = srv.Send(&proto.EncryptedMessage{
|
err = srv.Send(&proto.EncryptedMessage{
|
||||||
WgPubKey: s.wgKey.PublicKey().String(),
|
WgPubKey: s.wgPubKeySting,
|
||||||
Body: encryptedResp,
|
Body: encryptedResp,
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -778,7 +765,7 @@ func (s *GRPCServer) GetDeviceAuthorizationFlow(ctx context.Context, req *proto.
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &proto.EncryptedMessage{
|
return &proto.EncryptedMessage{
|
||||||
WgPubKey: s.wgKey.PublicKey().String(),
|
WgPubKey: s.wgPubKeySting,
|
||||||
Body: encryptedResp,
|
Body: encryptedResp,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
@@ -830,7 +817,7 @@ func (s *GRPCServer) GetPKCEAuthorizationFlow(ctx context.Context, req *proto.En
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &proto.EncryptedMessage{
|
return &proto.EncryptedMessage{
|
||||||
WgPubKey: s.wgKey.PublicKey().String(),
|
WgPubKey: s.wgPubKeySting,
|
||||||
Body: encryptedResp,
|
Body: encryptedResp,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,7 +6,9 @@ import (
|
|||||||
b64 "encoding/base64"
|
b64 "encoding/base64"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
|
"os"
|
||||||
"slices"
|
"slices"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -135,7 +137,7 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
|
|||||||
if expired {
|
if expired {
|
||||||
// we need to update other peers because when peer login expires all other peers are notified to disconnect from
|
// we need to update other peers because when peer login expires all other peers are notified to disconnect from
|
||||||
// the expired one. Here we notify them that connection is now allowed again.
|
// the expired one. Here we notify them that connection is now allowed again.
|
||||||
am.UpdateAccountPeers(ctx, accountID)
|
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@@ -302,7 +304,7 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
|
|||||||
}
|
}
|
||||||
|
|
||||||
if peerLabelChanged || requiresPeerUpdates {
|
if peerLabelChanged || requiresPeerUpdates {
|
||||||
am.UpdateAccountPeers(ctx, accountID)
|
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||||
} else if sshChanged {
|
} else if sshChanged {
|
||||||
am.UpdateAccountPeer(ctx, accountID, peer.ID)
|
am.UpdateAccountPeer(ctx, accountID, peer.ID)
|
||||||
}
|
}
|
||||||
@@ -383,7 +385,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
|
|||||||
}
|
}
|
||||||
|
|
||||||
if updateAccountPeers {
|
if updateAccountPeers {
|
||||||
am.UpdateAccountPeers(ctx, accountID)
|
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@@ -653,7 +655,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
|
|||||||
unlock = nil
|
unlock = nil
|
||||||
|
|
||||||
if updateAccountPeers {
|
if updateAccountPeers {
|
||||||
am.UpdateAccountPeers(ctx, accountID)
|
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||||
}
|
}
|
||||||
|
|
||||||
return am.getValidatedPeerWithMap(ctx, false, accountID, newPeer)
|
return am.getValidatedPeerWithMap(ctx, false, accountID, newPeer)
|
||||||
@@ -748,7 +750,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
|
|||||||
}
|
}
|
||||||
|
|
||||||
if isStatusChanged || sync.UpdateAccountPeers || (updated && len(postureChecks) > 0) {
|
if isStatusChanged || sync.UpdateAccountPeers || (updated && len(postureChecks) > 0) {
|
||||||
am.UpdateAccountPeers(ctx, accountID)
|
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||||
}
|
}
|
||||||
|
|
||||||
return am.getValidatedPeerWithMap(ctx, peerNotValid, accountID, peer)
|
return am.getValidatedPeerWithMap(ctx, peerNotValid, accountID, peer)
|
||||||
@@ -893,7 +895,7 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
|
|||||||
unlockPeer = nil
|
unlockPeer = nil
|
||||||
|
|
||||||
if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) {
|
if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) {
|
||||||
am.UpdateAccountPeers(ctx, accountID)
|
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||||
}
|
}
|
||||||
|
|
||||||
return am.getValidatedPeerWithMap(ctx, isRequiresApproval, accountID, peer)
|
return am.getValidatedPeerWithMap(ctx, isRequiresApproval, accountID, peer)
|
||||||
@@ -1226,6 +1228,28 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
|
||||||
|
mu, _ := am.accountUpdateLocks.LoadOrStore(accountID, &sync.Mutex{})
|
||||||
|
lock := mu.(*sync.Mutex)
|
||||||
|
|
||||||
|
if !lock.TryLock() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
intervalStr := os.Getenv("PEER_UPDATE_INTERVAL_MS")
|
||||||
|
interval, err := strconv.Atoi(intervalStr)
|
||||||
|
if err != nil {
|
||||||
|
interval = 10000
|
||||||
|
}
|
||||||
|
time.Sleep(time.Duration(interval) * time.Millisecond)
|
||||||
|
am.UpdateAccountPeers(ctx, accountID)
|
||||||
|
|
||||||
|
am.accountUpdateLocks.Delete(accountID)
|
||||||
|
lock.Unlock()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
// UpdateAccountPeer updates a single peer that belongs to an account.
|
// UpdateAccountPeer updates a single peer that belongs to an account.
|
||||||
// Should be called when changes need to be synced to a specific peer only.
|
// Should be called when changes need to be synced to a specific peer only.
|
||||||
func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountId string, peerId string) {
|
func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountId string, peerId string) {
|
||||||
|
|||||||
@@ -57,8 +57,13 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda
|
|||||||
case channel <- update:
|
case channel <- update:
|
||||||
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
|
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
|
||||||
default:
|
default:
|
||||||
dropped = true
|
select {
|
||||||
log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(channel))
|
case <-channel:
|
||||||
|
log.WithContext(ctx).Trace("dropped oldest message from channel for peer %s", peerID)
|
||||||
|
default:
|
||||||
|
channel <- update
|
||||||
|
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
log.WithContext(ctx).Debugf("peer %s has no channel", peerID)
|
log.WithContext(ctx).Debugf("peer %s has no channel", peerID)
|
||||||
@@ -85,7 +90,7 @@ func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) c
|
|||||||
close(channel)
|
close(channel)
|
||||||
}
|
}
|
||||||
// mbragin: todo shouldn't it be more? or configurable?
|
// mbragin: todo shouldn't it be more? or configurable?
|
||||||
channel := make(chan *UpdateMessage, channelBufferSize)
|
channel := make(chan *UpdateMessage, 2)
|
||||||
p.peerChannels[peerID] = channel
|
p.peerChannels[peerID] = channel
|
||||||
|
|
||||||
log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID)
|
log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID)
|
||||||
|
|||||||
@@ -71,18 +71,15 @@ func CreateConnection(addr string, tlsEnabled bool) (*grpc.ClientConn, error) {
|
|||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
connCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
conn, err := grpc.DialContext(
|
conn, err := grpc.DialContext(
|
||||||
connCtx,
|
context.Background(),
|
||||||
addr,
|
addr,
|
||||||
transportOption,
|
transportOption,
|
||||||
WithCustomDialer(),
|
WithCustomDialer(),
|
||||||
grpc.WithBlock(),
|
grpc.WithBlock(),
|
||||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||||
Time: 30 * time.Second,
|
Time: 10 * time.Minute,
|
||||||
Timeout: 10 * time.Second,
|
Timeout: 5 * time.Minute,
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
Reference in New Issue
Block a user