mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 16:26:38 +00:00
[Client] Remove connection semaphore (#5419)
* [Client] Remove connection semaphore Remove the semaphore and the initial random sleep time (300ms) from the connectivity logic to speed up the initial connection time. Note: Implement limiter logic that can prioritize router peers and keep the fast connection option for the first few peers. * Remove unused function
This commit is contained in:
@@ -53,13 +53,11 @@ import (
|
|||||||
"github.com/netbirdio/netbird/client/internal/updatemanager"
|
"github.com/netbirdio/netbird/client/internal/updatemanager"
|
||||||
"github.com/netbirdio/netbird/client/jobexec"
|
"github.com/netbirdio/netbird/client/jobexec"
|
||||||
cProto "github.com/netbirdio/netbird/client/proto"
|
cProto "github.com/netbirdio/netbird/client/proto"
|
||||||
"github.com/netbirdio/netbird/shared/management/domain"
|
|
||||||
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
|
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/client/system"
|
"github.com/netbirdio/netbird/client/system"
|
||||||
nbdns "github.com/netbirdio/netbird/dns"
|
nbdns "github.com/netbirdio/netbird/dns"
|
||||||
"github.com/netbirdio/netbird/route"
|
"github.com/netbirdio/netbird/route"
|
||||||
mgm "github.com/netbirdio/netbird/shared/management/client"
|
mgm "github.com/netbirdio/netbird/shared/management/client"
|
||||||
|
"github.com/netbirdio/netbird/shared/management/domain"
|
||||||
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
|
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
|
||||||
auth "github.com/netbirdio/netbird/shared/relay/auth/hmac"
|
auth "github.com/netbirdio/netbird/shared/relay/auth/hmac"
|
||||||
relayClient "github.com/netbirdio/netbird/shared/relay/client"
|
relayClient "github.com/netbirdio/netbird/shared/relay/client"
|
||||||
@@ -75,7 +73,6 @@ import (
|
|||||||
const (
|
const (
|
||||||
PeerConnectionTimeoutMax = 45000 // ms
|
PeerConnectionTimeoutMax = 45000 // ms
|
||||||
PeerConnectionTimeoutMin = 30000 // ms
|
PeerConnectionTimeoutMin = 30000 // ms
|
||||||
connInitLimit = 200
|
|
||||||
disableAutoUpdate = "disabled"
|
disableAutoUpdate = "disabled"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -208,7 +205,6 @@ type Engine struct {
|
|||||||
syncRespMux sync.RWMutex
|
syncRespMux sync.RWMutex
|
||||||
persistSyncResponse bool
|
persistSyncResponse bool
|
||||||
latestSyncResponse *mgmProto.SyncResponse
|
latestSyncResponse *mgmProto.SyncResponse
|
||||||
connSemaphore *semaphoregroup.SemaphoreGroup
|
|
||||||
flowManager nftypes.FlowManager
|
flowManager nftypes.FlowManager
|
||||||
|
|
||||||
// auto-update
|
// auto-update
|
||||||
@@ -266,7 +262,6 @@ func NewEngine(
|
|||||||
statusRecorder: statusRecorder,
|
statusRecorder: statusRecorder,
|
||||||
stateManager: stateManager,
|
stateManager: stateManager,
|
||||||
checks: checks,
|
checks: checks,
|
||||||
connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit),
|
|
||||||
probeStunTurn: relay.NewStunTurnProbe(relay.DefaultCacheTTL),
|
probeStunTurn: relay.NewStunTurnProbe(relay.DefaultCacheTTL),
|
||||||
jobExecutor: jobexec.NewExecutor(),
|
jobExecutor: jobexec.NewExecutor(),
|
||||||
}
|
}
|
||||||
@@ -1539,7 +1534,6 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix, agentV
|
|||||||
IFaceDiscover: e.mobileDep.IFaceDiscover,
|
IFaceDiscover: e.mobileDep.IFaceDiscover,
|
||||||
RelayManager: e.relayManager,
|
RelayManager: e.relayManager,
|
||||||
SrWatcher: e.srWatcher,
|
SrWatcher: e.srWatcher,
|
||||||
Semaphore: e.connSemaphore,
|
|
||||||
}
|
}
|
||||||
peerConn, err := peer.NewConn(config, serviceDependencies)
|
peerConn, err := peer.NewConn(config, serviceDependencies)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -3,7 +3,6 @@ package peer
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
|
||||||
"net"
|
"net"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
"runtime"
|
"runtime"
|
||||||
@@ -25,7 +24,6 @@ import (
|
|||||||
"github.com/netbirdio/netbird/client/internal/stdnet"
|
"github.com/netbirdio/netbird/client/internal/stdnet"
|
||||||
"github.com/netbirdio/netbird/route"
|
"github.com/netbirdio/netbird/route"
|
||||||
relayClient "github.com/netbirdio/netbird/shared/relay/client"
|
relayClient "github.com/netbirdio/netbird/shared/relay/client"
|
||||||
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type ServiceDependencies struct {
|
type ServiceDependencies struct {
|
||||||
@@ -34,7 +32,6 @@ type ServiceDependencies struct {
|
|||||||
IFaceDiscover stdnet.ExternalIFaceDiscover
|
IFaceDiscover stdnet.ExternalIFaceDiscover
|
||||||
RelayManager *relayClient.Manager
|
RelayManager *relayClient.Manager
|
||||||
SrWatcher *guard.SRWatcher
|
SrWatcher *guard.SRWatcher
|
||||||
Semaphore *semaphoregroup.SemaphoreGroup
|
|
||||||
PeerConnDispatcher *dispatcher.ConnectionDispatcher
|
PeerConnDispatcher *dispatcher.ConnectionDispatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -111,9 +108,8 @@ type Conn struct {
|
|||||||
wgProxyRelay wgproxy.Proxy
|
wgProxyRelay wgproxy.Proxy
|
||||||
handshaker *Handshaker
|
handshaker *Handshaker
|
||||||
|
|
||||||
guard *guard.Guard
|
guard *guard.Guard
|
||||||
semaphore *semaphoregroup.SemaphoreGroup
|
wg sync.WaitGroup
|
||||||
wg sync.WaitGroup
|
|
||||||
|
|
||||||
// debug purpose
|
// debug purpose
|
||||||
dumpState *stateDump
|
dumpState *stateDump
|
||||||
@@ -139,7 +135,6 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
|
|||||||
iFaceDiscover: services.IFaceDiscover,
|
iFaceDiscover: services.IFaceDiscover,
|
||||||
relayManager: services.RelayManager,
|
relayManager: services.RelayManager,
|
||||||
srWatcher: services.SrWatcher,
|
srWatcher: services.SrWatcher,
|
||||||
semaphore: services.Semaphore,
|
|
||||||
statusRelay: worker.NewAtomicStatus(),
|
statusRelay: worker.NewAtomicStatus(),
|
||||||
statusICE: worker.NewAtomicStatus(),
|
statusICE: worker.NewAtomicStatus(),
|
||||||
dumpState: dumpState,
|
dumpState: dumpState,
|
||||||
@@ -154,15 +149,10 @@ func NewConn(config ConnConfig, services ServiceDependencies) (*Conn, error) {
|
|||||||
// It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will
|
// It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will
|
||||||
// be used.
|
// be used.
|
||||||
func (conn *Conn) Open(engineCtx context.Context) error {
|
func (conn *Conn) Open(engineCtx context.Context) error {
|
||||||
if err := conn.semaphore.Add(engineCtx); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
conn.mu.Lock()
|
conn.mu.Lock()
|
||||||
defer conn.mu.Unlock()
|
defer conn.mu.Unlock()
|
||||||
|
|
||||||
if conn.opened {
|
if conn.opened {
|
||||||
conn.semaphore.Done()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -173,7 +163,6 @@ func (conn *Conn) Open(engineCtx context.Context) error {
|
|||||||
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
|
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
|
||||||
workerICE, err := NewWorkerICE(conn.ctx, conn.Log, conn.config, conn, conn.signaler, conn.iFaceDiscover, conn.statusRecorder, relayIsSupportedLocally)
|
workerICE, err := NewWorkerICE(conn.ctx, conn.Log, conn.config, conn, conn.signaler, conn.iFaceDiscover, conn.statusRecorder, relayIsSupportedLocally)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
conn.semaphore.Done()
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
conn.workerICE = workerICE
|
conn.workerICE = workerICE
|
||||||
@@ -207,10 +196,6 @@ func (conn *Conn) Open(engineCtx context.Context) error {
|
|||||||
conn.wg.Add(1)
|
conn.wg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
defer conn.wg.Done()
|
defer conn.wg.Done()
|
||||||
|
|
||||||
conn.waitInitialRandomSleepTime(conn.ctx)
|
|
||||||
conn.semaphore.Done()
|
|
||||||
|
|
||||||
conn.guard.Start(conn.ctx, conn.onGuardEvent)
|
conn.guard.Start(conn.ctx, conn.onGuardEvent)
|
||||||
}()
|
}()
|
||||||
conn.opened = true
|
conn.opened = true
|
||||||
@@ -670,19 +655,6 @@ func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAdd
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) waitInitialRandomSleepTime(ctx context.Context) {
|
|
||||||
maxWait := 300
|
|
||||||
duration := time.Duration(rand.Intn(maxWait)) * time.Millisecond
|
|
||||||
|
|
||||||
timeout := time.NewTimer(duration)
|
|
||||||
defer timeout.Stop()
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
case <-timeout.C:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (conn *Conn) isRelayed() bool {
|
func (conn *Conn) isRelayed() bool {
|
||||||
switch conn.currentConnPriority {
|
switch conn.currentConnPriority {
|
||||||
case conntype.Relay, conntype.ICETurn:
|
case conntype.Relay, conntype.ICETurn:
|
||||||
|
|||||||
@@ -15,7 +15,6 @@ import (
|
|||||||
"github.com/netbirdio/netbird/client/internal/peer/ice"
|
"github.com/netbirdio/netbird/client/internal/peer/ice"
|
||||||
"github.com/netbirdio/netbird/client/internal/stdnet"
|
"github.com/netbirdio/netbird/client/internal/stdnet"
|
||||||
"github.com/netbirdio/netbird/util"
|
"github.com/netbirdio/netbird/util"
|
||||||
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var testDispatcher = dispatcher.NewConnectionDispatcher()
|
var testDispatcher = dispatcher.NewConnectionDispatcher()
|
||||||
@@ -53,7 +52,6 @@ func TestConn_GetKey(t *testing.T) {
|
|||||||
|
|
||||||
sd := ServiceDependencies{
|
sd := ServiceDependencies{
|
||||||
SrWatcher: swWatcher,
|
SrWatcher: swWatcher,
|
||||||
Semaphore: semaphoregroup.NewSemaphoreGroup(1),
|
|
||||||
PeerConnDispatcher: testDispatcher,
|
PeerConnDispatcher: testDispatcher,
|
||||||
}
|
}
|
||||||
conn, err := NewConn(connConf, sd)
|
conn, err := NewConn(connConf, sd)
|
||||||
@@ -71,7 +69,6 @@ func TestConn_OnRemoteOffer(t *testing.T) {
|
|||||||
sd := ServiceDependencies{
|
sd := ServiceDependencies{
|
||||||
StatusRecorder: NewRecorder("https://mgm"),
|
StatusRecorder: NewRecorder("https://mgm"),
|
||||||
SrWatcher: swWatcher,
|
SrWatcher: swWatcher,
|
||||||
Semaphore: semaphoregroup.NewSemaphoreGroup(1),
|
|
||||||
PeerConnDispatcher: testDispatcher,
|
PeerConnDispatcher: testDispatcher,
|
||||||
}
|
}
|
||||||
conn, err := NewConn(connConf, sd)
|
conn, err := NewConn(connConf, sd)
|
||||||
@@ -110,7 +107,6 @@ func TestConn_OnRemoteAnswer(t *testing.T) {
|
|||||||
sd := ServiceDependencies{
|
sd := ServiceDependencies{
|
||||||
StatusRecorder: NewRecorder("https://mgm"),
|
StatusRecorder: NewRecorder("https://mgm"),
|
||||||
SrWatcher: swWatcher,
|
SrWatcher: swWatcher,
|
||||||
Semaphore: semaphoregroup.NewSemaphoreGroup(1),
|
|
||||||
PeerConnDispatcher: testDispatcher,
|
PeerConnDispatcher: testDispatcher,
|
||||||
}
|
}
|
||||||
conn, err := NewConn(connConf, sd)
|
conn, err := NewConn(connConf, sd)
|
||||||
|
|||||||
Reference in New Issue
Block a user