Compare commits

..

5 Commits

Author SHA1 Message Date
42b0493adb fix-2
All checks were successful
release-tag / release-image (push) Successful in 1m26s
2025-09-30 00:34:24 +02:00
b3c1f37632 fix
All checks were successful
release-tag / release-image (push) Successful in 1m30s
2025-09-30 00:19:39 +02:00
40ded4d4db Bugfix: cache failed
All checks were successful
release-tag / release-image (push) Successful in 1m30s
2025-09-30 00:09:57 +02:00
291cfa33a9 Persistenz implementiert : Test-3
All checks were successful
release-tag / release-image (push) Successful in 1m27s
2025-09-29 23:55:35 +02:00
74fef30251 Test-2
All checks were successful
release-tag / release-image (push) Successful in 1m31s
2025-09-29 23:44:08 +02:00
3 changed files with 280 additions and 50 deletions

View File

@@ -10,6 +10,7 @@ import (
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"path/filepath"
"strconv" "strconv"
"strings" "strings"
"syscall" "syscall"
@@ -55,6 +56,11 @@ func loadConfig() AppConfig {
m.PeerTTL = parseDuration(os.Getenv("MESH_PEER_TTL"), 2*time.Minute) m.PeerTTL = parseDuration(os.Getenv("MESH_PEER_TTL"), 2*time.Minute)
m.PruneInterval = parseDuration(os.Getenv("MESH_PRUNE_INTERVAL"), 30*time.Second) m.PruneInterval = parseDuration(os.Getenv("MESH_PRUNE_INTERVAL"), 30*time.Second)
m.HelloInterval = parseDuration(os.Getenv("MESH_HELLO_INTERVAL"), 20*time.Second)
m.HelloFanout = parseIntEnv(os.Getenv("MESH_HELLO_FANOUT"), 8)
m.BlobTimeout = parseDuration(os.Getenv("MESH_BLOB_TIMEOUT"), 0)
// Wenn keine AdvertURL gesetzt ist, versuche eine sinnvolle Herleitung: // Wenn keine AdvertURL gesetzt ist, versuche eine sinnvolle Herleitung:
if strings.TrimSpace(m.AdvertURL) == "" { if strings.TrimSpace(m.AdvertURL) == "" {
m.AdvertURL = inferAdvertURL(m.BindAddr) m.AdvertURL = inferAdvertURL(m.BindAddr)
@@ -101,6 +107,18 @@ func parseBoolEnv(k string, def bool) bool {
return v == "1" || v == "true" || v == "yes" || v == "on" return v == "1" || v == "true" || v == "yes" || v == "on"
} }
func parseIntEnv(k string, def int) int {
v := strings.TrimSpace(os.Getenv(k))
if v == "" {
return def
}
n, err := strconv.Atoi(v)
if err != nil {
return def
}
return n
}
func splitCSV(s string) []string { func splitCSV(s string) []string {
s = strings.TrimSpace(s) s = strings.TrimSpace(s)
if s == "" { if s == "" {
@@ -465,11 +483,14 @@ func main() {
// Domain-Store (mesh-fähig) // Domain-Store (mesh-fähig)
nodeID := strings.TrimSpace(cfg.Mesh.AdvertURL) nodeID := strings.TrimSpace(cfg.Mesh.AdvertURL)
st := filesvc.NewMemStore(nodeID) //st := filesvc.NewMemStore(nodeID)
// Mesh starten // Mesh starten
//mcfg := mesh.FromEnv() //mcfg := mesh.FromEnv()
blobs := blobfs.New(getenvDefault("DATA_DIR", "./data")) blobs := blobfs.New(getenvDefault("DATA_DIR", "./data"))
dataDir := getenvDefault("DATA_DIR", "./data")
metaPath := filepath.Join(dataDir, "meta", "items.json")
st := filesvc.NewMemStorePersistent(nodeID, metaPath)
mnode, err := mesh.New(cfg.Mesh, mesh.Callbacks{ mnode, err := mesh.New(cfg.Mesh, mesh.Callbacks{
GetSnapshot: func(ctx context.Context) (mesh.Snapshot, error) { GetSnapshot: func(ctx context.Context) (mesh.Snapshot, error) {
@@ -620,36 +641,28 @@ func registerPublicDownloads(mux *http.ServeMux, store filesvc.MeshStore, blobs
base = "/" + base base = "/" + base
} }
mux.HandleFunc(base+"/", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc(base+"/", func(w http.ResponseWriter, r *http.Request) {
idStr := strings.TrimPrefix(r.URL.Path, base+"/") id := strings.TrimSpace(strings.TrimPrefix(r.URL.Path, base+"/"))
if idStr == "" { if id == "" {
http.NotFound(w, r)
return
}
id := idStr
if strings.TrimSpace(id) == "" {
http.NotFound(w, r) http.NotFound(w, r)
return return
} }
// 1) Metadaten prüfen (nicht vorhanden oder gelöscht → 404) // 1) Metadaten prüfen
it, err := store.Get(r.Context(), filesvc.ID(id)) it, err := store.Get(r.Context(), filesvc.ID(id))
if err != nil || it.Deleted { if err != nil || it.Deleted {
http.NotFound(w, r) http.NotFound(w, r)
return return
} }
// 2) lokal versuchen // 2) Lokal versuchen
if rc, meta, err := blobs.Open(r.Context(), id); err == nil { if rc, meta, err := blobs.Open(context.Background(), id); err == nil {
defer rc.Close() defer rc.Close()
serveBlob(w, r, rc, meta, it.Name) // Download-Name aus Store! serveBlob(w, r, rc, meta, it.Name)
return
} else { // ########## HIER EVENTUELL PROBLEM!!! ###########
// Lokal nicht vorhanden → nur aus Mesh ziehen, wenn Owner aktiv ist
it, err := store.Get(r.Context(), filesvc.ID(id))
if err != nil || it.Deleted {
http.NotFound(w, r)
return return
} }
// (Optional) Owner-Online-Check — wenn du auch bei offline Ownern liefern willst, block auskommentieren
{
peers := meshNode.PeerList() peers := meshNode.PeerList()
ttl := 2 * time.Minute ttl := 2 * time.Minute
if !isOwnerActive(it.Owner, peers, ttl) { if !isOwnerActive(it.Owner, peers, ttl) {
@@ -658,26 +671,34 @@ func registerPublicDownloads(mux *http.ServeMux, store filesvc.MeshStore, blobs
} }
} }
// 3) aus Mesh holen (signiert) und cachen // 3) Aus Mesh holen — EIGENER Timeout-Kontext, NICHT r.Context()
rrc, _, _, _, err := meshNode.FetchBlobAny(r.Context(), id) rrc, remoteName, _, _, err := meshNode.FetchBlobAny(context.Background(), id)
if err != nil { if err != nil {
http.NotFound(w, r) http.NotFound(w, r)
return return
} }
defer rrc.Close() defer rrc.Close()
if _, err := blobs.Save(r.Context(), id, it.Name, rrc); err != nil {
http.Error(w, "cache failed", http.StatusInternalServerError) filename := strings.TrimSpace(remoteName)
if filename == "" {
filename = it.Name
}
// 4) Lokal cachen — KEIN Request-Kontext, damit Save nicht abbricht
if _, err := blobs.Save(context.Background(), id, filename, rrc); err != nil {
log.Printf("[public] cache save failed id=%s name=%q: %v", id, filename, err)
http.Error(w, "cache failed: "+err.Error(), http.StatusInternalServerError)
return return
} }
// 4) erneut lokal öffnen und streamen // 5) Erneut lokal öffnen und streamen
lrc, meta, err := blobs.Open(r.Context(), id) lrc, meta, err := blobs.Open(context.Background(), id)
if err != nil { if err != nil {
http.Error(w, "open failed", http.StatusInternalServerError) http.Error(w, "open failed: "+err.Error(), http.StatusInternalServerError)
return return
} }
defer lrc.Close() defer lrc.Close()
serveBlob(w, r, lrc, meta, it.Name) serveBlob(w, r, lrc, meta, filename)
}) })
} }

View File

@@ -2,6 +2,9 @@ package filesvc
import ( import (
"context" "context"
"encoding/json"
"os"
"path/filepath"
"slices" "slices"
"strings" "strings"
"sync" "sync"
@@ -14,12 +17,81 @@ type MemStore struct {
self string self string
// optionales Eventing // optionales Eventing
subs []chan ChangeEvent subs []chan ChangeEvent
persistPath string
} }
func NewMemStore(self string) *MemStore { func NewMemStore(self string) *MemStore {
return &MemStore{self: strings.TrimSpace(self), items: make(map[ID]File)} return &MemStore{self: strings.TrimSpace(self), items: make(map[ID]File)}
} }
func NewMemStorePersistent(self, path string) *MemStore {
m := NewMemStore(self)
m.persistPath = strings.TrimSpace(path)
// beim Start versuchen zu laden
_ = m.loadFromDisk()
return m
}
// --- Persistenz-Helper (NEU) ---
func (m *MemStore) loadFromDisk() error {
if m.persistPath == "" {
return nil
}
f, err := os.Open(m.persistPath)
if err != nil {
return nil // Datei existiert beim ersten Start nicht ok
}
defer f.Close()
var snap Snapshot
if err := json.NewDecoder(f).Decode(&snap); err != nil {
return err
}
m.mu.Lock()
for _, it := range snap.Items {
m.items[it.ID] = it
}
m.mu.Unlock()
return nil
}
func (m *MemStore) saveLocked() error {
if m.persistPath == "" {
return nil
}
if err := os.MkdirAll(filepath.Dir(m.persistPath), 0o755); err != nil {
return err
}
// Snapshot aus Map bauen
snap := Snapshot{Items: make([]File, 0, len(m.items))}
for _, it := range m.items {
snap.Items = append(snap.Items, it)
}
// atomar schreiben
tmp := m.persistPath + ".tmp"
f, err := os.Create(tmp)
if err != nil {
return err
}
enc := json.NewEncoder(f)
enc.SetIndent("", " ")
if err := enc.Encode(&snap); err != nil {
f.Close()
_ = os.Remove(tmp)
return err
}
if err := f.Sync(); err != nil {
f.Close()
_ = os.Remove(tmp)
return err
}
if err := f.Close(); err != nil {
_ = os.Remove(tmp)
return err
}
return os.Rename(tmp, m.persistPath)
}
/*** Store ***/ /*** Store ***/
func (m *MemStore) Get(_ context.Context, id ID) (File, error) { func (m *MemStore) Get(_ context.Context, id ID) (File, error) {
@@ -102,6 +174,7 @@ func (m *MemStore) Create(_ context.Context, name string) (File, error) {
} }
it := File{ID: uid, Name: name, UpdatedAt: now, Owner: m.self} it := File{ID: uid, Name: name, UpdatedAt: now, Owner: m.self}
m.items[it.ID] = it m.items[it.ID] = it
_ = m.saveLocked()
m.emit(it) m.emit(it)
return it, nil return it, nil
} }
@@ -124,6 +197,7 @@ func (m *MemStore) Rename(_ context.Context, id ID, newName string) (File, error
it.Name = strings.TrimSpace(newName) it.Name = strings.TrimSpace(newName)
it.UpdatedAt = time.Now().UnixNano() it.UpdatedAt = time.Now().UnixNano()
m.items[id] = it m.items[id] = it
_ = m.saveLocked()
m.emit(it) m.emit(it)
return it, nil return it, nil
} }
@@ -144,6 +218,7 @@ func (m *MemStore) Delete(_ context.Context, id ID) (File, error) {
it.Deleted = true it.Deleted = true
it.UpdatedAt = time.Now().UnixNano() it.UpdatedAt = time.Now().UnixNano()
m.items[id] = it m.items[id] = it
_ = m.saveLocked()
m.emit(it) m.emit(it)
return it, nil return it, nil
} }
@@ -169,6 +244,7 @@ func (m *MemStore) TakeoverOwner(_ context.Context, id ID, newOwner string) (Fil
it.Owner = newOwner it.Owner = newOwner
it.UpdatedAt = time.Now().UnixNano() it.UpdatedAt = time.Now().UnixNano()
m.items[id] = it m.items[id] = it
_ = m.saveLocked()
m.emitLocked(it) m.emitLocked(it)
return it, nil return it, nil
} }
@@ -188,6 +264,7 @@ func (m *MemStore) Snapshot(_ context.Context) (Snapshot, error) {
func (m *MemStore) ApplyRemote(_ context.Context, s Snapshot) error { func (m *MemStore) ApplyRemote(_ context.Context, s Snapshot) error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
changed := false
for _, ri := range s.Items { for _, ri := range s.Items {
li, ok := m.items[ri.ID] li, ok := m.items[ri.ID]
if !ok || ri.UpdatedAt > li.UpdatedAt { if !ok || ri.UpdatedAt > li.UpdatedAt {
@@ -196,9 +273,13 @@ func (m *MemStore) ApplyRemote(_ context.Context, s Snapshot) error {
ri.Owner = li.Owner ri.Owner = li.Owner
} }
m.items[ri.ID] = ri m.items[ri.ID] = ri
changed = true
m.emitLocked(ri) m.emitLocked(ri)
} }
} }
if changed {
_ = m.saveLocked() // ← NEU
}
return nil return nil
} }

View File

@@ -12,6 +12,7 @@ import (
"hash/crc32" "hash/crc32"
"io" "io"
"log" "log"
"math/rand/v2"
"net" "net"
"net/http" "net/http"
"os" "os"
@@ -33,6 +34,15 @@ type Config struct {
DiscoveryAddress string // "239.8.8.8:9898" DiscoveryAddress string // "239.8.8.8:9898"
PeerTTL time.Duration // wie lange darf ein Peer inaktiv sein (Default siehe unten) PeerTTL time.Duration // wie lange darf ein Peer inaktiv sein (Default siehe unten)
PruneInterval time.Duration // wie oft wird gepruned PruneInterval time.Duration // wie oft wird gepruned
SyncInterval time.Duration
Fanout int
// NEU:
HelloInterval time.Duration // wie oft pingen
HelloFanout int // wie viele Peers pro Tick
BlobTimeout time.Duration
} }
type Peer struct { type Peer struct {
@@ -190,21 +200,38 @@ func (n *Node) helloHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "bad signature", http.StatusUnauthorized) http.Error(w, "bad signature", http.StatusUnauthorized)
return return
} }
var p Peer var req struct {
if err := json.Unmarshal(body, &p); err != nil { URL string `json:"url"`
}
if err := json.Unmarshal(body, &req); err != nil || strings.TrimSpace(req.URL) == "" {
http.Error(w, "bad json", http.StatusBadRequest) http.Error(w, "bad json", http.StatusBadRequest)
return return
} }
p.LastSeen = time.Now()
// Peer anlegen (falls neu) und LastSeen setzen
n.mu.Lock() n.mu.Lock()
if existing, ok := n.peers[p.URL]; ok { if req.URL != n.self.URL {
existing.LastSeen = p.LastSeen if p, ok := n.peers[req.URL]; ok {
} else if p.URL != n.self.URL { p.LastSeen = time.Now()
cp := p } else {
n.peers[p.URL] = &cp cp := Peer{URL: req.URL, LastSeen: time.Now()} // weitere Felder wie bekannt
n.peers[req.URL] = &cp
}
}
n.mu.Unlock()
w.WriteHeader(http.StatusOK)
}
func (n *Node) touchPeer(url string) {
if strings.TrimSpace(url) == "" {
return
}
n.mu.Lock()
if p, ok := n.peers[url]; ok {
p.LastSeen = time.Now()
} }
n.mu.Unlock() n.mu.Unlock()
w.WriteHeader(http.StatusNoContent)
} }
func (n *Node) peersHandler(w http.ResponseWriter, r *http.Request) { func (n *Node) peersHandler(w http.ResponseWriter, r *http.Request) {
@@ -283,6 +310,12 @@ func (n *Node) Serve() error {
n.loopAntiEntropy() n.loopAntiEntropy()
}() }()
n.wg.Add(1)
go func() {
defer n.wg.Done()
n.loopHello()
}()
// http server // http server
errc := make(chan error, 1) errc := make(chan error, 1)
go func() { go func() {
@@ -467,28 +500,100 @@ func (n *Node) loopBeaconRecv() {
/*** Outgoing ***/ /*** Outgoing ***/
func (n *Node) sendHello(url string) error { func (n *Node) sendHello(url string) error {
p := n.self b, _ := json.Marshal(struct {
b, _ := json.Marshal(p) URL string `json:"url"`
}{URL: n.self.URL})
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/hello", bytes.NewReader(b)) req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/hello", bytes.NewReader(b))
req.Header.Set("X-Mesh-Sig", n.sign(b)) req.Header.Set("X-Mesh-Sig", n.sign(b))
resp, err := n.client.Do(req) resp, err := n.client.Do(req)
if err == nil { if err != nil {
return err
}
io.Copy(io.Discard, resp.Body) io.Copy(io.Discard, resp.Body)
resp.Body.Close() resp.Body.Close()
if resp.StatusCode == http.StatusOK {
n.touchPeer(url) // eigenen Blick auf den Peer direkt als „gesehen“ markieren
return nil
}
return fmt.Errorf("hello %s: %s", url, resp.Status)
}
func (n *Node) loopHello() {
interval := n.cfg.HelloInterval
if interval <= 0 {
interval = 20 * time.Second
}
fanout := n.cfg.HelloFanout
if fanout <= 0 {
fanout = 8
}
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-n.stop:
return
case <-t.C:
}
// Liste der *bekannten* Peers (nicht nur Seeds)
n.mu.RLock()
targets := make([]string, 0, len(n.peers))
for url := range n.peers {
if url != n.self.URL {
targets = append(targets, url)
}
}
n.mu.RUnlock()
if len(targets) == 0 {
continue
}
// zufällig mischen und auf Fanout begrenzen
rand.Shuffle(len(targets), func(i, j int) { targets[i], targets[j] = targets[j], targets[i] })
if fanout < len(targets) {
targets = targets[:fanout]
}
// leicht parallel pingen
var wg sync.WaitGroup
for _, u := range targets {
u := u
wg.Add(1)
go func() {
defer wg.Done()
_ = n.sendHello(u)
}()
}
wg.Wait()
} }
return err
} }
func (n *Node) sendSync(url string, s Snapshot) error { func (n *Node) sendSync(url string, s Snapshot) error {
b, _ := json.Marshal(s) b, _ := json.Marshal(s)
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/sync", bytes.NewReader(b)) req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/sync", bytes.NewReader(b))
req.Header.Set("X-Mesh-Sig", n.sign(b)) req.Header.Set("X-Mesh-Sig", n.sign(b))
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()
req = req.WithContext(ctx)
resp, err := n.client.Do(req) resp, err := n.client.Do(req)
if err == nil { if err != nil {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}
return err return err
}
defer resp.Body.Close()
io.Copy(io.Discard, resp.Body)
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("sync %s: %s", url, resp.Status)
}
n.touchPeer(url)
return nil
} }
// PeerList liefert eine Kopie der bekannten Peers inkl. Self. // PeerList liefert eine Kopie der bekannten Peers inkl. Self.
@@ -608,14 +713,37 @@ func (n *Node) blobHandler(w http.ResponseWriter, r *http.Request) {
_, _ = io.Copy(w, rc) _, _ = io.Copy(w, rc)
} }
// interner Helper: signierter Blob-Request an einen Peer // sendBlobRequest schickt eine signierte Anfrage an /mesh/blob und liefert die Response
func (n *Node) sendBlobRequest(url string, id string) (*http.Response, error) { // zurück (Caller MUSS resp.Body schließen!). Bei HTTP 200 wird der Peer als gesehen markiert.
func (n *Node) sendBlobRequest(url, id string) (*http.Response, error) {
b, _ := json.Marshal(struct { b, _ := json.Marshal(struct {
ID string `json:"id"` ID string `json:"id"`
}{ID: id}) }{ID: id})
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/blob", bytes.NewReader(b)) req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/blob", bytes.NewReader(b))
req.Header.Set("X-Mesh-Sig", n.sign(b)) req.Header.Set("X-Mesh-Sig", n.sign(b))
return n.client.Do(req) req.Header.Set("Content-Type", "application/json")
// ❗ WICHTIG: kein kurzer Timeout. Optional: großer Timeout aus Config
ctx := context.Background()
if n.cfg.BlobTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), n.cfg.BlobTimeout)
defer cancel()
}
req = req.WithContext(ctx)
resp, err := n.client.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode == http.StatusOK {
n.touchPeer(url) // ausgehender Erfolg zählt als "gesehen"
return resp, nil
}
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
return nil, fmt.Errorf("blob %s from %s: %s", id, url, resp.Status)
} }
// Öffentliche Methode: versuche Blob bei irgendeinem Peer zu holen // Öffentliche Methode: versuche Blob bei irgendeinem Peer zu holen