Files
netbird/proxy/internal/acme/locker_k8s.go
Pascal Fischer f53155562f [management, reverse proxy] Add reverse proxy feature (#5291)
* implement reverse proxy


---------

Co-authored-by: Alisdair MacLeod <git@alisdairmacleod.co.uk>
Co-authored-by: mlsmaycon <mlsmaycon@gmail.com>
Co-authored-by: Eduard Gert <kontakt@eduardgert.de>
Co-authored-by: Viktor Liu <viktor@netbird.io>
Co-authored-by: Diego Noguês <diego.sure@gmail.com>
Co-authored-by: Diego Noguês <49420+diegocn@users.noreply.github.com>
Co-authored-by: Bethuel Mmbaga <bethuelmbaga12@gmail.com>
Co-authored-by: Zoltan Papp <zoltan.pmail@gmail.com>
Co-authored-by: Ashley Mensah <ashleyamo982@gmail.com>
2026-02-13 19:37:43 +01:00

198 lines
5.2 KiB
Go

package acme
import (
"context"
"errors"
"fmt"
"os"
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/proxy/internal/k8s"
)
const (
// leaseDurationSec is the Kubernetes Lease TTL. If the holder crashes without
// releasing the lock, other replicas must wait this long before taking over.
// This is intentionally generous: in the worst case two replicas may both
// issue an ACME request for the same domain, which is harmless (the CA
// deduplicates and the cache converges).
leaseDurationSec = 300
retryBaseBackoff = 500 * time.Millisecond
retryMaxBackoff = 10 * time.Second
)
type k8sLeaseLocker struct {
client *k8s.LeaseClient
identity string
logger *log.Logger
}
func newK8sLeaseLocker(logger *log.Logger) (*k8sLeaseLocker, error) {
client, err := k8s.NewLeaseClient()
if err != nil {
return nil, fmt.Errorf("create k8s lease client: %w", err)
}
identity, err := os.Hostname()
if err != nil {
return nil, fmt.Errorf("get hostname: %w", err)
}
return &k8sLeaseLocker{
client: client,
identity: identity,
logger: logger,
}, nil
}
// Lock acquires a Kubernetes Lease for the given domain using optimistic
// concurrency. It retries with exponential backoff until the lease is
// acquired or the context is cancelled.
func (l *k8sLeaseLocker) Lock(ctx context.Context, domain string) (func(), error) {
leaseName := k8s.LeaseNameForDomain(domain)
backoff := retryBaseBackoff
for {
acquired, err := l.tryAcquire(ctx, leaseName, domain)
if err != nil {
return nil, fmt.Errorf("acquire lease %s for %q: %w", leaseName, domain, err)
}
if acquired {
l.logger.Debugf("k8s lease %s acquired for domain %q", leaseName, domain)
return l.unlockFunc(leaseName, domain), nil
}
l.logger.Debugf("k8s lease %s held by another replica, retrying in %s", leaseName, backoff)
timer := time.NewTimer(backoff)
select {
case <-ctx.Done():
timer.Stop()
return nil, ctx.Err()
case <-timer.C:
}
backoff *= 2
if backoff > retryMaxBackoff {
backoff = retryMaxBackoff
}
}
}
// tryAcquire attempts to create or take over a Lease. Returns (true, nil)
// on success, (false, nil) if the lease is held and not stale, or an error.
func (l *k8sLeaseLocker) tryAcquire(ctx context.Context, name, domain string) (bool, error) {
existing, err := l.client.Get(ctx, name)
if err != nil {
return false, err
}
now := k8s.MicroTime{Time: time.Now().UTC()}
dur := int32(leaseDurationSec)
if existing == nil {
lease := &k8s.Lease{
Metadata: k8s.LeaseMetadata{
Name: name,
Annotations: map[string]string{
"netbird.io/domain": domain,
},
},
Spec: k8s.LeaseSpec{
HolderIdentity: &l.identity,
LeaseDurationSeconds: &dur,
AcquireTime: &now,
RenewTime: &now,
},
}
if _, err := l.client.Create(ctx, lease); errors.Is(err, k8s.ErrConflict) {
return false, nil
} else if err != nil {
return false, err
}
return true, nil
}
if !l.canTakeover(existing) {
return false, nil
}
existing.Spec.HolderIdentity = &l.identity
existing.Spec.LeaseDurationSeconds = &dur
existing.Spec.AcquireTime = &now
existing.Spec.RenewTime = &now
if _, err := l.client.Update(ctx, existing); errors.Is(err, k8s.ErrConflict) {
return false, nil
} else if err != nil {
return false, err
}
return true, nil
}
// canTakeover returns true if the lease is free (no holder) or stale
// (renewTime + leaseDuration has passed).
func (l *k8sLeaseLocker) canTakeover(lease *k8s.Lease) bool {
holder := lease.Spec.HolderIdentity
if holder == nil || *holder == "" {
return true
}
// We already hold it (e.g. from a previous crashed attempt).
if *holder == l.identity {
return true
}
if lease.Spec.RenewTime == nil || lease.Spec.LeaseDurationSeconds == nil {
return true
}
expiry := lease.Spec.RenewTime.Add(time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second)
if time.Now().After(expiry) {
l.logger.Infof("k8s lease %s held by %q is stale (expired %s ago), taking over",
lease.Metadata.Name, *holder, time.Since(expiry).Round(time.Second))
return true
}
return false
}
// unlockFunc returns a closure that releases the lease by clearing the holder.
func (l *k8sLeaseLocker) unlockFunc(name, domain string) func() {
return func() {
// Use a fresh context: the parent may already be cancelled.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Re-GET to get current resourceVersion (ours may be stale if
// the lock was held for a long time and something updated it).
current, err := l.client.Get(ctx, name)
if err != nil {
l.logger.Debugf("release k8s lease %s for %q: get: %v", name, domain, err)
return
}
if current == nil {
return
}
// Only clear if we're still the holder.
if current.Spec.HolderIdentity == nil || *current.Spec.HolderIdentity != l.identity {
l.logger.Debugf("k8s lease %s for %q: holder changed to %v, skip release",
name, domain, current.Spec.HolderIdentity)
return
}
empty := ""
current.Spec.HolderIdentity = &empty
current.Spec.AcquireTime = nil
current.Spec.RenewTime = nil
if _, err := l.client.Update(ctx, current); err != nil {
l.logger.Debugf("release k8s lease %s for %q: update: %v", name, domain, err)
}
}
}