diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index ad84bd700..3f489864b 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -80,8 +80,6 @@ type Conn struct { config ConnConfig statusRecorder *Status wgProxyFactory *wgproxy.Factory - wgProxyICE wgproxy.Proxy - wgProxyRelay wgproxy.Proxy signaler *Signaler relayManager *relayClient.Manager allowedIPsIP string @@ -103,7 +101,8 @@ type Conn struct { beforeAddPeerHooks []nbnet.AddHookFunc afterRemovePeerHooks []nbnet.RemoveHookFunc - endpointRelay *net.UDPAddr + wgProxyICE wgproxy.Proxy + wgProxyRelay wgproxy.Proxy // for reconnection operations iCEDisconnected chan bool @@ -430,24 +429,33 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon conn.log.Infof("set ICE to active connection") - endpoint, wgProxy, err := conn.getEndpointForICEConnInfo(iceConnInfo) - if err != nil { - return + var ( + ep *net.UDPAddr + wgProxy wgproxy.Proxy + err error + ) + if iceConnInfo.RelayedOnLocal { + wgProxy, err = conn.newProxy(iceConnInfo.RemoteConn) + if err != nil { + conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err) + conn.statusICE.Set(StatusDisconnected) + return + } + ep = wgProxy.EndpointAddr() + } else { + ep, _ = net.ResolveUDPAddr("udp", iceConnInfo.RemoteConn.RemoteAddr().String()) } - endpointUdpAddr, _ := net.ResolveUDPAddr(endpoint.Network(), endpoint.String()) - conn.log.Debugf("Conn resolved IP is %s for endopint %s", endpoint, endpointUdpAddr.IP) - conn.connIDICE = nbnet.GenerateConnID() for _, hook := range conn.beforeAddPeerHooks { - if err := hook(conn.connIDICE, endpointUdpAddr.IP); err != nil { + if err := hook(conn.connIDICE, ep.IP); err != nil { conn.log.Errorf("Before add peer hook failed: %v", err) } } conn.workerRelay.DisableWgWatcher() - err = conn.configureWGEndpoint(endpointUdpAddr) + err = conn.configureWGEndpoint(ep) if err != nil { if wgProxy != nil { if err := wgProxy.CloseConn(); err != nil { @@ -455,19 +463,12 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon } } conn.log.Warnf("Failed to update wg peer configuration: %v", err) + // todo revert to relay connection if it was active before return } wgConfigWorkaround() - - if conn.wgProxyICE != nil { - if err := conn.wgProxyICE.CloseConn(); err != nil { - conn.log.Warnf("failed to close deprecated wg proxy conn: %v", err) - } - } conn.wgProxyICE = wgProxy - conn.currentConnPriority = priority - conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr) } @@ -483,10 +484,9 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) { conn.log.Tracef("ICE connection state changed to %s", newState) // switch back to relay connection - if conn.endpointRelay != nil && conn.currentConnPriority != connPriorityRelay { + if conn.wgProxyRelay != nil && conn.currentConnPriority != connPriorityRelay { conn.log.Debugf("ICE disconnected, set Relay to active connection") - err := conn.configureWGEndpoint(conn.endpointRelay) - if err != nil { + if err := conn.configureWGEndpoint(conn.wgProxyRelay.EndpointAddr()); err != nil { conn.log.Errorf("failed to switch to relay conn: %v", err) } conn.workerRelay.EnableWgWatcher(conn.ctx) @@ -508,6 +508,12 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) { ConnStatusUpdate: time.Now(), } + if conn.wgProxyICE != nil { + if err := conn.wgProxyICE.CloseConn(); err != nil { + conn.log.Warnf("failed to close deprecated wg proxy conn: %v", err) + } + } + err := conn.statusRecorder.UpdatePeerICEStateToDisconnected(peerState) if err != nil { conn.log.Warnf("unable to set peer's state to disconnected ice, got error: %v", err) @@ -525,20 +531,18 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) { return } - conn.log.Debugf("Relay connection is ready to use") - conn.statusRelay.Set(StatusConnected) + conn.log.Debugf("Relay connection has been established, setup the WireGuard") - wgProxy := conn.wgProxyFactory.GetProxy() - endpoint, err := wgProxy.AddTurnConn(conn.ctx, rci.relayedConn) + wgProxy, err := conn.newProxy(rci.relayedConn) if err != nil { conn.log.Errorf("failed to add relayed net.Conn to local proxy: %v", err) return } - conn.log.Infof("created new wgProxy for relay connection: %s", endpoint) + conn.wgProxyRelay = wgProxy - endpointUdpAddr, _ := net.ResolveUDPAddr(endpoint.Network(), endpoint.String()) - conn.endpointRelay = endpointUdpAddr - conn.log.Debugf("conn resolved IP for %s: %s", endpoint, endpointUdpAddr.IP) + conn.log.Infof("created new wgProxy for relay connection: %s", wgProxy.EndpointAddr().String()) + + conn.statusRelay.Set(StatusConnected) defer conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey) @@ -551,28 +555,22 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) { conn.connIDRelay = nbnet.GenerateConnID() for _, hook := range conn.beforeAddPeerHooks { - if err := hook(conn.connIDRelay, endpointUdpAddr.IP); err != nil { + if err := hook(conn.connIDRelay, wgProxy.EndpointAddr().IP); err != nil { conn.log.Errorf("Before add peer hook failed: %v", err) } } - err = conn.configureWGEndpoint(endpointUdpAddr) + err = conn.configureWGEndpoint(wgProxy.EndpointAddr()) if err != nil { + conn.statusRelay.Set(StatusDisconnected) if err := wgProxy.CloseConn(); err != nil { conn.log.Warnf("Failed to close relay connection: %v", err) } - conn.log.Errorf("Failed to update wg peer configuration: %v", err) + conn.log.Errorf("Failed to update WireGuard peer configuration: %v", err) return } conn.workerRelay.EnableWgWatcher(conn.ctx) wgConfigWorkaround() - - if conn.wgProxyRelay != nil { - if err := conn.wgProxyRelay.CloseConn(); err != nil { - conn.log.Warnf("failed to close deprecated wg proxy conn: %v", err) - } - } - conn.wgProxyRelay = wgProxy conn.currentConnPriority = connPriorityRelay conn.log.Infof("start to communicate with peer via relay") @@ -598,7 +596,6 @@ func (conn *Conn) onWorkerRelayStateDisconnected() { } if conn.wgProxyRelay != nil { - conn.endpointRelay = nil _ = conn.wgProxyRelay.CloseConn() conn.wgProxyRelay = nil } @@ -775,21 +772,16 @@ func (conn *Conn) freeUpConnID() { } } -func (conn *Conn) getEndpointForICEConnInfo(iceConnInfo ICEConnInfo) (net.Addr, wgproxy.Proxy, error) { - if !iceConnInfo.RelayedOnLocal { - return iceConnInfo.RemoteConn.RemoteAddr(), nil, nil - } - conn.log.Debugf("setup ice turn connection") +func (conn *Conn) newProxy(remoteConn net.Conn) (wgproxy.Proxy, error) { + conn.log.Debugf("setup proxied WireGuard connection") wgProxy := conn.wgProxyFactory.GetProxy() - ep, err := wgProxy.AddTurnConn(conn.ctx, iceConnInfo.RemoteConn) - if err != nil { + if err := wgProxy.AddTurnConn(conn.ctx, remoteConn); err != nil { conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err) - if errClose := wgProxy.CloseConn(); errClose != nil { - conn.log.Warnf("failed to close turn proxy connection: %v", errClose) - } - return nil, nil, err + return nil, err } - return ep, wgProxy, nil + // todo remove this line if you active the resume, pause logic + wgProxy.Work() + return wgProxy, nil } func isRosenpassEnabled(remoteRosenpassPubKey []byte) bool { diff --git a/client/internal/wgproxy/ebpf/proxy.go b/client/internal/wgproxy/ebpf/proxy.go index 4cf0f93fe..e850f4533 100644 --- a/client/internal/wgproxy/ebpf/proxy.go +++ b/client/internal/wgproxy/ebpf/proxy.go @@ -249,7 +249,7 @@ func (p *WGEBPFProxy) prepareSenderRawSocket() (net.PacketConn, error) { return packetConn, nil } -func (p *WGEBPFProxy) sendPkg(data []byte, port uint16) error { +func (p *WGEBPFProxy) sendPkg(data []byte, port int) error { localhost := net.ParseIP("127.0.0.1") payload := gopacket.Payload(data) diff --git a/client/internal/wgproxy/ebpf/wrapper.go b/client/internal/wgproxy/ebpf/wrapper.go index d4e11083b..12908f1bc 100644 --- a/client/internal/wgproxy/ebpf/wrapper.go +++ b/client/internal/wgproxy/ebpf/wrapper.go @@ -20,22 +20,26 @@ type ProxyWrapper struct { ctx context.Context cancel context.CancelFunc - wgEndpointPort uint16 + wgEndpointAddr *net.UDPAddr pausedMu sync.Mutex paused bool isStarted bool } -func (p *ProxyWrapper) AddTurnConn(ctx context.Context, remoteConn net.Conn) (net.Addr, error) { +func (p *ProxyWrapper) AddTurnConn(ctx context.Context, remoteConn net.Conn) error { addr, err := p.WgeBPFProxy.AddTurnConn(remoteConn) if err != nil { - return nil, fmt.Errorf("add turn conn: %w", err) + return fmt.Errorf("add turn conn: %w", err) } p.remoteConn = remoteConn p.ctx, p.cancel = context.WithCancel(ctx) - p.wgEndpointPort = uint16(addr.Port) - return addr, err + p.wgEndpointAddr = addr + return err +} + +func (p *ProxyWrapper) EndpointAddr() *net.UDPAddr { + return p.wgEndpointAddr } func (p *ProxyWrapper) Work() { @@ -88,7 +92,7 @@ func (e *ProxyWrapper) CloseConn() error { } func (p *ProxyWrapper) proxyToLocal(ctx context.Context) { - defer p.WgeBPFProxy.removeTurnConn(p.wgEndpointPort) + defer p.WgeBPFProxy.removeTurnConn(uint16(p.wgEndpointAddr.Port)) var ( err error @@ -102,12 +106,12 @@ func (p *ProxyWrapper) proxyToLocal(ctx context.Context) { return } if err != io.EOF { - log.Errorf("failed to read from turn conn (endpoint: :%d): %s", p.wgEndpointPort, err) + log.Errorf("failed to read from turn conn (endpoint: :%d): %s", p.wgEndpointAddr.Port, err) } return } - if err := p.WgeBPFProxy.sendPkg(buf[:n], p.wgEndpointPort); err != nil { + if err := p.WgeBPFProxy.sendPkg(buf[:n], p.wgEndpointAddr.Port); err != nil { if ctx.Err() != nil { return } diff --git a/client/internal/wgproxy/proxy.go b/client/internal/wgproxy/proxy.go index 55093df1c..558121cdd 100644 --- a/client/internal/wgproxy/proxy.go +++ b/client/internal/wgproxy/proxy.go @@ -7,7 +7,8 @@ import ( // Proxy is a transfer layer between the relayed connection and the WireGuard type Proxy interface { - AddTurnConn(ctx context.Context, turnConn net.Conn) (net.Addr, error) + AddTurnConn(ctx context.Context, turnConn net.Conn) error + EndpointAddr() *net.UDPAddr Work() Pause() CloseConn() error diff --git a/client/internal/wgproxy/usp/proxy.go b/client/internal/wgproxy/usp/proxy.go index 8f5044443..1e1a51ea5 100644 --- a/client/internal/wgproxy/usp/proxy.go +++ b/client/internal/wgproxy/usp/proxy.go @@ -42,22 +42,31 @@ func NewWGUserSpaceProxy(wgPort int) *WGUserSpaceProxy { // the connection is complete, an error is returned. Once successfully // connected, any expiration of the context will not affect the // connection. -func (p *WGUserSpaceProxy) AddTurnConn(ctx context.Context, remoteConn net.Conn) (net.Addr, error) { +func (p *WGUserSpaceProxy) AddTurnConn(ctx context.Context, remoteConn net.Conn) error { dialer := net.Dialer{} localConn, err := dialer.DialContext(ctx, "udp", fmt.Sprintf(":%d", p.localWGListenPort)) if err != nil { log.Errorf("failed dialing to local Wireguard port %s", err) p.cancel() - return nil, err + return err } p.ctx, p.cancel = context.WithCancel(ctx) p.localConn = localConn p.remoteConn = remoteConn - return p.localConn.LocalAddr(), err + return err } +func (p *WGUserSpaceProxy) EndpointAddr() *net.UDPAddr { + if p.localConn == nil { + return nil + } + endpointUdpAddr, _ := net.ResolveUDPAddr(p.localConn.LocalAddr().Network(), p.localConn.LocalAddr().String()) + return endpointUdpAddr +} + +// Work starts the proxy or resumes it if it was paused func (p *WGUserSpaceProxy) Work() { if p.remoteConn == nil { return @@ -74,6 +83,7 @@ func (p *WGUserSpaceProxy) Work() { } } +// Pause pauses the proxy from receiving data from the remote peer func (p *WGUserSpaceProxy) Pause() { if p.remoteConn == nil { return @@ -147,6 +157,7 @@ func (p *WGUserSpaceProxy) proxyToRemote(ctx context.Context) { } // proxyToLocal proxies from the Remote peer to local WireGuard +// if the proxy is paused it will drain the remote conn and drop the packets func (p *WGUserSpaceProxy) proxyToLocal(ctx context.Context) { defer func() { if err := p.close(); err != nil {