mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-19 00:36:38 +00:00
Compare commits
5 Commits
fix/androi
...
wgwatcher-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
dd13b8f27e | ||
|
|
961a942339 | ||
|
|
b7cf77d324 | ||
|
|
ef5e417cb7 | ||
|
|
759544f2c3 |
@@ -135,21 +135,11 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
|
|||||||
semaphore: semaphore,
|
semaphore: semaphore,
|
||||||
}
|
}
|
||||||
|
|
||||||
rFns := WorkerRelayCallbacks{
|
|
||||||
OnConnReady: conn.relayConnectionIsReady,
|
|
||||||
OnDisconnected: conn.onWorkerRelayStateDisconnected,
|
|
||||||
}
|
|
||||||
|
|
||||||
wFns := WorkerICECallbacks{
|
|
||||||
OnConnReady: conn.iCEConnectionIsReady,
|
|
||||||
OnStatusChanged: conn.onWorkerICEStateDisconnected,
|
|
||||||
}
|
|
||||||
|
|
||||||
ctrl := isController(config)
|
ctrl := isController(config)
|
||||||
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, relayManager, rFns)
|
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, conn, relayManager)
|
||||||
|
|
||||||
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
|
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
|
||||||
conn.workerICE, err = NewWorkerICE(ctx, connLog, config, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally, wFns)
|
conn.workerICE, err = NewWorkerICE(ctx, connLog, config, conn, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -304,7 +294,7 @@ func (conn *Conn) GetKey() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
|
// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
|
||||||
func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) {
|
func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) {
|
||||||
conn.mu.Lock()
|
conn.mu.Lock()
|
||||||
defer conn.mu.Unlock()
|
defer conn.mu.Unlock()
|
||||||
|
|
||||||
@@ -376,7 +366,7 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon
|
|||||||
}
|
}
|
||||||
|
|
||||||
// todo review to make sense to handle connecting and disconnected status also?
|
// todo review to make sense to handle connecting and disconnected status also?
|
||||||
func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
|
func (conn *Conn) onICEStateDisconnected() {
|
||||||
conn.mu.Lock()
|
conn.mu.Lock()
|
||||||
defer conn.mu.Unlock()
|
defer conn.mu.Unlock()
|
||||||
|
|
||||||
@@ -384,7 +374,7 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
conn.log.Tracef("ICE connection state changed to %s", newState)
|
conn.log.Tracef("ICE connection state changed to disconnected")
|
||||||
|
|
||||||
if conn.wgProxyICE != nil {
|
if conn.wgProxyICE != nil {
|
||||||
if err := conn.wgProxyICE.CloseConn(); err != nil {
|
if err := conn.wgProxyICE.CloseConn(); err != nil {
|
||||||
@@ -404,10 +394,11 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
|
|||||||
conn.currentConnPriority = connPriorityRelay
|
conn.currentConnPriority = connPriorityRelay
|
||||||
}
|
}
|
||||||
|
|
||||||
changed := conn.statusICE.Get() != newState && newState != StatusConnecting
|
changed := conn.statusICE.Get() != StatusDisconnected
|
||||||
conn.statusICE.Set(newState)
|
if changed {
|
||||||
|
conn.guard.SetICEConnDisconnected()
|
||||||
conn.guard.SetICEConnDisconnected(changed)
|
}
|
||||||
|
conn.statusICE.Set(StatusDisconnected)
|
||||||
|
|
||||||
peerState := State{
|
peerState := State{
|
||||||
PubKey: conn.config.Key,
|
PubKey: conn.config.Key,
|
||||||
@@ -422,7 +413,7 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) {
|
func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
|
||||||
conn.mu.Lock()
|
conn.mu.Lock()
|
||||||
defer conn.mu.Unlock()
|
defer conn.mu.Unlock()
|
||||||
|
|
||||||
@@ -474,7 +465,7 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) {
|
|||||||
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
|
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) onWorkerRelayStateDisconnected() {
|
func (conn *Conn) onRelayDisconnected() {
|
||||||
conn.mu.Lock()
|
conn.mu.Lock()
|
||||||
defer conn.mu.Unlock()
|
defer conn.mu.Unlock()
|
||||||
|
|
||||||
@@ -497,8 +488,10 @@ func (conn *Conn) onWorkerRelayStateDisconnected() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
changed := conn.statusRelay.Get() != StatusDisconnected
|
changed := conn.statusRelay.Get() != StatusDisconnected
|
||||||
|
if changed {
|
||||||
|
conn.guard.SetRelayedConnDisconnected()
|
||||||
|
}
|
||||||
conn.statusRelay.Set(StatusDisconnected)
|
conn.statusRelay.Set(StatusDisconnected)
|
||||||
conn.guard.SetRelayedConnDisconnected(changed)
|
|
||||||
|
|
||||||
peerState := State{
|
peerState := State{
|
||||||
PubKey: conn.config.Key,
|
PubKey: conn.config.Key,
|
||||||
|
|||||||
@@ -29,8 +29,8 @@ type Guard struct {
|
|||||||
isConnectedOnAllWay isConnectedFunc
|
isConnectedOnAllWay isConnectedFunc
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
srWatcher *SRWatcher
|
srWatcher *SRWatcher
|
||||||
relayedConnDisconnected chan bool
|
relayedConnDisconnected chan struct{}
|
||||||
iCEConnDisconnected chan bool
|
iCEConnDisconnected chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewGuard(log *log.Entry, isController bool, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *Guard {
|
func NewGuard(log *log.Entry, isController bool, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *Guard {
|
||||||
@@ -41,8 +41,8 @@ func NewGuard(log *log.Entry, isController bool, isConnectedFn isConnectedFunc,
|
|||||||
isConnectedOnAllWay: isConnectedFn,
|
isConnectedOnAllWay: isConnectedFn,
|
||||||
timeout: timeout,
|
timeout: timeout,
|
||||||
srWatcher: srWatcher,
|
srWatcher: srWatcher,
|
||||||
relayedConnDisconnected: make(chan bool, 1),
|
relayedConnDisconnected: make(chan struct{}, 1),
|
||||||
iCEConnDisconnected: make(chan bool, 1),
|
iCEConnDisconnected: make(chan struct{}, 1),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -54,16 +54,16 @@ func (g *Guard) Start(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Guard) SetRelayedConnDisconnected(changed bool) {
|
func (g *Guard) SetRelayedConnDisconnected() {
|
||||||
select {
|
select {
|
||||||
case g.relayedConnDisconnected <- changed:
|
case g.relayedConnDisconnected <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Guard) SetICEConnDisconnected(changed bool) {
|
func (g *Guard) SetICEConnDisconnected() {
|
||||||
select {
|
select {
|
||||||
case g.iCEConnDisconnected <- changed:
|
case g.iCEConnDisconnected <- struct{}{}:
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -96,19 +96,13 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context) {
|
|||||||
g.triggerOfferSending()
|
g.triggerOfferSending()
|
||||||
}
|
}
|
||||||
|
|
||||||
case changed := <-g.relayedConnDisconnected:
|
case <-g.relayedConnDisconnected:
|
||||||
if !changed {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
g.log.Debugf("Relay connection changed, reset reconnection ticker")
|
g.log.Debugf("Relay connection changed, reset reconnection ticker")
|
||||||
ticker.Stop()
|
ticker.Stop()
|
||||||
ticker = g.prepareExponentTicker(ctx)
|
ticker = g.prepareExponentTicker(ctx)
|
||||||
tickerChannel = ticker.C
|
tickerChannel = ticker.C
|
||||||
|
|
||||||
case changed := <-g.iCEConnDisconnected:
|
case <-g.iCEConnDisconnected:
|
||||||
if !changed {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
g.log.Debugf("ICE connection changed, reset reconnection ticker")
|
g.log.Debugf("ICE connection changed, reset reconnection ticker")
|
||||||
ticker.Stop()
|
ticker.Stop()
|
||||||
ticker = g.prepareExponentTicker(ctx)
|
ticker = g.prepareExponentTicker(ctx)
|
||||||
@@ -138,16 +132,10 @@ func (g *Guard) listenForDisconnectEvents(ctx context.Context) {
|
|||||||
g.log.Infof("start listen for reconnect events...")
|
g.log.Infof("start listen for reconnect events...")
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case changed := <-g.relayedConnDisconnected:
|
case <-g.relayedConnDisconnected:
|
||||||
if !changed {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
g.log.Debugf("Relay connection changed, triggering reconnect")
|
g.log.Debugf("Relay connection changed, triggering reconnect")
|
||||||
g.triggerOfferSending()
|
g.triggerOfferSending()
|
||||||
case changed := <-g.iCEConnDisconnected:
|
case <-g.iCEConnDisconnected:
|
||||||
if !changed {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
g.log.Debugf("ICE state changed, try to send new offer")
|
g.log.Debugf("ICE state changed, try to send new offer")
|
||||||
g.triggerOfferSending()
|
g.triggerOfferSending()
|
||||||
case <-srReconnectedChan:
|
case <-srReconnectedChan:
|
||||||
|
|||||||
134
client/internal/peer/wg_watcher.go
Normal file
134
client/internal/peer/wg_watcher.go
Normal file
@@ -0,0 +1,134 @@
|
|||||||
|
package peer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/client/iface/configurer"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
wgHandshakePeriod = 3 * time.Minute
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
wgHandshakeOvertime = 30 * time.Second
|
||||||
|
checkPeriod = wgHandshakePeriod + wgHandshakeOvertime
|
||||||
|
)
|
||||||
|
|
||||||
|
type WGInterfaceStater interface {
|
||||||
|
GetStats(key string) (configurer.WGStats, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
type WGWatcher struct {
|
||||||
|
log *log.Entry
|
||||||
|
wgIfaceStater WGInterfaceStater
|
||||||
|
peerKey string
|
||||||
|
|
||||||
|
ctxCancel context.CancelFunc
|
||||||
|
ctxLock sync.Mutex
|
||||||
|
waitGroup sync.WaitGroup
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string) *WGWatcher {
|
||||||
|
return &WGWatcher{
|
||||||
|
log: log,
|
||||||
|
wgIfaceStater: wgIfaceStater,
|
||||||
|
peerKey: peerKey,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnableWgWatcher starts the WireGuard watcher. If it is already enabled, it will return immediately and do nothing.
|
||||||
|
func (w *WGWatcher) EnableWgWatcher(parentCtx context.Context, onDisconnectedFn func()) {
|
||||||
|
w.log.Debugf("enable WireGuard watcher")
|
||||||
|
w.ctxLock.Lock()
|
||||||
|
defer w.ctxLock.Unlock()
|
||||||
|
|
||||||
|
if w.ctxCancel != nil {
|
||||||
|
w.log.Errorf("WireGuard watcher already enabled")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, ctxCancel := context.WithCancel(parentCtx)
|
||||||
|
w.ctxCancel = ctxCancel
|
||||||
|
|
||||||
|
initialHandshake, err := w.wgState()
|
||||||
|
if err != nil {
|
||||||
|
w.log.Warnf("failed to read wg stats: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
w.waitGroup.Add(1)
|
||||||
|
go w.periodicHandshakeCheck(ctx, w.ctxCancel, onDisconnectedFn, initialHandshake)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DisableWgWatcher stops the WireGuard watcher and wait for the watcher to exit
|
||||||
|
func (w *WGWatcher) DisableWgWatcher() {
|
||||||
|
w.ctxLock.Lock()
|
||||||
|
defer w.ctxLock.Unlock()
|
||||||
|
|
||||||
|
if w.ctxCancel == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
w.log.Debugf("disable WireGuard watcher")
|
||||||
|
|
||||||
|
w.ctxCancel()
|
||||||
|
w.ctxCancel = nil
|
||||||
|
w.waitGroup.Wait()
|
||||||
|
}
|
||||||
|
|
||||||
|
// wgStateCheck help to check the state of the WireGuard handshake and relay connection
|
||||||
|
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel context.CancelFunc, onDisconnectedFn func(), initialHandshake time.Time) {
|
||||||
|
w.log.Debugf("WireGuard watcher started")
|
||||||
|
defer w.waitGroup.Done()
|
||||||
|
|
||||||
|
timer := time.NewTimer(wgHandshakeOvertime)
|
||||||
|
defer timer.Stop()
|
||||||
|
defer ctxCancel()
|
||||||
|
|
||||||
|
lastHandshake := initialHandshake
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
handshake, ok := w.handshakeCheck(lastHandshake)
|
||||||
|
if !ok {
|
||||||
|
onDisconnectedFn()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
timer.Reset(time.Until(handshake.Add(checkPeriod)))
|
||||||
|
lastHandshake = *handshake
|
||||||
|
case <-ctx.Done():
|
||||||
|
w.log.Debugf("WireGuard watcher stopped")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WGWatcher) wgState() (time.Time, error) {
|
||||||
|
wgState, err := w.wgIfaceStater.GetStats(w.peerKey)
|
||||||
|
if err != nil {
|
||||||
|
return time.Time{}, err
|
||||||
|
}
|
||||||
|
return wgState.LastHandshake, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *WGWatcher) handshakeCheck(lastHandshake time.Time) (*time.Time, bool) {
|
||||||
|
handshake, err := w.wgState()
|
||||||
|
if err != nil {
|
||||||
|
w.log.Errorf("failed to read wg stats: %v", err)
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
w.log.Tracef("previous handshake, handshake: %v, %v", lastHandshake, handshake)
|
||||||
|
|
||||||
|
if handshake.Equal(lastHandshake) {
|
||||||
|
w.log.Infof("WireGuard handshake timed out, closing relay connection: %v", handshake)
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
return &handshake, true
|
||||||
|
}
|
||||||
98
client/internal/peer/wg_watcher_test.go
Normal file
98
client/internal/peer/wg_watcher_test.go
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
package peer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/client/iface/configurer"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MocWgIface struct {
|
||||||
|
initial bool
|
||||||
|
lastHandshake time.Time
|
||||||
|
stop bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MocWgIface) GetStats(key string) (configurer.WGStats, error) {
|
||||||
|
if !m.initial {
|
||||||
|
m.initial = true
|
||||||
|
return configurer.WGStats{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if !m.stop {
|
||||||
|
m.lastHandshake = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
stats := configurer.WGStats{
|
||||||
|
LastHandshake: m.lastHandshake,
|
||||||
|
}
|
||||||
|
|
||||||
|
return stats, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MocWgIface) disconnect() {
|
||||||
|
m.stop = true
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWGWatcher_EnableWgWatcher(t *testing.T) {
|
||||||
|
checkPeriod = 5 * time.Second
|
||||||
|
wgHandshakeOvertime = 1 * time.Second
|
||||||
|
|
||||||
|
mlog := log.WithField("peer", "tet")
|
||||||
|
mocWgIface := &MocWgIface{}
|
||||||
|
watcher := NewWGWatcher(mlog, mocWgIface, "")
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
onDisconnected := make(chan struct{}, 1)
|
||||||
|
watcher.EnableWgWatcher(ctx, func() {
|
||||||
|
mlog.Infof("onDisconnectedFn")
|
||||||
|
onDisconnected <- struct{}{}
|
||||||
|
})
|
||||||
|
|
||||||
|
// wait for initial reading
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
mocWgIface.disconnect()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-onDisconnected:
|
||||||
|
case <-time.After(10 * time.Second):
|
||||||
|
t.Errorf("timeout")
|
||||||
|
}
|
||||||
|
watcher.DisableWgWatcher()
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestWGWatcher_ReEnable(t *testing.T) {
|
||||||
|
checkPeriod = 5 * time.Second
|
||||||
|
wgHandshakeOvertime = 1 * time.Second
|
||||||
|
|
||||||
|
mlog := log.WithField("peer", "tet")
|
||||||
|
mocWgIface := &MocWgIface{}
|
||||||
|
watcher := NewWGWatcher(mlog, mocWgIface, "")
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
onDisconnected := make(chan struct{}, 1)
|
||||||
|
|
||||||
|
watcher.EnableWgWatcher(ctx, func() {})
|
||||||
|
watcher.DisableWgWatcher()
|
||||||
|
|
||||||
|
watcher.EnableWgWatcher(ctx, func() {
|
||||||
|
onDisconnected <- struct{}{}
|
||||||
|
})
|
||||||
|
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
mocWgIface.disconnect()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-onDisconnected:
|
||||||
|
case <-time.After(10 * time.Second):
|
||||||
|
t.Errorf("timeout")
|
||||||
|
}
|
||||||
|
watcher.DisableWgWatcher()
|
||||||
|
}
|
||||||
@@ -31,20 +31,15 @@ type ICEConnInfo struct {
|
|||||||
RelayedOnLocal bool
|
RelayedOnLocal bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type WorkerICECallbacks struct {
|
|
||||||
OnConnReady func(ConnPriority, ICEConnInfo)
|
|
||||||
OnStatusChanged func(ConnStatus)
|
|
||||||
}
|
|
||||||
|
|
||||||
type WorkerICE struct {
|
type WorkerICE struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
log *log.Entry
|
log *log.Entry
|
||||||
config ConnConfig
|
config ConnConfig
|
||||||
|
conn *Conn
|
||||||
signaler *Signaler
|
signaler *Signaler
|
||||||
iFaceDiscover stdnet.ExternalIFaceDiscover
|
iFaceDiscover stdnet.ExternalIFaceDiscover
|
||||||
statusRecorder *Status
|
statusRecorder *Status
|
||||||
hasRelayOnLocally bool
|
hasRelayOnLocally bool
|
||||||
conn WorkerICECallbacks
|
|
||||||
|
|
||||||
agent *ice.Agent
|
agent *ice.Agent
|
||||||
muxAgent sync.Mutex
|
muxAgent sync.Mutex
|
||||||
@@ -60,16 +55,16 @@ type WorkerICE struct {
|
|||||||
lastKnownState ice.ConnectionState
|
lastKnownState ice.ConnectionState
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool, callBacks WorkerICECallbacks) (*WorkerICE, error) {
|
func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, conn *Conn, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool) (*WorkerICE, error) {
|
||||||
w := &WorkerICE{
|
w := &WorkerICE{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
log: log,
|
log: log,
|
||||||
config: config,
|
config: config,
|
||||||
|
conn: conn,
|
||||||
signaler: signaler,
|
signaler: signaler,
|
||||||
iFaceDiscover: ifaceDiscover,
|
iFaceDiscover: ifaceDiscover,
|
||||||
statusRecorder: statusRecorder,
|
statusRecorder: statusRecorder,
|
||||||
hasRelayOnLocally: hasRelayOnLocally,
|
hasRelayOnLocally: hasRelayOnLocally,
|
||||||
conn: callBacks,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
localUfrag, localPwd, err := icemaker.GenerateICECredentials()
|
localUfrag, localPwd, err := icemaker.GenerateICECredentials()
|
||||||
@@ -154,8 +149,8 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
|||||||
Relayed: isRelayed(pair),
|
Relayed: isRelayed(pair),
|
||||||
RelayedOnLocal: isRelayCandidate(pair.Local),
|
RelayedOnLocal: isRelayCandidate(pair.Local),
|
||||||
}
|
}
|
||||||
w.log.Debugf("on ICE conn read to use ready")
|
w.log.Debugf("on ICE conn is ready to use")
|
||||||
go w.conn.OnConnReady(selectedPriority(pair), ci)
|
go w.conn.onICEConnectionIsReady(selectedPriority(pair), ci)
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
|
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
|
||||||
@@ -220,7 +215,7 @@ func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, candidates []i
|
|||||||
case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected:
|
case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected:
|
||||||
if w.lastKnownState != ice.ConnectionStateDisconnected {
|
if w.lastKnownState != ice.ConnectionStateDisconnected {
|
||||||
w.lastKnownState = ice.ConnectionStateDisconnected
|
w.lastKnownState = ice.ConnectionStateDisconnected
|
||||||
w.conn.OnStatusChanged(StatusDisconnected)
|
w.conn.onICEStateDisconnected()
|
||||||
}
|
}
|
||||||
w.closeAgent(agentCancel)
|
w.closeAgent(agentCancel)
|
||||||
default:
|
default:
|
||||||
|
|||||||
@@ -6,52 +6,41 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
relayClient "github.com/netbirdio/netbird/relay/client"
|
relayClient "github.com/netbirdio/netbird/relay/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
|
||||||
wgHandshakePeriod = 3 * time.Minute
|
|
||||||
wgHandshakeOvertime = 30 * time.Second
|
|
||||||
)
|
|
||||||
|
|
||||||
type RelayConnInfo struct {
|
type RelayConnInfo struct {
|
||||||
relayedConn net.Conn
|
relayedConn net.Conn
|
||||||
rosenpassPubKey []byte
|
rosenpassPubKey []byte
|
||||||
rosenpassAddr string
|
rosenpassAddr string
|
||||||
}
|
}
|
||||||
|
|
||||||
type WorkerRelayCallbacks struct {
|
|
||||||
OnConnReady func(RelayConnInfo)
|
|
||||||
OnDisconnected func()
|
|
||||||
}
|
|
||||||
|
|
||||||
type WorkerRelay struct {
|
type WorkerRelay struct {
|
||||||
log *log.Entry
|
log *log.Entry
|
||||||
isController bool
|
isController bool
|
||||||
config ConnConfig
|
config ConnConfig
|
||||||
|
conn *Conn
|
||||||
relayManager relayClient.ManagerService
|
relayManager relayClient.ManagerService
|
||||||
callBacks WorkerRelayCallbacks
|
|
||||||
|
|
||||||
relayedConn net.Conn
|
relayedConn net.Conn
|
||||||
relayLock sync.Mutex
|
relayLock sync.Mutex
|
||||||
ctxWgWatch context.Context
|
|
||||||
ctxCancelWgWatch context.CancelFunc
|
|
||||||
ctxLock sync.Mutex
|
|
||||||
|
|
||||||
relaySupportedOnRemotePeer atomic.Bool
|
relaySupportedOnRemotePeer atomic.Bool
|
||||||
|
|
||||||
|
wgWatcher *WGWatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, relayManager relayClient.ManagerService, callbacks WorkerRelayCallbacks) *WorkerRelay {
|
func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, conn *Conn, relayManager relayClient.ManagerService) *WorkerRelay {
|
||||||
r := &WorkerRelay{
|
r := &WorkerRelay{
|
||||||
log: log,
|
log: log,
|
||||||
isController: ctrl,
|
isController: ctrl,
|
||||||
config: config,
|
config: config,
|
||||||
|
conn: conn,
|
||||||
relayManager: relayManager,
|
relayManager: relayManager,
|
||||||
callBacks: callbacks,
|
wgWatcher: NewWGWatcher(log, config.WgConfig.WgInterface, config.Key),
|
||||||
}
|
}
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
@@ -87,7 +76,7 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
|||||||
w.relayedConn = relayedConn
|
w.relayedConn = relayedConn
|
||||||
w.relayLock.Unlock()
|
w.relayLock.Unlock()
|
||||||
|
|
||||||
err = w.relayManager.AddCloseListener(srv, w.onRelayMGDisconnected)
|
err = w.relayManager.AddCloseListener(srv, w.onRelayClientDisconnected)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to add close listener: %s", err)
|
log.Errorf("failed to add close listener: %s", err)
|
||||||
_ = relayedConn.Close()
|
_ = relayedConn.Close()
|
||||||
@@ -95,7 +84,7 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
w.log.Debugf("peer conn opened via Relay: %s", srv)
|
w.log.Debugf("peer conn opened via Relay: %s", srv)
|
||||||
go w.callBacks.OnConnReady(RelayConnInfo{
|
go w.conn.onRelayConnectionIsReady(RelayConnInfo{
|
||||||
relayedConn: relayedConn,
|
relayedConn: relayedConn,
|
||||||
rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
|
rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
|
||||||
rosenpassAddr: remoteOfferAnswer.RosenpassAddr,
|
rosenpassAddr: remoteOfferAnswer.RosenpassAddr,
|
||||||
@@ -103,32 +92,11 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *WorkerRelay) EnableWgWatcher(ctx context.Context) {
|
func (w *WorkerRelay) EnableWgWatcher(ctx context.Context) {
|
||||||
w.log.Debugf("enable WireGuard watcher")
|
w.wgWatcher.EnableWgWatcher(ctx, w.onWGDisconnected)
|
||||||
w.ctxLock.Lock()
|
|
||||||
defer w.ctxLock.Unlock()
|
|
||||||
|
|
||||||
if w.ctxWgWatch != nil && w.ctxWgWatch.Err() == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
ctx, ctxCancel := context.WithCancel(ctx)
|
|
||||||
w.ctxWgWatch = ctx
|
|
||||||
w.ctxCancelWgWatch = ctxCancel
|
|
||||||
|
|
||||||
w.wgStateCheck(ctx, ctxCancel)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WorkerRelay) DisableWgWatcher() {
|
func (w *WorkerRelay) DisableWgWatcher() {
|
||||||
w.ctxLock.Lock()
|
w.wgWatcher.DisableWgWatcher()
|
||||||
defer w.ctxLock.Unlock()
|
|
||||||
|
|
||||||
if w.ctxCancelWgWatch == nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
w.log.Debugf("disable WireGuard watcher")
|
|
||||||
|
|
||||||
w.ctxCancelWgWatch()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WorkerRelay) RelayInstanceAddress() (string, error) {
|
func (w *WorkerRelay) RelayInstanceAddress() (string, error) {
|
||||||
@@ -150,57 +118,17 @@ func (w *WorkerRelay) CloseConn() {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err := w.relayedConn.Close()
|
if err := w.relayedConn.Close(); err != nil {
|
||||||
if err != nil {
|
|
||||||
w.log.Warnf("failed to close relay connection: %v", err)
|
w.log.Warnf("failed to close relay connection: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// wgStateCheck help to check the state of the WireGuard handshake and relay connection
|
func (w *WorkerRelay) onWGDisconnected() {
|
||||||
func (w *WorkerRelay) wgStateCheck(ctx context.Context, ctxCancel context.CancelFunc) {
|
w.relayLock.Lock()
|
||||||
w.log.Debugf("WireGuard watcher started")
|
_ = w.relayedConn.Close()
|
||||||
lastHandshake, err := w.wgState()
|
w.relayLock.Unlock()
|
||||||
if err != nil {
|
|
||||||
w.log.Warnf("failed to read wg stats: %v", err)
|
|
||||||
lastHandshake = time.Time{}
|
|
||||||
}
|
|
||||||
|
|
||||||
go func(lastHandshake time.Time) {
|
|
||||||
timer := time.NewTimer(wgHandshakeOvertime)
|
|
||||||
defer timer.Stop()
|
|
||||||
defer ctxCancel()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-timer.C:
|
|
||||||
handshake, err := w.wgState()
|
|
||||||
if err != nil {
|
|
||||||
w.log.Errorf("failed to read wg stats: %v", err)
|
|
||||||
timer.Reset(wgHandshakeOvertime)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
w.log.Tracef("previous handshake, handshake: %v, %v", lastHandshake, handshake)
|
|
||||||
|
|
||||||
if handshake.Equal(lastHandshake) {
|
|
||||||
w.log.Infof("WireGuard handshake timed out, closing relay connection: %v", handshake)
|
|
||||||
w.relayLock.Lock()
|
|
||||||
_ = w.relayedConn.Close()
|
|
||||||
w.relayLock.Unlock()
|
|
||||||
w.callBacks.OnDisconnected()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
resetTime := time.Until(handshake.Add(wgHandshakePeriod + wgHandshakeOvertime))
|
|
||||||
lastHandshake = handshake
|
|
||||||
timer.Reset(resetTime)
|
|
||||||
case <-ctx.Done():
|
|
||||||
w.log.Debugf("WireGuard watcher stopped")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}(lastHandshake)
|
|
||||||
|
|
||||||
|
w.conn.onRelayDisconnected()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool {
|
func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool {
|
||||||
@@ -217,20 +145,7 @@ func (w *WorkerRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress st
|
|||||||
return remoteRelayAddress
|
return remoteRelayAddress
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WorkerRelay) wgState() (time.Time, error) {
|
func (w *WorkerRelay) onRelayClientDisconnected() {
|
||||||
wgState, err := w.config.WgConfig.WgInterface.GetStats(w.config.Key)
|
w.wgWatcher.DisableWgWatcher()
|
||||||
if err != nil {
|
go w.conn.onRelayDisconnected()
|
||||||
return time.Time{}, err
|
|
||||||
}
|
|
||||||
return wgState.LastHandshake, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (w *WorkerRelay) onRelayMGDisconnected() {
|
|
||||||
w.ctxLock.Lock()
|
|
||||||
defer w.ctxLock.Unlock()
|
|
||||||
|
|
||||||
if w.ctxCancelWgWatch != nil {
|
|
||||||
w.ctxCancelWgWatch()
|
|
||||||
}
|
|
||||||
go w.callBacks.OnDisconnected()
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user