Block Close call until connection goroutines finish

This commit is contained in:
Zoltán Papp
2025-03-14 13:30:24 +01:00
parent 6107b394a0
commit e7098d7b77
6 changed files with 85 additions and 105 deletions

View File

@@ -30,6 +30,7 @@ type ConnMgr struct {
connStateListener *peer.ConnectionListener
wg sync.WaitGroup
ctx context.Context
ctxCancel context.CancelFunc
}
@@ -53,6 +54,7 @@ func (e *ConnMgr) Start(parentCtx context.Context) {
}
ctx, cancel := context.WithCancel(parentCtx)
e.ctx = ctx
e.ctxCancel = cancel
e.wg.Add(1)
@@ -75,7 +77,7 @@ func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) {
}
if !e.isStartedWithLazyMgr() {
conn.Open()
conn.Open(e.ctx)
return
}
@@ -88,13 +90,13 @@ func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) {
excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg)
if err != nil {
conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err)
conn.Open()
conn.Open(e.ctx)
return
}
if excluded {
conn.Log.Infof("peer is on lazy conn manager exclude list, opening connection")
conn.Open()
conn.Open(e.ctx)
return
}
@@ -113,7 +115,7 @@ func (e *ConnMgr) OnSignalMsg(peerKey string) (*peer.Conn, bool) {
}
if found := e.lazyConnMgr.RunInactivityMonitor(peerKey); found {
conn.Open()
conn.Open(e.ctx)
}
return conn, true
}
@@ -151,7 +153,7 @@ func (e *ConnMgr) receiveLazyEvents(ctx context.Context) {
case <-ctx.Done():
return
case peerID := <-e.lazyConnMgr.OnActive:
e.peerStore.PeerConnOpen(peerID)
e.peerStore.PeerConnOpen(e.ctx, peerID)
case peerID := <-e.lazyConnMgr.Idle:
// todo consider to use engine lock
e.peerStore.PeerConnClose(peerID)

View File

@@ -1198,7 +1198,7 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix) (*peer
},
}
peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager, e.srWatcher, e.connSemaphore, e.peerConnDispatcher)
peerConn, err := peer.NewConn(config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager, e.srWatcher, e.connSemaphore, e.peerConnDispatcher)
if err != nil {
return nil, err
}

View File

