Compare commits

...

16 Commits

Author SHA1 Message Date
Zoltán Papp
546538f570 Check 5 min intervals 2025-06-23 20:49:02 +02:00
Zoltán Papp
9307e7e0ea Add historical based algorithm 2025-06-21 19:38:44 +02:00
Zoltán Papp
9691e197df Merge branch 'main' into feature/poc-lazy-detection 2025-06-20 18:19:02 +02:00
Viktor Liu
2a51609436 [client] Handle lazy routing peers that are part of HA groups (#3943)
* Activate new lazy routing peers if the HA group is active
* Prevent lazy peers going to idle if HA group members are active (#3948)
2025-06-20 18:07:19 +02:00
Zoltán Papp
d380c925c2 Merge branch 'main' into feature/poc-lazy-detection 2025-06-20 17:54:27 +02:00
Zoltán Papp
a22a6f6d26 Remove unused ctx 2025-06-20 16:41:40 +02:00
Pascal Fischer
83457f8b99 [management] add transaction for integrated validator groups update and primary account update (#4014) 2025-06-20 12:13:24 +02:00
Zoltán Papp
55e7ca96df Fix counter 2025-06-20 10:24:05 +02:00
Pascal Fischer
b45284f086 [management] export ephemeral peer flag on api (#4004) 2025-06-19 16:46:56 +02:00
Zoltán Papp
2d401a7dce Add signal message 2025-06-19 16:22:59 +02:00
Bethuel Mmbaga
e9016aecea [management] Add backward compatibility for older clients without firewall rules port range support (#4003)
Adds backward compatibility for clients with versions prior to v0.48.0 that do not support port range firewall rules.

- Skips generation of firewall rules with multi-port ranges for older clients
- Preserves support for single-port ranges by treating them as individual port rules, ensuring compatibility with older clients
2025-06-19 13:07:06 +03:00
Zoltán Papp
d7e68ff812 Fix test 2025-06-19 10:28:13 +02:00
Zoltán Papp
f3a5e34c3f Change the logic and add moc data 2025-06-18 20:31:06 +02:00
Zoltan Papp
b3c0b46a88 Merge remote-tracking branch 'origin/feature/poc-lazy-detection' into feature/poc-lazy-detection 2025-06-18 09:58:22 +02:00
Zoltan Papp
6c7a8a7741 Add new algorithm 2025-06-18 09:57:37 +02:00
Zoltan Papp
93d8d272bf Add new algorithm 2025-06-14 02:25:05 +02:00
29 changed files with 1286 additions and 472 deletions

View File

@@ -198,6 +198,7 @@ func (s *ICEBind) createIPv4ReceiverFn(pc *ipv4.PacketConn, conn *net.UDPConn, r
if sizes[i] == 0 { if sizes[i] == 0 {
continue continue
} }
log.Infof("--- received Datagram %s from %s, size: %d", msg.Addr, msg.Addr.String(), sizes[i])
addrPort := msg.Addr.(*net.UDPAddr).AddrPort() addrPort := msg.Addr.(*net.UDPAddr).AddrPort()
ep := &wgConn.StdNetEndpoint{AddrPort: addrPort} // TODO: remove allocation ep := &wgConn.StdNetEndpoint{AddrPort: addrPort} // TODO: remove allocation
wgConn.GetSrcFromControl(msg.OOB[:msg.NN], ep) wgConn.GetSrcFromControl(msg.OOB[:msg.NN], ep)

View File

@@ -133,7 +133,7 @@ func (e *ConnMgr) SetExcludeList(ctx context.Context, peerIDs map[string]bool) {
excludedPeers = append(excludedPeers, lazyPeerCfg) excludedPeers = append(excludedPeers, lazyPeerCfg)
} }
added := e.lazyConnMgr.ExcludePeer(e.lazyCtx, excludedPeers) added := e.lazyConnMgr.ExcludePeer(excludedPeers)
for _, peerID := range added { for _, peerID := range added {
var peerConn *peer.Conn var peerConn *peer.Conn
var exists bool var exists bool
@@ -201,7 +201,7 @@ func (e *ConnMgr) RemovePeerConn(peerKey string) {
if !ok { if !ok {
return return
} }
defer conn.Close() defer conn.Close(false)
if !e.isStartedWithLazyMgr() { if !e.isStartedWithLazyMgr() {
return return
@@ -211,23 +211,25 @@ func (e *ConnMgr) RemovePeerConn(peerKey string) {
conn.Log.Infof("removed peer from lazy conn manager") conn.Log.Infof("removed peer from lazy conn manager")
} }
func (e *ConnMgr) OnSignalMsg(ctx context.Context, peerKey string) (*peer.Conn, bool) { func (e *ConnMgr) ActivatePeer(ctx context.Context, conn *peer.Conn) {
conn, ok := e.peerStore.PeerConn(peerKey)
if !ok {
return nil, false
}
if !e.isStartedWithLazyMgr() { if !e.isStartedWithLazyMgr() {
return conn, true return
} }
if found := e.lazyConnMgr.ActivatePeer(e.lazyCtx, peerKey); found { if found := e.lazyConnMgr.ActivatePeer(conn.GetKey()); found {
conn.Log.Infof("activated peer from inactive state") conn.Log.Infof("activated peer from inactive state")
if err := conn.Open(ctx); err != nil { if err := conn.Open(ctx); err != nil {
conn.Log.Errorf("failed to open connection: %v", err) conn.Log.Errorf("failed to open connection: %v", err)
} }
} }
return conn, true }
func (e *ConnMgr) DeactivatePeer(conn *peer.Conn) {
if !e.isStartedWithLazyMgr() {
return
}
e.lazyConnMgr.DeactivatePeer(conn.ConnID())
} }
func (e *ConnMgr) Close() { func (e *ConnMgr) Close() {
@@ -275,7 +277,7 @@ func (e *ConnMgr) addPeersToLazyConnManager() error {
lazyPeerCfgs = append(lazyPeerCfgs, lazyPeerCfg) lazyPeerCfgs = append(lazyPeerCfgs, lazyPeerCfg)
} }
return e.lazyConnMgr.AddActivePeers(e.lazyCtx, lazyPeerCfgs) return e.lazyConnMgr.AddActivePeers(lazyPeerCfgs)
} }
func (e *ConnMgr) closeManager(ctx context.Context) { func (e *ConnMgr) closeManager(ctx context.Context) {

View File

@@ -1255,7 +1255,7 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
} }
if exists := e.connMgr.AddPeerConn(e.ctx, peerKey, conn); exists { if exists := e.connMgr.AddPeerConn(e.ctx, peerKey, conn); exists {
conn.Close() conn.Close(false)
return fmt.Errorf("peer already exists: %s", peerKey) return fmt.Errorf("peer already exists: %s", peerKey)
} }
@@ -1331,11 +1331,16 @@ func (e *Engine) receiveSignalEvents() {
e.syncMsgMux.Lock() e.syncMsgMux.Lock()
defer e.syncMsgMux.Unlock() defer e.syncMsgMux.Unlock()
conn, ok := e.connMgr.OnSignalMsg(e.ctx, msg.Key) conn, ok := e.peerStore.PeerConn(msg.Key)
if !ok { if !ok {
return fmt.Errorf("wrongly addressed message %s", msg.Key) return fmt.Errorf("wrongly addressed message %s", msg.Key)
} }
msgType := msg.GetBody().GetType()
if msgType != sProto.Body_GO_IDLE {
e.connMgr.ActivatePeer(e.ctx, conn)
}
switch msg.GetBody().Type { switch msg.GetBody().Type {
case sProto.Body_OFFER: case sProto.Body_OFFER:
remoteCred, err := signal.UnMarshalCredential(msg) remoteCred, err := signal.UnMarshalCredential(msg)
@@ -1392,6 +1397,8 @@ func (e *Engine) receiveSignalEvents() {
go conn.OnRemoteCandidate(candidate, e.routeManager.GetClientRoutes()) go conn.OnRemoteCandidate(candidate, e.routeManager.GetClientRoutes())
case sProto.Body_MODE: case sProto.Body_MODE:
case sProto.Body_GO_IDLE:
e.connMgr.DeactivatePeer(conn)
} }
return nil return nil

View File

@@ -1,70 +0,0 @@
package inactivity
import (
"context"
"time"
peer "github.com/netbirdio/netbird/client/internal/peer/id"
)
const (
DefaultInactivityThreshold = 60 * time.Minute // idle after 1 hour inactivity
MinimumInactivityThreshold = 3 * time.Minute
)
type Monitor struct {
id peer.ConnID
timer *time.Timer
cancel context.CancelFunc
inactivityThreshold time.Duration
}
func NewInactivityMonitor(peerID peer.ConnID, threshold time.Duration) *Monitor {
i := &Monitor{
id: peerID,
timer: time.NewTimer(0),
inactivityThreshold: threshold,
}
i.timer.Stop()
return i
}
func (i *Monitor) Start(ctx context.Context, timeoutChan chan peer.ConnID) {
i.timer.Reset(i.inactivityThreshold)
defer i.timer.Stop()
ctx, i.cancel = context.WithCancel(ctx)
defer func() {
defer i.cancel()
select {
case <-i.timer.C:
default:
}
}()
select {
case <-i.timer.C:
select {
case timeoutChan <- i.id:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
func (i *Monitor) Stop() {
if i.cancel == nil {
return
}
i.cancel()
}
func (i *Monitor) PauseTimer() {
i.timer.Stop()
}
func (i *Monitor) ResetTimer() {
i.timer.Reset(i.inactivityThreshold)
}

View File

@@ -1,156 +0,0 @@
package inactivity
import (
"context"
"testing"
"time"
peerid "github.com/netbirdio/netbird/client/internal/peer/id"
)
type MocPeer struct {
}
func (m *MocPeer) ConnID() peerid.ConnID {
return peerid.ConnID(m)
}
func TestInactivityMonitor(t *testing.T) {
tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5)
defer testTimeoutCancel()
p := &MocPeer{}
im := NewInactivityMonitor(p.ConnID(), time.Second*2)
timeoutChan := make(chan peerid.ConnID)
exitChan := make(chan struct{})
go func() {
defer close(exitChan)
im.Start(tCtx, timeoutChan)
}()
select {
case <-timeoutChan:
case <-tCtx.Done():
t.Fatal("timeout")
}
select {
case <-exitChan:
case <-tCtx.Done():
t.Fatal("timeout")
}
}
func TestReuseInactivityMonitor(t *testing.T) {
p := &MocPeer{}
im := NewInactivityMonitor(p.ConnID(), time.Second*2)
timeoutChan := make(chan peerid.ConnID)
for i := 2; i > 0; i-- {
exitChan := make(chan struct{})
testTimeoutCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5)
go func() {
defer close(exitChan)
im.Start(testTimeoutCtx, timeoutChan)
}()
select {
case <-timeoutChan:
case <-testTimeoutCtx.Done():
t.Fatal("timeout")
}
select {
case <-exitChan:
case <-testTimeoutCtx.Done():
t.Fatal("timeout")
}
testTimeoutCancel()
}
}
func TestStopInactivityMonitor(t *testing.T) {
tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*5)
defer testTimeoutCancel()
p := &MocPeer{}
im := NewInactivityMonitor(p.ConnID(), DefaultInactivityThreshold)
timeoutChan := make(chan peerid.ConnID)
exitChan := make(chan struct{})
go func() {
defer close(exitChan)
im.Start(tCtx, timeoutChan)
}()
go func() {
time.Sleep(3 * time.Second)
im.Stop()
}()
select {
case <-timeoutChan:
t.Fatal("unexpected timeout")
case <-exitChan:
case <-tCtx.Done():
t.Fatal("timeout")
}
}
func TestPauseInactivityMonitor(t *testing.T) {
tCtx, testTimeoutCancel := context.WithTimeout(context.Background(), time.Second*10)
defer testTimeoutCancel()
p := &MocPeer{}
trashHold := time.Second * 3
im := NewInactivityMonitor(p.ConnID(), trashHold)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
timeoutChan := make(chan peerid.ConnID)
exitChan := make(chan struct{})
go func() {
defer close(exitChan)
im.Start(ctx, timeoutChan)
}()
time.Sleep(1 * time.Second) // grant time to start the monitor
im.PauseTimer()
// check to do not receive timeout
thresholdCtx, thresholdCancel := context.WithTimeout(context.Background(), trashHold+time.Second)
defer thresholdCancel()
select {
case <-exitChan:
t.Fatal("unexpected exit")
case <-timeoutChan:
t.Fatal("unexpected timeout")
case <-thresholdCtx.Done():
// test ok
case <-tCtx.Done():
t.Fatal("test timed out")
}
// test reset timer
im.ResetTimer()
select {
case <-tCtx.Done():
t.Fatal("test timed out")
case <-exitChan:
t.Fatal("unexpected exit")
case <-timeoutChan:
// expected timeout
}
}

View File

@@ -0,0 +1,249 @@
package inactivity
import (
"container/list"
"context"
"fmt"
"math"
"os"
"strconv"
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/iface/configurer"
"github.com/netbirdio/netbird/client/internal/lazyconn"
)
// Responder: vmp2
// - Receive handshake initiation: 148 bytes + extra 32 bytes, every 02:00 - 03:00 minutes
// - Receive keep alive: 32 bytes, every 25 sec
// Initiator: mp1
// - Receive handshake response:
// - Receive keep alive: 32 bytes, every 25 sec
const (
keepAliveBytes = 32
keepAliveInterval = 25 * time.Second
handshakeInitBytes = 148
handshakeRespBytes = 92
handshakeMaxInterval = 3 * time.Minute
checkInterval = 1 * time.Minute
historySize = 5 * time.Minute
DefaultInactivityThreshold = 15 * time.Minute
MinimumInactivityThreshold = 5 * time.Minute
recorderEnv = "NB_LAZYCONN_RECORDER_ENABLED"
)
type WgInterface interface {
GetStats() (map[string]configurer.WGStats, error)
}
type peerHistory struct {
lastRxBytes int64 // last received bytes
bytesHistory *list.List // linked list of int64
historySize int
summarizedBytes int64
inactivityCounter int // counter to track inactivity
log *log.Entry
}
func newPeerHistory(log *log.Entry, historySize int) *peerHistory {
return &peerHistory{
bytesHistory: list.New(),
historySize: historySize,
log: log,
}
}
func (pi *peerHistory) appendRxBytes(rxBytes int64) {
// If at capacity, remove the oldest element (front)
if pi.bytesHistory.Len() == pi.historySize {
pi.summarizedBytes -= pi.bytesHistory.Front().Value.(int64)
pi.bytesHistory.Remove(pi.bytesHistory.Front())
}
// Add the new rxBytes at the back
pi.bytesHistory.PushBack(rxBytes)
pi.summarizedBytes += rxBytes
}
func (pi *peerHistory) historyString() string {
var history []string
for e := pi.bytesHistory.Front(); e != nil; e = e.Next() {
history = append(history, fmt.Sprintf("%d", e.Value.(int64)))
}
return fmt.Sprintf("%s", history)
}
func (pi *peerHistory) reset() {
for e := pi.bytesHistory.Front(); e != nil; e = e.Next() {
e.Value = int64(0)
}
pi.summarizedBytes = 0
}
type Manager struct {
InactivePeersChan chan []string
iface WgInterface
interestedPeers map[string]*peerHistory
maxBytesPerPeriod int64
historySize int // Size of the history buffer for each peer, used to track received bytes over time
recorder *Recorder
thresholdOfInactivity int // Number of consecutive checks with low activity to consider a peer inactive
}
func NewManager(iface WgInterface, configuredThreshold *time.Duration) *Manager {
inactivityThreshold, err := validateInactivityThreshold(configuredThreshold)
if err != nil {
inactivityThreshold = DefaultInactivityThreshold
log.Warnf("invalid inactivity threshold configured: %v, using default: %v", err, DefaultInactivityThreshold)
}
expectedMaxBytes := calculateExpectedMaxBytes()
log.Infof("receive less than %d bytes per %v, will be considered inactive", expectedMaxBytes, inactivityThreshold)
return &Manager{
InactivePeersChan: make(chan []string, 1),
iface: iface,
interestedPeers: make(map[string]*peerHistory),
historySize: int(historySize.Minutes()),
maxBytesPerPeriod: expectedMaxBytes,
thresholdOfInactivity: int(math.Ceil(inactivityThreshold.Minutes() / checkInterval.Minutes())),
}
}
func (m *Manager) AddPeer(peerCfg *lazyconn.PeerConfig) {
if _, exists := m.interestedPeers[peerCfg.PublicKey]; exists {
return
}
peerCfg.Log.Debugf("adding peer to inactivity manager")
m.interestedPeers[peerCfg.PublicKey] = newPeerHistory(peerCfg.Log, m.historySize)
}
func (m *Manager) RemovePeer(peer string) {
pi, ok := m.interestedPeers[peer]
if !ok {
return
}
pi.log.Debugf("remove peer from inactivity manager")
delete(m.interestedPeers, peer)
}
func (m *Manager) Start(ctx context.Context) {
enabled, err := strconv.ParseBool(os.Getenv(recorderEnv))
if err == nil && enabled {
m.recorder = NewRecorder()
defer m.recorder.Close()
}
ticker := newTicker(checkInterval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case tickTime := <-ticker.C():
idlePeers, err := m.checkStats(tickTime)
if err != nil {
log.Errorf("error checking stats: %v", err)
return
}
if len(idlePeers) == 0 {
continue
}
m.notifyInactivePeers(ctx, idlePeers)
}
}
}
func (m *Manager) notifyInactivePeers(ctx context.Context, inactivePeers []string) {
select {
case m.InactivePeersChan <- inactivePeers:
case <-ctx.Done():
return
default:
return
}
}
func (m *Manager) checkStats(now time.Time) ([]string, error) {
stats, err := m.iface.GetStats()
if err != nil {
return nil, err
}
var idlePeers []string
for peer, history := range m.interestedPeers {
stat, found := stats[peer]
if !found {
// when peer is in connecting state
history.log.Warnf("peer not found in wg stats")
}
deltaRx := stat.RxBytes - history.lastRxBytes
if deltaRx < 0 {
deltaRx = 0 // reset to zero if negative
history.reset()
}
m.recorder.ReceivedBytes(peer, now, deltaRx)
history.lastRxBytes = stat.RxBytes
history.appendRxBytes(deltaRx)
// not enough history to determine inactivity
if history.bytesHistory.Len() < m.historySize {
history.log.Debugf("not enough history to determine inactivity, current history size: %d, required: %d", history.bytesHistory.Len(), m.historySize)
continue
}
if history.summarizedBytes <= m.maxBytesPerPeriod {
history.inactivityCounter++
history.log.Debugf("peer is inactive, summarizedBytes: %d, maxBytesPerPeriod: %d, %v", history.summarizedBytes, m.maxBytesPerPeriod, history.historyString())
} else {
history.inactivityCounter = 0 // reset inactivity counter if activity is detected
history.log.Debugf("peer is active, summarizedBytes: %d, maxBytesPerPeriod: %d, %v", history.summarizedBytes, m.maxBytesPerPeriod, history.historyString())
}
if history.inactivityCounter >= m.thresholdOfInactivity {
history.log.Infof("peer is inactive for %d consecutive checks, marking as idle (limit %d) ", history.inactivityCounter, m.thresholdOfInactivity)
idlePeers = append(idlePeers, peer)
history.inactivityCounter = 0 // reset inactivity counter after marking as idle
}
}
return idlePeers, nil
}
func validateInactivityThreshold(configuredThreshold *time.Duration) (time.Duration, error) {
if configuredThreshold == nil {
return DefaultInactivityThreshold, nil
}
if *configuredThreshold < MinimumInactivityThreshold {
return 0, fmt.Errorf("configured inactivity threshold %v is too low, using %v", *configuredThreshold, MinimumInactivityThreshold)
}
return *configuredThreshold, nil
}
func calculateExpectedMaxBytes() int64 {
// Calculate number of keep-alive packets expected
keepAliveCount := int64(historySize.Seconds() / keepAliveInterval.Seconds())
keepAliveBytes := keepAliveCount * keepAliveBytes
// Calculate potential handshake packets (conservative estimate)
handshakeCount := int64(historySize.Minutes() / handshakeMaxInterval.Minutes())
handshakeBytes := handshakeCount * (handshakeInitBytes + keepAliveBytes) // handshake + extra bytes
return keepAliveBytes + handshakeBytes
}

View File

@@ -0,0 +1,53 @@
package inactivity
import (
"context"
"fmt"
"testing"
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/client/internal/lazyconn"
"github.com/netbirdio/netbird/util"
)
func init() {
_ = util.InitLog("trace", "console")
}
func TestNewManager(t *testing.T) {
for i, sc := range scenarios {
timer := NewFakeTimer()
newTicker = func(d time.Duration) Ticker {
return newFakeTicker(d, timer)
}
t.Run(fmt.Sprintf("Scenario %d", i), func(t *testing.T) {
mock := newMockWgInterface("peer1", sc.Data, timer)
manager := NewManager(mock, nil)
peerCfg := &lazyconn.PeerConfig{
PublicKey: "peer1",
Log: log.WithField("peer", "peer1"),
}
manager.AddPeer(peerCfg)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
manager.Start(ctx)
var inactiveResult bool
select {
case <-manager.InactivePeersChan:
inactiveResult = true
default:
inactiveResult = false
}
if inactiveResult != sc.ExpectedInactive {
t.Errorf("Expected inactive peers: %v, got: %v", sc.ExpectedInactive, inactiveResult)
}
})
}
}

View File

@@ -0,0 +1,102 @@
package inactivity
import (
"fmt"
"time"
"github.com/netbirdio/netbird/client/iface/configurer"
)
type rxHistory struct {
when time.Duration
RxBytes int64
}
// mockWgInterface mocks WgInterface to simulate peer stats.
type mockWgInterface struct {
peerID string
statsSequence []rxHistory
timer *FakeTimer
initialTime time.Time
reachedLast bool
}
func newMockWgInterface(peerID string, history []rxHistory, timer *FakeTimer) *mockWgInterface {
return &mockWgInterface{
peerID: peerID,
statsSequence: history,
timer: timer,
initialTime: timer.Now(),
}
}
func (m *mockWgInterface) GetStats() (map[string]configurer.WGStats, error) {
if m.reachedLast {
return nil, fmt.Errorf("no more data")
}
now := m.timer.Now()
var rx int64
for i, history := range m.statsSequence {
if now.Before(m.initialTime.Add(history.when)) {
break
}
if len(m.statsSequence)-1 == i {
m.reachedLast = true
}
rx += history.RxBytes
}
wgStats := make(map[string]configurer.WGStats)
wgStats[m.peerID] = configurer.WGStats{
RxBytes: rx,
}
return wgStats, nil
}
// fakeTicker is a controllable ticker for use in tests
type fakeTicker struct {
interval time.Duration
timer *FakeTimer
ch chan time.Time
now time.Time
}
func newFakeTicker(interval time.Duration, timer *FakeTimer) *fakeTicker {
return &fakeTicker{
interval: interval,
timer: timer,
ch: make(chan time.Time, 1),
now: timer.Now(),
}
}
func (f *fakeTicker) C() <-chan time.Time {
f.now = f.now.Add(f.interval)
f.timer.Set(f.now)
f.ch <- f.now
return f.ch
}
func (f *fakeTicker) Stop() {}
type FakeTimer struct {
now time.Time
}
func NewFakeTimer() *FakeTimer {
return &FakeTimer{
now: time.Date(2025, time.June, 1, 0, 0, 0, 0, time.UTC),
}
}
func (f *FakeTimer) Set(t time.Time) {
f.now = t
}
func (f *FakeTimer) Now() time.Time {
return f.now
}

View File

@@ -0,0 +1,58 @@
package inactivity
import (
"fmt"
"os"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
type Recorder struct {
mu sync.Mutex
file *os.File
filename string
}
func NewRecorder() *Recorder {
timestamp := time.Now().Format("20060102_150405")
filename := fmt.Sprintf("inactivity_log_%s.txt", timestamp)
file, err := os.OpenFile(filename, os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644)
if err != nil {
log.Errorf("error opening file: %v", err)
}
return &Recorder{
file: file,
filename: filename,
}
}
func (r *Recorder) ReceivedBytes(peer string, now time.Time, bytes int64) {
if r == nil {
return
}
r.mu.Lock()
defer r.mu.Unlock()
entry := fmt.Sprintf("%s; %s; %d\n", now.Format(time.RFC3339), peer, bytes)
_, err := r.file.WriteString(entry)
if err != nil {
log.Errorf("error writing to file: %v", err)
}
}
func (r *Recorder) Close() {
if r == nil {
return
}
r.mu.Lock()
defer r.mu.Unlock()
if err := r.file.Close(); err != nil {
log.Errorf("error closing file: %v", err)
}
}

View File

@@ -0,0 +1,209 @@
package inactivity
import "time"
type scenario struct {
ExpectedInactive bool
Data []rxHistory
}
var scenarios = []scenario{
{
ExpectedInactive: true,
Data: []rxHistory{
{when: 0 * time.Second, RxBytes: 32},
{when: 25 * time.Second, RxBytes: 32},
{when: 50 * time.Second, RxBytes: 32},
{when: 75 * time.Second, RxBytes: 32},
{when: 100 * time.Second, RxBytes: 32},
{when: 100 * time.Second, RxBytes: 92},
{when: 150 * time.Second, RxBytes: 32},
{when: 175 * time.Second, RxBytes: 32},
{when: 200 * time.Second, RxBytes: 32},
{when: 225 * time.Second, RxBytes: 32},
{when: 250 * time.Second, RxBytes: 32},
{when: 250 * time.Second, RxBytes: 92},
{when: 300 * time.Second, RxBytes: 32},
{when: 325 * time.Second, RxBytes: 32},
{when: 350 * time.Second, RxBytes: 32},
{when: 375 * time.Second, RxBytes: 32},
{when: 375 * time.Second, RxBytes: 92},
{when: 400 * time.Second, RxBytes: 32},
{when: 425 * time.Second, RxBytes: 32},
{when: 450 * time.Second, RxBytes: 32},
{when: 475 * time.Second, RxBytes: 32},
{when: 500 * time.Second, RxBytes: 32},
{when: 500 * time.Second, RxBytes: 92},
{when: 525 * time.Second, RxBytes: 32},
{when: 550 * time.Second, RxBytes: 32},
{when: 575 * time.Second, RxBytes: 32},
{when: 600 * time.Second, RxBytes: 32},
{when: 625 * time.Second, RxBytes: 32},
{when: 625 * time.Second, RxBytes: 92},
{when: 650 * time.Second, RxBytes: 32},
{when: 675 * time.Second, RxBytes: 32},
{when: 700 * time.Second, RxBytes: 32},
{when: 725 * time.Second, RxBytes: 32},
{when: 750 * time.Second, RxBytes: 32},
{when: 750 * time.Second, RxBytes: 92},
{when: 775 * time.Second, RxBytes: 32},
},
},
{
ExpectedInactive: true,
Data: []rxHistory{
//96
{when: 0 * time.Second, RxBytes: 32},
{when: 25 * time.Second, RxBytes: 32},
{when: 50 * time.Second, RxBytes: 32},
//212
{when: 75 * time.Second, RxBytes: 32},
{when: 100 * time.Second, RxBytes: 32},
{when: 100 * time.Second, RxBytes: 148},
//96
{when: 125 * time.Second, RxBytes: 32},
{when: 150 * time.Second, RxBytes: 32},
{when: 175 * time.Second, RxBytes: 32},
//212
{when: 200 * time.Second, RxBytes: 32},
{when: 225 * time.Second, RxBytes: 32},
{when: 225 * time.Second, RxBytes: 148},
//96
{when: 250 * time.Second, RxBytes: 32},
{when: 275 * time.Second, RxBytes: 32},
{when: 300 * time.Second, RxBytes: 32},
{when: 325 * time.Second, RxBytes: 32},
{when: 350 * time.Second, RxBytes: 32},
{when: 350 * time.Second, RxBytes: 148},
{when: 375 * time.Second, RxBytes: 32},
{when: 400 * time.Second, RxBytes: 32},
{when: 425 * time.Second, RxBytes: 32},
{when: 450 * time.Second, RxBytes: 32},
{when: 475 * time.Second, RxBytes: 32},
{when: 475 * time.Second, RxBytes: 148},
{when: 500 * time.Second, RxBytes: 32},
{when: 525 * time.Second, RxBytes: 32},
{when: 550 * time.Second, RxBytes: 32},
{when: 575 * time.Second, RxBytes: 32},
{when: 600 * time.Second, RxBytes: 32},
{when: 600 * time.Second, RxBytes: 148},
{when: 625 * time.Second, RxBytes: 32},
{when: 650 * time.Second, RxBytes: 32},
{when: 675 * time.Second, RxBytes: 32},
{when: 700 * time.Second, RxBytes: 32},
{when: 725 * time.Second, RxBytes: 32},
{when: 725 * time.Second, RxBytes: 148},
{when: 750 * time.Second, RxBytes: 32},
},
},
{
// Rosenpass
ExpectedInactive: true,
Data: []rxHistory{
{when: 0 * time.Second, RxBytes: 1200},
{when: 0 * time.Second, RxBytes: 1200},
{when: 0 * time.Second, RxBytes: 128},
{when: 0 * time.Second, RxBytes: 1200},
{when: 0 * time.Second, RxBytes: 128},
{when: 0 * time.Second, RxBytes: 2},
{when: 35 * time.Second, RxBytes: 32},
{when: 60 * time.Second, RxBytes: 32},
{when: 85 * time.Second, RxBytes: 32},
{when: 110 * time.Second, RxBytes: 32},
{when: 120 * time.Second, RxBytes: 1152},
{when: 120 * time.Second, RxBytes: 92},
{when: 120 * time.Second, RxBytes: 240},
{when: 130 * time.Second, RxBytes: 1200},
{when: 130 * time.Second, RxBytes: 32},
{when: 130 * time.Second, RxBytes: 1200},
{when: 130 * time.Second, RxBytes: 128},
{when: 165 * time.Second, RxBytes: 32},
{when: 190 * time.Second, RxBytes: 32},
{when: 215 * time.Second, RxBytes: 32},
{when: 240 * time.Second, RxBytes: 92},
{when: 240 * time.Second, RxBytes: 1200},
{when: 240 * time.Second, RxBytes: 128},
{when: 260 * time.Second, RxBytes: 1200},
{when: 260 * time.Second, RxBytes: 1200},
{when: 260 * time.Second, RxBytes: 128},
{when: 320 * time.Second, RxBytes: 32},
{when: 345 * time.Second, RxBytes: 32},
{when: 370 * time.Second, RxBytes: 92},
{when: 370 * time.Second, RxBytes: 1200},
{when: 370 * time.Second, RxBytes: 128},
{when: 390 * time.Second, RxBytes: 1200},
{when: 390 * time.Second, RxBytes: 128},
{when: 450 * time.Second, RxBytes: 32},
{when: 475 * time.Second, RxBytes: 32},
{when: 500 * time.Second, RxBytes: 92},
{when: 500 * time.Second, RxBytes: 1200},
{when: 500 * time.Second, RxBytes: 128},
{when: 520 * time.Second, RxBytes: 1200},
{when: 520 * time.Second, RxBytes: 128},
},
},
{
ExpectedInactive: true,
Data: []rxHistory{
{when: 0 * time.Second, RxBytes: 1152},
{when: 0 * time.Second, RxBytes: 1152},
{when: 0 * time.Second, RxBytes: 240},
{when: 0 * time.Second, RxBytes: 1152},
{when: 1 * time.Second, RxBytes: 240},
{when: 1 * time.Second, RxBytes: 2},
{when: 11 * time.Second, RxBytes: 32},
{when: 121 * time.Second, RxBytes: 1200},
{when: 121 * time.Second, RxBytes: 148},
{when: 121 * time.Second, RxBytes: 32},
{when: 121 * time.Second, RxBytes: 128},
{when: 131 * time.Second, RxBytes: 1152},
{when: 131 * time.Second, RxBytes: 1152},
{when: 131 * time.Second, RxBytes: 240},
{when: 141 * time.Second, RxBytes: 32},
{when: 191 * time.Second, RxBytes: 32},
{when: 241 * time.Second, RxBytes: 1152},
{when: 241 * time.Second, RxBytes: 148},
{when: 241 * time.Second, RxBytes: 32},
{when: 241 * time.Second, RxBytes: 240},
{when: 251 * time.Second, RxBytes: 32},
{when: 261 * time.Second, RxBytes: 1152},
{when: 261 * time.Second, RxBytes: 1152},
{when: 261 * time.Second, RxBytes: 240},
{when: 271 * time.Second, RxBytes: 32},
{when: 296 * time.Second, RxBytes: 32},
{when: 371 * time.Second, RxBytes: 1152},
{when: 371 * time.Second, RxBytes: 148},
{when: 371 * time.Second, RxBytes: 32},
{when: 371 * time.Second, RxBytes: 240},
{when: 381 * time.Second, RxBytes: 32},
{when: 391 * time.Second, RxBytes: 1152},
{when: 391 * time.Second, RxBytes: 240},
{when: 401 * time.Second, RxBytes: 32},
{when: 426 * time.Second, RxBytes: 32},
{when: 501 * time.Second, RxBytes: 1152},
{when: 501 * time.Second, RxBytes: 148},
{when: 501 * time.Second, RxBytes: 32},
{when: 501 * time.Second, RxBytes: 240},
{when: 511 * time.Second, RxBytes: 32},
{when: 521 * time.Second, RxBytes: 1152},
{when: 521 * time.Second, RxBytes: 240},
{when: 531 * time.Second, RxBytes: 32},
{when: 631 * time.Second, RxBytes: 1152},
{when: 631 * time.Second, RxBytes: 148},
{when: 631 * time.Second, RxBytes: 32},
{when: 631 * time.Second, RxBytes: 240},
},
},
}

View File

@@ -0,0 +1,24 @@
package inactivity
import "time"
var newTicker = func(d time.Duration) Ticker {
return &realTicker{t: time.NewTicker(d)}
}
type Ticker interface {
C() <-chan time.Time
Stop()
}
type realTicker struct {
t *time.Ticker
}
func (r *realTicker) C() <-chan time.Time {
return r.t.C
}
func (r *realTicker) Stop() {
r.t.Stop()
}

View File

@@ -53,12 +53,12 @@ type Manager struct {
managedPeersMu sync.Mutex managedPeersMu sync.Mutex
activityManager *activity.Manager activityManager *activity.Manager
inactivityMonitors map[peerid.ConnID]*inactivity.Monitor inactivityManager *inactivity.Manager
// Route HA group management // Route HA group management
peerToHAGroups map[string][]route.HAUniqueID // peer ID -> HA groups they belong to peerToHAGroups map[string][]route.HAUniqueID // peer ID -> HA groups they belong to
haGroupToPeers map[route.HAUniqueID][]string // HA group -> peer IDs in the group haGroupToPeers map[route.HAUniqueID][]string // HA group -> peer IDs in the group
routesMu sync.RWMutex // protects route mappings routesMu sync.RWMutex
onInactive chan peerid.ConnID onInactive chan peerid.ConnID
} }
@@ -67,6 +67,7 @@ type Manager struct {
// engineCtx is the context for creating peer Connection // engineCtx is the context for creating peer Connection
func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.Store, wgIface lazyconn.WGIface, connStateDispatcher *dispatcher.ConnectionDispatcher) *Manager { func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.Store, wgIface lazyconn.WGIface, connStateDispatcher *dispatcher.ConnectionDispatcher) *Manager {
log.Infof("setup lazy connection service") log.Infof("setup lazy connection service")
m := &Manager{ m := &Manager{
engineCtx: engineCtx, engineCtx: engineCtx,
peerStore: peerStore, peerStore: peerStore,
@@ -76,18 +77,9 @@ func NewManager(config Config, engineCtx context.Context, peerStore *peerstore.S
managedPeersByConnID: make(map[peerid.ConnID]*managedPeer), managedPeersByConnID: make(map[peerid.ConnID]*managedPeer),
excludes: make(map[string]lazyconn.PeerConfig), excludes: make(map[string]lazyconn.PeerConfig),
activityManager: activity.NewManager(wgIface), activityManager: activity.NewManager(wgIface),
inactivityMonitors: make(map[peerid.ConnID]*inactivity.Monitor), inactivityManager: inactivity.NewManager(wgIface, config.InactivityThreshold),
peerToHAGroups: make(map[string][]route.HAUniqueID), peerToHAGroups: make(map[string][]route.HAUniqueID),
haGroupToPeers: make(map[route.HAUniqueID][]string), haGroupToPeers: make(map[route.HAUniqueID][]string),
onInactive: make(chan peerid.ConnID),
}
if config.InactivityThreshold != nil {
if *config.InactivityThreshold >= inactivity.MinimumInactivityThreshold {
m.inactivityThreshold = *config.InactivityThreshold
} else {
log.Warnf("inactivity threshold is too low, using %v", m.inactivityThreshold)
}
} }
m.connStateListener = &dispatcher.ConnectionListener{ m.connStateListener = &dispatcher.ConnectionListener{
@@ -139,14 +131,18 @@ func (m *Manager) UpdateRouteHAMap(haMap route.HAMap) {
func (m *Manager) Start(ctx context.Context) { func (m *Manager) Start(ctx context.Context) {
defer m.close() defer m.close()
go m.inactivityManager.Start(ctx)
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return
case peerConnID := <-m.activityManager.OnActivityChan: case peerConnID := <-m.activityManager.OnActivityChan:
m.onPeerActivity(ctx, peerConnID) m.onPeerActivity(peerConnID)
case peerConnID := <-m.onInactive: case peerIDs := <-m.inactivityManager.InactivePeersChan:
m.onPeerInactivityTimedOut(peerConnID) for _, peerID := range peerIDs {
m.onPeerInactivityTimedOut(peerID)
}
} }
} }
} }
@@ -156,7 +152,7 @@ func (m *Manager) Start(ctx context.Context) {
// Adds them back to the managed list and start the inactivity listener if they are removed from the exclude list. In // Adds them back to the managed list and start the inactivity listener if they are removed from the exclude list. In
// this case, we suppose that the connection status is connected or connecting. // this case, we suppose that the connection status is connected or connecting.
// If the peer is not exists yet in the managed list then the responsibility is the upper layer to call the AddPeer function // If the peer is not exists yet in the managed list then the responsibility is the upper layer to call the AddPeer function
func (m *Manager) ExcludePeer(ctx context.Context, peerConfigs []lazyconn.PeerConfig) []string { func (m *Manager) ExcludePeer(peerConfigs []lazyconn.PeerConfig) []string {
m.managedPeersMu.Lock() m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock() defer m.managedPeersMu.Unlock()
@@ -187,7 +183,7 @@ func (m *Manager) ExcludePeer(ctx context.Context, peerConfigs []lazyconn.PeerCo
peerCfg.Log.Infof("peer removed from lazy connection exclude list") peerCfg.Log.Infof("peer removed from lazy connection exclude list")
if err := m.addActivePeer(ctx, peerCfg); err != nil { if err := m.addActivePeer(&peerCfg); err != nil {
log.Errorf("failed to add peer to lazy connection manager: %s", err) log.Errorf("failed to add peer to lazy connection manager: %s", err)
continue continue
} }
@@ -217,20 +213,24 @@ func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) {
return false, err return false, err
} }
im := inactivity.NewInactivityMonitor(peerCfg.PeerConnID, m.inactivityThreshold)
m.inactivityMonitors[peerCfg.PeerConnID] = im
m.managedPeers[peerCfg.PublicKey] = &peerCfg m.managedPeers[peerCfg.PublicKey] = &peerCfg
m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{ m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{
peerCfg: &peerCfg, peerCfg: &peerCfg,
expectedWatcher: watcherActivity, expectedWatcher: watcherActivity,
} }
// Check if this peer should be activated because its HA group peers are active
if group, ok := m.shouldActivateNewPeer(peerCfg.PublicKey); ok {
peerCfg.Log.Debugf("peer belongs to active HA group %s, will activate immediately", group)
m.activateNewPeerInActiveGroup(peerCfg)
}
return false, nil return false, nil
} }
// AddActivePeers adds a list of peers to the lazy connection manager // AddActivePeers adds a list of peers to the lazy connection manager
// suppose these peers was in connected or in connecting states // suppose these peers was in connected or in connecting states
func (m *Manager) AddActivePeers(ctx context.Context, peerCfg []lazyconn.PeerConfig) error { func (m *Manager) AddActivePeers(peerCfg []lazyconn.PeerConfig) error {
m.managedPeersMu.Lock() m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock() defer m.managedPeersMu.Unlock()
@@ -240,7 +240,7 @@ func (m *Manager) AddActivePeers(ctx context.Context, peerCfg []lazyconn.PeerCon
continue continue
} }
if err := m.addActivePeer(ctx, cfg); err != nil { if err := m.addActivePeer(&cfg); err != nil {
cfg.Log.Errorf("failed to add peer to lazy connection manager: %v", err) cfg.Log.Errorf("failed to add peer to lazy connection manager: %v", err)
return err return err
} }
@@ -257,7 +257,7 @@ func (m *Manager) RemovePeer(peerID string) {
// ActivatePeer activates a peer connection when a signal message is received // ActivatePeer activates a peer connection when a signal message is received
// Also activates all peers in the same HA groups as this peer // Also activates all peers in the same HA groups as this peer
func (m *Manager) ActivatePeer(ctx context.Context, peerID string) (found bool) { func (m *Manager) ActivatePeer(peerID string) (found bool) {
m.managedPeersMu.Lock() m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock() defer m.managedPeersMu.Unlock()
cfg, mp := m.getPeerForActivation(peerID) cfg, mp := m.getPeerForActivation(peerID)
@@ -265,15 +265,42 @@ func (m *Manager) ActivatePeer(ctx context.Context, peerID string) (found bool)
return false return false
} }
if !m.activateSinglePeer(ctx, cfg, mp) { if !m.activateSinglePeer(cfg, mp) {
return false return false
} }
m.activateHAGroupPeers(ctx, peerID) m.activateHAGroupPeers(peerID)
return true return true
} }
func (m *Manager) DeactivatePeer(peerID peerid.ConnID) {
m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock()
mp, ok := m.managedPeersByConnID[peerID]
if !ok {
return
}
if mp.expectedWatcher != watcherInactivity {
return
}
m.peerStore.PeerConnClose(mp.peerCfg.PublicKey)
mp.peerCfg.Log.Infof("start activity monitor")
mp.expectedWatcher = watcherActivity
m.inactivityManager.RemovePeer(mp.peerCfg.PublicKey)
if err := m.activityManager.MonitorPeerActivity(*mp.peerCfg); err != nil {
mp.peerCfg.Log.Errorf("failed to create activity monitor: %v", err)
return
}
}
// getPeerForActivation checks if a peer can be activated and returns the necessary structs // getPeerForActivation checks if a peer can be activated and returns the necessary structs
// Returns nil values if the peer should be skipped // Returns nil values if the peer should be skipped
func (m *Manager) getPeerForActivation(peerID string) (*lazyconn.PeerConfig, *managedPeer) { func (m *Manager) getPeerForActivation(peerID string) (*lazyconn.PeerConfig, *managedPeer) {
@@ -296,57 +323,53 @@ func (m *Manager) getPeerForActivation(peerID string) (*lazyconn.PeerConfig, *ma
} }
// activateSinglePeer activates a single peer (internal method) // activateSinglePeer activates a single peer (internal method)
func (m *Manager) activateSinglePeer(ctx context.Context, cfg *lazyconn.PeerConfig, mp *managedPeer) bool { func (m *Manager) activateSinglePeer(cfg *lazyconn.PeerConfig, mp *managedPeer) bool {
mp.expectedWatcher = watcherInactivity mp.expectedWatcher = watcherInactivity
m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID) m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID)
im, ok := m.inactivityMonitors[cfg.PeerConnID] cfg.Log.Infof("starting inactivity monitor for peer: %s", cfg.PublicKey)
if !ok { m.inactivityManager.AddPeer(cfg)
cfg.Log.Errorf("inactivity monitor not found for peer")
return false
}
cfg.Log.Infof("starting inactivity monitor")
go im.Start(ctx, m.onInactive)
return true return true
} }
// activateHAGroupPeers activates all peers in HA groups that the given peer belongs to // activateHAGroupPeers activates all peers in HA groups that the given peer belongs to
func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string) { func (m *Manager) activateHAGroupPeers(triggerPeerID string) {
var peersToActivate []string
m.routesMu.RLock() m.routesMu.RLock()
haGroups := m.peerToHAGroups[triggerPeerID] haGroups := m.peerToHAGroups[triggerPeerID]
m.routesMu.RUnlock()
if len(haGroups) == 0 { if len(haGroups) == 0 {
m.routesMu.RUnlock()
log.Debugf("peer %s is not part of any HA groups", triggerPeerID) log.Debugf("peer %s is not part of any HA groups", triggerPeerID)
return return
} }
activatedCount := 0
for _, haGroup := range haGroups { for _, haGroup := range haGroups {
m.routesMu.RLock()
peers := m.haGroupToPeers[haGroup] peers := m.haGroupToPeers[haGroup]
for _, peerID := range peers {
if peerID != triggerPeerID {
peersToActivate = append(peersToActivate, peerID)
}
}
}
m.routesMu.RUnlock() m.routesMu.RUnlock()
for _, peerID := range peers { activatedCount := 0
if peerID == triggerPeerID { for _, peerID := range peersToActivate {
continue
}
cfg, mp := m.getPeerForActivation(peerID) cfg, mp := m.getPeerForActivation(peerID)
if cfg == nil { if cfg == nil {
continue continue
} }
if m.activateSinglePeer(ctx, cfg, mp) { if m.activateSinglePeer(cfg, mp) {
activatedCount++ activatedCount++
cfg.Log.Infof("activated peer as part of HA group %s (triggered by %s)", haGroup, triggerPeerID) cfg.Log.Infof("activated peer as part of HA group (triggered by %s)", triggerPeerID)
m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey) m.peerStore.PeerConnOpen(m.engineCtx, cfg.PublicKey)
} }
} }
}
if activatedCount > 0 { if activatedCount > 0 {
log.Infof("activated %d additional peers in HA groups for peer %s (groups: %v)", log.Infof("activated %d additional peers in HA groups for peer %s (groups: %v)",
@@ -354,23 +377,64 @@ func (m *Manager) activateHAGroupPeers(ctx context.Context, triggerPeerID string
} }
} }
func (m *Manager) addActivePeer(ctx context.Context, peerCfg lazyconn.PeerConfig) error { // shouldActivateNewPeer checks if a newly added peer should be activated
// because other peers in its HA groups are already active
func (m *Manager) shouldActivateNewPeer(peerID string) (route.HAUniqueID, bool) {
m.routesMu.RLock()
defer m.routesMu.RUnlock()
haGroups := m.peerToHAGroups[peerID]
if len(haGroups) == 0 {
return "", false
}
for _, haGroup := range haGroups {
peers := m.haGroupToPeers[haGroup]
for _, groupPeerID := range peers {
if groupPeerID == peerID {
continue
}
cfg, ok := m.managedPeers[groupPeerID]
if !ok {
continue
}
if mp, ok := m.managedPeersByConnID[cfg.PeerConnID]; ok && mp.expectedWatcher == watcherInactivity {
return haGroup, true
}
}
}
return "", false
}
// activateNewPeerInActiveGroup activates a newly added peer that should be active due to HA group
func (m *Manager) activateNewPeerInActiveGroup(peerCfg lazyconn.PeerConfig) {
mp, ok := m.managedPeersByConnID[peerCfg.PeerConnID]
if !ok {
return
}
if !m.activateSinglePeer(&peerCfg, mp) {
return
}
peerCfg.Log.Infof("activated newly added peer due to active HA group peers")
m.peerStore.PeerConnOpen(m.engineCtx, peerCfg.PublicKey)
}
func (m *Manager) addActivePeer(peerCfg *lazyconn.PeerConfig) error {
if _, ok := m.managedPeers[peerCfg.PublicKey]; ok { if _, ok := m.managedPeers[peerCfg.PublicKey]; ok {
peerCfg.Log.Warnf("peer already managed") peerCfg.Log.Warnf("peer already managed")
return nil return nil
} }
im := inactivity.NewInactivityMonitor(peerCfg.PeerConnID, m.inactivityThreshold) m.managedPeers[peerCfg.PublicKey] = peerCfg
m.inactivityMonitors[peerCfg.PeerConnID] = im
m.managedPeers[peerCfg.PublicKey] = &peerCfg
m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{ m.managedPeersByConnID[peerCfg.PeerConnID] = &managedPeer{
peerCfg: &peerCfg, peerCfg: peerCfg,
expectedWatcher: watcherInactivity, expectedWatcher: watcherInactivity,
} }
peerCfg.Log.Infof("starting inactivity monitor on peer that has been removed from exclude list") m.inactivityManager.AddPeer(peerCfg)
go im.Start(ctx, m.onInactive)
return nil return nil
} }
@@ -382,12 +446,7 @@ func (m *Manager) removePeer(peerID string) {
cfg.Log.Infof("removing lazy peer") cfg.Log.Infof("removing lazy peer")
if im, ok := m.inactivityMonitors[cfg.PeerConnID]; ok { m.inactivityManager.RemovePeer(cfg.PublicKey)
im.Stop()
delete(m.inactivityMonitors, cfg.PeerConnID)
cfg.Log.Debugf("inactivity monitor stopped")
}
m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID) m.activityManager.RemovePeer(cfg.Log, cfg.PeerConnID)
delete(m.managedPeers, peerID) delete(m.managedPeers, peerID)
delete(m.managedPeersByConnID, cfg.PeerConnID) delete(m.managedPeersByConnID, cfg.PeerConnID)
@@ -399,10 +458,7 @@ func (m *Manager) close() {
m.connStateDispatcher.RemoveListener(m.connStateListener) m.connStateDispatcher.RemoveListener(m.connStateListener)
m.activityManager.Close() m.activityManager.Close()
for _, iw := range m.inactivityMonitors {
iw.Stop()
}
m.inactivityMonitors = make(map[peerid.ConnID]*inactivity.Monitor)
m.managedPeers = make(map[string]*lazyconn.PeerConfig) m.managedPeers = make(map[string]*lazyconn.PeerConfig)
m.managedPeersByConnID = make(map[peerid.ConnID]*managedPeer) m.managedPeersByConnID = make(map[peerid.ConnID]*managedPeer)
@@ -415,7 +471,49 @@ func (m *Manager) close() {
log.Infof("lazy connection manager closed") log.Infof("lazy connection manager closed")
} }
func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID) { // shouldDeferIdleForHA checks if peer should stay connected due to HA group requirements
func (m *Manager) shouldDeferIdleForHA(peerID string) bool {
m.routesMu.RLock()
defer m.routesMu.RUnlock()
haGroups := m.peerToHAGroups[peerID]
if len(haGroups) == 0 {
return false
}
for _, haGroup := range haGroups {
groupPeers := m.haGroupToPeers[haGroup]
for _, groupPeerID := range groupPeers {
if groupPeerID == peerID {
continue
}
cfg, ok := m.managedPeers[groupPeerID]
if !ok {
continue
}
groupMp, ok := m.managedPeersByConnID[cfg.PeerConnID]
if !ok {
continue
}
if groupMp.expectedWatcher != watcherInactivity {
continue
}
// Other member is still connected, defer idle
if peer, ok := m.peerStore.PeerConn(groupPeerID); ok && peer.IsConnected() {
return true
}
}
}
return false
}
func (m *Manager) onPeerActivity(peerConnID peerid.ConnID) {
m.managedPeersMu.Lock() m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock() defer m.managedPeersMu.Unlock()
@@ -432,22 +530,28 @@ func (m *Manager) onPeerActivity(ctx context.Context, peerConnID peerid.ConnID)
mp.peerCfg.Log.Infof("detected peer activity") mp.peerCfg.Log.Infof("detected peer activity")
if !m.activateSinglePeer(ctx, mp.peerCfg, mp) { if !m.activateSinglePeer(mp.peerCfg, mp) {
return return
} }
m.activateHAGroupPeers(ctx, mp.peerCfg.PublicKey) m.activateHAGroupPeers(mp.peerCfg.PublicKey)
m.peerStore.PeerConnOpen(m.engineCtx, mp.peerCfg.PublicKey) m.peerStore.PeerConnOpen(m.engineCtx, mp.peerCfg.PublicKey)
} }
func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) { func (m *Manager) onPeerInactivityTimedOut(peerID string) {
m.managedPeersMu.Lock() m.managedPeersMu.Lock()
defer m.managedPeersMu.Unlock() defer m.managedPeersMu.Unlock()
mp, ok := m.managedPeersByConnID[peerConnID] peerCfg, ok := m.managedPeers[peerID]
if !ok { if !ok {
log.Errorf("peer not found by id: %v", peerConnID) log.Errorf("peer not found by peerId: %v", peerID)
return
}
mp, ok := m.managedPeersByConnID[peerCfg.PeerConnID]
if !ok {
log.Errorf("peer not found by conn id: %v", peerCfg.PeerConnID)
return return
} }
@@ -456,6 +560,11 @@ func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) {
return return
} }
if m.shouldDeferIdleForHA(mp.peerCfg.PublicKey) {
// todo: review to how reset the inactivity detection
return
}
mp.peerCfg.Log.Infof("connection timed out") mp.peerCfg.Log.Infof("connection timed out")
// this is blocking operation, potentially can be optimized // this is blocking operation, potentially can be optimized
@@ -465,8 +574,7 @@ func (m *Manager) onPeerInactivityTimedOut(peerConnID peerid.ConnID) {
mp.expectedWatcher = watcherActivity mp.expectedWatcher = watcherActivity
// just in case free up m.inactivityManager.RemovePeer(mp.peerCfg.PublicKey)
m.inactivityMonitors[peerConnID].PauseTimer()
if err := m.activityManager.MonitorPeerActivity(*mp.peerCfg); err != nil { if err := m.activityManager.MonitorPeerActivity(*mp.peerCfg); err != nil {
mp.peerCfg.Log.Errorf("failed to create activity monitor: %v", err) mp.peerCfg.Log.Errorf("failed to create activity monitor: %v", err)
@@ -487,14 +595,8 @@ func (m *Manager) onPeerConnected(peerConnID peerid.ConnID) {
return return
} }
iw, ok := m.inactivityMonitors[mp.peerCfg.PeerConnID] mp.peerCfg.Log.Infof("peer connected, starting inactivity monitor")
if !ok { m.inactivityManager.AddPeer(mp.peerCfg)
mp.peerCfg.Log.Errorf("inactivity monitor not found for peer")
return
}
mp.peerCfg.Log.Infof("peer connected, pausing inactivity monitor while connection is not disconnected")
iw.PauseTimer()
} }
func (m *Manager) onPeerDisconnected(peerConnID peerid.ConnID) { func (m *Manager) onPeerDisconnected(peerConnID peerid.ConnID) {
@@ -510,11 +612,6 @@ func (m *Manager) onPeerDisconnected(peerConnID peerid.ConnID) {
return return
} }
iw, ok := m.inactivityMonitors[mp.peerCfg.PeerConnID] // todo reset inactivity monitor
if !ok { mp.peerCfg.Log.Warnf("--- peer disconnected, stopping inactivity monitor?")
return
}
mp.peerCfg.Log.Infof("reset inactivity monitor timer")
iw.ResetTimer()
} }

View File

@@ -1,6 +1,7 @@
package lazyconn package lazyconn
import ( import (
"github.com/netbirdio/netbird/client/iface/configurer"
"net" "net"
"net/netip" "net/netip"
"time" "time"
@@ -11,4 +12,5 @@ import (
type WGIface interface { type WGIface interface {
RemovePeer(peerKey string) error RemovePeer(peerKey string) error
UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error
GetStats() (map[string]configurer.WGStats, error)
} }

View File

@@ -226,7 +226,7 @@ func (conn *Conn) Open(engineCtx context.Context) error {
} }
// Close closes this peer Conn issuing a close event to the Conn closeCh // Close closes this peer Conn issuing a close event to the Conn closeCh
func (conn *Conn) Close() { func (conn *Conn) Close(graceful bool) {
conn.mu.Lock() conn.mu.Lock()
defer conn.wgWatcherWg.Wait() defer conn.wgWatcherWg.Wait()
defer conn.mu.Unlock() defer conn.mu.Unlock()
@@ -236,6 +236,10 @@ func (conn *Conn) Close() {
return return
} }
if graceful {
conn.signaler.SignalIdle(conn.config.Key)
}
conn.Log.Infof("close peer connection") conn.Log.Infof("close peer connection")
conn.ctxCancel() conn.ctxCancel()
@@ -317,12 +321,12 @@ func (conn *Conn) WgConfig() WgConfig {
return conn.config.WgConfig return conn.config.WgConfig
} }
// IsConnected unit tests only // IsConnected returns true if the peer is connected
// refactor unit test to use status recorder use refactor status recorded to manage connection status in peer.Conn
func (conn *Conn) IsConnected() bool { func (conn *Conn) IsConnected() bool {
conn.mu.Lock() conn.mu.Lock()
defer conn.mu.Unlock() defer conn.mu.Unlock()
return conn.currentConnPriority != conntype.None
return conn.evalStatus() == StatusConnected
} }
func (conn *Conn) GetKey() string { func (conn *Conn) GetKey() string {

View File

@@ -68,3 +68,13 @@ func (s *Signaler) signalOfferAnswer(offerAnswer OfferAnswer, remoteKey string,
return nil return nil
} }
func (s *Signaler) SignalIdle(remoteKey string) error {
return s.signal.Send(&sProto.Message{
Key: s.wgPrivateKey.PublicKey().String(),
RemoteKey: remoteKey,
Body: &sProto.Body{
Type: sProto.Body_GO_IDLE,
},
})
}

View File

@@ -95,6 +95,17 @@ func (s *Store) PeerConnOpen(ctx context.Context, pubKey string) {
} }
func (s *Store) PeerConnIdle(pubKey string) {
s.peerConnsMu.RLock()
defer s.peerConnsMu.RUnlock()
p, ok := s.peerConns[pubKey]
if !ok {
return
}
p.Close(true)
}
func (s *Store) PeerConnClose(pubKey string) { func (s *Store) PeerConnClose(pubKey string) {
s.peerConnsMu.RLock() s.peerConnsMu.RLock()
defer s.peerConnsMu.RUnlock() defer s.peerConnsMu.RUnlock()
@@ -103,7 +114,7 @@ func (s *Store) PeerConnClose(pubKey string) {
if !ok { if !ok {
return return
} }
p.Close() p.Close(false)
} }
func (s *Store) PeersPubKey() []string { func (s *Store) PeersPubKey() []string {

View File

@@ -1853,20 +1853,23 @@ func (am *DefaultAccountManager) GetOrCreateAccountByPrivateDomain(ctx context.C
} }
func (am *DefaultAccountManager) UpdateToPrimaryAccount(ctx context.Context, accountId string) (*types.Account, error) { func (am *DefaultAccountManager) UpdateToPrimaryAccount(ctx context.Context, accountId string) (*types.Account, error) {
account, err := am.Store.GetAccount(ctx, accountId) var account *types.Account
err := am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
var err error
account, err = transaction.GetAccount(ctx, accountId)
if err != nil { if err != nil {
return nil, err return err
} }
if account.IsDomainPrimaryAccount { if account.IsDomainPrimaryAccount {
return account, nil return nil
} }
existingPrimaryAccountID, err := am.Store.GetAccountIDByPrivateDomain(ctx, store.LockingStrengthShare, account.Domain) existingPrimaryAccountID, err := transaction.GetAccountIDByPrivateDomain(ctx, store.LockingStrengthShare, account.Domain)
// error is not a not found error // error is not a not found error
if handleNotFound(err) != nil { if handleNotFound(err) != nil {
return nil, err return err
} }
// a primary account already exists for this private domain // a primary account already exists for this private domain
@@ -1875,16 +1878,22 @@ func (am *DefaultAccountManager) UpdateToPrimaryAccount(ctx context.Context, acc
"accountId": accountId, "accountId": accountId,
"existingAccountId": existingPrimaryAccountID, "existingAccountId": existingPrimaryAccountID,
}).Errorf("cannot update account to primary, another account already exists as primary for the same domain") }).Errorf("cannot update account to primary, another account already exists as primary for the same domain")
return nil, status.Errorf(status.Internal, "cannot update account to primary") return status.Errorf(status.Internal, "cannot update account to primary")
} }
account.IsDomainPrimaryAccount = true account.IsDomainPrimaryAccount = true
if err := am.Store.SaveAccount(ctx, account); err != nil { if err := transaction.SaveAccount(ctx, account); err != nil {
log.WithContext(ctx).WithFields(log.Fields{ log.WithContext(ctx).WithFields(log.Fields{
"accountId": accountId, "accountId": accountId,
}).Errorf("failed to update account to primary: %v", err) }).Errorf("failed to update account to primary: %v", err)
return nil, status.Errorf(status.Internal, "failed to update account to primary") return status.Errorf(status.Internal, "failed to update account to primary")
}
return nil
})
if err != nil {
return nil, err
} }
return account, nil return account, nil

View File

@@ -426,6 +426,10 @@ components:
items: items:
type: string type: string
example: "stage-host-1" example: "stage-host-1"
ephemeral:
description: Indicates whether the peer is ephemeral or not
type: boolean
example: false
required: required:
- city_name - city_name
- connected - connected
@@ -450,6 +454,7 @@ components:
- approval_required - approval_required
- serial_number - serial_number
- extra_dns_labels - extra_dns_labels
- ephemeral
AccessiblePeer: AccessiblePeer:
allOf: allOf:
- $ref: '#/components/schemas/PeerMinimum' - $ref: '#/components/schemas/PeerMinimum'

View File

@@ -1016,6 +1016,9 @@ type Peer struct {
// DnsLabel Peer's DNS label is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's domain to the peer label. e.g. peer-dns-label.netbird.cloud // DnsLabel Peer's DNS label is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's domain to the peer label. e.g. peer-dns-label.netbird.cloud
DnsLabel string `json:"dns_label"` DnsLabel string `json:"dns_label"`
// Ephemeral Indicates whether the peer is ephemeral or not
Ephemeral bool `json:"ephemeral"`
// ExtraDnsLabels Extra DNS labels added to the peer // ExtraDnsLabels Extra DNS labels added to the peer
ExtraDnsLabels []string `json:"extra_dns_labels"` ExtraDnsLabels []string `json:"extra_dns_labels"`
@@ -1097,6 +1100,9 @@ type PeerBatch struct {
// DnsLabel Peer's DNS label is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's domain to the peer label. e.g. peer-dns-label.netbird.cloud // DnsLabel Peer's DNS label is the parsed peer name for domain resolution. It is used to form an FQDN by appending the account's domain to the peer label. e.g. peer-dns-label.netbird.cloud
DnsLabel string `json:"dns_label"` DnsLabel string `json:"dns_label"`
// Ephemeral Indicates whether the peer is ephemeral or not
Ephemeral bool `json:"ephemeral"`
// ExtraDnsLabels Extra DNS labels added to the peer // ExtraDnsLabels Extra DNS labels added to the peer
ExtraDnsLabels []string `json:"extra_dns_labels"` ExtraDnsLabels []string `json:"extra_dns_labels"`

View File

@@ -365,6 +365,7 @@ func toSinglePeerResponse(peer *nbpeer.Peer, groupsInfo []api.GroupMinimum, dnsD
CityName: peer.Location.CityName, CityName: peer.Location.CityName,
SerialNumber: peer.Meta.SystemSerialNumber, SerialNumber: peer.Meta.SystemSerialNumber,
InactivityExpirationEnabled: peer.InactivityExpirationEnabled, InactivityExpirationEnabled: peer.InactivityExpirationEnabled,
Ephemeral: peer.Ephemeral,
} }
} }

View File

@@ -37,7 +37,8 @@ func (am *DefaultAccountManager) UpdateIntegratedValidatorGroups(ctx context.Con
unlock := am.Store.AcquireWriteLockByUID(ctx, accountID) unlock := am.Store.AcquireWriteLockByUID(ctx, accountID)
defer unlock() defer unlock()
a, err := am.Store.GetAccountByUser(ctx, userID) return am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
a, err := transaction.GetAccountByUser(ctx, userID)
if err != nil { if err != nil {
return err return err
} }
@@ -51,7 +52,8 @@ func (am *DefaultAccountManager) UpdateIntegratedValidatorGroups(ctx context.Con
a.Settings.Extra = extra a.Settings.Extra = extra
} }
extra.IntegratedValidatorGroups = groups extra.IntegratedValidatorGroups = groups
return am.Store.SaveAccount(ctx, a) return transaction.SaveAccount(ctx, a)
})
} }
func (am *DefaultAccountManager) GroupValidation(ctx context.Context, accountID string, groupIDs []string) (bool, error) { func (am *DefaultAccountManager) GroupValidation(ctx context.Context, accountID string, groupIDs []string) (bool, error) {

View File

@@ -92,7 +92,7 @@ func (am *DefaultAccountManager) getUserAccessiblePeers(ctx context.Context, acc
// fetch all the peers that have access to the user's peers // fetch all the peers that have access to the user's peers
for _, peer := range peers { for _, peer := range peers {
aclPeers, _ := account.GetPeerConnectionResources(ctx, peer.ID, approvedPeersMap) aclPeers, _ := account.GetPeerConnectionResources(ctx, peer, approvedPeersMap)
for _, p := range aclPeers { for _, p := range aclPeers {
peersMap[p.ID] = p peersMap[p.ID] = p
} }
@@ -1149,7 +1149,7 @@ func (am *DefaultAccountManager) checkIfUserOwnsPeer(ctx context.Context, accoun
} }
for _, p := range userPeers { for _, p := range userPeers {
aclPeers, _ := account.GetPeerConnectionResources(ctx, p.ID, approvedPeersMap) aclPeers, _ := account.GetPeerConnectionResources(ctx, p, approvedPeersMap)
for _, aclPeer := range aclPeers { for _, aclPeer := range aclPeers {
if aclPeer.ID == peer.ID { if aclPeer.ID == peer.ID {
return peer, nil return peer, nil

View File

@@ -27,6 +27,7 @@ func TestAccount_getPeersByPolicy(t *testing.T) {
ID: "peerB", ID: "peerB",
IP: net.ParseIP("100.65.80.39"), IP: net.ParseIP("100.65.80.39"),
Status: &nbpeer.PeerStatus{}, Status: &nbpeer.PeerStatus{},
Meta: nbpeer.PeerSystemMeta{WtVersion: "0.48.0"},
}, },
"peerC": { "peerC": {
ID: "peerC", ID: "peerC",
@@ -63,6 +64,12 @@ func TestAccount_getPeersByPolicy(t *testing.T) {
IP: net.ParseIP("100.65.31.2"), IP: net.ParseIP("100.65.31.2"),
Status: &nbpeer.PeerStatus{}, Status: &nbpeer.PeerStatus{},
}, },
"peerK": {
ID: "peerK",
IP: net.ParseIP("100.32.80.1"),
Status: &nbpeer.PeerStatus{},
Meta: nbpeer.PeerSystemMeta{WtVersion: "0.30.0"},
},
}, },
Groups: map[string]*types.Group{ Groups: map[string]*types.Group{
"GroupAll": { "GroupAll": {
@@ -111,6 +118,13 @@ func TestAccount_getPeersByPolicy(t *testing.T) {
"peerI", "peerI",
}, },
}, },
"GroupWorkflow": {
ID: "GroupWorkflow",
Name: "workflow",
Peers: []string{
"peerK",
},
},
}, },
Policies: []*types.Policy{ Policies: []*types.Policy{
{ {
@@ -189,6 +203,39 @@ func TestAccount_getPeersByPolicy(t *testing.T) {
}, },
}, },
}, },
{
ID: "RuleWorkflow",
Name: "Workflow",
Description: "No description",
Enabled: true,
Rules: []*types.PolicyRule{
{
ID: "RuleWorkflow",
Name: "Workflow",
Description: "No description",
Bidirectional: true,
Enabled: true,
Protocol: types.PolicyRuleProtocolTCP,
Action: types.PolicyTrafficActionAccept,
PortRanges: []types.RulePortRange{
{
Start: 8088,
End: 8088,
},
{
Start: 9090,
End: 9095,
},
},
Sources: []string{
"GroupWorkflow",
},
Destinations: []string{
"GroupDMZ",
},
},
},
},
}, },
} }
@@ -199,14 +246,14 @@ func TestAccount_getPeersByPolicy(t *testing.T) {
t.Run("check that all peers get map", func(t *testing.T) { t.Run("check that all peers get map", func(t *testing.T) {
for _, p := range account.Peers { for _, p := range account.Peers {
peers, firewallRules := account.GetPeerConnectionResources(context.Background(), p.ID, validatedPeers) peers, firewallRules := account.GetPeerConnectionResources(context.Background(), p, validatedPeers)
assert.GreaterOrEqual(t, len(peers), 2, "minimum number peers should present") assert.GreaterOrEqual(t, len(peers), 1, "minimum number peers should present")
assert.GreaterOrEqual(t, len(firewallRules), 2, "minimum number of firewall rules should present") assert.GreaterOrEqual(t, len(firewallRules), 1, "minimum number of firewall rules should present")
} }
}) })
t.Run("check first peer map details", func(t *testing.T) { t.Run("check first peer map details", func(t *testing.T) {
peers, firewallRules := account.GetPeerConnectionResources(context.Background(), "peerB", validatedPeers) peers, firewallRules := account.GetPeerConnectionResources(context.Background(), account.Peers["peerB"], validatedPeers)
assert.Len(t, peers, 8) assert.Len(t, peers, 8)
assert.Contains(t, peers, account.Peers["peerA"]) assert.Contains(t, peers, account.Peers["peerA"])
assert.Contains(t, peers, account.Peers["peerC"]) assert.Contains(t, peers, account.Peers["peerC"])
@@ -364,6 +411,32 @@ func TestAccount_getPeersByPolicy(t *testing.T) {
assert.True(t, contains, "rule not found in expected rules %#v", rule) assert.True(t, contains, "rule not found in expected rules %#v", rule)
} }
}) })
t.Run("check port ranges support for older peers", func(t *testing.T) {
peers, firewallRules := account.GetPeerConnectionResources(context.Background(), account.Peers["peerK"], validatedPeers)
assert.Len(t, peers, 1)
assert.Contains(t, peers, account.Peers["peerI"])
expectedFirewallRules := []*types.FirewallRule{
{
PeerIP: "100.65.31.2",
Direction: types.FirewallRuleDirectionIN,
Action: "accept",
Protocol: "tcp",
Port: "8088",
PolicyID: "RuleWorkflow",
},
{
PeerIP: "100.65.31.2",
Direction: types.FirewallRuleDirectionOUT,
Action: "accept",
Protocol: "tcp",
Port: "8088",
PolicyID: "RuleWorkflow",
},
}
assert.ElementsMatch(t, firewallRules, expectedFirewallRules)
})
} }
func TestAccount_getPeersByPolicyDirect(t *testing.T) { func TestAccount_getPeersByPolicyDirect(t *testing.T) {
@@ -466,10 +539,10 @@ func TestAccount_getPeersByPolicyDirect(t *testing.T) {
} }
t.Run("check first peer map", func(t *testing.T) { t.Run("check first peer map", func(t *testing.T) {
peers, firewallRules := account.GetPeerConnectionResources(context.Background(), "peerB", approvedPeers) peers, firewallRules := account.GetPeerConnectionResources(context.Background(), account.Peers["peerB"], approvedPeers)
assert.Contains(t, peers, account.Peers["peerC"]) assert.Contains(t, peers, account.Peers["peerC"])
epectedFirewallRules := []*types.FirewallRule{ expectedFirewallRules := []*types.FirewallRule{
{ {
PeerIP: "100.65.254.139", PeerIP: "100.65.254.139",
Direction: types.FirewallRuleDirectionIN, Direction: types.FirewallRuleDirectionIN,
@@ -487,19 +560,19 @@ func TestAccount_getPeersByPolicyDirect(t *testing.T) {
PolicyID: "RuleSwarm", PolicyID: "RuleSwarm",
}, },
} }
assert.Len(t, firewallRules, len(epectedFirewallRules)) assert.Len(t, firewallRules, len(expectedFirewallRules))
slices.SortFunc(epectedFirewallRules, sortFunc()) slices.SortFunc(expectedFirewallRules, sortFunc())
slices.SortFunc(firewallRules, sortFunc()) slices.SortFunc(firewallRules, sortFunc())
for i := range firewallRules { for i := range firewallRules {
assert.Equal(t, epectedFirewallRules[i], firewallRules[i]) assert.Equal(t, expectedFirewallRules[i], firewallRules[i])
} }
}) })
t.Run("check second peer map", func(t *testing.T) { t.Run("check second peer map", func(t *testing.T) {
peers, firewallRules := account.GetPeerConnectionResources(context.Background(), "peerC", approvedPeers) peers, firewallRules := account.GetPeerConnectionResources(context.Background(), account.Peers["peerC"], approvedPeers)
assert.Contains(t, peers, account.Peers["peerB"]) assert.Contains(t, peers, account.Peers["peerB"])
epectedFirewallRules := []*types.FirewallRule{ expectedFirewallRules := []*types.FirewallRule{
{ {
PeerIP: "100.65.80.39", PeerIP: "100.65.80.39",
Direction: types.FirewallRuleDirectionIN, Direction: types.FirewallRuleDirectionIN,
@@ -517,21 +590,21 @@ func TestAccount_getPeersByPolicyDirect(t *testing.T) {
PolicyID: "RuleSwarm", PolicyID: "RuleSwarm",
}, },
} }
assert.Len(t, firewallRules, len(epectedFirewallRules)) assert.Len(t, firewallRules, len(expectedFirewallRules))
slices.SortFunc(epectedFirewallRules, sortFunc()) slices.SortFunc(expectedFirewallRules, sortFunc())
slices.SortFunc(firewallRules, sortFunc()) slices.SortFunc(firewallRules, sortFunc())
for i := range firewallRules { for i := range firewallRules {
assert.Equal(t, epectedFirewallRules[i], firewallRules[i]) assert.Equal(t, expectedFirewallRules[i], firewallRules[i])
} }
}) })
account.Policies[1].Rules[0].Bidirectional = false account.Policies[1].Rules[0].Bidirectional = false
t.Run("check first peer map directional only", func(t *testing.T) { t.Run("check first peer map directional only", func(t *testing.T) {
peers, firewallRules := account.GetPeerConnectionResources(context.Background(), "peerB", approvedPeers) peers, firewallRules := account.GetPeerConnectionResources(context.Background(), account.Peers["peerB"], approvedPeers)
assert.Contains(t, peers, account.Peers["peerC"]) assert.Contains(t, peers, account.Peers["peerC"])
epectedFirewallRules := []*types.FirewallRule{ expectedFirewallRules := []*types.FirewallRule{
{ {
PeerIP: "100.65.254.139", PeerIP: "100.65.254.139",
Direction: types.FirewallRuleDirectionOUT, Direction: types.FirewallRuleDirectionOUT,
@@ -541,19 +614,19 @@ func TestAccount_getPeersByPolicyDirect(t *testing.T) {
PolicyID: "RuleSwarm", PolicyID: "RuleSwarm",
}, },
} }
assert.Len(t, firewallRules, len(epectedFirewallRules)) assert.Len(t, firewallRules, len(expectedFirewallRules))
slices.SortFunc(epectedFirewallRules, sortFunc()) slices.SortFunc(expectedFirewallRules, sortFunc())
slices.SortFunc(firewallRules, sortFunc()) slices.SortFunc(firewallRules, sortFunc())
for i := range firewallRules { for i := range firewallRules {
assert.Equal(t, epectedFirewallRules[i], firewallRules[i]) assert.Equal(t, expectedFirewallRules[i], firewallRules[i])
} }
}) })
t.Run("check second peer map directional only", func(t *testing.T) { t.Run("check second peer map directional only", func(t *testing.T) {
peers, firewallRules := account.GetPeerConnectionResources(context.Background(), "peerC", approvedPeers) peers, firewallRules := account.GetPeerConnectionResources(context.Background(), account.Peers["peerC"], approvedPeers)
assert.Contains(t, peers, account.Peers["peerB"]) assert.Contains(t, peers, account.Peers["peerB"])
epectedFirewallRules := []*types.FirewallRule{ expectedFirewallRules := []*types.FirewallRule{
{ {
PeerIP: "100.65.80.39", PeerIP: "100.65.80.39",
Direction: types.FirewallRuleDirectionIN, Direction: types.FirewallRuleDirectionIN,
@@ -563,11 +636,11 @@ func TestAccount_getPeersByPolicyDirect(t *testing.T) {
PolicyID: "RuleSwarm", PolicyID: "RuleSwarm",
}, },
} }
assert.Len(t, firewallRules, len(epectedFirewallRules)) assert.Len(t, firewallRules, len(expectedFirewallRules))
slices.SortFunc(epectedFirewallRules, sortFunc()) slices.SortFunc(expectedFirewallRules, sortFunc())
slices.SortFunc(firewallRules, sortFunc()) slices.SortFunc(firewallRules, sortFunc())
for i := range firewallRules { for i := range firewallRules {
assert.Equal(t, epectedFirewallRules[i], firewallRules[i]) assert.Equal(t, expectedFirewallRules[i], firewallRules[i])
} }
}) })
} }
@@ -748,7 +821,7 @@ func TestAccount_getPeersByPolicyPostureChecks(t *testing.T) {
t.Run("verify peer's network map with default group peer list", func(t *testing.T) { t.Run("verify peer's network map with default group peer list", func(t *testing.T) {
// peerB doesn't fulfill the NB posture check but is included in the destination group Swarm, // peerB doesn't fulfill the NB posture check but is included in the destination group Swarm,
// will establish a connection with all source peers satisfying the NB posture check. // will establish a connection with all source peers satisfying the NB posture check.
peers, firewallRules := account.GetPeerConnectionResources(context.Background(), "peerB", approvedPeers) peers, firewallRules := account.GetPeerConnectionResources(context.Background(), account.Peers["peerB"], approvedPeers)
assert.Len(t, peers, 4) assert.Len(t, peers, 4)
assert.Len(t, firewallRules, 4) assert.Len(t, firewallRules, 4)
assert.Contains(t, peers, account.Peers["peerA"]) assert.Contains(t, peers, account.Peers["peerA"])
@@ -758,7 +831,7 @@ func TestAccount_getPeersByPolicyPostureChecks(t *testing.T) {
// peerC satisfy the NB posture check, should establish connection to all destination group peer's // peerC satisfy the NB posture check, should establish connection to all destination group peer's
// We expect a single permissive firewall rule which all outgoing connections // We expect a single permissive firewall rule which all outgoing connections
peers, firewallRules = account.GetPeerConnectionResources(context.Background(), "peerC", approvedPeers) peers, firewallRules = account.GetPeerConnectionResources(context.Background(), account.Peers["peerC"], approvedPeers)
assert.Len(t, peers, len(account.Groups["GroupSwarm"].Peers)) assert.Len(t, peers, len(account.Groups["GroupSwarm"].Peers))
assert.Len(t, firewallRules, 1) assert.Len(t, firewallRules, 1)
expectedFirewallRules := []*types.FirewallRule{ expectedFirewallRules := []*types.FirewallRule{
@@ -775,7 +848,7 @@ func TestAccount_getPeersByPolicyPostureChecks(t *testing.T) {
// peerE doesn't fulfill the NB posture check and exists in only destination group Swarm, // peerE doesn't fulfill the NB posture check and exists in only destination group Swarm,
// all source group peers satisfying the NB posture check should establish connection // all source group peers satisfying the NB posture check should establish connection
peers, firewallRules = account.GetPeerConnectionResources(context.Background(), "peerE", approvedPeers) peers, firewallRules = account.GetPeerConnectionResources(context.Background(), account.Peers["peerE"], approvedPeers)
assert.Len(t, peers, 4) assert.Len(t, peers, 4)
assert.Len(t, firewallRules, 4) assert.Len(t, firewallRules, 4)
assert.Contains(t, peers, account.Peers["peerA"]) assert.Contains(t, peers, account.Peers["peerA"])
@@ -785,7 +858,7 @@ func TestAccount_getPeersByPolicyPostureChecks(t *testing.T) {
// peerI doesn't fulfill the OS version posture check and exists in only destination group Swarm, // peerI doesn't fulfill the OS version posture check and exists in only destination group Swarm,
// all source group peers satisfying the NB posture check should establish connection // all source group peers satisfying the NB posture check should establish connection
peers, firewallRules = account.GetPeerConnectionResources(context.Background(), "peerI", approvedPeers) peers, firewallRules = account.GetPeerConnectionResources(context.Background(), account.Peers["peerI"], approvedPeers)
assert.Len(t, peers, 4) assert.Len(t, peers, 4)
assert.Len(t, firewallRules, 4) assert.Len(t, firewallRules, 4)
assert.Contains(t, peers, account.Peers["peerA"]) assert.Contains(t, peers, account.Peers["peerA"])
@@ -800,19 +873,19 @@ func TestAccount_getPeersByPolicyPostureChecks(t *testing.T) {
// peerB doesn't satisfy the NB posture check, and doesn't exist in destination group peer's // peerB doesn't satisfy the NB posture check, and doesn't exist in destination group peer's
// no connection should be established to any peer of destination group // no connection should be established to any peer of destination group
peers, firewallRules := account.GetPeerConnectionResources(context.Background(), "peerB", approvedPeers) peers, firewallRules := account.GetPeerConnectionResources(context.Background(), account.Peers["peerB"], approvedPeers)
assert.Len(t, peers, 0) assert.Len(t, peers, 0)
assert.Len(t, firewallRules, 0) assert.Len(t, firewallRules, 0)
// peerI doesn't satisfy the OS version posture check, and doesn't exist in destination group peer's // peerI doesn't satisfy the OS version posture check, and doesn't exist in destination group peer's
// no connection should be established to any peer of destination group // no connection should be established to any peer of destination group
peers, firewallRules = account.GetPeerConnectionResources(context.Background(), "peerI", approvedPeers) peers, firewallRules = account.GetPeerConnectionResources(context.Background(), account.Peers["peerI"], approvedPeers)
assert.Len(t, peers, 0) assert.Len(t, peers, 0)
assert.Len(t, firewallRules, 0) assert.Len(t, firewallRules, 0)
// peerC satisfy the NB posture check, should establish connection to all destination group peer's // peerC satisfy the NB posture check, should establish connection to all destination group peer's
// We expect a single permissive firewall rule which all outgoing connections // We expect a single permissive firewall rule which all outgoing connections
peers, firewallRules = account.GetPeerConnectionResources(context.Background(), "peerC", approvedPeers) peers, firewallRules = account.GetPeerConnectionResources(context.Background(), account.Peers["peerC"], approvedPeers)
assert.Len(t, peers, len(account.Groups["GroupSwarm"].Peers)) assert.Len(t, peers, len(account.Groups["GroupSwarm"].Peers))
assert.Len(t, firewallRules, len(account.Groups["GroupSwarm"].Peers)) assert.Len(t, firewallRules, len(account.Groups["GroupSwarm"].Peers))
@@ -827,14 +900,14 @@ func TestAccount_getPeersByPolicyPostureChecks(t *testing.T) {
// peerE doesn't fulfill the NB posture check and exists in only destination group Swarm, // peerE doesn't fulfill the NB posture check and exists in only destination group Swarm,
// all source group peers satisfying the NB posture check should establish connection // all source group peers satisfying the NB posture check should establish connection
peers, firewallRules = account.GetPeerConnectionResources(context.Background(), "peerE", approvedPeers) peers, firewallRules = account.GetPeerConnectionResources(context.Background(), account.Peers["peerE"], approvedPeers)
assert.Len(t, peers, 3) assert.Len(t, peers, 3)
assert.Len(t, firewallRules, 3) assert.Len(t, firewallRules, 3)
assert.Contains(t, peers, account.Peers["peerA"]) assert.Contains(t, peers, account.Peers["peerA"])
assert.Contains(t, peers, account.Peers["peerC"]) assert.Contains(t, peers, account.Peers["peerC"])
assert.Contains(t, peers, account.Peers["peerD"]) assert.Contains(t, peers, account.Peers["peerD"])
peers, firewallRules = account.GetPeerConnectionResources(context.Background(), "peerA", approvedPeers) peers, firewallRules = account.GetPeerConnectionResources(context.Background(), account.Peers["peerA"], approvedPeers)
assert.Len(t, peers, 5) assert.Len(t, peers, 5)
// assert peers from Group Swarm // assert peers from Group Swarm
assert.Contains(t, peers, account.Peers["peerD"]) assert.Contains(t, peers, account.Peers["peerD"])

View File

@@ -24,20 +24,12 @@ func sanitizeVersion(version string) string {
} }
func (n *NBVersionCheck) Check(ctx context.Context, peer nbpeer.Peer) (bool, error) { func (n *NBVersionCheck) Check(ctx context.Context, peer nbpeer.Peer) (bool, error) {
peerVersion := sanitizeVersion(peer.Meta.WtVersion) meetsMin, err := MeetsMinVersion(n.MinVersion, peer.Meta.WtVersion)
minVersion := sanitizeVersion(n.MinVersion)
peerNBVersion, err := version.NewVersion(peerVersion)
if err != nil { if err != nil {
return false, err return false, err
} }
constraints, err := version.NewConstraint(">= " + minVersion) if meetsMin {
if err != nil {
return false, err
}
if constraints.Check(peerNBVersion) {
return true, nil return true, nil
} }
@@ -60,3 +52,21 @@ func (n *NBVersionCheck) Validate() error {
} }
return nil return nil
} }
// MeetsMinVersion checks if the peer's version meets or exceeds the minimum required version
func MeetsMinVersion(minVer, peerVer string) (bool, error) {
peerVer = sanitizeVersion(peerVer)
minVer = sanitizeVersion(minVer)
peerNBVer, err := version.NewVersion(peerVer)
if err != nil {
return false, err
}
constraints, err := version.NewConstraint(">= " + minVer)
if err != nil {
return false, err
}
return constraints.Check(peerNBVer), nil
}

View File

@@ -139,3 +139,68 @@ func TestNBVersionCheck_Validate(t *testing.T) {
}) })
} }
} }
func TestMeetsMinVersion(t *testing.T) {
tests := []struct {
name string
minVer string
peerVer string
want bool
wantErr bool
}{
{
name: "Peer version greater than min version",
minVer: "0.26.0",
peerVer: "0.60.1",
want: true,
wantErr: false,
},
{
name: "Peer version equals min version",
minVer: "1.0.0",
peerVer: "1.0.0",
want: true,
wantErr: false,
},
{
name: "Peer version less than min version",
minVer: "1.0.0",
peerVer: "0.9.9",
want: false,
wantErr: false,
},
{
name: "Peer version with pre-release tag greater than min version",
minVer: "1.0.0",
peerVer: "1.0.1-alpha",
want: true,
wantErr: false,
},
{
name: "Invalid peer version format",
minVer: "1.0.0",
peerVer: "dev",
want: false,
wantErr: true,
},
{
name: "Invalid min version format",
minVer: "invalid.version",
peerVer: "1.0.0",
want: false,
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := MeetsMinVersion(tt.minVer, tt.peerVer)
if tt.wantErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
assert.Equal(t, tt.want, got)
})
}
}

View File

@@ -36,6 +36,9 @@ const (
PublicCategory = "public" PublicCategory = "public"
PrivateCategory = "private" PrivateCategory = "private"
UnknownCategory = "unknown" UnknownCategory = "unknown"
// firewallRuleMinPortRangesVer defines the minimum peer version that supports port range rules.
firewallRuleMinPortRangesVer = "0.48.0"
) )
type LookupMap map[string]struct{} type LookupMap map[string]struct{}
@@ -248,7 +251,7 @@ func (a *Account) GetPeerNetworkMap(
} }
} }
aclPeers, firewallRules := a.GetPeerConnectionResources(ctx, peerID, validatedPeersMap) aclPeers, firewallRules := a.GetPeerConnectionResources(ctx, peer, validatedPeersMap)
// exclude expired peers // exclude expired peers
var peersToConnect []*nbpeer.Peer var peersToConnect []*nbpeer.Peer
var expiredPeers []*nbpeer.Peer var expiredPeers []*nbpeer.Peer
@@ -961,8 +964,9 @@ func (a *Account) UserGroupsRemoveFromPeers(userID string, groups ...string) map
// GetPeerConnectionResources for a given peer // GetPeerConnectionResources for a given peer
// //
// This function returns the list of peers and firewall rules that are applicable to a given peer. // This function returns the list of peers and firewall rules that are applicable to a given peer.
func (a *Account) GetPeerConnectionResources(ctx context.Context, peerID string, validatedPeersMap map[string]struct{}) ([]*nbpeer.Peer, []*FirewallRule) { func (a *Account) GetPeerConnectionResources(ctx context.Context, peer *nbpeer.Peer, validatedPeersMap map[string]struct{}) ([]*nbpeer.Peer, []*FirewallRule) {
generateResources, getAccumulatedResources := a.connResourcesGenerator(ctx) generateResources, getAccumulatedResources := a.connResourcesGenerator(ctx, peer)
for _, policy := range a.Policies { for _, policy := range a.Policies {
if !policy.Enabled { if !policy.Enabled {
continue continue
@@ -973,8 +977,8 @@ func (a *Account) GetPeerConnectionResources(ctx context.Context, peerID string,
continue continue
} }
sourcePeers, peerInSources := a.getAllPeersFromGroups(ctx, rule.Sources, peerID, policy.SourcePostureChecks, validatedPeersMap) sourcePeers, peerInSources := a.getAllPeersFromGroups(ctx, rule.Sources, peer.ID, policy.SourcePostureChecks, validatedPeersMap)
destinationPeers, peerInDestinations := a.getAllPeersFromGroups(ctx, rule.Destinations, peerID, nil, validatedPeersMap) destinationPeers, peerInDestinations := a.getAllPeersFromGroups(ctx, rule.Destinations, peer.ID, nil, validatedPeersMap)
if rule.Bidirectional { if rule.Bidirectional {
if peerInSources { if peerInSources {
@@ -1003,7 +1007,7 @@ func (a *Account) GetPeerConnectionResources(ctx context.Context, peerID string,
// The generator function is used to generate the list of peers and firewall rules that are applicable to a given peer. // The generator function is used to generate the list of peers and firewall rules that are applicable to a given peer.
// It safe to call the generator function multiple times for same peer and different rules no duplicates will be // It safe to call the generator function multiple times for same peer and different rules no duplicates will be
// generated. The accumulator function returns the result of all the generator calls. // generated. The accumulator function returns the result of all the generator calls.
func (a *Account) connResourcesGenerator(ctx context.Context) (func(*PolicyRule, []*nbpeer.Peer, int), func() ([]*nbpeer.Peer, []*FirewallRule)) { func (a *Account) connResourcesGenerator(ctx context.Context, targetPeer *nbpeer.Peer) (func(*PolicyRule, []*nbpeer.Peer, int), func() ([]*nbpeer.Peer, []*FirewallRule)) {
rulesExists := make(map[string]struct{}) rulesExists := make(map[string]struct{})
peersExists := make(map[string]struct{}) peersExists := make(map[string]struct{})
rules := make([]*FirewallRule, 0) rules := make([]*FirewallRule, 0)
@@ -1051,17 +1055,7 @@ func (a *Account) connResourcesGenerator(ctx context.Context) (func(*PolicyRule,
continue continue
} }
for _, port := range rule.Ports { rules = append(rules, expandPortsAndRanges(fr, rule, targetPeer)...)
pr := fr // clone rule and add set new port
pr.Port = port
rules = append(rules, &pr)
}
for _, portRange := range rule.PortRanges {
pr := fr
pr.PortRange = portRange
rules = append(rules, &pr)
}
} }
}, func() ([]*nbpeer.Peer, []*FirewallRule) { }, func() ([]*nbpeer.Peer, []*FirewallRule) {
return peers, rules return peers, rules
@@ -1590,3 +1584,45 @@ func (a *Account) AddAllGroup() error {
} }
return nil return nil
} }
// expandPortsAndRanges expands Ports and PortRanges of a rule into individual firewall rules
func expandPortsAndRanges(base FirewallRule, rule *PolicyRule, peer *nbpeer.Peer) []*FirewallRule {
var expanded []*FirewallRule
if len(rule.Ports) > 0 {
for _, port := range rule.Ports {
fr := base
fr.Port = port
expanded = append(expanded, &fr)
}
return expanded
}
supportPortRanges := peerSupportsPortRanges(peer.Meta.WtVersion)
for _, portRange := range rule.PortRanges {
fr := base
if supportPortRanges {
fr.PortRange = portRange
} else {
// Peer doesn't support port ranges, only allow single-port ranges
if portRange.Start != portRange.End {
continue
}
fr.Port = strconv.FormatUint(uint64(portRange.Start), 10)
}
expanded = append(expanded, &fr)
}
return expanded
}
// peerSupportsPortRanges checks if the peer version supports port ranges.
func peerSupportsPortRanges(peerVer string) bool {
if strings.Contains(peerVer, "dev") {
return true
}
meetMinVer, err := posture.MeetsMinVersion(firewallRuleMinPortRangesVer, peerVer)
return err == nil && meetMinVer
}

View File

@@ -76,7 +76,6 @@ func generateRouteFirewallRules(ctx context.Context, route *nbroute.Route, rule
rules = append(rules, generateRulesWithPortRanges(baseRule, rule, rulesExists)...) rules = append(rules, generateRulesWithPortRanges(baseRule, rule, rulesExists)...)
} else { } else {
rules = append(rules, generateRulesWithPorts(ctx, baseRule, rule, rulesExists)...) rules = append(rules, generateRulesWithPorts(ctx, baseRule, rule, rulesExists)...)
} }
// TODO: generate IPv6 rules for dynamic routes // TODO: generate IPv6 rules for dynamic routes

View File

@@ -29,6 +29,7 @@ const (
Body_ANSWER Body_Type = 1 Body_ANSWER Body_Type = 1
Body_CANDIDATE Body_Type = 2 Body_CANDIDATE Body_Type = 2
Body_MODE Body_Type = 4 Body_MODE Body_Type = 4
Body_GO_IDLE Body_Type = 5
) )
// Enum value maps for Body_Type. // Enum value maps for Body_Type.
@@ -38,12 +39,14 @@ var (
1: "ANSWER", 1: "ANSWER",
2: "CANDIDATE", 2: "CANDIDATE",
4: "MODE", 4: "MODE",
5: "GO_IDLE",
} }
Body_Type_value = map[string]int32{ Body_Type_value = map[string]int32{
"OFFER": 0, "OFFER": 0,
"ANSWER": 1, "ANSWER": 1,
"CANDIDATE": 2, "CANDIDATE": 2,
"MODE": 4, "MODE": 4,
"GO_IDLE": 5,
} }
) )
@@ -225,7 +228,7 @@ type Body struct {
FeaturesSupported []uint32 `protobuf:"varint,6,rep,packed,name=featuresSupported,proto3" json:"featuresSupported,omitempty"` FeaturesSupported []uint32 `protobuf:"varint,6,rep,packed,name=featuresSupported,proto3" json:"featuresSupported,omitempty"`
// RosenpassConfig is a Rosenpass config of the remote peer our peer tries to connect to // RosenpassConfig is a Rosenpass config of the remote peer our peer tries to connect to
RosenpassConfig *RosenpassConfig `protobuf:"bytes,7,opt,name=rosenpassConfig,proto3" json:"rosenpassConfig,omitempty"` RosenpassConfig *RosenpassConfig `protobuf:"bytes,7,opt,name=rosenpassConfig,proto3" json:"rosenpassConfig,omitempty"`
// relayServerAddress is an IP:port of the relay server // relayServerAddress is url of the relay server
RelayServerAddress string `protobuf:"bytes,8,opt,name=relayServerAddress,proto3" json:"relayServerAddress,omitempty"` RelayServerAddress string `protobuf:"bytes,8,opt,name=relayServerAddress,proto3" json:"relayServerAddress,omitempty"`
} }
@@ -440,7 +443,7 @@ var file_signalexchange_proto_rawDesc = []byte{
0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x04, 0x62, 0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x04, 0x62,
0x6f, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x6f, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e,
0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x52, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x52,
0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xa6, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xb3, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d,
0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x73, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x73,
0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f,
0x64, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x64, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a,
@@ -463,33 +466,34 @@ var file_signalexchange_proto_rawDesc = []byte{
0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x2e, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x2e, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53,
0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x08, 0x20, 0x01, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x08, 0x20, 0x01,
0x28, 0x09, 0x52, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x28, 0x09, 0x52, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41,
0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x36, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09,
0x0a, 0x05, 0x4f, 0x46, 0x46, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x4e, 0x53, 0x0a, 0x05, 0x4f, 0x46, 0x46, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x4e, 0x53,
0x57, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x44, 0x49, 0x44, 0x41, 0x57, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x44, 0x49, 0x44, 0x41,
0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10, 0x04, 0x22, 0x2e, 0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10, 0x04, 0x12, 0x0b,
0x0a, 0x04, 0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x22, 0x2e, 0x0a, 0x04, 0x4d,
0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01, 0x20,
0x88, 0x01, 0x01, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01, 0x01,
0x0a, 0x0f, 0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f, 0x52,
0x67, 0x12, 0x28, 0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x28,
0x62, 0x4b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65,
0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61,
0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73, 0x65,
0x64, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x18,
0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73,
0x0a, 0x0e, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01, 0x0a, 0x0e, 0x53,
0x12, 0x4c, 0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c, 0x0a,
0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78,
0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64,
0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c,
0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74,
0x0a, 0x0d, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x0d, 0x43,
0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x20, 0x2e, 0x73,
0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e,
0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20,
0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e,
0x61, 0x67, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
} }
var ( var (

View File

@@ -47,6 +47,7 @@ message Body {
ANSWER = 1; ANSWER = 1;
CANDIDATE = 2; CANDIDATE = 2;
MODE = 4; MODE = 4;
GO_IDLE = 5;
} }
Type type = 1; Type type = 1;
string payload = 2; string payload = 2;