diff --git a/client/internal/engine.go b/client/internal/engine.go index 14f5e9ae8..f25801fb6 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -22,6 +22,8 @@ import ( log "github.com/sirupsen/logrus" "golang.zx2c4.com/wireguard/tun/netstack" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/proto" nberrors "github.com/netbirdio/netbird/client/errors" @@ -53,6 +55,7 @@ import ( semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group" nbssh "github.com/netbirdio/netbird/client/ssh" + nbstatus "github.com/netbirdio/netbird/client/status" "github.com/netbirdio/netbird/client/system" nbdns "github.com/netbirdio/netbird/dns" "github.com/netbirdio/netbird/route" @@ -62,7 +65,9 @@ import ( relayClient "github.com/netbirdio/netbird/shared/relay/client" signal "github.com/netbirdio/netbird/shared/signal/client" sProto "github.com/netbirdio/netbird/shared/signal/proto" + "github.com/netbirdio/netbird/upload-server/types" "github.com/netbirdio/netbird/util" + "google.golang.org/grpc/status" ) // PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer. 
@@ -887,19 +892,45 @@ func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error { return nil } +func (e *Engine) getPeerClient(addr string) (*grpc.ClientConn, error) { + conn, err := grpc.NewClient( + strings.TrimPrefix(addr, "tcp://"), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return nil, fmt.Errorf("failed to connect to daemon error: %v\n"+ + "If the daemon is not running please run: "+ + "\nnetbird service install \nnetbird service start\n", err) + } + + return conn, nil +} + func (e *Engine) receiveJobEvents() { go func() { err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse { - // Simple test handler — replace with real logic log.Infof("Received job request: %+v", msg) - // TODO: trigger local debug bundle or other job - return &mgmProto.JobResponse{ - ID: msg.ID, - WorkloadResults: &mgmProto.JobResponse_Bundle{ - Bundle: &mgmProto.BundleResult{ - UploadKey: "upload-key", + switch params := msg.WorkloadParameters.(type) { + case *mgmProto.JobRequest_Bundle: + uploadKey, err := e.handleBundle(params.Bundle) + if err != nil { + return &mgmProto.JobResponse{ + ID: msg.ID, + Status: mgmProto.JobStatus_failed, + Reason: []byte(err.Error()), + } + } + return &mgmProto.JobResponse{ + ID: msg.ID, + Status: mgmProto.JobStatus_succeeded, + WorkloadResults: &mgmProto.JobResponse_Bundle{ + Bundle: &mgmProto.BundleResult{ + UploadKey: uploadKey, + }, }, - }, + } + default: + return nil } }) if err != nil { @@ -914,6 +945,62 @@ func (e *Engine) receiveJobEvents() { log.Debugf("connecting to Management Service jobs stream") } +func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (string, error) { + // todo: implement with real deamon address + conn, err := e.getPeerClient("unix:///var/run/netbird.sock") + if err != nil { + return "", err + } + defer func() { + if err := conn.Close(); err != nil { + log.Errorf("Failed to close connection: %v", err) + } + }() + + statusOutput, err := 
e.getStatusOutput(params.Anonymize) + if err != nil { + return "", err + } + request := &cProto.DebugBundleRequest{ + Anonymize: params.Anonymize, + SystemInfo: true, + Status: statusOutput, + LogFileCount: uint32(params.LogFileCount), + UploadURL: types.DefaultBundleURL, + } + service := cProto.NewDaemonServiceClient(conn) + resp, err := service.DebugBundle(e.clientCtx, request) + if err != nil { + return "", fmt.Errorf("failed to bundle debug: " + status.Convert(err).Message()) + } + + if resp.GetUploadFailureReason() != "" { + return "", fmt.Errorf("upload failed: " + resp.GetUploadFailureReason()) + } + return resp.GetUploadedKey(), nil +} + +func (e *Engine) getStatusOutput(anon bool) (string, error) { + // todo: implement with real daemon address + conn, err := e.getPeerClient("unix:///var/run/netbird.sock") + if err != nil { + return "", err + } + defer func() { + if err := conn.Close(); err != nil { + log.Errorf("Failed to close connection: %v", err) + } + }() + + statusResp, err := cProto.NewDaemonServiceClient(conn).Status(e.clientCtx, &cProto.StatusRequest{GetFullPeerStatus: true, ShouldRunProbes: true}) + if err != nil { + return "", fmt.Errorf("status failed: %v", status.Convert(err).Message()) + } + return nbstatus.ParseToFullDetailSummary( + nbstatus.ConvertToStatusOutputOverview(statusResp, anon, "", nil, nil, nil, "", ""), + ), nil +} + // receiveManagementEvents connects to the Management Service event stream to receive updates from the management service // E.g. when a new peer has been registered and we are allowed to connect to it. 
func (e *Engine) receiveManagementEvents() { diff --git a/management/server/grpcserver.go b/management/server/grpcserver.go index 4d57a8095..65e931f18 100644 --- a/management/server/grpcserver.go +++ b/management/server/grpcserver.go @@ -163,11 +163,11 @@ func (s *GRPCServer) Job(srv proto.ManagementService_JobServer) error { } // Start background response handler - s.startResponseReceiver(ctx, accountID, srv) + s.startResponseReceiver(ctx, srv) // Prepare per-peer state - updates := s.jobManager.CreateJobChannel(peer.ID) - log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart)) + updates := s.jobManager.CreateJobChannel(ctx, accountID, peer.ID) + log.WithContext(ctx).Debugf("Job: took %v", time.Since(reqStart)) // Main loop: forward jobs to client return s.sendJobsLoop(ctx, accountID, peerKey, peer, updates, srv) @@ -262,7 +262,7 @@ func (s *GRPCServer) handleHandshake(ctx context.Context, srv proto.ManagementSe return peerKey, nil } -func (s *GRPCServer) startResponseReceiver(ctx context.Context, accountID string, srv proto.ManagementService_JobServer) { +func (s *GRPCServer) startResponseReceiver(ctx context.Context, srv proto.ManagementService_JobServer) { go func() { for { msg, err := srv.Recv() @@ -280,7 +280,7 @@ func (s *GRPCServer) startResponseReceiver(ctx context.Context, accountID string continue } - if err := s.jobManager.HandleResponse(ctx, accountID, jobResp); err != nil { + if err := s.jobManager.HandleResponse(ctx, jobResp); err != nil { log.WithContext(ctx).Errorf("handle job response failed: %v", err) } diff --git a/management/server/jobChannel.go b/management/server/jobChannel.go index e9c98d2b8..9c64d6102 100644 --- a/management/server/jobChannel.go +++ b/management/server/jobChannel.go @@ -9,6 +9,7 @@ import ( "github.com/netbirdio/netbird/management/server/store" "github.com/netbirdio/netbird/management/server/telemetry" "github.com/netbirdio/netbird/shared/management/proto" + log "github.com/sirupsen/logrus" ) const 
jobChannelBuffer = 100 @@ -17,7 +18,6 @@ type JobEvent struct { PeerID string Request *proto.JobRequest Response *proto.JobResponse - Done chan struct{} // closed when response arrives } type JobManager struct { @@ -42,9 +42,11 @@ func NewJobManager(metrics telemetry.AppMetrics, store store.Store) *JobManager } // CreateJobChannel creates or replaces a channel for a peer -func (jm *JobManager) CreateJobChannel(peerID string) chan *JobEvent { - // TODO: all pending jobs stored in db for this peer should be failed - // jm.Store.MarkPendingJobsAsFailed(peerID) +func (jm *JobManager) CreateJobChannel(ctx context.Context, accountID, peerID string) chan *JobEvent { + // all pending jobs stored in db for this peer should be failed + if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Pending job cleanup: marked as failed automatically due to being stuck too long"); err != nil { + log.WithContext(ctx).Error(err.Error()) + } jm.mu.Lock() defer jm.mu.Unlock() @@ -71,7 +73,6 @@ func (jm *JobManager) SendJob(ctx context.Context, accountID, peerID string, req event := &JobEvent{ PeerID: peerID, Request: req, - Done: make(chan struct{}), } jm.mu.Lock() @@ -80,14 +81,6 @@ func (jm *JobManager) SendJob(ctx context.Context, accountID, peerID string, req select { case ch <- event: - case <-time.After(5 * time.Second): - jm.cleanup(ctx, accountID, string(req.ID), "timed out") - return fmt.Errorf("job channel full for peer %s", peerID) - } - - select { - case <-event.Done: - return nil case <-time.After(jm.responseWait): jm.cleanup(ctx, accountID, string(req.ID), "timed out") return fmt.Errorf("job %s timed out", req.ID) @@ -95,10 +88,11 @@ func (jm *JobManager) SendJob(ctx context.Context, accountID, peerID string, req jm.cleanup(ctx, accountID, string(req.ID), ctx.Err().Error()) return ctx.Err() } + return nil } // HandleResponse marks a job as finished and moves it to completed -func (jm *JobManager) HandleResponse(ctx context.Context, accountID string, resp 
*proto.JobResponse) error { +func (jm *JobManager) HandleResponse(ctx context.Context, resp *proto.JobResponse) error { jm.mu.Lock() defer jm.mu.Unlock() @@ -106,14 +100,14 @@ func (jm *JobManager) HandleResponse(ctx context.Context, accountID string, resp if !ok { return fmt.Errorf("job %s not found", resp.ID) } + //update or create the store for job response + err := jm.Store.CompletePeerJob(ctx, resp) + if err == nil { + event.Response = resp + } - event.Response = resp - //TODO: update the store for job response - // jm.store.CompleteJob(ctx,accountID, string(resp.GetID()), string(resp.GetResult()),string(resp.GetReason())) - close(event.Done) delete(jm.pending, string(resp.ID)) - - return nil + return err } // CloseChannel closes a peer’s channel and cleans up its jobs @@ -130,7 +124,9 @@ func (jm *JobManager) CloseChannel(ctx context.Context, accountID, peerID string for jobID, ev := range jm.pending { if ev.PeerID == peerID { // if the client disconnect and there is pending job then marke it as failed - // jm.store.CompleteJob(ctx,accountID, jobID,"", "Time out ") + if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Time out peer disconnected"); err != nil { + log.WithContext(ctx).Errorf(err.Error()) + } delete(jm.pending, jobID) } } @@ -142,8 +138,29 @@ func (jm *JobManager) cleanup(ctx context.Context, accountID, jobID string, reas defer jm.mu.Unlock() if ev, ok := jm.pending[jobID]; ok { - close(ev.Done) - // jm.store.CompleteJob(ctx, accountID, jobID, "", reason) + if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, ev.PeerID, reason); err != nil { + log.WithContext(ctx).Errorf(err.Error()) + } delete(jm.pending, jobID) } } + +func (jm *JobManager) IsPeerConnected(peerID string) bool { + jm.mu.RLock() + defer jm.mu.RUnlock() + + _, ok := jm.jobChannels[peerID] + return ok +} + +func (jm *JobManager) IsPeerHasPendingJobs(peerID string) bool { + jm.mu.RLock() + defer jm.mu.RUnlock() + + for _, ev := range jm.pending { + if ev.PeerID 
== peerID { + return true + } + } + return false +} diff --git a/management/server/peer.go b/management/server/peer.go index 876a51124..16abf2b40 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -353,22 +353,24 @@ func (am *DefaultAccountManager) CreatePeerJob(ctx context.Context, accountID, p } // check if peer connected - // todo: implement jobManager.IsPeerConnected - // if !am.jobManager.IsPeerConnected(ctx, peerID) { - // return status.NewJobFailedError("peer not connected") - // } + if !am.jobManager.IsPeerConnected(peerID) { + return status.Errorf(status.BadRequest, "peer not connected") + } // check if already has pending jobs - // todo: implement jobManager.GetPendingJobsByPeerID - // if pending := am.jobManager.GetPendingJobsByPeerID(ctx, peerID); len(pending) > 0 { - // return status.NewJobAlreadyPendingError(peerID) - // } + if am.jobManager.IsPeerHasPendingJobs(peerID) { + return status.Errorf(status.BadRequest, "peer already has pending job") + } + + jobStream, err := job.ToStreamJobRequest() + if err != nil { + return status.Errorf(status.BadRequest, "invalid job request %v", err) + } // try sending job first - // todo: implement am.jobManager.SendJob - // if err := am.jobManager.SendJob(ctx, peerID, job); err != nil { - // return status.NewJobFailedError(fmt.Sprintf("failed to send job: %v", err)) - // } + if err := am.jobManager.SendJob(ctx, accountID, peerID, jobStream); err != nil { + return status.Errorf(status.Internal, "failed to send job: %v", err) + } var peer *nbpeer.Peer var eventsToStore func() diff --git a/management/server/store/sql_store.go b/management/server/store/sql_store.go index de3a01e72..6eddeb1ca 100644 --- a/management/server/store/sql_store.go +++ b/management/server/store/sql_store.go @@ -34,6 +34,7 @@ import ( "github.com/netbirdio/netbird/management/server/types" "github.com/netbirdio/netbird/management/server/util" "github.com/netbirdio/netbird/route" + 
"github.com/netbirdio/netbird/shared/management/proto" "github.com/netbirdio/netbird/shared/management/status" ) @@ -136,18 +137,39 @@ func (s *SqlStore) CreatePeerJob(ctx context.Context, job *types.Job) error { return nil } -// job was pending for too long and has been cancelled -// todo call it when we first start the jobChannel to make sure no stuck jobs -func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, peerID string) error { - now := time.Now().UTC() - return s.db. +func (s *SqlStore) CompletePeerJob(ctx context.Context, jobResponse *proto.JobResponse) error { + var job types.Job + if err := job.ApplyResponse(jobResponse); err != nil { + return status.Errorf(status.Internal, err.Error()) + } + result := s.db. Model(&types.Job{}). - Where("peer_id = ? AND status = ?", types.JobStatusPending, peerID). - Updates(map[string]any{ - "status": types.JobStatusFailed, - "failed_reason": "Pending job cleanup: marked as failed automatically due to being stuck too long", - "completed_at": now, - }).Error + Where(idQueryCondition, string(jobResponse.GetID())). + Updates(job) + + if result.Error != nil { + log.WithContext(ctx).Errorf("failed to update job in store: %s", result.Error) + return status.Errorf(status.Internal, "failed to update job in store") + } + return nil +} + +// job was pending for too long and has been cancelled +func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error { + now := time.Now().UTC() + result := s.db. + Model(&types.Job{}). + Where(accountAndPeerIDQueryCondition+" AND status = ?", accountID, peerID, types.JobStatusPending). 
+ Updates(types.Job{ + Status: types.JobStatusFailed, + FailedReason: reason, + CompletedAt: &now, + }) + if result.Error != nil { + log.WithContext(ctx).Errorf("failed to mark pending jobs as Failed job in store: %s", result.Error) + return status.Errorf(status.Internal, "failed to mark pending job as Failed in store") + } + return nil } // GetJobByID fetches job by ID @@ -159,7 +181,11 @@ func (s *SqlStore) GetPeerJobByID(ctx context.Context, accountID, jobID string) if errors.Is(err, gorm.ErrRecordNotFound) { return nil, status.Errorf(status.NotFound, "job %s not found", jobID) } - return &job, err + if err != nil { + log.WithContext(ctx).Errorf("failed to fetch job from store: %s", err) + return nil, err + } + return &job, nil } // get all jobs @@ -171,34 +197,13 @@ func (s *SqlStore) GetPeerJobs(ctx context.Context, accountID, peerID string) ([ Find(&jobs).Error if err != nil { + log.WithContext(ctx).Errorf("failed to fetch jobs from store: %s", err) return nil, err } return jobs, nil } -func (s *SqlStore) CompletePeerJob(accountID, jobID, result, failedReason string) error { - now := time.Now().UTC() - - updates := map[string]any{ - "completed_at": now, - } - - if result != "" && failedReason == "" { - updates["status"] = types.JobStatusSucceeded - updates["result"] = result - updates["failed_reason"] = "" - } else { - updates["status"] = types.JobStatusFailed - updates["failed_reason"] = failedReason - } - - return s.db. - Model(&types.Job{}). - Where(accountAndIDQueryCondition, accountID, jobID). 
- Updates(updates).Error -} - // AcquireGlobalLock acquires global lock across all the accounts and returns a function that releases the lock func (s *SqlStore) AcquireGlobalLock(ctx context.Context) (unlock func()) { log.WithContext(ctx).Tracef("acquiring global lock") diff --git a/management/server/store/store.go b/management/server/store/store.go index 98b9ae865..42c4dbec9 100644 --- a/management/server/store/store.go +++ b/management/server/store/store.go @@ -26,6 +26,7 @@ import ( "github.com/netbirdio/netbird/management/server/telemetry" "github.com/netbirdio/netbird/management/server/testutil" "github.com/netbirdio/netbird/management/server/types" + "github.com/netbirdio/netbird/shared/management/proto" "github.com/netbirdio/netbird/util" "github.com/netbirdio/netbird/management/server/migration" @@ -206,10 +207,10 @@ type Store interface { MarkAccountPrimary(ctx context.Context, accountID string) error UpdateAccountNetwork(ctx context.Context, accountID string, ipNet net.IPNet) error CreatePeerJob(ctx context.Context, job *types.Job) error - CompletePeerJob(accountID, jobID, result, failedReason string) error + CompletePeerJob(ctx context.Context, jobResponse *proto.JobResponse) error GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error) GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error) - MarkPendingJobsAsFailed(ctx context.Context, peerID string) error + MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error } const ( diff --git a/management/server/types/job.go b/management/server/types/job.go index 484738790..35e761c5f 100644 --- a/management/server/types/job.go +++ b/management/server/types/job.go @@ -7,6 +7,7 @@ import ( "github.com/google/uuid" "github.com/netbirdio/netbird/shared/management/http/api" + "github.com/netbirdio/netbird/shared/management/proto" "github.com/netbirdio/netbird/shared/management/status" ) @@ -136,14 +137,14 @@ func 
validateAndBuildBundleParams(req api.WorkloadRequest, workload *Workload) e return fmt.Errorf("invalid parameters for bundle job") } // validate bundle_for_time <= 5 minutes - if bundle.Parameters.BundleForTime < 0 || bundle.Parameters.BundleForTime > 5 { - return fmt.Errorf("bundle_for_time must be between 0 and 5, got %d", bundle.Parameters.BundleForTime) + if bundle.Parameters.BundleForTime < 1 || bundle.Parameters.BundleForTime > 5 { + return fmt.Errorf("bundle_for_time must be between 1 and 5, got %d", bundle.Parameters.BundleForTime) } // validate log-file-count ≥ 1 and ≤ 1000 if bundle.Parameters.LogFileCount < 1 || bundle.Parameters.LogFileCount > 1000 { return fmt.Errorf("log-file-count must be between 1 and 1000, got %d", bundle.Parameters.LogFileCount) } - + workload.Parameters, err = json.Marshal(bundle.Parameters) if err != nil { return fmt.Errorf("failed to marshal workload parameters: %w", err) @@ -153,3 +154,63 @@ func validateAndBuildBundleParams(req api.WorkloadRequest, workload *Workload) e return nil } + +// ApplyResponse validates and maps a proto.JobResponse into the Job fields. 
+func (j *Job) ApplyResponse(resp *proto.JobResponse) error { + if resp == nil { + return nil + } + now := time.Now().UTC() + j.CompletedAt = &now + switch resp.Status { + case proto.JobStatus_succeeded: + j.Status = JobStatusSucceeded + case proto.JobStatus_failed: + j.Status = JobStatusFailed + default: + j.Status = JobStatusPending + } + + if len(resp.Reason) > 0 { + j.FailedReason = string(resp.Reason) + } + + // Handle workload results (oneof) + var err error + switch r := resp.WorkloadResults.(type) { + case *proto.JobResponse_Bundle: + if j.Workload.Result, err = json.Marshal(r.Bundle); err != nil { + return fmt.Errorf("failed to marshal workload results: %w", err) + } + default: + return fmt.Errorf("unsupported workload response type: %T", r) + } + return nil +} + +func (j *Job) ToStreamJobRequest() (*proto.JobRequest, error) { + switch j.Workload.Type { + case JobTypeBundle: + return j.buildStreamBundleResponse() + default: + return nil, status.Errorf(status.InvalidArgument, "unknown job type: %v", j.Workload.Type) + } +} + +func (j *Job) buildStreamBundleResponse() (*proto.JobRequest, error) { + var p api.BundleParameters + if err := json.Unmarshal(j.Workload.Parameters, &p); err != nil { + return nil, fmt.Errorf("invalid parameters for bundle job: %w", err) + } + return &proto.JobRequest{ + ID: []byte(j.ID), + WorkloadParameters: &proto.JobRequest_Bundle{ + Bundle: &proto.BundleParameters{ + BundleFor: p.BundleFor, + BundleForTime: int64(p.BundleForTime), + LogFileCount: int32(p.LogFileCount), + Anonymize: p.Anonymize, + }, + }, + }, nil +} diff --git a/shared/management/client/grpc.go b/shared/management/client/grpc.go index fe3910638..1739f8e6b 100644 --- a/shared/management/client/grpc.go +++ b/shared/management/client/grpc.go @@ -298,7 +298,7 @@ func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes. 
// blocking until error err = c.receiveUpdatesEvents(stream, serverPubKey, msgHandler) - if err != nil { + if err != nil && err != io.EOF { c.notifyDisconnected(err) s, _ := gstatus.FromError(err) switch s.Code() {