vor temaplte

This commit is contained in:
2025-09-29 23:04:46 +02:00
parent 36d1c1512a
commit 9122ebf2f1
6 changed files with 251 additions and 40 deletions

View File

@@ -257,6 +257,10 @@ func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store)
}
it, err := store.Rename(r.Context(), in.ID, in.Name)
if err != nil {
if errors.Is(err, filesvc.ErrForbidden) {
writeJSON(w, http.StatusForbidden, map[string]string{"error": "only owner may modify"})
return
}
status := http.StatusBadRequest
if errors.Is(err, filesvc.ErrNotFound) {
status = http.StatusNotFound
@@ -283,6 +287,10 @@ func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store)
it, err := store.Delete(r.Context(), in.ID)
_ = blobs.Delete(r.Context(), string(in.ID))
if err != nil {
if errors.Is(err, filesvc.ErrForbidden) {
writeJSON(w, http.StatusForbidden, map[string]string{"error": "only owner may modify"})
return
}
status := http.StatusBadRequest
if errors.Is(err, filesvc.ErrNotFound) {
status = http.StatusNotFound
@@ -363,13 +371,23 @@ func apiFiles(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, m
}
// 3) remote holen & cachen
it1, _ := store.Get(r.Context(), filesvc.ID(id))
peers := meshNode.PeerList()
ttl := 2 * time.Minute
if cfg := meshNode.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
if !isOwnerActive(it1.Owner, peers, ttl) {
http.NotFound(w, r)
return
}
rrc, _, _, _, err := meshNode.FetchBlobAny(r.Context(), id)
if err != nil {
http.NotFound(w, r)
return
}
defer rrc.Close()
if _, err := blobs.Save(r.Context(), id, it.Name, rrc); err != nil {
if _, err := blobs.Save(r.Context(), id, it1.Name, rrc); err != nil {
http.Error(w, "cache failed", http.StatusInternalServerError)
return
}
@@ -381,7 +399,7 @@ func apiFiles(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, m
return
}
defer lrc.Close()
serveBlob(w, r, lrc, meta, it.Name)
serveBlob(w, r, lrc, meta, it1.Name)
})
}
@@ -395,6 +413,7 @@ func toMeshSnapshot(s filesvc.Snapshot) mesh.Snapshot {
Name: it.Name,
UpdatedAt: it.UpdatedAt,
Deleted: it.Deleted,
Owner: it.Owner,
})
}
return out
@@ -408,18 +427,45 @@ func fromMeshSnapshot(ms mesh.Snapshot) filesvc.Snapshot {
Name: it.Name,
UpdatedAt: it.UpdatedAt,
Deleted: it.Deleted,
Owner: it.Owner,
})
}
return out
}
// isOwnerActive prüft, ob der Owner in der Peer-Liste als "aktiv" gilt.
func isOwnerActive(owner string, peers []mesh.Peer, ttl time.Duration) bool {
owner = strings.TrimSpace(owner)
if owner == "" {
return true
}
cutoff := time.Now().Add(-ttl)
for _, p := range peers {
if strings.TrimSpace(p.URL) != owner {
continue
}
// Self ist per Definition aktiv
if p.Self {
return true
}
// ohne LastSeen: als inaktiv behandeln
if p.LastSeen.IsZero() {
return false
}
return p.LastSeen.After(cutoff)
}
// Owner ist nicht mal in der Liste: inaktiv
return false
}
/*** main ***/
func main() {
cfg := loadConfig()
// Domain-Store (mesh-fähig)
st := filesvc.NewMemStore()
nodeID := strings.TrimSpace(cfg.Mesh.AdvertURL)
st := filesvc.NewMemStore(nodeID)
// Mesh starten
//mcfg := mesh.FromEnv()
@@ -463,6 +509,47 @@ func main() {
}
}()
go func() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for range ticker.C {
// aktive Owner bestimmen
peers := mnode.PeerList()
ttl := 2 * time.Minute
if cfg := mnode.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
cutoff := time.Now().Add(-ttl)
active := map[string]bool{}
for _, p := range peers {
if p.Self {
active[p.URL] = true
continue
}
if !p.LastSeen.IsZero() && p.LastSeen.After(cutoff) {
active[p.URL] = true
}
}
// alle Items durchgehen; Blobs von Offline-Ownern löschen
var next filesvc.ID
for {
items, nextOut, _ := st.List(context.Background(), next, 1000)
for _, it := range items {
if it.Owner == "" || active[it.Owner] {
continue
}
_ = blobs.Delete(context.Background(), it.ID)
}
if nextOut == "" {
break
}
next = nextOut
}
}
}()
// Root-Mux
root := http.NewServeMux()

View File

