mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 08:16:39 +00:00
Compare commits
40 Commits
v0.64.2
...
feature/la
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
35db89a40a | ||
|
|
7e422b1d76 | ||
|
|
e7098d7b77 | ||
|
|
6107b394a0 | ||
|
|
231a481c0f | ||
|
|
f50b5d3ec2 | ||
|
|
d026a62076 | ||
|
|
332b2c5a88 | ||
|
|
bfe71a3f5a | ||
|
|
ab7463d7ac | ||
|
|
dddb4bcf7e | ||
|
|
f09d86151b | ||
|
|
a3e7604661 | ||
|
|
0f1f023c2b | ||
|
|
04f18bfbdb | ||
|
|
a89a6c00de | ||
|
|
3ff3bbe782 | ||
|
|
b38ddd8479 | ||
|
|
f8a4cfb611 | ||
|
|
4c39ba3ffb | ||
|
|
feb8355cdf | ||
|
|
4e33582aaa | ||
|
|
4a5edc1374 | ||
|
|
d0d37babe2 | ||
|
|
aa3cd20214 | ||
|
|
4f668fdf5f | ||
|
|
fcca194c8d | ||
|
|
a27227ff6d | ||
|
|
6ad66fe3cd | ||
|
|
9188fcabaf | ||
|
|
884d10cceb | ||
|
|
e6e070b2e5 | ||
|
|
f898904fa8 | ||
|
|
e2e1458878 | ||
|
|
309e825d58 | ||
|
|
8542674a83 | ||
|
|
0457251d09 | ||
|
|
d6e185086f | ||
|
|
c8b031e819 | ||
|
|
db278dba14 |
@@ -201,14 +201,30 @@ func (c *KernelConfigurer) configure(config wgtypes.Config) error {
|
|||||||
func (c *KernelConfigurer) Close() {
|
func (c *KernelConfigurer) Close() {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *KernelConfigurer) GetStats(peerKey string) (WGStats, error) {
|
func (c *KernelConfigurer) GetStats() (map[string]WGStats, error) {
|
||||||
peer, err := c.getPeer(c.deviceName, peerKey)
|
stats := make(map[string]WGStats)
|
||||||
|
wg, err := wgctrl.New()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return WGStats{}, fmt.Errorf("get wireguard stats: %w", err)
|
return nil, fmt.Errorf("wgctl: %w", err)
|
||||||
}
|
}
|
||||||
return WGStats{
|
defer func() {
|
||||||
LastHandshake: peer.LastHandshakeTime,
|
err = wg.Close()
|
||||||
TxBytes: peer.TransmitBytes,
|
if err != nil {
|
||||||
RxBytes: peer.ReceiveBytes,
|
log.Errorf("Got error while closing wgctl: %v", err)
|
||||||
}, nil
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
wgDevice, err := wg.Device(c.deviceName)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("get device %s: %w", c.deviceName, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, peer := range wgDevice.Peers {
|
||||||
|
stats[peer.PublicKey.String()] = WGStats{
|
||||||
|
LastHandshake: peer.LastHandshakeTime,
|
||||||
|
TxBytes: peer.TransmitBytes,
|
||||||
|
RxBytes: peer.ReceiveBytes,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return stats, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package configurer
|
package configurer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/base64"
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
@@ -17,6 +18,13 @@ import (
|
|||||||
nbnet "github.com/netbirdio/netbird/util/net"
|
nbnet "github.com/netbirdio/netbird/util/net"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
ipcKeyLastHandshakeTimeSec = "last_handshake_time_sec"
|
||||||
|
ipcKeyLastHandshakeTimeNsec = "last_handshake_time_nsec"
|
||||||
|
ipcKeyTxBytes = "tx_bytes"
|
||||||
|
ipcKeyRxBytes = "rx_bytes"
|
||||||
|
)
|
||||||
|
|
||||||
var ErrAllowedIPNotFound = fmt.Errorf("allowed IP not found")
|
var ErrAllowedIPNotFound = fmt.Errorf("allowed IP not found")
|
||||||
|
|
||||||
type WGUSPConfigurer struct {
|
type WGUSPConfigurer struct {
|
||||||
@@ -217,91 +225,75 @@ func (t *WGUSPConfigurer) Close() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *WGUSPConfigurer) GetStats(peerKey string) (WGStats, error) {
|
func (t *WGUSPConfigurer) GetStats() (map[string]WGStats, error) {
|
||||||
ipc, err := t.device.IpcGet()
|
ipc, err := t.device.IpcGet()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return WGStats{}, fmt.Errorf("ipc get: %w", err)
|
return nil, fmt.Errorf("ipc get: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
stats, err := findPeerInfo(ipc, peerKey, []string{
|
return parseTransfers(ipc)
|
||||||
"last_handshake_time_sec",
|
|
||||||
"last_handshake_time_nsec",
|
|
||||||
"tx_bytes",
|
|
||||||
"rx_bytes",
|
|
||||||
})
|
|
||||||
if err != nil {
|
|
||||||
return WGStats{}, fmt.Errorf("find peer info: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
sec, err := strconv.ParseInt(stats["last_handshake_time_sec"], 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return WGStats{}, fmt.Errorf("parse handshake sec: %w", err)
|
|
||||||
}
|
|
||||||
nsec, err := strconv.ParseInt(stats["last_handshake_time_nsec"], 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return WGStats{}, fmt.Errorf("parse handshake nsec: %w", err)
|
|
||||||
}
|
|
||||||
txBytes, err := strconv.ParseInt(stats["tx_bytes"], 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return WGStats{}, fmt.Errorf("parse tx_bytes: %w", err)
|
|
||||||
}
|
|
||||||
rxBytes, err := strconv.ParseInt(stats["rx_bytes"], 10, 64)
|
|
||||||
if err != nil {
|
|
||||||
return WGStats{}, fmt.Errorf("parse rx_bytes: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return WGStats{
|
|
||||||
LastHandshake: time.Unix(sec, nsec),
|
|
||||||
TxBytes: txBytes,
|
|
||||||
RxBytes: rxBytes,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func findPeerInfo(ipcInput string, peerKey string, searchConfigKeys []string) (map[string]string, error) {
|
func parseTransfers(ipc string) (map[string]WGStats, error) {
|
||||||
peerKeyParsed, err := wgtypes.ParseKey(peerKey)
|
stats := make(map[string]WGStats)
|
||||||
if err != nil {
|
var (
|
||||||
return nil, fmt.Errorf("parse key: %w", err)
|
currentKey string
|
||||||
}
|
currentStats WGStats
|
||||||
|
hasPeer bool
|
||||||
hexKey := hex.EncodeToString(peerKeyParsed[:])
|
)
|
||||||
|
lines := strings.Split(ipc, "\n")
|
||||||
lines := strings.Split(ipcInput, "\n")
|
|
||||||
|
|
||||||
configFound := map[string]string{}
|
|
||||||
foundPeer := false
|
|
||||||
for _, line := range lines {
|
for _, line := range lines {
|
||||||
line = strings.TrimSpace(line)
|
line = strings.TrimSpace(line)
|
||||||
|
|
||||||
// If we're within the details of the found peer and encounter another public key,
|
// If we're within the details of the found peer and encounter another public key,
|
||||||
// this means we're starting another peer's details. So, stop.
|
// this means we're starting another peer's details. So, stop.
|
||||||
if strings.HasPrefix(line, "public_key=") && foundPeer {
|
if strings.HasPrefix(line, "public_key=") {
|
||||||
break
|
peerID := strings.TrimPrefix(line, "public_key=")
|
||||||
}
|
h, err := hex.DecodeString(peerID)
|
||||||
|
if err != nil {
|
||||||
// Identify the peer with the specific public key
|
return nil, fmt.Errorf("decode peerID: %w", err)
|
||||||
if line == fmt.Sprintf("public_key=%s", hexKey) {
|
|
||||||
foundPeer = true
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, key := range searchConfigKeys {
|
|
||||||
if foundPeer && strings.HasPrefix(line, key+"=") {
|
|
||||||
v := strings.SplitN(line, "=", 2)
|
|
||||||
configFound[v[0]] = v[1]
|
|
||||||
}
|
}
|
||||||
|
currentKey = base64.StdEncoding.EncodeToString(h)
|
||||||
|
currentStats = WGStats{} // Reset stats for the new peer
|
||||||
|
hasPeer = true
|
||||||
|
stats[currentKey] = currentStats
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if !hasPeer {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
key := strings.SplitN(line, "=", 2)
|
||||||
|
if len(key) != 2 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
switch key[0] {
|
||||||
|
case ipcKeyLastHandshakeTimeSec:
|
||||||
|
hs, err := toLastHandshake(key[1])
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
currentStats.LastHandshake = hs
|
||||||
|
stats[currentKey] = currentStats
|
||||||
|
case ipcKeyRxBytes:
|
||||||
|
rxBytes, err := toBytes(key[1])
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parse rx_bytes: %w", err)
|
||||||
|
}
|
||||||
|
currentStats.RxBytes = rxBytes
|
||||||
|
stats[currentKey] = currentStats
|
||||||
|
case ipcKeyTxBytes:
|
||||||
|
TxBytes, err := toBytes(key[1])
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("parse tx_bytes: %w", err)
|
||||||
|
}
|
||||||
|
currentStats.TxBytes = TxBytes
|
||||||
|
stats[currentKey] = currentStats
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: use multierr
|
return stats, nil
|
||||||
for _, key := range searchConfigKeys {
|
|
||||||
if _, ok := configFound[key]; !ok {
|
|
||||||
return configFound, fmt.Errorf("config key not found: %s", key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if !foundPeer {
|
|
||||||
return nil, fmt.Errorf("%w: %s", ErrPeerNotFound, peerKey)
|
|
||||||
}
|
|
||||||
|
|
||||||
return configFound, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func toWgUserspaceString(wgCfg wgtypes.Config) string {
|
func toWgUserspaceString(wgCfg wgtypes.Config) string {
|
||||||
@@ -355,6 +347,22 @@ func toWgUserspaceString(wgCfg wgtypes.Config) string {
|
|||||||
return sb.String()
|
return sb.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func toLastHandshake(stringVar string) (time.Time, error) {
|
||||||
|
sec, err := strconv.ParseInt(stringVar, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return time.Time{}, fmt.Errorf("parse handshake sec: %w", err)
|
||||||
|
}
|
||||||
|
nsec, err := strconv.ParseInt(stringVar, 10, 64)
|
||||||
|
if err != nil {
|
||||||
|
return time.Time{}, fmt.Errorf("parse handshake nsec: %w", err)
|
||||||
|
}
|
||||||
|
return time.Unix(sec, nsec), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func toBytes(s string) (int64, error) {
|
||||||
|
return strconv.ParseInt(s, 10, 64)
|
||||||
|
}
|
||||||
|
|
||||||
func getFwmark() int {
|
func getFwmark() int {
|
||||||
if nbnet.AdvancedRouting() {
|
if nbnet.AdvancedRouting() {
|
||||||
return nbnet.NetbirdFwmark
|
return nbnet.NetbirdFwmark
|
||||||
|
|||||||
@@ -2,10 +2,8 @@ package configurer
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/hex"
|
"encoding/hex"
|
||||||
"fmt"
|
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||||
)
|
)
|
||||||
@@ -34,58 +32,35 @@ errno=0
|
|||||||
|
|
||||||
`
|
`
|
||||||
|
|
||||||
func Test_findPeerInfo(t *testing.T) {
|
func Test_parseTransfers(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
peerKey string
|
peerKey string
|
||||||
searchKeys []string
|
want WGStats
|
||||||
want map[string]string
|
|
||||||
wantErr bool
|
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "single",
|
name: "single",
|
||||||
peerKey: "58402e695ba1772b1cc9309755f043251ea77fdcf10fbe63989ceb7e19321376",
|
peerKey: "b85996fecc9c7f1fc6d2572a76eda11d59bcd20be8e543b15ce4bd85a8e75a33",
|
||||||
searchKeys: []string{"tx_bytes"},
|
want: WGStats{
|
||||||
want: map[string]string{
|
TxBytes: 0,
|
||||||
"tx_bytes": "38333",
|
RxBytes: 0,
|
||||||
},
|
},
|
||||||
wantErr: false,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "multiple",
|
name: "multiple",
|
||||||
peerKey: "58402e695ba1772b1cc9309755f043251ea77fdcf10fbe63989ceb7e19321376",
|
peerKey: "58402e695ba1772b1cc9309755f043251ea77fdcf10fbe63989ceb7e19321376",
|
||||||
searchKeys: []string{"tx_bytes", "rx_bytes"},
|
want: WGStats{
|
||||||
want: map[string]string{
|
TxBytes: 38333,
|
||||||
"tx_bytes": "38333",
|
RxBytes: 2224,
|
||||||
"rx_bytes": "2224",
|
|
||||||
},
|
},
|
||||||
wantErr: false,
|
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "lastpeer",
|
name: "lastpeer",
|
||||||
peerKey: "662e14fd594556f522604703340351258903b64f35553763f19426ab2a515c58",
|
peerKey: "662e14fd594556f522604703340351258903b64f35553763f19426ab2a515c58",
|
||||||
searchKeys: []string{"tx_bytes", "rx_bytes"},
|
want: WGStats{
|
||||||
want: map[string]string{
|
TxBytes: 1212111,
|
||||||
"tx_bytes": "1212111",
|
RxBytes: 1929999999,
|
||||||
"rx_bytes": "1929999999",
|
|
||||||
},
|
},
|
||||||
wantErr: false,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "peer not found",
|
|
||||||
peerKey: "1111111111111111111111111111111111111111111111111111111111111111",
|
|
||||||
searchKeys: nil,
|
|
||||||
want: nil,
|
|
||||||
wantErr: true,
|
|
||||||
},
|
|
||||||
{
|
|
||||||
name: "key not found",
|
|
||||||
peerKey: "662e14fd594556f522604703340351258903b64f35553763f19426ab2a515c58",
|
|
||||||
searchKeys: []string{"tx_bytes", "unknown_key"},
|
|
||||||
want: map[string]string{
|
|
||||||
"tx_bytes": "1212111",
|
|
||||||
},
|
|
||||||
wantErr: true,
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
@@ -96,9 +71,19 @@ func Test_findPeerInfo(t *testing.T) {
|
|||||||
key, err := wgtypes.NewKey(res)
|
key, err := wgtypes.NewKey(res)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
got, err := findPeerInfo(ipcFixture, key.String(), tt.searchKeys)
|
stats, err := parseTransfers(ipcFixture)
|
||||||
assert.Equalf(t, tt.wantErr, err != nil, fmt.Sprintf("findPeerInfo(%v, %v, %v)", ipcFixture, key.String(), tt.searchKeys))
|
if err != nil {
|
||||||
assert.Equalf(t, tt.want, got, "findPeerInfo(%v, %v, %v)", ipcFixture, key.String(), tt.searchKeys)
|
require.NoError(t, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
stat, ok := stats[key.String()]
|
||||||
|
if !ok {
|
||||||
|
require.True(t, ok)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
require.Equal(t, tt.want, stat)
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -16,5 +16,5 @@ type WGConfigurer interface {
|
|||||||
AddAllowedIP(peerKey string, allowedIP string) error
|
AddAllowedIP(peerKey string, allowedIP string) error
|
||||||
RemoveAllowedIP(peerKey string, allowedIP string) error
|
RemoveAllowedIP(peerKey string, allowedIP string) error
|
||||||
Close()
|
Close()
|
||||||
GetStats(peerKey string) (configurer.WGStats, error)
|
GetStats() (map[string]configurer.WGStats, error)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -213,9 +213,9 @@ func (w *WGIface) GetWGDevice() *wgdevice.Device {
|
|||||||
return w.tun.Device()
|
return w.tun.Device()
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetStats returns the last handshake time, rx and tx bytes for the given peer
|
// GetStats returns the last handshake time, rx and tx bytes
|
||||||
func (w *WGIface) GetStats(peerKey string) (configurer.WGStats, error) {
|
func (w *WGIface) GetStats() (map[string]configurer.WGStats, error) {
|
||||||
return w.configurer.GetStats(peerKey)
|
return w.configurer.GetStats()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WGIface) waitUntilRemoved() error {
|
func (w *WGIface) waitUntilRemoved() error {
|
||||||
|
|||||||
179
client/internal/conn_mgr.go
Normal file
179
client/internal/conn_mgr.go
Normal file
@@ -0,0 +1,179 @@
|
|||||||
|
package internal
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"os"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/client/internal/lazyconn"
|
||||||
|
"github.com/netbirdio/netbird/client/internal/lazyconn/manager"
|
||||||
|
"github.com/netbirdio/netbird/client/internal/peer"
|
||||||
|
"github.com/netbirdio/netbird/client/internal/peerstore"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
envDisableLazyConn = "NB_LAZY_CONN_DISABLE"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ConnMgr coordinates both lazy connections (established on-demand) and permanent peer connections.
|
||||||
|
//
|
||||||
|
// The connection manager is responsible for:
|
||||||
|
// - Managing lazy connections via the lazyConnManager
|
||||||
|
// - Maintaining a list of excluded peers that should always have permanent connections
|
||||||
|
// - Handling connection establishment based on peer signaling
|
||||||
|
type ConnMgr struct {
|
||||||
|
peerStore *peerstore.Store
|
||||||
|
lazyConnMgr *manager.Manager
|
||||||
|
|
||||||
|
connStateListener *peer.ConnectionListener
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
wg sync.WaitGroup
|
||||||
|
ctx context.Context
|
||||||
|
ctxCancel context.CancelFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewConnMgr(peerStore *peerstore.Store, iface lazyconn.WGIface, dispatcher *peer.ConnectionDispatcher) *ConnMgr {
|
||||||
|
var lazyConnMgr *manager.Manager
|
||||||
|
if os.Getenv(envDisableLazyConn) != "true" {
|
||||||
|
lazyConnMgr = manager.NewManager(iface, dispatcher)
|
||||||
|
}
|
||||||
|
|
||||||
|
e := &ConnMgr{
|
||||||
|
peerStore: peerStore,
|
||||||
|
lazyConnMgr: lazyConnMgr,
|
||||||
|
}
|
||||||
|
return e
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ConnMgr) Start(parentCtx context.Context) {
|
||||||
|
if e.lazyConnMgr == nil {
|
||||||
|
log.Infof("lazy connection manager is disabled")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(parentCtx)
|
||||||
|
e.ctx = ctx
|
||||||
|
e.ctxCancel = cancel
|
||||||
|
|
||||||
|
e.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer e.wg.Done()
|
||||||
|
e.lazyConnMgr.Start(ctx, e.onActive, e.onInactive)
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ConnMgr) AddExcludeFromLazyConnection(peerID string) {
|
||||||
|
e.lazyConnMgr.ExcludePeer(peerID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ConnMgr) AddPeerConn(peerKey string, conn *peer.Conn) (exists bool) {
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
|
||||||
|
if success := e.peerStore.AddPeerConn(peerKey, conn); !success {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
if !e.isStartedWithLazyMgr() {
|
||||||
|
if err := conn.Open(e.ctx); err != nil {
|
||||||
|
conn.Log.Errorf("failed to open connection: %v", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
lazyPeerCfg := lazyconn.PeerConfig{
|
||||||
|
PublicKey: peerKey,
|
||||||
|
AllowedIPs: conn.WgConfig().AllowedIps,
|
||||||
|
PeerConnID: conn.ConnID(),
|
||||||
|
Log: conn.Log,
|
||||||
|
}
|
||||||
|
excluded, err := e.lazyConnMgr.AddPeer(lazyPeerCfg)
|
||||||
|
if err != nil {
|
||||||
|
conn.Log.Errorf("failed to add peer to lazyconn manager: %v", err)
|
||||||
|
if err := conn.Open(e.ctx); err != nil {
|
||||||
|
conn.Log.Errorf("failed to open connection: %v", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if excluded {
|
||||||
|
conn.Log.Infof("peer is on lazy conn manager exclude list, opening connection")
|
||||||
|
if err := conn.Open(e.ctx); err != nil {
|
||||||
|
conn.Log.Errorf("failed to open connection: %v", err)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
conn.Log.Infof("peer added to lazy conn manager")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ConnMgr) OnSignalMsg(peerKey string) (*peer.Conn, bool) {
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
|
||||||
|
conn, ok := e.peerStore.PeerConn(peerKey)
|
||||||
|
if !ok {
|
||||||
|
return nil, false
|
||||||
|
}
|
||||||
|
|
||||||
|
if !e.isStartedWithLazyMgr() {
|
||||||
|
return conn, true
|
||||||
|
}
|
||||||
|
|
||||||
|
if found := e.lazyConnMgr.RunInactivityMonitor(peerKey); found {
|
||||||
|
if err := conn.Open(e.ctx); err != nil {
|
||||||
|
conn.Log.Errorf("failed to open connection: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return conn, true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ConnMgr) RemovePeerConn(peerKey string) {
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
|
||||||
|
conn, ok := e.peerStore.Remove(peerKey)
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer conn.Close()
|
||||||
|
|
||||||
|
if !e.isStartedWithLazyMgr() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
e.lazyConnMgr.RemovePeer(peerKey)
|
||||||
|
conn.Log.Infof("removed peer from lazy conn manager")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ConnMgr) Close() {
|
||||||
|
if !e.isStartedWithLazyMgr() {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
e.ctxCancel()
|
||||||
|
e.wg.Wait()
|
||||||
|
e.lazyConnMgr = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ConnMgr) onActive(peerID string) {
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
|
||||||
|
e.peerStore.PeerConnOpen(e.ctx, peerID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ConnMgr) onInactive(peerID string) {
|
||||||
|
e.mu.Lock()
|
||||||
|
defer e.mu.Unlock()
|
||||||
|
|
||||||
|
e.peerStore.PeerConnClose(peerID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *ConnMgr) isStartedWithLazyMgr() bool {
|
||||||
|
return e.lazyConnMgr != nil && e.ctxCancel != nil
|
||||||
|
}
|
||||||
@@ -6,7 +6,6 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/client/iface"
|
"github.com/netbirdio/netbird/client/iface"
|
||||||
"github.com/netbirdio/netbird/client/iface/configurer"
|
|
||||||
"github.com/netbirdio/netbird/client/iface/device"
|
"github.com/netbirdio/netbird/client/iface/device"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -18,5 +17,4 @@ type WGIface interface {
|
|||||||
IsUserspaceBind() bool
|
IsUserspaceBind() bool
|
||||||
GetFilter() device.PacketFilter
|
GetFilter() device.PacketFilter
|
||||||
GetDevice() *device.FilteredDevice
|
GetDevice() *device.FilteredDevice
|
||||||
GetStats(peerKey string) (configurer.WGStats, error)
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ package dns
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/netbirdio/netbird/client/iface"
|
"github.com/netbirdio/netbird/client/iface"
|
||||||
"github.com/netbirdio/netbird/client/iface/configurer"
|
|
||||||
"github.com/netbirdio/netbird/client/iface/device"
|
"github.com/netbirdio/netbird/client/iface/device"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -13,6 +12,5 @@ type WGIface interface {
|
|||||||
IsUserspaceBind() bool
|
IsUserspaceBind() bool
|
||||||
GetFilter() device.PacketFilter
|
GetFilter() device.PacketFilter
|
||||||
GetDevice() *device.FilteredDevice
|
GetDevice() *device.FilteredDevice
|
||||||
GetStats(peerKey string) (configurer.WGStats, error)
|
|
||||||
GetInterfaceGUIDString() (string, error)
|
GetInterfaceGUIDString() (string, error)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -131,6 +131,8 @@ type Engine struct {
|
|||||||
// peerConns is a map that holds all the peers that are known to this peer
|
// peerConns is a map that holds all the peers that are known to this peer
|
||||||
peerStore *peerstore.Store
|
peerStore *peerstore.Store
|
||||||
|
|
||||||
|
connMgr *ConnMgr
|
||||||
|
|
||||||
beforePeerHook nbnet.AddHookFunc
|
beforePeerHook nbnet.AddHookFunc
|
||||||
afterPeerHook nbnet.RemoveHookFunc
|
afterPeerHook nbnet.RemoveHookFunc
|
||||||
|
|
||||||
@@ -167,7 +169,8 @@ type Engine struct {
|
|||||||
sshServerFunc func(hostKeyPEM []byte, addr string) (nbssh.Server, error)
|
sshServerFunc func(hostKeyPEM []byte, addr string) (nbssh.Server, error)
|
||||||
sshServer nbssh.Server
|
sshServer nbssh.Server
|
||||||
|
|
||||||
statusRecorder *peer.Status
|
statusRecorder *peer.Status
|
||||||
|
peerConnDispatcher *peer.ConnectionDispatcher
|
||||||
|
|
||||||
firewall manager.Manager
|
firewall manager.Manager
|
||||||
routeManager routemanager.Manager
|
routeManager routemanager.Manager
|
||||||
@@ -257,6 +260,8 @@ func (e *Engine) Stop() error {
|
|||||||
e.syncMsgMux.Lock()
|
e.syncMsgMux.Lock()
|
||||||
defer e.syncMsgMux.Unlock()
|
defer e.syncMsgMux.Unlock()
|
||||||
|
|
||||||
|
e.connMgr.Close()
|
||||||
|
|
||||||
// stopping network monitor first to avoid starting the engine again
|
// stopping network monitor first to avoid starting the engine again
|
||||||
if e.networkMonitor != nil {
|
if e.networkMonitor != nil {
|
||||||
e.networkMonitor.Stop()
|
e.networkMonitor.Stop()
|
||||||
@@ -285,8 +290,7 @@ func (e *Engine) Stop() error {
|
|||||||
e.statusRecorder.UpdateDNSStates([]peer.NSGroupState{})
|
e.statusRecorder.UpdateDNSStates([]peer.NSGroupState{})
|
||||||
e.statusRecorder.UpdateRelayStates([]relay.ProbeResult{})
|
e.statusRecorder.UpdateRelayStates([]relay.ProbeResult{})
|
||||||
|
|
||||||
err := e.removeAllPeers()
|
if err := e.removeAllPeers(); err != nil {
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to remove all peers: %s", err)
|
return fmt.Errorf("failed to remove all peers: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -420,6 +424,11 @@ func (e *Engine) Start() error {
|
|||||||
NATExternalIPs: e.parseNATExternalIPMappings(),
|
NATExternalIPs: e.parseNATExternalIPMappings(),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
e.peerConnDispatcher = peer.NewConnectionDispatcher()
|
||||||
|
|
||||||
|
e.connMgr = NewConnMgr(e.peerStore, wgIface, e.peerConnDispatcher)
|
||||||
|
e.connMgr.Start(e.ctx)
|
||||||
|
|
||||||
e.srWatcher = guard.NewSRWatcher(e.signal, e.relayManager, e.mobileDep.IFaceDiscover, iceCfg)
|
e.srWatcher = guard.NewSRWatcher(e.signal, e.relayManager, e.mobileDep.IFaceDiscover, iceCfg)
|
||||||
e.srWatcher.Start()
|
e.srWatcher.Start()
|
||||||
|
|
||||||
@@ -598,16 +607,11 @@ func (e *Engine) removePeer(peerKey string) error {
|
|||||||
e.sshServer.RemoveAuthorizedKey(peerKey)
|
e.sshServer.RemoveAuthorizedKey(peerKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
defer func() {
|
e.connMgr.RemovePeerConn(peerKey)
|
||||||
err := e.statusRecorder.RemovePeer(peerKey)
|
|
||||||
if err != nil {
|
|
||||||
log.Warnf("received error when removing peer %s from status recorder: %v", peerKey, err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
conn, exists := e.peerStore.Remove(peerKey)
|
err := e.statusRecorder.RemovePeer(peerKey)
|
||||||
if exists {
|
if err != nil {
|
||||||
conn.Close()
|
log.Warnf("received error when removing peer %s from status recorder: %v", peerKey, err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -1084,7 +1088,7 @@ func (e *Engine) updateOfflinePeers(offlinePeers []*mgmProto.RemotePeerConfig) {
|
|||||||
IP: strings.Join(offlinePeer.GetAllowedIps(), ","),
|
IP: strings.Join(offlinePeer.GetAllowedIps(), ","),
|
||||||
PubKey: offlinePeer.GetWgPubKey(),
|
PubKey: offlinePeer.GetWgPubKey(),
|
||||||
FQDN: offlinePeer.GetFqdn(),
|
FQDN: offlinePeer.GetFqdn(),
|
||||||
ConnStatus: peer.StatusDisconnected,
|
ConnStatus: peer.StatusIdle,
|
||||||
ConnStatusUpdate: time.Now(),
|
ConnStatusUpdate: time.Now(),
|
||||||
Mux: new(sync.RWMutex),
|
Mux: new(sync.RWMutex),
|
||||||
}
|
}
|
||||||
@@ -1125,7 +1129,7 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
|
|||||||
return fmt.Errorf("create peer connection: %w", err)
|
return fmt.Errorf("create peer connection: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if ok := e.peerStore.AddPeerConn(peerKey, conn); !ok {
|
if exists := e.connMgr.AddPeerConn(peerKey, conn); exists {
|
||||||
conn.Close()
|
conn.Close()
|
||||||
return fmt.Errorf("peer already exists: %s", peerKey)
|
return fmt.Errorf("peer already exists: %s", peerKey)
|
||||||
}
|
}
|
||||||
@@ -1135,12 +1139,11 @@ func (e *Engine) addNewPeer(peerConfig *mgmProto.RemotePeerConfig) error {
|
|||||||
conn.AddAfterRemovePeerHook(e.afterPeerHook)
|
conn.AddAfterRemovePeerHook(e.afterPeerHook)
|
||||||
}
|
}
|
||||||
|
|
||||||
err = e.statusRecorder.AddPeer(peerKey, peerConfig.Fqdn)
|
err = e.statusRecorder.AddPeer(peerKey, peerConfig.Fqdn, peerIPs[0].Addr().String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err)
|
log.Warnf("error adding peer %s to status recorder, got error: %v", peerKey, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
conn.Open()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1195,7 +1198,7 @@ func (e *Engine) createPeerConn(pubKey string, allowedIPs []netip.Prefix) (*peer
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
peerConn, err := peer.NewConn(e.ctx, config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager, e.srWatcher, e.connSemaphore)
|
peerConn, err := peer.NewConn(config, e.statusRecorder, e.signaler, e.mobileDep.IFaceDiscover, e.relayManager, e.srWatcher, e.connSemaphore, e.peerConnDispatcher)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -1216,7 +1219,7 @@ func (e *Engine) receiveSignalEvents() {
|
|||||||
e.syncMsgMux.Lock()
|
e.syncMsgMux.Lock()
|
||||||
defer e.syncMsgMux.Unlock()
|
defer e.syncMsgMux.Unlock()
|
||||||
|
|
||||||
conn, ok := e.peerStore.PeerConn(msg.Key)
|
conn, ok := e.connMgr.OnSignalMsg(msg.Key)
|
||||||
if !ok {
|
if !ok {
|
||||||
return fmt.Errorf("wrongly addressed message %s", msg.Key)
|
return fmt.Errorf("wrongly addressed message %s", msg.Key)
|
||||||
}
|
}
|
||||||
@@ -1542,13 +1545,17 @@ func (e *Engine) RunHealthProbes() bool {
|
|||||||
}
|
}
|
||||||
log.Debugf("relay health check: healthy=%t", relayHealthy)
|
log.Debugf("relay health check: healthy=%t", relayHealthy)
|
||||||
|
|
||||||
|
stats, err := e.wgInterface.GetStats()
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("failed to get wireguard stats: %v", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
for _, key := range e.peerStore.PeersPubKey() {
|
for _, key := range e.peerStore.PeersPubKey() {
|
||||||
wgStats, err := e.wgInterface.GetStats(key)
|
// wgStats could be zero value, in which case we just reset the stats
|
||||||
if err != nil {
|
wgStats, ok := stats[key]
|
||||||
log.Debugf("failed to get wg stats for peer %s: %s", key, err)
|
if !ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// wgStats could be zero value, in which case we just reset the stats
|
|
||||||
if err := e.statusRecorder.UpdateWireGuardPeerState(key, wgStats); err != nil {
|
if err := e.statusRecorder.UpdateWireGuardPeerState(key, wgStats); err != nil {
|
||||||
log.Debugf("failed to update wg stats for peer %s: %s", key, err)
|
log.Debugf("failed to update wg stats for peer %s: %s", key, err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -87,7 +87,7 @@ type MockWGIface struct {
|
|||||||
GetFilterFunc func() device.PacketFilter
|
GetFilterFunc func() device.PacketFilter
|
||||||
GetDeviceFunc func() *device.FilteredDevice
|
GetDeviceFunc func() *device.FilteredDevice
|
||||||
GetWGDeviceFunc func() *wgdevice.Device
|
GetWGDeviceFunc func() *wgdevice.Device
|
||||||
GetStatsFunc func(peerKey string) (configurer.WGStats, error)
|
GetStatsFunc func() (map[string]configurer.WGStats, error)
|
||||||
GetInterfaceGUIDStringFunc func() (string, error)
|
GetInterfaceGUIDStringFunc func() (string, error)
|
||||||
GetProxyFunc func() wgproxy.Proxy
|
GetProxyFunc func() wgproxy.Proxy
|
||||||
GetNetFunc func() *netstack.Net
|
GetNetFunc func() *netstack.Net
|
||||||
@@ -165,8 +165,8 @@ func (m *MockWGIface) GetWGDevice() *wgdevice.Device {
|
|||||||
return m.GetWGDeviceFunc()
|
return m.GetWGDeviceFunc()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MockWGIface) GetStats(peerKey string) (configurer.WGStats, error) {
|
func (m *MockWGIface) GetStats() (map[string]configurer.WGStats, error) {
|
||||||
return m.GetStatsFunc(peerKey)
|
return m.GetStatsFunc()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MockWGIface) GetProxy() wgproxy.Proxy {
|
func (m *MockWGIface) GetProxy() wgproxy.Proxy {
|
||||||
@@ -394,6 +394,7 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
|
|||||||
engine.udpMux = bind.NewUniversalUDPMuxDefault(bind.UniversalUDPMuxParams{UDPConn: conn})
|
engine.udpMux = bind.NewUniversalUDPMuxDefault(bind.UniversalUDPMuxParams{UDPConn: conn})
|
||||||
engine.ctx = ctx
|
engine.ctx = ctx
|
||||||
engine.srWatcher = guard.NewSRWatcher(nil, nil, nil, icemaker.Config{})
|
engine.srWatcher = guard.NewSRWatcher(nil, nil, nil, icemaker.Config{})
|
||||||
|
engine.connMgr = NewConnMgr(engine.peerStore, wgIface, peer.NewConnectionDispatcher())
|
||||||
|
|
||||||
type testCase struct {
|
type testCase struct {
|
||||||
name string
|
name string
|
||||||
@@ -764,6 +765,7 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) {
|
|||||||
|
|
||||||
engine.routeManager = mockRouteManager
|
engine.routeManager = mockRouteManager
|
||||||
engine.dnsServer = &dns.MockServer{}
|
engine.dnsServer = &dns.MockServer{}
|
||||||
|
engine.connMgr = NewConnMgr(engine.peerStore, engine.wgInterface, peer.NewConnectionDispatcher())
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
exitErr := engine.Stop()
|
exitErr := engine.Stop()
|
||||||
@@ -960,6 +962,7 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
engine.dnsServer = mockDNSServer
|
engine.dnsServer = mockDNSServer
|
||||||
|
engine.connMgr = NewConnMgr(engine.peerStore, engine.wgInterface, peer.NewConnectionDispatcher())
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
exitErr := engine.Stop()
|
exitErr := engine.Stop()
|
||||||
@@ -981,6 +984,7 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) {
|
|||||||
|
|
||||||
func TestEngine_MultiplePeers(t *testing.T) {
|
func TestEngine_MultiplePeers(t *testing.T) {
|
||||||
// log.SetLevel(log.DebugLevel)
|
// log.SetLevel(log.DebugLevel)
|
||||||
|
t.Setenv(envDisableLazyConn, "true")
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(CtxInitState(context.Background()))
|
ctx, cancel := context.WithCancel(CtxInitState(context.Background()))
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|||||||
@@ -34,6 +34,6 @@ type wgIfaceBase interface {
|
|||||||
GetFilter() device.PacketFilter
|
GetFilter() device.PacketFilter
|
||||||
GetDevice() *device.FilteredDevice
|
GetDevice() *device.FilteredDevice
|
||||||
GetWGDevice() *wgdevice.Device
|
GetWGDevice() *wgdevice.Device
|
||||||
GetStats(peerKey string) (configurer.WGStats, error)
|
GetStats() (map[string]configurer.WGStats, error)
|
||||||
GetNet() *netstack.Net
|
GetNet() *netstack.Net
|
||||||
}
|
}
|
||||||
|
|||||||
56
client/internal/lazyconn/activity/allocator.go
Normal file
56
client/internal/lazyconn/activity/allocator.go
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
package activity
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"net"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
retryLimit = 5000
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
listenIP = net.ParseIP("127.0.0.1")
|
||||||
|
ErrNoFreePort = fmt.Errorf("no free port")
|
||||||
|
)
|
||||||
|
|
||||||
|
// portAllocator lookup for free port and allocate it
|
||||||
|
type portAllocator struct {
|
||||||
|
nextFreePort uint16
|
||||||
|
}
|
||||||
|
|
||||||
|
func newPortAllocator() *portAllocator {
|
||||||
|
return &portAllocator{
|
||||||
|
nextFreePort: 65535,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *portAllocator) newConn() (*net.UDPConn, *net.UDPAddr, error) {
|
||||||
|
for i := 0; i < retryLimit; i++ {
|
||||||
|
addr := &net.UDPAddr{
|
||||||
|
Port: p.nextPort(),
|
||||||
|
IP: listenIP,
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, err := net.ListenUDP("udp", addr)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to listen on port %d: %v", addr.Port, err)
|
||||||
|
// port could be allocated by another process
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
return conn, addr, nil
|
||||||
|
}
|
||||||
|
return nil, nil, ErrNoFreePort
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *portAllocator) nextPort() int {
|
||||||
|
port := p.nextFreePort
|
||||||
|
p.nextFreePort--
|
||||||
|
if p.nextFreePort == 1024 {
|
||||||
|
p.nextFreePort = 65535
|
||||||
|
}
|
||||||
|
return int(port)
|
||||||
|
}
|
||||||
34
client/internal/lazyconn/activity/allocator_test.go
Normal file
34
client/internal/lazyconn/activity/allocator_test.go
Normal file
@@ -0,0 +1,34 @@
|
|||||||
|
package activity
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
func Test_portAllocator_newConn(t *testing.T) {
|
||||||
|
pa := newPortAllocator()
|
||||||
|
for i := 65535; i > 65535-10; i-- {
|
||||||
|
conn, addr, err := pa.newConn()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("newConn() error = %v, want nil", err)
|
||||||
|
}
|
||||||
|
if addr.Port != i {
|
||||||
|
t.Errorf("newConn() addr.Port = %v, want %d", addr.Port, i)
|
||||||
|
}
|
||||||
|
_ = conn.Close()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func Test_portAllocator_port_bottom(t *testing.T) {
|
||||||
|
pa := newPortAllocator()
|
||||||
|
pa.nextFreePort = 1025
|
||||||
|
|
||||||
|
port := pa.nextPort()
|
||||||
|
if port != 1025 {
|
||||||
|
t.Errorf("nextPort() = %v, want %d", port, 1)
|
||||||
|
}
|
||||||
|
|
||||||
|
port = pa.nextPort()
|
||||||
|
if port != 65535 {
|
||||||
|
t.Errorf("nextPort() = %v, want %d", port, 65535)
|
||||||
|
}
|
||||||
|
}
|
||||||
69
client/internal/lazyconn/activity/listener.go
Normal file
69
client/internal/lazyconn/activity/listener.go
Normal file
@@ -0,0 +1,69 @@
|
|||||||
|
package activity
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/client/internal/lazyconn"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Listener struct {
|
||||||
|
wgIface lazyconn.WGIface
|
||||||
|
peerCfg lazyconn.PeerConfig
|
||||||
|
conn *net.UDPConn
|
||||||
|
endpoint *net.UDPAddr
|
||||||
|
done sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewListener(wgIface lazyconn.WGIface, cfg lazyconn.PeerConfig, conn *net.UDPConn, addr *net.UDPAddr) (*Listener, error) {
|
||||||
|
d := &Listener{
|
||||||
|
wgIface: wgIface,
|
||||||
|
peerCfg: cfg,
|
||||||
|
conn: conn,
|
||||||
|
endpoint: addr,
|
||||||
|
}
|
||||||
|
if err := d.createEndpoint(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
d.done.Lock()
|
||||||
|
return d, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Listener) ReadPackets() {
|
||||||
|
for {
|
||||||
|
buffer := make([]byte, 10)
|
||||||
|
n, remoteAddr, err := d.conn.ReadFromUDP(buffer)
|
||||||
|
if err != nil {
|
||||||
|
log.Infof("exit from peer listener reader: %v", err)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
if n < 4 {
|
||||||
|
log.Warnf("received %d bytes from %s, too short", n, remoteAddr)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
d.removeEndpoint()
|
||||||
|
d.done.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Listener) Close() {
|
||||||
|
if err := d.conn.Close(); err != nil {
|
||||||
|
log.Errorf("failed to close UDP listener: %s", err)
|
||||||
|
}
|
||||||
|
d.done.Lock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Listener) removeEndpoint() {
|
||||||
|
if err := d.wgIface.RemovePeer(d.peerCfg.PublicKey); err != nil {
|
||||||
|
log.Warnf("failed to remove peer listener: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *Listener) createEndpoint() error {
|
||||||
|
return d.wgIface.UpdatePeer(d.peerCfg.PublicKey, d.peerCfg.AllowedIPs, 0, d.endpoint, nil)
|
||||||
|
}
|
||||||
111
client/internal/lazyconn/activity/manager.go
Normal file
111
client/internal/lazyconn/activity/manager.go
Normal file
@@ -0,0 +1,111 @@
|
|||||||
|
package activity
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/client/internal/lazyconn"
|
||||||
|
"github.com/netbirdio/netbird/client/internal/peer"
|
||||||
|
)
|
||||||
|
|
||||||
|
type OnAcitvityEvent struct {
|
||||||
|
PeerID string
|
||||||
|
PeerConnId peer.ConnID
|
||||||
|
}
|
||||||
|
|
||||||
|
type Manager struct {
|
||||||
|
OnActivityChan chan OnAcitvityEvent
|
||||||
|
|
||||||
|
wgIface lazyconn.WGIface
|
||||||
|
|
||||||
|
portGenerator *portAllocator
|
||||||
|
peers map[string]*Listener
|
||||||
|
done chan struct{}
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewManager(wgIface lazyconn.WGIface) *Manager {
|
||||||
|
m := &Manager{
|
||||||
|
OnActivityChan: make(chan OnAcitvityEvent, 1),
|
||||||
|
wgIface: wgIface,
|
||||||
|
portGenerator: newPortAllocator(),
|
||||||
|
peers: make(map[string]*Listener),
|
||||||
|
done: make(chan struct{}),
|
||||||
|
}
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) MonitorPeerActivity(peerCfg lazyconn.PeerConfig) error {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
|
if _, ok := m.peers[peerCfg.PublicKey]; ok {
|
||||||
|
log.Warnf("activity listener already exists for: %s", peerCfg.PublicKey)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, addr, err := m.portGenerator.newConn()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to bind activity listener: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
listener, err := NewListener(m.wgIface, peerCfg, conn, addr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
m.peers[peerCfg.PublicKey] = listener
|
||||||
|
|
||||||
|
go m.waitForTraffic(listener, peerCfg.PublicKey, peerCfg.PeerConnID)
|
||||||
|
|
||||||
|
peerCfg.Log.Infof("created activity listener: %s", addr.String())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) RemovePeer(peerID string) bool {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
|
listener, ok := m.peers[peerID]
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
delete(m.peers, peerID)
|
||||||
|
listener.Close()
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) Close() {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
|
close(m.done)
|
||||||
|
for peerID, listener := range m.peers {
|
||||||
|
listener.Close()
|
||||||
|
delete(m.peers, peerID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) waitForTraffic(listener *Listener, peerID string, peerConnID peer.ConnID) {
|
||||||
|
listener.ReadPackets()
|
||||||
|
|
||||||
|
m.mu.Lock()
|
||||||
|
if _, ok := m.peers[peerID]; !ok {
|
||||||
|
m.mu.Unlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
delete(m.peers, peerID)
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
m.notify(OnAcitvityEvent{PeerID: peerID, PeerConnId: peerConnID})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) notify(event OnAcitvityEvent) {
|
||||||
|
log.Debugf("peer activity detected: %s", event.PeerID)
|
||||||
|
select {
|
||||||
|
case <-m.done:
|
||||||
|
case m.OnActivityChan <- event:
|
||||||
|
}
|
||||||
|
}
|
||||||
58
client/internal/lazyconn/inactivity/inactivity.go
Normal file
58
client/internal/lazyconn/inactivity/inactivity.go
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
package inactivity
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/client/internal/peer"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
inactivityThreshold = 30 * time.Second // idle after 1 hour inactivity
|
||||||
|
)
|
||||||
|
|
||||||
|
type InactivityMonitor struct {
|
||||||
|
id peer.ConnID
|
||||||
|
timer *time.Timer
|
||||||
|
cancel context.CancelFunc
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewInactivityMonitor(peerID peer.ConnID) *InactivityMonitor {
|
||||||
|
i := &InactivityMonitor{
|
||||||
|
id: peerID,
|
||||||
|
timer: time.NewTimer(0),
|
||||||
|
}
|
||||||
|
i.timer.Stop()
|
||||||
|
return i
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *InactivityMonitor) Start(ctx context.Context, timeoutChan chan peer.ConnID) {
|
||||||
|
i.timer.Reset(inactivityThreshold)
|
||||||
|
defer i.timer.Stop()
|
||||||
|
|
||||||
|
ctx, i.cancel = context.WithCancel(ctx)
|
||||||
|
defer i.cancel()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-i.timer.C:
|
||||||
|
select {
|
||||||
|
case timeoutChan <- i.id:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *InactivityMonitor) Stop() {
|
||||||
|
i.cancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *InactivityMonitor) PauseTimer() {
|
||||||
|
i.timer.Stop()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *InactivityMonitor) ResetTimer() {
|
||||||
|
i.timer.Reset(inactivityThreshold)
|
||||||
|
}
|
||||||
242
client/internal/lazyconn/manager/manager.go
Normal file
242
client/internal/lazyconn/manager/manager.go
Normal file
@@ -0,0 +1,242 @@
|
|||||||
|
package manager
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/client/internal/lazyconn"
|
||||||
|
"github.com/netbirdio/netbird/client/internal/lazyconn/activity"
|
||||||
|
"github.com/netbirdio/netbird/client/internal/lazyconn/inactivity"
|
||||||
|
"github.com/netbirdio/netbird/client/internal/peer"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Manager manages lazy connections
|
||||||
|
// This is not a thread safe implementation, do not call exported functions concurrently
|
||||||
|
type Manager struct {
|
||||||
|
connStateDispatcher *peer.ConnectionDispatcher
|
||||||
|
connStateListener *peer.ConnectionListener
|
||||||
|
|
||||||
|
managedPeers map[string]*lazyconn.PeerConfig
|
||||||
|
managedPeersByConnID map[peer.ConnID]*lazyconn.PeerConfig
|
||||||
|
excludes map[string]struct{}
|
||||||
|
managedPeersMu sync.Mutex
|
||||||
|
|
||||||
|
activityManager *activity.Manager
|
||||||
|
inactivityMonitors map[peer.ConnID]*inactivity.InactivityMonitor
|
||||||
|
|
||||||
|
cancel context.CancelFunc
|
||||||
|
onInactive chan peer.ConnID
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewManager(wgIface lazyconn.WGIface, connStateDispatcher *peer.ConnectionDispatcher) *Manager {
|
||||||
|
m := &Manager{
|
||||||
|
connStateDispatcher: connStateDispatcher,
|
||||||
|
managedPeers: make(map[string]*lazyconn.PeerConfig),
|
||||||
|
managedPeersByConnID: make(map[peer.ConnID]*lazyconn.PeerConfig),
|
||||||
|
excludes: make(map[string]struct{}),
|
||||||
|
activityManager: activity.NewManager(wgIface),
|
||||||
|
inactivityMonitors: make(map[peer.ConnID]*inactivity.InactivityMonitor),
|
||||||
|
onInactive: make(chan peer.ConnID),
|
||||||
|
}
|
||||||
|
|
||||||
|
m.connStateListener = &peer.ConnectionListener{
|
||||||
|
OnConnected: m.onPeerConnected,
|
||||||
|
OnDisconnected: m.onPeerDisconnected,
|
||||||
|
}
|
||||||
|
|
||||||
|
connStateDispatcher.AddListener(m.connStateListener)
|
||||||
|
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) Start(ctx context.Context, activeFn func(peerID string), inactiveFn func(peerID string)) {
|
||||||
|
defer m.close()
|
||||||
|
|
||||||
|
ctx, m.cancel = context.WithCancel(ctx)
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case e := <-m.activityManager.OnActivityChan:
|
||||||
|
m.onPeerActivity(ctx, e, activeFn)
|
||||||
|
case peerConnID := <-m.onInactive:
|
||||||
|
m.onPeerInactivityTimedOut(peerConnID, inactiveFn)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) AddPeer(peerCfg lazyconn.PeerConfig) (bool, error) {
|
||||||
|
m.managedPeersMu.Lock()
|
||||||
|
defer m.managedPeersMu.Unlock()
|
||||||
|
|
||||||
|
peerCfg.Log.Debugf("adding peer to lazy connection manager")
|
||||||
|
|
||||||
|
_, exists := m.excludes[peerCfg.PublicKey]
|
||||||
|
if exists {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, ok := m.managedPeers[peerCfg.PublicKey]; ok {
|
||||||
|
peerCfg.Log.Warnf("peer already managed")
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := m.activityManager.MonitorPeerActivity(peerCfg); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
im := inactivity.NewInactivityMonitor(peerCfg.PeerConnID)
|
||||||
|
m.inactivityMonitors[peerCfg.PeerConnID] = im
|
||||||
|
|
||||||
|
m.managedPeers[peerCfg.PublicKey] = &peerCfg
|
||||||
|
m.managedPeersByConnID[peerCfg.PeerConnID] = &peerCfg
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) RemovePeer(peerID string) {
|
||||||
|
m.managedPeersMu.Lock()
|
||||||
|
defer m.managedPeersMu.Unlock()
|
||||||
|
|
||||||
|
cfg, ok := m.managedPeers[peerID]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
cfg.Log.Infof("removing lazy peer")
|
||||||
|
|
||||||
|
if im, ok := m.inactivityMonitors[cfg.PeerConnID]; ok {
|
||||||
|
im.Stop()
|
||||||
|
delete(m.inactivityMonitors, cfg.PeerConnID)
|
||||||
|
cfg.Log.Debugf("inactivity monitor stopped")
|
||||||
|
}
|
||||||
|
|
||||||
|
m.activityManager.RemovePeer(peerID)
|
||||||
|
delete(m.managedPeers, peerID)
|
||||||
|
delete(m.managedPeersByConnID, cfg.PeerConnID)
|
||||||
|
cfg.Log.Debugf("activity listener removed")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) RunInactivityMonitor(peerID string) (found bool) {
|
||||||
|
m.managedPeersMu.Lock()
|
||||||
|
defer m.managedPeersMu.Unlock()
|
||||||
|
|
||||||
|
cfg, ok := m.managedPeers[peerID]
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if removed := m.activityManager.RemovePeer(peerID); !removed {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
m.inactivityMonitors[cfg.PeerConnID].ResetTimer()
|
||||||
|
|
||||||
|
cfg.Log.Infof("stoped activity monitor and reset inactivity monitor")
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) ExcludePeer(peerID string) {
|
||||||
|
m.managedPeersMu.Lock()
|
||||||
|
defer m.managedPeersMu.Unlock()
|
||||||
|
|
||||||
|
m.excludes[peerID] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) close() {
|
||||||
|
m.managedPeersMu.Lock()
|
||||||
|
defer m.managedPeersMu.Unlock()
|
||||||
|
|
||||||
|
m.cancel()
|
||||||
|
|
||||||
|
m.connStateDispatcher.RemoveListener(m.connStateListener)
|
||||||
|
m.activityManager.Close()
|
||||||
|
for _, iw := range m.inactivityMonitors {
|
||||||
|
iw.Stop()
|
||||||
|
}
|
||||||
|
m.inactivityMonitors = make(map[peer.ConnID]*inactivity.InactivityMonitor)
|
||||||
|
m.managedPeers = make(map[string]*lazyconn.PeerConfig)
|
||||||
|
log.Infof("lazy connection manager closed")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) onPeerActivity(ctx context.Context, e activity.OnAcitvityEvent, onActiveListenerFn func(peerID string)) {
|
||||||
|
m.managedPeersMu.Lock()
|
||||||
|
defer m.managedPeersMu.Unlock()
|
||||||
|
|
||||||
|
pcfg, ok := m.managedPeersByConnID[e.PeerConnId]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
pcfg.Log.Infof("detected peer activity")
|
||||||
|
|
||||||
|
pcfg.Log.Infof("starting inactivity monitor")
|
||||||
|
go m.inactivityMonitors[e.PeerConnId].Start(ctx, m.onInactive)
|
||||||
|
|
||||||
|
onActiveListenerFn(e.PeerID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) onPeerInactivityTimedOut(peerConnID peer.ConnID, onInactiveListenerFn func(peerID string)) {
|
||||||
|
m.managedPeersMu.Lock()
|
||||||
|
defer m.managedPeersMu.Unlock()
|
||||||
|
|
||||||
|
pcfg, ok := m.managedPeersByConnID[peerConnID]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
pcfg.Log.Infof("connection timed out")
|
||||||
|
|
||||||
|
if _, ok := m.inactivityMonitors[peerConnID]; !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
onInactiveListenerFn(pcfg.PublicKey)
|
||||||
|
|
||||||
|
pcfg.Log.Infof("start activity monitor")
|
||||||
|
|
||||||
|
// just in case free up
|
||||||
|
m.inactivityMonitors[pcfg.PeerConnID].PauseTimer()
|
||||||
|
if err := m.activityManager.MonitorPeerActivity(*pcfg); err != nil {
|
||||||
|
pcfg.Log.Errorf("failed to create activity monitor: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) onPeerConnected(conn *peer.Conn) {
|
||||||
|
m.managedPeersMu.Lock()
|
||||||
|
defer m.managedPeersMu.Unlock()
|
||||||
|
|
||||||
|
peerCfg, ok := m.managedPeers[conn.GetKey()]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
iw, ok := m.inactivityMonitors[conn.ConnID()]
|
||||||
|
if !ok {
|
||||||
|
conn.Log.Errorf("inactivity monitor not found for peer")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
peerCfg.Log.Infof("pause inactivity monitor")
|
||||||
|
iw.PauseTimer()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) onPeerDisconnected(conn *peer.Conn) {
|
||||||
|
m.managedPeersMu.Lock()
|
||||||
|
defer m.managedPeersMu.Unlock()
|
||||||
|
|
||||||
|
peerCfg, ok := m.managedPeers[conn.GetKey()]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
iw, ok := m.inactivityMonitors[conn.ConnID()]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
peerCfg.Log.Infof("reset inactivity monitor timer")
|
||||||
|
iw.ResetTimer()
|
||||||
|
}
|
||||||
16
client/internal/lazyconn/peercfg.go
Normal file
16
client/internal/lazyconn/peercfg.go
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
package lazyconn
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/netip"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/client/internal/peer"
|
||||||
|
)
|
||||||
|
|
||||||
|
type PeerConfig struct {
|
||||||
|
PublicKey string
|
||||||
|
AllowedIPs []netip.Prefix
|
||||||
|
PeerConnID peer.ConnID
|
||||||
|
Log *log.Entry
|
||||||
|
}
|
||||||
17
client/internal/lazyconn/wgiface.go
Normal file
17
client/internal/lazyconn/wgiface.go
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
package lazyconn
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net"
|
||||||
|
"net/netip"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/client/iface/configurer"
|
||||||
|
)
|
||||||
|
|
||||||
|
type WGIface interface {
|
||||||
|
RemovePeer(peerKey string) error
|
||||||
|
UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error
|
||||||
|
GetStats() (map[string]configurer.WGStats, error)
|
||||||
|
}
|
||||||
@@ -10,6 +10,7 @@ import (
|
|||||||
"runtime"
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
"unsafe"
|
||||||
|
|
||||||
"github.com/pion/ice/v3"
|
"github.com/pion/ice/v3"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
@@ -26,6 +27,8 @@ import (
|
|||||||
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
|
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type ConnID unsafe.Pointer
|
||||||
|
|
||||||
type ConnPriority int
|
type ConnPriority int
|
||||||
|
|
||||||
func (cp ConnPriority) String() string {
|
func (cp ConnPriority) String() string {
|
||||||
@@ -83,15 +86,16 @@ type ConnConfig struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type Conn struct {
|
type Conn struct {
|
||||||
log *log.Entry
|
Log *log.Entry
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
ctxCancel context.CancelFunc
|
ctxCancel context.CancelFunc
|
||||||
config ConnConfig
|
config ConnConfig
|
||||||
statusRecorder *Status
|
statusRecorder *Status
|
||||||
signaler *Signaler
|
signaler *Signaler
|
||||||
|
iFaceDiscover stdnet.ExternalIFaceDiscover
|
||||||
relayManager *relayClient.Manager
|
relayManager *relayClient.Manager
|
||||||
handshaker *Handshaker
|
srWatcher *guard.SRWatcher
|
||||||
|
|
||||||
onConnected func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string)
|
onConnected func(remoteWireGuardKey string, remoteRosenpassPubKey []byte, wireGuardIP string, remoteRosenpassAddr string)
|
||||||
onDisconnected func(remotePeer string)
|
onDisconnected func(remotePeer string)
|
||||||
@@ -111,95 +115,110 @@ type Conn struct {
|
|||||||
|
|
||||||
wgProxyICE wgproxy.Proxy
|
wgProxyICE wgproxy.Proxy
|
||||||
wgProxyRelay wgproxy.Proxy
|
wgProxyRelay wgproxy.Proxy
|
||||||
|
handshaker *Handshaker
|
||||||
|
|
||||||
guard *guard.Guard
|
guard *guard.Guard
|
||||||
semaphore *semaphoregroup.SemaphoreGroup
|
semaphore *semaphoregroup.SemaphoreGroup
|
||||||
|
wg sync.WaitGroup
|
||||||
|
peerConnDispatcher *ConnectionDispatcher
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConn creates a new not opened Conn to the remote peer.
|
// NewConn creates a new not opened Conn to the remote peer.
|
||||||
// To establish a connection run Conn.Open
|
// To establish a connection run Conn.Open
|
||||||
func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Status, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager, srWatcher *guard.SRWatcher, semaphore *semaphoregroup.SemaphoreGroup) (*Conn, error) {
|
func NewConn(config ConnConfig, statusRecorder *Status, signaler *Signaler, iFaceDiscover stdnet.ExternalIFaceDiscover, relayManager *relayClient.Manager, srWatcher *guard.SRWatcher, semaphore *semaphoregroup.SemaphoreGroup, peerConnDispatcher *ConnectionDispatcher) (*Conn, error) {
|
||||||
if len(config.WgConfig.AllowedIps) == 0 {
|
if len(config.WgConfig.AllowedIps) == 0 {
|
||||||
return nil, fmt.Errorf("allowed IPs is empty")
|
return nil, fmt.Errorf("allowed IPs is empty")
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, ctxCancel := context.WithCancel(engineCtx)
|
|
||||||
connLog := log.WithField("peer", config.Key)
|
connLog := log.WithField("peer", config.Key)
|
||||||
|
|
||||||
var conn = &Conn{
|
var conn = &Conn{
|
||||||
log: connLog,
|
Log: connLog,
|
||||||
ctx: ctx,
|
config: config,
|
||||||
ctxCancel: ctxCancel,
|
statusRecorder: statusRecorder,
|
||||||
config: config,
|
signaler: signaler,
|
||||||
statusRecorder: statusRecorder,
|
iFaceDiscover: iFaceDiscover,
|
||||||
signaler: signaler,
|
relayManager: relayManager,
|
||||||
relayManager: relayManager,
|
srWatcher: srWatcher,
|
||||||
statusRelay: NewAtomicConnStatus(),
|
statusRelay: NewAtomicConnStatus(),
|
||||||
statusICE: NewAtomicConnStatus(),
|
statusICE: NewAtomicConnStatus(),
|
||||||
semaphore: semaphore,
|
semaphore: semaphore,
|
||||||
|
peerConnDispatcher: peerConnDispatcher,
|
||||||
}
|
}
|
||||||
|
|
||||||
ctrl := isController(config)
|
|
||||||
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, conn, relayManager)
|
|
||||||
|
|
||||||
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
|
|
||||||
workerICE, err := NewWorkerICE(ctx, connLog, config, conn, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
conn.workerICE = workerICE
|
|
||||||
|
|
||||||
conn.handshaker = NewHandshaker(ctx, connLog, config, signaler, conn.workerICE, conn.workerRelay)
|
|
||||||
|
|
||||||
conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer)
|
|
||||||
if os.Getenv("NB_FORCE_RELAY") != "true" {
|
|
||||||
conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer)
|
|
||||||
}
|
|
||||||
|
|
||||||
conn.guard = guard.NewGuard(connLog, ctrl, conn.isConnectedOnAllWay, config.Timeout, srWatcher)
|
|
||||||
|
|
||||||
go conn.handshaker.Listen()
|
|
||||||
|
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open opens connection to the remote peer
|
// Open opens connection to the remote peer
|
||||||
// It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will
|
// It will try to establish a connection using ICE and in parallel with relay. The higher priority connection type will
|
||||||
// be used.
|
// be used.
|
||||||
func (conn *Conn) Open() {
|
func (conn *Conn) Open(engineCtx context.Context) error {
|
||||||
conn.semaphore.Add(conn.ctx)
|
conn.semaphore.Add(engineCtx)
|
||||||
conn.log.Debugf("open connection to peer")
|
|
||||||
|
|
||||||
conn.mu.Lock()
|
conn.mu.Lock()
|
||||||
defer conn.mu.Unlock()
|
defer conn.mu.Unlock()
|
||||||
conn.opened = true
|
|
||||||
|
if conn.opened {
|
||||||
|
conn.semaphore.Done(engineCtx)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
conn.ctx, conn.ctxCancel = context.WithCancel(engineCtx)
|
||||||
|
|
||||||
|
ctrl := isController(conn.config)
|
||||||
|
|
||||||
|
conn.workerRelay = NewWorkerRelay(conn.Log, ctrl, conn.config, conn, conn.relayManager)
|
||||||
|
|
||||||
|
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
|
||||||
|
workerICE, err := NewWorkerICE(conn.ctx, conn.Log, conn.config, conn, conn.signaler, conn.iFaceDiscover, conn.statusRecorder, relayIsSupportedLocally)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
conn.workerICE = workerICE
|
||||||
|
|
||||||
|
conn.handshaker = NewHandshaker(conn.Log, conn.config, conn.signaler, conn.workerICE, conn.workerRelay)
|
||||||
|
|
||||||
|
conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer)
|
||||||
|
if os.Getenv("NB_FORCE_RELAY") != "true" {
|
||||||
|
conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer)
|
||||||
|
}
|
||||||
|
|
||||||
|
conn.guard = guard.NewGuard(conn.Log, ctrl, conn.isConnectedOnAllWay, conn.config.Timeout, conn.srWatcher)
|
||||||
|
|
||||||
|
conn.wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer conn.wg.Done()
|
||||||
|
conn.handshaker.Listen(conn.ctx)
|
||||||
|
}()
|
||||||
|
|
||||||
peerState := State{
|
peerState := State{
|
||||||
PubKey: conn.config.Key,
|
PubKey: conn.config.Key,
|
||||||
IP: conn.config.WgConfig.AllowedIps[0].Addr().String(),
|
|
||||||
ConnStatusUpdate: time.Now(),
|
ConnStatusUpdate: time.Now(),
|
||||||
ConnStatus: StatusDisconnected,
|
ConnStatus: StatusConnecting,
|
||||||
Mux: new(sync.RWMutex),
|
Mux: new(sync.RWMutex),
|
||||||
}
|
}
|
||||||
err := conn.statusRecorder.UpdatePeerState(peerState)
|
if err := conn.statusRecorder.UpdatePeerState(peerState); err != nil {
|
||||||
if err != nil {
|
conn.Log.Warnf("error while updating the state err: %v", err)
|
||||||
conn.log.Warnf("error while updating the state err: %v", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go conn.startHandshakeAndReconnect(conn.ctx)
|
conn.wg.Add(1)
|
||||||
}
|
go func() {
|
||||||
|
defer conn.wg.Done()
|
||||||
|
conn.waitInitialRandomSleepTime(conn.ctx)
|
||||||
|
conn.semaphore.Done(conn.ctx)
|
||||||
|
|
||||||
func (conn *Conn) startHandshakeAndReconnect(ctx context.Context) {
|
if err := conn.handshaker.sendOffer(); err != nil {
|
||||||
defer conn.semaphore.Done(conn.ctx)
|
conn.Log.Errorf("failed to send initial offer: %v", err)
|
||||||
conn.waitInitialRandomSleepTime(ctx)
|
}
|
||||||
|
|
||||||
err := conn.handshaker.sendOffer()
|
conn.wg.Add(1)
|
||||||
if err != nil {
|
go func() {
|
||||||
conn.log.Errorf("failed to send initial offer: %v", err)
|
conn.guard.Start(conn.ctx, conn.onGuardEvent)
|
||||||
}
|
conn.wg.Done()
|
||||||
|
}()
|
||||||
go conn.guard.Start(ctx)
|
}()
|
||||||
go conn.listenGuardEvent(ctx)
|
conn.opened = true
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close closes this peer Conn issuing a close event to the Conn closeCh
|
// Close closes this peer Conn issuing a close event to the Conn closeCh
|
||||||
@@ -207,11 +226,11 @@ func (conn *Conn) Close() {
|
|||||||
conn.mu.Lock()
|
conn.mu.Lock()
|
||||||
defer conn.mu.Unlock()
|
defer conn.mu.Unlock()
|
||||||
|
|
||||||
conn.log.Infof("close peer connection")
|
conn.Log.Infof("close peer connection")
|
||||||
conn.ctxCancel()
|
conn.ctxCancel()
|
||||||
|
|
||||||
if !conn.opened {
|
if !conn.opened {
|
||||||
conn.log.Debugf("ignore close connection to peer")
|
conn.Log.Debugf("ignore close connection to peer")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -222,7 +241,7 @@ func (conn *Conn) Close() {
|
|||||||
if conn.wgProxyRelay != nil {
|
if conn.wgProxyRelay != nil {
|
||||||
err := conn.wgProxyRelay.CloseConn()
|
err := conn.wgProxyRelay.CloseConn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
conn.log.Errorf("failed to close wg proxy for relay: %v", err)
|
conn.Log.Errorf("failed to close wg proxy for relay: %v", err)
|
||||||
}
|
}
|
||||||
conn.wgProxyRelay = nil
|
conn.wgProxyRelay = nil
|
||||||
}
|
}
|
||||||
@@ -230,13 +249,13 @@ func (conn *Conn) Close() {
|
|||||||
if conn.wgProxyICE != nil {
|
if conn.wgProxyICE != nil {
|
||||||
err := conn.wgProxyICE.CloseConn()
|
err := conn.wgProxyICE.CloseConn()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
conn.log.Errorf("failed to close wg proxy for ice: %v", err)
|
conn.Log.Errorf("failed to close wg proxy for ice: %v", err)
|
||||||
}
|
}
|
||||||
conn.wgProxyICE = nil
|
conn.wgProxyICE = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := conn.removeWgPeer(); err != nil {
|
if err := conn.removeWgPeer(); err != nil {
|
||||||
conn.log.Errorf("failed to remove wg endpoint: %v", err)
|
conn.Log.Errorf("failed to remove wg endpoint: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
conn.freeUpConnID()
|
conn.freeUpConnID()
|
||||||
@@ -246,12 +265,15 @@ func (conn *Conn) Close() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
conn.setStatusToDisconnected()
|
conn.setStatusToDisconnected()
|
||||||
|
conn.opened = false
|
||||||
|
conn.wg.Wait()
|
||||||
|
conn.Log.Infof("peer connection closed")
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
|
// OnRemoteAnswer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
|
||||||
// doesn't block, discards the message if connection wasn't ready
|
// doesn't block, discards the message if connection wasn't ready
|
||||||
func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) bool {
|
func (conn *Conn) OnRemoteAnswer(answer OfferAnswer) bool {
|
||||||
conn.log.Debugf("OnRemoteAnswer, status ICE: %s, status relay: %s", conn.statusICE, conn.statusRelay)
|
conn.Log.Debugf("OnRemoteAnswer, status ICE: %s, status relay: %s", conn.statusICE, conn.statusRelay)
|
||||||
return conn.handshaker.OnRemoteAnswer(answer)
|
return conn.handshaker.OnRemoteAnswer(answer)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -278,7 +300,7 @@ func (conn *Conn) SetOnDisconnected(handler func(remotePeer string)) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) OnRemoteOffer(offer OfferAnswer) bool {
|
func (conn *Conn) OnRemoteOffer(offer OfferAnswer) bool {
|
||||||
conn.log.Debugf("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay)
|
conn.Log.Debugf("OnRemoteOffer, on status ICE: %s, status Relay: %s", conn.statusICE, conn.statusRelay)
|
||||||
return conn.handshaker.OnRemoteOffer(offer)
|
return conn.handshaker.OnRemoteOffer(offer)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -298,30 +320,30 @@ func (conn *Conn) GetKey() string {
|
|||||||
return conn.config.Key
|
return conn.config.Key
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (conn *Conn) ConnID() ConnID {
|
||||||
|
return ConnID(conn)
|
||||||
|
}
|
||||||
|
|
||||||
// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
|
// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
|
||||||
func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) {
|
func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) {
|
||||||
conn.mu.Lock()
|
conn.mu.Lock()
|
||||||
defer conn.mu.Unlock()
|
defer conn.mu.Unlock()
|
||||||
|
|
||||||
if conn.ctx.Err() != nil {
|
if remoteConnNil(conn.Log, iceConnInfo.RemoteConn) {
|
||||||
return
|
conn.Log.Errorf("remote ICE connection is nil")
|
||||||
}
|
|
||||||
|
|
||||||
if remoteConnNil(conn.log, iceConnInfo.RemoteConn) {
|
|
||||||
conn.log.Errorf("remote ICE connection is nil")
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// this never should happen, because Relay is the lower priority and ICE always close the deprecated connection before upgrade
|
// this never should happen, because Relay is the lower priority and ICE always close the deprecated connection before upgrade
|
||||||
// todo consider to remove this check
|
// todo consider to remove this check
|
||||||
if conn.currentConnPriority > priority {
|
if conn.currentConnPriority > priority {
|
||||||
conn.log.Infof("current connection priority (%s) is higher than the new one (%s), do not upgrade connection", conn.currentConnPriority, priority)
|
conn.Log.Infof("current connection priority (%s) is higher than the new one (%s), do not upgrade connection", conn.currentConnPriority, priority)
|
||||||
conn.statusICE.Set(StatusConnected)
|
conn.statusICE.Set(StatusConnected)
|
||||||
conn.updateIceState(iceConnInfo)
|
conn.updateIceState(iceConnInfo)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
conn.log.Infof("set ICE to active connection")
|
conn.Log.Infof("set ICE to active connection")
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ep *net.UDPAddr
|
ep *net.UDPAddr
|
||||||
@@ -331,7 +353,7 @@ func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEC
|
|||||||
if iceConnInfo.RelayedOnLocal {
|
if iceConnInfo.RelayedOnLocal {
|
||||||
wgProxy, err = conn.newProxy(iceConnInfo.RemoteConn)
|
wgProxy, err = conn.newProxy(iceConnInfo.RemoteConn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
|
conn.Log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
ep = wgProxy.EndpointAddr()
|
ep = wgProxy.EndpointAddr()
|
||||||
@@ -347,7 +369,7 @@ func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEC
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := conn.runBeforeAddPeerHooks(ep.IP); err != nil {
|
if err := conn.runBeforeAddPeerHooks(ep.IP); err != nil {
|
||||||
conn.log.Errorf("Before add peer hook failed: %v", err)
|
conn.Log.Errorf("Before add peer hook failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
conn.workerRelay.DisableWgWatcher()
|
conn.workerRelay.DisableWgWatcher()
|
||||||
@@ -365,48 +387,51 @@ func (conn *Conn) onICEConnectionIsReady(priority ConnPriority, iceConnInfo ICEC
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
wgConfigWorkaround()
|
wgConfigWorkaround()
|
||||||
|
|
||||||
|
oldState := conn.currentConnPriority
|
||||||
conn.currentConnPriority = priority
|
conn.currentConnPriority = priority
|
||||||
conn.statusICE.Set(StatusConnected)
|
conn.statusICE.Set(StatusConnected)
|
||||||
conn.updateIceState(iceConnInfo)
|
conn.updateIceState(iceConnInfo)
|
||||||
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
|
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
|
||||||
|
|
||||||
|
if oldState == connPriorityNone {
|
||||||
|
conn.peerConnDispatcher.NotifyConnected(conn)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) onICEStateDisconnected() {
|
func (conn *Conn) onICEStateDisconnected() {
|
||||||
conn.mu.Lock()
|
conn.mu.Lock()
|
||||||
defer conn.mu.Unlock()
|
defer conn.mu.Unlock()
|
||||||
|
|
||||||
if conn.ctx.Err() != nil {
|
conn.Log.Tracef("ICE connection state changed to disconnected")
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
conn.log.Tracef("ICE connection state changed to disconnected")
|
|
||||||
|
|
||||||
if conn.wgProxyICE != nil {
|
if conn.wgProxyICE != nil {
|
||||||
if err := conn.wgProxyICE.CloseConn(); err != nil {
|
if err := conn.wgProxyICE.CloseConn(); err != nil {
|
||||||
conn.log.Warnf("failed to close deprecated wg proxy conn: %v", err)
|
conn.Log.Warnf("failed to close deprecated wg proxy conn: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// switch back to relay connection
|
// switch back to relay connection
|
||||||
if conn.isReadyToUpgrade() {
|
if conn.isReadyToUpgrade() {
|
||||||
conn.log.Infof("ICE disconnected, set Relay to active connection")
|
conn.Log.Infof("ICE disconnected, set Relay to active connection")
|
||||||
conn.wgProxyRelay.Work()
|
conn.wgProxyRelay.Work()
|
||||||
|
|
||||||
if err := conn.configureWGEndpoint(conn.wgProxyRelay.EndpointAddr()); err != nil {
|
if err := conn.configureWGEndpoint(conn.wgProxyRelay.EndpointAddr()); err != nil {
|
||||||
conn.log.Errorf("failed to switch to relay conn: %v", err)
|
conn.Log.Errorf("failed to switch to relay conn: %v", err)
|
||||||
}
|
}
|
||||||
conn.workerRelay.EnableWgWatcher(conn.ctx)
|
conn.workerRelay.EnableWgWatcher(conn.ctx)
|
||||||
conn.currentConnPriority = connPriorityRelay
|
conn.currentConnPriority = connPriorityRelay
|
||||||
} else {
|
} else {
|
||||||
conn.log.Infof("ICE disconnected, do not switch to Relay. Reset priority to: %s", connPriorityNone.String())
|
conn.Log.Infof("ICE disconnected, do not switch to Relay. Reset priority to: %s", connPriorityNone.String())
|
||||||
conn.currentConnPriority = connPriorityNone
|
conn.currentConnPriority = connPriorityNone
|
||||||
|
conn.peerConnDispatcher.NotifyDisconnected(conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
changed := conn.statusICE.Get() != StatusDisconnected
|
changed := conn.statusICE.Get() != StatusIdle
|
||||||
if changed {
|
if changed {
|
||||||
conn.guard.SetICEConnDisconnected()
|
conn.guard.SetICEConnDisconnected()
|
||||||
}
|
}
|
||||||
conn.statusICE.Set(StatusDisconnected)
|
conn.statusICE.Set(StatusIdle)
|
||||||
|
|
||||||
peerState := State{
|
peerState := State{
|
||||||
PubKey: conn.config.Key,
|
PubKey: conn.config.Key,
|
||||||
@@ -417,7 +442,7 @@ func (conn *Conn) onICEStateDisconnected() {
|
|||||||
|
|
||||||
err := conn.statusRecorder.UpdatePeerICEStateToDisconnected(peerState)
|
err := conn.statusRecorder.UpdatePeerICEStateToDisconnected(peerState)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
conn.log.Warnf("unable to set peer's state to disconnected ice, got error: %v", err)
|
conn.Log.Warnf("unable to set peer's state to disconnected ice, got error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -425,25 +450,18 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
|
|||||||
conn.mu.Lock()
|
conn.mu.Lock()
|
||||||
defer conn.mu.Unlock()
|
defer conn.mu.Unlock()
|
||||||
|
|
||||||
if conn.ctx.Err() != nil {
|
conn.Log.Debugf("Relay connection has been established, setup the WireGuard")
|
||||||
if err := rci.relayedConn.Close(); err != nil {
|
|
||||||
conn.log.Warnf("failed to close unnecessary relayed connection: %v", err)
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
conn.log.Debugf("Relay connection has been established, setup the WireGuard")
|
|
||||||
|
|
||||||
wgProxy, err := conn.newProxy(rci.relayedConn)
|
wgProxy, err := conn.newProxy(rci.relayedConn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
conn.log.Errorf("failed to add relayed net.Conn to local proxy: %v", err)
|
conn.Log.Errorf("failed to add relayed net.Conn to local proxy: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
conn.log.Infof("created new wgProxy for relay connection: %s", wgProxy.EndpointAddr().String())
|
conn.Log.Infof("created new wgProxy for relay connection: %s", wgProxy.EndpointAddr().String())
|
||||||
|
|
||||||
if conn.isICEActive() {
|
if conn.isICEActive() {
|
||||||
conn.log.Infof("do not switch to relay because current priority is: %s", conn.currentConnPriority.String())
|
conn.Log.Debugf("do not switch to relay because current priority is: %s", conn.currentConnPriority.String())
|
||||||
conn.setRelayedProxy(wgProxy)
|
conn.setRelayedProxy(wgProxy)
|
||||||
conn.statusRelay.Set(StatusConnected)
|
conn.statusRelay.Set(StatusConnected)
|
||||||
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
|
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
|
||||||
@@ -451,15 +469,15 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err := conn.runBeforeAddPeerHooks(wgProxy.EndpointAddr().IP); err != nil {
|
if err := conn.runBeforeAddPeerHooks(wgProxy.EndpointAddr().IP); err != nil {
|
||||||
conn.log.Errorf("Before add peer hook failed: %v", err)
|
conn.Log.Errorf("Before add peer hook failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
wgProxy.Work()
|
wgProxy.Work()
|
||||||
if err := conn.configureWGEndpoint(wgProxy.EndpointAddr()); err != nil {
|
if err := conn.configureWGEndpoint(wgProxy.EndpointAddr()); err != nil {
|
||||||
if err := wgProxy.CloseConn(); err != nil {
|
if err := wgProxy.CloseConn(); err != nil {
|
||||||
conn.log.Warnf("Failed to close relay connection: %v", err)
|
conn.Log.Warnf("Failed to close relay connection: %v", err)
|
||||||
}
|
}
|
||||||
conn.log.Errorf("Failed to update WireGuard peer configuration: %v", err)
|
conn.Log.Errorf("Failed to update WireGuard peer configuration: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
conn.workerRelay.EnableWgWatcher(conn.ctx)
|
conn.workerRelay.EnableWgWatcher(conn.ctx)
|
||||||
@@ -469,25 +487,24 @@ func (conn *Conn) onRelayConnectionIsReady(rci RelayConnInfo) {
|
|||||||
conn.statusRelay.Set(StatusConnected)
|
conn.statusRelay.Set(StatusConnected)
|
||||||
conn.setRelayedProxy(wgProxy)
|
conn.setRelayedProxy(wgProxy)
|
||||||
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
|
conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
|
||||||
conn.log.Infof("start to communicate with peer via relay")
|
conn.Log.Infof("start to communicate with peer via relay")
|
||||||
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
|
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
|
||||||
|
conn.peerConnDispatcher.NotifyConnected(conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) onRelayDisconnected() {
|
func (conn *Conn) onRelayDisconnected() {
|
||||||
conn.mu.Lock()
|
conn.mu.Lock()
|
||||||
defer conn.mu.Unlock()
|
defer conn.mu.Unlock()
|
||||||
|
|
||||||
if conn.ctx.Err() != nil {
|
conn.Log.Debugf("relay connection is disconnected")
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
conn.log.Debugf("relay connection is disconnected")
|
|
||||||
|
|
||||||
if conn.currentConnPriority == connPriorityRelay {
|
if conn.currentConnPriority == connPriorityRelay {
|
||||||
conn.log.Debugf("clean up WireGuard config")
|
conn.Log.Debugf("clean up WireGuard config")
|
||||||
if err := conn.removeWgPeer(); err != nil {
|
if err := conn.removeWgPeer(); err != nil {
|
||||||
conn.log.Errorf("failed to remove wg endpoint: %v", err)
|
conn.Log.Errorf("failed to remove wg endpoint: %v", err)
|
||||||
}
|
}
|
||||||
|
conn.currentConnPriority = connPriorityNone
|
||||||
|
conn.peerConnDispatcher.NotifyDisconnected(conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
if conn.wgProxyRelay != nil {
|
if conn.wgProxyRelay != nil {
|
||||||
@@ -495,11 +512,11 @@ func (conn *Conn) onRelayDisconnected() {
|
|||||||
conn.wgProxyRelay = nil
|
conn.wgProxyRelay = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
changed := conn.statusRelay.Get() != StatusDisconnected
|
changed := conn.statusRelay.Get() != StatusIdle
|
||||||
if changed {
|
if changed {
|
||||||
conn.guard.SetRelayedConnDisconnected()
|
conn.guard.SetRelayedConnDisconnected()
|
||||||
}
|
}
|
||||||
conn.statusRelay.Set(StatusDisconnected)
|
conn.statusRelay.Set(StatusIdle)
|
||||||
|
|
||||||
peerState := State{
|
peerState := State{
|
||||||
PubKey: conn.config.Key,
|
PubKey: conn.config.Key,
|
||||||
@@ -508,21 +525,14 @@ func (conn *Conn) onRelayDisconnected() {
|
|||||||
ConnStatusUpdate: time.Now(),
|
ConnStatusUpdate: time.Now(),
|
||||||
}
|
}
|
||||||
if err := conn.statusRecorder.UpdatePeerRelayedStateToDisconnected(peerState); err != nil {
|
if err := conn.statusRecorder.UpdatePeerRelayedStateToDisconnected(peerState); err != nil {
|
||||||
conn.log.Warnf("unable to save peer's state to Relay disconnected, got error: %v", err)
|
conn.Log.Warnf("unable to save peer's state to Relay disconnected, got error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) listenGuardEvent(ctx context.Context) {
|
func (conn *Conn) onGuardEvent() {
|
||||||
for {
|
conn.Log.Debugf("send offer to peer")
|
||||||
select {
|
if err := conn.handshaker.SendOffer(); err != nil {
|
||||||
case <-conn.guard.Reconnect:
|
conn.Log.Errorf("failed to send offer: %v", err)
|
||||||
conn.log.Debugf("send offer to peer")
|
|
||||||
if err := conn.handshaker.SendOffer(); err != nil {
|
|
||||||
conn.log.Errorf("failed to send offer: %v", err)
|
|
||||||
}
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -548,7 +558,7 @@ func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []by
|
|||||||
|
|
||||||
err := conn.statusRecorder.UpdatePeerRelayedState(peerState)
|
err := conn.statusRecorder.UpdatePeerRelayedState(peerState)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
conn.log.Warnf("unable to save peer's Relay state, got error: %v", err)
|
conn.Log.Warnf("unable to save peer's Relay state, got error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -567,17 +577,17 @@ func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo) {
|
|||||||
|
|
||||||
err := conn.statusRecorder.UpdatePeerICEState(peerState)
|
err := conn.statusRecorder.UpdatePeerICEState(peerState)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
conn.log.Warnf("unable to save peer's ICE state, got error: %v", err)
|
conn.Log.Warnf("unable to save peer's ICE state, got error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) setStatusToDisconnected() {
|
func (conn *Conn) setStatusToDisconnected() {
|
||||||
conn.statusRelay.Set(StatusDisconnected)
|
conn.statusRelay.Set(StatusIdle)
|
||||||
conn.statusICE.Set(StatusDisconnected)
|
conn.statusICE.Set(StatusIdle)
|
||||||
|
|
||||||
peerState := State{
|
peerState := State{
|
||||||
PubKey: conn.config.Key,
|
PubKey: conn.config.Key,
|
||||||
ConnStatus: StatusDisconnected,
|
ConnStatus: StatusIdle,
|
||||||
ConnStatusUpdate: time.Now(),
|
ConnStatusUpdate: time.Now(),
|
||||||
Mux: new(sync.RWMutex),
|
Mux: new(sync.RWMutex),
|
||||||
}
|
}
|
||||||
@@ -585,10 +595,10 @@ func (conn *Conn) setStatusToDisconnected() {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
// pretty common error because by that time Engine can already remove the peer and status won't be available.
|
// pretty common error because by that time Engine can already remove the peer and status won't be available.
|
||||||
// todo rethink status updates
|
// todo rethink status updates
|
||||||
conn.log.Debugf("error while updating peer's state, err: %v", err)
|
conn.Log.Debugf("error while updating peer's state, err: %v", err)
|
||||||
}
|
}
|
||||||
if err := conn.statusRecorder.UpdateWireGuardPeerState(conn.config.Key, configurer.WGStats{}); err != nil {
|
if err := conn.statusRecorder.UpdateWireGuardPeerState(conn.config.Key, configurer.WGStats{}); err != nil {
|
||||||
conn.log.Debugf("failed to reset wireguard stats for peer: %s", err)
|
conn.Log.Debugf("failed to reset wireguard stats for peer: %s", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -616,7 +626,7 @@ func (conn *Conn) waitInitialRandomSleepTime(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) isRelayed() bool {
|
func (conn *Conn) isRelayed() bool {
|
||||||
if conn.statusRelay.Get() == StatusDisconnected && (conn.statusICE.Get() == StatusDisconnected || conn.statusICE.Get() == StatusConnecting) {
|
if conn.statusRelay.Get() == StatusIdle && (conn.statusICE.Get() == StatusIdle || conn.statusICE.Get() == StatusConnecting) {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -636,7 +646,7 @@ func (conn *Conn) evalStatus() ConnStatus {
|
|||||||
return StatusConnecting
|
return StatusConnecting
|
||||||
}
|
}
|
||||||
|
|
||||||
return StatusDisconnected
|
return StatusIdle
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) isConnectedOnAllWay() (connected bool) {
|
func (conn *Conn) isConnectedOnAllWay() (connected bool) {
|
||||||
@@ -649,7 +659,7 @@ func (conn *Conn) isConnectedOnAllWay() (connected bool) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if conn.statusICE.Get() == StatusDisconnected {
|
if conn.statusICE.Get() == StatusIdle {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -676,7 +686,7 @@ func (conn *Conn) freeUpConnID() {
|
|||||||
if conn.connIDRelay != "" {
|
if conn.connIDRelay != "" {
|
||||||
for _, hook := range conn.afterRemovePeerHooks {
|
for _, hook := range conn.afterRemovePeerHooks {
|
||||||
if err := hook(conn.connIDRelay); err != nil {
|
if err := hook(conn.connIDRelay); err != nil {
|
||||||
conn.log.Errorf("After remove peer hook failed: %v", err)
|
conn.Log.Errorf("After remove peer hook failed: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
conn.connIDRelay = ""
|
conn.connIDRelay = ""
|
||||||
@@ -685,7 +695,7 @@ func (conn *Conn) freeUpConnID() {
|
|||||||
if conn.connIDICE != "" {
|
if conn.connIDICE != "" {
|
||||||
for _, hook := range conn.afterRemovePeerHooks {
|
for _, hook := range conn.afterRemovePeerHooks {
|
||||||
if err := hook(conn.connIDICE); err != nil {
|
if err := hook(conn.connIDICE); err != nil {
|
||||||
conn.log.Errorf("After remove peer hook failed: %v", err)
|
conn.Log.Errorf("After remove peer hook failed: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
conn.connIDICE = ""
|
conn.connIDICE = ""
|
||||||
@@ -693,7 +703,7 @@ func (conn *Conn) freeUpConnID() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) newProxy(remoteConn net.Conn) (wgproxy.Proxy, error) {
|
func (conn *Conn) newProxy(remoteConn net.Conn) (wgproxy.Proxy, error) {
|
||||||
conn.log.Debugf("setup proxied WireGuard connection")
|
conn.Log.Debugf("setup proxied WireGuard connection")
|
||||||
udpAddr := &net.UDPAddr{
|
udpAddr := &net.UDPAddr{
|
||||||
IP: conn.config.WgConfig.AllowedIps[0].Addr().AsSlice(),
|
IP: conn.config.WgConfig.AllowedIps[0].Addr().AsSlice(),
|
||||||
Port: conn.config.WgConfig.WgListenPort,
|
Port: conn.config.WgConfig.WgListenPort,
|
||||||
@@ -701,7 +711,7 @@ func (conn *Conn) newProxy(remoteConn net.Conn) (wgproxy.Proxy, error) {
|
|||||||
|
|
||||||
wgProxy := conn.config.WgConfig.WgInterface.GetProxy()
|
wgProxy := conn.config.WgConfig.WgInterface.GetProxy()
|
||||||
if err := wgProxy.AddTurnConn(conn.ctx, udpAddr, remoteConn); err != nil {
|
if err := wgProxy.AddTurnConn(conn.ctx, udpAddr, remoteConn); err != nil {
|
||||||
conn.log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
|
conn.Log.Errorf("failed to add turn net.Conn to local proxy: %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return wgProxy, nil
|
return wgProxy, nil
|
||||||
@@ -720,10 +730,10 @@ func (conn *Conn) removeWgPeer() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) handleConfigurationFailure(err error, wgProxy wgproxy.Proxy) {
|
func (conn *Conn) handleConfigurationFailure(err error, wgProxy wgproxy.Proxy) {
|
||||||
conn.log.Warnf("Failed to update wg peer configuration: %v", err)
|
conn.Log.Warnf("Failed to update wg peer configuration: %v", err)
|
||||||
if wgProxy != nil {
|
if wgProxy != nil {
|
||||||
if ierr := wgProxy.CloseConn(); ierr != nil {
|
if ierr := wgProxy.CloseConn(); ierr != nil {
|
||||||
conn.log.Warnf("Failed to close wg proxy: %v", ierr)
|
conn.Log.Warnf("Failed to close wg proxy: %v", ierr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if conn.wgProxyRelay != nil {
|
if conn.wgProxyRelay != nil {
|
||||||
@@ -733,16 +743,16 @@ func (conn *Conn) handleConfigurationFailure(err error, wgProxy wgproxy.Proxy) {
|
|||||||
|
|
||||||
func (conn *Conn) logTraceConnState() {
|
func (conn *Conn) logTraceConnState() {
|
||||||
if conn.workerRelay.IsRelayConnectionSupportedWithPeer() {
|
if conn.workerRelay.IsRelayConnectionSupportedWithPeer() {
|
||||||
conn.log.Tracef("connectivity guard check, relay state: %s, ice state: %s", conn.statusRelay, conn.statusICE)
|
conn.Log.Tracef("connectivity guard check, relay state: %s, ice state: %s", conn.statusRelay, conn.statusICE)
|
||||||
} else {
|
} else {
|
||||||
conn.log.Tracef("connectivity guard check, ice state: %s", conn.statusICE)
|
conn.Log.Tracef("connectivity guard check, ice state: %s", conn.statusICE)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) setRelayedProxy(proxy wgproxy.Proxy) {
|
func (conn *Conn) setRelayedProxy(proxy wgproxy.Proxy) {
|
||||||
if conn.wgProxyRelay != nil {
|
if conn.wgProxyRelay != nil {
|
||||||
if err := conn.wgProxyRelay.CloseConn(); err != nil {
|
if err := conn.wgProxyRelay.CloseConn(); err != nil {
|
||||||
conn.log.Warnf("failed to close deprecated wg proxy conn: %v", err)
|
conn.Log.Warnf("failed to close deprecated wg proxy conn: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
conn.wgProxyRelay = proxy
|
conn.wgProxyRelay = proxy
|
||||||
|
|||||||
@@ -7,12 +7,12 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// StatusConnected indicate the peer is in connected state
|
// StatusIdle indicate the peer is in disconnected state
|
||||||
StatusConnected ConnStatus = iota
|
StatusIdle ConnStatus = iota
|
||||||
// StatusConnecting indicate the peer is in connecting state
|
// StatusConnecting indicate the peer is in connecting state
|
||||||
StatusConnecting
|
StatusConnecting
|
||||||
// StatusDisconnected indicate the peer is in disconnected state
|
// StatusConnected indicate the peer is in connected state
|
||||||
StatusDisconnected
|
StatusConnected
|
||||||
)
|
)
|
||||||
|
|
||||||
// ConnStatus describe the status of a peer's connection
|
// ConnStatus describe the status of a peer's connection
|
||||||
@@ -26,7 +26,7 @@ type AtomicConnStatus struct {
|
|||||||
// NewAtomicConnStatus creates a new AtomicConnStatus with the given initial status
|
// NewAtomicConnStatus creates a new AtomicConnStatus with the given initial status
|
||||||
func NewAtomicConnStatus() *AtomicConnStatus {
|
func NewAtomicConnStatus() *AtomicConnStatus {
|
||||||
acs := &AtomicConnStatus{}
|
acs := &AtomicConnStatus{}
|
||||||
acs.Set(StatusDisconnected)
|
acs.Set(StatusIdle)
|
||||||
return acs
|
return acs
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -51,8 +51,8 @@ func (s ConnStatus) String() string {
|
|||||||
return "Connecting"
|
return "Connecting"
|
||||||
case StatusConnected:
|
case StatusConnected:
|
||||||
return "Connected"
|
return "Connected"
|
||||||
case StatusDisconnected:
|
case StatusIdle:
|
||||||
return "Disconnected"
|
return "Idle"
|
||||||
default:
|
default:
|
||||||
log.Errorf("unknown status: %d", s)
|
log.Errorf("unknown status: %d", s)
|
||||||
return "INVALID_PEER_CONNECTION_STATUS"
|
return "INVALID_PEER_CONNECTION_STATUS"
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ func TestConnStatus_String(t *testing.T) {
|
|||||||
want string
|
want string
|
||||||
}{
|
}{
|
||||||
{"StatusConnected", StatusConnected, "Connected"},
|
{"StatusConnected", StatusConnected, "Connected"},
|
||||||
{"StatusDisconnected", StatusDisconnected, "Disconnected"},
|
{"StatusIdle", StatusIdle, "Idle"},
|
||||||
{"StatusConnecting", StatusConnecting, "Connecting"},
|
{"StatusConnecting", StatusConnecting, "Connecting"},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package peer
|
package peer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
|
||||||
"os"
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
@@ -17,6 +16,8 @@ import (
|
|||||||
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
|
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var dispatcher = NewConnectionDispatcher()
|
||||||
|
|
||||||
var connConf = ConnConfig{
|
var connConf = ConnConfig{
|
||||||
Key: "LLHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=",
|
Key: "LLHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=",
|
||||||
LocalKey: "RRHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=",
|
LocalKey: "RRHf3Ma6z6mdLbriAJbqhX7+nM/B71lgw2+91q3LfhU=",
|
||||||
@@ -47,7 +48,7 @@ func TestNewConn_interfaceFilter(t *testing.T) {
|
|||||||
|
|
||||||
func TestConn_GetKey(t *testing.T) {
|
func TestConn_GetKey(t *testing.T) {
|
||||||
swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig)
|
swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig)
|
||||||
conn, err := NewConn(context.Background(), connConf, nil, nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1))
|
conn, err := NewConn(connConf, nil, nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1), dispatcher)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -59,7 +60,7 @@ func TestConn_GetKey(t *testing.T) {
|
|||||||
|
|
||||||
func TestConn_OnRemoteOffer(t *testing.T) {
|
func TestConn_OnRemoteOffer(t *testing.T) {
|
||||||
swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig)
|
swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig)
|
||||||
conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1))
|
conn, err := NewConn(connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1), dispatcher)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -93,7 +94,7 @@ func TestConn_OnRemoteOffer(t *testing.T) {
|
|||||||
|
|
||||||
func TestConn_OnRemoteAnswer(t *testing.T) {
|
func TestConn_OnRemoteAnswer(t *testing.T) {
|
||||||
swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig)
|
swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig)
|
||||||
conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1))
|
conn, err := NewConn(connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1), dispatcher)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -126,7 +127,7 @@ func TestConn_OnRemoteAnswer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
func TestConn_Status(t *testing.T) {
|
func TestConn_Status(t *testing.T) {
|
||||||
swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig)
|
swWatcher := guard.NewSRWatcher(nil, nil, nil, connConf.ICEConfig)
|
||||||
conn, err := NewConn(context.Background(), connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1))
|
conn, err := NewConn(connConf, NewRecorder("https://mgm"), nil, nil, nil, swWatcher, semaphoregroup.NewSemaphoreGroup(1), dispatcher)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -138,11 +139,11 @@ func TestConn_Status(t *testing.T) {
|
|||||||
want ConnStatus
|
want ConnStatus
|
||||||
}{
|
}{
|
||||||
{"StatusConnected", StatusConnected, StatusConnected, StatusConnected},
|
{"StatusConnected", StatusConnected, StatusConnected, StatusConnected},
|
||||||
{"StatusDisconnected", StatusDisconnected, StatusDisconnected, StatusDisconnected},
|
{"StatusIdle", StatusIdle, StatusIdle, StatusIdle},
|
||||||
{"StatusConnecting", StatusConnecting, StatusConnecting, StatusConnecting},
|
{"StatusConnecting", StatusConnecting, StatusConnecting, StatusConnecting},
|
||||||
{"StatusConnectingIce", StatusConnecting, StatusDisconnected, StatusConnecting},
|
{"StatusConnectingIce", StatusConnecting, StatusIdle, StatusConnecting},
|
||||||
{"StatusConnectingIceAlternative", StatusConnecting, StatusConnected, StatusConnected},
|
{"StatusConnectingIceAlternative", StatusConnecting, StatusConnected, StatusConnected},
|
||||||
{"StatusConnectingRelay", StatusDisconnected, StatusConnecting, StatusConnecting},
|
{"StatusConnectingRelay", StatusIdle, StatusConnecting, StatusConnecting},
|
||||||
{"StatusConnectingRelayAlternative", StatusConnected, StatusConnecting, StatusConnected},
|
{"StatusConnectingRelayAlternative", StatusConnected, StatusConnecting, StatusConnected},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
50
client/internal/peer/dispatcher.go
Normal file
50
client/internal/peer/dispatcher.go
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
package peer

import (
	"sync"
)

// ConnectionListener holds the callbacks a subscriber receives from a
// ConnectionDispatcher when a peer connection becomes connected or
// disconnected.
type ConnectionListener struct {
	OnConnected    func(peer *Conn)
	OnDisconnected func(peer *Conn)
}

// ConnectionDispatcher fans out peer connection state changes to a set of
// registered listeners. All methods are safe for concurrent use.
type ConnectionDispatcher struct {
	listeners map[*ConnectionListener]struct{}
	mu        sync.Mutex
}

// NewConnectionDispatcher returns a dispatcher with no listeners registered.
func NewConnectionDispatcher() *ConnectionDispatcher {
	return &ConnectionDispatcher{
		listeners: make(map[*ConnectionListener]struct{}),
	}
}

// AddListener registers listener so it receives subsequent notifications.
func (e *ConnectionDispatcher) AddListener(listener *ConnectionListener) {
	e.mu.Lock()
	defer e.mu.Unlock()

	e.listeners[listener] = struct{}{}
}

// RemoveListener unregisters listener. Removing a listener that was never
// added is a no-op.
func (e *ConnectionDispatcher) RemoveListener(listener *ConnectionListener) {
	e.mu.Lock()
	defer e.mu.Unlock()

	delete(e.listeners, listener)
}

// NotifyConnected invokes OnConnected on every registered listener.
// NOTE(review): callbacks run while the dispatcher mutex is held; a callback
// that re-enters Add/RemoveListener would deadlock — confirm callers never do
// that. Iteration order over the map is unspecified.
func (e *ConnectionDispatcher) NotifyConnected(peer *Conn) {
	e.mu.Lock()
	defer e.mu.Unlock()

	for listener := range e.listeners {
		listener.OnConnected(peer)
	}
}

// NotifyDisconnected invokes OnDisconnected on every registered listener.
// The same locking caveat as NotifyConnected applies.
func (e *ConnectionDispatcher) NotifyDisconnected(peer *Conn) {
	e.mu.Lock()
	defer e.mu.Unlock()

	for listener := range e.listeners {
		listener.OnDisconnected(peer)
	}
}
|
||||||
@@ -46,11 +46,11 @@ func NewGuard(log *log.Entry, isController bool, isConnectedFn isConnectedFunc,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Guard) Start(ctx context.Context) {
|
func (g *Guard) Start(ctx context.Context, eventCallback func()) {
|
||||||
if g.isController {
|
if g.isController {
|
||||||
g.reconnectLoopWithRetry(ctx)
|
g.reconnectLoopWithRetry(ctx, eventCallback)
|
||||||
} else {
|
} else {
|
||||||
g.listenForDisconnectEvents(ctx)
|
g.listenForDisconnectEvents(ctx, eventCallback)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -70,7 +70,7 @@ func (g *Guard) SetICEConnDisconnected() {
|
|||||||
|
|
||||||
// reconnectLoopWithRetry periodically check (max 30 min) the connection status.
|
// reconnectLoopWithRetry periodically check (max 30 min) the connection status.
|
||||||
// Try to send offer while the P2P is not established or while the Relay is not connected if is it supported
|
// Try to send offer while the P2P is not established or while the Relay is not connected if is it supported
|
||||||
func (g *Guard) reconnectLoopWithRetry(ctx context.Context) {
|
func (g *Guard) reconnectLoopWithRetry(ctx context.Context, callback func()) {
|
||||||
waitForInitialConnectionTry(ctx)
|
waitForInitialConnectionTry(ctx)
|
||||||
|
|
||||||
srReconnectedChan := g.srWatcher.NewListener()
|
srReconnectedChan := g.srWatcher.NewListener()
|
||||||
@@ -93,7 +93,7 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !g.isConnectedOnAllWay() {
|
if !g.isConnectedOnAllWay() {
|
||||||
g.triggerOfferSending()
|
callback()
|
||||||
}
|
}
|
||||||
|
|
||||||
case <-g.relayedConnDisconnected:
|
case <-g.relayedConnDisconnected:
|
||||||
@@ -125,7 +125,7 @@ func (g *Guard) reconnectLoopWithRetry(ctx context.Context) {
|
|||||||
// when the connection is lost. It will try to establish a connection only once time if before the connection was established
|
// when the connection is lost. It will try to establish a connection only once time if before the connection was established
|
||||||
// It track separately the ice and relay connection status. Just because a lower priority connection reestablished it does not
|
// It track separately the ice and relay connection status. Just because a lower priority connection reestablished it does not
|
||||||
// mean that to switch to it. We always force to use the higher priority connection.
|
// mean that to switch to it. We always force to use the higher priority connection.
|
||||||
func (g *Guard) listenForDisconnectEvents(ctx context.Context) {
|
func (g *Guard) listenForDisconnectEvents(ctx context.Context, callback func()) {
|
||||||
srReconnectedChan := g.srWatcher.NewListener()
|
srReconnectedChan := g.srWatcher.NewListener()
|
||||||
defer g.srWatcher.RemoveListener(srReconnectedChan)
|
defer g.srWatcher.RemoveListener(srReconnectedChan)
|
||||||
|
|
||||||
@@ -134,12 +134,12 @@ func (g *Guard) listenForDisconnectEvents(ctx context.Context) {
|
|||||||
select {
|
select {
|
||||||
case <-g.relayedConnDisconnected:
|
case <-g.relayedConnDisconnected:
|
||||||
g.log.Debugf("Relay connection changed, triggering reconnect")
|
g.log.Debugf("Relay connection changed, triggering reconnect")
|
||||||
g.triggerOfferSending()
|
callback()
|
||||||
case <-g.iCEConnDisconnected:
|
case <-g.iCEConnDisconnected:
|
||||||
g.log.Debugf("ICE state changed, try to send new offer")
|
g.log.Debugf("ICE state changed, try to send new offer")
|
||||||
g.triggerOfferSending()
|
callback()
|
||||||
case <-srReconnectedChan:
|
case <-srReconnectedChan:
|
||||||
g.triggerOfferSending()
|
callback()
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
g.log.Debugf("context is done, stop reconnect loop")
|
g.log.Debugf("context is done, stop reconnect loop")
|
||||||
return
|
return
|
||||||
@@ -164,13 +164,6 @@ func (g *Guard) prepareExponentTicker(ctx context.Context) *backoff.Ticker {
|
|||||||
return ticker
|
return ticker
|
||||||
}
|
}
|
||||||
|
|
||||||
func (g *Guard) triggerOfferSending() {
|
|
||||||
select {
|
|
||||||
case g.Reconnect <- struct{}{}:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Give chance to the peer to establish the initial connection.
|
// Give chance to the peer to establish the initial connection.
|
||||||
// With it, we can decrease to send necessary offer
|
// With it, we can decrease to send necessary offer
|
||||||
func waitForInitialConnectionTry(ctx context.Context) {
|
func waitForInitialConnectionTry(ctx context.Context) {
|
||||||
|
|||||||
@@ -43,7 +43,6 @@ type OfferAnswer struct {
|
|||||||
|
|
||||||
type Handshaker struct {
|
type Handshaker struct {
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
ctx context.Context
|
|
||||||
log *log.Entry
|
log *log.Entry
|
||||||
config ConnConfig
|
config ConnConfig
|
||||||
signaler *Signaler
|
signaler *Signaler
|
||||||
@@ -57,9 +56,8 @@ type Handshaker struct {
|
|||||||
remoteAnswerCh chan OfferAnswer
|
remoteAnswerCh chan OfferAnswer
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker {
|
func NewHandshaker(log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker {
|
||||||
return &Handshaker{
|
return &Handshaker{
|
||||||
ctx: ctx,
|
|
||||||
log: log,
|
log: log,
|
||||||
config: config,
|
config: config,
|
||||||
signaler: signaler,
|
signaler: signaler,
|
||||||
@@ -74,10 +72,10 @@ func (h *Handshaker) AddOnNewOfferListener(offer func(remoteOfferAnswer *OfferAn
|
|||||||
h.onNewOfferListeners = append(h.onNewOfferListeners, offer)
|
h.onNewOfferListeners = append(h.onNewOfferListeners, offer)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handshaker) Listen() {
|
func (h *Handshaker) Listen(ctx context.Context) {
|
||||||
for {
|
for {
|
||||||
h.log.Debugf("wait for remote offer confirmation")
|
h.log.Debugf("wait for remote offer confirmation")
|
||||||
remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation()
|
remoteOfferAnswer, err := h.waitForRemoteOfferConfirmation(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
var connectionClosedError *ConnectionClosedError
|
var connectionClosedError *ConnectionClosedError
|
||||||
if errors.As(err, &connectionClosedError) {
|
if errors.As(err, &connectionClosedError) {
|
||||||
@@ -127,7 +125,7 @@ func (h *Handshaker) OnRemoteAnswer(answer OfferAnswer) bool {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) {
|
func (h *Handshaker) waitForRemoteOfferConfirmation(ctx context.Context) (*OfferAnswer, error) {
|
||||||
select {
|
select {
|
||||||
case remoteOfferAnswer := <-h.remoteOffersCh:
|
case remoteOfferAnswer := <-h.remoteOffersCh:
|
||||||
// received confirmation from the remote peer -> ready to proceed
|
// received confirmation from the remote peer -> ready to proceed
|
||||||
@@ -138,7 +136,7 @@ func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) {
|
|||||||
return &remoteOfferAnswer, nil
|
return &remoteOfferAnswer, nil
|
||||||
case remoteOfferAnswer := <-h.remoteAnswerCh:
|
case remoteOfferAnswer := <-h.remoteAnswerCh:
|
||||||
return &remoteOfferAnswer, nil
|
return &remoteOfferAnswer, nil
|
||||||
case <-h.ctx.Done():
|
case <-ctx.Done():
|
||||||
// closed externally
|
// closed externally
|
||||||
return nil, NewConnectionClosedError(h.config.Key)
|
return nil, NewConnectionClosedError(h.config.Key)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -14,6 +14,6 @@ import (
|
|||||||
type WGIface interface {
|
type WGIface interface {
|
||||||
UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error
|
UpdatePeer(peerKey string, allowedIps []netip.Prefix, keepAlive time.Duration, endpoint *net.UDPAddr, preSharedKey *wgtypes.Key) error
|
||||||
RemovePeer(peerKey string) error
|
RemovePeer(peerKey string) error
|
||||||
GetStats(peerKey string) (configurer.WGStats, error)
|
GetStats() (map[string]configurer.WGStats, error)
|
||||||
GetProxy() wgproxy.Proxy
|
GetProxy() wgproxy.Proxy
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -205,7 +205,7 @@ func (d *Status) ReplaceOfflinePeers(replacement []State) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// AddPeer adds peer to Daemon status map
|
// AddPeer adds peer to Daemon status map
|
||||||
func (d *Status) AddPeer(peerPubKey string, fqdn string) error {
|
func (d *Status) AddPeer(peerPubKey string, fqdn string, ip string) error {
|
||||||
d.mux.Lock()
|
d.mux.Lock()
|
||||||
defer d.mux.Unlock()
|
defer d.mux.Unlock()
|
||||||
|
|
||||||
@@ -215,7 +215,8 @@ func (d *Status) AddPeer(peerPubKey string, fqdn string) error {
|
|||||||
}
|
}
|
||||||
d.peers[peerPubKey] = State{
|
d.peers[peerPubKey] = State{
|
||||||
PubKey: peerPubKey,
|
PubKey: peerPubKey,
|
||||||
ConnStatus: StatusDisconnected,
|
IP: ip,
|
||||||
|
ConnStatus: StatusIdle,
|
||||||
FQDN: fqdn,
|
FQDN: fqdn,
|
||||||
Mux: new(sync.RWMutex),
|
Mux: new(sync.RWMutex),
|
||||||
}
|
}
|
||||||
@@ -465,9 +466,9 @@ func shouldSkipNotify(receivedConnStatus ConnStatus, curr State) bool {
|
|||||||
switch {
|
switch {
|
||||||
case receivedConnStatus == StatusConnecting:
|
case receivedConnStatus == StatusConnecting:
|
||||||
return true
|
return true
|
||||||
case receivedConnStatus == StatusDisconnected && curr.ConnStatus == StatusConnecting:
|
case receivedConnStatus == StatusIdle && curr.ConnStatus == StatusConnecting:
|
||||||
return true
|
return true
|
||||||
case receivedConnStatus == StatusDisconnected && curr.ConnStatus == StatusDisconnected:
|
case receivedConnStatus == StatusIdle && curr.ConnStatus == StatusIdle:
|
||||||
return curr.IP != ""
|
return curr.IP != ""
|
||||||
default:
|
default:
|
||||||
return false
|
return false
|
||||||
|
|||||||
@@ -10,22 +10,24 @@ import (
|
|||||||
|
|
||||||
func TestAddPeer(t *testing.T) {
|
func TestAddPeer(t *testing.T) {
|
||||||
key := "abc"
|
key := "abc"
|
||||||
|
ip := "100.108.254.1"
|
||||||
status := NewRecorder("https://mgm")
|
status := NewRecorder("https://mgm")
|
||||||
err := status.AddPeer(key, "abc.netbird")
|
err := status.AddPeer(key, "abc.netbird", ip)
|
||||||
assert.NoError(t, err, "shouldn't return error")
|
assert.NoError(t, err, "shouldn't return error")
|
||||||
|
|
||||||
_, exists := status.peers[key]
|
_, exists := status.peers[key]
|
||||||
assert.True(t, exists, "value was found")
|
assert.True(t, exists, "value was found")
|
||||||
|
|
||||||
err = status.AddPeer(key, "abc.netbird")
|
err = status.AddPeer(key, "abc.netbird", ip)
|
||||||
|
|
||||||
assert.Error(t, err, "should return error on duplicate")
|
assert.Error(t, err, "should return error on duplicate")
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestGetPeer(t *testing.T) {
|
func TestGetPeer(t *testing.T) {
|
||||||
key := "abc"
|
key := "abc"
|
||||||
|
ip := "100.108.254.1"
|
||||||
status := NewRecorder("https://mgm")
|
status := NewRecorder("https://mgm")
|
||||||
err := status.AddPeer(key, "abc.netbird")
|
err := status.AddPeer(key, "abc.netbird", ip)
|
||||||
assert.NoError(t, err, "shouldn't return error")
|
assert.NoError(t, err, "shouldn't return error")
|
||||||
|
|
||||||
peerStatus, err := status.GetPeer(key)
|
peerStatus, err := status.GetPeer(key)
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package peer
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -20,7 +21,7 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type WGInterfaceStater interface {
|
type WGInterfaceStater interface {
|
||||||
GetStats(key string) (configurer.WGStats, error)
|
GetStats() (map[string]configurer.WGStats, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type WGWatcher struct {
|
type WGWatcher struct {
|
||||||
@@ -146,9 +147,13 @@ func (w *WGWatcher) handshakeCheck(lastHandshake time.Time) (*time.Time, bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (w *WGWatcher) wgState() (time.Time, error) {
|
func (w *WGWatcher) wgState() (time.Time, error) {
|
||||||
wgState, err := w.wgIfaceStater.GetStats(w.peerKey)
|
wgStates, err := w.wgIfaceStater.GetStats()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return time.Time{}, err
|
return time.Time{}, err
|
||||||
}
|
}
|
||||||
|
wgState, ok := wgStates[w.peerKey]
|
||||||
|
if !ok {
|
||||||
|
return time.Time{}, fmt.Errorf("peer %s not found in WireGuard endpoints", w.peerKey)
|
||||||
|
}
|
||||||
return wgState.LastHandshake, nil
|
return wgState.LastHandshake, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,26 +11,11 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
type MocWgIface struct {
|
type MocWgIface struct {
|
||||||
initial bool
|
stop bool
|
||||||
lastHandshake time.Time
|
|
||||||
stop bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MocWgIface) GetStats(key string) (configurer.WGStats, error) {
|
func (m *MocWgIface) GetStats() (map[string]configurer.WGStats, error) {
|
||||||
if !m.initial {
|
return map[string]configurer.WGStats{}, nil
|
||||||
m.initial = true
|
|
||||||
return configurer.WGStats{}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if !m.stop {
|
|
||||||
m.lastHandshake = time.Now()
|
|
||||||
}
|
|
||||||
|
|
||||||
stats := configurer.WGStats{
|
|
||||||
LastHandshake: m.lastHandshake,
|
|
||||||
}
|
|
||||||
|
|
||||||
return stats, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MocWgIface) disconnect() {
|
func (m *MocWgIface) disconnect() {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package peerstore
|
package peerstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
@@ -79,6 +80,29 @@ func (s *Store) PeerConn(pubKey string) (*peer.Conn, bool) {
|
|||||||
return p, true
|
return p, true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Store) PeerConnOpen(ctx context.Context, pubKey string) {
|
||||||
|
s.peerConnsMu.RLock()
|
||||||
|
defer s.peerConnsMu.RUnlock()
|
||||||
|
|
||||||
|
p, ok := s.peerConns[pubKey]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// this can be blocked because of the connect open limiter semaphore
|
||||||
|
p.Open(ctx)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Store) PeerConnClose(pubKey string) {
|
||||||
|
s.peerConnsMu.RLock()
|
||||||
|
defer s.peerConnsMu.RUnlock()
|
||||||
|
|
||||||
|
p, ok := s.peerConns[pubKey]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
p.Close()
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Store) PeersPubKey() []string {
|
func (s *Store) PeersPubKey() []string {
|
||||||
s.peerConnsMu.RLock()
|
s.peerConnsMu.RLock()
|
||||||
defer s.peerConnsMu.RUnlock()
|
defer s.peerConnsMu.RUnlock()
|
||||||
|
|||||||
@@ -4,7 +4,6 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/client/iface"
|
"github.com/netbirdio/netbird/client/iface"
|
||||||
"github.com/netbirdio/netbird/client/iface/configurer"
|
|
||||||
"github.com/netbirdio/netbird/client/iface/device"
|
"github.com/netbirdio/netbird/client/iface/device"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -18,5 +17,4 @@ type wgIfaceBase interface {
|
|||||||
IsUserspaceBind() bool
|
IsUserspaceBind() bool
|
||||||
GetFilter() device.PacketFilter
|
GetFilter() device.PacketFilter
|
||||||
GetDevice() *device.FilteredDevice
|
GetDevice() *device.FilteredDevice
|
||||||
GetStats(peerKey string) (configurer.WGStats, error)
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user