Compare commits

...

7 Commits

Author SHA1 Message Date
42b0493adb fix-2
All checks were successful
release-tag / release-image (push) Successful in 1m26s
2025-09-30 00:34:24 +02:00
b3c1f37632 fix
All checks were successful
release-tag / release-image (push) Successful in 1m30s
2025-09-30 00:19:39 +02:00
40ded4d4db Bugfix: cache failed
All checks were successful
release-tag / release-image (push) Successful in 1m30s
2025-09-30 00:09:57 +02:00
291cfa33a9 Persistenz implementiert : Test-3
All checks were successful
release-tag / release-image (push) Successful in 1m27s
2025-09-29 23:55:35 +02:00
74fef30251 Test-2
All checks were successful
release-tag / release-image (push) Successful in 1m31s
2025-09-29 23:44:08 +02:00
b7729e8b39 Test-1
All checks were successful
release-tag / release-image (push) Successful in 1m52s
2025-09-29 23:08:34 +02:00
9122ebf2f1 vor temaplte 2025-09-29 23:04:46 +02:00
6 changed files with 552 additions and 91 deletions

View File

@@ -10,6 +10,7 @@ import (
"net/http"
"os"
"os/signal"
"path/filepath"
"strconv"
"strings"
"syscall"
@@ -55,6 +56,11 @@ func loadConfig() AppConfig {
m.PeerTTL = parseDuration(os.Getenv("MESH_PEER_TTL"), 2*time.Minute)
m.PruneInterval = parseDuration(os.Getenv("MESH_PRUNE_INTERVAL"), 30*time.Second)
m.HelloInterval = parseDuration(os.Getenv("MESH_HELLO_INTERVAL"), 20*time.Second)
m.HelloFanout = parseIntEnv(os.Getenv("MESH_HELLO_FANOUT"), 8)
m.BlobTimeout = parseDuration(os.Getenv("MESH_BLOB_TIMEOUT"), 0)
// Wenn keine AdvertURL gesetzt ist, versuche eine sinnvolle Herleitung:
if strings.TrimSpace(m.AdvertURL) == "" {
m.AdvertURL = inferAdvertURL(m.BindAddr)
@@ -101,6 +107,18 @@ func parseBoolEnv(k string, def bool) bool {
return v == "1" || v == "true" || v == "yes" || v == "on"
}
func parseIntEnv(k string, def int) int {
v := strings.TrimSpace(os.Getenv(k))
if v == "" {
return def
}
n, err := strconv.Atoi(v)
if err != nil {
return def
}
return n
}
func splitCSV(s string) []string {
s = strings.TrimSpace(s)
if s == "" {
@@ -257,6 +275,10 @@ func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store)
}
it, err := store.Rename(r.Context(), in.ID, in.Name)
if err != nil {
if errors.Is(err, filesvc.ErrForbidden) {
writeJSON(w, http.StatusForbidden, map[string]string{"error": "only owner may modify"})
return
}
status := http.StatusBadRequest
if errors.Is(err, filesvc.ErrNotFound) {
status = http.StatusNotFound
@@ -283,6 +305,10 @@ func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store)
it, err := store.Delete(r.Context(), in.ID)
_ = blobs.Delete(r.Context(), string(in.ID))
if err != nil {
if errors.Is(err, filesvc.ErrForbidden) {
writeJSON(w, http.StatusForbidden, map[string]string{"error": "only owner may modify"})
return
}
status := http.StatusBadRequest
if errors.Is(err, filesvc.ErrNotFound) {
status = http.StatusNotFound
@@ -363,13 +389,23 @@ func apiFiles(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, m
}
// 3) remote holen & cachen
it1, _ := store.Get(r.Context(), filesvc.ID(id))
peers := meshNode.PeerList()
ttl := 2 * time.Minute
if cfg := meshNode.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
if !isOwnerActive(it1.Owner, peers, ttl) {
http.NotFound(w, r)
return
}
rrc, _, _, _, err := meshNode.FetchBlobAny(r.Context(), id)
if err != nil {
http.NotFound(w, r)
return
}
defer rrc.Close()
if _, err := blobs.Save(r.Context(), id, it.Name, rrc); err != nil {
if _, err := blobs.Save(r.Context(), id, it1.Name, rrc); err != nil {
http.Error(w, "cache failed", http.StatusInternalServerError)
return
}
@@ -381,7 +417,7 @@ func apiFiles(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, m
return
}
defer lrc.Close()
serveBlob(w, r, lrc, meta, it.Name)
serveBlob(w, r, lrc, meta, it1.Name)
})
}
@@ -395,6 +431,7 @@ func toMeshSnapshot(s filesvc.Snapshot) mesh.Snapshot {
Name: it.Name,
UpdatedAt: it.UpdatedAt,
Deleted: it.Deleted,
Owner: it.Owner,
})
}
return out
@@ -408,22 +445,52 @@ func fromMeshSnapshot(ms mesh.Snapshot) filesvc.Snapshot {
Name: it.Name,
UpdatedAt: it.UpdatedAt,
Deleted: it.Deleted,
Owner: it.Owner,
})
}
return out
}
// isOwnerActive prüft, ob der Owner in der Peer-Liste als "aktiv" gilt.
func isOwnerActive(owner string, peers []mesh.Peer, ttl time.Duration) bool {
owner = strings.TrimSpace(owner)
if owner == "" {
return true
}
cutoff := time.Now().Add(-ttl)
for _, p := range peers {
if strings.TrimSpace(p.URL) != owner {
continue
}
// Self ist per Definition aktiv
if p.Self {
return true
}
// ohne LastSeen: als inaktiv behandeln
if p.LastSeen.IsZero() {
return false
}
return p.LastSeen.After(cutoff)
}
// Owner ist nicht mal in der Liste: inaktiv
return false
}
/*** main ***/
func main() {
cfg := loadConfig()
// Domain-Store (mesh-fähig)
st := filesvc.NewMemStore()
nodeID := strings.TrimSpace(cfg.Mesh.AdvertURL)
//st := filesvc.NewMemStore(nodeID)
// Mesh starten
//mcfg := mesh.FromEnv()
blobs := blobfs.New(getenvDefault("DATA_DIR", "./data"))
dataDir := getenvDefault("DATA_DIR", "./data")
metaPath := filepath.Join(dataDir, "meta", "items.json")
st := filesvc.NewMemStorePersistent(nodeID, metaPath)
mnode, err := mesh.New(cfg.Mesh, mesh.Callbacks{
GetSnapshot: func(ctx context.Context) (mesh.Snapshot, error) {
@@ -463,6 +530,47 @@ func main() {
}
}()
go func() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for range ticker.C {
// aktive Owner bestimmen
peers := mnode.PeerList()
ttl := 2 * time.Minute
if cfg := mnode.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
cutoff := time.Now().Add(-ttl)
active := map[string]bool{}
for _, p := range peers {
if p.Self {
active[p.URL] = true
continue
}
if !p.LastSeen.IsZero() && p.LastSeen.After(cutoff) {
active[p.URL] = true
}
}
// alle Items durchgehen; Blobs von Offline-Ownern löschen
var next filesvc.ID
for {
items, nextOut, _ := st.List(context.Background(), next, 1000)
for _, it := range items {
if it.Owner == "" || active[it.Owner] {
continue
}
_ = blobs.Delete(context.Background(), it.ID)
}
if nextOut == "" {
break
}
next = nextOut
}
}
}()
// Root-Mux
root := http.NewServeMux()
@@ -533,51 +641,64 @@ func registerPublicDownloads(mux *http.ServeMux, store filesvc.MeshStore, blobs
base = "/" + base
}
mux.HandleFunc(base+"/", func(w http.ResponseWriter, r *http.Request) {
idStr := strings.TrimPrefix(r.URL.Path, base+"/")
if idStr == "" {
http.NotFound(w, r)
return
}
id := idStr
if strings.TrimSpace(id) == "" {
id := strings.TrimSpace(strings.TrimPrefix(r.URL.Path, base+"/"))
if id == "" {
http.NotFound(w, r)
return
}
// 1) Metadaten prüfen (nicht vorhanden oder gelöscht → 404)
// 1) Metadaten prüfen
it, err := store.Get(r.Context(), filesvc.ID(id))
if err != nil || it.Deleted {
http.NotFound(w, r)
return
}
// 2) lokal versuchen
if rc, meta, err := blobs.Open(r.Context(), id); err == nil {
// 2) Lokal versuchen
if rc, meta, err := blobs.Open(context.Background(), id); err == nil {
defer rc.Close()
serveBlob(w, r, rc, meta, it.Name) // Download-Name aus Store!
serveBlob(w, r, rc, meta, it.Name)
return
}
// 3) aus Mesh holen (signiert) und cachen
rrc, _, _, _, err := meshNode.FetchBlobAny(r.Context(), id)
// (Optional) Owner-Online-Check — wenn du auch bei offline Ownern liefern willst, block auskommentieren
{
peers := meshNode.PeerList()
ttl := 2 * time.Minute
if !isOwnerActive(it.Owner, peers, ttl) {
http.NotFound(w, r)
return
}
}
// 3) Aus Mesh holen — EIGENER Timeout-Kontext, NICHT r.Context()
rrc, remoteName, _, _, err := meshNode.FetchBlobAny(context.Background(), id)
if err != nil {
http.NotFound(w, r)
return
}
defer rrc.Close()
if _, err := blobs.Save(r.Context(), id, it.Name, rrc); err != nil {
http.Error(w, "cache failed", http.StatusInternalServerError)
filename := strings.TrimSpace(remoteName)
if filename == "" {
filename = it.Name
}
// 4) Lokal cachen — KEIN Request-Kontext, damit Save nicht abbricht
if _, err := blobs.Save(context.Background(), id, filename, rrc); err != nil {
log.Printf("[public] cache save failed id=%s name=%q: %v", id, filename, err)
http.Error(w, "cache failed: "+err.Error(), http.StatusInternalServerError)
return
}
// 4) erneut lokal öffnen und streamen
lrc, meta, err := blobs.Open(r.Context(), id)
// 5) Erneut lokal öffnen und streamen
lrc, meta, err := blobs.Open(context.Background(), id)
if err != nil {
http.Error(w, "open failed", http.StatusInternalServerError)
http.Error(w, "open failed: "+err.Error(), http.StatusInternalServerError)
return
}
defer lrc.Close()
serveBlob(w, r, lrc, meta, it.Name)
serveBlob(w, r, lrc, meta, filename)
})
}

View File

@@ -60,21 +60,32 @@ func Register(mux *http.ServeMux, d Deps) {
items, nextOut, _ := d.Store.List(r.Context(), nextID, 100)
type row struct {
ID string
Name string
UpdatedAt int64
HasBlob bool
Size int64
ID string
Name string
UpdatedAt int64
HasBlob bool
Size int64
Owner string
OwnerActive bool
}
rows := make([]row, 0, len(items))
peers := d.Mesh.PeerList()
ttl := 2 * time.Minute
if cfg := d.Mesh.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
for _, it := range items {
meta, ok, _ := d.Blob.Stat(r.Context(), it.ID)
rows = append(rows, row{
ID: it.ID,
Name: it.Name,
UpdatedAt: it.UpdatedAt,
HasBlob: ok,
Size: meta.Size,
ID: it.ID,
Name: it.Name,
UpdatedAt: it.UpdatedAt,
HasBlob: ok,
Size: meta.Size,
Owner: it.Owner,
OwnerActive: isOwnerActive(it.Owner, peers, ttl),
})
}
@@ -84,6 +95,42 @@ func Register(mux *http.ServeMux, d Deps) {
})
})
mux.HandleFunc("/admin/items/takeover", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
id := strings.TrimSpace(r.FormValue("id"))
if id != "" {
// Nur zulassen, wenn Owner tatsächlich offline ist
it, err := d.Store.Get(r.Context(), filesvc.ID(id))
if err == nil {
peers := d.Mesh.PeerList()
ttl := 2 * time.Minute
if cfg := d.Mesh.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
if !isOwnerActive(it.Owner, peers, ttl) {
// eigene URL aus PeerList ermitteln
self := ""
for _, p := range peers {
if p.Self {
self = p.URL
break
}
}
if self == "" {
self = "unknown-self"
}
if _, err := d.Store.TakeoverOwner(r.Context(), filesvc.ID(id), self); err == nil {
_ = d.Mesh.SyncNow(r.Context())
}
}
}
}
renderItemsPartial(w, r, d)
})
// Upload (multipart/form-data, Feldname "file", optional name-Override)
mux.HandleFunc("/admin/files/upload", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
@@ -258,25 +305,32 @@ func BasicAuth(user, pass string, next http.Handler) http.Handler {
// rebuild & render items partial for HTMX swaps
func renderItemsPartial(w http.ResponseWriter, r *http.Request, d Deps) {
type row struct {
ID string
Name string
UpdatedAt int64
HasBlob bool
Size int64
ID string
Name string
UpdatedAt int64
HasBlob bool
Size int64
Owner string
OwnerActive bool
}
nextID := filesvc.ID(strings.TrimSpace(r.URL.Query().Get("next")))
items, nextOut, _ := d.Store.List(r.Context(), nextID, 100)
peers := d.Mesh.PeerList()
ttl := 2 * time.Minute
rows := make([]row, 0, len(items))
for _, it := range items {
meta, ok, _ := d.Blob.Stat(r.Context(), (it.ID))
rows = append(rows, row{
ID: (it.ID),
Name: it.Name,
UpdatedAt: it.UpdatedAt,
HasBlob: ok,
Size: meta.Size,
ID: (it.ID),
Name: it.Name,
UpdatedAt: it.UpdatedAt,
HasBlob: ok,
Size: meta.Size,
Owner: it.Owner,
OwnerActive: isOwnerActive(it.Owner, peers, ttl),
})
}
w.Header().Set("Content-Type", "text/html; charset=utf-8")
@@ -285,3 +339,23 @@ func renderItemsPartial(w http.ResponseWriter, r *http.Request, d Deps) {
"Next": nextOut,
})
}
func isOwnerActive(owner string, peers []mesh.Peer, ttl time.Duration) bool {
if strings.TrimSpace(owner) == "" {
return true
}
cutoff := time.Now().Add(-ttl)
for _, p := range peers {
if strings.TrimSpace(p.URL) == strings.TrimSpace(owner) {
// Self ist immer aktiv, sonst nach LastSeen
if p.Self {
return true
}
if p.LastSeen.IsZero() {
return false
}
return p.LastSeen.After(cutoff)
}
}
return false
}

View File

@@ -24,13 +24,19 @@
<table>
<thead>
<tr>
<th>ID</th><th>Name</th><th>Updated</th><th>Blob</th><th style="width:260px">Aktionen</th>
<th>ID</th>
<th>Name</th>
<th>Updated</th>
<th>Blob</th>
<th>Owner</th>
<th>Status</th>
<th style="width:320px">Aktionen</th>
</tr>
</thead>
<tbody>
{{ range .Items }}
<tr>
<td>{{ .ID }}</td>
<td><code style="font-size:12px">{{ .ID }}</code></td>
<td>{{ .Name }}</td>
<td><small class="muted">{{ printf "%.19s" (timeRFC3339 .UpdatedAt) }}</small></td>
<td>
@@ -41,18 +47,34 @@
<span class="pill" style="background:#3a0b0b;border-color:#5b1a1a;color:#fbb;">fehlt</span>
{{ end }}
</td>
<td><small class="muted">{{ .Owner }}</small></td>
<td>
{{ if .OwnerActive }}
<span class="pill">online</span>
{{ else }}
<span class="pill" style="background:#5b1a1a;border-color:#7a2b2b;color:#fbb">offline</span>
<!-- Owner-Handover nur wenn offline -->
<form style="display:inline"
hx-post="/admin/items/takeover"
hx-target="#items" hx-swap="outerHTML"
onsubmit="return confirm('Owner übernehmen?');">
<input type="hidden" name="id" value="{{ .ID }}">
<button class="btn" type="submit">Übernehmen</button>
</form>
{{ end }}
</td>
<td>
<form style="display:inline-flex; gap:6px"
hx-post="/admin/items/rename"
hx-target="#items" hx-swap="outerHTML">
hx-post="/admin/items/rename"
hx-target="#items" hx-swap="outerHTML">
<input type="hidden" name="id" value="{{ .ID }}">
<input type="text" name="name" placeholder="Neuer Name">
<button class="btn" type="submit">Rename</button>
</form>
<form style="display:inline"
hx-post="/admin/items/delete"
hx-target="#items" hx-swap="outerHTML"
onsubmit="return confirm('Wirklich löschen (inkl. Blob)?');">
hx-post="/admin/items/delete"
hx-target="#items" hx-swap="outerHTML"
onsubmit="return confirm('Wirklich löschen (inkl. Blob)?');">
<input type="hidden" name="id" value="{{ .ID }}">
<button class="btn" type="submit">Delete</button>
</form>
@@ -62,17 +84,17 @@
</td>
</tr>
{{ else }}
<tr><td colspan="5" class="muted">Keine Dateien vorhanden.</td></tr>
<tr><td colspan="7" class="muted">Keine Dateien vorhanden.</td></tr>
{{ end }}
</tbody>
</table>
{{ if .Next }}
<div style="margin-top:10px">
<button class="btn"
hx-get="/admin/items?next={{ .Next }}"
<button class="btn"
hx-get="/admin/items?next={{ .Next }}"
hx-target="#items" hx-swap="outerHTML">Mehr laden</button>
<span class="pill">next={{ .Next }}</span>
</div>
{{ end }}
</div>
</div>

View File

@@ -2,6 +2,9 @@ package filesvc
import (
"context"
"encoding/json"
"os"
"path/filepath"
"slices"
"strings"
"sync"
@@ -11,15 +14,82 @@ import (
type MemStore struct {
mu sync.Mutex
items map[ID]File
self string
// optionales Eventing
subs []chan ChangeEvent
subs []chan ChangeEvent
persistPath string
}
func NewMemStore() *MemStore {
return &MemStore{
items: make(map[ID]File),
func NewMemStore(self string) *MemStore {
return &MemStore{self: strings.TrimSpace(self), items: make(map[ID]File)}
}
func NewMemStorePersistent(self, path string) *MemStore {
m := NewMemStore(self)
m.persistPath = strings.TrimSpace(path)
// beim Start versuchen zu laden
_ = m.loadFromDisk()
return m
}
// --- Persistenz-Helper (NEU) ---
func (m *MemStore) loadFromDisk() error {
if m.persistPath == "" {
return nil
}
f, err := os.Open(m.persistPath)
if err != nil {
return nil // Datei existiert beim ersten Start nicht ok
}
defer f.Close()
var snap Snapshot
if err := json.NewDecoder(f).Decode(&snap); err != nil {
return err
}
m.mu.Lock()
for _, it := range snap.Items {
m.items[it.ID] = it
}
m.mu.Unlock()
return nil
}
func (m *MemStore) saveLocked() error {
if m.persistPath == "" {
return nil
}
if err := os.MkdirAll(filepath.Dir(m.persistPath), 0o755); err != nil {
return err
}
// Snapshot aus Map bauen
snap := Snapshot{Items: make([]File, 0, len(m.items))}
for _, it := range m.items {
snap.Items = append(snap.Items, it)
}
// atomar schreiben
tmp := m.persistPath + ".tmp"
f, err := os.Create(tmp)
if err != nil {
return err
}
enc := json.NewEncoder(f)
enc.SetIndent("", " ")
if err := enc.Encode(&snap); err != nil {
f.Close()
_ = os.Remove(tmp)
return err
}
if err := f.Sync(); err != nil {
f.Close()
_ = os.Remove(tmp)
return err
}
if err := f.Close(); err != nil {
_ = os.Remove(tmp)
return err
}
return os.Rename(tmp, m.persistPath)
}
/*** Store ***/
@@ -102,8 +172,9 @@ func (m *MemStore) Create(_ context.Context, name string) (File, error) {
if err != nil {
return File{}, err
}
it := File{ID: uid, Name: name, UpdatedAt: now}
it := File{ID: uid, Name: name, UpdatedAt: now, Owner: m.self}
m.items[it.ID] = it
_ = m.saveLocked()
m.emit(it)
return it, nil
}
@@ -120,9 +191,13 @@ func (m *MemStore) Rename(_ context.Context, id ID, newName string) (File, error
if !ok || it.Deleted {
return File{}, ErrNotFound
}
it.Name = newName
if it.Owner != "" && it.Owner != m.self { // ← nur Owner
return File{}, ErrForbidden
}
it.Name = strings.TrimSpace(newName)
it.UpdatedAt = time.Now().UnixNano()
m.items[id] = it
_ = m.saveLocked()
m.emit(it)
return it, nil
}
@@ -134,16 +209,46 @@ func (m *MemStore) Delete(_ context.Context, id ID) (File, error) {
if !ok {
return File{}, ErrNotFound
}
if it.Owner != "" && it.Owner != m.self { // ← nur Owner
return File{}, ErrForbidden
}
if it.Deleted {
return it, nil
}
it.Deleted = true
it.UpdatedAt = time.Now().UnixNano()
m.items[id] = it
_ = m.saveLocked()
m.emit(it)
return it, nil
}
func (m *MemStore) TakeoverOwner(_ context.Context, id ID, newOwner string) (File, error) {
m.mu.Lock()
defer m.mu.Unlock()
it, ok := m.items[id]
if !ok || it.Deleted {
return File{}, ErrNotFound
}
newOwner = strings.TrimSpace(newOwner)
if newOwner == "" {
return File{}, ErrBadInput
}
// Sicherheit: nur für sich selbst übernehmen
if newOwner != m.self {
return File{}, ErrForbidden
}
if it.Owner == newOwner {
return it, nil
}
it.Owner = newOwner
it.UpdatedAt = time.Now().UnixNano()
m.items[id] = it
_ = m.saveLocked()
m.emitLocked(it)
return it, nil
}
/*** Replicable ***/
func (m *MemStore) Snapshot(_ context.Context) (Snapshot, error) {
@@ -159,13 +264,22 @@ func (m *MemStore) Snapshot(_ context.Context) (Snapshot, error) {
func (m *MemStore) ApplyRemote(_ context.Context, s Snapshot) error {
m.mu.Lock()
defer m.mu.Unlock()
changed := false
for _, ri := range s.Items {
li, ok := m.items[ri.ID]
if !ok || ri.UpdatedAt > li.UpdatedAt {
// Owner nie überschreiben, außer er ist leer
if ok && li.Owner != "" && ri.Owner != "" && ri.Owner != li.Owner {
ri.Owner = li.Owner
}
m.items[ri.ID] = ri
changed = true
m.emitLocked(ri)
}
}
if changed {
_ = m.saveLocked() // ← NEU
}
return nil
}

View File

@@ -13,11 +13,11 @@ import (
type ID = string
type File struct {
ID ID `json:"id"`
Name string `json:"name"`
// weitere Metadaten optional: Size, Hash, Owner, Tags, ...
UpdatedAt int64 `json:"updatedAt"` // UnixNano für LWW
Deleted bool `json:"deleted"` // Tombstone für Mesh-Delete
ID ID `json:"id"`
Name string `json:"name"`
UpdatedAt int64 `json:"updatedAt"` // UnixNano für LWW
Deleted bool `json:"deleted"` // Tombstone für Mesh-Delete
Owner string `json:"owner"` //AdvertURL/NodeID des Erzeugers
}
/*** Fehler ***/
@@ -36,11 +36,10 @@ type Store interface {
// Lesen & Auflisten
Get(ctx context.Context, id ID) (File, error)
List(ctx context.Context, next ID, limit int) (items []File, nextOut ID, err error)
// Mutationen mit LWW-Semantik (UpdatedAt wird intern gesetzt, außer bei ApplyRemote)
Create(ctx context.Context, name string) (File, error)
Rename(ctx context.Context, id ID, newName string) (File, error)
Delete(ctx context.Context, id ID) (File, error)
Rename(ctx context.Context, id ID, newName string) (File, error) // nur Owner darf
Delete(ctx context.Context, id ID) (File, error) // nur Owner darf
TakeoverOwner(ctx context.Context, id ID, newOwner string) (File, error)
}
/*** Mesh-Replikation ***/

View File

@@ -12,6 +12,7 @@ import (
"hash/crc32"
"io"
"log"
"math/rand/v2"
"net"
"net/http"
"os"
@@ -33,6 +34,15 @@ type Config struct {
DiscoveryAddress string // "239.8.8.8:9898"
PeerTTL time.Duration // wie lange darf ein Peer inaktiv sein (Default siehe unten)
PruneInterval time.Duration // wie oft wird gepruned
SyncInterval time.Duration
Fanout int
// NEU:
HelloInterval time.Duration // wie oft pingen
HelloFanout int // wie viele Peers pro Tick
BlobTimeout time.Duration
}
type Peer struct {
@@ -47,6 +57,7 @@ type Item struct {
Name string `json:"name"`
UpdatedAt int64 `json:"updatedAt"`
Deleted bool `json:"deleted"` // <— NEU: Tombstone für Deletes
Owner string `json:"owner"`
}
type Snapshot struct {
@@ -92,6 +103,8 @@ func (n *Node) RemovePeer(url string) bool {
return false
}
func (n *Node) Config() Config { return n.cfg }
// PruneNow entfernt alle Peers, deren LastSeen vor cutoff liegt (Seeds bleiben).
func (n *Node) PruneNow(cutoff time.Time) int {
n.mu.Lock()
@@ -187,21 +200,38 @@ func (n *Node) helloHandler(w http.ResponseWriter, r *http.Request) {
http.Error(w, "bad signature", http.StatusUnauthorized)
return
}
var p Peer
if err := json.Unmarshal(body, &p); err != nil {
var req struct {
URL string `json:"url"`
}
if err := json.Unmarshal(body, &req); err != nil || strings.TrimSpace(req.URL) == "" {
http.Error(w, "bad json", http.StatusBadRequest)
return
}
p.LastSeen = time.Now()
// Peer anlegen (falls neu) und LastSeen setzen
n.mu.Lock()
if existing, ok := n.peers[p.URL]; ok {
existing.LastSeen = p.LastSeen
} else if p.URL != n.self.URL {
cp := p
n.peers[p.URL] = &cp
if req.URL != n.self.URL {
if p, ok := n.peers[req.URL]; ok {
p.LastSeen = time.Now()
} else {
cp := Peer{URL: req.URL, LastSeen: time.Now()} // weitere Felder wie bekannt
n.peers[req.URL] = &cp
}
}
n.mu.Unlock()
w.WriteHeader(http.StatusOK)
}
func (n *Node) touchPeer(url string) {
if strings.TrimSpace(url) == "" {
return
}
n.mu.Lock()
if p, ok := n.peers[url]; ok {
p.LastSeen = time.Now()
}
n.mu.Unlock()
w.WriteHeader(http.StatusNoContent)
}
func (n *Node) peersHandler(w http.ResponseWriter, r *http.Request) {
@@ -280,6 +310,12 @@ func (n *Node) Serve() error {
n.loopAntiEntropy()
}()
n.wg.Add(1)
go func() {
defer n.wg.Done()
n.loopHello()
}()
// http server
errc := make(chan error, 1)
go func() {
@@ -464,28 +500,100 @@ func (n *Node) loopBeaconRecv() {
/*** Outgoing ***/
func (n *Node) sendHello(url string) error {
p := n.self
b, _ := json.Marshal(p)
b, _ := json.Marshal(struct {
URL string `json:"url"`
}{URL: n.self.URL})
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/hello", bytes.NewReader(b))
req.Header.Set("X-Mesh-Sig", n.sign(b))
resp, err := n.client.Do(req)
if err == nil {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
if err != nil {
return err
}
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
if resp.StatusCode == http.StatusOK {
n.touchPeer(url) // eigenen Blick auf den Peer direkt als „gesehen“ markieren
return nil
}
return fmt.Errorf("hello %s: %s", url, resp.Status)
}
func (n *Node) loopHello() {
interval := n.cfg.HelloInterval
if interval <= 0 {
interval = 20 * time.Second
}
fanout := n.cfg.HelloFanout
if fanout <= 0 {
fanout = 8
}
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-n.stop:
return
case <-t.C:
}
// Liste der *bekannten* Peers (nicht nur Seeds)
n.mu.RLock()
targets := make([]string, 0, len(n.peers))
for url := range n.peers {
if url != n.self.URL {
targets = append(targets, url)
}
}
n.mu.RUnlock()
if len(targets) == 0 {
continue
}
// zufällig mischen und auf Fanout begrenzen
rand.Shuffle(len(targets), func(i, j int) { targets[i], targets[j] = targets[j], targets[i] })
if fanout < len(targets) {
targets = targets[:fanout]
}
// leicht parallel pingen
var wg sync.WaitGroup
for _, u := range targets {
u := u
wg.Add(1)
go func() {
defer wg.Done()
_ = n.sendHello(u)
}()
}
wg.Wait()
}
return err
}
func (n *Node) sendSync(url string, s Snapshot) error {
b, _ := json.Marshal(s)
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/sync", bytes.NewReader(b))
req.Header.Set("X-Mesh-Sig", n.sign(b))
ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second)
defer cancel()
req = req.WithContext(ctx)
resp, err := n.client.Do(req)
if err == nil {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
if err != nil {
return err
}
return err
defer resp.Body.Close()
io.Copy(io.Discard, resp.Body)
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("sync %s: %s", url, resp.Status)
}
n.touchPeer(url)
return nil
}
// PeerList liefert eine Kopie der bekannten Peers inkl. Self.
@@ -605,14 +713,37 @@ func (n *Node) blobHandler(w http.ResponseWriter, r *http.Request) {
_, _ = io.Copy(w, rc)
}
// interner Helper: signierter Blob-Request an einen Peer
func (n *Node) sendBlobRequest(url string, id string) (*http.Response, error) {
// sendBlobRequest schickt eine signierte Anfrage an /mesh/blob und liefert die Response
// zurück (Caller MUSS resp.Body schließen!). Bei HTTP 200 wird der Peer als gesehen markiert.
func (n *Node) sendBlobRequest(url, id string) (*http.Response, error) {
b, _ := json.Marshal(struct {
ID string `json:"id"`
}{ID: id})
req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/blob", bytes.NewReader(b))
req.Header.Set("X-Mesh-Sig", n.sign(b))
return n.client.Do(req)
req.Header.Set("Content-Type", "application/json")
// ❗ WICHTIG: kein kurzer Timeout. Optional: großer Timeout aus Config
ctx := context.Background()
if n.cfg.BlobTimeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), n.cfg.BlobTimeout)
defer cancel()
}
req = req.WithContext(ctx)
resp, err := n.client.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode == http.StatusOK {
n.touchPeer(url) // ausgehender Erfolg zählt als "gesehen"
return resp, nil
}
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
return nil, fmt.Errorf("blob %s from %s: %s", id, url, resp.Status)
}
// Öffentliche Methode: versuche Blob bei irgendeinem Peer zu holen