diff --git a/client/firewall/uspfilter/nat.go b/client/firewall/uspfilter/nat.go index 13567872e..597f892cf 100644 --- a/client/firewall/uspfilter/nat.go +++ b/client/firewall/uspfilter/nat.go @@ -358,9 +358,9 @@ func incrementalUpdate(oldChecksum uint16, oldBytes, newBytes []byte) uint16 { // Fast path for IPv4 addresses (4 bytes) - most common case if len(oldBytes) == 4 && len(newBytes) == 4 { sum += uint32(^binary.BigEndian.Uint16(oldBytes[0:2])) - sum += uint32(^binary.BigEndian.Uint16(oldBytes[2:4])) + sum += uint32(^binary.BigEndian.Uint16(oldBytes[2:4])) //nolint:gosec // length checked above sum += uint32(binary.BigEndian.Uint16(newBytes[0:2])) - sum += uint32(binary.BigEndian.Uint16(newBytes[2:4])) + sum += uint32(binary.BigEndian.Uint16(newBytes[2:4])) //nolint:gosec // length checked above } else { // Fallback for other lengths for i := 0; i < len(oldBytes)-1; i += 2 { diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index eb455431d..af6ab3f83 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -410,7 +410,7 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr) } -func (conn *Conn) onICEStateDisconnected() { +func (conn *Conn) onICEStateDisconnected(sessionChanged bool) { conn.mu.Lock() defer conn.mu.Unlock() @@ -430,6 +430,10 @@ func (conn *Conn) onICEStateDisconnected() { if conn.isReadyToUpgrade() { conn.Log.Infof("ICE disconnected, set Relay to active connection") conn.dumpState.SwitchToRelay() + if sessionChanged { + conn.resetEndpoint() + } + conn.wgProxyRelay.Work() presharedKey := conn.presharedKey(conn.rosenpassRemoteKey) @@ -757,6 +761,17 @@ func (conn *Conn) newProxy(remoteConn net.Conn) (wgproxy.Proxy, error) { return wgProxy, nil } +func (conn *Conn) resetEndpoint() { + if !isController(conn.config) { + return + } + conn.Log.Infof("reset wg endpoint") + conn.wgWatcher.Reset() + if err := conn.endpointUpdater.RemoveEndpointAddress(); err != nil { + conn.Log.Warnf("failed to remove endpoint address before update: %v", err) + } +} + func (conn *Conn) isReadyToUpgrade() bool { return conn.wgProxyRelay != nil && conn.currentConnPriority != conntype.Relay } diff --git a/client/internal/peer/endpoint.go b/client/internal/peer/endpoint.go index 52d66159c..372f33ec6 100644 --- a/client/internal/peer/endpoint.go +++ b/client/internal/peer/endpoint.go @@ -66,6 +66,10 @@ func (e *EndpointUpdater) RemoveWgPeer() error { return e.wgConfig.WgInterface.RemovePeer(e.wgConfig.RemoteKey) } +func (e *EndpointUpdater) RemoveEndpointAddress() error { + return e.wgConfig.WgInterface.RemoveEndpointAddress(e.wgConfig.RemoteKey) +} + func (e *EndpointUpdater) waitForCloseTheDelayedUpdate() { if e.cancelFunc == nil { return diff --git a/client/internal/peer/wg_watcher.go b/client/internal/peer/wg_watcher.go index d40ec7a80..799a9375e 100644 --- a/client/internal/peer/wg_watcher.go +++ b/client/internal/peer/wg_watcher.go @@ -32,6 +32,8 @@ type WGWatcher struct { enabled bool muEnabled sync.RWMutex + + resetCh chan struct{} } func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string, stateDump *stateDump) *WGWatcher { @@ -40,6 +42,7 @@ func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey strin wgIfaceStater: wgIfaceStater, peerKey: peerKey, stateDump: stateDump, + resetCh: make(chan struct{}, 1), } } @@ -76,6 +79,15 @@ func (w *WGWatcher) IsEnabled() bool { return w.enabled } +// Reset signals the watcher that the WireGuard peer has been reset and a new +// handshake is expected. This restarts the handshake timeout from scratch. +func (w *WGWatcher) Reset() { + select { + case w.resetCh <- struct{}{}: + default: + } +} + // wgStateCheck help to check the state of the WireGuard handshake and relay connection func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn func(), enabledTime time.Time, initialHandshake time.Time) { w.log.Infof("WireGuard watcher started") @@ -105,6 +117,12 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, onDisconnectedFn w.stateDump.WGcheckSuccess() w.log.Debugf("WireGuard watcher reset timer: %v", resetTime) + case <-w.resetCh: + w.log.Infof("WireGuard watcher received peer reset, restarting handshake timeout") + lastHandshake = time.Time{} + enabledTime = time.Now() + timer.Stop() + timer.Reset(wgHandshakeOvertime) case <-ctx.Done(): w.log.Infof("WireGuard watcher stopped") return diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index 464f57bff..edd70fb20 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -52,8 +52,9 @@ type WorkerICE struct { // increase by one when disconnecting the agent // with it the remote peer can discard the already deprecated offer/answer // Without it the remote peer may recreate a workable ICE connection - sessionID ICESessionID - muxAgent sync.Mutex + sessionID ICESessionID + remoteSessionChanged bool + muxAgent sync.Mutex localUfrag string localPwd string @@ -106,6 +107,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { return } w.log.Debugf("agent already exists, recreate the connection") + w.remoteSessionChanged = true w.agentDialerCancel() if w.agent != nil { if err := w.agent.Close(); err != nil { @@ -306,13 +308,17 @@ func (w *WorkerICE) connect(ctx context.Context, agent *icemaker.ThreadSafeAgent w.conn.onICEConnectionIsReady(selectedPriority(pair), ci) } -func (w *WorkerICE) closeAgent(agent *icemaker.ThreadSafeAgent, cancel context.CancelFunc) { +func (w *WorkerICE) closeAgent(agent *icemaker.ThreadSafeAgent, cancel context.CancelFunc) bool { cancel() if err := agent.Close(); err != nil { w.log.Warnf("failed to close ICE agent: %s", err) } w.muxAgent.Lock() + defer w.muxAgent.Unlock() + + sessionChanged := w.remoteSessionChanged + w.remoteSessionChanged = false if w.agent == agent { // consider to remove from here and move to the OnNewOffer @@ -325,7 +331,7 @@ func (w *WorkerICE) closeAgent(agent *icemaker.ThreadSafeAgent, cancel context.C w.agentConnecting = false w.remoteSessionID = "" } - w.muxAgent.Unlock() + return sessionChanged } func (w *WorkerICE) punchRemoteWGPort(pair *ice.CandidatePair, remoteWgPort int) { @@ -426,11 +432,11 @@ func (w *WorkerICE) onConnectionStateChange(agent *icemaker.ThreadSafeAgent, dia // ice.ConnectionStateClosed happens when we recreate the agent. For the P2P to TURN switch important to // notify the conn.onICEStateDisconnected changes to update the current used priority - w.closeAgent(agent, dialerCancel) + sessionChanged := w.closeAgent(agent, dialerCancel) if w.lastKnownState == ice.ConnectionStateConnected { w.lastKnownState = ice.ConnectionStateDisconnected - w.conn.onICEStateDisconnected() + w.conn.onICEStateDisconnected(sessionChanged) } default: return diff --git a/go.mod b/go.mod index ff9105761..4a8bc3f2b 100644 --- a/go.mod +++ b/go.mod @@ -83,6 +83,7 @@ require ( github.com/pion/stun/v3 v3.1.0 github.com/pion/transport/v3 v3.1.1 github.com/pion/turn/v3 v3.0.1 + github.com/pires/go-proxyproto v0.11.0 github.com/pkg/sftp v1.13.9 github.com/prometheus/client_golang v1.23.2 github.com/quic-go/quic-go v0.55.0 diff --git a/go.sum b/go.sum index 23a12ff68..2a9ad6d70 100644 --- a/go.sum +++ b/go.sum @@ -474,6 +474,8 @@ github.com/pion/turn/v3 v3.0.1 h1:wLi7BTQr6/Q20R0vt/lHbjv6y4GChFtC33nkYbasoT8= github.com/pion/turn/v3 v3.0.1/go.mod h1:MrJDKgqryDyWy1/4NT9TWfXWGMC7UHT6pJIv1+gMeNE= github.com/pion/turn/v4 v4.1.1 h1:9UnY2HB99tpDyz3cVVZguSxcqkJ1DsTSZ+8TGruh4fc= github.com/pion/turn/v4 v4.1.1/go.mod h1:2123tHk1O++vmjI5VSD0awT50NywDAq5A2NNNU4Jjs8= +github.com/pires/go-proxyproto v0.11.0 h1:gUQpS85X/VJMdUsYyEgyn59uLJvGqPhJV5YvG68wXH4= +github.com/pires/go-proxyproto v0.11.0/go.mod h1:ZKAAyp3cgy5Y5Mo4n9AlScrkCZwUy0g3Jf+slqQVcuU= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/idp/dex/provider.go b/idp/dex/provider.go index 6c608dbf5..68fe48486 100644 --- a/idp/dex/provider.go +++ b/idp/dex/provider.go @@ -99,15 +99,16 @@ func NewProvider(ctx context.Context, config *Config) (*Provider, error) { // Build Dex server config - use Dex's types directly dexConfig := server.Config{ - Issuer: issuer, - Storage: stor, - SkipApprovalScreen: true, - SupportedResponseTypes: []string{"code"}, - Logger: logger, - PrometheusRegistry: prometheus.NewRegistry(), - RotateKeysAfter: 6 * time.Hour, - IDTokensValidFor: 24 * time.Hour, - RefreshTokenPolicy: refreshPolicy, + Issuer: issuer, + Storage: stor, + SkipApprovalScreen: true, + SupportedResponseTypes: []string{"code"}, + ContinueOnConnectorFailure: true, + Logger: logger, + PrometheusRegistry: prometheus.NewRegistry(), + RotateKeysAfter: 6 * time.Hour, + IDTokensValidFor: 24 * time.Hour, + RefreshTokenPolicy: refreshPolicy, Web: server.WebConfig{ Issuer: "NetBird", }, @@ -260,6 +261,7 @@ func buildDexConfig(yamlConfig *YAMLConfig, stor storage.Storage, logger *slog.L if len(cfg.SupportedResponseTypes) == 0 { cfg.SupportedResponseTypes = []string{"code"} } + cfg.ContinueOnConnectorFailure = true return cfg } diff --git a/idp/dex/provider_test.go b/idp/dex/provider_test.go index bc34e592f..bd2f676fb 100644 --- a/idp/dex/provider_test.go +++ b/idp/dex/provider_test.go @@ -2,6 +2,7 @@ package dex import ( "context" + "log/slog" "os" "path/filepath" "testing" @@ -195,3 +196,64 @@ enablePasswordDB: true t.Logf("User lookup successful: rawID=%s, connectorID=%s", rawID, connID) } + +func TestNewProvider_ContinueOnConnectorFailure(t *testing.T) { + ctx := context.Background() + + tmpDir, err := os.MkdirTemp("", "dex-connector-failure-*") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + config := &Config{ + Issuer: "http://localhost:5556/dex", + Port: 5556, + DataDir: tmpDir, + } + + provider, err := NewProvider(ctx, config) + require.NoError(t, err) + defer func() { _ = provider.Stop(ctx) }() + + // The provider should have started successfully even though + // ContinueOnConnectorFailure is an internal Dex config field. + // We verify the provider is functional by performing a basic operation. + assert.NotNil(t, provider.dexServer) + assert.NotNil(t, provider.storage) +} + +func TestBuildDexConfig_ContinueOnConnectorFailure(t *testing.T) { + tmpDir, err := os.MkdirTemp("", "dex-build-config-*") + require.NoError(t, err) + defer os.RemoveAll(tmpDir) + + yamlContent := ` +issuer: http://localhost:5556/dex +storage: + type: sqlite3 + config: + file: ` + filepath.Join(tmpDir, "dex.db") + ` +web: + http: 127.0.0.1:5556 +enablePasswordDB: true +` + configPath := filepath.Join(tmpDir, "config.yaml") + err = os.WriteFile(configPath, []byte(yamlContent), 0644) + require.NoError(t, err) + + yamlConfig, err := LoadConfig(configPath) + require.NoError(t, err) + + ctx := context.Background() + stor, err := yamlConfig.Storage.OpenStorage(slog.New(slog.NewTextHandler(os.Stderr, nil))) + require.NoError(t, err) + defer stor.Close() + + err = initializeStorage(ctx, stor, yamlConfig) + require.NoError(t, err) + + logger := slog.New(slog.NewTextHandler(os.Stderr, nil)) + cfg := buildDexConfig(yamlConfig, stor, logger) + + assert.True(t, cfg.ContinueOnConnectorFailure, + "buildDexConfig must set ContinueOnConnectorFailure to true so management starts even if an external IdP is down") +} diff --git a/infrastructure_files/getting-started.sh b/infrastructure_files/getting-started.sh index 864e9af32..dc5d53504 100755 --- a/infrastructure_files/getting-started.sh +++ b/infrastructure_files/getting-started.sh @@ -329,6 +329,9 @@ initialize_default_values() { BIND_LOCALHOST_ONLY="true" EXTERNAL_PROXY_NETWORK="" + # Traefik static IP within the internal bridge network + TRAEFIK_IP="172.30.0.10" + # NetBird Proxy configuration ENABLE_PROXY="false" PROXY_DOMAIN="" @@ -393,7 +396,7 @@ check_existing_installation() { echo "Generated files already exist, if you want to reinitialize the environment, please remove them first." echo "You can use the following commands:" echo " $DOCKER_COMPOSE_COMMAND down --volumes # to remove all containers and volumes" - echo " rm -f docker-compose.yml dashboard.env config.yaml proxy.env nginx-netbird.conf caddyfile-netbird.txt npm-advanced-config.txt" + echo " rm -f docker-compose.yml dashboard.env config.yaml proxy.env traefik-dynamic.yaml nginx-netbird.conf caddyfile-netbird.txt npm-advanced-config.txt" echo "Be aware that this will remove all data from the database, and you will have to reconfigure the dashboard." exit 1 fi @@ -412,6 +415,8 @@ generate_configuration_files() { # This will be overwritten with the actual token after netbird-server starts echo "# Placeholder - will be updated with token after netbird-server starts" > proxy.env echo "NB_PROXY_TOKEN=placeholder" >> proxy.env + # TCP ServersTransport for PROXY protocol v2 to the proxy backend + render_traefik_dynamic > traefik-dynamic.yaml fi ;; 1) @@ -559,10 +564,14 @@ init_environment() { ############################################ render_docker_compose_traefik_builtin() { - # Generate proxy service section if enabled + # Generate proxy service section and Traefik dynamic config if enabled local proxy_service="" local proxy_volumes="" + local traefik_file_provider="" + local traefik_dynamic_volume="" if [[ "$ENABLE_PROXY" == "true" ]]; then + traefik_file_provider=' - "--providers.file.filename=/etc/traefik/dynamic.yaml"' + traefik_dynamic_volume=" - ./traefik-dynamic.yaml:/etc/traefik/dynamic.yaml:ro" proxy_service=" # NetBird Proxy - exposes internal resources to the internet proxy: @@ -570,7 +579,7 @@ render_docker_compose_traefik_builtin() { container_name: netbird-proxy # Hairpin NAT fix: route domain back to traefik's static IP within Docker extra_hosts: - - \"$NETBIRD_DOMAIN:172.30.0.10\" + - \"$NETBIRD_DOMAIN:$TRAEFIK_IP\" ports: - 51820:51820/udp restart: unless-stopped @@ -590,6 +599,7 @@ render_docker_compose_traefik_builtin() { - traefik.tcp.routers.proxy-passthrough.service=proxy-tls - traefik.tcp.routers.proxy-passthrough.priority=1 - traefik.tcp.services.proxy-tls.loadbalancer.server.port=8443 + - traefik.tcp.services.proxy-tls.loadbalancer.serverstransport=pp-v2@file logging: driver: \"json-file\" options: @@ -609,7 +619,7 @@ services: restart: unless-stopped networks: netbird: - ipv4_address: 172.30.0.10 + ipv4_address: $TRAEFIK_IP command: # Logging - "--log.level=INFO" @@ -636,12 +646,14 @@ services: # gRPC transport settings - "--serverstransport.forwardingtimeouts.responseheadertimeout=0s" - "--serverstransport.forwardingtimeouts.idleconntimeout=0s" +$traefik_file_provider ports: - '443:443' - '80:80' volumes: - /var/run/docker.sock:/var/run/docker.sock:ro - netbird_traefik_letsencrypt:/letsencrypt +$traefik_dynamic_volume logging: driver: "json-file" options: @@ -751,6 +763,10 @@ server: cliRedirectURIs: - "http://localhost:53000/" + reverseProxy: + trustedHTTPProxies: + - "$TRAEFIK_IP/32" + store: engine: "sqlite" encryptionKey: "$DATASTORE_ENCRYPTION_KEY" @@ -780,6 +796,17 @@ EOF return 0 } +render_traefik_dynamic() { + cat <<'EOF' +tcp: + serversTransports: + pp-v2: + proxyProtocol: + version: 2 +EOF + return 0 +} + render_proxy_env() { cat < 0 { + ppListener.ConnPolicy = s.proxyProtocolPolicy + } else { + s.Logger.Warn("PROXY protocol enabled without trusted proxies; any source may send PROXY headers") + } + s.Logger.Info("PROXY protocol enabled on listener") + return ppListener +} + +// proxyProtocolPolicy returns whether to require, skip, or reject the PROXY +// header based on whether the connection source is in TrustedProxies. +func (s *Server) proxyProtocolPolicy(opts proxyproto.ConnPolicyOptions) (proxyproto.Policy, error) { + // No logging on reject to prevent abuse + tcpAddr, ok := opts.Upstream.(*net.TCPAddr) + if !ok { + return proxyproto.REJECT, nil + } + addr, ok := netip.AddrFromSlice(tcpAddr.IP) + if !ok { + return proxyproto.REJECT, nil + } + addr = addr.Unmap() + + // called per accept + for _, prefix := range s.TrustedProxies { + if prefix.Contains(addr) { + return proxyproto.REQUIRE, nil + } + } + return proxyproto.IGNORE, nil +} + const ( + defaultHealthAddr = "localhost:8080" + defaultDebugAddr = "localhost:8444" + + // proxyProtoHeaderTimeout is the deadline for reading the PROXY protocol + // header after accepting a connection. + proxyProtoHeaderTimeout = 5 * time.Second + // shutdownPreStopDelay is the time to wait after receiving a shutdown signal // before draining connections. This allows the load balancer to propagate // the endpoint removal. @@ -379,7 +471,12 @@ func (s *Server) gracefulShutdown() { s.Logger.Warnf("https server drain: %v", err) } - // Step 4: Stop all remaining background services. + // Step 4: Close hijacked connections (WebSocket) that Shutdown does not handle. + if n := s.hijackTracker.CloseAll(); n > 0 { + s.Logger.Infof("closed %d hijacked connection(s)", n) + } + + // Step 5: Stop all remaining background services. s.shutdownServices() s.Logger.Info("graceful shutdown complete") } @@ -647,7 +744,7 @@ func (s *Server) protoToMapping(mapping *proto.ProxyMapping) proxy.Mapping { // If addr is empty, it defaults to localhost:8444 for security. func debugEndpointAddr(addr string) string { if addr == "" { - return "localhost:8444" + return defaultDebugAddr } return addr }