mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 08:16:39 +00:00
[management/client] create job channel between management and client (#4367)
* new bi-directional stream for jobs * create bidirectional job channel to send requests from the server and receive responses from the client * fix tests * fix lint and close bug * fix lint * clean up & fix close of closed channel * add nolint:staticcheck * remove some redundant code from the job channel PR since this one is a cleaner rewrite * cleanup removes a pending job safely * change proto * rename to jobRequest * apply feedback 1 * apply feedback 2 * fix typo * apply feedback 3 * apply last feedback
This commit is contained in:
@@ -14,6 +14,7 @@ import (
|
||||
type Client interface {
|
||||
io.Closer
|
||||
Sync(ctx context.Context, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error
|
||||
Job(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error
|
||||
GetServerPublicKey() (*wgtypes.Key, error)
|
||||
Register(serverKey wgtypes.Key, setupKey string, jwtToken string, sysInfo *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
|
||||
Login(serverKey wgtypes.Key, sysInfo *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
|
||||
|
||||
@@ -71,6 +71,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
|
||||
t.Cleanup(cleanUp)
|
||||
|
||||
peersUpdateManager := mgmt.NewPeersUpdateManager(nil)
|
||||
jobManager := mgmt.NewJobManager(nil, store)
|
||||
eventStore := &activity.InMemoryEventStore{}
|
||||
ia, _ := integrations.NewIntegratedValidator(context.Background(), eventStore)
|
||||
|
||||
@@ -108,7 +109,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
|
||||
Return(true, nil).
|
||||
AnyTimes()
|
||||
|
||||
accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
|
||||
accountManager, err := mgmt.BuildManager(context.Background(), store, peersUpdateManager, jobManager, nil, "", "netbird.selfhosted", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@@ -116,7 +117,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
|
||||
groupsManager := groups.NewManagerMock()
|
||||
|
||||
secretsManager := mgmt.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
|
||||
mgmtServer, err := mgmt.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, nil, nil, mgmt.MockIntegratedValidator{})
|
||||
mgmtServer, err := mgmt.NewServer(context.Background(), config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, nil, nil, mgmt.MockIntegratedValidator{})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
@@ -12,6 +12,7 @@ import (
|
||||
gstatus "google.golang.org/grpc/status"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/google/uuid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
"google.golang.org/grpc"
|
||||
@@ -111,6 +112,25 @@ func (c *GrpcClient) ready() bool {
|
||||
// Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages
|
||||
// Blocking request. The result will be sent via msgHandler callback function
|
||||
func (c *GrpcClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error {
|
||||
return c.withMgmtStream(ctx, func(ctx context.Context, serverPubKey wgtypes.Key) error {
|
||||
return c.handleSyncStream(ctx, serverPubKey, sysInfo, msgHandler)
|
||||
})
|
||||
}
|
||||
|
||||
// Job wraps the real client's Job endpoint call and takes care of retries and encryption/decryption of messages
|
||||
// Blocking request. The result will be sent via msgHandler callback function
|
||||
func (c *GrpcClient) Job(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error {
|
||||
return c.withMgmtStream(ctx, func(ctx context.Context, serverPubKey wgtypes.Key) error {
|
||||
return c.handleJobStream(ctx, serverPubKey, msgHandler)
|
||||
})
|
||||
}
|
||||
|
||||
// withMgmtStream runs a streaming operation against the ManagementService
|
||||
// It takes care of retries, connection readiness, and fetching server public key.
|
||||
func (c *GrpcClient) withMgmtStream(
|
||||
ctx context.Context,
|
||||
handler func(ctx context.Context, serverPubKey wgtypes.Key) error,
|
||||
) error {
|
||||
operation := func() error {
|
||||
log.Debugf("management connection state %v", c.conn.GetState())
|
||||
connState := c.conn.GetState()
|
||||
@@ -128,7 +148,7 @@ func (c *GrpcClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler
|
||||
return err
|
||||
}
|
||||
|
||||
return c.handleStream(ctx, *serverPubKey, sysInfo, msgHandler)
|
||||
return handler(ctx, *serverPubKey)
|
||||
}
|
||||
|
||||
err := backoff.Retry(operation, defaultBackoff(ctx))
|
||||
@@ -139,12 +159,132 @@ func (c *GrpcClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *GrpcClient) handleStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info,
|
||||
msgHandler func(msg *proto.SyncResponse) error) error {
|
||||
func (c *GrpcClient) handleJobStream(
|
||||
ctx context.Context,
|
||||
serverPubKey wgtypes.Key,
|
||||
msgHandler func(msg *proto.JobRequest) *proto.JobResponse,
|
||||
) error {
|
||||
stream, err := c.realClient.Job(ctx)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to open job stream: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
// Handshake with the server
|
||||
if err := c.sendHandshake(ctx, stream, serverPubKey); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Debug("job stream handshake sent successfully")
|
||||
|
||||
// Main loop: receive, process, respond
|
||||
for {
|
||||
jobReq, err := c.receiveJobRequest(ctx, stream, serverPubKey)
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
|
||||
log.WithContext(ctx).Info("job stream closed by server")
|
||||
return nil
|
||||
}
|
||||
log.WithContext(ctx).Errorf("error receiving job request: %v", err)
|
||||
return err
|
||||
}
|
||||
|
||||
if jobReq == nil || len(jobReq.ID) == 0 {
|
||||
log.WithContext(ctx).Debug("received unknown or empty job request, skipping")
|
||||
continue
|
||||
}
|
||||
|
||||
jobResp := c.processJobRequest(ctx, jobReq, msgHandler)
|
||||
if err := c.sendJobResponse(ctx, stream, serverPubKey, jobResp); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sendHandshake sends the initial handshake message
|
||||
func (c *GrpcClient) sendHandshake(ctx context.Context, stream proto.ManagementService_JobClient, serverPubKey wgtypes.Key) error {
|
||||
handshakeReq := &proto.JobRequest{
|
||||
ID: []byte(uuid.New().String()),
|
||||
}
|
||||
encHello, err := encryption.EncryptMessage(serverPubKey, c.key, handshakeReq)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to encrypt handshake message: %v", err)
|
||||
return err
|
||||
}
|
||||
return stream.Send(&proto.EncryptedMessage{
|
||||
WgPubKey: c.key.PublicKey().String(),
|
||||
Body: encHello,
|
||||
})
|
||||
}
|
||||
|
||||
// receiveJobRequest waits for and decrypts a job request
|
||||
func (c *GrpcClient) receiveJobRequest(
|
||||
ctx context.Context,
|
||||
stream proto.ManagementService_JobClient,
|
||||
serverPubKey wgtypes.Key,
|
||||
) (*proto.JobRequest, error) {
|
||||
encryptedMsg, err := stream.Recv()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
jobReq := &proto.JobRequest{}
|
||||
if err := encryption.DecryptMessage(serverPubKey, c.key, encryptedMsg.Body, jobReq); err != nil {
|
||||
log.WithContext(ctx).Warnf("failed to decrypt job request: %v", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return jobReq, nil
|
||||
}
|
||||
|
||||
// processJobRequest executes the handler and ensures a valid response
|
||||
func (c *GrpcClient) processJobRequest(
|
||||
ctx context.Context,
|
||||
jobReq *proto.JobRequest,
|
||||
msgHandler func(msg *proto.JobRequest) *proto.JobResponse,
|
||||
) *proto.JobResponse {
|
||||
jobResp := msgHandler(jobReq)
|
||||
if jobResp == nil {
|
||||
jobResp = &proto.JobResponse{
|
||||
ID: jobReq.ID,
|
||||
Status: proto.JobStatus_failed,
|
||||
Reason: []byte("handler returned nil response"),
|
||||
}
|
||||
log.WithContext(ctx).Warnf("job handler returned nil for job %s", string(jobReq.ID))
|
||||
}
|
||||
return jobResp
|
||||
}
|
||||
|
||||
// sendJobResponse encrypts and sends a job response
|
||||
func (c *GrpcClient) sendJobResponse(
|
||||
ctx context.Context,
|
||||
stream proto.ManagementService_JobClient,
|
||||
serverPubKey wgtypes.Key,
|
||||
resp *proto.JobResponse,
|
||||
) error {
|
||||
encResp, err := encryption.EncryptMessage(serverPubKey, c.key, resp)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to encrypt job response for job %s: %v", string(resp.ID), err)
|
||||
return err
|
||||
}
|
||||
|
||||
if err := stream.Send(&proto.EncryptedMessage{
|
||||
WgPubKey: c.key.PublicKey().String(),
|
||||
Body: encResp,
|
||||
}); err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to send job response for job %s: %v", string(resp.ID), err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Debugf("job response sent successfully for job %s", string(resp.ID))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error {
|
||||
ctx, cancelStream := context.WithCancel(ctx)
|
||||
defer cancelStream()
|
||||
|
||||
stream, err := c.connectToStream(ctx, serverPubKey, sysInfo)
|
||||
stream, err := c.connectToSyncStream(ctx, serverPubKey, sysInfo)
|
||||
if err != nil {
|
||||
log.Debugf("failed to open Management Service stream: %s", err)
|
||||
if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied {
|
||||
@@ -157,7 +297,7 @@ func (c *GrpcClient) handleStream(ctx context.Context, serverPubKey wgtypes.Key,
|
||||
c.notifyConnected()
|
||||
|
||||
// blocking until error
|
||||
err = c.receiveEvents(stream, serverPubKey, msgHandler)
|
||||
err = c.receiveUpdatesEvents(stream, serverPubKey, msgHandler)
|
||||
if err != nil {
|
||||
c.notifyDisconnected(err)
|
||||
s, _ := gstatus.FromError(err)
|
||||
@@ -186,7 +326,7 @@ func (c *GrpcClient) GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, err
|
||||
|
||||
ctx, cancelStream := context.WithCancel(c.ctx)
|
||||
defer cancelStream()
|
||||
stream, err := c.connectToStream(ctx, *serverPubKey, sysInfo)
|
||||
stream, err := c.connectToSyncStream(ctx, *serverPubKey, sysInfo)
|
||||
if err != nil {
|
||||
log.Debugf("failed to open Management Service stream: %s", err)
|
||||
return nil, err
|
||||
@@ -219,7 +359,7 @@ func (c *GrpcClient) GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, err
|
||||
return decryptedResp.GetNetworkMap(), nil
|
||||
}
|
||||
|
||||
func (c *GrpcClient) connectToStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info) (proto.ManagementService_SyncClient, error) {
|
||||
func (c *GrpcClient) connectToSyncStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info) (proto.ManagementService_SyncClient, error) {
|
||||
req := &proto.SyncRequest{Meta: infoToMetaData(sysInfo)}
|
||||
|
||||
myPrivateKey := c.key
|
||||
@@ -238,7 +378,7 @@ func (c *GrpcClient) connectToStream(ctx context.Context, serverPubKey wgtypes.K
|
||||
return sync, nil
|
||||
}
|
||||
|
||||
func (c *GrpcClient) receiveEvents(stream proto.ManagementService_SyncClient, serverPubKey wgtypes.Key, msgHandler func(msg *proto.SyncResponse) error) error {
|
||||
func (c *GrpcClient) receiveUpdatesEvents(stream proto.ManagementService_SyncClient, serverPubKey wgtypes.Key, msgHandler func(msg *proto.SyncResponse) error) error {
|
||||
for {
|
||||
update, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
|
||||
@@ -20,6 +20,7 @@ type MockClient struct {
|
||||
GetPKCEAuthorizationFlowFunc func(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error)
|
||||
SyncMetaFunc func(sysInfo *system.Info) error
|
||||
LogoutFunc func() error
|
||||
JobFunc func(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error
|
||||
}
|
||||
|
||||
func (m *MockClient) IsHealthy() bool {
|
||||
@@ -40,6 +41,13 @@ func (m *MockClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler
|
||||
return m.SyncFunc(ctx, sysInfo, msgHandler)
|
||||
}
|
||||
|
||||
func (m *MockClient) Job(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error {
|
||||
if m.JobFunc == nil {
|
||||
return nil
|
||||
}
|
||||
return m.JobFunc(ctx, msgHandler)
|
||||
}
|
||||
|
||||
func (m *MockClient) GetServerPublicKey() (*wgtypes.Key, error) {
|
||||
if m.GetServerPublicKeyFunc == nil {
|
||||
return nil, nil
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -48,6 +48,9 @@ service ManagementService {
|
||||
|
||||
// Logout logs out the peer and removes it from the management server
|
||||
rpc Logout(EncryptedMessage) returns (Empty) {}
|
||||
|
||||
// Executes a job on a target peer (e.g., debug bundle)
|
||||
rpc Job(stream EncryptedMessage) returns (stream EncryptedMessage) {}
|
||||
}
|
||||
|
||||
message EncryptedMessage {
|
||||
@@ -60,6 +63,42 @@ message EncryptedMessage {
|
||||
int32 version = 3;
|
||||
}
|
||||
|
||||
message JobRequest {
|
||||
bytes ID = 1;
|
||||
|
||||
oneof workload_parameters {
|
||||
BundleParameters bundle = 10;
|
||||
//OtherParameters other = 11;
|
||||
}
|
||||
}
|
||||
|
||||
enum JobStatus {
|
||||
unknown_status = 0; //placeholder
|
||||
succeeded = 1;
|
||||
failed = 2;
|
||||
}
|
||||
|
||||
message JobResponse{
|
||||
bytes ID = 1;
|
||||
JobStatus status=2;
|
||||
bytes Reason=3;
|
||||
oneof workload_results {
|
||||
BundleResult bundle = 10;
|
||||
//OtherResult other = 11;
|
||||
}
|
||||
}
|
||||
|
||||
message BundleParameters {
|
||||
bool bundle_for = 1;
|
||||
int64 bundle_for_time = 2;
|
||||
int32 log_file_count = 3;
|
||||
bool anonymize = 4;
|
||||
}
|
||||
|
||||
message BundleResult {
|
||||
string upload_key = 1;
|
||||
}
|
||||
|
||||
message SyncRequest {
|
||||
// Meta data of the peer
|
||||
PeerSystemMeta meta = 1;
|
||||
|
||||
@@ -50,6 +50,8 @@ type ManagementServiceClient interface {
|
||||
SyncMeta(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*Empty, error)
|
||||
// Logout logs out the peer and removes it from the management server
|
||||
Logout(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*Empty, error)
|
||||
// Executes a job on a target peer (e.g., debug bundle)
|
||||
Job(ctx context.Context, opts ...grpc.CallOption) (ManagementService_JobClient, error)
|
||||
}
|
||||
|
||||
type managementServiceClient struct {
|
||||
@@ -155,6 +157,37 @@ func (c *managementServiceClient) Logout(ctx context.Context, in *EncryptedMessa
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *managementServiceClient) Job(ctx context.Context, opts ...grpc.CallOption) (ManagementService_JobClient, error) {
|
||||
stream, err := c.cc.NewStream(ctx, &ManagementService_ServiceDesc.Streams[1], "/management.ManagementService/Job", opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &managementServiceJobClient{stream}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
type ManagementService_JobClient interface {
|
||||
Send(*EncryptedMessage) error
|
||||
Recv() (*EncryptedMessage, error)
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
type managementServiceJobClient struct {
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
func (x *managementServiceJobClient) Send(m *EncryptedMessage) error {
|
||||
return x.ClientStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *managementServiceJobClient) Recv() (*EncryptedMessage, error) {
|
||||
m := new(EncryptedMessage)
|
||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// ManagementServiceServer is the server API for ManagementService service.
|
||||
// All implementations must embed UnimplementedManagementServiceServer
|
||||
// for forward compatibility
|
||||
@@ -191,6 +224,8 @@ type ManagementServiceServer interface {
|
||||
SyncMeta(context.Context, *EncryptedMessage) (*Empty, error)
|
||||
// Logout logs out the peer and removes it from the management server
|
||||
Logout(context.Context, *EncryptedMessage) (*Empty, error)
|
||||
// Executes a job on a target peer (e.g., debug bundle)
|
||||
Job(ManagementService_JobServer) error
|
||||
mustEmbedUnimplementedManagementServiceServer()
|
||||
}
|
||||
|
||||
@@ -222,6 +257,9 @@ func (UnimplementedManagementServiceServer) SyncMeta(context.Context, *Encrypted
|
||||
func (UnimplementedManagementServiceServer) Logout(context.Context, *EncryptedMessage) (*Empty, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method Logout not implemented")
|
||||
}
|
||||
func (UnimplementedManagementServiceServer) Job(ManagementService_JobServer) error {
|
||||
return status.Errorf(codes.Unimplemented, "method Job not implemented")
|
||||
}
|
||||
func (UnimplementedManagementServiceServer) mustEmbedUnimplementedManagementServiceServer() {}
|
||||
|
||||
// UnsafeManagementServiceServer may be embedded to opt out of forward compatibility for this service.
|
||||
@@ -382,6 +420,32 @@ func _ManagementService_Logout_Handler(srv interface{}, ctx context.Context, dec
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ManagementService_Job_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
return srv.(ManagementServiceServer).Job(&managementServiceJobServer{stream})
|
||||
}
|
||||
|
||||
type ManagementService_JobServer interface {
|
||||
Send(*EncryptedMessage) error
|
||||
Recv() (*EncryptedMessage, error)
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
type managementServiceJobServer struct {
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
func (x *managementServiceJobServer) Send(m *EncryptedMessage) error {
|
||||
return x.ServerStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *managementServiceJobServer) Recv() (*EncryptedMessage, error) {
|
||||
m := new(EncryptedMessage)
|
||||
if err := x.ServerStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// ManagementService_ServiceDesc is the grpc.ServiceDesc for ManagementService service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
// and not to be introspected or modified (even as a copy)
|
||||
@@ -424,6 +488,12 @@ var ManagementService_ServiceDesc = grpc.ServiceDesc{
|
||||
Handler: _ManagementService_Sync_Handler,
|
||||
ServerStreams: true,
|
||||
},
|
||||
{
|
||||
StreamName: "Job",
|
||||
Handler: _ManagementService_Job_Handler,
|
||||
ServerStreams: true,
|
||||
ClientStreams: true,
|
||||
},
|
||||
},
|
||||
Metadata: "management.proto",
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user