mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-16 07:16:38 +00:00
* Refactor WG endpoint setup with role-based proxy activation For relay connections, the controller (initiator) now activates the wgProxy before configuring the WG endpoint, while the non-controller (responder) configures the endpoint first with a delayed update, then activates the proxy after. This prevents the responder from sending traffic through the proxy before WireGuard is ready to receive it, avoiding handshake congestion when both sides try to initiate simultaneously. For ICE connections, pass hasRelayBackup as the setEndpointNow flag so the responder sets the endpoint immediately when a relay fallback exists (avoiding the delayed update path since relay is already available as backup). On ICE disconnect with relay fallback, remove the duplicate wgProxyRelay.Work() calls — the relay proxy is already active from initial setup, so re-activating it is unnecessary. In EndpointUpdater, split ConfigureWGEndpoint into explicit configureAsInitiator and configureAsResponder paths, and add the setEndpointNow parameter to let the caller control whether the responder applies the endpoint immediately or defers it. Add unused SwitchWGEndpoint and RemoveEndpointAddress methods. Remove the wgConfigWorkaround sleep from the relay setup path. * Fix redundant wgProxyRelay.Work() call during relay fallback setup * Simplify WireGuard endpoint configuration by removing unused parameters and redundant logic
882 lines
26 KiB
Go
882 lines
26 KiB
Go
package peer
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math/rand"
|
|
"net"
|
|
"net/netip"
|
|
"runtime"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/pion/ice/v4"
|
|
log "github.com/sirupsen/logrus"
|
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
|
|
|
"github.com/netbirdio/netbird/client/iface/configurer"
|
|
"github.com/netbirdio/netbird/client/iface/wgproxy"
|
|
"github.com/netbirdio/netbird/client/internal/peer/conntype"
|
|
"github.com/netbirdio/netbird/client/internal/peer/dispatcher"
|
|
"github.com/netbirdio/netbird/client/internal/peer/guard"
|
|
icemaker "github.com/netbirdio/netbird/client/internal/peer/ice"
|
|
"github.com/netbirdio/netbird/client/internal/peer/id"
|
|
"github.com/netbirdio/netbird/client/internal/peer/worker"
|
|
"github.com/netbirdio/netbird/client/internal/stdnet"
|
|
"github.com/netbirdio/netbird/route"
|
|
relayClient "github.com/netbirdio/netbird/shared/relay/client"
|
|
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
|
|
)
|
|
|
|
type ServiceDependencies struct {
|
|
StatusRecorder *Status
|
|
Signaler *Signaler
|
|
IFaceDiscover stdnet.ExternalIFaceDiscover
|
|
RelayManager *relayClient.Manager
|
|
SrWatcher *guard.SRWatcher
|
|
Semaphore *semaphoregroup.SemaphoreGroup
|
|
PeerConnDispatcher *dispatcher.ConnectionDispatcher
|
|
}
|
|
|
|
type WgConfig struct {
|
|
WgListenPort int
|
|
RemoteKey string
|
|
WgInterface WGIface
|
|
AllowedIps []netip.Prefix
|
|
PreSharedKey *wgtypes.Key
|
|
}
|
|
|
|
type RosenpassConfig struct {
|
|
// RosenpassPubKey is this peer's Rosenpass public key
|
|
PubKey []byte
|
|
// RosenpassPubKey is this peer's RosenpassAddr server address (IP:port)
|
|
Addr string
|
|
|
|
PermissiveMode bool
|
|
}
|
|
|
|
// ConnConfig is a peer Connection configuration
|
|
type ConnConfig struct {
|
|
// Key is a public key of a remote peer
|
|
Key string
|
|
// LocalKey is a public key of a local peer
|
|
LocalKey string
|
|
|
|
AgentVersion string
|
|
|
|
Timeout time.Duration
|
|
|
|
WgConfig WgConfig
|
|
|
|
LocalWgPort int
|
|
|
|
RosenpassConfig RosenpassConfig
|
|
|
|
// ICEConfig ICE protocol configuration
|
|
ICEConfig icemaker.Config
|
|
}
|
|
|
|
type Conn struct {
|
|
Log *log.Entry
|
|
mu sync.Mutex
|
|
ctx context.Context
|
|
ctxCancel context.CancelFunc
|
|
config ConnConfig
|
|
statusRecorder *Status
|
|
signaler *Signaler
|
|
iFaceDiscover stdnet.ExternalIFaceDiscover
|
|
relayManager *relayClient.Manager
|
|
srWatcher *guard.SRWatcher
|
|
|
|
onConnected func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string)
|
|
onDisconnected func(remotePeer string)
|
|
rosenpassInitializedPresharedKeyValidator func(peerKey string) bool
|
|
|
|
statusRelay *worker.AtomicWorkerStatus
|
|
statusICE *worker.AtomicWorkerStatus
|
|
currentConnPriority conntype.ConnPriority
|
|
opened bool // this flag is used to prevent close in case of not opened connection
|
|
|
|
workerICE *WorkerICE
|
|
workerRelay *WorkerRelay
|
|
|
|
wgWatcher *WGWatcher
|
|
wgWatcherWg sync.WaitGroup
|
|
wgWatcherCancel context.CancelFunc
|
|
|
|
// used to store the remote Rosenpass key for Relayed connection in case of connection update from ice
|
|
rosenpassRemoteKey []byte
|
|
|
|
wgProxyICE wgproxy.Proxy
|
|
wgProxyRelay wgproxy.Proxy
|
|
handshaker *Handshaker
|
|
|
|
guard *guard.Guard
|
|
semaphore *semaphoregroup.SemaphoreGroup
|
|
wg sync.WaitGroup
|
|
|
|
// debug purpose
|
|
dumpState *stateDump
|
|
|
|
endpointUpdater *EndpointUpdater
|
|
}
|
|
|
|
// NewConn creates a new not opened Conn to the remote peer.
|
|
// To establish a connection run Conn.Open
|
|
func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
|
|
if len(config.WgConfig.AllowedIps) == 0 {
|
|
return nil, fmt.Errorf("allowed IPs is empty")
|
|
}
|
|
|
|
connLog := log.WithField("peer", config.Key)
|
|
|
|
dumpState := newStateDump(config.Key, connLog, services.StatusRecorder)
|
|
var conn = &Conn{
|
|
Log: connLog,
|
|
config: config,
|
|
statusRecorder: services.StatusRecorder,
|
|
signaler: services.Signaler,
|
|
iFaceDiscover: services.IFaceDiscover,
|
|
relayManager: services.RelayManager,
|
|
srWatcher: services.SrWatcher,
|
|
semaphore: services.Semaphore,
|
|
statusRelay: worker.NewAtomicStatus(),
|
|
statusICE: worker.NewAtomicStatus(),
|
|
dumpState: dumpState,
|
|
endpointUpdater: NewEndpointUpdater(connLog, config.WgConfig, isController(config)),
|
|
wgWatcher: NewWGWatcher(connLog, config.WgConfig.WgInterface, config.Key, dumpState),
|
|
}
|
|
|
|
return conn, nil
|
|
}
|
|
|
|
// Open opens connection to the remote peer
|
|
// It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will
|
|
// be used.
|
|
func (conn *Conn) Open(engineCtx context.Context) error {
|
|
if err := conn.semaphore.Add(engineCtx); err != nil {
|
|
return err
|
|
}
|
|
|
|
conn.mu.Lock()
|
|
defer conn.mu.Unlock()
|
|
|
|
if conn.opened {
|
|
conn.semaphore.Done()
|
|
return nil
|
|
}
|
|
|
|
conn.ctx, conn.ctxCancel = context.WithCancel(engineCtx)
|
|
|
|
conn.workerRelay = NewWorkerRelay(conn.ctx, conn.Log, isController(conn.config), conn.config, conn, conn.relayManager)
|
|
|
|
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
|
|
workerICE, err := NewWorkerICE(conn.ctx, conn.Log, conn.config, conn, conn.signaler, conn.iFaceDiscover, conn.statusRecorder, relayIsSupportedLocally)
|
|
if err != nil {
|
|
conn.semaphore.Done()
|
|
return err
|
|
}
|
|
conn.workerICE = workerICE
|
|
|
|
conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay)
|
|
|
|
conn.handshaker.AddRelayListener(conn.workerRelay.OnNewOffer)
|
|
if !isForceRelayed() {
|
|
conn.handshaker.AddICEListener(conn.workerICE.OnNewOffer)
|
|
}
|
|
|
|
conn.guard = guard.NewGuard(conn.Log, conn.isConnectedOnAllWay, conn.config.Timeout, conn.srWatcher)
|
|
|
|
conn.wg.Add(1)
|
|
go func() {
|
|
defer conn.wg.Done()
|
|
conn.handshaker.Listen(conn.ctx)
|
|
}()
|
|
go conn.dumpState.Start(conn.ctx)
|
|
|
|
peerState := State{
|
|
PubKey: conn.config.Key,
|
|
ConnStatusUpdate: time.Now(),
|
|
ConnStatus: StatusConnecting,
|
|
Mux: new(sync.RWMutex),
|
|
}
|
|
if err := conn.statusRecorder.UpdatePeerState(peerState); err != nil {
|
|
conn.Log.Warnf("error while updating the state err: %v", err)
|
|
}
|
|
|
|
conn.wg.Add(1)
|
|
go func() {
|
|
defer conn.wg.Done()
|
|
|
|
conn.waitInitialRandomSleepTime(conn.ctx)
|
|
conn.semaphore.Done()
|
|
|
|
conn.guard.Start(conn.ctx, conn.onGuardEvent)
|
|
}()
|
|
conn.opened = true
|
|
return nil
|
|
}
|
|
|
|
// Close closes this peer Conn issuing a close event to the Conn closeCh
|
|
func (conn *Conn) Close(signalToRemote bool) {
|
|
conn.mu.Lock()
|
|
defer conn.wgWatcherWg.Wait()
|
|
defer conn.mu.Unlock()
|
|
|
|
if !conn.opened {
|
|
conn.Log.Debugf("ignore close connection to peer")
|
|
return
|
|
}
|
|
|
|
if signalToRemote {
|
|
if err := conn.signaler.SignalIdle(conn.config.Key); err != nil {
|
|
conn.Log.Errorf("failed to signal idle state to peer: %v", err)
|
|
}
|
|
}
|
|
|
|
conn.Log.Infof("close peer connection")
|
|
conn.ctxCancel()
|
|
|
|
if conn.wgWatcherCancel != nil {
|
|
conn.wgWatcherCancel()
|
|
}
|
|
conn.workerRelay.CloseConn()
|
|
conn.workerICE.Close()
|
|
|
|
if conn.wgProxyRelay != nil {
|
|
err := conn.wgProxyRelay.CloseConn()
|
|
if err != nil {
|
|
conn.Log.Errorf("failed to close wg proxy for relay: %v", err)
|
|
}
|
|
conn.wgProxyRelay = nil
|
|
}
|
|
|
|
if conn.wgProxyICE != nil {
|
|
err := conn.wgProxyICE.CloseConn()
|
|
if err != nil {
|
|
conn.Log.Errorf("failed to close wg proxy for ice: %v", err)
|
|
}
|
|
conn.wgProxyICE = nil
|
|
}
|
|
|
|
if err := conn.endpointUpdater.RemoveWgPeer(); err != nil {
|
|
conn.Log.Errorf("failed to remove wg endpoint: %v", err)
|
|
}
|
|
|
|
if conn.evalStatus() == StatusConnected && conn.onDisconnected != nil {
|
|
conn.onDisconnected(conn.config.WgConfig.RemoteKey)
|
|
}
|
|
|
|
conn.setStatusToDisconnected()
|
|
conn.opened = false
|
|
conn.wg.Wait()
|
|
conn.Log.Infof("peer connection closed")
|
|
}
|
|
|
|
// OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
|
|
// doesn't block, discards the message if connection wasn't ready
|
|
func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) {
|
|
conn.dumpState.RemoteAnswer()
|
|
conn.Log.Infof("OnRemoteAnswer, priority: %s, status ICE: %s, status relay: %s", conn.currentConnPriority, conn.statusICE, conn.statusRelay)
|
|
conn.handshaker.OnRemoteAnswer(answer)
|
|
}
|
|
|
|
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
|
|
func (conn *Conn) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HAMap) {
|
|
conn.dumpState.RemoteCandidate()
|
|
conn.workerICE.OnRemoteCandidate(candidate, haRoutes)
|
|
}
|
|
|
|
// SetOnConnected sets a handler function to be triggered by Conn when a new connection to a remote peer established
|
|
func (conn *Conn) SetOnConnected(handler func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string)) {
|
|
conn.onConnected = handler
|
|
}
|
|
|
|
// SetOnDisconnected sets a handler function to be triggered by Conn when a connection to a remote disconnected
|
|
func (conn *Conn) SetOnDisconnected(handler func(remotePeer string)) {
|
|
conn.onDisconnected = handler
|
|
}
|
|
|
|
// SetRosenpassInitializedPresharedKeyValidator sets a function to check if Rosenpass has taken over
|
|
// PSK management for a peer. When this returns true, presharedKey() returns nil
|
|
// to prevent UpdatePeer from overwriting the Rosenpass-managed PSK.
|
|
func (conn *Conn) SetRosenpassInitializedPresharedKeyValidator(handler func(peerKey string) bool) {
|
|
conn.rosenpassInitializedPresharedKeyValidator = handler
|
|
}
|
|
|
|
func (conn *Conn) OnRemoteOffer(offer OfferAnswer) {
|
|
conn.dumpState.RemoteOffer()
|
|
conn.Log.Infof("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay)
|
|
conn.handshaker.OnRemoteOffer(offer)
|
|
}
|
|
|
|
// WgConfig returns the WireGuard config
|
|
func (conn *Conn) WgConfig() WgConfig {
|
|
return conn.config.WgConfig
|
|
}
|
|
|
|
// IsConnected returns true if the peer is connected
|
|
func (conn *Conn) IsConnected() bool {
|
|
conn.mu.Lock()
|
|
defer conn.mu.Unlock()
|
|
|
|
return conn.evalStatus() == StatusConnected
|
|
}
|
|
|
|
func (conn *Conn) GetKey() string {
|
|
return conn.config.Key
|
|
}
|
|
|
|
func (conn *Conn) ConnID() id.ConnID {
|
|
return id.ConnID(conn)
|
|
}
|
|
|
|
// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
|
|
func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConnInfo ICEConnInfo) {
|
|
conn.mu.Lock()
|
|
defer conn.mu.Unlock()
|
|
|
|
if conn.ctx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
if remoteConnNil(conn.Log, iceConnInfo.RemoteConn) {
|
|
conn.Log.Errorf("remote ICE connection is nil")
|
|
return
|
|
}
|
|
|
|
// this never should happen, because Relay is the lower priority and ICE always close the deprecated connection before upgrade
|
|
// todo consider to remove this check
|
|
if conn.currentConnPriority > priority {
|
|
conn.Log.Infof("current connection priority (%s) is higher than the new one (%s), do not upgrade connection", conn.currentConnPriority, priority)
|
|
conn.statusICE.SetConnected()
|
|
conn.updateIceState(iceConnInfo)
|
|
return
|
|
}
|
|
|
|
conn.Log.Infof("set ICE to active connection")
|
|
conn.dumpState.P2PConnected()
|
|
|
|
var (
|
|
ep *net.UDPAddr
|
|
wgProxy wgproxy.Proxy
|
|
err error
|
|
)
|
|
if iceConnInfo.RelayedOnLocal {
|
|
conn.dumpState.NewLocalProxy()
|
|
wgProxy, err = conn.newProxy(iceConnInfo.RemoteConn)
|
|
if err != nil {
|
|
conn.Log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
|
|
return
|
|
}
|
|
ep = wgProxy.EndpointAddr()
|
|
conn.wgProxyICE = wgProxy
|
|
} else {
|
|
directEp, err := net.ResolveUDPAddr("udp", iceConnInfo.RemoteConn.RemoteAddr().String())
|
|
if err != nil {
|
|
log.Errorf("failed to resolveUDPaddr")
|
|
conn.handleConfigurationFailure(err, nil)
|
|
return
|
|
}
|
|
ep = directEp
|
|
}
|
|
|
|
if conn.wgProxyRelay != nil {
|
|
conn.wgProxyRelay.Pause()
|
|
}
|
|
|
|
if wgProxy != nil {
|
|
wgProxy.Work()
|
|
}
|
|
|
|
conn.Log.Infof("configure WireGuard endpoint to: %s", ep.String())
|
|
conn.enableWgWatcherIfNeeded()
|
|
|
|
presharedKey := conn.presharedKey(iceConnInfo.RosenpassPubKey)
|
|
if err = conn.endpointUpdater.ConfigureWGEndpoint(ep, presharedKey); err != nil {
|
|
conn.handleConfigurationFailure(err, wgProxy)
|
|
return
|
|
}
|
|
wgConfigWorkaround()
|
|
|
|
if conn.wgProxyRelay != nil {
|
|
conn.Log.Debugf("redirect packets from relayed conn to WireGuard")
|
|
conn.wgProxyRelay.RedirectAs(ep)
|
|
}
|
|
|
|
conn.currentConnPriority = priority
|
|
conn.statusICE.SetConnected()
|
|
conn.updateIceState(iceConnInfo)
|
|
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
|
|
}
|
|
|
|
func (conn *Conn) onICEStateDisconnected(sessionChanged bool) {
|
|
conn.mu.Lock()
|
|
defer conn.mu.Unlock()
|
|
|
|
if conn.ctx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
conn.Log.Tracef("ICE connection state changed to disconnected")
|
|
|
|
if conn.wgProxyICE != nil {
|
|
if err := conn.wgProxyICE.CloseConn(); err != nil {
|
|
conn.Log.Warnf("failed to close deprecated wg proxy conn: %v", err)
|
|
}
|
|
}
|
|
|
|
// switch back to relay connection
|
|
if conn.isReadyToUpgrade() {
|
|
conn.Log.Infof("ICE disconnected, set Relay to active connection")
|
|
conn.dumpState.SwitchToRelay()
|
|
if sessionChanged {
|
|
conn.resetEndpoint()
|
|
}
|
|
|
|
// todo consider to move after the ConfigureWGEndpoint
|
|
conn.wgProxyRelay.Work()
|
|
|
|
presharedKey := conn.presharedKey(conn.rosenpassRemoteKey)
|
|
if err := conn.endpointUpdater.SwitchWGEndpoint(conn.wgProxyRelay.EndpointAddr(), presharedKey); err != nil {
|
|
conn.Log.Errorf("failed to switch to relay conn: %v", err)
|
|
}
|
|
|
|
conn.currentConnPriority = conntype.Relay
|
|
} else {
|
|
conn.Log.Infof("ICE disconnected, do not switch to Relay. Reset priority to: %s", conntype.None.String())
|
|
conn.currentConnPriority = conntype.None
|
|
if err := conn.config.WgConfig.WgInterface.RemoveEndpointAddress(conn.config.WgConfig.RemoteKey); err != nil {
|
|
conn.Log.Errorf("failed to remove wg endpoint: %v", err)
|
|
}
|
|
}
|
|
|
|
changed := conn.statusICE.Get() != worker.StatusDisconnected
|
|
if changed {
|
|
conn.guard.SetICEConnDisconnected()
|
|
}
|
|
conn.statusICE.SetDisconnected()
|
|
|
|
conn.disableWgWatcherIfNeeded()
|
|
|
|
peerState := State{
|
|
PubKey: conn.config.Key,
|
|
ConnStatus: conn.evalStatus(),
|
|
Relayed: conn.isRelayed(),
|
|
ConnStatusUpdate: time.Now(),
|
|
}
|
|
if err := conn.statusRecorder.UpdatePeerICEStateToDisconnected(peerState); err != nil {
|
|
conn.Log.Warnf("unable to set peer's state to disconnected ice, got error: %v", err)
|
|
}
|
|
}
|
|
|
|
func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
|
|
conn.mu.Lock()
|
|
defer conn.mu.Unlock()
|
|
|
|
if conn.ctx.Err() != nil {
|
|
if err := rci.relayedConn.Close(); err != nil {
|
|
conn.Log.Warnf("failed to close unnecessary relayed connection: %v", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
conn.dumpState.RelayConnected()
|
|
conn.Log.Debugf("Relay connection has been established, setup the WireGuard")
|
|
|
|
wgProxy, err := conn.newProxy(rci.relayedConn)
|
|
if err != nil {
|
|
conn.Log.Errorf("failed to add relayed net.Conn to local proxy: %v", err)
|
|
return
|
|
}
|
|
wgProxy.SetDisconnectListener(conn.onRelayDisconnected)
|
|
|
|
conn.dumpState.NewLocalProxy()
|
|
|
|
conn.Log.Infof("created new wgProxy for relay connection: %s", wgProxy.EndpointAddr().String())
|
|
|
|
if conn.isICEActive() {
|
|
conn.Log.Debugf("do not switch to relay because current priority is: %s", conn.currentConnPriority.String())
|
|
conn.setRelayedProxy(wgProxy)
|
|
conn.statusRelay.SetConnected()
|
|
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
|
|
return
|
|
}
|
|
|
|
controller := isController(conn.config)
|
|
|
|
if controller {
|
|
wgProxy.Work()
|
|
}
|
|
conn.enableWgWatcherIfNeeded()
|
|
if err := conn.endpointUpdater.ConfigureWGEndpoint(wgProxy.EndpointAddr(), conn.presharedKey(rci.rosenpassPubKey)); err != nil {
|
|
if err := wgProxy.CloseConn(); err != nil {
|
|
conn.Log.Warnf("Failed to close relay connection: %v", err)
|
|
}
|
|
conn.Log.Errorf("Failed to update WireGuard peer configuration: %v", err)
|
|
return
|
|
}
|
|
if !controller {
|
|
wgProxy.Work()
|
|
}
|
|
conn.rosenpassRemoteKey = rci.rosenpassPubKey
|
|
conn.currentConnPriority = conntype.Relay
|
|
conn.statusRelay.SetConnected()
|
|
conn.setRelayedProxy(wgProxy)
|
|
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
|
|
conn.Log.Infof("start to communicate with peer via relay")
|
|
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
|
|
}
|
|
|
|
func (conn *Conn) onRelayDisconnected() {
|
|
conn.mu.Lock()
|
|
defer conn.mu.Unlock()
|
|
conn.handleRelayDisconnectedLocked()
|
|
}
|
|
|
|
// handleRelayDisconnectedLocked handles relay disconnection. Caller must hold conn.mu.
|
|
func (conn *Conn) handleRelayDisconnectedLocked() {
|
|
if conn.ctx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
conn.Log.Debugf("relay connection is disconnected")
|
|
|
|
if conn.currentConnPriority == conntype.Relay {
|
|
conn.Log.Debugf("clean up WireGuard config")
|
|
conn.currentConnPriority = conntype.None
|
|
if err := conn.config.WgConfig.WgInterface.RemoveEndpointAddress(conn.config.WgConfig.RemoteKey); err != nil {
|
|
conn.Log.Errorf("failed to remove wg endpoint: %v", err)
|
|
}
|
|
}
|
|
|
|
if conn.wgProxyRelay != nil {
|
|
_ = conn.wgProxyRelay.CloseConn()
|
|
conn.wgProxyRelay = nil
|
|
}
|
|
|
|
changed := conn.statusRelay.Get() != worker.StatusDisconnected
|
|
if changed {
|
|
conn.guard.SetRelayedConnDisconnected()
|
|
}
|
|
conn.statusRelay.SetDisconnected()
|
|
|
|
conn.disableWgWatcherIfNeeded()
|
|
|
|
peerState := State{
|
|
PubKey: conn.config.Key,
|
|
ConnStatus: conn.evalStatus(),
|
|
Relayed: conn.isRelayed(),
|
|
ConnStatusUpdate: time.Now(),
|
|
}
|
|
if err := conn.statusRecorder.UpdatePeerRelayedStateToDisconnected(peerState); err != nil {
|
|
conn.Log.Warnf("unable to save peer's state to Relay disconnected, got error: %v", err)
|
|
}
|
|
}
|
|
|
|
func (conn *Conn) onGuardEvent() {
|
|
conn.dumpState.SendOffer()
|
|
if err := conn.handshaker.SendOffer(); err != nil {
|
|
conn.Log.Errorf("failed to send offer: %v", err)
|
|
}
|
|
}
|
|
|
|
func (conn *Conn) onWGDisconnected() {
|
|
conn.mu.Lock()
|
|
defer conn.mu.Unlock()
|
|
|
|
if conn.ctx.Err() != nil {
|
|
return
|
|
}
|
|
|
|
conn.Log.Warnf("WireGuard handshake timeout detected, closing current connection")
|
|
|
|
// Close the active connection based on current priority
|
|
switch conn.currentConnPriority {
|
|
case conntype.Relay:
|
|
conn.workerRelay.CloseConn()
|
|
conn.handleRelayDisconnectedLocked()
|
|
case conntype.ICEP2P, conntype.ICETurn:
|
|
conn.workerICE.Close()
|
|
default:
|
|
conn.Log.Debugf("No active connection to close on WG timeout")
|
|
}
|
|
}
|
|
|
|
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte) {
|
|
peerState := State{
|
|
PubKey: conn.config.Key,
|
|
ConnStatusUpdate: time.Now(),
|
|
ConnStatus: conn.evalStatus(),
|
|
Relayed: conn.isRelayed(),
|
|
RelayServerAddress: relayServerAddr,
|
|
RosenpassEnabled: isRosenpassEnabled(rosenpassPubKey),
|
|
}
|
|
|
|
err := conn.statusRecorder.UpdatePeerRelayedState(peerState)
|
|
if err != nil {
|
|
conn.Log.Warnf("unable to save peer's Relay state, got error: %v", err)
|
|
}
|
|
}
|
|
|
|
func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo) {
|
|
peerState := State{
|
|
PubKey: conn.config.Key,
|
|
ConnStatusUpdate: time.Now(),
|
|
ConnStatus: conn.evalStatus(),
|
|
Relayed: iceConnInfo.Relayed,
|
|
LocalIceCandidateType: iceConnInfo.LocalIceCandidateType,
|
|
RemoteIceCandidateType: iceConnInfo.RemoteIceCandidateType,
|
|
LocalIceCandidateEndpoint: iceConnInfo.LocalIceCandidateEndpoint,
|
|
RemoteIceCandidateEndpoint: iceConnInfo.RemoteIceCandidateEndpoint,
|
|
RosenpassEnabled: isRosenpassEnabled(iceConnInfo.RosenpassPubKey),
|
|
}
|
|
|
|
err := conn.statusRecorder.UpdatePeerICEState(peerState)
|
|
if err != nil {
|
|
conn.Log.Warnf("unable to save peer's ICE state, got error: %v", err)
|
|
}
|
|
}
|
|
|
|
func (conn *Conn) setStatusToDisconnected() {
|
|
conn.statusRelay.SetDisconnected()
|
|
conn.statusICE.SetDisconnected()
|
|
conn.currentConnPriority = conntype.None
|
|
|
|
peerState := State{
|
|
PubKey: conn.config.Key,
|
|
ConnStatus: StatusIdle,
|
|
ConnStatusUpdate: time.Now(),
|
|
Mux: new(sync.RWMutex),
|
|
}
|
|
err := conn.statusRecorder.UpdatePeerState(peerState)
|
|
if err != nil {
|
|
// pretty common error because by that time Engine can already remove the peer and status won't be available.
|
|
// todo rethink status updates
|
|
conn.Log.Debugf("error while updating peer's state, err: %v", err)
|
|
}
|
|
if err := conn.statusRecorder.UpdateWireGuardPeerState(conn.config.Key, configurer.WGStats{}); err != nil {
|
|
conn.Log.Debugf("failed to reset wireguard stats for peer: %s", err)
|
|
}
|
|
}
|
|
|
|
func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAddr string) {
|
|
if runtime.GOOS == "ios" {
|
|
runtime.GC()
|
|
}
|
|
|
|
if conn.onConnected != nil {
|
|
conn.onConnected(conn.config.Key, remoteRosenpassPubKey, conn.config.WgConfig.AllowedIps[0].Addr().String(), remoteRosenpassAddr)
|
|
}
|
|
}
|
|
|
|
func (conn *Conn) waitInitialRandomSleepTime(ctx context.Context) {
|
|
maxWait := 300
|
|
duration := time.Duration(rand.Intn(maxWait)) * time.Millisecond
|
|
|
|
timeout := time.NewTimer(duration)
|
|
defer timeout.Stop()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
case <-timeout.C:
|
|
}
|
|
}
|
|
|
|
func (conn *Conn) isRelayed() bool {
|
|
switch conn.currentConnPriority {
|
|
case conntype.Relay, conntype.ICETurn:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (conn *Conn) evalStatus() ConnStatus {
|
|
if conn.statusRelay.Get() == worker.StatusConnected || conn.statusICE.Get() == worker.StatusConnected {
|
|
return StatusConnected
|
|
}
|
|
|
|
return StatusConnecting
|
|
}
|
|
|
|
func (conn *Conn) isConnectedOnAllWay() (connected bool) {
|
|
// would be better to protect this with a mutex, but it could cause deadlock with Close function
|
|
|
|
defer func() {
|
|
if !connected {
|
|
conn.logTraceConnState()
|
|
}
|
|
}()
|
|
|
|
// For JS platform: only relay connection is supported
|
|
if runtime.GOOS == "js" {
|
|
return conn.statusRelay.Get() == worker.StatusConnected
|
|
}
|
|
|
|
// For non-JS platforms: check ICE connection status
|
|
if conn.statusICE.Get() == worker.StatusDisconnected && !conn.workerICE.InProgress() {
|
|
return false
|
|
}
|
|
|
|
// If relay is supported with peer, it must also be connected
|
|
if conn.workerRelay.IsRelayConnectionSupportedWithPeer() {
|
|
if conn.statusRelay.Get() == worker.StatusDisconnected {
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (conn *Conn) enableWgWatcherIfNeeded() {
|
|
if !conn.wgWatcher.IsEnabled() {
|
|
wgWatcherCtx, wgWatcherCancel := context.WithCancel(conn.ctx)
|
|
conn.wgWatcherCancel = wgWatcherCancel
|
|
conn.wgWatcherWg.Add(1)
|
|
go func() {
|
|
defer conn.wgWatcherWg.Done()
|
|
conn.wgWatcher.EnableWgWatcher(wgWatcherCtx, conn.onWGDisconnected)
|
|
}()
|
|
}
|
|
}
|
|
|
|
func (conn *Conn) disableWgWatcherIfNeeded() {
|
|
if conn.currentConnPriority == conntype.None && conn.wgWatcherCancel != nil {
|
|
conn.wgWatcherCancel()
|
|
conn.wgWatcherCancel = nil
|
|
}
|
|
}
|
|
|
|
func (conn *Conn) newProxy(remoteConn net.Conn) (wgproxy.Proxy, error) {
|
|
conn.Log.Debugf("setup proxied WireGuard connection")
|
|
udpAddr := &net.UDPAddr{
|
|
IP: conn.config.WgConfig.AllowedIps[0].Addr().AsSlice(),
|
|
Port: conn.config.WgConfig.WgListenPort,
|
|
}
|
|
|
|
wgProxy := conn.config.WgConfig.WgInterface.GetProxy()
|
|
if err := wgProxy.AddTurnConn(conn.ctx, udpAddr, remoteConn); err != nil {
|
|
conn.Log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
|
|
return nil, err
|
|
}
|
|
return wgProxy, nil
|
|
}
|
|
|
|
func (conn *Conn) resetEndpoint() {
|
|
if !isController(conn.config) {
|
|
return
|
|
}
|
|
conn.Log.Infof("reset wg endpoint")
|
|
conn.wgWatcher.Reset()
|
|
if err := conn.endpointUpdater.RemoveEndpointAddress(); err != nil {
|
|
conn.Log.Warnf("failed to remove endpoint address before update: %v", err)
|
|
}
|
|
}
|
|
|
|
func (conn *Conn) isReadyToUpgrade() bool {
|
|
return conn.wgProxyRelay != nil && conn.currentConnPriority != conntype.Relay
|
|
}
|
|
|
|
func (conn *Conn) isICEActive() bool {
|
|
return (conn.currentConnPriority == conntype.ICEP2P || conn.currentConnPriority == conntype.ICETurn) && conn.statusICE.Get() == worker.StatusConnected
|
|
}
|
|
|
|
func (conn *Conn) handleConfigurationFailure(err error, wgProxy wgproxy.Proxy) {
|
|
conn.Log.Warnf("Failed to update wg peer configuration: %v", err)
|
|
if wgProxy != nil {
|
|
if ierr := wgProxy.CloseConn(); ierr != nil {
|
|
conn.Log.Warnf("Failed to close wg proxy: %v", ierr)
|
|
}
|
|
}
|
|
if conn.wgProxyRelay != nil {
|
|
conn.wgProxyRelay.Work()
|
|
}
|
|
}
|
|
|
|
func (conn *Conn) logTraceConnState() {
|
|
if conn.workerRelay.IsRelayConnectionSupportedWithPeer() {
|
|
conn.Log.Tracef("connectivity guard check, relay state: %s, ice state: %s", conn.statusRelay, conn.statusICE)
|
|
} else {
|
|
conn.Log.Tracef("connectivity guard check, ice state: %s", conn.statusICE)
|
|
}
|
|
}
|
|
|
|
func (conn *Conn) setRelayedProxy(proxy wgproxy.Proxy) {
|
|
if conn.wgProxyRelay != nil {
|
|
if err := conn.wgProxyRelay.CloseConn(); err != nil {
|
|
conn.Log.Warnf("failed to close deprecated wg proxy conn: %v", err)
|
|
}
|
|
}
|
|
conn.wgProxyRelay = proxy
|
|
}
|
|
|
|
// AllowedIP returns the allowed IP of the remote peer
|
|
func (conn *Conn) AllowedIP() netip.Addr {
|
|
return conn.config.WgConfig.AllowedIps[0].Addr()
|
|
}
|
|
|
|
func (conn *Conn) AgentVersionString() string {
|
|
return conn.config.AgentVersion
|
|
}
|
|
|
|
func (conn *Conn) presharedKey(remoteRosenpassKey []byte) *wgtypes.Key {
|
|
if conn.config.RosenpassConfig.PubKey == nil {
|
|
return conn.config.WgConfig.PreSharedKey
|
|
}
|
|
|
|
if remoteRosenpassKey == nil && conn.config.RosenpassConfig.PermissiveMode {
|
|
return conn.config.WgConfig.PreSharedKey
|
|
}
|
|
|
|
// If Rosenpass has already set a PSK for this peer, return nil to prevent
|
|
// UpdatePeer from overwriting the Rosenpass-managed key.
|
|
if conn.rosenpassInitializedPresharedKeyValidator != nil && conn.rosenpassInitializedPresharedKeyValidator(conn.config.Key) {
|
|
return nil
|
|
}
|
|
|
|
// Use NetBird PSK as the seed for Rosenpass. This same PSK is passed to
|
|
// Rosenpass as PeerConfig.PresharedKey, ensuring the derived post-quantum
|
|
// key is cryptographically bound to the original secret.
|
|
if conn.config.WgConfig.PreSharedKey != nil {
|
|
return conn.config.WgConfig.PreSharedKey
|
|
}
|
|
|
|
// Fallback to deterministic key if no NetBird PSK is configured
|
|
determKey, err := conn.rosenpassDetermKey()
|
|
if err != nil {
|
|
conn.Log.Errorf("failed to generate Rosenpass initial key: %v", err)
|
|
return nil
|
|
}
|
|
|
|
return determKey
|
|
}
|
|
|
|
// todo: move this logic into Rosenpass package
|
|
func (conn *Conn) rosenpassDetermKey() (*wgtypes.Key, error) {
|
|
lk := []byte(conn.config.LocalKey)
|
|
rk := []byte(conn.config.Key) // remote key
|
|
var keyInput []byte
|
|
if string(lk) > string(rk) {
|
|
//nolint:gocritic
|
|
keyInput = append(lk[:16], rk[:16]...)
|
|
} else {
|
|
//nolint:gocritic
|
|
keyInput = append(rk[:16], lk[:16]...)
|
|
}
|
|
|
|
key, err := wgtypes.NewKey(keyInput)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &key, nil
|
|
}
|
|
|
|
func isController(config ConnConfig) bool {
|
|
return config.LocalKey > config.Key
|
|
}
|
|
|
|
func isRosenpassEnabled(remoteRosenpassPubKey []byte) bool {
|
|
return remoteRosenpassPubKey != nil
|
|
}
|