mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 16:26:38 +00:00
fix: reconnects
This commit is contained in:
32
connection/cond.go
Normal file
32
connection/cond.go
Normal file
@@ -0,0 +1,32 @@
|
|||||||
|
package connection
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
// A Cond is a one-shot condition variable like sync.Cond, but backed by a
// channel so that waiters can use it inside a select statement.
//
// A Cond must be created with NewCond; the zero value has a nil channel and
// is not usable.
type Cond struct {
	// once guarantees that C is closed at most once.
	once sync.Once
	// C is closed when the condition is signaled. Receive from it (directly
	// or in a select) to wait; never send on it or close it directly.
	C chan struct{}
}

// NewCond creates a new, not yet signaled condition variable.
func NewCond() *Cond {
	return &Cond{C: make(chan struct{})}
}

// Do runs f if the condition hasn't been signaled yet and then signals it by
// closing C. Only the first call to Do (or Signal) has any effect; later
// calls return without running f.
//
// C is closed even if f panics, so waiters are never left blocked behind a
// failed callback; the panic still propagates to the caller. A nil f is
// treated as a no-op, which makes Do(nil) equivalent to Signal.
func (c *Cond) Do(f func()) {
	c.once.Do(func() {
		// Deferred so the channel is closed on every exit path of f,
		// including a panic. sync.Once marks itself done either way, so
		// without this a panicking f would leave C open forever.
		defer close(c.C)
		if f != nil {
			f()
		}
	})
}

// Signal closes the condition variable channel, releasing all current and
// future waiters. Calling it more than once is safe.
func (c *Cond) Signal() {
	c.Do(nil)
}

