Add signal error handling

This commit is contained in:
Zoltán Papp
2025-09-23 18:54:40 +02:00
parent 52d8bdfc78
commit 0753766336
4 changed files with 37 additions and 8 deletions

View File

@@ -208,6 +208,7 @@ func (conn *Conn) Open(engineCtx context.Context) error {
// both peer send offer
if err := conn.handshaker.SendOffer(); err != nil {
conn.guard.FailedToSendOffer()
conn.Log.Errorf("failed to send offer: %v", err)
}

View File

@@ -7,6 +7,10 @@ import (
log "github.com/sirupsen/logrus"
)
const (
offerResendPeriod = 2 * time.Second
)
type isConnectedFunc func() bool
// Guard is responsible for the reconnection logic.
@@ -24,6 +28,7 @@ type Guard struct {
srWatcher *SRWatcher
relayedConnDisconnected chan struct{}
iCEConnDisconnected chan struct{}
offerError chan struct{}
}
func NewGuard(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *Guard {
@@ -34,6 +39,7 @@ func NewGuard(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Durati
srWatcher: srWatcher,
relayedConnDisconnected: make(chan struct{}, 1),
iCEConnDisconnected: make(chan struct{}, 1),
offerError: make(chan struct{}, 1),
}
}
@@ -56,26 +62,48 @@ func (g *Guard) SetICEConnDisconnected() {
}
}
func (g *Guard) FailedToSendOffer() {
select {
case g.iCEConnDisconnected <- struct{}{}:
default:
}
}
// reconnectLoopWithRetry periodically check the connection status.
// Try to send offer while the P2P is not established or while the Relay is not connected if is it supported
func (g *Guard) reconnectLoopWithRetry(ctx context.Context, callback func()) {
srReconnectedChan := g.srWatcher.NewListener()
defer g.srWatcher.RemoveListener(srReconnectedChan)
offerResendTimer := time.NewTimer(0)
offerResendTimer.Stop()
defer offerResendTimer.Stop()
for {
select {
case <-g.relayedConnDisconnected:
g.log.Debugf("Relay connection changed, reset reconnection ticker")
offerResendTimer.Stop()
if !g.isConnectedOnAllWay() {
callback()
}
case <-g.iCEConnDisconnected:
g.log.Debugf("ICE connection changed, reset reconnection ticker")
offerResendTimer.Stop()
if !g.isConnectedOnAllWay() {
callback()
}
case <-srReconnectedChan:
g.log.Debugf("has network changes, reset reconnection ticker")
offerResendTimer.Stop()
if !g.isConnectedOnAllWay() {
callback()
}
case <-g.offerError:
g.log.Debugf("failed to send offer, reset reconnection ticker")
offerResendTimer.Reset(offerResendPeriod)
continue
case <-offerResendTimer.C:
if !g.isConnectedOnAllWay() {
callback()
}

View File

@@ -79,18 +79,15 @@ func (h *Handshaker) Listen(ctx context.Context) {
for {
select {
case remoteOfferAnswer := <-h.remoteOffersCh:
if isController(h.config) {
if err := h.sendAnswer(); err != nil {
h.log.Errorf("failed to send remote offer confirmation: %s", err)
continue
}
h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
if err := h.sendAnswer(); err != nil {
h.log.Errorf("failed to send remote offer confirmation: %s", err)
continue
}
for _, listener := range h.onNewOfferListeners {
listener.Notify(&remoteOfferAnswer)
}
h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
case remoteOfferAnswer := <-h.remoteAnswerCh:
h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
for _, listener := range h.onNewOfferListeners {

View File

@@ -100,7 +100,10 @@ func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.
return &proto.EncryptedMessage{}, nil
}
return s.dispatcher.SendMessage(ctx, msg)
return nil, status.Errorf(codes.FailedPrecondition, "remote peer not connected")
// todo handle dispatcher errors
//return s.dispatcher.SendMessage(ctx, msg)
}
// ConnectStream connects to the exchange stream