Extend management to sync meta and posture checks with peer (#1727)

* Add method to retrieve peer's applied posture checks

* Add posture checks in server response and update proto messages

* Refactor

* Extends peer metadata synchronization through SyncRequest and propagate posture changes on syncResponse

* Remove account lock

* Pass system info on sync

* Fix tests

* Refactor

* resolve merge

* Evaluate process check on client (#1749)

* implement  server and client sync peer meta alongside mocks

* wip: add check file and process

* Add files to peer metadata for process check

* wip: update peer meta on first sync

* Add files to peer's metadata

* Evaluate process check using files from peer metadata

* Fix panic and append windows path to files

* Fix check network address and files equality

* Evaluate active process on darwin

* Evaluate active process on linux

* Skip processing processes if no paths are set

* Return network map on peer meta-sync and update account peer's

* Update client network map on meta sync

* Get system info with applied checks

* Add windows package

* Remove a network map from sync meta-response

* Update checks proto message

* Keep client checks state and sync meta on checks change

* Evaluate a running process

* skip build for android and ios

* skip check file and process for android and ios

* bump gopsutil version

* fix tests

* move process check to separate os file

* refactor

* evaluate info with checks on receiving management events

* skip meta-update for an old client with no meta-sync support

* Check if peer meta is empty without reflection
This commit is contained in:
Bethuel Mmbaga
2024-04-15 16:00:57 +03:00
committed by GitHub
parent 36582d13aa
commit c6ab215d9d
26 changed files with 1400 additions and 601 deletions

View File

@@ -3,19 +3,21 @@ package client
import (
"io"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"github.com/netbirdio/netbird/client/system"
"github.com/netbirdio/netbird/management/proto"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
)
type Client interface {
io.Closer
Sync(msgHandler func(msg *proto.SyncResponse) error) error
Sync(sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error
GetServerPublicKey() (*wgtypes.Key, error)
Register(serverKey wgtypes.Key, setupKey string, jwtToken string, sysInfo *system.Info, sshKey []byte) (*proto.LoginResponse, error)
Login(serverKey wgtypes.Key, sysInfo *system.Info, sshKey []byte) (*proto.LoginResponse, error)
GetDeviceAuthorizationFlow(serverKey wgtypes.Key) (*proto.DeviceAuthorizationFlow, error)
GetPKCEAuthorizationFlow(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error)
GetNetworkMap() (*proto.NetworkMap, error)
GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, error)
IsHealthy() bool
SyncMeta(sysInfo *system.Info) error
}

View File

@@ -255,7 +255,7 @@ func TestClient_Sync(t *testing.T) {
ch := make(chan *mgmtProto.SyncResponse, 1)
go func() {
err = client.Sync(func(msg *mgmtProto.SyncResponse) error {
err = client.Sync(info, func(msg *mgmtProto.SyncResponse) error {
ch <- msg
return nil
})

View File

@@ -113,7 +113,7 @@ func (c *GrpcClient) ready() bool {
// Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages
// Blocking request. The result will be sent via msgHandler callback function
func (c *GrpcClient) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
func (c *GrpcClient) Sync(sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error {
backOff := defaultBackoff(c.ctx)
operation := func() error {
@@ -135,7 +135,7 @@ func (c *GrpcClient) Sync(msgHandler func(msg *proto.SyncResponse) error) error
ctx, cancelStream := context.WithCancel(c.ctx)
defer cancelStream()
stream, err := c.connectToStream(ctx, *serverPubKey)
stream, err := c.connectToStream(ctx, *serverPubKey, sysInfo)
if err != nil {
log.Debugf("failed to open Management Service stream: %s", err)
if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied {
@@ -177,7 +177,7 @@ func (c *GrpcClient) Sync(msgHandler func(msg *proto.SyncResponse) error) error
}
// GetNetworkMap return with the network map
func (c *GrpcClient) GetNetworkMap() (*proto.NetworkMap, error) {
func (c *GrpcClient) GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, error) {
serverPubKey, err := c.GetServerPublicKey()
if err != nil {
log.Debugf("failed getting Management Service public key: %s", err)
@@ -186,7 +186,7 @@ func (c *GrpcClient) GetNetworkMap() (*proto.NetworkMap, error) {
ctx, cancelStream := context.WithCancel(c.ctx)
defer cancelStream()
stream, err := c.connectToStream(ctx, *serverPubKey)
stream, err := c.connectToStream(ctx, *serverPubKey, sysInfo)
if err != nil {
log.Debugf("failed to open Management Service stream: %s", err)
return nil, err
@@ -219,8 +219,8 @@ func (c *GrpcClient) GetNetworkMap() (*proto.NetworkMap, error) {
return decryptedResp.GetNetworkMap(), nil
}
func (c *GrpcClient) connectToStream(ctx context.Context, serverPubKey wgtypes.Key) (proto.ManagementService_SyncClient, error) {
req := &proto.SyncRequest{}
func (c *GrpcClient) connectToStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info) (proto.ManagementService_SyncClient, error) {
req := &proto.SyncRequest{Meta: infoToMetaData(sysInfo)}
myPrivateKey := c.key
myPublicKey := myPrivateKey.PublicKey()
@@ -430,6 +430,35 @@ func (c *GrpcClient) GetPKCEAuthorizationFlow(serverKey wgtypes.Key) (*proto.PKC
return flowInfoResp, nil
}
// SyncMeta sends updated system metadata to the Management Service.
// It should be used if there is changes on peer posture check after initial sync.
func (c *GrpcClient) SyncMeta(sysInfo *system.Info) error {
if !c.ready() {
return fmt.Errorf("no connection to management")
}
serverPubKey, err := c.GetServerPublicKey()
if err != nil {
log.Debugf("failed getting Management Service public key: %s", err)
return err
}
syncMetaReq, err := encryption.EncryptMessage(*serverPubKey, c.key, &proto.SyncMetaRequest{Meta: infoToMetaData(sysInfo)})
if err != nil {
log.Errorf("failed to encrypt message: %s", err)
return err
}
mgmCtx, cancel := context.WithTimeout(c.ctx, ConnectTimeout)
defer cancel()
_, err = c.realClient.SyncMeta(mgmCtx, &proto.EncryptedMessage{
WgPubKey: c.key.PublicKey().String(),
Body: syncMetaReq,
})
return err
}
func (c *GrpcClient) notifyDisconnected(err error) {
c.connStateCallbackLock.RLock()
defer c.connStateCallbackLock.RUnlock()
@@ -463,6 +492,15 @@ func infoToMetaData(info *system.Info) *proto.PeerSystemMeta {
})
}
files := make([]*proto.File, 0, len(info.Files))
for _, file := range info.Files {
files = append(files, &proto.File{
Path: file.Path,
Exist: file.Exist,
ProcessIsRunning: file.ProcessIsRunning,
})
}
return &proto.PeerSystemMeta{
Hostname: info.Hostname,
GoOS: info.GoOS,
@@ -482,5 +520,6 @@ func infoToMetaData(info *system.Info) *proto.PeerSystemMeta {
Cloud: info.Environment.Cloud,
Platform: info.Environment.Platform,
},
Files: files,
}
}

View File

@@ -9,12 +9,13 @@ import (
type MockClient struct {
CloseFunc func() error
SyncFunc func(msgHandler func(msg *proto.SyncResponse) error) error
SyncFunc func(sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error
GetServerPublicKeyFunc func() (*wgtypes.Key, error)
RegisterFunc func(serverKey wgtypes.Key, setupKey string, jwtToken string, info *system.Info, sshKey []byte) (*proto.LoginResponse, error)
LoginFunc func(serverKey wgtypes.Key, info *system.Info, sshKey []byte) (*proto.LoginResponse, error)
GetDeviceAuthorizationFlowFunc func(serverKey wgtypes.Key) (*proto.DeviceAuthorizationFlow, error)
GetPKCEAuthorizationFlowFunc func(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error)
SyncMetaFunc func(sysInfo *system.Info) error
}
func (m *MockClient) IsHealthy() bool {
@@ -28,11 +29,11 @@ func (m *MockClient) Close() error {
return m.CloseFunc()
}
func (m *MockClient) Sync(msgHandler func(msg *proto.SyncResponse) error) error {
func (m *MockClient) Sync(sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error {
if m.SyncFunc == nil {
return nil
}
return m.SyncFunc(msgHandler)
return m.SyncFunc(sysInfo, msgHandler)
}
func (m *MockClient) GetServerPublicKey() (*wgtypes.Key, error) {
@@ -71,6 +72,13 @@ func (m *MockClient) GetPKCEAuthorizationFlow(serverKey wgtypes.Key) (*proto.PKC
}
// GetNetworkMap mock implementation of GetNetworkMap from mgm.Client interface
func (m *MockClient) GetNetworkMap() (*proto.NetworkMap, error) {
func (m *MockClient) GetNetworkMap(_ *system.Info) (*proto.NetworkMap, error) {
return nil, nil
}
func (m *MockClient) SyncMeta(sysInfo *system.Info) error {
if m.SyncMetaFunc == nil {
return nil
}
return m.SyncMetaFunc(sysInfo)
}