Files
decent-webui/internal/mesh/mesh.go
jbergner 0bf22b6120
All checks were successful
release-tag / release-image (push) Successful in 1m25s
fix self
2025-09-27 23:53:17 +02:00

581 lines
13 KiB
Go

package mesh
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"io"
"log"
"net"
"net/http"
"os"
"slices"
"strconv"
"strings"
"sync"
"time"
)
/*** Types & Config ***/
// Config holds the settings of a mesh Node. New rejects a Config whose
// BindAddr, AdvertURL or ClusterSecret is empty.
type Config struct {
BindAddr string // listen address of the mesh HTTP server, e.g. ":9090"
AdvertURL string // base URL this node announces to others, e.g. "http://10.0.0.5:9090"
Seeds []string // other peers' mesh base URLs
ClusterSecret string // HMAC key used to sign and verify mesh requests
EnableDiscovery bool // start the UDP beacon loops (only when DiscoveryAddress is non-empty too)
DiscoveryAddress string // UDP address used for beacons, e.g. "239.8.8.8:9898"
}
// Peer describes one mesh member as it is exchanged on /mesh/hello and
// /mesh/peers.
type Peer struct {
URL string `json:"url"` // mesh base URL; also the key of the Node's peer map
LastSeen time.Time `json:"lastSeen"` // set to the local receive time when a hello arrives
Self bool `json:"self"` // true on the entry a node creates for itself
OwnerHint int `json:"ownerHint"` // optional; not assigned in the code visible here
}
// Item is one entry of a Snapshot.
type Item struct {
ID string `json:"id"`
Name string `json:"name"`
UpdatedAt int64 `json:"updatedAt"` // NOTE(review): presumably a Unix timestamp; the unit is not visible here — confirm
Deleted bool `json:"deleted"` // tombstone: marks the item as deleted
}
// Snapshot is the payload exchanged on /mesh/sync: the sender obtains it from
// Callbacks.GetSnapshot and the receiver passes it to Callbacks.ApplyRemote.
type Snapshot struct {
Items []Item `json:"items"`
}
// Callbacks are the hooks the embedding application provides to the mesh.
type Callbacks struct {
// GetSnapshot returns the local state that is pushed to peers.
GetSnapshot func(ctx context.Context) (Snapshot, error)
// ApplyRemote receives a snapshot that arrived on /mesh/sync.
ApplyRemote func(ctx context.Context, s Snapshot) error
// BlobOpen opens the blob with the given id for /mesh/blob. The handler
// treats the results as (content, name, content type, size, error). It may
// be nil, in which case /mesh/blob answers 404.
BlobOpen func(ctx context.Context, id string) (io.ReadCloser, string, string, int64, error)
}
/*** Node ***/
// Node is one member of the mesh. It serves the /mesh/* HTTP endpoints and
// runs the background loops that announce itself, exchange peer lists and
// push snapshots. Create it with New, run it with Serve, stop it with Close.
type Node struct {
cfg Config
cbs Callbacks
self Peer // this node's own entry (URL = cfg.AdvertURL, Self = true)
mu sync.RWMutex // guards peers
peers map[string]*Peer // known remote peers, keyed by URL
client *http.Client // used for all outgoing mesh requests
srv *http.Server // assigned by Serve
stop chan struct{} // closed by Close to end the background loops
wg sync.WaitGroup // tracks the background loops started by Serve
}
// New validates cfg and builds a Node that is ready for Serve.
//
// It returns an error when BindAddr, AdvertURL or ClusterSecret is empty.
// The node's own peer entry uses cfg.AdvertURL, and all outgoing requests
// share one HTTP client with a 5 second timeout.
func New(cfg Config, cbs Callbacks) (*Node, error) {
	switch {
	case cfg.BindAddr == "" || cfg.AdvertURL == "":
		return nil, errors.New("mesh: BindAddr and AdvertURL required")
	case cfg.ClusterSecret == "":
		return nil, errors.New("mesh: ClusterSecret required")
	}

	own := Peer{URL: cfg.AdvertURL, LastSeen: time.Now(), Self: true}
	httpClient := &http.Client{Timeout: 5 * time.Second}

	return &Node{
		cfg:    cfg,
		cbs:    cbs,
		self:   own,
		peers:  make(map[string]*Peer),
		client: httpClient,
		stop:   make(chan struct{}),
	}, nil
}
/*** HMAC helpers ***/
// sign returns the hex-encoded HMAC-SHA256 of b, keyed with the cluster secret.
func (n *Node) sign(b []byte) string {
	mac := hmac.New(sha256.New, []byte(n.cfg.ClusterSecret))
	mac.Write(b)
	digest := mac.Sum(nil)

	encoded := make([]byte, hex.EncodedLen(len(digest)))
	hex.Encode(encoded, digest)
	return string(encoded)
}
// verify reports whether sig is the signature sign produces for b. The
// comparison is done on the hex strings with hmac.Equal, i.e. in constant time.
func (n *Node) verify(b []byte, sig string) bool {
	expected := []byte(n.sign(b))
	presented := []byte(sig)
	return hmac.Equal(expected, presented)
}
/*** HTTP handlers (control plane) ***/
// helloHandler handles /mesh/hello: a peer announces itself with a signed
// JSON-encoded Peer.
//
// Responses: 400 when the body cannot be read, is not valid JSON or carries no
// URL; 401 when the X-Mesh-Sig header does not match the body; 204 otherwise.
// A known peer only gets its LastSeen refreshed; an unknown peer is added
// unless its URL is this node's own URL.
func (n *Node) helloHandler(w http.ResponseWriter, r *http.Request) {
	// A Peer document is tiny. Cap the body so a caller cannot make the node
	// buffer an arbitrary amount of data before the signature is checked.
	body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, 64<<10))
	if err != nil {
		http.Error(w, "bad body", http.StatusBadRequest)
		return
	}
	if !n.verify(body, r.Header.Get("X-Mesh-Sig")) {
		http.Error(w, "bad signature", http.StatusUnauthorized)
		return
	}
	var p Peer
	if err := json.Unmarshal(body, &p); err != nil {
		http.Error(w, "bad json", http.StatusBadRequest)
		return
	}
	if p.URL == "" {
		// An empty URL would otherwise be stored as a peer and contacted later.
		http.Error(w, "missing url", http.StatusBadRequest)
		return
	}
	p.LastSeen = time.Now()
	// The sender marks its own entry with Self=true; from this node's point of
	// view it is a remote peer.
	p.Self = false

	n.mu.Lock()
	if existing, ok := n.peers[p.URL]; ok {
		existing.LastSeen = p.LastSeen
	} else if p.URL != n.self.URL {
		cp := p
		n.peers[p.URL] = &cp
	}
	n.mu.Unlock()

	w.WriteHeader(http.StatusNoContent)
}
// peersHandler handles /mesh/peers and answers with the JSON list of this
// node followed by every known peer.
//
// The list comes from PeerList, which copies the entries under the read lock
// and clears Self on remote peers. The lock is therefore released before the
// response is written, so a slow client cannot block writers of the peer map,
// and only this node's own entry is reported with Self=true.
func (n *Node) peersHandler(w http.ResponseWriter, r *http.Request) {
	writeJSON(w, http.StatusOK, n.PeerList())
}
// syncHandler handles /mesh/sync: a peer pushes a signed JSON-encoded Snapshot
// which is handed to the ApplyRemote callback.
//
// Responses: 400 when the body cannot be read or is not valid JSON; 401 when
// the X-Mesh-Sig header does not match the body; 501 when no ApplyRemote
// callback is configured; 500 when the callback fails; 204 on success.
func (n *Node) syncHandler(w http.ResponseWriter, r *http.Request) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "bad body", http.StatusBadRequest)
		return
	}
	if !n.verify(body, r.Header.Get("X-Mesh-Sig")) {
		http.Error(w, "bad signature", http.StatusUnauthorized)
		return
	}
	var s Snapshot
	if err := json.Unmarshal(body, &s); err != nil {
		http.Error(w, "bad json", http.StatusBadRequest)
		return
	}
	// Checked after the signature so unauthenticated callers learn nothing
	// about the node's configuration; without the check a nil callback panics.
	if n.cbs.ApplyRemote == nil {
		http.Error(w, "sync unavailable", http.StatusNotImplemented)
		return
	}
	if err := n.cbs.ApplyRemote(r.Context(), s); err != nil {
		http.Error(w, "apply error: "+err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusNoContent)
}
// writeJSON sends v as a JSON response with the given status code. Encoding
// errors are ignored because the status line has already been written.
func writeJSON(w http.ResponseWriter, code int, v any) {
	hdr := w.Header()
	hdr.Set("Content-Type", "application/json")
	w.WriteHeader(code)

	enc := json.NewEncoder(w)
	if err := enc.Encode(v); err != nil {
		return
	}
}
/*** Serve ***/
// Serve registers the /mesh/* handlers, starts the background loops and runs
// the HTTP server on cfg.BindAddr. It blocks until the server stops or Close
// is called.
//
// It returns the error from ListenAndServe, or http.ErrServerClosed when the
// node was stopped through Close. The beacon loops are only started when
// discovery is enabled and a discovery address is configured. The background
// loops keep running until Close is called, even if the listener fails.
func (n *Node) Serve() error {
	mux := http.NewServeMux()
	mux.HandleFunc("/mesh/peers", n.peersHandler)
	mux.HandleFunc("/mesh/hello", n.helloHandler)
	mux.HandleFunc("/mesh/blob", n.blobHandler)
	mux.HandleFunc("/mesh/sync", n.syncHandler)
	n.srv = &http.Server{
		Addr:    n.cfg.BindAddr,
		Handler: mux,
		// Bound the time a client may take to send its request headers so idle
		// connections cannot pin server goroutines indefinitely. Read and write
		// timeouts stay unset because /mesh/blob streams bodies of unknown size.
		ReadHeaderTimeout: 10 * time.Second,
	}

	// spawn runs loop in a goroutine tracked by n.wg so Close can wait for it.
	spawn := func(loop func()) {
		n.wg.Add(1)
		go func() {
			defer n.wg.Done()
			loop()
		}()
	}
	spawn(n.loopSeeder)
	spawn(n.loopPeerExchange)
	if n.cfg.EnableDiscovery && n.cfg.DiscoveryAddress != "" {
		spawn(n.loopBeaconSend)
		spawn(n.loopBeaconRecv)
	}
	spawn(n.loopAntiEntropy)

	// Buffered so the server goroutine can exit even after Serve has returned
	// through the stop case.
	errc := make(chan error, 1)
	go func() {
		errc <- n.srv.ListenAndServe()
	}()
	select {
	case err := <-errc:
		return err
	case <-n.stop:
		return http.ErrServerClosed
	}
}
// Close stops the node: it signals the background loops, shuts the HTTP server
// down gracefully within ctx and waits for the loops to finish.
//
// It returns the error reported by the server shutdown (for example when ctx
// expires first), or nil. Calling Close again after it has returned is safe.
// Concurrent calls are not synchronized against each other.
func (n *Node) Close(ctx context.Context) error {
	select {
	case <-n.stop:
		// Already closed by an earlier call; closing again would panic.
	default:
		close(n.stop)
	}

	var err error
	if n.srv != nil {
		err = n.srv.Shutdown(ctx)
	}
	n.wg.Wait()
	return err
}
/*** Loops ***/
// loopPeerExchange asks every configured seed for its peer list every 30
// seconds and adds peers this node does not know yet. It returns when the node
// is stopped.
func (n *Node) loopPeerExchange() {
	t := time.NewTicker(30 * time.Second)
	defer t.Stop()
	for {
		select {
		case <-n.stop:
			return
		case <-t.C:
		}
		for _, s := range n.cfg.Seeds {
			s = strings.TrimSpace(s)
			// Asking ourselves for peers yields nothing new.
			if s == "" || s == n.self.URL {
				continue
			}
			n.mergePeersFrom(s)
		}
	}
}

