diff --git a/client/internal/iface_common.go b/client/internal/iface_common.go index 65b425015..424e03f77 100644 --- a/client/internal/iface_common.go +++ b/client/internal/iface_common.go @@ -35,5 +35,6 @@ type wgIfaceBase interface { GetDevice() *device.FilteredDevice GetWGDevice() *wgdevice.Device GetStats(peerKey string) (configurer.WGStats, error) + Transfers() (map[wgtypes.Key]configurer.WGStats, error) GetNet() *netstack.Net } diff --git a/client/internal/lazyconn/listener/listener.go b/client/internal/lazyconn/listener/listener.go index 0cd91864d..1aa740b88 100644 --- a/client/internal/lazyconn/listener/listener.go +++ b/client/internal/lazyconn/listener/listener.go @@ -1,10 +1,11 @@ package listener import ( - "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "net" "sync" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + log "github.com/sirupsen/logrus" ) @@ -12,7 +13,8 @@ type Listener struct { peerID wgtypes.Key conn *net.UDPConn - wg sync.WaitGroup + // todo is not thread safe. If you start the ReadPackets in upper layer in a Go thread then wait for Close() there too + wg sync.WaitGroup } func NewListener(peerID wgtypes.Key, addr *net.UDPAddr) (*Listener, error) { @@ -29,7 +31,7 @@ func NewListener(peerID wgtypes.Key, addr *net.UDPAddr) (*Listener, error) { } func (d *Listener) ReadPackets(trigger func(peerID wgtypes.Key)) { - d.wg.Done() + d.wg.Add(1) defer d.wg.Done() for { diff --git a/client/internal/lazyconn/listener/manager.go b/client/internal/lazyconn/listener/manager.go index ea32dd39a..3b456bc0e 100644 --- a/client/internal/lazyconn/listener/manager.go +++ b/client/internal/lazyconn/listener/manager.go @@ -44,6 +44,7 @@ func NewManager(wgIface lazyconn.WGIface) *Manager { TrafficStartChan: make(chan wgtypes.Key, 1), wgIface: wgIface, portGenerator: newPortGenerator(), + peers: make(map[wgtypes.Key]*Listener), done: make(chan struct{}), } return m @@ -69,7 +70,7 @@ func (m *Manager) RemovePeer(peerID wgtypes.Key) { listener.Close() - if err := m.wgIface.RemovePeer(peerID); err != nil { + if err := m.wgIface.RemovePeer(peerID.String()); err != nil { log.Warnf("failed to remove fake peer: %v", err) } @@ -119,9 +120,10 @@ func (m *Manager) createFakePeer(peerCfg lazyconn.PeerConfig) error { } func (m *Manager) onTrigger(peerID wgtypes.Key) { - if err := m.wgIface.RemovePeer(peerID); err != nil { - log.Errorf("failed to remove peer: %v", err) - } + log.Debugf("peer started to send traffic, remove lazy endpoint: %s", peerID) + // todo: it is not thread safe, but it is ok if we protect from upper layer + m.RemovePeer(peerID) + select { case <-m.done: case m.TrafficStartChan <- peerID: @@ -129,5 +131,5 @@ func (m *Manager) onTrigger(peerID wgtypes.Key) { } func (m *Manager) createEndpoint(peerCfg lazyconn.PeerConfig, endpoint *net.UDPAddr) error { - return m.wgIface.UpdatePeer(peerCfg.PublicKey.String(), peerCfg.AllowedIP.String(), 0, endpoint, nil) + return m.wgIface.UpdatePeer(peerCfg.PublicKey.String(), peerCfg.AllowedIPs, 0, endpoint, nil) } diff --git a/client/internal/lazyconn/manager/manager.go b/client/internal/lazyconn/manager/manager.go index 120f0bd54..3ea264b03 100644 --- a/client/internal/lazyconn/manager/manager.go +++ b/client/internal/lazyconn/manager/manager.go @@ -13,24 +13,22 @@ import ( ) type Manager struct { + PeerActivityChan chan wgtypes.Key + watcher *watcher.Watcher listenerMgr *listener.Manager managedPeers map[wgtypes.Key]lazyconn.PeerConfig - addPeers chan []lazyconn.PeerConfig - removePeer chan wgtypes.Key - watcherWG sync.WaitGroup mu sync.Mutex } func NewManager(wgIface lazyconn.WGIface) *Manager { m := &Manager{ - watcher: watcher.NewWatcher(wgIface), - listenerMgr: listener.NewManager(wgIface), - managedPeers: make(map[wgtypes.Key]lazyconn.PeerConfig), - addPeers: make(chan []lazyconn.PeerConfig, 1), - removePeer: make(chan wgtypes.Key, 1), + PeerActivityChan: make(chan wgtypes.Key, 1), + watcher: watcher.NewWatcher(wgIface), + listenerMgr: listener.NewManager(wgIface), + managedPeers: make(map[wgtypes.Key]lazyconn.PeerConfig), } return m } @@ -57,6 +55,7 @@ func (m *Manager) Start() { m.mu.Lock() cfg, ok := m.managedPeers[peerID] if !ok { + m.mu.Unlock() continue } @@ -68,21 +67,25 @@ func (m *Manager) Start() { m.mu.Lock() _, ok := m.managedPeers[peerID] if !ok { + log.Debugf("lazy peer is not managed: %s", peerID) + m.mu.Unlock() continue } - log.Infof("peer %s started to send traffic", peerID) - m.watcher.AddPeer(peerID) - m.notifyPeerAction(peerID) + //m.watcher.AddPeer(peerID) + log.Infof("lazy peer is active: %s", peerID) + m.notifyPeerAction(ctx, peerID) m.mu.Unlock() } } } -func (m *Manager) SetPeer(peer lazyconn.PeerConfig) error { +func (m *Manager) AddPeer(peer lazyconn.PeerConfig) error { m.mu.Lock() defer m.mu.Unlock() + log.Debugf("adding lazy peer: %s", peer.PublicKey) + if _, ok := m.managedPeers[peer.PublicKey]; ok { return nil } @@ -91,17 +94,24 @@ func (m *Manager) SetPeer(peer lazyconn.PeerConfig) error { return err } - // todo: remove removed peers from the list + m.managedPeers[peer.PublicKey] = peer return nil } -func (m *Manager) RemovePeer(peerID wgtypes.Key) { +func (m *Manager) RemovePeer(peerID wgtypes.Key) bool { m.mu.Lock() defer m.mu.Unlock() + if _, ok := m.managedPeers[peerID]; !ok { + return false + } + + log.Debugf("removing lazy peer: %s", peerID) + m.watcher.RemovePeer(peerID) m.listenerMgr.RemovePeer(peerID) delete(m.managedPeers, peerID) + return false } func (m *Manager) Close() { @@ -113,6 +123,9 @@ func (m *Manager) Close() { m.managedPeers = make(map[wgtypes.Key]lazyconn.PeerConfig) } -func (m *Manager) notifyPeerAction(peerID wgtypes.Key) { - // todo notify engine +func (m *Manager) notifyPeerAction(ctx context.Context, peerID wgtypes.Key) { + select { + case <-ctx.Done(): + case m.PeerActivityChan <- peerID: + } } diff --git a/client/internal/lazyconn/peercfg.go b/client/internal/lazyconn/peercfg.go index c2b3fed86..11d4200af 100644 --- a/client/internal/lazyconn/peercfg.go +++ b/client/internal/lazyconn/peercfg.go @@ -1,12 +1,12 @@ package lazyconn import ( - "net" + "net/netip" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" ) type PeerConfig struct { - PublicKey wgtypes.Key - AllowedIP net.IPNet + PublicKey wgtypes.Key + AllowedIPs []netip.Prefix } diff --git a/client/internal/lazyconn/wgiface.go b/client/internal/lazyconn/wgiface.go index 0a7a79c13..04879fb54 100644 --- a/client/internal/lazyconn/wgiface.go +++ b/client/internal/lazyconn/wgiface.go @@ -2,6 +2,7 @@ package lazyconn import ( "net" + "net/netip" "time" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" @@ -11,6 +12,6 @@ import ( type WGIface interface { Transfers() (map[wgtypes.Key]configurer.WGStats, error) - RemovePeer(key wgtypes.Key) error - UpdatePeer(peerKey string, allowedIps string, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error + RemovePeer(peerKey string) error + UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error }