[client] Eliminate UDP proxy in user-space mode (#2712)

In the case of user space WireGuard mode, use in-memory proxy between the TURN/Relay connection and the WireGuard Bind. We keep the UDP proxy and eBPF proxy for kernel mode.

The key change is the new wgproxy/bind and the iface/bind/ice_bind changes. Everything else is just to fulfill the dependencies.
This commit is contained in:
Zoltan Papp
2024-10-22 20:53:14 +02:00
committed by GitHub
parent 0106a95f7a
commit 30ebcf38c7
50 changed files with 1129 additions and 553 deletions

View File

@@ -0,0 +1,32 @@
package ebpf
import (
"fmt"
"net"
)
const (
portRangeStart = 3128
portRangeEnd = 3228
)
type portLookup struct {
}
func (pl portLookup) searchFreePort() (int, error) {
for i := portRangeStart; i <= portRangeEnd; i++ {
if pl.tryToBind(i) == nil {
return i, nil
}
}
return 0, fmt.Errorf("failed to bind free port for eBPF proxy")
}
func (pl portLookup) tryToBind(port int) error {
l, err := net.ListenPacket("udp", fmt.Sprintf(":%d", port))
if err != nil {
return err
}
_ = l.Close()
return nil
}

View File

@@ -0,0 +1,42 @@
package ebpf
import (
"fmt"
"net"
"testing"
)
func Test_portLookup_searchFreePort(t *testing.T) {
pl := portLookup{}
_, err := pl.searchFreePort()
if err != nil {
t.Fatal(err)
}
}
func Test_portLookup_on_allocated(t *testing.T) {
pl := portLookup{}
allocatedPort, err := allocatePort(portRangeStart)
if err != nil {
t.Fatal(err)
}
defer allocatedPort.Close()
fp, err := pl.searchFreePort()
if err != nil {
t.Fatal(err)
}
if fp != (portRangeStart + 1) {
t.Errorf("invalid free port, expected: %d, got: %d", portRangeStart+1, fp)
}
}
func allocatePort(port int) (net.PacketConn, error) {
c, err := net.ListenPacket("udp", fmt.Sprintf(":%d", port))
if err != nil {
return nil, err
}
return c, err
}

View File

@@ -0,0 +1,283 @@
//go:build linux && !android
package ebpf
import (
"context"
"fmt"
"net"
"os"
"sync"
"syscall"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
"github.com/hashicorp/go-multierror"
"github.com/pion/transport/v3"
log "github.com/sirupsen/logrus"
nberrors "github.com/netbirdio/netbird/client/errors"
"github.com/netbirdio/netbird/client/internal/ebpf"
ebpfMgr "github.com/netbirdio/netbird/client/internal/ebpf/manager"
nbnet "github.com/netbirdio/netbird/util/net"
)
const (
loopbackAddr = "127.0.0.1"
)
// WGEBPFProxy definition for proxy with EBPF support
type WGEBPFProxy struct {
localWGListenPort int
ebpfManager ebpfMgr.Manager
turnConnStore map[uint16]net.Conn
turnConnMutex sync.Mutex
lastUsedPort uint16
rawConn net.PacketConn
conn transport.UDPConn
ctx context.Context
ctxCancel context.CancelFunc
}
// NewWGEBPFProxy create new WGEBPFProxy instance
func NewWGEBPFProxy(wgPort int) *WGEBPFProxy {
log.Debugf("instantiate ebpf proxy")
wgProxy := &WGEBPFProxy{
localWGListenPort: wgPort,
ebpfManager: ebpf.GetEbpfManagerInstance(),
turnConnStore: make(map[uint16]net.Conn),
}
return wgProxy
}
// Listen load ebpf program and listen the proxy
func (p *WGEBPFProxy) Listen() error {
pl := portLookup{}
wgPorxyPort, err := pl.searchFreePort()
if err != nil {
return err
}
p.rawConn, err = p.prepareSenderRawSocket()
if err != nil {
return err
}
err = p.ebpfManager.LoadWgProxy(wgPorxyPort, p.localWGListenPort)
if err != nil {
return err
}
addr := net.UDPAddr{
Port: wgPorxyPort,
IP: net.ParseIP(loopbackAddr),
}
p.ctx, p.ctxCancel = context.WithCancel(context.Background())
conn, err := nbnet.ListenUDP("udp", &addr)
if err != nil {
if cErr := p.Free(); cErr != nil {
log.Errorf("Failed to close the wgproxy: %s", cErr)
}
return err
}
p.conn = conn
go p.proxyToRemote()
log.Infof("local wg proxy listening on: %d", wgPorxyPort)
return nil
}
// AddTurnConn add new turn connection for the proxy
func (p *WGEBPFProxy) AddTurnConn(turnConn net.Conn) (*net.UDPAddr, error) {
wgEndpointPort, err := p.storeTurnConn(turnConn)
if err != nil {
return nil, err
}
log.Infof("turn conn added to wg proxy store: %s, endpoint port: :%d", turnConn.RemoteAddr(), wgEndpointPort)
wgEndpoint := &net.UDPAddr{
IP: net.ParseIP(loopbackAddr),
Port: int(wgEndpointPort),
}
return wgEndpoint, nil
}
// Free resources except the remoteConns will be keep open.
func (p *WGEBPFProxy) Free() error {
log.Debugf("free up ebpf wg proxy")
if p.ctx != nil && p.ctx.Err() != nil {
//nolint
return nil
}
p.ctxCancel()
var result *multierror.Error
if p.conn != nil {
if err := p.conn.Close(); err != nil {
result = multierror.Append(result, err)
}
}
if err := p.ebpfManager.FreeWGProxy(); err != nil {
result = multierror.Append(result, err)
}
if err := p.rawConn.Close(); err != nil {
result = multierror.Append(result, err)
}
return nberrors.FormatErrorOrNil(result)
}
// proxyToRemote read messages from local WireGuard interface and forward it to remote conn
// From this go routine has only one instance.
func (p *WGEBPFProxy) proxyToRemote() {
buf := make([]byte, 1500)
for p.ctx.Err() == nil {
if err := p.readAndForwardPacket(buf); err != nil {
if p.ctx.Err() != nil {
return
}
log.Errorf("failed to proxy packet to remote conn: %s", err)
}
}
}
func (p *WGEBPFProxy) readAndForwardPacket(buf []byte) error {
n, addr, err := p.conn.ReadFromUDP(buf)
if err != nil {
return fmt.Errorf("failed to read UDP packet from WG: %w", err)
}
p.turnConnMutex.Lock()
conn, ok := p.turnConnStore[uint16(addr.Port)]
p.turnConnMutex.Unlock()
if !ok {
if p.ctx.Err() == nil {
log.Debugf("turn conn not found by port because conn already has been closed: %d", addr.Port)
}
return nil
}
if _, err := conn.Write(buf[:n]); err != nil {
return fmt.Errorf("failed to forward local WG packet (%d) to remote turn conn: %w", addr.Port, err)
}
return nil
}
func (p *WGEBPFProxy) storeTurnConn(turnConn net.Conn) (uint16, error) {
p.turnConnMutex.Lock()
defer p.turnConnMutex.Unlock()
np, err := p.nextFreePort()
if err != nil {
return np, err
}
p.turnConnStore[np] = turnConn
return np, nil
}
func (p *WGEBPFProxy) removeTurnConn(turnConnID uint16) {
p.turnConnMutex.Lock()
defer p.turnConnMutex.Unlock()
_, ok := p.turnConnStore[turnConnID]
if ok {
log.Debugf("remove turn conn from store by port: %d", turnConnID)
}
delete(p.turnConnStore, turnConnID)
}
func (p *WGEBPFProxy) nextFreePort() (uint16, error) {
if len(p.turnConnStore) == 65535 {
return 0, fmt.Errorf("reached maximum turn connection numbers")
}
generatePort:
if p.lastUsedPort == 65535 {
p.lastUsedPort = 1
} else {
p.lastUsedPort++
}
if _, ok := p.turnConnStore[p.lastUsedPort]; ok {
goto generatePort
}
return p.lastUsedPort, nil
}
func (p *WGEBPFProxy) prepareSenderRawSocket() (net.PacketConn, error) {
// Create a raw socket.
fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_RAW)
if err != nil {
return nil, fmt.Errorf("creating raw socket failed: %w", err)
}
// Set the IP_HDRINCL option on the socket to tell the kernel that headers are included in the packet.
err = syscall.SetsockoptInt(fd, syscall.IPPROTO_IP, syscall.IP_HDRINCL, 1)
if err != nil {
return nil, fmt.Errorf("setting IP_HDRINCL failed: %w", err)
}
// Bind the socket to the "lo" interface.
err = syscall.SetsockoptString(fd, syscall.SOL_SOCKET, syscall.SO_BINDTODEVICE, "lo")
if err != nil {
return nil, fmt.Errorf("binding to lo interface failed: %w", err)
}
// Set the fwmark on the socket.
err = nbnet.SetSocketOpt(fd)
if err != nil {
return nil, fmt.Errorf("setting fwmark failed: %w", err)
}
// Convert the file descriptor to a PacketConn.
file := os.NewFile(uintptr(fd), fmt.Sprintf("fd %d", fd))
if file == nil {
return nil, fmt.Errorf("converting fd to file failed")
}
packetConn, err := net.FilePacketConn(file)
if err != nil {
return nil, fmt.Errorf("converting file to packet conn failed: %w", err)
}
return packetConn, nil
}
func (p *WGEBPFProxy) sendPkg(data []byte, port int) error {
localhost := net.ParseIP("127.0.0.1")
payload := gopacket.Payload(data)
ipH := &layers.IPv4{
DstIP: localhost,
SrcIP: localhost,
Version: 4,
TTL: 64,
Protocol: layers.IPProtocolUDP,
}
udpH := &layers.UDP{
SrcPort: layers.UDPPort(port),
DstPort: layers.UDPPort(p.localWGListenPort),
}
err := udpH.SetNetworkLayerForChecksum(ipH)
if err != nil {
return fmt.Errorf("set network layer for checksum: %w", err)
}
layerBuffer := gopacket.NewSerializeBuffer()
err = gopacket.SerializeLayers(layerBuffer, gopacket.SerializeOptions{ComputeChecksums: true, FixLengths: true}, ipH, udpH, payload)
if err != nil {
return fmt.Errorf("serialize layers: %w", err)
}
if _, err = p.rawConn.WriteTo(layerBuffer.Bytes(), &net.IPAddr{IP: localhost}); err != nil {
return fmt.Errorf("write to raw conn: %w", err)
}
return nil
}

