mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-16 07:16:38 +00:00
Do not block Offer processing from relay worker (#4435)
- do not miss ICE offers when relay worker busy - close p2p connection before recreate agent
This commit is contained in:
@@ -43,13 +43,6 @@ type OfferAnswer struct {
|
||||
SessionID *ICESessionID
|
||||
}
|
||||
|
||||
func (oa *OfferAnswer) SessionIDString() string {
|
||||
if oa.SessionID == nil {
|
||||
return "unknown"
|
||||
}
|
||||
return oa.SessionID.String()
|
||||
}
|
||||
|
||||
type Handshaker struct {
|
||||
mu sync.Mutex
|
||||
log *log.Entry
|
||||
@@ -57,7 +50,7 @@ type Handshaker struct {
|
||||
signaler *Signaler
|
||||
ice *WorkerICE
|
||||
relay *WorkerRelay
|
||||
onNewOfferListeners []func(*OfferAnswer)
|
||||
onNewOfferListeners []*OfferListener
|
||||
|
||||
// remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection
|
||||
remoteOffersCh chan OfferAnswer
|
||||
@@ -78,7 +71,8 @@ func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *W
|
||||
}
|
||||
|
||||
func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAnswer)) {
|
||||
h.onNewOfferListeners = append(h.onNewOfferListeners, offer)
|
||||
l := NewOfferListener(offer)
|
||||
h.onNewOfferListeners = append(h.onNewOfferListeners, l)
|
||||
}
|
||||
|
||||
func (h *Handshaker) Listen(ctx context.Context) {
|
||||
@@ -91,13 +85,13 @@ func (h *Handshaker) Listen(ctx context.Context) {
|
||||
continue
|
||||
}
|
||||
for _, listener := range h.onNewOfferListeners {
|
||||
listener(&remoteOfferAnswer)
|
||||
listener.Notify(&remoteOfferAnswer)
|
||||
}
|
||||
h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
|
||||
case remoteOfferAnswer := <-h.remoteAnswerCh:
|
||||
h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
|
||||
for _, listener := range h.onNewOfferListeners {
|
||||
listener(&remoteOfferAnswer)
|
||||
listener.Notify(&remoteOfferAnswer)
|
||||
}
|
||||
case <-ctx.Done():
|
||||
h.log.Infof("stop listening for remote offers and answers")
|
||||
|
||||
62
client/internal/peer/handshaker_listener.go
Normal file
62
client/internal/peer/handshaker_listener.go
Normal file
@@ -0,0 +1,62 @@
|
||||
package peer
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type callbackFunc func(remoteOfferAnswer *OfferAnswer)
|
||||
|
||||
func (oa *OfferAnswer) SessionIDString() string {
|
||||
if oa.SessionID == nil {
|
||||
return "unknown"
|
||||
}
|
||||
return oa.SessionID.String()
|
||||
}
|
||||
|
||||
type OfferListener struct {
|
||||
fn callbackFunc
|
||||
running bool
|
||||
latest *OfferAnswer
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
func NewOfferListener(fn callbackFunc) *OfferListener {
|
||||
return &OfferListener{
|
||||
fn: fn,
|
||||
}
|
||||
}
|
||||
|
||||
func (o *OfferListener) Notify(remoteOfferAnswer *OfferAnswer) {
|
||||
o.mu.Lock()
|
||||
defer o.mu.Unlock()
|
||||
|
||||
// Store the latest offer
|
||||
o.latest = remoteOfferAnswer
|
||||
|
||||
// If already running, the running goroutine will pick up this latest value
|
||||
if o.running {
|
||||
return
|
||||
}
|
||||
|
||||
// Start processing
|
||||
o.running = true
|
||||
|
||||
// Process in a goroutine to avoid blocking the caller
|
||||
go func(remoteOfferAnswer *OfferAnswer) {
|
||||
for {
|
||||
o.fn(remoteOfferAnswer)
|
||||
|
||||
o.mu.Lock()
|
||||
if o.latest == nil {
|
||||
// No more work to do
|
||||
o.running = false
|
||||
o.mu.Unlock()
|
||||
return
|
||||
}
|
||||
remoteOfferAnswer = o.latest
|
||||
// Clear the latest to mark it as being processed
|
||||
o.latest = nil
|
||||
o.mu.Unlock()
|
||||
}
|
||||
}(remoteOfferAnswer)
|
||||
}
|
||||
39
client/internal/peer/handshaker_listener_test.go
Normal file
39
client/internal/peer/handshaker_listener_test.go
Normal file
@@ -0,0 +1,39 @@
|
||||
package peer
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func Test_newOfferListener(t *testing.T) {
|
||||
dummyOfferAnswer := &OfferAnswer{}
|
||||
runChan := make(chan struct{}, 10)
|
||||
|
||||
longRunningFn := func(remoteOfferAnswer *OfferAnswer) {
|
||||
time.Sleep(1 * time.Second)
|
||||
runChan <- struct{}{}
|
||||
}
|
||||
|
||||
hl := NewOfferListener(longRunningFn)
|
||||
|
||||
hl.Notify(dummyOfferAnswer)
|
||||
hl.Notify(dummyOfferAnswer)
|
||||
hl.Notify(dummyOfferAnswer)
|
||||
|
||||
// Wait for exactly 2 callbacks
|
||||
for i := 0; i < 2; i++ {
|
||||
select {
|
||||
case <-runChan:
|
||||
case <-time.After(3 * time.Second):
|
||||
t.Fatal("Timeout waiting for callback")
|
||||
}
|
||||
}
|
||||
|
||||
// Verify no additional callbacks happen
|
||||
select {
|
||||
case <-runChan:
|
||||
t.Fatal("Unexpected additional callback")
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
t.Log("Correctly received exactly 2 callbacks")
|
||||
}
|
||||
}
|
||||
@@ -122,7 +122,6 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
||||
w.log.Warnf("failed to close ICE agent: %s", err)
|
||||
}
|
||||
w.agent = nil
|
||||
// todo consider to switch to Relay connection while establishing a new ICE connection
|
||||
}
|
||||
|
||||
var preferredCandidateTypes []ice.CandidateType
|
||||
@@ -410,7 +409,10 @@ func (w *WorkerICE) onConnectionStateChange(agent *icemaker.ThreadSafeAgent, dia
|
||||
case ice.ConnectionStateConnected:
|
||||
w.lastKnownState = ice.ConnectionStateConnected
|
||||
return
|
||||
case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected:
|
||||
case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected, ice.ConnectionStateClosed:
|
||||
// ice.ConnectionStateClosed happens when we recreate the agent. For the P2P to TURN switch important to
|
||||
// notify the conn.onICEStateDisconnected changes to update the current used priority
|
||||
|
||||
if w.lastKnownState == ice.ConnectionStateConnected {
|
||||
w.lastKnownState = ice.ConnectionStateDisconnected
|
||||
w.conn.onICEStateDisconnected()
|
||||
|
||||
Reference in New Issue
Block a user