diff --git a/.github/workflows/golang-test-build.yml b/.github/workflows/golang-test-build.yml new file mode 100644 index 000000000..680346ce9 --- /dev/null +++ b/.github/workflows/golang-test-build.yml @@ -0,0 +1,44 @@ +on: + push: + branches: + - main + pull_request: +name: Test Build On Platforms +jobs: + test_build: + strategy: + matrix: + os: [ windows, linux, darwin ] + go-version: [1.17.x] + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@v2 + + - name: Install Go + uses: actions/setup-go@v2 + with: + go-version: ${{ matrix.go-version }} + + - name: Cache Go modules + uses: actions/cache@v1 + with: + path: ~/go/pkg/mod + key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} + restore-keys: | + ${{ runner.os }}-go- + + - name: Install modules + run: go mod tidy + + - name: run build client + run: GOOS=${{ matrix.os }} go build . + working-directory: client + + - name: run build management + run: GOOS=${{ matrix.os }} go build . + working-directory: management + + - name: run build signal + run: GOOS=${{ matrix.os }} go build . + working-directory: signal \ No newline at end of file diff --git a/.github/workflows/golang-test.yml b/.github/workflows/golang-test.yml index 63a41a408..6ddb7f12b 100644 --- a/.github/workflows/golang-test.yml +++ b/.github/workflows/golang-test.yml @@ -3,7 +3,7 @@ on: branches: - main pull_request: -name: Test +name: Test Code jobs: test: strategy: @@ -15,45 +15,19 @@ jobs: uses: actions/setup-go@v2 with: go-version: ${{ matrix.go-version }} + - name: update limits.d + run: | + cat <<'EOF' | sudo tee -a /etc/security/limits.d/wt.conf + root soft nproc 65535 + root hard nproc 65535 + root soft nofile 65535 + root hard nofile 65535 + $(whoami) soft nproc 65535 + $(whoami) hard nproc 65535 + $(whoami) soft nofile 65535 + $(whoami) hard nofile 65535 + EOF - name: Checkout code uses: actions/checkout@v2 - name: Test - run: GOBIN=$(which go) && sudo --preserve-env=GOROOT $GOBIN test -p 1 ./... 
- - test_build: - strategy: - matrix: - os: [ windows, linux, darwin ] - go-version: [1.17.x] - runs-on: ubuntu-latest - steps: - - name: Checkout code - uses: actions/checkout@v2 - - - name: Install Go - uses: actions/setup-go@v2 - with: - go-version: ${{ matrix.go-version }} - - - name: Cache Go modules - uses: actions/cache@v1 - with: - path: ~/go/pkg/mod - key: ${{ runner.os }}-go-${{ hashFiles('**/go.sum') }} - restore-keys: | - ${{ runner.os }}-go- - - - name: Install modules - run: go mod tidy - - - name: run build client - run: GOOS=${{ matrix.os }} go build . - working-directory: client - - - name: run build management - run: GOOS=${{ matrix.os }} go build . - working-directory: management - - - name: run build signal - run: GOOS=${{ matrix.os }} go build . - working-directory: signal \ No newline at end of file + run: GOBIN=$(which go) && sudo --preserve-env=GOROOT $GOBIN test -p 1 ./... \ No newline at end of file diff --git a/client/cmd/up.go b/client/cmd/up.go index acb048421..c9fc95f33 100644 --- a/client/cmd/up.go +++ b/client/cmd/up.go @@ -68,6 +68,7 @@ func createEngineConfig(key wgtypes.Key, config *internal.Config, peerConfig *mg WgAddr: peerConfig.Address, IFaceBlackList: iFaceBlackList, WgPrivateKey: key, + WgPort: internal.WgPort, } if config.PreSharedKey != "" { diff --git a/client/cmd/up_test.go b/client/cmd/up_test.go index b5a7a8b6d..4ecb030a3 100644 --- a/client/cmd/up_test.go +++ b/client/cmd/up_test.go @@ -36,7 +36,7 @@ func TestUp_Start(t *testing.T) { func TestUp(t *testing.T) { - defer iface.Close() + defer iface.Close("wt0") tempDir := t.TempDir() confPath := tempDir + "/config.json" diff --git a/client/internal/cond.go b/client/internal/cond.go deleted file mode 100644 index 06aff312c..000000000 --- a/client/internal/cond.go +++ /dev/null @@ -1,32 +0,0 @@ -package internal - -import "sync" - -// A Cond is a condition variable like sync.Cond, but using a channel so we can use select. 
-type Cond struct { - once sync.Once - C chan struct{} -} - -// NewCond creates a new condition variable. -func NewCond() *Cond { - return &Cond{C: make(chan struct{})} -} - -// Do runs f if the condition hasn't been signaled yet. Afterwards it will be signaled. -func (c *Cond) Do(f func()) { - c.once.Do(func() { - f() - close(c.C) - }) -} - -// Signal closes the condition variable channel. -func (c *Cond) Signal() { - c.Do(func() {}) -} - -// Wait waits for the condition variable channel to close. -func (c *Cond) Wait() { - <-c.C -} diff --git a/client/internal/connection.go b/client/internal/connection.go deleted file mode 100644 index affa5b8a3..000000000 --- a/client/internal/connection.go +++ /dev/null @@ -1,425 +0,0 @@ -package internal - -import ( - "context" - "fmt" - ice "github.com/pion/ice/v2" - log "github.com/sirupsen/logrus" - "github.com/wiretrustee/wiretrustee/iface" - "golang.zx2c4.com/wireguard/wgctrl/wgtypes" - "net" - "sync" - "time" -) - -var ( - // DefaultWgKeepAlive default Wireguard keep alive constant - DefaultWgKeepAlive = 20 * time.Second - privateIPBlocks []*net.IPNet -) - -type Status string - -const ( - StatusConnected Status = "Connected" - StatusConnecting Status = "Connecting" - StatusDisconnected Status = "Disconnected" -) - -func init() { - for _, cidr := range []string{ - "127.0.0.0/8", // IPv4 loopback - "10.0.0.0/8", // RFC1918 - "172.16.0.0/12", // RFC1918 - "192.168.0.0/16", // RFC1918 - "169.254.0.0/16", // RFC3927 link-local - "::1/128", // IPv6 loopback - "fe80::/10", // IPv6 link-local - "fc00::/7", // IPv6 unique local addr - } { - _, block, err := net.ParseCIDR(cidr) - if err != nil { - panic(fmt.Errorf("parse error on %q: %v", cidr, err)) - } - privateIPBlocks = append(privateIPBlocks, block) - } -} - -// ConnConfig Connection configuration struct -type ConnConfig struct { - // Local Wireguard listening address e.g. 127.0.0.1:51820 - WgListenAddr string - // A Local Wireguard Peer IP address in CIDR notation e.g. 
10.30.30.1/24 - WgPeerIP string - // Local Wireguard Interface name (e.g. wg0) - WgIface string - // Wireguard allowed IPs (e.g. 10.30.30.2/32) - WgAllowedIPs string - // Local Wireguard private key - WgKey wgtypes.Key - // Remote Wireguard public key - RemoteWgKey wgtypes.Key - - PreSharedKey *wgtypes.Key - - StunTurnURLS []*ice.URL - - iFaceBlackList map[string]struct{} -} - -// IceCredentials ICE protocol credentials struct -type IceCredentials struct { - uFrag string - pwd string -} - -// Connection Holds information about a connection and handles signal protocol -type Connection struct { - Config ConnConfig - // signalCandidate is a handler function to signal remote peer about local connection candidate - signalCandidate func(candidate ice.Candidate) error - - // signalOffer is a handler function to signal remote peer our connection offer (credentials) - signalOffer func(uFrag string, pwd string) error - - // signalOffer is a handler function to signal remote peer our connection answer (credentials) - signalAnswer func(uFrag string, pwd string) error - - // remoteAuthChannel is a channel used to wait for remote credentials to proceed with the connection - remoteAuthChannel chan IceCredentials - - // agent is an actual ice.Agent that is used to negotiate and maintain a connection to a remote peer - agent *ice.Agent - - wgProxy *WgProxy - - connected *Cond - closeCond *Cond - - remoteAuthCond sync.Once - - Status Status -} - -// NewConnection Creates a new connection and sets handling functions for signal protocol -func NewConnection(config ConnConfig, - signalCandidate func(candidate ice.Candidate) error, - signalOffer func(uFrag string, pwd string) error, - signalAnswer func(uFrag string, pwd string) error, -) *Connection { - - return &Connection{ - Config: config, - signalCandidate: signalCandidate, - signalOffer: signalOffer, - signalAnswer: signalAnswer, - remoteAuthChannel: make(chan IceCredentials, 1), - closeCond: NewCond(), - connected: NewCond(), - 
agent: nil, - wgProxy: NewWgProxy(config.WgIface, config.RemoteWgKey.String(), config.WgAllowedIPs, config.WgListenAddr, config.PreSharedKey), - Status: StatusDisconnected, - } -} - -// Open opens connection to a remote peer. -// Will block until the connection has successfully established -func (conn *Connection) Open(timeout time.Duration) error { - - // create an ice.Agent that will be responsible for negotiating and establishing actual peer-to-peer connection - a, err := ice.NewAgent(&ice.AgentConfig{ - // MulticastDNSMode: ice.MulticastDNSModeQueryAndGather, - NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4}, - Urls: conn.Config.StunTurnURLS, - CandidateTypes: []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay}, - InterfaceFilter: func(s string) bool { - if conn.Config.iFaceBlackList == nil { - return true - } - _, ok := conn.Config.iFaceBlackList[s] - return !ok - }, - }) - if err != nil { - return err - } - - conn.agent = a - defer func() { - err := conn.agent.Close() - if err != nil { - return - } - }() - - err = conn.listenOnLocalCandidates() - if err != nil { - return err - } - - err = conn.listenOnConnectionStateChanges() - if err != nil { - return err - } - - err = conn.signalCredentials() - if err != nil { - return err - } - - conn.Status = StatusConnecting - log.Debugf("trying to connect to peer %s", conn.Config.RemoteWgKey.String()) - - // wait until credentials have been sent from the remote peer (will arrive via a signal server) - select { - case remoteAuth := <-conn.remoteAuthChannel: - - log.Debugf("got a connection confirmation from peer %s", conn.Config.RemoteWgKey.String()) - - err = conn.agent.GatherCandidates() - if err != nil { - return err - } - - isControlling := conn.Config.WgKey.PublicKey().String() > conn.Config.RemoteWgKey.String() - var remoteConn *ice.Conn - remoteConn, err = conn.openConnectionToRemote(isControlling, remoteAuth) - if err != nil { - log.Errorf("failed establishing 
connection with the remote peer %s %s", conn.Config.RemoteWgKey.String(), err) - return err - } - - var pair *ice.CandidatePair - pair, err = conn.agent.GetSelectedCandidatePair() - if err != nil { - return err - } - - useProxy := useProxy(pair) - - // in case the remote peer is in the local network or one of the peers has public static IP -> no need for a Wireguard proxy, direct communication is possible. - if !useProxy { - log.Debugf("it is possible to establish a direct connection (without proxy) to peer %s - my addr: %s, remote addr: %s", conn.Config.RemoteWgKey.String(), pair.Local, pair.Remote) - err = conn.wgProxy.StartLocal(fmt.Sprintf("%s:%d", pair.Remote.Address(), iface.WgPort)) - if err != nil { - return err - } - - } else { - log.Debugf("establishing secure tunnel to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair) - err = conn.wgProxy.Start(remoteConn) - if err != nil { - return err - } - } - - relayed := pair.Remote.Type() == ice.CandidateTypeRelay || pair.Local.Type() == ice.CandidateTypeRelay - - conn.Status = StatusConnected - log.Infof("opened connection to peer %s [localProxy=%v, relayed=%v]", conn.Config.RemoteWgKey.String(), useProxy, relayed) - case <-conn.closeCond.C: - conn.Status = StatusDisconnected - return fmt.Errorf("connection to peer %s has been closed", conn.Config.RemoteWgKey.String()) - case <-time.After(timeout): - err = conn.Close() - if err != nil { - log.Warnf("error while closing connection to peer %s -> %s", conn.Config.RemoteWgKey.String(), err.Error()) - } - conn.Status = StatusDisconnected - return fmt.Errorf("timeout of %vs exceeded while waiting for the remote peer %s", timeout.Seconds(), conn.Config.RemoteWgKey.String()) - } - - // wait until connection has been closed - <-conn.closeCond.C - conn.Status = StatusDisconnected - return fmt.Errorf("connection to peer %s has been closed", conn.Config.RemoteWgKey.String()) -} - -func isPublicIP(ip net.IP) bool { - if ip.IsLoopback() || 
ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() { - return false - } - - for _, block := range privateIPBlocks { - if block.Contains(ip) { - return false - } - } - return true -} - -//useProxy determines whether a direct connection (without a go proxy) is possible -//There are 3 cases: one of the peers has a public IP or both peers are in the same private network -//Please note, that this check happens when peers were already able to ping each other with ICE layer. -func useProxy(pair *ice.CandidatePair) bool { - remoteIP := net.ParseIP(pair.Remote.Address()) - myIp := net.ParseIP(pair.Local.Address()) - remoteIsPublic := isPublicIP(remoteIP) - myIsPublic := isPublicIP(myIp) - - //one of the hosts has a public IP - if remoteIsPublic && pair.Remote.Type() == ice.CandidateTypeHost { - return false - } - if myIsPublic && pair.Local.Type() == ice.CandidateTypeHost { - return false - } - - if pair.Local.Type() == ice.CandidateTypeHost && pair.Remote.Type() == ice.CandidateTypeHost { - if !remoteIsPublic && !myIsPublic { - //both hosts are in the same private network - return false - } - } - - return true -} - -// Close Closes a peer connection -func (conn *Connection) Close() error { - var err error - conn.closeCond.Do(func() { - - log.Debugf("closing connection to peer %s", conn.Config.RemoteWgKey.String()) - - if a := conn.agent; a != nil { - e := a.Close() - if e != nil { - log.Warnf("error while closing ICE agent of peer connection %s", conn.Config.RemoteWgKey.String()) - err = e - } - } - - if c := conn.wgProxy; c != nil { - e := c.Close() - if e != nil { - log.Warnf("error while closingWireguard proxy connection of peer connection %s", conn.Config.RemoteWgKey.String()) - err = e - } - } - }) - return err -} - -// OnAnswer Handles the answer from the other peer -func (conn *Connection) OnAnswer(remoteAuth IceCredentials) error { - - conn.remoteAuthCond.Do(func() { - log.Debugf("OnAnswer from peer %s", conn.Config.RemoteWgKey.String()) - conn.remoteAuthChannel 
<- remoteAuth - }) - return nil -} - -// OnOffer Handles the offer from the other peer -func (conn *Connection) OnOffer(remoteAuth IceCredentials) error { - - conn.remoteAuthCond.Do(func() { - log.Debugf("OnOffer from peer %s", conn.Config.RemoteWgKey.String()) - conn.remoteAuthChannel <- remoteAuth - uFrag, pwd, err := conn.agent.GetLocalUserCredentials() - if err != nil { //nolint - } - - err = conn.signalAnswer(uFrag, pwd) - if err != nil { //nolint - } - }) - - return nil -} - -// OnRemoteCandidate Handles remote candidate provided by the peer. -func (conn *Connection) OnRemoteCandidate(candidate ice.Candidate) error { - - log.Debugf("onRemoteCandidate from peer %s -> %s", conn.Config.RemoteWgKey.String(), candidate.String()) - - err := conn.agent.AddRemoteCandidate(candidate) - if err != nil { - return err - } - - return nil -} - -// openConnectionToRemote opens an ice.Conn to the remote peer. This is a real peer-to-peer connection -// blocks until connection has been established -func (conn *Connection) openConnectionToRemote(isControlling bool, credentials IceCredentials) (*ice.Conn, error) { - var realConn *ice.Conn - var err error - - if isControlling { - realConn, err = conn.agent.Dial(context.TODO(), credentials.uFrag, credentials.pwd) - } else { - realConn, err = conn.agent.Accept(context.TODO(), credentials.uFrag, credentials.pwd) - } - - if err != nil { - return nil, err - } - - return realConn, err -} - -// signalCredentials prepares local user credentials and signals them to the remote peer -func (conn *Connection) signalCredentials() error { - localUFrag, localPwd, err := conn.agent.GetLocalUserCredentials() - if err != nil { - return err - } - - err = conn.signalOffer(localUFrag, localPwd) - if err != nil { - return err - } - return nil -} - -// listenOnLocalCandidates registers callback of an ICE Agent to receive new local connection candidates and then -// signals them to the remote peer -func (conn *Connection) listenOnLocalCandidates() error { 
- err := conn.agent.OnCandidate(func(candidate ice.Candidate) { - if candidate != nil { - log.Debugf("discovered local candidate %s", candidate.String()) - err := conn.signalCandidate(candidate) - if err != nil { - log.Errorf("failed signaling candidate to the remote peer %s %s", conn.Config.RemoteWgKey.String(), err) - //todo ?? - return - } - } - }) - - if err != nil { - return err - } - - return nil -} - -// listenOnConnectionStateChanges registers callback of an ICE Agent to track connection state -func (conn *Connection) listenOnConnectionStateChanges() error { - err := conn.agent.OnConnectionStateChange(func(state ice.ConnectionState) { - log.Debugf("ICE Connection State has changed for peer %s -> %s", conn.Config.RemoteWgKey.String(), state.String()) - if state == ice.ConnectionStateConnected { - // closed the connection has been established we can check the selected candidate pair - pair, err := conn.agent.GetSelectedCandidatePair() - if err != nil { - log.Errorf("failed selecting active ICE candidate pair %s", err) - return - } - log.Debugf("ICE connected to peer %s via a selected connnection candidate pair %s", conn.Config.RemoteWgKey.String(), pair) - } else if state == ice.ConnectionStateDisconnected || state == ice.ConnectionStateFailed { - err := conn.Close() - if err != nil { - log.Warnf("error while closing connection to peer %s -> %s", conn.Config.RemoteWgKey.String(), err.Error()) - } - } - }) - - if err != nil { - return err - } - - return nil -} diff --git a/client/internal/engine.go b/client/internal/engine.go index 6ac12a88f..603eb4481 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -3,9 +3,10 @@ package internal import ( "context" "fmt" - "github.com/cenkalti/backoff/v4" "github.com/pion/ice/v2" log "github.com/sirupsen/logrus" + "github.com/wiretrustee/wiretrustee/client/internal/peer" + "github.com/wiretrustee/wiretrustee/client/internal/proxy" "github.com/wiretrustee/wiretrustee/iface" mgm 
"github.com/wiretrustee/wiretrustee/management/client" mgmProto "github.com/wiretrustee/wiretrustee/management/proto" @@ -20,11 +21,14 @@ import ( // PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer. // E.g. this peer will wait PeerConnectionTimeoutMax for the remote peer to respond, if not successful then it will retry the connection attempt. -const PeerConnectionTimeoutMax = 45 //sec -const PeerConnectionTimeoutMin = 30 //sec +const PeerConnectionTimeoutMax = 45000 //ms +const PeerConnectionTimeoutMin = 30000 //ms + +const WgPort = 51820 // EngineConfig is a config for the Engine type EngineConfig struct { + WgPort int WgIface string // WgAddr is a Wireguard local address (Wiretrustee Network IP) WgAddr string @@ -42,21 +46,13 @@ type Engine struct { signal *signal.Client // mgmClient is a Management Service client mgmClient *mgm.Client - // conns is a collection of remote peer connections indexed by local public key of the remote peers - conns map[string]*Connection - // peerMap is a map that holds all the peers that are known to this peer - peerMap map[string]struct{} + // peerConns is a map that holds all the peers that are known to this peer + peerConns map[string]*peer.Conn - // peerMux is used to sync peer operations (e.g. 
open connection, peer removal) - peerMux *sync.Mutex // syncMsgMux is used to guarantee sequential Management Service message processing syncMsgMux *sync.Mutex config *EngineConfig - - // wgPort is a Wireguard local listen port - wgPort int - // STUNs is a list of STUN servers used by ICE STUNs []*ice.URL // TURNs is a list of STUN servers used by ICE @@ -78,9 +74,7 @@ func NewEngine(signalClient *signal.Client, mgmClient *mgm.Client, config *Engin return &Engine{ signal: signalClient, mgmClient: mgmClient, - conns: map[string]*Connection{}, - peerMap: map[string]struct{}{}, - peerMux: &sync.Mutex{}, + peerConns: map[string]*peer.Conn{}, syncMsgMux: &sync.Mutex{}, config: config, STUNs: []*ice.URL{}, @@ -91,13 +85,16 @@ func NewEngine(signalClient *signal.Client, mgmClient *mgm.Client, config *Engin } func (e *Engine) Stop() error { + e.syncMsgMux.Lock() + defer e.syncMsgMux.Unlock() + err := e.removeAllPeerConnections() if err != nil { return err } log.Debugf("removing Wiretrustee interface %s", e.config.WgIface) - err = iface.Close() + err = iface.Close(e.config.WgIface) if err != nil { log.Errorf("failed closing Wiretrustee interface %s %v", e.config.WgIface, err) return err @@ -112,6 +109,8 @@ func (e *Engine) Stop() error { // Connections to remote peers are not established here. 
// However, they will be established once an event with a list of peers to connect to will be received from Management Service func (e *Engine) Start() error { + e.syncMsgMux.Lock() + defer e.syncMsgMux.Unlock() wgIface := e.config.WgIface wgAddr := e.config.WgAddr @@ -123,86 +122,33 @@ func (e *Engine) Start() error { return err } - err = iface.Configure(wgIface, myPrivateKey.String()) + err = iface.Configure(wgIface, myPrivateKey.String(), e.config.WgPort) if err != nil { log.Errorf("failed configuring Wireguard interface [%s]: %s", wgIface, err.Error()) return err } - port, err := iface.GetListenPort(wgIface) - if err != nil { - log.Errorf("failed getting Wireguard listen port [%s]: %s", wgIface, err.Error()) - return err - } - e.wgPort = *port - e.receiveSignalEvents() e.receiveManagementEvents() return nil } -// initializePeer peer agent attempt to open connection -func (e *Engine) initializePeer(peer Peer) { - - e.peerMap[peer.WgPubKey] = struct{}{} - - var backOff = backoff.WithContext(&backoff.ExponentialBackOff{ - InitialInterval: backoff.DefaultInitialInterval, - RandomizationFactor: backoff.DefaultRandomizationFactor, - Multiplier: backoff.DefaultMultiplier, - MaxInterval: 5 * time.Second, - MaxElapsedTime: 0, //never stop - Stop: backoff.Stop, - Clock: backoff.SystemClock, - }, e.ctx) - - operation := func() error { - - if e.signal.GetStatus() != signal.StreamConnected { - return fmt.Errorf("not opening connection to peer because Signal is unavailable") - } - - _, err := e.openPeerConnection(e.wgPort, e.config.WgPrivateKey, peer) - e.peerMux.Lock() - defer e.peerMux.Unlock() - if _, ok := e.peerMap[peer.WgPubKey]; !ok { - log.Debugf("peer was removed: %v, stop connecting", peer.WgPubKey) - return nil - } - - if err != nil { - log.Debugf("retrying connection because of error: %s", err.Error()) - return err - } - return nil - } - - go func() { - err := backoff.Retry(operation, backOff) - if err != nil { - // should actually never happen - panic(err) - } - 
}() -} - func (e *Engine) removePeers(peers []string) error { - for _, peer := range peers { - err := e.removePeer(peer) + for _, p := range peers { + err := e.removePeer(p) if err != nil { return err } + log.Infof("removed peer %s", p) } return nil } func (e *Engine) removeAllPeerConnections() error { log.Debugf("removing all peer connections") - e.peerMux.Lock() - defer e.peerMux.Unlock() - for peer := range e.conns { - err := e.removePeer(peer) + for p := range e.peerConns { + err := e.removePeer(p) if err != nil { return err } @@ -212,69 +158,39 @@ func (e *Engine) removeAllPeerConnections() error { // removePeer closes an existing peer connection and removes a peer func (e *Engine) removePeer(peerKey string) error { - - delete(e.peerMap, peerKey) - - conn, exists := e.conns[peerKey] - if exists && conn != nil { - delete(e.conns, peerKey) + log.Debugf("removing peer from engine %s", peerKey) + conn, exists := e.peerConns[peerKey] + if exists { + delete(e.peerConns, peerKey) return conn.Close() } - log.Infof("removed peer %s", peerKey) return nil } // GetPeerConnectionStatus returns a connection Status or nil if peer connection wasn't found -func (e *Engine) GetPeerConnectionStatus(peerKey string) *Status { - e.peerMux.Lock() - defer e.peerMux.Unlock() +func (e *Engine) GetPeerConnectionStatus(peerKey string) peer.ConnStatus { - conn, exists := e.conns[peerKey] + conn, exists := e.peerConns[peerKey] if exists && conn != nil { - return &conn.Status + return conn.Status() } - return nil + return -1 } -// openPeerConnection opens a new remote peer connection -func (e *Engine) openPeerConnection(wgPort int, myKey wgtypes.Key, peer Peer) (*Connection, error) { +// GetConnectedPeers returns a connection Status or nil if peer connection wasn't found +func (e *Engine) GetConnectedPeers() []string { + e.syncMsgMux.Lock() + defer e.syncMsgMux.Unlock() - remoteKey, _ := wgtypes.ParseKey(peer.WgPubKey) - connConfig := &ConnConfig{ - WgListenAddr: fmt.Sprintf("127.0.0.1:%d", 
wgPort), - WgPeerIP: e.config.WgAddr, - WgIface: e.config.WgIface, - WgAllowedIPs: peer.WgAllowedIps, - WgKey: myKey, - RemoteWgKey: remoteKey, - StunTurnURLS: append(e.STUNs, e.TURNs...), - iFaceBlackList: e.config.IFaceBlackList, - PreSharedKey: e.config.PreSharedKey, + peers := []string{} + for s, conn := range e.peerConns { + if conn.Status() == peer.StatusConnected { + peers = append(peers, s) + } } - signalOffer := func(uFrag string, pwd string) error { - return signalAuth(uFrag, pwd, myKey, remoteKey, e.signal, false) - } - - signalAnswer := func(uFrag string, pwd string) error { - return signalAuth(uFrag, pwd, myKey, remoteKey, e.signal, true) - } - signalCandidate := func(candidate ice.Candidate) error { - return signalCandidate(candidate, myKey, remoteKey, e.signal) - } - conn := NewConnection(*connConfig, signalCandidate, signalOffer, signalAnswer) - e.peerMux.Lock() - e.conns[remoteKey.String()] = conn - e.peerMux.Unlock() - - // blocks until the connection is open (or timeout) - timeout := rand.Intn(PeerConnectionTimeoutMax-PeerConnectionTimeoutMin) + PeerConnectionTimeoutMin - err := conn.Open(time.Duration(timeout) * time.Second) - if err != nil { - return nil, err - } - return conn, nil + return peers } func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client) error { @@ -400,17 +316,15 @@ func (e *Engine) updateTURNs(turns []*mgmProto.ProtectedHostConfig) error { } func (e *Engine) updatePeers(remotePeers []*mgmProto.RemotePeerConfig) error { - e.peerMux.Lock() - defer e.peerMux.Unlock() log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(remotePeers)) remotePeerMap := make(map[string]struct{}) - for _, peer := range remotePeers { - remotePeerMap[peer.GetWgPubKey()] = struct{}{} + for _, p := range remotePeers { + remotePeerMap[p.GetWgPubKey()] = struct{}{} } //remove peers that are no longer available for us toRemove := []string{} - for p := range e.conns { + 
for p := range e.peerConns { if _, ok := remotePeerMap[p]; !ok { toRemove = append(toRemove, p) } @@ -421,20 +335,115 @@ func (e *Engine) updatePeers(remotePeers []*mgmProto.RemotePeerConfig) error { } // add new peers - for _, peer := range remotePeers { - peerKey := peer.GetWgPubKey() - peerIPs := peer.GetAllowedIps() - if _, ok := e.peerMap[peerKey]; !ok { - e.initializePeer(Peer{ - WgPubKey: peerKey, - WgAllowedIps: strings.Join(peerIPs, ","), - }) + for _, p := range remotePeers { + peerKey := p.GetWgPubKey() + peerIPs := p.GetAllowedIps() + if _, ok := e.peerConns[peerKey]; !ok { + conn, err := e.createPeerConn(peerKey, strings.Join(peerIPs, ",")) + if err != nil { + return err + } + e.peerConns[peerKey] = conn + + go e.connWorker(conn, peerKey) } } return nil } +func (e Engine) connWorker(conn *peer.Conn, peerKey string) { + for { + + // randomize starting time a bit + min := 500 + max := 2000 + time.Sleep(time.Duration(rand.Intn(max-min)+min) * time.Millisecond) + + // if peer has been removed -> give up + if !e.peerExists(peerKey) { + log.Infof("peer %s doesn't exist anymore, won't retry connection", peerKey) + return + } + + if !e.signal.Ready() { + log.Infof("signal client isn't ready, skipping connection attempt %s", peerKey) + continue + } + + err := conn.Open() + if err != nil { + log.Debugf("connection to peer %s failed: %v", peerKey, err) + } + } +} + +func (e Engine) peerExists(peerKey string) bool { + e.syncMsgMux.Lock() + defer e.syncMsgMux.Unlock() + _, ok := e.peerConns[peerKey] + return ok +} + +func (e Engine) createPeerConn(pubKey string, allowedIPs string) (*peer.Conn, error) { + + var stunTurn []*ice.URL + stunTurn = append(stunTurn, e.STUNs...) + stunTurn = append(stunTurn, e.TURNs...) 
+ + interfaceBlacklist := make([]string, 0, len(e.config.IFaceBlackList)) + for k := range e.config.IFaceBlackList { + interfaceBlacklist = append(interfaceBlacklist, k) + } + + proxyConfig := proxy.Config{ + RemoteKey: pubKey, + WgListenAddr: fmt.Sprintf("127.0.0.1:%d", e.config.WgPort), + WgInterface: e.config.WgIface, + AllowedIps: allowedIPs, + PreSharedKey: e.config.PreSharedKey, + } + + // randomize connection timeout + timeout := time.Duration(rand.Intn(PeerConnectionTimeoutMax-PeerConnectionTimeoutMin)+PeerConnectionTimeoutMin) * time.Millisecond + config := peer.ConnConfig{ + Key: pubKey, + LocalKey: e.config.WgPrivateKey.PublicKey().String(), + StunTurn: stunTurn, + InterfaceBlackList: interfaceBlacklist, + Timeout: timeout, + ProxyConfig: proxyConfig, + } + + peerConn, err := peer.NewConn(config) + if err != nil { + return nil, err + } + + wgPubKey, err := wgtypes.ParseKey(pubKey) + if err != nil { + return nil, err + } + + signalOffer := func(uFrag string, pwd string) error { + return signalAuth(uFrag, pwd, e.config.WgPrivateKey, wgPubKey, e.signal, false) + } + + signalCandidate := func(candidate ice.Candidate) error { + return signalCandidate(candidate, e.config.WgPrivateKey, wgPubKey, e.signal) + } + + signalAnswer := func(uFrag string, pwd string) error { + return signalAuth(uFrag, pwd, e.config.WgPrivateKey, wgPubKey, e.signal, true) + } + + peerConn.SetSignalCandidate(signalCandidate) + peerConn.SetSignalOffer(signalOffer) + peerConn.SetSignalAnswer(signalAnswer) + + return peerConn, nil +} + // receiveSignalEvents connects to the Signal Service event stream to negotiate connection with remote peers func (e *Engine) receiveSignalEvents() { @@ -445,58 +454,37 @@ func (e *Engine) receiveSignalEvents() { e.syncMsgMux.Lock() defer e.syncMsgMux.Unlock() - conn := e.conns[msg.Key] + conn := e.peerConns[msg.Key] if conn == nil { return fmt.Errorf("wrongly addressed message %s", msg.Key) } - if conn.Config.RemoteWgKey.String() != msg.Key { - return 
fmt.Errorf("unknown peer %s", msg.Key) - } - switch msg.GetBody().Type { case sProto.Body_OFFER: remoteCred, err := signal.UnMarshalCredential(msg) if err != nil { return err } - err = conn.OnOffer(IceCredentials{ - uFrag: remoteCred.UFrag, - pwd: remoteCred.Pwd, + conn.OnRemoteOffer(peer.IceCredentials{ + UFrag: remoteCred.UFrag, + Pwd: remoteCred.Pwd, }) - - if err != nil { - return err - } - - return nil case sProto.Body_ANSWER: remoteCred, err := signal.UnMarshalCredential(msg) if err != nil { return err } - err = conn.OnAnswer(IceCredentials{ - uFrag: remoteCred.UFrag, - pwd: remoteCred.Pwd, + conn.OnRemoteAnswer(peer.IceCredentials{ + UFrag: remoteCred.UFrag, + Pwd: remoteCred.Pwd, }) - - if err != nil { - return err - } - case sProto.Body_CANDIDATE: - candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload) if err != nil { log.Errorf("failed on parsing remote candidate %s -> %s", candidate, err) return err } - - err = conn.OnRemoteCandidate(candidate) - if err != nil { - log.Errorf("error handling CANDIATE from %s", msg.Key) - return err - } + conn.OnRemoteCandidate(candidate) } return nil diff --git a/client/internal/engine_test.go b/client/internal/engine_test.go new file mode 100644 index 000000000..7ceb284e3 --- /dev/null +++ b/client/internal/engine_test.go @@ -0,0 +1,208 @@ +package internal + +import ( + "context" + "fmt" + log "github.com/sirupsen/logrus" + mgmt "github.com/wiretrustee/wiretrustee/management/client" + mgmtProto "github.com/wiretrustee/wiretrustee/management/proto" + "github.com/wiretrustee/wiretrustee/management/server" + signal "github.com/wiretrustee/wiretrustee/signal/client" + "github.com/wiretrustee/wiretrustee/signal/proto" + signalServer "github.com/wiretrustee/wiretrustee/signal/server" + "github.com/wiretrustee/wiretrustee/util" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "google.golang.org/grpc" + "google.golang.org/grpc/keepalive" + "net" + "os" + "path/filepath" + "runtime" + "sync" + "testing" + "time" +) + 
+var ( + kaep = keepalive.EnforcementPolicy{ + MinTime: 15 * time.Second, + PermitWithoutStream: true, + } + + kasp = keepalive.ServerParameters{ + MaxConnectionIdle: 15 * time.Second, + MaxConnectionAgeGrace: 5 * time.Second, + Time: 5 * time.Second, + Timeout: 2 * time.Second, + } +) + +func TestEngine_MultiplePeers(t *testing.T) { + + //log.SetLevel(log.DebugLevel) + + dir := t.TempDir() + + err := util.CopyFileContents("../testdata/store.json", filepath.Join(dir, "store.json")) + if err != nil { + t.Fatal(err) + } + defer func() { + os.Remove(filepath.Join(dir, "store.json")) //nolint + }() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + signalServer, err := startSignal(10000) + if err != nil { + t.Fatal(err) + return + } + defer signalServer.Stop() + + mgmtServer, err := startManagement(33071, &server.Config{ + Stuns: []*server.Host{}, + TURNConfig: &server.TURNConfig{}, + Signal: &server.Host{ + Proto: "http", + URI: "localhost:10000", + }, + Datadir: dir, + HttpConfig: nil, + }) + if err != nil { + t.Fatal(err) + return + } + defer mgmtServer.Stop() + + setupKey := "A2C8E62B-38F5-4553-B31E-DD66C696CEBB" + + mu := sync.Mutex{} + engines := []*Engine{} + numPeers := 10 + wg := sync.WaitGroup{} + wg.Add(numPeers) + // create and start peers + for i := 0; i < numPeers; i++ { + j := i + go func() { + engine, err := createEngine(ctx, cancel, setupKey, j) + if err != nil { + return + } + mu.Lock() + defer mu.Unlock() + engine.Start() //nolint + engines = append(engines, engine) + wg.Done() + }() + } + + // wait until all have been created and started + wg.Wait() + + // check whether all the peer have expected peers connected + expectedConnected := numPeers * (numPeers - 1) + for { + time.Sleep(time.Second) + totalConnected := 0 + for _, engine := range engines { + totalConnected = totalConnected + len(engine.GetConnectedPeers()) + } + if totalConnected == expectedConnected { + break + } + log.Infof("total connected=%d", 
totalConnected) + } +} + +func createEngine(ctx context.Context, cancel context.CancelFunc, setupKey string, i int) (*Engine, error) { + + key, err := wgtypes.GenerateKey() + if err != nil { + return nil, err + } + mgmtClient, err := mgmt.NewClient(ctx, "localhost:33071", key, false) + if err != nil { + return nil, err + } + signalClient, err := signal.NewClient(ctx, "localhost:10000", key, false) + if err != nil { + return nil, err + } + + publicKey, err := mgmtClient.GetServerPublicKey() + if err != nil { + return nil, err + } + + resp, err := mgmtClient.Register(*publicKey, setupKey) + if err != nil { + return nil, err + } + + var ifaceName string + if runtime.GOOS == "darwin" { + ifaceName = fmt.Sprintf("utun1%d", i) + } else { + ifaceName = fmt.Sprintf("wt%d", i) + } + + conf := &EngineConfig{ + WgIface: ifaceName, + WgAddr: resp.PeerConfig.Address, + WgPrivateKey: key, + WgPort: 33100 + i, + } + + return NewEngine(signalClient, mgmtClient, conf, cancel, ctx), nil +} + +func startSignal(port int) (*grpc.Server, error) { + s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp)) + + lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + + proto.RegisterSignalExchangeServer(s, signalServer.NewServer()) + + go func() { + if err = s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } + }() + + return s, nil +} + +func startManagement(port int, config *server.Config) (*grpc.Server, error) { + + lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port)) + if err != nil { + return nil, err + } + s := grpc.NewServer(grpc.KeepaliveEnforcementPolicy(kaep), grpc.KeepaliveParams(kasp)) + store, err := server.NewStore(config.Datadir) + if err != nil { + log.Fatalf("failed creating a store: %s: %v", config.Datadir, err) + } + peersUpdateManager := server.NewPeersUpdateManager() + accountManager := server.NewManager(store, peersUpdateManager) + turnManager 
:= server.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig) + mgmtServer, err := server.NewServer(config, accountManager, peersUpdateManager, turnManager) + if err != nil { + return nil, err + } + mgmtProto.RegisterManagementServiceServer(s, mgmtServer) + go func() { + if err = s.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } + }() + + return s, nil +} diff --git a/client/internal/peer/conn.go b/client/internal/peer/conn.go new file mode 100644 index 000000000..9b881559b --- /dev/null +++ b/client/internal/peer/conn.go @@ -0,0 +1,421 @@ +package peer + +import ( + "context" + "github.com/pion/ice/v2" + log "github.com/sirupsen/logrus" + "github.com/wiretrustee/wiretrustee/client/internal/proxy" + "net" + "sync" + "time" +) + +// ConnConfig is a peer Connection configuration +type ConnConfig struct { + + // Key is a public key of a remote peer + Key string + // LocalKey is a public key of a local peer + LocalKey string + + // StunTurn is a list of STUN and TURN URLs + StunTurn []*ice.URL + + // InterfaceBlackList is a list of machine interfaces that should be filtered out by ICE Candidate gathering + // (e.g. 
if eth0 is in the list, host candidate of this interface won't be used) + InterfaceBlackList []string + + Timeout time.Duration + + ProxyConfig proxy.Config +} + +// IceCredentials ICE protocol credentials struct +type IceCredentials struct { + UFrag string + Pwd string +} + +type Conn struct { + config ConnConfig + mu sync.Mutex + + // signalCandidate is a handler function to signal remote peer about local connection candidate + signalCandidate func(candidate ice.Candidate) error + // signalOffer is a handler function to signal remote peer our connection offer (credentials) + signalOffer func(uFrag string, pwd string) error + signalAnswer func(uFrag string, pwd string) error + + // remoteOffersCh is a channel used to wait for remote credentials to proceed with the connection + remoteOffersCh chan IceCredentials + // remoteAnswerCh is a channel used to wait for remote credentials answer (confirmation of our offer) to proceed with the connection + remoteAnswerCh chan IceCredentials + closeCh chan struct{} + ctx context.Context + notifyDisconnected context.CancelFunc + + agent *ice.Agent + status ConnStatus + + proxy proxy.Proxy +} + +// NewConn creates a new not opened Conn to the remote peer. 
+// To establish a connection run Conn.Open +func NewConn(config ConnConfig) (*Conn, error) { + return &Conn{ + config: config, + mu: sync.Mutex{}, + status: StatusDisconnected, + closeCh: make(chan struct{}), + remoteOffersCh: make(chan IceCredentials), + remoteAnswerCh: make(chan IceCredentials), + }, nil +} + +// interfaceFilter is a function passed to ICE Agent to filter out blacklisted interfaces +func interfaceFilter(blackList []string) func(string) bool { + var blackListMap map[string]struct{} + if blackList != nil { + blackListMap = make(map[string]struct{}) + for _, s := range blackList { + blackListMap[s] = struct{}{} + } + } + return func(iFace string) bool { + if len(blackListMap) == 0 { + return true + } + _, ok := blackListMap[iFace] + return !ok + } +} + +func (conn *Conn) reCreateAgent() error { + conn.mu.Lock() + defer conn.mu.Unlock() + + failedTimeout := 6 * time.Second + var err error + conn.agent, err = ice.NewAgent(&ice.AgentConfig{ + MulticastDNSMode: ice.MulticastDNSModeDisabled, + NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4}, + Urls: conn.config.StunTurn, + CandidateTypes: []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay}, + FailedTimeout: &failedTimeout, + InterfaceFilter: interfaceFilter(conn.config.InterfaceBlackList), + }) + if err != nil { + return err + } + + err = conn.agent.OnCandidate(conn.onICECandidate) + if err != nil { + return err + } + + err = conn.agent.OnConnectionStateChange(conn.onICEConnectionStateChange) + if err != nil { + return err + } + + err = conn.agent.OnSelectedCandidatePairChange(conn.onICESelectedCandidatePair) + if err != nil { + return err + } + + return nil +} + +// Open opens connection to the remote peer starting ICE candidate gathering process. +// Blocks until connection has been closed or connection timeout. 
+// ConnStatus will be set accordingly +func (conn *Conn) Open() error { + log.Debugf("trying to connect to peer %s", conn.config.Key) + + defer func() { + err := conn.cleanup() + if err != nil { + log.Errorf("error while cleaning up peer connection %s: %v", conn.config.Key, err) + return + } + }() + + err := conn.reCreateAgent() + if err != nil { + return err + } + + err = conn.sendOffer() + if err != nil { + return err + } + + log.Debugf("connection offer sent to peer %s, waiting for the confirmation", conn.config.Key) + + // Only continue once we got a connection confirmation from the remote peer. + // The connection timeout could have happened before a confirmation received from the remote. + // The connection could have also been closed externally (e.g. when we received an update from the management that peer shouldn't be connected) + var remoteCredentials IceCredentials + select { + case remoteCredentials = <-conn.remoteOffersCh: + // received confirmation from the remote peer -> ready to proceed + err = conn.sendAnswer() + if err != nil { + return err + } + case remoteCredentials = <-conn.remoteAnswerCh: + case <-time.After(conn.config.Timeout): + return NewConnectionTimeoutError(conn.config.Key, conn.config.Timeout) + case <-conn.closeCh: + // closed externally + return NewConnectionClosedError(conn.config.Key) + } + + log.Debugf("received connection confirmation from peer %s", conn.config.Key) + + //at this point we received offer/answer and we are ready to gather candidates + conn.mu.Lock() + conn.status = StatusConnecting + conn.ctx, conn.notifyDisconnected = context.WithCancel(context.Background()) + defer conn.notifyDisconnected() + conn.mu.Unlock() + + err = conn.agent.GatherCandidates() + if err != nil { + return err + } + + // will block until connection succeeded + // but it won't release if ICE Agent went into Disconnected or Failed state, + // so we have to cancel it with the provided context once agent detected a broken connection + 
isControlling := conn.config.LocalKey > conn.config.Key + var remoteConn *ice.Conn + if isControlling { + remoteConn, err = conn.agent.Dial(conn.ctx, remoteCredentials.UFrag, remoteCredentials.Pwd) + } else { + remoteConn, err = conn.agent.Accept(conn.ctx, remoteCredentials.UFrag, remoteCredentials.Pwd) + } + if err != nil { + return err + } + + // the connection has been established successfully so we are ready to start the proxy + err = conn.startProxy(remoteConn) + if err != nil { + return err + } + + log.Infof("connected to peer %s [laddr <-> raddr] [%s <-> %s]", conn.config.Key, remoteConn.LocalAddr().String(), remoteConn.RemoteAddr().String()) + + // wait until connection disconnected or has been closed externally (upper layer, e.g. engine) + select { + case <-conn.closeCh: + // closed externally + return NewConnectionClosedError(conn.config.Key) + case <-conn.ctx.Done(): + // disconnected from the remote peer + return NewConnectionDisconnectedError(conn.config.Key) + } +} + +// startProxy starts proxying traffic from/to local Wireguard and sets connection status to StatusConnected +func (conn *Conn) startProxy(remoteConn net.Conn) error { + conn.mu.Lock() + defer conn.mu.Unlock() + + conn.proxy = proxy.NewWireguardProxy(conn.config.ProxyConfig) + err := conn.proxy.Start(remoteConn) + if err != nil { + return err + } + conn.status = StatusConnected + + return nil +} + +// cleanup closes all open resources and sets status to StatusDisconnected +func (conn *Conn) cleanup() error { + log.Debugf("trying to cleanup %s", conn.config.Key) + conn.mu.Lock() + defer conn.mu.Unlock() + + if conn.agent != nil { + err := conn.agent.Close() + if err != nil { + return err + } + conn.agent = nil + } + + if conn.proxy != nil { + err := conn.proxy.Close() + if err != nil { + return err + } + conn.proxy = nil + } + + if conn.notifyDisconnected != nil { + conn.notifyDisconnected() + conn.notifyDisconnected = nil + } + + conn.status = StatusDisconnected + + log.Debugf("cleaned up 
connection to peer %s", conn.config.Key) + + return nil +} + +// SetSignalOffer sets a handler function to be triggered by Conn when a new connection offer has to be signalled to the remote peer +func (conn *Conn) SetSignalOffer(handler func(uFrag string, pwd string) error) { + conn.signalOffer = handler +} + +// SetSignalAnswer sets a handler function to be triggered by Conn when a new connection answer has to be signalled to the remote peer +func (conn *Conn) SetSignalAnswer(handler func(uFrag string, pwd string) error) { + conn.signalAnswer = handler +} + +// SetSignalCandidate sets a handler function to be triggered by Conn when a new ICE local connection candidate has to be signalled to the remote peer +func (conn *Conn) SetSignalCandidate(handler func(candidate ice.Candidate) error) { + conn.signalCandidate = handler +} + +// onICECandidate is a callback attached to an ICE Agent to receive new local connection candidates +// and then signals them to the remote peer +func (conn *Conn) onICECandidate(candidate ice.Candidate) { + if candidate != nil { + //log.Debugf("discovered local candidate %s", candidate.String()) + go func() { + err := conn.signalCandidate(candidate) + if err != nil { + log.Errorf("failed signaling candidate to the remote peer %s %s", conn.config.Key, err) + } + }() + } +} + +func (conn *Conn) onICESelectedCandidatePair(c1 ice.Candidate, c2 ice.Candidate) { + log.Debugf("selected candidate pair [local <-> remote] -> [%s <-> %s], peer %s", conn.config.Key, + c1.String(), c2.String()) +} + +// onICEConnectionStateChange registers callback of an ICE Agent to track connection state +func (conn *Conn) onICEConnectionStateChange(state ice.ConnectionState) { + log.Debugf("peer %s ICE ConnectionState has changed to %s", conn.config.Key, state.String()) + if state == ice.ConnectionStateFailed || state == ice.ConnectionStateDisconnected { + conn.notifyDisconnected() + } +} + +func (conn *Conn) sendAnswer() error { + conn.mu.Lock() + defer 
conn.mu.Unlock() + + localUFrag, localPwd, err := conn.agent.GetLocalUserCredentials() + if err != nil { + return err + } + + log.Debugf("sending asnwer to %s", conn.config.Key) + err = conn.signalAnswer(localUFrag, localPwd) + if err != nil { + return err + } + + return nil +} + +// sendOffer prepares local user credentials and signals them to the remote peer +func (conn *Conn) sendOffer() error { + conn.mu.Lock() + defer conn.mu.Unlock() + + localUFrag, localPwd, err := conn.agent.GetLocalUserCredentials() + if err != nil { + return err + } + err = conn.signalOffer(localUFrag, localPwd) + if err != nil { + return err + } + return nil +} + +// Close closes this peer Conn issuing a close event to the Conn closeCh +func (conn *Conn) Close() error { + conn.mu.Lock() + defer conn.mu.Unlock() + select { + case conn.closeCh <- struct{}{}: + default: + // probably could happen when peer has been added and removed right after not even starting to connect + // todo further investigate + // this really happens due to unordered messages coming from management + // more importantly it causes inconsistency -> 2 Conn objects for the same peer + // e.g. 
this flow: + // update from management has peers: [1,2,3,4] + // engine creates a Conn for peers: [1,2,3,4] and schedules Open in ~1sec + // before conn.Open() another update from management arrives with peers: [1,2,3] + // engine removes peer 4 and calls conn.Close() which does nothing (this default clause) + // before conn.Open() another update from management arrives with peers: [1,2,3,4,5] + // engine adds a new Conn for 4 and 5 + // therefore peer 4 has 2 Conn objects + log.Warnf("closing not started coonection %s", conn.config.Key) + } + return nil +} + +// Status returns current status of the Conn +func (conn *Conn) Status() ConnStatus { + conn.mu.Lock() + defer conn.mu.Unlock() + return conn.status +} + +// OnRemoteOffer handles an offer from the remote peer +// can block until Conn restarts +func (conn *Conn) OnRemoteOffer(remoteAuth IceCredentials) { + log.Debugf("OnRemoteOffer from peer %s on status %s", conn.config.Key, conn.status.String()) + + select { + case conn.remoteOffersCh <- remoteAuth: + default: + log.Debugf("OnRemoteOffer skipping message from peer %s on status %s because is not ready", conn.config.Key, conn.status.String()) + //connection might not be ready yet to receive so we ignore the message + } +} + +// OnRemoteAnswer handles an offer from the remote peer +// can block until Conn restarts +func (conn *Conn) OnRemoteAnswer(remoteAuth IceCredentials) { + log.Debugf("OnRemoteAnswer from peer %s on status %s", conn.config.Key, conn.status.String()) + + select { + case conn.remoteAnswerCh <- remoteAuth: + default: + //connection might not be ready yet to receive so we ignore the message + log.Debugf("OnRemoteAnswer skipping message from peer %s on status %s because is not ready", conn.config.Key, conn.status.String()) + } +} + +// OnRemoteCandidate Handles ICE connection Candidate provided by the remote peer. 
+func (conn *Conn) OnRemoteCandidate(candidate ice.Candidate) { + log.Debugf("OnRemoteCandidate from peer %s -> %s", conn.config.Key, candidate.String()) + go func() { + conn.mu.Lock() + defer conn.mu.Unlock() + + if conn.agent == nil { + return + } + + err := conn.agent.AddRemoteCandidate(candidate) + if err != nil { + log.Errorf("error while handling remote candidate from peer %s", conn.config.Key) + return + } + }() +} diff --git a/client/internal/peer/error.go b/client/internal/peer/error.go new file mode 100644 index 000000000..91d69532b --- /dev/null +++ b/client/internal/peer/error.go @@ -0,0 +1,56 @@ +package peer + +import ( + "fmt" + "time" +) + +// ConnectionTimeoutError is an error indicating that a peer Conn has been timed out +type ConnectionTimeoutError struct { + peer string + timeout time.Duration +} + +func (e *ConnectionTimeoutError) Error() string { + return fmt.Sprintf("connection to peer %s timed out after %s", e.peer, e.timeout.String()) +} + +// NewConnectionTimeoutError creates a new ConnectionTimeoutError error +func NewConnectionTimeoutError(peer string, timeout time.Duration) error { + return &ConnectionTimeoutError{ + peer: peer, + timeout: timeout, + } +} + +// ConnectionClosedError is an error indicating that a peer Conn has been forcefully closed +type ConnectionClosedError struct { + peer string +} + +func (e *ConnectionClosedError) Error() string { + return fmt.Sprintf("connection to peer %s has been closed", e.peer) +} + +// NewConnectionClosedError creates a new ConnectionClosedError error +func NewConnectionClosedError(peer string) error { + return &ConnectionClosedError{ + peer: peer, + } +} + +// ConnectionDisconnectedError is an error indicating that a peer Conn has ctx from the remote +type ConnectionDisconnectedError struct { + peer string +} + +func (e *ConnectionDisconnectedError) Error() string { + return fmt.Sprintf("disconnected from peer %s", e.peer) +} + +// NewConnectionDisconnectedError creates a new 
ConnectionDisconnectedError error +func NewConnectionDisconnectedError(peer string) error { + return &ConnectionDisconnectedError{ + peer: peer, + } +} diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go new file mode 100644 index 000000000..a0cfa4dbe --- /dev/null +++ b/client/internal/peer/status.go @@ -0,0 +1,25 @@ +package peer + +import log "github.com/sirupsen/logrus" + +type ConnStatus int + +func (s ConnStatus) String() string { + switch s { + case StatusConnecting: + return "StatusConnecting" + case StatusConnected: + return "StatusConnected" + case StatusDisconnected: + return "StatusDisconnected" + default: + log.Errorf("unknown status: %d", s) + return "INVALID_PEER_CONNECTION_STATUS" + } +} + +const ( + StatusConnected = iota + StatusConnecting + StatusDisconnected +) diff --git a/client/internal/proxy/dummy.go b/client/internal/proxy/dummy.go new file mode 100644 index 000000000..bce657d2a --- /dev/null +++ b/client/internal/proxy/dummy.go @@ -0,0 +1,68 @@ +package proxy + +import ( + "context" + log "github.com/sirupsen/logrus" + "net" + "time" +) + +// DummyProxy just sends pings to the RemoteKey peer and reads responses +type DummyProxy struct { + conn net.Conn + remote string + ctx context.Context + cancel context.CancelFunc +} + +func NewDummyProxy(remote string) *DummyProxy { + p := &DummyProxy{remote: remote} + p.ctx, p.cancel = context.WithCancel(context.Background()) + return p +} + +func (p *DummyProxy) Close() error { + p.cancel() + return nil +} + +func (p *DummyProxy) Start(remoteConn net.Conn) error { + p.conn = remoteConn + go func() { + buf := make([]byte, 1500) + for { + select { + case <-p.ctx.Done(): + return + default: + _, err := p.conn.Read(buf) + if err != nil { + log.Errorf("error while reading RemoteKey %s proxy %v", p.remote, err) + return + } + //log.Debugf("received %s from %s", string(buf[:n]), p.remote) + } + + } + }() + + go func() { + for { + select { + case <-p.ctx.Done(): + return + default: + 
_, err := p.conn.Write([]byte("hello")) + //log.Debugf("sent ping to %s", p.remote) + if err != nil { + log.Errorf("error while writing to RemoteKey %s proxy %v", p.remote, err) + return + } + time.Sleep(5 * time.Second) + } + } + + }() + + return nil +} diff --git a/client/internal/proxy/proxy.go b/client/internal/proxy/proxy.go new file mode 100644 index 000000000..5ab892faa --- /dev/null +++ b/client/internal/proxy/proxy.go @@ -0,0 +1,24 @@ +package proxy + +import ( + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "io" + "net" + "time" +) + +const DefaultWgKeepAlive = 25 * time.Second + +type Config struct { + WgListenAddr string + RemoteKey string + WgInterface string + AllowedIps string + PreSharedKey *wgtypes.Key +} + +type Proxy interface { + io.Closer + // Start creates a local remoteConn and starts proxying data from/to remoteConn + Start(remoteConn net.Conn) error +} diff --git a/client/internal/proxy/wireguard.go b/client/internal/proxy/wireguard.go new file mode 100644 index 000000000..b4fa67600 --- /dev/null +++ b/client/internal/proxy/wireguard.go @@ -0,0 +1,121 @@ +package proxy + +import ( + "context" + log "github.com/sirupsen/logrus" + "github.com/wiretrustee/wiretrustee/iface" + "net" +) + +// WireguardProxy proxies +type WireguardProxy struct { + ctx context.Context + cancel context.CancelFunc + + config Config + + remoteConn net.Conn + localConn net.Conn +} + +func NewWireguardProxy(config Config) *WireguardProxy { + p := &WireguardProxy{config: config} + p.ctx, p.cancel = context.WithCancel(context.Background()) + return p +} + +func (p *WireguardProxy) updateEndpoint() error { + // add local proxy connection as a Wireguard peer + err := iface.UpdatePeer(p.config.WgInterface, p.config.RemoteKey, p.config.AllowedIps, DefaultWgKeepAlive, + p.localConn.LocalAddr().String(), p.config.PreSharedKey) + if err != nil { + return err + } + + return nil +} + +func (p *WireguardProxy) Start(remoteConn net.Conn) error { + p.remoteConn = remoteConn + + var 
err error + p.localConn, err = net.Dial("udp", p.config.WgListenAddr) + if err != nil { + log.Errorf("failed dialing to local Wireguard port %s", err) + return err + } + + err = p.updateEndpoint() + if err != nil { + log.Errorf("error while updating Wireguard peer endpoint [%s] %v", p.config.RemoteKey, err) + return err + } + + go p.proxyToRemote() + go p.proxyToLocal() + + return nil +} + +func (p *WireguardProxy) Close() error { + p.cancel() + if c := p.localConn; c != nil { + err := p.localConn.Close() + if err != nil { + return err + } + } + err := iface.RemovePeer(p.config.WgInterface, p.config.RemoteKey) + if err != nil { + return err + } + return nil +} + +// proxyToRemote proxies everything from Wireguard to the RemoteKey peer +// blocks +func (p *WireguardProxy) proxyToRemote() { + + buf := make([]byte, 1500) + for { + select { + case <-p.ctx.Done(): + log.Debugf("stopped proxying to remote peer %s due to closed connection", p.config.RemoteKey) + return + default: + n, err := p.localConn.Read(buf) + if err != nil { + continue + } + + _, err = p.remoteConn.Write(buf[:n]) + if err != nil { + continue + } + } + } +} + +// proxyToLocal proxies everything from the RemoteKey peer to local Wireguard +// blocks +func (p *WireguardProxy) proxyToLocal() { + + buf := make([]byte, 1500) + for { + select { + case <-p.ctx.Done(): + log.Debugf("stopped proxying from remote peer %s due to closed connection", p.config.RemoteKey) + return + default: + n, err := p.remoteConn.Read(buf) + if err != nil { + continue + } + + _, err = p.localConn.Write(buf[:n]) + if err != nil { + continue + } + } + } +} diff --git a/client/internal/wgproxy.go b/client/internal/wgproxy.go deleted file mode 100644 index 037472d40..000000000 --- a/client/internal/wgproxy.go +++ /dev/null @@ -1,131 +0,0 @@ -package internal - -import ( - ice "github.com/pion/ice/v2" - log "github.com/sirupsen/logrus" - "github.com/wiretrustee/wiretrustee/iface" - "golang.zx2c4.com/wireguard/wgctrl/wgtypes" - "net" 
-) - -// WgProxy an instance of an instance of the Connection Wireguard Proxy -type WgProxy struct { - iface string - remoteKey string - allowedIps string - wgAddr string - close chan struct{} - wgConn net.Conn - preSharedKey *wgtypes.Key -} - -// NewWgProxy creates a new Connection Wireguard Proxy -func NewWgProxy(iface string, remoteKey string, allowedIps string, wgAddr string, preSharedKey *wgtypes.Key) *WgProxy { - return &WgProxy{ - iface: iface, - remoteKey: remoteKey, - allowedIps: allowedIps, - wgAddr: wgAddr, - close: make(chan struct{}), - preSharedKey: preSharedKey, - } -} - -// Close closes the proxy -func (p *WgProxy) Close() error { - - close(p.close) - if c := p.wgConn; c != nil { - err := p.wgConn.Close() - if err != nil { - return err - } - } - err := iface.RemovePeer(p.iface, p.remoteKey) - if err != nil { - return err - } - - return nil -} - -// StartLocal configure the interface with a peer using a direct IP:Port endpoint to the remote host -func (p *WgProxy) StartLocal(host string) error { - err := iface.UpdatePeer(p.iface, p.remoteKey, p.allowedIps, DefaultWgKeepAlive, host, p.preSharedKey) - if err != nil { - log.Errorf("error while configuring Wireguard peer [%s] %s", p.remoteKey, err.Error()) - return err - } - return nil -} - -// Start starts a new proxy using the ICE connection -func (p *WgProxy) Start(remoteConn *ice.Conn) error { - - wgConn, err := net.Dial("udp", p.wgAddr) - if err != nil { - log.Fatalf("failed dialing to local Wireguard port %s", err) - return err - } - p.wgConn = wgConn - // add local proxy connection as a Wireguard peer - err = iface.UpdatePeer(p.iface, p.remoteKey, p.allowedIps, DefaultWgKeepAlive, - wgConn.LocalAddr().String(), p.preSharedKey) - if err != nil { - log.Errorf("error while configuring Wireguard peer [%s] %s", p.remoteKey, err.Error()) - return err - } - - go func() { p.proxyToRemotePeer(remoteConn) }() - go func() { p.proxyToLocalWireguard(remoteConn) }() - - return err -} - -// proxyToRemotePeer 
proxies everything from Wireguard to the remote peer -// blocks -func (p *WgProxy) proxyToRemotePeer(remoteConn *ice.Conn) { - - buf := make([]byte, 1500) - for { - select { - case <-p.close: - log.Debugf("stopped proxying from remote peer %s due to closed connection", p.remoteKey) - return - default: - n, err := p.wgConn.Read(buf) - if err != nil { - continue - } - - _, err = remoteConn.Write(buf[:n]) - if err != nil { - continue - } - } - } -} - -// proxyToLocalWireguard proxies everything from the remote peer to local Wireguard -// blocks -func (p *WgProxy) proxyToLocalWireguard(remoteConn *ice.Conn) { - - buf := make([]byte, 1500) - for { - select { - case <-p.close: - log.Debugf("stopped proxying from remote peer %s due to closed connection", p.remoteKey) - return - default: - n, err := remoteConn.Read(buf) - if err != nil { - continue - } - - _, err = p.wgConn.Write(buf[:n]) - if err != nil { - continue - } - } - } -} diff --git a/client/testdata/management.json b/client/testdata/management.json index 2355caa7a..4745f2e8c 100644 --- a/client/testdata/management.json +++ b/client/testdata/management.json @@ -1,37 +1,37 @@ { - "Stuns": [ - { - "Proto": "udp", - "URI": "stun:stun.wiretrustee.com:3468", - "Username": "", - "Password": null - } - ], - "TURNConfig": { - "Turns": [ - { - "Proto": "udp", - "URI": "turn:stun.wiretrustee.com:3468", - "Username": "some_user", - "Password": "c29tZV9wYXNzd29yZA==" - } - ], - "CredentialsTTL": "1h", - "Secret": "c29tZV9wYXNzd29yZA==", - "TimeBasedCredentials": true - }, - "Signal": { - "Proto": "http", - "URI": "signal.wiretrustee.com:10000", - "Username": "", - "Password": null - }, - "DataDir": "", - "HttpConfig": { - "LetsEncryptDomain": "", - "Address": "0.0.0.0:33071", - "AuthIssuer": ",", - "AuthAudience": "", - "AuthKeysLocation": "" + "Stuns": [ + { + "Proto": "udp", + "URI": "stun:stun.wiretrustee.com:3468", + "Username": "", + "Password": null } + ], + "TURNConfig": { + "Turns": [ + { + "Proto": "udp", + "URI": 
"turn:stun.wiretrustee.com:3468", + "Username": "some_user", + "Password": "c29tZV9wYXNzd29yZA==" + } + ], + "CredentialsTTL": "1h", + "Secret": "c29tZV9wYXNzd29yZA==", + "TimeBasedCredentials": true + }, + "Signal": { + "Proto": "http", + "URI": "signal.wiretrustee.com:10000", + "Username": "", + "Password": null + }, + "DataDir": "", + "HttpConfig": { + "LetsEncryptDomain": "", + "Address": "0.0.0.0:33071", + "AuthIssuer": ",", + "AuthAudience": "", + "AuthKeysLocation": "" + } } \ No newline at end of file diff --git a/client/testdata/store.json b/client/testdata/store.json index aed56de20..d2c4743b0 100644 --- a/client/testdata/store.json +++ b/client/testdata/store.json @@ -11,6 +11,7 @@ "ExpiresAt": "2321-09-18T20:46:20.005936822+02:00", "Revoked": false, "UsedTimes": 0 + } }, "Network": { @@ -21,7 +22,17 @@ }, "Dns": null }, - "Peers": {} + "Peers": {}, + "Users": { + "edafee4e-63fb-11ec-90d6-0242ac120003": { + "Id": "edafee4e-63fb-11ec-90d6-0242ac120003", + "Role": "admin" + }, + "f4f6d672-63fb-11ec-90d6-0242ac120003": { + "Id": "f4f6d672-63fb-11ec-90d6-0242ac120003", + "Role": "user" + } + } } } } \ No newline at end of file diff --git a/iface/iface.go b/iface/iface.go index 0a6042d08..0f2db4e67 100644 --- a/iface/iface.go +++ b/iface/iface.go @@ -17,8 +17,6 @@ const ( var ( tunIface tun.Device - // todo check after move the WgPort constant to the client - WgPort = 51820 ) // CreateWithUserspace Creates a new Wireguard interface, using wireguard-go userspace implementation @@ -103,7 +101,7 @@ func Exists(iface string) (*bool, error) { // Configure configures a Wireguard interface // The interface must exist before calling this method (e.g. 
call interface.Create() before) -func Configure(iface string, privateKey string) error { +func Configure(iface string, privateKey string, port int) error { log.Debugf("configuring Wireguard interface %s", iface) @@ -113,12 +111,11 @@ func Configure(iface string, privateKey string) error { return err } fwmark := 0 - p := WgPort config := wgtypes.Config{ PrivateKey: &key, ReplacePeers: false, FirewallMark: &fwmark, - ListenPort: &p, + ListenPort: &port, } return configureDevice(iface, config) @@ -235,7 +232,7 @@ func RemovePeer(iface string, peerKey string) error { return configureDevice(iface, config) } -// Closes the User Space tunnel interface +// CloseWithUserspace closes the User Space tunnel interface func CloseWithUserspace() error { return tunIface.Close() } diff --git a/iface/iface_darwin.go b/iface/iface_darwin.go index e93db7759..d8d5ae57e 100644 --- a/iface/iface_darwin.go +++ b/iface/iface_darwin.go @@ -40,7 +40,7 @@ func addRoute(iface string, ipNet *net.IPNet) error { } // Closes the tunnel interface -func Close() error { +func Close(iFace string) error { name, err := tunIface.Name() if err != nil { return err diff --git a/iface/iface_linux.go b/iface/iface_linux.go index d2ffe88b6..09367e562 100644 --- a/iface/iface_linux.go +++ b/iface/iface_linux.go @@ -1,10 +1,8 @@ package iface import ( - "fmt" log "github.com/sirupsen/logrus" "github.com/vishvananda/netlink" - "golang.zx2c4.com/wireguard/wgctrl" "os" ) @@ -131,34 +129,14 @@ func (w *wgLink) Type() string { return "wireguard" } -// Closes the tunnel interface -func Close() error { +// Close closes the tunnel interface +func Close(iFace string) error { if tunIface != nil { return CloseWithUserspace() } else { - var iface = "" - wg, err := wgctrl.New() - if err != nil { - return err - } - defer wg.Close() - devList, err := wg.Devices() - if err != nil { - return err - } - for _, wgDev := range devList { - // todo check after move the WgPort constant to the client - if wgDev.ListenPort == WgPort { - 
iface = wgDev.Name - break - } - } - if iface == "" { - return fmt.Errorf("Wireguard Interface not found") - } attrs := netlink.NewLinkAttrs() - attrs.Name = iface + attrs.Name = iFace link := wgLink{ attrs: &attrs, diff --git a/iface/iface_test.go b/iface/iface_test.go index a8a89e32c..f4cfe927d 100644 --- a/iface/iface_test.go +++ b/iface/iface_test.go @@ -14,6 +14,7 @@ import ( const ( key = "0PMI6OkB5JmB+Jj/iWWHekuQRx+bipZirWCWKFXexHc=" peerPubKey = "Ok0mC0qlJyXEPKh2UFIpsI2jG0L7LRpC3sLAusSJ5CQ=" + WgPort = 51820 ) func init() { @@ -29,7 +30,7 @@ func Test_CreateInterface(t *testing.T) { t.Fatal(err) } defer func() { - err = Close() + err = Close(ifaceName) if err != nil { t.Error(err) } @@ -44,13 +45,6 @@ func Test_CreateInterface(t *testing.T) { t.Error(err) } }() - - d, err := wg.Device(ifaceName) - if err != nil { - t.Fatal(err) - } - // todo move the WgPort constant to the client - WgPort = d.ListenPort } func Test_ConfigureInterface(t *testing.T) { ifaceName := "utun1000" @@ -60,13 +54,13 @@ func Test_ConfigureInterface(t *testing.T) { t.Fatal(err) } defer func() { - err = Close() + err = Close(ifaceName) if err != nil { t.Error(err) } }() - err = Configure(ifaceName, key) + err = Configure(ifaceName, key, WgPort) if err != nil { t.Fatal(err) } @@ -99,12 +93,12 @@ func Test_UpdatePeer(t *testing.T) { t.Fatal(err) } defer func() { - err = Close() + err = Close(ifaceName) if err != nil { t.Error(err) } }() - err = Configure(ifaceName, key) + err = Configure(ifaceName, key, WgPort) if err != nil { t.Fatal(err) } @@ -151,12 +145,12 @@ func Test_UpdatePeerEndpoint(t *testing.T) { t.Fatal(err) } defer func() { - err = Close() + err = Close(ifaceName) if err != nil { t.Error(err) } }() - err = Configure(ifaceName, key) + err = Configure(ifaceName, key, WgPort) if err != nil { t.Fatal(err) } @@ -192,12 +186,12 @@ func Test_RemovePeer(t *testing.T) { t.Fatal(err) } defer func() { - err = Close() + err = Close(ifaceName) if err != nil { t.Error(err) } }() - err = 
Configure(ifaceName, key) + err = Configure(ifaceName, key, WgPort) if err != nil { t.Fatal(err) } @@ -235,13 +229,7 @@ func Test_Close(t *testing.T) { } }() - d, err := wg.Device(ifaceName) - if err != nil { - t.Fatal(err) - } - // todo move the WgPort constant to the client - WgPort = d.ListenPort - err = Close() + err = Close(ifaceName) if err != nil { t.Fatal(err) } diff --git a/iface/iface_windows.go b/iface/iface_windows.go index ddf279af8..a85e062cf 100644 --- a/iface/iface_windows.go +++ b/iface/iface_windows.go @@ -41,6 +41,6 @@ func getUAPI(iface string) (net.Listener, error) { } // Closes the tunnel interface -func Close() error { +func Close(iFace string) error { return CloseWithUserspace() } diff --git a/signal/client/client.go b/signal/client/client.go index c086de54b..bfa3f4f8d 100644 --- a/signal/client/client.go +++ b/signal/client/client.go @@ -45,6 +45,10 @@ type Client struct { status Status } +func (c *Client) StreamConnected() bool { + return c.status == StreamConnected +} + func (c *Client) GetStatus() Status { return c.status } @@ -117,7 +121,7 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) error { c.notifyStreamDisconnected() log.Debugf("signal connection state %v", c.signalConn.GetState()) - if !c.ready() { + if !c.Ready() { return fmt.Errorf("no connection to signal") } @@ -204,9 +208,9 @@ func (c *Client) connect(key string) (proto.SignalExchange_ConnectStreamClient, return stream, nil } -// ready indicates whether the client is okay and ready to be used +// Ready indicates whether the client is okay and ready to be used // for now it just checks whether gRPC connection to the service is in state Ready -func (c *Client) ready() bool { +func (c *Client) Ready() bool { return c.signalConn.GetState() == connectivity.Ready || c.signalConn.GetState() == connectivity.Idle } @@ -228,7 +232,7 @@ func (c *Client) WaitStreamConnected() { // The Client.Receive method must be called before sending messages to establish
initial connection to the Signal Exchange // Client.connWg can be used to wait func (c *Client) SendToStream(msg *proto.EncryptedMessage) error { - if !c.ready() { + if !c.Ready() { return fmt.Errorf("no connection to signal") } if c.stream == nil { @@ -287,7 +291,7 @@ func (c *Client) encryptMessage(msg *proto.Message) (*proto.EncryptedMessage, er // Send sends a message to the remote Peer through the Signal Exchange. func (c *Client) Send(msg *proto.Message) error { - if !c.ready() { + if !c.Ready() { return fmt.Errorf("no connection to signal") } @@ -295,9 +299,11 @@ func (c *Client) Send(msg *proto.Message) error { if err != nil { return err } - _, err = c.realClient.Send(context.TODO(), encryptedMessage) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err = c.realClient.Send(ctx, encryptedMessage) if err != nil { - //log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err) return err } diff --git a/signal/cmd/run.go b/signal/cmd/run.go index d06199692..9e9936366 100644 --- a/signal/cmd/run.go +++ b/signal/cmd/run.go @@ -74,10 +74,6 @@ var ( log.Fatalf("failed to listen: %v", err) } - if err != nil { - log.Fatalf("failed to listen: %v", err) - } - proto.RegisterSignalExchangeServer(grpcServer, server.NewServer()) log.Printf("started server: localhost:%v", signalPort) if err := grpcServer.Serve(lis); err != nil { diff --git a/signal/server/signal.go b/signal/server/signal.go index c33287ab0..d2857dd7c 100644 --- a/signal/server/signal.go +++ b/signal/server/signal.go @@ -29,7 +29,7 @@ func NewServer() *Server { func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.EncryptedMessage, error) { if !s.registry.IsPeerRegistered(msg.Key) { - return nil, fmt.Errorf("unknown peer %s", msg.Key) + return nil, fmt.Errorf("peer %s is not registered", msg.Key) } if dstPeer, found := s.registry.Get(msg.RemoteKey); found {