package mesh import ( "bytes" "context" "crypto/hmac" "crypto/sha256" "encoding/hex" "encoding/json" "errors" "fmt" "hash/crc32" "io" "log" "net" "net/http" "os" "slices" "strconv" "strings" "sync" "time" ) /*** Types & Config ***/ type Config struct { BindAddr string // e.g. ":9090" AdvertURL string // e.g. "http://10.0.0.5:9090" Seeds []string // other peers' mesh base URLs ClusterSecret string // HMAC key EnableDiscovery bool DiscoveryAddress string // "239.8.8.8:9898" } type Peer struct { URL string `json:"url"` LastSeen time.Time `json:"lastSeen"` Self bool `json:"self"` OwnerHint int `json:"ownerHint"` // optional } type Item struct { ID string `json:"id"` Name string `json:"name"` UpdatedAt int64 `json:"updatedAt"` Deleted bool `json:"deleted"` // <— NEU: Tombstone für Deletes } type Snapshot struct { Items []Item `json:"items"` } // Callbacks that your app provides type Callbacks struct { GetSnapshot func(ctx context.Context) (Snapshot, error) ApplyRemote func(ctx context.Context, s Snapshot) error BlobOpen func(ctx context.Context, id string) (io.ReadCloser, string, string, int64, error) } /*** Node ***/ type Node struct { cfg Config cbs Callbacks self Peer mu sync.RWMutex peers map[string]*Peer client *http.Client srv *http.Server stop chan struct{} wg sync.WaitGroup } func New(cfg Config, cbs Callbacks) (*Node, error) { if cfg.BindAddr == "" || cfg.AdvertURL == "" { return nil, errors.New("mesh: BindAddr and AdvertURL required") } if cfg.ClusterSecret == "" { return nil, errors.New("mesh: ClusterSecret required") } n := &Node{ cfg: cfg, cbs: cbs, self: Peer{URL: cfg.AdvertURL, LastSeen: time.Now(), Self: true}, peers: make(map[string]*Peer), client: &http.Client{ Timeout: 5 * time.Second, }, stop: make(chan struct{}), } return n, nil } /*** HMAC helpers ***/ func (n *Node) sign(b []byte) string { m := hmac.New(sha256.New, []byte(n.cfg.ClusterSecret)) m.Write(b) return hex.EncodeToString(m.Sum(nil)) } func (n *Node) verify(b []byte, sig string) bool { want := n.sign(b) return hmac.Equal([]byte(want), []byte(sig)) } /*** HTTP handlers (control plane) ***/ func (n *Node) helloHandler(w http.ResponseWriter, r *http.Request) { body, _ := io.ReadAll(r.Body) if !n.verify(body, r.Header.Get("X-Mesh-Sig")) { http.Error(w, "bad signature", http.StatusUnauthorized) return } var p Peer if err := json.Unmarshal(body, &p); err != nil { http.Error(w, "bad json", http.StatusBadRequest) return } p.LastSeen = time.Now() n.mu.Lock() if existing, ok := n.peers[p.URL]; ok { existing.LastSeen = p.LastSeen } else if p.URL != n.self.URL { cp := p n.peers[p.URL] = &cp } n.mu.Unlock() w.WriteHeader(http.StatusNoContent) } func (n *Node) peersHandler(w http.ResponseWriter, r *http.Request) { n.mu.RLock() defer n.mu.RUnlock() var list []Peer list = append(list, n.self) for _, p := range n.peers { list = append(list, *p) } writeJSON(w, http.StatusOK, list) } func (n *Node) syncHandler(w http.ResponseWriter, r *http.Request) { // verify signature body, _ := io.ReadAll(r.Body) if !n.verify(body, r.Header.Get("X-Mesh-Sig")) { http.Error(w, "bad signature", http.StatusUnauthorized) return } var s Snapshot if err := json.Unmarshal(body, &s); err != nil { http.Error(w, "bad json", http.StatusBadRequest) return } // apply if err := n.cbs.ApplyRemote(r.Context(), s); err != nil { http.Error(w, "apply error: "+err.Error(), http.StatusInternalServerError) return } w.WriteHeader(http.StatusNoContent) } func writeJSON(w http.ResponseWriter, code int, v any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(code) _ = json.NewEncoder(w).Encode(v) } /*** Serve ***/ func (n *Node) Serve() error { mux := http.NewServeMux() mux.HandleFunc("/mesh/peers", n.peersHandler) mux.HandleFunc("/mesh/hello", n.helloHandler) mux.HandleFunc("/mesh/blob", n.blobHandler) mux.HandleFunc("/mesh/sync", n.syncHandler) n.srv = &http.Server{Addr: n.cfg.BindAddr, Handler: mux} // background loops n.wg.Add(1) go func() { defer n.wg.Done() n.loopSeeder() }() n.wg.Add(1) go func() { defer n.wg.Done(); n.loopPeerExchange() }() if n.cfg.EnableDiscovery && n.cfg.DiscoveryAddress != "" { n.wg.Add(2) go func() { defer n.wg.Done() n.loopBeaconSend() }() go func() { defer n.wg.Done() n.loopBeaconRecv() }() } n.wg.Add(1) go func() { defer n.wg.Done() n.loopAntiEntropy() }() // http server errc := make(chan error, 1) go func() { errc <- n.srv.ListenAndServe() }() select { case err := <-errc: return err case <-n.stop: return http.ErrServerClosed } } func (n *Node) Close(ctx context.Context) error { close(n.stop) if n.srv != nil { _ = n.srv.Shutdown(ctx) } n.wg.Wait() return nil } /*** Loops ***/ func (n *Node) loopPeerExchange() { t := time.NewTicker(30 * time.Second) defer t.Stop() for { select { case <-n.stop: return case <-t.C: } // Seeds abfragen for _, s := range n.cfg.Seeds { if strings.TrimSpace(s) == "" { continue } resp, err := n.client.Get(strings.TrimRight(s, "/") + "/mesh/peers") if err != nil { continue } var list []Peer if json.NewDecoder(resp.Body).Decode(&list) == nil { n.mu.Lock() for _, p := range list { if p.URL == "" || p.URL == n.self.URL { continue } if _, ok := n.peers[p.URL]; !ok { cp := p n.peers[p.URL] = &cp } } n.mu.Unlock() } resp.Body.Close() } } } func (n *Node) loopSeeder() { // attempt to hello known seeds every 5s at start, then every 30s backoff := 5 * time.Second for { select { case <-n.stop: return case <-time.After(backoff): } if len(n.cfg.Seeds) == 0 { backoff = 30 * time.Second continue } for _, s := range n.cfg.Seeds { if s == "" || s == n.self.URL { continue } _ = n.sendHello(s) } backoff = 30 * time.Second } } func (n *Node) loopAntiEntropy() { t := time.NewTicker(10 * time.Second) defer t.Stop() for { select { case <-n.stop: return case <-t.C: n.mu.RLock() targets := make([]string, 0, len(n.peers)) for url := range n.peers { targets = append(targets, url) } n.mu.RUnlock() if len(targets) == 0 { continue } ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) snap, err := n.cbs.GetSnapshot(ctx) cancel() if err != nil { continue } for _, url := range targets { _ = n.sendSync(url, snap) } } } } func (n *Node) loopBeaconSend() { addr, err := net.ResolveUDPAddr("udp", n.cfg.DiscoveryAddress) if err != nil { log.Printf("mesh beacon send resolve: %v", err) return } conn, err := net.DialUDP("udp", nil, addr) if err != nil { log.Printf("mesh beacon send dial: %v", err) return } defer conn.Close() type beacon struct { URL string `json:"url"` } t := time.NewTicker(5 * time.Second) defer t.Stop() for { select { case <-n.stop: return case <-t.C: b, _ := json.Marshal(beacon{URL: n.self.URL}) _, _ = conn.Write(b) } } } func (n *Node) loopBeaconRecv() { addr, err := net.ResolveUDPAddr("udp", n.cfg.DiscoveryAddress) if err != nil { log.Printf("mesh beacon recv resolve: %v", err) return } // enable multicast receive l, err := net.ListenMulticastUDP("udp", nil, addr) if err != nil { log.Printf("mesh beacon recv listen: %v", err) return } defer l.Close() _ = l.SetReadBuffer(1 << 16) buf := make([]byte, 2048) for { select { case <-n.stop: return default: } _ = l.SetDeadline(time.Now().Add(6 * time.Second)) nr, _, err := l.ReadFromUDP(buf) if err != nil { if ne, ok := err.(net.Error); ok && ne.Timeout() { continue } continue } var msg struct{ URL string } if err := json.Unmarshal(buf[:nr], &msg); err == nil { if msg.URL != "" && msg.URL != n.self.URL { _ = n.sendHello(msg.URL) } } } } /*** Outgoing ***/ func (n *Node) sendHello(url string) error { p := n.self b, _ := json.Marshal(p) req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/hello", bytes.NewReader(b)) req.Header.Set("X-Mesh-Sig", n.sign(b)) resp, err := n.client.Do(req) if err == nil { io.Copy(io.Discard, resp.Body) resp.Body.Close() } return err } func (n *Node) sendSync(url string, s Snapshot) error { b, _ := json.Marshal(s) req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/sync", bytes.NewReader(b)) req.Header.Set("X-Mesh-Sig", n.sign(b)) resp, err := n.client.Do(req) if err == nil { io.Copy(io.Discard, resp.Body) resp.Body.Close() } return err } // PeerList liefert eine Kopie der bekannten Peers inkl. Self. func (n *Node) PeerList() []Peer { n.mu.RLock() defer n.mu.RUnlock() out := make([]Peer, 0, len(n.peers)+1) out = append(out, n.self) for _, p := range n.peers { cp := *p out = append(out, cp) } return out } // SyncNow verschickt sofort den aktuellen Snapshot an alle bekannten Peers. func (n *Node) SyncNow(ctx context.Context) error { snap, err := n.cbs.GetSnapshot(ctx) if err != nil { return err } n.mu.RLock() targets := make([]string, 0, len(n.peers)) for url := range n.peers { targets = append(targets, url) } n.mu.RUnlock() for _, u := range targets { _ = n.sendSync(u, snap) } return nil } /*** Utilities ***/ // OwnerHint is a simple, optional mapping to distribute responsibility. func OwnerHint(id string, peers []string) int { if len(peers) == 0 { return 0 } h := crc32.ChecksumIEEE([]byte(id)) return int(h % uint32(len(peers))) } // Helpers to load from ENV quickly func FromEnv() Config { return Config{ BindAddr: getenvDefault("MESH_BIND", ":9090"), AdvertURL: os.Getenv("MESH_ADVERT"), Seeds: splitCSV(os.Getenv("MESH_SEEDS")), ClusterSecret: os.Getenv("MESH_CLUSTER_SECRET"), EnableDiscovery: os.Getenv("MESH_ENABLE_DISCOVERY") == "true", DiscoveryAddress: getenvDefault("MESH_DISCOVERY_ADDR", "239.8.8.8:9898"), } } func splitCSV(s string) []string { if strings.TrimSpace(s) == "" { return nil } parts := strings.Split(s, ",") for i := range parts { parts[i] = strings.TrimSpace(parts[i]) } // dedup out := make([]string, 0, len(parts)) for _, p := range parts { if p == "" || slices.Contains(out, p) { continue } out = append(out, p) } return out } func getenvDefault(k, def string) string { v := os.Getenv(k) if v == "" { return def } return v } // POST /mesh/blob (Body: {"id":}) -> streamt den Blob func (n *Node) blobHandler(w http.ResponseWriter, r *http.Request) { body, _ := io.ReadAll(r.Body) if !n.verify(body, r.Header.Get("X-Mesh-Sig")) { http.Error(w, "bad signature", http.StatusUnauthorized) return } var req struct { ID string `json:"id"` } if err := json.Unmarshal(body, &req); err != nil { http.Error(w, "bad json", http.StatusBadRequest) return } if n.cbs.BlobOpen == nil { http.Error(w, "blob unavailable", http.StatusNotFound) return } rc, name, ct, size, err := n.cbs.BlobOpen(r.Context(), req.ID) if err != nil { http.Error(w, "not found", http.StatusNotFound) return } defer rc.Close() if ct == "" { ct = "application/octet-stream" } if size > 0 { w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) } w.Header().Set("Content-Type", ct) w.Header().Set("X-Blob-Name", name) _, _ = io.Copy(w, rc) } // interner Helper: signierter Blob-Request an einen Peer func (n *Node) sendBlobRequest(url string, id string) (*http.Response, error) { b, _ := json.Marshal(struct { ID string `json:"id"` }{ID: id}) req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/blob", bytes.NewReader(b)) req.Header.Set("X-Mesh-Sig", n.sign(b)) return n.client.Do(req) } // Öffentliche Methode: versuche Blob bei irgendeinem Peer zu holen func (n *Node) FetchBlobAny(ctx context.Context, id string) (io.ReadCloser, string, string, int64, error) { n.mu.RLock() targets := make([]string, 0, len(n.peers)) for url := range n.peers { targets = append(targets, url) } n.mu.RUnlock() if len(targets) == 0 { // Fallback: Seeds probieren targets = append(targets, n.cfg.Seeds...) } for _, u := range targets { if strings.TrimSpace(u) == "" || u == n.self.URL { continue } resp, err := n.sendBlobRequest(u, id) if err != nil { continue } if resp.StatusCode == http.StatusOK { name := resp.Header.Get("X-Blob-Name") ct := resp.Header.Get("Content-Type") var size int64 = -1 if cl := resp.Header.Get("Content-Length"); cl != "" { if s, err := strconv.ParseInt(cl, 10, 64); err == nil { size = s } } // Caller muss resp.Body schließen return resp.Body, name, ct, size, nil } io.Copy(io.Discard, resp.Body) resp.Body.Close() } return nil, "", "", 0, fmt.Errorf("blob %s not found on peers", id) }