use new lock

This commit is contained in:
aliamerj
2025-09-18 12:41:09 +03:00
parent 3027029f7b
commit 2eba4af5ba

View File

@@ -14,7 +14,6 @@ import (
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/go-multierror"
@@ -199,9 +198,10 @@ type Engine struct {
stateManager *statemanager.Manager
srWatcher *guard.SRWatcher
// Sync response persistence
persistSyncResponse atomic.Bool
latestSyncResponse atomic.Pointer[mgmProto.SyncResponse]
// Sync response persistence (protected by syncRespMux)
syncRespMux sync.RWMutex
persistSyncResponse bool
latestSyncResponse *mgmProto.SyncResponse
connSemaphore *semaphoregroup.SemaphoreGroup
flowManager nftypes.FlowManager
@@ -662,8 +662,10 @@ func (e *Engine) removePeer(peerKey string) error {
return nil
}
// 1. xyzzz
func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock()
if update.GetNetbirdConfig() != nil {
wCfg := update.GetNetbirdConfig()
err := e.updateTURNs(wCfg.GetTurns())
@@ -702,11 +704,19 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
if nm == nil {
return nil
}
persistSyncResponse := e.persistSyncResponse.Load()
// Persist sync response under the dedicated lock (syncRespMux), not under syncMsgMux.
// Read the storage-enabled flag under the syncRespMux too.
e.syncRespMux.RLock()
enabled := e.persistSyncResponse
e.syncRespMux.RUnlock()
// Store sync response if persistence is enabled
if persistSyncResponse {
e.latestSyncResponse.Store(update)
if enabled {
e.syncRespMux.Lock()
e.latestSyncResponse = update
e.syncRespMux.Unlock()
log.Debugf("sync response persisted with serial %d", nm.GetSerial())
}
@@ -1816,34 +1826,38 @@ func (e *Engine) stopDNSServer() {
// SetSyncResponsePersistence enables or disables sync response persistence
func (e *Engine) SetSyncResponsePersistence(enabled bool) {
persistSyncResponse := e.persistSyncResponse.Load()
if enabled == persistSyncResponse {
e.syncRespMux.Lock()
defer e.syncRespMux.Unlock()
if enabled == e.persistSyncResponse {
return
}
e.persistSyncResponse.Store(enabled)
e.persistSyncResponse = enabled
log.Debugf("Sync response persistence is set to %t", enabled)
if !enabled {
e.latestSyncResponse.Store(nil)
e.latestSyncResponse = nil
}
}
// GetLatestSyncResponse returns the stored sync response if persistence is enabled
func (e *Engine) GetLatestSyncResponse() (*mgmProto.SyncResponse, error) {
persistSyncResponse := e.persistSyncResponse.Load()
e.syncRespMux.RLock()
enabled := e.persistSyncResponse
latest := e.latestSyncResponse
e.syncRespMux.RUnlock()
if !persistSyncResponse {
if !enabled {
return nil, errors.New("sync response persistence is disabled")
}
latestSyncResponse := e.latestSyncResponse.Load()
if latestSyncResponse == nil {
if latest == nil {
//nolint:nilnil
return nil, nil
}
log.Debugf("Retrieving latest sync response with size %d bytes", proto.Size(latestSyncResponse))
sr, ok := proto.Clone(latestSyncResponse).(*mgmProto.SyncResponse)
log.Debugf("Retrieving latest sync response with size %d bytes", proto.Size(latest))
sr, ok := proto.Clone(latest).(*mgmProto.SyncResponse)
if !ok {
return nil, fmt.Errorf("failed to clone sync response")
}