Compare commits

18 Commits
aio ... main

Author SHA1 Message Date
42b0493adb fix-2
All checks were successful
release-tag / release-image (push) Successful in 1m26s
2025-09-30 00:34:24 +02:00
b3c1f37632 fix
All checks were successful
release-tag / release-image (push) Successful in 1m30s
2025-09-30 00:19:39 +02:00
40ded4d4db Bugfix: cache failed
All checks were successful
release-tag / release-image (push) Successful in 1m30s
2025-09-30 00:09:57 +02:00
291cfa33a9 Persistenz implementiert : Test-3
All checks were successful
release-tag / release-image (push) Successful in 1m27s
2025-09-29 23:55:35 +02:00
74fef30251 Test-2
All checks were successful
release-tag / release-image (push) Successful in 1m31s
2025-09-29 23:44:08 +02:00
b7729e8b39 Test-1
All checks were successful
release-tag / release-image (push) Successful in 1m52s
2025-09-29 23:08:34 +02:00
9122ebf2f1 vor temaplte 2025-09-29 23:04:46 +02:00
36d1c1512a Node autocleanup
All checks were successful
release-tag / release-image (push) Successful in 1m32s
2025-09-29 21:19:07 +02:00
aaa95410e4 Template Anpassung
All checks were successful
release-tag / release-image (push) Successful in 1m26s
2025-09-28 00:00:09 +02:00
0bf22b6120 fix self
All checks were successful
release-tag / release-image (push) Successful in 1m25s
2025-09-27 23:53:17 +02:00
b142a0b1a5 layout max to 1200
All checks were successful
release-tag / release-image (push) Successful in 1m33s
2025-09-27 23:42:35 +02:00
baedff9e9d Umstellung auf uuid
All checks were successful
release-tag / release-image (push) Successful in 1m32s
2025-09-27 21:42:32 +02:00
92e222f648 buxfix-2
All checks were successful
release-tag / release-image (push) Successful in 1m26s
2025-09-27 17:52:10 +02:00
43f1d01a8a diverse bugfixes
All checks were successful
release-tag / release-image (push) Successful in 1m32s
2025-09-27 17:39:39 +02:00
d125e3dd54 findnodes
All checks were successful
release-tag / release-image (push) Successful in 1m28s
2025-09-27 16:25:13 +02:00
6a0dc578ea fix
All checks were successful
release-tag / release-image (push) Successful in 1m27s
2025-09-27 16:00:53 +02:00
8c05ad6ffe update auf public download
All checks were successful
release-tag / release-image (push) Successful in 1m40s
2025-09-27 15:55:16 +02:00
7d0f4befe1 update
All checks were successful
release-tag / release-image (push) Successful in 1m28s
2025-09-27 14:16:15 +02:00
9 changed files with 1049 additions and 156 deletions

View File

