mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-17 15:56:39 +00:00
Compare commits
7 Commits
debug
...
fix/async-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b33c83c3f8 | ||
|
|
ec5095ba6b | ||
|
|
49a54624f8 | ||
|
|
729bcf2b01 | ||
|
|
a0cdb58303 | ||
|
|
39c99781cb | ||
|
|
01f24907c5 |
3
.github/FUNDING.yml
vendored
Normal file
3
.github/FUNDING.yml
vendored
Normal file
@@ -0,0 +1,3 @@
|
|||||||
|
# These are supported funding model platforms
|
||||||
|
|
||||||
|
github: [netbirdio]
|
||||||
@@ -680,7 +680,7 @@ func parsePeers(peers peersStateOutput, rosenpassEnabled, rosenpassPermissive bo
|
|||||||
func skipDetailByFilters(peerState *proto.PeerState, isConnected bool) bool {
|
func skipDetailByFilters(peerState *proto.PeerState, isConnected bool) bool {
|
||||||
statusEval := false
|
statusEval := false
|
||||||
ipEval := false
|
ipEval := false
|
||||||
nameEval := false
|
nameEval := true
|
||||||
|
|
||||||
if statusFilter != "" {
|
if statusFilter != "" {
|
||||||
lowerStatusFilter := strings.ToLower(statusFilter)
|
lowerStatusFilter := strings.ToLower(statusFilter)
|
||||||
@@ -700,11 +700,13 @@ func skipDetailByFilters(peerState *proto.PeerState, isConnected bool) bool {
|
|||||||
|
|
||||||
if len(prefixNamesFilter) > 0 {
|
if len(prefixNamesFilter) > 0 {
|
||||||
for prefixNameFilter := range prefixNamesFilterMap {
|
for prefixNameFilter := range prefixNamesFilterMap {
|
||||||
if !strings.HasPrefix(peerState.Fqdn, prefixNameFilter) {
|
if strings.HasPrefix(peerState.Fqdn, prefixNameFilter) {
|
||||||
nameEval = true
|
nameEval = false
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
nameEval = false
|
||||||
}
|
}
|
||||||
|
|
||||||
return statusEval || ipEval || nameEval
|
return statusEval || ipEval || nameEval
|
||||||
|
|||||||
@@ -134,36 +134,29 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
|
|||||||
statusICE: NewAtomicConnStatus(),
|
statusICE: NewAtomicConnStatus(),
|
||||||
}
|
}
|
||||||
|
|
||||||
rFns := WorkerRelayCallbacks{
|
|
||||||
OnConnReady: conn.relayConnectionIsReady,
|
|
||||||
OnDisconnected: conn.onWorkerRelayStateDisconnected,
|
|
||||||
}
|
|
||||||
|
|
||||||
wFns := WorkerICECallbacks{
|
|
||||||
OnConnReady: conn.iCEConnectionIsReady,
|
|
||||||
OnStatusChanged: conn.onWorkerICEStateDisconnected,
|
|
||||||
}
|
|
||||||
|
|
||||||
ctrl := isController(config)
|
ctrl := isController(config)
|
||||||
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, relayManager, rFns)
|
conn.workerRelay = NewWorkerRelay(connLog, ctrl, config, relayManager)
|
||||||
|
|
||||||
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
|
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
|
||||||
conn.workerICE, err = NewWorkerICE(ctx, connLog, config, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally, wFns)
|
conn.workerICE, err = NewWorkerICE(ctx, connLog, config, signaler, iFaceDiscover, statusRecorder, relayIsSupportedLocally)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
conn.handshaker = NewHandshaker(ctx, connLog, config, signaler, conn.workerICE, conn.workerRelay)
|
conn.handshaker = NewHandshaker(ctx, connLog, config, signaler, conn.workerICE, conn.workerRelay)
|
||||||
|
|
||||||
conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer)
|
conn.handshaker.AddOnNewOfferListener(func(remoteOfferAnswer *OfferAnswer) {
|
||||||
|
conn.workerRelay.OnNewOffer(ctx, remoteOfferAnswer)
|
||||||
|
})
|
||||||
if os.Getenv("NB_FORCE_RELAY") != "true" {
|
if os.Getenv("NB_FORCE_RELAY") != "true" {
|
||||||
conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer)
|
conn.handshaker.AddOnNewOfferListener(func(remoteOfferAnswer *OfferAnswer) {
|
||||||
|
conn.workerICE.OnNewOffer(ctx, remoteOfferAnswer)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
conn.guard = guard.NewGuard(connLog, ctrl, conn.isConnectedOnAllWay, config.Timeout, srWatcher)
|
conn.guard = guard.NewGuard(connLog, ctrl, conn.isConnectedOnAllWay, config.Timeout, srWatcher)
|
||||||
|
|
||||||
go conn.handshaker.Listen()
|
go conn.handshaker.Listen()
|
||||||
|
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -190,6 +183,7 @@ func (conn *Conn) Open() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
go conn.startHandshakeAndReconnect(conn.ctx)
|
go conn.startHandshakeAndReconnect(conn.ctx)
|
||||||
|
go conn.listenWorkersEvents()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (conn *Conn) startHandshakeAndReconnect(ctx context.Context) {
|
func (conn *Conn) startHandshakeAndReconnect(ctx context.Context) {
|
||||||
@@ -301,7 +295,7 @@ func (conn *Conn) GetKey() string {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
|
// configureConnection starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected
|
||||||
func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICEConnInfo) {
|
func (conn *Conn) iCEConnectionIsReady(iceConnInfo ICEConnInfo) {
|
||||||
conn.mu.Lock()
|
conn.mu.Lock()
|
||||||
defer conn.mu.Unlock()
|
defer conn.mu.Unlock()
|
||||||
|
|
||||||
@@ -311,7 +305,7 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon
|
|||||||
|
|
||||||
conn.log.Debugf("ICE connection is ready")
|
conn.log.Debugf("ICE connection is ready")
|
||||||
|
|
||||||
if conn.currentConnPriority > priority {
|
if conn.currentConnPriority > iceConnInfo.ConnPriority {
|
||||||
conn.statusICE.Set(StatusConnected)
|
conn.statusICE.Set(StatusConnected)
|
||||||
conn.updateIceState(iceConnInfo)
|
conn.updateIceState(iceConnInfo)
|
||||||
return
|
return
|
||||||
@@ -333,7 +327,7 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon
|
|||||||
ep = wgProxy.EndpointAddr()
|
ep = wgProxy.EndpointAddr()
|
||||||
conn.wgProxyICE = wgProxy
|
conn.wgProxyICE = wgProxy
|
||||||
} else {
|
} else {
|
||||||
directEp, err := net.ResolveUDPAddr("udp", iceConnInfo.RemoteConn.RemoteAddr().String())
|
directEp, err := net.ResolveUDPAddr("udp", iceConnInfo.RemoteIceCandidateEndpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to resolveUDPaddr")
|
log.Errorf("failed to resolveUDPaddr")
|
||||||
conn.handleConfigurationFailure(err, nil)
|
conn.handleConfigurationFailure(err, nil)
|
||||||
@@ -361,14 +355,13 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
wgConfigWorkaround()
|
wgConfigWorkaround()
|
||||||
conn.currentConnPriority = priority
|
conn.currentConnPriority = iceConnInfo.ConnPriority
|
||||||
conn.statusICE.Set(StatusConnected)
|
conn.statusICE.Set(StatusConnected)
|
||||||
conn.updateIceState(iceConnInfo)
|
conn.updateIceState(iceConnInfo)
|
||||||
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
|
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo review to make sense to handle connecting and disconnected status also?
|
func (conn *Conn) onWorkerICEStateDisconnected() {
|
||||||
func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
|
|
||||||
conn.mu.Lock()
|
conn.mu.Lock()
|
||||||
defer conn.mu.Unlock()
|
defer conn.mu.Unlock()
|
||||||
|
|
||||||
@@ -376,7 +369,7 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
conn.log.Tracef("ICE connection state changed to %s", newState)
|
conn.log.Tracef("ICE connection state changed to disconnected")
|
||||||
|
|
||||||
if conn.wgProxyICE != nil {
|
if conn.wgProxyICE != nil {
|
||||||
if err := conn.wgProxyICE.CloseConn(); err != nil {
|
if err := conn.wgProxyICE.CloseConn(); err != nil {
|
||||||
@@ -396,8 +389,8 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
|
|||||||
conn.currentConnPriority = connPriorityRelay
|
conn.currentConnPriority = connPriorityRelay
|
||||||
}
|
}
|
||||||
|
|
||||||
changed := conn.statusICE.Get() != newState && newState != StatusConnecting
|
changed := conn.statusICE.Get() != stateDisconnected
|
||||||
conn.statusICE.Set(newState)
|
conn.statusICE.Set(stateDisconnected)
|
||||||
|
|
||||||
conn.guard.SetICEConnDisconnected(changed)
|
conn.guard.SetICEConnDisconnected(changed)
|
||||||
|
|
||||||
@@ -731,6 +724,33 @@ func (conn *Conn) logTraceConnState() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (conn *Conn) listenWorkersEvents() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case e := <-conn.workerRelay.EventChan:
|
||||||
|
switch e.ConnStatus {
|
||||||
|
case StatusConnected:
|
||||||
|
conn.relayConnectionIsReady(e.RelayConnInfo)
|
||||||
|
case StatusDisconnected:
|
||||||
|
conn.onWorkerRelayStateDisconnected()
|
||||||
|
default:
|
||||||
|
log.Errorf("unexpected relay connection status: %v", e.ConnStatus)
|
||||||
|
}
|
||||||
|
case e := <-conn.workerICE.EventChan:
|
||||||
|
switch e.ConnStatus {
|
||||||
|
case StatusConnected:
|
||||||
|
conn.iCEConnectionIsReady(e.ICEConnInfo)
|
||||||
|
case StatusDisconnected:
|
||||||
|
conn.onWorkerICEStateDisconnected()
|
||||||
|
default:
|
||||||
|
log.Errorf("unexpected ICE connection status: %v", e.ConnStatus)
|
||||||
|
}
|
||||||
|
case <-conn.ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func isController(config ConnConfig) bool {
|
func isController(config ConnConfig) bool {
|
||||||
return config.LocalKey > config.Key
|
return config.LocalKey > config.Key
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,8 +19,14 @@ import (
|
|||||||
"github.com/netbirdio/netbird/route"
|
"github.com/netbirdio/netbird/route"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type ICEEvent struct {
|
||||||
|
ConnStatus ConnStatus
|
||||||
|
ICEConnInfo ICEConnInfo
|
||||||
|
}
|
||||||
|
|
||||||
type ICEConnInfo struct {
|
type ICEConnInfo struct {
|
||||||
RemoteConn net.Conn
|
RemoteConn net.Conn
|
||||||
|
RemoteAddr net.Addr
|
||||||
RosenpassPubKey []byte
|
RosenpassPubKey []byte
|
||||||
RosenpassAddr string
|
RosenpassAddr string
|
||||||
LocalIceCandidateType string
|
LocalIceCandidateType string
|
||||||
@@ -29,14 +35,11 @@ type ICEConnInfo struct {
|
|||||||
LocalIceCandidateEndpoint string
|
LocalIceCandidateEndpoint string
|
||||||
Relayed bool
|
Relayed bool
|
||||||
RelayedOnLocal bool
|
RelayedOnLocal bool
|
||||||
}
|
ConnPriority ConnPriority
|
||||||
|
|
||||||
type WorkerICECallbacks struct {
|
|
||||||
OnConnReady func(ConnPriority, ICEConnInfo)
|
|
||||||
OnStatusChanged func(ConnStatus)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type WorkerICE struct {
|
type WorkerICE struct {
|
||||||
|
EventChan chan ICEEvent
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
log *log.Entry
|
log *log.Entry
|
||||||
config ConnConfig
|
config ConnConfig
|
||||||
@@ -44,7 +47,6 @@ type WorkerICE struct {
|
|||||||
iFaceDiscover stdnet.ExternalIFaceDiscover
|
iFaceDiscover stdnet.ExternalIFaceDiscover
|
||||||
statusRecorder *Status
|
statusRecorder *Status
|
||||||
hasRelayOnLocally bool
|
hasRelayOnLocally bool
|
||||||
conn WorkerICECallbacks
|
|
||||||
|
|
||||||
selectedPriority ConnPriority
|
selectedPriority ConnPriority
|
||||||
|
|
||||||
@@ -59,8 +61,9 @@ type WorkerICE struct {
|
|||||||
localPwd string
|
localPwd string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool, callBacks WorkerICECallbacks) (*WorkerICE, error) {
|
func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ifaceDiscover stdnet.ExternalIFaceDiscover, statusRecorder *Status, hasRelayOnLocally bool) (*WorkerICE, error) {
|
||||||
w := &WorkerICE{
|
w := &WorkerICE{
|
||||||
|
EventChan: make(chan ICEEvent, 2),
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
log: log,
|
log: log,
|
||||||
config: config,
|
config: config,
|
||||||
@@ -68,7 +71,6 @@ func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signal
|
|||||||
iFaceDiscover: ifaceDiscover,
|
iFaceDiscover: ifaceDiscover,
|
||||||
statusRecorder: statusRecorder,
|
statusRecorder: statusRecorder,
|
||||||
hasRelayOnLocally: hasRelayOnLocally,
|
hasRelayOnLocally: hasRelayOnLocally,
|
||||||
conn: callBacks,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
localUfrag, localPwd, err := icemaker.GenerateICECredentials()
|
localUfrag, localPwd, err := icemaker.GenerateICECredentials()
|
||||||
@@ -80,7 +82,7 @@ func NewWorkerICE(ctx context.Context, log *log.Entry, config ConnConfig, signal
|
|||||||
return w, nil
|
return w, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
func (w *WorkerICE) OnNewOffer(_ context.Context, remoteOfferAnswer *OfferAnswer) {
|
||||||
w.log.Debugf("OnNewOffer for ICE")
|
w.log.Debugf("OnNewOffer for ICE")
|
||||||
w.muxAgent.Lock()
|
w.muxAgent.Lock()
|
||||||
|
|
||||||
@@ -133,6 +135,11 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if pair == nil {
|
||||||
|
w.log.Errorf("remote address is nil, ICE conn already closed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
if !isRelayCandidate(pair.Local) {
|
if !isRelayCandidate(pair.Local) {
|
||||||
// dynamically set remote WireGuard port if other side specified a different one from the default one
|
// dynamically set remote WireGuard port if other side specified a different one from the default one
|
||||||
remoteWgPort := iface.DefaultWgPort
|
remoteWgPort := iface.DefaultWgPort
|
||||||
@@ -154,9 +161,13 @@ func (w *WorkerICE) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
|||||||
RemoteIceCandidateEndpoint: fmt.Sprintf("%s:%d", pair.Remote.Address(), pair.Remote.Port()),
|
RemoteIceCandidateEndpoint: fmt.Sprintf("%s:%d", pair.Remote.Address(), pair.Remote.Port()),
|
||||||
Relayed: isRelayed(pair),
|
Relayed: isRelayed(pair),
|
||||||
RelayedOnLocal: isRelayCandidate(pair.Local),
|
RelayedOnLocal: isRelayCandidate(pair.Local),
|
||||||
|
ConnPriority: w.selectedPriority,
|
||||||
}
|
}
|
||||||
w.log.Debugf("on ICE conn read to use ready")
|
w.log.Debugf("on ICE conn read to use ready")
|
||||||
go w.conn.OnConnReady(w.selectedPriority, ci)
|
select {
|
||||||
|
case w.EventChan <- ICEEvent{ConnStatus: StatusConnected, ICEConnInfo: ci}:
|
||||||
|
case <-w.ctx.Done():
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
|
// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer.
|
||||||
@@ -216,7 +227,10 @@ func (w *WorkerICE) reCreateAgent(agentCancel context.CancelFunc, candidates []i
|
|||||||
err = agent.OnConnectionStateChange(func(state ice.ConnectionState) {
|
err = agent.OnConnectionStateChange(func(state ice.ConnectionState) {
|
||||||
w.log.Debugf("ICE ConnectionState has changed to %s", state.String())
|
w.log.Debugf("ICE ConnectionState has changed to %s", state.String())
|
||||||
if state == ice.ConnectionStateFailed || state == ice.ConnectionStateDisconnected {
|
if state == ice.ConnectionStateFailed || state == ice.ConnectionStateDisconnected {
|
||||||
w.conn.OnStatusChanged(StatusDisconnected)
|
select {
|
||||||
|
case w.EventChan <- ICEEvent{ConnStatus: StatusDisconnected}:
|
||||||
|
case <-w.ctx.Done():
|
||||||
|
}
|
||||||
|
|
||||||
w.muxAgent.Lock()
|
w.muxAgent.Lock()
|
||||||
agentCancel()
|
agentCancel()
|
||||||
|
|||||||
@@ -18,23 +18,23 @@ var (
|
|||||||
wgHandshakeOvertime = 30 * time.Second
|
wgHandshakeOvertime = 30 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type RelayEvent struct {
|
||||||
|
ConnStatus ConnStatus
|
||||||
|
RelayConnInfo RelayConnInfo
|
||||||
|
}
|
||||||
|
|
||||||
type RelayConnInfo struct {
|
type RelayConnInfo struct {
|
||||||
relayedConn net.Conn
|
relayedConn net.Conn
|
||||||
rosenpassPubKey []byte
|
rosenpassPubKey []byte
|
||||||
rosenpassAddr string
|
rosenpassAddr string
|
||||||
}
|
}
|
||||||
|
|
||||||
type WorkerRelayCallbacks struct {
|
|
||||||
OnConnReady func(RelayConnInfo)
|
|
||||||
OnDisconnected func()
|
|
||||||
}
|
|
||||||
|
|
||||||
type WorkerRelay struct {
|
type WorkerRelay struct {
|
||||||
|
EventChan chan RelayEvent
|
||||||
log *log.Entry
|
log *log.Entry
|
||||||
isController bool
|
isController bool
|
||||||
config ConnConfig
|
config ConnConfig
|
||||||
relayManager relayClient.ManagerService
|
relayManager relayClient.ManagerService
|
||||||
callBacks WorkerRelayCallbacks
|
|
||||||
|
|
||||||
relayedConn net.Conn
|
relayedConn net.Conn
|
||||||
relayLock sync.Mutex
|
relayLock sync.Mutex
|
||||||
@@ -45,18 +45,18 @@ type WorkerRelay struct {
|
|||||||
relaySupportedOnRemotePeer atomic.Bool
|
relaySupportedOnRemotePeer atomic.Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, relayManager relayClient.ManagerService, callbacks WorkerRelayCallbacks) *WorkerRelay {
|
func NewWorkerRelay(log *log.Entry, ctrl bool, config ConnConfig, relayManager relayClient.ManagerService) *WorkerRelay {
|
||||||
r := &WorkerRelay{
|
r := &WorkerRelay{
|
||||||
|
EventChan: make(chan RelayEvent, 2),
|
||||||
log: log,
|
log: log,
|
||||||
isController: ctrl,
|
isController: ctrl,
|
||||||
config: config,
|
config: config,
|
||||||
relayManager: relayManager,
|
relayManager: relayManager,
|
||||||
callBacks: callbacks,
|
|
||||||
}
|
}
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
func (w *WorkerRelay) OnNewOffer(ctx context.Context, remoteOfferAnswer *OfferAnswer) {
|
||||||
if !w.isRelaySupported(remoteOfferAnswer) {
|
if !w.isRelaySupported(remoteOfferAnswer) {
|
||||||
w.log.Infof("Relay is not supported by remote peer")
|
w.log.Infof("Relay is not supported by remote peer")
|
||||||
w.relaySupportedOnRemotePeer.Store(false)
|
w.relaySupportedOnRemotePeer.Store(false)
|
||||||
@@ -87,7 +87,9 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
|||||||
w.relayedConn = relayedConn
|
w.relayedConn = relayedConn
|
||||||
w.relayLock.Unlock()
|
w.relayLock.Unlock()
|
||||||
|
|
||||||
err = w.relayManager.AddCloseListener(srv, w.onRelayMGDisconnected)
|
err = w.relayManager.AddCloseListener(srv, func() {
|
||||||
|
w.onRelayMGDisconnected(ctx)
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to add close listener: %s", err)
|
log.Errorf("failed to add close listener: %s", err)
|
||||||
_ = relayedConn.Close()
|
_ = relayedConn.Close()
|
||||||
@@ -95,11 +97,17 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
w.log.Debugf("peer conn opened via Relay: %s", srv)
|
w.log.Debugf("peer conn opened via Relay: %s", srv)
|
||||||
go w.callBacks.OnConnReady(RelayConnInfo{
|
select {
|
||||||
relayedConn: relayedConn,
|
case w.EventChan <- RelayEvent{
|
||||||
rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
|
ConnStatus: StatusConnected,
|
||||||
rosenpassAddr: remoteOfferAnswer.RosenpassAddr,
|
RelayConnInfo: RelayConnInfo{
|
||||||
})
|
relayedConn: relayedConn,
|
||||||
|
rosenpassPubKey: remoteOfferAnswer.RosenpassPubKey,
|
||||||
|
rosenpassAddr: remoteOfferAnswer.RosenpassAddr,
|
||||||
|
},
|
||||||
|
}:
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WorkerRelay) EnableWgWatcher(ctx context.Context) {
|
func (w *WorkerRelay) EnableWgWatcher(ctx context.Context) {
|
||||||
@@ -187,7 +195,11 @@ func (w *WorkerRelay) wgStateCheck(ctx context.Context, ctxCancel context.Cancel
|
|||||||
w.relayLock.Lock()
|
w.relayLock.Lock()
|
||||||
_ = w.relayedConn.Close()
|
_ = w.relayedConn.Close()
|
||||||
w.relayLock.Unlock()
|
w.relayLock.Unlock()
|
||||||
w.callBacks.OnDisconnected()
|
|
||||||
|
select {
|
||||||
|
case w.EventChan <- RelayEvent{ConnStatus: StatusDisconnected}:
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -225,12 +237,16 @@ func (w *WorkerRelay) wgState() (time.Time, error) {
|
|||||||
return wgState.LastHandshake, nil
|
return wgState.LastHandshake, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *WorkerRelay) onRelayMGDisconnected() {
|
func (w *WorkerRelay) onRelayMGDisconnected(ctx context.Context) {
|
||||||
w.ctxLock.Lock()
|
w.ctxLock.Lock()
|
||||||
defer w.ctxLock.Unlock()
|
defer w.ctxLock.Unlock()
|
||||||
|
|
||||||
if w.ctxCancelWgWatch != nil {
|
if w.ctxCancelWgWatch != nil {
|
||||||
w.ctxCancelWgWatch()
|
w.ctxCancelWgWatch()
|
||||||
}
|
}
|
||||||
go w.callBacks.OnDisconnected()
|
|
||||||
|
select {
|
||||||
|
case w.EventChan <- RelayEvent{ConnStatus: StatusDisconnected}:
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
126
funding.json
Normal file
126
funding.json
Normal file
@@ -0,0 +1,126 @@
|
|||||||
|
{
|
||||||
|
"version": "v1.0.0",
|
||||||
|
"entity": {
|
||||||
|
"type": "organisation",
|
||||||
|
"role": "owner",
|
||||||
|
"name": "NetBird GmbH",
|
||||||
|
"email": "hello@netbird.io",
|
||||||
|
"phone": "",
|
||||||
|
"description": "NetBird GmbH is a Berlin-based software company specializing in the development of open-source network security solutions. Network security is utterly complex and expensive, accessible only to companies with multi-million dollar IT budgets. In contrast, there are millions of companies left behind. Our mission is to create an advanced network and cybersecurity platform that is both easy-to-use and affordable for teams of all sizes and budgets. By leveraging the open-source strategy and technological advancements, NetBird aims to set the industry standard for connecting and securing IT infrastructure.",
|
||||||
|
"webpageUrl": {
|
||||||
|
"url": "https://github.com/netbirdio"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"projects": [
|
||||||
|
{
|
||||||
|
"guid": "netbird",
|
||||||
|
"name": "NetBird",
|
||||||
|
"description": "NetBird is a configuration-free peer-to-peer private network and a centralized access control system combined in a single open-source platform. It makes it easy to create secure WireGuard-based private networks for your organization or home.",
|
||||||
|
"webpageUrl": {
|
||||||
|
"url": "https://github.com/netbirdio/netbird"
|
||||||
|
},
|
||||||
|
"repositoryUrl": {
|
||||||
|
"url": "https://github.com/netbirdio/netbird"
|
||||||
|
},
|
||||||
|
"licenses": [
|
||||||
|
"BSD-3"
|
||||||
|
],
|
||||||
|
"tags": [
|
||||||
|
"network-security",
|
||||||
|
"vpn",
|
||||||
|
"developer-tools",
|
||||||
|
"ztna",
|
||||||
|
"zero-trust",
|
||||||
|
"remote-access",
|
||||||
|
"wireguard",
|
||||||
|
"peer-to-peer",
|
||||||
|
"private-networking",
|
||||||
|
"software-defined-networking"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"funding": {
|
||||||
|
"channels": [
|
||||||
|
{
|
||||||
|
"guid": "github-sponsors",
|
||||||
|
"type": "payment-provider",
|
||||||
|
"address": "https://github.com/sponsors/netbirdio",
|
||||||
|
"description": ""
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"guid": "bank-transfer",
|
||||||
|
"type": "bank",
|
||||||
|
"address": "",
|
||||||
|
"description": "Contact us at hello@netbird.io for bank transfer details."
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"plans": [
|
||||||
|
{
|
||||||
|
"guid": "support-yearly",
|
||||||
|
"status": "active",
|
||||||
|
"name": "Support Open Source Development and Maintenance - Yearly",
|
||||||
|
"description": "This will help us partially cover the yearly cost of maintaining the open-source NetBird project.",
|
||||||
|
"amount": 100000,
|
||||||
|
"currency": "USD",
|
||||||
|
"frequency": "yearly",
|
||||||
|
"channels": [
|
||||||
|
"github-sponsors",
|
||||||
|
"bank-transfer"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"guid": "support-one-time-year",
|
||||||
|
"status": "active",
|
||||||
|
"name": "Support Open Source Development and Maintenance - One Year",
|
||||||
|
"description": "This will help us partially cover the yearly cost of maintaining the open-source NetBird project.",
|
||||||
|
"amount": 100000,
|
||||||
|
"currency": "USD",
|
||||||
|
"frequency": "one-time",
|
||||||
|
"channels": [
|
||||||
|
"github-sponsors",
|
||||||
|
"bank-transfer"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"guid": "support-one-time-monthly",
|
||||||
|
"status": "active",
|
||||||
|
"name": "Support Open Source Development and Maintenance - Monthly",
|
||||||
|
"description": "This will help us partially cover the monthly cost of maintaining the open-source NetBird project.",
|
||||||
|
"amount": 10000,
|
||||||
|
"currency": "USD",
|
||||||
|
"frequency": "monthly",
|
||||||
|
"channels": [
|
||||||
|
"github-sponsors",
|
||||||
|
"bank-transfer"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"guid": "support-monthly",
|
||||||
|
"status": "active",
|
||||||
|
"name": "Support Open Source Development and Maintenance - One Month",
|
||||||
|
"description": "This will help us partially cover the monthly cost of maintaining the open-source NetBird project.",
|
||||||
|
"amount": 10000,
|
||||||
|
"currency": "USD",
|
||||||
|
"frequency": "monthly",
|
||||||
|
"channels": [
|
||||||
|
"github-sponsors",
|
||||||
|
"bank-transfer"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"guid": "goodwill",
|
||||||
|
"status": "active",
|
||||||
|
"name": "Goodwill Plan",
|
||||||
|
"description": "Pay anything you wish to show your goodwill for the project.",
|
||||||
|
"amount": 0,
|
||||||
|
"currency": "USD",
|
||||||
|
"frequency": "monthly",
|
||||||
|
"channels": [
|
||||||
|
"github-sponsors",
|
||||||
|
"bank-transfer"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"history": null
|
||||||
|
}
|
||||||
|
}
|
||||||
4
go.mod
4
go.mod
@@ -156,7 +156,7 @@ require (
|
|||||||
github.com/go-text/typesetting v0.1.0 // indirect
|
github.com/go-text/typesetting v0.1.0 // indirect
|
||||||
github.com/gogo/protobuf v1.3.2 // indirect
|
github.com/gogo/protobuf v1.3.2 // indirect
|
||||||
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
|
||||||
github.com/google/btree v1.0.1 // indirect
|
github.com/google/btree v1.1.2 // indirect
|
||||||
github.com/google/s2a-go v0.1.7 // indirect
|
github.com/google/s2a-go v0.1.7 // indirect
|
||||||
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
|
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
|
||||||
github.com/googleapis/gax-go/v2 v2.12.3 // indirect
|
github.com/googleapis/gax-go/v2 v2.12.3 // indirect
|
||||||
@@ -231,7 +231,7 @@ require (
|
|||||||
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
|
gopkg.in/square/go-jose.v2 v2.6.0 // indirect
|
||||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
|
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
|
||||||
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect
|
gopkg.in/tomb.v2 v2.0.0-20161208151619-d5d1b5820637 // indirect
|
||||||
gvisor.dev/gvisor v0.0.0-20230927004350-cbd86285d259 // indirect
|
gvisor.dev/gvisor v0.0.0-20231020174304-b8a429915ff1 // indirect
|
||||||
k8s.io/apimachinery v0.26.2 // indirect
|
k8s.io/apimachinery v0.26.2 // indirect
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
8
go.sum
8
go.sum
@@ -297,8 +297,8 @@ github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek
|
|||||||
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
|
||||||
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
||||||
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
|
||||||
github.com/google/btree v1.0.1 h1:gK4Kx5IaGY9CD5sPJ36FHiBJ6ZXl0kilRiiCj+jdYp4=
|
github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU=
|
||||||
github.com/google/btree v1.0.1/go.mod h1:xXMiIv4Fb/0kKde4SpL7qlzvu5cMJDRkFDxJfI9uaxA=
|
github.com/google/btree v1.1.2/go.mod h1:qOPhT0dTNdNzV6Z/lhRX0YXUafgPLFUh+gZMl761Gm4=
|
||||||
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
|
||||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
|
||||||
@@ -1238,8 +1238,8 @@ gorm.io/gorm v1.25.7-0.20240204074919-46816ad31dde h1:9DShaph9qhkIYw7QF91I/ynrr4
|
|||||||
gorm.io/gorm v1.25.7-0.20240204074919-46816ad31dde/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
|
gorm.io/gorm v1.25.7-0.20240204074919-46816ad31dde/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8=
|
||||||
gotest.tools/v3 v3.5.0 h1:Ljk6PdHdOhAb5aDMWXjDLMMhph+BpztA4v1QdqEW2eY=
|
gotest.tools/v3 v3.5.0 h1:Ljk6PdHdOhAb5aDMWXjDLMMhph+BpztA4v1QdqEW2eY=
|
||||||
gotest.tools/v3 v3.5.0/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
|
gotest.tools/v3 v3.5.0/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
|
||||||
gvisor.dev/gvisor v0.0.0-20230927004350-cbd86285d259 h1:TbRPT0HtzFP3Cno1zZo7yPzEEnfu8EjLfl6IU9VfqkQ=
|
gvisor.dev/gvisor v0.0.0-20231020174304-b8a429915ff1 h1:qDCwdCWECGnwQSQC01Dpnp09fRHxJs9PbktotUqG+hs=
|
||||||
gvisor.dev/gvisor v0.0.0-20230927004350-cbd86285d259/go.mod h1:AVgIgHMwK63XvmAzWG9vLQ41YnVHN0du0tEC46fI7yY=
|
gvisor.dev/gvisor v0.0.0-20231020174304-b8a429915ff1/go.mod h1:8hmigyCdYtw5xJGfQDJzSH5Ju8XEIDBnpyi8+O6GRt8=
|
||||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
"slices"
|
"slices"
|
||||||
|
"sort"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -107,6 +108,12 @@ type PeerSystemMeta struct { //nolint:revive
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (p PeerSystemMeta) isEqual(other PeerSystemMeta) bool {
|
func (p PeerSystemMeta) isEqual(other PeerSystemMeta) bool {
|
||||||
|
sort.Slice(p.NetworkAddresses, func(i, j int) bool {
|
||||||
|
return p.NetworkAddresses[i].Mac < p.NetworkAddresses[j].Mac
|
||||||
|
})
|
||||||
|
sort.Slice(other.NetworkAddresses, func(i, j int) bool {
|
||||||
|
return other.NetworkAddresses[i].Mac < other.NetworkAddresses[j].Mac
|
||||||
|
})
|
||||||
equalNetworkAddresses := slices.EqualFunc(p.NetworkAddresses, other.NetworkAddresses, func(addr NetworkAddress, oAddr NetworkAddress) bool {
|
equalNetworkAddresses := slices.EqualFunc(p.NetworkAddresses, other.NetworkAddresses, func(addr NetworkAddress, oAddr NetworkAddress) bool {
|
||||||
return addr.Mac == oAddr.Mac && addr.NetIP == oAddr.NetIP
|
return addr.Mac == oAddr.Mac && addr.NetIP == oAddr.NetIP
|
||||||
})
|
})
|
||||||
@@ -114,6 +121,12 @@ func (p PeerSystemMeta) isEqual(other PeerSystemMeta) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sort.Slice(p.Files, func(i, j int) bool {
|
||||||
|
return p.Files[i].Path < p.Files[j].Path
|
||||||
|
})
|
||||||
|
sort.Slice(other.Files, func(i, j int) bool {
|
||||||
|
return other.Files[i].Path < other.Files[j].Path
|
||||||
|
})
|
||||||
equalFiles := slices.EqualFunc(p.Files, other.Files, func(file File, oFile File) bool {
|
equalFiles := slices.EqualFunc(p.Files, other.Files, func(file File, oFile File) bool {
|
||||||
return file.Path == oFile.Path && file.Exist == oFile.Exist && file.ProcessIsRunning == oFile.ProcessIsRunning
|
return file.Path == oFile.Path && file.Exist == oFile.Exist && file.ProcessIsRunning == oFile.ProcessIsRunning
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package peer
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/netip"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -29,3 +30,56 @@ func BenchmarkFQDN(b *testing.B) {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIsEqual(t *testing.T) {
|
||||||
|
meta1 := PeerSystemMeta{
|
||||||
|
NetworkAddresses: []NetworkAddress{{
|
||||||
|
NetIP: netip.MustParsePrefix("192.168.1.2/24"),
|
||||||
|
Mac: "2",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
NetIP: netip.MustParsePrefix("192.168.1.0/24"),
|
||||||
|
Mac: "1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Files: []File{
|
||||||
|
{
|
||||||
|
Path: "/etc/hosts1",
|
||||||
|
Exist: true,
|
||||||
|
ProcessIsRunning: true,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Path: "/etc/hosts2",
|
||||||
|
Exist: false,
|
||||||
|
ProcessIsRunning: false,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
meta2 := PeerSystemMeta{
|
||||||
|
NetworkAddresses: []NetworkAddress{
|
||||||
|
{
|
||||||
|
NetIP: netip.MustParsePrefix("192.168.1.0/24"),
|
||||||
|
Mac: "1",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
NetIP: netip.MustParsePrefix("192.168.1.2/24"),
|
||||||
|
Mac: "2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Files: []File{
|
||||||
|
{
|
||||||
|
Path: "/etc/hosts2",
|
||||||
|
Exist: false,
|
||||||
|
ProcessIsRunning: false,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
Path: "/etc/hosts1",
|
||||||
|
Exist: true,
|
||||||
|
ProcessIsRunning: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if !meta1.isEqual(meta2) {
|
||||||
|
t.Error("meta1 should be equal to meta2")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ type UpdateChannelMetrics struct {
|
|||||||
getAllConnectedPeersDurationMicro metric.Int64Histogram
|
getAllConnectedPeersDurationMicro metric.Int64Histogram
|
||||||
getAllConnectedPeers metric.Int64Histogram
|
getAllConnectedPeers metric.Int64Histogram
|
||||||
hasChannelDurationMicro metric.Int64Histogram
|
hasChannelDurationMicro metric.Int64Histogram
|
||||||
|
networkMapDiffDurationMicro metric.Int64Histogram
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -63,6 +64,11 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
networkMapDiffDurationMicro, err := meter.Int64Histogram("management.updatechannel.networkmap.diff.duration.micro")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
return &UpdateChannelMetrics{
|
return &UpdateChannelMetrics{
|
||||||
createChannelDurationMicro: createChannelDurationMicro,
|
createChannelDurationMicro: createChannelDurationMicro,
|
||||||
closeChannelDurationMicro: closeChannelDurationMicro,
|
closeChannelDurationMicro: closeChannelDurationMicro,
|
||||||
@@ -72,6 +78,7 @@ func NewUpdateChannelMetrics(ctx context.Context, meter metric.Meter) (*UpdateCh
|
|||||||
getAllConnectedPeersDurationMicro: getAllConnectedPeersDurationMicro,
|
getAllConnectedPeersDurationMicro: getAllConnectedPeersDurationMicro,
|
||||||
getAllConnectedPeers: getAllConnectedPeers,
|
getAllConnectedPeers: getAllConnectedPeers,
|
||||||
hasChannelDurationMicro: hasChannelDurationMicro,
|
hasChannelDurationMicro: hasChannelDurationMicro,
|
||||||
|
networkMapDiffDurationMicro: networkMapDiffDurationMicro,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
@@ -111,3 +118,8 @@ func (metrics *UpdateChannelMetrics) CountGetAllConnectedPeersDuration(duration
|
|||||||
func (metrics *UpdateChannelMetrics) CountHasChannelDuration(duration time.Duration) {
|
func (metrics *UpdateChannelMetrics) CountHasChannelDuration(duration time.Duration) {
|
||||||
metrics.hasChannelDurationMicro.Record(metrics.ctx, duration.Microseconds())
|
metrics.hasChannelDurationMicro.Record(metrics.ctx, duration.Microseconds())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CountNetworkMapDiffDurationMicro counts the duration of the NetworkMapDiff method
|
||||||
|
func (metrics *UpdateChannelMetrics) CountNetworkMapDiffDurationMicro(duration time.Duration) {
|
||||||
|
metrics.networkMapDiffDurationMicro.Record(metrics.ctx, duration.Microseconds())
|
||||||
|
}
|
||||||
|
|||||||
@@ -7,11 +7,11 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/management/server/differs"
|
|
||||||
"github.com/r3labs/diff/v3"
|
"github.com/r3labs/diff/v3"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/management/proto"
|
"github.com/netbirdio/netbird/management/proto"
|
||||||
|
"github.com/netbirdio/netbird/management/server/differs"
|
||||||
"github.com/netbirdio/netbird/management/server/telemetry"
|
"github.com/netbirdio/netbird/management/server/telemetry"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -208,10 +208,10 @@ func (p *PeersUpdateManager) handlePeerMessageUpdate(ctx context.Context, peerID
|
|||||||
p.channelsMux.RUnlock()
|
p.channelsMux.RUnlock()
|
||||||
|
|
||||||
if lastSentUpdate != nil {
|
if lastSentUpdate != nil {
|
||||||
updated, err := isNewPeerUpdateMessage(ctx, lastSentUpdate, update)
|
updated, err := isNewPeerUpdateMessage(ctx, lastSentUpdate, update, p.metrics)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.WithContext(ctx).Errorf("error checking for SyncResponse updates: %v", err)
|
log.WithContext(ctx).Errorf("error checking for SyncResponse updates: %v", err)
|
||||||
return false
|
return true
|
||||||
}
|
}
|
||||||
if !updated {
|
if !updated {
|
||||||
log.WithContext(ctx).Debugf("peer %s network map is not updated, skip sending update", peerID)
|
log.WithContext(ctx).Debugf("peer %s network map is not updated, skip sending update", peerID)
|
||||||
@@ -223,7 +223,9 @@ func (p *PeersUpdateManager) handlePeerMessageUpdate(ctx context.Context, peerID
|
|||||||
}
|
}
|
||||||
|
|
||||||
// isNewPeerUpdateMessage checks if the given current update message is a new update that should be sent.
|
// isNewPeerUpdateMessage checks if the given current update message is a new update that should be sent.
|
||||||
func isNewPeerUpdateMessage(ctx context.Context, lastSentUpdate, currUpdateToSend *UpdateMessage) (isNew bool, err error) {
|
func isNewPeerUpdateMessage(ctx context.Context, lastSentUpdate, currUpdateToSend *UpdateMessage, metric telemetry.AppMetrics) (isNew bool, err error) {
|
||||||
|
startTime := time.Now()
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
log.WithContext(ctx).Panicf("comparing peer update messages. Trace: %s", debug.Stack())
|
log.WithContext(ctx).Panicf("comparing peer update messages. Trace: %s", debug.Stack())
|
||||||
@@ -258,6 +260,11 @@ func isNewPeerUpdateMessage(ctx context.Context, lastSentUpdate, currUpdateToSen
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return false, fmt.Errorf("failed to diff network map: %v", err)
|
return false, fmt.Errorf("failed to diff network map: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if metric != nil {
|
||||||
|
metric.UpdateChannelMetrics().CountNetworkMapDiffDurationMicro(time.Since(startTime))
|
||||||
|
}
|
||||||
|
|
||||||
return len(changelog) > 0, nil
|
return len(changelog) > 0, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -7,14 +7,16 @@ import (
|
|||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/assert"
|
||||||
|
|
||||||
nbdns "github.com/netbirdio/netbird/dns"
|
nbdns "github.com/netbirdio/netbird/dns"
|
||||||
"github.com/netbirdio/netbird/management/domain"
|
"github.com/netbirdio/netbird/management/domain"
|
||||||
"github.com/netbirdio/netbird/management/proto"
|
"github.com/netbirdio/netbird/management/proto"
|
||||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||||
"github.com/netbirdio/netbird/management/server/posture"
|
"github.com/netbirdio/netbird/management/server/posture"
|
||||||
|
"github.com/netbirdio/netbird/management/server/telemetry"
|
||||||
nbroute "github.com/netbirdio/netbird/route"
|
nbroute "github.com/netbirdio/netbird/route"
|
||||||
"github.com/netbirdio/netbird/util"
|
"github.com/netbirdio/netbird/util"
|
||||||
"github.com/stretchr/testify/assert"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// var peersUpdater *PeersUpdateManager
|
// var peersUpdater *PeersUpdateManager
|
||||||
@@ -175,8 +177,12 @@ func TestHandlePeerMessageUpdate(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
|
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
p := NewPeersUpdateManager(nil)
|
p := NewPeersUpdateManager(metrics)
|
||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
|
|
||||||
if tt.existingUpdate != nil {
|
if tt.existingUpdate != nil {
|
||||||
@@ -194,7 +200,7 @@ func TestIsNewPeerUpdateMessage(t *testing.T) {
|
|||||||
newUpdateMessage1 := createMockUpdateMessage(t)
|
newUpdateMessage1 := createMockUpdateMessage(t)
|
||||||
newUpdateMessage2 := createMockUpdateMessage(t)
|
newUpdateMessage2 := createMockUpdateMessage(t)
|
||||||
|
|
||||||
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2)
|
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.False(t, message)
|
assert.False(t, message)
|
||||||
})
|
})
|
||||||
@@ -205,7 +211,7 @@ func TestIsNewPeerUpdateMessage(t *testing.T) {
|
|||||||
|
|
||||||
newUpdateMessage2.Update.NetworkMap.Serial++
|
newUpdateMessage2.Update.NetworkMap.Serial++
|
||||||
|
|
||||||
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2)
|
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.False(t, message)
|
assert.False(t, message)
|
||||||
})
|
})
|
||||||
@@ -217,7 +223,7 @@ func TestIsNewPeerUpdateMessage(t *testing.T) {
|
|||||||
newUpdateMessage2.NetworkMap.Routes[0].Network = netip.MustParsePrefix("1.1.1.1/32")
|
newUpdateMessage2.NetworkMap.Routes[0].Network = netip.MustParsePrefix("1.1.1.1/32")
|
||||||
newUpdateMessage2.Update.NetworkMap.Serial++
|
newUpdateMessage2.Update.NetworkMap.Serial++
|
||||||
|
|
||||||
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2)
|
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.True(t, message)
|
assert.True(t, message)
|
||||||
|
|
||||||
@@ -230,7 +236,7 @@ func TestIsNewPeerUpdateMessage(t *testing.T) {
|
|||||||
newUpdateMessage2.NetworkMap.Routes[0].Groups = []string{"randomGroup1"}
|
newUpdateMessage2.NetworkMap.Routes[0].Groups = []string{"randomGroup1"}
|
||||||
newUpdateMessage2.Update.NetworkMap.Serial++
|
newUpdateMessage2.Update.NetworkMap.Serial++
|
||||||
|
|
||||||
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2)
|
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.True(t, message)
|
assert.True(t, message)
|
||||||
})
|
})
|
||||||
@@ -249,7 +255,7 @@ func TestIsNewPeerUpdateMessage(t *testing.T) {
|
|||||||
newUpdateMessage2.NetworkMap.Peers = append(newUpdateMessage2.NetworkMap.Peers, newPeer)
|
newUpdateMessage2.NetworkMap.Peers = append(newUpdateMessage2.NetworkMap.Peers, newPeer)
|
||||||
newUpdateMessage2.Update.NetworkMap.Serial++
|
newUpdateMessage2.Update.NetworkMap.Serial++
|
||||||
|
|
||||||
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2)
|
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.True(t, message)
|
assert.True(t, message)
|
||||||
})
|
})
|
||||||
@@ -259,14 +265,14 @@ func TestIsNewPeerUpdateMessage(t *testing.T) {
|
|||||||
|
|
||||||
newUpdateMessage2 := createMockUpdateMessage(t)
|
newUpdateMessage2 := createMockUpdateMessage(t)
|
||||||
newUpdateMessage2.Update.NetworkMap.Serial++
|
newUpdateMessage2.Update.NetworkMap.Serial++
|
||||||
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2)
|
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.False(t, message)
|
assert.False(t, message)
|
||||||
|
|
||||||
newUpdateMessage3 := createMockUpdateMessage(t)
|
newUpdateMessage3 := createMockUpdateMessage(t)
|
||||||
newUpdateMessage3.Update.Checks = []*proto.Checks{}
|
newUpdateMessage3.Update.Checks = []*proto.Checks{}
|
||||||
newUpdateMessage3.Update.NetworkMap.Serial++
|
newUpdateMessage3.Update.NetworkMap.Serial++
|
||||||
message, err = isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage3)
|
message, err = isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage3, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.True(t, message)
|
assert.True(t, message)
|
||||||
|
|
||||||
@@ -285,7 +291,7 @@ func TestIsNewPeerUpdateMessage(t *testing.T) {
|
|||||||
}
|
}
|
||||||
newUpdateMessage4.Update.Checks = []*proto.Checks{toProtocolCheck(check)}
|
newUpdateMessage4.Update.Checks = []*proto.Checks{toProtocolCheck(check)}
|
||||||
newUpdateMessage4.Update.NetworkMap.Serial++
|
newUpdateMessage4.Update.NetworkMap.Serial++
|
||||||
message, err = isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage4)
|
message, err = isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage4, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.True(t, message)
|
assert.True(t, message)
|
||||||
|
|
||||||
@@ -305,7 +311,7 @@ func TestIsNewPeerUpdateMessage(t *testing.T) {
|
|||||||
}
|
}
|
||||||
newUpdateMessage5.Update.Checks = []*proto.Checks{toProtocolCheck(check)}
|
newUpdateMessage5.Update.Checks = []*proto.Checks{toProtocolCheck(check)}
|
||||||
newUpdateMessage5.Update.NetworkMap.Serial++
|
newUpdateMessage5.Update.NetworkMap.Serial++
|
||||||
message, err = isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage5)
|
message, err = isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage5, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.True(t, message)
|
assert.True(t, message)
|
||||||
})
|
})
|
||||||
@@ -321,7 +327,7 @@ func TestIsNewPeerUpdateMessage(t *testing.T) {
|
|||||||
)
|
)
|
||||||
newUpdateMessage2.Update.NetworkMap.Serial++
|
newUpdateMessage2.Update.NetworkMap.Serial++
|
||||||
|
|
||||||
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2)
|
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.True(t, message)
|
assert.True(t, message)
|
||||||
})
|
})
|
||||||
@@ -333,7 +339,7 @@ func TestIsNewPeerUpdateMessage(t *testing.T) {
|
|||||||
newUpdateMessage2.NetworkMap.Peers[0].IP = net.ParseIP("192.168.1.10")
|
newUpdateMessage2.NetworkMap.Peers[0].IP = net.ParseIP("192.168.1.10")
|
||||||
newUpdateMessage2.Update.NetworkMap.Serial++
|
newUpdateMessage2.Update.NetworkMap.Serial++
|
||||||
|
|
||||||
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2)
|
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.True(t, message)
|
assert.True(t, message)
|
||||||
})
|
})
|
||||||
@@ -345,7 +351,7 @@ func TestIsNewPeerUpdateMessage(t *testing.T) {
|
|||||||
newUpdateMessage2.NetworkMap.FirewallRules[0].Port = "443"
|
newUpdateMessage2.NetworkMap.FirewallRules[0].Port = "443"
|
||||||
newUpdateMessage2.Update.NetworkMap.Serial++
|
newUpdateMessage2.Update.NetworkMap.Serial++
|
||||||
|
|
||||||
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2)
|
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.True(t, message)
|
assert.True(t, message)
|
||||||
})
|
})
|
||||||
@@ -364,7 +370,7 @@ func TestIsNewPeerUpdateMessage(t *testing.T) {
|
|||||||
newUpdateMessage2.NetworkMap.FirewallRules = append(newUpdateMessage2.NetworkMap.FirewallRules, newRule)
|
newUpdateMessage2.NetworkMap.FirewallRules = append(newUpdateMessage2.NetworkMap.FirewallRules, newRule)
|
||||||
newUpdateMessage2.Update.NetworkMap.Serial++
|
newUpdateMessage2.Update.NetworkMap.Serial++
|
||||||
|
|
||||||
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2)
|
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.True(t, message)
|
assert.True(t, message)
|
||||||
})
|
})
|
||||||
@@ -376,7 +382,7 @@ func TestIsNewPeerUpdateMessage(t *testing.T) {
|
|||||||
newUpdateMessage2.NetworkMap.DNSConfig.NameServerGroups[0].NameServers = make([]nbdns.NameServer, 0)
|
newUpdateMessage2.NetworkMap.DNSConfig.NameServerGroups[0].NameServers = make([]nbdns.NameServer, 0)
|
||||||
newUpdateMessage2.Update.NetworkMap.Serial++
|
newUpdateMessage2.Update.NetworkMap.Serial++
|
||||||
|
|
||||||
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2)
|
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.True(t, message)
|
assert.True(t, message)
|
||||||
})
|
})
|
||||||
@@ -388,7 +394,7 @@ func TestIsNewPeerUpdateMessage(t *testing.T) {
|
|||||||
newUpdateMessage2.NetworkMap.DNSConfig.NameServerGroups[0].NameServers[0].IP = netip.MustParseAddr("8.8.4.4")
|
newUpdateMessage2.NetworkMap.DNSConfig.NameServerGroups[0].NameServers[0].IP = netip.MustParseAddr("8.8.4.4")
|
||||||
newUpdateMessage2.Update.NetworkMap.Serial++
|
newUpdateMessage2.Update.NetworkMap.Serial++
|
||||||
|
|
||||||
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2)
|
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.True(t, message)
|
assert.True(t, message)
|
||||||
})
|
})
|
||||||
@@ -400,7 +406,7 @@ func TestIsNewPeerUpdateMessage(t *testing.T) {
|
|||||||
newUpdateMessage2.NetworkMap.DNSConfig.CustomZones[0].Records[0].RData = "100.64.0.2"
|
newUpdateMessage2.NetworkMap.DNSConfig.CustomZones[0].Records[0].RData = "100.64.0.2"
|
||||||
newUpdateMessage2.Update.NetworkMap.Serial++
|
newUpdateMessage2.Update.NetworkMap.Serial++
|
||||||
|
|
||||||
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2)
|
message, err := isNewPeerUpdateMessage(context.Background(), newUpdateMessage1, newUpdateMessage2, nil)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.True(t, message)
|
assert.True(t, message)
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user