refactor: improve logging and reorganize code

This commit is contained in:
braginini
2021-04-21 14:45:01 +02:00
parent 3c52326301
commit 2a8198c409
2 changed files with 74 additions and 89 deletions

View File

@@ -85,36 +85,10 @@ func NewConnection(config ConnConfig,
}
}
func (conn *Connection) Close() error {
if !conn.isActive {
log.Infof("connection to peer %s has been already closed, skipping", conn.Config.RemoteWgKey.String())
return nil
}
err := conn.agent.Close()
if err != nil {
return err
}
err = conn.wgConn.Close()
if err != nil {
return err
}
log.Debugf("closing connection to peer %s", conn.Config.RemoteWgKey.String())
conn.isActive = false
return nil
}
// Open opens connection to a remote peer.
// Will block until the connection has successfully established
func (conn *Connection) Open(timeout time.Duration) error {
log.Debugf("1: opening connection to peer %s", conn.Config.RemoteWgKey.String())
// create an ice.Agent that will be responsible for negotiating and establishing actual peer-to-peer connection
a, err := ice.NewAgent(&ice.AgentConfig{
NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4},
@@ -122,8 +96,6 @@ func (conn *Connection) Open(timeout time.Duration) error {
})
conn.agent = a
log.Debugf("2: opening connection to peer %s", conn.Config.RemoteWgKey.String())
if err != nil {
return err
}
@@ -133,26 +105,24 @@ func (conn *Connection) Open(timeout time.Duration) error {
return err
}
log.Debugf("3: opening connection to peer %s", conn.Config.RemoteWgKey.String())
err = conn.listenOnConnectionStateChanges()
if err != nil {
return err
}
log.Debugf("4: opening connection to peer %s", conn.Config.RemoteWgKey.String())
err = conn.signalCredentials()
if err != nil {
return err
}
log.Debugf("5: opening connection to peer %s", conn.Config.RemoteWgKey.String())
log.Infof("trying to connect to peer %s", conn.Config.RemoteWgKey.String())
// wait until credentials have been sent from the remote peer (will arrive via a signal server)
select {
case remoteAuth := <-conn.remoteAuthChannel:
log.Infof("got a connection confirmation from peer %s", conn.Config.RemoteWgKey.String())
err = conn.agent.GatherCandidates()
if err != nil {
return err
@@ -173,33 +143,53 @@ func (conn *Connection) Open(timeout time.Duration) error {
go conn.proxyToRemotePeer(*wgConn, remoteConn)
go conn.proxyToLocalWireguard(*wgConn, remoteConn)
log.Debugf("opened connection to peer %s", conn.Config.RemoteWgKey.String())
log.Infof("opened connection to peer %s", conn.Config.RemoteWgKey.String())
conn.isActive = true
case <-time.After(timeout):
conn.closeCond.Do(func() {
log.Errorf("CLOSED CONDITION ON TIMEOUT")
err = conn.Close()
})
err := conn.Close()
if err != nil {
log.Warnf("error while closing connection to peer %s -> %s", conn.Config.RemoteWgKey.String(), err.Error())
}
return fmt.Errorf("timeout of %vs exceeded while waiting for the remote peer %s", timeout.Seconds(), conn.Config.RemoteWgKey.String())
}
// wait until connection has been closed
select {
case <-conn.closeCond.C:
log.Errorf("CLOSED CONDITION RETURNING ERR TO ENGINE")
return fmt.Errorf("connection to peer %s has been closed while starting", conn.Config.RemoteWgKey.String())
return fmt.Errorf("connection to peer %s has been closed", conn.Config.RemoteWgKey.String())
}
}
func (conn *Connection) OnAnswer(remoteAuth IceCredentials) error {
log.Debugf("OnAnswer from peer %s", conn.Config.RemoteWgKey.String())
func (conn *Connection) Close() error {
var err error
conn.closeCond.Do(func() {
if conn.isActive {
log.Debugf("connection is active, ignoring OnAnswer from peer %s", conn.Config.RemoteWgKey.String())
return nil
}
log.Warnf("closing connection to peer %s", conn.Config.RemoteWgKey.String())
if a := conn.agent; a != nil {
e := a.Close()
if e != nil {
log.Warnf("error while closing ICE agent of peer connection %s", conn.Config.RemoteWgKey.String())
err = e
}
}
if c := conn.wgConn; c != nil {
e := c.Close()
if e != nil {
log.Warnf("error while closingWireguard proxy connection of peer connection %s", conn.Config.RemoteWgKey.String())
err = e
}
}
})
return err
}
func (conn *Connection) OnAnswer(remoteAuth IceCredentials) error {
conn.remoteAuthCond.Do(func() {
log.Debugf("OnAnswer from peer %s", conn.Config.RemoteWgKey.String())
conn.remoteAuthChannel <- remoteAuth
})
return nil
@@ -207,27 +197,18 @@ func (conn *Connection) OnAnswer(remoteAuth IceCredentials) error {
func (conn *Connection) OnOffer(remoteAuth IceCredentials) error {
log.Debugf("OnOffer from peer %s", conn.Config.RemoteWgKey.String())
if conn.isActive {
log.Debugf("connection is active, ignoring OnOffer from peer %s", conn.Config.RemoteWgKey.String())
return nil
}
conn.remoteAuthCond.Do(func() {
log.Debugf("OnOffer from peer %s", conn.Config.RemoteWgKey.String())
conn.remoteAuthChannel <- remoteAuth
uFrag, pwd, err := conn.agent.GetLocalUserCredentials()
if err != nil {
}
err = conn.signalAnswer(uFrag, pwd)
if err != nil {
}
})
uFrag, pwd, err := conn.agent.GetLocalUserCredentials()
if err != nil {
return err
}
err = conn.signalAnswer(uFrag, pwd)
if err != nil {
return err
}
return nil
}
@@ -301,7 +282,7 @@ func (conn *Connection) listenOnLocalCandidates() error {
// listenOnConnectionStateChanges registers callback of an ICE Agent to track connection state
func (conn *Connection) listenOnConnectionStateChanges() error {
err := conn.agent.OnConnectionStateChange(func(state ice.ConnectionState) {
log.Debugf("ICE Connection State has changed: %s", state.String())
log.Debugf("ICE Connection State has changed for peer %s -> %s", conn.Config.RemoteWgKey.String(), state.String())
if state == ice.ConnectionStateConnected {
// closed the connection has been established we can check the selected candidate pair
pair, err := conn.agent.GetSelectedCandidatePair()
@@ -312,14 +293,10 @@ func (conn *Connection) listenOnConnectionStateChanges() error {
log.Debugf("closed to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair)
} else if state == ice.ConnectionStateDisconnected || state == ice.ConnectionStateFailed {
// todo do we really wanna have a connection restart within connection itself? Think of moving it outside
conn.closeCond.Do(func() {
log.Errorf("CLOSED CONNDITION TRIGGERED ON STATE %s", state.String())
err := conn.Close()
if err != nil {
log.Errorf("failed closing connection fo peer %s %s", conn.Config.RemoteWgKey.String(), err.Error())
}
})
err := conn.Close()
if err != nil {
log.Warnf("error while closing connection to peer %s -> %s", conn.Config.RemoteWgKey.String(), err.Error())
}
}
})
@@ -356,20 +333,20 @@ func (conn *Connection) proxyToRemotePeer(wgConn net.Conn, remoteConn *ice.Conn)
buf := make([]byte, 1500)
for {
select {
case <-conn.closeCond.C:
log.Infof("stopped proxying from remote peer %s due to closed connection", conn.Config.RemoteWgKey.String())
return
default:
n, err := wgConn.Read(buf)
if err != nil {
log.Warnln("Error reading from peer: ", err.Error())
//log.Warnln("failed reading from peer: ", err.Error())
continue
}
n, err = remoteConn.Write(buf[:n])
if err != nil {
log.Warnln("Error writing to remote peer: ", err.Error())
//log.Warnln("failed writing to remote peer: ", err.Error())
}
case <-conn.closeCond.C:
log.Infof("stopped proxying to remote peer %s due to closed connection", conn.Config.RemoteWgKey.String())
return
}
}
}
@@ -381,19 +358,20 @@ func (conn *Connection) proxyToLocalWireguard(wgConn net.Conn, remoteConn *ice.C
buf := make([]byte, 1500)
for {
select {
case <-conn.closeCond.C:
log.Infof("stopped proxying from remote peer %s due to closed connection", conn.Config.RemoteWgKey.String())
return
default:
n, err := remoteConn.Read(buf)
if err != nil {
log.Errorf("failed reading from remote connection %s", err)
//log.Errorf("failed reading from remote connection %s", err)
}
n, err = wgConn.Write(buf[:n])
if err != nil {
log.Errorf("failed writing to local Wireguard instance %s", err)
//log.Errorf("failed writing to local Wireguard instance %s", err)
}
case <-conn.closeCond.C:
log.Infof("stopped proxying from remote peer %s due to closed connection", conn.Config.RemoteWgKey.String())
return
}
}
}

View File

@@ -75,25 +75,33 @@ func (e *Engine) Start(privateKey string, peers []Peer) error {
peer := peer
go func() {
var backOff = &backoff.ExponentialBackOff{
InitialInterval: backoff.DefaultInitialInterval,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 5 * time.Second,
MaxElapsedTime: time.Duration(0), //never stop
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
operation := func() error {
_, err := e.openPeerConnection(*wgPort, myKey, peer)
if err != nil {
log.Warnln("retrying connection because of error: ", err.Error())
e.conns[peer.WgPubKey] = nil
return err
}
backOff.Reset()
return nil
}
err = backoff.Retry(operation, backoff.NewExponentialBackOff())
err = backoff.Retry(operation, backOff)
if err != nil {
log.Errorf("----------------------> %s ", err)
return
// should actually never happen
panic(err)
}
}()
}
return nil
}
@@ -126,7 +134,6 @@ func (e *Engine) openPeerConnection(wgPort int, myKey wgtypes.Key, peer Peer) (*
// blocks until the connection is open (or timeout)
err := conn.Open(20 * time.Second)
if err != nil {
log.Errorf("error openning connection to a remote peer %s %s", remoteKey.String(), err.Error())
return nil, err
}
return conn, nil