Compare commits

...

29 Commits

Author SHA1 Message Date
Pascal Fischer
5d34804062 Merge branch 'main' into feature/limit-update-channel 2025-06-28 18:21:17 +02:00
Pascal Fischer
418d23d7f8 Merge branch 'main' into feature/limit-update-channel 2025-06-28 18:20:41 +02:00
Pascal Fischer
b0582c86ba update config if exist 2025-06-28 17:59:35 +02:00
Pascal Fischer
1b7c787cc5 fix tests 2025-06-28 16:08:25 +02:00
Pascal Fischer
ad50e07325 fix testing tool and remove unused const 2025-06-28 15:45:07 +02:00
Pascal Fischer
c66bd0cc71 fix tests 2025-06-28 15:33:49 +02:00
Pascal Fischer
0480507a10 [management] report networkmap duration in ms (#4064) 2025-06-28 11:38:15 +02:00
Krzysztof Nazarewski (kdn)
34ac4e4b5a [misc] fix: self-hosting: the wrong default for NETBIRD_AUTH_PKCE_LOGIN_FLAG (#4055)
* fix: self-hosting: the wrong default for NETBIRD_AUTH_PKCE_LOGIN_FLAG

fixes https://github.com/netbirdio/netbird/issues/4054

* un-quote the number

Co-authored-by: Maycon Santos <mlsmaycon@gmail.com>

---------

Co-authored-by: Maycon Santos <mlsmaycon@gmail.com>
2025-06-26 10:45:00 +02:00
Pascal Fischer
52ff9d9602 [management] remove unused transaction (#4053) 2025-06-26 01:34:22 +02:00
Pascal Fischer
6f0111eab0 fix tests 2025-06-25 16:38:49 +02:00
Pascal Fischer
d403d20e7b fix testing tools 2025-06-25 15:37:31 +02:00
Pascal Fischer
c043018939 fix tests 2025-06-25 12:27:55 +02:00
Pascal Fischer
ed6ed4a597 add metrics 2025-06-25 12:09:06 +02:00
Pascal Fischer
905a3481ec Merge branch 'refs/heads/main' into feature/limit-update-channel 2025-06-25 12:05:33 +02:00
Pascal Fischer
1b73fae46e [management] add breakdown of network map calculation metrics (#4020) 2025-06-25 11:46:35 +02:00
Pascal Fischer
47052fa024 Merge branch 'main' into feature/limit-update-channel 2025-06-25 11:32:30 +02:00
Viktor Liu
d897365abc [client] Don't open cmd.exe during MSI actions (#4041) 2025-06-24 21:32:37 +02:00
Viktor Liu
f37aa2cc9d [misc] Specify netbird binary location in Dockerfiles (#4024) 2025-06-23 10:09:02 +02:00
Maycon Santos
5343bee7b2 [management] check and log on new management version (#4029)
This PR enhances the version checker to send a custom User-Agent header when polling for updates, and configures both the management CLI and client UI to use distinct agents. 

- NewUpdate now takes an `httpAgent` string to set the User-Agent header.
- `fetchVersion` builds a custom HTTP request (instead of `http.Get`) and sets the User-Agent.
- Management CLI and client UI now pass `"nb/management"` and `"nb/client-ui"` respectively to NewUpdate.
- Tests updated to supply an `httpAgent` constant.
- Logs if there is a new version available for management
2025-06-22 16:44:33 +02:00
Maycon Santos
870e29db63 [misc] add additional metrics (#4028)
* add additional metrics

we are collecting active rosenpass, ssh from the client side
we are also collecting active user peers and active users

* remove duplicated
2025-06-22 13:44:25 +02:00
Maycon Santos
08e9b05d51 [client] close windows when process needs to exit (#4027)
This PR fixes a bug by ensuring that the advanced settings and re-authentication windows are closed appropriately when the main GUI process exits.

- Updated runSelfCommand calls throughout the UI to pass a context parameter.
- Modified runSelfCommand’s signature and its internal command invocation to use exec.CommandContext for proper cancellation handling.
2025-06-22 10:33:04 +02:00
hakansa
3581648071 [client] Refactor showLoginURL to improve error handling and connection status checks (#4026)
This PR refactors showLoginURL to improve error handling and connection status checks by delaying the login fetch until user interaction and closing the pop-up if already connected.

- Moved s.login(false) call into the click handler to defer network I/O.
- Added a conn.Status check after opening the URL to skip reconnection if already connected.
- Enhanced error logs for missing verification URLs and service status failures.
2025-06-22 10:03:58 +02:00
Viktor Liu
2a51609436 [client] Handle lazy routing peers that are part of HA groups (#3943)
* Activate new lazy routing peers if the HA group is active
* Prevent lazy peers going to idle if HA group members are active (#3948)
2025-06-20 18:07:19 +02:00
Pascal Fischer
83457f8b99 [management] add transaction for integrated validator groups update and primary account update (#4014) 2025-06-20 12:13:24 +02:00
Pascal Fischer
4795e2fbc4 increment network serial on peer meta changed 2025-06-19 12:46:50 +02:00
Pascal Fischer
ec57c685a9 increment network serial on peer meta changed 2025-06-19 12:41:03 +02:00
Pascal Fischer
4b44b8c46c increment network serial 2025-06-19 12:14:26 +02:00
Pascal Fischer
0c7cac81f0 use update buffer instead of channel 2025-06-17 15:44:14 +02:00
Pascal Fischer
5e9ea122f7 limit channel to 2 messages and drop outdated if needed 2025-06-17 15:00:23 +02:00
41 changed files with 1063 additions and 298 deletions

View File

@@ -9,7 +9,7 @@ on:
pull_request: pull_request:
env: env:
SIGN_PIPE_VER: "v0.0.18" SIGN_PIPE_VER: "v0.0.19"
GORELEASER_VER: "v2.3.2" GORELEASER_VER: "v2.3.2"
PRODUCT_NAME: "NetBird" PRODUCT_NAME: "NetBird"
COPYRIGHT: "NetBird GmbH" COPYRIGHT: "NetBird GmbH"

View File

@@ -1,6 +1,9 @@
FROM alpine:3.21.3 FROM alpine:3.21.3
# iproute2: busybox doesn't display ip rules properly # iproute2: busybox doesn't display ip rules properly
RUN apk add --no-cache ca-certificates ip6tables iproute2 iptables RUN apk add --no-cache ca-certificates ip6tables iproute2 iptables
ARG NETBIRD_BINARY=netbird
COPY ${NETBIRD_BINARY} /usr/local/bin/netbird
ENV NB_FOREGROUND_MODE=true ENV NB_FOREGROUND_MODE=true
ENTRYPOINT [ "/usr/local/bin/netbird","up"] ENTRYPOINT [ "/usr/local/bin/netbird","up"]
COPY netbird /usr/local/bin/netbird

View File

@@ -1,6 +1,7 @@
FROM alpine:3.21.0 FROM alpine:3.21.0
COPY netbird /usr/local/bin/netbird ARG NETBIRD_BINARY=netbird
COPY ${NETBIRD_BINARY} /usr/local/bin/netbird
RUN apk add --no-cache ca-certificates \ RUN apk add --no-cache ca-certificates \
&& adduser -D -h /var/lib/netbird netbird && adduser -D -h /var/lib/netbird netbird

View File

@@ -83,7 +83,6 @@ func startManagement(t *testing.T, config *types.Config, testFile string) (*grpc
} }
t.Cleanup(cleanUp) t.Cleanup(cleanUp)
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{} eventStore := &activity.InMemoryEventStore{}
if err != nil { if err != nil {
return nil, nil return nil, nil
@@ -92,6 +91,9 @@ func startManagement(t *testing.T, config *types.Config, testFile string) (*grpc
metrics, err := telemetry.NewDefaultAppMetrics(context.Background()) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err) require.NoError(t, err)
peersUpdateManager := mgmt.NewPeersUpdateManager(metrics)
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)

View File

@@ -175,7 +175,7 @@ func (e *ConnMgr) AddPeerConn(ctx context.Context, peerKey string, conn *peer.Co
PeerConnID: conn.ConnID(), PeerConnID: conn.ConnID(),
Log: conn.Log, Log: conn.Log,
} }
excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg) excluded, err := e.lazyConnMgr.AddPeer(e.lazyCtx, lazyPeerCfg)
if err != nil { if err != nil {
conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err) conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err)
if err := conn.Open(ctx); err != nil { if err := conn.Open(ctx); err != nil {

View File

@@ -1456,16 +1456,16 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri
} }
t.Cleanup(cleanUp) t.Cleanup(cleanUp)
peersUpdateManager := server.NewPeersUpdateManager(nil) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
peersUpdateManager := server.NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{} eventStore := &activity.InMemoryEventStore{}
if err != nil { if err != nil {
return nil, "", err return nil, "", err
} }
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore) ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -68,3 +68,8 @@ func (i *Monitor) PauseTimer() {
func (i *Monitor) ResetTimer() { func (i *Monitor) ResetTimer() {
i.timer.Reset(i.inactivityThreshold) i.timer.Reset(i.inactivityThreshold)
} }
func (i *Monitor) ResetMonitor(ctx context.Context, timeoutChan chan peer.ConnID) {
i.Stop()
go i.Start(ctx, timeoutChan)
}

View File

@@ -58,7 +58,7 @@ type Manager struct {
// Route HA group management // Route HA group management
peerToHAGroups map[string][]route.HAUniqueID // peer ID -> HA groups they belong to peerToHAGroups map[string][]route.HAUniqueID // peer ID -> HA groups they belong to
haGroupToPeers map[route.HAUniqueID][]string // HA group -> peer IDs in the group haGroupToPeers map[route.HAUniqueID][]string // HA group -> peer IDs in the group
routesMu sync.RWMutex // protects route mappings routesMu sync.RWMutex
onInactive chan peerid.ConnID onInactive chan peerid.ConnID
} }
@@ -146,7 +146,7 @@ func (m *Manager) Start(ctx context.Context) {
case peerConnID := <-m.activityManager.OnActivityChan: case peerConnID := <-m.activityManager.OnActivityChan:
m.onPeerActivity(ctx, peerConnID) m.onPeerActivity(ctx, peerConnID)
case peerConnID := <-m.onInactive: case peerConnID := <-m.onInactive:
m.onPeerInactivityTimedOut(peerConnID) m.onPeerInactivityTimedOut(ctx, peerConnID)
} }
} }
} }
@@ -197,7 +197,7 @@ func (m *Manager) ExcludePeer(ctx context.Context, peerConfigs []lazyconn.PeerCo
return added return added
} }
func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) { func (m *Manager) AddPeer(ctx context.Context, peerCfg lazyconn.PeerConfig) (bool, error) {
m.managedPeersMu.Lock() m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock() defer m.managedPeersMu.Unlock()
@@ -225,6 +225,13 @@ func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) {
peerCfg: &peerCfg, peerCfg: &peerCfg,
expectedWatcher: watcherActivity, expectedWatcher: watcherActivity,
} }
// Check if this peer should be activated because its HA group peers are active
if group, ok := m.shouldActivateNewPeer(peerCfg.PublicKey); ok {
peerCfg.Log.Debugf("peer belongs to active HA group %s, will activate immediately", group)
m.activateNewPeerInActiveGroup(ctx, peerCfg)
}
return false, nil return false, nil
} }
@@ -315,36 +322,38 @@ func (m *Manager) activateSinglePeer(ctx context.Context, cfg *lazyconn.PeerConf
// activateHAGroupPeers activates all peers in HA groups that the given peer belongs to // activateHAGroupPeers activates all peers in HA groups that the given peer belongs to
func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string) { func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string) {
var peersToActivate []string
m.routesMu.RLock() m.routesMu.RLock()
haGroups := m.peerToHAGroups[triggerPeerID] haGroups := m.peerToHAGroups[triggerPeerID]
m.routesMu.RUnlock()
if len(haGroups) == 0 { if len(haGroups) == 0 {
m.routesMu.RUnlock()
log.Debugf("peer %s is not part of any HA groups", triggerPeerID) log.Debugf("peer %s is not part of any HA groups", triggerPeerID)
return return
} }
activatedCount := 0
for _, haGroup := range haGroups { for _, haGroup := range haGroups {
m.routesMu.RLock()
peers := m.haGroupToPeers[haGroup] peers := m.haGroupToPeers[haGroup]
m.routesMu.RUnlock()
for _, peerID := range peers { for _, peerID := range peers {
if peerID == triggerPeerID { if peerID != triggerPeerID {
continue peersToActivate = append(peersToActivate, peerID)
} }
}
}
m.routesMu.RUnlock()
cfg, mp := m.getPeerForActivation(peerID) activatedCount := 0
if cfg == nil { for _, peerID := range peersToActivate {
continue cfg, mp := m.getPeerForActivation(peerID)
} if cfg == nil {
continue
}
if m.activateSinglePeer(ctx, cfg, mp) { if m.activateSinglePeer(ctx, cfg, mp) {
activatedCount++ activatedCount++
cfg.Log.Infof("activated peer as part of HA group %s (triggered by %s)", haGroup, triggerPeerID) cfg.Log.Infof("activated peer as part of HA group (triggered by %s)", triggerPeerID)
m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey) m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey)
}
} }
} }
@@ -354,6 +363,51 @@ func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string
} }
} }
// shouldActivateNewPeer checks if a newly added peer should be activated
// because other peers in its HA groups are already active
func (m *Manager) shouldActivateNewPeer(peerID string) (route.HAUniqueID, bool) {
m.routesMu.RLock()
defer m.routesMu.RUnlock()
haGroups := m.peerToHAGroups[peerID]
if len(haGroups) == 0 {
return "", false
}
for _, haGroup := range haGroups {
peers := m.haGroupToPeers[haGroup]
for _, groupPeerID := range peers {
if groupPeerID == peerID {
continue
}
cfg, ok := m.managedPeers[groupPeerID]
if !ok {
continue
}
if mp, ok := m.managedPeersByConnID[cfg.PeerConnID]; ok && mp.expectedWatcher == watcherInactivity {
return haGroup, true
}
}
}
return "", false
}
// activateNewPeerInActiveGroup activates a newly added peer that should be active due to HA group
func (m *Manager) activateNewPeerInActiveGroup(ctx context.Context, peerCfg lazyconn.PeerConfig) {
mp, ok := m.managedPeersByConnID[peerCfg.PeerConnID]
if !ok {
return
}
if !m.activateSinglePeer(ctx, &peerCfg, mp) {
return
}
peerCfg.Log.Infof("activated newly added peer due to active HA group peers")
m.peerStore.PeerConnOpen(m.engineCtx, peerCfg.PublicKey)
}
func (m *Manager) addActivePeer(ctx context.Context, peerCfg lazyconn.PeerConfig) error { func (m *Manager) addActivePeer(ctx context.Context, peerCfg lazyconn.PeerConfig) error {
if _, ok := m.managedPeers[peerCfg.PublicKey]; ok { if _, ok := m.managedPeers[peerCfg.PublicKey]; ok {
peerCfg.Log.Warnf("peer already managed") peerCfg.Log.Warnf("peer already managed")
@@ -415,6 +469,48 @@ func (m *Manager) close() {
log.Infof("lazy connection manager closed") log.Infof("lazy connection manager closed")
} }
// shouldDeferIdleForHA checks if peer should stay connected due to HA group requirements
func (m *Manager) shouldDeferIdleForHA(peerID string) bool {
m.routesMu.RLock()
defer m.routesMu.RUnlock()
haGroups := m.peerToHAGroups[peerID]
if len(haGroups) == 0 {
return false
}
for _, haGroup := range haGroups {
groupPeers := m.haGroupToPeers[haGroup]
for _, groupPeerID := range groupPeers {
if groupPeerID == peerID {
continue
}
cfg, ok := m.managedPeers[groupPeerID]
if !ok {
continue
}
groupMp, ok := m.managedPeersByConnID[cfg.PeerConnID]
if !ok {
continue
}
if groupMp.expectedWatcher != watcherInactivity {
continue
}
// Other member is still connected, defer idle
if peer, ok := m.peerStore.PeerConn(groupPeerID); ok && peer.IsConnected() {
return true
}
}
}
return false
}
func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID) { func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID) {
m.managedPeersMu.Lock() m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock() defer m.managedPeersMu.Unlock()
@@ -441,7 +537,7 @@ func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID)
m.peerStore.PeerConnOpen(m.engineCtx, mp.peerCfg.PublicKey) m.peerStore.PeerConnOpen(m.engineCtx, mp.peerCfg.PublicKey)
} }
func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) { func (m *Manager) onPeerInactivityTimedOut(ctx context.Context, peerConnID peerid.ConnID) {
m.managedPeersMu.Lock() m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock() defer m.managedPeersMu.Unlock()
@@ -456,6 +552,17 @@ func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) {
return return
} }
if m.shouldDeferIdleForHA(mp.peerCfg.PublicKey) {
iw, ok := m.inactivityMonitors[peerConnID]
if ok {
mp.peerCfg.Log.Debugf("resetting inactivity timer due to HA group requirements")
iw.ResetMonitor(ctx, m.onInactive)
} else {
mp.peerCfg.Log.Errorf("inactivity monitor not found for HA defer reset")
}
return
}
mp.peerCfg.Log.Infof("connection timed out") mp.peerCfg.Log.Infof("connection timed out")
// this is blocking operation, potentially can be optimized // this is blocking operation, potentially can be optimized
@@ -489,7 +596,7 @@ func (m *Manager) onPeerConnected(peerConnID peerid.ConnID) {
iw, ok := m.inactivityMonitors[mp.peerCfg.PeerConnID] iw, ok := m.inactivityMonitors[mp.peerCfg.PeerConnID]
if !ok { if !ok {
mp.peerCfg.Log.Errorf("inactivity monitor not found for peer") mp.peerCfg.Log.Warnf("inactivity monitor not found for peer")
return return
} }

View File

@@ -317,12 +317,12 @@ func (conn *Conn) WgConfig() WgConfig {
return conn.config.WgConfig return conn.config.WgConfig
} }
// IsConnected unit tests only // IsConnected returns true if the peer is connected
// refactor unit test to use status recorder use refactor status recorded to manage connection status in peer.Conn
func (conn *Conn) IsConnected() bool { func (conn *Conn) IsConnected() bool {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
return conn.currentConnPriority != conntype.None
return conn.evalStatus() == StatusConnected
} }
func (conn *Conn) GetKey() string { func (conn *Conn) GetKey() string {

View File

@@ -1,8 +1,10 @@
<Wix <Wix
xmlns="http://wixtoolset.org/schemas/v4/wxs"> xmlns="http://wixtoolset.org/schemas/v4/wxs"
xmlns:util="http://wixtoolset.org/schemas/v4/wxs/util">
<Package Name="NetBird" Version="$(env.NETBIRD_VERSION)" Manufacturer="NetBird GmbH" Language="1033" UpgradeCode="6456ec4e-3ad6-4b9b-a2be-98e81cb21ccf" <Package Name="NetBird" Version="$(env.NETBIRD_VERSION)" Manufacturer="NetBird GmbH" Language="1033" UpgradeCode="6456ec4e-3ad6-4b9b-a2be-98e81cb21ccf"
InstallerVersion="500" Compressed="yes" Codepage="utf-8" > InstallerVersion="500" Compressed="yes" Codepage="utf-8" >
<MediaTemplate EmbedCab="yes" /> <MediaTemplate EmbedCab="yes" />
<Feature Id="NetbirdFeature" Title="Netbird" Level="1"> <Feature Id="NetbirdFeature" Title="Netbird" Level="1">
@@ -46,29 +48,10 @@
<ComponentRef Id="NetbirdFiles" /> <ComponentRef Id="NetbirdFiles" />
</ComponentGroup> </ComponentGroup>
<Property Id="cmd" Value="cmd.exe"/> <util:CloseApplication Id="CloseNetBird" CloseMessage="no" Target="netbird.exe" RebootPrompt="no" />
<util:CloseApplication Id="CloseNetBirdUI" CloseMessage="no" Target="netbird-ui.exe" RebootPrompt="no" />
<CustomAction Id="KillDaemon"
ExeCommand='/c "taskkill /im netbird.exe"'
Execute="deferred"
Property="cmd"
Impersonate="no"
Return="ignore"
/>
<CustomAction Id="KillUI"
ExeCommand='/c "taskkill /im netbird-ui.exe"'
Execute="deferred"
Property="cmd"
Impersonate="no"
Return="ignore"
/>
<InstallExecuteSequence>
<!-- For Uninstallation -->
<Custom Action="KillDaemon" Before="RemoveFiles" Condition="Installed"/>
<Custom Action="KillUI" After="KillDaemon" Condition="Installed"/>
</InstallExecuteSequence>
<!-- Icons --> <!-- Icons -->
<Icon Id="NetbirdIcon" SourceFile=".\client\ui\assets\netbird.ico" /> <Icon Id="NetbirdIcon" SourceFile=".\client\ui\assets\netbird.ico" />

View File

@@ -191,16 +191,16 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
} }
t.Cleanup(cleanUp) t.Cleanup(cleanUp)
peersUpdateManager := server.NewPeersUpdateManager(nil) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
peersUpdateManager := server.NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{} eventStore := &activity.InMemoryEventStore{}
if err != nil { if err != nil {
return nil, "", err return nil, "", err
} }
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore) ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -280,7 +280,7 @@ func newServiceClient(addr string, logFile string, a fyne.App, showSettings bool
showAdvancedSettings: showSettings, showAdvancedSettings: showSettings,
showNetworks: showNetworks, showNetworks: showNetworks,
update: version.NewUpdate(), update: version.NewUpdate("nb/client-ui"),
} }
s.eventHandler = newEventHandler(s) s.eventHandler = newEventHandler(s)
@@ -879,7 +879,7 @@ func (s *serviceClient) onUpdateAvailable() {
func (s *serviceClient) onSessionExpire() { func (s *serviceClient) onSessionExpire() {
s.sendNotification = true s.sendNotification = true
if s.sendNotification { if s.sendNotification {
s.eventHandler.runSelfCommand("login-url", "true") s.eventHandler.runSelfCommand(s.ctx, "login-url", "true")
s.sendNotification = false s.sendNotification = false
} }
} }
@@ -992,21 +992,6 @@ func (s *serviceClient) restartClient(loginRequest *proto.LoginRequest) error {
// showLoginURL creates a borderless window styled like a pop-up in the top-right corner using s.wLoginURL. // showLoginURL creates a borderless window styled like a pop-up in the top-right corner using s.wLoginURL.
func (s *serviceClient) showLoginURL() { func (s *serviceClient) showLoginURL() {
resp, err := s.login(false)
if err != nil {
log.Errorf("failed to fetch login URL: %v", err)
return
}
verificationURL := resp.VerificationURIComplete
if verificationURL == "" {
verificationURL = resp.VerificationURI
}
if verificationURL == "" {
log.Error("no verification URL provided in the login response")
return
}
resIcon := fyne.NewStaticResource("netbird.png", iconAbout) resIcon := fyne.NewStaticResource("netbird.png", iconAbout)
if s.wLoginURL == nil { if s.wLoginURL == nil {
@@ -1025,6 +1010,21 @@ func (s *serviceClient) showLoginURL() {
return return
} }
resp, err := s.login(false)
if err != nil {
log.Errorf("failed to fetch login URL: %v", err)
return
}
verificationURL := resp.VerificationURIComplete
if verificationURL == "" {
verificationURL = resp.VerificationURI
}
if verificationURL == "" {
log.Error("no verification URL provided in the login response")
return
}
if err := openURL(verificationURL); err != nil { if err := openURL(verificationURL); err != nil {
log.Errorf("failed to open login URL: %v", err) log.Errorf("failed to open login URL: %v", err)
return return
@@ -1038,7 +1038,19 @@ func (s *serviceClient) showLoginURL() {
} }
label.SetText("Re-authentication successful.\nReconnecting") label.SetText("Re-authentication successful.\nReconnecting")
time.Sleep(300 * time.Millisecond) status, err := conn.Status(s.ctx, &proto.StatusRequest{})
if err != nil {
log.Errorf("get service status: %v", err)
return
}
if status.Status == string(internal.StatusConnected) {
label.SetText("Already connected.\nClosing this window.")
time.Sleep(2 * time.Second)
s.wLoginURL.Close()
return
}
_, err = conn.Up(s.ctx, &proto.UpRequest{}) _, err = conn.Up(s.ctx, &proto.UpRequest{})
if err != nil { if err != nil {
label.SetText("Reconnecting failed, please create \na debug bundle in the settings and contact support.") label.SetText("Reconnecting failed, please create \na debug bundle in the settings and contact support.")

View File

@@ -122,7 +122,7 @@ func (h *eventHandler) handleAdvancedSettingsClick() {
go func() { go func() {
defer h.client.mAdvancedSettings.Enable() defer h.client.mAdvancedSettings.Enable()
defer h.client.getSrvConfig() defer h.client.getSrvConfig()
h.runSelfCommand("settings", "true") h.runSelfCommand(h.client.ctx, "settings", "true")
}() }()
} }
@@ -130,7 +130,7 @@ func (h *eventHandler) handleCreateDebugBundleClick() {
h.client.mCreateDebugBundle.Disable() h.client.mCreateDebugBundle.Disable()
go func() { go func() {
defer h.client.mCreateDebugBundle.Enable() defer h.client.mCreateDebugBundle.Enable()
h.runSelfCommand("debug", "true") h.runSelfCommand(h.client.ctx, "debug", "true")
}() }()
} }
@@ -154,7 +154,7 @@ func (h *eventHandler) handleNetworksClick() {
h.client.mNetworks.Disable() h.client.mNetworks.Disable()
go func() { go func() {
defer h.client.mNetworks.Enable() defer h.client.mNetworks.Enable()
h.runSelfCommand("networks", "true") h.runSelfCommand(h.client.ctx, "networks", "true")
}() }()
} }
@@ -172,14 +172,14 @@ func (h *eventHandler) updateConfigWithErr() {
} }
} }
func (h *eventHandler) runSelfCommand(command, arg string) { func (h *eventHandler) runSelfCommand(ctx context.Context, command, arg string) {
proc, err := os.Executable() proc, err := os.Executable()
if err != nil { if err != nil {
log.Errorf("error getting executable path: %v", err) log.Errorf("error getting executable path: %v", err)
return return
} }
cmd := exec.Command(proc, cmd := exec.CommandContext(ctx, proc,
fmt.Sprintf("--%s=%s", command, arg), fmt.Sprintf("--%s=%s", command, arg),
fmt.Sprintf("--daemon-addr=%s", h.client.addr), fmt.Sprintf("--daemon-addr=%s", h.client.addr),
) )

View File

@@ -60,7 +60,7 @@ NETBIRD_TOKEN_SOURCE=${NETBIRD_TOKEN_SOURCE:-accessToken}
NETBIRD_AUTH_PKCE_REDIRECT_URL_PORTS=${NETBIRD_AUTH_PKCE_REDIRECT_URL_PORTS:-"53000"} NETBIRD_AUTH_PKCE_REDIRECT_URL_PORTS=${NETBIRD_AUTH_PKCE_REDIRECT_URL_PORTS:-"53000"}
NETBIRD_AUTH_PKCE_USE_ID_TOKEN=${NETBIRD_AUTH_PKCE_USE_ID_TOKEN:-false} NETBIRD_AUTH_PKCE_USE_ID_TOKEN=${NETBIRD_AUTH_PKCE_USE_ID_TOKEN:-false}
NETBIRD_AUTH_PKCE_DISABLE_PROMPT_LOGIN=${NETBIRD_AUTH_PKCE_DISABLE_PROMPT_LOGIN:-false} NETBIRD_AUTH_PKCE_DISABLE_PROMPT_LOGIN=${NETBIRD_AUTH_PKCE_DISABLE_PROMPT_LOGIN:-false}
NETBIRD_AUTH_PKCE_LOGIN_FLAG=${NETBIRD_AUTH_PKCE_LOGIN_FLAG:-1} NETBIRD_AUTH_PKCE_LOGIN_FLAG=${NETBIRD_AUTH_PKCE_LOGIN_FLAG:-0}
NETBIRD_AUTH_PKCE_AUDIENCE=$NETBIRD_AUTH_AUDIENCE NETBIRD_AUTH_PKCE_AUDIENCE=$NETBIRD_AUTH_AUDIENCE
# Dashboard # Dashboard

View File

@@ -68,13 +68,13 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
} }
t.Cleanup(cleanUp) t.Cleanup(cleanUp)
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background()) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err) require.NoError(t, err)
peersUpdateManager := mgmt.NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{}
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -357,6 +357,13 @@ var (
log.WithContext(ctx).Infof("running HTTP server and gRPC server on the same port: %s", listener.Addr().String()) log.WithContext(ctx).Infof("running HTTP server and gRPC server on the same port: %s", listener.Addr().String())
serveGRPCWithHTTP(ctx, listener, rootHandler, tlsEnabled) serveGRPCWithHTTP(ctx, listener, rootHandler, tlsEnabled)
update := version.NewUpdate("nb/management")
update.SetDaemonVersion(version.NetbirdVersion())
update.SetOnUpdateListener(func() {
log.WithContext(ctx).Infof("your management version, \"%s\", is outdated, a new management version is available. Learn more here: https://github.com/netbirdio/netbird/releases", version.NetbirdVersion())
})
defer update.StopWatch()
SetupCloseHandler() SetupCloseHandler()
<-stopCh <-stopCh

View File

@@ -1624,6 +1624,10 @@ func (am *DefaultAccountManager) GetDNSDomain(settings *types.Settings) string {
func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) { func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) {
log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID) log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID)
err := am.Store.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
log.Errorf("failed to increment network serial number for account %s: %v", accountID, err)
}
am.BufferUpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
@@ -1853,40 +1857,49 @@ func (am *DefaultAccountManager) GetOrCreateAccountByPrivateDomain(ctx context.C
} }
func (am *DefaultAccountManager) UpdateToPrimaryAccount(ctx context.Context, accountId string) (*types.Account, error) { func (am *DefaultAccountManager) UpdateToPrimaryAccount(ctx context.Context, accountId string) (*types.Account, error) {
account, err := am.Store.GetAccount(ctx, accountId) var account *types.Account
err := am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
var err error
account, err = transaction.GetAccount(ctx, accountId)
if err != nil {
return err
}
if account.IsDomainPrimaryAccount {
return nil
}
existingPrimaryAccountID, err := transaction.GetAccountIDByPrivateDomain(ctx, store.LockingStrengthShare, account.Domain)
// error is not a not found error
if handleNotFound(err) != nil {
return err
}
// a primary account already exists for this private domain
if err == nil {
log.WithContext(ctx).WithFields(log.Fields{
"accountId": accountId,
"existingAccountId": existingPrimaryAccountID,
}).Errorf("cannot update account to primary, another account already exists as primary for the same domain")
return status.Errorf(status.Internal, "cannot update account to primary")
}
account.IsDomainPrimaryAccount = true
if err := transaction.SaveAccount(ctx, account); err != nil {
log.WithContext(ctx).WithFields(log.Fields{
"accountId": accountId,
}).Errorf("failed to update account to primary: %v", err)
return status.Errorf(status.Internal, "failed to update account to primary")
}
return nil
})
if err != nil { if err != nil {
return nil, err return nil, err
} }
if account.IsDomainPrimaryAccount {
return account, nil
}
existingPrimaryAccountID, err := am.Store.GetAccountIDByPrivateDomain(ctx, store.LockingStrengthShare, account.Domain)
// error is not a not found error
if handleNotFound(err) != nil {
return nil, err
}
// a primary account already exists for this private domain
if err == nil {
log.WithContext(ctx).WithFields(log.Fields{
"accountId": accountId,
"existingAccountId": existingPrimaryAccountID,
}).Errorf("cannot update account to primary, another account already exists as primary for the same domain")
return nil, status.Errorf(status.Internal, "cannot update account to primary")
}
account.IsDomainPrimaryAccount = true
if err := am.Store.SaveAccount(ctx, account); err != nil {
log.WithContext(ctx).WithFields(log.Fields{
"accountId": accountId,
}).Errorf("failed to update account to primary: %v", err)
return nil, status.Errorf(status.Internal, "failed to update account to primary")
}
return account, nil return account, nil
} }

