Files
decent-webui/cmd/unified/main.go
jbergner b7729e8b39
All checks were successful
release-tag / release-image (push) Successful in 1m52s
Test-1
2025-09-29 23:08:34 +02:00

708 lines
19 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/http"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"time"
// ← Passe diese Import-Pfade an dein go.mod an
"git.send.nrw/sendnrw/decent-webui/internal/admin"
"git.send.nrw/sendnrw/decent-webui/internal/blobfs"
"git.send.nrw/sendnrw/decent-webui/internal/filesvc"
"git.send.nrw/sendnrw/decent-webui/internal/mesh"
)
func parseDuration(s string, def time.Duration) time.Duration {
d, err := time.ParseDuration(strings.TrimSpace(s))
if err != nil || d <= 0 {
return def
}
return d
}
/*** Config ***/
func loadConfig() AppConfig {
// HTTP
httpAddr := getenvDefault("ADDR", ":8080")
// API
apiKey := os.Getenv("FILE_SERVICE_API_KEY")
// Admin UI (BasicAuth optional)
adminUser := os.Getenv("ADMIN_USER")
adminPass := os.Getenv("ADMIN_PASS")
// Mesh (mit sinnvollen Defaults)
m := mesh.Config{
BindAddr: getenvDefault("MESH_BIND", ":9090"),
AdvertURL: os.Getenv("MESH_ADVERT"), // kann leer sein → wir leiten ab
Seeds: splitCSV(os.Getenv("MESH_SEEDS")),
ClusterSecret: os.Getenv("MESH_CLUSTER_SECRET"),
EnableDiscovery: parseBoolEnv("MESH_ENABLE_DISCOVERY", false),
DiscoveryAddress: getenvDefault("MESH_DISCOVERY_ADDR", "239.8.8.8:9898"),
}
m.PeerTTL = parseDuration(os.Getenv("MESH_PEER_TTL"), 2*time.Minute)
m.PruneInterval = parseDuration(os.Getenv("MESH_PRUNE_INTERVAL"), 30*time.Second)
// Wenn keine AdvertURL gesetzt ist, versuche eine sinnvolle Herleitung:
if strings.TrimSpace(m.AdvertURL) == "" {
m.AdvertURL = inferAdvertURL(m.BindAddr)
log.Printf("[mesh] MESH_ADVERT nicht gesetzt abgeleitet: %s", m.AdvertURL)
}
// Minimal-Validierung mit hilfreicher Meldung
if strings.TrimSpace(m.BindAddr) == "" {
log.Fatal("MESH_BIND fehlt (z.B. :9090)")
}
if strings.TrimSpace(m.AdvertURL) == "" {
log.Fatal("MESH_ADVERT fehlt und konnte nicht abgeleitet werden (z.B. http://unified_a:9090)")
}
if strings.TrimSpace(m.ClusterSecret) == "" {
log.Printf("[mesh] WARN: MESH_CLUSTER_SECRET ist leer für produktive Netze unbedingt setzen!")
}
return AppConfig{
HTTPAddr: httpAddr,
APIKey: apiKey,
AdminUser: adminUser,
AdminPass: adminPass,
Mesh: m,
PublicDownloads: parseBoolEnv("PUBLIC_DOWNLOADS", false),
PublicPath: getenvDefault("PUBLIC_DOWNLOAD_PATH", "/dl"),
}
}
// --- Helpers
func getenvDefault(k, def string) string {
v := os.Getenv(k)
if v == "" {
return def
}
return v
}
func parseBoolEnv(k string, def bool) bool {
v := strings.ToLower(strings.TrimSpace(os.Getenv(k)))
if v == "" {
return def
}
return v == "1" || v == "true" || v == "yes" || v == "on"
}
func splitCSV(s string) []string {
s = strings.TrimSpace(s)
if s == "" {
return nil
}
parts := strings.Split(s, ",")
out := make([]string, 0, len(parts))
seen := map[string]struct{}{}
for _, p := range parts {
p = strings.TrimSpace(p)
if p == "" {
continue
}
if _, ok := seen[p]; ok {
continue
}
seen[p] = struct{}{}
out = append(out, p)
}
return out
}
// inferAdvertURL baut eine brauchbare Default-AdvertURL:
// - MESH_ADVERT_HOST (wenn gesetzt) → z.B. Service-Name aus Compose
// - sonst HOSTNAME
// - sonst "localhost"
//
// Port wird aus MESH_BIND entnommen (z.B. ":9090" → 9090)
func inferAdvertURL(meshBind string) string {
host := strings.TrimSpace(os.Getenv("MESH_ADVERT_HOST"))
if host == "" {
host = strings.TrimSpace(os.Getenv("HOSTNAME"))
}
if host == "" {
host = "localhost"
}
port := "9090"
if i := strings.LastIndex(meshBind, ":"); i != -1 && len(meshBind) > i+1 {
port = meshBind[i+1:]
}
return fmt.Sprintf("http://%s:%s", host, port)
}
type AppConfig struct {
HTTPAddr string
APIKey string
AdminUser string
AdminPass string
Mesh mesh.Config
PublicDownloads bool // ENV: PUBLIC_DOWNLOADS (default false)
PublicPath string // ENV: PUBLIC_DOWNLOAD_PATH (default "/dl")
}
/*** Middleware ***/
func authMiddleware(apiKey string, next http.Handler) http.Handler {
// Dev-Mode: ohne API-Key kein Auth-Zwang
if strings.TrimSpace(apiKey) == "" {
return next
}
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
got := r.Header.Get("Authorization")
if !strings.HasPrefix(got, "Bearer ") || strings.TrimPrefix(got, "Bearer ") != apiKey {
http.Error(w, "unauthorized", http.StatusUnauthorized)
return
}
next.ServeHTTP(w, r)
})
}
func cors(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// Passe nach Bedarf an (Origin-Whitelist etc.)
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type")
w.Header().Set("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS")
if r.Method == http.MethodOptions {
w.WriteHeader(http.StatusNoContent)
return
}
next.ServeHTTP(w, r)
})
}
func accessLog(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
start := time.Now()
next.ServeHTTP(w, r)
log.Printf("%s %s %s", r.Method, r.URL.Path, time.Since(start))
})
}
/*** HTTP helpers ***/
func writeJSON(w http.ResponseWriter, code int, v any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(code)
_ = json.NewEncoder(w).Encode(v)
}
/*** API-Routen ***/
func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store) {
// Health
mux.HandleFunc("/api/v1/health", func(w http.ResponseWriter, r *http.Request) {
writeJSON(w, http.StatusOK, map[string]string{"status": "ok"})
})
// List + Create
mux.HandleFunc("/api/v1/items", func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
nextStr := strings.TrimSpace(r.URL.Query().Get("next"))
next := filesvc.ID(nextStr)
items, nextOut, err := store.List(r.Context(), next, 100)
if err != nil {
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
writeJSON(w, http.StatusOK, map[string]any{"items": items, "next": nextOut})
case http.MethodPost:
var in struct {
Name string `json:"name"`
}
if err := json.NewDecoder(r.Body).Decode(&in); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid json"})
return
}
it, err := store.Create(r.Context(), in.Name)
if err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return
}
writeJSON(w, http.StatusCreated, it)
default:
w.WriteHeader(http.StatusMethodNotAllowed)
}
})
// Rename
mux.HandleFunc("/api/v1/items/rename", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
var in struct {
ID filesvc.ID `json:"id"`
Name string `json:"name"`
}
if err := json.NewDecoder(r.Body).Decode(&in); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid json"})
return
}
it, err := store.Rename(r.Context(), in.ID, in.Name)
if err != nil {
if errors.Is(err, filesvc.ErrForbidden) {
writeJSON(w, http.StatusForbidden, map[string]string{"error": "only owner may modify"})
return
}
status := http.StatusBadRequest
if errors.Is(err, filesvc.ErrNotFound) {
status = http.StatusNotFound
}
writeJSON(w, status, map[string]string{"error": err.Error()})
return
}
writeJSON(w, http.StatusOK, it)
})
// Delete
mux.HandleFunc("/api/v1/items/delete", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
var in struct {
ID filesvc.ID `json:"id"`
}
if err := json.NewDecoder(r.Body).Decode(&in); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid json"})
return
}
it, err := store.Delete(r.Context(), in.ID)
_ = blobs.Delete(r.Context(), string(in.ID))
if err != nil {
if errors.Is(err, filesvc.ErrForbidden) {
writeJSON(w, http.StatusForbidden, map[string]string{"error": "only owner may modify"})
return
}
status := http.StatusBadRequest
if errors.Is(err, filesvc.ErrNotFound) {
status = http.StatusNotFound
}
writeJSON(w, status, map[string]string{"error": err.Error()})
return
}
writeJSON(w, http.StatusOK, it)
})
}
// apiFiles wires upload/download endpoints
func apiFiles(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, meshNode *mesh.Node) {
// Multipart-Upload
mux.HandleFunc("/api/v1/files/upload", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
w.WriteHeader(http.StatusMethodNotAllowed)
return
}
if err := r.ParseMultipartForm(128 << 20); err != nil { // 128MB
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "bad form"})
return
}
fh, hdr, err := r.FormFile("file")
if err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "missing file"})
return
}
defer fh.Close()
name := strings.TrimSpace(r.FormValue("name"))
if name == "" {
name = hdr.Filename
}
it, err := store.Create(r.Context(), name)
if err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
return
}
meta, err := blobs.Save(r.Context(), string(it.ID), name, fh)
if err != nil {
_, _ = store.Delete(r.Context(), it.ID)
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
_ = meshNode.SyncNow(r.Context())
writeJSON(w, http.StatusCreated, map[string]any{
"file": it,
"blob": meta,
})
})
// Download
mux.HandleFunc("/api/v1/files/", func(w http.ResponseWriter, r *http.Request) {
parts := strings.Split(strings.TrimPrefix(r.URL.Path, "/api/v1/files/"), "/")
if len(parts) != 2 || parts[1] != "download" {
http.NotFound(w, r)
return
}
id := parts[0]
if strings.TrimSpace(id) == "" {
http.NotFound(w, r)
return
}
// 1) Metadaten prüfen
it, err := store.Get(r.Context(), filesvc.ID(id))
if err != nil || it.Deleted {
http.NotFound(w, r)
return
}
// 2) lokal
if rc, meta, err := blobs.Open(r.Context(), id); err == nil {
defer rc.Close()
serveBlob(w, r, rc, meta, it.Name)
return
}
// 3) remote holen & cachen
it1, _ := store.Get(r.Context(), filesvc.ID(id))
peers := meshNode.PeerList()
ttl := 2 * time.Minute
if cfg := meshNode.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
if !isOwnerActive(it1.Owner, peers, ttl) {
http.NotFound(w, r)
return
}
rrc, _, _, _, err := meshNode.FetchBlobAny(r.Context(), id)
if err != nil {
http.NotFound(w, r)
return
}
defer rrc.Close()
if _, err := blobs.Save(r.Context(), id, it1.Name, rrc); err != nil {
http.Error(w, "cache failed", http.StatusInternalServerError)
return
}
// 4) lokal streamen
lrc, meta, err := blobs.Open(r.Context(), id)
if err != nil {
http.Error(w, "open failed", http.StatusInternalServerError)
return
}
defer lrc.Close()
serveBlob(w, r, lrc, meta, it1.Name)
})
}
/*** Mesh <-> Store Mapping (falls Typen getrennt sind) ***/
func toMeshSnapshot(s filesvc.Snapshot) mesh.Snapshot {
out := mesh.Snapshot{Items: make([]mesh.Item, 0, len(s.Items))}
for _, it := range s.Items {
out.Items = append(out.Items, mesh.Item{
ID: it.ID,
Name: it.Name,
UpdatedAt: it.UpdatedAt,
Deleted: it.Deleted,
Owner: it.Owner,
})
}
return out
}
func fromMeshSnapshot(ms mesh.Snapshot) filesvc.Snapshot {
out := filesvc.Snapshot{Items: make([]filesvc.File, 0, len(ms.Items))}
for _, it := range ms.Items {
out.Items = append(out.Items, filesvc.File{
ID: it.ID,
Name: it.Name,
UpdatedAt: it.UpdatedAt,
Deleted: it.Deleted,
Owner: it.Owner,
})
}
return out
}
// isOwnerActive prüft, ob der Owner in der Peer-Liste als "aktiv" gilt.
func isOwnerActive(owner string, peers []mesh.Peer, ttl time.Duration) bool {
owner = strings.TrimSpace(owner)
if owner == "" {
return true
}
cutoff := time.Now().Add(-ttl)
for _, p := range peers {
if strings.TrimSpace(p.URL) != owner {
continue
}
// Self ist per Definition aktiv
if p.Self {
return true
}
// ohne LastSeen: als inaktiv behandeln
if p.LastSeen.IsZero() {
return false
}
return p.LastSeen.After(cutoff)
}
// Owner ist nicht mal in der Liste: inaktiv
return false
}
/*** main ***/
func main() {
cfg := loadConfig()
// Domain-Store (mesh-fähig)
nodeID := strings.TrimSpace(cfg.Mesh.AdvertURL)
st := filesvc.NewMemStore(nodeID)
// Mesh starten
//mcfg := mesh.FromEnv()
blobs := blobfs.New(getenvDefault("DATA_DIR", "./data"))
mnode, err := mesh.New(cfg.Mesh, mesh.Callbacks{
GetSnapshot: func(ctx context.Context) (mesh.Snapshot, error) {
s, err := st.Snapshot(ctx)
if err != nil {
return mesh.Snapshot{}, err
}
return toMeshSnapshot(s), nil
},
ApplyRemote: func(ctx context.Context, s mesh.Snapshot) error {
return st.ApplyRemote(ctx, fromMeshSnapshot(s))
},
BlobOpen: func(ctx context.Context, id string) (io.ReadCloser, string, string, int64, error) { //5588
it, err := st.Get(ctx, filesvc.ID(id))
if err != nil || it.Deleted {
return nil, "", "", 0, fmt.Errorf("not found")
}
rc, meta, err := blobs.Open(ctx, id)
if err != nil {
return nil, "", "", 0, err
}
return rc, it.Name, meta.ContentType, meta.Size, nil
},
})
if err != nil {
log.Fatalf("mesh init: %v", err)
}
// Hintergrund-Pruner starten
mnode.StartPeerPruner()
go func() {
log.Printf("[mesh] listening on %s advertise %s seeds=%v discovery=%v",
cfg.Mesh.BindAddr, cfg.Mesh.AdvertURL, cfg.Mesh.Seeds, cfg.Mesh.EnableDiscovery)
if err := mnode.Serve(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("mesh serve: %v", err)
}
}()
go func() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for range ticker.C {
// aktive Owner bestimmen
peers := mnode.PeerList()
ttl := 2 * time.Minute
if cfg := mnode.Config(); cfg.PeerTTL > 0 {
ttl = cfg.PeerTTL
}
cutoff := time.Now().Add(-ttl)
active := map[string]bool{}
for _, p := range peers {
if p.Self {
active[p.URL] = true
continue
}
if !p.LastSeen.IsZero() && p.LastSeen.After(cutoff) {
active[p.URL] = true
}
}
// alle Items durchgehen; Blobs von Offline-Ownern löschen
var next filesvc.ID
for {
items, nextOut, _ := st.List(context.Background(), next, 1000)
for _, it := range items {
if it.Owner == "" || active[it.Owner] {
continue
}
_ = blobs.Delete(context.Background(), it.ID)
}
if nextOut == "" {
break
}
next = nextOut
}
}
}()
// Root-Mux
root := http.NewServeMux()
// API (Bearer-Auth)
//blobs := blobfs.New(getenvDefault("DATA_DIR", "./data"))
apiMux := http.NewServeMux()
fileRoutes(apiMux, st, blobs)
apiFiles(apiMux, st, blobs, mnode)
root.Handle("/api/", authMiddleware(cfg.APIKey, apiMux))
if cfg.PublicDownloads {
registerPublicDownloads(root, st, blobs, mnode, cfg.PublicPath)
}
// Admin-UI (optional BasicAuth via ADMIN_USER/ADMIN_PASS)
adminRoot := http.NewServeMux()
admin.Register(adminRoot, admin.Deps{Store: st, Mesh: mnode, Blob: blobs})
adminUser := os.Getenv("ADMIN_USER")
adminPass := os.Getenv("ADMIN_PASS")
if strings.TrimSpace(adminUser) != "" {
wrapped := admin.BasicAuth(adminUser, adminPass, adminRoot)
root.Handle("/admin", wrapped)
root.Handle("/admin/", wrapped)
} else {
root.Handle("/admin", adminRoot)
root.Handle("/admin/", adminRoot)
}
// Startseite → /admin
root.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.Redirect(w, r, "/admin", http.StatusFound)
})
// Finaler Handler-Stack
handler := cors(accessLog(root))
srv := &http.Server{
Addr: cfg.HTTPAddr,
Handler: handler,
ReadHeaderTimeout: 5 * time.Second,
}
// Graceful shutdown
go func() {
log.Printf("http listening on %s (api=/api/v1, admin=/admin)", cfg.HTTPAddr)
log.Printf("mesh listening on %s", cfg.Mesh.BindAddr)
if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Fatalf("http server: %v", err)
}
}()
// OS-Signale abfangen
stop := make(chan os.Signal, 1)
signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM)
<-stop
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_ = srv.Shutdown(ctx)
_ = mnode.Close(ctx)
log.Println("shutdown complete")
}
// Public: GET {base}/{id}
// Beispiel: /dl/1
func registerPublicDownloads(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, meshNode *mesh.Node, base string) {
if !strings.HasPrefix(base, "/") {
base = "/" + base
}
mux.HandleFunc(base+"/", func(w http.ResponseWriter, r *http.Request) {
idStr := strings.TrimPrefix(r.URL.Path, base+"/")
if idStr == "" {
http.NotFound(w, r)
return
}
id := idStr
if strings.TrimSpace(id) == "" {
http.NotFound(w, r)
return
}
// 1) Metadaten prüfen (nicht vorhanden oder gelöscht → 404)
it, err := store.Get(r.Context(), filesvc.ID(id))
if err != nil || it.Deleted {
http.NotFound(w, r)
return
}
// 2) lokal versuchen
if rc, meta, err := blobs.Open(r.Context(), id); err == nil {
defer rc.Close()
serveBlob(w, r, rc, meta, it.Name) // Download-Name aus Store!
return
} else { // ########## HIER EVENTUELL PROBLEM!!! ###########
// Lokal nicht vorhanden → nur aus Mesh ziehen, wenn Owner aktiv ist
it, err := store.Get(r.Context(), filesvc.ID(id))
if err != nil || it.Deleted {
http.NotFound(w, r)
return
}
peers := meshNode.PeerList()
ttl := 2 * time.Minute
if !isOwnerActive(it.Owner, peers, ttl) {
http.NotFound(w, r)
return
}
}
// 3) aus Mesh holen (signiert) und cachen
rrc, _, _, _, err := meshNode.FetchBlobAny(r.Context(), id)
if err != nil {
http.NotFound(w, r)
return
}
defer rrc.Close()
if _, err := blobs.Save(r.Context(), id, it.Name, rrc); err != nil {
http.Error(w, "cache failed", http.StatusInternalServerError)
return
}
// 4) erneut lokal öffnen und streamen
lrc, meta, err := blobs.Open(r.Context(), id)
if err != nil {
http.Error(w, "open failed", http.StatusInternalServerError)
return
}
defer lrc.Close()
serveBlob(w, r, lrc, meta, it.Name)
})
}
// Hilfsfunktion: setzt sinnvolle Header und streamt die Datei
func serveBlob(w http.ResponseWriter, r *http.Request, rc io.ReadSeeker, meta blobfs.Meta, downloadName string) {
if meta.SHA256 != "" {
etag := `W/"` + meta.SHA256 + `"`
if r.Header.Get("If-None-Match") == etag {
w.WriteHeader(http.StatusNotModified)
return
}
w.Header().Set("ETag", etag)
}
if meta.ContentType == "" {
meta.ContentType = "application/octet-stream"
}
if downloadName == "" {
downloadName = meta.Name
}
w.Header().Set("Content-Type", meta.ContentType)
w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10))
w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, downloadName))
w.Header().Set("Access-Control-Expose-Headers", "Content-Disposition")
w.Header().Set("X-Robots-Tag", "noindex")
_, _ = io.Copy(w, rc)
}