diff --git a/cmd/unified/main.go b/cmd/unified/main.go index b14dccf..dec0654 100644 --- a/cmd/unified/main.go +++ b/cmd/unified/main.go @@ -257,6 +257,10 @@ func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store) } it, err := store.Rename(r.Context(), in.ID, in.Name) if err != nil { + if errors.Is(err, filesvc.ErrForbidden) { + writeJSON(w, http.StatusForbidden, map[string]string{"error": "only owner may modify"}) + return + } status := http.StatusBadRequest if errors.Is(err, filesvc.ErrNotFound) { status = http.StatusNotFound @@ -283,6 +287,10 @@ func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store) it, err := store.Delete(r.Context(), in.ID) _ = blobs.Delete(r.Context(), string(in.ID)) if err != nil { + if errors.Is(err, filesvc.ErrForbidden) { + writeJSON(w, http.StatusForbidden, map[string]string{"error": "only owner may modify"}) + return + } status := http.StatusBadRequest if errors.Is(err, filesvc.ErrNotFound) { status = http.StatusNotFound @@ -363,13 +371,23 @@ func apiFiles(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, m } // 3) remote holen & cachen + it1, _ := store.Get(r.Context(), filesvc.ID(id)) + peers := meshNode.PeerList() + ttl := 2 * time.Minute + if cfg := meshNode.Config(); cfg.PeerTTL > 0 { + ttl = cfg.PeerTTL + } + if !isOwnerActive(it1.Owner, peers, ttl) { + http.NotFound(w, r) + return + } rrc, _, _, _, err := meshNode.FetchBlobAny(r.Context(), id) if err != nil { http.NotFound(w, r) return } defer rrc.Close() - if _, err := blobs.Save(r.Context(), id, it.Name, rrc); err != nil { + if _, err := blobs.Save(r.Context(), id, it1.Name, rrc); err != nil { http.Error(w, "cache failed", http.StatusInternalServerError) return } @@ -381,7 +399,7 @@ func apiFiles(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, m return } defer lrc.Close() - serveBlob(w, r, lrc, meta, it.Name) + serveBlob(w, r, lrc, meta, it1.Name) }) } @@ -395,6 +413,7 @@ func toMeshSnapshot(s filesvc.Snapshot) mesh.Snapshot { Name: it.Name, UpdatedAt: it.UpdatedAt, Deleted: it.Deleted, + Owner: it.Owner, }) } return out @@ -408,18 +427,45 @@ func fromMeshSnapshot(ms mesh.Snapshot) filesvc.Snapshot { Name: it.Name, UpdatedAt: it.UpdatedAt, Deleted: it.Deleted, + Owner: it.Owner, }) } return out } +// isOwnerActive prüft, ob der Owner in der Peer-Liste als "aktiv" gilt. +func isOwnerActive(owner string, peers []mesh.Peer, ttl time.Duration) bool { + owner = strings.TrimSpace(owner) + if owner == "" { + return true + } + cutoff := time.Now().Add(-ttl) + for _, p := range peers { + if strings.TrimSpace(p.URL) != owner { + continue + } + // Self ist per Definition aktiv + if p.Self { + return true + } + // ohne LastSeen: als inaktiv behandeln + if p.LastSeen.IsZero() { + return false + } + return p.LastSeen.After(cutoff) + } + // Owner ist nicht mal in der Liste: inaktiv + return false +} + /*** main ***/ func main() { cfg := loadConfig() // Domain-Store (mesh-fähig) - st := filesvc.NewMemStore() + nodeID := strings.TrimSpace(cfg.Mesh.AdvertURL) + st := filesvc.NewMemStore(nodeID) // Mesh starten //mcfg := mesh.FromEnv() @@ -463,6 +509,47 @@ func main() { } }() + go func() { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + for range ticker.C { + // aktive Owner bestimmen + peers := mnode.PeerList() + ttl := 2 * time.Minute + if cfg := mnode.Config(); cfg.PeerTTL > 0 { + ttl = cfg.PeerTTL + } + cutoff := time.Now().Add(-ttl) + + active := map[string]bool{} + for _, p := range peers { + if p.Self { + active[p.URL] = true + continue + } + if !p.LastSeen.IsZero() && p.LastSeen.After(cutoff) { + active[p.URL] = true + } + } + + // alle Items durchgehen; Blobs von Offline-Ownern löschen + var next filesvc.ID + for { + items, nextOut, _ := st.List(context.Background(), next, 1000) + for _, it := range items { + if it.Owner == "" || active[it.Owner] { + continue + } + _ = blobs.Delete(context.Background(), it.ID) + } + if nextOut == "" { + break + } + next = nextOut + } + } + }() + // Root-Mux root := http.NewServeMux() diff --git a/internal/admin/admin.go b/internal/admin/admin.go index 25600da..54d9da1 100644 --- a/internal/admin/admin.go +++ b/internal/admin/admin.go @@ -60,21 +60,32 @@ func Register(mux *http.ServeMux, d Deps) { items, nextOut, _ := d.Store.List(r.Context(), nextID, 100) type row struct { - ID string - Name string - UpdatedAt int64 - HasBlob bool - Size int64 + ID string + Name string + UpdatedAt int64 + HasBlob bool + Size int64 + Owner string + OwnerActive bool } rows := make([]row, 0, len(items)) + + peers := d.Mesh.PeerList() + ttl := 2 * time.Minute + if cfg := d.Mesh.Config(); cfg.PeerTTL > 0 { + ttl = cfg.PeerTTL + } + for _, it := range items { meta, ok, _ := d.Blob.Stat(r.Context(), it.ID) rows = append(rows, row{ - ID: it.ID, - Name: it.Name, - UpdatedAt: it.UpdatedAt, - HasBlob: ok, - Size: meta.Size, + ID: it.ID, + Name: it.Name, + UpdatedAt: it.UpdatedAt, + HasBlob: ok, + Size: meta.Size, + Owner: it.Owner, + OwnerActive: isOwnerActive(it.Owner, peers, ttl), }) } @@ -84,6 +95,42 @@ func Register(mux *http.ServeMux, d Deps) { }) }) + mux.HandleFunc("/admin/items/takeover", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + id := strings.TrimSpace(r.FormValue("id")) + if id != "" { + // Nur zulassen, wenn Owner tatsächlich offline ist + it, err := d.Store.Get(r.Context(), filesvc.ID(id)) + if err == nil { + peers := d.Mesh.PeerList() + ttl := 2 * time.Minute + if cfg := d.Mesh.Config(); cfg.PeerTTL > 0 { + ttl = cfg.PeerTTL + } + if !isOwnerActive(it.Owner, peers, ttl) { + // eigene URL aus PeerList ermitteln + self := "" + for _, p := range peers { + if p.Self { + self = p.URL + break + } + } + if self == "" { + self = "unknown-self" + } + if _, err := d.Store.TakeoverOwner(r.Context(), filesvc.ID(id), self); err == nil { + _ = d.Mesh.SyncNow(r.Context()) + } + } + } + } + renderItemsPartial(w, r, d) + }) + // Upload (multipart/form-data, Feldname "file", optional name-Override) mux.HandleFunc("/admin/files/upload", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { @@ -258,25 +305,32 @@ func BasicAuth(user, pass string, next http.Handler) http.Handler { // rebuild & render items partial for HTMX swaps func renderItemsPartial(w http.ResponseWriter, r *http.Request, d Deps) { type row struct { - ID string - Name string - UpdatedAt int64 - HasBlob bool - Size int64 + ID string + Name string + UpdatedAt int64 + HasBlob bool + Size int64 + Owner string + OwnerActive bool } nextID := filesvc.ID(strings.TrimSpace(r.URL.Query().Get("next"))) items, nextOut, _ := d.Store.List(r.Context(), nextID, 100) + peers := d.Mesh.PeerList() + ttl := 2 * time.Minute + rows := make([]row, 0, len(items)) for _, it := range items { meta, ok, _ := d.Blob.Stat(r.Context(), (it.ID)) rows = append(rows, row{ - ID: (it.ID), - Name: it.Name, - UpdatedAt: it.UpdatedAt, - HasBlob: ok, - Size: meta.Size, + ID: (it.ID), + Name: it.Name, + UpdatedAt: it.UpdatedAt, + HasBlob: ok, + Size: meta.Size, + Owner: it.Owner, + OwnerActive: isOwnerActive(it.Owner, peers, ttl), }) } w.Header().Set("Content-Type", "text/html; charset=utf-8") @@ -285,3 +339,23 @@ func renderItemsPartial(w http.ResponseWriter, r *http.Request, d Deps) { "Next": nextOut, }) } + +func isOwnerActive(owner string, peers []mesh.Peer, ttl time.Duration) bool { + if strings.TrimSpace(owner) == "" { + return true + } + cutoff := time.Now().Add(-ttl) + for _, p := range peers { + if strings.TrimSpace(p.URL) == strings.TrimSpace(owner) { + // Self ist immer aktiv, sonst nach LastSeen + if p.Self { + return true + } + if p.LastSeen.IsZero() { + return false + } + return p.LastSeen.After(cutoff) + } + } + return false +} diff --git a/internal/admin/tpl/partials_items.html b/internal/admin/tpl/partials_items.html index 5a2d8d1..3023b23 100644 --- a/internal/admin/tpl/partials_items.html +++ b/internal/admin/tpl/partials_items.html @@ -24,12 +24,27 @@ - + {{ range .Items }} + + diff --git a/internal/filesvc/memstore.go b/internal/filesvc/memstore.go index 2e0d40e..4425535 100644 --- a/internal/filesvc/memstore.go +++ b/internal/filesvc/memstore.go @@ -11,15 +11,13 @@ import ( type MemStore struct { mu sync.Mutex items map[ID]File - + self string // optionales Eventing subs []chan ChangeEvent } -func NewMemStore() *MemStore { - return &MemStore{ - items: make(map[ID]File), - } +func NewMemStore(self string) *MemStore { + return &MemStore{self: strings.TrimSpace(self), items: make(map[ID]File)} } /*** Store ***/ @@ -102,7 +100,7 @@ func (m *MemStore) Create(_ context.Context, name string) (File, error) { if err != nil { return File{}, err } - it := File{ID: uid, Name: name, UpdatedAt: now} + it := File{ID: uid, Name: name, UpdatedAt: now, Owner: m.self} m.items[it.ID] = it m.emit(it) return it, nil @@ -120,7 +118,10 @@ func (m *MemStore) Rename(_ context.Context, id ID, newName string) (File, error if !ok || it.Deleted { return File{}, ErrNotFound } - it.Name = newName + if it.Owner != "" && it.Owner != m.self { // ← nur Owner + return File{}, ErrForbidden + } + it.Name = strings.TrimSpace(newName) it.UpdatedAt = time.Now().UnixNano() m.items[id] = it m.emit(it) @@ -134,6 +135,9 @@ func (m *MemStore) Delete(_ context.Context, id ID) (File, error) { if !ok { return File{}, ErrNotFound } + if it.Owner != "" && it.Owner != m.self { // ← nur Owner + return File{}, ErrForbidden + } if it.Deleted { return it, nil } @@ -144,6 +148,31 @@ func (m *MemStore) Delete(_ context.Context, id ID) (File, error) { return it, nil } +func (m *MemStore) TakeoverOwner(_ context.Context, id ID, newOwner string) (File, error) { + m.mu.Lock() + defer m.mu.Unlock() + it, ok := m.items[id] + if !ok || it.Deleted { + return File{}, ErrNotFound + } + newOwner = strings.TrimSpace(newOwner) + if newOwner == "" { + return File{}, ErrBadInput + } + // Sicherheit: nur für sich selbst übernehmen + if newOwner != m.self { + return File{}, ErrForbidden + } + if it.Owner == newOwner { + return it, nil + } + it.Owner = newOwner + it.UpdatedAt = time.Now().UnixNano() + m.items[id] = it + m.emitLocked(it) + return it, nil +} + /*** Replicable ***/ func (m *MemStore) Snapshot(_ context.Context) (Snapshot, error) { @@ -162,6 +191,10 @@ func (m *MemStore) ApplyRemote(_ context.Context, s Snapshot) error { for _, ri := range s.Items { li, ok := m.items[ri.ID] if !ok || ri.UpdatedAt > li.UpdatedAt { + // Owner nie überschreiben, außer er ist leer + if ok && li.Owner != "" && ri.Owner != "" && ri.Owner != li.Owner { + ri.Owner = li.Owner + } m.items[ri.ID] = ri m.emitLocked(ri) } diff --git a/internal/filesvc/store.go b/internal/filesvc/store.go index 6eeb5c3..d67c997 100644 --- a/internal/filesvc/store.go +++ b/internal/filesvc/store.go @@ -13,11 +13,11 @@ import ( type ID = string type File struct { - ID ID `json:"id"` - Name string `json:"name"` - // weitere Metadaten optional: Size, Hash, Owner, Tags, ... - UpdatedAt int64 `json:"updatedAt"` // UnixNano für LWW - Deleted bool `json:"deleted"` // Tombstone für Mesh-Delete + ID ID `json:"id"` + Name string `json:"name"` + UpdatedAt int64 `json:"updatedAt"` // UnixNano für LWW + Deleted bool `json:"deleted"` // Tombstone für Mesh-Delete + Owner string `json:"owner"` //AdvertURL/NodeID des Erzeugers } /*** Fehler ***/ @@ -36,11 +36,10 @@ type Store interface { // Lesen & Auflisten Get(ctx context.Context, id ID) (File, error) List(ctx context.Context, next ID, limit int) (items []File, nextOut ID, err error) - - // Mutationen mit LWW-Semantik (UpdatedAt wird intern gesetzt, außer bei ApplyRemote) Create(ctx context.Context, name string) (File, error) - Rename(ctx context.Context, id ID, newName string) (File, error) - Delete(ctx context.Context, id ID) (File, error) + Rename(ctx context.Context, id ID, newName string) (File, error) // nur Owner darf + Delete(ctx context.Context, id ID) (File, error) // nur Owner darf + TakeoverOwner(ctx context.Context, id ID, newOwner string) (File, error) } /*** Mesh-Replikation ***/ diff --git a/internal/mesh/mesh.go b/internal/mesh/mesh.go index 1b18fc7..ee28e17 100644 --- a/internal/mesh/mesh.go +++ b/internal/mesh/mesh.go @@ -47,6 +47,7 @@ type Item struct { Name string `json:"name"` UpdatedAt int64 `json:"updatedAt"` Deleted bool `json:"deleted"` // <— NEU: Tombstone für Deletes + Owner string `json:"owner"` } type Snapshot struct { @@ -92,6 +93,8 @@ func (n *Node) RemovePeer(url string) bool { return false } +func (n *Node) Config() Config { return n.cfg } + // PruneNow entfernt alle Peers, deren LastSeen vor cutoff liegt (Seeds bleiben). func (n *Node) PruneNow(cutoff time.Time) int { n.mu.Lock()
IDNameUpdatedBlobAktionenOwnerStatusIDNameUpdatedBlobAktionen
{{ .Owner }} + {{ if .OwnerActive }} + online + {{ else }} + offline +
+ + +
+ {{ end }} +
{{ .ID }} {{ .Name }} {{ printf "%.19s" (timeRFC3339 .UpdatedAt) }}