Further adjust structure to include peer monitor

Former-commit-id: 5a2918b2a4
This commit is contained in:
Owen
2025-12-01 21:55:11 -05:00
parent 45ef6e5279
commit 51162d6be6
4 changed files with 53 additions and 47 deletions

View File

@@ -407,28 +407,7 @@ func StartTunnel(config TunnelConfig) {
LocalIP: interfaceIP,
SharedBind: sharedBind,
WSClient: wsClientForMonitor,
StatusCallback: func(siteID int, connected bool, rtt time.Duration) {
// Find the site config to get endpoint information
var endpoint string
var isRelay bool
for _, site := range wgData.Sites {
if site.SiteId == siteID {
if site.RelayEndpoint != "" {
endpoint = site.RelayEndpoint
} else {
endpoint = site.Endpoint
}
isRelay = site.RelayEndpoint != ""
break
}
}
apiServer.UpdatePeerStatus(siteID, connected, rtt, endpoint, isRelay)
if connected {
logger.Info("Peer %d is now connected (RTT: %v)", siteID, rtt)
} else {
logger.Warn("Peer %d is disconnected", siteID)
}
},
APIServer: apiServer,
})
for i := range wgData.Sites {
@@ -450,14 +429,12 @@ func StartTunnel(config TunnelConfig) {
logger.Info("Configured peer %s", site.PublicKey)
}
peerManager.SetHolepunchStatusCallback(func(siteID int, endpoint string, connected bool, rtt time.Duration) {
// This callback is for additional handling if needed
// The PeerMonitor already logs status changes
logger.Info("+++++++++++++++++++++++++ holepunch monitor callback for site %d, endpoint %s, connected: %v, rtt: %v", siteID, endpoint, connected, rtt)
})
peerManager.Start()
if err := dnsProxy.Start(); err != nil { // start DNS proxy first so there is no downtime
logger.Error("Failed to start DNS proxy: %v", err)
}
if config.OverrideDNS {
// Set up DNS override to use our DNS proxy
if err := dnsOverride.SetupDNSOverride(interfaceName, dnsProxy); err != nil {
@@ -466,10 +443,6 @@ func StartTunnel(config TunnelConfig) {
}
}
if err := dnsProxy.Start(); err != nil {
logger.Error("Failed to start DNS proxy: %v", err)
}
apiServer.SetRegistered(true)
connected = true

View File

@@ -11,6 +11,7 @@ import (
"github.com/fosrl/newt/bind"
"github.com/fosrl/newt/logger"
"github.com/fosrl/newt/network"
"github.com/fosrl/olm/api"
olmDevice "github.com/fosrl/olm/device"
"github.com/fosrl/olm/dns"
"github.com/fosrl/olm/peers/monitor"
@@ -19,9 +20,6 @@ import (
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
)
// PeerStatusCallback is called when a peer's connection status changes
type PeerStatusCallback func(siteID int, connected bool, rtt time.Duration)
// HolepunchStatusCallback is called when holepunch connection status changes
// This is an alias for monitor.HolepunchStatusCallback
type HolepunchStatusCallback = monitor.HolepunchStatusCallback
@@ -37,9 +35,8 @@ type PeerManagerConfig struct {
LocalIP string
SharedBind *bind.SharedBind
// WSClient is optional - if nil, relay messages won't be sent
WSClient *websocket.Client
// StatusCallback is called when peer connection status changes
StatusCallback PeerStatusCallback
WSClient *websocket.Client
APIServer *api.API
}
type PeerManager struct {
@@ -56,8 +53,7 @@ type PeerManager struct {
// allowedIPClaims tracks all peers that claim each allowed IP
// key is the CIDR string, value is a set of siteIds that want this IP
allowedIPClaims map[string]map[int]bool
// statusCallback is called when peer connection status changes
statusCallback PeerStatusCallback
APIServer *api.API
}
// NewPeerManager creates a new PeerManager with an internal PeerMonitor
@@ -70,15 +66,37 @@ func NewPeerManager(config PeerManagerConfig) *PeerManager {
privateKey: config.PrivateKey,
allowedIPOwners: make(map[string]int),
allowedIPClaims: make(map[string]map[int]bool),
statusCallback: config.StatusCallback,
APIServer: config.APIServer,
}
// Create the peer monitor
pm.peerMonitor = monitor.NewPeerMonitor(
func(siteID int, connected bool, rtt time.Duration) {
// Call the external status callback if set
if pm.statusCallback != nil {
pm.statusCallback(siteID, connected, rtt)
// Update API status directly
if pm.APIServer != nil {
// Find the peer config to get endpoint information
pm.mu.RLock()
peer, exists := pm.peers[siteID]
pm.mu.RUnlock()
var endpoint string
var isRelay bool
if exists {
if peer.RelayEndpoint != "" {
endpoint = peer.RelayEndpoint
isRelay = true
} else {
endpoint = peer.Endpoint
isRelay = false
}
}
pm.APIServer.UpdatePeerStatus(siteID, connected, rtt, endpoint, isRelay)
}
if connected {
logger.Info("Peer %d is now connected (RTT: %v)", siteID, rtt)
} else {
logger.Warn("Peer %d is disconnected", siteID)
}
},
config.WSClient,
@@ -154,7 +172,7 @@ func (pm *PeerManager) AddPeer(siteConfig SiteConfig, endpoint string) error {
monitorAddress := strings.Split(siteConfig.ServerIP, "/")[0]
monitorPeer := net.JoinHostPort(monitorAddress, strconv.Itoa(int(siteConfig.ServerPort+1))) // +1 for the monitor port
err := pm.peerMonitor.AddPeer(siteConfig.SiteId, monitorPeer)
err := pm.peerMonitor.AddPeer(siteConfig.SiteId, monitorPeer, siteConfig.Endpoint) // always use the real site endpoint for hole punch monitoring
if err != nil {
logger.Warn("Failed to setup monitoring for site %d: %v", siteConfig.SiteId, err)
} else {
@@ -371,6 +389,8 @@ func (pm *PeerManager) UpdatePeer(siteConfig SiteConfig, endpoint string) error
pm.dnsProxy.AddDNSRecord(alias.Alias, address)
}
pm.peerMonitor.UpdateHolepunchEndpoint(siteConfig.SiteId, siteConfig.Endpoint)
pm.peers[siteConfig.SiteId] = siteConfig
return nil
}

View File

@@ -138,7 +138,7 @@ func (pm *PeerMonitor) SetMaxAttempts(attempts int) {
}
// AddPeer adds a new peer to monitor
func (pm *PeerMonitor) AddPeer(siteID int, endpoint string) error {
func (pm *PeerMonitor) AddPeer(siteID int, endpoint string, holepunchEndpoint string) error {
pm.mutex.Lock()
defer pm.mutex.Unlock()
@@ -157,7 +157,8 @@ func (pm *PeerMonitor) AddPeer(siteID int, endpoint string) error {
client.SetMaxAttempts(pm.maxAttempts)
pm.monitors[siteID] = client
pm.holepunchEndpoints[siteID] = endpoint
pm.holepunchEndpoints[siteID] = holepunchEndpoint
pm.holepunchStatus[siteID] = false // Initially unknown/disconnected
if pm.running {
@@ -171,6 +172,14 @@ func (pm *PeerMonitor) AddPeer(siteID int, endpoint string) error {
return nil
}
// update holepunch endpoint for a peer
func (pm *PeerMonitor) UpdateHolepunchEndpoint(siteID int, endpoint string) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
pm.holepunchEndpoints[siteID] = endpoint
}
// removePeerUnlocked stops monitoring a peer and removes it from the monitor
// This function assumes the mutex is already held by the caller
func (pm *PeerMonitor) removePeerUnlocked(siteID int) {
@@ -189,6 +198,10 @@ func (pm *PeerMonitor) RemovePeer(siteID int) {
pm.mutex.Lock()
defer pm.mutex.Unlock()
// remove the holepunch endpoint info
delete(pm.holepunchEndpoints, siteID)
delete(pm.holepunchStatus, siteID)
pm.removePeerUnlocked(siteID)
}