Compare commits

...

9 Commits

Author SHA1 Message Date
Mikhail Bragin
ed1e4dfc51 refactor signal client sync func (#147)
* refactor: move goroutine that runs Signal Client Receive to the engine for better control

* chore: fix comments typo

* test: fix golint

* chore: comments update

* chore: consider connection state=READY in signal and management clients

* chore: fix typos

* test: fix signal ping-pong test

* chore: add wait condition to signal client

* refactor: add stream status to the Signal client

* refactor: defer mutex unlock
2021-11-06 15:00:13 +01:00
braginini
4d34fb4e64 chore: decrease backoff maxinterval to avoid long connection waiting times on the client app 2021-11-02 14:51:29 +01:00
Maycon Santos
1fb8b74cd2 set IF arm6 and empty attribute for package (#146)
There is a behavior or bug in goreleaser where it appends the file name in the target URL and that was causing issues and misconfigured properties
2021-11-01 20:33:26 +01:00
Mikhail Bragin
d040cfed7e fix: client app retry logic (#144)
* fix: retry logic
2021-11-01 09:34:06 +01:00
Maycon Santos
2c729fe5cc remove architecture info from deb (#145) 2021-11-01 09:33:22 +01:00
braginini
e9066b4651 chore: increase signal and management gRPC clients timeouts 2021-10-31 12:14:00 +01:00
Mikhail Bragin
673e807528 chore: set default key expiration if not provided by frontednd (#142) 2021-10-31 12:06:44 +01:00
Mikhail Bragin
892080bc38 docs: update key features 2021-10-27 13:56:55 +02:00
braginini
2d39f6ccae fix: remove ICE port limits 2021-10-27 10:49:03 +02:00
11 changed files with 378 additions and 262 deletions

View File

@@ -221,7 +221,7 @@ uploads:
ids: ids:
- deb - deb
mode: archive mode: archive
target: https://pkgs.wiretrustee.com/debian/pool/{{ .ArtifactName }};deb.distribution=stable;deb.component=main;deb.architecture={{ .Arch }} target: https://pkgs.wiretrustee.com/debian/pool/{{ .ArtifactName }};deb.distribution=stable;deb.component=main;deb.architecture={{ if .Arm }}armhf{{ else }}{{ .Arch }}{{ end }};deb.package=
username: dev@wiretrustee.com username: dev@wiretrustee.com
method: PUT method: PUT
- name: yum - name: yum

View File

@@ -31,6 +31,16 @@ It requires zero configuration effort leaving behind the hassle of opening ports
There is no centralized VPN server with Wiretrustee - your computers, devices, machines, and servers connect to each other directly over a fast encrypted tunnel. There is no centralized VPN server with Wiretrustee - your computers, devices, machines, and servers connect to each other directly over a fast encrypted tunnel.
**Wiretrustee automates Wireguard-based networks, offering a management layer with:**
* Centralized Peer IP management with a neat UI dashboard.
* Automatic Peer discovery and configuration.
* UDP hole punching to establish peer-to-peer connections behind NAT, firewall, and without a public static IP.
* Connection relay fallback in case a peer-to-peer connection is not possible.
* Multitenancy (coming soon).
* Client application SSO with MFA (coming soon).
* Access Controls (coming soon).
* Activity Monitoring (coming soon).
### Secure peer-to-peer VPN in minutes ### Secure peer-to-peer VPN in minutes
<p float="left" align="middle"> <p float="left" align="middle">
<img src="docs/media/peerA.gif" width="400"/> <img src="docs/media/peerA.gif" width="400"/>
@@ -45,22 +55,6 @@ Hosted demo version:
[UI Dashboard Repo](https://github.com/wiretrustee/wiretrustee-dashboard) [UI Dashboard Repo](https://github.com/wiretrustee/wiretrustee-dashboard)
### Why using Wiretrustee?
* Connect multiple devices to each other via a secure peer-to-peer Wireguard VPN tunnel. At home, the office, or anywhere else.
* No need to open ports and expose public IPs on the device, routers etc.
* Uses Kernel Wireguard module if available.
* Automatic network change detection. When a new peer joins the network others are notified and keys are exchanged automatically.
* Automatically reconnects in case of network failures or switches.
* Automatic NAT traversal.
* Relay server fallback in case of an unsuccessful peer-to-peer connection.
* Private key never leaves your device.
* Automatic IP address management.
* Intuitive UI Dashboard.
* Works on ARM devices (e.g. Raspberry Pi).
* Open-source (including Management Service)
### A bit on Wiretrustee internals ### A bit on Wiretrustee internals
* Wiretrustee features a Management Service that offers peer IP management and network updates distribution (e.g. when new peer joins the network). * Wiretrustee features a Management Service that offers peer IP management and network updates distribution (e.g. when new peer joins the network).
* Wiretrustee uses WebRTC ICE implemented in [pion/ice library](https://github.com/pion/ice) to discover connection candidates when establishing a peer-to-peer connection between devices. * Wiretrustee uses WebRTC ICE implemented in [pion/ice library](https://github.com/pion/ice) to discover connection candidates when establishing a peer-to-peer connection between devices.

View File

@@ -1,7 +1,6 @@
package cmd package cmd
import ( import (
"github.com/cenkalti/backoff/v4"
"github.com/kardianos/service" "github.com/kardianos/service"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@@ -11,31 +10,12 @@ import (
func (p *program) Start(s service.Service) error { func (p *program) Start(s service.Service) error {
var backOff = &backoff.ExponentialBackOff{
InitialInterval: time.Second,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
// Start should not block. Do the actual work async. // Start should not block. Do the actual work async.
log.Info("starting service") //nolint log.Info("starting service") //nolint
go func() { go func() {
operation := func() error { err := runClient()
err := runClient()
if err != nil {
log.Warnf("retrying Wiretrustee client app due to error: %v", err)
return err
}
return nil
}
err := backoff.Retry(operation, backOff)
if err != nil { if err != nil {
log.Errorf("exiting client retry loop due to unrecoverable error: %s", err) log.Errorf("stopped Wiretrustee client app due to error: %v", err)
return return
} }
}() }()

View File

@@ -2,6 +2,7 @@ package cmd
import ( import (
"context" "context"
"github.com/cenkalti/backoff/v4"
"github.com/kardianos/service" "github.com/kardianos/service"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@@ -12,6 +13,7 @@ import (
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"time"
) )
var ( var (
@@ -117,86 +119,107 @@ func connectToManagement(ctx context.Context, managementAddr string, ourPrivateK
} }
func runClient() error { func runClient() error {
config, err := internal.ReadConfig(managementURL, configPath) var backOff = &backoff.ExponentialBackOff{
InitialInterval: time.Second,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 10 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop the client after 3 days trying (must be a huge problem, e.g permission denied)
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
operation := func() error {
config, err := internal.ReadConfig(managementURL, configPath)
if err != nil {
log.Errorf("failed reading config %s %v", configPath, err)
return err
}
//validate our peer's Wireguard PRIVATE key
myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey)
if err != nil {
log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error())
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mgmTlsEnabled := false
if config.ManagementURL.Scheme == "https" {
mgmTlsEnabled = true
}
// connect (just a connection, no stream yet) and login to Management Service to get an initial global Wiretrustee config
mgmClient, loginResp, err := connectToManagement(ctx, config.ManagementURL.Host, myPrivateKey, mgmTlsEnabled)
if err != nil {
log.Warn(err)
return err
}
// with the global Wiretrustee config in hand connect (just a connection, no stream yet) Signal
signalClient, err := connectToSignal(ctx, loginResp.GetWiretrusteeConfig(), myPrivateKey)
if err != nil {
log.Error(err)
return err
}
peerConfig := loginResp.GetPeerConfig()
engineConfig, err := createEngineConfig(myPrivateKey, config, peerConfig)
if err != nil {
log.Error(err)
return err
}
// create start the Wiretrustee Engine that will connect to the Signal and Management streams and manage connections to remote peers.
engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel, ctx)
err = engine.Start()
if err != nil {
log.Errorf("error while starting Wiretrustee Connection Engine: %s", err)
return err
}
log.Print("Wiretrustee engine started, my IP is: ", peerConfig.Address)
select {
case <-stopCh:
case <-ctx.Done():
}
backOff.Reset()
err = mgmClient.Close()
if err != nil {
log.Errorf("failed closing Management Service client %v", err)
return err
}
err = signalClient.Close()
if err != nil {
log.Errorf("failed closing Signal Service client %v", err)
return err
}
err = engine.Stop()
if err != nil {
log.Errorf("failed stopping engine %v", err)
return err
}
go func() {
cleanupCh <- struct{}{}
}()
log.Info("stopped Wiretrustee client")
return ctx.Err()
}
err := backoff.Retry(operation, backOff)
if err != nil { if err != nil {
log.Errorf("failed reading config %s %v", configPath, err) log.Errorf("exiting client retry loop due to unrecoverable error: %s", err)
return err return err
} }
return nil
//validate our peer's Wireguard PRIVATE key
myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey)
if err != nil {
log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error())
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mgmTlsEnabled := false
if config.ManagementURL.Scheme == "https" {
mgmTlsEnabled = true
}
// connect (just a connection, no stream yet) and login to Management Service to get an initial global Wiretrustee config
mgmClient, loginResp, err := connectToManagement(ctx, config.ManagementURL.Host, myPrivateKey, mgmTlsEnabled)
if err != nil {
log.Warn(err)
return err
}
// with the global Wiretrustee config in hand connect (just a connection, no stream yet) Signal
signalClient, err := connectToSignal(ctx, loginResp.GetWiretrusteeConfig(), myPrivateKey)
if err != nil {
log.Error(err)
return err
}
peerConfig := loginResp.GetPeerConfig()
engineConfig, err := createEngineConfig(myPrivateKey, config, peerConfig)
if err != nil {
log.Error(err)
return err
}
// create start the Wiretrustee Engine that will connect to the Signal and Management streams and manage connections to remote peers.
engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel, ctx)
err = engine.Start()
if err != nil {
log.Errorf("error while starting Wiretrustee Connection Engine: %s", err)
return err
}
log.Print("Wiretrustee engine started, my IP is: ", peerConfig.Address)
select {
case <-stopCh:
case <-ctx.Done():
}
err = mgmClient.Close()
if err != nil {
log.Errorf("failed closing Management Service client %v", err)
return err
}
err = signalClient.Close()
if err != nil {
log.Errorf("failed closing Signal Service client %v", err)
return err
}
err = engine.Stop()
if err != nil {
log.Errorf("failed stopping engine %v", err)
return err
}
go func() {
cleanupCh <- struct{}{}
}()
log.Info("stopped Wiretrustee client")
return ctx.Err()
} }

View File

@@ -128,8 +128,6 @@ func (conn *Connection) Open(timeout time.Duration) error {
a, err := ice.NewAgent(&ice.AgentConfig{ a, err := ice.NewAgent(&ice.AgentConfig{
// MulticastDNSMode: ice.MulticastDNSModeQueryAndGather, // MulticastDNSMode: ice.MulticastDNSModeQueryAndGather,
NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4}, NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4},
PortMin: 57830,
PortMax: 57830,
Urls: conn.Config.StunTurnURLS, Urls: conn.Config.StunTurnURLS,
CandidateTypes: []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay}, CandidateTypes: []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay},
InterfaceFilter: func(s string) bool { InterfaceFilter: func(s string) bool {

View File

@@ -4,7 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/cenkalti/backoff/v4" "github.com/cenkalti/backoff/v4"
ice "github.com/pion/ice/v2" "github.com/pion/ice/v2"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/iface" "github.com/wiretrustee/wiretrustee/iface"
mgm "github.com/wiretrustee/wiretrustee/management/client" mgm "github.com/wiretrustee/wiretrustee/management/client"
@@ -142,7 +142,7 @@ func (e *Engine) initializePeer(peer Peer) {
RandomizationFactor: backoff.DefaultRandomizationFactor, RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier, Multiplier: backoff.DefaultMultiplier,
MaxInterval: 5 * time.Second, MaxInterval: 5 * time.Second,
MaxElapsedTime: time.Duration(0), //never stop MaxElapsedTime: 0, //never stop
Stop: backoff.Stop, Stop: backoff.Stop,
Clock: backoff.SystemClock, Clock: backoff.SystemClock,
}, e.ctx) }, e.ctx)
@@ -157,8 +157,7 @@ func (e *Engine) initializePeer(peer Peer) {
} }
if err != nil { if err != nil {
log.Warnln(err) log.Infof("retrying connection because of error: %s", err.Error())
log.Debugf("retrying connection because of error: %s", err.Error())
return err return err
} }
return nil return nil
@@ -332,6 +331,8 @@ func (e *Engine) receiveManagementEvents() {
return nil return nil
}) })
if err != nil { if err != nil {
// happens if management is unavailable for a long time.
// We want to cancel the operation of the whole client
e.cancel() e.cancel()
return return
} }
@@ -414,68 +415,77 @@ func (e *Engine) updatePeers(remotePeers []*mgmProto.RemotePeerConfig) error {
// receiveSignalEvents connects to the Signal Service event stream to negotiate connection with remote peers // receiveSignalEvents connects to the Signal Service event stream to negotiate connection with remote peers
func (e *Engine) receiveSignalEvents() { func (e *Engine) receiveSignalEvents() {
// connect to a stream of messages coming from the signal server
e.signal.Receive(func(msg *sProto.Message) error {
e.syncMsgMux.Lock() go func() {
defer e.syncMsgMux.Unlock() // connect to a stream of messages coming from the signal server
err := e.signal.Receive(func(msg *sProto.Message) error {
conn := e.conns[msg.Key] e.syncMsgMux.Lock()
if conn == nil { defer e.syncMsgMux.Unlock()
return fmt.Errorf("wrongly addressed message %s", msg.Key)
}
if conn.Config.RemoteWgKey.String() != msg.Key { conn := e.conns[msg.Key]
return fmt.Errorf("unknown peer %s", msg.Key) if conn == nil {
} return fmt.Errorf("wrongly addressed message %s", msg.Key)
switch msg.GetBody().Type {
case sProto.Body_OFFER:
remoteCred, err := signal.UnMarshalCredential(msg)
if err != nil {
return err
} }
err = conn.OnOffer(IceCredentials{
uFrag: remoteCred.UFrag,
pwd: remoteCred.Pwd,
})
if err != nil { if conn.Config.RemoteWgKey.String() != msg.Key {
return err return fmt.Errorf("unknown peer %s", msg.Key)
}
switch msg.GetBody().Type {
case sProto.Body_OFFER:
remoteCred, err := signal.UnMarshalCredential(msg)
if err != nil {
return err
}
err = conn.OnOffer(IceCredentials{
uFrag: remoteCred.UFrag,
pwd: remoteCred.Pwd,
})
if err != nil {
return err
}
return nil
case sProto.Body_ANSWER:
remoteCred, err := signal.UnMarshalCredential(msg)
if err != nil {
return err
}
err = conn.OnAnswer(IceCredentials{
uFrag: remoteCred.UFrag,
pwd: remoteCred.Pwd,
})
if err != nil {
return err
}
case sProto.Body_CANDIDATE:
candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload)
if err != nil {
log.Errorf("failed on parsing remote candidate %s -> %s", candidate, err)
return err
}
err = conn.OnRemoteCandidate(candidate)
if err != nil {
log.Errorf("error handling CANDIATE from %s", msg.Key)
return err
}
} }
return nil return nil
case sProto.Body_ANSWER: })
remoteCred, err := signal.UnMarshalCredential(msg) if err != nil {
if err != nil { // happens if signal is unavailable for a long time.
return err // We want to cancel the operation of the whole client
} e.cancel()
err = conn.OnAnswer(IceCredentials{ return
uFrag: remoteCred.UFrag,
pwd: remoteCred.Pwd,
})
if err != nil {
return err
}
case sProto.Body_CANDIDATE:
candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload)
if err != nil {
log.Errorf("failed on parsing remote candidate %s -> %s", candidate, err)
return err
}
err = conn.OnRemoteCandidate(candidate)
if err != nil {
log.Errorf("error handling CANDIATE from %s", msg.Key)
return err
}
} }
}()
return nil e.signal.WaitStreamConnected()
})
e.signal.WaitConnected()
} }

View File

@@ -3,6 +3,7 @@ package client
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"fmt"
"github.com/cenkalti/backoff/v4" "github.com/cenkalti/backoff/v4"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/client/system" "github.com/wiretrustee/wiretrustee/client/system"
@@ -10,6 +11,7 @@ import (
"github.com/wiretrustee/wiretrustee/management/proto" "github.com/wiretrustee/wiretrustee/management/proto"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
"io" "io"
@@ -32,7 +34,7 @@ func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsE
transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})) transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
} }
mgmCtx, cancel := context.WithTimeout(ctx, 3*time.Second) mgmCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel() defer cancel()
conn, err := grpc.DialContext( conn, err := grpc.DialContext(
mgmCtx, mgmCtx,
@@ -40,8 +42,8 @@ func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsE
transportOption, transportOption,
grpc.WithBlock(), grpc.WithBlock(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{ grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 3 * time.Second, Time: 15 * time.Second,
Timeout: 2 * time.Second, Timeout: 10 * time.Second,
})) }))
if err != nil { if err != nil {
@@ -70,13 +72,19 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
InitialInterval: 800 * time.Millisecond, InitialInterval: 800 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor, RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier, Multiplier: backoff.DefaultMultiplier,
MaxInterval: 30 * time.Second, MaxInterval: 10 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying MaxElapsedTime: 12 * time.Hour, //stop after 12 hours of trying, the error will be propagated to the general retry of the client
Stop: backoff.Stop, Stop: backoff.Stop,
Clock: backoff.SystemClock, Clock: backoff.SystemClock,
}, ctx) }, ctx)
} }
// ready indicates whether the client is okay and ready to be used
// for now it just checks whether gRPC connection to the service is ready
func (c *Client) ready() bool {
return c.conn.GetState() == connectivity.Ready
}
// Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages // Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages
// Blocking request. The result will be sent via msgHandler callback function // Blocking request. The result will be sent via msgHandler callback function
func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error { func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
@@ -85,6 +93,12 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
operation := func() error { operation := func() error {
log.Debugf("management connection state %v", c.conn.GetState())
if !c.ready() {
return fmt.Errorf("no connection to management")
}
// todo we already have it since we did the Login, maybe cache it locally? // todo we already have it since we did the Login, maybe cache it locally?
serverPubKey, err := c.GetServerPublicKey() serverPubKey, err := c.GetServerPublicKey()
if err != nil { if err != nil {
@@ -98,17 +112,15 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
return err return err
} }
log.Infof("connected to the Management Service Stream") log.Infof("connected to the Management Service stream")
// blocking until error // blocking until error
err = c.receiveEvents(stream, *serverPubKey, msgHandler) err = c.receiveEvents(stream, *serverPubKey, msgHandler)
if err != nil { if err != nil {
/*if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.PermissionDenied { backOff.Reset()
//todo handle differently??
}*/
return err return err
} }
backOff.Reset()
return nil return nil
} }
@@ -141,7 +153,7 @@ func (c *Client) receiveEvents(stream proto.ManagementService_SyncClient, server
for { for {
update, err := stream.Recv() update, err := stream.Recv()
if err == io.EOF { if err == io.EOF {
log.Errorf("managment stream was closed: %s", err) log.Errorf("Management stream has been closed by server: %s", err)
return err return err
} }
if err != nil { if err != nil {
@@ -167,6 +179,10 @@ func (c *Client) receiveEvents(stream proto.ManagementService_SyncClient, server
// GetServerPublicKey returns server Wireguard public key (used later for encrypting messages sent to the server) // GetServerPublicKey returns server Wireguard public key (used later for encrypting messages sent to the server)
func (c *Client) GetServerPublicKey() (*wgtypes.Key, error) { func (c *Client) GetServerPublicKey() (*wgtypes.Key, error) {
if !c.ready() {
return nil, fmt.Errorf("no connection to management")
}
mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second) //todo make a general setting mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second) //todo make a general setting
defer cancel() defer cancel()
resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{}) resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{})
@@ -183,6 +199,9 @@ func (c *Client) GetServerPublicKey() (*wgtypes.Key, error) {
} }
func (c *Client) login(serverKey wgtypes.Key, req *proto.LoginRequest) (*proto.LoginResponse, error) { func (c *Client) login(serverKey wgtypes.Key, req *proto.LoginRequest) (*proto.LoginResponse, error) {
if !c.ready() {
return nil, fmt.Errorf("no connection to management")
}
loginReq, err := encryption.EncryptMessage(serverKey, c.key, req) loginReq, err := encryption.EncryptMessage(serverKey, c.key, req)
if err != nil { if err != nil {
log.Errorf("failed to encrypt message: %s", err) log.Errorf("failed to encrypt message: %s", err)

View File

@@ -3,11 +3,11 @@ package server
import ( import (
"github.com/google/uuid" "github.com/google/uuid"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/util"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"net" "net"
"sync" "sync"
"time"
) )
type AccountManager struct { type AccountManager struct {
@@ -35,16 +35,21 @@ func NewManager(store Store, peersUpdateManager *PeersUpdateManager) *AccountMan
} }
//AddSetupKey generates a new setup key with a given name and type, and adds it to the specified account //AddSetupKey generates a new setup key with a given name and type, and adds it to the specified account
func (am *AccountManager) AddSetupKey(accountId string, keyName string, keyType SetupKeyType, expiresIn time.Duration) (*SetupKey, error) { func (am *AccountManager) AddSetupKey(accountId string, keyName string, keyType SetupKeyType, expiresIn *util.Duration) (*SetupKey, error) {
am.mux.Lock() am.mux.Lock()
defer am.mux.Unlock() defer am.mux.Unlock()
keyDuration := DefaultSetupKeyDuration
if expiresIn != nil {
keyDuration = expiresIn.Duration
}
account, err := am.Store.GetAccount(accountId) account, err := am.Store.GetAccount(accountId)
if err != nil { if err != nil {
return nil, status.Errorf(codes.NotFound, "account not found") return nil, status.Errorf(codes.NotFound, "account not found")
} }
setupKey := GenerateSetupKey(keyName, keyType, expiresIn) setupKey := GenerateSetupKey(keyName, keyType, keyDuration)
account.SetupKeys[setupKey.Key] = setupKey account.SetupKeys[setupKey.Key] = setupKey
err = am.Store.SaveAccount(account) err = am.Store.SaveAccount(account)

View File

@@ -5,6 +5,7 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/management/server" "github.com/wiretrustee/wiretrustee/management/server"
"github.com/wiretrustee/wiretrustee/util"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"net/http" "net/http"
@@ -34,7 +35,7 @@ type SetupKeyResponse struct {
type SetupKeyRequest struct { type SetupKeyRequest struct {
Name string Name string
Type server.SetupKeyType Type server.SetupKeyType
ExpiresIn Duration ExpiresIn *util.Duration
Revoked bool Revoked bool
} }
@@ -102,7 +103,7 @@ func (h *SetupKeys) createKey(accountId string, w http.ResponseWriter, r *http.R
return return
} }
setupKey, err := h.accountManager.AddSetupKey(accountId, req.Name, req.Type, req.ExpiresIn.Duration) setupKey, err := h.accountManager.AddSetupKey(accountId, req.Name, req.Type, req.ExpiresIn)
if err != nil { if err != nil {
errStatus, ok := status.FromError(err) errStatus, ok := status.FromError(err)
if ok && errStatus.Code() == codes.NotFound { if ok && errStatus.Code() == codes.NotFound {

View File

@@ -11,6 +11,7 @@ import (
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
@@ -23,6 +24,12 @@ import (
// A set of tools to exchange connection details (Wireguard endpoints) with the remote peer. // A set of tools to exchange connection details (Wireguard endpoints) with the remote peer.
// Status is the status of the client
type Status string
const streamConnected Status = "streamConnected"
const streamDisconnected Status = "streamDisconnected"
// Client Wraps the Signal Exchange Service gRpc client // Client Wraps the Signal Exchange Service gRpc client
type Client struct { type Client struct {
key wgtypes.Key key wgtypes.Key
@@ -30,8 +37,11 @@ type Client struct {
signalConn *grpc.ClientConn signalConn *grpc.ClientConn
ctx context.Context ctx context.Context
stream proto.SignalExchange_ConnectStreamClient stream proto.SignalExchange_ConnectStreamClient
//waiting group to notify once stream is connected // connectedCh used to notify goroutines waiting for the connection to the Signal stream
connWg *sync.WaitGroup //todo use a channel instead?? connectedCh chan struct{}
mux sync.Mutex
// streamConnected indicates whether this client is streamConnected to the Signal stream
status Status
} }
// Close Closes underlying connections to the Signal Exchange // Close Closes underlying connections to the Signal Exchange
@@ -48,7 +58,7 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo
transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})) transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
} }
sigCtx, cancel := context.WithTimeout(ctx, 3*time.Second) sigCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel() defer cancel()
conn, err := grpc.DialContext( conn, err := grpc.DialContext(
sigCtx, sigCtx,
@@ -56,8 +66,8 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo
transportOption, transportOption,
grpc.WithBlock(), grpc.WithBlock(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{ grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 3 * time.Second, Time: 15 * time.Second,
Timeout: 2 * time.Second, Timeout: 10 * time.Second,
})) }))
if err != nil { if err != nil {
@@ -65,13 +75,13 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo
return nil, err return nil, err
} }
var wg sync.WaitGroup
return &Client{ return &Client{
realClient: proto.NewSignalExchangeClient(conn), realClient: proto.NewSignalExchangeClient(conn),
ctx: ctx, ctx: ctx,
signalConn: conn, signalConn: conn,
key: key, key: key,
connWg: &wg, mux: sync.Mutex{},
status: streamDisconnected,
}, nil }, nil
} }
@@ -81,8 +91,8 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
InitialInterval: 800 * time.Millisecond, InitialInterval: 800 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor, RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier, Multiplier: backoff.DefaultMultiplier,
MaxInterval: 30 * time.Second, MaxInterval: 10 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying MaxElapsedTime: 12 * time.Hour, //stop after 12 hours of trying, the error will be propagated to the general retry of the client
Stop: backoff.Stop, Stop: backoff.Stop,
Clock: backoff.SystemClock, Clock: backoff.SystemClock,
}, ctx) }, ctx)
@@ -91,36 +101,79 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
// Receive Connects to the Signal Exchange message stream and starts receiving messages. // Receive Connects to the Signal Exchange message stream and starts receiving messages.
// The messages will be handled by msgHandler function provided. // The messages will be handled by msgHandler function provided.
// This function runs a goroutine underneath and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart) // This function is blocking and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart)
// The key is the identifier of our Peer (could be Wireguard public key) // The connection retry logic will try to reconnect for 30 min and if wasn't successful will propagate the error to the function caller.
func (c *Client) Receive(msgHandler func(msg *proto.Message) error) { func (c *Client) Receive(msgHandler func(msg *proto.Message) error) error {
c.connWg.Add(1)
go func() {
var backOff = defaultBackoff(c.ctx) var backOff = defaultBackoff(c.ctx)
operation := func() error { operation := func() error {
err := c.connect(c.key.PublicKey().String(), msgHandler) c.notifyStreamDisconnected()
if err != nil {
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
c.connWg.Add(1)
return err
}
backOff.Reset() log.Debugf("signal connection state %v", c.signalConn.GetState())
return nil if !c.ready() {
return fmt.Errorf("no connection to signal")
} }
err := backoff.Retry(operation, backOff) // connect to Signal stream identifying ourselves with a public Wireguard key
// todo once the key rotation logic has been implemented, consider changing to some other identifier (received from management)
stream, err := c.connect(c.key.PublicKey().String())
if err != nil { if err != nil {
log.Errorf("exiting Signal Service connection retry loop due to unrecoverable error: %s", err) log.Warnf("streamDisconnected from the Signal Exchange due to an error: %v", err)
return return err
} }
}()
c.notifyStreamConnected()
log.Infof("streamConnected to the Signal Service stream")
// start receiving messages from the Signal stream (from other peers through signal)
err = c.receive(stream, msgHandler)
if err != nil {
log.Warnf("streamDisconnected from the Signal Exchange due to an error: %v", err)
backOff.Reset()
return err
}
return nil
}
err := backoff.Retry(operation, backOff)
if err != nil {
log.Errorf("exiting Signal Service connection retry loop due to unrecoverable error: %s", err)
return err
}
return nil
}
func (c *Client) notifyStreamDisconnected() {
c.mux.Lock()
defer c.mux.Unlock()
c.status = streamDisconnected
} }
func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error) error { func (c *Client) notifyStreamConnected() {
c.mux.Lock()
defer c.mux.Unlock()
c.status = streamConnected
if c.connectedCh != nil {
// there are goroutines waiting on this channel -> release them
close(c.connectedCh)
c.connectedCh = nil
}
}
func (c *Client) getStreamStatusChan() <-chan struct{} {
c.mux.Lock()
defer c.mux.Unlock()
if c.connectedCh == nil {
c.connectedCh = make(chan struct{})
}
return c.connectedCh
}
func (c *Client) connect(key string) (proto.SignalExchange_ConnectStreamClient, error) {
c.stream = nil c.stream = nil
// add key fingerprint to the request header to be identified on the server side // add key fingerprint to the request header to be identified on the server side
@@ -131,35 +184,48 @@ func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error)
c.stream = stream c.stream = stream
if err != nil { if err != nil {
return err return nil, err
} }
// blocks // blocks
header, err := c.stream.Header() header, err := c.stream.Header()
if err != nil { if err != nil {
return err return nil, err
} }
registered := header.Get(proto.HeaderRegistered) registered := header.Get(proto.HeaderRegistered)
if len(registered) == 0 { if len(registered) == 0 {
return fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams") return nil, fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
} }
//connection established we are good to use the stream
c.connWg.Done()
log.Infof("connected to the Signal Exchange Stream") return stream, nil
return c.receive(stream, msgHandler)
} }
// WaitConnected waits until the client is connected to the message stream // ready indicates whether the client is okay and ready to be used
func (c *Client) WaitConnected() { // for now it just checks whether gRPC connection to the service is in state Ready
c.connWg.Wait() func (c *Client) ready() bool {
return c.signalConn.GetState() == connectivity.Ready
}
// WaitStreamConnected waits until the client is connected to the Signal stream
func (c *Client) WaitStreamConnected() {
if c.status == streamConnected {
return
}
ch := c.getStreamStatusChan()
select {
case <-c.ctx.Done():
case <-ch:
}
} }
// SendToStream sends a message to the remote Peer through the Signal Exchange using established stream connection to the Signal Server // SendToStream sends a message to the remote Peer through the Signal Exchange using established stream connection to the Signal Server
// The Client.Receive method must be called before sending messages to establish initial connection to the Signal Exchange // The Client.Receive method must be called before sending messages to establish initial connection to the Signal Exchange
// Client.connWg can be used to wait // Client.connWg can be used to wait
func (c *Client) SendToStream(msg *proto.EncryptedMessage) error { func (c *Client) SendToStream(msg *proto.EncryptedMessage) error {
if !c.ready() {
return fmt.Errorf("no connection to signal")
}
if c.stream == nil { if c.stream == nil {
return fmt.Errorf("connection to the Signal Exchnage has not been established yet. Please call Client.Receive before sending messages") return fmt.Errorf("connection to the Signal Exchnage has not been established yet. Please call Client.Receive before sending messages")
} }
@@ -216,13 +282,17 @@ func (c *Client) encryptMessage(msg *proto.Message) (*proto.EncryptedMessage, er
// Send sends a message to the remote Peer through the Signal Exchange. // Send sends a message to the remote Peer through the Signal Exchange.
func (c *Client) Send(msg *proto.Message) error { func (c *Client) Send(msg *proto.Message) error {
if !c.ready() {
return fmt.Errorf("no connection to signal")
}
encryptedMessage, err := c.encryptMessage(msg) encryptedMessage, err := c.encryptMessage(msg)
if err != nil { if err != nil {
return err return err
} }
_, err = c.realClient.Send(context.TODO(), encryptedMessage) _, err = c.realClient.Send(context.TODO(), encryptedMessage)
if err != nil { if err != nil {
log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err) //log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err)
return err return err
} }
@@ -239,10 +309,10 @@ func (c *Client) receive(stream proto.SignalExchange_ConnectStreamClient,
log.Warnf("stream canceled (usually indicates shutdown)") log.Warnf("stream canceled (usually indicates shutdown)")
return err return err
} else if s.Code() == codes.Unavailable { } else if s.Code() == codes.Unavailable {
log.Warnf("server has been stopped") log.Warnf("Signal Service is unavailable")
return err return err
} else if err == io.EOF { } else if err == io.EOF {
log.Warnf("stream closed by server") log.Warnf("Signal Service stream closed by server")
return err return err
} else if err != nil { } else if err != nil {
return err return err

View File

@@ -36,7 +36,7 @@ var _ = Describe("Client", func() {
}) })
Describe("Exchanging messages", func() { Describe("Exchanging messages", func() {
Context("between connected peers", func() { Context("between streamConnected peers", func() {
It("should be successful", func() { It("should be successful", func() {
var msgReceived sync.WaitGroup var msgReceived sync.WaitGroup
@@ -48,30 +48,42 @@ var _ = Describe("Client", func() {
// connect PeerA to Signal // connect PeerA to Signal
keyA, _ := wgtypes.GenerateKey() keyA, _ := wgtypes.GenerateKey()
clientA := createSignalClient(addr, keyA) clientA := createSignalClient(addr, keyA)
clientA.Receive(func(msg *sigProto.Message) error { go func() {
receivedOnA = msg.GetBody().GetPayload() err := clientA.Receive(func(msg *sigProto.Message) error {
msgReceived.Done() receivedOnA = msg.GetBody().GetPayload()
return nil msgReceived.Done()
}) return nil
clientA.WaitConnected() })
if err != nil {
return
}
}()
clientA.WaitStreamConnected()
// connect PeerB to Signal // connect PeerB to Signal
keyB, _ := wgtypes.GenerateKey() keyB, _ := wgtypes.GenerateKey()
clientB := createSignalClient(addr, keyB) clientB := createSignalClient(addr, keyB)
clientB.Receive(func(msg *sigProto.Message) error {
receivedOnB = msg.GetBody().GetPayload() go func() {
err := clientB.Send(&sigProto.Message{ err := clientB.Receive(func(msg *sigProto.Message) error {
Key: keyB.PublicKey().String(), receivedOnB = msg.GetBody().GetPayload()
RemoteKey: keyA.PublicKey().String(), err := clientB.Send(&sigProto.Message{
Body: &sigProto.Body{Payload: "pong"}, Key: keyB.PublicKey().String(),
RemoteKey: keyA.PublicKey().String(),
Body: &sigProto.Body{Payload: "pong"},
})
if err != nil {
Fail("failed sending a message to PeerA")
}
msgReceived.Done()
return nil
}) })
if err != nil { if err != nil {
Fail("failed sending a message to PeerA") return
} }
msgReceived.Done() }()
return nil
}) clientB.WaitStreamConnected()
clientB.WaitConnected()
// PeerA initiates ping-pong // PeerA initiates ping-pong
err := clientA.Send(&sigProto.Message{ err := clientA.Send(&sigProto.Message{
@@ -100,11 +112,15 @@ var _ = Describe("Client", func() {
key, _ := wgtypes.GenerateKey() key, _ := wgtypes.GenerateKey()
client := createSignalClient(addr, key) client := createSignalClient(addr, key)
client.Receive(func(msg *sigProto.Message) error { go func() {
return nil err := client.Receive(func(msg *sigProto.Message) error {
}) return nil
client.WaitConnected() })
if err != nil {
return
}
}()
client.WaitStreamConnected()
Expect(client).NotTo(BeNil()) Expect(client).NotTo(BeNil())
}) })
}) })