From 2a8198c409a33d05f77a41d117d4d7df9d30c629 Mon Sep 17 00:00:00 2001 From: braginini Date: Wed, 21 Apr 2021 14:45:01 +0200 Subject: [PATCH] refactor: improve logging and reorganize code --- connection/connection.go | 142 +++++++++++++++++---------------------- connection/engine.go | 21 ++++-- 2 files changed, 74 insertions(+), 89 deletions(-) diff --git a/connection/connection.go b/connection/connection.go index 040d8735d..1a4006349 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -85,36 +85,10 @@ func NewConnection(config ConnConfig, } } -func (conn *Connection) Close() error { - - if !conn.isActive { - log.Infof("connection to peer %s has been already closed, skipping", conn.Config.RemoteWgKey.String()) - return nil - } - - err := conn.agent.Close() - if err != nil { - return err - } - - err = conn.wgConn.Close() - if err != nil { - return err - } - - log.Debugf("closing connection to peer %s", conn.Config.RemoteWgKey.String()) - - conn.isActive = false - - return nil -} - // Open opens connection to a remote peer. // Will block until the connection has successfully established func (conn *Connection) Open(timeout time.Duration) error { - log.Debugf("1: opening connection to peer %s", conn.Config.RemoteWgKey.String()) - // create an ice.Agent that will be responsible for negotiating and establishing actual peer-to-peer connection a, err := ice.NewAgent(&ice.AgentConfig{ NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4}, @@ -122,8 +96,6 @@ func (conn *Connection) Open(timeout time.Duration) error { }) conn.agent = a - log.Debugf("2: opening connection to peer %s", conn.Config.RemoteWgKey.String()) - if err != nil { return err } @@ -133,26 +105,24 @@ func (conn *Connection) Open(timeout time.Duration) error { return err } - log.Debugf("3: opening connection to peer %s", conn.Config.RemoteWgKey.String()) - err = conn.listenOnConnectionStateChanges() if err != nil { return err } - log.Debugf("4: opening connection to peer %s", conn.Config.RemoteWgKey.String()) - err = conn.signalCredentials() if err != nil { return err } - log.Debugf("5: opening connection to peer %s", conn.Config.RemoteWgKey.String()) + log.Infof("trying to connect to peer %s", conn.Config.RemoteWgKey.String()) // wait until credentials have been sent from the remote peer (will arrive via a signal server) select { case remoteAuth := <-conn.remoteAuthChannel: + log.Infof("got a connection confirmation from peer %s", conn.Config.RemoteWgKey.String()) + err = conn.agent.GatherCandidates() if err != nil { return err @@ -173,33 +143,53 @@ func (conn *Connection) Open(timeout time.Duration) error { go conn.proxyToRemotePeer(*wgConn, remoteConn) go conn.proxyToLocalWireguard(*wgConn, remoteConn) - log.Debugf("opened connection to peer %s", conn.Config.RemoteWgKey.String()) + log.Infof("opened connection to peer %s", conn.Config.RemoteWgKey.String()) conn.isActive = true case <-time.After(timeout): - conn.closeCond.Do(func() { - log.Errorf("CLOSED CONDITION ON TIMEOUT") - err = conn.Close() - }) + err := conn.Close() + if err != nil { + log.Warnf("error while closing connection to peer %s -> %s", conn.Config.RemoteWgKey.String(), err.Error()) + } return fmt.Errorf("timeout of %vs exceeded while waiting for the remote peer %s", timeout.Seconds(), conn.Config.RemoteWgKey.String()) } + // wait until connection has been closed select { case <-conn.closeCond.C: - log.Errorf("CLOSED CONDITION RETURNING ERR TO ENGINE") - return fmt.Errorf("connection to peer %s has been closed while starting", conn.Config.RemoteWgKey.String()) + return fmt.Errorf("connection to peer %s has been closed", conn.Config.RemoteWgKey.String()) } } -func (conn *Connection) OnAnswer(remoteAuth IceCredentials) error { - log.Debugf("OnAnswer from peer %s", conn.Config.RemoteWgKey.String()) +func (conn *Connection) Close() error { + var err error + conn.closeCond.Do(func() { - if conn.isActive { - log.Debugf("connection is active, ignoring OnAnswer from peer %s", conn.Config.RemoteWgKey.String()) - return nil - } + log.Warnf("closing connection to peer %s", conn.Config.RemoteWgKey.String()) + + if a := conn.agent; a != nil { + e := a.Close() + if e != nil { + log.Warnf("error while closing ICE agent of peer connection %s", conn.Config.RemoteWgKey.String()) + err = e + } + } + + if c := conn.wgConn; c != nil { + e := c.Close() + if e != nil { + log.Warnf("error while closingWireguard proxy connection of peer connection %s", conn.Config.RemoteWgKey.String()) + err = e + } + } + }) + return err +} + +func (conn *Connection) OnAnswer(remoteAuth IceCredentials) error { conn.remoteAuthCond.Do(func() { + log.Debugf("OnAnswer from peer %s", conn.Config.RemoteWgKey.String()) conn.remoteAuthChannel <- remoteAuth }) return nil @@ -207,27 +197,18 @@ func (conn *Connection) OnAnswer(remoteAuth IceCredentials) error { func (conn *Connection) OnOffer(remoteAuth IceCredentials) error { - log.Debugf("OnOffer from peer %s", conn.Config.RemoteWgKey.String()) - - if conn.isActive { - log.Debugf("connection is active, ignoring OnOffer from peer %s", conn.Config.RemoteWgKey.String()) - return nil - } - conn.remoteAuthCond.Do(func() { + log.Debugf("OnOffer from peer %s", conn.Config.RemoteWgKey.String()) conn.remoteAuthChannel <- remoteAuth + uFrag, pwd, err := conn.agent.GetLocalUserCredentials() + if err != nil { + } + + err = conn.signalAnswer(uFrag, pwd) + if err != nil { + } }) - uFrag, pwd, err := conn.agent.GetLocalUserCredentials() - if err != nil { - return err - } - - err = conn.signalAnswer(uFrag, pwd) - if err != nil { - return err - } - return nil } @@ -301,7 +282,7 @@ func (conn *Connection) listenOnLocalCandidates() error { // listenOnConnectionStateChanges registers callback of an ICE Agent to track connection state func (conn *Connection) listenOnConnectionStateChanges() error { err := conn.agent.OnConnectionStateChange(func(state ice.ConnectionState) { - log.Debugf("ICE Connection State has changed: %s", state.String()) + log.Debugf("ICE Connection State has changed for peer %s -> %s", conn.Config.RemoteWgKey.String(), state.String()) if state == ice.ConnectionStateConnected { // closed the connection has been established we can check the selected candidate pair pair, err := conn.agent.GetSelectedCandidatePair() @@ -312,14 +293,10 @@ func (conn *Connection) listenOnConnectionStateChanges() error { log.Debugf("closed to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair) } else if state == ice.ConnectionStateDisconnected || state == ice.ConnectionStateFailed { // todo do we really wanna have a connection restart within connection itself? Think of moving it outside - - conn.closeCond.Do(func() { - log.Errorf("CLOSED CONNDITION TRIGGERED ON STATE %s", state.String()) - err := conn.Close() - if err != nil { - log.Errorf("failed closing connection fo peer %s %s", conn.Config.RemoteWgKey.String(), err.Error()) - } - }) + err := conn.Close() + if err != nil { + log.Warnf("error while closing connection to peer %s -> %s", conn.Config.RemoteWgKey.String(), err.Error()) + } } }) @@ -356,20 +333,20 @@ func (conn *Connection) proxyToRemotePeer(wgConn net.Conn, remoteConn *ice.Conn) buf := make([]byte, 1500) for { select { + case <-conn.closeCond.C: + log.Infof("stopped proxying from remote peer %s due to closed connection", conn.Config.RemoteWgKey.String()) + return default: n, err := wgConn.Read(buf) if err != nil { - log.Warnln("Error reading from peer: ", err.Error()) + //log.Warnln("failed reading from peer: ", err.Error()) continue } n, err = remoteConn.Write(buf[:n]) if err != nil { - log.Warnln("Error writing to remote peer: ", err.Error()) + //log.Warnln("failed writing to remote peer: ", err.Error()) } - case <-conn.closeCond.C: - log.Infof("stopped proxying to remote peer %s due to closed connection", conn.Config.RemoteWgKey.String()) - return } } } @@ -381,19 +358,20 @@ func (conn *Connection) proxyToLocalWireguard(wgConn net.Conn, remoteConn *ice.C buf := make([]byte, 1500) for { select { + case <-conn.closeCond.C: + log.Infof("stopped proxying from remote peer %s due to closed connection", conn.Config.RemoteWgKey.String()) + return default: n, err := remoteConn.Read(buf) if err != nil { - log.Errorf("failed reading from remote connection %s", err) + //log.Errorf("failed reading from remote connection %s", err) } n, err = wgConn.Write(buf[:n]) if err != nil { - log.Errorf("failed writing to local Wireguard instance %s", err) + //log.Errorf("failed writing to local Wireguard instance %s", err) } - case <-conn.closeCond.C: - log.Infof("stopped proxying from remote peer %s due to closed connection", conn.Config.RemoteWgKey.String()) - return + } } } diff --git a/connection/engine.go b/connection/engine.go index e2135bafc..f558eda6a 100644 --- a/connection/engine.go +++ b/connection/engine.go @@ -75,25 +75,33 @@ func (e *Engine) Start(privateKey string, peers []Peer) error { peer := peer go func() { - + var backOff = &backoff.ExponentialBackOff{ + InitialInterval: backoff.DefaultInitialInterval, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + MaxInterval: 5 * time.Second, + MaxElapsedTime: time.Duration(0), //never stop + Stop: backoff.Stop, + Clock: backoff.SystemClock, + } operation := func() error { _, err := e.openPeerConnection(*wgPort, myKey, peer) if err != nil { + log.Warnln("retrying connection because of error: ", err.Error()) e.conns[peer.WgPubKey] = nil return err } + backOff.Reset() return nil } - err = backoff.Retry(operation, backoff.NewExponentialBackOff()) + err = backoff.Retry(operation, backOff) if err != nil { - log.Errorf("----------------------> %s ", err) - return + // should actually never happen + panic(err) } - }() } - return nil } @@ -126,7 +134,6 @@ func (e *Engine) openPeerConnection(wgPort int, myKey wgtypes.Key, peer Peer) (* // blocks until the connection is open (or timeout) err := conn.Open(20 * time.Second) if err != nil { - log.Errorf("error openning connection to a remote peer %s %s", remoteKey.String(), err.Error()) return nil, err } return conn, nil