mirror of
https://github.com/fosrl/newt.git
synced 2026-03-26 12:36:45 +00:00
feat(phase2): websocket connect latency and message counters; proxy active/buffer/drops gauges and counters; config apply histogram; reconnect initiator label; update call-sites
This commit is contained in:
@@ -36,12 +36,24 @@ var (
|
||||
mConnErrors metric.Int64Counter
|
||||
|
||||
// Config/Restart
|
||||
mConfigReloads metric.Int64Counter
|
||||
mRestartCount metric.Int64Counter
|
||||
mConfigReloads metric.Int64Counter
|
||||
mRestartCount metric.Int64Counter
|
||||
mConfigApply metric.Float64Histogram
|
||||
mCertRotationTotal metric.Int64Counter
|
||||
|
||||
// Build info
|
||||
mBuildInfo metric.Int64ObservableGauge
|
||||
|
||||
// WebSocket
|
||||
mWSConnectLatency metric.Float64Histogram
|
||||
mWSMessages metric.Int64Counter
|
||||
|
||||
// Proxy
|
||||
mProxyActiveConns metric.Int64ObservableGauge
|
||||
mProxyBufferBytes metric.Int64ObservableGauge
|
||||
mProxyAsyncBacklogByte metric.Int64ObservableGauge
|
||||
mProxyDropsTotal metric.Int64Counter
|
||||
|
||||
buildVersion string
|
||||
buildCommit string
|
||||
)
|
||||
@@ -115,11 +127,31 @@ func registerInstruments() error {
|
||||
metric.WithDescription("Configuration reloads"))
|
||||
mRestartCount, _ = meter.Int64Counter("newt_restart_count_total",
|
||||
metric.WithDescription("Process restart count (incremented on start)"))
|
||||
mConfigApply, _ = meter.Float64Histogram("newt_config_apply_seconds",
|
||||
metric.WithDescription("Configuration apply duration in seconds"))
|
||||
mCertRotationTotal, _ = meter.Int64Counter("newt_cert_rotation_total",
|
||||
metric.WithDescription("Certificate rotation events (success/failure)"))
|
||||
|
||||
// Build info gauge (value 1 with version/commit attributes)
|
||||
mBuildInfo, _ = meter.Int64ObservableGauge("newt_build_info",
|
||||
metric.WithDescription("Newt build information (value is always 1)"))
|
||||
|
||||
// WebSocket
|
||||
mWSConnectLatency, _ = meter.Float64Histogram("newt_websocket_connect_latency_seconds",
|
||||
metric.WithDescription("WebSocket connect latency in seconds"))
|
||||
mWSMessages, _ = meter.Int64Counter("newt_websocket_messages_total",
|
||||
metric.WithDescription("WebSocket messages by direction and type"))
|
||||
|
||||
// Proxy
|
||||
mProxyActiveConns, _ = meter.Int64ObservableGauge("newt_proxy_active_connections",
|
||||
metric.WithDescription("Proxy active connections per tunnel and protocol"))
|
||||
mProxyBufferBytes, _ = meter.Int64ObservableGauge("newt_proxy_buffer_bytes",
|
||||
metric.WithDescription("Proxy buffer bytes (may approximate async backlog)"))
|
||||
mProxyAsyncBacklogByte, _ = meter.Int64ObservableGauge("newt_proxy_async_backlog_bytes",
|
||||
metric.WithDescription("Unflushed async byte backlog per tunnel and protocol"))
|
||||
mProxyDropsTotal, _ = meter.Int64Counter("newt_proxy_drops_total",
|
||||
metric.WithDescription("Proxy drops due to write errors"))
|
||||
|
||||
// Register a default callback for build info if version/commit set
|
||||
meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error {
|
||||
if buildVersion == "" && buildCommit == "" {
|
||||
@@ -145,8 +177,10 @@ func registerInstruments() error {
|
||||
// heartbeat seconds, and active sessions.
|
||||
|
||||
var (
|
||||
obsOnce sync.Once
|
||||
obsStopper func()
|
||||
obsOnce sync.Once
|
||||
obsStopper func()
|
||||
proxyObsOnce sync.Once
|
||||
proxyStopper func()
|
||||
)
|
||||
|
||||
// SetObservableCallback registers a single callback that will be invoked
|
||||
@@ -168,6 +202,14 @@ func SetObservableCallback(cb func(context.Context, metric.Observer) error) {
|
||||
})
|
||||
}
|
||||
|
||||
// SetProxyObservableCallback registers a callback to observe proxy gauges.
|
||||
func SetProxyObservableCallback(cb func(context.Context, metric.Observer) error) {
|
||||
proxyObsOnce.Do(func() {
|
||||
meter.RegisterCallback(cb, mProxyActiveConns, mProxyBufferBytes, mProxyAsyncBacklogByte)
|
||||
proxyStopper = func() {}
|
||||
})
|
||||
}
|
||||
|
||||
// Build info registration
|
||||
func RegisterBuildInfo(version, commit string) {
|
||||
buildVersion = version
|
||||
@@ -204,6 +246,62 @@ func AddTunnelBytesSet(ctx context.Context, n int64, attrs attribute.Set) {
|
||||
mTunnelBytes.Add(ctx, n, metric.WithAttributeSet(attrs))
|
||||
}
|
||||
|
||||
// --- WebSocket helpers ---
|
||||
|
||||
func ObserveWSConnectLatency(ctx context.Context, seconds float64, result, errorType string) {
|
||||
attrs := []attribute.KeyValue{
|
||||
attribute.String("transport", "websocket"),
|
||||
attribute.String("result", result),
|
||||
}
|
||||
if errorType != "" {
|
||||
attrs = append(attrs, attribute.String("error_type", errorType))
|
||||
}
|
||||
mWSConnectLatency.Record(ctx, seconds, metric.WithAttributes(attrsWithSite(attrs...)...))
|
||||
}
|
||||
|
||||
func IncWSMessage(ctx context.Context, direction, msgType string) {
|
||||
mWSMessages.Add(ctx, 1, metric.WithAttributes(attrsWithSite(
|
||||
attribute.String("direction", direction),
|
||||
attribute.String("msg_type", msgType),
|
||||
)...))
|
||||
}
|
||||
|
||||
// --- Proxy helpers ---
|
||||
|
||||
func ObserveProxyActiveConnsObs(o metric.Observer, value int64, attrs []attribute.KeyValue) {
|
||||
o.ObserveInt64(mProxyActiveConns, value, metric.WithAttributes(attrs...))
|
||||
}
|
||||
|
||||
func ObserveProxyBufferBytesObs(o metric.Observer, value int64, attrs []attribute.KeyValue) {
|
||||
o.ObserveInt64(mProxyBufferBytes, value, metric.WithAttributes(attrs...))
|
||||
}
|
||||
|
||||
func ObserveProxyAsyncBacklogObs(o metric.Observer, value int64, attrs []attribute.KeyValue) {
|
||||
o.ObserveInt64(mProxyAsyncBacklogByte, value, metric.WithAttributes(attrs...))
|
||||
}
|
||||
|
||||
func IncProxyDrops(ctx context.Context, tunnelID, protocol string) {
|
||||
mProxyDropsTotal.Add(ctx, 1, metric.WithAttributes(attrsWithSite(
|
||||
attribute.String("tunnel_id", tunnelID),
|
||||
attribute.String("protocol", protocol),
|
||||
)...))
|
||||
}
|
||||
|
||||
// --- Config/PKI helpers ---
|
||||
|
||||
func ObserveConfigApply(ctx context.Context, phase, result string, seconds float64) {
|
||||
mConfigApply.Record(ctx, seconds, metric.WithAttributes(attrsWithSite(
|
||||
attribute.String("phase", phase),
|
||||
attribute.String("result", result),
|
||||
)...))
|
||||
}
|
||||
|
||||
func IncCertRotation(ctx context.Context, result string) {
|
||||
mCertRotationTotal.Add(ctx, 1, metric.WithAttributes(attrsWithSite(
|
||||
attribute.String("result", result),
|
||||
)...))
|
||||
}
|
||||
|
||||
func ObserveTunnelLatency(ctx context.Context, tunnelID, transport string, seconds float64) {
|
||||
mTunnelLatency.Record(ctx, seconds, metric.WithAttributes(attrsWithSite(
|
||||
attribute.String("tunnel_id", tunnelID),
|
||||
@@ -211,9 +309,10 @@ func ObserveTunnelLatency(ctx context.Context, tunnelID, transport string, secon
|
||||
)...))
|
||||
}
|
||||
|
||||
func IncReconnect(ctx context.Context, tunnelID, reason string) {
|
||||
func IncReconnect(ctx context.Context, tunnelID, initiator, reason string) {
|
||||
mReconnects.Add(ctx, 1, metric.WithAttributes(attrsWithSite(
|
||||
attribute.String("tunnel_id", tunnelID),
|
||||
attribute.String("initiator", initiator),
|
||||
attribute.String("reason", reason),
|
||||
)...))
|
||||
}
|
||||
|
||||
@@ -195,7 +195,7 @@ func Init(ctx context.Context, cfg Config) (*Setup, error) {
|
||||
AttributeFilter: func(kv attribute.KeyValue) bool {
|
||||
k := string(kv.Key)
|
||||
switch k {
|
||||
case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "error_type", "version", "commit", "site_id", "region":
|
||||
case "tunnel_id", "transport", "direction", "protocol", "result", "reason", "initiator", "error_type", "version", "commit", "site_id", "region":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
|
||||
4
main.go
4
main.go
@@ -730,7 +730,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
client.RegisterHandler("newt/wg/reconnect", func(msg websocket.WSMessage) {
|
||||
logger.Info("Received reconnect message")
|
||||
if wgData.PublicKey != "" {
|
||||
telemetry.IncReconnect(context.Background(), wgData.PublicKey, "server_request")
|
||||
telemetry.IncReconnect(context.Background(), wgData.PublicKey, "server", telemetry.ReasonServerRequest)
|
||||
}
|
||||
|
||||
// Close the WireGuard device and TUN
|
||||
@@ -759,7 +759,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
client.RegisterHandler("newt/wg/terminate", func(msg websocket.WSMessage) {
|
||||
logger.Info("Received termination message")
|
||||
if wgData.PublicKey != "" {
|
||||
telemetry.IncReconnect(context.Background(), wgData.PublicKey, "server_request")
|
||||
telemetry.IncReconnect(context.Background(), wgData.PublicKey, "server", telemetry.ReasonServerRequest)
|
||||
}
|
||||
|
||||
// Close the WireGuard device and TUN
|
||||
|
||||
@@ -53,6 +53,9 @@ type tunnelEntry struct {
|
||||
bytesOutTCP atomic.Uint64
|
||||
bytesInUDP atomic.Uint64
|
||||
bytesOutUDP atomic.Uint64
|
||||
|
||||
activeTCP atomic.Int64
|
||||
activeUDP atomic.Int64
|
||||
}
|
||||
|
||||
// countingWriter wraps an io.Writer and adds bytes to OTel counter using a pre-built attribute set.
|
||||
@@ -256,6 +259,21 @@ func (pm *ProxyManager) RemoveTarget(proto, listenIP string, port int) error {
|
||||
|
||||
// Start begins listening for all configured proxy targets
|
||||
func (pm *ProxyManager) Start() error {
|
||||
// Register proxy observables once per process
|
||||
telemetry.SetProxyObservableCallback(func(ctx context.Context, o metric.Observer) error {
|
||||
pm.mutex.RLock()
|
||||
defer pm.mutex.RUnlock()
|
||||
for _, e := range pm.tunnels {
|
||||
// active connections
|
||||
telemetry.ObserveProxyActiveConnsObs(o, e.activeTCP.Load(), e.attrOutTCP.ToSlice())
|
||||
telemetry.ObserveProxyActiveConnsObs(o, e.activeUDP.Load(), e.attrOutUDP.ToSlice())
|
||||
// backlog bytes (sum of unflushed counters)
|
||||
b := int64(e.bytesInTCP.Load()+e.bytesOutTCP.Load()+e.bytesInUDP.Load()+e.bytesOutUDP.Load())
|
||||
telemetry.ObserveProxyAsyncBacklogObs(o, b, e.attrOutTCP.ToSlice())
|
||||
telemetry.ObserveProxyBufferBytesObs(o, b, e.attrOutTCP.ToSlice())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
pm.mutex.Lock()
|
||||
defer pm.mutex.Unlock()
|
||||
|
||||
@@ -462,6 +480,9 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string)
|
||||
// Count sessions only once per accepted TCP connection
|
||||
if pm.currentTunnelID != "" {
|
||||
state.Global().IncSessions(pm.currentTunnelID)
|
||||
if e := pm.getEntry(pm.currentTunnelID); e != nil {
|
||||
e.activeTCP.Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
@@ -500,6 +521,9 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string)
|
||||
wg.Wait()
|
||||
if pm.currentTunnelID != "" {
|
||||
state.Global().DecSessions(pm.currentTunnelID)
|
||||
if e := pm.getEntry(pm.currentTunnelID); e != nil {
|
||||
e.activeTCP.Add(-1)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
@@ -567,7 +591,10 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
|
||||
continue
|
||||
}
|
||||
|
||||
targetConn, err = net.DialUDP("udp", nil, targetUDPAddr)
|
||||
targetConn, err = net.DialUDP("udp", nil, targetUDPAddr)
|
||||
if e := pm.getEntry(pm.currentTunnelID); e != nil {
|
||||
e.activeUDP.Add(1)
|
||||
}
|
||||
if err != nil {
|
||||
logger.Error("Error connecting to target: %v", err)
|
||||
continue
|
||||
@@ -584,6 +611,9 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
|
||||
if storedConn, exists := clientConns[clientKey]; exists && storedConn == targetConn {
|
||||
delete(clientConns, clientKey)
|
||||
targetConn.Close()
|
||||
if e := pm.getEntry(pm.currentTunnelID); e != nil {
|
||||
e.activeUDP.Add(-1)
|
||||
}
|
||||
}
|
||||
clientsMutex.Unlock()
|
||||
}()
|
||||
@@ -612,20 +642,22 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
|
||||
_, err = conn.WriteTo(buffer[:n], remoteAddr)
|
||||
if err != nil {
|
||||
logger.Error("Error writing to client: %v", err)
|
||||
telemetry.IncProxyDrops(context.Background(), pm.currentTunnelID, "udp")
|
||||
return // defer will handle cleanup
|
||||
}
|
||||
}
|
||||
}(clientKey, targetConn, remoteAddr)
|
||||
}
|
||||
|
||||
written, err := targetConn.Write(buffer[:n])
|
||||
if err != nil {
|
||||
logger.Error("Error writing to target: %v", err)
|
||||
targetConn.Close()
|
||||
clientsMutex.Lock()
|
||||
delete(clientConns, clientKey)
|
||||
clientsMutex.Unlock()
|
||||
} else if pm.currentTunnelID != "" && written > 0 {
|
||||
written, err := targetConn.Write(buffer[:n])
|
||||
if err != nil {
|
||||
logger.Error("Error writing to target: %v", err)
|
||||
telemetry.IncProxyDrops(context.Background(), pm.currentTunnelID, "udp")
|
||||
targetConn.Close()
|
||||
clientsMutex.Lock()
|
||||
delete(clientConns, clientKey)
|
||||
clientsMutex.Unlock()
|
||||
} else if pm.currentTunnelID != "" && written > 0 {
|
||||
if pm.asyncBytes {
|
||||
if e := pm.getEntry(pm.currentTunnelID); e != nil {
|
||||
e.bytesInUDP.Add(uint64(written))
|
||||
|
||||
12
util.go
12
util.go
@@ -291,12 +291,12 @@ func startPingCheck(tnet *netstack.Net, serverIP string, client *websocket.Clien
|
||||
// More lenient threshold for declaring connection lost under load
|
||||
failureThreshold := 4
|
||||
if consecutiveFailures >= failureThreshold && currentInterval < maxInterval {
|
||||
if !connectionLost {
|
||||
connectionLost = true
|
||||
logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures)
|
||||
if tunnelID != "" {
|
||||
telemetry.IncReconnect(context.Background(), tunnelID, telemetry.ReasonTimeout)
|
||||
}
|
||||
if !connectionLost {
|
||||
connectionLost = true
|
||||
logger.Warn("Connection to server lost after %d failures. Continuous reconnection attempts will be made.", consecutiveFailures)
|
||||
if tunnelID != "" {
|
||||
telemetry.IncReconnect(context.Background(), tunnelID, "client", telemetry.ReasonTimeout)
|
||||
}
|
||||
stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second)
|
||||
// Send registration message to the server for backward compatibility
|
||||
err := client.SendMessage("newt/wg/register", map[string]interface{}{
|
||||
|
||||
@@ -173,7 +173,11 @@ func (c *Client) SendMessage(messageType string, data interface{}) error {
|
||||
|
||||
c.writeMux.Lock()
|
||||
defer c.writeMux.Unlock()
|
||||
return c.conn.WriteJSON(msg)
|
||||
if err := c.conn.WriteJSON(msg); err != nil {
|
||||
return err
|
||||
}
|
||||
telemetry.IncWSMessage(context.Background(), "out", "text")
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) SendMessageInterval(messageType string, data interface{}, interval time.Duration) (stop func()) {
|
||||
@@ -418,6 +422,7 @@ func (c *Client) establishConnection() error {
|
||||
spanCtx, span := tr.Start(context.Background(), "ws.connect")
|
||||
defer span.End()
|
||||
|
||||
start := time.Now()
|
||||
dialer := websocket.DefaultDialer
|
||||
|
||||
// Use new TLS configuration method
|
||||
@@ -440,24 +445,32 @@ func (c *Client) establishConnection() error {
|
||||
}
|
||||
|
||||
conn, _, err := dialer.DialContext(spanCtx, u.String(), nil)
|
||||
lat := time.Since(start).Seconds()
|
||||
if err != nil {
|
||||
telemetry.IncConnAttempt(context.Background(), "websocket", "failure")
|
||||
etype := classifyConnError(err)
|
||||
telemetry.IncConnError(context.Background(), "websocket", etype)
|
||||
telemetry.ObserveWSConnectLatency(context.Background(), lat, "failure", etype)
|
||||
// Map handshake-related errors to reconnect reasons where appropriate
|
||||
if etype == "tls" {
|
||||
telemetry.IncReconnect(context.Background(), c.config.ID, telemetry.ReasonHandshakeError)
|
||||
telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonHandshakeError)
|
||||
} else if etype == "timeout" {
|
||||
telemetry.IncReconnect(context.Background(), c.config.ID, telemetry.ReasonTimeout)
|
||||
telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonTimeout)
|
||||
} else {
|
||||
telemetry.IncReconnect(context.Background(), c.config.ID, telemetry.ReasonError)
|
||||
telemetry.IncReconnect(context.Background(), c.config.ID, "client", telemetry.ReasonError)
|
||||
}
|
||||
return fmt.Errorf("failed to connect to WebSocket: %w", err)
|
||||
}
|
||||
|
||||
telemetry.IncConnAttempt(context.Background(), "websocket", "success")
|
||||
telemetry.ObserveWSConnectLatency(context.Background(), lat, "success", "")
|
||||
c.conn = conn
|
||||
c.setConnected(true)
|
||||
// Wire up pong handler for metrics
|
||||
c.conn.SetPongHandler(func(appData string) error {
|
||||
telemetry.IncWSMessage(context.Background(), "in", "pong")
|
||||
return nil
|
||||
})
|
||||
|
||||
// Start the ping monitor
|
||||
go c.pingMonitor()
|
||||
@@ -554,7 +567,10 @@ func (c *Client) pingMonitor() {
|
||||
return
|
||||
}
|
||||
c.writeMux.Lock()
|
||||
err := c.conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(c.pingTimeout))
|
||||
err := c.conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(c.pingTimeout))
|
||||
if err == nil {
|
||||
telemetry.IncWSMessage(context.Background(), "out", "ping")
|
||||
}
|
||||
c.writeMux.Unlock()
|
||||
if err != nil {
|
||||
// Check if we're shutting down before logging error and reconnecting
|
||||
@@ -595,6 +611,9 @@ func (c *Client) readPumpWithDisconnectDetection() {
|
||||
default:
|
||||
var msg WSMessage
|
||||
err := c.conn.ReadJSON(&msg)
|
||||
if err == nil {
|
||||
telemetry.IncWSMessage(context.Background(), "in", "text")
|
||||
}
|
||||
if err != nil {
|
||||
// Check if we're shutting down before logging error
|
||||
select {
|
||||
|
||||
10
wg/wg.go
10
wg/wg.go
@@ -306,16 +306,24 @@ func (s *WireGuardService) handleConfig(msg websocket.WSMessage) {
|
||||
telemetry.IncConfigReload(context.Background(), "success")
|
||||
// Optional reconnect reason mapping: config change
|
||||
if s.serverPubKey != "" {
|
||||
telemetry.IncReconnect(context.Background(), s.serverPubKey, telemetry.ReasonConfigChange)
|
||||
telemetry.IncReconnect(context.Background(), s.serverPubKey, "client", telemetry.ReasonConfigChange)
|
||||
}
|
||||
|
||||
// Ensure the WireGuard interface and peers are configured
|
||||
start := time.Now()
|
||||
if err := s.ensureWireguardInterface(config); err != nil {
|
||||
logger.Error("Failed to ensure WireGuard interface: %v", err)
|
||||
telemetry.ObserveConfigApply(context.Background(), "interface", "failure", time.Since(start).Seconds())
|
||||
} else {
|
||||
telemetry.ObserveConfigApply(context.Background(), "interface", "success", time.Since(start).Seconds())
|
||||
}
|
||||
|
||||
startPeers := time.Now()
|
||||
if err := s.ensureWireguardPeers(config.Peers); err != nil {
|
||||
logger.Error("Failed to ensure WireGuard peers: %v", err)
|
||||
telemetry.ObserveConfigApply(context.Background(), "peer", "failure", time.Since(startPeers).Seconds())
|
||||
} else {
|
||||
telemetry.ObserveConfigApply(context.Background(), "peer", "success", time.Since(startPeers).Seconds())
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user