@@ -93,8 +93,9 @@ type Conn struct {
config ConnConfig
statusRecorder *Status
signaler *Signaler
iFaceDiscover stdnet.ExternalIFaceDiscover
relayManager *relayClient.Manager
handshaker *Handshaker
srWatcher *guard.SRWatcher
onConnected func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string)
onDisconnected func(remotePeer string)
@@ -114,74 +115,81 @@ type Conn struct {
wgProxyICE wgproxy.Proxy
wgProxyRelay wgproxy.Proxy
handshaker *Handshaker
guard *guard.Guard
semaphore *semaphoregroup.SemaphoreGroup
wg sync.WaitGroup
peerConnDispatcher *ConnectionDispatcher
}
// NewConn creates a new not opened Conn to the remote peer.
// To establish a connection run Conn.Open
func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Status, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager, srWatcher *guard.SRWatcher, semaphore *semaphoregroup.SemaphoreGroup, peerConnDispatcher *ConnectionDispatcher) (*Conn, error) {
func NewConn(config ConnConfig, statusRecorder *Status, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager, srWatcher *guard.SRWatcher, semaphore *semaphoregroup.SemaphoreGroup, peerConnDispatcher *ConnectionDispatcher) (*Conn, error) {
if len(config.WgConfig.AllowedIps) == 0 {
return nil, fmt.Errorf("allowed IPs is empty")
}
ctx, ctxCancel := context.WithCancel(engineCtx)
connLog := log.WithField("peer", config.Key)
var conn = &Conn{
Log: connLog,
ctx: ctx,
ctxCancel: ctxCancel,
config: config,
statusRecorder: statusRecorder,
signaler: signaler,
iFaceDiscover: iFaceDiscover,
relayManager: relayManager,
srWatcher: srWatcher,
statusRelay: NewAtomicConnStatus(),
statusICE: NewAtomicConnStatus(),
semaphore: semaphore,
peerConnDispatcher: peerConnDispatcher,
}
ctrl := isController(config)
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, conn, relayManager)
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
workerICE, err := NewWorkerICE(ctx, connLog, config, conn, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally)
if err != nil {
return nil, err
}
conn.workerICE = workerICE
conn.handshaker = NewHandshaker(ctx, connLog, config, signaler, conn.workerICE, conn.workerRelay)
conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer)
if os.Getenv("NB_FORCE_RELAY") != "true" {
conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer)
}
conn.guard = guard.NewGuard(connLog, ctrl, conn.isConnectedOnAllWay, config.Timeout, srWatcher)
return conn, nil
}
// Open opens connection to the remote peer
// It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will
// be used.
func (conn *Conn) Open() {
conn.semaphore.Add(conn.ctx)
func (conn *Conn) Open(engineCtx context.Context) error {
conn.semaphore.Add(engineCtx)
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.opened {
conn.semaphore.Done(conn.ctx)
return
conn.semaphore.Done(engineCtx)
return nil
}
conn.opened = true
go conn.handshaker.Listen()
conn.ctx, conn.ctxCancel = context.WithCancel(engineCtx)
ctrl := isController(conn.config)
conn.workerRelay = NewWorkerRelay(conn.Log, ctrl, conn.config, conn, conn.relayManager)
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
workerICE, err := NewWorkerICE(conn.ctx, conn.Log, conn.config, conn, conn.signaler, conn.iFaceDiscover, conn.statusRecorder, relayIsSupportedLocally)
if err != nil {
return err
}
conn.workerICE = workerICE
conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay)
conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer)
if os.Getenv("NB_FORCE_RELAY") != "true" {
conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer)
}
conn.guard = guard.NewGuard(conn.Log, ctrl, conn.isConnectedOnAllWay, conn.config.Timeout, conn.srWatcher)
conn.wg.Add(1)
go func() {
defer conn.wg.Done()
conn.handshaker.Listen(conn.ctx)
}()
peerState := State{
PubKey: conn.config.Key,
@@ -189,25 +197,28 @@ func (conn *Conn) Open() {
ConnStatus: StatusConnecting,
Mux: new(sync.RWMutex),
}
err := conn.statusRecorder.UpdatePeerState(peerState)
if err != nil {
if err := conn.statusRecorder.UpdatePeerState(peerState); err != nil {
conn.Log.Warnf("error while updating the state err: %v", err)
}
go conn.startHandshakeAndReconnect(conn.ctx)
}
conn.wg.Add(1)
go func() {
defer conn.wg.Done()
conn.waitInitialRandomSleepTime(conn.ctx)
conn.semaphore.Done(conn.ctx)
func (conn *Conn) startHandshakeAndReconnect(ctx context.Context) {
defer conn.semaphore.Done(conn.ctx)
conn.waitInitialRandomSleepTime(ctx)
if err := conn.handshaker.sendOffer(); err != nil {
conn.Log.Errorf("failed to send initial offer: %v", err)
}
err := conn.handshaker.sendOffer()
if err != nil {
conn.Log.Errorf("failed to send initial offer: %v", err)
}
go conn.guard.Start(ctx)
go conn.listenGuardEvent(ctx)
conn.wg.Add(1)
go func() {
conn.guard.Start(conn.ctx, conn.onGuardEvent)
conn.wg.Done()
}()
}()
conn.opened = true
return nil
}
// Close closes this peer Conn issuing a close event to the Conn closeCh
@@ -255,6 +266,7 @@ func (conn *Conn) Close() {
conn.setStatusToDisconnected()
conn.opened = false
conn.wg.Wait()
}
// OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
@@ -316,10 +328,6 @@ func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEC
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.ctx.Err() != nil {
return
}
if remoteConnNil(conn.Log, iceConnInfo.RemoteConn) {
conn.Log.Errorf("remote ICE connection is nil")
return
@@ -394,10 +402,6 @@ func (conn *Conn) onICEStateDisconnected() {
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.ctx.Err() != nil {
return
}
conn.Log.Tracef("ICE connection state changed to disconnected")
if conn.wgProxyICE != nil {
@@ -445,13 +449,6 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.ctx.Err() != nil {
if err := rci.relayedConn.Close(); err != nil {
conn.Log.Warnf("failed to close unnecessary relayed connection: %v", err)
}
return
}
conn.Log.Debugf("Relay connection has been established, setup the WireGuard")
wgProxy, err := conn.newProxy(rci.relayedConn)
@@ -498,10 +495,6 @@ func (conn *Conn) onRelayDisconnected() {
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.ctx.Err() != nil {
return
}
conn.Log.Debugf("relay connection is disconnected")
if conn.currentConnPriority == connPriorityRelay {
@@ -535,17 +528,10 @@ func (conn *Conn) onRelayDisconnected() {
}
}
func (conn *Conn) listenGuardEvent(ctx context.Context) {
for {
select {
case <-conn.guard.Reconnect:
conn.Log.Debugf("send offer to peer")
if err := conn.handshaker.SendOffer(); err != nil {
conn.Log.Errorf("failed to send offer: %v", err)
}
case <-ctx.Done():
return
}
func (conn *Conn) onGuardEvent() {
conn.Log.Debugf("send offer to peer")
if err := conn.handshaker.SendOffer(); err != nil {
conn.Log.Errorf("failed to send offer: %v", err)
}
}

View File

@@ -46,11 +46,11 @@ func NewGuard(log *log.Entry, isController bool, isConnectedFn isConnectedFunc,
}
}
func (g *Guard) Start(ctx context.Context) {
func (g *Guard) Start(ctx context.Context, eventCallback func()) {
if g.isController {
g.reconnectLoopWithRetry(ctx)
g.reconnectLoopWithRetry(ctx, eventCallback)
} else {
g.listenForDisconnectEvents(ctx)
g.listenForDisconnectEvents(ctx, eventCallback)
}
}
@@ -70,7 +70,7 @@ func (g *Guard) SetICEConnDisconnected() {
// reconnectLoopWithRetry periodically check (max 30 min) the connection status.
// Try to send offer while the P2P is not established or while the Relay is not connected if is it supported
func (g *Guard) reconnectLoopWithRetry(ctx context.Context) {
func (g *Guard) reconnectLoopWithRetry(ctx context.Context, callback func()) {
waitForInitialConnectionTry(ctx)
srReconnectedChan := g.srWatcher.NewListener()
@@ -93,7 +93,7 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context) {
}
if !g.isConnectedOnAllWay() {
g.triggerOfferSending()
callback()
}
case <-g.relayedConnDisconnected:
@@ -125,7 +125,7 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context) {
// when the connection is lost. It will try to establish a connection only once time if before the connection was established
// It track separately the ice and relay connection status. Just because a lower priority connection reestablished it does not
// mean that to switch to it. We always force to use the higher priority connection.
func (g *Guard) listenForDisconnectEvents(ctx context.Context) {
func (g *Guard) listenForDisconnectEvents(ctx context.Context, callback func()) {
srReconnectedChan := g.srWatcher.NewListener()
defer g.srWatcher.RemoveListener(srReconnectedChan)
@@ -134,12 +134,12 @@ func (g *Guard) listenForDisconnectEvents(ctx context.Context) {
select {
case <-g.relayedConnDisconnected:
g.log.Debugf("Relay connection changed, triggering reconnect")
g.triggerOfferSending()
callback()
case <-g.iCEConnDisconnected:
g.log.Debugf("ICE state changed, try to send new offer")
g.triggerOfferSending()
callback()
case <-srReconnectedChan:
g.triggerOfferSending()
callback()
case <-ctx.Done():
g.log.Debugf("context is done, stop reconnect loop")
return
@@ -164,13 +164,6 @@ func (g *Guard) prepareExponentTicker(ctx context.Context) *backoff.Ticker {
return ticker
}
func (g *Guard) triggerOfferSending() {
select {
case g.Reconnect <- struct{}{}:
default:
}
}
// Give chance to the peer to establish the initial connection.
// With it, we can decrease to send necessary offer
func waitForInitialConnectionTry(ctx context.Context) {

View File

@@ -43,7 +43,6 @@ type OfferAnswer struct {
type Handshaker struct {
mu sync.Mutex
ctx context.Context
log *log.Entry
config ConnConfig
signaler *Signaler
@@ -57,9 +56,8 @@ type Handshaker struct {
remoteAnswerCh chan OfferAnswer
}
func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker {
func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker {
return &Handshaker{
ctx: ctx,
log: log,
config: config,
signaler: signaler,
@@ -74,10 +72,10 @@ func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAn
h.onNewOfferListeners = append(h.onNewOfferListeners, offer)
}
func (h *Handshaker) Listen() {
func (h *Handshaker) Listen(ctx context.Context) {
for {
h.log.Debugf("wait for remote offer confirmation")
remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation()
remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation(ctx)
if err != nil {
var connectionClosedError *ConnectionClosedError
if errors.As(err, &connectionClosedError) {
@@ -127,7 +125,7 @@ func (h *Handshaker) OnRemoteAnswer(answer OfferAnswer) bool {
}
}
func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) {
func (h *Handshaker) waitForRemoteOfferConfirmation(ctx context.Context) (*OfferAnswer, error) {
select {
case remoteOfferAnswer := <-h.remoteOffersCh:
// received confirmation from the remote peer -> ready to proceed
@@ -138,7 +136,7 @@ func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) {
return &remoteOfferAnswer, nil
case remoteOfferAnswer := <-h.remoteAnswerCh:
return &remoteOfferAnswer, nil
case <-h.ctx.Done():
case <-ctx.Done():
// closed externally
return nil, NewConnectionClosedError(h.config.Key)
}

View File

@@ -1,6 +1,7 @@
package peerstore
import (
"context"
"net/netip"
"sync"
@@ -79,7 +80,7 @@ func (s *Store) PeerConn(pubKey string) (*peer.Conn, bool) {
return p, true
}
func (s *Store) PeerConnOpen(pubKey string) {
func (s *Store) PeerConnOpen(ctx context.Context, pubKey string) {
s.peerConnsMu.RLock()
defer s.peerConnsMu.RUnlock()
@@ -88,7 +89,7 @@ func (s *Store) PeerConnOpen(pubKey string) {
return
}
// this can be blocked because of the connect open limiter semaphore
p.Open()
p.Open(ctx)
}
func (s *Store) PeerConnClose(pubKey string) {