diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 8df9aa700..fe3f16791 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -208,6 +208,7 @@ func (conn *Conn) Open(engineCtx context.Context) error { // both peer send offer if err := conn.handshaker.SendOffer(); err != nil { + conn.guard.FailedToSendOffer() conn.Log.Errorf("failed to send offer: %v", err) } diff --git a/client/internal/peer/guard/guard.go b/client/internal/peer/guard/guard.go index b85ec02fa..2cc11f674 100644 --- a/client/internal/peer/guard/guard.go +++ b/client/internal/peer/guard/guard.go @@ -7,6 +7,10 @@ import ( log "github.com/sirupsen/logrus" ) +const ( + offerResendPeriod = 2 * time.Second +) + type isConnectedFunc func() bool // Guard is responsible for the reconnection logic. @@ -24,6 +28,7 @@ type Guard struct { srWatcher *SRWatcher relayedConnDisconnected chan struct{} iCEConnDisconnected chan struct{} + offerError chan struct{} } func NewGuard(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *Guard { @@ -34,6 +39,7 @@ func NewGuard(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Durati srWatcher: srWatcher, relayedConnDisconnected: make(chan struct{}, 1), iCEConnDisconnected: make(chan struct{}, 1), + offerError: make(chan struct{}, 1), } } @@ -56,26 +62,48 @@ func (g *Guard) SetICEConnDisconnected() { } } +func (g *Guard) FailedToSendOffer() { + select { + case g.iCEConnDisconnected <- struct{}{}: + default: + } +} + // reconnectLoopWithRetry periodically check the connection status. // Try to send offer while the P2P is not established or while the Relay is not connected if is it supported func (g *Guard) reconnectLoopWithRetry(ctx context.Context, callback func()) { srReconnectedChan := g.srWatcher.NewListener() defer g.srWatcher.RemoveListener(srReconnectedChan) + offerResendTimer := time.NewTimer(0) + offerResendTimer.Stop() + defer offerResendTimer.Stop() + for { select { case <-g.relayedConnDisconnected: g.log.Debugf("Relay connection changed, reset reconnection ticker") + offerResendTimer.Stop() if !g.isConnectedOnAllWay() { callback() } case <-g.iCEConnDisconnected: g.log.Debugf("ICE connection changed, reset reconnection ticker") + offerResendTimer.Stop() if !g.isConnectedOnAllWay() { callback() } case <-srReconnectedChan: g.log.Debugf("has network changes, reset reconnection ticker") + offerResendTimer.Stop() + if !g.isConnectedOnAllWay() { + callback() + } + case <-g.offerError: + g.log.Debugf("failed to send offer, reset reconnection ticker") + offerResendTimer.Reset(offerResendPeriod) + continue + case <-offerResendTimer.C: if !g.isConnectedOnAllWay() { callback() } diff --git a/client/internal/peer/handshaker.go b/client/internal/peer/handshaker.go index e4e23253f..ecadb17fa 100644 --- a/client/internal/peer/handshaker.go +++ b/client/internal/peer/handshaker.go @@ -79,18 +79,15 @@ func (h *Handshaker) Listen(ctx context.Context) { for { select { case remoteOfferAnswer := <-h.remoteOffersCh: - - if isController(h.config) { - if err := h.sendAnswer(); err != nil { - h.log.Errorf("failed to send remote offer confirmation: %s", err) - continue - } + h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString()) + if err := h.sendAnswer(); err != nil { + h.log.Errorf("failed to send remote offer confirmation: %s", err) + continue } for _, listener := range h.onNewOfferListeners { listener.Notify(&remoteOfferAnswer) } - h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString()) case remoteOfferAnswer := <-h.remoteAnswerCh: h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString()) for _, listener := range h.onNewOfferListeners { diff --git a/signal/server/signal.go b/signal/server/signal.go index 47f01edae..8881a9771 100644 --- a/signal/server/signal.go +++ b/signal/server/signal.go @@ -100,7 +100,10 @@ func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto. return &proto.EncryptedMessage{}, nil } - return s.dispatcher.SendMessage(ctx, msg) + return nil, status.Errorf(codes.FailedPrecondition, "remote peer not connected") + + // todo handle dispatcher errors + //return s.dispatcher.SendMessage(ctx, msg) } // ConnectStream connects to the exchange stream