diff --git a/connection/connection.go b/connection/connection.go index 3b7a781f6..9a410bdc6 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -2,7 +2,6 @@ package connection import ( "context" - "github.com/cenkalti/backoff/v4" "github.com/pion/ice/v2" log "github.com/sirupsen/logrus" "github.com/wiretrustee/wiretrustee/iface" @@ -53,7 +52,9 @@ type Connection struct { // remoteAuthChannel is a channel used to wait for remote credentials to proceed with the connection remoteAuthChannel chan IceCredentials - closeChannel chan bool + closeChannel chan bool + connectedChannel chan struct{} + closedChannel chan struct{} // agent is an actual ice.Agent that is used to negotiate and maintain a connection to a remote peer agent *ice.Agent @@ -69,6 +70,8 @@ func NewConnection(config ConnConfig, signalCandidate func(candidate ice.Candidate) error, signalOffer func(uFrag string, pwd string) error, signalAnswer func(uFrag string, pwd string) error, + closedChannel chan struct{}, + connectedChannel chan struct{}, ) *Connection { return &Connection{ @@ -78,6 +81,8 @@ func NewConnection(config ConnConfig, signalAnswer: signalAnswer, remoteAuthChannel: make(chan IceCredentials, 1), closeChannel: make(chan bool, 2), + connectedChannel: connectedChannel, + closedChannel: closedChannel, agent: nil, isActive: false, mux: sync.Mutex{}, @@ -94,11 +99,6 @@ func (conn *Connection) Close() error { return nil } - log.Debugf("closing connection to peer %s", conn.Config.RemoteWgKey.String()) - - conn.closeChannel <- true - conn.closeChannel <- true - err := conn.agent.Close() if err != nil { return err @@ -109,6 +109,11 @@ func (conn *Connection) Close() error { return err } + log.Debugf("closing connection to peer %s", conn.Config.RemoteWgKey.String()) + + conn.closeChannel <- true + conn.closeChannel <- true + log.Debugf("closed connection to peer %s", conn.Config.RemoteWgKey.String()) conn.isActive = false @@ -120,22 +125,20 @@ func (conn *Connection) Close() error { // Will block until the connection has successfully established func (conn *Connection) Open() error { - log.Debugf("opening connection to peer %s", conn.Config.RemoteWgKey.String()) + log.Debugf("1: opening connection to peer %s", conn.Config.RemoteWgKey.String()) conn.mux.Lock() defer conn.mux.Unlock() - wgConn, err := conn.createWireguardProxy() - if err != nil { - return err - } - conn.wgConn = *wgConn - // create an ice.Agent that will be responsible for negotiating and establishing actual peer-to-peer connection - conn.agent, err = ice.NewAgent(&ice.AgentConfig{ + a, err := ice.NewAgent(&ice.AgentConfig{ NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4}, Urls: conn.Config.StunTurnURLS, }) + conn.agent = a + + log.Debugf("2: opening connection to peer %s", conn.Config.RemoteWgKey.String()) + if err != nil { return err } @@ -145,17 +148,23 @@ func (conn *Connection) Open() error { return err } + log.Debugf("3: opening connection to peer %s", conn.Config.RemoteWgKey.String()) + err = conn.listenOnConnectionStateChanges() if err != nil { return err } + log.Debugf("4: opening connection to peer %s", conn.Config.RemoteWgKey.String()) + err = conn.signalCredentials() if err != nil { return err } - // wait until credentials have been sent from the remote peer (will arrive via signal channel) + log.Debugf("5: opening connection to peer %s", conn.Config.RemoteWgKey.String()) + + // wait until credentials have been sent from the remote peer (will arrive via a signal server) remoteAuth := <-conn.remoteAuthChannel err = conn.agent.GatherCandidates() @@ -169,6 +178,12 @@ func (conn *Connection) Open() error { return err } + wgConn, err := conn.createWireguardProxy() + if err != nil { + return err + } + conn.wgConn = *wgConn + go conn.proxyToRemotePeer(*wgConn, remoteConn) go conn.proxyToLocalWireguard(*wgConn, remoteConn) @@ -228,6 +243,7 @@ func (conn *Connection) OnRemoteCandidate(candidate ice.Candidate) error { } // openConnectionToRemote opens an ice.Conn to the remote peer. This is a real peer-to-peer connection +// blocks until connection has been established func (conn *Connection) openConnectionToRemote(isControlling bool, credentials IceCredentials) (*ice.Conn, error) { var realConn *ice.Conn var err error @@ -295,14 +311,7 @@ func (conn *Connection) listenOnConnectionStateChanges() error { log.Debugf("connected to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair) } else if state == ice.ConnectionStateDisconnected || state == ice.ConnectionStateFailed { // todo do we really wanna have a connection restart within connection itself? Think of moving it outside - operation := func() error { - return conn.Restart() - } - err := backoff.Retry(operation, backoff.NewExponentialBackOff()) - if err != nil { - log.Errorf("error while communicating with the Signal Exchange %s ", err) - return - } + close(conn.closedChannel) } }) diff --git a/connection/engine.go b/connection/engine.go index 06ba03c77..bd40fc67a 100644 --- a/connection/engine.go +++ b/connection/engine.go @@ -2,6 +2,7 @@ package connection import ( "fmt" + "github.com/cenkalti/backoff/v4" "github.com/pion/ice/v2" log "github.com/sirupsen/logrus" "github.com/wiretrustee/wiretrustee/iface" @@ -70,43 +71,76 @@ func (e *Engine) Start(privateKey string, peers []Peer) error { // initialize peer agents for _, peer := range peers { - remoteKey, _ := wgtypes.ParseKey(peer.WgPubKey) - connConfig := &ConnConfig{ - WgListenAddr: fmt.Sprintf("127.0.0.1:%d", *wgPort), - WgPeerIp: e.wgIp, - WgIface: e.wgIface, - WgAllowedIPs: peer.WgAllowedIps, - WgKey: myKey, - RemoteWgKey: remoteKey, - StunTurnURLS: e.stunsTurns, - } - - signalOffer := func(uFrag string, pwd string) error { - return signalAuth(uFrag, pwd, myKey, remoteKey, e.signal, false) - } - - signalAnswer := func(uFrag string, pwd string) error { - return signalAuth(uFrag, pwd, myKey, remoteKey, e.signal, true) - } - signalCandidate := func(candidate ice.Candidate) error { - return signalCandidate(candidate, myKey, remoteKey, e.signal) - } - - conn := NewConnection(*connConfig, signalCandidate, signalOffer, signalAnswer) - e.conns[remoteKey.String()] = conn + peer := peer go func() { - err = conn.Open() - if err != nil { - log.Errorf("error openning connection to a remote peer %s %s", remoteKey.String(), err.Error()) - //todo + + operation := func() error { + conn, closed, err := e.openConnection(*wgPort, myKey, peer) + if err != nil { + return err + } + + select { + case _, ok := <-closed: + if !ok { + err = conn.Close() + if err != nil { + return err + } + } + return fmt.Errorf("connection to peer %s has been closed", peer.WgPubKey) + } } + + err = backoff.Retry(operation, backoff.NewExponentialBackOff()) + if err != nil { + log.Errorf("----------------------> %s ", err) + return + } + }() } return nil } +func (e *Engine) openConnection(wgPort int, myKey wgtypes.Key, peer Peer) (*Connection, chan struct{}, error) { + remoteKey, _ := wgtypes.ParseKey(peer.WgPubKey) + connConfig := &ConnConfig{ + WgListenAddr: fmt.Sprintf("127.0.0.1:%d", wgPort), + WgPeerIp: e.wgIp, + WgIface: e.wgIface, + WgAllowedIPs: peer.WgAllowedIps, + WgKey: myKey, + RemoteWgKey: remoteKey, + StunTurnURLS: e.stunsTurns, + } + + signalOffer := func(uFrag string, pwd string) error { + return signalAuth(uFrag, pwd, myKey, remoteKey, e.signal, false) + } + + signalAnswer := func(uFrag string, pwd string) error { + return signalAuth(uFrag, pwd, myKey, remoteKey, e.signal, true) + } + signalCandidate := func(candidate ice.Candidate) error { + return signalCandidate(candidate, myKey, remoteKey, e.signal) + } + + connected := make(chan struct{}, 1) + closed := make(chan struct{}) + conn := NewConnection(*connConfig, signalCandidate, signalOffer, signalAnswer, closed, connected) + e.conns[remoteKey.String()] = conn + // blocks until the connection is open (or timeout??) + err := conn.Open() + if err != nil { + log.Errorf("error openning connection to a remote peer %s %s", remoteKey.String(), err.Error()) + return nil, nil, err + } + return conn, closed, nil +} + func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client) error { err := s.Send(&sProto.Message{ Type: sProto.Message_CANDIDATE,