From de22d7c0f283d7838a0af4f00695f743fffa7458 Mon Sep 17 00:00:00 2001 From: jbergner Date: Sat, 27 Sep 2025 09:33:40 +0200 Subject: [PATCH] create_aio --- cmd/filesvc/main.go | 363 -------------------- cmd/filesvc/ui/app.js | 132 -------- cmd/filesvc/ui/index.html | 77 ----- cmd/filesvc/ui/style.css | 19 -- cmd/unified/main.go | 398 ++++++++++++++++++++++ internal/admin/admin.go | 146 ++++++++ internal/admin/tpl/layout.html | 39 +++ internal/admin/tpl/partials_items.html | 57 ++++ internal/admin/tpl/partials_peers.html | 23 ++ internal/filesvc/memstore.go | 195 +++++++++++ internal/filesvc/store.go | 78 +++++ internal/mesh/mesh.go | 451 +++++++++++++++++++++++++ internal/store/store.go | 316 ----------------- 13 files changed, 1387 insertions(+), 907 deletions(-) delete mode 100644 cmd/filesvc/main.go delete mode 100644 cmd/filesvc/ui/app.js delete mode 100644 cmd/filesvc/ui/index.html delete mode 100644 cmd/filesvc/ui/style.css create mode 100644 cmd/unified/main.go create mode 100644 internal/admin/admin.go create mode 100644 internal/admin/tpl/layout.html create mode 100644 internal/admin/tpl/partials_items.html create mode 100644 internal/admin/tpl/partials_peers.html create mode 100644 internal/filesvc/memstore.go create mode 100644 internal/filesvc/store.go create mode 100644 internal/mesh/mesh.go delete mode 100644 internal/store/store.go diff --git a/cmd/filesvc/main.go b/cmd/filesvc/main.go deleted file mode 100644 index 82b183d..0000000 --- a/cmd/filesvc/main.go +++ /dev/null @@ -1,363 +0,0 @@ -package main - -import ( - "context" - "crypto/subtle" - "embed" - "encoding/json" - "errors" - "log" - "mime" - "net/http" - "os" - "os/signal" - "path/filepath" - "strconv" - "strings" - "syscall" - "time" - - "git.send.nrw/sendnrw/decent-webui/internal/store" -) - -//go:embed ui/* -var uiFS embed.FS - -type Config struct { - ListenAddr string - DataDir string - APIKey string -} - -func (c Config) BlobDir() string { return filepath.Join(c.DataDir, "blobs") } -func (c Config) MetaDir() string { return filepath.Join(c.DataDir, "meta") } -func (c Config) TempDir() string { return filepath.Join(c.DataDir, "tmp") } - -func getenv(k, d string) string { - if v := os.Getenv(k); v != "" { - return v - } - return d -} - -func LoadConfig() Config { - addr := getenv("FILESVC_LISTEN", ":8085") - datadir := getenv("FILESVC_DATA", "/data") - key := os.Getenv("FILESVC_API_KEY") - if key == "" { - log.Println("[warn] FILESVC_API_KEY is empty — set it for protection") - } - return Config{ListenAddr: addr, DataDir: datadir, APIKey: key} -} - -type App struct { - cfg Config - store *store.Store -} - -func main() { - cfg := LoadConfig() - for _, p := range []string{cfg.DataDir, cfg.BlobDir(), cfg.MetaDir(), cfg.TempDir()} { - if err := os.MkdirAll(p, 0o755); err != nil { - log.Fatalf("mkdir %s: %v", p, err) - } - } - - st, err := store.Open(cfg.BlobDir(), cfg.MetaDir(), cfg.TempDir()) - if err != nil { - log.Fatal(err) - } - app := &App{cfg: cfg, store: st} - - mux := http.NewServeMux() - // API routes - mux.HandleFunc("/healthz", app.health) - mux.HandleFunc("/v1/files", app.with(app.files)) - mux.HandleFunc("/v1/files/", app.with(app.fileByID)) // /v1/files/{id}[ /meta] - mux.HandleFunc("/v1/uploads", app.with(app.uploadsRoot)) // POST init - mux.HandleFunc("/v1/uploads/", app.with(app.uploadsByID)) // parts/complete/abort - - // UI routes (embedded) - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - if r.URL.Path != "/" { - http.NotFound(w, r) - return - } - http.ServeFileFS(w, r, 
uiFS, "ui/index.html") - }) - mux.Handle("/static/", http.StripPrefix("/static/", http.FileServerFS(uiFS))) - - srv := &http.Server{ - Addr: cfg.ListenAddr, - Handler: logMiddleware(securityHeaders(mux)), - ReadTimeout: 60 * time.Second, - ReadHeaderTimeout: 10 * time.Second, - WriteTimeout: 0, - IdleTimeout: 120 * time.Second, - } - - go func() { - log.Printf("file-service listening on %s", cfg.ListenAddr) - if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - log.Fatalf("server: %v", err) - } - }() - - stop := make(chan os.Signal, 1) - signal.Notify(stop, os.Interrupt, syscall.SIGTERM) - <-stop - log.Println("shutting down...") - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - _ = srv.Shutdown(ctx) -} - -func (a *App) with(h func(http.ResponseWriter, *http.Request)) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - if a.cfg.APIKey != "" { - key := r.Header.Get("X-API-Key") - if subtle.ConstantTimeCompare([]byte(key), []byte(a.cfg.APIKey)) != 1 { - http.Error(w, "unauthorized", http.StatusUnauthorized) - return - } - } - h(w, r) - } -} - -func logMiddleware(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - start := time.Now() - next.ServeHTTP(w, r) - log.Printf("%s %s %s", r.Method, r.URL.Path, time.Since(start)) - }) -} - -func securityHeaders(next http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - // immer sinnvolle Sicherheits-Header - w.Header().Set("X-Content-Type-Options", "nosniff") - w.Header().Set("X-Frame-Options", "DENY") - w.Header().Set("Referrer-Policy", "no-referrer") - - // Für UI (/, /static/...) dürfen CSS/JS & XHR von "self" laden. 
- if r.URL.Path == "/" || strings.HasPrefix(r.URL.Path, "/static/") { - w.Header().Set("Content-Security-Policy", - "default-src 'self'; script-src 'self'; style-src 'self'; img-src 'self' data:; connect-src 'self'; object-src 'none'; base-uri 'self'; frame-ancestors 'none'") - } else { - // Für API schön streng - w.Header().Set("Content-Security-Policy", - "default-src 'none'; object-src 'none'; base-uri 'none'; frame-ancestors 'none'") - } - - next.ServeHTTP(w, r) - }) -} - -func (a *App) writeJSON(w http.ResponseWriter, status int, v any) { - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(status) - _ = json.NewEncoder(w).Encode(v) -} - -func (a *App) health(w http.ResponseWriter, _ *http.Request) { - w.WriteHeader(200) - _, _ = w.Write([]byte("ok")) -} - -// --- Routes --- -// /v1/files (GET list, POST upload) -func (a *App) files(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case http.MethodGet: - q := r.URL.Query().Get("q") - off := atoiDefault(r.URL.Query().Get("offset"), 0) - lim := atoiDefault(r.URL.Query().Get("limit"), 50) - items, next, err := a.store.List(r.Context(), q, off, lim) - if err != nil { - http.Error(w, err.Error(), 500) - return - } - a.writeJSON(w, 200, map[string]any{"items": items, "next": next}) - case http.MethodPost: - r.Body = http.MaxBytesReader(w, r.Body, 1<<34) // ~16GiB - ct := r.Header.Get("Content-Type") - name := r.Header.Get("X-Filename") - meta := r.URL.Query().Get("meta") - if strings.HasPrefix(ct, "multipart/") { - if err := r.ParseMultipartForm(32 << 20); err != nil { - http.Error(w, err.Error(), 400) - return - } - f, hdr, err := r.FormFile("file") - if err != nil { - http.Error(w, err.Error(), 400) - return - } - defer f.Close() - if hdr != nil { - name = hdr.Filename - } - rec, err := a.store.Put(r.Context(), f, name, meta) - if err != nil { - http.Error(w, err.Error(), 500) - return - } - a.writeJSON(w, 201, rec) - return - } - rec, err := a.store.Put(r.Context(), r.Body, name, meta) - if err != nil { - http.Error(w, err.Error(), 500) - return - } - a.writeJSON(w, 201, rec) - default: - w.WriteHeader(http.StatusMethodNotAllowed) - } -} - -// /v1/files/{id} and /v1/files/{id}/meta -func (a *App) fileByID(w http.ResponseWriter, r *http.Request) { - // path after /v1/files/ - rest := strings.TrimPrefix(r.URL.Path, "/v1/files/") - parts := strings.Split(rest, "/") - if len(parts) == 0 || parts[0] == "" { - http.NotFound(w, r) - return - } - id := parts[0] - - if len(parts) == 2 && parts[1] == "meta" { - switch r.Method { - case http.MethodGet: - rec, err := a.store.GetMeta(r.Context(), id) - if err != nil { - http.Error(w, err.Error(), 404) - return - } - a.writeJSON(w, 200, rec) - case http.MethodPut: - var m map[string]string - if err := json.NewDecoder(r.Body).Decode(&m); err != nil { - http.Error(w, err.Error(), 400) - return - } - rec, err := a.store.UpdateMeta(r.Context(), id, m) - if err != nil { - http.Error(w, err.Error(), 500) - return - } - a.writeJSON(w, 200, rec) - default: - w.WriteHeader(http.StatusMethodNotAllowed) - } - return - } - - // /v1/files/{id} - switch r.Method { - case http.MethodGet: - f, rec, err := a.store.Open(r.Context(), id) - if err != nil { - http.Error(w, err.Error(), 404) - return - } - defer f.Close() - ctype := rec.ContentType - if ctype == "" { - ctype = mime.TypeByExtension(filepath.Ext(rec.Name)) - } - if ctype == "" { - ctype = "application/octet-stream" - } - w.Header().Set("Content-Type", ctype) - w.Header().Set("Content-Length", strconv.FormatInt(rec.Size, 10)) - 
w.Header().Set("Accept-Ranges", "bytes") - if r.URL.Query().Get("download") == "1" { - w.Header().Set("Content-Disposition", "attachment; filename=\""+rec.SafeName()+"\"") - } - http.ServeContent(w, r, rec.SafeName(), rec.CreatedAt, f) - case http.MethodDelete: - if err := a.store.Delete(r.Context(), id); err != nil { - http.Error(w, err.Error(), 404) - return - } - w.WriteHeader(204) - default: - w.WriteHeader(http.StatusMethodNotAllowed) - } -} - -// /v1/uploads (POST) and /v1/uploads/{uid}/ ... -func (a *App) uploadsRoot(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodPost { - w.WriteHeader(http.StatusMethodNotAllowed) - return - } - name := r.URL.Query().Get("name") - meta := r.URL.Query().Get("meta") - u, err := a.store.UploadInit(r.Context(), name, meta) - if err != nil { - http.Error(w, err.Error(), 500) - return - } - a.writeJSON(w, 201, u) -} - -func (a *App) uploadsByID(w http.ResponseWriter, r *http.Request) { - rest := strings.TrimPrefix(r.URL.Path, "/v1/uploads/") - parts := strings.Split(rest, "/") - if len(parts) < 1 || parts[0] == "" { - http.NotFound(w, r) - return - } - uid := parts[0] - - if len(parts) == 3 && parts[1] == "parts" { - n := atoiDefault(parts[2], -1) - if r.Method != http.MethodPut || n < 1 { - http.Error(w, "invalid part", 400) - return - } - if err := a.store.UploadPart(r.Context(), uid, n, r.Body); err != nil { - http.Error(w, err.Error(), 400) - return - } - w.WriteHeader(204) - return - } - - if len(parts) == 2 && parts[1] == "complete" { - if r.Method != http.MethodPost { - w.WriteHeader(http.StatusMethodNotAllowed) - return - } - rec, err := a.store.UploadComplete(r.Context(), uid) - if err != nil { - http.Error(w, err.Error(), 400) - return - } - a.writeJSON(w, 201, rec) - return - } - - if len(parts) == 1 && r.Method == http.MethodDelete { - if err := a.store.UploadAbort(r.Context(), uid); err != nil { - http.Error(w, err.Error(), 400) - return - } - w.WriteHeader(204) - return - } - - http.NotFound(w, r) -} - -func atoiDefault(s string, d int) int { - n, err := strconv.Atoi(s) - if err != nil { - return d - } - return n -} diff --git a/cmd/filesvc/ui/app.js b/cmd/filesvc/ui/app.js deleted file mode 100644 index b1ba128..0000000 --- a/cmd/filesvc/ui/app.js +++ /dev/null @@ -1,132 +0,0 @@ -(function() { - const $ = sel => document.querySelector(sel); - const $$ = sel => Array.from(document.querySelectorAll(sel)); - const state = { offset: 0, limit: 20, total: null }; - - function loadCfg() { - try { return JSON.parse(localStorage.getItem('cfg')) || {}; } catch { return {}; } - } - function saveCfg(cfg) { localStorage.setItem('cfg', JSON.stringify(cfg)); } - const cfg = loadCfg(); - $('#apiKey').value = cfg.apiKey || ''; - $('#baseUrl').value = cfg.baseUrl || ''; - $('#saveCfg').onclick = () => { - cfg.apiKey = $('#apiKey').value.trim(); - cfg.baseUrl = $('#baseUrl').value.trim(); - saveCfg(cfg); - refresh(); - }; - - function api(path, opts = {}) { - const base = cfg.baseUrl || ''; - opts.headers = Object.assign({ 'X-API-Key': cfg.apiKey || '' }, opts.headers || {}); - return fetch(base + path, opts).then(r => { - if (!r.ok) throw new Error(`${r.status} ${r.statusText}`); - const ct = r.headers.get('content-type') || ''; - if (ct.includes('application/json')) return r.json(); - return r.text(); - }); - } - - async function refresh() { - const q = encodeURIComponent($('#q').value || ''); - try { - const data = await api(`/v1/files?limit=${state.limit}&offset=${state.offset}&q=${q}`); - renderTable(data.items || []); - const next 
= data.next || 0; - state.hasNext = next > 0; - state.nextOffset = next; - $('#pageInfo').textContent = `offset ${state.offset}`; - } catch (e) { - alert('List failed: ' + e.message); - } - } - - function renderTable(items) { - const tbody = $('#files tbody'); - tbody.innerHTML = ''; - const tpl = $('#rowTpl').content; - for (const it of items) { - const tr = tpl.cloneNode(true); - tr.querySelector('.id').textContent = it.id; - tr.querySelector('.name').textContent = it.name; - tr.querySelector('.size').textContent = human(it.size); - tr.querySelector('.created').textContent = new Date(it.createdAt).toLocaleString(); - const act = tr.querySelector('.actions'); - - const dl = btn('Download', async () => { - const base = cfg.baseUrl || ''; - const url = `${base}/v1/files/${it.id}?download=1`; - const a = document.createElement('a'); - a.href = url; a.download = ''; - a.click(); - }); - const meta = btn('Meta', async () => showMeta(it.id)); - const del = btn('Delete', async () => { - if (!confirm('Delete file?')) return; - try { await api(`/v1/files/${it.id}`, { method:'DELETE' }); refresh(); } catch(e){ alert('Delete failed: '+e.message); } - }); - act.append(dl, meta, del); - tbody.appendChild(tr); - } - } - - function btn(text, on) { const b = document.createElement('button'); b.textContent = text; b.onclick = on; return b; } - function human(n) { if (n < 1024) return n + ' B'; const u=['KB','MB','GB','TB']; let i=-1; do { n/=1024; i++; } while(n>=1024 && i { state.offset = 0; refresh(); }; - $('#q').addEventListener('keydown', e => { if (e.key==='Enter') { state.offset=0; refresh(); } }); - $('#prev').onclick = () => { state.offset = Math.max(0, state.offset - state.limit); refresh(); }; - $('#next').onclick = () => { if (state.hasNext) { state.offset = state.nextOffset; refresh(); } }; - - // Upload form - $('#uploadForm').addEventListener('submit', async (e) => { - e.preventDefault(); - const f = $('#fileInput').files[0]; - if (!f) return alert('Pick a file'); - const meta = $('#metaInput').value.trim(); - const fd = new FormData(); - fd.append('file', f); - fd.append('meta', meta); - try { await api('/v1/files?meta='+encodeURIComponent(meta), { method: 'POST', body: fd }); refresh(); } catch(e){ alert('Upload failed: '+e.message); } - }); - - // Chunked upload - $('#chunkInit').onclick = async () => { - try { - const name = $('#chunkName').value.trim() || 'file'; - const meta = $('#chunkMeta').value.trim(); - const r = await api(`/v1/uploads?name=${encodeURIComponent(name)}&meta=${encodeURIComponent(meta)}`, { method:'POST' }); - $('#chunkId').textContent = r.id; - } catch(e){ alert('Init failed: '+e.message); } - }; - $('#chunkPut').onclick = async () => { - const uid = $('#chunkId').textContent.trim(); - const part = parseInt($('#chunkPart').value,10) || 1; - const file = $('#chunkFile').files[0]; - if (!uid) return alert('Init first'); - if (!file) return alert('Choose a file (this will send the whole file as one part).'); - try { await api(`/v1/uploads/${uid}/parts/${part}`, { method:'PUT', body: file }); alert('Part uploaded'); } catch(e){ alert('PUT failed: '+e.message); } - }; - $('#chunkComplete').onclick = async () => { - const uid = $('#chunkId').textContent.trim(); if (!uid) return; - try { await api(`/v1/uploads/${uid}/complete`, { method:'POST' }); refresh(); } catch(e){ alert('Complete failed: '+e.message); } - }; - $('#chunkAbort').onclick = async () => { - const uid = $('#chunkId').textContent.trim(); if (!uid) return; - try { await api(`/v1/uploads/${uid}`, { 
method:'DELETE' }); $('#chunkId').textContent=''; alert('Aborted'); } catch(e){ alert('Abort failed: '+e.message); } - }; - - async function showMeta(id) { - try { - const rec = await api(`/v1/files/${id}/meta`); - const json = prompt('Edit meta as JSON (object of string:string)', JSON.stringify(rec.meta||{})); - if (json == null) return; - const obj = JSON.parse(json); - await api(`/v1/files/${id}/meta`, { method:'PUT', headers:{'Content-Type':'application/json'}, body: JSON.stringify(obj) }); - refresh(); - } catch(e){ alert('Meta failed: '+e.message); } - } - - refresh(); -})(); \ No newline at end of file diff --git a/cmd/filesvc/ui/index.html b/cmd/filesvc/ui/index.html deleted file mode 100644 index 8a6a289..0000000 --- a/cmd/filesvc/ui/index.html +++ /dev/null @@ -1,77 +0,0 @@ - - - - - - File Service UI - - - -
-

