mirror of
https://github.com/netbirdio/netbird.git
synced 2026-05-05 16:46:39 +00:00
Compare commits
12 Commits
debug-and-
...
merged-fix
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6efc1a61fe | ||
|
|
4a45e40578 | ||
|
|
85ad236f6d | ||
|
|
e54ab4b5e1 | ||
|
|
23977c6409 | ||
|
|
a73cb99045 | ||
|
|
ccf40fefaf | ||
|
|
b2548a4037 | ||
|
|
f86a7f745e | ||
|
|
fd13247d66 | ||
|
|
1d83fccd9c | ||
|
|
7d3c972653 |
@@ -394,6 +394,13 @@ func toLastHandshake(stringVar string) (time.Time, error) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return time.Time{}, fmt.Errorf("parse handshake sec: %w", err)
|
return time.Time{}, fmt.Errorf("parse handshake sec: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If sec is 0 (Unix epoch), return zero time instead
|
||||||
|
// This indicates no handshake has occurred
|
||||||
|
if sec == 0 {
|
||||||
|
return time.Time{}, nil
|
||||||
|
}
|
||||||
|
|
||||||
return time.Unix(sec, 0), nil
|
return time.Unix(sec, 0), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -62,7 +62,7 @@ func seedProxyForProxyCloseByRemoteConn() ([]proxyInstance, error) {
|
|||||||
closeFn: func() error { return nil },
|
closeFn: func() error { return nil },
|
||||||
}
|
}
|
||||||
pl = append(pl, pUDP)
|
pl = append(pl, pUDP)
|
||||||
wgAddress, err := wgaddr.ParseWGAddress("10.0.0.1")
|
wgAddress, err := wgaddr.ParseWGAddress("10.0.0.1/32")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/client/iface/bind"
|
"github.com/netbirdio/netbird/client/iface/bind"
|
||||||
|
"github.com/netbirdio/netbird/client/iface/wgaddr"
|
||||||
bindproxy "github.com/netbirdio/netbird/client/iface/wgproxy/bind"
|
bindproxy "github.com/netbirdio/netbird/client/iface/wgproxy/bind"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -17,7 +18,7 @@ func seedProxies() ([]proxyInstance, error) {
|
|||||||
|
|
||||||
func seedProxyForProxyCloseByRemoteConn() ([]proxyInstance, error) {
|
func seedProxyForProxyCloseByRemoteConn() ([]proxyInstance, error) {
|
||||||
pl := make([]proxyInstance, 0)
|
pl := make([]proxyInstance, 0)
|
||||||
wgAddress, err := wgaddr.ParseWGAddress("10.0.0.1")
|
wgAddress, err := wgaddr.ParseWGAddress("10.0.0.1/32")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -93,8 +93,9 @@ func TestProxyCloseByRemoteConn(t *testing.T) {
|
|||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
|
addr, _ := net.ResolveUDPAddr("udp", "100.108.135.221:51892")
|
||||||
relayedConn := newMockConn()
|
relayedConn := newMockConn()
|
||||||
err := tt.proxy.AddTurnConn(ctx, nil, relayedConn)
|
err := tt.proxy.AddTurnConn(ctx, addr, relayedConn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("error: %v", err)
|
t.Errorf("error: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -949,7 +949,6 @@ func (e *Engine) receiveManagementEvents() {
|
|||||||
e.config.LazyConnectionEnabled,
|
e.config.LazyConnectionEnabled,
|
||||||
)
|
)
|
||||||
|
|
||||||
// err = e.mgmClient.Sync(info, e.handleSync)
|
|
||||||
err = e.mgmClient.Sync(e.ctx, info, e.handleSync)
|
err = e.mgmClient.Sync(e.ctx, info, e.handleSync)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// happens if management is unavailable for a long time.
|
// happens if management is unavailable for a long time.
|
||||||
@@ -960,7 +959,7 @@ func (e *Engine) receiveManagementEvents() {
|
|||||||
}
|
}
|
||||||
log.Debugf("stopped receiving updates from Management Service")
|
log.Debugf("stopped receiving updates from Management Service")
|
||||||
}()
|
}()
|
||||||
log.Debugf("connecting to Management Service updates stream")
|
log.Infof("connecting to Management Service updates stream")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) updateSTUNs(stuns []*mgmProto.HostConfig) error {
|
func (e *Engine) updateSTUNs(stuns []*mgmProto.HostConfig) error {
|
||||||
|
|||||||
@@ -118,6 +118,8 @@ type Conn struct {
|
|||||||
|
|
||||||
// debug purpose
|
// debug purpose
|
||||||
dumpState *stateDump
|
dumpState *stateDump
|
||||||
|
|
||||||
|
endpointUpdater *endpointUpdater
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConn creates a new not opened Conn to the remote peer.
|
// NewConn creates a new not opened Conn to the remote peer.
|
||||||
@@ -141,6 +143,11 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
|
|||||||
statusRelay: worker.NewAtomicStatus(),
|
statusRelay: worker.NewAtomicStatus(),
|
||||||
statusICE: worker.NewAtomicStatus(),
|
statusICE: worker.NewAtomicStatus(),
|
||||||
dumpState: newStateDump(config.Key, connLog, services.StatusRecorder),
|
dumpState: newStateDump(config.Key, connLog, services.StatusRecorder),
|
||||||
|
endpointUpdater: &endpointUpdater{
|
||||||
|
log: connLog,
|
||||||
|
wgConfig: config.WgConfig,
|
||||||
|
initiator: isWireGuardInitiator(config),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
return conn, nil
|
return conn, nil
|
||||||
@@ -250,7 +257,7 @@ func (conn *Conn) Close(signalToRemote bool) {
|
|||||||
conn.wgProxyICE = nil
|
conn.wgProxyICE = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := conn.removeWgPeer(); err != nil {
|
if err := conn.endpointUpdater.removeWgPeer(); err != nil {
|
||||||
conn.Log.Errorf("failed to remove wg endpoint: %v", err)
|
conn.Log.Errorf("failed to remove wg endpoint: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -377,13 +384,12 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
|
|||||||
}
|
}
|
||||||
|
|
||||||
conn.Log.Infof("configure WireGuard endpoint to: %s", ep.String())
|
conn.Log.Infof("configure WireGuard endpoint to: %s", ep.String())
|
||||||
if err = conn.configureWGEndpoint(ep, iceConnInfo.RosenpassPubKey); err != nil {
|
if err = conn.endpointUpdater.configureWGEndpoint(ep, iceConnInfo.RosenpassPubKey); err != nil {
|
||||||
conn.handleConfigurationFailure(err, wgProxy)
|
conn.handleConfigurationFailure(err, wgProxy)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
wgConfigWorkaround()
|
wgConfigWorkaround()
|
||||||
|
|
||||||
|
|
||||||
if conn.wgProxyRelay != nil {
|
if conn.wgProxyRelay != nil {
|
||||||
conn.Log.Debugf("redirect packages from relayed conn to WireGuard")
|
conn.Log.Debugf("redirect packages from relayed conn to WireGuard")
|
||||||
conn.wgProxyRelay.RedirectAs(ep)
|
conn.wgProxyRelay.RedirectAs(ep)
|
||||||
@@ -417,7 +423,7 @@ func (conn *Conn) onICEStateDisconnected() {
|
|||||||
conn.dumpState.SwitchToRelay()
|
conn.dumpState.SwitchToRelay()
|
||||||
conn.wgProxyRelay.Work()
|
conn.wgProxyRelay.Work()
|
||||||
|
|
||||||
if err := conn.configureWGEndpoint(conn.wgProxyRelay.EndpointAddr(), conn.rosenpassRemoteKey); err != nil {
|
if err := conn.endpointUpdater.configureWGEndpoint(conn.wgProxyRelay.EndpointAddr(), conn.rosenpassRemoteKey); err != nil {
|
||||||
conn.Log.Errorf("failed to switch to relay conn: %v", err)
|
conn.Log.Errorf("failed to switch to relay conn: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -486,7 +492,7 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
wgProxy.Work()
|
wgProxy.Work()
|
||||||
if err := conn.configureWGEndpoint(wgProxy.EndpointAddr(), rci.rosenpassPubKey); err != nil {
|
if err := conn.endpointUpdater.configureWGEndpoint(wgProxy.EndpointAddr(), rci.rosenpassPubKey); err != nil {
|
||||||
if err := wgProxy.CloseConn(); err != nil {
|
if err := wgProxy.CloseConn(); err != nil {
|
||||||
conn.Log.Warnf("Failed to close relay connection: %v", err)
|
conn.Log.Warnf("Failed to close relay connection: %v", err)
|
||||||
}
|
}
|
||||||
@@ -554,17 +560,6 @@ func (conn *Conn) onGuardEvent() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) configureWGEndpoint(addr *net.UDPAddr, remoteRPKey []byte) error {
|
|
||||||
presharedKey := conn.presharedKey(remoteRPKey)
|
|
||||||
return conn.config.WgConfig.WgInterface.UpdatePeer(
|
|
||||||
conn.config.WgConfig.RemoteKey,
|
|
||||||
conn.config.WgConfig.AllowedIps,
|
|
||||||
defaultWgKeepAlive,
|
|
||||||
addr,
|
|
||||||
presharedKey,
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte) {
|
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte) {
|
||||||
peerState := State{
|
peerState := State{
|
||||||
PubKey: conn.config.Key,
|
PubKey: conn.config.Key,
|
||||||
@@ -707,10 +702,6 @@ func (conn *Conn) isICEActive() bool {
|
|||||||
return (conn.currentConnPriority == conntype.ICEP2P || conn.currentConnPriority == conntype.ICETurn) && conn.statusICE.Get() == worker.StatusConnected
|
return (conn.currentConnPriority == conntype.ICEP2P || conn.currentConnPriority == conntype.ICETurn) && conn.statusICE.Get() == worker.StatusConnected
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) removeWgPeer() error {
|
|
||||||
return conn.config.WgConfig.WgInterface.RemovePeer(conn.config.WgConfig.RemoteKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conn *Conn) handleConfigurationFailure(err error, wgProxy wgproxy.Proxy) {
|
func (conn *Conn) handleConfigurationFailure(err error, wgProxy wgproxy.Proxy) {
|
||||||
conn.Log.Warnf("Failed to update wg peer configuration: %v", err)
|
conn.Log.Warnf("Failed to update wg peer configuration: %v", err)
|
||||||
if wgProxy != nil {
|
if wgProxy != nil {
|
||||||
@@ -791,6 +782,10 @@ func isController(config ConnConfig) bool {
|
|||||||
return config.LocalKey > config.Key
|
return config.LocalKey > config.Key
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func isWireGuardInitiator(config ConnConfig) bool {
|
||||||
|
return isController(config)
|
||||||
|
}
|
||||||
|
|
||||||
func isRosenpassEnabled(remoteRosenpassPubKey []byte) bool {
|
func isRosenpassEnabled(remoteRosenpassPubKey []byte) bool {
|
||||||
return remoteRosenpassPubKey != nil
|
return remoteRosenpassPubKey != nil
|
||||||
}
|
}
|
||||||
|
|||||||
88
client/internal/peer/endpoint.go
Normal file
88
client/internal/peer/endpoint.go
Normal file
@@ -0,0 +1,88 @@
|
|||||||
|
package peer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
// fallbackDelay could be const but because of testing it is a var
|
||||||
|
var fallbackDelay = 5 * time.Second
|
||||||
|
|
||||||
|
type endpointUpdater struct {
|
||||||
|
log *logrus.Entry
|
||||||
|
wgConfig WgConfig
|
||||||
|
initiator bool
|
||||||
|
|
||||||
|
cancelFunc func()
|
||||||
|
configUpdateMutex sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
// configureWGEndpoint sets up the WireGuard endpoint configuration.
|
||||||
|
// The initiator immediately configures the endpoint, while the non-initiator
|
||||||
|
// waits for a fallback period before configuring to avoid handshake congestion.
|
||||||
|
func (e *endpointUpdater) configureWGEndpoint(addr *net.UDPAddr, remoteRPKey []byte) error {
|
||||||
|
if e.initiator {
|
||||||
|
return e.updateWireGuardPeer(addr, remoteRPKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
// prevent to run new update while cancel the previous update
|
||||||
|
e.configUpdateMutex.Lock()
|
||||||
|
if e.cancelFunc != nil {
|
||||||
|
e.cancelFunc()
|
||||||
|
}
|
||||||
|
e.configUpdateMutex.Unlock()
|
||||||
|
|
||||||
|
var ctx context.Context
|
||||||
|
ctx, e.cancelFunc = context.WithCancel(context.Background())
|
||||||
|
go e.scheduleDelayedUpdate(ctx, addr, remoteRPKey)
|
||||||
|
|
||||||
|
return e.updateWireGuardPeer(nil, remoteRPKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *endpointUpdater) removeWgPeer() error {
|
||||||
|
e.configUpdateMutex.Lock()
|
||||||
|
defer e.configUpdateMutex.Unlock()
|
||||||
|
|
||||||
|
if e.cancelFunc != nil {
|
||||||
|
e.cancelFunc()
|
||||||
|
}
|
||||||
|
|
||||||
|
return e.wgConfig.WgInterface.RemovePeer(e.wgConfig.RemoteKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
// scheduleDelayedUpdate waits for the fallback period before updating the endpoint
|
||||||
|
func (e *endpointUpdater) scheduleDelayedUpdate(ctx context.Context, addr *net.UDPAddr, remoteRPKey []byte) {
|
||||||
|
t := time.NewTimer(fallbackDelay)
|
||||||
|
defer t.Stop()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-t.C:
|
||||||
|
e.configUpdateMutex.Lock()
|
||||||
|
defer e.configUpdateMutex.Unlock()
|
||||||
|
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := e.updateWireGuardPeer(addr, remoteRPKey); err != nil {
|
||||||
|
e.log.Errorf("failed to update WireGuard peer, address: %s, error: %v", addr, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *endpointUpdater) updateWireGuardPeer(endpoint *net.UDPAddr, remoteRPKey []byte) error {
|
||||||
|
// todo add, "presharedKey := e.presharedKey(remote)"
|
||||||
|
return e.wgConfig.WgInterface.UpdatePeer(
|
||||||
|
e.wgConfig.RemoteKey,
|
||||||
|
e.wgConfig.AllowedIps,
|
||||||
|
defaultWgKeepAlive,
|
||||||
|
endpoint,
|
||||||
|
e.wgConfig.PreSharedKey,
|
||||||
|
)
|
||||||
|
}
|
||||||
@@ -43,13 +43,6 @@ type OfferAnswer struct {
|
|||||||
SessionID *ICESessionID
|
SessionID *ICESessionID
|
||||||
}
|
}
|
||||||
|
|
||||||
func (oa *OfferAnswer) SessionIDString() string {
|
|
||||||
if oa.SessionID == nil {
|
|
||||||
return "unknown"
|
|
||||||
}
|
|
||||||
return oa.SessionID.String()
|
|
||||||
}
|
|
||||||
|
|
||||||
type Handshaker struct {
|
type Handshaker struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
log *log.Entry
|
log *log.Entry
|
||||||
@@ -57,7 +50,7 @@ type Handshaker struct {
|
|||||||
signaler *Signaler
|
signaler *Signaler
|
||||||
ice *WorkerICE
|
ice *WorkerICE
|
||||||
relay *WorkerRelay
|
relay *WorkerRelay
|
||||||
onNewOfferListeners []func(*OfferAnswer)
|
onNewOfferListeners []*OfferListener
|
||||||
|
|
||||||
// remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection
|
// remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection
|
||||||
remoteOffersCh chan OfferAnswer
|
remoteOffersCh chan OfferAnswer
|
||||||
@@ -78,7 +71,8 @@ func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *W
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAnswer)) {
|
func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAnswer)) {
|
||||||
h.onNewOfferListeners = append(h.onNewOfferListeners, offer)
|
l := NewOfferListener(offer)
|
||||||
|
h.onNewOfferListeners = append(h.onNewOfferListeners, l)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handshaker) Listen(ctx context.Context) {
|
func (h *Handshaker) Listen(ctx context.Context) {
|
||||||
@@ -91,13 +85,13 @@ func (h *Handshaker) Listen(ctx context.Context) {
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
for _, listener := range h.onNewOfferListeners {
|
for _, listener := range h.onNewOfferListeners {
|
||||||
listener(&remoteOfferAnswer)
|
listener.Notify(&remoteOfferAnswer)
|
||||||
}
|
}
|
||||||
h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
|
h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
|
||||||
case remoteOfferAnswer := <-h.remoteAnswerCh:
|
case remoteOfferAnswer := <-h.remoteAnswerCh:
|
||||||
h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
|
h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
|
||||||
for _, listener := range h.onNewOfferListeners {
|
for _, listener := range h.onNewOfferListeners {
|
||||||
listener(&remoteOfferAnswer)
|
listener.Notify(&remoteOfferAnswer)
|
||||||
}
|
}
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
h.log.Infof("stop listening for remote offers and answers")
|
h.log.Infof("stop listening for remote offers and answers")
|
||||||
|
|||||||
62
client/internal/peer/handshaker_listener.go
Normal file
62
client/internal/peer/handshaker_listener.go
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
package peer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
)
|
||||||
|
|
||||||
|
type callbackFunc func(remoteOfferAnswer *OfferAnswer)
|
||||||
|
|
||||||
|
func (oa *OfferAnswer) SessionIDString() string {
|
||||||
|
if oa.SessionID == nil {
|
||||||
|
return "unknown"
|
||||||
|
}
|
||||||
|
return oa.SessionID.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
type OfferListener struct {
|
||||||
|
fn callbackFunc
|
||||||
|
running bool
|
||||||
|
latest *OfferAnswer
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewOfferListener(fn callbackFunc) *OfferListener {
|
||||||
|
return &OfferListener{
|
||||||
|
fn: fn,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (o *OfferListener) Notify(remoteOfferAnswer *OfferAnswer) {
|
||||||
|
o.mu.Lock()
|
||||||
|
defer o.mu.Unlock()
|
||||||
|
|
||||||
|
// Store the latest offer
|
||||||
|
o.latest = remoteOfferAnswer
|
||||||
|
|
||||||
|
// If already running, the running goroutine will pick up this latest value
|
||||||
|
if o.running {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start processing
|
||||||
|
o.running = true
|
||||||
|
|
||||||
|
// Process in a goroutine to avoid blocking the caller
|
||||||
|
go func(remoteOfferAnswer *OfferAnswer) {
|
||||||
|
for {
|
||||||
|
o.fn(remoteOfferAnswer)
|
||||||
|
|
||||||
|
o.mu.Lock()
|
||||||
|
if o.latest == nil {
|
||||||
|
// No more work to do
|
||||||
|
o.running = false
|
||||||
|
o.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
remoteOfferAnswer = o.latest
|
||||||
|
// Clear the latest to mark it as being processed
|
||||||
|
o.latest = nil
|
||||||
|
o.mu.Unlock()
|
||||||
|
}
|
||||||
|
}(remoteOfferAnswer)
|
||||||
|
}
|
||||||
39
client/internal/peer/handshaker_listener_test.go
Normal file
39
client/internal/peer/handshaker_listener_test.go
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
package peer
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_newOfferListener(t *testing.T) {
|
||||||
|
dummyOfferAnswer := &OfferAnswer{}
|
||||||
|
runChan := make(chan struct{}, 10)
|
||||||
|
|
||||||
|
longRunningFn := func(remoteOfferAnswer *OfferAnswer) {
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
runChan <- struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
hl := NewOfferListener(longRunningFn)
|
||||||
|
|
||||||
|
hl.Notify(dummyOfferAnswer)
|
||||||
|
hl.Notify(dummyOfferAnswer)
|
||||||
|
hl.Notify(dummyOfferAnswer)
|
||||||
|
|
||||||
|
// Wait for exactly 2 callbacks
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
select {
|
||||||
|
case <-runChan:
|
||||||
|
case <-time.After(3 * time.Second):
|
||||||
|
t.Fatal("Timeout waiting for callback")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify no additional callbacks happen
|
||||||
|
select {
|
||||||
|
case <-runChan:
|
||||||
|
t.Fatal("Unexpected additional callback")
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Log("Correctly received exactly 2 callbacks")
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -33,6 +33,7 @@ type WGWatcher struct {
|
|||||||
ctx context.Context
|
ctx context.Context
|
||||||
ctxCancel context.CancelFunc
|
ctxCancel context.CancelFunc
|
||||||
ctxLock sync.Mutex
|
ctxLock sync.Mutex
|
||||||
|
enabled time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string, stateDump *stateDump) *WGWatcher {
|
func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey string, stateDump *stateDump) *WGWatcher {
|
||||||
@@ -48,6 +49,7 @@ func NewWGWatcher(log *log.Entry, wgIfaceStater WGInterfaceStater, peerKey strin
|
|||||||
func (w *WGWatcher) EnableWgWatcher(parentCtx context.Context, onDisconnectedFn func()) {
|
func (w *WGWatcher) EnableWgWatcher(parentCtx context.Context, onDisconnectedFn func()) {
|
||||||
w.log.Debugf("enable WireGuard watcher")
|
w.log.Debugf("enable WireGuard watcher")
|
||||||
w.ctxLock.Lock()
|
w.ctxLock.Lock()
|
||||||
|
w.enabled = time.Now()
|
||||||
|
|
||||||
if w.ctx != nil && w.ctx.Err() == nil {
|
if w.ctx != nil && w.ctx.Err() == nil {
|
||||||
w.log.Errorf("WireGuard watcher already enabled")
|
w.log.Errorf("WireGuard watcher already enabled")
|
||||||
@@ -87,6 +89,8 @@ func (w *WGWatcher) DisableWgWatcher() {
|
|||||||
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel context.CancelFunc, onDisconnectedFn func(), initialHandshake time.Time) {
|
func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel context.CancelFunc, onDisconnectedFn func(), initialHandshake time.Time) {
|
||||||
w.log.Infof("WireGuard watcher started")
|
w.log.Infof("WireGuard watcher started")
|
||||||
|
|
||||||
|
debugTicker := time.NewTicker(time.Second)
|
||||||
|
|
||||||
timer := time.NewTimer(wgHandshakeOvertime)
|
timer := time.NewTimer(wgHandshakeOvertime)
|
||||||
defer timer.Stop()
|
defer timer.Stop()
|
||||||
defer ctxCancel()
|
defer ctxCancel()
|
||||||
@@ -95,12 +99,29 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel contex
|
|||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
|
case <-debugTicker.C:
|
||||||
|
handshake, err := w.wgState()
|
||||||
|
if err != nil {
|
||||||
|
w.log.Errorf("failed to read wg stats: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if !handshake.IsZero() {
|
||||||
|
w.log.Infof("first wg handshake detected at: %s, %s", handshake, time.Since(w.enabled))
|
||||||
|
debugTicker.Stop()
|
||||||
|
}
|
||||||
case <-timer.C:
|
case <-timer.C:
|
||||||
handshake, ok := w.handshakeCheck(lastHandshake)
|
handshake, ok := w.handshakeCheck(lastHandshake)
|
||||||
if !ok {
|
if !ok {
|
||||||
onDisconnectedFn()
|
onDisconnectedFn()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
|
// todo: put it back if remove debug ticker
|
||||||
|
if lastHandshake.IsZero() {
|
||||||
|
w.log.Infof("first wg handshake detected at: %s", handshake)
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
lastHandshake = *handshake
|
lastHandshake = *handshake
|
||||||
|
|
||||||
resetTime := time.Until(handshake.Add(checkPeriod))
|
resetTime := time.Until(handshake.Add(checkPeriod))
|
||||||
|
|||||||
@@ -122,7 +122,6 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
|||||||
w.log.Warnf("failed to close ICE agent: %s", err)
|
w.log.Warnf("failed to close ICE agent: %s", err)
|
||||||
}
|
}
|
||||||
w.agent = nil
|
w.agent = nil
|
||||||
// todo consider to switch to Relay connection while establishing a new ICE connection
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var preferredCandidateTypes []ice.CandidateType
|
var preferredCandidateTypes []ice.CandidateType
|
||||||
@@ -410,7 +409,10 @@ func (w *WorkerICE) onConnectionStateChange(agent *icemaker.ThreadSafeAgent, dia
|
|||||||
case ice.ConnectionStateConnected:
|
case ice.ConnectionStateConnected:
|
||||||
w.lastKnownState = ice.ConnectionStateConnected
|
w.lastKnownState = ice.ConnectionStateConnected
|
||||||
return
|
return
|
||||||
case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected:
|
case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected, ice.ConnectionStateClosed:
|
||||||
|
// ice.ConnectionStateClosed happens when we recreate the agent. For the P2P to TURN switch important to
|
||||||
|
// notify the conn.onICEStateDisconnected changes to update the current used priority
|
||||||
|
|
||||||
if w.lastKnownState == ice.ConnectionStateConnected {
|
if w.lastKnownState == ice.ConnectionStateConnected {
|
||||||
w.lastKnownState = ice.ConnectionStateDisconnected
|
w.lastKnownState = ice.ConnectionStateDisconnected
|
||||||
w.conn.onICEStateDisconnected()
|
w.conn.onICEStateDisconnected()
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"net/netip"
|
"net/netip"
|
||||||
"strings"
|
"strings"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/shared/management/proto"
|
"github.com/netbirdio/netbird/shared/management/proto"
|
||||||
@@ -180,6 +181,7 @@ func isDuplicated(addresses []NetworkAddress, addr NetworkAddress) bool {
|
|||||||
|
|
||||||
// GetInfoWithChecks retrieves and parses the system information with applied checks.
|
// GetInfoWithChecks retrieves and parses the system information with applied checks.
|
||||||
func GetInfoWithChecks(ctx context.Context, checks []*proto.Checks) (*Info, error) {
|
func GetInfoWithChecks(ctx context.Context, checks []*proto.Checks) (*Info, error) {
|
||||||
|
log.Debugf("gathering system information with checks: %d", len(checks))
|
||||||
processCheckPaths := make([]string, 0)
|
processCheckPaths := make([]string, 0)
|
||||||
for _, check := range checks {
|
for _, check := range checks {
|
||||||
processCheckPaths = append(processCheckPaths, check.GetFiles()...)
|
processCheckPaths = append(processCheckPaths, check.GetFiles()...)
|
||||||
@@ -189,10 +191,12 @@ func GetInfoWithChecks(ctx context.Context, checks []*proto.Checks) (*Info, erro
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
log.Debugf("gathering process check information completed")
|
||||||
|
|
||||||
info := GetInfo(ctx)
|
info := GetInfo(ctx)
|
||||||
info.Files = files
|
info.Files = files
|
||||||
|
|
||||||
|
log.Debugf("all system information gathered successfully")
|
||||||
return info, nil
|
return info, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"os"
|
"os"
|
||||||
"runtime"
|
"runtime"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
@@ -31,42 +32,100 @@ type Win32_BIOS struct {
|
|||||||
SerialNumber string
|
SerialNumber string
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetInfo retrieves and parses the system information
|
// CachedStaticInfo holds all the static system information that never changes
|
||||||
func GetInfo(ctx context.Context) *Info {
|
type CachedStaticInfo struct {
|
||||||
|
OSName string
|
||||||
|
OSVersion string
|
||||||
|
KernelVersion string
|
||||||
|
SystemSerialNumber string
|
||||||
|
SystemProductName string
|
||||||
|
SystemManufacturer string
|
||||||
|
Environment Environment // Assuming this is from your StaticInfo struct
|
||||||
|
GoOS string
|
||||||
|
CPUs int
|
||||||
|
Kernel string
|
||||||
|
Platform string
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
cachedStaticInfo *CachedStaticInfo
|
||||||
|
staticInfoOnce sync.Once
|
||||||
|
)
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
go initStaticInfo()
|
||||||
|
}
|
||||||
|
|
||||||
|
// initStaticInfo initializes all static system information once
|
||||||
|
func initStaticInfo() {
|
||||||
|
staticInfoOnce.Do(func() {
|
||||||
|
log.Debugf("initializing static system information (one-time operation)")
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
// Get OS info
|
||||||
osName, osVersion := getOSNameAndVersion()
|
osName, osVersion := getOSNameAndVersion()
|
||||||
buildVersion := getBuildVersion()
|
buildVersion := getBuildVersion()
|
||||||
|
|
||||||
|
// Get hardware info
|
||||||
|
si := updateStaticInfo()
|
||||||
|
|
||||||
|
cachedStaticInfo = &CachedStaticInfo{
|
||||||
|
OSName: osName,
|
||||||
|
OSVersion: osVersion,
|
||||||
|
KernelVersion: buildVersion,
|
||||||
|
SystemSerialNumber: si.SystemSerialNumber,
|
||||||
|
SystemProductName: si.SystemProductName,
|
||||||
|
SystemManufacturer: si.SystemManufacturer,
|
||||||
|
Environment: si.Environment,
|
||||||
|
GoOS: runtime.GOOS,
|
||||||
|
CPUs: runtime.NumCPU(),
|
||||||
|
Kernel: "windows",
|
||||||
|
Platform: "unknown",
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("static system information initialized in %s", time.Since(start))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetInfo retrieves system information (static info cached, dynamic info fresh)
|
||||||
|
func GetInfo(ctx context.Context) *Info {
|
||||||
|
initStaticInfo()
|
||||||
|
log.Debugf("gathering dynamic system information")
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
// Only gather dynamic information that might change
|
||||||
|
log.Debugf("gathering networkAddresses")
|
||||||
addrs, err := networkAddresses()
|
addrs, err := networkAddresses()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("failed to discover network addresses: %s", err)
|
log.Warnf("failed to discover network addresses: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
start := time.Now()
|
log.Debugf("gathering Hostname")
|
||||||
si := updateStaticInfo()
|
|
||||||
if time.Since(start) > 1*time.Second {
|
|
||||||
log.Warnf("updateStaticInfo took %s", time.Since(start))
|
|
||||||
}
|
|
||||||
|
|
||||||
gio := &Info{
|
|
||||||
Kernel: "windows",
|
|
||||||
OSVersion: osVersion,
|
|
||||||
Platform: "unknown",
|
|
||||||
OS: osName,
|
|
||||||
GoOS: runtime.GOOS,
|
|
||||||
CPUs: runtime.NumCPU(),
|
|
||||||
KernelVersion: buildVersion,
|
|
||||||
NetworkAddresses: addrs,
|
|
||||||
SystemSerialNumber: si.SystemSerialNumber,
|
|
||||||
SystemProductName: si.SystemProductName,
|
|
||||||
SystemManufacturer: si.SystemManufacturer,
|
|
||||||
Environment: si.Environment,
|
|
||||||
}
|
|
||||||
|
|
||||||
systemHostname, _ := os.Hostname()
|
systemHostname, _ := os.Hostname()
|
||||||
gio.Hostname = extractDeviceName(ctx, systemHostname)
|
|
||||||
gio.NetbirdVersion = version.NetbirdVersion()
|
|
||||||
gio.UIVersion = extractUserAgent(ctx)
|
|
||||||
|
|
||||||
|
// Create Info struct using cached static info + fresh dynamic info
|
||||||
|
gio := &Info{
|
||||||
|
// Static information (cached)
|
||||||
|
Kernel: cachedStaticInfo.Kernel,
|
||||||
|
OSVersion: cachedStaticInfo.OSVersion,
|
||||||
|
Platform: cachedStaticInfo.Platform,
|
||||||
|
OS: cachedStaticInfo.OSName,
|
||||||
|
GoOS: cachedStaticInfo.GoOS,
|
||||||
|
CPUs: cachedStaticInfo.CPUs,
|
||||||
|
KernelVersion: cachedStaticInfo.KernelVersion,
|
||||||
|
SystemSerialNumber: cachedStaticInfo.SystemSerialNumber,
|
||||||
|
SystemProductName: cachedStaticInfo.SystemProductName,
|
||||||
|
SystemManufacturer: cachedStaticInfo.SystemManufacturer,
|
||||||
|
Environment: cachedStaticInfo.Environment,
|
||||||
|
|
||||||
|
// Dynamic information (fresh each call)
|
||||||
|
NetworkAddresses: addrs,
|
||||||
|
Hostname: extractDeviceName(ctx, systemHostname),
|
||||||
|
NetbirdVersion: version.NetbirdVersion(), // This might change with updates
|
||||||
|
UIVersion: extractUserAgent(ctx), // This might change
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("dynamic system information gathered in %s", time.Since(start))
|
||||||
return gio
|
return gio
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user