// Package mesh implements a small HTTP-based peer mesh: HMAC-signed hello,
// sync and blob endpoints, periodic peer exchange with seeds, and optional
// UDP multicast discovery.
package mesh

import (
	"bytes"
	"context"
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
	"hash/crc32"
	"io"
	"log"
	"math/rand/v2"
	"net"
	"net/http"
	"os"
	"slices"
	"strconv"
	"strings"
	"sync"
	"time"
)

/*** Types & Config ***/

// Config holds the settings a Node is created with.
type Config struct {
	BindAddr         string   // listen address of the mesh HTTP server, e.g. ":9090"
	AdvertURL        string   // URL this node announces to peers, e.g. "http://10.0.0.5:9090"
	Seeds            []string // other peers' mesh base URLs
	ClusterSecret    string   // HMAC key used for the X-Mesh-Sig header
	EnableDiscovery  bool     // enables the UDP beacon loops (only if DiscoveryAddress is set)
	DiscoveryAddress string   // UDP address for beacons, e.g. "239.8.8.8:9898"

	PeerTTL       time.Duration // how long a peer may stay inactive; the pruner falls back to 2 minutes when <= 0
	PruneInterval time.Duration // how often the pruner runs; falls back to 30 seconds when <= 0
	// NOTE(review): SyncInterval and Fanout presumably tune the periodic
	// snapshot sync — confirm against loopAntiEntropy.
	SyncInterval time.Duration
	Fanout       int

	// Newer settings:
	HelloInterval time.Duration // how often known peers are pinged; falls back to 20 seconds when <= 0
	HelloFanout   int           // how many peers are pinged per tick; falls back to 8 when <= 0
	BlobTimeout   time.Duration // when > 0, applied as a context timeout to outgoing blob requests
}

// Peer is one entry of the peer table as exchanged over /mesh/peers.
type Peer struct {
	URL       string    `json:"url"`
	LastSeen  time.Time `json:"lastSeen"`
	Self      bool      `json:"self"`      // set on the node's own entry
	OwnerHint int       `json:"ownerHint"` // optional
}

// Item is a single record carried in a Snapshot.
type Item struct {
	ID        string `json:"id"`
	Name      string `json:"name"`
	UpdatedAt int64  `json:"updatedAt"`
	Deleted   bool   `json:"deleted"` // tombstone marker for deletes
	Owner     string `json:"owner"`
}

// Snapshot is the payload posted to /mesh/sync.
type Snapshot struct {
	Items []Item `json:"items"`
}

// Callbacks are the hooks the embedding application provides.
//
// GetSnapshot is called before snapshots are pushed to peers, ApplyRemote is
// called by the /mesh/sync handler, and BlobOpen (which may be nil) is called
// by the /mesh/blob handler. BlobOpen's results are, in order: the blob
// reader, its name, its content type, its size, and an error.
type Callbacks struct {
	GetSnapshot func(ctx context.Context) (Snapshot, error)
	ApplyRemote func(ctx context.Context, s Snapshot) error
	BlobOpen    func(ctx context.Context, id string) (io.ReadCloser, string, string, int64, error)
}

/*** Node ***/

// Node is one member of the mesh. Create it with New, run it with Serve and
// stop it with Close.
type Node struct {
	cfg  Config
	cbs  Callbacks
	self Peer // the node's own peer entry (URL is cfg.AdvertURL)

	mu    sync.RWMutex     // guards peers
	peers map[string]*Peer // known remote peers keyed by URL

	client *http.Client   // shared client for all outgoing mesh requests
	srv    *http.Server   // set by Serve
	stop   chan struct{}  // closed by Close to stop the background loops
	wg     sync.WaitGroup // tracks the background loops started by Serve
}