View File

@@ -17,6 +17,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric/noop"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
nbdns "github.com/netbirdio/netbird/dns" nbdns "github.com/netbirdio/netbird/dns"
@@ -1186,7 +1187,10 @@ func TestAccountManager_NetworkUpdates_SaveGroup(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
message := <-updMsg message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap() networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 2 { if len(networkMap.RemotePeers) != 2 {
t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers)) t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers))
@@ -1213,7 +1217,10 @@ func TestAccountManager_NetworkUpdates_DeletePolicy(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
message := <-updMsg message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap() networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 0 { if len(networkMap.RemotePeers) != 0 {
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers)) t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
@@ -1249,7 +1256,10 @@ func TestAccountManager_NetworkUpdates_SavePolicy(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
message := <-updMsg message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap() networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 2 { if len(networkMap.RemotePeers) != 2 {
t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers)) t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers))
@@ -1314,7 +1324,10 @@ func TestAccountManager_NetworkUpdates_DeletePeer(t *testing.T) {
go func() { go func() {
defer wg.Done() defer wg.Done()
message := <-updMsg message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap() networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 1 { if len(networkMap.RemotePeers) != 1 {
t.Errorf("mismatch peers count: 1 expected, got %v", len(networkMap.RemotePeers)) t.Errorf("mismatch peers count: 1 expected, got %v", len(networkMap.RemotePeers))
@@ -1365,15 +1378,24 @@ func TestAccountManager_NetworkUpdates_DeleteGroup(t *testing.T) {
return return
} }
// emptying buffer of previous changes
_, _ = updMsg.Pop(context.Background())
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
message := <-updMsg // expecting 2 messages (policy delete and group delete)
networkMap := message.Update.GetNetworkMap() for i := 0; i < 1; i++ {
if len(networkMap.RemotePeers) != 0 { message, ok := updMsg.Pop(context.Background())
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers)) if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 0 {
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
}
} }
}() }()
@@ -2879,7 +2901,7 @@ func createManager(t testing.TB) (*DefaultAccountManager, error) {
permissionsManager := permissions.NewManager(store) permissionsManager := permissions.NewManager(store)
manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager) manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -2960,23 +2982,58 @@ func setupNetworkMapTest(t *testing.T) (*DefaultAccountManager, *types.Account,
return manager, account, peer1, peer2, peer3 return manager, account, peer1, peer2, peer3
} }
func peerShouldNotReceiveUpdate(t *testing.T, updateMessage <-chan *UpdateMessage) { func peerShouldNotReceiveUpdate(t *testing.T, buffer *UpdateBuffer) {
t.Helper() t.Helper()
resultCh := make(chan struct {
msg *UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *UpdateMessage
ok bool
}{msg, ok}
}()
select { select {
case msg := <-updateMessage: case msg := <-resultCh:
if !msg.ok {
t.Errorf("Update message channel closed unexpectedly")
}
t.Errorf("Unexpected message received: %+v", msg) t.Errorf("Unexpected message received: %+v", msg)
case <-time.After(500 * time.Millisecond): case <-time.After(500 * time.Millisecond):
return return
} }
} }
func peerShouldReceiveUpdate(t *testing.T, updateMessage <-chan *UpdateMessage) { func peerShouldReceiveUpdate(t *testing.T, buffer *UpdateBuffer) {
t.Helper() t.Helper()
resultCh := make(chan struct {
msg *UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *UpdateMessage
ok bool
}{msg, ok}
}()
select { select {
case msg := <-updateMessage: case msg := <-resultCh:
if msg == nil { if !msg.ok {
t.Errorf("Update message channel closed unexpectedly")
return
}
if msg.msg == nil {
t.Errorf("Received nil update message, expected valid message") t.Errorf("Received nil update message, expected valid message")
return
} }
case <-time.After(500 * time.Millisecond): case <-time.After(500 * time.Millisecond):
t.Error("Timed out waiting for update message") t.Error("Timed out waiting for update message")
@@ -3017,9 +3074,13 @@ func BenchmarkSyncAndMarkPeer(b *testing.B) {
if err != nil { if err != nil {
b.Fatalf("Failed to get account: %v", err) b.Fatalf("Failed to get account: %v", err)
} }
peerChannels := make(map[string]chan *UpdateMessage) peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
b.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers { for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize) peerChannels[peerID] = NewUpdateBuffer(metrics)
} }
manager.peersUpdateManager.peerChannels = peerChannels manager.peersUpdateManager.peerChannels = peerChannels
@@ -3085,9 +3146,13 @@ func BenchmarkLoginPeer_ExistingPeer(b *testing.B) {
if err != nil { if err != nil {
b.Fatalf("Failed to get account: %v", err) b.Fatalf("Failed to get account: %v", err)
} }
peerChannels := make(map[string]chan *UpdateMessage) peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
b.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers { for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize) peerChannels[peerID] = NewUpdateBuffer(metrics)
} }
manager.peersUpdateManager.peerChannels = peerChannels manager.peersUpdateManager.peerChannels = peerChannels
@@ -3160,9 +3225,13 @@ func BenchmarkLoginPeer_NewPeer(b *testing.B) {
if err != nil { if err != nil {
b.Fatalf("Failed to get account: %v", err) b.Fatalf("Failed to get account: %v", err)
} }
peerChannels := make(map[string]chan *UpdateMessage) peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
b.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers { for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize) peerChannels[peerID] = NewUpdateBuffer(metrics)
} }
manager.peersUpdateManager.peerChannels = peerChannels manager.peersUpdateManager.peerChannels = peerChannels

View File

@@ -217,7 +217,7 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) {
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(store) permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager) return BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
} }
func createDNSStore(t *testing.T) (store.Store, error) { func createDNSStore(t *testing.T) (store.Store, error) {
@@ -507,13 +507,12 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
}, true) }, true)
assert.NoError(t, err) assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
// Saving DNS settings with groups that have no peers should not trigger updates to account peers or send peer updates // Saving DNS settings with groups that have no peers should not trigger updates to account peers or send peer updates
t.Run("saving dns setting with unused groups", func(t *testing.T) { t.Run("saving dns setting with unused groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -534,6 +533,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Creating DNS settings with groups that have no peers should not update account peers or send peer update // Creating DNS settings with groups that have no peers should not update account peers or send peer update
t.Run("creating dns setting with unused groups", func(t *testing.T) { t.Run("creating dns setting with unused groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -560,6 +563,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Creating DNS settings with groups that have peers should update account peers and send peer update // Creating DNS settings with groups that have peers should update account peers and send peer update
t.Run("creating dns setting with used groups", func(t *testing.T) { t.Run("creating dns setting with used groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
err = manager.SaveGroup(context.Background(), account.Id, userID, &types.Group{ err = manager.SaveGroup(context.Background(), account.Id, userID, &types.Group{
ID: "groupA", ID: "groupA",
Name: "GroupA", Name: "GroupA",
@@ -567,6 +574,8 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
}, true) }, true)
assert.NoError(t, err) assert.NoError(t, err)
_, _ = updMsg.Pop(context.Background())
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -593,6 +602,18 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Saving DNS settings with groups that have peers should update account peers and send peer update // Saving DNS settings with groups that have peers should update account peers and send peer update
t.Run("saving dns setting with used groups", func(t *testing.T) { t.Run("saving dns setting with used groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
err = manager.SaveGroup(context.Background(), account.Id, userID, &types.Group{
ID: "groupA",
Name: "GroupA",
Peers: []string{peer1.ID, peer2.ID, peer3.ID},
}, true)
assert.NoError(t, err)
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -613,6 +634,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Removing group with no peers from DNS settings should not trigger updates to account peers or send peer updates // Removing group with no peers from DNS settings should not trigger updates to account peers or send peer updates
t.Run("removing group with no peers from dns settings", func(t *testing.T) { t.Run("removing group with no peers from dns settings", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -633,6 +658,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Removing group with peers from DNS settings should trigger updates to account peers and send peer updates // Removing group with peers from DNS settings should trigger updates to account peers and send peer updates
t.Run("removing group with peers from dns settings", func(t *testing.T) { t.Run("removing group with peers from dns settings", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)

View File

@@ -429,13 +429,12 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
}, true) }, true)
assert.NoError(t, err) assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
// Saving a group that is not linked to any resource should not update account peers // Saving a group that is not linked to any resource should not update account peers
t.Run("saving unlinked group", func(t *testing.T) { t.Run("saving unlinked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -459,6 +458,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Adding a peer to a group that is not linked to any resource should not update account peers // Adding a peer to a group that is not linked to any resource should not update account peers
// and not send peer update // and not send peer update
t.Run("adding peer to unlinked group", func(t *testing.T) { t.Run("adding peer to unlinked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -478,6 +481,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Removing a peer from a group that is not linked to any resource should not update account peers // Removing a peer from a group that is not linked to any resource should not update account peers
// and not send peer update // and not send peer update
t.Run("removing peer from unliked group", func(t *testing.T) { t.Run("removing peer from unliked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -496,6 +503,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Deleting group should not update account peers and not send peer update // Deleting group should not update account peers and not send peer update
t.Run("deleting group", func(t *testing.T) { t.Run("deleting group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -529,6 +540,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to policy should update account peers and send peer update // Saving a group linked to policy should update account peers and send peer update
t.Run("saving linked group to policy", func(t *testing.T) { t.Run("saving linked group to policy", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -551,6 +566,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// adding peer to a used group should update account peers and send peer update // adding peer to a used group should update account peers and send peer update
t.Run("adding peer to linked group", func(t *testing.T) { t.Run("adding peer to linked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -569,6 +588,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// removing peer from a linked group should update account peers and send peer update // removing peer from a linked group should update account peers and send peer update
t.Run("removing peer from linked group", func(t *testing.T) { t.Run("removing peer from linked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -587,6 +610,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to name server group should update account peers and send peer update // Saving a group linked to name server group should update account peers and send peer update
t.Run("saving group linked to name server group", func(t *testing.T) { t.Run("saving group linked to name server group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.CreateNameServerGroup( _, err = manager.CreateNameServerGroup(
context.Background(), account.Id, "nsGroup", "nsGroup", []nbdns.NameServer{{ context.Background(), account.Id, "nsGroup", "nsGroup", []nbdns.NameServer{{
IP: netip.MustParseAddr("1.1.1.1"), IP: netip.MustParseAddr("1.1.1.1"),
@@ -620,6 +647,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to route should update account peers and send peer update // Saving a group linked to route should update account peers and send peer update
t.Run("saving group linked to route", func(t *testing.T) { t.Run("saving group linked to route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
newRoute := route.Route{ newRoute := route.Route{
ID: "route", ID: "route",
Network: netip.MustParsePrefix("192.168.0.0/16"), Network: netip.MustParsePrefix("192.168.0.0/16"),
@@ -661,6 +692,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to dns settings should update account peers and send peer update // Saving a group linked to dns settings should update account peers and send peer update
t.Run("saving group linked to dns settings", func(t *testing.T) { t.Run("saving group linked to dns settings", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
err := manager.SaveDNSSettings(context.Background(), account.Id, userID, &types.DNSSettings{ err := manager.SaveDNSSettings(context.Background(), account.Id, userID, &types.DNSSettings{
DisabledManagementGroups: []string{"groupD"}, DisabledManagementGroups: []string{"groupD"},
}) })
@@ -688,6 +723,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to network router should update account peers and send peer update // Saving a group linked to network router should update account peers and send peer update
t.Run("saving group linked to network router", func(t *testing.T) { t.Run("saving group linked to network router", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
permissionsManager := permissions.NewManager(manager.Store) permissionsManager := permissions.NewManager(manager.Store)
groupsManager := groups.NewManager(manager.Store, permissionsManager, manager) groupsManager := groups.NewManager(manager.Store, permissionsManager, manager)
resourcesManager := resources.NewManager(manager.Store, permissionsManager, groupsManager, manager) resourcesManager := resources.NewManager(manager.Store, permissionsManager, groupsManager, manager)

View File

@@ -184,7 +184,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
return err return err
} }
updates := s.peersUpdateManager.CreateChannel(ctx, peer.ID) updateBuffer := s.peersUpdateManager.CreateChannel(ctx, peer.ID)
s.ephemeralManager.OnPeerConnected(ctx, peer) s.ephemeralManager.OnPeerConnected(ctx, peer)
@@ -199,37 +199,24 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart)) log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart))
return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv) return s.handleUpdates(ctx, accountID, peerKey, peer, updateBuffer, srv)
} }
// handleUpdates sends updates to the connected peer until the updates channel is closed. // handleUpdates sends updates to the connected peer until the updates channel is closed.
func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *UpdateMessage, srv proto.ManagementService_SyncServer) error { func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates *UpdateBuffer, srv proto.ManagementService_SyncServer) error {
log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String()) log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String())
for { for {
select { update, ok := updates.Pop(ctx)
// condition when there are some updates if !ok {
case update, open := <-updates: log.WithContext(ctx).Debugf("update buffer for peer %s closed", peerKey.String())
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().UpdateChannelQueueLength(len(updates) + 1)
}
if !open {
log.WithContext(ctx).Debugf("updates channel for peer %s was closed", peerKey.String())
s.cancelPeerRoutines(ctx, accountID, peer)
return nil
}
log.WithContext(ctx).Debugf("received an update for peer %s", peerKey.String())
if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
return err
}
// condition when client <-> server connection has been terminated
case <-srv.Context().Done():
// happens when connection drops, e.g. client disconnects
log.WithContext(ctx).Debugf("stream of peer %s has been closed", peerKey.String())
s.cancelPeerRoutines(ctx, accountID, peer) s.cancelPeerRoutines(ctx, accountID, peer)
return srv.Context().Err() return nil
}
log.WithContext(ctx).Debugf("sending latest update to peer %s", peerKey.String())
if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
return err
} }
} }
} }

View File

@@ -118,7 +118,7 @@ func BuildApiBlackBoxWithDBState(t TB, sqlFile string, expectedPeerUpdate *serve
t.Fatalf("Failed to create metrics: %v", err) t.Fatalf("Failed to create metrics: %v", err)
} }
peersUpdateManager := server.NewPeersUpdateManager(nil) peersUpdateManager := server.NewPeersUpdateManager(metrics)
updMsg := peersUpdateManager.CreateChannel(context.Background(), TestPeerId) updMsg := peersUpdateManager.CreateChannel(context.Background(), TestPeerId)
done := make(chan struct{}) done := make(chan struct{})
if validateUpdate { if validateUpdate {
@@ -166,24 +166,54 @@ func BuildApiBlackBoxWithDBState(t TB, sqlFile string, expectedPeerUpdate *serve
return apiHandler, am, done return apiHandler, am, done
} }
func peerShouldNotReceiveUpdate(t TB, updateMessage <-chan *server.UpdateMessage) { func peerShouldNotReceiveUpdate(t TB, buffer *server.UpdateBuffer) {
t.Helper() t.Helper()
resultCh := make(chan struct {
msg *server.UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *server.UpdateMessage
ok bool
}{msg, ok}
}()
select { select {
case msg := <-updateMessage: case msg := <-resultCh:
t.Errorf("Unexpected message received: %+v", msg) t.Errorf("Unexpected message received: %+v", msg)
case <-time.After(500 * time.Millisecond): case <-time.After(500 * time.Millisecond):
return return
} }
} }
func peerShouldReceiveUpdate(t TB, updateMessage <-chan *server.UpdateMessage, expected *server.UpdateMessage) { func peerShouldReceiveUpdate(t TB, buffer *server.UpdateBuffer, expected *server.UpdateMessage) {
t.Helper() t.Helper()
resultCh := make(chan struct {
msg *server.UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *server.UpdateMessage
ok bool
}{msg, ok}
}()
select { select {
case msg := <-updateMessage: case msg := <-resultCh:
if msg == nil { if msg.msg == nil {
t.Errorf("Received nil update message, expected valid message") t.Errorf("Received nil update message, expected valid message")
} }
if !msg.ok {
t.Errorf("Expected to receive an update message, but got ok = false")
}
assert.Equal(t, expected, msg) assert.Equal(t, expected, msg)
case <-time.After(500 * time.Millisecond): case <-time.After(500 * time.Millisecond):
t.Errorf("Timed out waiting for update message") t.Errorf("Timed out waiting for update message")

View File

@@ -37,21 +37,23 @@ func (am *DefaultAccountManager) UpdateIntegratedValidatorGroups(ctx context.Con
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID) unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock() defer unlock()
a, err := am.Store.GetAccountByUser(ctx, userID) return am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
if err != nil { a, err := transaction.GetAccountByUser(ctx, userID)
return err if err != nil {
} return err
}
var extra *types.ExtraSettings var extra *types.ExtraSettings
if a.Settings.Extra != nil { if a.Settings.Extra != nil {
extra = a.Settings.Extra extra = a.Settings.Extra
} else { } else {
extra = &types.ExtraSettings{} extra = &types.ExtraSettings{}
a.Settings.Extra = extra a.Settings.Extra = extra
} }
extra.IntegratedValidatorGroups = groups extra.IntegratedValidatorGroups = groups
return am.Store.SaveAccount(ctx, a) return transaction.SaveAccount(ctx, a)
})
} }
func (am *DefaultAccountManager) GroupValidation(ctx context.Context, accountID string, groupIDs []string) (bool, error) { func (am *DefaultAccountManager) GroupValidation(ctx context.Context, accountID string, groupIDs []string) (bool, error) {
@@ -81,15 +83,12 @@ func (am *DefaultAccountManager) GetValidatedPeers(ctx context.Context, accountI
var peers []*nbpeer.Peer var peers []*nbpeer.Peer
var settings *types.Settings var settings *types.Settings
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error { groups, err = am.Store.GetAccountGroups(ctx, store.LockingStrengthShare, accountID)
groups, err = transaction.GetAccountGroups(ctx, store.LockingStrengthShare, accountID) if err != nil {
if err != nil { return nil, err
return err }
}
peers, err = transaction.GetAccountPeers(ctx, store.LockingStrengthShare, accountID, "", "") peers, err = am.Store.GetAccountPeers(ctx, store.LockingStrengthShare, accountID, "", "")
return err
})
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -424,14 +424,14 @@ func startManagementForTest(t *testing.T, testFile string, config *types.Config)
t.Fatal(err) t.Fatal(err)
} }
peersUpdateManager := NewPeersUpdateManager(nil) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
peersUpdateManager := NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{} eventStore := &activity.InMemoryEventStore{}
ctx := context.WithValue(context.Background(), hook.ExecutionContextKey, hook.SystemSource) //nolint:staticcheck ctx := context.WithValue(context.Background(), hook.ExecutionContextKey, hook.SystemSource) //nolint:staticcheck
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -173,14 +173,14 @@ func startServer(
log.Fatalf("failed creating a store: %s: %v", config.Datadir, err) log.Fatalf("failed creating a store: %s: %v", config.Datadir, err)
} }
peersUpdateManager := server.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
metrics, err := telemetry.NewDefaultAppMetrics(context.Background()) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil { if err != nil {
t.Fatalf("failed creating metrics: %v", err) t.Fatalf("failed creating metrics: %v", err)
} }
peersUpdateManager := server.NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{}
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -184,7 +184,9 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
ephemeralPeersSKs int ephemeralPeersSKs int
ephemeralPeersSKUsage int ephemeralPeersSKUsage int
activePeersLastDay int activePeersLastDay int
activeUserPeersLastDay int
osPeers map[string]int osPeers map[string]int
activeUsersLastDay map[string]struct{}
userPeers int userPeers int
rules int rules int
rulesProtocol map[string]int rulesProtocol map[string]int
@@ -203,6 +205,7 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
version string version string
peerActiveVersions []string peerActiveVersions []string
osUIClients map[string]int osUIClients map[string]int
rosenpassEnabled int
) )
start := time.Now() start := time.Now()
metricsProperties := make(properties) metricsProperties := make(properties)
@@ -210,6 +213,7 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
osUIClients = make(map[string]int) osUIClients = make(map[string]int)
rulesProtocol = make(map[string]int) rulesProtocol = make(map[string]int)
rulesDirection = make(map[string]int) rulesDirection = make(map[string]int)
activeUsersLastDay = make(map[string]struct{})
uptime = time.Since(w.startupTime).Seconds() uptime = time.Since(w.startupTime).Seconds()
connections := w.connManager.GetAllConnectedPeers() connections := w.connManager.GetAllConnectedPeers()
version = nbversion.NetbirdVersion() version = nbversion.NetbirdVersion()
@@ -277,10 +281,14 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
for _, peer := range account.Peers { for _, peer := range account.Peers {
peers++ peers++
if peer.SSHEnabled { if peer.SSHEnabled || peer.Meta.Flags.ServerSSHAllowed {
peersSSHEnabled++ peersSSHEnabled++
} }
if peer.Meta.Flags.RosenpassEnabled {
rosenpassEnabled++
}
if peer.UserID != "" { if peer.UserID != "" {
userPeers++ userPeers++
} }
@@ -299,6 +307,10 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
_, connected := connections[peer.ID] _, connected := connections[peer.ID]
if connected || peer.Status.LastSeen.After(w.lastRun) { if connected || peer.Status.LastSeen.After(w.lastRun) {
activePeersLastDay++ activePeersLastDay++
if peer.UserID != "" {
activeUserPeersLastDay++
activeUsersLastDay[peer.UserID] = struct{}{}
}
osActiveKey := osKey + "_active" osActiveKey := osKey + "_active"
osActiveCount := osPeers[osActiveKey] osActiveCount := osPeers[osActiveKey]
osPeers[osActiveKey] = osActiveCount + 1 osPeers[osActiveKey] = osActiveCount + 1
@@ -320,6 +332,8 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
metricsProperties["ephemeral_peers_setup_keys"] = ephemeralPeersSKs metricsProperties["ephemeral_peers_setup_keys"] = ephemeralPeersSKs
metricsProperties["ephemeral_peers_setup_keys_usage"] = ephemeralPeersSKUsage metricsProperties["ephemeral_peers_setup_keys_usage"] = ephemeralPeersSKUsage
metricsProperties["active_peers_last_day"] = activePeersLastDay metricsProperties["active_peers_last_day"] = activePeersLastDay
metricsProperties["active_user_peers_last_day"] = activeUserPeersLastDay
metricsProperties["active_users_last_day"] = len(activeUsersLastDay)
metricsProperties["user_peers"] = userPeers metricsProperties["user_peers"] = userPeers
metricsProperties["rules"] = rules metricsProperties["rules"] = rules
metricsProperties["rules_with_src_posture_checks"] = rulesWithSrcPostureChecks metricsProperties["rules_with_src_posture_checks"] = rulesWithSrcPostureChecks
@@ -338,6 +352,7 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
metricsProperties["ui_clients"] = uiClient metricsProperties["ui_clients"] = uiClient
metricsProperties["idp_manager"] = w.idpManager metricsProperties["idp_manager"] = w.idpManager
metricsProperties["store_engine"] = w.dataSource.GetStoreEngine() metricsProperties["store_engine"] = w.dataSource.GetStoreEngine()
metricsProperties["rosenpass_enabled"] = rosenpassEnabled
for protocol, count := range rulesProtocol { for protocol, count := range rulesProtocol {
metricsProperties["rules_protocol_"+protocol] = count metricsProperties["rules_protocol_"+protocol] = count

View File

@@ -47,8 +47,8 @@ func (mockDatasource) GetAllAccounts(_ context.Context) []*types.Account {
"1": { "1": {
ID: "1", ID: "1",
UserID: "test", UserID: "test",
SSHEnabled: true, SSHEnabled: false,
Meta: nbpeer.PeerSystemMeta{GoOS: "linux", WtVersion: "0.0.1"}, Meta: nbpeer.PeerSystemMeta{GoOS: "linux", WtVersion: "0.0.1", Flags: nbpeer.Flags{ServerSSHAllowed: true, RosenpassEnabled: true}},
}, },
}, },
Policies: []*types.Policy{ Policies: []*types.Policy{
@@ -312,7 +312,19 @@ func TestGenerateProperties(t *testing.T) {
} }
if properties["posture_checks"] != 2 { if properties["posture_checks"] != 2 {
t.Errorf("expected 1 posture_checks, got %d", properties["posture_checks"]) t.Errorf("expected 2 posture_checks, got %d", properties["posture_checks"])
}
if properties["rosenpass_enabled"] != 1 {
t.Errorf("expected 1 rosenpass_enabled, got %d", properties["rosenpass_enabled"])
}
if properties["active_user_peers_last_day"] != 2 {
t.Errorf("expected 2 active_user_peers_last_day, got %d", properties["active_user_peers_last_day"])
}
if properties["active_users_last_day"] != 1 {
t.Errorf("expected 1 active_users_last_day, got %d", properties["active_users_last_day"])
} }
} }

View File

@@ -779,7 +779,7 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) {
t.Cleanup(ctrl.Finish) t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(store) permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager) return BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
} }
func createNSStore(t *testing.T) (store.Store, error) { func createNSStore(t *testing.T) (store.Store, error) {
@@ -988,14 +988,14 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
}, true) }, true)
assert.NoError(t, err) assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
// Creating a nameserver group with a distribution group no peers should not update account peers // Creating a nameserver group with a distribution group no peers should not update account peers
// and not send peer update // and not send peer update
t.Run("creating nameserver group with distribution group no peers", func(t *testing.T) { t.Run("creating nameserver group with distribution group no peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -1023,6 +1023,11 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
// saving a nameserver group with a distribution group with no peers should not update account peers // saving a nameserver group with a distribution group with no peers should not update account peers
// and not send peer update // and not send peer update
t.Run("saving nameserver group with distribution group no peers", func(t *testing.T) { t.Run("saving nameserver group with distribution group no peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -1041,6 +1046,11 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
// Creating a nameserver group with a distribution group no peers should update account peers and send peer update // Creating a nameserver group with a distribution group no peers should update account peers and send peer update
t.Run("creating nameserver group with distribution group has peers", func(t *testing.T) { t.Run("creating nameserver group with distribution group has peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1067,6 +1077,10 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
// saving a nameserver group with a distribution group with peers should update account peers and send peer update // saving a nameserver group with a distribution group with peers should update account peers and send peer update
t.Run("saving nameserver group with distribution group has peers", func(t *testing.T) { t.Run("saving nameserver group with distribution group has peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1097,6 +1111,10 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
// Deleting a nameserver group should update account peers and send peer update // Deleting a nameserver group should update account peers and send peer update
t.Run("deleting nameserver group", func(t *testing.T) { t.Run("deleting nameserver group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)

View File

@@ -144,6 +144,10 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
if expired { if expired {
// we need to update other peers because when peer login expires all other peers are notified to disconnect from // we need to update other peers because when peer login expires all other peers are notified to disconnect from
// the expired one. Here we notify them that connection is now allowed again. // the expired one. Here we notify them that connection is now allowed again.
err := am.Store.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
log.Errorf("failed to increment network serial number for account %s: %v", accountID, err)
}
am.BufferUpdateAccountPeers(ctx, accountID) am.BufferUpdateAccountPeers(ctx, accountID)
} }
@@ -270,6 +274,11 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
inactivityExpirationChanged = true inactivityExpirationChanged = true
} }
err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
return err
}
return transaction.SavePeer(ctx, store.LockingStrengthUpdate, accountID, peer) return transaction.SavePeer(ctx, store.LockingStrengthUpdate, accountID, peer)
}) })
if err != nil { if err != nil {
@@ -755,6 +764,13 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
return err return err
} }
} }
if isStatusChanged || sync.UpdateAccountPeers || (updated && len(postureChecks) > 0) {
if err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID); err != nil {
return err
}
}
return nil return nil
}) })
if err != nil { if err != nil {
@@ -888,6 +904,13 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
} }
} }
if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) {
err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
return fmt.Errorf("failed to increment network serial: %w", err)
}
}
return nil return nil
}) })
if err != nil { if err != nil {
@@ -1169,7 +1192,7 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
return return
} }
start := time.Now() globalStart := time.Now()
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra) approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
if err != nil { if err != nil {
@@ -1204,18 +1227,27 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
defer wg.Done() defer wg.Done()
defer func() { <-semaphore }() defer func() { <-semaphore }()
start := time.Now()
postureChecks, err := am.getPeerPostureChecks(account, p.ID) postureChecks, err := am.getPeerPostureChecks(account, p.ID)
if err != nil { if err != nil {
log.WithContext(ctx).Debugf("failed to get posture checks for peer %s: %v", peer.ID, err) log.WithContext(ctx).Debugf("failed to get posture checks for peer %s: %v", peer.ID, err)
return return
} }
am.metrics.UpdateChannelMetrics().CountCalcPostureChecksDuration(time.Since(start))
start = time.Now()
remotePeerNetworkMap := account.GetPeerNetworkMap(ctx, p.ID, customZone, approvedPeersMap, resourcePolicies, routers, am.metrics.AccountManagerMetrics()) remotePeerNetworkMap := account.GetPeerNetworkMap(ctx, p.ID, customZone, approvedPeersMap, resourcePolicies, routers, am.metrics.AccountManagerMetrics())
am.metrics.UpdateChannelMetrics().CountCalcPeerNetworkMapDuration(time.Since(start))
start = time.Now()
proxyNetworkMap, ok := proxyNetworkMaps[p.ID] proxyNetworkMap, ok := proxyNetworkMaps[p.ID]
if ok { if ok {
remotePeerNetworkMap.Merge(proxyNetworkMap) remotePeerNetworkMap.Merge(proxyNetworkMap)
} }
am.metrics.UpdateChannelMetrics().CountMergeNetworkMapDuration(time.Since(start))
extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID) extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID)
if err != nil { if err != nil {
@@ -1223,7 +1255,10 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
return return
} }
start = time.Now()
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting) update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting)
am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start))
am.peersUpdateManager.SendUpdate(ctx, p.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap}) am.peersUpdateManager.SendUpdate(ctx, p.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap})
}(peer) }(peer)
} }
@@ -1232,7 +1267,7 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
wg.Wait() wg.Wait()
if am.metrics != nil { if am.metrics != nil {
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(start)) am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart))
} }
} }

View File

@@ -19,6 +19,7 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric/noop"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding" "github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
@@ -963,10 +964,14 @@ func BenchmarkUpdateAccountPeers(b *testing.B) {
b.Fatalf("Failed to get account: %v", err) b.Fatalf("Failed to get account: %v", err)
} }
peerChannels := make(map[string]chan *UpdateMessage) peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
b.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers { for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize) peerChannels[peerID] = NewUpdateBuffer(metrics)
} }
manager.peersUpdateManager.peerChannels = peerChannels manager.peersUpdateManager.peerChannels = peerChannels
@@ -1028,17 +1033,24 @@ func TestUpdateAccountPeers(t *testing.T) {
t.Fatalf("Failed to get account: %v", err) t.Fatalf("Failed to get account: %v", err)
} }
peerChannels := make(map[string]chan *UpdateMessage) peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
t.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers { for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize) peerChannels[peerID] = NewUpdateBuffer(metrics)
} }
manager.peersUpdateManager.peerChannels = peerChannels manager.peersUpdateManager.peerChannels = peerChannels
manager.UpdateAccountPeers(ctx, account.Id) manager.UpdateAccountPeers(ctx, account.Id)
for _, channel := range peerChannels { for _, channel := range peerChannels {
update := <-channel update, ok := channel.Pop(context.Background())
if !ok {
t.Fatalf("Expected update for peer, but channel is empty")
}
assert.Nil(t, update.Update.NetbirdConfig) assert.Nil(t, update.Update.NetbirdConfig)
assert.Equal(t, tc.peers, len(update.NetworkMap.Peers)) assert.Equal(t, tc.peers, len(update.NetworkMap.Peers))
assert.Equal(t, tc.peers*2, len(update.NetworkMap.FirewallRules)) assert.Equal(t, tc.peers*2, len(update.NetworkMap.FirewallRules))
@@ -1267,7 +1279,7 @@ func Test_RegisterPeerByUser(t *testing.T) {
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(s) permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager) am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
assert.NoError(t, err) assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b" existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1342,7 +1354,7 @@ func Test_RegisterPeerBySetupKey(t *testing.T) {
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(s) permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager) am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
assert.NoError(t, err) assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b" existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1477,7 +1489,7 @@ func Test_RegisterPeerRollbackOnFailure(t *testing.T) {
permissionsManager := permissions.NewManager(s) permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager) am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
assert.NoError(t, err) assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b" existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1546,7 +1558,7 @@ func Test_LoginPeer(t *testing.T) {
settingsMockManager := settings.NewMockManager(ctrl) settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(s) permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager) am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
assert.NoError(t, err) assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b" existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1734,13 +1746,12 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
var peer5 *nbpeer.Peer var peer5 *nbpeer.Peer
var peer6 *nbpeer.Peer var peer6 *nbpeer.Peer
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
// Updating not expired peer and peer expiration is enabled should not update account peers and not send peer update // Updating not expired peer and peer expiration is enabled should not update account peers and not send peer update
t.Run("updating not expired peer and peer expiration is enabled", func(t *testing.T) { t.Run("updating not expired peer and peer expiration is enabled", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -1759,6 +1770,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Adding peer to unlinked group should not update account peers and not send peer update // Adding peer to unlinked group should not update account peers and not send peer update
t.Run("adding peer to unlinked group", func(t *testing.T) { t.Run("adding peer to unlinked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -1784,6 +1799,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Deleting peer with unlinked group should not update account peers and not send peer update // Deleting peer with unlinked group should not update account peers and not send peer update
t.Run("deleting peer with unlinked group", func(t *testing.T) { t.Run("deleting peer with unlinked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -1802,6 +1821,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Updating peer label should update account peers and send peer update // Updating peer label should update account peers and send peer update
t.Run("updating peer label", func(t *testing.T) { t.Run("updating peer label", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1820,6 +1843,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
}) })
t.Run("validator requires update", func(t *testing.T) { t.Run("validator requires update", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
requireUpdateFunc := func(_ context.Context, update *nbpeer.Peer, peer *nbpeer.Peer, userID string, accountID string, dnsDomain string, peersGroup []string, extraSettings *types.ExtraSettings) (*nbpeer.Peer, bool, error) { requireUpdateFunc := func(_ context.Context, update *nbpeer.Peer, peer *nbpeer.Peer, userID string, accountID string, dnsDomain string, peersGroup []string, extraSettings *types.ExtraSettings) (*nbpeer.Peer, bool, error) {
return update, true, nil return update, true, nil
} }
@@ -1842,6 +1869,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
}) })
t.Run("validator requires no update", func(t *testing.T) { t.Run("validator requires no update", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
requireNoUpdateFunc := func(_ context.Context, update *nbpeer.Peer, peer *nbpeer.Peer, userID string, accountID string, dnsDomain string, peersGroup []string, extraSettings *types.ExtraSettings) (*nbpeer.Peer, bool, error) { requireNoUpdateFunc := func(_ context.Context, update *nbpeer.Peer, peer *nbpeer.Peer, userID string, accountID string, dnsDomain string, peersGroup []string, extraSettings *types.ExtraSettings) (*nbpeer.Peer, bool, error) {
return update, false, nil return update, false, nil
} }
@@ -1865,6 +1896,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Adding peer to group linked with policy should update account peers and send peer update // Adding peer to group linked with policy should update account peers and send peer update
t.Run("adding peer to group linked with policy", func(t *testing.T) { t.Run("adding peer to group linked with policy", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{ _, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{
AccountID: account.Id, AccountID: account.Id,
Enabled: true, Enabled: true,
@@ -1906,6 +1941,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Deleting peer with linked group to policy should update account peers and send peer update // Deleting peer with linked group to policy should update account peers and send peer update
t.Run("deleting peer with linked group to policy", func(t *testing.T) { t.Run("deleting peer with linked group to policy", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1924,6 +1963,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Adding peer to group linked with route should update account peers and send peer update // Adding peer to group linked with route should update account peers and send peer update
t.Run("adding peer to group linked with route", func(t *testing.T) { t.Run("adding peer to group linked with route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
route := nbroute.Route{ route := nbroute.Route{
ID: "testingRoute1", ID: "testingRoute1",
Network: netip.MustParsePrefix("100.65.250.202/32"), Network: netip.MustParsePrefix("100.65.250.202/32"),
@@ -1970,6 +2013,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Deleting peer with linked group to route should update account peers and send peer update // Deleting peer with linked group to route should update account peers and send peer update
t.Run("deleting peer with linked group to route", func(t *testing.T) { t.Run("deleting peer with linked group to route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1988,6 +2035,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Adding peer to group linked with name server group should update account peers and send peer update // Adding peer to group linked with name server group should update account peers and send peer update
t.Run("adding peer to group linked with name server group", func(t *testing.T) { t.Run("adding peer to group linked with name server group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.CreateNameServerGroup( _, err = manager.CreateNameServerGroup(
context.Background(), account.Id, "nsGroup", "nsGroup", []nbdns.NameServer{{ context.Background(), account.Id, "nsGroup", "nsGroup", []nbdns.NameServer{{
IP: netip.MustParseAddr("1.1.1.1"), IP: netip.MustParseAddr("1.1.1.1"),
@@ -2025,6 +2076,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Deleting peer with linked group to name server group should update account peers and send peer update // Deleting peer with linked group to name server group should update account peers and send peer update
t.Run("deleting peer with linked group to route", func(t *testing.T) { t.Run("deleting peer with linked group to route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)

View File

@@ -1017,17 +1017,16 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
}, true) }, true)
assert.NoError(t, err) assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
var policyWithGroupRulesNoPeers *types.Policy var policyWithGroupRulesNoPeers *types.Policy
var policyWithDestinationPeersOnly *types.Policy var policyWithDestinationPeersOnly *types.Policy
var policyWithSourceAndDestinationPeers *types.Policy var policyWithSourceAndDestinationPeers *types.Policy
// Saving policy with rule groups with no peers should not update account's peers and not send peer update // Saving policy with rule groups with no peers should not update account's peers and not send peer update
t.Run("saving policy with rule groups with no peers", func(t *testing.T) { t.Run("saving policy with rule groups with no peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -1059,6 +1058,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Saving policy with source group containing peers, but destination group without peers should // Saving policy with source group containing peers, but destination group without peers should
// update account's peers and send peer update // update account's peers and send peer update
t.Run("saving policy where source has peers but destination does not", func(t *testing.T) { t.Run("saving policy where source has peers but destination does not", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1091,6 +1094,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Saving policy with destination group containing peers, but source group without peers should // Saving policy with destination group containing peers, but source group without peers should
// update account's peers and send peer update // update account's peers and send peer update
t.Run("saving policy where destination has peers but source does not", func(t *testing.T) { t.Run("saving policy where destination has peers but source does not", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1123,6 +1130,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Saving policy with destination and source groups containing peers should update account's peers // Saving policy with destination and source groups containing peers should update account's peers
// and send peer update // and send peer update
t.Run("saving policy with source and destination groups with peers", func(t *testing.T) { t.Run("saving policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1154,6 +1165,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Disabling policy with destination and source groups containing peers should update account's peers // Disabling policy with destination and source groups containing peers should update account's peers
// and send peer update // and send peer update
t.Run("disabling policy with source and destination groups with peers", func(t *testing.T) { t.Run("disabling policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1174,6 +1189,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Updating disabled policy with destination and source groups containing peers should not update account's peers // Updating disabled policy with destination and source groups containing peers should not update account's peers
// or send peer update // or send peer update
t.Run("updating disabled policy with source and destination groups with peers", func(t *testing.T) { t.Run("updating disabled policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -1195,6 +1214,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Enabling policy with destination and source groups containing peers should update account's peers // Enabling policy with destination and source groups containing peers should update account's peers
// and send peer update // and send peer update
t.Run("enabling policy with source and destination groups with peers", func(t *testing.T) { t.Run("enabling policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1214,6 +1237,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Deleting policy should trigger account peers update and send peer update // Deleting policy should trigger account peers update and send peer update
t.Run("deleting policy with source and destination groups with peers", func(t *testing.T) { t.Run("deleting policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1234,6 +1261,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Deleting policy with destination group containing peers, but source group without peers should // Deleting policy with destination group containing peers, but source group without peers should
// update account's peers and send peer update // update account's peers and send peer update
t.Run("deleting policy where destination has peers but source does not", func(t *testing.T) { t.Run("deleting policy where destination has peers but source does not", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -1252,6 +1283,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Deleting policy with no peers in groups should not update account's peers and not send peer update // Deleting policy with no peers in groups should not update account's peers and not send peer update
t.Run("deleting policy with no peers in groups", func(t *testing.T) { t.Run("deleting policy with no peers in groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)

View File

@@ -140,11 +140,6 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
}, true) }, true)
assert.NoError(t, err) assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
postureCheckA := &posture.Checks{ postureCheckA := &posture.Checks{
Name: "postureCheckA", Name: "postureCheckA",
AccountID: account.Id, AccountID: account.Id,
@@ -171,6 +166,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Saving unused posture check should not update account peers and not send peer update // Saving unused posture check should not update account peers and not send peer update
t.Run("saving unused posture check", func(t *testing.T) { t.Run("saving unused posture check", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -189,6 +188,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating unused posture check should not update account peers and not send peer update // Updating unused posture check should not update account peers and not send peer update
t.Run("updating unused posture check", func(t *testing.T) { t.Run("updating unused posture check", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -226,6 +229,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Linking posture check to policy should trigger update account peers and send peer update // Linking posture check to policy should trigger update account peers and send peer update
t.Run("linking posture check to policy with peers", func(t *testing.T) { t.Run("linking posture check to policy with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -244,6 +251,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating linked posture checks should update account peers and send peer update // Updating linked posture checks should update account peers and send peer update
t.Run("updating linked to posture check with peers", func(t *testing.T) { t.Run("updating linked to posture check with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
postureCheckB.Checks = posture.ChecksDefinition{ postureCheckB.Checks = posture.ChecksDefinition{
NBVersionCheck: &posture.NBVersionCheck{ NBVersionCheck: &posture.NBVersionCheck{
MinVersion: "0.29.0", MinVersion: "0.29.0",
@@ -273,6 +284,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Removing posture check from policy should trigger account peers update and send peer update // Removing posture check from policy should trigger account peers update and send peer update
t.Run("removing posture check from policy", func(t *testing.T) { t.Run("removing posture check from policy", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -292,6 +307,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Deleting unused posture check should not trigger account peers update and not send peer update // Deleting unused posture check should not trigger account peers update and not send peer update
t.Run("deleting unused posture check", func(t *testing.T) { t.Run("deleting unused posture check", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldNotReceiveUpdate(t, updMsg) peerShouldNotReceiveUpdate(t, updMsg)
@@ -313,6 +332,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating linked posture check to policy with no peers should not trigger account peers update and not send peer update // Updating linked posture check to policy with no peers should not trigger account peers update and not send peer update
t.Run("updating linked posture check to policy with no peers", func(t *testing.T) { t.Run("updating linked posture check to policy with no peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{ _, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{
Enabled: true, Enabled: true,
Rules: []*types.PolicyRule{ Rules: []*types.PolicyRule{
@@ -352,7 +375,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating linked posture check to policy where destination has peers but source does not // Updating linked posture check to policy where destination has peers but source does not
// should trigger account peers update and send peer update // should trigger account peers update and send peer update
t.Run("updating linked posture check to policy where destination has peers but source does not", func(t *testing.T) { t.Run("updating linked posture check to policy where destination has peers but source does not", func(t *testing.T) {
updMsg1 := manager.peersUpdateManager.CreateChannel(context.Background(), peer2.ID) updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer2.ID)
t.Cleanup(func() { t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer2.ID) manager.peersUpdateManager.CloseChannel(context.Background(), peer2.ID)
}) })
@@ -374,7 +397,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg1) peerShouldReceiveUpdate(t, updMsg)
close(done) close(done)
}() }()
@@ -396,6 +419,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating linked client posture check to policy where source has peers but destination does not, // Updating linked client posture check to policy where source has peers but destination does not,
// should trigger account peers update and send peer update // should trigger account peers update and send peer update
t.Run("updating linked posture check to policy where source has peers but destination does not", func(t *testing.T) { t.Run("updating linked posture check to policy where source has peers but destination does not", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{ _, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{
Enabled: true, Enabled: true,
Rules: []*types.PolicyRule{ Rules: []*types.PolicyRule{

View File

@@ -1284,7 +1284,7 @@ func createRouterManager(t *testing.T) (*DefaultAccountManager, error) {
permissionsManager := permissions.NewManager(store) permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager) return BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
} }
func createRouterStore(t *testing.T) (store.Store, error) { func createRouterStore(t *testing.T) (store.Store, error) {
@@ -1972,13 +1972,12 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
}, true) }, true)
assert.NoError(t, err) assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
// Creating a route with no routing peer and no peers in PeerGroups or Groups should not update account peers and not send peer update // Creating a route with no routing peer and no peers in PeerGroups or Groups should not update account peers and not send peer update
t.Run("creating route no routing peer and no peers in groups", func(t *testing.T) { t.Run("creating route no routing peer and no peers in groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
route := route.Route{ route := route.Route{
ID: "testingRoute1", ID: "testingRoute1",
Network: netip.MustParsePrefix("100.65.250.202/32"), Network: netip.MustParsePrefix("100.65.250.202/32"),
@@ -2015,6 +2014,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Creating a route with no routing peer and having peers in groups should update account peers and send peer update // Creating a route with no routing peer and having peers in groups should update account peers and send peer update
t.Run("creating a route with peers in PeerGroups and Groups", func(t *testing.T) { t.Run("creating a route with peers in PeerGroups and Groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
route := route.Route{ route := route.Route{
ID: "testingRoute2", ID: "testingRoute2",
Network: netip.MustParsePrefix("192.0.2.0/32"), Network: netip.MustParsePrefix("192.0.2.0/32"),
@@ -2064,6 +2067,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Creating route should update account peers and send peer update // Creating route should update account peers and send peer update
t.Run("creating route with a routing peer", func(t *testing.T) { t.Run("creating route with a routing peer", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -2089,6 +2096,11 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
t.Run("updating route", func(t *testing.T) { t.Run("updating route", func(t *testing.T) {
baseRoute.Groups = []string{routeGroup1, routeGroup2} baseRoute.Groups = []string{routeGroup1, routeGroup2}
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -2107,6 +2119,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Deleting the route should update account peers and send peer update // Deleting the route should update account peers and send peer update
t.Run("deleting route", func(t *testing.T) { t.Run("deleting route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
peerShouldReceiveUpdate(t, updMsg) peerShouldReceiveUpdate(t, updMsg)
@@ -2125,6 +2141,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Adding peer to route peer groups that do not have any peers should update account peers and send peer update // Adding peer to route peer groups that do not have any peers should update account peers and send peer update
t.Run("adding peer to route peer groups that do not have any peers", func(t *testing.T) { t.Run("adding peer to route peer groups that do not have any peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
newRoute := route.Route{ newRoute := route.Route{
Network: netip.MustParsePrefix("192.168.12.0/16"), Network: netip.MustParsePrefix("192.168.12.0/16"),
NetID: "superNet", NetID: "superNet",
@@ -2165,6 +2185,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Adding peer to route groups that do not have any peers should update account peers and send peer update // Adding peer to route groups that do not have any peers should update account peers and send peer update
t.Run("adding peer to route groups that do not have any peers", func(t *testing.T) { t.Run("adding peer to route groups that do not have any peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
newRoute := route.Route{ newRoute := route.Route{
Network: netip.MustParsePrefix("192.168.13.0/16"), Network: netip.MustParsePrefix("192.168.13.0/16"),
NetID: "superNet", NetID: "superNet",

View File

@@ -18,6 +18,13 @@ type UpdateChannelMetrics struct {
getAllConnectedPeersDurationMicro metric.Int64Histogram getAllConnectedPeersDurationMicro metric.Int64Histogram
getAllConnectedPeers metric.Int64Histogram getAllConnectedPeers metric.Int64Histogram
hasChannelDurationMicro metric.Int64Histogram hasChannelDurationMicro metric.Int64Histogram
calcPostureChecksDurationMicro metric.Int64Histogram
calcPeerNetworkMapDurationMs metric.Int64Histogram
mergeNetworkMapDurationMicro metric.Int64Histogram
toSyncResponseDurationMicro metric.Int64Histogram
bufferPushCounter metric.Int64Counter
bufferOverwriteCounter metric.Int64Counter
bufferIgnoreCounter metric.Int64Counter
ctx context.Context ctx context.Context
} }
@@ -89,6 +96,59 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh
return nil, err return nil, err
} }
calcPostureChecksDurationMicro, err := meter.Int64Histogram("management.updatechannel.calc.posturechecks.duration.micro",
metric.WithUnit("microseconds"),
metric.WithDescription("Duration of how long it takes to get the posture checks for a peer"),
)
if err != nil {
return nil, err
}
calcPeerNetworkMapDurationMs, err := meter.Int64Histogram("management.updatechannel.calc.networkmap.duration.ms",
metric.WithUnit("milliseconds"),
metric.WithDescription("Duration of how long it takes to calculate the network map for a peer"),
)
if err != nil {
return nil, err
}
mergeNetworkMapDurationMicro, err := meter.Int64Histogram("management.updatechannel.merge.networkmap.duration.micro",
metric.WithUnit("microseconds"),
metric.WithDescription("Duration of how long it takes to merge the network maps for a peer"),
)
if err != nil {
return nil, err
}
toSyncResponseDurationMicro, err := meter.Int64Histogram("management.updatechannel.tosyncresponse.duration.micro",
metric.WithUnit("microseconds"),
metric.WithDescription("Duration of how long it takes to convert the network map to sync response"),
)
if err != nil {
return nil, err
}
bufferPushCounter, err := meter.Int64Counter("management.updatechannel.buffer.push.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of updates pushed to an empty buffer"))
if err != nil {
return nil, err
}
bufferOverwriteCounter, err := meter.Int64Counter("management.updatechannel.buffer.overwrite.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of updates overwriting old unsent updates in the buffer"))
if err != nil {
return nil, err
}
bufferIgnoreCounter, err := meter.Int64Counter("management.updatechannel.buffer.ignore.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of updates being ignored due to old network serial"))
if err != nil {
return nil, err
}
return &UpdateChannelMetrics{ return &UpdateChannelMetrics{
createChannelDurationMicro: createChannelDurationMicro, createChannelDurationMicro: createChannelDurationMicro,
closeChannelDurationMicro: closeChannelDurationMicro, closeChannelDurationMicro: closeChannelDurationMicro,
@@ -98,6 +158,13 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh
getAllConnectedPeersDurationMicro: getAllConnectedPeersDurationMicro, getAllConnectedPeersDurationMicro: getAllConnectedPeersDurationMicro,
getAllConnectedPeers: getAllConnectedPeers, getAllConnectedPeers: getAllConnectedPeers,
hasChannelDurationMicro: hasChannelDurationMicro, hasChannelDurationMicro: hasChannelDurationMicro,
calcPostureChecksDurationMicro: calcPostureChecksDurationMicro,
calcPeerNetworkMapDurationMs: calcPeerNetworkMapDurationMs,
mergeNetworkMapDurationMicro: mergeNetworkMapDurationMicro,
toSyncResponseDurationMicro: toSyncResponseDurationMicro,
bufferPushCounter: bufferPushCounter,
bufferOverwriteCounter: bufferOverwriteCounter,
bufferIgnoreCounter: bufferIgnoreCounter,
ctx: ctx, ctx: ctx,
}, nil }, nil
} }
@@ -137,3 +204,34 @@ func (metrics *UpdateChannelMetrics) CountGetAllConnectedPeersDuration(duration
func (metrics *UpdateChannelMetrics) CountHasChannelDuration(duration time.Duration) { func (metrics *UpdateChannelMetrics) CountHasChannelDuration(duration time.Duration) {
metrics.hasChannelDurationMicro.Record(metrics.ctx, duration.Microseconds()) metrics.hasChannelDurationMicro.Record(metrics.ctx, duration.Microseconds())
} }
func (metrics *UpdateChannelMetrics) CountCalcPostureChecksDuration(duration time.Duration) {
metrics.calcPostureChecksDurationMicro.Record(metrics.ctx, duration.Microseconds())
}
func (metrics *UpdateChannelMetrics) CountCalcPeerNetworkMapDuration(duration time.Duration) {
metrics.calcPeerNetworkMapDurationMs.Record(metrics.ctx, duration.Milliseconds())
}
func (metrics *UpdateChannelMetrics) CountMergeNetworkMapDuration(duration time.Duration) {
metrics.mergeNetworkMapDurationMicro.Record(metrics.ctx, duration.Microseconds())
}
func (metrics *UpdateChannelMetrics) CountToSyncResponseDuration(duration time.Duration) {
metrics.toSyncResponseDurationMicro.Record(metrics.ctx, duration.Microseconds())
}
// CountBufferPush counts how many buffer push operations are happening on an empty buffer
func (metrics *UpdateChannelMetrics) CountBufferPush() {
metrics.bufferPushCounter.Add(metrics.ctx, 1)
}
// CountBufferOverwrite counts how many buffer overwrite operations are happening on a non-empty buffer
func (metrics *UpdateChannelMetrics) CountBufferOverwrite() {
metrics.bufferOverwriteCounter.Add(metrics.ctx, 1)
}
// CountBufferIgnore counts how many buffer ignore operations are happening when a new update is pushed
func (metrics *UpdateChannelMetrics) CountBufferIgnore() {
metrics.bufferIgnoreCounter.Add(metrics.ctx, 1)
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/netbirdio/netbird/management/proto" "github.com/netbirdio/netbird/management/proto"
"github.com/netbirdio/netbird/management/server/settings" "github.com/netbirdio/netbird/management/server/settings"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/types" "github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/util" "github.com/netbirdio/netbird/util"
) )
@@ -29,7 +30,11 @@ var TurnTestHost = &types.Host{
func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) { func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) {
ttl := util.Duration{Duration: time.Hour} ttl := util.Duration{Duration: time.Hour}
secret := "some_secret" secret := "some_secret"
peersManager := NewPeersUpdateManager(nil) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersManager := NewPeersUpdateManager(metrics)
rc := &types.Relay{ rc := &types.Relay{
Addresses: []string{"localhost:0"}, Addresses: []string{"localhost:0"},
@@ -77,9 +82,25 @@ func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) {
func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) { func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) {
ttl := util.Duration{Duration: 2 * time.Second} ttl := util.Duration{Duration: 2 * time.Second}
secret := "some_secret" secret := "some_secret"
peersManager := NewPeersUpdateManager(nil) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersManager := NewPeersUpdateManager(metrics)
peer := "some_peer" peer := "some_peer"
updateChannel := peersManager.CreateChannel(context.Background(), peer) buffer := peersManager.CreateChannel(context.Background(), peer)
resultCh := make(chan struct {
msg *UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *UpdateMessage
ok bool
}{msg, ok}
}()
rc := &types.Relay{ rc := &types.Relay{
Addresses: []string{"localhost:0"}, Addresses: []string{"localhost:0"},
@@ -117,8 +138,8 @@ func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) {
loop: loop:
for timeout := time.After(5 * time.Second); ; { for timeout := time.After(5 * time.Second); ; {
select { select {
case update := <-updateChannel: case update := <-resultCh:
updates = append(updates, update) updates = append(updates, update.msg)
case <-timeout: case <-timeout:
break loop break loop
} }
@@ -181,7 +202,11 @@ loop:
func TestTimeBasedAuthSecretsManager_CancelRefresh(t *testing.T) { func TestTimeBasedAuthSecretsManager_CancelRefresh(t *testing.T) {
ttl := util.Duration{Duration: time.Hour} ttl := util.Duration{Duration: time.Hour}
secret := "some_secret" secret := "some_secret"
peersManager := NewPeersUpdateManager(nil) metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersManager := NewPeersUpdateManager(metrics)
peer := "some_peer" peer := "some_peer"
rc := &types.Relay{ rc := &types.Relay{

View File

@@ -0,0 +1,93 @@
package server
import (
"context"
"sync"
"github.com/netbirdio/netbird/management/server/telemetry"
)
type UpdateBuffer struct {
mu sync.Mutex
cond *sync.Cond
update *UpdateMessage
closed bool
metrics *telemetry.UpdateChannelMetrics
}
func NewUpdateBuffer(metrics *telemetry.UpdateChannelMetrics) *UpdateBuffer {
ub := &UpdateBuffer{metrics: metrics}
ub.cond = sync.NewCond(&ub.mu)
return ub
}
func (b *UpdateBuffer) Push(update *UpdateMessage) {
b.mu.Lock()
defer b.mu.Unlock()
if b.update != nil && update.Update.NetbirdConfig != nil {
if update.Update.NetbirdConfig.Relay != nil {
b.update.Update.NetbirdConfig.Relay = update.Update.NetbirdConfig.Relay
}
if update.Update.NetbirdConfig.Signal != nil {
b.update.Update.NetbirdConfig.Signal = update.Update.NetbirdConfig.Signal
}
if update.Update.NetbirdConfig.Flow != nil {
b.update.Update.NetbirdConfig.Flow = update.Update.NetbirdConfig.Flow
}
if update.Update.NetbirdConfig.Stuns != nil {
b.update.Update.NetbirdConfig.Stuns = update.Update.NetbirdConfig.Stuns
}
if update.Update.NetbirdConfig.Turns != nil {
b.update.Update.NetbirdConfig.Turns = update.Update.NetbirdConfig.Turns
}
}
// the equal case we need because we don't always increment the serial number
if b.update == nil || update.Update.NetworkMap.Serial > b.update.Update.NetworkMap.Serial || b.update.Update.NetworkMap.Serial == 0 {
b.update = update
b.cond.Signal()
if b.update == nil {
b.metrics.CountBufferPush()
return
}
b.metrics.CountBufferOverwrite()
return
}
b.metrics.CountBufferIgnore()
}
func (b *UpdateBuffer) Pop(ctx context.Context) (*UpdateMessage, bool) {
b.mu.Lock()
defer b.mu.Unlock()
for b.update == nil && !b.closed {
waitCh := make(chan struct{})
go func() {
select {
case <-ctx.Done():
b.cond.Broadcast()
case <-waitCh:
// noop
}
}()
b.cond.Wait()
close(waitCh)
}
if b.closed {
return nil, false
}
msg := b.update
b.update = nil
return msg, true
}
func (b *UpdateBuffer) Close() {
b.mu.Lock()
b.closed = true
b.cond.Broadcast()
b.mu.Unlock()
}

View File

@@ -12,8 +12,6 @@ import (
"github.com/netbirdio/netbird/management/server/types" "github.com/netbirdio/netbird/management/server/types"
) )
const channelBufferSize = 100
type UpdateMessage struct { type UpdateMessage struct {
Update *proto.SyncResponse Update *proto.SyncResponse
NetworkMap *types.NetworkMap NetworkMap *types.NetworkMap
@@ -21,7 +19,7 @@ type UpdateMessage struct {
type PeersUpdateManager struct { type PeersUpdateManager struct {
// peerChannels is an update channel indexed by Peer.ID // peerChannels is an update channel indexed by Peer.ID
peerChannels map[string]chan *UpdateMessage peerChannels map[string]*UpdateBuffer
// channelsMux keeps the mutex to access peerChannels // channelsMux keeps the mutex to access peerChannels
channelsMux *sync.RWMutex channelsMux *sync.RWMutex
// metrics provides method to collect application metrics // metrics provides method to collect application metrics
@@ -31,7 +29,7 @@ type PeersUpdateManager struct {
// NewPeersUpdateManager returns a new instance of PeersUpdateManager // NewPeersUpdateManager returns a new instance of PeersUpdateManager
func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager { func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager {
return &PeersUpdateManager{ return &PeersUpdateManager{
peerChannels: make(map[string]chan *UpdateMessage), peerChannels: make(map[string]*UpdateBuffer),
channelsMux: &sync.RWMutex{}, channelsMux: &sync.RWMutex{},
metrics: metrics, metrics: metrics,
} }
@@ -53,20 +51,14 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda
if channel, ok := p.peerChannels[peerID]; ok { if channel, ok := p.peerChannels[peerID]; ok {
found = true found = true
select { channel.Push(update)
case channel <- update:
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
default:
dropped = true
log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(channel))
}
} else { } else {
log.WithContext(ctx).Debugf("peer %s has no channel", peerID) log.WithContext(ctx).Debugf("peer %s has no channel", peerID)
} }
} }
// CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer. // CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer.
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) chan *UpdateMessage { func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) *UpdateBuffer {
start := time.Now() start := time.Now()
closed := false closed := false
@@ -81,22 +73,22 @@ func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) c
if channel, ok := p.peerChannels[peerID]; ok { if channel, ok := p.peerChannels[peerID]; ok {
closed = true closed = true
channel.Close()
delete(p.peerChannels, peerID) delete(p.peerChannels, peerID)
close(channel)
} }
// mbragin: todo shouldn't it be more? or configurable? // mbragin: todo shouldn't it be more? or configurable?
channel := make(chan *UpdateMessage, channelBufferSize) buffer := NewUpdateBuffer(p.metrics.UpdateChannelMetrics())
p.peerChannels[peerID] = channel p.peerChannels[peerID] = buffer
log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID) log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID)
return channel return buffer
} }
func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) { func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) {
if channel, ok := p.peerChannels[peerID]; ok { if channel, ok := p.peerChannels[peerID]; ok {
delete(p.peerChannels, peerID) delete(p.peerChannels, peerID)
close(channel) channel.Close()
log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID) log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID)
return return

View File

@@ -6,13 +6,19 @@ import (
"time" "time"
"github.com/netbirdio/netbird/management/proto" "github.com/netbirdio/netbird/management/proto"
"github.com/netbirdio/netbird/management/server/telemetry"
) )
// var peersUpdater *PeersUpdateManager // var peersUpdater *PeersUpdateManager
func TestCreateChannel(t *testing.T) { func TestCreateChannel(t *testing.T) {
peer := "test-create" peer := "test-create"
peersUpdater := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersUpdater := NewPeersUpdateManager(metrics)
defer peersUpdater.CloseChannel(context.Background(), peer) defer peersUpdater.CloseChannel(context.Background(), peer)
_ = peersUpdater.CreateChannel(context.Background(), peer) _ = peersUpdater.CreateChannel(context.Background(), peer)
@@ -23,7 +29,12 @@ func TestCreateChannel(t *testing.T) {
func TestSendUpdate(t *testing.T) { func TestSendUpdate(t *testing.T) {
peer := "test-sendupdate" peer := "test-sendupdate"
peersUpdater := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersUpdater := NewPeersUpdateManager(metrics)
update1 := &UpdateMessage{Update: &proto.SyncResponse{ update1 := &UpdateMessage{Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{ NetworkMap: &proto.NetworkMap{
Serial: 0, Serial: 0,
@@ -33,41 +44,62 @@ func TestSendUpdate(t *testing.T) {
if _, ok := peersUpdater.peerChannels[peer]; !ok { if _, ok := peersUpdater.peerChannels[peer]; !ok {
t.Error("Error creating the channel") t.Error("Error creating the channel")
} }
resultCh := make(chan struct {
msg *UpdateMessage
ok bool
}, 1)
go func() {
for {
msg, ok := peersUpdater.peerChannels[peer].Pop(context.Background())
resultCh <- struct {
msg *UpdateMessage
ok bool
}{msg, ok}
}
}()
peersUpdater.SendUpdate(context.Background(), peer, update1) peersUpdater.SendUpdate(context.Background(), peer, update1)
select { select {
case <-peersUpdater.peerChannels[peer]: case <-resultCh:
default: case <-time.After(1 * time.Second):
t.Error("Update wasn't send") t.Error("Update wasn't send")
} }
for range [channelBufferSize]int{} {
peersUpdater.SendUpdate(context.Background(), peer, update1)
}
update2 := &UpdateMessage{Update: &proto.SyncResponse{ update2 := &UpdateMessage{Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{ NetworkMap: &proto.NetworkMap{
Serial: 10, Serial: 10,
}, },
}} }}
update3 := &UpdateMessage{Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{
Serial: 8,
},
}}
peersUpdater.SendUpdate(context.Background(), peer, update2) peersUpdater.SendUpdate(context.Background(), peer, update2)
timeout := time.After(5 * time.Second) timeout := time.After(5 * time.Second)
for range [channelBufferSize]int{} {
select { select {
case <-timeout: case <-timeout:
t.Error("timed out reading previously sent updates") t.Error("timed out reading previously sent updates")
case updateReader := <-peersUpdater.peerChannels[peer]: case updateReader := <-resultCh:
if updateReader.Update.NetworkMap.Serial == update2.Update.NetworkMap.Serial { if updateReader.msg.Update.NetworkMap.Serial == update3.Update.NetworkMap.Serial {
t.Error("got the update that shouldn't have been sent") t.Error("got the update that shouldn't have been sent")
}
} }
} }
} }
func TestCloseChannel(t *testing.T) { func TestCloseChannel(t *testing.T) {
peer := "test-close" peer := "test-close"
peersUpdater := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersUpdater := NewPeersUpdateManager(metrics)
_ = peersUpdater.CreateChannel(context.Background(), peer) _ = peersUpdater.CreateChannel(context.Background(), peer)
if _, ok := peersUpdater.peerChannels[peer]; !ok { if _, ok := peersUpdater.peerChannels[peer]; !ok {
t.Error("Error creating the channel") t.Error("Error creating the channel")

View File

@@ -960,6 +960,12 @@ func (am *DefaultAccountManager) expireAndUpdatePeers(ctx context.Context, accou
) )
} }
// ideally this should run in a transaction
err = am.Store.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
log.Errorf("failed to increment network serial number for account %s: %v", accountID, err)
}
if len(peerIDs) != 0 { if len(peerIDs) != 0 {
// this will trigger peer disconnect from the management service // this will trigger peer disconnect from the management service
am.peersUpdateManager.CloseChannels(ctx, peerIDs) am.peersUpdateManager.CloseChannels(ctx, peerIDs)

View File

@@ -21,6 +21,7 @@ var (
// Update fetch the version info periodically and notify the onUpdateListener in case the UI version or the // Update fetch the version info periodically and notify the onUpdateListener in case the UI version or the
// daemon version are deprecated // daemon version are deprecated
type Update struct { type Update struct {
httpAgent string
uiVersion *goversion.Version uiVersion *goversion.Version
daemonVersion *goversion.Version daemonVersion *goversion.Version
latestAvailable *goversion.Version latestAvailable *goversion.Version
@@ -34,7 +35,7 @@ type Update struct {
} }
// NewUpdate instantiate Update and start to fetch the new version information // NewUpdate instantiate Update and start to fetch the new version information
func NewUpdate() *Update { func NewUpdate(httpAgent string) *Update {
currentVersion, err := goversion.NewVersion(version) currentVersion, err := goversion.NewVersion(version)
if err != nil { if err != nil {
currentVersion, _ = goversion.NewVersion("0.0.0") currentVersion, _ = goversion.NewVersion("0.0.0")
@@ -43,6 +44,7 @@ func NewUpdate() *Update {
latestAvailable, _ := goversion.NewVersion("0.0.0") latestAvailable, _ := goversion.NewVersion("0.0.0")
u := &Update{ u := &Update{
httpAgent: httpAgent,
latestAvailable: latestAvailable, latestAvailable: latestAvailable,
uiVersion: currentVersion, uiVersion: currentVersion,
fetchTicker: time.NewTicker(fetchPeriod), fetchTicker: time.NewTicker(fetchPeriod),
@@ -112,7 +114,15 @@ func (u *Update) startFetcher() {
func (u *Update) fetchVersion() bool { func (u *Update) fetchVersion() bool {
log.Debugf("fetching version info from %s", versionURL) log.Debugf("fetching version info from %s", versionURL)
resp, err := http.Get(versionURL) req, err := http.NewRequest("GET", versionURL, nil)
if err != nil {
log.Errorf("failed to create request for version info: %s", err)
return false
}
req.Header.Set("User-Agent", u.httpAgent)
resp, err := http.DefaultClient.Do(req)
if err != nil { if err != nil {
log.Errorf("failed to fetch version info: %s", err) log.Errorf("failed to fetch version info: %s", err)
return false return false

View File

@@ -9,6 +9,8 @@ import (
"time" "time"
) )
const httpAgent = "pkg/test"
func TestNewUpdate(t *testing.T) { func TestNewUpdate(t *testing.T) {
version = "1.0.0" version = "1.0.0"
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -21,7 +23,7 @@ func TestNewUpdate(t *testing.T) {
wg.Add(1) wg.Add(1)
onUpdate := false onUpdate := false
u := NewUpdate() u := NewUpdate(httpAgent)
defer u.StopWatch() defer u.StopWatch()
u.SetOnUpdateListener(func() { u.SetOnUpdateListener(func() {
onUpdate = true onUpdate = true
@@ -46,7 +48,7 @@ func TestDoNotUpdate(t *testing.T) {
wg.Add(1) wg.Add(1)
onUpdate := false onUpdate := false
u := NewUpdate() u := NewUpdate(httpAgent)
defer u.StopWatch() defer u.StopWatch()
u.SetOnUpdateListener(func() { u.SetOnUpdateListener(func() {
onUpdate = true onUpdate = true
@@ -71,7 +73,7 @@ func TestDaemonUpdate(t *testing.T) {
wg.Add(1) wg.Add(1)
onUpdate := false onUpdate := false
u := NewUpdate() u := NewUpdate(httpAgent)
defer u.StopWatch() defer u.StopWatch()
u.SetOnUpdateListener(func() { u.SetOnUpdateListener(func() {
onUpdate = true onUpdate = true