Compare commits

...

13 Commits

Author SHA1 Message Date
Maycon Santos
7bf9793f85 Support environment vars (#155)
* updage flag values from environment variables

* add log and removing unused constants

* removing unused code

* Docker build client

* fix indentation

* Documentation with docker command

* use docker volume
2021-11-15 09:11:50 +01:00
Maycon Santos
fcbf980588 Stop service before uninstall (#158) 2021-11-14 21:30:18 +01:00
Mikhail Bragin
d08e5efbce fix: too many open files caused by agent not being closed (#154)
* fix: too many open files caused by agent not being closed after unsuccessful attempts to start a peer connection (happens when no network available)

* fix: minor refactor to consider signal status
2021-11-14 19:41:17 +01:00
Maycon Santos
95ef8547f3 Signal management arm builds (#152)
* Add arm builds for Signal and Management services

* adding arm's binary version
2021-11-07 13:11:03 +01:00
Mikhail Bragin
ed1e4dfc51 refactor signal client sync func (#147)
* refactor: move goroutine that runs Signal Client Receive to the engine for better control

* chore: fix comments typo

* test: fix golint

* chore: comments update

* chore: consider connection state=READY in signal and management clients

* chore: fix typos

* test: fix signal ping-pong test

* chore: add wait condition to signal client

* refactor: add stream status to the Signal client

* refactor: defer mutex unlock
2021-11-06 15:00:13 +01:00
braginini
4d34fb4e64 chore: decrease backoff maxinterval to avoid long connection waiting times on the client app 2021-11-02 14:51:29 +01:00
Maycon Santos
1fb8b74cd2 set IF arm6 and empty attribute for package (#146)
There is a behavior or bug in goreleaser where it appends the file name in the target URL and that was causing issues and misconfigured properties
2021-11-01 20:33:26 +01:00
Mikhail Bragin
d040cfed7e fix: client app retry logic (#144)
* fix: retry logic
2021-11-01 09:34:06 +01:00
Maycon Santos
2c729fe5cc remove architecture info from deb (#145) 2021-11-01 09:33:22 +01:00
braginini
e9066b4651 chore: increase signal and management gRPC clients timeouts 2021-10-31 12:14:00 +01:00
Mikhail Bragin
673e807528 chore: set default key expiration if not provided by frontednd (#142) 2021-10-31 12:06:44 +01:00
Mikhail Bragin
892080bc38 docs: update key features 2021-10-27 13:56:55 +02:00
braginini
2d39f6ccae fix: remove ICE port limits 2021-10-27 10:49:03 +02:00
18 changed files with 559 additions and 285 deletions

View File

@@ -37,6 +37,7 @@ builds:
goarch: goarch:
- amd64 - amd64
- arm64 - arm64
- arm
ldflags: ldflags:
- -s -w -X main.version={{.Version}} -X main.commit={{.Commit}} -X main.date={{.CommitDate}} -X main.builtBy=goreleaser - -s -w -X main.version={{.Version}} -X main.commit={{.Commit}} -X main.date={{.CommitDate}} -X main.builtBy=goreleaser
mod_timestamp: '{{ .CommitTimestamp }}' mod_timestamp: '{{ .CommitTimestamp }}'
@@ -50,6 +51,7 @@ builds:
goarch: goarch:
- amd64 - amd64
- arm64 - arm64
- arm
ldflags: ldflags:
- -s -w -X main.version={{.Version}} -X main.commit={{.Commit}} -X main.date={{.CommitDate}} -X main.builtBy=goreleaser - -s -w -X main.version={{.Version}} -X main.commit={{.Commit}} -X main.date={{.CommitDate}} -X main.builtBy=goreleaser
mod_timestamp: '{{ .CommitTimestamp }}' mod_timestamp: '{{ .CommitTimestamp }}'
@@ -83,6 +85,52 @@ nfpms:
postinstall: "release_files/post_install.sh" postinstall: "release_files/post_install.sh"
preremove: "release_files/pre_remove.sh" preremove: "release_files/pre_remove.sh"
dockers: dockers:
- image_templates:
- wiretrustee/wiretrustee:{{ .Version }}-amd64
ids:
- wiretrustee
goarch: amd64
use: buildx
dockerfile: client/Dockerfile
build_flag_templates:
- "--platform=linux/amd64"
- "--label=org.opencontainers.image.created={{.Date}}"
- "--label=org.opencontainers.image.title={{.ProjectName}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=maintainer=wiretrustee@wiretrustee.com"
- image_templates:
- wiretrustee/wiretrustee:{{ .Version }}-arm64v8
ids:
- wiretrustee
goarch: arm64
use: buildx
dockerfile: client/Dockerfile
build_flag_templates:
- "--platform=linux/arm64"
- "--label=org.opencontainers.image.created={{.Date}}"
- "--label=org.opencontainers.image.title={{.ProjectName}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=maintainer=wiretrustee@wiretrustee.com"
- image_templates:
- wiretrustee/wiretrustee:{{ .Version }}-arm
ids:
- wiretrustee
goarch: arm
goarm: 6
use: buildx
dockerfile: client/Dockerfile
build_flag_templates:
- "--platform=linux/arm"
- "--label=org.opencontainers.image.created={{.Date}}"
- "--label=org.opencontainers.image.title={{.ProjectName}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=maintainer=wiretrustee@wiretrustee.com"
- image_templates: - image_templates:
- wiretrustee/signal:{{ .Version }}-amd64 - wiretrustee/signal:{{ .Version }}-amd64
ids: ids:
@@ -113,6 +161,22 @@ dockers:
- "--label=org.opencontainers.image.revision={{.FullCommit}}" - "--label=org.opencontainers.image.revision={{.FullCommit}}"
- "--label=org.opencontainers.image.version={{.Version}}" - "--label=org.opencontainers.image.version={{.Version}}"
- "--label=maintainer=wiretrustee@wiretrustee.com" - "--label=maintainer=wiretrustee@wiretrustee.com"
- image_templates:
- wiretrustee/signal:{{ .Version }}-arm
ids:
- wiretrustee-signal
goarch: arm
goarm: 6
use: buildx
dockerfile: signal/Dockerfile
build_flag_templates:
- "--platform=linux/arm"
- "--label=org.opencontainers.image.created={{.Date}}"
- "--label=org.opencontainers.image.title={{.ProjectName}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=maintainer=wiretrustee@wiretrustee.com"
- image_templates: - image_templates:
- wiretrustee/management:{{ .Version }}-amd64 - wiretrustee/management:{{ .Version }}-amd64
ids: ids:
@@ -143,6 +207,22 @@ dockers:
- "--label=org.opencontainers.image.revision={{.FullCommit}}" - "--label=org.opencontainers.image.revision={{.FullCommit}}"
- "--label=org.opencontainers.image.version={{.Version}}" - "--label=org.opencontainers.image.version={{.Version}}"
- "--label=maintainer=wiretrustee@wiretrustee.com" - "--label=maintainer=wiretrustee@wiretrustee.com"
- image_templates:
- wiretrustee/management:{{ .Version }}-arm
ids:
- wiretrustee-mgmt
goarch: arm
goarm: 6
use: buildx
dockerfile: management/Dockerfile
build_flag_templates:
- "--platform=linux/arm"
- "--label=org.opencontainers.image.created={{.Date}}"
- "--label=org.opencontainers.image.title={{.ProjectName}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=maintainer=wiretrustee@wiretrustee.com"
- image_templates: - image_templates:
- wiretrustee/management:{{ .Version }}-debug-amd64 - wiretrustee/management:{{ .Version }}-debug-amd64
ids: ids:
@@ -174,30 +254,63 @@ dockers:
- "--label=org.opencontainers.image.version={{.Version}}" - "--label=org.opencontainers.image.version={{.Version}}"
- "--label=maintainer=wiretrustee@wiretrustee.com" - "--label=maintainer=wiretrustee@wiretrustee.com"
- image_templates:
- wiretrustee/management:{{ .Version }}-debug-arm
ids:
- wiretrustee-mgmt
goarch: arm
goarm: 6
use: buildx
dockerfile: management/Dockerfile.debug
build_flag_templates:
- "--platform=linux/arm"
- "--label=org.opencontainers.image.created={{.Date}}"
- "--label=org.opencontainers.image.title={{.ProjectName}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=maintainer=wiretrustee@wiretrustee.com"
docker_manifests: docker_manifests:
- name_template: wiretrustee/wiretrustee:{{ .Version }}
image_templates:
- wiretrustee/wiretrustee:{{ .Version }}-arm64v8
- wiretrustee/wiretrustee:{{ .Version }}-arm
- wiretrustee/wiretrustee:{{ .Version }}-amd64
- name_template: wiretrustee/wiretrustee:latest
image_templates:
- wiretrustee/wiretrustee:{{ .Version }}-arm64v8
- wiretrustee/wiretrustee:{{ .Version }}-arm
- wiretrustee/wiretrustee:{{ .Version }}-amd64
- name_template: wiretrustee/signal:{{ .Version }} - name_template: wiretrustee/signal:{{ .Version }}
image_templates: image_templates:
- wiretrustee/signal:{{ .Version }}-arm64v8 - wiretrustee/signal:{{ .Version }}-arm64v8
- wiretrustee/signal:{{ .Version }}-arm
- wiretrustee/signal:{{ .Version }}-amd64 - wiretrustee/signal:{{ .Version }}-amd64
- name_template: wiretrustee/signal:latest - name_template: wiretrustee/signal:latest
image_templates: image_templates:
- wiretrustee/signal:{{ .Version }}-arm64v8 - wiretrustee/signal:{{ .Version }}-arm64v8
- wiretrustee/signal:{{ .Version }}-arm
- wiretrustee/signal:{{ .Version }}-amd64 - wiretrustee/signal:{{ .Version }}-amd64
- name_template: wiretrustee/management:{{ .Version }} - name_template: wiretrustee/management:{{ .Version }}
image_templates: image_templates:
- wiretrustee/management:{{ .Version }}-arm64v8 - wiretrustee/management:{{ .Version }}-arm64v8
- wiretrustee/management:{{ .Version }}-arm
- wiretrustee/management:{{ .Version }}-amd64 - wiretrustee/management:{{ .Version }}-amd64
- name_template: wiretrustee/management:latest - name_template: wiretrustee/management:latest
image_templates: image_templates:
- wiretrustee/management:{{ .Version }}-arm64v8 - wiretrustee/management:{{ .Version }}-arm64v8
- wiretrustee/management:{{ .Version }}-arm
- wiretrustee/management:{{ .Version }}-amd64 - wiretrustee/management:{{ .Version }}-amd64
- name_template: wiretrustee/management:debug-latest - name_template: wiretrustee/management:debug-latest
image_templates: image_templates:
- wiretrustee/management:{{ .Version }}-debug-arm64v8 - wiretrustee/management:{{ .Version }}-debug-arm64v8
- wiretrustee/management:{{ .Version }}-debug-arm
- wiretrustee/management:{{ .Version }}-debug-amd64 - wiretrustee/management:{{ .Version }}-debug-amd64
brews: brews:
@@ -221,7 +334,7 @@ uploads:
ids: ids:
- deb - deb
mode: archive mode: archive
target: https://pkgs.wiretrustee.com/debian/pool/{{ .ArtifactName }};deb.distribution=stable;deb.component=main;deb.architecture={{ .Arch }} target: https://pkgs.wiretrustee.com/debian/pool/{{ .ArtifactName }};deb.distribution=stable;deb.component=main;deb.architecture={{ if .Arm }}armhf{{ else }}{{ .Arch }}{{ end }};deb.package=
username: dev@wiretrustee.com username: dev@wiretrustee.com
method: PUT method: PUT
- name: yum - name: yum

View File

@@ -31,6 +31,16 @@ It requires zero configuration effort leaving behind the hassle of opening ports
There is no centralized VPN server with Wiretrustee - your computers, devices, machines, and servers connect to each other directly over a fast encrypted tunnel. There is no centralized VPN server with Wiretrustee - your computers, devices, machines, and servers connect to each other directly over a fast encrypted tunnel.
**Wiretrustee automates Wireguard-based networks, offering a management layer with:**
* Centralized Peer IP management with a neat UI dashboard.
* Automatic Peer discovery and configuration.
* UDP hole punching to establish peer-to-peer connections behind NAT, firewall, and without a public static IP.
* Connection relay fallback in case a peer-to-peer connection is not possible.
* Multitenancy (coming soon).
* Client application SSO with MFA (coming soon).
* Access Controls (coming soon).
* Activity Monitoring (coming soon).
### Secure peer-to-peer VPN in minutes ### Secure peer-to-peer VPN in minutes
<p float="left" align="middle"> <p float="left" align="middle">
<img src="docs/media/peerA.gif" width="400"/> <img src="docs/media/peerA.gif" width="400"/>
@@ -45,22 +55,6 @@ Hosted demo version:
[UI Dashboard Repo](https://github.com/wiretrustee/wiretrustee-dashboard) [UI Dashboard Repo](https://github.com/wiretrustee/wiretrustee-dashboard)
### Why using Wiretrustee?
* Connect multiple devices to each other via a secure peer-to-peer Wireguard VPN tunnel. At home, the office, or anywhere else.
* No need to open ports and expose public IPs on the device, routers etc.
* Uses Kernel Wireguard module if available.
* Automatic network change detection. When a new peer joins the network others are notified and keys are exchanged automatically.
* Automatically reconnects in case of network failures or switches.
* Automatic NAT traversal.
* Relay server fallback in case of an unsuccessful peer-to-peer connection.
* Private key never leaves your device.
* Automatic IP address management.
* Intuitive UI Dashboard.
* Works on ARM devices (e.g. Raspberry Pi).
* Open-source (including Management Service)
### A bit on Wiretrustee internals ### A bit on Wiretrustee internals
* Wiretrustee features a Management Service that offers peer IP management and network updates distribution (e.g. when new peer joins the network). * Wiretrustee features a Management Service that offers peer IP management and network updates distribution (e.g. when new peer joins the network).
* Wiretrustee uses WebRTC ICE implemented in [pion/ice library](https://github.com/pion/ice) to discover connection candidates when establishing a peer-to-peer connection between devices. * Wiretrustee uses WebRTC ICE implemented in [pion/ice library](https://github.com/pion/ice) to discover connection candidates when establishing a peer-to-peer connection between devices.
@@ -151,6 +145,11 @@ For **Windows** systems, start powershell as administrator and:
```shell ```shell
wiretrustee up --setup-key <SETUP KEY> wiretrustee up --setup-key <SETUP KEY>
``` ```
For **Docker**, you can run with the following command:
```shell
docker run --network host --privileged --rm -d -e WT_SETUP_KEY=<SETUP KEY> -v wiretrustee-client:/etc/wiretrustee wiretrustee/wiretrustee:<TAG>
```
> TAG > 0.3.0 version
Alternatively, if you are hosting your own Management Service provide `--management-url` property pointing to your Management Service: Alternatively, if you are hosting your own Management Service provide `--management-url` property pointing to your Management Service:
```shell ```shell

4
client/Dockerfile Normal file
View File

@@ -0,0 +1,4 @@
FROM gcr.io/distroless/base:debug
ENV WT_LOG_FILE=console
ENTRYPOINT [ "/go/bin/wiretrustee","up"]
COPY wiretrustee /go/bin/wiretrustee

View File

@@ -18,12 +18,12 @@ import (
) )
var ( var (
setupKey string
loginCmd = &cobra.Command{ loginCmd = &cobra.Command{
Use: "login", Use: "login",
Short: "login to the Wiretrustee Management Service (first run)", Short: "login to the Wiretrustee Management Service (first run)",
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
SetFlagsFromEnvVars()
err := util.InitLog(logLevel, logFile) err := util.InitLog(logLevel, logFile)
if err != nil { if err != nil {
log.Errorf("failed initializing log %v", err) log.Errorf("failed initializing log %v", err)
@@ -151,6 +151,3 @@ func promptPeerSetupKey() (string, error) {
return "", s.Err() return "", s.Err()
} }
//func init() {
//}

View File

@@ -4,19 +4,15 @@ import (
"fmt" "fmt"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
"github.com/spf13/pflag"
"github.com/wiretrustee/wiretrustee/client/internal" "github.com/wiretrustee/wiretrustee/client/internal"
"os" "os"
"os/signal" "os/signal"
"runtime" "runtime"
"strings"
"syscall" "syscall"
) )
const (
// ExitSetupFailed defines exit code
ExitSetupFailed = 1
DefaultConfigPath = ""
)
var ( var (
configPath string configPath string
defaultConfigPath string defaultConfigPath string
@@ -24,6 +20,7 @@ var (
defaultLogFile string defaultLogFile string
logFile string logFile string
managementURL string managementURL string
setupKey string
rootCmd = &cobra.Command{ rootCmd = &cobra.Command{
Use: "wiretrustee", Use: "wiretrustee",
Short: "", Short: "",
@@ -75,3 +72,28 @@ func SetupCloseHandler() {
} }
}() }()
} }
// SetFlagsFromEnvVars reads and updates flag values from environment variables with prefix WT_
func SetFlagsFromEnvVars() {
flags := rootCmd.PersistentFlags()
flags.VisitAll(func(f *pflag.Flag) {
envVar := FlagNameToEnvVar(f.Name)
if value, present := os.LookupEnv(envVar); present {
err := flags.Set(f.Name, value)
if err != nil {
log.Infof("unable to configure flag %s using variable %s, err: %v", f.Name, envVar, err)
}
}
})
}
// FlagNameToEnvVar converts flag name to environment var name adding a prefix,
// replacing dashes and making all uppercase (e.g. setup-keys is converted to WT_SETUP_KEYS)
func FlagNameToEnvVar(f string) string {
prefix := "WT_"
parsed := strings.ReplaceAll(f, "-", "_")
upper := strings.ToUpper(parsed)
return prefix + upper
}

View File

@@ -34,6 +34,3 @@ var (
Short: "manages wiretrustee service", Short: "manages wiretrustee service",
} }
) )
func init() {
}

View File

@@ -1,7 +1,6 @@
package cmd package cmd
import ( import (
"github.com/cenkalti/backoff/v4"
"github.com/kardianos/service" "github.com/kardianos/service"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@@ -9,40 +8,21 @@ import (
"time" "time"
) )
func (p *program) Start(s service.Service) error { func (p *program) Start(service.Service) error {
var backOff = &backoff.ExponentialBackOff{
InitialInterval: time.Second,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 30 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
// Start should not block. Do the actual work async. // Start should not block. Do the actual work async.
log.Info("starting service") //nolint log.Info("starting service") //nolint
go func() { go func() {
operation := func() error { err := runClient()
err := runClient()
if err != nil {
log.Warnf("retrying Wiretrustee client app due to error: %v", err)
return err
}
return nil
}
err := backoff.Retry(operation, backOff)
if err != nil { if err != nil {
log.Errorf("exiting client retry loop due to unrecoverable error: %s", err) log.Errorf("stopped Wiretrustee client app due to error: %v", err)
return return
} }
}() }()
return nil return nil
} }
func (p *program) Stop(s service.Service) error { func (p *program) Stop(service.Service) error {
go func() { go func() {
stopCh <- 1 stopCh <- 1
}() }()
@@ -61,6 +41,7 @@ var (
Use: "run", Use: "run",
Short: "runs wiretrustee as service", Short: "runs wiretrustee as service",
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
SetFlagsFromEnvVars()
err := util.InitLog(logLevel, logFile) err := util.InitLog(logLevel, logFile)
if err != nil { if err != nil {
@@ -95,6 +76,8 @@ var (
Use: "start", Use: "start",
Short: "starts wiretrustee service", Short: "starts wiretrustee service",
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
SetFlagsFromEnvVars()
err := util.InitLog(logLevel, logFile) err := util.InitLog(logLevel, logFile)
if err != nil { if err != nil {
log.Errorf("failed initializing log %v", err) log.Errorf("failed initializing log %v", err)
@@ -121,6 +104,8 @@ var (
Use: "stop", Use: "stop",
Short: "stops wiretrustee service", Short: "stops wiretrustee service",
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
SetFlagsFromEnvVars()
err := util.InitLog(logLevel, logFile) err := util.InitLog(logLevel, logFile)
if err != nil { if err != nil {
log.Errorf("failed initializing log %v", err) log.Errorf("failed initializing log %v", err)
@@ -145,6 +130,8 @@ var (
Use: "restart", Use: "restart",
Short: "restarts wiretrustee service", Short: "restarts wiretrustee service",
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
SetFlagsFromEnvVars()
err := util.InitLog(logLevel, logFile) err := util.InitLog(logLevel, logFile)
if err != nil { if err != nil {
log.Errorf("failed initializing log %v", err) log.Errorf("failed initializing log %v", err)
@@ -163,6 +150,3 @@ var (
}, },
} }
) )
func init() {
}

View File

@@ -10,6 +10,7 @@ var (
Use: "install", Use: "install",
Short: "installs wiretrustee service", Short: "installs wiretrustee service",
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
SetFlagsFromEnvVars()
svcConfig := newSVCConfig() svcConfig := newSVCConfig()
@@ -49,6 +50,7 @@ var (
Use: "uninstall", Use: "uninstall",
Short: "uninstalls wiretrustee service from system", Short: "uninstalls wiretrustee service from system",
Run: func(cmd *cobra.Command, args []string) { Run: func(cmd *cobra.Command, args []string) {
SetFlagsFromEnvVars()
s, err := newSVC(&program{}, newSVCConfig()) s, err := newSVC(&program{}, newSVCConfig())
if err != nil { if err != nil {
@@ -65,6 +67,3 @@ var (
}, },
} }
) )
func init() {
}

View File

@@ -2,6 +2,7 @@ package cmd
import ( import (
"context" "context"
"github.com/cenkalti/backoff/v4"
"github.com/kardianos/service" "github.com/kardianos/service"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/spf13/cobra" "github.com/spf13/cobra"
@@ -12,6 +13,7 @@ import (
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"time"
) )
var ( var (
@@ -19,7 +21,7 @@ var (
Use: "up", Use: "up",
Short: "install, login and start wiretrustee client", Short: "install, login and start wiretrustee client",
RunE: func(cmd *cobra.Command, args []string) error { RunE: func(cmd *cobra.Command, args []string) error {
SetFlagsFromEnvVars()
err := loginCmd.RunE(cmd, args) err := loginCmd.RunE(cmd, args)
if err != nil { if err != nil {
return err return err
@@ -117,86 +119,107 @@ func connectToManagement(ctx context.Context, managementAddr string, ourPrivateK
} }
func runClient() error { func runClient() error {
config, err := internal.ReadConfig(managementURL, configPath) var backOff = &backoff.ExponentialBackOff{
InitialInterval: time.Second,
RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: 10 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop the client after 3 days trying (must be a huge problem, e.g permission denied)
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}
operation := func() error {
config, err := internal.ReadConfig(managementURL, configPath)
if err != nil {
log.Errorf("failed reading config %s %v", configPath, err)
return err
}
//validate our peer's Wireguard PRIVATE key
myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey)
if err != nil {
log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error())
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mgmTlsEnabled := false
if config.ManagementURL.Scheme == "https" {
mgmTlsEnabled = true
}
// connect (just a connection, no stream yet) and login to Management Service to get an initial global Wiretrustee config
mgmClient, loginResp, err := connectToManagement(ctx, config.ManagementURL.Host, myPrivateKey, mgmTlsEnabled)
if err != nil {
log.Warn(err)
return err
}
// with the global Wiretrustee config in hand connect (just a connection, no stream yet) Signal
signalClient, err := connectToSignal(ctx, loginResp.GetWiretrusteeConfig(), myPrivateKey)
if err != nil {
log.Error(err)
return err
}
peerConfig := loginResp.GetPeerConfig()
engineConfig, err := createEngineConfig(myPrivateKey, config, peerConfig)
if err != nil {
log.Error(err)
return err
}
// create start the Wiretrustee Engine that will connect to the Signal and Management streams and manage connections to remote peers.
engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel, ctx)
err = engine.Start()
if err != nil {
log.Errorf("error while starting Wiretrustee Connection Engine: %s", err)
return err
}
log.Print("Wiretrustee engine started, my IP is: ", peerConfig.Address)
select {
case <-stopCh:
case <-ctx.Done():
}
backOff.Reset()
err = mgmClient.Close()
if err != nil {
log.Errorf("failed closing Management Service client %v", err)
return err
}
err = signalClient.Close()
if err != nil {
log.Errorf("failed closing Signal Service client %v", err)
return err
}
err = engine.Stop()
if err != nil {
log.Errorf("failed stopping engine %v", err)
return err
}
go func() {
cleanupCh <- struct{}{}
}()
log.Info("stopped Wiretrustee client")
return ctx.Err()
}
err := backoff.Retry(operation, backOff)
if err != nil { if err != nil {
log.Errorf("failed reading config %s %v", configPath, err) log.Errorf("exiting client retry loop due to unrecoverable error: %s", err)
return err return err
} }
return nil
//validate our peer's Wireguard PRIVATE key
myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey)
if err != nil {
log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error())
return err
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mgmTlsEnabled := false
if config.ManagementURL.Scheme == "https" {
mgmTlsEnabled = true
}
// connect (just a connection, no stream yet) and login to Management Service to get an initial global Wiretrustee config
mgmClient, loginResp, err := connectToManagement(ctx, config.ManagementURL.Host, myPrivateKey, mgmTlsEnabled)
if err != nil {
log.Warn(err)
return err
}
// with the global Wiretrustee config in hand connect (just a connection, no stream yet) Signal
signalClient, err := connectToSignal(ctx, loginResp.GetWiretrusteeConfig(), myPrivateKey)
if err != nil {
log.Error(err)
return err
}
peerConfig := loginResp.GetPeerConfig()
engineConfig, err := createEngineConfig(myPrivateKey, config, peerConfig)
if err != nil {
log.Error(err)
return err
}
// create start the Wiretrustee Engine that will connect to the Signal and Management streams and manage connections to remote peers.
engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel, ctx)
err = engine.Start()
if err != nil {
log.Errorf("error while starting Wiretrustee Connection Engine: %s", err)
return err
}
log.Print("Wiretrustee engine started, my IP is: ", peerConfig.Address)
select {
case <-stopCh:
case <-ctx.Done():
}
err = mgmClient.Close()
if err != nil {
log.Errorf("failed closing Management Service client %v", err)
return err
}
err = signalClient.Close()
if err != nil {
log.Errorf("failed closing Signal Service client %v", err)
return err
}
err = engine.Stop()
if err != nil {
log.Errorf("failed stopping engine %v", err)
return err
}
go func() {
cleanupCh <- struct{}{}
}()
log.Info("stopped Wiretrustee client")
return ctx.Err()
} }

View File

@@ -106,6 +106,7 @@ SectionEnd
Section Uninstall Section Uninstall
${INSTALL_TYPE} ${INSTALL_TYPE}
Exec '"$INSTDIR\${MAIN_APP_EXE}" service stop'
Exec '"$INSTDIR\${MAIN_APP_EXE}" service uninstall' Exec '"$INSTDIR\${MAIN_APP_EXE}" service uninstall'
# wait the service uninstall take unblock the executable # wait the service uninstall take unblock the executable
Sleep 3000 Sleep 3000

View File

@@ -128,8 +128,6 @@ func (conn *Connection) Open(timeout time.Duration) error {
a, err := ice.NewAgent(&ice.AgentConfig{ a, err := ice.NewAgent(&ice.AgentConfig{
// MulticastDNSMode: ice.MulticastDNSModeQueryAndGather, // MulticastDNSMode: ice.MulticastDNSModeQueryAndGather,
NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4}, NetworkTypes: []ice.NetworkType{ice.NetworkTypeUDP4},
PortMin: 57830,
PortMax: 57830,
Urls: conn.Config.StunTurnURLS, Urls: conn.Config.StunTurnURLS,
CandidateTypes: []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay}, CandidateTypes: []ice.CandidateType{ice.CandidateTypeHost, ice.CandidateTypeServerReflexive, ice.CandidateTypeRelay},
InterfaceFilter: func(s string) bool { InterfaceFilter: func(s string) bool {
@@ -140,12 +138,18 @@ func (conn *Connection) Open(timeout time.Duration) error {
return !ok return !ok
}, },
}) })
conn.agent = a
if err != nil { if err != nil {
return err return err
} }
conn.agent = a
defer func() {
err := conn.agent.Close()
if err != nil {
return
}
}()
err = conn.listenOnLocalCandidates() err = conn.listenOnLocalCandidates()
if err != nil { if err != nil {
return err return err

View File

@@ -4,7 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"github.com/cenkalti/backoff/v4" "github.com/cenkalti/backoff/v4"
ice "github.com/pion/ice/v2" "github.com/pion/ice/v2"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/iface" "github.com/wiretrustee/wiretrustee/iface"
mgm "github.com/wiretrustee/wiretrustee/management/client" mgm "github.com/wiretrustee/wiretrustee/management/client"
@@ -142,12 +142,17 @@ func (e *Engine) initializePeer(peer Peer) {
RandomizationFactor: backoff.DefaultRandomizationFactor, RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier, Multiplier: backoff.DefaultMultiplier,
MaxInterval: 5 * time.Second, MaxInterval: 5 * time.Second,
MaxElapsedTime: time.Duration(0), //never stop MaxElapsedTime: 0, //never stop
Stop: backoff.Stop, Stop: backoff.Stop,
Clock: backoff.SystemClock, Clock: backoff.SystemClock,
}, e.ctx) }, e.ctx)
operation := func() error { operation := func() error {
if e.signal.GetStatus() != signal.StreamConnected {
return fmt.Errorf("not opening connection to peer because Signal is unavailable")
}
_, err := e.openPeerConnection(e.wgPort, e.config.WgPrivateKey, peer) _, err := e.openPeerConnection(e.wgPort, e.config.WgPrivateKey, peer)
e.peerMux.Lock() e.peerMux.Lock()
defer e.peerMux.Unlock() defer e.peerMux.Unlock()
@@ -157,7 +162,6 @@ func (e *Engine) initializePeer(peer Peer) {
} }
if err != nil { if err != nil {
log.Warnln(err)
log.Debugf("retrying connection because of error: %s", err.Error()) log.Debugf("retrying connection because of error: %s", err.Error())
return err return err
} }
@@ -332,6 +336,8 @@ func (e *Engine) receiveManagementEvents() {
return nil return nil
}) })
if err != nil { if err != nil {
// happens if management is unavailable for a long time.
// We want to cancel the operation of the whole client
e.cancel() e.cancel()
return return
} }
@@ -414,68 +420,77 @@ func (e *Engine) updatePeers(remotePeers []*mgmProto.RemotePeerConfig) error {
// receiveSignalEvents connects to the Signal Service event stream to negotiate connection with remote peers // receiveSignalEvents connects to the Signal Service event stream to negotiate connection with remote peers
func (e *Engine) receiveSignalEvents() { func (e *Engine) receiveSignalEvents() {
// connect to a stream of messages coming from the signal server
e.signal.Receive(func(msg *sProto.Message) error {
e.syncMsgMux.Lock() go func() {
defer e.syncMsgMux.Unlock() // connect to a stream of messages coming from the signal server
err := e.signal.Receive(func(msg *sProto.Message) error {
conn := e.conns[msg.Key] e.syncMsgMux.Lock()
if conn == nil { defer e.syncMsgMux.Unlock()
return fmt.Errorf("wrongly addressed message %s", msg.Key)
}
if conn.Config.RemoteWgKey.String() != msg.Key { conn := e.conns[msg.Key]
return fmt.Errorf("unknown peer %s", msg.Key) if conn == nil {
} return fmt.Errorf("wrongly addressed message %s", msg.Key)
switch msg.GetBody().Type {
case sProto.Body_OFFER:
remoteCred, err := signal.UnMarshalCredential(msg)
if err != nil {
return err
} }
err = conn.OnOffer(IceCredentials{
uFrag: remoteCred.UFrag,
pwd: remoteCred.Pwd,
})
if err != nil { if conn.Config.RemoteWgKey.String() != msg.Key {
return err return fmt.Errorf("unknown peer %s", msg.Key)
}
switch msg.GetBody().Type {
case sProto.Body_OFFER:
remoteCred, err := signal.UnMarshalCredential(msg)
if err != nil {
return err
}
err = conn.OnOffer(IceCredentials{
uFrag: remoteCred.UFrag,
pwd: remoteCred.Pwd,
})
if err != nil {
return err
}
return nil
case sProto.Body_ANSWER:
remoteCred, err := signal.UnMarshalCredential(msg)
if err != nil {
return err
}
err = conn.OnAnswer(IceCredentials{
uFrag: remoteCred.UFrag,
pwd: remoteCred.Pwd,
})
if err != nil {
return err
}
case sProto.Body_CANDIDATE:
candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload)
if err != nil {
log.Errorf("failed on parsing remote candidate %s -> %s", candidate, err)
return err
}
err = conn.OnRemoteCandidate(candidate)
if err != nil {
log.Errorf("error handling CANDIATE from %s", msg.Key)
return err
}
} }
return nil return nil
case sProto.Body_ANSWER: })
remoteCred, err := signal.UnMarshalCredential(msg) if err != nil {
if err != nil { // happens if signal is unavailable for a long time.
return err // We want to cancel the operation of the whole client
} e.cancel()
err = conn.OnAnswer(IceCredentials{ return
uFrag: remoteCred.UFrag,
pwd: remoteCred.Pwd,
})
if err != nil {
return err
}
case sProto.Body_CANDIDATE:
candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload)
if err != nil {
log.Errorf("failed on parsing remote candidate %s -> %s", candidate, err)
return err
}
err = conn.OnRemoteCandidate(candidate)
if err != nil {
log.Errorf("error handling CANDIATE from %s", msg.Key)
return err
}
} }
}()
return nil e.signal.WaitStreamConnected()
})
e.signal.WaitConnected()
} }

1
go.mod
View File

@@ -15,6 +15,7 @@ require (
github.com/rs/cors v1.8.0 github.com/rs/cors v1.8.0
github.com/sirupsen/logrus v1.7.0 github.com/sirupsen/logrus v1.7.0
github.com/spf13/cobra v1.1.3 github.com/spf13/cobra v1.1.3
github.com/spf13/pflag v1.0.5
github.com/vishvananda/netlink v1.1.0 github.com/vishvananda/netlink v1.1.0
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c

View File

@@ -3,6 +3,7 @@ package client
import ( import (
"context" "context"
"crypto/tls" "crypto/tls"
"fmt"
"github.com/cenkalti/backoff/v4" "github.com/cenkalti/backoff/v4"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/client/system" "github.com/wiretrustee/wiretrustee/client/system"
@@ -10,6 +11,7 @@ import (
"github.com/wiretrustee/wiretrustee/management/proto" "github.com/wiretrustee/wiretrustee/management/proto"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
"io" "io"
@@ -32,7 +34,7 @@ func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsE
transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})) transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
} }
mgmCtx, cancel := context.WithTimeout(ctx, 3*time.Second) mgmCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel() defer cancel()
conn, err := grpc.DialContext( conn, err := grpc.DialContext(
mgmCtx, mgmCtx,
@@ -40,8 +42,8 @@ func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsE
transportOption, transportOption,
grpc.WithBlock(), grpc.WithBlock(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{ grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 3 * time.Second, Time: 15 * time.Second,
Timeout: 2 * time.Second, Timeout: 10 * time.Second,
})) }))
if err != nil { if err != nil {
@@ -70,13 +72,19 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
InitialInterval: 800 * time.Millisecond, InitialInterval: 800 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor, RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier, Multiplier: backoff.DefaultMultiplier,
MaxInterval: 30 * time.Second, MaxInterval: 10 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying MaxElapsedTime: 12 * time.Hour, //stop after 12 hours of trying, the error will be propagated to the general retry of the client
Stop: backoff.Stop, Stop: backoff.Stop,
Clock: backoff.SystemClock, Clock: backoff.SystemClock,
}, ctx) }, ctx)
} }
// ready indicates whether the client is okay and ready to be used
// for now it just checks whether gRPC connection to the service is ready
func (c *Client) ready() bool {
return c.conn.GetState() == connectivity.Ready
}
// Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages // Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages
// Blocking request. The result will be sent via msgHandler callback function // Blocking request. The result will be sent via msgHandler callback function
func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error { func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
@@ -85,6 +93,12 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
operation := func() error { operation := func() error {
log.Debugf("management connection state %v", c.conn.GetState())
if !c.ready() {
return fmt.Errorf("no connection to management")
}
// todo we already have it since we did the Login, maybe cache it locally? // todo we already have it since we did the Login, maybe cache it locally?
serverPubKey, err := c.GetServerPublicKey() serverPubKey, err := c.GetServerPublicKey()
if err != nil { if err != nil {
@@ -98,17 +112,15 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
return err return err
} }
log.Infof("connected to the Management Service Stream") log.Infof("connected to the Management Service stream")
// blocking until error // blocking until error
err = c.receiveEvents(stream, *serverPubKey, msgHandler) err = c.receiveEvents(stream, *serverPubKey, msgHandler)
if err != nil { if err != nil {
/*if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.PermissionDenied { backOff.Reset()
//todo handle differently??
}*/
return err return err
} }
backOff.Reset()
return nil return nil
} }
@@ -141,7 +153,7 @@ func (c *Client) receiveEvents(stream proto.ManagementService_SyncClient, server
for { for {
update, err := stream.Recv() update, err := stream.Recv()
if err == io.EOF { if err == io.EOF {
log.Errorf("managment stream was closed: %s", err) log.Errorf("Management stream has been closed by server: %s", err)
return err return err
} }
if err != nil { if err != nil {
@@ -167,6 +179,10 @@ func (c *Client) receiveEvents(stream proto.ManagementService_SyncClient, server
// GetServerPublicKey returns server Wireguard public key (used later for encrypting messages sent to the server) // GetServerPublicKey returns server Wireguard public key (used later for encrypting messages sent to the server)
func (c *Client) GetServerPublicKey() (*wgtypes.Key, error) { func (c *Client) GetServerPublicKey() (*wgtypes.Key, error) {
if !c.ready() {
return nil, fmt.Errorf("no connection to management")
}
mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second) //todo make a general setting mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second) //todo make a general setting
defer cancel() defer cancel()
resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{}) resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{})
@@ -183,6 +199,9 @@ func (c *Client) GetServerPublicKey() (*wgtypes.Key, error) {
} }
func (c *Client) login(serverKey wgtypes.Key, req *proto.LoginRequest) (*proto.LoginResponse, error) { func (c *Client) login(serverKey wgtypes.Key, req *proto.LoginRequest) (*proto.LoginResponse, error) {
if !c.ready() {
return nil, fmt.Errorf("no connection to management")
}
loginReq, err := encryption.EncryptMessage(serverKey, c.key, req) loginReq, err := encryption.EncryptMessage(serverKey, c.key, req)
if err != nil { if err != nil {
log.Errorf("failed to encrypt message: %s", err) log.Errorf("failed to encrypt message: %s", err)

View File

@@ -3,11 +3,11 @@ package server
import ( import (
"github.com/google/uuid" "github.com/google/uuid"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/util"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"net" "net"
"sync" "sync"
"time"
) )
type AccountManager struct { type AccountManager struct {
@@ -35,16 +35,21 @@ func NewManager(store Store, peersUpdateManager *PeersUpdateManager) *AccountMan
} }
//AddSetupKey generates a new setup key with a given name and type, and adds it to the specified account //AddSetupKey generates a new setup key with a given name and type, and adds it to the specified account
func (am *AccountManager) AddSetupKey(accountId string, keyName string, keyType SetupKeyType, expiresIn time.Duration) (*SetupKey, error) { func (am *AccountManager) AddSetupKey(accountId string, keyName string, keyType SetupKeyType, expiresIn *util.Duration) (*SetupKey, error) {
am.mux.Lock() am.mux.Lock()
defer am.mux.Unlock() defer am.mux.Unlock()
keyDuration := DefaultSetupKeyDuration
if expiresIn != nil {
keyDuration = expiresIn.Duration
}
account, err := am.Store.GetAccount(accountId) account, err := am.Store.GetAccount(accountId)
if err != nil { if err != nil {
return nil, status.Errorf(codes.NotFound, "account not found") return nil, status.Errorf(codes.NotFound, "account not found")
} }
setupKey := GenerateSetupKey(keyName, keyType, expiresIn) setupKey := GenerateSetupKey(keyName, keyType, keyDuration)
account.SetupKeys[setupKey.Key] = setupKey account.SetupKeys[setupKey.Key] = setupKey
err = am.Store.SaveAccount(account) err = am.Store.SaveAccount(account)

View File

@@ -5,6 +5,7 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/wiretrustee/wiretrustee/management/server" "github.com/wiretrustee/wiretrustee/management/server"
"github.com/wiretrustee/wiretrustee/util"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"net/http" "net/http"
@@ -34,7 +35,7 @@ type SetupKeyResponse struct {
type SetupKeyRequest struct { type SetupKeyRequest struct {
Name string Name string
Type server.SetupKeyType Type server.SetupKeyType
ExpiresIn Duration ExpiresIn *util.Duration
Revoked bool Revoked bool
} }
@@ -102,7 +103,7 @@ func (h *SetupKeys) createKey(accountId string, w http.ResponseWriter, r *http.R
return return
} }
setupKey, err := h.accountManager.AddSetupKey(accountId, req.Name, req.Type, req.ExpiresIn.Duration) setupKey, err := h.accountManager.AddSetupKey(accountId, req.Name, req.Type, req.ExpiresIn)
if err != nil { if err != nil {
errStatus, ok := status.FromError(err) errStatus, ok := status.FromError(err)
if ok && errStatus.Code() == codes.NotFound { if ok && errStatus.Code() == codes.NotFound {

View File

@@ -11,6 +11,7 @@ import (
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
@@ -23,6 +24,12 @@ import (
// A set of tools to exchange connection details (Wireguard endpoints) with the remote peer. // A set of tools to exchange connection details (Wireguard endpoints) with the remote peer.
// Status is the status of the client
type Status string
const StreamConnected Status = "Connected"
const StreamDisconnected Status = "Disconnected"
// Client Wraps the Signal Exchange Service gRpc client // Client Wraps the Signal Exchange Service gRpc client
type Client struct { type Client struct {
key wgtypes.Key key wgtypes.Key
@@ -30,8 +37,15 @@ type Client struct {
signalConn *grpc.ClientConn signalConn *grpc.ClientConn
ctx context.Context ctx context.Context
stream proto.SignalExchange_ConnectStreamClient stream proto.SignalExchange_ConnectStreamClient
//waiting group to notify once stream is connected // connectedCh used to notify goroutines waiting for the connection to the Signal stream
connWg *sync.WaitGroup //todo use a channel instead?? connectedCh chan struct{}
mux sync.Mutex
// StreamConnected indicates whether this client is StreamConnected to the Signal stream
status Status
}
func (c *Client) GetStatus() Status {
return c.status
} }
// Close Closes underlying connections to the Signal Exchange // Close Closes underlying connections to the Signal Exchange
@@ -48,7 +62,7 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo
transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{})) transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
} }
sigCtx, cancel := context.WithTimeout(ctx, 3*time.Second) sigCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel() defer cancel()
conn, err := grpc.DialContext( conn, err := grpc.DialContext(
sigCtx, sigCtx,
@@ -56,8 +70,8 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo
transportOption, transportOption,
grpc.WithBlock(), grpc.WithBlock(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{ grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 3 * time.Second, Time: 15 * time.Second,
Timeout: 2 * time.Second, Timeout: 10 * time.Second,
})) }))
if err != nil { if err != nil {
@@ -65,13 +79,13 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo
return nil, err return nil, err
} }
var wg sync.WaitGroup
return &Client{ return &Client{
realClient: proto.NewSignalExchangeClient(conn), realClient: proto.NewSignalExchangeClient(conn),
ctx: ctx, ctx: ctx,
signalConn: conn, signalConn: conn,
key: key, key: key,
connWg: &wg, mux: sync.Mutex{},
status: StreamDisconnected,
}, nil }, nil
} }
@@ -81,8 +95,8 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
InitialInterval: 800 * time.Millisecond, InitialInterval: 800 * time.Millisecond,
RandomizationFactor: backoff.DefaultRandomizationFactor, RandomizationFactor: backoff.DefaultRandomizationFactor,
Multiplier: backoff.DefaultMultiplier, Multiplier: backoff.DefaultMultiplier,
MaxInterval: 30 * time.Second, MaxInterval: 10 * time.Second,
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying MaxElapsedTime: 12 * time.Hour, //stop after 12 hours of trying, the error will be propagated to the general retry of the client
Stop: backoff.Stop, Stop: backoff.Stop,
Clock: backoff.SystemClock, Clock: backoff.SystemClock,
}, ctx) }, ctx)
@@ -91,36 +105,79 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
// Receive Connects to the Signal Exchange message stream and starts receiving messages. // Receive Connects to the Signal Exchange message stream and starts receiving messages.
// The messages will be handled by msgHandler function provided. // The messages will be handled by msgHandler function provided.
// This function runs a goroutine underneath and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart) // This function is blocking and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart)
// The key is the identifier of our Peer (could be Wireguard public key) // The connection retry logic will try to reconnect for 30 min and if wasn't successful will propagate the error to the function caller.
func (c *Client) Receive(msgHandler func(msg *proto.Message) error) { func (c *Client) Receive(msgHandler func(msg *proto.Message) error) error {
c.connWg.Add(1)
go func() {
var backOff = defaultBackoff(c.ctx) var backOff = defaultBackoff(c.ctx)
operation := func() error { operation := func() error {
err := c.connect(c.key.PublicKey().String(), msgHandler) c.notifyStreamDisconnected()
if err != nil {
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
c.connWg.Add(1)
return err
}
backOff.Reset() log.Debugf("signal connection state %v", c.signalConn.GetState())
return nil if !c.ready() {
return fmt.Errorf("no connection to signal")
} }
err := backoff.Retry(operation, backOff) // connect to Signal stream identifying ourselves with a public Wireguard key
// todo once the key rotation logic has been implemented, consider changing to some other identifier (received from management)
stream, err := c.connect(c.key.PublicKey().String())
if err != nil { if err != nil {
log.Errorf("exiting Signal Service connection retry loop due to unrecoverable error: %s", err) log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
return return err
} }
}()
c.notifyStreamConnected()
log.Infof("connected to the Signal Service stream")
// start receiving messages from the Signal stream (from other peers through signal)
err = c.receive(stream, msgHandler)
if err != nil {
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
backOff.Reset()
return err
}
return nil
}
err := backoff.Retry(operation, backOff)
if err != nil {
log.Errorf("exiting Signal Service connection retry loop due to unrecoverable error: %s", err)
return err
}
return nil
}
func (c *Client) notifyStreamDisconnected() {
c.mux.Lock()
defer c.mux.Unlock()
c.status = StreamDisconnected
} }
func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error) error { func (c *Client) notifyStreamConnected() {
c.mux.Lock()
defer c.mux.Unlock()
c.status = StreamConnected
if c.connectedCh != nil {
// there are goroutines waiting on this channel -> release them
close(c.connectedCh)
c.connectedCh = nil
}
}
func (c *Client) getStreamStatusChan() <-chan struct{} {
c.mux.Lock()
defer c.mux.Unlock()
if c.connectedCh == nil {
c.connectedCh = make(chan struct{})
}
return c.connectedCh
}
func (c *Client) connect(key string) (proto.SignalExchange_ConnectStreamClient, error) {
c.stream = nil c.stream = nil
// add key fingerprint to the request header to be identified on the server side // add key fingerprint to the request header to be identified on the server side
@@ -131,35 +188,48 @@ func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error)
c.stream = stream c.stream = stream
if err != nil { if err != nil {
return err return nil, err
} }
// blocks // blocks
header, err := c.stream.Header() header, err := c.stream.Header()
if err != nil { if err != nil {
return err return nil, err
} }
registered := header.Get(proto.HeaderRegistered) registered := header.Get(proto.HeaderRegistered)
if len(registered) == 0 { if len(registered) == 0 {
return fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams") return nil, fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
} }
//connection established we are good to use the stream
c.connWg.Done()
log.Infof("connected to the Signal Exchange Stream") return stream, nil
return c.receive(stream, msgHandler)
} }
// WaitConnected waits until the client is connected to the message stream // ready indicates whether the client is okay and ready to be used
func (c *Client) WaitConnected() { // for now it just checks whether gRPC connection to the service is in state Ready
c.connWg.Wait() func (c *Client) ready() bool {
return c.signalConn.GetState() == connectivity.Ready
}
// WaitStreamConnected waits until the client is connected to the Signal stream
func (c *Client) WaitStreamConnected() {
if c.status == StreamConnected {
return
}
ch := c.getStreamStatusChan()
select {
case <-c.ctx.Done():
case <-ch:
}
} }
// SendToStream sends a message to the remote Peer through the Signal Exchange using established stream connection to the Signal Server // SendToStream sends a message to the remote Peer through the Signal Exchange using established stream connection to the Signal Server
// The Client.Receive method must be called before sending messages to establish initial connection to the Signal Exchange // The Client.Receive method must be called before sending messages to establish initial connection to the Signal Exchange
// Client.connWg can be used to wait // Client.connWg can be used to wait
func (c *Client) SendToStream(msg *proto.EncryptedMessage) error { func (c *Client) SendToStream(msg *proto.EncryptedMessage) error {
if !c.ready() {
return fmt.Errorf("no connection to signal")
}
if c.stream == nil { if c.stream == nil {
return fmt.Errorf("connection to the Signal Exchnage has not been established yet. Please call Client.Receive before sending messages") return fmt.Errorf("connection to the Signal Exchnage has not been established yet. Please call Client.Receive before sending messages")
} }
@@ -216,13 +286,17 @@ func (c *Client) encryptMessage(msg *proto.Message) (*proto.EncryptedMessage, er
// Send sends a message to the remote Peer through the Signal Exchange. // Send sends a message to the remote Peer through the Signal Exchange.
func (c *Client) Send(msg *proto.Message) error { func (c *Client) Send(msg *proto.Message) error {
if !c.ready() {
return fmt.Errorf("no connection to signal")
}
encryptedMessage, err := c.encryptMessage(msg) encryptedMessage, err := c.encryptMessage(msg)
if err != nil { if err != nil {
return err return err
} }
_, err = c.realClient.Send(context.TODO(), encryptedMessage) _, err = c.realClient.Send(context.TODO(), encryptedMessage)
if err != nil { if err != nil {
log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err) //log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err)
return err return err
} }
@@ -239,10 +313,10 @@ func (c *Client) receive(stream proto.SignalExchange_ConnectStreamClient,
log.Warnf("stream canceled (usually indicates shutdown)") log.Warnf("stream canceled (usually indicates shutdown)")
return err return err
} else if s.Code() == codes.Unavailable { } else if s.Code() == codes.Unavailable {
log.Warnf("server has been stopped") log.Warnf("Signal Service is unavailable")
return err return err
} else if err == io.EOF { } else if err == io.EOF {
log.Warnf("stream closed by server") log.Warnf("Signal Service stream closed by server")
return err return err
} else if err != nil { } else if err != nil {
return err return err

View File

@@ -48,30 +48,42 @@ var _ = Describe("Client", func() {
// connect PeerA to Signal // connect PeerA to Signal
keyA, _ := wgtypes.GenerateKey() keyA, _ := wgtypes.GenerateKey()
clientA := createSignalClient(addr, keyA) clientA := createSignalClient(addr, keyA)
clientA.Receive(func(msg *sigProto.Message) error { go func() {
receivedOnA = msg.GetBody().GetPayload() err := clientA.Receive(func(msg *sigProto.Message) error {
msgReceived.Done() receivedOnA = msg.GetBody().GetPayload()
return nil msgReceived.Done()
}) return nil
clientA.WaitConnected() })
if err != nil {
return
}
}()
clientA.WaitStreamConnected()
// connect PeerB to Signal // connect PeerB to Signal
keyB, _ := wgtypes.GenerateKey() keyB, _ := wgtypes.GenerateKey()
clientB := createSignalClient(addr, keyB) clientB := createSignalClient(addr, keyB)
clientB.Receive(func(msg *sigProto.Message) error {
receivedOnB = msg.GetBody().GetPayload() go func() {
err := clientB.Send(&sigProto.Message{ err := clientB.Receive(func(msg *sigProto.Message) error {
Key: keyB.PublicKey().String(), receivedOnB = msg.GetBody().GetPayload()
RemoteKey: keyA.PublicKey().String(), err := clientB.Send(&sigProto.Message{
Body: &sigProto.Body{Payload: "pong"}, Key: keyB.PublicKey().String(),
RemoteKey: keyA.PublicKey().String(),
Body: &sigProto.Body{Payload: "pong"},
})
if err != nil {
Fail("failed sending a message to PeerA")
}
msgReceived.Done()
return nil
}) })
if err != nil { if err != nil {
Fail("failed sending a message to PeerA") return
} }
msgReceived.Done() }()
return nil
}) clientB.WaitStreamConnected()
clientB.WaitConnected()
// PeerA initiates ping-pong // PeerA initiates ping-pong
err := clientA.Send(&sigProto.Message{ err := clientA.Send(&sigProto.Message{
@@ -100,11 +112,15 @@ var _ = Describe("Client", func() {
key, _ := wgtypes.GenerateKey() key, _ := wgtypes.GenerateKey()
client := createSignalClient(addr, key) client := createSignalClient(addr, key)
client.Receive(func(msg *sigProto.Message) error { go func() {
return nil err := client.Receive(func(msg *sigProto.Message) error {
}) return nil
client.WaitConnected() })
if err != nil {
return
}
}()
client.WaitStreamConnected()
Expect(client).NotTo(BeNil()) Expect(client).NotTo(BeNil())
}) })
}) })