// RemovePeer deletes a peer from the peer table. Seeds are not removed.
func (n *Node) RemovePeer(url string) bool { n.mu.Lock() defer n.mu.Unlock() if url == "" || url == n.self.URL { return false } // Seeds schützen if n.isSeed(url) { return false } if _, ok := n.peers[url]; ok { delete(n.peers, url) return true } return false } func (n *Node) Config() Config { return n.cfg } // PruneNow entfernt alle Peers, deren LastSeen vor cutoff liegt (Seeds bleiben). func (n *Node) PruneNow(cutoff time.Time) int { n.mu.Lock() defer n.mu.Unlock() removed := 0 for url, p := range n.peers { if url == n.self.URL || n.isSeed(url) { continue } if p.LastSeen.IsZero() || p.LastSeen.Before(cutoff) { delete(n.peers, url) removed++ } } return removed } // StartPeerPruner startet den Hintergrundjob (stoppt automatisch bei n.stop). func (n *Node) StartPeerPruner() { go n.loopPrunePeers() } func (n *Node) loopPrunePeers() { ttl := n.cfg.PeerTTL if ttl <= 0 { ttl = 2 * time.Minute } interval := n.cfg.PruneInterval if interval <= 0 { interval = 30 * time.Second } t := time.NewTicker(interval) defer t.Stop() for { select { case <-n.stop: return case <-t.C: cutoff := time.Now().Add(-ttl) _ = n.PruneNow(cutoff) } } } // helper: ist url ein Seed? 
func (n *Node) isSeed(url string) bool { for _, s := range n.cfg.Seeds { if strings.TrimSpace(s) == strings.TrimSpace(url) { return true } } return false } func New(cfg Config, cbs Callbacks) (*Node, error) { if cfg.BindAddr == "" || cfg.AdvertURL == "" { return nil, errors.New("mesh: BindAddr and AdvertURL required") } if cfg.ClusterSecret == "" { return nil, errors.New("mesh: ClusterSecret required") } n := &Node{ cfg: cfg, cbs: cbs, self: Peer{URL: cfg.AdvertURL, LastSeen: time.Now(), Self: true}, peers: make(map[string]*Peer), client: &http.Client{ Timeout: 5 * time.Second, }, stop: make(chan struct{}), } return n, nil } /*** HMAC helpers ***/ func (n *Node) sign(b []byte) string { m := hmac.New(sha256.New, []byte(n.cfg.ClusterSecret)) m.Write(b) return hex.EncodeToString(m.Sum(nil)) } func (n *Node) verify(b []byte, sig string) bool { want := n.sign(b) return hmac.Equal([]byte(want), []byte(sig)) } /*** HTTP handlers (control plane) ***/ func (n *Node) helloHandler(w http.ResponseWriter, r *http.Request) { body, _ := io.ReadAll(r.Body) if !n.verify(body, r.Header.Get("X-Mesh-Sig")) { http.Error(w, "bad signature", http.StatusUnauthorized) return } var req struct { URL string `json:"url"` } if err := json.Unmarshal(body, &req); err != nil || strings.TrimSpace(req.URL) == "" { http.Error(w, "bad json", http.StatusBadRequest) return } // Peer anlegen (falls neu) und LastSeen setzen n.mu.Lock() if req.URL != n.self.URL { if p, ok := n.peers[req.URL]; ok { p.LastSeen = time.Now() } else { cp := Peer{URL: req.URL, LastSeen: time.Now()} // weitere Felder wie bekannt n.peers[req.URL] = &cp } } n.mu.Unlock() w.WriteHeader(http.StatusOK) } func (n *Node) touchPeer(url string) { if strings.TrimSpace(url) == "" { return } n.mu.Lock() if p, ok := n.peers[url]; ok { p.LastSeen = time.Now() } n.mu.Unlock() } func (n *Node) peersHandler(w http.ResponseWriter, r *http.Request) { n.mu.RLock() defer n.mu.RUnlock() var list []Peer list = append(list, n.self) for _, p := range 
n.peers { //p.Self = false list = append(list, *p) } writeJSON(w, http.StatusOK, list) } func (n *Node) syncHandler(w http.ResponseWriter, r *http.Request) { // verify signature body, _ := io.ReadAll(r.Body) if !n.verify(body, r.Header.Get("X-Mesh-Sig")) { http.Error(w, "bad signature", http.StatusUnauthorized) return } var s Snapshot if err := json.Unmarshal(body, &s); err != nil { http.Error(w, "bad json", http.StatusBadRequest) return } // apply if err := n.cbs.ApplyRemote(r.Context(), s); err != nil { http.Error(w, "apply error: "+err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusNoContent) } func writeJSON(w http.ResponseWriter, code int, v any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(code) _ = json.NewEncoder(w).Encode(v) } /*** Serve ***/ func (n *Node) Serve() error { mux := http.NewServeMux() mux.HandleFunc("/mesh/peers", n.peersHandler) mux.HandleFunc("/mesh/hello", n.helloHandler) mux.HandleFunc("/mesh/blob", n.blobHandler) mux.HandleFunc("/mesh/sync", n.syncHandler) n.srv = &http.Server{Addr: n.cfg.BindAddr, Handler: mux} // background loops n.wg.Add(1) go func() { defer n.wg.Done() n.loopSeeder() }() n.wg.Add(1) go func() { defer n.wg.Done(); n.loopPeerExchange() }() if n.cfg.EnableDiscovery && n.cfg.DiscoveryAddress != "" { n.wg.Add(2) go func() { defer n.wg.Done() n.loopBeaconSend() }() go func() { defer n.wg.Done() n.loopBeaconRecv() }() } n.wg.Add(1) go func() { defer n.wg.Done() n.loopAntiEntropy() }() n.wg.Add(1) go func() { defer n.wg.Done() n.loopHello() }() // http server errc := make(chan error, 1) go func() { errc <- n.srv.ListenAndServe() }() select { case err := <-errc: return err case <-n.stop: return http.ErrServerClosed } } func (n *Node) Close(ctx context.Context) error { close(n.stop) if n.srv != nil { _ = n.srv.Shutdown(ctx) } n.wg.Wait() return nil } /*** Loops ***/ func (n *Node) loopPeerExchange() { t := time.NewTicker(30 * time.Second) defer t.Stop() for { select { case 
<-n.stop: return case <-t.C: } // Seeds abfragen for _, s := range n.cfg.Seeds { if strings.TrimSpace(s) == "" { continue } resp, err := n.client.Get(strings.TrimRight(s, "/") + "/mesh/peers") if err != nil { continue } var list []Peer if json.NewDecoder(resp.Body).Decode(&list) == nil { n.mu.Lock() for _, p := range list { if p.URL == "" || p.URL == n.self.URL { continue } if _, ok := n.peers[p.URL]; !ok { cp := p n.peers[p.URL] = &cp } } n.mu.Unlock() } resp.Body.Close() } } } func (n *Node) loopSeeder() { // attempt to hello known seeds every 5s at start, then every 30s backoff := 5 * time.Second for { select { case <-n.stop: return case <-time.After(backoff): } if len(n.cfg.Seeds) == 0 { backoff = 30 * time.Second continue } for _, s := range n.cfg.Seeds { if s == "" || s == n.self.URL { continue } _ = n.sendHello(s) } backoff = 30 * time.Second } } func (n *Node) loopAntiEntropy() { t := time.NewTicker(10 * time.Second) defer t.Stop() for { select { case <-n.stop: return case <-t.C: n.mu.RLock() targets := make([]string, 0, len(n.peers)) for url := range n.peers { targets = append(targets, url) } n.mu.RUnlock() if len(targets) == 0 { continue } ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) snap, err := n.cbs.GetSnapshot(ctx) cancel() if err != nil { continue } for _, url := range targets { _ = n.sendSync(url, snap) } } } } func (n *Node) loopBeaconSend() { addr, err := net.ResolveUDPAddr("udp", n.cfg.DiscoveryAddress) if err != nil { log.Printf("mesh beacon send resolve: %v", err) return } conn, err := net.DialUDP("udp", nil, addr) if err != nil { log.Printf("mesh beacon send dial: %v", err) return } defer conn.Close() type beacon struct { URL string `json:"url"` } t := time.NewTicker(5 * time.Second) defer t.Stop() for { select { case <-n.stop: return case <-t.C: b, _ := json.Marshal(beacon{URL: n.self.URL}) _, _ = conn.Write(b) } } } func (n *Node) loopBeaconRecv() { addr, err := net.ResolveUDPAddr("udp", n.cfg.DiscoveryAddress) if 
err != nil { log.Printf("mesh beacon recv resolve: %v", err) return } // enable multicast receive l, err := net.ListenMulticastUDP("udp", nil, addr) if err != nil { log.Printf("mesh beacon recv listen: %v", err) return } defer l.Close() _ = l.SetReadBuffer(1 << 16) buf := make([]byte, 2048) for { select { case <-n.stop: return default: } _ = l.SetDeadline(time.Now().Add(6 * time.Second)) nr, _, err := l.ReadFromUDP(buf) if err != nil { if ne, ok := err.(net.Error); ok && ne.Timeout() { continue } continue } var msg struct{ URL string } if err := json.Unmarshal(buf[:nr], &msg); err == nil { if msg.URL != "" && msg.URL != n.self.URL { _ = n.sendHello(msg.URL) } } } } /*** Outgoing ***/ func (n *Node) sendHello(url string) error { b, _ := json.Marshal(struct { URL string `json:"url"` }{URL: n.self.URL}) req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/hello", bytes.NewReader(b)) req.Header.Set("X-Mesh-Sig", n.sign(b)) resp, err := n.client.Do(req) if err != nil { return err } io.Copy(io.Discard, resp.Body) resp.Body.Close() if resp.StatusCode == http.StatusOK { n.touchPeer(url) // eigenen Blick auf den Peer direkt als „gesehen“ markieren return nil } return fmt.Errorf("hello %s: %s", url, resp.Status) } func (n *Node) loopHello() { interval := n.cfg.HelloInterval if interval <= 0 { interval = 20 * time.Second } fanout := n.cfg.HelloFanout if fanout <= 0 { fanout = 8 } t := time.NewTicker(interval) defer t.Stop() for { select { case <-n.stop: return case <-t.C: } // Liste der *bekannten* Peers (nicht nur Seeds) n.mu.RLock() targets := make([]string, 0, len(n.peers)) for url := range n.peers { if url != n.self.URL { targets = append(targets, url) } } n.mu.RUnlock() if len(targets) == 0 { continue } // zufällig mischen und auf Fanout begrenzen rand.Shuffle(len(targets), func(i, j int) { targets[i], targets[j] = targets[j], targets[i] }) if fanout < len(targets) { targets = targets[:fanout] } // leicht parallel pingen var wg sync.WaitGroup for 
_, u := range targets { u := u wg.Add(1) go func() { defer wg.Done() _ = n.sendHello(u) }() } wg.Wait() } } func (n *Node) sendSync(url string, s Snapshot) error { b, _ := json.Marshal(s) req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/sync", bytes.NewReader(b)) req.Header.Set("X-Mesh-Sig", n.sign(b)) ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) defer cancel() req = req.WithContext(ctx) resp, err := n.client.Do(req) if err != nil { return err } defer resp.Body.Close() io.Copy(io.Discard, resp.Body) if resp.StatusCode != http.StatusOK { return fmt.Errorf("sync %s: %s", url, resp.Status) } n.touchPeer(url) return nil } // PeerList liefert eine Kopie der bekannten Peers inkl. Self. func (n *Node) PeerList() []Peer { n.mu.RLock() defer n.mu.RUnlock() out := make([]Peer, 0, len(n.peers)+1) out = append(out, n.self) for _, p := range n.peers { cp := *p cp.Self = false out = append(out, cp) } return out } // SyncNow verschickt sofort den aktuellen Snapshot an alle bekannten Peers. func (n *Node) SyncNow(ctx context.Context) error { snap, err := n.cbs.GetSnapshot(ctx) if err != nil { return err } n.mu.RLock() targets := make([]string, 0, len(n.peers)) for url := range n.peers { targets = append(targets, url) } n.mu.RUnlock() for _, u := range targets { _ = n.sendSync(u, snap) } return nil } /*** Utilities ***/ // OwnerHint is a simple, optional mapping to distribute responsibility. 
func OwnerHint(id string, peers []string) int { if len(peers) == 0 { return 0 } h := crc32.ChecksumIEEE([]byte(id)) return int(h % uint32(len(peers))) } // Helpers to load from ENV quickly func FromEnv() Config { return Config{ BindAddr: getenvDefault("MESH_BIND", ":9090"), AdvertURL: os.Getenv("MESH_ADVERT"), Seeds: splitCSV(os.Getenv("MESH_SEEDS")), ClusterSecret: os.Getenv("MESH_CLUSTER_SECRET"), EnableDiscovery: os.Getenv("MESH_ENABLE_DISCOVERY") == "true", DiscoveryAddress: getenvDefault("MESH_DISCOVERY_ADDR", "239.8.8.8:9898"), } } func splitCSV(s string) []string { if strings.TrimSpace(s) == "" { return nil } parts := strings.Split(s, ",") for i := range parts { parts[i] = strings.TrimSpace(parts[i]) } // dedup out := make([]string, 0, len(parts)) for _, p := range parts { if p == "" || slices.Contains(out, p) { continue } out = append(out, p) } return out } func getenvDefault(k, def string) string { v := os.Getenv(k) if v == "" { return def } return v } // POST /mesh/blob (Body: {"id":}) -> streamt den Blob func (n *Node) blobHandler(w http.ResponseWriter, r *http.Request) { body, _ := io.ReadAll(r.Body) if !n.verify(body, r.Header.Get("X-Mesh-Sig")) { http.Error(w, "bad signature", http.StatusUnauthorized) return } var req struct { ID string `json:"id"` } if err := json.Unmarshal(body, &req); err != nil { http.Error(w, "bad json", http.StatusBadRequest) return } if n.cbs.BlobOpen == nil { http.Error(w, "blob unavailable", http.StatusNotFound) return } rc, name, ct, size, err := n.cbs.BlobOpen(r.Context(), req.ID) if err != nil { http.Error(w, "not found", http.StatusNotFound) return } defer rc.Close() if ct == "" { ct = "application/octet-stream" } if size > 0 { w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) } w.Header().Set("Content-Type", ct) w.Header().Set("X-Blob-Name", name) _, _ = io.Copy(w, rc) } // sendBlobRequest schickt eine signierte Anfrage an /mesh/blob und liefert die Response // zurück (Caller MUSS resp.Body schließen!). 
Bei HTTP 200 wird der Peer als gesehen markiert. func (n *Node) sendBlobRequest(url, id string) (*http.Response, error) { b, _ := json.Marshal(struct { ID string `json:"id"` }{ID: id}) req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/blob", bytes.NewReader(b)) req.Header.Set("X-Mesh-Sig", n.sign(b)) req.Header.Set("Content-Type", "application/json") // ❗ WICHTIG: kein kurzer Timeout. Optional: großer Timeout aus Config ctx := context.Background() if n.cfg.BlobTimeout > 0 { var cancel context.CancelFunc ctx, cancel = context.WithTimeout(context.Background(), n.cfg.BlobTimeout) defer cancel() } req = req.WithContext(ctx) resp, err := n.client.Do(req) if err != nil { return nil, err } if resp.StatusCode == http.StatusOK { n.touchPeer(url) // ausgehender Erfolg zählt als "gesehen" return resp, nil } io.Copy(io.Discard, resp.Body) resp.Body.Close() return nil, fmt.Errorf("blob %s from %s: %s", id, url, resp.Status) } // Öffentliche Methode: versuche Blob bei irgendeinem Peer zu holen func (n *Node) FetchBlobAny(ctx context.Context, id string) (io.ReadCloser, string, string, int64, error) { n.mu.RLock() targets := make([]string, 0, len(n.peers)) for url := range n.peers { targets = append(targets, url) } n.mu.RUnlock() if len(targets) == 0 { // Fallback: Seeds probieren targets = append(targets, n.cfg.Seeds...) } for _, u := range targets { if strings.TrimSpace(u) == "" || u == n.self.URL { continue } resp, err := n.sendBlobRequest(u, id) if err != nil { continue } if resp.StatusCode == http.StatusOK { name := resp.Header.Get("X-Blob-Name") ct := resp.Header.Get("Content-Type") var size int64 = -1 if cl := resp.Header.Get("Content-Length"); cl != "" { if s, err := strconv.ParseInt(cl, 10, 64); err == nil { size = s } } // Caller muss resp.Body schließen return resp.Body, name, ct, size, nil } io.Copy(io.Discard, resp.Body) resp.Body.Close() } return nil, "", "", 0, fmt.Errorf("blob %s not found on peers", id) }