mirror of
https://github.com/netbirdio/netbird.git
synced 2026-05-02 15:16:38 +00:00
pre-processing
This commit is contained in:
@@ -33,6 +33,10 @@ import (
|
||||
"github.com/netbirdio/netbird/management/server/status"
|
||||
)
|
||||
|
||||
// Declare sqlStore and ok at the top so they are in scope for all usages
|
||||
var sqlStore *store.SqlStore
|
||||
var ok bool
|
||||
|
||||
// GetPeers returns a list of peers under the given account filtering out peers that do not belong to a user if
|
||||
// the current user is not an admin.
|
||||
func (am *DefaultAccountManager) GetPeers(ctx context.Context, accountID, userID, nameFilter, ipFilter string) ([]*nbpeer.Peer, error) {
|
||||
@@ -407,6 +411,24 @@ func (am *DefaultAccountManager) GetNetworkMap(ctx context.Context, peerID strin
|
||||
return nil, status.Errorf(status.NotFound, "peer with ID %s not found", peerID)
|
||||
}
|
||||
|
||||
// Try to serve precomputed network map from DB if up-to-date
|
||||
sqlStore, ok = am.Store.(*store.SqlStore)
|
||||
if ok {
|
||||
db := sqlStore.GetDB()
|
||||
var record *types.NetworkMapRecord
|
||||
var err error
|
||||
record, err = types.GetNetworkMapRecord(db, peer.ID)
|
||||
if err == nil && record.Serial == account.Network.CurrentSerial() {
|
||||
var nm *types.NetworkMap
|
||||
nm, err = types.DeserializeNetworkMap(record.MapJSON)
|
||||
if err == nil {
|
||||
log.WithContext(ctx).Debugf("serving precomputed network map for peer %s from DB", peer.ID)
|
||||
return nm, nil
|
||||
}
|
||||
log.WithContext(ctx).Warnf("failed to deserialize precomputed network map for peer %s: %v", peer.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
groups := make(map[string][]string)
|
||||
for groupID, group := range account.Groups {
|
||||
groups[groupID] = group.Peers
|
||||
@@ -424,13 +446,34 @@ func (am *DefaultAccountManager) GetNetworkMap(ctx context.Context, peerID strin
|
||||
return nil, err
|
||||
}
|
||||
|
||||
networkMap := account.GetPeerNetworkMap(ctx, peer.ID, customZone, validatedPeers, account.GetResourcePoliciesMap(), account.GetResourceRoutersMap(), nil)
|
||||
|
||||
proxyNetworkMap, ok := proxyNetworkMaps[peer.ID]
|
||||
var proxyNetworkMap *types.NetworkMap
|
||||
networkMap := account.GetPeerNetworkMap(ctx, peerID, customZone, validatedPeers, account.GetResourcePoliciesMap(), account.GetResourceRoutersMap(), nil)
|
||||
proxyNetworkMap, ok = proxyNetworkMaps[peerID]
|
||||
if ok {
|
||||
networkMap.Merge(proxyNetworkMap)
|
||||
}
|
||||
|
||||
// After generating the network map, store it as a precomputed blob in the DB
|
||||
sqlStore, ok = am.Store.(*store.SqlStore)
|
||||
if ok {
|
||||
db := sqlStore.GetDB()
|
||||
data, err := types.SerializeNetworkMap(networkMap)
|
||||
if err == nil {
|
||||
record := &types.NetworkMapRecord{
|
||||
PeerID: peer.ID,
|
||||
AccountID: account.Id,
|
||||
MapJSON: data,
|
||||
Serial: networkMap.Network.CurrentSerial(),
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
err = types.SaveNetworkMapRecord(db, record)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Warnf("failed to store precomputed network map for peer %s: %v", peer.ID, err)
|
||||
}
|
||||
} else {
|
||||
log.WithContext(ctx).Warnf("failed to serialize network map for peer %s: %v", peer.ID, err)
|
||||
}
|
||||
}
|
||||
return networkMap, nil
|
||||
}
|
||||
|
||||
@@ -1053,13 +1096,47 @@ func (am *DefaultAccountManager) getValidatedPeerWithMap(ctx context.Context, is
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
networkMap := account.GetPeerNetworkMap(ctx, peer.ID, customZone, approvedPeersMap, account.GetResourcePoliciesMap(), account.GetResourceRoutersMap(), am.metrics.AccountManagerMetrics())
|
||||
|
||||
proxyNetworkMap, ok := proxyNetworkMaps[peer.ID]
|
||||
var proxyNetworkMap *types.NetworkMap
|
||||
networkMap := account.GetPeerNetworkMap(ctx, peer.ID, customZone, approvedPeersMap, account.GetResourcePoliciesMap(), account.GetResourceRoutersMap(), nil)
|
||||
proxyNetworkMap, ok = proxyNetworkMaps[peer.ID]
|
||||
if ok {
|
||||
networkMap.Merge(proxyNetworkMap)
|
||||
}
|
||||
|
||||
// After generating the network map, store it as a precomputed blob in the DB
|
||||
sqlStore, ok = am.Store.(*store.SqlStore)
|
||||
if ok {
|
||||
db := sqlStore.GetDB()
|
||||
data, err := types.SerializeNetworkMap(networkMap)
|
||||
if err == nil {
|
||||
record := &types.NetworkMapRecord{
|
||||
PeerID: peer.ID,
|
||||
AccountID: account.Id,
|
||||
MapJSON: data,
|
||||
Serial: networkMap.Network.CurrentSerial(),
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
err = types.SaveNetworkMapRecord(db, record)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Warnf("failed to store precomputed network map for peer %s: %v", peer.ID, err)
|
||||
}
|
||||
} else {
|
||||
log.WithContext(ctx).Warnf("failed to serialize network map for peer %s: %v", peer.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to get flow enabled status: %v", err)
|
||||
return nil, nil, nil, err
|
||||
}
|
||||
|
||||
start = time.Now()
|
||||
update := toSyncResponse(ctx, nil, peer, nil, nil, networkMap, am.GetDNSDomain(account.Settings), postureChecks, nil, account.Settings, extraSetting)
|
||||
am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start))
|
||||
|
||||
am.peersUpdateManager.SendUpdate(ctx, peer.ID, &UpdateMessage{Update: update, NetworkMap: networkMap})
|
||||
|
||||
return peer, networkMap, postureChecks, nil
|
||||
}
|
||||
|
||||
@@ -1239,12 +1316,35 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
||||
am.metrics.UpdateChannelMetrics().CountCalcPeerNetworkMapDuration(time.Since(start))
|
||||
start = time.Now()
|
||||
|
||||
proxyNetworkMap, ok := proxyNetworkMaps[p.ID]
|
||||
var proxyNetworkMap *types.NetworkMap
|
||||
proxyNetworkMap, ok = proxyNetworkMaps[p.ID]
|
||||
if ok {
|
||||
remotePeerNetworkMap.Merge(proxyNetworkMap)
|
||||
}
|
||||
am.metrics.UpdateChannelMetrics().CountMergeNetworkMapDuration(time.Since(start))
|
||||
|
||||
// Store the precomputed network map in the DB
|
||||
sqlStore, ok = am.Store.(*store.SqlStore)
|
||||
if ok {
|
||||
db := sqlStore.GetDB()
|
||||
data, err := types.SerializeNetworkMap(remotePeerNetworkMap)
|
||||
if err == nil {
|
||||
record := &types.NetworkMapRecord{
|
||||
PeerID: p.ID,
|
||||
AccountID: account.Id,
|
||||
MapJSON: data,
|
||||
Serial: remotePeerNetworkMap.Network.CurrentSerial(),
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
err = types.SaveNetworkMapRecord(db, record)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Warnf("failed to store precomputed network map for peer %s: %v", p.ID, err)
|
||||
}
|
||||
} else {
|
||||
log.WithContext(ctx).Warnf("failed to serialize network map for peer %s: %v", p.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to get flow enabled status: %v", err)
|
||||
@@ -1259,8 +1359,6 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
||||
}(peer)
|
||||
}
|
||||
|
||||
//
|
||||
|
||||
wg.Wait()
|
||||
if am.metrics != nil {
|
||||
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart))
|
||||
@@ -1326,21 +1424,43 @@ func (am *DefaultAccountManager) UpdateAccountPeer(ctx context.Context, accountI
|
||||
return
|
||||
}
|
||||
|
||||
remotePeerNetworkMap := account.GetPeerNetworkMap(ctx, peerId, customZone, approvedPeersMap, resourcePolicies, routers, am.metrics.AccountManagerMetrics())
|
||||
|
||||
proxyNetworkMap, ok := proxyNetworkMaps[peer.ID]
|
||||
var proxyNetworkMap *types.NetworkMap
|
||||
proxyNetworkMap, ok = proxyNetworkMaps[peer.ID]
|
||||
if ok {
|
||||
remotePeerNetworkMap := account.GetPeerNetworkMap(ctx, peerId, customZone, approvedPeersMap, resourcePolicies, routers, am.metrics.AccountManagerMetrics())
|
||||
remotePeerNetworkMap.Merge(proxyNetworkMap)
|
||||
}
|
||||
|
||||
extraSettings, err := am.settingsManager.GetExtraSettings(ctx, peer.AccountID)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to get extra settings: %v", err)
|
||||
return
|
||||
}
|
||||
// Store the precomputed network map in the DB
|
||||
sqlStore, ok = am.Store.(*store.SqlStore)
|
||||
if ok {
|
||||
db := sqlStore.GetDB()
|
||||
data, err := types.SerializeNetworkMap(remotePeerNetworkMap)
|
||||
if err == nil {
|
||||
record := &types.NetworkMapRecord{
|
||||
PeerID: peer.ID,
|
||||
AccountID: account.Id,
|
||||
MapJSON: data,
|
||||
Serial: remotePeerNetworkMap.Network.CurrentSerial(),
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
err = types.SaveNetworkMapRecord(db, record)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Warnf("failed to store precomputed network map for peer %s: %v", peer.ID, err)
|
||||
}
|
||||
} else {
|
||||
log.WithContext(ctx).Warnf("failed to serialize network map for peer %s: %v", peer.ID, err)
|
||||
}
|
||||
}
|
||||
|
||||
update := toSyncResponse(ctx, nil, peer, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSettings)
|
||||
am.peersUpdateManager.SendUpdate(ctx, peer.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap})
|
||||
extraSettings, err := am.settingsManager.GetExtraSettings(ctx, peer.AccountID)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to get extra settings: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
update := toSyncResponse(ctx, nil, peer, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSettings)
|
||||
am.peersUpdateManager.SendUpdate(ctx, peer.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap})
|
||||
}
|
||||
}
|
||||
|
||||
// getNextPeerExpiration returns the minimum duration in which the next peer of the account will expire if it was found.
|
||||
|
||||
@@ -100,6 +100,7 @@ func NewSqlStore(ctx context.Context, db *gorm.DB, storeEngine types.Engine, met
|
||||
&types.Account{}, &types.Policy{}, &types.PolicyRule{}, &route.Route{}, &nbdns.NameServerGroup{},
|
||||
&installation{}, &types.ExtraSettings{}, &posture.Checks{}, &nbpeer.NetworkAddress{},
|
||||
&networkTypes.Network{}, &routerTypes.NetworkRouter{}, &resourceTypes.NetworkResource{}, &types.AccountOnboarding{},
|
||||
&types.NetworkMapRecord{}, // <-- Added for precomputed network maps
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("auto migratePreAuto: %w", err)
|
||||
|
||||
17
management/server/types/network_map_helpers.go
Normal file
17
management/server/types/network_map_helpers.go
Normal file
@@ -0,0 +1,17 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
)
|
||||
|
||||
// SerializeNetworkMap serializes a NetworkMap to JSON
|
||||
func SerializeNetworkMap(nm *NetworkMap) ([]byte, error) {
|
||||
return json.Marshal(nm)
|
||||
}
|
||||
|
||||
// DeserializeNetworkMap deserializes JSON data into a NetworkMap
|
||||
func DeserializeNetworkMap(data []byte) (*NetworkMap, error) {
|
||||
var nm NetworkMap
|
||||
err := json.Unmarshal(data, &nm)
|
||||
return &nm, err
|
||||
}
|
||||
39
management/server/types/network_map_record.go
Normal file
39
management/server/types/network_map_record.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"gorm.io/datatypes"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
// NetworkMapRecord stores a precomputed network map for a peer
|
||||
// MapJSON is stored as jsonb (Postgres), json (MySQL), or text (SQLite)
|
||||
type NetworkMapRecord struct {
|
||||
PeerID string `gorm:"primaryKey"`
|
||||
AccountID string `gorm:"index"`
|
||||
MapJSON datatypes.JSON `gorm:"type:jsonb"` // GORM will use the right type for your DB
|
||||
Serial uint64
|
||||
UpdatedAt time.Time
|
||||
}
|
||||
|
||||
// TableName sets the table name for GORM
|
||||
// This ensures the table is named consistently across all supported databases.
|
||||
func (NetworkMapRecord) TableName() string {
|
||||
return "network_map_records"
|
||||
}
|
||||
|
||||
// SaveNetworkMapRecord stores or updates a NetworkMapRecord in the database
|
||||
func SaveNetworkMapRecord(db *gorm.DB, record *NetworkMapRecord) error {
|
||||
return db.Save(record).Error
|
||||
}
|
||||
|
||||
// GetNetworkMapRecord retrieves a NetworkMapRecord by peer ID
|
||||
func GetNetworkMapRecord(db *gorm.DB, peerID string) (*NetworkMapRecord, error) {
|
||||
var record NetworkMapRecord
|
||||
err := db.First(&record, "peer_id = ?", peerID).Error
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &record, nil
|
||||
}
|
||||
102
management/server/types/network_map_record_test.go
Normal file
102
management/server/types/network_map_record_test.go
Normal file
@@ -0,0 +1,102 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"gorm.io/driver/sqlite"
|
||||
"gorm.io/gorm"
|
||||
)
|
||||
|
||||
func TestNetworkMapRecordCRUD(t *testing.T) {
|
||||
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, db.AutoMigrate(&NetworkMapRecord{}))
|
||||
|
||||
record := &NetworkMapRecord{
|
||||
PeerID: "peer1",
|
||||
AccountID: "account1",
|
||||
MapJSON: []byte(`{"Peers":[],"Network":null}`),
|
||||
Serial: 1,
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
require.NoError(t, SaveNetworkMapRecord(db, record))
|
||||
|
||||
fetched, err := GetNetworkMapRecord(db, "peer1")
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, record.PeerID, fetched.PeerID)
|
||||
require.Equal(t, record.AccountID, fetched.AccountID)
|
||||
require.Equal(t, record.Serial, fetched.Serial)
|
||||
require.Equal(t, record.MapJSON, fetched.MapJSON)
|
||||
}
|
||||
|
||||
// Simulate a normalized structure for comparison
|
||||
// In a real scenario, this would be split across multiple tables
|
||||
// Here, we just use a struct for benchmarking
|
||||
|
||||
type NormalizedPeer struct {
|
||||
ID string
|
||||
AccountID string
|
||||
Name string
|
||||
IP string
|
||||
}
|
||||
|
||||
type NormalizedNetworkMap struct {
|
||||
PeerID string
|
||||
Peers []NormalizedPeer
|
||||
Serial uint64
|
||||
UpdatedAt time.Time
|
||||
}
|
||||
|
||||
func BenchmarkNetworkMapRecord_StoreAndRetrieve_JSON(b *testing.B) {
|
||||
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
db.AutoMigrate(&NetworkMapRecord{})
|
||||
|
||||
record := &NetworkMapRecord{
|
||||
PeerID: "peer1",
|
||||
AccountID: "account1",
|
||||
MapJSON: []byte(`{"Peers":[{"ID":"p1","AccountID":"account1","Name":"peer1","IP":"10.0.0.1"}],"Network":null}`),
|
||||
Serial: 1,
|
||||
UpdatedAt: time.Now(),
|
||||
}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
record.Serial = uint64(i)
|
||||
record.UpdatedAt = time.Now()
|
||||
if err := SaveNetworkMapRecord(db, record); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
_, err := GetNetworkMapRecord(db, "peer1")
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkNetworkMapRecord_StoreAndRetrieve_Normalized(b *testing.B) {
|
||||
db, err := gorm.Open(sqlite.Open(":memory:"), &gorm.Config{})
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
db.AutoMigrate(&NormalizedPeer{})
|
||||
|
||||
peers := []NormalizedPeer{{ID: "p1", AccountID: "account1", Name: "peer1", IP: "10.0.0.1"}}
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, peer := range peers {
|
||||
if err := db.Save(&peer).Error; err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
var fetched []NormalizedPeer
|
||||
if err := db.Find(&fetched, "account_id = ?", "account1").Error; err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user