diff --git a/client/internal/engine.go b/client/internal/engine.go index de88ae7cb..4e847758d 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -14,7 +14,6 @@ import ( "sort" "strings" "sync" - "sync/atomic" "time" "github.com/hashicorp/go-multierror" @@ -199,9 +198,10 @@ type Engine struct { stateManager *statemanager.Manager srWatcher *guard.SRWatcher - // Sync response persistence - persistSyncResponse atomic.Bool - latestSyncResponse atomic.Pointer[mgmProto.SyncResponse] + // Sync response persistence (protected by syncRespMux) + syncRespMux sync.RWMutex + persistSyncResponse bool + latestSyncResponse *mgmProto.SyncResponse connSemaphore *semaphoregroup.SemaphoreGroup flowManager nftypes.FlowManager @@ -662,8 +662,10 @@ func (e *Engine) removePeer(peerKey string) error { return nil } -// 1. xyzzz func (e *Engine) handleSync(update *mgmProto.SyncResponse) error { + e.syncMsgMux.Lock() + defer e.syncMsgMux.Unlock() + if update.GetNetbirdConfig() != nil { wCfg := update.GetNetbirdConfig() err := e.updateTURNs(wCfg.GetTurns()) @@ -702,11 +704,19 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error { if nm == nil { return nil } - persistSyncResponse := e.persistSyncResponse.Load() + + // Persist sync response under the dedicated lock (syncRespMux), not under syncMsgMux. + // Read the storage-enabled flag under the syncRespMux too. + e.syncRespMux.RLock() + enabled := e.persistSyncResponse + e.syncRespMux.RUnlock() // Store sync response if persistence is enabled - if persistSyncResponse { - e.latestSyncResponse.Store(update) + if enabled { + e.syncRespMux.Lock() + e.latestSyncResponse = update + e.syncRespMux.Unlock() + log.Debugf("sync response persisted with serial %d", nm.GetSerial()) } @@ -1816,34 +1826,38 @@ func (e *Engine) stopDNSServer() { // SetSyncResponsePersistence enables or disables sync response persistence func (e *Engine) SetSyncResponsePersistence(enabled bool) { - persistSyncResponse := e.persistSyncResponse.Load() - if enabled == persistSyncResponse { + e.syncRespMux.Lock() + defer e.syncRespMux.Unlock() + + if enabled == e.persistSyncResponse { return } - e.persistSyncResponse.Store(enabled) + e.persistSyncResponse = enabled log.Debugf("Sync response persistence is set to %t", enabled) if !enabled { - e.latestSyncResponse.Store(nil) + e.latestSyncResponse = nil } } // GetLatestSyncResponse returns the stored sync response if persistence is enabled func (e *Engine) GetLatestSyncResponse() (*mgmProto.SyncResponse, error) { - persistSyncResponse := e.persistSyncResponse.Load() + e.syncRespMux.RLock() + enabled := e.persistSyncResponse + latest := e.latestSyncResponse + e.syncRespMux.RUnlock() - if !persistSyncResponse { + if !enabled { return nil, errors.New("sync response persistence is disabled") } - latestSyncResponse := e.latestSyncResponse.Load() - if latestSyncResponse == nil { + if latest == nil { //nolint:nilnil return nil, nil } - log.Debugf("Retrieving latest sync response with size %d bytes", proto.Size(latestSyncResponse)) - sr, ok := proto.Clone(latestSyncResponse).(*mgmProto.SyncResponse) + log.Debugf("Retrieving latest sync response with size %d bytes", proto.Size(latest)) + sr, ok := proto.Clone(latest).(*mgmProto.SyncResponse) if !ok { return nil, fmt.Errorf("failed to clone sync response") }