mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-25 19:56:46 +00:00
implement remote debug api (#4418)
fix lint clean up fix MarkPendingJobsAsFailed apply feedbacks 1 fix typo change api and apply new schema fix lint fix api object clean switch case apply feedback 2 fix error handle in create job get rid of any/interface type in job database fix sonar issue use RawJson for both parameters and results running go mod tidy update package fix 1 update codegen fix code-gen fix snyk fix snyk hopefully
This commit is contained in:
@@ -126,6 +126,9 @@ type Manager interface {
|
||||
UpdateToPrimaryAccount(ctx context.Context, accountId string) error
|
||||
GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error)
|
||||
GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error)
|
||||
CreatePeerJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error
|
||||
GetAllPeerJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error)
|
||||
GetPeerJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error)
|
||||
SetEphemeralManager(em ephemeral.Manager)
|
||||
AllowSync(string, uint64) bool
|
||||
}
|
||||
|
||||
@@ -177,8 +177,10 @@ const (
|
||||
|
||||
AccountNetworkRangeUpdated Activity = 87
|
||||
PeerIPUpdated Activity = 88
|
||||
UserApproved Activity = 89
|
||||
UserRejected Activity = 90
|
||||
|
||||
UserApproved Activity = 89
|
||||
UserRejected Activity = 90
|
||||
JobCreatedByUser Activity = 91
|
||||
|
||||
AccountDeleted Activity = 99999
|
||||
)
|
||||
@@ -288,6 +290,8 @@ var activityMap = map[Activity]Code{
|
||||
PeerIPUpdated: {"Peer IP updated", "peer.ip.update"},
|
||||
UserApproved: {"User approved", "user.approve"},
|
||||
UserRejected: {"User rejected", "user.reject"},
|
||||
|
||||
JobCreatedByUser: {"Create Job for peer", "peer.job.create"},
|
||||
}
|
||||
|
||||
// StringCode returns a string code of the activity
|
||||
|
||||
@@ -33,6 +33,9 @@ func AddEndpoints(accountManager account.Manager, router *mux.Router) {
|
||||
Methods("GET", "PUT", "DELETE", "OPTIONS")
|
||||
router.HandleFunc("/peers/{peerId}/accessible-peers", peersHandler.GetAccessiblePeers).Methods("GET", "OPTIONS")
|
||||
router.HandleFunc("/peers/{peerId}/temporary-access", peersHandler.CreateTemporaryAccess).Methods("POST", "OPTIONS")
|
||||
router.HandleFunc("/peers/{peerId}/jobs", peersHandler.ListJobs).Methods("GET", "OPTIONS")
|
||||
router.HandleFunc("/peers/{peerId}/jobs", peersHandler.CreateJob).Methods("POST", "OPTIONS")
|
||||
router.HandleFunc("/peers/{peerId}/jobs/{jobId}", peersHandler.GetJob).Methods("GET", "OPTIONS")
|
||||
}
|
||||
|
||||
// NewHandler creates a new peers Handler
|
||||
@@ -42,6 +45,99 @@ func NewHandler(accountManager account.Manager) *Handler {
|
||||
}
|
||||
}
|
||||
|
||||
func (h *Handler) CreateJob(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
userAuth, err := nbcontext.GetUserAuthFromContext(ctx)
|
||||
if err != nil {
|
||||
util.WriteError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
peerID := vars["peerId"]
|
||||
|
||||
req := &api.JobRequest{}
|
||||
if err := json.NewDecoder(r.Body).Decode(req); err != nil {
|
||||
util.WriteErrorResponse("couldn't parse JSON request", http.StatusBadRequest, w)
|
||||
return
|
||||
}
|
||||
|
||||
job, err := types.NewJob(userAuth.UserId, userAuth.AccountId, peerID, req)
|
||||
if err != nil {
|
||||
util.WriteError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
if err := h.accountManager.CreatePeerJob(ctx, userAuth.AccountId, peerID, userAuth.UserId, job); err != nil {
|
||||
util.WriteError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := toSingleJobResponse(job)
|
||||
if err != nil {
|
||||
util.WriteError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
util.WriteJSONObject(ctx, w, resp)
|
||||
}
|
||||
|
||||
func (h *Handler) ListJobs(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
userAuth, err := nbcontext.GetUserAuthFromContext(ctx)
|
||||
if err != nil {
|
||||
util.WriteError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
peerID := vars["peerId"]
|
||||
|
||||
jobs, err := h.accountManager.GetAllPeerJobs(ctx, userAuth.AccountId, userAuth.UserId, peerID)
|
||||
if err != nil {
|
||||
util.WriteError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
respBody := make([]*api.JobResponse, 0, len(jobs))
|
||||
for _, job := range jobs {
|
||||
resp, err := toSingleJobResponse(job)
|
||||
if err != nil {
|
||||
util.WriteError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
respBody = append(respBody, resp)
|
||||
}
|
||||
|
||||
util.WriteJSONObject(ctx, w, respBody)
|
||||
}
|
||||
|
||||
func (h *Handler) GetJob(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
userAuth, err := nbcontext.GetUserAuthFromContext(ctx)
|
||||
if err != nil {
|
||||
util.WriteError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
vars := mux.Vars(r)
|
||||
peerID := vars["peerId"]
|
||||
jobID := vars["jobId"]
|
||||
|
||||
job, err := h.accountManager.GetPeerJobByID(ctx, userAuth.AccountId, userAuth.UserId, peerID, jobID)
|
||||
if err != nil {
|
||||
util.WriteError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
resp, err := toSingleJobResponse(job)
|
||||
if err != nil {
|
||||
util.WriteError(ctx, err, w)
|
||||
return
|
||||
}
|
||||
|
||||
util.WriteJSONObject(ctx, w, resp)
|
||||
}
|
||||
|
||||
func (h *Handler) checkPeerStatus(peer *nbpeer.Peer) (*nbpeer.Peer, error) {
|
||||
peerToReturn := peer.Copy()
|
||||
if peer.Status.Connected {
|
||||
@@ -504,6 +600,28 @@ func toPeerListItemResponse(peer *nbpeer.Peer, groupsInfo []api.GroupMinimum, dn
|
||||
}
|
||||
}
|
||||
|
||||
func toSingleJobResponse(job *types.Job) (*api.JobResponse, error) {
|
||||
workload, err := job.BuildWorkloadResponse()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var failed *string
|
||||
if job.FailedReason != "" {
|
||||
failed = &job.FailedReason
|
||||
}
|
||||
|
||||
return &api.JobResponse{
|
||||
Id: job.ID,
|
||||
CreatedAt: job.CreatedAt,
|
||||
CompletedAt: job.CompletedAt,
|
||||
TriggeredBy: job.TriggeredBy,
|
||||
Status: api.JobResponseStatus(job.Status),
|
||||
FailedReason: failed,
|
||||
Workload: *workload,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func fqdn(peer *nbpeer.Peer, dnsDomain string) string {
|
||||
fqdn := peer.FQDN(dnsDomain)
|
||||
if fqdn == "" {
|
||||
|
||||
@@ -128,8 +128,30 @@ type MockAccountManager struct {
|
||||
AllowSyncFunc func(string, uint64) bool
|
||||
UpdateAccountPeersFunc func(ctx context.Context, accountID string)
|
||||
BufferUpdateAccountPeersFunc func(ctx context.Context, accountID string)
|
||||
CreatePeerJobFunc func(ctx context.Context, accountID, peerID, userID string, job *types.Job) error
|
||||
GetAllPeerJobsFunc func(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error)
|
||||
GetPeerJobByIDFunc func(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error)
|
||||
}
|
||||
|
||||
func (am *MockAccountManager) CreatePeerJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error {
|
||||
if am.CreatePeerJobFunc != nil {
|
||||
return am.CreatePeerJobFunc(ctx, accountID, peerID, userID, job)
|
||||
}
|
||||
return status.Errorf(codes.Unimplemented, "method CreateJob is not implemented")
|
||||
}
|
||||
|
||||
func (am *MockAccountManager) GetAllPeerJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error) {
|
||||
if am.CreatePeerJobFunc != nil {
|
||||
return am.GetAllPeerJobsFunc(ctx, accountID, userID, peerID)
|
||||
}
|
||||
return nil, status.Errorf(codes.Unimplemented, "method GetAllJobs is not implemented")
|
||||
}
|
||||
func (am *MockAccountManager) GetPeerJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error) {
|
||||
if am.CreatePeerJobFunc != nil {
|
||||
return am.GetPeerJobByIDFunc(ctx, accountID, userID, peerID, jobID)
|
||||
}
|
||||
return nil, status.Errorf(codes.Unimplemented, "method CreateJob is not implemented")
|
||||
}
|
||||
|
||||
func (am *MockAccountManager) CreateGroup(ctx context.Context, accountID, userID string, group *types.Group) error {
|
||||
if am.SaveGroupFunc != nil {
|
||||
|
||||
@@ -330,6 +330,130 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
|
||||
return peer, nil
|
||||
}
|
||||
|
||||
func (am *DefaultAccountManager) CreatePeerJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error {
|
||||
// todo: Create permissions for job
|
||||
allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete)
|
||||
if err != nil {
|
||||
return status.NewPermissionValidationError(err)
|
||||
}
|
||||
if !allowed {
|
||||
return status.NewPermissionDeniedError()
|
||||
}
|
||||
|
||||
peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if peerAccountID != accountID {
|
||||
return status.NewPeerNotPartOfAccountError()
|
||||
}
|
||||
|
||||
// check if peer connected
|
||||
// todo: implement jobManager.IsPeerConnected
|
||||
// if !am.jobManager.IsPeerConnected(ctx, peerID) {
|
||||
// return status.NewJobFailedError("peer not connected")
|
||||
// }
|
||||
|
||||
// check if already has pending jobs
|
||||
// todo: implement jobManager.GetPendingJobsByPeerID
|
||||
// if pending := am.jobManager.GetPendingJobsByPeerID(ctx, peerID); len(pending) > 0 {
|
||||
// return status.NewJobAlreadyPendingError(peerID)
|
||||
// }
|
||||
|
||||
// try sending job first
|
||||
// todo: implement am.jobManager.SendJob
|
||||
// if err := am.jobManager.SendJob(ctx, peerID, job); err != nil {
|
||||
// return status.NewJobFailedError(fmt.Sprintf("failed to send job: %v", err))
|
||||
// }
|
||||
|
||||
var peer *nbpeer.Peer
|
||||
var eventsToStore func()
|
||||
|
||||
// persist job in DB only if send succeeded
|
||||
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
||||
peer, err = transaction.GetPeerByID(ctx, store.LockingStrengthUpdate, accountID, peerID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := transaction.CreatePeerJob(ctx, job); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
jobMeta := map[string]any{
|
||||
"job_id": job.ID,
|
||||
"for_peer_id": job.PeerID,
|
||||
"job_type": job.Workload.Type,
|
||||
"job_status": job.Status,
|
||||
"job_workload": job.Workload,
|
||||
}
|
||||
|
||||
eventsToStore = func() {
|
||||
am.StoreEvent(ctx, userID, peer.ID, accountID, activity.JobCreatedByUser, jobMeta)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
eventsToStore()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (am *DefaultAccountManager) GetAllPeerJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error) {
|
||||
// todo: Create permissions for job
|
||||
allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete)
|
||||
if err != nil {
|
||||
return nil, status.NewPermissionValidationError(err)
|
||||
}
|
||||
if !allowed {
|
||||
return nil, status.NewPermissionDeniedError()
|
||||
}
|
||||
|
||||
peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if peerAccountID != accountID {
|
||||
return []*types.Job{}, nil
|
||||
}
|
||||
|
||||
accountJobs, err := am.Store.GetPeerJobs(ctx, accountID, peerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return accountJobs, nil
|
||||
}
|
||||
|
||||
func (am *DefaultAccountManager) GetPeerJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error) {
|
||||
// todo: Create permissions for job
|
||||
allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete)
|
||||
if err != nil {
|
||||
return nil, status.NewPermissionValidationError(err)
|
||||
}
|
||||
if !allowed {
|
||||
return nil, status.NewPermissionDeniedError()
|
||||
}
|
||||
|
||||
peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if peerAccountID != accountID {
|
||||
return &types.Job{}, nil
|
||||
}
|
||||
|
||||
job, err := am.Store.GetPeerJobByID(ctx, accountID, jobID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return job, nil
|
||||
}
|
||||
|
||||
// DeletePeer removes peer from the account by its IP
|
||||
func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peerID, userID string) error {
|
||||
allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete)
|
||||
|
||||
@@ -38,14 +38,15 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
storeSqliteFileName = "store.db"
|
||||
idQueryCondition = "id = ?"
|
||||
keyQueryCondition = "key = ?"
|
||||
mysqlKeyQueryCondition = "`key` = ?"
|
||||
accountAndIDQueryCondition = "account_id = ? and id = ?"
|
||||
accountAndIDsQueryCondition = "account_id = ? AND id IN ?"
|
||||
accountIDCondition = "account_id = ?"
|
||||
peerNotFoundFMT = "peer %s not found"
|
||||
storeSqliteFileName = "store.db"
|
||||
idQueryCondition = "id = ?"
|
||||
keyQueryCondition = "key = ?"
|
||||
mysqlKeyQueryCondition = "`key` = ?"
|
||||
accountAndIDQueryCondition = "account_id = ? and id = ?"
|
||||
accountAndPeerIDQueryCondition = "account_id = ? and peer_id = ?"
|
||||
accountAndIDsQueryCondition = "account_id = ? AND id IN ?"
|
||||
accountIDCondition = "account_id = ?"
|
||||
peerNotFoundFMT = "peer %s not found"
|
||||
)
|
||||
|
||||
// SqlStore represents an account storage backed by a Sql DB persisted to disk
|
||||
@@ -105,6 +106,7 @@ func NewSqlStore(ctx context.Context, db *gorm.DB, storeEngine types.Engine, met
|
||||
&types.Account{}, &types.Policy{}, &types.PolicyRule{}, &route.Route{}, &nbdns.NameServerGroup{},
|
||||
&installation{}, &types.ExtraSettings{}, &posture.Checks{}, &nbpeer.NetworkAddress{},
|
||||
&networkTypes.Network{}, &routerTypes.NetworkRouter{}, &resourceTypes.NetworkResource{}, &types.AccountOnboarding{},
|
||||
&types.Job{},
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("auto migratePreAuto: %w", err)
|
||||
@@ -123,6 +125,79 @@ func GetKeyQueryCondition(s *SqlStore) string {
|
||||
return keyQueryCondition
|
||||
}
|
||||
|
||||
// SaveJob persists a job in DB
|
||||
func (s *SqlStore) CreatePeerJob(ctx context.Context, job *types.Job) error {
|
||||
result := s.db.Create(job)
|
||||
if result.Error != nil {
|
||||
log.WithContext(ctx).Errorf("failed to create job in store: %s", result.Error)
|
||||
return status.Errorf(status.Internal, "failed to create job in store")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// job was pending for too long and has been cancelled
|
||||
// todo call it when we first start the jobChannel to make sure no stuck jobs
|
||||
func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, peerID string) error {
|
||||
now := time.Now().UTC()
|
||||
return s.db.
|
||||
Model(&types.Job{}).
|
||||
Where("peer_id = ? AND status = ?", types.JobStatusPending, peerID).
|
||||
Updates(map[string]any{
|
||||
"status": types.JobStatusFailed,
|
||||
"failed_reason": "Pending job cleanup: marked as failed automatically due to being stuck too long",
|
||||
"completed_at": now,
|
||||
}).Error
|
||||
}
|
||||
|
||||
// GetJobByID fetches job by ID
|
||||
func (s *SqlStore) GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error) {
|
||||
var job types.Job
|
||||
err := s.db.
|
||||
Where(accountAndIDQueryCondition, accountID, jobID).
|
||||
First(&job).Error
|
||||
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||
return nil, status.Errorf(status.NotFound, "job %s not found", jobID)
|
||||
}
|
||||
return &job, err
|
||||
}
|
||||
|
||||
// get all jobs
|
||||
func (s *SqlStore) GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error) {
|
||||
var jobs []*types.Job
|
||||
err := s.db.
|
||||
Where(accountAndPeerIDQueryCondition, accountID, peerID).
|
||||
Order("created_at DESC").
|
||||
Find(&jobs).Error
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return jobs, nil
|
||||
}
|
||||
|
||||
func (s *SqlStore) CompletePeerJob(accountID, jobID, result, failedReason string) error {
|
||||
now := time.Now().UTC()
|
||||
|
||||
updates := map[string]any{
|
||||
"completed_at": now,
|
||||
}
|
||||
|
||||
if result != "" && failedReason == "" {
|
||||
updates["status"] = types.JobStatusSucceeded
|
||||
updates["result"] = result
|
||||
updates["failed_reason"] = ""
|
||||
} else {
|
||||
updates["status"] = types.JobStatusFailed
|
||||
updates["failed_reason"] = failedReason
|
||||
}
|
||||
|
||||
return s.db.
|
||||
Model(&types.Job{}).
|
||||
Where(accountAndIDQueryCondition, accountID, jobID).
|
||||
Updates(updates).Error
|
||||
}
|
||||
|
||||
// AcquireGlobalLock acquires global lock across all the accounts and returns a function that releases the lock
|
||||
func (s *SqlStore) AcquireGlobalLock(ctx context.Context) (unlock func()) {
|
||||
log.WithContext(ctx).Tracef("acquiring global lock")
|
||||
|
||||
@@ -202,6 +202,11 @@ type Store interface {
|
||||
IsPrimaryAccount(ctx context.Context, accountID string) (bool, string, error)
|
||||
MarkAccountPrimary(ctx context.Context, accountID string) error
|
||||
UpdateAccountNetwork(ctx context.Context, accountID string, ipNet net.IPNet) error
|
||||
CreatePeerJob(ctx context.Context, job *types.Job) error
|
||||
CompletePeerJob(accountID, jobID, result, failedReason string) error
|
||||
GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error)
|
||||
GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error)
|
||||
MarkPendingJobsAsFailed(ctx context.Context, peerID string) error
|
||||
GetPolicyRulesByResourceID(ctx context.Context, lockStrength LockingStrength, accountID string, peerID string) ([]*types.PolicyRule, error)
|
||||
}
|
||||
|
||||
|
||||
155
management/server/types/job.go
Normal file
155
management/server/types/job.go
Normal file
@@ -0,0 +1,155 @@
|
||||
package types
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/netbirdio/netbird/shared/management/http/api"
|
||||
"github.com/netbirdio/netbird/shared/management/status"
|
||||
)
|
||||
|
||||
type JobStatus string
|
||||
|
||||
const (
|
||||
JobStatusPending JobStatus = "pending"
|
||||
JobStatusSucceeded JobStatus = "succeeded"
|
||||
JobStatusFailed JobStatus = "failed"
|
||||
)
|
||||
|
||||
type JobType string
|
||||
|
||||
const (
|
||||
JobTypeBundle JobType = "bundle"
|
||||
)
|
||||
|
||||
type Job struct {
|
||||
// ID is the primary identifier
|
||||
ID string `gorm:"primaryKey"`
|
||||
|
||||
// CreatedAt when job was created (UTC)
|
||||
CreatedAt time.Time `gorm:"autoCreateTime"`
|
||||
|
||||
// CompletedAt when job finished, null if still running
|
||||
CompletedAt *time.Time
|
||||
|
||||
// TriggeredBy user that triggered this job
|
||||
TriggeredBy string `gorm:"index"`
|
||||
|
||||
PeerID string `gorm:"index"`
|
||||
|
||||
AccountID string `gorm:"index"`
|
||||
|
||||
// Status of the job: pending, succeeded, failed
|
||||
Status JobStatus `gorm:"index;type:varchar(50)"`
|
||||
|
||||
// FailedReason describes why the job failed (if failed)
|
||||
FailedReason string
|
||||
|
||||
Workload Workload `gorm:"embedded;embeddedPrefix:workload_"`
|
||||
}
|
||||
|
||||
type Workload struct {
|
||||
Type JobType `gorm:"column:workload_type;index;type:varchar(50)"`
|
||||
Parameters json.RawMessage `gorm:"type:json"`
|
||||
Result json.RawMessage `gorm:"type:json"`
|
||||
}
|
||||
|
||||
// NewJob creates a new job with default fields and validation
|
||||
func NewJob(triggeredBy, accountID, peerID string, req *api.JobRequest) (*Job, error) {
|
||||
if req == nil {
|
||||
return nil, status.Errorf(status.BadRequest, "job request cannot be nil")
|
||||
}
|
||||
|
||||
// Determine job type
|
||||
jobTypeStr, err := req.Workload.Discriminator()
|
||||
if err != nil {
|
||||
return nil, status.Errorf(status.BadRequest, "could not determine job type: %v", err)
|
||||
}
|
||||
jobType := JobType(jobTypeStr)
|
||||
|
||||
if jobType == "" {
|
||||
return nil, status.Errorf(status.BadRequest, "job type is required")
|
||||
}
|
||||
|
||||
var workload Workload
|
||||
|
||||
switch jobType {
|
||||
case JobTypeBundle:
|
||||
if err := validateAndBuildBundleParams(req.Workload, &workload); err != nil {
|
||||
return nil, status.Errorf(status.BadRequest, "%v", err)
|
||||
}
|
||||
default:
|
||||
return nil, status.Errorf(status.BadRequest, "unsupported job type: %s", jobType)
|
||||
}
|
||||
|
||||
return &Job{
|
||||
ID: uuid.New().String(),
|
||||
TriggeredBy: triggeredBy,
|
||||
PeerID: peerID,
|
||||
AccountID: accountID,
|
||||
Status: JobStatusPending,
|
||||
CreatedAt: time.Now().UTC(),
|
||||
Workload: workload,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (j *Job) BuildWorkloadResponse() (*api.WorkloadResponse, error) {
|
||||
var wl api.WorkloadResponse
|
||||
|
||||
switch j.Workload.Type {
|
||||
case JobTypeBundle:
|
||||
if err := j.buildBundleResponse(&wl); err != nil {
|
||||
return nil, status.Errorf(status.InvalidArgument, err.Error())
|
||||
}
|
||||
return &wl, nil
|
||||
|
||||
default:
|
||||
return nil, status.Errorf(status.InvalidArgument, "unknown job type: %v", j.Workload.Type)
|
||||
}
|
||||
}
|
||||
|
||||
func (j *Job) buildBundleResponse(wl *api.WorkloadResponse) error {
|
||||
var p api.BundleParameters
|
||||
if err := json.Unmarshal(j.Workload.Parameters, &p); err != nil {
|
||||
return fmt.Errorf("invalid parameters for bundle job: %w", err)
|
||||
}
|
||||
var r api.BundleResult
|
||||
if err := json.Unmarshal(j.Workload.Result, &r); err != nil {
|
||||
return fmt.Errorf("invalid result for bundle job: %w", err)
|
||||
}
|
||||
|
||||
if err := wl.FromBundleWorkloadResponse(api.BundleWorkloadResponse{
|
||||
Type: api.WorkloadTypeBundle,
|
||||
Parameters: p,
|
||||
Result: r,
|
||||
}); err != nil {
|
||||
return fmt.Errorf("unknown job parameters: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateAndBuildBundleParams(req api.WorkloadRequest, workload *Workload) error {
|
||||
bundle, err := req.AsBundleWorkloadRequest()
|
||||
if err != nil {
|
||||
return fmt.Errorf("invalid parameters for bundle job")
|
||||
}
|
||||
// validate bundle_for_time <= 5 minutes
|
||||
if bundle.Parameters.BundleForTime < 0 || bundle.Parameters.BundleForTime > 5 {
|
||||
return fmt.Errorf("bundle_for_time must be between 0 and 5, got %d", bundle.Parameters.BundleForTime)
|
||||
}
|
||||
// validate log-file-count ≥ 1 and ≤ 1000
|
||||
if bundle.Parameters.LogFileCount < 1 || bundle.Parameters.LogFileCount > 1000 {
|
||||
return fmt.Errorf("log-file-count must be between 1 and 1000, got %d", bundle.Parameters.LogFileCount)
|
||||
}
|
||||
|
||||
workload.Parameters, err = json.Marshal(bundle.Parameters)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal workload parameters: %w", err)
|
||||
}
|
||||
workload.Result = []byte("{}")
|
||||
workload.Type = JobType(api.WorkloadTypeBundle)
|
||||
|
||||
return nil
|
||||
}
|
||||
Reference in New Issue
Block a user