introduce ConnPriorityStore

This commit is contained in:
Maycon Santos
2025-07-20 23:09:42 +02:00
parent cc1c77f6dc
commit cc78a3c65f
2 changed files with 29 additions and 16 deletions

View File

@@ -99,7 +99,7 @@ type Conn struct {
statusRelay *worker.AtomicWorkerStatus statusRelay *worker.AtomicWorkerStatus
statusICE *worker.AtomicWorkerStatus statusICE *worker.AtomicWorkerStatus
currentConnPriority conntype.ConnPriority currentConnPriority conntype.ConnPriorityStore
opened bool // this flag is used to prevent close in case of not opened connection opened bool // this flag is used to prevent close in case of not opened connection
workerICE *WorkerICE workerICE *WorkerICE
@@ -283,7 +283,7 @@ func (conn *Conn) Close(signalToRemote bool) {
// doesn't block, discards the message if connection wasn't ready // doesn't block, discards the message if connection wasn't ready
func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) bool { func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) bool {
conn.dumpState.RemoteAnswer() conn.dumpState.RemoteAnswer()
conn.Log.Infof("OnRemoteAnswer, priority: %s, status ICE: %s, status relay: %s", conn.currentConnPriority, conn.statusICE, conn.statusRelay) conn.Log.Infof("OnRemoteAnswer, priority: %s, status ICE: %s, status relay: %s", conn.currentConnPriority.Get(), conn.statusICE, conn.statusRelay)
return conn.handshaker.OnRemoteAnswer(answer) return conn.handshaker.OnRemoteAnswer(answer)
} }
@@ -353,8 +353,8 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
// this never should happen, because Relay is the lower priority and ICE always close the deprecated connection before upgrade // this never should happen, because Relay is the lower priority and ICE always close the deprecated connection before upgrade
// todo consider to remove this check // todo consider to remove this check
if conn.currentConnPriority > priority { if conn.currentConnPriority.Get() > priority {
conn.Log.Infof("current connection priority (%s) is higher than the new one (%s), do not upgrade connection", conn.currentConnPriority, priority) conn.Log.Infof("current connection priority (%s) is higher than the new one (%s), do not upgrade connection", conn.currentConnPriority.Get(), priority)
conn.statusICE.SetConnected() conn.statusICE.SetConnected()
conn.updateIceState(iceConnInfo) conn.updateIceState(iceConnInfo)
return return
@@ -408,7 +408,7 @@ func (conn *Conn) onICEConnectionIsReady(priority conntype.ConnPriority, iceConn
} }
wgConfigWorkaround() wgConfigWorkaround()
conn.currentConnPriority = priority conn.currentConnPriority.Set(priority)
conn.statusICE.SetConnected() conn.statusICE.SetConnected()
conn.updateIceState(iceConnInfo) conn.updateIceState(iceConnInfo)
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr) conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
@@ -445,10 +445,10 @@ func (conn *Conn) onICEStateDisconnected() {
defer conn.wgWatcherWg.Done() defer conn.wgWatcherWg.Done()
conn.workerRelay.EnableWgWatcher(conn.ctx) conn.workerRelay.EnableWgWatcher(conn.ctx)
}() }()
conn.currentConnPriority = conntype.Relay conn.currentConnPriority.Set(conntype.Relay)
} else { } else {
conn.Log.Infof("ICE disconnected, do not switch to Relay. Reset priority to: %s", conntype.None.String()) conn.Log.Infof("ICE disconnected, do not switch to Relay. Reset priority to: %s", conntype.None.String())
conn.currentConnPriority = conntype.None conn.currentConnPriority.Set(conntype.None)
} }
changed := conn.statusICE.Get() != worker.StatusDisconnected changed := conn.statusICE.Get() != worker.StatusDisconnected
@@ -496,7 +496,7 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
conn.Log.Infof("created new wgProxy for relay connection: %s", wgProxy.EndpointAddr().String()) conn.Log.Infof("created new wgProxy for relay connection: %s", wgProxy.EndpointAddr().String())
if conn.isICEActive() { if conn.isICEActive() {
conn.Log.Debugf("do not switch to relay because current priority is: %s", conn.currentConnPriority.String()) conn.Log.Debugf("do not switch to relay because current priority is: %s", conn.currentConnPriority.Get().String())
conn.setRelayedProxy(wgProxy) conn.setRelayedProxy(wgProxy)
conn.statusRelay.SetConnected() conn.statusRelay.SetConnected()
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey) conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
@@ -524,7 +524,7 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
wgConfigWorkaround() wgConfigWorkaround()
conn.rosenpassRemoteKey = rci.rosenpassPubKey conn.rosenpassRemoteKey = rci.rosenpassPubKey
conn.currentConnPriority = conntype.Relay conn.currentConnPriority.Set(conntype.Relay)
conn.statusRelay.SetConnected() conn.statusRelay.SetConnected()
conn.setRelayedProxy(wgProxy) conn.setRelayedProxy(wgProxy)
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey) conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
@@ -542,9 +542,9 @@ func (conn *Conn) onRelayDisconnected() {
conn.Log.Debugf("relay connection is disconnected") conn.Log.Debugf("relay connection is disconnected")
if conn.currentConnPriority == conntype.Relay { if conn.currentConnPriority.Get() == conntype.Relay {
conn.Log.Debugf("clean up WireGuard config") conn.Log.Debugf("clean up WireGuard config")
conn.currentConnPriority = conntype.None conn.currentConnPriority.Set(conntype.None)
} }
if conn.wgProxyRelay != nil { if conn.wgProxyRelay != nil {
@@ -626,7 +626,7 @@ func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo) {
func (conn *Conn) setStatusToDisconnected() { func (conn *Conn) setStatusToDisconnected() {
conn.statusRelay.SetDisconnected() conn.statusRelay.SetDisconnected()
conn.statusICE.SetDisconnected() conn.statusICE.SetDisconnected()
conn.currentConnPriority = conntype.None conn.currentConnPriority.Set(conntype.None)
peerState := State{ peerState := State{
PubKey: conn.config.Key, PubKey: conn.config.Key,
@@ -669,7 +669,7 @@ func (conn *Conn) waitInitialRandomSleepTime(ctx context.Context) {
} }
func (conn *Conn) isRelayed() bool { func (conn *Conn) isRelayed() bool {
switch conn.currentConnPriority { switch conn.currentConnPriority.Get() {
case conntype.Relay, conntype.ICETurn: case conntype.Relay, conntype.ICETurn:
return true return true
default: default:
@@ -753,11 +753,11 @@ func (conn *Conn) newProxy(remoteConn net.Conn) (wgproxy.Proxy, error) {
} }
func (conn *Conn) isReadyToUpgrade() bool { func (conn *Conn) isReadyToUpgrade() bool {
return conn.wgProxyRelay != nil && conn.currentConnPriority != conntype.Relay return conn.wgProxyRelay != nil && conn.currentConnPriority.Get() != conntype.Relay
} }
func (conn *Conn) isICEActive() bool { func (conn *Conn) isICEActive() bool {
return (conn.currentConnPriority == conntype.ICEP2P || conn.currentConnPriority == conntype.ICETurn) && conn.statusICE.Get() == worker.StatusConnected return (conn.currentConnPriority.Get() == conntype.ICEP2P || conn.currentConnPriority.Get() == conntype.ICETurn) && conn.statusICE.Get() == worker.StatusConnected
} }
func (conn *Conn) removeWgPeer() error { func (conn *Conn) removeWgPeer() error {

View File

@@ -2,6 +2,7 @@ package conntype
import ( import (
"fmt" "fmt"
"sync/atomic"
) )
const ( const (
@@ -11,7 +12,7 @@ const (
ICEP2P ConnPriority = 3 ICEP2P ConnPriority = 3
) )
type ConnPriority int type ConnPriority int32
func (cp ConnPriority) String() string { func (cp ConnPriority) String() string {
switch cp { switch cp {
@@ -27,3 +28,15 @@ func (cp ConnPriority) String() string {
return fmt.Sprintf("ConnPriority(%d)", cp) return fmt.Sprintf("ConnPriority(%d)", cp)
} }
} }
type ConnPriorityStore struct {
store atomic.Int32
}
func (cps *ConnPriorityStore) Get() ConnPriority {
return ConnPriority(cps.store.Load())
}
func (cps *ConnPriorityStore) Set(cp ConnPriority) {
cps.store.Store(int32(cp))
}