Compare commits

...

7 Commits

Author SHA1 Message Date
42b0493adb fix-2
All checks were successful
release-tag / release-image (push) Successful in 1m26s
2025-09-30 00:34:24 +02:00
b3c1f37632 fix
All checks were successful
release-tag / release-image (push) Successful in 1m30s
2025-09-30 00:19:39 +02:00
40ded4d4db Bugfix: cache failed
All checks were successful
release-tag / release-image (push) Successful in 1m30s
2025-09-30 00:09:57 +02:00
291cfa33a9 Persistenz implementiert : Test-3
All checks were successful
release-tag / release-image (push) Successful in 1m27s
2025-09-29 23:55:35 +02:00
74fef30251 Test-2
All checks were successful
release-tag / release-image (push) Successful in 1m31s
2025-09-29 23:44:08 +02:00
b7729e8b39 Test-1
All checks were successful
release-tag / release-image (push) Successful in 1m52s
2025-09-29 23:08:34 +02:00
9122ebf2f1 vor temaplte 2025-09-29 23:04:46 +02:00
6 changed files with 552 additions and 91 deletions

View File

@@ -10,6 +10,7 @@ import (
"net/http" "net/http"
"os" "os"
"os/signal" "os/signal"
"path/filepath"
"strconv" "strconv"
"strings" "strings"
"syscall" "syscall"
@@ -55,6 +56,11 @@ func loadConfig() AppConfig {
m.PeerTTL = parseDuration(os.Getenv("MESH_PEER_TTL"), 2*time.Minute) m.PeerTTL = parseDuration(os.Getenv("MESH_PEER_TTL"), 2*time.Minute)
m.PruneInterval = parseDuration(os.Getenv("MESH_PRUNE_INTERVAL"), 30*time.Second) m.PruneInterval = parseDuration(os.Getenv("MESH_PRUNE_INTERVAL"), 30*time.Second)
m.HelloInterval = parseDuration(os.Getenv("MESH_HELLO_INTERVAL"), 20*time.Second)
m.HelloFanout = parseIntEnv(os.Getenv("MESH_HELLO_FANOUT"), 8)
m.BlobTimeout = parseDuration(os.Getenv("MESH_BLOB_TIMEOUT"), 0)
// Wenn keine AdvertURL gesetzt ist, versuche eine sinnvolle Herleitung: // Wenn keine AdvertURL gesetzt ist, versuche eine sinnvolle Herleitung:
if strings.TrimSpace(m.AdvertURL) == "" { if strings.TrimSpace(m.AdvertURL) == "" {
m.AdvertURL = inferAdvertURL(m.BindAddr) m.AdvertURL = inferAdvertURL(m.BindAddr)
@@ -101,6 +107,18 @@ func parseBoolEnv(k string, def bool) bool {
return v == "1" || v == "true" || v == "yes" || v == "on" return v == "1" || v == "true" || v == "yes" || v == "on"
} }
func parseIntEnv(k string, def int) int {
v := strings.TrimSpace(os.Getenv(k))
if v == "" {
return def
}
n, err := strconv.Atoi(v)
if err != nil {
return def
}
return n
}
func splitCSV(s string) []string { func splitCSV(s string) []string {
s = strings.TrimSpace(s) s = strings.TrimSpace(s)
if s == "" { if s == "" {
@@ -257,6 +275,10 @@ func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store)
} }
it, err := store.Rename(r.Context(), in.ID, in.Name) it, err := store.Rename(r.Context(), in.ID, in.Name)
if err != nil { if err != nil {
if errors.Is(err, filesvc.ErrForbidden) {
writeJSON(w, http.StatusForbidden, map[string]string{"error": "only owner may modify"})
return
}
status := http.StatusBadRequest status := http.StatusBadRequest
if errors.Is(err, filesvc.ErrNotFound) { if errors.Is(err, filesvc.ErrNotFound) {
status = http.StatusNotFound status = http.StatusNotFound
@@ -283,6 +305,10 @@ func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store)
it, err := store.Delete(r.Context(), in.ID) it, err := store.Delete(r.Context(), in.ID)
_ = blobs.Delete(r.Context(), string(in.ID)) _ = blobs.Delete(r.Context(), string(in.ID))
if err != nil { if err != nil {
if errors.Is(err, filesvc.ErrForbidden) {
writeJSON(w, http.StatusForbidden, map[string]string{"error": "only owner may modify"})
return
}
status := http.StatusBadRequest status := http.StatusBadRequest
if errors.Is(err, filesvc.ErrNotFound) { if errors.Is(err, filesvc.ErrNotFound) {
status = http.StatusNotFound status = http.StatusNotFound
@@ -363,13 +389,23 @@ func apiFiles(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, m
} }
// 3) remote holen & cachen // 3) remote holen & cachen
it1, _ := store.Get(r.Context(), filesvc.ID(id))
peers := meshNode.PeerList()
ttl := 2 * time.Minute
if cfg := meshNode.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
if !isOwnerActive(it1.Owner, peers, ttl) {
http.NotFound(w, r)
return
}
rrc, _, _, _, err := meshNode.FetchBlobAny(r.Context(), id) rrc, _, _, _, err := meshNode.FetchBlobAny(r.Context(), id)
if err != nil { if err != nil {
http.NotFound(w, r) http.NotFound(w, r)
return return
} }
defer rrc.Close() defer rrc.Close()
if _, err := blobs.Save(r.Context(), id, it.Name, rrc); err != nil { if _, err := blobs.Save(r.Context(), id, it1.Name, rrc); err != nil {
http.Error(w, "cache failed", http.StatusInternalServerError) http.Error(w, "cache failed", http.StatusInternalServerError)
return return
} }
@@ -381,7 +417,7 @@ func apiFiles(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, m
return return
} }
defer lrc.Close() defer lrc.Close()
serveBlob(w, r, lrc, meta, it.Name) serveBlob(w, r, lrc, meta, it1.Name)
}) })
} }
@@ -395,6 +431,7 @@ func toMeshSnapshot(s filesvc.Snapshot) mesh.Snapshot {
Name: it.Name, Name: it.Name,
UpdatedAt: it.UpdatedAt, UpdatedAt: it.UpdatedAt,
Deleted: it.Deleted, Deleted: it.Deleted,
Owner: it.Owner,
}) })
} }
return out return out
@@ -408,22 +445,52 @@ func fromMeshSnapshot(ms mesh.Snapshot) filesvc.Snapshot {
Name: it.Name, Name: it.Name,
UpdatedAt: it.UpdatedAt, UpdatedAt: it.UpdatedAt,
Deleted: it.Deleted, Deleted: it.Deleted,
Owner: it.Owner,
}) })
} }
return out return out
} }
// isOwnerActive prüft, ob der Owner in der Peer-Liste als "aktiv" gilt.
func isOwnerActive(owner string, peers []mesh.Peer, ttl time.Duration) bool {
owner = strings.TrimSpace(owner)
if owner == "" {
return true
}
cutoff := time.Now().Add(-ttl)
for _, p := range peers {
if strings.TrimSpace(p.URL) != owner {
continue
}
// Self ist per Definition aktiv
if p.Self {
return true
}
// ohne LastSeen: als inaktiv behandeln
if p.LastSeen.IsZero() {
return false
}
return p.LastSeen.After(cutoff)
}
// Owner ist nicht mal in der Liste: inaktiv
return false
}
/*** main ***/ /*** main ***/
func main() { func main() {
cfg := loadConfig() cfg := loadConfig()
// Domain-Store (mesh-fähig) // Domain-Store (mesh-fähig)
st := filesvc.NewMemStore() nodeID := strings.TrimSpace(cfg.Mesh.AdvertURL)
//st := filesvc.NewMemStore(nodeID)
// Mesh starten // Mesh starten
//mcfg := mesh.FromEnv() //mcfg := mesh.FromEnv()
blobs := blobfs.New(getenvDefault("DATA_DIR", "./data")) blobs := blobfs.New(getenvDefault("DATA_DIR", "./data"))
dataDir := getenvDefault("DATA_DIR", "./data")
metaPath := filepath.Join(dataDir, "meta", "items.json")
st := filesvc.NewMemStorePersistent(nodeID, metaPath)
mnode, err := mesh.New(cfg.Mesh, mesh.Callbacks{ mnode, err := mesh.New(cfg.Mesh, mesh.Callbacks{
GetSnapshot: func(ctx context.Context) (mesh.Snapshot, error) { GetSnapshot: func(ctx context.Context) (mesh.Snapshot, error) {
@@ -463,6 +530,47 @@ func main() {
} }
}() }()
go func() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for range ticker.C {
// aktive Owner bestimmen
peers := mnode.PeerList()
ttl := 2 * time.Minute
if cfg := mnode.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
cutoff := time.Now().Add(-ttl)
active := map[string]bool{}
for _, p := range peers {
if p.Self {
active[p.URL] = true
continue
}
if !p.LastSeen.IsZero() && p.LastSeen.After(cutoff) {
active[p.URL] = true
}
}
// alle Items durchgehen; Blobs von Offline-Ownern löschen
var next filesvc.ID
for {
items, nextOut, _ := st.List(context.Background(), next, 1000)
for _, it := range items {
if it.Owner == "" || active[it.Owner] {
continue
}
_ = blobs.Delete(context.Background(), it.ID)
}
if nextOut == "" {
break
}
next = nextOut
}
}
}()
// Root-Mux // Root-Mux
root := http.NewServeMux() root := http.NewServeMux()
@@ -533,51 +641,64 @@ func registerPublicDownloads(mux *http.ServeMux, store filesvc.MeshStore, blobs
base = "/" + base base = "/" + base
} }
mux.HandleFunc(base+"/", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc(base+"/", func(w http.ResponseWriter, r *http.Request) {
idStr := strings.TrimPrefix(r.URL.Path, base+"/") id := strings.TrimSpace(strings.TrimPrefix(r.URL.Path, base+"/"))
if idStr == "" { if id == "" {
http.NotFound(w, r)
return
}
id := idStr
if strings.TrimSpace(id) == "" {
http.NotFound(w, r) http.NotFound(w, r)
return return
} }
// 1) Metadaten prüfen (nicht vorhanden oder gelöscht → 404) // 1) Metadaten prüfen
it, err := store.Get(r.Context(), filesvc.ID(id)) it, err := store.Get(r.Context(), filesvc.ID(id))
if err != nil || it.Deleted { if err != nil || it.Deleted {
http.NotFound(w, r) http.NotFound(w, r)
return return
} }
// 2) lokal versuchen // 2) Lokal versuchen
if rc, meta, err := blobs.Open(r.Context(), id); err == nil { if rc, meta, err := blobs.Open(context.Background(), id); err == nil {
defer rc.Close() defer rc.Close()
serveBlob(w, r, rc, meta, it.Name) // Download-Name aus Store! serveBlob(w, r, rc, meta, it.Name)
return return
} }
// 3) aus Mesh holen (signiert) und cachen // (Optional) Owner-Online-Check — wenn du auch bei offline Ownern liefern willst, block auskommentieren
rrc, _, _, _, err := meshNode.FetchBlobAny(r.Context(), id) {
peers := meshNode.PeerList()
ttl := 2 * time.Minute
if !isOwnerActive(it.Owner, peers, ttl) {
http.NotFound(w, r)
return
}
}
// 3) Aus Mesh holen — EIGENER Timeout-Kontext, NICHT r.Context()
rrc, remoteName, _, _, err := meshNode.FetchBlobAny(context.Background(), id)
if err != nil { if err != nil {
http.NotFound(w, r) http.NotFound(w, r)
return return
} }
defer rrc.Close() defer rrc.Close()
if _, err := blobs.Save(r.Context(), id, it.Name, rrc); err != nil {
http.Error(w, "cache failed", http.StatusInternalServerError) filename := strings.TrimSpace(remoteName)
if filename == "" {
filename = it.Name
}
// 4) Lokal cachen — KEIN Request-Kontext, damit Save nicht abbricht
if _, err := blobs.Save(context.Background(), id, filename, rrc); err != nil {
log.Printf("[public] cache save failed id=%s name=%q: %v", id, filename, err)
http.Error(w, "cache failed: "+err.Error(), http.StatusInternalServerError)
return return
} }
// 4) erneut lokal öffnen und streamen // 5) Erneut lokal öffnen und streamen
lrc, meta, err := blobs.Open(r.Context(), id) lrc, meta, err := blobs.Open(context.Background(), id)
if err != nil { if err != nil {
http.Error(w, "open failed", http.StatusInternalServerError) http.Error(w, "open failed: "+err.Error(), http.StatusInternalServerError)
return return
} }
defer lrc.Close() defer lrc.Close()
serveBlob(w, r, lrc, meta, it.Name) serveBlob(w, r, lrc, meta, filename)
}) })
} }

