mirror of
https://github.com/fosrl/newt.git
synced 2026-02-07 21:46:39 +00:00
107 lines
3.1 KiB
Go
107 lines
3.1 KiB
Go
package telemetry
|
|
|
|
import (
|
|
"context"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/metric"
|
|
)
|
|
|
|
// StateView provides a read-only view for observable gauges.
|
|
// Implementations must be concurrency-safe and avoid blocking operations.
|
|
// All methods should be fast and use RLocks where applicable.
|
|
type StateView interface {
|
|
// ListSites returns a stable, low-cardinality list of site IDs to expose.
|
|
ListSites() []string
|
|
// Online returns whether the site is online.
|
|
Online(siteID string) (online bool, ok bool)
|
|
// LastHeartbeat returns the last heartbeat time for a site.
|
|
LastHeartbeat(siteID string) (t time.Time, ok bool)
|
|
// ActiveSessions returns the current number of active sessions for a site (across tunnels),
|
|
// or scoped to site if your model is site-scoped.
|
|
ActiveSessions(siteID string) (n int64, ok bool)
|
|
}
|
|
|
|
var (
|
|
stateView atomic.Value // of type StateView
|
|
)
|
|
|
|
// RegisterStateView sets the global StateView used by the default observable callback.
|
|
func RegisterStateView(v StateView) {
|
|
stateView.Store(v)
|
|
// If instruments are registered, ensure a callback exists.
|
|
if v != nil {
|
|
SetObservableCallback(func(ctx context.Context, o metric.Observer) error {
|
|
if any := stateView.Load(); any != nil {
|
|
if sv, ok := any.(StateView); ok {
|
|
for _, siteID := range sv.ListSites() {
|
|
observeSiteOnlineFor(o, sv, siteID)
|
|
observeLastHeartbeatFor(o, sv, siteID)
|
|
observeSessionsFor(o, siteID, sv)
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
}
|
|
}
|
|
|
|
func observeSiteOnlineFor(o metric.Observer, sv StateView, siteID string) {
|
|
if online, ok := sv.Online(siteID); ok {
|
|
val := int64(0)
|
|
if online {
|
|
val = 1
|
|
}
|
|
o.ObserveInt64(mSiteOnline, val, metric.WithAttributes(
|
|
attribute.String("site_id", siteID),
|
|
))
|
|
}
|
|
}
|
|
|
|
func observeLastHeartbeatFor(o metric.Observer, sv StateView, siteID string) {
|
|
if t, ok := sv.LastHeartbeat(siteID); ok {
|
|
ts := float64(t.UnixNano()) / 1e9
|
|
o.ObserveFloat64(mSiteLastHeartbeat, ts, metric.WithAttributes(
|
|
attribute.String("site_id", siteID),
|
|
))
|
|
}
|
|
}
|
|
|
|
func observeSessionsFor(o metric.Observer, siteID string, any interface{}) {
|
|
if tm, ok := any.(interface{ SessionsByTunnel() map[string]int64 }); ok {
|
|
sessions := tm.SessionsByTunnel()
|
|
// If tunnel_id labels are enabled, preserve existing per-tunnel observations
|
|
if ShouldIncludeTunnelID() {
|
|
for tid, n := range sessions {
|
|
attrs := []attribute.KeyValue{
|
|
attribute.String("site_id", siteID),
|
|
}
|
|
if tid != "" {
|
|
attrs = append(attrs, attribute.String("tunnel_id", tid))
|
|
}
|
|
o.ObserveInt64(mTunnelSessions, n, metric.WithAttributes(attrs...))
|
|
}
|
|
return
|
|
}
|
|
// When tunnel_id is disabled, collapse per-tunnel counts into a single site-level value
|
|
var total int64
|
|
for _, n := range sessions {
|
|
total += n
|
|
}
|
|
// If there are no per-tunnel entries, fall back to ActiveSessions() if available
|
|
if total == 0 {
|
|
if svAny := stateView.Load(); svAny != nil {
|
|
if sv, ok := svAny.(StateView); ok {
|
|
if n, ok2 := sv.ActiveSessions(siteID); ok2 {
|
|
total = n
|
|
}
|
|
}
|
|
}
|
|
}
|
|
o.ObserveInt64(mTunnelSessions, total, metric.WithAttributes(attribute.String("site_id", siteID)))
|
|
return
|
|
}
|
|
}
|