diff --git a/connection/connection.go b/connection/connection.go index 5485181d9..068f95592 100644 --- a/connection/connection.go +++ b/connection/connection.go @@ -2,10 +2,10 @@ package connection import ( "context" + "github.com/cenkalti/backoff/v4" "github.com/pion/ice/v2" log "github.com/sirupsen/logrus" "github.com/wiretrustee/wiretrustee/iface" - "github.com/wiretrustee/wiretrustee/util" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" "net" "sync" @@ -295,14 +295,12 @@ func (conn *Connection) listenOnConnectionStateChanges() error { log.Debugf("connected to peer %s via selected candidate pair %s", conn.Config.RemoteWgKey.String(), pair) } else if state == ice.ConnectionStateDisconnected || state == ice.ConnectionStateFailed { // todo do we really wanna have a connection restart within connection itself? Think of moving it outside - err := util.Retry(15, time.Second, func() error { + operation := func() error { return conn.Restart() - }, func(err error) { - log.Warnf("failed restarting connection, retrying ... %s", err) - }) - + } + err := backoff.Retry(operation, backoff.NewExponentialBackOff()) if err != nil { - log.Errorf("failed restarting connection %s", err) + log.Errorf("error while communicating with the Signal Exchange %s ", err) return } } diff --git a/go.mod b/go.mod index 46c586846..a070c1519 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/wiretrustee/wiretrustee go 1.16 require ( + github.com/cenkalti/backoff/v4 v4.1.0 github.com/golang/protobuf v1.4.2 github.com/google/nftables v0.0.0-20201230142148-715e31cb3c31 github.com/pion/ice/v2 v2.0.17 diff --git a/signal/client.go b/signal/client.go index 1d237c572..1ca404a1c 100644 --- a/signal/client.go +++ b/signal/client.go @@ -3,9 +3,9 @@ package signal import ( "context" "fmt" + "github.com/cenkalti/backoff/v4" log "github.com/sirupsen/logrus" "github.com/wiretrustee/wiretrustee/signal/proto" - "github.com/wiretrustee/wiretrustee/util" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/keepalive" @@ -42,8 +42,8 @@ func NewClient(addr string, ctx context.Context) (*Client, error) { grpc.WithInsecure(), grpc.WithBlock(), grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: 30 * time.Second, - Timeout: 10 * time.Second, + Time: 3 * time.Second, + Timeout: 2 * time.Second, })) if err != nil { @@ -65,13 +65,16 @@ func NewClient(addr string, ctx context.Context) (*Client, error) { func (client *Client) Receive(key string, msgHandler func(msg *proto.Message) error) { client.connWg.Add(1) go func() { - err := util.Retry(15, time.Second, func() error { - return client.connect(key, msgHandler) - }, func(err error) { - log.Warnf("disconnected from the Signal Exchange due to an error %s. Retrying ... ", err) - client.connWg.Add(1) - }) + operation := func() error { + err := client.connect(key, msgHandler) + if err != nil { + log.Warnf("disconnected from the Signal Exchange due to an error %s. Retrying ... ", err) + client.connWg.Add(1) + } + return err + } + err := backoff.Retry(operation, backoff.NewExponentialBackOff()) if err != nil { log.Errorf("error while communicating with the Signal Exchange %s ", err) return @@ -111,11 +114,14 @@ func (client *Client) WaitConnected() { // The Client.Receive method must be called before sending messages to establish initial connection to the Signal Exchange // Client.connWg can be used to wait func (client *Client) Send(msg *proto.Message) error { - if client.stream == nil { + + _, err := client.realClient.Connect(context.TODO(), msg) + + /*if client.stream == nil { return fmt.Errorf("connection to the Signal Exchnage has not been established yet. Please call Client.Receive before sending messages") } - err := client.stream.Send(msg) + err := client.stream.Send(msg)*/ if err != nil { log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err) return err diff --git a/signal/signal.go b/signal/signal.go index 2d2fbbeeb..f6a2b7ccc 100644 --- a/signal/signal.go +++ b/signal/signal.go @@ -32,7 +32,7 @@ func NewServer() *SignalExchangeServer { func (s *SignalExchangeServer) Connect(ctx context.Context, msg *proto.Message) (*proto.Message, error) { if _, found := s.registry.Peers[msg.Key]; found { - return nil, fmt.Errorf("unknown peer %s", msg.Key) + return &proto.Message{}, nil } if dstPeer, found := s.registry.Peers[msg.RemoteKey]; found {