Merge remote-tracking branch 'origin/main' into proto-ipv6-overlay

# Conflicts:
#	client/internal/peer/status.go
This commit is contained in:
Viktor Liu
2026-05-04 12:08:38 +02:00
25 changed files with 986 additions and 239 deletions

View File

@@ -2568,6 +2568,8 @@ func convertToOfferAnswer(msg *sProto.Message) (*peer.OfferAnswer, error) {
}
}
relayIP := decodeRelayIP(msg.GetBody().GetRelayServerIP())
offerAnswer := peer.OfferAnswer{
IceCredentials: peer.IceCredentials{
UFrag: remoteCred.UFrag,
@@ -2578,7 +2580,23 @@ func convertToOfferAnswer(msg *sProto.Message) (*peer.OfferAnswer, error) {
RosenpassPubKey: rosenpassPubKey,
RosenpassAddr: rosenpassAddr,
RelaySrvAddress: msg.GetBody().GetRelayServerAddress(),
RelaySrvIP: relayIP,
SessionID: sessionID,
}
return &offerAnswer, nil
}
// decodeRelayIP decodes the proto relayServerIP bytes (4 or 16) into a
// netip.Addr. Returns the zero value for empty input and logs a warning
// for malformed payloads.
func decodeRelayIP(b []byte) netip.Addr {
if len(b) == 0 {
return netip.Addr{}
}
ip, ok := netip.AddrFromSlice(b)
if !ok {
log.Warnf("invalid relayServerIP in signal message (%d bytes), ignoring", len(b))
return netip.Addr{}
}
return ip.Unmap()
}

View File

@@ -3,6 +3,7 @@ package peer
import (
"context"
"errors"
"net/netip"
"sync"
"sync/atomic"
@@ -40,6 +41,10 @@ type OfferAnswer struct {
// relay server address
RelaySrvAddress string
// RelaySrvIP is the IP the remote peer is connected to on its
// relay server. Used as a dial target if DNS for RelaySrvAddress
// fails. Zero value if the peer did not advertise an IP.
RelaySrvIP netip.Addr
// SessionID is the unique identifier of the session, used to discard old messages
SessionID *ICESessionID
}
@@ -217,8 +222,9 @@ func (h *Handshaker) buildOfferAnswer() OfferAnswer {
answer.SessionID = &sid
}
if addr, err := h.relay.RelayInstanceAddress(); err == nil {
if addr, ip, err := h.relay.RelayInstanceAddress(); err == nil {
answer.RelaySrvAddress = addr
answer.RelaySrvIP = ip
}
return answer

View File

@@ -8,6 +8,7 @@ import (
type mocListener struct {
lastState int
wg sync.WaitGroup
peersWg sync.WaitGroup
peers int
}
@@ -33,6 +34,7 @@ func (l *mocListener) OnAddressChanged(host, addr string) {
}
func (l *mocListener) OnPeersListChanged(size int) {
l.peers = size
l.peersWg.Done()
}
func (l *mocListener) setWaiter() {
@@ -43,6 +45,14 @@ func (l *mocListener) wait() {
l.wg.Wait()
}
func (l *mocListener) setPeersWaiter() {
l.peersWg.Add(1)
}
func (l *mocListener) waitPeers() {
l.peersWg.Wait()
}
func Test_notifier_serverState(t *testing.T) {
type scenario struct {
@@ -72,11 +82,13 @@ func Test_notifier_serverState(t *testing.T) {
func Test_notifier_SetListener(t *testing.T) {
listener := &mocListener{}
listener.setWaiter()
listener.setPeersWaiter()
n := newNotifier()
n.lastNotification = stateConnecting
n.setListener(listener)
listener.wait()
listener.waitPeers()
if listener.lastState != n.lastNotification {
t.Errorf("invalid state: %d, expected: %d", listener.lastState, n.lastNotification)
}
@@ -85,9 +97,14 @@ func Test_notifier_SetListener(t *testing.T) {
func Test_notifier_RemoveListener(t *testing.T) {
listener := &mocListener{}
listener.setWaiter()
listener.setPeersWaiter()
n := newNotifier()
n.lastNotification = stateConnecting
n.setListener(listener)
// setListener replays cached state on a goroutine; wait for both the state
// and peers callbacks to finish so we don't race on listener.peers.
listener.wait()
listener.waitPeers()
n.removeListener()
n.peerListChanged(1)

View File

@@ -54,19 +54,19 @@ func (s *Signaler) signalOfferAnswer(offerAnswer OfferAnswer, remoteKey string,
log.Warnf("failed to get session ID bytes: %v", err)
}
}
msg, err := signal.MarshalCredential(
s.wgPrivateKey,
offerAnswer.WgListenPort,
remoteKey,
&signal.Credential{
msg, err := signal.MarshalCredential(s.wgPrivateKey, remoteKey, signal.CredentialPayload{
Type: bodyType,
WgListenPort: offerAnswer.WgListenPort,
Credential: &signal.Credential{
UFrag: offerAnswer.IceCredentials.UFrag,
Pwd: offerAnswer.IceCredentials.Pwd,
},
bodyType,
offerAnswer.RosenpassPubKey,
offerAnswer.RosenpassAddr,
offerAnswer.RelaySrvAddress,
sessionIDBytes)
RosenpassPubKey: offerAnswer.RosenpassPubKey,
RosenpassAddr: offerAnswer.RosenpassAddr,
RelaySrvAddress: offerAnswer.RelaySrvAddress,
RelaySrvIP: offerAnswer.RelaySrvIP,
SessionID: sessionIDBytes,
})
if err != nil {
return err
}

View File

@@ -323,10 +323,10 @@ func (d *Status) RemovePeer(peerPubKey string) error {
// UpdatePeerState updates peer status
func (d *Status) UpdatePeerState(receivedState State) error {
d.mux.Lock()
defer d.mux.Unlock()
peerState, ok := d.peers[receivedState.PubKey]
if !ok {
d.mux.Unlock()
return errors.New("peer doesn't exist")
}
@@ -346,23 +346,29 @@ func (d *Status) UpdatePeerState(receivedState State) error {
d.peers[receivedState.PubKey] = peerState
if hasConnStatusChanged(oldState, receivedState.ConnStatus) {
d.notifyPeerListChanged()
}
notifyList := hasConnStatusChanged(oldState, receivedState.ConnStatus)
// when we close the connection we will not notify the router manager
if receivedState.ConnStatus == StatusIdle {
d.notifyPeerStateChangeListeners(receivedState.PubKey)
notifyRouter := receivedState.ConnStatus == StatusIdle
routerSnapshot := d.snapshotRouterPeersLocked(receivedState.PubKey, notifyRouter)
numPeers := d.numOfPeers()
d.mux.Unlock()
if notifyList {
d.notifier.peerListChanged(numPeers)
}
if notifyRouter {
d.dispatchRouterPeers(receivedState.PubKey, routerSnapshot)
}
return nil
}
func (d *Status) AddPeerStateRoute(peer string, route string, resourceId route.ResID) error {
d.mux.Lock()
defer d.mux.Unlock()
peerState, ok := d.peers[peer]
if !ok {
d.mux.Unlock()
return errors.New("peer doesn't exist")
}
@@ -374,17 +380,20 @@ func (d *Status) AddPeerStateRoute(peer string, route string, resourceId route.R
d.routeIDLookup.AddRemoteRouteID(resourceId, pref)
}
numPeers := d.numOfPeers()
d.mux.Unlock()
// todo: consider to make sense of this notification or not
d.notifyPeerListChanged()
d.notifier.peerListChanged(numPeers)
return nil
}
func (d *Status) RemovePeerStateRoute(peer string, route string) error {
d.mux.Lock()
defer d.mux.Unlock()
peerState, ok := d.peers[peer]
if !ok {
d.mux.Unlock()
return errors.New("peer doesn't exist")
}
@@ -396,8 +405,11 @@ func (d *Status) RemovePeerStateRoute(peer string, route string) error {
d.routeIDLookup.RemoveRemoteRouteID(pref)
}
numPeers := d.numOfPeers()
d.mux.Unlock()
// todo: consider to make sense of this notification or not
d.notifyPeerListChanged()
d.notifier.peerListChanged(numPeers)
return nil
}
@@ -413,10 +425,10 @@ func (d *Status) CheckRoutes(ip netip.Addr) ([]byte, bool) {
func (d *Status) UpdatePeerICEState(receivedState State) error {
d.mux.Lock()
defer d.mux.Unlock()
peerState, ok := d.peers[receivedState.PubKey]
if !ok {
d.mux.Unlock()
return errors.New("peer doesn't exist")
}
@@ -434,22 +446,28 @@ func (d *Status) UpdatePeerICEState(receivedState State) error {
d.peers[receivedState.PubKey] = peerState
if hasConnStatusChanged(oldState, receivedState.ConnStatus) {
d.notifyPeerListChanged()
}
notifyList := hasConnStatusChanged(oldState, receivedState.ConnStatus)
notifyRouter := hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed)
routerSnapshot := d.snapshotRouterPeersLocked(receivedState.PubKey, notifyRouter)
numPeers := d.numOfPeers()
if hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) {
d.notifyPeerStateChangeListeners(receivedState.PubKey)
d.mux.Unlock()
if notifyList {
d.notifier.peerListChanged(numPeers)
}
if notifyRouter {
d.dispatchRouterPeers(receivedState.PubKey, routerSnapshot)
}
return nil
}
func (d *Status) UpdatePeerRelayedState(receivedState State) error {
d.mux.Lock()
defer d.mux.Unlock()
peerState, ok := d.peers[receivedState.PubKey]
if !ok {
d.mux.Unlock()
return errors.New("peer doesn't exist")
}
@@ -464,22 +482,28 @@ func (d *Status) UpdatePeerRelayedState(receivedState State) error {
d.peers[receivedState.PubKey] = peerState
if hasConnStatusChanged(oldState, receivedState.ConnStatus) {
d.notifyPeerListChanged()
}
notifyList := hasConnStatusChanged(oldState, receivedState.ConnStatus)
notifyRouter := hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed)
routerSnapshot := d.snapshotRouterPeersLocked(receivedState.PubKey, notifyRouter)
numPeers := d.numOfPeers()
if hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) {
d.notifyPeerStateChangeListeners(receivedState.PubKey)
d.mux.Unlock()
if notifyList {
d.notifier.peerListChanged(numPeers)
}
if notifyRouter {
d.dispatchRouterPeers(receivedState.PubKey, routerSnapshot)
}
return nil
}
func (d *Status) UpdatePeerRelayedStateToDisconnected(receivedState State) error {
d.mux.Lock()
defer d.mux.Unlock()
peerState, ok := d.peers[receivedState.PubKey]
if !ok {
d.mux.Unlock()
return errors.New("peer doesn't exist")
}
@@ -493,22 +517,28 @@ func (d *Status) UpdatePeerRelayedStateToDisconnected(receivedState State) error
d.peers[receivedState.PubKey] = peerState
if hasConnStatusChanged(oldState, receivedState.ConnStatus) {
d.notifyPeerListChanged()
}
notifyList := hasConnStatusChanged(oldState, receivedState.ConnStatus)
notifyRouter := hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed)
routerSnapshot := d.snapshotRouterPeersLocked(receivedState.PubKey, notifyRouter)
numPeers := d.numOfPeers()
if hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) {
d.notifyPeerStateChangeListeners(receivedState.PubKey)
d.mux.Unlock()
if notifyList {
d.notifier.peerListChanged(numPeers)
}
if notifyRouter {
d.dispatchRouterPeers(receivedState.PubKey, routerSnapshot)
}
return nil
}
func (d *Status) UpdatePeerICEStateToDisconnected(receivedState State) error {
d.mux.Lock()
defer d.mux.Unlock()
peerState, ok := d.peers[receivedState.PubKey]
if !ok {
d.mux.Unlock()
return errors.New("peer doesn't exist")
}
@@ -525,12 +555,18 @@ func (d *Status) UpdatePeerICEStateToDisconnected(receivedState State) error {
d.peers[receivedState.PubKey] = peerState
if hasConnStatusChanged(oldState, receivedState.ConnStatus) {
d.notifyPeerListChanged()
}
notifyList := hasConnStatusChanged(oldState, receivedState.ConnStatus)
notifyRouter := hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed)
routerSnapshot := d.snapshotRouterPeersLocked(receivedState.PubKey, notifyRouter)
numPeers := d.numOfPeers()
if hasStatusOrRelayedChange(oldState, receivedState.ConnStatus, oldIsRelayed, receivedState.Relayed) {
d.notifyPeerStateChangeListeners(receivedState.PubKey)
d.mux.Unlock()
if notifyList {
d.notifier.peerListChanged(numPeers)
}
if notifyRouter {
d.dispatchRouterPeers(receivedState.PubKey, routerSnapshot)
}
return nil
}
@@ -597,17 +633,33 @@ func (d *Status) UpdatePeerSSHHostKey(peerPubKey string, sshHostKey []byte) erro
// FinishPeerListModifications this event invoke the notification
func (d *Status) FinishPeerListModifications() {
d.mux.Lock()
defer d.mux.Unlock()
if !d.peerListChangedForNotification {
d.mux.Unlock()
return
}
d.peerListChangedForNotification = false
d.notifyPeerListChanged()
numPeers := d.numOfPeers()
// snapshot per-peer router state to deliver after the lock is released
type routerDispatch struct {
peerID string
snapshot map[string]RouterState
}
dispatches := make([]routerDispatch, 0, len(d.peers))
for key := range d.peers {
d.notifyPeerStateChangeListeners(key)
snapshot := d.snapshotRouterPeersLocked(key, true)
if snapshot != nil {
dispatches = append(dispatches, routerDispatch{peerID: key, snapshot: snapshot})
}
}
d.mux.Unlock()
d.notifier.peerListChanged(numPeers)
for _, rd := range dispatches {
d.dispatchRouterPeers(rd.peerID, rd.snapshot)
}
}
@@ -658,10 +710,15 @@ func (d *Status) GetLocalPeerState() LocalPeerState {
// UpdateLocalPeerState updates local peer status
func (d *Status) UpdateLocalPeerState(localPeerState LocalPeerState) {
d.mux.Lock()
defer d.mux.Unlock()
d.localPeer = localPeerState
d.notifyAddressChanged()
fqdn := d.localPeer.FQDN
ip := d.localPeer.IP
if d.localPeer.IPv6 != "" {
ip = ip + "\n" + d.localPeer.IPv6
}
d.mux.Unlock()
d.notifier.localAddressChanged(fqdn, ip)
}
// AddLocalPeerStateRoute adds a route to the local peer state
@@ -724,30 +781,36 @@ func (d *Status) CleanLocalPeerStateRoutes() {
// CleanLocalPeerState cleans local peer status
func (d *Status) CleanLocalPeerState() {
d.mux.Lock()
defer d.mux.Unlock()
d.localPeer = LocalPeerState{}
d.notifyAddressChanged()
fqdn := d.localPeer.FQDN
ip := d.localPeer.IP
d.mux.Unlock()
d.notifier.localAddressChanged(fqdn, ip)
}
// MarkManagementDisconnected sets ManagementState to disconnected
func (d *Status) MarkManagementDisconnected(err error) {
d.mux.Lock()
defer d.mux.Unlock()
defer d.onConnectionChanged()
d.managementState = false
d.managementError = err
mgm := d.managementState
sig := d.signalState
d.mux.Unlock()
d.notifier.updateServerStates(mgm, sig)
}
// MarkManagementConnected sets ManagementState to connected
func (d *Status) MarkManagementConnected() {
d.mux.Lock()
defer d.mux.Unlock()
defer d.onConnectionChanged()
d.managementState = true
d.managementError = nil
mgm := d.managementState
sig := d.signalState
d.mux.Unlock()
d.notifier.updateServerStates(mgm, sig)
}
// UpdateSignalAddress update the address of the signal server
@@ -781,21 +844,25 @@ func (d *Status) UpdateLazyConnection(enabled bool) {
// MarkSignalDisconnected sets SignalState to disconnected
func (d *Status) MarkSignalDisconnected(err error) {
d.mux.Lock()
defer d.mux.Unlock()
defer d.onConnectionChanged()
d.signalState = false
d.signalError = err
mgm := d.managementState
sig := d.signalState
d.mux.Unlock()
d.notifier.updateServerStates(mgm, sig)
}
// MarkSignalConnected sets SignalState to connected
func (d *Status) MarkSignalConnected() {
d.mux.Lock()
defer d.mux.Unlock()
defer d.onConnectionChanged()
d.signalState = true
d.signalError = nil
mgm := d.managementState
sig := d.signalState
d.mux.Unlock()
d.notifier.updateServerStates(mgm, sig)
}
func (d *Status) UpdateRelayStates(relayResults []relay.ProbeResult) {
@@ -922,7 +989,7 @@ func (d *Status) GetRelayStates() []relay.ProbeResult {
// if the server connection is not established then we will use the general address
// in case of connection we will use the instance specific address
instanceAddr, err := d.relayMgr.RelayInstanceAddress()
instanceAddr, _, err := d.relayMgr.RelayInstanceAddress()
if err != nil {
// TODO add their status
for _, r := range d.relayMgr.ServerURLs() {
@@ -1015,18 +1082,17 @@ func (d *Status) RemoveConnectionListener() {
d.notifier.removeListener()
}
func (d *Status) onConnectionChanged() {
d.notifier.updateServerStates(d.managementState, d.signalState)
}
// notifyPeerStateChangeListeners notifies route manager about the change in peer state
func (d *Status) notifyPeerStateChangeListeners(peerID string) {
subs, ok := d.changeNotify[peerID]
if !ok {
return
// snapshotRouterPeersLocked builds the RouterState map for a peer's subscribers.
// Caller MUST hold d.mux. Returns nil when there are no subscribers for peerID
// or when notify is false. The snapshot is consumed later by dispatchRouterPeers
// outside the lock so the channel send cannot stall any d.mux holder.
func (d *Status) snapshotRouterPeersLocked(peerID string, notify bool) map[string]RouterState {
if !notify {
return nil
}
if _, ok := d.changeNotify[peerID]; !ok {
return nil
}
// collect the relevant data for router peers
routerPeers := make(map[string]RouterState, len(d.changeNotify))
for pid := range d.changeNotify {
s, ok := d.peers[pid]
@@ -1034,13 +1100,35 @@ func (d *Status) notifyPeerStateChangeListeners(peerID string) {
log.Warnf("router peer not found in peers list: %s", pid)
continue
}
routerPeers[pid] = RouterState{
Status: s.ConnStatus,
Relayed: s.Relayed,
Latency: s.Latency,
}
}
return routerPeers
}
// dispatchRouterPeers delivers a previously snapshotted router-state map to
// the peer's subscribers. Caller MUST NOT hold d.mux. The method takes a
// fresh, short read of d.changeNotify under the lock to grab subscriber
// channels, then sends outside the lock so a slow consumer cannot block other
// d.mux holders. The send itself stays blocking (only short-circuited by the
// subscriber's context) so peer state transitions are not silently dropped.
func (d *Status) dispatchRouterPeers(peerID string, routerPeers map[string]RouterState) {
if routerPeers == nil {
return
}
d.mux.Lock()
subsMap, ok := d.changeNotify[peerID]
subs := make([]*StatusChangeSubscription, 0, len(subsMap))
if ok {
for _, sub := range subsMap {
subs = append(subs, sub)
}
}
d.mux.Unlock()
for _, sub := range subs {
select {
@@ -1050,18 +1138,6 @@ func (d *Status) notifyPeerStateChangeListeners(peerID string) {
}
}
func (d *Status) notifyPeerListChanged() {
d.notifier.peerListChanged(d.numOfPeers())
}
func (d *Status) notifyAddressChanged() {
addr := d.localPeer.IP
if d.localPeer.IPv6 != "" {
addr = addr + "\n" + d.localPeer.IPv6
}
d.notifier.localAddressChanged(d.localPeer.FQDN, addr)
}
func (d *Status) numOfPeers() int {
return len(d.peers) + len(d.offlinePeers)
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"errors"
"net"
"net/netip"
"sync"
"sync/atomic"
@@ -53,15 +54,19 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
w.relaySupportedOnRemotePeer.Store(true)
// the relayManager will return with error in case if the connection has lost with relay server
currentRelayAddress, err := w.relayManager.RelayInstanceAddress()
currentRelayAddress, _, err := w.relayManager.RelayInstanceAddress()
if err != nil {
w.log.Errorf("failed to handle new offer: %s", err)
return
}
srv := w.preferredRelayServer(currentRelayAddress, remoteOfferAnswer.RelaySrvAddress)
var serverIP netip.Addr
if srv == remoteOfferAnswer.RelaySrvAddress {
serverIP = remoteOfferAnswer.RelaySrvIP
}
relayedConn, err := w.relayManager.OpenConn(w.peerCtx, srv, w.config.Key)
relayedConn, err := w.relayManager.OpenConn(w.peerCtx, srv, w.config.Key, serverIP)
if err != nil {
if errors.Is(err, relayClient.ErrConnAlreadyExists) {
w.log.Debugf("handled offer by reusing existing relay connection")
@@ -90,7 +95,7 @@ func (w *WorkerRelay) OnNewOffer(remoteOfferAnswer *OfferAnswer) {
})
}
func (w *WorkerRelay) RelayInstanceAddress() (string, error) {
func (w *WorkerRelay) RelayInstanceAddress() (string, netip.Addr, error) {
return w.relayManager.RelayInstanceAddress()
}

View File

@@ -13,11 +13,9 @@ import (
"github.com/cenkalti/backoff/v4"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
nbgrpc "github.com/netbirdio/netbird/client/grpc"
"github.com/netbirdio/netbird/flow/proto"
@@ -301,12 +299,11 @@ func defaultBackoff(ctx context.Context, interval time.Duration) backoff.BackOff
}, ctx)
}
// isContextDone reports whether the local context has been canceled or has
// exceeded its deadline. It deliberately does not inspect gRPC status codes:
// a server- or proxy-sent codes.Canceled / codes.DeadlineExceeded must not
// short-circuit our retry loop, since retrying is the correct response when
// the local context is still alive.
func isContextDone(err error) bool {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
return true
}
if s, ok := status.FromError(err); ok {
return s.Code() == codes.Canceled || s.Code() == codes.DeadlineExceeded
}
return false
return errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)
}

View File

@@ -862,6 +862,9 @@ func (am *DefaultAccountManager) AddPeer(ctx context.Context, accountID, setupKe
if !addedByUser {
opEvent.Meta["setup_key_name"] = peerAddConfig.SetupKeyName
}
if newPeer.Status != nil && newPeer.Status.RequiresApproval {
opEvent.Meta["pending_approval"] = true
}
if !temporary {
am.StoreEvent(ctx, opEvent.InitiatorID, opEvent.TargetID, opEvent.AccountID, opEvent.Activity, opEvent.Meta)

View File

@@ -246,27 +246,23 @@ func (c *GrpcClient) handleJobStream(
for {
jobReq, err := c.receiveJobRequest(ctx, stream, serverPubKey)
if err != nil {
if ctx.Err() != nil {
log.Debugf("job stream context has been canceled, this usually indicates shutdown")
return nil
}
if s, ok := gstatus.FromError(err); ok {
switch s.Code() {
case codes.PermissionDenied:
c.notifyDisconnected(err)
return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
case codes.Canceled:
log.Debugf("job stream context has been canceled, this usually indicates shutdown")
return err
case codes.Unimplemented:
log.Warn("Job feature is not supported by the current management server version. " +
"Please update the management service to use this feature.")
return nil
default:
log.Warnf("job stream disconnected, will retry silently. Reason: %v", err)
return err
}
} else {
// non-gRPC error
log.Warnf("job stream disconnected, will retry silently. Reason: %v", err)
return err
}
log.Warnf("job stream disconnected, will retry silently. Reason: %v", err)
return err
}
if jobReq == nil || len(jobReq.ID) == 0 {
@@ -381,22 +377,15 @@ func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes.
err = c.receiveUpdatesEvents(stream, serverPubKey, msgHandler)
if err != nil {
c.notifyDisconnected(err)
if s, ok := gstatus.FromError(err); ok {
switch s.Code() {
case codes.PermissionDenied:
return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
case codes.Canceled:
log.Debugf("management connection context has been canceled, this usually indicates shutdown")
return nil
default:
log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
return err
}
} else {
// non-gRPC error
log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
return err
if ctx.Err() != nil {
log.Debugf("management connection context has been canceled, this usually indicates shutdown")
return nil
}
if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied {
return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
}
log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
return err
}
return nil

View File

@@ -2,8 +2,12 @@ package client
import (
"context"
"errors"
"fmt"
"net"
"net/netip"
"net/url"
"strings"
"sync"
"time"
@@ -146,6 +150,7 @@ func (cc *connContainer) close() {
type Client struct {
log *log.Entry
connectionURL string
serverIP netip.Addr
authTokenStore *auth.TokenStore
hashedID messages.PeerID
@@ -170,13 +175,22 @@ type Client struct {
}
// NewClient creates a new client for the relay server. The client is not connected to the server until the Connect
// is called.
func NewClient(serverURL string, authTokenStore *auth.TokenStore, peerID string, mtu uint16) *Client {
return NewClientWithServerIP(serverURL, netip.Addr{}, authTokenStore, peerID, mtu)
}
// NewClientWithServerIP creates a new client for the relay server with a known server IP. serverIP, when valid, is
// dialed directly first; the FQDN is only attempted if the IP-based dial fails. TLS verification still uses the
// FQDN from serverURL via SNI.
func NewClientWithServerIP(serverURL string, serverIP netip.Addr, authTokenStore *auth.TokenStore, peerID string, mtu uint16) *Client {
hashedID := messages.HashID(peerID)
relayLog := log.WithFields(log.Fields{"relay": serverURL})
c := &Client{
log: relayLog,
connectionURL: serverURL,
serverIP: serverIP,
authTokenStore: authTokenStore,
hashedID: hashedID,
mtu: mtu,
@@ -304,6 +318,23 @@ func (c *Client) ServerInstanceURL() (string, error) {
return c.instanceURL.String(), nil
}
// ConnectedIP returns the IP address of the live relay-server connection,
// extracted from the underlying socket's RemoteAddr. Zero value if not
// connected or if the address is not an IP literal.
func (c *Client) ConnectedIP() netip.Addr {
c.mu.Lock()
conn := c.relayConn
c.mu.Unlock()
if conn == nil {
return netip.Addr{}
}
addr := conn.RemoteAddr()
if addr == nil {
return netip.Addr{}
}
return extractIPLiteral(addr.String())
}
// SetOnDisconnectListener sets a function that will be called when the connection to the relay server is closed.
func (c *Client) SetOnDisconnectListener(fn func(string)) {
c.listenerMutex.Lock()
@@ -332,10 +363,23 @@ func (c *Client) Close() error {
func (c *Client) connect(ctx context.Context) (*RelayAddr, error) {
dialers := c.getDialers()
rd := dialer.NewRaceDial(c.log, dialer.DefaultConnectionTimeout, c.connectionURL, dialers...)
conn, err := rd.Dial(ctx)
if err != nil {
return nil, err
var conn net.Conn
if c.serverIP.IsValid() {
var err error
conn, err = c.dialRaceDirect(ctx, dialers)
if err != nil {
c.log.Infof("dial via server IP %s failed, falling back to FQDN: %v", c.serverIP, err)
conn = nil
}
}
if conn == nil {
rd := dialer.NewRaceDial(c.log, dialer.DefaultConnectionTimeout, c.connectionURL, dialers...)
var err error
conn, err = rd.Dial(ctx)
if err != nil {
return nil, fmt.Errorf("dial via FQDN: %w", err)
}
}
c.relayConn = conn
@@ -351,6 +395,52 @@ func (c *Client) connect(ctx context.Context) (*RelayAddr, error) {
return instanceURL, nil
}
// dialRaceDirect dials c.serverIP, preserving the original FQDN as the TLS ServerName for SNI.
func (c *Client) dialRaceDirect(ctx context.Context, dialers []dialer.DialeFn) (net.Conn, error) {
directURL, serverName, err := substituteHost(c.connectionURL, c.serverIP)
if err != nil {
return nil, fmt.Errorf("substitute host: %w", err)
}
c.log.Debugf("dialing via server IP %s (SNI=%s)", c.serverIP, serverName)
rd := dialer.NewRaceDial(c.log, dialer.DefaultConnectionTimeout, directURL, dialers...).
WithServerName(serverName)
return rd.Dial(ctx)
}
// substituteHost replaces the host portion of a rel/rels URL with ip,
// preserving the scheme and port. Returns the rewritten URL and the
// original host to use as the TLS ServerName, or empty if the original
// host is itself an IP literal (SNI requires a DNS name).
func substituteHost(serverURL string, ip netip.Addr) (string, string, error) {
u, err := url.Parse(serverURL)
if err != nil {
return "", "", fmt.Errorf("parse %q: %w", serverURL, err)
}
if u.Scheme == "" || u.Host == "" {
return "", "", fmt.Errorf("invalid relay URL %q", serverURL)
}
if !ip.IsValid() {
return "", "", errors.New("invalid server IP")
}
origHost := u.Hostname()
if _, err := netip.ParseAddr(origHost); err == nil {
origHost = ""
}
ip = ip.Unmap()
newHost := ip.String()
if ip.Is6() {
newHost = "[" + newHost + "]"
}
if port := u.Port(); port != "" {
u.Host = newHost + ":" + port
} else {
u.Host = newHost
}
return u.String(), origHost, nil
}
func (c *Client) handShake(ctx context.Context) (*RelayAddr, error) {
msg, err := messages.MarshalAuthMsg(c.hashedID, c.authTokenStore.TokenBinary())
if err != nil {
@@ -716,3 +806,21 @@ func (c *Client) handlePeersWentOfflineMsg(buf []byte) {
}
c.stateSubscription.OnPeersWentOffline(peersID)
}
// extractIPLiteral returns the IP from address forms produced by the relay
// dialers (URL or host:port). Zero value if the host is not an IP.
func extractIPLiteral(s string) netip.Addr {
if u, err := url.Parse(s); err == nil && u.Host != "" {
s = u.Host
}
host, _, err := net.SplitHostPort(s)
if err != nil {
host = s
}
host = strings.Trim(host, "[]")
ip, err := netip.ParseAddr(host)
if err != nil {
return netip.Addr{}
}
return ip.Unmap()
}

View File

@@ -0,0 +1,280 @@
package client
import (
"context"
"fmt"
"net"
"net/netip"
"testing"
"time"
"go.opentelemetry.io/otel"
"github.com/netbirdio/netbird/client/iface"
"github.com/netbirdio/netbird/relay/server"
"github.com/netbirdio/netbird/shared/relay/auth/allow"
)
// TestClient_ServerIPRecoversFromUnresolvableFQDN verifies that when the
// primary FQDN-based dial fails (unresolvable .invalid host), Connect
// recovers via the server IP and SNI still uses the FQDN.
func TestClient_ServerIPRecoversFromUnresolvableFQDN(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
listenAddr, port := freeAddr(t)
srvCfg := server.Config{
Meter: otel.Meter(""),
ExposedAddress: fmt.Sprintf("rel://test-unresolvable-host.invalid:%d", port),
TLSSupport: false,
AuthValidator: &allow.Auth{},
}
srv, err := server.NewServer(srvCfg)
if err != nil {
t.Fatalf("create server: %s", err)
}
errChan := make(chan error, 1)
go func() {
if err := srv.Listen(server.ListenerConfig{Address: listenAddr}); err != nil {
errChan <- err
}
}()
t.Cleanup(func() {
if err := srv.Shutdown(context.Background()); err != nil {
t.Errorf("shutdown server: %s", err)
}
})
if err := waitForServerToStart(errChan); err != nil {
t.Fatalf("server failed to start: %s", err)
}
t.Run("no server IP, primary fails", func(t *testing.T) {
c := NewClient(srvCfg.ExposedAddress, hmacTokenStore, "alice-noip", iface.DefaultMTU)
err := c.Connect(ctx)
if err == nil {
_ = c.Close()
t.Fatalf("expected connect to fail without server IP, got nil")
}
})
t.Run("server IP recovers", func(t *testing.T) {
c := NewClientWithServerIP(srvCfg.ExposedAddress, netip.MustParseAddr("127.0.0.1"), hmacTokenStore, "alice-with-ip", iface.DefaultMTU)
if err := c.Connect(ctx); err != nil {
t.Fatalf("connect with server IP: %s", err)
}
t.Cleanup(func() { _ = c.Close() })
if !c.Ready() {
t.Fatalf("client not ready after connect")
}
if got := c.ConnectedIP(); got.String() != "127.0.0.1" {
t.Fatalf("ConnectedIP = %q, want 127.0.0.1", got)
}
})
}
// TestClient_ConnectedIPAfterFQDNDial verifies ConnectedIP returns the
// resolved IP after a successful FQDN-based dial. The underlying socket's
// RemoteAddr must be exposed through the dialer wrappers; if it returns
// the dial-time URL instead, ConnectedIP returns empty and the dial
// IP we advertise to peers is empty too.
func TestClient_ConnectedIPAfterFQDNDial(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
listenAddr, port := freeAddr(t)
srvCfg := server.Config{
Meter: otel.Meter(""),
ExposedAddress: fmt.Sprintf("rel://localhost:%d", port),
TLSSupport: false,
AuthValidator: &allow.Auth{},
}
srv, err := server.NewServer(srvCfg)
if err != nil {
t.Fatalf("create server: %s", err)
}
errChan := make(chan error, 1)
go func() {
if err := srv.Listen(server.ListenerConfig{Address: listenAddr}); err != nil {
errChan <- err
}
}()
t.Cleanup(func() { _ = srv.Shutdown(context.Background()) })
if err := waitForServerToStart(errChan); err != nil {
t.Fatalf("server failed to start: %s", err)
}
c := NewClient(srvCfg.ExposedAddress, hmacTokenStore, "alice-fqdn", iface.DefaultMTU)
if err := c.Connect(ctx); err != nil {
t.Fatalf("connect: %s", err)
}
t.Cleanup(func() { _ = c.Close() })
got := c.ConnectedIP().String()
if got != "127.0.0.1" && got != "::1" {
t.Fatalf("ConnectedIP after FQDN dial = %q, want 127.0.0.1 or ::1", got)
}
}
func TestSubstituteHost(t *testing.T) {
tests := []struct {
name string
serverURL string
ip string
wantURL string
wantServerName string
wantErr bool
}{
{
name: "rels with port",
serverURL: "rels://relay.netbird.io:443",
ip: "10.0.0.5",
wantURL: "rels://10.0.0.5:443",
wantServerName: "relay.netbird.io",
},
{
name: "rel with port",
serverURL: "rel://relay.example.com:80",
ip: "192.0.2.1",
wantURL: "rel://192.0.2.1:80",
wantServerName: "relay.example.com",
},
{
name: "ipv6 server IP bracketed",
serverURL: "rels://relay.example.com:443",
ip: "2001:db8::1",
wantURL: "rels://[2001:db8::1]:443",
wantServerName: "relay.example.com",
},
{
name: "no port",
serverURL: "rels://relay.example.com",
ip: "10.0.0.5",
wantURL: "rels://10.0.0.5",
wantServerName: "relay.example.com",
},
{
name: "ipv6 server with port returns empty SNI",
serverURL: "rels://[2001:db8::5]:443",
ip: "10.0.0.5",
wantURL: "rels://10.0.0.5:443",
wantServerName: "",
},
{
name: "ipv4 server with port returns empty SNI",
serverURL: "rels://10.0.0.5:443",
ip: "10.0.0.6",
wantURL: "rels://10.0.0.6:443",
wantServerName: "",
},
{
name: "ipv6 server IP no port",
serverURL: "rels://relay.example.com",
ip: "2001:db8::1",
wantURL: "rels://[2001:db8::1]",
wantServerName: "relay.example.com",
},
{
name: "missing scheme",
serverURL: "relay.example.com:443",
ip: "10.0.0.5",
wantErr: true,
},
{
name: "empty",
serverURL: "",
ip: "10.0.0.5",
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var ip netip.Addr
if tt.ip != "" {
ip = netip.MustParseAddr(tt.ip)
}
gotURL, gotName, err := substituteHost(tt.serverURL, ip)
if tt.wantErr {
if err == nil {
t.Fatalf("expected error, got nil")
}
return
}
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
if gotURL != tt.wantURL {
t.Errorf("URL = %q, want %q", gotURL, tt.wantURL)
}
if gotName != tt.wantServerName {
t.Errorf("ServerName = %q, want %q", gotName, tt.wantServerName)
}
})
}
}
func TestClient_ConnectedIPEmptyWhenNotConnected(t *testing.T) {
c := NewClient("rel://example.invalid:80", hmacTokenStore, "x", iface.DefaultMTU)
if got := c.ConnectedIP(); got.IsValid() {
t.Fatalf("ConnectedIP on disconnected client = %q, want zero", got)
}
}
// staticAddr is a net.Addr that returns a fixed string. Used to verify
// ConnectedIP parses RemoteAddr correctly.
type staticAddr struct{ s string }
func (a staticAddr) Network() string { return "tcp" }
func (a staticAddr) String() string { return a.s }
type stubConn struct {
net.Conn
remote net.Addr
}
func (s stubConn) RemoteAddr() net.Addr { return s.remote }
func TestClient_ConnectedIPParsesRemoteAddr(t *testing.T) {
tests := []struct {
name string
s string
want string
}{
{"hostport ipv4", "127.0.0.1:50301", "127.0.0.1"},
{"hostport ipv6 bracketed", "[::1]:50301", "::1"},
{"url with ipv4", "rel://127.0.0.1:50301", "127.0.0.1"},
{"url with ipv6", "rels://[2001:db8::1]:443", "2001:db8::1"},
{"fqdn url returns empty", "rel://relay.example.com:50301", ""},
{"fqdn hostport returns empty", "relay.example.com:50301", ""},
{"plain ipv4 no port", "10.0.0.1", "10.0.0.1"},
{"empty", "", ""},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
c := &Client{relayConn: stubConn{remote: staticAddr{s: tt.s}}}
got := c.ConnectedIP()
var gotStr string
if got.IsValid() {
gotStr = got.String()
}
if gotStr != tt.want {
t.Errorf("ConnectedIP(%q) = %q, want %q", tt.s, gotStr, tt.want)
}
})
}
}
// freeAddr returns a 127.0.0.1 address with an OS-assigned port. The
// listener is closed before returning, so the port is briefly free for
// the caller to bind. Avoids hardcoded ports that can collide.
func freeAddr(t *testing.T) (string, int) {
t.Helper()
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("get free port: %s", err)
}
addr := l.Addr().(*net.TCPAddr)
_ = l.Close()
return addr.String(), addr.Port
}

View File

@@ -23,7 +23,7 @@ func (d Dialer) Protocol() string {
return Network
}
func (d Dialer) Dial(ctx context.Context, address string) (net.Conn, error) {
func (d Dialer) Dial(ctx context.Context, address, serverName string) (net.Conn, error) {
quicURL, err := prepareURL(address)
if err != nil {
return nil, err
@@ -32,11 +32,14 @@ func (d Dialer) Dial(ctx context.Context, address string) (net.Conn, error) {
// Get the base TLS config
tlsClientConfig := quictls.ClientQUICTLSConfig()
// Set ServerName to hostname if not an IP address
host, _, splitErr := net.SplitHostPort(quicURL)
if splitErr == nil && net.ParseIP(host) == nil {
// It's a hostname, not an IP - modify directly
tlsClientConfig.ServerName = host
switch {
case serverName != "" && net.ParseIP(serverName) == nil:
tlsClientConfig.ServerName = serverName
default:
host, _, splitErr := net.SplitHostPort(quicURL)
if splitErr == nil && net.ParseIP(host) == nil {
tlsClientConfig.ServerName = host
}
}
quicConfig := &quic.Config{

View File

@@ -14,7 +14,9 @@ const (
)
type DialeFn interface {
Dial(ctx context.Context, address string) (net.Conn, error)
// Dial connects to address. serverName, when non-empty, overrides the TLS
// ServerName used for SNI/cert validation. Empty means derive from address.
Dial(ctx context.Context, address, serverName string) (net.Conn, error)
Protocol() string
}
@@ -27,6 +29,7 @@ type dialResult struct {
type RaceDial struct {
log *log.Entry
serverURL string
serverName string
dialerFns []DialeFn
connectionTimeout time.Duration
}
@@ -40,6 +43,16 @@ func NewRaceDial(log *log.Entry, connectionTimeout time.Duration, serverURL stri
}
}
// WithServerName sets a TLS SNI/cert validation override. Used when serverURL
// contains an IP literal but the cert is issued for a different hostname.
//
// Mutates the receiver and is not safe for concurrent reconfiguration; a
// RaceDial is intended to be constructed per dial and discarded.
func (r *RaceDial) WithServerName(serverName string) *RaceDial {
r.serverName = serverName
return r
}
func (r *RaceDial) Dial(ctx context.Context) (net.Conn, error) {
connChan := make(chan dialResult, len(r.dialerFns))
winnerConn := make(chan net.Conn, 1)
@@ -64,7 +77,7 @@ func (r *RaceDial) dial(dfn DialeFn, abortCtx context.Context, connChan chan dia
defer cancel()
r.log.Infof("dialing Relay server via %s", dfn.Protocol())
conn, err := dfn.Dial(ctx, r.serverURL)
conn, err := dfn.Dial(ctx, r.serverURL, r.serverName)
connChan <- dialResult{Conn: conn, Protocol: dfn.Protocol(), Err: err}
}

View File

@@ -28,7 +28,7 @@ type MockDialer struct {
protocolStr string
}
func (m *MockDialer) Dial(ctx context.Context, address string) (net.Conn, error) {
func (m *MockDialer) Dial(ctx context.Context, address, _ string) (net.Conn, error) {
return m.dialFunc(ctx, address)
}

View File

@@ -12,14 +12,24 @@ import (
type Conn struct {
ctx context.Context
*websocket.Conn
remoteAddr WebsocketAddr
remoteAddr net.Addr
}
func NewConn(wsConn *websocket.Conn, serverAddress string) net.Conn {
// NewConn builds a relay ws.Conn. underlying is the raw TCP/TLS conn captured
// from the http transport's DialContext; when set, RemoteAddr returns its
// peer address (an IP literal). When nil (e.g. wasm), RemoteAddr falls back
// to the dial-time URL.
func NewConn(wsConn *websocket.Conn, serverAddress string, underlying net.Conn) net.Conn {
var addr net.Addr = WebsocketAddr{serverAddress}
if underlying != nil {
if ra := underlying.RemoteAddr(); ra != nil {
addr = ra
}
}
return &Conn{
ctx: context.Background(),
Conn: wsConn,
remoteAddr: WebsocketAddr{serverAddress},
remoteAddr: addr,
}
}

View File

@@ -2,10 +2,14 @@
package ws
import "github.com/coder/websocket"
import (
"net"
func createDialOptions() *websocket.DialOptions {
"github.com/coder/websocket"
)
func createDialOptions(serverName string, underlyingOut *net.Conn) *websocket.DialOptions {
return &websocket.DialOptions{
HTTPClient: httpClientNbDialer(),
HTTPClient: httpClientNbDialer(serverName, underlyingOut),
}
}

View File

@@ -2,9 +2,13 @@
package ws
import "github.com/coder/websocket"
import (
"net"
func createDialOptions() *websocket.DialOptions {
// WASM version doesn't support HTTPClient
"github.com/coder/websocket"
)
func createDialOptions(_ string, _ *net.Conn) *websocket.DialOptions {
// WASM version doesn't support HTTPClient or custom TLS config.
return &websocket.DialOptions{}
}

View File

@@ -26,13 +26,14 @@ func (d Dialer) Protocol() string {
return "WS"
}
func (d Dialer) Dial(ctx context.Context, address string) (net.Conn, error) {
func (d Dialer) Dial(ctx context.Context, address, serverName string) (net.Conn, error) {
wsURL, err := prepareURL(address)
if err != nil {
return nil, err
}
opts := createDialOptions()
var underlying net.Conn
opts := createDialOptions(serverName, &underlying)
parsedURL, err := url.Parse(wsURL)
if err != nil {
@@ -52,7 +53,7 @@ func (d Dialer) Dial(ctx context.Context, address string) (net.Conn, error) {
_ = resp.Body.Close()
}
conn := NewConn(wsConn, address)
conn := NewConn(wsConn, address, underlying)
return conn, nil
}
@@ -64,7 +65,10 @@ func prepareURL(address string) (string, error) {
return strings.Replace(address, "rel", "ws", 1), nil
}
func httpClientNbDialer() *http.Client {
// httpClientNbDialer builds the http client used by the websocket library.
// underlyingOut, when non-nil, is populated with the raw conn from the
// transport's DialContext so the caller can read its RemoteAddr.
func httpClientNbDialer(serverName string, underlyingOut *net.Conn) *http.Client {
customDialer := nbnet.NewDialer()
certPool, err := x509.SystemCertPool()
@@ -75,10 +79,15 @@ func httpClientNbDialer() *http.Client {
customTransport := &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return customDialer.DialContext(ctx, network, addr)
c, err := customDialer.DialContext(ctx, network, addr)
if err == nil && underlyingOut != nil {
*underlyingOut = c
}
return c, err
},
TLSClientConfig: &tls.Config{
RootCAs: certPool,
RootCAs: certPool,
ServerName: serverName,
},
}

View File

@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"net"
"net/netip"
"reflect"
"sync"
"time"
@@ -75,6 +76,9 @@ type Manager struct {
mtu uint16
maxBackoffInterval time.Duration
cleanupInterval time.Duration
keepUnusedServerTime time.Duration
}
// NewManager creates a new manager instance.
@@ -95,6 +99,8 @@ func NewManager(ctx context.Context, serverURLs []string, peerID string, mtu uin
},
relayClients: make(map[string]*RelayTrack),
onDisconnectedListeners: make(map[string]*list.List),
cleanupInterval: relayCleanupInterval,
keepUnusedServerTime: keepUnusedServerTime,
}
for _, opt := range opts {
opt(m)
@@ -130,7 +136,10 @@ func (m *Manager) Serve() error {
// OpenConn opens a connection to the given peer key. If the peer is on the same relay server, the connection will be
// established via the relay server. If the peer is on a different relay server, the manager will establish a new
// connection to the relay server. It returns back with a net.Conn what represent the remote peer connection.
func (m *Manager) OpenConn(ctx context.Context, serverAddress, peerKey string) (net.Conn, error) {
//
// serverIP, when valid and serverAddress is foreign, is used as a dial target if the FQDN-based dial fails.
// Ignored for the local home-server path. TLS verification still uses the FQDN via SNI.
func (m *Manager) OpenConn(ctx context.Context, serverAddress, peerKey string, serverIP netip.Addr) (net.Conn, error) {
m.relayClientMu.RLock()
defer m.relayClientMu.RUnlock()
@@ -151,7 +160,7 @@ func (m *Manager) OpenConn(ctx context.Context, serverAddress, peerKey string) (
netConn, err = m.relayClient.OpenConn(ctx, peerKey)
} else {
log.Debugf("open peer connection via foreign server: %s", serverAddress)
netConn, err = m.openConnVia(ctx, serverAddress, peerKey)
netConn, err = m.openConnVia(ctx, serverAddress, peerKey, serverIP)
}
if err != nil {
return nil, err
@@ -203,16 +212,22 @@ func (m *Manager) AddCloseListener(serverAddress string, onClosedListener OnServ
return nil
}
// RelayInstanceAddress returns the address of the permanent relay server. It could change if the network connection is
// lost. This address will be sent to the target peer to choose the common relay server for the communication.
func (m *Manager) RelayInstanceAddress() (string, error) {
// RelayInstanceAddress returns the address and resolved IP of the permanent relay server. It could change if the
// network connection is lost. The address is sent to the target peer to choose the common relay server for the
// communication; the IP is sent alongside so remote peers can dial directly without their own DNS lookup. Both
// values are read under the same lock so they cannot diverge across a reconnection.
func (m *Manager) RelayInstanceAddress() (string, netip.Addr, error) {
m.relayClientMu.RLock()
defer m.relayClientMu.RUnlock()
if m.relayClient == nil {
return "", ErrRelayClientNotConnected
return "", netip.Addr{}, ErrRelayClientNotConnected
}
return m.relayClient.ServerInstanceURL()
addr, err := m.relayClient.ServerInstanceURL()
if err != nil {
return "", netip.Addr{}, err
}
return addr, m.relayClient.ConnectedIP(), nil
}
// ServerURLs returns the addresses of the relay servers.
@@ -236,7 +251,7 @@ func (m *Manager) UpdateToken(token *relayAuth.Token) error {
return m.tokenStore.UpdateToken(token)
}
func (m *Manager) openConnVia(ctx context.Context, serverAddress, peerKey string) (net.Conn, error) {
func (m *Manager) openConnVia(ctx context.Context, serverAddress, peerKey string, serverIP netip.Addr) (net.Conn, error) {
// check if already has a connection to the desired relay server
m.relayClientsMutex.RLock()
rt, ok := m.relayClients[serverAddress]
@@ -271,7 +286,7 @@ func (m *Manager) openConnVia(ctx context.Context, serverAddress, peerKey string
m.relayClients[serverAddress] = rt
m.relayClientsMutex.Unlock()
relayClient := NewClient(serverAddress, m.tokenStore, m.peerID, m.mtu)
relayClient := NewClientWithServerIP(serverAddress, serverIP, m.tokenStore, m.peerID, m.mtu)
err := relayClient.Connect(m.ctx)
if err != nil {
rt.err = err
@@ -364,7 +379,7 @@ func (m *Manager) isForeignServer(address string) (bool, error) {
}
func (m *Manager) startCleanupLoop() {
ticker := time.NewTicker(relayCleanupInterval)
ticker := time.NewTicker(m.cleanupInterval)
defer ticker.Stop()
for {
select {
@@ -389,7 +404,7 @@ func (m *Manager) cleanUpUnusedRelays() {
continue
}
if time.Since(rt.created) <= keepUnusedServerTime {
if time.Since(rt.created) <= m.keepUnusedServerTime {
rt.Unlock()
continue
}

View File

@@ -0,0 +1,144 @@
package client
import (
"context"
"io"
"net/netip"
"testing"
"time"
"github.com/netbirdio/netbird/client/iface"
"github.com/netbirdio/netbird/relay/server"
)
// TestManager_ForeignRelayServerIP exercises the foreign-relay path
// end-to-end through Manager.OpenConn. Alice and Bob register on different
// relay servers; Alice dials Bob's foreign relay using an unresolvable
// FQDN. Without a server IP the dial fails; with Bob's advertised IP it
// recovers and a payload round-trips between the peers.
func TestManager_ForeignRelayServerIP(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
// Alice's home relay
homeCfg := server.ListenerConfig{Address: "127.0.0.1:52401"}
homeSrv, err := server.NewServer(newManagerTestServerConfig(homeCfg.Address))
if err != nil {
t.Fatalf("create home server: %s", err)
}
homeErr := make(chan error, 1)
go func() {
if err := homeSrv.Listen(homeCfg); err != nil {
homeErr <- err
}
}()
t.Cleanup(func() { _ = homeSrv.Shutdown(context.Background()) })
if err := waitForServerToStart(homeErr); err != nil {
t.Fatalf("home server: %s", err)
}
// Bob's foreign relay
foreignCfg := server.ListenerConfig{Address: "127.0.0.1:52402"}
foreignSrv, err := server.NewServer(newManagerTestServerConfig(foreignCfg.Address))
if err != nil {
t.Fatalf("create foreign server: %s", err)
}
foreignErr := make(chan error, 1)
go func() {
if err := foreignSrv.Listen(foreignCfg); err != nil {
foreignErr <- err
}
}()
t.Cleanup(func() { _ = foreignSrv.Shutdown(context.Background()) })
if err := waitForServerToStart(foreignErr); err != nil {
t.Fatalf("foreign server: %s", err)
}
mCtx, mCancel := context.WithCancel(ctx)
t.Cleanup(mCancel)
mgrAlice := NewManager(mCtx, toURL(homeCfg), "alice", iface.DefaultMTU)
if err := mgrAlice.Serve(); err != nil {
t.Fatalf("alice manager serve: %s", err)
}
mgrBob := NewManager(mCtx, toURL(foreignCfg), "bob", iface.DefaultMTU)
if err := mgrBob.Serve(); err != nil {
t.Fatalf("bob manager serve: %s", err)
}
// Bob's real relay URL and the IP that would ride along in signal as relayServerIP.
bobRealAddr, bobAdvertisedIP, err := mgrBob.RelayInstanceAddress()
if err != nil {
t.Fatalf("bob relay address: %s", err)
}
if !bobAdvertisedIP.IsValid() {
t.Fatalf("expected valid RelayInstanceIP for bob, got zero")
}
// .invalid is reserved (RFC 2606), so DNS resolution always fails.
const brokenFQDN = "rel://relay-bob-instance.invalid:52402"
if brokenFQDN == bobRealAddr {
t.Fatalf("broken FQDN must differ from bob's real address (%s)", bobRealAddr)
}
t.Run("no server IP, dial fails", func(t *testing.T) {
dialCtx, dialCancel := context.WithTimeout(ctx, 5*time.Second)
defer dialCancel()
_, err := mgrAlice.OpenConn(dialCtx, brokenFQDN, "bob", netip.Addr{})
if err == nil {
t.Fatalf("expected OpenConn to fail without server IP, got success")
}
})
t.Run("server IP recovers", func(t *testing.T) {
// Bob waits for Alice's incoming peer connection on his side.
bobSideCh := make(chan error, 1)
go func() {
conn, err := mgrBob.OpenConn(ctx, bobRealAddr, "alice", netip.Addr{})
if err != nil {
bobSideCh <- err
return
}
buf := make([]byte, 1024)
n, err := conn.Read(buf)
if err != nil {
bobSideCh <- err
return
}
if _, err := conn.Write(buf[:n]); err != nil {
bobSideCh <- err
return
}
bobSideCh <- nil
}()
aliceConn, err := mgrAlice.OpenConn(ctx, brokenFQDN, "bob", bobAdvertisedIP)
if err != nil {
t.Fatalf("alice OpenConn with server IP: %s", err)
}
t.Cleanup(func() { _ = aliceConn.Close() })
payload := []byte("alice-to-bob")
if _, err := aliceConn.Write(payload); err != nil {
t.Fatalf("alice write: %s", err)
}
buf := make([]byte, len(payload))
if _, err := io.ReadFull(aliceConn, buf); err != nil {
t.Fatalf("alice read echo: %s", err)
}
if string(buf) != string(payload) {
t.Fatalf("echo mismatch: got %q want %q", buf, payload)
}
select {
case err := <-bobSideCh:
if err != nil {
t.Fatalf("bob side: %s", err)
}
case <-time.After(5 * time.Second):
t.Fatalf("timed out waiting for bob side")
}
})
}

View File

@@ -3,6 +3,7 @@ package client
import (
"context"
"fmt"
"net/netip"
"testing"
"time"
@@ -101,15 +102,15 @@ func TestForeignConn(t *testing.T) {
if err := clientBob.Serve(); err != nil {
t.Fatalf("failed to serve manager: %s", err)
}
bobsSrvAddr, err := clientBob.RelayInstanceAddress()
bobsSrvAddr, _, err := clientBob.RelayInstanceAddress()
if err != nil {
t.Fatalf("failed to get relay address: %s", err)
}
connAliceToBob, err := clientAlice.OpenConn(ctx, bobsSrvAddr, "bob")
connAliceToBob, err := clientAlice.OpenConn(ctx, bobsSrvAddr, "bob", netip.Addr{})
if err != nil {
t.Fatalf("failed to bind channel: %s", err)
}
connBobToAlice, err := clientBob.OpenConn(ctx, bobsSrvAddr, "alice")
connBobToAlice, err := clientBob.OpenConn(ctx, bobsSrvAddr, "alice", netip.Addr{})
if err != nil {
t.Fatalf("failed to bind channel: %s", err)
}
@@ -209,7 +210,7 @@ func TestForeginConnClose(t *testing.T) {
if err != nil {
t.Fatalf("failed to serve manager: %s", err)
}
conn, err := mgr.OpenConn(ctx, toURL(srvCfg2)[0], "bob")
conn, err := mgr.OpenConn(ctx, toURL(srvCfg2)[0], "bob", netip.Addr{})
if err != nil {
t.Fatalf("failed to bind channel: %s", err)
}
@@ -301,7 +302,7 @@ func TestForeignAutoClose(t *testing.T) {
}
t.Log("open connection to another peer")
if _, err = mgr.OpenConn(ctx, foreignServerURL, "anotherpeer"); err == nil {
if _, err = mgr.OpenConn(ctx, foreignServerURL, "anotherpeer", netip.Addr{}); err == nil {
t.Fatalf("should have failed to open connection to another peer")
}
@@ -367,11 +368,11 @@ func TestAutoReconnect(t *testing.T) {
if err != nil {
t.Fatalf("failed to serve manager: %s", err)
}
ra, err := clientAlice.RelayInstanceAddress()
ra, _, err := clientAlice.RelayInstanceAddress()
if err != nil {
t.Errorf("failed to get relay address: %s", err)
}
conn, err := clientAlice.OpenConn(ctx, ra, "bob")
conn, err := clientAlice.OpenConn(ctx, ra, "bob", netip.Addr{})
if err != nil {
t.Errorf("failed to bind channel: %s", err)
}
@@ -391,7 +392,7 @@ func TestAutoReconnect(t *testing.T) {
}
log.Infof("reopent the connection")
_, err = clientAlice.OpenConn(ctx, ra, "bob")
_, err = clientAlice.OpenConn(ctx, ra, "bob", netip.Addr{})
if err != nil {
t.Errorf("failed to open channel: %s", err)
}
@@ -453,7 +454,7 @@ func TestNotifierDoubleAdd(t *testing.T) {
t.Fatalf("failed to serve manager: %s", err)
}
conn1, err := clientAlice.OpenConn(ctx, clientAlice.ServerURLs()[0], "bob")
conn1, err := clientAlice.OpenConn(ctx, clientAlice.ServerURLs()[0], "bob", netip.Addr{})
if err != nil {
t.Fatalf("failed to bind channel: %s", err)
}

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
"net/netip"
"strings"
"github.com/netbirdio/netbird/shared/signal/proto"
@@ -14,17 +15,17 @@ import (
// A set of tools to exchange connection details (Wireguard endpoints) with the remote peer.
// Status is the status of the client
type Status string
const StreamConnected Status = "Connected"
const StreamDisconnected Status = "Disconnected"
const (
StreamConnected Status = "Connected"
StreamDisconnected Status = "Disconnected"
// DirectCheck indicates support to direct mode checks
DirectCheck uint32 = 1
)
// Status is the status of the client
type Status string
type Client interface {
io.Closer
StreamConnected() bool
@@ -38,6 +39,24 @@ type Client interface {
SetOnReconnectedListener(func())
}
// Credential is an instance of a GrpcClient's Credential
type Credential struct {
UFrag string
Pwd string
}
// CredentialPayload bundles the fields of a signal Body for MarshalCredential.
type CredentialPayload struct {
Type proto.Body_Type
WgListenPort int
Credential *Credential
RosenpassPubKey []byte
RosenpassAddr string
RelaySrvAddress string
RelaySrvIP netip.Addr
SessionID []byte
}
// UnMarshalCredential parses the credentials from the message and returns a Credential instance
func UnMarshalCredential(msg *proto.Message) (*Credential, error) {
@@ -52,27 +71,27 @@ func UnMarshalCredential(msg *proto.Message) (*Credential, error) {
}
// MarshalCredential marshal a Credential instance and returns a Message object
func MarshalCredential(myKey wgtypes.Key, myPort int, remoteKey string, credential *Credential, t proto.Body_Type, rosenpassPubKey []byte, rosenpassAddr string, relaySrvAddress string, sessionID []byte) (*proto.Message, error) {
func MarshalCredential(myKey wgtypes.Key, remoteKey string, p CredentialPayload) (*proto.Message, error) {
body := &proto.Body{
Type: p.Type,
Payload: fmt.Sprintf("%s:%s", p.Credential.UFrag, p.Credential.Pwd),
WgListenPort: uint32(p.WgListenPort),
NetBirdVersion: version.NetbirdVersion(),
RosenpassConfig: &proto.RosenpassConfig{
RosenpassPubKey: p.RosenpassPubKey,
RosenpassServerAddr: p.RosenpassAddr,
},
SessionId: p.SessionID,
}
if p.RelaySrvAddress != "" {
body.RelayServerAddress = &p.RelaySrvAddress
}
if p.RelaySrvIP.IsValid() {
body.RelayServerIP = p.RelaySrvIP.Unmap().AsSlice()
}
return &proto.Message{
Key: myKey.PublicKey().String(),
RemoteKey: remoteKey,
Body: &proto.Body{
Type: t,
Payload: fmt.Sprintf("%s:%s", credential.UFrag, credential.Pwd),
WgListenPort: uint32(myPort),
NetBirdVersion: version.NetbirdVersion(),
RosenpassConfig: &proto.RosenpassConfig{
RosenpassPubKey: rosenpassPubKey,
RosenpassServerAddr: rosenpassAddr,
},
RelayServerAddress: relaySrvAddress,
SessionId: sessionID,
},
Body: body,
}, nil
}
// Credential is an instance of a GrpcClient's Credential
type Credential struct {
UFrag string
Pwd string
}

View File

@@ -167,7 +167,7 @@ func (c *GrpcClient) Receive(ctx context.Context, msgHandler func(msg *proto.Mes
// start receiving messages from the Signal stream (from other peers through signal)
err = c.receive(stream)
if err != nil {
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
if ctx.Err() != nil {
log.Debugf("signal connection context has been canceled, this usually indicates shutdown")
return nil
}

View File

@@ -229,8 +229,13 @@ type Body struct {
// RosenpassConfig is a Rosenpass config of the remote peer our peer tries to connect to
RosenpassConfig *RosenpassConfig `protobuf:"bytes,7,opt,name=rosenpassConfig,proto3" json:"rosenpassConfig,omitempty"`
// relayServerAddress is url of the relay server
RelayServerAddress string `protobuf:"bytes,8,opt,name=relayServerAddress,proto3" json:"relayServerAddress,omitempty"`
SessionId []byte `protobuf:"bytes,10,opt,name=sessionId,proto3,oneof" json:"sessionId,omitempty"`
RelayServerAddress *string `protobuf:"bytes,8,opt,name=relayServerAddress,proto3,oneof" json:"relayServerAddress,omitempty"`
SessionId []byte `protobuf:"bytes,10,opt,name=sessionId,proto3,oneof" json:"sessionId,omitempty"`
// relayServerIP is the IP the sender is connected to on its relay server,
// encoded as 4 bytes (IPv4) or 16 bytes (IPv6). Receivers may use it as a
// fallback dial target when DNS resolution of relayServerAddress fails.
// SNI/TLS verification still uses relayServerAddress.
RelayServerIP []byte `protobuf:"bytes,11,opt,name=relayServerIP,proto3,oneof" json:"relayServerIP,omitempty"`
}
func (x *Body) Reset() {
@@ -315,8 +320,8 @@ func (x *Body) GetRosenpassConfig() *RosenpassConfig {
}
func (x *Body) GetRelayServerAddress() string {
if x != nil {
return x.RelayServerAddress
if x != nil && x.RelayServerAddress != nil {
return *x.RelayServerAddress
}
return ""
}
@@ -328,6 +333,13 @@ func (x *Body) GetSessionId() []byte {
return nil
}
func (x *Body) GetRelayServerIP() []byte {
if x != nil {
return x.RelayServerIP
}
return nil
}
// Mode indicates a connection mode
type Mode struct {
state protoimpl.MessageState
@@ -451,7 +463,7 @@ var file_signalexchange_proto_rawDesc = []byte{
0x52, 0x09, 0x72, 0x65, 0x6d, 0x6f, 0x74, 0x65, 0x4b, 0x65, 0x79, 0x12, 0x28, 0x0a, 0x04, 0x62,
0x6f, 0x64, 0x79, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x73, 0x69, 0x67, 0x6e,
0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f, 0x64, 0x79, 0x52,
0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xe4, 0x03, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d,
0x04, 0x62, 0x6f, 0x64, 0x79, 0x22, 0xc3, 0x04, 0x0a, 0x04, 0x42, 0x6f, 0x64, 0x79, 0x12, 0x2d,
0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x19, 0x2e, 0x73,
0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x42, 0x6f,
0x64, 0x79, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x18, 0x0a,
@@ -471,40 +483,46 @@ var file_signalexchange_proto_rawDesc = []byte{
0x20, 0x01, 0x28, 0x0b, 0x32, 0x1f, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63,
0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43,
0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73,
0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x2e, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53,
0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x33, 0x0a, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53,
0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x08, 0x20, 0x01,
0x28, 0x09, 0x52, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41,
0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x12, 0x21, 0x0a, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f,
0x6e, 0x49, 0x64, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x00, 0x52, 0x09, 0x73, 0x65, 0x73,
0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x88, 0x01, 0x01, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, 0x70,
0x28, 0x09, 0x48, 0x00, 0x52, 0x12, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65,
0x72, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x88, 0x01, 0x01, 0x12, 0x21, 0x0a, 0x09, 0x73,
0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x01,
0x52, 0x09, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x88, 0x01, 0x01, 0x12, 0x29,
0x0a, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x49, 0x50, 0x18,
0x0b, 0x20, 0x01, 0x28, 0x0c, 0x48, 0x02, 0x52, 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65,
0x72, 0x76, 0x65, 0x72, 0x49, 0x50, 0x88, 0x01, 0x01, 0x22, 0x43, 0x0a, 0x04, 0x54, 0x79, 0x70,
0x65, 0x12, 0x09, 0x0a, 0x05, 0x4f, 0x46, 0x46, 0x45, 0x52, 0x10, 0x00, 0x12, 0x0a, 0x0a, 0x06,
0x41, 0x4e, 0x53, 0x57, 0x45, 0x52, 0x10, 0x01, 0x12, 0x0d, 0x0a, 0x09, 0x43, 0x41, 0x4e, 0x44,
0x49, 0x44, 0x41, 0x54, 0x45, 0x10, 0x02, 0x12, 0x08, 0x0a, 0x04, 0x4d, 0x4f, 0x44, 0x45, 0x10,
0x04, 0x12, 0x0b, 0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x42, 0x0c,
0x0a, 0x0a, 0x5f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f, 0x6e, 0x49, 0x64, 0x22, 0x2e, 0x0a, 0x04,
0x4d, 0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01,
0x20, 0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01,
0x01, 0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f,
0x52, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12,
0x28, 0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b,
0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70,
0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73,
0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72,
0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73,
0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01, 0x0a, 0x0e,
0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c,
0x0a, 0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65,
0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65,
0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61,
0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70,
0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x0d,
0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x20, 0x2e,
0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45,
0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a,
0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65,
0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67,
0x65, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x04, 0x12, 0x0b, 0x0a, 0x07, 0x47, 0x4f, 0x5f, 0x49, 0x44, 0x4c, 0x45, 0x10, 0x05, 0x42, 0x15,
0x0a, 0x13, 0x5f, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64,
0x64, 0x72, 0x65, 0x73, 0x73, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x73, 0x65, 0x73, 0x73, 0x69, 0x6f,
0x6e, 0x49, 0x64, 0x42, 0x10, 0x0a, 0x0e, 0x5f, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x53, 0x65, 0x72,
0x76, 0x65, 0x72, 0x49, 0x50, 0x4a, 0x04, 0x08, 0x09, 0x10, 0x0a, 0x22, 0x2e, 0x0a, 0x04, 0x4d,
0x6f, 0x64, 0x65, 0x12, 0x1b, 0x0a, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x18, 0x01, 0x20,
0x01, 0x28, 0x08, 0x48, 0x00, 0x52, 0x06, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x88, 0x01, 0x01,
0x42, 0x09, 0x0a, 0x07, 0x5f, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x22, 0x6d, 0x0a, 0x0f, 0x52,
0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x28,
0x0a, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65,
0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0f, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61,
0x73, 0x73, 0x50, 0x75, 0x62, 0x4b, 0x65, 0x79, 0x12, 0x30, 0x0a, 0x13, 0x72, 0x6f, 0x73, 0x65,
0x6e, 0x70, 0x61, 0x73, 0x73, 0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x18,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x13, 0x72, 0x6f, 0x73, 0x65, 0x6e, 0x70, 0x61, 0x73, 0x73,
0x53, 0x65, 0x72, 0x76, 0x65, 0x72, 0x41, 0x64, 0x64, 0x72, 0x32, 0xb9, 0x01, 0x0a, 0x0e, 0x53,
0x69, 0x67, 0x6e, 0x61, 0x6c, 0x45, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x4c, 0x0a,
0x04, 0x53, 0x65, 0x6e, 0x64, 0x12, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78,
0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64,
0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20, 0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c,
0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74,
0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x0d, 0x43,
0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x20, 0x2e, 0x73,
0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e, 0x45, 0x6e,
0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x1a, 0x20,
0x2e, 0x73, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x65, 0x78, 0x63, 0x68, 0x61, 0x6e, 0x67, 0x65, 0x2e,
0x45, 0x6e, 0x63, 0x72, 0x79, 0x70, 0x74, 0x65, 0x64, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65,
0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (

View File

@@ -63,9 +63,17 @@ message Body {
RosenpassConfig rosenpassConfig = 7;
// relayServerAddress is url of the relay server
string relayServerAddress = 8;
optional string relayServerAddress = 8;
reserved 9;
optional bytes sessionId = 10;
// relayServerIP is the IP the sender is connected to on its relay server,
// encoded as 4 bytes (IPv4) or 16 bytes (IPv6). Receivers may use it as a
// fallback dial target when DNS resolution of relayServerAddress fails.
// SNI/TLS verification still uses relayServerAddress.
optional bytes relayServerIP = 11;
}
// Mode indicates a connection mode