Compare commits

...

3 Commits

3 changed files with 116 additions and 22 deletions

View File

@@ -17,6 +17,8 @@ import (
const (
ephemeralLifeTime = 10 * time.Minute
// cleanupWindow is the time window to wait after nearest peer deadline to start the cleanup procedure.
cleanupWindow = 1 * time.Minute
)
var (
@@ -43,6 +45,9 @@ type EphemeralManager struct {
tailPeer *ephemeralPeer
peersLock sync.Mutex
timer *time.Timer
lifeTime time.Duration
cleanupWindow time.Duration
}
// NewEphemeralManager instantiate new EphemeralManager
@@ -50,6 +55,9 @@ func NewEphemeralManager(store store.Store, accountManager nbAccount.Manager) *E
return &EphemeralManager{
store: store,
accountManager: accountManager,
lifeTime: ephemeralLifeTime,
cleanupWindow: cleanupWindow,
}
}
@@ -62,7 +70,7 @@ func (e *EphemeralManager) LoadInitialPeers(ctx context.Context) {
e.loadEphemeralPeers(ctx)
if e.headPeer != nil {
e.timer = time.AfterFunc(ephemeralLifeTime, func() {
e.timer = time.AfterFunc(e.lifeTime, func() {
e.cleanup(ctx)
})
}
@@ -115,9 +123,13 @@ func (e *EphemeralManager) OnPeerDisconnected(ctx context.Context, peer *nbpeer.
return
}
e.addPeer(peer.AccountID, peer.ID, newDeadLine())
e.addPeer(peer.AccountID, peer.ID, e.newDeadLine())
if e.timer == nil {
e.timer = time.AfterFunc(e.headPeer.deadline.Sub(timeNow()), func() {
delay := e.headPeer.deadline.Sub(timeNow()) + e.cleanupWindow
if delay < 0 {
delay = 0
}
e.timer = time.AfterFunc(delay, func() {
e.cleanup(ctx)
})
}
@@ -130,7 +142,7 @@ func (e *EphemeralManager) loadEphemeralPeers(ctx context.Context) {
return
}
t := newDeadLine()
t := e.newDeadLine()
for _, p := range peers {
e.addPeer(p.AccountID, p.ID, t)
}
@@ -160,7 +172,11 @@ func (e *EphemeralManager) cleanup(ctx context.Context) {
}
if e.headPeer != nil {
e.timer = time.AfterFunc(e.headPeer.deadline.Sub(timeNow()), func() {
delay := e.headPeer.deadline.Sub(timeNow()) + e.cleanupWindow
if delay < 0 {
delay = 0
}
e.timer = time.AfterFunc(delay, func() {
e.cleanup(ctx)
})
} else {
@@ -236,6 +252,6 @@ func (e *EphemeralManager) isPeerOnList(id string) bool {
return false
}
func newDeadLine() time.Time {
return timeNow().Add(ephemeralLifeTime)
func (e *EphemeralManager) newDeadLine() time.Time {
return timeNow().Add(e.lifeTime)
}

View File

@@ -3,9 +3,12 @@ package server
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
nbAccount "github.com/netbirdio/netbird/management/server/account"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/store"
@@ -27,32 +30,65 @@ func (s *MockStore) GetAllEphemeralPeers(_ context.Context, _ store.LockingStren
return peers, nil
}
type MocAccountManager struct {
type MockAccountManager struct {
mu sync.Mutex
nbAccount.Manager
store *MockStore
store *MockStore
deletePeerCalls int
bufferUpdateCalls map[string]int
wg *sync.WaitGroup
}
func (a MocAccountManager) DeletePeer(_ context.Context, accountID, peerID, userID string) error {
func (a *MockAccountManager) DeletePeer(_ context.Context, accountID, peerID, userID string) error {
a.mu.Lock()
defer a.mu.Unlock()
a.deletePeerCalls++
if a.wg != nil {
a.wg.Done()
}
delete(a.store.account.Peers, peerID)
return nil //nolint:nil
return nil
}
func (a MocAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
// noop
func (a *MockAccountManager) GetDeletePeerCalls() int {
a.mu.Lock()
defer a.mu.Unlock()
return a.deletePeerCalls
}
func (a MocAccountManager) GetStore() store.Store {
func (a *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
a.mu.Lock()
defer a.mu.Unlock()
if a.bufferUpdateCalls == nil {
a.bufferUpdateCalls = make(map[string]int)
}
a.bufferUpdateCalls[accountID]++
}
func (a *MockAccountManager) GetBufferUpdateCalls(accountID string) int {
a.mu.Lock()
defer a.mu.Unlock()
if a.bufferUpdateCalls == nil {
return 0
}
return a.bufferUpdateCalls[accountID]
}
func (a *MockAccountManager) GetStore() store.Store {
return a.store
}
func TestNewManager(t *testing.T) {
t.Cleanup(func() {
timeNow = time.Now
})
startTime := time.Now()
timeNow = func() time.Time {
return startTime
}
store := &MockStore{}
am := MocAccountManager{
am := MockAccountManager{
store: store,
}
@@ -60,7 +96,7 @@ func TestNewManager(t *testing.T) {
numberOfEphemeralPeers := 3
seedPeers(store, numberOfPeers, numberOfEphemeralPeers)
mgr := NewEphemeralManager(store, am)
mgr := NewEphemeralManager(store, &am)
mgr.loadEphemeralPeers(context.Background())
startTime = startTime.Add(ephemeralLifeTime + 1)
mgr.cleanup(context.Background())
@@ -71,13 +107,16 @@ func TestNewManager(t *testing.T) {
}
func TestNewManagerPeerConnected(t *testing.T) {
t.Cleanup(func() {
timeNow = time.Now
})
startTime := time.Now()
timeNow = func() time.Time {
return startTime
}
store := &MockStore{}
am := MocAccountManager{
am := MockAccountManager{
store: store,
}
@@ -85,7 +124,7 @@ func TestNewManagerPeerConnected(t *testing.T) {
numberOfEphemeralPeers := 3
seedPeers(store, numberOfPeers, numberOfEphemeralPeers)
mgr := NewEphemeralManager(store, am)
mgr := NewEphemeralManager(store, &am)
mgr.loadEphemeralPeers(context.Background())
mgr.OnPeerConnected(context.Background(), store.account.Peers["ephemeral_peer_0"])
@@ -99,13 +138,16 @@ func TestNewManagerPeerConnected(t *testing.T) {
}
func TestNewManagerPeerDisconnected(t *testing.T) {
t.Cleanup(func() {
timeNow = time.Now
})
startTime := time.Now()
timeNow = func() time.Time {
return startTime
}
store := &MockStore{}
am := MocAccountManager{
am := MockAccountManager{
store: store,
}
@@ -113,7 +155,7 @@ func TestNewManagerPeerDisconnected(t *testing.T) {
numberOfEphemeralPeers := 3
seedPeers(store, numberOfPeers, numberOfEphemeralPeers)
mgr := NewEphemeralManager(store, am)
mgr := NewEphemeralManager(store, &am)
mgr.loadEphemeralPeers(context.Background())
for _, v := range store.account.Peers {
mgr.OnPeerConnected(context.Background(), v)
@@ -130,6 +172,36 @@ func TestNewManagerPeerDisconnected(t *testing.T) {
}
}
func TestCleanupSchedulingBehaviorIsBatched(t *testing.T) {
const (
ephemeralPeers = 10
testLifeTime = 1 * time.Second
testCleanupWindow = 100 * time.Millisecond
)
mockStore := &MockStore{}
mockAM := &MockAccountManager{
store: mockStore,
}
mockAM.wg = &sync.WaitGroup{}
mockAM.wg.Add(ephemeralPeers)
mgr := NewEphemeralManager(mockStore, mockAM)
mgr.lifeTime = testLifeTime
mgr.cleanupWindow = testCleanupWindow
account := newAccountWithId(context.Background(), "account", "", "", false)
mockStore.account = account
for i := range ephemeralPeers {
p := &nbpeer.Peer{ID: fmt.Sprintf("peer-%d", i), AccountID: account.Id, Ephemeral: true}
mockStore.account.Peers[p.ID] = p
time.Sleep(testCleanupWindow / ephemeralPeers)
mgr.OnPeerDisconnected(context.Background(), p)
}
mockAM.wg.Wait()
assert.Len(t, mockStore.account.Peers, 0, "all ephemeral peers should be cleaned up after the lifetime")
assert.Equal(t, 1, mockAM.GetBufferUpdateCalls(account.Id), "buffer update should be called once")
assert.Equal(t, ephemeralPeers, mockAM.GetDeletePeerCalls(), "should have deleted only the first peer")
}
func seedPeers(store *MockStore, numberOfPeers int, numberOfEphemeralPeers int) {
store.account = newAccountWithId(context.Background(), "my account", "", "", false)

View File

@@ -1261,14 +1261,20 @@ func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, a
mu, _ := am.accountUpdateLocks.LoadOrStore(accountID, &sync.Mutex{})
lock := mu.(*sync.Mutex)
log.WithContext(ctx).Debugf("try to BufferUpdateAccountPeers for account %s", accountID)
if !lock.TryLock() {
log.WithContext(ctx).Debugf("BufferUpdateAccountPeers for an account %s locked - returning", accountID)
return
}
go func() {
time.Sleep(time.Duration(am.updateAccountPeersBufferInterval.Load()))
lock.Unlock()
// time.Sleep(time.Duration(am.updateAccountPeersBufferInterval.Load()))
defer lock.Unlock()
log.WithContext(ctx).Debugf("BufferUpdateAccountPeers for an account %s - in progress", accountID)
tn := time.Now()
am.UpdateAccountPeers(ctx, accountID)
log.WithContext(ctx).Debugf("BufferUpdateAccountPeers for an account %s - took %dms", accountID, time.Since(tn).Milliseconds())
}()
}