Files
jbergner 787b7a0555
All checks were successful
release-tag / release-image (push) Successful in 1m55s
test
2025-09-21 22:35:19 +02:00

548 lines
14 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"context"
"fmt"
"html/template"
"log"
"mime"
"net/http"
"net/http/httputil"
"net/url"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/go-chi/chi/v5"
"github.com/go-chi/httprate"
"github.com/goccy/go-json"
"git.send.nrw/sendnrw/nginx-stream-server/internal/mtx"
)
var (
listen = env("LISTEN", ":8080")
mtxAPI = env("MTX_API", "http://127.0.0.1:9997")
mtxHLS = env("MTX_HLS", "http://127.0.0.1:8888")
streamsCSV = os.Getenv("STREAMS")
basicUser = os.Getenv("BASIC_AUTH_USER")
basicPass = os.Getenv("BASIC_AUTH_PASS")
// Root für Webassets (per ENV überschreibbar)
webRoot = env("WEB_ROOT", "web")
// Templates werden beim Start geladen
tpls *template.Template
)
// ----- SSE Broker -----
type sseClient chan []byte
type sseBroker struct {
mu sync.Mutex
clients map[sseClient]struct{}
}
func newSSEBroker() *sseBroker {
return &sseBroker{clients: make(map[sseClient]struct{})}
}
func (b *sseBroker) add(c sseClient) {
b.mu.Lock()
b.clients[c] = struct{}{}
b.mu.Unlock()
}
func (b *sseBroker) del(c sseClient) {
b.mu.Lock()
delete(b.clients, c)
b.mu.Unlock()
}
func (b *sseBroker) broadcast(msg []byte) {
b.mu.Lock()
for c := range b.clients {
select {
case c <- msg:
default: /* slow client, drop */
}
}
b.mu.Unlock()
}
var (
streamsJSONCache []byte
streamsJSONETag string
streamsCacheMu sync.RWMutex
sseB = newSSEBroker()
)
func eq(a, b []byte) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}
type hlsMuxersListResp struct {
Items []struct {
Name string `json:"name"`
Clients []interface{} `json:"clients"` // echte HLS-Viewer (HTTP-Clients)
} `json:"items"`
// manche Builds liefern "items" flach; das fangen wir ab:
Name string `json:"name"`
Clients []interface{} `json:"clients"`
}
func fetchHLSClientMap(ctx context.Context, base, user, pass string) (map[string]int, error) {
u := strings.TrimRight(base, "/") + "/v3/hlsmuxers/list"
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return nil, err
}
if user != "" || pass != "" {
req.SetBasicAuth(user, pass)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
// Wenn noch kein einziger HLS-Client existiert, liefert MediaMTX oft 200 mit leerer Liste
// (oder 200 mit items=[]). 404 sollte hier nicht auftreten falls doch, behandeln wir es als "leer".
if resp.StatusCode == http.StatusNotFound {
return map[string]int{}, nil
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("hlsmuxers/list: %s", resp.Status)
}
var out hlsMuxersListResp
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
return nil, err
}
m := make(map[string]int)
if len(out.Items) > 0 {
for _, it := range out.Items {
m[it.Name] = len(it.Clients)
}
return m, nil
}
// Fallback: flache Struktur (ein einzelner Muxer)
if out.Name != "" {
m[out.Name] = len(out.Clients)
}
return m, nil
}
type mtxHLSPayload struct {
// v1.15.0 liefert einen Muxer mit Clients-Liste (Feldname kann "clients" heißen)
Item struct {
Name string `json:"name"`
Clients []interface{} `json:"clients"` // Anzahl = echte HLS-Zuschauer
} `json:"item"`
// manche Builds liefern flach:
Name string `json:"name"`
Clients []interface{} `json:"clients"`
}
func hlsViewers(ctx context.Context, base, user, pass, name string) (int, error) {
u := strings.TrimRight(base, "/") + "/v3/hlsmuxers/get/" + url.PathEscape(name)
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if user != "" || pass != "" {
req.SetBasicAuth(user, pass)
}
res, err := http.DefaultClient.Do(req)
if err != nil {
return 0, err
}
defer res.Body.Close()
if res.StatusCode != http.StatusOK {
return 0, fmt.Errorf("hlsmuxers/get %s: %s", name, res.Status)
}
var out mtxHLSPayload
if err := json.NewDecoder(res.Body).Decode(&out); err != nil {
return 0, err
}
if out.Item.Clients != nil {
return len(out.Item.Clients), nil
}
return len(out.Clients), nil
}
func init() {
_ = mime.AddExtensionType(".css", "text/css")
_ = mime.AddExtensionType(".js", "application/javascript")
_ = mime.AddExtensionType(".mjs", "application/javascript")
_ = mime.AddExtensionType(".woff2", "font/woff2")
_ = mime.AddExtensionType(".woff", "font/woff")
}
func env(k, def string) string {
if v := os.Getenv(k); v != "" {
return v
}
return def
}
func mustLoadTemplates() *template.Template {
pattern := filepath.Join(webRoot, "templates", "*.html")
t, err := template.ParseGlob(pattern)
if err != nil {
log.Fatalf("templates laden fehlgeschlagen (%s): %v", pattern, err)
}
return t
}
func main() {
log.SetFlags(log.LstdFlags | log.Lshortfile)
tpls = mustLoadTemplates()
r := chi.NewRouter()
// Strenge CSP (alles lokal)
r.Use(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Security-Policy",
strings.Join([]string{
"default-src 'self'",
"img-src 'self' data:",
"style-src 'self'",
"font-src 'self'",
"script-src 'self'",
"connect-src 'self'",
"media-src 'self' blob:", // <— HIER blob: zulassen
"worker-src 'self' blob:",
}, "; "),
)
w.Header().Set("X-Content-Type-Options", "nosniff")
w.Header().Set("X-Frame-Options", "SAMEORIGIN")
w.Header().Set("Referrer-Policy", "no-referrer")
next.ServeHTTP(w, req)
})
})
// Rate Limit nur auf REST, nicht auf SSE:
r.Group(func(r chi.Router) {
r.Use(httprate.LimitByIP(240, time.Minute)) // fertig
r.Get("/api/streams", apiStreams)
})
// SSE ohne Rate-Limit (1 Verbindung pro Browser):
r.Get("/api/streams/events", sseHandler)
// Optional Basic Auth für Seiten
if basicUser != "" {
creds := basicUser + ":" + basicPass
r.Group(func(p chi.Router) {
p.Use(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
a := strings.SplitN(r.Header.Get("Authorization"), " ", 2)
if len(a) == 2 && a[0] == "Basic" && a[1] == basic(creds) {
next.ServeHTTP(w, r)
return
}
w.Header().Set("WWW-Authenticate", "Basic realm=Restricted")
http.Error(w, "auth required", http.StatusUnauthorized)
})
})
p.Get("/", pageIndex)
p.Get("/{name}", pageStream)
})
} else {
r.Get("/", pageIndex)
r.Get("/{name}", pageStream)
}
// /static → echtes Dateisystem
staticDir := http.Dir(filepath.Join(webRoot, "static"))
r.Handle("/static/*",
http.StripPrefix("/static",
http.FileServer(staticDir),
),
)
// HLS-Reverse-Proxy
up, _ := url.Parse(mtxHLS)
proxy := httputil.NewSingleHostReverseProxy(up)
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) {
http.Error(w, "upstream error", http.StatusBadGateway)
}
r.Handle("/hls/*", http.StripPrefix("/hls", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if strings.Contains(r.URL.Path, "..") {
http.NotFound(w, r)
return
}
proxy.ServeHTTP(w, r)
})))
// Health
r.Get("/health", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("ok")) })
go func() {
t := time.NewTicker(1 * time.Second)
defer t.Stop()
for range t.C {
// gleiche Logik wie in apiStreams, nur ohne Response
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
c := &mtx.Client{BaseURL: mtxAPI, User: os.Getenv("MTX_API_USER"), Pass: os.Getenv("MTX_API_PASS")}
pl, err := c.Paths(ctx)
cancel()
if err != nil {
// optional: bei Fehler nichts senden
continue
}
allowed := map[string]bool{}
if streamsCSV != "" {
for _, s := range strings.Split(streamsCSV, ",") {
allowed[strings.TrimSpace(s)] = true
}
}
type item struct {
Name string `json:"name"`
Live bool `json:"live"`
Viewers int `json:"viewers"`
}
out := struct {
Items []item `json:"items"`
}{}
for _, p := range pl.Items {
if len(allowed) > 0 && !allowed[p.Name] {
continue
}
// echte HLS-Viewerzahlen einmalig holen (Batch)
hlsMap := map[string]int{}
{
ctxH, cancelH := context.WithTimeout(context.Background(), 2*time.Second)
tmp, err := fetchHLSClientMap(ctxH, mtxAPI, os.Getenv("MTX_API_USER"), os.Getenv("MTX_API_PASS"))
cancelH()
if err == nil && tmp != nil {
hlsMap = tmp
} else {
log.Printf("warn:a: hlsmuxers/list: %v", err)
}
}
viewers := 0
// 1) Echte HLS-Zuschauer aus der Batch-Map
if v, ok := hlsMap[p.Name]; ok {
viewers = v
}
// 2) Fallback: Nicht-HLS-Reader (oder wenn Map leer ist)
// Bei HLS ist p.Viewers() meist 1 (der Muxer), aber das wollen wir nur nutzen,
// wenn wir keinen HLS-Wert haben.
if viewers == 0 {
if v := p.Viewers(); v > 0 {
viewers = v
}
}
out.Items = append(out.Items, item{
Name: p.Name,
Live: p.Live(),
Viewers: viewers,
})
}
buf, _ := json.Marshal(out)
streamsCacheMu.RLock()
same := eq(buf, streamsJSONCache)
streamsCacheMu.RUnlock()
if same {
continue
}
streamsCacheMu.Lock()
streamsJSONCache = buf
streamsJSONETag = fmt.Sprintf("%x", len(buf))
streamsCacheMu.Unlock()
// an alle SSE-Clients senden
sseB.broadcast(buf)
}
}()
log.Printf("Dashboard listening on %s (API=%s HLS=%s, WEB_ROOT=%s)\n", listen, mtxAPI, mtxHLS, webRoot)
if err := http.ListenAndServe(listen, r); err != nil {
log.Fatal(err)
}
}
func sseHandler(w http.ResponseWriter, r *http.Request) {
// SSE-Header
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
// CORS brauchst du nicht, gleiche Origin
// Flush unterstützen
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "stream unsupported", http.StatusInternalServerError)
return
}
// Client-Kanal registrieren
ch := make(sseClient, 8)
sseB.add(ch)
defer sseB.del(ch)
// Beim Connect sofort den aktuellen Snapshot schicken (falls vorhanden)
streamsCacheMu.RLock()
snap := streamsJSONCache
streamsCacheMu.RUnlock()
if len(snap) > 0 {
fmt.Fprintf(w, "event: update\ndata: %s\n\n", string(snap))
flusher.Flush()
}
// Heartbeat (hält Proxies freundlich)
hb := time.NewTicker(15 * time.Second)
defer hb.Stop()
// Abbruch, wenn Client trennt
notify := r.Context().Done()
for {
select {
case <-notify:
return
case msg := <-ch:
// JSON als "update" senden
fmt.Fprintf(w, "event: update\ndata: %s\n\n", string(msg))
flusher.Flush()
case <-hb.C:
// Kommentar als Ping
fmt.Fprintf(w, ": ping\n\n")
flusher.Flush()
}
}
}
func apiStreams(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
defer cancel()
c := &mtx.Client{BaseURL: mtxAPI, User: os.Getenv("MTX_API_USER"), Pass: os.Getenv("MTX_API_PASS")}
pl, err := c.Paths(ctx)
if err != nil {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusBadGateway)
_ = json.NewEncoder(w).Encode(map[string]any{"error": err.Error()})
return
}
allowed := map[string]bool{}
if streamsCSV != "" {
for _, s := range strings.Split(streamsCSV, ",") {
allowed[strings.TrimSpace(s)] = true
}
}
type item struct {
Name string `json:"name"`
Live bool `json:"live"`
Viewers int `json:"viewers"`
}
out := struct {
Items []item `json:"items"`
}{}
for _, p := range pl.Items {
if len(allowed) > 0 && !allowed[p.Name] {
continue
}
// echte HLS-Viewerzahlen einmalig holen (Batch)
hlsMap := map[string]int{}
{
ctxH, cancelH := context.WithTimeout(context.Background(), 2*time.Second)
tmp, err := fetchHLSClientMap(ctxH, mtxAPI, os.Getenv("MTX_API_USER"), os.Getenv("MTX_API_PASS"))
cancelH()
if err == nil && tmp != nil {
hlsMap = tmp
} else {
log.Printf("warn:b: hlsmuxers/list: %v", err)
}
}
viewers := 0
// 1) Echte HLS-Zuschauer aus der Batch-Map
if v, ok := hlsMap[p.Name]; ok {
viewers = v
}
// 2) Fallback: Nicht-HLS-Reader (oder wenn Map leer ist)
// Bei HLS ist p.Viewers() meist 1 (der Muxer), aber das wollen wir nur nutzen,
// wenn wir keinen HLS-Wert haben.
if viewers == 0 {
if v := p.Viewers(); v > 0 {
viewers = v
}
}
out.Items = append(out.Items, item{
Name: p.Name,
Live: p.Live(),
Viewers: viewers,
})
}
w.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(w).Encode(out)
}
func pageIndex(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=utf-8")
if err := tpls.ExecuteTemplate(w, "index.html", nil); err != nil {
http.Error(w, err.Error(), 500)
}
}
func pageStream(w http.ResponseWriter, r *http.Request) {
name := chi.URLParam(r, "name")
w.Header().Set("Content-Type", "text/html; charset=utf-8")
if err := tpls.ExecuteTemplate(w, "stream.html", map[string]any{
"Name": name,
"JSONName": fmt.Sprintf("%q", name),
}); err != nil {
http.Error(w, err.Error(), 500)
}
}
// Basic helper bleibt, falls du Basic Auth nutzt
func basic(creds string) string {
const tbl = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
b := []byte(creds)
var out []byte
for i := 0; i < len(b); i += 3 {
var v uint32
n := 0
for j := 0; j < 3; j++ {
v <<= 8
if i+j < len(b) {
v |= uint32(b[i+j])
n++
}
}
for j := 0; j < 4; j++ {
if j <= n {
out = append(out, tbl[(v>>(18-6*uint(j)))&0x3f])
} else {
out = append(out, '=')
}
}
}
return string(out)
}