From 7d0f4befe1701c9e69254cca5c4a3d840121338a Mon Sep 17 00:00:00 2001 From: jbergner Date: Sat, 27 Sep 2025 14:16:15 +0200 Subject: [PATCH] update --- cmd/unified/main.go | 45 ++++++++++++++++++--- internal/admin/admin.go | 32 +++++++++++++-- internal/mesh/mesh.go | 87 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 156 insertions(+), 8 deletions(-) diff --git a/cmd/unified/main.go b/cmd/unified/main.go index e179ccb..2a54c01 100644 --- a/cmd/unified/main.go +++ b/cmd/unified/main.go @@ -336,16 +336,42 @@ func apiFiles(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, m http.NotFound(w, r) return } - rc, meta, err := blobs.Open(r.Context(), id) + + // 1) lokal + if rc, meta, err := blobs.Open(r.Context(), id); err == nil { + defer rc.Close() + w.Header().Set("Content-Type", meta.ContentType) + w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10)) + w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, meta.Name)) + _, _ = io.Copy(w, rc) + return + } + + // 2) remote holen + rrc, name, _, _, err := meshNode.FetchBlobAny(r.Context(), id) if err != nil { http.NotFound(w, r) return } - defer rc.Close() + defer rrc.Close() + + // 3) lokal cachen + if _, err := blobs.Save(r.Context(), id, name, rrc); err != nil { + http.Error(w, "cache failed: "+err.Error(), http.StatusInternalServerError) + return + } + + // 4) erneut lokal öffnen und streamen + lrc, meta, err := blobs.Open(r.Context(), id) + if err != nil { + http.Error(w, "open failed", http.StatusInternalServerError) + return + } + defer lrc.Close() w.Header().Set("Content-Type", meta.ContentType) w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10)) w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, meta.Name)) - _, _ = io.Copy(w, rc) + _, _ = io.Copy(w, lrc) }) } @@ -387,7 +413,9 @@ func main() { // Mesh starten mcfg := mesh.FromEnv() - mnode, err := mesh.New(mcfg, mesh.Callbacks{ + blobs := blobfs.New(getenvDefault("DATA_DIR", "./data")) + + mnode, err := mesh.New(cfg.Mesh, mesh.Callbacks{ GetSnapshot: func(ctx context.Context) (mesh.Snapshot, error) { s, err := st.Snapshot(ctx) if err != nil { @@ -398,6 +426,13 @@ func main() { ApplyRemote: func(ctx context.Context, s mesh.Snapshot) error { return st.ApplyRemote(ctx, fromMeshSnapshot(s)) }, + BlobOpen: func(ctx context.Context, id int64) (io.ReadCloser, string, string, int64, error) { + rc, meta, err := blobs.Open(ctx, id) + if err != nil { + return nil, "", "", 0, err + } + return rc, meta.Name, meta.ContentType, meta.Size, nil + }, }) if err != nil { log.Fatalf("mesh init: %v", err) @@ -414,7 +449,7 @@ func main() { root := http.NewServeMux() // API (Bearer-Auth) - blobs := blobfs.New(getenvDefault("DATA_DIR", "./data")) + //blobs := blobfs.New(getenvDefault("DATA_DIR", "./data")) apiMux := http.NewServeMux() fileRoutes(apiMux, st) apiFiles(apiMux, st, blobs, mnode) diff --git a/internal/admin/admin.go b/internal/admin/admin.go index 66bcb57..a321cc5 100644 --- a/internal/admin/admin.go +++ b/internal/admin/admin.go @@ -142,16 +142,41 @@ func Register(mux *http.ServeMux, d Deps) { http.NotFound(w, r) return } - rc, meta, err := d.Blob.Open(r.Context(), id) + + // 1) lokal versuchen + if rc, meta, err := d.Blob.Open(r.Context(), id); err == nil { + defer rc.Close() + w.Header().Set("Content-Type", meta.ContentType) + w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10)) + w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, meta.Name)) + _, _ = io.Copy(w, rc) + return + } + + // 2) Remote über Mesh holen + rrc, name, _, _, err := d.Mesh.FetchBlobAny(r.Context(), id) if err != nil { http.NotFound(w, r) return } - defer rc.Close() + defer rrc.Close() + + // 3) lokal cachen (Save konsumiert den Stream) + if _, err := d.Blob.Save(r.Context(), id, name, rrc); err != nil { + http.Error(w, "cache failed: "+err.Error(), http.StatusInternalServerError) + return + } + // 4) aus lokalem Store ausliefern (saubere Größe/CT) + lrc, meta, err := d.Blob.Open(r.Context(), id) + if err != nil { + http.Error(w, "open failed", http.StatusInternalServerError) + return + } + defer lrc.Close() w.Header().Set("Content-Type", meta.ContentType) w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10)) w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, meta.Name)) - _, _ = io.Copy(w, rc) + _, _ = io.Copy(w, lrc) }) mux.HandleFunc("/admin/peers", func(w http.ResponseWriter, r *http.Request) { @@ -198,6 +223,7 @@ func Register(mux *http.ServeMux, d Deps) { } if id, err := strconv.ParseInt(r.FormValue("id"), 10, 64); err == nil { _, _ = d.Store.Delete(r.Context(), filesvc.ID(id)) + _ = d.Blob.Delete(r.Context(), int64(id)) _ = d.Mesh.SyncNow(r.Context()) } http.Redirect(w, r, "/admin/items", http.StatusSeeOther) diff --git a/internal/mesh/mesh.go b/internal/mesh/mesh.go index c8049a6..aea5ebf 100644 --- a/internal/mesh/mesh.go +++ b/internal/mesh/mesh.go @@ -8,6 +8,7 @@ import ( "encoding/hex" "encoding/json" "errors" + "fmt" "hash/crc32" "io" "log" @@ -15,6 +16,7 @@ import ( "net/http" "os" "slices" + "strconv" "strings" "sync" "time" @@ -53,6 +55,7 @@ type Snapshot struct { type Callbacks struct { GetSnapshot func(ctx context.Context) (Snapshot, error) ApplyRemote func(ctx context.Context, s Snapshot) error + BlobOpen func(ctx context.Context, id int64) (io.ReadCloser, string, string, int64, error) } /*** Node ***/ @@ -170,6 +173,7 @@ func (n *Node) Serve() error { mux := http.NewServeMux() mux.HandleFunc("/mesh/peers", n.peersHandler) mux.HandleFunc("/mesh/hello", n.helloHandler) + mux.HandleFunc("/mesh/blob", n.blobHandler) mux.HandleFunc("/mesh/sync", n.syncHandler) n.srv = &http.Server{Addr: n.cfg.BindAddr, Handler: mux} @@ -449,3 +453,86 @@ func getenvDefault(k, def string) string { } return v } + +// POST /mesh/blob (Body: {"id":}) -> streamt den Blob +func (n *Node) blobHandler(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + if !n.verify(body, r.Header.Get("X-Mesh-Sig")) { + http.Error(w, "bad signature", http.StatusUnauthorized) + return + } + var req struct { + ID int64 `json:"id"` + } + if err := json.Unmarshal(body, &req); err != nil { + http.Error(w, "bad json", http.StatusBadRequest) + return + } + if n.cbs.BlobOpen == nil { + http.Error(w, "blob unavailable", http.StatusNotFound) + return + } + rc, name, ct, size, err := n.cbs.BlobOpen(r.Context(), req.ID) + if err != nil { + http.Error(w, "not found", http.StatusNotFound) + return + } + defer rc.Close() + if ct == "" { + ct = "application/octet-stream" + } + if size > 0 { + w.Header().Set("Content-Length", strconv.FormatInt(size, 10)) + } + w.Header().Set("Content-Type", ct) + w.Header().Set("X-Blob-Name", name) + _, _ = io.Copy(w, rc) +} + +// interner Helper: signierter Blob-Request an einen Peer +func (n *Node) sendBlobRequest(url string, id int64) (*http.Response, error) { + b, _ := json.Marshal(struct { + ID int64 `json:"id"` + }{ID: id}) + req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/blob", bytes.NewReader(b)) + req.Header.Set("X-Mesh-Sig", n.sign(b)) + return n.client.Do(req) +} + +// Öffentliche Methode: versuche Blob bei irgendeinem Peer zu holen +func (n *Node) FetchBlobAny(ctx context.Context, id int64) (io.ReadCloser, string, string, int64, error) { + n.mu.RLock() + targets := make([]string, 0, len(n.peers)) + for url := range n.peers { + targets = append(targets, url) + } + n.mu.RUnlock() + if len(targets) == 0 { + // Fallback: Seeds probieren + targets = append(targets, n.cfg.Seeds...) + } + for _, u := range targets { + if strings.TrimSpace(u) == "" || u == n.self.URL { + continue + } + resp, err := n.sendBlobRequest(u, id) + if err != nil { + continue + } + if resp.StatusCode == http.StatusOK { + name := resp.Header.Get("X-Blob-Name") + ct := resp.Header.Get("Content-Type") + var size int64 = -1 + if cl := resp.Header.Get("Content-Length"); cl != "" { + if s, err := strconv.ParseInt(cl, 10, 64); err == nil { + size = s + } + } + // Caller muss resp.Body schließen + return resp.Body, name, ct, size, nil + } + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + return nil, "", "", 0, fmt.Errorf("blob %d not found on peers", id) +}