Fix postponed remove endpoint config

This commit is contained in:
Zoltán Papp
2025-02-27 16:56:20 +01:00
parent d0d37babe2
commit 4a5edc1374
2 changed files with 89 additions and 71 deletions

View File

@@ -2,43 +2,68 @@ package listener
import (
"net"
"sync"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/lazyconn"
)
type Listener struct {
peerID string
conn *net.UDPConn
wgIface lazyconn.WGIface
peerCfg lazyconn.PeerConfig
conn *net.UDPConn
endpoint *net.UDPAddr
done sync.Mutex
}
func NewListener(peerID string, conn *net.UDPConn) *Listener {
func NewListener(wgIface lazyconn.WGIface, cfg lazyconn.PeerConfig, conn *net.UDPConn, addr *net.UDPAddr) (*Listener, error) {
d := &Listener{
conn: conn,
peerID: peerID,
wgIface: wgIface,
peerCfg: cfg,
conn: conn,
endpoint: addr,
}
return d
if err := d.createEndpoint(); err != nil {
return nil, err
}
d.done.Lock()
return d, nil
}
func (d *Listener) ReadPackets(trigger func(peerID string)) {
func (d *Listener) ReadPackets() {
for {
buffer := make([]byte, 10)
n, remoteAddr, err := d.conn.ReadFromUDP(buffer)
if err != nil {
log.Infof("exit from peer listener reader: %v", err)
return
break
}
if n < 4 {
log.Warnf("received %d bytes from %s, too short", n, remoteAddr)
continue
}
trigger(d.peerID)
break
}
d.removeEndpoint()
d.done.Unlock()
}
// Close shuts down the listener's UDP connection and then blocks on the done
// mutex. NewListener leaves done locked and ReadPackets unlocks it right
// before returning, so the final Lock waits for the read loop to exit.
// A close error on the connection is logged and otherwise ignored.
func (d *Listener) Close() {
	closeErr := d.conn.Close()
	if closeErr != nil {
		log.Errorf("failed to close UDP listener: %s", closeErr)
	}

	// Wait for ReadPackets to release the mutex.
	d.done.Lock()
}
// removeEndpoint asks wgIface to remove the peer identified by this
// listener's configured public key. A failure is logged as a warning and
// not returned to the caller.
func (d *Listener) removeEndpoint() {
	err := d.wgIface.RemovePeer(d.peerCfg.PublicKey)
	if err == nil {
		return
	}
	log.Warnf("failed to remove peer listener: %v", err)
}
// createEndpoint registers the peer on wgIface with the listener's endpoint
// address, using the public key and allowed IPs from the peer config.
// It returns whatever error UpdatePeer reports.
// NOTE(review): the literal 0 and nil arguments are presumably the keepalive
// interval and pre-shared key — confirm against the WGIface definition.
func (d *Listener) createEndpoint() error {
	cfg := d.peerCfg
	return d.wgIface.UpdatePeer(cfg.PublicKey, cfg.AllowedIPs, 0, d.endpoint, nil)
}

View File

@@ -2,7 +2,6 @@ package listener
import (
"fmt"
"net"
"sync"
log "github.com/sirupsen/logrus"
@@ -16,10 +15,10 @@ type Manager struct {
wgIface lazyconn.WGIface
portGenerator *portAllocator
// todo peers add/remove is not thread safe because of the callback function
peers map[string]*Listener
wg sync.WaitGroup
done chan struct{}
peers map[string]*Listener
done chan struct{}
mu sync.Mutex
}
func NewManager(wgIface lazyconn.WGIface) *Manager {
@@ -34,83 +33,77 @@ func NewManager(wgIface lazyconn.WGIface) *Manager {
}
// CreatePeerListener sets up a listener for the given peer unless one is
// already registered under the peer's public key, in which case it is a
// no-op. The manager mutex is held for the whole call.
// It returns the error from createPeerlistener, or nil on success.
func (m *Manager) CreatePeerListener(peerCfg lazyconn.PeerConfig) error {
	m.mu.Lock()
	defer m.mu.Unlock()

	_, exists := m.peers[peerCfg.PublicKey]
	if exists {
		return nil
	}

	err := m.createPeerlistener(peerCfg)
	if err != nil {
		return err
	}

	log.Debugf("created lazy connection listener for: %s", peerCfg.PublicKey)
	return nil
}
// RemovePeer closes the listener registered for peerID, removes the peer's
// endpoint through removeEndpoint and drops the entry from the peers map.
// Unknown peer IDs are ignored. This variant takes no lock itself.
func (m *Manager) RemovePeer(peerID string) {
	if l, found := m.peers[peerID]; found {
		l.Close()
		m.removeEndpoint(peerID)
		delete(m.peers, peerID)
	}
}
// Close closes the done channel, closes and unregisters every listener, and
// then waits on the wait group before returning.
func (m *Manager) Close() {
	close(m.done)

	for id, l := range m.peers {
		l.Close()
		delete(m.peers, id)
	}

	m.wg.Wait()
}
func (m *Manager) createPeerlistener(peerCfg lazyconn.PeerConfig) error {
conn, addr, err := m.portGenerator.newConn()
if err != nil {
return fmt.Errorf("failed to bind lazy connection: %v", err)
}
listener := NewListener(peerCfg.PublicKey, conn)
if err := m.createEndpoint(peerCfg, addr); err != nil {
log.Errorf("failed to create endpoint for %s: %v", peerCfg.PublicKey, err)
listener.Close()
listener, err := NewListener(m.wgIface, peerCfg, conn, addr)
if err != nil {
return err
}
m.peers[peerCfg.PublicKey] = listener
log.Infof("created on-demand listener: %s, for peer: %s", addr.String(), peerCfg.PublicKey)
go m.waitForTraffic(listener, peerCfg.PublicKey)
m.wg.Add(1)
go func() {
defer m.wg.Done()
listener.ReadPackets(m.onTrigger)
}()
m.peers[peerCfg.PublicKey] = listener
log.Debugf("created lazy connection listener for: %s", peerCfg.PublicKey)
return nil
}
// todo: it is not thread safe, but it is ok if we protect from upper layer
func (m *Manager) onTrigger(peerID string) {
log.Debugf("peer started to send traffic, remove lazy endpoint: %s", peerID)
m.removeEndpoint(peerID)
// RemovePeer unregisters the listener stored for peerID and closes it, all
// while holding the manager mutex. Unknown peer IDs are ignored.
func (m *Manager) RemovePeer(peerID string) {
	m.mu.Lock()
	defer m.mu.Unlock()

	l, found := m.peers[peerID]
	if !found {
		return
	}

	// Drop the map entry before closing the listener.
	delete(m.peers, peerID)
	l.Close()
}
// Close closes the done channel and then closes and unregisters every
// listener. The manager mutex is held for the whole call.
func (m *Manager) Close() {
	m.mu.Lock()
	defer m.mu.Unlock()

	close(m.done)

	for id, l := range m.peers {
		l.Close()
		delete(m.peers, id)
	}
}
// waitForTraffic blocks until the listener's read loop returns. If the peer is
// still registered at that point, it unregisters the peer and reports peerID
// through notify. If the peer is already gone (RemovePeer and Close delete the
// entry before the read loop ends), nothing is reported.
//
// The mutex is released on every path. The previous early return kept m.mu
// locked, which blocked every later call that needs the manager lock.
func (m *Manager) waitForTraffic(listener *Listener, peerID string) {
	listener.ReadPackets()

	m.mu.Lock()
	_, registered := m.peers[peerID]
	if registered {
		delete(m.peers, peerID)
	}
	m.mu.Unlock()

	if !registered {
		return
	}

	// notify may block on the channel send, so it is called without the lock.
	m.notify(peerID)
}
// todo: this causes an issue in the following scenario:
// - peerID is sent to TrafficStartChan
// - the upper layer has not read the event yet
// - RemovePeer(peerID) is called;
//   at this moment we expect to never receive the event for peerID
// - the upper layer reads from TrafficStartChan and the event is still there
// notify logs that the peer started sending traffic and then blocks until
// either peerID has been sent on TrafficStartChan or the done channel is
// closed, whichever becomes ready first.
func (m *Manager) notify(peerID string) {
	log.Debugf("peer started to send traffic, remove lazy endpoint: %s", peerID)

	select {
	case m.TrafficStartChan <- peerID:
	case <-m.done:
	}
}
// removeEndpoint asks wgIface to remove the peer identified by peerID.
// A failure is logged as a warning and not returned to the caller.
func (m *Manager) removeEndpoint(peerID string) {
	err := m.wgIface.RemovePeer(peerID)
	if err == nil {
		return
	}
	log.Warnf("failed to remove peer listener: %v", err)
}
// createEndpoint registers the peer on wgIface with the given endpoint
// address, using the public key and allowed IPs from peerCfg.
// It returns whatever error UpdatePeer reports.
// NOTE(review): the literal 0 and nil arguments are presumably the keepalive
// interval and pre-shared key — confirm against the WGIface definition.
func (m *Manager) createEndpoint(peerCfg lazyconn.PeerConfig, endpoint *net.UDPAddr) error {
	pubKey, allowedIPs := peerCfg.PublicKey, peerCfg.AllowedIPs
	return m.wgIface.UpdatePeer(pubKey, allowedIPs, 0, endpoint, nil)
}