mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-17 07:46:38 +00:00
Compare commits
22 Commits
v0.2.0-bet
...
periodic-p
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ab579f5de0 | ||
|
|
09eeb71af2 | ||
|
|
828410b34c | ||
|
|
4d2b194570 | ||
|
|
a67b9a16af | ||
|
|
6ae27c9a9b | ||
|
|
ff6e369a21 | ||
|
|
5c3b5e7f40 | ||
|
|
8c75ef8bef | ||
|
|
fdc11fff47 | ||
|
|
3dca2d6953 | ||
|
|
6b7d4cf644 | ||
|
|
edd4125742 | ||
|
|
7bf9793f85 | ||
|
|
fcbf980588 | ||
|
|
d08e5efbce | ||
|
|
95ef8547f3 | ||
|
|
ed1e4dfc51 | ||
|
|
4d34fb4e64 | ||
|
|
1fb8b74cd2 | ||
|
|
d040cfed7e | ||
|
|
2c729fe5cc |
4
.github/workflows/golang-test.yml
vendored
4
.github/workflows/golang-test.yml
vendored
@@ -8,7 +8,7 @@ jobs:
|
||||
test:
|
||||
strategy:
|
||||
matrix:
|
||||
go-version: [1.16.x]
|
||||
go-version: [1.17.x]
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Install Go
|
||||
@@ -24,7 +24,7 @@ jobs:
|
||||
strategy:
|
||||
matrix:
|
||||
os: [ windows, linux, darwin ]
|
||||
go-version: [1.16.x]
|
||||
go-version: [1.17.x]
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
|
||||
2
.github/workflows/release.yml
vendored
2
.github/workflows/release.yml
vendored
@@ -18,7 +18,7 @@ jobs:
|
||||
name: Set up Go
|
||||
uses: actions/setup-go@v2
|
||||
with:
|
||||
go-version: 1.16
|
||||
go-version: 1.17
|
||||
-
|
||||
name: Cache Go modules
|
||||
uses: actions/cache@v1
|
||||
|
||||
117
.goreleaser.yaml
117
.goreleaser.yaml
@@ -37,6 +37,7 @@ builds:
|
||||
goarch:
|
||||
- amd64
|
||||
- arm64
|
||||
- arm
|
||||
ldflags:
|
||||
- -s -w -X main.version={{.Version}} -X main.commit={{.Commit}} -X main.date={{.CommitDate}} -X main.builtBy=goreleaser
|
||||
mod_timestamp: '{{ .CommitTimestamp }}'
|
||||
@@ -50,6 +51,7 @@ builds:
|
||||
goarch:
|
||||
- amd64
|
||||
- arm64
|
||||
- arm
|
||||
ldflags:
|
||||
- -s -w -X main.version={{.Version}} -X main.commit={{.Commit}} -X main.date={{.CommitDate}} -X main.builtBy=goreleaser
|
||||
mod_timestamp: '{{ .CommitTimestamp }}'
|
||||
@@ -83,6 +85,52 @@ nfpms:
|
||||
postinstall: "release_files/post_install.sh"
|
||||
preremove: "release_files/pre_remove.sh"
|
||||
dockers:
|
||||
- image_templates:
|
||||
- wiretrustee/wiretrustee:{{ .Version }}-amd64
|
||||
ids:
|
||||
- wiretrustee
|
||||
goarch: amd64
|
||||
use: buildx
|
||||
dockerfile: client/Dockerfile
|
||||
build_flag_templates:
|
||||
- "--platform=linux/amd64"
|
||||
- "--label=org.opencontainers.image.created={{.Date}}"
|
||||
- "--label=org.opencontainers.image.title={{.ProjectName}}"
|
||||
- "--label=org.opencontainers.image.version={{.Version}}"
|
||||
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
|
||||
- "--label=org.opencontainers.image.version={{.Version}}"
|
||||
- "--label=maintainer=wiretrustee@wiretrustee.com"
|
||||
- image_templates:
|
||||
- wiretrustee/wiretrustee:{{ .Version }}-arm64v8
|
||||
ids:
|
||||
- wiretrustee
|
||||
goarch: arm64
|
||||
use: buildx
|
||||
dockerfile: client/Dockerfile
|
||||
build_flag_templates:
|
||||
- "--platform=linux/arm64"
|
||||
- "--label=org.opencontainers.image.created={{.Date}}"
|
||||
- "--label=org.opencontainers.image.title={{.ProjectName}}"
|
||||
- "--label=org.opencontainers.image.version={{.Version}}"
|
||||
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
|
||||
- "--label=org.opencontainers.image.version={{.Version}}"
|
||||
- "--label=maintainer=wiretrustee@wiretrustee.com"
|
||||
- image_templates:
|
||||
- wiretrustee/wiretrustee:{{ .Version }}-arm
|
||||
ids:
|
||||
- wiretrustee
|
||||
goarch: arm
|
||||
goarm: 6
|
||||
use: buildx
|
||||
dockerfile: client/Dockerfile
|
||||
build_flag_templates:
|
||||
- "--platform=linux/arm"
|
||||
- "--label=org.opencontainers.image.created={{.Date}}"
|
||||
- "--label=org.opencontainers.image.title={{.ProjectName}}"
|
||||
- "--label=org.opencontainers.image.version={{.Version}}"
|
||||
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
|
||||
- "--label=org.opencontainers.image.version={{.Version}}"
|
||||
- "--label=maintainer=wiretrustee@wiretrustee.com"
|
||||
- image_templates:
|
||||
- wiretrustee/signal:{{ .Version }}-amd64
|
||||
ids:
|
||||
@@ -113,6 +161,22 @@ dockers:
|
||||
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
|
||||
- "--label=org.opencontainers.image.version={{.Version}}"
|
||||
- "--label=maintainer=wiretrustee@wiretrustee.com"
|
||||
- image_templates:
|
||||
- wiretrustee/signal:{{ .Version }}-arm
|
||||
ids:
|
||||
- wiretrustee-signal
|
||||
goarch: arm
|
||||
goarm: 6
|
||||
use: buildx
|
||||
dockerfile: signal/Dockerfile
|
||||
build_flag_templates:
|
||||
- "--platform=linux/arm"
|
||||
- "--label=org.opencontainers.image.created={{.Date}}"
|
||||
- "--label=org.opencontainers.image.title={{.ProjectName}}"
|
||||
- "--label=org.opencontainers.image.version={{.Version}}"
|
||||
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
|
||||
- "--label=org.opencontainers.image.version={{.Version}}"
|
||||
- "--label=maintainer=wiretrustee@wiretrustee.com"
|
||||
- image_templates:
|
||||
- wiretrustee/management:{{ .Version }}-amd64
|
||||
ids:
|
||||
@@ -143,6 +207,22 @@ dockers:
|
||||
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
|
||||
- "--label=org.opencontainers.image.version={{.Version}}"
|
||||
- "--label=maintainer=wiretrustee@wiretrustee.com"
|
||||
- image_templates:
|
||||
- wiretrustee/management:{{ .Version }}-arm
|
||||
ids:
|
||||
- wiretrustee-mgmt
|
||||
goarch: arm
|
||||
goarm: 6
|
||||
use: buildx
|
||||
dockerfile: management/Dockerfile
|
||||
build_flag_templates:
|
||||
- "--platform=linux/arm"
|
||||
- "--label=org.opencontainers.image.created={{.Date}}"
|
||||
- "--label=org.opencontainers.image.title={{.ProjectName}}"
|
||||
- "--label=org.opencontainers.image.version={{.Version}}"
|
||||
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
|
||||
- "--label=org.opencontainers.image.version={{.Version}}"
|
||||
- "--label=maintainer=wiretrustee@wiretrustee.com"
|
||||
- image_templates:
|
||||
- wiretrustee/management:{{ .Version }}-debug-amd64
|
||||
ids:
|
||||
@@ -174,30 +254,63 @@ dockers:
|
||||
- "--label=org.opencontainers.image.version={{.Version}}"
|
||||
- "--label=maintainer=wiretrustee@wiretrustee.com"
|
||||
|
||||
- image_templates:
|
||||
- wiretrustee/management:{{ .Version }}-debug-arm
|
||||
ids:
|
||||
- wiretrustee-mgmt
|
||||
goarch: arm
|
||||
goarm: 6
|
||||
use: buildx
|
||||
dockerfile: management/Dockerfile.debug
|
||||
build_flag_templates:
|
||||
- "--platform=linux/arm"
|
||||
- "--label=org.opencontainers.image.created={{.Date}}"
|
||||
- "--label=org.opencontainers.image.title={{.ProjectName}}"
|
||||
- "--label=org.opencontainers.image.version={{.Version}}"
|
||||
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
|
||||
- "--label=org.opencontainers.image.version={{.Version}}"
|
||||
- "--label=maintainer=wiretrustee@wiretrustee.com"
|
||||
docker_manifests:
|
||||
- name_template: wiretrustee/wiretrustee:{{ .Version }}
|
||||
image_templates:
|
||||
- wiretrustee/wiretrustee:{{ .Version }}-arm64v8
|
||||
- wiretrustee/wiretrustee:{{ .Version }}-arm
|
||||
- wiretrustee/wiretrustee:{{ .Version }}-amd64
|
||||
|
||||
- name_template: wiretrustee/wiretrustee:latest
|
||||
image_templates:
|
||||
- wiretrustee/wiretrustee:{{ .Version }}-arm64v8
|
||||
- wiretrustee/wiretrustee:{{ .Version }}-arm
|
||||
- wiretrustee/wiretrustee:{{ .Version }}-amd64
|
||||
|
||||
- name_template: wiretrustee/signal:{{ .Version }}
|
||||
image_templates:
|
||||
- wiretrustee/signal:{{ .Version }}-arm64v8
|
||||
- wiretrustee/signal:{{ .Version }}-arm
|
||||
- wiretrustee/signal:{{ .Version }}-amd64
|
||||
|
||||
- name_template: wiretrustee/signal:latest
|
||||
image_templates:
|
||||
- wiretrustee/signal:{{ .Version }}-arm64v8
|
||||
- wiretrustee/signal:{{ .Version }}-arm
|
||||
- wiretrustee/signal:{{ .Version }}-amd64
|
||||
|
||||
- name_template: wiretrustee/management:{{ .Version }}
|
||||
image_templates:
|
||||
- wiretrustee/management:{{ .Version }}-arm64v8
|
||||
- wiretrustee/management:{{ .Version }}-arm
|
||||
- wiretrustee/management:{{ .Version }}-amd64
|
||||
|
||||
- name_template: wiretrustee/management:latest
|
||||
image_templates:
|
||||
- wiretrustee/management:{{ .Version }}-arm64v8
|
||||
- wiretrustee/management:{{ .Version }}-arm
|
||||
- wiretrustee/management:{{ .Version }}-amd64
|
||||
|
||||
- name_template: wiretrustee/management:debug-latest
|
||||
image_templates:
|
||||
- wiretrustee/management:{{ .Version }}-debug-arm64v8
|
||||
- wiretrustee/management:{{ .Version }}-debug-arm
|
||||
- wiretrustee/management:{{ .Version }}-debug-amd64
|
||||
|
||||
brews:
|
||||
@@ -221,7 +334,7 @@ uploads:
|
||||
ids:
|
||||
- deb
|
||||
mode: archive
|
||||
target: https://pkgs.wiretrustee.com/debian/pool/{{ .ArtifactName }};deb.distribution=stable;deb.component=main;deb.architecture={{ .Arch }}
|
||||
target: https://pkgs.wiretrustee.com/debian/pool/{{ .ArtifactName }};deb.distribution=stable;deb.component=main;deb.architecture={{ if .Arm }}armhf{{ else }}{{ .Arch }}{{ end }};deb.package=
|
||||
username: dev@wiretrustee.com
|
||||
method: PUT
|
||||
- name: yum
|
||||
@@ -230,4 +343,4 @@ uploads:
|
||||
mode: archive
|
||||
target: https://pkgs.wiretrustee.com/yum/{{ .Arch }}{{ if .Arm }}{{ .Arm }}{{ end }}
|
||||
username: dev@wiretrustee.com
|
||||
method: PUT
|
||||
method: PUT
|
||||
|
||||
26
README.md
26
README.md
@@ -15,7 +15,7 @@
|
||||
<strong>
|
||||
Start using Wiretrustee at <a href="https://app.wiretrustee.com/">app.wiretrustee.com</a>
|
||||
<br/>
|
||||
See <a href="docs/README.md">Documentation</a>
|
||||
See <a href="https://docs.wiretrustee.com">Documentation</a>
|
||||
<br/>
|
||||
Join our <a href="https://join.slack.com/t/wiretrustee/shared_invite/zt-vrahf41g-ik1v7fV8du6t0RwxSrJ96A">Slack channel</a>
|
||||
<br/>
|
||||
@@ -27,12 +27,11 @@
|
||||
|
||||
**Wiretrustee is an open-source VPN platform built on top of WireGuard® making it easy to create secure private networks for your organization or home.**
|
||||
|
||||
It requires zero configuration effort leaving behind the hassle of opening ports, complex firewall rules, vpn gateways, and so forth.
|
||||
|
||||
There is no centralized VPN server with Wiretrustee - your computers, devices, machines, and servers connect to each other directly over a fast encrypted tunnel.
|
||||
It requires zero configuration effort leaving behind the hassle of opening ports, complex firewall rules, VPN gateways, and so forth.
|
||||
|
||||
**Wiretrustee automates Wireguard-based networks, offering a management layer with:**
|
||||
* Centralized Peer IP management with a neat UI dashboard.
|
||||
* Centralized Peer IP management with a UI dashboard.
|
||||
* Encrypted peer-to-peet connections without a centralized VPN gateway.
|
||||
* Automatic Peer discovery and configuration.
|
||||
* UDP hole punching to establish peer-to-peer connections behind NAT, firewall, and without a public static IP.
|
||||
* Connection relay fallback in case a peer-to-peer connection is not possible.
|
||||
@@ -40,6 +39,7 @@ There is no centralized VPN server with Wiretrustee - your computers, devices, m
|
||||
* Client application SSO with MFA (coming soon).
|
||||
* Access Controls (coming soon).
|
||||
* Activity Monitoring (coming soon).
|
||||
* Private DNS (coming baoon)
|
||||
|
||||
### Secure peer-to-peer VPN in minutes
|
||||
<p float="left" align="middle">
|
||||
@@ -56,14 +56,17 @@ Hosted demo version:
|
||||
|
||||
|
||||
### A bit on Wiretrustee internals
|
||||
* Wiretrustee features a Management Service that offers peer IP management and network updates distribution (e.g. when new peer joins the network).
|
||||
* Wiretrustee features a Management Service that offers peer IP management and network updates distribution (e.g. when a new peer joins the network).
|
||||
* Wiretrustee uses WebRTC ICE implemented in [pion/ice library](https://github.com/pion/ice) to discover connection candidates when establishing a peer-to-peer connection between devices.
|
||||
* Peers negotiate connection through [Signal Service](signal/).
|
||||
* Signal Service uses public Wireguard keys to route messages between peers.
|
||||
Contents of the messages sent between peers through the signaling server are encrypted with Wireguard keys, making it impossible to inspect them.
|
||||
* Occasionally, the NAT-traversal is unsuccessful due to strict NATs (e.g. mobile carrier-grade NAT).
|
||||
When this occurs the system falls back to relay server (TURN), and a secure Wireguard tunnel is established via TURN server.
|
||||
[Coturn](https://github.com/coturn/coturn) is the one that has been successfully used for STUN and TURN in Wiretrustee setups.
|
||||
* Occasionally, the NAT traversal is unsuccessful due to strict NATs (e.g. mobile carrier-grade NAT). When this occurs the system falls back to the relay server (TURN), and a secure Wireguard tunnel is established via the TURN server. [Coturn](https://github.com/coturn/coturn) is the one that has been successfully used for STUN and TURN in Wiretrustee setups.
|
||||
|
||||
<p float="left" align="middle">
|
||||
<img src="https://docs.wiretrustee.com/img/architecture/high-level-dia.png" width="700"/>
|
||||
</p>
|
||||
|
||||
|
||||
### Product Roadmap
|
||||
- [Public Roadmap](https://github.com/wiretrustee/wiretrustee/projects/2)
|
||||
@@ -145,6 +148,11 @@ For **Windows** systems, start powershell as administrator and:
|
||||
```shell
|
||||
wiretrustee up --setup-key <SETUP KEY>
|
||||
```
|
||||
For **Docker**, you can run with the following command:
|
||||
```shell
|
||||
docker run --network host --privileged --rm -d -e WT_SETUP_KEY=<SETUP KEY> -v wiretrustee-client:/etc/wiretrustee wiretrustee/wiretrustee:<TAG>
|
||||
```
|
||||
> TAG > 0.3.0 version
|
||||
|
||||
Alternatively, if you are hosting your own Management Service provide `--management-url` property pointing to your Management Service:
|
||||
```shell
|
||||
|
||||
4
client/Dockerfile
Normal file
4
client/Dockerfile
Normal file
@@ -0,0 +1,4 @@
|
||||
FROM gcr.io/distroless/base:debug
|
||||
ENV WT_LOG_FILE=console
|
||||
ENTRYPOINT [ "/go/bin/wiretrustee","up"]
|
||||
COPY wiretrustee /go/bin/wiretrustee
|
||||
@@ -18,22 +18,21 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
setupKey string
|
||||
|
||||
loginCmd = &cobra.Command{
|
||||
Use: "login",
|
||||
Short: "login to the Wiretrustee Management Service (first run)",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
SetFlagsFromEnvVars()
|
||||
|
||||
err := util.InitLog(logLevel, logFile)
|
||||
if err != nil {
|
||||
log.Errorf("failed initializing log %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
config, err := internal.GetConfig(managementURL, configPath)
|
||||
config, err := internal.GetConfig(managementURL, configPath, preSharedKey)
|
||||
if err != nil {
|
||||
log.Errorf("failed getting config %s %v", configPath, err)
|
||||
//os.Exit(ExitSetupFailed)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -41,7 +40,6 @@ var (
|
||||
myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey)
|
||||
if err != nil {
|
||||
log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error())
|
||||
//os.Exit(ExitSetupFailed)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -56,7 +54,6 @@ var (
|
||||
mgmClient, err := mgm.NewClient(ctx, config.ManagementURL.Host, myPrivateKey, mgmTlsEnabled)
|
||||
if err != nil {
|
||||
log.Errorf("failed connecting to Management Service %s %v", config.ManagementURL.String(), err)
|
||||
//os.Exit(ExitSetupFailed)
|
||||
return err
|
||||
}
|
||||
log.Debugf("connected to anagement Service %s", config.ManagementURL.String())
|
||||
@@ -64,21 +61,18 @@ var (
|
||||
serverKey, err := mgmClient.GetServerPublicKey()
|
||||
if err != nil {
|
||||
log.Errorf("failed while getting Management Service public key: %v", err)
|
||||
//os.Exit(ExitSetupFailed)
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = loginPeer(*serverKey, mgmClient, setupKey)
|
||||
if err != nil {
|
||||
log.Errorf("failed logging-in peer on Management Service : %v", err)
|
||||
//os.Exit(ExitSetupFailed)
|
||||
return err
|
||||
}
|
||||
|
||||
err = mgmClient.Close()
|
||||
if err != nil {
|
||||
log.Errorf("failed closing Management Service client: %v", err)
|
||||
//os.Exit(ExitSetupFailed)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -151,6 +145,3 @@ func promptPeerSetupKey() (string, error) {
|
||||
|
||||
return "", s.Err()
|
||||
}
|
||||
|
||||
//func init() {
|
||||
//}
|
||||
|
||||
@@ -4,19 +4,15 @@ import (
|
||||
"fmt"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/pflag"
|
||||
"github.com/wiretrustee/wiretrustee/client/internal"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"strings"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
const (
|
||||
// ExitSetupFailed defines exit code
|
||||
ExitSetupFailed = 1
|
||||
DefaultConfigPath = ""
|
||||
)
|
||||
|
||||
var (
|
||||
configPath string
|
||||
defaultConfigPath string
|
||||
@@ -24,6 +20,8 @@ var (
|
||||
defaultLogFile string
|
||||
logFile string
|
||||
managementURL string
|
||||
setupKey string
|
||||
preSharedKey string
|
||||
rootCmd = &cobra.Command{
|
||||
Use: "wiretrustee",
|
||||
Short: "",
|
||||
@@ -56,6 +54,7 @@ func init() {
|
||||
rootCmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", "sets Wiretrustee log level")
|
||||
rootCmd.PersistentFlags().StringVar(&logFile, "log-file", defaultLogFile, "sets Wiretrustee log path. If console is specified the the log will be output to stdout")
|
||||
rootCmd.PersistentFlags().StringVar(&setupKey, "setup-key", "", "Setup key obtained from the Management Service Dashboard (used to register peer)")
|
||||
rootCmd.PersistentFlags().StringVar(&preSharedKey, "preshared-key", "", "Sets Wireguard PreSharedKey property. If set, then only peers that have the same key can communicate.")
|
||||
rootCmd.AddCommand(serviceCmd)
|
||||
rootCmd.AddCommand(upCmd)
|
||||
rootCmd.AddCommand(loginCmd)
|
||||
@@ -75,3 +74,28 @@ func SetupCloseHandler() {
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// SetFlagsFromEnvVars reads and updates flag values from environment variables with prefix WT_
|
||||
func SetFlagsFromEnvVars() {
|
||||
flags := rootCmd.PersistentFlags()
|
||||
flags.VisitAll(func(f *pflag.Flag) {
|
||||
|
||||
envVar := FlagNameToEnvVar(f.Name)
|
||||
|
||||
if value, present := os.LookupEnv(envVar); present {
|
||||
err := flags.Set(f.Name, value)
|
||||
if err != nil {
|
||||
log.Infof("unable to configure flag %s using variable %s, err: %v", f.Name, envVar, err)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// FlagNameToEnvVar converts flag name to environment var name adding a prefix,
|
||||
// replacing dashes and making all uppercase (e.g. setup-keys is converted to WT_SETUP_KEYS)
|
||||
func FlagNameToEnvVar(f string) string {
|
||||
prefix := "WT_"
|
||||
parsed := strings.ReplaceAll(f, "-", "_")
|
||||
upper := strings.ToUpper(parsed)
|
||||
return prefix + upper
|
||||
}
|
||||
|
||||
@@ -34,6 +34,3 @@ var (
|
||||
Short: "manages wiretrustee service",
|
||||
}
|
||||
)
|
||||
|
||||
func init() {
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/kardianos/service"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
@@ -9,40 +8,21 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
func (p *program) Start(s service.Service) error {
|
||||
|
||||
var backOff = &backoff.ExponentialBackOff{
|
||||
InitialInterval: time.Second,
|
||||
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||
Multiplier: backoff.DefaultMultiplier,
|
||||
MaxInterval: 30 * time.Second,
|
||||
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}
|
||||
func (p *program) Start(service.Service) error {
|
||||
|
||||
// Start should not block. Do the actual work async.
|
||||
log.Info("starting service") //nolint
|
||||
go func() {
|
||||
operation := func() error {
|
||||
err := runClient()
|
||||
if err != nil {
|
||||
log.Warnf("retrying Wiretrustee client app due to error: %v", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
err := backoff.Retry(operation, backOff)
|
||||
err := runClient()
|
||||
if err != nil {
|
||||
log.Errorf("exiting client retry loop due to unrecoverable error: %s", err)
|
||||
log.Errorf("stopped Wiretrustee client app due to error: %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *program) Stop(s service.Service) error {
|
||||
func (p *program) Stop(service.Service) error {
|
||||
go func() {
|
||||
stopCh <- 1
|
||||
}()
|
||||
@@ -61,6 +41,7 @@ var (
|
||||
Use: "run",
|
||||
Short: "runs wiretrustee as service",
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
SetFlagsFromEnvVars()
|
||||
|
||||
err := util.InitLog(logLevel, logFile)
|
||||
if err != nil {
|
||||
@@ -95,6 +76,8 @@ var (
|
||||
Use: "start",
|
||||
Short: "starts wiretrustee service",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
SetFlagsFromEnvVars()
|
||||
|
||||
err := util.InitLog(logLevel, logFile)
|
||||
if err != nil {
|
||||
log.Errorf("failed initializing log %v", err)
|
||||
@@ -121,6 +104,8 @@ var (
|
||||
Use: "stop",
|
||||
Short: "stops wiretrustee service",
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
SetFlagsFromEnvVars()
|
||||
|
||||
err := util.InitLog(logLevel, logFile)
|
||||
if err != nil {
|
||||
log.Errorf("failed initializing log %v", err)
|
||||
@@ -145,6 +130,8 @@ var (
|
||||
Use: "restart",
|
||||
Short: "restarts wiretrustee service",
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
SetFlagsFromEnvVars()
|
||||
|
||||
err := util.InitLog(logLevel, logFile)
|
||||
if err != nil {
|
||||
log.Errorf("failed initializing log %v", err)
|
||||
@@ -163,6 +150,3 @@ var (
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func init() {
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ var (
|
||||
Use: "install",
|
||||
Short: "installs wiretrustee service",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
SetFlagsFromEnvVars()
|
||||
|
||||
svcConfig := newSVCConfig()
|
||||
|
||||
@@ -49,6 +50,7 @@ var (
|
||||
Use: "uninstall",
|
||||
Short: "uninstalls wiretrustee service from system",
|
||||
Run: func(cmd *cobra.Command, args []string) {
|
||||
SetFlagsFromEnvVars()
|
||||
|
||||
s, err := newSVC(&program{}, newSVCConfig())
|
||||
if err != nil {
|
||||
@@ -65,6 +67,3 @@ var (
|
||||
},
|
||||
}
|
||||
)
|
||||
|
||||
func init() {
|
||||
}
|
||||
|
||||
197
client/cmd/up.go
197
client/cmd/up.go
@@ -2,6 +2,7 @@ package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/kardianos/service"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
@@ -12,6 +13,7 @@ import (
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -19,7 +21,7 @@ var (
|
||||
Use: "up",
|
||||
Short: "install, login and start wiretrustee client",
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
|
||||
SetFlagsFromEnvVars()
|
||||
err := loginCmd.RunE(cmd, args)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -61,12 +63,22 @@ func createEngineConfig(key wgtypes.Key, config *internal.Config, peerConfig *mg
|
||||
iFaceBlackList[config.IFaceBlackList[i]] = struct{}{}
|
||||
}
|
||||
|
||||
return &internal.EngineConfig{
|
||||
engineConf := &internal.EngineConfig{
|
||||
WgIface: config.WgIface,
|
||||
WgAddr: peerConfig.Address,
|
||||
IFaceBlackList: iFaceBlackList,
|
||||
WgPrivateKey: key,
|
||||
}, nil
|
||||
}
|
||||
|
||||
if config.PreSharedKey != "" {
|
||||
preSharedKey, err := wgtypes.ParseKey(config.PreSharedKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
engineConf.PreSharedKey = &preSharedKey
|
||||
}
|
||||
|
||||
return engineConf, nil
|
||||
}
|
||||
|
||||
// connectToSignal creates Signal Service client and established a connection
|
||||
@@ -117,86 +129,107 @@ func connectToManagement(ctx context.Context, managementAddr string, ourPrivateK
|
||||
}
|
||||
|
||||
func runClient() error {
|
||||
config, err := internal.ReadConfig(managementURL, configPath)
|
||||
var backOff = &backoff.ExponentialBackOff{
|
||||
InitialInterval: time.Second,
|
||||
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||
Multiplier: backoff.DefaultMultiplier,
|
||||
MaxInterval: 10 * time.Second,
|
||||
MaxElapsedTime: 24 * 3 * time.Hour, //stop the client after 3 days trying (must be a huge problem, e.g permission denied)
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}
|
||||
|
||||
operation := func() error {
|
||||
|
||||
config, err := internal.ReadConfig(managementURL, configPath)
|
||||
if err != nil {
|
||||
log.Errorf("failed reading config %s %v", configPath, err)
|
||||
return err
|
||||
}
|
||||
|
||||
//validate our peer's Wireguard PRIVATE key
|
||||
myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey)
|
||||
if err != nil {
|
||||
log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error())
|
||||
return err
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
mgmTlsEnabled := false
|
||||
if config.ManagementURL.Scheme == "https" {
|
||||
mgmTlsEnabled = true
|
||||
}
|
||||
|
||||
// connect (just a connection, no stream yet) and login to Management Service to get an initial global Wiretrustee config
|
||||
mgmClient, loginResp, err := connectToManagement(ctx, config.ManagementURL.Host, myPrivateKey, mgmTlsEnabled)
|
||||
if err != nil {
|
||||
log.Warn(err)
|
||||
return err
|
||||
}
|
||||
|
||||
// with the global Wiretrustee config in hand connect (just a connection, no stream yet) Signal
|
||||
signalClient, err := connectToSignal(ctx, loginResp.GetWiretrusteeConfig(), myPrivateKey)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
peerConfig := loginResp.GetPeerConfig()
|
||||
|
||||
engineConfig, err := createEngineConfig(myPrivateKey, config, peerConfig)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
// create start the Wiretrustee Engine that will connect to the Signal and Management streams and manage connections to remote peers.
|
||||
engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel, ctx)
|
||||
err = engine.Start()
|
||||
if err != nil {
|
||||
log.Errorf("error while starting Wiretrustee Connection Engine: %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Print("Wiretrustee engine started, my IP is: ", peerConfig.Address)
|
||||
|
||||
select {
|
||||
case <-stopCh:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
backOff.Reset()
|
||||
|
||||
err = mgmClient.Close()
|
||||
if err != nil {
|
||||
log.Errorf("failed closing Management Service client %v", err)
|
||||
return err
|
||||
}
|
||||
err = signalClient.Close()
|
||||
if err != nil {
|
||||
log.Errorf("failed closing Signal Service client %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
err = engine.Stop()
|
||||
if err != nil {
|
||||
log.Errorf("failed stopping engine %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
cleanupCh <- struct{}{}
|
||||
}()
|
||||
|
||||
log.Info("stopped Wiretrustee client")
|
||||
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
err := backoff.Retry(operation, backOff)
|
||||
if err != nil {
|
||||
log.Errorf("failed reading config %s %v", configPath, err)
|
||||
log.Errorf("exiting client retry loop due to unrecoverable error: %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
//validate our peer's Wireguard PRIVATE key
|
||||
myPrivateKey, err := wgtypes.ParseKey(config.PrivateKey)
|
||||
if err != nil {
|
||||
log.Errorf("failed parsing Wireguard key %s: [%s]", config.PrivateKey, err.Error())
|
||||
return err
|
||||
}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
mgmTlsEnabled := false
|
||||
if config.ManagementURL.Scheme == "https" {
|
||||
mgmTlsEnabled = true
|
||||
}
|
||||
|
||||
// connect (just a connection, no stream yet) and login to Management Service to get an initial global Wiretrustee config
|
||||
mgmClient, loginResp, err := connectToManagement(ctx, config.ManagementURL.Host, myPrivateKey, mgmTlsEnabled)
|
||||
if err != nil {
|
||||
log.Warn(err)
|
||||
return err
|
||||
}
|
||||
|
||||
// with the global Wiretrustee config in hand connect (just a connection, no stream yet) Signal
|
||||
signalClient, err := connectToSignal(ctx, loginResp.GetWiretrusteeConfig(), myPrivateKey)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
peerConfig := loginResp.GetPeerConfig()
|
||||
|
||||
engineConfig, err := createEngineConfig(myPrivateKey, config, peerConfig)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
return err
|
||||
}
|
||||
|
||||
// create start the Wiretrustee Engine that will connect to the Signal and Management streams and manage connections to remote peers.
|
||||
engine := internal.NewEngine(signalClient, mgmClient, engineConfig, cancel, ctx)
|
||||
err = engine.Start()
|
||||
if err != nil {
|
||||
log.Errorf("error while starting Wiretrustee Connection Engine: %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Print("Wiretrustee engine started, my IP is: ", peerConfig.Address)
|
||||
|
||||
select {
|
||||
case <-stopCh:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
err = mgmClient.Close()
|
||||
if err != nil {
|
||||
log.Errorf("failed closing Management Service client %v", err)
|
||||
return err
|
||||
}
|
||||
err = signalClient.Close()
|
||||
if err != nil {
|
||||
log.Errorf("failed closing Signal Service client %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
err = engine.Stop()
|
||||
if err != nil {
|
||||
log.Errorf("failed stopping engine %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
go func() {
|
||||
cleanupCh <- struct{}{}
|
||||
}()
|
||||
|
||||
log.Info("stopped Wiretrustee client")
|
||||
|
||||
return ctx.Err()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -106,6 +106,7 @@ SectionEnd
|
||||
Section Uninstall
|
||||
${INSTALL_TYPE}
|
||||
|
||||
Exec '"$INSTDIR\${MAIN_APP_EXE}" service stop'
|
||||
Exec '"$INSTDIR\${MAIN_APP_EXE}" service uninstall'
|
||||
# wait the service uninstall take unblock the executable
|
||||
Sleep 3000
|
||||
|
||||
@@ -28,13 +28,14 @@ func init() {
|
||||
type Config struct {
|
||||
// Wireguard private key of local peer
|
||||
PrivateKey string
|
||||
PreSharedKey string
|
||||
ManagementURL *url.URL
|
||||
WgIface string
|
||||
IFaceBlackList []string
|
||||
}
|
||||
|
||||
//createNewConfig creates a new config generating a new Wireguard key and saving to file
|
||||
func createNewConfig(managementURL string, configPath string) (*Config, error) {
|
||||
func createNewConfig(managementURL string, configPath string, preSharedKey string) (*Config, error) {
|
||||
wgKey := generateKey()
|
||||
config := &Config{PrivateKey: wgKey, WgIface: iface.WgInterfaceDefault, IFaceBlackList: []string{}}
|
||||
if managementURL != "" {
|
||||
@@ -47,6 +48,10 @@ func createNewConfig(managementURL string, configPath string) (*Config, error) {
|
||||
config.ManagementURL = managementURLDefault
|
||||
}
|
||||
|
||||
if preSharedKey != "" {
|
||||
config.PreSharedKey = preSharedKey
|
||||
}
|
||||
|
||||
config.IFaceBlackList = []string{iface.WgInterfaceDefault, "tun0"}
|
||||
|
||||
err := util.WriteJson(configPath, config)
|
||||
@@ -93,11 +98,11 @@ func ReadConfig(managementURL string, configPath string) (*Config, error) {
|
||||
}
|
||||
|
||||
// GetConfig reads existing config or generates a new one
|
||||
func GetConfig(managementURL string, configPath string) (*Config, error) {
|
||||
func GetConfig(managementURL string, configPath string, preSharedKey string) (*Config, error) {
|
||||
|
||||
if _, err := os.Stat(configPath); os.IsNotExist(err) {
|
||||
log.Infof("generating new config %s", configPath)
|
||||
return createNewConfig(managementURL, configPath)
|
||||
return createNewConfig(managementURL, configPath, preSharedKey)
|
||||
} else {
|
||||
return ReadConfig(managementURL, configPath)
|
||||
}
|
||||
|
||||
@@ -60,6 +60,8 @@ type ConnConfig struct {
|
||||
// Remote Wireguard public key
|
||||
RemoteWgKey wgtypes.Key
|
||||
|
||||
PreSharedKey *wgtypes.Key
|
||||
|
||||
StunTurnURLS []*ice.URL
|
||||
|
||||
iFaceBlackList map[string]struct{}
|
||||
@@ -115,7 +117,7 @@ func NewConnection(config ConnConfig,
|
||||
closeCond: NewCond(),
|
||||
connected: NewCond(),
|
||||
agent: nil,
|
||||
wgProxy: NewWgProxy(config.WgIface, config.RemoteWgKey.String(), config.WgAllowedIPs, config.WgListenAddr),
|
||||
wgProxy: NewWgProxy(config.WgIface, config.RemoteWgKey.String(), config.WgAllowedIPs, config.WgListenAddr, config.PreSharedKey),
|
||||
Status: StatusDisconnected,
|
||||
}
|
||||
}
|
||||
@@ -138,12 +140,18 @@ func (conn *Connection) Open(timeout time.Duration) error {
|
||||
return !ok
|
||||
},
|
||||
})
|
||||
conn.agent = a
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn.agent = a
|
||||
defer func() {
|
||||
err := conn.agent.Close()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
err = conn.listenOnLocalCandidates()
|
||||
if err != nil {
|
||||
return err
|
||||
|
||||
@@ -4,7 +4,7 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
ice "github.com/pion/ice/v2"
|
||||
"github.com/pion/ice/v2"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/wiretrustee/wiretrustee/iface"
|
||||
mgm "github.com/wiretrustee/wiretrustee/management/client"
|
||||
@@ -12,14 +12,16 @@ import (
|
||||
signal "github.com/wiretrustee/wiretrustee/signal/client"
|
||||
sProto "github.com/wiretrustee/wiretrustee/signal/proto"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"math/rand"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// PeerConnectionTimeout is a timeout of an initial connection attempt to a remote peer.
|
||||
// E.g. this peer will wait PeerConnectionTimeout for the remote peer to respond, if not successful then it will retry the connection attempt.
|
||||
const PeerConnectionTimeout = 40 * time.Second
|
||||
// PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer.
|
||||
// E.g. this peer will wait PeerConnectionTimeoutMax for the remote peer to respond, if not successful then it will retry the connection attempt.
|
||||
const PeerConnectionTimeoutMax = 45 //sec
|
||||
const PeerConnectionTimeoutMin = 30 //sec
|
||||
|
||||
// EngineConfig is a config for the Engine
|
||||
type EngineConfig struct {
|
||||
@@ -30,6 +32,8 @@ type EngineConfig struct {
|
||||
WgPrivateKey wgtypes.Key
|
||||
// IFaceBlackList is a list of network interfaces to ignore when discovering connection candidates (ICE related)
|
||||
IFaceBlackList map[string]struct{}
|
||||
|
||||
PreSharedKey *wgtypes.Key
|
||||
}
|
||||
|
||||
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
|
||||
@@ -40,6 +44,8 @@ type Engine struct {
|
||||
mgmClient *mgm.Client
|
||||
// conns is a collection of remote peer connections indexed by local public key of the remote peers
|
||||
conns map[string]*Connection
|
||||
// peerMap is a map that holds all the peers that are known to this peer
|
||||
peerMap map[string]struct{}
|
||||
|
||||
// peerMux is used to sync peer operations (e.g. open connection, peer removal)
|
||||
peerMux *sync.Mutex
|
||||
@@ -73,6 +79,7 @@ func NewEngine(signalClient *signal.Client, mgmClient *mgm.Client, config *Engin
|
||||
signal: signalClient,
|
||||
mgmClient: mgmClient,
|
||||
conns: map[string]*Connection{},
|
||||
peerMap: map[string]struct{}{},
|
||||
peerMux: &sync.Mutex{},
|
||||
syncMsgMux: &sync.Mutex{},
|
||||
config: config,
|
||||
@@ -137,45 +144,52 @@ func (e *Engine) Start() error {
|
||||
|
||||
// initializePeer peer agent attempt to open connection
|
||||
func (e *Engine) initializePeer(peer Peer) {
|
||||
|
||||
e.peerMap[peer.WgPubKey] = struct{}{}
|
||||
|
||||
var backOff = backoff.WithContext(&backoff.ExponentialBackOff{
|
||||
InitialInterval: backoff.DefaultInitialInterval,
|
||||
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||
Multiplier: backoff.DefaultMultiplier,
|
||||
MaxInterval: 5 * time.Second,
|
||||
MaxElapsedTime: time.Duration(0), //never stop
|
||||
MaxElapsedTime: 0, //never stop
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}, e.ctx)
|
||||
|
||||
operation := func() error {
|
||||
|
||||
if e.signal.GetStatus() != signal.StreamConnected {
|
||||
return fmt.Errorf("not opening connection to peer because Signal is unavailable")
|
||||
}
|
||||
|
||||
_, err := e.openPeerConnection(e.wgPort, e.config.WgPrivateKey, peer)
|
||||
e.peerMux.Lock()
|
||||
defer e.peerMux.Unlock()
|
||||
if _, ok := e.conns[peer.WgPubKey]; !ok {
|
||||
log.Debugf("removed connection attempt to peer: %v, not retrying", peer.WgPubKey)
|
||||
if _, ok := e.peerMap[peer.WgPubKey]; !ok {
|
||||
log.Debugf("peer was removed: %v, stop connecting", peer.WgPubKey)
|
||||
return nil
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
log.Warnln(err)
|
||||
log.Debugf("retrying connection because of error: %s", err.Error())
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
err := backoff.Retry(operation, backOff)
|
||||
if err != nil {
|
||||
// should actually never happen
|
||||
panic(err)
|
||||
}
|
||||
go func() {
|
||||
err := backoff.Retry(operation, backOff)
|
||||
if err != nil {
|
||||
// should actually never happen
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (e *Engine) removePeerConnections(peers []string) error {
|
||||
e.peerMux.Lock()
|
||||
defer e.peerMux.Unlock()
|
||||
func (e *Engine) removePeers(peers []string) error {
|
||||
for _, peer := range peers {
|
||||
err := e.removePeerConnection(peer)
|
||||
err := e.removePeer(peer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -188,7 +202,7 @@ func (e *Engine) removeAllPeerConnections() error {
|
||||
e.peerMux.Lock()
|
||||
defer e.peerMux.Unlock()
|
||||
for peer := range e.conns {
|
||||
err := e.removePeerConnection(peer)
|
||||
err := e.removePeer(peer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -196,14 +210,17 @@ func (e *Engine) removeAllPeerConnections() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// removePeerConnection closes existing peer connection and removes peer
|
||||
func (e *Engine) removePeerConnection(peerKey string) error {
|
||||
// removePeer closes an existing peer connection and removes a peer
|
||||
func (e *Engine) removePeer(peerKey string) error {
|
||||
|
||||
delete(e.peerMap, peerKey)
|
||||
|
||||
conn, exists := e.conns[peerKey]
|
||||
if exists && conn != nil {
|
||||
delete(e.conns, peerKey)
|
||||
return conn.Close()
|
||||
}
|
||||
log.Infof("removed connection to peer %s", peerKey)
|
||||
log.Infof("removed peer %s", peerKey)
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -222,7 +239,6 @@ func (e *Engine) GetPeerConnectionStatus(peerKey string) *Status {
|
||||
|
||||
// openPeerConnection opens a new remote peer connection
|
||||
func (e *Engine) openPeerConnection(wgPort int, myKey wgtypes.Key, peer Peer) (*Connection, error) {
|
||||
e.peerMux.Lock()
|
||||
|
||||
remoteKey, _ := wgtypes.ParseKey(peer.WgPubKey)
|
||||
connConfig := &ConnConfig{
|
||||
@@ -234,6 +250,7 @@ func (e *Engine) openPeerConnection(wgPort int, myKey wgtypes.Key, peer Peer) (*
|
||||
RemoteWgKey: remoteKey,
|
||||
StunTurnURLS: append(e.STUNs, e.TURNs...),
|
||||
iFaceBlackList: e.config.IFaceBlackList,
|
||||
PreSharedKey: e.config.PreSharedKey,
|
||||
}
|
||||
|
||||
signalOffer := func(uFrag string, pwd string) error {
|
||||
@@ -247,11 +264,13 @@ func (e *Engine) openPeerConnection(wgPort int, myKey wgtypes.Key, peer Peer) (*
|
||||
return signalCandidate(candidate, myKey, remoteKey, e.signal)
|
||||
}
|
||||
conn := NewConnection(*connConfig, signalCandidate, signalOffer, signalAnswer)
|
||||
e.peerMux.Lock()
|
||||
e.conns[remoteKey.String()] = conn
|
||||
e.peerMux.Unlock()
|
||||
|
||||
// blocks until the connection is open (or timeout)
|
||||
err := conn.Open(PeerConnectionTimeout)
|
||||
timeout := rand.Intn(PeerConnectionTimeoutMax-PeerConnectionTimeoutMin) + PeerConnectionTimeoutMin
|
||||
err := conn.Open(time.Duration(timeout) * time.Second)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -332,6 +351,8 @@ func (e *Engine) receiveManagementEvents() {
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
// happens if management is unavailable for a long time.
|
||||
// We want to cancel the operation of the whole client
|
||||
e.cancel()
|
||||
return
|
||||
}
|
||||
@@ -379,7 +400,9 @@ func (e *Engine) updateTURNs(turns []*mgmProto.ProtectedHostConfig) error {
|
||||
}
|
||||
|
||||
func (e *Engine) updatePeers(remotePeers []*mgmProto.RemotePeerConfig) error {
|
||||
log.Debugf("got peers update from Management Service, updating")
|
||||
e.peerMux.Lock()
|
||||
defer e.peerMux.Unlock()
|
||||
log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(remotePeers))
|
||||
remotePeerMap := make(map[string]struct{})
|
||||
for _, peer := range remotePeers {
|
||||
remotePeerMap[peer.GetWgPubKey()] = struct{}{}
|
||||
@@ -392,7 +415,7 @@ func (e *Engine) updatePeers(remotePeers []*mgmProto.RemotePeerConfig) error {
|
||||
toRemove = append(toRemove, p)
|
||||
}
|
||||
}
|
||||
err := e.removePeerConnections(toRemove)
|
||||
err := e.removePeers(toRemove)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -401,8 +424,8 @@ func (e *Engine) updatePeers(remotePeers []*mgmProto.RemotePeerConfig) error {
|
||||
for _, peer := range remotePeers {
|
||||
peerKey := peer.GetWgPubKey()
|
||||
peerIPs := peer.GetAllowedIps()
|
||||
if _, ok := e.conns[peerKey]; !ok {
|
||||
go e.initializePeer(Peer{
|
||||
if _, ok := e.peerMap[peerKey]; !ok {
|
||||
e.initializePeer(Peer{
|
||||
WgPubKey: peerKey,
|
||||
WgAllowedIps: strings.Join(peerIPs, ","),
|
||||
})
|
||||
@@ -414,68 +437,77 @@ func (e *Engine) updatePeers(remotePeers []*mgmProto.RemotePeerConfig) error {
|
||||
|
||||
// receiveSignalEvents connects to the Signal Service event stream to negotiate connection with remote peers
|
||||
func (e *Engine) receiveSignalEvents() {
|
||||
// connect to a stream of messages coming from the signal server
|
||||
e.signal.Receive(func(msg *sProto.Message) error {
|
||||
|
||||
e.syncMsgMux.Lock()
|
||||
defer e.syncMsgMux.Unlock()
|
||||
go func() {
|
||||
// connect to a stream of messages coming from the signal server
|
||||
err := e.signal.Receive(func(msg *sProto.Message) error {
|
||||
|
||||
conn := e.conns[msg.Key]
|
||||
if conn == nil {
|
||||
return fmt.Errorf("wrongly addressed message %s", msg.Key)
|
||||
}
|
||||
e.syncMsgMux.Lock()
|
||||
defer e.syncMsgMux.Unlock()
|
||||
|
||||
if conn.Config.RemoteWgKey.String() != msg.Key {
|
||||
return fmt.Errorf("unknown peer %s", msg.Key)
|
||||
}
|
||||
|
||||
switch msg.GetBody().Type {
|
||||
case sProto.Body_OFFER:
|
||||
remoteCred, err := signal.UnMarshalCredential(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
conn := e.conns[msg.Key]
|
||||
if conn == nil {
|
||||
return fmt.Errorf("wrongly addressed message %s", msg.Key)
|
||||
}
|
||||
err = conn.OnOffer(IceCredentials{
|
||||
uFrag: remoteCred.UFrag,
|
||||
pwd: remoteCred.Pwd,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
if conn.Config.RemoteWgKey.String() != msg.Key {
|
||||
return fmt.Errorf("unknown peer %s", msg.Key)
|
||||
}
|
||||
|
||||
switch msg.GetBody().Type {
|
||||
case sProto.Body_OFFER:
|
||||
remoteCred, err := signal.UnMarshalCredential(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = conn.OnOffer(IceCredentials{
|
||||
uFrag: remoteCred.UFrag,
|
||||
pwd: remoteCred.Pwd,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
case sProto.Body_ANSWER:
|
||||
remoteCred, err := signal.UnMarshalCredential(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = conn.OnAnswer(IceCredentials{
|
||||
uFrag: remoteCred.UFrag,
|
||||
pwd: remoteCred.Pwd,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case sProto.Body_CANDIDATE:
|
||||
|
||||
candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload)
|
||||
if err != nil {
|
||||
log.Errorf("failed on parsing remote candidate %s -> %s", candidate, err)
|
||||
return err
|
||||
}
|
||||
|
||||
err = conn.OnRemoteCandidate(candidate)
|
||||
if err != nil {
|
||||
log.Errorf("error handling CANDIATE from %s", msg.Key)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
case sProto.Body_ANSWER:
|
||||
remoteCred, err := signal.UnMarshalCredential(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
err = conn.OnAnswer(IceCredentials{
|
||||
uFrag: remoteCred.UFrag,
|
||||
pwd: remoteCred.Pwd,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
case sProto.Body_CANDIDATE:
|
||||
|
||||
candidate, err := ice.UnmarshalCandidate(msg.GetBody().Payload)
|
||||
if err != nil {
|
||||
log.Errorf("failed on parsing remote candidate %s -> %s", candidate, err)
|
||||
return err
|
||||
}
|
||||
|
||||
err = conn.OnRemoteCandidate(candidate)
|
||||
if err != nil {
|
||||
log.Errorf("error handling CANDIATE from %s", msg.Key)
|
||||
return err
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
// happens if signal is unavailable for a long time.
|
||||
// We want to cancel the operation of the whole client
|
||||
e.cancel()
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
e.signal.WaitConnected()
|
||||
e.signal.WaitStreamConnected()
|
||||
}
|
||||
|
||||
@@ -4,27 +4,30 @@ import (
|
||||
ice "github.com/pion/ice/v2"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/wiretrustee/wiretrustee/iface"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"net"
|
||||
)
|
||||
|
||||
// WgProxy an instance of an instance of the Connection Wireguard Proxy
|
||||
type WgProxy struct {
|
||||
iface string
|
||||
remoteKey string
|
||||
allowedIps string
|
||||
wgAddr string
|
||||
close chan struct{}
|
||||
wgConn net.Conn
|
||||
iface string
|
||||
remoteKey string
|
||||
allowedIps string
|
||||
wgAddr string
|
||||
close chan struct{}
|
||||
wgConn net.Conn
|
||||
preSharedKey *wgtypes.Key
|
||||
}
|
||||
|
||||
// NewWgProxy creates a new Connection Wireguard Proxy
|
||||
func NewWgProxy(iface string, remoteKey string, allowedIps string, wgAddr string) *WgProxy {
|
||||
func NewWgProxy(iface string, remoteKey string, allowedIps string, wgAddr string, preSharedKey *wgtypes.Key) *WgProxy {
|
||||
return &WgProxy{
|
||||
iface: iface,
|
||||
remoteKey: remoteKey,
|
||||
allowedIps: allowedIps,
|
||||
wgAddr: wgAddr,
|
||||
close: make(chan struct{}),
|
||||
iface: iface,
|
||||
remoteKey: remoteKey,
|
||||
allowedIps: allowedIps,
|
||||
wgAddr: wgAddr,
|
||||
close: make(chan struct{}),
|
||||
preSharedKey: preSharedKey,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -48,7 +51,7 @@ func (p *WgProxy) Close() error {
|
||||
|
||||
// StartLocal configure the interface with a peer using a direct IP:Port endpoint to the remote host
|
||||
func (p *WgProxy) StartLocal(host string) error {
|
||||
err := iface.UpdatePeer(p.iface, p.remoteKey, p.allowedIps, DefaultWgKeepAlive, host)
|
||||
err := iface.UpdatePeer(p.iface, p.remoteKey, p.allowedIps, DefaultWgKeepAlive, host, p.preSharedKey)
|
||||
if err != nil {
|
||||
log.Errorf("error while configuring Wireguard peer [%s] %s", p.remoteKey, err.Error())
|
||||
return err
|
||||
@@ -67,7 +70,7 @@ func (p *WgProxy) Start(remoteConn *ice.Conn) error {
|
||||
p.wgConn = wgConn
|
||||
// add local proxy connection as a Wireguard peer
|
||||
err = iface.UpdatePeer(p.iface, p.remoteKey, p.allowedIps, DefaultWgKeepAlive,
|
||||
wgConn.LocalAddr().String())
|
||||
wgConn.LocalAddr().String(), p.preSharedKey)
|
||||
if err != nil {
|
||||
log.Errorf("error while configuring Wireguard peer [%s] %s", p.remoteKey, err.Error())
|
||||
return err
|
||||
@@ -92,13 +95,11 @@ func (p *WgProxy) proxyToRemotePeer(remoteConn *ice.Conn) {
|
||||
default:
|
||||
n, err := p.wgConn.Read(buf)
|
||||
if err != nil {
|
||||
//log.Warnln("failed reading from peer: ", err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
_, err = remoteConn.Write(buf[:n])
|
||||
if err != nil {
|
||||
//log.Warnln("failed writing to remote peer: ", err.Error())
|
||||
continue
|
||||
}
|
||||
}
|
||||
@@ -118,13 +119,11 @@ func (p *WgProxy) proxyToLocalWireguard(remoteConn *ice.Conn) {
|
||||
default:
|
||||
n, err := remoteConn.Read(buf)
|
||||
if err != nil {
|
||||
//log.Errorf("failed reading from remote connection %s", err)
|
||||
continue
|
||||
}
|
||||
|
||||
_, err = p.wgConn.Write(buf[:n])
|
||||
if err != nil {
|
||||
//log.Errorf("failed writing to local Wireguard instance %s", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
67
go.mod
67
go.mod
@@ -1,27 +1,62 @@
|
||||
module github.com/wiretrustee/wiretrustee
|
||||
|
||||
go 1.16
|
||||
go 1.17
|
||||
|
||||
require (
|
||||
github.com/cenkalti/backoff/v4 v4.1.0
|
||||
github.com/cenkalti/backoff/v4 v4.1.2
|
||||
github.com/golang-jwt/jwt v3.2.2+incompatible
|
||||
github.com/golang/protobuf v1.5.2
|
||||
github.com/google/uuid v1.2.0
|
||||
github.com/google/uuid v1.3.0
|
||||
github.com/gorilla/mux v1.8.0
|
||||
github.com/kardianos/service v1.2.1-0.20210728001519-a323c3813bc7
|
||||
github.com/onsi/ginkgo v1.16.4
|
||||
github.com/onsi/gomega v1.13.0
|
||||
github.com/pion/ice/v2 v2.1.7
|
||||
github.com/kardianos/service v1.2.1-0.20210728001519-a323c3813bc7 //keep this version otherwise wiretrustee up command breaks
|
||||
github.com/onsi/ginkgo v1.16.5
|
||||
github.com/onsi/gomega v1.17.0
|
||||
github.com/pion/ice/v2 v2.1.17
|
||||
github.com/rs/cors v1.8.0
|
||||
github.com/sirupsen/logrus v1.7.0
|
||||
github.com/spf13/cobra v1.1.3
|
||||
github.com/sirupsen/logrus v1.8.1
|
||||
github.com/spf13/cobra v1.3.0
|
||||
github.com/spf13/pflag v1.0.5
|
||||
github.com/vishvananda/netlink v1.1.0
|
||||
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97
|
||||
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c
|
||||
golang.zx2c4.com/wireguard v0.0.0-20210805125648-3957e9b9dd19
|
||||
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20210803171230-4253848d036c
|
||||
golang.zx2c4.com/wireguard/windows v0.4.5
|
||||
google.golang.org/grpc v1.32.0
|
||||
google.golang.org/protobuf v1.26.0
|
||||
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3
|
||||
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e
|
||||
golang.zx2c4.com/wireguard v0.0.0-20211209221555-9c9e7e272434
|
||||
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20211215182854-7a385b3431de
|
||||
golang.zx2c4.com/wireguard/windows v0.5.1
|
||||
google.golang.org/grpc v1.43.0
|
||||
google.golang.org/protobuf v1.27.1
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0
|
||||
)
|
||||
|
||||
require github.com/rs/xid v1.3.0
|
||||
|
||||
require (
|
||||
github.com/BurntSushi/toml v0.4.1 // indirect
|
||||
github.com/fsnotify/fsnotify v1.5.1 // indirect
|
||||
github.com/google/go-cmp v0.5.6 // indirect
|
||||
github.com/inconshreveable/mousetrap v1.0.0 // indirect
|
||||
github.com/josharian/native v0.0.0-20200817173448-b6b71def0850 // indirect
|
||||
github.com/mdlayher/genetlink v1.1.0 // indirect
|
||||
github.com/mdlayher/netlink v1.4.2 // indirect
|
||||
github.com/mdlayher/socket v0.0.0-20211102153432-57e3fa563ecb // indirect
|
||||
github.com/nxadm/tail v1.4.8 // indirect
|
||||
github.com/pion/dtls/v2 v2.0.12 // indirect
|
||||
github.com/pion/logging v0.2.2 // indirect
|
||||
github.com/pion/mdns v0.0.5 // indirect
|
||||
github.com/pion/randutil v0.1.0 // indirect
|
||||
github.com/pion/stun v0.3.5 // indirect
|
||||
github.com/pion/transport v0.12.3 // indirect
|
||||
github.com/pion/turn/v2 v2.0.5 // indirect
|
||||
github.com/pion/udp v0.1.1 // indirect
|
||||
github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df // indirect
|
||||
golang.org/x/mod v0.5.1 // indirect
|
||||
golang.org/x/net v0.0.0-20211208012354-db4efeb81f4b // indirect
|
||||
golang.org/x/text v0.3.8-0.20211105212822-18b340fc7af2 // indirect
|
||||
golang.org/x/tools v0.1.8 // indirect
|
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
|
||||
golang.zx2c4.com/go118/netip v0.0.0-20211111135330-a4a02eeacf9d // indirect
|
||||
golang.zx2c4.com/wintun v0.0.0-20211104114900-415007cec224 // indirect
|
||||
google.golang.org/genproto v0.0.0-20211208223120-3a66f561d7aa // indirect
|
||||
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect
|
||||
gopkg.in/yaml.v2 v2.4.0 // indirect
|
||||
honnef.co/go/tools v0.2.2 // indirect
|
||||
)
|
||||
|
||||
@@ -146,7 +146,7 @@ func GetListenPort(iface string) (*int, error) {
|
||||
|
||||
// UpdatePeer updates existing Wireguard Peer or creates a new one if doesn't exist
|
||||
// Endpoint is optional
|
||||
func UpdatePeer(iface string, peerKey string, allowedIps string, keepAlive time.Duration, endpoint string) error {
|
||||
func UpdatePeer(iface string, peerKey string, allowedIps string, keepAlive time.Duration, endpoint string, preSharedKey *wgtypes.Key) error {
|
||||
|
||||
log.Debugf("updating interface %s peer %s: endpoint %s ", iface, peerKey, endpoint)
|
||||
|
||||
@@ -165,6 +165,7 @@ func UpdatePeer(iface string, peerKey string, allowedIps string, keepAlive time.
|
||||
ReplaceAllowedIPs: true,
|
||||
AllowedIPs: []net.IPNet{*ipNet},
|
||||
PersistentKeepaliveInterval: &keepAlive,
|
||||
PresharedKey: preSharedKey,
|
||||
}
|
||||
|
||||
config := wgtypes.Config{
|
||||
|
||||
@@ -111,7 +111,7 @@ func Test_UpdatePeer(t *testing.T) {
|
||||
keepAlive := 15 * time.Second
|
||||
allowedIP := "10.99.99.2/32"
|
||||
endpoint := "127.0.0.1:9900"
|
||||
err = UpdatePeer(ifaceName, peerPubKey, allowedIP, keepAlive, endpoint)
|
||||
err = UpdatePeer(ifaceName, peerPubKey, allowedIP, keepAlive, endpoint, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -163,7 +163,7 @@ func Test_UpdatePeerEndpoint(t *testing.T) {
|
||||
keepAlive := 15 * time.Second
|
||||
allowedIP := "10.99.99.2/32"
|
||||
endpoint := "127.0.0.1:9900"
|
||||
err = UpdatePeer(ifaceName, peerPubKey, allowedIP, keepAlive, endpoint)
|
||||
err = UpdatePeer(ifaceName, peerPubKey, allowedIP, keepAlive, endpoint, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -204,7 +204,7 @@ func Test_RemovePeer(t *testing.T) {
|
||||
keepAlive := 15 * time.Second
|
||||
allowedIP := "10.99.99.2/32"
|
||||
endpoint := "127.0.0.1:9900"
|
||||
err = UpdatePeer(ifaceName, peerPubKey, allowedIP, keepAlive, endpoint)
|
||||
err = UpdatePeer(ifaceName, peerPubKey, allowedIP, keepAlive, endpoint, nil)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@ package client
|
||||
import (
|
||||
"context"
|
||||
"crypto/tls"
|
||||
"fmt"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/wiretrustee/wiretrustee/client/system"
|
||||
@@ -10,7 +11,9 @@ import (
|
||||
"github.com/wiretrustee/wiretrustee/management/proto"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"io"
|
||||
"time"
|
||||
@@ -26,7 +29,7 @@ type Client struct {
|
||||
// NewClient creates a new client to Management service
|
||||
func NewClient(ctx context.Context, addr string, ourPrivateKey wgtypes.Key, tlsEnabled bool) (*Client, error) {
|
||||
|
||||
transportOption := grpc.WithInsecure()
|
||||
transportOption := grpc.WithTransportCredentials(insecure.NewCredentials())
|
||||
|
||||
if tlsEnabled {
|
||||
transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
|
||||
@@ -70,13 +73,19 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
|
||||
InitialInterval: 800 * time.Millisecond,
|
||||
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||
Multiplier: backoff.DefaultMultiplier,
|
||||
MaxInterval: 30 * time.Second,
|
||||
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
|
||||
MaxInterval: 10 * time.Second,
|
||||
MaxElapsedTime: 12 * time.Hour, //stop after 12 hours of trying, the error will be propagated to the general retry of the client
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}, ctx)
|
||||
}
|
||||
|
||||
// ready indicates whether the client is okay and ready to be used
|
||||
// for now it just checks whether gRPC connection to the service is ready
|
||||
func (c *Client) ready() bool {
|
||||
return c.conn.GetState() == connectivity.Ready || c.conn.GetState() == connectivity.Idle
|
||||
}
|
||||
|
||||
// Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages
|
||||
// Blocking request. The result will be sent via msgHandler callback function
|
||||
func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
|
||||
@@ -85,6 +94,12 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
|
||||
|
||||
operation := func() error {
|
||||
|
||||
log.Debugf("management connection state %v", c.conn.GetState())
|
||||
|
||||
if !c.ready() {
|
||||
return fmt.Errorf("no connection to management")
|
||||
}
|
||||
|
||||
// todo we already have it since we did the Login, maybe cache it locally?
|
||||
serverPubKey, err := c.GetServerPublicKey()
|
||||
if err != nil {
|
||||
@@ -98,17 +113,15 @@ func (c *Client) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("connected to the Management Service Stream")
|
||||
log.Infof("connected to the Management Service stream")
|
||||
|
||||
// blocking until error
|
||||
err = c.receiveEvents(stream, *serverPubKey, msgHandler)
|
||||
if err != nil {
|
||||
/*if errStatus, ok := status.FromError(err); ok && errStatus.Code() == codes.PermissionDenied {
|
||||
//todo handle differently??
|
||||
}*/
|
||||
backOff.Reset()
|
||||
return err
|
||||
}
|
||||
backOff.Reset()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -141,7 +154,7 @@ func (c *Client) receiveEvents(stream proto.ManagementService_SyncClient, server
|
||||
for {
|
||||
update, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
log.Errorf("managment stream was closed: %s", err)
|
||||
log.Errorf("Management stream has been closed by server: %s", err)
|
||||
return err
|
||||
}
|
||||
if err != nil {
|
||||
@@ -167,6 +180,10 @@ func (c *Client) receiveEvents(stream proto.ManagementService_SyncClient, server
|
||||
|
||||
// GetServerPublicKey returns server Wireguard public key (used later for encrypting messages sent to the server)
|
||||
func (c *Client) GetServerPublicKey() (*wgtypes.Key, error) {
|
||||
if !c.ready() {
|
||||
return nil, fmt.Errorf("no connection to management")
|
||||
}
|
||||
|
||||
mgmCtx, cancel := context.WithTimeout(c.ctx, 5*time.Second) //todo make a general setting
|
||||
defer cancel()
|
||||
resp, err := c.realClient.GetServerKey(mgmCtx, &proto.Empty{})
|
||||
@@ -183,6 +200,9 @@ func (c *Client) GetServerPublicKey() (*wgtypes.Key, error) {
|
||||
}
|
||||
|
||||
func (c *Client) login(serverKey wgtypes.Key, req *proto.LoginRequest) (*proto.LoginResponse, error) {
|
||||
if !c.ready() {
|
||||
return nil, fmt.Errorf("no connection to management")
|
||||
}
|
||||
loginReq, err := encryption.EncryptMessage(serverKey, c.key, req)
|
||||
if err != nil {
|
||||
log.Errorf("failed to encrypt message: %s", err)
|
||||
|
||||
@@ -2,6 +2,7 @@ package server
|
||||
|
||||
import (
|
||||
"github.com/google/uuid"
|
||||
"github.com/rs/xid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/wiretrustee/wiretrustee/util"
|
||||
"google.golang.org/grpc/codes"
|
||||
@@ -19,10 +20,39 @@ type AccountManager struct {
|
||||
|
||||
// Account represents a unique account of the system
|
||||
type Account struct {
|
||||
Id string
|
||||
Id string
|
||||
// User.Id it was created by
|
||||
CreatedBy string
|
||||
SetupKeys map[string]*SetupKey
|
||||
Network *Network
|
||||
Peers map[string]*Peer
|
||||
Users map[string]*User
|
||||
}
|
||||
|
||||
func (a *Account) Copy() *Account {
|
||||
peers := map[string]*Peer{}
|
||||
for id, peer := range a.Peers {
|
||||
peers[id] = peer.Copy()
|
||||
}
|
||||
|
||||
users := map[string]*User{}
|
||||
for id, user := range a.Users {
|
||||
users[id] = user.Copy()
|
||||
}
|
||||
|
||||
setupKeys := map[string]*SetupKey{}
|
||||
for id, key := range a.SetupKeys {
|
||||
setupKeys[id] = key.Copy()
|
||||
}
|
||||
|
||||
return &Account{
|
||||
Id: a.Id,
|
||||
CreatedBy: a.CreatedBy,
|
||||
SetupKeys: setupKeys,
|
||||
Network: a.Network.Copy(),
|
||||
Peers: peers,
|
||||
Users: users,
|
||||
}
|
||||
}
|
||||
|
||||
// NewManager creates a new AccountManager with a provided Store
|
||||
@@ -125,29 +155,6 @@ func (am *AccountManager) GetAccount(accountId string) (*Account, error) {
|
||||
return account, nil
|
||||
}
|
||||
|
||||
// GetOrCreateAccount returns an existing account or creates a new one if doesn't exist
|
||||
func (am *AccountManager) GetOrCreateAccount(accountId string) (*Account, error) {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
_, err := am.Store.GetAccount(accountId)
|
||||
if err != nil {
|
||||
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
|
||||
return am.createAccount(accountId)
|
||||
} else {
|
||||
// other error
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
account, err := am.Store.GetAccount(accountId)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed retrieving account")
|
||||
}
|
||||
|
||||
return account, nil
|
||||
}
|
||||
|
||||
//AccountExists checks whether account exists (returns true) or not (returns false)
|
||||
func (am *AccountManager) AccountExists(accountId string) (*bool, error) {
|
||||
am.mux.Lock()
|
||||
@@ -168,18 +175,18 @@ func (am *AccountManager) AccountExists(accountId string) (*bool, error) {
|
||||
return &res, nil
|
||||
}
|
||||
|
||||
// AddAccount generates a new Account with a provided accountId and saves to the Store
|
||||
func (am *AccountManager) AddAccount(accountId string) (*Account, error) {
|
||||
// AddAccount generates a new Account with a provided accountId and userId, saves to the Store
|
||||
func (am *AccountManager) AddAccount(accountId string, userId string) (*Account, error) {
|
||||
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
return am.createAccount(accountId)
|
||||
return am.createAccount(accountId, userId)
|
||||
|
||||
}
|
||||
|
||||
func (am *AccountManager) createAccount(accountId string) (*Account, error) {
|
||||
account, _ := newAccountWithId(accountId)
|
||||
func (am *AccountManager) createAccount(accountId string, userId string) (*Account, error) {
|
||||
account, _ := newAccountWithId(accountId, userId)
|
||||
|
||||
err := am.Store.SaveAccount(account)
|
||||
if err != nil {
|
||||
@@ -190,7 +197,7 @@ func (am *AccountManager) createAccount(accountId string) (*Account, error) {
|
||||
}
|
||||
|
||||
// newAccountWithId creates a new Account with a default SetupKey (doesn't store in a Store) and provided id
|
||||
func newAccountWithId(accountId string) (*Account, *SetupKey) {
|
||||
func newAccountWithId(accountId string, userId string) (*Account, *SetupKey) {
|
||||
|
||||
log.Debugf("creating new account")
|
||||
|
||||
@@ -204,16 +211,17 @@ func newAccountWithId(accountId string) (*Account, *SetupKey) {
|
||||
Net: net.IPNet{IP: net.ParseIP("100.64.0.0"), Mask: net.IPMask{255, 192, 0, 0}},
|
||||
Dns: ""}
|
||||
peers := make(map[string]*Peer)
|
||||
users := make(map[string]*User)
|
||||
|
||||
log.Debugf("created new account %s with setup key %s", accountId, defaultKey.Key)
|
||||
|
||||
return &Account{Id: accountId, SetupKeys: setupKeys, Network: network, Peers: peers}, defaultKey
|
||||
return &Account{Id: accountId, SetupKeys: setupKeys, Network: network, Peers: peers, Users: users, CreatedBy: userId}, defaultKey
|
||||
}
|
||||
|
||||
// newAccount creates a new Account with a default SetupKey (doesn't store in a Store)
|
||||
func newAccount() (*Account, *SetupKey) {
|
||||
accountId := uuid.New().String()
|
||||
return newAccountWithId(accountId)
|
||||
// newAccount creates a new Account with a default SetupKey and a provided User.Id of a user who issued account creation (doesn't store in a Store)
|
||||
func newAccount(userId string) (*Account, *SetupKey) {
|
||||
accountId := xid.New().String()
|
||||
return newAccountWithId(accountId, userId)
|
||||
}
|
||||
|
||||
func getAccountSetupKeyById(acc *Account, keyId string) *SetupKey {
|
||||
|
||||
@@ -2,12 +2,36 @@ package server
|
||||
|
||||
import (
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
"net"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestAccountManager_GetOrCreateAccountByUser(t *testing.T) {
|
||||
manager, err := createManager(t)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
|
||||
userId := "test_user"
|
||||
account, err := manager.GetOrCreateAccountByUser(userId)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if account == nil {
|
||||
t.Fatalf("expected to create an account for a user %s", userId)
|
||||
}
|
||||
|
||||
account, err = manager.GetAccountByUser(userId)
|
||||
if err != nil {
|
||||
t.Errorf("expected to get existing account after creation, no account was found for a user %s", userId)
|
||||
}
|
||||
|
||||
if account != nil && account.Users[userId] == nil {
|
||||
t.Fatalf("expected to create an account for a user %s but no user was found after creation udner the account %s", userId, account.Id)
|
||||
}
|
||||
}
|
||||
|
||||
func TestAccountManager_AddAccount(t *testing.T) {
|
||||
manager, err := createManager(t)
|
||||
if err != nil {
|
||||
@@ -16,6 +40,7 @@ func TestAccountManager_AddAccount(t *testing.T) {
|
||||
}
|
||||
|
||||
expectedId := "test_account"
|
||||
userId := "account_creator"
|
||||
expectedPeersSize := 0
|
||||
expectedSetupKeysSize := 2
|
||||
expectedNetwork := net.IPNet{
|
||||
@@ -23,7 +48,7 @@ func TestAccountManager_AddAccount(t *testing.T) {
|
||||
Mask: net.IPMask{255, 192, 0, 0},
|
||||
}
|
||||
|
||||
account, err := manager.AddAccount(expectedId)
|
||||
account, err := manager.AddAccount(expectedId, userId)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -45,46 +70,6 @@ func TestAccountManager_AddAccount(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestAccountManager_GetOrCreateAccount(t *testing.T) {
|
||||
manager, err := createManager(t)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
|
||||
expectedId := "test_account"
|
||||
|
||||
//make sure account doesn't exist
|
||||
account, err := manager.GetAccount(expectedId)
|
||||
if err != nil {
|
||||
errStatus, ok := status.FromError(err)
|
||||
if !(ok && errStatus.Code() == codes.NotFound) {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
if account != nil {
|
||||
t.Fatal("expecting empty account")
|
||||
}
|
||||
|
||||
account, err = manager.GetOrCreateAccount(expectedId)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if account.Id != expectedId {
|
||||
t.Fatalf("expected to create an account, got wrong account")
|
||||
}
|
||||
|
||||
account, err = manager.GetOrCreateAccount(expectedId)
|
||||
if err != nil {
|
||||
t.Errorf("expected to get existing account after creation, failed")
|
||||
}
|
||||
|
||||
if account.Id != expectedId {
|
||||
t.Fatalf("expected to create an account, got wrong account")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAccountManager_AccountExists(t *testing.T) {
|
||||
manager, err := createManager(t)
|
||||
if err != nil {
|
||||
@@ -93,7 +78,8 @@ func TestAccountManager_AccountExists(t *testing.T) {
|
||||
}
|
||||
|
||||
expectedId := "test_account"
|
||||
_, err = manager.AddAccount(expectedId)
|
||||
userId := "account_creator"
|
||||
_, err = manager.AddAccount(expectedId, userId)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -117,7 +103,8 @@ func TestAccountManager_GetAccount(t *testing.T) {
|
||||
}
|
||||
|
||||
expectedId := "test_account"
|
||||
account, err := manager.AddAccount(expectedId)
|
||||
userId := "account_creator"
|
||||
account, err := manager.AddAccount(expectedId, userId)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -154,7 +141,7 @@ func TestAccountManager_AddPeer(t *testing.T) {
|
||||
return
|
||||
}
|
||||
|
||||
account, err := manager.AddAccount("test_account")
|
||||
account, err := manager.AddAccount("test_account", "account_creator")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ type FileStore struct {
|
||||
Accounts map[string]*Account
|
||||
SetupKeyId2AccountId map[string]string `json:"-"`
|
||||
PeerKeyId2AccountId map[string]string `json:"-"`
|
||||
UserId2AccountId map[string]string `json:"-"`
|
||||
|
||||
// mutex to synchronise Store read/write operations
|
||||
mux sync.Mutex `json:"-"`
|
||||
@@ -45,6 +46,7 @@ func restore(file string) (*FileStore, error) {
|
||||
mux: sync.Mutex{},
|
||||
SetupKeyId2AccountId: make(map[string]string),
|
||||
PeerKeyId2AccountId: make(map[string]string),
|
||||
UserId2AccountId: make(map[string]string),
|
||||
storeFile: file,
|
||||
}
|
||||
|
||||
@@ -65,6 +67,7 @@ func restore(file string) (*FileStore, error) {
|
||||
store.storeFile = file
|
||||
store.SetupKeyId2AccountId = make(map[string]string)
|
||||
store.PeerKeyId2AccountId = make(map[string]string)
|
||||
store.UserId2AccountId = make(map[string]string)
|
||||
for accountId, account := range store.Accounts {
|
||||
for setupKeyId := range account.SetupKeys {
|
||||
store.SetupKeyId2AccountId[strings.ToUpper(setupKeyId)] = accountId
|
||||
@@ -72,6 +75,9 @@ func restore(file string) (*FileStore, error) {
|
||||
for _, peer := range account.Peers {
|
||||
store.PeerKeyId2AccountId[peer.Key] = accountId
|
||||
}
|
||||
for _, user := range account.Users {
|
||||
store.UserId2AccountId[user.Id] = accountId
|
||||
}
|
||||
}
|
||||
|
||||
return store, nil
|
||||
@@ -168,6 +174,10 @@ func (s *FileStore) SaveAccount(account *Account) error {
|
||||
s.PeerKeyId2AccountId[peer.Key] = account.Id
|
||||
}
|
||||
|
||||
for _, user := range account.Users {
|
||||
s.UserId2AccountId[user.Id] = account.Id
|
||||
}
|
||||
|
||||
err := s.persist(s.storeFile)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -217,6 +227,18 @@ func (s *FileStore) GetAccount(accountId string) (*Account, error) {
|
||||
return account, nil
|
||||
}
|
||||
|
||||
func (s *FileStore) GetUserAccount(userId string) (*Account, error) {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
|
||||
accountId, accountIdFound := s.UserId2AccountId[userId]
|
||||
if !accountIdFound {
|
||||
return nil, status.Errorf(codes.NotFound, "account not found")
|
||||
}
|
||||
|
||||
return s.GetAccount(accountId)
|
||||
}
|
||||
|
||||
func (s *FileStore) GetPeerAccount(peerKey string) (*Account, error) {
|
||||
s.mux.Lock()
|
||||
defer s.mux.Unlock()
|
||||
|
||||
171
management/server/file_store_test.go
Normal file
171
management/server/file_store_test.go
Normal file
@@ -0,0 +1,171 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"github.com/wiretrustee/wiretrustee/util"
|
||||
"net"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestNewStore(t *testing.T) {
|
||||
store := newStore(t)
|
||||
|
||||
if store.Accounts == nil || len(store.Accounts) != 0 {
|
||||
t.Errorf("expected to create a new empty Accounts map when creating a new FileStore")
|
||||
}
|
||||
|
||||
if store.SetupKeyId2AccountId == nil || len(store.SetupKeyId2AccountId) != 0 {
|
||||
t.Errorf("expected to create a new empty SetupKeyId2AccountId map when creating a new FileStore")
|
||||
}
|
||||
|
||||
if store.PeerKeyId2AccountId == nil || len(store.PeerKeyId2AccountId) != 0 {
|
||||
t.Errorf("expected to create a new empty PeerKeyId2AccountId map when creating a new FileStore")
|
||||
}
|
||||
|
||||
if store.UserId2AccountId == nil || len(store.UserId2AccountId) != 0 {
|
||||
t.Errorf("expected to create a new empty UserId2AccountId map when creating a new FileStore")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestSaveAccount(t *testing.T) {
|
||||
store := newStore(t)
|
||||
|
||||
account, _ := newAccount("testuser")
|
||||
account.Users["testuser"] = NewAdminUser("testuser")
|
||||
setupKey := GenerateDefaultSetupKey()
|
||||
account.SetupKeys[setupKey.Key] = setupKey
|
||||
account.Peers["testpeer"] = &Peer{
|
||||
Key: "peerkey",
|
||||
SetupKey: "peerkeysetupkey",
|
||||
IP: net.IP{127, 0, 0, 1},
|
||||
Meta: PeerSystemMeta{},
|
||||
Name: "peer name",
|
||||
Status: &PeerStatus{Connected: true, LastSeen: time.Now()},
|
||||
}
|
||||
|
||||
// SaveAccount should trigger persist
|
||||
err := store.SaveAccount(account)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if store.Accounts[account.Id] == nil {
|
||||
t.Errorf("expecting Account to be stored after SaveAccount()")
|
||||
}
|
||||
|
||||
if store.PeerKeyId2AccountId["peerkey"] == "" {
|
||||
t.Errorf("expecting PeerKeyId2AccountId index updated after SaveAccount()")
|
||||
}
|
||||
|
||||
if store.UserId2AccountId["testuser"] == "" {
|
||||
t.Errorf("expecting UserId2AccountId index updated after SaveAccount()")
|
||||
}
|
||||
|
||||
if store.SetupKeyId2AccountId[setupKey.Key] == "" {
|
||||
t.Errorf("expecting SetupKeyId2AccountId index updated after SaveAccount()")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestStore(t *testing.T) {
|
||||
store := newStore(t)
|
||||
|
||||
account, _ := newAccount("testuser")
|
||||
account.Users["testuser"] = NewAdminUser("testuser")
|
||||
account.Peers["testpeer"] = &Peer{
|
||||
Key: "peerkey",
|
||||
SetupKey: "peerkeysetupkey",
|
||||
IP: net.IP{127, 0, 0, 1},
|
||||
Meta: PeerSystemMeta{},
|
||||
Name: "peer name",
|
||||
Status: &PeerStatus{Connected: true, LastSeen: time.Now()},
|
||||
}
|
||||
|
||||
// SaveAccount should trigger persist
|
||||
err := store.SaveAccount(account)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
restored, err := NewStore(store.storeFile)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
restoredAccount := restored.Accounts[account.Id]
|
||||
if restoredAccount == nil {
|
||||
t.Errorf("failed to restore a FileStore file - missing Account %s", account.Id)
|
||||
}
|
||||
|
||||
if restoredAccount != nil && restoredAccount.Peers["testpeer"] == nil {
|
||||
t.Errorf("failed to restore a FileStore file - missing Peer testpeer")
|
||||
}
|
||||
|
||||
if restoredAccount != nil && restoredAccount.CreatedBy != "testuser" {
|
||||
t.Errorf("failed to restore a FileStore file - missing Account CreatedBy")
|
||||
}
|
||||
|
||||
if restoredAccount != nil && restoredAccount.Users["testuser"] == nil {
|
||||
t.Errorf("failed to restore a FileStore file - missing User testuser")
|
||||
}
|
||||
|
||||
if restoredAccount != nil && restoredAccount.Network == nil {
|
||||
t.Errorf("failed to restore a FileStore file - missing Network")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func TestRestore(t *testing.T) {
|
||||
storeDir := t.TempDir()
|
||||
|
||||
err := util.CopyFileContents("testdata/store.json", filepath.Join(storeDir, "store.json"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
store, err := NewStore(storeDir)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
account := store.Accounts["bf1c8084-ba50-4ce7-9439-34653001fc3b"]
|
||||
if account == nil {
|
||||
t.Errorf("failed to restore a FileStore file - missing account bf1c8084-ba50-4ce7-9439-34653001fc3b")
|
||||
}
|
||||
|
||||
if account != nil && account.Users["edafee4e-63fb-11ec-90d6-0242ac120003"] == nil {
|
||||
t.Errorf("failed to restore a FileStore file - missing Account User edafee4e-63fb-11ec-90d6-0242ac120003")
|
||||
}
|
||||
|
||||
if account != nil && account.Users["f4f6d672-63fb-11ec-90d6-0242ac120003"] == nil {
|
||||
t.Errorf("failed to restore a FileStore file - missing Account User f4f6d672-63fb-11ec-90d6-0242ac120003")
|
||||
}
|
||||
|
||||
if account != nil && account.Network == nil {
|
||||
t.Errorf("failed to restore a FileStore file - missing Account Network")
|
||||
}
|
||||
|
||||
if account != nil && account.SetupKeys["A2C8E62B-38F5-4553-B31E-DD66C696CEBB"] == nil {
|
||||
t.Errorf("failed to restore a FileStore file - missing Account SetupKey A2C8E62B-38F5-4553-B31E-DD66C696CEBB")
|
||||
}
|
||||
|
||||
if len(store.UserId2AccountId) != 2 {
|
||||
t.Errorf("failed to restore a FileStore wrong UserId2AccountId mapping")
|
||||
}
|
||||
|
||||
if len(store.SetupKeyId2AccountId) != 1 {
|
||||
t.Errorf("failed to restore a FileStore wrong SetupKeyId2AccountId mapping")
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func newStore(t *testing.T) *FileStore {
|
||||
store, err := NewStore(t.TempDir())
|
||||
if err != nil {
|
||||
t.Errorf("failed creating a new store")
|
||||
}
|
||||
|
||||
return store
|
||||
}
|
||||
@@ -3,6 +3,7 @@ package server
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/ptypes/timestamp"
|
||||
@@ -95,6 +96,8 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
|
||||
if s.config.TURNConfig.TimeBasedCredentials {
|
||||
s.turnCredentialsManager.SetupRefresh(peerKey.String())
|
||||
}
|
||||
|
||||
s.schedulePeerUpdates(srv.Context(), peerKey.String(), peer)
|
||||
// keep a connection to the peer and send updates when available
|
||||
for {
|
||||
select {
|
||||
@@ -135,6 +138,39 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Server) schedulePeerUpdates(context context.Context, peerKey string, peer *Peer) {
|
||||
//todo: introduce the following logic:
|
||||
// add a ModificationId to the Account entity (ModificationId increments by 1 if there was a change to the account network map)
|
||||
// periodically fetch changes of the Account providing ModificationId
|
||||
// if ModificationId is < then the one of the Account, then send changes
|
||||
// Client has to handle modification id as well
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-context.Done():
|
||||
log.Debugf("peer update cancelled %s", peerKey)
|
||||
return
|
||||
default:
|
||||
maxSleep := 6
|
||||
minSleep := 3
|
||||
sleep := rand.Intn(maxSleep-minSleep) + minSleep
|
||||
time.Sleep(time.Duration(sleep) * time.Second)
|
||||
|
||||
peers, err := s.accountManager.GetPeersForAPeer(peerKey)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
update := toSyncResponse(s.config, peer, peers, nil)
|
||||
err = s.peersUpdateManager.SendUpdate(peerKey, &UpdateMessage{Update: update})
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (s *Server) registerPeer(peerKey wgtypes.Key, req *proto.LoginRequest) (*Peer, error) {
|
||||
|
||||
meta := req.GetMeta()
|
||||
@@ -158,12 +194,13 @@ func (s *Server) registerPeer(peerKey wgtypes.Key, req *proto.LoginRequest) (*Pe
|
||||
return nil, status.Errorf(codes.NotFound, "provided setup key doesn't exists")
|
||||
}
|
||||
|
||||
peers, err := s.accountManager.GetPeersForAPeer(peer.Key)
|
||||
// notify other peers of our registration - uncomment if you want to bring back peer update logic
|
||||
/*peers, err := s.accountManager.GetPeersForAPeer(peer.Key)
|
||||
if err != nil {
|
||||
return nil, status.Error(codes.Internal, "internal server error")
|
||||
}
|
||||
|
||||
// notify other peers of our registration
|
||||
|
||||
for _, remotePeer := range peers {
|
||||
// exclude notified peer and add ourselves
|
||||
peersToSend := []*Peer{peer}
|
||||
@@ -178,7 +215,7 @@ func (s *Server) registerPeer(peerKey wgtypes.Key, req *proto.LoginRequest) (*Pe
|
||||
// todo rethink if we should keep this return
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
}*/
|
||||
|
||||
return peer, nil
|
||||
}
|
||||
|
||||
@@ -62,7 +62,13 @@ func (h *Peers) deletePeer(accountId string, peer *server.Peer, w http.ResponseW
|
||||
}
|
||||
|
||||
func (h *Peers) HandlePeer(w http.ResponseWriter, r *http.Request) {
|
||||
accountId := extractAccountIdFromRequestContext(r)
|
||||
userId := extractUserIdFromRequestContext(r)
|
||||
account, err := h.accountManager.GetOrCreateAccountByUser(userId)
|
||||
if err != nil {
|
||||
log.Errorf("failed getting account of a user %s: %v", userId, err)
|
||||
http.Redirect(w, r, "/", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
vars := mux.Vars(r)
|
||||
peerId := vars["id"] //effectively peer IP address
|
||||
if len(peerId) == 0 {
|
||||
@@ -70,7 +76,7 @@ func (h *Peers) HandlePeer(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
peer, err := h.accountManager.GetPeerByIP(accountId, peerId)
|
||||
peer, err := h.accountManager.GetPeerByIP(account.Id, peerId)
|
||||
if err != nil {
|
||||
http.Error(w, "peer not found", http.StatusNotFound)
|
||||
return
|
||||
@@ -78,10 +84,10 @@ func (h *Peers) HandlePeer(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
switch r.Method {
|
||||
case http.MethodDelete:
|
||||
h.deletePeer(accountId, peer, w, r)
|
||||
h.deletePeer(account.Id, peer, w, r)
|
||||
return
|
||||
case http.MethodPut:
|
||||
h.updatePeer(accountId, peer, w, r)
|
||||
h.updatePeer(account.Id, peer, w, r)
|
||||
return
|
||||
case http.MethodGet:
|
||||
writeJSONObject(w, toPeerResponse(peer))
|
||||
@@ -96,11 +102,11 @@ func (h *Peers) HandlePeer(w http.ResponseWriter, r *http.Request) {
|
||||
func (h *Peers) GetPeers(w http.ResponseWriter, r *http.Request) {
|
||||
switch r.Method {
|
||||
case http.MethodGet:
|
||||
accountId := extractAccountIdFromRequestContext(r)
|
||||
userId := extractUserIdFromRequestContext(r)
|
||||
//new user -> create a new account
|
||||
account, err := h.accountManager.GetOrCreateAccount(accountId)
|
||||
account, err := h.accountManager.GetOrCreateAccountByUser(userId)
|
||||
if err != nil {
|
||||
log.Errorf("failed getting user account %s: %v", accountId, err)
|
||||
log.Errorf("failed getting account of a user %s: %v", userId, err)
|
||||
http.Redirect(w, r, "/", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -118,7 +118,14 @@ func (h *SetupKeys) createKey(accountId string, w http.ResponseWriter, r *http.R
|
||||
}
|
||||
|
||||
func (h *SetupKeys) HandleKey(w http.ResponseWriter, r *http.Request) {
|
||||
accountId := extractAccountIdFromRequestContext(r)
|
||||
userId := extractUserIdFromRequestContext(r)
|
||||
account, err := h.accountManager.GetOrCreateAccountByUser(userId)
|
||||
if err != nil {
|
||||
log.Errorf("failed getting account of a user %s: %v", userId, err)
|
||||
http.Redirect(w, r, "/", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
keyId := vars["id"]
|
||||
if len(keyId) == 0 {
|
||||
@@ -128,10 +135,10 @@ func (h *SetupKeys) HandleKey(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
switch r.Method {
|
||||
case http.MethodPut:
|
||||
h.updateKey(accountId, keyId, w, r)
|
||||
h.updateKey(account.Id, keyId, w, r)
|
||||
return
|
||||
case http.MethodGet:
|
||||
h.getKey(accountId, keyId, w, r)
|
||||
h.getKey(account.Id, keyId, w, r)
|
||||
return
|
||||
default:
|
||||
http.Error(w, "", http.StatusNotFound)
|
||||
@@ -140,21 +147,20 @@ func (h *SetupKeys) HandleKey(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
func (h *SetupKeys) GetKeys(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
accountId := extractAccountIdFromRequestContext(r)
|
||||
userId := extractUserIdFromRequestContext(r)
|
||||
//new user -> create a new account
|
||||
account, err := h.accountManager.GetOrCreateAccountByUser(userId)
|
||||
if err != nil {
|
||||
log.Errorf("failed getting account of a user %s: %v", userId, err)
|
||||
http.Redirect(w, r, "/", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
switch r.Method {
|
||||
case http.MethodPost:
|
||||
h.createKey(accountId, w, r)
|
||||
h.createKey(account.Id, w, r)
|
||||
return
|
||||
case http.MethodGet:
|
||||
|
||||
//new user -> create a new account
|
||||
account, err := h.accountManager.GetOrCreateAccount(accountId)
|
||||
if err != nil {
|
||||
log.Errorf("failed getting user account %s: %v", accountId, err)
|
||||
http.Redirect(w, r, "/", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
w.WriteHeader(200)
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
@@ -165,7 +171,7 @@ func (h *SetupKeys) GetKeys(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
err = json.NewEncoder(w).Encode(respBody)
|
||||
if err != nil {
|
||||
log.Errorf("failed encoding account peers %s: %v", accountId, err)
|
||||
log.Errorf("failed encoding account peers %s: %v", account.Id, err)
|
||||
http.Redirect(w, r, "/", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
@@ -8,8 +8,8 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
// extractAccountIdFromRequestContext extracts accountId from the request context previously filled by the JWT token (after auth)
|
||||
func extractAccountIdFromRequestContext(r *http.Request) string {
|
||||
// extractUserIdFromRequestContext extracts accountId from the request context previously filled by the JWT token (after auth)
|
||||
func extractUserIdFromRequestContext(r *http.Request) string {
|
||||
token := r.Context().Value("user").(*jwt.Token)
|
||||
claims := token.Claims.(jwt.MapClaims)
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package server_test
|
||||
import (
|
||||
"context"
|
||||
server "github.com/wiretrustee/wiretrustee/management/server"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"io/ioutil"
|
||||
"math/rand"
|
||||
"net"
|
||||
@@ -320,9 +321,9 @@ var _ = Describe("Management service", func() {
|
||||
})
|
||||
})
|
||||
|
||||
Context("when there are 50 peers registered under one account", func() {
|
||||
Context("when there are 30 peers registered under one account", func() {
|
||||
Context("when there are 10 more peers registered under the same account", func() {
|
||||
Specify("all of the 50 peers will get updates of 10 newly registered peers", func() {
|
||||
Specify("all of the 20 peers will have 29 peer to connect to (total 30-1 itself)", func() {
|
||||
|
||||
initialPeers := 20
|
||||
additionalPeers := 10
|
||||
@@ -335,7 +336,7 @@ var _ = Describe("Management service", func() {
|
||||
}
|
||||
|
||||
wg := sync2.WaitGroup{}
|
||||
wg.Add(initialPeers + initialPeers*additionalPeers)
|
||||
wg.Add(initialPeers)
|
||||
|
||||
var clients []mgmtProto.ManagementService_SyncClient
|
||||
for _, peer := range peers {
|
||||
@@ -367,9 +368,10 @@ var _ = Describe("Management service", func() {
|
||||
resp := &mgmtProto.SyncResponse{}
|
||||
err = pb.Unmarshal(decryptedBytes, resp)
|
||||
Expect(err).NotTo(HaveOccurred())
|
||||
if len(resp.GetRemotePeers()) > 0 {
|
||||
if len(resp.GetRemotePeers()) == 29 {
|
||||
//only consider peer updates
|
||||
wg.Done()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
@@ -472,7 +474,9 @@ func loginPeerWithValidSetupKey(serverPubKey wgtypes.Key, key wgtypes.Key, clien
|
||||
func createRawClient(addr string) (mgmtProto.ManagementServiceClient, *grpc.ClientConn) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(),
|
||||
|
||||
conn, err := grpc.DialContext(ctx, addr,
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithBlock(),
|
||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||
Time: 10 * time.Second,
|
||||
|
||||
13
management/server/migration/README.md
Normal file
13
management/server/migration/README.md
Normal file
@@ -0,0 +1,13 @@
|
||||
## Migration from Store v2 to Store v2
|
||||
|
||||
Previously Account.Id was an Auth0 user id.
|
||||
Conversion moves user id to Account.CreatedBy and generates a new Account.Id using xid.
|
||||
It also adds a User with id = old Account.Id with a role Admin.
|
||||
|
||||
To start a conversion simply run the command below providing your current Wiretrustee Management datadir (where store.json file is located)
|
||||
and a new data directory location (where a converted store.js will be stored):
|
||||
```shell
|
||||
./migration --oldDir /var/wiretrustee/datadir --newDir /var/wiretrustee/newdatadir/
|
||||
```
|
||||
|
||||
Afterwards you can run the Management service providing ```/var/wiretrustee/newdatadir/ ``` as a datadir.
|
||||
56
management/server/migration/convert_accounts.go
Normal file
56
management/server/migration/convert_accounts.go
Normal file
@@ -0,0 +1,56 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/rs/xid"
|
||||
"github.com/wiretrustee/wiretrustee/management/server"
|
||||
)
|
||||
|
||||
func main() {
|
||||
|
||||
oldDir := flag.String("oldDir", "old store directory", "/var/wiretrustee/datadir")
|
||||
newDir := flag.String("newDir", "new store directory", "/var/wiretrustee/newdatadir")
|
||||
|
||||
flag.Parse()
|
||||
|
||||
oldStore, err := server.NewStore(*oldDir)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
newStore, err := server.NewStore(*newDir)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
err = Convert(oldStore, newStore)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
fmt.Println("successfully converted")
|
||||
}
|
||||
|
||||
// Convert converts old store ato a new store
|
||||
// Previously Account.Id was an Auth0 user id
|
||||
// Conversion moved user id to Account.CreatedBy and generated a new Account.Id using xid
|
||||
// It also adds a User with id = old Account.Id with a role Admin
|
||||
func Convert(oldStore *server.FileStore, newStore *server.FileStore) error {
|
||||
for _, account := range oldStore.Accounts {
|
||||
accountCopy := account.Copy()
|
||||
accountCopy.Id = xid.New().String()
|
||||
accountCopy.CreatedBy = account.Id
|
||||
accountCopy.Users[account.Id] = &server.User{
|
||||
Id: account.Id,
|
||||
Role: server.UserRoleAdmin,
|
||||
}
|
||||
|
||||
err := newStore.SaveAccount(accountCopy)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
76
management/server/migration/convert_accounts_test.go
Normal file
76
management/server/migration/convert_accounts_test.go
Normal file
@@ -0,0 +1,76 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/wiretrustee/wiretrustee/management/server"
|
||||
"github.com/wiretrustee/wiretrustee/util"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestConvertAccounts(t *testing.T) {
|
||||
|
||||
storeDir := t.TempDir()
|
||||
|
||||
err := util.CopyFileContents("../testdata/storev1.json", filepath.Join(storeDir, "store.json"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
store, err := server.NewStore(storeDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
convertedStore, err := server.NewStore(filepath.Join(storeDir, "converted"))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
err = Convert(store, convertedStore)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if len(store.Accounts) != len(convertedStore.Accounts) {
|
||||
t.Errorf("expecting the same number of accounts after conversion")
|
||||
}
|
||||
|
||||
for _, account := range store.Accounts {
|
||||
convertedAccount, err := convertedStore.GetUserAccount(account.Id)
|
||||
if err != nil || convertedAccount == nil {
|
||||
t.Errorf("expecting Account %s to be converted", account.Id)
|
||||
return
|
||||
}
|
||||
if convertedAccount.CreatedBy != account.Id {
|
||||
t.Errorf("expecting converted Account.CreatedBy field to be equal to the old Account.Id")
|
||||
return
|
||||
}
|
||||
if convertedAccount.Id == account.Id {
|
||||
t.Errorf("expecting converted Account.Id to be different from Account.Id")
|
||||
return
|
||||
}
|
||||
if len(convertedAccount.Users) != 1 {
|
||||
t.Errorf("expecting converted Account.Users to be of size 1")
|
||||
return
|
||||
}
|
||||
user := convertedAccount.Users[account.Id]
|
||||
if user == nil {
|
||||
t.Errorf("expecting to find a user in converted Account.Users")
|
||||
return
|
||||
}
|
||||
if user.Role != server.UserRoleAdmin {
|
||||
t.Errorf("expecting to find a user in converted Account.Users with a role Admin")
|
||||
return
|
||||
}
|
||||
|
||||
for peerId := range account.Peers {
|
||||
convertedPeer := convertedAccount.Peers[peerId]
|
||||
if convertedPeer == nil {
|
||||
t.Errorf("expecting Account Peer of StoreV1 to be found in StoreV2")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
@@ -17,6 +17,14 @@ type Network struct {
|
||||
Dns string
|
||||
}
|
||||
|
||||
func (n *Network) Copy() *Network {
|
||||
return &Network{
|
||||
Id: n.Id,
|
||||
Net: n.Net,
|
||||
Dns: n.Dns,
|
||||
}
|
||||
}
|
||||
|
||||
// AllocatePeerIP pics an available IP from an net.IPNet.
|
||||
// This method considers already taken IPs and reuses IPs if there are gaps in takenIps
|
||||
// E.g. if ipNet=100.30.0.0/16 and takenIps=[100.30.0.1, 100.30.0.5] then the result would be 100.30.0.2
|
||||
|
||||
@@ -123,6 +123,7 @@ func (am *AccountManager) DeletePeer(accountId string, peerKey string) (*Peer, e
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// notify peer itself
|
||||
err = am.peersUpdateManager.SendUpdate(peerKey,
|
||||
&UpdateMessage{
|
||||
Update: &proto.SyncResponse{
|
||||
@@ -134,7 +135,7 @@ func (am *AccountManager) DeletePeer(accountId string, peerKey string) (*Peer, e
|
||||
}
|
||||
|
||||
//notify other peers of the change
|
||||
peers, err := am.Store.GetAccountPeers(accountId)
|
||||
/*peers, err := am.Store.GetAccountPeers(accountId)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -156,7 +157,7 @@ func (am *AccountManager) DeletePeer(accountId string, peerKey string) (*Peer, e
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
} */
|
||||
|
||||
am.peersUpdateManager.CloseChannel(peerKey)
|
||||
return peer, nil
|
||||
@@ -206,7 +207,6 @@ func (am *AccountManager) GetPeersForAPeer(peerKey string) ([]*Peer, error) {
|
||||
// Each Account has a list of pre-authorised SetupKey and if no Account has a given key err wit ha code codes.Unauthenticated
|
||||
// will be returned, meaning the key is invalid
|
||||
// Each new Peer will be assigned a new next net.IP from the Account.Network and Account.Network.LastIP will be updated (IP's are not reused).
|
||||
// If the specified setupKey is empty then a new Account will be created //todo remove this part
|
||||
// The peer property is just a placeholder for the Peer properties to pass further
|
||||
func (am *AccountManager) AddPeer(setupKey string, peer Peer) (*Peer, error) {
|
||||
am.mux.Lock()
|
||||
@@ -218,8 +218,8 @@ func (am *AccountManager) AddPeer(setupKey string, peer Peer) (*Peer, error) {
|
||||
var err error
|
||||
var sk *SetupKey
|
||||
if len(upperKey) == 0 {
|
||||
// Empty setup key, create a new account for it.
|
||||
account, sk = newAccount()
|
||||
// Empty setup key, fail
|
||||
return nil, status.Errorf(codes.InvalidArgument, "empty setupKey %s", setupKey)
|
||||
} else {
|
||||
account, err = am.Store.GetAccountBySetupKey(upperKey)
|
||||
if err != nil {
|
||||
|
||||
@@ -5,6 +5,7 @@ type Store interface {
|
||||
DeletePeer(accountId string, peerKey string) (*Peer, error)
|
||||
SavePeer(accountId string, peer *Peer) error
|
||||
GetAccount(accountId string) (*Account, error)
|
||||
GetUserAccount(userId string) (*Account, error)
|
||||
GetAccountPeers(accountId string) ([]*Peer, error)
|
||||
GetPeerAccount(peerKey string) (*Account, error)
|
||||
GetAccountBySetupKey(setupKey string) (*Account, error)
|
||||
|
||||
12
management/server/testdata/store.json
vendored
12
management/server/testdata/store.json
vendored
@@ -22,7 +22,17 @@
|
||||
},
|
||||
"Dns": null
|
||||
},
|
||||
"Peers": {}
|
||||
"Peers": {},
|
||||
"Users": {
|
||||
"edafee4e-63fb-11ec-90d6-0242ac120003": {
|
||||
"Id": "edafee4e-63fb-11ec-90d6-0242ac120003",
|
||||
"Role": "admin"
|
||||
},
|
||||
"f4f6d672-63fb-11ec-90d6-0242ac120003": {
|
||||
"Id": "f4f6d672-63fb-11ec-90d6-0242ac120003",
|
||||
"Role": "user"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
154
management/server/testdata/storev1.json
vendored
Normal file
154
management/server/testdata/storev1.json
vendored
Normal file
@@ -0,0 +1,154 @@
|
||||
{
|
||||
"Accounts": {
|
||||
"auth0|61bf82ddeab084006aa1bccd": {
|
||||
"Id": "auth0|61bf82ddeab084006aa1bccd",
|
||||
"SetupKeys": {
|
||||
"1B2B50B0-B3E8-4B0C-A426-525EDB8481BD": {
|
||||
"Id": "831727121",
|
||||
"Key": "1B2B50B0-B3E8-4B0C-A426-525EDB8481BD",
|
||||
"Name": "One-off key",
|
||||
"Type": "one-off",
|
||||
"CreatedAt": "2021-12-24T16:09:45.926075752+01:00",
|
||||
"ExpiresAt": "2022-01-23T16:09:45.926075752+01:00",
|
||||
"Revoked": false,
|
||||
"UsedTimes": 1,
|
||||
"LastUsed": "2021-12-24T16:12:45.763424077+01:00"
|
||||
},
|
||||
"EB51E9EB-A11F-4F6E-8E49-C982891B405A": {
|
||||
"Id": "1769568301",
|
||||
"Key": "EB51E9EB-A11F-4F6E-8E49-C982891B405A",
|
||||
"Name": "Default key",
|
||||
"Type": "reusable",
|
||||
"CreatedAt": "2021-12-24T16:09:45.926073628+01:00",
|
||||
"ExpiresAt": "2022-01-23T16:09:45.926073628+01:00",
|
||||
"Revoked": false,
|
||||
"UsedTimes": 1,
|
||||
"LastUsed": "2021-12-24T16:13:06.236748538+01:00"
|
||||
}
|
||||
},
|
||||
"Network": {
|
||||
"Id": "a443c07a-5765-4a78-97fc-390d9c1d0e49",
|
||||
"Net": {
|
||||
"IP": "100.64.0.0",
|
||||
"Mask": "/8AAAA=="
|
||||
},
|
||||
"Dns": ""
|
||||
},
|
||||
"Peers": {
|
||||
"oMNaI8qWi0CyclSuwGR++SurxJyM3pQEiPEHwX8IREo=": {
|
||||
"Key": "oMNaI8qWi0CyclSuwGR++SurxJyM3pQEiPEHwX8IREo=",
|
||||
"SetupKey": "EB51E9EB-A11F-4F6E-8E49-C982891B405A",
|
||||
"IP": "100.64.0.2",
|
||||
"Meta": {
|
||||
"Hostname": "braginini",
|
||||
"GoOS": "linux",
|
||||
"Kernel": "Linux",
|
||||
"Core": "21.04",
|
||||
"Platform": "x86_64",
|
||||
"OS": "Ubuntu",
|
||||
"WtVersion": ""
|
||||
},
|
||||
"Name": "braginini",
|
||||
"Status": {
|
||||
"LastSeen": "2021-12-24T16:13:11.244342541+01:00",
|
||||
"Connected": false
|
||||
}
|
||||
},
|
||||
"xlx9/9D8+ibnRiIIB8nHGMxGOzxV17r8ShPHgi4aYSM=": {
|
||||
"Key": "xlx9/9D8+ibnRiIIB8nHGMxGOzxV17r8ShPHgi4aYSM=",
|
||||
"SetupKey": "1B2B50B0-B3E8-4B0C-A426-525EDB8481BD",
|
||||
"IP": "100.64.0.1",
|
||||
"Meta": {
|
||||
"Hostname": "braginini",
|
||||
"GoOS": "linux",
|
||||
"Kernel": "Linux",
|
||||
"Core": "21.04",
|
||||
"Platform": "x86_64",
|
||||
"OS": "Ubuntu",
|
||||
"WtVersion": ""
|
||||
},
|
||||
"Name": "braginini",
|
||||
"Status": {
|
||||
"LastSeen": "2021-12-24T16:12:49.089339333+01:00",
|
||||
"Connected": false
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"google-oauth2|103201118415301331038": {
|
||||
"Id": "google-oauth2|103201118415301331038",
|
||||
"SetupKeys": {
|
||||
"5AFB60DB-61F2-4251-8E11-494847EE88E9": {
|
||||
"Id": "2485964613",
|
||||
"Key": "5AFB60DB-61F2-4251-8E11-494847EE88E9",
|
||||
"Name": "Default key",
|
||||
"Type": "reusable",
|
||||
"CreatedAt": "2021-12-24T16:10:02.238476+01:00",
|
||||
"ExpiresAt": "2022-01-23T16:10:02.238476+01:00",
|
||||
"Revoked": false,
|
||||
"UsedTimes": 1,
|
||||
"LastUsed": "2021-12-24T16:12:05.994307717+01:00"
|
||||
},
|
||||
"A72E4DC2-00DE-4542-8A24-62945438104E": {
|
||||
"Id": "3504804807",
|
||||
"Key": "A72E4DC2-00DE-4542-8A24-62945438104E",
|
||||
"Name": "One-off key",
|
||||
"Type": "one-off",
|
||||
"CreatedAt": "2021-12-24T16:10:02.238478209+01:00",
|
||||
"ExpiresAt": "2022-01-23T16:10:02.238478209+01:00",
|
||||
"Revoked": false,
|
||||
"UsedTimes": 1,
|
||||
"LastUsed": "2021-12-24T16:11:27.015741738+01:00"
|
||||
}
|
||||
},
|
||||
"Network": {
|
||||
"Id": "b6d0b152-364e-40c1-a8a1-fa7bcac2267f",
|
||||
"Net": {
|
||||
"IP": "100.64.0.0",
|
||||
"Mask": "/8AAAA=="
|
||||
},
|
||||
"Dns": ""
|
||||
},
|
||||
"Peers": {
|
||||
"6kjbmVq1hmucVzvBXo5OucY5OYv+jSsB1jUTLq291Dw=": {
|
||||
"Key": "6kjbmVq1hmucVzvBXo5OucY5OYv+jSsB1jUTLq291Dw=",
|
||||
"SetupKey": "5AFB60DB-61F2-4251-8E11-494847EE88E9",
|
||||
"IP": "100.64.0.2",
|
||||
"Meta": {
|
||||
"Hostname": "braginini",
|
||||
"GoOS": "linux",
|
||||
"Kernel": "Linux",
|
||||
"Core": "21.04",
|
||||
"Platform": "x86_64",
|
||||
"OS": "Ubuntu",
|
||||
"WtVersion": ""
|
||||
},
|
||||
"Name": "braginini",
|
||||
"Status": {
|
||||
"LastSeen": "2021-12-24T16:12:05.994305438+01:00",
|
||||
"Connected": false
|
||||
}
|
||||
},
|
||||
"Ok+5QMdt/UjoktNOvicGYj+IX2g98p+0N2PJ3vJ45RI=": {
|
||||
"Key": "Ok+5QMdt/UjoktNOvicGYj+IX2g98p+0N2PJ3vJ45RI=",
|
||||
"SetupKey": "A72E4DC2-00DE-4542-8A24-62945438104E",
|
||||
"IP": "100.64.0.1",
|
||||
"Meta": {
|
||||
"Hostname": "braginini",
|
||||
"GoOS": "linux",
|
||||
"Kernel": "Linux",
|
||||
"Core": "21.04",
|
||||
"Platform": "x86_64",
|
||||
"OS": "Ubuntu",
|
||||
"WtVersion": ""
|
||||
},
|
||||
"Name": "braginini",
|
||||
"Status": {
|
||||
"LastSeen": "2021-12-24T16:11:27.015739803+01:00",
|
||||
"Connected": false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
71
management/server/user.go
Normal file
71
management/server/user.go
Normal file
@@ -0,0 +1,71 @@
|
||||
package server
|
||||
|
||||
import (
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
const (
|
||||
UserRoleAdmin UserRole = "admin"
|
||||
UserRoleUser UserRole = "user"
|
||||
)
|
||||
|
||||
// UserRole is the role of the User
|
||||
type UserRole string
|
||||
|
||||
// User represents a user of the system
|
||||
type User struct {
|
||||
Id string
|
||||
Role UserRole
|
||||
}
|
||||
|
||||
func (u *User) Copy() *User {
|
||||
return &User{
|
||||
Id: u.Id,
|
||||
Role: u.Role,
|
||||
}
|
||||
}
|
||||
|
||||
// NewUser creates a new user
|
||||
func NewUser(id string, role UserRole) *User {
|
||||
return &User{
|
||||
Id: id,
|
||||
Role: role,
|
||||
}
|
||||
}
|
||||
|
||||
// NewAdminUser creates a new user with role UserRoleAdmin
|
||||
func NewAdminUser(id string) *User {
|
||||
return NewUser(id, UserRoleAdmin)
|
||||
}
|
||||
|
||||
// GetOrCreateAccountByUser returns an existing account for a given user id or creates a new one if doesn't exist
|
||||
func (am *AccountManager) GetOrCreateAccountByUser(userId string) (*Account, error) {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
account, err := am.Store.GetUserAccount(userId)
|
||||
if err != nil {
|
||||
if s, ok := status.FromError(err); ok && s.Code() == codes.NotFound {
|
||||
account, _ = newAccount(userId)
|
||||
account.Users[userId] = NewAdminUser(userId)
|
||||
err = am.Store.SaveAccount(account)
|
||||
if err != nil {
|
||||
return nil, status.Errorf(codes.Internal, "failed creating account")
|
||||
}
|
||||
} else {
|
||||
// other error
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return account, nil
|
||||
}
|
||||
|
||||
// GetAccountByUser returns an existing account for a given user id, NotFound if account couldn't be found
|
||||
func (am *AccountManager) GetAccountByUser(userId string) (*Account, error) {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
return am.Store.GetUserAccount(userId)
|
||||
}
|
||||
@@ -11,7 +11,9 @@ import (
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/connectivity"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/status"
|
||||
@@ -23,6 +25,12 @@ import (
|
||||
|
||||
// A set of tools to exchange connection details (Wireguard endpoints) with the remote peer.
|
||||
|
||||
// Status is the status of the client
|
||||
type Status string
|
||||
|
||||
const StreamConnected Status = "Connected"
|
||||
const StreamDisconnected Status = "Disconnected"
|
||||
|
||||
// Client Wraps the Signal Exchange Service gRpc client
|
||||
type Client struct {
|
||||
key wgtypes.Key
|
||||
@@ -30,8 +38,15 @@ type Client struct {
|
||||
signalConn *grpc.ClientConn
|
||||
ctx context.Context
|
||||
stream proto.SignalExchange_ConnectStreamClient
|
||||
//waiting group to notify once stream is connected
|
||||
connWg *sync.WaitGroup //todo use a channel instead??
|
||||
// connectedCh used to notify goroutines waiting for the connection to the Signal stream
|
||||
connectedCh chan struct{}
|
||||
mux sync.Mutex
|
||||
// StreamConnected indicates whether this client is StreamConnected to the Signal stream
|
||||
status Status
|
||||
}
|
||||
|
||||
func (c *Client) GetStatus() Status {
|
||||
return c.status
|
||||
}
|
||||
|
||||
// Close Closes underlying connections to the Signal Exchange
|
||||
@@ -42,7 +57,7 @@ func (c *Client) Close() error {
|
||||
// NewClient creates a new Signal client
|
||||
func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled bool) (*Client, error) {
|
||||
|
||||
transportOption := grpc.WithInsecure()
|
||||
transportOption := grpc.WithTransportCredentials(insecure.NewCredentials())
|
||||
|
||||
if tlsEnabled {
|
||||
transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{}))
|
||||
@@ -65,13 +80,13 @@ func NewClient(ctx context.Context, addr string, key wgtypes.Key, tlsEnabled boo
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
return &Client{
|
||||
realClient: proto.NewSignalExchangeClient(conn),
|
||||
ctx: ctx,
|
||||
signalConn: conn,
|
||||
key: key,
|
||||
connWg: &wg,
|
||||
mux: sync.Mutex{},
|
||||
status: StreamDisconnected,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -81,8 +96,8 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
|
||||
InitialInterval: 800 * time.Millisecond,
|
||||
RandomizationFactor: backoff.DefaultRandomizationFactor,
|
||||
Multiplier: backoff.DefaultMultiplier,
|
||||
MaxInterval: 30 * time.Second,
|
||||
MaxElapsedTime: 24 * 3 * time.Hour, //stop after 3 days trying
|
||||
MaxInterval: 10 * time.Second,
|
||||
MaxElapsedTime: 12 * time.Hour, //stop after 12 hours of trying, the error will be propagated to the general retry of the client
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}, ctx)
|
||||
@@ -91,36 +106,79 @@ func defaultBackoff(ctx context.Context) backoff.BackOff {
|
||||
|
||||
// Receive Connects to the Signal Exchange message stream and starts receiving messages.
|
||||
// The messages will be handled by msgHandler function provided.
|
||||
// This function runs a goroutine underneath and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart)
|
||||
// The key is the identifier of our Peer (could be Wireguard public key)
|
||||
func (c *Client) Receive(msgHandler func(msg *proto.Message) error) {
|
||||
c.connWg.Add(1)
|
||||
go func() {
|
||||
// This function is blocking and reconnects to the Signal Exchange if errors occur (e.g. Exchange restart)
|
||||
// The connection retry logic will try to reconnect for 30 min and if wasn't successful will propagate the error to the function caller.
|
||||
func (c *Client) Receive(msgHandler func(msg *proto.Message) error) error {
|
||||
|
||||
var backOff = defaultBackoff(c.ctx)
|
||||
var backOff = defaultBackoff(c.ctx)
|
||||
|
||||
operation := func() error {
|
||||
operation := func() error {
|
||||
|
||||
err := c.connect(c.key.PublicKey().String(), msgHandler)
|
||||
if err != nil {
|
||||
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
|
||||
c.connWg.Add(1)
|
||||
return err
|
||||
}
|
||||
c.notifyStreamDisconnected()
|
||||
|
||||
backOff.Reset()
|
||||
return nil
|
||||
log.Debugf("signal connection state %v", c.signalConn.GetState())
|
||||
if !c.ready() {
|
||||
return fmt.Errorf("no connection to signal")
|
||||
}
|
||||
|
||||
err := backoff.Retry(operation, backOff)
|
||||
// connect to Signal stream identifying ourselves with a public Wireguard key
|
||||
// todo once the key rotation logic has been implemented, consider changing to some other identifier (received from management)
|
||||
stream, err := c.connect(c.key.PublicKey().String())
|
||||
if err != nil {
|
||||
log.Errorf("exiting Signal Service connection retry loop due to unrecoverable error: %s", err)
|
||||
return
|
||||
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
|
||||
return err
|
||||
}
|
||||
}()
|
||||
|
||||
c.notifyStreamConnected()
|
||||
|
||||
log.Infof("connected to the Signal Service stream")
|
||||
|
||||
// start receiving messages from the Signal stream (from other peers through signal)
|
||||
err = c.receive(stream, msgHandler)
|
||||
if err != nil {
|
||||
log.Warnf("disconnected from the Signal Exchange due to an error: %v", err)
|
||||
backOff.Reset()
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
err := backoff.Retry(operation, backOff)
|
||||
if err != nil {
|
||||
log.Errorf("exiting Signal Service connection retry loop due to unrecoverable error: %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
func (c *Client) notifyStreamDisconnected() {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
c.status = StreamDisconnected
|
||||
}
|
||||
|
||||
func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error) error {
|
||||
func (c *Client) notifyStreamConnected() {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
c.status = StreamConnected
|
||||
if c.connectedCh != nil {
|
||||
// there are goroutines waiting on this channel -> release them
|
||||
close(c.connectedCh)
|
||||
c.connectedCh = nil
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) getStreamStatusChan() <-chan struct{} {
|
||||
c.mux.Lock()
|
||||
defer c.mux.Unlock()
|
||||
if c.connectedCh == nil {
|
||||
c.connectedCh = make(chan struct{})
|
||||
}
|
||||
return c.connectedCh
|
||||
}
|
||||
|
||||
func (c *Client) connect(key string) (proto.SignalExchange_ConnectStreamClient, error) {
|
||||
c.stream = nil
|
||||
|
||||
// add key fingerprint to the request header to be identified on the server side
|
||||
@@ -131,35 +189,48 @@ func (c *Client) connect(key string, msgHandler func(msg *proto.Message) error)
|
||||
|
||||
c.stream = stream
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
// blocks
|
||||
header, err := c.stream.Header()
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
registered := header.Get(proto.HeaderRegistered)
|
||||
if len(registered) == 0 {
|
||||
return fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
|
||||
return nil, fmt.Errorf("didn't receive a registration header from the Signal server whille connecting to the streams")
|
||||
}
|
||||
//connection established we are good to use the stream
|
||||
c.connWg.Done()
|
||||
|
||||
log.Infof("connected to the Signal Exchange Stream")
|
||||
|
||||
return c.receive(stream, msgHandler)
|
||||
return stream, nil
|
||||
}
|
||||
|
||||
// WaitConnected waits until the client is connected to the message stream
|
||||
func (c *Client) WaitConnected() {
|
||||
c.connWg.Wait()
|
||||
// ready indicates whether the client is okay and ready to be used
|
||||
// for now it just checks whether gRPC connection to the service is in state Ready
|
||||
func (c *Client) ready() bool {
|
||||
return c.signalConn.GetState() == connectivity.Ready || c.signalConn.GetState() == connectivity.Idle
|
||||
}
|
||||
|
||||
// WaitStreamConnected waits until the client is connected to the Signal stream
|
||||
func (c *Client) WaitStreamConnected() {
|
||||
|
||||
if c.status == StreamConnected {
|
||||
return
|
||||
}
|
||||
|
||||
ch := c.getStreamStatusChan()
|
||||
select {
|
||||
case <-c.ctx.Done():
|
||||
case <-ch:
|
||||
}
|
||||
}
|
||||
|
||||
// SendToStream sends a message to the remote Peer through the Signal Exchange using established stream connection to the Signal Server
|
||||
// The Client.Receive method must be called before sending messages to establish initial connection to the Signal Exchange
|
||||
// Client.connWg can be used to wait
|
||||
func (c *Client) SendToStream(msg *proto.EncryptedMessage) error {
|
||||
|
||||
if !c.ready() {
|
||||
return fmt.Errorf("no connection to signal")
|
||||
}
|
||||
if c.stream == nil {
|
||||
return fmt.Errorf("connection to the Signal Exchnage has not been established yet. Please call Client.Receive before sending messages")
|
||||
}
|
||||
@@ -216,13 +287,17 @@ func (c *Client) encryptMessage(msg *proto.Message) (*proto.EncryptedMessage, er
|
||||
// Send sends a message to the remote Peer through the Signal Exchange.
|
||||
func (c *Client) Send(msg *proto.Message) error {
|
||||
|
||||
if !c.ready() {
|
||||
return fmt.Errorf("no connection to signal")
|
||||
}
|
||||
|
||||
encryptedMessage, err := c.encryptMessage(msg)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = c.realClient.Send(context.TODO(), encryptedMessage)
|
||||
if err != nil {
|
||||
log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err)
|
||||
//log.Errorf("error while sending message to peer [%s] [error: %v]", msg.RemoteKey, err)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -239,10 +314,10 @@ func (c *Client) receive(stream proto.SignalExchange_ConnectStreamClient,
|
||||
log.Warnf("stream canceled (usually indicates shutdown)")
|
||||
return err
|
||||
} else if s.Code() == codes.Unavailable {
|
||||
log.Warnf("server has been stopped")
|
||||
log.Warnf("Signal Service is unavailable")
|
||||
return err
|
||||
} else if err == io.EOF {
|
||||
log.Warnf("stream closed by server")
|
||||
log.Warnf("Signal Service stream closed by server")
|
||||
return err
|
||||
} else if err != nil {
|
||||
return err
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"github.com/wiretrustee/wiretrustee/signal/server"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"net"
|
||||
@@ -48,30 +49,42 @@ var _ = Describe("Client", func() {
|
||||
// connect PeerA to Signal
|
||||
keyA, _ := wgtypes.GenerateKey()
|
||||
clientA := createSignalClient(addr, keyA)
|
||||
clientA.Receive(func(msg *sigProto.Message) error {
|
||||
receivedOnA = msg.GetBody().GetPayload()
|
||||
msgReceived.Done()
|
||||
return nil
|
||||
})
|
||||
clientA.WaitConnected()
|
||||
go func() {
|
||||
err := clientA.Receive(func(msg *sigProto.Message) error {
|
||||
receivedOnA = msg.GetBody().GetPayload()
|
||||
msgReceived.Done()
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}()
|
||||
clientA.WaitStreamConnected()
|
||||
|
||||
// connect PeerB to Signal
|
||||
keyB, _ := wgtypes.GenerateKey()
|
||||
clientB := createSignalClient(addr, keyB)
|
||||
clientB.Receive(func(msg *sigProto.Message) error {
|
||||
receivedOnB = msg.GetBody().GetPayload()
|
||||
err := clientB.Send(&sigProto.Message{
|
||||
Key: keyB.PublicKey().String(),
|
||||
RemoteKey: keyA.PublicKey().String(),
|
||||
Body: &sigProto.Body{Payload: "pong"},
|
||||
|
||||
go func() {
|
||||
err := clientB.Receive(func(msg *sigProto.Message) error {
|
||||
receivedOnB = msg.GetBody().GetPayload()
|
||||
err := clientB.Send(&sigProto.Message{
|
||||
Key: keyB.PublicKey().String(),
|
||||
RemoteKey: keyA.PublicKey().String(),
|
||||
Body: &sigProto.Body{Payload: "pong"},
|
||||
})
|
||||
if err != nil {
|
||||
Fail("failed sending a message to PeerA")
|
||||
}
|
||||
msgReceived.Done()
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
Fail("failed sending a message to PeerA")
|
||||
return
|
||||
}
|
||||
msgReceived.Done()
|
||||
return nil
|
||||
})
|
||||
clientB.WaitConnected()
|
||||
}()
|
||||
|
||||
clientB.WaitStreamConnected()
|
||||
|
||||
// PeerA initiates ping-pong
|
||||
err := clientA.Send(&sigProto.Message{
|
||||
@@ -100,11 +113,15 @@ var _ = Describe("Client", func() {
|
||||
|
||||
key, _ := wgtypes.GenerateKey()
|
||||
client := createSignalClient(addr, key)
|
||||
client.Receive(func(msg *sigProto.Message) error {
|
||||
return nil
|
||||
})
|
||||
client.WaitConnected()
|
||||
|
||||
go func() {
|
||||
err := client.Receive(func(msg *sigProto.Message) error {
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}()
|
||||
client.WaitStreamConnected()
|
||||
Expect(client).NotTo(BeNil())
|
||||
})
|
||||
})
|
||||
@@ -154,7 +171,8 @@ func createSignalClient(addr string, key wgtypes.Key) *Client {
|
||||
|
||||
func createRawSignalClient(addr string) sigProto.SignalExchangeClient {
|
||||
ctx := context.Background()
|
||||
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(),
|
||||
conn, err := grpc.DialContext(ctx, addr,
|
||||
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
||||
grpc.WithBlock(),
|
||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||
Time: 3 * time.Second,
|
||||
|
||||
@@ -55,7 +55,7 @@ func (registry *Registry) Register(peer *Peer) {
|
||||
// can be that peer already exists but it is fine (e.g. reconnect)
|
||||
// todo investigate what happens to the old peer (especially Peer.Stream) when we override it
|
||||
registry.Peers.Store(peer.Id, peer)
|
||||
log.Printf("registered peer [%s]", peer.Id)
|
||||
log.Debugf("peer registered [%s]", peer.Id)
|
||||
|
||||
}
|
||||
|
||||
@@ -63,7 +63,7 @@ func (registry *Registry) Register(peer *Peer) {
|
||||
func (registry *Registry) Deregister(peer *Peer) {
|
||||
_, loaded := registry.Peers.LoadAndDelete(peer.Id)
|
||||
if loaded {
|
||||
log.Printf("deregistered peer [%s]", peer.Id)
|
||||
log.Debugf("peer deregistered [%s]", peer.Id)
|
||||
} else {
|
||||
log.Warnf("attempted to remove non-existent peer [%s]", peer.Id)
|
||||
}
|
||||
|
||||
@@ -36,11 +36,11 @@ func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.
|
||||
//forward the message to the target peer
|
||||
err := dstPeer.Stream.Send(msg)
|
||||
if err != nil {
|
||||
log.Errorf("error while forwarding message from peer [%s] to peer [%s]", msg.Key, msg.RemoteKey)
|
||||
log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", msg.Key, msg.RemoteKey, err)
|
||||
//todo respond to the sender?
|
||||
}
|
||||
} else {
|
||||
log.Warnf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", msg.Key, msg.RemoteKey)
|
||||
log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", msg.Key, msg.RemoteKey)
|
||||
//todo respond to the sender?
|
||||
}
|
||||
return &proto.EncryptedMessage{}, nil
|
||||
@@ -48,11 +48,17 @@ func (s *Server) Send(ctx context.Context, msg *proto.EncryptedMessage) (*proto.
|
||||
|
||||
// ConnectStream connects to the exchange stream
|
||||
func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer) error {
|
||||
|
||||
p, err := s.connectPeer(stream)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
log.Infof("peer disconnected [%s] ", p.Id)
|
||||
s.registry.Deregister(p)
|
||||
}()
|
||||
|
||||
//needed to confirm that the peer has been registered so that the client can proceed
|
||||
header := metadata.Pairs(proto.HeaderRegistered, "1")
|
||||
err = stream.SendHeader(header)
|
||||
@@ -60,8 +66,10 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infof("peer [%s] has successfully connected", p.Id)
|
||||
log.Infof("peer connected [%s]", p.Id)
|
||||
|
||||
for {
|
||||
//read incoming messages
|
||||
msg, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
break
|
||||
@@ -74,14 +82,13 @@ func (s *Server) ConnectStream(stream proto.SignalExchange_ConnectStreamServer)
|
||||
//forward the message to the target peer
|
||||
err := dstPeer.Stream.Send(msg)
|
||||
if err != nil {
|
||||
log.Errorf("error while forwarding message from peer [%s] to peer [%s]", p.Id, msg.RemoteKey)
|
||||
log.Errorf("error while forwarding message from peer [%s] to peer [%s] %v", p.Id, msg.RemoteKey, err)
|
||||
//todo respond to the sender?
|
||||
}
|
||||
} else {
|
||||
log.Warnf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", p.Id, msg.RemoteKey)
|
||||
log.Debugf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", p.Id, msg.RemoteKey)
|
||||
//todo respond to the sender?
|
||||
}
|
||||
|
||||
}
|
||||
<-stream.Context().Done()
|
||||
return stream.Context().Err()
|
||||
|
||||
Reference in New Issue
Block a user