Files
netbird/proxy/internal/k8s/lease.go
Pascal Fischer f53155562f [management, reverse proxy] Add reverse proxy feature (#5291)
* implement reverse proxy


---------

Co-authored-by: Alisdair MacLeod <git@alisdairmacleod.co.uk>
Co-authored-by: mlsmaycon <mlsmaycon@gmail.com>
Co-authored-by: Eduard Gert <kontakt@eduardgert.de>
Co-authored-by: Viktor Liu <viktor@netbird.io>
Co-authored-by: Diego Noguês <diego.sure@gmail.com>
Co-authored-by: Diego Noguês <49420+diegocn@users.noreply.github.com>
Co-authored-by: Bethuel Mmbaga <bethuelmbaga12@gmail.com>
Co-authored-by: Zoltan Papp <zoltan.pmail@gmail.com>
Co-authored-by: Ashley Mensah <ashleyamo982@gmail.com>
2026-02-13 19:37:43 +01:00

282 lines
8.1 KiB
Go

// Package k8s provides a lightweight Kubernetes API client for coordination
// Leases. It uses raw HTTP calls against the mounted service account
// credentials, avoiding a dependency on client-go.
package k8s
import (
"bytes"
"context"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"
)
// Locations of the service account files mounted into the pod, plus the
// REST prefix of the coordination API group used for every Lease request.
const (
// saTokenPath is the bearer token file; readToken re-reads it for each request.
saTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token" //nolint:gosec
// saNamespacePath holds the namespace name; NewLeaseClient reads it once.
saNamespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
// saCACertPath is the PEM CA bundle NewLeaseClient loads as the TLS root pool.
saCACertPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
// leaseAPIPath is inserted between the base URL and "/namespaces/<ns>/leases".
leaseAPIPath = "/apis/coordination.k8s.io/v1"
)
// ErrConflict is returned when a Lease write is rejected with HTTP 409
// Conflict. For Update this signals a resourceVersion mismatch (another
// writer updated the object first). Create maps a 409 response to the same
// sentinel. Callers should compare with errors.Is.
var ErrConflict = errors.New("conflict: resource version mismatch")
// Lease represents a coordination.k8s.io/v1 Lease object with only the
// fields needed for distributed locking.
type Lease struct {
// APIVersion and Kind identify the object type on the wire. Create and
// Update overwrite both before sending, so callers may leave them empty.
APIVersion string `json:"apiVersion"`
Kind string `json:"kind"`
// Metadata carries the object's name, namespace, resourceVersion and annotations.
Metadata LeaseMetadata `json:"metadata"`
// Spec carries the holder identity and the lease timing fields.
Spec LeaseSpec `json:"spec"`
}
// LeaseMetadata holds the standard k8s object metadata fields used by Leases.
type LeaseMetadata struct {
// Name is the Lease name; Update uses it to build the request URL.
Name string `json:"name"`
// Namespace is filled in by Create with the client's namespace when empty.
Namespace string `json:"namespace,omitempty"`
// ResourceVersion is the server-assigned version that Update relies on
// for optimistic concurrency; it is omitted from JSON when empty.
ResourceVersion string `json:"resourceVersion,omitempty"`
// Annotations are optional string key/value pairs; omitted from JSON when empty.
Annotations map[string]string `json:"annotations,omitempty"`
}
// LeaseSpec holds the Lease specification fields. All fields are pointers
// so that an absent value can be told apart from a zero value.
type LeaseSpec struct {
// HolderIdentity maps to spec.holderIdentity. The tag has no omitempty,
// so a nil pointer is encoded as JSON null rather than being dropped.
HolderIdentity *string `json:"holderIdentity"`
// LeaseDurationSeconds maps to spec.leaseDurationSeconds; omitted when nil.
LeaseDurationSeconds *int32 `json:"leaseDurationSeconds,omitempty"`
// AcquireTime maps to spec.acquireTime; nil is encoded as JSON null.
AcquireTime *MicroTime `json:"acquireTime"`
// RenewTime maps to spec.renewTime; nil is encoded as JSON null.
RenewTime *MicroTime `json:"renewTime"`
}
// MicroTime wraps time.Time and (de)serializes it using the Kubernetes
// MicroTime JSON representation: a UTC timestamp with exactly six
// fractional-second digits.
type MicroTime struct {
	time.Time
}

// microTimeFormat is the time layout used on the wire. The trailing "Z" is
// a literal, which is why values are converted to UTC before formatting.
const microTimeFormat = "2006-01-02T15:04:05.000000Z"

// Compile-time checks that both the value and the pointer form take part in
// JSON encoding, and that the pointer form takes part in decoding.
var (
	_ json.Marshaler   = MicroTime{}
	_ json.Marshaler   = (*MicroTime)(nil)
	_ json.Unmarshaler = (*MicroTime)(nil)
)

// MarshalJSON implements json.Marshaler with k8s MicroTime format.
//
// The receiver is a value on purpose: a pointer-receiver MarshalJSON hides
// the method promoted from the embedded time.Time, which leaves a plain
// (non-addressable) MicroTime value without any MarshalJSON method, so
// encoding/json would serialize it as an ordinary struct. With a value
// receiver the method is in the method set of both MicroTime and *MicroTime.
func (t MicroTime) MarshalJSON() ([]byte, error) {
	return json.Marshal(t.UTC().Format(microTimeFormat))
}

// UnmarshalJSON implements json.Unmarshaler with k8s MicroTime format.
//
// An empty JSON string (and JSON null, which leaves the intermediate string
// empty) resets the receiver to the zero time. Any other value must match
// microTimeFormat exactly; otherwise a wrapped parse error is returned and
// the receiver is left unchanged.
func (t *MicroTime) UnmarshalJSON(data []byte) error {
	var s string
	if err := json.Unmarshal(data, &s); err != nil {
		return err
	}
	if s == "" {
		t.Time = time.Time{}
		return nil
	}
	parsed, err := time.Parse(microTimeFormat, s)
	if err != nil {
		return fmt.Errorf("parse MicroTime %q: %w", s, err)
	}
	t.Time = parsed
	return nil
}
// LeaseClient talks to the Kubernetes coordination API using raw HTTP.
// Instances are built by NewLeaseClient.
type LeaseClient struct {
// baseURL is the API server origin ("https://host:port"), without a trailing slash.
baseURL string
// namespace is the namespace every Lease request is scoped to.
namespace string
// httpClient is the HTTP client used for all requests; NewLeaseClient
// configures it with a timeout and the cluster CA as TLS root.
httpClient *http.Client
}
// NewLeaseClient creates a client that authenticates via the pod's
// mounted service account.
//
// The API server address is taken from KUBERNETES_SERVICE_HOST and
// KUBERNETES_SERVICE_PORT. The namespace and CA certificate are read once,
// at construction time, and are not refreshed afterwards; the bearer token
// is read fresh on each request (tokens rotate).
//
// It returns an error when either environment variable is empty, when the
// namespace or CA file cannot be read, or when the CA file contains no
// usable PEM certificate.
func NewLeaseClient() (*LeaseClient, error) {
	host := os.Getenv("KUBERNETES_SERVICE_HOST")
	port := os.Getenv("KUBERNETES_SERVICE_PORT")
	if host == "" || port == "" {
		return nil, errors.New("KUBERNETES_SERVICE_HOST/PORT not set")
	}

	ns, err := os.ReadFile(saNamespacePath)
	if err != nil {
		return nil, fmt.Errorf("read namespace from %s: %w", saNamespacePath, err)
	}

	caCert, err := os.ReadFile(saCACertPath)
	if err != nil {
		return nil, fmt.Errorf("read CA cert from %s: %w", saCACertPath, err)
	}
	pool := x509.NewCertPool()
	if !pool.AppendCertsFromPEM(caCert) {
		return nil, fmt.Errorf("parse CA certificate from %s", saCACertPath)
	}

	// On IPv6 clusters the host is a bare IPv6 literal. It has to be wrapped
	// in brackets before a port is appended, otherwise the URL is invalid.
	// This is the same bracketing net.JoinHostPort performs.
	if strings.Contains(host, ":") && !strings.HasPrefix(host, "[") {
		host = "[" + host + "]"
	}

	return &LeaseClient{
		baseURL:   "https://" + host + ":" + port,
		namespace: strings.TrimSpace(string(ns)),
		httpClient: &http.Client{
			// Bound every request even if the caller's context has no deadline.
			Timeout: 10 * time.Second,
			Transport: &http.Transport{
				TLSClientConfig: &tls.Config{
					RootCAs:    pool,
					MinVersion: tls.VersionTLS12,
				},
			},
		},
	}, nil
}
// Namespace returns the namespace this client operates in, i.e. the value
// stored in the client at construction and used in every request URL.
func (c *LeaseClient) Namespace() string {
return c.namespace
}
// Get fetches the Lease called name from the client's namespace.
//
// A 404 response is not an error: it is reported as (nil, nil). A 200
// response is decoded and returned. Every other status is turned into an
// error that carries the status code and a snippet of the response body.
func (c *LeaseClient) Get(ctx context.Context, name string) (*Lease, error) {
	endpoint := c.baseURL + leaseAPIPath + "/namespaces/" + c.namespace + "/leases/" + name

	resp, err := c.doRequest(ctx, http.MethodGet, endpoint, nil)
	if err != nil {
		return nil, err
	}
	defer func() { _ = resp.Body.Close() }()

	switch resp.StatusCode {
	case http.StatusNotFound:
		return nil, nil //nolint:nilnil
	case http.StatusOK:
		found := new(Lease)
		if decodeErr := json.NewDecoder(resp.Body).Decode(found); decodeErr != nil {
			return nil, fmt.Errorf("decode lease response: %w", decodeErr)
		}
		return found, nil
	default:
		return nil, c.readError(resp)
	}
}
// Create posts a new Lease to the client's namespace and returns the object
// as stored by the server, including server-assigned fields such as
// resourceVersion.
//
// The passed lease is modified in place: apiVersion and kind are always set,
// and an empty metadata namespace is replaced with the client's namespace.
// A 409 response yields ErrConflict; any status other than 201 yields an
// error describing the response.
func (c *LeaseClient) Create(ctx context.Context, lease *Lease) (*Lease, error) {
	lease.APIVersion, lease.Kind = "coordination.k8s.io/v1", "Lease"
	if lease.Metadata.Namespace == "" {
		lease.Metadata.Namespace = c.namespace
	}

	endpoint := c.baseURL + leaseAPIPath + "/namespaces/" + c.namespace + "/leases"
	resp, err := c.doRequest(ctx, http.MethodPost, endpoint, lease)
	if err != nil {
		return nil, err
	}
	defer func() { _ = resp.Body.Close() }()

	switch resp.StatusCode {
	case http.StatusConflict:
		return nil, ErrConflict
	case http.StatusCreated:
		stored := new(Lease)
		if decodeErr := json.NewDecoder(resp.Body).Decode(stored); decodeErr != nil {
			return nil, fmt.Errorf("decode created lease: %w", decodeErr)
		}
		return stored, nil
	default:
		return nil, c.readError(resp)
	}
}
// Update replaces an existing Lease via PUT and returns the object as stored
// by the server.
//
// lease.Metadata.ResourceVersion must match the current server value
// (optimistic concurrency); on a 409 response ErrConflict is returned. The
// passed lease is modified in place: apiVersion and kind are always set.
// Any status other than 200 or 409 yields an error describing the response.
func (c *LeaseClient) Update(ctx context.Context, lease *Lease) (*Lease, error) {
	endpoint := c.baseURL + leaseAPIPath + "/namespaces/" + c.namespace + "/leases/" + lease.Metadata.Name
	lease.APIVersion, lease.Kind = "coordination.k8s.io/v1", "Lease"

	resp, err := c.doRequest(ctx, http.MethodPut, endpoint, lease)
	if err != nil {
		return nil, err
	}
	defer func() { _ = resp.Body.Close() }()

	switch resp.StatusCode {
	case http.StatusConflict:
		return nil, ErrConflict
	case http.StatusOK:
		stored := new(Lease)
		if decodeErr := json.NewDecoder(resp.Body).Decode(stored); decodeErr != nil {
			return nil, fmt.Errorf("decode updated lease: %w", decodeErr)
		}
		return stored, nil
	default:
		return nil, c.readError(resp)
	}
}
// doRequest sends one authenticated request to the API server and returns
// the raw response; the caller is responsible for closing its body.
//
// The service account token is read for every call. When body is non-nil it
// is JSON-encoded and sent with a JSON Content-Type; otherwise the request
// has no payload.
func (c *LeaseClient) doRequest(ctx context.Context, method, url string, body any) (*http.Response, error) {
	token, err := readToken()
	if err != nil {
		return nil, fmt.Errorf("read service account token: %w", err)
	}

	hasBody := body != nil

	// payload must stay a nil interface (not a typed nil) when there is no body.
	var payload io.Reader
	if hasBody {
		encoded, marshalErr := json.Marshal(body)
		if marshalErr != nil {
			return nil, fmt.Errorf("marshal request body: %w", marshalErr)
		}
		payload = bytes.NewReader(encoded)
	}

	req, err := http.NewRequestWithContext(ctx, method, url, payload)
	if err != nil {
		return nil, fmt.Errorf("create request: %w", err)
	}

	headers := req.Header
	headers.Set("Authorization", "Bearer "+token)
	headers.Set("Accept", "application/json")
	if hasBody {
		headers.Set("Content-Type", "application/json")
	}

	return c.httpClient.Do(req)
}
// readToken loads the service account bearer token from saTokenPath and
// returns it with surrounding whitespace removed. A read failure is wrapped
// with the file path.
func readToken() (string, error) {
	raw, err := os.ReadFile(saTokenPath)
	if err != nil {
		return "", fmt.Errorf("read %s: %w", saTokenPath, err)
	}

	token := strings.TrimSpace(string(raw))
	return token, nil
}
// readError converts an unexpected API response into an error containing
// the request path, the status code and at most the first 1024 bytes of the
// response body. A failure while reading the body is ignored on purpose:
// the error is best-effort diagnostics.
func (c *LeaseClient) readError(resp *http.Response) error {
	const maxSnippet = 1024

	snippet, _ := io.ReadAll(io.LimitReader(resp.Body, maxSnippet))
	path := resp.Request.URL.Path
	status := resp.StatusCode

	return fmt.Errorf("k8s API %s %d: %s", path, status, string(snippet))
}
// LeaseNameForDomain derives the Lease name used for a domain.
//
// The name is "cert-lock-" followed by the lowercase hex encoding of the
// first 8 bytes of the domain's SHA-256 digest, so it is deterministic, has
// a fixed length and contains no dots regardless of the input.
func LeaseNameForDomain(domain string) string {
	const (
		prefix      = "cert-lock-"
		digestBytes = 8
	)

	digest := sha256.Sum256([]byte(domain))

	suffix := make([]byte, hex.EncodedLen(digestBytes))
	hex.Encode(suffix, digest[:digestBytes])

	return prefix + string(suffix)
}
// InCluster reports whether the process is running inside a Kubernetes pod
// by checking that the KUBERNETES_SERVICE_HOST environment variable is set
// to a non-empty value.
//
// A variable that is present but empty is treated as "not in cluster". This
// keeps the check consistent with NewLeaseClient, which refuses to build a
// client when the host is empty.
func InCluster() bool {
	return os.Getenv("KUBERNETES_SERVICE_HOST") != ""
}