diff --git a/.github/workflows/golang-test-linux.yml b/.github/workflows/golang-test-linux.yml index cc6f5ec57..886e232ec 100644 --- a/.github/workflows/golang-test-linux.yml +++ b/.github/workflows/golang-test-linux.yml @@ -72,6 +72,9 @@ jobs: - name: Generate Iface Test bin run: CGO_ENABLED=0 go test -c -o iface-testing.bin ./iface/ + - name: Generate Shared Sock Test bin + run: CGO_ENABLED=0 go test -c -o sharedsock-testing.bin ./sharedsock + - name: Generate RouteManager Test bin run: CGO_ENABLED=0 go test -c -o routemanager-testing.bin ./client/internal/routemanager/... @@ -83,9 +86,13 @@ jobs: - run: chmod +x *testing.bin + - name: Run Shared Sock tests in docker + run: docker run -t --cap-add=NET_ADMIN --privileged --rm -v $PWD:/ci -w /ci/sharedsock --entrypoint /busybox/sh gcr.io/distroless/base:debug -c /ci/sharedsock-testing.bin -test.timeout 5m -test.parallel 1 + - name: Run Iface tests in docker run: docker run -t --cap-add=NET_ADMIN --privileged --rm -v $PWD:/ci -w /ci/iface --entrypoint /busybox/sh gcr.io/distroless/base:debug -c /ci/iface-testing.bin -test.timeout 5m -test.parallel 1 + - name: Run RouteManager tests in docker run: docker run -t --cap-add=NET_ADMIN --privileged --rm -v $PWD:/ci -w /ci/client/internal/routemanager --entrypoint /busybox/sh gcr.io/distroless/base:debug -c /ci/routemanager-testing.bin -test.timeout 5m -test.parallel 1 @@ -93,4 +100,4 @@ jobs: run: docker run -t --cap-add=NET_ADMIN --privileged --rm -v $PWD:/ci -w /ci/client/internal --entrypoint /busybox/sh gcr.io/distroless/base:debug -c /ci/engine-testing.bin -test.timeout 5m -test.parallel 1 - name: Run Peer tests in docker - run: docker run -t --cap-add=NET_ADMIN --privileged --rm -v $PWD:/ci -w /ci/client/internal/peer --entrypoint /busybox/sh gcr.io/distroless/base:debug -c /ci/peer-testing.bin -test.timeout 5m -test.parallel 1 + run: docker run -t --cap-add=NET_ADMIN --privileged --rm -v $PWD:/ci -w /ci/client/internal/peer --entrypoint /busybox/sh gcr.io/distroless/base:debug -c /ci/peer-testing.bin -test.timeout 5m -test.parallel 1 \ No newline at end of file diff --git a/client/internal/engine.go b/client/internal/engine.go index d7e78cbf1..0f0750c65 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -3,6 +3,7 @@ package internal import ( "context" "fmt" + "io" "math/rand" "net" "net/netip" @@ -23,9 +24,11 @@ import ( nbssh "github.com/netbirdio/netbird/client/ssh" nbdns "github.com/netbirdio/netbird/dns" "github.com/netbirdio/netbird/iface" + "github.com/netbirdio/netbird/iface/bind" mgm "github.com/netbirdio/netbird/management/client" mgmProto "github.com/netbirdio/netbird/management/proto" "github.com/netbirdio/netbird/route" + "github.com/netbirdio/netbird/sharedsock" signal "github.com/netbirdio/netbird/signal/client" sProto "github.com/netbirdio/netbird/signal/proto" "github.com/netbirdio/netbird/util" @@ -99,10 +102,8 @@ type Engine struct { wgInterface *iface.WGIface - udpMux ice.UDPMux - udpMuxSrflx ice.UniversalUDPMux - udpMuxConn *net.UDPConn - udpMuxConnSrflx *net.UDPConn + udpMux *bind.UniversalUDPMuxDefault + udpMuxConn io.Closer // networkSerial is the latest CurrentSerial (state ID) of the network sent by the Management service networkSerial uint64 @@ -206,33 +207,17 @@ func (e *Engine) Start() error { e.close() return err } - e.udpMux = udpMux.UDPMuxDefault - e.udpMuxSrflx = udpMux + e.udpMux = udpMux log.Infof("using userspace bind mode %s", udpMux.LocalAddr().String()) } else { - networkName := "udp" - if e.config.DisableIPv6Discovery { - networkName = "udp4" - } - e.udpMuxConn, err = net.ListenUDP(networkName, &net.UDPAddr{Port: e.config.UDPMuxPort}) + rawSock, err := sharedsock.Listen(e.config.WgPort, sharedsock.NewSTUNFilter()) if err != nil { - log.Errorf("failed listening on UDP port %d: [%s]", e.config.UDPMuxPort, err.Error()) - e.close() return err } - udpMuxParams := ice.UDPMuxParams{ - UDPConn: e.udpMuxConn, - Net: transportNet, - } - e.udpMux = ice.NewUDPMuxDefault(udpMuxParams) - - e.udpMuxConnSrflx, err = net.ListenUDP(networkName, &net.UDPAddr{Port: e.config.UDPMuxSrflxPort}) - if err != nil { - log.Errorf("failed listening on UDP port %d: [%s]", e.config.UDPMuxSrflxPort, err.Error()) - e.close() - return err - } - e.udpMuxSrflx = ice.NewUniversalUDPMuxDefault(ice.UniversalUDPMuxParams{UDPConn: e.udpMuxConnSrflx, Net: transportNet}) + mux := bind.NewUniversalUDPMuxDefault(bind.UniversalUDPMuxParams{UDPConn: rawSock, Net: transportNet}) + go mux.ReadFromConn(e.ctx) + e.udpMuxConn = rawSock + e.udpMux = mux } e.routeManager = routemanager.NewManager(e.ctx, e.config.WgPrivateKey.PublicKey().String(), e.wgInterface, e.statusRecorder) @@ -394,9 +379,6 @@ func SignalOfferAnswer(offerAnswer peer.OfferAnswer, myKey wgtypes.Key, remoteKe return err } - // indicates message support in gRPC - msg.Body.FeaturesSupported = []uint32{signal.DirectCheck} - err = s.Send(msg) if err != nil { return err @@ -824,8 +806,8 @@ func (e Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, er InterfaceBlackList: e.config.IFaceBlackList, DisableIPv6Discovery: e.config.DisableIPv6Discovery, Timeout: timeout, - UDPMux: e.udpMux, - UDPMuxSrflx: e.udpMuxSrflx, + UDPMux: e.udpMux.UDPMuxDefault, + UDPMuxSrflx: e.udpMux, ProxyConfig: proxyConfig, LocalWgPort: e.config.WgPort, NATExternalIPs: e.parseNATExternalIPMappings(), @@ -918,18 +900,6 @@ func (e *Engine) receiveSignalEvents() { } conn.OnRemoteCandidate(candidate) case sProto.Body_MODE: - protoMode := msg.GetBody().GetMode() - if protoMode == nil { - return fmt.Errorf("received an empty mode message") - } - - err := conn.OnModeMessage(peer.ModeMessage{ - Direct: protoMode.GetDirect(), - }) - if err != nil { - log.Errorf("failed processing a mode message -> %s", err) - return err - } } return nil @@ -1020,12 +990,6 @@ func (e *Engine) close() { } } - if e.udpMuxConnSrflx != nil { - if err := e.udpMuxConnSrflx.Close(); err != nil { - log.Debugf("close server reflexive udp mux connection: %v", err) - } - } - if !isNil(e.sshServer) { err := e.sshServer.Stop() if err != nil { diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go index 2a5a17f19..088010ef3 100644 --- a/client/internal/peer/conn.go +++ b/client/internal/peer/conn.go @@ -16,6 +16,7 @@ import ( "github.com/netbirdio/netbird/client/internal/proxy" "github.com/netbirdio/netbird/client/internal/stdnet" "github.com/netbirdio/netbird/iface" + "github.com/netbirdio/netbird/iface/bind" signal "github.com/netbirdio/netbird/signal/client" sProto "github.com/netbirdio/netbird/signal/proto" "github.com/netbirdio/netbird/version" @@ -358,88 +359,10 @@ func (conn *Conn) Open() error { } } -// useProxy determines whether a direct connection (without a go proxy) is possible -// -// There are 3 cases: -// -// * When neither candidate is from hard nat and one of the peers has a public IP -// -// * both peers are in the same private network -// -// * Local peer uses userspace interface with bind.ICEBind and is not relayed -// -// Please note, that this check happens when peers were already able to ping each other using ICE layer. -func shouldUseProxy(pair *ice.CandidatePair, userspaceBind bool) bool { - - if !isRelayCandidate(pair.Local) && userspaceBind { - log.Debugf("shouldn't use proxy because using Bind and the connection is not relayed") - return false - } - - if !isHardNATCandidate(pair.Local) && isHostCandidateWithPublicIP(pair.Remote) { - log.Debugf("shouldn't use proxy because the local peer is not behind a hard NAT and the remote one has a public IP") - return false - } - - if !isHardNATCandidate(pair.Remote) && isHostCandidateWithPublicIP(pair.Local) { - log.Debugf("shouldn't use proxy because the remote peer is not behind a hard NAT and the local one has a public IP") - return false - } - - if isHostCandidateWithPrivateIP(pair.Local) && isHostCandidateWithPrivateIP(pair.Remote) && isSameNetworkPrefix(pair) { - log.Debugf("shouldn't use proxy because peers are in the same private /16 network") - return false - } - - if (isPeerReflexiveCandidateWithPrivateIP(pair.Local) && isHostCandidateWithPrivateIP(pair.Remote) || - isHostCandidateWithPrivateIP(pair.Local) && isPeerReflexiveCandidateWithPrivateIP(pair.Remote)) && isSameNetworkPrefix(pair) { - log.Debugf("shouldn't use proxy because peers are in the same private /16 network and one peer is peer reflexive") - return false - } - - return true -} - -func isSameNetworkPrefix(pair *ice.CandidatePair) bool { - - localIP := net.ParseIP(pair.Local.Address()) - remoteIP := net.ParseIP(pair.Remote.Address()) - if localIP == nil || remoteIP == nil { - return false - } - // only consider /16 networks - mask := net.IPMask{255, 255, 0, 0} - return localIP.Mask(mask).Equal(remoteIP.Mask(mask)) -} - func isRelayCandidate(candidate ice.Candidate) bool { return candidate.Type() == ice.CandidateTypeRelay } -func isHardNATCandidate(candidate ice.Candidate) bool { - return candidate.Type() == ice.CandidateTypeRelay || candidate.Type() == ice.CandidateTypePeerReflexive -} - -func isHostCandidateWithPublicIP(candidate ice.Candidate) bool { - return candidate.Type() == ice.CandidateTypeHost && isPublicIP(candidate.Address()) -} - -func isHostCandidateWithPrivateIP(candidate ice.Candidate) bool { - return candidate.Type() == ice.CandidateTypeHost && !isPublicIP(candidate.Address()) -} - -func isPeerReflexiveCandidateWithPrivateIP(candidate ice.Candidate) bool { - return candidate.Type() == ice.CandidateTypePeerReflexive && !isPublicIP(candidate.Address()) -} - -func isPublicIP(address string) bool { - ip := net.ParseIP(address) - if ip == nil || ip.IsLoopback() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() || ip.IsPrivate() { - return false - } - return true -} - // startProxy starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected func (conn *Conn) startProxy(remoteConn net.Conn, remoteWgPort int) error { conn.mu.Lock() @@ -452,7 +375,7 @@ func (conn *Conn) startProxy(remoteConn net.Conn, remoteWgPort int) error { } peerState := State{PubKey: conn.config.Key} - p := conn.getProxyWithMessageExchange(pair, remoteWgPort) + p := conn.getProxy(pair, remoteWgPort) conn.proxy = p err = p.Start(remoteConn) if err != nil { @@ -478,62 +401,35 @@ func (conn *Conn) startProxy(remoteConn net.Conn, remoteWgPort int) error { return nil } -func (conn *Conn) getProxyWithMessageExchange(pair *ice.CandidatePair, remoteWgPort int) proxy.Proxy { - useProxy := shouldUseProxy(pair, conn.config.UserspaceBind) - localDirectMode := !useProxy - remoteDirectMode := localDirectMode - - if conn.meta.protoSupport.DirectCheck { - go conn.sendLocalDirectMode(localDirectMode) - // will block until message received or timeout - remoteDirectMode = conn.receiveRemoteDirectMode() +// todo rename this method and the proxy package to something more appropriate +func (conn *Conn) getProxy(pair *ice.CandidatePair, remoteWgPort int) proxy.Proxy { + if isRelayCandidate(pair.Local) { + return proxy.NewWireGuardProxy(conn.config.ProxyConfig) } - if conn.config.UserspaceBind && localDirectMode { - return proxy.NewNoProxy(conn.config.ProxyConfig) - } + // To support old version's with direct mode we attempt to punch an additional role with the remote wireguard port + go conn.punchRemoteWGPort(pair, remoteWgPort) - if localDirectMode && remoteDirectMode { - return proxy.NewDirectNoProxy(conn.config.ProxyConfig, remoteWgPort) - } - - log.Debugf("falling back to local proxy mode with peer %s", conn.config.Key) - return proxy.NewWireGuardProxy(conn.config.ProxyConfig) + return proxy.NewNoProxy(conn.config.ProxyConfig) } -func (conn *Conn) sendLocalDirectMode(localMode bool) { - // todo what happens when we couldn't deliver this message? - // we could retry, etc but there is no guarantee - - err := conn.sendSignalMessage(&sProto.Message{ - Key: conn.config.LocalKey, - RemoteKey: conn.config.Key, - Body: &sProto.Body{ - Type: sProto.Body_MODE, - Mode: &sProto.Mode{ - Direct: &localMode, - }, - NetBirdVersion: version.NetbirdVersion(), - }, - }) +func (conn *Conn) punchRemoteWGPort(pair *ice.CandidatePair, remoteWgPort int) { + // wait local endpoint configuration + time.Sleep(time.Second) + addr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", pair.Remote.Address(), remoteWgPort)) if err != nil { - log.Errorf("failed to send local proxy mode to remote peer %s, error: %s", conn.config.Key, err) + log.Warnf("got an error while resolving the udp address, err: %s", err) + return } -} -func (conn *Conn) receiveRemoteDirectMode() bool { - timeout := time.Second - timer := time.NewTimer(timeout) - defer timer.Stop() - - select { - case receivedMSG := <-conn.remoteModeCh: - return receivedMSG.Direct - case <-timer.C: - // we didn't receive a message from remote so we assume that it supports the direct mode to keep the old behaviour - log.Debugf("timeout after %s while waiting for remote direct mode message from remote peer %s", - timeout, conn.config.Key) - return true + mux, ok := conn.config.UDPMuxSrflx.(*bind.UniversalUDPMuxDefault) + if !ok { + log.Warn("invalid udp mux conversion") + return + } + _, err = mux.GetSharedConn().WriteTo([]byte{0x6e, 0x62}, addr) + if err != nil { + log.Warnf("got an error while sending the punch packet, err: %s", err) } } @@ -757,16 +653,6 @@ func (conn *Conn) GetKey() string { return conn.config.Key } -// OnModeMessage unmarshall the payload message and send it to the mode message channel -func (conn *Conn) OnModeMessage(message ModeMessage) error { - select { - case conn.remoteModeCh <- message: - return nil - default: - return fmt.Errorf("unable to process mode message: channel busy") - } -} - // RegisterProtoSupportMeta register supported proto message in the connection metadata func (conn *Conn) RegisterProtoSupportMeta(support []uint32) { protoSupport := signal.ParseFeaturesSupported(support) diff --git a/client/internal/peer/conn_test.go b/client/internal/peer/conn_test.go index c34f6fed5..dedc381ac 100644 --- a/client/internal/peer/conn_test.go +++ b/client/internal/peer/conn_test.go @@ -9,11 +9,9 @@ import ( "github.com/magiconair/properties/assert" "github.com/pion/ice/v2" - "golang.org/x/sync/errgroup" "github.com/netbirdio/netbird/client/internal/proxy" "github.com/netbirdio/netbird/iface" - sproto "github.com/netbirdio/netbird/signal/proto" ) var connConf = ConnConfig{ @@ -170,310 +168,3 @@ func TestConn_Close(t *testing.T) { wg.Wait() } - -type mockICECandidate struct { - ice.Candidate - AddressFunc func() string - TypeFunc func() ice.CandidateType -} - -// Address mocks and overwrite ice.Candidate Address method -func (m *mockICECandidate) Address() string { - if m.AddressFunc != nil { - return m.AddressFunc() - } - return "" -} - -// Type mocks and overwrite ice.Candidate Type method -func (m *mockICECandidate) Type() ice.CandidateType { - if m.TypeFunc != nil { - return m.TypeFunc() - } - return ice.CandidateTypeUnspecified -} - -func TestConn_ShouldUseProxy(t *testing.T) { - publicHostCandidate := &mockICECandidate{ - AddressFunc: func() string { - return "8.8.8.8" - }, - TypeFunc: func() ice.CandidateType { - return ice.CandidateTypeHost - }, - } - privateHostCandidate := &mockICECandidate{ - AddressFunc: func() string { - return "10.0.0.1" - }, - TypeFunc: func() ice.CandidateType { - return ice.CandidateTypeHost - }, - } - - srflxCandidate := &mockICECandidate{ - AddressFunc: func() string { - return "1.1.1.1" - }, - TypeFunc: func() ice.CandidateType { - return ice.CandidateTypeServerReflexive - }, - } - - prflxCandidate := &mockICECandidate{ - AddressFunc: func() string { - return "1.1.1.1" - }, - TypeFunc: func() ice.CandidateType { - return ice.CandidateTypePeerReflexive - }, - } - - relayCandidate := &mockICECandidate{ - AddressFunc: func() string { - return "1.1.1.1" - }, - TypeFunc: func() ice.CandidateType { - return ice.CandidateTypeRelay - }, - } - - testCases := []struct { - name string - candatePair *ice.CandidatePair - expected bool - }{ - { - name: "Use Proxy When Local Candidate Is Relay", - candatePair: &ice.CandidatePair{ - Local: relayCandidate, - Remote: privateHostCandidate, - }, - expected: true, - }, - { - name: "Use Proxy When Remote Candidate Is Relay", - candatePair: &ice.CandidatePair{ - Local: privateHostCandidate, - Remote: relayCandidate, - }, - expected: true, - }, - { - name: "Use Proxy When Local Candidate Is Peer Reflexive", - candatePair: &ice.CandidatePair{ - Local: prflxCandidate, - Remote: privateHostCandidate, - }, - expected: true, - }, - { - name: "Use Proxy When Remote Candidate Is Peer Reflexive", - candatePair: &ice.CandidatePair{ - Local: privateHostCandidate, - Remote: prflxCandidate, - }, - expected: true, - }, - { - name: "Don't Use Proxy When Local Candidate Is Public And Remote Is Private", - candatePair: &ice.CandidatePair{ - Local: publicHostCandidate, - Remote: privateHostCandidate, - }, - expected: false, - }, - { - name: "Don't Use Proxy When Remote Candidate Is Public And Local Is Private", - candatePair: &ice.CandidatePair{ - Local: privateHostCandidate, - Remote: publicHostCandidate, - }, - expected: false, - }, - { - name: "Don't Use Proxy When Local Candidate is Public And Remote Is Server Reflexive", - candatePair: &ice.CandidatePair{ - Local: publicHostCandidate, - Remote: srflxCandidate, - }, - expected: false, - }, - { - name: "Don't Use Proxy When Remote Candidate is Public And Local Is Server Reflexive", - candatePair: &ice.CandidatePair{ - Local: srflxCandidate, - Remote: publicHostCandidate, - }, - expected: false, - }, - { - name: "Don't Use Proxy When Both Candidates Are Public", - candatePair: &ice.CandidatePair{ - Local: publicHostCandidate, - Remote: publicHostCandidate, - }, - expected: false, - }, - { - name: "Don't Use Proxy When Both Candidates Are Private", - candatePair: &ice.CandidatePair{ - Local: privateHostCandidate, - Remote: privateHostCandidate, - }, - expected: false, - }, - { - name: "Don't Use Proxy When Both Candidates are in private network and one is peer reflexive", - candatePair: &ice.CandidatePair{ - Local: &mockICECandidate{AddressFunc: func() string { - return "10.16.102.168" - }, - TypeFunc: func() ice.CandidateType { - return ice.CandidateTypeHost - }}, - Remote: &mockICECandidate{AddressFunc: func() string { - return "10.16.101.96" - }, - TypeFunc: func() ice.CandidateType { - return ice.CandidateTypePeerReflexive - }}, - }, - expected: false, - }, - { - name: "Should Use Proxy When Both Candidates are in private network and both are peer reflexive", - candatePair: &ice.CandidatePair{ - Local: &mockICECandidate{AddressFunc: func() string { - return "10.16.102.168" - }, - TypeFunc: func() ice.CandidateType { - return ice.CandidateTypePeerReflexive - }}, - Remote: &mockICECandidate{AddressFunc: func() string { - return "10.16.101.96" - }, - TypeFunc: func() ice.CandidateType { - return ice.CandidateTypePeerReflexive - }}, - }, - expected: true, - }, - } - - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - result := shouldUseProxy(testCase.candatePair, false) - if result != testCase.expected { - t.Errorf("got a different result. Expected %t Got %t", testCase.expected, result) - } - }) - } -} - -func TestGetProxyWithMessageExchange(t *testing.T) { - publicHostCandidate := &mockICECandidate{ - AddressFunc: func() string { - return "8.8.8.8" - }, - TypeFunc: func() ice.CandidateType { - return ice.CandidateTypeHost - }, - } - relayCandidate := &mockICECandidate{ - AddressFunc: func() string { - return "1.1.1.1" - }, - TypeFunc: func() ice.CandidateType { - return ice.CandidateTypeRelay - }, - } - - testCases := []struct { - name string - candatePair *ice.CandidatePair - inputDirectModeSupport bool - inputRemoteModeMessage bool - expected proxy.Type - }{ - { - name: "Should Result In Using Wireguard Proxy When Local Eval Is Use Proxy", - candatePair: &ice.CandidatePair{ - Local: relayCandidate, - Remote: publicHostCandidate, - }, - inputDirectModeSupport: true, - inputRemoteModeMessage: true, - expected: proxy.TypeWireGuard, - }, - { - name: "Should Result In Using Wireguard Proxy When Remote Eval Is Use Proxy", - candatePair: &ice.CandidatePair{ - Local: publicHostCandidate, - Remote: publicHostCandidate, - }, - inputDirectModeSupport: true, - inputRemoteModeMessage: false, - expected: proxy.TypeWireGuard, - }, - { - name: "Should Result In Using Wireguard Proxy When Remote Direct Mode Support Is False And Local Eval Is Use Proxy", - candatePair: &ice.CandidatePair{ - Local: relayCandidate, - Remote: publicHostCandidate, - }, - inputDirectModeSupport: false, - inputRemoteModeMessage: false, - expected: proxy.TypeWireGuard, - }, - { - name: "Should Result In Using Direct When Remote Direct Mode Support Is False And Local Eval Is No Use Proxy", - candatePair: &ice.CandidatePair{ - Local: publicHostCandidate, - Remote: publicHostCandidate, - }, - inputDirectModeSupport: false, - inputRemoteModeMessage: false, - expected: proxy.TypeDirectNoProxy, - }, - { - name: "Should Result In Using Direct When Local And Remote Eval Is No Proxy", - candatePair: &ice.CandidatePair{ - Local: publicHostCandidate, - Remote: publicHostCandidate, - }, - inputDirectModeSupport: true, - inputRemoteModeMessage: true, - expected: proxy.TypeDirectNoProxy, - }, - } - for _, testCase := range testCases { - t.Run(testCase.name, func(t *testing.T) { - g := errgroup.Group{} - conn, err := NewConn(connConf, nil, nil, nil) - if err != nil { - t.Fatal(err) - } - conn.meta.protoSupport.DirectCheck = testCase.inputDirectModeSupport - conn.SetSendSignalMessage(func(message *sproto.Message) error { - return nil - }) - - g.Go(func() error { - return conn.OnModeMessage(ModeMessage{ - Direct: testCase.inputRemoteModeMessage, - }) - }) - - resultProxy := conn.getProxyWithMessageExchange(testCase.candatePair, 1000) - - err = g.Wait() - if err != nil { - t.Error(err) - } - if resultProxy.Type() != testCase.expected { - t.Errorf("result didn't match expected value: Expected: %s, Got: %s", testCase.expected, resultProxy.Type()) - } - }) - } -} diff --git a/client/internal/proxy/direct.go b/client/internal/proxy/direct.go deleted file mode 100644 index 58603a831..000000000 --- a/client/internal/proxy/direct.go +++ /dev/null @@ -1,57 +0,0 @@ -package proxy - -import ( - log "github.com/sirupsen/logrus" - "net" -) - -// DirectNoProxy is used when there is no need for a proxy between ICE and WireGuard. -// This is possible in either of these cases: -// - peers are in the same local network -// - one of the peers has a public static IP (host) -// DirectNoProxy will just update remote peer with a remote host and fixed WireGuard port (r.g. 51820). -// In order DirectNoProxy to work, WireGuard port has to be fixed for the time being. -type DirectNoProxy struct { - config Config - // RemoteWgListenPort is a WireGuard port of a remote peer. - // It is used instead of the hardcoded 51820 port. - RemoteWgListenPort int -} - -// NewDirectNoProxy creates a new DirectNoProxy with a provided config and remote peer's WireGuard listen port -func NewDirectNoProxy(config Config, remoteWgPort int) *DirectNoProxy { - return &DirectNoProxy{config: config, RemoteWgListenPort: remoteWgPort} -} - -// Close removes peer from the WireGuard interface -func (p *DirectNoProxy) Close() error { - err := p.config.WgInterface.RemovePeer(p.config.RemoteKey) - if err != nil { - return err - } - return nil -} - -// Start just updates WireGuard peer with the remote IP and default WireGuard port -func (p *DirectNoProxy) Start(remoteConn net.Conn) error { - - log.Debugf("using DirectNoProxy while connecting to peer %s", p.config.RemoteKey) - addr, err := net.ResolveUDPAddr("udp", remoteConn.RemoteAddr().String()) - if err != nil { - return err - } - addr.Port = p.RemoteWgListenPort - err = p.config.WgInterface.UpdatePeer(p.config.RemoteKey, p.config.AllowedIps, DefaultWgKeepAlive, - addr, p.config.PreSharedKey) - - if err != nil { - return err - } - - return nil -} - -// Type returns the type of this proxy -func (p *DirectNoProxy) Type() Type { - return TypeDirectNoProxy -} diff --git a/go.mod b/go.mod index c25fde374..915163d6b 100644 --- a/go.mod +++ b/go.mod @@ -38,12 +38,14 @@ require ( github.com/gliderlabs/ssh v0.3.4 github.com/godbus/dbus/v5 v5.1.0 github.com/google/go-cmp v0.5.9 + github.com/google/gopacket v1.1.19 github.com/google/nftables v0.0.0-20220808154552-2eca00135732 github.com/hashicorp/go-secure-stdlib/base62 v0.1.2 github.com/hashicorp/go-version v1.6.0 github.com/libp2p/go-netroute v0.2.0 github.com/magiconair/properties v1.8.5 github.com/mattn/go-sqlite3 v1.14.16 + github.com/mdlayher/socket v0.4.0 github.com/miekg/dns v1.1.43 github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/open-policy-agent/opa v0.49.0 @@ -93,14 +95,12 @@ require ( github.com/go-stack/stack v1.8.0 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/goki/freetype v0.0.0-20181231101311-fa8a33aabaff // indirect - github.com/google/gopacket v1.1.19 // indirect github.com/hashicorp/go-uuid v1.0.2 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect - github.com/josharian/native v0.0.0-20200817173448-b6b71def0850 // indirect + github.com/josharian/native v1.0.0 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/mdlayher/genetlink v1.1.0 // indirect - github.com/mdlayher/netlink v1.4.2 // indirect - github.com/mdlayher/socket v0.0.0-20211102153432-57e3fa563ecb // indirect + github.com/mdlayher/netlink v1.7.1 // indirect github.com/nxadm/tail v1.4.8 // indirect github.com/oxtoacart/bpool v0.0.0-20190530202638-03653db5a59c // indirect github.com/pegasus-kv/thrift v0.13.0 // indirect diff --git a/go.sum b/go.sum index 8ba648ed3..7ef776045 100644 --- a/go.sum +++ b/go.sum @@ -380,8 +380,9 @@ github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLf github.com/jackmordaunt/icns v0.0.0-20181231085925-4f16af745526/go.mod h1:UQkeMHVoNcyXYq9otUupF7/h/2tmHlhrS2zw7ZVvUqc= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= github.com/josephspurrier/goversioninfo v0.0.0-20200309025242-14b0ab84c6ca/go.mod h1:eJTEwMjXb7kZ633hO3Ln9mBUCOjX2+FlTljvpl9SYdE= -github.com/josharian/native v0.0.0-20200817173448-b6b71def0850 h1:uhL5Gw7BINiiPAo24A2sxkcDI0Jt/sqp1v5xQCniEFA= github.com/josharian/native v0.0.0-20200817173448-b6b71def0850/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= +github.com/josharian/native v1.0.0 h1:Ts/E8zCSEsG17dUqv7joXJFybuMLjQfWE04tsBODTxk= +github.com/josharian/native v1.0.0/go.mod h1:7X/raswPFr05uY3HiLlYeyQntB6OO7E/d2Cu7qoaN2w= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/jsimonetti/rtnetlink v0.0.0-20190606172950-9527aa82566a/go.mod h1:Oz+70psSo5OFh8DBl0Zv2ACw7Esh6pPUphlvZG9x7uw= github.com/jsimonetti/rtnetlink v0.0.0-20200117123717-f846d4f6c1f4/go.mod h1:WGuG/smIU4J/54PblvSbh+xvCZmpJnFgr3ds6Z55XMQ= @@ -391,7 +392,6 @@ github.com/jsimonetti/rtnetlink v0.0.0-20201220180245-69540ac93943/go.mod h1:z4c github.com/jsimonetti/rtnetlink v0.0.0-20210122163228-8d122574c736/go.mod h1:ZXpIyOK59ZnN7J0BV99cZUPmsqDRZ3eq5X+st7u/oSA= github.com/jsimonetti/rtnetlink v0.0.0-20210212075122-66c871082f2b/go.mod h1:8w9Rh8m+aHZIG69YPGGem1i5VzoyRC8nw2kA8B+ik5U= github.com/jsimonetti/rtnetlink v0.0.0-20210525051524-4cc836578190/go.mod h1:NmKSdU4VGSiv1bMsdqNALI4RSvvjtz65tTMCnD05qLo= -github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786 h1:N527AHMa793TP5z5GNAn/VLPzlc0ewzWdeP/25gDfgQ= github.com/jsimonetti/rtnetlink v0.0.0-20211022192332-93da33804786/go.mod h1:v4hqbTdfQngbVSZJVWUhGE/lbTFf9jb+ygmNUDQMuOs= github.com/json-iterator/go v0.0.0-20180612202835-f2b4162afba3/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= @@ -441,7 +441,6 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5 github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= github.com/mdlayher/ethtool v0.0.0-20210210192532-2b88debcdd43/go.mod h1:+t7E0lkKfbBsebllff1xdTmyJt8lH37niI6kwFk9OTo= -github.com/mdlayher/ethtool v0.0.0-20211028163843-288d040e9d60 h1:tHdB+hQRHU10CfcK0furo6rSNgZ38JT8uPh70c/pFD8= github.com/mdlayher/ethtool v0.0.0-20211028163843-288d040e9d60/go.mod h1:aYbhishWc4Ai3I2U4Gaa2n3kHWSwzme6EsG/46HRQbE= github.com/mdlayher/genetlink v1.0.0/go.mod h1:0rJ0h4itni50A86M2kHcgS85ttZazNt7a8H2a2cw0Gc= github.com/mdlayher/genetlink v1.1.0 h1:k2YQT3959rJOF7gOvhdfQ0lut7QMIZiuVlJANheoZ+E= @@ -456,12 +455,14 @@ github.com/mdlayher/netlink v1.2.2-0.20210123213345-5cc92139ae3e/go.mod h1:bacnN github.com/mdlayher/netlink v1.3.0/go.mod h1:xK/BssKuwcRXHrtN04UBkwQ6dY9VviGGuriDdoPSWys= github.com/mdlayher/netlink v1.4.0/go.mod h1:dRJi5IABcZpBD2A3D0Mv/AiX8I9uDEu5oGkAVrekmf8= github.com/mdlayher/netlink v1.4.1/go.mod h1:e4/KuJ+s8UhfUpO9z00/fDZZmhSrs+oxyqAS9cNgn6Q= -github.com/mdlayher/netlink v1.4.2 h1:3sbnJWe/LETovA7yRZIX3f9McVOWV3OySH6iIBxiFfI= github.com/mdlayher/netlink v1.4.2/go.mod h1:13VaingaArGUTUxFLf/iEovKxXji32JAtF858jZYEug= +github.com/mdlayher/netlink v1.7.1 h1:FdUaT/e33HjEXagwELR8R3/KL1Fq5x3G5jgHLp/BTmg= +github.com/mdlayher/netlink v1.7.1/go.mod h1:nKO5CSjE/DJjVhk/TNp6vCE1ktVxEA8VEh8drhZzxsQ= github.com/mdlayher/socket v0.0.0-20210307095302-262dc9984e00/go.mod h1:GAFlyu4/XV68LkQKYzKhIo/WW7j3Zi0YRAz/BOoanUc= github.com/mdlayher/socket v0.0.0-20211007213009-516dcbdf0267/go.mod h1:nFZ1EtZYK8Gi/k6QNu7z7CgO20i/4ExeQswwWuPmG/g= -github.com/mdlayher/socket v0.0.0-20211102153432-57e3fa563ecb h1:2dC7L10LmTqlyMVzFJ00qM25lqESg9Z4u3GuEXN5iHY= github.com/mdlayher/socket v0.0.0-20211102153432-57e3fa563ecb/go.mod h1:nFZ1EtZYK8Gi/k6QNu7z7CgO20i/4ExeQswwWuPmG/g= +github.com/mdlayher/socket v0.4.0 h1:280wsy40IC9M9q1uPGcLBwXpcTQDtoGwVt+BNoITxIw= +github.com/mdlayher/socket v0.4.0/go.mod h1:xxFqz5GRCUN3UEOm9CZqEJsAbe1C8OwSK46NlmWuVoc= github.com/miekg/dns v1.1.43 h1:JKfpVSCB84vrAmHzyrsxB5NAr5kLoMXZArPSw7Qlgyg= github.com/miekg/dns v1.1.43/go.mod h1:+evo5L0630/F6ca/Z9+GAqzhjGyn8/c+TBaOyfEl0V4= github.com/mikioh/ipaddr v0.0.0-20190404000644-d465c8ab6721 h1:RlZweED6sbSArvlE924+mUcZuXKLBHA35U7LN621Bws= diff --git a/iface/bind/udp_mux_universal.go b/iface/bind/udp_mux_universal.go index 91f0ee18a..a8d8d4038 100644 --- a/iface/bind/udp_mux_universal.go +++ b/iface/bind/udp_mux_universal.go @@ -5,6 +5,7 @@ package bind */ import ( + "context" "fmt" "net" "time" @@ -68,6 +69,39 @@ func NewUniversalUDPMuxDefault(params UniversalUDPMuxParams) *UniversalUDPMuxDef return m } +// ReadFromConn reads from the m.params.UDPConn provided upon the creation. It expects STUN packets only, however, will +// just ignore other packets printing an warning message. +// It is a blocking method, consider running in a go routine. +func (m *UniversalUDPMuxDefault) ReadFromConn(ctx context.Context) { + buf := make([]byte, 1500) + for { + select { + case <-ctx.Done(): + log.Debugf("stopped reading from the UDPConn due to finished context") + return + default: + _, a, err := m.params.UDPConn.ReadFrom(buf) + if err != nil { + log.Errorf("error while reading packet %s", err) + continue + } + msg := &stun.Message{ + Raw: buf, + } + err = msg.Decode() + if err != nil { + log.Warnf("error while parsing STUN message. The packet doesn't seem to be a STUN packet: %s", err) + continue + } + + err = m.HandleSTUNMessage(msg, a) + if err != nil { + log.Errorf("error while handling STUn message: %s", err) + } + } + } +} + // udpConn is a wrapper around UDPMux conn that overrides ReadFrom and handles STUN/TURN packets type udpConn struct { net.PacketConn @@ -75,6 +109,11 @@ type udpConn struct { logger logging.LeveledLogger } +// GetSharedConn returns the shared udp conn +func (m *UniversalUDPMuxDefault) GetSharedConn() net.PacketConn { + return m.params.UDPConn +} + // GetListenAddresses returns the listen addr of this UDP func (m *UniversalUDPMuxDefault) GetListenAddresses() []net.Addr { return []net.Addr{m.LocalAddr()} diff --git a/iface/wg_configurer_nonandroid.go b/iface/wg_configurer_nonandroid.go index 5a2a70ea3..70ec5dc04 100644 --- a/iface/wg_configurer_nonandroid.go +++ b/iface/wg_configurer_nonandroid.go @@ -202,7 +202,7 @@ func (c *wGConfigurer) configure(config wgtypes.Config) error { if err != nil { return err } - log.Debugf("got Wireguard device %s", c.deviceName) + log.Tracef("got Wireguard device %s", c.deviceName) return wg.ConfigureDevice(c.deviceName, config) } diff --git a/sharedsock/filter.go b/sharedsock/filter.go new file mode 100644 index 000000000..da27639fb --- /dev/null +++ b/sharedsock/filter.go @@ -0,0 +1,11 @@ +package sharedsock + +import "golang.org/x/net/bpf" + +const magicCookie uint32 = 0x2112A442 + +// BPFFilter is a generic filter that provides ipv4 and ipv6 BPF instructions +type BPFFilter interface { + // GetInstructions returns raw BPF instructions for ipv4 and ipv6 + GetInstructions(port uint32) (ipv4 []bpf.RawInstruction, ipv6 []bpf.RawInstruction, err error) +} diff --git a/sharedsock/filter_linux.go b/sharedsock/filter_linux.go new file mode 100644 index 000000000..a9903f03f --- /dev/null +++ b/sharedsock/filter_linux.go @@ -0,0 +1,47 @@ +package sharedsock + +import "golang.org/x/net/bpf" + +// STUNFilter implements BPFFilter by filtering on STUN packets. +// Other packets (non STUN) will be forwarded to the process that own the port (e.g., WireGuard). +type STUNFilter struct { +} + +// NewSTUNFilter creates an instance of a STUNFilter +func NewSTUNFilter() BPFFilter { + return &STUNFilter{} +} + +// GetInstructions returns raw BPF instructions for ipv4 and ipv6 that filter out anything but STUN packets +func (sf STUNFilter) GetInstructions(port uint32) (raw4 []bpf.RawInstruction, raw6 []bpf.RawInstruction, err error) { + raw4, err = rawInstructions(22, 32, port) + if err != nil { + return nil, nil, err + } + raw6, err = rawInstructions(2, 12, port) + if err != nil { + return nil, nil, err + } + return raw4, raw6, nil +} + +func rawInstructions(portOff, cookieOff, port uint32) ([]bpf.RawInstruction, error) { + // UDP raw socket for ipv4 receives the rcvdPacket with IP headers + // UDP raw socket for ipv6 receives the rcvdPacket with UDP headers + instructions := []bpf.Instruction{ + // Load the source port from the UDP header (offset 22 for ipv4 and 2 for ipv6) + bpf.LoadAbsolute{Off: portOff, Size: 2}, + // Check if the source port is equal to the specified `port`. If not, skip the next 3 instructions. + bpf.JumpIf{Cond: bpf.JumpNotEqual, Val: port, SkipTrue: 3}, + // Load the 4-byte value (magic cookie) from the UDP payload (offset 32 for ipv4 and 12 for ipv6) + bpf.LoadAbsolute{Off: cookieOff, Size: 4}, + // Check if the loaded value is equal to the `magicCookie`. If not, skip the next instruction. + bpf.JumpIf{Cond: bpf.JumpNotEqual, Val: magicCookie, SkipTrue: 1}, + // If both the port and the magic cookie match, return a positive value (0xffffffff) + bpf.RetConstant{Val: 0xffffffff}, + // If either the port or the magic cookie doesn't match, return 0 + bpf.RetConstant{Val: 0}, + } + + return bpf.Assemble(instructions) +} diff --git a/sharedsock/filter_nolinux.go b/sharedsock/filter_nolinux.go new file mode 100644 index 000000000..3ae648ade --- /dev/null +++ b/sharedsock/filter_nolinux.go @@ -0,0 +1,8 @@ +//go:build !linux + +package sharedsock + +// NewSTUNFilter is a noop method just because we do not support BPF filters on other platforms than Linux +func NewSTUNFilter() BPFFilter { + return nil +} diff --git a/sharedsock/sock_linux.go b/sharedsock/sock_linux.go new file mode 100644 index 000000000..f6501928a --- /dev/null +++ b/sharedsock/sock_linux.go @@ -0,0 +1,327 @@ +//go:build linux && !android + +// Inspired by +// Jason Donenfeld (https://git.zx2c4.com/wireguard-tools/tree/contrib/nat-hole-punching/nat-punch-client.c#n96) +// and @stv0g in https://github.com/stv0g/cunicu/tree/ebpf-poc/ebpf_poc + +package sharedsock + +import ( + "context" + "fmt" + "net" + "sync" + "time" + + "github.com/google/gopacket" + "github.com/google/gopacket/layers" + "github.com/google/gopacket/routing" + "github.com/libp2p/go-netroute" + "github.com/mdlayher/socket" + log "github.com/sirupsen/logrus" + "golang.org/x/sync/errgroup" + "golang.org/x/sys/unix" +) + +// ErrSharedSockStopped indicates that shared socket has been stopped +var ErrSharedSockStopped = fmt.Errorf("shared socked stopped") + +// SharedSocket is a net.PacketConn that initiates two raw sockets (ipv4 and ipv6) and listens to UDP packets filtered +// by BPF instructions (e.g., STUNFilter that checks and sends only STUN packets to the listeners (ReadFrom)). +type SharedSocket struct { + ctx context.Context + conn4 *socket.Conn + conn6 *socket.Conn + port int + routerMux sync.RWMutex + router routing.Router + packetDemux chan rcvdPacket + cancel context.CancelFunc +} + +type rcvdPacket struct { + n int + addr unix.Sockaddr + buf []byte + err error +} + +type receiver func(ctx context.Context, p []byte, flags int) (int, unix.Sockaddr, error) + +var writeSerializerOptions = gopacket.SerializeOptions{ + ComputeChecksums: true, + FixLengths: true, +} + +// Listen creates an IPv4 and IPv6 raw sockets, starts a reader and routing table routines +func Listen(port int, filter BPFFilter) (net.PacketConn, error) { + var err error + ctx, cancel := context.WithCancel(context.Background()) + rawSock := &SharedSocket{ + ctx: ctx, + cancel: cancel, + port: port, + packetDemux: make(chan rcvdPacket), + } + + rawSock.router, err = netroute.New() + if err != nil { + return nil, fmt.Errorf("failed to create router: %rawSock", err) + } + + rawSock.conn4, err = socket.Socket(unix.AF_INET, unix.SOCK_RAW, unix.IPPROTO_UDP, "raw_udp4", nil) + if err != nil { + return nil, fmt.Errorf("socket.Socket for ipv4 failed with: %rawSock", err) + } + + rawSock.conn6, err = socket.Socket(unix.AF_INET6, unix.SOCK_RAW, unix.IPPROTO_UDP, "raw_udp6", nil) + if err != nil { + log.Errorf("socket.Socket for ipv6 failed with: %rawSock", err) + } + + ipv4Instructions, ipv6Instructions, err := filter.GetInstructions(uint32(rawSock.port)) + if err != nil { + _ = rawSock.Close() + return nil, fmt.Errorf("getBPFInstructions failed with: %rawSock", err) + } + + err = rawSock.conn4.SetBPF(ipv4Instructions) + if err != nil { + _ = rawSock.Close() + return nil, fmt.Errorf("socket4.SetBPF failed with: %rawSock", err) + } + if rawSock.conn6 != nil { + err = rawSock.conn6.SetBPF(ipv6Instructions) + if err != nil { + _ = rawSock.Close() + return nil, fmt.Errorf("socket6.SetBPF failed with: %rawSock", err) + } + } + + go rawSock.read(rawSock.conn4.Recvfrom) + if rawSock.conn6 != nil { + go rawSock.read(rawSock.conn6.Recvfrom) + } + + go rawSock.updateRouter() + + return rawSock, nil +} + +// updateRouter updates the listener routing table client +// this is needed to avoid outdated information across different client networks +func (s *SharedSocket) updateRouter() { + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + for { + select { + case <-s.ctx.Done(): + return + case <-ticker.C: + router, err := netroute.New() + if err != nil { + log.Errorf("failed to create and update packet router for stunListener: %s", err) + continue + } + s.routerMux.Lock() + s.router = router + s.routerMux.Unlock() + } + } +} + +// LocalAddr returns an IPv4 address using the supplied port +func (s *SharedSocket) LocalAddr() net.Addr { + // todo check impact on ipv6 discovery + return &net.UDPAddr{ + IP: net.IPv4zero, + Port: s.port, + } +} + +// SetDeadline sets both the read and write deadlines associated with the ipv4 and ipv6 Conn sockets +func (s *SharedSocket) SetDeadline(t time.Time) error { + err := s.conn4.SetDeadline(t) + if err != nil { + return fmt.Errorf("s.conn4.SetDeadline error: %s", err) + } + if s.conn6 == nil { + return nil + } + + err = s.conn6.SetDeadline(t) + if err != nil { + return fmt.Errorf("s.conn6.SetDeadline error: %s", err) + } + return nil +} + +// SetReadDeadline sets the read deadline associated with the ipv4 and ipv6 Conn sockets +func (s *SharedSocket) SetReadDeadline(t time.Time) error { + err := s.conn4.SetReadDeadline(t) + if err != nil { + return fmt.Errorf("s.conn4.SetReadDeadline error: %s", err) + } + if s.conn6 == nil { + return nil + } + + err = s.conn6.SetReadDeadline(t) + if err != nil { + return fmt.Errorf("s.conn6.SetReadDeadline error: %s", err) + } + return nil +} + +// SetWriteDeadline sets the write deadline associated with the ipv4 and ipv6 Conn sockets +func (s *SharedSocket) SetWriteDeadline(t time.Time) error { + err := s.conn4.SetWriteDeadline(t) + if err != nil { + return fmt.Errorf("s.conn4.SetWriteDeadline error: %s", err) + } + if s.conn6 == nil { + return nil + } + + err = s.conn6.SetWriteDeadline(t) + if err != nil { + return fmt.Errorf("s.conn6.SetWriteDeadline error: %s", err) + } + return nil +} + +// Close closes the underlying ipv4 and ipv6 conn sockets +func (s *SharedSocket) Close() error { + s.cancel() + errGrp := errgroup.Group{} + if s.conn4 != nil { + errGrp.Go(s.conn4.Close) + } + + if s.conn6 != nil { + errGrp.Go(s.conn6.Close) + } + return errGrp.Wait() +} + +// read start a read loop for a specific receiver and sends the packet to the packetDemux channel +func (s *SharedSocket) read(receiver receiver) { + for { + buf := make([]byte, 1500) + n, addr, err := receiver(s.ctx, buf, 0) + select { + case <-s.ctx.Done(): + return + case s.packetDemux <- rcvdPacket{n, addr, buf[:n], err}: + } + } +} + +// ReadFrom reads packets received in the packetDemux channel +func (s *SharedSocket) ReadFrom(b []byte) (n int, addr net.Addr, err error) { + var pkt rcvdPacket + select { + case <-s.ctx.Done(): + return -1, nil, ErrSharedSockStopped + case pkt = <-s.packetDemux: + } + + if pkt.err != nil { + return -1, nil, pkt.err + } + var ip4layer layers.IPv4 + var udp layers.UDP + var payload gopacket.Payload + var parser *gopacket.DecodingLayerParser + var ip net.IP + + if sa, isIPv4 := pkt.addr.(*unix.SockaddrInet4); isIPv4 { + ip = sa.Addr[:] + parser = gopacket.NewDecodingLayerParser(layers.LayerTypeIPv4, &ip4layer, &udp, &payload) + } else if sa, isIPv6 := pkt.addr.(*unix.SockaddrInet6); isIPv6 { + ip = sa.Addr[:] + parser = gopacket.NewDecodingLayerParser(layers.LayerTypeUDP, &udp, &payload) + } else { + return -1, nil, fmt.Errorf("received invalid address family") + } + + decodedLayers := make([]gopacket.LayerType, 0, 3) + + err = parser.DecodeLayers(pkt.buf[:], &decodedLayers) + if err != nil { + return 0, nil, err + } + + remoteAddr := &net.UDPAddr{ + IP: ip, + Port: int(udp.SrcPort), + } + + copy(b, payload) + return int(udp.Length), remoteAddr, nil +} + +// WriteTo builds a UDP packet and writes it using the specific IP version writter +func (s *SharedSocket) WriteTo(buf []byte, rAddr net.Addr) (n int, err error) { + rUDPAddr, ok := rAddr.(*net.UDPAddr) + if !ok { + return -1, fmt.Errorf("invalid address type") + } + + buffer := gopacket.NewSerializeBuffer() + payload := gopacket.Payload(buf) + + udp := &layers.UDP{ + SrcPort: layers.UDPPort(s.port), + DstPort: layers.UDPPort(rUDPAddr.Port), + } + + s.routerMux.RLock() + defer s.routerMux.RUnlock() + + _, _, src, err := s.router.Route(rUDPAddr.IP) + if err != nil { + return 0, fmt.Errorf("got an error while checking route, err: %s", err) + } + + rSockAddr, conn, nwLayer := s.getWriterObjects(src, rUDPAddr.IP) + + if err := udp.SetNetworkLayerForChecksum(nwLayer); err != nil { + return -1, fmt.Errorf("failed to set network layer for checksum: %w", err) + } + + if err := gopacket.SerializeLayers(buffer, writeSerializerOptions, udp, payload); err != nil { + return -1, fmt.Errorf("failed serialize rcvdPacket: %s", err) + } + + bufser := buffer.Bytes() + + return 0, conn.Sendto(context.TODO(), bufser, 0, rSockAddr) +} + +// getWriterObjects returns the specific IP version objects that are used to build a packet and send it using the raw socket +func (s *SharedSocket) getWriterObjects(src, dest net.IP) (sa unix.Sockaddr, conn *socket.Conn, layer gopacket.NetworkLayer) { + if dest.To4() == nil { + sa = &unix.SockaddrInet6{} + copy(sa.(*unix.SockaddrInet6).Addr[:], dest.To16()) + conn = s.conn6 + + layer = &layers.IPv6{ + SrcIP: src, + DstIP: dest, + } + } else { + sa = &unix.SockaddrInet4{} + copy(sa.(*unix.SockaddrInet4).Addr[:], dest.To4()) + conn = s.conn4 + layer = &layers.IPv4{ + Version: 4, + TTL: 64, + Protocol: layers.IPProtocolUDP, + SrcIP: src, + DstIP: dest, + } + } + + return sa, conn, layer +} diff --git a/sharedsock/sock_linux_test.go b/sharedsock/sock_linux_test.go new file mode 100644 index 000000000..8e774cafc --- /dev/null +++ b/sharedsock/sock_linux_test.go @@ -0,0 +1,162 @@ +package sharedsock + +import ( + "context" + "errors" + "fmt" + "net" + "net/netip" + "os" + "sync" + "testing" + "time" + + "github.com/pion/stun" + log "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" +) + +func TestShouldReadSTUNOnReadFrom(t *testing.T) { + + // create raw socket on a port + testingPort := 51821 + rawSock, err := Listen(testingPort, NewSTUNFilter()) + require.NoError(t, err, "received an error while creating STUN listener, error: %s", err) + err = rawSock.SetReadDeadline(time.Now().Add(3 * time.Second)) + require.NoError(t, err, "unable to set deadline, error: %s", err) + + wg := sync.WaitGroup{} + wg.Add(1) + + // when reading from the raw socket + buf := make([]byte, 1500) + rcvMSG := &stun.Message{ + Raw: buf, + } + ctx, cancel := context.WithTimeout(context.TODO(), 10*time.Second) + defer cancel() + + go func() { + select { + case <-ctx.Done(): + return + default: + _, _, err := rawSock.ReadFrom(buf) + if err != nil { + log.Errorf("error while reading packet %s", err) + return + } + + err = rcvMSG.Decode() + if err != nil { + log.Warnf("error while parsing STUN message. The packet doesn't seem to be a STUN packet: %s", err) + return + } + wg.Done() + } + + }() + + // and sending STUN packet to the shared port, the packet has to be handled + udpListener, err := net.ListenUDP("udp", &net.UDPAddr{Port: 12345, IP: net.ParseIP("127.0.0.1")}) + require.NoError(t, err, "received an error while creating regular listener, error: %s", err) + defer udpListener.Close() + stunMSG, err := stun.Build(stun.NewType(stun.MethodBinding, stun.ClassRequest), stun.TransactionID, + stun.Fingerprint, + ) + require.NoError(t, err, "unable to build stun msg, error: %s", err) + _, err = udpListener.WriteTo(stunMSG.Raw, net.UDPAddrFromAddrPort(netip.MustParseAddrPort(fmt.Sprintf("127.0.0.1:%d", testingPort)))) + require.NoError(t, err, "received an error while writing the stun listener, error: %s", err) + + // the packet has to be handled and be a STUN packet + wg.Wait() + require.EqualValues(t, stunMSG.TransactionID, rcvMSG.TransactionID, "transaction id values did't match") +} + +func TestShouldNotReadNonSTUNPackets(t *testing.T) { + testingPort := 39439 + rawSock, err := Listen(testingPort, NewSTUNFilter()) + require.NoError(t, err, "received an error while creating STUN listener, error: %s", err) + defer rawSock.Close() + + buf := make([]byte, 1500) + err = rawSock.SetReadDeadline(time.Now().Add(time.Second)) + require.NoError(t, err, "unable to set deadline, error: %s", err) + + errGrp := errgroup.Group{} + errGrp.Go(func() error { + _, _, err := rawSock.ReadFrom(buf) + return err + }) + nonStun := []byte("netbird") + udpListener, err := net.ListenUDP("udp", &net.UDPAddr{Port: 0, IP: net.ParseIP("127.0.0.1")}) + require.NoError(t, err, "received an error while creating regular listener, error: %s", err) + defer udpListener.Close() + remote := net.UDPAddrFromAddrPort(netip.MustParseAddrPort(fmt.Sprintf("127.0.0.1:%d", testingPort))) + _, err = udpListener.WriteTo(nonStun, remote) + require.NoError(t, err, "received an error while writing the stun listener, error: %s", err) + + err = errGrp.Wait() + require.Error(t, err, "should receive an error") + if !errors.Is(err, os.ErrDeadlineExceeded) { + t.Errorf("error should be I/O timeout, got: %s", err) + } +} + +func TestWriteTo(t *testing.T) { + udpListener, err := net.ListenUDP("udp4", &net.UDPAddr{Port: 0, IP: net.ParseIP("127.0.0.1")}) + require.NoError(t, err, "received an error while creating regular listener, error: %s", err) + defer udpListener.Close() + + testingPort := 39440 + rawSock, err := Listen(testingPort, NewSTUNFilter()) + require.NoError(t, err, "received an error while creating STUN listener, error: %s", err) + defer rawSock.Close() + + buf := make([]byte, 1500) + err = udpListener.SetReadDeadline(time.Now().Add(3 * time.Second)) + require.NoError(t, err, "unable to set deadline, error: %s", err) + + errGrp := errgroup.Group{} + var remoteAdr net.Addr + var rcvBytes int + errGrp.Go(func() error { + n, a, err := udpListener.ReadFrom(buf) + remoteAdr = a + rcvBytes = n + return err + }) + + msg := []byte("netbird") + _, err = rawSock.WriteTo(msg, udpListener.LocalAddr()) + require.NoError(t, err, "received an error while writing the stun listener, error: %s", err) + + err = errGrp.Wait() + require.NoError(t, err, "received an error while reading the packet, error: %s", err) + + require.EqualValues(t, string(msg), string(buf[:rcvBytes]), "received message should match") + + udpRcv, ok := remoteAdr.(*net.UDPAddr) + require.True(t, ok, "udp address conversion didn't work") + + require.EqualValues(t, testingPort, udpRcv.Port, "received address port didn't match") +} + +func TestSharedSocket_Close(t *testing.T) { + rawSock, err := Listen(39440, NewSTUNFilter()) + require.NoError(t, err, "received an error while creating STUN listener, error: %s", err) + + errGrp := errgroup.Group{} + + errGrp.Go(func() error { + buf := make([]byte, 1500) + _, _, err := rawSock.ReadFrom(buf) + return err + }) + _ = rawSock.Close() + err = errGrp.Wait() + if err != ErrSharedSockStopped { + t.Errorf("invalid error response: %s", err) + } +} diff --git a/sharedsock/sock_nolinux.go b/sharedsock/sock_nolinux.go new file mode 100644 index 000000000..07badb550 --- /dev/null +++ b/sharedsock/sock_nolinux.go @@ -0,0 +1,14 @@ +//go:build !linux + +package sharedsock + +import ( + "fmt" + "net" + "runtime" +) + +// Listen is not supported on other platforms +func Listen(port int, filter BPFFilter) (net.PacketConn, error) { + return nil, fmt.Errorf(fmt.Sprintf("Not supported OS %s. SharedSocket is only supported on Linux", runtime.GOOS)) +}