From a39e2aaade1a7b0032ca31d1bcdaf5518e023d55 Mon Sep 17 00:00:00 2001 From: jbergner Date: Sun, 21 Sep 2025 20:56:22 +0200 Subject: [PATCH] test mit webevents --- cmd/dashboard/main.go | 164 +++++++++++++++++++++++++++++++++++++++- go.mod | 4 +- go.sum | 8 +- web/static/js/index.js | 73 ++++++++++-------- web/static/js/stream.js | 19 ++++- 5 files changed, 226 insertions(+), 42 deletions(-) diff --git a/cmd/dashboard/main.go b/cmd/dashboard/main.go index 4e69857..d2e1123 100644 --- a/cmd/dashboard/main.go +++ b/cmd/dashboard/main.go @@ -12,6 +12,7 @@ import ( "os" "path/filepath" "strings" + "sync" "time" "github.com/go-chi/chi/v5" @@ -35,6 +36,57 @@ var ( tpls *template.Template ) +// ----- SSE Broker ----- +type sseClient chan []byte + +type sseBroker struct { + mu sync.Mutex + clients map[sseClient]struct{} +} + +func newSSEBroker() *sseBroker { + return &sseBroker{clients: make(map[sseClient]struct{})} +} +func (b *sseBroker) add(c sseClient) { + b.mu.Lock() + b.clients[c] = struct{}{} + b.mu.Unlock() +} +func (b *sseBroker) del(c sseClient) { + b.mu.Lock() + delete(b.clients, c) + b.mu.Unlock() +} +func (b *sseBroker) broadcast(msg []byte) { + b.mu.Lock() + for c := range b.clients { + select { + case c <- msg: + default: /* slow client, drop */ + } + } + b.mu.Unlock() +} + +var ( + streamsJSONCache []byte + streamsJSONETag string + streamsCacheMu sync.RWMutex + sseB = newSSEBroker() +) + +func eq(a, b []byte) bool { + if len(a) != len(b) { + return false + } + for i := range a { + if a[i] != b[i] { + return false + } + } + return true +} + func init() { _ = mime.AddExtensionType(".css", "text/css") _ = mime.AddExtensionType(".js", "application/javascript") @@ -86,12 +138,15 @@ func main() { }) }) - // API + // Rate Limit nur auf REST, nicht auf SSE: r.Group(func(r chi.Router) { - r.Use(httprate.LimitByIP(30, time.Minute)) + r.Use(httprate.LimitByIP(240, time.Minute)) // fertig r.Get("/api/streams", apiStreams) }) + // SSE ohne Rate-Limit (1 Verbindung pro Browser): + r.Get("/api/streams/events", sseHandler) + // Optional Basic Auth für Seiten if basicUser != "" { creds := basicUser + ":" + basicPass @@ -140,12 +195,117 @@ func main() { // Health r.Get("/health", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("ok")) }) + go func() { + t := time.NewTicker(1 * time.Second) + defer t.Stop() + for range t.C { + // gleiche Logik wie in apiStreams, nur ohne Response + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + c := &mtx.Client{BaseURL: mtxAPI, User: os.Getenv("MTX_API_USER"), Pass: os.Getenv("MTX_API_PASS")} + pl, err := c.Paths(ctx) + cancel() + if err != nil { + // optional: bei Fehler nichts senden + continue + } + + allowed := map[string]bool{} + if streamsCSV != "" { + for _, s := range strings.Split(streamsCSV, ",") { + allowed[strings.TrimSpace(s)] = true + } + } + type item struct { + Name string `json:"name"` + Live bool `json:"live"` + Viewers int `json:"viewers"` + } + out := struct { + Items []item `json:"items"` + }{} + for _, p := range pl.Items { + if len(allowed) > 0 && !allowed[p.Name] { + continue + } + out.Items = append(out.Items, item{Name: p.Name, Live: p.Live(), Viewers: p.Viewers()}) + } + + buf, _ := json.Marshal(out) + + streamsCacheMu.RLock() + same := eq(buf, streamsJSONCache) + streamsCacheMu.RUnlock() + if same { + continue + } + + streamsCacheMu.Lock() + streamsJSONCache = buf + streamsJSONETag = fmt.Sprintf("%x", len(buf)) + streamsCacheMu.Unlock() + + // an alle SSE-Clients senden + sseB.broadcast(buf) + } + }() + log.Printf("Dashboard listening on %s (API=%s HLS=%s, WEB_ROOT=%s)\n", listen, mtxAPI, mtxHLS, webRoot) if err := http.ListenAndServe(listen, r); err != nil { log.Fatal(err) } } +func sseHandler(w http.ResponseWriter, r *http.Request) { + // SSE-Header + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + // CORS brauchst du nicht, gleiche Origin + + // Flush unterstützen + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "stream unsupported", http.StatusInternalServerError) + return + } + + // Client-Kanal registrieren + ch := make(sseClient, 8) + sseB.add(ch) + defer sseB.del(ch) + + // Beim Connect sofort den aktuellen Snapshot schicken (falls vorhanden) + streamsCacheMu.RLock() + snap := streamsJSONCache + streamsCacheMu.RUnlock() + if len(snap) > 0 { + fmt.Fprintf(w, "event: update\ndata: %s\n\n", string(snap)) + flusher.Flush() + } + + // Heartbeat (hält Proxies freundlich) + hb := time.NewTicker(15 * time.Second) + defer hb.Stop() + + // Abbruch, wenn Client trennt + notify := r.Context().Done() + + for { + select { + case <-notify: + return + case msg := <-ch: + // JSON als "update" senden + fmt.Fprintf(w, "event: update\ndata: %s\n\n", string(msg)) + flusher.Flush() + case <-hb.C: + // Kommentar als Ping + fmt.Fprintf(w, ": ping\n\n") + flusher.Flush() + } + } +} + func apiStreams(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second) defer cancel() diff --git a/go.mod b/go.mod index 5814038..06e2ecd 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( ) require ( - github.com/klauspost/cpuid/v2 v2.2.10 // indirect + github.com/klauspost/cpuid/v2 v2.3.0 // indirect github.com/zeebo/xxh3 v1.0.2 // indirect - golang.org/x/sys v0.30.0 // indirect + golang.org/x/sys v0.36.0 // indirect ) diff --git a/go.sum b/go.sum index 4b22399..2dd7cf3 100644 --- a/go.sum +++ b/go.sum @@ -4,13 +4,13 @@ github.com/go-chi/httprate v0.15.0 h1:j54xcWV9KGmPf/X4H32/aTH+wBlrvxL7P+SdnRqxh5 github.com/go-chi/httprate v0.15.0/go.mod h1:rzGHhVrsBn3IMLYDOZQsSU4fJNWcjui4fWKJcCId1R4= github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= -github.com/klauspost/cpuid/v2 v2.2.10 h1:tBs3QSyvjDyFTq3uoc/9xFpCuOsJQFNPiAhYdw2skhE= -github.com/klauspost/cpuid/v2 v2.2.10/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= +github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= +github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= golang.org/x/sync v0.12.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.36.0 h1:KVRy2GtZBrk1cBYA7MKu5bEZFxQk4NIDV6RLVcC8o0k= +golang.org/x/sys v0.36.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= diff --git a/web/static/js/index.js b/web/static/js/index.js index 10e70b9..f247e93 100644 --- a/web/static/js/index.js +++ b/web/static/js/index.js @@ -1,35 +1,42 @@ -async function load(){ - let data; - try { - const r = await fetch('/api/streams', { cache: 'no-store' }); - if (!r.ok) throw new Error('api '+r.status); - data = await r.json(); - } catch (e) { - console.warn('streams api error:', e); - // UI freundlich degradieren - document.getElementById('list').innerHTML = - '
Keine Daten (API-Fehler)
'; - return; - } - const q = (document.getElementById('filter').value||'').toLowerCase(); +(function(){ const list = document.getElementById('list'); - list.innerHTML = ''; - data.items.filter(it => !q || it.name.toLowerCase().includes(q)).forEach(it => { - const a = document.createElement('a'); - a.href = '/' + encodeURIComponent(it.name); - a.className = 'card'; - a.innerHTML = ` -
-
-
${it.name}
-
Zuschauer: ${it.viewers}
-
-
${it.live ? 'LIVE' : 'Offline'}
-
`; - list.appendChild(a); + const filter = document.getElementById('filter'); + + function render(data){ + const q = (filter.value||'').toLowerCase(); + list.innerHTML = ''; + data.items + .filter(it => !q || it.name.toLowerCase().includes(q)) + .forEach(it => { + const a = document.createElement('a'); + a.href = '/' + encodeURIComponent(it.name); + a.className = 'card'; + a.innerHTML = ` +
+
+
${it.name}
+
Zuschauer: ${it.viewers ?? 0}
+
+
${it.live ? 'LIVE' : 'Offline'}
+
`; + list.appendChild(a); + }); + } + + filter.addEventListener('input', ()=>{/* re-render mit letztem snapshot */} + ); + + let last = {items:[]}; + const es = new EventSource('/api/streams/events', { withCredentials: false }); + es.addEventListener('update', (ev)=>{ + try { + last = JSON.parse(ev.data); + render(last); + } catch(e) { console.warn('sse parse', e); } }); -} -document.getElementById('filter').addEventListener('input', load); -document.getElementById('reload').addEventListener('click', load); -load(); -setInterval(load, 3000); + es.onerror = (e)=>console.warn('sse error', e); + + // Optionaler Reload-Button: + const btn = document.getElementById('reload'); + if (btn) btn.addEventListener('click', ()=>render(last)); +})(); diff --git a/web/static/js/stream.js b/web/static/js/stream.js index 8cc1fab..1106d2d 100644 --- a/web/static/js/stream.js +++ b/web/static/js/stream.js @@ -96,6 +96,23 @@ await new Promise(r => setTimeout(r, 1200)); } refreshMeta(); - setInterval(refreshMeta, 2500); + //setInterval(refreshMeta, 2500); })(); + + + const es = new EventSource('/api/streams/events'); + es.addEventListener('update', (ev)=>{ + try { + const data = JSON.parse(ev.data); + const it = data.items.find(x=>x.name===name); + if (!it) return; + const apiLive = !!it.live; + const combinedLive = playerLive || apiLive; + setLive(combinedLive); + + let viewers = it.viewers ?? 0; + if (combinedLive && viewers === 0) viewers = '≥1'; + viewersEl.textContent = 'Zuschauer: ' + viewers; + } catch(e){ /* ignore */ } + }); })();