diff --git a/client/internal/peer/ice/agent.go b/client/internal/peer/ice/agent.go index 4a0228405..58c1bf634 100644 --- a/client/internal/peer/ice/agent.go +++ b/client/internal/peer/ice/agent.go @@ -1,6 +1,7 @@ package ice import ( + "sync" "time" "github.com/pion/ice/v3" @@ -23,7 +24,20 @@ const ( iceRelayAcceptanceMinWaitDefault = 2 * time.Second ) -func NewAgent(iFaceDiscover stdnet.ExternalIFaceDiscover, config Config, candidateTypes []ice.CandidateType, ufrag string, pwd string) (*ice.Agent, error) { +type ThreadSafeAgent struct { + *ice.Agent + once sync.Once +} + +func (a *ThreadSafeAgent) Close() error { + var err error + a.once.Do(func() { + err = a.Agent.Close() + }) + return err +} + +func NewAgent(iFaceDiscover stdnet.ExternalIFaceDiscover, config Config, candidateTypes []ice.CandidateType, ufrag string, pwd string) (*ThreadSafeAgent, error) { iceKeepAlive := iceKeepAlive() iceDisconnectedTimeout := iceDisconnectedTimeout() iceFailedTimeout := iceFailedTimeout() @@ -61,7 +75,12 @@ func NewAgent(iFaceDiscover stdnet.ExternalIFaceDiscover, config Config, candida agentConfig.NetworkTypes = []ice.NetworkType{ice.NetworkTypeUDP4} } - return ice.NewAgent(agentConfig) + agent, err := ice.NewAgent(agentConfig) + if err != nil { + return nil, err + } + + return &ThreadSafeAgent{Agent: agent}, nil } func GenerateICECredentials() (string, string, error) { diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index ee85254fb..4f00af829 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -42,7 +42,7 @@ type WorkerICE struct { statusRecorder *Status hasRelayOnLocally bool - agent *ice.Agent + agent *icemaker.ThreadSafeAgent agentDialerCancel context.CancelFunc agentConnecting bool // while it is true, drop all incoming offers lastSuccess time.Time // with this avoid the too frequent ICE agent recreation @@ -121,6 +121,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { if err := w.agent.Close(); err != nil { w.log.Warnf("failed to close ICE agent: %s", err) } + w.agent = nil // todo consider to switch to Relay connection while establishing a new ICE connection } @@ -195,7 +196,7 @@ func (w *WorkerICE) Close() { w.agent = nil } -func (w *WorkerICE) reCreateAgent(dialerCancel context.CancelFunc, candidates []ice.CandidateType) (*ice.Agent, error) { +func (w *WorkerICE) reCreateAgent(dialerCancel context.CancelFunc, candidates []ice.CandidateType) (*icemaker.ThreadSafeAgent, error) { agent, err := icemaker.NewAgent(w.iFaceDiscover, w.config.ICEConfig, candidates, w.localUfrag, w.localPwd) if err != nil { return nil, fmt.Errorf("create agent: %w", err) @@ -230,7 +231,7 @@ func (w *WorkerICE) SessionID() ICESessionID { // will block until connection succeeded // but it won't release if ICE Agent went into Disconnected or Failed state, // so we have to cancel it with the provided context once agent detected a broken connection -func (w *WorkerICE) connect(ctx context.Context, agent *ice.Agent, remoteOfferAnswer *OfferAnswer) { +func (w *WorkerICE) connect(ctx context.Context, agent *icemaker.ThreadSafeAgent, remoteOfferAnswer *OfferAnswer) { w.log.Debugf("gather candidates") if err := agent.GatherCandidates(); err != nil { w.log.Warnf("failed to gather candidates: %s", err) @@ -239,7 +240,7 @@ func (w *WorkerICE) connect(ctx context.Context, agent *ice.Agent, remoteOfferAn } w.log.Debugf("turn agent dial") - remoteConn, err := w.turnAgentDial(ctx, remoteOfferAnswer) + remoteConn, err := w.turnAgentDial(ctx, agent, remoteOfferAnswer) if err != nil { w.log.Debugf("failed to dial the remote peer: %s", err) w.closeAgent(agent, w.agentDialerCancel) @@ -290,13 +291,14 @@ func (w *WorkerICE) connect(ctx context.Context, agent *ice.Agent, remoteOfferAn w.conn.onICEConnectionIsReady(selectedPriority(pair), ci) } -func (w *WorkerICE) closeAgent(agent *ice.Agent, cancel context.CancelFunc) { +func (w *WorkerICE) closeAgent(agent *icemaker.ThreadSafeAgent, cancel context.CancelFunc) { cancel() if err := agent.Close(); err != nil { w.log.Warnf("failed to close ICE agent: %s", err) } w.muxAgent.Lock() + // todo review does it make sense to generate new session ID all the time when w.agent==agent sessionID, err := NewICESessionID() if err != nil { w.log.Errorf("failed to create new session ID: %s", err) @@ -379,7 +381,7 @@ func (w *WorkerICE) onICESelectedCandidatePair(c1 ice.Candidate, c2 ice.Candidat w.config.Key) } -func (w *WorkerICE) onConnectionStateChange(agent *ice.Agent, dialerCancel context.CancelFunc) func(ice.ConnectionState) { +func (w *WorkerICE) onConnectionStateChange(agent *icemaker.ThreadSafeAgent, dialerCancel context.CancelFunc) func(ice.ConnectionState) { return func(state ice.ConnectionState) { w.log.Debugf("ICE ConnectionState has changed to %s", state.String()) switch state { @@ -412,12 +414,12 @@ func (w *WorkerICE) shouldSendExtraSrflxCandidate(candidate ice.Candidate) bool return false } -func (w *WorkerICE) turnAgentDial(ctx context.Context, remoteOfferAnswer *OfferAnswer) (*ice.Conn, error) { +func (w *WorkerICE) turnAgentDial(ctx context.Context, agent *icemaker.ThreadSafeAgent, remoteOfferAnswer *OfferAnswer) (*ice.Conn, error) { isControlling := w.config.LocalKey > w.config.Key if isControlling { - return w.agent.Dial(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd) + return agent.Dial(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd) } else { - return w.agent.Accept(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd) + return agent.Accept(ctx, remoteOfferAnswer.IceCredentials.UFrag, remoteOfferAnswer.IceCredentials.Pwd) } }