Compare commits

18 Commits
aio ... main

Author SHA1 Message Date
42b0493adb fix-2
All checks were successful
release-tag / release-image (push) Successful in 1m26s
2025-09-30 00:34:24 +02:00
b3c1f37632 fix
All checks were successful
release-tag / release-image (push) Successful in 1m30s
2025-09-30 00:19:39 +02:00
40ded4d4db Bugfix: cache failed
All checks were successful
release-tag / release-image (push) Successful in 1m30s
2025-09-30 00:09:57 +02:00
291cfa33a9 Persistenz implementiert : Test-3
All checks were successful
release-tag / release-image (push) Successful in 1m27s
2025-09-29 23:55:35 +02:00
74fef30251 Test-2
All checks were successful
release-tag / release-image (push) Successful in 1m31s
2025-09-29 23:44:08 +02:00
b7729e8b39 Test-1
All checks were successful
release-tag / release-image (push) Successful in 1m52s
2025-09-29 23:08:34 +02:00
9122ebf2f1 vor temaplte 2025-09-29 23:04:46 +02:00
36d1c1512a Node autocleanup
All checks were successful
release-tag / release-image (push) Successful in 1m32s
2025-09-29 21:19:07 +02:00
aaa95410e4 Template Anpassung
All checks were successful
release-tag / release-image (push) Successful in 1m26s
2025-09-28 00:00:09 +02:00
0bf22b6120 fix self
All checks were successful
release-tag / release-image (push) Successful in 1m25s
2025-09-27 23:53:17 +02:00
b142a0b1a5 layout max to 1200
All checks were successful
release-tag / release-image (push) Successful in 1m33s
2025-09-27 23:42:35 +02:00
baedff9e9d Umstellung auf uuid
All checks were successful
release-tag / release-image (push) Successful in 1m32s
2025-09-27 21:42:32 +02:00
92e222f648 buxfix-2
All checks were successful
release-tag / release-image (push) Successful in 1m26s
2025-09-27 17:52:10 +02:00
43f1d01a8a diverse bugfixes
All checks were successful
release-tag / release-image (push) Successful in 1m32s
2025-09-27 17:39:39 +02:00
d125e3dd54 findnodes
All checks were successful
release-tag / release-image (push) Successful in 1m28s
2025-09-27 16:25:13 +02:00
6a0dc578ea fix
All checks were successful
release-tag / release-image (push) Successful in 1m27s
2025-09-27 16:00:53 +02:00
8c05ad6ffe update auf public download
All checks were successful
release-tag / release-image (push) Successful in 1m40s
2025-09-27 15:55:16 +02:00
7d0f4befe1 update
All checks were successful
release-tag / release-image (push) Successful in 1m28s
2025-09-27 14:16:15 +02:00
9 changed files with 1049 additions and 156 deletions

View File

