Instrument authentication and WebSocket connection logic for telemetry events

This commit is contained in:
Marc Schäfer
2025-10-07 09:13:04 +02:00
parent 0d55e35784
commit 660adcc72d

View File

@@ -18,6 +18,10 @@ import (
"github.com/fosrl/newt/logger" "github.com/fosrl/newt/logger"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
"context"
"github.com/fosrl/newt/internal/telemetry"
"go.opentelemetry.io/otel"
) )
type Client struct { type Client struct {
@@ -287,6 +291,7 @@ func (c *Client) getToken() (string, error) {
} }
resp, err := client.Do(req) resp, err := client.Do(req)
if err != nil { if err != nil {
telemetry.IncConnError(context.Background(), c.config.ID, "auth", classifyConnError(err))
return "", fmt.Errorf("failed to request new token: %w", err) return "", fmt.Errorf("failed to request new token: %w", err)
} }
defer resp.Body.Close() defer resp.Body.Close()
@@ -294,6 +299,18 @@ func (c *Client) getToken() (string, error) {
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(resp.Body) body, _ := io.ReadAll(resp.Body)
logger.Error("Failed to get token with status code: %d, body: %s", resp.StatusCode, string(body)) logger.Error("Failed to get token with status code: %d, body: %s", resp.StatusCode, string(body))
telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "failure")
bin := "http_other"
if resp.StatusCode >= 500 {
bin = "http_5xx"
} else if resp.StatusCode >= 400 {
bin = "http_4xx"
}
telemetry.IncConnError(context.Background(), c.config.ID, "auth", bin)
// Reconnect reason mapping for auth failures
if resp.StatusCode == http.StatusUnauthorized || resp.StatusCode == http.StatusForbidden {
telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonAuthError)
}
return "", fmt.Errorf("failed to get token with status code: %d, body: %s", resp.StatusCode, string(body)) return "", fmt.Errorf("failed to get token with status code: %d, body: %s", resp.StatusCode, string(body))
} }
@@ -312,10 +329,33 @@ func (c *Client) getToken() (string, error) {
} }
logger.Debug("Received token: %s", tokenResp.Data.Token) logger.Debug("Received token: %s", tokenResp.Data.Token)
telemetry.IncConnAttempt(context.Background(), c.config.ID, "auth", "success")
return tokenResp.Data.Token, nil return tokenResp.Data.Token, nil
} }
// classifyConnError maps an error to a low-cardinality error_type label for
// connection telemetry.
//
// Classification is a case-insensitive substring match on err.Error(). Cases
// are evaluated in the order tls, timeout, dns, auth, io and the first match
// wins, so a message such as "TLS handshake timeout" is reported as "tls".
// A nil error yields "" and an unrecognized error yields "other".
func classifyConnError(err error) string {
	if err == nil {
		return ""
	}
	msg := strings.ToLower(err.Error())

	// containsAny reports whether msg contains at least one of the substrings.
	containsAny := func(subs ...string) bool {
		for _, s := range subs {
			if strings.Contains(msg, s) {
				return true
			}
		}
		return false
	}

	switch {
	case containsAny("tls", "certificate"):
		return "tls"
	// "timeout" already covers "i/o timeout". "deadline exceeded" covers
	// context.DeadlineExceeded, whose message does not contain "timeout".
	case containsAny("timeout", "deadline exceeded"):
		return "timeout"
	case containsAny("no such host", "dns"):
		return "dns"
	case containsAny("unauthorized", "forbidden"):
		return "auth"
	case containsAny(
		"broken pipe",
		"connection reset",
		"connection refused",
		"use of closed network connection",
		"network is unreachable",
	):
		return "io"
	default:
		return "other"
	}
}
func (c *Client) connectWithRetry() { func (c *Client) connectWithRetry() {
for { for {
select { select {
@@ -337,6 +377,10 @@ func (c *Client) establishConnection() error {
// Get token for authentication // Get token for authentication
token, err := c.getToken() token, err := c.getToken()
if err != nil { if err != nil {
// telemetry: connection attempt failed before dialing
// site_id isn't globally available here; use client ID as site_id (low cardinality)
telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure")
telemetry.IncConnError(context.Background(), c.config.ID, "websocket", classifyConnError(err))
return fmt.Errorf("failed to get token: %w", err) return fmt.Errorf("failed to get token: %w", err)
} }
@@ -369,7 +413,11 @@ func (c *Client) establishConnection() error {
q.Set("clientType", c.clientType) q.Set("clientType", c.clientType)
u.RawQuery = q.Encode() u.RawQuery = q.Encode()
// Connect to WebSocket // Connect to WebSocket (optional span)
tr := otel.Tracer("newt")
spanCtx, span := tr.Start(context.Background(), "ws.connect")
defer span.End()
dialer := websocket.DefaultDialer dialer := websocket.DefaultDialer
// Use new TLS configuration method // Use new TLS configuration method
@@ -391,11 +439,23 @@ func (c *Client) establishConnection() error {
logger.Debug("WebSocket TLS certificate verification disabled via SKIP_TLS_VERIFY environment variable") logger.Debug("WebSocket TLS certificate verification disabled via SKIP_TLS_VERIFY environment variable")
} }
conn, _, err := dialer.Dial(u.String(), nil) conn, _, err := dialer.DialContext(spanCtx, u.String(), nil)
if err != nil { if err != nil {
telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "failure")
etype := classifyConnError(err)
telemetry.IncConnError(context.Background(), c.config.ID, "websocket", etype)
// Map handshake-related errors to reconnect reasons where appropriate
if etype == "tls" {
telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonHandshakeError)
} else if etype == "timeout" {
telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonTimeout)
} else {
telemetry.IncReconnect(context.Background(), "", c.config.ID, telemetry.ReasonError)
}
return fmt.Errorf("failed to connect to WebSocket: %w", err) return fmt.Errorf("failed to connect to WebSocket: %w", err)
} }
telemetry.IncConnAttempt(context.Background(), c.config.ID, "websocket", "success")
c.conn = conn c.conn = conn
c.setConnected(true) c.setConnected(true)