Merge branch 'main' into fix/pkg-loss

This commit is contained in:
Zoltán Papp
2025-09-03 16:01:50 +02:00
872 changed files with 75638 additions and 22204 deletions

View File

@@ -6,23 +6,27 @@ import (
"fmt"
"net"
"net/netip"
"strings"
"sync"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/bufsize"
"github.com/netbirdio/netbird/client/iface/wgproxy/listener"
)
type IceBind interface {
SetEndpoint(addr *net.UDPAddr, conn net.Conn) (*net.UDPAddr, error)
RemoveEndpoint(addr *net.UDPAddr)
SetEndpoint(fakeIP netip.Addr, conn net.Conn)
RemoveEndpoint(fakeIP netip.Addr)
Recv(ctx context.Context, msg bind.RecvMessage)
MTU() uint16
}
type ProxyBind struct {
bind IceBind
// wgEndpoint is a fake address that generated by the Bind.SetEndpoint based on the remote NetBird peer address
// wgRelayedEndpoint is a fake address that generated by the Bind.SetEndpoint based on the remote NetBird peer address
wgRelayedEndpoint *bind.Endpoint
wgCurrentUsed *bind.Endpoint
remoteConn net.Conn
@@ -34,13 +38,18 @@ type ProxyBind struct {
paused bool
pausedCond *sync.Cond
isStarted bool
closeListener *listener.CloseListener
}
func NewProxyBind(bind IceBind) *ProxyBind {
return &ProxyBind{
bind: bind,
pausedCond: sync.NewCond(&sync.Mutex{}),
p := &ProxyBind{
bind: bind,
closeListener: listener.NewCloseListener(),
pausedCond: sync.NewCond(&sync.Mutex{}),
}
return p
}
// AddTurnConn adds a new connection to the bind.
@@ -52,26 +61,32 @@ func NewProxyBind(bind IceBind) *ProxyBind {
// - nbAddr: The NetBird UDP address of the remote peer, it required to generate fake address
// - remoteConn: The established TURN connection to the remote peer
func (p *ProxyBind) AddTurnConn(ctx context.Context, nbAddr *net.UDPAddr, remoteConn net.Conn) error {
fakeAddr, err := p.bind.SetEndpoint(nbAddr, remoteConn)
fakeNetIP, err := fakeAddress(nbAddr)
if err != nil {
return err
}
p.wgRelayedEndpoint = addrToEndpoint(fakeAddr)
p.wgRelayedEndpoint = &bind.Endpoint{AddrPort: *fakeNetIP}
p.remoteConn = remoteConn
p.ctx, p.cancel = context.WithCancel(ctx)
return err
return nil
}
func (p *ProxyBind) EndpointAddr() *net.UDPAddr {
return bind.EndpointToUDPAddr(*p.wgRelayedEndpoint)
}
func (p *ProxyBind) SetDisconnectListener(disconnected func()) {
p.closeListener.SetCloseListener(disconnected)
}
func (p *ProxyBind) Work() {
if p.remoteConn == nil {
return
}
p.bind.SetEndpoint(p.wgRelayedEndpoint.Addr(), p.remoteConn)
p.pausedCond.L.Lock()
p.paused = false
@@ -108,6 +123,12 @@ func (p *ProxyBind) RedirectAs(endpoint *net.UDPAddr) {
p.pausedCond.Signal()
}
func addrToEndpoint(addr *net.UDPAddr) *bind.Endpoint {
ip, _ := netip.AddrFromSlice(addr.IP.To4())
addrPort := netip.AddrPortFrom(ip, uint16(addr.Port))
return &bind.Endpoint{AddrPort: addrPort}
}
func (p *ProxyBind) CloseConn() error {
if p.cancel == nil {
return fmt.Errorf("proxy not started")
@@ -126,6 +147,9 @@ func (p *ProxyBind) close() error {
if p.closed {
return nil
}
p.closeListener.SetCloseListener(nil)
p.closed = true
p.cancel()
@@ -135,7 +159,7 @@ func (p *ProxyBind) close() error {
p.pausedCond.L.Unlock()
p.pausedCond.Signal()
p.bind.RemoveEndpoint(bind.EndpointToUDPAddr(*p.wgRelayedEndpoint))
p.bind.RemoveEndpoint(p.wgRelayedEndpoint.Addr())
if rErr := p.remoteConn.Close(); rErr != nil && !errors.Is(rErr, net.ErrClosed) {
return rErr
@@ -151,12 +175,13 @@ func (p *ProxyBind) proxyToLocal(ctx context.Context) {
}()
for {
buf := make([]byte, 1500)
buf := make([]byte, p.bind.MTU()+bufsize.WGBufferOverhead)
n, err := p.remoteConn.Read(buf)
if err != nil {
if ctx.Err() != nil {
return
}
p.closeListener.Notify()
log.Errorf("failed to read from remote conn: %s, %s", p.remoteConn.RemoteAddr(), err)
return
}
@@ -175,8 +200,19 @@ func (p *ProxyBind) proxyToLocal(ctx context.Context) {
}
}
func addrToEndpoint(addr *net.UDPAddr) *bind.Endpoint {
ip, _ := netip.AddrFromSlice(addr.IP.To4())
addrPort := netip.AddrPortFrom(ip, uint16(addr.Port))
return &bind.Endpoint{AddrPort: addrPort}
// fakeAddress returns a fake address that is used to as an identifier for the peer.
// The fake address is in the format of 127.1.x.x where x.x is the last two octets of the peer address.
func fakeAddress(peerAddress *net.UDPAddr) (*netip.AddrPort, error) {
octets := strings.Split(peerAddress.IP.String(), ".")
if len(octets) != 4 {
return nil, fmt.Errorf("invalid IP format")
}
fakeIP, err := netip.ParseAddr(fmt.Sprintf("127.1.%s.%s", octets[2], octets[3]))
if err != nil {
return nil, fmt.Errorf("parse new IP: %w", err)
}
netipAddr := netip.AddrPortFrom(fakeIP, uint16(peerAddress.Port))
return &netipAddr, nil
}