mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-25 19:56:46 +00:00
Compare commits
8 Commits
v0.2.0-bet
...
v0.2.2-bet
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4d34fb4e64 | ||
|
|
1fb8b74cd2 | ||
|
|
d040cfed7e | ||
|
|
2c729fe5cc | ||
|
|
e9066b4651 | ||
|
|
673e807528 | ||
|
|
892080bc38 | ||
|
|
2d39f6ccae |
@@ -221,7 +221,7 @@ uploads:
|
|||||||
ids:
|
ids:
|
||||||
- deb
|
- deb
|
||||||
mode: archive
|
mode: archive
|
||||||
target: https://pkgs.wiretrustee.com/debian/pool/{{ .ArtifactName }};deb.distribution=stable;deb.component=main;deb.architecture={{ .Arch }}
|
target: https://pkgs.wiretrustee.com/debian/pool/{{ .ArtifactName }};deb.distribution=stable;deb.component=main;deb.architecture={{ if .Arm }}armhf{{ else }}{{ .Arch }}{{ end }};deb.package=
|
||||||
username: dev@wiretrustee.com
|
username: dev@wiretrustee.com
|
||||||
method: PUT
|
method: PUT
|
||||||
- name: yum
|
- name: yum
|
||||||
@@ -230,4 +230,4 @@ uploads:
|
|||||||
mode: archive
|
mode: archive
|
||||||
target: https://pkgs.wiretrustee.com/yum/{{ .Arch }}{{ if .Arm }}{{ .Arm }}{{ end }}
|
target: https://pkgs.wiretrustee.com/yum/{{ .Arch }}{{ if .Arm }}{{ .Arm }}{{ end }}
|
||||||
username: dev@wiretrustee.com
|
username: dev@wiretrustee.com
|
||||||
method: PUT
|
method: PUT
|
||||||
|
|||||||
26
README.md
26
README.md
@@ -31,6 +31,16 @@ It requires zero configuration effort leaving behind the hassle of opening ports
|
|||||||
|
|
||||||
There is no centralized VPN server with Wiretrustee - your computers, devices, machines, and servers connect to each other directly over a fast encrypted tunnel.
|
There is no centralized VPN server with Wiretrustee - your computers, devices, machines, and servers connect to each other directly over a fast encrypted tunnel.
|
||||||
|
|
||||||
|
**Wiretrustee automates Wireguard-based networks, offering a management layer with:**
|
||||||
|
* Centralized Peer IP management with a neat UI dashboard.
|
||||||
|
* Automatic Peer discovery and configuration.
|
||||||
|
* UDP hole punching to establish peer-to-peer connections behind NAT, firewall, and without a public static IP.
|
||||||
|
* Connection relay fallback in case a peer-to-peer connection is not possible.
|
||||||
|
* Multitenancy (coming soon).
|
||||||
|
* Client application SSO with MFA (coming soon).
|
||||||
|
* Access Controls (coming soon).
|
||||||
|
* Activity Monitoring (coming soon).
|
||||||
|
|
||||||
### Secure peer-to-peer VPN in minutes
|
### Secure peer-to-peer VPN in minutes
|
||||||
<p float="left" align="middle">
|
<p float="left" align="middle">
|
||||||
<img src="docs/media/peerA.gif" width="400"/>
|
<img src="docs/media/peerA.gif" width="400"/>
|
||||||
@@ -45,22 +55,6 @@ Hosted demo version:
|
|||||||
[UI Dashboard Repo](https://github.com/wiretrustee/wiretrustee-dashboard)
|
[UI Dashboard Repo](https://github.com/wiretrustee/wiretrustee-dashboard)
|
||||||
|
|
||||||
|
|
||||||
### Why using Wiretrustee?
|
|
||||||
|
|
||||||
* Connect multiple devices to each other via a secure peer-to-peer Wireguard VPN tunnel. At home, the office, or anywhere else.
|
|
||||||
* No need to open ports and expose public IPs on the device, routers etc.
|
|
||||||
* Uses Kernel Wireguard module if available.
|
|
||||||
* Automatic network change detection. When a new peer joins the network others are notified and keys are exchanged automatically.
|
|
||||||
* Automatically reconnects in case of network failures or switches.
|
|
||||||
* Automatic NAT traversal.
|
|
||||||
* Relay server fallback in case of an unsuccessful peer-to-peer connection.
|
|
||||||
* Private key never leaves your device.
|
|
||||||
* Automatic IP address management.
|
|
||||||
* Intuitive UI Dashboard.
|
|
||||||
* Works on ARM devices (e.g. Raspberry Pi).
|
|
||||||
* Open-source (including Management Service)
|
|
||||||
|
|
||||||
|
|
||||||
### A bit on Wiretrustee internals
|
### A bit on Wiretrustee internals
|
||||||
* Wiretrustee features a Management Service that offers peer IP management and network updates distribution (e.g. when new peer joins the network).
|
* Wiretrustee features a Management Service that offers peer IP management and network updates distribution (e.g. when new peer joins the network).
|
||||||
* Wiretrustee uses WebRTC ICE implemented in [pion/ice library](https://github.com/pion/ice) to discover connection candidates when establishing a peer-to-peer connection between devices.
|
* Wiretrustee uses WebRTC ICE implemented in [pion/ice library](https://github.com/pion/ice) to discover connection candidates when establishing a peer-to-peer connection between devices.
|
||||||
|
|||||||
@@ -1,7 +1,6 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"github.com/cenkalti/backoff/v4"
|
|
||||||
"github.com/kardianos/service"
|
"github.com/kardianos/service"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
@@ -11,31 +10,12 @@ import (
|
|||||||
|
|
||||||
func (p *program) Start(s service.Service) error {
|
func (p *program) Start(s service.Service) error {
|
||||||
|
|
||||||
var backOff = &backoff.ExponentialBackOff{
|
|
||||||
InitialInterval: time.Second,
|
|
||||||
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
|
||||||
Multiplier: backoff.DefaultMultiplier,
|
|
||||||
MaxInterval: 30 * time.Second,
|
|
||||||
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
|
|
||||||
Stop: backoff.Stop,
|
|
||||||
Clock: backoff.SystemClock,
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start should not block. Do the actual work async.
|
// Start should not block. Do the actual work async.
|
||||||
log.Info("starting service") //nolint
|
log.Info("starting service") //nolint
|
||||||
go func() {
|
go func() {
|
||||||
operation := func() error {
|
err := runClient()
|
||||||
err := runClient()
|
|
||||||
if err != nil {
|
|
||||||
log.Warnf("retrying Wiretrustee client app due to error: %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
err := backoff.Retry(operation, backOff)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("exiting client retry loop due to unrecoverable error: %s", err)
|
log.Errorf("stopped Wiretrustee client app due to error: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|||||||
181
client/cmd/up.go
181
client/cmd/up.go
@@ -2,6 +2,7 @@ package cmd
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/cenkalti/backoff/v4"
|
||||||
"github.com/kardianos/service"
|
"github.com/kardianos/service"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
@@ -12,6 +13,7 @@ import (
|
|||||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@@ -117,86 +119,107 @@ func connectToManagement(ctx context.Context, managementAddr string, ourPrivateK
|
|||||||
}
|
}
|
||||||
|
|
||||||
func runClient() error {
|
func runClient() error {
|
||||||
config, err := internal.ReadConfig(managementURL, configPath)
|
var backOff = &backoff.ExponentialBackOff{
|
||||||
|
InitialInterval: time.Second,
|
||||||
|
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||||
|
Multiplier: backoff.DefaultMultiplier,
|
||||||
|
MaxInterval: 10 * time.Second,
|
||||||
|
MaxElapsedTime: 24 * 3 * time.Hour, //stop the client after 3 days trying (must be a huge problem, e.g permission denied)
|
||||||
|
Stop: backoff.Stop,
|
||||||
|
Clock: backoff.SystemClock,
|
||||||
|
}
|
||||||
|
|
||||||
|
operation := func() error {
|
||||||
|
|
||||||
|
config, err := internal.ReadConfig(managementURL, configPath)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed reading config %s %v", configPath, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
//validate our peer's Wireguard PRIVATE key
|
||||||
|
myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
mgmTlsEnabled := false
|
||||||
|
if config.ManagementURL.Scheme == "https" {
|
||||||
|
mgmTlsEnabled = true
|
||||||
|
}
|
||||||
|
|
||||||
|
// connect (just a connection, no stream yet) and login to Management Service to get an initial global Wiretrustee config
|
||||||
|
mgmClient, loginResp, err := connectToManagement(ctx, config.ManagementURL.Host, myPrivateKey, mgmTlsEnabled)
|
||||||
|
if err != nil {
|
||||||
|
log.Warn(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// with the global Wiretrustee config in hand connect (just a connection, no stream yet) Signal
|
||||||
|
signalClient, err := connectToSignal(ctx, loginResp.GetWiretrusteeConfig(), myPrivateKey)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
peerConfig := loginResp.GetPeerConfig()
|
||||||
|
|
||||||
|
engineConfig, err := createEngineConfig(myPrivateKey, config, peerConfig)
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// create start the Wiretrustee Engine that will connect to the Signal and Management streams and manage connections to remote peers.
|
||||||
|
engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel, ctx)
|
||||||
|
err = engine.Start()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error while starting Wiretrustee Connection Engine: %s", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Print("Wiretrustee engine started, my IP is: ", peerConfig.Address)
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-stopCh:
|
||||||
|
case <-ctx.Done():
|
||||||
|
}
|
||||||
|
|
||||||
|
backOff.Reset()
|
||||||
|
|
||||||
|
err = mgmClient.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed closing Management Service client %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
err = signalClient.Close()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed closing Signal Service client %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = engine.Stop()
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed stopping engine %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
cleanupCh <- struct{}{}
|
||||||
|
}()
|
||||||
|
|
||||||
|
log.Info("stopped Wiretrustee client")
|
||||||
|
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
err := backoff.Retry(operation, backOff)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed reading config %s %v", configPath, err)
|
log.Errorf("exiting client retry loop due to unrecoverable error: %s", err)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
//validate our peer's Wireguard PRIVATE key
|
|
||||||
myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey)
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error())
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
mgmTlsEnabled := false
|
|
||||||
if config.ManagementURL.Scheme == "https" {
|
|
||||||
mgmTlsEnabled = true
|
|
||||||
}
|
|
||||||
|
|
||||||
// connect (just a connection, no stream yet) and login to Management Service to get an initial global Wiretrustee config
|
|
||||||
mgmClient, loginResp, err := connectToManagement(ctx, config.ManagementURL.Host, myPrivateKey, mgmTlsEnabled)
|
|
||||||
if err != nil {
|
|
||||||
log.Warn(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// with the global Wiretrustee config in hand connect (just a connection, no stream yet) Signal
|
|
||||||
signalClient, err := connectToSignal(ctx, loginResp.GetWiretrusteeConfig(), myPrivateKey)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
peerConfig := loginResp.GetPeerConfig()
|
|
||||||
|
|
||||||
engineConfig, err := createEngineConfig(myPrivateKey, config, peerConfig)
|
|
||||||
if err != nil {
|
|
||||||
log.Error(err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// create start the Wiretrustee Engine that will connect to the Signal and Management streams and manage connections to remote peers.
|
|
||||||
engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel, ctx)
|
|
||||||
err = engine.Start()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("error while starting Wiretrustee Connection Engine: %s", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Print("Wiretrustee engine started, my IP is: ", peerConfig.Address)
|
|
||||||
|
|
||||||
select {
|
|
||||||
case <-stopCh:
|
|
||||||
case <-ctx.Done():
|
|
||||||
}
|
|
||||||
|
|
||||||
err = mgmClient.Close()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed closing Management Service client %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
err = signalClient.Close()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed closing Signal Service client %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = engine.Stop()
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("failed stopping engine %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
cleanupCh <- struct{}{}
|
|
||||||
}()
|
|
||||||
|
|
||||||
log.Info("stopped Wiretrustee client")
|
|
||||||
|
|
||||||
return ctx.Err()
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -128,8 +128,6 @@ func (conn *Connection) Open(timeout time.Duration) error {
|
|||||||
a, err := ice.NewAgent(&ice.AgentConfig{
|
a, err := ice.NewAgent(&ice.AgentConfig{
|
||||||
// MulticastDNSMode: ice.MulticastDNSModeQueryAndGather,
|
// MulticastDNSMode: ice.MulticastDNSModeQueryAndGather,
|
||||||
NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4},
|
NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4},
|
||||||
PortMin: 57830,
|
|
||||||
PortMax: 57830,
|
|
||||||
Urls: conn.Config.StunTurnURLS,
|
Urls: conn.Config.StunTurnURLS,
|
||||||
CandidateTypes: []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay},
|
CandidateTypes: []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay},
|
||||||
InterfaceFilter: func(s string) bool {
|
InterfaceFilter: func(s string) bool {
|
||||||
|
|||||||
@@ -32,7 +32,7 @@ func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsE
|
|||||||
transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
|
transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
mgmCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
conn, err := grpc.DialContext(
|
conn, err := grpc.DialContext(
|
||||||
mgmCtx,
|
mgmCtx,
|
||||||
@@ -40,8 +40,8 @@ func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsE
|
|||||||
transportOption,
|
transportOption,
|
||||||
grpc.WithBlock(),
|
grpc.WithBlock(),
|
||||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||||
Time: 3 * time.Second,
|
Time: 15 * time.Second,
|
||||||
Timeout: 2 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
}))
|
}))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -70,8 +70,8 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
|
|||||||
InitialInterval: 800 * time.Millisecond,
|
InitialInterval: 800 * time.Millisecond,
|
||||||
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||||
Multiplier: backoff.DefaultMultiplier,
|
Multiplier: backoff.DefaultMultiplier,
|
||||||
MaxInterval: 30 * time.Second,
|
MaxInterval: 10 * time.Second,
|
||||||
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
|
MaxElapsedTime: 30 * time.Minute, //stop after an 30 min of trying, the error will be propagated to the general retry of the client
|
||||||
Stop: backoff.Stop,
|
Stop: backoff.Stop,
|
||||||
Clock: backoff.SystemClock,
|
Clock: backoff.SystemClock,
|
||||||
}, ctx)
|
}, ctx)
|
||||||
@@ -103,12 +103,10 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
|
|||||||
// blocking until error
|
// blocking until error
|
||||||
err = c.receiveEvents(stream, *serverPubKey, msgHandler)
|
err = c.receiveEvents(stream, *serverPubKey, msgHandler)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
/*if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.PermissionDenied {
|
backOff.Reset()
|
||||||
//todo handle differently??
|
|
||||||
}*/
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
backOff.Reset()
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,11 +3,11 @@ package server
|
|||||||
import (
|
import (
|
||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/wiretrustee/wiretrustee/util"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
type AccountManager struct {
|
type AccountManager struct {
|
||||||
@@ -35,16 +35,21 @@ func NewManager(store Store, peersUpdateManager *PeersUpdateManager) *AccountMan
|
|||||||
}
|
}
|
||||||
|
|
||||||
//AddSetupKey generates a new setup key with a given name and type, and adds it to the specified account
|
//AddSetupKey generates a new setup key with a given name and type, and adds it to the specified account
|
||||||
func (am *AccountManager) AddSetupKey(accountId string, keyName string, keyType SetupKeyType, expiresIn time.Duration) (*SetupKey, error) {
|
func (am *AccountManager) AddSetupKey(accountId string, keyName string, keyType SetupKeyType, expiresIn *util.Duration) (*SetupKey, error) {
|
||||||
am.mux.Lock()
|
am.mux.Lock()
|
||||||
defer am.mux.Unlock()
|
defer am.mux.Unlock()
|
||||||
|
|
||||||
|
keyDuration := DefaultSetupKeyDuration
|
||||||
|
if expiresIn != nil {
|
||||||
|
keyDuration = expiresIn.Duration
|
||||||
|
}
|
||||||
|
|
||||||
account, err := am.Store.GetAccount(accountId)
|
account, err := am.Store.GetAccount(accountId)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, status.Errorf(codes.NotFound, "account not found")
|
return nil, status.Errorf(codes.NotFound, "account not found")
|
||||||
}
|
}
|
||||||
|
|
||||||
setupKey := GenerateSetupKey(keyName, keyType, expiresIn)
|
setupKey := GenerateSetupKey(keyName, keyType, keyDuration)
|
||||||
account.SetupKeys[setupKey.Key] = setupKey
|
account.SetupKeys[setupKey.Key] = setupKey
|
||||||
|
|
||||||
err = am.Store.SaveAccount(account)
|
err = am.Store.SaveAccount(account)
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/wiretrustee/wiretrustee/management/server"
|
"github.com/wiretrustee/wiretrustee/management/server"
|
||||||
|
"github.com/wiretrustee/wiretrustee/util"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
"net/http"
|
"net/http"
|
||||||
@@ -34,7 +35,7 @@ type SetupKeyResponse struct {
|
|||||||
type SetupKeyRequest struct {
|
type SetupKeyRequest struct {
|
||||||
Name string
|
Name string
|
||||||
Type server.SetupKeyType
|
Type server.SetupKeyType
|
||||||
ExpiresIn Duration
|
ExpiresIn *util.Duration
|
||||||
Revoked bool
|
Revoked bool
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -102,7 +103,7 @@ func (h *SetupKeys) createKey(accountId string, w http.ResponseWriter, r *http.R
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
setupKey, err := h.accountManager.AddSetupKey(accountId, req.Name, req.Type, req.ExpiresIn.Duration)
|
setupKey, err := h.accountManager.AddSetupKey(accountId, req.Name, req.Type, req.ExpiresIn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errStatus, ok := status.FromError(err)
|
errStatus, ok := status.FromError(err)
|
||||||
if ok && errStatus.Code() == codes.NotFound {
|
if ok && errStatus.Code() == codes.NotFound {
|
||||||
|
|||||||
@@ -48,7 +48,7 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo
|
|||||||
transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
|
transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
|
||||||
}
|
}
|
||||||
|
|
||||||
sigCtx, cancel := context.WithTimeout(ctx, 3*time.Second)
|
sigCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
conn, err := grpc.DialContext(
|
conn, err := grpc.DialContext(
|
||||||
sigCtx,
|
sigCtx,
|
||||||
@@ -56,8 +56,8 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo
|
|||||||
transportOption,
|
transportOption,
|
||||||
grpc.WithBlock(),
|
grpc.WithBlock(),
|
||||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||||
Time: 3 * time.Second,
|
Time: 15 * time.Second,
|
||||||
Timeout: 2 * time.Second,
|
Timeout: 10 * time.Second,
|
||||||
}))
|
}))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -81,8 +81,8 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
|
|||||||
InitialInterval: 800 * time.Millisecond,
|
InitialInterval: 800 * time.Millisecond,
|
||||||
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||||
Multiplier: backoff.DefaultMultiplier,
|
Multiplier: backoff.DefaultMultiplier,
|
||||||
MaxInterval: 30 * time.Second,
|
MaxInterval: 10 * time.Second,
|
||||||
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
|
MaxElapsedTime: 30 * time.Minute, //stop after an 30 min of trying, the error will be propagated to the general retry of the client
|
||||||
Stop: backoff.Stop,
|
Stop: backoff.Stop,
|
||||||
Clock: backoff.SystemClock,
|
Clock: backoff.SystemClock,
|
||||||
}, ctx)
|
}, ctx)
|
||||||
@@ -101,14 +101,19 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) {
|
|||||||
|
|
||||||
operation := func() error {
|
operation := func() error {
|
||||||
|
|
||||||
err := c.connect(c.key.PublicKey().String(), msgHandler)
|
stream, err := c.connect(c.key.PublicKey().String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
|
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
|
||||||
c.connWg.Add(1)
|
c.connWg.Add(1)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
backOff.Reset()
|
err = c.receive(stream, msgHandler)
|
||||||
|
if err != nil {
|
||||||
|
backOff.Reset()
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -120,7 +125,7 @@ func (c *Client) Receive(msgHandler func(msg *proto.Message) error) {
|
|||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error) error {
|
func (c *Client) connect(key string) (proto.SignalExchange_ConnectStreamClient, error) {
|
||||||
c.stream = nil
|
c.stream = nil
|
||||||
|
|
||||||
// add key fingerprint to the request header to be identified on the server side
|
// add key fingerprint to the request header to be identified on the server side
|
||||||
@@ -131,23 +136,23 @@ func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error)
|
|||||||
|
|
||||||
c.stream = stream
|
c.stream = stream
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
// blocks
|
// blocks
|
||||||
header, err := c.stream.Header()
|
header, err := c.stream.Header()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return nil, err
|
||||||
}
|
}
|
||||||
registered := header.Get(proto.HeaderRegistered)
|
registered := header.Get(proto.HeaderRegistered)
|
||||||
if len(registered) == 0 {
|
if len(registered) == 0 {
|
||||||
return fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
|
return nil, fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
|
||||||
}
|
}
|
||||||
//connection established we are good to use the stream
|
//connection established we are good to use the stream
|
||||||
c.connWg.Done()
|
c.connWg.Done()
|
||||||
|
|
||||||
log.Infof("connected to the Signal Exchange Stream")
|
log.Infof("connected to the Signal Exchange Stream")
|
||||||
|
|
||||||
return c.receive(stream, msgHandler)
|
return stream, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// WaitConnected waits until the client is connected to the message stream
|
// WaitConnected waits until the client is connected to the message stream
|
||||||
|
|||||||
Reference in New Issue
Block a user