mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-16 07:16:38 +00:00
* implement reverse proxy --------- Co-authored-by: Alisdair MacLeod <git@alisdairmacleod.co.uk> Co-authored-by: mlsmaycon <mlsmaycon@gmail.com> Co-authored-by: Eduard Gert <kontakt@eduardgert.de> Co-authored-by: Viktor Liu <viktor@netbird.io> Co-authored-by: Diego Noguês <diego.sure@gmail.com> Co-authored-by: Diego Noguês <49420+diegocn@users.noreply.github.com> Co-authored-by: Bethuel Mmbaga <bethuelmbaga12@gmail.com> Co-authored-by: Zoltan Papp <zoltan.pmail@gmail.com> Co-authored-by: Ashley Mensah <ashleyamo982@gmail.com>
282 lines
8.1 KiB
Go
282 lines
8.1 KiB
Go
// Package k8s provides a lightweight Kubernetes API client for coordination
|
|
// Leases. It uses raw HTTP calls against the mounted service account
|
|
// credentials, avoiding a dependency on client-go.
|
|
package k8s
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/sha256"
|
|
"crypto/tls"
|
|
"crypto/x509"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
saTokenPath = "/var/run/secrets/kubernetes.io/serviceaccount/token" //nolint:gosec
|
|
saNamespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
|
|
saCACertPath = "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt"
|
|
|
|
leaseAPIPath = "/apis/coordination.k8s.io/v1"
|
|
)
|
|
|
|
// ErrConflict is returned when a Lease update fails due to a
|
|
// resourceVersion mismatch (another writer updated the object first).
|
|
var ErrConflict = errors.New("conflict: resource version mismatch")
|
|
|
|
// Lease represents a coordination.k8s.io/v1 Lease object with only the
|
|
// fields needed for distributed locking.
|
|
type Lease struct {
|
|
APIVersion string `json:"apiVersion"`
|
|
Kind string `json:"kind"`
|
|
Metadata LeaseMetadata `json:"metadata"`
|
|
Spec LeaseSpec `json:"spec"`
|
|
}
|
|
|
|
// LeaseMetadata holds the standard k8s object metadata fields used by Leases.
|
|
type LeaseMetadata struct {
|
|
Name string `json:"name"`
|
|
Namespace string `json:"namespace,omitempty"`
|
|
ResourceVersion string `json:"resourceVersion,omitempty"`
|
|
Annotations map[string]string `json:"annotations,omitempty"`
|
|
}
|
|
|
|
// LeaseSpec holds the Lease specification fields.
|
|
type LeaseSpec struct {
|
|
HolderIdentity *string `json:"holderIdentity"`
|
|
LeaseDurationSeconds *int32 `json:"leaseDurationSeconds,omitempty"`
|
|
AcquireTime *MicroTime `json:"acquireTime"`
|
|
RenewTime *MicroTime `json:"renewTime"`
|
|
}
|
|
|
|
// MicroTime wraps time.Time with Kubernetes MicroTime JSON formatting.
|
|
type MicroTime struct {
|
|
time.Time
|
|
}
|
|
|
|
const microTimeFormat = "2006-01-02T15:04:05.000000Z"
|
|
|
|
// MarshalJSON implements json.Marshaler with k8s MicroTime format.
|
|
func (t *MicroTime) MarshalJSON() ([]byte, error) {
|
|
return json.Marshal(t.UTC().Format(microTimeFormat))
|
|
}
|
|
|
|
// UnmarshalJSON implements json.Unmarshaler with k8s MicroTime format.
|
|
func (t *MicroTime) UnmarshalJSON(data []byte) error {
|
|
var s string
|
|
if err := json.Unmarshal(data, &s); err != nil {
|
|
return err
|
|
}
|
|
if s == "" {
|
|
t.Time = time.Time{}
|
|
return nil
|
|
}
|
|
|
|
parsed, err := time.Parse(microTimeFormat, s)
|
|
if err != nil {
|
|
return fmt.Errorf("parse MicroTime %q: %w", s, err)
|
|
}
|
|
t.Time = parsed
|
|
return nil
|
|
}
|
|
|
|
// LeaseClient talks to the Kubernetes coordination API using raw HTTP.
|
|
type LeaseClient struct {
|
|
baseURL string
|
|
namespace string
|
|
httpClient *http.Client
|
|
}
|
|
|
|
// NewLeaseClient creates a client that authenticates via the pod's
|
|
// mounted service account. It reads the namespace and CA certificate
|
|
// at construction time (they don't rotate) but reads the bearer token
|
|
// fresh on each request (tokens rotate).
|
|
func NewLeaseClient() (*LeaseClient, error) {
|
|
host := os.Getenv("KUBERNETES_SERVICE_HOST")
|
|
port := os.Getenv("KUBERNETES_SERVICE_PORT")
|
|
if host == "" || port == "" {
|
|
return nil, fmt.Errorf("KUBERNETES_SERVICE_HOST/PORT not set")
|
|
}
|
|
|
|
ns, err := os.ReadFile(saNamespacePath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read namespace from %s: %w", saNamespacePath, err)
|
|
}
|
|
|
|
caCert, err := os.ReadFile(saCACertPath)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read CA cert from %s: %w", saCACertPath, err)
|
|
}
|
|
|
|
pool := x509.NewCertPool()
|
|
if !pool.AppendCertsFromPEM(caCert) {
|
|
return nil, fmt.Errorf("parse CA certificate from %s", saCACertPath)
|
|
}
|
|
|
|
return &LeaseClient{
|
|
baseURL: fmt.Sprintf("https://%s:%s", host, port),
|
|
namespace: strings.TrimSpace(string(ns)),
|
|
httpClient: &http.Client{
|
|
Timeout: 10 * time.Second,
|
|
Transport: &http.Transport{
|
|
TLSClientConfig: &tls.Config{
|
|
RootCAs: pool,
|
|
},
|
|
},
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
// Namespace returns the namespace this client operates in.
|
|
func (c *LeaseClient) Namespace() string {
|
|
return c.namespace
|
|
}
|
|
|
|
// Get retrieves a Lease by name. Returns (nil, nil) if the Lease does not exist.
|
|
func (c *LeaseClient) Get(ctx context.Context, name string) (*Lease, error) {
|
|
url := fmt.Sprintf("%s%s/namespaces/%s/leases/%s", c.baseURL, leaseAPIPath, c.namespace, name)
|
|
|
|
resp, err := c.doRequest(ctx, http.MethodGet, url, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
if resp.StatusCode == http.StatusNotFound {
|
|
return nil, nil //nolint:nilnil
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
return nil, c.readError(resp)
|
|
}
|
|
|
|
var lease Lease
|
|
if err := json.NewDecoder(resp.Body).Decode(&lease); err != nil {
|
|
return nil, fmt.Errorf("decode lease response: %w", err)
|
|
}
|
|
return &lease, nil
|
|
}
|
|
|
|
// Create creates a new Lease. Returns the created Lease with server-assigned
|
|
// fields like resourceVersion populated.
|
|
func (c *LeaseClient) Create(ctx context.Context, lease *Lease) (*Lease, error) {
|
|
url := fmt.Sprintf("%s%s/namespaces/%s/leases", c.baseURL, leaseAPIPath, c.namespace)
|
|
|
|
lease.APIVersion = "coordination.k8s.io/v1"
|
|
lease.Kind = "Lease"
|
|
if lease.Metadata.Namespace == "" {
|
|
lease.Metadata.Namespace = c.namespace
|
|
}
|
|
|
|
resp, err := c.doRequest(ctx, http.MethodPost, url, lease)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
if resp.StatusCode == http.StatusConflict {
|
|
return nil, ErrConflict
|
|
}
|
|
if resp.StatusCode != http.StatusCreated {
|
|
return nil, c.readError(resp)
|
|
}
|
|
|
|
var created Lease
|
|
if err := json.NewDecoder(resp.Body).Decode(&created); err != nil {
|
|
return nil, fmt.Errorf("decode created lease: %w", err)
|
|
}
|
|
return &created, nil
|
|
}
|
|
|
|
// Update replaces a Lease. The lease.Metadata.ResourceVersion must match
|
|
// the current server value (optimistic concurrency). Returns ErrConflict
|
|
// on version mismatch.
|
|
func (c *LeaseClient) Update(ctx context.Context, lease *Lease) (*Lease, error) {
|
|
url := fmt.Sprintf("%s%s/namespaces/%s/leases/%s", c.baseURL, leaseAPIPath, c.namespace, lease.Metadata.Name)
|
|
|
|
lease.APIVersion = "coordination.k8s.io/v1"
|
|
lease.Kind = "Lease"
|
|
|
|
resp, err := c.doRequest(ctx, http.MethodPut, url, lease)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer func() { _ = resp.Body.Close() }()
|
|
|
|
if resp.StatusCode == http.StatusConflict {
|
|
return nil, ErrConflict
|
|
}
|
|
if resp.StatusCode != http.StatusOK {
|
|
return nil, c.readError(resp)
|
|
}
|
|
|
|
var updated Lease
|
|
if err := json.NewDecoder(resp.Body).Decode(&updated); err != nil {
|
|
return nil, fmt.Errorf("decode updated lease: %w", err)
|
|
}
|
|
return &updated, nil
|
|
}
|
|
|
|
func (c *LeaseClient) doRequest(ctx context.Context, method, url string, body any) (*http.Response, error) {
|
|
token, err := readToken()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("read service account token: %w", err)
|
|
}
|
|
|
|
var bodyReader io.Reader
|
|
if body != nil {
|
|
data, err := json.Marshal(body)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("marshal request body: %w", err)
|
|
}
|
|
bodyReader = bytes.NewReader(data)
|
|
}
|
|
|
|
req, err := http.NewRequestWithContext(ctx, method, url, bodyReader)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("create request: %w", err)
|
|
}
|
|
|
|
req.Header.Set("Authorization", "Bearer "+token)
|
|
req.Header.Set("Accept", "application/json")
|
|
if body != nil {
|
|
req.Header.Set("Content-Type", "application/json")
|
|
}
|
|
|
|
return c.httpClient.Do(req)
|
|
}
|
|
|
|
func readToken() (string, error) {
|
|
data, err := os.ReadFile(saTokenPath)
|
|
if err != nil {
|
|
return "", fmt.Errorf("read %s: %w", saTokenPath, err)
|
|
}
|
|
return strings.TrimSpace(string(data)), nil
|
|
}
|
|
|
|
func (c *LeaseClient) readError(resp *http.Response) error {
|
|
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
|
|
return fmt.Errorf("k8s API %s %d: %s", resp.Request.URL.Path, resp.StatusCode, string(body))
|
|
}
|
|
|
|
// LeaseNameForDomain returns a deterministic, DNS-label-safe Lease name
|
|
// for the given domain. The domain is hashed to avoid dots and length issues.
|
|
func LeaseNameForDomain(domain string) string {
|
|
h := sha256.Sum256([]byte(domain))
|
|
return "cert-lock-" + hex.EncodeToString(h[:8])
|
|
}
|
|
|
|
// InCluster reports whether the process is running inside a Kubernetes pod
|
|
// by checking for the KUBERNETES_SERVICE_HOST environment variable.
|
|
func InCluster() bool {
|
|
_, exists := os.LookupEnv("KUBERNETES_SERVICE_HOST")
|
|
return exists
|
|
}
|