Fix initial sync complete on empty service list

This commit is contained in:
Viktor Liu
2026-02-12 15:10:53 +08:00
parent 5d606d909d
commit 38db42e7d6
10 changed files with 693 additions and 267 deletions

View File

@@ -88,8 +88,9 @@ func (c *Client) printHealth(data map[string]any) {
return
}
_, _ = fmt.Fprintf(c.out, "\n%-38s %-9s %-7s %-8s %s\n", "ACCOUNT ID", "HEALTHY", "MGMT", "SIGNAL", "RELAYS")
_, _ = fmt.Fprintln(c.out, strings.Repeat("-", 80))
_, _ = fmt.Fprintf(c.out, "\n%-38s %-9s %-7s %-8s %-8s %-16s %s\n",
"ACCOUNT ID", "HEALTHY", "MGMT", "SIGNAL", "RELAYS", "PEERS (P2P/RLY)", "DEGRADED")
_, _ = fmt.Fprintln(c.out, strings.Repeat("-", 110))
for accountID, v := range clients {
ch, ok := v.(map[string]any)
@@ -105,7 +106,15 @@ func (c *Client) printHealth(data map[string]any) {
relaysTotal, _ := ch["relays_total"].(float64)
relays := fmt.Sprintf("%d/%d", int(relaysConn), int(relaysTotal))
_, _ = fmt.Fprintf(c.out, "%-38s %-9s %-7s %-8s %s", accountID, healthy, mgmt, signal, relays)
peersConnected, _ := ch["peers_connected"].(float64)
peersTotal, _ := ch["peers_total"].(float64)
peersP2P, _ := ch["peers_p2p"].(float64)
peersRelayed, _ := ch["peers_relayed"].(float64)
peersDegraded, _ := ch["peers_degraded"].(float64)
peers := fmt.Sprintf("%d/%d (%d/%d)", int(peersConnected), int(peersTotal), int(peersP2P), int(peersRelayed))
degraded := fmt.Sprintf("%d", int(peersDegraded))
_, _ = fmt.Fprintf(c.out, "%-38s %-9s %-7s %-8s %-8s %-16s %s", accountID, healthy, mgmt, signal, relays, peers, degraded)
if errMsg, ok := ch["error"].(string); ok && errMsg != "" {
_, _ = fmt.Fprintf(c.out, " (%s)", errMsg)
}

View File

@@ -648,7 +648,8 @@ func (h *Handler) handleHealth(w http.ResponseWriter, r *http.Request, wantJSON
allHealthy, clientHealth := h.health.CheckClientsConnected(r.Context())
status := "ok"
if !ready || !allHealthy {
// No clients is not a health issue; only degrade when actual clients are unhealthy
if !ready || (!allHealthy && len(clientHealth) > 0) {
status = "degraded"
}

View File

@@ -16,6 +16,8 @@ import (
"github.com/netbirdio/netbird/proxy/internal/types"
)
const handshakeStaleThreshold = 5 * time.Minute
const (
maxConcurrentChecks = 3
maxClientCheckTimeout = 5 * time.Minute
@@ -38,6 +40,10 @@ type Checker struct {
// checkSem limits concurrent client health checks.
checkSem chan struct{}
// checkHealth checks the health of a single client.
// Defaults to checkClientHealth; overridable in tests.
checkHealth func(*embed.Client) ClientHealth
}
// ClientHealth represents the health status of a single NetBird client.
@@ -47,6 +53,11 @@ type ClientHealth struct {
SignalConnected bool `json:"signal_connected"`
RelaysConnected int `json:"relays_connected"`
RelaysTotal int `json:"relays_total"`
PeersTotal int `json:"peers_total"`
PeersConnected int `json:"peers_connected"`
PeersP2P int `json:"peers_p2p"`
PeersRelayed int `json:"peers_relayed"`
PeersDegraded int `json:"peers_degraded"`
Error string `json:"error,omitempty"`
}
@@ -96,9 +107,9 @@ func (c *Checker) CheckClientsConnected(ctx context.Context) (bool, map[types.Ac
clients := c.provider.ListClientsForStartup()
// No clients yet means not ready
// No clients is not a health issue
if len(clients) == 0 {
return false, make(map[types.AccountID]ClientHealth)
return true, make(map[types.AccountID]ClientHealth)
}
type result struct {
@@ -123,7 +134,7 @@ func (c *Checker) CheckClientsConnected(ctx context.Context) (bool, map[types.Ac
return
}
resultsCh <- result{id, checkClientHealth(cl)}
resultsCh <- result{id, c.checkHealth(cl)}
}(accountID, client)
}
@@ -173,7 +184,8 @@ func (c *Checker) StartupProbe(ctx context.Context) bool {
return false
}
// Check all clients are connected to management/signal/relay
// Check all clients are connected to management/signal/relay.
// Returns true when no clients exist (nothing to check).
allHealthy, _ := c.CheckClientsConnected(ctx)
return allHealthy
}
@@ -213,19 +225,19 @@ func (c *Checker) handleReadiness(w http.ResponseWriter, r *http.Request) {
func (c *Checker) handleStartup(w http.ResponseWriter, r *http.Request) {
c.mu.RLock()
mgmt := c.managementConnected
sync := c.initialSyncComplete
syncComplete := c.initialSyncComplete
c.mu.RUnlock()
// Check clients directly using request context
allClientsHealthy, clientHealth := c.CheckClientsConnected(r.Context())
checks := map[string]bool{
"management_connected": mgmt,
"initial_sync_complete": sync,
"initial_sync_complete": syncComplete,
"all_clients_healthy": allClientsHealthy,
}
if c.StartupProbe(r.Context()) {
ready := mgmt && syncComplete && allClientsHealthy
if ready {
c.writeProbeResponse(w, http.StatusOK, "ok", checks, clientHealth)
return
}
@@ -293,9 +305,10 @@ func NewChecker(logger *log.Logger, provider clientProvider) *Checker {
logger = log.StandardLogger()
}
return &Checker{
logger: logger,
provider: provider,
checkSem: make(chan struct{}, maxConcurrentChecks),
logger: logger,
provider: provider,
checkSem: make(chan struct{}, maxConcurrentChecks),
checkHealth: checkClientHealth,
}
}
@@ -347,6 +360,24 @@ func checkClientHealth(client *embed.Client) ClientHealth {
}
}
// Count peer connection stats
now := time.Now()
var peersConnected, peersP2P, peersRelayed, peersDegraded int
for _, p := range status.Peers {
if p.ConnStatus != embed.PeerStatusConnected {
continue
}
peersConnected++
if p.Relayed {
peersRelayed++
} else {
peersP2P++
}
if p.LastWireguardHandshake.IsZero() || now.Sub(p.LastWireguardHandshake) > handshakeStaleThreshold {
peersDegraded++
}
}
// Client is healthy if connected to management, signal, and at least one relay (if any are defined)
healthy := status.ManagementState.Connected &&
status.SignalState.Connected &&
@@ -358,5 +389,10 @@ func checkClientHealth(client *embed.Client) ClientHealth {
SignalConnected: status.SignalState.Connected,
RelaysConnected: relaysConnected,
RelaysTotal: relayCount,
PeersTotal: len(status.Peers),
PeersConnected: peersConnected,
PeersP2P: peersP2P,
PeersRelayed: peersRelayed,
PeersDegraded: peersDegraded,
}
}

View File

@@ -5,6 +5,7 @@ import (
"encoding/json"
"net/http"
"net/http/httptest"
"sync"
"testing"
"github.com/stretchr/testify/assert"
@@ -22,6 +23,16 @@ func (m *mockClientProvider) ListClientsForStartup() map[types.AccountID]*embed.
return m.clients
}
// newTestChecker returns a Checker built by NewChecker (with a nil logger)
// whose checkHealth hook is replaced by a stub. The stub ignores the client it
// receives and always reports healthResult, so tests may register nil clients.
func newTestChecker(provider clientProvider, healthResult ClientHealth) *Checker {
	stub := func(*embed.Client) ClientHealth { return healthResult }

	checker := NewChecker(nil, provider)
	checker.checkHealth = stub
	return checker
}
func TestChecker_LivenessProbe(t *testing.T) {
checker := NewChecker(nil, &mockClientProvider{})
@@ -44,19 +55,287 @@ func TestChecker_ReadinessProbe(t *testing.T) {
assert.False(t, checker.ReadinessProbe())
}
func TestChecker_StartupProbe_NoClients(t *testing.T) {
// TestStartupProbe_EmptyServiceList covers the scenario where management has
// no services configured for this proxy. The proxy should become ready once
// management is connected and the initial sync completes, even with zero clients.
func TestStartupProbe_EmptyServiceList(t *testing.T) {
checker := NewChecker(nil, &mockClientProvider{})
// Initially startup not complete.
// No management connection = not ready.
assert.False(t, checker.StartupProbe(context.Background()))
// Just management connected is not enough.
// Management connected but no sync = not ready.
checker.SetManagementConnected(true)
assert.False(t, checker.StartupProbe(context.Background()))
// Management + initial sync but no clients = not ready
// Management + sync complete + no clients = ready.
checker.SetInitialSyncComplete()
assert.False(t, checker.StartupProbe(context.Background()))
assert.True(t, checker.StartupProbe(context.Background()))
}
// TestStartupProbe_WithUnhealthyClients verifies that when services exist
// and clients have been created but are not yet fully connected (to mgmt,
// signal, relays), the startup probe does NOT pass.
func TestStartupProbe_WithUnhealthyClients(t *testing.T) {
provider := &mockClientProvider{
clients: map[types.AccountID]*embed.Client{
"account-1": nil, // concrete client not needed; checkHealth is mocked
"account-2": nil,
},
}
checker := newTestChecker(provider, ClientHealth{Healthy: false, Error: "not connected yet"})
checker.SetManagementConnected(true)
checker.SetInitialSyncComplete()
assert.False(t, checker.StartupProbe(context.Background()),
"startup probe must not pass when clients are unhealthy")
}
// TestStartupProbe_WithHealthyClients verifies that once all clients are
// connected and healthy, the startup probe passes.
func TestStartupProbe_WithHealthyClients(t *testing.T) {
provider := &mockClientProvider{
clients: map[types.AccountID]*embed.Client{
"account-1": nil,
"account-2": nil,
},
}
checker := newTestChecker(provider, ClientHealth{
Healthy: true,
ManagementConnected: true,
SignalConnected: true,
RelaysConnected: 1,
RelaysTotal: 1,
})
checker.SetManagementConnected(true)
checker.SetInitialSyncComplete()
assert.True(t, checker.StartupProbe(context.Background()),
"startup probe must pass when all clients are healthy")
}
// TestStartupProbe_MixedHealthClients verifies that the startup probe is
// all-or-nothing: a single unhealthy client among otherwise healthy ones must
// keep the probe from passing.
func TestStartupProbe_MixedHealthClients(t *testing.T) {
	provider := &mockClientProvider{
		clients: map[types.AccountID]*embed.Client{
			"healthy-account":   nil, // the stubbed checkHealth never dereferences the client
			"unhealthy-account": nil,
		},
	}
	checker := NewChecker(nil, provider)

	// The stub cannot tell the nil clients apart and map iteration order is
	// unspecified, so health is assigned by call order: the first call reports
	// unhealthy and every later call reports healthy. That yields exactly one
	// unhealthy client. The mutex guards the counter because client health
	// checks may run concurrently (checkSem permits several at once).
	var (
		mu    sync.Mutex
		calls int
	)
	checker.checkHealth = func(_ *embed.Client) ClientHealth {
		mu.Lock()
		defer mu.Unlock()

		calls++
		if calls == 1 {
			return ClientHealth{Healthy: false, Error: "not connected yet"}
		}
		return ClientHealth{Healthy: true}
	}

	checker.SetManagementConnected(true)
	checker.SetInitialSyncComplete()

	assert.False(t, checker.StartupProbe(context.Background()),
		"startup probe must fail if any client is unhealthy")
}
// TestStartupProbe_RequiresAllConditions checks that management connectivity,
// initial sync completion, and client health are each required: the probe
// passes only when all three hold.
func TestStartupProbe_RequiresAllConditions(t *testing.T) {
	provider := &mockClientProvider{
		clients: map[types.AccountID]*embed.Client{"account-1": nil},
	}

	// Fields left at their zero value describe the missing prerequisite.
	cases := []struct {
		name          string
		mgmtConnected bool
		syncComplete  bool
		clientHealthy bool
		want          bool
	}{
		{name: "no management", syncComplete: true, clientHealthy: true},
		{name: "no sync", mgmtConnected: true, clientHealthy: true},
		{name: "unhealthy client", mgmtConnected: true, syncComplete: true},
		{name: "all conditions met", mgmtConnected: true, syncComplete: true, clientHealthy: true, want: true},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			checker := newTestChecker(provider, ClientHealth{Healthy: tc.clientHealthy})
			if tc.mgmtConnected {
				checker.SetManagementConnected(true)
			}
			if tc.syncComplete {
				checker.SetInitialSyncComplete()
			}

			assert.Equal(t, tc.want, checker.StartupProbe(context.Background()))
		})
	}
}
// TestStartupProbe_ConcurrentAccess runs the startup probe from many
// goroutines simultaneously to check for races.
func TestStartupProbe_ConcurrentAccess(t *testing.T) {
provider := &mockClientProvider{
clients: map[types.AccountID]*embed.Client{
"account-1": nil,
"account-2": nil,
},
}
checker := newTestChecker(provider, ClientHealth{Healthy: true})
checker.SetManagementConnected(true)
checker.SetInitialSyncComplete()
var wg sync.WaitGroup
const goroutines = 50
results := make([]bool, goroutines)
for i := range goroutines {
wg.Add(1)
go func(idx int) {
defer wg.Done()
results[idx] = checker.StartupProbe(context.Background())
}(i)
}
wg.Wait()
for i, r := range results {
assert.True(t, r, "goroutine %d got unexpected result", i)
}
}
// TestStartupProbe_CancelledContext runs the startup probe with an
// already-cancelled context, both when the probe fails before any client is
// checked and when a registered client would have to be checked.
func TestStartupProbe_CancelledContext(t *testing.T) {
	// cancelledContext returns a context that has already been cancelled.
	cancelledContext := func() context.Context {
		ctx, cancel := context.WithCancel(context.Background())
		cancel()
		return ctx
	}

	t.Run("no management bypasses context", func(t *testing.T) {
		checker := NewChecker(nil, &mockClientProvider{})

		// Management is not connected, so the result is false whatever the
		// state of the context.
		assert.False(t, checker.StartupProbe(cancelledContext()))
	})

	t.Run("with clients and cancelled context", func(t *testing.T) {
		clients := map[types.AccountID]*embed.Client{"account-1": nil}

		// checkHealth is deliberately left at its default: the cancelled
		// context is expected to stop the check before a client is inspected.
		// NOTE(review): if the check is not short-circuited, the nil client is
		// handed to the default checkHealth; confirm it tolerates nil.
		checker := NewChecker(nil, &mockClientProvider{clients: clients})
		checker.SetManagementConnected(true)
		checker.SetInitialSyncComplete()

		assert.False(t, checker.StartupProbe(cancelledContext()),
			"cancelled context must result in unhealthy when clients exist")
	})
}
// TestHandler_Startup_EmptyServiceList verifies that GET /healthz/startup
// answers 200 with status "ok", all checks true, and no client entries when
// management is connected, the initial sync is complete, and no clients exist.
func TestHandler_Startup_EmptyServiceList(t *testing.T) {
	checker := NewChecker(nil, &mockClientProvider{})
	checker.SetManagementConnected(true)
	checker.SetInitialSyncComplete()

	rec := httptest.NewRecorder()
	checker.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/healthz/startup", nil))

	assert.Equal(t, http.StatusOK, rec.Code)

	var body ProbeResponse
	require.NoError(t, json.NewDecoder(rec.Body).Decode(&body))

	assert.Equal(t, "ok", body.Status)
	assert.True(t, body.Checks["management_connected"])
	assert.True(t, body.Checks["initial_sync_complete"])
	assert.True(t, body.Checks["all_clients_healthy"])
	assert.Empty(t, body.Clients)
}
// TestHandler_Startup_WithUnhealthyClients verifies that the HTTP startup
// endpoint returns 503 when clients exist but are not yet healthy.
func TestHandler_Startup_WithUnhealthyClients(t *testing.T) {
provider := &mockClientProvider{
clients: map[types.AccountID]*embed.Client{
"account-1": nil,
},
}
checker := newTestChecker(provider, ClientHealth{Healthy: false, Error: "starting"})
checker.SetManagementConnected(true)
checker.SetInitialSyncComplete()
handler := checker.Handler()
req := httptest.NewRequest(http.MethodGet, "/healthz/startup", nil)
rec := httptest.NewRecorder()
handler.ServeHTTP(rec, req)
assert.Equal(t, http.StatusServiceUnavailable, rec.Code)
var resp ProbeResponse
require.NoError(t, json.NewDecoder(rec.Body).Decode(&resp))
assert.Equal(t, "fail", resp.Status)
assert.True(t, resp.Checks["management_connected"])
assert.True(t, resp.Checks["initial_sync_complete"])
assert.False(t, resp.Checks["all_clients_healthy"])
require.Contains(t, resp.Clients, types.AccountID("account-1"))
assert.Equal(t, "starting", resp.Clients["account-1"].Error)
}
// TestHandler_Startup_WithHealthyClients verifies the HTTP startup endpoint
// returns 200 once clients are healthy.
func TestHandler_Startup_WithHealthyClients(t *testing.T) {
provider := &mockClientProvider{
clients: map[types.AccountID]*embed.Client{
"account-1": nil,
},
}
checker := newTestChecker(provider, ClientHealth{
Healthy: true,
ManagementConnected: true,
SignalConnected: true,
RelaysConnected: 1,
RelaysTotal: 1,
})
checker.SetManagementConnected(true)
checker.SetInitialSyncComplete()
handler := checker.Handler()
req := httptest.NewRequest(http.MethodGet, "/healthz/startup", nil)
rec := httptest.NewRecorder()
handler.ServeHTTP(rec, req)
assert.Equal(t, http.StatusOK, rec.Code)
var resp ProbeResponse
require.NoError(t, json.NewDecoder(rec.Body).Decode(&resp))
assert.Equal(t, "ok", resp.Status)
assert.True(t, resp.Checks["all_clients_healthy"])
}
// TestHandler_Startup_NotComplete verifies that GET /healthz/startup answers
// 503 with status "fail" on a freshly created checker, before management
// connectivity or sync completion has been signalled.
func TestHandler_Startup_NotComplete(t *testing.T) {
	checker := NewChecker(nil, &mockClientProvider{})

	rec := httptest.NewRecorder()
	checker.Handler().ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/healthz/startup", nil))

	assert.Equal(t, http.StatusServiceUnavailable, rec.Code)

	var body ProbeResponse
	require.NoError(t, json.NewDecoder(rec.Body).Decode(&body))
	assert.Equal(t, "fail", body.Status)
}
func TestChecker_Handler_Liveness(t *testing.T) {
@@ -107,21 +386,6 @@ func TestChecker_Handler_Readiness_Ready(t *testing.T) {
assert.True(t, resp.Checks["management_connected"])
}
func TestChecker_Handler_Startup_NotComplete(t *testing.T) {
checker := NewChecker(nil, &mockClientProvider{})
handler := checker.Handler()
req := httptest.NewRequest(http.MethodGet, "/healthz/startup", nil)
rec := httptest.NewRecorder()
handler.ServeHTTP(rec, req)
assert.Equal(t, http.StatusServiceUnavailable, rec.Code)
var resp ProbeResponse
require.NoError(t, json.NewDecoder(rec.Body).Decode(&resp))
assert.Equal(t, "fail", resp.Status)
}
func TestChecker_Handler_Full(t *testing.T) {
checker := NewChecker(nil, &mockClientProvider{})
checker.SetManagementConnected(true)
@@ -140,16 +404,3 @@ func TestChecker_Handler_Full(t *testing.T) {
// Clients may be empty map when no clients exist.
assert.Empty(t, resp.Clients)
}
func TestChecker_StartupProbe_RespectsContext(t *testing.T) {
checker := NewChecker(nil, &mockClientProvider{})
checker.SetManagementConnected(true)
checker.SetInitialSyncComplete()
// Cancelled context should return false quickly
ctx, cancel := context.WithCancel(context.Background())
cancel()
result := checker.StartupProbe(ctx)
assert.False(t, result)
}