package main import ( "context" "fmt" "html/template" "log" "mime" "net/http" "net/http/httputil" "net/url" "os" "path/filepath" "strings" "sync" "time" "github.com/go-chi/chi/v5" "github.com/go-chi/httprate" "github.com/goccy/go-json" "git.send.nrw/sendnrw/nginx-stream-server/internal/mtx" ) var ( listen = env("LISTEN", ":8080") mtxAPI = env("MTX_API", "http://127.0.0.1:9997") mtxHLS = env("MTX_HLS", "http://127.0.0.1:8888") streamsCSV = os.Getenv("STREAMS") basicUser = os.Getenv("BASIC_AUTH_USER") basicPass = os.Getenv("BASIC_AUTH_PASS") // Root für Webassets (per ENV überschreibbar) webRoot = env("WEB_ROOT", "web") // Templates werden beim Start geladen tpls *template.Template ) // ----- SSE Broker ----- type sseClient chan []byte type sseBroker struct { mu sync.Mutex clients map[sseClient]struct{} } func newSSEBroker() *sseBroker { return &sseBroker{clients: make(map[sseClient]struct{})} } func (b *sseBroker) add(c sseClient) { b.mu.Lock() b.clients[c] = struct{}{} b.mu.Unlock() } func (b *sseBroker) del(c sseClient) { b.mu.Lock() delete(b.clients, c) b.mu.Unlock() } func (b *sseBroker) broadcast(msg []byte) { b.mu.Lock() for c := range b.clients { select { case c <- msg: default: /* slow client, drop */ } } b.mu.Unlock() } var ( streamsJSONCache []byte streamsJSONETag string streamsCacheMu sync.RWMutex sseB = newSSEBroker() ) func eq(a, b []byte) bool { if len(a) != len(b) { return false } for i := range a { if a[i] != b[i] { return false } } return true } type hlsMuxersListResp struct { Items []struct { Name string `json:"name"` Clients []interface{} `json:"clients"` // echte HLS-Viewer (HTTP-Clients) } `json:"items"` // manche Builds liefern "items" flach; das fangen wir ab: Name string `json:"name"` Clients []interface{} `json:"clients"` } func fetchHLSClientMap(ctx context.Context, base, user, pass string) (map[string]int, error) { u := strings.TrimRight(base, "/") + "/v3/hlsmuxers/list" req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) if err != nil { return nil, err } if user != "" || pass != "" { req.SetBasicAuth(user, pass) } resp, err := http.DefaultClient.Do(req) if err != nil { return nil, err } defer resp.Body.Close() // Wenn noch kein einziger HLS-Client existiert, liefert MediaMTX oft 200 mit leerer Liste // (oder 200 mit items=[]). 404 sollte hier nicht auftreten – falls doch, behandeln wir es als "leer". if resp.StatusCode == http.StatusNotFound { return map[string]int{}, nil } if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("hlsmuxers/list: %s", resp.Status) } var out hlsMuxersListResp if err := json.NewDecoder(resp.Body).Decode(&out); err != nil { return nil, err } m := make(map[string]int) if len(out.Items) > 0 { for _, it := range out.Items { m[it.Name] = len(it.Clients) } return m, nil } // Fallback: flache Struktur (ein einzelner Muxer) if out.Name != "" { m[out.Name] = len(out.Clients) } return m, nil } type mtxHLSPayload struct { // v1.15.0 liefert einen Muxer mit Clients-Liste (Feldname kann "clients" heißen) Item struct { Name string `json:"name"` Clients []interface{} `json:"clients"` // Anzahl = echte HLS-Zuschauer } `json:"item"` // manche Builds liefern flach: Name string `json:"name"` Clients []interface{} `json:"clients"` } func hlsViewers(ctx context.Context, base, user, pass, name string) (int, error) { u := strings.TrimRight(base, "/") + "/v3/hlsmuxers/get/" + url.PathEscape(name) req, _ := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) if user != "" || pass != "" { req.SetBasicAuth(user, pass) } res, err := http.DefaultClient.Do(req) if err != nil { return 0, err } defer res.Body.Close() if res.StatusCode != http.StatusOK { return 0, fmt.Errorf("hlsmuxers/get %s: %s", name, res.Status) } var out mtxHLSPayload if err := json.NewDecoder(res.Body).Decode(&out); err != nil { return 0, err } if out.Item.Clients != nil { return len(out.Item.Clients), nil } return len(out.Clients), nil } func init() { _ = mime.AddExtensionType(".css", "text/css") _ = mime.AddExtensionType(".js", "application/javascript") _ = mime.AddExtensionType(".mjs", "application/javascript") _ = mime.AddExtensionType(".woff2", "font/woff2") _ = mime.AddExtensionType(".woff", "font/woff") } func env(k, def string) string { if v := os.Getenv(k); v != "" { return v } return def } func mustLoadTemplates() *template.Template { pattern := filepath.Join(webRoot, "templates", "*.html") t, err := template.ParseGlob(pattern) if err != nil { log.Fatalf("templates laden fehlgeschlagen (%s): %v", pattern, err) } return t } func main() { log.SetFlags(log.LstdFlags | log.Lshortfile) tpls = mustLoadTemplates() r := chi.NewRouter() // Strenge CSP (alles lokal) r.Use(func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { w.Header().Set("Content-Security-Policy", strings.Join([]string{ "default-src 'self'", "img-src 'self' data:", "style-src 'self'", "font-src 'self'", "script-src 'self'", "connect-src 'self'", "media-src 'self' blob:", // <— HIER blob: zulassen "worker-src 'self' blob:", }, "; "), ) w.Header().Set("X-Content-Type-Options", "nosniff") w.Header().Set("X-Frame-Options", "SAMEORIGIN") w.Header().Set("Referrer-Policy", "no-referrer") next.ServeHTTP(w, req) }) }) // Rate Limit nur auf REST, nicht auf SSE: r.Group(func(r chi.Router) { r.Use(httprate.LimitByIP(240, time.Minute)) // fertig r.Get("/api/streams", apiStreams) }) // SSE ohne Rate-Limit (1 Verbindung pro Browser): r.Get("/api/streams/events", sseHandler) // Optional Basic Auth für Seiten if basicUser != "" { creds := basicUser + ":" + basicPass r.Group(func(p chi.Router) { p.Use(func(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { a := strings.SplitN(r.Header.Get("Authorization"), " ", 2) if len(a) == 2 && a[0] == "Basic" && a[1] == basic(creds) { next.ServeHTTP(w, r) return } w.Header().Set("WWW-Authenticate", "Basic realm=Restricted") http.Error(w, "auth required", http.StatusUnauthorized) }) }) p.Get("/", pageIndex) p.Get("/{name}", pageStream) }) } else { r.Get("/", pageIndex) r.Get("/{name}", pageStream) } // /static → echtes Dateisystem staticDir := http.Dir(filepath.Join(webRoot, "static")) r.Handle("/static/*", http.StripPrefix("/static", http.FileServer(staticDir), ), ) // HLS-Reverse-Proxy up, _ := url.Parse(mtxHLS) proxy := httputil.NewSingleHostReverseProxy(up) proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) { http.Error(w, "upstream error", http.StatusBadGateway) } r.Handle("/hls/*", http.StripPrefix("/hls", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if strings.Contains(r.URL.Path, "..") { http.NotFound(w, r) return } proxy.ServeHTTP(w, r) }))) // Health r.Get("/health", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("ok")) }) go func() { t := time.NewTicker(1 * time.Second) defer t.Stop() for range t.C { // gleiche Logik wie in apiStreams, nur ohne Response ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) c := &mtx.Client{BaseURL: mtxAPI, User: os.Getenv("MTX_API_USER"), Pass: os.Getenv("MTX_API_PASS")} pl, err := c.Paths(ctx) cancel() if err != nil { // optional: bei Fehler nichts senden continue } allowed := map[string]bool{} if streamsCSV != "" { for _, s := range strings.Split(streamsCSV, ",") { allowed[strings.TrimSpace(s)] = true } } type item struct { Name string `json:"name"` Live bool `json:"live"` Viewers int `json:"viewers"` } out := struct { Items []item `json:"items"` }{} for _, p := range pl.Items { if len(allowed) > 0 && !allowed[p.Name] { continue } // echte HLS-Viewerzahlen einmalig holen (Batch) hlsMap := map[string]int{} { ctxH, cancelH := context.WithTimeout(context.Background(), 2*time.Second) tmp, err := fetchHLSClientMap(ctxH, mtxAPI, os.Getenv("MTX_API_USER"), os.Getenv("MTX_API_PASS")) cancelH() if err == nil && tmp != nil { hlsMap = tmp } else { log.Printf("warn:a: hlsmuxers/list: %v", err) } } viewers := 0 // 1) Echte HLS-Zuschauer aus der Batch-Map if v, ok := hlsMap[p.Name]; ok { viewers = v } // 2) Fallback: Nicht-HLS-Reader (oder wenn Map leer ist) // Bei HLS ist p.Viewers() meist 1 (der Muxer), aber das wollen wir nur nutzen, // wenn wir keinen HLS-Wert haben. if viewers == 0 { if v := p.Viewers(); v > 0 { viewers = v } } out.Items = append(out.Items, item{ Name: p.Name, Live: p.Live(), Viewers: viewers, }) } buf, _ := json.Marshal(out) streamsCacheMu.RLock() same := eq(buf, streamsJSONCache) streamsCacheMu.RUnlock() if same { continue } streamsCacheMu.Lock() streamsJSONCache = buf streamsJSONETag = fmt.Sprintf("%x", len(buf)) streamsCacheMu.Unlock() // an alle SSE-Clients senden sseB.broadcast(buf) } }() log.Printf("Dashboard listening on %s (API=%s HLS=%s, WEB_ROOT=%s)\n", listen, mtxAPI, mtxHLS, webRoot) if err := http.ListenAndServe(listen, r); err != nil { log.Fatal(err) } } func sseHandler(w http.ResponseWriter, r *http.Request) { // SSE-Header w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") // CORS brauchst du nicht, gleiche Origin // Flush unterstützen flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "stream unsupported", http.StatusInternalServerError) return } // Client-Kanal registrieren ch := make(sseClient, 8) sseB.add(ch) defer sseB.del(ch) // Beim Connect sofort den aktuellen Snapshot schicken (falls vorhanden) streamsCacheMu.RLock() snap := streamsJSONCache streamsCacheMu.RUnlock() if len(snap) > 0 { fmt.Fprintf(w, "event: update\ndata: %s\n\n", string(snap)) flusher.Flush() } // Heartbeat (hält Proxies freundlich) hb := time.NewTicker(15 * time.Second) defer hb.Stop() // Abbruch, wenn Client trennt notify := r.Context().Done() for { select { case <-notify: return case msg := <-ch: // JSON als "update" senden fmt.Fprintf(w, "event: update\ndata: %s\n\n", string(msg)) flusher.Flush() case <-hb.C: // Kommentar als Ping fmt.Fprintf(w, ": ping\n\n") flusher.Flush() } } } func apiStreams(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second) defer cancel() c := &mtx.Client{BaseURL: mtxAPI, User: os.Getenv("MTX_API_USER"), Pass: os.Getenv("MTX_API_PASS")} pl, err := c.Paths(ctx) if err != nil { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusBadGateway) _ = json.NewEncoder(w).Encode(map[string]any{"error": err.Error()}) return } allowed := map[string]bool{} if streamsCSV != "" { for _, s := range strings.Split(streamsCSV, ",") { allowed[strings.TrimSpace(s)] = true } } type item struct { Name string `json:"name"` Live bool `json:"live"` Viewers int `json:"viewers"` } out := struct { Items []item `json:"items"` }{} for _, p := range pl.Items { if len(allowed) > 0 && !allowed[p.Name] { continue } // echte HLS-Viewerzahlen einmalig holen (Batch) hlsMap := map[string]int{} { ctxH, cancelH := context.WithTimeout(context.Background(), 2*time.Second) tmp, err := fetchHLSClientMap(ctxH, mtxAPI, os.Getenv("MTX_API_USER"), os.Getenv("MTX_API_PASS")) cancelH() if err == nil && tmp != nil { hlsMap = tmp } else { log.Printf("warn:b: hlsmuxers/list: %v", err) } } viewers := 0 // 1) Echte HLS-Zuschauer aus der Batch-Map if v, ok := hlsMap[p.Name]; ok { viewers = v } // 2) Fallback: Nicht-HLS-Reader (oder wenn Map leer ist) // Bei HLS ist p.Viewers() meist 1 (der Muxer), aber das wollen wir nur nutzen, // wenn wir keinen HLS-Wert haben. if viewers == 0 { if v := p.Viewers(); v > 0 { viewers = v } } out.Items = append(out.Items, item{ Name: p.Name, Live: p.Live(), Viewers: viewers, }) } w.Header().Set("Content-Type", "application/json") _ = json.NewEncoder(w).Encode(out) } func pageIndex(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/html; charset=utf-8") if err := tpls.ExecuteTemplate(w, "index.html", nil); err != nil { http.Error(w, err.Error(), 500) } } func pageStream(w http.ResponseWriter, r *http.Request) { name := chi.URLParam(r, "name") w.Header().Set("Content-Type", "text/html; charset=utf-8") if err := tpls.ExecuteTemplate(w, "stream.html", map[string]any{ "Name": name, "JSONName": fmt.Sprintf("%q", name), }); err != nil { http.Error(w, err.Error(), 500) } } // Basic helper bleibt, falls du Basic Auth nutzt func basic(creds string) string { const tbl = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/" b := []byte(creds) var out []byte for i := 0; i < len(b); i += 3 { var v uint32 n := 0 for j := 0; j < 3; j++ { v <<= 8 if i+j < len(b) { v |= uint32(b[i+j]) n++ } } for j := 0; j < 4; j++ { if j <= n { out = append(out, tbl[(v>>(18-6*uint(j)))&0x3f]) } else { out = append(out, '=') } } } return string(out) }