mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-19 08:46:38 +00:00
fix: add connection state to handle reconnects
This commit is contained in:
@@ -8,6 +8,7 @@ import (
|
|||||||
"github.com/wiretrustee/wiretrustee/util"
|
"github.com/wiretrustee/wiretrustee/util"
|
||||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||||
"net"
|
"net"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -58,6 +59,10 @@ type Connection struct {
|
|||||||
agent *ice.Agent
|
agent *ice.Agent
|
||||||
|
|
||||||
wgConn net.Conn
|
wgConn net.Conn
|
||||||
|
// mux is used to ensure exclusive access to Open() and Close() operations on connection
|
||||||
|
mux sync.Mutex
|
||||||
|
// isActive indicates whether connection is active or not.
|
||||||
|
isActive bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConnection(config ConnConfig,
|
func NewConnection(config ConnConfig,
|
||||||
@@ -72,13 +77,20 @@ func NewConnection(config ConnConfig,
|
|||||||
signalOffer: signalOffer,
|
signalOffer: signalOffer,
|
||||||
signalAnswer: signalAnswer,
|
signalAnswer: signalAnswer,
|
||||||
remoteAuthChannel: make(chan IceCredentials, 1),
|
remoteAuthChannel: make(chan IceCredentials, 1),
|
||||||
closeChannel: make(chan bool, 1),
|
closeChannel: make(chan bool, 2),
|
||||||
agent: nil,
|
agent: nil,
|
||||||
|
isActive: false,
|
||||||
|
mux: sync.Mutex{},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Connection) Close() error {
|
func (conn *Connection) Close() error {
|
||||||
|
|
||||||
|
conn.mux.Lock()
|
||||||
|
defer conn.mux.Unlock()
|
||||||
|
|
||||||
|
log.Debugf("closing connection to peer %s", conn.Config.RemoteWgKey.String())
|
||||||
|
|
||||||
conn.closeChannel <- true
|
conn.closeChannel <- true
|
||||||
conn.closeChannel <- true
|
conn.closeChannel <- true
|
||||||
|
|
||||||
@@ -92,6 +104,10 @@ func (conn *Connection) Close() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Debugf("closed connection to peer %s", conn.Config.RemoteWgKey.String())
|
||||||
|
|
||||||
|
conn.isActive = false
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -99,6 +115,11 @@ func (conn *Connection) Close() error {
|
|||||||
// Will block until the connection has successfully established
|
// Will block until the connection has successfully established
|
||||||
func (conn *Connection) Open() error {
|
func (conn *Connection) Open() error {
|
||||||
|
|
||||||
|
log.Debugf("opening connection to peer %s", conn.Config.RemoteWgKey.String())
|
||||||
|
|
||||||
|
conn.mux.Lock()
|
||||||
|
defer conn.mux.Unlock()
|
||||||
|
|
||||||
wgConn, err := conn.createWireguardProxy()
|
wgConn, err := conn.createWireguardProxy()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
@@ -146,17 +167,34 @@ func (conn *Connection) Open() error {
|
|||||||
go conn.proxyToRemotePeer(*wgConn, remoteConn)
|
go conn.proxyToRemotePeer(*wgConn, remoteConn)
|
||||||
go conn.proxyToLocalWireguard(*wgConn, remoteConn)
|
go conn.proxyToLocalWireguard(*wgConn, remoteConn)
|
||||||
|
|
||||||
|
log.Debugf("opened connection to peer %s", conn.Config.RemoteWgKey.String())
|
||||||
|
|
||||||
|
conn.isActive = true
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Connection) OnAnswer(remoteAuth IceCredentials) error {
|
func (conn *Connection) OnAnswer(remoteAuth IceCredentials) error {
|
||||||
log.Debugf("onAnswer from peer %s", conn.Config.RemoteWgKey.String())
|
log.Debugf("OnAnswer from peer %s", conn.Config.RemoteWgKey.String())
|
||||||
|
|
||||||
|
if conn.isActive {
|
||||||
|
log.Debugf("connection is active, ignoring OnAnswer from peer %s", conn.Config.RemoteWgKey.String())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
conn.remoteAuthChannel <- remoteAuth
|
conn.remoteAuthChannel <- remoteAuth
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Connection) OnOffer(remoteAuth IceCredentials) error {
|
func (conn *Connection) OnOffer(remoteAuth IceCredentials) error {
|
||||||
|
|
||||||
|
log.Debugf("OnOffer from peer %s", conn.Config.RemoteWgKey.String())
|
||||||
|
|
||||||
|
if conn.isActive {
|
||||||
|
log.Debugf("connection is active, ignoring OnOffer from peer %s", conn.Config.RemoteWgKey.String())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
conn.remoteAuthChannel <- remoteAuth
|
conn.remoteAuthChannel <- remoteAuth
|
||||||
|
|
||||||
uFrag, pwd, err := conn.agent.GetLocalUserCredentials()
|
uFrag, pwd, err := conn.agent.GetLocalUserCredentials()
|
||||||
@@ -174,7 +212,7 @@ func (conn *Connection) OnOffer(remoteAuth IceCredentials) error {
|
|||||||
|
|
||||||
func (conn *Connection) OnRemoteCandidate(candidate ice.Candidate) error {
|
func (conn *Connection) OnRemoteCandidate(candidate ice.Candidate) error {
|
||||||
|
|
||||||
log.Debugf("onRemoteCandidate from peer %s -> %s", conn.Config.RemoteWgKey.String(), candidate.String())
|
log.Debugf("onRemoteCandidate from peer %s -> %s", conn.Config.RemoteWgKey.String(), candidate.String())
|
||||||
|
|
||||||
err := conn.agent.AddRemoteCandidate(candidate)
|
err := conn.agent.AddRemoteCandidate(candidate)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -251,6 +289,7 @@ func (conn *Connection) listenOnConnectionStateChanges() error {
|
|||||||
}
|
}
|
||||||
log.Debugf("connected to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair)
|
log.Debugf("connected to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair)
|
||||||
} else if state == ice.ConnectionStateDisconnected || state == ice.ConnectionStateFailed {
|
} else if state == ice.ConnectionStateDisconnected || state == ice.ConnectionStateFailed {
|
||||||
|
// todo do we really wanna have a connection restart within connection itself? Think of moving it outside
|
||||||
err := util.Retry(15, time.Second, func() error {
|
err := util.Retry(15, time.Second, func() error {
|
||||||
return conn.Restart()
|
return conn.Restart()
|
||||||
}, func(err error) {
|
}, func(err error) {
|
||||||
|
|||||||
Reference in New Issue
Block a user