fix: add reconnect when connection closes

This commit is contained in:
braginini
2021-04-19 19:58:02 +02:00
parent 72435c7ce6
commit c96b63b956
2 changed files with 95 additions and 52 deletions

View File

@@ -2,7 +2,6 @@ package connection
import ( import (
"context" "context"
"github.com/cenkalti/backoff/v4"
"github.com/pion/ice/v2" "github.com/pion/ice/v2"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/iface" "github.com/wiretrustee/wiretrustee/iface"
@@ -53,7 +52,9 @@ type Connection struct {
// remoteAuthChannel is a channel used to wait for remote credentials to proceed with the connection // remoteAuthChannel is a channel used to wait for remote credentials to proceed with the connection
remoteAuthChannel chan IceCredentials remoteAuthChannel chan IceCredentials
closeChannel chan bool closeChannel chan bool
connectedChannel chan struct{}
closedChannel chan struct{}
// agent is an actual ice.Agent that is used to negotiate and maintain a connection to a remote peer // agent is an actual ice.Agent that is used to negotiate and maintain a connection to a remote peer
agent *ice.Agent agent *ice.Agent
@@ -69,6 +70,8 @@ func NewConnection(config ConnConfig,
signalCandidate func(candidate ice.Candidate) error, signalCandidate func(candidate ice.Candidate) error,
signalOffer func(uFrag string, pwd string) error, signalOffer func(uFrag string, pwd string) error,
signalAnswer func(uFrag string, pwd string) error, signalAnswer func(uFrag string, pwd string) error,
closedChannel chan struct{},
connectedChannel chan struct{},
) *Connection { ) *Connection {
return &Connection{ return &Connection{
@@ -78,6 +81,8 @@ func NewConnection(config ConnConfig,
signalAnswer: signalAnswer, signalAnswer: signalAnswer,
remoteAuthChannel: make(chan IceCredentials, 1), remoteAuthChannel: make(chan IceCredentials, 1),
closeChannel: make(chan bool, 2), closeChannel: make(chan bool, 2),
connectedChannel: connectedChannel,
closedChannel: closedChannel,
agent: nil, agent: nil,
isActive: false, isActive: false,
mux: sync.Mutex{}, mux: sync.Mutex{},
@@ -94,11 +99,6 @@ func (conn *Connection) Close() error {
return nil return nil
} }
log.Debugf("closing connection to peer %s", conn.Config.RemoteWgKey.String())
conn.closeChannel <- true
conn.closeChannel <- true
err := conn.agent.Close() err := conn.agent.Close()
if err != nil { if err != nil {
return err return err
@@ -109,6 +109,11 @@ func (conn *Connection) Close() error {
return err return err
} }
log.Debugf("closing connection to peer %s", conn.Config.RemoteWgKey.String())
conn.closeChannel <- true
conn.closeChannel <- true
log.Debugf("closed connection to peer %s", conn.Config.RemoteWgKey.String()) log.Debugf("closed connection to peer %s", conn.Config.RemoteWgKey.String())
conn.isActive = false conn.isActive = false
@@ -120,22 +125,20 @@ func (conn *Connection) Close() error {
// Will block until the connection has successfully established // Will block until the connection has successfully established
func (conn *Connection) Open() error { func (conn *Connection) Open() error {
log.Debugf("opening connection to peer %s", conn.Config.RemoteWgKey.String()) log.Debugf("1: opening connection to peer %s", conn.Config.RemoteWgKey.String())
conn.mux.Lock() conn.mux.Lock()
defer conn.mux.Unlock() defer conn.mux.Unlock()
wgConn, err := conn.createWireguardProxy()
if err != nil {
return err
}
conn.wgConn = *wgConn
// create an ice.Agent that will be responsible for negotiating and establishing actual peer-to-peer connection // create an ice.Agent that will be responsible for negotiating and establishing actual peer-to-peer connection
conn.agent, err = ice.NewAgent(&ice.AgentConfig{ a, err := ice.NewAgent(&ice.AgentConfig{
NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4}, NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4},
Urls: conn.Config.StunTurnURLS, Urls: conn.Config.StunTurnURLS,
}) })
conn.agent = a
log.Debugf("2: opening connection to peer %s", conn.Config.RemoteWgKey.String())
if err != nil { if err != nil {
return err return err
} }
@@ -145,17 +148,23 @@ func (conn *Connection) Open() error {
return err return err
} }
log.Debugf("3: opening connection to peer %s", conn.Config.RemoteWgKey.String())
err = conn.listenOnConnectionStateChanges() err = conn.listenOnConnectionStateChanges()
if err != nil { if err != nil {
return err return err
} }
log.Debugf("4: opening connection to peer %s", conn.Config.RemoteWgKey.String())
err = conn.signalCredentials() err = conn.signalCredentials()
if err != nil { if err != nil {
return err return err
} }
// wait until credentials have been sent from the remote peer (will arrive via signal channel) log.Debugf("5: opening connection to peer %s", conn.Config.RemoteWgKey.String())
// wait until credentials have been sent from the remote peer (will arrive via a signal server)
remoteAuth := <-conn.remoteAuthChannel remoteAuth := <-conn.remoteAuthChannel
err = conn.agent.GatherCandidates() err = conn.agent.GatherCandidates()
@@ -169,6 +178,12 @@ func (conn *Connection) Open() error {
return err return err
} }
wgConn, err := conn.createWireguardProxy()
if err != nil {
return err
}
conn.wgConn = *wgConn
go conn.proxyToRemotePeer(*wgConn, remoteConn) go conn.proxyToRemotePeer(*wgConn, remoteConn)
go conn.proxyToLocalWireguard(*wgConn, remoteConn) go conn.proxyToLocalWireguard(*wgConn, remoteConn)
@@ -228,6 +243,7 @@ func (conn *Connection) OnRemoteCandidate(candidate ice.Candidate) error {
} }
// openConnectionToRemote opens an ice.Conn to the remote peer. This is a real peer-to-peer connection // openConnectionToRemote opens an ice.Conn to the remote peer. This is a real peer-to-peer connection
// blocks until connection has been established
func (conn *Connection) openConnectionToRemote(isControlling bool, credentials IceCredentials) (*ice.Conn, error) { func (conn *Connection) openConnectionToRemote(isControlling bool, credentials IceCredentials) (*ice.Conn, error) {
var realConn *ice.Conn var realConn *ice.Conn
var err error var err error
@@ -295,14 +311,7 @@ func (conn *Connection) listenOnConnectionStateChanges() error {
log.Debugf("connected to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair) log.Debugf("connected to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair)
} else if state == ice.ConnectionStateDisconnected || state == ice.ConnectionStateFailed { } else if state == ice.ConnectionStateDisconnected || state == ice.ConnectionStateFailed {
// todo do we really wanna have a connection restart within connection itself? Think of moving it outside // todo do we really wanna have a connection restart within connection itself? Think of moving it outside
operation := func() error { close(conn.closedChannel)
return conn.Restart()
}
err := backoff.Retry(operation, backoff.NewExponentialBackOff())
if err != nil {
log.Errorf("error while communicating with the Signal Exchange %s ", err)
return
}
} }
}) })

View File

@@ -2,6 +2,7 @@ package connection
import ( import (
"fmt" "fmt"
"github.com/cenkalti/backoff/v4"
"github.com/pion/ice/v2" "github.com/pion/ice/v2"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/iface" "github.com/wiretrustee/wiretrustee/iface"
@@ -70,43 +71,76 @@ func (e *Engine) Start(privateKey string, peers []Peer) error {
// initialize peer agents // initialize peer agents
for _, peer := range peers { for _, peer := range peers {
remoteKey, _ := wgtypes.ParseKey(peer.WgPubKey)
connConfig := &ConnConfig{
WgListenAddr: fmt.Sprintf("127.0.0.1:%d", *wgPort),
WgPeerIp: e.wgIp,
WgIface: e.wgIface,
WgAllowedIPs: peer.WgAllowedIps,
WgKey: myKey,
RemoteWgKey: remoteKey,
StunTurnURLS: e.stunsTurns,
}
signalOffer := func(uFrag string, pwd string) error {
return signalAuth(uFrag, pwd, myKey, remoteKey, e.signal, false)
}
signalAnswer := func(uFrag string, pwd string) error {
return signalAuth(uFrag, pwd, myKey, remoteKey, e.signal, true)
}
signalCandidate := func(candidate ice.Candidate) error {
return signalCandidate(candidate, myKey, remoteKey, e.signal)
}
conn := NewConnection(*connConfig, signalCandidate, signalOffer, signalAnswer)
e.conns[remoteKey.String()] = conn
peer := peer
go func() { go func() {
err = conn.Open()
if err != nil { operation := func() error {
log.Errorf("error openning connection to a remote peer %s %s", remoteKey.String(), err.Error()) conn, closed, err := e.openConnection(*wgPort, myKey, peer)
//todo if err != nil {
return err
}
select {
case _, ok := <-closed:
if !ok {
err = conn.Close()
if err != nil {
return err
}
}
return fmt.Errorf("connection to peer %s has been closed", peer.WgPubKey)
}
} }
err = backoff.Retry(operation, backoff.NewExponentialBackOff())
if err != nil {
log.Errorf("----------------------> %s ", err)
return
}
}() }()
} }
return nil return nil
} }
func (e *Engine) openConnection(wgPort int, myKey wgtypes.Key, peer Peer) (*Connection, chan struct{}, error) {
remoteKey, _ := wgtypes.ParseKey(peer.WgPubKey)
connConfig := &ConnConfig{
WgListenAddr: fmt.Sprintf("127.0.0.1:%d", wgPort),
WgPeerIp: e.wgIp,
WgIface: e.wgIface,
WgAllowedIPs: peer.WgAllowedIps,
WgKey: myKey,
RemoteWgKey: remoteKey,
StunTurnURLS: e.stunsTurns,
}
signalOffer := func(uFrag string, pwd string) error {
return signalAuth(uFrag, pwd, myKey, remoteKey, e.signal, false)
}
signalAnswer := func(uFrag string, pwd string) error {
return signalAuth(uFrag, pwd, myKey, remoteKey, e.signal, true)
}
signalCandidate := func(candidate ice.Candidate) error {
return signalCandidate(candidate, myKey, remoteKey, e.signal)
}
connected := make(chan struct{}, 1)
closed := make(chan struct{})
conn := NewConnection(*connConfig, signalCandidate, signalOffer, signalAnswer, closed, connected)
e.conns[remoteKey.String()] = conn
// blocks until the connection is open (or timeout??)
err := conn.Open()
if err != nil {
log.Errorf("error openning connection to a remote peer %s %s", remoteKey.String(), err.Error())
return nil, nil, err
}
return conn, closed, nil
}
func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client) error { func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client) error {
err := s.Send(&sProto.Message{ err := s.Send(&sProto.Message{
Type: sProto.Message_CANDIDATE, Type: sProto.Message_CANDIDATE,