@@ -10,6 +10,7 @@ import (
"net/http"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"syscall"
@@ -22,6 +23,14 @@ import (
"git.send.nrw/sendnrw/decent-webui/internal/mesh"
)
func parseDuration(s string, def time.Duration) time.Duration {
d, err := time.ParseDuration(strings.TrimSpace(s))
if err != nil || d <= 0 {
return def
}
return d
}
/*** Config ***/
func loadConfig() AppConfig {
// HTTP
@@ -44,6 +53,14 @@ func loadConfig() AppConfig {
DiscoveryAddress: getenvDefault("MESH_DISCOVERY_ADDR", "239.8.8.8:9898"),
}
m.PeerTTL = parseDuration(os.Getenv("MESH_PEER_TTL"), 2*time.Minute)
m.PruneInterval = parseDuration(os.Getenv("MESH_PRUNE_INTERVAL"), 30*time.Second)
m.HelloInterval = parseDuration(os.Getenv("MESH_HELLO_INTERVAL"), 20*time.Second)
m.HelloFanout = parseIntEnv(os.Getenv("MESH_HELLO_FANOUT"), 8)
m.BlobTimeout = parseDuration(os.Getenv("MESH_BLOB_TIMEOUT"), 0)
// Wenn keine AdvertURL gesetzt ist, versuche eine sinnvolle Herleitung:
if strings.TrimSpace(m.AdvertURL) == "" {
m.AdvertURL = inferAdvertURL(m.BindAddr)
@@ -62,11 +79,13 @@ func loadConfig() AppConfig {
}
return AppConfig{
HTTPAddr: httpAddr,
APIKey: apiKey,
AdminUser: adminUser,
AdminPass: adminPass,
Mesh: m,
HTTPAddr: httpAddr,
APIKey: apiKey,
AdminUser: adminUser,
AdminPass: adminPass,
Mesh: m,
PublicDownloads: parseBoolEnv("PUBLIC_DOWNLOADS", false),
PublicPath: getenvDefault("PUBLIC_DOWNLOAD_PATH", "/dl"),
}
}
@@ -88,6 +107,18 @@ func parseBoolEnv(k string, def bool) bool {
return v == "1" || v == "true" || v == "yes" || v == "on"
}
func parseIntEnv(k string, def int) int {
v := strings.TrimSpace(os.Getenv(k))
if v == "" {
return def
}
n, err := strconv.Atoi(v)
if err != nil {
return def
}
return n
}
func splitCSV(s string) []string {
s = strings.TrimSpace(s)
if s == "" {
@@ -137,6 +168,9 @@ type AppConfig struct {
AdminUser string
AdminPass string
Mesh mesh.Config
PublicDownloads bool // ENV: PUBLIC_DOWNLOADS (default false)
PublicPath string // ENV: PUBLIC_DOWNLOAD_PATH (default "/dl")
}
/*** Middleware ***/
@@ -188,7 +222,7 @@ func writeJSON(w http.ResponseWriter, code int, v any) {
/*** API-Routen ***/
func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore) {
func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store) {
// Health
mux.HandleFunc("/api/v1/health", func(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
@@ -198,13 +232,8 @@ func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore) {
mux.HandleFunc("/api/v1/items", func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
nextStr := r.URL.Query().Get("next")
var next filesvc.ID
if nextStr != "" {
if n, err := strconv.ParseInt(nextStr, 10, 64); err == nil {
next = filesvc.ID(n)
}
}
nextStr := strings.TrimSpace(r.URL.Query().Get("next"))
next := filesvc.ID(nextStr)
items, nextOut, err := store.List(r.Context(), next, 100)
if err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
@@ -246,6 +275,10 @@ func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore) {
}
it, err := store.Rename(r.Context(), in.ID, in.Name)
if err != nil {
if errors.Is(err, filesvc.ErrForbidden) {
writeJSON(w, http.StatusForbidden, map[string]string{"error": "only owner may modify"})
return
}
status := http.StatusBadRequest
if errors.Is(err, filesvc.ErrNotFound) {
status = http.StatusNotFound
@@ -270,7 +303,12 @@ func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore) {
return
}
it, err := store.Delete(r.Context(), in.ID)
_ = blobs.Delete(r.Context(), string(in.ID))
if err != nil {
if errors.Is(err, filesvc.ErrForbidden) {
writeJSON(w, http.StatusForbidden, map[string]string{"error": "only owner may modify"})
return
}
status := http.StatusBadRequest
if errors.Is(err, filesvc.ErrNotFound) {
status = http.StatusNotFound
@@ -310,7 +348,7 @@ func apiFiles(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, m
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return
}
meta, err := blobs.Save(r.Context(), int64(it.ID), name, fh)
meta, err := blobs.Save(r.Context(), string(it.ID), name, fh)
if err != nil {
_, _ = store.Delete(r.Context(), it.ID)
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
@@ -325,27 +363,61 @@ func apiFiles(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, m
// Download
mux.HandleFunc("/api/v1/files/", func(w http.ResponseWriter, r *http.Request) {
// /api/v1/files/{id}/download
parts := strings.Split(strings.TrimPrefix(r.URL.Path, "/api/v1/files/"), "/")
if len(parts) != 2 || parts[1] != "download" {
http.NotFound(w, r)
return
}
id, err := strconv.ParseInt(parts[0], 10, 64)
id := parts[0]
if strings.TrimSpace(id) == "" {
http.NotFound(w, r)
return
}
// 1) Metadaten prüfen
it, err := store.Get(r.Context(), filesvc.ID(id))
if err != nil || it.Deleted {
http.NotFound(w, r)
return
}
// 2) lokal
if rc, meta, err := blobs.Open(r.Context(), id); err == nil {
defer rc.Close()
serveBlob(w, r, rc, meta, it.Name)
return
}
// 3) remote holen & cachen
it1, _ := store.Get(r.Context(), filesvc.ID(id))
peers := meshNode.PeerList()
ttl := 2 * time.Minute
if cfg := meshNode.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
if !isOwnerActive(it1.Owner, peers, ttl) {
http.NotFound(w, r)
return
}
rrc, _, _, _, err := meshNode.FetchBlobAny(r.Context(), id)
if err != nil {
http.NotFound(w, r)
return
}
rc, meta, err := blobs.Open(r.Context(), id)
if err != nil {
http.NotFound(w, r)
defer rrc.Close()
if _, err := blobs.Save(r.Context(), id, it1.Name, rrc); err != nil {
http.Error(w, "cache failed", http.StatusInternalServerError)
return
}
defer rc.Close()
w.Header().Set("Content-Type", meta.ContentType)
w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10))
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, meta.Name))
_, _ = io.Copy(w, rc)
// 4) lokal streamen
lrc, meta, err := blobs.Open(r.Context(), id)
if err != nil {
http.Error(w, "open failed", http.StatusInternalServerError)
return
}
defer lrc.Close()
serveBlob(w, r, lrc, meta, it1.Name)
})
}
@@ -355,10 +427,11 @@ func toMeshSnapshot(s filesvc.Snapshot) mesh.Snapshot {
out := mesh.Snapshot{Items: make([]mesh.Item, 0, len(s.Items))}
for _, it := range s.Items {
out.Items = append(out.Items, mesh.Item{
ID: int64(it.ID),
ID: it.ID,
Name: it.Name,
UpdatedAt: it.UpdatedAt,
Deleted: it.Deleted,
Owner: it.Owner,
})
}
return out
@@ -368,26 +441,58 @@ func fromMeshSnapshot(ms mesh.Snapshot) filesvc.Snapshot {
out := filesvc.Snapshot{Items: make([]filesvc.File, 0, len(ms.Items))}
for _, it := range ms.Items {
out.Items = append(out.Items, filesvc.File{
ID: filesvc.ID(it.ID),
ID: it.ID,
Name: it.Name,
UpdatedAt: it.UpdatedAt,
Deleted: it.Deleted,
Owner: it.Owner,
})
}
return out
}
// isOwnerActive prüft, ob der Owner in der Peer-Liste als "aktiv" gilt.
func isOwnerActive(owner string, peers []mesh.Peer, ttl time.Duration) bool {
owner = strings.TrimSpace(owner)
if owner == "" {
return true
}
cutoff := time.Now().Add(-ttl)
for _, p := range peers {
if strings.TrimSpace(p.URL) != owner {
continue
}
// Self ist per Definition aktiv
if p.Self {
return true
}
// ohne LastSeen: als inaktiv behandeln
if p.LastSeen.IsZero() {
return false
}
return p.LastSeen.After(cutoff)
}
// Owner ist nicht mal in der Liste: inaktiv
return false
}
/*** main ***/
func main() {
cfg := loadConfig()
// Domain-Store (mesh-fähig)
st := filesvc.NewMemStore()
nodeID := strings.TrimSpace(cfg.Mesh.AdvertURL)
//st := filesvc.NewMemStore(nodeID)
// Mesh starten
mcfg := mesh.FromEnv()
mnode, err := mesh.New(mcfg, mesh.Callbacks{
//mcfg := mesh.FromEnv()
blobs := blobfs.New(getenvDefault("DATA_DIR", "./data"))
dataDir := getenvDefault("DATA_DIR", "./data")
metaPath := filepath.Join(dataDir, "meta", "items.json")
st := filesvc.NewMemStorePersistent(nodeID, metaPath)
mnode, err := mesh.New(cfg.Mesh, mesh.Callbacks{
GetSnapshot: func(ctx context.Context) (mesh.Snapshot, error) {
s, err := st.Snapshot(ctx)
if err != nil {
@@ -398,28 +503,88 @@ func main() {
ApplyRemote: func(ctx context.Context, s mesh.Snapshot) error {
return st.ApplyRemote(ctx, fromMeshSnapshot(s))
},
BlobOpen: func(ctx context.Context, id string) (io.ReadCloser, string, string, int64, error) { //5588
it, err := st.Get(ctx, filesvc.ID(id))
if err != nil || it.Deleted {
return nil, "", "", 0, fmt.Errorf("not found")
}
rc, meta, err := blobs.Open(ctx, id)
if err != nil {
return nil, "", "", 0, err
}
return rc, it.Name, meta.ContentType, meta.Size, nil
},
})
if err != nil {
log.Fatalf("mesh init: %v", err)
}
// Hintergrund-Pruner starten
mnode.StartPeerPruner()
go func() {
log.Printf("[mesh] listening on %s advertise %s seeds=%v discovery=%v",
mcfg.BindAddr, mcfg.AdvertURL, mcfg.Seeds, mcfg.EnableDiscovery)
cfg.Mesh.BindAddr, cfg.Mesh.AdvertURL, cfg.Mesh.Seeds, cfg.Mesh.EnableDiscovery)
if err := mnode.Serve(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("mesh serve: %v", err)
}
}()
go func() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for range ticker.C {
// aktive Owner bestimmen
peers := mnode.PeerList()
ttl := 2 * time.Minute
if cfg := mnode.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
cutoff := time.Now().Add(-ttl)
active := map[string]bool{}
for _, p := range peers {
if p.Self {
active[p.URL] = true
continue
}
if !p.LastSeen.IsZero() && p.LastSeen.After(cutoff) {
active[p.URL] = true
}
}
// alle Items durchgehen; Blobs von Offline-Ownern löschen
var next filesvc.ID
for {
items, nextOut, _ := st.List(context.Background(), next, 1000)
for _, it := range items {
if it.Owner == "" || active[it.Owner] {
continue
}
_ = blobs.Delete(context.Background(), it.ID)
}
if nextOut == "" {
break
}
next = nextOut
}
}
}()
// Root-Mux
root := http.NewServeMux()
// API (Bearer-Auth)
blobs := blobfs.New(getenvDefault("DATA_DIR", "./data"))
//blobs := blobfs.New(getenvDefault("DATA_DIR", "./data"))
apiMux := http.NewServeMux()
fileRoutes(apiMux, st)
fileRoutes(apiMux, st, blobs)
apiFiles(apiMux, st, blobs, mnode)
root.Handle("/api/", authMiddleware(cfg.APIKey, apiMux))
if cfg.PublicDownloads {
registerPublicDownloads(root, st, blobs, mnode, cfg.PublicPath)
}
// Admin-UI (optional BasicAuth via ADMIN_USER/ADMIN_PASS)
adminRoot := http.NewServeMux()
admin.Register(adminRoot, admin.Deps{Store: st, Mesh: mnode, Blob: blobs})
@@ -450,7 +615,8 @@ func main() {
// Graceful shutdown
go func() {
log.Printf("http listening on %s (api=/api/v1, admin=/admin)", cfg.Mesh.BindAddr)
log.Printf("http listening on %s (api=/api/v1, admin=/admin)", cfg.HTTPAddr)
log.Printf("mesh listening on %s", cfg.Mesh.BindAddr)
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("http server: %v", err)
}
@@ -467,3 +633,96 @@ func main() {
_ = mnode.Close(ctx)
log.Println("shutdown complete")
}
// Public: GET {base}/{id}
// Beispiel: /dl/1
func registerPublicDownloads(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, meshNode *mesh.Node, base string) {
if !strings.HasPrefix(base, "/") {
base = "/" + base
}
mux.HandleFunc(base+"/", func(w http.ResponseWriter, r *http.Request) {
id := strings.TrimSpace(strings.TrimPrefix(r.URL.Path, base+"/"))
if id == "" {
http.NotFound(w, r)
return
}
// 1) Metadaten prüfen
it, err := store.Get(r.Context(), filesvc.ID(id))
if err != nil || it.Deleted {
http.NotFound(w, r)
return
}
// 2) Lokal versuchen
if rc, meta, err := blobs.Open(context.Background(), id); err == nil {
defer rc.Close()
serveBlob(w, r, rc, meta, it.Name)
return
}
// (Optional) Owner-Online-Check — wenn du auch bei offline Ownern liefern willst, block auskommentieren
{
peers := meshNode.PeerList()
ttl := 2 * time.Minute
if !isOwnerActive(it.Owner, peers, ttl) {
http.NotFound(w, r)
return
}
}
// 3) Aus Mesh holen — EIGENER Timeout-Kontext, NICHT r.Context()
rrc, remoteName, _, _, err := meshNode.FetchBlobAny(context.Background(), id)
if err != nil {
http.NotFound(w, r)
return
}
defer rrc.Close()
filename := strings.TrimSpace(remoteName)
if filename == "" {
filename = it.Name
}
// 4) Lokal cachen — KEIN Request-Kontext, damit Save nicht abbricht
if _, err := blobs.Save(context.Background(), id, filename, rrc); err != nil {
log.Printf("[public] cache save failed id=%s name=%q: %v", id, filename, err)
http.Error(w, "cache failed: "+err.Error(), http.StatusInternalServerError)
return
}
// 5) Erneut lokal öffnen und streamen
lrc, meta, err := blobs.Open(context.Background(), id)
if err != nil {
http.Error(w, "open failed: "+err.Error(), http.StatusInternalServerError)
return
}
defer lrc.Close()
serveBlob(w, r, lrc, meta, filename)
})
}
// Hilfsfunktion: setzt sinnvolle Header und streamt die Datei
func serveBlob(w http.ResponseWriter, r *http.Request, rc io.ReadSeeker, meta blobfs.Meta, downloadName string) {
if meta.SHA256 != "" {
etag := `W/"` + meta.SHA256 + `"`
if r.Header.Get("If-None-Match") == etag {
w.WriteHeader(http.StatusNotModified)
return
}
w.Header().Set("ETag", etag)
}
if meta.ContentType == "" {
meta.ContentType = "application/octet-stream"
}
if downloadName == "" {
downloadName = meta.Name
}
w.Header().Set("Content-Type", meta.ContentType)
w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10))
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, downloadName))
w.Header().Set("Access-Control-Expose-Headers", "Content-Disposition")
w.Header().Set("X-Robots-Tag", "noindex")
_, _ = io.Copy(w, rc)
}

View File

@@ -56,31 +56,36 @@ func Register(mux *http.ServeMux, d Deps) {
// Partials
mux.HandleFunc("/admin/items", func(w http.ResponseWriter, r *http.Request) {
nextQ := strings.TrimSpace(r.URL.Query().Get("next"))
var nextID filesvc.ID
if nextQ != "" {
if n, err := strconv.ParseInt(nextQ, 10, 64); err == nil {
nextID = filesvc.ID(n)
}
}
nextID := filesvc.ID(strings.TrimSpace(r.URL.Query().Get("next")))
items, nextOut, _ := d.Store.List(r.Context(), nextID, 100)
type row struct {
ID int64
Name string
UpdatedAt int64
HasBlob bool
Size int64
ID string
Name string
UpdatedAt int64
HasBlob bool
Size int64
Owner string
OwnerActive bool
}
rows := make([]row, 0, len(items))
peers := d.Mesh.PeerList()
ttl := 2 * time.Minute
if cfg := d.Mesh.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
for _, it := range items {
meta, ok, _ := d.Blob.Stat(r.Context(), int64(it.ID))
meta, ok, _ := d.Blob.Stat(r.Context(), it.ID)
rows = append(rows, row{
ID: int64(it.ID),
Name: it.Name,
UpdatedAt: it.UpdatedAt,
HasBlob: ok,
Size: meta.Size,
ID: it.ID,
Name: it.Name,
UpdatedAt: it.UpdatedAt,
HasBlob: ok,
Size: meta.Size,
Owner: it.Owner,
OwnerActive: isOwnerActive(it.Owner, peers, ttl),
})
}
@@ -90,6 +95,42 @@ func Register(mux *http.ServeMux, d Deps) {
})
})
mux.HandleFunc("/admin/items/takeover", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
id := strings.TrimSpace(r.FormValue("id"))
if id != "" {
// Nur zulassen, wenn Owner tatsächlich offline ist
it, err := d.Store.Get(r.Context(), filesvc.ID(id))
if err == nil {
peers := d.Mesh.PeerList()
ttl := 2 * time.Minute
if cfg := d.Mesh.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
if !isOwnerActive(it.Owner, peers, ttl) {
// eigene URL aus PeerList ermitteln
self := ""
for _, p := range peers {
if p.Self {
self = p.URL
break
}
}
if self == "" {
self = "unknown-self"
}
if _, err := d.Store.TakeoverOwner(r.Context(), filesvc.ID(id), self); err == nil {
_ = d.Mesh.SyncNow(r.Context())
}
}
}
}
renderItemsPartial(w, r, d)
})
// Upload (multipart/form-data, Feldname "file", optional name-Override)
mux.HandleFunc("/admin/files/upload", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
@@ -119,14 +160,14 @@ func Register(mux *http.ServeMux, d Deps) {
}
// 2) Blob speichern
if _, err := d.Blob.Save(r.Context(), int64(it.ID), name, fh); err != nil {
if _, err := d.Blob.Save(r.Context(), (it.ID), name, fh); err != nil {
// zurückrollen (Tombstone)
_, _ = d.Store.Delete(r.Context(), it.ID)
http.Error(w, "save failed: "+err.Error(), http.StatusInternalServerError)
return
}
_ = d.Mesh.SyncNow(r.Context()) // best-effort Push
http.Redirect(w, r, "/admin/items", http.StatusSeeOther)
http.Redirect(w, r, "/admin", http.StatusSeeOther)
})
// Download (Admin BasicAuth schützt ggf.)
@@ -137,21 +178,46 @@ func Register(mux *http.ServeMux, d Deps) {
http.NotFound(w, r)
return
}
id, err := strconv.ParseInt(parts[0], 10, 64)
id := parts[0]
if strings.TrimSpace(id) == "" {
http.NotFound(w, r)
return
}
// 1) lokal versuchen
if rc, meta, err := d.Blob.Open(r.Context(), id); err == nil {
defer rc.Close()
w.Header().Set("Content-Type", meta.ContentType)
w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10))
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, meta.Name))
_, _ = io.Copy(w, rc)
return
}
// 2) Remote über Mesh holen
rrc, name, _, _, err := d.Mesh.FetchBlobAny(r.Context(), id)
if err != nil {
http.NotFound(w, r)
return
}
rc, meta, err := d.Blob.Open(r.Context(), id)
if err != nil {
http.NotFound(w, r)
defer rrc.Close()
// 3) lokal cachen (Save konsumiert den Stream)
if _, err := d.Blob.Save(r.Context(), id, name, rrc); err != nil {
http.Error(w, "cache failed: "+err.Error(), http.StatusInternalServerError)
return
}
defer rc.Close()
// 4) aus lokalem Store ausliefern (saubere Größe/CT)
lrc, meta, err := d.Blob.Open(r.Context(), id)
if err != nil {
http.Error(w, "open failed", http.StatusInternalServerError)
return
}
defer lrc.Close()
w.Header().Set("Content-Type", meta.ContentType)
w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10))
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, meta.Name))
_, _ = io.Copy(w, rc)
_, _ = io.Copy(w, lrc)
})
mux.HandleFunc("/admin/peers", func(w http.ResponseWriter, r *http.Request) {
@@ -162,7 +228,7 @@ func Register(mux *http.ServeMux, d Deps) {
})
})
// Actions (HTMX POSTs)
// CREATE
mux.HandleFunc("/admin/items/create", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
@@ -171,36 +237,40 @@ func Register(mux *http.ServeMux, d Deps) {
name := strings.TrimSpace(r.FormValue("name"))
if name != "" {
_, _ = d.Store.Create(r.Context(), name)
_ = d.Mesh.SyncNow(r.Context()) // prompt push (best effort)
_ = d.Mesh.SyncNow(r.Context())
}
// Nach Aktion Items partial zurückgeben (HTMX swap)
http.Redirect(w, r, "/admin/items", http.StatusSeeOther)
// statt Redirect:
renderItemsPartial(w, r, d)
})
// RENAME
mux.HandleFunc("/admin/items/rename", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
idStr := r.FormValue("id")
id := strings.TrimSpace(r.FormValue("id"))
newName := strings.TrimSpace(r.FormValue("name"))
if id, err := strconv.ParseInt(idStr, 10, 64); err == nil && newName != "" {
if id != "" && newName != "" {
_, _ = d.Store.Rename(r.Context(), filesvc.ID(id), newName)
_ = d.Mesh.SyncNow(r.Context())
}
http.Redirect(w, r, "/admin/items", http.StatusSeeOther)
renderItemsPartial(w, r, d)
})
// DELETE
mux.HandleFunc("/admin/items/delete", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
if id, err := strconv.ParseInt(r.FormValue("id"), 10, 64); err == nil {
id := strings.TrimSpace(r.FormValue("id"))
if id != "" {
_, _ = d.Store.Delete(r.Context(), filesvc.ID(id))
_ = d.Blob.Delete(r.Context(), id)
_ = d.Mesh.SyncNow(r.Context())
}
http.Redirect(w, r, "/admin/items", http.StatusSeeOther)
renderItemsPartial(w, r, d)
})
mux.HandleFunc("/admin/mesh/syncnow", func(w http.ResponseWriter, r *http.Request) {
@@ -231,3 +301,61 @@ func BasicAuth(user, pass string, next http.Handler) http.Handler {
next.ServeHTTP(w, r)
})
}
// rebuild & render items partial for HTMX swaps
func renderItemsPartial(w http.ResponseWriter, r *http.Request, d Deps) {
type row struct {
ID string
Name string
UpdatedAt int64
HasBlob bool
Size int64
Owner string
OwnerActive bool
}
nextID := filesvc.ID(strings.TrimSpace(r.URL.Query().Get("next")))
items, nextOut, _ := d.Store.List(r.Context(), nextID, 100)
peers := d.Mesh.PeerList()
ttl := 2 * time.Minute
rows := make([]row, 0, len(items))
for _, it := range items {
meta, ok, _ := d.Blob.Stat(r.Context(), (it.ID))
rows = append(rows, row{
ID: (it.ID),
Name: it.Name,
UpdatedAt: it.UpdatedAt,
HasBlob: ok,
Size: meta.Size,
Owner: it.Owner,
OwnerActive: isOwnerActive(it.Owner, peers, ttl),
})
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
_ = tplItems.Execute(w, map[string]any{
"Items": rows,
"Next": nextOut,
})
}
func isOwnerActive(owner string, peers []mesh.Peer, ttl time.Duration) bool {
if strings.TrimSpace(owner) == "" {
return true
}
cutoff := time.Now().Add(-ttl)
for _, p := range peers {
if strings.TrimSpace(p.URL) == strings.TrimSpace(owner) {
// Self ist immer aktiv, sonst nach LastSeen
if p.Self {
return true
}
if p.LastSeen.IsZero() {
return false
}
return p.LastSeen.After(cutoff)
}
}
return false
}

View File

@@ -8,7 +8,7 @@
<style>
:root { --bg:#0b1220; --card:#121a2b; --muted:#94a3b8; --text:#e5e7eb; --accent:#4f46e5; }
html,body { margin:0; background:var(--bg); color:var(--text); font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, Ubuntu; }
.wrap { max-width: 980px; margin: 40px auto; padding: 0 16px; }
.wrap { max-width: 1200px; margin: 40px auto; padding: 0 16px; }
.nav { display:flex; gap:12px; margin-bottom:16px; }
.btn { background:var(--card); border:1px solid #243044; padding:8px 12px; border-radius:10px; color:var(--text); cursor:pointer; }
.btn:hover { border-color:#3b4a66; }

View File

@@ -1,6 +1,6 @@
<div class="row">
<div style="flex: 1 1 360px">
<form hx-post="/admin/items/create" hx-target="#items" hx-get="/admin/items" hx-swap="outerHTML">
<form hx-post="/admin/items/create" hx-target="#items" hx-swap="outerHTML">
<label>Neue leere Datei (nur Metadaten)</label>
<div style="display:flex; gap:8px; margin-top:6px">
<input type="text" name="name" placeholder="z.B. notes.txt" required>
@@ -24,13 +24,19 @@
<table>
<thead>
<tr>
<th>ID</th><th>Name</th><th>Updated</th><th>Blob</th><th style="width:260px">Aktionen</th>
<th>ID</th>
<th>Name</th>
<th>Updated</th>
<th>Blob</th>
<th>Owner</th>
<th>Status</th>
<th style="width:320px">Aktionen</th>
</tr>
</thead>
<tbody>
{{ range .Items }}
<tr>
<td>{{ .ID }}</td>
<td><code style="font-size:12px">{{ .ID }}</code></td>
<td>{{ .Name }}</td>
<td><small class="muted">{{ printf "%.19s" (timeRFC3339 .UpdatedAt) }}</small></td>
<td>
@@ -41,17 +47,33 @@
<span class="pill" style="background:#3a0b0b;border-color:#5b1a1a;color:#fbb;">fehlt</span>
{{ end }}
</td>
<td><small class="muted">{{ .Owner }}</small></td>
<td>
{{ if .OwnerActive }}
<span class="pill">online</span>
{{ else }}
<span class="pill" style="background:#5b1a1a;border-color:#7a2b2b;color:#fbb">offline</span>
<!-- Owner-Handover nur wenn offline -->
<form style="display:inline"
hx-post="/admin/items/takeover"
hx-target="#items" hx-swap="outerHTML"
onsubmit="return confirm('Owner übernehmen?');">
<input type="hidden" name="id" value="{{ .ID }}">
<button class="btn" type="submit">Übernehmen</button>
</form>
{{ end }}
</td>
<td>
<form style="display:inline-flex; gap:6px"
hx-post="/admin/items/rename"
hx-target="#items" hx-get="/admin/items" hx-swap="outerHTML">
hx-target="#items" hx-swap="outerHTML">
<input type="hidden" name="id" value="{{ .ID }}">
<input type="text" name="name" placeholder="Neuer Name">
<button class="btn" type="submit">Rename</button>
</form>
<form style="display:inline"
hx-post="/admin/items/delete"
hx-target="#items" hx-get="/admin/items" hx-swap="outerHTML"
hx-target="#items" hx-swap="outerHTML"
onsubmit="return confirm('Wirklich löschen (inkl. Blob)?');">
<input type="hidden" name="id" value="{{ .ID }}">
<button class="btn" type="submit">Delete</button>
@@ -62,7 +84,7 @@
</td>
</tr>
{{ else }}
<tr><td colspan="5" class="muted">Keine Dateien vorhanden.</td></tr>
<tr><td colspan="7" class="muted">Keine Dateien vorhanden.</td></tr>
{{ end }}
</tbody>
</table>

View File

@@ -11,11 +11,11 @@
{{ range .Peers }}
<tr>
<td>{{ .URL }}</td>
<td>{{ if .Self }}✅{{ else }}{{ end }}</td>
<td>{{ if .Self }}✅{{ else }}🔗{{ end }}</td>
<td><small class="muted">{{ .LastSeen }}</small></td>
</tr>
{{ else }}
<tr><td colspan="3" class="muted">Keine Peers vorhanden.</td></tr>
<tr><td colspan="3" class="muted">⚠️ Keine Peers vorhanden.</td></tr>
{{ end }}
</tbody>
</table>

View File

@@ -6,7 +6,6 @@ import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"io"
"mime"
"net/http"
@@ -23,22 +22,35 @@ type Meta struct {
}
type Store interface {
Save(ctx context.Context, id int64, filename string, r io.Reader) (Meta, error)
Open(ctx context.Context, id int64) (io.ReadSeekCloser, Meta, error)
Stat(ctx context.Context, id int64) (Meta, bool, error)
Delete(ctx context.Context, id int64) error
Save(ctx context.Context, id string, filename string, r io.Reader) (Meta, error)
Open(ctx context.Context, id string) (io.ReadSeekCloser, Meta, error)
Stat(ctx context.Context, id string) (Meta, bool, error)
Delete(ctx context.Context, id string) error
}
type FS struct{ root string }
func New(root string) *FS { return &FS{root: root} }
func (fs *FS) dir(id int64) string { return filepath.Join(fs.root, "files", fmt.Sprintf("%d", id)) }
func (fs *FS) metaPath(id int64) string { return filepath.Join(fs.dir(id), "meta.json") }
func (fs *FS) blobPath(id int64, name string) string {
func (fs *FS) dir(id string) string { return filepath.Join(fs.root, "files", sanitizeID(id)) }
func (fs *FS) metaPath(id string) string { return filepath.Join(fs.dir(id), "meta.json") }
func (fs *FS) blobPath(id string, name string) string {
return filepath.Join(fs.dir(id), "blob"+safeExt(name))
}
func sanitizeID(id string) string {
// nur 0-9a-zA-Z- zulassen; Rest mit '_' ersetzen
b := make([]rune, 0, len(id))
for _, r := range id {
if (r >= '0' && r <= '9') || (r >= 'a' && r <= 'z') || (r >= 'A' && r <= 'Z') || r == '-' {
b = append(b, r)
} else {
b = append(b, '_')
}
}
return string(b)
}
func safeExt(name string) string {
ext := filepath.Ext(name)
if len(ext) > 16 { // unrealistisch lange Exts beschneiden
@@ -47,7 +59,7 @@ func safeExt(name string) string {
return ext
}
func (fs *FS) Save(_ context.Context, id int64, filename string, r io.Reader) (Meta, error) {
func (fs *FS) Save(_ context.Context, id string, filename string, r io.Reader) (Meta, error) {
if strings.TrimSpace(filename) == "" {
return Meta{}, errors.New("filename required")
}
@@ -107,7 +119,7 @@ func (fs *FS) Save(_ context.Context, id int64, filename string, r io.Reader) (M
return meta, nil
}
func (fs *FS) Open(_ context.Context, id int64) (io.ReadSeekCloser, Meta, error) {
func (fs *FS) Open(_ context.Context, id string) (io.ReadSeekCloser, Meta, error) {
meta, ok, err := fs.Stat(context.Background(), id)
if err != nil {
return nil, Meta{}, err
@@ -119,7 +131,7 @@ func (fs *FS) Open(_ context.Context, id int64) (io.ReadSeekCloser, Meta, error)
return f, meta, err
}
func (fs *FS) Stat(_ context.Context, id int64) (Meta, bool, error) {
func (fs *FS) Stat(_ context.Context, id string) (Meta, bool, error) {
b, err := os.ReadFile(fs.metaPath(id))
if err != nil {
if os.IsNotExist(err) {
@@ -139,7 +151,7 @@ func (fs *FS) Stat(_ context.Context, id int64) (Meta, bool, error) {
return m, true, nil
}
func (fs *FS) Delete(_ context.Context, id int64) error {
func (fs *FS) Delete(_ context.Context, id string) error {
return os.RemoveAll(fs.dir(id))
}

View File

@@ -2,6 +2,9 @@ package filesvc
import (
"context"
"encoding/json"
"os"
"path/filepath"
"slices"
"strings"
"sync"
@@ -11,17 +14,82 @@ import (
type MemStore struct {
mu sync.Mutex
items map[ID]File
next ID
self string
// optionales Eventing
subs []chan ChangeEvent
subs []chan ChangeEvent
persistPath string
}
func NewMemStore() *MemStore {
return &MemStore{
items: make(map[ID]File),
next: 1,
func NewMemStore(self string) *MemStore {
return &MemStore{self: strings.TrimSpace(self), items: make(map[ID]File)}
}
func NewMemStorePersistent(self, path string) *MemStore {
m := NewMemStore(self)
m.persistPath = strings.TrimSpace(path)
// beim Start versuchen zu laden
_ = m.loadFromDisk()
return m
}
// --- Persistenz-Helper (NEU) ---
func (m *MemStore) loadFromDisk() error {
if m.persistPath == "" {
return nil
}
f, err := os.Open(m.persistPath)
if err != nil {
return nil // Datei existiert beim ersten Start nicht ok
}
defer f.Close()
var snap Snapshot
if err := json.NewDecoder(f).Decode(&snap); err != nil {
return err
}
m.mu.Lock()
for _, it := range snap.Items {
m.items[it.ID] = it
}
m.mu.Unlock()
return nil
}
func (m *MemStore) saveLocked() error {
if m.persistPath == "" {
return nil
}
if err := os.MkdirAll(filepath.Dir(m.persistPath), 0o755); err != nil {
return err
}
// Snapshot aus Map bauen
snap := Snapshot{Items: make([]File, 0, len(m.items))}
for _, it := range m.items {
snap.Items = append(snap.Items, it)
}
// atomar schreiben
tmp := m.persistPath + ".tmp"
f, err := os.Create(tmp)
if err != nil {
return err
}
enc := json.NewEncoder(f)
enc.SetIndent("", " ")
if err := enc.Encode(&snap); err != nil {
f.Close()
_ = os.Remove(tmp)
return err
}
if err := f.Sync(); err != nil {
f.Close()
_ = os.Remove(tmp)
return err
}
if err := f.Close(); err != nil {
_ = os.Remove(tmp)
return err
}
return os.Rename(tmp, m.persistPath)
}
/*** Store ***/
@@ -39,39 +107,54 @@ func (m *MemStore) Get(_ context.Context, id ID) (File, error) {
func (m *MemStore) List(_ context.Context, next ID, limit int) ([]File, ID, error) {
m.mu.Lock()
defer m.mu.Unlock()
if limit <= 0 || limit > 1000 {
limit = 100
}
ids := make([]ID, 0, len(m.items))
for id := range m.items {
ids = append(ids, id)
// sortiere deterministisch nach UpdatedAt, dann ID
all := make([]File, 0, len(m.items))
for _, v := range m.items {
all = append(all, v)
}
slices.Sort(ids)
slices.SortFunc(all, func(a, b File) int {
if a.UpdatedAt == b.UpdatedAt {
if a.ID == b.ID {
return 0
}
if a.ID < b.ID {
return -1
}
return 1
}
if a.UpdatedAt < b.UpdatedAt {
return -1
}
return 1
})
start := 0
if next > 0 {
for i, id := range ids {
if id >= next {
if next != "" {
for i, it := range all {
if it.ID >= next {
start = i
break
}
}
}
end := start + limit
if end > len(ids) {
end = len(ids)
if end > len(all) {
end = len(all)
}
out := make([]File, 0, end-start)
for _, id := range ids[start:end] {
it := m.items[id]
for _, it := range all[start:end] {
if !it.Deleted {
out = append(out, it)
}
}
var nextOut ID
if end < len(ids) {
nextOut = ids[end]
if end < len(all) {
nextOut = all[end].ID
}
return out, nextOut, nil
}
@@ -85,9 +168,13 @@ func (m *MemStore) Create(_ context.Context, name string) (File, error) {
m.mu.Lock()
defer m.mu.Unlock()
now := time.Now().UnixNano()
it := File{ID: m.next, Name: name, UpdatedAt: now}
uid, err := NewUUIDv4()
if err != nil {
return File{}, err
}
it := File{ID: uid, Name: name, UpdatedAt: now, Owner: m.self}
m.items[it.ID] = it
m.next++
_ = m.saveLocked()
m.emit(it)
return it, nil
}
@@ -104,9 +191,13 @@ func (m *MemStore) Rename(_ context.Context, id ID, newName string) (File, error
if !ok || it.Deleted {
return File{}, ErrNotFound
}
it.Name = newName
if it.Owner != "" && it.Owner != m.self { // ← nur Owner
return File{}, ErrForbidden
}
it.Name = strings.TrimSpace(newName)
it.UpdatedAt = time.Now().UnixNano()
m.items[id] = it
_ = m.saveLocked()
m.emit(it)
return it, nil
}
@@ -118,16 +209,46 @@ func (m *MemStore) Delete(_ context.Context, id ID) (File, error) {
if !ok {
return File{}, ErrNotFound
}
if it.Owner != "" && it.Owner != m.self { // ← nur Owner
return File{}, ErrForbidden
}
if it.Deleted {
return it, nil
}
it.Deleted = true
it.UpdatedAt = time.Now().UnixNano()
m.items[id] = it
_ = m.saveLocked()
m.emit(it)
return it, nil
}
func (m *MemStore) TakeoverOwner(_ context.Context, id ID, newOwner string) (File, error) {
m.mu.Lock()
defer m.mu.Unlock()
it, ok := m.items[id]
if !ok || it.Deleted {
return File{}, ErrNotFound
}
newOwner = strings.TrimSpace(newOwner)
if newOwner == "" {
return File{}, ErrBadInput
}
// Sicherheit: nur für sich selbst übernehmen
if newOwner != m.self {
return File{}, ErrForbidden
}
if it.Owner == newOwner {
return it, nil
}
it.Owner = newOwner
it.UpdatedAt = time.Now().UnixNano()
m.items[id] = it
_ = m.saveLocked()
m.emitLocked(it)
return it, nil
}
/*** Replicable ***/
func (m *MemStore) Snapshot(_ context.Context) (Snapshot, error) {
@@ -143,16 +264,22 @@ func (m *MemStore) Snapshot(_ context.Context) (Snapshot, error) {
func (m *MemStore) ApplyRemote(_ context.Context, s Snapshot) error {
m.mu.Lock()
defer m.mu.Unlock()
changed := false
for _, ri := range s.Items {
li, ok := m.items[ri.ID]
if !ok || ri.UpdatedAt > li.UpdatedAt {
m.items[ri.ID] = ri
if ri.ID >= m.next {
m.next = ri.ID + 1
// Owner nie überschreiben, außer er ist leer
if ok && li.Owner != "" && ri.Owner != "" && ri.Owner != li.Owner {
ri.Owner = li.Owner
}
m.items[ri.ID] = ri
changed = true
m.emitLocked(ri)
}
}
if changed {
_ = m.saveLocked() // ← NEU
}
return nil
}

View File

@@ -2,20 +2,22 @@ package filesvc
import (
"context"
"crypto/rand"
"errors"
"fmt"
"time"
)
/*** Domain ***/
type ID = int64
type ID = string
type File struct {
ID ID `json:"id"`
Name string `json:"name"`
// weitere Metadaten optional: Size, Hash, Owner, Tags, ...
UpdatedAt int64 `json:"updatedAt"` // UnixNano für LWW
Deleted bool `json:"deleted"` // Tombstone für Mesh-Delete
ID ID `json:"id"`
Name string `json:"name"`
UpdatedAt int64 `json:"updatedAt"` // UnixNano für LWW
Deleted bool `json:"deleted"` // Tombstone für Mesh-Delete
Owner string `json:"owner"` //AdvertURL/NodeID des Erzeugers
}
/*** Fehler ***/
@@ -34,11 +36,10 @@ type Store interface {
// Lesen & Auflisten
Get(ctx context.Context, id ID) (File, error)
List(ctx context.Context, next ID, limit int) (items []File, nextOut ID, err error)
// Mutationen mit LWW-Semantik (UpdatedAt wird intern gesetzt, außer bei ApplyRemote)
Create(ctx context.Context, name string) (File, error)
Rename(ctx context.Context, id ID, newName string) (File, error)
Delete(ctx context.Context, id ID) (File, error)
Rename(ctx context.Context, id ID, newName string) (File, error) // nur Owner darf
Delete(ctx context.Context, id ID) (File, error) // nur Owner darf
TakeoverOwner(ctx context.Context, id ID, newOwner string) (File, error)
}
/*** Mesh-Replikation ***/
@@ -76,3 +77,13 @@ type MeshStore interface {
Replicable
Watchable // optional kann Noop sein
}
func NewUUIDv4() (string, error) {
b := make([]byte, 16)
if _, err := rand.Read(b); err != nil {
return "", err
}
b[6] = (b[6] & 0x0f) | 0x40 // Version 4
b[8] = (b[8] & 0x3f) | 0x80 // Variant RFC4122
return fmt.Sprintf("%x-%x-%x-%x-%x", b[0:4], b[4:6], b[6:8], b[8:10], b[10:16]), nil
}

View File

@@ -8,13 +8,16 @@ import (
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"hash/crc32"
"io"
"log"
"math/rand/v2"
"net"
"net/http"
"os"
"slices"
"strconv"
"strings"
"sync"
"time"
@@ -28,7 +31,18 @@ type Config struct {
Seeds []string // other peers' mesh base URLs
ClusterSecret string // HMAC key
EnableDiscovery bool
DiscoveryAddress string // "239.8.8.8:9898"
DiscoveryAddress string // "239.8.8.8:9898"
PeerTTL time.Duration // wie lange darf ein Peer inaktiv sein (Default siehe unten)
PruneInterval time.Duration // wie oft wird gepruned
SyncInterval time.Duration
Fanout int
// NEU:
HelloInterval time.Duration // wie oft pingen
HelloFanout int // wie viele Peers pro Tick
BlobTimeout time.Duration
}
type Peer struct {
@@ -39,10 +53,11 @@ type Peer struct {
}
type Item struct {
ID int64 `json:"id"`
ID string `json:"id"`
Name string `json:"name"`
UpdatedAt int64 `json:"updatedAt"`
Deleted bool `json:"deleted"` // <— NEU: Tombstone für Deletes
Owner string `json:"owner"`
}
type Snapshot struct {
@@ -53,6 +68,7 @@ type Snapshot struct {
type Callbacks struct {
GetSnapshot func(ctx context.Context) (Snapshot, error)
ApplyRemote func(ctx context.Context, s Snapshot) error
BlobOpen func(ctx context.Context, id string) (io.ReadCloser, string, string, int64, error)
}
/*** Node ***/
@@ -69,6 +85,80 @@ type Node struct {
wg sync.WaitGroup
}
// RemovePeer löscht einen Peer aus der Peer-Tabelle. Seeds werden standardmäßig nicht entfernt.
func (n *Node) RemovePeer(url string) bool {
n.mu.Lock()
defer n.mu.Unlock()
if url == "" || url == n.self.URL {
return false
}
// Seeds schützen
if n.isSeed(url) {
return false
}
if _, ok := n.peers[url]; ok {
delete(n.peers, url)
return true
}
return false
}
func (n *Node) Config() Config { return n.cfg }
// PruneNow entfernt alle Peers, deren LastSeen vor cutoff liegt (Seeds bleiben).
func (n *Node) PruneNow(cutoff time.Time) int {
n.mu.Lock()
defer n.mu.Unlock()
removed := 0
for url, p := range n.peers {
if url == n.self.URL || n.isSeed(url) {
continue
}
if p.LastSeen.IsZero() || p.LastSeen.Before(cutoff) {
delete(n.peers, url)
removed++
}
}
return removed
}
// StartPeerPruner startet den Hintergrundjob (stoppt automatisch bei n.stop).
func (n *Node) StartPeerPruner() {
go n.loopPrunePeers()
}
func (n *Node) loopPrunePeers() {
ttl := n.cfg.PeerTTL
if ttl <= 0 {
ttl = 2 * time.Minute
}
interval := n.cfg.PruneInterval
if interval <= 0 {
interval = 30 * time.Second
}
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-n.stop:
return
case <-t.C:
cutoff := time.Now().Add(-ttl)
_ = n.PruneNow(cutoff)
}
}
}
// helper: ist url ein Seed?
func (n *Node) isSeed(url string) bool {
for _, s := range n.cfg.Seeds {
if strings.TrimSpace(s) == strings.TrimSpace(url) {
return true
}
}
return false
}
func New(cfg Config, cbs Callbacks) (*Node, error) {
if cfg.BindAddr == "" || cfg.AdvertURL == "" {
return nil, errors.New("mesh: BindAddr and AdvertURL required")
@@ -110,21 +200,38 @@ func (n *Node) helloHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "bad signature", http.StatusUnauthorized)
return
}
var p Peer
if err := json.Unmarshal(body, &p); err != nil {
var req struct {
URL string `json:"url"`
}
if err := json.Unmarshal(body, &req); err != nil || strings.TrimSpace(req.URL) == "" {
http.Error(w, "bad json", http.StatusBadRequest)
return
}
p.LastSeen = time.Now()
// Peer anlegen (falls neu) und LastSeen setzen
n.mu.Lock()
if existing, ok := n.peers[p.URL]; ok {
existing.LastSeen = p.LastSeen
} else if p.URL != n.self.URL {
cp := p
n.peers[p.URL] = &cp
if req.URL != n.self.URL {
if p, ok := n.peers[req.URL]; ok {
p.LastSeen = time.Now()
} else {
cp := Peer{URL: req.URL, LastSeen: time.Now()} // weitere Felder wie bekannt
n.peers[req.URL] = &cp
}
}
n.mu.Unlock()
w.WriteHeader(http.StatusOK)
}
func (n *Node) touchPeer(url string) {
if strings.TrimSpace(url) == "" {
return
}
n.mu.Lock()
if p, ok := n.peers[url]; ok {
p.LastSeen = time.Now()
}
n.mu.Unlock()
w.WriteHeader(http.StatusNoContent)
}
func (n *Node) peersHandler(w http.ResponseWriter, r *http.Request) {
@@ -133,6 +240,7 @@ func (n *Node) peersHandler(w http.ResponseWriter, r *http.Request) {
var list []Peer
list = append(list, n.self)
for _, p := range n.peers {
//p.Self = false
list = append(list, *p)
}
writeJSON(w, http.StatusOK, list)
@@ -170,6 +278,7 @@ func (n *Node) Serve() error {
mux := http.NewServeMux()
mux.HandleFunc("/mesh/peers", n.peersHandler)
mux.HandleFunc("/mesh/hello", n.helloHandler)
mux.HandleFunc("/mesh/blob", n.blobHandler)
mux.HandleFunc("/mesh/sync", n.syncHandler)
n.srv = &http.Server{Addr: n.cfg.BindAddr, Handler: mux}
@@ -180,6 +289,9 @@ func (n *Node) Serve() error {
n.loopSeeder()
}()
n.wg.Add(1)
go func() { defer n.wg.Done(); n.loopPeerExchange() }()
if n.cfg.EnableDiscovery && n.cfg.DiscoveryAddress != "" {
n.wg.Add(2)
go func() {
@@ -198,6 +310,12 @@ func (n *Node) Serve() error {
n.loopAntiEntropy()
}()
n.wg.Add(1)
go func() {
defer n.wg.Done()
n.loopHello()
}()
// http server
errc := make(chan error, 1)
go func() {
@@ -222,6 +340,43 @@ func (n *Node) Close(ctx context.Context) error {
/*** Loops ***/
func (n *Node) loopPeerExchange() {
t := time.NewTicker(30 * time.Second)
defer t.Stop()
for {
select {
case <-n.stop:
return
case <-t.C:
}
// Seeds abfragen
for _, s := range n.cfg.Seeds {
if strings.TrimSpace(s) == "" {
continue
}
resp, err := n.client.Get(strings.TrimRight(s, "/") + "/mesh/peers")
if err != nil {
continue
}
var list []Peer
if json.NewDecoder(resp.Body).Decode(&list) == nil {
n.mu.Lock()
for _, p := range list {
if p.URL == "" || p.URL == n.self.URL {
continue
}
if _, ok := n.peers[p.URL]; !ok {
cp := p
n.peers[p.URL] = &cp
}
}
n.mu.Unlock()
}
resp.Body.Close()
}
}
}
func (n *Node) loopSeeder() {
// attempt to hello known seeds every 5s at start, then every 30s
backoff := 5 * time.Second
@@ -345,28 +500,100 @@ func (n *Node) loopBeaconRecv() {
/*** Outgoing ***/
func (n *Node) sendHello(url string) error {
p := n.self
b, _ := json.Marshal(p)
b, _ := json.Marshal(struct {
URL string `json:"url"`
}{URL: n.self.URL})
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/hello", bytes.NewReader(b))
req.Header.Set("X-Mesh-Sig", n.sign(b))
resp, err := n.client.Do(req)
if err == nil {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
if err != nil {
return err
}
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
if resp.StatusCode == http.StatusOK {
n.touchPeer(url) // eigenen Blick auf den Peer direkt als „gesehen“ markieren
return nil
}
return fmt.Errorf("hello %s: %s", url, resp.Status)
}
func (n *Node) loopHello() {
interval := n.cfg.HelloInterval
if interval <= 0 {
interval = 20 * time.Second
}
fanout := n.cfg.HelloFanout
if fanout <= 0 {
fanout = 8
}
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-n.stop:
return
case <-t.C:
}
// Liste der *bekannten* Peers (nicht nur Seeds)
n.mu.RLock()
targets := make([]string, 0, len(n.peers))
for url := range n.peers {
if url != n.self.URL {
targets = append(targets, url)
}
}
n.mu.RUnlock()
if len(targets) == 0 {
continue
}
// zufällig mischen und auf Fanout begrenzen
rand.Shuffle(len(targets), func(i, j int) { targets[i], targets[j] = targets[j], targets[i] })
if fanout < len(targets) {
targets = targets[:fanout]
}
// leicht parallel pingen
var wg sync.WaitGroup
for _, u := range targets {
u := u
wg.Add(1)
go func() {
defer wg.Done()
_ = n.sendHello(u)
}()
}
wg.Wait()
}
return err
}
func (n *Node) sendSync(url string, s Snapshot) error {
b, _ := json.Marshal(s)
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/sync", bytes.NewReader(b))
req.Header.Set("X-Mesh-Sig", n.sign(b))
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()
req = req.WithContext(ctx)
resp, err := n.client.Do(req)
if err == nil {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
if err != nil {
return err
}
return err
defer resp.Body.Close()
io.Copy(io.Discard, resp.Body)
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("sync %s: %s", url, resp.Status)
}
n.touchPeer(url)
return nil
}
// PeerList liefert eine Kopie der bekannten Peers inkl. Self.
@@ -377,6 +604,7 @@ func (n *Node) PeerList() []Peer {
out = append(out, n.self)
for _, p := range n.peers {
cp := *p
cp.Self = false
out = append(out, cp)
}
return out
@@ -403,11 +631,11 @@ func (n *Node) SyncNow(ctx context.Context) error {
/*** Utilities ***/
// OwnerHint is a simple, optional mapping to distribute responsibility.
func OwnerHint(id int64, peers []string) int {
func OwnerHint(id string, peers []string) int {
if len(peers) == 0 {
return 0
}
h := crc32.ChecksumIEEE([]byte(string(rune(id))))
h := crc32.ChecksumIEEE([]byte(id))
return int(h % uint32(len(peers)))
}
@@ -449,3 +677,109 @@ func getenvDefault(k, def string) string {
}
return v
}
// POST /mesh/blob (Body: {"id":<int64>}) -> streamt den Blob
func (n *Node) blobHandler(w http.ResponseWriter, r *http.Request) {
body, _ := io.ReadAll(r.Body)
if !n.verify(body, r.Header.Get("X-Mesh-Sig")) {
http.Error(w, "bad signature", http.StatusUnauthorized)
return
}
var req struct {
ID string `json:"id"`
}
if err := json.Unmarshal(body, &req); err != nil {
http.Error(w, "bad json", http.StatusBadRequest)
return
}
if n.cbs.BlobOpen == nil {
http.Error(w, "blob unavailable", http.StatusNotFound)
return
}
rc, name, ct, size, err := n.cbs.BlobOpen(r.Context(), req.ID)
if err != nil {
http.Error(w, "not found", http.StatusNotFound)
return
}
defer rc.Close()
if ct == "" {
ct = "application/octet-stream"
}
if size > 0 {
w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
}
w.Header().Set("Content-Type", ct)
w.Header().Set("X-Blob-Name", name)
_, _ = io.Copy(w, rc)
}
// sendBlobRequest schickt eine signierte Anfrage an /mesh/blob und liefert die Response
// zurück (Caller MUSS resp.Body schließen!). Bei HTTP 200 wird der Peer als gesehen markiert.
func (n *Node) sendBlobRequest(url, id string) (*http.Response, error) {
b, _ := json.Marshal(struct {
ID string `json:"id"`
}{ID: id})
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/blob", bytes.NewReader(b))
req.Header.Set("X-Mesh-Sig", n.sign(b))
req.Header.Set("Content-Type", "application/json")
// ❗ WICHTIG: kein kurzer Timeout. Optional: großer Timeout aus Config
ctx := context.Background()
if n.cfg.BlobTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), n.cfg.BlobTimeout)
defer cancel()
}
req = req.WithContext(ctx)
resp, err := n.client.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode == http.StatusOK {
n.touchPeer(url) // ausgehender Erfolg zählt als "gesehen"
return resp, nil
}
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
return nil, fmt.Errorf("blob %s from %s: %s", id, url, resp.Status)
}
// Öffentliche Methode: versuche Blob bei irgendeinem Peer zu holen
func (n *Node) FetchBlobAny(ctx context.Context, id string) (io.ReadCloser, string, string, int64, error) {
n.mu.RLock()
targets := make([]string, 0, len(n.peers))
for url := range n.peers {
targets = append(targets, url)
}
n.mu.RUnlock()
if len(targets) == 0 {
// Fallback: Seeds probieren
targets = append(targets, n.cfg.Seeds...)
}
for _, u := range targets {
if strings.TrimSpace(u) == "" || u == n.self.URL {
continue
}
resp, err := n.sendBlobRequest(u, id)
if err != nil {
continue
}
if resp.StatusCode == http.StatusOK {
name := resp.Header.Get("X-Blob-Name")
ct := resp.Header.Get("Content-Type")
var size int64 = -1
if cl := resp.Header.Get("Content-Length"); cl != "" {
if s, err := strconv.ParseInt(cl, 10, 64); err == nil {
size = s
}
}
// Caller muss resp.Body schließen
return resp.Body, name, ct, size, nil
}
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
}
return nil, "", "", 0, fmt.Errorf("blob %s not found on peers", id)
}