diff --git a/websocket/client.go b/websocket/client.go index 8b8893a..dd3f39a 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -661,7 +661,57 @@ func (c *Client) setupPKCS12TLS() (*tls.Config, error) { } // pingMonitor sends pings at a short interval and triggers reconnect on failure +func (c *Client) sendPing() { + if c.conn == nil { + return + } + + // Skip ping if a message is currently being processed + c.processingMux.RLock() + isProcessing := c.processingMessage + c.processingMux.RUnlock() + if isProcessing { + logger.Debug("Skipping ping, message is being processed") + return + } + + c.configVersionMux.RLock() + configVersion := c.configVersion + c.configVersionMux.RUnlock() + + pingMsg := WSMessage{ + Type: "newt/ping", + Data: map[string]interface{}{}, + ConfigVersion: configVersion, + } + + c.writeMux.Lock() + err := c.conn.WriteJSON(pingMsg) + if err == nil { + telemetry.IncWSMessage(c.metricsContext(), "out", "ping") + } + c.writeMux.Unlock() + + if err != nil { + // Check if we're shutting down before logging error and reconnecting + select { + case <-c.done: + // Expected during shutdown + return + default: + logger.Error("Ping failed: %v", err) + telemetry.IncWSKeepaliveFailure(c.metricsContext(), "ping_write") + telemetry.IncWSReconnect(c.metricsContext(), "ping_write") + c.reconnect() + return + } + } +} + func (c *Client) pingMonitor() { + // Send an immediate ping as soon as we connect + c.sendPing() + ticker := time.NewTicker(c.pingInterval) defer ticker.Stop() @@ -670,50 +720,7 @@ func (c *Client) pingMonitor() { case <-c.done: return case <-ticker.C: - if c.conn == nil { - return - } - - // Skip ping if a message is currently being processed - c.processingMux.RLock() - isProcessing := c.processingMessage - c.processingMux.RUnlock() - if isProcessing { - logger.Debug("Skipping ping, message is being processed") - continue - } - - c.configVersionMux.RLock() - configVersion := c.configVersion - c.configVersionMux.RUnlock() - - pingMsg := WSMessage{ - Type: "newt/ping", - Data: map[string]interface{}{}, - ConfigVersion: configVersion, - } - - c.writeMux.Lock() - err := c.conn.WriteJSON(pingMsg) - if err == nil { - telemetry.IncWSMessage(c.metricsContext(), "out", "ping") - } - c.writeMux.Unlock() - - if err != nil { - // Check if we're shutting down before logging error and reconnecting - select { - case <-c.done: - // Expected during shutdown - return - default: - logger.Error("Ping failed: %v", err) - telemetry.IncWSKeepaliveFailure(c.metricsContext(), "ping_write") - telemetry.IncWSReconnect(c.metricsContext(), "ping_write") - c.reconnect() - return - } - } + c.sendPing() } } }