package main import ( "context" "encoding/json" "errors" "fmt" "io" "log" "net/http" "os" "os/signal" "path/filepath" "strconv" "strings" "syscall" "time" // ← Passe diese Import-Pfade an dein go.mod an "git.send.nrw/sendnrw/decent-webui/internal/admin" "git.send.nrw/sendnrw/decent-webui/internal/blobfs" "git.send.nrw/sendnrw/decent-webui/internal/filesvc" "git.send.nrw/sendnrw/decent-webui/internal/mesh" ) func parseDuration(s string, def time.Duration) time.Duration { d, err := time.ParseDuration(strings.TrimSpace(s)) if err != nil || d <= 0 { return def } return d } /*** Config ***/ func loadConfig() AppConfig { // HTTP httpAddr := getenvDefault("ADDR", ":8080") // API apiKey := os.Getenv("FILE_SERVICE_API_KEY") // Admin UI (BasicAuth optional) adminUser := os.Getenv("ADMIN_USER") adminPass := os.Getenv("ADMIN_PASS") // Mesh (mit sinnvollen Defaults) m := mesh.Config{ BindAddr: getenvDefault("MESH_BIND", ":9090"), AdvertURL: os.Getenv("MESH_ADVERT"), // kann leer sein → wir leiten ab Seeds: splitCSV(os.Getenv("MESH_SEEDS")), ClusterSecret: os.Getenv("MESH_CLUSTER_SECRET"), EnableDiscovery: parseBoolEnv("MESH_ENABLE_DISCOVERY", false), DiscoveryAddress: getenvDefault("MESH_DISCOVERY_ADDR", "239.8.8.8:9898"), } m.PeerTTL = parseDuration(os.Getenv("MESH_PEER_TTL"), 2*time.Minute) m.PruneInterval = parseDuration(os.Getenv("MESH_PRUNE_INTERVAL"), 30*time.Second) m.HelloInterval = parseDuration(os.Getenv("MESH_HELLO_INTERVAL"), 20*time.Second) m.HelloFanout = parseIntEnv(os.Getenv("MESH_HELLO_FANOUT"), 8) m.BlobTimeout = parseDuration(os.Getenv("MESH_BLOB_TIMEOUT"), 0) // Wenn keine AdvertURL gesetzt ist, versuche eine sinnvolle Herleitung: if strings.TrimSpace(m.AdvertURL) == "" { m.AdvertURL = inferAdvertURL(m.BindAddr) log.Printf("[mesh] MESH_ADVERT nicht gesetzt – abgeleitet: %s", m.AdvertURL) } // Minimal-Validierung mit hilfreicher Meldung if strings.TrimSpace(m.BindAddr) == "" { log.Fatal("MESH_BIND fehlt (z.B. :9090)") } if strings.TrimSpace(m.AdvertURL) == "" { log.Fatal("MESH_ADVERT fehlt und konnte nicht abgeleitet werden (z.B. http://unified_a:9090)") } if strings.TrimSpace(m.ClusterSecret) == "" { log.Printf("[mesh] WARN: MESH_CLUSTER_SECRET ist leer – für produktive Netze unbedingt setzen!") } return AppConfig{ HTTPAddr: httpAddr, APIKey: apiKey, AdminUser: adminUser, AdminPass: adminPass, Mesh: m, PublicDownloads: parseBoolEnv("PUBLIC_DOWNLOADS", false), PublicPath: getenvDefault("PUBLIC_DOWNLOAD_PATH", "/dl"), } } // --- Helpers func getenvDefault(k, def string) string { v := os.Getenv(k) if v == "" { return def } return v } func parseBoolEnv(k string, def bool) bool { v := strings.ToLower(strings.TrimSpace(os.Getenv(k))) if v == "" { return def } return v == "1" || v == "true" || v == "yes" || v == "on" } func parseIntEnv(k string, def int) int { v := strings.TrimSpace(os.Getenv(k)) if v == "" { return def } n, err := strconv.Atoi(v) if err != nil { return def } return n } func splitCSV(s string) []string { s = strings.TrimSpace(s) if s == "" { return nil } parts := strings.Split(s, ",") out := make([]string, 0, len(parts)) seen := map[string]struct{}{} for _, p := range parts { p = strings.TrimSpace(p) if p == "" { continue } if _, ok := seen[p]; ok { continue } seen[p] = struct{}{} out = append(out, p) } return out } // inferAdvertURL baut eine brauchbare Default-AdvertURL: // - MESH_ADVERT_HOST (wenn gesetzt) → z.B. Service-Name aus Compose // - sonst HOSTNAME // - sonst "localhost" // // Port wird aus MESH_BIND entnommen (z.B. ":9090" → 9090) func inferAdvertURL(meshBind string) string { host := strings.TrimSpace(os.Getenv("MESH_ADVERT_HOST")) if host == "" { host = strings.TrimSpace(os.Getenv("HOSTNAME")) } if host == "" { host = "localhost" } port := "9090" if i := strings.LastIndex(meshBind, ":"); i != -1 && len(meshBind) > i+1 { port = meshBind[i+1:] } return fmt.Sprintf("http://%s:%s", host, port) } type AppConfig struct { HTTPAddr string APIKey string AdminUser string AdminPass string Mesh mesh.Config PublicDownloads bool // ENV: PUBLIC_DOWNLOADS (default false) PublicPath string // ENV: PUBLIC_DOWNLOAD_PATH (default "/dl") } /*** Middleware ***/ func authMiddleware(apiKey string, next http.Handler) http.Handler { // Dev-Mode: ohne API-Key kein Auth-Zwang if strings.TrimSpace(apiKey) == "" { return next } return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { got := r.Header.Get("Authorization") if !strings.HasPrefix(got, "Bearer ") || strings.TrimPrefix(got, "Bearer ") != apiKey { http.Error(w, "unauthorized", http.StatusUnauthorized) return } next.ServeHTTP(w, r) }) } func cors(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // Passe nach Bedarf an (Origin-Whitelist etc.) w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type") w.Header().Set("Access-Control-Allow-Methods", "GET,POST,PUT,DELETE,OPTIONS") if r.Method == http.MethodOptions { w.WriteHeader(http.StatusNoContent) return } next.ServeHTTP(w, r) }) } func accessLog(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { start := time.Now() next.ServeHTTP(w, r) log.Printf("%s %s %s", r.Method, r.URL.Path, time.Since(start)) }) } /*** HTTP helpers ***/ func writeJSON(w http.ResponseWriter, code int, v any) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(code) _ = json.NewEncoder(w).Encode(v) } /*** API-Routen ***/ func fileRoutes(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store) { // Health mux.HandleFunc("/api/v1/health", func(w http.ResponseWriter, r *http.Request) { writeJSON(w, http.StatusOK, map[string]string{"status": "ok"}) }) // List + Create mux.HandleFunc("/api/v1/items", func(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: nextStr := strings.TrimSpace(r.URL.Query().Get("next")) next := filesvc.ID(nextStr) items, nextOut, err := store.List(r.Context(), next, 100) if err != nil { writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) return } writeJSON(w, http.StatusOK, map[string]any{"items": items, "next": nextOut}) case http.MethodPost: var in struct { Name string `json:"name"` } if err := json.NewDecoder(r.Body).Decode(&in); err != nil { writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid json"}) return } it, err := store.Create(r.Context(), in.Name) if err != nil { writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()}) return } writeJSON(w, http.StatusCreated, it) default: w.WriteHeader(http.StatusMethodNotAllowed) } }) // Rename mux.HandleFunc("/api/v1/items/rename", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } var in struct { ID filesvc.ID `json:"id"` Name string `json:"name"` } if err := json.NewDecoder(r.Body).Decode(&in); err != nil { writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid json"}) return } it, err := store.Rename(r.Context(), in.ID, in.Name) if err != nil { if errors.Is(err, filesvc.ErrForbidden) { writeJSON(w, http.StatusForbidden, map[string]string{"error": "only owner may modify"}) return } status := http.StatusBadRequest if errors.Is(err, filesvc.ErrNotFound) { status = http.StatusNotFound } writeJSON(w, status, map[string]string{"error": err.Error()}) return } writeJSON(w, http.StatusOK, it) }) // Delete mux.HandleFunc("/api/v1/items/delete", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } var in struct { ID filesvc.ID `json:"id"` } if err := json.NewDecoder(r.Body).Decode(&in); err != nil { writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid json"}) return } it, err := store.Delete(r.Context(), in.ID) _ = blobs.Delete(r.Context(), string(in.ID)) if err != nil { if errors.Is(err, filesvc.ErrForbidden) { writeJSON(w, http.StatusForbidden, map[string]string{"error": "only owner may modify"}) return } status := http.StatusBadRequest if errors.Is(err, filesvc.ErrNotFound) { status = http.StatusNotFound } writeJSON(w, status, map[string]string{"error": err.Error()}) return } writeJSON(w, http.StatusOK, it) }) } // apiFiles wires upload/download endpoints func apiFiles(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, meshNode *mesh.Node) { // Multipart-Upload mux.HandleFunc("/api/v1/files/upload", func(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { w.WriteHeader(http.StatusMethodNotAllowed) return } if err := r.ParseMultipartForm(128 << 20); err != nil { // 128MB writeJSON(w, http.StatusBadRequest, map[string]string{"error": "bad form"}) return } fh, hdr, err := r.FormFile("file") if err != nil { writeJSON(w, http.StatusBadRequest, map[string]string{"error": "missing file"}) return } defer fh.Close() name := strings.TrimSpace(r.FormValue("name")) if name == "" { name = hdr.Filename } it, err := store.Create(r.Context(), name) if err != nil { writeJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()}) return } meta, err := blobs.Save(r.Context(), string(it.ID), name, fh) if err != nil { _, _ = store.Delete(r.Context(), it.ID) writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) return } _ = meshNode.SyncNow(r.Context()) writeJSON(w, http.StatusCreated, map[string]any{ "file": it, "blob": meta, }) }) // Download mux.HandleFunc("/api/v1/files/", func(w http.ResponseWriter, r *http.Request) { parts := strings.Split(strings.TrimPrefix(r.URL.Path, "/api/v1/files/"), "/") if len(parts) != 2 || parts[1] != "download" { http.NotFound(w, r) return } id := parts[0] if strings.TrimSpace(id) == "" { http.NotFound(w, r) return } // 1) Metadaten prüfen it, err := store.Get(r.Context(), filesvc.ID(id)) if err != nil || it.Deleted { http.NotFound(w, r) return } // 2) lokal if rc, meta, err := blobs.Open(r.Context(), id); err == nil { defer rc.Close() serveBlob(w, r, rc, meta, it.Name) return } // 3) remote holen & cachen it1, _ := store.Get(r.Context(), filesvc.ID(id)) peers := meshNode.PeerList() ttl := 2 * time.Minute if cfg := meshNode.Config(); cfg.PeerTTL > 0 { ttl = cfg.PeerTTL } if !isOwnerActive(it1.Owner, peers, ttl) { http.NotFound(w, r) return } rrc, _, _, _, err := meshNode.FetchBlobAny(r.Context(), id) if err != nil { http.NotFound(w, r) return } defer rrc.Close() if _, err := blobs.Save(r.Context(), id, it1.Name, rrc); err != nil { http.Error(w, "cache failed", http.StatusInternalServerError) return } // 4) lokal streamen lrc, meta, err := blobs.Open(r.Context(), id) if err != nil { http.Error(w, "open failed", http.StatusInternalServerError) return } defer lrc.Close() serveBlob(w, r, lrc, meta, it1.Name) }) } /*** Mesh <-> Store Mapping (falls Typen getrennt sind) ***/ func toMeshSnapshot(s filesvc.Snapshot) mesh.Snapshot { out := mesh.Snapshot{Items: make([]mesh.Item, 0, len(s.Items))} for _, it := range s.Items { out.Items = append(out.Items, mesh.Item{ ID: it.ID, Name: it.Name, UpdatedAt: it.UpdatedAt, Deleted: it.Deleted, Owner: it.Owner, }) } return out } func fromMeshSnapshot(ms mesh.Snapshot) filesvc.Snapshot { out := filesvc.Snapshot{Items: make([]filesvc.File, 0, len(ms.Items))} for _, it := range ms.Items { out.Items = append(out.Items, filesvc.File{ ID: it.ID, Name: it.Name, UpdatedAt: it.UpdatedAt, Deleted: it.Deleted, Owner: it.Owner, }) } return out } // isOwnerActive prüft, ob der Owner in der Peer-Liste als "aktiv" gilt. func isOwnerActive(owner string, peers []mesh.Peer, ttl time.Duration) bool { owner = strings.TrimSpace(owner) if owner == "" { return true } cutoff := time.Now().Add(-ttl) for _, p := range peers { if strings.TrimSpace(p.URL) != owner { continue } // Self ist per Definition aktiv if p.Self { return true } // ohne LastSeen: als inaktiv behandeln if p.LastSeen.IsZero() { return false } return p.LastSeen.After(cutoff) } // Owner ist nicht mal in der Liste: inaktiv return false } /*** main ***/ func main() { cfg := loadConfig() // Domain-Store (mesh-fähig) nodeID := strings.TrimSpace(cfg.Mesh.AdvertURL) //st := filesvc.NewMemStore(nodeID) // Mesh starten //mcfg := mesh.FromEnv() blobs := blobfs.New(getenvDefault("DATA_DIR", "./data")) dataDir := getenvDefault("DATA_DIR", "./data") metaPath := filepath.Join(dataDir, "meta", "items.json") st := filesvc.NewMemStorePersistent(nodeID, metaPath) mnode, err := mesh.New(cfg.Mesh, mesh.Callbacks{ GetSnapshot: func(ctx context.Context) (mesh.Snapshot, error) { s, err := st.Snapshot(ctx) if err != nil { return mesh.Snapshot{}, err } return toMeshSnapshot(s), nil }, ApplyRemote: func(ctx context.Context, s mesh.Snapshot) error { return st.ApplyRemote(ctx, fromMeshSnapshot(s)) }, BlobOpen: func(ctx context.Context, id string) (io.ReadCloser, string, string, int64, error) { //5588 it, err := st.Get(ctx, filesvc.ID(id)) if err != nil || it.Deleted { return nil, "", "", 0, fmt.Errorf("not found") } rc, meta, err := blobs.Open(ctx, id) if err != nil { return nil, "", "", 0, err } return rc, it.Name, meta.ContentType, meta.Size, nil }, }) if err != nil { log.Fatalf("mesh init: %v", err) } // Hintergrund-Pruner starten mnode.StartPeerPruner() go func() { log.Printf("[mesh] listening on %s advertise %s seeds=%v discovery=%v", cfg.Mesh.BindAddr, cfg.Mesh.AdvertURL, cfg.Mesh.Seeds, cfg.Mesh.EnableDiscovery) if err := mnode.Serve(); err != nil && !errors.Is(err, http.ErrServerClosed) { log.Fatalf("mesh serve: %v", err) } }() go func() { ticker := time.NewTicker(1 * time.Minute) defer ticker.Stop() for range ticker.C { // aktive Owner bestimmen peers := mnode.PeerList() ttl := 2 * time.Minute if cfg := mnode.Config(); cfg.PeerTTL > 0 { ttl = cfg.PeerTTL } cutoff := time.Now().Add(-ttl) active := map[string]bool{} for _, p := range peers { if p.Self { active[p.URL] = true continue } if !p.LastSeen.IsZero() && p.LastSeen.After(cutoff) { active[p.URL] = true } } // alle Items durchgehen; Blobs von Offline-Ownern löschen var next filesvc.ID for { items, nextOut, _ := st.List(context.Background(), next, 1000) for _, it := range items { if it.Owner == "" || active[it.Owner] { continue } _ = blobs.Delete(context.Background(), it.ID) } if nextOut == "" { break } next = nextOut } } }() // Root-Mux root := http.NewServeMux() // API (Bearer-Auth) //blobs := blobfs.New(getenvDefault("DATA_DIR", "./data")) apiMux := http.NewServeMux() fileRoutes(apiMux, st, blobs) apiFiles(apiMux, st, blobs, mnode) root.Handle("/api/", authMiddleware(cfg.APIKey, apiMux)) if cfg.PublicDownloads { registerPublicDownloads(root, st, blobs, mnode, cfg.PublicPath) } // Admin-UI (optional BasicAuth via ADMIN_USER/ADMIN_PASS) adminRoot := http.NewServeMux() admin.Register(adminRoot, admin.Deps{Store: st, Mesh: mnode, Blob: blobs}) adminUser := os.Getenv("ADMIN_USER") adminPass := os.Getenv("ADMIN_PASS") if strings.TrimSpace(adminUser) != "" { wrapped := admin.BasicAuth(adminUser, adminPass, adminRoot) root.Handle("/admin", wrapped) root.Handle("/admin/", wrapped) } else { root.Handle("/admin", adminRoot) root.Handle("/admin/", adminRoot) } // Startseite → /admin root.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { http.Redirect(w, r, "/admin", http.StatusFound) }) // Finaler Handler-Stack handler := cors(accessLog(root)) srv := &http.Server{ Addr: cfg.HTTPAddr, Handler: handler, ReadHeaderTimeout: 5 * time.Second, } // Graceful shutdown go func() { log.Printf("http listening on %s (api=/api/v1, admin=/admin)", cfg.HTTPAddr) log.Printf("mesh listening on %s", cfg.Mesh.BindAddr) if err := srv.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { log.Fatalf("http server: %v", err) } }() // OS-Signale abfangen stop := make(chan os.Signal, 1) signal.Notify(stop, syscall.SIGINT, syscall.SIGTERM) <-stop ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() _ = srv.Shutdown(ctx) _ = mnode.Close(ctx) log.Println("shutdown complete") } // Public: GET {base}/{id} // Beispiel: /dl/1 func registerPublicDownloads(mux *http.ServeMux, store filesvc.MeshStore, blobs blobfs.Store, meshNode *mesh.Node, base string) { if !strings.HasPrefix(base, "/") { base = "/" + base } mux.HandleFunc(base+"/", func(w http.ResponseWriter, r *http.Request) { id := strings.TrimSpace(strings.TrimPrefix(r.URL.Path, base+"/")) if id == "" { http.NotFound(w, r) return } // 1) Metadaten prüfen it, err := store.Get(r.Context(), filesvc.ID(id)) if err != nil || it.Deleted { http.NotFound(w, r) return } // 2) Lokal versuchen if rc, meta, err := blobs.Open(context.Background(), id); err == nil { defer rc.Close() serveBlob(w, r, rc, meta, it.Name) return } // (Optional) Owner-Online-Check — wenn du auch bei offline Ownern liefern willst, block auskommentieren { peers := meshNode.PeerList() ttl := 2 * time.Minute if !isOwnerActive(it.Owner, peers, ttl) { http.NotFound(w, r) return } } // 3) Aus Mesh holen — EIGENER Timeout-Kontext, NICHT r.Context() rrc, remoteName, _, _, err := meshNode.FetchBlobAny(context.Background(), id) if err != nil { http.NotFound(w, r) return } defer rrc.Close() filename := strings.TrimSpace(remoteName) if filename == "" { filename = it.Name } // 4) Lokal cachen — KEIN Request-Kontext, damit Save nicht abbricht if _, err := blobs.Save(context.Background(), id, filename, rrc); err != nil { log.Printf("[public] cache save failed id=%s name=%q: %v", id, filename, err) http.Error(w, "cache failed: "+err.Error(), http.StatusInternalServerError) return } // 5) Erneut lokal öffnen und streamen lrc, meta, err := blobs.Open(context.Background(), id) if err != nil { http.Error(w, "open failed: "+err.Error(), http.StatusInternalServerError) return } defer lrc.Close() serveBlob(w, r, lrc, meta, filename) }) } // Hilfsfunktion: setzt sinnvolle Header und streamt die Datei func serveBlob(w http.ResponseWriter, r *http.Request, rc io.ReadSeeker, meta blobfs.Meta, downloadName string) { if meta.SHA256 != "" { etag := `W/"` + meta.SHA256 + `"` if r.Header.Get("If-None-Match") == etag { w.WriteHeader(http.StatusNotModified) return } w.Header().Set("ETag", etag) } if meta.ContentType == "" { meta.ContentType = "application/octet-stream" } if downloadName == "" { downloadName = meta.Name } w.Header().Set("Content-Type", meta.ContentType) w.Header().Set("Content-Length", strconv.FormatInt(meta.Size, 10)) w.Header().Set("Content-Disposition", fmt.Sprintf(`attachment; filename="%s"`, downloadName)) w.Header().Set("Access-Control-Expose-Headers", "Content-Disposition") w.Header().Set("X-Robots-Tag", "noindex") _, _ = io.Copy(w, rc) }