Compare commits

...

50 Commits

Author SHA1 Message Date
Zoltán Papp
a4b8ddac52 Fix missing required parameter for Bind proxy test 2025-09-04 11:14:57 +02:00
Zoltán Papp
0ce32ec527 Fix unit test data 2025-09-04 10:24:57 +02:00
Zoltán Papp
0a03145709 Fix test on darwin 2025-09-04 10:13:07 +02:00
Zoltán Papp
af18385291 Merge branch 'debug-slow-conn' into debug-and-fixes 2025-09-03 19:07:52 +02:00
Zoltán Papp
b1f6e9b12d Merge branch 'fix/pkg-loss' into debug-and-fixes 2025-09-03 19:07:28 +02:00
Zoltán Papp
180b52ba26 Merge branch 'main' into fix/pkg-loss 2025-09-03 16:27:17 +02:00
Zoltán Papp
f86a7f745e Fix logging library 2025-09-03 14:08:53 +02:00
Zoltán Papp
fd13247d66 Explicit print out the first success handshake time 2025-09-03 14:06:28 +02:00
Zoltán Papp
1d83fccd9c Improve logging for management service connection and system info gathering 2025-09-03 13:28:04 +02:00
Zoltán Papp
7d3c972653 Do not block Offer processing from relay worker
- do not miss ICE offers when relay worker busy
- close p2p connection before recreate agent
2025-09-02 22:17:13 +02:00
Viktor Liu
71e944fa57 [relay] Let relay accept any origin (#4426) 2025-09-01 19:51:06 +02:00
Maycon Santos
d39fcfd62a [management] Add user approval (#4411)
This PR adds user approval functionality to the management system, allowing administrators to manually approve new users joining via domain matching. When enabled, users are blocked with pending approval status until explicitly approved by an admin.

Adds UserApprovalRequired setting to control manual user approval requirement
Introduces user approval and rejection endpoints with corresponding business logic
Prevents pending approval users from adding peers or logging in
2025-09-01 18:00:45 +02:00
Zoltan Papp
21368b38d9 [client] Update Pion ICE to the latest version (#4388)
- Update Pion version
- Update protobuf version
2025-09-01 10:42:01 +02:00
Maycon Santos
d817584f52 [misc] fix Windows client and management bench tests (#4424)
Windows tests had too many directories, causing issues to the payload via psexec.

Also migrated all checked benchmarks to send data to grafana.
2025-08-31 17:19:56 +02:00
Pascal Fischer
4d3dc3475d [management] remove duplicated removal of groups on peer delete (#4421) 2025-08-30 12:47:13 +02:00
Pascal Fischer
6fc50a438f [management] remove withContext from store methods (#4422) 2025-08-30 12:46:54 +02:00
Vlad
149559a06b [management] login filter to fix multiple peers connected with the same pub key (#3986) 2025-08-29 19:48:40 +02:00
Pascal Fischer
e14c6de203 [management] fix ephemeral flag on peer batch response (#4420) 2025-08-29 17:41:20 +02:00
Viktor Liu
d4c067f0af [client] Don't deactivate upstream resolvers on failure (#4128) 2025-08-29 17:40:05 +02:00
Pascal Fischer
dbefa8bd9f [management] remove lock and continue user update on failure (#4410) 2025-08-28 17:50:12 +02:00
Pascal Fischer
4fd10b9447 [management] split high latency grpc metrics (#4408) 2025-08-28 13:25:40 +02:00
Viktor Liu
aa595c3073 [client] Fix shared sock buffer allocation (#4409) 2025-08-28 13:25:16 +02:00
Zoltán Papp
990aa8f7cd Remove wg initiator logic 2025-02-24 10:46:22 +01:00
Zoltán Papp
2e6a44af1f Merge branch 'main' into fix/pkg-loss 2025-02-24 10:44:32 +01:00
Zoltán Papp
559d347588 Revert async WireGuard handshake 2025-02-21 16:19:28 +01:00
Zoltán Papp
06d71257b4 Build rawsocket code on linux only. 2025-02-21 16:00:31 +01:00
Zoltán Papp
62d10496ee Merge branch 'main' into fix/pkg-loss 2025-02-21 15:52:51 +01:00
Zoltán Papp
651e88d611 Eliminate code duplication 2025-02-21 14:59:42 +01:00
Zoltán Papp
d496d21693 Apply same pausedCond logic on all implementation 2025-02-21 14:55:56 +01:00
Zoltan Papp
648b4cdf72 Update client/iface/wgproxy/udp/proxy.go
Co-authored-by: Viktor Liu <17948409+lixmal@users.noreply.github.com>
2025-02-21 14:50:29 +01:00
Zoltán Papp
f0020ad4ce Fallback to package loss solution if the raw socket does not work. 2025-02-18 13:58:20 +01:00
Zoltán Papp
1eacff250e Remove WireGuard kernel code from FreeBSD 2025-02-18 13:47:42 +01:00
Zoltán Papp
1f83ba4563 Ignore err in tests 2025-02-17 22:06:04 +01:00
Zoltán Papp
3d80a25b4d Fix possible blocker if the bind will be closed earlier then proxy 2025-02-17 22:03:15 +01:00
Zoltán Papp
1963644c99 Add close test for all implementation 2025-02-17 21:47:34 +01:00
Zoltán Papp
360c7134f7 Add unit test 2025-02-17 21:26:37 +01:00
Zoltán Papp
775b4feb7e Fix close operation 2025-02-17 20:17:47 +01:00
Zoltán Papp
aca443bdec Build UDP proxy on Linux only 2025-02-17 19:26:40 +01:00
Zoltán Papp
335866ac60 Close unused rawsocket 2025-02-17 15:56:48 +01:00
Zoltán Papp
082452eb5f Add info log line 2025-02-17 15:50:49 +01:00
Zoltan Papp
b17c1d96a5 [client] Improve WireGuard handshake success rate (#3092)
The controller peer sends WireGuard handshake requests only
2025-02-17 15:50:15 +01:00
Zoltán Papp
2d5b5f59c2 Remove log line 2025-02-17 15:49:04 +01:00
Zoltán Papp
d5042f688f Fix interface changes 2025-02-17 15:47:20 +01:00
Zoltán Papp
4db73a13d7 Implement redirect logic in UDP proxy 2025-02-17 15:47:20 +01:00
Zoltán Papp
06a17f0eee Implement redirect to in eBPF proxy 2025-02-17 15:47:20 +01:00
Zoltán Papp
1f088b7e69 Extend the proxy interface with RedirectTo function and implement it in Bind proxy 2025-02-17 15:47:20 +01:00
Zoltan Papp
ffe74365a8 Code cleaning 2025-02-17 15:47:20 +01:00
Zoltán Papp
6a0f6efc18 Always stop timer 2025-02-17 15:47:20 +01:00
Zoltán Papp
bfa6df13c5 The non handshake initiator peer start the handshake after timeout 2025-02-17 15:47:20 +01:00
Zoltán Papp
e9b3b6210d Improve WireGuard handshake success rate
The controller peer sends WireGuard
handshake requests only
2025-02-17 15:47:20 +01:00
91 changed files with 4325 additions and 888 deletions

View File

@@ -382,6 +382,32 @@ jobs:
store: [ 'sqlite', 'postgres' ]
runs-on: ubuntu-22.04
steps:
- name: Create Docker network
run: docker network create promnet
- name: Start Prometheus Pushgateway
run: docker run -d --name pushgateway --network promnet -p 9091:9091 prom/pushgateway
- name: Start Prometheus (for Pushgateway forwarding)
run: |
echo '
global:
scrape_interval: 15s
scrape_configs:
- job_name: "pushgateway"
static_configs:
- targets: ["pushgateway:9091"]
remote_write:
- url: ${{ secrets.GRAFANA_URL }}
basic_auth:
username: ${{ secrets.GRAFANA_USER }}
password: ${{ secrets.GRAFANA_API_KEY }}
' > prometheus.yml
docker run -d --name prometheus --network promnet \
-v $PWD/prometheus.yml:/etc/prometheus/prometheus.yml \
-p 9090:9090 \
prom/prometheus
- name: Install Go
uses: actions/setup-go@v5
with:
@@ -428,9 +454,10 @@ jobs:
CGO_ENABLED=1 GOARCH=${{ matrix.arch }} \
NETBIRD_STORE_ENGINE=${{ matrix.store }} \
CI=true \
GIT_BRANCH=${{ github.ref_name }} \
go test -tags devcert -run=^$ -bench=. \
-exec 'sudo --preserve-env=CI,NETBIRD_STORE_ENGINE' \
-timeout 20m ./management/... ./shared/management/...
-exec 'sudo --preserve-env=CI,NETBIRD_STORE_ENGINE,GIT_BRANCH,GITHUB_RUN_ID' \
-timeout 20m ./management/... ./shared/management/... $(go list ./management/... ./shared/management/... | grep -v -e /management/server/http)
api_benchmark:
name: "Management / Benchmark (API)"
@@ -521,7 +548,7 @@ jobs:
-run=^$ \
-bench=. \
-exec 'sudo --preserve-env=CI,NETBIRD_STORE_ENGINE,GIT_BRANCH,GITHUB_RUN_ID' \
-timeout 20m ./management/... ./shared/management/...
-timeout 20m ./management/server/http/...
api_integration_test:
name: "Management / Integration"
@@ -571,4 +598,4 @@ jobs:
CI=true \
go test -tags=integration \
-exec 'sudo --preserve-env=CI,NETBIRD_STORE_ENGINE' \
-timeout 20m ./management/... ./shared/management/...
-timeout 20m ./management/server/http/...

View File

@@ -63,7 +63,7 @@ jobs:
- run: PsExec64 -s -w ${{ github.workspace }} C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe env -w GOMODCACHE=${{ env.cache }}
- run: PsExec64 -s -w ${{ github.workspace }} C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe env -w GOCACHE=${{ env.modcache }}
- run: PsExec64 -s -w ${{ github.workspace }} C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe mod tidy
- run: echo "files=$(go list ./... | ForEach-Object { $_ } | Where-Object { $_ -notmatch '/management' })" >> $env:GITHUB_ENV
- run: echo "files=$(go list ./... | ForEach-Object { $_ } | Where-Object { $_ -notmatch '/management' } | Where-Object { $_ -notmatch '/relay' } | Where-Object { $_ -notmatch '/signal' })" >> $env:GITHUB_ENV
- name: test
run: PsExec64 -s -w ${{ github.workspace }} cmd.exe /c "C:\hostedtoolcache\windows\go\${{ steps.go.outputs.go-version }}\x64\bin\go.exe test -tags=devcert -timeout 10m -p 1 ${{ env.files }} > test-out.txt 2>&1"

View File

@@ -1,5 +1,17 @@
package bind
import wgConn "golang.zx2c4.com/wireguard/conn"
import (
"net"
wgConn "golang.zx2c4.com/wireguard/conn"
)
type Endpoint = wgConn.StdNetEndpoint
func EndpointToUDPAddr(e Endpoint) *net.UDPAddr {
return &net.UDPAddr{
IP: e.Addr().AsSlice(),
Port: int(e.Port()),
Zone: e.Addr().Zone(),
}
}

View File

@@ -1,6 +1,7 @@
package bind
import (
"context"
"encoding/binary"
"fmt"
"net"
@@ -8,7 +9,7 @@ import (
"runtime"
"sync"
"github.com/pion/stun/v2"
"github.com/pion/stun/v3"
"github.com/pion/transport/v3"
log "github.com/sirupsen/logrus"
"golang.org/x/net/ipv4"
@@ -41,7 +42,7 @@ func (rc receiverCreator) CreateIPv4ReceiverFn(pc *ipv4.PacketConn, conn *net.UD
// use the port because in the Send function the wgConn.Endpoint the port info is not exported.
type ICEBind struct {
*wgConn.StdNetBind
RecvChan chan RecvMessage
recvChan chan RecvMessage
transportNet transport.Net
filterFn FilterFn
@@ -64,7 +65,7 @@ func NewICEBind(transportNet transport.Net, filterFn FilterFn, address wgaddr.Ad
b, _ := wgConn.NewStdNetBind().(*wgConn.StdNetBind)
ib := &ICEBind{
StdNetBind: b,
RecvChan: make(chan RecvMessage, 1),
recvChan: make(chan RecvMessage, 1),
transportNet: transportNet,
filterFn: filterFn,
endpoints: make(map[netip.Addr]net.Conn),
@@ -154,6 +155,14 @@ func (b *ICEBind) Send(bufs [][]byte, ep wgConn.Endpoint) error {
return nil
}
func (b *ICEBind) Recv(ctx context.Context, msg RecvMessage) {
select {
case <-ctx.Done():
return
case b.recvChan <- msg:
}
}
func (s *ICEBind) createIPv4ReceiverFn(pc *ipv4.PacketConn, conn *net.UDPConn, rxOffload bool, msgsPool *sync.Pool) wgConn.ReceiveFunc {
s.muUDPMux.Lock()
defer s.muUDPMux.Unlock()
@@ -270,7 +279,7 @@ func (c *ICEBind) receiveRelayed(buffs [][]byte, sizes []int, eps []wgConn.Endpo
select {
case <-c.closedChan:
return 0, net.ErrClosed
case msg, ok := <-c.RecvChan:
case msg, ok := <-c.recvChan:
if !ok {
return 0, net.ErrClosed
}

View File

@@ -8,9 +8,9 @@ import (
"strings"
"sync"
"github.com/pion/ice/v3"
"github.com/pion/ice/v4"
"github.com/pion/logging"
"github.com/pion/stun/v2"
"github.com/pion/stun/v3"
"github.com/pion/transport/v3"
"github.com/pion/transport/v3/stdnet"
log "github.com/sirupsen/logrus"

View File

@@ -15,7 +15,7 @@ import (
log "github.com/sirupsen/logrus"
"github.com/pion/logging"
"github.com/pion/stun/v2"
"github.com/pion/stun/v3"
"github.com/pion/transport/v3"
"github.com/netbirdio/netbird/client/iface/bufsize"

View File

@@ -394,6 +394,13 @@ func toLastHandshake(stringVar string) (time.Time, error) {
if err != nil {
return time.Time{}, fmt.Errorf("parse handshake sec: %w", err)
}
// If sec is 0 (Unix epoch), return zero time instead
// This indicates no handshake has occurred
if sec == 0 {
return time.Time{}, nil
}
return time.Unix(sec, 0), nil
}

View File

@@ -0,0 +1,40 @@
//go:build freebsd
package iface
import (
"fmt"
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/device"
"github.com/netbirdio/netbird/client/iface/netstack"
"github.com/netbirdio/netbird/client/iface/wgproxy"
)
// NewWGIFace Creates a new WireGuard interface instance
func NewWGIFace(opts WGIFaceOpts) (*WGIface, error) {
wgAddress, err := device.ParseWGAddress(opts.Address)
if err != nil {
return nil, err
}
wgIFace := &WGIface{}
if netstack.IsEnabled() {
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn)
wgIFace.tun = device.NewNetstackDevice(opts.IFaceName, wgAddress, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind, netstack.ListenAddr())
wgIFace.userspaceBind = true
wgIFace.wgProxyFactory = wgproxy.NewUSPFactory(iceBind)
return wgIFace, nil
}
if device.ModuleTunIsLoaded() {
iceBind := bind.NewICEBind(opts.TransportNet, opts.FilterFn)
wgIFace.tun = device.NewUSPDevice(opts.IFaceName, wgAddress, opts.WGPort, opts.WGPrivKey, opts.MTU, iceBind)
wgIFace.userspaceBind = true
wgIFace.wgProxyFactory = wgproxy.NewUSPFactory(iceBind)
return wgIFace, nil
}
return nil, fmt.Errorf("couldn't check or load tun module")
}

View File

@@ -1,4 +1,4 @@
//go:build (linux && !android) || freebsd
//go:build linux && !android
package iface

View File

@@ -16,28 +16,37 @@ import (
"github.com/netbirdio/netbird/client/iface/wgproxy/listener"
)
type IceBind interface {
SetEndpoint(fakeIP netip.Addr, conn net.Conn)
RemoveEndpoint(fakeIP netip.Addr)
Recv(ctx context.Context, msg bind.RecvMessage)
MTU() uint16
}
type ProxyBind struct {
Bind *bind.ICEBind
bind IceBind
fakeNetIP *netip.AddrPort
wgBindEndpoint *bind.Endpoint
remoteConn net.Conn
ctx context.Context
cancel context.CancelFunc
closeMu sync.Mutex
closed bool
// wgRelayedEndpoint is a fake address that generated by the Bind.SetEndpoint based on the remote NetBird peer address
wgRelayedEndpoint *bind.Endpoint
wgCurrentUsed *bind.Endpoint
remoteConn net.Conn
ctx context.Context
cancel context.CancelFunc
closeMu sync.Mutex
closed bool
pausedMu sync.Mutex
paused bool
isStarted bool
paused bool
pausedCond *sync.Cond
isStarted bool
closeListener *listener.CloseListener
}
func NewProxyBind(bind *bind.ICEBind) *ProxyBind {
func NewProxyBind(bind IceBind) *ProxyBind {
p := &ProxyBind{
Bind: bind,
bind: bind,
closeListener: listener.NewCloseListener(),
pausedCond: sync.NewCond(&sync.Mutex{}),
}
return p
@@ -46,25 +55,25 @@ func NewProxyBind(bind *bind.ICEBind) *ProxyBind {
// AddTurnConn adds a new connection to the bind.
// endpoint is the NetBird address of the remote peer. The SetEndpoint return with the address what will be used in the
// WireGuard configuration.
//
// Parameters:
// - ctx: Context is used for proxyToLocal to avoid unnecessary error messages
// - nbAddr: The NetBird UDP address of the remote peer, it required to generate fake address
// - remoteConn: The established TURN connection to the remote peer
func (p *ProxyBind) AddTurnConn(ctx context.Context, nbAddr *net.UDPAddr, remoteConn net.Conn) error {
fakeNetIP, err := fakeAddress(nbAddr)
if err != nil {
return err
}
p.fakeNetIP = fakeNetIP
p.wgBindEndpoint = &bind.Endpoint{AddrPort: *fakeNetIP}
p.wgRelayedEndpoint = &bind.Endpoint{AddrPort: *fakeNetIP}
p.remoteConn = remoteConn
p.ctx, p.cancel = context.WithCancel(ctx)
return nil
}
func (p *ProxyBind) EndpointAddr() *net.UDPAddr {
return &net.UDPAddr{
IP: p.fakeNetIP.Addr().AsSlice(),
Port: int(p.fakeNetIP.Port()),
Zone: p.fakeNetIP.Addr().Zone(),
}
return bind.EndpointToUDPAddr(*p.wgRelayedEndpoint)
}
func (p *ProxyBind) SetDisconnectListener(disconnected func()) {
@@ -76,17 +85,22 @@ func (p *ProxyBind) Work() {
return
}
p.Bind.SetEndpoint(p.fakeNetIP.Addr(), p.remoteConn)
p.bind.SetEndpoint(p.wgRelayedEndpoint.Addr(), p.remoteConn)
p.pausedMu.Lock()
p.pausedCond.L.Lock()
p.paused = false
p.pausedMu.Unlock()
p.wgCurrentUsed = p.wgRelayedEndpoint
// Start the proxy only once
if !p.isStarted {
p.isStarted = true
go p.proxyToLocal(p.ctx)
}
p.pausedCond.L.Unlock()
// todo: review to should be inside the lock scope
p.pausedCond.Signal()
}
func (p *ProxyBind) Pause() {
@@ -94,9 +108,25 @@ func (p *ProxyBind) Pause() {
return
}
p.pausedMu.Lock()
p.pausedCond.L.Lock()
p.paused = true
p.pausedMu.Unlock()
p.pausedCond.L.Unlock()
}
func (p *ProxyBind) RedirectAs(endpoint *net.UDPAddr) {
p.pausedCond.L.Lock()
p.paused = false
p.wgCurrentUsed = addrToEndpoint(endpoint)
p.pausedCond.L.Unlock()
p.pausedCond.Signal()
}
func addrToEndpoint(addr *net.UDPAddr) *bind.Endpoint {
ip, _ := netip.AddrFromSlice(addr.IP.To4())
addrPort := netip.AddrPortFrom(ip, uint16(addr.Port))
return &bind.Endpoint{AddrPort: addrPort}
}
func (p *ProxyBind) CloseConn() error {
@@ -107,6 +137,10 @@ func (p *ProxyBind) CloseConn() error {
}
func (p *ProxyBind) close() error {
if p.remoteConn == nil {
return nil
}
p.closeMu.Lock()
defer p.closeMu.Unlock()
@@ -120,7 +154,12 @@ func (p *ProxyBind) close() error {
p.cancel()
p.Bind.RemoveEndpoint(p.fakeNetIP.Addr())
p.pausedCond.L.Lock()
p.paused = false
p.pausedCond.L.Unlock()
p.pausedCond.Signal()
p.bind.RemoveEndpoint(p.wgRelayedEndpoint.Addr())
if rErr := p.remoteConn.Close(); rErr != nil && !errors.Is(rErr, net.ErrClosed) {
return rErr
@@ -136,7 +175,7 @@ func (p *ProxyBind) proxyToLocal(ctx context.Context) {
}()
for {
buf := make([]byte, p.Bind.MTU()+bufsize.WGBufferOverhead)
buf := make([]byte, p.bind.MTU()+bufsize.WGBufferOverhead)
n, err := p.remoteConn.Read(buf)
if err != nil {
if ctx.Err() != nil {
@@ -147,18 +186,17 @@ func (p *ProxyBind) proxyToLocal(ctx context.Context) {
return
}
p.pausedMu.Lock()
if p.paused {
p.pausedMu.Unlock()
continue
p.pausedCond.L.Lock()
for p.paused {
p.pausedCond.Wait()
}
msg := bind.RecvMessage{
Endpoint: p.wgBindEndpoint,
Endpoint: p.wgCurrentUsed,
Buffer: buf[:n],
}
p.Bind.RecvChan <- msg
p.pausedMu.Unlock()
p.bind.Recv(ctx, msg)
p.pausedCond.L.Unlock()
}
}

View File

@@ -6,9 +6,7 @@ import (
"context"
"fmt"
"net"
"os"
"sync"
"syscall"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
@@ -18,6 +16,7 @@ import (
nberrors "github.com/netbirdio/netbird/client/errors"
"github.com/netbirdio/netbird/client/iface/bufsize"
"github.com/netbirdio/netbird/client/iface/wgproxy/rawsocket"
"github.com/netbirdio/netbird/client/internal/ebpf"
ebpfMgr "github.com/netbirdio/netbird/client/internal/ebpf/manager"
nbnet "github.com/netbirdio/netbird/util/net"
@@ -27,6 +26,10 @@ const (
loopbackAddr = "127.0.0.1"
)
var (
localHostNetIP = net.ParseIP("127.0.0.1")
)
// WGEBPFProxy definition for proxy with EBPF support
type WGEBPFProxy struct {
localWGListenPort int
@@ -64,7 +67,7 @@ func (p *WGEBPFProxy) Listen() error {
return err
}
p.rawConn, err = p.prepareSenderRawSocket()
p.rawConn, err = rawsocket.PrepareSenderRawSocket()
if err != nil {
return err
}
@@ -214,57 +217,17 @@ generatePort:
return p.lastUsedPort, nil
}
func (p *WGEBPFProxy) prepareSenderRawSocket() (net.PacketConn, error) {
// Create a raw socket.
fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_RAW)
if err != nil {
return nil, fmt.Errorf("creating raw socket failed: %w", err)
}
// Set the IP_HDRINCL option on the socket to tell the kernel that headers are included in the packet.
err = syscall.SetsockoptInt(fd, syscall.IPPROTO_IP, syscall.IP_HDRINCL, 1)
if err != nil {
return nil, fmt.Errorf("setting IP_HDRINCL failed: %w", err)
}
// Bind the socket to the "lo" interface.
err = syscall.SetsockoptString(fd, syscall.SOL_SOCKET, syscall.SO_BINDTODEVICE, "lo")
if err != nil {
return nil, fmt.Errorf("binding to lo interface failed: %w", err)
}
// Set the fwmark on the socket.
err = nbnet.SetSocketOpt(fd)
if err != nil {
return nil, fmt.Errorf("setting fwmark failed: %w", err)
}
// Convert the file descriptor to a PacketConn.
file := os.NewFile(uintptr(fd), fmt.Sprintf("fd %d", fd))
if file == nil {
return nil, fmt.Errorf("converting fd to file failed")
}
packetConn, err := net.FilePacketConn(file)
if err != nil {
return nil, fmt.Errorf("converting file to packet conn failed: %w", err)
}
return packetConn, nil
}
func (p *WGEBPFProxy) sendPkg(data []byte, port int) error {
localhost := net.ParseIP("127.0.0.1")
func (p *WGEBPFProxy) sendPkg(data []byte, endpointAddr *net.UDPAddr) error {
payload := gopacket.Payload(data)
ipH := &layers.IPv4{
DstIP: localhost,
SrcIP: localhost,
DstIP: localHostNetIP,
SrcIP: endpointAddr.IP,
Version: 4,
TTL: 64,
Protocol: layers.IPProtocolUDP,
}
udpH := &layers.UDP{
SrcPort: layers.UDPPort(port),
SrcPort: layers.UDPPort(endpointAddr.Port),
DstPort: layers.UDPPort(p.localWGListenPort),
}
@@ -279,7 +242,7 @@ func (p *WGEBPFProxy) sendPkg(data []byte, port int) error {
if err != nil {
return fmt.Errorf("serialize layers: %w", err)
}
if _, err = p.rawConn.WriteTo(layerBuffer.Bytes(), &net.IPAddr{IP: localhost}); err != nil {
if _, err = p.rawConn.WriteTo(layerBuffer.Bytes(), &net.IPAddr{IP: localHostNetIP}); err != nil {
return fmt.Errorf("write to raw conn: %w", err)
}
return nil

View File

@@ -18,41 +18,42 @@ import (
// ProxyWrapper help to keep the remoteConn instance for net.Conn.Close function call
type ProxyWrapper struct {
WgeBPFProxy *WGEBPFProxy
wgeBPFProxy *WGEBPFProxy
remoteConn net.Conn
ctx context.Context
cancel context.CancelFunc
wgEndpointAddr *net.UDPAddr
wgRelayedEndpointAddr *net.UDPAddr
wgEndpointCurrentUsedAddr *net.UDPAddr
pausedMu sync.Mutex
paused bool
isStarted bool
paused bool
pausedCond *sync.Cond
isStarted bool
closeListener *listener.CloseListener
}
func NewProxyWrapper(WgeBPFProxy *WGEBPFProxy) *ProxyWrapper {
func NewProxyWrapper(proxy *WGEBPFProxy) *ProxyWrapper {
return &ProxyWrapper{
WgeBPFProxy: WgeBPFProxy,
wgeBPFProxy: proxy,
pausedCond: sync.NewCond(&sync.Mutex{}),
closeListener: listener.NewCloseListener(),
}
}
func (p *ProxyWrapper) AddTurnConn(ctx context.Context, endpoint *net.UDPAddr, remoteConn net.Conn) error {
addr, err := p.WgeBPFProxy.AddTurnConn(remoteConn)
addr, err := p.wgeBPFProxy.AddTurnConn(remoteConn)
if err != nil {
return fmt.Errorf("add turn conn: %w", err)
}
p.remoteConn = remoteConn
p.ctx, p.cancel = context.WithCancel(ctx)
p.wgEndpointAddr = addr
p.wgRelayedEndpointAddr = addr
return err
}
func (p *ProxyWrapper) EndpointAddr() *net.UDPAddr {
return p.wgEndpointAddr
return p.wgRelayedEndpointAddr
}
func (p *ProxyWrapper) SetDisconnectListener(disconnected func()) {
@@ -64,14 +65,19 @@ func (p *ProxyWrapper) Work() {
return
}
p.pausedMu.Lock()
p.pausedCond.L.Lock()
p.paused = false
p.pausedMu.Unlock()
p.wgEndpointCurrentUsedAddr = p.wgRelayedEndpointAddr
if !p.isStarted {
p.isStarted = true
go p.proxyToLocal(p.ctx)
}
p.pausedCond.L.Unlock()
// todo: review to should be inside the lock scope
p.pausedCond.Signal()
}
func (p *ProxyWrapper) Pause() {
@@ -80,45 +86,59 @@ func (p *ProxyWrapper) Pause() {
}
log.Tracef("pause proxy reading from: %s", p.remoteConn.RemoteAddr())
p.pausedMu.Lock()
p.pausedCond.L.Lock()
p.paused = true
p.pausedMu.Unlock()
p.pausedCond.L.Unlock()
}
func (p *ProxyWrapper) RedirectAs(endpoint *net.UDPAddr) {
p.pausedCond.L.Lock()
p.paused = false
p.wgEndpointCurrentUsedAddr = endpoint
p.pausedCond.L.Unlock()
p.pausedCond.Signal()
}
// CloseConn close the remoteConn and automatically remove the conn instance from the map
func (e *ProxyWrapper) CloseConn() error {
if e.cancel == nil {
func (p *ProxyWrapper) CloseConn() error {
if p.cancel == nil {
return fmt.Errorf("proxy not started")
}
e.cancel()
p.cancel()
e.closeListener.SetCloseListener(nil)
p.closeListener.SetCloseListener(nil)
if err := e.remoteConn.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
return fmt.Errorf("close remote conn: %w", err)
p.pausedCond.L.Lock()
p.paused = false
p.pausedCond.L.Unlock()
p.pausedCond.Signal()
if err := p.remoteConn.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
return fmt.Errorf("failed to close remote conn: %w", err)
}
return nil
}
func (p *ProxyWrapper) proxyToLocal(ctx context.Context) {
defer p.WgeBPFProxy.removeTurnConn(uint16(p.wgEndpointAddr.Port))
defer p.wgeBPFProxy.removeTurnConn(uint16(p.wgRelayedEndpointAddr.Port))
buf := make([]byte, p.WgeBPFProxy.mtu+bufsize.WGBufferOverhead)
buf := make([]byte, p.wgeBPFProxy.mtu+bufsize.WGBufferOverhead)
for {
n, err := p.readFromRemote(ctx, buf)
if err != nil {
return
}
p.pausedMu.Lock()
if p.paused {
p.pausedMu.Unlock()
continue
p.pausedCond.L.Lock()
for p.paused {
p.pausedCond.Wait()
}
err = p.WgeBPFProxy.sendPkg(buf[:n], p.wgEndpointAddr.Port)
p.pausedMu.Unlock()
err = p.wgeBPFProxy.sendPkg(buf[:n], p.wgEndpointCurrentUsedAddr)
p.pausedCond.L.Unlock()
if err != nil {
if ctx.Err() != nil {
@@ -137,7 +157,7 @@ func (p *ProxyWrapper) readFromRemote(ctx context.Context, buf []byte) (int, err
}
p.closeListener.Notify()
if !errors.Is(err, io.EOF) {
log.Errorf("failed to read from turn conn (endpoint: :%d): %s", p.wgEndpointAddr.Port, err)
log.Errorf("failed to read from turn conn (endpoint: :%d): %s", p.wgRelayedEndpointAddr.Port, err)
}
return 0, err
}

View File

@@ -39,7 +39,6 @@ func (w *KernelFactory) GetProxy() Proxy {
}
return ebpf.NewProxyWrapper(w.ebpfProxy)
}
func (w *KernelFactory) Free() error {

View File

@@ -1,31 +0,0 @@
package wgproxy
import (
log "github.com/sirupsen/logrus"
udpProxy "github.com/netbirdio/netbird/client/iface/wgproxy/udp"
)
// KernelFactory todo: check eBPF support on FreeBSD
type KernelFactory struct {
wgPort int
mtu uint16
}
func NewKernelFactory(wgPort int, mtu uint16) *KernelFactory {
log.Infof("WireGuard Proxy Factory will produce UDP proxy")
f := &KernelFactory{
wgPort: wgPort,
mtu: mtu,
}
return f
}
func (w *KernelFactory) GetProxy() Proxy {
return udpProxy.NewWGUDPProxy(w.wgPort, w.mtu)
}
func (w *KernelFactory) Free() error {
return nil
}

View File

@@ -11,6 +11,12 @@ type Proxy interface {
EndpointAddr() *net.UDPAddr // EndpointAddr returns the address of the WireGuard peer endpoint
Work() // Work start or resume the proxy
Pause() // Pause to forward the packages from remote connection to WireGuard. The opposite way still works.
/*
RedirectAs resume the forwarding the packages from relayed connection to WireGuard interface if it was paused
and rewrite the src address to the endpoint address.
With this logic can avoid the package loss from relayed connections.
*/
RedirectAs(endpoint *net.UDPAddr)
CloseConn() error
SetDisconnectListener(disconnected func())
}

View File

@@ -3,54 +3,82 @@
package wgproxy
import (
"context"
"os"
"testing"
"fmt"
"net"
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/wgaddr"
bindproxy "github.com/netbirdio/netbird/client/iface/wgproxy/bind"
"github.com/netbirdio/netbird/client/iface/wgproxy/ebpf"
"github.com/netbirdio/netbird/client/iface/wgproxy/udp"
)
func TestProxyCloseByRemoteConnEBPF(t *testing.T) {
if os.Getenv("GITHUB_ACTIONS") != "true" {
t.Skip("Skipping test as it requires root privileges")
}
ctx := context.Background()
func seedProxies() ([]proxyInstance, error) {
pl := make([]proxyInstance, 0)
ebpfProxy := ebpf.NewWGEBPFProxy(51831, 1280)
if err := ebpfProxy.Listen(); err != nil {
t.Fatalf("failed to initialize ebpf proxy: %s", err)
return nil, fmt.Errorf("failed to initialize ebpf proxy: %s", err)
}
defer func() {
if err := ebpfProxy.Free(); err != nil {
t.Errorf("failed to free ebpf proxy: %s", err)
}
}()
tests := []struct {
name string
proxy Proxy
}{
{
name: "ebpf proxy",
proxy: &ebpf.ProxyWrapper{
WgeBPFProxy: ebpfProxy,
},
},
pEbpf := proxyInstance{
name: "ebpf kernel proxy",
proxy: ebpf.NewProxyWrapper(ebpfProxy),
wgPort: 51831,
closeFn: ebpfProxy.Free,
}
pl = append(pl, pEbpf)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
relayedConn := newMockConn()
err := tt.proxy.AddTurnConn(ctx, nil, relayedConn)
if err != nil {
t.Errorf("error: %v", err)
}
_ = relayedConn.Close()
if err := tt.proxy.CloseConn(); err != nil {
t.Errorf("error: %v", err)
}
})
pUDP := proxyInstance{
name: "udp kernel proxy",
proxy: udp.NewWGUDPProxy(51832, 1280),
wgPort: 51832,
closeFn: func() error { return nil },
}
pl = append(pl, pUDP)
return pl, nil
}
func seedProxyForProxyCloseByRemoteConn() ([]proxyInstance, error) {
pl := make([]proxyInstance, 0)
ebpfProxy := ebpf.NewWGEBPFProxy(51831, 1280)
if err := ebpfProxy.Listen(); err != nil {
return nil, fmt.Errorf("failed to initialize ebpf proxy: %s", err)
}
pEbpf := proxyInstance{
name: "ebpf kernel proxy",
proxy: ebpf.NewProxyWrapper(ebpfProxy),
wgPort: 51831,
closeFn: ebpfProxy.Free,
}
pl = append(pl, pEbpf)
pUDP := proxyInstance{
name: "udp kernel proxy",
proxy: udp.NewWGUDPProxy(51832, 1280),
wgPort: 51832,
closeFn: func() error { return nil },
}
pl = append(pl, pUDP)
wgAddress, err := wgaddr.ParseWGAddress("10.0.0.1/32")
if err != nil {
return nil, err
}
iceBind := bind.NewICEBind(nil, nil, wgAddress, 1280)
endpointAddress := &net.UDPAddr{
IP: net.IPv4(10, 0, 0, 1),
Port: 1234,
}
pBind := proxyInstance{
name: "bind proxy",
proxy: bindproxy.NewProxyBind(iceBind),
endpointAddr: endpointAddress,
closeFn: func() error { return nil },
}
pl = append(pl, pBind)
return pl, nil
}

View File

@@ -0,0 +1,39 @@
//go:build !linux
package wgproxy
import (
"net"
"github.com/netbirdio/netbird/client/iface/bind"
"github.com/netbirdio/netbird/client/iface/wgaddr"
bindproxy "github.com/netbirdio/netbird/client/iface/wgproxy/bind"
)
func seedProxies() ([]proxyInstance, error) {
// todo extend with Bind proxy
pl := make([]proxyInstance, 0)
return pl, nil
}
func seedProxyForProxyCloseByRemoteConn() ([]proxyInstance, error) {
pl := make([]proxyInstance, 0)
wgAddress, err := wgaddr.ParseWGAddress("10.0.0.1/32")
if err != nil {
return nil, err
}
iceBind := bind.NewICEBind(nil, nil, wgAddress, 1280)
endpointAddress := &net.UDPAddr{
IP: net.IPv4(10, 0, 0, 1),
Port: 1234,
}
pBind := proxyInstance{
name: "bind proxy",
proxy: bindproxy.NewProxyBind(iceBind),
endpointAddr: endpointAddress,
closeFn: func() error { return nil },
}
pl = append(pl, pBind)
return pl, nil
}

View File

@@ -1,5 +1,3 @@
//go:build linux
package wgproxy
import (
@@ -7,12 +5,9 @@ import (
"io"
"net"
"os"
"runtime"
"testing"
"time"
"github.com/netbirdio/netbird/client/iface/wgproxy/ebpf"
udpProxy "github.com/netbirdio/netbird/client/iface/wgproxy/udp"
"github.com/netbirdio/netbird/util"
)
@@ -22,6 +17,14 @@ func TestMain(m *testing.M) {
os.Exit(code)
}
type proxyInstance struct {
name string
proxy Proxy
wgPort int
endpointAddr *net.UDPAddr
closeFn func() error
}
type mocConn struct {
closeChan chan struct{}
closed bool
@@ -78,41 +81,21 @@ func (m *mocConn) SetWriteDeadline(t time.Time) error {
func TestProxyCloseByRemoteConn(t *testing.T) {
ctx := context.Background()
tests := []struct {
name string
proxy Proxy
}{
{
name: "userspace proxy",
proxy: udpProxy.NewWGUDPProxy(51830, 1280),
},
tests, err := seedProxyForProxyCloseByRemoteConn()
if err != nil {
t.Fatalf("error: %v", err)
}
if runtime.GOOS == "linux" && os.Getenv("GITHUB_ACTIONS") != "true" {
ebpfProxy := ebpf.NewWGEBPFProxy(51831, 1280)
if err := ebpfProxy.Listen(); err != nil {
t.Fatalf("failed to initialize ebpf proxy: %s", err)
}
defer func() {
if err := ebpfProxy.Free(); err != nil {
t.Errorf("failed to free ebpf proxy: %s", err)
}
}()
proxyWrapper := ebpf.NewProxyWrapper(ebpfProxy)
tests = append(tests, struct {
name string
proxy Proxy
}{
name: "ebpf proxy",
proxy: proxyWrapper,
})
}
relayedConn, _ := net.Dial("udp", "127.0.0.1:1234")
defer func() {
_ = relayedConn.Close()
}()
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
addr, _ := net.ResolveUDPAddr("udp", "100.108.135.221:51892")
relayedConn := newMockConn()
err := tt.proxy.AddTurnConn(ctx, nil, relayedConn)
err := tt.proxy.AddTurnConn(ctx, addr, relayedConn)
if err != nil {
t.Errorf("error: %v", err)
}
@@ -124,3 +107,104 @@ func TestProxyCloseByRemoteConn(t *testing.T) {
})
}
}
// TestProxyRedirect todo extend the proxies with Bind proxy
func TestProxyRedirect(t *testing.T) {
tests, err := seedProxies()
if err != nil {
t.Fatalf("error: %v", err)
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
redirectTraffic(t, tt.proxy, tt.wgPort, tt.endpointAddr)
if err := tt.closeFn(); err != nil {
t.Errorf("error: %v", err)
}
})
}
}
func redirectTraffic(t *testing.T, proxy Proxy, wgPort int, endPointAddr *net.UDPAddr) {
t.Helper()
msgHelloFromRelay := []byte("hello from relay")
msgRedirected := [][]byte{
[]byte("hello 1. to p2p"),
[]byte("hello 2. to p2p"),
[]byte("hello 3. to p2p"),
}
dummyWgListener, err := net.ListenUDP("udp", &net.UDPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: wgPort})
if err != nil {
t.Fatalf("failed to listen on udp port: %s", err)
}
relayedServer, _ := net.ListenUDP("udp",
&net.UDPAddr{
IP: net.IPv4(127, 0, 0, 1),
Port: 1234,
},
)
relayedConn, _ := net.Dial("udp", "127.0.0.1:1234")
defer func() {
_ = dummyWgListener.Close()
_ = relayedConn.Close()
_ = relayedServer.Close()
}()
if err := proxy.AddTurnConn(context.Background(), endPointAddr, relayedConn); err != nil {
t.Errorf("error: %v", err)
}
defer func() {
if err := proxy.CloseConn(); err != nil {
t.Errorf("error: %v", err)
}
}()
proxy.Work()
if _, err := relayedServer.WriteTo(msgHelloFromRelay, relayedConn.LocalAddr()); err != nil {
t.Errorf("error relayedServer.Write(msgHelloFromRelay): %v", err)
}
n, err := dummyWgListener.Read(make([]byte, 1024))
if err != nil {
t.Errorf("error: %v", err)
}
if n != len(msgHelloFromRelay) {
t.Errorf("expected %d bytes, got %d", len(msgHelloFromRelay), n)
}
p2pEndpointAddr := &net.UDPAddr{
IP: net.IPv4(192, 168, 0, 56),
Port: 1234,
}
proxy.RedirectAs(p2pEndpointAddr)
for _, msg := range msgRedirected {
if _, err := relayedServer.WriteTo(msg, relayedConn.LocalAddr()); err != nil {
t.Errorf("error: %v", err)
}
}
for i := 0; i < len(msgRedirected); i++ {
buf := make([]byte, 1024)
n, rAddr, err := dummyWgListener.ReadFrom(buf)
if err != nil {
t.Errorf("error: %v", err)
}
if rAddr.String() != p2pEndpointAddr.String() {
t.Errorf("expected %s, got %s", p2pEndpointAddr.String(), rAddr.String())
}
if string(buf[:n]) != string(msgRedirected[i]) {
t.Errorf("expected %s, got %s", string(msgRedirected[i]), string(buf[:n]))
}
}
}

View File

@@ -0,0 +1,50 @@
//go:build linux && !android
package rawsocket
import (
"fmt"
"net"
"os"
"syscall"
nbnet "github.com/netbirdio/netbird/util/net"
)
func PrepareSenderRawSocket() (net.PacketConn, error) {
// Create a raw socket.
fd, err := syscall.Socket(syscall.AF_INET, syscall.SOCK_RAW, syscall.IPPROTO_RAW)
if err != nil {
return nil, fmt.Errorf("creating raw socket failed: %w", err)
}
// Set the IP_HDRINCL option on the socket to tell the kernel that headers are included in the packet.
err = syscall.SetsockoptInt(fd, syscall.IPPROTO_IP, syscall.IP_HDRINCL, 1)
if err != nil {
return nil, fmt.Errorf("setting IP_HDRINCL failed: %w", err)
}
// Bind the socket to the "lo" interface.
err = syscall.SetsockoptString(fd, syscall.SOL_SOCKET, syscall.SO_BINDTODEVICE, "lo")
if err != nil {
return nil, fmt.Errorf("binding to lo interface failed: %w", err)
}
// Set the fwmark on the socket.
err = nbnet.SetSocketOpt(fd)
if err != nil {
return nil, fmt.Errorf("setting fwmark failed: %w", err)
}
// Convert the file descriptor to a PacketConn.
file := os.NewFile(uintptr(fd), fmt.Sprintf("fd %d", fd))
if file == nil {
return nil, fmt.Errorf("converting fd to file failed")
}
packetConn, err := net.FilePacketConn(file)
if err != nil {
return nil, fmt.Errorf("converting file to packet conn failed: %w", err)
}
return packetConn, nil
}

View File

@@ -1,3 +1,5 @@
//go:build linux && !android
package udp
import (
@@ -21,16 +23,18 @@ type WGUDPProxy struct {
localWGListenPort int
mtu uint16
remoteConn net.Conn
localConn net.Conn
ctx context.Context
cancel context.CancelFunc
closeMu sync.Mutex
closed bool
remoteConn net.Conn
localConn net.Conn
srcFakerConn *SrcFaker
sendPkg func(data []byte) (int, error)
ctx context.Context
cancel context.CancelFunc
closeMu sync.Mutex
closed bool
pausedMu sync.Mutex
paused bool
isStarted bool
paused bool
pausedCond *sync.Cond
isStarted bool
closeListener *listener.CloseListener
}
@@ -41,6 +45,7 @@ func NewWGUDPProxy(wgPort int, mtu uint16) *WGUDPProxy {
p := &WGUDPProxy{
localWGListenPort: wgPort,
mtu: mtu,
pausedCond: sync.NewCond(&sync.Mutex{}),
closeListener: listener.NewCloseListener(),
}
return p
@@ -61,6 +66,7 @@ func (p *WGUDPProxy) AddTurnConn(ctx context.Context, endpoint *net.UDPAddr, rem
p.ctx, p.cancel = context.WithCancel(ctx)
p.localConn = localConn
p.sendPkg = p.localConn.Write
p.remoteConn = remoteConn
return err
@@ -84,15 +90,24 @@ func (p *WGUDPProxy) Work() {
return
}
p.pausedMu.Lock()
p.pausedCond.L.Lock()
p.paused = false
p.pausedMu.Unlock()
p.sendPkg = p.localConn.Write
if p.srcFakerConn != nil {
if err := p.srcFakerConn.Close(); err != nil {
log.Errorf("failed to close src faker conn: %s", err)
}
p.srcFakerConn = nil
}
if !p.isStarted {
p.isStarted = true
go p.proxyToRemote(p.ctx)
go p.proxyToLocal(p.ctx)
}
p.pausedCond.L.Unlock()
p.pausedCond.Signal()
}
// Pause pauses the proxy from receiving data from the remote peer
@@ -101,9 +116,35 @@ func (p *WGUDPProxy) Pause() {
return
}
p.pausedMu.Lock()
p.pausedCond.L.Lock()
p.paused = true
p.pausedMu.Unlock()
p.pausedCond.L.Unlock()
}
// RedirectAs start to use the fake sourced raw socket as package sender
func (p *WGUDPProxy) RedirectAs(endpoint *net.UDPAddr) {
p.pausedCond.L.Lock()
defer func() {
p.pausedCond.L.Unlock()
p.pausedCond.Signal()
}()
p.paused = false
if p.srcFakerConn != nil {
if err := p.srcFakerConn.Close(); err != nil {
log.Errorf("failed to close src faker conn: %s", err)
}
p.srcFakerConn = nil
}
srcFakerConn, err := NewSrcFaker(p.localWGListenPort, endpoint)
if err != nil {
log.Errorf("failed to create src faker conn: %s", err)
// fallback to continue without redirecting
p.paused = true
return
}
p.srcFakerConn = srcFakerConn
p.sendPkg = p.srcFakerConn.SendPkg
}
// CloseConn close the localConn
@@ -115,6 +156,8 @@ func (p *WGUDPProxy) CloseConn() error {
}
func (p *WGUDPProxy) close() error {
var result *multierror.Error
p.closeMu.Lock()
defer p.closeMu.Unlock()
@@ -128,7 +171,11 @@ func (p *WGUDPProxy) close() error {
p.cancel()
var result *multierror.Error
p.pausedCond.L.Lock()
p.paused = false
p.pausedCond.L.Unlock()
p.pausedCond.Signal()
if err := p.remoteConn.Close(); err != nil && !errors.Is(err, net.ErrClosed) {
result = multierror.Append(result, fmt.Errorf("remote conn: %s", err))
}
@@ -136,6 +183,13 @@ func (p *WGUDPProxy) close() error {
if err := p.localConn.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("local conn: %s", err))
}
if p.srcFakerConn != nil {
if err := p.srcFakerConn.Close(); err != nil {
result = multierror.Append(result, fmt.Errorf("src faker raw conn: %s", err))
}
}
return cerrors.FormatErrorOrNil(result)
}
@@ -194,14 +248,12 @@ func (p *WGUDPProxy) proxyToLocal(ctx context.Context) {
return
}
p.pausedMu.Lock()
if p.paused {
p.pausedMu.Unlock()
continue
p.pausedCond.L.Lock()
for p.paused {
p.pausedCond.Wait()
}
_, err = p.localConn.Write(buf[:n])
p.pausedMu.Unlock()
_, err = p.sendPkg(buf[:n])
p.pausedCond.L.Unlock()
if err != nil {
if ctx.Err() != nil {

View File

@@ -0,0 +1,101 @@
//go:build linux && !android
package udp
import (
"fmt"
"net"
"github.com/google/gopacket"
"github.com/google/gopacket/layers"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/iface/wgproxy/rawsocket"
)
var (
serializeOpts = gopacket.SerializeOptions{
ComputeChecksums: true,
FixLengths: true,
}
localHostNetIPAddr = &net.IPAddr{
IP: net.ParseIP("127.0.0.1"),
}
)
type SrcFaker struct {
srcAddr *net.UDPAddr
rawSocket net.PacketConn
ipH gopacket.SerializableLayer
udpH gopacket.SerializableLayer
layerBuffer gopacket.SerializeBuffer
}
func NewSrcFaker(dstPort int, srcAddr *net.UDPAddr) (*SrcFaker, error) {
rawSocket, err := rawsocket.PrepareSenderRawSocket()
if err != nil {
return nil, err
}
ipH, udpH, err := prepareHeaders(dstPort, srcAddr)
if err != nil {
return nil, err
}
f := &SrcFaker{
srcAddr: srcAddr,
rawSocket: rawSocket,
ipH: ipH,
udpH: udpH,
layerBuffer: gopacket.NewSerializeBuffer(),
}
return f, nil
}
func (f *SrcFaker) Close() error {
return f.rawSocket.Close()
}
func (f *SrcFaker) SendPkg(data []byte) (int, error) {
defer func() {
if err := f.layerBuffer.Clear(); err != nil {
log.Errorf("failed to clear layer buffer: %s", err)
}
}()
payload := gopacket.Payload(data)
err := gopacket.SerializeLayers(f.layerBuffer, serializeOpts, f.ipH, f.udpH, payload)
if err != nil {
return 0, fmt.Errorf("serialize layers: %w", err)
}
n, err := f.rawSocket.WriteTo(f.layerBuffer.Bytes(), localHostNetIPAddr)
if err != nil {
return 0, fmt.Errorf("write to raw conn: %w", err)
}
return n, nil
}
func prepareHeaders(dstPort int, srcAddr *net.UDPAddr) (gopacket.SerializableLayer, gopacket.SerializableLayer, error) {
ipH := &layers.IPv4{
DstIP: net.ParseIP("127.0.0.1"),
SrcIP: srcAddr.IP,
Version: 4,
TTL: 64,
Protocol: layers.IPProtocolUDP,
}
udpH := &layers.UDP{
SrcPort: layers.UDPPort(srcAddr.Port),
DstPort: layers.UDPPort(dstPort), // dst is the localhost WireGuard port
}
err := udpH.SetNetworkLayerForChecksum(ipH)
if err != nil {
return nil, nil, fmt.Errorf("set network layer for checksum: %w", err)
}
return ipH, udpH, nil
}

View File

@@ -275,11 +275,12 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
c.engine.SetSyncResponsePersistence(c.persistSyncResponse)
c.engineMutex.Unlock()
if err := c.engine.Start(); err != nil {
if err := c.engine.Start(loginResp.GetNetbirdConfig(), c.config.ManagementURL); err != nil {
log.Errorf("error while starting Netbird Connection Engine: %s", err)
return wrapErr(err)
}
log.Infof("Netbird engine started, the IP is: %s", peerConfig.GetAddress())
state.Set(StatusConnected)

View File

@@ -0,0 +1,201 @@
package config
import (
"errors"
"fmt"
"net"
"net/netip"
"net/url"
"strings"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/shared/management/domain"
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
)
var (
ErrEmptyURL = errors.New("empty URL")
ErrEmptyHost = errors.New("empty host")
ErrIPNotAllowed = errors.New("IP address not allowed")
)
// ServerDomains represents the management server domains extracted from NetBird configuration
type ServerDomains struct {
Signal domain.Domain
Relay []domain.Domain
Flow domain.Domain
Stuns []domain.Domain
Turns []domain.Domain
}
// ExtractFromNetbirdConfig extracts domain information from NetBird protobuf configuration
func ExtractFromNetbirdConfig(config *mgmProto.NetbirdConfig) ServerDomains {
if config == nil {
return ServerDomains{}
}
domains := ServerDomains{}
domains.Signal = extractSignalDomain(config)
domains.Relay = extractRelayDomains(config)
domains.Flow = extractFlowDomain(config)
domains.Stuns = extractStunDomains(config)
domains.Turns = extractTurnDomains(config)
return domains
}
// ExtractValidDomain extracts a valid domain from a URL, filtering out IP addresses
func ExtractValidDomain(rawURL string) (domain.Domain, error) {
if rawURL == "" {
return "", ErrEmptyURL
}
parsedURL, err := url.Parse(rawURL)
if err == nil {
if domain, err := extractFromParsedURL(parsedURL); err != nil || domain != "" {
return domain, err
}
}
return extractFromRawString(rawURL)
}
// extractFromParsedURL handles domain extraction from successfully parsed URLs
func extractFromParsedURL(parsedURL *url.URL) (domain.Domain, error) {
if parsedURL.Hostname() != "" {
return extractDomainFromHost(parsedURL.Hostname())
}
if parsedURL.Opaque == "" || parsedURL.Scheme == "" {
return "", nil
}
// Handle URLs with opaque content (e.g., stun:host:port)
if strings.Contains(parsedURL.Scheme, ".") {
// This is likely "domain.com:port" being parsed as scheme:opaque
reconstructed := parsedURL.Scheme + ":" + parsedURL.Opaque
if host, _, err := net.SplitHostPort(reconstructed); err == nil {
return extractDomainFromHost(host)
}
return extractDomainFromHost(parsedURL.Scheme)
}
// Valid scheme with opaque content (e.g., stun:host:port)
host := parsedURL.Opaque
if queryIndex := strings.Index(host, "?"); queryIndex > 0 {
host = host[:queryIndex]
}
if hostOnly, _, err := net.SplitHostPort(host); err == nil {
return extractDomainFromHost(hostOnly)
}
return extractDomainFromHost(host)
}
// extractFromRawString handles domain extraction when URL parsing fails or returns no results
func extractFromRawString(rawURL string) (domain.Domain, error) {
if host, _, err := net.SplitHostPort(rawURL); err == nil {
return extractDomainFromHost(host)
}
return extractDomainFromHost(rawURL)
}
// extractDomainFromHost extracts domain from a host string, filtering out IP addresses
func extractDomainFromHost(host string) (domain.Domain, error) {
if host == "" {
return "", ErrEmptyHost
}
if _, err := netip.ParseAddr(host); err == nil {
return "", fmt.Errorf("%w: %s", ErrIPNotAllowed, host)
}
d, err := domain.FromString(host)
if err != nil {
return "", fmt.Errorf("invalid domain: %v", err)
}
return d, nil
}
// extractSingleDomain extracts a single domain from a URL with error logging
func extractSingleDomain(url, serviceType string) domain.Domain {
if url == "" {
return ""
}
d, err := ExtractValidDomain(url)
if err != nil {
log.Debugf("Skipping %s: %v", serviceType, err)
return ""
}
return d
}
// extractMultipleDomains extracts multiple domains from URLs with error logging
func extractMultipleDomains(urls []string, serviceType string) []domain.Domain {
var domains []domain.Domain
for _, url := range urls {
if url == "" {
continue
}
d, err := ExtractValidDomain(url)
if err != nil {
log.Debugf("Skipping %s: %v", serviceType, err)
continue
}
domains = append(domains, d)
}
return domains
}
// extractSignalDomain extracts the signal domain from NetBird configuration.
func extractSignalDomain(config *mgmProto.NetbirdConfig) domain.Domain {
if config.Signal != nil {
return extractSingleDomain(config.Signal.Uri, "signal")
}
return ""
}
// extractRelayDomains extracts relay server domains from NetBird configuration.
func extractRelayDomains(config *mgmProto.NetbirdConfig) []domain.Domain {
if config.Relay != nil {
return extractMultipleDomains(config.Relay.Urls, "relay")
}
return nil
}
// extractFlowDomain extracts the traffic flow domain from NetBird configuration.
func extractFlowDomain(config *mgmProto.NetbirdConfig) domain.Domain {
if config.Flow != nil {
return extractSingleDomain(config.Flow.Url, "flow")
}
return ""
}
// extractStunDomains extracts STUN server domains from NetBird configuration.
func extractStunDomains(config *mgmProto.NetbirdConfig) []domain.Domain {
var urls []string
for _, stun := range config.Stuns {
if stun != nil && stun.Uri != "" {
urls = append(urls, stun.Uri)
}
}
return extractMultipleDomains(urls, "STUN")
}
// extractTurnDomains extracts TURN server domains from NetBird configuration.
func extractTurnDomains(config *mgmProto.NetbirdConfig) []domain.Domain {
var urls []string
for _, turn := range config.Turns {
if turn != nil && turn.HostConfig != nil && turn.HostConfig.Uri != "" {
urls = append(urls, turn.HostConfig.Uri)
}
}
return extractMultipleDomains(urls, "TURN")
}

View File

@@ -0,0 +1,213 @@
package config
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestExtractValidDomain(t *testing.T) {
tests := []struct {
name string
url string
expected string
expectError bool
}{
{
name: "HTTPS URL with port",
url: "https://api.netbird.io:443",
expected: "api.netbird.io",
},
{
name: "HTTP URL without port",
url: "http://signal.example.com",
expected: "signal.example.com",
},
{
name: "Host with port (no scheme)",
url: "signal.netbird.io:443",
expected: "signal.netbird.io",
},
{
name: "STUN URL",
url: "stun:stun.netbird.io:443",
expected: "stun.netbird.io",
},
{
name: "STUN URL with different port",
url: "stun:stun.netbird.io:5555",
expected: "stun.netbird.io",
},
{
name: "TURNS URL with query params",
url: "turns:turn.netbird.io:443?transport=tcp",
expected: "turn.netbird.io",
},
{
name: "TURN URL",
url: "turn:turn.example.com:3478",
expected: "turn.example.com",
},
{
name: "REL URL",
url: "rel://relay.example.com:443",
expected: "relay.example.com",
},
{
name: "RELS URL",
url: "rels://relay.netbird.io:443",
expected: "relay.netbird.io",
},
{
name: "Raw hostname",
url: "example.org",
expected: "example.org",
},
{
name: "IP address should be rejected",
url: "192.168.1.1",
expectError: true,
},
{
name: "IP address with port should be rejected",
url: "192.168.1.1:443",
expectError: true,
},
{
name: "IPv6 address should be rejected",
url: "2001:db8::1",
expectError: true,
},
{
name: "HTTP URL with IPv4 should be rejected",
url: "http://192.168.1.1:8080",
expectError: true,
},
{
name: "HTTPS URL with IPv4 should be rejected",
url: "https://10.0.0.1:443",
expectError: true,
},
{
name: "STUN URL with IPv4 should be rejected",
url: "stun:192.168.1.1:3478",
expectError: true,
},
{
name: "TURN URL with IPv4 should be rejected",
url: "turn:10.0.0.1:3478",
expectError: true,
},
{
name: "TURNS URL with IPv4 should be rejected",
url: "turns:172.16.0.1:5349",
expectError: true,
},
{
name: "HTTP URL with IPv6 should be rejected",
url: "http://[2001:db8::1]:8080",
expectError: true,
},
{
name: "HTTPS URL with IPv6 should be rejected",
url: "https://[::1]:443",
expectError: true,
},
{
name: "STUN URL with IPv6 should be rejected",
url: "stun:[2001:db8::1]:3478",
expectError: true,
},
{
name: "IPv6 with port should be rejected",
url: "[2001:db8::1]:443",
expectError: true,
},
{
name: "Localhost IPv4 should be rejected",
url: "127.0.0.1:8080",
expectError: true,
},
{
name: "Localhost IPv6 should be rejected",
url: "[::1]:443",
expectError: true,
},
{
name: "REL URL with IPv4 should be rejected",
url: "rel://192.168.1.1:443",
expectError: true,
},
{
name: "RELS URL with IPv4 should be rejected",
url: "rels://10.0.0.1:443",
expectError: true,
},
{
name: "Empty URL",
url: "",
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := ExtractValidDomain(tt.url)
if tt.expectError {
assert.Error(t, err, "Expected error for URL: %s", tt.url)
} else {
assert.NoError(t, err, "Unexpected error for URL: %s", tt.url)
assert.Equal(t, tt.expected, result.SafeString(), "Domain mismatch for URL: %s", tt.url)
}
})
}
}
func TestExtractDomainFromHost(t *testing.T) {
tests := []struct {
name string
host string
expected string
expectError bool
}{
{
name: "Valid domain",
host: "example.com",
expected: "example.com",
},
{
name: "Subdomain",
host: "api.example.com",
expected: "api.example.com",
},
{
name: "IPv4 address",
host: "192.168.1.1",
expectError: true,
},
{
name: "IPv6 address",
host: "2001:db8::1",
expectError: true,
},
{
name: "Empty host",
host: "",
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := extractDomainFromHost(tt.host)
if tt.expectError {
assert.Error(t, err, "Expected error for host: %s", tt.host)
} else {
assert.NoError(t, err, "Unexpected error for host: %s", tt.host)
assert.Equal(t, tt.expected, result.SafeString(), "Domain mismatch for host: %s", tt.host)
}
})
}
}

View File

@@ -11,11 +11,12 @@ import (
)
const (
PriorityLocal = 100
PriorityDNSRoute = 75
PriorityUpstream = 50
PriorityDefault = 1
PriorityFallback = -100
PriorityMgmtCache = 150
PriorityLocal = 100
PriorityDNSRoute = 75
PriorityUpstream = 50
PriorityDefault = 1
PriorityFallback = -100
)
type SubdomainMatcher interface {
@@ -182,7 +183,10 @@ func (c *HandlerChain) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
// If handler wants to continue, try next handler
if chainWriter.shouldContinue {
log.Tracef("handler requested continue to next handler for domain=%s", qname)
// Only log continue for non-management cache handlers to reduce noise
if entry.Priority != PriorityMgmtCache {
log.Tracef("handler requested continue to next handler for domain=%s", qname)
}
continue
}
return

View File

@@ -34,7 +34,7 @@ func (d *Resolver) MatchSubdomains() bool {
// String returns a string representation of the local resolver
func (d *Resolver) String() string {
return fmt.Sprintf("local resolver [%d records]", len(d.records))
return fmt.Sprintf("LocalResolver [%d records]", len(d.records))
}
func (d *Resolver) Stop() {}

View File

@@ -0,0 +1,360 @@
package mgmt
import (
"context"
"fmt"
"net"
"net/url"
"strings"
"sync"
"time"
"github.com/miekg/dns"
log "github.com/sirupsen/logrus"
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
"github.com/netbirdio/netbird/shared/management/domain"
)
const dnsTimeout = 5 * time.Second
// Resolver caches critical NetBird infrastructure domains
type Resolver struct {
records map[dns.Question][]dns.RR
mgmtDomain *domain.Domain
serverDomains *dnsconfig.ServerDomains
mutex sync.RWMutex
}
// NewResolver creates a new management domains cache resolver.
func NewResolver() *Resolver {
return &Resolver{
records: make(map[dns.Question][]dns.RR),
}
}
// String returns a string representation of the resolver.
func (m *Resolver) String() string {
return "MgmtCacheResolver"
}
// ServeDNS implements dns.Handler interface.
func (m *Resolver) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
if len(r.Question) == 0 {
m.continueToNext(w, r)
return
}
question := r.Question[0]
question.Name = strings.ToLower(dns.Fqdn(question.Name))
if question.Qtype != dns.TypeA && question.Qtype != dns.TypeAAAA {
m.continueToNext(w, r)
return
}
m.mutex.RLock()
records, found := m.records[question]
m.mutex.RUnlock()
if !found {
m.continueToNext(w, r)
return
}
resp := &dns.Msg{}
resp.SetReply(r)
resp.Authoritative = false
resp.RecursionAvailable = true
resp.Answer = append(resp.Answer, records...)
log.Debugf("serving %d cached records for domain=%s", len(resp.Answer), question.Name)
if err := w.WriteMsg(resp); err != nil {
log.Errorf("failed to write response: %v", err)
}
}
// MatchSubdomains returns false since this resolver only handles exact domain matches
// for NetBird infrastructure domains (signal, relay, flow, etc.), not their subdomains.
func (m *Resolver) MatchSubdomains() bool {
return false
}
// continueToNext signals the handler chain to continue to the next handler.
func (m *Resolver) continueToNext(w dns.ResponseWriter, r *dns.Msg) {
resp := &dns.Msg{}
resp.SetRcode(r, dns.RcodeNameError)
resp.MsgHdr.Zero = true
if err := w.WriteMsg(resp); err != nil {
log.Errorf("failed to write continue signal: %v", err)
}
}
// AddDomain manually adds a domain to cache by resolving it.
func (m *Resolver) AddDomain(ctx context.Context, d domain.Domain) error {
dnsName := strings.ToLower(dns.Fqdn(d.PunycodeString()))
ctx, cancel := context.WithTimeout(ctx, dnsTimeout)
defer cancel()
ips, err := net.DefaultResolver.LookupNetIP(ctx, "ip", d.PunycodeString())
if err != nil {
return fmt.Errorf("resolve domain %s: %w", d.SafeString(), err)
}
var aRecords, aaaaRecords []dns.RR
for _, ip := range ips {
if ip.Is4() {
rr := &dns.A{
Hdr: dns.RR_Header{
Name: dnsName,
Rrtype: dns.TypeA,
Class: dns.ClassINET,
Ttl: 300,
},
A: ip.AsSlice(),
}
aRecords = append(aRecords, rr)
} else if ip.Is6() {
rr := &dns.AAAA{
Hdr: dns.RR_Header{
Name: dnsName,
Rrtype: dns.TypeAAAA,
Class: dns.ClassINET,
Ttl: 300,
},
AAAA: ip.AsSlice(),
}
aaaaRecords = append(aaaaRecords, rr)
}
}
m.mutex.Lock()
if len(aRecords) > 0 {
aQuestion := dns.Question{
Name: dnsName,
Qtype: dns.TypeA,
Qclass: dns.ClassINET,
}
m.records[aQuestion] = aRecords
}
if len(aaaaRecords) > 0 {
aaaaQuestion := dns.Question{
Name: dnsName,
Qtype: dns.TypeAAAA,
Qclass: dns.ClassINET,
}
m.records[aaaaQuestion] = aaaaRecords
}
m.mutex.Unlock()
log.Debugf("added domain=%s with %d A records and %d AAAA records",
d.SafeString(), len(aRecords), len(aaaaRecords))
return nil
}
// PopulateFromConfig extracts and caches domains from the client configuration.
func (m *Resolver) PopulateFromConfig(ctx context.Context, mgmtURL *url.URL) error {
if mgmtURL == nil {
return nil
}
d, err := dnsconfig.ExtractValidDomain(mgmtURL.String())
if err != nil {
return fmt.Errorf("extract domain from URL: %w", err)
}
m.mutex.Lock()
m.mgmtDomain = &d
m.mutex.Unlock()
if err := m.AddDomain(ctx, d); err != nil {
return fmt.Errorf("add domain: %w", err)
}
return nil
}
// RemoveDomain removes a domain from the cache.
func (m *Resolver) RemoveDomain(d domain.Domain) error {
dnsName := strings.ToLower(dns.Fqdn(d.PunycodeString()))
m.mutex.Lock()
defer m.mutex.Unlock()
aQuestion := dns.Question{
Name: dnsName,
Qtype: dns.TypeA,
Qclass: dns.ClassINET,
}
delete(m.records, aQuestion)
aaaaQuestion := dns.Question{
Name: dnsName,
Qtype: dns.TypeAAAA,
Qclass: dns.ClassINET,
}
delete(m.records, aaaaQuestion)
log.Debugf("removed domain=%s from cache", d.SafeString())
return nil
}
// GetCachedDomains returns a list of all cached domains.
func (m *Resolver) GetCachedDomains() domain.List {
m.mutex.RLock()
defer m.mutex.RUnlock()
domainSet := make(map[domain.Domain]struct{})
for question := range m.records {
domainName := strings.TrimSuffix(question.Name, ".")
domainSet[domain.Domain(domainName)] = struct{}{}
}
domains := make(domain.List, 0, len(domainSet))
for d := range domainSet {
domains = append(domains, d)
}
return domains
}
// UpdateFromServerDomains updates the cache with server domains from network configuration.
// It merges new domains with existing ones, replacing entire domain types when updated.
// Empty updates are ignored to prevent clearing infrastructure domains during partial updates.
func (m *Resolver) UpdateFromServerDomains(ctx context.Context, serverDomains dnsconfig.ServerDomains) (domain.List, error) {
newDomains := m.extractDomainsFromServerDomains(serverDomains)
var removedDomains domain.List
if len(newDomains) > 0 {
m.mutex.Lock()
if m.serverDomains == nil {
m.serverDomains = &dnsconfig.ServerDomains{}
}
updatedServerDomains := m.mergeServerDomains(*m.serverDomains, serverDomains)
m.serverDomains = &updatedServerDomains
m.mutex.Unlock()
allDomains := m.extractDomainsFromServerDomains(updatedServerDomains)
currentDomains := m.GetCachedDomains()
removedDomains = m.removeStaleDomains(currentDomains, allDomains)
}
m.addNewDomains(ctx, newDomains)
return removedDomains, nil
}
// removeStaleDomains removes cached domains not present in the target domain list.
// Management domains are preserved and never removed during server domain updates.
func (m *Resolver) removeStaleDomains(currentDomains, newDomains domain.List) domain.List {
var removedDomains domain.List
for _, currentDomain := range currentDomains {
if m.isDomainInList(currentDomain, newDomains) {
continue
}
if m.isManagementDomain(currentDomain) {
continue
}
removedDomains = append(removedDomains, currentDomain)
if err := m.RemoveDomain(currentDomain); err != nil {
log.Warnf("failed to remove domain=%s: %v", currentDomain.SafeString(), err)
}
}
return removedDomains
}
// mergeServerDomains merges new server domains with existing ones.
// When a domain type is provided in the new domains, it completely replaces that type.
func (m *Resolver) mergeServerDomains(existing, incoming dnsconfig.ServerDomains) dnsconfig.ServerDomains {
merged := existing
if incoming.Signal != "" {
merged.Signal = incoming.Signal
}
if len(incoming.Relay) > 0 {
merged.Relay = incoming.Relay
}
if incoming.Flow != "" {
merged.Flow = incoming.Flow
}
if len(incoming.Stuns) > 0 {
merged.Stuns = incoming.Stuns
}
if len(incoming.Turns) > 0 {
merged.Turns = incoming.Turns
}
return merged
}
// isDomainInList checks if domain exists in the list
func (m *Resolver) isDomainInList(domain domain.Domain, list domain.List) bool {
for _, d := range list {
if domain.SafeString() == d.SafeString() {
return true
}
}
return false
}
// isManagementDomain checks if domain is the protected management domain
func (m *Resolver) isManagementDomain(domain domain.Domain) bool {
m.mutex.RLock()
defer m.mutex.RUnlock()
return m.mgmtDomain != nil && domain == *m.mgmtDomain
}
// addNewDomains resolves and caches all domains from the update
func (m *Resolver) addNewDomains(ctx context.Context, newDomains domain.List) {
for _, newDomain := range newDomains {
if err := m.AddDomain(ctx, newDomain); err != nil {
log.Warnf("failed to add/update domain=%s: %v", newDomain.SafeString(), err)
} else {
log.Debugf("added/updated management cache domain=%s", newDomain.SafeString())
}
}
}
func (m *Resolver) extractDomainsFromServerDomains(serverDomains dnsconfig.ServerDomains) domain.List {
var domains domain.List
if serverDomains.Signal != "" {
domains = append(domains, serverDomains.Signal)
}
for _, relay := range serverDomains.Relay {
if relay != "" {
domains = append(domains, relay)
}
}
if serverDomains.Flow != "" {
domains = append(domains, serverDomains.Flow)
}
for _, stun := range serverDomains.Stuns {
if stun != "" {
domains = append(domains, stun)
}
}
for _, turn := range serverDomains.Turns {
if turn != "" {
domains = append(domains, turn)
}
}
return domains
}

View File

@@ -0,0 +1,416 @@
package mgmt
import (
"context"
"fmt"
"net/url"
"strings"
"testing"
"github.com/miekg/dns"
"github.com/stretchr/testify/assert"
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
"github.com/netbirdio/netbird/client/internal/dns/test"
"github.com/netbirdio/netbird/shared/management/domain"
)
func TestResolver_NewResolver(t *testing.T) {
resolver := NewResolver()
assert.NotNil(t, resolver)
assert.NotNil(t, resolver.records)
assert.False(t, resolver.MatchSubdomains())
}
func TestResolver_ExtractDomainFromURL(t *testing.T) {
tests := []struct {
name string
urlStr string
expectedDom string
expectError bool
}{
{
name: "HTTPS URL with port",
urlStr: "https://api.netbird.io:443",
expectedDom: "api.netbird.io",
expectError: false,
},
{
name: "HTTP URL without port",
urlStr: "http://signal.example.com",
expectedDom: "signal.example.com",
expectError: false,
},
{
name: "URL with path",
urlStr: "https://relay.netbird.io/status",
expectedDom: "relay.netbird.io",
expectError: false,
},
{
name: "Invalid URL",
urlStr: "not-a-valid-url",
expectedDom: "not-a-valid-url",
expectError: false,
},
{
name: "Empty URL",
urlStr: "",
expectedDom: "",
expectError: true,
},
{
name: "STUN URL",
urlStr: "stun:stun.example.com:3478",
expectedDom: "stun.example.com",
expectError: false,
},
{
name: "TURN URL",
urlStr: "turn:turn.example.com:3478",
expectedDom: "turn.example.com",
expectError: false,
},
{
name: "REL URL",
urlStr: "rel://relay.example.com:443",
expectedDom: "relay.example.com",
expectError: false,
},
{
name: "RELS URL",
urlStr: "rels://relay.example.com:443",
expectedDom: "relay.example.com",
expectError: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var parsedURL *url.URL
var err error
if tt.urlStr != "" {
parsedURL, err = url.Parse(tt.urlStr)
if err != nil && !tt.expectError {
t.Fatalf("Failed to parse URL: %v", err)
}
}
domain, err := extractDomainFromURL(parsedURL)
if tt.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.expectedDom, domain.SafeString())
}
})
}
}
func TestResolver_PopulateFromConfig(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
resolver := NewResolver()
// Test with IP address - should return error since IP addresses are rejected
mgmtURL, _ := url.Parse("https://127.0.0.1")
err := resolver.PopulateFromConfig(ctx, mgmtURL)
assert.Error(t, err)
assert.ErrorIs(t, err, dnsconfig.ErrIPNotAllowed)
// No domains should be cached when using IP addresses
domains := resolver.GetCachedDomains()
assert.Equal(t, 0, len(domains), "No domains should be cached when using IP addresses")
}
func TestResolver_ServeDNS(t *testing.T) {
resolver := NewResolver()
ctx := context.Background()
// Add a test domain to the cache - use example.org which is reserved for testing
testDomain, err := domain.FromString("example.org")
if err != nil {
t.Fatalf("Failed to create domain: %v", err)
}
err = resolver.AddDomain(ctx, testDomain)
if err != nil {
t.Skipf("Skipping test due to DNS resolution failure: %v", err)
}
// Test A record query for cached domain
t.Run("Cached domain A record", func(t *testing.T) {
var capturedMsg *dns.Msg
mockWriter := &test.MockResponseWriter{
WriteMsgFunc: func(m *dns.Msg) error {
capturedMsg = m
return nil
},
}
req := new(dns.Msg)
req.SetQuestion("example.org.", dns.TypeA)
resolver.ServeDNS(mockWriter, req)
assert.NotNil(t, capturedMsg)
assert.Equal(t, dns.RcodeSuccess, capturedMsg.Rcode)
assert.True(t, len(capturedMsg.Answer) > 0, "Should have at least one answer")
})
// Test uncached domain signals to continue to next handler
t.Run("Uncached domain signals continue to next handler", func(t *testing.T) {
var capturedMsg *dns.Msg
mockWriter := &test.MockResponseWriter{
WriteMsgFunc: func(m *dns.Msg) error {
capturedMsg = m
return nil
},
}
req := new(dns.Msg)
req.SetQuestion("unknown.example.com.", dns.TypeA)
resolver.ServeDNS(mockWriter, req)
assert.NotNil(t, capturedMsg)
assert.Equal(t, dns.RcodeNameError, capturedMsg.Rcode)
// Zero flag set to true signals the handler chain to continue to next handler
assert.True(t, capturedMsg.MsgHdr.Zero, "Zero flag should be set to signal continuation to next handler")
assert.Empty(t, capturedMsg.Answer, "Should have no answers for uncached domain")
})
// Test that subdomains of cached domains are NOT resolved
t.Run("Subdomains of cached domains are not resolved", func(t *testing.T) {
var capturedMsg *dns.Msg
mockWriter := &test.MockResponseWriter{
WriteMsgFunc: func(m *dns.Msg) error {
capturedMsg = m
return nil
},
}
// Query for a subdomain of our cached domain
req := new(dns.Msg)
req.SetQuestion("sub.example.org.", dns.TypeA)
resolver.ServeDNS(mockWriter, req)
assert.NotNil(t, capturedMsg)
assert.Equal(t, dns.RcodeNameError, capturedMsg.Rcode)
assert.True(t, capturedMsg.MsgHdr.Zero, "Should signal continuation to next handler for subdomains")
assert.Empty(t, capturedMsg.Answer, "Should have no answers for subdomains")
})
// Test case-insensitive matching
t.Run("Case-insensitive domain matching", func(t *testing.T) {
var capturedMsg *dns.Msg
mockWriter := &test.MockResponseWriter{
WriteMsgFunc: func(m *dns.Msg) error {
capturedMsg = m
return nil
},
}
// Query with different casing
req := new(dns.Msg)
req.SetQuestion("EXAMPLE.ORG.", dns.TypeA)
resolver.ServeDNS(mockWriter, req)
assert.NotNil(t, capturedMsg)
assert.Equal(t, dns.RcodeSuccess, capturedMsg.Rcode)
assert.True(t, len(capturedMsg.Answer) > 0, "Should resolve regardless of case")
})
}
func TestResolver_GetCachedDomains(t *testing.T) {
resolver := NewResolver()
ctx := context.Background()
testDomain, err := domain.FromString("example.org")
if err != nil {
t.Fatalf("Failed to create domain: %v", err)
}
err = resolver.AddDomain(ctx, testDomain)
if err != nil {
t.Skipf("Skipping test due to DNS resolution failure: %v", err)
}
cachedDomains := resolver.GetCachedDomains()
assert.Equal(t, 1, len(cachedDomains), "Should return exactly one domain for single added domain")
assert.Equal(t, testDomain.SafeString(), cachedDomains[0].SafeString(), "Cached domain should match original")
assert.False(t, strings.HasSuffix(cachedDomains[0].PunycodeString(), "."), "Domain should not have trailing dot")
}
func TestResolver_ManagementDomainProtection(t *testing.T) {
resolver := NewResolver()
ctx := context.Background()
mgmtURL, _ := url.Parse("https://example.org")
err := resolver.PopulateFromConfig(ctx, mgmtURL)
if err != nil {
t.Skipf("Skipping test due to DNS resolution failure: %v", err)
}
initialDomains := resolver.GetCachedDomains()
if len(initialDomains) == 0 {
t.Skip("Management domain failed to resolve, skipping test")
}
assert.Equal(t, 1, len(initialDomains), "Should have management domain cached")
assert.Equal(t, "example.org", initialDomains[0].SafeString())
serverDomains := dnsconfig.ServerDomains{
Signal: "google.com",
Relay: []domain.Domain{"cloudflare.com"},
}
_, err = resolver.UpdateFromServerDomains(ctx, serverDomains)
if err != nil {
t.Logf("Server domains update failed: %v", err)
}
finalDomains := resolver.GetCachedDomains()
managementStillCached := false
for _, d := range finalDomains {
if d.SafeString() == "example.org" {
managementStillCached = true
break
}
}
assert.True(t, managementStillCached, "Management domain should never be removed")
}
// extractDomainFromURL extracts a domain from a URL - test helper function
func extractDomainFromURL(u *url.URL) (domain.Domain, error) {
if u == nil {
return "", fmt.Errorf("URL is nil")
}
return dnsconfig.ExtractValidDomain(u.String())
}
func TestResolver_EmptyUpdateDoesNotRemoveDomains(t *testing.T) {
resolver := NewResolver()
ctx := context.Background()
// Set up initial domains using resolvable domains
initialDomains := dnsconfig.ServerDomains{
Signal: "example.org",
Stuns: []domain.Domain{"google.com"},
Turns: []domain.Domain{"cloudflare.com"},
}
// Add initial domains
_, err := resolver.UpdateFromServerDomains(ctx, initialDomains)
if err != nil {
t.Skipf("Skipping test due to DNS resolution failure: %v", err)
}
// Verify domains were added
cachedDomains := resolver.GetCachedDomains()
assert.Len(t, cachedDomains, 3)
// Update with empty ServerDomains (simulating partial network map update)
emptyDomains := dnsconfig.ServerDomains{}
removedDomains, err := resolver.UpdateFromServerDomains(ctx, emptyDomains)
assert.NoError(t, err)
// Verify no domains were removed
assert.Len(t, removedDomains, 0, "No domains should be removed when update is empty")
// Verify all original domains are still cached
finalDomains := resolver.GetCachedDomains()
assert.Len(t, finalDomains, 3, "All original domains should still be cached")
}
func TestResolver_PartialUpdateReplacesOnlyUpdatedTypes(t *testing.T) {
resolver := NewResolver()
ctx := context.Background()
// Set up initial complete domains using resolvable domains
initialDomains := dnsconfig.ServerDomains{
Signal: "example.org",
Stuns: []domain.Domain{"google.com"},
Turns: []domain.Domain{"cloudflare.com"},
}
// Add initial domains
_, err := resolver.UpdateFromServerDomains(ctx, initialDomains)
if err != nil {
t.Skipf("Skipping test due to DNS resolution failure: %v", err)
}
assert.Len(t, resolver.GetCachedDomains(), 3)
// Update with partial ServerDomains (only signal domain - this should replace signal but preserve stun/turn)
partialDomains := dnsconfig.ServerDomains{
Signal: "github.com",
}
removedDomains, err := resolver.UpdateFromServerDomains(ctx, partialDomains)
if err != nil {
t.Skipf("Skipping test due to DNS resolution failure: %v", err)
}
// Should remove only the old signal domain
assert.Len(t, removedDomains, 1, "Should remove only the old signal domain")
assert.Equal(t, "example.org", removedDomains[0].SafeString())
finalDomains := resolver.GetCachedDomains()
assert.Len(t, finalDomains, 3, "Should have new signal plus preserved stun/turn domains")
domainStrings := make([]string, len(finalDomains))
for i, d := range finalDomains {
domainStrings[i] = d.SafeString()
}
assert.Contains(t, domainStrings, "github.com")
assert.Contains(t, domainStrings, "google.com")
assert.Contains(t, domainStrings, "cloudflare.com")
assert.NotContains(t, domainStrings, "example.org")
}
func TestResolver_PartialUpdateAddsNewTypePreservesExisting(t *testing.T) {
resolver := NewResolver()
ctx := context.Background()
// Set up initial complete domains using resolvable domains
initialDomains := dnsconfig.ServerDomains{
Signal: "example.org",
Stuns: []domain.Domain{"google.com"},
Turns: []domain.Domain{"cloudflare.com"},
}
// Add initial domains
_, err := resolver.UpdateFromServerDomains(ctx, initialDomains)
if err != nil {
t.Skipf("Skipping test due to DNS resolution failure: %v", err)
}
assert.Len(t, resolver.GetCachedDomains(), 3)
// Update with partial ServerDomains (only flow domain - new type, should preserve all existing)
partialDomains := dnsconfig.ServerDomains{
Flow: "github.com",
}
removedDomains, err := resolver.UpdateFromServerDomains(ctx, partialDomains)
if err != nil {
t.Skipf("Skipping test due to DNS resolution failure: %v", err)
}
assert.Len(t, removedDomains, 0, "Should not remove any domains when adding new type")
finalDomains := resolver.GetCachedDomains()
assert.Len(t, finalDomains, 4, "Should have all original domains plus new flow domain")
domainStrings := make([]string, len(finalDomains))
for i, d := range finalDomains {
domainStrings[i] = d.SafeString()
}
assert.Contains(t, domainStrings, "example.org")
assert.Contains(t, domainStrings, "google.com")
assert.Contains(t, domainStrings, "cloudflare.com")
assert.Contains(t, domainStrings, "github.com")
}

View File

@@ -3,20 +3,23 @@ package dns
import (
"fmt"
"net/netip"
"net/url"
"github.com/miekg/dns"
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
nbdns "github.com/netbirdio/netbird/dns"
"github.com/netbirdio/netbird/shared/management/domain"
)
// MockServer is the mock instance of a dns server
type MockServer struct {
InitializeFunc func() error
StopFunc func()
UpdateDNSServerFunc func(serial uint64, update nbdns.Config) error
RegisterHandlerFunc func(domain.List, dns.Handler, int)
DeregisterHandlerFunc func(domain.List, int)
InitializeFunc func() error
StopFunc func()
UpdateDNSServerFunc func(serial uint64, update nbdns.Config) error
RegisterHandlerFunc func(domain.List, dns.Handler, int)
DeregisterHandlerFunc func(domain.List, int)
UpdateServerConfigFunc func(domains dnsconfig.ServerDomains) error
}
func (m *MockServer) RegisterHandler(domains domain.List, handler dns.Handler, priority int) {
@@ -70,3 +73,14 @@ func (m *MockServer) SearchDomains() []string {
// ProbeAvailability mocks implementation of ProbeAvailability from the Server interface
func (m *MockServer) ProbeAvailability() {
}
func (m *MockServer) UpdateServerConfig(domains dnsconfig.ServerDomains) error {
if m.UpdateServerConfigFunc != nil {
return m.UpdateServerConfigFunc(domains)
}
return nil
}
func (m *MockServer) PopulateManagementDomain(mgmtURL *url.URL) error {
return nil
}

View File

@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/netip"
"net/url"
"runtime"
"strings"
"sync"
@@ -15,7 +16,9 @@ import (
"golang.org/x/exp/maps"
"github.com/netbirdio/netbird/client/iface/netstack"
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
"github.com/netbirdio/netbird/client/internal/dns/local"
"github.com/netbirdio/netbird/client/internal/dns/mgmt"
"github.com/netbirdio/netbird/client/internal/dns/types"
"github.com/netbirdio/netbird/client/internal/listener"
"github.com/netbirdio/netbird/client/internal/peer"
@@ -45,6 +48,8 @@ type Server interface {
OnUpdatedHostDNSServer(addrs []netip.AddrPort)
SearchDomains() []string
ProbeAvailability()
UpdateServerConfig(domains dnsconfig.ServerDomains) error
PopulateManagementDomain(mgmtURL *url.URL) error
}
type nsGroupsByDomain struct {
@@ -77,6 +82,8 @@ type DefaultServer struct {
handlerChain *HandlerChain
extraDomains map[domain.Domain]int
mgmtCacheResolver *mgmt.Resolver
// permanent related properties
permanent bool
hostsDNSHolder *hostsDNSHolder
@@ -104,18 +111,20 @@ type handlerWrapper struct {
type registeredHandlerMap map[types.HandlerID]handlerWrapper
// DefaultServerConfig holds configuration parameters for NewDefaultServer
type DefaultServerConfig struct {
WgInterface WGIface
CustomAddress string
StatusRecorder *peer.Status
StateManager *statemanager.Manager
DisableSys bool
}
// NewDefaultServer returns a new dns server
func NewDefaultServer(
ctx context.Context,
wgInterface WGIface,
customAddress string,
statusRecorder *peer.Status,
stateManager *statemanager.Manager,
disableSys bool,
) (*DefaultServer, error) {
func NewDefaultServer(ctx context.Context, config DefaultServerConfig) (*DefaultServer, error) {
var addrPort *netip.AddrPort
if customAddress != "" {
parsedAddrPort, err := netip.ParseAddrPort(customAddress)
if config.CustomAddress != "" {
parsedAddrPort, err := netip.ParseAddrPort(config.CustomAddress)
if err != nil {
return nil, fmt.Errorf("unable to parse the custom dns address, got error: %s", err)
}
@@ -123,13 +132,14 @@ func NewDefaultServer(
}
var dnsService service
if wgInterface.IsUserspaceBind() {
dnsService = NewServiceViaMemory(wgInterface)
if config.WgInterface.IsUserspaceBind() {
dnsService = NewServiceViaMemory(config.WgInterface)
} else {
dnsService = newServiceViaListener(wgInterface, addrPort)
dnsService = newServiceViaListener(config.WgInterface, addrPort)
}
return newDefaultServer(ctx, wgInterface, dnsService, statusRecorder, stateManager, disableSys), nil
server := newDefaultServer(ctx, config.WgInterface, dnsService, config.StatusRecorder, config.StateManager, config.DisableSys)
return server, nil
}
// NewDefaultServerPermanentUpstream returns a new dns server. It optimized for mobile systems
@@ -178,20 +188,24 @@ func newDefaultServer(
) *DefaultServer {
handlerChain := NewHandlerChain()
ctx, stop := context.WithCancel(ctx)
mgmtCacheResolver := mgmt.NewResolver()
defaultServer := &DefaultServer{
ctx: ctx,
ctxCancel: stop,
disableSys: disableSys,
service: dnsService,
handlerChain: handlerChain,
extraDomains: make(map[domain.Domain]int),
dnsMuxMap: make(registeredHandlerMap),
localResolver: local.NewResolver(),
wgInterface: wgInterface,
statusRecorder: statusRecorder,
stateManager: stateManager,
hostsDNSHolder: newHostsDNSHolder(),
hostManager: &noopHostConfigurator{},
ctx: ctx,
ctxCancel: stop,
disableSys: disableSys,
service: dnsService,
handlerChain: handlerChain,
extraDomains: make(map[domain.Domain]int),
dnsMuxMap: make(registeredHandlerMap),
localResolver: local.NewResolver(),
wgInterface: wgInterface,
statusRecorder: statusRecorder,
stateManager: stateManager,
hostsDNSHolder: newHostsDNSHolder(),
hostManager: &noopHostConfigurator{},
mgmtCacheResolver: mgmtCacheResolver,
}
// register with root zone, handler chain takes care of the routing
@@ -217,7 +231,7 @@ func (s *DefaultServer) RegisterHandler(domains domain.List, handler dns.Handler
}
func (s *DefaultServer) registerHandler(domains []string, handler dns.Handler, priority int) {
log.Debugf("registering handler %s with priority %d", handler, priority)
log.Debugf("registering handler %s with priority %d for %v", handler, priority, domains)
for _, domain := range domains {
if domain == "" {
@@ -246,7 +260,7 @@ func (s *DefaultServer) DeregisterHandler(domains domain.List, priority int) {
}
func (s *DefaultServer) deregisterHandler(domains []string, priority int) {
log.Debugf("deregistering handler %v with priority %d", domains, priority)
log.Debugf("deregistering handler with priority %d for %v", priority, domains)
for _, domain := range domains {
if domain == "" {
@@ -432,6 +446,29 @@ func (s *DefaultServer) ProbeAvailability() {
wg.Wait()
}
func (s *DefaultServer) UpdateServerConfig(domains dnsconfig.ServerDomains) error {
s.mux.Lock()
defer s.mux.Unlock()
if s.mgmtCacheResolver != nil {
removedDomains, err := s.mgmtCacheResolver.UpdateFromServerDomains(s.ctx, domains)
if err != nil {
return fmt.Errorf("update management cache resolver: %w", err)
}
if len(removedDomains) > 0 {
s.deregisterHandler(removedDomains.ToPunycodeList(), PriorityMgmtCache)
}
newDomains := s.mgmtCacheResolver.GetCachedDomains()
if len(newDomains) > 0 {
s.registerHandler(newDomains.ToPunycodeList(), s.mgmtCacheResolver, PriorityMgmtCache)
}
}
return nil
}
func (s *DefaultServer) applyConfiguration(update nbdns.Config) error {
// is the service should be Disabled, we stop the listener or fake resolver
if update.ServiceEnable {
@@ -961,3 +998,11 @@ func toZone(d domain.Domain) domain.Domain {
),
)
}
// PopulateManagementDomain populates the DNS cache with management domain
func (s *DefaultServer) PopulateManagementDomain(mgmtURL *url.URL) error {
if s.mgmtCacheResolver != nil {
return s.mgmtCacheResolver.PopulateFromConfig(s.ctx, mgmtURL)
}
return nil
}

View File

@@ -363,7 +363,13 @@ func TestUpdateDNSServer(t *testing.T) {
t.Log(err)
}
}()
dnsServer, err := NewDefaultServer(context.Background(), wgIface, "", peer.NewRecorder("mgm"), nil, false)
dnsServer, err := NewDefaultServer(context.Background(), DefaultServerConfig{
WgInterface: wgIface,
CustomAddress: "",
StatusRecorder: peer.NewRecorder("mgm"),
StateManager: nil,
DisableSys: false,
})
if err != nil {
t.Fatal(err)
}
@@ -473,7 +479,13 @@ func TestDNSFakeResolverHandleUpdates(t *testing.T) {
return
}
dnsServer, err := NewDefaultServer(context.Background(), wgIface, "", peer.NewRecorder("mgm"), nil, false)
dnsServer, err := NewDefaultServer(context.Background(), DefaultServerConfig{
WgInterface: wgIface,
CustomAddress: "",
StatusRecorder: peer.NewRecorder("mgm"),
StateManager: nil,
DisableSys: false,
})
if err != nil {
t.Errorf("create DNS server: %v", err)
return
@@ -575,7 +587,13 @@ func TestDNSServerStartStop(t *testing.T) {
for _, testCase := range testCases {
t.Run(testCase.name, func(t *testing.T) {
dnsServer, err := NewDefaultServer(context.Background(), &mocWGIface{}, testCase.addrPort, peer.NewRecorder("mgm"), nil, false)
dnsServer, err := NewDefaultServer(context.Background(), DefaultServerConfig{
WgInterface: &mocWGIface{},
CustomAddress: testCase.addrPort,
StatusRecorder: peer.NewRecorder("mgm"),
StateManager: nil,
DisableSys: false,
})
if err != nil {
t.Fatalf("%v", err)
}

View File

@@ -33,9 +33,11 @@ func SetCurrentMTU(mtu uint16) {
}
const (
UpstreamTimeout = 15 * time.Second
UpstreamTimeout = 4 * time.Second
// ClientTimeout is the timeout for the dns.Client.
// Set longer than UpstreamTimeout to ensure context timeout takes precedence
ClientTimeout = 5 * time.Second
failsTillDeact = int32(5)
reactivatePeriod = 30 * time.Second
probeTimeout = 2 * time.Second
)
@@ -58,9 +60,7 @@ type upstreamResolverBase struct {
upstreamServers []netip.AddrPort
domain string
disabled bool
failsCount atomic.Int32
successCount atomic.Int32
failsTillDeact int32
mutex sync.Mutex
reactivatePeriod time.Duration
upstreamTimeout time.Duration
@@ -79,14 +79,13 @@ func newUpstreamResolverBase(ctx context.Context, statusRecorder *peer.Status, d
domain: domain,
upstreamTimeout: UpstreamTimeout,
reactivatePeriod: reactivatePeriod,
failsTillDeact: failsTillDeact,
statusRecorder: statusRecorder,
}
}
// String returns a string representation of the upstream resolver
func (u *upstreamResolverBase) String() string {
return fmt.Sprintf("upstream %s", u.upstreamServers)
return fmt.Sprintf("Upstream %s", u.upstreamServers)
}
// ID returns the unique handler ID
@@ -116,58 +115,102 @@ func (u *upstreamResolverBase) Stop() {
func (u *upstreamResolverBase) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
requestID := GenerateRequestID()
logger := log.WithField("request_id", requestID)
var err error
defer func() {
u.checkUpstreamFails(err)
}()
logger.Tracef("received upstream question: domain=%s type=%v class=%v", r.Question[0].Name, r.Question[0].Qtype, r.Question[0].Qclass)
u.prepareRequest(r)
if u.ctx.Err() != nil {
logger.Tracef("%s has been stopped", u)
return
}
if u.tryUpstreamServers(w, r, logger) {
return
}
u.writeErrorResponse(w, r, logger)
}
func (u *upstreamResolverBase) prepareRequest(r *dns.Msg) {
if r.Extra == nil {
r.MsgHdr.AuthenticatedData = true
}
}
select {
case <-u.ctx.Done():
logger.Tracef("%s has been stopped", u)
return
default:
func (u *upstreamResolverBase) tryUpstreamServers(w dns.ResponseWriter, r *dns.Msg, logger *log.Entry) bool {
timeout := u.upstreamTimeout
if len(u.upstreamServers) > 1 {
maxTotal := 5 * time.Second
minPerUpstream := 2 * time.Second
scaledTimeout := maxTotal / time.Duration(len(u.upstreamServers))
if scaledTimeout > minPerUpstream {
timeout = scaledTimeout
} else {
timeout = minPerUpstream
}
}
for _, upstream := range u.upstreamServers {
var rm *dns.Msg
var t time.Duration
func() {
ctx, cancel := context.WithTimeout(u.ctx, u.upstreamTimeout)
defer cancel()
rm, t, err = u.upstreamClient.exchange(ctx, upstream.String(), r)
}()
if err != nil {
if errors.Is(err, context.DeadlineExceeded) || isTimeout(err) {
logger.Warnf("upstream %s timed out for question domain=%s", upstream, r.Question[0].Name)
continue
}
logger.Warnf("failed to query upstream %s for question domain=%s: %s", upstream, r.Question[0].Name, err)
continue
if u.queryUpstream(w, r, upstream, timeout, logger) {
return true
}
}
return false
}
if rm == nil || !rm.Response {
logger.Warnf("no response from upstream %s for question domain=%s", upstream, r.Question[0].Name)
continue
}
func (u *upstreamResolverBase) queryUpstream(w dns.ResponseWriter, r *dns.Msg, upstream netip.AddrPort, timeout time.Duration, logger *log.Entry) bool {
var rm *dns.Msg
var t time.Duration
var err error
u.successCount.Add(1)
logger.Tracef("took %s to query the upstream %s for question domain=%s", t, upstream, r.Question[0].Name)
var startTime time.Time
func() {
ctx, cancel := context.WithTimeout(u.ctx, timeout)
defer cancel()
startTime = time.Now()
rm, t, err = u.upstreamClient.exchange(ctx, upstream.String(), r)
}()
if err = w.WriteMsg(rm); err != nil {
logger.Errorf("failed to write DNS response for question domain=%s: %s", r.Question[0].Name, err)
}
// count the fails only if they happen sequentially
u.failsCount.Store(0)
if err != nil {
u.handleUpstreamError(err, upstream, r.Question[0].Name, startTime, timeout, logger)
return false
}
if rm == nil || !rm.Response {
logger.Warnf("no response from upstream %s for question domain=%s", upstream, r.Question[0].Name)
return false
}
return u.writeSuccessResponse(w, rm, upstream, r.Question[0].Name, t, logger)
}
func (u *upstreamResolverBase) handleUpstreamError(err error, upstream netip.AddrPort, domain string, startTime time.Time, timeout time.Duration, logger *log.Entry) {
if !errors.Is(err, context.DeadlineExceeded) && !isTimeout(err) {
logger.Warnf("failed to query upstream %s for question domain=%s: %s", upstream, domain, err)
return
}
u.failsCount.Add(1)
elapsed := time.Since(startTime)
timeoutMsg := fmt.Sprintf("upstream %s timed out for question domain=%s after %v (timeout=%v)", upstream, domain, elapsed.Truncate(time.Millisecond), timeout)
if peerInfo := u.debugUpstreamTimeout(upstream); peerInfo != "" {
timeoutMsg += " " + peerInfo
}
timeoutMsg += fmt.Sprintf(" - error: %v", err)
logger.Warnf(timeoutMsg)
}
func (u *upstreamResolverBase) writeSuccessResponse(w dns.ResponseWriter, rm *dns.Msg, upstream netip.AddrPort, domain string, t time.Duration, logger *log.Entry) bool {
u.successCount.Add(1)
logger.Tracef("took %s to query the upstream %s for question domain=%s", t, upstream, domain)
if err := w.WriteMsg(rm); err != nil {
logger.Errorf("failed to write DNS response for question domain=%s: %s", domain, err)
}
return true
}
func (u *upstreamResolverBase) writeErrorResponse(w dns.ResponseWriter, r *dns.Msg, logger *log.Entry) {
logger.Errorf("all queries to the %s failed for question domain=%s", u, r.Question[0].Name)
m := new(dns.Msg)
@@ -177,41 +220,6 @@ func (u *upstreamResolverBase) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
}
}
// checkUpstreamFails counts fails and disables or enables upstream resolving
//
// If fails count is greater that failsTillDeact, upstream resolving
// will be disabled for reactivatePeriod, after that time period fails counter
// will be reset and upstream will be reactivated.
func (u *upstreamResolverBase) checkUpstreamFails(err error) {
u.mutex.Lock()
defer u.mutex.Unlock()
if u.failsCount.Load() < u.failsTillDeact || u.disabled {
return
}
select {
case <-u.ctx.Done():
return
default:
}
u.disable(err)
if u.statusRecorder == nil {
return
}
u.statusRecorder.PublishEvent(
proto.SystemEvent_WARNING,
proto.SystemEvent_DNS,
"All upstream servers failed (fail count exceeded)",
"Unable to reach one or more DNS servers. This might affect your ability to connect to some services.",
map[string]string{"upstreams": u.upstreamServersString()},
// TODO add domain meta
)
}
// ProbeAvailability tests all upstream servers simultaneously and
// disables the resolver if none work
func (u *upstreamResolverBase) ProbeAvailability() {
@@ -224,8 +232,8 @@ func (u *upstreamResolverBase) ProbeAvailability() {
default:
}
// avoid probe if upstreams could resolve at least one query and fails count is less than failsTillDeact
if u.successCount.Load() > 0 && u.failsCount.Load() < u.failsTillDeact {
// avoid probe if upstreams could resolve at least one query
if u.successCount.Load() > 0 {
return
}
@@ -312,7 +320,6 @@ func (u *upstreamResolverBase) waitUntilResponse() {
}
log.Infof("upstreams %s are responsive again. Adding them back to system", u.upstreamServersString())
u.failsCount.Store(0)
u.successCount.Add(1)
u.reactivate()
u.disabled = false
@@ -416,3 +423,80 @@ func GenerateRequestID() string {
}
return hex.EncodeToString(bytes)
}
// FormatPeerStatus formats peer connection status information for debugging DNS timeouts
func FormatPeerStatus(peerState *peer.State) string {
isConnected := peerState.ConnStatus == peer.StatusConnected
hasRecentHandshake := !peerState.LastWireguardHandshake.IsZero() &&
time.Since(peerState.LastWireguardHandshake) < 3*time.Minute
statusInfo := fmt.Sprintf("%s:%s", peerState.FQDN, peerState.IP)
switch {
case !isConnected:
statusInfo += " DISCONNECTED"
case !hasRecentHandshake:
statusInfo += " NO_RECENT_HANDSHAKE"
default:
statusInfo += " connected"
}
if !peerState.LastWireguardHandshake.IsZero() {
timeSinceHandshake := time.Since(peerState.LastWireguardHandshake)
statusInfo += fmt.Sprintf(" last_handshake=%v_ago", timeSinceHandshake.Truncate(time.Second))
} else {
statusInfo += " no_handshake"
}
if peerState.Relayed {
statusInfo += " via_relay"
}
if peerState.Latency > 0 {
statusInfo += fmt.Sprintf(" latency=%v", peerState.Latency)
}
return statusInfo
}
// findPeerForIP finds which peer handles the given IP address
func findPeerForIP(ip netip.Addr, statusRecorder *peer.Status) *peer.State {
if statusRecorder == nil {
return nil
}
fullStatus := statusRecorder.GetFullStatus()
var bestMatch *peer.State
var bestPrefixLen int
for _, peerState := range fullStatus.Peers {
routes := peerState.GetRoutes()
for route := range routes {
prefix, err := netip.ParsePrefix(route)
if err != nil {
continue
}
if prefix.Contains(ip) && prefix.Bits() > bestPrefixLen {
peerStateCopy := peerState
bestMatch = &peerStateCopy
bestPrefixLen = prefix.Bits()
}
}
}
return bestMatch
}
func (u *upstreamResolverBase) debugUpstreamTimeout(upstream netip.AddrPort) string {
if u.statusRecorder == nil {
return ""
}
peerInfo := findPeerForIP(upstream.Addr(), u.statusRecorder)
if peerInfo == nil {
return ""
}
return fmt.Sprintf("(routes through NetBird peer %s)", FormatPeerStatus(peerInfo))
}

View File

@@ -50,7 +50,9 @@ func (u *upstreamResolver) exchange(ctx context.Context, upstream string, r *dns
}
func (u *upstreamResolver) exchangeWithinVPN(ctx context.Context, upstream string, r *dns.Msg) (rm *dns.Msg, t time.Duration, err error) {
upstreamExchangeClient := &dns.Client{}
upstreamExchangeClient := &dns.Client{
Timeout: ClientTimeout,
}
return upstreamExchangeClient.ExchangeContext(ctx, r, upstream)
}
@@ -72,10 +74,11 @@ func (u *upstreamResolver) exchangeWithoutVPN(ctx context.Context, upstream stri
}
upstreamExchangeClient := &dns.Client{
Dialer: dialer,
Dialer: dialer,
Timeout: timeout,
}
return upstreamExchangeClient.Exchange(r, upstream)
return upstreamExchangeClient.ExchangeContext(ctx, r, upstream)
}
func (u *upstreamResolver) isLocalResolver(upstream string) bool {

View File

@@ -34,7 +34,10 @@ func newUpstreamResolver(
}
func (u *upstreamResolver) exchange(ctx context.Context, upstream string, r *dns.Msg) (rm *dns.Msg, t time.Duration, err error) {
return ExchangeWithFallback(ctx, &dns.Client{}, r, upstream)
client := &dns.Client{
Timeout: ClientTimeout,
}
return ExchangeWithFallback(ctx, client, r, upstream)
}
func GetClientPrivate(ip netip.Addr, interfaceName string, dialTimeout time.Duration) (*dns.Client, error) {

View File

@@ -47,7 +47,9 @@ func newUpstreamResolver(
}
func (u *upstreamResolverIOS) exchange(ctx context.Context, upstream string, r *dns.Msg) (rm *dns.Msg, t time.Duration, err error) {
client := &dns.Client{}
client := &dns.Client{
Timeout: ClientTimeout,
}
upstreamHost, _, err := net.SplitHostPort(upstream)
if err != nil {
return nil, 0, fmt.Errorf("error while parsing upstream host: %s", err)
@@ -110,7 +112,8 @@ func GetClientPrivate(ip netip.Addr, interfaceName string, dialTimeout time.Dura
},
}
client := &dns.Client{
Dialer: dialer,
Dialer: dialer,
Timeout: dialTimeout,
}
return client, nil
}

View File

@@ -124,29 +124,26 @@ func (c mockUpstreamResolver) exchange(_ context.Context, _ string, _ *dns.Msg)
}
func TestUpstreamResolver_DeactivationReactivation(t *testing.T) {
mockClient := &mockUpstreamResolver{
err: dns.ErrTime,
r: new(dns.Msg),
rtt: time.Millisecond,
}
resolver := &upstreamResolverBase{
ctx: context.TODO(),
upstreamClient: &mockUpstreamResolver{
err: nil,
r: new(dns.Msg),
rtt: time.Millisecond,
},
ctx: context.TODO(),
upstreamClient: mockClient,
upstreamTimeout: UpstreamTimeout,
reactivatePeriod: reactivatePeriod,
failsTillDeact: failsTillDeact,
reactivatePeriod: time.Microsecond * 100,
}
addrPort, _ := netip.ParseAddrPort("0.0.0.0:1") // Use valid port for parsing, test will still fail on connection
resolver.upstreamServers = []netip.AddrPort{netip.AddrPortFrom(addrPort.Addr().Unmap(), addrPort.Port())}
resolver.failsTillDeact = 0
resolver.reactivatePeriod = time.Microsecond * 100
responseWriter := &test.MockResponseWriter{
WriteMsgFunc: func(m *dns.Msg) error { return nil },
}
failed := false
resolver.deactivate = func(error) {
failed = true
// After deactivation, make the mock client work again
mockClient.err = nil
}
reactivated := false
@@ -154,7 +151,7 @@ func TestUpstreamResolver_DeactivationReactivation(t *testing.T) {
reactivated = true
}
resolver.ServeDNS(responseWriter, new(dns.Msg).SetQuestion("one.one.one.one.", dns.TypeA))
resolver.ProbeAvailability()
if !failed {
t.Errorf("expected that resolving was deactivated")
@@ -173,11 +170,6 @@ func TestUpstreamResolver_DeactivationReactivation(t *testing.T) {
return
}
if resolver.failsCount.Load() != 0 {
t.Errorf("fails count after reactivation should be 0")
return
}
if resolver.disabled {
t.Errorf("should be enabled")
}

View File

@@ -7,6 +7,7 @@ import (
"math/rand"
"net"
"net/netip"
"net/url"
"os"
"reflect"
"runtime"
@@ -17,8 +18,8 @@ import (
"time"
"github.com/hashicorp/go-multierror"
"github.com/pion/ice/v3"
"github.com/pion/stun/v2"
"github.com/pion/ice/v4"
"github.com/pion/stun/v3"
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/tun/netstack"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
@@ -33,6 +34,7 @@ import (
nbnetstack "github.com/netbirdio/netbird/client/iface/netstack"
"github.com/netbirdio/netbird/client/internal/acl"
"github.com/netbirdio/netbird/client/internal/dns"
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
"github.com/netbirdio/netbird/client/internal/dnsfwd"
"github.com/netbirdio/netbird/client/internal/ingressgw"
"github.com/netbirdio/netbird/client/internal/netflow"
@@ -345,7 +347,7 @@ func (e *Engine) Stop() error {
// Start creates a new WireGuard tunnel interface and listens to events from Signal and Management services
// Connections to remote peers are not established here.
// However, they will be established once an event with a list of peers to connect to will be received from Management Service
func (e *Engine) Start() error {
func (e *Engine) Start(netbirdConfig *mgmProto.NetbirdConfig, mgmtURL *url.URL) error {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
@@ -401,6 +403,11 @@ func (e *Engine) Start() error {
}
e.dnsServer = dnsServer
// Populate DNS cache with NetbirdConfig and management URL for early resolution
if err := e.PopulateNetbirdConfig(netbirdConfig, mgmtURL); err != nil {
log.Warnf("failed to populate DNS cache: %v", err)
}
e.routeManager = routemanager.NewManager(routemanager.ManagerConfig{
Context: e.ctx,
PublicKey: e.config.WgPrivateKey.PublicKey().String(),
@@ -661,6 +668,30 @@ func (e *Engine) removePeer(peerKey string) error {
return nil
}
// PopulateNetbirdConfig populates the DNS cache with infrastructure domains from login response
func (e *Engine) PopulateNetbirdConfig(netbirdConfig *mgmProto.NetbirdConfig, mgmtURL *url.URL) error {
if e.dnsServer == nil {
return nil
}
// Populate management URL if provided
if mgmtURL != nil {
if err := e.dnsServer.PopulateManagementDomain(mgmtURL); err != nil {
log.Warnf("failed to populate DNS cache with management URL: %v", err)
}
}
// Populate NetbirdConfig domains if provided
if netbirdConfig != nil {
serverDomains := dnsconfig.ExtractFromNetbirdConfig(netbirdConfig)
if err := e.dnsServer.UpdateServerConfig(serverDomains); err != nil {
return fmt.Errorf("update DNS server config from NetbirdConfig: %w", err)
}
}
return nil
}
func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
@@ -692,6 +723,10 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
return fmt.Errorf("handle the flow configuration: %w", err)
}
if err := e.PopulateNetbirdConfig(wCfg, nil); err != nil {
log.Warnf("Failed to update DNS server config: %v", err)
}
// todo update signal
}
@@ -914,7 +949,6 @@ func (e *Engine) receiveManagementEvents() {
e.config.LazyConnectionEnabled,
)
// err = e.mgmClient.Sync(info, e.handleSync)
err = e.mgmClient.Sync(e.ctx, info, e.handleSync)
if err != nil {
// happens if management is unavailable for a long time.
@@ -925,7 +959,7 @@ func (e *Engine) receiveManagementEvents() {
}
log.Debugf("stopped receiving updates from Management Service")
}()
log.Debugf("connecting to Management Service updates stream")
log.Infof("connecting to Management Service updates stream")
}
func (e *Engine) updateSTUNs(stuns []*mgmProto.HostConfig) error {
@@ -1557,7 +1591,14 @@ func (e *Engine) newDnsServer(dnsConfig *nbdns.Config) (dns.Server, error) {
return dnsServer, nil
default:
dnsServer, err := dns.NewDefaultServer(e.ctx, e.wgInterface, e.config.CustomDNSAddress, e.statusRecorder, e.stateManager, e.config.DisableDNS)
dnsServer, err := dns.NewDefaultServer(e.ctx, dns.DefaultServerConfig{
WgInterface: e.wgInterface,
CustomAddress: e.config.CustomDNSAddress,
StatusRecorder: e.statusRecorder,
StateManager: e.stateManager,
DisableSys: e.config.DisableDNS,
})
if err != nil {
return nil, err
}

View File

@@ -266,7 +266,7 @@ func TestEngine_SSH(t *testing.T) {
},
}, nil
}
err = engine.Start()
err = engine.Start(nil, nil)
if err != nil {
t.Fatal(err)
}
@@ -612,7 +612,7 @@ func TestEngine_Sync(t *testing.T) {
}
}()
err = engine.Start()
err = engine.Start(nil, nil)
if err != nil {
t.Fatal(err)
return
@@ -1069,7 +1069,7 @@ func TestEngine_MultiplePeers(t *testing.T) {
defer mu.Unlock()
guid := fmt.Sprintf("{%s}", uuid.New().String())
device.CustomWindowsGUIDString = strings.ToLower(guid)
err = engine.Start()
err = engine.Start(nil, nil)
if err != nil {
t.Errorf("unable to start engine for peer %d with error %v", j, err)
wg.Done()

View File

@@ -40,7 +40,7 @@ func IsLoginRequired(ctx context.Context, config *profilemanager.Config) (bool,
return false, err
}
_, err = doMgmLogin(ctx, mgmClient, pubSSHKey, config)
_, _, err = doMgmLogin(ctx, mgmClient, pubSSHKey, config)
if isLoginNeeded(err) {
return true, nil
}
@@ -69,14 +69,18 @@ func Login(ctx context.Context, config *profilemanager.Config, setupKey string,
return err
}
serverKey, err := doMgmLogin(ctx, mgmClient, pubSSHKey, config)
serverKey, _, err := doMgmLogin(ctx, mgmClient, pubSSHKey, config)
if serverKey != nil && isRegistrationNeeded(err) {
log.Debugf("peer registration required")
_, err = registerPeer(ctx, *serverKey, mgmClient, setupKey, jwtToken, pubSSHKey, config)
if err != nil {
return err
}
} else if err != nil {
return err
}
return err
return nil
}
func getMgmClient(ctx context.Context, privateKey string, mgmURL *url.URL) (*mgm.GrpcClient, error) {
@@ -101,11 +105,11 @@ func getMgmClient(ctx context.Context, privateKey string, mgmURL *url.URL) (*mgm
return mgmClient, err
}
func doMgmLogin(ctx context.Context, mgmClient *mgm.GrpcClient, pubSSHKey []byte, config *profilemanager.Config) (*wgtypes.Key, error) {
func doMgmLogin(ctx context.Context, mgmClient *mgm.GrpcClient, pubSSHKey []byte, config *profilemanager.Config) (*wgtypes.Key, *mgmProto.LoginResponse, error) {
serverKey, err := mgmClient.GetServerPublicKey()
if err != nil {
log.Errorf("failed while getting Management Service public key: %v", err)
return nil, err
return nil, nil, err
}
sysInfo := system.GetInfo(ctx)
@@ -121,8 +125,8 @@ func doMgmLogin(ctx context.Context, mgmClient *mgm.GrpcClient, pubSSHKey []byte
config.BlockInbound,
config.LazyConnectionEnabled,
)
_, err = mgmClient.Login(*serverKey, sysInfo, pubSSHKey, config.DNSLabels)
return serverKey, err
loginResp, err := mgmClient.Login(*serverKey, sysInfo, pubSSHKey, config.DNSLabels)
return serverKey, loginResp, err
}
// registerPeer checks whether setupKey was provided via cmd line and if not then it prompts user to enter a key.

View File

@@ -11,7 +11,7 @@ import (
"sync"
"time"
"github.com/pion/ice/v3"
"github.com/pion/ice/v4"
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
@@ -376,12 +376,19 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
wgProxy.Work()
}
conn.Log.Infof("configure WireGuard endpoint to: %s", ep.String())
if err = conn.configureWGEndpoint(ep, iceConnInfo.RosenpassPubKey); err != nil {
conn.handleConfigurationFailure(err, wgProxy)
return
}
wgConfigWorkaround()
if conn.wgProxyRelay != nil {
conn.Log.Debugf("redirect packages from relayed conn to WireGuard")
conn.wgProxyRelay.RedirectAs(ep)
}
conn.currentConnPriority = priority
conn.statusICE.SetConnected()
conn.updateIceState(iceConnInfo)
@@ -419,6 +426,7 @@ func (conn *Conn) onICEStateDisconnected() {
defer conn.wgWatcherWg.Done()
conn.workerRelay.EnableWgWatcher(conn.ctx)
}()
conn.wgProxyRelay.Work()
conn.currentConnPriority = conntype.Relay
} else {
conn.Log.Infof("ICE disconnected, do not switch to Relay. Reset priority to: %s", conntype.None.String())

View File

@@ -6,7 +6,7 @@ import (
"sync"
"time"
"github.com/pion/ice/v3"
"github.com/pion/ice/v4"
log "github.com/sirupsen/logrus"
icemaker "github.com/netbirdio/netbird/client/internal/peer/ice"

View File

@@ -43,13 +43,6 @@ type OfferAnswer struct {
SessionID *ICESessionID
}
func (oa *OfferAnswer) SessionIDString() string {
if oa.SessionID == nil {
return "unknown"
}
return oa.SessionID.String()
}
type Handshaker struct {
mu sync.Mutex
log *log.Entry
@@ -57,7 +50,7 @@ type Handshaker struct {
signaler *Signaler
ice *WorkerICE
relay *WorkerRelay
onNewOfferListeners []func(*OfferAnswer)
onNewOfferListeners []*OfferListener
// remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection
remoteOffersCh chan OfferAnswer
@@ -78,7 +71,8 @@ func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *W
}
func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAnswer)) {
h.onNewOfferListeners = append(h.onNewOfferListeners, offer)
l := NewOfferListener(offer)
h.onNewOfferListeners = append(h.onNewOfferListeners, l)
}
func (h *Handshaker) Listen(ctx context.Context) {
@@ -91,13 +85,13 @@ func (h *Handshaker) Listen(ctx context.Context) {
continue
}
for _, listener := range h.onNewOfferListeners {
listener(&remoteOfferAnswer)
listener.Notify(&remoteOfferAnswer)
}
h.log.Infof("received offer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
case remoteOfferAnswer := <-h.remoteAnswerCh:
h.log.Infof("received answer, running version %s, remote WireGuard listen port %d, session id: %s", remoteOfferAnswer.Version, remoteOfferAnswer.WgListenPort, remoteOfferAnswer.SessionIDString())
for _, listener := range h.onNewOfferListeners {
listener(&remoteOfferAnswer)
listener.Notify(&remoteOfferAnswer)
}
case <-ctx.Done():
h.log.Infof("stop listening for remote offers and answers")

View File

@@ -0,0 +1,62 @@
package peer
import (
"sync"
)
type callbackFunc func(remoteOfferAnswer *OfferAnswer)
func (oa *OfferAnswer) SessionIDString() string {
if oa.SessionID == nil {
return "unknown"
}
return oa.SessionID.String()
}
type OfferListener struct {
fn callbackFunc
running bool
latest *OfferAnswer
mu sync.Mutex
}
func NewOfferListener(fn callbackFunc) *OfferListener {
return &OfferListener{
fn: fn,
}
}
func (o *OfferListener) Notify(remoteOfferAnswer *OfferAnswer) {
o.mu.Lock()
defer o.mu.Unlock()
// Store the latest offer
o.latest = remoteOfferAnswer
// If already running, the running goroutine will pick up this latest value
if o.running {
return
}
// Start processing
o.running = true
// Process in a goroutine to avoid blocking the caller
go func(remoteOfferAnswer *OfferAnswer) {
for {
o.fn(remoteOfferAnswer)
o.mu.Lock()
if o.latest == nil {
// No more work to do
o.running = false
o.mu.Unlock()
return
}
remoteOfferAnswer = o.latest
// Clear the latest to mark it as being processed
o.latest = nil
o.mu.Unlock()
}
}(remoteOfferAnswer)
}

View File

@@ -0,0 +1,39 @@
package peer
import (
"testing"
"time"
)
func Test_newOfferListener(t *testing.T) {
dummyOfferAnswer := &OfferAnswer{}
runChan := make(chan struct{}, 10)
longRunningFn := func(remoteOfferAnswer *OfferAnswer) {
time.Sleep(1 * time.Second)
runChan <- struct{}{}
}
hl := NewOfferListener(longRunningFn)
hl.Notify(dummyOfferAnswer)
hl.Notify(dummyOfferAnswer)
hl.Notify(dummyOfferAnswer)
// Wait for exactly 2 callbacks
for i := 0; i < 2; i++ {
select {
case <-runChan:
case <-time.After(3 * time.Second):
t.Fatal("Timeout waiting for callback")
}
}
// Verify no additional callbacks happen
select {
case <-runChan:
t.Fatal("Unexpected additional callback")
case <-time.After(100 * time.Millisecond):
t.Log("Correctly received exactly 2 callbacks")
}
}

View File

@@ -3,7 +3,7 @@ package ice
import (
"sync/atomic"
"github.com/pion/stun/v2"
"github.com/pion/stun/v3"
)
type StunTurn atomic.Value

View File

@@ -4,7 +4,7 @@ import (
"sync"
"time"
"github.com/pion/ice/v3"
"github.com/pion/ice/v4"
"github.com/pion/logging"
"github.com/pion/randutil"
log "github.com/sirupsen/logrus"

View File

@@ -1,7 +1,7 @@
package ice
import (
"github.com/pion/ice/v3"
"github.com/pion/ice/v4"
)
type Config struct {

View File

@@ -1,7 +1,7 @@
package peer
import (
"github.com/pion/ice/v3"
"github.com/pion/ice/v4"
log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"

View File

@@ -101,6 +101,10 @@ func (w *WGWatcher) periodicHandshakeCheck(ctx context.Context, ctxCancel contex
onDisconnectedFn()
return
}
if lastHandshake.IsZero() {
w.log.Infof("first wg handshake detected at: %s", handshake)
}
lastHandshake = *handshake
resetTime := time.Until(handshake.Add(checkPeriod))

View File

@@ -8,7 +8,7 @@ import (
"sync"
"time"
"github.com/pion/ice/v3"
"github.com/pion/ice/v4"
"github.com/pion/stun/v2"
log "github.com/sirupsen/logrus"
@@ -122,7 +122,6 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
w.log.Warnf("failed to close ICE agent: %s", err)
}
w.agent = nil
// todo consider to switch to Relay connection while establishing a new ICE connection
}
var preferredCandidateTypes []ice.CandidateType
@@ -214,10 +213,6 @@ func (w *WorkerICE) reCreateAgent(dialerCancel context.CancelFunc, candidates []
return nil, err
}
if err := agent.OnSuccessfulSelectedPairBindingResponse(w.onSuccessfulSelectedPairBindingResponse); err != nil {
return nil, fmt.Errorf("failed setting binding response callback: %w", err)
}
return agent, nil
}
@@ -253,6 +248,11 @@ func (w *WorkerICE) connect(ctx context.Context, agent *icemaker.ThreadSafeAgent
w.closeAgent(agent, w.agentDialerCancel)
return
}
if pair == nil {
w.log.Warnf("selected candidate pair is nil, cannot proceed")
w.closeAgent(agent, w.agentDialerCancel)
return
}
if !isRelayCandidate(pair.Local) {
// dynamically set remote WireGuard port if other side specified a different one from the default one
@@ -379,6 +379,27 @@ func (w *WorkerICE) onICECandidate(candidate ice.Candidate) {
func (w *WorkerICE) onICESelectedCandidatePair(c1 ice.Candidate, c2 ice.Candidate) {
w.log.Debugf("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", c1.String(), c2.String(),
w.config.Key)
w.muxAgent.Lock()
pair, err := w.agent.GetSelectedCandidatePair()
if err != nil {
w.log.Warnf("failed to get selected candidate pair: %s", err)
w.muxAgent.Unlock()
return
}
if pair == nil {
w.log.Warnf("selected candidate pair is nil, cannot proceed")
w.muxAgent.Unlock()
return
}
w.muxAgent.Unlock()
duration := time.Duration(pair.CurrentRoundTripTime() * float64(time.Second))
if err := w.statusRecorder.UpdateLatency(w.config.Key, duration); err != nil {
w.log.Debugf("failed to update latency for peer: %s", err)
return
}
}
func (w *WorkerICE) onConnectionStateChange(agent *icemaker.ThreadSafeAgent, dialerCancel context.CancelFunc) func(ice.ConnectionState) {
@@ -388,7 +409,10 @@ func (w *WorkerICE) onConnectionStateChange(agent *icemaker.ThreadSafeAgent, dia
case ice.ConnectionStateConnected:
w.lastKnownState = ice.ConnectionStateConnected
return
case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected:
case ice.ConnectionStateFailed, ice.ConnectionStateDisconnected, ice.ConnectionStateClosed:
// ice.ConnectionStateClosed happens when we recreate the agent. For the P2P to TURN switch important to
// notify the conn.onICEStateDisconnected changes to update the current used priority
if w.lastKnownState == ice.ConnectionStateConnected {
w.lastKnownState = ice.ConnectionStateDisconnected
w.conn.onICEStateDisconnected()
@@ -400,13 +424,6 @@ func (w *WorkerICE) onConnectionStateChange(agent *icemaker.ThreadSafeAgent, dia
}
}
func (w *WorkerICE) onSuccessfulSelectedPairBindingResponse(pair *ice.CandidatePair) {
if err := w.statusRecorder.UpdateLatency(w.config.Key, pair.Latency()); err != nil {
w.log.Debugf("failed to update latency for peer: %s", err)
return
}
}
func (w *WorkerICE) shouldSendExtraSrflxCandidate(candidate ice.Candidate) bool {
if !w.sentExtraSrflx && candidate.Type() == ice.CandidateTypeServerReflexive && candidate.Port() != candidate.RelatedAddress().Port {
return true
@@ -425,7 +442,7 @@ func (w *WorkerICE) turnAgentDial(ctx context.Context, agent *icemaker.ThreadSaf
func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive, error) {
relatedAdd := candidate.RelatedAddress()
return ice.NewCandidateServerReflexive(&ice.CandidateServerReflexiveConfig{
ec, err := ice.NewCandidateServerReflexive(&ice.CandidateServerReflexiveConfig{
Network: candidate.NetworkType().String(),
Address: candidate.Address(),
Port: relatedAdd.Port,
@@ -433,6 +450,17 @@ func extraSrflxCandidate(candidate ice.Candidate) (*ice.CandidateServerReflexive
RelAddr: relatedAdd.Address,
RelPort: relatedAdd.Port,
})
if err != nil {
return nil, err
}
for _, e := range candidate.Extensions() {
if err := ec.AddExtension(e); err != nil {
return nil, err
}
}
return ec, nil
}
func candidateViaRoutes(candidate ice.Candidate, clientRoutes route.HAMap) bool {

View File

@@ -7,7 +7,7 @@ import (
"sync"
"time"
"github.com/pion/stun/v2"
"github.com/pion/stun/v3"
"github.com/pion/turn/v3"
log "github.com/sirupsen/logrus"

View File

@@ -2,11 +2,13 @@ package dnsinterceptor
import (
"context"
"errors"
"fmt"
"net/netip"
"runtime"
"strings"
"sync"
"time"
"github.com/hashicorp/go-multierror"
"github.com/miekg/dns"
@@ -26,6 +28,8 @@ import (
"github.com/netbirdio/netbird/route"
)
const dnsTimeout = 8 * time.Second
type domainMap map[domain.Domain][]netip.Prefix
type internalDNATer interface {
@@ -243,7 +247,7 @@ func (d *DnsInterceptor) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
return
}
client, err := nbdns.GetClientPrivate(d.wgInterface.Address().IP, d.wgInterface.Name(), nbdns.UpstreamTimeout)
client, err := nbdns.GetClientPrivate(d.wgInterface.Address().IP, d.wgInterface.Name(), dnsTimeout)
if err != nil {
d.writeDNSError(w, r, logger, fmt.Sprintf("create DNS client: %v", err))
return
@@ -254,9 +258,20 @@ func (d *DnsInterceptor) ServeDNS(w dns.ResponseWriter, r *dns.Msg) {
}
upstream := fmt.Sprintf("%s:%d", upstreamIP.String(), dnsfwd.ListenPort)
reply, _, err := nbdns.ExchangeWithFallback(context.TODO(), client, r, upstream)
ctx, cancel := context.WithTimeout(context.Background(), dnsTimeout)
defer cancel()
startTime := time.Now()
reply, _, err := nbdns.ExchangeWithFallback(ctx, client, r, upstream)
if err != nil {
logger.Errorf("failed to exchange DNS request with %s (%s) for domain=%s: %v", upstreamIP.String(), peerKey, r.Question[0].Name, err)
if errors.Is(err, context.DeadlineExceeded) {
elapsed := time.Since(startTime)
peerInfo := d.debugPeerTimeout(upstreamIP, peerKey)
logger.Errorf("peer DNS timeout after %v (timeout=%v) for domain=%s to peer %s (%s)%s - error: %v",
elapsed.Truncate(time.Millisecond), dnsTimeout, r.Question[0].Name, upstreamIP.String(), peerKey, peerInfo, err)
} else {
logger.Errorf("failed to exchange DNS request with %s (%s) for domain=%s: %v", upstreamIP.String(), peerKey, r.Question[0].Name, err)
}
if err := w.WriteMsg(&dns.Msg{MsgHdr: dns.MsgHdr{Rcode: dns.RcodeServerFailure, Id: r.Id}}); err != nil {
logger.Errorf("failed writing DNS response: %v", err)
}
@@ -568,3 +583,16 @@ func determinePrefixChanges(oldPrefixes, newPrefixes []netip.Prefix) (toAdd, toR
}
return
}
func (d *DnsInterceptor) debugPeerTimeout(peerIP netip.Addr, peerKey string) string {
if d.statusRecorder == nil {
return ""
}
peerState, err := d.statusRecorder.GetPeer(peerKey)
if err != nil {
return fmt.Sprintf(" (peer %s state error: %v)", peerKey[:8], err)
}
return fmt.Sprintf(" (peer %s)", nbdns.FormatPeerStatus(&peerState))
}

View File

@@ -6,6 +6,7 @@ import (
"net/netip"
"strings"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/metadata"
"github.com/netbirdio/netbird/shared/management/proto"
@@ -180,6 +181,7 @@ func isDuplicated(addresses []NetworkAddress, addr NetworkAddress) bool {
// GetInfoWithChecks retrieves and parses the system information with applied checks.
func GetInfoWithChecks(ctx context.Context, checks []*proto.Checks) (*Info, error) {
log.Debugf("gathering system information with checks: %d", len(checks))
processCheckPaths := make([]string, 0)
for _, check := range checks {
processCheckPaths = append(processCheckPaths, check.GetFiles()...)
@@ -189,10 +191,12 @@ func GetInfoWithChecks(ctx context.Context, checks []*proto.Checks) (*Info, erro
if err != nil {
return nil, err
}
log.Debugf("gathering process check information completed")
info := GetInfo(ctx)
info.Files = files
log.Debugf("all system information gathered successfully")
return info, nil
}

16
go.mod
View File

@@ -12,7 +12,6 @@ require (
github.com/kardianos/service v1.2.3-0.20240613133416-becf2eb62b83
github.com/onsi/ginkgo v1.16.5
github.com/onsi/gomega v1.27.6
github.com/pion/ice/v3 v3.0.2
github.com/rs/cors v1.8.0
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.7.0
@@ -24,7 +23,7 @@ require (
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20230429144221-925a1e7659e6
golang.zx2c4.com/wireguard/windows v0.5.3
google.golang.org/grpc v1.73.0
google.golang.org/protobuf v1.36.6
google.golang.org/protobuf v1.36.8
gopkg.in/natefinch/lumberjack.v2 v2.0.0
)
@@ -69,10 +68,12 @@ require (
github.com/oschwald/maxminddb-golang v1.12.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/petermattis/goid v0.0.0-20250303134427-723919f7f203
github.com/pion/logging v0.2.2
github.com/pion/ice/v4 v4.0.0-00010101000000-000000000000
github.com/pion/logging v0.2.4
github.com/pion/randutil v0.1.0
github.com/pion/stun/v2 v2.0.0
github.com/pion/transport/v3 v3.0.1
github.com/pion/stun/v3 v3.0.0
github.com/pion/transport/v3 v3.0.7
github.com/pion/turn/v3 v3.0.1
github.com/prometheus/client_golang v1.22.0
github.com/quic-go/quic-go v0.48.2
@@ -212,8 +213,10 @@ require (
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pion/dtls/v2 v2.2.10 // indirect
github.com/pion/mdns v0.0.12 // indirect
github.com/pion/dtls/v3 v3.0.7 // indirect
github.com/pion/mdns/v2 v2.0.7 // indirect
github.com/pion/transport/v2 v2.2.4 // indirect
github.com/pion/turn/v4 v4.1.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20240221224432-82ca36839d55 // indirect
@@ -229,6 +232,7 @@ require (
github.com/tklauser/numcpus v0.8.0 // indirect
github.com/vishvananda/netns v0.0.4 // indirect
github.com/vmihailenco/tagparser/v2 v2.0.0 // indirect
github.com/wlynxg/anet v0.0.3 // indirect
github.com/yuin/goldmark v1.7.1 // indirect
github.com/zeebo/blake3 v0.2.3 // indirect
go.opencensus.io v0.24.0 // indirect
@@ -257,6 +261,6 @@ replace golang.zx2c4.com/wireguard => github.com/netbirdio/wireguard-go v0.0.0-2
replace github.com/cloudflare/circl => github.com/cunicu/circl v0.0.0-20230801113412-fec58fc7b5f6
replace github.com/pion/ice/v3 => github.com/netbirdio/ice/v3 v3.0.0-20240315174635-e72a50fcb64e
replace github.com/pion/ice/v4 => github.com/netbirdio/ice/v4 v4.0.0-20250827161942-426799a23107
replace github.com/libp2p/go-netroute => github.com/netbirdio/go-netroute v0.0.0-20240611143515-f59b0e1d3944

27
go.sum
View File

@@ -501,8 +501,8 @@ github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJE
github.com/neelance/sourcemap v0.0.0-20200213170602-2833bce08e4c/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
github.com/netbirdio/go-netroute v0.0.0-20240611143515-f59b0e1d3944 h1:TDtJKmM6Sf8uYFx/dMeqNOL90KUoRscdfpFZ3Im89uk=
github.com/netbirdio/go-netroute v0.0.0-20240611143515-f59b0e1d3944/go.mod h1:sHA6TRxjQ6RLbnI+3R4DZo2Eseg/iKiPRfNmcuNySVQ=
github.com/netbirdio/ice/v3 v3.0.0-20240315174635-e72a50fcb64e h1:PURA50S8u4mF6RrkYYCAvvPCixhqqEiEy3Ej6avh04c=
github.com/netbirdio/ice/v3 v3.0.0-20240315174635-e72a50fcb64e/go.mod h1:YMLU7qbKfVjmEv7EoZPIVEI+kNYxWCdPK3VS0BU+U4Q=
github.com/netbirdio/ice/v4 v4.0.0-20250827161942-426799a23107 h1:ZJwhKexMlK15B/Ld+1T8VYE2Mt1lk1kf2DlXr46EHcw=
github.com/netbirdio/ice/v4 v4.0.0-20250827161942-426799a23107/go.mod h1:ZSIbPdBn5hePO8CpF1PekH2SfpTxg1PDhEwtbqZS7R8=
github.com/netbirdio/management-integrations/integrations v0.0.0-20250820151658-9ee1b34f4190 h1:/ZbExdcDwRq6XgTpTf5I1DPqnC3eInEf0fcmkqR8eSg=
github.com/netbirdio/management-integrations/integrations v0.0.0-20250820151658-9ee1b34f4190/go.mod h1:v0nUbbHbuQnqR7yKIYnKzsLBCswLtp2JctmKYmGgVhc=
github.com/netbirdio/service v0.0.0-20240911161631-f62744f42502 h1:3tHlFmhTdX9axERMVN63dqyFqnvuD+EMJHzM7mNGON8=
@@ -546,21 +546,29 @@ github.com/petermattis/goid v0.0.0-20250303134427-723919f7f203/go.mod h1:pxMtw7c
github.com/pion/dtls/v2 v2.2.7/go.mod h1:8WiMkebSHFD0T+dIU+UeBaoV7kDhOW5oDCzZ7WZ/F9s=
github.com/pion/dtls/v2 v2.2.10 h1:u2Axk+FyIR1VFTPurktB+1zoEPGIW3bmyj3LEFrXjAA=
github.com/pion/dtls/v2 v2.2.10/go.mod h1:d9SYc9fch0CqK90mRk1dC7AkzzpwJj6u2GU3u+9pqFE=
github.com/pion/logging v0.2.2 h1:M9+AIj/+pxNsDfAT64+MAVgJO0rsyLnoJKCqf//DoeY=
github.com/pion/dtls/v3 v3.0.7 h1:bItXtTYYhZwkPFk4t1n3Kkf5TDrfj6+4wG+CZR8uI9Q=
github.com/pion/dtls/v3 v3.0.7/go.mod h1:uDlH5VPrgOQIw59irKYkMudSFprY9IEFCqz/eTz16f8=
github.com/pion/logging v0.2.2/go.mod h1:k0/tDVsRCX2Mb2ZEmTqNa7CWsQPc+YYCB7Q+5pahoms=
github.com/pion/mdns v0.0.12 h1:CiMYlY+O0azojWDmxdNr7ADGrnZ+V6Ilfner+6mSVK8=
github.com/pion/mdns v0.0.12/go.mod h1:VExJjv8to/6Wqm1FXK+Ii/Z9tsVk/F5sD/N70cnYFbk=
github.com/pion/logging v0.2.4 h1:tTew+7cmQ+Mc1pTBLKH2puKsOvhm32dROumOZ655zB8=
github.com/pion/logging v0.2.4/go.mod h1:DffhXTKYdNZU+KtJ5pyQDjvOAh/GsNSyv1lbkFbe3so=
github.com/pion/mdns/v2 v2.0.7 h1:c9kM8ewCgjslaAmicYMFQIde2H9/lrZpjBkN8VwoVtM=
github.com/pion/mdns/v2 v2.0.7/go.mod h1:vAdSYNAT0Jy3Ru0zl2YiW3Rm/fJCwIeM0nToenfOJKA=
github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA=
github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8=
github.com/pion/stun/v2 v2.0.0 h1:A5+wXKLAypxQri59+tmQKVs7+l6mMM+3d+eER9ifRU0=
github.com/pion/stun/v2 v2.0.0/go.mod h1:22qRSh08fSEttYUmJZGlriq9+03jtVmXNODgLccj8GQ=
github.com/pion/stun/v3 v3.0.0 h1:4h1gwhWLWuZWOJIJR9s2ferRO+W3zA/b6ijOI6mKzUw=
github.com/pion/stun/v3 v3.0.0/go.mod h1:HvCN8txt8mwi4FBvS3EmDghW6aQJ24T+y+1TKjB5jyU=
github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g=
github.com/pion/transport/v2 v2.2.4 h1:41JJK6DZQYSeVLxILA2+F4ZkKb4Xd/tFJZRFZQ9QAlo=
github.com/pion/transport/v2 v2.2.4/go.mod h1:q2U/tf9FEfnSBGSW6w5Qp5PFWRLRj3NjLhCCgpRK4p0=
github.com/pion/transport/v3 v3.0.1 h1:gDTlPJwROfSfz6QfSi0ZmeCSkFcnWWiiR9ES0ouANiM=
github.com/pion/transport/v3 v3.0.1/go.mod h1:UY7kiITrlMv7/IKgd5eTUcaahZx5oUN3l9SzK5f5xE0=
github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1o0=
github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo=
github.com/pion/turn/v3 v3.0.1 h1:wLi7BTQr6/Q20R0vt/lHbjv6y4GChFtC33nkYbasoT8=
github.com/pion/turn/v3 v3.0.1/go.mod h1:MrJDKgqryDyWy1/4NT9TWfXWGMC7UHT6pJIv1+gMeNE=
github.com/pion/turn/v4 v4.1.1 h1:9UnY2HB99tpDyz3cVVZguSxcqkJ1DsTSZ+8TGruh4fc=
github.com/pion/turn/v4 v4.1.1/go.mod h1:2123tHk1O++vmjI5VSD0awT50NywDAq5A2NNNU4Jjs8=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@@ -681,6 +689,8 @@ github.com/vmihailenco/msgpack/v5 v5.4.1 h1:cQriyiUvjTwOHg8QZaPihLWeRAAVoCpE00IU
github.com/vmihailenco/msgpack/v5 v5.4.1/go.mod h1:GaZTsDaehaPpQVyxrf5mtQlH+pc21PIudVV/E3rRQok=
github.com/vmihailenco/tagparser/v2 v2.0.0 h1:y09buUbR+b5aycVFQs/g70pqKVZNBmxwAhO7/IwNM9g=
github.com/vmihailenco/tagparser/v2 v2.0.0/go.mod h1:Wri+At7QHww0WTrCBeu4J6bNtoV6mEfg5OIWRZA9qds=
github.com/wlynxg/anet v0.0.3 h1:PvR53psxFXstc12jelG6f1Lv4MWqE0tI76/hHGjh9rg=
github.com/wlynxg/anet v0.0.3/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -1167,11 +1177,10 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.30.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY=
google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY=
google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc=
google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=

View File

@@ -111,3 +111,6 @@ Generate gRpc code:
#!/bin/bash
protoc -I proto/ proto/management.proto --go_out=. --go-grpc_out=.
```

View File

@@ -104,6 +104,8 @@ type DefaultAccountManager struct {
accountUpdateLocks sync.Map
updateAccountPeersBufferInterval atomic.Int64
loginFilter *loginFilter
disableDefaultPolicy bool
}
@@ -211,6 +213,7 @@ func BuildManager(
proxyController: proxyController,
settingsManager: settingsManager,
permissionsManager: permissionsManager,
loginFilter: newLoginFilter(),
disableDefaultPolicy: disableDefaultPolicy,
}
@@ -1133,7 +1136,18 @@ func (am *DefaultAccountManager) addNewPrivateAccount(ctx context.Context, domai
func (am *DefaultAccountManager) addNewUserToDomainAccount(ctx context.Context, domainAccountID string, userAuth nbcontext.UserAuth) (string, error) {
newUser := types.NewRegularUser(userAuth.UserId)
newUser.AccountID = domainAccountID
err := am.Store.SaveUser(ctx, newUser)
settings, err := am.Store.GetAccountSettings(ctx, store.LockingStrengthNone, domainAccountID)
if err != nil {
return "", err
}
if settings != nil && settings.Extra != nil && settings.Extra.UserApprovalRequired {
newUser.Blocked = true
newUser.PendingApproval = true
}
err = am.Store.SaveUser(ctx, newUser)
if err != nil {
return "", err
}
@@ -1143,7 +1157,11 @@ func (am *DefaultAccountManager) addNewUserToDomainAccount(ctx context.Context,
return "", err
}
am.StoreEvent(ctx, userAuth.UserId, userAuth.UserId, domainAccountID, activity.UserJoined, nil)
if newUser.PendingApproval {
am.StoreEvent(ctx, userAuth.UserId, userAuth.UserId, domainAccountID, activity.UserJoined, map[string]any{"pending_approval": true})
} else {
am.StoreEvent(ctx, userAuth.UserId, userAuth.UserId, domainAccountID, activity.UserJoined, nil)
}
return domainAccountID, nil
}
@@ -1612,6 +1630,10 @@ func domainIsUpToDate(domain string, domainCategory string, userAuth nbcontext.U
return domainCategory == types.PrivateCategory || userAuth.DomainCategory != types.PrivateCategory || domain != userAuth.Domain
}
func (am *DefaultAccountManager) AllowSync(wgPubKey string, metahash uint64) bool {
return am.loginFilter.allowLogin(wgPubKey, metahash)
}
func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID string, peerPubKey string, meta nbpeer.PeerSystemMeta, realIP net.IP) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error) {
start := time.Now()
defer func() {
@@ -1628,6 +1650,9 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID
log.WithContext(ctx).Warnf("failed marking peer as connected %s %v", peerPubKey, err)
}
metahash := metaHash(meta, realIP.String())
am.loginFilter.addLogin(peerPubKey, metahash)
return peer, netMap, postureChecks, nil
}
@@ -1636,7 +1661,6 @@ func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, account
if err != nil {
log.WithContext(ctx).Warnf("failed marking peer as disconnected %s %v", peerPubKey, err)
}
return nil
}
@@ -1786,6 +1810,9 @@ func newAccountWithId(ctx context.Context, accountID, userID, domain string, dis
PeerInactivityExpirationEnabled: false,
PeerInactivityExpiration: types.DefaultPeerInactivityExpiration,
RoutingPeerDNSResolutionEnabled: true,
Extra: &types.ExtraSettings{
UserApprovalRequired: true,
},
},
Onboarding: types.AccountOnboarding{
OnboardingFlowPending: true,
@@ -1892,6 +1919,9 @@ func (am *DefaultAccountManager) GetOrCreateAccountByPrivateDomain(ctx context.C
PeerInactivityExpirationEnabled: false,
PeerInactivityExpiration: types.DefaultPeerInactivityExpiration,
RoutingPeerDNSResolutionEnabled: true,
Extra: &types.ExtraSettings{
UserApprovalRequired: true,
},
},
}

View File

@@ -32,6 +32,8 @@ type Manager interface {
DeleteUser(ctx context.Context, accountID, initiatorUserID string, targetUserID string) error
DeleteRegularUsers(ctx context.Context, accountID, initiatorUserID string, targetUserIDs []string, userInfos map[string]*types.UserInfo) error
InviteUser(ctx context.Context, accountID string, initiatorUserID string, targetUserID string) error
ApproveUser(ctx context.Context, accountID, initiatorUserID, targetUserID string) (*types.UserInfo, error)
RejectUser(ctx context.Context, accountID, initiatorUserID, targetUserID string) error
ListSetupKeys(ctx context.Context, accountID, userID string) ([]*types.SetupKey, error)
SaveUser(ctx context.Context, accountID, initiatorUserID string, update *types.User) (*types.UserInfo, error)
SaveOrAddUser(ctx context.Context, accountID, initiatorUserID string, update *types.User, addIfNotExists bool) (*types.UserInfo, error)
@@ -123,4 +125,5 @@ type Manager interface {
UpdateToPrimaryAccount(ctx context.Context, accountId string) error
GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error)
GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error)
AllowSync(string, uint64) bool
}

View File

@@ -15,6 +15,7 @@ import (
"time"
"github.com/golang/mock/gomock"
"github.com/prometheus/client_golang/prometheus/push"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -25,6 +26,7 @@ import (
"github.com/netbirdio/netbird/management/server/activity"
"github.com/netbirdio/netbird/management/server/cache"
nbcontext "github.com/netbirdio/netbird/management/server/context"
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools"
"github.com/netbirdio/netbird/management/server/idp"
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
resourceTypes "github.com/netbirdio/netbird/management/server/networks/resources/types"
@@ -3046,19 +3048,14 @@ func BenchmarkSyncAndMarkPeer(b *testing.B) {
msPerOp := float64(duration.Nanoseconds()) / float64(b.N) / 1e6
b.ReportMetric(msPerOp, "ms/op")
minExpected := bc.minMsPerOpLocal
maxExpected := bc.maxMsPerOpLocal
if os.Getenv("CI") == "true" {
minExpected = bc.minMsPerOpCICD
maxExpected = bc.maxMsPerOpCICD
testing_tools.EvaluateBenchmarkResults(b, bc.name, time.Since(start), "sync", "syncAndMark")
}
if msPerOp < minExpected {
b.Fatalf("Benchmark %s failed: too fast (%.2f ms/op, minimum %.2f ms/op)", bc.name, msPerOp, minExpected)
}
if msPerOp > (maxExpected * 1.1) {
b.Fatalf("Benchmark %s failed: too slow (%.2f ms/op, maximum %.2f ms/op)", bc.name, msPerOp, maxExpected)
if msPerOp > maxExpected {
b.Logf("Benchmark %s: too slow (%.2f ms/op, max %.2f ms/op)", bc.name, msPerOp, maxExpected)
}
})
}
@@ -3121,19 +3118,14 @@ func BenchmarkLoginPeer_ExistingPeer(b *testing.B) {
msPerOp := float64(duration.Nanoseconds()) / float64(b.N) / 1e6
b.ReportMetric(msPerOp, "ms/op")
minExpected := bc.minMsPerOpLocal
maxExpected := bc.maxMsPerOpLocal
if os.Getenv("CI") == "true" {
minExpected = bc.minMsPerOpCICD
maxExpected = bc.maxMsPerOpCICD
testing_tools.EvaluateBenchmarkResults(b, bc.name, time.Since(start), "login", "existingPeer")
}
if msPerOp < minExpected {
b.Fatalf("Benchmark %s failed: too fast (%.2f ms/op, minimum %.2f ms/op)", bc.name, msPerOp, minExpected)
}
if msPerOp > (maxExpected * 1.1) {
b.Fatalf("Benchmark %s failed: too slow (%.2f ms/op, maximum %.2f ms/op)", bc.name, msPerOp, maxExpected)
if msPerOp > maxExpected {
b.Logf("Benchmark %s: too slow (%.2f ms/op, max %.2f ms/op)", bc.name, msPerOp, maxExpected)
}
})
}
@@ -3196,24 +3188,44 @@ func BenchmarkLoginPeer_NewPeer(b *testing.B) {
msPerOp := float64(duration.Nanoseconds()) / float64(b.N) / 1e6
b.ReportMetric(msPerOp, "ms/op")
minExpected := bc.minMsPerOpLocal
maxExpected := bc.maxMsPerOpLocal
if os.Getenv("CI") == "true" {
minExpected = bc.minMsPerOpCICD
maxExpected = bc.maxMsPerOpCICD
testing_tools.EvaluateBenchmarkResults(b, bc.name, time.Since(start), "login", "newPeer")
}
if msPerOp < minExpected {
b.Fatalf("Benchmark %s failed: too fast (%.2f ms/op, minimum %.2f ms/op)", bc.name, msPerOp, minExpected)
}
if msPerOp > (maxExpected * 1.1) {
b.Fatalf("Benchmark %s failed: too slow (%.2f ms/op, maximum %.2f ms/op)", bc.name, msPerOp, maxExpected)
if msPerOp > maxExpected {
b.Logf("Benchmark %s: too slow (%.2f ms/op, max %.2f ms/op)", bc.name, msPerOp, maxExpected)
}
})
}
}
func TestMain(m *testing.M) {
exitCode := m.Run()
if exitCode == 0 && os.Getenv("CI") == "true" {
runID := os.Getenv("GITHUB_RUN_ID")
storeEngine := os.Getenv("NETBIRD_STORE_ENGINE")
err := push.New("http://localhost:9091", "account_manager_benchmark").
Collector(testing_tools.BenchmarkDuration).
Grouping("ci_run", runID).
Grouping("store_engine", storeEngine).
Push()
if err != nil {
log.Printf("Failed to push metrics: %v", err)
} else {
time.Sleep(1 * time.Minute)
_ = push.New("http://localhost:9091", "account_manager_benchmark").
Grouping("ci_run", runID).
Grouping("store_engine", storeEngine).
Delete()
}
}
os.Exit(exitCode)
}
func Test_GetCreateAccountByPrivateDomain(t *testing.T) {
manager, err := createManager(t)
if err != nil {
@@ -3594,3 +3606,93 @@ func TestDefaultAccountManager_UpdatePeerIP(t *testing.T) {
require.Error(t, err, "should fail with invalid peer ID")
})
}
func TestAddNewUserToDomainAccountWithApproval(t *testing.T) {
manager, err := createManager(t)
if err != nil {
t.Fatal(err)
}
// Create a domain-based account with user approval enabled
existingAccountID := "existing-account"
account := newAccountWithId(context.Background(), existingAccountID, "owner-user", "example.com", false)
account.Settings.Extra = &types.ExtraSettings{
UserApprovalRequired: true,
}
err = manager.Store.SaveAccount(context.Background(), account)
require.NoError(t, err)
// Set the account as domain primary account
account.IsDomainPrimaryAccount = true
account.DomainCategory = types.PrivateCategory
err = manager.Store.SaveAccount(context.Background(), account)
require.NoError(t, err)
// Test adding new user to existing account with approval required
newUserID := "new-user-id"
userAuth := nbcontext.UserAuth{
UserId: newUserID,
Domain: "example.com",
DomainCategory: types.PrivateCategory,
}
acc, err := manager.Store.GetAccount(context.Background(), existingAccountID)
require.NoError(t, err)
require.True(t, acc.IsDomainPrimaryAccount, "Account should be primary for the domain")
require.Equal(t, "example.com", acc.Domain, "Account domain should match")
returnedAccountID, err := manager.getAccountIDWithAuthorizationClaims(context.Background(), userAuth)
require.NoError(t, err)
require.Equal(t, existingAccountID, returnedAccountID)
// Verify user was created with pending approval
user, err := manager.Store.GetUserByUserID(context.Background(), store.LockingStrengthNone, newUserID)
require.NoError(t, err)
assert.True(t, user.Blocked, "User should be blocked when approval is required")
assert.True(t, user.PendingApproval, "User should be pending approval")
assert.Equal(t, existingAccountID, user.AccountID)
}
func TestAddNewUserToDomainAccountWithoutApproval(t *testing.T) {
manager, err := createManager(t)
if err != nil {
t.Fatal(err)
}
// Create a domain-based account without user approval
ownerUserAuth := nbcontext.UserAuth{
UserId: "owner-user",
Domain: "example.com",
DomainCategory: types.PrivateCategory,
}
existingAccountID, err := manager.getAccountIDWithAuthorizationClaims(context.Background(), ownerUserAuth)
require.NoError(t, err)
// Modify the account to disable user approval
account, err := manager.Store.GetAccount(context.Background(), existingAccountID)
require.NoError(t, err)
account.Settings.Extra = &types.ExtraSettings{
UserApprovalRequired: false,
}
err = manager.Store.SaveAccount(context.Background(), account)
require.NoError(t, err)
// Test adding new user to existing account without approval required
newUserID := "new-user-id"
userAuth := nbcontext.UserAuth{
UserId: newUserID,
Domain: "example.com",
DomainCategory: types.PrivateCategory,
}
returnedAccountID, err := manager.getAccountIDWithAuthorizationClaims(context.Background(), userAuth)
require.NoError(t, err)
require.Equal(t, existingAccountID, returnedAccountID)
// Verify user was created without pending approval
user, err := manager.Store.GetUserByUserID(context.Background(), store.LockingStrengthNone, newUserID)
require.NoError(t, err)
assert.False(t, user.Blocked, "User should not be blocked when approval is not required")
assert.False(t, user.PendingApproval, "User should not be pending approval")
assert.Equal(t, existingAccountID, user.AccountID)
}

View File

@@ -177,6 +177,8 @@ const (
AccountNetworkRangeUpdated Activity = 87
PeerIPUpdated Activity = 88
UserApproved Activity = 89
UserRejected Activity = 90
AccountDeleted Activity = 99999
)
@@ -284,6 +286,8 @@ var activityMap = map[Activity]Code{
AccountNetworkRangeUpdated: {"Account network range updated", "account.network.range.update"},
PeerIPUpdated: {"Peer IP updated", "peer.ip.update"},
UserApproved: {"User approved", "user.approve"},
UserRejected: {"User rejected", "user.reject"},
}
// StringCode returns a string code of the activity

View File

@@ -202,35 +202,45 @@ func (am *DefaultAccountManager) CreateGroups(ctx context.Context, accountID, us
}
var eventsToStore []func()
var groupsToSave []*types.Group
var updateAccountPeers bool
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
groupIDs := make([]string, 0, len(groups))
for _, newGroup := range groups {
var globalErr error
groupIDs := make([]string, 0, len(groups))
for _, newGroup := range groups {
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
if err = validateNewGroup(ctx, transaction, accountID, newGroup); err != nil {
return err
}
newGroup.AccountID = accountID
groupsToSave = append(groupsToSave, newGroup)
if err = transaction.CreateGroup(ctx, newGroup); err != nil {
return err
}
err = transaction.IncrementNetworkSerial(ctx, accountID)
if err != nil {
return err
}
groupIDs = append(groupIDs, newGroup.ID)
events := am.prepareGroupEvents(ctx, transaction, accountID, userID, newGroup)
eventsToStore = append(eventsToStore, events...)
}
updateAccountPeers, err = areGroupChangesAffectPeers(ctx, transaction, accountID, groupIDs)
return nil
})
if err != nil {
return err
log.WithContext(ctx).Errorf("failed to update group %s: %v", newGroup.ID, err)
if len(groupIDs) == 1 {
return err
}
globalErr = errors.Join(globalErr, err)
// continue updating other groups
}
}
if err = transaction.CreateGroups(ctx, accountID, groupsToSave); err != nil {
return err
}
return transaction.IncrementNetworkSerial(ctx, accountID)
})
updateAccountPeers, err = areGroupChangesAffectPeers(ctx, am.Store, accountID, groupIDs)
if err != nil {
return err
}
@@ -243,7 +253,7 @@ func (am *DefaultAccountManager) CreateGroups(ctx context.Context, accountID, us
am.UpdateAccountPeers(ctx, accountID)
}
return nil
return globalErr
}
// UpdateGroups updates groups in the account.
@@ -260,35 +270,45 @@ func (am *DefaultAccountManager) UpdateGroups(ctx context.Context, accountID, us
}
var eventsToStore []func()
var groupsToSave []*types.Group
var updateAccountPeers bool
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
groupIDs := make([]string, 0, len(groups))
for _, newGroup := range groups {
var globalErr error
groupIDs := make([]string, 0, len(groups))
for _, newGroup := range groups {
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
if err = validateNewGroup(ctx, transaction, accountID, newGroup); err != nil {
return err
}
newGroup.AccountID = accountID
groupsToSave = append(groupsToSave, newGroup)
groupIDs = append(groupIDs, newGroup.ID)
if err = transaction.UpdateGroup(ctx, newGroup); err != nil {
return err
}
err = transaction.IncrementNetworkSerial(ctx, accountID)
if err != nil {
return err
}
events := am.prepareGroupEvents(ctx, transaction, accountID, userID, newGroup)
eventsToStore = append(eventsToStore, events...)
}
updateAccountPeers, err = areGroupChangesAffectPeers(ctx, transaction, accountID, groupIDs)
groupIDs = append(groupIDs, newGroup.ID)
return nil
})
if err != nil {
return err
log.WithContext(ctx).Errorf("failed to update group %s: %v", newGroup.ID, err)
if len(groups) == 1 {
return err
}
globalErr = errors.Join(globalErr, err)
// continue updating other groups
}
}
if err = transaction.UpdateGroups(ctx, accountID, groupsToSave); err != nil {
return err
}
return transaction.IncrementNetworkSerial(ctx, accountID)
})
updateAccountPeers, err = areGroupChangesAffectPeers(ctx, am.Store, accountID, groupIDs)
if err != nil {
return err
}
@@ -301,7 +321,7 @@ func (am *DefaultAccountManager) UpdateGroups(ctx context.Context, accountID, us
am.UpdateAccountPeers(ctx, accountID)
}
return nil
return globalErr
}
// prepareGroupEvents prepares a list of event functions to be stored.
@@ -584,13 +604,6 @@ func validateNewGroup(ctx context.Context, transaction store.Store, accountID st
newGroup.ID = xid.New().String()
}
for _, peerID := range newGroup.Peers {
_, err := transaction.GetPeerByID(ctx, store.LockingStrengthNone, accountID, peerID)
if err != nil {
return status.Errorf(status.InvalidArgument, "peer with ID \"%s\" not found", peerID)
}
}
return nil
}

View File

@@ -2,9 +2,11 @@ package server
import (
"context"
"errors"
"fmt"
"net"
"net/netip"
"os"
"strings"
"sync"
"time"
@@ -38,20 +40,28 @@ import (
internalStatus "github.com/netbirdio/netbird/shared/management/status"
)
const (
envLogBlockedPeers = "NB_LOG_BLOCKED_PEERS"
envBlockPeers = "NB_BLOCK_SAME_PEERS"
)
// GRPCServer an instance of a Management gRPC API server
type GRPCServer struct {
accountManager account.Manager
settingsManager settings.Manager
wgKey wgtypes.Key
proto.UnimplementedManagementServiceServer
peersUpdateManager *PeersUpdateManager
config *nbconfig.Config
secretsManager SecretsManager
appMetrics telemetry.AppMetrics
ephemeralManager *EphemeralManager
peerLocks sync.Map
authManager auth.Manager
integratedPeerValidator integrated_validator.IntegratedValidator
peersUpdateManager *PeersUpdateManager
config *nbconfig.Config
secretsManager SecretsManager
appMetrics telemetry.AppMetrics
ephemeralManager *EphemeralManager
peerLocks sync.Map
authManager auth.Manager
logBlockedPeers bool
blockPeersWithSameConfig bool
integratedPeerValidator integrated_validator.IntegratedValidator
}
// NewServer creates a new Management server
@@ -82,18 +92,23 @@ func NewServer(
}
}
logBlockedPeers := strings.ToLower(os.Getenv(envLogBlockedPeers)) == "true"
blockPeersWithSameConfig := strings.ToLower(os.Getenv(envBlockPeers)) == "true"
return &GRPCServer{
wgKey: key,
// peerKey -> event channel
peersUpdateManager: peersUpdateManager,
accountManager: accountManager,
settingsManager: settingsManager,
config: config,
secretsManager: secretsManager,
authManager: authManager,
appMetrics: appMetrics,
ephemeralManager: ephemeralManager,
integratedPeerValidator: integratedPeerValidator,
peersUpdateManager: peersUpdateManager,
accountManager: accountManager,
settingsManager: settingsManager,
config: config,
secretsManager: secretsManager,
authManager: authManager,
appMetrics: appMetrics,
ephemeralManager: ephemeralManager,
logBlockedPeers: logBlockedPeers,
blockPeersWithSameConfig: blockPeersWithSameConfig,
integratedPeerValidator: integratedPeerValidator,
}, nil
}
@@ -136,9 +151,6 @@ func getRealIP(ctx context.Context) net.IP {
// notifies the connected peer of any updates (e.g. new peers under the same account)
func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
reqStart := time.Now()
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountSyncRequest()
}
ctx := srv.Context()
@@ -147,6 +159,25 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
if err != nil {
return err
}
realIP := getRealIP(ctx)
sRealIP := realIP.String()
peerMeta := extractPeerMeta(ctx, syncReq.GetMeta())
metahashed := metaHash(peerMeta, sRealIP)
if !s.accountManager.AllowSync(peerKey.String(), metahashed) {
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountSyncRequestBlocked()
}
if s.logBlockedPeers {
log.WithContext(ctx).Warnf("peer %s with meta hash %d is blocked from syncing", peerKey.String(), metahashed)
}
if s.blockPeersWithSameConfig {
return mapError(ctx, internalStatus.ErrPeerAlreadyLoggedIn)
}
}
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountSyncRequest()
}
// nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.PeerIDKey, peerKey.String())
@@ -172,14 +203,13 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
// nolint:staticcheck
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
realIP := getRealIP(ctx)
log.WithContext(ctx).Debugf("Sync request from peer [%s] [%s]", req.WgPubKey, realIP.String())
log.WithContext(ctx).Debugf("Sync request from peer [%s] [%s]", req.WgPubKey, sRealIP)
if syncReq.GetMeta() == nil {
log.WithContext(ctx).Tracef("peer system meta has to be provided on sync. Peer %s, remote addr %s", peerKey.String(), realIP)
}
peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), extractPeerMeta(ctx, syncReq.GetMeta()), realIP)
peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), peerMeta, realIP)
if err != nil {
log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err)
return mapError(ctx, err)
@@ -345,6 +375,9 @@ func mapError(ctx context.Context, err error) error {
default:
}
}
if errors.Is(err, internalStatus.ErrPeerAlreadyLoggedIn) {
return status.Error(codes.PermissionDenied, internalStatus.ErrPeerAlreadyLoggedIn.Error())
}
log.WithContext(ctx).Errorf("got an unhandled error: %s", err)
return status.Errorf(codes.Internal, "failed handling request")
}
@@ -436,12 +469,9 @@ func (s *GRPCServer) parseRequest(ctx context.Context, req *proto.EncryptedMessa
// In case of the successful registration login is also successful
func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
reqStart := time.Now()
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountLoginRequest()
}
realIP := getRealIP(ctx)
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, realIP.String())
sRealIP := realIP.String()
log.WithContext(ctx).Debugf("Login request from peer [%s] [%s]", req.WgPubKey, sRealIP)
loginReq := &proto.LoginRequest{}
peerKey, err := s.parseRequest(ctx, req, loginReq)
@@ -449,6 +479,24 @@ func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*p
return nil, err
}
peerMeta := extractPeerMeta(ctx, loginReq.GetMeta())
metahashed := metaHash(peerMeta, sRealIP)
if !s.accountManager.AllowSync(peerKey.String(), metahashed) {
if s.logBlockedPeers {
log.WithContext(ctx).Warnf("peer %s with meta hash %d is blocked from login", peerKey.String(), metahashed)
}
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountLoginRequestBlocked()
}
if s.blockPeersWithSameConfig {
return nil, internalStatus.ErrPeerAlreadyLoggedIn
}
}
if s.appMetrics != nil {
s.appMetrics.GRPCMetrics().CountLoginRequest()
}
//nolint
ctx = context.WithValue(ctx, nbContext.PeerIDKey, peerKey.String())
accountID, err := s.accountManager.GetAccountIDForPeerKey(ctx, peerKey.String())
@@ -485,7 +533,7 @@ func (s *GRPCServer) Login(ctx context.Context, req *proto.EncryptedMessage) (*p
peer, netMap, postureChecks, err := s.accountManager.LoginPeer(ctx, types.PeerLogin{
WireGuardPubKey: peerKey.String(),
SSHKey: string(sshKey),
Meta: extractPeerMeta(ctx, loginReq.GetMeta()),
Meta: peerMeta,
UserID: userID,
SetupKey: loginReq.GetSetupKey(),
ConnectionIP: realIP,
@@ -951,8 +999,6 @@ func (s *GRPCServer) Logout(ctx context.Context, req *proto.EncryptedMessage) (*
return nil, mapError(ctx, err)
}
s.accountManager.BufferUpdateAccountPeers(ctx, peer.AccountID)
log.WithContext(ctx).Debugf("peer %s logged out successfully after %s", peerKey.String(), time.Since(start))
return &proto.Empty{}, nil

View File

@@ -11,11 +11,11 @@ import (
"github.com/netbirdio/netbird/management/server/account"
nbcontext "github.com/netbirdio/netbird/management/server/context"
"github.com/netbirdio/netbird/management/server/settings"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/shared/management/http/util"
"github.com/netbirdio/netbird/management/server/settings"
"github.com/netbirdio/netbird/shared/management/status"
"github.com/netbirdio/netbird/management/server/types"
)
const (
@@ -198,6 +198,7 @@ func (h *handler) updateAccount(w http.ResponseWriter, r *http.Request) {
if req.Settings.Extra != nil {
settings.Extra = &types.ExtraSettings{
PeerApprovalEnabled: req.Settings.Extra.PeerApprovalEnabled,
UserApprovalRequired: req.Settings.Extra.UserApprovalRequired,
FlowEnabled: req.Settings.Extra.NetworkTrafficLogsEnabled,
FlowGroups: req.Settings.Extra.NetworkTrafficLogsGroups,
FlowPacketCounterEnabled: req.Settings.Extra.NetworkTrafficPacketCounterEnabled,
@@ -327,6 +328,7 @@ func toAccountResponse(accountID string, settings *types.Settings, meta *types.A
if settings.Extra != nil {
apiSettings.Extra = &api.AccountExtraSettings{
PeerApprovalEnabled: settings.Extra.PeerApprovalEnabled,
UserApprovalRequired: settings.Extra.UserApprovalRequired,
NetworkTrafficLogsEnabled: settings.Extra.FlowEnabled,
NetworkTrafficLogsGroups: settings.Extra.FlowGroups,
NetworkTrafficPacketCounterEnabled: settings.Extra.FlowPacketCounterEnabled,

View File

@@ -15,11 +15,11 @@ import (
"github.com/stretchr/testify/assert"
nbcontext "github.com/netbirdio/netbird/management/server/context"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/management/server/mock_server"
"github.com/netbirdio/netbird/management/server/settings"
"github.com/netbirdio/netbird/shared/management/status"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/shared/management/status"
)
func initAccountsTestData(t *testing.T, account *types.Account) *handler {

View File

@@ -14,11 +14,11 @@ import (
"github.com/netbirdio/netbird/management/server/activity"
nbcontext "github.com/netbirdio/netbird/management/server/context"
"github.com/netbirdio/netbird/management/server/groups"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/shared/management/http/util"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/shared/management/status"
"github.com/netbirdio/netbird/management/server/types"
)
// Handler is a handler that returns peers of the account
@@ -354,7 +354,7 @@ func toSinglePeerResponse(peer *nbpeer.Peer, groupsInfo []api.GroupMinimum, dnsD
}
return &api.Peer{
CreatedAt: peer.CreatedAt,
CreatedAt: peer.CreatedAt,
Id: peer.ID,
Name: peer.Name,
Ip: peer.IP.String(),
@@ -391,33 +391,33 @@ func toPeerListItemResponse(peer *nbpeer.Peer, groupsInfo []api.GroupMinimum, dn
}
return &api.PeerBatch{
CreatedAt: peer.CreatedAt,
Id: peer.ID,
Name: peer.Name,
Ip: peer.IP.String(),
ConnectionIp: peer.Location.ConnectionIP.String(),
Connected: peer.Status.Connected,
LastSeen: peer.Status.LastSeen,
Os: fmt.Sprintf("%s %s", peer.Meta.OS, osVersion),
KernelVersion: peer.Meta.KernelVersion,
GeonameId: int(peer.Location.GeoNameID),
Version: peer.Meta.WtVersion,
Groups: groupsInfo,
SshEnabled: peer.SSHEnabled,
Hostname: peer.Meta.Hostname,
UserId: peer.UserID,
UiVersion: peer.Meta.UIVersion,
DnsLabel: fqdn(peer, dnsDomain),
ExtraDnsLabels: fqdnList(peer.ExtraDNSLabels, dnsDomain),
LoginExpirationEnabled: peer.LoginExpirationEnabled,
LastLogin: peer.GetLastLogin(),
LoginExpired: peer.Status.LoginExpired,
AccessiblePeersCount: accessiblePeersCount,
CountryCode: peer.Location.CountryCode,
CityName: peer.Location.CityName,
SerialNumber: peer.Meta.SystemSerialNumber,
CreatedAt: peer.CreatedAt,
Id: peer.ID,
Name: peer.Name,
Ip: peer.IP.String(),
ConnectionIp: peer.Location.ConnectionIP.String(),
Connected: peer.Status.Connected,
LastSeen: peer.Status.LastSeen,
Os: fmt.Sprintf("%s %s", peer.Meta.OS, osVersion),
KernelVersion: peer.Meta.KernelVersion,
GeonameId: int(peer.Location.GeoNameID),
Version: peer.Meta.WtVersion,
Groups: groupsInfo,
SshEnabled: peer.SSHEnabled,
Hostname: peer.Meta.Hostname,
UserId: peer.UserID,
UiVersion: peer.Meta.UIVersion,
DnsLabel: fqdn(peer, dnsDomain),
ExtraDnsLabels: fqdnList(peer.ExtraDNSLabels, dnsDomain),
LoginExpirationEnabled: peer.LoginExpirationEnabled,
LastLogin: peer.GetLastLogin(),
LoginExpired: peer.Status.LoginExpired,
AccessiblePeersCount: accessiblePeersCount,
CountryCode: peer.Location.CountryCode,
CityName: peer.Location.CityName,
SerialNumber: peer.Meta.SystemSerialNumber,
InactivityExpirationEnabled: peer.InactivityExpirationEnabled,
Ephemeral: peer.Ephemeral,
}
}

View File

@@ -9,11 +9,11 @@ import (
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/management/server/account"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/management/server/users"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/shared/management/http/util"
"github.com/netbirdio/netbird/shared/management/status"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/management/server/users"
nbcontext "github.com/netbirdio/netbird/management/server/context"
)
@@ -31,6 +31,8 @@ func AddEndpoints(accountManager account.Manager, router *mux.Router) {
router.HandleFunc("/users/{userId}", userHandler.deleteUser).Methods("DELETE", "OPTIONS")
router.HandleFunc("/users", userHandler.createUser).Methods("POST", "OPTIONS")
router.HandleFunc("/users/{userId}/invite", userHandler.inviteUser).Methods("POST", "OPTIONS")
router.HandleFunc("/users/{userId}/approve", userHandler.approveUser).Methods("POST", "OPTIONS")
router.HandleFunc("/users/{userId}/reject", userHandler.rejectUser).Methods("DELETE", "OPTIONS")
addUsersTokensEndpoint(accountManager, router)
}
@@ -323,17 +325,76 @@ func toUserResponse(user *types.UserInfo, currenUserID string) *api.User {
}
isCurrent := user.ID == currenUserID
return &api.User{
Id: user.ID,
Name: user.Name,
Email: user.Email,
Role: user.Role,
AutoGroups: autoGroups,
Status: userStatus,
IsCurrent: &isCurrent,
IsServiceUser: &user.IsServiceUser,
IsBlocked: user.IsBlocked,
LastLogin: &user.LastLogin,
Issued: &user.Issued,
Id: user.ID,
Name: user.Name,
Email: user.Email,
Role: user.Role,
AutoGroups: autoGroups,
Status: userStatus,
IsCurrent: &isCurrent,
IsServiceUser: &user.IsServiceUser,
IsBlocked: user.IsBlocked,
LastLogin: &user.LastLogin,
Issued: &user.Issued,
PendingApproval: user.PendingApproval,
}
}
// approveUser is a POST request to approve a user that is pending approval
func (h *handler) approveUser(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
util.WriteErrorResponse("wrong HTTP method", http.StatusMethodNotAllowed, w)
return
}
vars := mux.Vars(r)
targetUserID := vars["userId"]
if len(targetUserID) == 0 {
util.WriteErrorResponse("invalid user ID", http.StatusBadRequest, w)
return
}
userAuth, err := nbcontext.GetUserAuthFromContext(r.Context())
if err != nil {
util.WriteError(r.Context(), err, w)
return
}
user, err := h.accountManager.ApproveUser(r.Context(), userAuth.AccountId, userAuth.UserId, targetUserID)
if err != nil {
util.WriteError(r.Context(), err, w)
return
}
userResponse := toUserResponse(user, userAuth.UserId)
util.WriteJSONObject(r.Context(), w, userResponse)
}
// rejectUser is a DELETE request to reject a user that is pending approval
func (h *handler) rejectUser(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodDelete {
util.WriteErrorResponse("wrong HTTP method", http.StatusMethodNotAllowed, w)
return
}
vars := mux.Vars(r)
targetUserID := vars["userId"]
if len(targetUserID) == 0 {
util.WriteErrorResponse("invalid user ID", http.StatusBadRequest, w)
return
}
userAuth, err := nbcontext.GetUserAuthFromContext(r.Context())
if err != nil {
util.WriteError(r.Context(), err, w)
return
}
err = h.accountManager.RejectUser(r.Context(), userAuth.AccountId, userAuth.UserId, targetUserID)
if err != nil {
util.WriteError(r.Context(), err, w)
return
}
util.WriteJSONObject(r.Context(), w, util.EmptyObject{})
}

View File

@@ -16,13 +16,13 @@ import (
"github.com/stretchr/testify/require"
nbcontext "github.com/netbirdio/netbird/management/server/context"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/management/server/mock_server"
"github.com/netbirdio/netbird/management/server/permissions/modules"
"github.com/netbirdio/netbird/management/server/permissions/roles"
"github.com/netbirdio/netbird/shared/management/status"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/management/server/users"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/shared/management/status"
)
const (
@@ -725,3 +725,133 @@ func stringifyPermissionsKeys(permissions roles.Permissions) map[string]map[stri
}
return modules
}
func TestApproveUserEndpoint(t *testing.T) {
adminUser := &types.User{
Id: "admin-user",
Role: types.UserRoleAdmin,
AccountID: existingAccountID,
AutoGroups: []string{},
}
pendingUser := &types.User{
Id: "pending-user",
Role: types.UserRoleUser,
AccountID: existingAccountID,
Blocked: true,
PendingApproval: true,
AutoGroups: []string{},
}
tt := []struct {
name string
expectedStatus int
expectedBody bool
requestingUser *types.User
}{
{
name: "approve user as admin should return 200",
expectedStatus: 200,
expectedBody: true,
requestingUser: adminUser,
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
am := &mock_server.MockAccountManager{}
am.ApproveUserFunc = func(ctx context.Context, accountID, initiatorUserID, targetUserID string) (*types.UserInfo, error) {
approvedUserInfo := &types.UserInfo{
ID: pendingUser.Id,
Email: "pending@example.com",
Name: "Pending User",
Role: string(pendingUser.Role),
AutoGroups: []string{},
IsServiceUser: false,
IsBlocked: false,
PendingApproval: false,
LastLogin: time.Now(),
Issued: types.UserIssuedAPI,
}
return approvedUserInfo, nil
}
handler := newHandler(am)
router := mux.NewRouter()
router.HandleFunc("/users/{userId}/approve", handler.approveUser).Methods("POST")
req, err := http.NewRequest("POST", "/users/pending-user/approve", nil)
require.NoError(t, err)
userAuth := nbcontext.UserAuth{
AccountId: existingAccountID,
UserId: tc.requestingUser.Id,
}
ctx := nbcontext.SetUserAuthInContext(req.Context(), userAuth)
req = req.WithContext(ctx)
rr := httptest.NewRecorder()
router.ServeHTTP(rr, req)
assert.Equal(t, tc.expectedStatus, rr.Code)
if tc.expectedBody {
var response api.User
err = json.Unmarshal(rr.Body.Bytes(), &response)
require.NoError(t, err)
assert.Equal(t, "pending-user", response.Id)
assert.False(t, response.IsBlocked)
assert.False(t, response.PendingApproval)
}
})
}
}
func TestRejectUserEndpoint(t *testing.T) {
adminUser := &types.User{
Id: "admin-user",
Role: types.UserRoleAdmin,
AccountID: existingAccountID,
AutoGroups: []string{},
}
tt := []struct {
name string
expectedStatus int
requestingUser *types.User
}{
{
name: "reject user as admin should return 200",
expectedStatus: 200,
requestingUser: adminUser,
},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
am := &mock_server.MockAccountManager{}
am.RejectUserFunc = func(ctx context.Context, accountID, initiatorUserID, targetUserID string) error {
return nil
}
handler := newHandler(am)
router := mux.NewRouter()
router.HandleFunc("/users/{userId}/reject", handler.rejectUser).Methods("DELETE")
req, err := http.NewRequest("DELETE", "/users/pending-user/reject", nil)
require.NoError(t, err)
userAuth := nbcontext.UserAuth{
AccountId: existingAccountID,
UserId: tc.requestingUser.Id,
}
ctx := nbcontext.SetUserAuthInContext(req.Context(), userAuth)
req = req.WithContext(ctx)
rr := httptest.NewRecorder()
router.ServeHTTP(rr, req)
assert.Equal(t, tc.expectedStatus, rr.Code)
})
}
}

View File

@@ -17,8 +17,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/netbirdio/netbird/management/server"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools"
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools/channel"
"github.com/netbirdio/netbird/shared/management/http/api"
)
const modulePeers = "peers"
@@ -47,7 +48,7 @@ func BenchmarkUpdatePeer(b *testing.B) {
for name, bc := range benchCasesPeers {
b.Run(name, func(b *testing.B) {
apiHandler, am, _ := testing_tools.BuildApiBlackBoxWithDBState(b, "../testdata/peers.sql", nil, false)
apiHandler, am, _ := channel.BuildApiBlackBoxWithDBState(b, "../testdata/peers.sql", nil, false)
testing_tools.PopulateTestData(b, am.(*server.DefaultAccountManager), bc.Peers, bc.Groups, bc.Users, bc.SetupKeys)
b.ResetTimer()
@@ -65,7 +66,7 @@ func BenchmarkUpdatePeer(b *testing.B) {
apiHandler.ServeHTTP(recorder, req)
}
testing_tools.EvaluateBenchmarkResults(b, name, time.Since(start), recorder, modulePeers, testing_tools.OperationUpdate)
testing_tools.EvaluateAPIBenchmarkResults(b, name, time.Since(start), recorder, modulePeers, testing_tools.OperationUpdate)
})
}
}
@@ -82,7 +83,7 @@ func BenchmarkGetOnePeer(b *testing.B) {
for name, bc := range benchCasesPeers {
b.Run(name, func(b *testing.B) {
apiHandler, am, _ := testing_tools.BuildApiBlackBoxWithDBState(b, "../testdata/peers.sql", nil, false)
apiHandler, am, _ := channel.BuildApiBlackBoxWithDBState(b, "../testdata/peers.sql", nil, false)
testing_tools.PopulateTestData(b, am.(*server.DefaultAccountManager), bc.Peers, bc.Groups, bc.Users, bc.SetupKeys)
b.ResetTimer()
@@ -92,7 +93,7 @@ func BenchmarkGetOnePeer(b *testing.B) {
apiHandler.ServeHTTP(recorder, req)
}
testing_tools.EvaluateBenchmarkResults(b, name, time.Since(start), recorder, modulePeers, testing_tools.OperationGetOne)
testing_tools.EvaluateAPIBenchmarkResults(b, name, time.Since(start), recorder, modulePeers, testing_tools.OperationGetOne)
})
}
}
@@ -109,7 +110,7 @@ func BenchmarkGetAllPeers(b *testing.B) {
for name, bc := range benchCasesPeers {
b.Run(name, func(b *testing.B) {
apiHandler, am, _ := testing_tools.BuildApiBlackBoxWithDBState(b, "../testdata/peers.sql", nil, false)
apiHandler, am, _ := channel.BuildApiBlackBoxWithDBState(b, "../testdata/peers.sql", nil, false)
testing_tools.PopulateTestData(b, am.(*server.DefaultAccountManager), bc.Peers, bc.Groups, bc.Users, bc.SetupKeys)
b.ResetTimer()
@@ -119,7 +120,7 @@ func BenchmarkGetAllPeers(b *testing.B) {
apiHandler.ServeHTTP(recorder, req)
}
testing_tools.EvaluateBenchmarkResults(b, name, time.Since(start), recorder, modulePeers, testing_tools.OperationGetAll)
testing_tools.EvaluateAPIBenchmarkResults(b, name, time.Since(start), recorder, modulePeers, testing_tools.OperationGetAll)
})
}
}
@@ -136,7 +137,7 @@ func BenchmarkDeletePeer(b *testing.B) {
for name, bc := range benchCasesPeers {
b.Run(name, func(b *testing.B) {
apiHandler, am, _ := testing_tools.BuildApiBlackBoxWithDBState(b, "../testdata/peers.sql", nil, false)
apiHandler, am, _ := channel.BuildApiBlackBoxWithDBState(b, "../testdata/peers.sql", nil, false)
testing_tools.PopulateTestData(b, am.(*server.DefaultAccountManager), 1000, bc.Groups, bc.Users, bc.SetupKeys)
b.ResetTimer()
@@ -146,7 +147,7 @@ func BenchmarkDeletePeer(b *testing.B) {
apiHandler.ServeHTTP(recorder, req)
}
testing_tools.EvaluateBenchmarkResults(b, name, time.Since(start), recorder, modulePeers, testing_tools.OperationDelete)
testing_tools.EvaluateAPIBenchmarkResults(b, name, time.Since(start), recorder, modulePeers, testing_tools.OperationDelete)
})
}
}

View File

@@ -17,8 +17,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/netbirdio/netbird/management/server"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools"
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools/channel"
"github.com/netbirdio/netbird/shared/management/http/api"
)
// Map to store peers, groups, users, and setupKeys by name
@@ -47,7 +48,7 @@ func BenchmarkCreateSetupKey(b *testing.B) {
for name, bc := range benchCasesSetupKeys {
b.Run(name, func(b *testing.B) {
apiHandler, am, _ := testing_tools.BuildApiBlackBoxWithDBState(b, "../testdata/setup_keys.sql", nil, false)
apiHandler, am, _ := channel.BuildApiBlackBoxWithDBState(b, "../testdata/setup_keys.sql", nil, false)
testing_tools.PopulateTestData(b, am.(*server.DefaultAccountManager), bc.Peers, bc.Groups, bc.Users, bc.SetupKeys)
b.ResetTimer()
@@ -69,7 +70,7 @@ func BenchmarkCreateSetupKey(b *testing.B) {
apiHandler.ServeHTTP(recorder, req)
}
testing_tools.EvaluateBenchmarkResults(b, name, time.Since(start), recorder, moduleSetupKeys, testing_tools.OperationCreate)
testing_tools.EvaluateAPIBenchmarkResults(b, name, time.Since(start), recorder, moduleSetupKeys, testing_tools.OperationCreate)
})
}
}
@@ -86,7 +87,7 @@ func BenchmarkUpdateSetupKey(b *testing.B) {
for name, bc := range benchCasesSetupKeys {
b.Run(name, func(b *testing.B) {
apiHandler, am, _ := testing_tools.BuildApiBlackBoxWithDBState(b, "../testdata/setup_keys.sql", nil, false)
apiHandler, am, _ := channel.BuildApiBlackBoxWithDBState(b, "../testdata/setup_keys.sql", nil, false)
testing_tools.PopulateTestData(b, am.(*server.DefaultAccountManager), bc.Peers, bc.Groups, bc.Users, bc.SetupKeys)
b.ResetTimer()
@@ -109,7 +110,7 @@ func BenchmarkUpdateSetupKey(b *testing.B) {
apiHandler.ServeHTTP(recorder, req)
}
testing_tools.EvaluateBenchmarkResults(b, name, time.Since(start), recorder, moduleSetupKeys, testing_tools.OperationUpdate)
testing_tools.EvaluateAPIBenchmarkResults(b, name, time.Since(start), recorder, moduleSetupKeys, testing_tools.OperationUpdate)
})
}
}
@@ -126,7 +127,7 @@ func BenchmarkGetOneSetupKey(b *testing.B) {
for name, bc := range benchCasesSetupKeys {
b.Run(name, func(b *testing.B) {
apiHandler, am, _ := testing_tools.BuildApiBlackBoxWithDBState(b, "../testdata/setup_keys.sql", nil, false)
apiHandler, am, _ := channel.BuildApiBlackBoxWithDBState(b, "../testdata/setup_keys.sql", nil, false)
testing_tools.PopulateTestData(b, am.(*server.DefaultAccountManager), bc.Peers, bc.Groups, bc.Users, bc.SetupKeys)
b.ResetTimer()
@@ -136,7 +137,7 @@ func BenchmarkGetOneSetupKey(b *testing.B) {
apiHandler.ServeHTTP(recorder, req)
}
testing_tools.EvaluateBenchmarkResults(b, name, time.Since(start), recorder, moduleSetupKeys, testing_tools.OperationGetOne)
testing_tools.EvaluateAPIBenchmarkResults(b, name, time.Since(start), recorder, moduleSetupKeys, testing_tools.OperationGetOne)
})
}
}
@@ -153,7 +154,7 @@ func BenchmarkGetAllSetupKeys(b *testing.B) {
for name, bc := range benchCasesSetupKeys {
b.Run(name, func(b *testing.B) {
apiHandler, am, _ := testing_tools.BuildApiBlackBoxWithDBState(b, "../testdata/setup_keys.sql", nil, false)
apiHandler, am, _ := channel.BuildApiBlackBoxWithDBState(b, "../testdata/setup_keys.sql", nil, false)
testing_tools.PopulateTestData(b, am.(*server.DefaultAccountManager), bc.Peers, bc.Groups, bc.Users, bc.SetupKeys)
b.ResetTimer()
@@ -163,7 +164,7 @@ func BenchmarkGetAllSetupKeys(b *testing.B) {
apiHandler.ServeHTTP(recorder, req)
}
testing_tools.EvaluateBenchmarkResults(b, name, time.Since(start), recorder, moduleSetupKeys, testing_tools.OperationGetAll)
testing_tools.EvaluateAPIBenchmarkResults(b, name, time.Since(start), recorder, moduleSetupKeys, testing_tools.OperationGetAll)
})
}
}
@@ -180,7 +181,7 @@ func BenchmarkDeleteSetupKey(b *testing.B) {
for name, bc := range benchCasesSetupKeys {
b.Run(name, func(b *testing.B) {
apiHandler, am, _ := testing_tools.BuildApiBlackBoxWithDBState(b, "../testdata/setup_keys.sql", nil, false)
apiHandler, am, _ := channel.BuildApiBlackBoxWithDBState(b, "../testdata/setup_keys.sql", nil, false)
testing_tools.PopulateTestData(b, am.(*server.DefaultAccountManager), bc.Peers, bc.Groups, bc.Users, 1000)
b.ResetTimer()
@@ -190,7 +191,7 @@ func BenchmarkDeleteSetupKey(b *testing.B) {
apiHandler.ServeHTTP(recorder, req)
}
testing_tools.EvaluateBenchmarkResults(b, name, time.Since(start), recorder, moduleSetupKeys, testing_tools.OperationDelete)
testing_tools.EvaluateAPIBenchmarkResults(b, name, time.Since(start), recorder, moduleSetupKeys, testing_tools.OperationDelete)
})
}
}

View File

@@ -18,8 +18,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/netbirdio/netbird/management/server"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools"
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools/channel"
"github.com/netbirdio/netbird/shared/management/http/api"
)
const moduleUsers = "users"
@@ -46,7 +47,7 @@ func BenchmarkUpdateUser(b *testing.B) {
for name, bc := range benchCasesUsers {
b.Run(name, func(b *testing.B) {
apiHandler, am, _ := testing_tools.BuildApiBlackBoxWithDBState(b, "../testdata/users.sql", nil, false)
apiHandler, am, _ := channel.BuildApiBlackBoxWithDBState(b, "../testdata/users.sql", nil, false)
testing_tools.PopulateTestData(b, am.(*server.DefaultAccountManager), bc.Peers, bc.Groups, bc.Users, bc.SetupKeys)
recorder := httptest.NewRecorder()
@@ -71,7 +72,7 @@ func BenchmarkUpdateUser(b *testing.B) {
apiHandler.ServeHTTP(recorder, req)
}
testing_tools.EvaluateBenchmarkResults(b, name, time.Since(start), recorder, moduleUsers, testing_tools.OperationUpdate)
testing_tools.EvaluateAPIBenchmarkResults(b, name, time.Since(start), recorder, moduleUsers, testing_tools.OperationUpdate)
})
}
}
@@ -84,18 +85,18 @@ func BenchmarkGetOneUser(b *testing.B) {
for name, bc := range benchCasesUsers {
b.Run(name, func(b *testing.B) {
apiHandler, am, _ := testing_tools.BuildApiBlackBoxWithDBState(b, "../testdata/users.sql", nil, false)
apiHandler, am, _ := channel.BuildApiBlackBoxWithDBState(b, "../testdata/users.sql", nil, false)
testing_tools.PopulateTestData(b, am.(*server.DefaultAccountManager), bc.Peers, bc.Groups, bc.Users, bc.SetupKeys)
recorder := httptest.NewRecorder()
b.ResetTimer()
start := time.Now()
req := testing_tools.BuildRequest(b, nil, http.MethodGet, "/api/users/"+testing_tools.TestUserId, testing_tools.TestAdminId)
for i := 0; i < b.N; i++ {
req := testing_tools.BuildRequest(b, nil, http.MethodGet, "/api/users/"+testing_tools.TestUserId, testing_tools.TestAdminId)
apiHandler.ServeHTTP(recorder, req)
}
testing_tools.EvaluateBenchmarkResults(b, name, time.Since(start), recorder, moduleUsers, testing_tools.OperationGetOne)
testing_tools.EvaluateAPIBenchmarkResults(b, name, time.Since(start), recorder, moduleUsers, testing_tools.OperationGetOne)
})
}
}
@@ -110,18 +111,18 @@ func BenchmarkGetAllUsers(b *testing.B) {
for name, bc := range benchCasesUsers {
b.Run(name, func(b *testing.B) {
apiHandler, am, _ := testing_tools.BuildApiBlackBoxWithDBState(b, "../testdata/users.sql", nil, false)
apiHandler, am, _ := channel.BuildApiBlackBoxWithDBState(b, "../testdata/users.sql", nil, false)
testing_tools.PopulateTestData(b, am.(*server.DefaultAccountManager), bc.Peers, bc.Groups, bc.Users, bc.SetupKeys)
recorder := httptest.NewRecorder()
b.ResetTimer()
start := time.Now()
req := testing_tools.BuildRequest(b, nil, http.MethodGet, "/api/users", testing_tools.TestAdminId)
for i := 0; i < b.N; i++ {
req := testing_tools.BuildRequest(b, nil, http.MethodGet, "/api/users", testing_tools.TestAdminId)
apiHandler.ServeHTTP(recorder, req)
}
testing_tools.EvaluateBenchmarkResults(b, name, time.Since(start), recorder, moduleUsers, testing_tools.OperationGetAll)
testing_tools.EvaluateAPIBenchmarkResults(b, name, time.Since(start), recorder, moduleUsers, testing_tools.OperationGetAll)
})
}
}
@@ -136,7 +137,7 @@ func BenchmarkDeleteUsers(b *testing.B) {
for name, bc := range benchCasesUsers {
b.Run(name, func(b *testing.B) {
apiHandler, am, _ := testing_tools.BuildApiBlackBoxWithDBState(b, "../testdata/users.sql", nil, false)
apiHandler, am, _ := channel.BuildApiBlackBoxWithDBState(b, "../testdata/users.sql", nil, false)
testing_tools.PopulateTestData(b, am.(*server.DefaultAccountManager), bc.Peers, bc.Groups, 1000, bc.SetupKeys)
recorder := httptest.NewRecorder()
@@ -147,7 +148,7 @@ func BenchmarkDeleteUsers(b *testing.B) {
apiHandler.ServeHTTP(recorder, req)
}
testing_tools.EvaluateBenchmarkResults(b, name, time.Since(start), recorder, moduleUsers, testing_tools.OperationDelete)
testing_tools.EvaluateAPIBenchmarkResults(b, name, time.Since(start), recorder, moduleUsers, testing_tools.OperationDelete)
})
}
}

View File

@@ -15,9 +15,10 @@ import (
"github.com/stretchr/testify/assert"
"github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/management/server/http/handlers/setup_keys"
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools"
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools/channel"
"github.com/netbirdio/netbird/shared/management/http/api"
)
func Test_SetupKeys_Create(t *testing.T) {
@@ -287,7 +288,7 @@ func Test_SetupKeys_Create(t *testing.T) {
for _, tc := range tt {
for _, user := range users {
t.Run(user.name+" - "+tc.name, func(t *testing.T) {
apiHandler, am, done := testing_tools.BuildApiBlackBoxWithDBState(t, "../testdata/setup_keys.sql", nil, true)
apiHandler, am, done := channel.BuildApiBlackBoxWithDBState(t, "../testdata/setup_keys.sql", nil, true)
body, err := json.Marshal(tc.requestBody)
if err != nil {
@@ -572,7 +573,7 @@ func Test_SetupKeys_Update(t *testing.T) {
for _, tc := range tt {
for _, user := range users {
t.Run(tc.name, func(t *testing.T) {
apiHandler, am, done := testing_tools.BuildApiBlackBoxWithDBState(t, "../testdata/setup_keys.sql", nil, true)
apiHandler, am, done := channel.BuildApiBlackBoxWithDBState(t, "../testdata/setup_keys.sql", nil, true)
body, err := json.Marshal(tc.requestBody)
if err != nil {
@@ -751,7 +752,7 @@ func Test_SetupKeys_Get(t *testing.T) {
for _, tc := range tt {
for _, user := range users {
t.Run(tc.name, func(t *testing.T) {
apiHandler, am, done := testing_tools.BuildApiBlackBoxWithDBState(t, "../testdata/setup_keys.sql", nil, true)
apiHandler, am, done := channel.BuildApiBlackBoxWithDBState(t, "../testdata/setup_keys.sql", nil, true)
req := testing_tools.BuildRequest(t, []byte{}, tc.requestType, strings.Replace(tc.requestPath, "{id}", tc.requestId, 1), user.userId)
@@ -903,7 +904,7 @@ func Test_SetupKeys_GetAll(t *testing.T) {
for _, tc := range tt {
for _, user := range users {
t.Run(tc.name, func(t *testing.T) {
apiHandler, am, done := testing_tools.BuildApiBlackBoxWithDBState(t, "../testdata/setup_keys.sql", nil, true)
apiHandler, am, done := channel.BuildApiBlackBoxWithDBState(t, "../testdata/setup_keys.sql", nil, true)
req := testing_tools.BuildRequest(t, []byte{}, tc.requestType, tc.requestPath, user.userId)
@@ -1087,7 +1088,7 @@ func Test_SetupKeys_Delete(t *testing.T) {
for _, tc := range tt {
for _, user := range users {
t.Run(tc.name, func(t *testing.T) {
apiHandler, am, done := testing_tools.BuildApiBlackBoxWithDBState(t, "../testdata/setup_keys.sql", nil, true)
apiHandler, am, done := channel.BuildApiBlackBoxWithDBState(t, "../testdata/setup_keys.sql", nil, true)
req := testing_tools.BuildRequest(t, []byte{}, tc.requestType, strings.Replace(tc.requestPath, "{id}", tc.requestId, 1), user.userId)

View File

@@ -0,0 +1,137 @@
package channel
import (
"context"
"errors"
"net/http"
"time"
"github.com/golang-jwt/jwt/v5"
"github.com/netbirdio/management-integrations/integrations"
"github.com/stretchr/testify/assert"
"github.com/netbirdio/netbird/management/server"
"github.com/netbirdio/netbird/management/server/account"
"github.com/netbirdio/netbird/management/server/activity"
"github.com/netbirdio/netbird/management/server/auth"
nbcontext "github.com/netbirdio/netbird/management/server/context"
"github.com/netbirdio/netbird/management/server/geolocation"
"github.com/netbirdio/netbird/management/server/groups"
http2 "github.com/netbirdio/netbird/management/server/http"
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools"
"github.com/netbirdio/netbird/management/server/networks"
"github.com/netbirdio/netbird/management/server/networks/resources"
"github.com/netbirdio/netbird/management/server/networks/routers"
"github.com/netbirdio/netbird/management/server/peers"
"github.com/netbirdio/netbird/management/server/permissions"
"github.com/netbirdio/netbird/management/server/settings"
"github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/users"
)
func BuildApiBlackBoxWithDBState(t testing_tools.TB, sqlFile string, expectedPeerUpdate *server.UpdateMessage, validateUpdate bool) (http.Handler, account.Manager, chan struct{}) {
store, cleanup, err := store.NewTestStoreFromSQL(context.Background(), sqlFile, t.TempDir())
if err != nil {
t.Fatalf("Failed to create test store: %v", err)
}
t.Cleanup(cleanup)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("Failed to create metrics: %v", err)
}
peersUpdateManager := server.NewPeersUpdateManager(nil)
updMsg := peersUpdateManager.CreateChannel(context.Background(), testing_tools.TestPeerId)
done := make(chan struct{})
if validateUpdate {
go func() {
if expectedPeerUpdate != nil {
peerShouldReceiveUpdate(t, updMsg, expectedPeerUpdate)
} else {
peerShouldNotReceiveUpdate(t, updMsg)
}
close(done)
}()
}
geoMock := &geolocation.Mock{}
validatorMock := server.MockIntegratedValidator{}
proxyController := integrations.NewController(store)
userManager := users.NewManager(store)
permissionsManager := permissions.NewManager(store)
settingsManager := settings.NewManager(store, userManager, integrations.NewManager(&activity.InMemoryEventStore{}), permissionsManager)
am, err := server.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "", &activity.InMemoryEventStore{}, geoMock, false, validatorMock, metrics, proxyController, settingsManager, permissionsManager, false)
if err != nil {
t.Fatalf("Failed to create manager: %v", err)
}
// @note this is required so that PAT's validate from store, but JWT's are mocked
authManager := auth.NewManager(store, "", "", "", "", []string{}, false)
authManagerMock := &auth.MockManager{
ValidateAndParseTokenFunc: mockValidateAndParseToken,
EnsureUserAccessByJWTGroupsFunc: authManager.EnsureUserAccessByJWTGroups,
MarkPATUsedFunc: authManager.MarkPATUsed,
GetPATInfoFunc: authManager.GetPATInfo,
}
networksManagerMock := networks.NewManagerMock()
resourcesManagerMock := resources.NewManagerMock()
routersManagerMock := routers.NewManagerMock()
groupsManagerMock := groups.NewManagerMock()
peersManager := peers.NewManager(store, permissionsManager)
apiHandler, err := http2.NewAPIHandler(context.Background(), am, networksManagerMock, resourcesManagerMock, routersManagerMock, groupsManagerMock, geoMock, authManagerMock, metrics, validatorMock, proxyController, permissionsManager, peersManager, settingsManager)
if err != nil {
t.Fatalf("Failed to create API handler: %v", err)
}
return apiHandler, am, done
}
func peerShouldNotReceiveUpdate(t testing_tools.TB, updateMessage <-chan *server.UpdateMessage) {
t.Helper()
select {
case msg := <-updateMessage:
t.Errorf("Unexpected message received: %+v", msg)
case <-time.After(500 * time.Millisecond):
return
}
}
func peerShouldReceiveUpdate(t testing_tools.TB, updateMessage <-chan *server.UpdateMessage, expected *server.UpdateMessage) {
t.Helper()
select {
case msg := <-updateMessage:
if msg == nil {
t.Errorf("Received nil update message, expected valid message")
}
assert.Equal(t, expected, msg)
case <-time.After(500 * time.Millisecond):
t.Errorf("Timed out waiting for update message")
}
}
func mockValidateAndParseToken(_ context.Context, token string) (nbcontext.UserAuth, *jwt.Token, error) {
userAuth := nbcontext.UserAuth{}
switch token {
case "testUserId", "testAdminId", "testOwnerId", "testServiceUserId", "testServiceAdminId", "blockedUserId":
userAuth.UserId = token
userAuth.AccountId = "testAccountId"
userAuth.Domain = "test.com"
userAuth.DomainCategory = "private"
case "otherUserId":
userAuth.UserId = "otherUserId"
userAuth.AccountId = "otherAccountId"
userAuth.Domain = "other.com"
userAuth.DomainCategory = "private"
case "invalidToken":
return userAuth, nil, errors.New("invalid token")
}
jwtToken := jwt.New(jwt.SigningMethodHS256)
return userAuth, jwtToken, nil
}

View File

@@ -3,7 +3,6 @@ package testing_tools
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net"
@@ -14,32 +13,12 @@ import (
"testing"
"time"
"github.com/golang-jwt/jwt/v5"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/management-integrations/integrations"
"github.com/netbirdio/netbird/management/server/peers"
"github.com/netbirdio/netbird/management/server/permissions"
"github.com/netbirdio/netbird/management/server/settings"
"github.com/netbirdio/netbird/management/server/users"
"github.com/netbirdio/netbird/management/server"
"github.com/netbirdio/netbird/management/server/account"
"github.com/netbirdio/netbird/management/server/activity"
"github.com/netbirdio/netbird/management/server/auth"
nbcontext "github.com/netbirdio/netbird/management/server/context"
"github.com/netbirdio/netbird/management/server/geolocation"
"github.com/netbirdio/netbird/management/server/groups"
nbhttp "github.com/netbirdio/netbird/management/server/http"
"github.com/netbirdio/netbird/management/server/networks"
"github.com/netbirdio/netbird/management/server/networks/resources"
"github.com/netbirdio/netbird/management/server/networks/routers"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/posture"
"github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/management/server/util"
)
@@ -106,90 +85,6 @@ type PerformanceMetrics struct {
MaxMsPerOpCICD float64
}
func BuildApiBlackBoxWithDBState(t TB, sqlFile string, expectedPeerUpdate *server.UpdateMessage, validateUpdate bool) (http.Handler, account.Manager, chan struct{}) {
store, cleanup, err := store.NewTestStoreFromSQL(context.Background(), sqlFile, t.TempDir())
if err != nil {
t.Fatalf("Failed to create test store: %v", err)
}
t.Cleanup(cleanup)
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
if err != nil {
t.Fatalf("Failed to create metrics: %v", err)
}
peersUpdateManager := server.NewPeersUpdateManager(nil)
updMsg := peersUpdateManager.CreateChannel(context.Background(), TestPeerId)
done := make(chan struct{})
if validateUpdate {
go func() {
if expectedPeerUpdate != nil {
peerShouldReceiveUpdate(t, updMsg, expectedPeerUpdate)
} else {
peerShouldNotReceiveUpdate(t, updMsg)
}
close(done)
}()
}
geoMock := &geolocation.Mock{}
validatorMock := server.MockIntegratedValidator{}
proxyController := integrations.NewController(store)
userManager := users.NewManager(store)
permissionsManager := permissions.NewManager(store)
settingsManager := settings.NewManager(store, userManager, integrations.NewManager(&activity.InMemoryEventStore{}), permissionsManager)
am, err := server.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "", &activity.InMemoryEventStore{}, geoMock, false, validatorMock, metrics, proxyController, settingsManager, permissionsManager, false)
if err != nil {
t.Fatalf("Failed to create manager: %v", err)
}
// @note this is required so that PAT's validate from store, but JWT's are mocked
authManager := auth.NewManager(store, "", "", "", "", []string{}, false)
authManagerMock := &auth.MockManager{
ValidateAndParseTokenFunc: mockValidateAndParseToken,
EnsureUserAccessByJWTGroupsFunc: authManager.EnsureUserAccessByJWTGroups,
MarkPATUsedFunc: authManager.MarkPATUsed,
GetPATInfoFunc: authManager.GetPATInfo,
}
networksManagerMock := networks.NewManagerMock()
resourcesManagerMock := resources.NewManagerMock()
routersManagerMock := routers.NewManagerMock()
groupsManagerMock := groups.NewManagerMock()
peersManager := peers.NewManager(store, permissionsManager)
apiHandler, err := nbhttp.NewAPIHandler(context.Background(), am, networksManagerMock, resourcesManagerMock, routersManagerMock, groupsManagerMock, geoMock, authManagerMock, metrics, validatorMock, proxyController, permissionsManager, peersManager, settingsManager)
if err != nil {
t.Fatalf("Failed to create API handler: %v", err)
}
return apiHandler, am, done
}
func peerShouldNotReceiveUpdate(t TB, updateMessage <-chan *server.UpdateMessage) {
t.Helper()
select {
case msg := <-updateMessage:
t.Errorf("Unexpected message received: %+v", msg)
case <-time.After(500 * time.Millisecond):
return
}
}
func peerShouldReceiveUpdate(t TB, updateMessage <-chan *server.UpdateMessage, expected *server.UpdateMessage) {
t.Helper()
select {
case msg := <-updateMessage:
if msg == nil {
t.Errorf("Received nil update message, expected valid message")
}
assert.Equal(t, expected, msg)
case <-time.After(500 * time.Millisecond):
t.Errorf("Timed out waiting for update message")
}
}
func BuildRequest(t TB, requestBody []byte, requestType, requestPath, user string) *http.Request {
t.Helper()
@@ -222,11 +117,11 @@ func ReadResponse(t *testing.T, recorder *httptest.ResponseRecorder, expectedSta
return content, expectedStatus == http.StatusOK
}
func PopulateTestData(b *testing.B, am *server.DefaultAccountManager, peers, groups, users, setupKeys int) {
func PopulateTestData(b *testing.B, am account.Manager, peers, groups, users, setupKeys int) {
b.Helper()
ctx := context.Background()
account, err := am.GetAccount(ctx, TestAccountId)
acc, err := am.GetAccount(ctx, TestAccountId)
if err != nil {
b.Fatalf("Failed to get account: %v", err)
}
@@ -242,23 +137,23 @@ func PopulateTestData(b *testing.B, am *server.DefaultAccountManager, peers, gro
Status: &nbpeer.PeerStatus{LastSeen: time.Now().UTC(), Connected: true},
UserID: TestUserId,
}
account.Peers[peer.ID] = peer
acc.Peers[peer.ID] = peer
}
// Create users
for i := 0; i < users; i++ {
user := &types.User{
Id: fmt.Sprintf("olduser-%d", i),
AccountID: account.Id,
AccountID: acc.Id,
Role: types.UserRoleUser,
}
account.Users[user.Id] = user
acc.Users[user.Id] = user
}
for i := 0; i < setupKeys; i++ {
key := &types.SetupKey{
Id: fmt.Sprintf("oldkey-%d", i),
AccountID: account.Id,
AccountID: acc.Id,
AutoGroups: []string{"someGroupID"},
UpdatedAt: time.Now().UTC(),
ExpiresAt: util.ToPtr(time.Now().Add(ExpiresIn * time.Second)),
@@ -266,11 +161,11 @@ func PopulateTestData(b *testing.B, am *server.DefaultAccountManager, peers, gro
Type: "reusable",
UsageLimit: 0,
}
account.SetupKeys[key.Id] = key
acc.SetupKeys[key.Id] = key
}
// Create groups and policies
account.Policies = make([]*types.Policy, 0, groups)
acc.Policies = make([]*types.Policy, 0, groups)
for i := 0; i < groups; i++ {
groupID := fmt.Sprintf("group-%d", i)
group := &types.Group{
@@ -281,7 +176,7 @@ func PopulateTestData(b *testing.B, am *server.DefaultAccountManager, peers, gro
peerIndex := i*(peers/groups) + j
group.Peers = append(group.Peers, fmt.Sprintf("peer-%d", peerIndex))
}
account.Groups[groupID] = group
acc.Groups[groupID] = group
// Create a policy for this group
policy := &types.Policy{
@@ -301,10 +196,10 @@ func PopulateTestData(b *testing.B, am *server.DefaultAccountManager, peers, gro
},
},
}
account.Policies = append(account.Policies, policy)
acc.Policies = append(acc.Policies, policy)
}
account.PostureChecks = []*posture.Checks{
acc.PostureChecks = []*posture.Checks{
{
ID: "PostureChecksAll",
Name: "All",
@@ -316,52 +211,38 @@ func PopulateTestData(b *testing.B, am *server.DefaultAccountManager, peers, gro
},
}
err = am.Store.SaveAccount(context.Background(), account)
store := am.GetStore()
err = store.SaveAccount(context.Background(), acc)
if err != nil {
b.Fatalf("Failed to save account: %v", err)
}
}
func EvaluateBenchmarkResults(b *testing.B, testCase string, duration time.Duration, recorder *httptest.ResponseRecorder, module string, operation string) {
func EvaluateAPIBenchmarkResults(b *testing.B, testCase string, duration time.Duration, recorder *httptest.ResponseRecorder, module string, operation string) {
b.Helper()
branch := os.Getenv("GIT_BRANCH")
if branch == "" {
b.Fatalf("environment variable GIT_BRANCH is not set")
}
if recorder.Code != http.StatusOK {
b.Fatalf("Benchmark %s failed: unexpected status code %d", testCase, recorder.Code)
}
EvaluateBenchmarkResults(b, testCase, duration, module, operation)
}
func EvaluateBenchmarkResults(b *testing.B, testCase string, duration time.Duration, module string, operation string) {
b.Helper()
branch := os.Getenv("GIT_BRANCH")
if branch == "" && os.Getenv("CI") == "true" {
b.Fatalf("environment variable GIT_BRANCH is not set")
}
msPerOp := float64(duration.Nanoseconds()) / float64(b.N) / 1e6
gauge := BenchmarkDuration.WithLabelValues(module, operation, testCase, branch)
gauge.Set(msPerOp)
b.ReportMetric(msPerOp, "ms/op")
}
func mockValidateAndParseToken(_ context.Context, token string) (nbcontext.UserAuth, *jwt.Token, error) {
userAuth := nbcontext.UserAuth{}
switch token {
case "testUserId", "testAdminId", "testOwnerId", "testServiceUserId", "testServiceAdminId", "blockedUserId":
userAuth.UserId = token
userAuth.AccountId = "testAccountId"
userAuth.Domain = "test.com"
userAuth.DomainCategory = "private"
case "otherUserId":
userAuth.UserId = "otherUserId"
userAuth.AccountId = "otherAccountId"
userAuth.Domain = "other.com"
userAuth.DomainCategory = "private"
case "invalidToken":
return userAuth, nil, errors.New("invalid token")
}
jwtToken := jwt.New(jwt.SigningMethodHS256)
return userAuth, jwtToken, nil
}

View File

@@ -0,0 +1,160 @@
package server
import (
"hash/fnv"
"math"
"sync"
"time"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
)
const (
reconnThreshold = 5 * time.Minute
baseBlockDuration = 10 * time.Minute // Duration for which a peer is banned after exceeding the reconnection limit
reconnLimitForBan = 30 // Number of reconnections within the reconnTreshold that triggers a ban
metaChangeLimit = 3 // Number of reconnections with different metadata that triggers a ban of one peer
)
type lfConfig struct {
reconnThreshold time.Duration
baseBlockDuration time.Duration
reconnLimitForBan int
metaChangeLimit int
}
func initCfg() *lfConfig {
return &lfConfig{
reconnThreshold: reconnThreshold,
baseBlockDuration: baseBlockDuration,
reconnLimitForBan: reconnLimitForBan,
metaChangeLimit: metaChangeLimit,
}
}
type loginFilter struct {
mu sync.RWMutex
cfg *lfConfig
logged map[string]*peerState
}
type peerState struct {
currentHash uint64
sessionCounter int
sessionStart time.Time
lastSeen time.Time
isBanned bool
banLevel int
banExpiresAt time.Time
metaChangeCounter int
metaChangeWindowStart time.Time
}
func newLoginFilter() *loginFilter {
return newLoginFilterWithCfg(initCfg())
}
func newLoginFilterWithCfg(cfg *lfConfig) *loginFilter {
return &loginFilter{
logged: make(map[string]*peerState),
cfg: cfg,
}
}
func (l *loginFilter) allowLogin(wgPubKey string, metaHash uint64) bool {
l.mu.RLock()
defer func() {
l.mu.RUnlock()
}()
state, ok := l.logged[wgPubKey]
if !ok {
return true
}
if state.isBanned && time.Now().Before(state.banExpiresAt) {
return false
}
if metaHash != state.currentHash {
if time.Now().Before(state.metaChangeWindowStart.Add(l.cfg.reconnThreshold)) && state.metaChangeCounter >= l.cfg.metaChangeLimit {
return false
}
}
return true
}
func (l *loginFilter) addLogin(wgPubKey string, metaHash uint64) {
now := time.Now()
l.mu.Lock()
defer func() {
l.mu.Unlock()
}()
state, ok := l.logged[wgPubKey]
if !ok {
l.logged[wgPubKey] = &peerState{
currentHash: metaHash,
sessionCounter: 1,
sessionStart: now,
lastSeen: now,
metaChangeWindowStart: now,
metaChangeCounter: 1,
}
return
}
if state.isBanned && now.After(state.banExpiresAt) {
state.isBanned = false
}
if state.banLevel > 0 && now.Sub(state.lastSeen) > (2*l.cfg.baseBlockDuration) {
state.banLevel = 0
}
if metaHash != state.currentHash {
if now.After(state.metaChangeWindowStart.Add(l.cfg.reconnThreshold)) {
state.metaChangeWindowStart = now
state.metaChangeCounter = 1
} else {
state.metaChangeCounter++
}
state.currentHash = metaHash
state.sessionCounter = 1
state.sessionStart = now
state.lastSeen = now
return
}
state.sessionCounter++
if state.sessionCounter > l.cfg.reconnLimitForBan && now.Sub(state.sessionStart) < l.cfg.reconnThreshold {
state.isBanned = true
state.banLevel++
backoffFactor := math.Pow(2, float64(state.banLevel-1))
duration := time.Duration(float64(l.cfg.baseBlockDuration) * backoffFactor)
state.banExpiresAt = now.Add(duration)
state.sessionCounter = 0
state.sessionStart = now
}
state.lastSeen = now
}
func metaHash(meta nbpeer.PeerSystemMeta, pubip string) uint64 {
h := fnv.New64a()
h.Write([]byte(meta.WtVersion))
h.Write([]byte(meta.OSVersion))
h.Write([]byte(meta.KernelVersion))
h.Write([]byte(meta.Hostname))
h.Write([]byte(meta.SystemSerialNumber))
h.Write([]byte(pubip))
macs := uint64(0)
for _, na := range meta.NetworkAddresses {
for _, r := range na.Mac {
macs += uint64(r)
}
}
return h.Sum64() + macs
}

View File

@@ -0,0 +1,275 @@
package server
import (
"hash/fnv"
"math"
"math/rand"
"strconv"
"strings"
"testing"
"time"
"github.com/stretchr/testify/suite"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
)
func testAdvancedCfg() *lfConfig {
return &lfConfig{
reconnThreshold: 50 * time.Millisecond,
baseBlockDuration: 100 * time.Millisecond,
reconnLimitForBan: 3,
metaChangeLimit: 2,
}
}
type LoginFilterTestSuite struct {
suite.Suite
filter *loginFilter
}
func (s *LoginFilterTestSuite) SetupTest() {
s.filter = newLoginFilterWithCfg(testAdvancedCfg())
}
func TestLoginFilterTestSuite(t *testing.T) {
suite.Run(t, new(LoginFilterTestSuite))
}
func (s *LoginFilterTestSuite) TestFirstLoginIsAlwaysAllowed() {
pubKey := "PUB_KEY_A"
meta := uint64(1)
s.True(s.filter.allowLogin(pubKey, meta))
s.filter.addLogin(pubKey, meta)
s.Require().Contains(s.filter.logged, pubKey)
s.Equal(1, s.filter.logged[pubKey].sessionCounter)
}
func (s *LoginFilterTestSuite) TestFlappingSameHashTriggersBan() {
pubKey := "PUB_KEY_A"
meta := uint64(1)
limit := s.filter.cfg.reconnLimitForBan
for i := 0; i <= limit; i++ {
s.filter.addLogin(pubKey, meta)
}
s.False(s.filter.allowLogin(pubKey, meta))
s.Require().Contains(s.filter.logged, pubKey)
s.True(s.filter.logged[pubKey].isBanned)
}
func (s *LoginFilterTestSuite) TestBanDurationIncreasesExponentially() {
pubKey := "PUB_KEY_A"
meta := uint64(1)
limit := s.filter.cfg.reconnLimitForBan
baseBan := s.filter.cfg.baseBlockDuration
for i := 0; i <= limit; i++ {
s.filter.addLogin(pubKey, meta)
}
s.Require().Contains(s.filter.logged, pubKey)
s.True(s.filter.logged[pubKey].isBanned)
s.Equal(1, s.filter.logged[pubKey].banLevel)
firstBanDuration := s.filter.logged[pubKey].banExpiresAt.Sub(s.filter.logged[pubKey].lastSeen)
s.InDelta(baseBan, firstBanDuration, float64(time.Millisecond))
s.filter.logged[pubKey].banExpiresAt = time.Now().Add(-time.Second)
s.filter.logged[pubKey].isBanned = false
for i := 0; i <= limit; i++ {
s.filter.addLogin(pubKey, meta)
}
s.True(s.filter.logged[pubKey].isBanned)
s.Equal(2, s.filter.logged[pubKey].banLevel)
secondBanDuration := s.filter.logged[pubKey].banExpiresAt.Sub(s.filter.logged[pubKey].lastSeen)
expectedSecondDuration := time.Duration(float64(baseBan) * math.Pow(2, 1))
s.InDelta(expectedSecondDuration, secondBanDuration, float64(time.Millisecond))
}
func (s *LoginFilterTestSuite) TestPeerIsAllowedAfterBanExpires() {
pubKey := "PUB_KEY_A"
meta := uint64(1)
s.filter.logged[pubKey] = &peerState{
isBanned: true,
banExpiresAt: time.Now().Add(-(s.filter.cfg.baseBlockDuration + time.Second)),
}
s.True(s.filter.allowLogin(pubKey, meta))
s.filter.addLogin(pubKey, meta)
s.Require().Contains(s.filter.logged, pubKey)
s.False(s.filter.logged[pubKey].isBanned)
}
func (s *LoginFilterTestSuite) TestBanLevelResetsAfterGoodBehavior() {
pubKey := "PUB_KEY_A"
meta := uint64(1)
s.filter.logged[pubKey] = &peerState{
currentHash: meta,
banLevel: 3,
lastSeen: time.Now().Add(-3 * s.filter.cfg.baseBlockDuration),
}
s.filter.addLogin(pubKey, meta)
s.Require().Contains(s.filter.logged, pubKey)
s.Equal(0, s.filter.logged[pubKey].banLevel)
}
func (s *LoginFilterTestSuite) TestFlappingDifferentHashesTriggersBlock() {
pubKey := "PUB_KEY_A"
limit := s.filter.cfg.metaChangeLimit
for i := range limit {
s.filter.addLogin(pubKey, uint64(i+1))
}
s.Require().Contains(s.filter.logged, pubKey)
s.Equal(limit, s.filter.logged[pubKey].metaChangeCounter)
isAllowed := s.filter.allowLogin(pubKey, uint64(limit+1))
s.False(isAllowed, "should block new meta hash after limit is reached")
}
func (s *LoginFilterTestSuite) TestMetaChangeIsAllowedAfterWindowResets() {
pubKey := "PUB_KEY_A"
meta1 := uint64(1)
meta2 := uint64(2)
meta3 := uint64(3)
s.filter.addLogin(pubKey, meta1)
s.filter.addLogin(pubKey, meta2)
s.Require().Contains(s.filter.logged, pubKey)
s.Equal(s.filter.cfg.metaChangeLimit, s.filter.logged[pubKey].metaChangeCounter)
s.False(s.filter.allowLogin(pubKey, meta3), "should be blocked inside window")
s.filter.logged[pubKey].metaChangeWindowStart = time.Now().Add(-(s.filter.cfg.reconnThreshold + time.Second))
s.True(s.filter.allowLogin(pubKey, meta3), "should be allowed after window expires")
s.filter.addLogin(pubKey, meta3)
s.Equal(1, s.filter.logged[pubKey].metaChangeCounter, "meta change counter should reset")
}
func BenchmarkHashingMethods(b *testing.B) {
meta := nbpeer.PeerSystemMeta{
WtVersion: "1.25.1",
OSVersion: "Ubuntu 22.04.3 LTS",
KernelVersion: "5.15.0-76-generic",
Hostname: "prod-server-database-01",
SystemSerialNumber: "PC-1234567890",
NetworkAddresses: []nbpeer.NetworkAddress{{Mac: "00:1B:44:11:3A:B7"}, {Mac: "00:1B:44:11:3A:B8"}},
}
pubip := "8.8.8.8"
var resultString string
var resultUint uint64
b.Run("BuilderString", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
resultString = builderString(meta, pubip)
}
})
b.Run("FnvHashToString", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
resultString = fnvHashToString(meta, pubip)
}
})
b.Run("FnvHashToUint64 - used", func(b *testing.B) {
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
resultUint = metaHash(meta, pubip)
}
})
_ = resultString
_ = resultUint
}
func fnvHashToString(meta nbpeer.PeerSystemMeta, pubip string) string {
h := fnv.New64a()
if len(meta.NetworkAddresses) != 0 {
for _, na := range meta.NetworkAddresses {
h.Write([]byte(na.Mac))
}
}
h.Write([]byte(meta.WtVersion))
h.Write([]byte(meta.OSVersion))
h.Write([]byte(meta.KernelVersion))
h.Write([]byte(meta.Hostname))
h.Write([]byte(meta.SystemSerialNumber))
h.Write([]byte(pubip))
return strconv.FormatUint(h.Sum64(), 16)
}
func builderString(meta nbpeer.PeerSystemMeta, pubip string) string {
mac := getMacAddress(meta.NetworkAddresses)
estimatedSize := len(meta.WtVersion) + len(meta.OSVersion) + len(meta.KernelVersion) + len(meta.Hostname) + len(meta.SystemSerialNumber) +
len(pubip) + len(mac) + 6
var b strings.Builder
b.Grow(estimatedSize)
b.WriteString(meta.WtVersion)
b.WriteByte('|')
b.WriteString(meta.OSVersion)
b.WriteByte('|')
b.WriteString(meta.KernelVersion)
b.WriteByte('|')
b.WriteString(meta.Hostname)
b.WriteByte('|')
b.WriteString(meta.SystemSerialNumber)
b.WriteByte('|')
b.WriteString(pubip)
return b.String()
}
func getMacAddress(nas []nbpeer.NetworkAddress) string {
if len(nas) == 0 {
return ""
}
macs := make([]string, 0, len(nas))
for _, na := range nas {
macs = append(macs, na.Mac)
}
return strings.Join(macs, "/")
}
func BenchmarkLoginFilter_ParallelLoad(b *testing.B) {
filter := newLoginFilterWithCfg(testAdvancedCfg())
numKeys := 100000
pubKeys := make([]string, numKeys)
for i := range numKeys {
pubKeys[i] = "PUB_KEY_" + strconv.Itoa(i)
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
r := rand.New(rand.NewSource(time.Now().UnixNano()))
for pb.Next() {
key := pubKeys[r.Intn(numKeys)]
meta := r.Uint64()
if filter.allowLogin(key, meta) {
filter.addLogin(key, meta)
}
}
})
}

View File

@@ -95,6 +95,8 @@ type MockAccountManager struct {
LoginPeerFunc func(ctx context.Context, login types.PeerLogin) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error)
SyncPeerFunc func(ctx context.Context, sync types.PeerSync, accountID string) (*nbpeer.Peer, *types.NetworkMap, []*posture.Checks, error)
InviteUserFunc func(ctx context.Context, accountID string, initiatorUserID string, targetUserEmail string) error
ApproveUserFunc func(ctx context.Context, accountID, initiatorUserID, targetUserID string) (*types.UserInfo, error)
RejectUserFunc func(ctx context.Context, accountID, initiatorUserID, targetUserID string) error
GetAllConnectedPeersFunc func() (map[string]struct{}, error)
HasConnectedChannelFunc func(peerID string) bool
GetExternalCacheManagerFunc func() account.ExternalCacheManager
@@ -121,8 +123,10 @@ type MockAccountManager struct {
GetAccountOnboardingFunc func(ctx context.Context, accountID, userID string) (*types.AccountOnboarding, error)
UpdateAccountOnboardingFunc func(ctx context.Context, accountID, userID string, onboarding *types.AccountOnboarding) (*types.AccountOnboarding, error)
GetOrCreateAccountByPrivateDomainFunc func(ctx context.Context, initiatorId, domain string) (*types.Account, bool, error)
UpdateAccountPeersFunc func(ctx context.Context, accountID string)
BufferUpdateAccountPeersFunc func(ctx context.Context, accountID string)
AllowSyncFunc func(string, uint64) bool
UpdateAccountPeersFunc func(ctx context.Context, accountID string)
BufferUpdateAccountPeersFunc func(ctx context.Context, accountID string)
}
func (am *MockAccountManager) CreateGroup(ctx context.Context, accountID, userID string, group *types.Group) error {
@@ -605,6 +609,20 @@ func (am *MockAccountManager) InviteUser(ctx context.Context, accountID string,
return status.Errorf(codes.Unimplemented, "method InviteUser is not implemented")
}
func (am *MockAccountManager) ApproveUser(ctx context.Context, accountID, initiatorUserID, targetUserID string) (*types.UserInfo, error) {
if am.ApproveUserFunc != nil {
return am.ApproveUserFunc(ctx, accountID, initiatorUserID, targetUserID)
}
return nil, status.Errorf(codes.Unimplemented, "method ApproveUser is not implemented")
}
func (am *MockAccountManager) RejectUser(ctx context.Context, accountID, initiatorUserID, targetUserID string) error {
if am.RejectUserFunc != nil {
return am.RejectUserFunc(ctx, accountID, initiatorUserID, targetUserID)
}
return status.Errorf(codes.Unimplemented, "method RejectUser is not implemented")
}
// GetNameServerGroup mocks GetNameServerGroup of the AccountManager interface
func (am *MockAccountManager) GetNameServerGroup(ctx context.Context, accountID, userID, nsGroupID string) (*nbdns.NameServerGroup, error) {
if am.GetNameServerGroupFunc != nil {
@@ -953,3 +971,10 @@ func (am *MockAccountManager) GetCurrentUserInfo(ctx context.Context, userAuth n
}
return nil, status.Errorf(codes.Unimplemented, "method GetCurrentUserInfo is not implemented")
}
func (am *MockAccountManager) AllowSync(key string, hash uint64) bool {
if am.AllowSyncFunc != nil {
return am.AllowSyncFunc(key, hash)
}
return true
}

View File

@@ -368,10 +368,6 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
return err
}
if err = transaction.RemovePeerFromAllGroups(ctx, peer.ID); err != nil {
return fmt.Errorf("failed to remove peer from groups: %w", err)
}
eventsToStore, err = deletePeers(ctx, am, transaction, accountID, userID, []*nbpeer.Peer{peer})
if err != nil {
return fmt.Errorf("failed to delete peer: %w", err)
@@ -493,6 +489,9 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, setupKey, userID s
if err != nil {
return nil, nil, nil, status.Errorf(status.NotFound, "failed adding new peer: user not found")
}
if user.PendingApproval {
return nil, nil, nil, status.Errorf(status.PermissionDenied, "user pending approval cannot add peers")
}
groupsToAdd = user.AutoGroups
opEvent.InitiatorID = userID
opEvent.Activity = activity.PeerAddedByUser

View File

@@ -26,6 +26,7 @@ import (
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/management/internals/server/config"
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools"
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
"github.com/netbirdio/netbird/management/server/mock_server"
"github.com/netbirdio/netbird/management/server/permissions"
@@ -989,19 +990,14 @@ func BenchmarkUpdateAccountPeers(b *testing.B) {
msPerOp := float64(duration.Nanoseconds()) / float64(b.N) / 1e6
b.ReportMetric(msPerOp, "ms/op")
minExpected := bc.minMsPerOpLocal
maxExpected := bc.maxMsPerOpLocal
if os.Getenv("CI") == "true" {
minExpected = bc.minMsPerOpCICD
maxExpected = bc.maxMsPerOpCICD
testing_tools.EvaluateBenchmarkResults(b, bc.name, time.Since(start), "login", "newPeer")
}
if msPerOp < minExpected {
b.Fatalf("Benchmark %s failed: too fast (%.2f ms/op, minimum %.2f ms/op)", bc.name, msPerOp, minExpected)
}
if msPerOp > (maxExpected * 1.1) {
b.Fatalf("Benchmark %s failed: too slow (%.2f ms/op, maximum %.2f ms/op)", bc.name, msPerOp, maxExpected)
if msPerOp > maxExpected {
b.Logf("Benchmark %s: too slow (%.2f ms/op, max %.2f ms/op)", bc.name, msPerOp, maxExpected)
}
})
}
@@ -1609,7 +1605,6 @@ func Test_LoginPeer(t *testing.T) {
testCases := []struct {
name string
setupKey string
wireGuardPubKey string
expectExtraDNSLabelsMismatch bool
extraDNSLabels []string
expectLoginError bool
@@ -2388,3 +2383,186 @@ func TestBufferUpdateAccountPeers(t *testing.T) {
assert.Less(t, totalNewRuns, totalOldRuns, "Expected new approach to run less than old approach. New runs: %d, Old runs: %d", totalNewRuns, totalOldRuns)
t.Logf("New runs: %d, Old runs: %d", totalNewRuns, totalOldRuns)
}
func TestAddPeer_UserPendingApprovalBlocked(t *testing.T) {
manager, err := createManager(t)
if err != nil {
t.Fatal(err)
}
// Create account
account := newAccountWithId(context.Background(), "test-account", "owner", "", false)
err = manager.Store.SaveAccount(context.Background(), account)
require.NoError(t, err)
// Create user pending approval
pendingUser := types.NewRegularUser("pending-user")
pendingUser.AccountID = account.Id
pendingUser.Blocked = true
pendingUser.PendingApproval = true
err = manager.Store.SaveUser(context.Background(), pendingUser)
require.NoError(t, err)
// Try to add peer with pending approval user
key, err := wgtypes.GenerateKey()
require.NoError(t, err)
peer := &nbpeer.Peer{
Key: key.PublicKey().String(),
Name: "test-peer",
Meta: nbpeer.PeerSystemMeta{
Hostname: "test-peer",
OS: "linux",
},
}
_, _, _, err = manager.AddPeer(context.Background(), "", pendingUser.Id, peer)
require.Error(t, err)
assert.Contains(t, err.Error(), "user pending approval cannot add peers")
}
func TestAddPeer_ApprovedUserCanAddPeers(t *testing.T) {
manager, err := createManager(t)
if err != nil {
t.Fatal(err)
}
// Create account
account := newAccountWithId(context.Background(), "test-account", "owner", "", false)
err = manager.Store.SaveAccount(context.Background(), account)
require.NoError(t, err)
// Create regular user (not pending approval)
regularUser := types.NewRegularUser("regular-user")
regularUser.AccountID = account.Id
err = manager.Store.SaveUser(context.Background(), regularUser)
require.NoError(t, err)
// Try to add peer with regular user
key, err := wgtypes.GenerateKey()
require.NoError(t, err)
peer := &nbpeer.Peer{
Key: key.PublicKey().String(),
Name: "test-peer",
Meta: nbpeer.PeerSystemMeta{
Hostname: "test-peer",
OS: "linux",
},
}
_, _, _, err = manager.AddPeer(context.Background(), "", regularUser.Id, peer)
require.NoError(t, err, "Regular user should be able to add peers")
}
func TestLoginPeer_UserPendingApprovalBlocked(t *testing.T) {
manager, err := createManager(t)
if err != nil {
t.Fatal(err)
}
// Create account
account := newAccountWithId(context.Background(), "test-account", "owner", "", false)
err = manager.Store.SaveAccount(context.Background(), account)
require.NoError(t, err)
// Create user pending approval
pendingUser := types.NewRegularUser("pending-user")
pendingUser.AccountID = account.Id
pendingUser.Blocked = true
pendingUser.PendingApproval = true
err = manager.Store.SaveUser(context.Background(), pendingUser)
require.NoError(t, err)
// Create a peer using AddPeer method for the pending user (simulate existing peer)
key, err := wgtypes.GenerateKey()
require.NoError(t, err)
// Set the user to not be pending initially so peer can be added
pendingUser.Blocked = false
pendingUser.PendingApproval = false
err = manager.Store.SaveUser(context.Background(), pendingUser)
require.NoError(t, err)
// Add peer using regular flow
newPeer := &nbpeer.Peer{
Key: key.PublicKey().String(),
Name: "test-peer",
Meta: nbpeer.PeerSystemMeta{
Hostname: "test-peer",
OS: "linux",
WtVersion: "0.28.0",
},
}
existingPeer, _, _, err := manager.AddPeer(context.Background(), "", pendingUser.Id, newPeer)
require.NoError(t, err)
// Now set the user back to pending approval after peer was created
pendingUser.Blocked = true
pendingUser.PendingApproval = true
err = manager.Store.SaveUser(context.Background(), pendingUser)
require.NoError(t, err)
// Try to login with pending approval user
login := types.PeerLogin{
WireGuardPubKey: existingPeer.Key,
UserID: pendingUser.Id,
Meta: nbpeer.PeerSystemMeta{
Hostname: "test-peer",
OS: "linux",
},
}
_, _, _, err = manager.LoginPeer(context.Background(), login)
require.Error(t, err)
e, ok := status.FromError(err)
require.True(t, ok, "error is not a gRPC status error")
assert.Equal(t, status.PermissionDenied, e.Type(), "expected PermissionDenied error code")
}
func TestLoginPeer_ApprovedUserCanLogin(t *testing.T) {
manager, err := createManager(t)
if err != nil {
t.Fatal(err)
}
// Create account
account := newAccountWithId(context.Background(), "test-account", "owner", "", false)
err = manager.Store.SaveAccount(context.Background(), account)
require.NoError(t, err)
// Create regular user (not pending approval)
regularUser := types.NewRegularUser("regular-user")
regularUser.AccountID = account.Id
err = manager.Store.SaveUser(context.Background(), regularUser)
require.NoError(t, err)
// Add peer using regular flow for the regular user
key, err := wgtypes.GenerateKey()
require.NoError(t, err)
newPeer := &nbpeer.Peer{
Key: key.PublicKey().String(),
Name: "test-peer",
Meta: nbpeer.PeerSystemMeta{
Hostname: "test-peer",
OS: "linux",
WtVersion: "0.28.0",
},
}
existingPeer, _, _, err := manager.AddPeer(context.Background(), "", regularUser.Id, newPeer)
require.NoError(t, err)
// Try to login with regular user
login := types.PeerLogin{
WireGuardPubKey: existingPeer.Key,
UserID: regularUser.Id,
Meta: nbpeer.PeerSystemMeta{
Hostname: "test-peer",
OS: "linux",
},
}
_, _, _, err = manager.LoginPeer(context.Background(), login)
require.NoError(t, err, "Regular user should be able to login peers")
}

View File

@@ -54,10 +54,14 @@ func (m *managerImpl) ValidateUserPermissions(
return false, status.NewUserNotFoundError(userID)
}
if user.IsBlocked() {
if user.IsBlocked() && !user.PendingApproval {
return false, status.NewUserBlockedError()
}
if user.IsBlocked() && user.PendingApproval {
return false, status.NewUserPendingApprovalError()
}
if err := m.ValidateAccountAccess(ctx, accountID, user, false); err != nil {
return false, err
}

View File

@@ -914,7 +914,7 @@ func (s *SqlStore) GetAccountByPeerPubKey(ctx context.Context, peerKey string) (
func (s *SqlStore) GetAnyAccountID(ctx context.Context) (string, error) {
var account types.Account
result := s.db.WithContext(ctx).Select("id").Order("created_at desc").Limit(1).Find(&account)
result := s.db.Select("id").Order("created_at desc").Limit(1).Find(&account)
if result.Error != nil {
return "", status.NewGetAccountFromStoreError(result.Error)
}
@@ -1399,7 +1399,7 @@ func (s *SqlStore) AddPeerToGroup(ctx context.Context, accountID, peerID, groupI
PeerID: peerID,
}
err := s.db.WithContext(ctx).Clauses(clause.OnConflict{
err := s.db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "group_id"}, {Name: "peer_id"}},
DoNothing: true,
}).Create(peer).Error
@@ -1414,7 +1414,7 @@ func (s *SqlStore) AddPeerToGroup(ctx context.Context, accountID, peerID, groupI
// RemovePeerFromGroup removes a peer from a group
func (s *SqlStore) RemovePeerFromGroup(ctx context.Context, peerID string, groupID string) error {
err := s.db.WithContext(ctx).
err := s.db.
Delete(&types.GroupPeer{}, "group_id = ? AND peer_id = ?", groupID, peerID).Error
if err != nil {
@@ -1427,7 +1427,7 @@ func (s *SqlStore) RemovePeerFromGroup(ctx context.Context, peerID string, group
// RemovePeerFromAllGroups removes a peer from all groups
func (s *SqlStore) RemovePeerFromAllGroups(ctx context.Context, peerID string) error {
err := s.db.WithContext(ctx).
err := s.db.
Delete(&types.GroupPeer{}, "peer_id = ?", peerID).Error
if err != nil {
@@ -2015,7 +2015,7 @@ func (s *SqlStore) SavePolicy(ctx context.Context, policy *types.Policy) error {
}
func (s *SqlStore) DeletePolicy(ctx context.Context, accountID, policyID string) error {
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
return s.db.Transaction(func(tx *gorm.DB) error {
if err := tx.Where("policy_id = ?", policyID).Delete(&types.PolicyRule{}).Error; err != nil {
return fmt.Errorf("delete policy rules: %w", err)
}
@@ -2706,7 +2706,7 @@ func (s *SqlStore) GetPeerByIP(ctx context.Context, lockStrength LockingStrength
}
func (s *SqlStore) GetPeerIdByLabel(ctx context.Context, lockStrength LockingStrength, accountID string, hostname string) (string, error) {
tx := s.db.WithContext(ctx)
tx := s.db
if lockStrength != LockingStrengthNone {
tx = tx.Clauses(clause.Locking{Strength: string(lockStrength)})
}

View File

@@ -9,18 +9,23 @@ import (
)
const AccountIDLabel = "account_id"
const HighLatencyThreshold = time.Second * 7
// GRPCMetrics are gRPC server metrics
type GRPCMetrics struct {
meter metric.Meter
syncRequestsCounter metric.Int64Counter
loginRequestsCounter metric.Int64Counter
getKeyRequestsCounter metric.Int64Counter
activeStreamsGauge metric.Int64ObservableGauge
syncRequestDuration metric.Int64Histogram
loginRequestDuration metric.Int64Histogram
channelQueueLength metric.Int64Histogram
ctx context.Context
meter metric.Meter
syncRequestsCounter metric.Int64Counter
syncRequestsBlockedCounter metric.Int64Counter
syncRequestHighLatencyCounter metric.Int64Counter
loginRequestsCounter metric.Int64Counter
loginRequestsBlockedCounter metric.Int64Counter
loginRequestHighLatencyCounter metric.Int64Counter
getKeyRequestsCounter metric.Int64Counter
activeStreamsGauge metric.Int64ObservableGauge
syncRequestDuration metric.Int64Histogram
loginRequestDuration metric.Int64Histogram
channelQueueLength metric.Int64Histogram
ctx context.Context
}
// NewGRPCMetrics creates new GRPCMetrics struct and registers common metrics of the gRPC server
@@ -33,6 +38,22 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro
return nil, err
}
syncRequestsBlockedCounter, err := meter.Int64Counter("management.grpc.sync.request.blocked.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of sync gRPC requests from blocked peers"),
)
if err != nil {
return nil, err
}
syncRequestHighLatencyCounter, err := meter.Int64Counter("management.grpc.sync.request.high.latency.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of sync gRPC requests from the peers that took longer than the threshold to establish a connection and receive network map updates (update channel)"),
)
if err != nil {
return nil, err
}
loginRequestsCounter, err := meter.Int64Counter("management.grpc.login.request.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of login gRPC requests from the peers to authenticate and receive initial configuration and relay credentials"),
@@ -41,6 +62,22 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro
return nil, err
}
loginRequestsBlockedCounter, err := meter.Int64Counter("management.grpc.login.request.blocked.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of login gRPC requests from blocked peers"),
)
if err != nil {
return nil, err
}
loginRequestHighLatencyCounter, err := meter.Int64Counter("management.grpc.login.request.high.latency.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of login gRPC requests from the peers that took longer than the threshold to authenticate and receive initial configuration and relay credentials"),
)
if err != nil {
return nil, err
}
getKeyRequestsCounter, err := meter.Int64Counter("management.grpc.key.request.counter",
metric.WithUnit("1"),
metric.WithDescription("Number of key gRPC requests from the peers to get the server's public WireGuard key"),
@@ -86,15 +123,19 @@ func NewGRPCMetrics(ctx context.Context, meter metric.Meter) (*GRPCMetrics, erro
}
return &GRPCMetrics{
meter: meter,
syncRequestsCounter: syncRequestsCounter,
loginRequestsCounter: loginRequestsCounter,
getKeyRequestsCounter: getKeyRequestsCounter,
activeStreamsGauge: activeStreamsGauge,
syncRequestDuration: syncRequestDuration,
loginRequestDuration: loginRequestDuration,
channelQueueLength: channelQueue,
ctx: ctx,
meter: meter,
syncRequestsCounter: syncRequestsCounter,
syncRequestsBlockedCounter: syncRequestsBlockedCounter,
syncRequestHighLatencyCounter: syncRequestHighLatencyCounter,
loginRequestsCounter: loginRequestsCounter,
loginRequestsBlockedCounter: loginRequestsBlockedCounter,
loginRequestHighLatencyCounter: loginRequestHighLatencyCounter,
getKeyRequestsCounter: getKeyRequestsCounter,
activeStreamsGauge: activeStreamsGauge,
syncRequestDuration: syncRequestDuration,
loginRequestDuration: loginRequestDuration,
channelQueueLength: channelQueue,
ctx: ctx,
}, err
}
@@ -103,6 +144,11 @@ func (grpcMetrics *GRPCMetrics) CountSyncRequest() {
grpcMetrics.syncRequestsCounter.Add(grpcMetrics.ctx, 1)
}
// CountSyncRequestBlocked counts the number of gRPC sync requests from blocked peers
func (grpcMetrics *GRPCMetrics) CountSyncRequestBlocked() {
grpcMetrics.syncRequestsBlockedCounter.Add(grpcMetrics.ctx, 1)
}
// CountGetKeyRequest counts the number of gRPC get server key requests coming to the gRPC API
func (grpcMetrics *GRPCMetrics) CountGetKeyRequest() {
grpcMetrics.getKeyRequestsCounter.Add(grpcMetrics.ctx, 1)
@@ -113,14 +159,25 @@ func (grpcMetrics *GRPCMetrics) CountLoginRequest() {
grpcMetrics.loginRequestsCounter.Add(grpcMetrics.ctx, 1)
}
// CountLoginRequestBlocked counts the number of gRPC login requests from blocked peers
func (grpcMetrics *GRPCMetrics) CountLoginRequestBlocked() {
grpcMetrics.loginRequestsBlockedCounter.Add(grpcMetrics.ctx, 1)
}
// CountLoginRequestDuration counts the duration of the login gRPC requests
func (grpcMetrics *GRPCMetrics) CountLoginRequestDuration(duration time.Duration, accountID string) {
grpcMetrics.loginRequestDuration.Record(grpcMetrics.ctx, duration.Milliseconds(), metric.WithAttributes(attribute.String(AccountIDLabel, accountID)))
grpcMetrics.loginRequestDuration.Record(grpcMetrics.ctx, duration.Milliseconds())
if duration > HighLatencyThreshold {
grpcMetrics.loginRequestHighLatencyCounter.Add(grpcMetrics.ctx, 1, metric.WithAttributes(attribute.String(AccountIDLabel, accountID)))
}
}
// CountSyncRequestDuration counts the duration of the sync gRPC requests
func (grpcMetrics *GRPCMetrics) CountSyncRequestDuration(duration time.Duration, accountID string) {
grpcMetrics.syncRequestDuration.Record(grpcMetrics.ctx, duration.Milliseconds(), metric.WithAttributes(attribute.String(AccountIDLabel, accountID)))
grpcMetrics.syncRequestDuration.Record(grpcMetrics.ctx, duration.Milliseconds())
if duration > HighLatencyThreshold {
grpcMetrics.syncRequestHighLatencyCounter.Add(grpcMetrics.ctx, 1, metric.WithAttributes(attribute.String(AccountIDLabel, accountID)))
}
}
// RegisterConnectedStreams registers a function that collects number of active streams and feeds it to the metrics gauge.

View File

@@ -83,6 +83,9 @@ type ExtraSettings struct {
// PeerApprovalEnabled enables or disables the need for peers bo be approved by an administrator
PeerApprovalEnabled bool
// UserApprovalRequired enables or disables the need for users joining via domain matching to be approved by an administrator
UserApprovalRequired bool
// IntegratedValidator is the string enum for the integrated validator type
IntegratedValidator string
// IntegratedValidatorGroups list of group IDs to be used with integrated approval configurations
@@ -99,6 +102,7 @@ type ExtraSettings struct {
func (e *ExtraSettings) Copy() *ExtraSettings {
return &ExtraSettings{
PeerApprovalEnabled: e.PeerApprovalEnabled,
UserApprovalRequired: e.UserApprovalRequired,
IntegratedValidatorGroups: slices.Clone(e.IntegratedValidatorGroups),
IntegratedValidator: e.IntegratedValidator,
FlowEnabled: e.FlowEnabled,

View File

@@ -64,6 +64,7 @@ type UserInfo struct {
NonDeletable bool `json:"non_deletable"`
LastLogin time.Time `json:"last_login"`
Issued string `json:"issued"`
PendingApproval bool `json:"pending_approval"`
IntegrationReference integration_reference.IntegrationReference `json:"-"`
}
@@ -84,6 +85,8 @@ type User struct {
PATsG []PersonalAccessToken `json:"-" gorm:"foreignKey:UserID;references:id;constraint:OnDelete:CASCADE;"`
// Blocked indicates whether the user is blocked. Blocked users can't use the system.
Blocked bool
// PendingApproval indicates whether the user requires approval before being activated
PendingApproval bool
// LastLogin is the last time the user logged in to IdP
LastLogin *time.Time
// CreatedAt records the time the user was created
@@ -141,16 +144,17 @@ func (u *User) ToUserInfo(userData *idp.UserData) (*UserInfo, error) {
if userData == nil {
return &UserInfo{
ID: u.Id,
Email: "",
Name: u.ServiceUserName,
Role: string(u.Role),
AutoGroups: u.AutoGroups,
Status: string(UserStatusActive),
IsServiceUser: u.IsServiceUser,
IsBlocked: u.Blocked,
LastLogin: u.GetLastLogin(),
Issued: u.Issued,
ID: u.Id,
Email: "",
Name: u.ServiceUserName,
Role: string(u.Role),
AutoGroups: u.AutoGroups,
Status: string(UserStatusActive),
IsServiceUser: u.IsServiceUser,
IsBlocked: u.Blocked,
LastLogin: u.GetLastLogin(),
Issued: u.Issued,
PendingApproval: u.PendingApproval,
}, nil
}
if userData.ID != u.Id {
@@ -163,16 +167,17 @@ func (u *User) ToUserInfo(userData *idp.UserData) (*UserInfo, error) {
}
return &UserInfo{
ID: u.Id,
Email: userData.Email,
Name: userData.Name,
Role: string(u.Role),
AutoGroups: autoGroups,
Status: string(userStatus),
IsServiceUser: u.IsServiceUser,
IsBlocked: u.Blocked,
LastLogin: u.GetLastLogin(),
Issued: u.Issued,
ID: u.Id,
Email: userData.Email,
Name: userData.Name,
Role: string(u.Role),
AutoGroups: autoGroups,
Status: string(userStatus),
IsServiceUser: u.IsServiceUser,
IsBlocked: u.Blocked,
LastLogin: u.GetLastLogin(),
Issued: u.Issued,
PendingApproval: u.PendingApproval,
}, nil
}
@@ -194,6 +199,7 @@ func (u *User) Copy() *User {
ServiceUserName: u.ServiceUserName,
PATs: pats,
Blocked: u.Blocked,
PendingApproval: u.PendingApproval,
LastLogin: u.LastLogin,
CreatedAt: u.CreatedAt,
Issued: u.Issued,

View File

@@ -519,33 +519,46 @@ func (am *DefaultAccountManager) SaveOrAddUsers(ctx context.Context, accountID,
initiatorUser = result
}
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
for _, update := range updates {
if update == nil {
return status.Errorf(status.InvalidArgument, "provided user update is nil")
}
var globalErr error
for _, update := range updates {
if update == nil {
return nil, status.Errorf(status.InvalidArgument, "provided user update is nil")
}
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
userHadPeers, updatedUser, userPeersToExpire, userEvents, err := am.processUserUpdate(
ctx, transaction, groupsMap, accountID, initiatorUserID, initiatorUser, update, addIfNotExists, settings,
)
if err != nil {
return fmt.Errorf("failed to process update for user %s: %w", update.Id, err)
}
usersToSave = append(usersToSave, updatedUser)
addUserEvents = append(addUserEvents, userEvents...)
peersToExpire = append(peersToExpire, userPeersToExpire...)
if userHadPeers {
updateAccountPeers = true
}
err = transaction.SaveUser(ctx, updatedUser)
if err != nil {
return fmt.Errorf("failed to save updated user %s: %w", update.Id, err)
}
usersToSave = append(usersToSave, updatedUser)
addUserEvents = append(addUserEvents, userEvents...)
peersToExpire = append(peersToExpire, userPeersToExpire...)
return nil
})
if err != nil {
log.WithContext(ctx).Errorf("failed to save user %s: %s", update.Id, err)
if len(updates) == 1 {
return nil, err
}
globalErr = errors.Join(globalErr, err)
// continue when updating multiple users
}
return transaction.SaveUsers(ctx, usersToSave)
})
if err != nil {
return nil, err
}
var updatedUsersInfo = make([]*types.UserInfo, 0, len(updates))
var updatedUsersInfo = make([]*types.UserInfo, 0, len(usersToSave))
userInfos, err := am.GetUsersFromAccount(ctx, accountID, initiatorUserID)
if err != nil {
@@ -578,7 +591,7 @@ func (am *DefaultAccountManager) SaveOrAddUsers(ctx context.Context, accountID,
am.UpdateAccountPeers(ctx, accountID)
}
return updatedUsersInfo, nil
return updatedUsersInfo, globalErr
}
// prepareUserUpdateEvents prepares a list user update events based on the changes between the old and new user data.
@@ -643,7 +656,7 @@ func (am *DefaultAccountManager) processUserUpdate(ctx context.Context, transact
}
transferredOwnerRole = result
userPeers, err := transaction.GetUserPeers(ctx, store.LockingStrengthUpdate, updatedUser.AccountID, update.Id)
userPeers, err := transaction.GetUserPeers(ctx, store.LockingStrengthNone, updatedUser.AccountID, update.Id)
if err != nil {
return false, nil, nil, nil, err
}
@@ -1194,3 +1207,77 @@ func (am *DefaultAccountManager) GetCurrentUserInfo(ctx context.Context, userAut
return userWithPermissions, nil
}
// ApproveUser approves a user that is pending approval
func (am *DefaultAccountManager) ApproveUser(ctx context.Context, accountID, initiatorUserID, targetUserID string) (*types.UserInfo, error) {
allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, initiatorUserID, modules.Users, operations.Update)
if err != nil {
return nil, status.NewPermissionValidationError(err)
}
if !allowed {
return nil, status.NewPermissionDeniedError()
}
user, err := am.Store.GetUserByUserID(ctx, store.LockingStrengthNone, targetUserID)
if err != nil {
return nil, err
}
if user.AccountID != accountID {
return nil, status.NewUserNotFoundError(targetUserID)
}
if !user.PendingApproval {
return nil, status.Errorf(status.InvalidArgument, "user %s is not pending approval", targetUserID)
}
user.Blocked = false
user.PendingApproval = false
err = am.Store.SaveUser(ctx, user)
if err != nil {
return nil, err
}
am.StoreEvent(ctx, initiatorUserID, targetUserID, accountID, activity.UserApproved, nil)
userInfo, err := am.getUserInfo(ctx, user, accountID)
if err != nil {
return nil, err
}
return userInfo, nil
}
// RejectUser rejects a user that is pending approval by deleting them
func (am *DefaultAccountManager) RejectUser(ctx context.Context, accountID, initiatorUserID, targetUserID string) error {
allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, initiatorUserID, modules.Users, operations.Delete)
if err != nil {
return status.NewPermissionValidationError(err)
}
if !allowed {
return status.NewPermissionDeniedError()
}
user, err := am.Store.GetUserByUserID(ctx, store.LockingStrengthNone, targetUserID)
if err != nil {
return err
}
if user.AccountID != accountID {
return status.NewUserNotFoundError(targetUserID)
}
if !user.PendingApproval {
return status.Errorf(status.InvalidArgument, "user %s is not pending approval", targetUserID)
}
err = am.DeleteUser(ctx, accountID, initiatorUserID, targetUserID)
if err != nil {
return err
}
am.StoreEvent(ctx, initiatorUserID, targetUserID, accountID, activity.UserRejected, nil)
return nil
}

View File

@@ -1746,3 +1746,117 @@ func mergeRolePermissions(role roles.RolePermissions) roles.Permissions {
return permissions
}
func TestApproveUser(t *testing.T) {
manager, err := createManager(t)
if err != nil {
t.Fatal(err)
}
// Create account with admin and pending approval user
account := newAccountWithId(context.Background(), "account-1", "admin-user", "example.com", false)
err = manager.Store.SaveAccount(context.Background(), account)
require.NoError(t, err)
// Create admin user
adminUser := types.NewAdminUser("admin-user")
adminUser.AccountID = account.Id
err = manager.Store.SaveUser(context.Background(), adminUser)
require.NoError(t, err)
// Create user pending approval
pendingUser := types.NewRegularUser("pending-user")
pendingUser.AccountID = account.Id
pendingUser.Blocked = true
pendingUser.PendingApproval = true
err = manager.Store.SaveUser(context.Background(), pendingUser)
require.NoError(t, err)
// Test successful approval
approvedUser, err := manager.ApproveUser(context.Background(), account.Id, adminUser.Id, pendingUser.Id)
require.NoError(t, err)
assert.False(t, approvedUser.IsBlocked)
assert.False(t, approvedUser.PendingApproval)
// Verify user is updated in store
updatedUser, err := manager.Store.GetUserByUserID(context.Background(), store.LockingStrengthNone, pendingUser.Id)
require.NoError(t, err)
assert.False(t, updatedUser.Blocked)
assert.False(t, updatedUser.PendingApproval)
// Test approval of non-pending user should fail
_, err = manager.ApproveUser(context.Background(), account.Id, adminUser.Id, pendingUser.Id)
require.Error(t, err)
assert.Contains(t, err.Error(), "not pending approval")
// Test approval by non-admin should fail
regularUser := types.NewRegularUser("regular-user")
regularUser.AccountID = account.Id
err = manager.Store.SaveUser(context.Background(), regularUser)
require.NoError(t, err)
pendingUser2 := types.NewRegularUser("pending-user-2")
pendingUser2.AccountID = account.Id
pendingUser2.Blocked = true
pendingUser2.PendingApproval = true
err = manager.Store.SaveUser(context.Background(), pendingUser2)
require.NoError(t, err)
_, err = manager.ApproveUser(context.Background(), account.Id, regularUser.Id, pendingUser2.Id)
require.Error(t, err)
}
func TestRejectUser(t *testing.T) {
manager, err := createManager(t)
if err != nil {
t.Fatal(err)
}
// Create account with admin and pending approval user
account := newAccountWithId(context.Background(), "account-1", "admin-user", "example.com", false)
err = manager.Store.SaveAccount(context.Background(), account)
require.NoError(t, err)
// Create admin user
adminUser := types.NewAdminUser("admin-user")
adminUser.AccountID = account.Id
err = manager.Store.SaveUser(context.Background(), adminUser)
require.NoError(t, err)
// Create user pending approval
pendingUser := types.NewRegularUser("pending-user")
pendingUser.AccountID = account.Id
pendingUser.Blocked = true
pendingUser.PendingApproval = true
err = manager.Store.SaveUser(context.Background(), pendingUser)
require.NoError(t, err)
// Test successful rejection
err = manager.RejectUser(context.Background(), account.Id, adminUser.Id, pendingUser.Id)
require.NoError(t, err)
// Verify user is deleted from store
_, err = manager.Store.GetUserByUserID(context.Background(), store.LockingStrengthNone, pendingUser.Id)
require.Error(t, err)
// Test rejection of non-pending user should fail
regularUser := types.NewRegularUser("regular-user")
regularUser.AccountID = account.Id
err = manager.Store.SaveUser(context.Background(), regularUser)
require.NoError(t, err)
err = manager.RejectUser(context.Background(), account.Id, adminUser.Id, regularUser.Id)
require.Error(t, err)
assert.Contains(t, err.Error(), "not pending approval")
// Test rejection by non-admin should fail
pendingUser2 := types.NewRegularUser("pending-user-2")
pendingUser2.AccountID = account.Id
pendingUser2.Blocked = true
pendingUser2.PendingApproval = true
err = manager.Store.SaveUser(context.Background(), pendingUser2)
require.NoError(t, err)
err = manager.RejectUser(context.Background(), account.Id, regularUser.Id, pendingUser2.Id)
require.Error(t, err)
}

View File

@@ -73,7 +73,12 @@ func (l *Listener) Shutdown(ctx context.Context) error {
func (l *Listener) onAccept(w http.ResponseWriter, r *http.Request) {
connRemoteAddr := remoteAddr(r)
wsConn, err := websocket.Accept(w, r, nil)
acceptOptions := &websocket.AcceptOptions{
OriginPatterns: []string{"*"},
}
wsConn, err := websocket.Accept(w, r, acceptOptions)
if err != nil {
log.Errorf("failed to accept ws connection from %s: %s", connRemoteAddr, err)
return

View File

@@ -8,8 +8,8 @@ import (
"net/http/httptest"
"testing"
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools/channel"
"github.com/netbirdio/netbird/shared/management/client/rest"
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools"
)
func withMockClient(callback func(*rest.Client, *http.ServeMux)) {
@@ -26,7 +26,7 @@ func ptr[T any, PT *T](x T) PT {
func withBlackBoxServer(t *testing.T, callback func(*rest.Client)) {
t.Helper()
handler, _, _ := testing_tools.BuildApiBlackBoxWithDBState(t, "../../../../management/server/testdata/store.sql", nil, false)
handler, _, _ := channel.BuildApiBlackBoxWithDBState(t, "../../../../management/server/testdata/store.sql", nil, false)
server := httptest.NewServer(handler)
defer server.Close()
c := rest.New(server.URL, "nbp_apTmlmUXHSC4PKmHwtIZNaGr8eqcVI2gMURp")

View File

@@ -158,6 +158,10 @@ components:
description: (Cloud only) Enables or disables peer approval globally. If enabled, all peers added will be in pending state until approved by an admin.
type: boolean
example: true
user_approval_required:
description: Enables manual approval for new users joining via domain matching. When enabled, users are blocked with pending approval status until explicitly approved by an admin.
type: boolean
example: false
network_traffic_logs_enabled:
description: Enables or disables network traffic logging. If enabled, all network traffic events from peers will be stored.
type: boolean
@@ -174,6 +178,7 @@ components:
example: true
required:
- peer_approval_enabled
- user_approval_required
- network_traffic_logs_enabled
- network_traffic_logs_groups
- network_traffic_packet_counter_enabled
@@ -235,6 +240,10 @@ components:
description: Is true if this user is blocked. Blocked users can't use the system
type: boolean
example: false
pending_approval:
description: Is true if this user requires approval before being activated. Only applicable for users joining via domain matching when user_approval_required is enabled.
type: boolean
example: false
issued:
description: How user was issued by API or Integration
type: string
@@ -249,6 +258,7 @@ components:
- auto_groups
- status
- is_blocked
- pending_approval
UserPermissions:
type: object
properties:
@@ -2544,6 +2554,63 @@ paths:
"$ref": "#/components/responses/forbidden"
'500':
"$ref": "#/components/responses/internal_error"
/api/users/{userId}/approve:
post:
summary: Approve user
description: Approve a user that is pending approval
tags: [ Users ]
security:
- BearerAuth: [ ]
- TokenAuth: [ ]
parameters:
- in: path
name: userId
required: true
schema:
type: string
description: The unique identifier of a user
responses:
'200':
description: Returns the approved user
content:
application/json:
schema:
"$ref": "#/components/schemas/User"
'400':
"$ref": "#/components/responses/bad_request"
'401':
"$ref": "#/components/responses/requires_authentication"
'403':
"$ref": "#/components/responses/forbidden"
'500':
"$ref": "#/components/responses/internal_error"
/api/users/{userId}/reject:
delete:
summary: Reject user
description: Reject a user that is pending approval by removing them from the account
tags: [ Users ]
security:
- BearerAuth: [ ]
- TokenAuth: [ ]
parameters:
- in: path
name: userId
required: true
schema:
type: string
description: The unique identifier of a user
responses:
'200':
description: User rejected successfully
content: {}
'400':
"$ref": "#/components/responses/bad_request"
'401':
"$ref": "#/components/responses/requires_authentication"
'403':
"$ref": "#/components/responses/forbidden"
'500':
"$ref": "#/components/responses/internal_error"
/api/users/current:
get:
summary: Retrieve current user

View File

@@ -268,6 +268,9 @@ type AccountExtraSettings struct {
// PeerApprovalEnabled (Cloud only) Enables or disables peer approval globally. If enabled, all peers added will be in pending state until approved by an admin.
PeerApprovalEnabled bool `json:"peer_approval_enabled"`
// UserApprovalRequired Enables manual approval for new users joining via domain matching. When enabled, users are blocked with pending approval status until explicitly approved by an admin.
UserApprovalRequired bool `json:"user_approval_required"`
}
// AccountOnboarding defines model for AccountOnboarding.
@@ -1015,8 +1018,6 @@ type OSVersionCheck struct {
// Peer defines model for Peer.
type Peer struct {
// CreatedAt Peer creation date (UTC)
CreatedAt time.Time `json:"created_at"`
// ApprovalRequired (Cloud only) Indicates whether peer needs approval
ApprovalRequired bool `json:"approval_required"`
@@ -1032,6 +1033,9 @@ type Peer struct {
// CountryCode 2-letter ISO 3166-1 alpha-2 code that represents the country
CountryCode CountryCode `json:"country_code"`
// CreatedAt Peer creation date (UTC)
CreatedAt time.Time `json:"created_at"`
// DnsLabel Peer's DNS label is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's domain to the peer label. e.g. peer-dns-label.netbird.cloud
DnsLabel string `json:"dns_label"`
@@ -1098,8 +1102,6 @@ type Peer struct {
// PeerBatch defines model for PeerBatch.
type PeerBatch struct {
// CreatedAt Peer creation date (UTC)
CreatedAt time.Time `json:"created_at"`
// AccessiblePeersCount Number of accessible peers
AccessiblePeersCount int `json:"accessible_peers_count"`
@@ -1118,6 +1120,9 @@ type PeerBatch struct {
// CountryCode 2-letter ISO 3166-1 alpha-2 code that represents the country
CountryCode CountryCode `json:"country_code"`
// CreatedAt Peer creation date (UTC)
CreatedAt time.Time `json:"created_at"`
// DnsLabel Peer's DNS label is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's domain to the peer label. e.g. peer-dns-label.netbird.cloud
DnsLabel string `json:"dns_label"`
@@ -1774,8 +1779,11 @@ type User struct {
LastLogin *time.Time `json:"last_login,omitempty"`
// Name User's name from idp provider
Name string `json:"name"`
Permissions *UserPermissions `json:"permissions,omitempty"`
Name string `json:"name"`
// PendingApproval Is true if this user requires approval before being activated. Only applicable for users joining via domain matching when user_approval_required is enabled.
PendingApproval bool `json:"pending_approval"`
Permissions *UserPermissions `json:"permissions,omitempty"`
// Role User's NetBird account role
Role string `json:"role"`

View File

@@ -42,7 +42,10 @@ const (
// Type is a type of the Error
type Type int32
var ErrExtraSettingsNotFound = fmt.Errorf("extra settings not found")
var (
ErrExtraSettingsNotFound = errors.New("extra settings not found")
ErrPeerAlreadyLoggedIn = errors.New("peer with the same public key is already logged in")
)
// Error is an internal error
type Error struct {
@@ -110,6 +113,11 @@ func NewUserBlockedError() error {
return Errorf(PermissionDenied, "user is blocked")
}
// NewUserPendingApprovalError creates a new Error with PermissionDenied type for a blocked user pending approval
func NewUserPendingApprovalError() error {
return Errorf(PermissionDenied, "user is pending approval")
}
// NewPeerNotRegisteredError creates a new Error with Unauthenticated type unregistered peer
func NewPeerNotRegisteredError() error {
return Errorf(Unauthenticated, "peer is not registered")

View File

@@ -230,10 +230,8 @@ func (s *SharedSocket) Close() error {
// read start a read loop for a specific receiver and sends the packet to the packetDemux channel
func (s *SharedSocket) read(receiver receiver) {
// Buffer reuse is safe: packetDemux is unbuffered, so read() blocks until
// ReadFrom() synchronously processes the packet before next iteration
buf := make([]byte, s.mtu+maxIPUDPOverhead)
for {
buf := make([]byte, s.mtu+maxIPUDPOverhead)
n, addr, err := receiver(s.ctx, buf, 0)
select {
case <-s.ctx.Done():