diff --git a/client/iface/configurer/kernel_unix.go b/client/iface/configurer/kernel_unix.go index 7c1c41669..d358220d5 100644 --- a/client/iface/configurer/kernel_unix.go +++ b/client/iface/configurer/kernel_unix.go @@ -218,3 +218,30 @@ func (c *KernelConfigurer) GetStats(peerKey string) (WGStats, error) { RxBytes: peer.ReceiveBytes, }, nil } + +func (c *KernelConfigurer) Transfers() (map[wgtypes.Key]WGStats, error) { + stats := make(map[wgtypes.Key]WGStats) + wg, err := wgctrl.New() + if err != nil { + return nil, fmt.Errorf("wgctl: %w", err) + } + defer func() { + err = wg.Close() + if err != nil { + log.Errorf("Got error while closing wgctl: %v", err) + } + }() + + wgDevice, err := wg.Device(c.deviceName) + if err != nil { + return nil, fmt.Errorf("get device %s: %w", c.deviceName, err) + } + for _, peer := range wgDevice.Peers { + stats[peer.PublicKey] = WGStats{ + LastHandshake: peer.LastHandshakeTime, + TxBytes: peer.TransmitBytes, + RxBytes: peer.ReceiveBytes, + } + } + return stats, nil +} diff --git a/client/iface/configurer/usp.go b/client/iface/configurer/usp.go index 391269dd0..b0fbdaf8f 100644 --- a/client/iface/configurer/usp.go +++ b/client/iface/configurer/usp.go @@ -1,6 +1,7 @@ package configurer import ( + "encoding/base64" "encoding/hex" "fmt" "net" @@ -17,6 +18,13 @@ import ( nbnet "github.com/netbirdio/netbird/util/net" ) +const ( + ipcKeyLastHandshakeTimeSec = "last_handshake_time_sec" + ipcKeyLastHandshakeTimeNsec = "last_handshake_time_nsec" + ipcKeyTxBytes = "tx_bytes" + ipcKeyRxBytes = "rx_bytes" +) + var ErrAllowedIPNotFound = fmt.Errorf("allowed IP not found") type WGUSPConfigurer struct { @@ -230,39 +238,115 @@ func (t *WGUSPConfigurer) GetStats(peerKey string) (WGStats, error) { } stats, err := findPeerInfo(ipc, peerKey, []string{ - "last_handshake_time_sec", - "last_handshake_time_nsec", - "tx_bytes", - "rx_bytes", + ipcKeyLastHandshakeTimeSec, + ipcKeyLastHandshakeTimeNsec, + ipcKeyTxBytes, + ipcKeyRxBytes, }) if err != nil { return WGStats{}, fmt.Errorf("find peer info: %w", err) } - sec, err := strconv.ParseInt(stats["last_handshake_time_sec"], 10, 64) + lastHandshake, err := toLastHandshake(stats[ipcKeyLastHandshakeTimeSec]) if err != nil { - return WGStats{}, fmt.Errorf("parse handshake sec: %w", err) + return WGStats{}, err } - nsec, err := strconv.ParseInt(stats["last_handshake_time_nsec"], 10, 64) + + txBytes, err := toTxBytes(stats[ipcKeyTxBytes]) if err != nil { - return WGStats{}, fmt.Errorf("parse handshake nsec: %w", err) + return WGStats{}, err } - txBytes, err := strconv.ParseInt(stats["tx_bytes"], 10, 64) + + rxBytes, err := toRxBytes(stats[ipcKeyRxBytes]) if err != nil { - return WGStats{}, fmt.Errorf("parse tx_bytes: %w", err) - } - rxBytes, err := strconv.ParseInt(stats["rx_bytes"], 10, 64) - if err != nil { - return WGStats{}, fmt.Errorf("parse rx_bytes: %w", err) + return WGStats{}, err } return WGStats{ - LastHandshake: time.Unix(sec, nsec), + LastHandshake: lastHandshake, TxBytes: txBytes, RxBytes: rxBytes, }, nil } +func (t *WGUSPConfigurer) Transfers() (map[wgtypes.Key]WGStats, error) { + ipc, err := t.device.IpcGet() + if err != nil { + return nil, fmt.Errorf("ipc get: %w", err) + } + + return parseTransfers(ipc) +} + +func parseTransfers(ipc string) (map[wgtypes.Key]WGStats, error) { + stats := make(map[wgtypes.Key]WGStats) + var ( + currentKey wgtypes.Key + currentStats WGStats + hasPeer bool + ) + lines := strings.Split(ipc, "\n") + for _, line := range lines { + line = strings.TrimSpace(line) + + // If we're within the details of the found peer and encounter another public key, + // this means we're starting another peer's details. So, stop. + if strings.HasPrefix(line, "public_key=") { + peerID := strings.TrimPrefix(line, "public_key=") + h, err := hex.DecodeString(peerID) + if err != nil { + return nil, fmt.Errorf("decode peerID: %w", err) + } + b64 := base64.StdEncoding.EncodeToString(h) + + peerKeyParsed, err := wgtypes.ParseKey(b64) + if err != nil { + return nil, fmt.Errorf("parse key: %w", err) + } + + currentKey = peerKeyParsed + currentStats = WGStats{} // Reset stats for the new peer + hasPeer = true + stats[currentKey] = currentStats + continue + } + + if !hasPeer { + continue + } + + key := strings.SplitN(line, "=", 2) + if len(key) != 2 { + continue + } + switch key[0] { + case ipcKeyLastHandshakeTimeSec: + hs, err := toLastHandshake(key[1]) + if err != nil { + return nil, err + } + currentStats.LastHandshake = hs + stats[currentKey] = currentStats + case ipcKeyRxBytes: + rxBytes, err := strconv.ParseInt(key[1], 10, 64) + if err != nil { + return nil, fmt.Errorf("parse rx_bytes: %w", err) + } + currentStats.RxBytes = rxBytes + stats[currentKey] = currentStats + case ipcKeyTxBytes: + txBytes, err := strconv.ParseInt(key[1], 10, 64) + if err != nil { + return nil, fmt.Errorf("parse tx_bytes: %w", err) + } + currentStats.TxBytes = txBytes + stats[currentKey] = currentStats + } + } + + return stats, nil +} + func findPeerInfo(ipcInput string, peerKey string, searchConfigKeys []string) (map[string]string, error) { peerKeyParsed, err := wgtypes.ParseKey(peerKey) if err != nil { @@ -361,6 +445,38 @@ func toWgUserspaceString(wgCfg wgtypes.Config) string { return sb.String() } +func toLastHandshake(stringVar string) (time.Time, error) { + sec, err := strconv.ParseInt(stringVar, 10, 64) + if err != nil { + return time.Time{}, fmt.Errorf("parse handshake sec: %w", err) + } + nsec, err := strconv.ParseInt(stringVar, 10, 64) + if err != nil { + return time.Time{}, fmt.Errorf("parse handshake nsec: %w", err) + } + return time.Unix(sec, nsec), nil +} + +func toRxBytes(s string) (int64, error) { + b, err := toBytes(s) + if err != nil { + return 0, fmt.Errorf("parse rx_bytes: %w", err) + } + return b, nil +} + +func toTxBytes(s string) (int64, error) { + b, err := toBytes(s) + if err != nil { + return 0, fmt.Errorf("parse tx_bytes: %w", err) + } + return b, nil +} + +func toBytes(s string) (int64, error) { + return strconv.ParseInt(s, 10, 64) +} + func getFwmark() int { if nbnet.AdvancedRouting() { return nbnet.NetbirdFwmark diff --git a/client/iface/configurer/usp_test.go b/client/iface/configurer/usp_test.go index 775339f24..8e43cb359 100644 --- a/client/iface/configurer/usp_test.go +++ b/client/iface/configurer/usp_test.go @@ -34,6 +34,17 @@ errno=0 ` +func Test_Transfer(t *testing.T) { + stats, err := parseTransfers(ipcFixture) + if err != nil { + t.Fatal(err) + } + + if len(stats) != 3 { + t.Fatalf("expected 2 stats, got %d", len(stats)) + } +} + func Test_findPeerInfo(t *testing.T) { tests := []struct { name string diff --git a/client/iface/device/interface.go b/client/iface/device/interface.go index 0196b0085..54d1a9ed0 100644 --- a/client/iface/device/interface.go +++ b/client/iface/device/interface.go @@ -17,4 +17,5 @@ type WGConfigurer interface { RemoveAllowedIP(peerKey string, allowedIP string) error Close() GetStats(peerKey string) (configurer.WGStats, error) + Transfers() (map[wgtypes.Key]configurer.WGStats, error) } diff --git a/client/iface/iface.go b/client/iface/iface.go index 8056dd9a6..102690d50 100644 --- a/client/iface/iface.go +++ b/client/iface/iface.go @@ -216,6 +216,10 @@ func (w *WGIface) GetStats(peerKey string) (configurer.WGStats, error) { return w.configurer.GetStats(peerKey) } +func (w *WGIface) Transfers() (map[wgtypes.Key]configurer.WGStats, error) { + return w.configurer.Transfers() +} + func (w *WGIface) waitUntilRemoved() error { maxWaitTime := 5 * time.Second timeout := time.NewTimer(maxWaitTime) diff --git a/client/internal/lazyconn/listener/listener.go b/client/internal/lazyconn/listener/listener.go new file mode 100644 index 000000000..0cd91864d --- /dev/null +++ b/client/internal/lazyconn/listener/listener.go @@ -0,0 +1,56 @@ +package listener + +import ( + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "net" + "sync" + + log "github.com/sirupsen/logrus" +) + +type Listener struct { + peerID wgtypes.Key + + conn *net.UDPConn + wg sync.WaitGroup +} + +func NewListener(peerID wgtypes.Key, addr *net.UDPAddr) (*Listener, error) { + conn, err := net.ListenUDP("udp", addr) + if err != nil { + return nil, err + } + + d := &Listener{ + conn: conn, + peerID: peerID, + } + return d, nil +} + +func (d *Listener) ReadPackets(trigger func(peerID wgtypes.Key)) { + d.wg.Done() + defer d.wg.Done() + + for { + buffer := make([]byte, 10) + n, remoteAddr, err := d.conn.ReadFromUDP(buffer) + if err != nil { + log.Infof("exit from fake peer reader: %v", err) + return + } + + if n < 4 { + log.Warnf("received %d bytes from %s, too short", n, remoteAddr) + continue + } + trigger(d.peerID) + } +} + +func (d *Listener) Close() { + if err := d.conn.Close(); err != nil { + log.Errorf("failed to close UDP listener: %s", err) + } + d.wg.Wait() +} diff --git a/client/internal/lazyconn/listener/manager.go b/client/internal/lazyconn/listener/manager.go new file mode 100644 index 000000000..ea32dd39a --- /dev/null +++ b/client/internal/lazyconn/listener/manager.go @@ -0,0 +1,133 @@ +package listener + +import ( + "fmt" + "net" + + log "github.com/sirupsen/logrus" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + + "github.com/netbirdio/netbird/client/internal/lazyconn" +) + +type portGenerator struct { + nextFreePort uint16 +} + +func newPortGenerator() *portGenerator { + return &portGenerator{ + nextFreePort: 65535, + } +} + +func (p *portGenerator) nextPort() int { + port := p.nextFreePort + p.nextFreePort-- + if p.nextFreePort == 0 { + p.nextFreePort = 65535 + } + return int(port) +} + +type Manager struct { + TrafficStartChan chan wgtypes.Key + + wgIface lazyconn.WGIface + + portGenerator *portGenerator + peers map[wgtypes.Key]*Listener + done chan struct{} +} + +func NewManager(wgIface lazyconn.WGIface) *Manager { + m := &Manager{ + TrafficStartChan: make(chan wgtypes.Key, 1), + wgIface: wgIface, + portGenerator: newPortGenerator(), + done: make(chan struct{}), + } + return m +} + +func (m *Manager) CreateFakePeers(peerCfg lazyconn.PeerConfig) error { + if _, ok := m.peers[peerCfg.PublicKey]; ok { + return nil + } + + if err := m.createFakePeer(peerCfg); err != nil { + return err + } + log.Debugf("created lazy connection listener for: %s", peerCfg.PublicKey.String()) + return nil +} + +func (m *Manager) RemovePeer(peerID wgtypes.Key) { + listener, ok := m.peers[peerID] + if !ok { + return + } + + listener.Close() + + if err := m.wgIface.RemovePeer(peerID); err != nil { + log.Warnf("failed to remove fake peer: %v", err) + } + + delete(m.peers, peerID) +} + +func (m *Manager) Close() { + close(m.done) + for peerID, listener := range m.peers { + listener.Close() + delete(m.peers, peerID) + } +} + +func (m *Manager) createFakePeer(peerCfg lazyconn.PeerConfig) error { + var ( + listener *Listener + err error + addr *net.UDPAddr + ) + for i := 0; i < 100; i++ { + addr = &net.UDPAddr{ + Port: m.portGenerator.nextPort(), + IP: net.ParseIP("127.0.0.254"), + } + listener, err = NewListener(peerCfg.PublicKey, addr) + if err != nil { + log.Debugf("failed to allocate port: %d: %v", addr.Port, err) + continue + } + } + + if listener == nil { + return fmt.Errorf("failed to allocate lazy connection port for: %s", peerCfg.PublicKey.String()) + } + + if err := m.createEndpoint(peerCfg, addr); err != nil { + log.Errorf("failed to create endpoint for %s: %v", peerCfg.PublicKey.String(), err) + listener.Close() + return err + } + + go listener.ReadPackets(m.onTrigger) + + m.peers[peerCfg.PublicKey] = listener + return nil +} + +func (m *Manager) onTrigger(peerID wgtypes.Key) { + if err := m.wgIface.RemovePeer(peerID); err != nil { + log.Errorf("failed to remove peer: %v", err) + } + select { + case <-m.done: + case m.TrafficStartChan <- peerID: + } +} + +func (m *Manager) createEndpoint(peerCfg lazyconn.PeerConfig, endpoint *net.UDPAddr) error { + return m.wgIface.UpdatePeer(peerCfg.PublicKey.String(), peerCfg.AllowedIP.String(), 0, endpoint, nil) +} diff --git a/client/internal/lazyconn/manager/manager.go b/client/internal/lazyconn/manager/manager.go new file mode 100644 index 000000000..120f0bd54 --- /dev/null +++ b/client/internal/lazyconn/manager/manager.go @@ -0,0 +1,118 @@ +package manager + +import ( + "context" + "sync" + + log "github.com/sirupsen/logrus" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + + "github.com/netbirdio/netbird/client/internal/lazyconn" + "github.com/netbirdio/netbird/client/internal/lazyconn/listener" + "github.com/netbirdio/netbird/client/internal/lazyconn/watcher" +) + +type Manager struct { + watcher *watcher.Watcher + listenerMgr *listener.Manager + managedPeers map[wgtypes.Key]lazyconn.PeerConfig + + addPeers chan []lazyconn.PeerConfig + removePeer chan wgtypes.Key + + watcherWG sync.WaitGroup + mu sync.Mutex +} + +func NewManager(wgIface lazyconn.WGIface) *Manager { + m := &Manager{ + watcher: watcher.NewWatcher(wgIface), + listenerMgr: listener.NewManager(wgIface), + managedPeers: make(map[wgtypes.Key]lazyconn.PeerConfig), + addPeers: make(chan []lazyconn.PeerConfig, 1), + removePeer: make(chan wgtypes.Key, 1), + } + return m +} + +func (m *Manager) Start() { + m.mu.Lock() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + m.watcherWG.Add(1) + m.mu.Unlock() + + go func() { + m.watcher.Watch(ctx) + m.watcherWG.Done() + }() + + for { + select { + case <-ctx.Done(): + return + case peerID := <-m.watcher.PeerTimedOutChan: + m.mu.Lock() + cfg, ok := m.managedPeers[peerID] + if !ok { + continue + } + + if err := m.listenerMgr.CreateFakePeers(cfg); err != nil { + log.Errorf("failed to start watch lazy connection tries: %s", err) + } + m.mu.Unlock() + case peerID := <-m.listenerMgr.TrafficStartChan: + m.mu.Lock() + _, ok := m.managedPeers[peerID] + if !ok { + continue + } + + log.Infof("peer %s started to send traffic", peerID) + m.watcher.AddPeer(peerID) + m.notifyPeerAction(peerID) + m.mu.Unlock() + } + } +} + +func (m *Manager) SetPeer(peer lazyconn.PeerConfig) error { + m.mu.Lock() + defer m.mu.Unlock() + + if _, ok := m.managedPeers[peer.PublicKey]; ok { + return nil + } + + if err := m.listenerMgr.CreateFakePeers(peer); err != nil { + return err + } + + // todo: remove removed peers from the list + return nil +} + +func (m *Manager) RemovePeer(peerID wgtypes.Key) { + m.mu.Lock() + defer m.mu.Unlock() + + m.watcher.RemovePeer(peerID) + m.listenerMgr.RemovePeer(peerID) + delete(m.managedPeers, peerID) +} + +func (m *Manager) Close() { + m.mu.Lock() + defer m.mu.Unlock() + + m.listenerMgr.Close() + m.watcherWG.Wait() + m.managedPeers = make(map[wgtypes.Key]lazyconn.PeerConfig) +} + +func (m *Manager) notifyPeerAction(peerID wgtypes.Key) { + // todo notify engine +} diff --git a/client/internal/lazyconn/peercfg.go b/client/internal/lazyconn/peercfg.go new file mode 100644 index 000000000..c2b3fed86 --- /dev/null +++ b/client/internal/lazyconn/peercfg.go @@ -0,0 +1,12 @@ +package lazyconn + +import ( + "net" + + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" +) + +type PeerConfig struct { + PublicKey wgtypes.Key + AllowedIP net.IPNet +} diff --git a/client/internal/lazyconn/watcher/watcher.go b/client/internal/lazyconn/watcher/watcher.go new file mode 100644 index 000000000..5d96fe6c0 --- /dev/null +++ b/client/internal/lazyconn/watcher/watcher.go @@ -0,0 +1,103 @@ +package watcher + +import ( + "context" + "sync" + "time" + + log "github.com/sirupsen/logrus" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + + "github.com/netbirdio/netbird/client/iface/configurer" + "github.com/netbirdio/netbird/client/internal/lazyconn" +) + +const ( + checkPeriod = 75 * time.Second // 3 * keep alive time (25s) + expectedMinimumRx = 90 * 2 // 2x keep alive packets +) + +type rxHistory struct { + received int64 +} + +type Watcher struct { + PeerTimedOutChan chan wgtypes.Key + + wgIface lazyconn.WGIface + + peers map[wgtypes.Key]*rxHistory + peersMu sync.Mutex +} + +func NewWatcher(wgIface lazyconn.WGIface) *Watcher { + return &Watcher{ + PeerTimedOutChan: make(chan wgtypes.Key, 1), + wgIface: wgIface, + peers: make(map[wgtypes.Key]*rxHistory), + } +} + +func (m *Watcher) Watch(ctx context.Context) { + timer := time.NewTimer(checkPeriod) + defer timer.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-timer.C: + stats, err := m.wgIface.Transfers() + if err != nil { + log.Errorf("failed to get peer stats: %s", err) + continue + } + m.checkTimeouts(ctx, stats) + } + } +} + +func (m *Watcher) AddPeer(peerID wgtypes.Key) { + m.peersMu.Lock() + defer m.peersMu.Unlock() + + m.peers[peerID] = &rxHistory{} +} + +func (m *Watcher) RemovePeer(id wgtypes.Key) { + m.peersMu.Lock() + defer m.peersMu.Unlock() + + delete(m.peers, id) +} + +// Todo: this is a naive implementation, we must to finish it +func (m *Watcher) checkTimeouts(ctx context.Context, allPeersStats map[wgtypes.Key]configurer.WGStats) { + m.peersMu.Lock() + defer m.peersMu.Unlock() + + for p, rxh := range m.peers { + s, ok := allPeersStats[p] + if !ok { + log.Warnf("no stats for peer %s", p) + } + + // received bytes since last check + received := s.RxBytes - rxh.received + if received >= expectedMinimumRx { + rxh.received = s.RxBytes + continue + } + + // todo handle that case when swtich from P2P to Relay and the endpoint has been reseted. + + // peer timed out + delete(m.peers, p) + + select { + case <-ctx.Done(): + return + case m.PeerTimedOutChan <- p: + } + } +} diff --git a/client/internal/lazyconn/wgiface.go b/client/internal/lazyconn/wgiface.go new file mode 100644 index 000000000..0a7a79c13 --- /dev/null +++ b/client/internal/lazyconn/wgiface.go @@ -0,0 +1,16 @@ +package lazyconn + +import ( + "net" + "time" + + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + + "github.com/netbirdio/netbird/client/iface/configurer" +) + +type WGIface interface { + Transfers() (map[wgtypes.Key]configurer.WGStats, error) + RemovePeer(key wgtypes.Key) error + UpdatePeer(peerKey string, allowedIps string, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error +}