mirror of
https://github.com/netbirdio/netbird.git
synced 2026-06-30 19:59:56 +00:00
Compare commits
2 Commits
0.74.0-bra
...
refactor/r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
50a29c07ce | ||
|
|
7d8e20030b |
@@ -54,19 +54,15 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
||||
w.relaySupportedOnRemotePeer.Store(true)
|
||||
|
||||
// the relayManager will return with error in case if the connection has lost with relay server
|
||||
currentRelayAddress, _, err := w.relayManager.RelayInstanceAddress()
|
||||
_, _, err := w.relayManager.RelayInstanceAddress()
|
||||
if err != nil {
|
||||
w.log.Errorf("failed to handle new offer: %s", err)
|
||||
return
|
||||
}
|
||||
|
||||
srv := w.preferredRelayServer(currentRelayAddress, remoteOfferAnswer.RelaySrvAddress)
|
||||
var serverIP netip.Addr
|
||||
if srv == remoteOfferAnswer.RelaySrvAddress {
|
||||
serverIP = remoteOfferAnswer.RelaySrvIP
|
||||
}
|
||||
|
||||
relayedConn, err := w.relayManager.OpenConn(w.peerCtx, srv, w.config.Key, serverIP)
|
||||
preferForeign := !w.isController
|
||||
remoteRelayServer := relayClient.RelayServer{Addr: remoteOfferAnswer.RelaySrvAddress, IP: remoteOfferAnswer.RelaySrvIP}
|
||||
relayedConn, err := w.relayManager.OpenConn(w.peerCtx, remoteRelayServer, w.config.Key, preferForeign)
|
||||
if err != nil {
|
||||
if errors.Is(err, relayClient.ErrConnAlreadyExists) {
|
||||
w.log.Debugf("handled offer by reusing existing relay connection")
|
||||
@@ -80,14 +76,13 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
||||
w.relayedConn = relayedConn
|
||||
w.relayLock.Unlock()
|
||||
|
||||
err = w.relayManager.AddCloseListener(srv, w.onRelayClientDisconnected)
|
||||
if err != nil {
|
||||
log.Errorf("failed to add close listener: %s", err)
|
||||
if err := w.relayManager.AddCloseListener(relayedConn.RemoteAddr().String(), w.onRelayClientDisconnected); err != nil {
|
||||
w.log.Errorf("failed to add close listener: %s", err)
|
||||
_ = relayedConn.Close()
|
||||
return
|
||||
}
|
||||
|
||||
w.log.Debugf("peer conn opened via Relay: %s", srv)
|
||||
w.log.Debugf("peer conn opened via Relay: %s", relayedConn.RemoteAddr())
|
||||
go w.conn.onRelayConnectionIsReady(RelayConnInfo{
|
||||
relayedConn: relayedConn,
|
||||
rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
|
||||
@@ -126,13 +121,6 @@ func (w *WorkerRelay) isRelaySupported(answer *OfferAnswer) bool {
|
||||
return answer.RelaySrvAddress != ""
|
||||
}
|
||||
|
||||
func (w *WorkerRelay) preferredRelayServer(myRelayAddress, remoteRelayAddress string) string {
|
||||
if w.isController {
|
||||
return myRelayAddress
|
||||
}
|
||||
return remoteRelayAddress
|
||||
}
|
||||
|
||||
func (w *WorkerRelay) onRelayClientDisconnected() {
|
||||
go w.conn.onRelayDisconnected()
|
||||
}
|
||||
|
||||
158
shared/relay/client/fallback.go
Normal file
158
shared/relay/client/fallback.go
Normal file
@@ -0,0 +1,158 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
raceTotalTimeout = 60 * time.Second
|
||||
raceFallbackDelay = 10 * time.Second
|
||||
)
|
||||
|
||||
type raceAttempt struct {
|
||||
conn net.Conn
|
||||
err error
|
||||
}
|
||||
|
||||
type connRace struct {
|
||||
racer *ConnRacer
|
||||
peerKey string
|
||||
remoteRelayServer RelayServer
|
||||
preferForeign bool
|
||||
|
||||
raceCtx context.Context
|
||||
otherCtx context.Context
|
||||
cancelPreferred context.CancelFunc
|
||||
cancelOther context.CancelFunc
|
||||
results chan raceAttempt
|
||||
fallbackTimer *time.Timer
|
||||
|
||||
otherStarted bool
|
||||
settled int
|
||||
lastErr error
|
||||
}
|
||||
|
||||
type ConnRacer struct {
|
||||
home *Client
|
||||
foreignStore *ForeignRelaysStore
|
||||
}
|
||||
|
||||
func NewConnRacer(home *Client, foreignStore *ForeignRelaysStore) *ConnRacer {
|
||||
return &ConnRacer{
|
||||
home: home,
|
||||
foreignStore: foreignStore,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ConnRacer) Run(ctx context.Context, peerKey string, remoteRelayServer RelayServer, preferForeign bool) (net.Conn, error) {
|
||||
raceCtx, cancel := context.WithTimeout(ctx, raceTotalTimeout)
|
||||
defer cancel()
|
||||
|
||||
preferredCtx, cancelPreferred := context.WithCancel(raceCtx)
|
||||
otherCtx, cancelOther := context.WithCancel(raceCtx)
|
||||
|
||||
race := &connRace{
|
||||
racer: r,
|
||||
peerKey: peerKey,
|
||||
remoteRelayServer: remoteRelayServer,
|
||||
preferForeign: preferForeign,
|
||||
raceCtx: raceCtx,
|
||||
otherCtx: otherCtx,
|
||||
cancelPreferred: cancelPreferred,
|
||||
cancelOther: cancelOther,
|
||||
results: make(chan raceAttempt, 2),
|
||||
fallbackTimer: time.NewTimer(raceFallbackDelay),
|
||||
}
|
||||
defer race.fallbackTimer.Stop()
|
||||
|
||||
go func() {
|
||||
race.results <- r.open(preferredCtx, peerKey, remoteRelayServer, preferForeign)
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-race.fallbackTimer.C:
|
||||
race.startOther()
|
||||
case res := <-race.results:
|
||||
if conn, err, done := race.handleResult(res); done {
|
||||
return conn, err
|
||||
}
|
||||
case <-raceCtx.Done():
|
||||
return race.onTimeout()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *connRace) startOther() {
|
||||
if c.otherStarted {
|
||||
return
|
||||
}
|
||||
c.otherStarted = true
|
||||
c.fallbackTimer.Stop()
|
||||
go func() {
|
||||
c.results <- c.racer.open(c.otherCtx, c.peerKey, c.remoteRelayServer, !c.preferForeign)
|
||||
}()
|
||||
}
|
||||
|
||||
func (c *connRace) handleResult(res raceAttempt) (net.Conn, error, bool) {
|
||||
if (res.err == nil && res.conn != nil) || errors.Is(res.err, ErrConnAlreadyExists) {
|
||||
c.stop()
|
||||
return res.conn, res.err, true
|
||||
}
|
||||
|
||||
c.lastErr = res.err
|
||||
c.settled++
|
||||
if !c.otherStarted {
|
||||
c.startOther()
|
||||
return nil, nil, false
|
||||
}
|
||||
if c.settled == 2 {
|
||||
c.cancelPreferred()
|
||||
c.cancelOther()
|
||||
return nil, c.lastErr, true
|
||||
}
|
||||
return nil, nil, false
|
||||
}
|
||||
|
||||
func (c *connRace) onTimeout() (net.Conn, error) {
|
||||
c.stop()
|
||||
if c.lastErr != nil {
|
||||
return nil, c.lastErr
|
||||
}
|
||||
return nil, c.raceCtx.Err()
|
||||
}
|
||||
|
||||
func (c *connRace) stop() {
|
||||
c.cancelPreferred()
|
||||
c.cancelOther()
|
||||
go c.racer.drainLoser(c.results, c.settled, c.otherStarted)
|
||||
}
|
||||
|
||||
func (r *ConnRacer) open(ctx context.Context, peerKey string, remoteRelayServer RelayServer, foreign bool) raceAttempt {
|
||||
if foreign {
|
||||
conn, err := r.foreignStore.OpenConn(ctx, peerKey, remoteRelayServer)
|
||||
return raceAttempt{conn: conn, err: err}
|
||||
}
|
||||
conn, err := r.home.OpenConn(ctx, peerKey)
|
||||
return raceAttempt{conn: conn, err: err}
|
||||
}
|
||||
|
||||
func (r *ConnRacer) drainLoser(results chan raceAttempt, settled int, otherStarted bool) {
|
||||
started := 1
|
||||
if otherStarted {
|
||||
started = 2
|
||||
}
|
||||
for i := settled; i < started; i++ {
|
||||
res := <-results
|
||||
if res.conn != nil {
|
||||
if err := res.conn.Close(); err != nil {
|
||||
log.Debugf("failed to close losing relay connection: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
155
shared/relay/client/foreign_relays.go
Normal file
155
shared/relay/client/foreign_relays.go
Normal file
@@ -0,0 +1,155 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.org/x/sync/singleflight"
|
||||
|
||||
relayAuth "github.com/netbirdio/netbird/shared/relay/auth/hmac"
|
||||
)
|
||||
|
||||
type foreignRelay struct {
|
||||
client *Client
|
||||
created time.Time
|
||||
inUse int
|
||||
}
|
||||
|
||||
type ForeignRelaysStore struct {
|
||||
mu sync.RWMutex
|
||||
clients map[string]*foreignRelay
|
||||
|
||||
group singleflight.Group
|
||||
|
||||
ctx context.Context
|
||||
tokenStore *relayAuth.TokenStore
|
||||
peerID string
|
||||
mtu uint16
|
||||
transportFallback *transportFallback
|
||||
onDisconnect func(string)
|
||||
keepUnusedServerTime time.Duration
|
||||
}
|
||||
|
||||
func NewForeignRelaysStore(ctx context.Context, tokenStore *relayAuth.TokenStore, peerID string, mtu uint16, transportFallback *transportFallback, onDisconnect func(string), keepUnusedServerTime time.Duration) *ForeignRelaysStore {
|
||||
return &ForeignRelaysStore{
|
||||
clients: make(map[string]*foreignRelay),
|
||||
ctx: ctx,
|
||||
tokenStore: tokenStore,
|
||||
peerID: peerID,
|
||||
mtu: mtu,
|
||||
transportFallback: transportFallback,
|
||||
onDisconnect: onDisconnect,
|
||||
keepUnusedServerTime: keepUnusedServerTime,
|
||||
}
|
||||
}
|
||||
|
||||
func (f *ForeignRelaysStore) OpenConn(ctx context.Context, peerKey string, remoteRelayServer RelayServer) (net.Conn, error) {
|
||||
fr, err := f.acquire(remoteRelayServer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer f.release(fr)
|
||||
|
||||
return fr.client.OpenConn(ctx, peerKey)
|
||||
}
|
||||
|
||||
func (f *ForeignRelaysStore) acquire(remoteRelayServer RelayServer) (*foreignRelay, error) {
|
||||
f.mu.Lock()
|
||||
if fr, ok := f.clients[remoteRelayServer.Addr]; ok {
|
||||
fr.inUse++
|
||||
f.mu.Unlock()
|
||||
return fr, nil
|
||||
}
|
||||
f.mu.Unlock()
|
||||
|
||||
v, err, _ := f.group.Do(remoteRelayServer.Addr, func() (any, error) {
|
||||
f.mu.RLock()
|
||||
fr, ok := f.clients[remoteRelayServer.Addr]
|
||||
f.mu.RUnlock()
|
||||
if ok {
|
||||
return fr, nil
|
||||
}
|
||||
|
||||
relayClient := NewClientWithServerIP(remoteRelayServer.Addr, remoteRelayServer.IP, f.tokenStore, f.peerID, f.mtu)
|
||||
relayClient.SetTransportFallback(f.transportFallback)
|
||||
if err := relayClient.Connect(f.ctx); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
relayClient.SetOnDisconnectListener(f.onDisconnect)
|
||||
|
||||
f.mu.Lock()
|
||||
fr = &foreignRelay{client: relayClient, created: time.Now()}
|
||||
f.clients[remoteRelayServer.Addr] = fr
|
||||
f.mu.Unlock()
|
||||
return fr, nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fr := v.(*foreignRelay)
|
||||
f.mu.Lock()
|
||||
if cur, ok := f.clients[remoteRelayServer.Addr]; !ok || cur != fr {
|
||||
f.mu.Unlock()
|
||||
return f.acquire(remoteRelayServer)
|
||||
}
|
||||
fr.inUse++
|
||||
f.mu.Unlock()
|
||||
return fr, nil
|
||||
}
|
||||
|
||||
func (f *ForeignRelaysStore) release(fr *foreignRelay) {
|
||||
f.mu.Lock()
|
||||
fr.inUse--
|
||||
f.mu.Unlock()
|
||||
}
|
||||
|
||||
func (f *ForeignRelaysStore) evict(serverAddress string) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
if _, ok := f.clients[serverAddress]; ok {
|
||||
delete(f.clients, serverAddress)
|
||||
log.Debugf("evicted disconnected foreign relay client: %s", serverAddress)
|
||||
}
|
||||
}
|
||||
|
||||
func (f *ForeignRelaysStore) cleanupUnused() {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
for addr, fr := range f.clients {
|
||||
if time.Since(fr.created) <= f.keepUnusedServerTime {
|
||||
continue
|
||||
}
|
||||
if fr.inUse > 0 {
|
||||
continue
|
||||
}
|
||||
if fr.client.HasConns() {
|
||||
continue
|
||||
}
|
||||
fr.client.SetOnDisconnectListener(nil)
|
||||
go func() {
|
||||
_ = fr.client.Close()
|
||||
}()
|
||||
log.Debugf("clean up unused relay server connection: %s", addr)
|
||||
delete(f.clients, addr)
|
||||
}
|
||||
}
|
||||
|
||||
func (f *ForeignRelaysStore) states() []RelayConnState {
|
||||
f.mu.RLock()
|
||||
clients := make([]*Client, 0, len(f.clients))
|
||||
for _, fr := range f.clients {
|
||||
clients = append(clients, fr.client)
|
||||
}
|
||||
f.mu.RUnlock()
|
||||
|
||||
states := make([]RelayConnState, 0, len(clients))
|
||||
for _, c := range clients {
|
||||
states = append(states, relayConnState(c))
|
||||
}
|
||||
return states
|
||||
}
|
||||
@@ -22,22 +22,6 @@ var (
|
||||
ErrRelayClientNotConnected = fmt.Errorf("relay client not connected")
|
||||
)
|
||||
|
||||
// RelayTrack hold the relay clients for the foreign relay servers.
|
||||
// With the mutex can ensure we can open new connection in case the relay connection has been established with
|
||||
// the relay server.
|
||||
type RelayTrack struct {
|
||||
sync.RWMutex
|
||||
relayClient *Client
|
||||
err error
|
||||
created time.Time
|
||||
}
|
||||
|
||||
func NewRelayTrack() *RelayTrack {
|
||||
return &RelayTrack{
|
||||
created: time.Now(),
|
||||
}
|
||||
}
|
||||
|
||||
type OnServerCloseListener func()
|
||||
|
||||
// ManagerOption configures a Manager at construction time.
|
||||
@@ -54,6 +38,11 @@ type RelayConnState struct {
|
||||
Err error
|
||||
}
|
||||
|
||||
type RelayServer struct {
|
||||
Addr string
|
||||
IP netip.Addr
|
||||
}
|
||||
|
||||
// WithMaxBackoffInterval caps the exponential backoff between reconnect
|
||||
// attempts to the home relay. A non-positive value keeps the default.
|
||||
func WithMaxBackoffInterval(d time.Duration) ManagerOption {
|
||||
@@ -78,8 +67,7 @@ type Manager struct {
|
||||
relayClientMu sync.RWMutex
|
||||
reconnectGuard *Guard
|
||||
|
||||
relayClients map[string]*RelayTrack
|
||||
relayClientsMutex sync.RWMutex
|
||||
foreign *ForeignRelaysStore
|
||||
|
||||
onDisconnectedListeners map[string]*list.List
|
||||
onReconnectedListenerFn func()
|
||||
@@ -115,7 +103,6 @@ func NewManager(ctx context.Context, serverURLs []string, peerID string, mtu uin
|
||||
ConnectionTimeout: defaultConnectionTimeout,
|
||||
TransportFallback: tf,
|
||||
},
|
||||
relayClients: make(map[string]*RelayTrack),
|
||||
onDisconnectedListeners: make(map[string]*list.List),
|
||||
cleanupInterval: relayCleanupInterval,
|
||||
keepUnusedServerTime: keepUnusedServerTime,
|
||||
@@ -123,6 +110,7 @@ func NewManager(ctx context.Context, serverURLs []string, peerID string, mtu uin
|
||||
for _, opt := range opts {
|
||||
opt(m)
|
||||
}
|
||||
m.foreign = NewForeignRelaysStore(ctx, tokenStore, peerID, mtu, tf, m.onServerDisconnected, m.keepUnusedServerTime)
|
||||
m.serverPicker.ServerURLs.Store(serverURLs)
|
||||
m.reconnectGuard = NewGuard(m.serverPicker, m.maxBackoffInterval)
|
||||
return m
|
||||
@@ -154,13 +142,7 @@ func (m *Manager) Serve() error {
|
||||
return err
|
||||
}
|
||||
|
||||
// OpenConn opens a connection to the given peer key. If the peer is on the same relay server, the connection will be
|
||||
// established via the relay server. If the peer is on a different relay server, the manager will establish a new
|
||||
// connection to the relay server. It returns back with a net.Conn what represent the remote peer connection.
|
||||
//
|
||||
// serverIP, when valid and serverAddress is foreign, is used as a dial target if the FQDN-based dial fails.
|
||||
// Ignored for the local home-server path. TLS verification still uses the FQDN via SNI.
|
||||
func (m *Manager) OpenConn(ctx context.Context, serverAddress, peerKey string, serverIP netip.Addr) (net.Conn, error) {
|
||||
func (m *Manager) OpenConn(ctx context.Context, remoteRelayServer RelayServer, peerKey string, preferForeign bool) (net.Conn, error) {
|
||||
m.relayClientMu.RLock()
|
||||
defer m.relayClientMu.RUnlock()
|
||||
|
||||
@@ -168,26 +150,17 @@ func (m *Manager) OpenConn(ctx context.Context, serverAddress, peerKey string, s
|
||||
return nil, ErrRelayClientNotConnected
|
||||
}
|
||||
|
||||
foreign, err := m.isForeignServer(serverAddress)
|
||||
foreign, err := m.isForeignServer(remoteRelayServer.Addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var (
|
||||
netConn net.Conn
|
||||
)
|
||||
if !foreign {
|
||||
log.Debugf("open peer connection via permanent server: %s", peerKey)
|
||||
netConn, err = m.relayClient.OpenConn(ctx, peerKey)
|
||||
} else {
|
||||
log.Debugf("open peer connection via foreign server: %s", serverAddress)
|
||||
netConn, err = m.openConnVia(ctx, serverAddress, peerKey, serverIP)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return m.relayClient.OpenConn(ctx, peerKey)
|
||||
}
|
||||
|
||||
return netConn, err
|
||||
racer := NewConnRacer(m.relayClient, m.foreign)
|
||||
return racer.Run(ctx, peerKey, remoteRelayServer, preferForeign)
|
||||
}
|
||||
|
||||
// Ready returns true if the home Relay client is connected to the relay server.
|
||||
@@ -282,26 +255,7 @@ func (m *Manager) RelayStates() []RelayConnState {
|
||||
states = append(states, st)
|
||||
}
|
||||
|
||||
// Snapshot the tracks, then query each outside the map lock: a track can be
|
||||
// held by an in-progress Connect, and blocking on it must not stall other
|
||||
// relay operations.
|
||||
m.relayClientsMutex.RLock()
|
||||
tracks := make([]*RelayTrack, 0, len(m.relayClients))
|
||||
for _, rt := range m.relayClients {
|
||||
tracks = append(tracks, rt)
|
||||
}
|
||||
m.relayClientsMutex.RUnlock()
|
||||
|
||||
// Only connected foreign relays carry state; a failed connect is evicted
|
||||
// immediately (openConnVia), so there is no error state to surface.
|
||||
for _, rt := range tracks {
|
||||
rt.RLock()
|
||||
rc := rt.relayClient
|
||||
rt.RUnlock()
|
||||
if rc != nil {
|
||||
states = append(states, relayConnState(rc))
|
||||
}
|
||||
}
|
||||
states = append(states, m.foreign.states()...)
|
||||
|
||||
return states
|
||||
}
|
||||
@@ -322,64 +276,6 @@ func (m *Manager) UpdateToken(token *relayAuth.Token) error {
|
||||
return m.tokenStore.UpdateToken(token)
|
||||
}
|
||||
|
||||
func (m *Manager) openConnVia(ctx context.Context, serverAddress, peerKey string, serverIP netip.Addr) (net.Conn, error) {
|
||||
// check if already has a connection to the desired relay server
|
||||
m.relayClientsMutex.RLock()
|
||||
rt, ok := m.relayClients[serverAddress]
|
||||
if ok {
|
||||
rt.RLock()
|
||||
m.relayClientsMutex.RUnlock()
|
||||
defer rt.RUnlock()
|
||||
if rt.err != nil {
|
||||
return nil, rt.err
|
||||
}
|
||||
return rt.relayClient.OpenConn(ctx, peerKey)
|
||||
}
|
||||
m.relayClientsMutex.RUnlock()
|
||||
|
||||
// if not, establish a new connection but check it again (because changed the lock type) before starting the
|
||||
// connection
|
||||
m.relayClientsMutex.Lock()
|
||||
rt, ok = m.relayClients[serverAddress]
|
||||
if ok {
|
||||
rt.RLock()
|
||||
m.relayClientsMutex.Unlock()
|
||||
defer rt.RUnlock()
|
||||
if rt.err != nil {
|
||||
return nil, rt.err
|
||||
}
|
||||
return rt.relayClient.OpenConn(ctx, peerKey)
|
||||
}
|
||||
|
||||
// create a new relay client and store it in the relayClients map
|
||||
rt = NewRelayTrack()
|
||||
rt.Lock()
|
||||
m.relayClients[serverAddress] = rt
|
||||
m.relayClientsMutex.Unlock()
|
||||
|
||||
relayClient := NewClientWithServerIP(serverAddress, serverIP, m.tokenStore, m.peerID, m.mtu)
|
||||
relayClient.SetTransportFallback(m.transportFallback)
|
||||
err := relayClient.Connect(m.ctx)
|
||||
if err != nil {
|
||||
rt.err = err
|
||||
rt.Unlock()
|
||||
m.relayClientsMutex.Lock()
|
||||
delete(m.relayClients, serverAddress)
|
||||
m.relayClientsMutex.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
// if connection closed then delete the relay client from the list
|
||||
relayClient.SetOnDisconnectListener(m.onServerDisconnected)
|
||||
rt.relayClient = relayClient
|
||||
rt.Unlock()
|
||||
|
||||
conn, err := relayClient.OpenConn(ctx, peerKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func (m *Manager) onServerConnected() {
|
||||
m.listenerLock.Lock()
|
||||
defer m.listenerLock.Unlock()
|
||||
@@ -405,21 +301,12 @@ func (m *Manager) onServerDisconnected(serverAddress string) {
|
||||
m.relayClientMu.Unlock()
|
||||
|
||||
if !isHome {
|
||||
m.evictForeignRelay(serverAddress)
|
||||
m.foreign.evict(serverAddress)
|
||||
}
|
||||
|
||||
m.notifyOnDisconnectListeners(serverAddress)
|
||||
}
|
||||
|
||||
func (m *Manager) evictForeignRelay(serverAddress string) {
|
||||
m.relayClientsMutex.Lock()
|
||||
defer m.relayClientsMutex.Unlock()
|
||||
if _, ok := m.relayClients[serverAddress]; ok {
|
||||
delete(m.relayClients, serverAddress)
|
||||
log.Debugf("evicted disconnected foreign relay client: %s", serverAddress)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) listenGuardEvent(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
@@ -458,43 +345,11 @@ func (m *Manager) startCleanupLoop() {
|
||||
case <-m.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
m.cleanUpUnusedRelays()
|
||||
m.foreign.cleanupUnused()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) cleanUpUnusedRelays() {
|
||||
m.relayClientsMutex.Lock()
|
||||
defer m.relayClientsMutex.Unlock()
|
||||
|
||||
for addr, rt := range m.relayClients {
|
||||
rt.Lock()
|
||||
// if the connection failed to the server the relay client will be nil
|
||||
// but the instance will be kept in the relayClients until the next locking
|
||||
if rt.err != nil {
|
||||
rt.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
if time.Since(rt.created) <= m.keepUnusedServerTime {
|
||||
rt.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
if rt.relayClient.HasConns() {
|
||||
rt.Unlock()
|
||||
continue
|
||||
}
|
||||
rt.relayClient.SetOnDisconnectListener(nil)
|
||||
go func() {
|
||||
_ = rt.relayClient.Close()
|
||||
}()
|
||||
log.Debugf("clean up unused relay server connection: %s", addr)
|
||||
delete(m.relayClients, addr)
|
||||
rt.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) addListener(serverAddress string, onClosedListener OnServerCloseListener) {
|
||||
m.listenerLock.Lock()
|
||||
defer m.listenerLock.Unlock()
|
||||
|
||||
Reference in New Issue
Block a user