From dd29f4c01e4756b7a6d6975b672d929e2c2748aa Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Mon, 19 Jun 2023 11:20:34 +0200 Subject: [PATCH] Reduce the peer status notifications (#956) Reduce the peer status notifications When receive new network map invoke multiple notifications for every single peers. It cause high cpu usage We handle the in a batch the peer notification in update network map. - Remove the unnecessary UpdatePeerFQDN calls in addNewPeer - Fix notification in RemovePeer function - Involve FinishPeerListModifications logic --- client/internal/engine.go | 9 +++--- client/internal/peer/status.go | 43 +++++++++++++++++++++++------ client/internal/peer/status_test.go | 6 ++-- 3 files changed, 41 insertions(+), 17 deletions(-) diff --git a/client/internal/engine.go b/client/internal/engine.go index 3bc3bb334..9cab064b1 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -605,6 +605,7 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error { // cleanup request, most likely our peer has been deleted if networkMap.GetRemotePeersIsEmpty() { err := e.removeAllPeers() + e.statusRecorder.FinishPeerListModifications() if err != nil { return err } @@ -624,6 +625,8 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error { return err } + e.statusRecorder.FinishPeerListModifications() + // update SSHServer by adding remote peer SSH keys if !isNil(e.sshServer) { for _, config := range networkMap.GetRemotePeers() { @@ -759,17 +762,13 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error { } e.peerConns[peerKey] = conn - err = e.statusRecorder.AddPeer(peerKey) + err = e.statusRecorder.AddPeer(peerKey, peerConfig.Fqdn) if err != nil { log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err) } go e.connWorker(conn, peerKey) } - err := e.statusRecorder.UpdatePeerFQDN(peerKey, peerConfig.Fqdn) - if err != nil { - log.Warnf("error updating peer's %s fqdn in the status recorder, got error: %v", peerKey, err) - } return nil } diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index 3910343dd..21e8be177 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -59,6 +59,11 @@ type Status struct { mgmAddress string signalAddress string notifier *notifier + + // To reduce the number of notification invocation this bool will be true when need to call the notification + // Some Peer actions mostly used by in a batch when the network map has been synchronized. In these type of events + // set to true this variable and at the end of the processing we will reset it by the FinishPeerListModifications() + peerListChangedForNotification bool } // NewRecorder returns a new Status instance @@ -78,11 +83,13 @@ func (d *Status) ReplaceOfflinePeers(replacement []State) { defer d.mux.Unlock() d.offlinePeers = make([]State, len(replacement)) copy(d.offlinePeers, replacement) - d.notifyPeerListChanged() + + // todo we should set to true in case if the list changed only + d.peerListChangedForNotification = true } // AddPeer adds peer to Daemon status map -func (d *Status) AddPeer(peerPubKey string) error { +func (d *Status) AddPeer(peerPubKey string, fqdn string) error { d.mux.Lock() defer d.mux.Unlock() @@ -90,7 +97,12 @@ func (d *Status) AddPeer(peerPubKey string) error { if ok { return errors.New("peer already exist") } - d.peers[peerPubKey] = State{PubKey: peerPubKey, ConnStatus: StatusDisconnected} + d.peers[peerPubKey] = State{ + PubKey: peerPubKey, + ConnStatus: StatusDisconnected, + FQDN: fqdn, + } + d.peerListChangedForNotification = true return nil } @@ -112,13 +124,13 @@ func (d *Status) RemovePeer(peerPubKey string) error { defer d.mux.Unlock() _, ok := d.peers[peerPubKey] - if ok { - delete(d.peers, peerPubKey) - return nil + if !ok { + return errors.New("no peer with to remove") } - d.notifyPeerListChanged() - return errors.New("no peer with to remove") + delete(d.peers, peerPubKey) + d.peerListChangedForNotification = true + return nil } // UpdatePeerState updates peer status @@ -188,10 +200,23 @@ func (d *Status) UpdatePeerFQDN(peerPubKey, fqdn string) error { peerState.FQDN = fqdn d.peers[peerPubKey] = peerState - d.notifyPeerListChanged() return nil } +// FinishPeerListModifications this event invoke the notification +func (d *Status) FinishPeerListModifications() { + d.mux.Lock() + + if !d.peerListChangedForNotification { + d.mux.Unlock() + return + } + d.peerListChangedForNotification = false + d.mux.Unlock() + + d.notifyPeerListChanged() +} + // GetPeerStateChangeNotifier returns a change notifier channel for a peer func (d *Status) GetPeerStateChangeNotifier(peer string) <-chan struct{} { d.mux.Lock() diff --git a/client/internal/peer/status_test.go b/client/internal/peer/status_test.go index 86ee933f6..5730b1cf1 100644 --- a/client/internal/peer/status_test.go +++ b/client/internal/peer/status_test.go @@ -9,13 +9,13 @@ import ( func TestAddPeer(t *testing.T) { key := "abc" status := NewRecorder("https://mgm") - err := status.AddPeer(key) + err := status.AddPeer(key, "abc.netbird") assert.NoError(t, err, "shouldn't return error") _, exists := status.peers[key] assert.True(t, exists, "value was found") - err = status.AddPeer(key) + err = status.AddPeer(key, "abc.netbird") assert.Error(t, err, "should return error on duplicate") } @@ -23,7 +23,7 @@ func TestAddPeer(t *testing.T) { func TestGetPeer(t *testing.T) { key := "abc" status := NewRecorder("https://mgm") - err := status.AddPeer(key) + err := status.AddPeer(key, "abc.netbird") assert.NoError(t, err, "shouldn't return error") peerStatus, err := status.GetPeer(key)