diff --git a/client/cmd/up.go b/client/cmd/up.go index 48141d490..2f21c0b61 100644 --- a/client/cmd/up.go +++ b/client/cmd/up.go @@ -72,7 +72,7 @@ func createEngineConfig(key wgtypes.Key, config *internal.Config, peerConfig *mg } // connectToSignal creates Signal Service client and established a connection -func connectToSignal(ctx context.Context, wtConfig *mgmProto.WiretrusteeConfig, ourPrivateKey wgtypes.Key) (*signal.Client, error) { +func connectToSignal(ctx context.Context, wtConfig *mgmProto.WiretrusteeConfig, ourPrivateKey wgtypes.Key) (signal.Client, error) { var sigTLSEnabled bool if wtConfig.Signal.Protocol == mgmProto.HostConfig_HTTPS { sigTLSEnabled = true diff --git a/client/internal/engine.go b/client/internal/engine.go index 1fd011635..1cca20bfc 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -35,7 +35,7 @@ type EngineConfig struct { // Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers. type Engine struct { // signal is a Signal Service client - signal *signal.Client + signal signal.Client // mgmClient is a Management Service client mgmClient *mgm.Client // conns is a collection of remote peer connections indexed by local public key of the remote peers @@ -68,7 +68,7 @@ type Peer struct { } // NewEngine creates a new Connection Engine -func NewEngine(signalClient *signal.Client, mgmClient *mgm.Client, config *EngineConfig, cancel context.CancelFunc, ctx context.Context) *Engine { +func NewEngine(signalClient signal.Client, mgmClient *mgm.Client, config *EngineConfig, cancel context.CancelFunc, ctx context.Context) *Engine { return &Engine{ signal: signalClient, mgmClient: mgmClient, @@ -258,7 +258,7 @@ func (e *Engine) openPeerConnection(wgPort int, myKey wgtypes.Key, peer Peer) (* return conn, nil } -func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client) error { +func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s signal.Client) error { err := s.Send(&sProto.Message{ Key: myKey.PublicKey().String(), RemoteKey: remoteKey.String(), @@ -276,7 +276,7 @@ func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtyp return nil } -func signalAuth(uFrag string, pwd string, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client, isAnswer bool) error { +func signalAuth(uFrag string, pwd string, myKey wgtypes.Key, remoteKey wgtypes.Key, s signal.Client, isAnswer bool) error { var t sProto.Body_Type if isAnswer { diff --git a/signal/client/client.go b/signal/client/client.go index 5702a8c1c..741a2089f 100644 --- a/signal/client/client.go +++ b/signal/client/client.go @@ -1,271 +1,20 @@ package client import ( - "context" - "crypto/tls" "fmt" - "github.com/cenkalti/backoff/v4" - log "github.com/sirupsen/logrus" - "github.com/wiretrustee/wiretrustee/encryption" "github.com/wiretrustee/wiretrustee/signal/proto" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials" - "google.golang.org/grpc/keepalive" - "google.golang.org/grpc/metadata" - "google.golang.org/grpc/status" - "io" "strings" - "sync" - "time" ) // A set of tools to exchange connection details (Wireguard endpoints) with the remote peer. -// Client Wraps the Signal Exchange Service gRpc client -type Client struct { - key wgtypes.Key - realClient proto.SignalExchangeClient - signalConn *grpc.ClientConn - ctx context.Context - stream proto.SignalExchange_ConnectStreamClient - //waiting group to notify once stream is connected - connWg *sync.WaitGroup //todo use a channel instead?? -} - -// Close Closes underlying connections to the Signal Exchange -func (c *Client) Close() error { - return c.signalConn.Close() -} - -// NewClient creates a new Signal client -func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled bool) (*Client, error) { - - transportOption := grpc.WithInsecure() - - if tlsEnabled { - transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})) - } - - sigCtx, cancel := context.WithTimeout(ctx, 10*time.Second) - defer cancel() - conn, err := grpc.DialContext( - sigCtx, - addr, - transportOption, - grpc.WithBlock(), - grpc.WithKeepaliveParams(keepalive.ClientParameters{ - Time: 15 * time.Second, - Timeout: 10 * time.Second, - })) - - if err != nil { - log.Errorf("failed to connect to the signalling server %v", err) - return nil, err - } - - var wg sync.WaitGroup - return &Client{ - realClient: proto.NewSignalExchangeClient(conn), - ctx: ctx, - signalConn: conn, - key: key, - connWg: &wg, - }, nil -} - -//defaultBackoff is a basic backoff mechanism for general issues -func defaultBackoff(ctx context.Context) backoff.BackOff { - return backoff.WithContext(&backoff.ExponentialBackOff{ - InitialInterval: 800 * time.Millisecond, - RandomizationFactor: backoff.DefaultRandomizationFactor, - Multiplier: backoff.DefaultMultiplier, - MaxInterval: 15 * time.Minute, - MaxElapsedTime: time.Hour, //stop after an hour of trying, the error will be propagated to the general retry of the client - Stop: backoff.Stop, - Clock: backoff.SystemClock, - }, ctx) - -} - -// Receive Connects to the Signal Exchange message stream and starts receiving messages. -// The messages will be handled by msgHandler function provided. -// This function runs a goroutine underneath and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart) -// The key is the identifier of our Peer (could be Wireguard public key) -func (c *Client) Receive(msgHandler func(msg *proto.Message) error) { - c.connWg.Add(1) - go func() { - - var backOff = defaultBackoff(c.ctx) - - operation := func() error { - - stream, err := c.connect(c.key.PublicKey().String()) - if err != nil { - log.Warnf("disconnected from the Signal Exchange due to an error: %v", err) - c.connWg.Add(1) - return err - } - - err = c.receive(stream, msgHandler) - if err != nil { - backOff.Reset() - return err - } - - return nil - } - - err := backoff.Retry(operation, backOff) - if err != nil { - log.Errorf("exiting Signal Service connection retry loop due to unrecoverable error: %s", err) - return - } - }() -} - -func (c *Client) connect(key string) (proto.SignalExchange_ConnectStreamClient, error) { - c.stream = nil - - // add key fingerprint to the request header to be identified on the server side - md := metadata.New(map[string]string{proto.HeaderId: key}) - ctx := metadata.NewOutgoingContext(c.ctx, md) - - stream, err := c.realClient.ConnectStream(ctx, grpc.WaitForReady(true)) - - c.stream = stream - if err != nil { - return nil, err - } - // blocks - header, err := c.stream.Header() - if err != nil { - return nil, err - } - registered := header.Get(proto.HeaderRegistered) - if len(registered) == 0 { - return nil, fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams") - } - //connection established we are good to use the stream - c.connWg.Done() - - log.Infof("connected to the Signal Exchange Stream") - - return stream, nil -} - -// WaitConnected waits until the client is connected to the message stream -func (c *Client) WaitConnected() { - c.connWg.Wait() -} - -// SendToStream sends a message to the remote Peer through the Signal Exchange using established stream connection to the Signal Server -// The Client.Receive method must be called before sending messages to establish initial connection to the Signal Exchange -// Client.connWg can be used to wait -func (c *Client) SendToStream(msg *proto.EncryptedMessage) error { - - if c.stream == nil { - return fmt.Errorf("connection to the Signal Exchnage has not been established yet. Please call Client.Receive before sending messages") - } - - err := c.stream.Send(msg) - if err != nil { - log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err) - return err - } - - return nil -} - -// decryptMessage decrypts the body of the msg using Wireguard private key and Remote peer's public key -func (c *Client) decryptMessage(msg *proto.EncryptedMessage) (*proto.Message, error) { - remoteKey, err := wgtypes.ParseKey(msg.GetKey()) - if err != nil { - return nil, err - } - - body := &proto.Body{} - err = encryption.DecryptMessage(remoteKey, c.key, msg.GetBody(), body) - if err != nil { - return nil, err - } - - return &proto.Message{ - Key: msg.Key, - RemoteKey: msg.RemoteKey, - Body: body, - }, nil -} - -// encryptMessage encrypts the body of the msg using Wireguard private key and Remote peer's public key -func (c *Client) encryptMessage(msg *proto.Message) (*proto.EncryptedMessage, error) { - - remoteKey, err := wgtypes.ParseKey(msg.RemoteKey) - if err != nil { - return nil, err - } - - encryptedBody, err := encryption.EncryptMessage(remoteKey, c.key, msg.Body) - if err != nil { - return nil, err - } - - return &proto.EncryptedMessage{ - Key: msg.GetKey(), - RemoteKey: msg.GetRemoteKey(), - Body: encryptedBody, - }, nil -} - -// Send sends a message to the remote Peer through the Signal Exchange. -func (c *Client) Send(msg *proto.Message) error { - - encryptedMessage, err := c.encryptMessage(msg) - if err != nil { - return err - } - _, err = c.realClient.Send(context.TODO(), encryptedMessage) - if err != nil { - log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err) - return err - } - - return nil -} - -// receive receives messages from other peers coming through the Signal Exchange -func (c *Client) receive(stream proto.SignalExchange_ConnectStreamClient, - msgHandler func(msg *proto.Message) error) error { - - for { - msg, err := stream.Recv() - if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { - log.Warnf("stream canceled (usually indicates shutdown)") - return err - } else if s.Code() == codes.Unavailable { - log.Warnf("server has been stopped") - return err - } else if err == io.EOF { - log.Warnf("stream closed by server") - return err - } else if err != nil { - return err - } - log.Debugf("received a new message from Peer [fingerprint: %s]", msg.Key) - - decryptedMessage, err := c.decryptMessage(msg) - if err != nil { - log.Errorf("failed decrypting message of Peer [key: %s] error: [%s]", msg.Key, err.Error()) - } - - err = msgHandler(decryptedMessage) - - if err != nil { - log.Errorf("error while handling message of Peer [key: %s] error: [%s]", msg.Key, err.Error()) - //todo send something?? - } - } +type Client interface { + Receive(msgHandler func(msg *proto.Message) error) + Close() error + Send(msg *proto.Message) error + SendToStream(msg *proto.EncryptedMessage) error + WaitConnected() } // UnMarshalCredential parses the credentials from the message and returns a Credential instance diff --git a/signal/client/grpc_client.go b/signal/client/grpc_client.go new file mode 100644 index 000000000..dcecc37a4 --- /dev/null +++ b/signal/client/grpc_client.go @@ -0,0 +1,266 @@ +package client + +import ( + "context" + "crypto/tls" + "fmt" + "github.com/cenkalti/backoff/v4" + log "github.com/sirupsen/logrus" + "github.com/wiretrustee/wiretrustee/encryption" + "github.com/wiretrustee/wiretrustee/signal/proto" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/status" + "io" + "sync" + "time" +) + +// GrpcClient Wraps the Signal Exchange Service gRpc client +type GrpcClient struct { + key wgtypes.Key + realClient proto.SignalExchangeClient + signalConn *grpc.ClientConn + ctx context.Context + stream proto.SignalExchange_ConnectStreamClient + //waiting group to notify once stream is connected + connWg *sync.WaitGroup //todo use a channel instead?? +} + +// Close Closes underlying connections to the Signal Exchange +func (c *GrpcClient) Close() error { + return c.signalConn.Close() +} + +// NewClient creates a new Signal client +func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled bool) (*GrpcClient, error) { + + transportOption := grpc.WithInsecure() + + if tlsEnabled { + transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})) + } + + sigCtx, cancel := context.WithTimeout(ctx, 10*time.Second) + defer cancel() + conn, err := grpc.DialContext( + sigCtx, + addr, + transportOption, + grpc.WithBlock(), + grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: 15 * time.Second, + Timeout: 10 * time.Second, + })) + + if err != nil { + log.Errorf("failed to connect to the signalling server %v", err) + return nil, err + } + + var wg sync.WaitGroup + return &GrpcClient{ + realClient: proto.NewSignalExchangeClient(conn), + ctx: ctx, + signalConn: conn, + key: key, + connWg: &wg, + }, nil +} + +//defaultBackoff is a basic backoff mechanism for general issues +func defaultBackoff(ctx context.Context) backoff.BackOff { + return backoff.WithContext(&backoff.ExponentialBackOff{ + InitialInterval: 800 * time.Millisecond, + RandomizationFactor: backoff.DefaultRandomizationFactor, + Multiplier: backoff.DefaultMultiplier, + MaxInterval: 15 * time.Minute, + MaxElapsedTime: time.Hour, //stop after an hour of trying, the error will be propagated to the general retry of the client + Stop: backoff.Stop, + Clock: backoff.SystemClock, + }, ctx) + +} + +// Receive Connects to the Signal Exchange message stream and starts receiving messages. +// The messages will be handled by msgHandler function provided. +// This function runs a goroutine underneath and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart) +// The key is the identifier of our Peer (could be Wireguard public key) +func (c *GrpcClient) Receive(msgHandler func(msg *proto.Message) error) { + c.connWg.Add(1) + go func() { + + var backOff = defaultBackoff(c.ctx) + + operation := func() error { + + stream, err := c.connect(c.key.PublicKey().String()) + if err != nil { + log.Warnf("disconnected from the Signal Exchange due to an error: %v", err) + c.connWg.Add(1) + return err + } + + err = c.receive(stream, msgHandler) + if err != nil { + backOff.Reset() + return err + } + + return nil + } + + err := backoff.Retry(operation, backOff) + if err != nil { + log.Errorf("exiting Signal Service connection retry loop due to unrecoverable error: %s", err) + return + } + }() +} + +func (c *GrpcClient) connect(key string) (proto.SignalExchange_ConnectStreamClient, error) { + c.stream = nil + + // add key fingerprint to the request header to be identified on the server side + md := metadata.New(map[string]string{proto.HeaderId: key}) + ctx := metadata.NewOutgoingContext(c.ctx, md) + + stream, err := c.realClient.ConnectStream(ctx, grpc.WaitForReady(true)) + + c.stream = stream + if err != nil { + return nil, err + } + // blocks + header, err := c.stream.Header() + if err != nil { + return nil, err + } + registered := header.Get(proto.HeaderRegistered) + if len(registered) == 0 { + return nil, fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams") + } + //connection established we are good to use the stream + c.connWg.Done() + + log.Infof("connected to the Signal Exchange Stream") + + return stream, nil +} + +// WaitConnected waits until the client is connected to the message stream +func (c *GrpcClient) WaitConnected() { + c.connWg.Wait() +} + +// SendToStream sends a message to the remote Peer through the Signal Exchange using established stream connection to the Signal Server +// The Client.Receive method must be called before sending messages to establish initial connection to the Signal Exchange +// Client.connWg can be used to wait +func (c *GrpcClient) SendToStream(msg *proto.EncryptedMessage) error { + + if c.stream == nil { + return fmt.Errorf("connection to the Signal Exchnage has not been established yet. Please call Client.Receive before sending messages") + } + + err := c.stream.Send(msg) + if err != nil { + log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err) + return err + } + + return nil +} + +// decryptMessage decrypts the body of the msg using Wireguard private key and Remote peer's public key +func (c *GrpcClient) decryptMessage(msg *proto.EncryptedMessage) (*proto.Message, error) { + remoteKey, err := wgtypes.ParseKey(msg.GetKey()) + if err != nil { + return nil, err + } + + body := &proto.Body{} + err = encryption.DecryptMessage(remoteKey, c.key, msg.GetBody(), body) + if err != nil { + return nil, err + } + + return &proto.Message{ + Key: msg.Key, + RemoteKey: msg.RemoteKey, + Body: body, + }, nil +} + +// encryptMessage encrypts the body of the msg using Wireguard private key and Remote peer's public key +func (c *GrpcClient) encryptMessage(msg *proto.Message) (*proto.EncryptedMessage, error) { + + remoteKey, err := wgtypes.ParseKey(msg.RemoteKey) + if err != nil { + return nil, err + } + + encryptedBody, err := encryption.EncryptMessage(remoteKey, c.key, msg.Body) + if err != nil { + return nil, err + } + + return &proto.EncryptedMessage{ + Key: msg.GetKey(), + RemoteKey: msg.GetRemoteKey(), + Body: encryptedBody, + }, nil +} + +// Send sends a message to the remote Peer through the Signal Exchange. +func (c *GrpcClient) Send(msg *proto.Message) error { + + encryptedMessage, err := c.encryptMessage(msg) + if err != nil { + return err + } + _, err = c.realClient.Send(context.TODO(), encryptedMessage) + if err != nil { + log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err) + return err + } + + return nil +} + +// receive receives messages from other peers coming through the Signal Exchange +func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient, + msgHandler func(msg *proto.Message) error) error { + + for { + msg, err := stream.Recv() + if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled { + log.Warnf("stream canceled (usually indicates shutdown)") + return err + } else if s.Code() == codes.Unavailable { + log.Warnf("server has been stopped") + return err + } else if err == io.EOF { + log.Warnf("stream closed by server") + return err + } else if err != nil { + return err + } + log.Debugf("received a new message from Peer [fingerprint: %s]", msg.Key) + + decryptedMessage, err := c.decryptMessage(msg) + if err != nil { + log.Errorf("failed decrypting message of Peer [key: %s] error: [%s]", msg.Key, err.Error()) + } + + err = msgHandler(decryptedMessage) + + if err != nil { + log.Errorf("error while handling message of Peer [key: %s] error: [%s]", msg.Key, err.Error()) + //todo send something?? + } + } +} diff --git a/signal/client/ws_client.go b/signal/client/ws_client.go new file mode 100644 index 000000000..e1371f2be --- /dev/null +++ b/signal/client/ws_client.go @@ -0,0 +1,38 @@ +package client + +import ( + "context" + "github.com/wiretrustee/wiretrustee/signal/proto" + "golang.zx2c4.com/wireguard/wgctrl/wgtypes" +) + +//WebsocketClient is a Signal server websocket client (alternative to the original gRPC Client) +type WebsocketClient struct { + key wgtypes.Key + ctx context.Context +} + +func NewWebsocketClient(ctx context.Context, addr string, wgPrivateKey wgtypes.Key) (*WebsocketClient, error) { + return &WebsocketClient{ + key: wgPrivateKey, + ctx: ctx, + }, nil +} + +func (c *WebsocketClient) Close() error { + return nil +} + +func (c *WebsocketClient) Receive(msgHandler func(msg *proto.Message) error) { + +} +func (c *WebsocketClient) SendToStream(msg *proto.EncryptedMessage) error { + return nil +} +func (c *WebsocketClient) Send(msg *proto.Message) error { + return nil +} + +func (c *WebsocketClient) WaitConnected() { + +} diff --git a/signal/peer/peer.go b/signal/peer/peer.go index 87dab8f61..df60ad95d 100644 --- a/signal/peer/peer.go +++ b/signal/peer/peer.go @@ -22,6 +22,7 @@ func NewWebsocketChannel(conn *websocket.Conn) *WebsocketChannel { } func (c *WebsocketChannel) Send(msg *proto.EncryptedMessage) error { + //todo return nil }