// Wait blocks until the condition variable channel has been closed.
func (c *Cond) Wait() {
	<-c.C
}
|
||||||
@@ -53,17 +53,17 @@ type Connection struct {
|
|||||||
// remoteAuthChannel is a channel used to wait for remote credentials to proceed with the connection
|
// remoteAuthChannel is a channel used to wait for remote credentials to proceed with the connection
|
||||||
remoteAuthChannel chan IceCredentials
|
remoteAuthChannel chan IceCredentials
|
||||||
|
|
||||||
closeChannel chan struct{}
|
|
||||||
closedChannel chan struct{}
|
|
||||||
|
|
||||||
// agent is an actual ice.Agent that is used to negotiate and maintain a connection to a remote peer
|
// agent is an actual ice.Agent that is used to negotiate and maintain a connection to a remote peer
|
||||||
agent *ice.Agent
|
agent *ice.Agent
|
||||||
|
|
||||||
wgConn net.Conn
|
wgConn net.Conn
|
||||||
// mux is used to ensure exclusive access to Open() and Close() operations on connection
|
|
||||||
mux sync.Mutex
|
|
||||||
// isActive indicates whether connection is active or not.
|
// isActive indicates whether connection is active or not.
|
||||||
isActive bool
|
isActive bool
|
||||||
|
|
||||||
|
connected *Cond
|
||||||
|
closeCond *Cond
|
||||||
|
|
||||||
|
remoteAuthCond sync.Once
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConnection(config ConnConfig,
|
func NewConnection(config ConnConfig,
|
||||||
@@ -78,19 +78,15 @@ func NewConnection(config ConnConfig,
|
|||||||
signalOffer: signalOffer,
|
signalOffer: signalOffer,
|
||||||
signalAnswer: signalAnswer,
|
signalAnswer: signalAnswer,
|
||||||
remoteAuthChannel: make(chan IceCredentials, 1),
|
remoteAuthChannel: make(chan IceCredentials, 1),
|
||||||
closeChannel: make(chan struct{}),
|
closeCond: NewCond(),
|
||||||
closedChannel: make(chan struct{}),
|
connected: NewCond(),
|
||||||
agent: nil,
|
agent: nil,
|
||||||
isActive: false,
|
isActive: false,
|
||||||
mux: sync.Mutex{},
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Connection) Close() error {
|
func (conn *Connection) Close() error {
|
||||||
|
|
||||||
conn.mux.Lock()
|
|
||||||
defer conn.mux.Unlock()
|
|
||||||
|
|
||||||
if !conn.isActive {
|
if !conn.isActive {
|
||||||
log.Infof("connection to peer %s has been already closed, skipping", conn.Config.RemoteWgKey.String())
|
log.Infof("connection to peer %s has been already closed, skipping", conn.Config.RemoteWgKey.String())
|
||||||
return nil
|
return nil
|
||||||
@@ -108,10 +104,6 @@ func (conn *Connection) Close() error {
|
|||||||
|
|
||||||
log.Debugf("closing connection to peer %s", conn.Config.RemoteWgKey.String())
|
log.Debugf("closing connection to peer %s", conn.Config.RemoteWgKey.String())
|
||||||
|
|
||||||
close(conn.closeChannel)
|
|
||||||
|
|
||||||
log.Debugf("closed connection to peer %s", conn.Config.RemoteWgKey.String())
|
|
||||||
|
|
||||||
conn.isActive = false
|
conn.isActive = false
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@@ -119,13 +111,10 @@ func (conn *Connection) Close() error {
|
|||||||
|
|
||||||
// Open opens connection to a remote peer.
|
// Open opens connection to a remote peer.
|
||||||
// Will block until the connection has successfully established
|
// Will block until the connection has successfully established
|
||||||
func (conn *Connection) Open(timeout time.Duration) (chan struct{}, error) {
|
func (conn *Connection) Open(timeout time.Duration) error {
|
||||||
|
|
||||||
log.Debugf("1: opening connection to peer %s", conn.Config.RemoteWgKey.String())
|
log.Debugf("1: opening connection to peer %s", conn.Config.RemoteWgKey.String())
|
||||||
|
|
||||||
conn.mux.Lock()
|
|
||||||
defer conn.mux.Unlock()
|
|
||||||
|
|
||||||
// create an ice.Agent that will be responsible for negotiating and establishing actual peer-to-peer connection
|
// create an ice.Agent that will be responsible for negotiating and establishing actual peer-to-peer connection
|
||||||
a, err := ice.NewAgent(&ice.AgentConfig{
|
a, err := ice.NewAgent(&ice.AgentConfig{
|
||||||
NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4},
|
NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4},
|
||||||
@@ -136,26 +125,26 @@ func (conn *Connection) Open(timeout time.Duration) (chan struct{}, error) {
|
|||||||
log.Debugf("2: opening connection to peer %s", conn.Config.RemoteWgKey.String())
|
log.Debugf("2: opening connection to peer %s", conn.Config.RemoteWgKey.String())
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = conn.listenOnLocalCandidates()
|
err = conn.listenOnLocalCandidates()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("3: opening connection to peer %s", conn.Config.RemoteWgKey.String())
|
log.Debugf("3: opening connection to peer %s", conn.Config.RemoteWgKey.String())
|
||||||
|
|
||||||
err = conn.listenOnConnectionStateChanges()
|
err = conn.listenOnConnectionStateChanges()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("4: opening connection to peer %s", conn.Config.RemoteWgKey.String())
|
log.Debugf("4: opening connection to peer %s", conn.Config.RemoteWgKey.String())
|
||||||
|
|
||||||
err = conn.signalCredentials()
|
err = conn.signalCredentials()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("5: opening connection to peer %s", conn.Config.RemoteWgKey.String())
|
log.Debugf("5: opening connection to peer %s", conn.Config.RemoteWgKey.String())
|
||||||
@@ -166,18 +155,18 @@ func (conn *Connection) Open(timeout time.Duration) (chan struct{}, error) {
|
|||||||
|
|
||||||
err = conn.agent.GatherCandidates()
|
err = conn.agent.GatherCandidates()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
remoteConn, err := conn.openConnectionToRemote(remoteAuth.isControlling, remoteAuth)
|
remoteConn, err := conn.openConnectionToRemote(remoteAuth.isControlling, remoteAuth)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed establishing connection with the remote peer %s %s", conn.Config.RemoteWgKey.String(), err)
|
log.Errorf("failed establishing connection with the remote peer %s %s", conn.Config.RemoteWgKey.String(), err)
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
wgConn, err := conn.createWireguardProxy()
|
wgConn, err := conn.createWireguardProxy()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return err
|
||||||
}
|
}
|
||||||
conn.wgConn = *wgConn
|
conn.wgConn = *wgConn
|
||||||
|
|
||||||
@@ -187,10 +176,18 @@ func (conn *Connection) Open(timeout time.Duration) (chan struct{}, error) {
|
|||||||
log.Debugf("opened connection to peer %s", conn.Config.RemoteWgKey.String())
|
log.Debugf("opened connection to peer %s", conn.Config.RemoteWgKey.String())
|
||||||
|
|
||||||
conn.isActive = true
|
conn.isActive = true
|
||||||
|
|
||||||
return conn.closedChannel, nil
|
|
||||||
case <-time.After(timeout):
|
case <-time.After(timeout):
|
||||||
return nil, fmt.Errorf("timeout of %vs exceeded while waiting for connection to peer %s", timeout.Seconds(), conn.Config.RemoteWgKey.String())
|
conn.closeCond.Do(func() {
|
||||||
|
log.Errorf("CLOSED CONDITION ON TIMEOUT")
|
||||||
|
err = conn.Close()
|
||||||
|
})
|
||||||
|
return fmt.Errorf("timeout of %vs exceeded while waiting for the remote peer %s", timeout.Seconds(), conn.Config.RemoteWgKey.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-conn.closeCond.C:
|
||||||
|
log.Errorf("CLOSED CONDITION RETURNING ERR TO ENGINE")
|
||||||
|
return fmt.Errorf("connection to peer %s has been closed while starting", conn.Config.RemoteWgKey.String())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -202,7 +199,9 @@ func (conn *Connection) OnAnswer(remoteAuth IceCredentials) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
conn.remoteAuthChannel <- remoteAuth
|
conn.remoteAuthCond.Do(func() {
|
||||||
|
conn.remoteAuthChannel <- remoteAuth
|
||||||
|
})
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -215,7 +214,9 @@ func (conn *Connection) OnOffer(remoteAuth IceCredentials) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
conn.remoteAuthChannel <- remoteAuth
|
conn.remoteAuthCond.Do(func() {
|
||||||
|
conn.remoteAuthChannel <- remoteAuth
|
||||||
|
})
|
||||||
|
|
||||||
uFrag, pwd, err := conn.agent.GetLocalUserCredentials()
|
uFrag, pwd, err := conn.agent.GetLocalUserCredentials()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -302,20 +303,23 @@ func (conn *Connection) listenOnConnectionStateChanges() error {
|
|||||||
err := conn.agent.OnConnectionStateChange(func(state ice.ConnectionState) {
|
err := conn.agent.OnConnectionStateChange(func(state ice.ConnectionState) {
|
||||||
log.Debugf("ICE Connection State has changed: %s", state.String())
|
log.Debugf("ICE Connection State has changed: %s", state.String())
|
||||||
if state == ice.ConnectionStateConnected {
|
if state == ice.ConnectionStateConnected {
|
||||||
// once the connection has been established we can check the selected candidate pair
|
// once the connection has been established we can check the selected candidate pair
|
||||||
pair, err := conn.agent.GetSelectedCandidatePair()
|
pair, err := conn.agent.GetSelectedCandidatePair()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed selecting active ICE candidate pair %s", err)
|
log.Errorf("failed selecting active ICE candidate pair %s", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
log.Debugf("connected to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair)
|
log.Debugf("closed to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair)
|
||||||
} else if state == ice.ConnectionStateDisconnected || state == ice.ConnectionStateFailed {
|
} else if state == ice.ConnectionStateDisconnected || state == ice.ConnectionStateFailed {
|
||||||
// todo do we really wanna have a connection restart within connection itself? Think of moving it outside
|
// todo do we really wanna have a connection restart within connection itself? Think of moving it outside
|
||||||
err := conn.Close()
|
|
||||||
if err != nil {
|
conn.closeCond.Do(func() {
|
||||||
log.Errorf("failed closing connection fo peer %s %s", conn.Config.RemoteWgKey.String(), err.Error())
|
log.Errorf("CLOSED CONNDITION TRIGGERED ON STATE %s", state.String())
|
||||||
}
|
err := conn.Close()
|
||||||
close(conn.closedChannel)
|
if err != nil {
|
||||||
|
log.Errorf("failed closing connection fo peer %s %s", conn.Config.RemoteWgKey.String(), err.Error())
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
@@ -363,10 +367,8 @@ func (conn *Connection) proxyToRemotePeer(wgConn net.Conn, remoteConn *ice.Conn)
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnln("Error writing to remote peer: ", err.Error())
|
log.Warnln("Error writing to remote peer: ", err.Error())
|
||||||
}
|
}
|
||||||
case _, ok := <-conn.closeChannel:
|
case <-conn.closeCond.C:
|
||||||
if !ok {
|
log.Infof("stopped proxying to remote peer %s due to closed connection", conn.Config.RemoteWgKey.String())
|
||||||
log.Infof("stopped proxying to remote peer %s", conn.Config.RemoteWgKey.String())
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -389,10 +391,8 @@ func (conn *Connection) proxyToLocalWireguard(wgConn net.Conn, remoteConn *ice.C
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed writing to local Wireguard instance %s", err)
|
log.Errorf("failed writing to local Wireguard instance %s", err)
|
||||||
}
|
}
|
||||||
case _, ok := <-conn.closeChannel:
|
case <-conn.closeCond.C:
|
||||||
if !ok {
|
log.Infof("stopped proxying from remote peer %s due to closed connection", conn.Config.RemoteWgKey.String())
|
||||||
log.Infof("stopped proxying from remote peer %s", conn.Config.RemoteWgKey.String())
|
|
||||||
}
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -77,20 +77,12 @@ func (e *Engine) Start(privateKey string, peers []Peer) error {
|
|||||||
go func() {
|
go func() {
|
||||||
|
|
||||||
operation := func() error {
|
operation := func() error {
|
||||||
_, closed, err := e.openPeerConnection(*wgPort, myKey, peer)
|
_, err := e.openPeerConnection(*wgPort, myKey, peer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
e.conns[peer.WgPubKey] = nil
|
e.conns[peer.WgPubKey] = nil
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
select {
|
|
||||||
case _, ok := <-closed:
|
|
||||||
if !ok {
|
|
||||||
e.conns[peer.WgPubKey] = nil
|
|
||||||
return fmt.Errorf("connection to peer %s has been closed", peer.WgPubKey)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
err = backoff.Retry(operation, backoff.NewExponentialBackOff())
|
err = backoff.Retry(operation, backoff.NewExponentialBackOff())
|
||||||
@@ -105,7 +97,7 @@ func (e *Engine) Start(privateKey string, peers []Peer) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) openPeerConnection(wgPort int, myKey wgtypes.Key, peer Peer) (*Connection, chan struct{}, error) {
|
func (e *Engine) openPeerConnection(wgPort int, myKey wgtypes.Key, peer Peer) (*Connection, error) {
|
||||||
|
|
||||||
remoteKey, _ := wgtypes.ParseKey(peer.WgPubKey)
|
remoteKey, _ := wgtypes.ParseKey(peer.WgPubKey)
|
||||||
connConfig := &ConnConfig{
|
connConfig := &ConnConfig{
|
||||||
@@ -132,12 +124,12 @@ func (e *Engine) openPeerConnection(wgPort int, myKey wgtypes.Key, peer Peer) (*
|
|||||||
conn := NewConnection(*connConfig, signalCandidate, signalOffer, signalAnswer)
|
conn := NewConnection(*connConfig, signalCandidate, signalOffer, signalAnswer)
|
||||||
e.conns[remoteKey.String()] = conn
|
e.conns[remoteKey.String()] = conn
|
||||||
// blocks until the connection is open (or timeout)
|
// blocks until the connection is open (or timeout)
|
||||||
closedCh, err := conn.Open(60 * time.Second)
|
err := conn.Open(20 * time.Second)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("error openning connection to a remote peer %s %s", remoteKey.String(), err.Error())
|
log.Errorf("error openning connection to a remote peer %s %s", remoteKey.String(), err.Error())
|
||||||
return nil, nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return conn, closedCh, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client) error {
|
func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client) error {
|
||||||
|
|||||||
Reference in New Issue
Block a user