From 5142dc52c11ad9841fe8ae229ed585d27f976503 Mon Sep 17 00:00:00 2001 From: Viktor Liu <17948409+lixmal@users.noreply.github.com> Date: Mon, 2 Dec 2024 17:55:02 +0100 Subject: [PATCH] [client] Persist route selection (#2810) --- client/firewall/iptables/rulestore_linux.go | 10 ++ client/internal/engine.go | 13 +- client/internal/engine_test.go | 5 +- client/internal/routemanager/manager.go | 38 ++++- client/internal/routemanager/manager_test.go | 4 +- client/internal/routemanager/mock.go | 2 +- .../routemanager/refcounter/refcounter.go | 13 ++ client/internal/routemanager/state.go | 19 +++ .../internal/routeselector/routeselector.go | 67 +++++++++ client/internal/statemanager/manager.go | 142 ++++++++++++++---- 10 files changed, 273 insertions(+), 40 deletions(-) create mode 100644 client/internal/routemanager/state.go diff --git a/client/firewall/iptables/rulestore_linux.go b/client/firewall/iptables/rulestore_linux.go index bfd08bee2..004c512a4 100644 --- a/client/firewall/iptables/rulestore_linux.go +++ b/client/firewall/iptables/rulestore_linux.go @@ -37,6 +37,11 @@ func (s *ipList) UnmarshalJSON(data []byte) error { return err } s.ips = temp.IPs + + if s.ips == nil { + s.ips = make(map[string]struct{}) + } + return nil } @@ -89,5 +94,10 @@ func (s *ipsetStore) UnmarshalJSON(data []byte) error { return err } s.ipsets = temp.IPSets + + if s.ipsets == nil { + s.ipsets = make(map[string]*ipList) + } + return nil } diff --git a/client/internal/engine.go b/client/internal/engine.go index dc4499e17..920c295cd 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -349,8 +349,17 @@ func (e *Engine) Start() error { } e.dnsServer = dnsServer - e.routeManager = routemanager.NewManager(e.ctx, e.config.WgPrivateKey.PublicKey().String(), e.config.DNSRouteInterval, e.wgInterface, e.statusRecorder, e.relayManager, initialRoutes) - beforePeerHook, afterPeerHook, err := e.routeManager.Init(e.stateManager) + e.routeManager = 
routemanager.NewManager( + e.ctx, + e.config.WgPrivateKey.PublicKey().String(), + e.config.DNSRouteInterval, + e.wgInterface, + e.statusRecorder, + e.relayManager, + initialRoutes, + e.stateManager, + ) + beforePeerHook, afterPeerHook, err := e.routeManager.Init() if err != nil { log.Errorf("Failed to initialize route manager: %s", err) } else { diff --git a/client/internal/engine_test.go b/client/internal/engine_test.go index b6c6186ea..b58c1f7e9 100644 --- a/client/internal/engine_test.go +++ b/client/internal/engine_test.go @@ -245,12 +245,15 @@ func TestEngine_UpdateNetworkMap(t *testing.T) { nil) wgIface := &iface.MockWGIface{ + NameFunc: func() string { return "utun102" }, RemovePeerFunc: func(peerKey string) error { return nil }, } engine.wgInterface = wgIface - engine.routeManager = routemanager.NewManager(ctx, key.PublicKey().String(), time.Minute, engine.wgInterface, engine.statusRecorder, relayMgr, nil) + engine.routeManager = routemanager.NewManager(ctx, key.PublicKey().String(), time.Minute, engine.wgInterface, engine.statusRecorder, relayMgr, nil, nil) + _, _, err = engine.routeManager.Init() + require.NoError(t, err) engine.dnsServer = &dns.MockServer{ UpdateDNSServerFunc: func(serial uint64, update nbdns.Config) error { return nil }, } diff --git a/client/internal/routemanager/manager.go b/client/internal/routemanager/manager.go index 0a1c7dc56..f1c4ae5ef 100644 --- a/client/internal/routemanager/manager.go +++ b/client/internal/routemanager/manager.go @@ -32,7 +32,7 @@ import ( // Manager is a route manager interface type Manager interface { - Init(*statemanager.Manager) (nbnet.AddHookFunc, nbnet.RemoveHookFunc, error) + Init() (nbnet.AddHookFunc, nbnet.RemoveHookFunc, error) UpdateRoutes(updateSerial uint64, newRoutes []*route.Route) (map[route.ID]*route.Route, route.HAMap, error) TriggerSelection(route.HAMap) GetRouteSelector() *routeselector.RouteSelector @@ -59,6 +59,7 @@ type DefaultManager struct { routeRefCounter *refcounter.RouteRefCounter 
allowedIPsRefCounter *refcounter.AllowedIPsRefCounter dnsRouteInterval time.Duration + stateManager *statemanager.Manager } func NewManager( @@ -69,6 +70,7 @@ func NewManager( statusRecorder *peer.Status, relayMgr *relayClient.Manager, initialRoutes []*route.Route, + stateManager *statemanager.Manager, ) *DefaultManager { mCTX, cancel := context.WithCancel(ctx) notifier := notifier.NewNotifier() @@ -80,12 +82,12 @@ func NewManager( dnsRouteInterval: dnsRouteInterval, clientNetworks: make(map[route.HAUniqueID]*clientNetwork), relayMgr: relayMgr, - routeSelector: routeselector.NewRouteSelector(), sysOps: sysOps, statusRecorder: statusRecorder, wgInterface: wgInterface, pubKey: pubKey, notifier: notifier, + stateManager: stateManager, } dm.routeRefCounter = refcounter.New( @@ -121,7 +123,10 @@ func NewManager( } // Init sets up the routing -func (m *DefaultManager) Init(stateManager *statemanager.Manager) (nbnet.AddHookFunc, nbnet.RemoveHookFunc, error) { +func (m *DefaultManager) Init() (nbnet.AddHookFunc, nbnet.RemoveHookFunc, error) { + // Set up the selector first so it is never nil, even if routing setup is skipped or fails + m.routeSelector = m.initSelector() + if nbnet.CustomRoutingDisabled() { return nil, nil, nil } @@ -137,14 +142,35 @@ func (m *DefaultManager) Init(stateManager *statemanager.Manager) (nbnet.AddHook ips := resolveURLsToIPs(initialAddresses) - beforePeerHook, afterPeerHook, err := m.sysOps.SetupRouting(ips, stateManager) + beforePeerHook, afterPeerHook, err := m.sysOps.SetupRouting(ips, m.stateManager) if err != nil { return nil, nil, fmt.Errorf("setup routing: %w", err) } log.Info("Routing setup complete") return beforePeerHook, afterPeerHook, nil } +func (m *DefaultManager) initSelector() *routeselector.RouteSelector { + var state *SelectorState + m.stateManager.RegisterState(state) + + // restore selector state if it exists + if err := m.stateManager.LoadState(state); err != nil { + log.Warnf("failed to load state: %v", err) + return routeselector.NewRouteSelector() + } + + if state := m.stateManager.GetState(state); state != nil 
{ + if selector, ok := state.(*SelectorState); ok { + return (*routeselector.RouteSelector)(selector) + } + + log.Warnf("failed to convert state with type %T to SelectorState", state) + } + + return routeselector.NewRouteSelector() +} + func (m *DefaultManager) EnableServerRouter(firewall firewall.Manager) error { var err error m.serverRouter, err = newServerRouter(m.ctx, m.wgInterface, firewall, m.statusRecorder) @@ -252,6 +278,10 @@ func (m *DefaultManager) TriggerSelection(networks route.HAMap) { go clientNetworkWatcher.peersStateAndUpdateWatcher() clientNetworkWatcher.sendUpdateToClientNetworkWatcher(routesUpdate{routes: routes}) } + + if err := m.stateManager.UpdateState((*SelectorState)(m.routeSelector)); err != nil { + log.Errorf("failed to update state: %v", err) + } } // stopObsoleteClients stops the client network watcher for the networks that are not in the new list diff --git a/client/internal/routemanager/manager_test.go b/client/internal/routemanager/manager_test.go index e669bc44a..07dac21b8 100644 --- a/client/internal/routemanager/manager_test.go +++ b/client/internal/routemanager/manager_test.go @@ -424,9 +424,9 @@ func TestManagerUpdateRoutes(t *testing.T) { statusRecorder := peer.NewRecorder("https://mgm") ctx := context.TODO() - routeManager := NewManager(ctx, localPeerKey, 0, wgInterface, statusRecorder, nil, nil) + routeManager := NewManager(ctx, localPeerKey, 0, wgInterface, statusRecorder, nil, nil, nil) - _, _, err = routeManager.Init(nil) + _, _, err = routeManager.Init() require.NoError(t, err, "should init route manager") defer routeManager.Stop(nil) diff --git a/client/internal/routemanager/mock.go b/client/internal/routemanager/mock.go index 503185f03..556a62351 100644 --- a/client/internal/routemanager/mock.go +++ b/client/internal/routemanager/mock.go @@ -21,7 +21,7 @@ type MockManager struct { StopFunc func(manager *statemanager.Manager) } -func (m *MockManager) Init(*statemanager.Manager) (net.AddHookFunc, net.RemoveHookFunc, 
error) { +func (m *MockManager) Init() (net.AddHookFunc, net.RemoveHookFunc, error) { return nil, nil, nil } diff --git a/client/internal/routemanager/refcounter/refcounter.go b/client/internal/routemanager/refcounter/refcounter.go index f2f0a169d..27a724f50 100644 --- a/client/internal/routemanager/refcounter/refcounter.go +++ b/client/internal/routemanager/refcounter/refcounter.go @@ -71,11 +71,14 @@ func New[Key comparable, I, O any](add AddFunc[Key, I, O], remove RemoveFunc[Key } // LoadData loads the data from the existing counter +// The passed counter should not be used any longer after calling this function. func (rm *Counter[Key, I, O]) LoadData( existingCounter *Counter[Key, I, O], ) { rm.mu.Lock() defer rm.mu.Unlock() + existingCounter.mu.Lock() + defer existingCounter.mu.Unlock() rm.refCountMap = existingCounter.refCountMap rm.idMap = existingCounter.idMap @@ -231,6 +234,9 @@ func (rm *Counter[Key, I, O]) MarshalJSON() ([]byte, error) { // UnmarshalJSON implements the json.Unmarshaler interface for Counter. 
func (rm *Counter[Key, I, O]) UnmarshalJSON(data []byte) error { + rm.mu.Lock() + defer rm.mu.Unlock() + var temp struct { RefCountMap map[Key]Ref[O] `json:"refCountMap"` IDMap map[string][]Key `json:"idMap"` @@ -241,6 +247,13 @@ func (rm *Counter[Key, I, O]) UnmarshalJSON(data []byte) error { rm.refCountMap = temp.RefCountMap rm.idMap = temp.IDMap + if rm.refCountMap == nil { + rm.refCountMap = map[Key]Ref[O]{} + } + if rm.idMap == nil { + rm.idMap = map[string][]Key{} + } + return nil } diff --git a/client/internal/routemanager/state.go b/client/internal/routemanager/state.go new file mode 100644 index 000000000..a45c32b50 --- /dev/null +++ b/client/internal/routemanager/state.go @@ -0,0 +1,19 @@ +package routemanager + +import ( + "github.com/netbirdio/netbird/client/internal/routeselector" +) + +type SelectorState routeselector.RouteSelector + +func (s *SelectorState) Name() string { + return "routeselector_state" +} + +func (s *SelectorState) MarshalJSON() ([]byte, error) { + return (*routeselector.RouteSelector)(s).MarshalJSON() +} + +func (s *SelectorState) UnmarshalJSON(data []byte) error { + return (*routeselector.RouteSelector)(s).UnmarshalJSON(data) +} diff --git a/client/internal/routeselector/routeselector.go b/client/internal/routeselector/routeselector.go index 00128a27b..2874604fd 100644 --- a/client/internal/routeselector/routeselector.go +++ b/client/internal/routeselector/routeselector.go @@ -1,8 +1,10 @@ package routeselector import ( + "encoding/json" "fmt" "slices" + "sync" "github.com/hashicorp/go-multierror" "golang.org/x/exp/maps" @@ -12,6 +14,7 @@ import ( ) type RouteSelector struct { + mu sync.RWMutex selectedRoutes map[route.NetID]struct{} selectAll bool } @@ -26,6 +29,9 @@ func NewRouteSelector() *RouteSelector { // SelectRoutes updates the selected routes based on the provided route IDs. 
func (rs *RouteSelector) SelectRoutes(routes []route.NetID, appendRoute bool, allRoutes []route.NetID) error { + rs.mu.Lock() + defer rs.mu.Unlock() + if !appendRoute { rs.selectedRoutes = map[route.NetID]struct{}{} } @@ -46,6 +52,9 @@ func (rs *RouteSelector) SelectRoutes(routes []route.NetID, appendRoute bool, al // SelectAllRoutes sets the selector to select all routes. func (rs *RouteSelector) SelectAllRoutes() { + rs.mu.Lock() + defer rs.mu.Unlock() + rs.selectAll = true rs.selectedRoutes = map[route.NetID]struct{}{} } @@ -53,6 +62,9 @@ func (rs *RouteSelector) SelectAllRoutes() { // DeselectRoutes removes specific routes from the selection. // If the selector is in "select all" mode, it will transition to "select specific" mode. func (rs *RouteSelector) DeselectRoutes(routes []route.NetID, allRoutes []route.NetID) error { + rs.mu.Lock() + defer rs.mu.Unlock() + if rs.selectAll { rs.selectAll = false rs.selectedRoutes = map[route.NetID]struct{}{} @@ -76,12 +88,18 @@ func (rs *RouteSelector) DeselectRoutes(routes []route.NetID, allRoutes []route. // DeselectAllRoutes deselects all routes, effectively disabling route selection. func (rs *RouteSelector) DeselectAllRoutes() { + rs.mu.Lock() + defer rs.mu.Unlock() + rs.selectAll = false rs.selectedRoutes = map[route.NetID]struct{}{} } // IsSelected checks if a specific route is selected. func (rs *RouteSelector) IsSelected(routeID route.NetID) bool { + rs.mu.RLock() + defer rs.mu.RUnlock() + if rs.selectAll { return true } @@ -91,6 +109,9 @@ func (rs *RouteSelector) IsSelected(routeID route.NetID) bool { // FilterSelected removes unselected routes from the provided map. 
func (rs *RouteSelector) FilterSelected(routes route.HAMap) route.HAMap { + rs.mu.RLock() + defer rs.mu.RUnlock() + if rs.selectAll { return maps.Clone(routes) } @@ -103,3 +124,49 @@ func (rs *RouteSelector) FilterSelected(routes route.HAMap) route.HAMap { } return filtered } + +// MarshalJSON implements the json.Marshaler interface +func (rs *RouteSelector) MarshalJSON() ([]byte, error) { + rs.mu.RLock() + defer rs.mu.RUnlock() + + return json.Marshal(struct { + SelectedRoutes map[route.NetID]struct{} `json:"selected_routes"` + SelectAll bool `json:"select_all"` + }{ + SelectAll: rs.selectAll, + SelectedRoutes: rs.selectedRoutes, + }) +} + +// UnmarshalJSON implements the json.Unmarshaler interface +// If the JSON is empty or null, it will initialize like a NewRouteSelector. +func (rs *RouteSelector) UnmarshalJSON(data []byte) error { + rs.mu.Lock() + defer rs.mu.Unlock() + + // Check for null or empty JSON + if len(data) == 0 || string(data) == "null" { + rs.selectedRoutes = map[route.NetID]struct{}{} + rs.selectAll = true + return nil + } + + var temp struct { + SelectedRoutes map[route.NetID]struct{} `json:"selected_routes"` + SelectAll bool `json:"select_all"` + } + + if err := json.Unmarshal(data, &temp); err != nil { + return err + } + + rs.selectedRoutes = temp.SelectedRoutes + rs.selectAll = temp.SelectAll + + if rs.selectedRoutes == nil { + rs.selectedRoutes = map[route.NetID]struct{}{} + } + + return nil +} diff --git a/client/internal/statemanager/manager.go b/client/internal/statemanager/manager.go index da6dd022f..aae73b79f 100644 --- a/client/internal/statemanager/manager.go +++ b/client/internal/statemanager/manager.go @@ -22,9 +22,28 @@ import ( // State interface defines the methods that all state types must implement type State interface { Name() string +} + +// CleanableState interface extends State with cleanup capability +type CleanableState interface { + State Cleanup() error } +// RawState wraps raw JSON data for unregistered states +type 
RawState struct { + data json.RawMessage +} + +func (r *RawState) Name() string { + return "" // This is a placeholder implementation +} + +// MarshalJSON implements json.Marshaler to preserve the original JSON +func (r *RawState) MarshalJSON() ([]byte, error) { + return r.data, nil +} + // Manager handles the persistence and management of various states type Manager struct { mu sync.Mutex @@ -209,15 +228,15 @@ func (m *Manager) PersistState(ctx context.Context) error { return nil } -// loadState loads the existing state from the state file -func (m *Manager) loadState() error { +// loadStateFile reads and unmarshals the state file into a map of raw JSON messages +func (m *Manager) loadStateFile() (map[string]json.RawMessage, error) { data, err := os.ReadFile(m.filePath) if err != nil { if errors.Is(err, fs.ErrNotExist) { log.Debug("state file does not exist") - return nil + return nil, nil // nolint:nilnil } - return fmt.Errorf("read state file: %w", err) + return nil, fmt.Errorf("read state file: %w", err) } var rawStates map[string]json.RawMessage @@ -228,37 +247,69 @@ func (m *Manager) loadState() error { } else { log.Info("State file deleted") } - return fmt.Errorf("unmarshal states: %w", err) + return nil, fmt.Errorf("unmarshal states: %w", err) } - var merr *multierror.Error + return rawStates, nil +} - for name, rawState := range rawStates { - stateType, ok := m.stateTypes[name] - if !ok { - merr = multierror.Append(merr, fmt.Errorf("unknown state type: %s", name)) - continue - } +// loadSingleRawState unmarshals a raw state into a concrete state object +func (m *Manager) loadSingleRawState(name string, rawState json.RawMessage) (State, error) { + stateType, ok := m.stateTypes[name] + if !ok { + return nil, fmt.Errorf("state %s not registered", name) + } - if string(rawState) == "null" { - continue - } + if string(rawState) == "null" { + return nil, nil //nolint:nilnil + } - statePtr := reflect.New(stateType).Interface().(State) - if err := 
json.Unmarshal(rawState, statePtr); err != nil { - merr = multierror.Append(merr, fmt.Errorf("unmarshal state %s: %w", name, err)) - continue - } + statePtr := reflect.New(stateType).Interface().(State) + if err := json.Unmarshal(rawState, statePtr); err != nil { + return nil, fmt.Errorf("unmarshal state %s: %w", name, err) + } - m.states[name] = statePtr + return statePtr, nil +} + +// LoadState loads a specific state from the state file +func (m *Manager) LoadState(state State) error { + if m == nil { + return nil + } + + m.mu.Lock() + defer m.mu.Unlock() + + rawStates, err := m.loadStateFile() + if err != nil { + return err + } + if rawStates == nil { + return nil + } + + name := state.Name() + rawState, exists := rawStates[name] + if !exists { + return nil + } + + loadedState, err := m.loadSingleRawState(name, rawState) + if err != nil { + return err + } + + m.states[name] = loadedState + if loadedState != nil { log.Debugf("loaded state: %s", name) } - return nberrors.FormatErrorOrNil(merr) + return nil } -// PerformCleanup retrieves all states from the state file for the registered states and calls Cleanup on them. -// If the cleanup is successful, the state is marked for deletion. +// PerformCleanup retrieves all states from the state file and calls Cleanup on registered states that support it. +// Unregistered states are preserved in their original state. 
func (m *Manager) PerformCleanup() error { if m == nil { return nil @@ -267,22 +318,53 @@ func (m *Manager) PerformCleanup() error { m.mu.Lock() defer m.mu.Unlock() - if err := m.loadState(); err != nil { + // Load raw states from file + rawStates, err := m.loadStateFile() + if err != nil { log.Warnf("Failed to load state during cleanup: %v", err) + return err + } + if rawStates == nil { + return nil } var merr *multierror.Error - for name, state := range m.states { - if state == nil { - // If no state was found in the state file, we don't mark the state dirty nor return an error + + // Process each state in the file + for name, rawState := range rawStates { + // For unregistered states, preserve the raw JSON + if _, registered := m.stateTypes[name]; !registered { + m.states[name] = &RawState{data: rawState} continue } + // Load the registered state + loadedState, err := m.loadSingleRawState(name, rawState) + if err != nil { + merr = multierror.Append(merr, err) + continue + } + + if loadedState == nil { + continue + } + + // Check if state supports cleanup + cleanableState, isCleanable := loadedState.(CleanableState) + if !isCleanable { + // If it doesn't support cleanup, keep it as-is + m.states[name] = loadedState + continue + } + + // Perform cleanup for cleanable states log.Infof("client was not shut down properly, cleaning up %s", name) - if err := state.Cleanup(); err != nil { + if err := cleanableState.Cleanup(); err != nil { merr = multierror.Append(merr, fmt.Errorf("cleanup state for %s: %w", name, err)) + // On cleanup error, preserve the state + m.states[name] = loadedState } else { - // mark for deletion on cleanup success + // Successfully cleaned up - mark for deletion m.states[name] = nil m.dirty[name] = struct{}{} }