[management, reverse proxy] Add reverse proxy feature (#5291)

* implement reverse proxy


---------

Co-authored-by: Alisdair MacLeod <git@alisdairmacleod.co.uk>
Co-authored-by: mlsmaycon <mlsmaycon@gmail.com>
Co-authored-by: Eduard Gert <kontakt@eduardgert.de>
Co-authored-by: Viktor Liu <viktor@netbird.io>
Co-authored-by: Diego Noguês <diego.sure@gmail.com>
Co-authored-by: Diego Noguês <49420+diegocn@users.noreply.github.com>
Co-authored-by: Bethuel Mmbaga <bethuelmbaga12@gmail.com>
Co-authored-by: Zoltan Papp <zoltan.pmail@gmail.com>
Co-authored-by: Ashley Mensah <ashleyamo982@gmail.com>
commit f53155562f (parent edce11b34d)
Author: Pascal Fischer
Date: 2026-02-13 19:37:43 +01:00 (committed by GitHub)
225 changed files with 35513 additions and 235 deletions


@@ -0,0 +1,102 @@
package acme
import (
"context"
"path/filepath"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/proxy/internal/flock"
"github.com/netbirdio/netbird/proxy/internal/k8s"
)
// certLocker provides distributed mutual exclusion for certificate operations.
// Implementations must be safe for concurrent use from multiple goroutines.
type certLocker interface {
// Lock acquires an exclusive lock for the given domain.
// It blocks until the lock is acquired, the context is cancelled, or an
// unrecoverable error occurs. The returned function releases the lock;
// callers must call it exactly once when the critical section is complete.
Lock(ctx context.Context, domain string) (unlock func(), err error)
}
// CertLockMethod controls how ACME certificate locks are coordinated.
type CertLockMethod string
const (
// CertLockAuto detects the environment and selects k8s-lease if running
// in a Kubernetes pod, otherwise flock.
CertLockAuto CertLockMethod = "auto"
// CertLockFlock uses advisory file locks via flock(2).
CertLockFlock CertLockMethod = "flock"
// CertLockK8sLease uses Kubernetes coordination Leases.
CertLockK8sLease CertLockMethod = "k8s-lease"
)
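// newCertLocker selects a certLocker implementation for the configured
// method. With CertLockAuto (or an empty method) it probes for a Kubernetes
// environment; if creating the lease locker fails, it falls back to flock.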
func newCertLocker(method CertLockMethod, certDir string, logger *log.Logger) certLocker {
if logger == nil {
logger = log.StandardLogger()
}
if method == "" || method == CertLockAuto {
if k8s.InCluster() {
method = CertLockK8sLease
} else {
method = CertLockFlock
}
logger.Infof("auto-detected cert lock method: %s", method)
}
switch method {
case CertLockK8sLease:
locker, err := newK8sLeaseLocker(logger)
if err != nil {
logger.Warnf("create k8s lease locker, falling back to flock: %v", err)
return newFlockLocker(certDir, logger)
}
logger.Infof("using k8s lease locker in namespace %s", locker.client.Namespace())
return locker
default:
logger.Infof("using flock cert locker in %s", certDir)
return newFlockLocker(certDir, logger)
}
}
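// flockLocker serializes certificate operations between processes on the
// same host via per-domain advisory file locks created in certDir.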
type flockLocker struct {
certDir string
logger *log.Logger
}
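// newFlockLocker creates a flock-based locker rooted at certDir, defaulting
// to the standard logger when none is provided.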
func newFlockLocker(certDir string, logger *log.Logger) *flockLocker {
if logger == nil {
logger = log.StandardLogger()
}
return &flockLocker{certDir: certDir, logger: logger}
}
// Lock acquires an advisory file lock for the given domain.
func (l *flockLocker) Lock(ctx context.Context, domain string) (func(), error) {
lockPath := filepath.Join(l.certDir, domain+".lock")
lockFile, err := flock.Lock(ctx, lockPath)
if err != nil {
return nil, err
}
// A nil lockFile means locking is not supported on this platform (non-Unix).
if lockFile == nil {
return func() { /* no-op: locking unsupported on this platform */ }, nil
}
return func() {
if err := flock.Unlock(lockFile); err != nil {
l.logger.Debugf("release cert lock for domain %q: %v", domain, err)
}
}, nil
}
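// noopLocker disables cross-replica coordination entirely; every Lock call
// succeeds immediately with a no-op release function.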
type noopLocker struct{}
// Lock is a no-op that always succeeds immediately.
func (noopLocker) Lock(context.Context, string) (func(), error) {
return func() { /* no-op: locker disabled */ }, nil
}


@@ -0,0 +1,197 @@
package acme
import (
"context"
"errors"
"fmt"
"os"
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/proxy/internal/k8s"
)
const (
// leaseDurationSec is the Kubernetes Lease TTL. If the holder crashes without
// releasing the lock, other replicas must wait this long before taking over.
// This is intentionally generous: in the worst case two replicas may both
// issue an ACME request for the same domain, which is harmless (the CA
// deduplicates and the cache converges).
leaseDurationSec = 300
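// Acquisition retries back off exponentially from the base value and are
// capped at the max (500ms, 1s, 2s, 4s, 8s, then 10s per attempt).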
retryBaseBackoff = 500 * time.Millisecond
retryMaxBackoff = 10 * time.Second
)
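// k8sLeaseLocker coordinates certificate issuance across replicas using
// Kubernetes coordination.k8s.io Leases, with the pod hostname as holder
// identity.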
type k8sLeaseLocker struct {
client *k8s.LeaseClient
identity string
logger *log.Logger
}
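// newK8sLeaseLocker builds a lease-based locker using the cluster's Lease
// API and the pod hostname as the holder identity.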
func newK8sLeaseLocker(logger *log.Logger) (*k8sLeaseLocker, error) {
client, err := k8s.NewLeaseClient()
if err != nil {
return nil, fmt.Errorf("create k8s lease client: %w", err)
}
identity, err := os.Hostname()
if err != nil {
return nil, fmt.Errorf("get hostname: %w", err)
}
return &k8sLeaseLocker{
client: client,
identity: identity,
logger: logger,
}, nil
}
// Lock acquires a Kubernetes Lease for the given domain using optimistic
// concurrency. It retries with exponential backoff until the lease is
// acquired or the context is cancelled.
func (l *k8sLeaseLocker) Lock(ctx context.Context, domain string) (func(), error) {
leaseName := k8s.LeaseNameForDomain(domain)
backoff := retryBaseBackoff
for {
acquired, err := l.tryAcquire(ctx, leaseName, domain)
if err != nil {
return nil, fmt.Errorf("acquire lease %s for %q: %w", leaseName, domain, err)
}
if acquired {
l.logger.Debugf("k8s lease %s acquired for domain %q", leaseName, domain)
return l.unlockFunc(leaseName, domain), nil
}
l.logger.Debugf("k8s lease %s held by another replica, retrying in %s", leaseName, backoff)
timer := time.NewTimer(backoff)
select {
case <-ctx.Done():
timer.Stop()
return nil, ctx.Err()
case <-timer.C:
}
backoff *= 2
if backoff > retryMaxBackoff {
backoff = retryMaxBackoff
}
}
}
// tryAcquire attempts to create or take over a Lease. Returns (true, nil)
// on success, (false, nil) if the lease is held and not stale, or an error.
func (l *k8sLeaseLocker) tryAcquire(ctx context.Context, name, domain string) (bool, error) {
existing, err := l.client.Get(ctx, name)
if err != nil {
return false, err
}
now := k8s.MicroTime{Time: time.Now().UTC()}
dur := int32(leaseDurationSec)
if existing == nil {
lease := &k8s.Lease{
Metadata: k8s.LeaseMetadata{
Name: name,
Annotations: map[string]string{
"netbird.io/domain": domain,
},
},
Spec: k8s.LeaseSpec{
HolderIdentity: &l.identity,
LeaseDurationSeconds: &dur,
AcquireTime: &now,
RenewTime: &now,
},
}
if _, err := l.client.Create(ctx, lease); errors.Is(err, k8s.ErrConflict) {
return false, nil
} else if err != nil {
return false, err
}
return true, nil
}
if !l.canTakeover(existing) {
return false, nil
}
existing.Spec.HolderIdentity = &l.identity
existing.Spec.LeaseDurationSeconds = &dur
existing.Spec.AcquireTime = &now
existing.Spec.RenewTime = &now
if _, err := l.client.Update(ctx, existing); errors.Is(err, k8s.ErrConflict) {
return false, nil
} else if err != nil {
return false, err
}
return true, nil
}
// canTakeover returns true if the lease is free (no holder) or stale
// (renewTime + leaseDuration has passed).
func (l *k8sLeaseLocker) canTakeover(lease *k8s.Lease) bool {
holder := lease.Spec.HolderIdentity
if holder == nil || *holder == "" {
return true
}
// We already hold it (e.g. from a previous crashed attempt).
if *holder == l.identity {
return true
}
if lease.Spec.RenewTime == nil || lease.Spec.LeaseDurationSeconds == nil {
return true
}
expiry := lease.Spec.RenewTime.Add(time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second)
if time.Now().After(expiry) {
l.logger.Infof("k8s lease %s held by %q is stale (expired %s ago), taking over",
lease.Metadata.Name, *holder, time.Since(expiry).Round(time.Second))
return true
}
return false
}
// unlockFunc returns a closure that releases the lease by clearing the holder.
func (l *k8sLeaseLocker) unlockFunc(name, domain string) func() {
return func() {
// Use a fresh context: the parent may already be cancelled.
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Re-fetch the lease to obtain its current resourceVersion (ours may be
// stale if the lock was held for a long time and something updated it).
current, err := l.client.Get(ctx, name)
if err != nil {
l.logger.Debugf("release k8s lease %s for %q: get: %v", name, domain, err)
return
}
if current == nil {
return
}
// Only clear if we're still the holder.
if current.Spec.HolderIdentity == nil || *current.Spec.HolderIdentity != l.identity {
l.logger.Debugf("k8s lease %s for %q: holder changed to %v, skip release",
name, domain, current.Spec.HolderIdentity)
return
}
empty := ""
current.Spec.HolderIdentity = &empty
current.Spec.AcquireTime = nil
current.Spec.RenewTime = nil
if _, err := l.client.Update(ctx, current); err != nil {
l.logger.Debugf("release k8s lease %s for %q: update: %v", name, domain, err)
}
}
}


@@ -0,0 +1,65 @@
package acme
import (
"context"
"os"
"path/filepath"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestFlockLockerRoundTrip(t *testing.T) {
dir := t.TempDir()
locker := newFlockLocker(dir, nil)
unlock, err := locker.Lock(context.Background(), "example.com")
require.NoError(t, err)
require.NotNil(t, unlock)
// Lock file should exist.
assert.FileExists(t, filepath.Join(dir, "example.com.lock"))
unlock()
}
func TestNoopLocker(t *testing.T) {
locker := noopLocker{}
unlock, err := locker.Lock(context.Background(), "example.com")
require.NoError(t, err)
require.NotNil(t, unlock)
unlock()
}
func TestNewCertLockerDefaultsToFlock(t *testing.T) {
dir := t.TempDir()
// t.Setenv registers cleanup to restore the original value.
// os.Unsetenv is needed because the production code uses LookupEnv,
// which distinguishes "empty" from "not set".
t.Setenv("KUBERNETES_SERVICE_HOST", "")
os.Unsetenv("KUBERNETES_SERVICE_HOST")
locker := newCertLocker(CertLockAuto, dir, nil)
_, ok := locker.(*flockLocker)
assert.True(t, ok, "auto without k8s env should select flockLocker")
}
func TestNewCertLockerExplicitFlock(t *testing.T) {
dir := t.TempDir()
locker := newCertLocker(CertLockFlock, dir, nil)
_, ok := locker.(*flockLocker)
assert.True(t, ok, "explicit flock should select flockLocker")
}
func TestNewCertLockerK8sFallsBackToFlock(t *testing.T) {
dir := t.TempDir()
// k8s-lease without SA files should fall back to flock.
locker := newCertLocker(CertLockK8sLease, dir, nil)
_, ok := locker.(*flockLocker)
assert.True(t, ok, "k8s-lease without SA should fall back to flockLocker")
}


@@ -0,0 +1,336 @@
package acme
import (
"context"
"crypto/tls"
"crypto/x509"
"encoding/asn1"
"encoding/binary"
"fmt"
"net"
"slices"
"sync"
"time"
log "github.com/sirupsen/logrus"
"golang.org/x/crypto/acme"
"golang.org/x/crypto/acme/autocert"
"github.com/netbirdio/netbird/shared/management/domain"
)
// OID for the SCT list extension (1.3.6.1.4.1.11129.2.4.2)
var oidSCTList = asn1.ObjectIdentifier{1, 3, 6, 1, 4, 1, 11129, 2, 4, 2}
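// certificateNotifier receives a callback once a certificate has been issued
// for a service's domain.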
type certificateNotifier interface {
NotifyCertificateIssued(ctx context.Context, accountID, serviceID, domain string) error
}
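// domainState is the lifecycle state of a domain's certificate prefetch.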
type domainState int
const (
domainPending domainState = iota
domainReady
domainFailed
)
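// domainInfo tracks ownership (account and service) and prefetch progress
// for a registered domain.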
type domainInfo struct {
accountID string
serviceID string
state domainState
err string
}
// Manager wraps autocert.Manager with domain tracking and cross-replica
// coordination via a pluggable locking strategy. The locker prevents
// duplicate ACME requests when multiple replicas share a certificate cache.
type Manager struct {
*autocert.Manager
certDir string
locker certLocker
mu sync.RWMutex
domains map[domain.Domain]*domainInfo
certNotifier certificateNotifier
logger *log.Logger
}
// NewManager creates a new ACME certificate manager. The certDir is used
// for caching certificates. The lockMethod controls cross-replica
// coordination strategy (see CertLockMethod constants).
func NewManager(certDir, acmeURL string, notifier certificateNotifier, logger *log.Logger, lockMethod CertLockMethod) *Manager {
if logger == nil {
logger = log.StandardLogger()
}
mgr := &Manager{
certDir: certDir,
locker: newCertLocker(lockMethod, certDir, logger),
domains: make(map[domain.Domain]*domainInfo),
certNotifier: notifier,
logger: logger,
}
mgr.Manager = &autocert.Manager{
Prompt: autocert.AcceptTOS,
HostPolicy: mgr.hostPolicy,
Cache: autocert.DirCache(certDir),
Client: &acme.Client{
DirectoryURL: acmeURL,
},
}
return mgr
}
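// An illustrative wiring sketch (not part of this file; the paths, accountID
// and serviceID are placeholders, and acme.LetsEncryptURL is the directory
// constant from golang.org/x/crypto/acme):
//
//	mgr := NewManager("/var/lib/netbird-proxy/certs", acme.LetsEncryptURL, nil, nil, CertLockAuto)
//	mgr.AddDomain("app.example.com", accountID, serviceID)
//	server := &http.Server{Addr: ":443", TLSConfig: mgr.TLSConfig()}
//
// TLSConfig is promoted from the embedded autocert.Manager, so certificate
// lookups flow through this manager's host policy.

// hostPolicy permits issuance only for domains registered via AddDomain;
// a port, if present, is stripped before the lookup.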
func (mgr *Manager) hostPolicy(_ context.Context, host string) error {
if h, _, err := net.SplitHostPort(host); err == nil {
host = h
}
mgr.mu.RLock()
_, exists := mgr.domains[domain.Domain(host)]
mgr.mu.RUnlock()
if !exists {
return fmt.Errorf("unknown domain %q", host)
}
return nil
}
// AddDomain registers a domain for ACME certificate prefetching.
func (mgr *Manager) AddDomain(d domain.Domain, accountID, serviceID string) {
mgr.mu.Lock()
mgr.domains[d] = &domainInfo{
accountID: accountID,
serviceID: serviceID,
state: domainPending,
}
mgr.mu.Unlock()
go mgr.prefetchCertificate(d)
}
// prefetchCertificate proactively triggers certificate generation for a domain.
// It acquires a distributed lock to prevent multiple replicas from issuing
// duplicate ACME requests. The second replica will block until the first
// finishes, then find the certificate in the cache.
func (mgr *Manager) prefetchCertificate(d domain.Domain) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
name := d.PunycodeString()
mgr.logger.Infof("acquiring cert lock for domain %q", name)
lockStart := time.Now()
unlock, err := mgr.locker.Lock(ctx, name)
if err != nil {
mgr.logger.Warnf("acquire cert lock for domain %q, proceeding without lock: %v", name, err)
} else {
mgr.logger.Infof("acquired cert lock for domain %q in %s", name, time.Since(lockStart))
defer unlock()
}
hello := &tls.ClientHelloInfo{
ServerName: name,
Conn: &dummyConn{ctx: ctx},
}
start := time.Now()
cert, err := mgr.GetCertificate(hello)
elapsed := time.Since(start)
if err != nil {
mgr.logger.Warnf("prefetch certificate for domain %q: %v", name, err)
mgr.setDomainState(d, domainFailed, err.Error())
return
}
mgr.setDomainState(d, domainReady, "")
now := time.Now()
if cert != nil && cert.Leaf != nil {
leaf := cert.Leaf
mgr.logger.Infof("certificate for domain %q ready in %s: serial=%s SANs=%v notBefore=%s, notAfter=%s, now=%s",
name, elapsed.Round(time.Millisecond),
leaf.SerialNumber.Text(16),
leaf.DNSNames,
leaf.NotBefore.UTC().Format(time.RFC3339),
leaf.NotAfter.UTC().Format(time.RFC3339),
now.UTC().Format(time.RFC3339),
)
mgr.logCertificateDetails(name, leaf, now)
} else {
mgr.logger.Infof("certificate for domain %q ready in %s", name, elapsed.Round(time.Millisecond))
}
mgr.mu.RLock()
info := mgr.domains[d]
mgr.mu.RUnlock()
if info != nil && mgr.certNotifier != nil {
if err := mgr.certNotifier.NotifyCertificateIssued(ctx, info.accountID, info.serviceID, name); err != nil {
mgr.logger.Warnf("notify certificate ready for domain %q: %v", name, err)
}
}
}
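// setDomainState records the outcome of a prefetch attempt for a domain that
// is still registered; removed domains are ignored.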
func (mgr *Manager) setDomainState(d domain.Domain, state domainState, errMsg string) {
mgr.mu.Lock()
defer mgr.mu.Unlock()
if info, ok := mgr.domains[d]; ok {
info.state = state
info.err = errMsg
}
}
// logCertificateDetails logs certificate validity and SCT timestamps.
func (mgr *Manager) logCertificateDetails(domain string, cert *x509.Certificate, now time.Time) {
if cert.NotBefore.After(now) {
mgr.logger.Warnf("certificate for %q NotBefore is in the future by %v", domain, cert.NotBefore.Sub(now))
}
sctTimestamps := mgr.parseSCTTimestamps(cert)
if len(sctTimestamps) == 0 {
return
}
for i, sctTime := range sctTimestamps {
if sctTime.After(now) {
mgr.logger.Warnf("certificate for %q SCT[%d] timestamp is in the future: %v (by %v)",
domain, i, sctTime.UTC(), sctTime.Sub(now))
} else {
mgr.logger.Debugf("certificate for %q SCT[%d] timestamp: %v (%v in the past)",
domain, i, sctTime.UTC(), now.Sub(sctTime))
}
}
}
// parseSCTTimestamps extracts SCT timestamps from a certificate.
func (mgr *Manager) parseSCTTimestamps(cert *x509.Certificate) []time.Time {
var timestamps []time.Time
for _, ext := range cert.Extensions {
if !ext.Id.Equal(oidSCTList) {
continue
}
// The extension value is an OCTET STRING containing the SCT list
var sctListBytes []byte
if _, err := asn1.Unmarshal(ext.Value, &sctListBytes); err != nil {
mgr.logger.Debugf("failed to unmarshal SCT list outer wrapper: %v", err)
continue
}
// SCT list format: 2-byte length prefix, then concatenated SCTs
if len(sctListBytes) < 2 {
continue
}
listLen := int(binary.BigEndian.Uint16(sctListBytes[:2]))
data := sctListBytes[2:]
if len(data) < listLen {
continue
}
// Parse individual SCTs
offset := 0
for offset < listLen {
if offset+2 > len(data) {
break
}
sctLen := int(binary.BigEndian.Uint16(data[offset : offset+2]))
offset += 2
if offset+sctLen > len(data) {
break
}
sctData := data[offset : offset+sctLen]
offset += sctLen
// SCT format: version (1) + log_id (32) + timestamp (8) + ...
if len(sctData) < 41 {
continue
}
// Timestamp is at offset 33 (after version + log_id), 8 bytes, milliseconds since epoch
tsMillis := binary.BigEndian.Uint64(sctData[33:41])
ts := time.UnixMilli(int64(tsMillis))
timestamps = append(timestamps, ts)
}
}
return timestamps
}
// dummyConn implements net.Conn to provide context for certificate fetching.
type dummyConn struct {
ctx context.Context
}
func (c *dummyConn) Read(b []byte) (n int, err error) { return 0, nil }
func (c *dummyConn) Write(b []byte) (n int, err error) { return len(b), nil }
func (c *dummyConn) Close() error { return nil }
func (c *dummyConn) LocalAddr() net.Addr { return nil }
func (c *dummyConn) RemoteAddr() net.Addr { return nil }
func (c *dummyConn) SetDeadline(t time.Time) error { return nil }
func (c *dummyConn) SetReadDeadline(t time.Time) error { return nil }
func (c *dummyConn) SetWriteDeadline(t time.Time) error { return nil }
// RemoveDomain removes a domain from tracking.
func (mgr *Manager) RemoveDomain(d domain.Domain) {
mgr.mu.Lock()
defer mgr.mu.Unlock()
delete(mgr.domains, d)
}
// PendingCerts returns the number of certificates currently being prefetched.
func (mgr *Manager) PendingCerts() int {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
var n int
for _, info := range mgr.domains {
if info.state == domainPending {
n++
}
}
return n
}
// TotalDomains returns the total number of registered domains.
func (mgr *Manager) TotalDomains() int {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
return len(mgr.domains)
}
// PendingDomains returns the domain names currently being prefetched.
func (mgr *Manager) PendingDomains() []string {
return mgr.domainsByState(domainPending)
}
// ReadyDomains returns domain names that have successfully obtained certificates.
func (mgr *Manager) ReadyDomains() []string {
return mgr.domainsByState(domainReady)
}
// FailedDomains returns domain names that failed certificate prefetch, mapped to their error.
func (mgr *Manager) FailedDomains() map[string]string {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
result := make(map[string]string)
for d, info := range mgr.domains {
if info.state == domainFailed {
result[d.PunycodeString()] = info.err
}
}
return result
}
func (mgr *Manager) domainsByState(state domainState) []string {
mgr.mu.RLock()
defer mgr.mu.RUnlock()
var domains []string
for d, info := range mgr.domains {
if info.state == state {
domains = append(domains, d.PunycodeString())
}
}
slices.Sort(domains)
return domains
}


@@ -0,0 +1,102 @@
package acme
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestHostPolicy(t *testing.T) {
mgr := NewManager(t.TempDir(), "https://acme.example.com/directory", nil, nil, "")
mgr.AddDomain("example.com", "acc1", "rp1")
// Wait for the background prefetch goroutine to finish so the temp dir
// can be cleaned up without a race.
t.Cleanup(func() {
assert.Eventually(t, func() bool {
return mgr.PendingCerts() == 0
}, 30*time.Second, 50*time.Millisecond)
})
tests := []struct {
name string
host string
wantErr bool
}{
{
name: "exact domain match",
host: "example.com",
},
{
name: "domain with port",
host: "example.com:443",
},
{
name: "unknown domain",
host: "unknown.com",
wantErr: true,
},
{
name: "unknown domain with port",
host: "unknown.com:443",
wantErr: true,
},
{
name: "empty host",
host: "",
wantErr: true,
},
{
name: "port only",
host: ":443",
wantErr: true,
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
err := mgr.hostPolicy(context.Background(), tc.host)
if tc.wantErr {
require.Error(t, err)
assert.Contains(t, err.Error(), "unknown domain")
} else {
assert.NoError(t, err)
}
})
}
}
func TestDomainStates(t *testing.T) {
mgr := NewManager(t.TempDir(), "https://acme.example.com/directory", nil, nil, "")
assert.Equal(t, 0, mgr.PendingCerts(), "initially zero")
assert.Equal(t, 0, mgr.TotalDomains(), "initially zero domains")
assert.Empty(t, mgr.PendingDomains())
assert.Empty(t, mgr.ReadyDomains())
assert.Empty(t, mgr.FailedDomains())
// AddDomain starts as pending, then the prefetch goroutine will fail
// (no real ACME server) and transition to failed.
mgr.AddDomain("a.example.com", "acc1", "rp1")
mgr.AddDomain("b.example.com", "acc1", "rp1")
assert.Equal(t, 2, mgr.TotalDomains(), "two domains registered")
// Pending domains should eventually drain after prefetch goroutines finish.
assert.Eventually(t, func() bool {
return mgr.PendingCerts() == 0
}, 30*time.Second, 100*time.Millisecond, "pending certs should return to zero after prefetch completes")
assert.Empty(t, mgr.PendingDomains())
assert.Equal(t, 2, mgr.TotalDomains(), "total domains unchanged")
// With a fake ACME URL, both should have failed.
failed := mgr.FailedDomains()
assert.Len(t, failed, 2, "both domains should have failed")
assert.Contains(t, failed, "a.example.com")
assert.Contains(t, failed, "b.example.com")
assert.Empty(t, mgr.ReadyDomains())
}