// mergePeersFrom fetches /mesh/peers from seed and stores every listed peer
// that is not yet known. Failures are ignored; the next tick retries.
func (n *Node) mergePeersFrom(seed string) {
	resp, err := n.client.Get(strings.TrimRight(seed, "/") + "/mesh/peers")
	if err != nil {
		return
	}
	defer func() {
		// Drain before closing so the connection can be reused.
		_, _ = io.Copy(io.Discard, resp.Body)
		resp.Body.Close()
	}()
	if resp.StatusCode != http.StatusOK {
		return
	}
	var list []Peer
	if err := json.NewDecoder(resp.Body).Decode(&list); err != nil {
		return
	}

	n.mu.Lock()
	defer n.mu.Unlock()
	for _, p := range list {
		if p.URL == "" || p.URL == n.self.URL {
			continue
		}
		if _, known := n.peers[p.URL]; known {
			continue
		}
		cp := p
		// The seed marks its own entry with Self=true; here it is a remote peer.
		cp.Self = false
		n.peers[cp.URL] = &cp
	}
}
// loopSeeder announces this node to the configured seeds. The first round
// runs 5 seconds after start; every later round starts 30 seconds after the
// previous one finished. Empty seeds and the node's own URL are skipped, and
// hello errors are ignored. It returns when the node is stopped.
func (n *Node) loopSeeder() {
	const (
		firstDelay = 5 * time.Second
		nextDelay  = 30 * time.Second
	)
	timer := time.NewTimer(firstDelay)
	defer timer.Stop()
	for {
		select {
		case <-n.stop:
			return
		case <-timer.C:
		}
		for _, seed := range n.cfg.Seeds {
			if seed != "" && seed != n.self.URL {
				_ = n.sendHello(seed)
			}
		}
		// The timer has fired and its channel was drained above, so Reset is safe.
		timer.Reset(nextDelay)
	}
}
// loopAntiEntropy pushes the local snapshot to every known peer every 10
// seconds. A tick is skipped when there are no peers or when GetSnapshot fails
// (it gets a 4 second budget); send errors are ignored. It returns when the
// node is stopped.
func (n *Node) loopAntiEntropy() {
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	// peerURLs copies the current peer set so no lock is held while sending.
	peerURLs := func() []string {
		n.mu.RLock()
		defer n.mu.RUnlock()
		urls := make([]string, 0, len(n.peers))
		for u := range n.peers {
			urls = append(urls, u)
		}
		return urls
	}

	for {
		select {
		case <-n.stop:
			return
		case <-ticker.C:
		}

		urls := peerURLs()
		if len(urls) == 0 {
			continue
		}
		ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
		snap, err := n.cbs.GetSnapshot(ctx)
		cancel()
		if err != nil {
			continue
		}
		for _, u := range urls {
			_ = n.sendSync(u, snap)
		}
	}
}
// loopBeaconSend writes a small JSON datagram {"url": <own URL>} to the
// discovery address every 5 seconds. Resolve and dial failures are logged and
// end the loop; write errors are ignored. It returns when the node is stopped.
func (n *Node) loopBeaconSend() {
	target, err := net.ResolveUDPAddr("udp", n.cfg.DiscoveryAddress)
	if err != nil {
		log.Printf("mesh beacon send resolve: %v", err)
		return
	}
	sock, err := net.DialUDP("udp", nil, target)
	if err != nil {
		log.Printf("mesh beacon send dial: %v", err)
		return
	}
	defer sock.Close()

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-n.stop:
			return
		case <-ticker.C:
		}
		payload, _ := json.Marshal(struct {
			URL string `json:"url"`
		}{URL: n.self.URL})
		_, _ = sock.Write(payload)
	}
}
// loopBeaconRecv listens for discovery beacons on the multicast discovery
// address and sends a hello to every announced URL other than this node's own.
//
// Resolve and listen failures are logged and end the loop. Reads use a 6
// second deadline so the stop signal is noticed regularly. Beacons themselves
// are not signed here; the hello that follows is signed by sendHello.
func (n *Node) loopBeaconRecv() {
	addr, err := net.ResolveUDPAddr("udp", n.cfg.DiscoveryAddress)
	if err != nil {
		log.Printf("mesh beacon recv resolve: %v", err)
		return
	}
	// enable multicast receive
	l, err := net.ListenMulticastUDP("udp", nil, addr)
	if err != nil {
		log.Printf("mesh beacon recv listen: %v", err)
		return
	}
	defer l.Close()
	_ = l.SetReadBuffer(1 << 16)

	buf := make([]byte, 2048)
	for {
		select {
		case <-n.stop:
			return
		default:
		}
		_ = l.SetReadDeadline(time.Now().Add(6 * time.Second))
		nr, _, err := l.ReadFromUDP(buf)
		if err != nil {
			var ne net.Error
			if errors.As(err, &ne) && ne.Timeout() {
				// Regular deadline expiry: just re-check the stop signal.
				continue
			}
			// Any other error may persist; pause so the loop does not spin at
			// full speed, while still reacting to stop.
			log.Printf("mesh beacon recv read: %v", err)
			select {
			case <-n.stop:
				return
			case <-time.After(time.Second):
			}
			continue
		}
		var msg struct {
			URL string `json:"url"`
		}
		if err := json.Unmarshal(buf[:nr], &msg); err != nil {
			continue
		}
		if msg.URL != "" && msg.URL != n.self.URL {
			_ = n.sendHello(msg.URL)
		}
	}
}
/*** Outgoing ***/
// sendHello posts this node's signed Peer entry to <url>/mesh/hello.
//
// It returns an error when the request cannot be built (for example because
// url is malformed; the URL may originate from a received beacon), when the
// request fails, or when the peer answers with a non-2xx status.
func (n *Node) sendHello(url string) error {
	b, err := json.Marshal(n.self)
	if err != nil {
		return fmt.Errorf("mesh hello: encode: %w", err)
	}
	req, err := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/hello", bytes.NewReader(b))
	if err != nil {
		// Without this check a malformed URL leaves req nil and the header
		// access below panics.
		return fmt.Errorf("mesh hello %q: %w", url, err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Mesh-Sig", n.sign(b))

	resp, err := n.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// Drain so the connection can be reused.
	_, _ = io.Copy(io.Discard, resp.Body)
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("mesh hello %q: unexpected status %s", url, resp.Status)
	}
	return nil
}
// sendSync posts the signed snapshot s to <url>/mesh/sync.
//
// It returns an error when the snapshot cannot be encoded, the request cannot
// be built (for example because url is malformed), the request fails, or the
// peer answers with a non-2xx status.
func (n *Node) sendSync(url string, s Snapshot) error {
	b, err := json.Marshal(s)
	if err != nil {
		return fmt.Errorf("mesh sync: encode: %w", err)
	}
	req, err := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/sync", bytes.NewReader(b))
	if err != nil {
		// Without this check a malformed URL leaves req nil and the header
		// access below panics.
		return fmt.Errorf("mesh sync %q: %w", url, err)
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Mesh-Sig", n.sign(b))

	resp, err := n.client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	// Drain so the connection can be reused.
	_, _ = io.Copy(io.Discard, resp.Body)
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("mesh sync %q: unexpected status %s", url, resp.Status)
	}
	return nil
}
// PeerList returns a copy of the known peers. The node's own entry comes
// first; every other entry has Self cleared.
func (n *Node) PeerList() []Peer {
	n.mu.RLock()
	defer n.mu.RUnlock()

	list := make([]Peer, 1, len(n.peers)+1)
	list[0] = n.self
	for _, known := range n.peers {
		entry := *known
		entry.Self = false
		list = append(list, entry)
	}
	return list
}
// SyncNow immediately sends the current snapshot to all known peers.
//
// It returns the error from GetSnapshot, if any. Sending is best effort:
// per-peer send errors are ignored and the result is nil.
func (n *Node) SyncNow(ctx context.Context) error {
	snap, err := n.cbs.GetSnapshot(ctx)
	if err != nil {
		return err
	}

	// Copy the peer URLs so the lock is not held during network I/O.
	urls := func() []string {
		n.mu.RLock()
		defer n.mu.RUnlock()
		out := make([]string, 0, len(n.peers))
		for peerURL := range n.peers {
			out = append(out, peerURL)
		}
		return out
	}()

	for i := range urls {
		_ = n.sendSync(urls[i], snap)
	}
	return nil
}
/*** Utilities ***/
// OwnerHint maps id onto an index into peers: the CRC-32 (IEEE) checksum of id
// modulo len(peers). It returns 0 when peers is empty. It is a simple,
// optional way to distribute responsibility.
func OwnerHint(id string, peers []string) int {
	if len(peers) < 1 {
		return 0
	}
	buckets := uint32(len(peers))

	hasher := crc32.NewIEEE()
	hasher.Write([]byte(id))
	return int(hasher.Sum32() % buckets)
}
// FromEnv builds a Config from environment variables:
//
//	MESH_BIND              -> BindAddr (default ":9090")
//	MESH_ADVERT            -> AdvertURL
//	MESH_SEEDS             -> Seeds (comma separated)
//	MESH_CLUSTER_SECRET    -> ClusterSecret
//	MESH_ENABLE_DISCOVERY  -> EnableDiscovery (only the exact value "true" enables it)
//	MESH_DISCOVERY_ADDR    -> DiscoveryAddress (default "239.8.8.8:9898")
func FromEnv() Config {
	var cfg Config
	cfg.BindAddr = getenvDefault("MESH_BIND", ":9090")
	cfg.AdvertURL = os.Getenv("MESH_ADVERT")
	cfg.Seeds = splitCSV(os.Getenv("MESH_SEEDS"))
	cfg.ClusterSecret = os.Getenv("MESH_CLUSTER_SECRET")
	cfg.EnableDiscovery = os.Getenv("MESH_ENABLE_DISCOVERY") == "true"
	cfg.DiscoveryAddress = getenvDefault("MESH_DISCOVERY_ADDR", "239.8.8.8:9898")
	return cfg
}
// splitCSV splits a comma-separated list, trims whitespace around each entry,
// drops empty entries and removes duplicates while keeping first-seen order.
// It returns nil when s is empty or only whitespace.
func splitCSV(s string) []string {
	if strings.TrimSpace(s) == "" {
		return nil
	}
	fields := strings.Split(s, ",")
	uniq := make([]string, 0, len(fields))
	for _, raw := range fields {
		field := strings.TrimSpace(raw)
		switch {
		case field == "":
			// skip blanks such as the gap in "a,,b"
		case slices.Contains(uniq, field):
			// already present
		default:
			uniq = append(uniq, field)
		}
	}
	return uniq
}
// getenvDefault returns the value of environment variable k, or def when the
// variable is unset or empty.
func getenvDefault(k, def string) string {
	if val := os.Getenv(k); val != "" {
		return val
	}
	return def
}
// blobHandler handles POST /mesh/blob. The signed body is the JSON document
// {"id":"<blob id>"} (the id is a string); the response streams the blob.
//
// Responses: 400 when the body cannot be read or is not valid JSON; 401 when
// the X-Mesh-Sig header does not match the body; 404 when no BlobOpen callback
// is configured or the callback fails; otherwise the blob content with
// Content-Type (default "application/octet-stream"), X-Blob-Name and, when the
// callback reports a positive size, Content-Length.
func (n *Node) blobHandler(w http.ResponseWriter, r *http.Request) {
	// The request document is tiny. Cap the body so a caller cannot make the
	// node buffer an arbitrary amount of data before the signature is checked.
	body, err := io.ReadAll(http.MaxBytesReader(w, r.Body, 64<<10))
	if err != nil {
		http.Error(w, "bad body", http.StatusBadRequest)
		return
	}
	if !n.verify(body, r.Header.Get("X-Mesh-Sig")) {
		http.Error(w, "bad signature", http.StatusUnauthorized)
		return
	}
	var req struct {
		ID string `json:"id"`
	}
	if err := json.Unmarshal(body, &req); err != nil {
		http.Error(w, "bad json", http.StatusBadRequest)
		return
	}
	if n.cbs.BlobOpen == nil {
		http.Error(w, "blob unavailable", http.StatusNotFound)
		return
	}
	rc, name, ct, size, err := n.cbs.BlobOpen(r.Context(), req.ID)
	if err != nil {
		http.Error(w, "not found", http.StatusNotFound)
		return
	}
	defer rc.Close()

	if ct == "" {
		ct = "application/octet-stream"
	}
	if size > 0 {
		w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
	}
	w.Header().Set("Content-Type", ct)
	w.Header().Set("X-Blob-Name", name)
	// The status line is already sent once copying starts, so a copy error
	// cannot be reported to the client any more.
	_, _ = io.Copy(w, rc)
}
// sendBlobRequest is an internal helper that posts a signed blob request
// {"id": id} to <url>/mesh/blob and returns the raw response.
//
// On a nil error the caller owns the response and must close its body. An
// error is returned when the request cannot be built (for example because url
// is malformed) or the HTTP round trip fails.
func (n *Node) sendBlobRequest(url string, id string) (*http.Response, error) {
	b, err := json.Marshal(struct {
		ID string `json:"id"`
	}{ID: id})
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/blob", bytes.NewReader(b))
	if err != nil {
		// Without this check a malformed URL leaves req nil and the header
		// access below panics.
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Mesh-Sig", n.sign(b))
	return n.client.Do(req)
}
// FetchBlobAny tries to fetch the blob with the given id from any known peer
// and returns the first successful response as (body, name, content type,
// size, error).
//
// Peers are tried one after another; when no peers are known the configured
// seeds are used instead. Empty URLs and this node's own URL are skipped. The
// size is taken from the Content-Length header and is -1 when that header is
// missing or unparsable. The caller must close the returned body.
//
// ctx is checked before each attempt, so a cancelled or expired context stops
// the search and its error is returned. If every candidate fails, an error
// naming the blob id is returned.
func (n *Node) FetchBlobAny(ctx context.Context, id string) (io.ReadCloser, string, string, int64, error) {
	n.mu.RLock()
	targets := make([]string, 0, len(n.peers))
	for url := range n.peers {
		targets = append(targets, url)
	}
	n.mu.RUnlock()
	if len(targets) == 0 {
		// Fallback: try the seeds.
		targets = append(targets, n.cfg.Seeds...)
	}

	for _, u := range targets {
		if err := ctx.Err(); err != nil {
			return nil, "", "", 0, err
		}
		if strings.TrimSpace(u) == "" || u == n.self.URL {
			continue
		}
		resp, err := n.sendBlobRequest(u, id)
		if err != nil {
			continue
		}
		if resp.StatusCode != http.StatusOK {
			// Drain so the connection can be reused, then try the next peer.
			_, _ = io.Copy(io.Discard, resp.Body)
			resp.Body.Close()
			continue
		}

		name := resp.Header.Get("X-Blob-Name")
		ct := resp.Header.Get("Content-Type")
		size := int64(-1)
		if cl := resp.Header.Get("Content-Length"); cl != "" {
			if parsed, err := strconv.ParseInt(cl, 10, 64); err == nil {
				size = parsed
			}
		}
		// The caller must close resp.Body.
		return resp.Body, name, ct, size, nil
	}
	return nil, "", "", 0, fmt.Errorf("blob %s not found on peers", id)
}