mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-19 00:36:38 +00:00
Compare commits
7 Commits
handle-exi
...
feature/re
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
23cc698c4d | ||
|
|
929388c5f4 | ||
|
|
fcb06d1a23 | ||
|
|
04619fe54f | ||
|
|
0bad81b99e | ||
|
|
7be389289c | ||
|
|
0568622d63 |
@@ -1,13 +0,0 @@
|
|||||||
package client
|
|
||||||
|
|
||||||
type RelayAddr struct {
|
|
||||||
addr string
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a RelayAddr) Network() string {
|
|
||||||
return "relay"
|
|
||||||
}
|
|
||||||
|
|
||||||
func (a RelayAddr) String() string {
|
|
||||||
return a.addr
|
|
||||||
}
|
|
||||||
@@ -136,7 +136,7 @@ type Client struct {
|
|||||||
mu sync.Mutex // protect serviceIsRunning and conns
|
mu sync.Mutex // protect serviceIsRunning and conns
|
||||||
readLoopMutex sync.Mutex
|
readLoopMutex sync.Mutex
|
||||||
wgReadLoop sync.WaitGroup
|
wgReadLoop sync.WaitGroup
|
||||||
instanceURL *RelayAddr
|
instanceURL *messages.RelayAddr
|
||||||
muInstanceURL sync.Mutex
|
muInstanceURL sync.Mutex
|
||||||
|
|
||||||
onDisconnectListener func(string)
|
onDisconnectListener func(string)
|
||||||
@@ -189,7 +189,11 @@ func (c *Client) Connect(ctx context.Context) error {
|
|||||||
c.instanceURL = instanceURL
|
c.instanceURL = instanceURL
|
||||||
c.muInstanceURL.Unlock()
|
c.muInstanceURL.Unlock()
|
||||||
|
|
||||||
c.stateSubscription = NewPeersStateSubscription(c.log, c.relayConn, c.closeConnsByPeerID)
|
if c.instanceURL.FeatureVersionCode < messages.VersionSubscription {
|
||||||
|
c.log.Warnf("server is deprecated, peer state subscription feature will not work")
|
||||||
|
} else {
|
||||||
|
c.stateSubscription = NewPeersStateSubscription(c.log, c.relayConn, c.closeConnsByPeerID)
|
||||||
|
}
|
||||||
|
|
||||||
c.log = c.log.WithField("relay", instanceURL.String())
|
c.log = c.log.WithField("relay", instanceURL.String())
|
||||||
c.log.Infof("relay connection established")
|
c.log.Infof("relay connection established")
|
||||||
@@ -291,7 +295,7 @@ func (c *Client) Close() error {
|
|||||||
return c.close(true)
|
return c.close(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) connect(ctx context.Context) (*RelayAddr, error) {
|
func (c *Client) connect(ctx context.Context) (*messages.RelayAddr, error) {
|
||||||
rd := dialer.NewRaceDial(c.log, c.connectionURL, quic.Dialer{}, ws.Dialer{})
|
rd := dialer.NewRaceDial(c.log, c.connectionURL, quic.Dialer{}, ws.Dialer{})
|
||||||
conn, err := rd.Dial()
|
conn, err := rd.Dial()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -311,7 +315,7 @@ func (c *Client) connect(ctx context.Context) (*RelayAddr, error) {
|
|||||||
return instanceURL, nil
|
return instanceURL, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) handShake(ctx context.Context) (*RelayAddr, error) {
|
func (c *Client) handShake(ctx context.Context) (*messages.RelayAddr, error) {
|
||||||
msg, err := messages.MarshalAuthMsg(c.hashedID, c.authTokenStore.TokenBinary())
|
msg, err := messages.MarshalAuthMsg(c.hashedID, c.authTokenStore.TokenBinary())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Errorf("failed to marshal auth message: %s", err)
|
c.log.Errorf("failed to marshal auth message: %s", err)
|
||||||
@@ -346,12 +350,16 @@ func (c *Client) handShake(ctx context.Context) (*RelayAddr, error) {
|
|||||||
return nil, fmt.Errorf("unexpected message type")
|
return nil, fmt.Errorf("unexpected message type")
|
||||||
}
|
}
|
||||||
|
|
||||||
addr, err := messages.UnmarshalAuthResponse(buf[:n])
|
payload, err := messages.UnmarshalAuthResponse(buf[:n])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &RelayAddr{addr: addr}, nil
|
relayAddr, err := messages.UnmarshalRelayAddr(payload)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return relayAddr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) readLoop(hc *healthcheck.Receiver, relayConn net.Conn, internallyStoppedFlag *internalStopFlag) {
|
func (c *Client) readLoop(hc *healthcheck.Receiver, relayConn net.Conn, internallyStoppedFlag *internalStopFlag) {
|
||||||
@@ -647,6 +655,11 @@ func (c *Client) readWithTimeout(ctx context.Context, buf []byte) (int, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) handlePeersOnlineMsg(buf []byte) {
|
func (c *Client) handlePeersOnlineMsg(buf []byte) {
|
||||||
|
if c.stateSubscription == nil {
|
||||||
|
c.log.Warnf("message type %d is not supported by the server, peer state subscription feature is not available)", messages.MsgTypePeersOnline)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
peersID, err := messages.UnmarshalPeersOnlineMsg(buf)
|
peersID, err := messages.UnmarshalPeersOnlineMsg(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Errorf("failed to unmarshal peers online msg: %s", err)
|
c.log.Errorf("failed to unmarshal peers online msg: %s", err)
|
||||||
@@ -656,6 +669,11 @@ func (c *Client) handlePeersOnlineMsg(buf []byte) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) handlePeersWentOfflineMsg(buf []byte) {
|
func (c *Client) handlePeersWentOfflineMsg(buf []byte) {
|
||||||
|
if c.stateSubscription == nil {
|
||||||
|
c.log.Warnf("message type %d is not supported by the server, peer state subscription feature is not available)", messages.MsgTypePeersWentOffline)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
peersID, err := messages.UnMarshalPeersWentOffline(buf)
|
peersID, err := messages.UnMarshalPeersWentOffline(buf)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Errorf("failed to unmarshal peers went offline msg: %s", err)
|
c.log.Errorf("failed to unmarshal peers went offline msg: %s", err)
|
||||||
|
|||||||
@@ -314,8 +314,8 @@ func TestBindToUnavailabePeer(t *testing.T) {
|
|||||||
t.Errorf("failed to connect to server: %s", err)
|
t.Errorf("failed to connect to server: %s", err)
|
||||||
}
|
}
|
||||||
_, err = clientAlice.OpenConn(ctx, "bob")
|
_, err = clientAlice.OpenConn(ctx, "bob")
|
||||||
if err == nil {
|
if err != nil {
|
||||||
t.Errorf("expected error when binding to unavailable peer, got nil")
|
t.Errorf("failed to open bob: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("closing client")
|
log.Infof("closing client")
|
||||||
@@ -390,18 +390,13 @@ func TestBindReconnect(t *testing.T) {
|
|||||||
|
|
||||||
chAlice, err := clientAlice.OpenConn(ctx, "bob")
|
chAlice, err := clientAlice.OpenConn(ctx, "bob")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("failed to bind channel: %s", err)
|
t.Fatalf("failed to bind channel: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
testString := "hello alice, I am bob"
|
testString := "hello alice, I am bob"
|
||||||
_, err = chBob.Write([]byte(testString))
|
_, err = chBob.Write([]byte(testString))
|
||||||
if err == nil {
|
|
||||||
t.Errorf("expected error when writing to channel, got nil")
|
|
||||||
}
|
|
||||||
|
|
||||||
chBob, err = clientBob.OpenConn(ctx, "alice")
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("failed to bind channel: %s", err)
|
t.Errorf("expected error when writing to channel, got nil")
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err = chBob.Write([]byte(testString))
|
_, err = chBob.Write([]byte(testString))
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ type Conn struct {
|
|||||||
client *Client
|
client *Client
|
||||||
dstID messages.PeerID
|
dstID messages.PeerID
|
||||||
messageChan chan Msg
|
messageChan chan Msg
|
||||||
instanceURL *RelayAddr
|
instanceURL *messages.RelayAddr
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewConn creates a new connection to a relayed remote peer.
|
// NewConn creates a new connection to a relayed remote peer.
|
||||||
@@ -20,7 +20,7 @@ type Conn struct {
|
|||||||
// dstID: the destination peer ID
|
// dstID: the destination peer ID
|
||||||
// messageChan: the channel where the messages will be received
|
// messageChan: the channel where the messages will be received
|
||||||
// instanceURL: the relay instance URL, it used to get the proper server instance address for the remote peer
|
// instanceURL: the relay instance URL, it used to get the proper server instance address for the remote peer
|
||||||
func NewConn(client *Client, dstID messages.PeerID, messageChan chan Msg, instanceURL *RelayAddr) *Conn {
|
func NewConn(client *Client, dstID messages.PeerID, messageChan chan Msg, instanceURL *messages.RelayAddr) *Conn {
|
||||||
c := &Conn{
|
c := &Conn{
|
||||||
client: client,
|
client: client,
|
||||||
dstID: dstID,
|
dstID: dstID,
|
||||||
|
|||||||
@@ -285,15 +285,15 @@ func TestForeginAutoClose(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
t.Log("open connection to another peer")
|
t.Log("open connection to another peer")
|
||||||
if _, err = mgr.OpenConn(ctx, toURL(srvCfg2)[0], "anotherpeer"); err == nil {
|
if _, err = mgr.OpenConn(ctx, toURL(srvCfg2)[0], "anotherpeer"); err != nil {
|
||||||
t.Fatalf("should have failed to open connection to another peer")
|
t.Fatalf("failed to open connection: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
timeout := relayCleanupInterval + keepUnusedServerTime + 1*time.Second
|
timeout := relayCleanupInterval + keepUnusedServerTime + 1*time.Second
|
||||||
t.Logf("waiting for relay cleanup: %s", timeout)
|
t.Logf("waiting for relay cleanup: %s", timeout)
|
||||||
time.Sleep(timeout)
|
time.Sleep(timeout)
|
||||||
if len(mgr.relayClients) != 0 {
|
if len(mgr.relayClients) != 1 {
|
||||||
t.Errorf("expected 0, got %d", len(mgr.relayClients))
|
t.Errorf("expected 1 relay client, got %d", len(mgr.relayClients))
|
||||||
}
|
}
|
||||||
|
|
||||||
t.Logf("closing manager")
|
t.Logf("closing manager")
|
||||||
|
|||||||
@@ -46,6 +46,10 @@ func NewPeersStateSubscription(log *log.Entry, relayConn relayedConnWriter, offl
|
|||||||
// OnPeersOnline should be called when a notification is received that certain peers have come online.
|
// OnPeersOnline should be called when a notification is received that certain peers have come online.
|
||||||
// It checks if any of the peers are being waited on and signals their availability.
|
// It checks if any of the peers are being waited on and signals their availability.
|
||||||
func (s *PeersStateSubscription) OnPeersOnline(peersID []messages.PeerID) {
|
func (s *PeersStateSubscription) OnPeersOnline(peersID []messages.PeerID) {
|
||||||
|
if s == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
@@ -63,6 +67,10 @@ func (s *PeersStateSubscription) OnPeersOnline(peersID []messages.PeerID) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *PeersStateSubscription) OnPeersWentOffline(peersID []messages.PeerID) {
|
func (s *PeersStateSubscription) OnPeersWentOffline(peersID []messages.PeerID) {
|
||||||
|
if s == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
relevantPeers := make([]messages.PeerID, 0, len(peersID))
|
relevantPeers := make([]messages.PeerID, 0, len(peersID))
|
||||||
for _, peerID := range peersID {
|
for _, peerID := range peersID {
|
||||||
@@ -79,6 +87,9 @@ func (s *PeersStateSubscription) OnPeersWentOffline(peersID []messages.PeerID) {
|
|||||||
|
|
||||||
// WaitToBeOnlineAndSubscribe waits for a specific peer to come online and subscribes to its state changes.
|
// WaitToBeOnlineAndSubscribe waits for a specific peer to come online and subscribes to its state changes.
|
||||||
func (s *PeersStateSubscription) WaitToBeOnlineAndSubscribe(ctx context.Context, peerID messages.PeerID) error {
|
func (s *PeersStateSubscription) WaitToBeOnlineAndSubscribe(ctx context.Context, peerID messages.PeerID) error {
|
||||||
|
if s == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
// Check if already waiting for this peer
|
// Check if already waiting for this peer
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
if _, exists := s.waitingPeers[peerID]; exists {
|
if _, exists := s.waitingPeers[peerID]; exists {
|
||||||
@@ -132,6 +143,10 @@ func (s *PeersStateSubscription) WaitToBeOnlineAndSubscribe(ctx context.Context,
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *PeersStateSubscription) UnsubscribeStateChange(peerIDs []messages.PeerID) error {
|
func (s *PeersStateSubscription) UnsubscribeStateChange(peerIDs []messages.PeerID) error {
|
||||||
|
if s == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
msgErr := s.unsubscribeStateChange(peerIDs)
|
msgErr := s.unsubscribeStateChange(peerIDs)
|
||||||
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
@@ -149,6 +164,10 @@ func (s *PeersStateSubscription) UnsubscribeStateChange(peerIDs []messages.PeerI
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *PeersStateSubscription) Cleanup() {
|
func (s *PeersStateSubscription) Cleanup() {
|
||||||
|
if s == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
|||||||
56
relay/messages/addr.go
Normal file
56
relay/messages/addr.go
Normal file
@@ -0,0 +1,56 @@
|
|||||||
|
package messages
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type FeatureVersionCode uint16
|
||||||
|
|
||||||
|
const (
|
||||||
|
VersionUnknown FeatureVersionCode = 0
|
||||||
|
VersionSubscription FeatureVersionCode = 1
|
||||||
|
)
|
||||||
|
|
||||||
|
type RelayAddr struct {
|
||||||
|
Addr string `json:"ExposedAddr,omitempty"`
|
||||||
|
FeatureVersionCode FeatureVersionCode `json:"Version,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a RelayAddr) Network() string {
|
||||||
|
return "relay"
|
||||||
|
}
|
||||||
|
|
||||||
|
func (a RelayAddr) String() string {
|
||||||
|
return a.Addr
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalRelayAddr json encoded RelayAddr data.
|
||||||
|
func UnmarshalRelayAddr(data []byte) (*RelayAddr, error) {
|
||||||
|
if len(data) == 0 {
|
||||||
|
return nil, fmt.Errorf("unmarshalRelayAddr: empty data")
|
||||||
|
}
|
||||||
|
|
||||||
|
var addr RelayAddr
|
||||||
|
if err := json.Unmarshal(data, &addr); err != nil {
|
||||||
|
addrString, err := fallbackToOldFormat(data)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to fallback to old auth message: %v", err)
|
||||||
|
}
|
||||||
|
return &RelayAddr{Addr: addrString}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if addr.Addr == "" {
|
||||||
|
return nil, fmt.Errorf("unmarshalRelayAddr: empty address in RelayAddr")
|
||||||
|
}
|
||||||
|
return &addr, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func fallbackToOldFormat(data []byte) (string, error) {
|
||||||
|
addr := string(data)
|
||||||
|
if !strings.HasPrefix(addr, "rel://") && !strings.HasPrefix(addr, "rels://") {
|
||||||
|
return "", fmt.Errorf("invalid address: must start with rel:// or rels://: %s", addr)
|
||||||
|
}
|
||||||
|
return addr, nil
|
||||||
|
}
|
||||||
@@ -11,7 +11,7 @@ const (
|
|||||||
MaxHandshakeRespSize = 8192
|
MaxHandshakeRespSize = 8192
|
||||||
MaxMessageSize = 8820
|
MaxMessageSize = 8820
|
||||||
|
|
||||||
CurrentProtocolVersion = 1
|
CurrentProtocolVersion = 2
|
||||||
|
|
||||||
MsgTypeUnknown MsgType = 0
|
MsgTypeUnknown MsgType = 0
|
||||||
// Deprecated: Use MsgTypeAuth instead.
|
// Deprecated: Use MsgTypeAuth instead.
|
||||||
@@ -264,11 +264,11 @@ func MarshalAuthResponse(address string) ([]byte, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// UnmarshalAuthResponse it is a confirmation message to auth success
|
// UnmarshalAuthResponse it is a confirmation message to auth success
|
||||||
func UnmarshalAuthResponse(msg []byte) (string, error) {
|
func UnmarshalAuthResponse(msg []byte) ([]byte, error) {
|
||||||
if len(msg) < sizeOfProtoHeader+1 {
|
if len(msg) < sizeOfProtoHeader+1 {
|
||||||
return "", ErrInvalidMessageLength
|
return nil, ErrInvalidMessageLength
|
||||||
}
|
}
|
||||||
return string(msg[sizeOfProtoHeader:]), nil
|
return msg[sizeOfProtoHeader:], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// MarshalCloseMsg creates a close message.
|
// MarshalCloseMsg creates a close message.
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ func TestMarshalAuthResponse(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("error: %v", err)
|
t.Fatalf("error: %v", err)
|
||||||
}
|
}
|
||||||
if respAddr != address {
|
if string(respAddr) != address {
|
||||||
t.Errorf("expected %s, got %s", address, respAddr)
|
t.Errorf("expected %s, got %s", address, respAddr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user