Refactor names ang logging

This commit is contained in:
Zoltán Papp
2025-03-13 16:02:36 +01:00
parent f50b5d3ec2
commit 231a481c0f
4 changed files with 52 additions and 50 deletions

View File

@@ -112,7 +112,7 @@ func (e *ConnMgr) OnSignalMsg(peerKey string) (*peer.Conn, bool) {
return conn, true
}
if found := e.lazyConnMgr.RunIdleWatch(peerKey); found {
if found := e.lazyConnMgr.RunInactivityMonitor(peerKey); found {
conn.Open()
}
return conn, true
@@ -150,12 +150,12 @@ func (e *ConnMgr) receiveLazyEvents(ctx context.Context) {
select {
case <-ctx.Done():
return
case peerID := <-e.lazyConnMgr.OnDemand:
case peerID := <-e.lazyConnMgr.OnActive:
e.peerStore.PeerConnOpen(peerID)
case peerID := <-e.lazyConnMgr.Idle:
// todo consider to use engine lock
e.peerStore.PeerConnClose(peerID)
e.lazyConnMgr.RunOnDemandListener(peerID)
e.lazyConnMgr.RunActivityMonitor(peerID)
}
}
}

View File

@@ -43,13 +43,13 @@ func (m *Manager) MonitorPeerActivity(peerCfg lazyconn.PeerConfig) error {
defer m.mu.Unlock()
if _, ok := m.peers[peerCfg.PublicKey]; ok {
log.Warnf("on-demand listener already exists for: %s", peerCfg.PublicKey)
log.Warnf("activity listener already exists for: %s", peerCfg.PublicKey)
return nil
}
conn, addr, err := m.portGenerator.newConn()
if err != nil {
return fmt.Errorf("failed to bind lazy connection: %v", err)
return fmt.Errorf("failed to bind activity listener: %v", err)
}
listener, err := NewListener(m.wgIface, peerCfg, conn, addr)
@@ -58,10 +58,9 @@ func (m *Manager) MonitorPeerActivity(peerCfg lazyconn.PeerConfig) error {
}
m.peers[peerCfg.PublicKey] = listener
log.Infof("created on-demand listener: %s, for peer: %s", addr.String(), peerCfg.PublicKey)
go m.waitForTraffic(listener, peerCfg.PublicKey, peerCfg.PeerConnID)
log.Debugf("created lazy connection listener for: %s", peerCfg.PublicKey)
peerCfg.Log.Infof("created activity listener: %s", addr.String())
return nil
}
@@ -104,7 +103,7 @@ func (m *Manager) waitForTraffic(listener *Listener, peerID string, peerConnID p
}
func (m *Manager) notify(event OnAcitvityEvent) {
log.Debugf("peer started to send traffic: %s", event.PeerID)
log.Debugf("peer activity detected: %s", event.PeerID)
select {
case <-m.done:
case m.OnActivityChan <- event:

View File

@@ -37,7 +37,6 @@ func (i *InactivityMonitor) Start(ctx context.Context, timeoutChan chan string)
case <-i.timer.C:
select {
case timeoutChan <- i.peerID:
log.Infof("--- idle timeout for peer: %s", i.peerID)
case <-ctx.Done():
return
}
@@ -52,11 +51,9 @@ func (i *InactivityMonitor) Stop() {
}
func (i *InactivityMonitor) PauseTimer() {
log.Info("--- hangup idle timer")
i.timer.Stop()
}
func (i *InactivityMonitor) ResetTimer() {
log.Info("--- resetting idle timer")
i.timer.Reset(inactivityThreshold)
}

View File

@@ -15,32 +15,34 @@ import (
// Manager manages lazy connections
// This is not a thread safe implementation, do not call exported functions concurrently
type Manager struct {
OnDemand chan string
OnActive chan string
Idle chan string
connStateDispatcher *peer.ConnectionDispatcher
managedPeers map[string]lazyconn.PeerConfig
activityManager *activity.Manager
inactivityMonitors map[string]*inactivity.InactivityMonitor
managedPeers map[string]*lazyconn.PeerConfig
managedByConnID map[string]*lazyconn.PeerConfig
activityManager *activity.Manager
inactivityMonitors map[string]*inactivity.InactivityMonitor
excludes map[string]struct{}
managedPeersMu sync.Mutex
cancel context.CancelFunc
connStateListener *peer.ConnectionListener
onIdle chan string
onInactive chan string
}
func NewManager(wgIface lazyconn.WGIface, connStateDispatcher *peer.ConnectionDispatcher) *Manager {
m := &Manager{
OnDemand: make(chan string, 1),
OnActive: make(chan string, 1),
Idle: make(chan string, 1),
connStateDispatcher: connStateDispatcher,
managedPeers: make(map[string]lazyconn.PeerConfig),
managedPeers: make(map[string]*lazyconn.PeerConfig),
activityManager: activity.NewManager(wgIface),
inactivityMonitors: make(map[string]*inactivity.InactivityMonitor),
excludes: make(map[string]struct{}),
onIdle: make(chan string),
onInactive: make(chan string),
}
m.connStateListener = &peer.ConnectionListener{
@@ -53,30 +55,30 @@ func NewManager(wgIface lazyconn.WGIface, connStateDispatcher *peer.ConnectionDi
return m
}
func (m *Manager) AddPeer(peer lazyconn.PeerConfig) (bool, error) {
func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
peer.Log.Debugf("adding peer to lazy connection manager")
peerCfg.Log.Debugf("adding peer to lazy connection manager")
_, exists := m.excludes[peer.PublicKey]
_, exists := m.excludes[peerCfg.PublicKey]
if exists {
return true, nil
}
if _, ok := m.managedPeers[peer.PublicKey]; ok {
peer.Log.Warnf("peer already managed")
if _, ok := m.managedPeers[peerCfg.PublicKey]; ok {
peerCfg.Log.Warnf("peer already managed")
return false, nil
}
if err := m.activityManager.MonitorPeerActivity(peer); err != nil {
if err := m.activityManager.MonitorPeerActivity(peerCfg); err != nil {
return false, err
}
iw := inactivity.NewInactivityMonitor(peer.PublicKey)
m.inactivityMonitors[peer.PublicKey] = iw
iw := inactivity.NewInactivityMonitor(peerCfg.PublicKey)
m.inactivityMonitors[peerCfg.PublicKey] = iw
m.managedPeers[peer.PublicKey] = peer
m.managedPeers[peerCfg.PublicKey] = &peerCfg
return false, nil
}
@@ -91,18 +93,18 @@ func (m *Manager) RemovePeer(peerID string) {
cfg.Log.Infof("removing lazy peer")
if idleWatch, ok := m.inactivityMonitors[peerID]; ok {
idleWatch.Stop()
if im, ok := m.inactivityMonitors[peerID]; ok {
im.Stop()
delete(m.inactivityMonitors, peerID)
cfg.Log.Debugf("idle watch stopped")
cfg.Log.Debugf("inactivity monitor stopped")
}
m.activityManager.RemovePeer(peerID)
delete(m.managedPeers, peerID)
cfg.Log.Debugf("on-demand listener removed")
cfg.Log.Debugf("activity listener removed")
}
func (m *Manager) RunIdleWatch(peerID string) (found bool) {
func (m *Manager) RunInactivityMonitor(peerID string) (found bool) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
@@ -115,13 +117,13 @@ func (m *Manager) RunIdleWatch(peerID string) (found bool) {
return false
}
m.inactivityMonitors[peerID].PauseTimer()
m.inactivityMonitors[peerID].ResetTimer()
cfg.Log.Infof("stoped on-demand listener and idle watcher")
cfg.Log.Infof("stoped activity monitor and reset inactivity monitor")
return true
}
func (m *Manager) RunOnDemandListener(peerID string) {
func (m *Manager) RunActivityMonitor(peerID string) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
@@ -130,13 +132,13 @@ func (m *Manager) RunOnDemandListener(peerID string) {
return
}
cfg.Log.Infof("run on-demand listener")
cfg.Log.Infof("start activity monitor")
// just in case free up
m.inactivityMonitors[peerID].PauseTimer()
if err := m.activityManager.MonitorPeerActivity(cfg); err != nil {
cfg.Log.Errorf("failed to create on-demand listener: %v", err)
if err := m.activityManager.MonitorPeerActivity(*cfg); err != nil {
cfg.Log.Errorf("failed to create activity monitor: %v", err)
return
}
}
@@ -162,7 +164,7 @@ func (m *Manager) Close() {
iw.Stop()
}
m.inactivityMonitors = make(map[string]*inactivity.InactivityMonitor)
m.managedPeers = make(map[string]lazyconn.PeerConfig)
m.managedPeers = make(map[string]*lazyconn.PeerConfig)
log.Infof("lazy connection manager closed")
}
@@ -173,14 +175,14 @@ func (m *Manager) Start(ctx context.Context) {
case <-ctx.Done():
return
case e := <-m.activityManager.OnActivityChan:
m.onPeerDemand(ctx, e)
case peerID := <-m.onIdle:
m.onPeerIdleTimeout(ctx, peerID)
m.onPeerActivity(ctx, e)
case peerID := <-m.onInactive:
m.onPeerInactivityTimedOut(ctx, peerID)
}
}
}
func (m *Manager) onPeerDemand(ctx context.Context, e activity.OnAcitvityEvent) {
func (m *Manager) onPeerActivity(ctx context.Context, e activity.OnAcitvityEvent) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
@@ -196,17 +198,17 @@ func (m *Manager) onPeerDemand(ctx context.Context, e activity.OnAcitvityEvent)
return
}
pcfg.Log.Infof("starting idle watcher")
pcfg.Log.Infof("starting inactivity monitor")
go m.inactivityMonitors[e.PeerID].Start(ctx, m.Idle)
select {
case m.OnDemand <- e.PeerID:
case m.OnActive <- e.PeerID:
case <-ctx.Done():
return
}
}
func (m *Manager) onPeerIdleTimeout(ctx context.Context, peerID string) {
func (m *Manager) onPeerInactivityTimedOut(ctx context.Context, peerID string) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
@@ -239,16 +241,18 @@ func (m *Manager) onPeerConnected(conn *peer.Conn) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
if _, ok := m.excludes[conn.GetKey()]; ok {
peerCfg, ok := m.managedPeers[conn.GetKey()]
if !ok {
return
}
iw, ok := m.inactivityMonitors[conn.GetKey()]
if !ok {
conn.Log.Errorf("idle watch not found for peer")
conn.Log.Errorf("inactivity monitor not found for peer")
return
}
peerCfg.Log.Infof("pause inactivity monitor")
iw.PauseTimer()
}
@@ -256,7 +260,8 @@ func (m *Manager) onPeerDisconnected(conn *peer.Conn) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
if _, ok := m.excludes[conn.GetKey()]; ok {
peerCfg, ok := m.managedPeers[conn.GetKey()]
if !ok {
return
}
@@ -265,5 +270,6 @@ func (m *Manager) onPeerDisconnected(conn *peer.Conn) {
return
}
peerCfg.Log.Infof("reset inactivity monitor timer")
iw.ResetTimer()
}