This commit is contained in:
@@ -12,6 +12,7 @@ import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
@@ -35,6 +36,57 @@ var (
|
||||
tpls *template.Template
|
||||
)
|
||||
|
||||
// ----- SSE Broker -----
|
||||
type sseClient chan []byte
|
||||
|
||||
type sseBroker struct {
|
||||
mu sync.Mutex
|
||||
clients map[sseClient]struct{}
|
||||
}
|
||||
|
||||
func newSSEBroker() *sseBroker {
|
||||
return &sseBroker{clients: make(map[sseClient]struct{})}
|
||||
}
|
||||
func (b *sseBroker) add(c sseClient) {
|
||||
b.mu.Lock()
|
||||
b.clients[c] = struct{}{}
|
||||
b.mu.Unlock()
|
||||
}
|
||||
func (b *sseBroker) del(c sseClient) {
|
||||
b.mu.Lock()
|
||||
delete(b.clients, c)
|
||||
b.mu.Unlock()
|
||||
}
|
||||
func (b *sseBroker) broadcast(msg []byte) {
|
||||
b.mu.Lock()
|
||||
for c := range b.clients {
|
||||
select {
|
||||
case c <- msg:
|
||||
default: /* slow client, drop */
|
||||
}
|
||||
}
|
||||
b.mu.Unlock()
|
||||
}
|
||||
|
||||
var (
|
||||
streamsJSONCache []byte
|
||||
streamsJSONETag string
|
||||
streamsCacheMu sync.RWMutex
|
||||
sseB = newSSEBroker()
|
||||
)
|
||||
|
||||
func eq(a, b []byte) bool {
|
||||
if len(a) != len(b) {
|
||||
return false
|
||||
}
|
||||
for i := range a {
|
||||
if a[i] != b[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func init() {
|
||||
_ = mime.AddExtensionType(".css", "text/css")
|
||||
_ = mime.AddExtensionType(".js", "application/javascript")
|
||||
@@ -86,12 +138,15 @@ func main() {
|
||||
})
|
||||
})
|
||||
|
||||
// API
|
||||
// Rate Limit nur auf REST, nicht auf SSE:
|
||||
r.Group(func(r chi.Router) {
|
||||
r.Use(httprate.LimitByIP(30, time.Minute))
|
||||
r.Use(httprate.LimitByIP(240, time.Minute)) // fertig
|
||||
r.Get("/api/streams", apiStreams)
|
||||
})
|
||||
|
||||
// SSE ohne Rate-Limit (1 Verbindung pro Browser):
|
||||
r.Get("/api/streams/events", sseHandler)
|
||||
|
||||
// Optional Basic Auth für Seiten
|
||||
if basicUser != "" {
|
||||
creds := basicUser + ":" + basicPass
|
||||
@@ -140,12 +195,117 @@ func main() {
|
||||
// Health
|
||||
r.Get("/health", func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("ok")) })
|
||||
|
||||
go func() {
|
||||
t := time.NewTicker(1 * time.Second)
|
||||
defer t.Stop()
|
||||
for range t.C {
|
||||
// gleiche Logik wie in apiStreams, nur ohne Response
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
c := &mtx.Client{BaseURL: mtxAPI, User: os.Getenv("MTX_API_USER"), Pass: os.Getenv("MTX_API_PASS")}
|
||||
pl, err := c.Paths(ctx)
|
||||
cancel()
|
||||
if err != nil {
|
||||
// optional: bei Fehler nichts senden
|
||||
continue
|
||||
}
|
||||
|
||||
allowed := map[string]bool{}
|
||||
if streamsCSV != "" {
|
||||
for _, s := range strings.Split(streamsCSV, ",") {
|
||||
allowed[strings.TrimSpace(s)] = true
|
||||
}
|
||||
}
|
||||
type item struct {
|
||||
Name string `json:"name"`
|
||||
Live bool `json:"live"`
|
||||
Viewers int `json:"viewers"`
|
||||
}
|
||||
out := struct {
|
||||
Items []item `json:"items"`
|
||||
}{}
|
||||
for _, p := range pl.Items {
|
||||
if len(allowed) > 0 && !allowed[p.Name] {
|
||||
continue
|
||||
}
|
||||
out.Items = append(out.Items, item{Name: p.Name, Live: p.Live(), Viewers: p.Viewers()})
|
||||
}
|
||||
|
||||
buf, _ := json.Marshal(out)
|
||||
|
||||
streamsCacheMu.RLock()
|
||||
same := eq(buf, streamsJSONCache)
|
||||
streamsCacheMu.RUnlock()
|
||||
if same {
|
||||
continue
|
||||
}
|
||||
|
||||
streamsCacheMu.Lock()
|
||||
streamsJSONCache = buf
|
||||
streamsJSONETag = fmt.Sprintf("%x", len(buf))
|
||||
streamsCacheMu.Unlock()
|
||||
|
||||
// an alle SSE-Clients senden
|
||||
sseB.broadcast(buf)
|
||||
}
|
||||
}()
|
||||
|
||||
log.Printf("Dashboard listening on %s (API=%s HLS=%s, WEB_ROOT=%s)\n", listen, mtxAPI, mtxHLS, webRoot)
|
||||
if err := http.ListenAndServe(listen, r); err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
func sseHandler(w http.ResponseWriter, r *http.Request) {
|
||||
// SSE-Header
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
// CORS brauchst du nicht, gleiche Origin
|
||||
|
||||
// Flush unterstützen
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
http.Error(w, "stream unsupported", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
// Client-Kanal registrieren
|
||||
ch := make(sseClient, 8)
|
||||
sseB.add(ch)
|
||||
defer sseB.del(ch)
|
||||
|
||||
// Beim Connect sofort den aktuellen Snapshot schicken (falls vorhanden)
|
||||
streamsCacheMu.RLock()
|
||||
snap := streamsJSONCache
|
||||
streamsCacheMu.RUnlock()
|
||||
if len(snap) > 0 {
|
||||
fmt.Fprintf(w, "event: update\ndata: %s\n\n", string(snap))
|
||||
flusher.Flush()
|
||||
}
|
||||
|
||||
// Heartbeat (hält Proxies freundlich)
|
||||
hb := time.NewTicker(15 * time.Second)
|
||||
defer hb.Stop()
|
||||
|
||||
// Abbruch, wenn Client trennt
|
||||
notify := r.Context().Done()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-notify:
|
||||
return
|
||||
case msg := <-ch:
|
||||
// JSON als "update" senden
|
||||
fmt.Fprintf(w, "event: update\ndata: %s\n\n", string(msg))
|
||||
flusher.Flush()
|
||||
case <-hb.C:
|
||||
// Kommentar als Ping
|
||||
fmt.Fprintf(w, ": ping\n\n")
|
||||
flusher.Flush()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func apiStreams(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 3*time.Second)
|
||||
defer cancel()
|
||||
|
||||
Reference in New Issue
Block a user