diff --git a/.goreleaser.yaml b/.goreleaser.yaml index fbc44da2f..18bc8aaeb 100644 --- a/.goreleaser.yaml +++ b/.goreleaser.yaml @@ -221,7 +221,7 @@ uploads: ids: - deb mode: archive - target: https://pkgs.wiretrustee.com/debian/pool/{{ .ArtifactName }};deb.distribution=stable;deb.component=main + target: https://pkgs.wiretrustee.com/debian/pool/{{ .ArtifactName }};deb.distribution=stable;deb.component=main;deb.architecture={{ if .Arm }}armhf{{ else }}{{ .Arch }}{{ end }};deb.package= username: dev@wiretrustee.com method: PUT - name: yum diff --git a/client/cmd/up.go b/client/cmd/up.go index 2f21c0b61..cc7667d9b 100644 --- a/client/cmd/up.go +++ b/client/cmd/up.go @@ -123,8 +123,8 @@ func runClient() error { InitialInterval: time.Second, RandomizationFactor: backoff.DefaultRandomizationFactor, Multiplier: backoff.DefaultMultiplier, - MaxInterval: time.Hour, - MaxElapsedTime: 24 * 3 * time.Hour, + MaxInterval: 10 * time.Second, + MaxElapsedTime: 24 * 3 * time.Hour, //stop the client after 3 days trying (must be a huge problem, e.g permission denied) Stop: backoff.Stop, Clock: backoff.SystemClock, } diff --git a/client/internal/engine.go b/client/internal/engine.go index 1cca20bfc..1f4e65064 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -4,7 +4,7 @@ import ( "context" "fmt" "github.com/cenkalti/backoff/v4" - ice "github.com/pion/ice/v2" + "github.com/pion/ice/v2" log "github.com/sirupsen/logrus" "github.com/wiretrustee/wiretrustee/iface" mgm "github.com/wiretrustee/wiretrustee/management/client" @@ -142,7 +142,7 @@ func (e *Engine) initializePeer(peer Peer) { RandomizationFactor: backoff.DefaultRandomizationFactor, Multiplier: backoff.DefaultMultiplier, MaxInterval: 5 * time.Second, - MaxElapsedTime: time.Duration(0), //never stop + MaxElapsedTime: 0, //never stop Stop: backoff.Stop, Clock: backoff.SystemClock, }, e.ctx) @@ -157,8 +157,7 @@ func (e *Engine) initializePeer(peer Peer) { } if err != nil { - log.Warnln(err) - log.Debugf("retrying connection because of error: %s", err.Error()) + log.Infof("retrying connection because of error: %s", err.Error()) return err } return nil @@ -332,6 +331,8 @@ func (e *Engine) receiveManagementEvents() { return nil }) if err != nil { + // happens if management is unavailable for a long time. + // We want to cancel the operation of the whole client e.cancel() return } @@ -414,68 +415,77 @@ func (e *Engine) updatePeers(remotePeers []*mgmProto.RemotePeerConfig) error { // receiveSignalEvents connects to the Signal Service event stream to negotiate connection with remote peers func (e *Engine) receiveSignalEvents() { - // connect to a stream of messages coming from the signal server - e.signal.Receive(func(msg *sProto.Message) error { - e.syncMsgMux.Lock() - defer e.syncMsgMux.Unlock() + go func() { + // connect to a stream of messages coming from the signal server + err := e.signal.Receive(func(msg *sProto.Message) error { - conn := e.conns[msg.Key] - if conn == nil { - return fmt.Errorf("wrongly addressed message %s", msg.Key) - } + e.syncMsgMux.Lock() + defer e.syncMsgMux.Unlock() - if conn.Config.RemoteWgKey.String() != msg.Key { - return fmt.Errorf("unknown peer %s", msg.Key) - } - - switch msg.GetBody().Type { - case sProto.Body_OFFER: - remoteCred, err := signal.UnMarshalCredential(msg) - if err != nil { - return err + conn := e.conns[msg.Key] + if conn == nil { + return fmt.Errorf("wrongly addressed message %s", msg.Key) } - err = conn.OnOffer(IceCredentials{ - uFrag: remoteCred.UFrag, - pwd: remoteCred.Pwd, - }) - if err != nil { - return err + if conn.Config.RemoteWgKey.String() != msg.Key { + return fmt.Errorf("unknown peer %s", msg.Key) + } + + switch msg.GetBody().Type { + case sProto.Body_OFFER: + remoteCred, err := signal.UnMarshalCredential(msg) + if err != nil { + return err + } + err = conn.OnOffer(IceCredentials{ + uFrag: remoteCred.UFrag, + pwd: remoteCred.Pwd, + }) + + if err != nil { + return err + } + + return nil + case sProto.Body_ANSWER: + remoteCred, err := signal.UnMarshalCredential(msg) + if err != nil { + return err + } + err = conn.OnAnswer(IceCredentials{ + uFrag: remoteCred.UFrag, + pwd: remoteCred.Pwd, + }) + + if err != nil { + return err + } + + case sProto.Body_CANDIDATE: + + candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload) + if err != nil { + log.Errorf("failed on parsing remote candidate %s -> %s", candidate, err) + return err + } + + err = conn.OnRemoteCandidate(candidate) + if err != nil { + log.Errorf("error handling CANDIATE from %s", msg.Key) + return err + } } return nil - case sProto.Body_ANSWER: - remoteCred, err := signal.UnMarshalCredential(msg) - if err != nil { - return err - } - err = conn.OnAnswer(IceCredentials{ - uFrag: remoteCred.UFrag, - pwd: remoteCred.Pwd, - }) - - if err != nil { - return err - } - - case sProto.Body_CANDIDATE: - - candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload) - if err != nil { - log.Errorf("failed on parsing remote candidate %s -> %s", candidate, err) - return err - } - - err = conn.OnRemoteCandidate(candidate) - if err != nil { - log.Errorf("error handling CANDIATE from %s", msg.Key) - return err - } + }) + if err != nil { + // happens if signal is unavailable for a long time. + // We want to cancel the operation of the whole client + e.cancel() + return } + }() - return nil - }) - - e.signal.WaitConnected() + e.signal.WaitStreamConnected() } diff --git a/management/client/client.go b/management/client/client.go index 51a6b7b87..891a9f980 100644 --- a/management/client/client.go +++ b/management/client/client.go @@ -3,6 +3,7 @@ package client import ( "context" "crypto/tls" + "fmt" "github.com/cenkalti/backoff/v4" log "github.com/sirupsen/logrus" "github.com/wiretrustee/wiretrustee/client/system" @@ -10,6 +11,7 @@ import ( "github.com/wiretrustee/wiretrustee/management/proto" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "google.golang.org/grpc" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" "io" @@ -70,13 +72,19 @@ func defaultBackoff(ctx context.Context) backoff.BackOff { InitialInterval: 800 * time.Millisecond, RandomizationFactor: backoff.DefaultRandomizationFactor, Multiplier: backoff.DefaultMultiplier, - MaxInterval: 15 * time.Minute, - MaxElapsedTime: time.Hour, //stop after an hour of trying, the error will be propagated to the general retry of the client + MaxInterval: 10 * time.Second, + MaxElapsedTime: 12 * time.Hour, //stop after 12 hours of trying, the error will be propagated to the general retry of the client Stop: backoff.Stop, Clock: backoff.SystemClock, }, ctx) } +// ready indicates whether the client is okay and ready to be used +// for now it just checks whether gRPC connection to the service is ready +func (c *Client) ready() bool { + return c.conn.GetState() == connectivity.Ready +} + // Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages // Blocking request. The result will be sent via msgHandler callback function func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error { @@ -85,6 +93,12 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error { operation := func() error { + log.Debugf("management connection state %v", c.conn.GetState()) + + if !c.ready() { + return fmt.Errorf("no connection to management") + } + // todo we already have it since we did the Login, maybe cache it locally? serverPubKey, err := c.GetServerPublicKey() if err != nil { @@ -98,7 +112,7 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error { return err } - log.Infof("connected to the Management Service Stream") + log.Infof("connected to the Management Service stream") // blocking until error err = c.receiveEvents(stream, *serverPubKey, msgHandler) @@ -139,7 +153,7 @@ func (c *Client) receiveEvents(stream proto.ManagementService_SyncClient, server for { update, err := stream.Recv() if err == io.EOF { - log.Errorf("managment stream was closed: %s", err) + log.Errorf("Management stream has been closed by server: %s", err) return err } if err != nil { @@ -165,6 +179,10 @@ func (c *Client) receiveEvents(stream proto.ManagementService_SyncClient, server // GetServerPublicKey returns server Wireguard public key (used later for encrypting messages sent to the server) func (c *Client) GetServerPublicKey() (*wgtypes.Key, error) { + if !c.ready() { + return nil, fmt.Errorf("no connection to management") + } + mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second) //todo make a general setting defer cancel() resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{}) @@ -181,6 +199,9 @@ func (c *Client) GetServerPublicKey() (*wgtypes.Key, error) { } func (c *Client) login(serverKey wgtypes.Key, req *proto.LoginRequest) (*proto.LoginResponse, error) { + if !c.ready() { + return nil, fmt.Errorf("no connection to management") + } loginReq, err := encryption.EncryptMessage(serverKey, c.key, req) if err != nil { log.Errorf("failed to encrypt message: %s", err) diff --git a/signal/client/client_test.go b/signal/client/client_test.go index 2ac5b03ee..55aeaf2c6 100644 --- a/signal/client/client_test.go +++ b/signal/client/client_test.go @@ -36,7 +36,7 @@ var _ = Describe("Client", func() { }) Describe("Exchanging messages", func() { - Context("between connected peers", func() { + Context("between streamConnected peers", func() { It("should be successful", func() { var msgReceived sync.WaitGroup @@ -48,30 +48,42 @@ var _ = Describe("Client", func() { // connect PeerA to Signal keyA, _ := wgtypes.GenerateKey() clientA := createSignalClient(addr, keyA) - clientA.Receive(func(msg *sigProto.Message) error { - receivedOnA = msg.GetBody().GetPayload() - msgReceived.Done() - return nil - }) - clientA.WaitConnected() + go func() { + err := clientA.Receive(func(msg *sigProto.Message) error { + receivedOnA = msg.GetBody().GetPayload() + msgReceived.Done() + return nil + }) + if err != nil { + return + } + }() + clientA.WaitStreamConnected() // connect PeerB to Signal keyB, _ := wgtypes.GenerateKey() clientB := createSignalClient(addr, keyB) - clientB.Receive(func(msg *sigProto.Message) error { - receivedOnB = msg.GetBody().GetPayload() - err := clientB.Send(&sigProto.Message{ - Key: keyB.PublicKey().String(), - RemoteKey: keyA.PublicKey().String(), - Body: &sigProto.Body{Payload: "pong"}, + + go func() { + err := clientB.Receive(func(msg *sigProto.Message) error { + receivedOnB = msg.GetBody().GetPayload() + err := clientB.Send(&sigProto.Message{ + Key: keyB.PublicKey().String(), + RemoteKey: keyA.PublicKey().String(), + Body: &sigProto.Body{Payload: "pong"}, + }) + if err != nil { + Fail("failed sending a message to PeerA") + } + msgReceived.Done() + return nil }) if err != nil { - Fail("failed sending a message to PeerA") + return } - msgReceived.Done() - return nil - }) - clientB.WaitConnected() + }() + + clientB.WaitStreamConnected() // PeerA initiates ping-pong err := clientA.Send(&sigProto.Message{ @@ -100,11 +112,15 @@ var _ = Describe("Client", func() { key, _ := wgtypes.GenerateKey() client := createSignalClient(addr, key) - client.Receive(func(msg *sigProto.Message) error { - return nil - }) - client.WaitConnected() - + go func() { + err := client.Receive(func(msg *sigProto.Message) error { + return nil + }) + if err != nil { + return + } + }() + client.WaitStreamConnected() Expect(client).NotTo(BeNil()) }) })