@@ -65,8 +65,17 @@ func Register(mux *http.ServeMux, d Deps) {
UpdatedAt int64
HasBlob bool
Size int64
Owner string
OwnerActive bool
}
rows := make([]row, 0, len(items))
peers := d.Mesh.PeerList()
ttl := 2 * time.Minute
if cfg := d.Mesh.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
for _, it := range items {
meta, ok, _ := d.Blob.Stat(r.Context(), it.ID)
rows = append(rows, row{
@@ -75,6 +84,8 @@ func Register(mux *http.ServeMux, d Deps) {
UpdatedAt: it.UpdatedAt,
HasBlob: ok,
Size: meta.Size,
Owner: it.Owner,
OwnerActive: isOwnerActive(it.Owner, peers, ttl),
})
}
@@ -84,6 +95,42 @@ func Register(mux *http.ServeMux, d Deps) {
})
})
mux.HandleFunc("/admin/items/takeover", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
id := strings.TrimSpace(r.FormValue("id"))
if id != "" {
// Nur zulassen, wenn Owner tatsächlich offline ist
it, err := d.Store.Get(r.Context(), filesvc.ID(id))
if err == nil {
peers := d.Mesh.PeerList()
ttl := 2 * time.Minute
if cfg := d.Mesh.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
if !isOwnerActive(it.Owner, peers, ttl) {
// eigene URL aus PeerList ermitteln
self := ""
for _, p := range peers {
if p.Self {
self = p.URL
break
}
}
if self == "" {
self = "unknown-self"
}
if _, err := d.Store.TakeoverOwner(r.Context(), filesvc.ID(id), self); err == nil {
_ = d.Mesh.SyncNow(r.Context())
}
}
}
}
renderItemsPartial(w, r, d)
})
// Upload (multipart/form-data, Feldname "file", optional name-Override)
mux.HandleFunc("/admin/files/upload", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
@@ -263,11 +310,16 @@ func renderItemsPartial(w http.ResponseWriter, r *http.Request, d Deps) {
UpdatedAt int64
HasBlob bool
Size int64
Owner string
OwnerActive bool
}
nextID := filesvc.ID(strings.TrimSpace(r.URL.Query().Get("next")))
items, nextOut, _ := d.Store.List(r.Context(), nextID, 100)
peers := d.Mesh.PeerList()
ttl := 2 * time.Minute
rows := make([]row, 0, len(items))
for _, it := range items {
meta, ok, _ := d.Blob.Stat(r.Context(), (it.ID))
@@ -277,6 +329,8 @@ func renderItemsPartial(w http.ResponseWriter, r *http.Request, d Deps) {
UpdatedAt: it.UpdatedAt,
HasBlob: ok,
Size: meta.Size,
Owner: it.Owner,
OwnerActive: isOwnerActive(it.Owner, peers, ttl),
})
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
@@ -285,3 +339,23 @@ func renderItemsPartial(w http.ResponseWriter, r *http.Request, d Deps) {
"Next": nextOut,
})
}
func isOwnerActive(owner string, peers []mesh.Peer, ttl time.Duration) bool {
if strings.TrimSpace(owner) == "" {
return true
}
cutoff := time.Now().Add(-ttl)
for _, p := range peers {
if strings.TrimSpace(p.URL) == strings.TrimSpace(owner) {
// Self ist immer aktiv, sonst nach LastSeen
if p.Self {
return true
}
if p.LastSeen.IsZero() {
return false
}
return p.LastSeen.After(cutoff)
}
}
return false
}

View File

@@ -24,12 +24,27 @@
<table>
<thead>
<tr>
<th>ID</th><th>Name</th><th>Updated</th><th>Blob</th><th style="width:260px">Aktionen</th>
<th>Owner</th><th>Status</th><th>ID</th><th>Name</th><th>Updated</th><th>Blob</th><th style="width:260px">Aktionen</th>
</tr>
</thead>
<tbody>
{{ range .Items }}
<tr>
<td><small class="muted">{{ .Owner }}</small></td>
<td>
{{ if .OwnerActive }}
<span class="pill">online</span>
{{ else }}
<span class="pill" style="background:#5b1a1a;border-color:#7a2b2b;color:#fbb">offline</span>
<form style="display:inline"
hx-post="/admin/items/takeover"
hx-target="#items" hx-swap="outerHTML"
onsubmit="return confirm('Owner übernehmen?');">
<input type="hidden" name="id" value="{{ .ID }}">
<button class="btn" type="submit">Übernehmen</button>
</form>
{{ end }}
</td>
<td>{{ .ID }}</td>
<td>{{ .Name }}</td>
<td><small class="muted">{{ printf "%.19s" (timeRFC3339 .UpdatedAt) }}</small></td>

View File

