mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 08:16:39 +00:00
Compare commits
5 Commits
v0.64.2
...
feature/up
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
06c5564dd2 | ||
|
|
cace564a19 | ||
|
|
d07c83c111 | ||
|
|
b0a5696bec | ||
|
|
dc24b9e276 |
@@ -196,14 +196,19 @@ func BuildManager(
|
|||||||
}
|
}
|
||||||
|
|
||||||
var initialInterval int64
|
var initialInterval int64
|
||||||
intervalStr := os.Getenv("PEER_UPDATE_INTERVAL_MS")
|
intervalStr := os.Getenv("NB_PEER_UPDATE_INTERVAL_MS")
|
||||||
interval, err := strconv.Atoi(intervalStr)
|
interval, err := strconv.Atoi(intervalStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
initialInterval = 1
|
initialInterval = 1
|
||||||
} else {
|
} else {
|
||||||
initialInterval = int64(interval) * 10
|
initialInterval = int64(interval) * 10
|
||||||
go func() {
|
go func() {
|
||||||
time.Sleep(30 * time.Second)
|
startupPeriodStr := os.Getenv("NB_PEER_UPDATE_STARTUP_PERIOD_S")
|
||||||
|
startupPeriod, err := strconv.Atoi(startupPeriodStr)
|
||||||
|
if err != nil {
|
||||||
|
startupPeriod = 1
|
||||||
|
}
|
||||||
|
time.Sleep(time.Duration(startupPeriod) * time.Second)
|
||||||
am.updateAccountPeersBufferInterval.Store(int64(time.Duration(interval) * time.Millisecond))
|
am.updateAccountPeersBufferInterval.Store(int64(time.Duration(interval) * time.Millisecond))
|
||||||
log.WithContext(ctx).Infof("set peer update buffer interval to %dms", interval)
|
log.WithContext(ctx).Infof("set peer update buffer interval to %dms", interval)
|
||||||
}()
|
}()
|
||||||
@@ -1439,9 +1444,15 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID
|
|||||||
peerUnlock := am.Store.AcquireWriteLockByUID(ctx, peerPubKey)
|
peerUnlock := am.Store.AcquireWriteLockByUID(ctx, peerPubKey)
|
||||||
defer peerUnlock()
|
defer peerUnlock()
|
||||||
|
|
||||||
peer, netMap, postureChecks, err := am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta}, accountID)
|
// peer, netMap, postureChecks, err := am.SyncPeer(ctx, types.PeerSync{WireGuardPubKey: peerPubKey, Meta: meta}, accountID)
|
||||||
|
// if err != nil {
|
||||||
|
// return nil, nil, nil, fmt.Errorf("error syncing peer: %w", err)
|
||||||
|
// }
|
||||||
|
|
||||||
|
peer, err := am.Store.GetPeerByPeerPubKey(ctx, store.LockingStrengthUpdate, peerPubKey)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, nil, fmt.Errorf("error syncing peer: %w", err)
|
log.WithContext(ctx).Errorf("error getting peer by pubkey %s: %v", peerPubKey, err)
|
||||||
|
return nil, nil, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
err = am.MarkPeerConnected(ctx, peerPubKey, true, realIP, accountID)
|
err = am.MarkPeerConnected(ctx, peerPubKey, true, realIP, accountID)
|
||||||
@@ -1449,7 +1460,7 @@ func (am *DefaultAccountManager) SyncAndMarkPeer(ctx context.Context, accountID
|
|||||||
log.WithContext(ctx).Warnf("failed marking peer as connected %s %v", peerPubKey, err)
|
log.WithContext(ctx).Warnf("failed marking peer as connected %s %v", peerPubKey, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return peer, netMap, postureChecks, nil
|
return peer, &types.NetworkMap{}, []*posture.Checks{}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string) error {
|
func (am *DefaultAccountManager) OnPeerDisconnected(ctx context.Context, accountID string, peerPubKey string) error {
|
||||||
|
|||||||
@@ -172,23 +172,23 @@ func (s *GRPCServer) Sync(req *proto.EncryptedMessage, srv proto.ManagementServi
|
|||||||
log.WithContext(ctx).Tracef("peer system meta has to be provided on sync. Peer %s, remote addr %s", peerKey.String(), realIP)
|
log.WithContext(ctx).Tracef("peer system meta has to be provided on sync. Peer %s, remote addr %s", peerKey.String(), realIP)
|
||||||
}
|
}
|
||||||
|
|
||||||
peer, netMap, postureChecks, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), extractPeerMeta(ctx, syncReq.GetMeta()), realIP)
|
peer, _, _, err := s.accountManager.SyncAndMarkPeer(ctx, accountID, peerKey.String(), extractPeerMeta(ctx, syncReq.GetMeta()), realIP)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err)
|
log.WithContext(ctx).Debugf("error while syncing peer %s: %v", peerKey.String(), err)
|
||||||
return mapError(ctx, err)
|
return mapError(ctx, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.sendInitialSync(ctx, peerKey, peer, netMap, postureChecks, srv)
|
// err = s.sendInitialSync(ctx, peerKey, peer, netMap, postureChecks, srv)
|
||||||
if err != nil {
|
// if err != nil {
|
||||||
log.WithContext(ctx).Debugf("error while sending initial sync for %s: %v", peerKey.String(), err)
|
// log.WithContext(ctx).Debugf("error while sending initial sync for %s: %v", peerKey.String(), err)
|
||||||
return err
|
// return err
|
||||||
}
|
// }
|
||||||
|
|
||||||
updates := s.peersUpdateManager.CreateChannel(ctx, peer.ID)
|
updates := s.peersUpdateManager.CreateChannel(ctx, peer.ID)
|
||||||
|
|
||||||
s.ephemeralManager.OnPeerConnected(ctx, peer)
|
// s.ephemeralManager.OnPeerConnected(ctx, peer)
|
||||||
|
|
||||||
s.secretsManager.SetupRefresh(ctx, accountID, peer.ID)
|
// s.secretsManager.SetupRefresh(ctx, accountID, peer.ID)
|
||||||
|
|
||||||
if s.appMetrics != nil {
|
if s.appMetrics != nil {
|
||||||
s.appMetrics.GRPCMetrics().CountSyncRequestDuration(time.Since(reqStart))
|
s.appMetrics.GRPCMetrics().CountSyncRequestDuration(time.Since(reqStart))
|
||||||
|
|||||||
@@ -133,36 +133,41 @@ func (s *SqlStore) AcquireGlobalLock(ctx context.Context) (unlock func()) {
|
|||||||
|
|
||||||
// AcquireWriteLockByUID acquires an ID lock for writing to a resource and returns a function that releases the lock
|
// AcquireWriteLockByUID acquires an ID lock for writing to a resource and returns a function that releases the lock
|
||||||
func (s *SqlStore) AcquireWriteLockByUID(ctx context.Context, uniqueID string) (unlock func()) {
|
func (s *SqlStore) AcquireWriteLockByUID(ctx context.Context, uniqueID string) (unlock func()) {
|
||||||
log.WithContext(ctx).Tracef("acquiring write lock for ID %s", uniqueID)
|
// log.WithContext(ctx).Tracef("acquiring write lock for ID %s", uniqueID)
|
||||||
|
//
|
||||||
start := time.Now()
|
// start := time.Now()
|
||||||
value, _ := s.resourceLocks.LoadOrStore(uniqueID, &sync.RWMutex{})
|
// value, _ := s.resourceLocks.LoadOrStore(uniqueID, &sync.RWMutex{})
|
||||||
mtx := value.(*sync.RWMutex)
|
// mtx := value.(*sync.RWMutex)
|
||||||
mtx.Lock()
|
// mtx.Lock()
|
||||||
|
//
|
||||||
unlock = func() {
|
// unlock = func() {
|
||||||
mtx.Unlock()
|
// mtx.Unlock()
|
||||||
log.WithContext(ctx).Tracef("released write lock for ID %s in %v", uniqueID, time.Since(start))
|
// log.WithContext(ctx).Tracef("released write lock for ID %s in %v", uniqueID, time.Since(start))
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// return unlock
|
||||||
|
return func() {
|
||||||
|
// noop
|
||||||
}
|
}
|
||||||
|
|
||||||
return unlock
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// AcquireReadLockByUID acquires an ID lock for writing to a resource and returns a function that releases the lock
|
// AcquireReadLockByUID acquires an ID lock for writing to a resource and returns a function that releases the lock
|
||||||
func (s *SqlStore) AcquireReadLockByUID(ctx context.Context, uniqueID string) (unlock func()) {
|
func (s *SqlStore) AcquireReadLockByUID(ctx context.Context, uniqueID string) (unlock func()) {
|
||||||
log.WithContext(ctx).Tracef("acquiring read lock for ID %s", uniqueID)
|
// log.WithContext(ctx).Tracef("acquiring read lock for ID %s", uniqueID)
|
||||||
|
//
|
||||||
|
// start := time.Now()
|
||||||
|
// value, _ := s.resourceLocks.LoadOrStore(uniqueID, &sync.RWMutex{})
|
||||||
|
// mtx := value.(*sync.RWMutex)
|
||||||
|
// mtx.RLock()
|
||||||
|
//
|
||||||
|
// unlock = func() {
|
||||||
|
// mtx.RUnlock()
|
||||||
|
// log.WithContext(ctx).Tracef("released read lock for ID %s in %v", uniqueID, time.Since(start))
|
||||||
|
// }
|
||||||
|
|
||||||
start := time.Now()
|
return func() {
|
||||||
value, _ := s.resourceLocks.LoadOrStore(uniqueID, &sync.RWMutex{})
|
// noop
|
||||||
mtx := value.(*sync.RWMutex)
|
|
||||||
mtx.RLock()
|
|
||||||
|
|
||||||
unlock = func() {
|
|
||||||
mtx.RUnlock()
|
|
||||||
log.WithContext(ctx).Tracef("released read lock for ID %s in %v", uniqueID, time.Since(start))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return unlock
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *SqlStore) SaveAccount(ctx context.Context, account *types.Account) error {
|
func (s *SqlStore) SaveAccount(ctx context.Context, account *types.Account) error {
|
||||||
|
|||||||
Reference in New Issue
Block a user