diff --git a/client/internal/engine.go b/client/internal/engine.go index 351b21b2e..9c96dc50d 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -150,6 +150,8 @@ type Engine struct { signalProbe *Probe relayProbe *Probe wgProbe *Probe + + wgConnWorker sync.WaitGroup } // Peer is an instance of the Connection Peer @@ -245,6 +247,7 @@ func (e *Engine) Stop() error { time.Sleep(500 * time.Millisecond) e.close() + e.wgConnWorker.Wait() log.Infof("stopped Netbird Engine") return nil } @@ -869,18 +872,25 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error { log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err) } + e.wgConnWorker.Add(1) go e.connWorker(conn, peerKey) } return nil } func (e *Engine) connWorker(conn *peer.Conn, peerKey string) { + defer e.wgConnWorker.Done() for { // randomize starting time a bit min := 500 max := 2000 - time.Sleep(time.Duration(rand.Intn(max-min)+min) * time.Millisecond) + duration := time.Duration(rand.Intn(max-min)+min) * time.Millisecond + select { + case <-e.ctx.Done(): + return + case <-time.After(duration): + } // if peer has been removed -> give up if !e.peerExists(peerKey) { diff --git a/client/internal/engine_test.go b/client/internal/engine_test.go index 0239ae58a..f5a98cb7f 100644 --- a/client/internal/engine_test.go +++ b/client/internal/engine_test.go @@ -229,6 +229,7 @@ func TestEngine_UpdateNetworkMap(t *testing.T) { t.Fatal(err) } engine.udpMux = bind.NewUniversalUDPMuxDefault(bind.UniversalUDPMuxParams{UDPConn: conn}) + engine.ctx = ctx type testCase struct { name string @@ -408,6 +409,7 @@ func TestEngine_Sync(t *testing.T) { WgPrivateKey: key, WgPort: 33100, }, MobileDependency{}, peer.NewRecorder("https://mgm")) + engine.ctx = ctx engine.dnsServer = &dns.MockServer{ UpdateDNSServerFunc: func(serial uint64, update nbdns.Config) error { return nil }, @@ -566,6 +568,7 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) { WgPrivateKey: key, WgPort: 33100, }, MobileDependency{}, peer.NewRecorder("https://mgm")) + engine.ctx = ctx newNet, err := stdnet.NewNet() if err != nil { t.Fatal(err) @@ -735,6 +738,8 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) { WgPrivateKey: key, WgPort: 33100, }, MobileDependency{}, peer.NewRecorder("https://mgm")) + engine.ctx = ctx + newNet, err := stdnet.NewNet() if err != nil { t.Fatal(err) @@ -1003,7 +1008,9 @@ func createEngine(ctx context.Context, cancel context.CancelFunc, setupKey strin WgPort: wgPort, } - return NewEngine(ctx, cancel, signalClient, mgmtClient, conf, MobileDependency{}, peer.NewRecorder("https://mgm")), nil + e, err := NewEngine(ctx, cancel, signalClient, mgmtClient, conf, MobileDependency{}, peer.NewRecorder("https://mgm")), nil + e.ctx = ctx + return e, err } func startSignal() (*grpc.Server, string, error) {