diff --git a/internal/telemetry/metrics.go b/internal/telemetry/metrics.go index 2f4b005..2b332e4 100644 --- a/internal/telemetry/metrics.go +++ b/internal/telemetry/metrics.go @@ -36,12 +36,24 @@ var ( mConnErrors metric.Int64Counter // Config/Restart - mConfigReloads metric.Int64Counter - mRestartCount metric.Int64Counter + mConfigReloads metric.Int64Counter + mRestartCount metric.Int64Counter + mConfigApply metric.Float64Histogram + mCertRotationTotal metric.Int64Counter // Build info mBuildInfo metric.Int64ObservableGauge + // WebSocket + mWSConnectLatency metric.Float64Histogram + mWSMessages metric.Int64Counter + + // Proxy + mProxyActiveConns metric.Int64ObservableGauge + mProxyBufferBytes metric.Int64ObservableGauge + mProxyAsyncBacklogByte metric.Int64ObservableGauge + mProxyDropsTotal metric.Int64Counter + buildVersion string buildCommit string ) @@ -115,11 +127,31 @@ func registerInstruments() error { metric.WithDescription("Configuration reloads")) mRestartCount, _ = meter.Int64Counter("newt_restart_count_total", metric.WithDescription("Process restart count (incremented on start)")) + mConfigApply, _ = meter.Float64Histogram("newt_config_apply_seconds", + metric.WithDescription("Configuration apply duration in seconds")) + mCertRotationTotal, _ = meter.Int64Counter("newt_cert_rotation_total", + metric.WithDescription("Certificate rotation events (success/failure)")) // Build info gauge (value 1 with version/commit attributes) mBuildInfo, _ = meter.Int64ObservableGauge("newt_build_info", metric.WithDescription("Newt build information (value is always 1)")) + // WebSocket + mWSConnectLatency, _ = meter.Float64Histogram("newt_websocket_connect_latency_seconds", + metric.WithDescription("WebSocket connect latency in seconds")) + mWSMessages, _ = meter.Int64Counter("newt_websocket_messages_total", + metric.WithDescription("WebSocket messages by direction and type")) + + // Proxy + mProxyActiveConns, _ = 
meter.Int64ObservableGauge("newt_proxy_active_connections", + metric.WithDescription("Proxy active connections per tunnel and protocol")) + mProxyBufferBytes, _ = meter.Int64ObservableGauge("newt_proxy_buffer_bytes", + metric.WithDescription("Proxy buffer bytes (may approximate async backlog)")) + mProxyAsyncBacklogByte, _ = meter.Int64ObservableGauge("newt_proxy_async_backlog_bytes", + metric.WithDescription("Unflushed async byte backlog per tunnel and protocol")) + mProxyDropsTotal, _ = meter.Int64Counter("newt_proxy_drops_total", + metric.WithDescription("Proxy drops due to write errors")) + // Register a default callback for build info if version/commit set meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { if buildVersion == "" && buildCommit == "" { @@ -145,8 +177,10 @@ func registerInstruments() error { // heartbeat seconds, and active sessions. var ( - obsOnce sync.Once - obsStopper func() + obsOnce sync.Once + obsStopper func() + proxyObsOnce sync.Once + proxyStopper func() ) // SetObservableCallback registers a single callback that will be invoked @@ -168,6 +202,14 @@ func SetObservableCallback(cb func(context.Context, metric.Observer) error) { }) } +// SetProxyObservableCallback registers a callback to observe proxy gauges. 
+func SetProxyObservableCallback(cb func(context.Context, metric.Observer) error) { + proxyObsOnce.Do(func() { + reg, _ := meter.RegisterCallback(cb, mProxyActiveConns, mProxyBufferBytes, mProxyAsyncBacklogByte) + proxyStopper = func() { if reg != nil { _ = reg.Unregister() } } + }) +} + // Build info registration func RegisterBuildInfo(version, commit string) { buildVersion = version @@ -204,6 +246,62 @@ func AddTunnelBytesSet(ctx context.Context, n int64, attrs attribute.Set) { mTunnelBytes.Add(ctx, n, metric.WithAttributeSet(attrs)) } +// --- WebSocket helpers --- + +func ObserveWSConnectLatency(ctx context.Context, seconds float64, result, errorType string) { + attrs := []attribute.KeyValue{ + attribute.String("transport", "websocket"), + attribute.String("result", result), + } + if errorType != "" { + attrs = append(attrs, attribute.String("error_type", errorType)) + } + mWSConnectLatency.Record(ctx, seconds, metric.WithAttributes(attrsWithSite(attrs...)...)) +} + +func IncWSMessage(ctx context.Context, direction, msgType string) { + mWSMessages.Add(ctx, 1, metric.WithAttributes(attrsWithSite( + attribute.String("direction", direction), + attribute.String("msg_type", msgType), + )...)) +} + +// --- Proxy helpers --- + +func ObserveProxyActiveConnsObs(o metric.Observer, value int64, attrs []attribute.KeyValue) { + o.ObserveInt64(mProxyActiveConns, value, metric.WithAttributes(attrs...)) +} + +func ObserveProxyBufferBytesObs(o metric.Observer, value int64, attrs []attribute.KeyValue) { + o.ObserveInt64(mProxyBufferBytes, value, metric.WithAttributes(attrs...)) +} + +func ObserveProxyAsyncBacklogObs(o metric.Observer, value int64, attrs []attribute.KeyValue) { + o.ObserveInt64(mProxyAsyncBacklogByte, value, metric.WithAttributes(attrs...)) +} + +func IncProxyDrops(ctx context.Context, tunnelID, protocol string) { + mProxyDropsTotal.Add(ctx, 1, metric.WithAttributes(attrsWithSite( + attribute.String("tunnel_id", tunnelID), + attribute.String("protocol", protocol), + )...)) +} + +// --- Config/PKI helpers --- + 
+func ObserveConfigApply(ctx context.Context, phase, result string, seconds float64) { + mConfigApply.Record(ctx, seconds, metric.WithAttributes(attrsWithSite( + attribute.String("phase", phase), + attribute.String("result", result), + )...)) +} + +func IncCertRotation(ctx context.Context, result string) { + mCertRotationTotal.Add(ctx, 1, metric.WithAttributes(attrsWithSite( + attribute.String("result", result), + )...)) +} + func ObserveTunnelLatency(ctx context.Context, tunnelID, transport string, seconds float64) { mTunnelLatency.Record(ctx, seconds, metric.WithAttributes(attrsWithSite( attribute.String("tunnel_id", tunnelID), @@ -211,9 +309,10 @@ func ObserveTunnelLatency(ctx context.Context, tunnelID, transport string, secon )...)) } -func IncReconnect(ctx context.Context, tunnelID, reason string) { +func IncReconnect(ctx context.Context, tunnelID, initiator, reason string) { mReconnects.Add(ctx, 1, metric.WithAttributes(attrsWithSite( attribute.String("tunnel_id", tunnelID), + attribute.String("initiator", initiator), attribute.String("reason", reason), )...)) } diff --git a/internal/telemetry/telemetry.go b/internal/telemetry/telemetry.go index 30efd46..9b2a84c 100644 --- a/internal/telemetry/telemetry.go +++ b/internal/telemetry/telemetry.go @@ -195,7 +195,7 @@ func Init(ctx context.Context, cfg Config) (*Setup, error) { AttributeFilter: func(kv attribute.KeyValue) bool { k := string(kv.Key) switch k { - case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "error_type", "version", "commit", "site_id", "region": + case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "initiator", "error_type", "version", "commit", "site_id", "region": return true default: return false diff --git a/main.go b/main.go index 6c68431..8360888 100644 --- a/main.go +++ b/main.go @@ -730,7 +730,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub client.RegisterHandler("newt/wg/reconnect", func(msg 
websocket.WSMessage) { logger.Info("Received reconnect message") if wgData.PublicKey != "" { - telemetry.IncReconnect(context.Background(), wgData.PublicKey, "server_request") + telemetry.IncReconnect(context.Background(), wgData.PublicKey, "server", telemetry.ReasonServerRequest) } // Close the WireGuard device and TUN @@ -759,7 +759,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub client.RegisterHandler("newt/wg/terminate", func(msg websocket.WSMessage) { logger.Info("Received termination message") if wgData.PublicKey != "" { - telemetry.IncReconnect(context.Background(), wgData.PublicKey, "server_request") + telemetry.IncReconnect(context.Background(), wgData.PublicKey, "server", telemetry.ReasonServerRequest) } // Close the WireGuard device and TUN diff --git a/proxy/manager.go b/proxy/manager.go index 2fd731e..9cdcf43 100644 --- a/proxy/manager.go +++ b/proxy/manager.go @@ -53,6 +53,9 @@ type tunnelEntry struct { bytesOutTCP atomic.Uint64 bytesInUDP atomic.Uint64 bytesOutUDP atomic.Uint64 + + activeTCP atomic.Int64 + activeUDP atomic.Int64 } // countingWriter wraps an io.Writer and adds bytes to OTel counter using a pre-built attribute set. 
@@ -256,6 +259,21 @@ func (pm *ProxyManager) RemoveTarget(proto, listenIP string, port int) error { // Start begins listening for all configured proxy targets func (pm *ProxyManager) Start() error { + // Register proxy observables once per process + telemetry.SetProxyObservableCallback(func(ctx context.Context, o metric.Observer) error { + pm.mutex.RLock() + defer pm.mutex.RUnlock() + for _, e := range pm.tunnels { + // active connections + telemetry.ObserveProxyActiveConnsObs(o, e.activeTCP.Load(), e.attrOutTCP.ToSlice()) + telemetry.ObserveProxyActiveConnsObs(o, e.activeUDP.Load(), e.attrOutUDP.ToSlice()) + // backlog bytes (sum of unflushed counters) + b := int64(e.bytesInTCP.Load()+e.bytesOutTCP.Load()+e.bytesInUDP.Load()+e.bytesOutUDP.Load()) + telemetry.ObserveProxyAsyncBacklogObs(o, b, e.attrOutTCP.ToSlice()) + telemetry.ObserveProxyBufferBytesObs(o, b, e.attrOutTCP.ToSlice()) + } + return nil + }) pm.mutex.Lock() defer pm.mutex.Unlock() @@ -462,6 +480,9 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) // Count sessions only once per accepted TCP connection if pm.currentTunnelID != "" { state.Global().IncSessions(pm.currentTunnelID) + if e := pm.getEntry(pm.currentTunnelID); e != nil { + e.activeTCP.Add(1) + } } go func() { @@ -500,6 +521,9 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string) wg.Wait() if pm.currentTunnelID != "" { state.Global().DecSessions(pm.currentTunnelID) + if e := pm.getEntry(pm.currentTunnelID); e != nil { + e.activeTCP.Add(-1) + } } }() } @@ -567,7 +591,10 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { continue } - targetConn, err = net.DialUDP("udp", nil, targetUDPAddr) + targetConn, err = net.DialUDP("udp", nil, targetUDPAddr) + if e := pm.getEntry(pm.currentTunnelID); err == nil && e != nil { + e.activeUDP.Add(1) + } if err != nil { logger.Error("Error connecting to target: %v", err) continue } @@ -584,6 +611,9 @@ func (pm *ProxyManager) 
handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { if storedConn, exists := clientConns[clientKey]; exists && storedConn == targetConn { delete(clientConns, clientKey) targetConn.Close() + if e := pm.getEntry(pm.currentTunnelID); e != nil { + e.activeUDP.Add(-1) + } } clientsMutex.Unlock() }() @@ -612,20 +642,22 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) { _, err = conn.WriteTo(buffer[:n], remoteAddr) if err != nil { logger.Error("Error writing to client: %v", err) + telemetry.IncProxyDrops(context.Background(), pm.currentTunnelID, "udp") return // defer will handle cleanup } } }(clientKey, targetConn, remoteAddr) } - written, err := targetConn.Write(buffer[:n]) - if err != nil { - logger.Error("Error writing to target: %v", err) - targetConn.Close() - clientsMutex.Lock() - delete(clientConns, clientKey) - clientsMutex.Unlock() - } else if pm.currentTunnelID != "" && written > 0 { + written, err := targetConn.Write(buffer[:n]) + if err != nil { + logger.Error("Error writing to target: %v", err) + telemetry.IncProxyDrops(context.Background(), pm.currentTunnelID, "udp") + targetConn.Close() + clientsMutex.Lock() + delete(clientConns, clientKey) + clientsMutex.Unlock() + } else if pm.currentTunnelID != "" && written > 0 { if pm.asyncBytes { if e := pm.getEntry(pm.currentTunnelID); e != nil { e.bytesInUDP.Add(uint64(written)) diff --git a/util.go b/util.go index 25cdb9d..64bf24d 100644 --- a/util.go +++ b/util.go @@ -291,12 +291,12 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien // More lenient threshold for declaring connection lost under load failureThreshold := 4 if consecutiveFailures >= failureThreshold && currentInterval < maxInterval { - if !connectionLost { - connectionLost = true - logger.Warn("Connection to server lost after %d failures. 
Continuous reconnection attempts will be made.", consecutiveFailures) - if tunnelID != "" { - telemetry.IncReconnect(context.Background(), tunnelID, telemetry.ReasonTimeout) - } + if !connectionLost { + connectionLost = true + logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures) + if tunnelID != "" { + telemetry.IncReconnect(context.Background(), tunnelID, "client", telemetry.ReasonTimeout) + } stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second) // Send registration message to the server for backward compatibility err := client.SendMessage("newt/wg/register", map[string]interface{}{ diff --git a/websocket/client.go b/websocket/client.go index e38a6c9..1f4dc49 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -173,7 +173,11 @@ func (c *Client) SendMessage(messageType string, data interface{}) error { c.writeMux.Lock() defer c.writeMux.Unlock() - return c.conn.WriteJSON(msg) + if err := c.conn.WriteJSON(msg); err != nil { + return err + } + telemetry.IncWSMessage(context.Background(), "out", "text") + return nil } func (c *Client) SendMessageInterval(messageType string, data interface{}, interval time.Duration) (stop func()) { @@ -418,6 +422,7 @@ func (c *Client) establishConnection() error { spanCtx, span := tr.Start(context.Background(), "ws.connect") defer span.End() + start := time.Now() dialer := websocket.DefaultDialer // Use new TLS configuration method @@ -440,24 +445,32 @@ func (c *Client) establishConnection() error { } conn, _, err := dialer.DialContext(spanCtx, u.String(), nil) + lat := time.Since(start).Seconds() if err != nil { telemetry.IncConnAttempt(context.Background(), "websocket", "failure") etype := classifyConnError(err) telemetry.IncConnError(context.Background(), "websocket", etype) + telemetry.ObserveWSConnectLatency(context.Background(), lat, "failure", etype) // Map handshake-related errors to reconnect 
reasons where appropriate if etype == "tls" { - telemetry.IncReconnect(context.Background(), c.config.ID, telemetry.ReasonHandshakeError) + telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonHandshakeError) } else if etype == "timeout" { - telemetry.IncReconnect(context.Background(), c.config.ID, telemetry.ReasonTimeout) + telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonTimeout) } else { - telemetry.IncReconnect(context.Background(), c.config.ID, telemetry.ReasonError) + telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonError) } return fmt.Errorf("failed to connect to WebSocket: %w", err) } telemetry.IncConnAttempt(context.Background(), "websocket", "success") + telemetry.ObserveWSConnectLatency(context.Background(), lat, "success", "") c.conn = conn c.setConnected(true) + // Wire up pong handler for metrics + c.conn.SetPongHandler(func(appData string) error { + telemetry.IncWSMessage(context.Background(), "in", "pong") + return nil + }) // Start the ping monitor go c.pingMonitor() @@ -554,7 +567,10 @@ func (c *Client) pingMonitor() { return } c.writeMux.Lock() - err := c.conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(c.pingTimeout)) + err := c.conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(c.pingTimeout)) + if err == nil { + telemetry.IncWSMessage(context.Background(), "out", "ping") + } c.writeMux.Unlock() if err != nil { // Check if we're shutting down before logging error and reconnecting @@ -595,6 +611,9 @@ func (c *Client) readPumpWithDisconnectDetection() { default: var msg WSMessage err := c.conn.ReadJSON(&msg) + if err == nil { + telemetry.IncWSMessage(context.Background(), "in", "text") + } if err != nil { // Check if we're shutting down before logging error select { diff --git a/wg/wg.go b/wg/wg.go index 1607427..eccd64f 100644 --- a/wg/wg.go +++ b/wg/wg.go @@ -306,16 +306,24 @@ func (s *WireGuardService) 
handleConfig(msg websocket.WSMessage) { telemetry.IncConfigReload(context.Background(), "success") // Optional reconnect reason mapping: config change if s.serverPubKey != "" { - telemetry.IncReconnect(context.Background(), s.serverPubKey, telemetry.ReasonConfigChange) + telemetry.IncReconnect(context.Background(), s.serverPubKey, "client", telemetry.ReasonConfigChange) } // Ensure the WireGuard interface and peers are configured + start := time.Now() if err := s.ensureWireguardInterface(config); err != nil { logger.Error("Failed to ensure WireGuard interface: %v", err) + telemetry.ObserveConfigApply(context.Background(), "interface", "failure", time.Since(start).Seconds()) + } else { + telemetry.ObserveConfigApply(context.Background(), "interface", "success", time.Since(start).Seconds()) } + startPeers := time.Now() if err := s.ensureWireguardPeers(config.Peers); err != nil { logger.Error("Failed to ensure WireGuard peers: %v", err) + telemetry.ObserveConfigApply(context.Background(), "peer", "failure", time.Since(startPeers).Seconds()) + } else { + telemetry.ObserveConfigApply(context.Background(), "peer", "success", time.Since(startPeers).Seconds()) } }