@@ -11,15 +11,13 @@ import (
type MemStore struct {
mu sync.Mutex
items map[ID]File
self string
// optionales Eventing
subs []chan ChangeEvent
}
func NewMemStore() *MemStore {
return &MemStore{
items: make(map[ID]File),
}
func NewMemStore(self string) *MemStore {
return &MemStore{self: strings.TrimSpace(self), items: make(map[ID]File)}
}
/*** Store ***/
@@ -102,7 +100,7 @@ func (m *MemStore) Create(_ context.Context, name string) (File, error) {
if err != nil {
return File{}, err
}
it := File{ID: uid, Name: name, UpdatedAt: now}
it := File{ID: uid, Name: name, UpdatedAt: now, Owner: m.self}
m.items[it.ID] = it
m.emit(it)
return it, nil
@@ -120,7 +118,10 @@ func (m *MemStore) Rename(_ context.Context, id ID, newName string) (File, error
if !ok || it.Deleted {
return File{}, ErrNotFound
}
it.Name = newName
if it.Owner != "" && it.Owner != m.self { // ← nur Owner
return File{}, ErrForbidden
}
it.Name = strings.TrimSpace(newName)
it.UpdatedAt = time.Now().UnixNano()
m.items[id] = it
m.emit(it)
@@ -134,6 +135,9 @@ func (m *MemStore) Delete(_ context.Context, id ID) (File, error) {
if !ok {
return File{}, ErrNotFound
}
if it.Owner != "" && it.Owner != m.self { // ← nur Owner
return File{}, ErrForbidden
}
if it.Deleted {
return it, nil
}
@@ -144,6 +148,31 @@ func (m *MemStore) Delete(_ context.Context, id ID) (File, error) {
return it, nil
}
func (m *MemStore) TakeoverOwner(_ context.Context, id ID, newOwner string) (File, error) {
m.mu.Lock()
defer m.mu.Unlock()
it, ok := m.items[id]
if !ok || it.Deleted {
return File{}, ErrNotFound
}
newOwner = strings.TrimSpace(newOwner)
if newOwner == "" {
return File{}, ErrBadInput
}
// Sicherheit: nur für sich selbst übernehmen
if newOwner != m.self {
return File{}, ErrForbidden
}
if it.Owner == newOwner {
return it, nil
}
it.Owner = newOwner
it.UpdatedAt = time.Now().UnixNano()
m.items[id] = it
m.emitLocked(it)
return it, nil
}
/*** Replicable ***/
func (m *MemStore) Snapshot(_ context.Context) (Snapshot, error) {
@@ -162,6 +191,10 @@ func (m *MemStore) ApplyRemote(_ context.Context, s Snapshot) error {
for _, ri := range s.Items {
li, ok := m.items[ri.ID]
if !ok || ri.UpdatedAt > li.UpdatedAt {
// Owner nie überschreiben, außer er ist leer
if ok && li.Owner != "" && ri.Owner != "" && ri.Owner != li.Owner {
ri.Owner = li.Owner
}
m.items[ri.ID] = ri
m.emitLocked(ri)
}

View File

@@ -15,9 +15,9 @@ type ID = string
type File struct {
ID ID `json:"id"`
Name string `json:"name"`
// weitere Metadaten optional: Size, Hash, Owner, Tags, ...
UpdatedAt int64 `json:"updatedAt"` // UnixNano für LWW
Deleted bool `json:"deleted"` // Tombstone für Mesh-Delete
Owner string `json:"owner"` //AdvertURL/NodeID des Erzeugers
}
/*** Fehler ***/
@@ -36,11 +36,10 @@ type Store interface {
// Lesen & Auflisten
Get(ctx context.Context, id ID) (File, error)
List(ctx context.Context, next ID, limit int) (items []File, nextOut ID, err error)
// Mutationen mit LWW-Semantik (UpdatedAt wird intern gesetzt, außer bei ApplyRemote)
Create(ctx context.Context, name string) (File, error)
Rename(ctx context.Context, id ID, newName string) (File, error)
Delete(ctx context.Context, id ID) (File, error)
Rename(ctx context.Context, id ID, newName string) (File, error) // nur Owner darf
Delete(ctx context.Context, id ID) (File, error) // nur Owner darf
TakeoverOwner(ctx context.Context, id ID, newOwner string) (File, error)
}
/*** Mesh-Replikation ***/

View File

@@ -47,6 +47,7 @@ type Item struct {
Name string `json:"name"`
UpdatedAt int64 `json:"updatedAt"`
Deleted bool `json:"deleted"` // <— NEU: Tombstone für Deletes
Owner string `json:"owner"`
}
type Snapshot struct {
@@ -92,6 +93,8 @@ func (n *Node) RemovePeer(url string) bool {
return false
}
func (n *Node) Config() Config { return n.cfg }
// PruneNow entfernt alle Peers, deren LastSeen vor cutoff liegt (Seeds bleiben).
func (n *Node) PruneNow(cutoff time.Time) int {
n.mu.Lock()