mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-19 00:36:38 +00:00
Merge remote-tracking branch 'origin/main' into braginini/wasm
# Conflicts: # signal/client/client.go
This commit is contained in:
@@ -221,7 +221,7 @@ uploads:
|
|||||||
ids:
|
ids:
|
||||||
- deb
|
- deb
|
||||||
mode: archive
|
mode: archive
|
||||||
target: https://pkgs.wiretrustee.com/debian/pool/{{ .ArtifactName }};deb.distribution=stable;deb.component=main
|
target: https://pkgs.wiretrustee.com/debian/pool/{{ .ArtifactName }};deb.distribution=stable;deb.component=main;deb.architecture={{ if .Arm }}armhf{{ else }}{{ .Arch }}{{ end }};deb.package=
|
||||||
username: dev@wiretrustee.com
|
username: dev@wiretrustee.com
|
||||||
method: PUT
|
method: PUT
|
||||||
- name: yum
|
- name: yum
|
||||||
|
|||||||
@@ -123,8 +123,8 @@ func runClient() error {
|
|||||||
InitialInterval: time.Second,
|
InitialInterval: time.Second,
|
||||||
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||||
Multiplier: backoff.DefaultMultiplier,
|
Multiplier: backoff.DefaultMultiplier,
|
||||||
MaxInterval: time.Hour,
|
MaxInterval: 10 * time.Second,
|
||||||
MaxElapsedTime: 24 * 3 * time.Hour,
|
MaxElapsedTime: 24 * 3 * time.Hour, //stop the client after 3 days trying (must be a huge problem, e.g permission denied)
|
||||||
Stop: backoff.Stop,
|
Stop: backoff.Stop,
|
||||||
Clock: backoff.SystemClock,
|
Clock: backoff.SystemClock,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/cenkalti/backoff/v4"
|
"github.com/cenkalti/backoff/v4"
|
||||||
ice "github.com/pion/ice/v2"
|
"github.com/pion/ice/v2"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/wiretrustee/wiretrustee/iface"
|
"github.com/wiretrustee/wiretrustee/iface"
|
||||||
mgm "github.com/wiretrustee/wiretrustee/management/client"
|
mgm "github.com/wiretrustee/wiretrustee/management/client"
|
||||||
@@ -142,7 +142,7 @@ func (e *Engine) initializePeer(peer Peer) {
|
|||||||
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||||
Multiplier: backoff.DefaultMultiplier,
|
Multiplier: backoff.DefaultMultiplier,
|
||||||
MaxInterval: 5 * time.Second,
|
MaxInterval: 5 * time.Second,
|
||||||
MaxElapsedTime: time.Duration(0), //never stop
|
MaxElapsedTime: 0, //never stop
|
||||||
Stop: backoff.Stop,
|
Stop: backoff.Stop,
|
||||||
Clock: backoff.SystemClock,
|
Clock: backoff.SystemClock,
|
||||||
}, e.ctx)
|
}, e.ctx)
|
||||||
@@ -157,8 +157,7 @@ func (e *Engine) initializePeer(peer Peer) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnln(err)
|
log.Infof("retrying connection because of error: %s", err.Error())
|
||||||
log.Debugf("retrying connection because of error: %s", err.Error())
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -332,6 +331,8 @@ func (e *Engine) receiveManagementEvents() {
|
|||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// happens if management is unavailable for a long time.
|
||||||
|
// We want to cancel the operation of the whole client
|
||||||
e.cancel()
|
e.cancel()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -414,68 +415,77 @@ func (e *Engine) updatePeers(remotePeers []*mgmProto.RemotePeerConfig) error {
|
|||||||
|
|
||||||
// receiveSignalEvents connects to the Signal Service event stream to negotiate connection with remote peers
|
// receiveSignalEvents connects to the Signal Service event stream to negotiate connection with remote peers
|
||||||
func (e *Engine) receiveSignalEvents() {
|
func (e *Engine) receiveSignalEvents() {
|
||||||
// connect to a stream of messages coming from the signal server
|
|
||||||
e.signal.Receive(func(msg *sProto.Message) error {
|
|
||||||
|
|
||||||
e.syncMsgMux.Lock()
|
go func() {
|
||||||
defer e.syncMsgMux.Unlock()
|
// connect to a stream of messages coming from the signal server
|
||||||
|
err := e.signal.Receive(func(msg *sProto.Message) error {
|
||||||
|
|
||||||
conn := e.conns[msg.Key]
|
e.syncMsgMux.Lock()
|
||||||
if conn == nil {
|
defer e.syncMsgMux.Unlock()
|
||||||
return fmt.Errorf("wrongly addressed message %s", msg.Key)
|
|
||||||
}
|
|
||||||
|
|
||||||
if conn.Config.RemoteWgKey.String() != msg.Key {
|
conn := e.conns[msg.Key]
|
||||||
return fmt.Errorf("unknown peer %s", msg.Key)
|
if conn == nil {
|
||||||
}
|
return fmt.Errorf("wrongly addressed message %s", msg.Key)
|
||||||
|
|
||||||
switch msg.GetBody().Type {
|
|
||||||
case sProto.Body_OFFER:
|
|
||||||
remoteCred, err := signal.UnMarshalCredential(msg)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
err = conn.OnOffer(IceCredentials{
|
|
||||||
uFrag: remoteCred.UFrag,
|
|
||||||
pwd: remoteCred.Pwd,
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
if conn.Config.RemoteWgKey.String() != msg.Key {
|
||||||
return err
|
return fmt.Errorf("unknown peer %s", msg.Key)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch msg.GetBody().Type {
|
||||||
|
case sProto.Body_OFFER:
|
||||||
|
remoteCred, err := signal.UnMarshalCredential(msg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = conn.OnOffer(IceCredentials{
|
||||||
|
uFrag: remoteCred.UFrag,
|
||||||
|
pwd: remoteCred.Pwd,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
case sProto.Body_ANSWER:
|
||||||
|
remoteCred, err := signal.UnMarshalCredential(msg)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = conn.OnAnswer(IceCredentials{
|
||||||
|
uFrag: remoteCred.UFrag,
|
||||||
|
pwd: remoteCred.Pwd,
|
||||||
|
})
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
case sProto.Body_CANDIDATE:
|
||||||
|
|
||||||
|
candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed on parsing remote candidate %s -> %s", candidate, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = conn.OnRemoteCandidate(candidate)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error handling CANDIATE from %s", msg.Key)
|
||||||
|
return err
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
case sProto.Body_ANSWER:
|
})
|
||||||
remoteCred, err := signal.UnMarshalCredential(msg)
|
if err != nil {
|
||||||
if err != nil {
|
// happens if signal is unavailable for a long time.
|
||||||
return err
|
// We want to cancel the operation of the whole client
|
||||||
}
|
e.cancel()
|
||||||
err = conn.OnAnswer(IceCredentials{
|
return
|
||||||
uFrag: remoteCred.UFrag,
|
|
||||||
pwd: remoteCred.Pwd,
|
|
||||||
})
|
|
||||||
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
case sProto.Body_CANDIDATE:
|
|
||||||
|
|
||||||
candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed on parsing remote candidate %s -> %s", candidate, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = conn.OnRemoteCandidate(candidate)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error handling CANDIATE from %s", msg.Key)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
return nil
|
e.signal.WaitStreamConnected()
|
||||||
})
|
|
||||||
|
|
||||||
e.signal.WaitConnected()
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package client
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
|
"fmt"
|
||||||
"github.com/cenkalti/backoff/v4"
|
"github.com/cenkalti/backoff/v4"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/wiretrustee/wiretrustee/client/system"
|
"github.com/wiretrustee/wiretrustee/client/system"
|
||||||
@@ -10,6 +11,7 @@ import (
|
|||||||
"github.com/wiretrustee/wiretrustee/management/proto"
|
"github.com/wiretrustee/wiretrustee/management/proto"
|
||||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
|
"google.golang.org/grpc/connectivity"
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
"google.golang.org/grpc/keepalive"
|
"google.golang.org/grpc/keepalive"
|
||||||
"io"
|
"io"
|
||||||
@@ -70,13 +72,19 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
|
|||||||
InitialInterval: 800 * time.Millisecond,
|
InitialInterval: 800 * time.Millisecond,
|
||||||
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||||
Multiplier: backoff.DefaultMultiplier,
|
Multiplier: backoff.DefaultMultiplier,
|
||||||
MaxInterval: 15 * time.Minute,
|
MaxInterval: 10 * time.Second,
|
||||||
MaxElapsedTime: time.Hour, //stop after an hour of trying, the error will be propagated to the general retry of the client
|
MaxElapsedTime: 12 * time.Hour, //stop after 12 hours of trying, the error will be propagated to the general retry of the client
|
||||||
Stop: backoff.Stop,
|
Stop: backoff.Stop,
|
||||||
Clock: backoff.SystemClock,
|
Clock: backoff.SystemClock,
|
||||||
}, ctx)
|
}, ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ready indicates whether the client is okay and ready to be used
|
||||||
|
// for now it just checks whether gRPC connection to the service is ready
|
||||||
|
func (c *Client) ready() bool {
|
||||||
|
return c.conn.GetState() == connectivity.Ready
|
||||||
|
}
|
||||||
|
|
||||||
// Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages
|
// Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages
|
||||||
// Blocking request. The result will be sent via msgHandler callback function
|
// Blocking request. The result will be sent via msgHandler callback function
|
||||||
func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
|
func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
|
||||||
@@ -85,6 +93,12 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
|
|||||||
|
|
||||||
operation := func() error {
|
operation := func() error {
|
||||||
|
|
||||||
|
log.Debugf("management connection state %v", c.conn.GetState())
|
||||||
|
|
||||||
|
if !c.ready() {
|
||||||
|
return fmt.Errorf("no connection to management")
|
||||||
|
}
|
||||||
|
|
||||||
// todo we already have it since we did the Login, maybe cache it locally?
|
// todo we already have it since we did the Login, maybe cache it locally?
|
||||||
serverPubKey, err := c.GetServerPublicKey()
|
serverPubKey, err := c.GetServerPublicKey()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -98,7 +112,7 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infof("connected to the Management Service Stream")
|
log.Infof("connected to the Management Service stream")
|
||||||
|
|
||||||
// blocking until error
|
// blocking until error
|
||||||
err = c.receiveEvents(stream, *serverPubKey, msgHandler)
|
err = c.receiveEvents(stream, *serverPubKey, msgHandler)
|
||||||
@@ -139,7 +153,7 @@ func (c *Client) receiveEvents(stream proto.ManagementService_SyncClient, server
|
|||||||
for {
|
for {
|
||||||
update, err := stream.Recv()
|
update, err := stream.Recv()
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
log.Errorf("managment stream was closed: %s", err)
|
log.Errorf("Management stream has been closed by server: %s", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -165,6 +179,10 @@ func (c *Client) receiveEvents(stream proto.ManagementService_SyncClient, server
|
|||||||
|
|
||||||
// GetServerPublicKey returns server Wireguard public key (used later for encrypting messages sent to the server)
|
// GetServerPublicKey returns server Wireguard public key (used later for encrypting messages sent to the server)
|
||||||
func (c *Client) GetServerPublicKey() (*wgtypes.Key, error) {
|
func (c *Client) GetServerPublicKey() (*wgtypes.Key, error) {
|
||||||
|
if !c.ready() {
|
||||||
|
return nil, fmt.Errorf("no connection to management")
|
||||||
|
}
|
||||||
|
|
||||||
mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second) //todo make a general setting
|
mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second) //todo make a general setting
|
||||||
defer cancel()
|
defer cancel()
|
||||||
resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{})
|
resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{})
|
||||||
@@ -181,6 +199,9 @@ func (c *Client) GetServerPublicKey() (*wgtypes.Key, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) login(serverKey wgtypes.Key, req *proto.LoginRequest) (*proto.LoginResponse, error) {
|
func (c *Client) login(serverKey wgtypes.Key, req *proto.LoginRequest) (*proto.LoginResponse, error) {
|
||||||
|
if !c.ready() {
|
||||||
|
return nil, fmt.Errorf("no connection to management")
|
||||||
|
}
|
||||||
loginReq, err := encryption.EncryptMessage(serverKey, c.key, req)
|
loginReq, err := encryption.EncryptMessage(serverKey, c.key, req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to encrypt message: %s", err)
|
log.Errorf("failed to encrypt message: %s", err)
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ var _ = Describe("Client", func() {
|
|||||||
})
|
})
|
||||||
|
|
||||||
Describe("Exchanging messages", func() {
|
Describe("Exchanging messages", func() {
|
||||||
Context("between connected peers", func() {
|
Context("between streamConnected peers", func() {
|
||||||
It("should be successful", func() {
|
It("should be successful", func() {
|
||||||
|
|
||||||
var msgReceived sync.WaitGroup
|
var msgReceived sync.WaitGroup
|
||||||
@@ -48,30 +48,42 @@ var _ = Describe("Client", func() {
|
|||||||
// connect PeerA to Signal
|
// connect PeerA to Signal
|
||||||
keyA, _ := wgtypes.GenerateKey()
|
keyA, _ := wgtypes.GenerateKey()
|
||||||
clientA := createSignalClient(addr, keyA)
|
clientA := createSignalClient(addr, keyA)
|
||||||
clientA.Receive(func(msg *sigProto.Message) error {
|
go func() {
|
||||||
receivedOnA = msg.GetBody().GetPayload()
|
err := clientA.Receive(func(msg *sigProto.Message) error {
|
||||||
msgReceived.Done()
|
receivedOnA = msg.GetBody().GetPayload()
|
||||||
return nil
|
msgReceived.Done()
|
||||||
})
|
return nil
|
||||||
clientA.WaitConnected()
|
})
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
clientA.WaitStreamConnected()
|
||||||
|
|
||||||
// connect PeerB to Signal
|
// connect PeerB to Signal
|
||||||
keyB, _ := wgtypes.GenerateKey()
|
keyB, _ := wgtypes.GenerateKey()
|
||||||
clientB := createSignalClient(addr, keyB)
|
clientB := createSignalClient(addr, keyB)
|
||||||
clientB.Receive(func(msg *sigProto.Message) error {
|
|
||||||
receivedOnB = msg.GetBody().GetPayload()
|
go func() {
|
||||||
err := clientB.Send(&sigProto.Message{
|
err := clientB.Receive(func(msg *sigProto.Message) error {
|
||||||
Key: keyB.PublicKey().String(),
|
receivedOnB = msg.GetBody().GetPayload()
|
||||||
RemoteKey: keyA.PublicKey().String(),
|
err := clientB.Send(&sigProto.Message{
|
||||||
Body: &sigProto.Body{Payload: "pong"},
|
Key: keyB.PublicKey().String(),
|
||||||
|
RemoteKey: keyA.PublicKey().String(),
|
||||||
|
Body: &sigProto.Body{Payload: "pong"},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
Fail("failed sending a message to PeerA")
|
||||||
|
}
|
||||||
|
msgReceived.Done()
|
||||||
|
return nil
|
||||||
})
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
Fail("failed sending a message to PeerA")
|
return
|
||||||
}
|
}
|
||||||
msgReceived.Done()
|
}()
|
||||||
return nil
|
|
||||||
})
|
clientB.WaitStreamConnected()
|
||||||
clientB.WaitConnected()
|
|
||||||
|
|
||||||
// PeerA initiates ping-pong
|
// PeerA initiates ping-pong
|
||||||
err := clientA.Send(&sigProto.Message{
|
err := clientA.Send(&sigProto.Message{
|
||||||
@@ -100,11 +112,15 @@ var _ = Describe("Client", func() {
|
|||||||
|
|
||||||
key, _ := wgtypes.GenerateKey()
|
key, _ := wgtypes.GenerateKey()
|
||||||
client := createSignalClient(addr, key)
|
client := createSignalClient(addr, key)
|
||||||
client.Receive(func(msg *sigProto.Message) error {
|
go func() {
|
||||||
return nil
|
err := client.Receive(func(msg *sigProto.Message) error {
|
||||||
})
|
return nil
|
||||||
client.WaitConnected()
|
})
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
client.WaitStreamConnected()
|
||||||
Expect(client).NotTo(BeNil())
|
Expect(client).NotTo(BeNil())
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user