View File

@@ -0,0 +1,56 @@
//go:build linux && !android
package ebpf
import (
"testing"
)
func TestWGEBPFProxy_connStore(t *testing.T) {
wgProxy := NewWGEBPFProxy(1)
p, _ := wgProxy.storeTurnConn(nil)
if p != 1 {
t.Errorf("invalid initial port: %d", wgProxy.lastUsedPort)
}
numOfConns := 10
for i := 0; i < numOfConns; i++ {
p, _ = wgProxy.storeTurnConn(nil)
}
if p != uint16(numOfConns)+1 {
t.Errorf("invalid last used port: %d, expected: %d", p, numOfConns+1)
}
if len(wgProxy.turnConnStore) != numOfConns+1 {
t.Errorf("invalid store size: %d, expected: %d", len(wgProxy.turnConnStore), numOfConns+1)
}
}
func TestWGEBPFProxy_portCalculation_overflow(t *testing.T) {
wgProxy := NewWGEBPFProxy(1)
_, _ = wgProxy.storeTurnConn(nil)
wgProxy.lastUsedPort = 65535
p, _ := wgProxy.storeTurnConn(nil)
if len(wgProxy.turnConnStore) != 2 {
t.Errorf("invalid store size: %d, expected: %d", len(wgProxy.turnConnStore), 2)
}
if p != 2 {
t.Errorf("invalid last used port: %d, expected: %d", p, 2)
}
}
func TestWGEBPFProxy_portCalculation_maxConn(t *testing.T) {
wgProxy := NewWGEBPFProxy(1)
for i := 0; i < 65535; i++ {
_, _ = wgProxy.storeTurnConn(nil)
}
_, err := wgProxy.storeTurnConn(nil)
if err == nil {
t.Errorf("invalid turn conn store calculation")
}
}

