This commit is contained in:
@@ -55,6 +55,9 @@ func loadConfig() AppConfig {
|
||||
m.PeerTTL = parseDuration(os.Getenv("MESH_PEER_TTL"), 2*time.Minute)
|
||||
m.PruneInterval = parseDuration(os.Getenv("MESH_PRUNE_INTERVAL"), 30*time.Second)
|
||||
|
||||
m.HelloInterval = parseDuration(os.Getenv("MESH_HELLO_INTERVAL"), 20*time.Second)
|
||||
m.HelloFanout = parseIntEnv(os.Getenv("MESH_HELLO_FANOUT"), 8)
|
||||
|
||||
// Wenn keine AdvertURL gesetzt ist, versuche eine sinnvolle Herleitung:
|
||||
if strings.TrimSpace(m.AdvertURL) == "" {
|
||||
m.AdvertURL = inferAdvertURL(m.BindAddr)
|
||||
@@ -101,6 +104,18 @@ func parseBoolEnv(k string, def bool) bool {
|
||||
return v == "1" || v == "true" || v == "yes" || v == "on"
|
||||
}
|
||||
|
||||
func parseIntEnv(k string, def int) int {
|
||||
v := strings.TrimSpace(os.Getenv(k))
|
||||
if v == "" {
|
||||
return def
|
||||
}
|
||||
n, err := strconv.Atoi(v)
|
||||
if err != nil {
|
||||
return def
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func splitCSV(s string) []string {
|
||||
s = strings.TrimSpace(s)
|
||||
if s == "" {
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"log"
|
||||
"math/rand/v2"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
@@ -33,6 +34,13 @@ type Config struct {
|
||||
DiscoveryAddress string // "239.8.8.8:9898"
|
||||
PeerTTL time.Duration // wie lange darf ein Peer inaktiv sein (Default siehe unten)
|
||||
PruneInterval time.Duration // wie oft wird gepruned
|
||||
|
||||
SyncInterval time.Duration
|
||||
Fanout int
|
||||
|
||||
// NEU:
|
||||
HelloInterval time.Duration // wie oft pingen
|
||||
HelloFanout int // wie viele Peers pro Tick
|
||||
}
|
||||
|
||||
type Peer struct {
|
||||
@@ -190,21 +198,38 @@ func (n *Node) helloHandler(w http.ResponseWriter, r *http.Request) {
|
||||
http.Error(w, "bad signature", http.StatusUnauthorized)
|
||||
return
|
||||
}
|
||||
var p Peer
|
||||
if err := json.Unmarshal(body, &p); err != nil {
|
||||
var req struct {
|
||||
URL string `json:"url"`
|
||||
}
|
||||
if err := json.Unmarshal(body, &req); err != nil || strings.TrimSpace(req.URL) == "" {
|
||||
http.Error(w, "bad json", http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
p.LastSeen = time.Now()
|
||||
|
||||
// Peer anlegen (falls neu) und LastSeen setzen
|
||||
n.mu.Lock()
|
||||
if existing, ok := n.peers[p.URL]; ok {
|
||||
existing.LastSeen = p.LastSeen
|
||||
} else if p.URL != n.self.URL {
|
||||
cp := p
|
||||
n.peers[p.URL] = &cp
|
||||
if req.URL != n.self.URL {
|
||||
if p, ok := n.peers[req.URL]; ok {
|
||||
p.LastSeen = time.Now()
|
||||
} else {
|
||||
cp := Peer{URL: req.URL, LastSeen: time.Now()} // weitere Felder wie bekannt
|
||||
n.peers[req.URL] = &cp
|
||||
}
|
||||
}
|
||||
n.mu.Unlock()
|
||||
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func (n *Node) touchPeer(url string) {
|
||||
if strings.TrimSpace(url) == "" {
|
||||
return
|
||||
}
|
||||
n.mu.Lock()
|
||||
if p, ok := n.peers[url]; ok {
|
||||
p.LastSeen = time.Now()
|
||||
}
|
||||
n.mu.Unlock()
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
}
|
||||
|
||||
func (n *Node) peersHandler(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -283,6 +308,12 @@ func (n *Node) Serve() error {
|
||||
n.loopAntiEntropy()
|
||||
}()
|
||||
|
||||
n.wg.Add(1)
|
||||
go func() {
|
||||
defer n.wg.Done()
|
||||
n.loopHello()
|
||||
}()
|
||||
|
||||
// http server
|
||||
errc := make(chan error, 1)
|
||||
go func() {
|
||||
@@ -467,28 +498,100 @@ func (n *Node) loopBeaconRecv() {
|
||||
/*** Outgoing ***/
|
||||
|
||||
func (n *Node) sendHello(url string) error {
|
||||
p := n.self
|
||||
b, _ := json.Marshal(p)
|
||||
b, _ := json.Marshal(struct {
|
||||
URL string `json:"url"`
|
||||
}{URL: n.self.URL})
|
||||
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/hello", bytes.NewReader(b))
|
||||
req.Header.Set("X-Mesh-Sig", n.sign(b))
|
||||
|
||||
resp, err := n.client.Do(req)
|
||||
if err == nil {
|
||||
io.Copy(io.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
io.Copy(io.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
n.touchPeer(url) // eigenen Blick auf den Peer direkt als „gesehen“ markieren
|
||||
return nil
|
||||
}
|
||||
return fmt.Errorf("hello %s: %s", url, resp.Status)
|
||||
}
|
||||
|
||||
func (n *Node) loopHello() {
|
||||
interval := n.cfg.HelloInterval
|
||||
if interval <= 0 {
|
||||
interval = 20 * time.Second
|
||||
}
|
||||
fanout := n.cfg.HelloFanout
|
||||
if fanout <= 0 {
|
||||
fanout = 8
|
||||
}
|
||||
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-n.stop:
|
||||
return
|
||||
case <-t.C:
|
||||
}
|
||||
|
||||
// Liste der *bekannten* Peers (nicht nur Seeds)
|
||||
n.mu.RLock()
|
||||
targets := make([]string, 0, len(n.peers))
|
||||
for url := range n.peers {
|
||||
if url != n.self.URL {
|
||||
targets = append(targets, url)
|
||||
}
|
||||
}
|
||||
n.mu.RUnlock()
|
||||
if len(targets) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
// zufällig mischen und auf Fanout begrenzen
|
||||
rand.Shuffle(len(targets), func(i, j int) { targets[i], targets[j] = targets[j], targets[i] })
|
||||
if fanout < len(targets) {
|
||||
targets = targets[:fanout]
|
||||
}
|
||||
|
||||
// leicht parallel pingen
|
||||
var wg sync.WaitGroup
|
||||
for _, u := range targets {
|
||||
u := u
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
_ = n.sendHello(u)
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (n *Node) sendSync(url string, s Snapshot) error {
|
||||
b, _ := json.Marshal(s)
|
||||
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/sync", bytes.NewReader(b))
|
||||
req.Header.Set("X-Mesh-Sig", n.sign(b))
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
|
||||
defer cancel()
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
resp, err := n.client.Do(req)
|
||||
if err == nil {
|
||||
io.Copy(io.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return err
|
||||
defer resp.Body.Close()
|
||||
io.Copy(io.Discard, resp.Body)
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("sync %s: %s", url, resp.Status)
|
||||
}
|
||||
|
||||
n.touchPeer(url)
|
||||
return nil
|
||||
}
|
||||
|
||||
// PeerList liefert eine Kopie der bekannten Peers inkl. Self.
|
||||
@@ -608,14 +711,37 @@ func (n *Node) blobHandler(w http.ResponseWriter, r *http.Request) {
|
||||
_, _ = io.Copy(w, rc)
|
||||
}
|
||||
|
||||
// interner Helper: signierter Blob-Request an einen Peer
|
||||
func (n *Node) sendBlobRequest(url string, id string) (*http.Response, error) {
|
||||
b, _ := json.Marshal(struct {
|
||||
// sendBlobRequest schickt eine signierte Anfrage an /mesh/blob und liefert die Response
|
||||
// zurück (Caller MUSS resp.Body schließen!). Bei HTTP 200 wird der Peer als gesehen markiert.
|
||||
func (n *Node) sendBlobRequest(url, id string) (*http.Response, error) {
|
||||
body, _ := json.Marshal(struct {
|
||||
ID string `json:"id"`
|
||||
}{ID: id})
|
||||
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/blob", bytes.NewReader(b))
|
||||
req.Header.Set("X-Mesh-Sig", n.sign(b))
|
||||
return n.client.Do(req)
|
||||
|
||||
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/blob", bytes.NewReader(body))
|
||||
req.Header.Set("X-Mesh-Sig", n.sign(body))
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
|
||||
// kurzer Timeout für Blob-Requests (anpassbar)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second)
|
||||
defer cancel()
|
||||
req = req.WithContext(ctx)
|
||||
|
||||
resp, err := n.client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// OK → Peer als "gesehen" markieren und Response (für Streaming) zurückgeben
|
||||
if resp.StatusCode == http.StatusOK {
|
||||
n.touchPeer(url)
|
||||
return resp, nil // Caller liest/streamt Body und schließt ihn
|
||||
}
|
||||
|
||||
// Nicht-OK → Body leeren/schließen und Fehler zurück
|
||||
io.Copy(io.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
return nil, fmt.Errorf("blob %s from %s: %s", id, url, resp.Status)
|
||||
}
|
||||
|
||||
// Öffentliche Methode: versuche Blob bei irgendeinem Peer zu holen
|
||||
|
||||
Reference in New Issue
Block a user