View File

@@ -60,21 +60,32 @@ func Register(mux *http.ServeMux, d Deps) {
items, nextOut, _ := d.Store.List(r.Context(), nextID, 100) items, nextOut, _ := d.Store.List(r.Context(), nextID, 100)
type row struct { type row struct {
ID string ID string
Name string Name string
UpdatedAt int64 UpdatedAt int64
HasBlob bool HasBlob bool
Size int64 Size int64
Owner string
OwnerActive bool
} }
rows := make([]row, 0, len(items)) rows := make([]row, 0, len(items))
peers := d.Mesh.PeerList()
ttl := 2 * time.Minute
if cfg := d.Mesh.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
for _, it := range items { for _, it := range items {
meta, ok, _ := d.Blob.Stat(r.Context(), it.ID) meta, ok, _ := d.Blob.Stat(r.Context(), it.ID)
rows = append(rows, row{ rows = append(rows, row{
ID: it.ID, ID: it.ID,
Name: it.Name, Name: it.Name,
UpdatedAt: it.UpdatedAt, UpdatedAt: it.UpdatedAt,
HasBlob: ok, HasBlob: ok,
Size: meta.Size, Size: meta.Size,
Owner: it.Owner,
OwnerActive: isOwnerActive(it.Owner, peers, ttl),
}) })
} }
@@ -84,6 +95,42 @@ func Register(mux *http.ServeMux, d Deps) {
}) })
}) })
mux.HandleFunc("/admin/items/takeover", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
id := strings.TrimSpace(r.FormValue("id"))
if id != "" {
// Nur zulassen, wenn Owner tatsächlich offline ist
it, err := d.Store.Get(r.Context(), filesvc.ID(id))
if err == nil {
peers := d.Mesh.PeerList()
ttl := 2 * time.Minute
if cfg := d.Mesh.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
if !isOwnerActive(it.Owner, peers, ttl) {
// eigene URL aus PeerList ermitteln
self := ""
for _, p := range peers {
if p.Self {
self = p.URL
break
}
}
if self == "" {
self = "unknown-self"
}
if _, err := d.Store.TakeoverOwner(r.Context(), filesvc.ID(id), self); err == nil {
_ = d.Mesh.SyncNow(r.Context())
}
}
}
}
renderItemsPartial(w, r, d)
})
// Upload (multipart/form-data, Feldname "file", optional name-Override) // Upload (multipart/form-data, Feldname "file", optional name-Override)
mux.HandleFunc("/admin/files/upload", func(w http.ResponseWriter, r *http.Request) { mux.HandleFunc("/admin/files/upload", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost { if r.Method != http.MethodPost {
@@ -258,25 +305,32 @@ func BasicAuth(user, pass string, next http.Handler) http.Handler {
// rebuild & render items partial for HTMX swaps // rebuild & render items partial for HTMX swaps
func renderItemsPartial(w http.ResponseWriter, r *http.Request, d Deps) { func renderItemsPartial(w http.ResponseWriter, r *http.Request, d Deps) {
type row struct { type row struct {
ID string ID string
Name string Name string
UpdatedAt int64 UpdatedAt int64
HasBlob bool HasBlob bool
Size int64 Size int64
Owner string
OwnerActive bool
} }
nextID := filesvc.ID(strings.TrimSpace(r.URL.Query().Get("next"))) nextID := filesvc.ID(strings.TrimSpace(r.URL.Query().Get("next")))
items, nextOut, _ := d.Store.List(r.Context(), nextID, 100) items, nextOut, _ := d.Store.List(r.Context(), nextID, 100)
peers := d.Mesh.PeerList()
ttl := 2 * time.Minute
rows := make([]row, 0, len(items)) rows := make([]row, 0, len(items))
for _, it := range items { for _, it := range items {
meta, ok, _ := d.Blob.Stat(r.Context(), (it.ID)) meta, ok, _ := d.Blob.Stat(r.Context(), (it.ID))
rows = append(rows, row{ rows = append(rows, row{
ID: (it.ID), ID: (it.ID),
Name: it.Name, Name: it.Name,
UpdatedAt: it.UpdatedAt, UpdatedAt: it.UpdatedAt,
HasBlob: ok, HasBlob: ok,
Size: meta.Size, Size: meta.Size,
Owner: it.Owner,
OwnerActive: isOwnerActive(it.Owner, peers, ttl),
}) })
} }
w.Header().Set("Content-Type", "text/html; charset=utf-8") w.Header().Set("Content-Type", "text/html; charset=utf-8")
@@ -285,3 +339,23 @@ func renderItemsPartial(w http.ResponseWriter, r *http.Request, d Deps) {
"Next": nextOut, "Next": nextOut,
}) })
} }
func isOwnerActive(owner string, peers []mesh.Peer, ttl time.Duration) bool {
if strings.TrimSpace(owner) == "" {
return true
}
cutoff := time.Now().Add(-ttl)
for _, p := range peers {
if strings.TrimSpace(p.URL) == strings.TrimSpace(owner) {
// Self ist immer aktiv, sonst nach LastSeen
if p.Self {
return true
}
if p.LastSeen.IsZero() {
return false
}
return p.LastSeen.After(cutoff)
}
}
return false
}

View File

@@ -24,13 +24,19 @@
<table> <table>
<thead> <thead>
<tr> <tr>
<th>ID</th><th>Name</th><th>Updated</th><th>Blob</th><th style="width:260px">Aktionen</th> <th>ID</th>
<th>Name</th>
<th>Updated</th>
<th>Blob</th>
<th>Owner</th>
<th>Status</th>
<th style="width:320px">Aktionen</th>
</tr> </tr>
</thead> </thead>
<tbody> <tbody>
{{ range .Items }} {{ range .Items }}
<tr> <tr>
<td>{{ .ID }}</td> <td><code style="font-size:12px">{{ .ID }}</code></td>
<td>{{ .Name }}</td> <td>{{ .Name }}</td>
<td><small class="muted">{{ printf "%.19s" (timeRFC3339 .UpdatedAt) }}</small></td> <td><small class="muted">{{ printf "%.19s" (timeRFC3339 .UpdatedAt) }}</small></td>
<td> <td>
@@ -41,18 +47,34 @@
<span class="pill" style="background:#3a0b0b;border-color:#5b1a1a;color:#fbb;">fehlt</span> <span class="pill" style="background:#3a0b0b;border-color:#5b1a1a;color:#fbb;">fehlt</span>
{{ end }} {{ end }}
</td> </td>
<td><small class="muted">{{ .Owner }}</small></td>
<td>
{{ if .OwnerActive }}
<span class="pill">online</span>
{{ else }}
<span class="pill" style="background:#5b1a1a;border-color:#7a2b2b;color:#fbb">offline</span>
<!-- Owner-Handover nur wenn offline -->
<form style="display:inline"
hx-post="/admin/items/takeover"
hx-target="#items" hx-swap="outerHTML"
onsubmit="return confirm('Owner übernehmen?');">
<input type="hidden" name="id" value="{{ .ID }}">
<button class="btn" type="submit">Übernehmen</button>
</form>
{{ end }}
</td>
<td> <td>
<form style="display:inline-flex; gap:6px" <form style="display:inline-flex; gap:6px"
hx-post="/admin/items/rename" hx-post="/admin/items/rename"
hx-target="#items" hx-swap="outerHTML"> hx-target="#items" hx-swap="outerHTML">
<input type="hidden" name="id" value="{{ .ID }}"> <input type="hidden" name="id" value="{{ .ID }}">
<input type="text" name="name" placeholder="Neuer Name"> <input type="text" name="name" placeholder="Neuer Name">
<button class="btn" type="submit">Rename</button> <button class="btn" type="submit">Rename</button>
</form> </form>
<form style="display:inline" <form style="display:inline"
hx-post="/admin/items/delete" hx-post="/admin/items/delete"
hx-target="#items" hx-swap="outerHTML" hx-target="#items" hx-swap="outerHTML"
onsubmit="return confirm('Wirklich löschen (inkl. Blob)?');"> onsubmit="return confirm('Wirklich löschen (inkl. Blob)?');">
<input type="hidden" name="id" value="{{ .ID }}"> <input type="hidden" name="id" value="{{ .ID }}">
<button class="btn" type="submit">Delete</button> <button class="btn" type="submit">Delete</button>
</form> </form>
@@ -62,7 +84,7 @@
</td> </td>
</tr> </tr>
{{ else }} {{ else }}
<tr><td colspan="5" class="muted">Keine Dateien vorhanden.</td></tr> <tr><td colspan="7" class="muted">Keine Dateien vorhanden.</td></tr>
{{ end }} {{ end }}
</tbody> </tbody>
</table> </table>

View File

@@ -2,6 +2,9 @@ package filesvc
import ( import (
"context" "context"
"encoding/json"
"os"
"path/filepath"
"slices" "slices"
"strings" "strings"
"sync" "sync"
@@ -11,15 +14,82 @@ import (
type MemStore struct { type MemStore struct {
mu sync.Mutex mu sync.Mutex
items map[ID]File items map[ID]File
self string
// optionales Eventing // optionales Eventing
subs []chan ChangeEvent subs []chan ChangeEvent
persistPath string
} }
func NewMemStore() *MemStore { func NewMemStore(self string) *MemStore {
return &MemStore{ return &MemStore{self: strings.TrimSpace(self), items: make(map[ID]File)}
items: make(map[ID]File), }
func NewMemStorePersistent(self, path string) *MemStore {
m := NewMemStore(self)
m.persistPath = strings.TrimSpace(path)
// beim Start versuchen zu laden
_ = m.loadFromDisk()
return m
}
// --- Persistenz-Helper (NEU) ---
func (m *MemStore) loadFromDisk() error {
if m.persistPath == "" {
return nil
} }
f, err := os.Open(m.persistPath)
if err != nil {
return nil // Datei existiert beim ersten Start nicht ok
}
defer f.Close()
var snap Snapshot
if err := json.NewDecoder(f).Decode(&snap); err != nil {
return err
}
m.mu.Lock()
for _, it := range snap.Items {
m.items[it.ID] = it
}
m.mu.Unlock()
return nil
}
func (m *MemStore) saveLocked() error {
if m.persistPath == "" {
return nil
}
if err := os.MkdirAll(filepath.Dir(m.persistPath), 0o755); err != nil {
return err
}
// Snapshot aus Map bauen
snap := Snapshot{Items: make([]File, 0, len(m.items))}
for _, it := range m.items {
snap.Items = append(snap.Items, it)
}
// atomar schreiben
tmp := m.persistPath + ".tmp"
f, err := os.Create(tmp)
if err != nil {
return err
}
enc := json.NewEncoder(f)
enc.SetIndent("", " ")
if err := enc.Encode(&snap); err != nil {
f.Close()
_ = os.Remove(tmp)
return err
}
if err := f.Sync(); err != nil {
f.Close()
_ = os.Remove(tmp)
return err
}
if err := f.Close(); err != nil {
_ = os.Remove(tmp)
return err
}
return os.Rename(tmp, m.persistPath)
} }
/*** Store ***/ /*** Store ***/
@@ -102,8 +172,9 @@ func (m *MemStore) Create(_ context.Context, name string) (File, error) {
if err != nil { if err != nil {
return File{}, err return File{}, err
} }
it := File{ID: uid, Name: name, UpdatedAt: now} it := File{ID: uid, Name: name, UpdatedAt: now, Owner: m.self}
m.items[it.ID] = it m.items[it.ID] = it
_ = m.saveLocked()
m.emit(it) m.emit(it)
return it, nil return it, nil
} }
@@ -120,9 +191,13 @@ func (m *MemStore) Rename(_ context.Context, id ID, newName string) (File, error
if !ok || it.Deleted { if !ok || it.Deleted {
return File{}, ErrNotFound return File{}, ErrNotFound
} }
it.Name = newName if it.Owner != "" && it.Owner != m.self { // ← nur Owner
return File{}, ErrForbidden
}
it.Name = strings.TrimSpace(newName)
it.UpdatedAt = time.Now().UnixNano() it.UpdatedAt = time.Now().UnixNano()
m.items[id] = it m.items[id] = it
_ = m.saveLocked()
m.emit(it) m.emit(it)
return it, nil return it, nil
} }
@@ -134,16 +209,46 @@ func (m *MemStore) Delete(_ context.Context, id ID) (File, error) {
if !ok { if !ok {
return File{}, ErrNotFound return File{}, ErrNotFound
} }
if it.Owner != "" && it.Owner != m.self { // ← nur Owner
return File{}, ErrForbidden
}
if it.Deleted { if it.Deleted {
return it, nil return it, nil
} }
it.Deleted = true it.Deleted = true
it.UpdatedAt = time.Now().UnixNano() it.UpdatedAt = time.Now().UnixNano()
m.items[id] = it m.items[id] = it
_ = m.saveLocked()
m.emit(it) m.emit(it)
return it, nil return it, nil
} }
func (m *MemStore) TakeoverOwner(_ context.Context, id ID, newOwner string) (File, error) {
m.mu.Lock()
defer m.mu.Unlock()
it, ok := m.items[id]
if !ok || it.Deleted {
return File{}, ErrNotFound
}
newOwner = strings.TrimSpace(newOwner)
if newOwner == "" {
return File{}, ErrBadInput
}
// Sicherheit: nur für sich selbst übernehmen
if newOwner != m.self {
return File{}, ErrForbidden
}
if it.Owner == newOwner {
return it, nil
}
it.Owner = newOwner
it.UpdatedAt = time.Now().UnixNano()
m.items[id] = it
_ = m.saveLocked()
m.emitLocked(it)
return it, nil
}
/*** Replicable ***/ /*** Replicable ***/
func (m *MemStore) Snapshot(_ context.Context) (Snapshot, error) { func (m *MemStore) Snapshot(_ context.Context) (Snapshot, error) {
@@ -159,13 +264,22 @@ func (m *MemStore) Snapshot(_ context.Context) (Snapshot, error) {
func (m *MemStore) ApplyRemote(_ context.Context, s Snapshot) error { func (m *MemStore) ApplyRemote(_ context.Context, s Snapshot) error {
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
changed := false
for _, ri := range s.Items { for _, ri := range s.Items {
li, ok := m.items[ri.ID] li, ok := m.items[ri.ID]
if !ok || ri.UpdatedAt > li.UpdatedAt { if !ok || ri.UpdatedAt > li.UpdatedAt {
// Owner nie überschreiben, außer er ist leer
if ok && li.Owner != "" && ri.Owner != "" && ri.Owner != li.Owner {
ri.Owner = li.Owner
}
m.items[ri.ID] = ri m.items[ri.ID] = ri
changed = true
m.emitLocked(ri) m.emitLocked(ri)
} }
} }
if changed {
_ = m.saveLocked() // ← NEU
}
return nil return nil
} }

View File

@@ -13,11 +13,11 @@ import (
type ID = string type ID = string
type File struct { type File struct {
ID ID `json:"id"` ID ID `json:"id"`
Name string `json:"name"` Name string `json:"name"`
// weitere Metadaten optional: Size, Hash, Owner, Tags, ... UpdatedAt int64 `json:"updatedAt"` // UnixNano für LWW
UpdatedAt int64 `json:"updatedAt"` // UnixNano für LWW Deleted bool `json:"deleted"` // Tombstone für Mesh-Delete
Deleted bool `json:"deleted"` // Tombstone für Mesh-Delete Owner string `json:"owner"` //AdvertURL/NodeID des Erzeugers
} }
/*** Fehler ***/ /*** Fehler ***/
@@ -36,11 +36,10 @@ type Store interface {
// Lesen & Auflisten // Lesen & Auflisten
Get(ctx context.Context, id ID) (File, error) Get(ctx context.Context, id ID) (File, error)
List(ctx context.Context, next ID, limit int) (items []File, nextOut ID, err error) List(ctx context.Context, next ID, limit int) (items []File, nextOut ID, err error)
// Mutationen mit LWW-Semantik (UpdatedAt wird intern gesetzt, außer bei ApplyRemote)
Create(ctx context.Context, name string) (File, error) Create(ctx context.Context, name string) (File, error)
Rename(ctx context.Context, id ID, newName string) (File, error) Rename(ctx context.Context, id ID, newName string) (File, error) // nur Owner darf
Delete(ctx context.Context, id ID) (File, error) Delete(ctx context.Context, id ID) (File, error) // nur Owner darf
TakeoverOwner(ctx context.Context, id ID, newOwner string) (File, error)
} }
/*** Mesh-Replikation ***/ /*** Mesh-Replikation ***/

View File

@@ -12,6 +12,7 @@ import (
"hash/crc32" "hash/crc32"
"io" "io"
"log" "log"
"math/rand/v2"
"net" "net"
"net/http" "net/http"
"os" "os"
@@ -33,6 +34,15 @@ type Config struct {
DiscoveryAddress string // "239.8.8.8:9898" DiscoveryAddress string // "239.8.8.8:9898"
PeerTTL time.Duration // wie lange darf ein Peer inaktiv sein (Default siehe unten) PeerTTL time.Duration // wie lange darf ein Peer inaktiv sein (Default siehe unten)
PruneInterval time.Duration // wie oft wird gepruned PruneInterval time.Duration // wie oft wird gepruned
SyncInterval time.Duration
Fanout int
// NEU:
HelloInterval time.Duration // wie oft pingen
HelloFanout int // wie viele Peers pro Tick
BlobTimeout time.Duration
} }
type Peer struct { type Peer struct {
@@ -47,6 +57,7 @@ type Item struct {
Name string `json:"name"` Name string `json:"name"`
UpdatedAt int64 `json:"updatedAt"` UpdatedAt int64 `json:"updatedAt"`
Deleted bool `json:"deleted"` // <— NEU: Tombstone für Deletes Deleted bool `json:"deleted"` // <— NEU: Tombstone für Deletes
Owner string `json:"owner"`
} }
type Snapshot struct { type Snapshot struct {
@@ -92,6 +103,8 @@ func (n *Node) RemovePeer(url string) bool {
return false return false
} }
func (n *Node) Config() Config { return n.cfg }
// PruneNow entfernt alle Peers, deren LastSeen vor cutoff liegt (Seeds bleiben). // PruneNow entfernt alle Peers, deren LastSeen vor cutoff liegt (Seeds bleiben).
func (n *Node) PruneNow(cutoff time.Time) int { func (n *Node) PruneNow(cutoff time.Time) int {
n.mu.Lock() n.mu.Lock()
@@ -187,21 +200,38 @@ func (n *Node) helloHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "bad signature", http.StatusUnauthorized) http.Error(w, "bad signature", http.StatusUnauthorized)
return return
} }
var p Peer var req struct {
if err := json.Unmarshal(body, &p); err != nil { URL string `json:"url"`
}
if err := json.Unmarshal(body, &req); err != nil || strings.TrimSpace(req.URL) == "" {
http.Error(w, "bad json", http.StatusBadRequest) http.Error(w, "bad json", http.StatusBadRequest)
return return
} }
p.LastSeen = time.Now()
// Peer anlegen (falls neu) und LastSeen setzen
n.mu.Lock() n.mu.Lock()
if existing, ok := n.peers[p.URL]; ok { if req.URL != n.self.URL {
existing.LastSeen = p.LastSeen if p, ok := n.peers[req.URL]; ok {
} else if p.URL != n.self.URL { p.LastSeen = time.Now()
cp := p } else {
n.peers[p.URL] = &cp cp := Peer{URL: req.URL, LastSeen: time.Now()} // weitere Felder wie bekannt
n.peers[req.URL] = &cp
}
}
n.mu.Unlock()
w.WriteHeader(http.StatusOK)
}
func (n *Node) touchPeer(url string) {
if strings.TrimSpace(url) == "" {
return
}
n.mu.Lock()
if p, ok := n.peers[url]; ok {
p.LastSeen = time.Now()
} }
n.mu.Unlock() n.mu.Unlock()
w.WriteHeader(http.StatusNoContent)
} }
func (n *Node) peersHandler(w http.ResponseWriter, r *http.Request) { func (n *Node) peersHandler(w http.ResponseWriter, r *http.Request) {
@@ -280,6 +310,12 @@ func (n *Node) Serve() error {
n.loopAntiEntropy() n.loopAntiEntropy()
}() }()
n.wg.Add(1)
go func() {
defer n.wg.Done()
n.loopHello()
}()
// http server // http server
errc := make(chan error, 1) errc := make(chan error, 1)
go func() { go func() {
@@ -464,28 +500,100 @@ func (n *Node) loopBeaconRecv() {
/*** Outgoing ***/ /*** Outgoing ***/
func (n *Node) sendHello(url string) error { func (n *Node) sendHello(url string) error {
p := n.self b, _ := json.Marshal(struct {
b, _ := json.Marshal(p) URL string `json:"url"`
}{URL: n.self.URL})
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/hello", bytes.NewReader(b)) req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/hello", bytes.NewReader(b))
req.Header.Set("X-Mesh-Sig", n.sign(b)) req.Header.Set("X-Mesh-Sig", n.sign(b))
resp, err := n.client.Do(req) resp, err := n.client.Do(req)
if err == nil { if err != nil {
io.Copy(io.Discard, resp.Body) return err
resp.Body.Close() }
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
if resp.StatusCode == http.StatusOK {
n.touchPeer(url) // eigenen Blick auf den Peer direkt als „gesehen“ markieren
return nil
}
return fmt.Errorf("hello %s: %s", url, resp.Status)
}
func (n *Node) loopHello() {
interval := n.cfg.HelloInterval
if interval <= 0 {
interval = 20 * time.Second
}
fanout := n.cfg.HelloFanout
if fanout <= 0 {
fanout = 8
}
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-n.stop:
return
case <-t.C:
}
// Liste der *bekannten* Peers (nicht nur Seeds)
n.mu.RLock()
targets := make([]string, 0, len(n.peers))
for url := range n.peers {
if url != n.self.URL {
targets = append(targets, url)
}
}
n.mu.RUnlock()
if len(targets) == 0 {
continue
}
// zufällig mischen und auf Fanout begrenzen
rand.Shuffle(len(targets), func(i, j int) { targets[i], targets[j] = targets[j], targets[i] })
if fanout < len(targets) {
targets = targets[:fanout]
}
// leicht parallel pingen
var wg sync.WaitGroup
for _, u := range targets {
u := u
wg.Add(1)
go func() {
defer wg.Done()
_ = n.sendHello(u)
}()
}
wg.Wait()
} }
return err
} }
func (n *Node) sendSync(url string, s Snapshot) error { func (n *Node) sendSync(url string, s Snapshot) error {
b, _ := json.Marshal(s) b, _ := json.Marshal(s)
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/sync", bytes.NewReader(b)) req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/sync", bytes.NewReader(b))
req.Header.Set("X-Mesh-Sig", n.sign(b)) req.Header.Set("X-Mesh-Sig", n.sign(b))
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()
req = req.WithContext(ctx)
resp, err := n.client.Do(req) resp, err := n.client.Do(req)
if err == nil { if err != nil {
io.Copy(io.Discard, resp.Body) return err
resp.Body.Close()
} }
return err defer resp.Body.Close()
io.Copy(io.Discard, resp.Body)
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("sync %s: %s", url, resp.Status)
}
n.touchPeer(url)
return nil
} }
// PeerList liefert eine Kopie der bekannten Peers inkl. Self. // PeerList liefert eine Kopie der bekannten Peers inkl. Self.
@@ -605,14 +713,37 @@ func (n *Node) blobHandler(w http.ResponseWriter, r *http.Request) {
_, _ = io.Copy(w, rc) _, _ = io.Copy(w, rc)
} }
// interner Helper: signierter Blob-Request an einen Peer // sendBlobRequest schickt eine signierte Anfrage an /mesh/blob und liefert die Response
func (n *Node) sendBlobRequest(url string, id string) (*http.Response, error) { // zurück (Caller MUSS resp.Body schließen!). Bei HTTP 200 wird der Peer als gesehen markiert.
func (n *Node) sendBlobRequest(url, id string) (*http.Response, error) {
b, _ := json.Marshal(struct { b, _ := json.Marshal(struct {
ID string `json:"id"` ID string `json:"id"`
}{ID: id}) }{ID: id})
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/blob", bytes.NewReader(b)) req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/blob", bytes.NewReader(b))
req.Header.Set("X-Mesh-Sig", n.sign(b)) req.Header.Set("X-Mesh-Sig", n.sign(b))
return n.client.Do(req) req.Header.Set("Content-Type", "application/json")
// ❗ WICHTIG: kein kurzer Timeout. Optional: großer Timeout aus Config
ctx := context.Background()
if n.cfg.BlobTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), n.cfg.BlobTimeout)
defer cancel()
}
req = req.WithContext(ctx)
resp, err := n.client.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode == http.StatusOK {
n.touchPeer(url) // ausgehender Erfolg zählt als "gesehen"
return resp, nil
}
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
return nil, fmt.Errorf("blob %s from %s: %s", id, url, resp.Status)
} }
// Öffentliche Methode: versuche Blob bei irgendeinem Peer zu holen // Öffentliche Methode: versuche Blob bei irgendeinem Peer zu holen