File Service

-
- - - -
-
- -
-
-

Upload

-
- - - -
-
- Chunked upload -
- - - - -
- - - - - -
-
-
-
- -
-

Files

-
- - -
- - - - - -
IDNameSizeCreatedActions
-
- - - -
-
-
- - - - - - \ No newline at end of file diff --git a/cmd/filesvc/ui/style.css b/cmd/filesvc/ui/style.css deleted file mode 100644 index 2c01782..0000000 --- a/cmd/filesvc/ui/style.css +++ /dev/null @@ -1,19 +0,0 @@ -:root { --bg: #0b0f14; --fg: #e6eef8; --muted: #9bb0c8; --card: #121923; --accent: #5aa9ff; } -* { box-sizing: border-box; } -body { margin: 0; font-family: ui-sans-serif, system-ui, -apple-system, Segoe UI, Roboto, sans-serif; background: var(--bg); color: var(--fg); } -header { display: flex; justify-content: space-between; align-items: center; padding: 16px 20px; background: #0e141b; border-bottom: 1px solid #2a3543; } -h1 { margin: 0; font-size: 20px; } -.cfg label { margin-right: 8px; font-size: 12px; color: var(--muted); } -.cfg input { margin-left: 6px; padding: 6px 8px; background: #0c1219; border: 1px solid #2a3543; color: var(--fg); border-radius: 6px; } -button { padding: 8px 12px; border: 1px solid #2a3543; background: #111a24; color: var(--fg); border-radius: 8px; cursor: pointer; } -button:hover { border-color: var(--accent); } -main { padding: 20px; max-width: 1100px; margin: 0 auto; } -.card { background: var(--card); border: 1px solid #1f2a38; border-radius: 14px; padding: 16px; margin-bottom: 16px; box-shadow: 0 6px 20px rgba(0,0,0,.25); } -.toolbar { display:flex; gap: 8px; align-items: center; margin-bottom: 10px; } -table { width: 100%; border-collapse: collapse; } -th, td { text-align: left; padding: 8px; border-bottom: 1px solid #213043; } -.mono { font-family: ui-monospace, SFMono-Regular, Menlo, Consolas, monospace; font-size: 12px; } -.name { max-width: 340px; overflow: hidden; text-overflow: ellipsis; white-space: nowrap; } -.pager { display:flex; gap: 8px; align-items:center; justify-content:flex-end; padding-top: 8px; } -.actions button { margin-right: 6px; } -summary { cursor: pointer; } \ No newline at end of file diff --git a/cmd/unified/main.go b/cmd/unified/main.go new file mode 100644 index 0000000..f193d0e --- /dev/null +++ b/cmd/unified/main.go @@ -0,0 +1,398 @@ +package main + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "log" + "net/http" + "os" + "os/signal" + "strconv" + "strings" + "syscall" + "time" + + // ← Passe diese Import-Pfade an dein go.mod an + "git.send.nrw/sendnrw/decent-webui/internal/admin" + "git.send.nrw/sendnrw/decent-webui/internal/filesvc" + "git.send.nrw/sendnrw/decent-webui/internal/mesh" +) + +/*** Config ***/ +func loadConfig() AppConfig { + // HTTP + httpAddr := getenvDefault("ADDR", ":8080") + + // API + apiKey := os.Getenv("FILE_SERVICE_API_KEY") + + // Admin UI (BasicAuth optional) + adminUser := os.Getenv("ADMIN_USER") + adminPass := os.Getenv("ADMIN_PASS") + + // Mesh (mit sinnvollen Defaults) + m := mesh.Config{ + BindAddr: getenvDefault("MESH_BIND", ":9090"), + AdvertURL: os.Getenv("MESH_ADVERT"), // kann leer sein → wir leiten ab + Seeds: splitCSV(os.Getenv("MESH_SEEDS")), + ClusterSecret: os.Getenv("MESH_CLUSTER_SECRET"), + EnableDiscovery: parseBoolEnv("MESH_ENABLE_DISCOVERY", false), + DiscoveryAddress: getenvDefault("MESH_DISCOVERY_ADDR", "239.8.8.8:9898"), + } + + // Wenn keine AdvertURL gesetzt ist, versuche eine sinnvolle Herleitung: + if strings.TrimSpace(m.AdvertURL) == "" { + m.AdvertURL = inferAdvertURL(m.BindAddr) + log.Printf("[mesh] MESH_ADVERT nicht gesetzt – abgeleitet: %s", m.AdvertURL) + } + + // Minimal-Validierung mit hilfreicher Meldung + if strings.TrimSpace(m.BindAddr) == "" { + log.Fatal("MESH_BIND fehlt (z.B. 
:9090)") + } + if strings.TrimSpace(m.AdvertURL) == "" { + log.Fatal("MESH_ADVERT fehlt und konnte nicht abgeleitet werden (z.B. http://unified_a:9090)") + } + if strings.TrimSpace(m.ClusterSecret) == "" { + log.Printf("[mesh] WARN: MESH_CLUSTER_SECRET ist leer – für produktive Netze unbedingt setzen!") + } + + return AppConfig{ + HTTPAddr: httpAddr, + APIKey: apiKey, + AdminUser: adminUser, + AdminPass: adminPass, + Mesh: m, + } +} + +// --- Helpers + +func getenvDefault(k, def string) string { + v := os.Getenv(k) + if v == "" { + return def + } + return v +} + +func parseBoolEnv(k string, def bool) bool { + v := strings.ToLower(strings.TrimSpace(os.Getenv(k))) + if v == "" { + return def + } + return v == "1" || v == "true" || v == "yes" || v == "on" +} + +func splitCSV(s string) []string { + s = strings.TrimSpace(s) + if s == "" { + return nil + } + parts := strings.Split(s, ",") + out := make([]string, 0, len(parts)) + seen := map[string]struct{}{} + for _, p := range parts { + p = strings.TrimSpace(p) + if p == "" { + continue + } + if _, ok := seen[p]; ok { + continue + } + seen[p] = struct{}{} + out = append(out, p) + } + return out +} + +// inferAdvertURL baut eine brauchbare Default-AdvertURL: +// - MESH_ADVERT_HOST (wenn gesetzt) → z.B. Service-Name aus Compose +// - sonst HOSTNAME +// - sonst "localhost" +// +// Port wird aus MESH_BIND entnommen (z.B. ":9090" → 9090) +func inferAdvertURL(meshBind string) string { + host := strings.TrimSpace(os.Getenv("MESH_ADVERT_HOST")) + if host == "" { + host = strings.TrimSpace(os.Getenv("HOSTNAME")) + } + if host == "" { + host = "localhost" + } + port := "9090" + if i := strings.LastIndex(meshBind, ":"); i != -1 && len(meshBind) > i+1 { + port = meshBind[i+1:] + } + return fmt.Sprintf("http://%s:%s", host, port) +} + +type AppConfig struct { + HTTPAddr string + APIKey string + AdminUser string + AdminPass string + Mesh mesh.Config +} + +/*** Middleware ***/ + +func authMiddleware(apiKey string, next http.Handler) http.Handler { + // Dev-Mode: ohne API-Key kein Auth-Zwang + if strings.TrimSpace(apiKey) == "" { + return next + } + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + got := r.Header.Get("Authorization") + if !strings.HasPrefix(got, "Bearer ") || strings.TrimPrefix(got, "Bearer ") != apiKey { + http.Error(w, "unauthorized", http.StatusUnauthorized) + return + } + next.ServeHTTP(w, r) + }) +} + +func cors(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Passe nach Bedarf an (Origin-Whitelist etc.) 
+ w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type") + w.Header().Set("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS") + if r.Method == http.MethodOptions { + w.WriteHeader(http.StatusNoContent) + return + } + next.ServeHTTP(w, r) + }) +} + +func accessLog(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + start := time.Now() + next.ServeHTTP(w, r) + log.Printf("%s %s %s", r.Method, r.URL.Path, time.Since(start)) + }) +} + +/*** HTTP helpers ***/ + +func writeJSON(w http.ResponseWriter, code int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + _ = json.NewEncoder(w).Encode(v) +} + +/*** API-Routen ***/ + +func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore) { + // Health + mux.HandleFunc("/api/v1/health", func(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) + }) + + // List + Create + mux.HandleFunc("/api/v1/items", func(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case http.MethodGet: + nextStr := r.URL.Query().Get("next") + var next filesvc.ID + if nextStr != "" { + if n, err := strconv.ParseInt(nextStr, 10, 64); err == nil { + next = filesvc.ID(n) + } + } + items, nextOut, err := store.List(r.Context(), next, 100) + if err != nil { + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + writeJSON(w, http.StatusOK, map[string]any{"items": items, "next": nextOut}) + case http.MethodPost: + var in struct { + Name string `json:"name"` + } + if err := json.NewDecoder(r.Body).Decode(&in); err != nil { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid json"}) + return + } + it, err := store.Create(r.Context(), in.Name) + if err != nil { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()}) + return + } + writeJSON(w, http.StatusCreated, it) + default: + w.WriteHeader(http.StatusMethodNotAllowed) + } + }) + + // Rename + mux.HandleFunc("/api/v1/items/rename", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + var in struct { + ID filesvc.ID `json:"id"` + Name string `json:"name"` + } + if err := json.NewDecoder(r.Body).Decode(&in); err != nil { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid json"}) + return + } + it, err := store.Rename(r.Context(), in.ID, in.Name) + if err != nil { + status := http.StatusBadRequest + if errors.Is(err, filesvc.ErrNotFound) { + status = http.StatusNotFound + } + writeJSON(w, status, map[string]string{"error": err.Error()}) + return + } + writeJSON(w, http.StatusOK, it) + }) + + // Delete + mux.HandleFunc("/api/v1/items/delete", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + w.WriteHeader(http.StatusMethodNotAllowed) + return + } + var in struct { + ID filesvc.ID `json:"id"` + } + if err := json.NewDecoder(r.Body).Decode(&in); err != nil { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid json"}) + return + } + it, err := store.Delete(r.Context(), in.ID) + if err != nil { + status := http.StatusBadRequest + if errors.Is(err, filesvc.ErrNotFound) { + status = http.StatusNotFound + } + writeJSON(w, status, map[string]string{"error": err.Error()}) + return + } + writeJSON(w, http.StatusOK, it) + }) +} + +/*** Mesh <-> Store 
Mapping (in case the types are kept separate) ***/
+
+func toMeshSnapshot(s filesvc.Snapshot) mesh.Snapshot {
+	out := mesh.Snapshot{Items: make([]mesh.Item, 0, len(s.Items))}
+	for _, it := range s.Items {
+		out.Items = append(out.Items, mesh.Item{
+			ID:        int64(it.ID),
+			Name:      it.Name,
+			UpdatedAt: it.UpdatedAt,
+			Deleted:   it.Deleted,
+		})
+	}
+	return out
+}
+
+func fromMeshSnapshot(ms mesh.Snapshot) filesvc.Snapshot {
+	out := filesvc.Snapshot{Items: make([]filesvc.File, 0, len(ms.Items))}
+	for _, it := range ms.Items {
+		out.Items = append(out.Items, filesvc.File{
+			ID:        filesvc.ID(it.ID),
+			Name:      it.Name,
+			UpdatedAt: it.UpdatedAt,
+			Deleted:   it.Deleted,
+		})
+	}
+	return out
+}
+
+/*** main ***/
+
+func main() {
+	cfg := loadConfig()
+
+	// Domain store (mesh-capable)
+	st := filesvc.NewMemStore()
+
+	// Start the mesh, using the validated config from loadConfig
+	mcfg := cfg.Mesh
+	mnode, err := mesh.New(mcfg, mesh.Callbacks{
+		GetSnapshot: func(ctx context.Context) (mesh.Snapshot, error) {
+			s, err := st.Snapshot(ctx)
+			if err != nil {
+				return mesh.Snapshot{}, err
+			}
+			return toMeshSnapshot(s), nil
+		},
+		ApplyRemote: func(ctx context.Context, s mesh.Snapshot) error {
+			return st.ApplyRemote(ctx, fromMeshSnapshot(s))
+		},
+	})
+	if err != nil {
+		log.Fatalf("mesh init: %v", err)
+	}
+	go func() {
+		log.Printf("[mesh] listening on %s advertise %s seeds=%v discovery=%v",
+			mcfg.BindAddr, mcfg.AdvertURL, mcfg.Seeds, mcfg.EnableDiscovery)
+		if err := mnode.Serve(); err != nil && !errors.Is(err, http.ErrServerClosed) {
+			log.Fatalf("mesh serve: %v", err)
+		}
+	}()
+
+	// Root mux
+	root := http.NewServeMux()
+
+	// API (Bearer auth)
+	apiMux := http.NewServeMux()
+	fileRoutes(apiMux, st)
+	root.Handle("/api/", authMiddleware(cfg.APIKey, apiMux))
+
+	// Admin UI (optional BasicAuth via ADMIN_USER/ADMIN_PASS)
+	adminRoot := http.NewServeMux()
+	admin.Register(adminRoot, admin.Deps{Store: st, Mesh: mnode})
+	adminUser := cfg.AdminUser
+	adminPass := cfg.AdminPass
+	if strings.TrimSpace(adminUser) != "" {
+		wrapped := admin.BasicAuth(adminUser, adminPass, adminRoot)
+		root.Handle("/admin", wrapped)
+		root.Handle("/admin/", wrapped)
+	} else {
+		root.Handle("/admin", adminRoot)
+		root.Handle("/admin/", adminRoot)
+	}
+
+	// Landing page → /admin
+	root.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
+		http.Redirect(w, r, "/admin", http.StatusFound)
+	})
+
+	// Final handler stack
+	handler := cors(accessLog(root))
+
+	srv := &http.Server{
+		Addr:              cfg.HTTPAddr,
+		Handler:           handler,
+		ReadHeaderTimeout: 5 * time.Second,
+	}
+
+	// Graceful shutdown
+	go func() {
+		log.Printf("http listening on %s (api=/api/v1, admin=/admin)", cfg.HTTPAddr)
+		if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
+			log.Fatalf("http server: %v", err)
+		}
+	}()
+
+	// Catch OS signals
+	stop := make(chan os.Signal, 1)
+	signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
+	<-stop
+
+	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+	defer cancel()
+	_ = srv.Shutdown(ctx)
+	_ = mnode.Close(ctx)
+	log.Println("shutdown complete")
+}
diff --git a/internal/admin/admin.go b/internal/admin/admin.go
new file mode 100644
index 0000000..470bf2e
--- /dev/null
+++ b/internal/admin/admin.go
@@ -0,0 +1,146 @@
+package admin
+
+import (
+	"context"
+	_ "embed"
+	"html/template"
+	"net/http"
+	"strconv"
+	"strings"
+	"time"
+
+	"git.send.nrw/sendnrw/decent-webui/internal/filesvc"
+	"git.send.nrw/sendnrw/decent-webui/internal/mesh"
+)
+
+/*** Embedded templates ***/
+
+//go:embed 
tpl/layout.html +var layoutHTML string + +//go:embed tpl/partials_items.html +var itemsPartialHTML string + +//go:embed tpl/partials_peers.html +var peersPartialHTML string + +var ( + tplLayout = template.Must(template.New("layout").Parse(layoutHTML)) + tplItems = template.Must(template.New("items").Funcs(template.FuncMap{ + "timeRFC3339": func(unixNano int64) string { + if unixNano == 0 { + return "" + } + return time.Unix(0, unixNano).UTC().Format(time.RFC3339) + }, + }).Parse(itemsPartialHTML)) + tplPeers = template.Must(template.New("peers").Parse(peersPartialHTML)) +) + +type Deps struct { + Store filesvc.MeshStore + Mesh *mesh.Node +} + +// Register hängt alle /admin Routen ein. +// Auth liegt optional VOR Register (BasicAuth-Middleware), siehe main.go. +func Register(mux *http.ServeMux, d Deps) { + // Dashboard + mux.HandleFunc("/admin", func(w http.ResponseWriter, r *http.Request) { + renderLayout(w, r, "Files", "/admin/items") + }) + + // Partials + mux.HandleFunc("/admin/items", func(w http.ResponseWriter, r *http.Request) { + // Liste rendern (Pagination optional via ?next=) + nextQ := strings.TrimSpace(r.URL.Query().Get("next")) + var nextID filesvc.ID + if nextQ != "" { + if n, err := strconv.ParseInt(nextQ, 10, 64); err == nil { + nextID = filesvc.ID(n) + } + } + items, nextOut, _ := d.Store.List(r.Context(), nextID, 100) + _ = tplItems.Execute(w, map[string]any{ + "Items": items, + "Next": nextOut, + }) + }) + + mux.HandleFunc("/admin/peers", func(w http.ResponseWriter, r *http.Request) { + peers := d.Mesh.PeerList() + _ = tplPeers.Execute(w, map[string]any{ + "Peers": peers, + "Now": time.Now(), + }) + }) + + // Actions (HTMX POSTs) + mux.HandleFunc("/admin/items/create", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + name := strings.TrimSpace(r.FormValue("name")) + if name != "" { + _, _ = d.Store.Create(r.Context(), name) + _ = d.Mesh.SyncNow(r.Context()) // prompt push (best effort) + } + // Nach Aktion Items partial zurückgeben (HTMX swap) + http.Redirect(w, r, "/admin/items", http.StatusSeeOther) + }) + + mux.HandleFunc("/admin/items/rename", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + idStr := r.FormValue("id") + newName := strings.TrimSpace(r.FormValue("name")) + if id, err := strconv.ParseInt(idStr, 10, 64); err == nil && newName != "" { + _, _ = d.Store.Rename(r.Context(), filesvc.ID(id), newName) + _ = d.Mesh.SyncNow(r.Context()) + } + http.Redirect(w, r, "/admin/items", http.StatusSeeOther) + }) + + mux.HandleFunc("/admin/items/delete", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.Error(w, "method not allowed", http.StatusMethodNotAllowed) + return + } + if id, err := strconv.ParseInt(r.FormValue("id"), 10, 64); err == nil { + _, _ = d.Store.Delete(r.Context(), filesvc.ID(id)) + _ = d.Mesh.SyncNow(r.Context()) + } + http.Redirect(w, r, "/admin/items", http.StatusSeeOther) + }) + + mux.HandleFunc("/admin/mesh/syncnow", func(w http.ResponseWriter, r *http.Request) { + _ = d.Mesh.SyncNow(context.Background()) + http.Redirect(w, r, "/admin/peers", http.StatusSeeOther) + }) +} + +func renderLayout(w http.ResponseWriter, _ *http.Request, active string, initial string) { + _ = tplLayout.Execute(w, map[string]any{ + "Active": active, + "Init": initial, // initialer HTMX Swap-Endpunkt + }) +} + 
+
+/*** Optional: simple BasicAuth (see main.go) ***/
+func BasicAuth(user, pass string, next http.Handler) http.Handler {
+	if strings.TrimSpace(user) == "" {
+		return next // disabled
+	}
+	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		u, p, ok := r.BasicAuth()
+		if !ok || u != user || p != pass {
+			w.Header().Set("WWW-Authenticate", `Basic realm="admin"`)
+			http.Error(w, "unauthorized", http.StatusUnauthorized)
+			return
+		}
+		next.ServeHTTP(w, r)
+	})
+}
diff --git a/internal/admin/tpl/layout.html b/internal/admin/tpl/layout.html
new file mode 100644
index 0000000..9a0861e
--- /dev/null
+++ b/internal/admin/tpl/layout.html
@@ -0,0 +1,39 @@
+
+
+
+
+
+ Admin
+
+
+
+
+

