mirror of
https://github.com/netbirdio/netbird.git
synced 2026-05-06 00:56:39 +00:00
Compare commits
38 Commits
feature/st
...
refactor/r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6ce67f383e | ||
|
|
c813b6ef50 | ||
|
|
7f4f6e0553 | ||
|
|
e137a78541 | ||
|
|
40cdccd9d7 | ||
|
|
28f8007d0f | ||
|
|
ae78baade6 | ||
|
|
a35215f720 | ||
|
|
91278c2bea | ||
|
|
66b2a2f9ca | ||
|
|
f7bf9f6c24 | ||
|
|
d20b50ac45 | ||
|
|
db9b5bfd7f | ||
|
|
acfcef3d54 | ||
|
|
f9992c3ac8 | ||
|
|
52f4290be6 | ||
|
|
a2f8667f9d | ||
|
|
33f4bf8bbd | ||
|
|
1d940b7ec0 | ||
|
|
47036cc625 | ||
|
|
caf0c81524 | ||
|
|
e5f61c0361 | ||
|
|
9c002c3f68 | ||
|
|
f0788e2d60 | ||
|
|
f17ea2aa57 | ||
|
|
316fc15701 | ||
|
|
9aab153090 | ||
|
|
c38e0d9678 | ||
|
|
48e45b64bb | ||
|
|
2e20c978b3 | ||
|
|
d9c585f575 | ||
|
|
9184a0c6ac | ||
|
|
7ba8b926f0 | ||
|
|
71733dff3e | ||
|
|
4d46adbb68 | ||
|
|
0753766336 | ||
|
|
52d8bdfc78 | ||
|
|
56c67fdf08 |
@@ -57,7 +57,7 @@ func startSignal(t *testing.T) (*grpc.Server, net.Listener) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
s := grpc.NewServer()
|
||||
srv, err := sig.NewServer(context.Background(), otel.Meter(""))
|
||||
srv, err := sig.NewServer(context.Background(), otel.Meter(""), nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
sigProto.RegisterSignalExchangeServer(s, srv)
|
||||
|
||||
@@ -31,7 +31,6 @@ const (
|
||||
systemdDbusSetDefaultRouteMethodSuffix = systemdDbusLinkInterface + ".SetDefaultRoute"
|
||||
systemdDbusSetDomainsMethodSuffix = systemdDbusLinkInterface + ".SetDomains"
|
||||
systemdDbusSetDNSSECMethodSuffix = systemdDbusLinkInterface + ".SetDNSSEC"
|
||||
systemdDbusSetDNSOverTLSMethodSuffix = systemdDbusLinkInterface + ".SetDNSOverTLS"
|
||||
systemdDbusResolvConfModeForeign = "foreign"
|
||||
|
||||
dbusErrorUnknownObject = "org.freedesktop.DBus.Error.UnknownObject"
|
||||
@@ -103,11 +102,6 @@ func (s *systemdDbusConfigurator) applyDNSConfig(config HostDNSConfig, stateMana
|
||||
log.Warnf("failed to set DNSSEC to 'no': %v", err)
|
||||
}
|
||||
|
||||
// We don't support DNSOverTLS. On some machines this is default on so we explicitly set it to off
|
||||
if err := s.callLinkMethod(systemdDbusSetDNSOverTLSMethodSuffix, dnsSecDisabled); err != nil {
|
||||
log.Warnf("failed to set DNSOverTLS to 'no': %v", err)
|
||||
}
|
||||
|
||||
var (
|
||||
searchDomains []string
|
||||
matchDomains []string
|
||||
|
||||
@@ -40,6 +40,7 @@ type Manager struct {
|
||||
fwRules []firewall.Rule
|
||||
tcpRules []firewall.Rule
|
||||
dnsForwarder *DNSForwarder
|
||||
port uint16
|
||||
}
|
||||
|
||||
func ListenPort() uint16 {
|
||||
@@ -48,16 +49,11 @@ func ListenPort() uint16 {
|
||||
return listenPort
|
||||
}
|
||||
|
||||
func SetListenPort(port uint16) {
|
||||
listenPortMu.Lock()
|
||||
listenPort = port
|
||||
listenPortMu.Unlock()
|
||||
}
|
||||
|
||||
func NewManager(fw firewall.Manager, statusRecorder *peer.Status) *Manager {
|
||||
func NewManager(fw firewall.Manager, statusRecorder *peer.Status, port uint16) *Manager {
|
||||
return &Manager{
|
||||
firewall: fw,
|
||||
statusRecorder: statusRecorder,
|
||||
port: port,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -71,6 +67,12 @@ func (m *Manager) Start(fwdEntries []*ForwarderEntry) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if m.port > 0 {
|
||||
listenPortMu.Lock()
|
||||
listenPort = m.port
|
||||
listenPortMu.Unlock()
|
||||
}
|
||||
|
||||
m.dnsForwarder = NewDNSForwarder(fmt.Sprintf(":%d", ListenPort()), dnsTTL, m.firewall, m.statusRecorder)
|
||||
go func() {
|
||||
if err := m.dnsForwarder.Listen(fwdEntries); err != nil {
|
||||
|
||||
@@ -1849,10 +1849,6 @@ func (e *Engine) updateDNSForwarder(
|
||||
return
|
||||
}
|
||||
|
||||
if forwarderPort > 0 {
|
||||
dnsfwd.SetListenPort(forwarderPort)
|
||||
}
|
||||
|
||||
if !enabled {
|
||||
if e.dnsForwardMgr == nil {
|
||||
return
|
||||
@@ -1866,7 +1862,7 @@ func (e *Engine) updateDNSForwarder(
|
||||
if len(fwdEntries) > 0 {
|
||||
switch {
|
||||
case e.dnsForwardMgr == nil:
|
||||
e.dnsForwardMgr = dnsfwd.NewManager(e.firewall, e.statusRecorder)
|
||||
e.dnsForwardMgr = dnsfwd.NewManager(e.firewall, e.statusRecorder, forwarderPort)
|
||||
if err := e.dnsForwardMgr.Start(fwdEntries); err != nil {
|
||||
log.Errorf("failed to start DNS forward: %v", err)
|
||||
e.dnsForwardMgr = nil
|
||||
@@ -1896,7 +1892,7 @@ func (e *Engine) restartDnsFwd(fwdEntries []*dnsfwd.ForwarderEntry, forwarderPor
|
||||
if err := e.dnsForwardMgr.Stop(context.Background()); err != nil {
|
||||
log.Errorf("failed to stop DNS forward: %v", err)
|
||||
}
|
||||
e.dnsForwardMgr = dnsfwd.NewManager(e.firewall, e.statusRecorder)
|
||||
e.dnsForwardMgr = dnsfwd.NewManager(e.firewall, e.statusRecorder, forwarderPort)
|
||||
if err := e.dnsForwardMgr.Start(fwdEntries); err != nil {
|
||||
log.Errorf("failed to start DNS forward: %v", err)
|
||||
e.dnsForwardMgr = nil
|
||||
|
||||
@@ -1512,7 +1512,7 @@ func startSignal(t *testing.T) (*grpc.Server, string, error) {
|
||||
log.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
|
||||
srv, err := signalServer.NewServer(context.Background(), otel.Meter(""))
|
||||
srv, err := signalServer.NewServer(context.Background(), otel.Meter(""), nil)
|
||||
require.NoError(t, err)
|
||||
proto.RegisterSignalExchangeServer(s, srv)
|
||||
|
||||
|
||||
@@ -107,10 +107,14 @@ type Conn struct {
|
||||
wgProxyRelay wgproxy.Proxy
|
||||
handshaker *Handshaker
|
||||
|
||||
guard *guard.Guard
|
||||
guard Guard
|
||||
semaphore *semaphoregroup.SemaphoreGroup
|
||||
wg sync.WaitGroup
|
||||
|
||||
// used for replace the new guard with the old one in a thread-safe way
|
||||
guardCtxCancel context.CancelFunc
|
||||
wgGuard sync.WaitGroup
|
||||
|
||||
// debug purpose
|
||||
dumpState *stateDump
|
||||
|
||||
@@ -196,14 +200,34 @@ func (conn *Conn) Open(engineCtx context.Context) error {
|
||||
}
|
||||
|
||||
conn.wg.Add(1)
|
||||
conn.wgGuard.Add(1)
|
||||
guardCtx, cancel := context.WithCancel(conn.ctx)
|
||||
conn.guardCtxCancel = cancel
|
||||
go func() {
|
||||
defer conn.wg.Done()
|
||||
defer conn.wgGuard.Done()
|
||||
defer cancel()
|
||||
|
||||
conn.waitInitialRandomSleepTime(conn.ctx)
|
||||
conn.semaphore.Done(conn.ctx)
|
||||
|
||||
conn.guard.Start(conn.ctx, conn.onGuardEvent)
|
||||
conn.guard.Start(guardCtx, conn.onGuardEvent)
|
||||
}()
|
||||
|
||||
// both peer send offer
|
||||
if err := conn.handshaker.SendOffer(); err != nil {
|
||||
switch err {
|
||||
case ErrPeerNotAvailable:
|
||||
conn.Log.Warnf("failed to deliver offer to peer. Peer is not available")
|
||||
case ErrSignalNotSupportDeliveryCheck:
|
||||
conn.Log.Infof("signal delivery check is not supported, switch guard to retry mode")
|
||||
conn.switchGuard()
|
||||
default:
|
||||
conn.Log.Errorf("failed to deliver offer to peer: %v", err)
|
||||
conn.guard.FailedToSendOffer()
|
||||
}
|
||||
}
|
||||
|
||||
conn.opened = true
|
||||
return nil
|
||||
}
|
||||
@@ -556,7 +580,17 @@ func (conn *Conn) onRelayDisconnected() {
|
||||
func (conn *Conn) onGuardEvent() {
|
||||
conn.dumpState.SendOffer()
|
||||
if err := conn.handshaker.SendOffer(); err != nil {
|
||||
conn.Log.Errorf("failed to send offer: %v", err)
|
||||
switch err {
|
||||
case ErrPeerNotAvailable:
|
||||
conn.Log.Warnf("failed to deliver offer to peer. Peer is not available")
|
||||
case ErrSignalNotSupportDeliveryCheck:
|
||||
conn.Log.Infof("signal delivery check is not supported, switch guard to retry mode")
|
||||
// must run on a separate goroutine to prevent deadlock while close the old guard
|
||||
go conn.switchGuard()
|
||||
default:
|
||||
conn.Log.Errorf("failed to deliver offer to peer: %v", err)
|
||||
conn.guard.FailedToSendOffer()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -778,6 +812,22 @@ func (conn *Conn) rosenpassDetermKey() (*wgtypes.Key, error) {
|
||||
return &key, nil
|
||||
}
|
||||
|
||||
func (conn *Conn) switchGuard() {
|
||||
if conn.guardCtxCancel == nil {
|
||||
return
|
||||
}
|
||||
|
||||
conn.guardCtxCancel()
|
||||
conn.guardCtxCancel = nil
|
||||
conn.wgGuard.Wait()
|
||||
conn.wg.Add(1)
|
||||
go func() {
|
||||
defer conn.wg.Done()
|
||||
conn.guard = guard.NewRetryGuard(conn.Log, conn.isConnectedOnAllWay, conn.config.Timeout, conn.srWatcher)
|
||||
conn.guard.Start(conn.ctx, conn.onGuardEvent)
|
||||
}()
|
||||
}
|
||||
|
||||
func isController(config ConnConfig) bool {
|
||||
return config.LocalKey > config.Key
|
||||
}
|
||||
|
||||
10
client/internal/peer/guard.go
Normal file
10
client/internal/peer/guard.go
Normal file
@@ -0,0 +1,10 @@
|
||||
package peer
|
||||
|
||||
import "context"
|
||||
|
||||
type Guard interface {
|
||||
Start(ctx context.Context, eventCallback func())
|
||||
SetRelayedConnDisconnected()
|
||||
SetICEConnDisconnected()
|
||||
FailedToSendOffer()
|
||||
}
|
||||
@@ -4,20 +4,26 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
offerResendPeriod = 2 * time.Second
|
||||
)
|
||||
|
||||
type isConnectedFunc func() bool
|
||||
|
||||
// Guard is responsible for the reconnection logic.
|
||||
// It will trigger to send an offer to the peer then has connection issues.
|
||||
// Only the offer error will start the timer to resend offer periodically.
|
||||
//
|
||||
// Watch these events:
|
||||
// - Relay client reconnected to home server
|
||||
// - Signal server connection state changed
|
||||
// - ICE connection disconnected
|
||||
// - Relayed connection disconnected
|
||||
// - ICE candidate changes
|
||||
// - Failed to send offer to remote peer
|
||||
type Guard struct {
|
||||
log *log.Entry
|
||||
isConnectedOnAllWay isConnectedFunc
|
||||
@@ -25,6 +31,7 @@ type Guard struct {
|
||||
srWatcher *SRWatcher
|
||||
relayedConnDisconnected chan struct{}
|
||||
iCEConnDisconnected chan struct{}
|
||||
offerError chan struct{}
|
||||
}
|
||||
|
||||
func NewGuard(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *Guard {
|
||||
@@ -35,6 +42,7 @@ func NewGuard(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Durati
|
||||
srWatcher: srWatcher,
|
||||
relayedConnDisconnected: make(chan struct{}, 1),
|
||||
iCEConnDisconnected: make(chan struct{}, 1),
|
||||
offerError: make(chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,81 +65,54 @@ func (g *Guard) SetICEConnDisconnected() {
|
||||
}
|
||||
}
|
||||
|
||||
func (g *Guard) FailedToSendOffer() {
|
||||
select {
|
||||
case g.offerError <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// reconnectLoopWithRetry periodically check the connection status.
|
||||
// Try to send offer while the P2P is not established or while the Relay is not connected if is it supported
|
||||
func (g *Guard) reconnectLoopWithRetry(ctx context.Context, callback func()) {
|
||||
srReconnectedChan := g.srWatcher.NewListener()
|
||||
defer g.srWatcher.RemoveListener(srReconnectedChan)
|
||||
|
||||
ticker := g.initialTicker(ctx)
|
||||
defer ticker.Stop()
|
||||
|
||||
tickerChannel := ticker.C
|
||||
offerResendTimer := time.NewTimer(0)
|
||||
offerResendTimer.Stop()
|
||||
defer offerResendTimer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case t := <-tickerChannel:
|
||||
if t.IsZero() {
|
||||
g.log.Infof("retry timed out, stop periodic offer sending")
|
||||
// after backoff timeout the ticker.C will be closed. We need to a dummy channel to avoid loop
|
||||
tickerChannel = make(<-chan time.Time)
|
||||
continue
|
||||
}
|
||||
|
||||
case <-g.relayedConnDisconnected:
|
||||
g.log.Debugf("Relay connection changed, retry connection")
|
||||
offerResendTimer.Stop()
|
||||
if !g.isConnectedOnAllWay() {
|
||||
callback()
|
||||
}
|
||||
case <-g.relayedConnDisconnected:
|
||||
g.log.Debugf("Relay connection changed, reset reconnection ticker")
|
||||
ticker.Stop()
|
||||
ticker = g.prepareExponentTicker(ctx)
|
||||
tickerChannel = ticker.C
|
||||
|
||||
case <-g.iCEConnDisconnected:
|
||||
g.log.Debugf("ICE connection changed, reset reconnection ticker")
|
||||
ticker.Stop()
|
||||
ticker = g.prepareExponentTicker(ctx)
|
||||
tickerChannel = ticker.C
|
||||
|
||||
g.log.Debugf("ICE connection changed, retry connection")
|
||||
offerResendTimer.Stop()
|
||||
if !g.isConnectedOnAllWay() {
|
||||
callback()
|
||||
}
|
||||
case <-srReconnectedChan:
|
||||
g.log.Debugf("has network changes, reset reconnection ticker")
|
||||
ticker.Stop()
|
||||
ticker = g.prepareExponentTicker(ctx)
|
||||
tickerChannel = ticker.C
|
||||
|
||||
g.log.Debugf("has network changes, retry connection")
|
||||
offerResendTimer.Stop()
|
||||
if !g.isConnectedOnAllWay() {
|
||||
callback()
|
||||
}
|
||||
case <-g.offerError:
|
||||
g.log.Debugf("failed to send offer, reset reconnection ticker")
|
||||
offerResendTimer.Reset(offerResendPeriod)
|
||||
continue
|
||||
case <-offerResendTimer.C:
|
||||
if !g.isConnectedOnAllWay() {
|
||||
callback()
|
||||
}
|
||||
case <-ctx.Done():
|
||||
g.log.Debugf("context is done, stop reconnect loop")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// initialTicker give chance to the peer to establish the initial connection.
|
||||
func (g *Guard) initialTicker(ctx context.Context) *backoff.Ticker {
|
||||
bo := backoff.WithContext(&backoff.ExponentialBackOff{
|
||||
InitialInterval: 3 * time.Second,
|
||||
RandomizationFactor: 0.1,
|
||||
Multiplier: 2,
|
||||
MaxInterval: g.timeout,
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}, ctx)
|
||||
|
||||
return backoff.NewTicker(bo)
|
||||
}
|
||||
|
||||
func (g *Guard) prepareExponentTicker(ctx context.Context) *backoff.Ticker {
|
||||
bo := backoff.WithContext(&backoff.ExponentialBackOff{
|
||||
InitialInterval: 800 * time.Millisecond,
|
||||
RandomizationFactor: 0.1,
|
||||
Multiplier: 2,
|
||||
MaxInterval: g.timeout,
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}, ctx)
|
||||
|
||||
ticker := backoff.NewTicker(bo)
|
||||
<-ticker.C // consume the initial tick what is happening right after the ticker has been created
|
||||
|
||||
return ticker
|
||||
}
|
||||
|
||||
139
client/internal/peer/guard/guard_retry.go
Normal file
139
client/internal/peer/guard/guard_retry.go
Normal file
@@ -0,0 +1,139 @@
|
||||
package guard
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
// RetryGuard is responsible for the reconnection logic.
|
||||
// It will trigger to send an offer to the peer then has connection issues.
|
||||
// Watch these events:
|
||||
// - Relay client reconnected to home server
|
||||
// - Signal server connection state changed
|
||||
// - ICE connection disconnected
|
||||
// - Relayed connection disconnected
|
||||
// - ICE candidate changes
|
||||
type RetryGuard struct {
|
||||
log *log.Entry
|
||||
isConnectedOnAllWay isConnectedFunc
|
||||
timeout time.Duration
|
||||
srWatcher *SRWatcher
|
||||
relayedConnDisconnected chan struct{}
|
||||
iCEConnDisconnected chan struct{}
|
||||
}
|
||||
|
||||
func (g *RetryGuard) FailedToSendOffer() {
|
||||
log.Errorf("FailedToSendOffer is not implemented in GuardRetry")
|
||||
}
|
||||
|
||||
func NewRetryGuard(log *log.Entry, isConnectedFn isConnectedFunc, timeout time.Duration, srWatcher *SRWatcher) *RetryGuard {
|
||||
return &RetryGuard{
|
||||
log: log,
|
||||
isConnectedOnAllWay: isConnectedFn,
|
||||
timeout: timeout,
|
||||
srWatcher: srWatcher,
|
||||
relayedConnDisconnected: make(chan struct{}, 1),
|
||||
iCEConnDisconnected: make(chan struct{}, 1),
|
||||
}
|
||||
}
|
||||
|
||||
func (g *RetryGuard) Start(ctx context.Context, eventCallback func()) {
|
||||
g.log.Infof("starting guard for reconnection with MaxInterval: %s", g.timeout)
|
||||
g.reconnectLoopWithRetry(ctx, eventCallback)
|
||||
}
|
||||
|
||||
func (g *RetryGuard) SetRelayedConnDisconnected() {
|
||||
select {
|
||||
case g.relayedConnDisconnected <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (g *RetryGuard) SetICEConnDisconnected() {
|
||||
select {
|
||||
case g.iCEConnDisconnected <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
// reconnectLoopWithRetry periodically check the connection status.
|
||||
// Try to send offer while the P2P is not established or while the Relay is not connected if is it supported
|
||||
func (g *RetryGuard) reconnectLoopWithRetry(ctx context.Context, callback func()) {
|
||||
srReconnectedChan := g.srWatcher.NewListener()
|
||||
defer g.srWatcher.RemoveListener(srReconnectedChan)
|
||||
|
||||
ticker := g.initialTicker(ctx)
|
||||
defer ticker.Stop()
|
||||
|
||||
tickerChannel := ticker.C
|
||||
|
||||
for {
|
||||
select {
|
||||
case t := <-tickerChannel:
|
||||
if t.IsZero() {
|
||||
g.log.Infof("retry timed out, stop periodic offer sending")
|
||||
// after backoff timeout the ticker.C will be closed. We need to a dummy channel to avoid loop
|
||||
tickerChannel = make(<-chan time.Time)
|
||||
continue
|
||||
}
|
||||
|
||||
if !g.isConnectedOnAllWay() {
|
||||
callback()
|
||||
}
|
||||
case <-g.relayedConnDisconnected:
|
||||
g.log.Debugf("Relay connection changed, reset reconnection ticker")
|
||||
ticker.Stop()
|
||||
ticker = g.prepareExponentTicker(ctx)
|
||||
tickerChannel = ticker.C
|
||||
|
||||
case <-g.iCEConnDisconnected:
|
||||
g.log.Debugf("ICE connection changed, reset reconnection ticker")
|
||||
ticker.Stop()
|
||||
ticker = g.prepareExponentTicker(ctx)
|
||||
tickerChannel = ticker.C
|
||||
|
||||
case <-srReconnectedChan:
|
||||
g.log.Debugf("has network changes, reset reconnection ticker")
|
||||
ticker.Stop()
|
||||
ticker = g.prepareExponentTicker(ctx)
|
||||
tickerChannel = ticker.C
|
||||
|
||||
case <-ctx.Done():
|
||||
g.log.Debugf("context is done, stop reconnect loop")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// initialTicker give chance to the peer to establish the initial connection.
|
||||
func (g *RetryGuard) initialTicker(ctx context.Context) *backoff.Ticker {
|
||||
bo := backoff.WithContext(&backoff.ExponentialBackOff{
|
||||
InitialInterval: 3 * time.Second,
|
||||
RandomizationFactor: 0.1,
|
||||
Multiplier: 2,
|
||||
MaxInterval: g.timeout,
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}, ctx)
|
||||
|
||||
return backoff.NewTicker(bo)
|
||||
}
|
||||
|
||||
func (g *RetryGuard) prepareExponentTicker(ctx context.Context) *backoff.Ticker {
|
||||
bo := backoff.WithContext(&backoff.ExponentialBackOff{
|
||||
InitialInterval: 800 * time.Millisecond,
|
||||
RandomizationFactor: 0.1,
|
||||
Multiplier: 2,
|
||||
MaxInterval: g.timeout,
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}, ctx)
|
||||
|
||||
ticker := backoff.NewTicker(bo)
|
||||
<-ticker.C // consume the initial tick what is happening right after the ticker has been created
|
||||
|
||||
return ticker
|
||||
}
|
||||
@@ -1,6 +1,9 @@
|
||||
package peer
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/pion/ice/v4"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
@@ -9,9 +12,16 @@ import (
|
||||
sProto "github.com/netbirdio/netbird/shared/signal/proto"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrPeerNotAvailable = signal.ErrPeerNotAvailable
|
||||
ErrSignalNotSupportDeliveryCheck = errors.New("the signal client does not support SendWithDeliveryCheck")
|
||||
)
|
||||
|
||||
type Signaler struct {
|
||||
signal signal.Client
|
||||
wgPrivateKey wgtypes.Key
|
||||
|
||||
deliveryCheckNotSupported atomic.Bool
|
||||
}
|
||||
|
||||
func NewSignaler(signal signal.Client, wgPrivateKey wgtypes.Key) *Signaler {
|
||||
@@ -67,10 +77,21 @@ func (s *Signaler) signalOfferAnswer(offerAnswer OfferAnswer, remoteKey string,
|
||||
return err
|
||||
}
|
||||
|
||||
if err = s.signal.Send(msg); err != nil {
|
||||
return err
|
||||
if s.deliveryCheckNotSupported.Load() {
|
||||
return s.signal.Send(msg)
|
||||
}
|
||||
|
||||
if err = s.signal.SendWithDeliveryCheck(msg); err != nil {
|
||||
switch {
|
||||
case errors.Is(err, signal.ErrPeerNotAvailable):
|
||||
return ErrPeerNotAvailable
|
||||
case errors.Is(err, signal.ErrUnimplementedMethod):
|
||||
s.handleUnimplementedMethod(msg)
|
||||
return ErrSignalNotSupportDeliveryCheck
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -83,3 +104,15 @@ func (s *Signaler) SignalIdle(remoteKey string) error {
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Signaler) handleUnimplementedMethod(msg *sProto.Message) {
|
||||
// print out the warning only once
|
||||
if !s.deliveryCheckNotSupported.Load() {
|
||||
log.Warnf("signal client does not support delivery check, falling back to Send method and resend")
|
||||
}
|
||||
|
||||
s.deliveryCheckNotSupported.Store(true)
|
||||
if err := s.signal.Send(msg); err != nil {
|
||||
log.Warnf("failed to send signal msg to remote peer: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -345,7 +345,7 @@ func startSignal(t *testing.T) (*grpc.Server, string, error) {
|
||||
log.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
|
||||
srv, err := signalServer.NewServer(context.Background(), otel.Meter(""))
|
||||
srv, err := signalServer.NewServer(context.Background(), otel.Meter(""), nil)
|
||||
require.NoError(t, err)
|
||||
proto.RegisterSignalExchangeServer(s, srv)
|
||||
|
||||
|
||||
@@ -73,8 +73,8 @@ func (p *RDCleanPathProxy) validateCertificateWithJS(conn *proxyConnection, cert
|
||||
}
|
||||
}
|
||||
|
||||
func (p *RDCleanPathProxy) getTLSConfigWithValidation(conn *proxyConnection, requiresCredSSP bool) *tls.Config {
|
||||
config := &tls.Config{
|
||||
func (p *RDCleanPathProxy) getTLSConfigWithValidation(conn *proxyConnection) *tls.Config {
|
||||
return &tls.Config{
|
||||
InsecureSkipVerify: true, // We'll validate manually after handshake
|
||||
VerifyConnection: func(cs tls.ConnectionState) error {
|
||||
var certChain [][]byte
|
||||
@@ -93,15 +93,4 @@ func (p *RDCleanPathProxy) getTLSConfigWithValidation(conn *proxyConnection, req
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
// CredSSP (NLA) requires TLS 1.2 - it's incompatible with TLS 1.3
|
||||
if requiresCredSSP {
|
||||
config.MinVersion = tls.VersionTLS12
|
||||
config.MaxVersion = tls.VersionTLS12
|
||||
} else {
|
||||
config.MinVersion = tls.VersionTLS12
|
||||
config.MaxVersion = tls.VersionTLS13
|
||||
}
|
||||
|
||||
return config
|
||||
}
|
||||
|
||||
@@ -6,13 +6,11 @@ import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/asn1"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"sync"
|
||||
"syscall/js"
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
@@ -21,34 +19,18 @@ const (
|
||||
RDCleanPathVersion = 3390
|
||||
RDCleanPathProxyHost = "rdcleanpath.proxy.local"
|
||||
RDCleanPathProxyScheme = "ws"
|
||||
|
||||
rdpDialTimeout = 15 * time.Second
|
||||
|
||||
GeneralErrorCode = 1
|
||||
WSAETimedOut = 10060
|
||||
WSAEConnRefused = 10061
|
||||
WSAEConnAborted = 10053
|
||||
WSAEConnReset = 10054
|
||||
WSAEGenericError = 10050
|
||||
)
|
||||
|
||||
type RDCleanPathPDU struct {
|
||||
Version int64 `asn1:"tag:0,explicit"`
|
||||
Error RDCleanPathErr `asn1:"tag:1,explicit,optional"`
|
||||
Destination string `asn1:"utf8,tag:2,explicit,optional"`
|
||||
ProxyAuth string `asn1:"utf8,tag:3,explicit,optional"`
|
||||
ServerAuth string `asn1:"utf8,tag:4,explicit,optional"`
|
||||
PreconnectionBlob string `asn1:"utf8,tag:5,explicit,optional"`
|
||||
X224ConnectionPDU []byte `asn1:"tag:6,explicit,optional"`
|
||||
ServerCertChain [][]byte `asn1:"tag:7,explicit,optional"`
|
||||
ServerAddr string `asn1:"utf8,tag:9,explicit,optional"`
|
||||
}
|
||||
|
||||
type RDCleanPathErr struct {
|
||||
ErrorCode int16 `asn1:"tag:0,explicit"`
|
||||
HTTPStatusCode int16 `asn1:"tag:1,explicit,optional"`
|
||||
WSALastError int16 `asn1:"tag:2,explicit,optional"`
|
||||
TLSAlertCode int8 `asn1:"tag:3,explicit,optional"`
|
||||
Version int64 `asn1:"tag:0,explicit"`
|
||||
Error []byte `asn1:"tag:1,explicit,optional"`
|
||||
Destination string `asn1:"utf8,tag:2,explicit,optional"`
|
||||
ProxyAuth string `asn1:"utf8,tag:3,explicit,optional"`
|
||||
ServerAuth string `asn1:"utf8,tag:4,explicit,optional"`
|
||||
PreconnectionBlob string `asn1:"utf8,tag:5,explicit,optional"`
|
||||
X224ConnectionPDU []byte `asn1:"tag:6,explicit,optional"`
|
||||
ServerCertChain [][]byte `asn1:"tag:7,explicit,optional"`
|
||||
ServerAddr string `asn1:"utf8,tag:9,explicit,optional"`
|
||||
}
|
||||
|
||||
type RDCleanPathProxy struct {
|
||||
@@ -228,13 +210,9 @@ func (p *RDCleanPathProxy) handleDirectRDP(conn *proxyConnection, firstPacket []
|
||||
destination := conn.destination
|
||||
log.Infof("Direct RDP mode: Connecting to %s via NetBird", destination)
|
||||
|
||||
ctx, cancel := context.WithTimeout(conn.ctx, rdpDialTimeout)
|
||||
defer cancel()
|
||||
|
||||
rdpConn, err := p.nbClient.Dial(ctx, "tcp", destination)
|
||||
rdpConn, err := p.nbClient.Dial(conn.ctx, "tcp", destination)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to connect to %s: %v", destination, err)
|
||||
p.sendRDCleanPathError(conn, newWSAError(err))
|
||||
return
|
||||
}
|
||||
conn.rdpConn = rdpConn
|
||||
@@ -242,7 +220,6 @@ func (p *RDCleanPathProxy) handleDirectRDP(conn *proxyConnection, firstPacket []
|
||||
_, err = rdpConn.Write(firstPacket)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to write first packet: %v", err)
|
||||
p.sendRDCleanPathError(conn, newWSAError(err))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -250,7 +227,6 @@ func (p *RDCleanPathProxy) handleDirectRDP(conn *proxyConnection, firstPacket []
|
||||
n, err := rdpConn.Read(response)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to read X.224 response: %v", err)
|
||||
p.sendRDCleanPathError(conn, newWSAError(err))
|
||||
return
|
||||
}
|
||||
|
||||
@@ -293,52 +269,3 @@ func (p *RDCleanPathProxy) sendToWebSocket(conn *proxyConnection, data []byte) {
|
||||
conn.wsHandlers.Call("send", uint8Array.Get("buffer"))
|
||||
}
|
||||
}
|
||||
|
||||
func (p *RDCleanPathProxy) sendRDCleanPathError(conn *proxyConnection, pdu RDCleanPathPDU) {
|
||||
data, err := asn1.Marshal(pdu)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to marshal error PDU: %v", err)
|
||||
return
|
||||
}
|
||||
p.sendToWebSocket(conn, data)
|
||||
}
|
||||
|
||||
func errorToWSACode(err error) int16 {
|
||||
if err == nil {
|
||||
return WSAEGenericError
|
||||
}
|
||||
var netErr *net.OpError
|
||||
if errors.As(err, &netErr) && netErr.Timeout() {
|
||||
return WSAETimedOut
|
||||
}
|
||||
if errors.Is(err, context.DeadlineExceeded) {
|
||||
return WSAETimedOut
|
||||
}
|
||||
if errors.Is(err, context.Canceled) {
|
||||
return WSAEConnAborted
|
||||
}
|
||||
if errors.Is(err, io.EOF) {
|
||||
return WSAEConnReset
|
||||
}
|
||||
return WSAEGenericError
|
||||
}
|
||||
|
||||
func newWSAError(err error) RDCleanPathPDU {
|
||||
return RDCleanPathPDU{
|
||||
Version: RDCleanPathVersion,
|
||||
Error: RDCleanPathErr{
|
||||
ErrorCode: GeneralErrorCode,
|
||||
WSALastError: errorToWSACode(err),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newHTTPError(statusCode int16) RDCleanPathPDU {
|
||||
return RDCleanPathPDU{
|
||||
Version: RDCleanPathVersion,
|
||||
Error: RDCleanPathErr{
|
||||
ErrorCode: GeneralErrorCode,
|
||||
HTTPStatusCode: statusCode,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3,7 +3,6 @@
|
||||
package rdp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"encoding/asn1"
|
||||
"io"
|
||||
@@ -12,17 +11,11 @@ import (
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
const (
|
||||
// MS-RDPBCGR: confusingly named, actually means PROTOCOL_HYBRID (CredSSP)
|
||||
protocolSSL = 0x00000001
|
||||
protocolHybridEx = 0x00000008
|
||||
)
|
||||
|
||||
func (p *RDCleanPathProxy) processRDCleanPathPDU(conn *proxyConnection, pdu RDCleanPathPDU) {
|
||||
log.Infof("Processing RDCleanPath PDU: Version=%d, Destination=%s", pdu.Version, pdu.Destination)
|
||||
|
||||
if pdu.Version != RDCleanPathVersion {
|
||||
p.sendRDCleanPathError(conn, newHTTPError(400))
|
||||
p.sendRDCleanPathError(conn, "Unsupported version")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -31,13 +24,10 @@ func (p *RDCleanPathProxy) processRDCleanPathPDU(conn *proxyConnection, pdu RDCl
|
||||
destination = pdu.Destination
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(conn.ctx, rdpDialTimeout)
|
||||
defer cancel()
|
||||
|
||||
rdpConn, err := p.nbClient.Dial(ctx, "tcp", destination)
|
||||
rdpConn, err := p.nbClient.Dial(conn.ctx, "tcp", destination)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to connect to %s: %v", destination, err)
|
||||
p.sendRDCleanPathError(conn, newWSAError(err))
|
||||
p.sendRDCleanPathError(conn, "Connection failed")
|
||||
p.cleanupConnection(conn)
|
||||
return
|
||||
}
|
||||
@@ -50,34 +40,6 @@ func (p *RDCleanPathProxy) processRDCleanPathPDU(conn *proxyConnection, pdu RDCl
|
||||
p.setupTLSConnection(conn, pdu)
|
||||
}
|
||||
|
||||
// detectCredSSPFromX224 checks if the X.224 response indicates NLA/CredSSP is required.
|
||||
// Per MS-RDPBCGR spec: byte 11 = TYPE_RDP_NEG_RSP (0x02), bytes 15-18 = selectedProtocol flags.
|
||||
// Returns (requiresTLS12, selectedProtocol, detectionSuccessful).
|
||||
func (p *RDCleanPathProxy) detectCredSSPFromX224(x224Response []byte) (bool, uint32, bool) {
|
||||
const minResponseLength = 19
|
||||
|
||||
if len(x224Response) < minResponseLength {
|
||||
return false, 0, false
|
||||
}
|
||||
|
||||
// Per X.224 specification:
|
||||
// x224Response[0] == 0x03: Length of X.224 header (3 bytes)
|
||||
// x224Response[5] == 0xD0: X.224 Data TPDU code
|
||||
if x224Response[0] != 0x03 || x224Response[5] != 0xD0 {
|
||||
return false, 0, false
|
||||
}
|
||||
|
||||
if x224Response[11] == 0x02 {
|
||||
flags := uint32(x224Response[15]) | uint32(x224Response[16])<<8 |
|
||||
uint32(x224Response[17])<<16 | uint32(x224Response[18])<<24
|
||||
|
||||
hasNLA := (flags & (protocolSSL | protocolHybridEx)) != 0
|
||||
return hasNLA, flags, true
|
||||
}
|
||||
|
||||
return false, 0, false
|
||||
}
|
||||
|
||||
func (p *RDCleanPathProxy) setupTLSConnection(conn *proxyConnection, pdu RDCleanPathPDU) {
|
||||
var x224Response []byte
|
||||
if len(pdu.X224ConnectionPDU) > 0 {
|
||||
@@ -85,7 +47,7 @@ func (p *RDCleanPathProxy) setupTLSConnection(conn *proxyConnection, pdu RDClean
|
||||
_, err := conn.rdpConn.Write(pdu.X224ConnectionPDU)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to write X.224 PDU: %v", err)
|
||||
p.sendRDCleanPathError(conn, newWSAError(err))
|
||||
p.sendRDCleanPathError(conn, "Failed to forward X.224")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -93,32 +55,21 @@ func (p *RDCleanPathProxy) setupTLSConnection(conn *proxyConnection, pdu RDClean
|
||||
n, err := conn.rdpConn.Read(response)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to read X.224 response: %v", err)
|
||||
p.sendRDCleanPathError(conn, newWSAError(err))
|
||||
p.sendRDCleanPathError(conn, "Failed to read X.224 response")
|
||||
return
|
||||
}
|
||||
x224Response = response[:n]
|
||||
log.Debugf("Received X.224 Connection Confirm (%d bytes)", n)
|
||||
}
|
||||
|
||||
requiresCredSSP, selectedProtocol, detected := p.detectCredSSPFromX224(x224Response)
|
||||
if detected {
|
||||
if requiresCredSSP {
|
||||
log.Warnf("Detected NLA/CredSSP (selectedProtocol: 0x%08X), forcing TLS 1.2 for compatibility", selectedProtocol)
|
||||
} else {
|
||||
log.Warnf("No NLA/CredSSP detected (selectedProtocol: 0x%08X), allowing up to TLS 1.3", selectedProtocol)
|
||||
}
|
||||
} else {
|
||||
log.Warnf("Could not detect RDP security protocol, allowing up to TLS 1.3")
|
||||
}
|
||||
|
||||
tlsConfig := p.getTLSConfigWithValidation(conn, requiresCredSSP)
|
||||
tlsConfig := p.getTLSConfigWithValidation(conn)
|
||||
|
||||
tlsConn := tls.Client(conn.rdpConn, tlsConfig)
|
||||
conn.tlsConn = tlsConn
|
||||
|
||||
if err := tlsConn.Handshake(); err != nil {
|
||||
log.Errorf("TLS handshake failed: %v", err)
|
||||
p.sendRDCleanPathError(conn, newWSAError(err))
|
||||
p.sendRDCleanPathError(conn, "TLS handshake failed")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -155,6 +106,47 @@ func (p *RDCleanPathProxy) setupTLSConnection(conn *proxyConnection, pdu RDClean
|
||||
p.cleanupConnection(conn)
|
||||
}
|
||||
|
||||
func (p *RDCleanPathProxy) setupPlainConnection(conn *proxyConnection, pdu RDCleanPathPDU) {
|
||||
if len(pdu.X224ConnectionPDU) > 0 {
|
||||
log.Debugf("Forwarding X.224 Connection Request (%d bytes)", len(pdu.X224ConnectionPDU))
|
||||
_, err := conn.rdpConn.Write(pdu.X224ConnectionPDU)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to write X.224 PDU: %v", err)
|
||||
p.sendRDCleanPathError(conn, "Failed to forward X.224")
|
||||
return
|
||||
}
|
||||
|
||||
response := make([]byte, 1024)
|
||||
n, err := conn.rdpConn.Read(response)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to read X.224 response: %v", err)
|
||||
p.sendRDCleanPathError(conn, "Failed to read X.224 response")
|
||||
return
|
||||
}
|
||||
|
||||
responsePDU := RDCleanPathPDU{
|
||||
Version: RDCleanPathVersion,
|
||||
X224ConnectionPDU: response[:n],
|
||||
ServerAddr: conn.destination,
|
||||
}
|
||||
|
||||
p.sendRDCleanPathPDU(conn, responsePDU)
|
||||
} else {
|
||||
responsePDU := RDCleanPathPDU{
|
||||
Version: RDCleanPathVersion,
|
||||
ServerAddr: conn.destination,
|
||||
}
|
||||
p.sendRDCleanPathPDU(conn, responsePDU)
|
||||
}
|
||||
|
||||
go p.forwardConnToWS(conn, conn.rdpConn, "TCP")
|
||||
go p.forwardWSToConn(conn, conn.rdpConn, "TCP")
|
||||
|
||||
<-conn.ctx.Done()
|
||||
log.Debug("TCP connection context done, cleaning up")
|
||||
p.cleanupConnection(conn)
|
||||
}
|
||||
|
||||
func (p *RDCleanPathProxy) sendRDCleanPathPDU(conn *proxyConnection, pdu RDCleanPathPDU) {
|
||||
data, err := asn1.Marshal(pdu)
|
||||
if err != nil {
|
||||
@@ -166,6 +158,21 @@ func (p *RDCleanPathProxy) sendRDCleanPathPDU(conn *proxyConnection, pdu RDClean
|
||||
p.sendToWebSocket(conn, data)
|
||||
}
|
||||
|
||||
func (p *RDCleanPathProxy) sendRDCleanPathError(conn *proxyConnection, errorMsg string) {
|
||||
pdu := RDCleanPathPDU{
|
||||
Version: RDCleanPathVersion,
|
||||
Error: []byte(errorMsg),
|
||||
}
|
||||
|
||||
data, err := asn1.Marshal(pdu)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to marshal error PDU: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
p.sendToWebSocket(conn, data)
|
||||
}
|
||||
|
||||
func (p *RDCleanPathProxy) readWebSocketMessage(conn *proxyConnection) ([]byte, error) {
|
||||
msgChan := make(chan []byte)
|
||||
errChan := make(chan error)
|
||||
|
||||
4
go.mod
4
go.mod
@@ -62,8 +62,8 @@ require (
|
||||
github.com/miekg/dns v1.1.59
|
||||
github.com/mitchellh/hashstructure/v2 v2.0.2
|
||||
github.com/nadoo/ipset v0.5.0
|
||||
github.com/netbirdio/management-integrations/integrations v0.0.0-20251010134843-7af36217ac1f
|
||||
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250805121659-6b4ac470ca45
|
||||
github.com/netbirdio/management-integrations/integrations v0.0.0-20250906095204-f87a07690ba0
|
||||
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250930114722-bab681dd3a96
|
||||
github.com/okta/okta-sdk-golang/v2 v2.18.0
|
||||
github.com/oschwald/maxminddb-golang v1.12.0
|
||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||
|
||||
8
go.sum
8
go.sum
@@ -503,12 +503,12 @@ github.com/netbirdio/go-netroute v0.0.0-20240611143515-f59b0e1d3944 h1:TDtJKmM6S
|
||||
github.com/netbirdio/go-netroute v0.0.0-20240611143515-f59b0e1d3944/go.mod h1:sHA6TRxjQ6RLbnI+3R4DZo2Eseg/iKiPRfNmcuNySVQ=
|
||||
github.com/netbirdio/ice/v4 v4.0.0-20250908184934-6202be846b51 h1:Ov4qdafATOgGMB1wbSuh+0aAHcwz9hdvB6VZjh1mVMI=
|
||||
github.com/netbirdio/ice/v4 v4.0.0-20250908184934-6202be846b51/go.mod h1:ZSIbPdBn5hePO8CpF1PekH2SfpTxg1PDhEwtbqZS7R8=
|
||||
github.com/netbirdio/management-integrations/integrations v0.0.0-20251010134843-7af36217ac1f h1:XIpRDlpPz3zFUkpwaqDRHjwpQRsf2ZKHggoex1MTafs=
|
||||
github.com/netbirdio/management-integrations/integrations v0.0.0-20251010134843-7af36217ac1f/go.mod h1:v0nUbbHbuQnqR7yKIYnKzsLBCswLtp2JctmKYmGgVhc=
|
||||
github.com/netbirdio/management-integrations/integrations v0.0.0-20250906095204-f87a07690ba0 h1:9BUqQHPVOGr0edk8EifUBUfTr2Ob0ypAPxtasUApBxQ=
|
||||
github.com/netbirdio/management-integrations/integrations v0.0.0-20250906095204-f87a07690ba0/go.mod h1:v0nUbbHbuQnqR7yKIYnKzsLBCswLtp2JctmKYmGgVhc=
|
||||
github.com/netbirdio/service v0.0.0-20240911161631-f62744f42502 h1:3tHlFmhTdX9axERMVN63dqyFqnvuD+EMJHzM7mNGON8=
|
||||
github.com/netbirdio/service v0.0.0-20240911161631-f62744f42502/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM=
|
||||
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250805121659-6b4ac470ca45 h1:ujgviVYmx243Ksy7NdSwrdGPSRNE3pb8kEDSpH0QuAQ=
|
||||
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250805121659-6b4ac470ca45/go.mod h1:5/sjFmLb8O96B5737VCqhHyGRzNFIaN/Bu7ZodXc3qQ=
|
||||
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250930114722-bab681dd3a96 h1:rPu2HVjtRZs/GloIz6h4tcNr/9Fq55SBBAlz6uOExt8=
|
||||
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250930114722-bab681dd3a96/go.mod h1:ap83I3Rs3SLND/58j9X+a3ixA9d9ztainwW01ExEw0w=
|
||||
github.com/netbirdio/wireguard-go v0.0.0-20241230120307-6a676aebaaf6 h1:X5h5QgP7uHAv78FWgHV8+WYLjHxK9v3ilkVXT1cpCrQ=
|
||||
github.com/netbirdio/wireguard-go v0.0.0-20241230120307-6a676aebaaf6/go.mod h1:tkCQ4FQXmpAgYVh++1cq16/dH4QJtmvpRv19DWGAHSA=
|
||||
github.com/nicksnyder/go-i18n/v2 v2.4.0 h1:3IcvPOAvnCKwNm0TB0dLDTuawWEj+ax/RERNC+diLMM=
|
||||
|
||||
@@ -49,7 +49,6 @@ services:
|
||||
- traefik.http.routers.netbird-wsproxy-signal.service=netbird-wsproxy-signal
|
||||
- traefik.http.services.netbird-wsproxy-signal.loadbalancer.server.port=80
|
||||
- traefik.http.routers.netbird-signal.rule=Host(`$NETBIRD_DOMAIN`) && PathPrefix(`/signalexchange.SignalExchange/`)
|
||||
- traefik.http.routers.netbird-signal.service=netbird-signal
|
||||
- traefik.http.services.netbird-signal.loadbalancer.server.port=10000
|
||||
- traefik.http.services.netbird-signal.loadbalancer.server.scheme=h2c
|
||||
|
||||
|
||||
@@ -136,7 +136,7 @@ func (a MockIntegratedValidator) GetValidatedPeers(_ context.Context, accountID
|
||||
return validatedPeers, nil
|
||||
}
|
||||
|
||||
func (MockIntegratedValidator) PreparePeer(_ context.Context, accountID string, peer *nbpeer.Peer, peersGroup []string, extraSettings *types.ExtraSettings, temporary bool) *nbpeer.Peer {
|
||||
func (MockIntegratedValidator) PreparePeer(_ context.Context, accountID string, peer *nbpeer.Peer, peersGroup []string, extraSettings *types.ExtraSettings) *nbpeer.Peer {
|
||||
return peer
|
||||
}
|
||||
|
||||
|
||||
@@ -3,16 +3,16 @@ package integrated_validator
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/netbirdio/netbird/shared/management/proto"
|
||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||
"github.com/netbirdio/netbird/management/server/types"
|
||||
"github.com/netbirdio/netbird/shared/management/proto"
|
||||
)
|
||||
|
||||
// IntegratedValidator interface exists to avoid the circle dependencies
|
||||
type IntegratedValidator interface {
|
||||
ValidateExtraSettings(ctx context.Context, newExtraSettings *types.ExtraSettings, oldExtraSettings *types.ExtraSettings, peers map[string]*nbpeer.Peer, userID string, accountID string) error
|
||||
ValidatePeer(ctx context.Context, update *nbpeer.Peer, peer *nbpeer.Peer, userID string, accountID string, dnsDomain string, peersGroup []string, extraSettings *types.ExtraSettings) (*nbpeer.Peer, bool, error)
|
||||
PreparePeer(ctx context.Context, accountID string, peer *nbpeer.Peer, peersGroup []string, extraSettings *types.ExtraSettings, temporary bool) *nbpeer.Peer
|
||||
PreparePeer(ctx context.Context, accountID string, peer *nbpeer.Peer, peersGroup []string, extraSettings *types.ExtraSettings) *nbpeer.Peer
|
||||
IsNotValidPeer(ctx context.Context, accountID string, peer *nbpeer.Peer, peersGroup []string, extraSettings *types.ExtraSettings) (bool, bool, error)
|
||||
GetValidatedPeers(ctx context.Context, accountID string, groups []*types.Group, peers []*nbpeer.Peer, extraSettings *types.ExtraSettings) (map[string]struct{}, error)
|
||||
PeerDeleted(ctx context.Context, accountID, peerID string, extraSettings *types.ExtraSettings) error
|
||||
|
||||
@@ -350,6 +350,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
|
||||
}
|
||||
|
||||
var peer *nbpeer.Peer
|
||||
var updateAccountPeers bool
|
||||
var eventsToStore []func()
|
||||
|
||||
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
||||
@@ -362,6 +363,11 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
|
||||
return err
|
||||
}
|
||||
|
||||
updateAccountPeers, err = isPeerInActiveGroup(ctx, transaction, accountID, peerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
eventsToStore, err = deletePeers(ctx, am, transaction, accountID, userID, []*nbpeer.Peer{peer})
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete peer: %w", err)
|
||||
@@ -381,7 +387,7 @@ func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peer
|
||||
storeEvent()
|
||||
}
|
||||
|
||||
if userID != activity.SystemInitiator {
|
||||
if updateAccountPeers && userID != activity.SystemInitiator {
|
||||
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||
}
|
||||
|
||||
@@ -578,7 +584,7 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, accountID, setupKe
|
||||
}
|
||||
}
|
||||
|
||||
newPeer = am.integratedPeerValidator.PreparePeer(ctx, accountID, newPeer, groupsToAdd, settings.Extra, temporary)
|
||||
newPeer = am.integratedPeerValidator.PreparePeer(ctx, accountID, newPeer, groupsToAdd, settings.Extra)
|
||||
|
||||
network, err := am.Store.GetAccountNetwork(ctx, store.LockingStrengthNone, accountID)
|
||||
if err != nil {
|
||||
@@ -678,6 +684,11 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, accountID, setupKe
|
||||
return nil, nil, nil, fmt.Errorf("failed to add peer to database after %d attempts: %w", maxAttempts, err)
|
||||
}
|
||||
|
||||
updateAccountPeers, err := isPeerInActiveGroup(ctx, am.Store, accountID, newPeer.ID)
|
||||
if err != nil {
|
||||
updateAccountPeers = true
|
||||
}
|
||||
|
||||
if newPeer == nil {
|
||||
return nil, nil, nil, fmt.Errorf("new peer is nil")
|
||||
}
|
||||
@@ -690,7 +701,9 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, accountID, setupKe
|
||||
|
||||
am.StoreEvent(ctx, opEvent.InitiatorID, opEvent.TargetID, opEvent.AccountID, opEvent.Activity, opEvent.Meta)
|
||||
|
||||
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||
if updateAccountPeers {
|
||||
am.BufferUpdateAccountPeers(ctx, accountID)
|
||||
}
|
||||
|
||||
return am.getValidatedPeerWithMap(ctx, false, accountID, newPeer)
|
||||
}
|
||||
@@ -1514,6 +1527,16 @@ func getPeerGroupIDs(ctx context.Context, transaction store.Store, accountID str
|
||||
return transaction.GetPeerGroupIDs(ctx, store.LockingStrengthNone, accountID, peerID)
|
||||
}
|
||||
|
||||
// IsPeerInActiveGroup checks if the given peer is part of a group that is used
|
||||
// in an active DNS, route, or ACL configuration.
|
||||
func isPeerInActiveGroup(ctx context.Context, transaction store.Store, accountID, peerID string) (bool, error) {
|
||||
peerGroupIDs, err := getPeerGroupIDs(ctx, transaction, accountID, peerID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return areGroupChangesAffectPeers(ctx, transaction, accountID, peerGroupIDs) // TODO: use transaction
|
||||
}
|
||||
|
||||
// deletePeers deletes all specified peers and sends updates to the remote peers.
|
||||
// Returns a slice of functions to save events after successful peer deletion.
|
||||
func deletePeers(ctx context.Context, am *DefaultAccountManager, transaction store.Store, accountID, userID string, peers []*nbpeer.Peer) ([]func(), error) {
|
||||
|
||||
@@ -1790,7 +1790,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
|
||||
t.Run("adding peer to unlinked group", func(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg) //
|
||||
peerShouldNotReceiveUpdate(t, updMsg) //
|
||||
close(done)
|
||||
}()
|
||||
|
||||
@@ -1815,7 +1815,7 @@ func TestPeerAccountPeersUpdate(t *testing.T) {
|
||||
t.Run("deleting peer with unlinked group", func(t *testing.T) {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
peerShouldReceiveUpdate(t, updMsg)
|
||||
peerShouldNotReceiveUpdate(t, updMsg)
|
||||
close(done)
|
||||
}()
|
||||
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -4,7 +4,6 @@ import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
)
|
||||
|
||||
@@ -15,8 +14,6 @@ type StoreMetrics struct {
|
||||
persistenceDurationMicro metric.Int64Histogram
|
||||
persistenceDurationMs metric.Int64Histogram
|
||||
transactionDurationMs metric.Int64Histogram
|
||||
queryDurationMs metric.Int64Histogram
|
||||
queryCounter metric.Int64Counter
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
@@ -62,29 +59,12 @@ func NewStoreMetrics(ctx context.Context, meter metric.Meter) (*StoreMetrics, er
|
||||
return nil, err
|
||||
}
|
||||
|
||||
queryDurationMs, err := meter.Int64Histogram("management.store.query.duration.ms",
|
||||
metric.WithUnit("milliseconds"),
|
||||
metric.WithDescription("Duration of database query operations with operation type and table name"),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
queryCounter, err := meter.Int64Counter("management.store.query.count",
|
||||
metric.WithDescription("Count of database query operations with operation type, table name, and status"),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &StoreMetrics{
|
||||
globalLockAcquisitionDurationMicro: globalLockAcquisitionDurationMicro,
|
||||
globalLockAcquisitionDurationMs: globalLockAcquisitionDurationMs,
|
||||
persistenceDurationMicro: persistenceDurationMicro,
|
||||
persistenceDurationMs: persistenceDurationMs,
|
||||
transactionDurationMs: transactionDurationMs,
|
||||
queryDurationMs: queryDurationMs,
|
||||
queryCounter: queryCounter,
|
||||
ctx: ctx,
|
||||
}, nil
|
||||
}
|
||||
@@ -105,13 +85,3 @@ func (metrics *StoreMetrics) CountPersistenceDuration(duration time.Duration) {
|
||||
func (metrics *StoreMetrics) CountTransactionDuration(duration time.Duration) {
|
||||
metrics.transactionDurationMs.Record(metrics.ctx, duration.Milliseconds())
|
||||
}
|
||||
|
||||
// CountStoreOperation records a store operation with its method name, status, and duration
|
||||
func (metrics *StoreMetrics) CountStoreOperation(method string, duration time.Duration) {
|
||||
attrs := []attribute.KeyValue{
|
||||
attribute.String("method", method),
|
||||
}
|
||||
|
||||
metrics.queryDurationMs.Record(metrics.ctx, duration.Milliseconds(), metric.WithAttributes(attrs...))
|
||||
metrics.queryCounter.Add(metrics.ctx, 1, metric.WithAttributes(attrs...))
|
||||
}
|
||||
|
||||
@@ -35,6 +35,7 @@ type Client interface {
|
||||
WaitStreamConnected()
|
||||
SendToStream(msg *proto.EncryptedMessage) error
|
||||
Send(msg *proto.Message) error
|
||||
SendWithDeliveryCheck(msg *proto.Message) error
|
||||
SetOnReconnectedListener(func())
|
||||
}
|
||||
|
||||
|
||||
@@ -199,7 +199,7 @@ func startSignal() (*grpc.Server, net.Listener) {
|
||||
panic(err)
|
||||
}
|
||||
s := grpc.NewServer()
|
||||
srv, err := server.NewServer(context.Background(), otel.Meter(""))
|
||||
srv, err := server.NewServer(context.Background(), otel.Meter(""), nil)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
@@ -23,6 +24,11 @@ import (
|
||||
"github.com/netbirdio/netbird/util/wsproxy"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrPeerNotAvailable = errors.New("peer not available")
|
||||
ErrUnimplementedMethod = errors.New("the signal client does not support SendWithDeliveryCheck")
|
||||
)
|
||||
|
||||
// ConnStateNotifier is a wrapper interface of the status recorder
|
||||
type ConnStateNotifier interface {
|
||||
MarkSignalDisconnected(error)
|
||||
@@ -397,6 +403,36 @@ func (c *GrpcClient) Send(msg *proto.Message) error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *GrpcClient) SendWithDeliveryCheck(msg *proto.Message) error {
|
||||
if !c.Ready() {
|
||||
return fmt.Errorf("no connection to signal")
|
||||
}
|
||||
|
||||
encryptedMessage, err := c.encryptMessage(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(c.ctx, client.ConnectTimeout)
|
||||
defer cancel()
|
||||
|
||||
_, err = c.realClient.SendWithDeliveryCheck(ctx, encryptedMessage)
|
||||
if err != nil {
|
||||
if st, ok := status.FromError(err); ok {
|
||||
switch st.Code() {
|
||||
case codes.NotFound:
|
||||
return ErrPeerNotAvailable
|
||||
case codes.Unimplemented:
|
||||
return ErrUnimplementedMethod
|
||||
default:
|
||||
return fmt.Errorf("grpc error %s: %w", st.Code(), err)
|
||||
}
|
||||
}
|
||||
return err // Not a gRPC status error
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// receive receives messages from other peers coming through the Signal Exchange
|
||||
// and distributes them to worker threads for processing
|
||||
func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient) error {
|
||||
|
||||
@@ -16,6 +16,7 @@ type MockClient struct {
|
||||
SendToStreamFunc func(msg *proto.EncryptedMessage) error
|
||||
SendFunc func(msg *proto.Message) error
|
||||
SetOnReconnectedListenerFunc func(f func())
|
||||
SendWithDeliveryCheckFn func(msg *proto.Message) error
|
||||
}
|
||||
|
||||
// SetOnReconnectedListener sets the function to be called when the client reconnects.
|
||||
@@ -82,3 +83,10 @@ func (sm *MockClient) Send(msg *proto.Message) error {
|
||||
}
|
||||
return sm.SendFunc(msg)
|
||||
}
|
||||
|
||||
func (sm *MockClient) SendWithDeliveryCheck(msg *proto.Message) error {
|
||||
if sm.SendWithDeliveryCheckFn == nil {
|
||||
return nil
|
||||
}
|
||||
return sm.SendWithDeliveryCheck(msg)
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
_ "google.golang.org/protobuf/types/descriptorpb"
|
||||
emptypb "google.golang.org/protobuf/types/known/emptypb"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
)
|
||||
@@ -439,72 +440,79 @@ var file_signalexchange_proto_rawDesc = []byte{
|
||||
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78,
|
||||
0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x1a, 0x20, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70,
|
||||
0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x64, 0x65, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74,
|
||||
0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x56, 0x0a, 0x10, 0x45, 0x6e, 0x63, 0x72,
|
||||
0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, 0x03,
|
||||
0x6b, 0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1c,
|
||||
0x0a, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28,
|
||||
0x09, 0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x12, 0x0a, 0x04,
|
||||
0x62, 0x6f, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79,
|
||||
0x22, 0x63, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b,
|
||||
0x65, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1c, 0x0a,
|
||||
0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
|
||||
0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x04, 0x62,
|
||||
0x6f, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e,
|
||||
0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x52,
|
||||
0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xe4, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d,
|
||||
0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x73,
|
||||
0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f,
|
||||
0x64, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a,
|
||||
0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07,
|
||||
0x70, 0x61, 0x79, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x22, 0x0a, 0x0c, 0x77, 0x67, 0x4c, 0x69, 0x73,
|
||||
0x74, 0x65, 0x6e, 0x50, 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0c, 0x77,
|
||||
0x67, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x26, 0x0a, 0x0e, 0x6e,
|
||||
0x65, 0x74, 0x42, 0x69, 0x72, 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20,
|
||||
0x01, 0x28, 0x09, 0x52, 0x0e, 0x6e, 0x65, 0x74, 0x42, 0x69, 0x72, 0x64, 0x56, 0x65, 0x72, 0x73,
|
||||
0x69, 0x6f, 0x6e, 0x12, 0x28, 0x0a, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28,
|
||||
0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e,
|
||||
0x67, 0x65, 0x2e, 0x4d, 0x6f, 0x64, 0x65, 0x52, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x12, 0x2c, 0x0a,
|
||||
0x11, 0x66, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x53, 0x75, 0x70, 0x70, 0x6f, 0x72, 0x74,
|
||||
0x65, 0x64, 0x18, 0x06, 0x20, 0x03, 0x28, 0x0d, 0x52, 0x11, 0x66, 0x65, 0x61, 0x74, 0x75, 0x72,
|
||||
0x65, 0x73, 0x53, 0x75, 0x70, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x64, 0x12, 0x49, 0x0a, 0x0f, 0x72,
|
||||
0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x07,
|
||||
0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63,
|
||||
0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43,
|
||||
0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73,
|
||||
0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x2e, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53,
|
||||
0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x08, 0x20, 0x01,
|
||||
0x28, 0x09, 0x52, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41,
|
||||
0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x21, 0x0a, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f,
|
||||
0x6e, 0x49, 0x64, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x09, 0x73, 0x65, 0x73,
|
||||
0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x88, 0x01, 0x01, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, 0x70,
|
||||
0x65, 0x12, 0x09, 0x0a, 0x05, 0x4f, 0x46, 0x46, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06,
|
||||
0x41, 0x4e, 0x53, 0x57, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x44,
|
||||
0x49, 0x44, 0x41, 0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10,
|
||||
0x04, 0x12, 0x0b, 0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x42, 0x0c,
|
||||
0x0a, 0x0a, 0x5f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x2e, 0x0a, 0x04,
|
||||
0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01,
|
||||
0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01,
|
||||
0x01, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f,
|
||||
0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12,
|
||||
0x28, 0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b,
|
||||
0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70,
|
||||
0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73,
|
||||
0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72,
|
||||
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73,
|
||||
0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01, 0x0a, 0x0e,
|
||||
0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c,
|
||||
0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65,
|
||||
0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65,
|
||||
0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61,
|
||||
0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70,
|
||||
0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x0d,
|
||||
0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x20, 0x2e,
|
||||
0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45,
|
||||
0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a,
|
||||
0x6f, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x1a, 0x1b, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65,
|
||||
0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x65, 0x6d, 0x70, 0x74, 0x79, 0x2e,
|
||||
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x56, 0x0a, 0x10, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74,
|
||||
0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79,
|
||||
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1c, 0x0a, 0x09, 0x72,
|
||||
0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09,
|
||||
0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x12, 0x0a, 0x04, 0x62, 0x6f, 0x64,
|
||||
0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0x63, 0x0a,
|
||||
0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18,
|
||||
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x1c, 0x0a, 0x09, 0x72, 0x65,
|
||||
0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x72,
|
||||
0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x04, 0x62, 0x6f, 0x64, 0x79,
|
||||
0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65,
|
||||
0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x52, 0x04, 0x62, 0x6f,
|
||||
0x64, 0x79, 0x22, 0xe4, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d, 0x0a, 0x04, 0x74,
|
||||
0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x73, 0x69, 0x67, 0x6e,
|
||||
0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x2e,
|
||||
0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61,
|
||||
0x79, 0x6c, 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x70, 0x61, 0x79,
|
||||
0x6c, 0x6f, 0x61, 0x64, 0x12, 0x22, 0x0a, 0x0c, 0x77, 0x67, 0x4c, 0x69, 0x73, 0x74, 0x65, 0x6e,
|
||||
0x50, 0x6f, 0x72, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0c, 0x77, 0x67, 0x4c, 0x69,
|
||||
0x73, 0x74, 0x65, 0x6e, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x26, 0x0a, 0x0e, 0x6e, 0x65, 0x74, 0x42,
|
||||
0x69, 0x72, 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x09,
|
||||
0x52, 0x0e, 0x6e, 0x65, 0x74, 0x42, 0x69, 0x72, 0x64, 0x56, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e,
|
||||
0x12, 0x28, 0x0a, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14,
|
||||
0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e,
|
||||
0x4d, 0x6f, 0x64, 0x65, 0x52, 0x04, 0x6d, 0x6f, 0x64, 0x65, 0x12, 0x2c, 0x0a, 0x11, 0x66, 0x65,
|
||||
0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x53, 0x75, 0x70, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x64, 0x18,
|
||||
0x06, 0x20, 0x03, 0x28, 0x0d, 0x52, 0x11, 0x66, 0x65, 0x61, 0x74, 0x75, 0x72, 0x65, 0x73, 0x53,
|
||||
0x75, 0x70, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x64, 0x12, 0x49, 0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65,
|
||||
0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x18, 0x07, 0x20, 0x01, 0x28,
|
||||
0x0b, 0x32, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e,
|
||||
0x67, 0x65, 0x2e, 0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66,
|
||||
0x69, 0x67, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e,
|
||||
0x66, 0x69, 0x67, 0x12, 0x2e, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76,
|
||||
0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52,
|
||||
0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72,
|
||||
0x65, 0x73, 0x73, 0x12, 0x21, 0x0a, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64,
|
||||
0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f,
|
||||
0x6e, 0x49, 0x64, 0x88, 0x01, 0x01, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x09,
|
||||
0x0a, 0x05, 0x4f, 0x46, 0x46, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06, 0x41, 0x4e, 0x53,
|
||||
0x57, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x44, 0x49, 0x44, 0x41,
|
||||
0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10, 0x04, 0x12, 0x0b,
|
||||
0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x42, 0x0c, 0x0a, 0x0a, 0x5f,
|
||||
0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x2e, 0x0a, 0x04, 0x4d, 0x6f, 0x64,
|
||||
0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28,
|
||||
0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01, 0x01, 0x42, 0x09,
|
||||
0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f, 0x52, 0x6f, 0x73,
|
||||
0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x28, 0x0a, 0x0f,
|
||||
0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x18,
|
||||
0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73,
|
||||
0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70,
|
||||
0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x18, 0x02, 0x20,
|
||||
0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65,
|
||||
0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0x8e, 0x02, 0x0a, 0x0e, 0x53, 0x69, 0x67,
|
||||
0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c, 0x0a, 0x04, 0x53,
|
||||
0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68,
|
||||
0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65,
|
||||
0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78,
|
||||
0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64,
|
||||
0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x53, 0x0a, 0x15, 0x53, 0x65, 0x6e,
|
||||
0x64, 0x57, 0x69, 0x74, 0x68, 0x44, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, 0x79, 0x43, 0x68, 0x65,
|
||||
0x63, 0x6b, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61,
|
||||
0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73,
|
||||
0x73, 0x61, 0x67, 0x65, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72,
|
||||
0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x59,
|
||||
0x0a, 0x0d, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12,
|
||||
0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65,
|
||||
0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
|
||||
0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, 0x6f, 0x74,
|
||||
0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e,
|
||||
0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73,
|
||||
0x61, 0x67, 0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72,
|
||||
0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
@@ -528,6 +536,7 @@ var file_signalexchange_proto_goTypes = []interface{}{
|
||||
(*Body)(nil), // 3: signalexchange.Body
|
||||
(*Mode)(nil), // 4: signalexchange.Mode
|
||||
(*RosenpassConfig)(nil), // 5: signalexchange.RosenpassConfig
|
||||
(*emptypb.Empty)(nil), // 6: google.protobuf.Empty
|
||||
}
|
||||
var file_signalexchange_proto_depIdxs = []int32{
|
||||
3, // 0: signalexchange.Message.body:type_name -> signalexchange.Body
|
||||
@@ -535,11 +544,13 @@ var file_signalexchange_proto_depIdxs = []int32{
|
||||
4, // 2: signalexchange.Body.mode:type_name -> signalexchange.Mode
|
||||
5, // 3: signalexchange.Body.rosenpassConfig:type_name -> signalexchange.RosenpassConfig
|
||||
1, // 4: signalexchange.SignalExchange.Send:input_type -> signalexchange.EncryptedMessage
|
||||
1, // 5: signalexchange.SignalExchange.ConnectStream:input_type -> signalexchange.EncryptedMessage
|
||||
1, // 6: signalexchange.SignalExchange.Send:output_type -> signalexchange.EncryptedMessage
|
||||
1, // 7: signalexchange.SignalExchange.ConnectStream:output_type -> signalexchange.EncryptedMessage
|
||||
6, // [6:8] is the sub-list for method output_type
|
||||
4, // [4:6] is the sub-list for method input_type
|
||||
1, // 5: signalexchange.SignalExchange.SendWithDeliveryCheck:input_type -> signalexchange.EncryptedMessage
|
||||
1, // 6: signalexchange.SignalExchange.ConnectStream:input_type -> signalexchange.EncryptedMessage
|
||||
1, // 7: signalexchange.SignalExchange.Send:output_type -> signalexchange.EncryptedMessage
|
||||
6, // 8: signalexchange.SignalExchange.SendWithDeliveryCheck:output_type -> google.protobuf.Empty
|
||||
1, // 9: signalexchange.SignalExchange.ConnectStream:output_type -> signalexchange.EncryptedMessage
|
||||
7, // [7:10] is the sub-list for method output_type
|
||||
4, // [4:7] is the sub-list for method input_type
|
||||
4, // [4:4] is the sub-list for extension type_name
|
||||
4, // [4:4] is the sub-list for extension extendee
|
||||
0, // [0:4] is the sub-list for field type_name
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
syntax = "proto3";
|
||||
|
||||
import "google/protobuf/descriptor.proto";
|
||||
import "google/protobuf/empty.proto";
|
||||
|
||||
option go_package = "/proto";
|
||||
|
||||
@@ -9,6 +10,7 @@ package signalexchange;
|
||||
service SignalExchange {
|
||||
// Synchronously connect to the Signal Exchange service offering connection candidates and waiting for connection candidates from the other party (remote peer)
|
||||
rpc Send(EncryptedMessage) returns (EncryptedMessage) {}
|
||||
rpc SendWithDeliveryCheck(EncryptedMessage) returns (google.protobuf.Empty) {}
|
||||
// Connect to the Signal Exchange service offering connection candidates and maintain a channel for receiving candidates from the other party (remote peer)
|
||||
rpc ConnectStream(stream EncryptedMessage) returns (stream EncryptedMessage) {}
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
grpc "google.golang.org/grpc"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
status "google.golang.org/grpc/status"
|
||||
emptypb "google.golang.org/protobuf/types/known/emptypb"
|
||||
)
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
@@ -20,6 +21,7 @@ const _ = grpc.SupportPackageIsVersion7
|
||||
type SignalExchangeClient interface {
|
||||
// Synchronously connect to the Signal Exchange service offering connection candidates and waiting for connection candidates from the other party (remote peer)
|
||||
Send(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*EncryptedMessage, error)
|
||||
SendWithDeliveryCheck(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*emptypb.Empty, error)
|
||||
// Connect to the Signal Exchange service offering connection candidates and maintain a channel for receiving candidates from the other party (remote peer)
|
||||
ConnectStream(ctx context.Context, opts ...grpc.CallOption) (SignalExchange_ConnectStreamClient, error)
|
||||
}
|
||||
@@ -41,6 +43,15 @@ func (c *signalExchangeClient) Send(ctx context.Context, in *EncryptedMessage, o
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *signalExchangeClient) SendWithDeliveryCheck(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*emptypb.Empty, error) {
|
||||
out := new(emptypb.Empty)
|
||||
err := c.cc.Invoke(ctx, "/signalexchange.SignalExchange/SendWithDeliveryCheck", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *signalExchangeClient) ConnectStream(ctx context.Context, opts ...grpc.CallOption) (SignalExchange_ConnectStreamClient, error) {
|
||||
stream, err := c.cc.NewStream(ctx, &SignalExchange_ServiceDesc.Streams[0], "/signalexchange.SignalExchange/ConnectStream", opts...)
|
||||
if err != nil {
|
||||
@@ -78,6 +89,7 @@ func (x *signalExchangeConnectStreamClient) Recv() (*EncryptedMessage, error) {
|
||||
type SignalExchangeServer interface {
|
||||
// Synchronously connect to the Signal Exchange service offering connection candidates and waiting for connection candidates from the other party (remote peer)
|
||||
Send(context.Context, *EncryptedMessage) (*EncryptedMessage, error)
|
||||
SendWithDeliveryCheck(context.Context, *EncryptedMessage) (*emptypb.Empty, error)
|
||||
// Connect to the Signal Exchange service offering connection candidates and maintain a channel for receiving candidates from the other party (remote peer)
|
||||
ConnectStream(SignalExchange_ConnectStreamServer) error
|
||||
mustEmbedUnimplementedSignalExchangeServer()
|
||||
@@ -90,6 +102,9 @@ type UnimplementedSignalExchangeServer struct {
|
||||
func (UnimplementedSignalExchangeServer) Send(context.Context, *EncryptedMessage) (*EncryptedMessage, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method Send not implemented")
|
||||
}
|
||||
func (UnimplementedSignalExchangeServer) SendWithDeliveryCheck(context.Context, *EncryptedMessage) (*emptypb.Empty, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method SendWithDeliveryCheck not implemented")
|
||||
}
|
||||
func (UnimplementedSignalExchangeServer) ConnectStream(SignalExchange_ConnectStreamServer) error {
|
||||
return status.Errorf(codes.Unimplemented, "method ConnectStream not implemented")
|
||||
}
|
||||
@@ -124,6 +139,24 @@ func _SignalExchange_Send_Handler(srv interface{}, ctx context.Context, dec func
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _SignalExchange_SendWithDeliveryCheck_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(EncryptedMessage)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(SignalExchangeServer).SendWithDeliveryCheck(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/signalexchange.SignalExchange/SendWithDeliveryCheck",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(SignalExchangeServer).SendWithDeliveryCheck(ctx, req.(*EncryptedMessage))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _SignalExchange_ConnectStream_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
return srv.(SignalExchangeServer).ConnectStream(&signalExchangeConnectStreamServer{stream})
|
||||
}
|
||||
@@ -161,6 +194,10 @@ var SignalExchange_ServiceDesc = grpc.ServiceDesc{
|
||||
MethodName: "Send",
|
||||
Handler: _SignalExchange_Send_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "SendWithDeliveryCheck",
|
||||
Handler: _SignalExchange_SendWithDeliveryCheck_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{
|
||||
{
|
||||
|
||||
@@ -2,6 +2,7 @@ package cmd
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
@@ -9,6 +10,19 @@ import (
|
||||
"github.com/spf13/pflag"
|
||||
)
|
||||
|
||||
func EnvDisableSendWithDeliveryCheck() bool {
|
||||
envVar := "NB_DISABLE_SEND_WITH_DELIVERY_CHECK"
|
||||
value, present := os.LookupEnv(envVar)
|
||||
if !present {
|
||||
return false
|
||||
}
|
||||
|
||||
if parsed, err := strconv.ParseBool(value); err == nil {
|
||||
return parsed
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// setFlagsFromEnvVars reads and updates flag values from environment variables with prefix NB_
|
||||
func setFlagsFromEnvVars(cmd *cobra.Command) {
|
||||
flags := cmd.PersistentFlags()
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"net/http"
|
||||
// nolint:gosec
|
||||
_ "net/http/pprof"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
||||
@@ -92,7 +93,9 @@ var (
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
flag.Parse()
|
||||
|
||||
startPprof()
|
||||
if os.Getenv("NB_PPROF_ENABLE") == "true" {
|
||||
startPprof()
|
||||
}
|
||||
|
||||
opts, certManager, err := getTLSConfigurations()
|
||||
if err != nil {
|
||||
@@ -114,7 +117,11 @@ var (
|
||||
}
|
||||
}()
|
||||
|
||||
srv, err := server.NewServer(cmd.Context(), metricsServer.Meter)
|
||||
optsSignal := &server.Options{
|
||||
DisableSendWithDeliveryCheck: EnvDisableSendWithDeliveryCheck(),
|
||||
}
|
||||
|
||||
srv, err := server.NewServer(cmd.Context(), metricsServer.Meter, optsSignal)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating signal server: %v", err)
|
||||
}
|
||||
@@ -150,7 +157,7 @@ var (
|
||||
serveHTTP(httpListener, grpcRootHandler)
|
||||
}
|
||||
|
||||
if signalPort != legacyGRPCPort {
|
||||
if signalPort != legacyGRPCPort && os.Getenv("NB_DISABLE_FALLBACK_GRPC") != "true" {
|
||||
// The Signal gRPC server was running on port 10000 previously. Old agents that are already connected to Signal
|
||||
// are using port 10000. For compatibility purposes we keep running a 2nd gRPC server on port 10000.
|
||||
compatListener, err = serveGRPC(grpcServer, legacyGRPCPort)
|
||||
|
||||
@@ -14,6 +14,7 @@ import (
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
gproto "google.golang.org/protobuf/proto"
|
||||
"google.golang.org/protobuf/types/known/emptypb"
|
||||
|
||||
"github.com/netbirdio/signal-dispatcher/dispatcher"
|
||||
|
||||
@@ -22,6 +23,8 @@ import (
|
||||
"github.com/netbirdio/netbird/signal/peer"
|
||||
)
|
||||
|
||||
var ErrPeerNotConnected = errors.New("peer not connected")
|
||||
|
||||
const (
|
||||
labelType = "type"
|
||||
labelTypeError = "error"
|
||||
@@ -49,20 +52,32 @@ var (
|
||||
ErrPeerRegisteredAgain = errors.New("peer registered again")
|
||||
)
|
||||
|
||||
type Options struct {
|
||||
// Disable SendWithDeliveryCheck method
|
||||
DisableSendWithDeliveryCheck bool
|
||||
}
|
||||
|
||||
// Server an instance of a Signal server
|
||||
type Server struct {
|
||||
metrics *metrics.AppMetrics
|
||||
disableSendWithDeliveryCheck bool
|
||||
|
||||
registry *peer.Registry
|
||||
proto.UnimplementedSignalExchangeServer
|
||||
dispatcher *dispatcher.Dispatcher
|
||||
metrics *metrics.AppMetrics
|
||||
|
||||
successHeader metadata.MD
|
||||
|
||||
sendTimeout time.Duration
|
||||
sendTimeout time.Duration
|
||||
directSendDisabled bool
|
||||
}
|
||||
|
||||
// NewServer creates a new Signal server
|
||||
func NewServer(ctx context.Context, meter metric.Meter) (*Server, error) {
|
||||
func NewServer(ctx context.Context, meter metric.Meter, opts *Options) (*Server, error) {
|
||||
if opts == nil {
|
||||
opts = &Options{}
|
||||
}
|
||||
|
||||
appMetrics, err := metrics.NewAppMetrics(meter)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating app metrics: %v", err)
|
||||
@@ -81,11 +96,21 @@ func NewServer(ctx context.Context, meter metric.Meter) (*Server, error) {
|
||||
}
|
||||
|
||||
s := &Server{
|
||||
dispatcher: d,
|
||||
registry: peer.NewRegistry(appMetrics),
|
||||
metrics: appMetrics,
|
||||
successHeader: metadata.Pairs(proto.HeaderRegistered, "1"),
|
||||
sendTimeout: sTimeout,
|
||||
dispatcher: d,
|
||||
registry: peer.NewRegistry(appMetrics),
|
||||
metrics: appMetrics,
|
||||
successHeader: metadata.Pairs(proto.HeaderRegistered, "1"),
|
||||
sendTimeout: sTimeout,
|
||||
disableSendWithDeliveryCheck: opts.DisableSendWithDeliveryCheck,
|
||||
}
|
||||
|
||||
if directSendDisabled := os.Getenv("NB_SIGNAL_DIRECT_SEND_DISABLED"); directSendDisabled == "true" {
|
||||
s.directSendDisabled = true
|
||||
log.Warn("direct send to connected peers is disabled")
|
||||
}
|
||||
|
||||
if opts.DisableSendWithDeliveryCheck {
|
||||
log.Warn("SendWithDeliveryCheck method is disabled")
|
||||
}
|
||||
|
||||
return s, nil
|
||||
@@ -95,12 +120,51 @@ func NewServer(ctx context.Context, meter metric.Meter) (*Server, error) {
|
||||
func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.EncryptedMessage, error) {
|
||||
log.Tracef("received a new message to send from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
|
||||
|
||||
if _, found := s.registry.Get(msg.RemoteKey); found {
|
||||
s.forwardMessageToPeer(ctx, msg)
|
||||
if _, found := s.registry.Get(msg.RemoteKey); found && !s.directSendDisabled {
|
||||
_ = s.forwardMessageToPeer(ctx, msg)
|
||||
return &proto.EncryptedMessage{}, nil
|
||||
}
|
||||
|
||||
return s.dispatcher.SendMessage(ctx, msg)
|
||||
if _, err := s.dispatcher.SendMessage(ctx, msg, false); err != nil {
|
||||
log.Errorf("error sending message via dispatcher: %v", err)
|
||||
}
|
||||
return &proto.EncryptedMessage{}, nil
|
||||
}
|
||||
|
||||
// SendWithDeliveryCheck forwards a message to the signal peer with error handling
|
||||
// When the remote peer is not connected it returns codes.NotFound error, otherwise it returns other types of errors
|
||||
// that can be retried. In case codes.NotFound is returned the caller should not retry sending the message. The remote
|
||||
// peer should send a new offer to re-establish the connection when it comes back online.
|
||||
// Todo: double check the thread safe registry management. When both peer come online at the same time then both peers
|
||||
// might not be registered yet when the first message is sent.
|
||||
func (s *Server) SendWithDeliveryCheck(ctx context.Context, msg *proto.EncryptedMessage) (*emptypb.Empty, error) {
|
||||
if s.disableSendWithDeliveryCheck {
|
||||
log.Tracef("SendWithDeliveryCheck is disabled")
|
||||
return nil, status.Errorf(codes.Unimplemented, "SendWithDeliveryCheck method is disabled")
|
||||
}
|
||||
|
||||
log.Tracef("received a new message to send from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
|
||||
if _, found := s.registry.Get(msg.RemoteKey); found && !s.directSendDisabled {
|
||||
if err := s.forwardMessageToPeer(ctx, msg); err != nil {
|
||||
if errors.Is(err, ErrPeerNotConnected) {
|
||||
log.Tracef("remote peer [%s] not connected", msg.RemoteKey)
|
||||
return nil, status.Errorf(codes.NotFound, "remote peer not connected")
|
||||
}
|
||||
log.Errorf("error sending message with delivery check to peer [%s]: %v", msg.RemoteKey, err)
|
||||
return nil, status.Errorf(codes.Internal, "error forwarding message to peer: %v", err)
|
||||
}
|
||||
return &emptypb.Empty{}, nil
|
||||
}
|
||||
|
||||
if _, err := s.dispatcher.SendMessage(ctx, msg, true); err != nil {
|
||||
if errors.Is(err, dispatcher.ErrPeerNotConnected) {
|
||||
log.Tracef("remote peer [%s] doesn't have a listener", msg.RemoteKey)
|
||||
return nil, status.Errorf(codes.NotFound, "remote peer not connected")
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &emptypb.Empty{}, nil
|
||||
}
|
||||
|
||||
// ConnectStream connects to the exchange stream
|
||||
@@ -108,6 +172,7 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
p, err := s.RegisterPeer(stream, cancel)
|
||||
if err != nil {
|
||||
log.Errorf("error registering peer: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -117,6 +182,7 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
|
||||
err = stream.SendHeader(s.successHeader)
|
||||
if err != nil {
|
||||
s.metrics.RegistrationFailures.Add(stream.Context(), 1, metric.WithAttributes(attribute.String(labelError, labelErrorFailedHeader)))
|
||||
log.Errorf("error sending registration header to peer [%s] [streamID %d] : %v", p.Id, p.StreamID, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -141,7 +207,7 @@ func (s *Server) RegisterPeer(stream proto.SignalExchange_ConnectStreamServer, c
|
||||
|
||||
p := peer.NewPeer(id[0], stream, cancel)
|
||||
if err := s.registry.Register(p); err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("error adding peer to registry peer: %w", err)
|
||||
}
|
||||
err := s.dispatcher.ListenForMessages(stream.Context(), p.Id, s.forwardMessageToPeer)
|
||||
if err != nil {
|
||||
@@ -158,7 +224,7 @@ func (s *Server) DeregisterPeer(p *peer.Peer) {
|
||||
s.registry.Deregister(p)
|
||||
}
|
||||
|
||||
func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedMessage) {
|
||||
func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedMessage) error {
|
||||
log.Tracef("forwarding a new message from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
|
||||
getRegistrationStart := time.Now()
|
||||
|
||||
@@ -170,7 +236,7 @@ func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedM
|
||||
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeNotConnected)))
|
||||
log.Tracef("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", msg.Key, msg.RemoteKey)
|
||||
// todo respond to the sender?
|
||||
return
|
||||
return ErrPeerNotConnected
|
||||
}
|
||||
|
||||
s.metrics.GetRegistrationDelay.Record(ctx, float64(time.Since(getRegistrationStart).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream), attribute.String(labelRegistrationStatus, labelRegistrationFound)))
|
||||
@@ -191,7 +257,7 @@ func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedM
|
||||
if err != nil {
|
||||
log.Tracef("error while forwarding message from peer [%s] to peer [%s]: %v", msg.Key, msg.RemoteKey, err)
|
||||
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeError)))
|
||||
return
|
||||
return fmt.Errorf("error sending message to peer: %v", err)
|
||||
}
|
||||
s.metrics.MessageForwardLatency.Record(ctx, float64(time.Since(start).Nanoseconds())/1e6, metric.WithAttributes(attribute.String(labelType, labelTypeStream)))
|
||||
s.metrics.MessagesForwarded.Add(ctx, 1)
|
||||
@@ -200,10 +266,13 @@ func (s *Server) forwardMessageToPeer(ctx context.Context, msg *proto.EncryptedM
|
||||
case <-dstPeer.Stream.Context().Done():
|
||||
log.Tracef("failed to forward message from peer [%s] to peer [%s]: destination peer disconnected", msg.Key, msg.RemoteKey)
|
||||
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeDisconnected)))
|
||||
|
||||
return fmt.Errorf("destination peer disconnected")
|
||||
case <-time.After(s.sendTimeout):
|
||||
dstPeer.Cancel() // cancel the peer context to trigger deregistration
|
||||
log.Tracef("failed to forward message from peer [%s] to peer [%s]: send timeout", msg.Key, msg.RemoteKey)
|
||||
s.metrics.MessageForwardFailures.Add(ctx, 1, metric.WithAttributes(attribute.String(labelType, labelTypeTimeout)))
|
||||
return fmt.Errorf("sending message to peer timeout")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -1,13 +1,9 @@
|
||||
package version
|
||||
|
||||
import (
|
||||
"golang.org/x/sys/windows/registry"
|
||||
"runtime"
|
||||
)
|
||||
import "golang.org/x/sys/windows/registry"
|
||||
|
||||
const (
|
||||
urlWinExe = "https://pkgs.netbird.io/windows/x64"
|
||||
urlWinExeArm = "https://pkgs.netbird.io/windows/arm64"
|
||||
)
|
||||
|
||||
var regKeyAppPath = "SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\App Paths\\Netbird"
|
||||
@@ -15,14 +11,9 @@ var regKeyAppPath = "SOFTWARE\\Microsoft\\Windows\\CurrentVersion\\App Paths\\Ne
|
||||
// DownloadUrl return with the proper download link
|
||||
func DownloadUrl() string {
|
||||
_, err := registry.OpenKey(registry.LOCAL_MACHINE, regKeyAppPath, registry.QUERY_VALUE)
|
||||
if err != nil {
|
||||
if err == nil {
|
||||
return urlWinExe
|
||||
} else {
|
||||
return downloadURL
|
||||
}
|
||||
|
||||
url := urlWinExe
|
||||
if runtime.GOARCH == "arm64" {
|
||||
url = urlWinExeArm
|
||||
}
|
||||
|
||||
return url
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user