Mirror of https://github.com/netbirdio/netbird.git (synced 2026-04-27 20:56:44 +00:00)

Compare commits: test/multi...feature/po (16 commits)
Commits: 546538f570, 9307e7e0ea, 9691e197df, 2a51609436, d380c925c2, a22a6f6d26, 83457f8b99, 55e7ca96df, b45284f086, 2d401a7dce, e9016aecea, d7e68ff812, f3a5e34c3f, b3c0b46a88, 6c7a8a7741, 93d8d272bf
@@ -198,6 +198,7 @@ func (s *ICEBind) createIPv4ReceiverFn(pc *ipv4.PacketConn, conn *net.UDPConn, r
 			if sizes[i] == 0 {
 				continue
 			}
+			log.Infof("--- received Datagram %s from %s, size: %d", msg.Addr, msg.Addr.String(), sizes[i])
 			addrPort := msg.Addr.(*net.UDPAddr).AddrPort()
 			ep := &wgConn.StdNetEndpoint{AddrPort: addrPort} // TODO: remove allocation
 			wgConn.GetSrcFromControl(msg.OOB[:msg.NN], ep)
@@ -133,7 +133,7 @@ func (e *ConnMgr) SetExcludeList(ctx context.Context, peerIDs map[string]bool) {
 		excludedPeers = append(excludedPeers, lazyPeerCfg)
 	}

-	added := e.lazyConnMgr.ExcludePeer(e.lazyCtx, excludedPeers)
+	added := e.lazyConnMgr.ExcludePeer(excludedPeers)
 	for _, peerID := range added {
 		var peerConn *peer.Conn
 		var exists bool
@@ -201,7 +201,7 @@ func (e *ConnMgr) RemovePeerConn(peerKey string) {
 	if !ok {
 		return
 	}
-	defer conn.Close()
+	defer conn.Close(false)

 	if !e.isStartedWithLazyMgr() {
 		return
@@ -211,23 +211,25 @@ func (e *ConnMgr) RemovePeerConn(peerKey string) {
 	conn.Log.Infof("removed peer from lazy conn manager")
 }

-func (e *ConnMgr) OnSignalMsg(ctx context.Context, peerKey string) (*peer.Conn, bool) {
-	conn, ok := e.peerStore.PeerConn(peerKey)
-	if !ok {
-		return nil, false
-	}
-
+func (e *ConnMgr) ActivatePeer(ctx context.Context, conn *peer.Conn) {
 	if !e.isStartedWithLazyMgr() {
-		return conn, true
+		return
 	}

-	if found := e.lazyConnMgr.ActivatePeer(e.lazyCtx, peerKey); found {
+	if found := e.lazyConnMgr.ActivatePeer(conn.GetKey()); found {
 		conn.Log.Infof("activated peer from inactive state")
 		if err := conn.Open(ctx); err != nil {
 			conn.Log.Errorf("failed to open connection: %v", err)
 		}
 	}
-	return conn, true
+}
+
+func (e *ConnMgr) DeactivatePeer(conn *peer.Conn) {
+	if !e.isStartedWithLazyMgr() {
+		return
+	}
+
+	e.lazyConnMgr.DeactivatePeer(conn.ConnID())
 }

 func (e *ConnMgr) Close() {
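Read together with the Engine changes further down: any signal message other than GO_IDLE now re-activates the peer through ConnMgr.ActivatePeer, while a GO_IDLE message from the remote side drives ConnMgr.DeactivatePeer, which hands the peer back to the lazy connection manager's activity watcher.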
@@ -275,7 +277,7 @@ func (e *ConnMgr) addPeersToLazyConnManager() error {
 		lazyPeerCfgs = append(lazyPeerCfgs, lazyPeerCfg)
 	}

-	return e.lazyConnMgr.AddActivePeers(e.lazyCtx, lazyPeerCfgs)
+	return e.lazyConnMgr.AddActivePeers(lazyPeerCfgs)
 }

 func (e *ConnMgr) closeManager(ctx context.Context) {
@@ -1255,7 +1255,7 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
 	}

 	if exists := e.connMgr.AddPeerConn(e.ctx, peerKey, conn); exists {
-		conn.Close()
+		conn.Close(false)
 		return fmt.Errorf("peer already exists: %s", peerKey)
 	}

@@ -1331,11 +1331,16 @@ func (e *Engine) receiveSignalEvents() {
 			e.syncMsgMux.Lock()
 			defer e.syncMsgMux.Unlock()

-			conn, ok := e.connMgr.OnSignalMsg(e.ctx, msg.Key)
+			conn, ok := e.peerStore.PeerConn(msg.Key)
 			if !ok {
 				return fmt.Errorf("wrongly addressed message %s", msg.Key)
 			}

+			msgType := msg.GetBody().GetType()
+			if msgType != sProto.Body_GO_IDLE {
+				e.connMgr.ActivatePeer(e.ctx, conn)
+			}
+
 			switch msg.GetBody().Type {
 			case sProto.Body_OFFER:
 				remoteCred, err := signal.UnMarshalCredential(msg)
@@ -1392,6 +1397,8 @@ func (e *Engine) receiveSignalEvents() {

 				go conn.OnRemoteCandidate(candidate, e.routeManager.GetClientRoutes())
 			case sProto.Body_MODE:
+			case sProto.Body_GO_IDLE:
+				e.connMgr.DeactivatePeer(conn)
 			}

 			return nil
Deleted file (70 lines):
@@ -1,70 +0,0 @@
-package inactivity
-
-import (
-	"context"
-	"time"
-
-	peer "github.com/netbirdio/netbird/client/internal/peer/id"
-)
-
-const (
-	DefaultInactivityThreshold = 60 * time.Minute // idle after 1 hour inactivity
-	MinimumInactivityThreshold = 3 * time.Minute
-)
-
-type Monitor struct {
-	id                  peer.ConnID
-	timer               *time.Timer
-	cancel              context.CancelFunc
-	inactivityThreshold time.Duration
-}
-
-func NewInactivityMonitor(peerID peer.ConnID, threshold time.Duration) *Monitor {
-	i := &Monitor{
-		id:                  peerID,
-		timer:               time.NewTimer(0),
-		inactivityThreshold: threshold,
-	}
-	i.timer.Stop()
-	return i
-}
-
-func (i *Monitor) Start(ctx context.Context, timeoutChan chan peer.ConnID) {
-	i.timer.Reset(i.inactivityThreshold)
-	defer i.timer.Stop()
-
-	ctx, i.cancel = context.WithCancel(ctx)
-	defer func() {
-		defer i.cancel()
-		select {
-		case <-i.timer.C:
-		default:
-		}
-	}()
-
-	select {
-	case <-i.timer.C:
-		select {
-		case timeoutChan <- i.id:
-		case <-ctx.Done():
-			return
-		}
-	case <-ctx.Done():
-		return
-	}
-}
-
-func (i *Monitor) Stop() {
-	if i.cancel == nil {
-		return
-	}
-	i.cancel()
-}
-
-func (i *Monitor) PauseTimer() {
-	i.timer.Stop()
-}
-
-func (i *Monitor) ResetTimer() {
-	i.timer.Reset(i.inactivityThreshold)
-}
Deleted file (156 lines):
@@ -1,156 +0,0 @@
-package inactivity
-
-import (
-	"context"
-	"testing"
-	"time"
-
-	peerid "github.com/netbirdio/netbird/client/internal/peer/id"
-)
-
-type MocPeer struct {
-}
-
-func (m *MocPeer) ConnID() peerid.ConnID {
-	return peerid.ConnID(m)
-}
-
-func TestInactivityMonitor(t *testing.T) {
-	tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5)
-	defer testTimeoutCancel()
-
-	p := &MocPeer{}
-	im := NewInactivityMonitor(p.ConnID(), time.Second*2)
-
-	timeoutChan := make(chan peerid.ConnID)
-
-	exitChan := make(chan struct{})
-
-	go func() {
-		defer close(exitChan)
-		im.Start(tCtx, timeoutChan)
-	}()
-
-	select {
-	case <-timeoutChan:
-	case <-tCtx.Done():
-		t.Fatal("timeout")
-	}
-
-	select {
-	case <-exitChan:
-	case <-tCtx.Done():
-		t.Fatal("timeout")
-	}
-}
-
-func TestReuseInactivityMonitor(t *testing.T) {
-	p := &MocPeer{}
-	im := NewInactivityMonitor(p.ConnID(), time.Second*2)
-
-	timeoutChan := make(chan peerid.ConnID)
-
-	for i := 2; i > 0; i-- {
-		exitChan := make(chan struct{})
-
-		testTimeoutCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5)
-
-		go func() {
-			defer close(exitChan)
-			im.Start(testTimeoutCtx, timeoutChan)
-		}()
-
-		select {
-		case <-timeoutChan:
-		case <-testTimeoutCtx.Done():
-			t.Fatal("timeout")
-		}
-
-		select {
-		case <-exitChan:
-		case <-testTimeoutCtx.Done():
-			t.Fatal("timeout")
-		}
-		testTimeoutCancel()
-	}
-}
-
-func TestStopInactivityMonitor(t *testing.T) {
-	tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5)
-	defer testTimeoutCancel()
-
-	p := &MocPeer{}
-	im := NewInactivityMonitor(p.ConnID(), DefaultInactivityThreshold)
-
-	timeoutChan := make(chan peerid.ConnID)
-
-	exitChan := make(chan struct{})
-
-	go func() {
-		defer close(exitChan)
-		im.Start(tCtx, timeoutChan)
-	}()
-
-	go func() {
-		time.Sleep(3 * time.Second)
-		im.Stop()
-	}()
-
-	select {
-	case <-timeoutChan:
-		t.Fatal("unexpected timeout")
-	case <-exitChan:
-	case <-tCtx.Done():
-		t.Fatal("timeout")
-	}
-}
-
-func TestPauseInactivityMonitor(t *testing.T) {
-	tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*10)
-	defer testTimeoutCancel()
-
-	p := &MocPeer{}
-	trashHold := time.Second * 3
-	im := NewInactivityMonitor(p.ConnID(), trashHold)
-
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
-	timeoutChan := make(chan peerid.ConnID)
-
-	exitChan := make(chan struct{})
-
-	go func() {
-		defer close(exitChan)
-		im.Start(ctx, timeoutChan)
-	}()
-
-	time.Sleep(1 * time.Second) // grant time to start the monitor
-	im.PauseTimer()
-
-	// check to do not receive timeout
-	thresholdCtx, thresholdCancel := context.WithTimeout(context.Background(), trashHold+time.Second)
-	defer thresholdCancel()
-	select {
-	case <-exitChan:
-		t.Fatal("unexpected exit")
-	case <-timeoutChan:
-		t.Fatal("unexpected timeout")
-	case <-thresholdCtx.Done():
-		// test ok
-	case <-tCtx.Done():
-		t.Fatal("test timed out")
-	}
-
-	// test reset timer
-	im.ResetTimer()
-
-	select {
-	case <-tCtx.Done():
-		t.Fatal("test timed out")
-	case <-exitChan:
-		t.Fatal("unexpected exit")
-	case <-timeoutChan:
-		// expected timeout
-	}
-}
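The two deleted files above drop the timer-based per-peer inactivity Monitor and its tests; the new files below replace that mechanism with a single traffic-based inactivity Manager that periodically inspects WireGuard receive counters instead of relying on externally reset timers.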
client/internal/lazyconn/inactivity/manager.go (new file, 249 lines)
@@ -0,0 +1,249 @@
+package inactivity
+
+import (
+	"container/list"
+	"context"
+	"fmt"
+	"math"
+	"os"
+	"strconv"
+	"time"
+
+	log "github.com/sirupsen/logrus"
+
+	"github.com/netbirdio/netbird/client/iface/configurer"
+	"github.com/netbirdio/netbird/client/internal/lazyconn"
+)
+
+// Responder: vmp2
+// - Receive handshake initiation: 148 bytes + extra 32 bytes, every 02:00 - 03:00 minutes
+// - Receive keep alive: 32 bytes, every 25 sec
+// Initiator: mp1
+// - Receive handshake response:
+// - Receive keep alive: 32 bytes, every 25 sec
+
+const (
+	keepAliveBytes       = 32
+	keepAliveInterval    = 25 * time.Second
+	handshakeInitBytes   = 148
+	handshakeRespBytes   = 92
+	handshakeMaxInterval = 3 * time.Minute
+
+	checkInterval = 1 * time.Minute
+	historySize   = 5 * time.Minute
+
+	DefaultInactivityThreshold = 15 * time.Minute
+	MinimumInactivityThreshold = 5 * time.Minute
+
+	recorderEnv = "NB_LAZYCONN_RECORDER_ENABLED"
+)
+
+type WgInterface interface {
+	GetStats() (map[string]configurer.WGStats, error)
+}
+
+type peerHistory struct {
+	lastRxBytes       int64      // last received bytes
+	bytesHistory      *list.List // linked list of int64
+	historySize       int
+	summarizedBytes   int64
+	inactivityCounter int // counter to track inactivity
+	log               *log.Entry
+}
+
+func newPeerHistory(log *log.Entry, historySize int) *peerHistory {
+	return &peerHistory{
+		bytesHistory: list.New(),
+		historySize:  historySize,
+		log:          log,
+	}
+}
+
+func (pi *peerHistory) appendRxBytes(rxBytes int64) {
+	// If at capacity, remove the oldest element (front)
+	if pi.bytesHistory.Len() == pi.historySize {
+		pi.summarizedBytes -= pi.bytesHistory.Front().Value.(int64)
+		pi.bytesHistory.Remove(pi.bytesHistory.Front())
+	}
+
+	// Add the new rxBytes at the back
+	pi.bytesHistory.PushBack(rxBytes)
+	pi.summarizedBytes += rxBytes
+}
+
+func (pi *peerHistory) historyString() string {
+	var history []string
+	for e := pi.bytesHistory.Front(); e != nil; e = e.Next() {
+		history = append(history, fmt.Sprintf("%d", e.Value.(int64)))
+	}
+	return fmt.Sprintf("%s", history)
+}
+
+func (pi *peerHistory) reset() {
+	for e := pi.bytesHistory.Front(); e != nil; e = e.Next() {
+		e.Value = int64(0)
+	}
+	pi.summarizedBytes = 0
+}
+
+type Manager struct {
+	InactivePeersChan chan []string
+	iface             WgInterface
+	interestedPeers   map[string]*peerHistory
+
+	maxBytesPerPeriod int64
+	historySize       int // Size of the history buffer for each peer, used to track received bytes over time
+	recorder          *Recorder
+
+	thresholdOfInactivity int // Number of consecutive checks with low activity to consider a peer inactive
+}
+
+func NewManager(iface WgInterface, configuredThreshold *time.Duration) *Manager {
+	inactivityThreshold, err := validateInactivityThreshold(configuredThreshold)
+	if err != nil {
+		inactivityThreshold = DefaultInactivityThreshold
+		log.Warnf("invalid inactivity threshold configured: %v, using default: %v", err, DefaultInactivityThreshold)
+	}
+
+	expectedMaxBytes := calculateExpectedMaxBytes()
+	log.Infof("receive less than %d bytes per %v, will be considered inactive", expectedMaxBytes, inactivityThreshold)
+	return &Manager{
+		InactivePeersChan:     make(chan []string, 1),
+		iface:                 iface,
+		interestedPeers:       make(map[string]*peerHistory),
+		historySize:           int(historySize.Minutes()),
+		maxBytesPerPeriod:     expectedMaxBytes,
+		thresholdOfInactivity: int(math.Ceil(inactivityThreshold.Minutes() / checkInterval.Minutes())),
+	}
+}
+
+func (m *Manager) AddPeer(peerCfg *lazyconn.PeerConfig) {
+	if _, exists := m.interestedPeers[peerCfg.PublicKey]; exists {
+		return
+	}
+
+	peerCfg.Log.Debugf("adding peer to inactivity manager")
+	m.interestedPeers[peerCfg.PublicKey] = newPeerHistory(peerCfg.Log, m.historySize)
+}
+
+func (m *Manager) RemovePeer(peer string) {
+	pi, ok := m.interestedPeers[peer]
+	if !ok {
+		return
+	}
+
+	pi.log.Debugf("remove peer from inactivity manager")
+	delete(m.interestedPeers, peer)
+}
+
+func (m *Manager) Start(ctx context.Context) {
+	enabled, err := strconv.ParseBool(os.Getenv(recorderEnv))
+	if err == nil && enabled {
+		m.recorder = NewRecorder()
+		defer m.recorder.Close()
+	}
+
+	ticker := newTicker(checkInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case tickTime := <-ticker.C():
+			idlePeers, err := m.checkStats(tickTime)
+			if err != nil {
+				log.Errorf("error checking stats: %v", err)
+				return
+			}
+
+			if len(idlePeers) == 0 {
+				continue
+			}
+
+			m.notifyInactivePeers(ctx, idlePeers)
+		}
+	}
+}
+
+func (m *Manager) notifyInactivePeers(ctx context.Context, inactivePeers []string) {
+	select {
+	case m.InactivePeersChan <- inactivePeers:
+	case <-ctx.Done():
+		return
+	default:
+		return
+	}
+}
+
+func (m *Manager) checkStats(now time.Time) ([]string, error) {
+	stats, err := m.iface.GetStats()
+	if err != nil {
+		return nil, err
+	}
+
+	var idlePeers []string
+
+	for peer, history := range m.interestedPeers {
+		stat, found := stats[peer]
+		if !found {
+			// when peer is in connecting state
+			history.log.Warnf("peer not found in wg stats")
+		}
+
+		deltaRx := stat.RxBytes - history.lastRxBytes
+		if deltaRx < 0 {
+			deltaRx = 0 // reset to zero if negative
+			history.reset()
+		}
+
+		m.recorder.ReceivedBytes(peer, now, deltaRx)
+
+		history.lastRxBytes = stat.RxBytes
+		history.appendRxBytes(deltaRx)
+
+		// not enough history to determine inactivity
+		if history.bytesHistory.Len() < m.historySize {
+			history.log.Debugf("not enough history to determine inactivity, current history size: %d, required: %d", history.bytesHistory.Len(), m.historySize)
+			continue
+		}
+
+		if history.summarizedBytes <= m.maxBytesPerPeriod {
+			history.inactivityCounter++
+			history.log.Debugf("peer is inactive, summarizedBytes: %d, maxBytesPerPeriod: %d, %v", history.summarizedBytes, m.maxBytesPerPeriod, history.historyString())
+		} else {
+			history.inactivityCounter = 0 // reset inactivity counter if activity is detected
+			history.log.Debugf("peer is active, summarizedBytes: %d, maxBytesPerPeriod: %d, %v", history.summarizedBytes, m.maxBytesPerPeriod, history.historyString())
+		}
+
+		if history.inactivityCounter >= m.thresholdOfInactivity {
+			history.log.Infof("peer is inactive for %d consecutive checks, marking as idle (limit %d)", history.inactivityCounter, m.thresholdOfInactivity)
+			idlePeers = append(idlePeers, peer)
+			history.inactivityCounter = 0 // reset inactivity counter after marking as idle
+		}
+	}
+
+	return idlePeers, nil
+}
+
+func validateInactivityThreshold(configuredThreshold *time.Duration) (time.Duration, error) {
+	if configuredThreshold == nil {
+		return DefaultInactivityThreshold, nil
+	}
+	if *configuredThreshold < MinimumInactivityThreshold {
+		return 0, fmt.Errorf("configured inactivity threshold %v is too low, using %v", *configuredThreshold, MinimumInactivityThreshold)
+	}
+	return *configuredThreshold, nil
+}
+
+func calculateExpectedMaxBytes() int64 {
+	// Calculate number of keep-alive packets expected
+	keepAliveCount := int64(historySize.Seconds() / keepAliveInterval.Seconds())
+	keepAliveBytes := keepAliveCount * keepAliveBytes
+
+	// Calculate potential handshake packets (conservative estimate)
+	handshakeCount := int64(historySize.Minutes() / handshakeMaxInterval.Minutes())
+	handshakeBytes := handshakeCount * (handshakeInitBytes + keepAliveBytes) // handshake + extra bytes
+
+	return keepAliveBytes + handshakeBytes
+}
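For orientation, this is the arithmetic calculateExpectedMaxBytes performs with the constants above (a worked example read off the code, not part of the diff). Note that the local keepAliveBytes shadows the 32-byte constant of the same name, so the handshake term adds the whole 384-byte keep-alive budget rather than the single 32-byte extra:

	keepAliveCount   = int64(300s / 25s)  = 12
	keepAliveBytes   = 12 * 32            = 384  // local, shadows the const
	handshakeCount   = int64(5min / 3min) = 1
	handshakeBytes   = 1 * (148 + 384)    = 532
	expectedMaxBytes = 384 + 532          = 916 bytes per 5-minute window

A peer whose receive deltas over a full 5-sample window sum to at most this budget scores one inactive check; with the default 15-minute threshold and the 1-minute check interval, thresholdOfInactivity = ceil(15 / 1) = 15 consecutive inactive checks are needed before the peer is reported on InactivePeersChan.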
client/internal/lazyconn/inactivity/manager_test.go (new file, 53 lines)
@@ -0,0 +1,53 @@
+package inactivity
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	log "github.com/sirupsen/logrus"
+
+	"github.com/netbirdio/netbird/client/internal/lazyconn"
+	"github.com/netbirdio/netbird/util"
+)
+
+func init() {
+	_ = util.InitLog("trace", "console")
+}
+
+func TestNewManager(t *testing.T) {
+	for i, sc := range scenarios {
+		timer := NewFakeTimer()
+		newTicker = func(d time.Duration) Ticker {
+			return newFakeTicker(d, timer)
+		}
+
+		t.Run(fmt.Sprintf("Scenario %d", i), func(t *testing.T) {
+			mock := newMockWgInterface("peer1", sc.Data, timer)
+			manager := NewManager(mock, nil)
+			peerCfg := &lazyconn.PeerConfig{
+				PublicKey: "peer1",
+				Log:       log.WithField("peer", "peer1"),
+			}
+			manager.AddPeer(peerCfg)
+
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			manager.Start(ctx)
+
+			var inactiveResult bool
+			select {
+			case <-manager.InactivePeersChan:
+				inactiveResult = true
+			default:
+				inactiveResult = false
+			}
+
+			if inactiveResult != sc.ExpectedInactive {
+				t.Errorf("Expected inactive peers: %v, got: %v", sc.ExpectedInactive, inactiveResult)
+			}
+		})
+	}
+}
client/internal/lazyconn/inactivity/moc_test.go (new file, 102 lines)
@@ -0,0 +1,102 @@
+package inactivity
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/netbirdio/netbird/client/iface/configurer"
+)
+
+type rxHistory struct {
+	when    time.Duration
+	RxBytes int64
+}
+
+// mockWgInterface mocks WgInterface to simulate peer stats.
+type mockWgInterface struct {
+	peerID        string
+	statsSequence []rxHistory
+	timer         *FakeTimer
+	initialTime   time.Time
+	reachedLast   bool
+}
+
+func newMockWgInterface(peerID string, history []rxHistory, timer *FakeTimer) *mockWgInterface {
+	return &mockWgInterface{
+		peerID:        peerID,
+		statsSequence: history,
+		timer:         timer,
+		initialTime:   timer.Now(),
+	}
+}
+
+func (m *mockWgInterface) GetStats() (map[string]configurer.WGStats, error) {
+	if m.reachedLast {
+		return nil, fmt.Errorf("no more data")
+	}
+
+	now := m.timer.Now()
+	var rx int64
+	for i, history := range m.statsSequence {
+		if now.Before(m.initialTime.Add(history.when)) {
+			break
+		}
+
+		if len(m.statsSequence)-1 == i {
+			m.reachedLast = true
+		}
+
+		rx += history.RxBytes
+	}
+
+	wgStats := make(map[string]configurer.WGStats)
+	wgStats[m.peerID] = configurer.WGStats{
+		RxBytes: rx,
+	}
+	return wgStats, nil
+}
+
+// fakeTicker is a controllable ticker for use in tests
+type fakeTicker struct {
+	interval time.Duration
+	timer    *FakeTimer
+
+	ch  chan time.Time
+	now time.Time
+}
+
+func newFakeTicker(interval time.Duration, timer *FakeTimer) *fakeTicker {
+	return &fakeTicker{
+		interval: interval,
+		timer:    timer,
+		ch:       make(chan time.Time, 1),
+		now:      timer.Now(),
+	}
+}
+
+func (f *fakeTicker) C() <-chan time.Time {
+	f.now = f.now.Add(f.interval)
+	f.timer.Set(f.now)
+	f.ch <- f.now
+	return f.ch
+}
+
+func (f *fakeTicker) Stop() {}
+
+type FakeTimer struct {
+	now time.Time
+}
+
+func NewFakeTimer() *FakeTimer {
+	return &FakeTimer{
+		now: time.Date(2025, time.June, 1, 0, 0, 0, 0, time.UTC),
+	}
+}
+
+func (f *FakeTimer) Set(t time.Time) {
+	f.now = t
+}
+
+func (f *FakeTimer) Now() time.Time {
+	return f.now
+}
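A note on the fake-clock design above: every call to fakeTicker.C() advances the shared FakeTimer by one interval before delivering the tick, so Manager.Start walks through simulated minutes without sleeping, and mockWgInterface.GetStats returns the cumulative bytes of all samples whose offset has elapsed. Once the last sample has been consumed, GetStats returns an error, checkStats fails, and Start returns, which ends each scenario deterministically.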
client/internal/lazyconn/inactivity/recorder.go (new file, 58 lines)
@@ -0,0 +1,58 @@
+package inactivity
+
+import (
+	"fmt"
+	"os"
+	"sync"
+	"time"
+
+	log "github.com/sirupsen/logrus"
+)
+
+type Recorder struct {
+	mu       sync.Mutex
+	file     *os.File
+	filename string
+}
+
+func NewRecorder() *Recorder {
+	timestamp := time.Now().Format("20060102_150405")
+	filename := fmt.Sprintf("inactivity_log_%s.txt", timestamp)
+	file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
+	if err != nil {
+		log.Errorf("error opening file: %v", err)
+	}
+
+	return &Recorder{
+		file:     file,
+		filename: filename,
+	}
+}
+
+func (r *Recorder) ReceivedBytes(peer string, now time.Time, bytes int64) {
+	if r == nil {
+		return
+	}
+
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	entry := fmt.Sprintf("%s; %s; %d\n", now.Format(time.RFC3339), peer, bytes)
+	_, err := r.file.WriteString(entry)
+	if err != nil {
+		log.Errorf("error writing to file: %v", err)
+	}
+}
+
+func (r *Recorder) Close() {
+	if r == nil {
+		return
+	}
+
+	r.mu.Lock()
+	defer r.mu.Unlock()
+
+	if err := r.file.Close(); err != nil {
+		log.Errorf("error closing file: %v", err)
+	}
+}
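A usage note, read off the code above: the recorder is only constructed when the NB_LAZYCONN_RECORDER_ENABLED environment variable parses as true in Manager.Start, and all methods are no-ops on a nil receiver, so the hot path pays nothing when it is off. Each check appends one semicolon-separated line per peer; with illustrative values, an entry looks like:

	2025-06-01T00:05:00Z; <peer-public-key>; 384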
client/internal/lazyconn/inactivity/scenarios_test.go (new file, 209 lines)
@@ -0,0 +1,209 @@
+package inactivity
+
+import "time"
+
+type scenario struct {
+	ExpectedInactive bool
+	Data             []rxHistory
+}
+
+var scenarios = []scenario{
+	{
+		ExpectedInactive: true,
+		Data: []rxHistory{
+			{when: 0 * time.Second, RxBytes: 32},
+			{when: 25 * time.Second, RxBytes: 32},
+			{when: 50 * time.Second, RxBytes: 32},
+			{when: 75 * time.Second, RxBytes: 32},
+			{when: 100 * time.Second, RxBytes: 32},
+			{when: 100 * time.Second, RxBytes: 92},
+			{when: 150 * time.Second, RxBytes: 32},
+			{when: 175 * time.Second, RxBytes: 32},
+			{when: 200 * time.Second, RxBytes: 32},
+			{when: 225 * time.Second, RxBytes: 32},
+			{when: 250 * time.Second, RxBytes: 32},
+			{when: 250 * time.Second, RxBytes: 92},
+			{when: 300 * time.Second, RxBytes: 32},
+			{when: 325 * time.Second, RxBytes: 32},
+			{when: 350 * time.Second, RxBytes: 32},
+			{when: 375 * time.Second, RxBytes: 32},
+			{when: 375 * time.Second, RxBytes: 92},
+			{when: 400 * time.Second, RxBytes: 32},
+			{when: 425 * time.Second, RxBytes: 32},
+			{when: 450 * time.Second, RxBytes: 32},
+			{when: 475 * time.Second, RxBytes: 32},
+			{when: 500 * time.Second, RxBytes: 32},
+			{when: 500 * time.Second, RxBytes: 92},
+			{when: 525 * time.Second, RxBytes: 32},
+			{when: 550 * time.Second, RxBytes: 32},
+			{when: 575 * time.Second, RxBytes: 32},
+			{when: 600 * time.Second, RxBytes: 32},
+			{when: 625 * time.Second, RxBytes: 32},
+			{when: 625 * time.Second, RxBytes: 92},
+			{when: 650 * time.Second, RxBytes: 32},
+			{when: 675 * time.Second, RxBytes: 32},
+			{when: 700 * time.Second, RxBytes: 32},
+			{when: 725 * time.Second, RxBytes: 32},
+			{when: 750 * time.Second, RxBytes: 32},
+			{when: 750 * time.Second, RxBytes: 92},
+			{when: 775 * time.Second, RxBytes: 32},
+		},
+	},
+	{
+		ExpectedInactive: true,
+		Data: []rxHistory{
+			//96
+			{when: 0 * time.Second, RxBytes: 32},
+			{when: 25 * time.Second, RxBytes: 32},
+			{when: 50 * time.Second, RxBytes: 32},
+
+			//212
+			{when: 75 * time.Second, RxBytes: 32},
+			{when: 100 * time.Second, RxBytes: 32},
+			{when: 100 * time.Second, RxBytes: 148},
+
+			//96
+			{when: 125 * time.Second, RxBytes: 32},
+			{when: 150 * time.Second, RxBytes: 32},
+			{when: 175 * time.Second, RxBytes: 32},
+
+			//212
+			{when: 200 * time.Second, RxBytes: 32},
+			{when: 225 * time.Second, RxBytes: 32},
+			{when: 225 * time.Second, RxBytes: 148},
+
+			//96
+			{when: 250 * time.Second, RxBytes: 32},
+			{when: 275 * time.Second, RxBytes: 32},
+			{when: 300 * time.Second, RxBytes: 32},
+
+			{when: 325 * time.Second, RxBytes: 32},
+			{when: 350 * time.Second, RxBytes: 32},
+			{when: 350 * time.Second, RxBytes: 148},
+
+			{when: 375 * time.Second, RxBytes: 32},
+			{when: 400 * time.Second, RxBytes: 32},
+
+			{when: 425 * time.Second, RxBytes: 32},
+			{when: 450 * time.Second, RxBytes: 32},
+			{when: 475 * time.Second, RxBytes: 32},
+			{when: 475 * time.Second, RxBytes: 148},
+
+			{when: 500 * time.Second, RxBytes: 32},
+			{when: 525 * time.Second, RxBytes: 32},
+
+			{when: 550 * time.Second, RxBytes: 32},
+			{when: 575 * time.Second, RxBytes: 32},
+			{when: 600 * time.Second, RxBytes: 32},
+			{when: 600 * time.Second, RxBytes: 148},
+
+			{when: 625 * time.Second, RxBytes: 32},
+			{when: 650 * time.Second, RxBytes: 32},
+
+			{when: 675 * time.Second, RxBytes: 32},
+			{when: 700 * time.Second, RxBytes: 32},
+
+			{when: 725 * time.Second, RxBytes: 32},
+			{when: 725 * time.Second, RxBytes: 148},
+			{when: 750 * time.Second, RxBytes: 32},
+		},
+	},
+	{
+		// Rosenpass
+		ExpectedInactive: true,
+		Data: []rxHistory{
+			{when: 0 * time.Second, RxBytes: 1200},
+			{when: 0 * time.Second, RxBytes: 1200},
+			{when: 0 * time.Second, RxBytes: 128},
+			{when: 0 * time.Second, RxBytes: 1200},
+			{when: 0 * time.Second, RxBytes: 128},
+			{when: 0 * time.Second, RxBytes: 2},
+			{when: 35 * time.Second, RxBytes: 32},
+			{when: 60 * time.Second, RxBytes: 32},
+			{when: 85 * time.Second, RxBytes: 32},
+			{when: 110 * time.Second, RxBytes: 32},
+			{when: 120 * time.Second, RxBytes: 1152},
+			{when: 120 * time.Second, RxBytes: 92},
+			{when: 120 * time.Second, RxBytes: 240},
+			{when: 130 * time.Second, RxBytes: 1200},
+			{when: 130 * time.Second, RxBytes: 32},
+			{when: 130 * time.Second, RxBytes: 1200},
+			{when: 130 * time.Second, RxBytes: 128},
+			{when: 165 * time.Second, RxBytes: 32},
+			{when: 190 * time.Second, RxBytes: 32},
+			{when: 215 * time.Second, RxBytes: 32},
+			{when: 240 * time.Second, RxBytes: 92},
+			{when: 240 * time.Second, RxBytes: 1200},
+			{when: 240 * time.Second, RxBytes: 128},
+			{when: 260 * time.Second, RxBytes: 1200},
+			{when: 260 * time.Second, RxBytes: 1200},
+			{when: 260 * time.Second, RxBytes: 128},
+			{when: 320 * time.Second, RxBytes: 32},
+			{when: 345 * time.Second, RxBytes: 32},
+			{when: 370 * time.Second, RxBytes: 92},
+			{when: 370 * time.Second, RxBytes: 1200},
+			{when: 370 * time.Second, RxBytes: 128},
+			{when: 390 * time.Second, RxBytes: 1200},
+			{when: 390 * time.Second, RxBytes: 128},
+			{when: 450 * time.Second, RxBytes: 32},
+			{when: 475 * time.Second, RxBytes: 32},
+			{when: 500 * time.Second, RxBytes: 92},
+			{when: 500 * time.Second, RxBytes: 1200},
+			{when: 500 * time.Second, RxBytes: 128},
+			{when: 520 * time.Second, RxBytes: 1200},
+			{when: 520 * time.Second, RxBytes: 128},
+		},
+	},
+	{
+		ExpectedInactive: true,
+		Data: []rxHistory{
+			{when: 0 * time.Second, RxBytes: 1152},
+			{when: 0 * time.Second, RxBytes: 1152},
+			{when: 0 * time.Second, RxBytes: 240},
+			{when: 0 * time.Second, RxBytes: 1152},
+			{when: 1 * time.Second, RxBytes: 240},
+			{when: 1 * time.Second, RxBytes: 2},
+			{when: 11 * time.Second, RxBytes: 32},
+			{when: 121 * time.Second, RxBytes: 1200},
+			{when: 121 * time.Second, RxBytes: 148},
+			{when: 121 * time.Second, RxBytes: 32},
+			{when: 121 * time.Second, RxBytes: 128},
+			{when: 131 * time.Second, RxBytes: 1152},
+			{when: 131 * time.Second, RxBytes: 1152},
+			{when: 131 * time.Second, RxBytes: 240},
+			{when: 141 * time.Second, RxBytes: 32},
+			{when: 191 * time.Second, RxBytes: 32},
+			{when: 241 * time.Second, RxBytes: 1152},
+			{when: 241 * time.Second, RxBytes: 148},
+			{when: 241 * time.Second, RxBytes: 32},
+			{when: 241 * time.Second, RxBytes: 240},
+			{when: 251 * time.Second, RxBytes: 32},
+			{when: 261 * time.Second, RxBytes: 1152},
+			{when: 261 * time.Second, RxBytes: 1152},
+			{when: 261 * time.Second, RxBytes: 240},
+			{when: 271 * time.Second, RxBytes: 32},
+			{when: 296 * time.Second, RxBytes: 32},
+			{when: 371 * time.Second, RxBytes: 1152},
+			{when: 371 * time.Second, RxBytes: 148},
+			{when: 371 * time.Second, RxBytes: 32},
+			{when: 371 * time.Second, RxBytes: 240},
+			{when: 381 * time.Second, RxBytes: 32},
+			{when: 391 * time.Second, RxBytes: 1152},
+			{when: 391 * time.Second, RxBytes: 240},
+			{when: 401 * time.Second, RxBytes: 32},
+			{when: 426 * time.Second, RxBytes: 32},
+			{when: 501 * time.Second, RxBytes: 1152},
+			{when: 501 * time.Second, RxBytes: 148},
+			{when: 501 * time.Second, RxBytes: 32},
+			{when: 501 * time.Second, RxBytes: 240},
+			{when: 511 * time.Second, RxBytes: 32},
+			{when: 521 * time.Second, RxBytes: 1152},
+			{when: 521 * time.Second, RxBytes: 240},
+			{when: 531 * time.Second, RxBytes: 32},
+			{when: 631 * time.Second, RxBytes: 1152},
+			{when: 631 * time.Second, RxBytes: 148},
+			{when: 631 * time.Second, RxBytes: 32},
+			{when: 631 * time.Second, RxBytes: 240},
+		},
+	},
+}
client/internal/lazyconn/inactivity/ticker.go (new file, 24 lines)
@@ -0,0 +1,24 @@
+package inactivity
+
+import "time"
+
+var newTicker = func(d time.Duration) Ticker {
+	return &realTicker{t: time.NewTicker(d)}
+}
+
+type Ticker interface {
+	C() <-chan time.Time
+	Stop()
+}
+
+type realTicker struct {
+	t *time.Ticker
+}
+
+func (r *realTicker) C() <-chan time.Time {
+	return r.t.C
+}
+
+func (r *realTicker) Stop() {
+	r.t.Stop()
+}
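The package-level newTicker variable is the seam that lets the tests above substitute the fake clock for real time; the swap is just an assignment, as manager_test.go does:

	timer := NewFakeTimer()
	newTicker = func(d time.Duration) Ticker {
		return newFakeTicker(d, timer)
	}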
@@ -53,12 +53,12 @@ type Manager struct {
 	managedPeersMu sync.Mutex

 	activityManager    *activity.Manager
-	inactivityMonitors map[peerid.ConnID]*inactivity.Monitor
+	inactivityManager  *inactivity.Manager

 	// Route HA group management
 	peerToHAGroups map[string][]route.HAUniqueID // peer ID -> HA groups they belong to
 	haGroupToPeers map[route.HAUniqueID][]string // HA group -> peer IDs in the group
-	routesMu       sync.RWMutex // protects route mappings
+	routesMu       sync.RWMutex

 	onInactive chan peerid.ConnID
 }
@@ -67,6 +67,7 @@ type Manager struct {
 // engineCtx is the context for creating peer Connection
 func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.Store, wgIface lazyconn.WGIface, connStateDispatcher *dispatcher.ConnectionDispatcher) *Manager {
 	log.Infof("setup lazy connection service")
+
 	m := &Manager{
 		engineCtx: engineCtx,
 		peerStore: peerStore,
@@ -76,18 +77,9 @@ func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.S
 		managedPeersByConnID: make(map[peerid.ConnID]*managedPeer),
 		excludes:             make(map[string]lazyconn.PeerConfig),
 		activityManager:      activity.NewManager(wgIface),
-		inactivityMonitors:   make(map[peerid.ConnID]*inactivity.Monitor),
+		inactivityManager:    inactivity.NewManager(wgIface, config.InactivityThreshold),
 		peerToHAGroups:       make(map[string][]route.HAUniqueID),
 		haGroupToPeers:       make(map[route.HAUniqueID][]string),
-		onInactive:           make(chan peerid.ConnID),
 	}
-
-	if config.InactivityThreshold != nil {
-		if *config.InactivityThreshold >= inactivity.MinimumInactivityThreshold {
-			m.inactivityThreshold = *config.InactivityThreshold
-		} else {
-			log.Warnf("inactivity threshold is too low, using %v", m.inactivityThreshold)
-		}
-	}

 	m.connStateListener = &dispatcher.ConnectionListener{
@@ -139,14 +131,18 @@ func (m *Manager) UpdateRouteHAMap(haMap route.HAMap) {
 func (m *Manager) Start(ctx context.Context) {
 	defer m.close()

+	go m.inactivityManager.Start(ctx)
+
 	for {
 		select {
 		case <-ctx.Done():
 			return
 		case peerConnID := <-m.activityManager.OnActivityChan:
-			m.onPeerActivity(ctx, peerConnID)
-		case peerConnID := <-m.onInactive:
-			m.onPeerInactivityTimedOut(peerConnID)
+			m.onPeerActivity(peerConnID)
+		case peerIDs := <-m.inactivityManager.InactivePeersChan:
+			for _, peerID := range peerIDs {
+				m.onPeerInactivityTimedOut(peerID)
+			}
 		}
 	}
 }
@@ -156,7 +152,7 @@ func (m *Manager) Start(ctx context.Context) {
 // Adds them back to the managed list and start the inactivity listener if they are removed from the exclude list. In
 // this case, we suppose that the connection status is connected or connecting.
 // If the peer is not exists yet in the managed list then the responsibility is the upper layer to call the AddPeer function
-func (m *Manager) ExcludePeer(ctx context.Context, peerConfigs []lazyconn.PeerConfig) []string {
+func (m *Manager) ExcludePeer(peerConfigs []lazyconn.PeerConfig) []string {
 	m.managedPeersMu.Lock()
 	defer m.managedPeersMu.Unlock()

@@ -187,7 +183,7 @@ func (m *Manager) ExcludePeer(ctx context.Context, peerConfigs []lazyconn.PeerCo

 		peerCfg.Log.Infof("peer removed from lazy connection exclude list")

-		if err := m.addActivePeer(ctx, peerCfg); err != nil {
+		if err := m.addActivePeer(&peerCfg); err != nil {
 			log.Errorf("failed to add peer to lazy connection manager: %s", err)
 			continue
 		}
@@ -217,20 +213,24 @@ func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) {
 		return false, err
 	}

-	im := inactivity.NewInactivityMonitor(peerCfg.PeerConnID, m.inactivityThreshold)
-	m.inactivityMonitors[peerCfg.PeerConnID] = im
-
 	m.managedPeers[peerCfg.PublicKey] = &peerCfg
 	m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{
 		peerCfg:         &peerCfg,
 		expectedWatcher: watcherActivity,
 	}

+	// Check if this peer should be activated because its HA group peers are active
+	if group, ok := m.shouldActivateNewPeer(peerCfg.PublicKey); ok {
+		peerCfg.Log.Debugf("peer belongs to active HA group %s, will activate immediately", group)
+		m.activateNewPeerInActiveGroup(peerCfg)
+	}
+
 	return false, nil
 }

 // AddActivePeers adds a list of peers to the lazy connection manager
 // suppose these peers was in connected or in connecting states
-func (m *Manager) AddActivePeers(ctx context.Context, peerCfg []lazyconn.PeerConfig) error {
+func (m *Manager) AddActivePeers(peerCfg []lazyconn.PeerConfig) error {
 	m.managedPeersMu.Lock()
 	defer m.managedPeersMu.Unlock()

@@ -240,7 +240,7 @@ func (m *Manager) AddActivePeers(ctx context.Context, peerCfg []lazyconn.PeerCon
 			continue
 		}

-		if err := m.addActivePeer(ctx, cfg); err != nil {
+		if err := m.addActivePeer(&cfg); err != nil {
 			cfg.Log.Errorf("failed to add peer to lazy connection manager: %v", err)
 			return err
 		}
@@ -257,7 +257,7 @@ func (m *Manager) RemovePeer(peerID string) {

 // ActivatePeer activates a peer connection when a signal message is received
 // Also activates all peers in the same HA groups as this peer
-func (m *Manager) ActivatePeer(ctx context.Context, peerID string) (found bool) {
+func (m *Manager) ActivatePeer(peerID string) (found bool) {
 	m.managedPeersMu.Lock()
 	defer m.managedPeersMu.Unlock()
 	cfg, mp := m.getPeerForActivation(peerID)
@@ -265,15 +265,42 @@ func (m *Manager) ActivatePeer(ctx context.Context, peerID string) (found bool)
 		return false
 	}

-	if !m.activateSinglePeer(ctx, cfg, mp) {
+	if !m.activateSinglePeer(cfg, mp) {
 		return false
 	}

-	m.activateHAGroupPeers(ctx, peerID)
+	m.activateHAGroupPeers(peerID)

 	return true
 }

+func (m *Manager) DeactivatePeer(peerID peerid.ConnID) {
+	m.managedPeersMu.Lock()
+	defer m.managedPeersMu.Unlock()
+
+	mp, ok := m.managedPeersByConnID[peerID]
+	if !ok {
+		return
+	}
+
+	if mp.expectedWatcher != watcherInactivity {
+		return
+	}
+
+	m.peerStore.PeerConnClose(mp.peerCfg.PublicKey)
+
+	mp.peerCfg.Log.Infof("start activity monitor")
+
+	mp.expectedWatcher = watcherActivity
+
+	m.inactivityManager.RemovePeer(mp.peerCfg.PublicKey)
+
+	if err := m.activityManager.MonitorPeerActivity(*mp.peerCfg); err != nil {
+		mp.peerCfg.Log.Errorf("failed to create activity monitor: %v", err)
+		return
+	}
+}
+
 // getPeerForActivation checks if a peer can be activated and returns the necessary structs
 // Returns nil values if the peer should be skipped
 func (m *Manager) getPeerForActivation(peerID string) (*lazyconn.PeerConfig, *managedPeer) {
@@ -296,57 +323,53 @@ func (m *Manager) getPeerForActivation(peerID string) (*lazyconn.PeerConfig, *ma
 }

 // activateSinglePeer activates a single peer (internal method)
-func (m *Manager) activateSinglePeer(ctx context.Context, cfg *lazyconn.PeerConfig, mp *managedPeer) bool {
+func (m *Manager) activateSinglePeer(cfg *lazyconn.PeerConfig, mp *managedPeer) bool {
 	mp.expectedWatcher = watcherInactivity

 	m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID)

-	im, ok := m.inactivityMonitors[cfg.PeerConnID]
-	if !ok {
-		cfg.Log.Errorf("inactivity monitor not found for peer")
-		return false
-	}
-
-	cfg.Log.Infof("starting inactivity monitor")
-	go im.Start(ctx, m.onInactive)
-
+	cfg.Log.Infof("starting inactivity monitor for peer: %s", cfg.PublicKey)
+	m.inactivityManager.AddPeer(cfg)
 	return true
 }

 // activateHAGroupPeers activates all peers in HA groups that the given peer belongs to
-func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string) {
+func (m *Manager) activateHAGroupPeers(triggerPeerID string) {
+	var peersToActivate []string
+
 	m.routesMu.RLock()
 	haGroups := m.peerToHAGroups[triggerPeerID]
-	m.routesMu.RUnlock()

 	if len(haGroups) == 0 {
+		m.routesMu.RUnlock()
 		log.Debugf("peer %s is not part of any HA groups", triggerPeerID)
 		return
 	}

-	activatedCount := 0
 	for _, haGroup := range haGroups {
-		m.routesMu.RLock()
 		peers := m.haGroupToPeers[haGroup]
-		m.routesMu.RUnlock()
-
-		for _, peerID := range peers {
-			if peerID == triggerPeerID {
-				continue
-			}
-
-			cfg, mp := m.getPeerForActivation(peerID)
-			if cfg == nil {
-				continue
-			}
-
-			if m.activateSinglePeer(ctx, cfg, mp) {
-				activatedCount++
-				cfg.Log.Infof("activated peer as part of HA group %s (triggered by %s)", haGroup, triggerPeerID)
-				m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey)
+		for _, peerID := range peers {
+			if peerID != triggerPeerID {
+				peersToActivate = append(peersToActivate, peerID)
 			}
 		}
 	}
+	m.routesMu.RUnlock()
+
+	activatedCount := 0
+	for _, peerID := range peersToActivate {
+		cfg, mp := m.getPeerForActivation(peerID)
+		if cfg == nil {
+			continue
+		}
+
+		if m.activateSinglePeer(cfg, mp) {
+			activatedCount++
+			cfg.Log.Infof("activated peer as part of HA group (triggered by %s)", triggerPeerID)
+			m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey)
+		}
+	}

 	if activatedCount > 0 {
 		log.Infof("activated %d additional peers in HA groups for peer %s (groups: %v)",
@@ -354,23 +377,64 @@ func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string
 	}
 }

-func (m *Manager) addActivePeer(ctx context.Context, peerCfg lazyconn.PeerConfig) error {
+// shouldActivateNewPeer checks if a newly added peer should be activated
+// because other peers in its HA groups are already active
+func (m *Manager) shouldActivateNewPeer(peerID string) (route.HAUniqueID, bool) {
+	m.routesMu.RLock()
+	defer m.routesMu.RUnlock()
+
+	haGroups := m.peerToHAGroups[peerID]
+	if len(haGroups) == 0 {
+		return "", false
+	}
+
+	for _, haGroup := range haGroups {
+		peers := m.haGroupToPeers[haGroup]
+		for _, groupPeerID := range peers {
+			if groupPeerID == peerID {
+				continue
+			}
+
+			cfg, ok := m.managedPeers[groupPeerID]
+			if !ok {
+				continue
+			}
+			if mp, ok := m.managedPeersByConnID[cfg.PeerConnID]; ok && mp.expectedWatcher == watcherInactivity {
+				return haGroup, true
+			}
+		}
+	}
+	return "", false
+}
+
+// activateNewPeerInActiveGroup activates a newly added peer that should be active due to HA group
+func (m *Manager) activateNewPeerInActiveGroup(peerCfg lazyconn.PeerConfig) {
+	mp, ok := m.managedPeersByConnID[peerCfg.PeerConnID]
+	if !ok {
+		return
+	}
+
+	if !m.activateSinglePeer(&peerCfg, mp) {
+		return
+	}
+
+	peerCfg.Log.Infof("activated newly added peer due to active HA group peers")
+	m.peerStore.PeerConnOpen(m.engineCtx, peerCfg.PublicKey)
+}
+
+func (m *Manager) addActivePeer(peerCfg *lazyconn.PeerConfig) error {
 	if _, ok := m.managedPeers[peerCfg.PublicKey]; ok {
 		peerCfg.Log.Warnf("peer already managed")
 		return nil
 	}

-	im := inactivity.NewInactivityMonitor(peerCfg.PeerConnID, m.inactivityThreshold)
-	m.inactivityMonitors[peerCfg.PeerConnID] = im
-
-	m.managedPeers[peerCfg.PublicKey] = &peerCfg
+	m.managedPeers[peerCfg.PublicKey] = peerCfg
 	m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{
-		peerCfg:         &peerCfg,
+		peerCfg:         peerCfg,
 		expectedWatcher: watcherInactivity,
 	}

-	peerCfg.Log.Infof("starting inactivity monitor on peer that has been removed from exclude list")
-	go im.Start(ctx, m.onInactive)
+	m.inactivityManager.AddPeer(peerCfg)
 	return nil
 }

@@ -382,12 +446,7 @@ func (m *Manager) removePeer(peerID string) {

 	cfg.Log.Infof("removing lazy peer")

-	if im, ok := m.inactivityMonitors[cfg.PeerConnID]; ok {
-		im.Stop()
-		delete(m.inactivityMonitors, cfg.PeerConnID)
-		cfg.Log.Debugf("inactivity monitor stopped")
-	}
-
+	m.inactivityManager.RemovePeer(cfg.PublicKey)
 	m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID)
 	delete(m.managedPeers, peerID)
 	delete(m.managedPeersByConnID, cfg.PeerConnID)
@@ -399,10 +458,7 @@ func (m *Manager) close() {

 	m.connStateDispatcher.RemoveListener(m.connStateListener)
 	m.activityManager.Close()
-	for _, iw := range m.inactivityMonitors {
-		iw.Stop()
-	}
-	m.inactivityMonitors = make(map[peerid.ConnID]*inactivity.Monitor)
 	m.managedPeers = make(map[string]*lazyconn.PeerConfig)
 	m.managedPeersByConnID = make(map[peerid.ConnID]*managedPeer)

@@ -415,7 +471,49 @@ func (m *Manager) close() {
|
|||||||
log.Infof("lazy connection manager closed")
|
log.Infof("lazy connection manager closed")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID) {
|
// shouldDeferIdleForHA checks if peer should stay connected due to HA group requirements
|
||||||
|
func (m *Manager) shouldDeferIdleForHA(peerID string) bool {
|
||||||
|
m.routesMu.RLock()
|
||||||
|
defer m.routesMu.RUnlock()
|
||||||
|
|
||||||
|
haGroups := m.peerToHAGroups[peerID]
|
||||||
|
if len(haGroups) == 0 {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, haGroup := range haGroups {
|
||||||
|
groupPeers := m.haGroupToPeers[haGroup]
|
||||||
|
|
||||||
|
for _, groupPeerID := range groupPeers {
|
||||||
|
if groupPeerID == peerID {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg, ok := m.managedPeers[groupPeerID]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
groupMp, ok := m.managedPeersByConnID[cfg.PeerConnID]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if groupMp.expectedWatcher != watcherInactivity {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Other member is still connected, defer idle
|
||||||
|
if peer, ok := m.peerStore.PeerConn(groupPeerID); ok && peer.IsConnected() {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) onPeerActivity(peerConnID peerid.ConnID) {
|
||||||
m.managedPeersMu.Lock()
|
m.managedPeersMu.Lock()
|
||||||
defer m.managedPeersMu.Unlock()
|
defer m.managedPeersMu.Unlock()
|
||||||
|
|
||||||
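A toy walk-through of the defer rule added above, assuming the map shapes the field names suggest (peerToHAGroups: peer → HA group IDs; haGroupToPeers: group ID → member peers): an idle timeout is deferred while any other member of one of the peer's HA groups is still connected.

// Illustration only; the real manager also checks the expected watcher state.
package main

import "fmt"

func main() {
	peerToHAGroups := map[string][]string{"peerA": {"ha-1"}}
	haGroupToPeers := map[string][]string{"ha-1": {"peerA", "peerB"}}
	connected := map[string]bool{"peerB": true}

	deferIdle := false
	for _, g := range peerToHAGroups["peerA"] {
		for _, p := range haGroupToPeers[g] {
			if p != "peerA" && connected[p] {
				deferIdle = true // another HA member is still connected
			}
		}
	}
	fmt.Println("defer idle for peerA:", deferIdle) // true
}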
@@ -432,22 +530,28 @@ func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID)

 	mp.peerCfg.Log.Infof("detected peer activity")

-	if !m.activateSinglePeer(ctx, mp.peerCfg, mp) {
+	if !m.activateSinglePeer(mp.peerCfg, mp) {
 		return
 	}

-	m.activateHAGroupPeers(ctx, mp.peerCfg.PublicKey)
+	m.activateHAGroupPeers(mp.peerCfg.PublicKey)

 	m.peerStore.PeerConnOpen(m.engineCtx, mp.peerCfg.PublicKey)
 }

-func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) {
+func (m *Manager) onPeerInactivityTimedOut(peerID string) {
 	m.managedPeersMu.Lock()
 	defer m.managedPeersMu.Unlock()

-	mp, ok := m.managedPeersByConnID[peerConnID]
+	peerCfg, ok := m.managedPeers[peerID]
 	if !ok {
-		log.Errorf("peer not found by id: %v", peerConnID)
+		log.Errorf("peer not found by peerId: %v", peerID)
+		return
+	}
+
+	mp, ok := m.managedPeersByConnID[peerCfg.PeerConnID]
+	if !ok {
+		log.Errorf("peer not found by conn id: %v", peerCfg.PeerConnID)
 		return
 	}

@@ -456,6 +560,11 @@ func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) {
 		return
 	}

+	if m.shouldDeferIdleForHA(mp.peerCfg.PublicKey) {
+		// todo: review to how reset the inactivity detection
+		return
+	}
+
 	mp.peerCfg.Log.Infof("connection timed out")

 	// this is blocking operation, potentially can be optimized
@@ -465,8 +574,7 @@ func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) {

 	mp.expectedWatcher = watcherActivity

-	// just in case free up
-	m.inactivityMonitors[peerConnID].PauseTimer()
+	m.inactivityManager.RemovePeer(mp.peerCfg.PublicKey)

 	if err := m.activityManager.MonitorPeerActivity(*mp.peerCfg); err != nil {
 		mp.peerCfg.Log.Errorf("failed to create activity monitor: %v", err)
@@ -487,14 +595,8 @@ func (m *Manager) onPeerConnected(peerConnID peerid.ConnID) {
 		return
 	}

-	iw, ok := m.inactivityMonitors[mp.peerCfg.PeerConnID]
-	if !ok {
-		mp.peerCfg.Log.Errorf("inactivity monitor not found for peer")
-		return
-	}
-
-	mp.peerCfg.Log.Infof("peer connected, pausing inactivity monitor while connection is not disconnected")
-	iw.PauseTimer()
+	mp.peerCfg.Log.Infof("peer connected, starting inactivity monitor")
+	m.inactivityManager.AddPeer(mp.peerCfg)
 }

 func (m *Manager) onPeerDisconnected(peerConnID peerid.ConnID) {
@@ -510,11 +612,6 @@ func (m *Manager) onPeerDisconnected(peerConnID peerid.ConnID) {
 		return
 	}

-	iw, ok := m.inactivityMonitors[mp.peerCfg.PeerConnID]
-	if !ok {
-		return
-	}
-
-	mp.peerCfg.Log.Infof("reset inactivity monitor timer")
-	iw.ResetTimer()
+	// todo reset inactivity monitor
+	mp.peerCfg.Log.Warnf("--- peer disconnected, stopping inactivity monitor?")
 }
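Taken together, the handlers above flip each managed peer between two expected watchers. A compact sketch of that cycle, with constant names mirroring the diff and the transition triggers paraphrased:

package main

import "fmt"

type watcher int

const (
	watcherActivity   watcher = iota // peer idle: waiting for traffic to re-open it
	watcherInactivity                // peer open: waiting for silence to close it
)

// next paraphrases the handler logic: activity opens the connection,
// an inactivity timeout (when not deferred by an HA group) closes it again.
func next(state watcher, event string, haDefer bool) watcher {
	switch {
	case state == watcherActivity && event == "activity":
		return watcherInactivity // onPeerActivity: open conn, watch for idle
	case state == watcherInactivity && event == "timeout" && !haDefer:
		return watcherActivity // onPeerInactivityTimedOut: close, watch for traffic
	}
	return state
}

func main() {
	s := watcherActivity
	s = next(s, "activity", false)
	fmt.Println(s == watcherInactivity) // true
	s = next(s, "timeout", true) // HA member still connected: stay open
	fmt.Println(s == watcherInactivity) // true
	s = next(s, "timeout", false)
	fmt.Println(s == watcherActivity) // true
}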
@@ -1,6 +1,7 @@
 package lazyconn

 import (
+	"github.com/netbirdio/netbird/client/iface/configurer"
 	"net"
 	"net/netip"
 	"time"
@@ -11,4 +12,5 @@ import (
 type WGIface interface {
 	RemovePeer(peerKey string) error
 	UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error
+	GetStats() (map[string]configurer.WGStats, error)
 }
@@ -226,7 +226,7 @@ func (conn *Conn) Open(engineCtx context.Context) error {
 }

 // Close closes this peer Conn issuing a close event to the Conn closeCh
-func (conn *Conn) Close() {
+func (conn *Conn) Close(graceful bool) {
 	conn.mu.Lock()
 	defer conn.wgWatcherWg.Wait()
 	defer conn.mu.Unlock()
@@ -236,6 +236,10 @@ func (conn *Conn) Close() {
 		return
 	}

+	if graceful {
+		conn.signaler.SignalIdle(conn.config.Key)
+	}
+
 	conn.Log.Infof("close peer connection")
 	conn.ctxCancel()

@@ -317,12 +321,12 @@ func (conn *Conn) WgConfig() WgConfig {
 	return conn.config.WgConfig
 }

-// IsConnected unit tests only
-// refactor unit test to use status recorder use refactor status recorded to manage connection status in peer.Conn
+// IsConnected returns true if the peer is connected
 func (conn *Conn) IsConnected() bool {
 	conn.mu.Lock()
 	defer conn.mu.Unlock()
-	return conn.currentConnPriority != conntype.None
+
+	return conn.evalStatus() == StatusConnected
 }

 func (conn *Conn) GetKey() string {
@@ -68,3 +68,13 @@ func (s *Signaler) signalOfferAnswer(offerAnswer OfferAnswer, remoteKey string,

 	return nil
 }
+
+func (s *Signaler) SignalIdle(remoteKey string) error {
+	return s.signal.Send(&sProto.Message{
+		Key:       s.wgPrivateKey.PublicKey().String(),
+		RemoteKey: remoteKey,
+		Body: &sProto.Body{
+			Type: sProto.Body_GO_IDLE,
+		},
+	})
+}
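The sending side of the new GO_IDLE message is above; how the remote client reacts is not part of this compare view. A hypothetical dispatch sketch with stand-in types (a real client would route this through its connection manager):

package main

import "fmt"

type bodyType int32

const bodyGoIdle bodyType = 5 // matches GO_IDLE = 5 added to the proto enum

type message struct {
	Key  string // sender's WireGuard public key
	Type bodyType
}

func handleSignal(msg message, onIdle func(peerKey string)) {
	if msg.Type == bodyGoIdle {
		// remote peer announced it is going idle: drop to the lazy state
		onIdle(msg.Key)
		return
	}
	fmt.Println("ignoring message type", msg.Type)
}

func main() {
	handleSignal(message{Key: "remote-pub-key", Type: bodyGoIdle}, func(k string) {
		fmt.Println("deactivating peer", k)
	})
}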
@@ -95,6 +95,17 @@ func (s *Store) PeerConnOpen(ctx context.Context, pubKey string) {

 }

+func (s *Store) PeerConnIdle(pubKey string) {
+	s.peerConnsMu.RLock()
+	defer s.peerConnsMu.RUnlock()
+
+	p, ok := s.peerConns[pubKey]
+	if !ok {
+		return
+	}
+	p.Close(true)
+}
+
 func (s *Store) PeerConnClose(pubKey string) {
 	s.peerConnsMu.RLock()
 	defer s.peerConnsMu.RUnlock()
@@ -103,7 +114,7 @@ func (s *Store) PeerConnClose(pubKey string) {
 	if !ok {
 		return
 	}
-	p.Close()
+	p.Close(false)
 }

 func (s *Store) PeersPubKey() []string {
@@ -1853,20 +1853,23 @@ func (am *DefaultAccountManager) GetOrCreateAccountByPrivateDomain(ctx context.C
 }

 func (am *DefaultAccountManager) UpdateToPrimaryAccount(ctx context.Context, accountId string) (*types.Account, error) {
-	account, err := am.Store.GetAccount(ctx, accountId)
+	var account *types.Account
+	err := am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
+		var err error
+		account, err = transaction.GetAccount(ctx, accountId)
 		if err != nil {
-			return nil, err
+			return err
 		}

 		if account.IsDomainPrimaryAccount {
-			return account, nil
+			return nil
 		}

-	existingPrimaryAccountID, err := am.Store.GetAccountIDByPrivateDomain(ctx, store.LockingStrengthShare, account.Domain)
+		existingPrimaryAccountID, err := transaction.GetAccountIDByPrivateDomain(ctx, store.LockingStrengthShare, account.Domain)

 		// error is not a not found error
 		if handleNotFound(err) != nil {
-			return nil, err
+			return err
 		}

 		// a primary account already exists for this private domain
@@ -1875,16 +1878,22 @@ func (am *DefaultAccountManager) UpdateToPrimaryAccount(ctx context.Context, acc
 				"accountId":         accountId,
 				"existingAccountId": existingPrimaryAccountID,
 			}).Errorf("cannot update account to primary, another account already exists as primary for the same domain")
-		return nil, status.Errorf(status.Internal, "cannot update account to primary")
+			return status.Errorf(status.Internal, "cannot update account to primary")
 		}

 		account.IsDomainPrimaryAccount = true

-	if err := am.Store.SaveAccount(ctx, account); err != nil {
+		if err := transaction.SaveAccount(ctx, account); err != nil {
 			log.WithContext(ctx).WithFields(log.Fields{
 				"accountId": accountId,
 			}).Errorf("failed to update account to primary: %v", err)
-		return nil, status.Errorf(status.Internal, "failed to update account to primary")
+			return status.Errorf(status.Internal, "failed to update account to primary")
+		}
+
+		return nil
+	})
+	if err != nil {
+		return nil, err
 	}

 	return account, nil
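The rewrite above moves the read-check-write sequence into a closure-based transaction so a failure anywhere rolls the whole update back. A minimal sketch of that idiom, with a stand-in Store interface (not the real store package), showing why the result variable is declared outside the closure:

package main

import "fmt"

type Store interface {
	ExecuteInTransaction(fn func(tx Store) error) error
	GetAccount(id string) (string, error)
}

type memStore struct{}

func (s *memStore) ExecuteInTransaction(fn func(tx Store) error) error {
	// a real implementation would BEGIN here and ROLLBACK if fn errors
	return fn(s)
}

func (s *memStore) GetAccount(id string) (string, error) { return "acct-" + id, nil }

func main() {
	var account string // declared outside so it survives the closure
	store := &memStore{}
	err := store.ExecuteInTransaction(func(tx Store) error {
		var err error
		account, err = tx.GetAccount("42")
		return err
	})
	fmt.Println(account, err) // acct-42 <nil>
}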
@@ -426,6 +426,10 @@ components:
           items:
             type: string
           example: "stage-host-1"
+        ephemeral:
+          description: Indicates whether the peer is ephemeral or not
+          type: boolean
+          example: false
       required:
         - city_name
         - connected
@@ -450,6 +454,7 @@ components:
         - approval_required
         - serial_number
         - extra_dns_labels
+        - ephemeral
     AccessiblePeer:
       allOf:
         - $ref: '#/components/schemas/PeerMinimum'
@@ -1016,6 +1016,9 @@ type Peer struct {
 	// DnsLabel Peer's DNS label is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's domain to the peer label. e.g. peer-dns-label.netbird.cloud
 	DnsLabel string `json:"dns_label"`

+	// Ephemeral Indicates whether the peer is ephemeral or not
+	Ephemeral bool `json:"ephemeral"`
+
 	// ExtraDnsLabels Extra DNS labels added to the peer
 	ExtraDnsLabels []string `json:"extra_dns_labels"`

@@ -1097,6 +1100,9 @@ type PeerBatch struct {
 	// DnsLabel Peer's DNS label is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's domain to the peer label. e.g. peer-dns-label.netbird.cloud
 	DnsLabel string `json:"dns_label"`

+	// Ephemeral Indicates whether the peer is ephemeral or not
+	Ephemeral bool `json:"ephemeral"`
+
 	// ExtraDnsLabels Extra DNS labels added to the peer
 	ExtraDnsLabels []string `json:"extra_dns_labels"`
@@ -365,6 +365,7 @@ func toSinglePeerResponse(peer *nbpeer.Peer, groupsInfo []api.GroupMinimum, dnsD
 		CityName:                    peer.Location.CityName,
 		SerialNumber:                peer.Meta.SystemSerialNumber,
 		InactivityExpirationEnabled: peer.InactivityExpirationEnabled,
+		Ephemeral:                   peer.Ephemeral,
 	}
 }
@@ -37,7 +37,8 @@ func (am *DefaultAccountManager) UpdateIntegratedValidatorGroups(ctx context.Con
 	unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
 	defer unlock()

-	a, err := am.Store.GetAccountByUser(ctx, userID)
+	return am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
+		a, err := transaction.GetAccountByUser(ctx, userID)
 		if err != nil {
 			return err
 		}
@@ -51,7 +52,8 @@ func (am *DefaultAccountManager) UpdateIntegratedValidatorGroups(ctx context.Con
 			a.Settings.Extra = extra
 		}
 		extra.IntegratedValidatorGroups = groups
-	return am.Store.SaveAccount(ctx, a)
+		return transaction.SaveAccount(ctx, a)
+	})
 }

 func (am *DefaultAccountManager) GroupValidation(ctx context.Context, accountID string, groupIDs []string) (bool, error) {
@@ -92,7 +92,7 @@ func (am *DefaultAccountManager) getUserAccessiblePeers(ctx context.Context, acc

 	// fetch all the peers that have access to the user's peers
 	for _, peer := range peers {
-		aclPeers, _ := account.GetPeerConnectionResources(ctx, peer.ID, approvedPeersMap)
+		aclPeers, _ := account.GetPeerConnectionResources(ctx, peer, approvedPeersMap)
 		for _, p := range aclPeers {
 			peersMap[p.ID] = p
 		}
@@ -1149,7 +1149,7 @@ func (am *DefaultAccountManager) checkIfUserOwnsPeer(ctx context.Context, accoun
 	}

 	for _, p := range userPeers {
-		aclPeers, _ := account.GetPeerConnectionResources(ctx, p.ID, approvedPeersMap)
+		aclPeers, _ := account.GetPeerConnectionResources(ctx, p, approvedPeersMap)
 		for _, aclPeer := range aclPeers {
 			if aclPeer.ID == peer.ID {
 				return peer, nil
@@ -27,6 +27,7 @@ func TestAccount_getPeersByPolicy(t *testing.T) {
 				ID:     "peerB",
 				IP:     net.ParseIP("100.65.80.39"),
 				Status: &nbpeer.PeerStatus{},
+				Meta:   nbpeer.PeerSystemMeta{WtVersion: "0.48.0"},
 			},
 			"peerC": {
 				ID:     "peerC",
@@ -63,6 +64,12 @@ func TestAccount_getPeersByPolicy(t *testing.T) {
 				IP:     net.ParseIP("100.65.31.2"),
 				Status: &nbpeer.PeerStatus{},
 			},
+			"peerK": {
+				ID:     "peerK",
+				IP:     net.ParseIP("100.32.80.1"),
+				Status: &nbpeer.PeerStatus{},
+				Meta:   nbpeer.PeerSystemMeta{WtVersion: "0.30.0"},
+			},
 		},
 		Groups: map[string]*types.Group{
 			"GroupAll": {
@@ -111,6 +118,13 @@ func TestAccount_getPeersByPolicy(t *testing.T) {
 					"peerI",
 				},
 			},
+			"GroupWorkflow": {
+				ID:   "GroupWorkflow",
+				Name: "workflow",
+				Peers: []string{
+					"peerK",
+				},
+			},
 		},
 		Policies: []*types.Policy{
 			{
@@ -189,6 +203,39 @@ func TestAccount_getPeersByPolicy(t *testing.T) {
 					},
 				},
 			},
+			{
+				ID:          "RuleWorkflow",
+				Name:        "Workflow",
+				Description: "No description",
+				Enabled:     true,
+				Rules: []*types.PolicyRule{
+					{
+						ID:            "RuleWorkflow",
+						Name:          "Workflow",
+						Description:   "No description",
+						Bidirectional: true,
+						Enabled:       true,
+						Protocol:      types.PolicyRuleProtocolTCP,
+						Action:        types.PolicyTrafficActionAccept,
+						PortRanges: []types.RulePortRange{
+							{
+								Start: 8088,
+								End:   8088,
+							},
+							{
+								Start: 9090,
+								End:   9095,
+							},
+						},
+						Sources: []string{
+							"GroupWorkflow",
+						},
+						Destinations: []string{
+							"GroupDMZ",
+						},
+					},
+				},
+			},
 		},
 	}

@@ -199,14 +246,14 @@ func TestAccount_getPeersByPolicy(t *testing.T) {

 	t.Run("check that all peers get map", func(t *testing.T) {
 		for _, p := range account.Peers {
-			peers, firewallRules := account.GetPeerConnectionResources(context.Background(), p.ID, validatedPeers)
-			assert.GreaterOrEqual(t, len(peers), 2, "minimum number peers should present")
-			assert.GreaterOrEqual(t, len(firewallRules), 2, "minimum number of firewall rules should present")
+			peers, firewallRules := account.GetPeerConnectionResources(context.Background(), p, validatedPeers)
+			assert.GreaterOrEqual(t, len(peers), 1, "minimum number peers should present")
+			assert.GreaterOrEqual(t, len(firewallRules), 1, "minimum number of firewall rules should present")
 		}
 	})

 	t.Run("check first peer map details", func(t *testing.T) {
-		peers, firewallRules := account.GetPeerConnectionResources(context.Background(), "peerB", validatedPeers)
+		peers, firewallRules := account.GetPeerConnectionResources(context.Background(), account.Peers["peerB"], validatedPeers)
 		assert.Len(t, peers, 8)
 		assert.Contains(t, peers, account.Peers["peerA"])
 		assert.Contains(t, peers, account.Peers["peerC"])
@@ -364,6 +411,32 @@ func TestAccount_getPeersByPolicy(t *testing.T) {
 			assert.True(t, contains, "rule not found in expected rules %#v", rule)
 		}
 	})
+
+	t.Run("check port ranges support for older peers", func(t *testing.T) {
+		peers, firewallRules := account.GetPeerConnectionResources(context.Background(), account.Peers["peerK"], validatedPeers)
+		assert.Len(t, peers, 1)
+		assert.Contains(t, peers, account.Peers["peerI"])
+
+		expectedFirewallRules := []*types.FirewallRule{
+			{
+				PeerIP:    "100.65.31.2",
+				Direction: types.FirewallRuleDirectionIN,
+				Action:    "accept",
+				Protocol:  "tcp",
+				Port:      "8088",
+				PolicyID:  "RuleWorkflow",
+			},
+			{
+				PeerIP:    "100.65.31.2",
+				Direction: types.FirewallRuleDirectionOUT,
+				Action:    "accept",
+				Protocol:  "tcp",
+				Port:      "8088",
+				PolicyID:  "RuleWorkflow",
+			},
+		}
+		assert.ElementsMatch(t, firewallRules, expectedFirewallRules)
+	})
 }

 func TestAccount_getPeersByPolicyDirect(t *testing.T) {
@@ -466,10 +539,10 @@ func TestAccount_getPeersByPolicyDirect(t *testing.T) {
 	}

 	t.Run("check first peer map", func(t *testing.T) {
-		peers, firewallRules := account.GetPeerConnectionResources(context.Background(), "peerB", approvedPeers)
+		peers, firewallRules := account.GetPeerConnectionResources(context.Background(), account.Peers["peerB"], approvedPeers)
 		assert.Contains(t, peers, account.Peers["peerC"])

-		epectedFirewallRules := []*types.FirewallRule{
+		expectedFirewallRules := []*types.FirewallRule{
 			{
 				PeerIP:    "100.65.254.139",
 				Direction: types.FirewallRuleDirectionIN,
@@ -487,19 +560,19 @@ func TestAccount_getPeersByPolicyDirect(t *testing.T) {
 				PolicyID:  "RuleSwarm",
 			},
 		}
-		assert.Len(t, firewallRules, len(epectedFirewallRules))
-		slices.SortFunc(epectedFirewallRules, sortFunc())
+		assert.Len(t, firewallRules, len(expectedFirewallRules))
+		slices.SortFunc(expectedFirewallRules, sortFunc())
 		slices.SortFunc(firewallRules, sortFunc())
 		for i := range firewallRules {
-			assert.Equal(t, epectedFirewallRules[i], firewallRules[i])
+			assert.Equal(t, expectedFirewallRules[i], firewallRules[i])
 		}
 	})

 	t.Run("check second peer map", func(t *testing.T) {
-		peers, firewallRules := account.GetPeerConnectionResources(context.Background(), "peerC", approvedPeers)
+		peers, firewallRules := account.GetPeerConnectionResources(context.Background(), account.Peers["peerC"], approvedPeers)
 		assert.Contains(t, peers, account.Peers["peerB"])

-		epectedFirewallRules := []*types.FirewallRule{
+		expectedFirewallRules := []*types.FirewallRule{
 			{
 				PeerIP:    "100.65.80.39",
 				Direction: types.FirewallRuleDirectionIN,
@@ -517,21 +590,21 @@ func TestAccount_getPeersByPolicyDirect(t *testing.T) {
 				PolicyID:  "RuleSwarm",
 			},
 		}
-		assert.Len(t, firewallRules, len(epectedFirewallRules))
-		slices.SortFunc(epectedFirewallRules, sortFunc())
+		assert.Len(t, firewallRules, len(expectedFirewallRules))
+		slices.SortFunc(expectedFirewallRules, sortFunc())
 		slices.SortFunc(firewallRules, sortFunc())
 		for i := range firewallRules {
-			assert.Equal(t, epectedFirewallRules[i], firewallRules[i])
+			assert.Equal(t, expectedFirewallRules[i], firewallRules[i])
 		}
 	})

 	account.Policies[1].Rules[0].Bidirectional = false

 	t.Run("check first peer map directional only", func(t *testing.T) {
-		peers, firewallRules := account.GetPeerConnectionResources(context.Background(), "peerB", approvedPeers)
+		peers, firewallRules := account.GetPeerConnectionResources(context.Background(), account.Peers["peerB"], approvedPeers)
 		assert.Contains(t, peers, account.Peers["peerC"])

-		epectedFirewallRules := []*types.FirewallRule{
+		expectedFirewallRules := []*types.FirewallRule{
 			{
 				PeerIP:    "100.65.254.139",
 				Direction: types.FirewallRuleDirectionOUT,
@@ -541,19 +614,19 @@ func TestAccount_getPeersByPolicyDirect(t *testing.T) {
 				PolicyID:  "RuleSwarm",
 			},
 		}
-		assert.Len(t, firewallRules, len(epectedFirewallRules))
-		slices.SortFunc(epectedFirewallRules, sortFunc())
+		assert.Len(t, firewallRules, len(expectedFirewallRules))
+		slices.SortFunc(expectedFirewallRules, sortFunc())
 		slices.SortFunc(firewallRules, sortFunc())
 		for i := range firewallRules {
-			assert.Equal(t, epectedFirewallRules[i], firewallRules[i])
+			assert.Equal(t, expectedFirewallRules[i], firewallRules[i])
 		}
 	})

 	t.Run("check second peer map directional only", func(t *testing.T) {
-		peers, firewallRules := account.GetPeerConnectionResources(context.Background(), "peerC", approvedPeers)
+		peers, firewallRules := account.GetPeerConnectionResources(context.Background(), account.Peers["peerC"], approvedPeers)
 		assert.Contains(t, peers, account.Peers["peerB"])

-		epectedFirewallRules := []*types.FirewallRule{
+		expectedFirewallRules := []*types.FirewallRule{
 			{
 				PeerIP:    "100.65.80.39",
 				Direction: types.FirewallRuleDirectionIN,
@@ -563,11 +636,11 @@ func TestAccount_getPeersByPolicyDirect(t *testing.T) {
 				PolicyID:  "RuleSwarm",
 			},
 		}
-		assert.Len(t, firewallRules, len(epectedFirewallRules))
-		slices.SortFunc(epectedFirewallRules, sortFunc())
+		assert.Len(t, firewallRules, len(expectedFirewallRules))
+		slices.SortFunc(expectedFirewallRules, sortFunc())
 		slices.SortFunc(firewallRules, sortFunc())
 		for i := range firewallRules {
-			assert.Equal(t, epectedFirewallRules[i], firewallRules[i])
+			assert.Equal(t, expectedFirewallRules[i], firewallRules[i])
 		}
 	})
 }
@@ -748,7 +821,7 @@ func TestAccount_getPeersByPolicyPostureChecks(t *testing.T) {
 	t.Run("verify peer's network map with default group peer list", func(t *testing.T) {
 		// peerB doesn't fulfill the NB posture check but is included in the destination group Swarm,
 		// will establish a connection with all source peers satisfying the NB posture check.
-		peers, firewallRules := account.GetPeerConnectionResources(context.Background(), "peerB", approvedPeers)
+		peers, firewallRules := account.GetPeerConnectionResources(context.Background(), account.Peers["peerB"], approvedPeers)
 		assert.Len(t, peers, 4)
 		assert.Len(t, firewallRules, 4)
 		assert.Contains(t, peers, account.Peers["peerA"])
@@ -758,7 +831,7 @@ func TestAccount_getPeersByPolicyPostureChecks(t *testing.T) {

 		// peerC satisfy the NB posture check, should establish connection to all destination group peer's
 		// We expect a single permissive firewall rule which all outgoing connections
-		peers, firewallRules = account.GetPeerConnectionResources(context.Background(), "peerC", approvedPeers)
+		peers, firewallRules = account.GetPeerConnectionResources(context.Background(), account.Peers["peerC"], approvedPeers)
 		assert.Len(t, peers, len(account.Groups["GroupSwarm"].Peers))
 		assert.Len(t, firewallRules, 1)
 		expectedFirewallRules := []*types.FirewallRule{
@@ -775,7 +848,7 @@ func TestAccount_getPeersByPolicyPostureChecks(t *testing.T) {

 		// peerE doesn't fulfill the NB posture check and exists in only destination group Swarm,
 		// all source group peers satisfying the NB posture check should establish connection
-		peers, firewallRules = account.GetPeerConnectionResources(context.Background(), "peerE", approvedPeers)
+		peers, firewallRules = account.GetPeerConnectionResources(context.Background(), account.Peers["peerE"], approvedPeers)
 		assert.Len(t, peers, 4)
 		assert.Len(t, firewallRules, 4)
 		assert.Contains(t, peers, account.Peers["peerA"])
@@ -785,7 +858,7 @@ func TestAccount_getPeersByPolicyPostureChecks(t *testing.T) {

 		// peerI doesn't fulfill the OS version posture check and exists in only destination group Swarm,
 		// all source group peers satisfying the NB posture check should establish connection
-		peers, firewallRules = account.GetPeerConnectionResources(context.Background(), "peerI", approvedPeers)
+		peers, firewallRules = account.GetPeerConnectionResources(context.Background(), account.Peers["peerI"], approvedPeers)
 		assert.Len(t, peers, 4)
 		assert.Len(t, firewallRules, 4)
 		assert.Contains(t, peers, account.Peers["peerA"])
@@ -800,19 +873,19 @@ func TestAccount_getPeersByPolicyPostureChecks(t *testing.T) {

 		// peerB doesn't satisfy the NB posture check, and doesn't exist in destination group peer's
 		// no connection should be established to any peer of destination group
-		peers, firewallRules := account.GetPeerConnectionResources(context.Background(), "peerB", approvedPeers)
+		peers, firewallRules := account.GetPeerConnectionResources(context.Background(), account.Peers["peerB"], approvedPeers)
 		assert.Len(t, peers, 0)
 		assert.Len(t, firewallRules, 0)

 		// peerI doesn't satisfy the OS version posture check, and doesn't exist in destination group peer's
 		// no connection should be established to any peer of destination group
-		peers, firewallRules = account.GetPeerConnectionResources(context.Background(), "peerI", approvedPeers)
+		peers, firewallRules = account.GetPeerConnectionResources(context.Background(), account.Peers["peerI"], approvedPeers)
 		assert.Len(t, peers, 0)
 		assert.Len(t, firewallRules, 0)

 		// peerC satisfy the NB posture check, should establish connection to all destination group peer's
 		// We expect a single permissive firewall rule which all outgoing connections
-		peers, firewallRules = account.GetPeerConnectionResources(context.Background(), "peerC", approvedPeers)
+		peers, firewallRules = account.GetPeerConnectionResources(context.Background(), account.Peers["peerC"], approvedPeers)
 		assert.Len(t, peers, len(account.Groups["GroupSwarm"].Peers))
 		assert.Len(t, firewallRules, len(account.Groups["GroupSwarm"].Peers))

@@ -827,14 +900,14 @@ func TestAccount_getPeersByPolicyPostureChecks(t *testing.T) {

 		// peerE doesn't fulfill the NB posture check and exists in only destination group Swarm,
 		// all source group peers satisfying the NB posture check should establish connection
-		peers, firewallRules = account.GetPeerConnectionResources(context.Background(), "peerE", approvedPeers)
+		peers, firewallRules = account.GetPeerConnectionResources(context.Background(), account.Peers["peerE"], approvedPeers)
 		assert.Len(t, peers, 3)
 		assert.Len(t, firewallRules, 3)
 		assert.Contains(t, peers, account.Peers["peerA"])
 		assert.Contains(t, peers, account.Peers["peerC"])
 		assert.Contains(t, peers, account.Peers["peerD"])

-		peers, firewallRules = account.GetPeerConnectionResources(context.Background(), "peerA", approvedPeers)
+		peers, firewallRules = account.GetPeerConnectionResources(context.Background(), account.Peers["peerA"], approvedPeers)
 		assert.Len(t, peers, 5)
 		// assert peers from Group Swarm
 		assert.Contains(t, peers, account.Peers["peerD"])
@@ -24,20 +24,12 @@ func sanitizeVersion(version string) string {
 }

 func (n *NBVersionCheck) Check(ctx context.Context, peer nbpeer.Peer) (bool, error) {
-	peerVersion := sanitizeVersion(peer.Meta.WtVersion)
-	minVersion := sanitizeVersion(n.MinVersion)
-
-	peerNBVersion, err := version.NewVersion(peerVersion)
+	meetsMin, err := MeetsMinVersion(n.MinVersion, peer.Meta.WtVersion)
 	if err != nil {
 		return false, err
 	}

-	constraints, err := version.NewConstraint(">= " + minVersion)
-	if err != nil {
-		return false, err
-	}
-
-	if constraints.Check(peerNBVersion) {
+	if meetsMin {
 		return true, nil
 	}

@@ -60,3 +52,21 @@ func (n *NBVersionCheck) Validate() error {
 	}
 	return nil
 }
+
+// MeetsMinVersion checks if the peer's version meets or exceeds the minimum required version
+func MeetsMinVersion(minVer, peerVer string) (bool, error) {
+	peerVer = sanitizeVersion(peerVer)
+	minVer = sanitizeVersion(minVer)
+
+	peerNBVer, err := version.NewVersion(peerVer)
+	if err != nil {
+		return false, err
+	}
+
+	constraints, err := version.NewConstraint(">= " + minVer)
+	if err != nil {
+		return false, err
+	}
+
+	return constraints.Check(peerNBVer), nil
+}
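A quick usage sketch for the extracted helper. The hashicorp/go-version calls match the ones in the diff; the inline reimplementation below omits sanitizeVersion (assumed to strip prefixes such as "v" and build suffixes) so it stays self-contained:

package main

import (
	"fmt"

	"github.com/hashicorp/go-version"
)

// meetsMinVersion mirrors the MeetsMinVersion helper above, minus sanitization.
func meetsMinVersion(minVer, peerVer string) (bool, error) {
	peerV, err := version.NewVersion(peerVer)
	if err != nil {
		return false, err
	}
	constraints, err := version.NewConstraint(">= " + minVer)
	if err != nil {
		return false, err
	}
	return constraints.Check(peerV), nil
}

func main() {
	ok, err := meetsMinVersion("0.48.0", "0.48.1")
	fmt.Println(ok, err) // true <nil>
}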
@@ -139,3 +139,68 @@ func TestNBVersionCheck_Validate(t *testing.T) {
 		})
 	}
 }
+
+func TestMeetsMinVersion(t *testing.T) {
+	tests := []struct {
+		name    string
+		minVer  string
+		peerVer string
+		want    bool
+		wantErr bool
+	}{
+		{
+			name:    "Peer version greater than min version",
+			minVer:  "0.26.0",
+			peerVer: "0.60.1",
+			want:    true,
+			wantErr: false,
+		},
+		{
+			name:    "Peer version equals min version",
+			minVer:  "1.0.0",
+			peerVer: "1.0.0",
+			want:    true,
+			wantErr: false,
+		},
+		{
+			name:    "Peer version less than min version",
+			minVer:  "1.0.0",
+			peerVer: "0.9.9",
+			want:    false,
+			wantErr: false,
+		},
+		{
+			name:    "Peer version with pre-release tag greater than min version",
+			minVer:  "1.0.0",
+			peerVer: "1.0.1-alpha",
+			want:    true,
+			wantErr: false,
+		},
+		{
+			name:    "Invalid peer version format",
+			minVer:  "1.0.0",
+			peerVer: "dev",
+			want:    false,
+			wantErr: true,
+		},
+		{
+			name:    "Invalid min version format",
+			minVer:  "invalid.version",
+			peerVer: "1.0.0",
+			want:    false,
+			wantErr: true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got, err := MeetsMinVersion(tt.minVer, tt.peerVer)
+			if tt.wantErr {
+				assert.Error(t, err)
+			} else {
+				assert.NoError(t, err)
+			}
+			assert.Equal(t, tt.want, got)
+		})
+	}
+}
@@ -36,6 +36,9 @@ const (
 	PublicCategory  = "public"
 	PrivateCategory = "private"
 	UnknownCategory = "unknown"
+
+	// firewallRuleMinPortRangesVer defines the minimum peer version that supports port range rules.
+	firewallRuleMinPortRangesVer = "0.48.0"
 )

 type LookupMap map[string]struct{}
@@ -248,7 +251,7 @@ func (a *Account) GetPeerNetworkMap(
 		}
 	}

-	aclPeers, firewallRules := a.GetPeerConnectionResources(ctx, peerID, validatedPeersMap)
+	aclPeers, firewallRules := a.GetPeerConnectionResources(ctx, peer, validatedPeersMap)
 	// exclude expired peers
 	var peersToConnect []*nbpeer.Peer
 	var expiredPeers []*nbpeer.Peer
@@ -961,8 +964,9 @@ func (a *Account) UserGroupsRemoveFromPeers(userID string, groups ...string) map
 // GetPeerConnectionResources for a given peer
 //
 // This function returns the list of peers and firewall rules that are applicable to a given peer.
-func (a *Account) GetPeerConnectionResources(ctx context.Context, peerID string, validatedPeersMap map[string]struct{}) ([]*nbpeer.Peer, []*FirewallRule) {
-	generateResources, getAccumulatedResources := a.connResourcesGenerator(ctx)
+func (a *Account) GetPeerConnectionResources(ctx context.Context, peer *nbpeer.Peer, validatedPeersMap map[string]struct{}) ([]*nbpeer.Peer, []*FirewallRule) {
+	generateResources, getAccumulatedResources := a.connResourcesGenerator(ctx, peer)

 	for _, policy := range a.Policies {
 		if !policy.Enabled {
 			continue
@@ -973,8 +977,8 @@ func (a *Account) GetPeerConnectionResources(ctx context.Context, peerID string,
 			continue
 		}

-		sourcePeers, peerInSources := a.getAllPeersFromGroups(ctx, rule.Sources, peerID, policy.SourcePostureChecks, validatedPeersMap)
-		destinationPeers, peerInDestinations := a.getAllPeersFromGroups(ctx, rule.Destinations, peerID, nil, validatedPeersMap)
+		sourcePeers, peerInSources := a.getAllPeersFromGroups(ctx, rule.Sources, peer.ID, policy.SourcePostureChecks, validatedPeersMap)
+		destinationPeers, peerInDestinations := a.getAllPeersFromGroups(ctx, rule.Destinations, peer.ID, nil, validatedPeersMap)

 		if rule.Bidirectional {
 			if peerInSources {
@@ -1003,7 +1007,7 @@ func (a *Account) GetPeerConnectionResources(ctx context.Context, peerID string,
 // The generator function is used to generate the list of peers and firewall rules that are applicable to a given peer.
 // It safe to call the generator function multiple times for same peer and different rules no duplicates will be
 // generated. The accumulator function returns the result of all the generator calls.
-func (a *Account) connResourcesGenerator(ctx context.Context) (func(*PolicyRule, []*nbpeer.Peer, int), func() ([]*nbpeer.Peer, []*FirewallRule)) {
+func (a *Account) connResourcesGenerator(ctx context.Context, targetPeer *nbpeer.Peer) (func(*PolicyRule, []*nbpeer.Peer, int), func() ([]*nbpeer.Peer, []*FirewallRule)) {
 	rulesExists := make(map[string]struct{})
 	peersExists := make(map[string]struct{})
 	rules := make([]*FirewallRule, 0)
@@ -1051,17 +1055,7 @@ func (a *Account) connResourcesGenerator(ctx context.Context) (func(*PolicyRule,
 			continue
 		}

-		for _, port := range rule.Ports {
-			pr := fr // clone rule and add set new port
-			pr.Port = port
-			rules = append(rules, &pr)
-		}
-
-		for _, portRange := range rule.PortRanges {
-			pr := fr
-			pr.PortRange = portRange
-			rules = append(rules, &pr)
-		}
+		rules = append(rules, expandPortsAndRanges(fr, rule, targetPeer)...)
 	}
 }, func() ([]*nbpeer.Peer, []*FirewallRule) {
 	return peers, rules
@@ -1590,3 +1584,45 @@ func (a *Account) AddAllGroup() error {
 	}
 	return nil
 }
+
+// expandPortsAndRanges expands Ports and PortRanges of a rule into individual firewall rules
+func expandPortsAndRanges(base FirewallRule, rule *PolicyRule, peer *nbpeer.Peer) []*FirewallRule {
+	var expanded []*FirewallRule
+
+	if len(rule.Ports) > 0 {
+		for _, port := range rule.Ports {
+			fr := base
+			fr.Port = port
+			expanded = append(expanded, &fr)
+		}
+		return expanded
+	}
+
+	supportPortRanges := peerSupportsPortRanges(peer.Meta.WtVersion)
+	for _, portRange := range rule.PortRanges {
+		fr := base
+
+		if supportPortRanges {
+			fr.PortRange = portRange
+		} else {
+			// Peer doesn't support port ranges, only allow single-port ranges
+			if portRange.Start != portRange.End {
+				continue
+			}
+			fr.Port = strconv.FormatUint(uint64(portRange.Start), 10)
+		}
+		expanded = append(expanded, &fr)
+	}
+
+	return expanded
+}
+
+// peerSupportsPortRanges checks if the peer version supports port ranges.
+func peerSupportsPortRanges(peerVer string) bool {
+	if strings.Contains(peerVer, "dev") {
+		return true
+	}

+	meetMinVer, err := posture.MeetsMinVersion(firewallRuleMinPortRangesVer, peerVer)
+	return err == nil && meetMinVer
+}
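A worked example of the port-range fallback above: a peer below the 0.48.0 threshold only receives single-port ranges, which are downgraded to a plain Port value, while multi-port ranges are skipped for it. The trimmed stand-in types below just illustrate the decision:

package main

import (
	"fmt"
	"strconv"
)

type portRange struct{ Start, End uint16 }

// expandForOldPeer keeps only single-port ranges, rendered as Port strings,
// mirroring the else-branch of expandPortsAndRanges above.
func expandForOldPeer(ranges []portRange) []string {
	var ports []string
	for _, pr := range ranges {
		if pr.Start != pr.End {
			continue // an old peer cannot enforce a real range; drop it
		}
		ports = append(ports, strconv.FormatUint(uint64(pr.Start), 10))
	}
	return ports
}

func main() {
	got := expandForOldPeer([]portRange{{8088, 8088}, {9090, 9095}})
	fmt.Println(got) // [8088] — matches the peerK expectation in the test above
}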
@@ -76,7 +76,6 @@ func generateRouteFirewallRules(ctx context.Context, route *nbroute.Route, rule
 		rules = append(rules, generateRulesWithPortRanges(baseRule, rule, rulesExists)...)
 	} else {
 		rules = append(rules, generateRulesWithPorts(ctx, baseRule, rule, rulesExists)...)
-
 	}

 	// TODO: generate IPv6 rules for dynamic routes
@@ -29,6 +29,7 @@ const (
 	Body_ANSWER    Body_Type = 1
 	Body_CANDIDATE Body_Type = 2
 	Body_MODE      Body_Type = 4
+	Body_GO_IDLE   Body_Type = 5
 )

 // Enum value maps for Body_Type.
@@ -38,12 +39,14 @@ var (
 		1: "ANSWER",
 		2: "CANDIDATE",
 		4: "MODE",
+		5: "GO_IDLE",
 	}
 	Body_Type_value = map[string]int32{
 		"OFFER":     0,
 		"ANSWER":    1,
 		"CANDIDATE": 2,
 		"MODE":      4,
+		"GO_IDLE":   5,
 	}
 )

@@ -225,7 +228,7 @@ type Body struct {
 	FeaturesSupported []uint32 `protobuf:"varint,6,rep,packed,name=featuresSupported,proto3" json:"featuresSupported,omitempty"`
 	// RosenpassConfig is a Rosenpass config of the remote peer our peer tries to connect to
 	RosenpassConfig *RosenpassConfig `protobuf:"bytes,7,opt,name=rosenpassConfig,proto3" json:"rosenpassConfig,omitempty"`
-	// relayServerAddress is an IP:port of the relay server
+	// relayServerAddress is url of the relay server
 	RelayServerAddress string `protobuf:"bytes,8,opt,name=relayServerAddress,proto3" json:"relayServerAddress,omitempty"`
 }
@@ -440,7 +443,7 @@ var file_signalexchange_proto_rawDesc = []byte{
 	0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x04, 0x62,
 	0x6f, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e,
 	0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x52,
-	0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xa6, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d,
+	0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xb3, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d,
 	0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x73,
 	0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f,
 	0x64, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a,
@@ -463,33 +466,34 @@ var file_signalexchange_proto_rawDesc = []byte{
 	0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x2e, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53,
 	0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x08, 0x20, 0x01,
 	0x28, 0x09, 0x52, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41,
-	0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x36, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09,
+	0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09,
 	0x0a, 0x05, 0x4f, 0x46, 0x46, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x4e, 0x53,
 	0x57, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x44, 0x49, 0x44, 0x41,
-	0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10, 0x04, 0x22, 0x2e,
-	0x0a, 0x04, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74,
-	0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74,
-	0x88, 0x01, 0x01, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d,
-	0x0a, 0x0f, 0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69,
-	0x67, 0x12, 0x28, 0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75,
-	0x62, 0x4b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65,
-	0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72,
-	0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64,
-	0x64, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70,
-	0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01,
-	0x0a, 0x0e, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65,
-	0x12, 0x4c, 0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61,
-	0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70,
-	0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67,
-	0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72,
-	0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59,
-	0x0a, 0x0d, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12,
-	0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65,
-	0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
-	0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e,
-	0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73,
-	0x61, 0x67, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72,
-	0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
+	0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10, 0x04, 0x12, 0x0b,
+	0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x22, 0x2e, 0x0a, 0x04, 0x4d,
+	0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01, 0x20,
+	0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01, 0x01,
+	0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f, 0x52,
+	0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x28,
+	0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65,
+	0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61,
+	0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73, 0x65,
+	0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x18,
+	0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73,
+	0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01, 0x0a, 0x0e, 0x53,
+	0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c, 0x0a,
+	0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78,
+	0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64,
+	0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c,
+	0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74,
+	0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x0d, 0x43,
+	0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x20, 0x2e, 0x73,
+	0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e,
+	0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20,
+	0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e,
+	0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
+	0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f,
+	0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
 }

 var (
@@ -47,6 +47,7 @@ message Body {
     ANSWER = 1;
     CANDIDATE = 2;
     MODE = 4;
+    GO_IDLE = 5;
   }
   Type type = 1;
   string payload = 2;