This commit is contained in:
@@ -336,16 +336,42 @@ func apiFiles(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, m
|
|||||||
http.NotFound(w, r)
|
http.NotFound(w, r)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
rc, meta, err := blobs.Open(r.Context(), id)
|
|
||||||
|
// 1) lokal
|
||||||
|
if rc, meta, err := blobs.Open(r.Context(), id); err == nil {
|
||||||
|
defer rc.Close()
|
||||||
|
w.Header().Set("Content-Type", meta.ContentType)
|
||||||
|
w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10))
|
||||||
|
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, meta.Name))
|
||||||
|
_, _ = io.Copy(w, rc)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2) remote holen
|
||||||
|
rrc, name, _, _, err := meshNode.FetchBlobAny(r.Context(), id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.NotFound(w, r)
|
http.NotFound(w, r)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer rc.Close()
|
defer rrc.Close()
|
||||||
|
|
||||||
|
// 3) lokal cachen
|
||||||
|
if _, err := blobs.Save(r.Context(), id, name, rrc); err != nil {
|
||||||
|
http.Error(w, "cache failed: "+err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 4) erneut lokal öffnen und streamen
|
||||||
|
lrc, meta, err := blobs.Open(r.Context(), id)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, "open failed", http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer lrc.Close()
|
||||||
w.Header().Set("Content-Type", meta.ContentType)
|
w.Header().Set("Content-Type", meta.ContentType)
|
||||||
w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10))
|
w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10))
|
||||||
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, meta.Name))
|
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, meta.Name))
|
||||||
_, _ = io.Copy(w, rc)
|
_, _ = io.Copy(w, lrc)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -387,7 +413,9 @@ func main() {
|
|||||||
|
|
||||||
// Mesh starten
|
// Mesh starten
|
||||||
mcfg := mesh.FromEnv()
|
mcfg := mesh.FromEnv()
|
||||||
mnode, err := mesh.New(mcfg, mesh.Callbacks{
|
blobs := blobfs.New(getenvDefault("DATA_DIR", "./data"))
|
||||||
|
|
||||||
|
mnode, err := mesh.New(cfg.Mesh, mesh.Callbacks{
|
||||||
GetSnapshot: func(ctx context.Context) (mesh.Snapshot, error) {
|
GetSnapshot: func(ctx context.Context) (mesh.Snapshot, error) {
|
||||||
s, err := st.Snapshot(ctx)
|
s, err := st.Snapshot(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -398,6 +426,13 @@ func main() {
|
|||||||
ApplyRemote: func(ctx context.Context, s mesh.Snapshot) error {
|
ApplyRemote: func(ctx context.Context, s mesh.Snapshot) error {
|
||||||
return st.ApplyRemote(ctx, fromMeshSnapshot(s))
|
return st.ApplyRemote(ctx, fromMeshSnapshot(s))
|
||||||
},
|
},
|
||||||
|
BlobOpen: func(ctx context.Context, id int64) (io.ReadCloser, string, string, int64, error) {
|
||||||
|
rc, meta, err := blobs.Open(ctx, id)
|
||||||
|
if err != nil {
|
||||||
|
return nil, "", "", 0, err
|
||||||
|
}
|
||||||
|
return rc, meta.Name, meta.ContentType, meta.Size, nil
|
||||||
|
},
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("mesh init: %v", err)
|
log.Fatalf("mesh init: %v", err)
|
||||||
@@ -414,7 +449,7 @@ func main() {
|
|||||||
root := http.NewServeMux()
|
root := http.NewServeMux()
|
||||||
|
|
||||||
// API (Bearer-Auth)
|
// API (Bearer-Auth)
|
||||||
blobs := blobfs.New(getenvDefault("DATA_DIR", "./data"))
|
//blobs := blobfs.New(getenvDefault("DATA_DIR", "./data"))
|
||||||
apiMux := http.NewServeMux()
|
apiMux := http.NewServeMux()
|
||||||
fileRoutes(apiMux, st)
|
fileRoutes(apiMux, st)
|
||||||
apiFiles(apiMux, st, blobs, mnode)
|
apiFiles(apiMux, st, blobs, mnode)
|
||||||
|
|||||||
@@ -142,16 +142,41 @@ func Register(mux *http.ServeMux, d Deps) {
|
|||||||
http.NotFound(w, r)
|
http.NotFound(w, r)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
rc, meta, err := d.Blob.Open(r.Context(), id)
|
|
||||||
|
// 1) lokal versuchen
|
||||||
|
if rc, meta, err := d.Blob.Open(r.Context(), id); err == nil {
|
||||||
|
defer rc.Close()
|
||||||
|
w.Header().Set("Content-Type", meta.ContentType)
|
||||||
|
w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10))
|
||||||
|
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, meta.Name))
|
||||||
|
_, _ = io.Copy(w, rc)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2) Remote über Mesh holen
|
||||||
|
rrc, name, _, _, err := d.Mesh.FetchBlobAny(r.Context(), id)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
http.NotFound(w, r)
|
http.NotFound(w, r)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer rc.Close()
|
defer rrc.Close()
|
||||||
|
|
||||||
|
// 3) lokal cachen (Save konsumiert den Stream)
|
||||||
|
if _, err := d.Blob.Save(r.Context(), id, name, rrc); err != nil {
|
||||||
|
http.Error(w, "cache failed: "+err.Error(), http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// 4) aus lokalem Store ausliefern (saubere Größe/CT)
|
||||||
|
lrc, meta, err := d.Blob.Open(r.Context(), id)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, "open failed", http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer lrc.Close()
|
||||||
w.Header().Set("Content-Type", meta.ContentType)
|
w.Header().Set("Content-Type", meta.ContentType)
|
||||||
w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10))
|
w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10))
|
||||||
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, meta.Name))
|
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, meta.Name))
|
||||||
_, _ = io.Copy(w, rc)
|
_, _ = io.Copy(w, lrc)
|
||||||
})
|
})
|
||||||
|
|
||||||
mux.HandleFunc("/admin/peers", func(w http.ResponseWriter, r *http.Request) {
|
mux.HandleFunc("/admin/peers", func(w http.ResponseWriter, r *http.Request) {
|
||||||
@@ -198,6 +223,7 @@ func Register(mux *http.ServeMux, d Deps) {
|
|||||||
}
|
}
|
||||||
if id, err := strconv.ParseInt(r.FormValue("id"), 10, 64); err == nil {
|
if id, err := strconv.ParseInt(r.FormValue("id"), 10, 64); err == nil {
|
||||||
_, _ = d.Store.Delete(r.Context(), filesvc.ID(id))
|
_, _ = d.Store.Delete(r.Context(), filesvc.ID(id))
|
||||||
|
_ = d.Blob.Delete(r.Context(), int64(id))
|
||||||
_ = d.Mesh.SyncNow(r.Context())
|
_ = d.Mesh.SyncNow(r.Context())
|
||||||
}
|
}
|
||||||
http.Redirect(w, r, "/admin/items", http.StatusSeeOther)
|
http.Redirect(w, r, "/admin/items", http.StatusSeeOther)
|
||||||
|
|||||||
@@ -8,6 +8,7 @@ import (
|
|||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"hash/crc32"
|
"hash/crc32"
|
||||||
"io"
|
"io"
|
||||||
"log"
|
"log"
|
||||||
@@ -15,6 +16,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"slices"
|
"slices"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -53,6 +55,7 @@ type Snapshot struct {
|
|||||||
type Callbacks struct {
|
type Callbacks struct {
|
||||||
GetSnapshot func(ctx context.Context) (Snapshot, error)
|
GetSnapshot func(ctx context.Context) (Snapshot, error)
|
||||||
ApplyRemote func(ctx context.Context, s Snapshot) error
|
ApplyRemote func(ctx context.Context, s Snapshot) error
|
||||||
|
BlobOpen func(ctx context.Context, id int64) (io.ReadCloser, string, string, int64, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
/*** Node ***/
|
/*** Node ***/
|
||||||
@@ -170,6 +173,7 @@ func (n *Node) Serve() error {
|
|||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
mux.HandleFunc("/mesh/peers", n.peersHandler)
|
mux.HandleFunc("/mesh/peers", n.peersHandler)
|
||||||
mux.HandleFunc("/mesh/hello", n.helloHandler)
|
mux.HandleFunc("/mesh/hello", n.helloHandler)
|
||||||
|
mux.HandleFunc("/mesh/blob", n.blobHandler)
|
||||||
mux.HandleFunc("/mesh/sync", n.syncHandler)
|
mux.HandleFunc("/mesh/sync", n.syncHandler)
|
||||||
n.srv = &http.Server{Addr: n.cfg.BindAddr, Handler: mux}
|
n.srv = &http.Server{Addr: n.cfg.BindAddr, Handler: mux}
|
||||||
|
|
||||||
@@ -449,3 +453,86 @@ func getenvDefault(k, def string) string {
|
|||||||
}
|
}
|
||||||
return v
|
return v
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// POST /mesh/blob (Body: {"id":<int64>}) -> streamt den Blob
|
||||||
|
func (n *Node) blobHandler(w http.ResponseWriter, r *http.Request) {
|
||||||
|
body, _ := io.ReadAll(r.Body)
|
||||||
|
if !n.verify(body, r.Header.Get("X-Mesh-Sig")) {
|
||||||
|
http.Error(w, "bad signature", http.StatusUnauthorized)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
var req struct {
|
||||||
|
ID int64 `json:"id"`
|
||||||
|
}
|
||||||
|
if err := json.Unmarshal(body, &req); err != nil {
|
||||||
|
http.Error(w, "bad json", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if n.cbs.BlobOpen == nil {
|
||||||
|
http.Error(w, "blob unavailable", http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
rc, name, ct, size, err := n.cbs.BlobOpen(r.Context(), req.ID)
|
||||||
|
if err != nil {
|
||||||
|
http.Error(w, "not found", http.StatusNotFound)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer rc.Close()
|
||||||
|
if ct == "" {
|
||||||
|
ct = "application/octet-stream"
|
||||||
|
}
|
||||||
|
if size > 0 {
|
||||||
|
w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
|
||||||
|
}
|
||||||
|
w.Header().Set("Content-Type", ct)
|
||||||
|
w.Header().Set("X-Blob-Name", name)
|
||||||
|
_, _ = io.Copy(w, rc)
|
||||||
|
}
|
||||||
|
|
||||||
|
// interner Helper: signierter Blob-Request an einen Peer
|
||||||
|
func (n *Node) sendBlobRequest(url string, id int64) (*http.Response, error) {
|
||||||
|
b, _ := json.Marshal(struct {
|
||||||
|
ID int64 `json:"id"`
|
||||||
|
}{ID: id})
|
||||||
|
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/blob", bytes.NewReader(b))
|
||||||
|
req.Header.Set("X-Mesh-Sig", n.sign(b))
|
||||||
|
return n.client.Do(req)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Öffentliche Methode: versuche Blob bei irgendeinem Peer zu holen
|
||||||
|
func (n *Node) FetchBlobAny(ctx context.Context, id int64) (io.ReadCloser, string, string, int64, error) {
|
||||||
|
n.mu.RLock()
|
||||||
|
targets := make([]string, 0, len(n.peers))
|
||||||
|
for url := range n.peers {
|
||||||
|
targets = append(targets, url)
|
||||||
|
}
|
||||||
|
n.mu.RUnlock()
|
||||||
|
if len(targets) == 0 {
|
||||||
|
// Fallback: Seeds probieren
|
||||||
|
targets = append(targets, n.cfg.Seeds...)
|
||||||
|
}
|
||||||
|
for _, u := range targets {
|
||||||
|
if strings.TrimSpace(u) == "" || u == n.self.URL {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
resp, err := n.sendBlobRequest(u, id)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if resp.StatusCode == http.StatusOK {
|
||||||
|
name := resp.Header.Get("X-Blob-Name")
|
||||||
|
ct := resp.Header.Get("Content-Type")
|
||||||
|
var size int64 = -1
|
||||||
|
if cl := resp.Header.Get("Content-Length"); cl != "" {
|
||||||
|
if s, err := strconv.ParseInt(cl, 10, 64); err == nil {
|
||||||
|
size = s
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Caller muss resp.Body schließen
|
||||||
|
return resp.Body, name, ct, size, nil
|
||||||
|
}
|
||||||
|
io.Copy(io.Discard, resp.Body)
|
||||||
|
resp.Body.Close()
|
||||||
|
}
|
||||||
|
return nil, "", "", 0, fmt.Errorf("blob %d not found on peers", id)
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user