View File

@@ -0,0 +1,126 @@
//go:build linux && !android
package ebpf
import (
"context"
"errors"
"fmt"
"io"
"net"
"sync"
log "github.com/sirupsen/logrus"
)
// ProxyWrapper help to keep the remoteConn instance for net.Conn.Close function call
type ProxyWrapper struct {
WgeBPFProxy *WGEBPFProxy
remoteConn net.Conn
ctx context.Context
cancel context.CancelFunc
wgEndpointAddr *net.UDPAddr
pausedMu sync.Mutex
paused bool
isStarted bool
}
func (p *ProxyWrapper) AddTurnConn(ctx context.Context, endpoint *net.UDPAddr, remoteConn net.Conn) error {
addr, err := p.WgeBPFProxy.AddTurnConn(remoteConn)
if err != nil {
return fmt.Errorf("add turn conn: %w", err)
}
p.remoteConn = remoteConn
p.ctx, p.cancel = context.WithCancel(ctx)
p.wgEndpointAddr = addr
return err
}
func (p *ProxyWrapper) EndpointAddr() *net.UDPAddr {
return p.wgEndpointAddr
}
func (p *ProxyWrapper) Work() {
if p.remoteConn == nil {
return
}
p.pausedMu.Lock()
p.paused = false
p.pausedMu.Unlock()
if !p.isStarted {
p.isStarted = true
go p.proxyToLocal(p.ctx)
}
}
func (p *ProxyWrapper) Pause() {
if p.remoteConn == nil {
return
}
log.Tracef("pause proxy reading from: %s", p.remoteConn.RemoteAddr())
p.pausedMu.Lock()
p.paused = true
p.pausedMu.Unlock()
}
// CloseConn close the remoteConn and automatically remove the conn instance from the map
func (e *ProxyWrapper) CloseConn() error {
if e.cancel == nil {
return fmt.Errorf("proxy not started")
}
e.cancel()
if err := e.remoteConn.Close(); err != nil {
return fmt.Errorf("failed to close remote conn: %w", err)
}
return nil
}
func (p *ProxyWrapper) proxyToLocal(ctx context.Context) {
defer p.WgeBPFProxy.removeTurnConn(uint16(p.wgEndpointAddr.Port))
buf := make([]byte, 1500)
for {
n, err := p.readFromRemote(ctx, buf)
if err != nil {
return
}
p.pausedMu.Lock()
if p.paused {
p.pausedMu.Unlock()
continue
}
err = p.WgeBPFProxy.sendPkg(buf[:n], p.wgEndpointAddr.Port)
p.pausedMu.Unlock()
if err != nil {
if ctx.Err() != nil {
return
}
log.Errorf("failed to write out turn pkg to local conn: %v", err)
}
}
}
func (p *ProxyWrapper) readFromRemote(ctx context.Context, buf []byte) (int, error) {
n, err := p.remoteConn.Read(buf)
if err != nil {
if ctx.Err() != nil {
return 0, ctx.Err()
}
if !errors.Is(err, io.EOF) {
log.Errorf("failed to read from turn conn (endpoint: :%d): %s", p.wgEndpointAddr.Port, err)
}
return 0, err
}
return n, nil
}