diff --git a/client/internal/engine.go b/client/internal/engine.go index 2339866fb..a3192188b 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -254,6 +254,7 @@ func NewEngine( } engine.stateManager = statemanager.New(path) + log.Infof("I am: %s", config.WgPrivateKey.PublicKey().String()) return engine } diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index ddd90450d..b22cf5e8a 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -274,10 +274,10 @@ func (conn *Conn) Close(signalToRemote bool) { // OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise // doesn't block, discards the message if connection wasn't ready -func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) bool { +func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) { conn.dumpState.RemoteAnswer() conn.Log.Infof("OnRemoteAnswer, priority: %s, status ICE: %s, status relay: %s", conn.currentConnPriority, conn.statusICE, conn.statusRelay) - return conn.handshaker.OnRemoteAnswer(answer) + conn.handshaker.OnRemoteAnswer(answer) } // OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer. @@ -296,10 +296,10 @@ func (conn *Conn) SetOnDisconnected(handler func(remotePeer string)) { conn.onDisconnected = handler } -func (conn *Conn) OnRemoteOffer(offer OfferAnswer) bool { +func (conn *Conn) OnRemoteOffer(offer OfferAnswer) { conn.dumpState.RemoteOffer() conn.Log.Infof("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay) - return conn.handshaker.OnRemoteOffer(offer) + conn.handshaker.OnRemoteOffer(offer) } // WgConfig returns the WireGuard config diff --git a/client/internal/peer/handshaker.go b/client/internal/peer/handshaker.go index bf4335fe5..234c91133 100644 --- a/client/internal/peer/handshaker.go +++ b/client/internal/peer/handshaker.go @@ -74,21 +74,23 @@ func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAn func (h *Handshaker) Listen(ctx context.Context) { for { - h.log.Info("wait for remote offer confirmation") - remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation(ctx) - if err != nil { - var connectionClosedError *ConnectionClosedError - if errors.As(err, &connectionClosedError) { - h.log.Info("exit from handshaker") - return + select { + case remoteOfferAnswer := <-h.remoteOffersCh: + // received confirmation from the remote peer -> ready to proceed + if err := h.sendAnswer(); err != nil { + h.log.Errorf("failed to send remote offer confirmation: %s", err) + continue } - h.log.Errorf("failed to received remote offer confirmation: %s", err) - continue - } - - h.log.Infof("received connection confirmation, running version %s and with remote WireGuard listen port %d", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort) - for _, listener := range h.onNewOfferListeners { - go listener(remoteOfferAnswer) + for _, listener := range h.onNewOfferListeners { + listener(&remoteOfferAnswer) + } + case remoteOfferAnswer := <-h.remoteAnswerCh: + for _, listener := range h.onNewOfferListeners { + listener(&remoteOfferAnswer) + } + case <-ctx.Done(): + h.log.Infof("stop listening for remote offers and answers") + return } } } @@ -101,43 +103,27 @@ func (h *Handshaker) SendOffer() error { // OnRemoteOffer handles an offer from the remote peer and returns true if the message was accepted, false otherwise // doesn't block, discards the message if connection wasn't ready -func (h *Handshaker) OnRemoteOffer(offer OfferAnswer) bool { +func (h *Handshaker) OnRemoteOffer(offer OfferAnswer) { select { case h.remoteOffersCh <- offer: - return true + return default: - h.log.Warnf("OnRemoteOffer skipping message because is not ready") + h.log.Warnf("skipping remote offer message because receiver not ready") // connection might not be ready yet to receive so we ignore the message - return false + return } } // OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise // doesn't block, discards the message if connection wasn't ready -func (h *Handshaker) OnRemoteAnswer(answer OfferAnswer) bool { +func (h *Handshaker) OnRemoteAnswer(answer OfferAnswer) { select { case h.remoteAnswerCh <- answer: - return true + return default: // connection might not be ready yet to receive so we ignore the message - h.log.Debugf("OnRemoteAnswer skipping message because is not ready") - return false - } -} - -func (h *Handshaker) waitForRemoteOfferConfirmation(ctx context.Context) (*OfferAnswer, error) { - select { - case remoteOfferAnswer := <-h.remoteOffersCh: - // received confirmation from the remote peer -> ready to proceed - if err := h.sendAnswer(); err != nil { - return nil, err - } - return &remoteOfferAnswer, nil - case remoteOfferAnswer := <-h.remoteAnswerCh: - return &remoteOfferAnswer, nil - case <-ctx.Done(): - // closed externally - return nil, NewConnectionClosedError(h.config.Key) + h.log.Warnf("skipping remote answer message because receiver not ready") + return } } @@ -180,8 +166,7 @@ func (h *Handshaker) sendAnswer() error { answer.RelaySrvAddress = addr } - err = h.signaler.SignalAnswer(answer, h.config.Key) - if err != nil { + if err = h.signaler.SignalAnswer(answer, h.config.Key); err != nil { return err } diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index e99c50d25..cfa78c9e4 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -42,8 +42,11 @@ type WorkerICE struct { statusRecorder *Status hasRelayOnLocally bool - agent *ice.Agent - muxAgent sync.Mutex + agent *ice.Agent + agentDialerCancel context.CancelFunc + agentConnecting bool // while it is true, drop all incoming offers + lastSuccess time.Time // with this avoid the too frequent ICE agent recreation + muxAgent sync.Mutex StunTurn []*stun.URI @@ -82,12 +85,26 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { w.log.Debugf("OnNewOffer for ICE") w.muxAgent.Lock() - if w.agent != nil { - w.log.Debugf("agent already exists, skipping the offer") + if w.agentConnecting { + w.log.Debugf("agent connection is in progress, skipping the offer") w.muxAgent.Unlock() return } + if w.agent != nil { + if time.Since(w.lastSuccess) < 900*time.Millisecond { + w.log.Debugf("agent is already connected, skipping the offer") + w.muxAgent.Unlock() + return + } + w.log.Debugf("agent already exists, recreate the connection") + w.agentDialerCancel() + if err := w.agent.Close(); err != nil { + w.log.Warnf("failed to close ICE agent: %s", err) + } + // todo consider to switch to Relay connection while establishing a new ICE connection + } + var preferredCandidateTypes []ice.CandidateType if w.hasRelayOnLocally && remoteOfferAnswer.RelaySrvAddress != "" { preferredCandidateTypes = icemaker.CandidateTypesP2P() @@ -96,35 +113,106 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { } w.log.Debugf("recreate ICE agent") - agentCtx, agentCancel := context.WithCancel(w.ctx) - agent, err := w.reCreateAgent(agentCancel, preferredCandidateTypes) + dialerCtx, dialerCancel := context.WithCancel(w.ctx) + agent, err := w.reCreateAgent(dialerCancel, preferredCandidateTypes) if err != nil { w.log.Errorf("failed to recreate ICE Agent: %s", err) w.muxAgent.Unlock() return } + w.sentExtraSrflx = false w.agent = agent + w.agentDialerCancel = dialerCancel + w.agentConnecting = true w.muxAgent.Unlock() - w.log.Debugf("gather candidates") - err = w.agent.GatherCandidates() + go w.connect(dialerCtx, agent, remoteOfferAnswer) +} + +// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer. +func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HAMap) { + w.muxAgent.Lock() + defer w.muxAgent.Unlock() + w.log.Debugf("OnRemoteCandidate from peer %s -> %s", w.config.Key, candidate.String()) + if w.agent == nil { + w.log.Warnf("ICE Agent is not initialized yet") + return + } + + if candidateViaRoutes(candidate, haRoutes) { + return + } + + if err := w.agent.AddRemoteCandidate(candidate); err != nil { + w.log.Errorf("error while handling remote candidate") + return + } +} + +func (w *WorkerICE) GetLocalUserCredentials() (frag string, pwd string) { + return w.localUfrag, w.localPwd +} + +func (w *WorkerICE) Close() { + w.muxAgent.Lock() + defer w.muxAgent.Unlock() + + if w.agent == nil { + return + } + + w.agentDialerCancel() + if err := w.agent.Close(); err != nil { + w.log.Warnf("failed to close ICE agent: %s", err) + } + + w.agent = nil +} + +func (w *WorkerICE) reCreateAgent(dialerCancel context.CancelFunc, candidates []ice.CandidateType) (*ice.Agent, error) { + agent, err := icemaker.NewAgent(w.iFaceDiscover, w.config.ICEConfig, candidates, w.localUfrag, w.localPwd) if err != nil { + return nil, fmt.Errorf("create agent: %w", err) + } + + if err := agent.OnCandidate(w.onICECandidate); err != nil { + return nil, err + } + + if err := agent.OnConnectionStateChange(w.onConnectionStateChange(agent, dialerCancel)); err != nil { + return nil, err + } + + if err := agent.OnSelectedCandidatePairChange(w.onICESelectedCandidatePair); err != nil { + return nil, err + } + + if err := agent.OnSuccessfulSelectedPairBindingResponse(w.onSuccessfulSelectedPairBindingResponse); err != nil { + return nil, fmt.Errorf("failed setting binding response callback: %w", err) + } + + return agent, nil +} + +// will block until connection succeeded +// but it won't release if ICE Agent went into Disconnected or Failed state, +// so we have to cancel it with the provided context once agent detected a broken connection +func (w *WorkerICE) connect(ctx context.Context, agent *ice.Agent, remoteOfferAnswer *OfferAnswer) { + w.log.Debugf("gather candidates") + if err := agent.GatherCandidates(); err != nil { w.log.Debugf("failed to gather candidates: %s", err) return } - // will block until connection succeeded - // but it won't release if ICE Agent went into Disconnected or Failed state, - // so we have to cancel it with the provided context once agent detected a broken connection w.log.Debugf("turn agent dial") - remoteConn, err := w.turnAgentDial(agentCtx, remoteOfferAnswer) + remoteConn, err := w.turnAgentDial(ctx, remoteOfferAnswer) if err != nil { w.log.Debugf("failed to dial the remote peer: %s", err) return } w.log.Debugf("agent dial succeeded") - pair, err := w.agent.GetSelectedCandidatePair() + pair, err := agent.GetSelectedCandidatePair() if err != nil { return } @@ -152,114 +240,28 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { RelayedOnLocal: isRelayCandidate(pair.Local), } w.log.Debugf("on ICE conn is ready to use") - go w.conn.onICEConnectionIsReady(selectedPriority(pair), ci) -} -// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer. -func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HAMap) { w.muxAgent.Lock() - defer w.muxAgent.Unlock() - w.log.Debugf("OnRemoteCandidate from peer %s -> %s", w.config.Key, candidate.String()) - if w.agent == nil { - w.log.Warnf("ICE Agent is not initialized yet") - return - } + w.agentConnecting = false + w.lastSuccess = time.Now() + w.muxAgent.Unlock() - if candidateViaRoutes(candidate, haRoutes) { - return - } - - err := w.agent.AddRemoteCandidate(candidate) - if err != nil { - w.log.Errorf("error while handling remote candidate") - return - } + // todo: the potential problem is a race between the onConnectionStateChange + w.conn.onICEConnectionIsReady(selectedPriority(pair), ci) } -func (w *WorkerICE) GetLocalUserCredentials() (frag string, pwd string) { - w.muxAgent.Lock() - defer w.muxAgent.Unlock() - return w.localUfrag, w.localPwd -} - -func (w *WorkerICE) Close() { - w.muxAgent.Lock() - defer w.muxAgent.Unlock() - - if w.agent == nil { - return - } - - if err := w.agent.Close(); err != nil { - w.log.Warnf("failed to close ICE agent: %s", err) - } -} - -func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, candidates []ice.CandidateType) (*ice.Agent, error) { - w.sentExtraSrflx = false - - agent, err := icemaker.NewAgent(w.iFaceDiscover, w.config.ICEConfig, candidates, w.localUfrag, w.localPwd) - if err != nil { - return nil, fmt.Errorf("create agent: %w", err) - } - - err = agent.OnCandidate(w.onICECandidate) - if err != nil { - return nil, err - } - - err = agent.OnConnectionStateChange(func(state ice.ConnectionState) { - w.log.Debugf("ICE ConnectionState has changed to %s", state.String()) - switch state { - case ice.ConnectionStateConnected: - w.lastKnownState = ice.ConnectionStateConnected - return - case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected: - if w.lastKnownState == ice.ConnectionStateConnected { - w.lastKnownState = ice.ConnectionStateDisconnected - w.conn.onICEStateDisconnected() - } - w.closeAgent(agentCancel) - default: - return - } - }) - if err != nil { - return nil, err - } - - err = agent.OnSelectedCandidatePairChange(w.onICESelectedCandidatePair) - if err != nil { - return nil, err - } - - err = agent.OnSuccessfulSelectedPairBindingResponse(func(p *ice.CandidatePair) { - err := w.statusRecorder.UpdateLatency(w.config.Key, p.Latency()) - if err != nil { - w.log.Debugf("failed to update latency for peer: %s", err) - return - } - }) - if err != nil { - return nil, fmt.Errorf("failed setting binding response callback: %w", err) - } - - return agent, nil -} - -func (w *WorkerICE) closeAgent(cancel context.CancelFunc) { - w.muxAgent.Lock() - defer w.muxAgent.Unlock() - +func (w *WorkerICE) closeAgent(agent *ice.Agent, cancel context.CancelFunc) { cancel() - if w.agent == nil { - return - } - - if err := w.agent.Close(); err != nil { + if err := agent.Close(); err != nil { w.log.Warnf("failed to close ICE agent: %s", err) } - w.agent = nil + + w.muxAgent.Lock() + if w.agent == agent { + w.agent = nil + w.agentConnecting = false + } + w.muxAgent.Unlock() } func (w *WorkerICE) punchRemoteWGPort(pair *ice.CandidatePair, remoteWgPort int) { @@ -331,6 +333,33 @@ func (w *WorkerICE) onICESelectedCandidatePair(c1 ice.Candidate, c2 ice.Candidat w.config.Key) } +func (w *WorkerICE) onConnectionStateChange(agent *ice.Agent, dialerCancel context.CancelFunc) func(ice.ConnectionState) { + return func(state ice.ConnectionState) { + w.log.Debugf("ICE ConnectionState has changed to %s", state.String()) + switch state { + case ice.ConnectionStateConnected: + w.lastKnownState = ice.ConnectionStateConnected + return + case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected: + if w.lastKnownState == ice.ConnectionStateConnected { + w.lastKnownState = ice.ConnectionStateDisconnected + w.conn.onICEStateDisconnected() + } + w.closeAgent(agent, dialerCancel) + default: + return + } + return + } +} + +func (w *WorkerICE) onSuccessfulSelectedPairBindingResponse(pair *ice.CandidatePair) { + if err := w.statusRecorder.UpdateLatency(w.config.Key, pair.Latency()); err != nil { + w.log.Debugf("failed to update latency for peer: %s", err) + return + } +} + func (w *WorkerICE) shouldSendExtraSrflxCandidate(candidate ice.Candidate) bool { if !w.sentExtraSrflx && candidate.Type() == ice.CandidateTypeServerReflexive && candidate.Port() != candidate.RelatedAddress().Port { return true