mirror of
https://github.com/netbirdio/netbird.git
synced 2026-05-06 17:08:53 +00:00
Integrate relay into peer conn
- extend mgm with relay address - extend signaling with remote peer's relay address - start setup relay connection before engine start
This commit is contained in:
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/netbirdio/netbird/client/internal/wgproxy"
|
||||
"github.com/netbirdio/netbird/iface"
|
||||
"github.com/netbirdio/netbird/iface/bind"
|
||||
relayClient "github.com/netbirdio/netbird/relay/client"
|
||||
"github.com/netbirdio/netbird/route"
|
||||
sProto "github.com/netbirdio/netbird/signal/proto"
|
||||
nbnet "github.com/netbirdio/netbird/util/net"
|
||||
@@ -91,6 +92,9 @@ type OfferAnswer struct {
|
||||
// RosenpassAddr is the Rosenpass server address (IP:port) of the remote peer when receiving this message
|
||||
// This value is the local Rosenpass server address when sending the message
|
||||
RosenpassAddr string
|
||||
|
||||
// relay server address
|
||||
RelaySrvAddress string
|
||||
}
|
||||
|
||||
// IceCredentials ICE protocol credentials struct
|
||||
@@ -138,6 +142,112 @@ type Conn struct {
|
||||
connID nbnet.ConnectionID
|
||||
beforeAddPeerHooks []BeforeAddPeerHookFunc
|
||||
afterRemovePeerHooks []AfterRemovePeerHookFunc
|
||||
|
||||
relayManager *relayClient.Manager
|
||||
}
|
||||
|
||||
// NewConn creates a new not opened Conn to the remote peer.
|
||||
// To establish a connection run Conn.Open
|
||||
func NewConn(config ConnConfig, statusRecorder *Status, wgProxyFactory *wgproxy.Factory, adapter iface.TunAdapter, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager) (*Conn, error) {
|
||||
return &Conn{
|
||||
config: config,
|
||||
mu: sync.Mutex{},
|
||||
status: StatusDisconnected,
|
||||
closeCh: make(chan struct{}),
|
||||
remoteOffersCh: make(chan OfferAnswer),
|
||||
remoteAnswerCh: make(chan OfferAnswer),
|
||||
statusRecorder: statusRecorder,
|
||||
wgProxyFactory: wgProxyFactory,
|
||||
adapter: adapter,
|
||||
iFaceDiscover: iFaceDiscover,
|
||||
relayManager: relayManager,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Open opens connection to the remote peer starting ICE candidate gathering process.
|
||||
// Blocks until connection has been closed or connection timeout.
|
||||
// ConnStatus will be set accordingly
|
||||
func (conn *Conn) Open(ctx context.Context) error {
|
||||
log.Debugf("trying to connect to peer %s", conn.config.Key)
|
||||
|
||||
peerState := State{
|
||||
PubKey: conn.config.Key,
|
||||
IP: strings.Split(conn.config.WgConfig.AllowedIps, "/")[0],
|
||||
ConnStatusUpdate: time.Now(),
|
||||
ConnStatus: conn.status,
|
||||
Mux: new(sync.RWMutex),
|
||||
}
|
||||
err := conn.statusRecorder.UpdatePeerState(peerState)
|
||||
if err != nil {
|
||||
log.Warnf("error while updating the state of peer %s,err: %v", conn.config.Key, err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
err := conn.cleanup()
|
||||
if err != nil {
|
||||
log.Warnf("error while cleaning up peer connection %s: %v", conn.config.Key, err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
err = conn.sendOffer()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("connection offer sent to peer %s, waiting for the confirmation", conn.config.Key)
|
||||
|
||||
// Only continue once we got a connection confirmation from the remote peer.
|
||||
// The connection timeout could have happened before a confirmation received from the remote.
|
||||
// The connection could have also been closed externally (e.g. when we received an update from the management that peer shouldn't be connected)
|
||||
remoteOfferAnswer, err := conn.waitForRemoteOfferConfirmation()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("received connection confirmation from peer %s running version %s and with remote WireGuard listen port %d",
|
||||
conn.config.Key, remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort)
|
||||
|
||||
// at this point we received offer/answer and we are ready to gather candidates
|
||||
conn.mu.Lock()
|
||||
conn.status = StatusConnecting
|
||||
conn.ctx, conn.notifyDisconnected = context.WithCancel(ctx)
|
||||
defer conn.notifyDisconnected()
|
||||
conn.mu.Unlock()
|
||||
|
||||
peerState = State{
|
||||
PubKey: conn.config.Key,
|
||||
ConnStatus: conn.status,
|
||||
ConnStatusUpdate: time.Now(),
|
||||
Mux: new(sync.RWMutex),
|
||||
}
|
||||
err = conn.statusRecorder.UpdatePeerState(peerState)
|
||||
if err != nil {
|
||||
log.Warnf("error while updating the state of peer %s,err: %v", conn.config.Key, err)
|
||||
}
|
||||
|
||||
// in edge case this function can block while the manager set up a new relay server connection
|
||||
relayOperate := conn.setupRelayConnection(remoteOfferAnswer)
|
||||
|
||||
err = conn.setupICEConnection(remoteOfferAnswer, relayOperate)
|
||||
if err != nil {
|
||||
log.Errorf("failed to setup ICE connection: %s", err)
|
||||
if !relayOperate {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// wait until connection disconnected or has been closed externally (upper layer, e.g. engine)
|
||||
err = conn.waitForDisconnection()
|
||||
return err
|
||||
}
|
||||
|
||||
func (conn *Conn) AddBeforeAddPeerHook(hook BeforeAddPeerHookFunc) {
|
||||
conn.beforeAddPeerHooks = append(conn.beforeAddPeerHooks, hook)
|
||||
}
|
||||
|
||||
func (conn *Conn) AddAfterRemovePeerHook(hook AfterRemovePeerHookFunc) {
|
||||
conn.afterRemovePeerHooks = append(conn.afterRemovePeerHooks, hook)
|
||||
}
|
||||
|
||||
// GetConf returns the connection config
|
||||
@@ -155,24 +265,126 @@ func (conn *Conn) UpdateStunTurn(turnStun []*stun.URI) {
|
||||
conn.config.StunTurn = turnStun
|
||||
}
|
||||
|
||||
// NewConn creates a new not opened Conn to the remote peer.
|
||||
// To establish a connection run Conn.Open
|
||||
func NewConn(config ConnConfig, statusRecorder *Status, wgProxyFactory *wgproxy.Factory, adapter iface.TunAdapter, iFaceDiscover stdnet.ExternalIFaceDiscover) (*Conn, error) {
|
||||
return &Conn{
|
||||
config: config,
|
||||
mu: sync.Mutex{},
|
||||
status: StatusDisconnected,
|
||||
closeCh: make(chan struct{}),
|
||||
remoteOffersCh: make(chan OfferAnswer),
|
||||
remoteAnswerCh: make(chan OfferAnswer),
|
||||
statusRecorder: statusRecorder,
|
||||
wgProxyFactory: wgProxyFactory,
|
||||
adapter: adapter,
|
||||
iFaceDiscover: iFaceDiscover,
|
||||
}, nil
|
||||
// SetSignalOffer sets a handler function to be triggered by Conn when a new connection offer has to be signalled to the remote peer
|
||||
func (conn *Conn) SetSignalOffer(handler func(offer OfferAnswer) error) {
|
||||
conn.signalOffer = handler
|
||||
}
|
||||
|
||||
func (conn *Conn) reCreateAgent() error {
|
||||
// SetOnConnected sets a handler function to be triggered by Conn when a new connection to a remote peer established
|
||||
func (conn *Conn) SetOnConnected(handler func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string)) {
|
||||
conn.onConnected = handler
|
||||
}
|
||||
|
||||
// SetOnDisconnected sets a handler function to be triggered by Conn when a connection to a remote disconnected
|
||||
func (conn *Conn) SetOnDisconnected(handler func(remotePeer string, wgIP string)) {
|
||||
conn.onDisconnected = handler
|
||||
}
|
||||
|
||||
// SetSignalAnswer sets a handler function to be triggered by Conn when a new connection answer has to be signalled to the remote peer
|
||||
func (conn *Conn) SetSignalAnswer(handler func(answer OfferAnswer) error) {
|
||||
conn.signalAnswer = handler
|
||||
}
|
||||
|
||||
// SetSignalCandidate sets a handler function to be triggered by Conn when a new ICE local connection candidate has to be signalled to the remote peer
|
||||
func (conn *Conn) SetSignalCandidate(handler func(candidate ice.Candidate) error) {
|
||||
conn.signalCandidate = handler
|
||||
}
|
||||
|
||||
// SetSendSignalMessage sets a handler function to be triggered by Conn when there is new message to send via signal
|
||||
func (conn *Conn) SetSendSignalMessage(handler func(message *sProto.Message) error) {
|
||||
conn.sendSignalMessage = handler
|
||||
}
|
||||
|
||||
// Close closes this peer Conn issuing a close event to the Conn closeCh
|
||||
func (conn *Conn) Close() error {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
select {
|
||||
case conn.closeCh <- struct{}{}:
|
||||
return nil
|
||||
default:
|
||||
// probably could happen when peer has been added and removed right after not even starting to connect
|
||||
// todo further investigate
|
||||
// this really happens due to unordered messages coming from management
|
||||
// more importantly it causes inconsistency -> 2 Conn objects for the same peer
|
||||
// e.g. this flow:
|
||||
// update from management has peers: [1,2,3,4]
|
||||
// engine creates a Conn for peers: [1,2,3,4] and schedules Open in ~1sec
|
||||
// before conn.Open() another update from management arrives with peers: [1,2,3]
|
||||
// engine removes peer 4 and calls conn.Close() which does nothing (this default clause)
|
||||
// before conn.Open() another update from management arrives with peers: [1,2,3,4,5]
|
||||
// engine adds a new Conn for 4 and 5
|
||||
// therefore peer 4 has 2 Conn objects
|
||||
log.Warnf("Connection has been already closed or attempted closing not started connection %s", conn.config.Key)
|
||||
return NewConnectionAlreadyClosed(conn.config.Key)
|
||||
}
|
||||
}
|
||||
|
||||
// Status returns current status of the Conn
|
||||
func (conn *Conn) Status() ConnStatus {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
return conn.status
|
||||
}
|
||||
|
||||
// OnRemoteOffer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
|
||||
// doesn't block, discards the message if connection wasn't ready
|
||||
func (conn *Conn) OnRemoteOffer(offer OfferAnswer) bool {
|
||||
log.Debugf("OnRemoteOffer from peer %s on status %s", conn.config.Key, conn.status.String())
|
||||
|
||||
select {
|
||||
case conn.remoteOffersCh <- offer:
|
||||
return true
|
||||
default:
|
||||
log.Debugf("OnRemoteOffer skipping message from peer %s on status %s because is not ready", conn.config.Key, conn.status.String())
|
||||
// connection might not be ready yet to receive so we ignore the message
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
|
||||
// doesn't block, discards the message if connection wasn't ready
|
||||
func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) bool {
|
||||
log.Debugf("OnRemoteAnswer from peer %s on status %s", conn.config.Key, conn.status.String())
|
||||
|
||||
select {
|
||||
case conn.remoteAnswerCh <- answer:
|
||||
return true
|
||||
default:
|
||||
// connection might not be ready yet to receive so we ignore the message
|
||||
log.Debugf("OnRemoteAnswer skipping message from peer %s on status %s because is not ready", conn.config.Key, conn.status.String())
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
|
||||
func (conn *Conn) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HAMap) {
|
||||
log.Debugf("OnRemoteCandidate from peer %s -> %s", conn.config.Key, candidate.String())
|
||||
go func() {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
|
||||
if conn.agent == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if candidateViaRoutes(candidate, haRoutes) {
|
||||
return
|
||||
}
|
||||
|
||||
err := conn.agent.AddRemoteCandidate(candidate)
|
||||
if err != nil {
|
||||
log.Errorf("error while handling remote candidate from peer %s", conn.config.Key)
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (conn *Conn) GetKey() string {
|
||||
return conn.config.Key
|
||||
}
|
||||
|
||||
func (conn *Conn) reCreateAgent(relaySupport []ice.CandidateType) error {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
|
||||
@@ -192,7 +404,7 @@ func (conn *Conn) reCreateAgent() error {
|
||||
MulticastDNSMode: ice.MulticastDNSModeDisabled,
|
||||
NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4, ice.NetworkTypeUDP6},
|
||||
Urls: conn.config.StunTurn,
|
||||
CandidateTypes: conn.candidateTypes(),
|
||||
CandidateTypes: candidateTypes(),
|
||||
FailedTimeout: &failedTimeout,
|
||||
InterfaceFilter: stdnet.InterfaceFilter(conn.config.InterfaceBlackList),
|
||||
UDPMux: conn.config.UDPMux,
|
||||
@@ -242,150 +454,72 @@ func (conn *Conn) reCreateAgent() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (conn *Conn) candidateTypes() []ice.CandidateType {
|
||||
if hasICEForceRelayConn() {
|
||||
return []ice.CandidateType{ice.CandidateTypeRelay}
|
||||
}
|
||||
// TODO: remove this once we have refactored userspace proxy into the bind package
|
||||
if runtime.GOOS == "ios" {
|
||||
return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive}
|
||||
}
|
||||
return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay}
|
||||
}
|
||||
func (conn *Conn) configureWgConnectionForRelay(remoteConn net.Conn, remoteRosenpassPubKey []byte, remoteRosenpassAddr string) error {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
|
||||
// Open opens connection to the remote peer starting ICE candidate gathering process.
|
||||
// Blocks until connection has been closed or connection timeout.
|
||||
// ConnStatus will be set accordingly
|
||||
func (conn *Conn) Open(ctx context.Context) error {
|
||||
log.Debugf("trying to connect to peer %s", conn.config.Key)
|
||||
conn.wgProxy = conn.wgProxyFactory.GetProxy(conn.ctx)
|
||||
endpoint, err := conn.wgProxy.AddTurnConn(remoteConn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
endpointUdpAddr, _ := net.ResolveUDPAddr(endpoint.Network(), endpoint.String())
|
||||
log.Debugf("Conn resolved IP for %s: %s", endpoint, endpointUdpAddr.IP)
|
||||
|
||||
conn.connID = nbnet.GenerateConnID()
|
||||
for _, hook := range conn.beforeAddPeerHooks {
|
||||
if err := hook(conn.connID, endpointUdpAddr.IP); err != nil {
|
||||
log.Errorf("Before add peer hook failed: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
err = conn.config.WgConfig.WgInterface.UpdatePeer(conn.config.WgConfig.RemoteKey, conn.config.WgConfig.AllowedIps, defaultWgKeepAlive, endpointUdpAddr, conn.config.WgConfig.PreSharedKey)
|
||||
if err != nil {
|
||||
if conn.wgProxy != nil {
|
||||
if err := conn.wgProxy.CloseConn(); err != nil {
|
||||
log.Warnf("Failed to close relay connection: %v", err)
|
||||
}
|
||||
}
|
||||
// todo: is this nil correct?
|
||||
return nil
|
||||
}
|
||||
|
||||
conn.status = StatusConnected
|
||||
|
||||
peerState := State{
|
||||
PubKey: conn.config.Key,
|
||||
IP: strings.Split(conn.config.WgConfig.AllowedIps, "/")[0],
|
||||
ConnStatusUpdate: time.Now(),
|
||||
ConnStatus: conn.status,
|
||||
Mux: new(sync.RWMutex),
|
||||
}
|
||||
err := conn.statusRecorder.UpdatePeerState(peerState)
|
||||
if err != nil {
|
||||
log.Warnf("error while updating the state of peer %s,err: %v", conn.config.Key, err)
|
||||
PubKey: conn.config.Key,
|
||||
ConnStatus: StatusConnected,
|
||||
ConnStatusUpdate: time.Now(),
|
||||
LocalIceCandidateType: "",
|
||||
RemoteIceCandidateType: "",
|
||||
LocalIceCandidateEndpoint: "",
|
||||
RemoteIceCandidateEndpoint: "",
|
||||
Direct: false,
|
||||
RosenpassEnabled: isRosenpassEnabled(remoteRosenpassPubKey),
|
||||
Mux: new(sync.RWMutex),
|
||||
Relayed: true,
|
||||
}
|
||||
|
||||
defer func() {
|
||||
err := conn.cleanup()
|
||||
if err != nil {
|
||||
log.Warnf("error while cleaning up peer connection %s: %v", conn.config.Key, err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
err = conn.reCreateAgent()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = conn.sendOffer()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debugf("connection offer sent to peer %s, waiting for the confirmation", conn.config.Key)
|
||||
|
||||
// Only continue once we got a connection confirmation from the remote peer.
|
||||
// The connection timeout could have happened before a confirmation received from the remote.
|
||||
// The connection could have also been closed externally (e.g. when we received an update from the management that peer shouldn't be connected)
|
||||
var remoteOfferAnswer OfferAnswer
|
||||
select {
|
||||
case remoteOfferAnswer = <-conn.remoteOffersCh:
|
||||
// received confirmation from the remote peer -> ready to proceed
|
||||
err = conn.sendAnswer()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
case remoteOfferAnswer = <-conn.remoteAnswerCh:
|
||||
case <-time.After(conn.config.Timeout):
|
||||
return NewConnectionTimeoutError(conn.config.Key, conn.config.Timeout)
|
||||
case <-conn.closeCh:
|
||||
// closed externally
|
||||
return NewConnectionClosedError(conn.config.Key)
|
||||
}
|
||||
|
||||
log.Debugf("received connection confirmation from peer %s running version %s and with remote WireGuard listen port %d",
|
||||
conn.config.Key, remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort)
|
||||
|
||||
// at this point we received offer/answer and we are ready to gather candidates
|
||||
conn.mu.Lock()
|
||||
conn.status = StatusConnecting
|
||||
conn.ctx, conn.notifyDisconnected = context.WithCancel(ctx)
|
||||
defer conn.notifyDisconnected()
|
||||
conn.mu.Unlock()
|
||||
|
||||
peerState = State{
|
||||
PubKey: conn.config.Key,
|
||||
ConnStatus: conn.status,
|
||||
ConnStatusUpdate: time.Now(),
|
||||
Mux: new(sync.RWMutex),
|
||||
}
|
||||
err = conn.statusRecorder.UpdatePeerState(peerState)
|
||||
if err != nil {
|
||||
log.Warnf("error while updating the state of peer %s,err: %v", conn.config.Key, err)
|
||||
log.Warnf("unable to save peer's state, got error: %v", err)
|
||||
}
|
||||
|
||||
err = conn.agent.GatherCandidates()
|
||||
_, ipNet, err := net.ParseCIDR(conn.config.WgConfig.AllowedIps)
|
||||
if err != nil {
|
||||
return fmt.Errorf("gather candidates: %v", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
// will block until connection succeeded
|
||||
// but it won't release if ICE Agent went into Disconnected or Failed state,
|
||||
// so we have to cancel it with the provided context once agent detected a broken connection
|
||||
isControlling := conn.config.LocalKey > conn.config.Key
|
||||
var remoteConn *ice.Conn
|
||||
if isControlling {
|
||||
remoteConn, err = conn.agent.Dial(conn.ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
|
||||
} else {
|
||||
remoteConn, err = conn.agent.Accept(conn.ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
if runtime.GOOS == "ios" {
|
||||
runtime.GC()
|
||||
}
|
||||
|
||||
// dynamically set remote WireGuard port if other side specified a different one from the default one
|
||||
remoteWgPort := iface.DefaultWgPort
|
||||
if remoteOfferAnswer.WgListenPort != 0 {
|
||||
remoteWgPort = remoteOfferAnswer.WgListenPort
|
||||
if conn.onConnected != nil {
|
||||
conn.onConnected(conn.config.Key, remoteRosenpassPubKey, ipNet.IP.String(), remoteRosenpassAddr)
|
||||
}
|
||||
|
||||
// the ice connection has been established successfully so we are ready to start the proxy
|
||||
remoteAddr, err := conn.configureConnection(remoteConn, remoteWgPort, remoteOfferAnswer.RosenpassPubKey,
|
||||
remoteOfferAnswer.RosenpassAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("connected to peer %s, endpoint address: %s", conn.config.Key, remoteAddr.String())
|
||||
|
||||
// wait until connection disconnected or has been closed externally (upper layer, e.g. engine)
|
||||
select {
|
||||
case <-conn.closeCh:
|
||||
// closed externally
|
||||
return NewConnectionClosedError(conn.config.Key)
|
||||
case <-conn.ctx.Done():
|
||||
// disconnected from the remote peer
|
||||
return NewConnectionDisconnectedError(conn.config.Key)
|
||||
}
|
||||
}
|
||||
|
||||
func isRelayCandidate(candidate ice.Candidate) bool {
|
||||
return candidate.Type() == ice.CandidateTypeRelay
|
||||
}
|
||||
|
||||
func (conn *Conn) AddBeforeAddPeerHook(hook BeforeAddPeerHookFunc) {
|
||||
conn.beforeAddPeerHooks = append(conn.beforeAddPeerHooks, hook)
|
||||
}
|
||||
|
||||
func (conn *Conn) AddAfterRemovePeerHook(hook AfterRemovePeerHookFunc) {
|
||||
conn.afterRemovePeerHooks = append(conn.afterRemovePeerHooks, hook)
|
||||
return nil
|
||||
}
|
||||
|
||||
// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
|
||||
@@ -495,6 +629,17 @@ func (conn *Conn) punchRemoteWGPort(pair *ice.CandidatePair, remoteWgPort int) {
|
||||
}
|
||||
}
|
||||
|
||||
func (conn *Conn) waitForDisconnection() error {
|
||||
select {
|
||||
case <-conn.closeCh:
|
||||
// closed externally
|
||||
return NewConnectionClosedError(conn.config.Key)
|
||||
case <-conn.ctx.Done():
|
||||
// disconnected from the remote peer
|
||||
return NewConnectionDisconnectedError(conn.config.Key)
|
||||
}
|
||||
}
|
||||
|
||||
// cleanup closes all open resources and sets status to StatusDisconnected
|
||||
func (conn *Conn) cleanup() error {
|
||||
log.Debugf("trying to cleanup %s", conn.config.Key)
|
||||
@@ -565,36 +710,6 @@ func (conn *Conn) cleanup() error {
|
||||
return err3
|
||||
}
|
||||
|
||||
// SetSignalOffer sets a handler function to be triggered by Conn when a new connection offer has to be signalled to the remote peer
|
||||
func (conn *Conn) SetSignalOffer(handler func(offer OfferAnswer) error) {
|
||||
conn.signalOffer = handler
|
||||
}
|
||||
|
||||
// SetOnConnected sets a handler function to be triggered by Conn when a new connection to a remote peer established
|
||||
func (conn *Conn) SetOnConnected(handler func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string)) {
|
||||
conn.onConnected = handler
|
||||
}
|
||||
|
||||
// SetOnDisconnected sets a handler function to be triggered by Conn when a connection to a remote disconnected
|
||||
func (conn *Conn) SetOnDisconnected(handler func(remotePeer string, wgIP string)) {
|
||||
conn.onDisconnected = handler
|
||||
}
|
||||
|
||||
// SetSignalAnswer sets a handler function to be triggered by Conn when a new connection answer has to be signalled to the remote peer
|
||||
func (conn *Conn) SetSignalAnswer(handler func(answer OfferAnswer) error) {
|
||||
conn.signalAnswer = handler
|
||||
}
|
||||
|
||||
// SetSignalCandidate sets a handler function to be triggered by Conn when a new ICE local connection candidate has to be signalled to the remote peer
|
||||
func (conn *Conn) SetSignalCandidate(handler func(candidate ice.Candidate) error) {
|
||||
conn.signalCandidate = handler
|
||||
}
|
||||
|
||||
// SetSendSignalMessage sets a handler function to be triggered by Conn when there is new message to send via signal
|
||||
func (conn *Conn) SetSendSignalMessage(handler func(message *sProto.Message) error) {
|
||||
conn.sendSignalMessage = handler
|
||||
}
|
||||
|
||||
// onICECandidate is a callback attached to an ICE Agent to receive new local connection candidates
|
||||
// and then signals them to the remote peer
|
||||
func (conn *Conn) onICECandidate(candidate ice.Candidate) {
|
||||
@@ -679,106 +794,20 @@ func (conn *Conn) sendOffer() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = conn.signalOffer(OfferAnswer{
|
||||
oa := OfferAnswer{
|
||||
IceCredentials: IceCredentials{localUFrag, localPwd},
|
||||
WgListenPort: conn.config.LocalWgPort,
|
||||
Version: version.NetbirdVersion(),
|
||||
RosenpassPubKey: conn.config.RosenpassPubKey,
|
||||
RosenpassAddr: conn.config.RosenpassAddr,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes this peer Conn issuing a close event to the Conn closeCh
|
||||
func (conn *Conn) Close() error {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
select {
|
||||
case conn.closeCh <- struct{}{}:
|
||||
return nil
|
||||
default:
|
||||
// probably could happen when peer has been added and removed right after not even starting to connect
|
||||
// todo further investigate
|
||||
// this really happens due to unordered messages coming from management
|
||||
// more importantly it causes inconsistency -> 2 Conn objects for the same peer
|
||||
// e.g. this flow:
|
||||
// update from management has peers: [1,2,3,4]
|
||||
// engine creates a Conn for peers: [1,2,3,4] and schedules Open in ~1sec
|
||||
// before conn.Open() another update from management arrives with peers: [1,2,3]
|
||||
// engine removes peer 4 and calls conn.Close() which does nothing (this default clause)
|
||||
// before conn.Open() another update from management arrives with peers: [1,2,3,4,5]
|
||||
// engine adds a new Conn for 4 and 5
|
||||
// therefore peer 4 has 2 Conn objects
|
||||
log.Warnf("Connection has been already closed or attempted closing not started connection %s", conn.config.Key)
|
||||
return NewConnectionAlreadyClosed(conn.config.Key)
|
||||
relayIPAddress, err := conn.relayManager.RelayAddress()
|
||||
if err == nil {
|
||||
oa.RelaySrvAddress = relayIPAddress.String()
|
||||
}
|
||||
}
|
||||
|
||||
// Status returns current status of the Conn
|
||||
func (conn *Conn) Status() ConnStatus {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
return conn.status
|
||||
}
|
||||
|
||||
// OnRemoteOffer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
|
||||
// doesn't block, discards the message if connection wasn't ready
|
||||
func (conn *Conn) OnRemoteOffer(offer OfferAnswer) bool {
|
||||
log.Debugf("OnRemoteOffer from peer %s on status %s", conn.config.Key, conn.status.String())
|
||||
|
||||
select {
|
||||
case conn.remoteOffersCh <- offer:
|
||||
return true
|
||||
default:
|
||||
log.Debugf("OnRemoteOffer skipping message from peer %s on status %s because is not ready", conn.config.Key, conn.status.String())
|
||||
// connection might not be ready yet to receive so we ignore the message
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
|
||||
// doesn't block, discards the message if connection wasn't ready
|
||||
func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) bool {
|
||||
log.Debugf("OnRemoteAnswer from peer %s on status %s", conn.config.Key, conn.status.String())
|
||||
|
||||
select {
|
||||
case conn.remoteAnswerCh <- answer:
|
||||
return true
|
||||
default:
|
||||
// connection might not be ready yet to receive so we ignore the message
|
||||
log.Debugf("OnRemoteAnswer skipping message from peer %s on status %s because is not ready", conn.config.Key, conn.status.String())
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
|
||||
func (conn *Conn) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HAMap) {
|
||||
log.Debugf("OnRemoteCandidate from peer %s -> %s", conn.config.Key, candidate.String())
|
||||
go func() {
|
||||
conn.mu.Lock()
|
||||
defer conn.mu.Unlock()
|
||||
|
||||
if conn.agent == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if candidateViaRoutes(candidate, haRoutes) {
|
||||
return
|
||||
}
|
||||
|
||||
err := conn.agent.AddRemoteCandidate(candidate)
|
||||
if err != nil {
|
||||
log.Errorf("error while handling remote candidate from peer %s", conn.config.Key)
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (conn *Conn) GetKey() string {
|
||||
return conn.config.Key
|
||||
return conn.signalOffer(oa)
|
||||
}
|
||||
|
||||
func (conn *Conn) shouldSendExtraSrflxCandidate(candidate ice.Candidate) bool {
|
||||
@@ -788,6 +817,109 @@ func (conn *Conn) shouldSendExtraSrflxCandidate(candidate ice.Candidate) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (conn *Conn) waitForRemoteOfferConfirmation() (*OfferAnswer, error) {
|
||||
var remoteOfferAnswer OfferAnswer
|
||||
select {
|
||||
case remoteOfferAnswer = <-conn.remoteOffersCh:
|
||||
// received confirmation from the remote peer -> ready to proceed
|
||||
err := conn.sendAnswer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
case remoteOfferAnswer = <-conn.remoteAnswerCh:
|
||||
case <-time.After(conn.config.Timeout):
|
||||
return nil, NewConnectionTimeoutError(conn.config.Key, conn.config.Timeout)
|
||||
case <-conn.closeCh:
|
||||
// closed externally
|
||||
return nil, NewConnectionClosedError(conn.config.Key)
|
||||
}
|
||||
|
||||
return &remoteOfferAnswer, nil
|
||||
}
|
||||
|
||||
func (conn *Conn) turnAgentDial(remoteOfferAnswer *OfferAnswer) (*ice.Conn, error) {
|
||||
isControlling := conn.config.LocalKey > conn.config.Key
|
||||
if isControlling {
|
||||
return conn.agent.Dial(conn.ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
|
||||
} else {
|
||||
return conn.agent.Accept(conn.ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
|
||||
}
|
||||
}
|
||||
|
||||
func (conn *Conn) setupRelayConnection(remoteOfferAnswer *OfferAnswer) bool {
|
||||
if !isRelaySupported(remoteOfferAnswer) {
|
||||
return false
|
||||
}
|
||||
|
||||
currentRelayAddress, err := conn.relayManager.RelayAddress()
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
conn.preferredRelayServer(currentRelayAddress.String(), remoteOfferAnswer.RelaySrvAddress)
|
||||
relayConn, err := conn.relayManager.OpenConn(remoteOfferAnswer.RelaySrvAddress, conn.config.Key)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
err = conn.configureWgConnectionForRelay(relayConn, remoteOfferAnswer.RosenpassPubKey, remoteOfferAnswer.RosenpassAddr)
|
||||
if err != nil {
|
||||
log.Errorf("failed to configure WireGuard connection for relay: %s", err)
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (conn *Conn) preferredRelayServer(myRelayAddress, remoteRelayAddress string) string {
|
||||
if conn.config.LocalKey > conn.config.Key {
|
||||
return myRelayAddress
|
||||
}
|
||||
return remoteRelayAddress
|
||||
}
|
||||
|
||||
func (conn *Conn) setupICEConnection(remoteOfferAnswer *OfferAnswer, relayOperate bool) error {
|
||||
var preferredCandidateTypes []ice.CandidateType
|
||||
if relayOperate {
|
||||
preferredCandidateTypes = candidateTypesP2P()
|
||||
} else {
|
||||
preferredCandidateTypes = candidateTypes()
|
||||
}
|
||||
|
||||
err := conn.reCreateAgent(preferredCandidateTypes)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
err = conn.agent.GatherCandidates()
|
||||
if err != nil {
|
||||
return fmt.Errorf("gather candidates: %v", err)
|
||||
}
|
||||
|
||||
// will block until connection succeeded
|
||||
// but it won't release if ICE Agent went into Disconnected or Failed state,
|
||||
// so we have to cancel it with the provided context once agent detected a broken connection
|
||||
remoteConn, err := conn.turnAgentDial(remoteOfferAnswer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// dynamically set remote WireGuard port if other side specified a different one from the default one
|
||||
remoteWgPort := iface.DefaultWgPort
|
||||
if remoteOfferAnswer.WgListenPort != 0 {
|
||||
remoteWgPort = remoteOfferAnswer.WgListenPort
|
||||
}
|
||||
|
||||
// the ice connection has been established successfully so we are ready to start the proxy
|
||||
remoteAddr, err := conn.configureConnection(remoteConn, remoteWgPort, remoteOfferAnswer.RosenpassPubKey,
|
||||
remoteOfferAnswer.RosenpassAddr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("connected to peer %s, endpoint address: %s", conn.config.Key, remoteAddr.String())
|
||||
return nil
|
||||
}
|
||||
|
||||
func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive, error) {
|
||||
relatedAdd := candidate.RelatedAddress()
|
||||
return ice.NewCandidateServerReflexive(&ice.CandidateServerReflexiveConfig{
|
||||
@@ -827,3 +959,31 @@ func candidateViaRoutes(candidate ice.Candidate, clientRoutes route.HAMap) bool
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func candidateTypes() []ice.CandidateType {
|
||||
if hasICEForceRelayConn() {
|
||||
return []ice.CandidateType{ice.CandidateTypeRelay}
|
||||
}
|
||||
// TODO: remove this once we have refactored userspace proxy into the bind package
|
||||
if runtime.GOOS == "ios" {
|
||||
return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive}
|
||||
}
|
||||
return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay}
|
||||
}
|
||||
|
||||
func candidateTypesP2P() []ice.CandidateType {
|
||||
return []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive}
|
||||
}
|
||||
|
||||
func isRelayCandidate(candidate ice.Candidate) bool {
|
||||
return candidate.Type() == ice.CandidateTypeRelay
|
||||
}
|
||||
|
||||
// todo check my side too
|
||||
func isRelaySupported(answer *OfferAnswer) bool {
|
||||
return answer.RelaySrvAddress != ""
|
||||
}
|
||||
|
||||
func isRosenpassEnabled(remoteRosenpassPubKey []byte) bool {
|
||||
return remoteRosenpassPubKey != nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user