Files
newt/internal/telemetry/state_view.go
2025-10-11 18:19:51 +02:00

107 lines
3.1 KiB
Go

package telemetry
import (
"context"
"sync/atomic"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)
// StateView is the read-only source of per-site state that the observable
// gauges sample.
//
// Implementations must be safe for concurrent use and must not block: every
// method should return quickly, taking at most a read lock where locking is
// needed.
type StateView interface {
	// ListSites returns the site IDs to expose. The list should be stable and
	// low-cardinality, since each ID becomes a metric attribute value.
	ListSites() []string

	// Online reports whether siteID is online. When ok is false the
	// observers in this file skip the observation for that site.
	Online(siteID string) (online, ok bool)

	// LastHeartbeat returns the time of the last heartbeat received for
	// siteID. When ok is false the observation is skipped.
	LastHeartbeat(siteID string) (t time.Time, ok bool)

	// ActiveSessions returns the current number of active sessions for
	// siteID, summed across tunnels (or simply the site's own count if the
	// model is site-scoped). When ok is false the value is not used.
	ActiveSessions(siteID string) (n int64, ok bool)
}
// stateView holds the currently registered StateView. It is an atomic.Value
// so that readers and writers do not need a lock; Load returns nil until a
// view has been stored.
var stateView atomic.Value
// RegisterStateView sets the global StateView used by the default observable
// callback and installs that callback through SetObservableCallback.
//
// A nil v is ignored and leaves any previously registered view in place:
// atomic.Value cannot hold nil, so storing it would panic. Note that the
// check only catches an untyped nil interface, not a nil pointer wrapped in
// the interface.
//
// atomic.Value also requires every stored value to have the same concrete
// type, so repeated registrations must use the same StateView implementation
// type or Store will panic.
func RegisterStateView(v StateView) {
	if v == nil {
		return
	}
	stateView.Store(v)

	// The callback reloads the view on every collection instead of capturing
	// v, so a later registration is picked up by an already-installed
	// callback.
	SetObservableCallback(func(_ context.Context, o metric.Observer) error {
		sv, ok := stateView.Load().(StateView)
		if !ok {
			return nil
		}
		for _, siteID := range sv.ListSites() {
			observeSiteOnlineFor(o, sv, siteID)
			observeLastHeartbeatFor(o, sv, siteID)
			observeSessionsFor(o, siteID, sv)
		}
		return nil
	})
}
// observeSiteOnlineFor records the mSiteOnline gauge for siteID: 1 when the
// view reports the site online, 0 when it reports it offline. Nothing is
// recorded when the view has no answer for the site (ok == false).
func observeSiteOnlineFor(o metric.Observer, sv StateView, siteID string) {
	isOnline, known := sv.Online(siteID)
	if !known {
		return
	}

	var gauge int64
	if isOnline {
		gauge = 1
	}

	siteAttr := attribute.String("site_id", siteID)
	o.ObserveInt64(mSiteOnline, gauge, metric.WithAttributes(siteAttr))
}
// observeLastHeartbeatFor records the mSiteLastHeartbeat gauge for siteID as
// Unix time in seconds, including the fractional part. Nothing is recorded
// when the view has no heartbeat for the site (ok == false).
func observeLastHeartbeatFor(o metric.Observer, sv StateView, siteID string) {
	last, ok := sv.LastHeartbeat(siteID)
	if !ok {
		return
	}

	// Combine Unix() and Nanosecond() instead of dividing UnixNano():
	// UnixNano is undefined for the zero Time and for dates outside roughly
	// 1678-2262, whereas Unix() is well defined for every Time.
	secs := float64(last.Unix()) + float64(last.Nanosecond())/1e9

	o.ObserveFloat64(mSiteLastHeartbeat, secs, metric.WithAttributes(
		attribute.String("site_id", siteID),
	))
}
// observeSessionsFor records the mTunnelSessions gauge for siteID.
//
// src is normally the registered StateView. It is probed for two
// capabilities by type assertion:
//
//   - SessionsByTunnel() map[string]int64 (optional): per-tunnel counts.
//   - ActiveSessions(siteID) (int64, bool): a site-level count.
//
// Behavior:
//
//   - src has SessionsByTunnel and tunnel_id labels are enabled: one
//     observation per map entry, labelled with site_id and, when the key is
//     non-empty, tunnel_id.
//   - src has SessionsByTunnel and tunnel_id labels are disabled: the map is
//     summed into a single site-level observation; a zero sum is replaced by
//     ActiveSessions when that is available.
//   - src lacks SessionsByTunnel: the ActiveSessions value is observed when
//     available, so views implementing only the required StateView methods
//     still report their sessions.
//
// NOTE(review): SessionsByTunnel takes no site argument, so the same map is
// reported for every siteID this is called with — confirm that is intended
// when more than one site is listed.
func observeSessionsFor(o metric.Observer, siteID string, src any) {
	type tunnelCounter interface {
		SessionsByTunnel() map[string]int64
	}
	type siteCounter interface {
		ActiveSessions(siteID string) (int64, bool)
	}

	siteAttr := attribute.String("site_id", siteID)

	// siteLevel resolves the site-wide count. It prefers the view that was
	// passed in, so one collection pass reads from a single source, and
	// falls back to the globally registered view otherwise.
	siteLevel := func() (int64, bool) {
		if sc, ok := src.(siteCounter); ok {
			return sc.ActiveSessions(siteID)
		}
		if sv, ok := stateView.Load().(StateView); ok {
			return sv.ActiveSessions(siteID)
		}
		return 0, false
	}

	tc, ok := src.(tunnelCounter)
	if !ok {
		// No per-tunnel data at all: report the site-level count if known.
		if n, ok := siteLevel(); ok {
			o.ObserveInt64(mTunnelSessions, n, metric.WithAttributes(siteAttr))
		}
		return
	}

	sessions := tc.SessionsByTunnel()

	// With tunnel_id labels enabled, keep one observation per tunnel.
	if ShouldIncludeTunnelID() {
		for tunnelID, n := range sessions {
			attrs := []attribute.KeyValue{siteAttr}
			if tunnelID != "" {
				attrs = append(attrs, attribute.String("tunnel_id", tunnelID))
			}
			o.ObserveInt64(mTunnelSessions, n, metric.WithAttributes(attrs...))
		}
		return
	}

	// With tunnel_id labels disabled, collapse the per-tunnel counts into a
	// single site-level value.
	var total int64
	for _, n := range sessions {
		total += n
	}

	// A zero sum may simply mean there are no per-tunnel entries; prefer the
	// site-level count when one is available.
	if total == 0 {
		if n, ok := siteLevel(); ok {
			total = n
		}
	}
	o.ObserveInt64(mTunnelSessions, total, metric.WithAttributes(siteAttr))
}