Unified Admin

+ +
+
Loading…
+
+
© Admin UI
+
+ + diff --git a/internal/admin/tpl/partials_items.html b/internal/admin/tpl/partials_items.html new file mode 100644 index 0000000..f97b1fe --- /dev/null +++ b/internal/admin/tpl/partials_items.html @@ -0,0 +1,57 @@ +
+
+
+ +
+ + +
+
+
+
+ +
+ + + + + + + + {{ range .Items }} + + + + + + + {{ else }} + + {{ end }} + +
IDNameUpdatedActions
{{ .ID }}{{ .Name }}{{ printf "%.19s" (timeRFC3339 .UpdatedAt) }} +
+ + + +
+
+ + +
+
No files available.
+ + {{ if .Next }} +
+ + next={{ .Next }} +
+ {{ end }} +
diff --git a/internal/admin/tpl/partials_peers.html b/internal/admin/tpl/partials_peers.html new file mode 100644 index 0000000..b06e92c --- /dev/null +++ b/internal/admin/tpl/partials_peers.html @@ -0,0 +1,23 @@ +
+

Mesh Peers

+
+ +
+
+
+ + + + {{ range .Peers }} + + + + + + {{ else }} + + {{ end }} + +
URLSelfLast Seen
{{ .URL }}{{ if .Self }}✅{{ else }}—{{ end }}{{ .LastSeen }}
No peers known.
+
As of: {{ .Now }}
+
diff --git a/internal/filesvc/memstore.go b/internal/filesvc/memstore.go new file mode 100644 index 0000000..5aad781 --- /dev/null +++ b/internal/filesvc/memstore.go @@ -0,0 +1,195 @@ +package filesvc + +import ( + "context" + "slices" + "strings" + "sync" + "time" +) + +type MemStore struct { + mu sync.Mutex + items map[ID]File + next ID + + // optionales Eventing + subs []chan ChangeEvent +} + +func NewMemStore() *MemStore { + return &MemStore{ + items: make(map[ID]File), + next: 1, + } +} + +/*** Store ***/ + +func (m *MemStore) Get(_ context.Context, id ID) (File, error) { + m.mu.Lock() + defer m.mu.Unlock() + it, ok := m.items[id] + if !ok || it.Deleted { + return File{}, ErrNotFound + } + return it, nil +} + +func (m *MemStore) List(_ context.Context, next ID, limit int) ([]File, ID, error) { + m.mu.Lock() + defer m.mu.Unlock() + if limit <= 0 || limit > 1000 { + limit = 100 + } + ids := make([]ID, 0, len(m.items)) + for id := range m.items { + ids = append(ids, id) + } + slices.Sort(ids) + + start := 0 + if next > 0 { + for i, id := range ids { + if id >= next { + start = i + break + } + } + } + end := start + limit + if end > len(ids) { + end = len(ids) + } + + out := make([]File, 0, end-start) + for _, id := range ids[start:end] { + it := m.items[id] + if !it.Deleted { + out = append(out, it) + } + } + var nextOut ID + if end < len(ids) { + nextOut = ids[end] + } + return out, nextOut, nil +} + +func (m *MemStore) Create(_ context.Context, name string) (File, error) { + name = strings.TrimSpace(name) + if name == "" { + return File{}, ErrBadInput + } + + m.mu.Lock() + defer m.mu.Unlock() + now := time.Now().UnixNano() + it := File{ID: m.next, Name: name, UpdatedAt: now} + m.items[it.ID] = it + m.next++ + m.emit(it) + return it, nil +} + +func (m *MemStore) Rename(_ context.Context, id ID, newName string) (File, error) { + newName = strings.TrimSpace(newName) + if newName == "" { + return File{}, ErrBadInput + } + + m.mu.Lock() + defer m.mu.Unlock() + it, ok := m.items[id] + if !ok || it.Deleted { + return File{}, ErrNotFound + } + it.Name = newName + it.UpdatedAt = time.Now().UnixNano() + m.items[id] = it + m.emit(it) + return it, nil +} + +func (m *MemStore) Delete(_ context.Context, id ID) (File, error) { + m.mu.Lock() + defer m.mu.Unlock() + it, ok := m.items[id] + if !ok { + return File{}, ErrNotFound + } + if it.Deleted { + return it, nil + } + it.Deleted = true + it.UpdatedAt = time.Now().UnixNano() + m.items[id] = it + m.emit(it) + return it, nil +} + +/*** Replicable ***/ + +func (m *MemStore) Snapshot(_ context.Context) (Snapshot, error) { + m.mu.Lock() + defer m.mu.Unlock() + s := Snapshot{Items: make([]File, 0, len(m.items))} + for _, it := range m.items { + s.Items = append(s.Items, it) // inkl. Tombstones + } + return s, nil +} + +func (m *MemStore) ApplyRemote(_ context.Context, s Snapshot) error { + m.mu.Lock() + defer m.mu.Unlock() + for _, ri := range s.Items { + li, ok := m.items[ri.ID] + if !ok || ri.UpdatedAt > li.UpdatedAt { + m.items[ri.ID] = ri + if ri.ID >= m.next { + m.next = ri.ID + 1 + } + m.emitLocked(ri) + } + } + return nil +} + +/*** Watchable (optional) ***/ + +func (m *MemStore) Watch(stop <-chan struct{}) <-chan ChangeEvent { + ch := make(chan ChangeEvent, 32) + + m.mu.Lock() + m.subs = append(m.subs, ch) + m.mu.Unlock() + + go func() { + <-stop + m.mu.Lock() + // entferne ch aus subs + for i, s := range m.subs { + if s == ch { + m.subs = append(m.subs[:i], m.subs[i+1:]...) 
+ break + } + } + m.mu.Unlock() + close(ch) + }() + return ch +} + +func (m *MemStore) emit(it File) { + m.emitLocked(it) // mu wird im Aufrufer gehalten +} +func (m *MemStore) emitLocked(it File) { + ev := ChangeEvent{At: time.Now(), Item: it} + for _, s := range m.subs { + select { + case s <- ev: + default: /* drop wenn voll */ + } + } +} diff --git a/internal/filesvc/store.go b/internal/filesvc/store.go new file mode 100644 index 0000000..c55afa1 --- /dev/null +++ b/internal/filesvc/store.go @@ -0,0 +1,78 @@ +package filesvc + +import ( + "context" + "errors" + "time" +) + +/*** Domain ***/ + +type ID = int64 + +type File struct { + ID ID `json:"id"` + Name string `json:"name"` + // weitere Metadaten optional: Size, Hash, Owner, Tags, ... + UpdatedAt int64 `json:"updatedAt"` // UnixNano für LWW + Deleted bool `json:"deleted"` // Tombstone für Mesh-Delete +} + +/*** Fehler ***/ + +var ( + ErrNotFound = errors.New("file not found") + ErrBadInput = errors.New("bad input") + ErrConflict = errors.New("conflict") + ErrForbidden = errors.New("forbidden") + ErrTransient = errors.New("transient") +) + +/*** Basis-API (lokal nutzbar) ***/ + +type Store interface { + // Lesen & Auflisten + Get(ctx context.Context, id ID) (File, error) + List(ctx context.Context, next ID, limit int) (items []File, nextOut ID, err error) + + // Mutationen mit LWW-Semantik (UpdatedAt wird intern gesetzt, außer bei ApplyRemote) + Create(ctx context.Context, name string) (File, error) + Rename(ctx context.Context, id ID, newName string) (File, error) + Delete(ctx context.Context, id ID) (File, error) +} + +/*** Mesh-Replikation ***/ + +type Snapshot struct { + Items []File `json:"items"` +} + +type Replicable interface { + // Snapshot liefert den vollständigen aktuellen Stand (inkl. Tombstones). + Snapshot(ctx context.Context) (Snapshot, error) + // ApplyRemote wendet LWW an. next-ID wird dabei korrekt fortgeschrieben. + ApplyRemote(ctx context.Context, s Snapshot) error +} + +/*** Events (optional) ***/ + +// ChangeEvent kann genutzt werden, um proaktive Mesh-Pushes zu triggern. +// Bei deiner Pull-basierten Anti-Entropy ist es optional. +type ChangeEvent struct { + At time.Time + Item File +} + +// Watch gibt Änderungen aus; close(stop) beendet den Stream. +// Eine Noop-Implementierung ist erlaubt, wenn Pull-Sync genügt. +type Watchable interface { + Watch(stop <-chan struct{}) <-chan ChangeEvent +} + +/*** Kombiniertes Interface ***/ + +type MeshStore interface { + Store + Replicable + Watchable // optional – kann Noop sein +} diff --git a/internal/mesh/mesh.go b/internal/mesh/mesh.go new file mode 100644 index 0000000..c8049a6 --- /dev/null +++ b/internal/mesh/mesh.go @@ -0,0 +1,451 @@ +package mesh + +import ( + "bytes" + "context" + "crypto/hmac" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "hash/crc32" + "io" + "log" + "net" + "net/http" + "os" + "slices" + "strings" + "sync" + "time" +) + +/*** Types & Config ***/ + +type Config struct { + BindAddr string // e.g. ":9090" + AdvertURL string // e.g. 
"http://10.0.0.5:9090" + Seeds []string // other peers' mesh base URLs + ClusterSecret string // HMAC key + EnableDiscovery bool + DiscoveryAddress string // "239.8.8.8:9898" +} + +type Peer struct { + URL string `json:"url"` + LastSeen time.Time `json:"lastSeen"` + Self bool `json:"self"` + OwnerHint int `json:"ownerHint"` // optional +} + +type Item struct { + ID int64 `json:"id"` + Name string `json:"name"` + UpdatedAt int64 `json:"updatedAt"` + Deleted bool `json:"deleted"` // <— NEU: Tombstone für Deletes +} + +type Snapshot struct { + Items []Item `json:"items"` +} + +// Callbacks that your app provides +type Callbacks struct { + GetSnapshot func(ctx context.Context) (Snapshot, error) + ApplyRemote func(ctx context.Context, s Snapshot) error +} + +/*** Node ***/ + +type Node struct { + cfg Config + cbs Callbacks + self Peer + mu sync.RWMutex + peers map[string]*Peer + client *http.Client + srv *http.Server + stop chan struct{} + wg sync.WaitGroup +} + +func New(cfg Config, cbs Callbacks) (*Node, error) { + if cfg.BindAddr == "" || cfg.AdvertURL == "" { + return nil, errors.New("mesh: BindAddr and AdvertURL required") + } + if cfg.ClusterSecret == "" { + return nil, errors.New("mesh: ClusterSecret required") + } + n := &Node{ + cfg: cfg, + cbs: cbs, + self: Peer{URL: cfg.AdvertURL, LastSeen: time.Now(), Self: true}, + peers: make(map[string]*Peer), + client: &http.Client{ + Timeout: 5 * time.Second, + }, + stop: make(chan struct{}), + } + return n, nil +} + +/*** HMAC helpers ***/ + +func (n *Node) sign(b []byte) string { + m := hmac.New(sha256.New, []byte(n.cfg.ClusterSecret)) + m.Write(b) + return hex.EncodeToString(m.Sum(nil)) +} + +func (n *Node) verify(b []byte, sig string) bool { + want := n.sign(b) + return hmac.Equal([]byte(want), []byte(sig)) +} + +/*** HTTP handlers (control plane) ***/ + +func (n *Node) helloHandler(w http.ResponseWriter, r *http.Request) { + body, _ := io.ReadAll(r.Body) + if !n.verify(body, r.Header.Get("X-Mesh-Sig")) { + http.Error(w, "bad signature", http.StatusUnauthorized) + return + } + var p Peer + if err := json.Unmarshal(body, &p); err != nil { + http.Error(w, "bad json", http.StatusBadRequest) + return + } + p.LastSeen = time.Now() + n.mu.Lock() + if existing, ok := n.peers[p.URL]; ok { + existing.LastSeen = p.LastSeen + } else if p.URL != n.self.URL { + cp := p + n.peers[p.URL] = &cp + } + n.mu.Unlock() + w.WriteHeader(http.StatusNoContent) +} + +func (n *Node) peersHandler(w http.ResponseWriter, r *http.Request) { + n.mu.RLock() + defer n.mu.RUnlock() + var list []Peer + list = append(list, n.self) + for _, p := range n.peers { + list = append(list, *p) + } + writeJSON(w, http.StatusOK, list) +} + +func (n *Node) syncHandler(w http.ResponseWriter, r *http.Request) { + // verify signature + body, _ := io.ReadAll(r.Body) + if !n.verify(body, r.Header.Get("X-Mesh-Sig")) { + http.Error(w, "bad signature", http.StatusUnauthorized) + return + } + var s Snapshot + if err := json.Unmarshal(body, &s); err != nil { + http.Error(w, "bad json", http.StatusBadRequest) + return + } + // apply + if err := n.cbs.ApplyRemote(r.Context(), s); err != nil { + http.Error(w, "apply error: "+err.Error(), http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusNoContent) +} + +func writeJSON(w http.ResponseWriter, code int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + _ = json.NewEncoder(w).Encode(v) +} + +/*** Serve ***/ + +func (n *Node) Serve() error { + mux := http.NewServeMux() + 
mux.HandleFunc("/mesh/peers", n.peersHandler) + mux.HandleFunc("/mesh/hello", n.helloHandler) + mux.HandleFunc("/mesh/sync", n.syncHandler) + n.srv = &http.Server{Addr: n.cfg.BindAddr, Handler: mux} + + // background loops + n.wg.Add(1) + go func() { + defer n.wg.Done() + n.loopSeeder() + }() + + if n.cfg.EnableDiscovery && n.cfg.DiscoveryAddress != "" { + n.wg.Add(2) + go func() { + defer n.wg.Done() + n.loopBeaconSend() + }() + go func() { + defer n.wg.Done() + n.loopBeaconRecv() + }() + } + + n.wg.Add(1) + go func() { + defer n.wg.Done() + n.loopAntiEntropy() + }() + + // http server + errc := make(chan error, 1) + go func() { + errc <- n.srv.ListenAndServe() + }() + select { + case err := <-errc: + return err + case <-n.stop: + return http.ErrServerClosed + } +} + +func (n *Node) Close(ctx context.Context) error { + close(n.stop) + if n.srv != nil { + _ = n.srv.Shutdown(ctx) + } + n.wg.Wait() + return nil +} + +/*** Loops ***/ + +func (n *Node) loopSeeder() { + // attempt to hello known seeds every 5s at start, then every 30s + backoff := 5 * time.Second + for { + select { + case <-n.stop: + return + case <-time.After(backoff): + } + if len(n.cfg.Seeds) == 0 { + backoff = 30 * time.Second + continue + } + for _, s := range n.cfg.Seeds { + if s == "" || s == n.self.URL { + continue + } + _ = n.sendHello(s) + } + backoff = 30 * time.Second + } +} + +func (n *Node) loopAntiEntropy() { + t := time.NewTicker(10 * time.Second) + defer t.Stop() + for { + select { + case <-n.stop: + return + case <-t.C: + n.mu.RLock() + targets := make([]string, 0, len(n.peers)) + for url := range n.peers { + targets = append(targets, url) + } + n.mu.RUnlock() + if len(targets) == 0 { + continue + } + ctx, cancel := context.WithTimeout(context.Background(), 4*time.Second) + snap, err := n.cbs.GetSnapshot(ctx) + cancel() + if err != nil { + continue + } + for _, url := range targets { + _ = n.sendSync(url, snap) + } + } + } +} + +func (n *Node) loopBeaconSend() { + addr, err := net.ResolveUDPAddr("udp", n.cfg.DiscoveryAddress) + if err != nil { + log.Printf("mesh beacon send resolve: %v", err) + return + } + conn, err := net.DialUDP("udp", nil, addr) + if err != nil { + log.Printf("mesh beacon send dial: %v", err) + return + } + defer conn.Close() + type beacon struct { + URL string `json:"url"` + } + t := time.NewTicker(5 * time.Second) + defer t.Stop() + for { + select { + case <-n.stop: + return + case <-t.C: + b, _ := json.Marshal(beacon{URL: n.self.URL}) + _, _ = conn.Write(b) + } + } +} + +func (n *Node) loopBeaconRecv() { + addr, err := net.ResolveUDPAddr("udp", n.cfg.DiscoveryAddress) + if err != nil { + log.Printf("mesh beacon recv resolve: %v", err) + return + } + // enable multicast receive + l, err := net.ListenMulticastUDP("udp", nil, addr) + if err != nil { + log.Printf("mesh beacon recv listen: %v", err) + return + } + defer l.Close() + _ = l.SetReadBuffer(1 << 16) + + buf := make([]byte, 2048) + for { + select { + case <-n.stop: + return + default: + } + _ = l.SetDeadline(time.Now().Add(6 * time.Second)) + nr, _, err := l.ReadFromUDP(buf) + if err != nil { + if ne, ok := err.(net.Error); ok && ne.Timeout() { + continue + } + continue + } + var msg struct{ URL string } + if err := json.Unmarshal(buf[:nr], &msg); err == nil { + if msg.URL != "" && msg.URL != n.self.URL { + _ = n.sendHello(msg.URL) + } + } + } +} + +/*** Outgoing ***/ + +func (n *Node) sendHello(url string) error { + p := n.self + b, _ := json.Marshal(p) + req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, 
"/")+"/mesh/hello", bytes.NewReader(b)) + req.Header.Set("X-Mesh-Sig", n.sign(b)) + resp, err := n.client.Do(req) + if err == nil { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + return err +} + +func (n *Node) sendSync(url string, s Snapshot) error { + b, _ := json.Marshal(s) + req, _ := http.NewRequest(http.MethodPost, strings.TrimRight(url, "/")+"/mesh/sync", bytes.NewReader(b)) + req.Header.Set("X-Mesh-Sig", n.sign(b)) + resp, err := n.client.Do(req) + if err == nil { + io.Copy(io.Discard, resp.Body) + resp.Body.Close() + } + return err +} + +// PeerList liefert eine Kopie der bekannten Peers inkl. Self. +func (n *Node) PeerList() []Peer { + n.mu.RLock() + defer n.mu.RUnlock() + out := make([]Peer, 0, len(n.peers)+1) + out = append(out, n.self) + for _, p := range n.peers { + cp := *p + out = append(out, cp) + } + return out +} + +// SyncNow verschickt sofort den aktuellen Snapshot an alle bekannten Peers. +func (n *Node) SyncNow(ctx context.Context) error { + snap, err := n.cbs.GetSnapshot(ctx) + if err != nil { + return err + } + n.mu.RLock() + targets := make([]string, 0, len(n.peers)) + for url := range n.peers { + targets = append(targets, url) + } + n.mu.RUnlock() + for _, u := range targets { + _ = n.sendSync(u, snap) + } + return nil +} + +/*** Utilities ***/ + +// OwnerHint is a simple, optional mapping to distribute responsibility. +func OwnerHint(id int64, peers []string) int { + if len(peers) == 0 { + return 0 + } + h := crc32.ChecksumIEEE([]byte(string(rune(id)))) + return int(h % uint32(len(peers))) +} + +// Helpers to load from ENV quickly +func FromEnv() Config { + return Config{ + BindAddr: getenvDefault("MESH_BIND", ":9090"), + AdvertURL: os.Getenv("MESH_ADVERT"), + Seeds: splitCSV(os.Getenv("MESH_SEEDS")), + ClusterSecret: os.Getenv("MESH_CLUSTER_SECRET"), + EnableDiscovery: os.Getenv("MESH_ENABLE_DISCOVERY") == "true", + DiscoveryAddress: getenvDefault("MESH_DISCOVERY_ADDR", "239.8.8.8:9898"), + } +} + +func splitCSV(s string) []string { + if strings.TrimSpace(s) == "" { + return nil + } + parts := strings.Split(s, ",") + for i := range parts { + parts[i] = strings.TrimSpace(parts[i]) + } + // dedup + out := make([]string, 0, len(parts)) + for _, p := range parts { + if p == "" || slices.Contains(out, p) { + continue + } + out = append(out, p) + } + return out +} + +func getenvDefault(k, def string) string { + v := os.Getenv(k) + if v == "" { + return def + } + return v +} diff --git a/internal/store/store.go b/internal/store/store.go deleted file mode 100644 index 7b11233..0000000 --- a/internal/store/store.go +++ /dev/null @@ -1,316 +0,0 @@ -package store - -import ( - "context" - "crypto/sha256" - "encoding/hex" - "encoding/json" - "errors" - "fmt" - "io" - "os" - "path/filepath" - "sort" - "strings" - "sync" - "time" -) - -type Store struct { - blobDir string - metaDir string - tmpDir string - mu sync.RWMutex -} - -type FileRecord struct { - ID string `json:"id"` - Name string `json:"name"` - Hash string `json:"hash"` - Size int64 `json:"size"` - Meta map[string]string `json:"meta,omitempty"` - CreatedAt time.Time `json:"createdAt"` - ContentType string `json:"contentType,omitempty"` -} - -func (fr FileRecord) SafeName() string { - n := strings.TrimSpace(fr.Name) - if n == "" { - return fr.ID - } - return n -} - -func Open(blobDir, metaDir, tmpDir string) (*Store, error) { - for _, p := range []string{blobDir, metaDir, tmpDir} { - if err := os.MkdirAll(p, 0o755); err != nil { - return nil, err - } - } - return &Store{blobDir: blobDir, metaDir: 
metaDir, tmpDir: tmpDir}, nil -} - -func (s *Store) Put(ctx context.Context, r io.Reader, name, metaStr string) (*FileRecord, error) { - if name == "" { - name = "file" - } - - tmp, err := os.CreateTemp(s.tmpDir, "upload-*") - if err != nil { - return nil, err - } - defer func() { tmp.Close(); os.Remove(tmp.Name()) }() - - h := sha256.New() - n, err := io.Copy(io.MultiWriter(tmp, h), r) - if err != nil { - return nil, err - } - hash := hex.EncodeToString(h.Sum(nil)) - - blobPath := filepath.Join(s.blobDir, hash) - if _, err := os.Stat(blobPath); errors.Is(err, os.ErrNotExist) { - if err := os.Rename(tmp.Name(), blobPath); err != nil { - return nil, err - } - } else { - _ = os.Remove(tmp.Name()) - } - - rec := &FileRecord{ - ID: newID(hash), - Name: name, - Hash: hash, - Size: n, - Meta: parseMeta(metaStr), - CreatedAt: time.Now().UTC(), - ContentType: "", // filled on GET via extension - } - - if err := s.writeMeta(rec); err != nil { - return nil, err - } - return rec, nil -} - -func (s *Store) Open(ctx context.Context, id string) (io.ReadSeekCloser, *FileRecord, error) { - rec, err := s.GetMeta(ctx, id) - if err != nil { - return nil, nil, err - } - f, err := os.Open(filepath.Join(s.blobDir, rec.Hash)) - if err != nil { - return nil, nil, err - } - return f, rec, nil -} - -func (s *Store) GetMeta(_ context.Context, id string) (*FileRecord, error) { - s.mu.RLock() - defer s.mu.RUnlock() - bb, err := os.ReadFile(filepath.Join(s.metaDir, id+".json")) - if err != nil { - return nil, err - } - var rec FileRecord - if err := json.Unmarshal(bb, &rec); err != nil { - return nil, err - } - return &rec, nil -} - -func (s *Store) UpdateMeta(_ context.Context, id string, meta map[string]string) (*FileRecord, error) { - s.mu.Lock() - defer s.mu.Unlock() - path := filepath.Join(s.metaDir, id+".json") - bb, err := os.ReadFile(path) - if err != nil { - return nil, err - } - var rec FileRecord - if err := json.Unmarshal(bb, &rec); err != nil { - return nil, err - } - if rec.Meta == nil { - rec.Meta = map[string]string{} - } - for k, v := range meta { - rec.Meta[k] = v - } - nb, _ := json.Marshal(&rec) - if err := os.WriteFile(path, nb, 0o600); err != nil { - return nil, err - } - return &rec, nil -} - -func (s *Store) Delete(_ context.Context, id string) error { - s.mu.Lock() - defer s.mu.Unlock() - // Only delete metadata; GC for unreferenced blobs is a separate task - return os.Remove(filepath.Join(s.metaDir, id+".json")) -} - -func (s *Store) List(_ context.Context, q string, offset, limit int) ([]*FileRecord, int, error) { - if limit <= 0 || limit > 200 { - limit = 50 - } - entries, err := os.ReadDir(s.metaDir) - if err != nil { - return nil, 0, err - } - var items []*FileRecord - for _, e := range entries { - if e.IsDir() || !strings.HasSuffix(e.Name(), ".json") { - continue - } - bb, err := os.ReadFile(filepath.Join(s.metaDir, e.Name())) - if err != nil { - continue - } - var rec FileRecord - if err := json.Unmarshal(bb, &rec); err != nil { - continue - } - if q == "" || strings.Contains(strings.ToLower(rec.Name), strings.ToLower(q)) { - items = append(items, &rec) - } - } - sort.Slice(items, func(i, j int) bool { return items[i].CreatedAt.After(items[j].CreatedAt) }) - end := offset + limit - if offset > len(items) { - return []*FileRecord{}, 0, nil - } - if end > len(items) { - end = len(items) - } - next := 0 - if end < len(items) { - next = end - } - return items[offset:end], next, nil -} - -// --- Chunked uploads --- - -type UploadSession struct { - ID string `json:"id"` - Name string 
`json:"name"` - Meta string `json:"meta"` - CreatedAt time.Time `json:"createdAt"` -} - -func (s *Store) UploadInit(_ context.Context, name, meta string) (*UploadSession, error) { - id := newID(fmt.Sprintf("sess-%d", time.Now().UnixNano())) - us := &UploadSession{ID: id, Name: name, Meta: meta, CreatedAt: time.Now().UTC()} - // session file marker - if err := os.WriteFile(filepath.Join(s.tmpDir, id+".session"), []byte(name+""+meta), 0o600); err != nil { - return nil, err - } - return us, nil -} - -func (s *Store) partPath(uid string, n int) string { - return filepath.Join(s.tmpDir, fmt.Sprintf("%s.part.%06d", uid, n)) -} - -func (s *Store) UploadPart(_ context.Context, uid string, n int, r io.Reader) error { - if _, err := os.Stat(filepath.Join(s.tmpDir, uid+".session")); err != nil { - return err - } - f, err := os.Create(s.partPath(uid, n)) - if err != nil { - return err - } - defer f.Close() - _, err = io.Copy(f, r) - return err -} - -func (s *Store) UploadComplete(ctx context.Context, uid string) (*FileRecord, error) { - if _, err := os.Stat(filepath.Join(s.tmpDir, uid+".session")); err != nil { - return nil, err - } - matches, _ := filepath.Glob(filepath.Join(s.tmpDir, uid+".part.*")) - if len(matches) == 0 { - return nil, errors.New("no parts uploaded") - } - sort.Strings(matches) - pr, pw := io.Pipe() - go func() { - for _, p := range matches { - f, err := os.Open(p) - if err != nil { - _ = pw.CloseWithError(err) - return - } - if _, err := io.Copy(pw, f); err != nil { - _ = pw.CloseWithError(err) - _ = f.Close() - return - } - _ = f.Close() - } - _ = pw.Close() - }() - // Read first line of session file for name/meta (simple format) - bb, _ := os.ReadFile(filepath.Join(s.tmpDir, uid+".session")) - lines := strings.SplitN(string(bb), "", 2) - name := "file" - meta := "" - if len(lines) >= 1 && strings.TrimSpace(lines[0]) != "" { - name = strings.TrimSpace(lines[0]) - } - if len(lines) == 2 { - meta = strings.TrimSpace(lines[1]) - } - - rec, err := s.Put(ctx, pr, name, meta) - if err != nil { - return nil, err - } - for _, p := range matches { - _ = os.Remove(p) - } - _ = os.Remove(filepath.Join(s.tmpDir, uid+".session")) - return rec, nil -} - -func (s *Store) UploadAbort(_ context.Context, uid string) error { - if _, err := os.Stat(filepath.Join(s.tmpDir, uid+".session")); err != nil { - return err - } - matches, _ := filepath.Glob(filepath.Join(s.tmpDir, uid+".part.*")) - for _, p := range matches { - _ = os.Remove(p) - } - return os.Remove(filepath.Join(s.tmpDir, uid+".session")) -} - -// --- helpers --- - -func (s *Store) writeMeta(rec *FileRecord) error { - s.mu.Lock() - defer s.mu.Unlock() - bb, _ := json.Marshal(rec) - return os.WriteFile(filepath.Join(s.metaDir, rec.ID+".json"), bb, 0o600) -} - -func newID(seed string) string { - h := sha256.Sum256([]byte(fmt.Sprintf("%s|%d", seed, time.Now().UnixNano()))) - return hex.EncodeToString(h[:16]) -} - -func parseMeta(s string) map[string]string { - if s == "" { - return nil - } - m := map[string]string{} - for _, kv := range strings.Split(s, ",") { - kvp := strings.SplitN(kv, "=", 2) - if len(kvp) == 2 { - m[strings.TrimSpace(kvp[0])] = strings.TrimSpace(kvp[1]) - } - } - return m -} -- 2.49.1