From 41cd4952f17d4e117bddf078d6ab56704f73b809 Mon Sep 17 00:00:00 2001 From: Viktor Liu <17948409+lixmal@users.noreply.github.com> Date: Mon, 2 Jun 2025 12:11:54 +0200 Subject: [PATCH 1/5] [client] Apply return traffic rules only if firewall is stateless (#3895) --- client/firewall/iptables/manager_linux.go | 4 + client/firewall/manager/firewall.go | 2 + client/firewall/nftables/manager_linux.go | 4 + client/firewall/uspfilter/uspfilter.go | 9 +- client/internal/acl/manager.go | 6 +- client/internal/acl/manager_test.go | 187 +++++++++++-------- client/internal/routemanager/static/route.go | 1 - 7 files changed, 130 insertions(+), 83 deletions(-) diff --git a/client/firewall/iptables/manager_linux.go b/client/firewall/iptables/manager_linux.go index b229688fc..0897f831f 100644 --- a/client/firewall/iptables/manager_linux.go +++ b/client/firewall/iptables/manager_linux.go @@ -147,6 +147,10 @@ func (m *Manager) IsServerRouteSupported() bool { return true } +func (m *Manager) IsStateful() bool { + return true +} + func (m *Manager) AddNatRule(pair firewall.RouterPair) error { m.mutex.Lock() defer m.mutex.Unlock() diff --git a/client/firewall/manager/firewall.go b/client/firewall/manager/firewall.go index 084d19423..3b3164823 100644 --- a/client/firewall/manager/firewall.go +++ b/client/firewall/manager/firewall.go @@ -116,6 +116,8 @@ type Manager interface { // IsServerRouteSupported returns true if the firewall supports server side routing operations IsServerRouteSupported() bool + IsStateful() bool + AddRouteFiltering( id []byte, sources []netip.Prefix, diff --git a/client/firewall/nftables/manager_linux.go b/client/firewall/nftables/manager_linux.go index e6b3a031b..2f8ee81a4 100644 --- a/client/firewall/nftables/manager_linux.go +++ b/client/firewall/nftables/manager_linux.go @@ -170,6 +170,10 @@ func (m *Manager) IsServerRouteSupported() bool { return true } +func (m *Manager) IsStateful() bool { + return true +} + func (m *Manager) AddNatRule(pair firewall.RouterPair) error { m.mutex.Lock() defer m.mutex.Unlock() diff --git a/client/firewall/uspfilter/uspfilter.go b/client/firewall/uspfilter/uspfilter.go index 11730dbb3..287e52773 100644 --- a/client/firewall/uspfilter/uspfilter.go +++ b/client/firewall/uspfilter/uspfilter.go @@ -326,6 +326,10 @@ func (m *Manager) IsServerRouteSupported() bool { return true } +func (m *Manager) IsStateful() bool { + return m.stateful +} + func (m *Manager) AddNatRule(pair firewall.RouterPair) error { if m.nativeRouter.Load() && m.nativeFirewall != nil { return m.nativeFirewall.AddNatRule(pair) @@ -606,9 +610,8 @@ func (m *Manager) processOutgoingHooks(packetData []byte, size int) bool { return true } - if m.stateful { - m.trackOutbound(d, srcIP, dstIP, size) - } + // for netflow we keep track even if the firewall is stateless + m.trackOutbound(d, srcIP, dstIP, size) return false } diff --git a/client/internal/acl/manager.go b/client/internal/acl/manager.go index a6316d7a2..5caf2b770 100644 --- a/client/internal/acl/manager.go +++ b/client/internal/acl/manager.go @@ -285,8 +285,10 @@ func (d *DefaultManager) protoRuleToFirewallRule( case mgmProto.RuleDirection_IN: rules, err = d.addInRules(r.PolicyID, ip, protocol, port, action, ipsetName) case mgmProto.RuleDirection_OUT: - // TODO: Remove this soon. Outbound rules are obsolete. - // We only maintain this for return traffic (inbound dir) which is now handled by the stateful firewall already + if d.firewall.IsStateful() { + return "", nil, nil + } + // return traffic for outbound connections if firewall is stateless rules, err = d.addOutRules(r.PolicyID, ip, protocol, port, action, ipsetName) default: return "", nil, fmt.Errorf("invalid direction, skipping firewall rule") diff --git a/client/internal/acl/manager_test.go b/client/internal/acl/manager_test.go index 3595ca600..532d70a24 100644 --- a/client/internal/acl/manager_test.go +++ b/client/internal/acl/manager_test.go @@ -5,9 +5,10 @@ import ( "testing" "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/netbirdio/netbird/client/firewall" - "github.com/netbirdio/netbird/client/firewall/manager" "github.com/netbirdio/netbird/client/iface/wgaddr" "github.com/netbirdio/netbird/client/internal/acl/mocks" "github.com/netbirdio/netbird/client/internal/netflow" @@ -43,9 +44,7 @@ func TestDefaultManager(t *testing.T) { ifaceMock.EXPECT().IsUserspaceBind().Return(true).AnyTimes() ifaceMock.EXPECT().SetFilter(gomock.Any()) ip, network, err := net.ParseCIDR("172.0.0.1/32") - if err != nil { - t.Fatalf("failed to parse IP address: %v", err) - } + require.NoError(t, err) ifaceMock.EXPECT().Name().Return("lo").AnyTimes() ifaceMock.EXPECT().Address().Return(wgaddr.Address{ @@ -54,23 +53,22 @@ func TestDefaultManager(t *testing.T) { }).AnyTimes() ifaceMock.EXPECT().GetWGDevice().Return(nil).AnyTimes() - // we receive one rule from the management so for testing purposes ignore it fw, err := firewall.NewFirewall(ifaceMock, nil, flowLogger, false) - if err != nil { - t.Errorf("create firewall: %v", err) - return - } - defer func(fw manager.Manager) { - _ = fw.Close(nil) - }(fw) + require.NoError(t, err) + defer func() { + err = fw.Close(nil) + require.NoError(t, err) + }() + acl := NewDefaultManager(fw) t.Run("apply firewall rules", func(t *testing.T) { acl.ApplyFiltering(networkMap, false) - if len(acl.peerRulesPairs) != 2 { - t.Errorf("firewall rules not applied: %v", acl.peerRulesPairs) - return + if fw.IsStateful() { + assert.Equal(t, 0, len(acl.peerRulesPairs)) + } else { + assert.Equal(t, 2, len(acl.peerRulesPairs)) } }) @@ -94,12 +92,13 @@ func TestDefaultManager(t *testing.T) { acl.ApplyFiltering(networkMap, false) - // we should have one old and one new rule in the existed rules - if len(acl.peerRulesPairs) != 2 { - t.Errorf("firewall rules not applied") - return + expectedRules := 2 + if fw.IsStateful() { + expectedRules = 1 // only the inbound rule } + assert.Equal(t, expectedRules, len(acl.peerRulesPairs)) + // check that old rule was removed previousCount := 0 for id := range acl.peerRulesPairs { @@ -107,26 +106,87 @@ func TestDefaultManager(t *testing.T) { previousCount++ } } - if previousCount != 1 { - t.Errorf("old rule was not removed") + + expectedPreviousCount := 0 + if !fw.IsStateful() { + expectedPreviousCount = 1 } + assert.Equal(t, expectedPreviousCount, previousCount) }) t.Run("handle default rules", func(t *testing.T) { networkMap.FirewallRules = networkMap.FirewallRules[:0] networkMap.FirewallRulesIsEmpty = true - if acl.ApplyFiltering(networkMap, false); len(acl.peerRulesPairs) != 0 { - t.Errorf("rules should be empty if FirewallRulesIsEmpty is set, got: %v", len(acl.peerRulesPairs)) - return - } + acl.ApplyFiltering(networkMap, false) + assert.Equal(t, 0, len(acl.peerRulesPairs)) networkMap.FirewallRulesIsEmpty = false acl.ApplyFiltering(networkMap, false) - if len(acl.peerRulesPairs) != 1 { - t.Errorf("rules should contain 1 rules if FirewallRulesIsEmpty is not set, got: %v", len(acl.peerRulesPairs)) - return + + expectedRules := 1 + if fw.IsStateful() { + expectedRules = 1 // only inbound allow-all rule } + assert.Equal(t, expectedRules, len(acl.peerRulesPairs)) + }) +} + +func TestDefaultManagerStateless(t *testing.T) { + // stateless currently only in userspace, so we have to disable kernel + t.Setenv("NB_WG_KERNEL_DISABLED", "true") + t.Setenv("NB_DISABLE_CONNTRACK", "true") + + networkMap := &mgmProto.NetworkMap{ + FirewallRules: []*mgmProto.FirewallRule{ + { + PeerIP: "10.93.0.1", + Direction: mgmProto.RuleDirection_OUT, + Action: mgmProto.RuleAction_ACCEPT, + Protocol: mgmProto.RuleProtocol_TCP, + Port: "80", + }, + { + PeerIP: "10.93.0.2", + Direction: mgmProto.RuleDirection_IN, + Action: mgmProto.RuleAction_ACCEPT, + Protocol: mgmProto.RuleProtocol_UDP, + Port: "53", + }, + }, + } + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + ifaceMock := mocks.NewMockIFaceMapper(ctrl) + ifaceMock.EXPECT().IsUserspaceBind().Return(true).AnyTimes() + ifaceMock.EXPECT().SetFilter(gomock.Any()) + ip, network, err := net.ParseCIDR("172.0.0.1/32") + require.NoError(t, err) + + ifaceMock.EXPECT().Name().Return("lo").AnyTimes() + ifaceMock.EXPECT().Address().Return(wgaddr.Address{ + IP: ip, + Network: network, + }).AnyTimes() + ifaceMock.EXPECT().GetWGDevice().Return(nil).AnyTimes() + + fw, err := firewall.NewFirewall(ifaceMock, nil, flowLogger, false) + require.NoError(t, err) + defer func() { + err = fw.Close(nil) + require.NoError(t, err) + }() + + acl := NewDefaultManager(fw) + + t.Run("stateless firewall creates outbound rules", func(t *testing.T) { + acl.ApplyFiltering(networkMap, false) + + // In stateless mode, we should have both inbound and outbound rules + assert.False(t, fw.IsStateful()) + assert.Equal(t, 2, len(acl.peerRulesPairs)) }) } @@ -192,42 +252,19 @@ func TestDefaultManagerSquashRules(t *testing.T) { manager := &DefaultManager{} rules, _ := manager.squashAcceptRules(networkMap) - if len(rules) != 2 { - t.Errorf("rules should contain 2, got: %v", rules) - return - } + assert.Equal(t, 2, len(rules)) r := rules[0] - switch { - case r.PeerIP != "0.0.0.0": - t.Errorf("IP should be 0.0.0.0, got: %v", r.PeerIP) - return - case r.Direction != mgmProto.RuleDirection_IN: - t.Errorf("direction should be IN, got: %v", r.Direction) - return - case r.Protocol != mgmProto.RuleProtocol_ALL: - t.Errorf("protocol should be ALL, got: %v", r.Protocol) - return - case r.Action != mgmProto.RuleAction_ACCEPT: - t.Errorf("action should be ACCEPT, got: %v", r.Action) - return - } + assert.Equal(t, "0.0.0.0", r.PeerIP) + assert.Equal(t, mgmProto.RuleDirection_IN, r.Direction) + assert.Equal(t, mgmProto.RuleProtocol_ALL, r.Protocol) + assert.Equal(t, mgmProto.RuleAction_ACCEPT, r.Action) r = rules[1] - switch { - case r.PeerIP != "0.0.0.0": - t.Errorf("IP should be 0.0.0.0, got: %v", r.PeerIP) - return - case r.Direction != mgmProto.RuleDirection_OUT: - t.Errorf("direction should be OUT, got: %v", r.Direction) - return - case r.Protocol != mgmProto.RuleProtocol_ALL: - t.Errorf("protocol should be ALL, got: %v", r.Protocol) - return - case r.Action != mgmProto.RuleAction_ACCEPT: - t.Errorf("action should be ACCEPT, got: %v", r.Action) - return - } + assert.Equal(t, "0.0.0.0", r.PeerIP) + assert.Equal(t, mgmProto.RuleDirection_OUT, r.Direction) + assert.Equal(t, mgmProto.RuleProtocol_ALL, r.Protocol) + assert.Equal(t, mgmProto.RuleAction_ACCEPT, r.Action) } func TestDefaultManagerSquashRulesNoAffect(t *testing.T) { @@ -291,9 +328,8 @@ func TestDefaultManagerSquashRulesNoAffect(t *testing.T) { } manager := &DefaultManager{} - if rules, _ := manager.squashAcceptRules(networkMap); len(rules) != len(networkMap.FirewallRules) { - t.Errorf("we should get the same amount of rules as output, got %v", len(rules)) - } + rules, _ := manager.squashAcceptRules(networkMap) + assert.Equal(t, len(networkMap.FirewallRules), len(rules)) } func TestDefaultManagerEnableSSHRules(t *testing.T) { @@ -337,9 +373,7 @@ func TestDefaultManagerEnableSSHRules(t *testing.T) { ifaceMock.EXPECT().IsUserspaceBind().Return(true).AnyTimes() ifaceMock.EXPECT().SetFilter(gomock.Any()) ip, network, err := net.ParseCIDR("172.0.0.1/32") - if err != nil { - t.Fatalf("failed to parse IP address: %v", err) - } + require.NoError(t, err) ifaceMock.EXPECT().Name().Return("lo").AnyTimes() ifaceMock.EXPECT().Address().Return(wgaddr.Address{ @@ -348,21 +382,20 @@ func TestDefaultManagerEnableSSHRules(t *testing.T) { }).AnyTimes() ifaceMock.EXPECT().GetWGDevice().Return(nil).AnyTimes() - // we receive one rule from the management so for testing purposes ignore it fw, err := firewall.NewFirewall(ifaceMock, nil, flowLogger, false) - if err != nil { - t.Errorf("create firewall: %v", err) - return - } - defer func(fw manager.Manager) { - _ = fw.Close(nil) - }(fw) + require.NoError(t, err) + defer func() { + err = fw.Close(nil) + require.NoError(t, err) + }() + acl := NewDefaultManager(fw) acl.ApplyFiltering(networkMap, false) - if len(acl.peerRulesPairs) != 3 { - t.Errorf("expect 3 rules (last must be SSH), got: %d", len(acl.peerRulesPairs)) - return + expectedRules := 3 + if fw.IsStateful() { + expectedRules = 3 // 2 inbound rules + SSH rule } + assert.Equal(t, expectedRules, len(acl.peerRulesPairs)) } diff --git a/client/internal/routemanager/static/route.go b/client/internal/routemanager/static/route.go index 98c34dbee..681c192fb 100644 --- a/client/internal/routemanager/static/route.go +++ b/client/internal/routemanager/static/route.go @@ -24,7 +24,6 @@ func NewRoute(rt *route.Route, routeRefCounter *refcounter.RouteRefCounter, allo } } -// Route route methods func (r *Route) String() string { return r.route.Network.String() } From 07b220d91ba77248dbf320ea943e83ea6da18e69 Mon Sep 17 00:00:00 2001 From: Pedro Maia Costa <550684+pnmcosta@users.noreply.github.com> Date: Mon, 2 Jun 2025 21:11:28 +0100 Subject: [PATCH 2/5] [management] REST client impersonation (#3879) --- management/client/rest/client.go | 2 + management/client/rest/impersonation.go | 48 ++++++++++++ management/client/rest/impersonation_test.go | 77 ++++++++++++++++++++ management/client/rest/options.go | 9 +++ management/server/types/settings.go | 2 +- 5 files changed, 137 insertions(+), 1 deletion(-) create mode 100644 management/client/rest/impersonation.go create mode 100644 management/client/rest/impersonation_test.go diff --git a/management/client/rest/client.go b/management/client/rest/client.go index 25e8ad0da..8bf11caae 100644 --- a/management/client/rest/client.go +++ b/management/client/rest/client.go @@ -86,6 +86,7 @@ func NewWithBearerToken(managementURL, token string) *Client { ) } +// NewWithOptions initialize new Client instance with options func NewWithOptions(opts ...option) *Client { client := &Client{ httpClient: http.DefaultClient, @@ -115,6 +116,7 @@ func (c *Client) initialize() { c.Events = &EventsAPI{c} } +// NewRequest creates and executes new management API request func (c *Client) NewRequest(ctx context.Context, method, path string, body io.Reader) (*http.Response, error) { req, err := http.NewRequestWithContext(ctx, method, c.managementURL+path, body) if err != nil { diff --git a/management/client/rest/impersonation.go b/management/client/rest/impersonation.go new file mode 100644 index 000000000..4d47c9373 --- /dev/null +++ b/management/client/rest/impersonation.go @@ -0,0 +1,48 @@ +package rest + +import ( + "net/http" + "net/url" +) + +// Impersonate returns a Client impersonated for a specific account +func (c *Client) Impersonate(account string) *Client { + client := NewWithOptions( + WithManagementURL(c.managementURL), + WithAuthHeader(c.authHeader), + WithHttpClient(newImpersonatedHttpClient(c, account)), + ) + return client +} + +type impersonatedHttpClient struct { + baseClient HttpClient + account string +} + +func newImpersonatedHttpClient(c *Client, account string) *impersonatedHttpClient { + if hc, ok := c.httpClient.(*impersonatedHttpClient); ok { + hc.account = account + return hc + } + + return &impersonatedHttpClient{ + baseClient: c.httpClient, + account: account, + } +} + +func (c *impersonatedHttpClient) Do(req *http.Request) (*http.Response, error) { + parsedURL, err := url.Parse(req.URL.String()) + if err != nil { + return nil, err + } + + query := parsedURL.Query() + query.Set("account", c.account) + parsedURL.RawQuery = query.Encode() + + req.URL = parsedURL + + return c.baseClient.Do(req) +} diff --git a/management/client/rest/impersonation_test.go b/management/client/rest/impersonation_test.go new file mode 100644 index 000000000..69c0f9728 --- /dev/null +++ b/management/client/rest/impersonation_test.go @@ -0,0 +1,77 @@ +//go:build integration +// +build integration + +package rest_test + +import ( + "context" + "encoding/json" + "net/http" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/netbirdio/netbird/management/client/rest" + "github.com/netbirdio/netbird/management/server/http/api" +) + +var ( + testImpersonatedAccount = api.Account{ + Id: "ImpersonatedTest", + Settings: api.AccountSettings{ + Extra: &api.AccountExtraSettings{ + PeerApprovalEnabled: false, + }, + GroupsPropagationEnabled: ptr(true), + JwtGroupsEnabled: ptr(false), + PeerInactivityExpiration: 7, + PeerInactivityExpirationEnabled: true, + PeerLoginExpiration: 24, + PeerLoginExpirationEnabled: true, + RegularUsersViewBlocked: false, + RoutingPeerDnsResolutionEnabled: ptr(false), + }, + } +) + +func TestImpersonation_Peers_List_200(t *testing.T) { + withMockClient(func(c *rest.Client, mux *http.ServeMux) { + impersonatedClient := c.Impersonate(testImpersonatedAccount.Id) + mux.HandleFunc("/api/peers", func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, r.URL.Query().Get("account"), testImpersonatedAccount.Id) + retBytes, _ := json.Marshal([]api.Peer{testPeer}) + _, err := w.Write(retBytes) + require.NoError(t, err) + }) + ret, err := impersonatedClient.Peers.List(context.Background()) + require.NoError(t, err) + assert.Len(t, ret, 1) + assert.Equal(t, testPeer, ret[0]) + }) +} + +func TestImpersonation_Change_Account(t *testing.T) { + withMockClient(func(c *rest.Client, mux *http.ServeMux) { + impersonatedClient := c.Impersonate(testImpersonatedAccount.Id) + mux.HandleFunc("/api/peers", func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, r.URL.Query().Get("account"), testImpersonatedAccount.Id) + retBytes, _ := json.Marshal([]api.Peer{testPeer}) + _, err := w.Write(retBytes) + require.NoError(t, err) + }) + _, err := impersonatedClient.Peers.List(context.Background()) + require.NoError(t, err) + + impersonatedClient = impersonatedClient.Impersonate("another-test-account") + mux.HandleFunc("/api/peers/Test", func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, r.URL.Query().Get("account"), "another-test-account") + retBytes, _ := json.Marshal(testPeer) + _, err := w.Write(retBytes) + require.NoError(t, err) + }) + + _, err = impersonatedClient.Peers.Get(context.Background(), "Test") + require.NoError(t, err) + }) +} diff --git a/management/client/rest/options.go b/management/client/rest/options.go index 5aad7dd7e..21f2394e9 100644 --- a/management/client/rest/options.go +++ b/management/client/rest/options.go @@ -2,32 +2,41 @@ package rest import "net/http" +// option modifier for creation of Client type option func(*Client) +// HTTPClient interface for HTTP client type HttpClient interface { Do(req *http.Request) (*http.Response, error) } +// WithHTTPClient overrides HTTPClient used func WithHttpClient(client HttpClient) option { return func(c *Client) { c.httpClient = client } } +// WithBearerToken uses provided bearer token acquired from SSO for authentication func WithBearerToken(token string) option { return WithAuthHeader("Bearer " + token) } +// WithPAT uses provided Personal Access Token +// (created from NetBird Management Dashboard) for authentication func WithPAT(token string) option { return WithAuthHeader("Token " + token) } +// WithManagementURL overrides target NetBird Management server func WithManagementURL(url string) option { return func(c *Client) { c.managementURL = url } } +// WithAuthHeader overrides auth header completely, this should generally not be used +// and WithBearerToken or WithPAT should be used instead func WithAuthHeader(value string) option { return func(c *Client) { c.authHeader = value diff --git a/management/server/types/settings.go b/management/server/types/settings.go index bd361f3ff..a22a36b03 100644 --- a/management/server/types/settings.go +++ b/management/server/types/settings.go @@ -45,7 +45,7 @@ type Settings struct { // Extra is a dictionary of Account settings Extra *ExtraSettings `gorm:"embedded;embeddedPrefix:extra_"` - // LazyConnectionEnabled indicates wether the experimental feature is enabled or disabled + // LazyConnectionEnabled indicates if the experimental feature is enabled or disabled LazyConnectionEnabled bool `gorm:"default:false"` } From 35287f8241f4bf3582ed9d1049c5da5c2e6bde2e Mon Sep 17 00:00:00 2001 From: Maycon Santos Date: Mon, 2 Jun 2025 23:37:51 +0100 Subject: [PATCH 3/5] [misc] Fail linter workflows on codespell failures (#3913) * Fail linter workflows on codespell failures * testing workflow * remove test --- .github/workflows/golangci-lint.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index bdd508e9b..7e6583cc6 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -21,7 +21,6 @@ jobs: with: ignore_words_list: erro,clienta,hastable,iif,groupd,testin,groupe skip: go.mod,go.sum - only_warn: 1 golangci: strategy: fail-fast: false From af27aaf9af28124198fcf0b1a90ef3443f6aeb5d Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Tue, 3 Jun 2025 09:20:33 +0200 Subject: [PATCH 4/5] [client] Refactor peer state change subscription mechanism (#3910) * Refactor peer state change subscription mechanism Because the code generated new channel for every single event, was easy to miss notification. Use single channel. * Fix lint * Avoid potential deadlock * Fix test * Add context * Fix test --- client/internal/peer/status.go | 81 +++++++++++++++++++++----- client/internal/peer/status_test.go | 13 +++-- client/internal/routemanager/client.go | 11 ++-- 3 files changed, 78 insertions(+), 27 deletions(-) diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index 40956e68e..0c6aac372 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -1,6 +1,7 @@ package peer import ( + "context" "errors" "net/netip" "slices" @@ -146,11 +147,31 @@ type FullStatus struct { LazyConnectionEnabled bool } +type StatusChangeSubscription struct { + peerID string + id string + eventsChan chan struct{} + ctx context.Context +} + +func newStatusChangeSubscription(ctx context.Context, peerID string) *StatusChangeSubscription { + return &StatusChangeSubscription{ + ctx: ctx, + peerID: peerID, + id: uuid.New().String(), + eventsChan: make(chan struct{}, 1), + } +} + +func (s *StatusChangeSubscription) Events() chan struct{} { + return s.eventsChan +} + // Status holds a state of peers, signal, management connections and relays type Status struct { mux sync.Mutex peers map[string]State - changeNotify map[string]chan struct{} + changeNotify map[string]map[string]*StatusChangeSubscription // map[peerID]map[subscriptionID]*StatusChangeSubscription signalState bool signalError error managementState bool @@ -187,7 +208,7 @@ type Status struct { func NewRecorder(mgmAddress string) *Status { return &Status{ peers: make(map[string]State), - changeNotify: make(map[string]chan struct{}), + changeNotify: make(map[string]map[string]*StatusChangeSubscription), eventStreams: make(map[string]chan *proto.SystemEvent), eventQueue: NewEventQueue(eventQueueSize), offlinePeers: make([]State, 0), @@ -312,7 +333,6 @@ func (d *Status) UpdatePeerState(receivedState State) error { // when we close the connection we will not notify the router manager if receivedState.ConnStatus == StatusIdle { d.notifyPeerStateChangeListeners(receivedState.PubKey) - } return nil } @@ -552,19 +572,41 @@ func (d *Status) FinishPeerListModifications() { d.notifyPeerListChanged() } -// GetPeerStateChangeNotifier returns a change notifier channel for a peer -func (d *Status) GetPeerStateChangeNotifier(peer string) <-chan struct{} { +func (d *Status) SubscribeToPeerStateChanges(ctx context.Context, peerID string) *StatusChangeSubscription { d.mux.Lock() defer d.mux.Unlock() - ch, found := d.changeNotify[peer] - if found { - return ch + sub := newStatusChangeSubscription(ctx, peerID) + if _, ok := d.changeNotify[peerID]; !ok { + d.changeNotify[peerID] = make(map[string]*StatusChangeSubscription) + } + d.changeNotify[peerID][sub.id] = sub + + return sub +} + +func (d *Status) UnsubscribePeerStateChanges(subscription *StatusChangeSubscription) { + d.mux.Lock() + defer d.mux.Unlock() + + if subscription == nil { + return } - ch = make(chan struct{}) - d.changeNotify[peer] = ch - return ch + channels, ok := d.changeNotify[subscription.peerID] + if !ok { + return + } + + sub, exists := channels[subscription.id] + if !exists { + return + } + + delete(channels, subscription.id) + if len(channels) == 0 { + delete(d.changeNotify, sub.peerID) + } } // GetLocalPeerState returns the local peer state @@ -939,13 +981,20 @@ func (d *Status) onConnectionChanged() { // notifyPeerStateChangeListeners notifies route manager about the change in peer state func (d *Status) notifyPeerStateChangeListeners(peerID string) { - ch, found := d.changeNotify[peerID] - if !found { + subs, ok := d.changeNotify[peerID] + if !ok { return } - - close(ch) - delete(d.changeNotify, peerID) + for _, sub := range subs { + // block the write because we do not want to miss notification + // must have to be sure we will run the GetPeerState() on separated thread + go func() { + select { + case sub.eventsChan <- struct{}{}: + case <-sub.ctx.Done(): + } + }() + } } func (d *Status) notifyPeerListChanged() { diff --git a/client/internal/peer/status_test.go b/client/internal/peer/status_test.go index 8f28a9862..272638750 100644 --- a/client/internal/peer/status_test.go +++ b/client/internal/peer/status_test.go @@ -1,6 +1,7 @@ package peer import ( + "context" "errors" "sync" "testing" @@ -86,8 +87,8 @@ func TestGetPeerStateChangeNotifierLogic(t *testing.T) { status := NewRecorder("https://mgm") _ = status.AddPeer(key, "abc.netbird", ip) - ch := status.GetPeerStateChangeNotifier(key) - assert.NotNil(t, ch, "channel shouldn't be nil") + sub := status.SubscribeToPeerStateChanges(context.Background(), key) + assert.NotNil(t, sub, "channel shouldn't be nil") peerState := State{ PubKey: key, @@ -99,10 +100,12 @@ func TestGetPeerStateChangeNotifierLogic(t *testing.T) { err := status.UpdatePeerRelayedStateToDisconnected(peerState) assert.NoError(t, err, "shouldn't return error") + timeoutCtx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() select { - case <-ch: - default: - t.Errorf("channel wasn't closed after update") + case <-sub.eventsChan: + case <-timeoutCtx.Done(): + t.Errorf("timed out waiting for event") } } diff --git a/client/internal/routemanager/client.go b/client/internal/routemanager/client.go index 137e00d31..bff954c27 100644 --- a/client/internal/routemanager/client.go +++ b/client/internal/routemanager/client.go @@ -224,19 +224,18 @@ func (c *clientNetwork) getBestRouteFromStatuses(routePeerStatuses map[route.ID] } func (c *clientNetwork) watchPeerStatusChanges(ctx context.Context, peerKey string, peerStateUpdate chan struct{}, closer chan struct{}) { + subscription := c.statusRecorder.SubscribeToPeerStateChanges(ctx, peerKey) + defer c.statusRecorder.UnsubscribePeerStateChanges(subscription) + for { select { case <-ctx.Done(): return case <-closer: return - case <-c.statusRecorder.GetPeerStateChangeNotifier(peerKey): - state, err := c.statusRecorder.GetPeer(peerKey) - if err != nil { - continue - } + case <-subscription.Events(): peerStateUpdate <- struct{}{} - log.Debugf("triggered route state update for Peer %s, state: %s", peerKey, state.ConnStatus) + log.Debugf("triggered route state update for Peer: %s", peerKey) } } } From 616b19c0644be0d1b0b5ccf4632653f6402e4faf Mon Sep 17 00:00:00 2001 From: hakansa <43675540+hakansa@users.noreply.github.com> Date: Tue, 3 Jun 2025 10:49:13 +0300 Subject: [PATCH 5/5] [client] Add "Deselect All" Menu Item to Exit Node Menu (#3877) * [client] Enhance exit node menu functionality with deselect all option * Hide exit nodes before removal in recreateExitNodeMenu * recreateExitNodeMenu adding mutex locks * Refetch exit nodes after deselecting all in exit node menu --- client/ui/client_ui.go | 18 +++++---- client/ui/network.go | 90 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+), 8 deletions(-) diff --git a/client/ui/client_ui.go b/client/ui/client_ui.go index c23b78582..c0c8692c6 100644 --- a/client/ui/client_ui.go +++ b/client/ui/client_ui.go @@ -235,9 +235,11 @@ type serviceClient struct { eventManager *event.Manager - exitNodeMu sync.Mutex - mExitNodeItems []menuHandler - logFile string + exitNodeMu sync.Mutex + mExitNodeItems []menuHandler + exitNodeStates []exitNodeState + mExitNodeDeselectAll *systray.MenuItem + logFile string } type menuHandler struct { @@ -1035,11 +1037,11 @@ func (s *serviceClient) updateConfig() error { lazyConnectionEnabled := s.mLazyConnEnabled.Checked() loginRequest := proto.LoginRequest{ - IsUnixDesktopClient: runtime.GOOS == "linux" || runtime.GOOS == "freebsd", - ServerSSHAllowed: &sshAllowed, - RosenpassEnabled: &rosenpassEnabled, - DisableAutoConnect: &disableAutoStart, - DisableNotifications: ¬ificationsDisabled, + IsUnixDesktopClient: runtime.GOOS == "linux" || runtime.GOOS == "freebsd", + ServerSSHAllowed: &sshAllowed, + RosenpassEnabled: &rosenpassEnabled, + DisableAutoConnect: &disableAutoStart, + DisableNotifications: ¬ificationsDisabled, LazyConnectionEnabled: &lazyConnectionEnabled, } diff --git a/client/ui/network.go b/client/ui/network.go index 435917f30..b3748a89d 100644 --- a/client/ui/network.go +++ b/client/ui/network.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "runtime" + "slices" "sort" "strings" "time" @@ -33,6 +34,11 @@ const ( type filter string +type exitNodeState struct { + id string + selected bool +} + func (s *serviceClient) showNetworksUI() { s.wNetworks = s.app.NewWindow("Networks") s.wNetworks.SetOnClosed(s.cancel) @@ -357,18 +363,45 @@ func (s *serviceClient) updateExitNodes() { } func (s *serviceClient) recreateExitNodeMenu(exitNodes []*proto.Network) { + var exitNodeIDs []exitNodeState + for _, node := range exitNodes { + exitNodeIDs = append(exitNodeIDs, exitNodeState{ + id: node.ID, + selected: node.Selected, + }) + } + + sort.Slice(exitNodeIDs, func(i, j int) bool { + return exitNodeIDs[i].id < exitNodeIDs[j].id + }) + if slices.Equal(s.exitNodeStates, exitNodeIDs) { + log.Debug("Exit node menu already up to date") + return + } + for _, node := range s.mExitNodeItems { node.cancel() + node.Hide() node.Remove() } s.mExitNodeItems = nil + if s.mExitNodeDeselectAll != nil { + s.mExitNodeDeselectAll.Remove() + s.mExitNodeDeselectAll = nil + } if runtime.GOOS == "linux" || runtime.GOOS == "freebsd" { s.mExitNode.Remove() s.mExitNode = systray.AddMenuItem("Exit Node", exitNodeMenuDescr) } + var showDeselectAll bool + for _, node := range exitNodes { + if node.Selected { + showDeselectAll = true + } + menuItem := s.mExitNode.AddSubMenuItemCheckbox( node.ID, fmt.Sprintf("Use exit node %s", node.ID), @@ -383,6 +416,32 @@ func (s *serviceClient) recreateExitNodeMenu(exitNodes []*proto.Network) { go s.handleChecked(ctx, node.ID, menuItem) } + s.exitNodeStates = exitNodeIDs + + if showDeselectAll { + s.mExitNode.AddSeparator() + deselectAllItem := s.mExitNode.AddSubMenuItem("Deselect All", "Deselect All") + s.mExitNodeDeselectAll = deselectAllItem + go func() { + for { + _, ok := <-deselectAllItem.ClickedCh + if !ok { + // channel closed: exit the goroutine + return + } + exitNodes, err := s.handleExitNodeMenuDeselectAll() + if err != nil { + log.Warnf("failed to handle deselect all exit nodes: %v", err) + } else { + s.exitNodeMu.Lock() + s.recreateExitNodeMenu(exitNodes) + s.exitNodeMu.Unlock() + } + } + + }() + } + } func (s *serviceClient) getExitNodes(conn proto.DaemonServiceClient) ([]*proto.Network, error) { @@ -420,6 +479,37 @@ func (s *serviceClient) handleChecked(ctx context.Context, id string, item *syst } } +func (s *serviceClient) handleExitNodeMenuDeselectAll() ([]*proto.Network, error) { + conn, err := s.getSrvClient(defaultFailTimeout) + if err != nil { + return nil, fmt.Errorf("get client: %v", err) + } + + exitNodes, err := s.getExitNodes(conn) + if err != nil { + return nil, fmt.Errorf("get exit nodes: %v", err) + } + + var ids []string + for _, e := range exitNodes { + if e.Selected { + ids = append(ids, e.ID) + } + } + + // deselect selected exit nodes + if err := s.deselectOtherExitNodes(conn, ids); err != nil { + return nil, err + } + + updatedExitNodes, err := s.getExitNodes(conn) + if err != nil { + return nil, fmt.Errorf("re-fetch exit nodes: %v", err) + } + + return updatedExitNodes, nil +} + // Add function to toggle exit node selection func (s *serviceClient) toggleExitNode(nodeID string, item *systray.MenuItem) error { conn, err := s.getSrvClient(defaultFailTimeout)