Compare commits

...

1 Commits

Author SHA1 Message Date
Hakan Sariman
2f40ccc713 Add NetworkMapSerial to SyncAndMarkPeer for optimized syncing 2025-10-06 16:16:10 +07:00
7 changed files with 61 additions and 28 deletions

View File

@@ -1634,13 +1634,13 @@ func (am *DefaultAccountManager) AllowSync(wgPubKey string, metahash uint64) boo
return am.loginFilter.allowLogin(wgPubKey, metahash) return am.loginFilter.allowLogin(wgPubKey, metahash)
} }
func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) { func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP, networkMapSerial uint64) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) {
start := time.Now() start := time.Now()
defer func() { defer func() {
log.WithContext(ctx).Debugf("SyncAndMarkPeer: took %v", time.Since(start)) log.WithContext(ctx).Debugf("SyncAndMarkPeer: took %v", time.Since(start))
}() }()
peer, netMap, postureChecks, err := am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta}, accountID) peer, netMap, postureChecks, err := am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta, NetworkMapSerial: networkMapSerial}, accountID)
if err != nil { if err != nil {
return nil, nil, nil, fmt.Errorf("error syncing peer: %w", err) return nil, nil, nil, fmt.Errorf("error syncing peer: %w", err)
} }

View File

@@ -109,7 +109,7 @@ type Manager interface {
UpdateIntegratedValidator(ctx context.Context, accountID, userID, validator string, groups []string) error UpdateIntegratedValidator(ctx context.Context, accountID, userID, validator string, groups []string) error
GroupValidation(ctx context.Context, accountId string, groups []string) (bool, error) GroupValidation(ctx context.Context, accountId string, groups []string) (bool, error)
GetValidatedPeers(ctx context.Context, accountID string) (map[string]struct{}, error) GetValidatedPeers(ctx context.Context, accountID string) (map[string]struct{}, error)
SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP, networkMapSerial uint64) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error)
OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string) error OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string) error
SyncPeerMeta(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta) error SyncPeerMeta(ctx context.Context, peerPubKey string, meta nbpeer.PeerSystemMeta) error
FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error) FindExistingPostureCheck(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error)

View File

