mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-16 07:16:38 +00:00
[client] Check the client status in the earlier phase (#4509)
This PR improves the NetBird client's status checking mechanism by implementing earlier detection of client state changes and better handling of connection lifecycle management. The key improvements focus on: • Enhanced status detection - Added waitForReady option to StatusRequest for improved client status handling • Better connection management - Improved context handling for signal and management gRPC connections• Reduced connection timeouts - Increased gRPC dial timeout from 3 to 10 seconds for better reliability • Cleaner error handling - Enhanced error propagation and context cancellation in retry loops Key Changes Core Status Improvements: - Added waitForReady optional field to StatusRequest proto (daemon.proto:190) - Enhanced status checking logic to detect client state changes earlier in the connection process - Improved handling of client permanent exit scenarios from retry loops Connection & Context Management: - Fixed context cancellation in management and signal client retry mechanisms - Added proper context propagation for Login operations - Enhanced gRPC connection handling with better timeout management Error Handling & Cleanup: - Moved feedback channels to upper layers for better separation of concerns - Improved error handling patterns throughout the client server implementation - Fixed synchronization issues and removed debug logging
This commit is contained in:
@@ -67,6 +67,7 @@ type Server struct {
|
||||
proto.UnimplementedDaemonServiceServer
|
||||
clientRunning bool // protected by mutex
|
||||
clientRunningChan chan struct{}
|
||||
clientGiveUpChan chan struct{}
|
||||
|
||||
connectClient *internal.ConnectClient
|
||||
|
||||
@@ -106,6 +107,10 @@ func (s *Server) Start() error {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
if s.clientRunning {
|
||||
return nil
|
||||
}
|
||||
|
||||
state := internal.CtxGetState(s.rootCtx)
|
||||
|
||||
if err := handlePanicLog(); err != nil {
|
||||
@@ -175,12 +180,10 @@ func (s *Server) Start() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
if s.clientRunning {
|
||||
return nil
|
||||
}
|
||||
s.clientRunning = true
|
||||
s.clientRunningChan = make(chan struct{}, 1)
|
||||
go s.connectWithRetryRuns(ctx, config, s.statusRecorder, s.clientRunningChan)
|
||||
s.clientRunningChan = make(chan struct{})
|
||||
s.clientGiveUpChan = make(chan struct{})
|
||||
go s.connectWithRetryRuns(ctx, config, s.statusRecorder, s.clientRunningChan, s.clientGiveUpChan)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -211,7 +214,7 @@ func (s *Server) setDefaultConfigIfNotExists(ctx context.Context) error {
|
||||
// connectWithRetryRuns runs the client connection with a backoff strategy where we retry the operation as additional
|
||||
// mechanism to keep the client connected even when the connection is lost.
|
||||
// we cancel retry if the client receive a stop or down command, or if disable auto connect is configured.
|
||||
func (s *Server) connectWithRetryRuns(ctx context.Context, profileConfig *profilemanager.Config, statusRecorder *peer.Status, runningChan chan struct{}) {
|
||||
func (s *Server) connectWithRetryRuns(ctx context.Context, profileConfig *profilemanager.Config, statusRecorder *peer.Status, runningChan chan struct{}, giveUpChan chan struct{}) {
|
||||
defer func() {
|
||||
s.mutex.Lock()
|
||||
s.clientRunning = false
|
||||
@@ -261,6 +264,10 @@ func (s *Server) connectWithRetryRuns(ctx context.Context, profileConfig *profil
|
||||
if err := backoff.Retry(runOperation, backOff); err != nil {
|
||||
log.Errorf("operation failed: %v", err)
|
||||
}
|
||||
|
||||
if giveUpChan != nil {
|
||||
close(giveUpChan)
|
||||
}
|
||||
}
|
||||
|
||||
// loginAttempt attempts to login using the provided information. it returns a status in case something fails
|
||||
@@ -379,7 +386,7 @@ func (s *Server) Login(callerCtx context.Context, msg *proto.LoginRequest) (*pro
|
||||
if s.actCancel != nil {
|
||||
s.actCancel()
|
||||
}
|
||||
ctx, cancel := context.WithCancel(s.rootCtx)
|
||||
ctx, cancel := context.WithCancel(callerCtx)
|
||||
|
||||
md, ok := metadata.FromIncomingContext(callerCtx)
|
||||
if ok {
|
||||
@@ -389,11 +396,11 @@ func (s *Server) Login(callerCtx context.Context, msg *proto.LoginRequest) (*pro
|
||||
s.actCancel = cancel
|
||||
s.mutex.Unlock()
|
||||
|
||||
if err := restoreResidualState(ctx, s.profileManager.GetStatePath()); err != nil {
|
||||
if err := restoreResidualState(s.rootCtx, s.profileManager.GetStatePath()); err != nil {
|
||||
log.Warnf(errRestoreResidualState, err)
|
||||
}
|
||||
|
||||
state := internal.CtxGetState(ctx)
|
||||
state := internal.CtxGetState(s.rootCtx)
|
||||
defer func() {
|
||||
status, err := state.Status()
|
||||
if err != nil || (status != internal.StatusNeedsLogin && status != internal.StatusLoginFailed) {
|
||||
@@ -606,6 +613,20 @@ func (s *Server) WaitSSOLogin(callerCtx context.Context, msg *proto.WaitSSOLogin
|
||||
// Up starts engine work in the daemon.
|
||||
func (s *Server) Up(callerCtx context.Context, msg *proto.UpRequest) (*proto.UpResponse, error) {
|
||||
s.mutex.Lock()
|
||||
if s.clientRunning {
|
||||
state := internal.CtxGetState(s.rootCtx)
|
||||
status, err := state.Status()
|
||||
if err != nil {
|
||||
s.mutex.Unlock()
|
||||
return nil, err
|
||||
}
|
||||
if status == internal.StatusNeedsLogin {
|
||||
s.actCancel()
|
||||
}
|
||||
s.mutex.Unlock()
|
||||
|
||||
return s.waitForUp(callerCtx)
|
||||
}
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
if err := restoreResidualState(callerCtx, s.profileManager.GetStatePath()); err != nil {
|
||||
@@ -621,16 +642,16 @@ func (s *Server) Up(callerCtx context.Context, msg *proto.UpRequest) (*proto.UpR
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if status != internal.StatusIdle {
|
||||
return nil, fmt.Errorf("up already in progress: current status %s", status)
|
||||
}
|
||||
|
||||
// it should be nil here, but .
|
||||
// it should be nil here, but in case it isn't we cancel it.
|
||||
if s.actCancel != nil {
|
||||
s.actCancel()
|
||||
}
|
||||
ctx, cancel := context.WithCancel(s.rootCtx)
|
||||
|
||||
md, ok := metadata.FromIncomingContext(callerCtx)
|
||||
if ok {
|
||||
ctx = metadata.NewOutgoingContext(ctx, md)
|
||||
@@ -673,26 +694,31 @@ func (s *Server) Up(callerCtx context.Context, msg *proto.UpRequest) (*proto.UpR
|
||||
s.statusRecorder.UpdateManagementAddress(s.config.ManagementURL.String())
|
||||
s.statusRecorder.UpdateRosenpass(s.config.RosenpassEnabled, s.config.RosenpassPermissive)
|
||||
|
||||
s.clientRunning = true
|
||||
s.clientRunningChan = make(chan struct{})
|
||||
s.clientGiveUpChan = make(chan struct{})
|
||||
go s.connectWithRetryRuns(ctx, s.config, s.statusRecorder, s.clientRunningChan, s.clientGiveUpChan)
|
||||
|
||||
return s.waitForUp(callerCtx)
|
||||
}
|
||||
|
||||
// todo: handle potential race conditions
|
||||
func (s *Server) waitForUp(callerCtx context.Context) (*proto.UpResponse, error) {
|
||||
timeoutCtx, cancel := context.WithTimeout(callerCtx, 50*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if !s.clientRunning {
|
||||
s.clientRunning = true
|
||||
s.clientRunningChan = make(chan struct{}, 1)
|
||||
go s.connectWithRetryRuns(ctx, s.config, s.statusRecorder, s.clientRunningChan)
|
||||
}
|
||||
for {
|
||||
select {
|
||||
case <-s.clientRunningChan:
|
||||
s.isSessionActive.Store(true)
|
||||
return &proto.UpResponse{}, nil
|
||||
case <-callerCtx.Done():
|
||||
log.Debug("context done, stopping the wait for engine to become ready")
|
||||
return nil, callerCtx.Err()
|
||||
case <-timeoutCtx.Done():
|
||||
log.Debug("up is timed out, stopping the wait for engine to become ready")
|
||||
return nil, timeoutCtx.Err()
|
||||
}
|
||||
select {
|
||||
case <-s.clientGiveUpChan:
|
||||
return nil, fmt.Errorf("client gave up to connect")
|
||||
case <-s.clientRunningChan:
|
||||
s.isSessionActive.Store(true)
|
||||
return &proto.UpResponse{}, nil
|
||||
case <-callerCtx.Done():
|
||||
log.Debug("context done, stopping the wait for engine to become ready")
|
||||
return nil, callerCtx.Err()
|
||||
case <-timeoutCtx.Done():
|
||||
log.Debug("up is timed out, stopping the wait for engine to become ready")
|
||||
return nil, timeoutCtx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -966,12 +992,46 @@ func (s *Server) Status(
|
||||
ctx context.Context,
|
||||
msg *proto.StatusRequest,
|
||||
) (*proto.StatusResponse, error) {
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
clientRunning := s.clientRunning
|
||||
s.mutex.Unlock()
|
||||
|
||||
if msg.WaitForReady != nil && *msg.WaitForReady && clientRunning {
|
||||
state := internal.CtxGetState(s.rootCtx)
|
||||
status, err := state.Status()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if status != internal.StatusIdle && status != internal.StatusConnected && status != internal.StatusConnecting {
|
||||
s.actCancel()
|
||||
}
|
||||
|
||||
ticker := time.NewTicker(1 * time.Second)
|
||||
defer ticker.Stop()
|
||||
loop:
|
||||
for {
|
||||
select {
|
||||
case <-s.clientGiveUpChan:
|
||||
ticker.Stop()
|
||||
break loop
|
||||
case <-s.clientRunningChan:
|
||||
ticker.Stop()
|
||||
break loop
|
||||
case <-ticker.C:
|
||||
status, err := state.Status()
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if status != internal.StatusIdle && status != internal.StatusConnected && status != internal.StatusConnecting {
|
||||
s.actCancel()
|
||||
}
|
||||
continue
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
status, err := internal.CtxGetState(s.rootCtx).Status()
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user