Mirror of https://github.com/netbirdio/netbird.git (synced 2026-05-07 09:19:59 +00:00)

Compare commits: v0.48.0...feature/li (29 commits)
| SHA1 |
|---|
| 5d34804062 |
| 418d23d7f8 |
| b0582c86ba |
| 1b7c787cc5 |
| ad50e07325 |
| c66bd0cc71 |
| 0480507a10 |
| 34ac4e4b5a |
| 52ff9d9602 |
| 6f0111eab0 |
| d403d20e7b |
| c043018939 |
| ed6ed4a597 |
| 905a3481ec |
| 1b73fae46e |
| 47052fa024 |
| d897365abc |
| f37aa2cc9d |
| 5343bee7b2 |
| 870e29db63 |
| 08e9b05d51 |
| 3581648071 |
| 2a51609436 |
| 83457f8b99 |
| 4795e2fbc4 |
| ec57c685a9 |
| 4b44b8c46c |
| 0c7cac81f0 |
| 5e9ea122f7 |
.github/workflows/release.yml (vendored)
@@ -9,7 +9,7 @@ on:
  pull_request:

env:
  SIGN_PIPE_VER: "v0.0.18"
  SIGN_PIPE_VER: "v0.0.19"
  GORELEASER_VER: "v2.3.2"
  PRODUCT_NAME: "NetBird"
  COPYRIGHT: "NetBird GmbH"
@@ -1,6 +1,9 @@
FROM alpine:3.21.3
# iproute2: busybox doesn't display ip rules properly
RUN apk add --no-cache ca-certificates ip6tables iproute2 iptables

ARG NETBIRD_BINARY=netbird
COPY ${NETBIRD_BINARY} /usr/local/bin/netbird

ENV NB_FOREGROUND_MODE=true
ENTRYPOINT [ "/usr/local/bin/netbird","up"]
COPY netbird /usr/local/bin/netbird
@@ -1,6 +1,7 @@
FROM alpine:3.21.0

COPY netbird /usr/local/bin/netbird
ARG NETBIRD_BINARY=netbird
COPY ${NETBIRD_BINARY} /usr/local/bin/netbird

RUN apk add --no-cache ca-certificates \
    && adduser -D -h /var/lib/netbird netbird
@@ -83,7 +83,6 @@ func startManagement(t *testing.T, config *types.Config, testFile string) (*grpc
    }
    t.Cleanup(cleanUp)

    peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
    eventStore := &activity.InMemoryEventStore{}
    if err != nil {
        return nil, nil
@@ -92,6 +91,9 @@ func startManagement(t *testing.T, config *types.Config, testFile string) (*grpc

    metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
    require.NoError(t, err)

    peersUpdateManager := mgmt.NewPeersUpdateManager(metrics)

    ctrl := gomock.NewController(t)
    t.Cleanup(ctrl.Finish)
@@ -175,7 +175,7 @@ func (e *ConnMgr) AddPeerConn(ctx context.Context, peerKey string, conn *peer.Co
        PeerConnID: conn.ConnID(),
        Log:        conn.Log,
    }
    excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg)
    excluded, err := e.lazyConnMgr.AddPeer(e.lazyCtx, lazyPeerCfg)
    if err != nil {
        conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err)
        if err := conn.Open(ctx); err != nil {
@@ -1456,16 +1456,16 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri
    }
    t.Cleanup(cleanUp)

    peersUpdateManager := server.NewPeersUpdateManager(nil)
    metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
    require.NoError(t, err)

    peersUpdateManager := server.NewPeersUpdateManager(metrics)
    eventStore := &activity.InMemoryEventStore{}
    if err != nil {
        return nil, "", err
    }
    ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)

    metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
    require.NoError(t, err)

    ctrl := gomock.NewController(t)
    t.Cleanup(ctrl.Finish)
    settingsMockManager := settings.NewMockManager(ctrl)
@@ -68,3 +68,8 @@ func (i *Monitor) PauseTimer() {
func (i *Monitor) ResetTimer() {
    i.timer.Reset(i.inactivityThreshold)
}

func (i *Monitor) ResetMonitor(ctx context.Context, timeoutChan chan peer.ConnID) {
    i.Stop()
    go i.Start(ctx, timeoutChan)
}
@@ -58,7 +58,7 @@ type Manager struct {
    // Route HA group management
    peerToHAGroups map[string][]route.HAUniqueID // peer ID -> HA groups they belong to
    haGroupToPeers map[route.HAUniqueID][]string // HA group -> peer IDs in the group
    routesMu       sync.RWMutex                  // protects route mappings
    routesMu       sync.RWMutex

    onInactive chan peerid.ConnID
}
@@ -146,7 +146,7 @@ func (m *Manager) Start(ctx context.Context) {
        case peerConnID := <-m.activityManager.OnActivityChan:
            m.onPeerActivity(ctx, peerConnID)
        case peerConnID := <-m.onInactive:
            m.onPeerInactivityTimedOut(peerConnID)
            m.onPeerInactivityTimedOut(ctx, peerConnID)
        }
    }
}
@@ -197,7 +197,7 @@ func (m *Manager) ExcludePeer(ctx context.Context, peerConfigs []lazyconn.PeerCo
    return added
}

func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) {
func (m *Manager) AddPeer(ctx context.Context, peerCfg lazyconn.PeerConfig) (bool, error) {
    m.managedPeersMu.Lock()
    defer m.managedPeersMu.Unlock()
@@ -225,6 +225,13 @@ func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) {
        peerCfg:         &peerCfg,
        expectedWatcher: watcherActivity,
    }

    // Check if this peer should be activated because its HA group peers are active
    if group, ok := m.shouldActivateNewPeer(peerCfg.PublicKey); ok {
        peerCfg.Log.Debugf("peer belongs to active HA group %s, will activate immediately", group)
        m.activateNewPeerInActiveGroup(ctx, peerCfg)
    }

    return false, nil
}
@@ -315,36 +322,38 @@ func (m *Manager) activateSinglePeer(ctx context.Context, cfg *lazyconn.PeerConf

// activateHAGroupPeers activates all peers in HA groups that the given peer belongs to
func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string) {
    var peersToActivate []string

    m.routesMu.RLock()
    haGroups := m.peerToHAGroups[triggerPeerID]
    m.routesMu.RUnlock()

    if len(haGroups) == 0 {
        m.routesMu.RUnlock()
        log.Debugf("peer %s is not part of any HA groups", triggerPeerID)
        return
    }

    activatedCount := 0
    for _, haGroup := range haGroups {
        m.routesMu.RLock()
        peers := m.haGroupToPeers[haGroup]
        m.routesMu.RUnlock()

        for _, peerID := range peers {
            if peerID == triggerPeerID {
                continue
            if peerID != triggerPeerID {
                peersToActivate = append(peersToActivate, peerID)
            }
        }
    }
    m.routesMu.RUnlock()

    cfg, mp := m.getPeerForActivation(peerID)
    if cfg == nil {
        continue
    }
    activatedCount := 0
    for _, peerID := range peersToActivate {
        cfg, mp := m.getPeerForActivation(peerID)
        if cfg == nil {
            continue
        }

        if m.activateSinglePeer(ctx, cfg, mp) {
            activatedCount++
            cfg.Log.Infof("activated peer as part of HA group %s (triggered by %s)", haGroup, triggerPeerID)
            m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey)
        }
        if m.activateSinglePeer(ctx, cfg, mp) {
            activatedCount++
            cfg.Log.Infof("activated peer as part of HA group (triggered by %s)", triggerPeerID)
            m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey)
        }
    }
@@ -354,6 +363,51 @@ func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string
    }
}

// shouldActivateNewPeer checks if a newly added peer should be activated
// because other peers in its HA groups are already active
func (m *Manager) shouldActivateNewPeer(peerID string) (route.HAUniqueID, bool) {
    m.routesMu.RLock()
    defer m.routesMu.RUnlock()

    haGroups := m.peerToHAGroups[peerID]
    if len(haGroups) == 0 {
        return "", false
    }

    for _, haGroup := range haGroups {
        peers := m.haGroupToPeers[haGroup]
        for _, groupPeerID := range peers {
            if groupPeerID == peerID {
                continue
            }

            cfg, ok := m.managedPeers[groupPeerID]
            if !ok {
                continue
            }
            if mp, ok := m.managedPeersByConnID[cfg.PeerConnID]; ok && mp.expectedWatcher == watcherInactivity {
                return haGroup, true
            }
        }
    }
    return "", false
}

// activateNewPeerInActiveGroup activates a newly added peer that should be active due to HA group
func (m *Manager) activateNewPeerInActiveGroup(ctx context.Context, peerCfg lazyconn.PeerConfig) {
    mp, ok := m.managedPeersByConnID[peerCfg.PeerConnID]
    if !ok {
        return
    }

    if !m.activateSinglePeer(ctx, &peerCfg, mp) {
        return
    }

    peerCfg.Log.Infof("activated newly added peer due to active HA group peers")
    m.peerStore.PeerConnOpen(m.engineCtx, peerCfg.PublicKey)
}

func (m *Manager) addActivePeer(ctx context.Context, peerCfg lazyconn.PeerConfig) error {
    if _, ok := m.managedPeers[peerCfg.PublicKey]; ok {
        peerCfg.Log.Warnf("peer already managed")
@@ -415,6 +469,48 @@ func (m *Manager) close() {
    log.Infof("lazy connection manager closed")
}

// shouldDeferIdleForHA checks if peer should stay connected due to HA group requirements
func (m *Manager) shouldDeferIdleForHA(peerID string) bool {
    m.routesMu.RLock()
    defer m.routesMu.RUnlock()

    haGroups := m.peerToHAGroups[peerID]
    if len(haGroups) == 0 {
        return false
    }

    for _, haGroup := range haGroups {
        groupPeers := m.haGroupToPeers[haGroup]

        for _, groupPeerID := range groupPeers {
            if groupPeerID == peerID {
                continue
            }

            cfg, ok := m.managedPeers[groupPeerID]
            if !ok {
                continue
            }

            groupMp, ok := m.managedPeersByConnID[cfg.PeerConnID]
            if !ok {
                continue
            }

            if groupMp.expectedWatcher != watcherInactivity {
                continue
            }

            // Other member is still connected, defer idle
            if peer, ok := m.peerStore.PeerConn(groupPeerID); ok && peer.IsConnected() {
                return true
            }
        }
    }

    return false
}

func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID) {
    m.managedPeersMu.Lock()
    defer m.managedPeersMu.Unlock()
@@ -441,7 +537,7 @@ func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID)
    m.peerStore.PeerConnOpen(m.engineCtx, mp.peerCfg.PublicKey)
}

func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) {
func (m *Manager) onPeerInactivityTimedOut(ctx context.Context, peerConnID peerid.ConnID) {
    m.managedPeersMu.Lock()
    defer m.managedPeersMu.Unlock()
@@ -456,6 +552,17 @@ func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) {
        return
    }

    if m.shouldDeferIdleForHA(mp.peerCfg.PublicKey) {
        iw, ok := m.inactivityMonitors[peerConnID]
        if ok {
            mp.peerCfg.Log.Debugf("resetting inactivity timer due to HA group requirements")
            iw.ResetMonitor(ctx, m.onInactive)
        } else {
            mp.peerCfg.Log.Errorf("inactivity monitor not found for HA defer reset")
        }
        return
    }

    mp.peerCfg.Log.Infof("connection timed out")

    // this is blocking operation, potentially can be optimized
@@ -489,7 +596,7 @@ func (m *Manager) onPeerConnected(peerConnID peerid.ConnID) {

    iw, ok := m.inactivityMonitors[mp.peerCfg.PeerConnID]
    if !ok {
        mp.peerCfg.Log.Errorf("inactivity monitor not found for peer")
        mp.peerCfg.Log.Warnf("inactivity monitor not found for peer")
        return
    }
@@ -317,12 +317,12 @@ func (conn *Conn) WgConfig() WgConfig {
    return conn.config.WgConfig
}

// IsConnected unit tests only
// refactor unit test to use status recorder use refactor status recorded to manage connection status in peer.Conn
// IsConnected returns true if the peer is connected
func (conn *Conn) IsConnected() bool {
    conn.mu.Lock()
    defer conn.mu.Unlock()
    return conn.currentConnPriority != conntype.None

    return conn.evalStatus() == StatusConnected
}

func (conn *Conn) GetKey() string {
@@ -1,8 +1,10 @@
<Wix
    xmlns="http://wixtoolset.org/schemas/v4/wxs">
    xmlns="http://wixtoolset.org/schemas/v4/wxs"
    xmlns:util="http://wixtoolset.org/schemas/v4/wxs/util">
    <Package Name="NetBird" Version="$(env.NETBIRD_VERSION)" Manufacturer="NetBird GmbH" Language="1033" UpgradeCode="6456ec4e-3ad6-4b9b-a2be-98e81cb21ccf"
        InstallerVersion="500" Compressed="yes" Codepage="utf-8" >

        <MediaTemplate EmbedCab="yes" />

        <Feature Id="NetbirdFeature" Title="Netbird" Level="1">
@@ -46,29 +48,10 @@
        <ComponentRef Id="NetbirdFiles" />
    </ComponentGroup>

    <Property Id="cmd" Value="cmd.exe"/>
    <util:CloseApplication Id="CloseNetBird" CloseMessage="no" Target="netbird.exe" RebootPrompt="no" />
    <util:CloseApplication Id="CloseNetBirdUI" CloseMessage="no" Target="netbird-ui.exe" RebootPrompt="no" />

    <CustomAction Id="KillDaemon"
        ExeCommand='/c "taskkill /im netbird.exe"'
        Execute="deferred"
        Property="cmd"
        Impersonate="no"
        Return="ignore"
    />

    <CustomAction Id="KillUI"
        ExeCommand='/c "taskkill /im netbird-ui.exe"'
        Execute="deferred"
        Property="cmd"
        Impersonate="no"
        Return="ignore"
    />

    <InstallExecuteSequence>
        <!-- For Uninstallation -->
        <Custom Action="KillDaemon" Before="RemoveFiles" Condition="Installed"/>
        <Custom Action="KillUI" After="KillDaemon" Condition="Installed"/>
    </InstallExecuteSequence>

    <!-- Icons -->
    <Icon Id="NetbirdIcon" SourceFile=".\client\ui\assets\netbird.ico" />
@@ -191,16 +191,16 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
    }
    t.Cleanup(cleanUp)

    peersUpdateManager := server.NewPeersUpdateManager(nil)
    metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
    require.NoError(t, err)

    peersUpdateManager := server.NewPeersUpdateManager(metrics)
    eventStore := &activity.InMemoryEventStore{}
    if err != nil {
        return nil, "", err
    }
    ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)

    metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
    require.NoError(t, err)

    ctrl := gomock.NewController(t)
    t.Cleanup(ctrl.Finish)
    settingsMockManager := settings.NewMockManager(ctrl)
@@ -280,7 +280,7 @@ func newServiceClient(addr string, logFile string, a fyne.App, showSettings bool

        showAdvancedSettings: showSettings,
        showNetworks:         showNetworks,
        update:               version.NewUpdate(),
        update:               version.NewUpdate("nb/client-ui"),
    }

    s.eventHandler = newEventHandler(s)
@@ -879,7 +879,7 @@ func (s *serviceClient) onUpdateAvailable() {
func (s *serviceClient) onSessionExpire() {
    s.sendNotification = true
    if s.sendNotification {
        s.eventHandler.runSelfCommand("login-url", "true")
        s.eventHandler.runSelfCommand(s.ctx, "login-url", "true")
        s.sendNotification = false
    }
}
@@ -992,21 +992,6 @@ func (s *serviceClient) restartClient(loginRequest *proto.LoginRequest) error {
// showLoginURL creates a borderless window styled like a pop-up in the top-right corner using s.wLoginURL.
func (s *serviceClient) showLoginURL() {

    resp, err := s.login(false)
    if err != nil {
        log.Errorf("failed to fetch login URL: %v", err)
        return
    }
    verificationURL := resp.VerificationURIComplete
    if verificationURL == "" {
        verificationURL = resp.VerificationURI
    }

    if verificationURL == "" {
        log.Error("no verification URL provided in the login response")
        return
    }

    resIcon := fyne.NewStaticResource("netbird.png", iconAbout)

    if s.wLoginURL == nil {
@@ -1025,6 +1010,21 @@ func (s *serviceClient) showLoginURL() {
        return
    }

    resp, err := s.login(false)
    if err != nil {
        log.Errorf("failed to fetch login URL: %v", err)
        return
    }
    verificationURL := resp.VerificationURIComplete
    if verificationURL == "" {
        verificationURL = resp.VerificationURI
    }

    if verificationURL == "" {
        log.Error("no verification URL provided in the login response")
        return
    }

    if err := openURL(verificationURL); err != nil {
        log.Errorf("failed to open login URL: %v", err)
        return
@@ -1038,7 +1038,19 @@ func (s *serviceClient) showLoginURL() {
        }

        label.SetText("Re-authentication successful.\nReconnecting")
        time.Sleep(300 * time.Millisecond)
        status, err := conn.Status(s.ctx, &proto.StatusRequest{})
        if err != nil {
            log.Errorf("get service status: %v", err)
            return
        }

        if status.Status == string(internal.StatusConnected) {
            label.SetText("Already connected.\nClosing this window.")
            time.Sleep(2 * time.Second)
            s.wLoginURL.Close()
            return
        }

        _, err = conn.Up(s.ctx, &proto.UpRequest{})
        if err != nil {
            label.SetText("Reconnecting failed, please create \na debug bundle in the settings and contact support.")
@@ -122,7 +122,7 @@ func (h *eventHandler) handleAdvancedSettingsClick() {
    go func() {
        defer h.client.mAdvancedSettings.Enable()
        defer h.client.getSrvConfig()
        h.runSelfCommand("settings", "true")
        h.runSelfCommand(h.client.ctx, "settings", "true")
    }()
}

@@ -130,7 +130,7 @@ func (h *eventHandler) handleCreateDebugBundleClick() {
    h.client.mCreateDebugBundle.Disable()
    go func() {
        defer h.client.mCreateDebugBundle.Enable()
        h.runSelfCommand("debug", "true")
        h.runSelfCommand(h.client.ctx, "debug", "true")
    }()
}

@@ -154,7 +154,7 @@ func (h *eventHandler) handleNetworksClick() {
    h.client.mNetworks.Disable()
    go func() {
        defer h.client.mNetworks.Enable()
        h.runSelfCommand("networks", "true")
        h.runSelfCommand(h.client.ctx, "networks", "true")
    }()
}

@@ -172,14 +172,14 @@ func (h *eventHandler) updateConfigWithErr() {
    }
}

func (h *eventHandler) runSelfCommand(command, arg string) {
func (h *eventHandler) runSelfCommand(ctx context.Context, command, arg string) {
    proc, err := os.Executable()
    if err != nil {
        log.Errorf("error getting executable path: %v", err)
        return
    }

    cmd := exec.Command(proc,
    cmd := exec.CommandContext(ctx, proc,
        fmt.Sprintf("--%s=%s", command, arg),
        fmt.Sprintf("--daemon-addr=%s", h.client.addr),
    )
@@ -60,7 +60,7 @@ NETBIRD_TOKEN_SOURCE=${NETBIRD_TOKEN_SOURCE:-accessToken}
NETBIRD_AUTH_PKCE_REDIRECT_URL_PORTS=${NETBIRD_AUTH_PKCE_REDIRECT_URL_PORTS:-"53000"}
NETBIRD_AUTH_PKCE_USE_ID_TOKEN=${NETBIRD_AUTH_PKCE_USE_ID_TOKEN:-false}
NETBIRD_AUTH_PKCE_DISABLE_PROMPT_LOGIN=${NETBIRD_AUTH_PKCE_DISABLE_PROMPT_LOGIN:-false}
NETBIRD_AUTH_PKCE_LOGIN_FLAG=${NETBIRD_AUTH_PKCE_LOGIN_FLAG:-1}
NETBIRD_AUTH_PKCE_LOGIN_FLAG=${NETBIRD_AUTH_PKCE_LOGIN_FLAG:-0}
NETBIRD_AUTH_PKCE_AUDIENCE=$NETBIRD_AUTH_AUDIENCE

# Dashboard
@@ -68,13 +68,13 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
    }
    t.Cleanup(cleanUp)

    peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
    eventStore := &activity.InMemoryEventStore{}
    ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)

    metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
    require.NoError(t, err)

    peersUpdateManager := mgmt.NewPeersUpdateManager(metrics)
    eventStore := &activity.InMemoryEventStore{}
    ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)

    ctrl := gomock.NewController(t)
    t.Cleanup(ctrl.Finish)
    settingsMockManager := settings.NewMockManager(ctrl)
@@ -357,6 +357,13 @@ var (
        log.WithContext(ctx).Infof("running HTTP server and gRPC server on the same port: %s", listener.Addr().String())
        serveGRPCWithHTTP(ctx, listener, rootHandler, tlsEnabled)

        update := version.NewUpdate("nb/management")
        update.SetDaemonVersion(version.NetbirdVersion())
        update.SetOnUpdateListener(func() {
            log.WithContext(ctx).Infof("your management version, \"%s\", is outdated, a new management version is available. Learn more here: https://github.com/netbirdio/netbird/releases", version.NetbirdVersion())
        })
        defer update.StopWatch()

        SetupCloseHandler()

        <-stopCh
@@ -1624,6 +1624,10 @@ func (am *DefaultAccountManager) GetDNSDomain(settings *types.Settings) string {

func (am *DefaultAccountManager) onPeersInvalidated(ctx context.Context, accountID string) {
    log.WithContext(ctx).Debugf("validated peers has been invalidated for account %s", accountID)
    err := am.Store.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
    if err != nil {
        log.Errorf("failed to increment network serial number for account %s: %v", accountID, err)
    }
    am.BufferUpdateAccountPeers(ctx, accountID)
}
@@ -1853,40 +1857,49 @@ func (am *DefaultAccountManager) GetOrCreateAccountByPrivateDomain(ctx context.C
}

func (am *DefaultAccountManager) UpdateToPrimaryAccount(ctx context.Context, accountId string) (*types.Account, error) {
    account, err := am.Store.GetAccount(ctx, accountId)
    var account *types.Account
    err := am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
        var err error
        account, err = transaction.GetAccount(ctx, accountId)
        if err != nil {
            return err
        }

        if account.IsDomainPrimaryAccount {
            return nil
        }

        existingPrimaryAccountID, err := transaction.GetAccountIDByPrivateDomain(ctx, store.LockingStrengthShare, account.Domain)

        // error is not a not found error
        if handleNotFound(err) != nil {
            return err
        }

        // a primary account already exists for this private domain
        if err == nil {
            log.WithContext(ctx).WithFields(log.Fields{
                "accountId":         accountId,
                "existingAccountId": existingPrimaryAccountID,
            }).Errorf("cannot update account to primary, another account already exists as primary for the same domain")
            return status.Errorf(status.Internal, "cannot update account to primary")
        }

        account.IsDomainPrimaryAccount = true

        if err := transaction.SaveAccount(ctx, account); err != nil {
            log.WithContext(ctx).WithFields(log.Fields{
                "accountId": accountId,
            }).Errorf("failed to update account to primary: %v", err)
            return status.Errorf(status.Internal, "failed to update account to primary")
        }

        return nil
    })
    if err != nil {
        return nil, err
    }

    if account.IsDomainPrimaryAccount {
        return account, nil
    }

    existingPrimaryAccountID, err := am.Store.GetAccountIDByPrivateDomain(ctx, store.LockingStrengthShare, account.Domain)

    // error is not a not found error
    if handleNotFound(err) != nil {
        return nil, err
    }

    // a primary account already exists for this private domain
    if err == nil {
        log.WithContext(ctx).WithFields(log.Fields{
            "accountId":         accountId,
            "existingAccountId": existingPrimaryAccountID,
        }).Errorf("cannot update account to primary, another account already exists as primary for the same domain")
        return nil, status.Errorf(status.Internal, "cannot update account to primary")
    }

    account.IsDomainPrimaryAccount = true

    if err := am.Store.SaveAccount(ctx, account); err != nil {
        log.WithContext(ctx).WithFields(log.Fields{
            "accountId": accountId,
        }).Errorf("failed to update account to primary: %v", err)
        return nil, status.Errorf(status.Internal, "failed to update account to primary")
    }

    return account, nil
}
@@ -17,6 +17,7 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.opentelemetry.io/otel/metric/noop"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
|
||||
nbdns "github.com/netbirdio/netbird/dns"
|
||||
@@ -1186,7 +1187,10 @@ func TestAccountManager_NetworkUpdates_SaveGroup(t *testing.T) {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
message := <-updMsg
|
||||
message, ok := updMsg.Pop(context.Background())
|
||||
if !ok {
|
||||
t.Errorf("failed to receive update message")
|
||||
}
|
||||
networkMap := message.Update.GetNetworkMap()
|
||||
if len(networkMap.RemotePeers) != 2 {
|
||||
t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers))
|
||||
@@ -1213,7 +1217,10 @@ func TestAccountManager_NetworkUpdates_DeletePolicy(t *testing.T) {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
message := <-updMsg
|
||||
message, ok := updMsg.Pop(context.Background())
|
||||
if !ok {
|
||||
t.Errorf("failed to receive update message")
|
||||
}
|
||||
networkMap := message.Update.GetNetworkMap()
|
||||
if len(networkMap.RemotePeers) != 0 {
|
||||
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
|
||||
@@ -1249,7 +1256,10 @@ func TestAccountManager_NetworkUpdates_SavePolicy(t *testing.T) {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
message := <-updMsg
|
||||
message, ok := updMsg.Pop(context.Background())
|
||||
if !ok {
|
||||
t.Errorf("failed to receive update message")
|
||||
}
|
||||
networkMap := message.Update.GetNetworkMap()
|
||||
if len(networkMap.RemotePeers) != 2 {
|
||||
t.Errorf("mismatch peers count: 2 expected, got %v", len(networkMap.RemotePeers))
|
||||
@@ -1314,7 +1324,10 @@ func TestAccountManager_NetworkUpdates_DeletePeer(t *testing.T) {
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
message := <-updMsg
|
||||
message, ok := updMsg.Pop(context.Background())
|
||||
if !ok {
|
||||
t.Errorf("failed to receive update message")
|
||||
}
|
||||
networkMap := message.Update.GetNetworkMap()
|
||||
if len(networkMap.RemotePeers) != 1 {
|
||||
t.Errorf("mismatch peers count: 1 expected, got %v", len(networkMap.RemotePeers))
|
||||
@@ -1365,15 +1378,24 @@ func TestAccountManager_NetworkUpdates_DeleteGroup(t *testing.T) {
|
||||
return
|
||||
}
|
||||
|
||||
// emptying buffer of previous changes
|
||||
_, _ = updMsg.Pop(context.Background())
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
message := <-updMsg
|
||||
networkMap := message.Update.GetNetworkMap()
|
||||
if len(networkMap.RemotePeers) != 0 {
|
||||
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
|
||||
// expecting 2 messages (policy delete and group delete)
|
||||
for i := 0; i < 1; i++ {
|
||||
message, ok := updMsg.Pop(context.Background())
|
||||
if !ok {
|
||||
t.Errorf("failed to receive update message")
|
||||
}
|
||||
networkMap := message.Update.GetNetworkMap()
|
||||
if len(networkMap.RemotePeers) != 0 {
|
||||
t.Errorf("mismatch peers count: 0 expected, got %v", len(networkMap.RemotePeers))
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -2879,7 +2901,7 @@ func createManager(t testing.TB) (*DefaultAccountManager, error) {
|
||||
|
||||
permissionsManager := permissions.NewManager(store)
|
||||
|
||||
manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
|
||||
manager, err := BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -2960,23 +2982,58 @@ func setupNetworkMapTest(t *testing.T) (*DefaultAccountManager, *types.Account,
|
||||
return manager, account, peer1, peer2, peer3
|
||||
}
|
||||
|
||||
func peerShouldNotReceiveUpdate(t *testing.T, updateMessage <-chan *UpdateMessage) {
|
||||
func peerShouldNotReceiveUpdate(t *testing.T, buffer *UpdateBuffer) {
|
||||
t.Helper()
|
||||
|
||||
resultCh := make(chan struct {
|
||||
msg *UpdateMessage
|
||||
ok bool
|
||||
}, 1)
|
||||
|
||||
go func() {
|
||||
msg, ok := buffer.Pop(context.Background())
|
||||
resultCh <- struct {
|
||||
msg *UpdateMessage
|
||||
ok bool
|
||||
}{msg, ok}
|
||||
}()
|
||||
|
||||
select {
|
||||
case msg := <-updateMessage:
|
||||
case msg := <-resultCh:
|
||||
if !msg.ok {
|
||||
t.Errorf("Update message channel closed unexpectedly")
|
||||
}
|
||||
t.Errorf("Unexpected message received: %+v", msg)
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func peerShouldReceiveUpdate(t *testing.T, updateMessage <-chan *UpdateMessage) {
|
||||
func peerShouldReceiveUpdate(t *testing.T, buffer *UpdateBuffer) {
|
||||
t.Helper()
|
||||
|
||||
resultCh := make(chan struct {
|
||||
msg *UpdateMessage
|
||||
ok bool
|
||||
}, 1)
|
||||
|
||||
go func() {
|
||||
msg, ok := buffer.Pop(context.Background())
|
||||
resultCh <- struct {
|
||||
msg *UpdateMessage
|
||||
ok bool
|
||||
}{msg, ok}
|
||||
}()
|
||||
|
||||
select {
|
||||
case msg := <-updateMessage:
|
||||
if msg == nil {
|
||||
case msg := <-resultCh:
|
||||
if !msg.ok {
|
||||
t.Errorf("Update message channel closed unexpectedly")
|
||||
return
|
||||
}
|
||||
if msg.msg == nil {
|
||||
t.Errorf("Received nil update message, expected valid message")
|
||||
return
|
||||
}
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
t.Error("Timed out waiting for update message")
|
||||
@@ -3017,9 +3074,13 @@ func BenchmarkSyncAndMarkPeer(b *testing.B) {
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to get account: %v", err)
|
||||
}
|
||||
peerChannels := make(map[string]chan *UpdateMessage)
|
||||
peerChannels := make(map[string]*UpdateBuffer)
|
||||
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to create update channel metrics: %v", err)
|
||||
}
|
||||
for peerID := range account.Peers {
|
||||
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
|
||||
peerChannels[peerID] = NewUpdateBuffer(metrics)
|
||||
}
|
||||
manager.peersUpdateManager.peerChannels = peerChannels
|
||||
|
||||
@@ -3085,9 +3146,13 @@ func BenchmarkLoginPeer_ExistingPeer(b *testing.B) {
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to get account: %v", err)
|
||||
}
|
||||
peerChannels := make(map[string]chan *UpdateMessage)
|
||||
peerChannels := make(map[string]*UpdateBuffer)
|
||||
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to create update channel metrics: %v", err)
|
||||
}
|
||||
for peerID := range account.Peers {
|
||||
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
|
||||
peerChannels[peerID] = NewUpdateBuffer(metrics)
|
||||
}
|
||||
manager.peersUpdateManager.peerChannels = peerChannels
|
||||
|
||||
@@ -3160,9 +3225,13 @@ func BenchmarkLoginPeer_NewPeer(b *testing.B) {
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to get account: %v", err)
|
||||
}
|
||||
peerChannels := make(map[string]chan *UpdateMessage)
|
||||
peerChannels := make(map[string]*UpdateBuffer)
|
||||
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to create update channel metrics: %v", err)
|
||||
}
|
||||
for peerID := range account.Peers {
|
||||
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
|
||||
peerChannels[peerID] = NewUpdateBuffer(metrics)
|
||||
}
|
||||
manager.peersUpdateManager.peerChannels = peerChannels
|
||||
|
||||
|
||||
@@ -217,7 +217,7 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) {
|
||||
|
||||
settingsMockManager := settings.NewMockManager(ctrl)
|
||||
permissionsManager := permissions.NewManager(store)
|
||||
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
|
||||
return BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.test", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
|
||||
}
|
||||
|
||||
func createDNSStore(t *testing.T) (store.Store, error) {
|
||||
@@ -507,13 +507,12 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
|
||||
}, true)
|
||||
assert.NoError(t, err)
|
||||
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
|
||||
// Saving DNS settings with groups that have no peers should not trigger updates to account peers or send peer updates
|
||||
t.Run("saving dns setting with unused groups", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
@@ -534,6 +533,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Creating DNS settings with groups that have no peers should not update account peers or send peer update
|
||||
t.Run("creating dns setting with unused groups", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
@@ -560,6 +563,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Creating DNS settings with groups that have peers should update account peers and send peer update
|
||||
t.Run("creating dns setting with used groups", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
err = manager.SaveGroup(context.Background(), account.Id, userID, &types.Group{
|
||||
ID: "groupA",
|
||||
Name: "GroupA",
|
||||
@@ -567,6 +574,8 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
|
||||
}, true)
|
||||
assert.NoError(t, err)
|
||||
|
||||
_, _ = updMsg.Pop(context.Background())
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -593,6 +602,18 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Saving DNS settings with groups that have peers should update account peers and send peer update
|
||||
t.Run("saving dns setting with used groups", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
|
||||
err = manager.SaveGroup(context.Background(), account.Id, userID, &types.Group{
|
||||
ID: "groupA",
|
||||
Name: "GroupA",
|
||||
Peers: []string{peer1.ID, peer2.ID, peer3.ID},
|
||||
}, true)
|
||||
assert.NoError(t, err)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -613,6 +634,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Removing group with no peers from DNS settings should not trigger updates to account peers or send peer updates
|
||||
t.Run("removing group with no peers from dns settings", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
@@ -633,6 +658,10 @@ func TestDNSAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Removing group with peers from DNS settings should trigger updates to account peers and send peer updates
|
||||
t.Run("removing group with peers from dns settings", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
|
||||
@@ -429,13 +429,12 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
|
||||
}, true)
|
||||
assert.NoError(t, err)
|
||||
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
|
||||
// Saving a group that is not linked to any resource should not update account peers
|
||||
t.Run("saving unlinked group", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
@@ -459,6 +458,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
|
||||
// Adding a peer to a group that is not linked to any resource should not update account peers
|
||||
// and not send peer update
|
||||
t.Run("adding peer to unlinked group", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
@@ -478,6 +481,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
|
||||
// Removing a peer from a group that is not linked to any resource should not update account peers
|
||||
// and not send peer update
|
||||
t.Run("removing peer from unliked group", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
@@ -496,6 +503,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Deleting group should not update account peers and not send peer update
|
||||
t.Run("deleting group", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
@@ -529,6 +540,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Saving a group linked to policy should update account peers and send peer update
|
||||
t.Run("saving linked group to policy", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -551,6 +566,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// adding peer to a used group should update account peers and send peer update
|
||||
t.Run("adding peer to linked group", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -569,6 +588,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// removing peer from a linked group should update account peers and send peer update
|
||||
t.Run("removing peer from linked group", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -587,6 +610,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Saving a group linked to name server group should update account peers and send peer update
|
||||
t.Run("saving group linked to name server group", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
_, err = manager.CreateNameServerGroup(
|
||||
context.Background(), account.Id, "nsGroup", "nsGroup", []nbdns.NameServer{{
|
||||
IP: netip.MustParseAddr("1.1.1.1"),
|
||||
@@ -620,6 +647,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Saving a group linked to route should update account peers and send peer update
|
||||
t.Run("saving group linked to route", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
newRoute := route.Route{
|
||||
ID: "route",
|
||||
Network: netip.MustParsePrefix("192.168.0.0/16"),
|
||||
@@ -661,6 +692,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Saving a group linked to dns settings should update account peers and send peer update
|
||||
t.Run("saving group linked to dns settings", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
err := manager.SaveDNSSettings(context.Background(), account.Id, userID, &types.DNSSettings{
|
||||
DisabledManagementGroups: []string{"groupD"},
|
||||
})
|
||||
@@ -688,6 +723,10 @@ func TestGroupAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Saving a group linked to network router should update account peers and send peer update
|
||||
t.Run("saving group linked to network router", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
permissionsManager := permissions.NewManager(manager.Store)
|
||||
groupsManager := groups.NewManager(manager.Store, permissionsManager, manager)
|
||||
resourcesManager := resources.NewManager(manager.Store, permissionsManager, groupsManager, manager)
|
||||
|
||||
@@ -184,7 +184,7 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
|
||||
return err
|
||||
}
|
||||
|
||||
updates := s.peersUpdateManager.CreateChannel(ctx, peer.ID)
|
||||
updateBuffer := s.peersUpdateManager.CreateChannel(ctx, peer.ID)
|
||||
|
||||
s.ephemeralManager.OnPeerConnected(ctx, peer)
|
||||
|
||||
@@ -199,37 +199,24 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
|
||||
|
||||
log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart))
|
||||
|
||||
return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv)
|
||||
return s.handleUpdates(ctx, accountID, peerKey, peer, updateBuffer, srv)
|
||||
}
|
||||
|
||||
// handleUpdates sends updates to the connected peer until the updates channel is closed.
|
||||
func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *UpdateMessage, srv proto.ManagementService_SyncServer) error {
|
||||
func (s *GRPCServer) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates *UpdateBuffer, srv proto.ManagementService_SyncServer) error {
|
||||
log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String())
|
||||
|
||||
for {
|
||||
select {
|
||||
// condition when there are some updates
|
||||
case update, open := <-updates:
|
||||
if s.appMetrics != nil {
|
||||
s.appMetrics.GRPCMetrics().UpdateChannelQueueLength(len(updates) + 1)
|
||||
}
|
||||
|
||||
if !open {
|
||||
log.WithContext(ctx).Debugf("updates channel for peer %s was closed", peerKey.String())
|
||||
s.cancelPeerRoutines(ctx, accountID, peer)
|
||||
return nil
|
||||
}
|
||||
log.WithContext(ctx).Debugf("received an update for peer %s", peerKey.String())
|
||||
|
||||
if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// condition when client <-> server connection has been terminated
|
||||
case <-srv.Context().Done():
|
||||
// happens when connection drops, e.g. client disconnects
|
||||
log.WithContext(ctx).Debugf("stream of peer %s has been closed", peerKey.String())
|
||||
update, ok := updates.Pop(ctx)
|
||||
if !ok {
|
||||
log.WithContext(ctx).Debugf("update buffer for peer %s closed", peerKey.String())
|
||||
s.cancelPeerRoutines(ctx, accountID, peer)
|
||||
return srv.Context().Err()
|
||||
return nil
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Debugf("sending latest update to peer %s", peerKey.String())
|
||||
if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -118,7 +118,7 @@ func BuildApiBlackBoxWithDBState(t TB, sqlFile string, expectedPeerUpdate *serve
|
||||
t.Fatalf("Failed to create metrics: %v", err)
|
||||
}
|
||||
|
||||
peersUpdateManager := server.NewPeersUpdateManager(nil)
|
||||
peersUpdateManager := server.NewPeersUpdateManager(metrics)
|
||||
updMsg := peersUpdateManager.CreateChannel(context.Background(), TestPeerId)
|
||||
done := make(chan struct{})
|
||||
if validateUpdate {
|
||||
@@ -166,24 +166,54 @@ func BuildApiBlackBoxWithDBState(t TB, sqlFile string, expectedPeerUpdate *serve
|
||||
return apiHandler, am, done
|
||||
}
|
||||
|
||||
func peerShouldNotReceiveUpdate(t TB, updateMessage <-chan *server.UpdateMessage) {
|
||||
func peerShouldNotReceiveUpdate(t TB, buffer *server.UpdateBuffer) {
|
||||
t.Helper()
|
||||
|
||||
resultCh := make(chan struct {
|
||||
msg *server.UpdateMessage
|
||||
ok bool
|
||||
}, 1)
|
||||
|
||||
go func() {
|
||||
msg, ok := buffer.Pop(context.Background())
|
||||
resultCh <- struct {
|
||||
msg *server.UpdateMessage
|
||||
ok bool
|
||||
}{msg, ok}
|
||||
}()
|
||||
|
||||
select {
|
||||
case msg := <-updateMessage:
|
||||
case msg := <-resultCh:
|
||||
t.Errorf("Unexpected message received: %+v", msg)
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func peerShouldReceiveUpdate(t TB, updateMessage <-chan *server.UpdateMessage, expected *server.UpdateMessage) {
|
||||
func peerShouldReceiveUpdate(t TB, buffer *server.UpdateBuffer, expected *server.UpdateMessage) {
|
||||
t.Helper()
|
||||
|
||||
resultCh := make(chan struct {
|
||||
msg *server.UpdateMessage
|
||||
ok bool
|
||||
}, 1)
|
||||
|
||||
go func() {
|
||||
msg, ok := buffer.Pop(context.Background())
|
||||
resultCh <- struct {
|
||||
msg *server.UpdateMessage
|
||||
ok bool
|
||||
}{msg, ok}
|
||||
}()
|
||||
|
||||
select {
|
||||
case msg := <-updateMessage:
|
||||
if msg == nil {
|
||||
case msg := <-resultCh:
|
||||
if msg.msg == nil {
|
||||
t.Errorf("Received nil update message, expected valid message")
|
||||
}
|
||||
if !msg.ok {
|
||||
t.Errorf("Expected to receive an update message, but got ok = false")
|
||||
}
|
||||
assert.Equal(t, expected, msg)
|
||||
case <-time.After(500 * time.Millisecond):
|
||||
t.Errorf("Timed out waiting for update message")
|
||||
|
||||
@@ -37,21 +37,23 @@ func (am *DefaultAccountManager) UpdateIntegratedValidatorGroups(ctx context.Con
|
||||
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
|
||||
defer unlock()
|
||||
|
||||
a, err := am.Store.GetAccountByUser(ctx, userID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
||||
a, err := transaction.GetAccountByUser(ctx, userID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var extra *types.ExtraSettings
|
||||
var extra *types.ExtraSettings
|
||||
|
||||
if a.Settings.Extra != nil {
|
||||
extra = a.Settings.Extra
|
||||
} else {
|
||||
extra = &types.ExtraSettings{}
|
||||
a.Settings.Extra = extra
|
||||
}
|
||||
extra.IntegratedValidatorGroups = groups
|
||||
return am.Store.SaveAccount(ctx, a)
|
||||
if a.Settings.Extra != nil {
|
||||
extra = a.Settings.Extra
|
||||
} else {
|
||||
extra = &types.ExtraSettings{}
|
||||
a.Settings.Extra = extra
|
||||
}
|
||||
extra.IntegratedValidatorGroups = groups
|
||||
return transaction.SaveAccount(ctx, a)
|
||||
})
|
||||
}
|
||||
|
||||
func (am *DefaultAccountManager) GroupValidation(ctx context.Context, accountID string, groupIDs []string) (bool, error) {
|
||||
@@ -81,15 +83,12 @@ func (am *DefaultAccountManager) GetValidatedPeers(ctx context.Context, accountI
|
||||
var peers []*nbpeer.Peer
|
||||
var settings *types.Settings
|
||||
|
||||
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
||||
groups, err = transaction.GetAccountGroups(ctx, store.LockingStrengthShare, accountID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
groups, err = am.Store.GetAccountGroups(ctx, store.LockingStrengthShare, accountID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
peers, err = transaction.GetAccountPeers(ctx, store.LockingStrengthShare, accountID, "", "")
|
||||
return err
|
||||
})
|
||||
peers, err = am.Store.GetAccountPeers(ctx, store.LockingStrengthShare, accountID, "", "")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@@ -424,14 +424,14 @@ func startManagementForTest(t *testing.T, testFile string, config *types.Config)
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
peersUpdateManager := NewPeersUpdateManager(nil)
|
||||
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
peersUpdateManager := NewPeersUpdateManager(metrics)
|
||||
eventStore := &activity.InMemoryEventStore{}
|
||||
|
||||
ctx := context.WithValue(context.Background(), hook.ExecutionContextKey, hook.SystemSource) //nolint:staticcheck
|
||||
|
||||
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
|
||||
require.NoError(t, err)
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
t.Cleanup(ctrl.Finish)
|
||||
settingsMockManager := settings.NewMockManager(ctrl)
|
||||
|
||||
@@ -173,14 +173,14 @@ func startServer(
|
||||
log.Fatalf("failed creating a store: %s: %v", config.Datadir, err)
|
||||
}
|
||||
|
||||
peersUpdateManager := server.NewPeersUpdateManager(nil)
|
||||
eventStore := &activity.InMemoryEventStore{}
|
||||
|
||||
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("failed creating metrics: %v", err)
|
||||
}
|
||||
|
||||
peersUpdateManager := server.NewPeersUpdateManager(metrics)
|
||||
eventStore := &activity.InMemoryEventStore{}
|
||||
|
||||
ctrl := gomock.NewController(t)
|
||||
t.Cleanup(ctrl.Finish)
|
||||
settingsMockManager := settings.NewMockManager(ctrl)
|
||||
|
||||
@@ -184,7 +184,9 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
|
||||
ephemeralPeersSKs int
|
||||
ephemeralPeersSKUsage int
|
||||
activePeersLastDay int
|
||||
activeUserPeersLastDay int
|
||||
osPeers map[string]int
|
||||
activeUsersLastDay map[string]struct{}
|
||||
userPeers int
|
||||
rules int
|
||||
rulesProtocol map[string]int
|
||||
@@ -203,6 +205,7 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
|
||||
version string
|
||||
peerActiveVersions []string
|
||||
osUIClients map[string]int
|
||||
rosenpassEnabled int
|
||||
)
|
||||
start := time.Now()
|
||||
metricsProperties := make(properties)
|
||||
@@ -210,6 +213,7 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
|
||||
osUIClients = make(map[string]int)
|
||||
rulesProtocol = make(map[string]int)
|
||||
rulesDirection = make(map[string]int)
|
||||
activeUsersLastDay = make(map[string]struct{})
|
||||
uptime = time.Since(w.startupTime).Seconds()
|
||||
connections := w.connManager.GetAllConnectedPeers()
|
||||
version = nbversion.NetbirdVersion()
|
||||
@@ -277,10 +281,14 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
|
||||
for _, peer := range account.Peers {
|
||||
peers++
|
||||
|
||||
if peer.SSHEnabled {
|
||||
if peer.SSHEnabled || peer.Meta.Flags.ServerSSHAllowed {
|
||||
peersSSHEnabled++
|
||||
}
|
||||
|
||||
if peer.Meta.Flags.RosenpassEnabled {
|
||||
rosenpassEnabled++
|
||||
}
|
||||
|
||||
if peer.UserID != "" {
|
||||
userPeers++
|
||||
}
|
||||
@@ -299,6 +307,10 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
|
||||
_, connected := connections[peer.ID]
|
||||
if connected || peer.Status.LastSeen.After(w.lastRun) {
|
||||
activePeersLastDay++
|
||||
if peer.UserID != "" {
|
||||
activeUserPeersLastDay++
|
||||
activeUsersLastDay[peer.UserID] = struct{}{}
|
||||
}
|
||||
osActiveKey := osKey + "_active"
|
||||
osActiveCount := osPeers[osActiveKey]
|
||||
osPeers[osActiveKey] = osActiveCount + 1
|
||||
@@ -320,6 +332,8 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
|
||||
metricsProperties["ephemeral_peers_setup_keys"] = ephemeralPeersSKs
|
||||
metricsProperties["ephemeral_peers_setup_keys_usage"] = ephemeralPeersSKUsage
|
||||
metricsProperties["active_peers_last_day"] = activePeersLastDay
|
||||
metricsProperties["active_user_peers_last_day"] = activeUserPeersLastDay
|
||||
metricsProperties["active_users_last_day"] = len(activeUsersLastDay)
|
||||
metricsProperties["user_peers"] = userPeers
|
||||
metricsProperties["rules"] = rules
|
||||
metricsProperties["rules_with_src_posture_checks"] = rulesWithSrcPostureChecks
|
||||
@@ -338,6 +352,7 @@ func (w *Worker) generateProperties(ctx context.Context) properties {
|
||||
metricsProperties["ui_clients"] = uiClient
|
||||
metricsProperties["idp_manager"] = w.idpManager
|
||||
metricsProperties["store_engine"] = w.dataSource.GetStoreEngine()
|
||||
metricsProperties["rosenpass_enabled"] = rosenpassEnabled
|
||||
|
||||
for protocol, count := range rulesProtocol {
|
||||
metricsProperties["rules_protocol_"+protocol] = count
|
||||
|
||||
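The new active_user_peers_last_day and active_users_last_day properties come from the loop above: every active peer owned by a user bumps a counter, and the distinct owner IDs are collected into a set whose size is reported. A small self-contained sketch of that counting pattern, with made-up peer data:

package main

import "fmt"

func main() {
    // owner user ID per active peer; empty string means a setup-key peer with no owner
    activePeerOwners := []string{"user-a", "user-b", "user-a", ""}

    activeUserPeers := 0
    activeUsers := make(map[string]struct{})
    for _, userID := range activePeerOwners {
        if userID == "" {
            continue
        }
        activeUserPeers++                // counts peers owned by any user
        activeUsers[userID] = struct{}{} // deduplicates the owners
    }

    fmt.Println("active_user_peers_last_day:", activeUserPeers) // 3
    fmt.Println("active_users_last_day:", len(activeUsers))     // 2
}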
@@ -47,8 +47,8 @@ func (mockDatasource) GetAllAccounts(_ context.Context) []*types.Account {
|
||||
"1": {
|
||||
ID: "1",
|
||||
UserID: "test",
|
||||
SSHEnabled: true,
|
||||
Meta: nbpeer.PeerSystemMeta{GoOS: "linux", WtVersion: "0.0.1"},
|
||||
SSHEnabled: false,
|
||||
Meta: nbpeer.PeerSystemMeta{GoOS: "linux", WtVersion: "0.0.1", Flags: nbpeer.Flags{ServerSSHAllowed: true, RosenpassEnabled: true}},
|
||||
},
|
||||
},
|
||||
Policies: []*types.Policy{
|
||||
@@ -312,7 +312,19 @@ func TestGenerateProperties(t *testing.T) {
|
||||
}
|
||||
|
||||
if properties["posture_checks"] != 2 {
|
||||
t.Errorf("expected 1 posture_checks, got %d", properties["posture_checks"])
|
||||
t.Errorf("expected 2 posture_checks, got %d", properties["posture_checks"])
|
||||
}
|
||||
|
||||
if properties["rosenpass_enabled"] != 1 {
|
||||
t.Errorf("expected 1 rosenpass_enabled, got %d", properties["rosenpass_enabled"])
|
||||
}
|
||||
|
||||
if properties["active_user_peers_last_day"] != 2 {
|
||||
t.Errorf("expected 2 active_user_peers_last_day, got %d", properties["active_user_peers_last_day"])
|
||||
}
|
||||
|
||||
if properties["active_users_last_day"] != 1 {
|
||||
t.Errorf("expected 1 active_users_last_day, got %d", properties["active_users_last_day"])
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -779,7 +779,7 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) {
|
||||
t.Cleanup(ctrl.Finish)
|
||||
settingsMockManager := settings.NewMockManager(ctrl)
|
||||
permissionsManager := permissions.NewManager(store)
|
||||
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
|
||||
return BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
|
||||
}
|
||||
|
||||
func createNSStore(t *testing.T) (store.Store, error) {
|
||||
@@ -988,14 +988,14 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
|
||||
}, true)
|
||||
assert.NoError(t, err)
|
||||
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
|
||||
// Creating a nameserver group with a distribution group no peers should not update account peers
|
||||
// and not send peer update
|
||||
t.Run("creating nameserver group with distribution group no peers", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
@@ -1023,6 +1023,11 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
|
||||
// saving a nameserver group with a distribution group with no peers should not update account peers
|
||||
// and not send peer update
|
||||
t.Run("saving nameserver group with distribution group no peers", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
@@ -1041,6 +1046,11 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Creating a nameserver group with a distribution group that has peers should update account peers and send peer update
|
||||
t.Run("creating nameserver group with distribution group has peers", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -1067,6 +1077,10 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// saving a nameserver group with a distribution group with peers should update account peers and send peer update
|
||||
t.Run("saving nameserver group with distribution group has peers", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -1097,6 +1111,10 @@ func TestNameServerAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Deleting a nameserver group should update account peers and send peer update
|
||||
t.Run("deleting nameserver group", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
|
||||
@@ -144,6 +144,10 @@ func (am *DefaultAccountManager) MarkPeerConnected(ctx context.Context, peerPubK
|
||||
if expired {
|
||||
// we need to update other peers because when peer login expires all other peers are notified to disconnect from
|
||||
// the expired one. Here we notify them that connection is now allowed again.
|
||||
err := am.Store.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
|
||||
if err != nil {
|
||||
log.Errorf("failed to increment network serial number for account %s: %v", accountID, err)
|
||||
}
|
||||
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||
}
|
||||
|
||||
@@ -270,6 +274,11 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
|
||||
inactivityExpirationChanged = true
|
||||
}
|
||||
|
||||
err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return transaction.SavePeer(ctx, store.LockingStrengthUpdate, accountID, peer)
|
||||
})
|
||||
if err != nil {
|
||||
@@ -755,6 +764,13 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if isStatusChanged || sync.UpdateAccountPeers || (updated && len(postureChecks) > 0) {
|
||||
if err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
@@ -888,6 +904,13 @@ func (am *DefaultAccountManager) LoginPeer(ctx context.Context, login types.Peer
|
||||
}
|
||||
}
|
||||
|
||||
if updateRemotePeers || isStatusChanged || (isPeerUpdated && len(postureChecks) > 0) {
|
||||
err = transaction.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to increment network serial: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
@@ -1169,7 +1192,7 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
||||
return
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
globalStart := time.Now()
|
||||
|
||||
approvedPeersMap, err := am.integratedPeerValidator.GetValidatedPeers(account.Id, maps.Values(account.Groups), maps.Values(account.Peers), account.Settings.Extra)
|
||||
if err != nil {
|
||||
@@ -1204,18 +1227,27 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
||||
defer wg.Done()
|
||||
defer func() { <-semaphore }()
|
||||
|
||||
start := time.Now()
|
||||
|
||||
postureChecks, err := am.getPeerPostureChecks(account, p.ID)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Debugf("failed to get posture checks for peer %s: %v", peer.ID, err)
|
||||
return
|
||||
}
|
||||
|
||||
am.metrics.UpdateChannelMetrics().CountCalcPostureChecksDuration(time.Since(start))
|
||||
start = time.Now()
|
||||
|
||||
remotePeerNetworkMap := account.GetPeerNetworkMap(ctx, p.ID, customZone, approvedPeersMap, resourcePolicies, routers, am.metrics.AccountManagerMetrics())
|
||||
|
||||
am.metrics.UpdateChannelMetrics().CountCalcPeerNetworkMapDuration(time.Since(start))
|
||||
start = time.Now()
|
||||
|
||||
proxyNetworkMap, ok := proxyNetworkMaps[p.ID]
|
||||
if ok {
|
||||
remotePeerNetworkMap.Merge(proxyNetworkMap)
|
||||
}
|
||||
am.metrics.UpdateChannelMetrics().CountMergeNetworkMapDuration(time.Since(start))
|
||||
|
||||
extraSetting, err := am.settingsManager.GetExtraSettings(ctx, accountID)
|
||||
if err != nil {
|
||||
@@ -1223,7 +1255,10 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
||||
return
|
||||
}
|
||||
|
||||
start = time.Now()
|
||||
update := toSyncResponse(ctx, nil, p, nil, nil, remotePeerNetworkMap, dnsDomain, postureChecks, dnsCache, account.Settings, extraSetting)
|
||||
am.metrics.UpdateChannelMetrics().CountToSyncResponseDuration(time.Since(start))
|
||||
|
||||
am.peersUpdateManager.SendUpdate(ctx, p.ID, &UpdateMessage{Update: update, NetworkMap: remotePeerNetworkMap})
|
||||
}(peer)
|
||||
}
|
||||
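Every addition in this hunk follows the same timing pattern: take a timestamp before a stage, run it, and record the elapsed duration into the per-stage histogram. A minimal self-contained sketch of that pattern; the stage names and the recordStageDuration helper are made up for illustration and are not part of the management server:

package main

import (
    "fmt"
    "time"
)

// recordStageDuration stands in for the histogram Record calls used above.
func recordStageDuration(stage string, d time.Duration) {
    fmt.Printf("%s took %dµs\n", stage, d.Microseconds())
}

func main() {
    start := time.Now()
    time.Sleep(2 * time.Millisecond) // stand-in for calculating posture checks
    recordStageDuration("posture_checks", time.Since(start))

    start = time.Now()
    time.Sleep(5 * time.Millisecond) // stand-in for calculating and merging the network map
    recordStageDuration("network_map", time.Since(start))
}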
@@ -1232,7 +1267,7 @@ func (am *DefaultAccountManager) UpdateAccountPeers(ctx context.Context, account
|
||||
|
||||
wg.Wait()
|
||||
if am.metrics != nil {
|
||||
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(start))
|
||||
am.metrics.AccountManagerMetrics().CountUpdateAccountPeersDuration(time.Since(globalStart))
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"go.opentelemetry.io/otel/metric/noop"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
|
||||
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
||||
@@ -963,10 +964,14 @@ func BenchmarkUpdateAccountPeers(b *testing.B) {
|
||||
b.Fatalf("Failed to get account: %v", err)
|
||||
}
|
||||
|
||||
peerChannels := make(map[string]chan *UpdateMessage)
|
||||
peerChannels := make(map[string]*UpdateBuffer)
|
||||
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
|
||||
if err != nil {
|
||||
b.Fatalf("Failed to create update channel metrics: %v", err)
|
||||
}
|
||||
|
||||
for peerID := range account.Peers {
|
||||
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
|
||||
peerChannels[peerID] = NewUpdateBuffer(metrics)
|
||||
}
|
||||
|
||||
manager.peersUpdateManager.peerChannels = peerChannels
|
||||
@@ -1028,17 +1033,24 @@ func TestUpdateAccountPeers(t *testing.T) {
|
||||
t.Fatalf("Failed to get account: %v", err)
|
||||
}
|
||||
|
||||
peerChannels := make(map[string]chan *UpdateMessage)
|
||||
peerChannels := make(map[string]*UpdateBuffer)
|
||||
metrics, err := telemetry.NewUpdateChannelMetrics(context.Background(), noop.NewMeterProvider().Meter("test"))
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create update channel metrics: %v", err)
|
||||
}
|
||||
|
||||
for peerID := range account.Peers {
|
||||
peerChannels[peerID] = make(chan *UpdateMessage, channelBufferSize)
|
||||
peerChannels[peerID] = NewUpdateBuffer(metrics)
|
||||
}
|
||||
|
||||
manager.peersUpdateManager.peerChannels = peerChannels
|
||||
manager.UpdateAccountPeers(ctx, account.Id)
|
||||
|
||||
for _, channel := range peerChannels {
|
||||
update := <-channel
|
||||
update, ok := channel.Pop(context.Background())
|
||||
if !ok {
|
||||
t.Fatalf("Expected update for peer, but channel is empty")
|
||||
}
|
||||
assert.Nil(t, update.Update.NetbirdConfig)
|
||||
assert.Equal(t, tc.peers, len(update.NetworkMap.Peers))
|
||||
assert.Equal(t, tc.peers*2, len(update.NetworkMap.FirewallRules))
|
||||
@@ -1267,7 +1279,7 @@ func Test_RegisterPeerByUser(t *testing.T) {
|
||||
settingsMockManager := settings.NewMockManager(ctrl)
|
||||
permissionsManager := permissions.NewManager(s)
|
||||
|
||||
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
|
||||
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
|
||||
assert.NoError(t, err)
|
||||
|
||||
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
|
||||
@@ -1342,7 +1354,7 @@ func Test_RegisterPeerBySetupKey(t *testing.T) {
|
||||
settingsMockManager := settings.NewMockManager(ctrl)
|
||||
permissionsManager := permissions.NewManager(s)
|
||||
|
||||
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
|
||||
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
|
||||
assert.NoError(t, err)
|
||||
|
||||
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
|
||||
@@ -1477,7 +1489,7 @@ func Test_RegisterPeerRollbackOnFailure(t *testing.T) {
|
||||
|
||||
permissionsManager := permissions.NewManager(s)
|
||||
|
||||
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
|
||||
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
|
||||
assert.NoError(t, err)
|
||||
|
||||
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
|
||||
@@ -1546,7 +1558,7 @@ func Test_LoginPeer(t *testing.T) {
|
||||
settingsMockManager := settings.NewMockManager(ctrl)
|
||||
permissionsManager := permissions.NewManager(s)
|
||||
|
||||
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(nil), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
|
||||
am, err := BuildManager(context.Background(), s, NewPeersUpdateManager(metrics), nil, "", "netbird.cloud", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
|
||||
assert.NoError(t, err)
|
||||
|
||||
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
|
||||
@@ -1734,13 +1746,12 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
|
||||
var peer5 *nbpeer.Peer
|
||||
var peer6 *nbpeer.Peer
|
||||
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
|
||||
// Updating not expired peer and peer expiration is enabled should not update account peers and not send peer update
|
||||
t.Run("updating not expired peer and peer expiration is enabled", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
@@ -1759,6 +1770,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Adding peer to unlinked group should not update account peers and not send peer update
|
||||
t.Run("adding peer to unlinked group", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
@@ -1784,6 +1799,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Deleting peer with unlinked group should not update account peers and not send peer update
|
||||
t.Run("deleting peer with unlinked group", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
@@ -1802,6 +1821,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Updating peer label should update account peers and send peer update
|
||||
t.Run("updating peer label", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -1820,6 +1843,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("validator requires update", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
requireUpdateFunc := func(_ context.Context, update *nbpeer.Peer, peer *nbpeer.Peer, userID string, accountID string, dnsDomain string, peersGroup []string, extraSettings *types.ExtraSettings) (*nbpeer.Peer, bool, error) {
|
||||
return update, true, nil
|
||||
}
|
||||
@@ -1842,6 +1869,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
|
||||
})
|
||||
|
||||
t.Run("validator requires no update", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
requireNoUpdateFunc := func(_ context.Context, update *nbpeer.Peer, peer *nbpeer.Peer, userID string, accountID string, dnsDomain string, peersGroup []string, extraSettings *types.ExtraSettings) (*nbpeer.Peer, bool, error) {
|
||||
return update, false, nil
|
||||
}
|
||||
@@ -1865,6 +1896,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Adding peer to group linked with policy should update account peers and send peer update
|
||||
t.Run("adding peer to group linked with policy", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
_, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{
|
||||
AccountID: account.Id,
|
||||
Enabled: true,
|
||||
@@ -1906,6 +1941,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Deleting peer with linked group to policy should update account peers and send peer update
|
||||
t.Run("deleting peer with linked group to policy", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -1924,6 +1963,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Adding peer to group linked with route should update account peers and send peer update
|
||||
t.Run("adding peer to group linked with route", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
route := nbroute.Route{
|
||||
ID: "testingRoute1",
|
||||
Network: netip.MustParsePrefix("100.65.250.202/32"),
|
||||
@@ -1970,6 +2013,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Deleting peer with linked group to route should update account peers and send peer update
|
||||
t.Run("deleting peer with linked group to route", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -1988,6 +2035,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Adding peer to group linked with name server group should update account peers and send peer update
|
||||
t.Run("adding peer to group linked with name server group", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
_, err = manager.CreateNameServerGroup(
|
||||
context.Background(), account.Id, "nsGroup", "nsGroup", []nbdns.NameServer{{
|
||||
IP: netip.MustParseAddr("1.1.1.1"),
|
||||
@@ -2025,6 +2076,10 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Deleting peer with linked group to name server group should update account peers and send peer update
|
||||
t.Run("deleting peer with linked group to route", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
|
||||
@@ -1017,17 +1017,16 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
|
||||
}, true)
|
||||
assert.NoError(t, err)
|
||||
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
|
||||
var policyWithGroupRulesNoPeers *types.Policy
|
||||
var policyWithDestinationPeersOnly *types.Policy
|
||||
var policyWithSourceAndDestinationPeers *types.Policy
|
||||
|
||||
// Saving policy with rule groups with no peers should not update account's peers and not send peer update
|
||||
t.Run("saving policy with rule groups with no peers", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
@@ -1059,6 +1058,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
|
||||
// Saving policy with source group containing peers, but destination group without peers should
|
||||
// update account's peers and send peer update
|
||||
t.Run("saving policy where source has peers but destination does not", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -1091,6 +1094,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
|
||||
// Saving policy with destination group containing peers, but source group without peers should
|
||||
// update account's peers and send peer update
|
||||
t.Run("saving policy where destination has peers but source does not", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -1123,6 +1130,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
|
||||
// Saving policy with destination and source groups containing peers should update account's peers
|
||||
// and send peer update
|
||||
t.Run("saving policy with source and destination groups with peers", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -1154,6 +1165,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
|
||||
// Disabling policy with destination and source groups containing peers should update account's peers
|
||||
// and send peer update
|
||||
t.Run("disabling policy with source and destination groups with peers", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -1174,6 +1189,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
|
||||
// Updating disabled policy with destination and source groups containing peers should not update account's peers
|
||||
// or send peer update
|
||||
t.Run("updating disabled policy with source and destination groups with peers", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
@@ -1195,6 +1214,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
|
||||
// Enabling policy with destination and source groups containing peers should update account's peers
|
||||
// and send peer update
|
||||
t.Run("enabling policy with source and destination groups with peers", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -1214,6 +1237,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Deleting policy should trigger account peers update and send peer update
|
||||
t.Run("deleting policy with source and destination groups with peers", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -1234,6 +1261,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
|
||||
// Deleting policy with destination group containing peers, but source group without peers should
|
||||
// update account's peers and send peer update
|
||||
t.Run("deleting policy where destination has peers but source does not", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -1252,6 +1283,10 @@ func TestPolicyAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Deleting policy with no peers in groups should not update account's peers and not send peer update
|
||||
t.Run("deleting policy with no peers in groups", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
|
||||
@@ -140,11 +140,6 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
|
||||
}, true)
|
||||
assert.NoError(t, err)
|
||||
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
|
||||
postureCheckA := &posture.Checks{
|
||||
Name: "postureCheckA",
|
||||
AccountID: account.Id,
|
||||
@@ -171,6 +166,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Saving unused posture check should not update account peers and not send peer update
|
||||
t.Run("saving unused posture check", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
@@ -189,6 +188,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Updating unused posture check should not update account peers and not send peer update
|
||||
t.Run("updating unused posture check", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
@@ -226,6 +229,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Linking posture check to policy should trigger update account peers and send peer update
|
||||
t.Run("linking posture check to policy with peers", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -244,6 +251,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Updating linked posture checks should update account peers and send peer update
|
||||
t.Run("updating linked to posture check with peers", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
postureCheckB.Checks = posture.ChecksDefinition{
|
||||
NBVersionCheck: &posture.NBVersionCheck{
|
||||
MinVersion: "0.29.0",
|
||||
@@ -273,6 +284,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Removing posture check from policy should trigger account peers update and send peer update
|
||||
t.Run("removing posture check from policy", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -292,6 +307,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Deleting unused posture check should not trigger account peers update and not send peer update
|
||||
t.Run("deleting unused posture check", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
@@ -313,6 +332,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Updating linked posture check to policy with no peers should not trigger account peers update and not send peer update
|
||||
t.Run("updating linked posture check to policy with no peers", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
_, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{
|
||||
Enabled: true,
|
||||
Rules: []*types.PolicyRule{
|
||||
@@ -352,7 +375,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
|
||||
// Updating linked posture check to policy where destination has peers but source does not
|
||||
// should trigger account peers update and send peer update
|
||||
t.Run("updating linked posture check to policy where destination has peers but source does not", func(t *testing.T) {
|
||||
updMsg1 := manager.peersUpdateManager.CreateChannel(context.Background(), peer2.ID)
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer2.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer2.ID)
|
||||
})
|
||||
@@ -374,7 +397,7 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg1)
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
@@ -396,6 +419,10 @@ func TestPostureCheckAccountPeersUpdate(t *testing.T) {
|
||||
// Updating linked client posture check to policy where source has peers but destination does not,
|
||||
// should trigger account peers update and send peer update
|
||||
t.Run("updating linked posture check to policy where source has peers but destination does not", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1.ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1.ID)
|
||||
})
|
||||
_, err = manager.SavePolicy(context.Background(), account.Id, userID, &types.Policy{
|
||||
Enabled: true,
|
||||
Rules: []*types.PolicyRule{
|
||||
|
||||
@@ -1284,7 +1284,7 @@ func createRouterManager(t *testing.T) (*DefaultAccountManager, error) {
|
||||
|
||||
permissionsManager := permissions.NewManager(store)
|
||||
|
||||
return BuildManager(context.Background(), store, NewPeersUpdateManager(nil), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
|
||||
return BuildManager(context.Background(), store, NewPeersUpdateManager(metrics), nil, "", "netbird.selfhosted", eventStore, nil, false, MocIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager)
|
||||
}
|
||||
|
||||
func createRouterStore(t *testing.T) (store.Store, error) {
|
||||
@@ -1972,13 +1972,12 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
|
||||
}, true)
|
||||
assert.NoError(t, err)
|
||||
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
|
||||
})
|
||||
|
||||
// Creating a route with no routing peer and no peers in PeerGroups or Groups should not update account peers and not send peer update
|
||||
t.Run("creating route no routing peer and no peers in groups", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
|
||||
})
|
||||
route := route.Route{
|
||||
ID: "testingRoute1",
|
||||
Network: netip.MustParsePrefix("100.65.250.202/32"),
|
||||
@@ -2015,6 +2014,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Creating a route with no routing peer and having peers in groups should update account peers and send peer update
|
||||
t.Run("creating a route with peers in PeerGroups and Groups", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
|
||||
})
|
||||
route := route.Route{
|
||||
ID: "testingRoute2",
|
||||
Network: netip.MustParsePrefix("192.0.2.0/32"),
|
||||
@@ -2064,6 +2067,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Creating route should update account peers and send peer update
|
||||
t.Run("creating route with a routing peer", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -2089,6 +2096,11 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
|
||||
t.Run("updating route", func(t *testing.T) {
|
||||
baseRoute.Groups = []string{routeGroup1, routeGroup2}
|
||||
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
|
||||
})
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -2107,6 +2119,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Deleting the route should update account peers and send peer update
|
||||
t.Run("deleting route", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
|
||||
})
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
@@ -2125,6 +2141,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Adding peer to route peer groups that do not have any peers should update account peers and send peer update
|
||||
t.Run("adding peer to route peer groups that do not have any peers", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
|
||||
})
|
||||
newRoute := route.Route{
|
||||
Network: netip.MustParsePrefix("192.168.12.0/16"),
|
||||
NetID: "superNet",
|
||||
@@ -2165,6 +2185,10 @@ func TestRouteAccountPeersUpdate(t *testing.T) {
|
||||
|
||||
// Adding peer to route groups that do not have any peers should update account peers and send peer update
|
||||
t.Run("adding peer to route groups that do not have any peers", func(t *testing.T) {
|
||||
updMsg := manager.peersUpdateManager.CreateChannel(context.Background(), peer1ID)
|
||||
t.Cleanup(func() {
|
||||
manager.peersUpdateManager.CloseChannel(context.Background(), peer1ID)
|
||||
})
|
||||
newRoute := route.Route{
|
||||
Network: netip.MustParsePrefix("192.168.13.0/16"),
|
||||
NetID: "superNet",
|
||||
|
||||
@@ -18,6 +18,13 @@ type UpdateChannelMetrics struct {
|
||||
getAllConnectedPeersDurationMicro metric.Int64Histogram
|
||||
getAllConnectedPeers metric.Int64Histogram
|
||||
hasChannelDurationMicro metric.Int64Histogram
|
||||
calcPostureChecksDurationMicro metric.Int64Histogram
|
||||
calcPeerNetworkMapDurationMs metric.Int64Histogram
|
||||
mergeNetworkMapDurationMicro metric.Int64Histogram
|
||||
toSyncResponseDurationMicro metric.Int64Histogram
|
||||
bufferPushCounter metric.Int64Counter
|
||||
bufferOverwriteCounter metric.Int64Counter
|
||||
bufferIgnoreCounter metric.Int64Counter
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
@@ -89,6 +96,59 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh
|
||||
return nil, err
|
||||
}
|
||||
|
||||
calcPostureChecksDurationMicro, err := meter.Int64Histogram("management.updatechannel.calc.posturechecks.duration.micro",
|
||||
metric.WithUnit("microseconds"),
|
||||
metric.WithDescription("Duration of how long it takes to get the posture checks for a peer"),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
calcPeerNetworkMapDurationMs, err := meter.Int64Histogram("management.updatechannel.calc.networkmap.duration.ms",
|
||||
metric.WithUnit("milliseconds"),
|
||||
metric.WithDescription("Duration of how long it takes to calculate the network map for a peer"),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
mergeNetworkMapDurationMicro, err := meter.Int64Histogram("management.updatechannel.merge.networkmap.duration.micro",
|
||||
metric.WithUnit("microseconds"),
|
||||
metric.WithDescription("Duration of how long it takes to merge the network maps for a peer"),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
toSyncResponseDurationMicro, err := meter.Int64Histogram("management.updatechannel.tosyncresponse.duration.micro",
|
||||
metric.WithUnit("microseconds"),
|
||||
metric.WithDescription("Duration of how long it takes to convert the network map to sync response"),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bufferPushCounter, err := meter.Int64Counter("management.updatechannel.buffer.push.counter",
|
||||
metric.WithUnit("1"),
|
||||
metric.WithDescription("Number of updates pushed to an empty buffer"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bufferOverwriteCounter, err := meter.Int64Counter("management.updatechannel.buffer.overwrite.counter",
|
||||
metric.WithUnit("1"),
|
||||
metric.WithDescription("Number of updates overwriting old unsent updates in the buffer"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bufferIgnoreCounter, err := meter.Int64Counter("management.updatechannel.buffer.ignore.counter",
|
||||
metric.WithUnit("1"),
|
||||
metric.WithDescription("Number of updates being ignored due to old network serial"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &UpdateChannelMetrics{
|
||||
createChannelDurationMicro: createChannelDurationMicro,
|
||||
closeChannelDurationMicro: closeChannelDurationMicro,
|
||||
@@ -98,6 +158,13 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh
|
||||
getAllConnectedPeersDurationMicro: getAllConnectedPeersDurationMicro,
|
||||
getAllConnectedPeers: getAllConnectedPeers,
|
||||
hasChannelDurationMicro: hasChannelDurationMicro,
|
||||
calcPostureChecksDurationMicro: calcPostureChecksDurationMicro,
|
||||
calcPeerNetworkMapDurationMs: calcPeerNetworkMapDurationMs,
|
||||
mergeNetworkMapDurationMicro: mergeNetworkMapDurationMicro,
|
||||
toSyncResponseDurationMicro: toSyncResponseDurationMicro,
|
||||
bufferPushCounter: bufferPushCounter,
|
||||
bufferOverwriteCounter: bufferOverwriteCounter,
|
||||
bufferIgnoreCounter: bufferIgnoreCounter,
|
||||
ctx: ctx,
|
||||
}, nil
|
||||
}
|
||||
@@ -137,3 +204,34 @@ func (metrics *UpdateChannelMetrics) CountGetAllConnectedPeersDuration(duration
|
||||
func (metrics *UpdateChannelMetrics) CountHasChannelDuration(duration time.Duration) {
|
||||
metrics.hasChannelDurationMicro.Record(metrics.ctx, duration.Microseconds())
|
||||
}
|
||||
|
||||
func (metrics *UpdateChannelMetrics) CountCalcPostureChecksDuration(duration time.Duration) {
|
||||
metrics.calcPostureChecksDurationMicro.Record(metrics.ctx, duration.Microseconds())
|
||||
}
|
||||
|
||||
func (metrics *UpdateChannelMetrics) CountCalcPeerNetworkMapDuration(duration time.Duration) {
|
||||
metrics.calcPeerNetworkMapDurationMs.Record(metrics.ctx, duration.Milliseconds())
|
||||
}
|
||||
|
||||
func (metrics *UpdateChannelMetrics) CountMergeNetworkMapDuration(duration time.Duration) {
|
||||
metrics.mergeNetworkMapDurationMicro.Record(metrics.ctx, duration.Microseconds())
|
||||
}
|
||||
|
||||
func (metrics *UpdateChannelMetrics) CountToSyncResponseDuration(duration time.Duration) {
|
||||
metrics.toSyncResponseDurationMicro.Record(metrics.ctx, duration.Microseconds())
|
||||
}
|
||||
|
||||
// CountBufferPush counts how many buffer push operations are happening on an empty buffer
|
||||
func (metrics *UpdateChannelMetrics) CountBufferPush() {
|
||||
metrics.bufferPushCounter.Add(metrics.ctx, 1)
|
||||
}
|
||||
|
||||
// CountBufferOverwrite counts how many buffer overwrite operations are happening on a non-empty buffer
|
||||
func (metrics *UpdateChannelMetrics) CountBufferOverwrite() {
|
||||
metrics.bufferOverwriteCounter.Add(metrics.ctx, 1)
|
||||
}
|
||||
|
||||
// CountBufferIgnore counts how many pushed updates are ignored because their network serial is not newer than the buffered one
|
||||
func (metrics *UpdateChannelMetrics) CountBufferIgnore() {
|
||||
metrics.bufferIgnoreCounter.Add(metrics.ctx, 1)
|
||||
}
|
||||
|
||||
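The buffer counters registered above are plain OpenTelemetry Int64Counter instruments, recorded with a single Add call per event. A minimal self-contained sketch of registering and incrementing one such counter against a no-op meter; the metric name below is illustrative and is not one of the names used by the management server:

package main

import (
    "context"

    "go.opentelemetry.io/otel/metric"
    "go.opentelemetry.io/otel/metric/noop"
)

func main() {
    meter := noop.NewMeterProvider().Meter("example")

    // registered the same way as the buffer push/overwrite/ignore counters above
    counter, err := meter.Int64Counter("example.buffer.push.counter",
        metric.WithUnit("1"),
        metric.WithDescription("Number of example push operations"))
    if err != nil {
        panic(err)
    }

    // one increment per observed event, as CountBufferPush does
    counter.Add(context.Background(), 1)
}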
@@ -15,6 +15,7 @@ import (
|
||||
|
||||
"github.com/netbirdio/netbird/management/proto"
|
||||
"github.com/netbirdio/netbird/management/server/settings"
|
||||
"github.com/netbirdio/netbird/management/server/telemetry"
|
||||
"github.com/netbirdio/netbird/management/server/types"
|
||||
"github.com/netbirdio/netbird/util"
|
||||
)
|
||||
@@ -29,7 +30,11 @@ var TurnTestHost = &types.Host{
|
||||
func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) {
|
||||
ttl := util.Duration{Duration: time.Hour}
|
||||
secret := "some_secret"
|
||||
peersManager := NewPeersUpdateManager(nil)
|
||||
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create metrics: %v", err)
|
||||
}
|
||||
peersManager := NewPeersUpdateManager(metrics)
|
||||
|
||||
rc := &types.Relay{
|
||||
Addresses: []string{"localhost:0"},
|
||||
@@ -77,9 +82,25 @@ func TestTimeBasedAuthSecretsManager_GenerateCredentials(t *testing.T) {
|
||||
func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) {
|
||||
ttl := util.Duration{Duration: 2 * time.Second}
|
||||
secret := "some_secret"
|
||||
peersManager := NewPeersUpdateManager(nil)
|
||||
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create metrics: %v", err)
|
||||
}
|
||||
peersManager := NewPeersUpdateManager(metrics)
|
||||
peer := "some_peer"
|
||||
updateChannel := peersManager.CreateChannel(context.Background(), peer)
|
||||
buffer := peersManager.CreateChannel(context.Background(), peer)
|
||||
resultCh := make(chan struct {
|
||||
msg *UpdateMessage
|
||||
ok bool
|
||||
}, 1)
|
||||
|
||||
go func() {
|
||||
msg, ok := buffer.Pop(context.Background())
|
||||
resultCh <- struct {
|
||||
msg *UpdateMessage
|
||||
ok bool
|
||||
}{msg, ok}
|
||||
}()
|
||||
|
||||
rc := &types.Relay{
|
||||
Addresses: []string{"localhost:0"},
|
||||
@@ -117,8 +138,8 @@ func TestTimeBasedAuthSecretsManager_SetupRefresh(t *testing.T) {
|
||||
loop:
|
||||
for timeout := time.After(5 * time.Second); ; {
|
||||
select {
|
||||
case update := <-updateChannel:
|
||||
updates = append(updates, update)
|
||||
case update := <-resultCh:
|
||||
updates = append(updates, update.msg)
|
||||
case <-timeout:
|
||||
break loop
|
||||
}
|
||||
@@ -181,7 +202,11 @@ loop:
|
||||
func TestTimeBasedAuthSecretsManager_CancelRefresh(t *testing.T) {
|
||||
ttl := util.Duration{Duration: time.Hour}
|
||||
secret := "some_secret"
|
||||
peersManager := NewPeersUpdateManager(nil)
|
||||
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
|
||||
if err != nil {
|
||||
t.Fatalf("failed to create metrics: %v", err)
|
||||
}
|
||||
peersManager := NewPeersUpdateManager(metrics)
|
||||
peer := "some_peer"
|
||||
|
||||
rc := &types.Relay{
|
||||
|
||||
93
management/server/update_buffer.go
Normal file
@@ -0,0 +1,93 @@
package server

import (
    "context"
    "sync"

    "github.com/netbirdio/netbird/management/server/telemetry"
)

// UpdateBuffer holds at most one pending update per peer. New pushes either
// replace the buffered update (when they carry a newer network map) or are
// ignored, so a slow consumer never builds up a backlog.
type UpdateBuffer struct {
    mu      sync.Mutex
    cond    *sync.Cond
    update  *UpdateMessage
    closed  bool
    metrics *telemetry.UpdateChannelMetrics
}

// NewUpdateBuffer creates an empty buffer that reports push, overwrite, and
// ignore operations to the given update channel metrics.
func NewUpdateBuffer(metrics *telemetry.UpdateChannelMetrics) *UpdateBuffer {
    ub := &UpdateBuffer{metrics: metrics}
    ub.cond = sync.NewCond(&ub.mu)
    return ub
}

// Push stores the update, merging NetbirdConfig fields into an already
// buffered update and keeping only the newest network map by serial.
func (b *UpdateBuffer) Push(update *UpdateMessage) {
    b.mu.Lock()
    defer b.mu.Unlock()

    if b.update != nil && update.Update.NetbirdConfig != nil {
        if update.Update.NetbirdConfig.Relay != nil {
            b.update.Update.NetbirdConfig.Relay = update.Update.NetbirdConfig.Relay
        }
        if update.Update.NetbirdConfig.Signal != nil {
            b.update.Update.NetbirdConfig.Signal = update.Update.NetbirdConfig.Signal
        }
        if update.Update.NetbirdConfig.Flow != nil {
            b.update.Update.NetbirdConfig.Flow = update.Update.NetbirdConfig.Flow
        }
        if update.Update.NetbirdConfig.Stuns != nil {
            b.update.Update.NetbirdConfig.Stuns = update.Update.NetbirdConfig.Stuns
        }
        if update.Update.NetbirdConfig.Turns != nil {
            b.update.Update.NetbirdConfig.Turns = update.Update.NetbirdConfig.Turns
        }
    }

    // the serial is not always incremented, so the equal/zero case must also replace the buffered update
    if b.update == nil || update.Update.NetworkMap.Serial > b.update.Update.NetworkMap.Serial || b.update.Update.NetworkMap.Serial == 0 {
        wasEmpty := b.update == nil
        b.update = update
        b.cond.Signal()
        if wasEmpty {
            b.metrics.CountBufferPush()
            return
        }

        b.metrics.CountBufferOverwrite()
        return
    }

    b.metrics.CountBufferIgnore()
}

// Pop blocks until an update is available, the buffer is closed, or the
// context is cancelled. It returns false when there is nothing left to read.
func (b *UpdateBuffer) Pop(ctx context.Context) (*UpdateMessage, bool) {
    b.mu.Lock()
    defer b.mu.Unlock()

    for b.update == nil && !b.closed {
        if ctx.Err() != nil {
            return nil, false
        }
        waitCh := make(chan struct{})
        go func() {
            select {
            case <-ctx.Done():
                b.cond.Broadcast()
            case <-waitCh:
                // noop
            }
        }()
        b.cond.Wait()
        close(waitCh)
    }

    if b.closed {
        return nil, false
    }
    msg := b.update
    b.update = nil
    return msg, true
}

// Close marks the buffer as closed and wakes up any blocked Pop calls.
func (b *UpdateBuffer) Close() {
    b.mu.Lock()
    b.closed = true
    b.cond.Broadcast()
    b.mu.Unlock()
}
|
||||
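To make the intended use of the new buffer concrete, here is a small producer/consumer sketch built only from the methods above; the wiring is illustrative and is not how the management service registers peers:

package server

import (
    "context"

    "github.com/netbirdio/netbird/management/proto"
    "github.com/netbirdio/netbird/management/server/telemetry"
)

// exampleBufferUsage is an illustrative sketch, not production wiring.
func exampleBufferUsage(ctx context.Context, metrics telemetry.AppMetrics) {
    buf := NewUpdateBuffer(metrics.UpdateChannelMetrics())

    // consumer: Pop blocks until an update arrives, the context is cancelled, or the buffer is closed
    go func() {
        for {
            update, ok := buf.Pop(ctx)
            if !ok {
                return
            }
            _ = update // deliver the sync response to the peer here
        }
    }()

    // producer: pushes coalesce, only the newest network map (by serial) is kept
    buf.Push(&UpdateMessage{Update: &proto.SyncResponse{NetworkMap: &proto.NetworkMap{Serial: 1}}})
    buf.Push(&UpdateMessage{Update: &proto.SyncResponse{NetworkMap: &proto.NetworkMap{Serial: 2}}})

    // shutdown unblocks any pending Pop
    buf.Close()
}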
@@ -12,8 +12,6 @@ import (
|
||||
"github.com/netbirdio/netbird/management/server/types"
|
||||
)
|
||||
|
||||
const channelBufferSize = 100
|
||||
|
||||
type UpdateMessage struct {
|
||||
Update *proto.SyncResponse
|
||||
NetworkMap *types.NetworkMap
|
||||
@@ -21,7 +19,7 @@ type UpdateMessage struct {
|
||||
|
||||
type PeersUpdateManager struct {
|
||||
// peerChannels is an update channel indexed by Peer.ID
|
||||
peerChannels map[string]chan *UpdateMessage
|
||||
peerChannels map[string]*UpdateBuffer
|
||||
// channelsMux keeps the mutex to access peerChannels
|
||||
channelsMux *sync.RWMutex
|
||||
// metrics provides method to collect application metrics
|
||||
@@ -31,7 +29,7 @@ type PeersUpdateManager struct {
|
||||
// NewPeersUpdateManager returns a new instance of PeersUpdateManager
|
||||
func NewPeersUpdateManager(metrics telemetry.AppMetrics) *PeersUpdateManager {
|
||||
return &PeersUpdateManager{
|
||||
peerChannels: make(map[string]chan *UpdateMessage),
|
||||
peerChannels: make(map[string]*UpdateBuffer),
|
||||
channelsMux: &sync.RWMutex{},
|
||||
metrics: metrics,
|
||||
}
|
||||
@@ -53,20 +51,14 @@ func (p *PeersUpdateManager) SendUpdate(ctx context.Context, peerID string, upda
|
||||
|
||||
if channel, ok := p.peerChannels[peerID]; ok {
|
||||
found = true
|
||||
select {
|
||||
case channel <- update:
|
||||
log.WithContext(ctx).Debugf("update was sent to channel for peer %s", peerID)
|
||||
default:
|
||||
dropped = true
|
||||
log.WithContext(ctx).Warnf("channel for peer %s is %d full or closed", peerID, len(channel))
|
||||
}
|
||||
channel.Push(update)
|
||||
} else {
|
||||
log.WithContext(ctx).Debugf("peer %s has no channel", peerID)
|
||||
}
|
||||
}
|
||||
|
||||
// CreateChannel creates a go channel for a given peer used to deliver updates relevant to the peer.
|
||||
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) chan *UpdateMessage {
|
||||
func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) *UpdateBuffer {
|
||||
start := time.Now()
|
||||
|
||||
closed := false
|
||||
@@ -81,22 +73,22 @@ func (p *PeersUpdateManager) CreateChannel(ctx context.Context, peerID string) c
|
||||
|
||||
if channel, ok := p.peerChannels[peerID]; ok {
|
||||
closed = true
|
||||
channel.Close()
|
||||
delete(p.peerChannels, peerID)
|
||||
close(channel)
|
||||
}
|
||||
// mbragin: todo shouldn't it be more? or configurable?
|
||||
channel := make(chan *UpdateMessage, channelBufferSize)
|
||||
p.peerChannels[peerID] = channel
|
||||
buffer := NewUpdateBuffer(p.metrics.UpdateChannelMetrics())
|
||||
p.peerChannels[peerID] = buffer
|
||||
|
||||
log.WithContext(ctx).Debugf("opened updates channel for a peer %s", peerID)
|
||||
|
||||
return channel
|
||||
return buffer
|
||||
}
|
||||
|
||||
func (p *PeersUpdateManager) closeChannel(ctx context.Context, peerID string) {
|
||||
if channel, ok := p.peerChannels[peerID]; ok {
|
||||
delete(p.peerChannels, peerID)
|
||||
close(channel)
|
||||
channel.Close()
|
||||
|
||||
log.WithContext(ctx).Debugf("closed updates channel of a peer %s", peerID)
|
||||
return
|
||||
|
||||
@@ -6,13 +6,19 @@ import (
	"time"

	"github.com/netbirdio/netbird/management/proto"
	"github.com/netbirdio/netbird/management/server/telemetry"
)

// var peersUpdater *PeersUpdateManager

func TestCreateChannel(t *testing.T) {
	peer := "test-create"
	peersUpdater := NewPeersUpdateManager(nil)

	metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
	if err != nil {
		t.Fatalf("failed to create metrics: %v", err)
	}
	peersUpdater := NewPeersUpdateManager(metrics)
	defer peersUpdater.CloseChannel(context.Background(), peer)

	_ = peersUpdater.CreateChannel(context.Background(), peer)
@@ -23,7 +29,12 @@ func TestCreateChannel(t *testing.T) {

func TestSendUpdate(t *testing.T) {
	peer := "test-sendupdate"
	peersUpdater := NewPeersUpdateManager(nil)

	metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
	if err != nil {
		t.Fatalf("failed to create metrics: %v", err)
	}
	peersUpdater := NewPeersUpdateManager(metrics)
	update1 := &UpdateMessage{Update: &proto.SyncResponse{
		NetworkMap: &proto.NetworkMap{
			Serial: 0,
@@ -33,41 +44,62 @@ func TestSendUpdate(t *testing.T) {
	if _, ok := peersUpdater.peerChannels[peer]; !ok {
		t.Error("Error creating the channel")
	}

	resultCh := make(chan struct {
		msg *UpdateMessage
		ok  bool
	}, 1)

	go func() {
		for {
			msg, ok := peersUpdater.peerChannels[peer].Pop(context.Background())
			resultCh <- struct {
				msg *UpdateMessage
				ok  bool
			}{msg, ok}
		}
	}()

	peersUpdater.SendUpdate(context.Background(), peer, update1)
	select {
	case <-peersUpdater.peerChannels[peer]:
	default:
	case <-resultCh:
	case <-time.After(1 * time.Second):
		t.Error("Update wasn't send")
	}

	for range [channelBufferSize]int{} {
		peersUpdater.SendUpdate(context.Background(), peer, update1)
	}

	update2 := &UpdateMessage{Update: &proto.SyncResponse{
		NetworkMap: &proto.NetworkMap{
			Serial: 10,
		},
	}}

	update3 := &UpdateMessage{Update: &proto.SyncResponse{
		NetworkMap: &proto.NetworkMap{
			Serial: 8,
		},
	}}

	peersUpdater.SendUpdate(context.Background(), peer, update2)
	timeout := time.After(5 * time.Second)
	for range [channelBufferSize]int{} {
		select {
		case <-timeout:
			t.Error("timed out reading previously sent updates")
		case updateReader := <-peersUpdater.peerChannels[peer]:
			if updateReader.Update.NetworkMap.Serial == update2.Update.NetworkMap.Serial {
				t.Error("got the update that shouldn't have been sent")
			}

	select {
	case <-timeout:
		t.Error("timed out reading previously sent updates")
	case updateReader := <-resultCh:
		if updateReader.msg.Update.NetworkMap.Serial == update3.Update.NetworkMap.Serial {
			t.Error("got the update that shouldn't have been sent")
		}
	}

}

func TestCloseChannel(t *testing.T) {
	peer := "test-close"
	peersUpdater := NewPeersUpdateManager(nil)

	metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
	if err != nil {
		t.Fatalf("failed to create metrics: %v", err)
	}
	peersUpdater := NewPeersUpdateManager(metrics)
	_ = peersUpdater.CreateChannel(context.Background(), peer)
	if _, ok := peersUpdater.peerChannels[peer]; !ok {
		t.Error("Error creating the channel")

@@ -960,6 +960,12 @@ func (am *DefaultAccountManager) expireAndUpdatePeers(ctx context.Context, accou
		)
	}

	// ideally this should run in a transaction
	err = am.Store.IncrementNetworkSerial(ctx, store.LockingStrengthUpdate, accountID)
	if err != nil {
		log.Errorf("failed to increment network serial number for account %s: %v", accountID, err)
	}

	if len(peerIDs) != 0 {
		// this will trigger peer disconnect from the management service
		am.peersUpdateManager.CloseChannels(ctx, peerIDs)

@@ -21,6 +21,7 @@ var (
// Update fetch the version info periodically and notify the onUpdateListener in case the UI version or the
// daemon version are deprecated
type Update struct {
	httpAgent       string
	uiVersion       *goversion.Version
	daemonVersion   *goversion.Version
	latestAvailable *goversion.Version
@@ -34,7 +35,7 @@ type Update struct {
}

// NewUpdate instantiate Update and start to fetch the new version information
func NewUpdate() *Update {
func NewUpdate(httpAgent string) *Update {
	currentVersion, err := goversion.NewVersion(version)
	if err != nil {
		currentVersion, _ = goversion.NewVersion("0.0.0")
@@ -43,6 +44,7 @@ func NewUpdate() *Update {
	latestAvailable, _ := goversion.NewVersion("0.0.0")

	u := &Update{
		httpAgent:       httpAgent,
		latestAvailable: latestAvailable,
		uiVersion:       currentVersion,
		fetchTicker:     time.NewTicker(fetchPeriod),
@@ -112,7 +114,15 @@ func (u *Update) startFetcher() {
func (u *Update) fetchVersion() bool {
	log.Debugf("fetching version info from %s", versionURL)

	resp, err := http.Get(versionURL)
	req, err := http.NewRequest("GET", versionURL, nil)
	if err != nil {
		log.Errorf("failed to create request for version info: %s", err)
		return false
	}

	req.Header.Set("User-Agent", u.httpAgent)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Errorf("failed to fetch version info: %s", err)
		return false

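Not part of the diff: a short caller-side sketch of the changed constructor, showing how the explicit user agent flows into the version fetch. The agent string and listener body are placeholders, not values taken from this change set; StopWatch and SetOnUpdateListener are used as they appear in the tests below.

// Hypothetical usage of NewUpdate with an explicit HTTP user agent;
// "netbird-ui/"+version is a placeholder string, not defined in this diff.
u := NewUpdate("netbird-ui/" + version)
defer u.StopWatch()

u.SetOnUpdateListener(func() {
	// react to a newer UI or daemon version becoming available
})
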
@@ -9,6 +9,8 @@ import (
	"time"
)

const httpAgent = "pkg/test"

func TestNewUpdate(t *testing.T) {
	version = "1.0.0"
	svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -21,7 +23,7 @@ func TestNewUpdate(t *testing.T) {
	wg.Add(1)

	onUpdate := false
	u := NewUpdate()
	u := NewUpdate(httpAgent)
	defer u.StopWatch()
	u.SetOnUpdateListener(func() {
		onUpdate = true
@@ -46,7 +48,7 @@ func TestDoNotUpdate(t *testing.T) {
	wg.Add(1)

	onUpdate := false
	u := NewUpdate()
	u := NewUpdate(httpAgent)
	defer u.StopWatch()
	u.SetOnUpdateListener(func() {
		onUpdate = true
@@ -71,7 +73,7 @@ func TestDaemonUpdate(t *testing.T) {
	wg.Add(1)

	onUpdate := false
	u := NewUpdate()
	u := NewUpdate(httpAgent)
	defer u.StopWatch()
	u.SetOnUpdateListener(func() {
		onUpdate = true