All checks were successful
release-tag / release-image (push) Successful in 1m54s
548 lines
14 KiB
Go
548 lines
14 KiB
Go
package main
|
||
|
||
import (
|
||
"context"
|
||
"fmt"
|
||
"html/template"
|
||
"log"
|
||
"mime"
|
||
"net/http"
|
||
"net/http/httputil"
|
||
"net/url"
|
||
"os"
|
||
"path/filepath"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/go-chi/chi/v5"
|
||
"github.com/go-chi/httprate"
|
||
"github.com/goccy/go-json"
|
||
|
||
"git.send.nrw/sendnrw/nginx-stream-server/internal/mtx"
|
||
)
|
||
|
||
var (
|
||
listen = env("LISTEN", ":8080")
|
||
mtxAPI = env("MTX_API", "http://127.0.0.1:9997")
|
||
mtxHLS = env("MTX_HLS", "http://127.0.0.1:8888")
|
||
streamsCSV = os.Getenv("STREAMS")
|
||
basicUser = os.Getenv("BASIC_AUTH_USER")
|
||
basicPass = os.Getenv("BASIC_AUTH_PASS")
|
||
|
||
// Root für Webassets (per ENV überschreibbar)
|
||
webRoot = env("WEB_ROOT", "web")
|
||
// Templates werden beim Start geladen
|
||
tpls *template.Template
|
||
)
|
||
|
||
// ----- SSE Broker -----
|
||
type sseClient chan []byte
|
||
|
||
type sseBroker struct {
|
||
mu sync.Mutex
|
||
clients map[sseClient]struct{}
|
||
}
|
||
|
||
func newSSEBroker() *sseBroker {
|
||
return &sseBroker{clients: make(map[sseClient]struct{})}
|
||
}
|
||
func (b *sseBroker) add(c sseClient) {
|
||
b.mu.Lock()
|
||
b.clients[c] = struct{}{}
|
||
b.mu.Unlock()
|
||
}
|
||
func (b *sseBroker) del(c sseClient) {
|
||
b.mu.Lock()
|
||
delete(b.clients, c)
|
||
b.mu.Unlock()
|
||
}
|
||
func (b *sseBroker) broadcast(msg []byte) {
|
||
b.mu.Lock()
|
||
for c := range b.clients {
|
||
select {
|
||
case c <- msg:
|
||
default: /* slow client, drop */
|
||
}
|
||
}
|
||
b.mu.Unlock()
|
||
}
|
||
|
||
var (
|
||
streamsJSONCache []byte
|
||
streamsJSONETag string
|
||
streamsCacheMu sync.RWMutex
|
||
sseB = newSSEBroker()
|
||
)
|
||
|
||
func eq(a, b []byte) bool {
|
||
if len(a) != len(b) {
|
||
return false
|
||
}
|
||
for i := range a {
|
||
if a[i] != b[i] {
|
||
return false
|
||
}
|
||
}
|
||
return true
|
||
}
|
||
|
||
type hlsMuxersListResp struct {
|
||
Items []struct {
|
||
Name string `json:"name"`
|
||
Clients []interface{} `json:"clients"` // echte HLS-Viewer (HTTP-Clients)
|
||
} `json:"items"`
|
||
// manche Builds liefern "items" flach; das fangen wir ab:
|
||
Name string `json:"name"`
|
||
Clients []interface{} `json:"clients"`
|
||
}
|
||
|
||
func fetchHLSClientMap(ctx context.Context, base, user, pass string) (map[string]int, error) {
|
||
u := strings.TrimRight(base, "/") + "/v3/hlsmuxers/list"
|
||
|
||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
if user != "" || pass != "" {
|
||
req.SetBasicAuth(user, pass)
|
||
}
|
||
|
||
resp, err := http.DefaultClient.Do(req)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
defer resp.Body.Close()
|
||
|
||
// Wenn noch kein einziger HLS-Client existiert, liefert MediaMTX oft 200 mit leerer Liste
|
||
// (oder 200 mit items=[]). 404 sollte hier nicht auftreten – falls doch, behandeln wir es als "leer".
|
||
if resp.StatusCode == http.StatusNotFound {
|
||
return map[string]int{}, nil
|
||
}
|
||
if resp.StatusCode != http.StatusOK {
|
||
return nil, fmt.Errorf("hlsmuxers/list: %s", resp.Status)
|
||
}
|
||
|
||
var out hlsMuxersListResp
|
||
if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
m := make(map[string]int)
|
||
|
||
if len(out.Items) > 0 {
|
||
for _, it := range out.Items {
|
||
m[it.Name] = len(it.Clients)
|
||
}
|
||
return m, nil
|
||
}
|
||
|
||
// Fallback: flache Struktur (ein einzelner Muxer)
|
||
if out.Name != "" {
|
||
m[out.Name] = len(out.Clients)
|
||
}
|
||
return m, nil
|
||
}
|
||
|
||
type mtxHLSPayload struct {
|
||
// v1.15.0 liefert einen Muxer mit Clients-Liste (Feldname kann "clients" heißen)
|
||
Item struct {
|
||
Name string `json:"name"`
|
||
Clients []interface{} `json:"clients"` // Anzahl = echte HLS-Zuschauer
|
||
} `json:"item"`
|
||
// manche Builds liefern flach:
|
||
Name string `json:"name"`
|
||
Clients []interface{} `json:"clients"`
|
||
}
|
||
|
||
func hlsViewers(ctx context.Context, base, user, pass, name string) (int, error) {
|
||
u := strings.TrimRight(base, "/") + "/v3/hlsmuxers/get/" + url.PathEscape(name)
|
||
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
|
||
if user != "" || pass != "" {
|
||
req.SetBasicAuth(user, pass)
|
||
}
|
||
res, err := http.DefaultClient.Do(req)
|
||
if err != nil {
|
||
return 0, err
|
||
}
|
||
defer res.Body.Close()
|
||
if res.StatusCode != http.StatusOK {
|
||
return 0, fmt.Errorf("hlsmuxers/get %s: %s", name, res.Status)
|
||
}
|
||
var out mtxHLSPayload
|
||
if err := json.NewDecoder(res.Body).Decode(&out); err != nil {
|
||
return 0, err
|
||
}
|
||
if out.Item.Clients != nil {
|
||
return len(out.Item.Clients), nil
|
||
}
|
||
return len(out.Clients), nil
|
||
}
|
||
|
||
func init() {
|
||
_ = mime.AddExtensionType(".css", "text/css")
|
||
_ = mime.AddExtensionType(".js", "application/javascript")
|
||
_ = mime.AddExtensionType(".mjs", "application/javascript")
|
||
_ = mime.AddExtensionType(".woff2", "font/woff2")
|
||
_ = mime.AddExtensionType(".woff", "font/woff")
|
||
}
|
||
|
||
func env(k, def string) string {
|
||
if v := os.Getenv(k); v != "" {
|
||
return v
|
||
}
|
||
return def
|
||
}
|
||
|
||
func mustLoadTemplates() *template.Template {
|
||
pattern := filepath.Join(webRoot, "templates", "*.html")
|
||
t, err := template.ParseGlob(pattern)
|
||
if err != nil {
|
||
log.Fatalf("templates laden fehlgeschlagen (%s): %v", pattern, err)
|
||
}
|
||
return t
|
||
}
|
||
|
||
func main() {
|
||
log.SetFlags(log.LstdFlags | log.Lshortfile)
|
||
tpls = mustLoadTemplates()
|
||
|
||
r := chi.NewRouter()
|
||
// Strenge CSP (alles lokal)
|
||
r.Use(func(next http.Handler) http.Handler {
|
||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||
w.Header().Set("Content-Security-Policy",
|
||
strings.Join([]string{
|
||
"default-src 'self'",
|
||
"img-src 'self' data:",
|
||
"style-src 'self'",
|
||
"font-src 'self'",
|
||
"script-src 'self'",
|
||
"connect-src 'self'",
|
||
"media-src 'self' blob:", // <— HIER blob: zulassen
|
||
"worker-src 'self' blob:",
|
||
}, "; "),
|
||
)
|
||
w.Header().Set("X-Content-Type-Options", "nosniff")
|
||
w.Header().Set("X-Frame-Options", "SAMEORIGIN")
|
||
w.Header().Set("Referrer-Policy", "no-referrer")
|
||
next.ServeHTTP(w, req)
|
||
})
|
||
})
|
||
|
||
// Rate Limit nur auf REST, nicht auf SSE:
|
||
r.Group(func(r chi.Router) {
|
||
r.Use(httprate.LimitByIP(240, time.Minute)) // fertig
|
||
r.Get("/api/streams", apiStreams)
|
||
})
|
||
|
||
// SSE ohne Rate-Limit (1 Verbindung pro Browser):
|
||
r.Get("/api/streams/events", sseHandler)
|
||
|
||
// Optional Basic Auth für Seiten
|
||
if basicUser != "" {
|
||
creds := basicUser + ":" + basicPass
|
||
r.Group(func(p chi.Router) {
|
||
p.Use(func(next http.Handler) http.Handler {
|
||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||
a := strings.SplitN(r.Header.Get("Authorization"), " ", 2)
|
||
if len(a) == 2 && a[0] == "Basic" && a[1] == basic(creds) {
|
||
next.ServeHTTP(w, r)
|
||
return
|
||
}
|
||
w.Header().Set("WWW-Authenticate", "Basic realm=Restricted")
|
||
http.Error(w, "auth required", http.StatusUnauthorized)
|
||
})
|
||
})
|
||
p.Get("/", pageIndex)
|
||
p.Get("/{name}", pageStream)
|
||
})
|
||
} else {
|
||
r.Get("/", pageIndex)
|
||
r.Get("/{name}", pageStream)
|
||
}
|
||
|
||
// /static → echtes Dateisystem
|
||
staticDir := http.Dir(filepath.Join(webRoot, "static"))
|
||
r.Handle("/static/*",
|
||
http.StripPrefix("/static",
|
||
http.FileServer(staticDir),
|
||
),
|
||
)
|
||
|
||
// HLS-Reverse-Proxy
|
||
up, _ := url.Parse(mtxHLS)
|
||
proxy := httputil.NewSingleHostReverseProxy(up)
|
||
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, e error) {
|
||
http.Error(w, "upstream error", http.StatusBadGateway)
|
||
}
|
||
r.Handle("/hls/*", http.StripPrefix("/hls", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||
if strings.Contains(r.URL.Path, "..") {
|
||
http.NotFound(w, r)
|
||
return
|
||
}
|
||
proxy.ServeHTTP(w, r)
|
||
})))
|
||
|
||
// Health
|
||
r.Get("/health", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("ok")) })
|
||
|
||
go func() {
|
||
t := time.NewTicker(1 * time.Second)
|
||
defer t.Stop()
|
||
for range t.C {
|
||
// gleiche Logik wie in apiStreams, nur ohne Response
|
||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||
c := &mtx.Client{BaseURL: mtxAPI, User: os.Getenv("MTX_API_USER"), Pass: os.Getenv("MTX_API_PASS")}
|
||
pl, err := c.Paths(ctx)
|
||
cancel()
|
||
if err != nil {
|
||
// optional: bei Fehler nichts senden
|
||
continue
|
||
}
|
||
|
||
allowed := map[string]bool{}
|
||
if streamsCSV != "" {
|
||
for _, s := range strings.Split(streamsCSV, ",") {
|
||
allowed[strings.TrimSpace(s)] = true
|
||
}
|
||
}
|
||
type item struct {
|
||
Name string `json:"name"`
|
||
Live bool `json:"live"`
|
||
Viewers int `json:"viewers"`
|
||
}
|
||
out := struct {
|
||
Items []item `json:"items"`
|
||
}{}
|
||
for _, p := range pl.Items {
|
||
if len(allowed) > 0 && !allowed[p.Name] {
|
||
continue
|
||
}
|
||
|
||
// echte HLS-Viewerzahlen einmalig holen (Batch)
|
||
hlsMap := map[string]int{}
|
||
{
|
||
ctxH, cancelH := context.WithTimeout(context.Background(), 2*time.Second)
|
||
tmp, err := fetchHLSClientMap(ctxH, mtxAPI, os.Getenv("MTX_API_USER"), os.Getenv("MTX_API_PASS"))
|
||
cancelH()
|
||
if err == nil && tmp != nil {
|
||
hlsMap = tmp
|
||
} else {
|
||
// optional: log.Printf("warn: hlsmuxers/list: %v", err)
|
||
}
|
||
}
|
||
|
||
viewers := 0
|
||
|
||
// 1) Echte HLS-Zuschauer aus der Batch-Map
|
||
if v, ok := hlsMap[p.Name]; ok {
|
||
viewers = v
|
||
}
|
||
|
||
// 2) Fallback: Nicht-HLS-Reader (oder wenn Map leer ist)
|
||
// Bei HLS ist p.Viewers() meist 1 (der Muxer), aber das wollen wir nur nutzen,
|
||
// wenn wir keinen HLS-Wert haben.
|
||
if viewers == 0 {
|
||
if v := p.Viewers(); v > 0 {
|
||
viewers = v
|
||
}
|
||
}
|
||
|
||
out.Items = append(out.Items, item{
|
||
Name: p.Name,
|
||
Live: p.Live(),
|
||
Viewers: viewers,
|
||
})
|
||
}
|
||
|
||
buf, _ := json.Marshal(out)
|
||
|
||
streamsCacheMu.RLock()
|
||
same := eq(buf, streamsJSONCache)
|
||
streamsCacheMu.RUnlock()
|
||
if same {
|
||
continue
|
||
}
|
||
|
||
streamsCacheMu.Lock()
|
||
streamsJSONCache = buf
|
||
streamsJSONETag = fmt.Sprintf("%x", len(buf))
|
||
streamsCacheMu.Unlock()
|
||
|
||
// an alle SSE-Clients senden
|
||
sseB.broadcast(buf)
|
||
}
|
||
}()
|
||
|
||
log.Printf("Dashboard listening on %s (API=%s HLS=%s, WEB_ROOT=%s)\n", listen, mtxAPI, mtxHLS, webRoot)
|
||
if err := http.ListenAndServe(listen, r); err != nil {
|
||
log.Fatal(err)
|
||
}
|
||
}
|
||
|
||
func sseHandler(w http.ResponseWriter, r *http.Request) {
|
||
// SSE-Header
|
||
w.Header().Set("Content-Type", "text/event-stream")
|
||
w.Header().Set("Cache-Control", "no-cache")
|
||
w.Header().Set("Connection", "keep-alive")
|
||
// CORS brauchst du nicht, gleiche Origin
|
||
|
||
// Flush unterstützen
|
||
flusher, ok := w.(http.Flusher)
|
||
if !ok {
|
||
http.Error(w, "stream unsupported", http.StatusInternalServerError)
|
||
return
|
||
}
|
||
|
||
// Client-Kanal registrieren
|
||
ch := make(sseClient, 8)
|
||
sseB.add(ch)
|
||
defer sseB.del(ch)
|
||
|
||
// Beim Connect sofort den aktuellen Snapshot schicken (falls vorhanden)
|
||
streamsCacheMu.RLock()
|
||
snap := streamsJSONCache
|
||
streamsCacheMu.RUnlock()
|
||
if len(snap) > 0 {
|
||
fmt.Fprintf(w, "event: update\ndata: %s\n\n", string(snap))
|
||
flusher.Flush()
|
||
}
|
||
|
||
// Heartbeat (hält Proxies freundlich)
|
||
hb := time.NewTicker(15 * time.Second)
|
||
defer hb.Stop()
|
||
|
||
// Abbruch, wenn Client trennt
|
||
notify := r.Context().Done()
|
||
|
||
for {
|
||
select {
|
||
case <-notify:
|
||
return
|
||
case msg := <-ch:
|
||
// JSON als "update" senden
|
||
fmt.Fprintf(w, "event: update\ndata: %s\n\n", string(msg))
|
||
flusher.Flush()
|
||
case <-hb.C:
|
||
// Kommentar als Ping
|
||
fmt.Fprintf(w, ": ping\n\n")
|
||
flusher.Flush()
|
||
}
|
||
}
|
||
}
|
||
|
||
func apiStreams(w http.ResponseWriter, r *http.Request) {
|
||
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
|
||
defer cancel()
|
||
|
||
c := &mtx.Client{BaseURL: mtxAPI, User: os.Getenv("MTX_API_USER"), Pass: os.Getenv("MTX_API_PASS")}
|
||
pl, err := c.Paths(ctx)
|
||
if err != nil {
|
||
w.Header().Set("Content-Type", "application/json")
|
||
w.WriteHeader(http.StatusBadGateway)
|
||
_ = json.NewEncoder(w).Encode(map[string]any{"error": err.Error()})
|
||
return
|
||
}
|
||
|
||
allowed := map[string]bool{}
|
||
if streamsCSV != "" {
|
||
for _, s := range strings.Split(streamsCSV, ",") {
|
||
allowed[strings.TrimSpace(s)] = true
|
||
}
|
||
}
|
||
type item struct {
|
||
Name string `json:"name"`
|
||
Live bool `json:"live"`
|
||
Viewers int `json:"viewers"`
|
||
}
|
||
out := struct {
|
||
Items []item `json:"items"`
|
||
}{}
|
||
for _, p := range pl.Items {
|
||
if len(allowed) > 0 && !allowed[p.Name] {
|
||
continue
|
||
}
|
||
// echte HLS-Viewerzahlen einmalig holen (Batch)
|
||
hlsMap := map[string]int{}
|
||
{
|
||
ctxH, cancelH := context.WithTimeout(context.Background(), 2*time.Second)
|
||
tmp, err := fetchHLSClientMap(ctxH, mtxAPI, os.Getenv("MTX_API_USER"), os.Getenv("MTX_API_PASS"))
|
||
cancelH()
|
||
if err == nil && tmp != nil {
|
||
hlsMap = tmp
|
||
} else {
|
||
// optional: log.Printf("warn: hlsmuxers/list: %v", err)
|
||
}
|
||
}
|
||
|
||
viewers := 0
|
||
|
||
// 1) Echte HLS-Zuschauer aus der Batch-Map
|
||
if v, ok := hlsMap[p.Name]; ok {
|
||
viewers = v
|
||
}
|
||
|
||
// 2) Fallback: Nicht-HLS-Reader (oder wenn Map leer ist)
|
||
// Bei HLS ist p.Viewers() meist 1 (der Muxer), aber das wollen wir nur nutzen,
|
||
// wenn wir keinen HLS-Wert haben.
|
||
if viewers == 0 {
|
||
if v := p.Viewers(); v > 0 {
|
||
viewers = v
|
||
}
|
||
}
|
||
|
||
out.Items = append(out.Items, item{
|
||
Name: p.Name,
|
||
Live: p.Live(),
|
||
Viewers: viewers,
|
||
})
|
||
}
|
||
w.Header().Set("Content-Type", "application/json")
|
||
_ = json.NewEncoder(w).Encode(out)
|
||
}
|
||
|
||
func pageIndex(w http.ResponseWriter, r *http.Request) {
|
||
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
||
if err := tpls.ExecuteTemplate(w, "index.html", nil); err != nil {
|
||
http.Error(w, err.Error(), 500)
|
||
}
|
||
}
|
||
|
||
func pageStream(w http.ResponseWriter, r *http.Request) {
|
||
name := chi.URLParam(r, "name")
|
||
w.Header().Set("Content-Type", "text/html; charset=utf-8")
|
||
if err := tpls.ExecuteTemplate(w, "stream.html", map[string]any{
|
||
"Name": name,
|
||
"JSONName": fmt.Sprintf("%q", name),
|
||
}); err != nil {
|
||
http.Error(w, err.Error(), 500)
|
||
}
|
||
}
|
||
|
||
// Basic helper bleibt, falls du Basic Auth nutzt
|
||
func basic(creds string) string {
|
||
const tbl = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/"
|
||
b := []byte(creds)
|
||
var out []byte
|
||
for i := 0; i < len(b); i += 3 {
|
||
var v uint32
|
||
n := 0
|
||
for j := 0; j < 3; j++ {
|
||
v <<= 8
|
||
if i+j < len(b) {
|
||
v |= uint32(b[i+j])
|
||
n++
|
||
}
|
||
}
|
||
for j := 0; j < 4; j++ {
|
||
if j <= n {
|
||
out = append(out, tbl[(v>>(18-6*uint(j)))&0x3f])
|
||
} else {
|
||
out = append(out, '=')
|
||
}
|
||
}
|
||
}
|
||
return string(out)
|
||
}
|