mirror of
https://github.com/netbirdio/netbird.git
synced 2026-07-05 14:19:54 +00:00
Compare commits
25 Commits
fix/routes
...
netmap_pro
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9d613189f6 | ||
|
|
4b3dd9103d | ||
|
|
8e3b284f4b | ||
|
|
21aa933584 | ||
|
|
1dfa85a917 | ||
|
|
859fe19fff | ||
|
|
e40cb294f6 | ||
|
|
e203e0f42a | ||
|
|
7673067605 | ||
|
|
167be3a30f | ||
|
|
1d8b5f6e5c | ||
|
|
79567fe347 | ||
|
|
cf8d92fbb0 | ||
|
|
b70fc4015b | ||
|
|
4988b6726e | ||
|
|
2552830184 | ||
|
|
3b8fc688f4 | ||
|
|
d82d62e818 | ||
|
|
0bf964dad7 | ||
|
|
297dcb3e24 | ||
|
|
bc22926fe0 | ||
|
|
d3f2ef9adb | ||
|
|
5bec1e8f03 | ||
|
|
74bb5c613e | ||
|
|
29dde908ae |
2
.github/workflows/golang-test-linux.yml
vendored
2
.github/workflows/golang-test-linux.yml
vendored
@@ -158,7 +158,7 @@ jobs:
|
|||||||
run: git --no-pager diff --exit-code
|
run: git --no-pager diff --exit-code
|
||||||
|
|
||||||
- name: Test
|
- name: Test
|
||||||
run: CGO_ENABLED=1 GOARCH=${{ matrix.arch }} CI=true go test -coverprofile=coverage.txt -tags devcert -timeout 10m -p 1 $(go list ./... | grep -v -e /management -e /signal -e /relay -e /proxy -e /combined)
|
run: CGO_ENABLED=1 GOARCH=${{ matrix.arch }} CI=true go test -coverprofile=coverage.txt -tags 'devcert privileged' -exec 'sudo --preserve-env=CI,CGO_ENABLED' -timeout 10m -p 1 $(go list ./... | grep -v -e /management -e /signal -e /relay -e /proxy -e /combined -e /client/testutil/privileged)
|
||||||
|
|
||||||
- name: Upload coverage reports to Codecov
|
- name: Upload coverage reports to Codecov
|
||||||
if: matrix.arch == 'amd64'
|
if: matrix.arch == 'amd64'
|
||||||
|
|||||||
5
.github/workflows/release.yml
vendored
5
.github/workflows/release.yml
vendored
@@ -293,8 +293,11 @@ jobs:
|
|||||||
${{ steps.goreleaser.outputs.artifacts }}
|
${{ steps.goreleaser.outputs.artifacts }}
|
||||||
JSON
|
JSON
|
||||||
|
|
||||||
|
# dockers_v2 artifacts have no top-level goarch field, so match the
|
||||||
|
# per-platform -amd64 tag suffix instead; it works for both the old
|
||||||
|
# dockers and the new dockers_v2 image naming.
|
||||||
mapfile -t src_images < <(
|
mapfile -t src_images < <(
|
||||||
jq -r '.[] | select(.type == "Docker Image") | select(.goarch == "amd64") | .name | select(startswith("ghcr.io/"))' /tmp/goreleaser-artifacts.json
|
jq -r '.[] | select(.type == "Docker Image") | .name | select(startswith("ghcr.io/") and endswith("-amd64"))' /tmp/goreleaser-artifacts.json
|
||||||
)
|
)
|
||||||
|
|
||||||
for src in "${src_images[@]}"; do
|
for src in "${src_images[@]}"; do
|
||||||
|
|||||||
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,3 +1,4 @@
|
|||||||
|
.claude
|
||||||
.idea
|
.idea
|
||||||
.run
|
.run
|
||||||
*.iml
|
*.iml
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ var (
|
|||||||
EnvKeyNBForceRelay = peer.EnvKeyNBForceRelay
|
EnvKeyNBForceRelay = peer.EnvKeyNBForceRelay
|
||||||
|
|
||||||
// EnvKeyNBLazyConn Exported for Android java client to configure lazy connection
|
// EnvKeyNBLazyConn Exported for Android java client to configure lazy connection
|
||||||
EnvKeyNBLazyConn = lazyconn.EnvEnableLazyConn
|
EnvKeyNBLazyConn = lazyconn.EnvLazyConn
|
||||||
|
|
||||||
// EnvKeyNBInactivityThreshold Exported for Android java client to configure connection inactivity threshold
|
// EnvKeyNBInactivityThreshold Exported for Android java client to configure connection inactivity threshold
|
||||||
EnvKeyNBInactivityThreshold = lazyconn.EnvInactivityThreshold
|
EnvKeyNBInactivityThreshold = lazyconn.EnvInactivityThreshold
|
||||||
|
|||||||
@@ -71,12 +71,14 @@ var (
|
|||||||
extraIFaceBlackList []string
|
extraIFaceBlackList []string
|
||||||
anonymizeFlag bool
|
anonymizeFlag bool
|
||||||
dnsRouteInterval time.Duration
|
dnsRouteInterval time.Duration
|
||||||
lazyConnEnabled bool
|
// lazyConnEnabled is the parse target for the deprecated --enable-lazy-connection
|
||||||
mtu uint16
|
// flag. The flag is inert; the value is no longer read (use NB_LAZY_CONN instead).
|
||||||
profilesDisabled bool
|
lazyConnEnabled bool
|
||||||
updateSettingsDisabled bool
|
mtu uint16
|
||||||
captureEnabled bool
|
profilesDisabled bool
|
||||||
networksDisabled bool
|
updateSettingsDisabled bool
|
||||||
|
captureEnabled bool
|
||||||
|
networksDisabled bool
|
||||||
|
|
||||||
rootCmd = &cobra.Command{
|
rootCmd = &cobra.Command{
|
||||||
Use: "netbird",
|
Use: "netbird",
|
||||||
@@ -210,7 +212,8 @@ func init() {
|
|||||||
upCmd.PersistentFlags().BoolVar(&rosenpassEnabled, enableRosenpassFlag, false, "[Experimental] Enable Rosenpass feature. If enabled, the connection will be post-quantum secured via Rosenpass.")
|
upCmd.PersistentFlags().BoolVar(&rosenpassEnabled, enableRosenpassFlag, false, "[Experimental] Enable Rosenpass feature. If enabled, the connection will be post-quantum secured via Rosenpass.")
|
||||||
upCmd.PersistentFlags().BoolVar(&rosenpassPermissive, rosenpassPermissiveFlag, false, "[Experimental] Enable Rosenpass in permissive mode to allow this peer to accept WireGuard connections without requiring Rosenpass functionality from peers that do not have Rosenpass enabled.")
|
upCmd.PersistentFlags().BoolVar(&rosenpassPermissive, rosenpassPermissiveFlag, false, "[Experimental] Enable Rosenpass in permissive mode to allow this peer to accept WireGuard connections without requiring Rosenpass functionality from peers that do not have Rosenpass enabled.")
|
||||||
upCmd.PersistentFlags().BoolVar(&autoConnectDisabled, disableAutoConnectFlag, false, "Disables auto-connect feature. If enabled, then the client won't connect automatically when the service starts.")
|
upCmd.PersistentFlags().BoolVar(&autoConnectDisabled, disableAutoConnectFlag, false, "Disables auto-connect feature. If enabled, then the client won't connect automatically when the service starts.")
|
||||||
upCmd.PersistentFlags().BoolVar(&lazyConnEnabled, enableLazyConnectionFlag, false, "[Experimental] Enable the lazy connection feature. If enabled, the client will establish connections on-demand. Note: this setting may be overridden by management configuration.")
|
upCmd.PersistentFlags().BoolVar(&lazyConnEnabled, enableLazyConnectionFlag, false, "Deprecated: no longer used. Lazy connections are controlled by the server and the NB_LAZY_CONN environment variable.")
|
||||||
|
_ = upCmd.PersistentFlags().MarkDeprecated(enableLazyConnectionFlag, "no longer used; lazy connections are controlled by the server and the NB_LAZY_CONN environment variable")
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -479,10 +479,6 @@ func setupSetConfigReq(customDNSAddressConverted []byte, cmd *cobra.Command, pro
|
|||||||
req.DisableIpv6 = &disableIPv6
|
req.DisableIpv6 = &disableIPv6
|
||||||
}
|
}
|
||||||
|
|
||||||
if cmd.Flag(enableLazyConnectionFlag).Changed {
|
|
||||||
req.LazyConnectionEnabled = &lazyConnEnabled
|
|
||||||
}
|
|
||||||
|
|
||||||
return &req
|
return &req
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -600,9 +596,6 @@ func setupConfig(customDNSAddressConverted []byte, cmd *cobra.Command, configFil
|
|||||||
ic.DisableIPv6 = &disableIPv6
|
ic.DisableIPv6 = &disableIPv6
|
||||||
}
|
}
|
||||||
|
|
||||||
if cmd.Flag(enableLazyConnectionFlag).Changed {
|
|
||||||
ic.LazyConnectionEnabled = &lazyConnEnabled
|
|
||||||
}
|
|
||||||
return &ic, nil
|
return &ic, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -718,9 +711,6 @@ func setupLoginRequest(providedSetupKey string, customDNSAddressConverted []byte
|
|||||||
loginRequest.DisableIpv6 = &disableIPv6
|
loginRequest.DisableIpv6 = &disableIPv6
|
||||||
}
|
}
|
||||||
|
|
||||||
if cmd.Flag(enableLazyConnectionFlag).Changed {
|
|
||||||
loginRequest.LazyConnectionEnabled = &lazyConnEnabled
|
|
||||||
}
|
|
||||||
return &loginRequest, nil
|
return &loginRequest, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -17,12 +17,15 @@ import (
|
|||||||
|
|
||||||
type KernelConfigurer struct {
|
type KernelConfigurer struct {
|
||||||
deviceName string
|
deviceName string
|
||||||
|
statsCache *statsCache
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewKernelConfigurer(deviceName string) *KernelConfigurer {
|
func NewKernelConfigurer(deviceName string) *KernelConfigurer {
|
||||||
return &KernelConfigurer{
|
c := &KernelConfigurer{
|
||||||
deviceName: deviceName,
|
deviceName: deviceName,
|
||||||
}
|
}
|
||||||
|
c.statsCache = newStatsCache(statsCacheTTL, c.fetchStats)
|
||||||
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *KernelConfigurer) ConfigureInterface(privateKey string, port int) error {
|
func (c *KernelConfigurer) ConfigureInterface(privateKey string, port int) error {
|
||||||
@@ -246,12 +249,6 @@ func (c *KernelConfigurer) configure(config wgtypes.Config) error {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// validate if device with name exists
|
|
||||||
_, err = wg.Device(c.deviceName)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return wg.ConfigureDevice(c.deviceName, config)
|
return wg.ConfigureDevice(c.deviceName, config)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -300,6 +297,14 @@ func (c *KernelConfigurer) FullStats() (*Stats, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *KernelConfigurer) GetStats() (map[string]WGStats, error) {
|
func (c *KernelConfigurer) GetStats() (map[string]WGStats, error) {
|
||||||
|
return c.statsCache.get()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *KernelConfigurer) LastActivities() map[string]monotime.Time {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *KernelConfigurer) fetchStats() (map[string]WGStats, error) {
|
||||||
stats := make(map[string]WGStats)
|
stats := make(map[string]WGStats)
|
||||||
wg, err := wgctrl.New()
|
wg, err := wgctrl.New()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -326,7 +331,3 @@ func (c *KernelConfigurer) GetStats() (map[string]WGStats, error) {
|
|||||||
}
|
}
|
||||||
return stats, nil
|
return stats, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *KernelConfigurer) LastActivities() map[string]monotime.Time {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|||||||
52
client/iface/configurer/stats_cache.go
Normal file
52
client/iface/configurer/stats_cache.go
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
package configurer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/sync/singleflight"
|
||||||
|
)
|
||||||
|
|
||||||
|
const statsCacheTTL = 1 * time.Second
|
||||||
|
|
||||||
|
type statsCache struct {
|
||||||
|
ttl time.Duration
|
||||||
|
fetch func() (map[string]WGStats, error)
|
||||||
|
|
||||||
|
mu sync.RWMutex
|
||||||
|
value map[string]WGStats
|
||||||
|
expireAt time.Time
|
||||||
|
|
||||||
|
sf singleflight.Group
|
||||||
|
}
|
||||||
|
|
||||||
|
func newStatsCache(ttl time.Duration, fetch func() (map[string]WGStats, error)) *statsCache {
|
||||||
|
return &statsCache{ttl: ttl, fetch: fetch}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *statsCache) get() (map[string]WGStats, error) {
|
||||||
|
c.mu.RLock()
|
||||||
|
if c.value != nil && time.Now().Before(c.expireAt) {
|
||||||
|
value := c.value
|
||||||
|
c.mu.RUnlock()
|
||||||
|
return value, nil
|
||||||
|
}
|
||||||
|
c.mu.RUnlock()
|
||||||
|
|
||||||
|
value, err, _ := c.sf.Do("stats", func() (interface{}, error) {
|
||||||
|
res, err := c.fetch()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
c.mu.Lock()
|
||||||
|
c.value = res
|
||||||
|
c.expireAt = time.Now().Add(c.ttl)
|
||||||
|
c.mu.Unlock()
|
||||||
|
return res, nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return value.(map[string]WGStats), nil
|
||||||
|
}
|
||||||
70
client/iface/configurer/stats_cache_test.go
Normal file
70
client/iface/configurer/stats_cache_test.go
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
package configurer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestStatsCache_CachesWithinTTL(t *testing.T) {
|
||||||
|
var calls atomic.Int64
|
||||||
|
c := newStatsCache(50*time.Millisecond, func() (map[string]WGStats, error) {
|
||||||
|
calls.Add(1)
|
||||||
|
return map[string]WGStats{"p": {}}, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
_, err := c.get()
|
||||||
|
require.NoError(t, err)
|
||||||
|
}
|
||||||
|
require.Equal(t, int64(1), calls.Load(), "within TTL only one underlying fetch")
|
||||||
|
|
||||||
|
time.Sleep(60 * time.Millisecond)
|
||||||
|
_, err := c.get()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, int64(2), calls.Load(), "after TTL expiry a fresh fetch happens")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatsCache_SingleFlight(t *testing.T) {
|
||||||
|
var calls atomic.Int64
|
||||||
|
release := make(chan struct{})
|
||||||
|
c := newStatsCache(time.Minute, func() (map[string]WGStats, error) {
|
||||||
|
calls.Add(1)
|
||||||
|
<-release
|
||||||
|
return map[string]WGStats{}, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
const n = 50
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
wg.Add(n)
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
_, _ = c.get()
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
close(release)
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
require.Equal(t, int64(1), calls.Load(), "concurrent misses collapse into one fetch")
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestStatsCache_ErrorNotCached(t *testing.T) {
|
||||||
|
var calls atomic.Int64
|
||||||
|
wantErr := errors.New("dump failed")
|
||||||
|
c := newStatsCache(time.Minute, func() (map[string]WGStats, error) {
|
||||||
|
calls.Add(1)
|
||||||
|
return nil, wantErr
|
||||||
|
})
|
||||||
|
|
||||||
|
_, err := c.get()
|
||||||
|
require.ErrorIs(t, err, wantErr)
|
||||||
|
_, err = c.get()
|
||||||
|
require.ErrorIs(t, err, wantErr)
|
||||||
|
require.Equal(t, int64(2), calls.Load(), "errors are not cached; each call retries")
|
||||||
|
}
|
||||||
@@ -40,6 +40,7 @@ type WGUSPConfigurer struct {
|
|||||||
device *device.Device
|
device *device.Device
|
||||||
deviceName string
|
deviceName string
|
||||||
activityRecorder *bind.ActivityRecorder
|
activityRecorder *bind.ActivityRecorder
|
||||||
|
statsCache *statsCache
|
||||||
|
|
||||||
uapiListener net.Listener
|
uapiListener net.Listener
|
||||||
}
|
}
|
||||||
@@ -50,16 +51,19 @@ func NewUSPConfigurer(device *device.Device, deviceName string, activityRecorder
|
|||||||
deviceName: deviceName,
|
deviceName: deviceName,
|
||||||
activityRecorder: activityRecorder,
|
activityRecorder: activityRecorder,
|
||||||
}
|
}
|
||||||
|
wgCfg.statsCache = newStatsCache(statsCacheTTL, wgCfg.fetchStats)
|
||||||
wgCfg.startUAPI()
|
wgCfg.startUAPI()
|
||||||
return wgCfg
|
return wgCfg
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewUSPConfigurerNoUAPI(device *device.Device, deviceName string, activityRecorder *bind.ActivityRecorder) *WGUSPConfigurer {
|
func NewUSPConfigurerNoUAPI(device *device.Device, deviceName string, activityRecorder *bind.ActivityRecorder) *WGUSPConfigurer {
|
||||||
return &WGUSPConfigurer{
|
wgCfg := &WGUSPConfigurer{
|
||||||
device: device,
|
device: device,
|
||||||
deviceName: deviceName,
|
deviceName: deviceName,
|
||||||
activityRecorder: activityRecorder,
|
activityRecorder: activityRecorder,
|
||||||
}
|
}
|
||||||
|
wgCfg.statsCache = newStatsCache(statsCacheTTL, wgCfg.fetchStats)
|
||||||
|
return wgCfg
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *WGUSPConfigurer) ConfigureInterface(privateKey string, port int) error {
|
func (c *WGUSPConfigurer) ConfigureInterface(privateKey string, port int) error {
|
||||||
@@ -348,6 +352,10 @@ func (t *WGUSPConfigurer) Close() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (t *WGUSPConfigurer) GetStats() (map[string]WGStats, error) {
|
func (t *WGUSPConfigurer) GetStats() (map[string]WGStats, error) {
|
||||||
|
return t.statsCache.get()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *WGUSPConfigurer) fetchStats() (map[string]WGStats, error) {
|
||||||
ipc, err := t.device.IpcGet()
|
ipc, err := t.device.IpcGet()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("ipc get: %w", err)
|
return nil, fmt.Errorf("ipc get: %w", err)
|
||||||
|
|||||||
@@ -322,7 +322,6 @@ func (a *Auth) setSystemInfoFlags(info *system.Info) {
|
|||||||
a.config.BlockLANAccess,
|
a.config.BlockLANAccess,
|
||||||
a.config.BlockInbound,
|
a.config.BlockInbound,
|
||||||
a.config.DisableIPv6,
|
a.config.DisableIPv6,
|
||||||
a.config.LazyConnectionEnabled,
|
|
||||||
a.config.EnableSSHRoot,
|
a.config.EnableSSHRoot,
|
||||||
a.config.EnableSSHSFTP,
|
a.config.EnableSSHSFTP,
|
||||||
a.config.EnableSSHLocalPortForwarding,
|
a.config.EnableSSHLocalPortForwarding,
|
||||||
|
|||||||
@@ -16,6 +16,16 @@ import (
|
|||||||
"github.com/netbirdio/netbird/route"
|
"github.com/netbirdio/netbird/route"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// lazyForce is the resolved local decision for lazy connections, layered above the
|
||||||
|
// management feature flag. lazyForceNone defers to management.
|
||||||
|
type lazyForce int
|
||||||
|
|
||||||
|
const (
|
||||||
|
lazyForceNone lazyForce = iota
|
||||||
|
lazyForceOn
|
||||||
|
lazyForceOff
|
||||||
|
)
|
||||||
|
|
||||||
// ConnMgr coordinates both lazy connections (established on-demand) and permanent peer connections.
|
// ConnMgr coordinates both lazy connections (established on-demand) and permanent peer connections.
|
||||||
//
|
//
|
||||||
// The connection manager is responsible for:
|
// The connection manager is responsible for:
|
||||||
@@ -28,7 +38,7 @@ type ConnMgr struct {
|
|||||||
peerStore *peerstore.Store
|
peerStore *peerstore.Store
|
||||||
statusRecorder *peer.Status
|
statusRecorder *peer.Status
|
||||||
iface lazyconn.WGIface
|
iface lazyconn.WGIface
|
||||||
enabledLocally bool
|
force lazyForce
|
||||||
rosenpassEnabled bool
|
rosenpassEnabled bool
|
||||||
|
|
||||||
lazyConnMgr *manager.Manager
|
lazyConnMgr *manager.Manager
|
||||||
@@ -43,28 +53,34 @@ func NewConnMgr(engineConfig *EngineConfig, statusRecorder *peer.Status, peerSto
|
|||||||
peerStore: peerStore,
|
peerStore: peerStore,
|
||||||
statusRecorder: statusRecorder,
|
statusRecorder: statusRecorder,
|
||||||
iface: iface,
|
iface: iface,
|
||||||
|
force: resolveLazyForce(engineConfig.LazyConnection),
|
||||||
rosenpassEnabled: engineConfig.RosenpassEnabled,
|
rosenpassEnabled: engineConfig.RosenpassEnabled,
|
||||||
}
|
}
|
||||||
if engineConfig.LazyConnectionEnabled || lazyconn.IsLazyConnEnabledByEnv() {
|
|
||||||
e.enabledLocally = true
|
|
||||||
}
|
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start initializes the connection manager and starts the lazy connection manager if enabled by env var or cmd line option.
|
// Start initializes the connection manager. It starts the lazy connection manager when a
|
||||||
|
// local override forces it on; with no local override it waits for the management feature flag.
|
||||||
func (e *ConnMgr) Start(ctx context.Context) {
|
func (e *ConnMgr) Start(ctx context.Context) {
|
||||||
if e.lazyConnMgr != nil {
|
if e.lazyConnMgr != nil {
|
||||||
log.Errorf("lazy connection manager is already started")
|
log.Errorf("lazy connection manager is already started")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if !e.enabledLocally {
|
switch e.force {
|
||||||
log.Infof("lazy connection manager is disabled")
|
case lazyForceOff:
|
||||||
|
log.Infof("lazy connection manager is disabled by local override (%s or MDM policy)", lazyconn.EnvLazyConn)
|
||||||
|
e.statusRecorder.UpdateLazyConnection(false)
|
||||||
|
return
|
||||||
|
case lazyForceNone:
|
||||||
|
log.Infof("lazy connection manager is managed by the management feature flag")
|
||||||
|
e.statusRecorder.UpdateLazyConnection(false)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if e.rosenpassEnabled {
|
if e.rosenpassEnabled {
|
||||||
log.Warnf("rosenpass connection manager is enabled, lazy connection manager will not be started")
|
log.Warnf("rosenpass connection manager is enabled, lazy connection manager will not be started")
|
||||||
|
e.statusRecorder.UpdateLazyConnection(false)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -76,8 +92,8 @@ func (e *ConnMgr) Start(ctx context.Context) {
|
|||||||
// If enabled, it initializes the lazy connection manager and start it. Do not need to call Start() again.
|
// If enabled, it initializes the lazy connection manager and start it. Do not need to call Start() again.
|
||||||
// If disabled, then it closes the lazy connection manager and open the connections to all peers.
|
// If disabled, then it closes the lazy connection manager and open the connections to all peers.
|
||||||
func (e *ConnMgr) UpdatedRemoteFeatureFlag(ctx context.Context, enabled bool) error {
|
func (e *ConnMgr) UpdatedRemoteFeatureFlag(ctx context.Context, enabled bool) error {
|
||||||
// do not disable lazy connection manager if it was enabled by env var
|
// a local override (NB_LAZY_CONN or local config) takes precedence over management
|
||||||
if e.enabledLocally {
|
if e.force != lazyForceNone {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -89,6 +105,7 @@ func (e *ConnMgr) UpdatedRemoteFeatureFlag(ctx context.Context, enabled bool) er
|
|||||||
|
|
||||||
if e.rosenpassEnabled {
|
if e.rosenpassEnabled {
|
||||||
log.Infof("rosenpass connection manager is enabled, lazy connection manager will not be started")
|
log.Infof("rosenpass connection manager is enabled, lazy connection manager will not be started")
|
||||||
|
e.statusRecorder.UpdateLazyConnection(false)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -98,6 +115,7 @@ func (e *ConnMgr) UpdatedRemoteFeatureFlag(ctx context.Context, enabled bool) er
|
|||||||
return e.addPeersToLazyConnManager()
|
return e.addPeersToLazyConnManager()
|
||||||
} else {
|
} else {
|
||||||
if e.lazyConnMgr == nil {
|
if e.lazyConnMgr == nil {
|
||||||
|
e.statusRecorder.UpdateLazyConnection(false)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
log.Infof("lazy connection manager is disabled by management feature flag")
|
log.Infof("lazy connection manager is disabled by management feature flag")
|
||||||
@@ -309,6 +327,25 @@ func (e *ConnMgr) isStartedWithLazyMgr() bool {
|
|||||||
return e.lazyConnMgr != nil && e.lazyCtxCancel != nil
|
return e.lazyConnMgr != nil && e.lazyCtxCancel != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// resolveLazyForce determines the local override. NB_LAZY_CONN takes precedence; when it
|
||||||
|
// is unset the MDM policy override (mdmState) applies. Either wins in both directions over
|
||||||
|
// the management feature flag; StateUnset for both defers to management.
|
||||||
|
func resolveLazyForce(mdmState lazyconn.State) lazyForce {
|
||||||
|
state := lazyconn.EnvState()
|
||||||
|
if state == lazyconn.StateUnset {
|
||||||
|
state = mdmState
|
||||||
|
}
|
||||||
|
|
||||||
|
switch state {
|
||||||
|
case lazyconn.StateOn:
|
||||||
|
return lazyForceOn
|
||||||
|
case lazyconn.StateOff:
|
||||||
|
return lazyForceOff
|
||||||
|
default:
|
||||||
|
return lazyForceNone
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func inactivityThresholdEnv() *time.Duration {
|
func inactivityThresholdEnv() *time.Duration {
|
||||||
envValue := os.Getenv(lazyconn.EnvInactivityThreshold)
|
envValue := os.Getenv(lazyconn.EnvInactivityThreshold)
|
||||||
if envValue == "" {
|
if envValue == "" {
|
||||||
|
|||||||
40
client/internal/conn_mgr_test.go
Normal file
40
client/internal/conn_mgr_test.go
Normal file
@@ -0,0 +1,40 @@
|
|||||||
|
package internal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/client/internal/lazyconn"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestResolveLazyForce(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
name string
|
||||||
|
env string
|
||||||
|
envSet bool
|
||||||
|
mdm lazyconn.State
|
||||||
|
want lazyForce
|
||||||
|
}{
|
||||||
|
{name: "env unset, mdm unset -> defer to management", mdm: lazyconn.StateUnset, want: lazyForceNone},
|
||||||
|
{name: "env on -> force on", env: "on", envSet: true, mdm: lazyconn.StateUnset, want: lazyForceOn},
|
||||||
|
{name: "env off -> force off", env: "off", envSet: true, mdm: lazyconn.StateUnset, want: lazyForceOff},
|
||||||
|
{name: "env unset, mdm on -> force on", mdm: lazyconn.StateOn, want: lazyForceOn},
|
||||||
|
{name: "env unset, mdm off -> force off", mdm: lazyconn.StateOff, want: lazyForceOff},
|
||||||
|
{name: "env on beats mdm off", env: "on", envSet: true, mdm: lazyconn.StateOff, want: lazyForceOn},
|
||||||
|
{name: "env off beats mdm on", env: "off", envSet: true, mdm: lazyconn.StateOn, want: lazyForceOff},
|
||||||
|
{name: "unrecognized env, mdm on -> mdm wins", env: "auto", envSet: true, mdm: lazyconn.StateOn, want: lazyForceOn},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
t.Setenv(lazyconn.EnvLazyConn, tt.env)
|
||||||
|
if !tt.envSet {
|
||||||
|
os.Unsetenv(lazyconn.EnvLazyConn)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got := resolveLazyForce(tt.mdm); got != tt.want {
|
||||||
|
t.Fatalf("resolveLazyForce(%v) = %v, want %v", tt.mdm, got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -27,6 +27,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/client/iface/device"
|
"github.com/netbirdio/netbird/client/iface/device"
|
||||||
"github.com/netbirdio/netbird/client/iface/netstack"
|
"github.com/netbirdio/netbird/client/iface/netstack"
|
||||||
"github.com/netbirdio/netbird/client/internal/dns"
|
"github.com/netbirdio/netbird/client/internal/dns"
|
||||||
|
"github.com/netbirdio/netbird/client/internal/lazyconn"
|
||||||
"github.com/netbirdio/netbird/client/internal/listener"
|
"github.com/netbirdio/netbird/client/internal/listener"
|
||||||
"github.com/netbirdio/netbird/client/internal/metrics"
|
"github.com/netbirdio/netbird/client/internal/metrics"
|
||||||
"github.com/netbirdio/netbird/client/internal/peer"
|
"github.com/netbirdio/netbird/client/internal/peer"
|
||||||
@@ -601,7 +602,7 @@ func createEngineConfig(key wgtypes.Key, config *profilemanager.Config, peerConf
|
|||||||
BlockInbound: config.BlockInbound,
|
BlockInbound: config.BlockInbound,
|
||||||
DisableIPv6: config.DisableIPv6,
|
DisableIPv6: config.DisableIPv6,
|
||||||
|
|
||||||
LazyConnectionEnabled: config.LazyConnectionEnabled,
|
LazyConnection: lazyconn.ParseState(config.LazyConnection),
|
||||||
|
|
||||||
MTU: selectMTU(config.MTU, peerConfig.Mtu),
|
MTU: selectMTU(config.MTU, peerConfig.Mtu),
|
||||||
LogPath: logPath,
|
LogPath: logPath,
|
||||||
@@ -675,7 +676,6 @@ func loginToManagement(ctx context.Context, client mgm.Client, pubSSHKey []byte,
|
|||||||
config.BlockLANAccess,
|
config.BlockLANAccess,
|
||||||
config.BlockInbound,
|
config.BlockInbound,
|
||||||
config.DisableIPv6,
|
config.DisableIPv6,
|
||||||
config.LazyConnectionEnabled,
|
|
||||||
config.EnableSSHRoot,
|
config.EnableSSHRoot,
|
||||||
config.EnableSSHSFTP,
|
config.EnableSSHSFTP,
|
||||||
config.EnableSSHLocalPortForwarding,
|
config.EnableSSHLocalPortForwarding,
|
||||||
|
|||||||
@@ -681,7 +681,7 @@ func (g *BundleGenerator) addCommonConfigFields(configContent *strings.Builder)
|
|||||||
configContent.WriteString(fmt.Sprintf("ClientCertKeyPath: %s\n", g.internalConfig.ClientCertKeyPath))
|
configContent.WriteString(fmt.Sprintf("ClientCertKeyPath: %s\n", g.internalConfig.ClientCertKeyPath))
|
||||||
}
|
}
|
||||||
|
|
||||||
configContent.WriteString(fmt.Sprintf("LazyConnectionEnabled: %v\n", g.internalConfig.LazyConnectionEnabled))
|
configContent.WriteString(fmt.Sprintf("LazyConnection: %q\n", g.internalConfig.LazyConnection))
|
||||||
configContent.WriteString(fmt.Sprintf("MTU: %d\n", g.internalConfig.MTU))
|
configContent.WriteString(fmt.Sprintf("MTU: %d\n", g.internalConfig.MTU))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -885,7 +885,7 @@ func TestAddConfig_AllFieldsCovered(t *testing.T) {
|
|||||||
DNSRouteInterval: 5 * time.Second,
|
DNSRouteInterval: 5 * time.Second,
|
||||||
ClientCertPath: "/tmp/cert",
|
ClientCertPath: "/tmp/cert",
|
||||||
ClientCertKeyPath: "/tmp/key",
|
ClientCertKeyPath: "/tmp/key",
|
||||||
LazyConnectionEnabled: true,
|
LazyConnection: "on",
|
||||||
MTU: 1280,
|
MTU: 1280,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -40,6 +40,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/client/internal/dnsfwd"
|
"github.com/netbirdio/netbird/client/internal/dnsfwd"
|
||||||
"github.com/netbirdio/netbird/client/internal/expose"
|
"github.com/netbirdio/netbird/client/internal/expose"
|
||||||
"github.com/netbirdio/netbird/client/internal/ingressgw"
|
"github.com/netbirdio/netbird/client/internal/ingressgw"
|
||||||
|
"github.com/netbirdio/netbird/client/internal/lazyconn"
|
||||||
"github.com/netbirdio/netbird/client/internal/metrics"
|
"github.com/netbirdio/netbird/client/internal/metrics"
|
||||||
"github.com/netbirdio/netbird/client/internal/netflow"
|
"github.com/netbirdio/netbird/client/internal/netflow"
|
||||||
nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
|
nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
|
||||||
@@ -147,7 +148,9 @@ type EngineConfig struct {
|
|||||||
BlockInbound bool
|
BlockInbound bool
|
||||||
DisableIPv6 bool
|
DisableIPv6 bool
|
||||||
|
|
||||||
LazyConnectionEnabled bool
|
// LazyConnection is the MDM-sourced lazy-connection override; StateUnset defers to
|
||||||
|
// the env var and management feature flag.
|
||||||
|
LazyConnection lazyconn.State
|
||||||
|
|
||||||
MTU uint16
|
MTU uint16
|
||||||
|
|
||||||
@@ -217,6 +220,12 @@ type Engine struct {
|
|||||||
// networkSerial is the latest CurrentSerial (state ID) of the network sent by the Management service
|
// networkSerial is the latest CurrentSerial (state ID) of the network sent by the Management service
|
||||||
networkSerial uint64
|
networkSerial uint64
|
||||||
|
|
||||||
|
// forwardingRules holds the ingress forward rules applied for the current target.
|
||||||
|
// Wholesale sections (incl. forward rules) run only on the first pass of a target;
|
||||||
|
// it is stashed here so the final, peer-converged pass can build the lazy-connection
|
||||||
|
// exclude list without recomputing them on every bounded peer pass.
|
||||||
|
forwardingRules []firewallManager.ForwardRule
|
||||||
|
|
||||||
networkMonitor *networkmonitor.NetworkMonitor
|
networkMonitor *networkmonitor.NetworkMonitor
|
||||||
|
|
||||||
sshServer sshServer
|
sshServer sshServer
|
||||||
@@ -771,7 +780,15 @@ func (e *Engine) blockLanAccess() {
|
|||||||
|
|
||||||
// modifyPeers updates peers that have been modified (e.g. IP address has been changed).
|
// modifyPeers updates peers that have been modified (e.g. IP address has been changed).
|
||||||
// It closes the existing connection, removes it from the peerConns map, and creates a new one.
|
// It closes the existing connection, removes it from the peerConns map, and creates a new one.
|
||||||
func (e *Engine) modifyPeers(peersUpdate []*mgmProto.RemotePeerConfig) error {
|
// maxPeersPerSyncPass is the default per-pass cap on how many peers each of
|
||||||
|
// removePeers/modifyPeers/addNewPeers applies, so syncMsgMux is held only for a
|
||||||
|
// batch at a time and other subsystems can interleave between passes. It is
|
||||||
|
// passed in (not read globally) so tests can exercise the multi-pass path.
|
||||||
|
const maxPeersPerSyncPass = 300
|
||||||
|
|
||||||
|
// modifyPeers re-applies up to maxBatch changed peers per call. It returns true
|
||||||
|
// when more changed peers remained than the cap, so the caller re-runs.
|
||||||
|
func (e *Engine) modifyPeers(peersUpdate []*mgmProto.RemotePeerConfig, maxBatch int) (bool, error) {
|
||||||
|
|
||||||
// first, check if peers have been modified
|
// first, check if peers have been modified
|
||||||
var modified []*mgmProto.RemotePeerConfig
|
var modified []*mgmProto.RemotePeerConfig
|
||||||
@@ -801,26 +818,32 @@ func (e *Engine) modifyPeers(peersUpdate []*mgmProto.RemotePeerConfig) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
more := false
|
||||||
|
if len(modified) > maxBatch {
|
||||||
|
modified = modified[:maxBatch]
|
||||||
|
more = true
|
||||||
|
}
|
||||||
|
|
||||||
// second, close all modified connections and remove them from the state map
|
// second, close all modified connections and remove them from the state map
|
||||||
for _, p := range modified {
|
for _, p := range modified {
|
||||||
err := e.removePeer(p.GetWgPubKey())
|
if err := e.removePeer(p.GetWgPubKey()); err != nil {
|
||||||
if err != nil {
|
return false, err
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// third, add the peer connections again
|
// third, add the peer connections again
|
||||||
for _, p := range modified {
|
for _, p := range modified {
|
||||||
err := e.addNewPeer(p)
|
if err := e.addNewPeer(p); err != nil {
|
||||||
if err != nil {
|
return false, err
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return more, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// removePeers finds and removes peers that do not exist anymore in the network map received from the Management Service.
|
// removePeers finds and removes peers that do not exist anymore in the network map received from the Management Service.
|
||||||
// It also removes peers that have been modified (e.g. change of IP address). They will be added again in addPeers method.
|
// It also removes peers that have been modified (e.g. change of IP address). They will be added again in addPeers method.
|
||||||
func (e *Engine) removePeers(peersUpdate []*mgmProto.RemotePeerConfig) error {
|
// removePeers removes up to maxBatch peers per call. It returns true when more
|
||||||
|
// peers remained to remove than the cap, so the caller re-runs.
|
||||||
|
func (e *Engine) removePeers(peersUpdate []*mgmProto.RemotePeerConfig, maxBatch int) (bool, error) {
|
||||||
newPeers := make([]string, 0, len(peersUpdate))
|
newPeers := make([]string, 0, len(peersUpdate))
|
||||||
for _, p := range peersUpdate {
|
for _, p := range peersUpdate {
|
||||||
newPeers = append(newPeers, p.GetWgPubKey())
|
newPeers = append(newPeers, p.GetWgPubKey())
|
||||||
@@ -828,14 +851,19 @@ func (e *Engine) removePeers(peersUpdate []*mgmProto.RemotePeerConfig) error {
|
|||||||
|
|
||||||
toRemove := util.SliceDiff(e.peerStore.PeersPubKey(), newPeers)
|
toRemove := util.SliceDiff(e.peerStore.PeersPubKey(), newPeers)
|
||||||
|
|
||||||
|
more := false
|
||||||
|
if len(toRemove) > maxBatch {
|
||||||
|
toRemove = toRemove[:maxBatch]
|
||||||
|
more = true
|
||||||
|
}
|
||||||
|
|
||||||
for _, p := range toRemove {
|
for _, p := range toRemove {
|
||||||
err := e.removePeer(p)
|
if err := e.removePeer(p); err != nil {
|
||||||
if err != nil {
|
return false, err
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
log.Infof("removed peer %s", p)
|
log.Infof("removed peer %s", p)
|
||||||
}
|
}
|
||||||
return nil
|
return more, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) removeAllPeers() error {
|
func (e *Engine) removeAllPeers() error {
|
||||||
@@ -914,19 +942,17 @@ func (e *Engine) phase(name string) func() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
// applySyncPass applies one bounded pass of the sync update under syncMsgMux and
|
||||||
started := time.Now()
|
// returns true if more peers remained than the per-pass cap. It is driven by the
|
||||||
defer func() {
|
// mapStateManager, which re-invokes it (releasing the lock between passes) until
|
||||||
duration := time.Since(started)
|
// the update is fully applied.
|
||||||
log.Infof("sync finished in %s", duration)
|
func (e *Engine) applySyncPass(update *mgmProto.SyncResponse, firstPass bool) (bool, error) {
|
||||||
e.clientMetrics.RecordSyncDuration(e.ctx, duration)
|
|
||||||
}()
|
|
||||||
e.syncMsgMux.Lock()
|
e.syncMsgMux.Lock()
|
||||||
defer e.syncMsgMux.Unlock()
|
defer e.syncMsgMux.Unlock()
|
||||||
|
|
||||||
// Check context INSIDE lock to ensure atomicity with shutdown
|
// Check context INSIDE lock to ensure atomicity with shutdown
|
||||||
if e.ctx.Err() != nil {
|
if e.ctx.Err() != nil {
|
||||||
return e.ctx.Err()
|
return false, e.ctx.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
if update.NetworkMap != nil && update.NetworkMap.PeerConfig != nil {
|
if update.NetworkMap != nil && update.NetworkMap.PeerConfig != nil {
|
||||||
@@ -937,7 +963,7 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
|||||||
err := e.updateNetbirdConfig(update.GetNetbirdConfig())
|
err := e.updateNetbirdConfig(update.GetNetbirdConfig())
|
||||||
done()
|
done()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Posture checks are bound to the network map presence:
|
// Posture checks are bound to the network map presence:
|
||||||
@@ -947,28 +973,25 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
|||||||
// leave the previously applied checks untouched
|
// leave the previously applied checks untouched
|
||||||
nm := update.GetNetworkMap()
|
nm := update.GetNetworkMap()
|
||||||
if nm == nil {
|
if nm == nil {
|
||||||
return nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
done = e.phase("checks")
|
done = e.phase("checks")
|
||||||
err = e.updateChecksIfNew(update.Checks)
|
err = e.updateChecksIfNew(update.Checks)
|
||||||
done()
|
done()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
done = e.phase("persist")
|
|
||||||
e.persistSyncResponse(update)
|
|
||||||
done()
|
|
||||||
|
|
||||||
// only apply new changes and ignore old ones
|
// only apply new changes and ignore old ones
|
||||||
if err := e.updateNetworkMap(nm); err != nil {
|
more, err := e.updateNetworkMap(nm, maxPeersPerSyncPass, firstPass)
|
||||||
return err
|
if err != nil {
|
||||||
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
e.statusRecorder.PublishEvent(cProto.SystemEvent_INFO, cProto.SystemEvent_SYSTEM, "Network map updated", "", nil)
|
e.statusRecorder.PublishEvent(cProto.SystemEvent_INFO, cProto.SystemEvent_SYSTEM, "Network map updated", "", nil)
|
||||||
|
|
||||||
return nil
|
return more, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateNetbirdConfig applies the management-provided NetBird configuration:
|
// updateNetbirdConfig applies the management-provided NetBird configuration:
|
||||||
@@ -1016,6 +1039,13 @@ func (e *Engine) updateNetbirdConfig(wCfg *mgmProto.NetbirdConfig) error {
|
|||||||
// (not syncMsgMux) is held for the whole Set so the store cannot be cleared (disabled /
|
// (not syncMsgMux) is held for the whole Set so the store cannot be cleared (disabled /
|
||||||
// engine close) mid-call and have this write resurrect a file that was just removed.
|
// engine close) mid-call and have this write resurrect a file that was just removed.
|
||||||
func (e *Engine) persistSyncResponse(update *mgmProto.SyncResponse) {
|
func (e *Engine) persistSyncResponse(update *mgmProto.SyncResponse) {
|
||||||
|
// Only persist updates that carry a network map. Config-only updates (e.g. relay
|
||||||
|
// token rotation, STUN/TURN) have a nil NetworkMap; persisting them would overwrite
|
||||||
|
// the last full map on disk and break restore-on-restart.
|
||||||
|
if update.GetNetworkMap() == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
e.syncRespMux.RLock()
|
e.syncRespMux.RLock()
|
||||||
defer e.syncRespMux.RUnlock()
|
defer e.syncRespMux.RUnlock()
|
||||||
|
|
||||||
@@ -1130,7 +1160,6 @@ func (e *Engine) applyInfoFlags(info *system.Info) {
|
|||||||
e.config.BlockLANAccess,
|
e.config.BlockLANAccess,
|
||||||
e.config.BlockInbound,
|
e.config.BlockInbound,
|
||||||
e.config.DisableIPv6,
|
e.config.DisableIPv6,
|
||||||
e.config.LazyConnectionEnabled,
|
|
||||||
e.config.EnableSSHRoot,
|
e.config.EnableSSHRoot,
|
||||||
e.config.EnableSSHSFTP,
|
e.config.EnableSSHSFTP,
|
||||||
e.config.EnableSSHLocalPortForwarding,
|
e.config.EnableSSHLocalPortForwarding,
|
||||||
@@ -1304,7 +1333,24 @@ func (e *Engine) receiveManagementEvents() {
|
|||||||
}
|
}
|
||||||
e.applyInfoFlags(info)
|
e.applyInfoFlags(info)
|
||||||
|
|
||||||
err := e.mgmClient.Sync(e.ctx, info, e.handleSync)
|
// The map-state manager converges the latest update in the background in
|
||||||
|
// bounded passes; the stream callback only hands it the newest target.
|
||||||
|
persist := func(u *mgmProto.SyncResponse) {
|
||||||
|
done := e.phase("persist")
|
||||||
|
e.persistSyncResponse(u)
|
||||||
|
done()
|
||||||
|
}
|
||||||
|
manager := newMapStateManager(e.applySyncPass, persist, func(d time.Duration) {
|
||||||
|
log.Infof("sync finished in %s", d)
|
||||||
|
e.clientMetrics.RecordSyncDuration(e.ctx, d)
|
||||||
|
})
|
||||||
|
e.shutdownWg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer e.shutdownWg.Done()
|
||||||
|
manager.run(e.ctx)
|
||||||
|
}()
|
||||||
|
|
||||||
|
err := e.mgmClient.Sync(e.ctx, info, manager.SetTarget)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// happens if management is unavailable for a long time.
|
// happens if management is unavailable for a long time.
|
||||||
// We want to cancel the operation of the whole client
|
// We want to cancel the operation of the whole client
|
||||||
@@ -1355,21 +1401,107 @@ func (e *Engine) updateTURNs(turns []*mgmProto.ProtectedHostConfig) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
// updateNetworkMap applies the wholesale parts (config, routes, ACL, DNS) in full
|
||||||
|
// and up to maxBatch peers per phase. It returns true when more peers remained
|
||||||
|
// than the cap, so the caller re-runs until convergence.
|
||||||
|
func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap, maxBatch int, firstPass bool) (bool, error) {
|
||||||
// intentionally leave it before checking serial because for now it can happen that peer IP changed but serial didn't
|
// intentionally leave it before checking serial because for now it can happen that peer IP changed but serial didn't
|
||||||
if networkMap.GetPeerConfig() != nil {
|
if networkMap.GetPeerConfig() != nil {
|
||||||
err := e.updateConfig(networkMap.GetPeerConfig())
|
err := e.updateConfig(networkMap.GetPeerConfig())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return false, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
serial := networkMap.GetSerial()
|
serial := networkMap.GetSerial()
|
||||||
if e.networkSerial > serial {
|
if e.networkSerial > serial {
|
||||||
log.Debugf("received outdated NetworkMap with serial %d, ignoring", serial)
|
log.Debugf("received outdated NetworkMap with serial %d, ignoring", serial)
|
||||||
return nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Wholesale sections (firewall/ACL, DNS, routes, forward rules) are applied
|
||||||
|
// up-front and only once per target: they are cheap, local, idempotent and must
|
||||||
|
// be in place before peers come up (fail-closed). On the bounded re-runs that only
|
||||||
|
// drain the remaining peer batches they are skipped — the applied forward rules are
|
||||||
|
// reused from e.forwardingRules for the lazy-exclude finalize.
|
||||||
|
if firstPass {
|
||||||
|
e.applyWholesale(networkMap, serial)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(networkMap.GetRemotePeers()))
|
||||||
|
|
||||||
|
doneOffline := e.phase("offline_peers")
|
||||||
|
e.updateOfflinePeers(networkMap.GetOfflinePeers())
|
||||||
|
doneOffline()
|
||||||
|
|
||||||
|
// Filter out own peer from the remote peers list
|
||||||
|
localPubKey := e.config.WgPrivateKey.PublicKey().String()
|
||||||
|
remotePeers := make([]*mgmProto.RemotePeerConfig, 0, len(networkMap.GetRemotePeers()))
|
||||||
|
for _, p := range networkMap.GetRemotePeers() {
|
||||||
|
if p.GetWgPubKey() != localPubKey {
|
||||||
|
remotePeers = append(remotePeers, p)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// No special case for cleanup: when management signals RemotePeersIsEmpty (e.g. our
|
||||||
|
// peer was deleted), remotePeers is already empty, so the bounded diff below removes
|
||||||
|
// every peer in batches — same path as a normal update, no unbounded removeAllPeers
|
||||||
|
// held under syncMsgMux in one shot.
|
||||||
|
doneRemoved := e.phase("removed_peers")
|
||||||
|
removeMore, err := e.removePeers(remotePeers, maxBatch)
|
||||||
|
doneRemoved()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
doneModified := e.phase("modified_peers")
|
||||||
|
modifyMore, err := e.modifyPeers(remotePeers, maxBatch)
|
||||||
|
doneModified()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
doneAdded := e.phase("added_peers")
|
||||||
|
addMore, err := e.addNewPeers(remotePeers, maxBatch)
|
||||||
|
doneAdded()
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// needMore signals the caller to re-run when a peer phase hit its per-pass cap.
|
||||||
|
needMore := removeMore || modifyMore || addMore
|
||||||
|
|
||||||
|
e.statusRecorder.FinishPeerListModifications()
|
||||||
|
|
||||||
|
e.updatePeerSSHHostKeys(remotePeers)
|
||||||
|
|
||||||
|
if err := e.updateSSHClientConfig(remotePeers); err != nil {
|
||||||
|
log.Warnf("failed to update SSH client config: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
e.updateSSHServerAuth(networkMap.GetSshAuth())
|
||||||
|
|
||||||
|
// Set the exclude list only once peers have fully converged (this pass added
|
||||||
|
// the last batch). It needs all target peers present in the store, and
|
||||||
|
// ExcludePeer has replace-semantics — a partial set mid-convergence would be wrong.
|
||||||
|
if !needMore {
|
||||||
|
doneLazy := e.phase("lazy_exclude")
|
||||||
|
excludedLazyPeers := e.toExcludedLazyPeers(e.forwardingRules, remotePeers)
|
||||||
|
e.connMgr.SetExcludeList(e.ctx, excludedLazyPeers)
|
||||||
|
doneLazy()
|
||||||
|
}
|
||||||
|
|
||||||
|
e.networkSerial = serial
|
||||||
|
|
||||||
|
return needMore, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// applyWholesale applies the cheap, local, idempotent map sections — lazy feature
|
||||||
|
// flag, firewall/legacy management, DNS, routes, ACL filtering, DNS forwarder and
|
||||||
|
// ingress forward rules — that must be in place before peers come up. It runs once
|
||||||
|
// per target (first pass only); the resulting forward rules are stashed in
|
||||||
|
// e.forwardingRules for the lazy-exclude finalize on the peer-converged pass.
|
||||||
|
func (e *Engine) applyWholesale(networkMap *mgmProto.NetworkMap, serial uint64) {
|
||||||
if err := e.connMgr.UpdatedRemoteFeatureFlag(e.ctx, networkMap.GetPeerConfig().GetLazyConnectionEnabled()); err != nil {
|
if err := e.connMgr.UpdatedRemoteFeatureFlag(e.ctx, networkMap.GetPeerConfig().GetLazyConnectionEnabled()); err != nil {
|
||||||
log.Errorf("failed to update lazy connection feature flag: %v", err)
|
log.Errorf("failed to update lazy connection feature flag: %v", err)
|
||||||
}
|
}
|
||||||
@@ -1442,84 +1574,7 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
|||||||
log.Errorf("failed to update forward rules, err: %v", err)
|
log.Errorf("failed to update forward rules, err: %v", err)
|
||||||
}
|
}
|
||||||
done()
|
done()
|
||||||
|
e.forwardingRules = forwardingRules
|
||||||
log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(networkMap.GetRemotePeers()))
|
|
||||||
|
|
||||||
done = e.phase("offline_peers")
|
|
||||||
e.updateOfflinePeers(networkMap.GetOfflinePeers())
|
|
||||||
done()
|
|
||||||
|
|
||||||
remotePeers, err := e.reconcilePeers(networkMap)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// must set the exclude list after the peers are added. Without it the manager can not figure out the peers parameters from the store
|
|
||||||
done = e.phase("lazy_exclude")
|
|
||||||
excludedLazyPeers := e.toExcludedLazyPeers(forwardingRules, remotePeers)
|
|
||||||
e.connMgr.SetExcludeList(e.ctx, excludedLazyPeers)
|
|
||||||
done()
|
|
||||||
|
|
||||||
e.networkSerial = serial
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// reconcilePeers applies the remote peer list from the network map (removing,
|
|
||||||
// modifying and adding peers, then updating SSH config) and returns the remote
|
|
||||||
// peers with our own peer filtered out, for use by later sync steps.
|
|
||||||
func (e *Engine) reconcilePeers(networkMap *mgmProto.NetworkMap) ([]*mgmProto.RemotePeerConfig, error) {
|
|
||||||
// Filter out own peer from the remote peers list
|
|
||||||
localPubKey := e.config.WgPrivateKey.PublicKey().String()
|
|
||||||
remotePeers := make([]*mgmProto.RemotePeerConfig, 0, len(networkMap.GetRemotePeers()))
|
|
||||||
for _, p := range networkMap.GetRemotePeers() {
|
|
||||||
if p.GetWgPubKey() != localPubKey {
|
|
||||||
remotePeers = append(remotePeers, p)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// cleanup request, most likely our peer has been deleted
|
|
||||||
if networkMap.GetRemotePeersIsEmpty() {
|
|
||||||
err := e.removeAllPeers()
|
|
||||||
e.statusRecorder.FinishPeerListModifications()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
return remotePeers, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
done := e.phase("removed_peers")
|
|
||||||
err := e.removePeers(remotePeers)
|
|
||||||
done()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
done = e.phase("modified_peers")
|
|
||||||
err = e.modifyPeers(remotePeers)
|
|
||||||
done()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
done = e.phase("added_peers")
|
|
||||||
err = e.addNewPeers(remotePeers)
|
|
||||||
done()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
e.statusRecorder.FinishPeerListModifications()
|
|
||||||
|
|
||||||
e.updatePeerSSHHostKeys(remotePeers)
|
|
||||||
|
|
||||||
if err := e.updateSSHClientConfig(remotePeers); err != nil {
|
|
||||||
log.Warnf("failed to update SSH client config: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
e.updateSSHServerAuth(networkMap.GetSshAuth())
|
|
||||||
|
|
||||||
return remotePeers, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func toDNSFeatureFlag(networkMap *mgmProto.NetworkMap) bool {
|
func toDNSFeatureFlag(networkMap *mgmProto.NetworkMap) bool {
|
||||||
@@ -1699,14 +1754,23 @@ func addrToString(addr netip.Addr) string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// addNewPeers adds peers that were not know before but arrived from the Management service with the update
|
// addNewPeers adds peers that were not know before but arrived from the Management service with the update
|
||||||
func (e *Engine) addNewPeers(peersUpdate []*mgmProto.RemotePeerConfig) error {
|
// addNewPeers adds up to maxBatch not-yet-present peers per call. It returns true
|
||||||
|
// when more new peers remained than the cap, so the caller re-runs.
|
||||||
|
func (e *Engine) addNewPeers(peersUpdate []*mgmProto.RemotePeerConfig, maxBatch int) (bool, error) {
|
||||||
|
added := 0
|
||||||
for _, p := range peersUpdate {
|
for _, p := range peersUpdate {
|
||||||
err := e.addNewPeer(p)
|
if _, ok := e.peerStore.PeerConn(p.GetWgPubKey()); ok {
|
||||||
if err != nil {
|
continue // already present (cheap skip), does not count toward the cap
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
if added >= maxBatch {
|
||||||
|
return true, nil // at least one more new peer remains
|
||||||
|
}
|
||||||
|
if err := e.addNewPeer(p); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
added++
|
||||||
}
|
}
|
||||||
return nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// addNewPeer add peer if connection doesn't exist
|
// addNewPeer add peer if connection doesn't exist
|
||||||
@@ -1999,7 +2063,6 @@ func (e *Engine) readInitialSettings() ([]*route.Route, *nbdns.Config, bool, err
|
|||||||
e.config.BlockLANAccess,
|
e.config.BlockLANAccess,
|
||||||
e.config.BlockInbound,
|
e.config.BlockInbound,
|
||||||
e.config.DisableIPv6,
|
e.config.DisableIPv6,
|
||||||
e.config.LazyConnectionEnabled,
|
|
||||||
e.config.EnableSSHRoot,
|
e.config.EnableSSHRoot,
|
||||||
e.config.EnableSSHSFTP,
|
e.config.EnableSSHSFTP,
|
||||||
e.config.EnableSSHLocalPortForwarding,
|
e.config.EnableSSHLocalPortForwarding,
|
||||||
|
|||||||
@@ -124,7 +124,7 @@ func TestEngine_SSH(t *testing.T) {
|
|||||||
RemotePeersIsEmpty: false,
|
RemotePeersIsEmpty: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = engine.updateNetworkMap(networkMap)
|
_, err = engine.updateNetworkMap(networkMap, maxPeersPerSyncPass, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
assert.Nil(t, engine.sshServer)
|
assert.Nil(t, engine.sshServer)
|
||||||
@@ -146,7 +146,7 @@ func TestEngine_SSH(t *testing.T) {
|
|||||||
RemotePeersIsEmpty: false,
|
RemotePeersIsEmpty: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = engine.updateNetworkMap(networkMap)
|
_, err = engine.updateNetworkMap(networkMap, maxPeersPerSyncPass, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
time.Sleep(250 * time.Millisecond)
|
time.Sleep(250 * time.Millisecond)
|
||||||
@@ -159,7 +159,7 @@ func TestEngine_SSH(t *testing.T) {
|
|||||||
RemotePeersIsEmpty: false,
|
RemotePeersIsEmpty: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = engine.updateNetworkMap(networkMap)
|
_, err = engine.updateNetworkMap(networkMap, maxPeersPerSyncPass, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// time.Sleep(250 * time.Millisecond)
|
// time.Sleep(250 * time.Millisecond)
|
||||||
@@ -174,7 +174,7 @@ func TestEngine_SSH(t *testing.T) {
|
|||||||
RemotePeersIsEmpty: false,
|
RemotePeersIsEmpty: false,
|
||||||
}
|
}
|
||||||
|
|
||||||
err = engine.updateNetworkMap(networkMap)
|
_, err = engine.updateNetworkMap(networkMap, maxPeersPerSyncPass, true)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
assert.Nil(t, engine.sshServer)
|
assert.Nil(t, engine.sshServer)
|
||||||
|
|||||||
@@ -437,7 +437,7 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
|
|||||||
|
|
||||||
for _, c := range []testCase{case1, case2, case3, case4, case5, case6} {
|
for _, c := range []testCase{case1, case2, case3, case4, case5, case6} {
|
||||||
t.Run(c.name, func(t *testing.T) {
|
t.Run(c.name, func(t *testing.T) {
|
||||||
err = engine.updateNetworkMap(c.networkMap)
|
_, err = engine.updateNetworkMap(c.networkMap, maxPeersPerSyncPass, true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
return
|
return
|
||||||
@@ -464,6 +464,47 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// chunked apply: with a per-pass cap smaller than the number of peers, a
|
||||||
|
// single updateNetworkMap applies one batch and reports more==true; the
|
||||||
|
// caller re-runs until convergence. (engine currently holds 0 peers.)
|
||||||
|
t.Run("chunked add converges over multiple passes", func(t *testing.T) {
|
||||||
|
nm := &mgmtProto.NetworkMap{
|
||||||
|
Serial: 6,
|
||||||
|
RemotePeers: []*mgmtProto.RemotePeerConfig{peer1, peer2, peer3},
|
||||||
|
}
|
||||||
|
|
||||||
|
more, err := engine.updateNetworkMap(nm, 1, true)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, more, "pass 1 should signal more")
|
||||||
|
require.Len(t, engine.peerStore.PeersPubKey(), 1)
|
||||||
|
|
||||||
|
more, err = engine.updateNetworkMap(nm, 1, false)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, more, "pass 2 should signal more")
|
||||||
|
require.Len(t, engine.peerStore.PeersPubKey(), 2)
|
||||||
|
|
||||||
|
more, err = engine.updateNetworkMap(nm, 1, false)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, more, "pass 3 should converge")
|
||||||
|
require.Len(t, engine.peerStore.PeersPubKey(), 3)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("chunked remove converges over multiple passes", func(t *testing.T) {
|
||||||
|
nm := &mgmtProto.NetworkMap{
|
||||||
|
Serial: 7,
|
||||||
|
RemotePeers: []*mgmtProto.RemotePeerConfig{peer1}, // remove peer2, peer3
|
||||||
|
}
|
||||||
|
|
||||||
|
more, err := engine.updateNetworkMap(nm, 1, true)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.True(t, more, "pass 1 should signal more (2 to remove, cap 1)")
|
||||||
|
|
||||||
|
more, err = engine.updateNetworkMap(nm, 1, false)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.False(t, more, "pass 2 should converge")
|
||||||
|
require.Len(t, engine.peerStore.PeersPubKey(), 1)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) {
|
func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) {
|
||||||
@@ -634,7 +675,7 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
err = engine.updateNetworkMap(testCase.networkMap)
|
_, err = engine.updateNetworkMap(testCase.networkMap, maxPeersPerSyncPass, true)
|
||||||
assert.NoError(t, err, "shouldn't return error")
|
assert.NoError(t, err, "shouldn't return error")
|
||||||
assert.Equal(t, testCase.expectedSerial, input.inputSerial, "serial should match")
|
assert.Equal(t, testCase.expectedSerial, input.inputSerial, "serial should match")
|
||||||
assert.Len(t, input.clientRoutes, testCase.expectedLen, "clientRoutes len should match")
|
assert.Len(t, input.clientRoutes, testCase.expectedLen, "clientRoutes len should match")
|
||||||
@@ -838,7 +879,7 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
err = engine.updateNetworkMap(testCase.networkMap)
|
_, err = engine.updateNetworkMap(testCase.networkMap, maxPeersPerSyncPass, true)
|
||||||
assert.NoError(t, err, "shouldn't return error")
|
assert.NoError(t, err, "shouldn't return error")
|
||||||
assert.Equal(t, testCase.expectedSerial, input.inputSerial, "serial should match")
|
assert.Equal(t, testCase.expectedSerial, input.inputSerial, "serial should match")
|
||||||
assert.Len(t, input.inputNSGroups, testCase.expectedZonesLen, "zones len should match")
|
assert.Len(t, input.inputNSGroups, testCase.expectedZonesLen, "zones len should match")
|
||||||
|
|||||||
@@ -3,24 +3,57 @@ package lazyconn
|
|||||||
import (
|
import (
|
||||||
"os"
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
EnvEnableLazyConn = "NB_ENABLE_EXPERIMENTAL_LAZY_CONN"
|
EnvLazyConn = "NB_LAZY_CONN"
|
||||||
EnvInactivityThreshold = "NB_LAZY_CONN_INACTIVITY_THRESHOLD"
|
EnvInactivityThreshold = "NB_LAZY_CONN_INACTIVITY_THRESHOLD"
|
||||||
)
|
)
|
||||||
|
|
||||||
func IsLazyConnEnabledByEnv() bool {
|
// State is the tri-state local override for lazy connections read from the environment.
|
||||||
val := os.Getenv(EnvEnableLazyConn)
|
type State int
|
||||||
if val == "" {
|
|
||||||
return false
|
const (
|
||||||
}
|
// StateUnset means no local override; defer to the management feature flag.
|
||||||
enabled, err := strconv.ParseBool(val)
|
StateUnset State = iota
|
||||||
if err != nil {
|
// StateOn forces lazy connections on, overriding management.
|
||||||
log.Warnf("failed to parse %s: %v", EnvEnableLazyConn, err)
|
StateOn
|
||||||
return false
|
// StateOff forces lazy connections off, overriding management.
|
||||||
}
|
StateOff
|
||||||
return enabled
|
)
|
||||||
|
|
||||||
|
// EnvState reads NB_LAZY_CONN and returns the local override state.
|
||||||
|
func EnvState() State {
|
||||||
|
return ParseState(os.Getenv(EnvLazyConn))
|
||||||
|
}
|
||||||
|
|
||||||
|
// ParseState interprets a lazy-connection override value (from the environment or an MDM
|
||||||
|
// policy). It accepts the on/off aliases plus any value strconv.ParseBool understands
|
||||||
|
// (true/false/1/0). An empty or unrecognized value returns StateUnset so that the
|
||||||
|
// management feature flag remains in control.
|
||||||
|
func ParseState(raw string) State {
|
||||||
|
if raw == "" {
|
||||||
|
return StateUnset
|
||||||
|
}
|
||||||
|
|
||||||
|
normalized := strings.ToLower(strings.TrimSpace(raw))
|
||||||
|
switch normalized {
|
||||||
|
case "on":
|
||||||
|
return StateOn
|
||||||
|
case "off":
|
||||||
|
return StateOff
|
||||||
|
}
|
||||||
|
|
||||||
|
enabled, err := strconv.ParseBool(normalized)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("failed to parse lazy connection value %q (from %s env or MDM policy): %v", raw, EnvLazyConn, err)
|
||||||
|
return StateUnset
|
||||||
|
}
|
||||||
|
if enabled {
|
||||||
|
return StateOn
|
||||||
|
}
|
||||||
|
return StateOff
|
||||||
}
|
}
|
||||||
|
|||||||
45
client/internal/lazyconn/env_test.go
Normal file
45
client/internal/lazyconn/env_test.go
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
package lazyconn
|
||||||
|
|
||||||
|
import (
|
||||||
|
"os"
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestEnvState(t *testing.T) {
|
||||||
|
tests := []struct {
|
||||||
|
value string
|
||||||
|
set bool
|
||||||
|
want State
|
||||||
|
}{
|
||||||
|
{set: false, want: StateUnset},
|
||||||
|
{value: "", set: true, want: StateUnset},
|
||||||
|
{value: "on", set: true, want: StateOn},
|
||||||
|
{value: "ON", set: true, want: StateOn},
|
||||||
|
{value: "true", set: true, want: StateOn},
|
||||||
|
{value: "1", set: true, want: StateOn},
|
||||||
|
{value: " on ", set: true, want: StateOn},
|
||||||
|
{value: "off", set: true, want: StateOff},
|
||||||
|
{value: "OFF", set: true, want: StateOff},
|
||||||
|
{value: "false", set: true, want: StateOff},
|
||||||
|
{value: "0", set: true, want: StateOff},
|
||||||
|
{value: "auto", set: true, want: StateUnset},
|
||||||
|
{value: "garbage", set: true, want: StateUnset},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tt := range tests {
|
||||||
|
name := tt.value
|
||||||
|
if !tt.set {
|
||||||
|
name = "unset"
|
||||||
|
}
|
||||||
|
t.Run(name, func(t *testing.T) {
|
||||||
|
t.Setenv(EnvLazyConn, tt.value)
|
||||||
|
if !tt.set {
|
||||||
|
os.Unsetenv(EnvLazyConn)
|
||||||
|
}
|
||||||
|
|
||||||
|
if got := EnvState(); got != tt.want {
|
||||||
|
t.Fatalf("EnvState() = %v, want %v", got, tt.want)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
214
client/internal/mapsync.go
Normal file
214
client/internal/mapsync.go
Normal file
@@ -0,0 +1,214 @@
|
|||||||
|
package internal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
|
||||||
|
)
|
||||||
|
|
||||||
|
// mapStateManager is the single read/write point between the management stream
|
||||||
|
// (writes) and the convergence loop (reads/applies).
|
||||||
|
//
|
||||||
|
// The stream calls SetTarget with the latest full SyncResponse — the complete
|
||||||
|
// desired state. A single background goroutine (run) applies it to the engine in
|
||||||
|
// bounded passes via apply() until converged, releasing syncMsgMux between passes
|
||||||
|
// so other subsystems interleave. If a newer update arrives mid-flight, the loop
|
||||||
|
// coalesces: it keeps converging toward the latest target and the intermediate one
|
||||||
|
// is SKIPPED — never applied on its own (logged, no onConverged).
|
||||||
|
//
|
||||||
|
// Convergence is a single comparison: appliedGen == targetGen. targetGen
|
||||||
|
// increments on every SetTarget (an internal generation counter, so it also covers
|
||||||
|
// config-only updates that carry no network-map serial).
|
||||||
|
//
|
||||||
|
// onConverged fires once for each — and only each — map that is actually processed
|
||||||
|
// (i.e. converged as the target). Skipped/superseded maps and dropped-on-error maps
|
||||||
|
// do NOT fire it. So "sync finished in X" / RecordSyncDuration always corresponds
|
||||||
|
// to a real, completed alignment.
|
||||||
|
type mapStateManager struct {
|
||||||
|
// apply performs one bounded apply pass and reports whether more passes are needed.
|
||||||
|
// firstPass is true on the first pass of a given target, so the caller can run
|
||||||
|
// wholesale (firewall/routes/DNS/forward-rules) once per target and skip it on the
|
||||||
|
// re-runs that only drain the bounded peer batches. The manager owns this signal
|
||||||
|
// because it owns the convergence boundary; the engine need not track serials for it.
|
||||||
|
apply func(update *mgmProto.SyncResponse, firstPass bool) (bool, error)
|
||||||
|
// onConverged is called once per processed map, with the elapsed time since that
|
||||||
|
// map was received (for the sync-duration metric / "sync finished" log).
|
||||||
|
onConverged func(time.Duration)
|
||||||
|
// persist snapshots an update to disk for restore-on-restart. Called once per
|
||||||
|
// update received from management (in SetTarget), including ones later coalesced
|
||||||
|
// or skipped from apply, so the on-disk state mirrors what management last sent.
|
||||||
|
// The impl skips config-only updates (nil NetworkMap). May be nil.
|
||||||
|
persist func(*mgmProto.SyncResponse)
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
target *mgmProto.SyncResponse
|
||||||
|
targetGen uint64
|
||||||
|
appliedGen uint64
|
||||||
|
targetSetAt time.Time
|
||||||
|
|
||||||
|
wake chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func newMapStateManager(apply func(update *mgmProto.SyncResponse, firstPass bool) (bool, error), persist func(*mgmProto.SyncResponse), onConverged func(time.Duration)) *mapStateManager {
|
||||||
|
return &mapStateManager{
|
||||||
|
apply: apply,
|
||||||
|
persist: persist,
|
||||||
|
onConverged: onConverged,
|
||||||
|
wake: make(chan struct{}, 1),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetTarget records the latest update as the desired state and wakes the loop.
|
||||||
|
// It returns immediately; convergence happens in the background. Serial-based
|
||||||
|
// staleness of the network map is still enforced inside apply (updateNetworkMap).
|
||||||
|
func (m *mapStateManager) SetTarget(update *mgmProto.SyncResponse) error {
|
||||||
|
m.mu.Lock()
|
||||||
|
// A target that has not settled yet (targetGen > appliedGen) is being superseded
|
||||||
|
// before it converged: we coalesce to the latest map and never apply this one on
|
||||||
|
// its own. It is SKIPPED — logged here, and it will not fire onConverged.
|
||||||
|
if m.target != nil && m.targetGen > m.appliedGen {
|
||||||
|
log.Debugf("sync map (gen %d) superseded before convergence, skipping", m.targetGen)
|
||||||
|
}
|
||||||
|
m.target = m.mergeTarget(m.target, update)
|
||||||
|
// Bump an internal generation counter, NOT the map serial: config-only updates
|
||||||
|
// (relay token rotation, STUN/TURN) arrive with NetworkMap == nil and carry no
|
||||||
|
// serial, yet must still be applied. Every SetTarget is therefore a distinct
|
||||||
|
// target regardless of payload. Map-serial staleness is enforced separately
|
||||||
|
// inside apply (updateNetworkMap).
|
||||||
|
m.targetGen++
|
||||||
|
m.targetSetAt = time.Now()
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case m.wake <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
|
||||||
|
// Persist every update received from management — once per update (not per apply
|
||||||
|
// pass), and including ones that get coalesced/skipped from apply, so the on-disk
|
||||||
|
// state always reflects the latest map management sent. Done after waking the loop
|
||||||
|
// so convergence can start in parallel with the disk write. The persist impl skips
|
||||||
|
// config-only updates (nil NetworkMap).
|
||||||
|
if m.persist != nil {
|
||||||
|
m.persist(update)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// mergeTarget combines the currently pending target with a freshly received update
|
||||||
|
// and returns the new desired state. It is called under m.mu from SetTarget and is
|
||||||
|
// the single seam where the replace-vs-squash decision lives.
|
||||||
|
//
|
||||||
|
// Today management always sends a FULL map (the complete desired state), so the
|
||||||
|
// update simply replaces whatever was pending — prev is ignored. When management
|
||||||
|
// starts sending incremental/delta updates, squash `update` onto `prev` here; the
|
||||||
|
// rest of the manager (generation tracking, convergence, signaling) is unaffected
|
||||||
|
// because it already treats target as "the complete desired state, whatever it is".
|
||||||
|
func (m *mapStateManager) mergeTarget(prev, update *mgmProto.SyncResponse) *mgmProto.SyncResponse {
|
||||||
|
// Nothing pending to preserve (no prev, or prev already fully applied): plain replace.
|
||||||
|
if prev == nil || update == nil || m.targetGen == m.appliedGen {
|
||||||
|
return update
|
||||||
|
}
|
||||||
|
|
||||||
|
// prev still has unapplied state (targetGen > appliedGen). In the sync protocol a
|
||||||
|
// nil component means "no change", so if `update` omits a component that prev
|
||||||
|
// carried, carry prev's forward — otherwise coalescing an update that superseded a
|
||||||
|
// not-yet-applied one would silently drop the map or config it uniquely brought.
|
||||||
|
// A present component in `update` is newer and wins. Management may send map-only
|
||||||
|
// updates (nil config) and config-only updates (nil map); both are handled here.
|
||||||
|
// A nil component in `update` means "no change", so fill it in from prev — otherwise
|
||||||
|
// coalescing an update that superseded a not-yet-applied one would drop the map or
|
||||||
|
// config it uniquely carried. A present component in `update` is newer and wins.
|
||||||
|
// We mutate `update` in place: it is a fresh per-message allocation from the sync
|
||||||
|
// stream (see receiveUpdatesEvents — not reused), and persisting this squashed target
|
||||||
|
// is correct, since it is the current full (superset) desired state.
|
||||||
|
if update.GetNetworkMap() == nil && prev.GetNetworkMap() != nil {
|
||||||
|
update.NetworkMap = prev.GetNetworkMap()
|
||||||
|
update.Checks = prev.Checks // checks travel with the map
|
||||||
|
}
|
||||||
|
if update.GetNetbirdConfig() == nil && prev.GetNetbirdConfig() != nil {
|
||||||
|
update.NetbirdConfig = prev.GetNetbirdConfig()
|
||||||
|
}
|
||||||
|
return update
|
||||||
|
}
|
||||||
|
|
||||||
|
// run drives convergence until ctx is done. It is meant to run in its own goroutine.
|
||||||
|
func (m *mapStateManager) run(ctx context.Context) {
|
||||||
|
// passGen is the generation of the most recent apply() call (0 = none). A pass is
|
||||||
|
// the first for its target when its generation differs from the previous one —
|
||||||
|
// true on a fresh target and on a coalesced switch to a newer target mid-flight.
|
||||||
|
var passGen uint64
|
||||||
|
for {
|
||||||
|
m.mu.Lock()
|
||||||
|
target, tg, ag := m.target, m.targetGen, m.appliedGen
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
// Fully converged (or nothing yet): block until a new target arrives.
|
||||||
|
if target == nil || ag == tg {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-m.wake:
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
firstPass := tg != passGen
|
||||||
|
passGen = tg
|
||||||
|
more, err := m.apply(target, firstPass)
|
||||||
|
if err != nil {
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// Log and DROP this target — do not retry it. A deterministic failure
|
||||||
|
// (e.g. a malformed peer in the map) would otherwise spin every pass
|
||||||
|
// making no progress. Management is the source of truth and re-delivers
|
||||||
|
// the full map on the next sync, so dropping is safe; peers already
|
||||||
|
// applied this convergence stay (idempotent diffs) and the remainder is
|
||||||
|
// reconciled by the next target. Mirrors the legacy handleSync path,
|
||||||
|
// where the apply error was logged by the gRPC client and the update
|
||||||
|
// dropped. No onConverged: this target did not converge.
|
||||||
|
log.Errorf("apply sync pass, dropping update: %v", err)
|
||||||
|
m.settle(tg, false)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if more {
|
||||||
|
// keep converging the current target; syncMsgMux was released by apply
|
||||||
|
// between passes so other subsystems interleave.
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// This pass converged. Mark applied and signal this one map.
|
||||||
|
m.settle(tg, true)
|
||||||
|
// if a newer target arrived mid-pass, settle is a no-op (targetGen != tg) and
|
||||||
|
// ag<tg next iteration -> apply it; this generation was skipped (logged in
|
||||||
|
// SetTarget) and is not signaled.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// settle marks generation tg as processed so the loop goes idle instead of
|
||||||
|
// re-applying the same target. It is a no-op when a newer target arrived during the
|
||||||
|
// pass (targetGen != tg), leaving appliedGen behind so that target re-applies — the
|
||||||
|
// just-finished generation was already counted as skipped.
|
||||||
|
//
|
||||||
|
// When signal is true (the pass converged) it fires onConverged once for this map;
|
||||||
|
// when false (the target was dropped on error) it does not — the map did not converge.
|
||||||
|
func (m *mapStateManager) settle(tg uint64, signal bool) {
|
||||||
|
m.mu.Lock()
|
||||||
|
if m.targetGen != tg {
|
||||||
|
m.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.appliedGen = tg
|
||||||
|
setAt := m.targetSetAt
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
if signal && m.onConverged != nil {
|
||||||
|
m.onConverged(time.Since(setAt))
|
||||||
|
}
|
||||||
|
}
|
||||||
281
client/internal/mapsync_test.go
Normal file
281
client/internal/mapsync_test.go
Normal file
@@ -0,0 +1,281 @@
|
|||||||
|
package internal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"sync/atomic"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
|
||||||
|
)
|
||||||
|
|
||||||
|
// mergeTarget fills components missing from the incoming update with the pending
|
||||||
|
// (not-yet-applied) prev's, in place, so a coalesced/superseded update does not drop
|
||||||
|
// the map or config it uniquely carried.
|
||||||
|
func TestMapStateManager_MergeTargetPreservesPendingState(t *testing.T) {
|
||||||
|
m := newMapStateManager(nil, nil, nil)
|
||||||
|
|
||||||
|
// config-only update while a full map is still converging (targetGen > appliedGen):
|
||||||
|
// the pending map (+ checks) is filled into the update in place
|
||||||
|
m.targetGen, m.appliedGen = 5, 4
|
||||||
|
prev := &mgmProto.SyncResponse{NetworkMap: &mgmProto.NetworkMap{Serial: 5}}
|
||||||
|
update := &mgmProto.SyncResponse{NetbirdConfig: &mgmProto.NetbirdConfig{}}
|
||||||
|
merged := m.mergeTarget(prev, update)
|
||||||
|
require.Same(t, update, merged, "merges in place, returns the update")
|
||||||
|
require.EqualValues(t, 5, merged.GetNetworkMap().GetSerial(), "pending map preserved")
|
||||||
|
require.NotNil(t, merged.GetNetbirdConfig(), "new config kept")
|
||||||
|
|
||||||
|
// symmetric: map-only update while a config-only update is pending -> keep the config
|
||||||
|
m.targetGen, m.appliedGen = 5, 4
|
||||||
|
prev = &mgmProto.SyncResponse{NetbirdConfig: &mgmProto.NetbirdConfig{}}
|
||||||
|
update = &mgmProto.SyncResponse{NetworkMap: &mgmProto.NetworkMap{Serial: 7}}
|
||||||
|
merged = m.mergeTarget(prev, update)
|
||||||
|
require.EqualValues(t, 7, merged.GetNetworkMap().GetSerial(), "new map kept")
|
||||||
|
require.NotNil(t, merged.GetNetbirdConfig(), "pending config preserved")
|
||||||
|
|
||||||
|
// prev already applied (targetGen == appliedGen): plain replace, no fill-in
|
||||||
|
m.targetGen, m.appliedGen = 5, 5
|
||||||
|
prev = &mgmProto.SyncResponse{NetworkMap: &mgmProto.NetworkMap{Serial: 5}}
|
||||||
|
update = &mgmProto.SyncResponse{NetbirdConfig: &mgmProto.NetbirdConfig{}}
|
||||||
|
merged = m.mergeTarget(prev, update)
|
||||||
|
require.Same(t, update, merged)
|
||||||
|
require.Nil(t, merged.GetNetworkMap(), "no map grafted when prev already applied")
|
||||||
|
|
||||||
|
// nothing to carry (update has a map, prev has no config): plain replace
|
||||||
|
m.targetGen, m.appliedGen = 5, 4
|
||||||
|
prev = &mgmProto.SyncResponse{NetworkMap: &mgmProto.NetworkMap{Serial: 5}}
|
||||||
|
update = &mgmProto.SyncResponse{NetworkMap: &mgmProto.NetworkMap{Serial: 6}}
|
||||||
|
require.Same(t, update, m.mergeTarget(prev, update))
|
||||||
|
}
|
||||||
|
|
||||||
|
// converges over the bounded passes (apply returns more until the 3rd pass),
|
||||||
|
// fires onConverged exactly once, then blocks (no further apply) until a new target.
|
||||||
|
func TestMapStateManager_ConvergesThenStops(t *testing.T) {
|
||||||
|
var passes int32
|
||||||
|
var firstPasses int32
|
||||||
|
converged := make(chan struct{}, 1)
|
||||||
|
|
||||||
|
apply := func(_ *mgmProto.SyncResponse, firstPass bool) (bool, error) {
|
||||||
|
n := atomic.AddInt32(&passes, 1)
|
||||||
|
if firstPass {
|
||||||
|
atomic.AddInt32(&firstPasses, 1)
|
||||||
|
}
|
||||||
|
return n < 3, nil // more on pass 1 and 2, converge on pass 3
|
||||||
|
}
|
||||||
|
m := newMapStateManager(apply, nil, func(time.Duration) { converged <- struct{}{} })
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
go m.run(ctx)
|
||||||
|
|
||||||
|
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-converged:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("manager did not converge")
|
||||||
|
}
|
||||||
|
require.EqualValues(t, 3, atomic.LoadInt32(&passes))
|
||||||
|
require.EqualValues(t, 1, atomic.LoadInt32(&firstPasses), "firstPass true only on pass 1, false on re-runs of the same target")
|
||||||
|
|
||||||
|
// once converged the loop blocks: no further apply calls
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
require.EqualValues(t, 3, atomic.LoadInt32(&passes), "apply must not run after convergence")
|
||||||
|
}
|
||||||
|
|
||||||
|
// persist runs once per received update (not per apply pass), regardless of how many
|
||||||
|
// bounded passes that target takes to converge.
|
||||||
|
func TestMapStateManager_PersistsOncePerUpdate(t *testing.T) {
|
||||||
|
var passes, persists int32
|
||||||
|
converged := make(chan struct{}, 1)
|
||||||
|
apply := func(_ *mgmProto.SyncResponse, _ bool) (bool, error) {
|
||||||
|
n := atomic.AddInt32(&passes, 1)
|
||||||
|
return n < 3, nil // 3 passes for one target
|
||||||
|
}
|
||||||
|
persist := func(*mgmProto.SyncResponse) { atomic.AddInt32(&persists, 1) }
|
||||||
|
m := newMapStateManager(apply, persist, func(time.Duration) { converged <- struct{}{} })
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
go m.run(ctx)
|
||||||
|
|
||||||
|
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||||
|
select {
|
||||||
|
case <-converged:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("did not converge")
|
||||||
|
}
|
||||||
|
require.EqualValues(t, 3, atomic.LoadInt32(&passes))
|
||||||
|
require.EqualValues(t, 1, atomic.LoadInt32(&persists), "persist once per update, not per pass")
|
||||||
|
}
|
||||||
|
|
||||||
|
// every update received from management is persisted — even one that is coalesced /
|
||||||
|
// skipped from apply before it ever converges.
|
||||||
|
func TestMapStateManager_PersistsEveryUpdateIncludingSkipped(t *testing.T) {
|
||||||
|
release := make(chan struct{})
|
||||||
|
var persists int32
|
||||||
|
apply := func(_ *mgmProto.SyncResponse, _ bool) (bool, error) {
|
||||||
|
<-release // hold the first apply so the second update coalesces/skips
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
persist := func(*mgmProto.SyncResponse) { atomic.AddInt32(&persists, 1) }
|
||||||
|
m := newMapStateManager(apply, persist, nil)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
go m.run(ctx)
|
||||||
|
|
||||||
|
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{})) // map1 -> apply blocks
|
||||||
|
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{})) // map2 supersedes map1 (skipped from apply)
|
||||||
|
close(release)
|
||||||
|
|
||||||
|
// both updates persisted even though map1 is skipped from apply
|
||||||
|
require.Eventually(t, func() bool { return atomic.LoadInt32(&persists) == 2 }, 2*time.Second, 10*time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
// each map that is actually processed (converged before the next arrives) fires
|
||||||
|
// onConverged exactly once — mirroring the legacy per-message handleSync timing.
|
||||||
|
func TestMapStateManager_SignalsEachProcessedMap(t *testing.T) {
|
||||||
|
converged := make(chan struct{}, 8)
|
||||||
|
apply := func(_ *mgmProto.SyncResponse, _ bool) (bool, error) {
|
||||||
|
return false, nil // converge in one pass
|
||||||
|
}
|
||||||
|
m := newMapStateManager(apply, nil, func(time.Duration) { converged <- struct{}{} })
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
go m.run(ctx)
|
||||||
|
|
||||||
|
const maps = 3
|
||||||
|
for i := 0; i < maps; i++ {
|
||||||
|
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||||
|
select { // wait for this map to converge before sending the next (no coalescing)
|
||||||
|
case <-converged:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatalf("map %d not signaled", i)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// no extra signals once the stream goes quiet
|
||||||
|
select {
|
||||||
|
case <-converged:
|
||||||
|
t.Fatal("unexpected extra onConverged")
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// a map superseded before it converges is skipped: only the latest (processed) map
|
||||||
|
// fires onConverged, not the skipped one.
|
||||||
|
func TestMapStateManager_SkippedMapNotSignaled(t *testing.T) {
|
||||||
|
release := make(chan struct{})
|
||||||
|
var applies, converged atomic.Int32
|
||||||
|
apply := func(_ *mgmProto.SyncResponse, _ bool) (bool, error) {
|
||||||
|
applies.Add(1)
|
||||||
|
<-release // hold the first apply in-flight so we can queue a newer target
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
m := newMapStateManager(apply, nil, func(time.Duration) { converged.Add(1) })
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
go m.run(ctx)
|
||||||
|
|
||||||
|
// map1 is picked up; its apply blocks on release
|
||||||
|
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||||
|
require.Eventually(t, func() bool { return applies.Load() >= 1 }, 2*time.Second, 5*time.Millisecond)
|
||||||
|
|
||||||
|
// map2 supersedes map1 before it settled -> map1 is skipped
|
||||||
|
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||||
|
close(release) // let both applies proceed
|
||||||
|
|
||||||
|
// only the processed (latest) map signals; the skipped one does not
|
||||||
|
require.Eventually(t, func() bool { return converged.Load() == 1 }, 2*time.Second, 10*time.Millisecond)
|
||||||
|
time.Sleep(150 * time.Millisecond)
|
||||||
|
require.EqualValues(t, 1, converged.Load(), "skipped map must not fire onConverged")
|
||||||
|
require.EqualValues(t, 2, applies.Load(), "both targets entered apply (map1 once, map2 once)")
|
||||||
|
}
|
||||||
|
|
||||||
|
// an apply error drops the target: no retry of the same target, no onConverged,
|
||||||
|
// the loop goes idle — and a fresh target is still applied afterwards.
|
||||||
|
func TestMapStateManager_DropsTargetOnError(t *testing.T) {
|
||||||
|
applied := make(chan struct{}, 8)
|
||||||
|
var failNext atomic.Bool
|
||||||
|
failNext.Store(true)
|
||||||
|
|
||||||
|
apply := func(_ *mgmProto.SyncResponse, _ bool) (bool, error) {
|
||||||
|
applied <- struct{}{}
|
||||||
|
if failNext.Load() {
|
||||||
|
return false, errors.New("boom")
|
||||||
|
}
|
||||||
|
return false, nil // converge in one pass
|
||||||
|
}
|
||||||
|
var converged atomic.Int32
|
||||||
|
m := newMapStateManager(apply, nil, func(time.Duration) { converged.Add(1) })
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
go m.run(ctx)
|
||||||
|
|
||||||
|
// first target errors -> applied once, then dropped (no retry, no onConverged)
|
||||||
|
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||||
|
select {
|
||||||
|
case <-applied:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("errored target not applied")
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-applied:
|
||||||
|
t.Fatal("errored target must not be retried")
|
||||||
|
case <-time.After(150 * time.Millisecond):
|
||||||
|
}
|
||||||
|
require.EqualValues(t, 0, converged.Load(), "onConverged must not fire on error")
|
||||||
|
|
||||||
|
// a new target is still processed normally and converges
|
||||||
|
failNext.Store(false)
|
||||||
|
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||||
|
select {
|
||||||
|
case <-applied:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("new target after error not applied")
|
||||||
|
}
|
||||||
|
require.Eventually(t, func() bool { return converged.Load() == 1 }, 2*time.Second, 10*time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
// a new target after convergence triggers a fresh apply; an idle (converged)
|
||||||
|
// manager does not apply on its own.
|
||||||
|
func TestMapStateManager_ReappliesOnNewTarget(t *testing.T) {
|
||||||
|
applied := make(chan struct{}, 8)
|
||||||
|
apply := func(_ *mgmProto.SyncResponse, _ bool) (bool, error) {
|
||||||
|
applied <- struct{}{}
|
||||||
|
return false, nil // converge in one pass
|
||||||
|
}
|
||||||
|
m := newMapStateManager(apply, nil, nil)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
go m.run(ctx)
|
||||||
|
|
||||||
|
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||||
|
select {
|
||||||
|
case <-applied:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("first target not applied")
|
||||||
|
}
|
||||||
|
|
||||||
|
// converged → must stay idle (no spurious apply)
|
||||||
|
select {
|
||||||
|
case <-applied:
|
||||||
|
t.Fatal("unexpected apply while idle/converged")
|
||||||
|
case <-time.After(150 * time.Millisecond):
|
||||||
|
}
|
||||||
|
|
||||||
|
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||||
|
select {
|
||||||
|
case <-applied:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatal("new target not applied")
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -101,8 +101,6 @@ type ConfigInput struct {
|
|||||||
|
|
||||||
DNSLabels domain.List
|
DNSLabels domain.List
|
||||||
|
|
||||||
LazyConnectionEnabled *bool
|
|
||||||
|
|
||||||
MTU *uint16
|
MTU *uint16
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -180,7 +178,9 @@ type Config struct {
|
|||||||
|
|
||||||
ClientCertKeyPair *tls.Certificate `json:"-"`
|
ClientCertKeyPair *tls.Certificate `json:"-"`
|
||||||
|
|
||||||
LazyConnectionEnabled bool
|
// LazyConnection is the MDM-managed lazy-connection override ("on"/"off"/"").
|
||||||
|
// Runtime-only: re-derived from MDM policy on each load, never persisted.
|
||||||
|
LazyConnection string `json:"-"`
|
||||||
|
|
||||||
MTU uint16
|
MTU uint16
|
||||||
|
|
||||||
@@ -632,12 +632,6 @@ func (config *Config) apply(input ConfigInput) (updated bool, err error) {
|
|||||||
updated = true
|
updated = true
|
||||||
}
|
}
|
||||||
|
|
||||||
if input.LazyConnectionEnabled != nil && *input.LazyConnectionEnabled != config.LazyConnectionEnabled {
|
|
||||||
log.Infof("switching lazy connection to %t", *input.LazyConnectionEnabled)
|
|
||||||
config.LazyConnectionEnabled = *input.LazyConnectionEnabled
|
|
||||||
updated = true
|
|
||||||
}
|
|
||||||
|
|
||||||
if input.MTU != nil && *input.MTU != config.MTU {
|
if input.MTU != nil && *input.MTU != config.MTU {
|
||||||
log.Infof("updating MTU to %d (old value %d)", *input.MTU, config.MTU)
|
log.Infof("updating MTU to %d (old value %d)", *input.MTU, config.MTU)
|
||||||
config.MTU = *input.MTU
|
config.MTU = *input.MTU
|
||||||
@@ -728,6 +722,15 @@ func (config *Config) applyMDMPolicy(policy *mdm.Policy) {
|
|||||||
log.Warnf("MDM wireguard port %d out of range [1,65535]; keeping previous value", v)
|
log.Warnf("MDM wireguard port %d out of range [1,65535]; keeping previous value", v)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if v, ok := policy.GetBool(mdm.KeyLazyConnection); ok {
|
||||||
|
state := "off"
|
||||||
|
if v {
|
||||||
|
state = "on"
|
||||||
|
}
|
||||||
|
config.LazyConnection = state
|
||||||
|
logApplied(mdm.KeyLazyConnection, state)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// parseURL parses and validates the URL for the named service. The URL
|
// parseURL parses and validates the URL for the named service. The URL
|
||||||
|
|||||||
@@ -130,6 +130,37 @@ func TestApply_MDMBoolKeysOverrideOnDiskValue(t *testing.T) {
|
|||||||
assert.True(t, cfg.Policy().HasKey(mdm.KeyRosenpassEnabled))
|
assert.True(t, cfg.Policy().HasKey(mdm.KeyRosenpassEnabled))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestApply_MDMLazyConnection(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
name string
|
||||||
|
raw any
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{"native true", true, "on"},
|
||||||
|
{"native false", false, "off"},
|
||||||
|
{"string on", "on", "on"},
|
||||||
|
{"string off", "off", "off"},
|
||||||
|
{"string yes", "yes", "on"},
|
||||||
|
{"string no", "no", "off"},
|
||||||
|
}
|
||||||
|
for _, c := range cases {
|
||||||
|
t.Run(c.name, func(t *testing.T) {
|
||||||
|
withMDMPolicy(t, mdm.NewPolicy(map[string]any{
|
||||||
|
mdm.KeyLazyConnection: c.raw,
|
||||||
|
}))
|
||||||
|
|
||||||
|
cfg, err := UpdateOrCreateConfig(ConfigInput{
|
||||||
|
ConfigPath: filepath.Join(t.TempDir(), "config.json"),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, cfg)
|
||||||
|
|
||||||
|
assert.Equal(t, c.want, cfg.LazyConnection)
|
||||||
|
assert.True(t, cfg.Policy().HasKey(mdm.KeyLazyConnection))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestApply_MDMPreSharedKeyRedactionSentinelRejected(t *testing.T) {
|
func TestApply_MDMPreSharedKeyRedactionSentinelRejected(t *testing.T) {
|
||||||
const maskSentinel = "**********"
|
const maskSentinel = "**********"
|
||||||
|
|
||||||
|
|||||||
@@ -38,7 +38,7 @@ func GetEnvKeyNBForceRelay() string {
|
|||||||
|
|
||||||
// GetEnvKeyNBLazyConn Exports the environment variable for the iOS client
|
// GetEnvKeyNBLazyConn Exports the environment variable for the iOS client
|
||||||
func GetEnvKeyNBLazyConn() string {
|
func GetEnvKeyNBLazyConn() string {
|
||||||
return lazyconn.EnvEnableLazyConn
|
return lazyconn.EnvLazyConn
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetEnvKeyNBInactivityThreshold Exports the environment variable for the iOS client
|
// GetEnvKeyNBInactivityThreshold Exports the environment variable for the iOS client
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ var allKeys = []string{
|
|||||||
KeyWireguardPort,
|
KeyWireguardPort,
|
||||||
KeySplitTunnelMode,
|
KeySplitTunnelMode,
|
||||||
KeySplitTunnelApps,
|
KeySplitTunnelApps,
|
||||||
|
KeyLazyConnection,
|
||||||
}
|
}
|
||||||
|
|
||||||
// canonicalKey maps the lowercase form of a managed-config value name to
|
// canonicalKey maps the lowercase form of a managed-config value name to
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ package mdm
|
|||||||
import (
|
import (
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
@@ -41,6 +42,11 @@ const (
|
|||||||
// construction — only one mode can be set at a time.
|
// construction — only one mode can be set at a time.
|
||||||
KeySplitTunnelMode = "splitTunnelMode"
|
KeySplitTunnelMode = "splitTunnelMode"
|
||||||
KeySplitTunnelApps = "splitTunnelApps"
|
KeySplitTunnelApps = "splitTunnelApps"
|
||||||
|
|
||||||
|
// KeyLazyConnection forces the lazy-connection feature on or off, overriding
|
||||||
|
// the management feature flag. Read as a bool (native bool, or on/off,
|
||||||
|
// true/false, 1/0, yes/no); absent = defer to management.
|
||||||
|
KeyLazyConnection = "lazyConnection"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Split-tunnel mode literals (KeySplitTunnelMode values).
|
// Split-tunnel mode literals (KeySplitTunnelMode values).
|
||||||
@@ -62,12 +68,13 @@ var boolStringLiterals = map[string]bool{
|
|||||||
"true": true,
|
"true": true,
|
||||||
"1": true,
|
"1": true,
|
||||||
"yes": true,
|
"yes": true,
|
||||||
|
"on": true,
|
||||||
"false": false,
|
"false": false,
|
||||||
"0": false,
|
"0": false,
|
||||||
"no": false,
|
"no": false,
|
||||||
|
"off": false,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
// Policy holds MDM-managed settings read from the platform source. A nil or
|
// Policy holds MDM-managed settings read from the platform source. A nil or
|
||||||
// empty Policy means no enforcement is active.
|
// empty Policy means no enforcement is active.
|
||||||
type Policy struct {
|
type Policy struct {
|
||||||
@@ -150,7 +157,8 @@ func (p *Policy) GetString(key string) (string, bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetBool returns the managed value for key coerced to bool, and whether the
|
// GetBool returns the managed value for key coerced to bool, and whether the
|
||||||
// key was set. Accepts native bool and string literals "true"/"false"/"1"/"0".
|
// key was set. Accepts native bool and string literals (true/false, 1/0,
|
||||||
|
// yes/no, on/off), case-insensitively and trimmed of surrounding whitespace.
|
||||||
func (p *Policy) GetBool(key string) (bool, bool) {
|
func (p *Policy) GetBool(key string) (bool, bool) {
|
||||||
if p == nil {
|
if p == nil {
|
||||||
return false, false
|
return false, false
|
||||||
@@ -163,7 +171,7 @@ func (p *Policy) GetBool(key string) (bool, bool) {
|
|||||||
case bool:
|
case bool:
|
||||||
return t, true
|
return t, true
|
||||||
case string:
|
case string:
|
||||||
b, known := boolStringLiterals[t]
|
b, known := boolStringLiterals[strings.ToLower(strings.TrimSpace(t))]
|
||||||
return b, known
|
return b, known
|
||||||
case int:
|
case int:
|
||||||
return t != 0, true
|
return t != 0, true
|
||||||
|
|||||||
@@ -31,8 +31,8 @@ func TestPolicy_Empty(t *testing.T) {
|
|||||||
|
|
||||||
func TestPolicy_HasKey(t *testing.T) {
|
func TestPolicy_HasKey(t *testing.T) {
|
||||||
p := NewPolicy(map[string]any{
|
p := NewPolicy(map[string]any{
|
||||||
KeyManagementURL: "https://corp.example.com",
|
KeyManagementURL: "https://corp.example.com",
|
||||||
KeyDisableProfiles: true,
|
KeyDisableProfiles: true,
|
||||||
})
|
})
|
||||||
assert.False(t, p.IsEmpty())
|
assert.False(t, p.IsEmpty())
|
||||||
assert.True(t, p.HasKey(KeyManagementURL))
|
assert.True(t, p.HasKey(KeyManagementURL))
|
||||||
@@ -53,8 +53,8 @@ func TestPolicy_ManagedKeysSorted(t *testing.T) {
|
|||||||
func TestPolicy_GetString(t *testing.T) {
|
func TestPolicy_GetString(t *testing.T) {
|
||||||
p := NewPolicy(map[string]any{
|
p := NewPolicy(map[string]any{
|
||||||
KeyManagementURL: "https://corp.example.com",
|
KeyManagementURL: "https://corp.example.com",
|
||||||
KeyDisableProfiles: true, // wrong type for GetString
|
KeyDisableProfiles: true, // wrong type for GetString
|
||||||
KeyPreSharedKey: "", // empty rejected
|
KeyPreSharedKey: "", // empty rejected
|
||||||
})
|
})
|
||||||
v, ok := p.GetString(KeyManagementURL)
|
v, ok := p.GetString(KeyManagementURL)
|
||||||
assert.True(t, ok)
|
assert.True(t, ok)
|
||||||
@@ -85,6 +85,11 @@ func TestPolicy_GetBool(t *testing.T) {
|
|||||||
{"string 0", "0", false, true},
|
{"string 0", "0", false, true},
|
||||||
{"string yes", "yes", true, true},
|
{"string yes", "yes", true, true},
|
||||||
{"string no", "no", false, true},
|
{"string no", "no", false, true},
|
||||||
|
{"string on", "on", true, true},
|
||||||
|
{"string off", "off", false, true},
|
||||||
|
{"mixed case On", "On", true, true},
|
||||||
|
{"upper TRUE", "TRUE", true, true},
|
||||||
|
{"padded yes", " yes ", true, true},
|
||||||
{"int nonzero", 1, true, true},
|
{"int nonzero", 1, true, true},
|
||||||
{"int zero", 0, false, true},
|
{"int zero", 0, false, true},
|
||||||
{"int64 nonzero", int64(2), true, true},
|
{"int64 nonzero", int64(2), true, true},
|
||||||
|
|||||||
@@ -152,7 +152,6 @@ func (s *Server) restartEngineForMDMLocked() error {
|
|||||||
s.config = config
|
s.config = config
|
||||||
s.statusRecorder.UpdateManagementAddress(config.ManagementURL.String())
|
s.statusRecorder.UpdateManagementAddress(config.ManagementURL.String())
|
||||||
s.statusRecorder.UpdateRosenpass(config.RosenpassEnabled, config.RosenpassPermissive)
|
s.statusRecorder.UpdateRosenpass(config.RosenpassEnabled, config.RosenpassPermissive)
|
||||||
s.statusRecorder.UpdateLazyConnection(config.LazyConnectionEnabled)
|
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(s.rootCtx)
|
ctx, cancel := context.WithCancel(s.rootCtx)
|
||||||
s.actCancel = cancel
|
s.actCancel = cancel
|
||||||
@@ -305,7 +304,6 @@ func setConfigRequestHasConfigOverrides(msg *proto.SetConfigRequest) bool {
|
|||||||
msg.DisableFirewall != nil ||
|
msg.DisableFirewall != nil ||
|
||||||
msg.BlockLanAccess != nil ||
|
msg.BlockLanAccess != nil ||
|
||||||
msg.DisableNotifications != nil ||
|
msg.DisableNotifications != nil ||
|
||||||
msg.LazyConnectionEnabled != nil ||
|
|
||||||
msg.BlockInbound != nil ||
|
msg.BlockInbound != nil ||
|
||||||
msg.DisableIpv6 != nil ||
|
msg.DisableIpv6 != nil ||
|
||||||
msg.EnableSSHRoot != nil ||
|
msg.EnableSSHRoot != nil ||
|
||||||
@@ -348,7 +346,6 @@ func loginRequestHasConfigOverrides(msg *proto.LoginRequest) bool {
|
|||||||
msg.BlockLanAccess != nil ||
|
msg.BlockLanAccess != nil ||
|
||||||
msg.DisableNotifications != nil ||
|
msg.DisableNotifications != nil ||
|
||||||
len(msg.DnsLabels) > 0 || msg.CleanDNSLabels ||
|
len(msg.DnsLabels) > 0 || msg.CleanDNSLabels ||
|
||||||
msg.LazyConnectionEnabled != nil ||
|
|
||||||
msg.BlockInbound != nil
|
msg.BlockInbound != nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -214,7 +214,6 @@ func (s *Server) Start() error {
|
|||||||
|
|
||||||
s.statusRecorder.UpdateManagementAddress(config.ManagementURL.String())
|
s.statusRecorder.UpdateManagementAddress(config.ManagementURL.String())
|
||||||
s.statusRecorder.UpdateRosenpass(config.RosenpassEnabled, config.RosenpassPermissive)
|
s.statusRecorder.UpdateRosenpass(config.RosenpassEnabled, config.RosenpassPermissive)
|
||||||
s.statusRecorder.UpdateLazyConnection(config.LazyConnectionEnabled)
|
|
||||||
|
|
||||||
if s.sessionWatcher == nil {
|
if s.sessionWatcher == nil {
|
||||||
s.sessionWatcher = internal.NewSessionWatcher(s.rootCtx, s.statusRecorder)
|
s.sessionWatcher = internal.NewSessionWatcher(s.rootCtx, s.statusRecorder)
|
||||||
@@ -463,7 +462,6 @@ func (s *Server) setConfigInputFromRequest(msg *proto.SetConfigRequest) (profile
|
|||||||
config.DisableFirewall = msg.DisableFirewall
|
config.DisableFirewall = msg.DisableFirewall
|
||||||
config.BlockLANAccess = msg.BlockLanAccess
|
config.BlockLANAccess = msg.BlockLanAccess
|
||||||
config.DisableNotifications = msg.DisableNotifications
|
config.DisableNotifications = msg.DisableNotifications
|
||||||
config.LazyConnectionEnabled = msg.LazyConnectionEnabled
|
|
||||||
config.BlockInbound = msg.BlockInbound
|
config.BlockInbound = msg.BlockInbound
|
||||||
config.DisableIPv6 = msg.DisableIpv6
|
config.DisableIPv6 = msg.DisableIpv6
|
||||||
config.EnableSSHRoot = msg.EnableSSHRoot
|
config.EnableSSHRoot = msg.EnableSSHRoot
|
||||||
@@ -1647,7 +1645,6 @@ func (s *Server) GetConfig(ctx context.Context, req *proto.GetConfigRequest) (*p
|
|||||||
ServerSSHAllowed: *cfg.ServerSSHAllowed,
|
ServerSSHAllowed: *cfg.ServerSSHAllowed,
|
||||||
RosenpassEnabled: cfg.RosenpassEnabled,
|
RosenpassEnabled: cfg.RosenpassEnabled,
|
||||||
RosenpassPermissive: cfg.RosenpassPermissive,
|
RosenpassPermissive: cfg.RosenpassPermissive,
|
||||||
LazyConnectionEnabled: cfg.LazyConnectionEnabled,
|
|
||||||
BlockInbound: cfg.BlockInbound,
|
BlockInbound: cfg.BlockInbound,
|
||||||
DisableNotifications: disableNotifications,
|
DisableNotifications: disableNotifications,
|
||||||
NetworkMonitor: networkMonitor,
|
NetworkMonitor: networkMonitor,
|
||||||
|
|||||||
@@ -69,43 +69,41 @@ func TestSetConfig_AllFieldsSaved(t *testing.T) {
|
|||||||
disableFirewall := true
|
disableFirewall := true
|
||||||
blockLANAccess := true
|
blockLANAccess := true
|
||||||
disableNotifications := true
|
disableNotifications := true
|
||||||
lazyConnectionEnabled := true
|
|
||||||
blockInbound := true
|
blockInbound := true
|
||||||
disableIPv6 := true
|
disableIPv6 := true
|
||||||
mtu := int64(1280)
|
mtu := int64(1280)
|
||||||
sshJWTCacheTTL := int32(300)
|
sshJWTCacheTTL := int32(300)
|
||||||
|
|
||||||
req := &proto.SetConfigRequest{
|
req := &proto.SetConfigRequest{
|
||||||
ProfileName: profName,
|
ProfileName: profName,
|
||||||
Username: currUser.Username,
|
Username: currUser.Username,
|
||||||
ManagementUrl: "https://new-api.netbird.io:443",
|
ManagementUrl: "https://new-api.netbird.io:443",
|
||||||
AdminURL: "https://new-admin.netbird.io",
|
AdminURL: "https://new-admin.netbird.io",
|
||||||
RosenpassEnabled: &rosenpassEnabled,
|
RosenpassEnabled: &rosenpassEnabled,
|
||||||
RosenpassPermissive: &rosenpassPermissive,
|
RosenpassPermissive: &rosenpassPermissive,
|
||||||
ServerSSHAllowed: &serverSSHAllowed,
|
ServerSSHAllowed: &serverSSHAllowed,
|
||||||
InterfaceName: &interfaceName,
|
InterfaceName: &interfaceName,
|
||||||
WireguardPort: &wireguardPort,
|
WireguardPort: &wireguardPort,
|
||||||
OptionalPreSharedKey: &preSharedKey,
|
OptionalPreSharedKey: &preSharedKey,
|
||||||
DisableAutoConnect: &disableAutoConnect,
|
DisableAutoConnect: &disableAutoConnect,
|
||||||
NetworkMonitor: &networkMonitor,
|
NetworkMonitor: &networkMonitor,
|
||||||
DisableClientRoutes: &disableClientRoutes,
|
DisableClientRoutes: &disableClientRoutes,
|
||||||
DisableServerRoutes: &disableServerRoutes,
|
DisableServerRoutes: &disableServerRoutes,
|
||||||
DisableDns: &disableDNS,
|
DisableDns: &disableDNS,
|
||||||
DisableFirewall: &disableFirewall,
|
DisableFirewall: &disableFirewall,
|
||||||
BlockLanAccess: &blockLANAccess,
|
BlockLanAccess: &blockLANAccess,
|
||||||
DisableNotifications: &disableNotifications,
|
DisableNotifications: &disableNotifications,
|
||||||
LazyConnectionEnabled: &lazyConnectionEnabled,
|
BlockInbound: &blockInbound,
|
||||||
BlockInbound: &blockInbound,
|
DisableIpv6: &disableIPv6,
|
||||||
DisableIpv6: &disableIPv6,
|
NatExternalIPs: []string{"1.2.3.4", "5.6.7.8"},
|
||||||
NatExternalIPs: []string{"1.2.3.4", "5.6.7.8"},
|
CleanNATExternalIPs: false,
|
||||||
CleanNATExternalIPs: false,
|
CustomDNSAddress: []byte("1.1.1.1:53"),
|
||||||
CustomDNSAddress: []byte("1.1.1.1:53"),
|
ExtraIFaceBlacklist: []string{"eth1", "eth2"},
|
||||||
ExtraIFaceBlacklist: []string{"eth1", "eth2"},
|
DnsLabels: []string{"label1", "label2"},
|
||||||
DnsLabels: []string{"label1", "label2"},
|
CleanDNSLabels: false,
|
||||||
CleanDNSLabels: false,
|
DnsRouteInterval: durationpb.New(2 * time.Minute),
|
||||||
DnsRouteInterval: durationpb.New(2 * time.Minute),
|
Mtu: &mtu,
|
||||||
Mtu: &mtu,
|
SshJWTCacheTTL: &sshJWTCacheTTL,
|
||||||
SshJWTCacheTTL: &sshJWTCacheTTL,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = s.SetConfig(ctx, req)
|
_, err = s.SetConfig(ctx, req)
|
||||||
@@ -140,7 +138,6 @@ func TestSetConfig_AllFieldsSaved(t *testing.T) {
|
|||||||
require.Equal(t, blockLANAccess, cfg.BlockLANAccess)
|
require.Equal(t, blockLANAccess, cfg.BlockLANAccess)
|
||||||
require.NotNil(t, cfg.DisableNotifications)
|
require.NotNil(t, cfg.DisableNotifications)
|
||||||
require.Equal(t, disableNotifications, *cfg.DisableNotifications)
|
require.Equal(t, disableNotifications, *cfg.DisableNotifications)
|
||||||
require.Equal(t, lazyConnectionEnabled, cfg.LazyConnectionEnabled)
|
|
||||||
require.Equal(t, blockInbound, cfg.BlockInbound)
|
require.Equal(t, blockInbound, cfg.BlockInbound)
|
||||||
require.Equal(t, disableIPv6, cfg.DisableIPv6)
|
require.Equal(t, disableIPv6, cfg.DisableIPv6)
|
||||||
require.Equal(t, []string{"1.2.3.4", "5.6.7.8"}, cfg.NATExternalIPs)
|
require.Equal(t, []string{"1.2.3.4", "5.6.7.8"}, cfg.NATExternalIPs)
|
||||||
@@ -164,13 +161,14 @@ func verifyAllFieldsCovered(t *testing.T, req *proto.SetConfigRequest) {
|
|||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
metadataFields := map[string]bool{
|
metadataFields := map[string]bool{
|
||||||
"state": true, // protobuf internal
|
"state": true, // protobuf internal
|
||||||
"sizeCache": true, // protobuf internal
|
"sizeCache": true, // protobuf internal
|
||||||
"unknownFields": true, // protobuf internal
|
"unknownFields": true, // protobuf internal
|
||||||
"Username": true, // metadata
|
"Username": true, // metadata
|
||||||
"ProfileName": true, // metadata
|
"ProfileName": true, // metadata
|
||||||
"CleanNATExternalIPs": true, // control flag for clearing
|
"CleanNATExternalIPs": true, // control flag for clearing
|
||||||
"CleanDNSLabels": true, // control flag for clearing
|
"CleanDNSLabels": true, // control flag for clearing
|
||||||
|
"LazyConnectionEnabled": true, // deprecated: proto field retained for compat, no longer applied
|
||||||
}
|
}
|
||||||
|
|
||||||
expectedFields := map[string]bool{
|
expectedFields := map[string]bool{
|
||||||
@@ -190,7 +188,6 @@ func verifyAllFieldsCovered(t *testing.T, req *proto.SetConfigRequest) {
|
|||||||
"DisableFirewall": true,
|
"DisableFirewall": true,
|
||||||
"BlockLanAccess": true,
|
"BlockLanAccess": true,
|
||||||
"DisableNotifications": true,
|
"DisableNotifications": true,
|
||||||
"LazyConnectionEnabled": true,
|
|
||||||
"BlockInbound": true,
|
"BlockInbound": true,
|
||||||
"DisableIpv6": true,
|
"DisableIpv6": true,
|
||||||
"NatExternalIPs": true,
|
"NatExternalIPs": true,
|
||||||
@@ -252,7 +249,6 @@ func TestCLIFlags_MappedToSetConfig(t *testing.T) {
|
|||||||
"block-lan-access": "BlockLanAccess",
|
"block-lan-access": "BlockLanAccess",
|
||||||
"block-inbound": "BlockInbound",
|
"block-inbound": "BlockInbound",
|
||||||
"disable-ipv6": "DisableIpv6",
|
"disable-ipv6": "DisableIpv6",
|
||||||
"enable-lazy-connection": "LazyConnectionEnabled",
|
|
||||||
"external-ip-map": "NatExternalIPs",
|
"external-ip-map": "NatExternalIPs",
|
||||||
"dns-resolver-address": "CustomDNSAddress",
|
"dns-resolver-address": "CustomDNSAddress",
|
||||||
"extra-iface-blacklist": "ExtraIFaceBlacklist",
|
"extra-iface-blacklist": "ExtraIFaceBlacklist",
|
||||||
@@ -269,7 +265,8 @@ func TestCLIFlags_MappedToSetConfig(t *testing.T) {
|
|||||||
|
|
||||||
// SetConfigRequest fields that don't have CLI flags (settable only via UI or other means).
|
// SetConfigRequest fields that don't have CLI flags (settable only via UI or other means).
|
||||||
fieldsWithoutCLIFlags := map[string]bool{
|
fieldsWithoutCLIFlags := map[string]bool{
|
||||||
"DisableNotifications": true, // Only settable via UI
|
"DisableNotifications": true, // Only settable via UI
|
||||||
|
"LazyConnectionEnabled": true, // deprecated: no longer settable (managed by server + NB_LAZY_CONN)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get all SetConfigRequest fields to verify our map is complete.
|
// Get all SetConfigRequest fields to verify our map is complete.
|
||||||
|
|||||||
@@ -74,8 +74,6 @@ type Info struct {
|
|||||||
BlockInbound bool
|
BlockInbound bool
|
||||||
DisableIPv6 bool
|
DisableIPv6 bool
|
||||||
|
|
||||||
LazyConnectionEnabled bool
|
|
||||||
|
|
||||||
EnableSSHRoot bool
|
EnableSSHRoot bool
|
||||||
EnableSSHSFTP bool
|
EnableSSHSFTP bool
|
||||||
EnableSSHLocalPortForwarding bool
|
EnableSSHLocalPortForwarding bool
|
||||||
@@ -87,7 +85,7 @@ func (i *Info) SetFlags(
|
|||||||
rosenpassEnabled, rosenpassPermissive bool,
|
rosenpassEnabled, rosenpassPermissive bool,
|
||||||
serverSSHAllowed *bool,
|
serverSSHAllowed *bool,
|
||||||
disableClientRoutes, disableServerRoutes,
|
disableClientRoutes, disableServerRoutes,
|
||||||
disableDNS, disableFirewall, blockLANAccess, blockInbound, disableIPv6, lazyConnectionEnabled bool,
|
disableDNS, disableFirewall, blockLANAccess, blockInbound, disableIPv6 bool,
|
||||||
enableSSHRoot, enableSSHSFTP, enableSSHLocalPortForwarding, enableSSHRemotePortForwarding *bool,
|
enableSSHRoot, enableSSHSFTP, enableSSHLocalPortForwarding, enableSSHRemotePortForwarding *bool,
|
||||||
disableSSHAuth *bool,
|
disableSSHAuth *bool,
|
||||||
) {
|
) {
|
||||||
@@ -105,8 +103,6 @@ func (i *Info) SetFlags(
|
|||||||
i.BlockInbound = blockInbound
|
i.BlockInbound = blockInbound
|
||||||
i.DisableIPv6 = disableIPv6
|
i.DisableIPv6 = disableIPv6
|
||||||
|
|
||||||
i.LazyConnectionEnabled = lazyConnectionEnabled
|
|
||||||
|
|
||||||
if enableSSHRoot != nil {
|
if enableSSHRoot != nil {
|
||||||
i.EnableSSHRoot = *enableSSHRoot
|
i.EnableSSHRoot = *enableSSHRoot
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -266,7 +266,6 @@ type serviceClient struct {
|
|||||||
mAllowSSH *systray.MenuItem
|
mAllowSSH *systray.MenuItem
|
||||||
mAutoConnect *systray.MenuItem
|
mAutoConnect *systray.MenuItem
|
||||||
mEnableRosenpass *systray.MenuItem
|
mEnableRosenpass *systray.MenuItem
|
||||||
mLazyConnEnabled *systray.MenuItem
|
|
||||||
mBlockInbound *systray.MenuItem
|
mBlockInbound *systray.MenuItem
|
||||||
mNotifications *systray.MenuItem
|
mNotifications *systray.MenuItem
|
||||||
mAdvancedSettings *systray.MenuItem
|
mAdvancedSettings *systray.MenuItem
|
||||||
@@ -336,11 +335,11 @@ type serviceClient struct {
|
|||||||
// mNetworks + mExitNode submenu items. Combines features.DisableNetworks
|
// mNetworks + mExitNode submenu items. Combines features.DisableNetworks
|
||||||
// AND s.connected — both must be true for the menus to be active.
|
// AND s.connected — both must be true for the menus to be active.
|
||||||
// Zero value (false) matches the Disable() call at AddMenuItem time.
|
// Zero value (false) matches the Disable() call at AddMenuItem time.
|
||||||
networksMenuEnabled bool
|
networksMenuEnabled bool
|
||||||
showNetworks bool
|
showNetworks bool
|
||||||
wNetworks fyne.Window
|
wNetworks fyne.Window
|
||||||
wProfiles fyne.Window
|
wProfiles fyne.Window
|
||||||
wQuickActions fyne.Window
|
wQuickActions fyne.Window
|
||||||
|
|
||||||
eventManager *event.Manager
|
eventManager *event.Manager
|
||||||
|
|
||||||
@@ -1094,7 +1093,6 @@ func (s *serviceClient) onTrayReady() {
|
|||||||
s.mAllowSSH = s.mSettings.AddSubMenuItemCheckbox("Allow SSH", allowSSHMenuDescr, false)
|
s.mAllowSSH = s.mSettings.AddSubMenuItemCheckbox("Allow SSH", allowSSHMenuDescr, false)
|
||||||
s.mAutoConnect = s.mSettings.AddSubMenuItemCheckbox("Connect on Startup", autoConnectMenuDescr, false)
|
s.mAutoConnect = s.mSettings.AddSubMenuItemCheckbox("Connect on Startup", autoConnectMenuDescr, false)
|
||||||
s.mEnableRosenpass = s.mSettings.AddSubMenuItemCheckbox("Enable Quantum-Resistance", quantumResistanceMenuDescr, false)
|
s.mEnableRosenpass = s.mSettings.AddSubMenuItemCheckbox("Enable Quantum-Resistance", quantumResistanceMenuDescr, false)
|
||||||
s.mLazyConnEnabled = s.mSettings.AddSubMenuItemCheckbox("Enable Lazy Connections", lazyConnMenuDescr, false)
|
|
||||||
s.mBlockInbound = s.mSettings.AddSubMenuItemCheckbox("Block Inbound Connections", blockInboundMenuDescr, false)
|
s.mBlockInbound = s.mSettings.AddSubMenuItemCheckbox("Block Inbound Connections", blockInboundMenuDescr, false)
|
||||||
s.mNotifications = s.mSettings.AddSubMenuItemCheckbox("Notifications", notificationsMenuDescr, false)
|
s.mNotifications = s.mSettings.AddSubMenuItemCheckbox("Notifications", notificationsMenuDescr, false)
|
||||||
s.mSettings.AddSeparator()
|
s.mSettings.AddSeparator()
|
||||||
@@ -1578,7 +1576,6 @@ func protoConfigToConfig(cfg *proto.GetConfigResponse) *profilemanager.Config {
|
|||||||
config.RosenpassEnabled = cfg.RosenpassEnabled
|
config.RosenpassEnabled = cfg.RosenpassEnabled
|
||||||
config.RosenpassPermissive = cfg.RosenpassPermissive
|
config.RosenpassPermissive = cfg.RosenpassPermissive
|
||||||
config.DisableNotifications = &cfg.DisableNotifications
|
config.DisableNotifications = &cfg.DisableNotifications
|
||||||
config.LazyConnectionEnabled = cfg.LazyConnectionEnabled
|
|
||||||
config.BlockInbound = cfg.BlockInbound
|
config.BlockInbound = cfg.BlockInbound
|
||||||
config.NetworkMonitor = &cfg.NetworkMonitor
|
config.NetworkMonitor = &cfg.NetworkMonitor
|
||||||
config.DisableDNS = cfg.DisableDns
|
config.DisableDNS = cfg.DisableDns
|
||||||
@@ -1682,12 +1679,6 @@ func (s *serviceClient) loadSettings() {
|
|||||||
s.mEnableRosenpass.Uncheck()
|
s.mEnableRosenpass.Uncheck()
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.LazyConnectionEnabled {
|
|
||||||
s.mLazyConnEnabled.Check()
|
|
||||||
} else {
|
|
||||||
s.mLazyConnEnabled.Uncheck()
|
|
||||||
}
|
|
||||||
|
|
||||||
if cfg.BlockInbound {
|
if cfg.BlockInbound {
|
||||||
s.mBlockInbound.Check()
|
s.mBlockInbound.Check()
|
||||||
} else {
|
} else {
|
||||||
@@ -1833,7 +1824,6 @@ func (s *serviceClient) updateConfig() error {
|
|||||||
disableAutoStart := !s.mAutoConnect.Checked()
|
disableAutoStart := !s.mAutoConnect.Checked()
|
||||||
sshAllowed := s.mAllowSSH.Checked()
|
sshAllowed := s.mAllowSSH.Checked()
|
||||||
rosenpassEnabled := s.mEnableRosenpass.Checked()
|
rosenpassEnabled := s.mEnableRosenpass.Checked()
|
||||||
lazyConnectionEnabled := s.mLazyConnEnabled.Checked()
|
|
||||||
blockInbound := s.mBlockInbound.Checked()
|
blockInbound := s.mBlockInbound.Checked()
|
||||||
notificationsDisabled := !s.mNotifications.Checked()
|
notificationsDisabled := !s.mNotifications.Checked()
|
||||||
|
|
||||||
@@ -1856,14 +1846,13 @@ func (s *serviceClient) updateConfig() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
req := proto.SetConfigRequest{
|
req := proto.SetConfigRequest{
|
||||||
ProfileName: activeProf.ID.String(),
|
ProfileName: activeProf.ID.String(),
|
||||||
Username: currUser.Username,
|
Username: currUser.Username,
|
||||||
DisableAutoConnect: &disableAutoStart,
|
DisableAutoConnect: &disableAutoStart,
|
||||||
ServerSSHAllowed: &sshAllowed,
|
ServerSSHAllowed: &sshAllowed,
|
||||||
RosenpassEnabled: &rosenpassEnabled,
|
RosenpassEnabled: &rosenpassEnabled,
|
||||||
LazyConnectionEnabled: &lazyConnectionEnabled,
|
BlockInbound: &blockInbound,
|
||||||
BlockInbound: &blockInbound,
|
DisableNotifications: ¬ificationsDisabled,
|
||||||
DisableNotifications: ¬ificationsDisabled,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := conn.SetConfig(s.ctx, &req); err != nil {
|
if _, err := conn.SetConfig(s.ctx, &req); err != nil {
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ const (
|
|||||||
allowSSHMenuDescr = "Allow SSH connections"
|
allowSSHMenuDescr = "Allow SSH connections"
|
||||||
autoConnectMenuDescr = "Connect automatically when the service starts"
|
autoConnectMenuDescr = "Connect automatically when the service starts"
|
||||||
quantumResistanceMenuDescr = "Enable post-quantum security via Rosenpass"
|
quantumResistanceMenuDescr = "Enable post-quantum security via Rosenpass"
|
||||||
lazyConnMenuDescr = "[Experimental] Enable lazy connections"
|
|
||||||
blockInboundMenuDescr = "Block inbound connections to the local machine and routed networks"
|
blockInboundMenuDescr = "Block inbound connections to the local machine and routed networks"
|
||||||
notificationsMenuDescr = "Enable notifications"
|
notificationsMenuDescr = "Enable notifications"
|
||||||
advancedSettingsMenuDescr = "Advanced settings of the application"
|
advancedSettingsMenuDescr = "Advanced settings of the application"
|
||||||
|
|||||||
@@ -43,8 +43,6 @@ func (h *eventHandler) listen(ctx context.Context) {
|
|||||||
h.handleAutoConnectClick()
|
h.handleAutoConnectClick()
|
||||||
case <-h.client.mEnableRosenpass.ClickedCh:
|
case <-h.client.mEnableRosenpass.ClickedCh:
|
||||||
h.handleRosenpassClick()
|
h.handleRosenpassClick()
|
||||||
case <-h.client.mLazyConnEnabled.ClickedCh:
|
|
||||||
h.handleLazyConnectionClick()
|
|
||||||
case <-h.client.mBlockInbound.ClickedCh:
|
case <-h.client.mBlockInbound.ClickedCh:
|
||||||
h.handleBlockInboundClick()
|
h.handleBlockInboundClick()
|
||||||
case <-h.client.mAdvancedSettings.ClickedCh:
|
case <-h.client.mAdvancedSettings.ClickedCh:
|
||||||
@@ -152,15 +150,6 @@ func (h *eventHandler) handleRosenpassClick() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *eventHandler) handleLazyConnectionClick() {
|
|
||||||
h.toggleCheckbox(h.client.mLazyConnEnabled)
|
|
||||||
if err := h.updateConfigWithErr(); err != nil {
|
|
||||||
h.toggleCheckbox(h.client.mLazyConnEnabled) // revert checkbox state on error
|
|
||||||
log.Errorf("failed to update config: %v", err)
|
|
||||||
h.client.notifier.Send("Error", "Failed to update lazy connection settings")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h *eventHandler) handleBlockInboundClick() {
|
func (h *eventHandler) handleBlockInboundClick() {
|
||||||
h.toggleCheckbox(h.client.mBlockInbound)
|
h.toggleCheckbox(h.client.mBlockInbound)
|
||||||
if err := h.updateConfigWithErr(); err != nil {
|
if err := h.updateConfigWithErr(); err != nil {
|
||||||
|
|||||||
171
e2e/agentnetwork/vllm_test.go
Normal file
171
e2e/agentnetwork/vllm_test.go
Normal file
@@ -0,0 +1,171 @@
|
|||||||
|
//go:build e2e
|
||||||
|
|
||||||
|
package agentnetwork
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/e2e/harness"
|
||||||
|
"github.com/netbirdio/netbird/shared/management/http/api"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestVLLMProvider proves the proxy supports a self-hosted vLLM backend. vLLM is
|
||||||
|
// OpenAI-compatible, so it uses the "vllm" catalog entry (KindCustom) and is
|
||||||
|
// reached over plain HTTP — no TLS anywhere on the path:
|
||||||
|
//
|
||||||
|
// client --tunnel--> netbird proxy --http--> vllm (:8000, OpenAI-compatible)
|
||||||
|
//
|
||||||
|
// The mock vLLM server answers /v1/chat/completions with an OpenAI-shaped
|
||||||
|
// completion carrying a non-zero usage block. The test asserts the chat returns
|
||||||
|
// 200 with the completion, that the request is recorded in the access log by its
|
||||||
|
// session id, and that vLLM's usage block is metered into a consumption row —
|
||||||
|
// which together prove request routing, response parsing, and token accounting
|
||||||
|
// all work for a self-hosted OpenAI-compatible provider.
|
||||||
|
//
|
||||||
|
// It needs no external credentials (the mock ignores auth), so it always runs.
|
||||||
|
func TestVLLMProvider(t *testing.T) {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
vllm, err := harness.StartVLLM(ctx, srv)
|
||||||
|
require.NoError(t, err, "start mock vLLM server")
|
||||||
|
t.Cleanup(func() { _ = vllm.Terminate(context.Background()) })
|
||||||
|
|
||||||
|
grp, err := srv.API().Groups.Create(ctx, api.PostApiGroupsJSONRequestBody{Name: "e2e-vllm"})
|
||||||
|
require.NoError(t, err, "create group")
|
||||||
|
t.Cleanup(func() { _ = srv.API().Groups.Delete(context.Background(), grp.Id) })
|
||||||
|
|
||||||
|
ephemeral := false
|
||||||
|
sk, err := srv.API().SetupKeys.Create(ctx, api.PostApiSetupKeysJSONRequestBody{
|
||||||
|
Name: "e2e-vllm-client",
|
||||||
|
Type: "reusable",
|
||||||
|
ExpiresIn: 86400,
|
||||||
|
UsageLimit: 0,
|
||||||
|
AutoGroups: []string{grp.Id},
|
||||||
|
Ephemeral: &ephemeral,
|
||||||
|
})
|
||||||
|
require.NoError(t, err, "mint setup key")
|
||||||
|
require.NotEmpty(t, sk.Key, "setup key plaintext")
|
||||||
|
|
||||||
|
// vLLM provider pointed at the mock over plain HTTP. The mock ignores auth,
|
||||||
|
// so a dummy key satisfies the "Bearer ${API_KEY}" template. The served model
|
||||||
|
// is enumerated so the router dispatches this model string to this provider.
|
||||||
|
dummyKey := "sk-vllm-e2e"
|
||||||
|
prov, err := srv.CreateProvider(ctx, api.AgentNetworkProviderRequest{
|
||||||
|
Name: "vllm",
|
||||||
|
ProviderId: "vllm",
|
||||||
|
UpstreamUrl: vllm.URL,
|
||||||
|
ApiKey: &dummyKey,
|
||||||
|
Enabled: ptr(true),
|
||||||
|
BootstrapCluster: ptr(harness.AgentNetworkCluster),
|
||||||
|
Models: &[]api.AgentNetworkProviderModel{
|
||||||
|
{Id: harness.VLLMModel, InputPer1k: 0.001, OutputPer1k: 0.002},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err, "create vllm provider")
|
||||||
|
t.Cleanup(func() { _ = srv.DeleteProvider(context.Background(), prov.Id) })
|
||||||
|
|
||||||
|
// Token limit far above the handful of tokens this test drives, so it never
|
||||||
|
// blocks but switches on usage metering — the switch that makes consumption
|
||||||
|
// rows get recorded.
|
||||||
|
enabled := true
|
||||||
|
pol, err := srv.CreatePolicy(ctx, api.AgentNetworkPolicyRequest{
|
||||||
|
Name: "e2e-vllm-allow",
|
||||||
|
Enabled: &enabled,
|
||||||
|
SourceGroups: []string{grp.Id},
|
||||||
|
DestinationProviderIds: []string{prov.Id},
|
||||||
|
Limits: &api.AgentNetworkPolicyLimits{
|
||||||
|
TokenLimit: api.AgentNetworkPolicyTokenLimit{
|
||||||
|
Enabled: true,
|
||||||
|
GroupCap: 10_000_000,
|
||||||
|
UserCap: 10_000_000,
|
||||||
|
WindowSeconds: 60,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
})
|
||||||
|
require.NoError(t, err, "create policy")
|
||||||
|
t.Cleanup(func() { _ = srv.DeletePolicy(context.Background(), pol.Id) })
|
||||||
|
|
||||||
|
settings, err := srv.GetSettings(ctx)
|
||||||
|
require.NoError(t, err, "read settings")
|
||||||
|
require.NotEmpty(t, settings.Endpoint, "endpoint must be assigned")
|
||||||
|
|
||||||
|
proxyToken, err := srv.CreateProxyTokenCLI(ctx, "e2e-vllm-proxy")
|
||||||
|
require.NoError(t, err, "mint proxy token")
|
||||||
|
px, err := harness.StartProxy(ctx, srv, proxyToken)
|
||||||
|
require.NoError(t, err, "start proxy")
|
||||||
|
t.Cleanup(func() { _ = px.Terminate(context.Background()) })
|
||||||
|
|
||||||
|
cl, err := harness.StartClient(ctx, srv, sk.Key)
|
||||||
|
require.NoError(t, err, "start client")
|
||||||
|
t.Cleanup(func() { _ = cl.Terminate(context.Background()) })
|
||||||
|
|
||||||
|
require.NoError(t, cl.WaitConnected(ctx, 90*time.Second), "client must connect to management")
|
||||||
|
if err := cl.WaitProxyPeer(ctx, 180*time.Second); err != nil {
|
||||||
|
t.Fatalf("client did not see the proxy peer: %v\n=== proxy logs ===\n%s", err, px.Logs(context.Background()))
|
||||||
|
}
|
||||||
|
proxyIP, err := cl.ResolveProxyIP(ctx, settings.Endpoint)
|
||||||
|
require.NoError(t, err, "resolve endpoint to proxy IP")
|
||||||
|
|
||||||
|
before, _ := srv.ListAccessLogs(ctx)
|
||||||
|
sessionID := "e2e-session-vllm"
|
||||||
|
|
||||||
|
// Retry to absorb tunnel/DNS jitter on the first call.
|
||||||
|
var code int
|
||||||
|
var body string
|
||||||
|
deadline := time.Now().Add(90 * time.Second)
|
||||||
|
for time.Now().Before(deadline) {
|
||||||
|
c, b, cerr := cl.Chat(ctx, settings.Endpoint, proxyIP, harness.WireChat, harness.VLLMModel, "Reply with exactly: pong", sessionID)
|
||||||
|
if cerr == nil {
|
||||||
|
code, body = c, b
|
||||||
|
if code == 200 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
}
|
||||||
|
require.Equal(t, 200, code,
|
||||||
|
"chat through the vLLM provider must return 200; body: %s\n=== vllm logs ===\n%s\n=== proxy logs ===\n%s",
|
||||||
|
body, vllm.Logs(context.Background()), px.Logs(context.Background()))
|
||||||
|
require.True(t, strings.Contains(body, "chat.completion"),
|
||||||
|
"body should be an OpenAI-compatible chat completion; got: %s", body)
|
||||||
|
|
||||||
|
// The request must surface as an access-log row carrying our session id.
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
logs, lerr := srv.ListAccessLogs(ctx)
|
||||||
|
return lerr == nil && logs.TotalRecords > before.TotalRecords
|
||||||
|
}, 30*time.Second, 2*time.Second, "an access-log row should be ingested for the vLLM provider")
|
||||||
|
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
logs, lerr := srv.ListAccessLogs(ctx)
|
||||||
|
if lerr != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for _, r := range logs.Data {
|
||||||
|
if r.SessionId != nil && *r.SessionId == sessionID {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}, 30*time.Second, 2*time.Second, "session id %q must be recorded in an access-log row", sessionID)
|
||||||
|
|
||||||
|
// vLLM's usage block (prompt_tokens=11, completion_tokens=2) must be parsed
|
||||||
|
// and metered into a consumption row with positive token counts.
|
||||||
|
require.Eventually(t, func() bool {
|
||||||
|
rows, lerr := srv.ListConsumption(ctx)
|
||||||
|
if lerr != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
for _, r := range rows {
|
||||||
|
if r.TokensInput > 0 && r.TokensOutput > 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}, 60*time.Second, 3*time.Second, "vLLM usage must be metered into a consumption row")
|
||||||
|
}
|
||||||
113
e2e/harness/vllm.go
Normal file
113
e2e/harness/vllm.go
Normal file
@@ -0,0 +1,113 @@
|
|||||||
|
//go:build e2e
|
||||||
|
|
||||||
|
package harness
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/docker/docker/api/types/container"
|
||||||
|
"github.com/testcontainers/testcontainers-go"
|
||||||
|
"github.com/testcontainers/testcontainers-go/wait"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
vllmImage = "nginx:alpine"
|
||||||
|
vllmAlias = "vllm"
|
||||||
|
vllmPort = "8000/tcp"
|
||||||
|
// VLLMModel is the served model id the mock advertises and echoes back. It
|
||||||
|
// matches a real small model commonly served by vLLM so the provider's
|
||||||
|
// enumerated model and the client's request line up.
|
||||||
|
VLLMModel = "Qwen/Qwen2.5-0.5B-Instruct"
|
||||||
|
)
|
||||||
|
|
||||||
|
// vllmNginxConf emulates a vLLM OpenAI-compatible server over plain HTTP (vLLM's
|
||||||
|
// default: no TLS, port 8000). It answers /v1/models with a one-model list and
|
||||||
|
// any chat/completions path with a canned OpenAI-shaped chat completion carrying
|
||||||
|
// a non-zero usage block, so the proxy's OpenAI parser records real token
|
||||||
|
// consumption. Running actual vLLM in CI is infeasible (GPU + multi-GB model
|
||||||
|
// download), so this stands in for the wire contract the proxy depends on.
|
||||||
|
const vllmNginxConf = `pid /tmp/nginx.pid;
|
||||||
|
events {}
|
||||||
|
http {
|
||||||
|
server {
|
||||||
|
listen 8000;
|
||||||
|
location = /v1/models {
|
||||||
|
default_type application/json;
|
||||||
|
return 200 '{"object":"list","data":[{"id":"Qwen/Qwen2.5-0.5B-Instruct","object":"model","owned_by":"vllm"}]}';
|
||||||
|
}
|
||||||
|
location / {
|
||||||
|
default_type application/json;
|
||||||
|
return 200 '{"id":"chatcmpl-e2e-vllm","object":"chat.completion","created":1700000000,"model":"Qwen/Qwen2.5-0.5B-Instruct","choices":[{"index":0,"message":{"role":"assistant","content":"pong"},"finish_reason":"stop"}],"usage":{"prompt_tokens":11,"completion_tokens":2,"total_tokens":13}}';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
`
|
||||||
|
|
||||||
|
// VLLM is a mock vLLM OpenAI-compatible server on the combined server's network,
|
||||||
|
// reachable at http://vllm:8000. A "vllm" provider points at it to exercise the
|
||||||
|
// proxy's support for self-hosted OpenAI-compatible backends.
|
||||||
|
type VLLM struct {
|
||||||
|
container testcontainers.Container
|
||||||
|
workDir string
|
||||||
|
// URL is the upstream URL the vllm provider points at (http://<alias>:8000).
|
||||||
|
URL string
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartVLLM runs the mock vLLM server on the shared network over plain HTTP.
|
||||||
|
func StartVLLM(ctx context.Context, c *Combined) (*VLLM, error) {
|
||||||
|
workDir, err := os.MkdirTemp("/tmp", "nb-e2e-vllm-*")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("create vllm work dir: %w", err)
|
||||||
|
}
|
||||||
|
// Widen so the (non-root worker) nginx container can traverse the bind mount.
|
||||||
|
if err := os.Chmod(workDir, 0o755); err != nil { //nolint:gosec // throwaway e2e config dir
|
||||||
|
return nil, fmt.Errorf("chmod vllm dir: %w", err)
|
||||||
|
}
|
||||||
|
if err := os.WriteFile(filepath.Join(workDir, "nginx.conf"), []byte(vllmNginxConf), 0o644); err != nil { //nolint:gosec // non-secret e2e config
|
||||||
|
return nil, fmt.Errorf("write nginx conf: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req := testcontainers.ContainerRequest{
|
||||||
|
Image: vllmImage,
|
||||||
|
ExposedPorts: []string{vllmPort},
|
||||||
|
Networks: []string{c.network.Name},
|
||||||
|
NetworkAliases: map[string][]string{c.network.Name: {vllmAlias}},
|
||||||
|
Cmd: []string{"nginx", "-c", "/conf/nginx.conf", "-g", "daemon off;"},
|
||||||
|
HostConfigModifier: func(hc *container.HostConfig) {
|
||||||
|
hc.Binds = append(hc.Binds, workDir+":/conf:ro")
|
||||||
|
},
|
||||||
|
WaitingFor: wait.ForListeningPort(vllmPort).WithStartupTimeout(60 * time.Second),
|
||||||
|
}
|
||||||
|
|
||||||
|
ctr, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
|
||||||
|
ContainerRequest: req,
|
||||||
|
Started: true,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
_ = os.RemoveAll(workDir)
|
||||||
|
return nil, fmt.Errorf("start vllm container: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &VLLM{container: ctr, workDir: workDir, URL: "http://" + vllmAlias + ":8000"}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Logs returns the vLLM container logs, for diagnostics on failure.
|
||||||
|
func (v *VLLM) Logs(ctx context.Context) string {
|
||||||
|
return containerLogs(ctx, v.container)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Terminate stops the vLLM container and cleans its work dir.
|
||||||
|
func (v *VLLM) Terminate(ctx context.Context) error {
|
||||||
|
var err error
|
||||||
|
if v.container != nil {
|
||||||
|
err = v.container.Terminate(ctx)
|
||||||
|
}
|
||||||
|
if v.workDir != "" {
|
||||||
|
_ = os.RemoveAll(v.workDir)
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
@@ -351,11 +351,6 @@ initialize_default_values() {
|
|||||||
NETBIRD_STUN_PORT=3478
|
NETBIRD_STUN_PORT=3478
|
||||||
|
|
||||||
# Docker images
|
# Docker images
|
||||||
# Record whether the operator explicitly pinned the server/proxy images via
|
|
||||||
# env vars, so the agent-network preset can pick its own defaults without
|
|
||||||
# clobbering an explicit override.
|
|
||||||
NETBIRD_SERVER_IMAGE_EXPLICIT=${NETBIRD_SERVER_IMAGE:+true}
|
|
||||||
NETBIRD_PROXY_IMAGE_EXPLICIT=${NETBIRD_PROXY_IMAGE:+true}
|
|
||||||
DASHBOARD_IMAGE=${DASHBOARD_IMAGE:-"netbirdio/dashboard:latest"}
|
DASHBOARD_IMAGE=${DASHBOARD_IMAGE:-"netbirdio/dashboard:latest"}
|
||||||
# Combined server replaces separate signal, relay, and management containers
|
# Combined server replaces separate signal, relay, and management containers
|
||||||
NETBIRD_SERVER_IMAGE=${NETBIRD_SERVER_IMAGE:-"netbirdio/netbird-server:latest"}
|
NETBIRD_SERVER_IMAGE=${NETBIRD_SERVER_IMAGE:-"netbirdio/netbird-server:latest"}
|
||||||
@@ -415,15 +410,6 @@ apply_agent_network_preset() {
|
|||||||
ENABLE_PROXY="true"
|
ENABLE_PROXY="true"
|
||||||
ENABLE_CROWDSEC="false"
|
ENABLE_CROWDSEC="false"
|
||||||
|
|
||||||
# Agent-network ships dedicated server/proxy images. Honor an explicit
|
|
||||||
# env override; otherwise pin the agent-network builds.
|
|
||||||
if [[ "${NETBIRD_SERVER_IMAGE_EXPLICIT}" != "true" ]]; then
|
|
||||||
NETBIRD_SERVER_IMAGE="netbirdio/netbird-server:0.74.0-rc.2"
|
|
||||||
fi
|
|
||||||
if [[ "${NETBIRD_PROXY_IMAGE_EXPLICIT}" != "true" ]]; then
|
|
||||||
NETBIRD_PROXY_IMAGE="netbirdio/reverse-proxy:0.74.0-rc.2"
|
|
||||||
fi
|
|
||||||
|
|
||||||
if [[ -n "${NETBIRD_LETSENCRYPT_EMAIL}" ]]; then
|
if [[ -n "${NETBIRD_LETSENCRYPT_EMAIL}" ]]; then
|
||||||
TRAEFIK_ACME_EMAIL="${NETBIRD_LETSENCRYPT_EMAIL}"
|
TRAEFIK_ACME_EMAIL="${NETBIRD_LETSENCRYPT_EMAIL}"
|
||||||
else
|
else
|
||||||
|
|||||||
@@ -627,6 +627,21 @@ var providers = []Provider{
|
|||||||
},
|
},
|
||||||
Models: []Model{},
|
Models: []Model{},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
// vLLM is an OpenAI-compatible self-hosted server. It behaves like
|
||||||
|
// the generic custom entry; it gets its own catalog id purely so it
|
||||||
|
// surfaces as a named "vLLM" choice in the provider picker.
|
||||||
|
ID: "vllm",
|
||||||
|
Kind: KindCustom,
|
||||||
|
Name: "vLLM",
|
||||||
|
Description: "Self-hosted vLLM (OpenAI-compatible)",
|
||||||
|
DefaultHost: "",
|
||||||
|
AuthHeaderName: "Authorization",
|
||||||
|
AuthHeaderTemplate: "Bearer ${API_KEY}",
|
||||||
|
DefaultContentType: "application/json",
|
||||||
|
BrandColor: "#30A2FF",
|
||||||
|
Models: []Model{},
|
||||||
|
},
|
||||||
{
|
{
|
||||||
ID: "custom",
|
ID: "custom",
|
||||||
Kind: KindCustom,
|
Kind: KindCustom,
|
||||||
|
|||||||
@@ -47,16 +47,13 @@ func init() {
|
|||||||
precomputedDeprecatedRemotePeersConstraint = constraint
|
precomputedDeprecatedRemotePeersConstraint = constraint
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// toNetbirdConfig converts the server configuration to the wire representation. It returns
|
||||||
|
// nil when no server config is set (the fan-out network-map path) because clients treat any
|
||||||
|
// non-nil config as authoritative: a config without a relay section is interpreted as relay
|
||||||
|
// disabled and wipes the clients' relay URLs.
|
||||||
func toNetbirdConfig(config *nbconfig.Config, turnCredentials *Token, relayToken *Token, extraSettings *types.ExtraSettings, settings *types.Settings) *proto.NetbirdConfig {
|
func toNetbirdConfig(config *nbconfig.Config, turnCredentials *Token, relayToken *Token, extraSettings *types.ExtraSettings, settings *types.Settings) *proto.NetbirdConfig {
|
||||||
if config == nil {
|
if config == nil {
|
||||||
if settings == nil {
|
return nil
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return &proto.NetbirdConfig{
|
|
||||||
Metrics: &proto.MetricsConfig{
|
|
||||||
Enabled: settings.MetricsPushEnabled,
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var stuns []*proto.HostConfig
|
var stuns []*proto.HostConfig
|
||||||
|
|||||||
@@ -8,11 +8,13 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
"github.com/stretchr/testify/assert"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
nbdns "github.com/netbirdio/netbird/dns"
|
nbdns "github.com/netbirdio/netbird/dns"
|
||||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map"
|
"github.com/netbirdio/netbird/management/internals/controllers/network_map"
|
||||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map/controller/cache"
|
"github.com/netbirdio/netbird/management/internals/controllers/network_map/controller/cache"
|
||||||
nbconfig "github.com/netbirdio/netbird/management/internals/server/config"
|
nbconfig "github.com/netbirdio/netbird/management/internals/server/config"
|
||||||
|
"github.com/netbirdio/netbird/management/server/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func TestToProtocolDNSConfigWithCache(t *testing.T) {
|
func TestToProtocolDNSConfigWithCache(t *testing.T) {
|
||||||
@@ -263,3 +265,39 @@ func TestEncodeSessionExpiresAt(t *testing.T) {
|
|||||||
assert.True(t, got.AsTime().Equal(deadline))
|
assert.True(t, got.AsTime().Equal(deadline))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestToNetbirdConfig_RelayInvariant guards against the v0.74.0 relay-wipe regression.
|
||||||
|
// Clients treat any non-nil NetbirdConfig as authoritative and interpret a missing relay
|
||||||
|
// section as relay disabled, wiping their relay URLs. toNetbirdConfig must therefore
|
||||||
|
// return nil when no server config is set (the fan-out network-map path) instead of a
|
||||||
|
// partial config, and a result built from a relay-enabled config must carry the relay
|
||||||
|
// section.
|
||||||
|
func TestToNetbirdConfig_RelayInvariant(t *testing.T) {
|
||||||
|
settings := &types.Settings{MetricsPushEnabled: true}
|
||||||
|
|
||||||
|
t.Run("nil server config returns nil config", func(t *testing.T) {
|
||||||
|
nbCfg := toNetbirdConfig(nil, nil, nil, nil, settings)
|
||||||
|
assert.Nil(t, nbCfg, "fan-out updates must not carry a partial NetbirdConfig even when settings are present")
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("relay-enabled config carries relay section", func(t *testing.T) {
|
||||||
|
cfg := &nbconfig.Config{
|
||||||
|
Stuns: []*nbconfig.Host{{Proto: nbconfig.UDP, URI: "stun:stun.example.com:3478"}},
|
||||||
|
TURNConfig: &nbconfig.TURNConfig{
|
||||||
|
Turns: []*nbconfig.Host{{Proto: nbconfig.UDP, URI: "turn:turn.example.com:3478", Username: "user", Password: "pass"}},
|
||||||
|
},
|
||||||
|
Relay: &nbconfig.Relay{Addresses: []string{"rels://relay.example.com:443"}},
|
||||||
|
Signal: &nbconfig.Host{Proto: nbconfig.HTTP, URI: "signal.example.com:10000"},
|
||||||
|
}
|
||||||
|
relayToken := &Token{Payload: "token-payload", Signature: "token-signature"}
|
||||||
|
|
||||||
|
nbCfg := toNetbirdConfig(cfg, nil, relayToken, nil, settings)
|
||||||
|
require.NotNil(t, nbCfg)
|
||||||
|
require.NotNil(t, nbCfg.Relay, "non-nil NetbirdConfig must include the relay section")
|
||||||
|
assert.Equal(t, cfg.Relay.Addresses, nbCfg.Relay.Urls, "relay URLs should match the server config")
|
||||||
|
assert.Equal(t, relayToken.Payload, nbCfg.Relay.TokenPayload, "relay token payload should be set")
|
||||||
|
assert.Equal(t, relayToken.Signature, nbCfg.Relay.TokenSignature, "relay token signature should be set")
|
||||||
|
require.NotNil(t, nbCfg.Metrics)
|
||||||
|
assert.True(t, nbCfg.Metrics.Enabled, "metrics flag should carry the settings value")
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|||||||
@@ -1048,11 +1048,7 @@ func testUpdateAccountPeers(t *testing.T) {
|
|||||||
|
|
||||||
for _, channel := range peerChannels {
|
for _, channel := range peerChannels {
|
||||||
update := <-channel
|
update := <-channel
|
||||||
assert.NotNil(t, update.Update.NetbirdConfig)
|
assert.Nil(t, update.Update.NetbirdConfig, "fan-out updates must not carry a NetbirdConfig; clients treat a config without relay as relay disabled and wipe their relay URLs")
|
||||||
assert.Nil(t, update.Update.NetbirdConfig.Stuns)
|
|
||||||
assert.Nil(t, update.Update.NetbirdConfig.Turns)
|
|
||||||
assert.Nil(t, update.Update.NetbirdConfig.Signal)
|
|
||||||
assert.Nil(t, update.Update.NetbirdConfig.Relay)
|
|
||||||
assert.Equal(t, tc.peers, len(update.Update.NetworkMap.RemotePeers))
|
assert.Equal(t, tc.peers, len(update.Update.NetworkMap.RemotePeers))
|
||||||
assert.Equal(t, tc.peers*2, len(update.Update.NetworkMap.FirewallRules))
|
assert.Equal(t, tc.peers*2, len(update.Update.NetworkMap.FirewallRules))
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,10 +33,15 @@ const ConnectTimeout = 10 * time.Second
|
|||||||
const healthCheckTimeout = 5 * time.Second
|
const healthCheckTimeout = 5 * time.Second
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// EnvMaxRecvMsgSize overrides the default gRPC max receive message size (4 MB)
|
// EnvMaxRecvMsgSize overrides the default gRPC max receive message size
|
||||||
// for the management client connection. Value is in bytes.
|
// for the management client connection. Value is in bytes.
|
||||||
EnvMaxRecvMsgSize = "NB_MANAGEMENT_GRPC_MAX_MSG_SIZE"
|
EnvMaxRecvMsgSize = "NB_MANAGEMENT_GRPC_MAX_MSG_SIZE"
|
||||||
|
|
||||||
|
// defaultMaxRecvMsgSize is the max gRPC receive message size used for the
|
||||||
|
// management client connection when EnvMaxRecvMsgSize is unset or invalid.
|
||||||
|
// It overrides the gRPC library default of 4 MB.
|
||||||
|
defaultMaxRecvMsgSize = 1024 * 1024 * 16
|
||||||
|
|
||||||
errMsgMgmtPublicKey = "failed getting Management Service public key: %s"
|
errMsgMgmtPublicKey = "failed getting Management Service public key: %s"
|
||||||
errMsgNoMgmtConnection = "no connection to management"
|
errMsgNoMgmtConnection = "no connection to management"
|
||||||
)
|
)
|
||||||
@@ -84,22 +89,22 @@ type ExposeResponse struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// MaxRecvMsgSize returns the configured max gRPC receive message size from
|
// MaxRecvMsgSize returns the configured max gRPC receive message size from
|
||||||
// the environment, or 0 if unset (which uses the gRPC default of 4 MB).
|
// the environment, or defaultMaxRecvMsgSize (16 MB) if unset or invalid.
|
||||||
func MaxRecvMsgSize() int {
|
func MaxRecvMsgSize() int {
|
||||||
val := os.Getenv(EnvMaxRecvMsgSize)
|
val := os.Getenv(EnvMaxRecvMsgSize)
|
||||||
if val == "" {
|
if val == "" {
|
||||||
return 0
|
return defaultMaxRecvMsgSize
|
||||||
}
|
}
|
||||||
|
|
||||||
size, err := strconv.Atoi(val)
|
size, err := strconv.Atoi(val)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("invalid %s value %q, using default: %v", EnvMaxRecvMsgSize, val, err)
|
log.Warnf("invalid %s value %q, using default: %v", EnvMaxRecvMsgSize, val, err)
|
||||||
return 0
|
return defaultMaxRecvMsgSize
|
||||||
}
|
}
|
||||||
|
|
||||||
if size <= 0 {
|
if size <= 0 {
|
||||||
log.Warnf("invalid %s value %d, must be positive, using default", EnvMaxRecvMsgSize, size)
|
log.Warnf("invalid %s value %d, must be positive, using default", EnvMaxRecvMsgSize, size)
|
||||||
return 0
|
return defaultMaxRecvMsgSize
|
||||||
}
|
}
|
||||||
|
|
||||||
return size
|
return size
|
||||||
@@ -1030,8 +1035,6 @@ func infoToMetaData(info *system.Info) *proto.PeerSystemMeta {
|
|||||||
BlockLANAccess: info.BlockLANAccess,
|
BlockLANAccess: info.BlockLANAccess,
|
||||||
BlockInbound: info.BlockInbound,
|
BlockInbound: info.BlockInbound,
|
||||||
DisableIPv6: info.DisableIPv6,
|
DisableIPv6: info.DisableIPv6,
|
||||||
|
|
||||||
LazyConnectionEnabled: info.LazyConnectionEnabled,
|
|
||||||
},
|
},
|
||||||
|
|
||||||
Capabilities: peerCapabilities(*info),
|
Capabilities: peerCapabilities(*info),
|
||||||
|
|||||||
@@ -21,11 +21,11 @@ func TestMaxRecvMsgSize(t *testing.T) {
|
|||||||
envValue string
|
envValue string
|
||||||
expected int
|
expected int
|
||||||
}{
|
}{
|
||||||
{name: "unset returns 0", envValue: "", expected: 0},
|
{name: "unset returns default", envValue: "", expected: defaultMaxRecvMsgSize},
|
||||||
{name: "valid value", envValue: "10485760", expected: 10485760},
|
{name: "valid value", envValue: "10485760", expected: 10485760},
|
||||||
{name: "non-numeric returns 0", envValue: "abc", expected: 0},
|
{name: "non-numeric returns default", envValue: "abc", expected: defaultMaxRecvMsgSize},
|
||||||
{name: "negative returns 0", envValue: "-1", expected: 0},
|
{name: "negative returns default", envValue: "-1", expected: defaultMaxRecvMsgSize},
|
||||||
{name: "zero returns 0", envValue: "0", expected: 0},
|
{name: "zero returns default", envValue: "0", expected: defaultMaxRecvMsgSize},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
|
|||||||
Reference in New Issue
Block a user