Create conn mgr

This commit is contained in:
Zoltán Papp
2025-02-25 23:50:25 +01:00
parent 0457251d09
commit 8542674a83
15 changed files with 184 additions and 125 deletions

View File

@@ -213,8 +213,8 @@ func (c *KernelConfigurer) GetStats(peerKey string) (WGStats, error) {
}, nil }, nil
} }
func (c *KernelConfigurer) Transfers() (map[wgtypes.Key]WGStats, error) { func (c *KernelConfigurer) Transfers() (map[string]WGStats, error) {
stats := make(map[wgtypes.Key]WGStats) stats := make(map[string]WGStats)
wg, err := wgctrl.New() wg, err := wgctrl.New()
if err != nil { if err != nil {
return nil, fmt.Errorf("wgctl: %w", err) return nil, fmt.Errorf("wgctl: %w", err)
@@ -231,7 +231,7 @@ func (c *KernelConfigurer) Transfers() (map[wgtypes.Key]WGStats, error) {
return nil, fmt.Errorf("get device %s: %w", c.deviceName, err) return nil, fmt.Errorf("get device %s: %w", c.deviceName, err)
} }
for _, peer := range wgDevice.Peers { for _, peer := range wgDevice.Peers {
stats[peer.PublicKey] = WGStats{ stats[peer.PublicKey.String()] = WGStats{
LastHandshake: peer.LastHandshakeTime, LastHandshake: peer.LastHandshakeTime,
TxBytes: peer.TransmitBytes, TxBytes: peer.TransmitBytes,
RxBytes: peer.ReceiveBytes, RxBytes: peer.ReceiveBytes,

View File

@@ -263,7 +263,7 @@ func (t *WGUSPConfigurer) GetStats(peerKey string) (WGStats, error) {
}, nil }, nil
} }
func (t *WGUSPConfigurer) Transfers() (map[wgtypes.Key]WGStats, error) { func (t *WGUSPConfigurer) Transfers() (map[string]WGStats, error) {
ipc, err := t.device.IpcGet() ipc, err := t.device.IpcGet()
if err != nil { if err != nil {
return nil, fmt.Errorf("ipc get: %w", err) return nil, fmt.Errorf("ipc get: %w", err)
@@ -272,10 +272,10 @@ func (t *WGUSPConfigurer) Transfers() (map[wgtypes.Key]WGStats, error) {
return parseTransfers(ipc) return parseTransfers(ipc)
} }
func parseTransfers(ipc string) (map[wgtypes.Key]WGStats, error) { func parseTransfers(ipc string) (map[string]WGStats, error) {
stats := make(map[wgtypes.Key]WGStats) stats := make(map[string]WGStats)
var ( var (
currentKey wgtypes.Key currentKey string
currentStats WGStats currentStats WGStats
hasPeer bool hasPeer bool
) )
@@ -291,14 +291,7 @@ func parseTransfers(ipc string) (map[wgtypes.Key]WGStats, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("decode peerID: %w", err) return nil, fmt.Errorf("decode peerID: %w", err)
} }
b64 := base64.StdEncoding.EncodeToString(h) currentKey = base64.StdEncoding.EncodeToString(h)
peerKeyParsed, err := wgtypes.ParseKey(b64)
if err != nil {
return nil, fmt.Errorf("parse key: %w", err)
}
currentKey = peerKeyParsed
currentStats = WGStats{} // Reset stats for the new peer currentStats = WGStats{} // Reset stats for the new peer
hasPeer = true hasPeer = true
stats[currentKey] = currentStats stats[currentKey] = currentStats

View File

@@ -17,5 +17,5 @@ type WGConfigurer interface {
RemoveAllowedIP(peerKey string, allowedIP string) error RemoveAllowedIP(peerKey string, allowedIP string) error
Close() Close()
GetStats(peerKey string) (configurer.WGStats, error) GetStats(peerKey string) (configurer.WGStats, error)
Transfers() (map[wgtypes.Key]configurer.WGStats, error) Transfers() (map[string]configurer.WGStats, error)
} }

View File

@@ -218,7 +218,7 @@ func (w *WGIface) GetStats(peerKey string) (configurer.WGStats, error) {
return w.configurer.GetStats(peerKey) return w.configurer.GetStats(peerKey)
} }
func (w *WGIface) Transfers() (map[wgtypes.Key]configurer.WGStats, error) { func (w *WGIface) Transfers() (map[string]configurer.WGStats, error) {
return w.configurer.Transfers() return w.configurer.Transfers()
} }

103
client/internal/conn_mgr.go Normal file
View File

@@ -0,0 +1,103 @@
package internal
import (
"context"
"os"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/lazyconn"
lazyConnManager "github.com/netbirdio/netbird/client/internal/lazyconn/manager"
"github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/client/internal/peerstore"
)
const (
envDisableLazyConn = "NB_LAZY_CONN_DISABLE"
)
type ConnMgr struct {
lazyConnMgr *lazyConnManager.Manager
peerStore *peerstore.Store
excludes map[string]struct{}
}
func NewConnMgr(peerStore *peerstore.Store, iface lazyconn.WGIface) *ConnMgr {
e := &ConnMgr{
peerStore: peerStore,
lazyConnMgr: lazyConnManager.NewManager(iface),
}
return e
}
func (e *ConnMgr) Start(ctx context.Context) {
if os.Getenv(envDisableLazyConn) == "true" {
log.Infof("lazy connection manager is disabled")
return
}
go e.lazyConnMgr.Start()
go e.receiveLazyConnEvents(ctx)
}
func (e *ConnMgr) AddExcludeFromLazyConnection(peerID string) {
e.excludes[peerID] = struct{}{}
}
func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) {
if success := e.peerStore.AddPeerConn(peerKey, conn); !success {
return true
}
_, exists = e.excludes[peerKey]
if exists {
conn.Open()
return
}
lazyPeerCfg := lazyconn.PeerConfig{
PublicKey: peerKey,
AllowedIPs: conn.WgConfig().AllowedIps,
}
if err := e.lazyConnMgr.AddPeer(lazyPeerCfg); err != nil {
log.Errorf("failed to add peer to lazyconn manager: %v", err)
conn.Open()
}
return
}
func (e *ConnMgr) OnSignalMsg(peerKey string) (*peer.Conn, bool) {
conn, ok := e.peerStore.PeerConn(peerKey)
if !ok {
return nil, false
}
if ok := e.lazyConnMgr.RemovePeer(peerKey); ok {
conn.Open()
}
return conn, true
}
func (e *ConnMgr) RemovePeerConn(peerKey string) {
conn, ok := e.peerStore.Remove(peerKey)
if ok {
conn.Close()
}
e.lazyConnMgr.RemovePeer(peerKey)
}
func (e *ConnMgr) Close() {
// todo wait for receiveLazyConnEvents to finish
e.lazyConnMgr.Close()
}
func (e *ConnMgr) receiveLazyConnEvents(ctx context.Context) {
for {
select {
case peerID := <-e.lazyConnMgr.PeerActivityChan:
e.peerStore.PeerConnOpen(peerID)
case <-ctx.Done():
}
}
}

View File

@@ -33,8 +33,6 @@ import (
"github.com/netbirdio/netbird/client/internal/acl" "github.com/netbirdio/netbird/client/internal/acl"
"github.com/netbirdio/netbird/client/internal/dns" "github.com/netbirdio/netbird/client/internal/dns"
"github.com/netbirdio/netbird/client/internal/dnsfwd" "github.com/netbirdio/netbird/client/internal/dnsfwd"
"github.com/netbirdio/netbird/client/internal/lazyconn"
lazyConnManager "github.com/netbirdio/netbird/client/internal/lazyconn/manager"
"github.com/netbirdio/netbird/client/internal/networkmonitor" "github.com/netbirdio/netbird/client/internal/networkmonitor"
"github.com/netbirdio/netbird/client/internal/peer" "github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/client/internal/peer/guard" "github.com/netbirdio/netbird/client/internal/peer/guard"
@@ -133,7 +131,7 @@ type Engine struct {
// peerConns is a map that holds all the peers that are known to this peer // peerConns is a map that holds all the peers that are known to this peer
peerStore *peerstore.Store peerStore *peerstore.Store
lazyConnMgr *lazyConnManager.Manager connMgr *ConnMgr
beforePeerHook nbnet.AddHookFunc beforePeerHook nbnet.AddHookFunc
afterPeerHook nbnet.RemoveHookFunc afterPeerHook nbnet.RemoveHookFunc
@@ -261,6 +259,8 @@ func (e *Engine) Stop() error {
e.syncMsgMux.Lock() e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock() defer e.syncMsgMux.Unlock()
e.connMgr.Close()
// stopping network monitor first to avoid starting the engine again // stopping network monitor first to avoid starting the engine again
if e.networkMonitor != nil { if e.networkMonitor != nil {
e.networkMonitor.Stop() e.networkMonitor.Stop()
@@ -293,8 +293,6 @@ func (e *Engine) Stop() error {
return fmt.Errorf("failed to remove all peers: %s", err) return fmt.Errorf("failed to remove all peers: %s", err)
} }
e.lazyConnMgr.Close()
if e.cancel != nil { if e.cancel != nil {
e.cancel() e.cancel()
} }
@@ -406,10 +404,6 @@ func (e *Engine) Start() error {
return fmt.Errorf("up wg interface: %w", err) return fmt.Errorf("up wg interface: %w", err)
} }
e.lazyConnMgr = lazyConnManager.NewManager(e.wgInterface)
go e.lazyConnMgr.Start()
go e.receiveLazyConnEvents()
if e.firewall != nil { if e.firewall != nil {
e.acl = acl.NewDefaultManager(e.firewall) e.acl = acl.NewDefaultManager(e.firewall)
} }
@@ -429,6 +423,9 @@ func (e *Engine) Start() error {
NATExternalIPs: e.parseNATExternalIPMappings(), NATExternalIPs: e.parseNATExternalIPMappings(),
} }
e.connMgr = NewConnMgr(e.peerStore, wgIface)
e.connMgr.Start(e.ctx)
e.srWatcher = guard.NewSRWatcher(e.signal, e.relayManager, e.mobileDep.IFaceDiscover, iceCfg) e.srWatcher = guard.NewSRWatcher(e.signal, e.relayManager, e.mobileDep.IFaceDiscover, iceCfg)
e.srWatcher.Start() e.srWatcher.Start()
@@ -607,16 +604,11 @@ func (e *Engine) removePeer(peerKey string) error {
e.sshServer.RemoveAuthorizedKey(peerKey) e.sshServer.RemoveAuthorizedKey(peerKey)
} }
defer func() { e.connMgr.RemovePeerConn(peerKey)
err := e.statusRecorder.RemovePeer(peerKey)
if err != nil {
log.Warnf("received error when removing peer %s from status recorder: %v", peerKey, err)
}
}()
conn, exists := e.peerStore.Remove(peerKey) err := e.statusRecorder.RemovePeer(peerKey)
if exists { if err != nil {
conn.Close() log.Warnf("received error when removing peer %s from status recorder: %v", peerKey, err)
} }
return nil return nil
} }
@@ -1134,7 +1126,7 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
return fmt.Errorf("create peer connection: %w", err) return fmt.Errorf("create peer connection: %w", err)
} }
if ok := e.peerStore.AddPeerConn(peerKey, conn); !ok { if exists := e.connMgr.AddPeerConn(peerKey, conn); exists {
conn.Close() conn.Close()
return fmt.Errorf("peer already exists: %s", peerKey) return fmt.Errorf("peer already exists: %s", peerKey)
} }
@@ -1149,19 +1141,6 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err) log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err)
} }
peerKeyParsed, err := wgtypes.ParseKey(peerKey)
if err != nil {
return err
}
lazyPeerCfg := lazyconn.PeerConfig{
PublicKey: peerKeyParsed,
AllowedIPs: peerIPs,
}
if err := e.lazyConnMgr.AddPeer(lazyPeerCfg); err != nil {
return err
}
return nil return nil
} }
@@ -1237,19 +1216,11 @@ func (e *Engine) receiveSignalEvents() {
e.syncMsgMux.Lock() e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock() defer e.syncMsgMux.Unlock()
conn, ok := e.peerStore.PeerConn(msg.Key) conn, ok := e.connMgr.OnSignalMsg(msg.Key)
if !ok { if !ok {
return fmt.Errorf("wrongly addressed message %s", msg.Key) return fmt.Errorf("wrongly addressed message %s", msg.Key)
} }
peerKeyParsed, err := wgtypes.ParseKey(msg.Key)
if err != nil {
return err
}
if ok := e.lazyConnMgr.RemovePeer(peerKeyParsed); ok {
conn.Open()
}
switch msg.GetBody().Type { switch msg.GetBody().Type {
case sProto.Body_OFFER: case sProto.Body_OFFER:
remoteCred, err := signal.UnMarshalCredential(msg) remoteCred, err := signal.UnMarshalCredential(msg)
@@ -1375,12 +1346,6 @@ func (e *Engine) parseNATExternalIPMappings() []string {
} }
func (e *Engine) close() { func (e *Engine) close() {
log.Debugf("stop lazy connection manager")
if e.lazyConnMgr != nil {
e.lazyConnMgr.Close()
e.lazyConnMgr = nil
}
log.Debugf("removing Netbird interface %s", e.config.WgIfaceName) log.Debugf("removing Netbird interface %s", e.config.WgIfaceName)
if e.wgInterface != nil { if e.wgInterface != nil {
if err := e.wgInterface.Close(); err != nil { if err := e.wgInterface.Close(); err != nil {
@@ -1805,23 +1770,6 @@ func (e *Engine) Address() (netip.Addr, error) {
return ip.Unmap(), nil return ip.Unmap(), nil
} }
func (e *Engine) receiveLazyConnEvents() {
for {
select {
case peerID := <-e.lazyConnMgr.PeerActivityChan:
e.syncMsgMux.Lock()
peerConn, ok := e.peerStore.PeerConn(peerID.String())
if !ok {
e.syncMsgMux.Unlock()
continue
}
peerConn.Open()
e.syncMsgMux.Unlock()
case <-e.ctx.Done():
}
}
}
// isChecksEqual checks if two slices of checks are equal. // isChecksEqual checks if two slices of checks are equal.
func isChecksEqual(checks []*mgmProto.Checks, oChecks []*mgmProto.Checks) bool { func isChecksEqual(checks []*mgmProto.Checks, oChecks []*mgmProto.Checks) bool {
for _, check := range checks { for _, check := range checks {

View File

@@ -88,6 +88,7 @@ type MockWGIface struct {
GetDeviceFunc func() *device.FilteredDevice GetDeviceFunc func() *device.FilteredDevice
GetWGDeviceFunc func() *wgdevice.Device GetWGDeviceFunc func() *wgdevice.Device
GetStatsFunc func(peerKey string) (configurer.WGStats, error) GetStatsFunc func(peerKey string) (configurer.WGStats, error)
TransfersMoc func() (map[string]configurer.WGStats, error)
GetInterfaceGUIDStringFunc func() (string, error) GetInterfaceGUIDStringFunc func() (string, error)
GetProxyFunc func() wgproxy.Proxy GetProxyFunc func() wgproxy.Proxy
GetNetFunc func() *netstack.Net GetNetFunc func() *netstack.Net
@@ -169,6 +170,10 @@ func (m *MockWGIface) GetStats(peerKey string) (configurer.WGStats, error) {
return m.GetStatsFunc(peerKey) return m.GetStatsFunc(peerKey)
} }
func (m *MockWGIface) Transfers() (map[string]configurer.WGStats, error) {
return m.TransfersMoc()
}
func (m *MockWGIface) GetProxy() wgproxy.Proxy { func (m *MockWGIface) GetProxy() wgproxy.Proxy {
return m.GetProxyFunc() return m.GetProxyFunc()
} }

View File

@@ -35,6 +35,6 @@ type wgIfaceBase interface {
GetDevice() *device.FilteredDevice GetDevice() *device.FilteredDevice
GetWGDevice() *wgdevice.Device GetWGDevice() *wgdevice.Device
GetStats(peerKey string) (configurer.WGStats, error) GetStats(peerKey string) (configurer.WGStats, error)
Transfers() (map[wgtypes.Key]configurer.WGStats, error) Transfers() (map[string]configurer.WGStats, error)
GetNet() *netstack.Net GetNet() *netstack.Net
} }

View File

@@ -4,20 +4,18 @@ import (
"net" "net"
"sync" "sync"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
type Listener struct { type Listener struct {
peerID wgtypes.Key peerID string
conn *net.UDPConn conn *net.UDPConn
// todo is not thread safe. If you start the ReadPackets in upper layer in a Go thread then wait for Close() there too // todo is not thread safe. If you start the ReadPackets in upper layer in a Go thread then wait for Close() there too
wg sync.WaitGroup wg sync.WaitGroup
} }
func NewListener(peerID wgtypes.Key, addr *net.UDPAddr) (*Listener, error) { func NewListener(peerID string, addr *net.UDPAddr) (*Listener, error) {
conn, err := net.ListenUDP("udp", addr) conn, err := net.ListenUDP("udp", addr)
if err != nil { if err != nil {
return nil, err return nil, err
@@ -30,7 +28,7 @@ func NewListener(peerID wgtypes.Key, addr *net.UDPAddr) (*Listener, error) {
return d, nil return d, nil
} }
func (d *Listener) ReadPackets(trigger func(peerID wgtypes.Key)) { func (d *Listener) ReadPackets(trigger func(peerID string)) {
d.wg.Add(1) d.wg.Add(1)
defer d.wg.Done() defer d.wg.Done()

View File

@@ -5,7 +5,6 @@ import (
"net" "net"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/client/internal/lazyconn" "github.com/netbirdio/netbird/client/internal/lazyconn"
) )
@@ -30,27 +29,28 @@ func (p *portGenerator) nextPort() int {
} }
type Manager struct { type Manager struct {
TrafficStartChan chan wgtypes.Key TrafficStartChan chan string
wgIface lazyconn.WGIface wgIface lazyconn.WGIface
portGenerator *portGenerator portGenerator *portGenerator
peers map[wgtypes.Key]*Listener // todo peers add/remove is not thread safe because of the callback function
done chan struct{} peers map[string]*Listener
done chan struct{}
} }
func NewManager(wgIface lazyconn.WGIface) *Manager { func NewManager(wgIface lazyconn.WGIface) *Manager {
m := &Manager{ m := &Manager{
TrafficStartChan: make(chan wgtypes.Key, 1), TrafficStartChan: make(chan string, 1),
wgIface: wgIface, wgIface: wgIface,
portGenerator: newPortGenerator(), portGenerator: newPortGenerator(),
peers: make(map[wgtypes.Key]*Listener), peers: make(map[string]*Listener),
done: make(chan struct{}), done: make(chan struct{}),
} }
return m return m
} }
func (m *Manager) CreateFakePeers(peerCfg lazyconn.PeerConfig) error { func (m *Manager) CreateFakePeer(peerCfg lazyconn.PeerConfig) error {
if _, ok := m.peers[peerCfg.PublicKey]; ok { if _, ok := m.peers[peerCfg.PublicKey]; ok {
return nil return nil
} }
@@ -58,11 +58,11 @@ func (m *Manager) CreateFakePeers(peerCfg lazyconn.PeerConfig) error {
if err := m.createFakePeer(peerCfg); err != nil { if err := m.createFakePeer(peerCfg); err != nil {
return err return err
} }
log.Debugf("created lazy connection listener for: %s", peerCfg.PublicKey.String()) log.Debugf("created lazy connection listener for: %s", peerCfg.PublicKey)
return nil return nil
} }
func (m *Manager) RemovePeer(peerID wgtypes.Key) { func (m *Manager) RemovePeer(peerID string) {
listener, ok := m.peers[peerID] listener, ok := m.peers[peerID]
if !ok { if !ok {
return return
@@ -70,7 +70,7 @@ func (m *Manager) RemovePeer(peerID wgtypes.Key) {
listener.Close() listener.Close()
if err := m.wgIface.RemovePeer(peerID.String()); err != nil { if err := m.wgIface.RemovePeer(peerID); err != nil {
log.Warnf("failed to remove fake peer: %v", err) log.Warnf("failed to remove fake peer: %v", err)
} }
@@ -104,11 +104,11 @@ func (m *Manager) createFakePeer(peerCfg lazyconn.PeerConfig) error {
} }
if listener == nil { if listener == nil {
return fmt.Errorf("failed to allocate lazy connection port for: %s", peerCfg.PublicKey.String()) return fmt.Errorf("failed to allocate lazy connection port for: %s", peerCfg.PublicKey)
} }
if err := m.createEndpoint(peerCfg, addr); err != nil { if err := m.createEndpoint(peerCfg, addr); err != nil {
log.Errorf("failed to create endpoint for %s: %v", peerCfg.PublicKey.String(), err) log.Errorf("failed to create endpoint for %s: %v", peerCfg.PublicKey, err)
listener.Close() listener.Close()
return err return err
} }
@@ -119,10 +119,14 @@ func (m *Manager) createFakePeer(peerCfg lazyconn.PeerConfig) error {
return nil return nil
} }
func (m *Manager) onTrigger(peerID wgtypes.Key) { // todo: it is not thread safe, but it is ok if we protect from upper layer
func (m *Manager) onTrigger(peerID string) {
log.Debugf("peer started to send traffic, remove lazy endpoint: %s", peerID) log.Debugf("peer started to send traffic, remove lazy endpoint: %s", peerID)
// todo: it is not thread safe, but it is ok if we protect from upper layer if err := m.wgIface.RemovePeer(peerID); err != nil {
m.RemovePeer(peerID) log.Warnf("failed to remove fake peer: %v", err)
}
delete(m.peers, peerID)
select { select {
case <-m.done: case <-m.done:
@@ -131,5 +135,5 @@ func (m *Manager) onTrigger(peerID wgtypes.Key) {
} }
func (m *Manager) createEndpoint(peerCfg lazyconn.PeerConfig, endpoint *net.UDPAddr) error { func (m *Manager) createEndpoint(peerCfg lazyconn.PeerConfig, endpoint *net.UDPAddr) error {
return m.wgIface.UpdatePeer(peerCfg.PublicKey.String(), peerCfg.AllowedIPs, 0, endpoint, nil) return m.wgIface.UpdatePeer(peerCfg.PublicKey, peerCfg.AllowedIPs, 0, endpoint, nil)
} }

View File

@@ -5,7 +5,6 @@ import (
"sync" "sync"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/client/internal/lazyconn" "github.com/netbirdio/netbird/client/internal/lazyconn"
"github.com/netbirdio/netbird/client/internal/lazyconn/listener" "github.com/netbirdio/netbird/client/internal/lazyconn/listener"
@@ -13,11 +12,11 @@ import (
) )
type Manager struct { type Manager struct {
PeerActivityChan chan wgtypes.Key PeerActivityChan chan string
watcher *watcher.Watcher watcher *watcher.Watcher
listenerMgr *listener.Manager listenerMgr *listener.Manager
managedPeers map[wgtypes.Key]lazyconn.PeerConfig managedPeers map[string]lazyconn.PeerConfig
watcherWG sync.WaitGroup watcherWG sync.WaitGroup
mu sync.Mutex mu sync.Mutex
@@ -25,10 +24,10 @@ type Manager struct {
func NewManager(wgIface lazyconn.WGIface) *Manager { func NewManager(wgIface lazyconn.WGIface) *Manager {
m := &Manager{ m := &Manager{
PeerActivityChan: make(chan wgtypes.Key, 1), PeerActivityChan: make(chan string, 1),
watcher: watcher.NewWatcher(wgIface), watcher: watcher.NewWatcher(wgIface),
listenerMgr: listener.NewManager(wgIface), listenerMgr: listener.NewManager(wgIface),
managedPeers: make(map[wgtypes.Key]lazyconn.PeerConfig), managedPeers: make(map[string]lazyconn.PeerConfig),
} }
return m return m
} }
@@ -59,7 +58,7 @@ func (m *Manager) Start() {
continue continue
} }
if err := m.listenerMgr.CreateFakePeers(cfg); err != nil { if err := m.listenerMgr.CreateFakePeer(cfg); err != nil {
log.Errorf("failed to start watch lazy connection tries: %s", err) log.Errorf("failed to start watch lazy connection tries: %s", err)
} }
m.mu.Unlock() m.mu.Unlock()
@@ -90,7 +89,7 @@ func (m *Manager) AddPeer(peer lazyconn.PeerConfig) error {
return nil return nil
} }
if err := m.listenerMgr.CreateFakePeers(peer); err != nil { if err := m.listenerMgr.CreateFakePeer(peer); err != nil {
return err return err
} }
@@ -98,7 +97,7 @@ func (m *Manager) AddPeer(peer lazyconn.PeerConfig) error {
return nil return nil
} }
func (m *Manager) RemovePeer(peerID wgtypes.Key) bool { func (m *Manager) RemovePeer(peerID string) bool {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
@@ -120,10 +119,10 @@ func (m *Manager) Close() {
m.listenerMgr.Close() m.listenerMgr.Close()
m.watcherWG.Wait() m.watcherWG.Wait()
m.managedPeers = make(map[wgtypes.Key]lazyconn.PeerConfig) m.managedPeers = make(map[string]lazyconn.PeerConfig)
} }
func (m *Manager) notifyPeerAction(ctx context.Context, peerID wgtypes.Key) { func (m *Manager) notifyPeerAction(ctx context.Context, peerID string) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
case m.PeerActivityChan <- peerID: case m.PeerActivityChan <- peerID:

View File

@@ -2,11 +2,9 @@ package lazyconn
import ( import (
"net/netip" "net/netip"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
) )
type PeerConfig struct { type PeerConfig struct {
PublicKey wgtypes.Key PublicKey string
AllowedIPs []netip.Prefix AllowedIPs []netip.Prefix
} }

View File

@@ -6,7 +6,6 @@ import (
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/client/iface/configurer" "github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/internal/lazyconn" "github.com/netbirdio/netbird/client/internal/lazyconn"
@@ -22,19 +21,19 @@ type rxHistory struct {
} }
type Watcher struct { type Watcher struct {
PeerTimedOutChan chan wgtypes.Key PeerTimedOutChan chan string
wgIface lazyconn.WGIface wgIface lazyconn.WGIface
peers map[wgtypes.Key]*rxHistory peers map[string]*rxHistory
peersMu sync.Mutex peersMu sync.Mutex
} }
func NewWatcher(wgIface lazyconn.WGIface) *Watcher { func NewWatcher(wgIface lazyconn.WGIface) *Watcher {
return &Watcher{ return &Watcher{
PeerTimedOutChan: make(chan wgtypes.Key, 1), PeerTimedOutChan: make(chan string, 1),
wgIface: wgIface, wgIface: wgIface,
peers: make(map[wgtypes.Key]*rxHistory), peers: make(map[string]*rxHistory),
} }
} }
@@ -57,14 +56,14 @@ func (m *Watcher) Watch(ctx context.Context) {
} }
} }
func (m *Watcher) AddPeer(peerID wgtypes.Key) { func (m *Watcher) AddPeer(peerID string) {
m.peersMu.Lock() m.peersMu.Lock()
defer m.peersMu.Unlock() defer m.peersMu.Unlock()
m.peers[peerID] = &rxHistory{} m.peers[peerID] = &rxHistory{}
} }
func (m *Watcher) RemovePeer(id wgtypes.Key) { func (m *Watcher) RemovePeer(id string) {
m.peersMu.Lock() m.peersMu.Lock()
defer m.peersMu.Unlock() defer m.peersMu.Unlock()
@@ -72,7 +71,7 @@ func (m *Watcher) RemovePeer(id wgtypes.Key) {
} }
// Todo: this is a naive implementation, we must to finish it // Todo: this is a naive implementation, we must to finish it
func (m *Watcher) checkTimeouts(ctx context.Context, allPeersStats map[wgtypes.Key]configurer.WGStats) { func (m *Watcher) checkTimeouts(ctx context.Context, allPeersStats map[string]configurer.WGStats) {
m.peersMu.Lock() m.peersMu.Lock()
defer m.peersMu.Unlock() defer m.peersMu.Unlock()

View File

@@ -11,7 +11,7 @@ import (
) )
type WGIface interface { type WGIface interface {
Transfers() (map[wgtypes.Key]configurer.WGStats, error) Transfers() (map[string]configurer.WGStats, error)
RemovePeer(peerKey string) error RemovePeer(peerKey string) error
UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error
} }

View File

@@ -79,6 +79,18 @@ func (s *Store) PeerConn(pubKey string) (*peer.Conn, bool) {
return p, true return p, true
} }
func (s *Store) PeerConnOpen(pubKey string) {
s.peerConnsMu.RLock()
defer s.peerConnsMu.RUnlock()
p, ok := s.peerConns[pubKey]
if !ok {
return
}
// this can be blocked because of the connect open limiter semaphore
p.Open()
}
func (s *Store) PeersPubKey() []string { func (s *Store) PeersPubKey() []string {
s.peerConnsMu.RLock() s.peerConnsMu.RLock()
defer s.peerConnsMu.RUnlock() defer s.peerConnsMu.RUnlock()