diff --git a/client/internal/peer/handshaker.go b/client/internal/peer/handshaker.go index 3cbf74cfd..42eaea683 100644 --- a/client/internal/peer/handshaker.go +++ b/client/internal/peer/handshaker.go @@ -43,13 +43,6 @@ type OfferAnswer struct { SessionID *ICESessionID } -func (oa *OfferAnswer) SessionIDString() string { - if oa.SessionID == nil { - return "unknown" - } - return oa.SessionID.String() -} - type Handshaker struct { mu sync.Mutex log *log.Entry @@ -57,7 +50,7 @@ type Handshaker struct { signaler *Signaler ice *WorkerICE relay *WorkerRelay - onNewOfferListeners []func(*OfferAnswer) + onNewOfferListeners []*OfferListener // remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection remoteOffersCh chan OfferAnswer @@ -78,7 +71,8 @@ func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *W } func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAnswer)) { - h.onNewOfferListeners = append(h.onNewOfferListeners, offer) + l := NewOfferListener(offer) + h.onNewOfferListeners = append(h.onNewOfferListeners, l) } func (h *Handshaker) Listen(ctx context.Context) { @@ -91,13 +85,13 @@ func (h *Handshaker) Listen(ctx context.Context) { continue } for _, listener := range h.onNewOfferListeners { - listener(&remoteOfferAnswer) + listener.Notify(&remoteOfferAnswer) } h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString()) case remoteOfferAnswer := <-h.remoteAnswerCh: h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString()) for _, listener := range h.onNewOfferListeners { - listener(&remoteOfferAnswer) + listener.Notify(&remoteOfferAnswer) } case <-ctx.Done(): h.log.Infof("stop listening for remote offers and answers") diff --git a/client/internal/peer/handshaker_listener.go b/client/internal/peer/handshaker_listener.go new file mode 100644 index 000000000..e2d3f3f38 --- /dev/null +++ b/client/internal/peer/handshaker_listener.go @@ -0,0 +1,62 @@ +package peer + +import ( + "sync" +) + +type callbackFunc func(remoteOfferAnswer *OfferAnswer) + +func (oa *OfferAnswer) SessionIDString() string { + if oa.SessionID == nil { + return "unknown" + } + return oa.SessionID.String() +} + +type OfferListener struct { + fn callbackFunc + running bool + latest *OfferAnswer + mu sync.Mutex +} + +func NewOfferListener(fn callbackFunc) *OfferListener { + return &OfferListener{ + fn: fn, + } +} + +func (o *OfferListener) Notify(remoteOfferAnswer *OfferAnswer) { + o.mu.Lock() + defer o.mu.Unlock() + + // Store the latest offer + o.latest = remoteOfferAnswer + + // If already running, the running goroutine will pick up this latest value + if o.running { + return + } + + // Start processing + o.running = true + + // Process in a goroutine to avoid blocking the caller + go func(remoteOfferAnswer *OfferAnswer) { + for { + o.fn(remoteOfferAnswer) + + o.mu.Lock() + if o.latest == nil { + // No more work to do + o.running = false + o.mu.Unlock() + return + } + remoteOfferAnswer = o.latest + // Clear the latest to mark it as being processed + o.latest = nil + o.mu.Unlock() + } + }(remoteOfferAnswer) +} diff --git a/client/internal/peer/handshaker_listener_test.go b/client/internal/peer/handshaker_listener_test.go new file mode 100644 index 000000000..8363741a5 --- /dev/null +++ b/client/internal/peer/handshaker_listener_test.go @@ -0,0 +1,39 @@ +package peer + +import ( + "testing" + "time" +) + +func Test_newOfferListener(t *testing.T) { + dummyOfferAnswer := &OfferAnswer{} + runChan := make(chan struct{}, 10) + + longRunningFn := func(remoteOfferAnswer *OfferAnswer) { + time.Sleep(1 * time.Second) + runChan <- struct{}{} + } + + hl := NewOfferListener(longRunningFn) + + hl.Notify(dummyOfferAnswer) + hl.Notify(dummyOfferAnswer) + hl.Notify(dummyOfferAnswer) + + // Wait for exactly 2 callbacks + for i := 0; i < 2; i++ { + select { + case <-runChan: + case <-time.After(3 * time.Second): + t.Fatal("Timeout waiting for callback") + } + } + + // Verify no additional callbacks happen + select { + case <-runChan: + t.Fatal("Unexpected additional callback") + case <-time.After(100 * time.Millisecond): + t.Log("Correctly received exactly 2 callbacks") + } +} diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index e80641770..896c55b6c 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -122,7 +122,6 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { w.log.Warnf("failed to close ICE agent: %s", err) } w.agent = nil - // todo consider to switch to Relay connection while establishing a new ICE connection } var preferredCandidateTypes []ice.CandidateType @@ -410,7 +409,10 @@ func (w *WorkerICE) onConnectionStateChange(agent *icemaker.ThreadSafeAgent, dia case ice.ConnectionStateConnected: w.lastKnownState = ice.ConnectionStateConnected return - case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected: + case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected, ice.ConnectionStateClosed: + // ice.ConnectionStateClosed happens when we recreate the agent. For the P2P to TURN switch important to + // notify the conn.onICEStateDisconnected changes to update the current used priority + if w.lastKnownState == ice.ConnectionStateConnected { w.lastKnownState = ice.ConnectionStateDisconnected w.conn.onICEStateDisconnected()