From e5d9e3fb133fc22ae428fa5d1e2d337211e08fd2 Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Sat, 18 Jan 2025 01:11:31 +0100 Subject: [PATCH] code cleaning --- client/internal/peer/conn.go | 15 ++------- client/internal/peer/handshaker.go | 11 ++++--- client/internal/peer/worker_ice.go | 46 +++----------------------- client/internal/peer/worker_relay.go | 48 ++++------------------------ 4 files changed, 20 insertions(+), 100 deletions(-) diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 964870614..1b9e1d561 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -4,7 +4,6 @@ import ( "context" "math/rand" "net" - "os" "runtime" "strings" "sync" @@ -127,25 +126,15 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu } ctrl := isController(config) - conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, relayManager) - conn.workerRelay.SetOnConnReady(conn.relayConnectionIsReady) - conn.workerRelay.SetOnDisconnected(conn.onWorkerRelayStateDisconnected) - relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally() - conn.workerICE, err = NewWorkerICE(ctx, connLog, config, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally) + conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, conn, relayManager) + conn.workerICE, err = NewWorkerICE(ctx, connLog, config, conn, signaler, iFaceDiscover, statusRecorder, conn.workerRelay.RelayIsSupportedLocally()) if err != nil { return nil, err } - conn.workerICE.SetOnConnReady(conn.iCEConnectionIsReady) - conn.workerICE.SetOnDisconnected(conn.onWorkerICEStateDisconnected) conn.handshaker = NewHandshaker(ctx, connLog, config, signaler, conn.workerICE, conn.workerRelay) - conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer) - if os.Getenv("NB_FORCE_RELAY") != "true" { - conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer) - } - conn.guard = guard.NewGuard(connLog, ctrl, conn.isConnectedOnAllWay, config.Timeout, srWatcher) go conn.handshaker.Listen() diff --git a/client/internal/peer/handshaker.go b/client/internal/peer/handshaker.go index 545f81966..a41516b54 100644 --- a/client/internal/peer/handshaker.go +++ b/client/internal/peer/handshaker.go @@ -3,6 +3,7 @@ package peer import ( "context" "errors" + "os" "sync" log "github.com/sirupsen/logrus" @@ -58,7 +59,7 @@ type Handshaker struct { } func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker { - return &Handshaker{ + hs := &Handshaker{ ctx: ctx, log: log, config: config, @@ -68,10 +69,12 @@ func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signa remoteOffersCh: make(chan OfferAnswer), remoteAnswerCh: make(chan OfferAnswer), } -} -func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAnswer)) { - h.onNewOfferListeners = append(h.onNewOfferListeners, offer) + hs.onNewOfferListeners = append(hs.onNewOfferListeners, hs.relay.OnNewOffer) + if os.Getenv("NB_FORCE_RELAY") != "true" { + hs.onNewOfferListeners = append(hs.onNewOfferListeners, hs.ice.OnNewOffer) + } + return hs } func (h *Handshaker) Listen() { diff --git a/client/internal/peer/worker_ice.go b/client/internal/peer/worker_ice.go index 22d87abd3..f9bb79f50 100644 --- a/client/internal/peer/worker_ice.go +++ b/client/internal/peer/worker_ice.go @@ -36,15 +36,12 @@ type WorkerICE struct { ctx context.Context log *log.Entry config ConnConfig + conn *Conn signaler *Signaler iFaceDiscover stdnet.ExternalIFaceDiscover statusRecorder *Status hasRelayOnLocally bool - onConnReady func(ICEConnInfo) - onDisconnected func() - callBackMu sync.Mutex - agent *ice.Agent muxAgent sync.Mutex @@ -56,11 +53,12 @@ type WorkerICE struct { localPwd string } -func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool) (*WorkerICE, error) { +func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, conn *Conn, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool) (*WorkerICE, error) { w := &WorkerICE{ ctx: ctx, log: log, config: config, + conn: conn, signaler: signaler, iFaceDiscover: ifaceDiscover, statusRecorder: statusRecorder, @@ -157,7 +155,7 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) { Priority: selectedPriority, } w.log.Debugf("on ICE conn read to use ready") - w.notifyOnReady(ci) + w.conn.iCEConnectionIsReady(ci) } // OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer. @@ -181,20 +179,6 @@ func (w *WorkerICE) OnRemoteCandidate(candidate ice.Candidate, haRoutes route.HA } } -func (w *WorkerICE) SetOnConnReady(f func(ICEConnInfo)) { - w.callBackMu.Lock() - defer w.callBackMu.Unlock() - - w.onConnReady = f -} - -func (w *WorkerICE) SetOnDisconnected(f func()) { - w.callBackMu.Lock() - defer w.callBackMu.Unlock() - - w.onDisconnected = f -} - func (w *WorkerICE) GetLocalUserCredentials() (frag string, pwd string) { w.muxAgent.Lock() defer w.muxAgent.Unlock() @@ -231,7 +215,7 @@ func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, candidates []i err = agent.OnConnectionStateChange(func(state ice.ConnectionState) { w.log.Debugf("ICE ConnectionState has changed to %s", state.String()) if state == ice.ConnectionStateFailed || state == ice.ConnectionStateDisconnected { - w.notifyDisconnected() + w.conn.onWorkerICEStateDisconnected() w.muxAgent.Lock() agentCancel() @@ -343,26 +327,6 @@ func (w *WorkerICE) turnAgentDial(ctx context.Context, remoteOfferAnswer *OfferA } } -func (w *WorkerICE) notifyDisconnected() { - w.callBackMu.Lock() - defer w.callBackMu.Unlock() - - if w.onDisconnected == nil { - return - } - w.onDisconnected() -} - -func (w *WorkerICE) notifyOnReady(ci ICEConnInfo) { - w.callBackMu.Lock() - defer w.callBackMu.Unlock() - - if w.onConnReady == nil { - return - } - w.onConnReady(ci) -} - func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive, error) { relatedAdd := candidate.RelatedAddress() return ice.NewCandidateServerReflexive(&ice.CandidateServerReflexiveConfig{ diff --git a/client/internal/peer/worker_relay.go b/client/internal/peer/worker_relay.go index 708534940..14e1a1cf4 100644 --- a/client/internal/peer/worker_relay.go +++ b/client/internal/peer/worker_relay.go @@ -34,10 +34,7 @@ type WorkerRelay struct { isController bool config ConnConfig relayManager relayClient.ManagerService - - onConnReady func(info RelayConnInfo) - onDisconnected func() - callBackMu sync.Mutex + conn *Conn relayedConn net.Conn relayLock sync.Mutex @@ -48,12 +45,13 @@ type WorkerRelay struct { relaySupportedOnRemotePeer atomic.Bool } -func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, relayManager relayClient.ManagerService) *WorkerRelay { +func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, conn *Conn, relayManager *relayClient.Manager) *WorkerRelay { r := &WorkerRelay{ log: log, isController: ctrl, config: config, relayManager: relayManager, + conn: conn, } return r } @@ -97,7 +95,7 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) { } w.log.Debugf("peer conn opened via Relay: %s", srv) - go w.notifyOnReady(RelayConnInfo{ + go w.conn.relayConnectionIsReady(RelayConnInfo{ relayedConn: relayedConn, rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey, rosenpassAddr: remoteOfferAnswer.RosenpassAddr, @@ -133,20 +131,6 @@ func (w *WorkerRelay) DisableWgWatcher() { w.ctxCancelWgWatch() } -func (w *WorkerRelay) SetOnConnReady(f func(info RelayConnInfo)) { - w.callBackMu.Lock() - defer w.callBackMu.Unlock() - - w.onConnReady = f -} - -func (w *WorkerRelay) SetOnDisconnected(f func()) { - w.callBackMu.Lock() - defer w.callBackMu.Unlock() - - w.onDisconnected = f -} - func (w *WorkerRelay) RelayInstanceAddress() (string, error) { return w.relayManager.RelayInstanceAddress() } @@ -203,7 +187,7 @@ func (w *WorkerRelay) wgStateCheck(ctx context.Context, ctxCancel context.Cancel w.relayLock.Lock() _ = w.relayedConn.Close() w.relayLock.Unlock() - go w.notifyDisconnected() + go w.conn.onWorkerRelayStateDisconnected() return } @@ -248,25 +232,5 @@ func (w *WorkerRelay) onRelayMGDisconnected() { if w.ctxCancelWgWatch != nil { w.ctxCancelWgWatch() } - go w.notifyDisconnected() -} - -func (w *WorkerRelay) notifyDisconnected() { - w.callBackMu.Lock() - defer w.callBackMu.Unlock() - - if w.onDisconnected == nil { - return - } - w.onDisconnected() -} - -func (w *WorkerRelay) notifyOnReady(ci RelayConnInfo) { - w.callBackMu.Lock() - defer w.callBackMu.Unlock() - - if w.onConnReady == nil { - return - } - w.onConnReady(ci) + go w.conn.onWorkerRelayStateDisconnected() }