[client] Avoid duplicated agent close (#4383)

This commit is contained in:
Zoltan Papp
2025-08-20 18:50:51 +02:00
committed by GitHub
parent f9d64a06c2
commit f425870c8e
2 changed files with 32 additions and 11 deletions

View File

@@ -1,6 +1,7 @@
package ice package ice
import ( import (
"sync"
"time" "time"
"github.com/pion/ice/v3" "github.com/pion/ice/v3"
@@ -23,7 +24,20 @@ const (
iceRelayAcceptanceMinWaitDefault = 2 * time.Second iceRelayAcceptanceMinWaitDefault = 2 * time.Second
) )
func NewAgent(iFaceDiscover stdnet.ExternalIFaceDiscover, config Config, candidateTypes []ice.CandidateType, ufrag string, pwd string) (*ice.Agent, error) { type ThreadSafeAgent struct {
*ice.Agent
once sync.Once
}
func (a *ThreadSafeAgent) Close() error {
var err error
a.once.Do(func() {
err = a.Agent.Close()
})
return err
}
func NewAgent(iFaceDiscover stdnet.ExternalIFaceDiscover, config Config, candidateTypes []ice.CandidateType, ufrag string, pwd string) (*ThreadSafeAgent, error) {
iceKeepAlive := iceKeepAlive() iceKeepAlive := iceKeepAlive()
iceDisconnectedTimeout := iceDisconnectedTimeout() iceDisconnectedTimeout := iceDisconnectedTimeout()
iceFailedTimeout := iceFailedTimeout() iceFailedTimeout := iceFailedTimeout()
@@ -61,7 +75,12 @@ func NewAgent(iFaceDiscover stdnet.ExternalIFaceDiscover, config Config, candida
agentConfig.NetworkTypes = []ice.NetworkType{ice.NetworkTypeUDP4} agentConfig.NetworkTypes = []ice.NetworkType{ice.NetworkTypeUDP4}
} }
return ice.NewAgent(agentConfig) agent, err := ice.NewAgent(agentConfig)
if err != nil {
return nil, err
}
return &ThreadSafeAgent{Agent: agent}, nil
} }
func GenerateICECredentials() (string, string, error) { func GenerateICECredentials() (string, string, error) {

View File

@@ -42,7 +42,7 @@ type WorkerICE struct {
statusRecorder *Status statusRecorder *Status
hasRelayOnLocally bool hasRelayOnLocally bool
agent *ice.Agent agent *icemaker.ThreadSafeAgent
agentDialerCancel context.CancelFunc agentDialerCancel context.CancelFunc
agentConnecting bool // while it is true, drop all incoming offers agentConnecting bool // while it is true, drop all incoming offers
lastSuccess time.Time // with this avoid the too frequent ICE agent recreation lastSuccess time.Time // with this avoid the too frequent ICE agent recreation
@@ -121,6 +121,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
if err := w.agent.Close(); err != nil { if err := w.agent.Close(); err != nil {
w.log.Warnf("failed to close ICE agent: %s", err) w.log.Warnf("failed to close ICE agent: %s", err)
} }
w.agent = nil
// todo consider to switch to Relay connection while establishing a new ICE connection // todo consider to switch to Relay connection while establishing a new ICE connection
} }
@@ -195,7 +196,7 @@ func (w *WorkerICE) Close() {
w.agent = nil w.agent = nil
} }
func (w *WorkerICE) reCreateAgent(dialerCancel context.CancelFunc, candidates []ice.CandidateType) (*ice.Agent, error) { func (w *WorkerICE) reCreateAgent(dialerCancel context.CancelFunc, candidates []ice.CandidateType) (*icemaker.ThreadSafeAgent, error) {
agent, err := icemaker.NewAgent(w.iFaceDiscover, w.config.ICEConfig, candidates, w.localUfrag, w.localPwd) agent, err := icemaker.NewAgent(w.iFaceDiscover, w.config.ICEConfig, candidates, w.localUfrag, w.localPwd)
if err != nil { if err != nil {
return nil, fmt.Errorf("create agent: %w", err) return nil, fmt.Errorf("create agent: %w", err)
@@ -230,7 +231,7 @@ func (w *WorkerICE) SessionID() ICESessionID {
// will block until connection succeeded // will block until connection succeeded
// but it won't release if ICE Agent went into Disconnected or Failed state, // but it won't release if ICE Agent went into Disconnected or Failed state,
// so we have to cancel it with the provided context once agent detected a broken connection // so we have to cancel it with the provided context once agent detected a broken connection
func (w *WorkerICE) connect(ctx context.Context, agent *ice.Agent, remoteOfferAnswer *OfferAnswer) { func (w *WorkerICE) connect(ctx context.Context, agent *icemaker.ThreadSafeAgent, remoteOfferAnswer *OfferAnswer) {
w.log.Debugf("gather candidates") w.log.Debugf("gather candidates")
if err := agent.GatherCandidates(); err != nil { if err := agent.GatherCandidates(); err != nil {
w.log.Warnf("failed to gather candidates: %s", err) w.log.Warnf("failed to gather candidates: %s", err)
@@ -239,7 +240,7 @@ func (w *WorkerICE) connect(ctx context.Context, agent *ice.Agent, remoteOfferAn
} }
w.log.Debugf("turn agent dial") w.log.Debugf("turn agent dial")
remoteConn, err := w.turnAgentDial(ctx, remoteOfferAnswer) remoteConn, err := w.turnAgentDial(ctx, agent, remoteOfferAnswer)
if err != nil { if err != nil {
w.log.Debugf("failed to dial the remote peer: %s", err) w.log.Debugf("failed to dial the remote peer: %s", err)
w.closeAgent(agent, w.agentDialerCancel) w.closeAgent(agent, w.agentDialerCancel)
@@ -290,13 +291,14 @@ func (w *WorkerICE) connect(ctx context.Context, agent *ice.Agent, remoteOfferAn
w.conn.onICEConnectionIsReady(selectedPriority(pair), ci) w.conn.onICEConnectionIsReady(selectedPriority(pair), ci)
} }
func (w *WorkerICE) closeAgent(agent *ice.Agent, cancel context.CancelFunc) { func (w *WorkerICE) closeAgent(agent *icemaker.ThreadSafeAgent, cancel context.CancelFunc) {
cancel() cancel()
if err := agent.Close(); err != nil { if err := agent.Close(); err != nil {
w.log.Warnf("failed to close ICE agent: %s", err) w.log.Warnf("failed to close ICE agent: %s", err)
} }
w.muxAgent.Lock() w.muxAgent.Lock()
// todo review does it make sense to generate new session ID all the time when w.agent==agent
sessionID, err := NewICESessionID() sessionID, err := NewICESessionID()
if err != nil { if err != nil {
w.log.Errorf("failed to create new session ID: %s", err) w.log.Errorf("failed to create new session ID: %s", err)
@@ -379,7 +381,7 @@ func (w *WorkerICE) onICESelectedCandidatePair(c1 ice.Candidate, c2 ice.Candidat
w.config.Key) w.config.Key)
} }
func (w *WorkerICE) onConnectionStateChange(agent *ice.Agent, dialerCancel context.CancelFunc) func(ice.ConnectionState) { func (w *WorkerICE) onConnectionStateChange(agent *icemaker.ThreadSafeAgent, dialerCancel context.CancelFunc) func(ice.ConnectionState) {
return func(state ice.ConnectionState) { return func(state ice.ConnectionState) {
w.log.Debugf("ICE ConnectionState has changed to %s", state.String()) w.log.Debugf("ICE ConnectionState has changed to %s", state.String())
switch state { switch state {
@@ -412,12 +414,12 @@ func (w *WorkerICE) shouldSendExtraSrflxCandidate(candidate ice.Candidate) bool
return false return false
} }
func (w *WorkerICE) turnAgentDial(ctx context.Context, remoteOfferAnswer *OfferAnswer) (*ice.Conn, error) { func (w *WorkerICE) turnAgentDial(ctx context.Context, agent *icemaker.ThreadSafeAgent, remoteOfferAnswer *OfferAnswer) (*ice.Conn, error) {
isControlling := w.config.LocalKey > w.config.Key isControlling := w.config.LocalKey > w.config.Key
if isControlling { if isControlling {
return w.agent.Dial(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd) return agent.Dial(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
} else { } else {
return w.agent.Accept(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd) return agent.Accept(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd)
} }
} }