diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go
index 4c264e129..a6cf3cd25 100644
--- a/client/internal/peer/conn.go
+++ b/client/internal/peer/conn.go
@@ -455,14 +455,6 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
 		return
 	}
 
-	// Check if we already have a relay proxy configured
-	if conn.wgProxyRelay != nil {
-		conn.Log.Debugf("Relay proxy already configured, skipping duplicate setup")
-		// Update status to ensure it's connected
-		conn.statusRelay.SetConnected()
-		return
-	}
-
 	conn.dumpState.RelayConnected()
 	conn.Log.Debugf("Relay connection has been established, setup the WireGuard")
 
@@ -480,42 +472,19 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
 	if conn.isICEActive() {
 		conn.Log.Debugf("do not switch to relay because current priority is: %s", conn.currentConnPriority.String())
 		conn.setRelayedProxy(wgProxy)
-		// For WASM, we still need to start the proxy and configure WireGuard
-		// because ICE doesn't actually work in browsers and we rely on relay
-		conn.Log.Infof("WASM check: runtime.GOOS=%s, should start proxy=%v", runtime.GOOS, runtime.GOOS == "js")
-		if runtime.GOOS == "js" {
-			conn.Log.Infof("WASM: starting relay proxy and configuring WireGuard despite ICE being 'active'")
-			wgProxy.Work()
-
-			// Configure WireGuard to use the relay proxy endpoint
-			endpointAddr := wgProxy.EndpointAddr()
-			conn.Log.Infof("WASM: Configuring WireGuard endpoint to proxy address: %v", endpointAddr)
-			if err := conn.configureWGEndpoint(endpointAddr, rci.rosenpassPubKey); err != nil {
-				conn.Log.Errorf("WASM: Failed to update WireGuard peer configuration: %v", err)
-			} else {
-				conn.Log.Infof("WASM: Successfully configured WireGuard endpoint to use proxy at %v", endpointAddr)
-			}
-
-			// Update connection priority to relay for WASM
-			conn.currentConnPriority = conntype.Relay
-			conn.rosenpassRemoteKey = rci.rosenpassPubKey
-		}
 		conn.statusRelay.SetConnected()
 		conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
 		return
 	}
 
 	wgProxy.Work()
-	endpointAddr := wgProxy.EndpointAddr()
-	conn.Log.Infof("Configuring WireGuard endpoint to proxy address: %v", endpointAddr)
-	if err := conn.configureWGEndpoint(endpointAddr, rci.rosenpassPubKey); err != nil {
+	if err := conn.configureWGEndpoint(wgProxy.EndpointAddr(), rci.rosenpassPubKey); err != nil {
 		if err := wgProxy.CloseConn(); err != nil {
 			conn.Log.Warnf("Failed to close relay connection: %v", err)
 		}
 		conn.Log.Errorf("Failed to update WireGuard peer configuration: %v", err)
 		return
 	}
-	conn.Log.Infof("Successfully configured WireGuard endpoint to use proxy at %v", endpointAddr)
 
 	conn.wgWatcherWg.Add(1)
 	go func() {
@@ -694,21 +663,6 @@ func (conn *Conn) isConnectedOnAllWay() (connected bool) {
 		}
 	}()
 
-	// In WASM with forced relay, ICE is not used, so skip ICE check
-	if runtime.GOOS == "js" && os.Getenv("NB_FORCE_RELAY") == "true" {
-		// Only check relay connection status
-		if conn.workerRelay.IsRelayConnectionSupportedWithPeer() {
-			relayConnected := conn.statusRelay.Get() != worker.StatusDisconnected
-			if !relayConnected {
-				conn.Log.Tracef("WASM: relay not connected for connectivity check")
-			}
-			return relayConnected
-		}
-		// If relay is not supported, consider it connected to avoid reconnect loop
-		conn.Log.Tracef("WASM: relay not supported, returning true to avoid reconnect")
-		return true
-	}
-
 	if conn.statusICE.Get() == worker.StatusDisconnected && !conn.workerICE.InProgress() {
 		return false
 	}
diff --git a/client/internal/peer/worker_relay.go b/client/internal/peer/worker_relay.go
index a10e5a569..f584487f5 100644
--- a/client/internal/peer/worker_relay.go
+++ b/client/internal/peer/worker_relay.go
@@ -2,6 +2,7 @@ package peer
 
 import (
 	"context"
+	"errors"
 	"net"
 	"sync"
 	"sync/atomic"
@@ -54,22 +55,6 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
 	}
 	w.relaySupportedOnRemotePeer.Store(true)
 
-	// Check if we already have an active relay connection
-	w.relayLock.Lock()
-	existingConn := w.relayedConn
-	w.relayLock.Unlock()
-
-	if existingConn != nil {
-		w.log.Debugf("relay connection already exists for peer %s, reusing it", w.config.Key)
-		// Connection exists, just ensure proxy is set up if needed
-		go w.conn.onRelayConnectionIsReady(RelayConnInfo{
-			relayedConn:     existingConn,
-			rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
-			rosenpassAddr:   remoteOfferAnswer.RosenpassAddr,
-		})
-		return
-	}
-
 	// the relayManager will return with error in case if the connection has lost with relay server
 	currentRelayAddress, err := w.relayManager.RelayInstanceAddress()
 	if err != nil {
@@ -81,24 +66,15 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
 
 	relayedConn, err := w.relayManager.OpenConn(w.peerCtx, srv, w.config.Key)
 	if err != nil {
-		// The relay manager never actually returns ErrConnAlreadyExists - it returns
-		// the existing connection with nil error. This error handling is for other failures.
+		if errors.Is(err, relayClient.ErrConnAlreadyExists) {
+			w.log.Debugf("handled offer by reusing existing relay connection")
+			return
+		}
 		w.log.Errorf("failed to open connection via Relay: %s", err)
 		return
 	}
 
 	w.relayLock.Lock()
-	// Check if we already stored this connection (might happen if OpenConn returned existing)
-	if w.relayedConn != nil && w.relayedConn == relayedConn {
-		w.relayLock.Unlock()
-		w.log.Debugf("OpenConn returned the same connection we already have for peer %s", w.config.Key)
-		go w.conn.onRelayConnectionIsReady(RelayConnInfo{
-			relayedConn:     relayedConn,
-			rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
-			rosenpassAddr:   remoteOfferAnswer.RosenpassAddr,
-		})
-		return
-	}
 	w.relayedConn = relayedConn
 	w.relayLock.Unlock()
 
@@ -147,16 +123,11 @@ func (w *WorkerRelay) CloseConn() {
 	if err := w.relayedConn.Close(); err != nil {
 		w.log.Warnf("failed to close relay connection: %v", err)
 	}
-	// Clear the stored connection to allow reopening
-	w.relayedConn = nil
 }
 
 func (w *WorkerRelay) onWGDisconnected() {
 	w.relayLock.Lock()
-	if w.relayedConn != nil {
-		_ = w.relayedConn.Close()
-		w.relayedConn = nil
-	}
+	_ = w.relayedConn.Close()
 	w.relayLock.Unlock()
 
 	w.conn.onRelayDisconnected()
@@ -177,11 +148,6 @@ func (w *WorkerRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress st
 }
 
 func (w *WorkerRelay) onRelayClientDisconnected() {
-	// Clear the stored connection when relay disconnects
-	w.relayLock.Lock()
-	w.relayedConn = nil
-	w.relayLock.Unlock()
-
 	w.wgWatcher.DisableWgWatcher()
 	go w.conn.onRelayDisconnected()
 }
diff --git a/shared/relay/client/client.go b/shared/relay/client/client.go
index d04749296..5dabc5742 100644
--- a/shared/relay/client/client.go
+++ b/shared/relay/client/client.go
@@ -223,10 +223,10 @@ func (c *Client) OpenConn(ctx context.Context, dstPeerID string) (net.Conn, erro
 		c.mu.Unlock()
 		return nil, fmt.Errorf("relay connection is not established")
 	}
 
-	existingContainer, ok := c.conns[peerID]
+	_, ok := c.conns[peerID]
 	if ok {
 		c.mu.Unlock()
-		return existingContainer.conn, nil
+		return nil, ErrConnAlreadyExists
 	}
 	c.mu.Unlock()
@@ -235,6 +235,7 @@ func (c *Client) OpenConn(ctx context.Context, dstPeerID string) (net.Conn, erro
 		return nil, err
 	}
 
+	c.log.Infof("remote peer is available, prepare the relayed connection: %s", peerID)
 	msgChannel := make(chan Msg, 100)
 	c.mu.Lock()
 
@@ -248,11 +249,11 @@ func (c *Client) OpenConn(ctx context.Context, dstPeerID string) (net.Conn, erro
 	c.muInstanceURL.Unlock()
 
 	conn := NewConn(c, peerID, msgChannel, instanceURL)
-	existingContainer, ok = c.conns[peerID]
+	_, ok = c.conns[peerID]
 	if ok {
 		c.mu.Unlock()
 		_ = conn.Close()
-		return existingContainer.conn, nil
+		return nil, ErrConnAlreadyExists
 	}
 	c.conns[peerID] = newConnContainer(c.log, conn, msgChannel)
 	c.mu.Unlock()
@@ -376,6 +377,7 @@ func (c *Client) readLoop(hc *healthcheck.Receiver, relayConn net.Conn, internal
 		buf := *bufPtr
 		n, errExit = relayConn.Read(buf)
 		if errExit != nil {
+			c.log.Infof("start to Relay read loop exit")
 			c.mu.Lock()
 			if c.serviceIsRunning && !internallyStoppedFlag.isSet() {
 				c.log.Errorf("failed to read message from relay server: %s", errExit)
@@ -466,24 +468,12 @@ func (c *Client) handleTransportMsg(buf []byte, bufPtr *[]byte, internallyStoppe
 		c.bufPool.Put(bufPtr)
 		return false
 	}
-
 	container, ok := c.conns[*peerID]
 	c.mu.Unlock()
 	if !ok {
-		// Try to create a connection for this peer to handle incoming messages
-		msgChannel := make(chan Msg, 100)
-		c.muInstanceURL.Lock()
-		instanceURL := c.instanceURL
-		c.muInstanceURL.Unlock()
-		conn := NewConn(c, *peerID, msgChannel, instanceURL)
-
-		c.mu.Lock()
-		// Check again if connection was created while we were creating it
-		if _, exists := c.conns[*peerID]; !exists {
-			c.conns[*peerID] = newConnContainer(c.log, conn, msgChannel)
-		}
-		container = c.conns[*peerID]
-		c.mu.Unlock()
+		c.log.Errorf("peer not found: %s", peerID.String())
+		c.bufPool.Put(bufPtr)
+		return true
 	}
 	msg := Msg{
 		bufPool: c.bufPool,
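
Reviewer note: the hunks above change OpenConn's contract. A second open for the same peer no longer returns the existing connection with a nil error; it fails with ErrConnAlreadyExists, and worker_relay.go treats that error as "offer already handled". Below is a minimal, self-contained sketch of that caller contract: only the error name and the errors.Is handling mirror the diff; openRelayConn and the peer key are hypothetical stand-ins, not the real relayManager API.

package main

import (
	"context"
	"errors"
	"fmt"
	"net"
)

// ErrConnAlreadyExists stands in for the sentinel error exported by
// shared/relay/client in the diff above.
var ErrConnAlreadyExists = errors.New("connection already exists")

// openRelayConn is a hypothetical stand-in for relayManager.OpenConn: it
// fails with ErrConnAlreadyExists when a connection to dstPeerID is open.
func openRelayConn(ctx context.Context, dstPeerID string) (net.Conn, error) {
	return nil, ErrConnAlreadyExists // simulate an already-open connection
}

func main() {
	conn, err := openRelayConn(context.Background(), "peer-key")
	if err != nil {
		if errors.Is(err, ErrConnAlreadyExists) {
			// Mirrors worker_relay.go: the offer counts as handled and the
			// existing relayed connection keeps serving traffic.
			fmt.Println("handled offer by reusing existing relay connection")
			return
		}
		fmt.Printf("failed to open connection via Relay: %s\n", err)
		return
	}
	defer conn.Close()
	fmt.Println("new relayed connection established")
}

The sentinel-error approach also lets handleTransportMsg stay strict: since every legitimate connection is registered in c.conns before traffic flows, an unknown peer ID in the read path is now logged and dropped instead of implicitly creating a connection.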