mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 08:16:39 +00:00
feat: add signal websocket support as an alternative to gRPC
This commit is contained in:
11
go.mod
11
go.mod
@@ -8,6 +8,7 @@ require (
|
|||||||
github.com/golang/protobuf v1.5.2
|
github.com/golang/protobuf v1.5.2
|
||||||
github.com/google/uuid v1.2.0
|
github.com/google/uuid v1.2.0
|
||||||
github.com/gorilla/mux v1.8.0
|
github.com/gorilla/mux v1.8.0
|
||||||
|
github.com/gorilla/websocket v1.4.2
|
||||||
github.com/kardianos/service v1.2.1-0.20210728001519-a323c3813bc7
|
github.com/kardianos/service v1.2.1-0.20210728001519-a323c3813bc7
|
||||||
github.com/onsi/ginkgo v1.16.4
|
github.com/onsi/ginkgo v1.16.4
|
||||||
github.com/onsi/gomega v1.13.0
|
github.com/onsi/gomega v1.13.0
|
||||||
@@ -16,12 +17,14 @@ require (
|
|||||||
github.com/sirupsen/logrus v1.7.0
|
github.com/sirupsen/logrus v1.7.0
|
||||||
github.com/spf13/cobra v1.1.3
|
github.com/spf13/cobra v1.1.3
|
||||||
github.com/vishvananda/netlink v1.1.0
|
github.com/vishvananda/netlink v1.1.0
|
||||||
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97
|
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519
|
||||||
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c
|
golang.org/x/sys v0.0.0-20211020174200-9d6173849985
|
||||||
golang.zx2c4.com/wireguard v0.0.0-20210805125648-3957e9b9dd19
|
golang.zx2c4.com/wireguard v0.0.0-20211026125340-e42c6c4bc2d0
|
||||||
|
golang.zx2c4.com/wireguard/tun/netstack v0.0.0-20211026125340-e42c6c4bc2d0 // indirect
|
||||||
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20210803171230-4253848d036c
|
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20210803171230-4253848d036c
|
||||||
golang.zx2c4.com/wireguard/windows v0.4.5
|
golang.zx2c4.com/wireguard/windows v0.4.5
|
||||||
google.golang.org/grpc v1.32.0
|
google.golang.org/grpc v1.39.0-dev.0.20210518002758-2713b77e8526
|
||||||
google.golang.org/protobuf v1.26.0
|
google.golang.org/protobuf v1.26.0
|
||||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0
|
gopkg.in/natefinch/lumberjack.v2 v2.0.0
|
||||||
|
nhooyr.io/websocket v1.8.7
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,20 +1,21 @@
|
|||||||
package cmd
|
package cmd
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/spf13/cobra"
|
"github.com/spf13/cobra"
|
||||||
"github.com/wiretrustee/wiretrustee/encryption"
|
"github.com/wiretrustee/wiretrustee/encryption"
|
||||||
|
"github.com/wiretrustee/wiretrustee/signal/peer"
|
||||||
"github.com/wiretrustee/wiretrustee/signal/proto"
|
"github.com/wiretrustee/wiretrustee/signal/proto"
|
||||||
"github.com/wiretrustee/wiretrustee/signal/server"
|
"github.com/wiretrustee/wiretrustee/signal/server"
|
||||||
|
"github.com/wiretrustee/wiretrustee/signal/server/http"
|
||||||
"github.com/wiretrustee/wiretrustee/util"
|
"github.com/wiretrustee/wiretrustee/util"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
"google.golang.org/grpc/keepalive"
|
"google.golang.org/grpc/keepalive"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
|
||||||
"os"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -45,25 +46,20 @@ var (
|
|||||||
log.Fatalf("failed initializing log %v", err)
|
log.Fatalf("failed initializing log %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
registry := peer.NewRegistry()
|
||||||
|
|
||||||
var opts []grpc.ServerOption
|
var opts []grpc.ServerOption
|
||||||
|
var httpServer *http.Server
|
||||||
if signalLetsencryptDomain != "" {
|
if signalLetsencryptDomain != "" {
|
||||||
if _, err := os.Stat(signalSSLDir); os.IsNotExist(err) {
|
|
||||||
err = os.MkdirAll(signalSSLDir, os.ModeDir)
|
//automatically generate a new certificate with Let's Encrypt
|
||||||
if err != nil {
|
|
||||||
log.Fatalf("failed creating datadir: %s: %v", signalSSLDir, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
certManager := encryption.CreateCertManager(signalSSLDir, signalLetsencryptDomain)
|
certManager := encryption.CreateCertManager(signalSSLDir, signalLetsencryptDomain)
|
||||||
transportCredentials := credentials.NewTLS(certManager.TLSConfig())
|
transportCredentials := credentials.NewTLS(certManager.TLSConfig())
|
||||||
opts = append(opts, grpc.Creds(transportCredentials))
|
opts = append(opts, grpc.Creds(transportCredentials))
|
||||||
|
|
||||||
listener := certManager.Listener()
|
httpServer = http.NewHttpsServer("0.0.0.0:443", certManager, registry)
|
||||||
log.Infof("http server listening on %s", listener.Addr())
|
} else {
|
||||||
go func() {
|
httpServer = http.NewHttpServer("0.0.0.0:80", registry)
|
||||||
if err := http.Serve(listener, certManager.HTTPHandler(nil)); err != nil {
|
|
||||||
log.Errorf("failed to serve https server: %v", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
opts = append(opts, signalKaep, signalKasp)
|
opts = append(opts, signalKaep, signalKasp)
|
||||||
@@ -78,15 +74,34 @@ var (
|
|||||||
log.Fatalf("failed to listen: %v", err)
|
log.Fatalf("failed to listen: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
proto.RegisterSignalExchangeServer(grpcServer, server.NewServer())
|
proto.RegisterSignalExchangeServer(grpcServer, server.NewServer(registry))
|
||||||
log.Printf("started server: localhost:%v", signalPort)
|
log.Printf("gRPC server listening on 0.0.0.0:%v", signalPort)
|
||||||
if err := grpcServer.Serve(lis); err != nil {
|
|
||||||
log.Fatalf("failed to serve: %v", err)
|
go func() {
|
||||||
}
|
if err := grpcServer.Serve(lis); err != nil {
|
||||||
|
log.Fatalf("failed to serve: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
err = httpServer.Start()
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("failed to serve http server: %v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
SetupCloseHandler()
|
SetupCloseHandler()
|
||||||
<-stopCh
|
<-stopCh
|
||||||
log.Println("Receive signal to stop running the Signal server")
|
log.Println("received signal to stop running the Signal server")
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
err = httpServer.Stop(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatalf("failed stopping the http server %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
grpcServer.Stop()
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -1,22 +1,41 @@
|
|||||||
package peer
|
package peer
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"github.com/wiretrustee/wiretrustee/signal/proto"
|
"github.com/wiretrustee/wiretrustee/signal/proto"
|
||||||
"sync"
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
//Channel abstracts transport that Peer is using to communicate with teh Signal server.
|
||||||
|
//There are 2 types channels so far: gRPC- and websocket-based.
|
||||||
|
type Channel interface {
|
||||||
|
Send(msg *proto.EncryptedMessage) error
|
||||||
|
}
|
||||||
|
|
||||||
|
type WebsocketChannel struct {
|
||||||
|
conn *websocket.Conn
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewWebsocketChannel(conn *websocket.Conn) *WebsocketChannel {
|
||||||
|
return &WebsocketChannel{conn: conn}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *WebsocketChannel) Send(msg *proto.EncryptedMessage) error {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Peer representation of a connected Peer
|
// Peer representation of a connected Peer
|
||||||
type Peer struct {
|
type Peer struct {
|
||||||
// a unique id of the Peer (e.g. sha256 fingerprint of the Wireguard public key)
|
// a unique id of the Peer (e.g. sha256 fingerprint of the Wireguard public key)
|
||||||
Id string
|
Id string
|
||||||
|
|
||||||
//a gRpc connection stream to the Peer
|
//a connection stream to the Peer (gRPC or websocket)
|
||||||
Stream proto.SignalExchange_ConnectStreamServer
|
Stream Channel
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPeer creates a new instance of a connected Peer
|
// NewPeer creates a new instance of a Peer connected with gRPC
|
||||||
func NewPeer(id string, stream proto.SignalExchange_ConnectStreamServer) *Peer {
|
func NewPeer(id string, stream Channel) *Peer {
|
||||||
return &Peer{
|
return &Peer{
|
||||||
Id: id,
|
Id: id,
|
||||||
Stream: stream,
|
Stream: stream,
|
||||||
|
|||||||
147
signal/server/http/server.go
Normal file
147
signal/server/http/server.go
Normal file
@@ -0,0 +1,147 @@
|
|||||||
|
package http
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/base64"
|
||||||
|
pb "github.com/golang/protobuf/proto" //nolint
|
||||||
|
"github.com/gorilla/mux"
|
||||||
|
"github.com/gorilla/websocket"
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
"github.com/wiretrustee/wiretrustee/signal/peer"
|
||||||
|
"github.com/wiretrustee/wiretrustee/signal/proto"
|
||||||
|
"golang.org/x/crypto/acme/autocert"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Server struct {
|
||||||
|
server *http.Server
|
||||||
|
certManager *autocert.Manager
|
||||||
|
registry *peer.Registry
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHttpsServer(addr string, certManager *autocert.Manager, registry *peer.Registry) *Server {
|
||||||
|
|
||||||
|
server := &http.Server{
|
||||||
|
Addr: addr,
|
||||||
|
WriteTimeout: time.Second * 15,
|
||||||
|
ReadTimeout: time.Second * 15,
|
||||||
|
IdleTimeout: time.Second * 60,
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Server{
|
||||||
|
server: server,
|
||||||
|
certManager: certManager,
|
||||||
|
registry: registry,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewHttpServer(addr string, registry *peer.Registry) *Server {
|
||||||
|
return NewHttpsServer(addr, nil, registry)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop stops the http server
|
||||||
|
func (s *Server) Stop(ctx context.Context) error {
|
||||||
|
err := s.server.Shutdown(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) Start() error {
|
||||||
|
|
||||||
|
r := mux.NewRouter()
|
||||||
|
|
||||||
|
r.HandleFunc("/signal", func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
s.serveWs(w, r)
|
||||||
|
})
|
||||||
|
|
||||||
|
http.Handle("/", r)
|
||||||
|
|
||||||
|
if s.certManager != nil {
|
||||||
|
// if HTTPS is enabled we reuse the listener from the cert manager
|
||||||
|
listener := s.certManager.Listener()
|
||||||
|
log.Infof("HTTPs server listening on %s with Let's Encrypt autocert configured", listener.Addr())
|
||||||
|
if err := http.Serve(listener, s.certManager.HTTPHandler(r)); err != nil {
|
||||||
|
log.Errorf("failed to serve https server: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Infof("HTTP server listening on %s", s.server.Addr)
|
||||||
|
if err := s.server.ListenAndServe(); err != nil {
|
||||||
|
log.Errorf("failed to serve http server: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// serveWs handles websocket requests from the peer.
|
||||||
|
func (s *Server) serveWs(w http.ResponseWriter, r *http.Request) {
|
||||||
|
upgrader := websocket.Upgrader{
|
||||||
|
CheckOrigin: func(r *http.Request) bool {
|
||||||
|
return true //TODO not good to allow everything
|
||||||
|
},
|
||||||
|
}
|
||||||
|
conn, err := upgrader.Upgrade(w, r, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed upgrading Websocket request %v", err)
|
||||||
|
//http.Error(w, "failed upgrading Websocket request", http.StatusInternalServerError)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
params := r.URL.Query()
|
||||||
|
peerId := params.Get("id")
|
||||||
|
if peerId == "" {
|
||||||
|
log.Warn("required Websocket query id parameter is missing")
|
||||||
|
//http.Error(w, "required Websocket query id parameter is missing", http.StatusBadRequest)
|
||||||
|
conn.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
decodeString, err := base64.URLEncoding.DecodeString(peerId)
|
||||||
|
if err != nil {
|
||||||
|
conn.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
channel := peer.NewWebsocketChannel(conn)
|
||||||
|
p := peer.NewPeer(string(decodeString), channel)
|
||||||
|
s.registry.Register(p)
|
||||||
|
|
||||||
|
defer func() {
|
||||||
|
s.registry.Deregister(p)
|
||||||
|
conn.Close()
|
||||||
|
}()
|
||||||
|
|
||||||
|
conn.SetReadLimit(1024 * 1024 * 3)
|
||||||
|
for {
|
||||||
|
t, byteMsg, err := conn.ReadMessage()
|
||||||
|
if err != nil {
|
||||||
|
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
||||||
|
log.Printf("error: %v", err, t)
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
|
msg := &proto.EncryptedMessage{}
|
||||||
|
err = pb.Unmarshal(byteMsg, msg)
|
||||||
|
if err != nil {
|
||||||
|
//todo
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if dstPeer, found := s.registry.Get(msg.RemoteKey); found {
|
||||||
|
//forward the message to the target peer
|
||||||
|
err := dstPeer.Stream.Send(msg)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("error while forwarding message from peer [%s] to peer [%s]", p.Id, msg.RemoteKey)
|
||||||
|
//todo respond to the sender?
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Warnf("message from peer [%s] can't be forwarded to peer [%s] because destination peer is not connected", p.Id, msg.RemoteKey)
|
||||||
|
//todo respond to the sender?
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -19,9 +19,9 @@ type Server struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewServer creates a new Signal server
|
// NewServer creates a new Signal server
|
||||||
func NewServer() *Server {
|
func NewServer(registry *peer.Registry) *Server {
|
||||||
return &Server{
|
return &Server{
|
||||||
registry: peer.NewRegistry(),
|
registry: registry,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user