mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 08:16:39 +00:00
[misc] Move shared components to shared directory (#4286)
Moved the following directories: ``` - management/client → shared/management/client - management/domain → shared/management/domain - management/proto → shared/management/proto - signal/client → shared/signal/client - signal/proto → shared/signal/proto - relay/client → shared/relay/client - relay/auth → shared/relay/auth ``` and adjusted import paths
This commit is contained in:
77
shared/signal/client/client.go
Normal file
77
shared/signal/client/client.go
Normal file
@@ -0,0 +1,77 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
|
||||
"github.com/netbirdio/netbird/shared/signal/proto"
|
||||
"github.com/netbirdio/netbird/version"
|
||||
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
)
|
||||
|
||||
// A set of tools to exchange connection details (Wireguard endpoints) with the remote peer.
|
||||
|
||||
// Status is the status of the client
|
||||
type Status string
|
||||
|
||||
const StreamConnected Status = "Connected"
|
||||
const StreamDisconnected Status = "Disconnected"
|
||||
|
||||
const (
|
||||
// DirectCheck indicates support to direct mode checks
|
||||
DirectCheck uint32 = 1
|
||||
)
|
||||
|
||||
type Client interface {
|
||||
io.Closer
|
||||
StreamConnected() bool
|
||||
GetStatus() Status
|
||||
Receive(ctx context.Context, msgHandler func(msg *proto.Message) error) error
|
||||
Ready() bool
|
||||
IsHealthy() bool
|
||||
WaitStreamConnected()
|
||||
SendToStream(msg *proto.EncryptedMessage) error
|
||||
Send(msg *proto.Message) error
|
||||
SetOnReconnectedListener(func())
|
||||
}
|
||||
|
||||
// UnMarshalCredential parses the credentials from the message and returns a Credential instance
|
||||
func UnMarshalCredential(msg *proto.Message) (*Credential, error) {
|
||||
|
||||
credential := strings.Split(msg.GetBody().GetPayload(), ":")
|
||||
if len(credential) != 2 {
|
||||
return nil, fmt.Errorf("error parsing message body %s", msg.Body)
|
||||
}
|
||||
return &Credential{
|
||||
UFrag: credential[0],
|
||||
Pwd: credential[1],
|
||||
}, nil
|
||||
}
|
||||
|
||||
// MarshalCredential marshal a Credential instance and returns a Message object
|
||||
func MarshalCredential(myKey wgtypes.Key, myPort int, remoteKey string, credential *Credential, t proto.Body_Type, rosenpassPubKey []byte, rosenpassAddr string, relaySrvAddress string) (*proto.Message, error) {
|
||||
return &proto.Message{
|
||||
Key: myKey.PublicKey().String(),
|
||||
RemoteKey: remoteKey,
|
||||
Body: &proto.Body{
|
||||
Type: t,
|
||||
Payload: fmt.Sprintf("%s:%s", credential.UFrag, credential.Pwd),
|
||||
WgListenPort: uint32(myPort),
|
||||
NetBirdVersion: version.NetbirdVersion(),
|
||||
RosenpassConfig: &proto.RosenpassConfig{
|
||||
RosenpassPubKey: rosenpassPubKey,
|
||||
RosenpassServerAddr: rosenpassAddr,
|
||||
},
|
||||
RelayServerAddress: relaySrvAddress,
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Credential is an instance of a GrpcClient's Credential
|
||||
type Credential struct {
|
||||
UFrag string
|
||||
Pwd string
|
||||
}
|
||||
13
shared/signal/client/client_suite_test.go
Normal file
13
shared/signal/client/client_suite_test.go
Normal file
@@ -0,0 +1,13 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSignal(t *testing.T) {
|
||||
RegisterFailHandler(Fail)
|
||||
RunSpecs(t, "Signal Suite")
|
||||
}
|
||||
230
shared/signal/client/client_test.go
Normal file
230
shared/signal/client/client_test.go
Normal file
@@ -0,0 +1,230 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo"
|
||||
. "github.com/onsi/gomega"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"go.opentelemetry.io/otel"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
|
||||
sigProto "github.com/netbirdio/netbird/shared/signal/proto"
|
||||
"github.com/netbirdio/netbird/signal/server"
|
||||
)
|
||||
|
||||
var _ = Describe("GrpcClient", func() {
|
||||
|
||||
var (
|
||||
addr string
|
||||
listener net.Listener
|
||||
server *grpc.Server
|
||||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
server, listener = startSignal()
|
||||
addr = listener.Addr().String()
|
||||
|
||||
})
|
||||
|
||||
AfterEach(func() {
|
||||
server.Stop()
|
||||
listener.Close()
|
||||
})
|
||||
|
||||
Describe("Exchanging messages", func() {
|
||||
Context("between connected peers", func() {
|
||||
It("should be successful", func() {
|
||||
|
||||
var msgReceived sync.WaitGroup
|
||||
msgReceived.Add(2)
|
||||
|
||||
var payloadReceivedOnA string
|
||||
var payloadReceivedOnB string
|
||||
var featuresSupportedReceivedOnA []uint32
|
||||
var featuresSupportedReceivedOnB []uint32
|
||||
|
||||
// connect PeerA to Signal
|
||||
keyA, _ := wgtypes.GenerateKey()
|
||||
clientA := createSignalClient(addr, keyA)
|
||||
go func() {
|
||||
err := clientA.Receive(context.Background(), func(msg *sigProto.Message) error {
|
||||
payloadReceivedOnA = msg.GetBody().GetPayload()
|
||||
featuresSupportedReceivedOnA = msg.GetBody().GetFeaturesSupported()
|
||||
msgReceived.Done()
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}()
|
||||
clientA.WaitStreamConnected()
|
||||
|
||||
// connect PeerB to Signal
|
||||
keyB, _ := wgtypes.GenerateKey()
|
||||
clientB := createSignalClient(addr, keyB)
|
||||
|
||||
go func() {
|
||||
err := clientB.Receive(context.Background(), func(msg *sigProto.Message) error {
|
||||
payloadReceivedOnB = msg.GetBody().GetPayload()
|
||||
featuresSupportedReceivedOnB = msg.GetBody().GetFeaturesSupported()
|
||||
err := clientB.Send(&sigProto.Message{
|
||||
Key: keyB.PublicKey().String(),
|
||||
RemoteKey: keyA.PublicKey().String(),
|
||||
Body: &sigProto.Body{Payload: "pong"},
|
||||
})
|
||||
if err != nil {
|
||||
Fail("failed sending a message to PeerA")
|
||||
}
|
||||
msgReceived.Done()
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
clientB.WaitStreamConnected()
|
||||
|
||||
// PeerA initiates ping-pong
|
||||
err := clientA.Send(&sigProto.Message{
|
||||
Key: keyA.PublicKey().String(),
|
||||
RemoteKey: keyB.PublicKey().String(),
|
||||
Body: &sigProto.Body{Payload: "ping", FeaturesSupported: []uint32{DirectCheck}},
|
||||
})
|
||||
if err != nil {
|
||||
Fail("failed sending a message to PeerB")
|
||||
}
|
||||
|
||||
if waitTimeout(&msgReceived, 3*time.Second) {
|
||||
Fail("test timed out on waiting for peers to exchange messages")
|
||||
}
|
||||
|
||||
Expect(payloadReceivedOnA).To(BeEquivalentTo("pong"))
|
||||
Expect(payloadReceivedOnB).To(BeEquivalentTo("ping"))
|
||||
Expect(featuresSupportedReceivedOnA).To(BeNil())
|
||||
Expect(featuresSupportedReceivedOnB).To(ContainElements([]uint32{DirectCheck}))
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
Describe("Connecting to the Signal stream channel", func() {
|
||||
Context("with a signal client", func() {
|
||||
It("should be successful", func() {
|
||||
|
||||
key, _ := wgtypes.GenerateKey()
|
||||
client := createSignalClient(addr, key)
|
||||
go func() {
|
||||
err := client.Receive(context.Background(), func(msg *sigProto.Message) error {
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}()
|
||||
client.WaitStreamConnected()
|
||||
Expect(client).NotTo(BeNil())
|
||||
})
|
||||
})
|
||||
|
||||
Context("with a raw client and no Id header", func() {
|
||||
It("should fail", func() {
|
||||
|
||||
client := createRawSignalClient(addr)
|
||||
stream, err := client.ConnectStream(context.Background())
|
||||
if err != nil {
|
||||
Fail("error connecting to stream")
|
||||
}
|
||||
|
||||
_, err = stream.Recv()
|
||||
|
||||
Expect(stream).NotTo(BeNil())
|
||||
Expect(err).NotTo(BeNil())
|
||||
})
|
||||
})
|
||||
|
||||
Context("with a raw client and with an Id header", func() {
|
||||
It("should be successful", func() {
|
||||
|
||||
md := metadata.New(map[string]string{sigProto.HeaderId: "peer"})
|
||||
ctx := metadata.NewOutgoingContext(context.Background(), md)
|
||||
|
||||
client := createRawSignalClient(addr)
|
||||
stream, err := client.ConnectStream(ctx)
|
||||
|
||||
Expect(stream).NotTo(BeNil())
|
||||
Expect(err).To(BeNil())
|
||||
})
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
})
|
||||
|
||||
func createSignalClient(addr string, key wgtypes.Key) *GrpcClient {
|
||||
var sigTLSEnabled = false
|
||||
client, err := NewClient(context.Background(), addr, key, sigTLSEnabled)
|
||||
if err != nil {
|
||||
Fail("failed creating signal client")
|
||||
}
|
||||
return client
|
||||
}
|
||||
|
||||
func createRawSignalClient(addr string) sigProto.SignalExchangeClient {
|
||||
ctx := context.Background()
|
||||
conn, err := grpc.DialContext(ctx, addr,
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithBlock(),
|
||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||
Time: 3 * time.Second,
|
||||
Timeout: 2 * time.Second,
|
||||
}))
|
||||
if err != nil {
|
||||
Fail("failed creating raw signal client")
|
||||
}
|
||||
|
||||
return sigProto.NewSignalExchangeClient(conn)
|
||||
}
|
||||
|
||||
func startSignal() (*grpc.Server, net.Listener) {
|
||||
lis, err := net.Listen("tcp", ":0")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
s := grpc.NewServer()
|
||||
srv, err := server.NewServer(context.Background(), otel.Meter(""))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
sigProto.RegisterSignalExchangeServer(s, srv)
|
||||
go func() {
|
||||
if err := s.Serve(lis); err != nil {
|
||||
log.Fatalf("failed to serve: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
return s, lis
|
||||
}
|
||||
|
||||
// waitTimeout waits for the waitgroup for the specified max timeout.
|
||||
// Returns true if waiting timed out.
|
||||
func waitTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
|
||||
c := make(chan struct{})
|
||||
go func() {
|
||||
defer close(c)
|
||||
wg.Wait()
|
||||
}()
|
||||
select {
|
||||
case <-c:
|
||||
return false // completed normally
|
||||
case <-time.After(timeout):
|
||||
return true // timed out
|
||||
}
|
||||
}
|
||||
10
shared/signal/client/go.sum
Normal file
10
shared/signal/client/go.sum
Normal file
@@ -0,0 +1,10 @@
|
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
google.golang.org/grpc v1.64.1/go.mod h1:hiQF4LFZelK2WKaP6W0L92zGHtiQdZxk8CrSdvyjeP0=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
437
shared/signal/client/grpc.go
Normal file
437
shared/signal/client/grpc.go
Normal file
@@ -0,0 +1,437 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
|
||||
"github.com/netbirdio/netbird/encryption"
|
||||
"github.com/netbirdio/netbird/shared/management/client"
|
||||
"github.com/netbirdio/netbird/shared/signal/proto"
|
||||
nbgrpc "github.com/netbirdio/netbird/util/grpc"
|
||||
)
|
||||
|
||||
// ConnStateNotifier is a wrapper interface of the status recorder
|
||||
type ConnStateNotifier interface {
|
||||
MarkSignalDisconnected(error)
|
||||
MarkSignalConnected()
|
||||
}
|
||||
|
||||
// GrpcClient Wraps the Signal Exchange Service gRpc client
|
||||
type GrpcClient struct {
|
||||
key wgtypes.Key
|
||||
realClient proto.SignalExchangeClient
|
||||
signalConn *grpc.ClientConn
|
||||
ctx context.Context
|
||||
stream proto.SignalExchange_ConnectStreamClient
|
||||
// connectedCh used to notify goroutines waiting for the connection to the Signal stream
|
||||
connectedCh chan struct{}
|
||||
mux sync.Mutex
|
||||
// StreamConnected indicates whether this client is StreamConnected to the Signal stream
|
||||
status Status
|
||||
|
||||
connStateCallback ConnStateNotifier
|
||||
connStateCallbackLock sync.RWMutex
|
||||
|
||||
onReconnectedListenerFn func()
|
||||
}
|
||||
|
||||
func (c *GrpcClient) StreamConnected() bool {
|
||||
return c.status == StreamConnected
|
||||
}
|
||||
|
||||
func (c *GrpcClient) GetStatus() Status {
|
||||
return c.status
|
||||
}
|
||||
|
||||
// Close Closes underlying connections to the Signal Exchange
|
||||
func (c *GrpcClient) Close() error {
|
||||
return c.signalConn.Close()
|
||||
}
|
||||
|
||||
// NewClient creates a new Signal client
|
||||
func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled bool) (*GrpcClient, error) {
|
||||
var conn *grpc.ClientConn
|
||||
|
||||
operation := func() error {
|
||||
var err error
|
||||
conn, err = nbgrpc.CreateConnection(addr, tlsEnabled)
|
||||
if err != nil {
|
||||
log.Printf("createConnection error: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
err := backoff.Retry(operation, nbgrpc.Backoff(ctx))
|
||||
if err != nil {
|
||||
log.Errorf("failed to connect to the signalling server: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
log.Debugf("connected to Signal Service: %v", conn.Target())
|
||||
|
||||
return &GrpcClient{
|
||||
realClient: proto.NewSignalExchangeClient(conn),
|
||||
ctx: ctx,
|
||||
signalConn: conn,
|
||||
key: key,
|
||||
mux: sync.Mutex{},
|
||||
status: StreamDisconnected,
|
||||
connStateCallbackLock: sync.RWMutex{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
// SetConnStateListener set the ConnStateNotifier
|
||||
func (c *GrpcClient) SetConnStateListener(notifier ConnStateNotifier) {
|
||||
c.connStateCallbackLock.Lock()
|
||||
defer c.connStateCallbackLock.Unlock()
|
||||
c.connStateCallback = notifier
|
||||
}
|
||||
|
||||
// defaultBackoff is a basic backoff mechanism for general issues
|
||||
func defaultBackoff(ctx context.Context) backoff.BackOff {
|
||||
return backoff.WithContext(&backoff.ExponentialBackOff{
|
||||
InitialInterval: 800 * time.Millisecond,
|
||||
RandomizationFactor: 1,
|
||||
Multiplier: 1.7,
|
||||
MaxInterval: 10 * time.Second,
|
||||
MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}, ctx)
|
||||
}
|
||||
|
||||
// Receive Connects to the Signal Exchange message stream and starts receiving messages.
|
||||
// The messages will be handled by msgHandler function provided.
|
||||
// This function is blocking and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart)
|
||||
// The connection retry logic will try to reconnect for 30 min and if wasn't successful will propagate the error to the function caller.
|
||||
func (c *GrpcClient) Receive(ctx context.Context, msgHandler func(msg *proto.Message) error) error {
|
||||
|
||||
var backOff = defaultBackoff(ctx)
|
||||
|
||||
operation := func() error {
|
||||
|
||||
c.notifyStreamDisconnected()
|
||||
|
||||
log.Debugf("signal connection state %v", c.signalConn.GetState())
|
||||
connState := c.signalConn.GetState()
|
||||
if connState == connectivity.Shutdown {
|
||||
return backoff.Permanent(fmt.Errorf("connection to signal has been shut down"))
|
||||
} else if !(connState == connectivity.Ready || connState == connectivity.Idle) {
|
||||
c.signalConn.WaitForStateChange(ctx, connState)
|
||||
return fmt.Errorf("connection to signal is not ready and in %s state", connState)
|
||||
}
|
||||
|
||||
// connect to Signal stream identifying ourselves with a public WireGuard key
|
||||
// todo once the key rotation logic has been implemented, consider changing to some other identifier (received from management)
|
||||
ctx, cancelStream := context.WithCancel(ctx)
|
||||
defer cancelStream()
|
||||
stream, err := c.connect(ctx, c.key.PublicKey().String())
|
||||
if err != nil {
|
||||
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
c.notifyStreamConnected()
|
||||
|
||||
log.Infof("connected to the Signal Service stream")
|
||||
c.notifyConnected()
|
||||
// start receiving messages from the Signal stream (from other peers through signal)
|
||||
err = c.receive(stream, msgHandler)
|
||||
if err != nil {
|
||||
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
|
||||
log.Debugf("signal connection context has been canceled, this usually indicates shutdown")
|
||||
return nil
|
||||
}
|
||||
// we need this reset because after a successful connection and a consequent error, backoff lib doesn't
|
||||
// reset times and next try will start with a long delay
|
||||
backOff.Reset()
|
||||
c.notifyDisconnected(err)
|
||||
log.Warnf("disconnected from the Signal service but will retry silently. Reason: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
err := backoff.Retry(operation, backOff)
|
||||
if err != nil {
|
||||
log.Errorf("exiting the Signal service connection retry loop due to the unrecoverable error: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
func (c *GrpcClient) notifyStreamDisconnected() {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
c.status = StreamDisconnected
|
||||
}
|
||||
|
||||
func (c *GrpcClient) notifyStreamConnected() {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
|
||||
c.status = StreamConnected
|
||||
if c.connectedCh != nil {
|
||||
// there are goroutines waiting on this channel -> release them
|
||||
close(c.connectedCh)
|
||||
c.connectedCh = nil
|
||||
}
|
||||
|
||||
if c.onReconnectedListenerFn != nil {
|
||||
c.onReconnectedListenerFn()
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GrpcClient) getStreamStatusChan() <-chan struct{} {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
if c.connectedCh == nil {
|
||||
c.connectedCh = make(chan struct{})
|
||||
}
|
||||
return c.connectedCh
|
||||
}
|
||||
|
||||
func (c *GrpcClient) connect(ctx context.Context, key string) (proto.SignalExchange_ConnectStreamClient, error) {
|
||||
c.stream = nil
|
||||
|
||||
// add key fingerprint to the request header to be identified on the server side
|
||||
md := metadata.New(map[string]string{proto.HeaderId: key})
|
||||
metaCtx := metadata.NewOutgoingContext(ctx, md)
|
||||
stream, err := c.realClient.ConnectStream(metaCtx, grpc.WaitForReady(true))
|
||||
c.stream = stream
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// blocks
|
||||
header, err := c.stream.Header()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
registered := header.Get(proto.HeaderRegistered)
|
||||
if len(registered) == 0 {
|
||||
return nil, fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
|
||||
}
|
||||
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
// Ready indicates whether the client is okay and Ready to be used
|
||||
// for now it just checks whether gRPC connection to the service is in state Ready
|
||||
func (c *GrpcClient) Ready() bool {
|
||||
return c.signalConn.GetState() == connectivity.Ready || c.signalConn.GetState() == connectivity.Idle
|
||||
}
|
||||
|
||||
// IsHealthy probes the gRPC connection and returns false on errors
|
||||
func (c *GrpcClient) IsHealthy() bool {
|
||||
switch c.signalConn.GetState() {
|
||||
case connectivity.TransientFailure:
|
||||
return false
|
||||
case connectivity.Connecting:
|
||||
return true
|
||||
case connectivity.Shutdown:
|
||||
return true
|
||||
case connectivity.Idle:
|
||||
case connectivity.Ready:
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(c.ctx, 1*time.Second)
|
||||
defer cancel()
|
||||
_, err := c.realClient.Send(ctx, &proto.EncryptedMessage{
|
||||
Key: c.key.PublicKey().String(),
|
||||
RemoteKey: "dummy",
|
||||
Body: nil,
|
||||
})
|
||||
if err != nil {
|
||||
c.notifyDisconnected(err)
|
||||
log.Warnf("health check returned: %s", err)
|
||||
return false
|
||||
}
|
||||
c.notifyConnected()
|
||||
return true
|
||||
}
|
||||
|
||||
// WaitStreamConnected waits until the client is connected to the Signal stream
|
||||
func (c *GrpcClient) WaitStreamConnected() {
|
||||
|
||||
if c.status == StreamConnected {
|
||||
return
|
||||
}
|
||||
|
||||
ch := c.getStreamStatusChan()
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
case <-ch:
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GrpcClient) SetOnReconnectedListener(fn func()) {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
|
||||
c.onReconnectedListenerFn = fn
|
||||
}
|
||||
|
||||
// SendToStream sends a message to the remote Peer through the Signal Exchange using established stream connection to the Signal Server
|
||||
// The GrpcClient.Receive method must be called before sending messages to establish initial connection to the Signal Exchange
|
||||
// GrpcClient.connWg can be used to wait
|
||||
func (c *GrpcClient) SendToStream(msg *proto.EncryptedMessage) error {
|
||||
if !c.Ready() {
|
||||
return fmt.Errorf("no connection to signal")
|
||||
}
|
||||
if c.stream == nil {
|
||||
return fmt.Errorf("connection to the Signal Exchange has not been established yet. Please call GrpcClient.Receive before sending messages")
|
||||
}
|
||||
|
||||
err := c.stream.Send(msg)
|
||||
if err != nil {
|
||||
log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// decryptMessage decrypts the body of the msg using Wireguard private key and Remote peer's public key
|
||||
func (c *GrpcClient) decryptMessage(msg *proto.EncryptedMessage) (*proto.Message, error) {
|
||||
remoteKey, err := wgtypes.ParseKey(msg.GetKey())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
body := &proto.Body{}
|
||||
err = encryption.DecryptMessage(remoteKey, c.key, msg.GetBody(), body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &proto.Message{
|
||||
Key: msg.Key,
|
||||
RemoteKey: msg.RemoteKey,
|
||||
Body: body,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// encryptMessage encrypts the body of the msg using Wireguard private key and Remote peer's public key
|
||||
func (c *GrpcClient) encryptMessage(msg *proto.Message) (*proto.EncryptedMessage, error) {
|
||||
|
||||
remoteKey, err := wgtypes.ParseKey(msg.RemoteKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
encryptedBody, err := encryption.EncryptMessage(remoteKey, c.key, msg.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &proto.EncryptedMessage{
|
||||
Key: msg.GetKey(),
|
||||
RemoteKey: msg.GetRemoteKey(),
|
||||
Body: encryptedBody,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// Send sends a message to the remote Peer through the Signal Exchange.
|
||||
func (c *GrpcClient) Send(msg *proto.Message) error {
|
||||
|
||||
if !c.Ready() {
|
||||
return fmt.Errorf("no connection to signal")
|
||||
}
|
||||
|
||||
encryptedMessage, err := c.encryptMessage(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
attemptTimeout := client.ConnectTimeout
|
||||
|
||||
for attempt := 0; attempt < 4; attempt++ {
|
||||
if attempt > 1 {
|
||||
attemptTimeout = time.Duration(attempt) * 5 * time.Second
|
||||
}
|
||||
ctx, cancel := context.WithTimeout(c.ctx, attemptTimeout)
|
||||
|
||||
_, err = c.realClient.Send(ctx, encryptedMessage)
|
||||
|
||||
cancel()
|
||||
|
||||
if s, ok := status.FromError(err); ok && s.Code() == codes.Canceled {
|
||||
return err
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// receive receives messages from other peers coming through the Signal Exchange
|
||||
func (c *GrpcClient) receive(stream proto.SignalExchange_ConnectStreamClient,
|
||||
msgHandler func(msg *proto.Message) error) error {
|
||||
|
||||
for {
|
||||
msg, err := stream.Recv()
|
||||
switch s, ok := status.FromError(err); {
|
||||
case ok && s.Code() == codes.Canceled:
|
||||
log.Debugf("stream canceled (usually indicates shutdown)")
|
||||
return err
|
||||
case s.Code() == codes.Unavailable:
|
||||
log.Debugf("Signal Service is unavailable")
|
||||
return err
|
||||
case err == io.EOF:
|
||||
log.Debugf("Signal Service stream closed by server")
|
||||
return err
|
||||
case err != nil:
|
||||
return err
|
||||
}
|
||||
log.Tracef("received a new message from Peer [fingerprint: %s]", msg.Key)
|
||||
|
||||
decryptedMessage, err := c.decryptMessage(msg)
|
||||
if err != nil {
|
||||
log.Errorf("failed decrypting message of Peer [key: %s] error: [%s]", msg.Key, err.Error())
|
||||
}
|
||||
|
||||
err = msgHandler(decryptedMessage)
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("error while handling message of Peer [key: %s] error: [%s]", msg.Key, err.Error())
|
||||
// todo send something??
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *GrpcClient) notifyDisconnected(err error) {
|
||||
c.connStateCallbackLock.RLock()
|
||||
defer c.connStateCallbackLock.RUnlock()
|
||||
|
||||
if c.connStateCallback == nil {
|
||||
return
|
||||
}
|
||||
c.connStateCallback.MarkSignalDisconnected(err)
|
||||
}
|
||||
|
||||
func (c *GrpcClient) notifyConnected() {
|
||||
c.connStateCallbackLock.RLock()
|
||||
defer c.connStateCallbackLock.RUnlock()
|
||||
|
||||
if c.connStateCallback == nil {
|
||||
return
|
||||
}
|
||||
c.connStateCallback.MarkSignalConnected()
|
||||
}
|
||||
84
shared/signal/client/mock.go
Normal file
84
shared/signal/client/mock.go
Normal file
@@ -0,0 +1,84 @@
|
||||
package client
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/netbirdio/netbird/shared/signal/proto"
|
||||
)
|
||||
|
||||
type MockClient struct {
|
||||
CloseFunc func() error
|
||||
GetStatusFunc func() Status
|
||||
StreamConnectedFunc func() bool
|
||||
ReadyFunc func() bool
|
||||
WaitStreamConnectedFunc func()
|
||||
ReceiveFunc func(ctx context.Context, msgHandler func(msg *proto.Message) error) error
|
||||
SendToStreamFunc func(msg *proto.EncryptedMessage) error
|
||||
SendFunc func(msg *proto.Message) error
|
||||
SetOnReconnectedListenerFunc func(f func())
|
||||
}
|
||||
|
||||
// SetOnReconnectedListener sets the function to be called when the client reconnects.
|
||||
func (sm *MockClient) SetOnReconnectedListener(_ func()) {
|
||||
// Do nothing
|
||||
}
|
||||
|
||||
func (sm *MockClient) IsHealthy() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (sm *MockClient) Close() error {
|
||||
if sm.CloseFunc == nil {
|
||||
return nil
|
||||
}
|
||||
return sm.CloseFunc()
|
||||
}
|
||||
|
||||
func (sm *MockClient) GetStatus() Status {
|
||||
if sm.GetStatusFunc == nil {
|
||||
return ""
|
||||
}
|
||||
return sm.GetStatusFunc()
|
||||
}
|
||||
|
||||
func (sm *MockClient) StreamConnected() bool {
|
||||
if sm.StreamConnectedFunc == nil {
|
||||
return false
|
||||
}
|
||||
return sm.StreamConnectedFunc()
|
||||
}
|
||||
|
||||
func (sm *MockClient) Ready() bool {
|
||||
if sm.ReadyFunc == nil {
|
||||
return false
|
||||
}
|
||||
return sm.ReadyFunc()
|
||||
}
|
||||
|
||||
func (sm *MockClient) WaitStreamConnected() {
|
||||
if sm.WaitStreamConnectedFunc == nil {
|
||||
return
|
||||
}
|
||||
sm.WaitStreamConnectedFunc()
|
||||
}
|
||||
|
||||
func (sm *MockClient) Receive(ctx context.Context, msgHandler func(msg *proto.Message) error) error {
|
||||
if sm.ReceiveFunc == nil {
|
||||
return nil
|
||||
}
|
||||
return sm.ReceiveFunc(ctx, msgHandler)
|
||||
}
|
||||
|
||||
func (sm *MockClient) SendToStream(msg *proto.EncryptedMessage) error {
|
||||
if sm.SendToStreamFunc == nil {
|
||||
return nil
|
||||
}
|
||||
return sm.SendToStreamFunc(msg)
|
||||
}
|
||||
|
||||
func (sm *MockClient) Send(msg *proto.Message) error {
|
||||
if sm.SendFunc == nil {
|
||||
return nil
|
||||
}
|
||||
return sm.SendFunc(msg)
|
||||
}
|
||||
Reference in New Issue
Block a user