mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 00:06:38 +00:00
feature: add base for the signal websocket client
This commit is contained in:
@@ -72,7 +72,7 @@ func createEngineConfig(key wgtypes.Key, config *internal.Config, peerConfig *mg
|
||||
}
|
||||
|
||||
// connectToSignal creates Signal Service client and established a connection
|
||||
func connectToSignal(ctx context.Context, wtConfig *mgmProto.WiretrusteeConfig, ourPrivateKey wgtypes.Key) (*signal.Client, error) {
|
||||
func connectToSignal(ctx context.Context, wtConfig *mgmProto.WiretrusteeConfig, ourPrivateKey wgtypes.Key) (signal.Client, error) {
|
||||
var sigTLSEnabled bool
|
||||
if wtConfig.Signal.Protocol == mgmProto.HostConfig_HTTPS {
|
||||
sigTLSEnabled = true
|
||||
|
||||
@@ -35,7 +35,7 @@ type EngineConfig struct {
|
||||
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
|
||||
type Engine struct {
|
||||
// signal is a Signal Service client
|
||||
signal *signal.Client
|
||||
signal signal.Client
|
||||
// mgmClient is a Management Service client
|
||||
mgmClient *mgm.Client
|
||||
// conns is a collection of remote peer connections indexed by local public key of the remote peers
|
||||
@@ -68,7 +68,7 @@ type Peer struct {
|
||||
}
|
||||
|
||||
// NewEngine creates a new Connection Engine
|
||||
func NewEngine(signalClient *signal.Client, mgmClient *mgm.Client, config *EngineConfig, cancel context.CancelFunc, ctx context.Context) *Engine {
|
||||
func NewEngine(signalClient signal.Client, mgmClient *mgm.Client, config *EngineConfig, cancel context.CancelFunc, ctx context.Context) *Engine {
|
||||
return &Engine{
|
||||
signal: signalClient,
|
||||
mgmClient: mgmClient,
|
||||
@@ -258,7 +258,7 @@ func (e *Engine) openPeerConnection(wgPort int, myKey wgtypes.Key, peer Peer) (*
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client) error {
|
||||
func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtypes.Key, s signal.Client) error {
|
||||
err := s.Send(&sProto.Message{
|
||||
Key: myKey.PublicKey().String(),
|
||||
RemoteKey: remoteKey.String(),
|
||||
@@ -276,7 +276,7 @@ func signalCandidate(candidate ice.Candidate, myKey wgtypes.Key, remoteKey wgtyp
|
||||
return nil
|
||||
}
|
||||
|
||||
func signalAuth(uFrag string, pwd string, myKey wgtypes.Key, remoteKey wgtypes.Key, s *signal.Client, isAnswer bool) error {
|
||||
func signalAuth(uFrag string, pwd string, myKey wgtypes.Key, remoteKey wgtypes.Key, s signal.Client, isAnswer bool) error {
|
||||
|
||||
var t sProto.Body_Type
|
||||
if isAnswer {
|
||||
|
||||
@@ -1,271 +1,20 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/wiretrustee/wiretrustee/encryption"
|
||||
"github.com/wiretrustee/wiretrustee/signal/proto"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
"io"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// A set of tools to exchange connection details (Wireguard endpoints) with the remote peer.
|
||||
|
||||
// Client Wraps the Signal Exchange Service gRpc client
|
||||
type Client struct {
|
||||
key wgtypes.Key
|
||||
realClient proto.SignalExchangeClient
|
||||
signalConn *grpc.ClientConn
|
||||
ctx context.Context
|
||||
stream proto.SignalExchange_ConnectStreamClient
|
||||
//waiting group to notify once stream is connected
|
||||
connWg *sync.WaitGroup //todo use a channel instead??
|
||||
}
|
||||
|
||||
// Close Closes underlying connections to the Signal Exchange
|
||||
func (c *Client) Close() error {
|
||||
return c.signalConn.Close()
|
||||
}
|
||||
|
||||
// NewClient creates a new Signal client
|
||||
func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled bool) (*Client, error) {
|
||||
|
||||
transportOption := grpc.WithInsecure()
|
||||
|
||||
if tlsEnabled {
|
||||
transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
|
||||
}
|
||||
|
||||
sigCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
conn, err := grpc.DialContext(
|
||||
sigCtx,
|
||||
addr,
|
||||
transportOption,
|
||||
grpc.WithBlock(),
|
||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||
Time: 15 * time.Second,
|
||||
Timeout: 10 * time.Second,
|
||||
}))
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("failed to connect to the signalling server %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
return &Client{
|
||||
realClient: proto.NewSignalExchangeClient(conn),
|
||||
ctx: ctx,
|
||||
signalConn: conn,
|
||||
key: key,
|
||||
connWg: &wg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
//defaultBackoff is a basic backoff mechanism for general issues
|
||||
func defaultBackoff(ctx context.Context) backoff.BackOff {
|
||||
return backoff.WithContext(&backoff.ExponentialBackOff{
|
||||
InitialInterval: 800 * time.Millisecond,
|
||||
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||
Multiplier: backoff.DefaultMultiplier,
|
||||
MaxInterval: 15 * time.Minute,
|
||||
MaxElapsedTime: time.Hour, //stop after an hour of trying, the error will be propagated to the general retry of the client
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}, ctx)
|
||||
|
||||
}
|
||||
|
||||
// Receive Connects to the Signal Exchange message stream and starts receiving messages.
|
||||
// The messages will be handled by msgHandler function provided.
|
||||
// This function runs a goroutine underneath and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart)
|
||||
// The key is the identifier of our Peer (could be Wireguard public key)
|
||||
func (c *Client) Receive(msgHandler func(msg *proto.Message) error) {
|
||||
c.connWg.Add(1)
|
||||
go func() {
|
||||
|
||||
var backOff = defaultBackoff(c.ctx)
|
||||
|
||||
operation := func() error {
|
||||
|
||||
stream, err := c.connect(c.key.PublicKey().String())
|
||||
if err != nil {
|
||||
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
|
||||
c.connWg.Add(1)
|
||||
return err
|
||||
}
|
||||
|
||||
err = c.receive(stream, msgHandler)
|
||||
if err != nil {
|
||||
backOff.Reset()
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
err := backoff.Retry(operation, backOff)
|
||||
if err != nil {
|
||||
log.Errorf("exiting Signal Service connection retry loop due to unrecoverable error: %s", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (c *Client) connect(key string) (proto.SignalExchange_ConnectStreamClient, error) {
|
||||
c.stream = nil
|
||||
|
||||
// add key fingerprint to the request header to be identified on the server side
|
||||
md := metadata.New(map[string]string{proto.HeaderId: key})
|
||||
ctx := metadata.NewOutgoingContext(c.ctx, md)
|
||||
|
||||
stream, err := c.realClient.ConnectStream(ctx, grpc.WaitForReady(true))
|
||||
|
||||
c.stream = stream
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// blocks
|
||||
header, err := c.stream.Header()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
registered := header.Get(proto.HeaderRegistered)
|
||||
if len(registered) == 0 {
|
||||
return nil, fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
|
||||
}
|
||||
//connection established we are good to use the stream
|
||||
c.connWg.Done()
|
||||
|
||||
log.Infof("connected to the Signal Exchange Stream")
|
||||
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
// WaitConnected waits until the client is connected to the message stream
|
||||
func (c *Client) WaitConnected() {
|
||||
c.connWg.Wait()
|
||||
}
|
||||
|
||||
// SendToStream sends a message to the remote Peer through the Signal Exchange using established stream connection to the Signal Server
|
||||
// The Client.Receive method must be called before sending messages to establish initial connection to the Signal Exchange
|
||||
// Client.connWg can be used to wait
|
||||
func (c *Client) SendToStream(msg *proto.EncryptedMessage) error {
|
||||
|
||||
if c.stream == nil {
|
||||
return fmt.Errorf("connection to the Signal Exchnage has not been established yet. Please call Client.Receive before sending messages")
|
||||
}
|
||||
|
||||
err := c.stream.Send(msg)
|
||||
if err != nil {
|
||||
log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// decryptMessage decrypts the body of the msg using Wireguard private key and Remote peer's public key
|
||||
func (c *Client) decryptMessage(msg *proto.EncryptedMessage) (*proto.Message, error) {
|
||||
remoteKey, err := wgtypes.ParseKey(msg.GetKey())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
body := &proto.Body{}
|
||||
err = encryption.DecryptMessage(remoteKey, c.key, msg.GetBody(), body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &proto.Message{
|
||||
Key: msg.Key,
|
||||
RemoteKey: msg.RemoteKey,
|
||||
Body: body,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// encryptMessage encrypts the body of the msg using Wireguard private key and Remote peer's public key
|
||||
func (c *Client) encryptMessage(msg *proto.Message) (*proto.EncryptedMessage, error) {
|
||||
|
||||
remoteKey, err := wgtypes.ParseKey(msg.RemoteKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
encryptedBody, err := encryption.EncryptMessage(remoteKey, c.key, msg.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &proto.EncryptedMessage{
|
||||
Key: msg.GetKey(),
|
||||
RemoteKey: msg.GetRemoteKey(),
|
||||
Body: encryptedBody,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Send sends a message to the remote Peer through the Signal Exchange.
|
||||
func (c *Client) Send(msg *proto.Message) error {
|
||||
|
||||
encryptedMessage, err := c.encryptMessage(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = c.realClient.Send(context.TODO(), encryptedMessage)
|
||||
if err != nil {
|
||||
log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// receive receives messages from other peers coming through the Signal Exchange
|
||||
func (c *Client) receive(stream proto.SignalExchange_ConnectStreamClient,
|
||||
msgHandler func(msg *proto.Message) error) error {
|
||||
|
||||
for {
|
||||
msg, err := stream.Recv()
|
||||
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
|
||||
log.Warnf("stream canceled (usually indicates shutdown)")
|
||||
return err
|
||||
} else if s.Code() == codes.Unavailable {
|
||||
log.Warnf("server has been stopped")
|
||||
return err
|
||||
} else if err == io.EOF {
|
||||
log.Warnf("stream closed by server")
|
||||
return err
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debugf("received a new message from Peer [fingerprint: %s]", msg.Key)
|
||||
|
||||
decryptedMessage, err := c.decryptMessage(msg)
|
||||
if err != nil {
|
||||
log.Errorf("failed decrypting message of Peer [key: %s] error: [%s]", msg.Key, err.Error())
|
||||
}
|
||||
|
||||
err = msgHandler(decryptedMessage)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("error while handling message of Peer [key: %s] error: [%s]", msg.Key, err.Error())
|
||||
//todo send something??
|
||||
}
|
||||
}
|
||||
type Client interface {
|
||||
Receive(msgHandler func(msg *proto.Message) error)
|
||||
Close() error
|
||||
Send(msg *proto.Message) error
|
||||
SendToStream(msg *proto.EncryptedMessage) error
|
||||
WaitConnected()
|
||||
}
|
||||
|
||||
// UnMarshalCredential parses the credentials from the message and returns a Credential instance
|
||||
|
||||
266
signal/client/grpc_client.go
Normal file
266
signal/client/grpc_client.go
Normal file
@@ -0,0 +1,266 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/wiretrustee/wiretrustee/encryption"
|
||||
"github.com/wiretrustee/wiretrustee/signal/proto"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// GrpcClient Wraps the Signal Exchange Service gRpc client
|
||||
type GrpcClient struct {
|
||||
key wgtypes.Key
|
||||
realClient proto.SignalExchangeClient
|
||||
signalConn *grpc.ClientConn
|
||||
ctx context.Context
|
||||
stream proto.SignalExchange_ConnectStreamClient
|
||||
//waiting group to notify once stream is connected
|
||||
connWg *sync.WaitGroup //todo use a channel instead??
|
||||
}
|
||||
|
||||
// Close Closes underlying connections to the Signal Exchange
|
||||
func (c *GrpcClient) Close() error {
|
||||
return c.signalConn.Close()
|
||||
}
|
||||
|
||||
// NewClient creates a new Signal client
|
||||
func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled bool) (*GrpcClient, error) {
|
||||
|
||||
transportOption := grpc.WithInsecure()
|
||||
|
||||
if tlsEnabled {
|
||||
transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
|
||||
}
|
||||
|
||||
sigCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
defer cancel()
|
||||
conn, err := grpc.DialContext(
|
||||
sigCtx,
|
||||
addr,
|
||||
transportOption,
|
||||
grpc.WithBlock(),
|
||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||
Time: 15 * time.Second,
|
||||
Timeout: 10 * time.Second,
|
||||
}))
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("failed to connect to the signalling server %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
return &GrpcClient{
|
||||
realClient: proto.NewSignalExchangeClient(conn),
|
||||
ctx: ctx,
|
||||
signalConn: conn,
|
||||
key: key,
|
||||
connWg: &wg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
//defaultBackoff is a basic backoff mechanism for general issues
|
||||
func defaultBackoff(ctx context.Context) backoff.BackOff {
|
||||
return backoff.WithContext(&backoff.ExponentialBackOff{
|
||||
InitialInterval: 800 * time.Millisecond,
|
||||
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||
Multiplier: backoff.DefaultMultiplier,
|
||||
MaxInterval: 15 * time.Minute,
|
||||
MaxElapsedTime: time.Hour, //stop after an hour of trying, the error will be propagated to the general retry of the client
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}, ctx)
|
||||
|
||||
}
|
||||
|
||||
// Receive Connects to the Signal Exchange message stream and starts receiving messages.
|
||||
// The messages will be handled by msgHandler function provided.
|
||||
// This function runs a goroutine underneath and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart)
|
||||
// The key is the identifier of our Peer (could be Wireguard public key)
|
||||
func (c *GrpcClient) Receive(msgHandler func(msg *proto.Message) error) {
|
||||
c.connWg.Add(1)
|
||||
go func() {
|
||||
|
||||
var backOff = defaultBackoff(c.ctx)
|
||||
|
||||
operation := func() error {
|
||||
|
||||
stream, err := c.connect(c.key.PublicKey().String())
|
||||
if err != nil {
|
||||
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
|
||||
c.connWg.Add(1)
|
||||
return err
|
||||
}
|
||||
|
||||
err = c.receive(stream, msgHandler)
|
||||
if err != nil {
|
||||
backOff.Reset()
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
err := backoff.Retry(operation, backOff)
|
||||
if err != nil {
|
||||
log.Errorf("exiting Signal Service connection retry loop due to unrecoverable error: %s", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (c *GrpcClient) connect(key string) (proto.SignalExchange_ConnectStreamClient, error) {
|
||||
c.stream = nil
|
||||
|
||||
// add key fingerprint to the request header to be identified on the server side
|
||||
md := metadata.New(map[string]string{proto.HeaderId: key})
|
||||
ctx := metadata.NewOutgoingContext(c.ctx, md)
|
||||
|
||||
stream, err := c.realClient.ConnectStream(ctx, grpc.WaitForReady(true))
|
||||
|
||||
c.stream = stream
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// blocks
|
||||
header, err := c.stream.Header()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
registered := header.Get(proto.HeaderRegistered)
|
||||
if len(registered) == 0 {
|
||||
return nil, fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
|
||||
}
|
||||
//connection established we are good to use the stream
|
||||
c.connWg.Done()
|
||||
|
||||
log.Infof("connected to the Signal Exchange Stream")
|
||||
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
// WaitConnected waits until the client is connected to the message stream
|
||||
func (c *GrpcClient) WaitConnected() {
|
||||
c.connWg.Wait()
|
||||
}
|
||||
|
||||
// SendToStream sends a message to the remote Peer through the Signal Exchange using established stream connection to the Signal Server
|
||||
// The Client.Receive method must be called before sending messages to establish initial connection to the Signal Exchange
|
||||
// Client.connWg can be used to wait
|
||||
func (c *GrpcClient) SendToStream(msg *proto.EncryptedMessage) error {
|
||||
|
||||
if c.stream == nil {
|
||||
return fmt.Errorf("connection to the Signal Exchnage has not been established yet. Please call Client.Receive before sending messages")
|
||||
}
|
||||
|
||||
err := c.stream.Send(msg)
|
||||
if err != nil {
|
||||
log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// decryptMessage decrypts the body of the msg using Wireguard private key and Remote peer's public key
|
||||
func (c *GrpcClient) decryptMessage(msg *proto.EncryptedMessage) (*proto.Message, error) {
|
||||
remoteKey, err := wgtypes.ParseKey(msg.GetKey())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
body := &proto.Body{}
|
||||
err = encryption.DecryptMessage(remoteKey, c.key, msg.GetBody(), body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &proto.Message{
|
||||
Key: msg.Key,
|
||||
RemoteKey: msg.RemoteKey,
|
||||
Body: body,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// encryptMessage encrypts the body of the msg using Wireguard private key and Remote peer's public key
|
||||
func (c *GrpcClient) encryptMessage(msg *proto.Message) (*proto.EncryptedMessage, error) {
|
||||
|
||||
remoteKey, err := wgtypes.ParseKey(msg.RemoteKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
encryptedBody, err := encryption.EncryptMessage(remoteKey, c.key, msg.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &proto.EncryptedMessage{
|
||||
Key: msg.GetKey(),
|
||||
RemoteKey: msg.GetRemoteKey(),
|
||||
Body: encryptedBody,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Send sends a message to the remote Peer through the Signal Exchange.
|
||||
func (c *GrpcClient) Send(msg *proto.Message) error {
|
||||
|
||||
encryptedMessage, err := c.encryptMessage(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = c.realClient.Send(context.TODO(), encryptedMessage)
|
||||
if err != nil {
|
||||
log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// receive receives messages from other peers coming through the Signal Exchange
|
||||
func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient,
|
||||
msgHandler func(msg *proto.Message) error) error {
|
||||
|
||||
for {
|
||||
msg, err := stream.Recv()
|
||||
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
|
||||
log.Warnf("stream canceled (usually indicates shutdown)")
|
||||
return err
|
||||
} else if s.Code() == codes.Unavailable {
|
||||
log.Warnf("server has been stopped")
|
||||
return err
|
||||
} else if err == io.EOF {
|
||||
log.Warnf("stream closed by server")
|
||||
return err
|
||||
} else if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debugf("received a new message from Peer [fingerprint: %s]", msg.Key)
|
||||
|
||||
decryptedMessage, err := c.decryptMessage(msg)
|
||||
if err != nil {
|
||||
log.Errorf("failed decrypting message of Peer [key: %s] error: [%s]", msg.Key, err.Error())
|
||||
}
|
||||
|
||||
err = msgHandler(decryptedMessage)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("error while handling message of Peer [key: %s] error: [%s]", msg.Key, err.Error())
|
||||
//todo send something??
|
||||
}
|
||||
}
|
||||
}
|
||||
38
signal/client/ws_client.go
Normal file
38
signal/client/ws_client.go
Normal file
@@ -0,0 +1,38 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/wiretrustee/wiretrustee/signal/proto"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
)
|
||||
|
||||
//WebsocketClient is a Signal server websocket client (alternative to the original gRPC Client)
|
||||
type WebsocketClient struct {
|
||||
key wgtypes.Key
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func NewWebsocketClient(ctx context.Context, addr string, wgPrivateKey wgtypes.Key) (*WebsocketClient, error) {
|
||||
return &WebsocketClient{
|
||||
key: wgPrivateKey,
|
||||
ctx: ctx,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *WebsocketClient) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *WebsocketClient) Receive(msgHandler func(msg *proto.Message) error) {
|
||||
|
||||
}
|
||||
func (c *WebsocketClient) SendToStream(msg *proto.EncryptedMessage) error {
|
||||
return nil
|
||||
}
|
||||
func (c *WebsocketClient) Send(msg *proto.Message) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *WebsocketClient) WaitConnected() {
|
||||
|
||||
}
|
||||
@@ -22,6 +22,7 @@ func NewWebsocketChannel(conn *websocket.Conn) *WebsocketChannel {
|
||||
}
|
||||
|
||||
func (c *WebsocketChannel) Send(msg *proto.EncryptedMessage) error {
|
||||
//todo
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user