mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-26 04:06:38 +00:00
Peer configuration management (#69)
* feature: add config properties to the SyncResponse of the management gRpc service * fix: lint errors * chore: modify management protocol according to the review notes * fix: management proto fields sequence * feature: add proper peer configuration to be synced * chore: minor changes * feature: finalize peer config management * fix: lint errors * feature: add management server config file * refactor: extract hosts-config to a separate file * refactor: review notes applied to correct file_store usage * refactor: extract management service configuration to a file * refactor: simplify management config
This commit is contained in:
@@ -2,7 +2,7 @@ package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/wiretrustee/wiretrustee/management/store"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -17,33 +17,38 @@ import (
|
||||
|
||||
// Server an instance of a Management server
|
||||
type Server struct {
|
||||
Store *store.FileStore
|
||||
wgKey wgtypes.Key
|
||||
accountManager *AccountManager
|
||||
wgKey wgtypes.Key
|
||||
proto.UnimplementedManagementServiceServer
|
||||
peerChannels map[string]chan *UpdateChannelMessage
|
||||
channelsMux *sync.Mutex
|
||||
config *Config
|
||||
}
|
||||
|
||||
// AllowedIPsFormat generates Wireguard AllowedIPs format (e.g. 100.30.30.1/32)
|
||||
const AllowedIPsFormat = "%s/32"
|
||||
|
||||
type UpdateChannelMessage struct {
|
||||
Update *proto.SyncResponse
|
||||
}
|
||||
|
||||
// NewServer creates a new Management server
|
||||
func NewServer(dataDir string) (*Server, error) {
|
||||
func NewServer(config *Config) (*Server, error) {
|
||||
key, err := wgtypes.GeneratePrivateKey()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
store, err := store.NewStore(dataDir)
|
||||
store, err := NewStore(config.Datadir)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Server{
|
||||
Store: store,
|
||||
wgKey: key,
|
||||
// peerKey -> event channel
|
||||
peerChannels: make(map[string]chan *UpdateChannelMessage),
|
||||
channelsMux: &sync.Mutex{},
|
||||
peerChannels: make(map[string]chan *UpdateChannelMessage),
|
||||
channelsMux: &sync.Mutex{},
|
||||
accountManager: NewManager(store),
|
||||
config: config,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -73,8 +78,8 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
|
||||
return status.Errorf(codes.InvalidArgument, "provided wgPubKey %s is invalid", peerKey.String())
|
||||
}
|
||||
|
||||
exists := s.Store.PeerExists(peerKey.String())
|
||||
if !exists {
|
||||
peer, err := s.accountManager.GetPeer(peerKey.String())
|
||||
if err != nil {
|
||||
return status.Errorf(codes.Unauthenticated, "provided peer with the key wgPubKey %s is not registered", peerKey.String())
|
||||
}
|
||||
|
||||
@@ -84,7 +89,7 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
|
||||
return status.Errorf(codes.InvalidArgument, "invalid request message")
|
||||
}
|
||||
|
||||
err = s.sendInitialSync(peerKey, srv)
|
||||
err = s.sendInitialSync(peerKey, peer, srv)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -132,28 +137,28 @@ func (s *Server) RegisterPeer(ctx context.Context, req *proto.RegisterPeerReques
|
||||
s.channelsMux.Lock()
|
||||
defer s.channelsMux.Unlock()
|
||||
|
||||
err := s.Store.AddPeer(req.SetupKey, req.Key)
|
||||
peer, err := s.accountManager.AddPeer(req.SetupKey, req.Key)
|
||||
if err != nil {
|
||||
return &proto.RegisterPeerResponse{}, status.Errorf(codes.NotFound, "provided setup key doesn't exists")
|
||||
}
|
||||
|
||||
peers, err := s.Store.GetPeersForAPeer(req.Key)
|
||||
peers, err := s.accountManager.GetPeersForAPeer(peer.Key)
|
||||
if err != nil {
|
||||
//todo return a proper error
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// notify other peers of our registration
|
||||
for _, peer := range peers {
|
||||
if channel, ok := s.peerChannels[peer]; ok {
|
||||
for _, remotePeer := range peers {
|
||||
if channel, ok := s.peerChannels[remotePeer.Key]; ok {
|
||||
// exclude notified peer and add ourselves
|
||||
peersToSend := []string{req.Key}
|
||||
peersToSend := []*Peer{peer}
|
||||
for _, p := range peers {
|
||||
if peer != p {
|
||||
if remotePeer.Key != p.Key {
|
||||
peersToSend = append(peersToSend, p)
|
||||
}
|
||||
}
|
||||
update := &proto.SyncResponse{Peers: peersToSend}
|
||||
update := toSyncResponse(s.config, peer, peersToSend)
|
||||
channel <- &UpdateChannelMessage{Update: update}
|
||||
}
|
||||
}
|
||||
@@ -161,6 +166,73 @@ func (s *Server) RegisterPeer(ctx context.Context, req *proto.RegisterPeerReques
|
||||
return &proto.RegisterPeerResponse{}, nil
|
||||
}
|
||||
|
||||
func toResponseProto(configProto Protocol) proto.HostConfig_Protocol {
|
||||
switch configProto {
|
||||
case UDP:
|
||||
return proto.HostConfig_UDP
|
||||
case DTLS:
|
||||
return proto.HostConfig_DTLS
|
||||
case HTTP:
|
||||
return proto.HostConfig_HTTP
|
||||
case HTTPS:
|
||||
return proto.HostConfig_HTTPS
|
||||
case TCP:
|
||||
return proto.HostConfig_TCP
|
||||
default:
|
||||
//mbragin: todo something better?
|
||||
panic(fmt.Errorf("unexpected config protocol type %v", configProto))
|
||||
}
|
||||
}
|
||||
|
||||
func toSyncResponse(config *Config, peer *Peer, peers []*Peer) *proto.SyncResponse {
|
||||
|
||||
var stuns []*proto.HostConfig
|
||||
for _, stun := range config.Stuns {
|
||||
stuns = append(stuns, &proto.HostConfig{
|
||||
Uri: stun.URI,
|
||||
Protocol: toResponseProto(stun.Proto),
|
||||
})
|
||||
}
|
||||
var turns []*proto.ProtectedHostConfig
|
||||
for _, turn := range config.Turns {
|
||||
turns = append(turns, &proto.ProtectedHostConfig{
|
||||
HostConfig: &proto.HostConfig{
|
||||
Uri: turn.URI,
|
||||
Protocol: toResponseProto(turn.Proto),
|
||||
},
|
||||
User: turn.Username,
|
||||
Password: string(turn.Password),
|
||||
})
|
||||
}
|
||||
|
||||
wtConfig := &proto.WiretrusteeConfig{
|
||||
Stuns: stuns,
|
||||
Turns: turns,
|
||||
Signal: &proto.HostConfig{
|
||||
Uri: config.Signal.URI,
|
||||
Protocol: toResponseProto(config.Signal.Proto),
|
||||
},
|
||||
}
|
||||
|
||||
pConfig := &proto.PeerConfig{
|
||||
Address: peer.IP.String(),
|
||||
}
|
||||
|
||||
remotePeers := make([]*proto.RemotePeerConfig, 0, len(peers))
|
||||
for _, rPeer := range peers {
|
||||
remotePeers = append(remotePeers, &proto.RemotePeerConfig{
|
||||
WgPubKey: rPeer.Key,
|
||||
AllowedIps: []string{fmt.Sprintf(AllowedIPsFormat, rPeer.IP)}, //todo /32
|
||||
})
|
||||
}
|
||||
|
||||
return &proto.SyncResponse{
|
||||
WiretrusteeConfig: wtConfig,
|
||||
PeerConfig: pConfig,
|
||||
RemotePeers: remotePeers,
|
||||
}
|
||||
}
|
||||
|
||||
// IsHealthy indicates whether the service is healthy
|
||||
func (s *Server) IsHealthy(ctx context.Context, req *proto.Empty) (*proto.Empty, error) {
|
||||
return &proto.Empty{}, nil
|
||||
@@ -195,16 +267,14 @@ func (s *Server) closeUpdatesChannel(peerKey string) {
|
||||
}
|
||||
|
||||
// sendInitialSync sends initial proto.SyncResponse to the peer requesting synchronization
|
||||
func (s *Server) sendInitialSync(peerKey wgtypes.Key, srv proto.ManagementService_SyncServer) error {
|
||||
func (s *Server) sendInitialSync(peerKey wgtypes.Key, peer *Peer, srv proto.ManagementService_SyncServer) error {
|
||||
|
||||
peers, err := s.Store.GetPeersForAPeer(peerKey.String())
|
||||
peers, err := s.accountManager.GetPeersForAPeer(peer.Key)
|
||||
if err != nil {
|
||||
log.Warnf("error getting a list of peers for a peer %s", peerKey.String())
|
||||
log.Warnf("error getting a list of peers for a peer %s", peer.Key)
|
||||
return err
|
||||
}
|
||||
plainResp := &proto.SyncResponse{
|
||||
Peers: peers,
|
||||
}
|
||||
plainResp := toSyncResponse(s.config, peer, peers)
|
||||
|
||||
encryptedResp, err := encryption.EncryptMessage(peerKey, s.wgKey, plainResp)
|
||||
if err != nil {
|
||||
|
||||
Reference in New Issue
Block a user