Fix logic

This commit is contained in:
Zoltán Papp
2025-02-24 17:37:28 +01:00
parent c8b031e819
commit d6e185086f
6 changed files with 48 additions and 29 deletions

View File

@@ -35,5 +35,6 @@ type wgIfaceBase interface {
GetDevice() *device.FilteredDevice GetDevice() *device.FilteredDevice
GetWGDevice() *wgdevice.Device GetWGDevice() *wgdevice.Device
GetStats(peerKey string) (configurer.WGStats, error) GetStats(peerKey string) (configurer.WGStats, error)
Transfers() (map[wgtypes.Key]configurer.WGStats, error)
GetNet() *netstack.Net GetNet() *netstack.Net
} }

View File

@@ -1,10 +1,11 @@
package listener package listener
import ( import (
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"net" "net"
"sync" "sync"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@@ -12,7 +13,8 @@ type Listener struct {
peerID wgtypes.Key peerID wgtypes.Key
conn *net.UDPConn conn *net.UDPConn
wg sync.WaitGroup // todo is not thread safe. If you start the ReadPackets in upper layer in a Go thread then wait for Close() there too
wg sync.WaitGroup
} }
func NewListener(peerID wgtypes.Key, addr *net.UDPAddr) (*Listener, error) { func NewListener(peerID wgtypes.Key, addr *net.UDPAddr) (*Listener, error) {
@@ -29,7 +31,7 @@ func NewListener(peerID wgtypes.Key, addr *net.UDPAddr) (*Listener, error) {
} }
func (d *Listener) ReadPackets(trigger func(peerID wgtypes.Key)) { func (d *Listener) ReadPackets(trigger func(peerID wgtypes.Key)) {
d.wg.Done() d.wg.Add(1)
defer d.wg.Done() defer d.wg.Done()
for { for {

View File

@@ -44,6 +44,7 @@ func NewManager(wgIface lazyconn.WGIface) *Manager {
TrafficStartChan: make(chan wgtypes.Key, 1), TrafficStartChan: make(chan wgtypes.Key, 1),
wgIface: wgIface, wgIface: wgIface,
portGenerator: newPortGenerator(), portGenerator: newPortGenerator(),
peers: make(map[wgtypes.Key]*Listener),
done: make(chan struct{}), done: make(chan struct{}),
} }
return m return m
@@ -69,7 +70,7 @@ func (m *Manager) RemovePeer(peerID wgtypes.Key) {
listener.Close() listener.Close()
if err := m.wgIface.RemovePeer(peerID); err != nil { if err := m.wgIface.RemovePeer(peerID.String()); err != nil {
log.Warnf("failed to remove fake peer: %v", err) log.Warnf("failed to remove fake peer: %v", err)
} }
@@ -119,9 +120,10 @@ func (m *Manager) createFakePeer(peerCfg lazyconn.PeerConfig) error {
} }
func (m *Manager) onTrigger(peerID wgtypes.Key) { func (m *Manager) onTrigger(peerID wgtypes.Key) {
if err := m.wgIface.RemovePeer(peerID); err != nil { log.Debugf("peer started to send traffic, remove lazy endpoint: %s", peerID)
log.Errorf("failed to remove peer: %v", err) // todo: it is not thread safe, but it is ok if we protect from upper layer
} m.RemovePeer(peerID)
select { select {
case <-m.done: case <-m.done:
case m.TrafficStartChan <- peerID: case m.TrafficStartChan <- peerID:
@@ -129,5 +131,5 @@ func (m *Manager) onTrigger(peerID wgtypes.Key) {
} }
func (m *Manager) createEndpoint(peerCfg lazyconn.PeerConfig, endpoint *net.UDPAddr) error { func (m *Manager) createEndpoint(peerCfg lazyconn.PeerConfig, endpoint *net.UDPAddr) error {
return m.wgIface.UpdatePeer(peerCfg.PublicKey.String(), peerCfg.AllowedIP.String(), 0, endpoint, nil) return m.wgIface.UpdatePeer(peerCfg.PublicKey.String(), peerCfg.AllowedIPs, 0, endpoint, nil)
} }

View File

@@ -13,24 +13,22 @@ import (
) )
type Manager struct { type Manager struct {
PeerActivityChan chan wgtypes.Key
watcher *watcher.Watcher watcher *watcher.Watcher
listenerMgr *listener.Manager listenerMgr *listener.Manager
managedPeers map[wgtypes.Key]lazyconn.PeerConfig managedPeers map[wgtypes.Key]lazyconn.PeerConfig
addPeers chan []lazyconn.PeerConfig
removePeer chan wgtypes.Key
watcherWG sync.WaitGroup watcherWG sync.WaitGroup
mu sync.Mutex mu sync.Mutex
} }
func NewManager(wgIface lazyconn.WGIface) *Manager { func NewManager(wgIface lazyconn.WGIface) *Manager {
m := &Manager{ m := &Manager{
watcher: watcher.NewWatcher(wgIface), PeerActivityChan: make(chan wgtypes.Key, 1),
listenerMgr: listener.NewManager(wgIface), watcher: watcher.NewWatcher(wgIface),
managedPeers: make(map[wgtypes.Key]lazyconn.PeerConfig), listenerMgr: listener.NewManager(wgIface),
addPeers: make(chan []lazyconn.PeerConfig, 1), managedPeers: make(map[wgtypes.Key]lazyconn.PeerConfig),
removePeer: make(chan wgtypes.Key, 1),
} }
return m return m
} }
@@ -57,6 +55,7 @@ func (m *Manager) Start() {
m.mu.Lock() m.mu.Lock()
cfg, ok := m.managedPeers[peerID] cfg, ok := m.managedPeers[peerID]
if !ok { if !ok {
m.mu.Unlock()
continue continue
} }
@@ -68,21 +67,25 @@ func (m *Manager) Start() {
m.mu.Lock() m.mu.Lock()
_, ok := m.managedPeers[peerID] _, ok := m.managedPeers[peerID]
if !ok { if !ok {
log.Debugf("lazy peer is not managed: %s", peerID)
m.mu.Unlock()
continue continue
} }
log.Infof("peer %s started to send traffic", peerID) //m.watcher.AddPeer(peerID)
m.watcher.AddPeer(peerID) log.Infof("lazy peer is active: %s", peerID)
m.notifyPeerAction(peerID) m.notifyPeerAction(ctx, peerID)
m.mu.Unlock() m.mu.Unlock()
} }
} }
} }
func (m *Manager) SetPeer(peer lazyconn.PeerConfig) error { func (m *Manager) AddPeer(peer lazyconn.PeerConfig) error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
log.Debugf("adding lazy peer: %s", peer.PublicKey)
if _, ok := m.managedPeers[peer.PublicKey]; ok { if _, ok := m.managedPeers[peer.PublicKey]; ok {
return nil return nil
} }
@@ -91,17 +94,24 @@ func (m *Manager) SetPeer(peer lazyconn.PeerConfig) error {
return err return err
} }
// todo: remove removed peers from the list m.managedPeers[peer.PublicKey] = peer
return nil return nil
} }
func (m *Manager) RemovePeer(peerID wgtypes.Key) { func (m *Manager) RemovePeer(peerID wgtypes.Key) bool {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
if _, ok := m.managedPeers[peerID]; !ok {
return false
}
log.Debugf("removing lazy peer: %s", peerID)
m.watcher.RemovePeer(peerID) m.watcher.RemovePeer(peerID)
m.listenerMgr.RemovePeer(peerID) m.listenerMgr.RemovePeer(peerID)
delete(m.managedPeers, peerID) delete(m.managedPeers, peerID)
return false
} }
func (m *Manager) Close() { func (m *Manager) Close() {
@@ -113,6 +123,9 @@ func (m *Manager) Close() {
m.managedPeers = make(map[wgtypes.Key]lazyconn.PeerConfig) m.managedPeers = make(map[wgtypes.Key]lazyconn.PeerConfig)
} }
func (m *Manager) notifyPeerAction(peerID wgtypes.Key) { func (m *Manager) notifyPeerAction(ctx context.Context, peerID wgtypes.Key) {
// todo notify engine select {
case <-ctx.Done():
case m.PeerActivityChan <- peerID:
}
} }

View File

@@ -1,12 +1,12 @@
package lazyconn package lazyconn
import ( import (
"net" "net/netip"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
) )
type PeerConfig struct { type PeerConfig struct {
PublicKey wgtypes.Key PublicKey wgtypes.Key
AllowedIP net.IPNet AllowedIPs []netip.Prefix
} }

View File

@@ -2,6 +2,7 @@ package lazyconn
import ( import (
"net" "net"
"net/netip"
"time" "time"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
@@ -11,6 +12,6 @@ import (
type WGIface interface { type WGIface interface {
Transfers() (map[wgtypes.Key]configurer.WGStats, error) Transfers() (map[wgtypes.Key]configurer.WGStats, error)
RemovePeer(key wgtypes.Key) error RemovePeer(peerKey string) error
UpdatePeer(peerKey string, allowedIps string, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error
} }