@@ -10,6 +10,7 @@ import (
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"path/filepath"
"strconv" "strconv"
"strings" "strings"
"syscall" "syscall"
@@ -22,6 +23,14 @@ import (
"git.send.nrw/sendnrw/decent-webui/internal/mesh" "git.send.nrw/sendnrw/decent-webui/internal/mesh"
) )
func parseDuration(s string, def time.Duration) time.Duration {
d, err := time.ParseDuration(strings.TrimSpace(s))
if err != nil || d <= 0 {
return def
}
return d
}
/*** Config ***/ /*** Config ***/
func loadConfig() AppConfig { func loadConfig() AppConfig {
// HTTP // HTTP
@@ -44,6 +53,14 @@ func loadConfig() AppConfig {
DiscoveryAddress: getenvDefault("MESH_DISCOVERY_ADDR", "239.8.8.8:9898"), DiscoveryAddress: getenvDefault("MESH_DISCOVERY_ADDR", "239.8.8.8:9898"),
} }
m.PeerTTL = parseDuration(os.Getenv("MESH_PEER_TTL"), 2*time.Minute)
m.PruneInterval = parseDuration(os.Getenv("MESH_PRUNE_INTERVAL"), 30*time.Second)
m.HelloInterval = parseDuration(os.Getenv("MESH_HELLO_INTERVAL"), 20*time.Second)
m.HelloFanout = parseIntEnv(os.Getenv("MESH_HELLO_FANOUT"), 8)
m.BlobTimeout = parseDuration(os.Getenv("MESH_BLOB_TIMEOUT"), 0)
// Wenn keine AdvertURL gesetzt ist, versuche eine sinnvolle Herleitung: // Wenn keine AdvertURL gesetzt ist, versuche eine sinnvolle Herleitung:
if strings.TrimSpace(m.AdvertURL) == "" { if strings.TrimSpace(m.AdvertURL) == "" {
m.AdvertURL = inferAdvertURL(m.BindAddr) m.AdvertURL = inferAdvertURL(m.BindAddr)
@@ -62,11 +79,13 @@ func loadConfig() AppConfig {
} }
return AppConfig{ return AppConfig{
HTTPAddr: httpAddr, HTTPAddr: httpAddr,
APIKey: apiKey, APIKey: apiKey,
AdminUser: adminUser, AdminUser: adminUser,
AdminPass: adminPass, AdminPass: adminPass,
Mesh: m, Mesh: m,
PublicDownloads: parseBoolEnv("PUBLIC_DOWNLOADS", false),
PublicPath: getenvDefault("PUBLIC_DOWNLOAD_PATH", "/dl"),
} }
} }
@@ -88,6 +107,18 @@ func parseBoolEnv(k string, def bool) bool {
return v == "1" || v == "true" || v == "yes" || v == "on" return v == "1" || v == "true" || v == "yes" || v == "on"
} }
func parseIntEnv(k string, def int) int {
v := strings.TrimSpace(os.Getenv(k))
if v == "" {
return def
}
n, err := strconv.Atoi(v)
if err != nil {
return def
}
return n
}
func splitCSV(s string) []string { func splitCSV(s string) []string {
s = strings.TrimSpace(s) s = strings.TrimSpace(s)
if s == "" { if s == "" {
@@ -137,6 +168,9 @@ type AppConfig struct {
AdminUser string AdminUser string
AdminPass string AdminPass string
Mesh mesh.Config Mesh mesh.Config
PublicDownloads bool // ENV: PUBLIC_DOWNLOADS (default false)
PublicPath string // ENV: PUBLIC_DOWNLOAD_PATH (default "/dl")
} }
/*** Middleware ***/ /*** Middleware ***/
@@ -188,7 +222,7 @@ func writeJSON(w http.ResponseWriter, code int, v any) {
/*** API-Routen ***/ /*** API-Routen ***/
func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore) { func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store) {
// Health // Health
mux.HandleFunc("/api/v1/health", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/api/v1/health", func(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
@@ -198,13 +232,8 @@ func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore) {
mux.HandleFunc("/api/v1/items", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/api/v1/items", func(w http.ResponseWriter, r *http.Request) {
switch r.Method { switch r.Method {
case http.MethodGet: case http.MethodGet:
nextStr := r.URL.Query().Get("next") nextStr := strings.TrimSpace(r.URL.Query().Get("next"))
var next filesvc.ID next := filesvc.ID(nextStr)
if nextStr != "" {
if n, err := strconv.ParseInt(nextStr, 10, 64); err == nil {
next = filesvc.ID(n)
}
}
items, nextOut, err := store.List(r.Context(), next, 100) items, nextOut, err := store.List(r.Context(), next, 100)
if err != nil { if err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
@@ -246,6 +275,10 @@ func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore) {
} }
it, err := store.Rename(r.Context(), in.ID, in.Name) it, err := store.Rename(r.Context(), in.ID, in.Name)
if err != nil { if err != nil {
if errors.Is(err, filesvc.ErrForbidden) {
writeJSON(w, http.StatusForbidden, map[string]string{"error": "only owner may modify"})
return
}
status := http.StatusBadRequest status := http.StatusBadRequest
if errors.Is(err, filesvc.ErrNotFound) { if errors.Is(err, filesvc.ErrNotFound) {
status = http.StatusNotFound status = http.StatusNotFound
@@ -270,7 +303,12 @@ func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore) {
return return
} }
it, err := store.Delete(r.Context(), in.ID) it, err := store.Delete(r.Context(), in.ID)
_ = blobs.Delete(r.Context(), string(in.ID))
if err != nil { if err != nil {
if errors.Is(err, filesvc.ErrForbidden) {
writeJSON(w, http.StatusForbidden, map[string]string{"error": "only owner may modify"})
return
}
status := http.StatusBadRequest status := http.StatusBadRequest
if errors.Is(err, filesvc.ErrNotFound) { if errors.Is(err, filesvc.ErrNotFound) {
status = http.StatusNotFound status = http.StatusNotFound
@@ -310,7 +348,7 @@ func apiFiles(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, m
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()}) writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return return
} }
meta, err := blobs.Save(r.Context(), int64(it.ID), name, fh) meta, err := blobs.Save(r.Context(), string(it.ID), name, fh)
if err != nil { if err != nil {
_, _ = store.Delete(r.Context(), it.ID) _, _ = store.Delete(r.Context(), it.ID)
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
@@ -325,27 +363,61 @@ func apiFiles(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, m
// Download // Download
mux.HandleFunc("/api/v1/files/", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/api/v1/files/", func(w http.ResponseWriter, r *http.Request) {
// /api/v1/files/{id}/download
parts := strings.Split(strings.TrimPrefix(r.URL.Path, "/api/v1/files/"), "/") parts := strings.Split(strings.TrimPrefix(r.URL.Path, "/api/v1/files/"), "/")
if len(parts) != 2 || parts[1] != "download" { if len(parts) != 2 || parts[1] != "download" {
http.NotFound(w, r) http.NotFound(w, r)
return return
} }
id, err := strconv.ParseInt(parts[0], 10, 64) id := parts[0]
if strings.TrimSpace(id) == "" {
http.NotFound(w, r)
return
}
// 1) Metadaten prüfen
it, err := store.Get(r.Context(), filesvc.ID(id))
if err != nil || it.Deleted {
http.NotFound(w, r)
return
}
// 2) lokal
if rc, meta, err := blobs.Open(r.Context(), id); err == nil {
defer rc.Close()
serveBlob(w, r, rc, meta, it.Name)
return
}
// 3) remote holen & cachen
it1, _ := store.Get(r.Context(), filesvc.ID(id))
peers := meshNode.PeerList()
ttl := 2 * time.Minute
if cfg := meshNode.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
if !isOwnerActive(it1.Owner, peers, ttl) {
http.NotFound(w, r)
return
}
rrc, _, _, _, err := meshNode.FetchBlobAny(r.Context(), id)
if err != nil { if err != nil {
http.NotFound(w, r) http.NotFound(w, r)
return return
} }
rc, meta, err := blobs.Open(r.Context(), id) defer rrc.Close()
if err != nil { if _, err := blobs.Save(r.Context(), id, it1.Name, rrc); err != nil {
http.NotFound(w, r) http.Error(w, "cache failed", http.StatusInternalServerError)
return return
} }
defer rc.Close()
w.Header().Set("Content-Type", meta.ContentType) // 4) lokal streamen
w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10)) lrc, meta, err := blobs.Open(r.Context(), id)
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, meta.Name)) if err != nil {
_, _ = io.Copy(w, rc) http.Error(w, "open failed", http.StatusInternalServerError)
return
}
defer lrc.Close()
serveBlob(w, r, lrc, meta, it1.Name)
}) })
} }
@@ -355,10 +427,11 @@ func toMeshSnapshot(s filesvc.Snapshot) mesh.Snapshot {
out := mesh.Snapshot{Items: make([]mesh.Item, 0, len(s.Items))} out := mesh.Snapshot{Items: make([]mesh.Item, 0, len(s.Items))}
for _, it := range s.Items { for _, it := range s.Items {
out.Items = append(out.Items, mesh.Item{ out.Items = append(out.Items, mesh.Item{
ID: int64(it.ID), ID: it.ID,
Name: it.Name, Name: it.Name,
UpdatedAt: it.UpdatedAt, UpdatedAt: it.UpdatedAt,
Deleted: it.Deleted, Deleted: it.Deleted,
Owner: it.Owner,
}) })
} }
return out return out
@@ -368,26 +441,58 @@ func fromMeshSnapshot(ms mesh.Snapshot) filesvc.Snapshot {
out := filesvc.Snapshot{Items: make([]filesvc.File, 0, len(ms.Items))} out := filesvc.Snapshot{Items: make([]filesvc.File, 0, len(ms.Items))}
for _, it := range ms.Items { for _, it := range ms.Items {
out.Items = append(out.Items, filesvc.File{ out.Items = append(out.Items, filesvc.File{
ID: filesvc.ID(it.ID), ID: it.ID,
Name: it.Name, Name: it.Name,
UpdatedAt: it.UpdatedAt, UpdatedAt: it.UpdatedAt,
Deleted: it.Deleted, Deleted: it.Deleted,
Owner: it.Owner,
}) })
} }
return out return out
} }
// isOwnerActive prüft, ob der Owner in der Peer-Liste als "aktiv" gilt.
func isOwnerActive(owner string, peers []mesh.Peer, ttl time.Duration) bool {
owner = strings.TrimSpace(owner)
if owner == "" {
return true
}
cutoff := time.Now().Add(-ttl)
for _, p := range peers {
if strings.TrimSpace(p.URL) != owner {
continue
}
// Self ist per Definition aktiv
if p.Self {
return true
}
// ohne LastSeen: als inaktiv behandeln
if p.LastSeen.IsZero() {
return false
}
return p.LastSeen.After(cutoff)
}
// Owner ist nicht mal in der Liste: inaktiv
return false
}
/*** main ***/ /*** main ***/
func main() { func main() {
cfg := loadConfig() cfg := loadConfig()
// Domain-Store (mesh-fähig) // Domain-Store (mesh-fähig)
st := filesvc.NewMemStore() nodeID := strings.TrimSpace(cfg.Mesh.AdvertURL)
//st := filesvc.NewMemStore(nodeID)
// Mesh starten // Mesh starten
mcfg := mesh.FromEnv() //mcfg := mesh.FromEnv()
mnode, err := mesh.New(mcfg, mesh.Callbacks{ blobs := blobfs.New(getenvDefault("DATA_DIR", "./data"))
dataDir := getenvDefault("DATA_DIR", "./data")
metaPath := filepath.Join(dataDir, "meta", "items.json")
st := filesvc.NewMemStorePersistent(nodeID, metaPath)
mnode, err := mesh.New(cfg.Mesh, mesh.Callbacks{
GetSnapshot: func(ctx context.Context) (mesh.Snapshot, error) { GetSnapshot: func(ctx context.Context) (mesh.Snapshot, error) {
s, err := st.Snapshot(ctx) s, err := st.Snapshot(ctx)
if err != nil { if err != nil {
@@ -398,28 +503,88 @@ func main() {
ApplyRemote: func(ctx context.Context, s mesh.Snapshot) error { ApplyRemote: func(ctx context.Context, s mesh.Snapshot) error {
return st.ApplyRemote(ctx, fromMeshSnapshot(s)) return st.ApplyRemote(ctx, fromMeshSnapshot(s))
}, },
BlobOpen: func(ctx context.Context, id string) (io.ReadCloser, string, string, int64, error) { //5588
it, err := st.Get(ctx, filesvc.ID(id))
if err != nil || it.Deleted {
return nil, "", "", 0, fmt.Errorf("not found")
}
rc, meta, err := blobs.Open(ctx, id)
if err != nil {
return nil, "", "", 0, err
}
return rc, it.Name, meta.ContentType, meta.Size, nil
},
}) })
if err != nil { if err != nil {
log.Fatalf("mesh init: %v", err) log.Fatalf("mesh init: %v", err)
} }
// Hintergrund-Pruner starten
mnode.StartPeerPruner()
go func() { go func() {
log.Printf("[mesh] listening on %s advertise %s seeds=%v discovery=%v", log.Printf("[mesh] listening on %s advertise %s seeds=%v discovery=%v",
mcfg.BindAddr, mcfg.AdvertURL, mcfg.Seeds, mcfg.EnableDiscovery) cfg.Mesh.BindAddr, cfg.Mesh.AdvertURL, cfg.Mesh.Seeds, cfg.Mesh.EnableDiscovery)
if err := mnode.Serve(); err != nil && !errors.Is(err, http.ErrServerClosed) { if err := mnode.Serve(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("mesh serve: %v", err) log.Fatalf("mesh serve: %v", err)
} }
}() }()
go func() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for range ticker.C {
// aktive Owner bestimmen
peers := mnode.PeerList()
ttl := 2 * time.Minute
if cfg := mnode.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
cutoff := time.Now().Add(-ttl)
active := map[string]bool{}
for _, p := range peers {
if p.Self {
active[p.URL] = true
continue
}
if !p.LastSeen.IsZero() && p.LastSeen.After(cutoff) {
active[p.URL] = true
}
}
// alle Items durchgehen; Blobs von Offline-Ownern löschen
var next filesvc.ID
for {
items, nextOut, _ := st.List(context.Background(), next, 1000)
for _, it := range items {
if it.Owner == "" || active[it.Owner] {
continue
}
_ = blobs.Delete(context.Background(), it.ID)
}
if nextOut == "" {
break
}
next = nextOut
}
}
}()
// Root-Mux // Root-Mux
root := http.NewServeMux() root := http.NewServeMux()
// API (Bearer-Auth) // API (Bearer-Auth)
blobs := blobfs.New(getenvDefault("DATA_DIR", "./data")) //blobs := blobfs.New(getenvDefault("DATA_DIR", "./data"))
apiMux := http.NewServeMux() apiMux := http.NewServeMux()
fileRoutes(apiMux, st) fileRoutes(apiMux, st, blobs)
apiFiles(apiMux, st, blobs, mnode) apiFiles(apiMux, st, blobs, mnode)
root.Handle("/api/", authMiddleware(cfg.APIKey, apiMux)) root.Handle("/api/", authMiddleware(cfg.APIKey, apiMux))
if cfg.PublicDownloads {
registerPublicDownloads(root, st, blobs, mnode, cfg.PublicPath)
}
// Admin-UI (optional BasicAuth via ADMIN_USER/ADMIN_PASS) // Admin-UI (optional BasicAuth via ADMIN_USER/ADMIN_PASS)
adminRoot := http.NewServeMux() adminRoot := http.NewServeMux()
admin.Register(adminRoot, admin.Deps{Store: st, Mesh: mnode, Blob: blobs}) admin.Register(adminRoot, admin.Deps{Store: st, Mesh: mnode, Blob: blobs})
@@ -450,7 +615,8 @@ func main() {
// Graceful shutdown // Graceful shutdown
go func() { go func() {
log.Printf("http listening on %s (api=/api/v1, admin=/admin)", cfg.Mesh.BindAddr) log.Printf("http listening on %s (api=/api/v1, admin=/admin)", cfg.HTTPAddr)
log.Printf("mesh listening on %s", cfg.Mesh.BindAddr)
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("http server: %v", err) log.Fatalf("http server: %v", err)
} }
@@ -467,3 +633,96 @@ func main() {
_ = mnode.Close(ctx) _ = mnode.Close(ctx)
log.Println("shutdown complete") log.Println("shutdown complete")
} }
// Public: GET {base}/{id}
// Beispiel: /dl/1
func registerPublicDownloads(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, meshNode *mesh.Node, base string) {
if !strings.HasPrefix(base, "/") {
base = "/" + base
}
mux.HandleFunc(base+"/", func(w http.ResponseWriter, r *http.Request) {
id := strings.TrimSpace(strings.TrimPrefix(r.URL.Path, base+"/"))
if id == "" {
http.NotFound(w, r)
return
}
// 1) Metadaten prüfen
it, err := store.Get(r.Context(), filesvc.ID(id))
if err != nil || it.Deleted {
http.NotFound(w, r)
return
}
// 2) Lokal versuchen
if rc, meta, err := blobs.Open(context.Background(), id); err == nil {
defer rc.Close()
serveBlob(w, r, rc, meta, it.Name)
return
}
// (Optional) Owner-Online-Check — wenn du auch bei offline Ownern liefern willst, block auskommentieren
{
peers := meshNode.PeerList()
ttl := 2 * time.Minute
if !isOwnerActive(it.Owner, peers, ttl) {
http.NotFound(w, r)
return
}
}
// 3) Aus Mesh holen — EIGENER Timeout-Kontext, NICHT r.Context()
rrc, remoteName, _, _, err := meshNode.FetchBlobAny(context.Background(), id)
if err != nil {
http.NotFound(w, r)
return
}
defer rrc.Close()
filename := strings.TrimSpace(remoteName)
if filename == "" {
filename = it.Name
}
// 4) Lokal cachen — KEIN Request-Kontext, damit Save nicht abbricht
if _, err := blobs.Save(context.Background(), id, filename, rrc); err != nil {
log.Printf("[public] cache save failed id=%s name=%q: %v", id, filename, err)
http.Error(w, "cache failed: "+err.Error(), http.StatusInternalServerError)
return
}
// 5) Erneut lokal öffnen und streamen
lrc, meta, err := blobs.Open(context.Background(), id)
if err != nil {
http.Error(w, "open failed: "+err.Error(), http.StatusInternalServerError)
return
}
defer lrc.Close()
serveBlob(w, r, lrc, meta, filename)
})
}
// Hilfsfunktion: setzt sinnvolle Header und streamt die Datei
func serveBlob(w http.ResponseWriter, r *http.Request, rc io.ReadSeeker, meta blobfs.Meta, downloadName string) {
if meta.SHA256 != "" {
etag := `W/"` + meta.SHA256 + `"`
if r.Header.Get("If-None-Match") == etag {
w.WriteHeader(http.StatusNotModified)
return
}
w.Header().Set("ETag", etag)
}
if meta.ContentType == "" {
meta.ContentType = "application/octet-stream"
}
if downloadName == "" {
downloadName = meta.Name
}
w.Header().Set("Content-Type", meta.ContentType)
w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10))
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, downloadName))
w.Header().Set("Access-Control-Expose-Headers", "Content-Disposition")
w.Header().Set("X-Robots-Tag", "noindex")
_, _ = io.Copy(w, rc)
}

View File

@@ -56,31 +56,36 @@ func Register(mux *http.ServeMux, d Deps) {
// Partials // Partials
mux.HandleFunc("/admin/items", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/admin/items", func(w http.ResponseWriter, r *http.Request) {
nextQ := strings.TrimSpace(r.URL.Query().Get("next")) nextID := filesvc.ID(strings.TrimSpace(r.URL.Query().Get("next")))
var nextID filesvc.ID
if nextQ != "" {
if n, err := strconv.ParseInt(nextQ, 10, 64); err == nil {
nextID = filesvc.ID(n)
}
}
items, nextOut, _ := d.Store.List(r.Context(), nextID, 100) items, nextOut, _ := d.Store.List(r.Context(), nextID, 100)
type row struct { type row struct {
ID int64 ID string
Name string Name string
UpdatedAt int64 UpdatedAt int64
HasBlob bool HasBlob bool
Size int64 Size int64
Owner string
OwnerActive bool
} }
rows := make([]row, 0, len(items)) rows := make([]row, 0, len(items))
peers := d.Mesh.PeerList()
ttl := 2 * time.Minute
if cfg := d.Mesh.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
for _, it := range items { for _, it := range items {
meta, ok, _ := d.Blob.Stat(r.Context(), int64(it.ID)) meta, ok, _ := d.Blob.Stat(r.Context(), it.ID)
rows = append(rows, row{ rows = append(rows, row{
ID: int64(it.ID), ID: it.ID,
Name: it.Name, Name: it.Name,
UpdatedAt: it.UpdatedAt, UpdatedAt: it.UpdatedAt,
HasBlob: ok, HasBlob: ok,
Size: meta.Size, Size: meta.Size,
Owner: it.Owner,
OwnerActive: isOwnerActive(it.Owner, peers, ttl),
}) })
} }
@@ -90,6 +95,42 @@ func Register(mux *http.ServeMux, d Deps) {
}) })
}) })
mux.HandleFunc("/admin/items/takeover", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
id := strings.TrimSpace(r.FormValue("id"))
if id != "" {
// Nur zulassen, wenn Owner tatsächlich offline ist
it, err := d.Store.Get(r.Context(), filesvc.ID(id))
if err == nil {
peers := d.Mesh.PeerList()
ttl := 2 * time.Minute
if cfg := d.Mesh.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
if !isOwnerActive(it.Owner, peers, ttl) {
// eigene URL aus PeerList ermitteln
self := ""
for _, p := range peers {
if p.Self {
self = p.URL
break
}
}
if self == "" {
self = "unknown-self"
}
if _, err := d.Store.TakeoverOwner(r.Context(), filesvc.ID(id), self); err == nil {
_ = d.Mesh.SyncNow(r.Context())
}
}
}
}
renderItemsPartial(w, r, d)
})
// Upload (multipart/form-data, Feldname "file", optional name-Override) // Upload (multipart/form-data, Feldname "file", optional name-Override)
mux.HandleFunc("/admin/files/upload", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/admin/files/upload", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost { if r.Method != http.MethodPost {
@@ -119,14 +160,14 @@ func Register(mux *http.ServeMux, d Deps) {
} }
// 2) Blob speichern // 2) Blob speichern
if _, err := d.Blob.Save(r.Context(), int64(it.ID), name, fh); err != nil { if _, err := d.Blob.Save(r.Context(), (it.ID), name, fh); err != nil {
// zurückrollen (Tombstone) // zurückrollen (Tombstone)
_, _ = d.Store.Delete(r.Context(), it.ID) _, _ = d.Store.Delete(r.Context(), it.ID)
http.Error(w, "save failed: "+err.Error(), http.StatusInternalServerError) http.Error(w, "save failed: "+err.Error(), http.StatusInternalServerError)
return return
} }
_ = d.Mesh.SyncNow(r.Context()) // best-effort Push _ = d.Mesh.SyncNow(r.Context()) // best-effort Push
http.Redirect(w, r, "/admin/items", http.StatusSeeOther) http.Redirect(w, r, "/admin", http.StatusSeeOther)
}) })
// Download (Admin BasicAuth schützt ggf.) // Download (Admin BasicAuth schützt ggf.)
@@ -137,21 +178,46 @@ func Register(mux *http.ServeMux, d Deps) {
http.NotFound(w, r) http.NotFound(w, r)
return return
} }
id, err := strconv.ParseInt(parts[0], 10, 64) id := parts[0]
if strings.TrimSpace(id) == "" {
http.NotFound(w, r)
return
}
// 1) lokal versuchen
if rc, meta, err := d.Blob.Open(r.Context(), id); err == nil {
defer rc.Close()
w.Header().Set("Content-Type", meta.ContentType)
w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10))
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, meta.Name))
_, _ = io.Copy(w, rc)
return
}
// 2) Remote über Mesh holen
rrc, name, _, _, err := d.Mesh.FetchBlobAny(r.Context(), id)
if err != nil { if err != nil {
http.NotFound(w, r) http.NotFound(w, r)
return return
} }
rc, meta, err := d.Blob.Open(r.Context(), id) defer rrc.Close()
if err != nil {
http.NotFound(w, r) // 3) lokal cachen (Save konsumiert den Stream)
if _, err := d.Blob.Save(r.Context(), id, name, rrc); err != nil {
http.Error(w, "cache failed: "+err.Error(), http.StatusInternalServerError)
return return
} }
defer rc.Close() // 4) aus lokalem Store ausliefern (saubere Größe/CT)
lrc, meta, err := d.Blob.Open(r.Context(), id)
if err != nil {
http.Error(w, "open failed", http.StatusInternalServerError)
return
}
defer lrc.Close()
w.Header().Set("Content-Type", meta.ContentType) w.Header().Set("Content-Type", meta.ContentType)
w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10)) w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10))
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, meta.Name)) w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, meta.Name))
_, _ = io.Copy(w, rc) _, _ = io.Copy(w, lrc)
}) })
mux.HandleFunc("/admin/peers", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/admin/peers", func(w http.ResponseWriter, r *http.Request) {
@@ -162,7 +228,7 @@ func Register(mux *http.ServeMux, d Deps) {
}) })
}) })
// Actions (HTMX POSTs) // CREATE
mux.HandleFunc("/admin/items/create", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/admin/items/create", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost { if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed) http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
@@ -171,36 +237,40 @@ func Register(mux *http.ServeMux, d Deps) {
name := strings.TrimSpace(r.FormValue("name")) name := strings.TrimSpace(r.FormValue("name"))
if name != "" { if name != "" {
_, _ = d.Store.Create(r.Context(), name) _, _ = d.Store.Create(r.Context(), name)
_ = d.Mesh.SyncNow(r.Context()) // prompt push (best effort) _ = d.Mesh.SyncNow(r.Context())
} }
// Nach Aktion Items partial zurückgeben (HTMX swap) // statt Redirect:
http.Redirect(w, r, "/admin/items", http.StatusSeeOther) renderItemsPartial(w, r, d)
}) })
// RENAME
mux.HandleFunc("/admin/items/rename", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/admin/items/rename", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost { if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed) http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return return
} }
idStr := r.FormValue("id") id := strings.TrimSpace(r.FormValue("id"))
newName := strings.TrimSpace(r.FormValue("name")) newName := strings.TrimSpace(r.FormValue("name"))
if id, err := strconv.ParseInt(idStr, 10, 64); err == nil && newName != "" { if id != "" && newName != "" {
_, _ = d.Store.Rename(r.Context(), filesvc.ID(id), newName) _, _ = d.Store.Rename(r.Context(), filesvc.ID(id), newName)
_ = d.Mesh.SyncNow(r.Context()) _ = d.Mesh.SyncNow(r.Context())
} }
http.Redirect(w, r, "/admin/items", http.StatusSeeOther) renderItemsPartial(w, r, d)
}) })
// DELETE
mux.HandleFunc("/admin/items/delete", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/admin/items/delete", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost { if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed) http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return return
} }
if id, err := strconv.ParseInt(r.FormValue("id"), 10, 64); err == nil { id := strings.TrimSpace(r.FormValue("id"))
if id != "" {
_, _ = d.Store.Delete(r.Context(), filesvc.ID(id)) _, _ = d.Store.Delete(r.Context(), filesvc.ID(id))
_ = d.Blob.Delete(r.Context(), id)
_ = d.Mesh.SyncNow(r.Context()) _ = d.Mesh.SyncNow(r.Context())
} }
http.Redirect(w, r, "/admin/items", http.StatusSeeOther) renderItemsPartial(w, r, d)
}) })
mux.HandleFunc("/admin/mesh/syncnow", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/admin/mesh/syncnow", func(w http.ResponseWriter, r *http.Request) {
@@ -231,3 +301,61 @@ func BasicAuth(user, pass string, next http.Handler) http.Handler {
next.ServeHTTP(w, r) next.ServeHTTP(w, r)
}) })
} }
// rebuild & render items partial for HTMX swaps
func renderItemsPartial(w http.ResponseWriter, r *http.Request, d Deps) {
type row struct {
ID string
Name string
UpdatedAt int64
HasBlob bool
Size int64
Owner string
OwnerActive bool
}
nextID := filesvc.ID(strings.TrimSpace(r.URL.Query().Get("next")))
items, nextOut, _ := d.Store.List(r.Context(), nextID, 100)
peers := d.Mesh.PeerList()
ttl := 2 * time.Minute
rows := make([]row, 0, len(items))
for _, it := range items {
meta, ok, _ := d.Blob.Stat(r.Context(), (it.ID))
rows = append(rows, row{
ID: (it.ID),
Name: it.Name,
UpdatedAt: it.UpdatedAt,
HasBlob: ok,
Size: meta.Size,
Owner: it.Owner,
OwnerActive: isOwnerActive(it.Owner, peers, ttl),
})
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
_ = tplItems.Execute(w, map[string]any{
"Items": rows,
"Next": nextOut,
})
}
func isOwnerActive(owner string, peers []mesh.Peer, ttl time.Duration) bool {
if strings.TrimSpace(owner) == "" {
return true
}
cutoff := time.Now().Add(-ttl)
for _, p := range peers {
if strings.TrimSpace(p.URL) == strings.TrimSpace(owner) {
// Self ist immer aktiv, sonst nach LastSeen
if p.Self {
return true
}
if p.LastSeen.IsZero() {
return false
}
return p.LastSeen.After(cutoff)
}
}
return false
}

View File

@@ -8,7 +8,7 @@
<style> <style>
:root { --bg:#0b1220; --card:#121a2b; --muted:#94a3b8; --text:#e5e7eb; --accent:#4f46e5; } :root { --bg:#0b1220; --card:#121a2b; --muted:#94a3b8; --text:#e5e7eb; --accent:#4f46e5; }
html,body { margin:0; background:var(--bg); color:var(--text); font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Ubuntu; } html,body { margin:0; background:var(--bg); color:var(--text); font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Ubuntu; }
.wrap { max-width: 980px; margin: 40px auto; padding: 0 16px; } .wrap { max-width: 1200px; margin: 40px auto; padding: 0 16px; }
.nav { display:flex; gap:12px; margin-bottom:16px; } .nav { display:flex; gap:12px; margin-bottom:16px; }
.btn { background:var(--card); border:1px solid #243044; padding:8px 12px; border-radius:10px; color:var(--text); cursor:pointer; } .btn { background:var(--card); border:1px solid #243044; padding:8px 12px; border-radius:10px; color:var(--text); cursor:pointer; }
.btn:hover { border-color:#3b4a66; } .btn:hover { border-color:#3b4a66; }

View File

@@ -1,6 +1,6 @@
<div class="row"> <div class="row">
<div style="flex: 1 1 360px"> <div style="flex: 1 1 360px">
<form hx-post="/admin/items/create" hx-target="#items" hx-get="/admin/items" hx-swap="outerHTML"> <form hx-post="/admin/items/create" hx-target="#items" hx-swap="outerHTML">
<label>Neue leere Datei (nur Metadaten)</label> <label>Neue leere Datei (nur Metadaten)</label>
<div style="display:flex; gap:8px; margin-top:6px"> <div style="display:flex; gap:8px; margin-top:6px">
<input type="text" name="name" placeholder="z.B. notes.txt" required> <input type="text" name="name" placeholder="z.B. notes.txt" required>
@@ -24,13 +24,19 @@
<table> <table>
<thead> <thead>
<tr> <tr>
<th>ID</th><th>Name</th><th>Updated</th><th>Blob</th><th style="width:260px">Aktionen</th> <th>ID</th>
<th>Name</th>
<th>Updated</th>
<th>Blob</th>
<th>Owner</th>
<th>Status</th>
<th style="width:320px">Aktionen</th>
</tr> </tr>
</thead> </thead>
<tbody> <tbody>
{{ range .Items }} {{ range .Items }}
<tr> <tr>
<td>{{ .ID }}</td> <td><code style="font-size:12px">{{ .ID }}</code></td>
<td>{{ .Name }}</td> <td>{{ .Name }}</td>
<td><small class="muted">{{ printf "%.19s" (timeRFC3339 .UpdatedAt) }}</small></td> <td><small class="muted">{{ printf "%.19s" (timeRFC3339 .UpdatedAt) }}</small></td>
<td> <td>
@@ -41,17 +47,33 @@
<span class="pill" style="background:#3a0b0b;border-color:#5b1a1a;color:#fbb;">fehlt</span> <span class="pill" style="background:#3a0b0b;border-color:#5b1a1a;color:#fbb;">fehlt</span>
{{ end }} {{ end }}
</td> </td>
<td><small class="muted">{{ .Owner }}</small></td>
<td>
{{ if .OwnerActive }}
<span class="pill">online</span>
{{ else }}
<span class="pill" style="background:#5b1a1a;border-color:#7a2b2b;color:#fbb">offline</span>
<!-- Owner-Handover nur wenn offline -->
<form style="display:inline"
hx-post="/admin/items/takeover"
hx-target="#items" hx-swap="outerHTML"
onsubmit="return confirm('Owner übernehmen?');">
<input type="hidden" name="id" value="{{ .ID }}">
<button class="btn" type="submit">Übernehmen</button>
</form>
{{ end }}
</td>
<td> <td>
<form style="display:inline-flex; gap:6px" <form style="display:inline-flex; gap:6px"
hx-post="/admin/items/rename" hx-post="/admin/items/rename"
hx-target="#items" hx-get="/admin/items" hx-swap="outerHTML"> hx-target="#items" hx-swap="outerHTML">
<input type="hidden" name="id" value="{{ .ID }}"> <input type="hidden" name="id" value="{{ .ID }}">
<input type="text" name="name" placeholder="Neuer Name"> <input type="text" name="name" placeholder="Neuer Name">
<button class="btn" type="submit">Rename</button> <button class="btn" type="submit">Rename</button>
</form> </form>
<form style="display:inline" <form style="display:inline"
hx-post="/admin/items/delete" hx-post="/admin/items/delete"
hx-target="#items" hx-get="/admin/items" hx-swap="outerHTML" hx-target="#items" hx-swap="outerHTML"
onsubmit="return confirm('Wirklich löschen (inkl. Blob)?');"> onsubmit="return confirm('Wirklich löschen (inkl. Blob)?');">
<input type="hidden" name="id" value="{{ .ID }}"> <input type="hidden" name="id" value="{{ .ID }}">
<button class="btn" type="submit">Delete</button> <button class="btn" type="submit">Delete</button>
@@ -62,7 +84,7 @@
</td> </td>
</tr> </tr>
{{ else }} {{ else }}
<tr><td colspan="5" class="muted">Keine Dateien vorhanden.</td></tr> <tr><td colspan="7" class="muted">Keine Dateien vorhanden.</td></tr>
{{ end }} {{ end }}
</tbody> </tbody>
</table> </table>

View File

@@ -11,11 +11,11 @@
{{ range .Peers }} {{ range .Peers }}
<tr> <tr>
<td>{{ .URL }}</td> <td>{{ .URL }}</td>
<td>{{ if .Self }}✅{{ else }}{{ end }}</td> <td>{{ if .Self }}✅{{ else }}🔗{{ end }}</td>
<td><small class="muted">{{ .LastSeen }}</small></td> <td><small class="muted">{{ .LastSeen }}</small></td>
</tr> </tr>
{{ else }} {{ else }}
<tr><td colspan="3" class="muted">Keine Peers vorhanden.</td></tr> <tr><td colspan="3" class="muted">⚠️ Keine Peers vorhanden.</td></tr>
{{ end }} {{ end }}
</tbody> </tbody>
</table> </table>

View File

@@ -6,7 +6,6 @@ import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"io" "io"
"mime" "mime"
"net/http" "net/http"
@@ -23,22 +22,35 @@ type Meta struct {
} }
type Store interface { type Store interface {
Save(ctx context.Context, id int64, filename string, r io.Reader) (Meta, error) Save(ctx context.Context, id string, filename string, r io.Reader) (Meta, error)
Open(ctx context.Context, id int64) (io.ReadSeekCloser, Meta, error) Open(ctx context.Context, id string) (io.ReadSeekCloser, Meta, error)
Stat(ctx context.Context, id int64) (Meta, bool, error) Stat(ctx context.Context, id string) (Meta, bool, error)
Delete(ctx context.Context, id int64) error Delete(ctx context.Context, id string) error
} }
type FS struct{ root string } type FS struct{ root string }
func New(root string) *FS { return &FS{root: root} } func New(root string) *FS { return &FS{root: root} }
func (fs *FS) dir(id int64) string { return filepath.Join(fs.root, "files", fmt.Sprintf("%d", id)) } func (fs *FS) dir(id string) string { return filepath.Join(fs.root, "files", sanitizeID(id)) }
func (fs *FS) metaPath(id int64) string { return filepath.Join(fs.dir(id), "meta.json") } func (fs *FS) metaPath(id string) string { return filepath.Join(fs.dir(id), "meta.json") }
func (fs *FS) blobPath(id int64, name string) string { func (fs *FS) blobPath(id string, name string) string {
return filepath.Join(fs.dir(id), "blob"+safeExt(name)) return filepath.Join(fs.dir(id), "blob"+safeExt(name))
} }
func sanitizeID(id string) string {
// nur 0-9a-zA-Z- zulassen; Rest mit '_' ersetzen
b := make([]rune, 0, len(id))
for _, r := range id {
if (r >= '0' && r <= '9') || (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || r == '-' {
b = append(b, r)
} else {
b = append(b, '_')
}
}
return string(b)
}
func safeExt(name string) string { func safeExt(name string) string {
ext := filepath.Ext(name) ext := filepath.Ext(name)
if len(ext) > 16 { // unrealistisch lange Exts beschneiden if len(ext) > 16 { // unrealistisch lange Exts beschneiden
@@ -47,7 +59,7 @@ func safeExt(name string) string {
return ext return ext
} }
func (fs *FS) Save(_ context.Context, id int64, filename string, r io.Reader) (Meta, error) { func (fs *FS) Save(_ context.Context, id string, filename string, r io.Reader) (Meta, error) {
if strings.TrimSpace(filename) == "" { if strings.TrimSpace(filename) == "" {
return Meta{}, errors.New("filename required") return Meta{}, errors.New("filename required")
} }
@@ -107,7 +119,7 @@ func (fs *FS) Save(_ context.Context, id int64, filename string, r io.Reader) (M
return meta, nil return meta, nil
} }
func (fs *FS) Open(_ context.Context, id int64) (io.ReadSeekCloser, Meta, error) { func (fs *FS) Open(_ context.Context, id string) (io.ReadSeekCloser, Meta, error) {
meta, ok, err := fs.Stat(context.Background(), id) meta, ok, err := fs.Stat(context.Background(), id)
if err != nil { if err != nil {
return nil, Meta{}, err return nil, Meta{}, err
@@ -119,7 +131,7 @@ func (fs *FS) Open(_ context.Context, id int64) (io.ReadSeekCloser, Meta, error)
return f, meta, err return f, meta, err
} }
func (fs *FS) Stat(_ context.Context, id int64) (Meta, bool, error) { func (fs *FS) Stat(_ context.Context, id string) (Meta, bool, error) {
b, err := os.ReadFile(fs.metaPath(id)) b, err := os.ReadFile(fs.metaPath(id))
if err != nil { if err != nil {
if os.IsNotExist(err) { if os.IsNotExist(err) {
@@ -139,7 +151,7 @@ func (fs *FS) Stat(_ context.Context, id int64) (Meta, bool, error) {
return m, true, nil return m, true, nil
} }
func (fs *FS) Delete(_ context.Context, id int64) error { func (fs *FS) Delete(_ context.Context, id string) error {
return os.RemoveAll(fs.dir(id)) return os.RemoveAll(fs.dir(id))
} }

View File

@@ -2,6 +2,9 @@ package filesvc
import ( import (
"context" "context"
"encoding/json"
"os"
"path/filepath"
"slices" "slices"
"strings" "strings"
"sync" "sync"
@@ -11,17 +14,82 @@ import (
type MemStore struct { type MemStore struct {
mu sync.Mutex mu sync.Mutex
items map[ID]File items map[ID]File
next ID self string
// optionales Eventing // optionales Eventing
subs []chan ChangeEvent subs []chan ChangeEvent
persistPath string
} }
func NewMemStore() *MemStore { func NewMemStore(self string) *MemStore {
return &MemStore{ return &MemStore{self: strings.TrimSpace(self), items: make(map[ID]File)}
items: make(map[ID]File), }
next: 1,
func NewMemStorePersistent(self, path string) *MemStore {
m := NewMemStore(self)
m.persistPath = strings.TrimSpace(path)
// beim Start versuchen zu laden
_ = m.loadFromDisk()
return m
}
// --- Persistenz-Helper (NEU) ---
func (m *MemStore) loadFromDisk() error {
if m.persistPath == "" {
return nil
} }
f, err := os.Open(m.persistPath)
if err != nil {
return nil // Datei existiert beim ersten Start nicht ok
}
defer f.Close()
var snap Snapshot
if err := json.NewDecoder(f).Decode(&snap); err != nil {
return err
}
m.mu.Lock()
for _, it := range snap.Items {
m.items[it.ID] = it
}
m.mu.Unlock()
return nil
}
func (m *MemStore) saveLocked() error {
if m.persistPath == "" {
return nil
}
if err := os.MkdirAll(filepath.Dir(m.persistPath), 0o755); err != nil {
return err
}
// Snapshot aus Map bauen
snap := Snapshot{Items: make([]File, 0, len(m.items))}
for _, it := range m.items {
snap.Items = append(snap.Items, it)
}
// atomar schreiben
tmp := m.persistPath + ".tmp"
f, err := os.Create(tmp)
if err != nil {
return err
}
enc := json.NewEncoder(f)
enc.SetIndent("", " ")
if err := enc.Encode(&snap); err != nil {
f.Close()
_ = os.Remove(tmp)
return err
}
if err := f.Sync(); err != nil {
f.Close()
_ = os.Remove(tmp)
return err
}
if err := f.Close(); err != nil {
_ = os.Remove(tmp)
return err
}
return os.Rename(tmp, m.persistPath)
} }
/*** Store ***/ /*** Store ***/
@@ -39,39 +107,54 @@ func (m *MemStore) Get(_ context.Context, id ID) (File, error) {
func (m *MemStore) List(_ context.Context, next ID, limit int) ([]File, ID, error) { func (m *MemStore) List(_ context.Context, next ID, limit int) ([]File, ID, error) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
if limit <= 0 || limit > 1000 { if limit <= 0 || limit > 1000 {
limit = 100 limit = 100
} }
ids := make([]ID, 0, len(m.items))
for id := range m.items { // sortiere deterministisch nach UpdatedAt, dann ID
ids = append(ids, id) all := make([]File, 0, len(m.items))
for _, v := range m.items {
all = append(all, v)
} }
slices.Sort(ids) slices.SortFunc(all, func(a, b File) int {
if a.UpdatedAt == b.UpdatedAt {
if a.ID == b.ID {
return 0
}
if a.ID < b.ID {
return -1
}
return 1
}
if a.UpdatedAt < b.UpdatedAt {
return -1
}
return 1
})
start := 0 start := 0
if next > 0 { if next != "" {
for i, id := range ids { for i, it := range all {
if id >= next { if it.ID >= next {
start = i start = i
break break
} }
} }
} }
end := start + limit end := start + limit
if end > len(ids) { if end > len(all) {
end = len(ids) end = len(all)
} }
out := make([]File, 0, end-start) out := make([]File, 0, end-start)
for _, id := range ids[start:end] { for _, it := range all[start:end] {
it := m.items[id]
if !it.Deleted { if !it.Deleted {
out = append(out, it) out = append(out, it)
} }
} }
var nextOut ID var nextOut ID
if end < len(ids) { if end < len(all) {
nextOut = ids[end] nextOut = all[end].ID
} }
return out, nextOut, nil return out, nextOut, nil
} }
@@ -85,9 +168,13 @@ func (m *MemStore) Create(_ context.Context, name string) (File, error) {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
now := time.Now().UnixNano() now := time.Now().UnixNano()
it := File{ID: m.next, Name: name, UpdatedAt: now} uid, err := NewUUIDv4()
if err != nil {
return File{}, err
}
it := File{ID: uid, Name: name, UpdatedAt: now, Owner: m.self}
m.items[it.ID] = it m.items[it.ID] = it
m.next++ _ = m.saveLocked()
m.emit(it) m.emit(it)
return it, nil return it, nil
} }
@@ -104,9 +191,13 @@ func (m *MemStore) Rename(_ context.Context, id ID, newName string) (File, error
if !ok || it.Deleted { if !ok || it.Deleted {
return File{}, ErrNotFound return File{}, ErrNotFound
} }
it.Name = newName if it.Owner != "" && it.Owner != m.self { // ← nur Owner
return File{}, ErrForbidden
}
it.Name = strings.TrimSpace(newName)
it.UpdatedAt = time.Now().UnixNano() it.UpdatedAt = time.Now().UnixNano()
m.items[id] = it m.items[id] = it
_ = m.saveLocked()
m.emit(it) m.emit(it)
return it, nil return it, nil
} }
@@ -118,16 +209,46 @@ func (m *MemStore) Delete(_ context.Context, id ID) (File, error) {
if !ok { if !ok {
return File{}, ErrNotFound return File{}, ErrNotFound
} }
if it.Owner != "" && it.Owner != m.self { // ← nur Owner
return File{}, ErrForbidden
}
if it.Deleted { if it.Deleted {
return it, nil return it, nil
} }
it.Deleted = true it.Deleted = true
it.UpdatedAt = time.Now().UnixNano() it.UpdatedAt = time.Now().UnixNano()
m.items[id] = it m.items[id] = it
_ = m.saveLocked()
m.emit(it) m.emit(it)
return it, nil return it, nil
} }
func (m *MemStore) TakeoverOwner(_ context.Context, id ID, newOwner string) (File, error) {
m.mu.Lock()
defer m.mu.Unlock()
it, ok := m.items[id]
if !ok || it.Deleted {
return File{}, ErrNotFound
}
newOwner = strings.TrimSpace(newOwner)
if newOwner == "" {
return File{}, ErrBadInput
}
// Sicherheit: nur für sich selbst übernehmen
if newOwner != m.self {
return File{}, ErrForbidden
}
if it.Owner == newOwner {
return it, nil
}
it.Owner = newOwner
it.UpdatedAt = time.Now().UnixNano()
m.items[id] = it
_ = m.saveLocked()
m.emitLocked(it)
return it, nil
}
/*** Replicable ***/ /*** Replicable ***/
func (m *MemStore) Snapshot(_ context.Context) (Snapshot, error) { func (m *MemStore) Snapshot(_ context.Context) (Snapshot, error) {
@@ -143,16 +264,22 @@ func (m *MemStore) Snapshot(_ context.Context) (Snapshot, error) {
func (m *MemStore) ApplyRemote(_ context.Context, s Snapshot) error { func (m *MemStore) ApplyRemote(_ context.Context, s Snapshot) error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
changed := false
for _, ri := range s.Items { for _, ri := range s.Items {
li, ok := m.items[ri.ID] li, ok := m.items[ri.ID]
if !ok || ri.UpdatedAt > li.UpdatedAt { if !ok || ri.UpdatedAt > li.UpdatedAt {
m.items[ri.ID] = ri // Owner nie überschreiben, außer er ist leer
if ri.ID >= m.next { if ok && li.Owner != "" && ri.Owner != "" && ri.Owner != li.Owner {
m.next = ri.ID + 1 ri.Owner = li.Owner
} }
m.items[ri.ID] = ri
changed = true
m.emitLocked(ri) m.emitLocked(ri)
} }
} }
if changed {
_ = m.saveLocked() // ← NEU
}
return nil return nil
} }

View File

@@ -2,20 +2,22 @@ package filesvc
import ( import (
"context" "context"
"crypto/rand"
"errors" "errors"
"fmt"
"time" "time"
) )
/*** Domain ***/ /*** Domain ***/
type ID = int64 type ID = string
type File struct { type File struct {
ID ID `json:"id"` ID ID `json:"id"`
Name string `json:"name"` Name string `json:"name"`
// weitere Metadaten optional: Size, Hash, Owner, Tags, ... UpdatedAt int64 `json:"updatedAt"` // UnixNano für LWW
UpdatedAt int64 `json:"updatedAt"` // UnixNano für LWW Deleted bool `json:"deleted"` // Tombstone für Mesh-Delete
Deleted bool `json:"deleted"` // Tombstone für Mesh-Delete Owner string `json:"owner"` //AdvertURL/NodeID des Erzeugers
} }
/*** Fehler ***/ /*** Fehler ***/
@@ -34,11 +36,10 @@ type Store interface {
// Lesen & Auflisten // Lesen & Auflisten
Get(ctx context.Context, id ID) (File, error) Get(ctx context.Context, id ID) (File, error)
List(ctx context.Context, next ID, limit int) (items []File, nextOut ID, err error) List(ctx context.Context, next ID, limit int) (items []File, nextOut ID, err error)
// Mutationen mit LWW-Semantik (UpdatedAt wird intern gesetzt, außer bei ApplyRemote)
Create(ctx context.Context, name string) (File, error) Create(ctx context.Context, name string) (File, error)
Rename(ctx context.Context, id ID, newName string) (File, error) Rename(ctx context.Context, id ID, newName string) (File, error) // nur Owner darf
Delete(ctx context.Context, id ID) (File, error) Delete(ctx context.Context, id ID) (File, error) // nur Owner darf
TakeoverOwner(ctx context.Context, id ID, newOwner string) (File, error)
} }
/*** Mesh-Replikation ***/ /*** Mesh-Replikation ***/
@@ -76,3 +77,13 @@ type MeshStore interface {
Replicable Replicable
Watchable // optional kann Noop sein Watchable // optional kann Noop sein
} }
func NewUUIDv4() (string, error) {
b := make([]byte, 16)
if _, err := rand.Read(b); err != nil {
return "", err
}
b[6] = (b[6] & 0x0f) | 0x40 // Version 4
b[8] = (b[8] & 0x3f) | 0x80 // Variant RFC4122
return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

View File

@@ -8,13 +8,16 @@ import (
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt"
"hash/crc32" "hash/crc32"
"io" "io"
"log" "log"
"math/rand/v2"
"net" "net"
"net/http" "net/http"
"os" "os"
"slices" "slices"
"strconv"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -28,7 +31,18 @@ type Config struct {
Seeds []string // other peers' mesh base URLs Seeds []string // other peers' mesh base URLs
ClusterSecret string // HMAC key ClusterSecret string // HMAC key
EnableDiscovery bool EnableDiscovery bool
DiscoveryAddress string // "239.8.8.8:9898" DiscoveryAddress string // "239.8.8.8:9898"
PeerTTL time.Duration // wie lange darf ein Peer inaktiv sein (Default siehe unten)
PruneInterval time.Duration // wie oft wird gepruned
SyncInterval time.Duration
Fanout int
// NEU:
HelloInterval time.Duration // wie oft pingen
HelloFanout int // wie viele Peers pro Tick
BlobTimeout time.Duration
} }
type Peer struct { type Peer struct {
@@ -39,10 +53,11 @@ type Peer struct {
} }
type Item struct { type Item struct {
ID int64 `json:"id"` ID string `json:"id"`
Name string `json:"name"` Name string `json:"name"`
UpdatedAt int64 `json:"updatedAt"` UpdatedAt int64 `json:"updatedAt"`
Deleted bool `json:"deleted"` // <— NEU: Tombstone für Deletes Deleted bool `json:"deleted"` // <— NEU: Tombstone für Deletes
Owner string `json:"owner"`
} }
type Snapshot struct { type Snapshot struct {
@@ -53,6 +68,7 @@ type Snapshot struct {
type Callbacks struct { type Callbacks struct {
GetSnapshot func(ctx context.Context) (Snapshot, error) GetSnapshot func(ctx context.Context) (Snapshot, error)
ApplyRemote func(ctx context.Context, s Snapshot) error ApplyRemote func(ctx context.Context, s Snapshot) error
BlobOpen func(ctx context.Context, id string) (io.ReadCloser, string, string, int64, error)
} }
/*** Node ***/ /*** Node ***/
@@ -69,6 +85,80 @@ type Node struct {
wg sync.WaitGroup wg sync.WaitGroup
} }
// RemovePeer löscht einen Peer aus der Peer-Tabelle. Seeds werden standardmäßig nicht entfernt.
func (n *Node) RemovePeer(url string) bool {
n.mu.Lock()
defer n.mu.Unlock()
if url == "" || url == n.self.URL {
return false
}
// Seeds schützen
if n.isSeed(url) {
return false
}
if _, ok := n.peers[url]; ok {
delete(n.peers, url)
return true
}
return false
}
func (n *Node) Config() Config { return n.cfg }
// PruneNow entfernt alle Peers, deren LastSeen vor cutoff liegt (Seeds bleiben).
func (n *Node) PruneNow(cutoff time.Time) int {
n.mu.Lock()
defer n.mu.Unlock()
removed := 0
for url, p := range n.peers {
if url == n.self.URL || n.isSeed(url) {
continue
}
if p.LastSeen.IsZero() || p.LastSeen.Before(cutoff) {
delete(n.peers, url)
removed++
}
}
return removed
}
// StartPeerPruner startet den Hintergrundjob (stoppt automatisch bei n.stop).
func (n *Node) StartPeerPruner() {
go n.loopPrunePeers()
}
func (n *Node) loopPrunePeers() {
ttl := n.cfg.PeerTTL
if ttl <= 0 {
ttl = 2 * time.Minute
}
interval := n.cfg.PruneInterval
if interval <= 0 {
interval = 30 * time.Second
}
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-n.stop:
return
case <-t.C:
cutoff := time.Now().Add(-ttl)
_ = n.PruneNow(cutoff)
}
}
}
// helper: ist url ein Seed?
func (n *Node) isSeed(url string) bool {
for _, s := range n.cfg.Seeds {
if strings.TrimSpace(s) == strings.TrimSpace(url) {
return true
}
}
return false
}
func New(cfg Config, cbs Callbacks) (*Node, error) { func New(cfg Config, cbs Callbacks) (*Node, error) {
if cfg.BindAddr == "" || cfg.AdvertURL == "" { if cfg.BindAddr == "" || cfg.AdvertURL == "" {
return nil, errors.New("mesh: BindAddr and AdvertURL required") return nil, errors.New("mesh: BindAddr and AdvertURL required")
@@ -110,21 +200,38 @@ func (n *Node) helloHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "bad signature", http.StatusUnauthorized) http.Error(w, "bad signature", http.StatusUnauthorized)
return return
} }
var p Peer var req struct {
if err := json.Unmarshal(body, &p); err != nil { URL string `json:"url"`
}
if err := json.Unmarshal(body, &req); err != nil || strings.TrimSpace(req.URL) == "" {
http.Error(w, "bad json", http.StatusBadRequest) http.Error(w, "bad json", http.StatusBadRequest)
return return
} }
p.LastSeen = time.Now()
// Peer anlegen (falls neu) und LastSeen setzen
n.mu.Lock() n.mu.Lock()
if existing, ok := n.peers[p.URL]; ok { if req.URL != n.self.URL {
existing.LastSeen = p.LastSeen if p, ok := n.peers[req.URL]; ok {
} else if p.URL != n.self.URL { p.LastSeen = time.Now()
cp := p } else {
n.peers[p.URL] = &cp cp := Peer{URL: req.URL, LastSeen: time.Now()} // weitere Felder wie bekannt
n.peers[req.URL] = &cp
}
}
n.mu.Unlock()
w.WriteHeader(http.StatusOK)
}
func (n *Node) touchPeer(url string) {
if strings.TrimSpace(url) == "" {
return
}
n.mu.Lock()
if p, ok := n.peers[url]; ok {
p.LastSeen = time.Now()
} }
n.mu.Unlock() n.mu.Unlock()
w.WriteHeader(http.StatusNoContent)
} }
func (n *Node) peersHandler(w http.ResponseWriter, r *http.Request) { func (n *Node) peersHandler(w http.ResponseWriter, r *http.Request) {
@@ -133,6 +240,7 @@ func (n *Node) peersHandler(w http.ResponseWriter, r *http.Request) {
var list []Peer var list []Peer
list = append(list, n.self) list = append(list, n.self)
for _, p := range n.peers { for _, p := range n.peers {
//p.Self = false
list = append(list, *p) list = append(list, *p)
} }
writeJSON(w, http.StatusOK, list) writeJSON(w, http.StatusOK, list)
@@ -170,6 +278,7 @@ func (n *Node) Serve() error {
mux := http.NewServeMux() mux := http.NewServeMux()
mux.HandleFunc("/mesh/peers", n.peersHandler) mux.HandleFunc("/mesh/peers", n.peersHandler)
mux.HandleFunc("/mesh/hello", n.helloHandler) mux.HandleFunc("/mesh/hello", n.helloHandler)
mux.HandleFunc("/mesh/blob", n.blobHandler)
mux.HandleFunc("/mesh/sync", n.syncHandler) mux.HandleFunc("/mesh/sync", n.syncHandler)
n.srv = &http.Server{Addr: n.cfg.BindAddr, Handler: mux} n.srv = &http.Server{Addr: n.cfg.BindAddr, Handler: mux}
@@ -180,6 +289,9 @@ func (n *Node) Serve() error {
n.loopSeeder() n.loopSeeder()
}() }()
n.wg.Add(1)
go func() { defer n.wg.Done(); n.loopPeerExchange() }()
if n.cfg.EnableDiscovery && n.cfg.DiscoveryAddress != "" { if n.cfg.EnableDiscovery && n.cfg.DiscoveryAddress != "" {
n.wg.Add(2) n.wg.Add(2)
go func() { go func() {
@@ -198,6 +310,12 @@ func (n *Node) Serve() error {
n.loopAntiEntropy() n.loopAntiEntropy()
}() }()
n.wg.Add(1)
go func() {
defer n.wg.Done()
n.loopHello()
}()
// http server // http server
errc := make(chan error, 1) errc := make(chan error, 1)
go func() { go func() {
@@ -222,6 +340,43 @@ func (n *Node) Close(ctx context.Context) error {
/*** Loops ***/ /*** Loops ***/
func (n *Node) loopPeerExchange() {
t := time.NewTicker(30 * time.Second)
defer t.Stop()
for {
select {
case <-n.stop:
return
case <-t.C:
}
// Seeds abfragen
for _, s := range n.cfg.Seeds {
if strings.TrimSpace(s) == "" {
continue
}
resp, err := n.client.Get(strings.TrimRight(s, "/") + "/mesh/peers")
if err != nil {
continue
}
var list []Peer
if json.NewDecoder(resp.Body).Decode(&list) == nil {
n.mu.Lock()
for _, p := range list {
if p.URL == "" || p.URL == n.self.URL {
continue
}
if _, ok := n.peers[p.URL]; !ok {
cp := p
n.peers[p.URL] = &cp
}
}
n.mu.Unlock()
}
resp.Body.Close()
}
}
}
func (n *Node) loopSeeder() { func (n *Node) loopSeeder() {
// attempt to hello known seeds every 5s at start, then every 30s // attempt to hello known seeds every 5s at start, then every 30s
backoff := 5 * time.Second backoff := 5 * time.Second
@@ -345,28 +500,100 @@ func (n *Node) loopBeaconRecv() {
/*** Outgoing ***/ /*** Outgoing ***/
func (n *Node) sendHello(url string) error { func (n *Node) sendHello(url string) error {
p := n.self b, _ := json.Marshal(struct {
b, _ := json.Marshal(p) URL string `json:"url"`
}{URL: n.self.URL})
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/hello", bytes.NewReader(b)) req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/hello", bytes.NewReader(b))
req.Header.Set("X-Mesh-Sig", n.sign(b)) req.Header.Set("X-Mesh-Sig", n.sign(b))
resp, err := n.client.Do(req) resp, err := n.client.Do(req)
if err == nil { if err != nil {
io.Copy(io.Discard, resp.Body) return err
resp.Body.Close() }
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
if resp.StatusCode == http.StatusOK {
n.touchPeer(url) // eigenen Blick auf den Peer direkt als „gesehen“ markieren
return nil
}
return fmt.Errorf("hello %s: %s", url, resp.Status)
}
func (n *Node) loopHello() {
interval := n.cfg.HelloInterval
if interval <= 0 {
interval = 20 * time.Second
}
fanout := n.cfg.HelloFanout
if fanout <= 0 {
fanout = 8
}
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-n.stop:
return
case <-t.C:
}
// Liste der *bekannten* Peers (nicht nur Seeds)
n.mu.RLock()
targets := make([]string, 0, len(n.peers))
for url := range n.peers {
if url != n.self.URL {
targets = append(targets, url)
}
}
n.mu.RUnlock()
if len(targets) == 0 {
continue
}
// zufällig mischen und auf Fanout begrenzen
rand.Shuffle(len(targets), func(i, j int) { targets[i], targets[j] = targets[j], targets[i] })
if fanout < len(targets) {
targets = targets[:fanout]
}
// leicht parallel pingen
var wg sync.WaitGroup
for _, u := range targets {
u := u
wg.Add(1)
go func() {
defer wg.Done()
_ = n.sendHello(u)
}()
}
wg.Wait()
} }
return err
} }
func (n *Node) sendSync(url string, s Snapshot) error { func (n *Node) sendSync(url string, s Snapshot) error {
b, _ := json.Marshal(s) b, _ := json.Marshal(s)
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/sync", bytes.NewReader(b)) req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/sync", bytes.NewReader(b))
req.Header.Set("X-Mesh-Sig", n.sign(b)) req.Header.Set("X-Mesh-Sig", n.sign(b))
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()
req = req.WithContext(ctx)
resp, err := n.client.Do(req) resp, err := n.client.Do(req)
if err == nil { if err != nil {
io.Copy(io.Discard, resp.Body) return err
resp.Body.Close()
} }
return err defer resp.Body.Close()
io.Copy(io.Discard, resp.Body)
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("sync %s: %s", url, resp.Status)
}
n.touchPeer(url)
return nil
} }
// PeerList liefert eine Kopie der bekannten Peers inkl. Self. // PeerList liefert eine Kopie der bekannten Peers inkl. Self.
@@ -377,6 +604,7 @@ func (n *Node) PeerList() []Peer {
out = append(out, n.self) out = append(out, n.self)
for _, p := range n.peers { for _, p := range n.peers {
cp := *p cp := *p
cp.Self = false
out = append(out, cp) out = append(out, cp)
} }
return out return out
@@ -403,11 +631,11 @@ func (n *Node) SyncNow(ctx context.Context) error {
/*** Utilities ***/ /*** Utilities ***/
// OwnerHint is a simple, optional mapping to distribute responsibility. // OwnerHint is a simple, optional mapping to distribute responsibility.
func OwnerHint(id int64, peers []string) int { func OwnerHint(id string, peers []string) int {
if len(peers) == 0 { if len(peers) == 0 {
return 0 return 0
} }
h := crc32.ChecksumIEEE([]byte(string(rune(id)))) h := crc32.ChecksumIEEE([]byte(id))
return int(h % uint32(len(peers))) return int(h % uint32(len(peers)))
} }
@@ -449,3 +677,109 @@ func getenvDefault(k, def string) string {
} }
return v return v
} }
// POST /mesh/blob (Body: {"id":<int64>}) -> streamt den Blob
func (n *Node) blobHandler(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
if !n.verify(body, r.Header.Get("X-Mesh-Sig")) {
http.Error(w, "bad signature", http.StatusUnauthorized)
return
}
var req struct {
ID string `json:"id"`
}
if err := json.Unmarshal(body, &req); err != nil {
http.Error(w, "bad json", http.StatusBadRequest)
return
}
if n.cbs.BlobOpen == nil {
http.Error(w, "blob unavailable", http.StatusNotFound)
return
}
rc, name, ct, size, err := n.cbs.BlobOpen(r.Context(), req.ID)
if err != nil {
http.Error(w, "not found", http.StatusNotFound)
return
}
defer rc.Close()
if ct == "" {
ct = "application/octet-stream"
}
if size > 0 {
w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
}
w.Header().Set("Content-Type", ct)
w.Header().Set("X-Blob-Name", name)
_, _ = io.Copy(w, rc)
}
// sendBlobRequest schickt eine signierte Anfrage an /mesh/blob und liefert die Response
// zurück (Caller MUSS resp.Body schließen!). Bei HTTP 200 wird der Peer als gesehen markiert.
func (n *Node) sendBlobRequest(url, id string) (*http.Response, error) {
b, _ := json.Marshal(struct {
ID string `json:"id"`
}{ID: id})
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/blob", bytes.NewReader(b))
req.Header.Set("X-Mesh-Sig", n.sign(b))
req.Header.Set("Content-Type", "application/json")
// ❗ WICHTIG: kein kurzer Timeout. Optional: großer Timeout aus Config
ctx := context.Background()
if n.cfg.BlobTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), n.cfg.BlobTimeout)
defer cancel()
}
req = req.WithContext(ctx)
resp, err := n.client.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode == http.StatusOK {
n.touchPeer(url) // ausgehender Erfolg zählt als "gesehen"
return resp, nil
}
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
return nil, fmt.Errorf("blob %s from %s: %s", id, url, resp.Status)
}
// Öffentliche Methode: versuche Blob bei irgendeinem Peer zu holen
func (n *Node) FetchBlobAny(ctx context.Context, id string) (io.ReadCloser, string, string, int64, error) {
n.mu.RLock()
targets := make([]string, 0, len(n.peers))
for url := range n.peers {
targets = append(targets, url)
}
n.mu.RUnlock()
if len(targets) == 0 {
// Fallback: Seeds probieren
targets = append(targets, n.cfg.Seeds...)
}
for _, u := range targets {
if strings.TrimSpace(u) == "" || u == n.self.URL {
continue
}
resp, err := n.sendBlobRequest(u, id)
if err != nil {
continue
}
if resp.StatusCode == http.StatusOK {
name := resp.Header.Get("X-Blob-Name")
ct := resp.Header.Get("Content-Type")
var size int64 = -1
if cl := resp.Header.Get("Content-Length"); cl != "" {
if s, err := strconv.ParseInt(cl, 10, 64); err == nil {
size = s
}
}
// Caller muss resp.Body schließen
return resp.Body, name, ct, size, nil
}
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}
return nil, "", "", 0, fmt.Errorf("blob %s not found on peers", id)
}