mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-17 15:56:39 +00:00
Compare commits
3 Commits
fix/merge-
...
add-accoun
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3945d2b170 | ||
|
|
0957defa54 | ||
|
|
0c6ab1de30 |
@@ -17,6 +17,8 @@ import (
|
||||
|
||||
const (
|
||||
ephemeralLifeTime = 10 * time.Minute
|
||||
// cleanupWindow is the time window to wait after nearest peer deadline to start the cleanup procedure.
|
||||
cleanupWindow = 1 * time.Minute
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -43,6 +45,9 @@ type EphemeralManager struct {
|
||||
tailPeer *ephemeralPeer
|
||||
peersLock sync.Mutex
|
||||
timer *time.Timer
|
||||
|
||||
lifeTime time.Duration
|
||||
cleanupWindow time.Duration
|
||||
}
|
||||
|
||||
// NewEphemeralManager instantiate new EphemeralManager
|
||||
@@ -50,6 +55,9 @@ func NewEphemeralManager(store store.Store, accountManager nbAccount.Manager) *E
|
||||
return &EphemeralManager{
|
||||
store: store,
|
||||
accountManager: accountManager,
|
||||
|
||||
lifeTime: ephemeralLifeTime,
|
||||
cleanupWindow: cleanupWindow,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -62,7 +70,7 @@ func (e *EphemeralManager) LoadInitialPeers(ctx context.Context) {
|
||||
|
||||
e.loadEphemeralPeers(ctx)
|
||||
if e.headPeer != nil {
|
||||
e.timer = time.AfterFunc(ephemeralLifeTime, func() {
|
||||
e.timer = time.AfterFunc(e.lifeTime, func() {
|
||||
e.cleanup(ctx)
|
||||
})
|
||||
}
|
||||
@@ -115,9 +123,13 @@ func (e *EphemeralManager) OnPeerDisconnected(ctx context.Context, peer *nbpeer.
|
||||
return
|
||||
}
|
||||
|
||||
e.addPeer(peer.AccountID, peer.ID, newDeadLine())
|
||||
e.addPeer(peer.AccountID, peer.ID, e.newDeadLine())
|
||||
if e.timer == nil {
|
||||
e.timer = time.AfterFunc(e.headPeer.deadline.Sub(timeNow()), func() {
|
||||
delay := e.headPeer.deadline.Sub(timeNow()) + e.cleanupWindow
|
||||
if delay < 0 {
|
||||
delay = 0
|
||||
}
|
||||
e.timer = time.AfterFunc(delay, func() {
|
||||
e.cleanup(ctx)
|
||||
})
|
||||
}
|
||||
@@ -130,7 +142,7 @@ func (e *EphemeralManager) loadEphemeralPeers(ctx context.Context) {
|
||||
return
|
||||
}
|
||||
|
||||
t := newDeadLine()
|
||||
t := e.newDeadLine()
|
||||
for _, p := range peers {
|
||||
e.addPeer(p.AccountID, p.ID, t)
|
||||
}
|
||||
@@ -160,7 +172,11 @@ func (e *EphemeralManager) cleanup(ctx context.Context) {
|
||||
}
|
||||
|
||||
if e.headPeer != nil {
|
||||
e.timer = time.AfterFunc(e.headPeer.deadline.Sub(timeNow()), func() {
|
||||
delay := e.headPeer.deadline.Sub(timeNow()) + e.cleanupWindow
|
||||
if delay < 0 {
|
||||
delay = 0
|
||||
}
|
||||
e.timer = time.AfterFunc(delay, func() {
|
||||
e.cleanup(ctx)
|
||||
})
|
||||
} else {
|
||||
@@ -236,6 +252,6 @@ func (e *EphemeralManager) isPeerOnList(id string) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func newDeadLine() time.Time {
|
||||
return timeNow().Add(ephemeralLifeTime)
|
||||
func (e *EphemeralManager) newDeadLine() time.Time {
|
||||
return timeNow().Add(e.lifeTime)
|
||||
}
|
||||
|
||||
@@ -3,9 +3,12 @@ package server
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
nbAccount "github.com/netbirdio/netbird/management/server/account"
|
||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||
"github.com/netbirdio/netbird/management/server/store"
|
||||
@@ -27,32 +30,65 @@ func (s *MockStore) GetAllEphemeralPeers(_ context.Context, _ store.LockingStren
|
||||
return peers, nil
|
||||
}
|
||||
|
||||
type MocAccountManager struct {
|
||||
type MockAccountManager struct {
|
||||
mu sync.Mutex
|
||||
nbAccount.Manager
|
||||
store *MockStore
|
||||
store *MockStore
|
||||
deletePeerCalls int
|
||||
bufferUpdateCalls map[string]int
|
||||
wg *sync.WaitGroup
|
||||
}
|
||||
|
||||
func (a MocAccountManager) DeletePeer(_ context.Context, accountID, peerID, userID string) error {
|
||||
func (a *MockAccountManager) DeletePeer(_ context.Context, accountID, peerID, userID string) error {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
a.deletePeerCalls++
|
||||
if a.wg != nil {
|
||||
a.wg.Done()
|
||||
}
|
||||
delete(a.store.account.Peers, peerID)
|
||||
return nil //nolint:nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (a MocAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
|
||||
// noop
|
||||
func (a *MockAccountManager) GetDeletePeerCalls() int {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
return a.deletePeerCalls
|
||||
}
|
||||
|
||||
func (a MocAccountManager) GetStore() store.Store {
|
||||
func (a *MockAccountManager) BufferUpdateAccountPeers(ctx context.Context, accountID string) {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
if a.bufferUpdateCalls == nil {
|
||||
a.bufferUpdateCalls = make(map[string]int)
|
||||
}
|
||||
a.bufferUpdateCalls[accountID]++
|
||||
}
|
||||
|
||||
func (a *MockAccountManager) GetBufferUpdateCalls(accountID string) int {
|
||||
a.mu.Lock()
|
||||
defer a.mu.Unlock()
|
||||
if a.bufferUpdateCalls == nil {
|
||||
return 0
|
||||
}
|
||||
return a.bufferUpdateCalls[accountID]
|
||||
}
|
||||
|
||||
func (a *MockAccountManager) GetStore() store.Store {
|
||||
return a.store
|
||||
}
|
||||
|
||||
func TestNewManager(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
timeNow = time.Now
|
||||
})
|
||||
startTime := time.Now()
|
||||
timeNow = func() time.Time {
|
||||
return startTime
|
||||
}
|
||||
|
||||
store := &MockStore{}
|
||||
am := MocAccountManager{
|
||||
am := MockAccountManager{
|
||||
store: store,
|
||||
}
|
||||
|
||||
@@ -60,7 +96,7 @@ func TestNewManager(t *testing.T) {
|
||||
numberOfEphemeralPeers := 3
|
||||
seedPeers(store, numberOfPeers, numberOfEphemeralPeers)
|
||||
|
||||
mgr := NewEphemeralManager(store, am)
|
||||
mgr := NewEphemeralManager(store, &am)
|
||||
mgr.loadEphemeralPeers(context.Background())
|
||||
startTime = startTime.Add(ephemeralLifeTime + 1)
|
||||
mgr.cleanup(context.Background())
|
||||
@@ -71,13 +107,16 @@ func TestNewManager(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewManagerPeerConnected(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
timeNow = time.Now
|
||||
})
|
||||
startTime := time.Now()
|
||||
timeNow = func() time.Time {
|
||||
return startTime
|
||||
}
|
||||
|
||||
store := &MockStore{}
|
||||
am := MocAccountManager{
|
||||
am := MockAccountManager{
|
||||
store: store,
|
||||
}
|
||||
|
||||
@@ -85,7 +124,7 @@ func TestNewManagerPeerConnected(t *testing.T) {
|
||||
numberOfEphemeralPeers := 3
|
||||
seedPeers(store, numberOfPeers, numberOfEphemeralPeers)
|
||||
|
||||
mgr := NewEphemeralManager(store, am)
|
||||
mgr := NewEphemeralManager(store, &am)
|
||||
mgr.loadEphemeralPeers(context.Background())
|
||||
mgr.OnPeerConnected(context.Background(), store.account.Peers["ephemeral_peer_0"])
|
||||
|
||||
@@ -99,13 +138,16 @@ func TestNewManagerPeerConnected(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestNewManagerPeerDisconnected(t *testing.T) {
|
||||
t.Cleanup(func() {
|
||||
timeNow = time.Now
|
||||
})
|
||||
startTime := time.Now()
|
||||
timeNow = func() time.Time {
|
||||
return startTime
|
||||
}
|
||||
|
||||
store := &MockStore{}
|
||||
am := MocAccountManager{
|
||||
am := MockAccountManager{
|
||||
store: store,
|
||||
}
|
||||
|
||||
@@ -113,7 +155,7 @@ func TestNewManagerPeerDisconnected(t *testing.T) {
|
||||
numberOfEphemeralPeers := 3
|
||||
seedPeers(store, numberOfPeers, numberOfEphemeralPeers)
|
||||
|
||||
mgr := NewEphemeralManager(store, am)
|
||||
mgr := NewEphemeralManager(store, &am)
|
||||
mgr.loadEphemeralPeers(context.Background())
|
||||
for _, v := range store.account.Peers {
|
||||
mgr.OnPeerConnected(context.Background(), v)
|
||||
@@ -130,6 +172,36 @@ func TestNewManagerPeerDisconnected(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestCleanupSchedulingBehaviorIsBatched(t *testing.T) {
|
||||
const (
|
||||
ephemeralPeers = 10
|
||||
testLifeTime = 1 * time.Second
|
||||
testCleanupWindow = 100 * time.Millisecond
|
||||
)
|
||||
mockStore := &MockStore{}
|
||||
mockAM := &MockAccountManager{
|
||||
store: mockStore,
|
||||
}
|
||||
mockAM.wg = &sync.WaitGroup{}
|
||||
mockAM.wg.Add(ephemeralPeers)
|
||||
mgr := NewEphemeralManager(mockStore, mockAM)
|
||||
mgr.lifeTime = testLifeTime
|
||||
mgr.cleanupWindow = testCleanupWindow
|
||||
|
||||
account := newAccountWithId(context.Background(), "account", "", "", false)
|
||||
mockStore.account = account
|
||||
for i := range ephemeralPeers {
|
||||
p := &nbpeer.Peer{ID: fmt.Sprintf("peer-%d", i), AccountID: account.Id, Ephemeral: true}
|
||||
mockStore.account.Peers[p.ID] = p
|
||||
time.Sleep(testCleanupWindow / ephemeralPeers)
|
||||
mgr.OnPeerDisconnected(context.Background(), p)
|
||||
}
|
||||
mockAM.wg.Wait()
|
||||
assert.Len(t, mockStore.account.Peers, 0, "all ephemeral peers should be cleaned up after the lifetime")
|
||||
assert.Equal(t, 1, mockAM.GetBufferUpdateCalls(account.Id), "buffer update should be called once")
|
||||
assert.Equal(t, ephemeralPeers, mockAM.GetDeletePeerCalls(), "should have deleted only the first peer")
|
||||
}
|
||||
|
||||
func seedPeers(store *MockStore, numberOfPeers int, numberOfEphemeralPeers int) {
|
||||
store.account = newAccountWithId(context.Background(), "my account", "", "", false)
|
||||
|
||||
|
||||
@@ -1261,14 +1261,20 @@ func (am *DefaultAccountManager) BufferUpdateAccountPeers(ctx context.Context, a
|
||||
mu, _ := am.accountUpdateLocks.LoadOrStore(accountID, &sync.Mutex{})
|
||||
lock := mu.(*sync.Mutex)
|
||||
|
||||
log.WithContext(ctx).Debugf("try to BufferUpdateAccountPeers for account %s", accountID)
|
||||
|
||||
if !lock.TryLock() {
|
||||
log.WithContext(ctx).Debugf("BufferUpdateAccountPeers for an account %s locked - returning", accountID)
|
||||
return
|
||||
}
|
||||
|
||||
go func() {
|
||||
time.Sleep(time.Duration(am.updateAccountPeersBufferInterval.Load()))
|
||||
lock.Unlock()
|
||||
// time.Sleep(time.Duration(am.updateAccountPeersBufferInterval.Load()))
|
||||
defer lock.Unlock()
|
||||
log.WithContext(ctx).Debugf("BufferUpdateAccountPeers for an account %s - in progress", accountID)
|
||||
tn := time.Now()
|
||||
am.UpdateAccountPeers(ctx, accountID)
|
||||
log.WithContext(ctx).Debugf("BufferUpdateAccountPeers for an account %s - took %dms", accountID, time.Since(tn).Milliseconds())
|
||||
}()
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user