Compare commits

...

29 Commits

Author SHA1 Message Date
Pascal Fischer
5d34804062 Merge branch 'main' into feature/limit-update-channel 2025-06-28 18:21:17 +02:00
Pascal Fischer
418d23d7f8 Merge branch 'main' into feature/limit-update-channel 2025-06-28 18:20:41 +02:00
Pascal Fischer
b0582c86ba update config if exist 2025-06-28 17:59:35 +02:00
Pascal Fischer
1b7c787cc5 fix tests 2025-06-28 16:08:25 +02:00
Pascal Fischer
ad50e07325 fix testing tool and remove unused const 2025-06-28 15:45:07 +02:00
Pascal Fischer
c66bd0cc71 fix tests 2025-06-28 15:33:49 +02:00
Pascal Fischer
0480507a10 [management] report networkmap duration in ms (#4064) 2025-06-28 11:38:15 +02:00
Krzysztof Nazarewski (kdn)
34ac4e4b5a [misc] fix: self-hosting: the wrong default for NETBIRD_AUTH_PKCE_LOGIN_FLAG (#4055)
* fix: self-hosting: the wrong default for NETBIRD_AUTH_PKCE_LOGIN_FLAG

fixes https://github.com/netbirdio/netbird/issues/4054

* un-quote the number

Co-authored-by: Maycon Santos <mlsmaycon@gmail.com>

---------

Co-authored-by: Maycon Santos <mlsmaycon@gmail.com>
2025-06-26 10:45:00 +02:00
Pascal Fischer
52ff9d9602 [management] remove unused transaction (#4053) 2025-06-26 01:34:22 +02:00
Pascal Fischer
6f0111eab0 fix tests 2025-06-25 16:38:49 +02:00
Pascal Fischer
d403d20e7b fix testing tools 2025-06-25 15:37:31 +02:00
Pascal Fischer
c043018939 fix tests 2025-06-25 12:27:55 +02:00
Pascal Fischer
ed6ed4a597 add metrics 2025-06-25 12:09:06 +02:00
Pascal Fischer
905a3481ec Merge branch 'refs/heads/main' into feature/limit-update-channel 2025-06-25 12:05:33 +02:00
Pascal Fischer
1b73fae46e [management] add breakdown of network map calculation metrics (#4020) 2025-06-25 11:46:35 +02:00
Pascal Fischer
47052fa024 Merge branch 'main' into feature/limit-update-channel 2025-06-25 11:32:30 +02:00
Viktor Liu
d897365abc [client] Don't open cmd.exe during MSI actions (#4041) 2025-06-24 21:32:37 +02:00
Viktor Liu
f37aa2cc9d [misc] Specify netbird binary location in Dockerfiles (#4024) 2025-06-23 10:09:02 +02:00
Maycon Santos
5343bee7b2 [management] check and log on new management version (#4029)
This PR enhances the version checker to send a custom User-Agent header when polling for updates, and configures both the management CLI and client UI to use distinct agents. 

- NewUpdate now takes an `httpAgent` string to set the User-Agent header.
- `fetchVersion` builds a custom HTTP request (instead of `http.Get`) and sets the User-Agent.
- Management CLI and client UI now pass `"nb/management"` and `"nb/client-ui"` respectively to NewUpdate.
- Tests updated to supply an `httpAgent` constant.
- Logs if there is a new version available for management
2025-06-22 16:44:33 +02:00
Maycon Santos
870e29db63 [misc] add additional metrics (#4028)
* add additional metrics

we are collecting active rosenpass, ssh from the client side
we are also collecting active user peers and active users

* remove duplicated
2025-06-22 13:44:25 +02:00
Maycon Santos
08e9b05d51 [client] close windows when process needs to exit (#4027)
This PR fixes a bug by ensuring that the advanced settings and re-authentication windows are closed appropriately when the main GUI process exits.

- Updated runSelfCommand calls throughout the UI to pass a context parameter.
- Modified runSelfCommand’s signature and its internal command invocation to use exec.CommandContext for proper cancellation handling.
2025-06-22 10:33:04 +02:00
hakansa
3581648071 [client] Refactor showLoginURL to improve error handling and connection status checks (#4026)
This PR refactors showLoginURL to improve error handling and connection status checks by delaying the login fetch until user interaction and closing the pop-up if already connected.

- Moved s.login(false) call into the click handler to defer network I/O.
- Added a conn.Status check after opening the URL to skip reconnection if already connected.
- Enhanced error logs for missing verification URLs and service status failures.
2025-06-22 10:03:58 +02:00
Viktor Liu
2a51609436 [client] Handle lazy routing peers that are part of HA groups (#3943)
* Activate new lazy routing peers if the HA group is active
* Prevent lazy peers going to idle if HA group members are active (#3948)
2025-06-20 18:07:19 +02:00
Pascal Fischer
83457f8b99 [management] add transaction for integrated validator groups update and primary account update (#4014) 2025-06-20 12:13:24 +02:00
Pascal Fischer
4795e2fbc4 increment network serial on peer meta changed 2025-06-19 12:46:50 +02:00
Pascal Fischer
ec57c685a9 increment network serial on peer meta changed 2025-06-19 12:41:03 +02:00
Pascal Fischer
4b44b8c46c increment network serial 2025-06-19 12:14:26 +02:00
Pascal Fischer
0c7cac81f0 use update buffer instead of channel 2025-06-17 15:44:14 +02:00
Pascal Fischer
5e9ea122f7 limit channel to 2 messages and drop outdated if needed 2025-06-17 15:00:23 +02:00
41 changed files with 1063 additions and 298 deletions

View File

@@ -9,7 +9,7 @@ on:
pull_request:
env:
SIGN_PIPE_VER: "v0.0.18"
SIGN_PIPE_VER: "v0.0.19"
GORELEASER_VER: "v2.3.2"
PRODUCT_NAME: "NetBird"
COPYRIGHT: "NetBird GmbH"

View File

@@ -1,6 +1,9 @@
FROM alpine:3.21.3
# iproute2: busybox doesn't display ip rules properly
RUN apk add --no-cache ca-certificates ip6tables iproute2 iptables
ARG NETBIRD_BINARY=netbird
COPY ${NETBIRD_BINARY} /usr/local/bin/netbird
ENV NB_FOREGROUND_MODE=true
ENTRYPOINT [ "/usr/local/bin/netbird","up"]
COPY netbird /usr/local/bin/netbird

View File

@@ -1,6 +1,7 @@
FROM alpine:3.21.0
COPY netbird /usr/local/bin/netbird
ARG NETBIRD_BINARY=netbird
COPY ${NETBIRD_BINARY} /usr/local/bin/netbird
RUN apk add --no-cache ca-certificates \
&& adduser -D -h /var/lib/netbird netbird

View File

@@ -83,7 +83,6 @@ func startManagement(t *testing.T, config *types.Config, testFile string) (*grpc
}
t.Cleanup(cleanUp)
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, nil
@@ -92,6 +91,9 @@ func startManagement(t *testing.T, config *types.Config, testFile string) (*grpc
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
peersUpdateManager := mgmt.NewPeersUpdateManager(metrics)
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)

View File

@@ -175,7 +175,7 @@ func (e *ConnMgr) AddPeerConn(ctx context.Context, peerKey string, conn *peer.Co
PeerConnID: conn.ConnID(),
Log: conn.Log,
}
excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg)
excluded, err := e.lazyConnMgr.AddPeer(e.lazyCtx, lazyPeerCfg)
if err != nil {
conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err)
if err := conn.Open(ctx); err != nil {

View File

@@ -1456,16 +1456,16 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri
}
t.Cleanup(cleanUp)
peersUpdateManager := server.NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
peersUpdateManager := server.NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, "", err
}
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -68,3 +68,8 @@ func (i *Monitor) PauseTimer() {
func (i *Monitor) ResetTimer() {
i.timer.Reset(i.inactivityThreshold)
}
func (i *Monitor) ResetMonitor(ctx context.Context, timeoutChan chan peer.ConnID) {
i.Stop()
go i.Start(ctx, timeoutChan)
}

View File

@@ -58,7 +58,7 @@ type Manager struct {
// Route HA group management
peerToHAGroups map[string][]route.HAUniqueID // peer ID -> HA groups they belong to
haGroupToPeers map[route.HAUniqueID][]string // HA group -> peer IDs in the group
routesMu sync.RWMutex // protects route mappings
routesMu sync.RWMutex
onInactive chan peerid.ConnID
}
@@ -146,7 +146,7 @@ func (m *Manager) Start(ctx context.Context) {
case peerConnID := <-m.activityManager.OnActivityChan:
m.onPeerActivity(ctx, peerConnID)
case peerConnID := <-m.onInactive:
m.onPeerInactivityTimedOut(peerConnID)
m.onPeerInactivityTimedOut(ctx, peerConnID)
}
}
}
@@ -197,7 +197,7 @@ func (m *Manager) ExcludePeer(ctx context.Context, peerConfigs []lazyconn.PeerCo
return added
}
func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) {
func (m *Manager) AddPeer(ctx context.Context, peerCfg lazyconn.PeerConfig) (bool, error) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
@@ -225,6 +225,13 @@ func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) {
peerCfg: &peerCfg,
expectedWatcher: watcherActivity,
}
// Check if this peer should be activated because its HA group peers are active
if group, ok := m.shouldActivateNewPeer(peerCfg.PublicKey); ok {
peerCfg.Log.Debugf("peer belongs to active HA group %s, will activate immediately", group)
m.activateNewPeerInActiveGroup(ctx, peerCfg)
}
return false, nil
}
@@ -315,36 +322,38 @@ func (m *Manager) activateSinglePeer(ctx context.Context, cfg *lazyconn.PeerConf
// activateHAGroupPeers activates all peers in HA groups that the given peer belongs to
func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string) {
var peersToActivate []string
m.routesMu.RLock()
haGroups := m.peerToHAGroups[triggerPeerID]
m.routesMu.RUnlock()
if len(haGroups) == 0 {
m.routesMu.RUnlock()
log.Debugf("peer %s is not part of any HA groups", triggerPeerID)
return
}
activatedCount := 0
for _, haGroup := range haGroups {
m.routesMu.RLock()
peers := m.haGroupToPeers[haGroup]
m.routesMu.RUnlock()
for _, peerID := range peers {
if peerID == triggerPeerID {
continue
if peerID != triggerPeerID {
peersToActivate = append(peersToActivate, peerID)
}
}
}
m.routesMu.RUnlock()
cfg, mp := m.getPeerForActivation(peerID)
if cfg == nil {
continue
}
activatedCount := 0
for _, peerID := range peersToActivate {
cfg, mp := m.getPeerForActivation(peerID)
if cfg == nil {
continue
}
if m.activateSinglePeer(ctx, cfg, mp) {
activatedCount++
cfg.Log.Infof("activated peer as part of HA group %s (triggered by %s)", haGroup, triggerPeerID)
m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey)
}
if m.activateSinglePeer(ctx, cfg, mp) {
activatedCount++
cfg.Log.Infof("activated peer as part of HA group (triggered by %s)", triggerPeerID)
m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey)
}
}
@@ -354,6 +363,51 @@ func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string
}
}
// shouldActivateNewPeer checks if a newly added peer should be activated
// because other peers in its HA groups are already active
func (m *Manager) shouldActivateNewPeer(peerID string) (route.HAUniqueID, bool) {
m.routesMu.RLock()
defer m.routesMu.RUnlock()
haGroups := m.peerToHAGroups[peerID]
if len(haGroups) == 0 {
return "", false
}
for _, haGroup := range haGroups {
peers := m.haGroupToPeers[haGroup]
for _, groupPeerID := range peers {
if groupPeerID == peerID {
continue
}
cfg, ok := m.managedPeers[groupPeerID]
if !ok {
continue
}
if mp, ok := m.managedPeersByConnID[cfg.PeerConnID]; ok && mp.expectedWatcher == watcherInactivity {
return haGroup, true
}
}
}
return "", false
}
// activateNewPeerInActiveGroup activates a newly added peer that should be active due to HA group
func (m *Manager) activateNewPeerInActiveGroup(ctx context.Context, peerCfg lazyconn.PeerConfig) {
mp, ok := m.managedPeersByConnID[peerCfg.PeerConnID]
if !ok {
return
}
if !m.activateSinglePeer(ctx, &peerCfg, mp) {
return
}
peerCfg.Log.Infof("activated newly added peer due to active HA group peers")
m.peerStore.PeerConnOpen(m.engineCtx, peerCfg.PublicKey)
}
func (m *Manager) addActivePeer(ctx context.Context, peerCfg lazyconn.PeerConfig) error {
if _, ok := m.managedPeers[peerCfg.PublicKey]; ok {
peerCfg.Log.Warnf("peer already managed")
@@ -415,6 +469,48 @@ func (m *Manager) close() {
log.Infof("lazy connection manager closed")
}
// shouldDeferIdleForHA checks if peer should stay connected due to HA group requirements
func (m *Manager) shouldDeferIdleForHA(peerID string) bool {
m.routesMu.RLock()
defer m.routesMu.RUnlock()
haGroups := m.peerToHAGroups[peerID]
if len(haGroups) == 0 {
return false
}
for _, haGroup := range haGroups {
groupPeers := m.haGroupToPeers[haGroup]
for _, groupPeerID := range groupPeers {
if groupPeerID == peerID {
continue
}
cfg, ok := m.managedPeers[groupPeerID]
if !ok {
continue
}
groupMp, ok := m.managedPeersByConnID[cfg.PeerConnID]
if !ok {
continue
}
if groupMp.expectedWatcher != watcherInactivity {
continue
}
// Other member is still connected, defer idle
if peer, ok := m.peerStore.PeerConn(groupPeerID); ok && peer.IsConnected() {
return true
}
}
}
return false
}
func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
@@ -441,7 +537,7 @@ func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID)
m.peerStore.PeerConnOpen(m.engineCtx, mp.peerCfg.PublicKey)
}
func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) {
func (m *Manager) onPeerInactivityTimedOut(ctx context.Context, peerConnID peerid.ConnID) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
@@ -456,6 +552,17 @@ func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) {
return
}
if m.shouldDeferIdleForHA(mp.peerCfg.PublicKey) {
iw, ok := m.inactivityMonitors[peerConnID]
if ok {
mp.peerCfg.Log.Debugf("resetting inactivity timer due to HA group requirements")
iw.ResetMonitor(ctx, m.onInactive)
} else {
mp.peerCfg.Log.Errorf("inactivity monitor not found for HA defer reset")
}
return
}
mp.peerCfg.Log.Infof("connection timed out")
// this is blocking operation, potentially can be optimized
@@ -489,7 +596,7 @@ func (m *Manager) onPeerConnected(peerConnID peerid.ConnID) {
iw, ok := m.inactivityMonitors[mp.peerCfg.PeerConnID]
if !ok {
mp.peerCfg.Log.Errorf("inactivity monitor not found for peer")
mp.peerCfg.Log.Warnf("inactivity monitor not found for peer")
return
}

View File

@@ -317,12 +317,12 @@ func (conn *Conn) WgConfig() WgConfig {
return conn.config.WgConfig
}
// IsConnected unit tests only
// refactor unit test to use status recorder use refactor status recorded to manage connection status in peer.Conn
// IsConnected returns true if the peer is connected
func (conn *Conn) IsConnected() bool {
conn.mu.Lock()
defer conn.mu.Unlock()
return conn.currentConnPriority != conntype.None
return conn.evalStatus() == StatusConnected
}
func (conn *Conn) GetKey() string {

View File

@@ -1,8 +1,10 @@
<Wix
xmlns="http://wixtoolset.org/schemas/v4/wxs">
xmlns="http://wixtoolset.org/schemas/v4/wxs"
xmlns:util="http://wixtoolset.org/schemas/v4/wxs/util">
<Package Name="NetBird" Version="$(env.NETBIRD_VERSION)" Manufacturer="NetBird GmbH" Language="1033" UpgradeCode="6456ec4e-3ad6-4b9b-a2be-98e81cb21ccf"
InstallerVersion="500" Compressed="yes" Codepage="utf-8" >
<MediaTemplate EmbedCab="yes" />
<Feature Id="NetbirdFeature" Title="Netbird" Level="1">
@@ -46,29 +48,10 @@
<ComponentRef Id="NetbirdFiles" />
</ComponentGroup>
<Property Id="cmd" Value="cmd.exe"/>
<util:CloseApplication Id="CloseNetBird" CloseMessage="no" Target="netbird.exe" RebootPrompt="no" />
<util:CloseApplication Id="CloseNetBirdUI" CloseMessage="no" Target="netbird-ui.exe" RebootPrompt="no" />
<CustomAction Id="KillDaemon"
ExeCommand='/c "taskkill /im netbird.exe"'
Execute="deferred"
Property="cmd"
Impersonate="no"
Return="ignore"
/>
<CustomAction Id="KillUI"
ExeCommand='/c "taskkill /im netbird-ui.exe"'
Execute="deferred"
Property="cmd"
Impersonate="no"
Return="ignore"
/>
<InstallExecuteSequence>
<!-- For Uninstallation -->
<Custom Action="KillDaemon" Before="RemoveFiles" Condition="Installed"/>
<Custom Action="KillUI" After="KillDaemon" Condition="Installed"/>
</InstallExecuteSequence>
<!-- Icons -->
<Icon Id="NetbirdIcon" SourceFile=".\client\ui\assets\netbird.ico" />

View File

@@ -191,16 +191,16 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
}
t.Cleanup(cleanUp)
peersUpdateManager := server.NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
peersUpdateManager := server.NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{}
if err != nil {
return nil, "", err
}
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -280,7 +280,7 @@ func newServiceClient(addr string, logFile string, a fyne.App, showSettings bool
showAdvancedSettings: showSettings,
showNetworks: showNetworks,
update: version.NewUpdate(),
update: version.NewUpdate("nb/client-ui"),
}
s.eventHandler = newEventHandler(s)
@@ -879,7 +879,7 @@ func (s *serviceClient) onUpdateAvailable() {
func (s *serviceClient) onSessionExpire() {
s.sendNotification = true
if s.sendNotification {
s.eventHandler.runSelfCommand("login-url", "true")
s.eventHandler.runSelfCommand(s.ctx, "login-url", "true")
s.sendNotification = false
}
}
@@ -992,21 +992,6 @@ func (s *serviceClient) restartClient(loginRequest *proto.LoginRequest) error {
// showLoginURL creates a borderless window styled like a pop-up in the top-right corner using s.wLoginURL.
func (s *serviceClient) showLoginURL() {
resp, err := s.login(false)
if err != nil {
log.Errorf("failed to fetch login URL: %v", err)
return
}
verificationURL := resp.VerificationURIComplete
if verificationURL == "" {
verificationURL = resp.VerificationURI
}
if verificationURL == "" {
log.Error("no verification URL provided in the login response")
return
}
resIcon := fyne.NewStaticResource("netbird.png", iconAbout)
if s.wLoginURL == nil {
@@ -1025,6 +1010,21 @@ func (s *serviceClient) showLoginURL() {
return
}
resp, err := s.login(false)
if err != nil {
log.Errorf("failed to fetch login URL: %v", err)
return
}
verificationURL := resp.VerificationURIComplete
if verificationURL == "" {
verificationURL = resp.VerificationURI
}
if verificationURL == "" {
log.Error("no verification URL provided in the login response")
return
}
if err := openURL(verificationURL); err != nil {
log.Errorf("failed to open login URL: %v", err)
return
@@ -1038,7 +1038,19 @@ func (s *serviceClient) showLoginURL() {
}
label.SetText("Re-authentication successful.\nReconnecting")
time.Sleep(300 * time.Millisecond)
status, err := conn.Status(s.ctx, &proto.StatusRequest{})
if err != nil {
log.Errorf("get service status: %v", err)
return
}
if status.Status == string(internal.StatusConnected) {
label.SetText("Already connected.\nClosing this window.")
time.Sleep(2 * time.Second)
s.wLoginURL.Close()
return
}
_, err = conn.Up(s.ctx, &proto.UpRequest{})
if err != nil {
label.SetText("Reconnecting failed, please create \na debug bundle in the settings and contact support.")

View File

@@ -122,7 +122,7 @@ func (h *eventHandler) handleAdvancedSettingsClick() {
go func() {
defer h.client.mAdvancedSettings.Enable()
defer h.client.getSrvConfig()
h.runSelfCommand("settings", "true")
h.runSelfCommand(h.client.ctx, "settings", "true")
}()
}
@@ -130,7 +130,7 @@ func (h *eventHandler) handleCreateDebugBundleClick() {
h.client.mCreateDebugBundle.Disable()
go func() {
defer h.client.mCreateDebugBundle.Enable()
h.runSelfCommand("debug", "true")
h.runSelfCommand(h.client.ctx, "debug", "true")
}()
}
@@ -154,7 +154,7 @@ func (h *eventHandler) handleNetworksClick() {
h.client.mNetworks.Disable()
go func() {
defer h.client.mNetworks.Enable()
h.runSelfCommand("networks", "true")
h.runSelfCommand(h.client.ctx, "networks", "true")
}()
}
@@ -172,14 +172,14 @@ func (h *eventHandler) updateConfigWithErr() {
}
}
func (h *eventHandler) runSelfCommand(command, arg string) {
func (h *eventHandler) runSelfCommand(ctx context.Context, command, arg string) {
proc, err := os.Executable()
if err != nil {
log.Errorf("error getting executable path: %v", err)
return
}
cmd := exec.Command(proc,
cmd := exec.CommandContext(ctx, proc,
fmt.Sprintf("--%s=%s", command, arg),
fmt.Sprintf("--daemon-addr=%s", h.client.addr),
)

View File

@@ -60,7 +60,7 @@ NETBIRD_TOKEN_SOURCE=${NETBIRD_TOKEN_SOURCE:-accessToken}
NETBIRD_AUTH_PKCE_REDIRECT_URL_PORTS=${NETBIRD_AUTH_PKCE_REDIRECT_URL_PORTS:-"53000"}
NETBIRD_AUTH_PKCE_USE_ID_TOKEN=${NETBIRD_AUTH_PKCE_USE_ID_TOKEN:-false}
NETBIRD_AUTH_PKCE_DISABLE_PROMPT_LOGIN=${NETBIRD_AUTH_PKCE_DISABLE_PROMPT_LOGIN:-false}
NETBIRD_AUTH_PKCE_LOGIN_FLAG=${NETBIRD_AUTH_PKCE_LOGIN_FLAG:-1}
NETBIRD_AUTH_PKCE_LOGIN_FLAG=${NETBIRD_AUTH_PKCE_LOGIN_FLAG:-0}
NETBIRD_AUTH_PKCE_AUDIENCE=$NETBIRD_AUTH_AUDIENCE
# Dashboard

View File

@@ -68,13 +68,13 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
}
t.Cleanup(cleanUp)
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
peersUpdateManager := mgmt.NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{}
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -357,6 +357,13 @@ var (
log.WithContext(ctx).Infof("running HTTP server and gRPC server on the same port: %s", listener.Addr().String())
serveGRPCWithHTTP(ctx, listener, rootHandler, tlsEnabled)
update := version.NewUpdate("nb/management")
update.SetDaemonVersion(version.NetbirdVersion())
update.SetOnUpdateListener(func() {
log.WithContext(ctx).Infof("your management version, \"%s\", is outdated, a new management version is available. Learn more here: https://github.com/netbirdio/netbird/releases", version.NetbirdVersion())
})
defer update.StopWatch()
SetupCloseHandler()
<-stopCh

View File

@@ -1624,6 +1624,10 @@ func (am *DefaultAccountManager) GetDNSDomain(settings *types.Settings) string {
func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) {
log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID)
err := am.Store.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
log.Errorf("failed to increment network serial number for account %s: %v", accountID, err)
}
am.BufferUpdateAccountPeers(ctx, accountID)
}
@@ -1853,40 +1857,49 @@ func (am *DefaultAccountManager) GetOrCreateAccountByPrivateDomain(ctx context.C
}
func (am *DefaultAccountManager) UpdateToPrimaryAccount(ctx context.Context, accountId string) (*types.Account, error) {
account, err := am.Store.GetAccount(ctx, accountId)
var account *types.Account
err := am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
var err error
account, err = transaction.GetAccount(ctx, accountId)
if err != nil {
return err
}
if account.IsDomainPrimaryAccount {
return nil
}
existingPrimaryAccountID, err := transaction.GetAccountIDByPrivateDomain(ctx, store.LockingStrengthShare, account.Domain)
// error is not a not found error
if handleNotFound(err) != nil {
return err
}
// a primary account already exists for this private domain
if err == nil {
log.WithContext(ctx).WithFields(log.Fields{
"accountId": accountId,
"existingAccountId": existingPrimaryAccountID,
}).Errorf("cannot update account to primary, another account already exists as primary for the same domain")
return status.Errorf(status.Internal, "cannot update account to primary")
}
account.IsDomainPrimaryAccount = true
if err := transaction.SaveAccount(ctx, account); err != nil {
log.WithContext(ctx).WithFields(log.Fields{
"accountId": accountId,
}).Errorf("failed to update account to primary: %v", err)
return status.Errorf(status.Internal, "failed to update account to primary")
}
return nil
})
if err != nil {
return nil, err
}
if account.IsDomainPrimaryAccount {
return account, nil
}
existingPrimaryAccountID, err := am.Store.GetAccountIDByPrivateDomain(ctx, store.LockingStrengthShare, account.Domain)
// error is not a not found error
if handleNotFound(err) != nil {
return nil, err
}
// a primary account already exists for this private domain
if err == nil {
log.WithContext(ctx).WithFields(log.Fields{
"accountId": accountId,
"existingAccountId": existingPrimaryAccountID,
}).Errorf("cannot update account to primary, another account already exists as primary for the same domain")
return nil, status.Errorf(status.Internal, "cannot update account to primary")
}
account.IsDomainPrimaryAccount = true
if err := am.Store.SaveAccount(ctx, account); err != nil {
log.WithContext(ctx).WithFields(log.Fields{
"accountId": accountId,
}).Errorf("failed to update account to primary: %v", err)
return nil, status.Errorf(status.Internal, "failed to update account to primary")
}
return account, nil
}

View File

@@ -17,6 +17,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric/noop"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
nbdns "github.com/netbirdio/netbird/dns"
@@ -1186,7 +1187,10 @@ func TestAccountManager_NetworkUpdates_SaveGroup(t *testing.T) {
go func() {
defer wg.Done()
message := <-updMsg
message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 2 {
t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers))
@@ -1213,7 +1217,10 @@ func TestAccountManager_NetworkUpdates_DeletePolicy(t *testing.T) {
go func() {
defer wg.Done()
message := <-updMsg
message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 0 {
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
@@ -1249,7 +1256,10 @@ func TestAccountManager_NetworkUpdates_SavePolicy(t *testing.T) {
go func() {
defer wg.Done()
message := <-updMsg
message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 2 {
t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers))
@@ -1314,7 +1324,10 @@ func TestAccountManager_NetworkUpdates_DeletePeer(t *testing.T) {
go func() {
defer wg.Done()
message := <-updMsg
message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 1 {
t.Errorf("mismatch peers count: 1 expected, got %v", len(networkMap.RemotePeers))
@@ -1365,15 +1378,24 @@ func TestAccountManager_NetworkUpdates_DeleteGroup(t *testing.T) {
return
}
// emptying buffer of previous changes
_, _ = updMsg.Pop(context.Background())
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
message := <-updMsg
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 0 {
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
// expecting 2 messages (policy delete and group delete)
for i := 0; i < 1; i++ {
message, ok := updMsg.Pop(context.Background())
if !ok {
t.Errorf("failed to receive update message")
}
networkMap := message.Update.GetNetworkMap()
if len(networkMap.RemotePeers) != 0 {
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
}
}
}()
@@ -2879,7 +2901,7 @@ func createManager(t testing.TB) (*DefaultAccountManager, error) {
permissionsManager := permissions.NewManager(store)
manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
if err != nil {
return nil, err
}
@@ -2960,23 +2982,58 @@ func setupNetworkMapTest(t *testing.T) (*DefaultAccountManager, *types.Account,
return manager, account, peer1, peer2, peer3
}
func peerShouldNotReceiveUpdate(t *testing.T, updateMessage <-chan *UpdateMessage) {
func peerShouldNotReceiveUpdate(t *testing.T, buffer *UpdateBuffer) {
t.Helper()
resultCh := make(chan struct {
msg *UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *UpdateMessage
ok bool
}{msg, ok}
}()
select {
case msg := <-updateMessage:
case msg := <-resultCh:
if !msg.ok {
t.Errorf("Update message channel closed unexpectedly")
}
t.Errorf("Unexpected message received: %+v", msg)
case <-time.After(500 * time.Millisecond):
return
}
}
func peerShouldReceiveUpdate(t *testing.T, updateMessage <-chan *UpdateMessage) {
func peerShouldReceiveUpdate(t *testing.T, buffer *UpdateBuffer) {
t.Helper()
resultCh := make(chan struct {
msg *UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *UpdateMessage
ok bool
}{msg, ok}
}()
select {
case msg := <-updateMessage:
if msg == nil {
case msg := <-resultCh:
if !msg.ok {
t.Errorf("Update message channel closed unexpectedly")
return
}
if msg.msg == nil {
t.Errorf("Received nil update message, expected valid message")
return
}
case <-time.After(500 * time.Millisecond):
t.Error("Timed out waiting for update message")
@@ -3017,9 +3074,13 @@ func BenchmarkSyncAndMarkPeer(b *testing.B) {
if err != nil {
b.Fatalf("Failed to get account: %v", err)
}
peerChannels := make(map[string]chan *UpdateMessage)
peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
b.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
peerChannels[peerID] = NewUpdateBuffer(metrics)
}
manager.peersUpdateManager.peerChannels = peerChannels
@@ -3085,9 +3146,13 @@ func BenchmarkLoginPeer_ExistingPeer(b *testing.B) {
if err != nil {
b.Fatalf("Failed to get account: %v", err)
}
peerChannels := make(map[string]chan *UpdateMessage)
peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
b.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
peerChannels[peerID] = NewUpdateBuffer(metrics)
}
manager.peersUpdateManager.peerChannels = peerChannels
@@ -3160,9 +3225,13 @@ func BenchmarkLoginPeer_NewPeer(b *testing.B) {
if err != nil {
b.Fatalf("Failed to get account: %v", err)
}
peerChannels := make(map[string]chan *UpdateMessage)
peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
b.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
peerChannels[peerID] = NewUpdateBuffer(metrics)
}
manager.peersUpdateManager.peerChannels = peerChannels

View File

@@ -217,7 +217,7 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) {
settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
return BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
}
func createDNSStore(t *testing.T) (store.Store, error) {
@@ -507,13 +507,12 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
}, true)
assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
// Saving DNS settings with groups that have no peers should not trigger updates to account peers or send peer updates
t.Run("saving dns setting with unused groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -534,6 +533,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Creating DNS settings with groups that have no peers should not update account peers or send peer update
t.Run("creating dns setting with unused groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -560,6 +563,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Creating DNS settings with groups that have peers should update account peers and send peer update
t.Run("creating dns setting with used groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
err = manager.SaveGroup(context.Background(), account.Id, userID, &types.Group{
ID: "groupA",
Name: "GroupA",
@@ -567,6 +574,8 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
}, true)
assert.NoError(t, err)
_, _ = updMsg.Pop(context.Background())
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -593,6 +602,18 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Saving DNS settings with groups that have peers should update account peers and send peer update
t.Run("saving dns setting with used groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
err = manager.SaveGroup(context.Background(), account.Id, userID, &types.Group{
ID: "groupA",
Name: "GroupA",
Peers: []string{peer1.ID, peer2.ID, peer3.ID},
}, true)
assert.NoError(t, err)
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -613,6 +634,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Removing group with no peers from DNS settings should not trigger updates to account peers or send peer updates
t.Run("removing group with no peers from dns settings", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -633,6 +658,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
// Removing group with peers from DNS settings should trigger updates to account peers and send peer updates
t.Run("removing group with peers from dns settings", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)

View File

@@ -429,13 +429,12 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
}, true)
assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
// Saving a group that is not linked to any resource should not update account peers
t.Run("saving unlinked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -459,6 +458,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Adding a peer to a group that is not linked to any resource should not update account peers
// and not send peer update
t.Run("adding peer to unlinked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -478,6 +481,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Removing a peer from a group that is not linked to any resource should not update account peers
// and not send peer update
t.Run("removing peer from unliked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -496,6 +503,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Deleting group should not update account peers and not send peer update
t.Run("deleting group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -529,6 +540,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to policy should update account peers and send peer update
t.Run("saving linked group to policy", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -551,6 +566,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// adding peer to a used group should update account peers and send peer update
t.Run("adding peer to linked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -569,6 +588,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// removing peer from a linked group should update account peers and send peer update
t.Run("removing peer from linked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -587,6 +610,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to name server group should update account peers and send peer update
t.Run("saving group linked to name server group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.CreateNameServerGroup(
context.Background(), account.Id, "nsGroup", "nsGroup", []nbdns.NameServer{{
IP: netip.MustParseAddr("1.1.1.1"),
@@ -620,6 +647,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to route should update account peers and send peer update
t.Run("saving group linked to route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
newRoute := route.Route{
ID: "route",
Network: netip.MustParsePrefix("192.168.0.0/16"),
@@ -661,6 +692,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to dns settings should update account peers and send peer update
t.Run("saving group linked to dns settings", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
err := manager.SaveDNSSettings(context.Background(), account.Id, userID, &types.DNSSettings{
DisabledManagementGroups: []string{"groupD"},
})
@@ -688,6 +723,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
// Saving a group linked to network router should update account peers and send peer update
t.Run("saving group linked to network router", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
permissionsManager := permissions.NewManager(manager.Store)
groupsManager := groups.NewManager(manager.Store, permissionsManager, manager)
resourcesManager := resources.NewManager(manager.Store, permissionsManager, groupsManager, manager)

View File

@@ -184,7 +184,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
return err
}
updates := s.peersUpdateManager.CreateChannel(ctx, peer.ID)
updateBuffer := s.peersUpdateManager.CreateChannel(ctx, peer.ID)
s.ephemeralManager.OnPeerConnected(ctx, peer)
@@ -199,37 +199,24 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart))
return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv)
return s.handleUpdates(ctx, accountID, peerKey, peer, updateBuffer, srv)
}
// handleUpdates sends updates to the connected peer until the updates channel is closed.
func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *UpdateMessage, srv proto.ManagementService_SyncServer) error {
func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates *UpdateBuffer, srv proto.ManagementService_SyncServer) error {
log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String())
for {
select {
// condition when there are some updates
case update, open := <-updates:
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().UpdateChannelQueueLength(len(updates) + 1)
}
if !open {
log.WithContext(ctx).Debugf("updates channel for peer %s was closed", peerKey.String())
s.cancelPeerRoutines(ctx, accountID, peer)
return nil
}
log.WithContext(ctx).Debugf("received an update for peer %s", peerKey.String())
if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
return err
}
// condition when client <-> server connection has been terminated
case <-srv.Context().Done():
// happens when connection drops, e.g. client disconnects
log.WithContext(ctx).Debugf("stream of peer %s has been closed", peerKey.String())
update, ok := updates.Pop(ctx)
if !ok {
log.WithContext(ctx).Debugf("update buffer for peer %s closed", peerKey.String())
s.cancelPeerRoutines(ctx, accountID, peer)
return srv.Context().Err()
return nil
}
log.WithContext(ctx).Debugf("sending latest update to peer %s", peerKey.String())
if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
return err
}
}
}

View File

@@ -118,7 +118,7 @@ func BuildApiBlackBoxWithDBState(t TB, sqlFile string, expectedPeerUpdate *serve
t.Fatalf("Failed to create metrics: %v", err)
}
peersUpdateManager := server.NewPeersUpdateManager(nil)
peersUpdateManager := server.NewPeersUpdateManager(metrics)
updMsg := peersUpdateManager.CreateChannel(context.Background(), TestPeerId)
done := make(chan struct{})
if validateUpdate {
@@ -166,24 +166,54 @@ func BuildApiBlackBoxWithDBState(t TB, sqlFile string, expectedPeerUpdate *serve
return apiHandler, am, done
}
func peerShouldNotReceiveUpdate(t TB, updateMessage <-chan *server.UpdateMessage) {
func peerShouldNotReceiveUpdate(t TB, buffer *server.UpdateBuffer) {
t.Helper()
resultCh := make(chan struct {
msg *server.UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *server.UpdateMessage
ok bool
}{msg, ok}
}()
select {
case msg := <-updateMessage:
case msg := <-resultCh:
t.Errorf("Unexpected message received: %+v", msg)
case <-time.After(500 * time.Millisecond):
return
}
}
func peerShouldReceiveUpdate(t TB, updateMessage <-chan *server.UpdateMessage, expected *server.UpdateMessage) {
func peerShouldReceiveUpdate(t TB, buffer *server.UpdateBuffer, expected *server.UpdateMessage) {
t.Helper()
resultCh := make(chan struct {
msg *server.UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *server.UpdateMessage
ok bool
}{msg, ok}
}()
select {
case msg := <-updateMessage:
if msg == nil {
case msg := <-resultCh:
if msg.msg == nil {
t.Errorf("Received nil update message, expected valid message")
}
if !msg.ok {
t.Errorf("Expected to receive an update message, but got ok = false")
}
assert.Equal(t, expected, msg)
case <-time.After(500 * time.Millisecond):
t.Errorf("Timed out waiting for update message")

View File

@@ -37,21 +37,23 @@ func (am *DefaultAccountManager) UpdateIntegratedValidatorGroups(ctx context.Con
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock()
a, err := am.Store.GetAccountByUser(ctx, userID)
if err != nil {
return err
}
return am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
a, err := transaction.GetAccountByUser(ctx, userID)
if err != nil {
return err
}
var extra *types.ExtraSettings
var extra *types.ExtraSettings
if a.Settings.Extra != nil {
extra = a.Settings.Extra
} else {
extra = &types.ExtraSettings{}
a.Settings.Extra = extra
}
extra.IntegratedValidatorGroups = groups
return am.Store.SaveAccount(ctx, a)
if a.Settings.Extra != nil {
extra = a.Settings.Extra
} else {
extra = &types.ExtraSettings{}
a.Settings.Extra = extra
}
extra.IntegratedValidatorGroups = groups
return transaction.SaveAccount(ctx, a)
})
}
func (am *DefaultAccountManager) GroupValidation(ctx context.Context, accountID string, groupIDs []string) (bool, error) {
@@ -81,15 +83,12 @@ func (am *DefaultAccountManager) GetValidatedPeers(ctx context.Context, accountI
var peers []*nbpeer.Peer
var settings *types.Settings
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
groups, err = transaction.GetAccountGroups(ctx, store.LockingStrengthShare, accountID)
if err != nil {
return err
}
groups, err = am.Store.GetAccountGroups(ctx, store.LockingStrengthShare, accountID)
if err != nil {
return nil, err
}
peers, err = transaction.GetAccountPeers(ctx, store.LockingStrengthShare, accountID, "", "")
return err
})
peers, err = am.Store.GetAccountPeers(ctx, store.LockingStrengthShare, accountID, "", "")
if err != nil {
return nil, err
}

View File

@@ -424,14 +424,14 @@ func startManagementForTest(t *testing.T, testFile string, config *types.Config)
t.Fatal(err)
}
peersUpdateManager := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
peersUpdateManager := NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{}
ctx := context.WithValue(context.Background(), hook.ExecutionContextKey, hook.SystemSource) //nolint:staticcheck
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
require.NoError(t, err)
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -173,14 +173,14 @@ func startServer(
log.Fatalf("failed creating a store: %s: %v", config.Datadir, err)
}
peersUpdateManager := server.NewPeersUpdateManager(nil)
eventStore := &activity.InMemoryEventStore{}
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed creating metrics: %v", err)
}
peersUpdateManager := server.NewPeersUpdateManager(metrics)
eventStore := &activity.InMemoryEventStore{}
ctrl := gomock.NewController(t)
t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl)

View File

@@ -184,7 +184,9 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
ephemeralPeersSKs int
ephemeralPeersSKUsage int
activePeersLastDay int
activeUserPeersLastDay int
osPeers map[string]int
activeUsersLastDay map[string]struct{}
userPeers int
rules int
rulesProtocol map[string]int
@@ -203,6 +205,7 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
version string
peerActiveVersions []string
osUIClients map[string]int
rosenpassEnabled int
)
start := time.Now()
metricsProperties := make(properties)
@@ -210,6 +213,7 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
osUIClients = make(map[string]int)
rulesProtocol = make(map[string]int)
rulesDirection = make(map[string]int)
activeUsersLastDay = make(map[string]struct{})
uptime = time.Since(w.startupTime).Seconds()
connections := w.connManager.GetAllConnectedPeers()
version = nbversion.NetbirdVersion()
@@ -277,10 +281,14 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
for _, peer := range account.Peers {
peers++
if peer.SSHEnabled {
if peer.SSHEnabled || peer.Meta.Flags.ServerSSHAllowed {
peersSSHEnabled++
}
if peer.Meta.Flags.RosenpassEnabled {
rosenpassEnabled++
}
if peer.UserID != "" {
userPeers++
}
@@ -299,6 +307,10 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
_, connected := connections[peer.ID]
if connected || peer.Status.LastSeen.After(w.lastRun) {
activePeersLastDay++
if peer.UserID != "" {
activeUserPeersLastDay++
activeUsersLastDay[peer.UserID] = struct{}{}
}
osActiveKey := osKey + "_active"
osActiveCount := osPeers[osActiveKey]
osPeers[osActiveKey] = osActiveCount + 1
@@ -320,6 +332,8 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
metricsProperties["ephemeral_peers_setup_keys"] = ephemeralPeersSKs
metricsProperties["ephemeral_peers_setup_keys_usage"] = ephemeralPeersSKUsage
metricsProperties["active_peers_last_day"] = activePeersLastDay
metricsProperties["active_user_peers_last_day"] = activeUserPeersLastDay
metricsProperties["active_users_last_day"] = len(activeUsersLastDay)
metricsProperties["user_peers"] = userPeers
metricsProperties["rules"] = rules
metricsProperties["rules_with_src_posture_checks"] = rulesWithSrcPostureChecks
@@ -338,6 +352,7 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
metricsProperties["ui_clients"] = uiClient
metricsProperties["idp_manager"] = w.idpManager
metricsProperties["store_engine"] = w.dataSource.GetStoreEngine()
metricsProperties["rosenpass_enabled"] = rosenpassEnabled
for protocol, count := range rulesProtocol {
metricsProperties["rules_protocol_"+protocol] = count

View File

@@ -47,8 +47,8 @@ func (mockDatasource) GetAllAccounts(_ context.Context) []*types.Account {
"1": {
ID: "1",
UserID: "test",
SSHEnabled: true,
Meta: nbpeer.PeerSystemMeta{GoOS: "linux", WtVersion: "0.0.1"},
SSHEnabled: false,
Meta: nbpeer.PeerSystemMeta{GoOS: "linux", WtVersion: "0.0.1", Flags: nbpeer.Flags{ServerSSHAllowed: true, RosenpassEnabled: true}},
},
},
Policies: []*types.Policy{
@@ -312,7 +312,19 @@ func TestGenerateProperties(t *testing.T) {
}
if properties["posture_checks"] != 2 {
t.Errorf("expected 1 posture_checks, got %d", properties["posture_checks"])
t.Errorf("expected 2 posture_checks, got %d", properties["posture_checks"])
}
if properties["rosenpass_enabled"] != 1 {
t.Errorf("expected 1 rosenpass_enabled, got %d", properties["rosenpass_enabled"])
}
if properties["active_user_peers_last_day"] != 2 {
t.Errorf("expected 2 active_user_peers_last_day, got %d", properties["active_user_peers_last_day"])
}
if properties["active_users_last_day"] != 1 {
t.Errorf("expected 1 active_users_last_day, got %d", properties["active_users_last_day"])
}
}

View File

@@ -779,7 +779,7 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) {
t.Cleanup(ctrl.Finish)
settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
return BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
}
func createNSStore(t *testing.T) (store.Store, error) {
@@ -988,14 +988,14 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
}, true)
assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
// Creating a nameserver group with a distribution group no peers should not update account peers
// and not send peer update
t.Run("creating nameserver group with distribution group no peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -1023,6 +1023,11 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
// saving a nameserver group with a distribution group with no peers should not update account peers
// and not send peer update
t.Run("saving nameserver group with distribution group no peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -1041,6 +1046,11 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
// Creating a nameserver group with a distribution group no peers should update account peers and send peer update
t.Run("creating nameserver group with distribution group has peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1067,6 +1077,10 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
// saving a nameserver group with a distribution group with peers should update account peers and send peer update
t.Run("saving nameserver group with distribution group has peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1097,6 +1111,10 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
// Deleting a nameserver group should update account peers and send peer update
t.Run("deleting nameserver group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)

View File

@@ -144,6 +144,10 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
if expired {
// we need to update other peers because when peer login expires all other peers are notified to disconnect from
// the expired one. Here we notify them that connection is now allowed again.
err := am.Store.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
log.Errorf("failed to increment network serial number for account %s: %v", accountID, err)
}
am.BufferUpdateAccountPeers(ctx, accountID)
}
@@ -270,6 +274,11 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
inactivityExpirationChanged = true
}
err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
return err
}
return transaction.SavePeer(ctx, store.LockingStrengthUpdate, accountID, peer)
})
if err != nil {
@@ -755,6 +764,13 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
return err
}
}
if isStatusChanged || sync.UpdateAccountPeers || (updated && len(postureChecks) > 0) {
if err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID); err != nil {
return err
}
}
return nil
})
if err != nil {
@@ -888,6 +904,13 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
}
}
if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) {
err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
return fmt.Errorf("failed to increment network serial: %w", err)
}
}
return nil
})
if err != nil {
@@ -1169,7 +1192,7 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
return
}
start := time.Now()
globalStart := time.Now()
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
if err != nil {
@@ -1204,18 +1227,27 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
defer wg.Done()
defer func() { <-semaphore }()
start := time.Now()
postureChecks, err := am.getPeerPostureChecks(account, p.ID)
if err != nil {
log.WithContext(ctx).Debugf("failed to get posture checks for peer %s: %v", peer.ID, err)
return
}
am.metrics.UpdateChannelMetrics().CountCalcPostureChecksDuration(time.Since(start))
start = time.Now()
remotePeerNetworkMap := account.GetPeerNetworkMap(ctx, p.ID, customZone, approvedPeersMap, resourcePolicies, routers, am.metrics.AccountManagerMetrics())
am.metrics.UpdateChannelMetrics().CountCalcPeerNetworkMapDuration(time.Since(start))
start = time.Now()
proxyNetworkMap, ok := proxyNetworkMaps[p.ID]
if ok {
remotePeerNetworkMap.Merge(proxyNetworkMap)
}
am.metrics.UpdateChannelMetrics().CountMergeNetworkMapDuration(time.Since(start))
extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID)
if err != nil {
@@ -1223,7 +1255,10 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
return
}
start = time.Now()
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting)
am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start))
am.peersUpdateManager.SendUpdate(ctx, p.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap})
}(peer)
}
@@ -1232,7 +1267,7 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
wg.Wait()
if am.metrics != nil {
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(start))
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart))
}
}

View File

@@ -19,6 +19,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/metric/noop"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
@@ -963,10 +964,14 @@ func BenchmarkUpdateAccountPeers(b *testing.B) {
b.Fatalf("Failed to get account: %v", err)
}
peerChannels := make(map[string]chan *UpdateMessage)
peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
b.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
peerChannels[peerID] = NewUpdateBuffer(metrics)
}
manager.peersUpdateManager.peerChannels = peerChannels
@@ -1028,17 +1033,24 @@ func TestUpdateAccountPeers(t *testing.T) {
t.Fatalf("Failed to get account: %v", err)
}
peerChannels := make(map[string]chan *UpdateMessage)
peerChannels := make(map[string]*UpdateBuffer)
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
if err != nil {
t.Fatalf("Failed to create update channel metrics: %v", err)
}
for peerID := range account.Peers {
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
peerChannels[peerID] = NewUpdateBuffer(metrics)
}
manager.peersUpdateManager.peerChannels = peerChannels
manager.UpdateAccountPeers(ctx, account.Id)
for _, channel := range peerChannels {
update := <-channel
update, ok := channel.Pop(context.Background())
if !ok {
t.Fatalf("Expected update for peer, but channel is empty")
}
assert.Nil(t, update.Update.NetbirdConfig)
assert.Equal(t, tc.peers, len(update.NetworkMap.Peers))
assert.Equal(t, tc.peers*2, len(update.NetworkMap.FirewallRules))
@@ -1267,7 +1279,7 @@ func Test_RegisterPeerByUser(t *testing.T) {
settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1342,7 +1354,7 @@ func Test_RegisterPeerBySetupKey(t *testing.T) {
settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1477,7 +1489,7 @@ func Test_RegisterPeerRollbackOnFailure(t *testing.T) {
permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1546,7 +1558,7 @@ func Test_LoginPeer(t *testing.T) {
settingsMockManager := settings.NewMockManager(ctrl)
permissionsManager := permissions.NewManager(s)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
assert.NoError(t, err)
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
@@ -1734,13 +1746,12 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
var peer5 *nbpeer.Peer
var peer6 *nbpeer.Peer
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
// Updating not expired peer and peer expiration is enabled should not update account peers and not send peer update
t.Run("updating not expired peer and peer expiration is enabled", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -1759,6 +1770,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Adding peer to unlinked group should not update account peers and not send peer update
t.Run("adding peer to unlinked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -1784,6 +1799,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Deleting peer with unlinked group should not update account peers and not send peer update
t.Run("deleting peer with unlinked group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -1802,6 +1821,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Updating peer label should update account peers and send peer update
t.Run("updating peer label", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1820,6 +1843,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
})
t.Run("validator requires update", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
requireUpdateFunc := func(_ context.Context, update *nbpeer.Peer, peer *nbpeer.Peer, userID string, accountID string, dnsDomain string, peersGroup []string, extraSettings *types.ExtraSettings) (*nbpeer.Peer, bool, error) {
return update, true, nil
}
@@ -1842,6 +1869,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
})
t.Run("validator requires no update", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
requireNoUpdateFunc := func(_ context.Context, update *nbpeer.Peer, peer *nbpeer.Peer, userID string, accountID string, dnsDomain string, peersGroup []string, extraSettings *types.ExtraSettings) (*nbpeer.Peer, bool, error) {
return update, false, nil
}
@@ -1865,6 +1896,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Adding peer to group linked with policy should update account peers and send peer update
t.Run("adding peer to group linked with policy", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{
AccountID: account.Id,
Enabled: true,
@@ -1906,6 +1941,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Deleting peer with linked group to policy should update account peers and send peer update
t.Run("deleting peer with linked group to policy", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1924,6 +1963,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Adding peer to group linked with route should update account peers and send peer update
t.Run("adding peer to group linked with route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
route := nbroute.Route{
ID: "testingRoute1",
Network: netip.MustParsePrefix("100.65.250.202/32"),
@@ -1970,6 +2013,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Deleting peer with linked group to route should update account peers and send peer update
t.Run("deleting peer with linked group to route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1988,6 +2035,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Adding peer to group linked with name server group should update account peers and send peer update
t.Run("adding peer to group linked with name server group", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.CreateNameServerGroup(
context.Background(), account.Id, "nsGroup", "nsGroup", []nbdns.NameServer{{
IP: netip.MustParseAddr("1.1.1.1"),
@@ -2025,6 +2076,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
// Deleting peer with linked group to name server group should update account peers and send peer update
t.Run("deleting peer with linked group to route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)

View File

@@ -1017,17 +1017,16 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
}, true)
assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
var policyWithGroupRulesNoPeers *types.Policy
var policyWithDestinationPeersOnly *types.Policy
var policyWithSourceAndDestinationPeers *types.Policy
// Saving policy with rule groups with no peers should not update account's peers and not send peer update
t.Run("saving policy with rule groups with no peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -1059,6 +1058,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Saving policy with source group containing peers, but destination group without peers should
// update account's peers and send peer update
t.Run("saving policy where source has peers but destination does not", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1091,6 +1094,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Saving policy with destination group containing peers, but source group without peers should
// update account's peers and send peer update
t.Run("saving policy where destination has peers but source does not", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1123,6 +1130,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Saving policy with destination and source groups containing peers should update account's peers
// and send peer update
t.Run("saving policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1154,6 +1165,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Disabling policy with destination and source groups containing peers should update account's peers
// and send peer update
t.Run("disabling policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1174,6 +1189,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Updating disabled policy with destination and source groups containing peers should not update account's peers
// or send peer update
t.Run("updating disabled policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -1195,6 +1214,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Enabling policy with destination and source groups containing peers should update account's peers
// and send peer update
t.Run("enabling policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1214,6 +1237,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Deleting policy should trigger account peers update and send peer update
t.Run("deleting policy with source and destination groups with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1234,6 +1261,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Deleting policy with destination group containing peers, but source group without peers should
// update account's peers and send peer update
t.Run("deleting policy where destination has peers but source does not", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -1252,6 +1283,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
// Deleting policy with no peers in groups should not update account's peers and not send peer update
t.Run("deleting policy with no peers in groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)

View File

@@ -140,11 +140,6 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
}, true)
assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
postureCheckA := &posture.Checks{
Name: "postureCheckA",
AccountID: account.Id,
@@ -171,6 +166,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Saving unused posture check should not update account peers and not send peer update
t.Run("saving unused posture check", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -189,6 +188,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating unused posture check should not update account peers and not send peer update
t.Run("updating unused posture check", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -226,6 +229,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Linking posture check to policy should trigger update account peers and send peer update
t.Run("linking posture check to policy with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -244,6 +251,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating linked posture checks should update account peers and send peer update
t.Run("updating linked to posture check with peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
postureCheckB.Checks = posture.ChecksDefinition{
NBVersionCheck: &posture.NBVersionCheck{
MinVersion: "0.29.0",
@@ -273,6 +284,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Removing posture check from policy should trigger account peers update and send peer update
t.Run("removing posture check from policy", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -292,6 +307,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Deleting unused posture check should not trigger account peers update and not send peer update
t.Run("deleting unused posture check", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
done := make(chan struct{})
go func() {
peerShouldNotReceiveUpdate(t, updMsg)
@@ -313,6 +332,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating linked posture check to policy with no peers should not trigger account peers update and not send peer update
t.Run("updating linked posture check to policy with no peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{
Enabled: true,
Rules: []*types.PolicyRule{
@@ -352,7 +375,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating linked posture check to policy where destination has peers but source does not
// should trigger account peers update and send peer update
t.Run("updating linked posture check to policy where destination has peers but source does not", func(t *testing.T) {
updMsg1 := manager.peersUpdateManager.CreateChannel(context.Background(), peer2.ID)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer2.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer2.ID)
})
@@ -374,7 +397,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg1)
peerShouldReceiveUpdate(t, updMsg)
close(done)
}()
@@ -396,6 +419,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
// Updating linked client posture check to policy where source has peers but destination does not,
// should trigger account peers update and send peer update
t.Run("updating linked posture check to policy where source has peers but destination does not", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
})
_, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{
Enabled: true,
Rules: []*types.PolicyRule{

View File

@@ -1284,7 +1284,7 @@ func createRouterManager(t *testing.T) (*DefaultAccountManager, error) {
permissionsManager := permissions.NewManager(store)
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
return BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
}
func createRouterStore(t *testing.T) (store.Store, error) {
@@ -1972,13 +1972,12 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
}, true)
assert.NoError(t, err)
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
// Creating a route with no routing peer and no peers in PeerGroups or Groups should not update account peers and not send peer update
t.Run("creating route no routing peer and no peers in groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
route := route.Route{
ID: "testingRoute1",
Network: netip.MustParsePrefix("100.65.250.202/32"),
@@ -2015,6 +2014,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Creating a route with no routing peer and having peers in groups should update account peers and send peer update
t.Run("creating a route with peers in PeerGroups and Groups", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
route := route.Route{
ID: "testingRoute2",
Network: netip.MustParsePrefix("192.0.2.0/32"),
@@ -2064,6 +2067,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Creating route should update account peers and send peer update
t.Run("creating route with a routing peer", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -2089,6 +2096,11 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
t.Run("updating route", func(t *testing.T) {
baseRoute.Groups = []string{routeGroup1, routeGroup2}
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -2107,6 +2119,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Deleting the route should update account peers and send peer update
t.Run("deleting route", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
done := make(chan struct{})
go func() {
peerShouldReceiveUpdate(t, updMsg)
@@ -2125,6 +2141,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Adding peer to route peer groups that do not have any peers should update account peers and send peer update
t.Run("adding peer to route peer groups that do not have any peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
newRoute := route.Route{
Network: netip.MustParsePrefix("192.168.12.0/16"),
NetID: "superNet",
@@ -2165,6 +2185,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
// Adding peer to route groups that do not have any peers should update account peers and send peer update
t.Run("adding peer to route groups that do not have any peers", func(t *testing.T) {
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
t.Cleanup(func() {
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
})
newRoute := route.Route{
Network: netip.MustParsePrefix("192.168.13.0/16"),
NetID: "superNet",

View File

@@ -18,6 +18,13 @@ type UpdateChannelMetrics struct {
getAllConnectedPeersDurationMicro metric.Int64Histogram
getAllConnectedPeers metric.Int64Histogram
hasChannelDurationMicro metric.Int64Histogram
calcPostureChecksDurationMicro metric.Int64Histogram
calcPeerNetworkMapDurationMs metric.Int64Histogram
mergeNetworkMapDurationMicro metric.Int64Histogram
toSyncResponseDurationMicro metric.Int64Histogram
bufferPushCounter metric.Int64Counter
bufferOverwriteCounter metric.Int64Counter
bufferIgnoreCounter metric.Int64Counter
ctx context.Context
}
@@ -89,6 +96,59 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh
return nil, err
}
calcPostureChecksDurationMicro, err := meter.Int64Histogram("management.updatechannel.calc.posturechecks.duration.micro",
metric.WithUnit("microseconds"),
metric.WithDescription("Duration of how long it takes to get the posture checks for a peer"),
)
if err != nil {
return nil, err
}
calcPeerNetworkMapDurationMs, err := meter.Int64Histogram("management.updatechannel.calc.networkmap.duration.ms",
metric.WithUnit("milliseconds"),
metric.WithDescription("Duration of how long it takes to calculate the network map for a peer"),
)
if err != nil {
return nil, err
}
mergeNetworkMapDurationMicro, err := meter.Int64Histogram("management.updatechannel.merge.networkmap.duration.micro",
metric.WithUnit("microseconds"),
metric.WithDescription("Duration of how long it takes to merge the network maps for a peer"),
)
if err != nil {
return nil, err
}
toSyncResponseDurationMicro, err := meter.Int64Histogram("management.updatechannel.tosyncresponse.duration.micro",
metric.WithUnit("microseconds"),
metric.WithDescription("Duration of how long it takes to convert the network map to sync response"),
)
if err != nil {
return nil, err
}
bufferPushCounter, err := meter.Int64Counter("management.updatechannel.buffer.push.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of updates pushed to an empty buffer"))
if err != nil {
return nil, err
}
bufferOverwriteCounter, err := meter.Int64Counter("management.updatechannel.buffer.overwrite.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of updates overwriting old unsent updates in the buffer"))
if err != nil {
return nil, err
}
bufferIgnoreCounter, err := meter.Int64Counter("management.updatechannel.buffer.ignore.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of updates being ignored due to old network serial"))
if err != nil {
return nil, err
}
return &UpdateChannelMetrics{
createChannelDurationMicro: createChannelDurationMicro,
closeChannelDurationMicro: closeChannelDurationMicro,
@@ -98,6 +158,13 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh
getAllConnectedPeersDurationMicro: getAllConnectedPeersDurationMicro,
getAllConnectedPeers: getAllConnectedPeers,
hasChannelDurationMicro: hasChannelDurationMicro,
calcPostureChecksDurationMicro: calcPostureChecksDurationMicro,
calcPeerNetworkMapDurationMs: calcPeerNetworkMapDurationMs,
mergeNetworkMapDurationMicro: mergeNetworkMapDurationMicro,
toSyncResponseDurationMicro: toSyncResponseDurationMicro,
bufferPushCounter: bufferPushCounter,
bufferOverwriteCounter: bufferOverwriteCounter,
bufferIgnoreCounter: bufferIgnoreCounter,
ctx: ctx,
}, nil
}
@@ -137,3 +204,34 @@ func (metrics *UpdateChannelMetrics) CountGetAllConnectedPeersDuration(duration
func (metrics *UpdateChannelMetrics) CountHasChannelDuration(duration time.Duration) {
metrics.hasChannelDurationMicro.Record(metrics.ctx, duration.Microseconds())
}
func (metrics *UpdateChannelMetrics) CountCalcPostureChecksDuration(duration time.Duration) {
metrics.calcPostureChecksDurationMicro.Record(metrics.ctx, duration.Microseconds())
}
func (metrics *UpdateChannelMetrics) CountCalcPeerNetworkMapDuration(duration time.Duration) {
metrics.calcPeerNetworkMapDurationMs.Record(metrics.ctx, duration.Milliseconds())
}
func (metrics *UpdateChannelMetrics) CountMergeNetworkMapDuration(duration time.Duration) {
metrics.mergeNetworkMapDurationMicro.Record(metrics.ctx, duration.Microseconds())
}
func (metrics *UpdateChannelMetrics) CountToSyncResponseDuration(duration time.Duration) {
metrics.toSyncResponseDurationMicro.Record(metrics.ctx, duration.Microseconds())
}
// CountBufferPush counts how many buffer push operations are happening on an empty buffer
func (metrics *UpdateChannelMetrics) CountBufferPush() {
metrics.bufferPushCounter.Add(metrics.ctx, 1)
}
// CountBufferOverwrite counts how many buffer overwrite operations are happening on a non-empty buffer
func (metrics *UpdateChannelMetrics) CountBufferOverwrite() {
metrics.bufferOverwriteCounter.Add(metrics.ctx, 1)
}
// CountBufferIgnore counts how many buffer ignore operations are happening when a new update is pushed
func (metrics *UpdateChannelMetrics) CountBufferIgnore() {
metrics.bufferIgnoreCounter.Add(metrics.ctx, 1)
}

View File

@@ -15,6 +15,7 @@ import (
"github.com/netbirdio/netbird/management/proto"
"github.com/netbirdio/netbird/management/server/settings"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/util"
)
@@ -29,7 +30,11 @@ var TurnTestHost = &types.Host{
func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) {
ttl := util.Duration{Duration: time.Hour}
secret := "some_secret"
peersManager := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersManager := NewPeersUpdateManager(metrics)
rc := &types.Relay{
Addresses: []string{"localhost:0"},
@@ -77,9 +82,25 @@ func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) {
func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) {
ttl := util.Duration{Duration: 2 * time.Second}
secret := "some_secret"
peersManager := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersManager := NewPeersUpdateManager(metrics)
peer := "some_peer"
updateChannel := peersManager.CreateChannel(context.Background(), peer)
buffer := peersManager.CreateChannel(context.Background(), peer)
resultCh := make(chan struct {
msg *UpdateMessage
ok bool
}, 1)
go func() {
msg, ok := buffer.Pop(context.Background())
resultCh <- struct {
msg *UpdateMessage
ok bool
}{msg, ok}
}()
rc := &types.Relay{
Addresses: []string{"localhost:0"},
@@ -117,8 +138,8 @@ func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) {
loop:
for timeout := time.After(5 * time.Second); ; {
select {
case update := <-updateChannel:
updates = append(updates, update)
case update := <-resultCh:
updates = append(updates, update.msg)
case <-timeout:
break loop
}
@@ -181,7 +202,11 @@ loop:
func TestTimeBasedAuthSecretsManager_CancelRefresh(t *testing.T) {
ttl := util.Duration{Duration: time.Hour}
secret := "some_secret"
peersManager := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersManager := NewPeersUpdateManager(metrics)
peer := "some_peer"
rc := &types.Relay{

View File

@@ -0,0 +1,93 @@
package server
import (
"context"
"sync"
"github.com/netbirdio/netbird/management/server/telemetry"
)
type UpdateBuffer struct {
mu sync.Mutex
cond *sync.Cond
update *UpdateMessage
closed bool
metrics *telemetry.UpdateChannelMetrics
}
func NewUpdateBuffer(metrics *telemetry.UpdateChannelMetrics) *UpdateBuffer {
ub := &UpdateBuffer{metrics: metrics}
ub.cond = sync.NewCond(&ub.mu)
return ub
}
func (b *UpdateBuffer) Push(update *UpdateMessage) {
b.mu.Lock()
defer b.mu.Unlock()
if b.update != nil && update.Update.NetbirdConfig != nil {
if update.Update.NetbirdConfig.Relay != nil {
b.update.Update.NetbirdConfig.Relay = update.Update.NetbirdConfig.Relay
}
if update.Update.NetbirdConfig.Signal != nil {
b.update.Update.NetbirdConfig.Signal = update.Update.NetbirdConfig.Signal
}
if update.Update.NetbirdConfig.Flow != nil {
b.update.Update.NetbirdConfig.Flow = update.Update.NetbirdConfig.Flow
}
if update.Update.NetbirdConfig.Stuns != nil {
b.update.Update.NetbirdConfig.Stuns = update.Update.NetbirdConfig.Stuns
}
if update.Update.NetbirdConfig.Turns != nil {
b.update.Update.NetbirdConfig.Turns = update.Update.NetbirdConfig.Turns
}
}
// the equal case we need because we don't always increment the serial number
if b.update == nil || update.Update.NetworkMap.Serial > b.update.Update.NetworkMap.Serial || b.update.Update.NetworkMap.Serial == 0 {
b.update = update
b.cond.Signal()
if b.update == nil {
b.metrics.CountBufferPush()
return
}
b.metrics.CountBufferOverwrite()
return
}
b.metrics.CountBufferIgnore()
}
func (b *UpdateBuffer) Pop(ctx context.Context) (*UpdateMessage, bool) {
b.mu.Lock()
defer b.mu.Unlock()
for b.update == nil && !b.closed {
waitCh := make(chan struct{})
go func() {
select {
case <-ctx.Done():
b.cond.Broadcast()
case <-waitCh:
// noop
}
}()
b.cond.Wait()
close(waitCh)
}
if b.closed {
return nil, false
}
msg := b.update
b.update = nil
return msg, true
}
func (b *UpdateBuffer) Close() {
b.mu.Lock()
b.closed = true
b.cond.Broadcast()
b.mu.Unlock()
}

View File

@@ -12,8 +12,6 @@ import (
"github.com/netbirdio/netbird/management/server/types"
)
const channelBufferSize = 100
type UpdateMessage struct {
Update *proto.SyncResponse
NetworkMap *types.NetworkMap
@@ -21,7 +19,7 @@ type UpdateMessage struct {
type PeersUpdateManager struct {
// peerChannels is an update channel indexed by Peer.ID
peerChannels map[string]chan *UpdateMessage
peerChannels map[string]*UpdateBuffer
// channelsMux keeps the mutex to access peerChannels
channelsMux *sync.RWMutex
// metrics provides method to collect application metrics
@@ -31,7 +29,7 @@ type PeersUpdateManager struct {
// NewPeersUpdateManager returns a new instance of PeersUpdateManager
func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager {
return &PeersUpdateManager{
peerChannels: make(map[string]chan *UpdateMessage),
peerChannels: make(map[string]*UpdateBuffer),
channelsMux: &sync.RWMutex{},
metrics: metrics,
}
@@ -53,20 +51,14 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda
if channel, ok := p.peerChannels[peerID]; ok {
found = true
select {
case channel <- update:
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
default:
dropped = true
log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(channel))
}
channel.Push(update)
} else {
log.WithContext(ctx).Debugf("peer %s has no channel", peerID)
}
}
// CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer.
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) chan *UpdateMessage {
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) *UpdateBuffer {
start := time.Now()
closed := false
@@ -81,22 +73,22 @@ func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) c
if channel, ok := p.peerChannels[peerID]; ok {
closed = true
channel.Close()
delete(p.peerChannels, peerID)
close(channel)
}
// mbragin: todo shouldn't it be more? or configurable?
channel := make(chan *UpdateMessage, channelBufferSize)
p.peerChannels[peerID] = channel
buffer := NewUpdateBuffer(p.metrics.UpdateChannelMetrics())
p.peerChannels[peerID] = buffer
log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID)
return channel
return buffer
}
func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) {
if channel, ok := p.peerChannels[peerID]; ok {
delete(p.peerChannels, peerID)
close(channel)
channel.Close()
log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID)
return

View File

@@ -6,13 +6,19 @@ import (
"time"
"github.com/netbirdio/netbird/management/proto"
"github.com/netbirdio/netbird/management/server/telemetry"
)
// var peersUpdater *PeersUpdateManager
func TestCreateChannel(t *testing.T) {
peer := "test-create"
peersUpdater := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersUpdater := NewPeersUpdateManager(metrics)
defer peersUpdater.CloseChannel(context.Background(), peer)
_ = peersUpdater.CreateChannel(context.Background(), peer)
@@ -23,7 +29,12 @@ func TestCreateChannel(t *testing.T) {
func TestSendUpdate(t *testing.T) {
peer := "test-sendupdate"
peersUpdater := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersUpdater := NewPeersUpdateManager(metrics)
update1 := &UpdateMessage{Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{
Serial: 0,
@@ -33,41 +44,62 @@ func TestSendUpdate(t *testing.T) {
if _, ok := peersUpdater.peerChannels[peer]; !ok {
t.Error("Error creating the channel")
}
resultCh := make(chan struct {
msg *UpdateMessage
ok bool
}, 1)
go func() {
for {
msg, ok := peersUpdater.peerChannels[peer].Pop(context.Background())
resultCh <- struct {
msg *UpdateMessage
ok bool
}{msg, ok}
}
}()
peersUpdater.SendUpdate(context.Background(), peer, update1)
select {
case <-peersUpdater.peerChannels[peer]:
default:
case <-resultCh:
case <-time.After(1 * time.Second):
t.Error("Update wasn't send")
}
for range [channelBufferSize]int{} {
peersUpdater.SendUpdate(context.Background(), peer, update1)
}
update2 := &UpdateMessage{Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{
Serial: 10,
},
}}
update3 := &UpdateMessage{Update: &proto.SyncResponse{
NetworkMap: &proto.NetworkMap{
Serial: 8,
},
}}
peersUpdater.SendUpdate(context.Background(), peer, update2)
timeout := time.After(5 * time.Second)
for range [channelBufferSize]int{} {
select {
case <-timeout:
t.Error("timed out reading previously sent updates")
case updateReader := <-peersUpdater.peerChannels[peer]:
if updateReader.Update.NetworkMap.Serial == update2.Update.NetworkMap.Serial {
t.Error("got the update that shouldn't have been sent")
}
select {
case <-timeout:
t.Error("timed out reading previously sent updates")
case updateReader := <-resultCh:
if updateReader.msg.Update.NetworkMap.Serial == update3.Update.NetworkMap.Serial {
t.Error("got the update that shouldn't have been sent")
}
}
}
func TestCloseChannel(t *testing.T) {
peer := "test-close"
peersUpdater := NewPeersUpdateManager(nil)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("failed to create metrics: %v", err)
}
peersUpdater := NewPeersUpdateManager(metrics)
_ = peersUpdater.CreateChannel(context.Background(), peer)
if _, ok := peersUpdater.peerChannels[peer]; !ok {
t.Error("Error creating the channel")

View File

@@ -960,6 +960,12 @@ func (am *DefaultAccountManager) expireAndUpdatePeers(ctx context.Context, accou
)
}
// ideally this should run in a transaction
err = am.Store.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
if err != nil {
log.Errorf("failed to increment network serial number for account %s: %v", accountID, err)
}
if len(peerIDs) != 0 {
// this will trigger peer disconnect from the management service
am.peersUpdateManager.CloseChannels(ctx, peerIDs)

View File

@@ -21,6 +21,7 @@ var (
// Update fetch the version info periodically and notify the onUpdateListener in case the UI version or the
// daemon version are deprecated
type Update struct {
httpAgent string
uiVersion *goversion.Version
daemonVersion *goversion.Version
latestAvailable *goversion.Version
@@ -34,7 +35,7 @@ type Update struct {
}
// NewUpdate instantiate Update and start to fetch the new version information
func NewUpdate() *Update {
func NewUpdate(httpAgent string) *Update {
currentVersion, err := goversion.NewVersion(version)
if err != nil {
currentVersion, _ = goversion.NewVersion("0.0.0")
@@ -43,6 +44,7 @@ func NewUpdate() *Update {
latestAvailable, _ := goversion.NewVersion("0.0.0")
u := &Update{
httpAgent: httpAgent,
latestAvailable: latestAvailable,
uiVersion: currentVersion,
fetchTicker: time.NewTicker(fetchPeriod),
@@ -112,7 +114,15 @@ func (u *Update) startFetcher() {
func (u *Update) fetchVersion() bool {
log.Debugf("fetching version info from %s", versionURL)
resp, err := http.Get(versionURL)
req, err := http.NewRequest("GET", versionURL, nil)
if err != nil {
log.Errorf("failed to create request for version info: %s", err)
return false
}
req.Header.Set("User-Agent", u.httpAgent)
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Errorf("failed to fetch version info: %s", err)
return false

View File

@@ -9,6 +9,8 @@ import (
"time"
)
const httpAgent = "pkg/test"
func TestNewUpdate(t *testing.T) {
version = "1.0.0"
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -21,7 +23,7 @@ func TestNewUpdate(t *testing.T) {
wg.Add(1)
onUpdate := false
u := NewUpdate()
u := NewUpdate(httpAgent)
defer u.StopWatch()
u.SetOnUpdateListener(func() {
onUpdate = true
@@ -46,7 +48,7 @@ func TestDoNotUpdate(t *testing.T) {
wg.Add(1)
onUpdate := false
u := NewUpdate()
u := NewUpdate(httpAgent)
defer u.StopWatch()
u.SetOnUpdateListener(func() {
onUpdate = true
@@ -71,7 +73,7 @@ func TestDaemonUpdate(t *testing.T) {
wg.Add(1)
onUpdate := false
u := NewUpdate()
u := NewUpdate(httpAgent)
defer u.StopWatch()
u.SetOnUpdateListener(func() {
onUpdate = true