diff --git a/client/internal/engine.go b/client/internal/engine.go index 14f5e9ae8..ebc48acee 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -22,6 +22,8 @@ import ( log "github.com/sirupsen/logrus" "golang.zx2c4.com/wireguard/tun/netstack" "golang.zx2c4.com/wireguard/wgctrl/wgtypes" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/protobuf/proto" nberrors "github.com/netbirdio/netbird/client/errors" @@ -53,6 +55,7 @@ import ( semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group" nbssh "github.com/netbirdio/netbird/client/ssh" + nbstatus "github.com/netbirdio/netbird/client/status" "github.com/netbirdio/netbird/client/system" nbdns "github.com/netbirdio/netbird/dns" "github.com/netbirdio/netbird/route" @@ -62,7 +65,9 @@ import ( relayClient "github.com/netbirdio/netbird/shared/relay/client" signal "github.com/netbirdio/netbird/shared/signal/client" sProto "github.com/netbirdio/netbird/shared/signal/proto" + "github.com/netbirdio/netbird/upload-server/types" "github.com/netbirdio/netbird/util" + "google.golang.org/grpc/status" ) // PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer. @@ -125,6 +130,8 @@ type EngineConfig struct { BlockInbound bool LazyConnectionEnabled bool + + peerDaemonAddr string } // Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers. @@ -887,6 +894,20 @@ func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error { return nil } +func (e *Engine) getPeerClient(addr string) (*grpc.ClientConn, error) { + conn, err := grpc.NewClient( + strings.TrimPrefix(addr, "tcp://"), + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + return nil, fmt.Errorf("failed to connect to daemon error: %v\n"+ + "If the daemon is not running please run: "+ + "\nnetbird service install \nnetbird service start\n", err) + } + + return conn, nil +} + func (e *Engine) receiveJobEvents() { go func() { err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse { @@ -914,6 +935,60 @@ func (e *Engine) receiveJobEvents() { log.Debugf("connecting to Management Service jobs stream") } +func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (string, error) { + conn, err := e.getPeerClient("unix:///var/run/netbird.sock") + if err != nil { + return "", err + } + defer func() { + if err := conn.Close(); err != nil { + log.Errorf("Failed to close connection: %v", err) + } + }() + + statusOutput, err := e.getStatusOutput(params.Anonymize) + if err != nil { + return "", err + } + request := &cProto.DebugBundleRequest{ + Anonymize: params.Anonymize, + SystemInfo: true, + Status: statusOutput, + LogFileCount: uint32(params.LogFileCount), + UploadURL: types.DefaultBundleURL, + } + service := cProto.NewDaemonServiceClient(conn) + resp, err := service.DebugBundle(e.clientCtx, request) + if err != nil { + return "", fmt.Errorf("failed to bundle debug: " + status.Convert(err).Message()) + } + + if resp.GetUploadFailureReason() != "" { + return "", fmt.Errorf("upload failed: " + resp.GetUploadFailureReason()) + } + return resp.GetUploadedKey(), nil +} + +func (e *Engine) getStatusOutput(anon bool) (string, error) { + conn, err := e.getPeerClient("unix:///var/run/netbird.sock") + if err != nil { + return "", err + } + defer func() { + if err := conn.Close(); err != nil { + log.Errorf("Failed to close connection: %v", err) + } + }() + + statusResp, err := cProto.NewDaemonServiceClient(conn).Status(e.clientCtx, &cProto.StatusRequest{GetFullPeerStatus: true, ShouldRunProbes: true}) + if err != nil { + return "", fmt.Errorf("status failed: %v", status.Convert(err).Message()) + } + return nbstatus.ParseToFullDetailSummary( + nbstatus.ConvertToStatusOutputOverview(statusResp, anon, "", nil, nil, nil, "", ""), + ), nil +} + // receiveManagementEvents connects to the Management Service event stream to receive updates from the management service // E.g. when a new peer has been registered and we are allowed to connect to it. func (e *Engine) receiveManagementEvents() { diff --git a/management/server/grpcserver.go b/management/server/grpcserver.go index 4d57a8095..460e44b2d 100644 --- a/management/server/grpcserver.go +++ b/management/server/grpcserver.go @@ -163,10 +163,13 @@ func (s *GRPCServer) Job(srv proto.ManagementService_JobServer) error { } // Start background response handler - s.startResponseReceiver(ctx, accountID, srv) + s.startResponseReceiver(ctx, srv) // Prepare per-peer state - updates := s.jobManager.CreateJobChannel(peer.ID) + updates, err := s.jobManager.CreateJobChannel(ctx, accountID, peer.ID) + if err != nil { + return status.Errorf(codes.Internal, err.Error()) + } log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart)) // Main loop: forward jobs to client @@ -262,7 +265,7 @@ func (s *GRPCServer) handleHandshake(ctx context.Context, srv proto.ManagementSe return peerKey, nil } -func (s *GRPCServer) startResponseReceiver(ctx context.Context, accountID string, srv proto.ManagementService_JobServer) { +func (s *GRPCServer) startResponseReceiver(ctx context.Context, srv proto.ManagementService_JobServer) { go func() { for { msg, err := srv.Recv() @@ -280,7 +283,7 @@ func (s *GRPCServer) startResponseReceiver(ctx context.Context, accountID string continue } - if err := s.jobManager.HandleResponse(ctx, accountID, jobResp); err != nil { + if err := s.jobManager.HandleResponse(ctx, jobResp); err != nil { log.WithContext(ctx).Errorf("handle job response failed: %v", err) } diff --git a/management/server/jobChannel.go b/management/server/jobChannel.go index e9c98d2b8..baaeda94a 100644 --- a/management/server/jobChannel.go +++ b/management/server/jobChannel.go @@ -6,18 +6,22 @@ import ( "sync" "time" + nbpeer "github.com/netbirdio/netbird/management/server/peer" "github.com/netbirdio/netbird/management/server/store" "github.com/netbirdio/netbird/management/server/telemetry" + "github.com/netbirdio/netbird/management/server/types" "github.com/netbirdio/netbird/shared/management/proto" + log "github.com/sirupsen/logrus" ) const jobChannelBuffer = 100 type JobEvent struct { - PeerID string - Request *proto.JobRequest - Response *proto.JobResponse - Done chan struct{} // closed when response arrives + Job *types.Job + Request *proto.JobRequest + Response *proto.JobResponse + Done chan struct{} // closed when response arrives + StoreEvent func(meta map[string]any, peer *nbpeer.Peer) } type JobManager struct { @@ -42,9 +46,11 @@ func NewJobManager(metrics telemetry.AppMetrics, store store.Store) *JobManager } // CreateJobChannel creates or replaces a channel for a peer -func (jm *JobManager) CreateJobChannel(peerID string) chan *JobEvent { - // TODO: all pending jobs stored in db for this peer should be failed - // jm.Store.MarkPendingJobsAsFailed(peerID) +func (jm *JobManager) CreateJobChannel(ctx context.Context, accountID, peerID string) (chan *JobEvent, error) { + // all pending jobs stored in db for this peer should be failed + if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Pending job cleanup: marked as failed automatically due to being stuck too long"); err != nil { + return nil, err + } jm.mu.Lock() defer jm.mu.Unlock() @@ -56,22 +62,28 @@ func (jm *JobManager) CreateJobChannel(peerID string) chan *JobEvent { ch := make(chan *JobEvent, jobChannelBuffer) jm.jobChannels[peerID] = ch - return ch + return ch, nil } // SendJob sends a job to a peer and tracks it as pending -func (jm *JobManager) SendJob(ctx context.Context, accountID, peerID string, req *proto.JobRequest) error { +func (jm *JobManager) SendJob(ctx context.Context, job *types.Job, storeEvent func(meta map[string]any, peer *nbpeer.Peer)) error { jm.mu.RLock() - ch, ok := jm.jobChannels[peerID] + ch, ok := jm.jobChannels[job.PeerID] jm.mu.RUnlock() if !ok { - return fmt.Errorf("peer %s has no channel", peerID) + return fmt.Errorf("peer %s has no channel", job.PeerID) + } + + req, err := job.ToStreamJobRequest() + if err != nil { + return err } event := &JobEvent{ - PeerID: peerID, - Request: req, - Done: make(chan struct{}), + Job: job, + Request: req, + Done: make(chan struct{}), + StoreEvent: storeEvent, } jm.mu.Lock() @@ -81,24 +93,24 @@ func (jm *JobManager) SendJob(ctx context.Context, accountID, peerID string, req select { case ch <- event: case <-time.After(5 * time.Second): - jm.cleanup(ctx, accountID, string(req.ID), "timed out") - return fmt.Errorf("job channel full for peer %s", peerID) + jm.cleanup(ctx, string(req.ID), "timed out") + return fmt.Errorf("job channel full for peer %s", job.PeerID) } select { case <-event.Done: return nil case <-time.After(jm.responseWait): - jm.cleanup(ctx, accountID, string(req.ID), "timed out") + jm.cleanup(ctx, string(req.ID), "timed out") return fmt.Errorf("job %s timed out", req.ID) case <-ctx.Done(): - jm.cleanup(ctx, accountID, string(req.ID), ctx.Err().Error()) + jm.cleanup(ctx, string(req.ID), ctx.Err().Error()) return ctx.Err() } } // HandleResponse marks a job as finished and moves it to completed -func (jm *JobManager) HandleResponse(ctx context.Context, accountID string, resp *proto.JobResponse) error { +func (jm *JobManager) HandleResponse(ctx context.Context, resp *proto.JobResponse) error { jm.mu.Lock() defer jm.mu.Unlock() @@ -106,14 +118,17 @@ func (jm *JobManager) HandleResponse(ctx context.Context, accountID string, resp if !ok { return fmt.Errorf("job %s not found", resp.ID) } + fmt.Printf("we got this %+v\n", resp) + //update or create the store for job response + err := jm.saveJob(ctx, event.Job, resp, event.StoreEvent) + if err == nil { + event.Response = resp + } - event.Response = resp - //TODO: update the store for job response - // jm.store.CompleteJob(ctx,accountID, string(resp.GetID()), string(resp.GetResult()),string(resp.GetReason())) close(event.Done) delete(jm.pending, string(resp.ID)) - return nil + return err } // CloseChannel closes a peer’s channel and cleans up its jobs @@ -128,22 +143,81 @@ func (jm *JobManager) CloseChannel(ctx context.Context, accountID, peerID string } for jobID, ev := range jm.pending { - if ev.PeerID == peerID { + if ev.Job.PeerID == peerID { // if the client disconnect and there is pending job then marke it as failed - // jm.store.CompleteJob(ctx,accountID, jobID,"", "Time out ") + if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Time out"); err != nil { + log.WithContext(ctx).Errorf(err.Error()) + } delete(jm.pending, jobID) } } } // cleanup removes a pending job safely -func (jm *JobManager) cleanup(ctx context.Context, accountID, jobID string, reason string) { +func (jm *JobManager) cleanup(ctx context.Context, jobID string, reason string) { jm.mu.Lock() defer jm.mu.Unlock() if ev, ok := jm.pending[jobID]; ok { close(ev.Done) - // jm.store.CompleteJob(ctx, accountID, jobID, "", reason) + if err := jm.Store.MarkPendingJobsAsFailed(ctx, ev.Job.AccountID, ev.Job.PeerID, reason); err != nil { + log.WithContext(ctx).Errorf(err.Error()) + } delete(jm.pending, jobID) } } + +func (jm *JobManager) IsPeerConnected(peerID string) bool { + jm.mu.RLock() + defer jm.mu.RUnlock() + + _, ok := jm.jobChannels[peerID] + return ok +} + +func (jm *JobManager) IsPeerHasPendingJobs(peerID string) bool { + jm.mu.RLock() + defer jm.mu.RUnlock() + + for _, ev := range jm.pending { + if ev.Job.PeerID == peerID { + return true + } + } + return false +} + +func (jm *JobManager) saveJob(ctx context.Context, job *types.Job, response *proto.JobResponse, StoreEvent func(meta map[string]any, peer *nbpeer.Peer)) error { + var peer *nbpeer.Peer + var err error + var eventsToStore func() + + // persist job in DB only if send succeeded + err = jm.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error { + peer, err = transaction.GetPeerByID(ctx, store.LockingStrengthUpdate, job.AccountID, job.PeerID) + if err != nil { + return err + } + if err := transaction.CreateOrUpdatePeerJob(ctx, job, response); err != nil { + return err + } + + jobMeta := map[string]any{ + "job_id": job.ID, + "for_peer_id": job.PeerID, + "job_type": job.Workload.Type, + "job_status": job.Status, + "job_workload": job.Workload, + } + + eventsToStore = func() { + StoreEvent(jobMeta, peer) + } + return nil + }) + if err != nil { + return err + } + eventsToStore() + return nil +} diff --git a/management/server/peer.go b/management/server/peer.go index 876a51124..33966482c 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -353,53 +353,21 @@ func (am *DefaultAccountManager) CreatePeerJob(ctx context.Context, accountID, p } // check if peer connected - // todo: implement jobManager.IsPeerConnected - // if !am.jobManager.IsPeerConnected(ctx, peerID) { - // return status.NewJobFailedError("peer not connected") - // } + if !am.jobManager.IsPeerConnected(peerID) { + return status.Errorf(status.BadRequest, "peer not connected") + } // check if already has pending jobs - // todo: implement jobManager.GetPendingJobsByPeerID - // if pending := am.jobManager.GetPendingJobsByPeerID(ctx, peerID); len(pending) > 0 { - // return status.NewJobAlreadyPendingError(peerID) - // } + if am.jobManager.IsPeerHasPendingJobs(peerID) { + return status.Errorf(status.BadRequest, "peer already hase pending job") + } // try sending job first - // todo: implement am.jobManager.SendJob - // if err := am.jobManager.SendJob(ctx, peerID, job); err != nil { - // return status.NewJobFailedError(fmt.Sprintf("failed to send job: %v", err)) - // } - - var peer *nbpeer.Peer - var eventsToStore func() - - // persist job in DB only if send succeeded - err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error { - peer, err = transaction.GetPeerByID(ctx, store.LockingStrengthUpdate, accountID, peerID) - if err != nil { - return err - } - if err := transaction.CreatePeerJob(ctx, job); err != nil { - return err - } - - jobMeta := map[string]any{ - "job_id": job.ID, - "for_peer_id": job.PeerID, - "job_type": job.Workload.Type, - "job_status": job.Status, - "job_workload": job.Workload, - } - - eventsToStore = func() { - am.StoreEvent(ctx, userID, peer.ID, accountID, activity.JobCreatedByUser, jobMeta) - } - return nil - }) - if err != nil { - return err + if err := am.jobManager.SendJob(ctx, job, func(meta map[string]any, peer *nbpeer.Peer) { + am.StoreEvent(ctx, userID, peer.ID, accountID, activity.JobCreatedByUser, meta) + }); err != nil { + return status.Errorf(status.Internal, "failed to send job: %v", err) } - eventsToStore() return nil } @@ -422,7 +390,7 @@ func (am *DefaultAccountManager) GetAllPeerJobs(ctx context.Context, accountID, return []*types.Job{}, nil } - accountJobs, err := am.Store.GetPeerJobs(ctx, accountID, peerID) + accountJobs, err := am.Store.GetPeerJobs(accountID, peerID) if err != nil { return nil, err } @@ -449,7 +417,7 @@ func (am *DefaultAccountManager) GetPeerJobByID(ctx context.Context, accountID, return &types.Job{}, nil } - job, err := am.Store.GetPeerJobByID(ctx, accountID, jobID) + job, err := am.Store.GetPeerJobByID(accountID, jobID) if err != nil { return nil, err } diff --git a/management/server/store/sql_store.go b/management/server/store/sql_store.go index de3a01e72..34ca579b4 100644 --- a/management/server/store/sql_store.go +++ b/management/server/store/sql_store.go @@ -34,6 +34,7 @@ import ( "github.com/netbirdio/netbird/management/server/types" "github.com/netbirdio/netbird/management/server/util" "github.com/netbirdio/netbird/route" + "github.com/netbirdio/netbird/shared/management/proto" "github.com/netbirdio/netbird/shared/management/status" ) @@ -127,8 +128,18 @@ func GetKeyQueryCondition(s *SqlStore) string { } // SaveJob persists a job in DB -func (s *SqlStore) CreatePeerJob(ctx context.Context, job *types.Job) error { - result := s.db.Create(job) +func (s *SqlStore) CreateOrUpdatePeerJob(ctx context.Context, job *types.Job, jobResponse *proto.JobResponse) error { + if err := job.ApplyResponse(jobResponse); err != nil { + return status.Errorf(status.Internal, err.Error()) + } + fmt.Printf("new job or update %v\n", job) + + result := s.db. + Model(&types.Job{}).Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + DoUpdates: clause.AssignmentColumns([]string{"completed_at", "status", "failed_reason", "workload_workload_type", "workload_parameters", "workload_result"}), + }).Create(job) + if result.Error != nil { log.WithContext(ctx).Errorf("failed to create job in store: %s", result.Error) return status.Errorf(status.Internal, "failed to create job in store") @@ -137,21 +148,25 @@ func (s *SqlStore) CreatePeerJob(ctx context.Context, job *types.Job) error { } // job was pending for too long and has been cancelled -// todo call it when we first start the jobChannel to make sure no stuck jobs -func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, peerID string) error { +func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error { now := time.Now().UTC() - return s.db. + result := s.db. Model(&types.Job{}). - Where("peer_id = ? AND status = ?", types.JobStatusPending, peerID). - Updates(map[string]any{ - "status": types.JobStatusFailed, - "failed_reason": "Pending job cleanup: marked as failed automatically due to being stuck too long", - "completed_at": now, - }).Error + Where(accountAndPeerIDQueryCondition+"AND status = ?", accountID, peerID, types.JobStatusPending). + Updates(types.Job{ + Status: types.JobStatusFailed, + FailedReason: reason, + CompletedAt: &now, + }) + if result.Error != nil { + log.WithContext(ctx).Errorf("failed to mark pending jobs as Failed job in store: %s", result.Error) + return status.Errorf(status.Internal, "failed to mark pending job as Failed in store") + } + return nil } // GetJobByID fetches job by ID -func (s *SqlStore) GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error) { +func (s *SqlStore) GetPeerJobByID(accountID, jobID string) (*types.Job, error) { var job types.Job err := s.db. Where(accountAndIDQueryCondition, accountID, jobID). @@ -163,7 +178,7 @@ func (s *SqlStore) GetPeerJobByID(ctx context.Context, accountID, jobID string) } // get all jobs -func (s *SqlStore) GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error) { +func (s *SqlStore) GetPeerJobs(accountID, peerID string) ([]*types.Job, error) { var jobs []*types.Job err := s.db. Where(accountAndPeerIDQueryCondition, accountID, peerID). @@ -177,28 +192,6 @@ func (s *SqlStore) GetPeerJobs(ctx context.Context, accountID, peerID string) ([ return jobs, nil } -func (s *SqlStore) CompletePeerJob(accountID, jobID, result, failedReason string) error { - now := time.Now().UTC() - - updates := map[string]any{ - "completed_at": now, - } - - if result != "" && failedReason == "" { - updates["status"] = types.JobStatusSucceeded - updates["result"] = result - updates["failed_reason"] = "" - } else { - updates["status"] = types.JobStatusFailed - updates["failed_reason"] = failedReason - } - - return s.db. - Model(&types.Job{}). - Where(accountAndIDQueryCondition, accountID, jobID). - Updates(updates).Error -} - // AcquireGlobalLock acquires global lock across all the accounts and returns a function that releases the lock func (s *SqlStore) AcquireGlobalLock(ctx context.Context) (unlock func()) { log.WithContext(ctx).Tracef("acquiring global lock") diff --git a/management/server/store/store.go b/management/server/store/store.go index 98b9ae865..71ed67f88 100644 --- a/management/server/store/store.go +++ b/management/server/store/store.go @@ -26,6 +26,7 @@ import ( "github.com/netbirdio/netbird/management/server/telemetry" "github.com/netbirdio/netbird/management/server/testutil" "github.com/netbirdio/netbird/management/server/types" + "github.com/netbirdio/netbird/shared/management/proto" "github.com/netbirdio/netbird/util" "github.com/netbirdio/netbird/management/server/migration" @@ -205,11 +206,10 @@ type Store interface { IsPrimaryAccount(ctx context.Context, accountID string) (bool, string, error) MarkAccountPrimary(ctx context.Context, accountID string) error UpdateAccountNetwork(ctx context.Context, accountID string, ipNet net.IPNet) error - CreatePeerJob(ctx context.Context, job *types.Job) error - CompletePeerJob(accountID, jobID, result, failedReason string) error - GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error) - GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error) - MarkPendingJobsAsFailed(ctx context.Context, peerID string) error + GetPeerJobByID(accountID, jobID string) (*types.Job, error) + GetPeerJobs(accountID, peerID string) ([]*types.Job, error) + MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error + CreateOrUpdatePeerJob(ctx context.Context, job *types.Job, jobResponse *proto.JobResponse) error } const ( diff --git a/management/server/types/job.go b/management/server/types/job.go index 484738790..ed094e0d9 100644 --- a/management/server/types/job.go +++ b/management/server/types/job.go @@ -7,6 +7,7 @@ import ( "github.com/google/uuid" "github.com/netbirdio/netbird/shared/management/http/api" + "github.com/netbirdio/netbird/shared/management/proto" "github.com/netbirdio/netbird/shared/management/status" ) @@ -143,7 +144,7 @@ func validateAndBuildBundleParams(req api.WorkloadRequest, workload *Workload) e if bundle.Parameters.LogFileCount < 1 || bundle.Parameters.LogFileCount > 1000 { return fmt.Errorf("log-file-count must be between 1 and 1000, got %d", bundle.Parameters.LogFileCount) } - + workload.Parameters, err = json.Marshal(bundle.Parameters) if err != nil { return fmt.Errorf("failed to marshal workload parameters: %w", err) @@ -153,3 +154,63 @@ func validateAndBuildBundleParams(req api.WorkloadRequest, workload *Workload) e return nil } + +// ApplyResponse validates and maps a proto.JobResponse into the Job fields. +func (j *Job) ApplyResponse(resp *proto.JobResponse) error { + if resp == nil { + return nil + } + now := time.Now().UTC() + j.CompletedAt = &now + switch resp.Status { + case proto.JobStatus_succeeded: + j.Status = JobStatusSucceeded + case proto.JobStatus_failed: + j.Status = JobStatusFailed + default: + j.Status = JobStatusPending + } + + if len(resp.Reason) > 0 { + j.FailedReason = string(resp.Reason) + } + + // Handle workload results (oneof) + var err error + switch r := resp.WorkloadResults.(type) { + case *proto.JobResponse_Bundle: + if j.Workload.Result, err = json.Marshal(r.Bundle); err != nil { + return fmt.Errorf("failed to marshal workload results: %w", err) + } + default: + return fmt.Errorf("unsupported workload response type: %T", r) + } + return nil +} + +func (j *Job) ToStreamJobRequest() (*proto.JobRequest, error) { + switch j.Workload.Type { + case JobTypeBundle: + return j.buildStreamBundleResponse() + default: + return nil, status.Errorf(status.InvalidArgument, "unknown job type: %v", j.Workload.Type) + } +} + +func (j *Job) buildStreamBundleResponse() (*proto.JobRequest, error) { + var p api.BundleParameters + if err := json.Unmarshal(j.Workload.Parameters, &p); err != nil { + return nil, fmt.Errorf("invalid parameters for bundle job: %w", err) + } + return &proto.JobRequest{ + ID: []byte(j.ID), + WorkloadParameters: &proto.JobRequest_Bundle{ + Bundle: &proto.BundleParameters{ + BundleFor: p.BundleFor, + BundleForTime: int64(p.BundleForTime), + LogFileCount: int32(p.LogFileCount), + Anonymize: p.Anonymize, + }, + }, + }, nil +}