mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 08:16:39 +00:00
Improve mgmt backoff
This commit is contained in:
1
go.mod
1
go.mod
@@ -40,7 +40,6 @@ require (
|
|||||||
github.com/c-robinson/iplib v1.0.3
|
github.com/c-robinson/iplib v1.0.3
|
||||||
github.com/caddyserver/certmagic v0.21.3
|
github.com/caddyserver/certmagic v0.21.3
|
||||||
github.com/cilium/ebpf v0.15.0
|
github.com/cilium/ebpf v0.15.0
|
||||||
github.com/cloudflare/backoff v0.0.0-20240920015135-e46b80a3a7d0
|
|
||||||
github.com/coder/websocket v1.8.13
|
github.com/coder/websocket v1.8.13
|
||||||
github.com/coreos/go-iptables v0.7.0
|
github.com/coreos/go-iptables v0.7.0
|
||||||
github.com/coreos/go-oidc/v3 v3.14.1
|
github.com/coreos/go-oidc/v3 v3.14.1
|
||||||
|
|||||||
2
go.sum
2
go.sum
@@ -107,8 +107,6 @@ github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UF
|
|||||||
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||||
github.com/cilium/ebpf v0.15.0 h1:7NxJhNiBT3NG8pZJ3c+yfrVdHY8ScgKD27sScgjLMMk=
|
github.com/cilium/ebpf v0.15.0 h1:7NxJhNiBT3NG8pZJ3c+yfrVdHY8ScgKD27sScgjLMMk=
|
||||||
github.com/cilium/ebpf v0.15.0/go.mod h1:DHp1WyrLeiBh19Cf/tfiSMhqheEiK8fXFZ4No0P1Hso=
|
github.com/cilium/ebpf v0.15.0/go.mod h1:DHp1WyrLeiBh19Cf/tfiSMhqheEiK8fXFZ4No0P1Hso=
|
||||||
github.com/cloudflare/backoff v0.0.0-20240920015135-e46b80a3a7d0 h1:pRcxfaAlK0vR6nOeQs7eAEvjJzdGXl8+KaBlcvpQTyQ=
|
|
||||||
github.com/cloudflare/backoff v0.0.0-20240920015135-e46b80a3a7d0/go.mod h1:rzgs2ZOiguV6/NpiDgADjRLPNyZlApIWxKpkT+X8SdY=
|
|
||||||
github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE=
|
github.com/coder/websocket v1.8.13 h1:f3QZdXy7uGVz+4uCJy2nTZyM0yTBj8yANEHhqlXZ9FE=
|
||||||
github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
|
github.com/coder/websocket v1.8.13/go.mod h1:LNVeNrXQZfe5qhS9ALED3uA+l5pPqvwXg3CKoDBB2gs=
|
||||||
github.com/containerd/containerd v1.7.29 h1:90fWABQsaN9mJhGkoVnuzEY+o1XDPbg9BTC9QTAHnuE=
|
github.com/containerd/containerd v1.7.29 h1:90fWABQsaN9mJhGkoVnuzEY+o1XDPbg9BTC9QTAHnuE=
|
||||||
|
|||||||
@@ -21,7 +21,7 @@ import (
|
|||||||
"path/filepath"
|
"path/filepath"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/cloudflare/backoff"
|
backoff "github.com/cenkalti/backoff/v4"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
@@ -295,12 +295,21 @@ func (s *Server) ListenAndServe(ctx context.Context, addr string) (err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) newManagementMappingWorker(ctx context.Context, client proto.ProxyServiceClient) {
|
func (s *Server) newManagementMappingWorker(ctx context.Context, client proto.ProxyServiceClient) {
|
||||||
b := backoff.New(0, 0)
|
bo := &backoff.ExponentialBackOff{
|
||||||
initialSyncDone := false
|
InitialInterval: 800 * time.Millisecond,
|
||||||
for {
|
RandomizationFactor: 1,
|
||||||
s.Logger.Debug("Getting mapping updates from management server")
|
Multiplier: 1.7,
|
||||||
|
MaxInterval: 10 * time.Second,
|
||||||
|
MaxElapsedTime: 0, // retry indefinitely until context is canceled
|
||||||
|
Stop: backoff.Stop,
|
||||||
|
Clock: backoff.SystemClock,
|
||||||
|
}
|
||||||
|
|
||||||
|
initialSyncDone := false
|
||||||
|
|
||||||
|
operation := func() error {
|
||||||
|
s.Logger.Debug("connecting to management mapping stream")
|
||||||
|
|
||||||
// Mark management as disconnected while we're attempting to reconnect.
|
|
||||||
if s.healthChecker != nil {
|
if s.healthChecker != nil {
|
||||||
s.healthChecker.SetManagementConnected(false)
|
s.healthChecker.SetManagementConnected(false)
|
||||||
}
|
}
|
||||||
@@ -312,47 +321,36 @@ func (s *Server) newManagementMappingWorker(ctx context.Context, client proto.Pr
|
|||||||
Address: s.ProxyURL,
|
Address: s.ProxyURL,
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
s.Logger.WithError(err).Warn("Could not get mapping updates, will retry")
|
return fmt.Errorf("create mapping stream: %w", err)
|
||||||
backoffDuration := b.Duration()
|
|
||||||
s.Logger.WithFields(log.Fields{
|
|
||||||
"backoff": backoffDuration,
|
|
||||||
"error": err,
|
|
||||||
}).Error("Unable to create mapping client to management server, retrying connection after backoff")
|
|
||||||
time.Sleep(backoffDuration)
|
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Mark management as connected once stream is established.
|
|
||||||
if s.healthChecker != nil {
|
if s.healthChecker != nil {
|
||||||
s.healthChecker.SetManagementConnected(true)
|
s.healthChecker.SetManagementConnected(true)
|
||||||
}
|
}
|
||||||
s.Logger.Debug("Got mapping updates client from management server")
|
s.Logger.Debug("management mapping stream established")
|
||||||
|
|
||||||
err = s.handleMappingStream(ctx, mappingClient, &initialSyncDone)
|
// Stream established — reset backoff so the next failure retries quickly.
|
||||||
|
bo.Reset()
|
||||||
|
|
||||||
|
streamErr := s.handleMappingStream(ctx, mappingClient, &initialSyncDone)
|
||||||
|
|
||||||
if s.healthChecker != nil {
|
if s.healthChecker != nil {
|
||||||
s.healthChecker.SetManagementConnected(false)
|
s.healthChecker.SetManagementConnected(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
backoffDuration := b.Duration()
|
if streamErr == nil {
|
||||||
switch {
|
return fmt.Errorf("stream closed by server")
|
||||||
case errors.Is(err, context.Canceled),
|
|
||||||
errors.Is(err, context.DeadlineExceeded):
|
|
||||||
// Context is telling us that it is time to quit so gracefully exit here.
|
|
||||||
// No need to log the error as it is a parent context causing this return.
|
|
||||||
s.Logger.Debugf("Got context error, will exit loop: %v", err)
|
|
||||||
return
|
|
||||||
case err != nil:
|
|
||||||
// Log the error and then retry the connection.
|
|
||||||
s.Logger.WithFields(log.Fields{
|
|
||||||
"backoff": backoffDuration,
|
|
||||||
"error": err,
|
|
||||||
}).Error("Error processing mapping stream from management server, retrying connection after backoff")
|
|
||||||
default:
|
|
||||||
// TODO: should this really be at error level? Maybe, if you start getting lots of these this could be an indication of connectivity issues.
|
|
||||||
s.Logger.WithField("backoff", backoffDuration).Error("Management mapping connection terminated by the server, retrying connection after backoff")
|
|
||||||
}
|
}
|
||||||
time.Sleep(backoffDuration)
|
|
||||||
|
return fmt.Errorf("mapping stream: %w", streamErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
notify := func(err error, next time.Duration) {
|
||||||
|
s.Logger.Warnf("management connection failed, retrying in %s: %v", next.Truncate(time.Millisecond), err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := backoff.RetryNotify(operation, backoff.WithContext(bo, ctx), notify); err != nil {
|
||||||
|
s.Logger.WithError(err).Debug("management mapping worker exiting")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user