diff --git a/websocket/client.go b/websocket/client.go index 8703b51..8b64e21 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -47,8 +47,11 @@ type Client struct { metricsCtx context.Context configNeedsSave bool // Flag to track if config needs to be saved serverVersion string - configVersion int64 // Latest config version received from server + configVersion int64 // Latest config version received from server configVersionMux sync.RWMutex + processingMessage bool // Flag to track if a message is currently being processed + processingMux sync.RWMutex // Protects processingMessage + processingWg sync.WaitGroup // WaitGroup to wait for message processing to complete } type ClientOption func(*Client) @@ -163,13 +166,11 @@ func (c *Client) GetConfigVersion() int64 { return c.configVersion } -// setConfigVersion updates the config version if the new version is higher +// setConfigVersion updates the config version func (c *Client) setConfigVersion(version int64) { c.configVersionMux.Lock() defer c.configVersionMux.Unlock() - if version > c.configVersion { - c.configVersion = version - } + c.configVersion = version } // Connect establishes the WebSocket connection @@ -672,12 +673,21 @@ func (c *Client) pingMonitor() { return } + // Skip ping if a message is currently being processed + c.processingMux.RLock() + isProcessing := c.processingMessage + c.processingMux.RUnlock() + if isProcessing { + logger.Debug("Skipping ping, message is being processed") + continue + } + c.configVersionMux.RLock() configVersion := c.configVersion c.configVersionMux.RUnlock() pingMsg := WSMessage{ - Type: "ping", + Type: "newt/ping", Data: map[string]interface{}{}, ConfigVersion: configVersion, } @@ -767,14 +777,24 @@ func (c *Client) readPumpWithDisconnectDetection(started time.Time) { } } - // Extract and update config version from message if present - if msg.ConfigVersion > 0 { - c.setConfigVersion(msg.ConfigVersion) - } + // Update config version from incoming message + c.setConfigVersion(msg.ConfigVersion) c.handlersMux.RLock() if handler, ok := c.handlers[msg.Type]; ok { + // Mark that we're processing a message + c.processingMux.Lock() + c.processingMessage = true + c.processingMux.Unlock() + c.processingWg.Add(1) + handler(msg) + + // Mark that we're done processing + c.processingWg.Done() + c.processingMux.Lock() + c.processingMessage = false + c.processingMux.Unlock() } c.handlersMux.RUnlock() }