diff --git a/client/iface/configurer/kernel_unix.go b/client/iface/configurer/kernel_unix.go index 198955194..fd5e3e626 100644 --- a/client/iface/configurer/kernel_unix.go +++ b/client/iface/configurer/kernel_unix.go @@ -213,8 +213,8 @@ func (c *KernelConfigurer) GetStats(peerKey string) (WGStats, error) { }, nil } -func (c *KernelConfigurer) Transfers() (map[wgtypes.Key]WGStats, error) { - stats := make(map[wgtypes.Key]WGStats) +func (c *KernelConfigurer) Transfers() (map[string]WGStats, error) { + stats := make(map[string]WGStats) wg, err := wgctrl.New() if err != nil { return nil, fmt.Errorf("wgctl: %w", err) @@ -231,7 +231,7 @@ func (c *KernelConfigurer) Transfers() (map[wgtypes.Key]WGStats, error) { return nil, fmt.Errorf("get device %s: %w", c.deviceName, err) } for _, peer := range wgDevice.Peers { - stats[peer.PublicKey] = WGStats{ + stats[peer.PublicKey.String()] = WGStats{ LastHandshake: peer.LastHandshakeTime, TxBytes: peer.TransmitBytes, RxBytes: peer.ReceiveBytes, diff --git a/client/iface/configurer/usp.go b/client/iface/configurer/usp.go index 4b8aac936..6600c6f16 100644 --- a/client/iface/configurer/usp.go +++ b/client/iface/configurer/usp.go @@ -263,7 +263,7 @@ func (t *WGUSPConfigurer) GetStats(peerKey string) (WGStats, error) { }, nil } -func (t *WGUSPConfigurer) Transfers() (map[wgtypes.Key]WGStats, error) { +func (t *WGUSPConfigurer) Transfers() (map[string]WGStats, error) { ipc, err := t.device.IpcGet() if err != nil { return nil, fmt.Errorf("ipc get: %w", err) @@ -272,10 +272,10 @@ func (t *WGUSPConfigurer) Transfers() (map[wgtypes.Key]WGStats, error) { return parseTransfers(ipc) } -func parseTransfers(ipc string) (map[wgtypes.Key]WGStats, error) { - stats := make(map[wgtypes.Key]WGStats) +func parseTransfers(ipc string) (map[string]WGStats, error) { + stats := make(map[string]WGStats) var ( - currentKey wgtypes.Key + currentKey string currentStats WGStats hasPeer bool ) @@ -291,14 +291,7 @@ func parseTransfers(ipc string) (map[wgtypes.Key]WGStats, error) { if err != nil { return nil, fmt.Errorf("decode peerID: %w", err) } - b64 := base64.StdEncoding.EncodeToString(h) - - peerKeyParsed, err := wgtypes.ParseKey(b64) - if err != nil { - return nil, fmt.Errorf("parse key: %w", err) - } - - currentKey = peerKeyParsed + currentKey = base64.StdEncoding.EncodeToString(h) currentStats = WGStats{} // Reset stats for the new peer hasPeer = true stats[currentKey] = currentStats diff --git a/client/iface/device/interface.go b/client/iface/device/interface.go index 31ab6208e..2a368f6e6 100644 --- a/client/iface/device/interface.go +++ b/client/iface/device/interface.go @@ -17,5 +17,5 @@ type WGConfigurer interface { RemoveAllowedIP(peerKey string, allowedIP string) error Close() GetStats(peerKey string) (configurer.WGStats, error) - Transfers() (map[wgtypes.Key]configurer.WGStats, error) + Transfers() (map[string]configurer.WGStats, error) } diff --git a/client/iface/iface.go b/client/iface/iface.go index 453285e9d..d952524d9 100644 --- a/client/iface/iface.go +++ b/client/iface/iface.go @@ -218,7 +218,7 @@ func (w *WGIface) GetStats(peerKey string) (configurer.WGStats, error) { return w.configurer.GetStats(peerKey) } -func (w *WGIface) Transfers() (map[wgtypes.Key]configurer.WGStats, error) { +func (w *WGIface) Transfers() (map[string]configurer.WGStats, error) { return w.configurer.Transfers() } diff --git a/client/internal/conn_mgr.go b/client/internal/conn_mgr.go new file mode 100644 index 000000000..c44986928 --- /dev/null +++ b/client/internal/conn_mgr.go @@ -0,0 +1,103 @@ +package internal + +import ( + "context" + "os" + + log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/client/internal/lazyconn" + lazyConnManager "github.com/netbirdio/netbird/client/internal/lazyconn/manager" + "github.com/netbirdio/netbird/client/internal/peer" + "github.com/netbirdio/netbird/client/internal/peerstore" +) + +const ( + envDisableLazyConn = "NB_LAZY_CONN_DISABLE" +) + +type ConnMgr struct { + lazyConnMgr *lazyConnManager.Manager + peerStore *peerstore.Store + + excludes map[string]struct{} +} + +func NewConnMgr(peerStore *peerstore.Store, iface lazyconn.WGIface) *ConnMgr { + e := &ConnMgr{ + peerStore: peerStore, + lazyConnMgr: lazyConnManager.NewManager(iface), + } + return e +} + +func (e *ConnMgr) Start(ctx context.Context) { + if os.Getenv(envDisableLazyConn) == "true" { + log.Infof("lazy connection manager is disabled") + return + } + go e.lazyConnMgr.Start() + go e.receiveLazyConnEvents(ctx) +} + +func (e *ConnMgr) AddExcludeFromLazyConnection(peerID string) { + e.excludes[peerID] = struct{}{} +} + +func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) { + if success := e.peerStore.AddPeerConn(peerKey, conn); !success { + return true + } + + _, exists = e.excludes[peerKey] + if exists { + conn.Open() + return + } + + lazyPeerCfg := lazyconn.PeerConfig{ + PublicKey: peerKey, + AllowedIPs: conn.WgConfig().AllowedIps, + } + if err := e.lazyConnMgr.AddPeer(lazyPeerCfg); err != nil { + log.Errorf("failed to add peer to lazyconn manager: %v", err) + conn.Open() + } + return +} + +func (e *ConnMgr) OnSignalMsg(peerKey string) (*peer.Conn, bool) { + conn, ok := e.peerStore.PeerConn(peerKey) + if !ok { + return nil, false + } + + if ok := e.lazyConnMgr.RemovePeer(peerKey); ok { + conn.Open() + } + return conn, true +} + +func (e *ConnMgr) RemovePeerConn(peerKey string) { + conn, ok := e.peerStore.Remove(peerKey) + if ok { + conn.Close() + } + + e.lazyConnMgr.RemovePeer(peerKey) +} + +func (e *ConnMgr) Close() { + // todo wait for receiveLazyConnEvents to finish + e.lazyConnMgr.Close() +} + +func (e *ConnMgr) receiveLazyConnEvents(ctx context.Context) { + for { + select { + case peerID := <-e.lazyConnMgr.PeerActivityChan: + e.peerStore.PeerConnOpen(peerID) + case <-ctx.Done(): + } + } +} diff --git a/client/internal/engine.go b/client/internal/engine.go index 61e737a4f..9fa721dcb 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -33,8 +33,6 @@ import ( "github.com/netbirdio/netbird/client/internal/acl" "github.com/netbirdio/netbird/client/internal/dns" "github.com/netbirdio/netbird/client/internal/dnsfwd" - "github.com/netbirdio/netbird/client/internal/lazyconn" - lazyConnManager "github.com/netbirdio/netbird/client/internal/lazyconn/manager" "github.com/netbirdio/netbird/client/internal/networkmonitor" "github.com/netbirdio/netbird/client/internal/peer" "github.com/netbirdio/netbird/client/internal/peer/guard" @@ -133,7 +131,7 @@ type Engine struct { // peerConns is a map that holds all the peers that are known to this peer peerStore *peerstore.Store - lazyConnMgr *lazyConnManager.Manager + connMgr *ConnMgr beforePeerHook nbnet.AddHookFunc afterPeerHook nbnet.RemoveHookFunc @@ -261,6 +259,8 @@ func (e *Engine) Stop() error { e.syncMsgMux.Lock() defer e.syncMsgMux.Unlock() + e.connMgr.Close() + // stopping network monitor first to avoid starting the engine again if e.networkMonitor != nil { e.networkMonitor.Stop() @@ -293,8 +293,6 @@ func (e *Engine) Stop() error { return fmt.Errorf("failed to remove all peers: %s", err) } - e.lazyConnMgr.Close() - if e.cancel != nil { e.cancel() } @@ -406,10 +404,6 @@ func (e *Engine) Start() error { return fmt.Errorf("up wg interface: %w", err) } - e.lazyConnMgr = lazyConnManager.NewManager(e.wgInterface) - go e.lazyConnMgr.Start() - go e.receiveLazyConnEvents() - if e.firewall != nil { e.acl = acl.NewDefaultManager(e.firewall) } @@ -429,6 +423,9 @@ func (e *Engine) Start() error { NATExternalIPs: e.parseNATExternalIPMappings(), } + e.connMgr = NewConnMgr(e.peerStore, wgIface) + e.connMgr.Start(e.ctx) + e.srWatcher = guard.NewSRWatcher(e.signal, e.relayManager, e.mobileDep.IFaceDiscover, iceCfg) e.srWatcher.Start() @@ -607,16 +604,11 @@ func (e *Engine) removePeer(peerKey string) error { e.sshServer.RemoveAuthorizedKey(peerKey) } - defer func() { - err := e.statusRecorder.RemovePeer(peerKey) - if err != nil { - log.Warnf("received error when removing peer %s from status recorder: %v", peerKey, err) - } - }() + e.connMgr.RemovePeerConn(peerKey) - conn, exists := e.peerStore.Remove(peerKey) - if exists { - conn.Close() + err := e.statusRecorder.RemovePeer(peerKey) + if err != nil { + log.Warnf("received error when removing peer %s from status recorder: %v", peerKey, err) } return nil } @@ -1134,7 +1126,7 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error { return fmt.Errorf("create peer connection: %w", err) } - if ok := e.peerStore.AddPeerConn(peerKey, conn); !ok { + if exists := e.connMgr.AddPeerConn(peerKey, conn); exists { conn.Close() return fmt.Errorf("peer already exists: %s", peerKey) } @@ -1149,19 +1141,6 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error { log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err) } - peerKeyParsed, err := wgtypes.ParseKey(peerKey) - if err != nil { - return err - } - - lazyPeerCfg := lazyconn.PeerConfig{ - PublicKey: peerKeyParsed, - AllowedIPs: peerIPs, - } - if err := e.lazyConnMgr.AddPeer(lazyPeerCfg); err != nil { - return err - } - return nil } @@ -1237,19 +1216,11 @@ func (e *Engine) receiveSignalEvents() { e.syncMsgMux.Lock() defer e.syncMsgMux.Unlock() - conn, ok := e.peerStore.PeerConn(msg.Key) + conn, ok := e.connMgr.OnSignalMsg(msg.Key) if !ok { return fmt.Errorf("wrongly addressed message %s", msg.Key) } - peerKeyParsed, err := wgtypes.ParseKey(msg.Key) - if err != nil { - return err - } - if ok := e.lazyConnMgr.RemovePeer(peerKeyParsed); ok { - conn.Open() - } - switch msg.GetBody().Type { case sProto.Body_OFFER: remoteCred, err := signal.UnMarshalCredential(msg) @@ -1375,12 +1346,6 @@ func (e *Engine) parseNATExternalIPMappings() []string { } func (e *Engine) close() { - log.Debugf("stop lazy connection manager") - if e.lazyConnMgr != nil { - e.lazyConnMgr.Close() - e.lazyConnMgr = nil - } - log.Debugf("removing Netbird interface %s", e.config.WgIfaceName) if e.wgInterface != nil { if err := e.wgInterface.Close(); err != nil { @@ -1805,23 +1770,6 @@ func (e *Engine) Address() (netip.Addr, error) { return ip.Unmap(), nil } -func (e *Engine) receiveLazyConnEvents() { - for { - select { - case peerID := <-e.lazyConnMgr.PeerActivityChan: - e.syncMsgMux.Lock() - peerConn, ok := e.peerStore.PeerConn(peerID.String()) - if !ok { - e.syncMsgMux.Unlock() - continue - } - peerConn.Open() - e.syncMsgMux.Unlock() - case <-e.ctx.Done(): - } - } -} - // isChecksEqual checks if two slices of checks are equal. func isChecksEqual(checks []*mgmProto.Checks, oChecks []*mgmProto.Checks) bool { for _, check := range checks { diff --git a/client/internal/engine_test.go b/client/internal/engine_test.go index 54a347e31..65581bef8 100644 --- a/client/internal/engine_test.go +++ b/client/internal/engine_test.go @@ -88,6 +88,7 @@ type MockWGIface struct { GetDeviceFunc func() *device.FilteredDevice GetWGDeviceFunc func() *wgdevice.Device GetStatsFunc func(peerKey string) (configurer.WGStats, error) + TransfersMoc func() (map[string]configurer.WGStats, error) GetInterfaceGUIDStringFunc func() (string, error) GetProxyFunc func() wgproxy.Proxy GetNetFunc func() *netstack.Net @@ -169,6 +170,10 @@ func (m *MockWGIface) GetStats(peerKey string) (configurer.WGStats, error) { return m.GetStatsFunc(peerKey) } +func (m *MockWGIface) Transfers() (map[string]configurer.WGStats, error) { + return m.TransfersMoc() +} + func (m *MockWGIface) GetProxy() wgproxy.Proxy { return m.GetProxyFunc() } diff --git a/client/internal/iface_common.go b/client/internal/iface_common.go index 424e03f77..7e45759c6 100644 --- a/client/internal/iface_common.go +++ b/client/internal/iface_common.go @@ -35,6 +35,6 @@ type wgIfaceBase interface { GetDevice() *device.FilteredDevice GetWGDevice() *wgdevice.Device GetStats(peerKey string) (configurer.WGStats, error) - Transfers() (map[wgtypes.Key]configurer.WGStats, error) + Transfers() (map[string]configurer.WGStats, error) GetNet() *netstack.Net } diff --git a/client/internal/lazyconn/listener/listener.go b/client/internal/lazyconn/listener/listener.go index 1aa740b88..94d9f9667 100644 --- a/client/internal/lazyconn/listener/listener.go +++ b/client/internal/lazyconn/listener/listener.go @@ -4,20 +4,18 @@ import ( "net" "sync" - "golang.zx2c4.com/wireguard/wgctrl/wgtypes" - log "github.com/sirupsen/logrus" ) type Listener struct { - peerID wgtypes.Key + peerID string conn *net.UDPConn // todo is not thread safe. If you start the ReadPackets in upper layer in a Go thread then wait for Close() there too wg sync.WaitGroup } -func NewListener(peerID wgtypes.Key, addr *net.UDPAddr) (*Listener, error) { +func NewListener(peerID string, addr *net.UDPAddr) (*Listener, error) { conn, err := net.ListenUDP("udp", addr) if err != nil { return nil, err @@ -30,7 +28,7 @@ func NewListener(peerID wgtypes.Key, addr *net.UDPAddr) (*Listener, error) { return d, nil } -func (d *Listener) ReadPackets(trigger func(peerID wgtypes.Key)) { +func (d *Listener) ReadPackets(trigger func(peerID string)) { d.wg.Add(1) defer d.wg.Done() diff --git a/client/internal/lazyconn/listener/manager.go b/client/internal/lazyconn/listener/manager.go index 3b456bc0e..00493012d 100644 --- a/client/internal/lazyconn/listener/manager.go +++ b/client/internal/lazyconn/listener/manager.go @@ -5,7 +5,6 @@ import ( "net" log "github.com/sirupsen/logrus" - "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "github.com/netbirdio/netbird/client/internal/lazyconn" ) @@ -30,27 +29,28 @@ func (p *portGenerator) nextPort() int { } type Manager struct { - TrafficStartChan chan wgtypes.Key + TrafficStartChan chan string wgIface lazyconn.WGIface portGenerator *portGenerator - peers map[wgtypes.Key]*Listener - done chan struct{} + // todo peers add/remove is not thread safe because of the callback function + peers map[string]*Listener + done chan struct{} } func NewManager(wgIface lazyconn.WGIface) *Manager { m := &Manager{ - TrafficStartChan: make(chan wgtypes.Key, 1), + TrafficStartChan: make(chan string, 1), wgIface: wgIface, portGenerator: newPortGenerator(), - peers: make(map[wgtypes.Key]*Listener), + peers: make(map[string]*Listener), done: make(chan struct{}), } return m } -func (m *Manager) CreateFakePeers(peerCfg lazyconn.PeerConfig) error { +func (m *Manager) CreateFakePeer(peerCfg lazyconn.PeerConfig) error { if _, ok := m.peers[peerCfg.PublicKey]; ok { return nil } @@ -58,11 +58,11 @@ func (m *Manager) CreateFakePeers(peerCfg lazyconn.PeerConfig) error { if err := m.createFakePeer(peerCfg); err != nil { return err } - log.Debugf("created lazy connection listener for: %s", peerCfg.PublicKey.String()) + log.Debugf("created lazy connection listener for: %s", peerCfg.PublicKey) return nil } -func (m *Manager) RemovePeer(peerID wgtypes.Key) { +func (m *Manager) RemovePeer(peerID string) { listener, ok := m.peers[peerID] if !ok { return @@ -70,7 +70,7 @@ func (m *Manager) RemovePeer(peerID wgtypes.Key) { listener.Close() - if err := m.wgIface.RemovePeer(peerID.String()); err != nil { + if err := m.wgIface.RemovePeer(peerID); err != nil { log.Warnf("failed to remove fake peer: %v", err) } @@ -104,11 +104,11 @@ func (m *Manager) createFakePeer(peerCfg lazyconn.PeerConfig) error { } if listener == nil { - return fmt.Errorf("failed to allocate lazy connection port for: %s", peerCfg.PublicKey.String()) + return fmt.Errorf("failed to allocate lazy connection port for: %s", peerCfg.PublicKey) } if err := m.createEndpoint(peerCfg, addr); err != nil { - log.Errorf("failed to create endpoint for %s: %v", peerCfg.PublicKey.String(), err) + log.Errorf("failed to create endpoint for %s: %v", peerCfg.PublicKey, err) listener.Close() return err } @@ -119,10 +119,14 @@ func (m *Manager) createFakePeer(peerCfg lazyconn.PeerConfig) error { return nil } -func (m *Manager) onTrigger(peerID wgtypes.Key) { +// todo: it is not thread safe, but it is ok if we protect from upper layer +func (m *Manager) onTrigger(peerID string) { log.Debugf("peer started to send traffic, remove lazy endpoint: %s", peerID) - // todo: it is not thread safe, but it is ok if we protect from upper layer - m.RemovePeer(peerID) + if err := m.wgIface.RemovePeer(peerID); err != nil { + log.Warnf("failed to remove fake peer: %v", err) + } + + delete(m.peers, peerID) select { case <-m.done: @@ -131,5 +135,5 @@ func (m *Manager) onTrigger(peerID wgtypes.Key) { } func (m *Manager) createEndpoint(peerCfg lazyconn.PeerConfig, endpoint *net.UDPAddr) error { - return m.wgIface.UpdatePeer(peerCfg.PublicKey.String(), peerCfg.AllowedIPs, 0, endpoint, nil) + return m.wgIface.UpdatePeer(peerCfg.PublicKey, peerCfg.AllowedIPs, 0, endpoint, nil) } diff --git a/client/internal/lazyconn/manager/manager.go b/client/internal/lazyconn/manager/manager.go index 3ea264b03..e9623e394 100644 --- a/client/internal/lazyconn/manager/manager.go +++ b/client/internal/lazyconn/manager/manager.go @@ -5,7 +5,6 @@ import ( "sync" log "github.com/sirupsen/logrus" - "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "github.com/netbirdio/netbird/client/internal/lazyconn" "github.com/netbirdio/netbird/client/internal/lazyconn/listener" @@ -13,11 +12,11 @@ import ( ) type Manager struct { - PeerActivityChan chan wgtypes.Key + PeerActivityChan chan string watcher *watcher.Watcher listenerMgr *listener.Manager - managedPeers map[wgtypes.Key]lazyconn.PeerConfig + managedPeers map[string]lazyconn.PeerConfig watcherWG sync.WaitGroup mu sync.Mutex @@ -25,10 +24,10 @@ type Manager struct { func NewManager(wgIface lazyconn.WGIface) *Manager { m := &Manager{ - PeerActivityChan: make(chan wgtypes.Key, 1), + PeerActivityChan: make(chan string, 1), watcher: watcher.NewWatcher(wgIface), listenerMgr: listener.NewManager(wgIface), - managedPeers: make(map[wgtypes.Key]lazyconn.PeerConfig), + managedPeers: make(map[string]lazyconn.PeerConfig), } return m } @@ -59,7 +58,7 @@ func (m *Manager) Start() { continue } - if err := m.listenerMgr.CreateFakePeers(cfg); err != nil { + if err := m.listenerMgr.CreateFakePeer(cfg); err != nil { log.Errorf("failed to start watch lazy connection tries: %s", err) } m.mu.Unlock() @@ -90,7 +89,7 @@ func (m *Manager) AddPeer(peer lazyconn.PeerConfig) error { return nil } - if err := m.listenerMgr.CreateFakePeers(peer); err != nil { + if err := m.listenerMgr.CreateFakePeer(peer); err != nil { return err } @@ -98,7 +97,7 @@ func (m *Manager) AddPeer(peer lazyconn.PeerConfig) error { return nil } -func (m *Manager) RemovePeer(peerID wgtypes.Key) bool { +func (m *Manager) RemovePeer(peerID string) bool { m.mu.Lock() defer m.mu.Unlock() @@ -120,10 +119,10 @@ func (m *Manager) Close() { m.listenerMgr.Close() m.watcherWG.Wait() - m.managedPeers = make(map[wgtypes.Key]lazyconn.PeerConfig) + m.managedPeers = make(map[string]lazyconn.PeerConfig) } -func (m *Manager) notifyPeerAction(ctx context.Context, peerID wgtypes.Key) { +func (m *Manager) notifyPeerAction(ctx context.Context, peerID string) { select { case <-ctx.Done(): case m.PeerActivityChan <- peerID: diff --git a/client/internal/lazyconn/peercfg.go b/client/internal/lazyconn/peercfg.go index 11d4200af..8c45ed357 100644 --- a/client/internal/lazyconn/peercfg.go +++ b/client/internal/lazyconn/peercfg.go @@ -2,11 +2,9 @@ package lazyconn import ( "net/netip" - - "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ) type PeerConfig struct { - PublicKey wgtypes.Key + PublicKey string AllowedIPs []netip.Prefix } diff --git a/client/internal/lazyconn/watcher/watcher.go b/client/internal/lazyconn/watcher/watcher.go index 5d96fe6c0..de56e9c96 100644 --- a/client/internal/lazyconn/watcher/watcher.go +++ b/client/internal/lazyconn/watcher/watcher.go @@ -6,7 +6,6 @@ import ( "time" log "github.com/sirupsen/logrus" - "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "github.com/netbirdio/netbird/client/iface/configurer" "github.com/netbirdio/netbird/client/internal/lazyconn" @@ -22,19 +21,19 @@ type rxHistory struct { } type Watcher struct { - PeerTimedOutChan chan wgtypes.Key + PeerTimedOutChan chan string wgIface lazyconn.WGIface - peers map[wgtypes.Key]*rxHistory + peers map[string]*rxHistory peersMu sync.Mutex } func NewWatcher(wgIface lazyconn.WGIface) *Watcher { return &Watcher{ - PeerTimedOutChan: make(chan wgtypes.Key, 1), + PeerTimedOutChan: make(chan string, 1), wgIface: wgIface, - peers: make(map[wgtypes.Key]*rxHistory), + peers: make(map[string]*rxHistory), } } @@ -57,14 +56,14 @@ func (m *Watcher) Watch(ctx context.Context) { } } -func (m *Watcher) AddPeer(peerID wgtypes.Key) { +func (m *Watcher) AddPeer(peerID string) { m.peersMu.Lock() defer m.peersMu.Unlock() m.peers[peerID] = &rxHistory{} } -func (m *Watcher) RemovePeer(id wgtypes.Key) { +func (m *Watcher) RemovePeer(id string) { m.peersMu.Lock() defer m.peersMu.Unlock() @@ -72,7 +71,7 @@ func (m *Watcher) RemovePeer(id wgtypes.Key) { } // Todo: this is a naive implementation, we must to finish it -func (m *Watcher) checkTimeouts(ctx context.Context, allPeersStats map[wgtypes.Key]configurer.WGStats) { +func (m *Watcher) checkTimeouts(ctx context.Context, allPeersStats map[string]configurer.WGStats) { m.peersMu.Lock() defer m.peersMu.Unlock() diff --git a/client/internal/lazyconn/wgiface.go b/client/internal/lazyconn/wgiface.go index 04879fb54..4eff7b833 100644 --- a/client/internal/lazyconn/wgiface.go +++ b/client/internal/lazyconn/wgiface.go @@ -11,7 +11,7 @@ import ( ) type WGIface interface { - Transfers() (map[wgtypes.Key]configurer.WGStats, error) + Transfers() (map[string]configurer.WGStats, error) RemovePeer(peerKey string) error UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error } diff --git a/client/internal/peerstore/store.go b/client/internal/peerstore/store.go index 15d34d3d0..d1bab85e4 100644 --- a/client/internal/peerstore/store.go +++ b/client/internal/peerstore/store.go @@ -79,6 +79,18 @@ func (s *Store) PeerConn(pubKey string) (*peer.Conn, bool) { return p, true } +func (s *Store) PeerConnOpen(pubKey string) { + s.peerConnsMu.RLock() + defer s.peerConnsMu.RUnlock() + + p, ok := s.peerConns[pubKey] + if !ok { + return + } + // this can be blocked because of the connect open limiter semaphore + p.Open() +} + func (s *Store) PeersPubKey() []string { s.peerConnsMu.RLock() defer s.peerConnsMu.RUnlock()