mirror of
https://github.com/netbirdio/netbird.git
synced 2026-07-01 20:29:58 +00:00
Compare commits
14 Commits
lazy-conn-
...
netmap_pro
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
79567fe347 | ||
|
|
cf8d92fbb0 | ||
|
|
b70fc4015b | ||
|
|
4988b6726e | ||
|
|
2552830184 | ||
|
|
3b8fc688f4 | ||
|
|
d82d62e818 | ||
|
|
0bf964dad7 | ||
|
|
297dcb3e24 | ||
|
|
bc22926fe0 | ||
|
|
d3f2ef9adb | ||
|
|
5bec1e8f03 | ||
|
|
74bb5c613e | ||
|
|
29dde908ae |
@@ -10,7 +10,7 @@ var (
|
||||
EnvKeyNBForceRelay = peer.EnvKeyNBForceRelay
|
||||
|
||||
// EnvKeyNBLazyConn Exported for Android java client to configure lazy connection
|
||||
EnvKeyNBLazyConn = lazyconn.EnvLazyConn
|
||||
EnvKeyNBLazyConn = lazyconn.EnvEnableLazyConn
|
||||
|
||||
// EnvKeyNBInactivityThreshold Exported for Android java client to configure connection inactivity threshold
|
||||
EnvKeyNBInactivityThreshold = lazyconn.EnvInactivityThreshold
|
||||
|
||||
@@ -71,14 +71,12 @@ var (
|
||||
extraIFaceBlackList []string
|
||||
anonymizeFlag bool
|
||||
dnsRouteInterval time.Duration
|
||||
// lazyConnEnabled is the parse target for the deprecated --enable-lazy-connection
|
||||
// flag. The flag is inert; the value is no longer read (use NB_LAZY_CONN instead).
|
||||
lazyConnEnabled bool
|
||||
mtu uint16
|
||||
profilesDisabled bool
|
||||
updateSettingsDisabled bool
|
||||
captureEnabled bool
|
||||
networksDisabled bool
|
||||
lazyConnEnabled bool
|
||||
mtu uint16
|
||||
profilesDisabled bool
|
||||
updateSettingsDisabled bool
|
||||
captureEnabled bool
|
||||
networksDisabled bool
|
||||
|
||||
rootCmd = &cobra.Command{
|
||||
Use: "netbird",
|
||||
@@ -212,8 +210,7 @@ func init() {
|
||||
upCmd.PersistentFlags().BoolVar(&rosenpassEnabled, enableRosenpassFlag, false, "[Experimental] Enable Rosenpass feature. If enabled, the connection will be post-quantum secured via Rosenpass.")
|
||||
upCmd.PersistentFlags().BoolVar(&rosenpassPermissive, rosenpassPermissiveFlag, false, "[Experimental] Enable Rosenpass in permissive mode to allow this peer to accept WireGuard connections without requiring Rosenpass functionality from peers that do not have Rosenpass enabled.")
|
||||
upCmd.PersistentFlags().BoolVar(&autoConnectDisabled, disableAutoConnectFlag, false, "Disables auto-connect feature. If enabled, then the client won't connect automatically when the service starts.")
|
||||
upCmd.PersistentFlags().BoolVar(&lazyConnEnabled, enableLazyConnectionFlag, false, "Deprecated: no longer used. Lazy connections are controlled by the server and the NB_LAZY_CONN environment variable.")
|
||||
_ = upCmd.PersistentFlags().MarkDeprecated(enableLazyConnectionFlag, "no longer used; lazy connections are controlled by the server and the NB_LAZY_CONN environment variable")
|
||||
upCmd.PersistentFlags().BoolVar(&lazyConnEnabled, enableLazyConnectionFlag, false, "[Experimental] Enable the lazy connection feature. If enabled, the client will establish connections on-demand. Note: this setting may be overridden by management configuration.")
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -479,6 +479,10 @@ func setupSetConfigReq(customDNSAddressConverted []byte, cmd *cobra.Command, pro
|
||||
req.DisableIpv6 = &disableIPv6
|
||||
}
|
||||
|
||||
if cmd.Flag(enableLazyConnectionFlag).Changed {
|
||||
req.LazyConnectionEnabled = &lazyConnEnabled
|
||||
}
|
||||
|
||||
return &req
|
||||
}
|
||||
|
||||
@@ -596,6 +600,9 @@ func setupConfig(customDNSAddressConverted []byte, cmd *cobra.Command, configFil
|
||||
ic.DisableIPv6 = &disableIPv6
|
||||
}
|
||||
|
||||
if cmd.Flag(enableLazyConnectionFlag).Changed {
|
||||
ic.LazyConnectionEnabled = &lazyConnEnabled
|
||||
}
|
||||
return &ic, nil
|
||||
}
|
||||
|
||||
@@ -711,6 +718,9 @@ func setupLoginRequest(providedSetupKey string, customDNSAddressConverted []byte
|
||||
loginRequest.DisableIpv6 = &disableIPv6
|
||||
}
|
||||
|
||||
if cmd.Flag(enableLazyConnectionFlag).Changed {
|
||||
loginRequest.LazyConnectionEnabled = &lazyConnEnabled
|
||||
}
|
||||
return &loginRequest, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -322,6 +322,7 @@ func (a *Auth) setSystemInfoFlags(info *system.Info) {
|
||||
a.config.BlockLANAccess,
|
||||
a.config.BlockInbound,
|
||||
a.config.DisableIPv6,
|
||||
a.config.LazyConnectionEnabled,
|
||||
a.config.EnableSSHRoot,
|
||||
a.config.EnableSSHSFTP,
|
||||
a.config.EnableSSHLocalPortForwarding,
|
||||
|
||||
@@ -16,16 +16,6 @@ import (
|
||||
"github.com/netbirdio/netbird/route"
|
||||
)
|
||||
|
||||
// lazyForce is the resolved local decision for lazy connections, layered above the
|
||||
// management feature flag. lazyForceNone defers to management.
|
||||
type lazyForce int
|
||||
|
||||
const (
|
||||
lazyForceNone lazyForce = iota
|
||||
lazyForceOn
|
||||
lazyForceOff
|
||||
)
|
||||
|
||||
// ConnMgr coordinates both lazy connections (established on-demand) and permanent peer connections.
|
||||
//
|
||||
// The connection manager is responsible for:
|
||||
@@ -38,7 +28,7 @@ type ConnMgr struct {
|
||||
peerStore *peerstore.Store
|
||||
statusRecorder *peer.Status
|
||||
iface lazyconn.WGIface
|
||||
force lazyForce
|
||||
enabledLocally bool
|
||||
rosenpassEnabled bool
|
||||
|
||||
lazyConnMgr *manager.Manager
|
||||
@@ -53,34 +43,28 @@ func NewConnMgr(engineConfig *EngineConfig, statusRecorder *peer.Status, peerSto
|
||||
peerStore: peerStore,
|
||||
statusRecorder: statusRecorder,
|
||||
iface: iface,
|
||||
force: resolveLazyForce(engineConfig.LazyConnection),
|
||||
rosenpassEnabled: engineConfig.RosenpassEnabled,
|
||||
}
|
||||
if engineConfig.LazyConnectionEnabled || lazyconn.IsLazyConnEnabledByEnv() {
|
||||
e.enabledLocally = true
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
// Start initializes the connection manager. It starts the lazy connection manager when a
|
||||
// local override forces it on; with no local override it waits for the management feature flag.
|
||||
// Start initializes the connection manager and starts the lazy connection manager if enabled by env var or cmd line option.
|
||||
func (e *ConnMgr) Start(ctx context.Context) {
|
||||
if e.lazyConnMgr != nil {
|
||||
log.Errorf("lazy connection manager is already started")
|
||||
return
|
||||
}
|
||||
|
||||
switch e.force {
|
||||
case lazyForceOff:
|
||||
log.Infof("lazy connection manager is disabled by local override (%s or MDM policy)", lazyconn.EnvLazyConn)
|
||||
e.statusRecorder.UpdateLazyConnection(false)
|
||||
return
|
||||
case lazyForceNone:
|
||||
log.Infof("lazy connection manager is managed by the management feature flag")
|
||||
e.statusRecorder.UpdateLazyConnection(false)
|
||||
if !e.enabledLocally {
|
||||
log.Infof("lazy connection manager is disabled")
|
||||
return
|
||||
}
|
||||
|
||||
if e.rosenpassEnabled {
|
||||
log.Warnf("rosenpass connection manager is enabled, lazy connection manager will not be started")
|
||||
e.statusRecorder.UpdateLazyConnection(false)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -92,8 +76,8 @@ func (e *ConnMgr) Start(ctx context.Context) {
|
||||
// If enabled, it initializes the lazy connection manager and start it. Do not need to call Start() again.
|
||||
// If disabled, then it closes the lazy connection manager and open the connections to all peers.
|
||||
func (e *ConnMgr) UpdatedRemoteFeatureFlag(ctx context.Context, enabled bool) error {
|
||||
// a local override (NB_LAZY_CONN or local config) takes precedence over management
|
||||
if e.force != lazyForceNone {
|
||||
// do not disable lazy connection manager if it was enabled by env var
|
||||
if e.enabledLocally {
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -105,7 +89,6 @@ func (e *ConnMgr) UpdatedRemoteFeatureFlag(ctx context.Context, enabled bool) er
|
||||
|
||||
if e.rosenpassEnabled {
|
||||
log.Infof("rosenpass connection manager is enabled, lazy connection manager will not be started")
|
||||
e.statusRecorder.UpdateLazyConnection(false)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -115,7 +98,6 @@ func (e *ConnMgr) UpdatedRemoteFeatureFlag(ctx context.Context, enabled bool) er
|
||||
return e.addPeersToLazyConnManager()
|
||||
} else {
|
||||
if e.lazyConnMgr == nil {
|
||||
e.statusRecorder.UpdateLazyConnection(false)
|
||||
return nil
|
||||
}
|
||||
log.Infof("lazy connection manager is disabled by management feature flag")
|
||||
@@ -327,25 +309,6 @@ func (e *ConnMgr) isStartedWithLazyMgr() bool {
|
||||
return e.lazyConnMgr != nil && e.lazyCtxCancel != nil
|
||||
}
|
||||
|
||||
// resolveLazyForce determines the local override. NB_LAZY_CONN takes precedence; when it
|
||||
// is unset the MDM policy override (mdmState) applies. Either wins in both directions over
|
||||
// the management feature flag; StateUnset for both defers to management.
|
||||
func resolveLazyForce(mdmState lazyconn.State) lazyForce {
|
||||
state := lazyconn.EnvState()
|
||||
if state == lazyconn.StateUnset {
|
||||
state = mdmState
|
||||
}
|
||||
|
||||
switch state {
|
||||
case lazyconn.StateOn:
|
||||
return lazyForceOn
|
||||
case lazyconn.StateOff:
|
||||
return lazyForceOff
|
||||
default:
|
||||
return lazyForceNone
|
||||
}
|
||||
}
|
||||
|
||||
func inactivityThresholdEnv() *time.Duration {
|
||||
envValue := os.Getenv(lazyconn.EnvInactivityThreshold)
|
||||
if envValue == "" {
|
||||
|
||||
@@ -1,40 +0,0 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"github.com/netbirdio/netbird/client/internal/lazyconn"
|
||||
)
|
||||
|
||||
func TestResolveLazyForce(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
env string
|
||||
envSet bool
|
||||
mdm lazyconn.State
|
||||
want lazyForce
|
||||
}{
|
||||
{name: "env unset, mdm unset -> defer to management", mdm: lazyconn.StateUnset, want: lazyForceNone},
|
||||
{name: "env on -> force on", env: "on", envSet: true, mdm: lazyconn.StateUnset, want: lazyForceOn},
|
||||
{name: "env off -> force off", env: "off", envSet: true, mdm: lazyconn.StateUnset, want: lazyForceOff},
|
||||
{name: "env unset, mdm on -> force on", mdm: lazyconn.StateOn, want: lazyForceOn},
|
||||
{name: "env unset, mdm off -> force off", mdm: lazyconn.StateOff, want: lazyForceOff},
|
||||
{name: "env on beats mdm off", env: "on", envSet: true, mdm: lazyconn.StateOff, want: lazyForceOn},
|
||||
{name: "env off beats mdm on", env: "off", envSet: true, mdm: lazyconn.StateOn, want: lazyForceOff},
|
||||
{name: "unrecognized env, mdm on -> mdm wins", env: "auto", envSet: true, mdm: lazyconn.StateOn, want: lazyForceOn},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
t.Setenv(lazyconn.EnvLazyConn, tt.env)
|
||||
if !tt.envSet {
|
||||
os.Unsetenv(lazyconn.EnvLazyConn)
|
||||
}
|
||||
|
||||
if got := resolveLazyForce(tt.mdm); got != tt.want {
|
||||
t.Fatalf("resolveLazyForce(%v) = %v, want %v", tt.mdm, got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -27,7 +27,6 @@ import (
|
||||
"github.com/netbirdio/netbird/client/iface/device"
|
||||
"github.com/netbirdio/netbird/client/iface/netstack"
|
||||
"github.com/netbirdio/netbird/client/internal/dns"
|
||||
"github.com/netbirdio/netbird/client/internal/lazyconn"
|
||||
"github.com/netbirdio/netbird/client/internal/listener"
|
||||
"github.com/netbirdio/netbird/client/internal/metrics"
|
||||
"github.com/netbirdio/netbird/client/internal/peer"
|
||||
@@ -597,7 +596,7 @@ func createEngineConfig(key wgtypes.Key, config *profilemanager.Config, peerConf
|
||||
BlockInbound: config.BlockInbound,
|
||||
DisableIPv6: config.DisableIPv6,
|
||||
|
||||
LazyConnection: lazyconn.ParseState(config.LazyConnection),
|
||||
LazyConnectionEnabled: config.LazyConnectionEnabled,
|
||||
|
||||
MTU: selectMTU(config.MTU, peerConfig.Mtu),
|
||||
LogPath: logPath,
|
||||
@@ -671,6 +670,7 @@ func loginToManagement(ctx context.Context, client mgm.Client, pubSSHKey []byte,
|
||||
config.BlockLANAccess,
|
||||
config.BlockInbound,
|
||||
config.DisableIPv6,
|
||||
config.LazyConnectionEnabled,
|
||||
config.EnableSSHRoot,
|
||||
config.EnableSSHSFTP,
|
||||
config.EnableSSHLocalPortForwarding,
|
||||
|
||||
@@ -681,7 +681,7 @@ func (g *BundleGenerator) addCommonConfigFields(configContent *strings.Builder)
|
||||
configContent.WriteString(fmt.Sprintf("ClientCertKeyPath: %s\n", g.internalConfig.ClientCertKeyPath))
|
||||
}
|
||||
|
||||
configContent.WriteString(fmt.Sprintf("LazyConnection: %q\n", g.internalConfig.LazyConnection))
|
||||
configContent.WriteString(fmt.Sprintf("LazyConnectionEnabled: %v\n", g.internalConfig.LazyConnectionEnabled))
|
||||
configContent.WriteString(fmt.Sprintf("MTU: %d\n", g.internalConfig.MTU))
|
||||
}
|
||||
|
||||
|
||||
@@ -885,7 +885,7 @@ func TestAddConfig_AllFieldsCovered(t *testing.T) {
|
||||
DNSRouteInterval: 5 * time.Second,
|
||||
ClientCertPath: "/tmp/cert",
|
||||
ClientCertKeyPath: "/tmp/key",
|
||||
LazyConnection: "on",
|
||||
LazyConnectionEnabled: true,
|
||||
MTU: 1280,
|
||||
}
|
||||
|
||||
|
||||
@@ -40,7 +40,6 @@ import (
|
||||
"github.com/netbirdio/netbird/client/internal/dnsfwd"
|
||||
"github.com/netbirdio/netbird/client/internal/expose"
|
||||
"github.com/netbirdio/netbird/client/internal/ingressgw"
|
||||
"github.com/netbirdio/netbird/client/internal/lazyconn"
|
||||
"github.com/netbirdio/netbird/client/internal/metrics"
|
||||
"github.com/netbirdio/netbird/client/internal/netflow"
|
||||
nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
|
||||
@@ -148,9 +147,7 @@ type EngineConfig struct {
|
||||
BlockInbound bool
|
||||
DisableIPv6 bool
|
||||
|
||||
// LazyConnection is the MDM-sourced lazy-connection override; StateUnset defers to
|
||||
// the env var and management feature flag.
|
||||
LazyConnection lazyconn.State
|
||||
LazyConnectionEnabled bool
|
||||
|
||||
MTU uint16
|
||||
|
||||
@@ -219,6 +216,12 @@ type Engine struct {
|
||||
// networkSerial is the latest CurrentSerial (state ID) of the network sent by the Management service
|
||||
networkSerial uint64
|
||||
|
||||
// forwardingRules holds the ingress forward rules applied for the current target.
|
||||
// Wholesale sections (incl. forward rules) run only on the first pass of a target;
|
||||
// it is stashed here so the final, peer-converged pass can build the lazy-connection
|
||||
// exclude list without recomputing them on every bounded peer pass.
|
||||
forwardingRules []firewallManager.ForwardRule
|
||||
|
||||
networkMonitor *networkmonitor.NetworkMonitor
|
||||
|
||||
sshServer sshServer
|
||||
@@ -771,7 +774,15 @@ func (e *Engine) blockLanAccess() {
|
||||
|
||||
// modifyPeers updates peers that have been modified (e.g. IP address has been changed).
|
||||
// It closes the existing connection, removes it from the peerConns map, and creates a new one.
|
||||
func (e *Engine) modifyPeers(peersUpdate []*mgmProto.RemotePeerConfig) error {
|
||||
// maxPeersPerSyncPass is the default per-pass cap on how many peers each of
|
||||
// removePeers/modifyPeers/addNewPeers applies, so syncMsgMux is held only for a
|
||||
// batch at a time and other subsystems can interleave between passes. It is
|
||||
// passed in (not read globally) so tests can exercise the multi-pass path.
|
||||
const maxPeersPerSyncPass = 300
|
||||
|
||||
// modifyPeers re-applies up to maxBatch changed peers per call. It returns true
|
||||
// when more changed peers remained than the cap, so the caller re-runs.
|
||||
func (e *Engine) modifyPeers(peersUpdate []*mgmProto.RemotePeerConfig, maxBatch int) (bool, error) {
|
||||
|
||||
// first, check if peers have been modified
|
||||
var modified []*mgmProto.RemotePeerConfig
|
||||
@@ -801,26 +812,32 @@ func (e *Engine) modifyPeers(peersUpdate []*mgmProto.RemotePeerConfig) error {
|
||||
}
|
||||
}
|
||||
|
||||
more := false
|
||||
if len(modified) > maxBatch {
|
||||
modified = modified[:maxBatch]
|
||||
more = true
|
||||
}
|
||||
|
||||
// second, close all modified connections and remove them from the state map
|
||||
for _, p := range modified {
|
||||
err := e.removePeer(p.GetWgPubKey())
|
||||
if err != nil {
|
||||
return err
|
||||
if err := e.removePeer(p.GetWgPubKey()); err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
// third, add the peer connections again
|
||||
for _, p := range modified {
|
||||
err := e.addNewPeer(p)
|
||||
if err != nil {
|
||||
return err
|
||||
if err := e.addNewPeer(p); err != nil {
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
return more, nil
|
||||
}
|
||||
|
||||
// removePeers finds and removes peers that do not exist anymore in the network map received from the Management Service.
|
||||
// It also removes peers that have been modified (e.g. change of IP address). They will be added again in addPeers method.
|
||||
func (e *Engine) removePeers(peersUpdate []*mgmProto.RemotePeerConfig) error {
|
||||
// removePeers removes up to maxBatch peers per call. It returns true when more
|
||||
// peers remained to remove than the cap, so the caller re-runs.
|
||||
func (e *Engine) removePeers(peersUpdate []*mgmProto.RemotePeerConfig, maxBatch int) (bool, error) {
|
||||
newPeers := make([]string, 0, len(peersUpdate))
|
||||
for _, p := range peersUpdate {
|
||||
newPeers = append(newPeers, p.GetWgPubKey())
|
||||
@@ -828,14 +845,19 @@ func (e *Engine) removePeers(peersUpdate []*mgmProto.RemotePeerConfig) error {
|
||||
|
||||
toRemove := util.SliceDiff(e.peerStore.PeersPubKey(), newPeers)
|
||||
|
||||
more := false
|
||||
if len(toRemove) > maxBatch {
|
||||
toRemove = toRemove[:maxBatch]
|
||||
more = true
|
||||
}
|
||||
|
||||
for _, p := range toRemove {
|
||||
err := e.removePeer(p)
|
||||
if err != nil {
|
||||
return err
|
||||
if err := e.removePeer(p); err != nil {
|
||||
return false, err
|
||||
}
|
||||
log.Infof("removed peer %s", p)
|
||||
}
|
||||
return nil
|
||||
return more, nil
|
||||
}
|
||||
|
||||
func (e *Engine) removeAllPeers() error {
|
||||
@@ -914,19 +936,17 @@ func (e *Engine) phase(name string) func() {
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
duration := time.Since(started)
|
||||
log.Infof("sync finished in %s", duration)
|
||||
e.clientMetrics.RecordSyncDuration(e.ctx, duration)
|
||||
}()
|
||||
// applySyncPass applies one bounded pass of the sync update under syncMsgMux and
|
||||
// returns true if more peers remained than the per-pass cap. It is driven by the
|
||||
// mapStateManager, which re-invokes it (releasing the lock between passes) until
|
||||
// the update is fully applied.
|
||||
func (e *Engine) applySyncPass(update *mgmProto.SyncResponse, firstPass bool) (bool, error) {
|
||||
e.syncMsgMux.Lock()
|
||||
defer e.syncMsgMux.Unlock()
|
||||
|
||||
// Check context INSIDE lock to ensure atomicity with shutdown
|
||||
if e.ctx.Err() != nil {
|
||||
return e.ctx.Err()
|
||||
return false, e.ctx.Err()
|
||||
}
|
||||
|
||||
if update.NetworkMap != nil && update.NetworkMap.PeerConfig != nil {
|
||||
@@ -937,7 +957,7 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
err := e.updateNetbirdConfig(update.GetNetbirdConfig())
|
||||
done()
|
||||
if err != nil {
|
||||
return err
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Posture checks are bound to the network map presence:
|
||||
@@ -947,28 +967,25 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
// leave the previously applied checks untouched
|
||||
nm := update.GetNetworkMap()
|
||||
if nm == nil {
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
|
||||
done = e.phase("checks")
|
||||
err = e.updateChecksIfNew(update.Checks)
|
||||
done()
|
||||
if err != nil {
|
||||
return err
|
||||
return false, err
|
||||
}
|
||||
|
||||
done = e.phase("persist")
|
||||
e.persistSyncResponse(update)
|
||||
done()
|
||||
|
||||
// only apply new changes and ignore old ones
|
||||
if err := e.updateNetworkMap(nm); err != nil {
|
||||
return err
|
||||
more, err := e.updateNetworkMap(nm, maxPeersPerSyncPass, firstPass)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
e.statusRecorder.PublishEvent(cProto.SystemEvent_INFO, cProto.SystemEvent_SYSTEM, "Network map updated", "", nil)
|
||||
|
||||
return nil
|
||||
return more, nil
|
||||
}
|
||||
|
||||
// updateNetbirdConfig applies the management-provided NetBird configuration:
|
||||
@@ -1014,6 +1031,13 @@ func (e *Engine) updateNetbirdConfig(wCfg *mgmProto.NetbirdConfig) error {
|
||||
// (not syncMsgMux) is held for the whole Set so the store cannot be cleared (disabled /
|
||||
// engine close) mid-call and have this write resurrect a file that was just removed.
|
||||
func (e *Engine) persistSyncResponse(update *mgmProto.SyncResponse) {
|
||||
// Only persist updates that carry a network map. Config-only updates (e.g. relay
|
||||
// token rotation, STUN/TURN) have a nil NetworkMap; persisting them would overwrite
|
||||
// the last full map on disk and break restore-on-restart.
|
||||
if update.GetNetworkMap() == nil {
|
||||
return
|
||||
}
|
||||
|
||||
e.syncRespMux.RLock()
|
||||
defer e.syncRespMux.RUnlock()
|
||||
|
||||
@@ -1120,6 +1144,7 @@ func (e *Engine) applyInfoFlags(info *system.Info) {
|
||||
e.config.BlockLANAccess,
|
||||
e.config.BlockInbound,
|
||||
e.config.DisableIPv6,
|
||||
e.config.LazyConnectionEnabled,
|
||||
e.config.EnableSSHRoot,
|
||||
e.config.EnableSSHSFTP,
|
||||
e.config.EnableSSHLocalPortForwarding,
|
||||
@@ -1293,7 +1318,24 @@ func (e *Engine) receiveManagementEvents() {
|
||||
}
|
||||
e.applyInfoFlags(info)
|
||||
|
||||
err := e.mgmClient.Sync(e.ctx, info, e.handleSync)
|
||||
// The map-state manager converges the latest update in the background in
|
||||
// bounded passes; the stream callback only hands it the newest target.
|
||||
persist := func(u *mgmProto.SyncResponse) {
|
||||
done := e.phase("persist")
|
||||
e.persistSyncResponse(u)
|
||||
done()
|
||||
}
|
||||
manager := newMapStateManager(e.applySyncPass, persist, func(d time.Duration) {
|
||||
log.Infof("sync finished in %s", d)
|
||||
e.clientMetrics.RecordSyncDuration(e.ctx, d)
|
||||
})
|
||||
e.shutdownWg.Add(1)
|
||||
go func() {
|
||||
defer e.shutdownWg.Done()
|
||||
manager.run(e.ctx)
|
||||
}()
|
||||
|
||||
err := e.mgmClient.Sync(e.ctx, info, manager.SetTarget)
|
||||
if err != nil {
|
||||
// happens if management is unavailable for a long time.
|
||||
// We want to cancel the operation of the whole client
|
||||
@@ -1344,21 +1386,107 @@ func (e *Engine) updateTURNs(turns []*mgmProto.ProtectedHostConfig) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
// updateNetworkMap applies the wholesale parts (config, routes, ACL, DNS) in full
|
||||
// and up to maxBatch peers per phase. It returns true when more peers remained
|
||||
// than the cap, so the caller re-runs until convergence.
|
||||
func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap, maxBatch int, firstPass bool) (bool, error) {
|
||||
// intentionally leave it before checking serial because for now it can happen that peer IP changed but serial didn't
|
||||
if networkMap.GetPeerConfig() != nil {
|
||||
err := e.updateConfig(networkMap.GetPeerConfig())
|
||||
if err != nil {
|
||||
return err
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
serial := networkMap.GetSerial()
|
||||
if e.networkSerial > serial {
|
||||
log.Debugf("received outdated NetworkMap with serial %d, ignoring", serial)
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Wholesale sections (firewall/ACL, DNS, routes, forward rules) are applied
|
||||
// up-front and only once per target: they are cheap, local, idempotent and must
|
||||
// be in place before peers come up (fail-closed). On the bounded re-runs that only
|
||||
// drain the remaining peer batches they are skipped — the applied forward rules are
|
||||
// reused from e.forwardingRules for the lazy-exclude finalize.
|
||||
if firstPass {
|
||||
e.applyWholesale(networkMap, serial)
|
||||
}
|
||||
|
||||
log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(networkMap.GetRemotePeers()))
|
||||
|
||||
doneOffline := e.phase("offline_peers")
|
||||
e.updateOfflinePeers(networkMap.GetOfflinePeers())
|
||||
doneOffline()
|
||||
|
||||
// Filter out own peer from the remote peers list
|
||||
localPubKey := e.config.WgPrivateKey.PublicKey().String()
|
||||
remotePeers := make([]*mgmProto.RemotePeerConfig, 0, len(networkMap.GetRemotePeers()))
|
||||
for _, p := range networkMap.GetRemotePeers() {
|
||||
if p.GetWgPubKey() != localPubKey {
|
||||
remotePeers = append(remotePeers, p)
|
||||
}
|
||||
}
|
||||
|
||||
// No special case for cleanup: when management signals RemotePeersIsEmpty (e.g. our
|
||||
// peer was deleted), remotePeers is already empty, so the bounded diff below removes
|
||||
// every peer in batches — same path as a normal update, no unbounded removeAllPeers
|
||||
// held under syncMsgMux in one shot.
|
||||
doneRemoved := e.phase("removed_peers")
|
||||
removeMore, err := e.removePeers(remotePeers, maxBatch)
|
||||
doneRemoved()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
doneModified := e.phase("modified_peers")
|
||||
modifyMore, err := e.modifyPeers(remotePeers, maxBatch)
|
||||
doneModified()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
doneAdded := e.phase("added_peers")
|
||||
addMore, err := e.addNewPeers(remotePeers, maxBatch)
|
||||
doneAdded()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// needMore signals the caller to re-run when a peer phase hit its per-pass cap.
|
||||
needMore := removeMore || modifyMore || addMore
|
||||
|
||||
e.statusRecorder.FinishPeerListModifications()
|
||||
|
||||
e.updatePeerSSHHostKeys(remotePeers)
|
||||
|
||||
if err := e.updateSSHClientConfig(remotePeers); err != nil {
|
||||
log.Warnf("failed to update SSH client config: %v", err)
|
||||
}
|
||||
|
||||
e.updateSSHServerAuth(networkMap.GetSshAuth())
|
||||
|
||||
// Set the exclude list only once peers have fully converged (this pass added
|
||||
// the last batch). It needs all target peers present in the store, and
|
||||
// ExcludePeer has replace-semantics — a partial set mid-convergence would be wrong.
|
||||
if !needMore {
|
||||
doneLazy := e.phase("lazy_exclude")
|
||||
excludedLazyPeers := e.toExcludedLazyPeers(e.forwardingRules, remotePeers)
|
||||
e.connMgr.SetExcludeList(e.ctx, excludedLazyPeers)
|
||||
doneLazy()
|
||||
}
|
||||
|
||||
e.networkSerial = serial
|
||||
|
||||
return needMore, nil
|
||||
}
|
||||
|
||||
// applyWholesale applies the cheap, local, idempotent map sections — lazy feature
|
||||
// flag, firewall/legacy management, DNS, routes, ACL filtering, DNS forwarder and
|
||||
// ingress forward rules — that must be in place before peers come up. It runs once
|
||||
// per target (first pass only); the resulting forward rules are stashed in
|
||||
// e.forwardingRules for the lazy-exclude finalize on the peer-converged pass.
|
||||
func (e *Engine) applyWholesale(networkMap *mgmProto.NetworkMap, serial uint64) {
|
||||
if err := e.connMgr.UpdatedRemoteFeatureFlag(e.ctx, networkMap.GetPeerConfig().GetLazyConnectionEnabled()); err != nil {
|
||||
log.Errorf("failed to update lazy connection feature flag: %v", err)
|
||||
}
|
||||
@@ -1431,84 +1559,7 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
log.Errorf("failed to update forward rules, err: %v", err)
|
||||
}
|
||||
done()
|
||||
|
||||
log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(networkMap.GetRemotePeers()))
|
||||
|
||||
done = e.phase("offline_peers")
|
||||
e.updateOfflinePeers(networkMap.GetOfflinePeers())
|
||||
done()
|
||||
|
||||
remotePeers, err := e.reconcilePeers(networkMap)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// must set the exclude list after the peers are added. Without it the manager can not figure out the peers parameters from the store
|
||||
done = e.phase("lazy_exclude")
|
||||
excludedLazyPeers := e.toExcludedLazyPeers(forwardingRules, remotePeers)
|
||||
e.connMgr.SetExcludeList(e.ctx, excludedLazyPeers)
|
||||
done()
|
||||
|
||||
e.networkSerial = serial
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// reconcilePeers applies the remote peer list from the network map (removing,
|
||||
// modifying and adding peers, then updating SSH config) and returns the remote
|
||||
// peers with our own peer filtered out, for use by later sync steps.
|
||||
func (e *Engine) reconcilePeers(networkMap *mgmProto.NetworkMap) ([]*mgmProto.RemotePeerConfig, error) {
|
||||
// Filter out own peer from the remote peers list
|
||||
localPubKey := e.config.WgPrivateKey.PublicKey().String()
|
||||
remotePeers := make([]*mgmProto.RemotePeerConfig, 0, len(networkMap.GetRemotePeers()))
|
||||
for _, p := range networkMap.GetRemotePeers() {
|
||||
if p.GetWgPubKey() != localPubKey {
|
||||
remotePeers = append(remotePeers, p)
|
||||
}
|
||||
}
|
||||
|
||||
// cleanup request, most likely our peer has been deleted
|
||||
if networkMap.GetRemotePeersIsEmpty() {
|
||||
err := e.removeAllPeers()
|
||||
e.statusRecorder.FinishPeerListModifications()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return remotePeers, nil
|
||||
}
|
||||
|
||||
done := e.phase("removed_peers")
|
||||
err := e.removePeers(remotePeers)
|
||||
done()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
done = e.phase("modified_peers")
|
||||
err = e.modifyPeers(remotePeers)
|
||||
done()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
done = e.phase("added_peers")
|
||||
err = e.addNewPeers(remotePeers)
|
||||
done()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
e.statusRecorder.FinishPeerListModifications()
|
||||
|
||||
e.updatePeerSSHHostKeys(remotePeers)
|
||||
|
||||
if err := e.updateSSHClientConfig(remotePeers); err != nil {
|
||||
log.Warnf("failed to update SSH client config: %v", err)
|
||||
}
|
||||
|
||||
e.updateSSHServerAuth(networkMap.GetSshAuth())
|
||||
|
||||
return remotePeers, nil
|
||||
e.forwardingRules = forwardingRules
|
||||
}
|
||||
|
||||
func toDNSFeatureFlag(networkMap *mgmProto.NetworkMap) bool {
|
||||
@@ -1688,14 +1739,23 @@ func addrToString(addr netip.Addr) string {
|
||||
}
|
||||
|
||||
// addNewPeers adds peers that were not know before but arrived from the Management service with the update
|
||||
func (e *Engine) addNewPeers(peersUpdate []*mgmProto.RemotePeerConfig) error {
|
||||
// addNewPeers adds up to maxBatch not-yet-present peers per call. It returns true
|
||||
// when more new peers remained than the cap, so the caller re-runs.
|
||||
func (e *Engine) addNewPeers(peersUpdate []*mgmProto.RemotePeerConfig, maxBatch int) (bool, error) {
|
||||
added := 0
|
||||
for _, p := range peersUpdate {
|
||||
err := e.addNewPeer(p)
|
||||
if err != nil {
|
||||
return err
|
||||
if _, ok := e.peerStore.PeerConn(p.GetWgPubKey()); ok {
|
||||
continue // already present (cheap skip), does not count toward the cap
|
||||
}
|
||||
if added >= maxBatch {
|
||||
return true, nil // at least one more new peer remains
|
||||
}
|
||||
if err := e.addNewPeer(p); err != nil {
|
||||
return false, err
|
||||
}
|
||||
added++
|
||||
}
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// addNewPeer add peer if connection doesn't exist
|
||||
@@ -1988,6 +2048,7 @@ func (e *Engine) readInitialSettings() ([]*route.Route, *nbdns.Config, bool, err
|
||||
e.config.BlockLANAccess,
|
||||
e.config.BlockInbound,
|
||||
e.config.DisableIPv6,
|
||||
e.config.LazyConnectionEnabled,
|
||||
e.config.EnableSSHRoot,
|
||||
e.config.EnableSSHSFTP,
|
||||
e.config.EnableSSHLocalPortForwarding,
|
||||
|
||||
@@ -124,7 +124,7 @@ func TestEngine_SSH(t *testing.T) {
|
||||
RemotePeersIsEmpty: false,
|
||||
}
|
||||
|
||||
err = engine.updateNetworkMap(networkMap)
|
||||
_, err = engine.updateNetworkMap(networkMap, maxPeersPerSyncPass, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Nil(t, engine.sshServer)
|
||||
@@ -146,7 +146,7 @@ func TestEngine_SSH(t *testing.T) {
|
||||
RemotePeersIsEmpty: false,
|
||||
}
|
||||
|
||||
err = engine.updateNetworkMap(networkMap)
|
||||
_, err = engine.updateNetworkMap(networkMap, maxPeersPerSyncPass, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(250 * time.Millisecond)
|
||||
@@ -159,7 +159,7 @@ func TestEngine_SSH(t *testing.T) {
|
||||
RemotePeersIsEmpty: false,
|
||||
}
|
||||
|
||||
err = engine.updateNetworkMap(networkMap)
|
||||
_, err = engine.updateNetworkMap(networkMap, maxPeersPerSyncPass, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
// time.Sleep(250 * time.Millisecond)
|
||||
@@ -174,7 +174,7 @@ func TestEngine_SSH(t *testing.T) {
|
||||
RemotePeersIsEmpty: false,
|
||||
}
|
||||
|
||||
err = engine.updateNetworkMap(networkMap)
|
||||
_, err = engine.updateNetworkMap(networkMap, maxPeersPerSyncPass, true)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Nil(t, engine.sshServer)
|
||||
|
||||
@@ -437,7 +437,7 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
|
||||
|
||||
for _, c := range []testCase{case1, case2, case3, case4, case5, case6} {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
err = engine.updateNetworkMap(c.networkMap)
|
||||
_, err = engine.updateNetworkMap(c.networkMap, maxPeersPerSyncPass, true)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
@@ -464,6 +464,47 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// chunked apply: with a per-pass cap smaller than the number of peers, a
|
||||
// single updateNetworkMap applies one batch and reports more==true; the
|
||||
// caller re-runs until convergence. (engine currently holds 0 peers.)
|
||||
t.Run("chunked add converges over multiple passes", func(t *testing.T) {
|
||||
nm := &mgmtProto.NetworkMap{
|
||||
Serial: 6,
|
||||
RemotePeers: []*mgmtProto.RemotePeerConfig{peer1, peer2, peer3},
|
||||
}
|
||||
|
||||
more, err := engine.updateNetworkMap(nm, 1, true)
|
||||
require.NoError(t, err)
|
||||
require.True(t, more, "pass 1 should signal more")
|
||||
require.Len(t, engine.peerStore.PeersPubKey(), 1)
|
||||
|
||||
more, err = engine.updateNetworkMap(nm, 1, false)
|
||||
require.NoError(t, err)
|
||||
require.True(t, more, "pass 2 should signal more")
|
||||
require.Len(t, engine.peerStore.PeersPubKey(), 2)
|
||||
|
||||
more, err = engine.updateNetworkMap(nm, 1, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, more, "pass 3 should converge")
|
||||
require.Len(t, engine.peerStore.PeersPubKey(), 3)
|
||||
})
|
||||
|
||||
t.Run("chunked remove converges over multiple passes", func(t *testing.T) {
|
||||
nm := &mgmtProto.NetworkMap{
|
||||
Serial: 7,
|
||||
RemotePeers: []*mgmtProto.RemotePeerConfig{peer1}, // remove peer2, peer3
|
||||
}
|
||||
|
||||
more, err := engine.updateNetworkMap(nm, 1, true)
|
||||
require.NoError(t, err)
|
||||
require.True(t, more, "pass 1 should signal more (2 to remove, cap 1)")
|
||||
|
||||
more, err = engine.updateNetworkMap(nm, 1, false)
|
||||
require.NoError(t, err)
|
||||
require.False(t, more, "pass 2 should converge")
|
||||
require.Len(t, engine.peerStore.PeersPubKey(), 1)
|
||||
})
|
||||
}
|
||||
|
||||
func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) {
|
||||
@@ -634,7 +675,7 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
err = engine.updateNetworkMap(testCase.networkMap)
|
||||
_, err = engine.updateNetworkMap(testCase.networkMap, maxPeersPerSyncPass, true)
|
||||
assert.NoError(t, err, "shouldn't return error")
|
||||
assert.Equal(t, testCase.expectedSerial, input.inputSerial, "serial should match")
|
||||
assert.Len(t, input.clientRoutes, testCase.expectedLen, "clientRoutes len should match")
|
||||
@@ -838,7 +879,7 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) {
|
||||
}
|
||||
}()
|
||||
|
||||
err = engine.updateNetworkMap(testCase.networkMap)
|
||||
_, err = engine.updateNetworkMap(testCase.networkMap, maxPeersPerSyncPass, true)
|
||||
assert.NoError(t, err, "shouldn't return error")
|
||||
assert.Equal(t, testCase.expectedSerial, input.inputSerial, "serial should match")
|
||||
assert.Len(t, input.inputNSGroups, testCase.expectedZonesLen, "zones len should match")
|
||||
|
||||
@@ -3,57 +3,24 @@ package lazyconn
|
||||
import (
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
EnvLazyConn = "NB_LAZY_CONN"
|
||||
EnvEnableLazyConn = "NB_ENABLE_EXPERIMENTAL_LAZY_CONN"
|
||||
EnvInactivityThreshold = "NB_LAZY_CONN_INACTIVITY_THRESHOLD"
|
||||
)
|
||||
|
||||
// State is the tri-state local override for lazy connections read from the environment.
|
||||
type State int
|
||||
|
||||
const (
|
||||
// StateUnset means no local override; defer to the management feature flag.
|
||||
StateUnset State = iota
|
||||
// StateOn forces lazy connections on, overriding management.
|
||||
StateOn
|
||||
// StateOff forces lazy connections off, overriding management.
|
||||
StateOff
|
||||
)
|
||||
|
||||
// EnvState reads NB_LAZY_CONN and returns the local override state.
|
||||
func EnvState() State {
|
||||
return ParseState(os.Getenv(EnvLazyConn))
|
||||
}
|
||||
|
||||
// ParseState interprets a lazy-connection override value (from the environment or an MDM
|
||||
// policy). It accepts the on/off aliases plus any value strconv.ParseBool understands
|
||||
// (true/false/1/0). An empty or unrecognized value returns StateUnset so that the
|
||||
// management feature flag remains in control.
|
||||
func ParseState(raw string) State {
|
||||
if raw == "" {
|
||||
return StateUnset
|
||||
func IsLazyConnEnabledByEnv() bool {
|
||||
val := os.Getenv(EnvEnableLazyConn)
|
||||
if val == "" {
|
||||
return false
|
||||
}
|
||||
|
||||
normalized := strings.ToLower(strings.TrimSpace(raw))
|
||||
switch normalized {
|
||||
case "on":
|
||||
return StateOn
|
||||
case "off":
|
||||
return StateOff
|
||||
}
|
||||
|
||||
enabled, err := strconv.ParseBool(normalized)
|
||||
enabled, err := strconv.ParseBool(val)
|
||||
if err != nil {
|
||||
log.Warnf("failed to parse %s value %q: %v", EnvLazyConn, raw, err)
|
||||
return StateUnset
|
||||
log.Warnf("failed to parse %s: %v", EnvEnableLazyConn, err)
|
||||
return false
|
||||
}
|
||||
if enabled {
|
||||
return StateOn
|
||||
}
|
||||
return StateOff
|
||||
return enabled
|
||||
}
|
||||
|
||||
@@ -1,45 +0,0 @@
|
||||
package lazyconn
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestEnvState(t *testing.T) {
|
||||
tests := []struct {
|
||||
value string
|
||||
set bool
|
||||
want State
|
||||
}{
|
||||
{set: false, want: StateUnset},
|
||||
{value: "", set: true, want: StateUnset},
|
||||
{value: "on", set: true, want: StateOn},
|
||||
{value: "ON", set: true, want: StateOn},
|
||||
{value: "true", set: true, want: StateOn},
|
||||
{value: "1", set: true, want: StateOn},
|
||||
{value: " on ", set: true, want: StateOn},
|
||||
{value: "off", set: true, want: StateOff},
|
||||
{value: "OFF", set: true, want: StateOff},
|
||||
{value: "false", set: true, want: StateOff},
|
||||
{value: "0", set: true, want: StateOff},
|
||||
{value: "auto", set: true, want: StateUnset},
|
||||
{value: "garbage", set: true, want: StateUnset},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
name := tt.value
|
||||
if !tt.set {
|
||||
name = "unset"
|
||||
}
|
||||
t.Run(name, func(t *testing.T) {
|
||||
t.Setenv(EnvLazyConn, tt.value)
|
||||
if !tt.set {
|
||||
os.Unsetenv(EnvLazyConn)
|
||||
}
|
||||
|
||||
if got := EnvState(); got != tt.want {
|
||||
t.Fatalf("EnvState() = %v, want %v", got, tt.want)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
206
client/internal/mapsync.go
Normal file
206
client/internal/mapsync.go
Normal file
@@ -0,0 +1,206 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/protobuf/proto"
|
||||
|
||||
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
|
||||
)
|
||||
|
||||
// mapStateManager is the single read/write point between the management stream
|
||||
// (writes) and the convergence loop (reads/applies).
|
||||
//
|
||||
// The stream calls SetTarget with the latest full SyncResponse — the complete
|
||||
// desired state. A single background goroutine (run) applies it to the engine in
|
||||
// bounded passes via apply() until converged, releasing syncMsgMux between passes
|
||||
// so other subsystems interleave. If a newer update arrives mid-flight, the loop
|
||||
// coalesces: it keeps converging toward the latest target and the intermediate one
|
||||
// is SKIPPED — never applied on its own (logged, no onConverged).
|
||||
//
|
||||
// Convergence is a single comparison: appliedGen == targetGen. targetGen
|
||||
// increments on every SetTarget (an internal generation counter, so it also covers
|
||||
// config-only updates that carry no network-map serial).
|
||||
//
|
||||
// onConverged fires once for each — and only each — map that is actually processed
|
||||
// (i.e. converged as the target). Skipped/superseded maps and dropped-on-error maps
|
||||
// do NOT fire it. So "sync finished in X" / RecordSyncDuration always corresponds
|
||||
// to a real, completed alignment.
|
||||
type mapStateManager struct {
|
||||
// apply performs one bounded apply pass and reports whether more passes are needed.
|
||||
// firstPass is true on the first pass of a given target, so the caller can run
|
||||
// wholesale (firewall/routes/DNS/forward-rules) once per target and skip it on the
|
||||
// re-runs that only drain the bounded peer batches. The manager owns this signal
|
||||
// because it owns the convergence boundary; the engine need not track serials for it.
|
||||
apply func(update *mgmProto.SyncResponse, firstPass bool) (bool, error)
|
||||
// onConverged is called once per processed map, with the elapsed time since that
|
||||
// map was received (for the sync-duration metric / "sync finished" log).
|
||||
onConverged func(time.Duration)
|
||||
// persist snapshots an update to disk for restore-on-restart. Called once per
|
||||
// update received from management (in SetTarget), including ones later coalesced
|
||||
// or skipped from apply, so the on-disk state mirrors what management last sent.
|
||||
// The impl skips config-only updates (nil NetworkMap). May be nil.
|
||||
persist func(*mgmProto.SyncResponse)
|
||||
|
||||
mu sync.Mutex
|
||||
target *mgmProto.SyncResponse
|
||||
targetGen uint64
|
||||
appliedGen uint64
|
||||
targetSetAt time.Time
|
||||
|
||||
wake chan struct{}
|
||||
}
|
||||
|
||||
func newMapStateManager(apply func(update *mgmProto.SyncResponse, firstPass bool) (bool, error), persist func(*mgmProto.SyncResponse), onConverged func(time.Duration)) *mapStateManager {
|
||||
return &mapStateManager{
|
||||
apply: apply,
|
||||
persist: persist,
|
||||
onConverged: onConverged,
|
||||
wake: make(chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
// SetTarget records the latest update as the desired state and wakes the loop.
|
||||
// It returns immediately; convergence happens in the background. Serial-based
|
||||
// staleness of the network map is still enforced inside apply (updateNetworkMap).
|
||||
func (m *mapStateManager) SetTarget(update *mgmProto.SyncResponse) error {
|
||||
m.mu.Lock()
|
||||
// A target that has not settled yet (targetGen > appliedGen) is being superseded
|
||||
// before it converged: we coalesce to the latest map and never apply this one on
|
||||
// its own. It is SKIPPED — logged here, and it will not fire onConverged.
|
||||
if m.target != nil && m.targetGen > m.appliedGen {
|
||||
log.Debugf("sync map (gen %d) superseded before convergence, skipping", m.targetGen)
|
||||
}
|
||||
m.target = m.mergeTarget(m.target, update)
|
||||
// Bump an internal generation counter, NOT the map serial: config-only updates
|
||||
// (relay token rotation, STUN/TURN) arrive with NetworkMap == nil and carry no
|
||||
// serial, yet must still be applied. Every SetTarget is therefore a distinct
|
||||
// target regardless of payload. Map-serial staleness is enforced separately
|
||||
// inside apply (updateNetworkMap).
|
||||
m.targetGen++
|
||||
m.targetSetAt = time.Now()
|
||||
m.mu.Unlock()
|
||||
|
||||
select {
|
||||
case m.wake <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
// Persist every update received from management — once per update (not per apply
|
||||
// pass), and including ones that get coalesced/skipped from apply, so the on-disk
|
||||
// state always reflects the latest map management sent. Done after waking the loop
|
||||
// so convergence can start in parallel with the disk write. The persist impl skips
|
||||
// config-only updates (nil NetworkMap).
|
||||
if m.persist != nil {
|
||||
m.persist(update)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// mergeTarget combines the currently pending target with a freshly received update
|
||||
// and returns the new desired state. It is called under m.mu from SetTarget and is
|
||||
// the single seam where the replace-vs-squash decision lives.
|
||||
//
|
||||
// Today management always sends a FULL map (the complete desired state), so the
|
||||
// update simply replaces whatever was pending — prev is ignored. When management
|
||||
// starts sending incremental/delta updates, squash `update` onto `prev` here; the
|
||||
// rest of the manager (generation tracking, convergence, signaling) is unaffected
|
||||
// because it already treats target as "the complete desired state, whatever it is".
|
||||
func (m *mapStateManager) mergeTarget(prev, update *mgmProto.SyncResponse) *mgmProto.SyncResponse {
|
||||
// Plain replace unless a config-only update (no map) arrives while the pending
|
||||
// target still has an unconverged map (targetGen > appliedGen). In that case graft
|
||||
// the pending map (and its checks) onto the incoming config update, so the next pass
|
||||
// keeps converging the map instead of settling on a nil-map target and stranding the
|
||||
// remaining peer batches until another full map arrives.
|
||||
if update == nil || update.GetNetworkMap() != nil || prev == nil || prev.GetNetworkMap() == nil || m.targetGen == m.appliedGen {
|
||||
return update
|
||||
}
|
||||
|
||||
merged, ok := proto.Clone(update).(*mgmProto.SyncResponse)
|
||||
if !ok {
|
||||
return update
|
||||
}
|
||||
merged.NetworkMap = prev.GetNetworkMap()
|
||||
merged.Checks = prev.Checks
|
||||
return merged
|
||||
}
|
||||
|
||||
// run drives convergence until ctx is done. It is meant to run in its own goroutine.
|
||||
func (m *mapStateManager) run(ctx context.Context) {
|
||||
// passGen is the generation of the most recent apply() call (0 = none). A pass is
|
||||
// the first for its target when its generation differs from the previous one —
|
||||
// true on a fresh target and on a coalesced switch to a newer target mid-flight.
|
||||
var passGen uint64
|
||||
for {
|
||||
m.mu.Lock()
|
||||
target, tg, ag := m.target, m.targetGen, m.appliedGen
|
||||
m.mu.Unlock()
|
||||
|
||||
// Fully converged (or nothing yet): block until a new target arrives.
|
||||
if target == nil || ag == tg {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-m.wake:
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
firstPass := tg != passGen
|
||||
passGen = tg
|
||||
more, err := m.apply(target, firstPass)
|
||||
if err != nil {
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
// Log and DROP this target — do not retry it. A deterministic failure
|
||||
// (e.g. a malformed peer in the map) would otherwise spin every pass
|
||||
// making no progress. Management is the source of truth and re-delivers
|
||||
// the full map on the next sync, so dropping is safe; peers already
|
||||
// applied this convergence stay (idempotent diffs) and the remainder is
|
||||
// reconciled by the next target. Mirrors the legacy handleSync path,
|
||||
// where the apply error was logged by the gRPC client and the update
|
||||
// dropped. No onConverged: this target did not converge.
|
||||
log.Errorf("apply sync pass, dropping update: %v", err)
|
||||
m.settle(tg, false)
|
||||
continue
|
||||
}
|
||||
|
||||
if more {
|
||||
// keep converging the current target; syncMsgMux was released by apply
|
||||
// between passes so other subsystems interleave.
|
||||
continue
|
||||
}
|
||||
|
||||
// This pass converged. Mark applied and signal this one map.
|
||||
m.settle(tg, true)
|
||||
// if a newer target arrived mid-pass, settle is a no-op (targetGen != tg) and
|
||||
// ag<tg next iteration -> apply it; this generation was skipped (logged in
|
||||
// SetTarget) and is not signaled.
|
||||
}
|
||||
}
|
||||
|
||||
// settle marks generation tg as processed so the loop goes idle instead of
|
||||
// re-applying the same target. It is a no-op when a newer target arrived during the
|
||||
// pass (targetGen != tg), leaving appliedGen behind so that target re-applies — the
|
||||
// just-finished generation was already counted as skipped.
|
||||
//
|
||||
// When signal is true (the pass converged) it fires onConverged once for this map;
|
||||
// when false (the target was dropped on error) it does not — the map did not converge.
|
||||
func (m *mapStateManager) settle(tg uint64, signal bool) {
|
||||
m.mu.Lock()
|
||||
if m.targetGen != tg {
|
||||
m.mu.Unlock()
|
||||
return
|
||||
}
|
||||
m.appliedGen = tg
|
||||
setAt := m.targetSetAt
|
||||
m.mu.Unlock()
|
||||
|
||||
if signal && m.onConverged != nil {
|
||||
m.onConverged(time.Since(setAt))
|
||||
}
|
||||
}
|
||||
270
client/internal/mapsync_test.go
Normal file
270
client/internal/mapsync_test.go
Normal file
@@ -0,0 +1,270 @@
|
||||
package internal
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
|
||||
)
|
||||
|
||||
// a config-only update arriving while a full map is still converging must keep the
|
||||
// pending map (so its remaining peer batches still apply); once converged or when the
|
||||
// pending target has no map, it replaces as usual.
|
||||
func TestMapStateManager_MergeTargetPreservesPendingMap(t *testing.T) {
|
||||
m := newMapStateManager(nil, nil, nil)
|
||||
|
||||
fullMap := &mgmProto.SyncResponse{NetworkMap: &mgmProto.NetworkMap{Serial: 5}}
|
||||
configOnly := &mgmProto.SyncResponse{NetbirdConfig: &mgmProto.NetbirdConfig{}}
|
||||
|
||||
// still converging the full map (targetGen > appliedGen): graft the map onto the
|
||||
// incoming config-only update instead of dropping it
|
||||
m.targetGen, m.appliedGen = 5, 4
|
||||
merged := m.mergeTarget(fullMap, configOnly)
|
||||
require.NotNil(t, merged.GetNetworkMap(), "pending map must be preserved")
|
||||
require.EqualValues(t, 5, merged.GetNetworkMap().GetSerial())
|
||||
require.NotNil(t, merged.GetNetbirdConfig(), "new config must be carried")
|
||||
require.NotSame(t, configOnly, merged, "must not mutate the received update in place")
|
||||
|
||||
// already converged (targetGen == appliedGen): nothing pending -> plain replace
|
||||
m.targetGen, m.appliedGen = 5, 5
|
||||
require.Same(t, configOnly, m.mergeTarget(fullMap, configOnly))
|
||||
|
||||
// a full map always replaces
|
||||
newFull := &mgmProto.SyncResponse{NetworkMap: &mgmProto.NetworkMap{Serial: 6}}
|
||||
m.targetGen, m.appliedGen = 5, 4
|
||||
require.Same(t, newFull, m.mergeTarget(fullMap, newFull))
|
||||
}
|
||||
|
||||
// converges over the bounded passes (apply returns more until the 3rd pass),
|
||||
// fires onConverged exactly once, then blocks (no further apply) until a new target.
|
||||
func TestMapStateManager_ConvergesThenStops(t *testing.T) {
|
||||
var passes int32
|
||||
var firstPasses int32
|
||||
converged := make(chan struct{}, 1)
|
||||
|
||||
apply := func(_ *mgmProto.SyncResponse, firstPass bool) (bool, error) {
|
||||
n := atomic.AddInt32(&passes, 1)
|
||||
if firstPass {
|
||||
atomic.AddInt32(&firstPasses, 1)
|
||||
}
|
||||
return n < 3, nil // more on pass 1 and 2, converge on pass 3
|
||||
}
|
||||
m := newMapStateManager(apply, nil, func(time.Duration) { converged <- struct{}{} })
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go m.run(ctx)
|
||||
|
||||
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||
|
||||
select {
|
||||
case <-converged:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("manager did not converge")
|
||||
}
|
||||
require.EqualValues(t, 3, atomic.LoadInt32(&passes))
|
||||
require.EqualValues(t, 1, atomic.LoadInt32(&firstPasses), "firstPass true only on pass 1, false on re-runs of the same target")
|
||||
|
||||
// once converged the loop blocks: no further apply calls
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
require.EqualValues(t, 3, atomic.LoadInt32(&passes), "apply must not run after convergence")
|
||||
}
|
||||
|
||||
// persist runs once per received update (not per apply pass), regardless of how many
|
||||
// bounded passes that target takes to converge.
|
||||
func TestMapStateManager_PersistsOncePerUpdate(t *testing.T) {
|
||||
var passes, persists int32
|
||||
converged := make(chan struct{}, 1)
|
||||
apply := func(_ *mgmProto.SyncResponse, _ bool) (bool, error) {
|
||||
n := atomic.AddInt32(&passes, 1)
|
||||
return n < 3, nil // 3 passes for one target
|
||||
}
|
||||
persist := func(*mgmProto.SyncResponse) { atomic.AddInt32(&persists, 1) }
|
||||
m := newMapStateManager(apply, persist, func(time.Duration) { converged <- struct{}{} })
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go m.run(ctx)
|
||||
|
||||
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||
select {
|
||||
case <-converged:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("did not converge")
|
||||
}
|
||||
require.EqualValues(t, 3, atomic.LoadInt32(&passes))
|
||||
require.EqualValues(t, 1, atomic.LoadInt32(&persists), "persist once per update, not per pass")
|
||||
}
|
||||
|
||||
// every update received from management is persisted — even one that is coalesced /
|
||||
// skipped from apply before it ever converges.
|
||||
func TestMapStateManager_PersistsEveryUpdateIncludingSkipped(t *testing.T) {
|
||||
release := make(chan struct{})
|
||||
var persists int32
|
||||
apply := func(_ *mgmProto.SyncResponse, _ bool) (bool, error) {
|
||||
<-release // hold the first apply so the second update coalesces/skips
|
||||
return false, nil
|
||||
}
|
||||
persist := func(*mgmProto.SyncResponse) { atomic.AddInt32(&persists, 1) }
|
||||
m := newMapStateManager(apply, persist, nil)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go m.run(ctx)
|
||||
|
||||
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{})) // map1 -> apply blocks
|
||||
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{})) // map2 supersedes map1 (skipped from apply)
|
||||
close(release)
|
||||
|
||||
// both updates persisted even though map1 is skipped from apply
|
||||
require.Eventually(t, func() bool { return atomic.LoadInt32(&persists) == 2 }, 2*time.Second, 10*time.Millisecond)
|
||||
}
|
||||
|
||||
// each map that is actually processed (converged before the next arrives) fires
|
||||
// onConverged exactly once — mirroring the legacy per-message handleSync timing.
|
||||
func TestMapStateManager_SignalsEachProcessedMap(t *testing.T) {
|
||||
converged := make(chan struct{}, 8)
|
||||
apply := func(_ *mgmProto.SyncResponse, _ bool) (bool, error) {
|
||||
return false, nil // converge in one pass
|
||||
}
|
||||
m := newMapStateManager(apply, nil, func(time.Duration) { converged <- struct{}{} })
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go m.run(ctx)
|
||||
|
||||
const maps = 3
|
||||
for i := 0; i < maps; i++ {
|
||||
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||
select { // wait for this map to converge before sending the next (no coalescing)
|
||||
case <-converged:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatalf("map %d not signaled", i)
|
||||
}
|
||||
}
|
||||
|
||||
// no extra signals once the stream goes quiet
|
||||
select {
|
||||
case <-converged:
|
||||
t.Fatal("unexpected extra onConverged")
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
|
||||
// a map superseded before it converges is skipped: only the latest (processed) map
|
||||
// fires onConverged, not the skipped one.
|
||||
func TestMapStateManager_SkippedMapNotSignaled(t *testing.T) {
|
||||
release := make(chan struct{})
|
||||
var applies, converged atomic.Int32
|
||||
apply := func(_ *mgmProto.SyncResponse, _ bool) (bool, error) {
|
||||
applies.Add(1)
|
||||
<-release // hold the first apply in-flight so we can queue a newer target
|
||||
return false, nil
|
||||
}
|
||||
m := newMapStateManager(apply, nil, func(time.Duration) { converged.Add(1) })
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go m.run(ctx)
|
||||
|
||||
// map1 is picked up; its apply blocks on release
|
||||
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||
require.Eventually(t, func() bool { return applies.Load() >= 1 }, 2*time.Second, 5*time.Millisecond)
|
||||
|
||||
// map2 supersedes map1 before it settled -> map1 is skipped
|
||||
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||
close(release) // let both applies proceed
|
||||
|
||||
// only the processed (latest) map signals; the skipped one does not
|
||||
require.Eventually(t, func() bool { return converged.Load() == 1 }, 2*time.Second, 10*time.Millisecond)
|
||||
time.Sleep(150 * time.Millisecond)
|
||||
require.EqualValues(t, 1, converged.Load(), "skipped map must not fire onConverged")
|
||||
require.EqualValues(t, 2, applies.Load(), "both targets entered apply (map1 once, map2 once)")
|
||||
}
|
||||
|
||||
// an apply error drops the target: no retry of the same target, no onConverged,
|
||||
// the loop goes idle — and a fresh target is still applied afterwards.
|
||||
func TestMapStateManager_DropsTargetOnError(t *testing.T) {
|
||||
applied := make(chan struct{}, 8)
|
||||
var failNext atomic.Bool
|
||||
failNext.Store(true)
|
||||
|
||||
apply := func(_ *mgmProto.SyncResponse, _ bool) (bool, error) {
|
||||
applied <- struct{}{}
|
||||
if failNext.Load() {
|
||||
return false, errors.New("boom")
|
||||
}
|
||||
return false, nil // converge in one pass
|
||||
}
|
||||
var converged atomic.Int32
|
||||
m := newMapStateManager(apply, nil, func(time.Duration) { converged.Add(1) })
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go m.run(ctx)
|
||||
|
||||
// first target errors -> applied once, then dropped (no retry, no onConverged)
|
||||
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||
select {
|
||||
case <-applied:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("errored target not applied")
|
||||
}
|
||||
select {
|
||||
case <-applied:
|
||||
t.Fatal("errored target must not be retried")
|
||||
case <-time.After(150 * time.Millisecond):
|
||||
}
|
||||
require.EqualValues(t, 0, converged.Load(), "onConverged must not fire on error")
|
||||
|
||||
// a new target is still processed normally and converges
|
||||
failNext.Store(false)
|
||||
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||
select {
|
||||
case <-applied:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("new target after error not applied")
|
||||
}
|
||||
require.Eventually(t, func() bool { return converged.Load() == 1 }, 2*time.Second, 10*time.Millisecond)
|
||||
}
|
||||
|
||||
// a new target after convergence triggers a fresh apply; an idle (converged)
|
||||
// manager does not apply on its own.
|
||||
func TestMapStateManager_ReappliesOnNewTarget(t *testing.T) {
|
||||
applied := make(chan struct{}, 8)
|
||||
apply := func(_ *mgmProto.SyncResponse, _ bool) (bool, error) {
|
||||
applied <- struct{}{}
|
||||
return false, nil // converge in one pass
|
||||
}
|
||||
m := newMapStateManager(apply, nil, nil)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go m.run(ctx)
|
||||
|
||||
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||
select {
|
||||
case <-applied:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("first target not applied")
|
||||
}
|
||||
|
||||
// converged → must stay idle (no spurious apply)
|
||||
select {
|
||||
case <-applied:
|
||||
t.Fatal("unexpected apply while idle/converged")
|
||||
case <-time.After(150 * time.Millisecond):
|
||||
}
|
||||
|
||||
require.NoError(t, m.SetTarget(&mgmProto.SyncResponse{}))
|
||||
select {
|
||||
case <-applied:
|
||||
case <-time.After(2 * time.Second):
|
||||
t.Fatal("new target not applied")
|
||||
}
|
||||
}
|
||||
@@ -101,6 +101,8 @@ type ConfigInput struct {
|
||||
|
||||
DNSLabels domain.List
|
||||
|
||||
LazyConnectionEnabled *bool
|
||||
|
||||
MTU *uint16
|
||||
}
|
||||
|
||||
@@ -178,9 +180,7 @@ type Config struct {
|
||||
|
||||
ClientCertKeyPair *tls.Certificate `json:"-"`
|
||||
|
||||
// LazyConnection is the MDM-managed lazy-connection override ("on"/"off"/"").
|
||||
// Runtime-only: re-derived from MDM policy on each load, never persisted.
|
||||
LazyConnection string `json:"-"`
|
||||
LazyConnectionEnabled bool
|
||||
|
||||
MTU uint16
|
||||
|
||||
@@ -632,6 +632,12 @@ func (config *Config) apply(input ConfigInput) (updated bool, err error) {
|
||||
updated = true
|
||||
}
|
||||
|
||||
if input.LazyConnectionEnabled != nil && *input.LazyConnectionEnabled != config.LazyConnectionEnabled {
|
||||
log.Infof("switching lazy connection to %t", *input.LazyConnectionEnabled)
|
||||
config.LazyConnectionEnabled = *input.LazyConnectionEnabled
|
||||
updated = true
|
||||
}
|
||||
|
||||
if input.MTU != nil && *input.MTU != config.MTU {
|
||||
log.Infof("updating MTU to %d (old value %d)", *input.MTU, config.MTU)
|
||||
config.MTU = *input.MTU
|
||||
@@ -722,11 +728,6 @@ func (config *Config) applyMDMPolicy(policy *mdm.Policy) {
|
||||
log.Warnf("MDM wireguard port %d out of range [1,65535]; keeping previous value", v)
|
||||
}
|
||||
}
|
||||
|
||||
if v, ok := policy.GetString(mdm.KeyLazyConnection); ok {
|
||||
config.LazyConnection = v
|
||||
logApplied(mdm.KeyLazyConnection, v)
|
||||
}
|
||||
}
|
||||
|
||||
// parseURL parses and validates the URL for the named service. The URL
|
||||
|
||||
@@ -38,7 +38,7 @@ func GetEnvKeyNBForceRelay() string {
|
||||
|
||||
// GetEnvKeyNBLazyConn Exports the environment variable for the iOS client
|
||||
func GetEnvKeyNBLazyConn() string {
|
||||
return lazyconn.EnvLazyConn
|
||||
return lazyconn.EnvEnableLazyConn
|
||||
}
|
||||
|
||||
// GetEnvKeyNBInactivityThreshold Exports the environment variable for the iOS client
|
||||
|
||||
@@ -41,10 +41,6 @@ const (
|
||||
// construction — only one mode can be set at a time.
|
||||
KeySplitTunnelMode = "splitTunnelMode"
|
||||
KeySplitTunnelApps = "splitTunnelApps"
|
||||
|
||||
// KeyLazyConnection forces the lazy-connection feature on or off, overriding
|
||||
// the management feature flag. Values: "on" / "off" (absent = defer to management).
|
||||
KeyLazyConnection = "lazyConnection"
|
||||
)
|
||||
|
||||
// Split-tunnel mode literals (KeySplitTunnelMode values).
|
||||
@@ -71,6 +67,7 @@ var boolStringLiterals = map[string]bool{
|
||||
"no": false,
|
||||
}
|
||||
|
||||
|
||||
// Policy holds MDM-managed settings read from the platform source. A nil or
|
||||
// empty Policy means no enforcement is active.
|
||||
type Policy struct {
|
||||
|
||||
@@ -152,6 +152,7 @@ func (s *Server) restartEngineForMDMLocked() error {
|
||||
s.config = config
|
||||
s.statusRecorder.UpdateManagementAddress(config.ManagementURL.String())
|
||||
s.statusRecorder.UpdateRosenpass(config.RosenpassEnabled, config.RosenpassPermissive)
|
||||
s.statusRecorder.UpdateLazyConnection(config.LazyConnectionEnabled)
|
||||
|
||||
ctx, cancel := context.WithCancel(s.rootCtx)
|
||||
s.actCancel = cancel
|
||||
@@ -304,6 +305,7 @@ func setConfigRequestHasConfigOverrides(msg *proto.SetConfigRequest) bool {
|
||||
msg.DisableFirewall != nil ||
|
||||
msg.BlockLanAccess != nil ||
|
||||
msg.DisableNotifications != nil ||
|
||||
msg.LazyConnectionEnabled != nil ||
|
||||
msg.BlockInbound != nil ||
|
||||
msg.DisableIpv6 != nil ||
|
||||
msg.EnableSSHRoot != nil ||
|
||||
@@ -346,6 +348,7 @@ func loginRequestHasConfigOverrides(msg *proto.LoginRequest) bool {
|
||||
msg.BlockLanAccess != nil ||
|
||||
msg.DisableNotifications != nil ||
|
||||
len(msg.DnsLabels) > 0 || msg.CleanDNSLabels ||
|
||||
msg.LazyConnectionEnabled != nil ||
|
||||
msg.BlockInbound != nil
|
||||
}
|
||||
|
||||
|
||||
@@ -214,6 +214,7 @@ func (s *Server) Start() error {
|
||||
|
||||
s.statusRecorder.UpdateManagementAddress(config.ManagementURL.String())
|
||||
s.statusRecorder.UpdateRosenpass(config.RosenpassEnabled, config.RosenpassPermissive)
|
||||
s.statusRecorder.UpdateLazyConnection(config.LazyConnectionEnabled)
|
||||
|
||||
if s.sessionWatcher == nil {
|
||||
s.sessionWatcher = internal.NewSessionWatcher(s.rootCtx, s.statusRecorder)
|
||||
@@ -462,6 +463,7 @@ func (s *Server) setConfigInputFromRequest(msg *proto.SetConfigRequest) (profile
|
||||
config.DisableFirewall = msg.DisableFirewall
|
||||
config.BlockLANAccess = msg.BlockLanAccess
|
||||
config.DisableNotifications = msg.DisableNotifications
|
||||
config.LazyConnectionEnabled = msg.LazyConnectionEnabled
|
||||
config.BlockInbound = msg.BlockInbound
|
||||
config.DisableIPv6 = msg.DisableIpv6
|
||||
config.EnableSSHRoot = msg.EnableSSHRoot
|
||||
@@ -1645,6 +1647,7 @@ func (s *Server) GetConfig(ctx context.Context, req *proto.GetConfigRequest) (*p
|
||||
ServerSSHAllowed: *cfg.ServerSSHAllowed,
|
||||
RosenpassEnabled: cfg.RosenpassEnabled,
|
||||
RosenpassPermissive: cfg.RosenpassPermissive,
|
||||
LazyConnectionEnabled: cfg.LazyConnectionEnabled,
|
||||
BlockInbound: cfg.BlockInbound,
|
||||
DisableNotifications: disableNotifications,
|
||||
NetworkMonitor: networkMonitor,
|
||||
|
||||
@@ -69,41 +69,43 @@ func TestSetConfig_AllFieldsSaved(t *testing.T) {
|
||||
disableFirewall := true
|
||||
blockLANAccess := true
|
||||
disableNotifications := true
|
||||
lazyConnectionEnabled := true
|
||||
blockInbound := true
|
||||
disableIPv6 := true
|
||||
mtu := int64(1280)
|
||||
sshJWTCacheTTL := int32(300)
|
||||
|
||||
req := &proto.SetConfigRequest{
|
||||
ProfileName: profName,
|
||||
Username: currUser.Username,
|
||||
ManagementUrl: "https://new-api.netbird.io:443",
|
||||
AdminURL: "https://new-admin.netbird.io",
|
||||
RosenpassEnabled: &rosenpassEnabled,
|
||||
RosenpassPermissive: &rosenpassPermissive,
|
||||
ServerSSHAllowed: &serverSSHAllowed,
|
||||
InterfaceName: &interfaceName,
|
||||
WireguardPort: &wireguardPort,
|
||||
OptionalPreSharedKey: &preSharedKey,
|
||||
DisableAutoConnect: &disableAutoConnect,
|
||||
NetworkMonitor: &networkMonitor,
|
||||
DisableClientRoutes: &disableClientRoutes,
|
||||
DisableServerRoutes: &disableServerRoutes,
|
||||
DisableDns: &disableDNS,
|
||||
DisableFirewall: &disableFirewall,
|
||||
BlockLanAccess: &blockLANAccess,
|
||||
DisableNotifications: &disableNotifications,
|
||||
BlockInbound: &blockInbound,
|
||||
DisableIpv6: &disableIPv6,
|
||||
NatExternalIPs: []string{"1.2.3.4", "5.6.7.8"},
|
||||
CleanNATExternalIPs: false,
|
||||
CustomDNSAddress: []byte("1.1.1.1:53"),
|
||||
ExtraIFaceBlacklist: []string{"eth1", "eth2"},
|
||||
DnsLabels: []string{"label1", "label2"},
|
||||
CleanDNSLabels: false,
|
||||
DnsRouteInterval: durationpb.New(2 * time.Minute),
|
||||
Mtu: &mtu,
|
||||
SshJWTCacheTTL: &sshJWTCacheTTL,
|
||||
ProfileName: profName,
|
||||
Username: currUser.Username,
|
||||
ManagementUrl: "https://new-api.netbird.io:443",
|
||||
AdminURL: "https://new-admin.netbird.io",
|
||||
RosenpassEnabled: &rosenpassEnabled,
|
||||
RosenpassPermissive: &rosenpassPermissive,
|
||||
ServerSSHAllowed: &serverSSHAllowed,
|
||||
InterfaceName: &interfaceName,
|
||||
WireguardPort: &wireguardPort,
|
||||
OptionalPreSharedKey: &preSharedKey,
|
||||
DisableAutoConnect: &disableAutoConnect,
|
||||
NetworkMonitor: &networkMonitor,
|
||||
DisableClientRoutes: &disableClientRoutes,
|
||||
DisableServerRoutes: &disableServerRoutes,
|
||||
DisableDns: &disableDNS,
|
||||
DisableFirewall: &disableFirewall,
|
||||
BlockLanAccess: &blockLANAccess,
|
||||
DisableNotifications: &disableNotifications,
|
||||
LazyConnectionEnabled: &lazyConnectionEnabled,
|
||||
BlockInbound: &blockInbound,
|
||||
DisableIpv6: &disableIPv6,
|
||||
NatExternalIPs: []string{"1.2.3.4", "5.6.7.8"},
|
||||
CleanNATExternalIPs: false,
|
||||
CustomDNSAddress: []byte("1.1.1.1:53"),
|
||||
ExtraIFaceBlacklist: []string{"eth1", "eth2"},
|
||||
DnsLabels: []string{"label1", "label2"},
|
||||
CleanDNSLabels: false,
|
||||
DnsRouteInterval: durationpb.New(2 * time.Minute),
|
||||
Mtu: &mtu,
|
||||
SshJWTCacheTTL: &sshJWTCacheTTL,
|
||||
}
|
||||
|
||||
_, err = s.SetConfig(ctx, req)
|
||||
@@ -138,6 +140,7 @@ func TestSetConfig_AllFieldsSaved(t *testing.T) {
|
||||
require.Equal(t, blockLANAccess, cfg.BlockLANAccess)
|
||||
require.NotNil(t, cfg.DisableNotifications)
|
||||
require.Equal(t, disableNotifications, *cfg.DisableNotifications)
|
||||
require.Equal(t, lazyConnectionEnabled, cfg.LazyConnectionEnabled)
|
||||
require.Equal(t, blockInbound, cfg.BlockInbound)
|
||||
require.Equal(t, disableIPv6, cfg.DisableIPv6)
|
||||
require.Equal(t, []string{"1.2.3.4", "5.6.7.8"}, cfg.NATExternalIPs)
|
||||
@@ -161,14 +164,13 @@ func verifyAllFieldsCovered(t *testing.T, req *proto.SetConfigRequest) {
|
||||
t.Helper()
|
||||
|
||||
metadataFields := map[string]bool{
|
||||
"state": true, // protobuf internal
|
||||
"sizeCache": true, // protobuf internal
|
||||
"unknownFields": true, // protobuf internal
|
||||
"Username": true, // metadata
|
||||
"ProfileName": true, // metadata
|
||||
"CleanNATExternalIPs": true, // control flag for clearing
|
||||
"CleanDNSLabels": true, // control flag for clearing
|
||||
"LazyConnectionEnabled": true, // deprecated: proto field retained for compat, no longer applied
|
||||
"state": true, // protobuf internal
|
||||
"sizeCache": true, // protobuf internal
|
||||
"unknownFields": true, // protobuf internal
|
||||
"Username": true, // metadata
|
||||
"ProfileName": true, // metadata
|
||||
"CleanNATExternalIPs": true, // control flag for clearing
|
||||
"CleanDNSLabels": true, // control flag for clearing
|
||||
}
|
||||
|
||||
expectedFields := map[string]bool{
|
||||
@@ -188,6 +190,7 @@ func verifyAllFieldsCovered(t *testing.T, req *proto.SetConfigRequest) {
|
||||
"DisableFirewall": true,
|
||||
"BlockLanAccess": true,
|
||||
"DisableNotifications": true,
|
||||
"LazyConnectionEnabled": true,
|
||||
"BlockInbound": true,
|
||||
"DisableIpv6": true,
|
||||
"NatExternalIPs": true,
|
||||
@@ -249,6 +252,7 @@ func TestCLIFlags_MappedToSetConfig(t *testing.T) {
|
||||
"block-lan-access": "BlockLanAccess",
|
||||
"block-inbound": "BlockInbound",
|
||||
"disable-ipv6": "DisableIpv6",
|
||||
"enable-lazy-connection": "LazyConnectionEnabled",
|
||||
"external-ip-map": "NatExternalIPs",
|
||||
"dns-resolver-address": "CustomDNSAddress",
|
||||
"extra-iface-blacklist": "ExtraIFaceBlacklist",
|
||||
@@ -265,8 +269,7 @@ func TestCLIFlags_MappedToSetConfig(t *testing.T) {
|
||||
|
||||
// SetConfigRequest fields that don't have CLI flags (settable only via UI or other means).
|
||||
fieldsWithoutCLIFlags := map[string]bool{
|
||||
"DisableNotifications": true, // Only settable via UI
|
||||
"LazyConnectionEnabled": true, // deprecated: no longer settable (managed by server + NB_LAZY_CONN)
|
||||
"DisableNotifications": true, // Only settable via UI
|
||||
}
|
||||
|
||||
// Get all SetConfigRequest fields to verify our map is complete.
|
||||
|
||||
@@ -74,6 +74,8 @@ type Info struct {
|
||||
BlockInbound bool
|
||||
DisableIPv6 bool
|
||||
|
||||
LazyConnectionEnabled bool
|
||||
|
||||
EnableSSHRoot bool
|
||||
EnableSSHSFTP bool
|
||||
EnableSSHLocalPortForwarding bool
|
||||
@@ -85,7 +87,7 @@ func (i *Info) SetFlags(
|
||||
rosenpassEnabled, rosenpassPermissive bool,
|
||||
serverSSHAllowed *bool,
|
||||
disableClientRoutes, disableServerRoutes,
|
||||
disableDNS, disableFirewall, blockLANAccess, blockInbound, disableIPv6 bool,
|
||||
disableDNS, disableFirewall, blockLANAccess, blockInbound, disableIPv6, lazyConnectionEnabled bool,
|
||||
enableSSHRoot, enableSSHSFTP, enableSSHLocalPortForwarding, enableSSHRemotePortForwarding *bool,
|
||||
disableSSHAuth *bool,
|
||||
) {
|
||||
@@ -103,6 +105,8 @@ func (i *Info) SetFlags(
|
||||
i.BlockInbound = blockInbound
|
||||
i.DisableIPv6 = disableIPv6
|
||||
|
||||
i.LazyConnectionEnabled = lazyConnectionEnabled
|
||||
|
||||
if enableSSHRoot != nil {
|
||||
i.EnableSSHRoot = *enableSSHRoot
|
||||
}
|
||||
|
||||
@@ -266,6 +266,7 @@ type serviceClient struct {
|
||||
mAllowSSH *systray.MenuItem
|
||||
mAutoConnect *systray.MenuItem
|
||||
mEnableRosenpass *systray.MenuItem
|
||||
mLazyConnEnabled *systray.MenuItem
|
||||
mBlockInbound *systray.MenuItem
|
||||
mNotifications *systray.MenuItem
|
||||
mAdvancedSettings *systray.MenuItem
|
||||
@@ -335,11 +336,11 @@ type serviceClient struct {
|
||||
// mNetworks + mExitNode submenu items. Combines features.DisableNetworks
|
||||
// AND s.connected — both must be true for the menus to be active.
|
||||
// Zero value (false) matches the Disable() call at AddMenuItem time.
|
||||
networksMenuEnabled bool
|
||||
showNetworks bool
|
||||
wNetworks fyne.Window
|
||||
wProfiles fyne.Window
|
||||
wQuickActions fyne.Window
|
||||
networksMenuEnabled bool
|
||||
showNetworks bool
|
||||
wNetworks fyne.Window
|
||||
wProfiles fyne.Window
|
||||
wQuickActions fyne.Window
|
||||
|
||||
eventManager *event.Manager
|
||||
|
||||
@@ -1093,6 +1094,7 @@ func (s *serviceClient) onTrayReady() {
|
||||
s.mAllowSSH = s.mSettings.AddSubMenuItemCheckbox("Allow SSH", allowSSHMenuDescr, false)
|
||||
s.mAutoConnect = s.mSettings.AddSubMenuItemCheckbox("Connect on Startup", autoConnectMenuDescr, false)
|
||||
s.mEnableRosenpass = s.mSettings.AddSubMenuItemCheckbox("Enable Quantum-Resistance", quantumResistanceMenuDescr, false)
|
||||
s.mLazyConnEnabled = s.mSettings.AddSubMenuItemCheckbox("Enable Lazy Connections", lazyConnMenuDescr, false)
|
||||
s.mBlockInbound = s.mSettings.AddSubMenuItemCheckbox("Block Inbound Connections", blockInboundMenuDescr, false)
|
||||
s.mNotifications = s.mSettings.AddSubMenuItemCheckbox("Notifications", notificationsMenuDescr, false)
|
||||
s.mSettings.AddSeparator()
|
||||
@@ -1576,6 +1578,7 @@ func protoConfigToConfig(cfg *proto.GetConfigResponse) *profilemanager.Config {
|
||||
config.RosenpassEnabled = cfg.RosenpassEnabled
|
||||
config.RosenpassPermissive = cfg.RosenpassPermissive
|
||||
config.DisableNotifications = &cfg.DisableNotifications
|
||||
config.LazyConnectionEnabled = cfg.LazyConnectionEnabled
|
||||
config.BlockInbound = cfg.BlockInbound
|
||||
config.NetworkMonitor = &cfg.NetworkMonitor
|
||||
config.DisableDNS = cfg.DisableDns
|
||||
@@ -1679,6 +1682,12 @@ func (s *serviceClient) loadSettings() {
|
||||
s.mEnableRosenpass.Uncheck()
|
||||
}
|
||||
|
||||
if cfg.LazyConnectionEnabled {
|
||||
s.mLazyConnEnabled.Check()
|
||||
} else {
|
||||
s.mLazyConnEnabled.Uncheck()
|
||||
}
|
||||
|
||||
if cfg.BlockInbound {
|
||||
s.mBlockInbound.Check()
|
||||
} else {
|
||||
@@ -1824,6 +1833,7 @@ func (s *serviceClient) updateConfig() error {
|
||||
disableAutoStart := !s.mAutoConnect.Checked()
|
||||
sshAllowed := s.mAllowSSH.Checked()
|
||||
rosenpassEnabled := s.mEnableRosenpass.Checked()
|
||||
lazyConnectionEnabled := s.mLazyConnEnabled.Checked()
|
||||
blockInbound := s.mBlockInbound.Checked()
|
||||
notificationsDisabled := !s.mNotifications.Checked()
|
||||
|
||||
@@ -1846,13 +1856,14 @@ func (s *serviceClient) updateConfig() error {
|
||||
}
|
||||
|
||||
req := proto.SetConfigRequest{
|
||||
ProfileName: activeProf.ID.String(),
|
||||
Username: currUser.Username,
|
||||
DisableAutoConnect: &disableAutoStart,
|
||||
ServerSSHAllowed: &sshAllowed,
|
||||
RosenpassEnabled: &rosenpassEnabled,
|
||||
BlockInbound: &blockInbound,
|
||||
DisableNotifications: ¬ificationsDisabled,
|
||||
ProfileName: activeProf.ID.String(),
|
||||
Username: currUser.Username,
|
||||
DisableAutoConnect: &disableAutoStart,
|
||||
ServerSSHAllowed: &sshAllowed,
|
||||
RosenpassEnabled: &rosenpassEnabled,
|
||||
LazyConnectionEnabled: &lazyConnectionEnabled,
|
||||
BlockInbound: &blockInbound,
|
||||
DisableNotifications: ¬ificationsDisabled,
|
||||
}
|
||||
|
||||
if _, err := conn.SetConfig(s.ctx, &req); err != nil {
|
||||
|
||||
@@ -4,6 +4,7 @@ const (
|
||||
allowSSHMenuDescr = "Allow SSH connections"
|
||||
autoConnectMenuDescr = "Connect automatically when the service starts"
|
||||
quantumResistanceMenuDescr = "Enable post-quantum security via Rosenpass"
|
||||
lazyConnMenuDescr = "[Experimental] Enable lazy connections"
|
||||
blockInboundMenuDescr = "Block inbound connections to the local machine and routed networks"
|
||||
notificationsMenuDescr = "Enable notifications"
|
||||
advancedSettingsMenuDescr = "Advanced settings of the application"
|
||||
|
||||
@@ -43,6 +43,8 @@ func (h *eventHandler) listen(ctx context.Context) {
|
||||
h.handleAutoConnectClick()
|
||||
case <-h.client.mEnableRosenpass.ClickedCh:
|
||||
h.handleRosenpassClick()
|
||||
case <-h.client.mLazyConnEnabled.ClickedCh:
|
||||
h.handleLazyConnectionClick()
|
||||
case <-h.client.mBlockInbound.ClickedCh:
|
||||
h.handleBlockInboundClick()
|
||||
case <-h.client.mAdvancedSettings.ClickedCh:
|
||||
@@ -150,6 +152,15 @@ func (h *eventHandler) handleRosenpassClick() {
|
||||
}
|
||||
}
|
||||
|
||||
func (h *eventHandler) handleLazyConnectionClick() {
|
||||
h.toggleCheckbox(h.client.mLazyConnEnabled)
|
||||
if err := h.updateConfigWithErr(); err != nil {
|
||||
h.toggleCheckbox(h.client.mLazyConnEnabled) // revert checkbox state on error
|
||||
log.Errorf("failed to update config: %v", err)
|
||||
h.client.notifier.Send("Error", "Failed to update lazy connection settings")
|
||||
}
|
||||
}
|
||||
|
||||
func (h *eventHandler) handleBlockInboundClick() {
|
||||
h.toggleCheckbox(h.client.mBlockInbound)
|
||||
if err := h.updateConfigWithErr(); err != nil {
|
||||
|
||||
@@ -1030,6 +1030,8 @@ func infoToMetaData(info *system.Info) *proto.PeerSystemMeta {
|
||||
BlockLANAccess: info.BlockLANAccess,
|
||||
BlockInbound: info.BlockInbound,
|
||||
DisableIPv6: info.DisableIPv6,
|
||||
|
||||
LazyConnectionEnabled: info.LazyConnectionEnabled,
|
||||
},
|
||||
|
||||
Capabilities: peerCapabilities(*info),
|
||||
|
||||
Reference in New Issue
Block a user