diff --git a/cmd/unified/main.go b/cmd/unified/main.go index 0b73fd6..e373e74 100644 --- a/cmd/unified/main.go +++ b/cmd/unified/main.go @@ -55,6 +55,9 @@ func loadConfig() AppConfig { m.PeerTTL = parseDuration(os.Getenv("MESH_PEER_TTL"), 2*time.Minute) m.PruneInterval = parseDuration(os.Getenv("MESH_PRUNE_INTERVAL"), 30*time.Second) + m.HelloInterval = parseDuration(os.Getenv("MESH_HELLO_INTERVAL"), 20*time.Second) + m.HelloFanout = parseIntEnv(os.Getenv("MESH_HELLO_FANOUT"), 8) + // Wenn keine AdvertURL gesetzt ist, versuche eine sinnvolle Herleitung: if strings.TrimSpace(m.AdvertURL) == "" { m.AdvertURL = inferAdvertURL(m.BindAddr) @@ -101,6 +104,18 @@ func parseBoolEnv(k string, def bool) bool { return v == "1" || v == "true" || v == "yes" || v == "on" } +func parseIntEnv(k string, def int) int { + v := strings.TrimSpace(os.Getenv(k)) + if v == "" { + return def + } + n, err := strconv.Atoi(v) + if err != nil { + return def + } + return n +} + func splitCSV(s string) []string { s = strings.TrimSpace(s) if s == "" { diff --git a/internal/mesh/mesh.go b/internal/mesh/mesh.go index ee28e17..7b03fb9 100644 --- a/internal/mesh/mesh.go +++ b/internal/mesh/mesh.go @@ -12,6 +12,7 @@ import ( "hash/crc32" "io" "log" + "math/rand/v2" "net" "net/http" "os" @@ -33,6 +34,13 @@ type Config struct { DiscoveryAddress string // "239.8.8.8:9898" PeerTTL time.Duration // wie lange darf ein Peer inaktiv sein (Default siehe unten) PruneInterval time.Duration // wie oft wird gepruned + + SyncInterval time.Duration + Fanout int + + // NEU: + HelloInterval time.Duration // wie oft pingen + HelloFanout int // wie viele Peers pro Tick } type Peer struct { @@ -190,21 +198,38 @@ func (n *Node) helloHandler(w http.ResponseWriter, r *http.Request) { http.Error(w, "bad signature", http.StatusUnauthorized) return } - var p Peer - if err := json.Unmarshal(body, &p); err != nil { + var req struct { + URL string `json:"url"` + } + if err := json.Unmarshal(body, &req); err != nil || strings.TrimSpace(req.URL) == "" { http.Error(w, "bad json", http.StatusBadRequest) return } - p.LastSeen = time.Now() + + // Peer anlegen (falls neu) und LastSeen setzen n.mu.Lock() - if existing, ok := n.peers[p.URL]; ok { - existing.LastSeen = p.LastSeen - } else if p.URL != n.self.URL { - cp := p - n.peers[p.URL] = &cp + if req.URL != n.self.URL { + if p, ok := n.peers[req.URL]; ok { + p.LastSeen = time.Now() + } else { + cp := Peer{URL: req.URL, LastSeen: time.Now()} // weitere Felder wie bekannt + n.peers[req.URL] = &cp + } + } + n.mu.Unlock() + + w.WriteHeader(http.StatusOK) +} + +func (n *Node) touchPeer(url string) { + if strings.TrimSpace(url) == "" { + return + } + n.mu.Lock() + if p, ok := n.peers[url]; ok { + p.LastSeen = time.Now() } n.mu.Unlock() - w.WriteHeader(http.StatusNoContent) } func (n *Node) peersHandler(w http.ResponseWriter, r *http.Request) { @@ -283,6 +308,12 @@ func (n *Node) Serve() error { n.loopAntiEntropy() }() + n.wg.Add(1) + go func() { + defer n.wg.Done() + n.loopHello() + }() + // http server errc := make(chan error, 1) go func() { @@ -467,28 +498,100 @@ func (n *Node) loopBeaconRecv() { /*** Outgoing ***/ func (n *Node) sendHello(url string) error { - p := n.self - b, _ := json.Marshal(p) + b, _ := json.Marshal(struct { + URL string `json:"url"` + }{URL: n.self.URL}) req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/hello", bytes.NewReader(b)) req.Header.Set("X-Mesh-Sig", n.sign(b)) + resp, err := n.client.Do(req) - if err == nil { - io.Copy(io.Discard, resp.Body) - resp.Body.Close() + if err != nil { + return err + } + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + if resp.StatusCode == http.StatusOK { + n.touchPeer(url) // eigenen Blick auf den Peer direkt als „gesehen“ markieren + return nil + } + return fmt.Errorf("hello %s: %s", url, resp.Status) +} + +func (n *Node) loopHello() { + interval := n.cfg.HelloInterval + if interval <= 0 { + interval = 20 * time.Second + } + fanout := n.cfg.HelloFanout + if fanout <= 0 { + fanout = 8 + } + + t := time.NewTicker(interval) + defer t.Stop() + + for { + select { + case <-n.stop: + return + case <-t.C: + } + + // Liste der *bekannten* Peers (nicht nur Seeds) + n.mu.RLock() + targets := make([]string, 0, len(n.peers)) + for url := range n.peers { + if url != n.self.URL { + targets = append(targets, url) + } + } + n.mu.RUnlock() + if len(targets) == 0 { + continue + } + + // zufällig mischen und auf Fanout begrenzen + rand.Shuffle(len(targets), func(i, j int) { targets[i], targets[j] = targets[j], targets[i] }) + if fanout < len(targets) { + targets = targets[:fanout] + } + + // leicht parallel pingen + var wg sync.WaitGroup + for _, u := range targets { + u := u + wg.Add(1) + go func() { + defer wg.Done() + _ = n.sendHello(u) + }() + } + wg.Wait() } - return err } func (n *Node) sendSync(url string, s Snapshot) error { b, _ := json.Marshal(s) req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/sync", bytes.NewReader(b)) req.Header.Set("X-Mesh-Sig", n.sign(b)) + + ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) + defer cancel() + req = req.WithContext(ctx) + resp, err := n.client.Do(req) - if err == nil { - io.Copy(io.Discard, resp.Body) - resp.Body.Close() + if err != nil { + return err } - return err + defer resp.Body.Close() + io.Copy(io.Discard, resp.Body) + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("sync %s: %s", url, resp.Status) + } + + n.touchPeer(url) + return nil } // PeerList liefert eine Kopie der bekannten Peers inkl. Self. @@ -608,14 +711,37 @@ func (n *Node) blobHandler(w http.ResponseWriter, r *http.Request) { _, _ = io.Copy(w, rc) } -// interner Helper: signierter Blob-Request an einen Peer -func (n *Node) sendBlobRequest(url string, id string) (*http.Response, error) { - b, _ := json.Marshal(struct { +// sendBlobRequest schickt eine signierte Anfrage an /mesh/blob und liefert die Response +// zurück (Caller MUSS resp.Body schließen!). Bei HTTP 200 wird der Peer als gesehen markiert. +func (n *Node) sendBlobRequest(url, id string) (*http.Response, error) { + body, _ := json.Marshal(struct { ID string `json:"id"` }{ID: id}) - req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/blob", bytes.NewReader(b)) - req.Header.Set("X-Mesh-Sig", n.sign(b)) - return n.client.Do(req) + + req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/blob", bytes.NewReader(body)) + req.Header.Set("X-Mesh-Sig", n.sign(body)) + req.Header.Set("Content-Type", "application/json") + + // kurzer Timeout für Blob-Requests (anpassbar) + ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second) + defer cancel() + req = req.WithContext(ctx) + + resp, err := n.client.Do(req) + if err != nil { + return nil, err + } + + // OK → Peer als "gesehen" markieren und Response (für Streaming) zurückgeben + if resp.StatusCode == http.StatusOK { + n.touchPeer(url) + return resp, nil // Caller liest/streamt Body und schließt ihn + } + + // Nicht-OK → Body leeren/schließen und Fehler zurück + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + return nil, fmt.Errorf("blob %s from %s: %s", id, url, resp.Status) } // Öffentliche Methode: versuche Blob bei irgendeinem Peer zu holen