@@ -3040,7 +3040,7 @@ func BenchmarkSyncAndMarkPeer(b *testing.B) {
b.ResetTimer() b.ResetTimer()
start := time.Now() start := time.Now()
for i := 0; i < b.N; i++ { for i := 0; i < b.N; i++ {
_, _, _, err := manager.SyncAndMarkPeer(context.Background(), account.Id, account.Peers["peer-1"].Key, nbpeer.PeerSystemMeta{Hostname: strconv.Itoa(i)}, net.IP{1, 1, 1, 1}) _, _, _, err := manager.SyncAndMarkPeer(context.Background(), account.Id, account.Peers["peer-1"].Key, nbpeer.PeerSystemMeta{Hostname: strconv.Itoa(i)}, net.IP{1, 1, 1, 1}, 0)
assert.NoError(b, err) assert.NoError(b, err)
} }

View File

@@ -209,7 +209,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
log.WithContext(ctx).Tracef("peer system meta has to be provided on sync. Peer %s, remote addr %s", peerKey.String(), realIP) log.WithContext(ctx).Tracef("peer system meta has to be provided on sync. Peer %s, remote addr %s", peerKey.String(), realIP)
} }
peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), peerMeta, realIP) peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), peerMeta, realIP, syncReq.GetNetworkMapSerial())
if err != nil { if err != nil {
log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err) log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err)
return mapError(ctx, err) return mapError(ctx, err)
@@ -716,39 +716,59 @@ func toPeerConfig(peer *nbpeer.Peer, network *types.Network, dnsName string, set
func toSyncResponse(ctx context.Context, config *nbconfig.Config, peer *nbpeer.Peer, turnCredentials *Token, relayCredentials *Token, networkMap *types.NetworkMap, dnsName string, checks []*posture.Checks, dnsCache *DNSConfigCache, settings *types.Settings, extraSettings *types.ExtraSettings, peerGroups []string) *proto.SyncResponse { func toSyncResponse(ctx context.Context, config *nbconfig.Config, peer *nbpeer.Peer, turnCredentials *Token, relayCredentials *Token, networkMap *types.NetworkMap, dnsName string, checks []*posture.Checks, dnsCache *DNSConfigCache, settings *types.Settings, extraSettings *types.ExtraSettings, peerGroups []string) *proto.SyncResponse {
response := &proto.SyncResponse{ response := &proto.SyncResponse{
PeerConfig: toPeerConfig(peer, networkMap.Network, dnsName, settings), Checks: toProtocolChecks(ctx, checks),
NetworkMap: &proto.NetworkMap{ }
// If networkMap is nil, indicate skip and omit NetworkMap
if networkMap == nil {
response.SkipNetworkMapUpdate = true
} else {
response.PeerConfig = toPeerConfig(peer, networkMap.Network, dnsName, settings)
response.NetworkMap = &proto.NetworkMap{
Serial: networkMap.Network.CurrentSerial(), Serial: networkMap.Network.CurrentSerial(),
Routes: toProtocolRoutes(networkMap.Routes), Routes: toProtocolRoutes(networkMap.Routes),
DNSConfig: toProtocolDNSConfig(networkMap.DNSConfig, dnsCache), DNSConfig: toProtocolDNSConfig(networkMap.DNSConfig, dnsCache),
}, }
Checks: toProtocolChecks(ctx, checks),
} }
nbConfig := toNetbirdConfig(config, turnCredentials, relayCredentials, extraSettings) nbConfig := toNetbirdConfig(config, turnCredentials, relayCredentials, extraSettings)
extendedConfig := integrationsConfig.ExtendNetBirdConfig(peer.ID, peerGroups, nbConfig, extraSettings) extendedConfig := integrationsConfig.ExtendNetBirdConfig(peer.ID, peerGroups, nbConfig, extraSettings)
response.NetbirdConfig = extendedConfig response.NetbirdConfig = extendedConfig
response.NetworkMap.PeerConfig = response.PeerConfig if response.NetworkMap != nil {
response.NetworkMap.PeerConfig = response.PeerConfig
}
allPeers := make([]*proto.RemotePeerConfig, 0, len(networkMap.Peers)+len(networkMap.OfflinePeers)) if networkMap != nil {
allPeers = appendRemotePeerConfig(allPeers, networkMap.Peers, dnsName) allPeers := make([]*proto.RemotePeerConfig, 0, len(networkMap.Peers)+len(networkMap.OfflinePeers))
response.RemotePeers = allPeers allPeers = appendRemotePeerConfig(allPeers, networkMap.Peers, dnsName)
response.NetworkMap.RemotePeers = allPeers response.RemotePeers = allPeers
response.RemotePeersIsEmpty = len(allPeers) == 0 if response.NetworkMap != nil {
response.NetworkMap.RemotePeersIsEmpty = response.RemotePeersIsEmpty response.NetworkMap.RemotePeers = allPeers
}
response.RemotePeersIsEmpty = len(allPeers) == 0
if response.NetworkMap != nil {
response.NetworkMap.RemotePeersIsEmpty = response.RemotePeersIsEmpty
}
}
response.NetworkMap.OfflinePeers = appendRemotePeerConfig(nil, networkMap.OfflinePeers, dnsName) if networkMap != nil && response.NetworkMap != nil {
response.NetworkMap.OfflinePeers = appendRemotePeerConfig(nil, networkMap.OfflinePeers, dnsName)
}
firewallRules := toProtocolFirewallRules(networkMap.FirewallRules) if networkMap != nil && response.NetworkMap != nil {
response.NetworkMap.FirewallRules = firewallRules firewallRules := toProtocolFirewallRules(networkMap.FirewallRules)
response.NetworkMap.FirewallRulesIsEmpty = len(firewallRules) == 0 response.NetworkMap.FirewallRules = firewallRules
response.NetworkMap.FirewallRulesIsEmpty = len(firewallRules) == 0
}
routesFirewallRules := toProtocolRoutesFirewallRules(networkMap.RoutesFirewallRules) if networkMap != nil && response.NetworkMap != nil {
response.NetworkMap.RoutesFirewallRules = routesFirewallRules routesFirewallRules := toProtocolRoutesFirewallRules(networkMap.RoutesFirewallRules)
response.NetworkMap.RoutesFirewallRulesIsEmpty = len(routesFirewallRules) == 0 response.NetworkMap.RoutesFirewallRules = routesFirewallRules
response.NetworkMap.RoutesFirewallRulesIsEmpty = len(routesFirewallRules) == 0
}
if networkMap.ForwardingRules != nil { if networkMap != nil && response.NetworkMap != nil && networkMap.ForwardingRules != nil {
forwardingRules := make([]*proto.ForwardingRule, 0, len(networkMap.ForwardingRules)) forwardingRules := make([]*proto.ForwardingRule, 0, len(networkMap.ForwardingRules))
for _, rule := range networkMap.ForwardingRules { for _, rule := range networkMap.ForwardingRules {
forwardingRules = append(forwardingRules, rule.ToProto()) forwardingRules = append(forwardingRules, rule.ToProto())

View File

@@ -37,7 +37,7 @@ type MockAccountManager struct {
ListUsersFunc func(ctx context.Context, accountID string) ([]*types.User, error) ListUsersFunc func(ctx context.Context, accountID string) ([]*types.User, error)
GetPeersFunc func(ctx context.Context, accountID, userID, nameFilter, ipFilter string) ([]*nbpeer.Peer, error) GetPeersFunc func(ctx context.Context, accountID, userID, nameFilter, ipFilter string) ([]*nbpeer.Peer, error)
MarkPeerConnectedFunc func(ctx context.Context, peerKey string, connected bool, realIP net.IP) error MarkPeerConnectedFunc func(ctx context.Context, peerKey string, connected bool, realIP net.IP) error
SyncAndMarkPeerFunc func(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) SyncAndMarkPeerFunc func(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP, networkMapSerial uint64) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error)
DeletePeerFunc func(ctx context.Context, accountID, peerKey, userID string) error DeletePeerFunc func(ctx context.Context, accountID, peerKey, userID string) error
GetNetworkMapFunc func(ctx context.Context, peerKey string) (*types.NetworkMap, error) GetNetworkMapFunc func(ctx context.Context, peerKey string) (*types.NetworkMap, error)
GetPeerNetworkFunc func(ctx context.Context, peerKey string) (*types.Network, error) GetPeerNetworkFunc func(ctx context.Context, peerKey string) (*types.Network, error)
@@ -176,11 +176,11 @@ func (am *MockAccountManager) DeleteSetupKey(ctx context.Context, accountID, use
return status.Errorf(codes.Unimplemented, "method DeleteSetupKey is not implemented") return status.Errorf(codes.Unimplemented, "method DeleteSetupKey is not implemented")
} }
func (am *MockAccountManager) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) { func (am *MockAccountManager) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP, networkMapSerial uint64) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) {
if am.SyncAndMarkPeerFunc != nil { if am.SyncAndMarkPeerFunc != nil {
return am.SyncAndMarkPeerFunc(ctx, accountID, peerPubKey, meta, realIP) return am.SyncAndMarkPeerFunc(ctx, accountID, peerPubKey, meta, realIP, networkMapSerial)
} }
return nil, nil, nil, status.Errorf(codes.Unimplemented, "method MarkPeerConnected is not implemented") return nil, nil, nil, status.Errorf(codes.Unimplemented, "method SyncAndMarkPeer is not implemented")
} }
func (am *MockAccountManager) OnPeerDisconnected(_ context.Context, accountID string, peerPubKey string) error { func (am *MockAccountManager) OnPeerDisconnected(_ context.Context, accountID string, peerPubKey string) error {

View File

@@ -775,6 +775,17 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
am.BufferUpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
// Optimization: if client's network map serial matches current, skip full map calculation
if !peerNotValid && !isStatusChanged && !sync.UpdateAccountPeers && !(updated && len(postureChecks) > 0) && sync.NetworkMapSerial > 0 {
network, err := am.Store.GetAccountNetwork(ctx, store.LockingStrengthNone, accountID)
if err == nil {
currentSerial := network.CurrentSerial()
if currentSerial == sync.NetworkMapSerial {
return peer, nil, postureChecks, nil
}
}
}
return am.getValidatedPeerWithMap(ctx, peerNotValid, accountID, peer) return am.getValidatedPeerWithMap(ctx, peerNotValid, accountID, peer)
} }

View File

@@ -12,6 +12,8 @@ type PeerSync struct {
WireGuardPubKey string WireGuardPubKey string
// Meta is the system information passed by peer, must be always present // Meta is the system information passed by peer, must be always present
Meta nbpeer.PeerSystemMeta Meta nbpeer.PeerSystemMeta
// NetworkMapSerial is the last known network map serial from the client
NetworkMapSerial uint64
// UpdateAccountPeers indicate updating account peers, // UpdateAccountPeers indicate updating account peers,
// which occurs when the peer's metadata is updated // which occurs when the peer's metadata is updated
UpdateAccountPeers bool UpdateAccountPeers bool