mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-20 01:06:45 +00:00
Merge branch 'feature/optimize-network-map-updates' into feature/validate-group-association
# Conflicts: # management/server/dns_test.go # management/server/group.go # management/server/nameserver.go # management/server/peer.go # management/server/peer_test.go # management/server/user.go
This commit is contained in:
@@ -18,6 +18,8 @@ import (
|
||||
|
||||
"github.com/eko/gocache/v3/cache"
|
||||
cacheStore "github.com/eko/gocache/v3/store"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"github.com/miekg/dns"
|
||||
gocache "github.com/patrickmn/go-cache"
|
||||
"github.com/rs/xid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
@@ -37,6 +39,7 @@ import (
|
||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||
"github.com/netbirdio/netbird/management/server/posture"
|
||||
"github.com/netbirdio/netbird/management/server/status"
|
||||
"github.com/netbirdio/netbird/management/server/telemetry"
|
||||
"github.com/netbirdio/netbird/route"
|
||||
)
|
||||
|
||||
@@ -65,6 +68,7 @@ type AccountManager interface {
|
||||
SaveSetupKey(ctx context.Context, accountID string, key *SetupKey, userID string) (*SetupKey, error)
|
||||
CreateUser(ctx context.Context, accountID, initiatorUserID string, key *UserInfo) (*UserInfo, error)
|
||||
DeleteUser(ctx context.Context, accountID, initiatorUserID string, targetUserID string) error
|
||||
DeleteRegularUsers(ctx context.Context, accountID, initiatorUserID string, targetUserIDs []string) error
|
||||
InviteUser(ctx context.Context, accountID string, initiatorUserID string, targetUserID string) error
|
||||
ListSetupKeys(ctx context.Context, accountID, userID string) ([]*SetupKey, error)
|
||||
SaveUser(ctx context.Context, accountID, initiatorUserID string, update *User) (*UserInfo, error)
|
||||
@@ -97,6 +101,7 @@ type AccountManager interface {
|
||||
SaveGroup(ctx context.Context, accountID, userID string, group *nbgroup.Group) error
|
||||
SaveGroups(ctx context.Context, accountID, userID string, newGroups []*nbgroup.Group) error
|
||||
DeleteGroup(ctx context.Context, accountId, userId, groupID string) error
|
||||
DeleteGroups(ctx context.Context, accountId, userId string, groupIDs []string) error
|
||||
ListGroups(ctx context.Context, accountId string) ([]*nbgroup.Group, error)
|
||||
GroupAddPeer(ctx context.Context, accountId, groupID, peerID string) error
|
||||
GroupDeletePeer(ctx context.Context, accountId, groupID, peerID string) error
|
||||
@@ -134,8 +139,8 @@ type AccountManager interface {
|
||||
UpdateIntegratedValidatorGroups(ctx context.Context, accountID string, userID string, groups []string) error
|
||||
GroupValidation(ctx context.Context, accountId string, groups []string) (bool, error)
|
||||
GetValidatedPeers(account *Account) (map[string]struct{}, error)
|
||||
SyncAndMarkPeer(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *NetworkMap, []*posture.Checks, error)
|
||||
CancelPeerRoutines(ctx context.Context, peer *nbpeer.Peer) error
|
||||
SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *NetworkMap, []*posture.Checks, error)
|
||||
OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string) error
|
||||
SyncPeerMeta(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta) error
|
||||
FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error)
|
||||
GetAccountIDForPeerKey(ctx context.Context, peerKey string) (string, error)
|
||||
@@ -169,6 +174,8 @@ type DefaultAccountManager struct {
|
||||
userDeleteFromIDPEnabled bool
|
||||
|
||||
integratedPeerValidator integrated_validator.IntegratedValidator
|
||||
|
||||
metrics telemetry.AppMetrics
|
||||
}
|
||||
|
||||
// Settings represents Account settings structure that can be modified via API and Dashboard
|
||||
@@ -400,8 +407,16 @@ func (a *Account) GetGroup(groupID string) *nbgroup.Group {
|
||||
return a.Groups[groupID]
|
||||
}
|
||||
|
||||
// GetPeerNetworkMap returns a group by ID if exists, nil otherwise
|
||||
func (a *Account) GetPeerNetworkMap(ctx context.Context, peerID, dnsDomain string, validatedPeersMap map[string]struct{}) *NetworkMap {
|
||||
// GetPeerNetworkMap returns the networkmap for the given peer ID.
|
||||
func (a *Account) GetPeerNetworkMap(
|
||||
ctx context.Context,
|
||||
peerID string,
|
||||
peersCustomZone nbdns.CustomZone,
|
||||
validatedPeersMap map[string]struct{},
|
||||
metrics *telemetry.AccountManagerMetrics,
|
||||
) *NetworkMap {
|
||||
start := time.Now()
|
||||
|
||||
peer := a.Peers[peerID]
|
||||
if peer == nil {
|
||||
return &NetworkMap{
|
||||
@@ -437,7 +452,7 @@ func (a *Account) GetPeerNetworkMap(ctx context.Context, peerID, dnsDomain strin
|
||||
|
||||
if dnsManagementStatus {
|
||||
var zones []nbdns.CustomZone
|
||||
peersCustomZone := getPeersCustomZone(ctx, a, dnsDomain)
|
||||
|
||||
if peersCustomZone.Domain != "" {
|
||||
zones = append(zones, peersCustomZone)
|
||||
}
|
||||
@@ -445,7 +460,7 @@ func (a *Account) GetPeerNetworkMap(ctx context.Context, peerID, dnsDomain strin
|
||||
dnsUpdate.NameServerGroups = getPeerNSGroups(a, peerID)
|
||||
}
|
||||
|
||||
return &NetworkMap{
|
||||
nm := &NetworkMap{
|
||||
Peers: peersToConnect,
|
||||
Network: a.Network.Copy(),
|
||||
Routes: routesUpdate,
|
||||
@@ -453,6 +468,60 @@ func (a *Account) GetPeerNetworkMap(ctx context.Context, peerID, dnsDomain strin
|
||||
OfflinePeers: expiredPeers,
|
||||
FirewallRules: firewallRules,
|
||||
}
|
||||
|
||||
if metrics != nil {
|
||||
objectCount := int64(len(peersToConnect) + len(expiredPeers) + len(routesUpdate) + len(firewallRules))
|
||||
metrics.CountNetworkMapObjects(objectCount)
|
||||
metrics.CountGetPeerNetworkMapDuration(time.Since(start))
|
||||
}
|
||||
|
||||
return nm
|
||||
}
|
||||
|
||||
func (a *Account) GetPeersCustomZone(ctx context.Context, dnsDomain string) nbdns.CustomZone {
|
||||
var merr *multierror.Error
|
||||
|
||||
if dnsDomain == "" {
|
||||
log.WithContext(ctx).Error("no dns domain is set, returning empty zone")
|
||||
return nbdns.CustomZone{}
|
||||
}
|
||||
|
||||
customZone := nbdns.CustomZone{
|
||||
Domain: dns.Fqdn(dnsDomain),
|
||||
Records: make([]nbdns.SimpleRecord, 0, len(a.Peers)),
|
||||
}
|
||||
|
||||
domainSuffix := "." + dnsDomain
|
||||
|
||||
var sb strings.Builder
|
||||
for _, peer := range a.Peers {
|
||||
if peer.DNSLabel == "" {
|
||||
merr = multierror.Append(merr, fmt.Errorf("peer %s has an empty DNS label", peer.Name))
|
||||
continue
|
||||
}
|
||||
|
||||
sb.Grow(len(peer.DNSLabel) + len(domainSuffix))
|
||||
sb.WriteString(peer.DNSLabel)
|
||||
sb.WriteString(domainSuffix)
|
||||
|
||||
customZone.Records = append(customZone.Records, nbdns.SimpleRecord{
|
||||
Name: sb.String(),
|
||||
Type: int(dns.TypeA),
|
||||
Class: nbdns.DefaultClass,
|
||||
TTL: defaultTTL,
|
||||
RData: peer.IP.String(),
|
||||
})
|
||||
|
||||
sb.Reset()
|
||||
}
|
||||
|
||||
go func() {
|
||||
if merr != nil {
|
||||
log.WithContext(ctx).Errorf("error generating custom zone for account %s: %v", a.Id, merr)
|
||||
}
|
||||
}()
|
||||
|
||||
return customZone
|
||||
}
|
||||
|
||||
// GetExpiredPeers returns peers that have been expired
|
||||
@@ -769,10 +838,6 @@ func (a *Account) GetPeer(peerID string) *nbpeer.Peer {
|
||||
// SetJWTGroups updates the user's auto groups by synchronizing JWT groups.
|
||||
// Returns true if there are changes in the JWT group membership.
|
||||
func (a *Account) SetJWTGroups(userID string, groupsNames []string) bool {
|
||||
if len(groupsNames) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
user, ok := a.Users[userID]
|
||||
if !ok {
|
||||
return false
|
||||
@@ -856,7 +921,7 @@ func (a *Account) UserGroupsAddToPeers(userID string, groups ...string) {
|
||||
func (a *Account) UserGroupsRemoveFromPeers(userID string, groups ...string) {
|
||||
for _, gid := range groups {
|
||||
group, ok := a.Groups[gid]
|
||||
if !ok {
|
||||
if !ok || group.Name == "All" {
|
||||
continue
|
||||
}
|
||||
update := make([]string, 0, len(group.Peers))
|
||||
@@ -874,10 +939,18 @@ func (a *Account) UserGroupsRemoveFromPeers(userID string, groups ...string) {
|
||||
}
|
||||
|
||||
// BuildManager creates a new DefaultAccountManager with a provided Store
|
||||
func BuildManager(ctx context.Context, store Store, peersUpdateManager *PeersUpdateManager, idpManager idp.Manager,
|
||||
singleAccountModeDomain string, dnsDomain string, eventStore activity.Store, geo *geolocation.Geolocation,
|
||||
func BuildManager(
|
||||
ctx context.Context,
|
||||
store Store,
|
||||
peersUpdateManager *PeersUpdateManager,
|
||||
idpManager idp.Manager,
|
||||
singleAccountModeDomain string,
|
||||
dnsDomain string,
|
||||
eventStore activity.Store,
|
||||
geo *geolocation.Geolocation,
|
||||
userDeleteFromIDPEnabled bool,
|
||||
integratedPeerValidator integrated_validator.IntegratedValidator,
|
||||
metrics telemetry.AppMetrics,
|
||||
) (*DefaultAccountManager, error) {
|
||||
am := &DefaultAccountManager{
|
||||
Store: store,
|
||||
@@ -892,6 +965,7 @@ func BuildManager(ctx context.Context, store Store, peersUpdateManager *PeersUpd
|
||||
peerLoginExpiry: NewDefaultScheduler(),
|
||||
userDeleteFromIDPEnabled: userDeleteFromIDPEnabled,
|
||||
integratedPeerValidator: integratedPeerValidator,
|
||||
metrics: metrics,
|
||||
}
|
||||
allAccounts := store.GetAllAccounts(ctx)
|
||||
// enable single account mode only if configured by user and number of existing accounts is not grater than 1
|
||||
@@ -977,7 +1051,7 @@ func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, acco
|
||||
return nil, status.Errorf(status.InvalidArgument, "peer login expiration can't be smaller than one hour")
|
||||
}
|
||||
|
||||
unlock := am.Store.AcquireAccountWriteLock(ctx, accountID)
|
||||
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
|
||||
defer unlock()
|
||||
|
||||
account, err := am.Store.GetAccount(ctx, accountID)
|
||||
@@ -1028,7 +1102,7 @@ func (am *DefaultAccountManager) UpdateAccountSettings(ctx context.Context, acco
|
||||
|
||||
func (am *DefaultAccountManager) peerLoginExpirationJob(ctx context.Context, accountID string) func() (time.Duration, bool) {
|
||||
return func() (time.Duration, bool) {
|
||||
unlock := am.Store.AcquireAccountWriteLock(ctx, accountID)
|
||||
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
|
||||
defer unlock()
|
||||
|
||||
account, err := am.Store.GetAccount(ctx, accountID)
|
||||
@@ -1127,7 +1201,7 @@ func (am *DefaultAccountManager) warmupIDPCache(ctx context.Context) error {
|
||||
|
||||
// DeleteAccount deletes an account and all its users from local store and from the remote IDP if the requester is an admin and account owner
|
||||
func (am *DefaultAccountManager) DeleteAccount(ctx context.Context, accountID, userID string) error {
|
||||
unlock := am.Store.AcquireAccountWriteLock(ctx, accountID)
|
||||
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
|
||||
defer unlock()
|
||||
account, err := am.Store.GetAccount(ctx, accountID)
|
||||
if err != nil {
|
||||
@@ -1587,7 +1661,7 @@ func (am *DefaultAccountManager) MarkPATUsed(ctx context.Context, tokenID string
|
||||
return err
|
||||
}
|
||||
|
||||
unlock := am.Store.AcquireAccountWriteLock(ctx, account.Id)
|
||||
unlock := am.Store.AcquireWriteLockByUID(ctx, account.Id)
|
||||
defer unlock()
|
||||
|
||||
account, err = am.Store.GetAccountByUser(ctx, user.Id)
|
||||
@@ -1670,7 +1744,7 @@ func (am *DefaultAccountManager) GetAccountFromToken(ctx context.Context, claims
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
unlock := am.Store.AcquireAccountWriteLock(ctx, newAcc.Id)
|
||||
unlock := am.Store.AcquireWriteLockByUID(ctx, newAcc.Id)
|
||||
alreadyUnlocked := false
|
||||
defer func() {
|
||||
if !alreadyUnlocked {
|
||||
@@ -1831,7 +1905,7 @@ func (am *DefaultAccountManager) getAccountWithAuthorizationClaims(ctx context.C
|
||||
|
||||
account, err := am.Store.GetAccountByUser(ctx, claims.UserId)
|
||||
if err == nil {
|
||||
unlockAccount := am.Store.AcquireAccountWriteLock(ctx, account.Id)
|
||||
unlockAccount := am.Store.AcquireWriteLockByUID(ctx, account.Id)
|
||||
defer unlockAccount()
|
||||
account, err = am.Store.GetAccountByUser(ctx, claims.UserId)
|
||||
if err != nil {
|
||||
@@ -1851,7 +1925,7 @@ func (am *DefaultAccountManager) getAccountWithAuthorizationClaims(ctx context.C
|
||||
return account, nil
|
||||
} else if s, ok := status.FromError(err); ok && s.Type() == status.NotFound {
|
||||
if domainAccount != nil {
|
||||
unlockAccount := am.Store.AcquireAccountWriteLock(ctx, domainAccount.Id)
|
||||
unlockAccount := am.Store.AcquireWriteLockByUID(ctx, domainAccount.Id)
|
||||
defer unlockAccount()
|
||||
domainAccount, err = am.Store.GetAccountByPrivateDomain(ctx, claims.Domain)
|
||||
if err != nil {
|
||||
@@ -1865,17 +1939,11 @@ func (am *DefaultAccountManager) getAccountWithAuthorizationClaims(ctx context.C
|
||||
}
|
||||
}
|
||||
|
||||
func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *NetworkMap, []*posture.Checks, error) {
|
||||
accountID, err := am.Store.GetAccountIDByPeerPubKey(ctx, peerPubKey)
|
||||
if err != nil {
|
||||
if errStatus, ok := status.FromError(err); ok && errStatus.Type() == status.NotFound {
|
||||
return nil, nil, nil, status.Errorf(status.Unauthenticated, "peer not registered")
|
||||
}
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
unlock := am.Store.AcquireAccountReadLock(ctx, accountID)
|
||||
defer unlock()
|
||||
func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *NetworkMap, []*posture.Checks, error) {
|
||||
accountUnlock := am.Store.AcquireReadLockByUID(ctx, accountID)
|
||||
defer accountUnlock()
|
||||
peerUnlock := am.Store.AcquireWriteLockByUID(ctx, peerPubKey)
|
||||
defer peerUnlock()
|
||||
|
||||
account, err := am.Store.GetAccount(ctx, accountID)
|
||||
if err != nil {
|
||||
@@ -1895,26 +1963,20 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, peerPubKey
|
||||
return peer, netMap, postureChecks, nil
|
||||
}
|
||||
|
||||
func (am *DefaultAccountManager) CancelPeerRoutines(ctx context.Context, peer *nbpeer.Peer) error {
|
||||
accountID, err := am.Store.GetAccountIDByPeerPubKey(ctx, peer.Key)
|
||||
if err != nil {
|
||||
if errStatus, ok := status.FromError(err); ok && errStatus.Type() == status.NotFound {
|
||||
return status.Errorf(status.Unauthenticated, "peer not registered")
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
unlock := am.Store.AcquireAccountWriteLock(ctx, accountID)
|
||||
defer unlock()
|
||||
func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string) error {
|
||||
accountUnlock := am.Store.AcquireReadLockByUID(ctx, accountID)
|
||||
defer accountUnlock()
|
||||
peerUnlock := am.Store.AcquireWriteLockByUID(ctx, peerPubKey)
|
||||
defer peerUnlock()
|
||||
|
||||
account, err := am.Store.GetAccount(ctx, accountID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = am.MarkPeerConnected(ctx, peer.Key, false, nil, account)
|
||||
err = am.MarkPeerConnected(ctx, peerPubKey, false, nil, account)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Warnf("failed marking peer as connected %s %v", peer.Key, err)
|
||||
log.WithContext(ctx).Warnf("failed marking peer as connected %s %v", peerPubKey, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -1927,7 +1989,7 @@ func (am *DefaultAccountManager) SyncPeerMeta(ctx context.Context, peerPubKey st
|
||||
return err
|
||||
}
|
||||
|
||||
unlock := am.Store.AcquireAccountReadLock(ctx, accountID)
|
||||
unlock := am.Store.AcquireReadLockByUID(ctx, accountID)
|
||||
defer unlock()
|
||||
|
||||
account, err := am.Store.GetAccount(ctx, accountID)
|
||||
|
||||
Reference in New Issue
Block a user