Enhance telemetry metrics and context propagation

This commit is contained in:
Marc Schäfer
2025-10-10 18:17:59 +02:00
parent b6f5458ad9
commit 52e4a57cc1
6 changed files with 330 additions and 81 deletions

View File

@@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
@@ -42,6 +43,8 @@ type Client struct {
writeMux sync.Mutex
clientType string // Type of client (e.g., "newt", "olm")
tlsConfig TLSConfig
metricsCtxMu sync.RWMutex
metricsCtx context.Context
}
type ClientOption func(*Client)
@@ -85,6 +88,26 @@ func (c *Client) OnTokenUpdate(callback func(token string)) {
c.onTokenUpdate = callback
}
func (c *Client) metricsContext() context.Context {
c.metricsCtxMu.RLock()
defer c.metricsCtxMu.RUnlock()
if c.metricsCtx != nil {
return c.metricsCtx
}
return context.Background()
}
func (c *Client) setMetricsContext(ctx context.Context) {
c.metricsCtxMu.Lock()
c.metricsCtx = ctx
c.metricsCtxMu.Unlock()
}
// MetricsContext exposes the context used for telemetry emission when a connection is active.
func (c *Client) MetricsContext() context.Context {
return c.metricsContext()
}
// NewClient creates a new websocket client
func NewClient(clientType string, ID, secret string, endpoint string, pingInterval time.Duration, pingTimeout time.Duration, opts ...ClientOption) (*Client, error) {
config := &Config{
@@ -177,7 +200,7 @@ func (c *Client) SendMessage(messageType string, data interface{}) error {
if err := c.conn.WriteJSON(msg); err != nil {
return err
}
telemetry.IncWSMessage(context.Background(), "out", "text")
telemetry.IncWSMessage(c.metricsContext(), "out", "text")
return nil
}
@@ -273,8 +296,12 @@ func (c *Client) getToken() (string, error) {
return "", fmt.Errorf("failed to marshal token request data: %w", err)
}
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// Create a new request
req, err := http.NewRequest(
req, err := http.NewRequestWithContext(
ctx,
"POST",
baseEndpoint+"/api/v1/auth/"+c.clientType+"/get-token",
bytes.NewBuffer(jsonData),
@@ -296,7 +323,8 @@ func (c *Client) getToken() (string, error) {
}
resp, err := client.Do(req)
if err != nil {
telemetry.IncConnError(context.Background(), "auth", classifyConnError(err))
telemetry.IncConnAttempt(ctx, "auth", "failure")
telemetry.IncConnError(ctx, "auth", classifyConnError(err))
return "", fmt.Errorf("failed to request new token: %w", err)
}
defer resp.Body.Close()
@@ -304,15 +332,15 @@ func (c *Client) getToken() (string, error) {
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body)
logger.Error("Failed to get token with status code: %d, body: %s", resp.StatusCode, string(body))
telemetry.IncConnAttempt(context.Background(), "auth", "failure")
telemetry.IncConnAttempt(ctx, "auth", "failure")
etype := "io_error"
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
etype = "auth_failed"
}
telemetry.IncConnError(context.Background(), "auth", etype)
telemetry.IncConnError(ctx, "auth", etype)
// Reconnect reason mapping for auth failures
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonAuthError)
telemetry.IncReconnect(ctx, c.config.ID, "client", telemetry.ReasonAuthError)
}
return "", fmt.Errorf("failed to get token with status code: %d, body: %s", resp.StatusCode, string(body))
}
@@ -332,7 +360,7 @@ func (c *Client) getToken() (string, error) {
}
logger.Debug("Received token: %s", tokenResp.Data.Token)
telemetry.IncConnAttempt(context.Background(), "auth", "success")
telemetry.IncConnAttempt(ctx, "auth", "success")
return tokenResp.Data.Token, nil
}
@@ -357,6 +385,30 @@ func classifyConnError(err error) string {
}
}
func classifyWSDisconnect(err error) (result, reason string) {
if err == nil {
return "success", "normal"
}
if websocket.IsCloseError(err, websocket.CloseNormalClosure) {
return "success", "normal"
}
if ne, ok := err.(net.Error); ok && ne.Timeout() {
return "error", "timeout"
}
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
return "error", "unexpected_close"
}
msg := strings.ToLower(err.Error())
switch {
case strings.Contains(msg, "eof"):
return "error", "eof"
case strings.Contains(msg, "reset"):
return "error", "connection_reset"
default:
return "error", "read_error"
}
}
func (c *Client) connectWithRetry() {
for {
select {
@@ -375,13 +427,13 @@ func (c *Client) connectWithRetry() {
}
func (c *Client) establishConnection() error {
ctx := context.Background()
// Get token for authentication
token, err := c.getToken()
if err != nil {
// telemetry: connection attempt failed before dialing
// site_id isn't globally available here; use client ID as site_id (low cardinality)
telemetry.IncConnAttempt(context.Background(), "websocket", "failure")
telemetry.IncConnError(context.Background(), "websocket", classifyConnError(err))
telemetry.IncConnAttempt(ctx, "websocket", "failure")
telemetry.IncConnError(ctx, "websocket", classifyConnError(err))
return fmt.Errorf("failed to get token: %w", err)
}
@@ -416,7 +468,7 @@ func (c *Client) establishConnection() error {
// Connect to WebSocket (optional span)
tr := otel.Tracer("newt")
spanCtx, span := tr.Start(context.Background(), "ws.connect")
ctx, span := tr.Start(ctx, "ws.connect")
defer span.End()
start := time.Now()
@@ -441,38 +493,40 @@ func (c *Client) establishConnection() error {
logger.Debug("WebSocket TLS certificate verification disabled via SKIP_TLS_VERIFY environment variable")
}
conn, _, err := dialer.DialContext(spanCtx, u.String(), nil)
conn, _, err := dialer.DialContext(ctx, u.String(), nil)
lat := time.Since(start).Seconds()
if err != nil {
telemetry.IncConnAttempt(context.Background(), "websocket", "failure")
telemetry.IncConnAttempt(ctx, "websocket", "failure")
etype := classifyConnError(err)
telemetry.IncConnError(context.Background(), "websocket", etype)
telemetry.ObserveWSConnectLatency(context.Background(), lat, "failure", etype)
telemetry.IncConnError(ctx, "websocket", etype)
telemetry.ObserveWSConnectLatency(ctx, lat, "failure", etype)
// Map handshake-related errors to reconnect reasons where appropriate
if etype == "tls_handshake" {
telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonHandshakeError)
telemetry.IncReconnect(ctx, c.config.ID, "client", telemetry.ReasonHandshakeError)
} else if etype == "dial_timeout" {
telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonTimeout)
telemetry.IncReconnect(ctx, c.config.ID, "client", telemetry.ReasonTimeout)
} else {
telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonError)
telemetry.IncReconnect(ctx, c.config.ID, "client", telemetry.ReasonError)
}
return fmt.Errorf("failed to connect to WebSocket: %w", err)
}
telemetry.IncConnAttempt(context.Background(), "websocket", "success")
telemetry.ObserveWSConnectLatency(context.Background(), lat, "success", "")
telemetry.IncConnAttempt(ctx, "websocket", "success")
telemetry.ObserveWSConnectLatency(ctx, lat, "success", "")
c.conn = conn
c.setConnected(true)
c.setMetricsContext(ctx)
sessionStart := time.Now()
// Wire up pong handler for metrics
c.conn.SetPongHandler(func(appData string) error {
telemetry.IncWSMessage(context.Background(), "in", "pong")
telemetry.IncWSMessage(c.metricsContext(), "in", "pong")
return nil
})
// Start the ping monitor
go c.pingMonitor()
// Start the read pump with disconnect detection
go c.readPumpWithDisconnectDetection()
go c.readPumpWithDisconnectDetection(sessionStart)
if c.onConnect != nil {
err := c.saveConfig()
@@ -566,7 +620,7 @@ func (c *Client) pingMonitor() {
c.writeMux.Lock()
err := c.conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(c.pingTimeout))
if err == nil {
telemetry.IncWSMessage(context.Background(), "out", "ping")
telemetry.IncWSMessage(c.metricsContext(), "out", "ping")
}
c.writeMux.Unlock()
if err != nil {
@@ -577,6 +631,7 @@ func (c *Client) pingMonitor() {
return
default:
logger.Error("Ping failed: %v", err)
telemetry.IncWSKeepaliveFailure(c.metricsContext(), "ping_write")
c.reconnect()
return
}
@@ -586,11 +641,19 @@ func (c *Client) pingMonitor() {
}
// readPumpWithDisconnectDetection reads messages and triggers reconnect on error
func (c *Client) readPumpWithDisconnectDetection() {
func (c *Client) readPumpWithDisconnectDetection(started time.Time) {
ctx := c.metricsContext()
disconnectReason := "shutdown"
disconnectResult := "success"
defer func() {
if c.conn != nil {
c.conn.Close()
}
if !started.IsZero() {
telemetry.ObserveWSSessionDuration(ctx, time.Since(started).Seconds(), disconnectResult)
}
telemetry.IncWSDisconnect(ctx, disconnectReason, disconnectResult)
// Only attempt reconnect if we're not shutting down
select {
case <-c.done:
@@ -604,12 +667,14 @@ func (c *Client) readPumpWithDisconnectDetection() {
for {
select {
case <-c.done:
disconnectReason = "shutdown"
disconnectResult = "success"
return
default:
var msg WSMessage
err := c.conn.ReadJSON(&msg)
if err == nil {
telemetry.IncWSMessage(context.Background(), "in", "text")
telemetry.IncWSMessage(c.metricsContext(), "in", "text")
}
if err != nil {
// Check if we're shutting down before logging error
@@ -617,13 +682,18 @@ func (c *Client) readPumpWithDisconnectDetection() {
case <-c.done:
// Expected during shutdown, don't log as error
logger.Debug("WebSocket connection closed during shutdown")
disconnectReason = "shutdown"
disconnectResult = "success"
return
default:
// Unexpected error during normal operation
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) {
logger.Error("WebSocket read error: %v", err)
} else {
logger.Debug("WebSocket connection closed: %v", err)
disconnectResult, disconnectReason = classifyWSDisconnect(err)
if disconnectResult == "error" {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure, websocket.CloseNormalClosure) {
logger.Error("WebSocket read error: %v", err)
} else {
logger.Debug("WebSocket connection closed: %v", err)
}
}
return // triggers reconnect via defer
}