- Implement remote addr for conn

- Eliminate cached offeranswer arguments
- Fix exponent reset in conn reconnect loop
- Fix on disconnected callback for permanent server
- Add peer relay status for status details command
This commit is contained in:
Zoltán Papp
2024-07-16 11:02:32 +02:00
parent add4e9f4e4
commit 4ea55bfe3c
16 changed files with 544 additions and 357 deletions

View File

@@ -146,7 +146,6 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
OnStatusChanged: conn.onWorkerICEStateDisconnected,
}
conn.handshaker = NewHandshaker(ctx, connLog, config, signaler)
conn.workerRelay = NewWorkerRelay(ctx, connLog, config, relayManager, rFns)
relayIsSupportedLocally := conn.workerRelay.RelayIsSupportedLocally()
@@ -155,6 +154,8 @@ func NewConn(engineCtx context.Context, config ConnConfig, statusRecorder *Statu
return nil, err
}
conn.handshaker = NewHandshaker(ctx, connLog, config, signaler, conn.workerICE, conn.workerRelay)
conn.handshaker.AddOnNewOfferListener(conn.workerRelay.OnNewOffer)
if os.Getenv("NB_FORCE_RELAY") != "true" {
conn.handshaker.AddOnNewOfferListener(conn.workerICE.OnNewOffer)
@@ -188,7 +189,7 @@ func (conn *Conn) Open() {
conn.waitInitialRandomSleepTime()
err = conn.doHandshake()
err = conn.handshaker.sendOffer()
if err != nil {
conn.log.Errorf("failed to send offer: %v", err)
}
@@ -323,57 +324,62 @@ func (conn *Conn) reconnectLoopWithRetry() {
case <-time.After(3 * time.Second):
}
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: 0,
Multiplier: 1.99,
MaxInterval: conn.config.Timeout * time.Second,
MaxElapsedTime: 0,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, conn.ctx)
ticker := backoff.NewTicker(bo)
defer ticker.Stop()
<-ticker.C // consume the initial tick what is happening right after the ticker has been created
no := time.Now()
for {
select {
case t := <-ticker.C:
if t.IsZero() {
// in case if the ticker has been canceled by context then avoid the temporary loop
bo := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 800 * time.Millisecond,
RandomizationFactor: 0, // todo: add randomisation factor
Multiplier: 1.99,
MaxInterval: conn.config.Timeout * time.Second,
MaxElapsedTime: 0,
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, conn.ctx)
ticker := backoff.NewTicker(bo)
defer ticker.Stop()
<-ticker.C // consume the initial tick what is happening right after the ticker has been created
no := time.Now()
L:
for {
select {
case t := <-ticker.C:
if t.IsZero() {
// in case if the ticker has been canceled by context then avoid the temporary loop
return
}
// checks if there is peer connection is established via relay or ice and that it has a wireguard handshake and skip offer
// todo check wg handshake
conn.log.Tracef("ticker timedout, relay state: %s, ice state: %s, elapsed time: %s", conn.statusRelay, conn.statusICE, time.Since(no))
no = time.Now()
if conn.statusRelay == StatusConnected && conn.statusICE == StatusConnected {
continue
}
conn.log.Debugf("ticker timed out, retry to do handshake")
err := conn.handshaker.sendOffer()
if err != nil {
conn.log.Errorf("failed to do handshake: %v", err)
}
case changed := <-conn.relayDisconnected:
if !changed {
continue
}
conn.log.Debugf("Relay state changed, reset reconnect timer")
ticker.Stop()
break L
case changed := <-conn.iCEDisconnected:
if !changed {
continue
}
conn.log.Debugf("ICE state changed, reset reconnect timer")
ticker.Stop()
break L
case <-conn.ctx.Done():
return
}
// checks if there is peer connection is established via relay or ice and that it has a wireguard handshake and skip offer
// todo check wg handshake
conn.log.Tracef("ticker timedout, relay state: %s, ice state: %s, elapsed time: %s", conn.statusRelay, conn.statusICE, time.Since(no))
no = time.Now()
if conn.statusRelay == StatusConnected && conn.statusICE == StatusConnected {
continue
}
log.Debugf("ticker timed out, retry to do handshake")
err := conn.doHandshake()
if err != nil {
conn.log.Errorf("failed to do handshake: %v", err)
}
case changed := <-conn.relayDisconnected:
if !changed {
continue
}
conn.log.Debugf("Relay state changed, reset reconnect timer")
bo.Reset()
case changed := <-conn.iCEDisconnected:
if !changed {
continue
}
conn.log.Debugf("ICE state changed, reset reconnect timer")
bo.Reset()
case <-conn.ctx.Done():
return
}
}
}
@@ -399,7 +405,7 @@ func (conn *Conn) reconnectLoopForOnDisconnectedEvent() {
return
}
err := conn.doHandshake()
err := conn.handshaker.SendOffer()
if err != nil {
conn.log.Errorf("failed to do handshake: %v", err)
}
@@ -419,6 +425,8 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon
conn.statusICE = StatusConnected
defer conn.updateIceState(iceConnInfo)
if conn.currentConnType > priority {
return
}
@@ -480,16 +488,7 @@ func (conn *Conn) iCEConnectionIsReady(priority ConnPriority, iceConnInfo ICECon
conn.currentConnType = priority
peerState := State{
LocalIceCandidateType: iceConnInfo.LocalIceCandidateType,
RemoteIceCandidateType: iceConnInfo.RemoteIceCandidateType,
LocalIceCandidateEndpoint: iceConnInfo.LocalIceCandidateEndpoint,
RemoteIceCandidateEndpoint: iceConnInfo.RemoteIceCandidateEndpoint,
Direct: iceConnInfo.Direct,
Relayed: iceConnInfo.Relayed,
}
conn.updateStatus(peerState, iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
conn.doOnConnected(iceConnInfo.RosenpassPubKey, iceConnInfo.RosenpassAddr)
}
// todo review to make sense to handle connection and disconnected status also?
@@ -498,16 +497,6 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
defer conn.mu.Unlock()
conn.log.Tracef("ICE connection state changed to %s", newState)
defer func() {
changed := conn.statusICE != newState && newState != StatusConnecting
conn.statusICE = newState
select {
case conn.iCEDisconnected <- changed:
default:
}
}()
// switch back to relay connection
if conn.endpointRelay != nil {
@@ -516,26 +505,27 @@ func (conn *Conn) onWorkerICEStateDisconnected(newState ConnStatus) {
if err != nil {
conn.log.Errorf("failed to switch to relay conn: %v", err)
}
// todo update status to relay related things
return
}
if conn.statusRelay == StatusConnected {
return
changed := conn.statusICE != newState && newState != StatusConnecting
conn.statusICE = newState
select {
case conn.iCEDisconnected <- changed:
default:
}
if conn.evalStatus() == newState {
return
peerState := State{
PubKey: conn.config.Key,
ConnStatus: conn.evalStatus(),
Direct: false, // todo fix it
Relayed: true, // todo fix it
ConnStatusUpdate: time.Now(),
}
if newState > conn.statusICE {
peerState := State{
PubKey: conn.config.Key,
ConnStatus: newState,
ConnStatusUpdate: time.Now(),
Mux: new(sync.RWMutex),
}
_ = conn.statusRecorder.UpdatePeerState(peerState)
err := conn.statusRecorder.UpdatePeerICEStateToDisconnected(peerState)
if err != nil {
conn.log.Warnf("unable to save peer's state, got error: %v", err)
}
}
@@ -561,6 +551,8 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) {
conn.endpointRelay = endpointUdpAddr
conn.log.Debugf("conn resolved IP for %s: %s", endpoint, endpointUdpAddr.IP)
defer conn.updateRelayStatus(rci.relayedConn.RemoteAddr().String(), rci.rosenpassPubKey)
if conn.currentConnType > connPriorityRelay {
if conn.statusICE == StatusConnected {
log.Debugf("do not switch to relay because current priority is: %v", conn.currentConnType)
@@ -592,27 +584,13 @@ func (conn *Conn) relayConnectionIsReady(rci RelayConnInfo) {
conn.wgProxyRelay = wgProxy
conn.currentConnType = connPriorityRelay
peerState := State{
Direct: false,
Relayed: true,
}
conn.log.Infof("start to communicate with peer via relay")
conn.updateStatus(peerState, rci.rosenpassPubKey, rci.rosenpassAddr)
conn.doOnConnected(rci.rosenpassPubKey, rci.rosenpassAddr)
}
func (conn *Conn) onWorkerRelayStateDisconnected() {
conn.mu.Lock()
defer conn.mu.Unlock()
defer func() {
changed := conn.statusRelay != StatusDisconnected
conn.statusRelay = StatusDisconnected
select {
case conn.relayDisconnected <- changed:
default:
}
}()
if conn.wgProxyRelay != nil {
conn.endpointRelay = nil
@@ -620,22 +598,25 @@ func (conn *Conn) onWorkerRelayStateDisconnected() {
conn.wgProxyRelay = nil
}
if conn.statusICE == StatusConnected {
return
changed := conn.statusRelay != StatusDisconnected
conn.statusRelay = StatusDisconnected
select {
case conn.relayDisconnected <- changed:
default:
}
if conn.evalStatus() == StatusDisconnected {
return
peerState := State{
PubKey: conn.config.Key,
ConnStatus: conn.evalStatus(),
Direct: false, // todo fix it
Relayed: true, // todo fix it
ConnStatusUpdate: time.Now(),
}
if StatusDisconnected > conn.statusRelay {
peerState := State{
PubKey: conn.config.Key,
ConnStatus: StatusDisconnected,
ConnStatusUpdate: time.Now(),
Mux: new(sync.RWMutex),
}
_ = conn.statusRecorder.UpdatePeerState(peerState)
err := conn.statusRecorder.UpdatePeerRelayedStateToDisconnected(peerState)
if err != nil {
conn.log.Warnf("unable to save peer's state, got error: %v", err)
}
}
@@ -648,18 +629,45 @@ func (conn *Conn) configureWGEndpoint(addr *net.UDPAddr) error {
conn.config.WgConfig.PreSharedKey,
)
}
func (conn *Conn) updateStatus(peerState State, remoteRosenpassPubKey []byte, remoteRosenpassAddr string) {
peerState.PubKey = conn.config.Key
peerState.ConnStatus = StatusConnected
peerState.ConnStatusUpdate = time.Now()
peerState.RosenpassEnabled = isRosenpassEnabled(remoteRosenpassPubKey)
peerState.Mux = new(sync.RWMutex)
err := conn.statusRecorder.UpdatePeerState(peerState)
func (conn *Conn) updateRelayStatus(relayServerAddr string, rosenpassPubKey []byte) {
peerState := State{
PubKey: conn.config.Key,
ConnStatusUpdate: time.Now(),
ConnStatus: conn.evalStatus(), // todo fix it
Direct: false, // todo fix it
Relayed: true, // todo fix it
RelayServerAddress: relayServerAddr,
RosenpassEnabled: isRosenpassEnabled(rosenpassPubKey),
}
err := conn.statusRecorder.UpdatePeerRelayedState(peerState)
if err != nil {
conn.log.Warnf("unable to save peer's state, got error: %v", err)
}
}
func (conn *Conn) updateIceState(iceConnInfo ICEConnInfo) {
peerState := State{
PubKey: conn.config.Key,
ConnStatusUpdate: time.Now(),
ConnStatus: conn.evalStatus(),
Direct: iceConnInfo.Direct, // todo fix it
Relayed: iceConnInfo.Relayed, // todo fix it
LocalIceCandidateType: iceConnInfo.LocalIceCandidateType,
RemoteIceCandidateType: iceConnInfo.RemoteIceCandidateType,
LocalIceCandidateEndpoint: iceConnInfo.LocalIceCandidateEndpoint,
RemoteIceCandidateEndpoint: iceConnInfo.RemoteIceCandidateEndpoint,
RosenpassEnabled: isRosenpassEnabled(iceConnInfo.RosenpassPubKey),
}
err := conn.statusRecorder.UpdatePeerICEState(peerState)
if err != nil {
conn.log.Warnf("unable to save peer's state, got error: %v", err)
}
}
func (conn *Conn) doOnConnected(remoteRosenpassPubKey []byte, remoteRosenpassAddr string) {
if runtime.GOOS == "ios" {
runtime.GC()
}
@@ -669,25 +677,6 @@ func (conn *Conn) updateStatus(peerState State, remoteRosenpassPubKey []byte, re
}
}
func (conn *Conn) doHandshake() error {
if !conn.signaler.Ready() {
return ErrSignalIsNotReady
}
var (
ha HandshakeArgs
err error
)
ha.IceUFrag, ha.IcePwd = conn.workerICE.GetLocalUserCredentials()
addr, err := conn.workerRelay.RelayInstanceAddress()
if err == nil {
ha.RelayAddr = addr
}
conn.log.Tracef("do handshake with args: %v", ha)
return conn.handshaker.SendOffer(ha)
}
func (conn *Conn) waitInitialRandomSleepTime() {
minWait := 100
maxWait := 800

View File

@@ -41,34 +41,30 @@ type OfferAnswer struct {
RelaySrvAddress string
}
type HandshakeArgs struct {
IceUFrag string
IcePwd string
RelayAddr string
}
type Handshaker struct {
mu sync.Mutex
ctx context.Context
log *log.Entry
config ConnConfig
signaler *Signaler
ice *WorkerICE
relay *WorkerRelay
onNewOfferListeners []func(*OfferAnswer)
// remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection
remoteOffersCh chan OfferAnswer
// remoteAnswerCh is a channel used to wait for remote credentials answer (confirmation of our offer) to proceed with the connection
remoteAnswerCh chan OfferAnswer
lastOfferArgs HandshakeArgs
}
func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler) *Handshaker {
func NewHandshaker(ctx context.Context, log *log.Entry, config ConnConfig, signaler *Signaler, ice *WorkerICE, relay *WorkerRelay) *Handshaker {
return &Handshaker{
ctx: ctx,
log: log,
config: config,
signaler: signaler,
ice: ice,
relay: relay,
remoteOffersCh: make(chan OfferAnswer),
remoteAnswerCh: make(chan OfferAnswer),
}
@@ -98,17 +94,10 @@ func (h *Handshaker) Listen() {
}
}
func (h *Handshaker) SendOffer(args HandshakeArgs) error {
func (h *Handshaker) SendOffer() error {
h.mu.Lock()
defer h.mu.Unlock()
err := h.sendOffer(args)
if err != nil {
return err
}
h.lastOfferArgs = args
return nil
return h.sendOffer()
}
// OnRemoteOffer handles an offer from the remote peer and returns true if the message was accepted, false otherwise
@@ -163,14 +152,23 @@ func (h *Handshaker) waitForRemoteOfferConfirmation() (*OfferAnswer, error) {
}
// sendOffer prepares local user credentials and signals them to the remote peer
func (h *Handshaker) sendOffer(args HandshakeArgs) error {
func (h *Handshaker) sendOffer() error {
if !h.signaler.Ready() {
return ErrSignalIsNotReady
}
iceUFrag, icePwd := h.ice.GetLocalUserCredentials()
offer := OfferAnswer{
IceCredentials: IceCredentials{args.IceUFrag, args.IcePwd},
IceCredentials: IceCredentials{iceUFrag, icePwd},
WgListenPort: h.config.LocalWgPort,
Version: version.NetbirdVersion(),
RosenpassPubKey: h.config.RosenpassPubKey,
RosenpassAddr: h.config.RosenpassAddr,
RelaySrvAddress: args.RelayAddr,
}
addr, err := h.relay.RelayInstanceAddress()
if err == nil {
offer.RelaySrvAddress = addr
}
return h.signaler.SignalOffer(offer, h.config.Key)
@@ -178,15 +176,21 @@ func (h *Handshaker) sendOffer(args HandshakeArgs) error {
func (h *Handshaker) sendAnswer() error {
h.log.Debugf("sending answer")
uFrag, pwd := h.ice.GetLocalUserCredentials()
answer := OfferAnswer{
IceCredentials: IceCredentials{h.lastOfferArgs.IceUFrag, h.lastOfferArgs.IcePwd},
IceCredentials: IceCredentials{uFrag, pwd},
WgListenPort: h.config.LocalWgPort,
Version: version.NetbirdVersion(),
RosenpassPubKey: h.config.RosenpassPubKey,
RosenpassAddr: h.config.RosenpassAddr,
RelaySrvAddress: h.lastOfferArgs.RelayAddr,
}
err := h.signaler.SignalAnswer(answer, h.config.Key)
addr, err := h.relay.RelayInstanceAddress()
if err == nil {
answer.RelaySrvAddress = addr
}
err = h.signaler.SignalAnswer(answer, h.config.Key)
if err != nil {
return err
}

View File

@@ -23,6 +23,8 @@ type State struct {
PubKey string
FQDN string
ConnStatus ConnStatus
connStatusRelay ConnStatus
connStatusICE ConnStatus
ConnStatusUpdate time.Time
Relayed bool
Direct bool
@@ -30,6 +32,7 @@ type State struct {
RemoteIceCandidateType string
LocalIceCandidateEndpoint string
RemoteIceCandidateEndpoint string
RelayServerAddress string
LastWireguardHandshake time.Time
BytesTx int64
BytesRx int64
@@ -240,7 +243,7 @@ func (d *Status) UpdatePeerState(receivedState State) error {
peerState.SetRoutes(receivedState.GetRoutes())
}
skipNotification := shouldSkipNotify(receivedState, peerState)
skipNotification := shouldSkipNotify(receivedState.ConnStatus, peerState)
if receivedState.ConnStatus != peerState.ConnStatus {
peerState.ConnStatus = receivedState.ConnStatus
@@ -251,6 +254,7 @@ func (d *Status) UpdatePeerState(receivedState State) error {
peerState.RemoteIceCandidateType = receivedState.RemoteIceCandidateType
peerState.LocalIceCandidateEndpoint = receivedState.LocalIceCandidateEndpoint
peerState.RemoteIceCandidateEndpoint = receivedState.RemoteIceCandidateEndpoint
peerState.RelayServerAddress = receivedState.RelayServerAddress
peerState.RosenpassEnabled = receivedState.RosenpassEnabled
}
@@ -270,6 +274,152 @@ func (d *Status) UpdatePeerState(receivedState State) error {
return nil
}
func (d *Status) UpdatePeerICEState(receivedState State) error {
d.mux.Lock()
defer d.mux.Unlock()
peerState, ok := d.peers[receivedState.PubKey]
if !ok {
return errors.New("peer doesn't exist")
}
if receivedState.IP != "" {
peerState.IP = receivedState.IP
}
skipNotification := shouldSkipNotify(receivedState.ConnStatus, peerState)
peerState.ConnStatus = receivedState.ConnStatus
peerState.ConnStatusUpdate = receivedState.ConnStatusUpdate
peerState.Direct = receivedState.Direct
peerState.Relayed = receivedState.Relayed
peerState.LocalIceCandidateType = receivedState.LocalIceCandidateType
peerState.RemoteIceCandidateType = receivedState.RemoteIceCandidateType
peerState.LocalIceCandidateEndpoint = receivedState.LocalIceCandidateEndpoint
peerState.RemoteIceCandidateEndpoint = receivedState.RemoteIceCandidateEndpoint
peerState.RosenpassEnabled = receivedState.RosenpassEnabled
d.peers[receivedState.PubKey] = peerState
if skipNotification {
return nil
}
ch, found := d.changeNotify[receivedState.PubKey]
if found && ch != nil {
close(ch)
d.changeNotify[receivedState.PubKey] = nil
}
d.notifyPeerListChanged()
return nil
}
func (d *Status) UpdatePeerRelayedState(receivedState State) error {
d.mux.Lock()
defer d.mux.Unlock()
peerState, ok := d.peers[receivedState.PubKey]
if !ok {
return errors.New("peer doesn't exist")
}
skipNotification := shouldSkipNotify(receivedState.ConnStatus, peerState)
peerState.ConnStatus = receivedState.ConnStatus
peerState.ConnStatusUpdate = receivedState.ConnStatusUpdate
peerState.Direct = receivedState.Direct
peerState.Relayed = receivedState.Relayed
peerState.RelayServerAddress = receivedState.RelayServerAddress
peerState.RosenpassEnabled = receivedState.RosenpassEnabled
d.peers[receivedState.PubKey] = peerState
if skipNotification {
return nil
}
ch, found := d.changeNotify[receivedState.PubKey]
if found && ch != nil {
close(ch)
d.changeNotify[receivedState.PubKey] = nil
}
d.notifyPeerListChanged()
return nil
}
func (d *Status) UpdatePeerRelayedStateToDisconnected(receivedState State) error {
d.mux.Lock()
defer d.mux.Unlock()
peerState, ok := d.peers[receivedState.PubKey]
if !ok {
return errors.New("peer doesn't exist")
}
skipNotification := shouldSkipNotify(receivedState.ConnStatus, peerState)
peerState.ConnStatus = receivedState.ConnStatus
peerState.Direct = receivedState.Direct
peerState.Relayed = receivedState.Relayed
peerState.ConnStatusUpdate = receivedState.ConnStatusUpdate
peerState.RelayServerAddress = ""
//peerState.RosenpassEnabled = receivedState.RosenpassEnabled // todo: check this variable
d.peers[receivedState.PubKey] = peerState
if skipNotification {
return nil
}
ch, found := d.changeNotify[receivedState.PubKey]
if found && ch != nil {
close(ch)
d.changeNotify[receivedState.PubKey] = nil
}
d.notifyPeerListChanged()
return nil
}
func (d *Status) UpdatePeerICEStateToDisconnected(receivedState State) error {
d.mux.Lock()
defer d.mux.Unlock()
peerState, ok := d.peers[receivedState.PubKey]
if !ok {
return errors.New("peer doesn't exist")
}
skipNotification := shouldSkipNotify(receivedState.ConnStatus, peerState)
peerState.ConnStatus = receivedState.ConnStatus
peerState.Direct = receivedState.Direct
peerState.Relayed = receivedState.Relayed
peerState.ConnStatusUpdate = receivedState.ConnStatusUpdate
peerState.LocalIceCandidateType = receivedState.LocalIceCandidateType
peerState.RemoteIceCandidateType = receivedState.RemoteIceCandidateType
peerState.LocalIceCandidateEndpoint = receivedState.LocalIceCandidateEndpoint
peerState.RemoteIceCandidateEndpoint = receivedState.RemoteIceCandidateEndpoint
//peerState.RosenpassEnabled = receivedState.RosenpassEnabled // todo: check this variable
d.peers[receivedState.PubKey] = peerState
if skipNotification {
return nil
}
ch, found := d.changeNotify[receivedState.PubKey]
if found && ch != nil {
close(ch)
d.changeNotify[receivedState.PubKey] = nil
}
d.notifyPeerListChanged()
return nil
}
// UpdateWireGuardPeerState updates the WireGuard bits of the peer state
func (d *Status) UpdateWireGuardPeerState(pubKey string, wgStats iface.WGStats) error {
d.mux.Lock()
@@ -289,13 +439,13 @@ func (d *Status) UpdateWireGuardPeerState(pubKey string, wgStats iface.WGStats)
return nil
}
func shouldSkipNotify(received, curr State) bool {
func shouldSkipNotify(receivedConnStatus ConnStatus, curr State) bool {
switch {
case received.ConnStatus == StatusConnecting:
case receivedConnStatus == StatusConnecting:
return true
case received.ConnStatus == StatusDisconnected && curr.ConnStatus == StatusConnecting:
case receivedConnStatus == StatusDisconnected && curr.ConnStatus == StatusConnecting:
return true
case received.ConnStatus == StatusDisconnected && curr.ConnStatus == StatusDisconnected:
case receivedConnStatus == StatusDisconnected && curr.ConnStatus == StatusDisconnected:
return curr.IP != ""
default:
return false