diff --git a/connection/cond.go b/connection/cond.go new file mode 100644 index 000000000..9790e4d9c --- /dev/null +++ b/connection/cond.go @@ -0,0 +1,32 @@ +package connection + +import "sync" + +// A Cond is a condition variable like sync.Cond, but using a channel so we can use select. +type Cond struct { + once sync.Once + C chan struct{} +} + +// NewCond creates a new condition variable. +func NewCond() *Cond { + return &Cond{C: make(chan struct{})} +} + +// Do runs f if the condition hasn't been signaled yet. Afterwards it will be signaled. +func (c *Cond) Do(f func()) { + c.once.Do(func() { + f() + close(c.C) + }) +} + +// Signal closes the condition variable channel. +func (c *Cond) Signal() { + c.Do(func() {}) +} + +// Wait waits for the condition variable channel to close. +func (c *Cond) Wait() { + <-c.C +} diff --git a/connection/connection.go b/connection/connection.go index e3231e051..040d8735d 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -53,17 +53,17 @@ type Connection struct { // remoteAuthChannel is a channel used to wait for remote credentials to proceed with the connection remoteAuthChannel chan IceCredentials - closeChannel chan struct{} - closedChannel chan struct{} - // agent is an actual ice.Agent that is used to negotiate and maintain a connection to a remote peer agent *ice.Agent wgConn net.Conn - // mux is used to ensure exclusive access to Open() and Close() operations on connection - mux sync.Mutex // isActive indicates whether connection is active or not. isActive bool + + connected *Cond + closeCond *Cond + + remoteAuthCond sync.Once } func NewConnection(config ConnConfig, @@ -78,19 +78,15 @@ func NewConnection(config ConnConfig, signalOffer: signalOffer, signalAnswer: signalAnswer, remoteAuthChannel: make(chan IceCredentials, 1), - closeChannel: make(chan struct{}), - closedChannel: make(chan struct{}), + closeCond: NewCond(), + connected: NewCond(), agent: nil, isActive: false, - mux: sync.Mutex{}, } } func (conn *Connection) Close() error { - conn.mux.Lock() - defer conn.mux.Unlock() - if !conn.isActive { log.Infof("connection to peer %s has been already closed, skipping", conn.Config.RemoteWgKey.String()) return nil @@ -108,10 +104,6 @@ func (conn *Connection) Close() error { log.Debugf("closing connection to peer %s", conn.Config.RemoteWgKey.String()) - close(conn.closeChannel) - - log.Debugf("closed connection to peer %s", conn.Config.RemoteWgKey.String()) - conn.isActive = false return nil @@ -119,13 +111,10 @@ func (conn *Connection) Close() error { // Open opens connection to a remote peer. // Will block until the connection has successfully established -func (conn *Connection) Open(timeout time.Duration) (chan struct{}, error) { +func (conn *Connection) Open(timeout time.Duration) error { log.Debugf("1: opening connection to peer %s", conn.Config.RemoteWgKey.String()) - conn.mux.Lock() - defer conn.mux.Unlock() - // create an ice.Agent that will be responsible for negotiating and establishing actual peer-to-peer connection a, err := ice.NewAgent(&ice.AgentConfig{ NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4}, @@ -136,26 +125,26 @@ func (conn *Connection) Open(timeout time.Duration) (chan struct{}, error) { log.Debugf("2: opening connection to peer %s", conn.Config.RemoteWgKey.String()) if err != nil { - return nil, err + return err } err = conn.listenOnLocalCandidates() if err != nil { - return nil, err + return err } log.Debugf("3: opening connection to peer %s", conn.Config.RemoteWgKey.String()) err = conn.listenOnConnectionStateChanges() if err != nil { - return nil, err + return err } log.Debugf("4: opening connection to peer %s", conn.Config.RemoteWgKey.String()) err = conn.signalCredentials() if err != nil { - return nil, err + return err } log.Debugf("5: opening connection to peer %s", conn.Config.RemoteWgKey.String()) @@ -166,18 +155,18 @@ func (conn *Connection) Open(timeout time.Duration) (chan struct{}, error) { err = conn.agent.GatherCandidates() if err != nil { - return nil, err + return err } remoteConn, err := conn.openConnectionToRemote(remoteAuth.isControlling, remoteAuth) if err != nil { log.Errorf("failed establishing connection with the remote peer %s %s", conn.Config.RemoteWgKey.String(), err) - return nil, err + return err } wgConn, err := conn.createWireguardProxy() if err != nil { - return nil, err + return err } conn.wgConn = *wgConn @@ -187,10 +176,18 @@ func (conn *Connection) Open(timeout time.Duration) (chan struct{}, error) { log.Debugf("opened connection to peer %s", conn.Config.RemoteWgKey.String()) conn.isActive = true - - return conn.closedChannel, nil case <-time.After(timeout): - return nil, fmt.Errorf("timeout of %vs exceeded while waiting for connection to peer %s", timeout.Seconds(), conn.Config.RemoteWgKey.String()) + conn.closeCond.Do(func() { + log.Errorf("CLOSED CONDITION ON TIMEOUT") + err = conn.Close() + }) + return fmt.Errorf("timeout of %vs exceeded while waiting for the remote peer %s", timeout.Seconds(), conn.Config.RemoteWgKey.String()) + } + + select { + case <-conn.closeCond.C: + log.Errorf("CLOSED CONDITION RETURNING ERR TO ENGINE") + return fmt.Errorf("connection to peer %s has been closed while starting", conn.Config.RemoteWgKey.String()) } } @@ -202,7 +199,9 @@ func (conn *Connection) OnAnswer(remoteAuth IceCredentials) error { return nil } - conn.remoteAuthChannel <- remoteAuth + conn.remoteAuthCond.Do(func() { + conn.remoteAuthChannel <- remoteAuth + }) return nil } @@ -215,7 +214,9 @@ func (conn *Connection) OnOffer(remoteAuth IceCredentials) error { return nil } - conn.remoteAuthChannel <- remoteAuth + conn.remoteAuthCond.Do(func() { + conn.remoteAuthChannel <- remoteAuth + }) uFrag, pwd, err := conn.agent.GetLocalUserCredentials() if err != nil { @@ -302,20 +303,23 @@ func (conn *Connection) listenOnConnectionStateChanges() error { err := conn.agent.OnConnectionStateChange(func(state ice.ConnectionState) { log.Debugf("ICE Connection State has changed: %s", state.String()) if state == ice.ConnectionStateConnected { - // once the connection has been established we can check the selected candidate pair + // closed the connection has been established we can check the selected candidate pair pair, err := conn.agent.GetSelectedCandidatePair() if err != nil { log.Errorf("failed selecting active ICE candidate pair %s", err) return } - log.Debugf("connected to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair) + log.Debugf("closed to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair) } else if state == ice.ConnectionStateDisconnected || state == ice.ConnectionStateFailed { // todo do we really wanna have a connection restart within connection itself? Think of moving it outside - err := conn.Close() - if err != nil { - log.Errorf("failed closing connection fo peer %s %s", conn.Config.RemoteWgKey.String(), err.Error()) - } - close(conn.closedChannel) + + conn.closeCond.Do(func() { + log.Errorf("CLOSED CONNDITION TRIGGERED ON STATE %s", state.String()) + err := conn.Close() + if err != nil { + log.Errorf("failed closing connection fo peer %s %s", conn.Config.RemoteWgKey.String(), err.Error()) + } + }) } }) @@ -363,10 +367,8 @@ func (conn *Connection) proxyToRemotePeer(wgConn net.Conn, remoteConn *ice.Conn) if err != nil { log.Warnln("Error writing to remote peer: ", err.Error()) } - case _, ok := <-conn.closeChannel: - if !ok { - log.Infof("stopped proxying to remote peer %s", conn.Config.RemoteWgKey.String()) - } + case <-conn.closeCond.C: + log.Infof("stopped proxying to remote peer %s due to closed connection", conn.Config.RemoteWgKey.String()) return } } @@ -389,10 +391,8 @@ func (conn *Connection) proxyToLocalWireguard(wgConn net.Conn, remoteConn *ice.C if err != nil { log.Errorf("failed writing to local Wireguard instance %s", err) } - case _, ok := <-conn.closeChannel: - if !ok { - log.Infof("stopped proxying from remote peer %s", conn.Config.RemoteWgKey.String()) - } + case <-conn.closeCond.C: + log.Infof("stopped proxying from remote peer %s due to closed connection", conn.Config.RemoteWgKey.String()) return } } diff --git a/connection/engine.go b/connection/engine.go index c71ef15b7..e2135bafc 100644 --- a/connection/engine.go +++ b/connection/engine.go @@ -77,20 +77,12 @@ func (e *Engine) Start(privateKey string, peers []Peer) error { go func() { operation := func() error { - _, closed, err := e.openPeerConnection(*wgPort, myKey, peer) + _, err := e.openPeerConnection(*wgPort, myKey, peer) if err != nil { e.conns[peer.WgPubKey] = nil return err } - - select { - case _, ok := <-closed: - if !ok { - e.conns[peer.WgPubKey] = nil - return fmt.Errorf("connection to peer %s has been closed", peer.WgPubKey) - } - return nil - } + return nil } err = backoff.Retry(operation, backoff.NewExponentialBackOff()) @@ -105,7 +97,7 @@ func (e *Engine) Start(privateKey string, peers []Peer) error { return nil } -func (e *Engine) openPeerConnection(wgPort int, myKey wgtypes.Key, peer Peer) (*Connection, chan struct{}, error) { +func (e *Engine) openPeerConnection(wgPort int, myKey wgtypes.Key, peer Peer) (*Connection, error) { remoteKey, _ := wgtypes.ParseKey(peer.WgPubKey) connConfig := &ConnConfig{ @@ -132,12 +124,12 @@ func (e *Engine) openPeerConnection(wgPort int, myKey wgtypes.Key, peer Peer) (* conn := NewConnection(*connConfig, signalCandidate, signalOffer, signalAnswer) e.conns[remoteKey.String()] = conn // blocks until the connection is open (or timeout) - closedCh, err := conn.Open(60 * time.Second) + err := conn.Open(20 * time.Second) if err != nil { log.Errorf("error openning connection to a remote peer %s %s", remoteKey.String(), err.Error()) - return nil, nil, err + return nil, err } - return conn, closedCh, nil + return conn, nil } func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client) error {