diff --git a/management/server/account/manager.go b/management/server/account/manager.go index f5af68f93..5d2a9b037 100644 --- a/management/server/account/manager.go +++ b/management/server/account/manager.go @@ -123,4 +123,7 @@ type Manager interface { UpdateToPrimaryAccount(ctx context.Context, accountId string) error GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error) GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error) + CreateJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error + GetAllJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error) + GetJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error) } diff --git a/management/server/activity/codes.go b/management/server/activity/codes.go index 6f9619597..ab3b95e2e 100644 --- a/management/server/activity/codes.go +++ b/management/server/activity/codes.go @@ -178,6 +178,8 @@ const ( AccountNetworkRangeUpdated Activity = 87 PeerIPUpdated Activity = 88 + JobCreatedByUser Activity = 89 + AccountDeleted Activity = 99999 ) diff --git a/management/server/http/handlers/peers/peers_handler.go b/management/server/http/handlers/peers/peers_handler.go index 414c7b1b9..7dc78e8a7 100644 --- a/management/server/http/handlers/peers/peers_handler.go +++ b/management/server/http/handlers/peers/peers_handler.go @@ -14,11 +14,11 @@ import ( "github.com/netbirdio/netbird/management/server/activity" nbcontext "github.com/netbirdio/netbird/management/server/context" "github.com/netbirdio/netbird/management/server/groups" + nbpeer "github.com/netbirdio/netbird/management/server/peer" + "github.com/netbirdio/netbird/management/server/types" "github.com/netbirdio/netbird/shared/management/http/api" "github.com/netbirdio/netbird/shared/management/http/util" - nbpeer "github.com/netbirdio/netbird/management/server/peer" 
"github.com/netbirdio/netbird/shared/management/status" - "github.com/netbirdio/netbird/management/server/types" ) // Handler is a handler that returns peers of the account @@ -32,6 +32,10 @@ func AddEndpoints(accountManager account.Manager, router *mux.Router) { router.HandleFunc("/peers/{peerId}", peersHandler.HandlePeer). Methods("GET", "PUT", "DELETE", "OPTIONS") router.HandleFunc("/peers/{peerId}/accessible-peers", peersHandler.GetAccessiblePeers).Methods("GET", "OPTIONS") + + router.HandleFunc("/peers/{peerId}/jobs", peersHandler.ListJobs).Methods("GET", "OPTIONS") + router.HandleFunc("/peers/{peerId}/jobs", peersHandler.CreateJob).Methods("POST", "OPTIONS") + router.HandleFunc("/peers/{peerId}/jobs/{jobId}", peersHandler.GetJob).Methods("GET", "OPTIONS") } // NewHandler creates a new peers Handler @@ -41,6 +45,80 @@ func NewHandler(accountManager account.Manager) *Handler { } } +func (h *Handler) CreateJob(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + userAuth, err := nbcontext.GetUserAuthFromContext(ctx) + if err != nil { + util.WriteError(ctx, err, w) + return + } + + vars := mux.Vars(r) + peerID := vars["peerId"] + + req := &api.JobRequest{} + if err := json.NewDecoder(r.Body).Decode(req); err != nil { + util.WriteErrorResponse("invalid JSON payload", http.StatusBadRequest, w) + return + } + + job, err := types.NewJob(userAuth.UserId, userAuth.AccountId, peerID, types.JobType(req.Type), req.Parameters) + if err != nil { + msg := fmt.Sprintf("invalid Job request %v", err) // named msg so the predeclared error type is not shadowed + util.WriteErrorResponse(msg, http.StatusBadRequest, w) + return + } + if err := h.accountManager.CreateJob(ctx, userAuth.AccountId, peerID, userAuth.UserId, job); err != nil { + msg := fmt.Sprintf("failed to create job %v", err) + util.WriteErrorResponse(msg, http.StatusBadRequest, w) + return + } + + util.WriteJSONObject(ctx, w, job) +} + +func (h *Handler) ListJobs(w http.ResponseWriter, r *http.Request) { + userAuth, err := 
nbcontext.GetUserAuthFromContext(r.Context()) + if err != nil { + util.WriteError(r.Context(), err, w) + return + } + + vars := mux.Vars(r) + peerID := vars["peerId"] + + jobs, err := h.accountManager.GetAllJobs(r.Context(), userAuth.AccountId, userAuth.UserId, peerID) + if err != nil { + msg := fmt.Sprintf("failed to fetch jobs %v", err) + util.WriteErrorResponse(msg, http.StatusBadRequest, w) + return + } + + util.WriteJSONObject(r.Context(), w, jobs) +} + +func (h *Handler) GetJob(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + userAuth, err := nbcontext.GetUserAuthFromContext(ctx) + if err != nil { + util.WriteError(ctx, err, w) + return + } + + vars := mux.Vars(r) + peerID := vars["peerId"] + jobID := vars["jobId"] + + job, err := h.accountManager.GetJobByID(ctx, userAuth.AccountId, userAuth.UserId, peerID, jobID) + if err != nil { + msg := fmt.Sprintf("failed to fetch job %v", err) + util.WriteErrorResponse(msg, http.StatusBadRequest, w) + return + } + + util.WriteJSONObject(ctx, w, job) +} + func (h *Handler) checkPeerStatus(peer *nbpeer.Peer) (*nbpeer.Peer, error) { peerToReturn := peer.Copy() if peer.Status.Connected { @@ -354,7 +432,7 @@ func toSinglePeerResponse(peer *nbpeer.Peer, groupsInfo []api.GroupMinimum, dnsD } return &api.Peer{ - CreatedAt: peer.CreatedAt, + CreatedAt: peer.CreatedAt, Id: peer.ID, Name: peer.Name, Ip: peer.IP.String(), @@ -391,7 +469,7 @@ func toPeerListItemResponse(peer *nbpeer.Peer, groupsInfo []api.GroupMinimum, dn } return &api.PeerBatch{ - CreatedAt: peer.CreatedAt, + CreatedAt: peer.CreatedAt, Id: peer.ID, Name: peer.Name, Ip: peer.IP.String(), diff --git a/management/server/mock_server/account_mock.go b/management/server/mock_server/account_mock.go index 1d44068d2..4b812a810 100644 --- a/management/server/mock_server/account_mock.go +++ b/management/server/mock_server/account_mock.go @@ -123,6 +123,29 @@ type MockAccountManager struct { GetOrCreateAccountByPrivateDomainFunc func(ctx 
context.Context, initiatorId, domain string) (*types.Account, bool, error) UpdateAccountPeersFunc func(ctx context.Context, accountID string) BufferUpdateAccountPeersFunc func(ctx context.Context, accountID string) + CreateJobFunc func(ctx context.Context, accountID, peerID, userID string, job *types.Job) error + GetAllJobsFunc func(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error) + GetJobByIDFunc func(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error) +} + +func (am *MockAccountManager) CreateJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error { + if am.CreateJobFunc != nil { + return am.CreateJobFunc(ctx, accountID, peerID, userID, job) + } + return status.Errorf(codes.Unimplemented, "method CreateJob is not implemented") +} + +func (am *MockAccountManager) GetAllJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error) { + if am.GetAllJobsFunc != nil { // nil-check the hook this method actually calls + return am.GetAllJobsFunc(ctx, accountID, userID, peerID) + } + return nil, status.Errorf(codes.Unimplemented, "method GetAllJobs is not implemented") +} +func (am *MockAccountManager) GetJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error) { + if am.GetJobByIDFunc != nil { // nil-check the hook this method actually calls + return am.GetJobByIDFunc(ctx, accountID, userID, peerID, jobID) + } + return nil, status.Errorf(codes.Unimplemented, "method GetJobByID is not implemented") } func (am *MockAccountManager) CreateGroup(ctx context.Context, accountID, userID string, group *types.Group) error { diff --git a/management/server/peer.go b/management/server/peer.go index 979137e94..b2187867d 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -333,6 +333,142 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user return peer, nil } +func (am *DefaultAccountManager) CreateJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error { + if err := 
job.ValidateJobRequest(); err != nil { + return err + } + + unlock := am.Store.AcquireWriteLockByUID(ctx, accountID) + defer unlock() + + // todo: Create permissions for job + allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete) + if err != nil { + return status.NewPermissionValidationError(err) + } + if !allowed { + return status.NewPermissionDeniedError() + } + + peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID) + if err != nil { + return err + } + + if peerAccountID != accountID { + return status.NewPeerNotPartOfAccountError() + } + + // check if peer connected + // todo: implement jobManager.IsPeerConnected + // if !am.jobManager.IsPeerConnected(ctx, peerID) { + // return status.NewJobFailedError("peer not connected") + // } + + // check if already has pending jobs + // todo: implement jobManager.GetPendingJobsByPeerID + // if pending := am.jobManager.GetPendingJobsByPeerID(ctx, peerID); len(pending) > 0 { + // return status.NewJobAlreadyPendingError(peerID) + // } + + // try sending job first + // todo: implement am.jobManager.SendJob + // if err := am.jobManager.SendJob(ctx, peerID, job); err != nil { + // return status.NewJobFailedError(fmt.Sprintf("failed to send job: %v", err)) + // } + + var peer *nbpeer.Peer + var eventsToStore func() + + // persist job in DB only if send succeeded + err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error { + peer, err = transaction.GetPeerByID(ctx, store.LockingStrengthUpdate, accountID, peerID) + if err != nil { + return err + } + if err := transaction.SaveJob(ctx, job); err != nil { + return fmt.Errorf("failed to save job for peer %s: %w", peer.ID, err) + } + + settings, err := transaction.GetAccountSettings(ctx, store.LockingStrengthNone, accountID) + + if err != nil { + return err + } + dnsDomain := am.GetDNSDomain(settings) + + eventsToStore = func() { + am.StoreEvent(ctx, userID, 
peer.ID, accountID, activity.JobCreatedByUser, peer.EventMeta(dnsDomain)) + } + + if err = transaction.IncrementNetworkSerial(ctx, accountID); err != nil { + return fmt.Errorf("failed to increment network serial: %w", err) + } + return nil + }) + if err != nil { + return err + } + + eventsToStore() + + return nil +} + +func (am *DefaultAccountManager) GetAllJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error) { + // todo: Create permissions for job + allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete) + if err != nil { + return nil, status.NewPermissionValidationError(err) + } + if !allowed { + return nil, status.NewPermissionDeniedError() + } + + peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID) + if err != nil { + return nil, err + } + + if peerAccountID != accountID { + return []*types.Job{}, nil + } + + accountJobs, err := am.Store.GetJobs(ctx, accountID, peerID) + if err != nil { + return nil, err + } + + return accountJobs, nil +} + +func (am *DefaultAccountManager) GetJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error) { + // todo: Create permissions for job + allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete) + if err != nil { + return nil, status.NewPermissionValidationError(err) + } + if !allowed { + return nil, status.NewPermissionDeniedError() + } + + peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID) + if err != nil { + return nil, err + } + + if peerAccountID != accountID { + return nil, status.NewPeerNotPartOfAccountError() // same rejection CreateJob uses; a zero-valued job with a nil error would read as success + } + + job, err := am.Store.GetJobByID(ctx, accountID, jobID) + if err != nil { + return nil, err + } + + return job, nil +} + // DeletePeer removes peer from the account by its IP func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peerID, userID 
string) error { unlock := am.Store.AcquireWriteLockByUID(ctx, accountID) diff --git a/management/server/store/sql_store.go b/management/server/store/sql_store.go index 64f80776b..556cbc1d8 100644 --- a/management/server/store/sql_store.go +++ b/management/server/store/sql_store.go @@ -38,14 +38,15 @@ import ( ) const ( - storeSqliteFileName = "store.db" - idQueryCondition = "id = ?" - keyQueryCondition = "key = ?" - mysqlKeyQueryCondition = "`key` = ?" - accountAndIDQueryCondition = "account_id = ? and id = ?" - accountAndIDsQueryCondition = "account_id = ? AND id IN ?" - accountIDCondition = "account_id = ?" - peerNotFoundFMT = "peer %s not found" + storeSqliteFileName = "store.db" + idQueryCondition = "id = ?" + keyQueryCondition = "key = ?" + mysqlKeyQueryCondition = "`key` = ?" + accountAndIDQueryCondition = "account_id = ? and id = ?" + accountAndPeerIDQueryCondition = "account_id = ? and peer_id = ?" + accountAndIDsQueryCondition = "account_id = ? AND id IN ?" + accountIDCondition = "account_id = ?" 
+ peerNotFoundFMT = "peer %s not found" ) // SqlStore represents an account storage backed by a Sql DB persisted to disk @@ -106,6 +107,7 @@ func NewSqlStore(ctx context.Context, db *gorm.DB, storeEngine types.Engine, met &types.Account{}, &types.Policy{}, &types.PolicyRule{}, &route.Route{}, &nbdns.NameServerGroup{}, &installation{}, &types.ExtraSettings{}, &posture.Checks{}, &nbpeer.NetworkAddress{}, &networkTypes.Network{}, &routerTypes.NetworkRouter{}, &resourceTypes.NetworkResource{}, &types.AccountOnboarding{}, + &types.Job{}, ) if err != nil { return nil, fmt.Errorf("auto migratePreAuto: %w", err) @@ -124,6 +126,76 @@ func GetKeyQueryCondition(s *SqlStore) string { return keyQueryCondition } +// SaveJob creates the job row, updating all columns of the existing row on conflict, and reports the elapsed time to the store metrics when metrics are configured +func (s *SqlStore) SaveJob(ctx context.Context, job *types.Job) error { + start := time.Now() + defer func() { + if s.metrics != nil { + s.metrics.StoreMetrics().CountPersistenceDuration(time.Since(start)) + } + }() + + return s.db.WithContext(ctx). + Clauses(clause.OnConflict{UpdateAll: true}). + Create(job).Error +} + +// MarkPendingJobsAsFailed sets every job whose status is pending to failed with a fixed cleanup reason and the current UTC time as completed_at +// NOTE(review): triggeredBy is unused and the update is not scoped to an account or peer - confirm intent; todo call it when we first start the jobChannel to make sure no stuck jobs +func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, triggeredBy string) error { + now := time.Now().UTC() + friendlyReason := "Pending job cleanup: marked as failed automatically due to being stuck too long" + return s.db.WithContext(ctx). + Model(&types.Job{}). + Where("status = ?", types.JobStatusPending). + Updates(map[string]any{ + "status": types.JobStatusFailed, + "failed_reason": friendlyReason, + "completed_at": now, + }).Error +} + +// GetJobByID fetches the job matching both accountID and jobID; a missing row is reported as a "job <id> not found" error +func (s *SqlStore) GetJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error) { + var job types.Job + err := s.db.WithContext(ctx). + Where(accountAndIDQueryCondition, accountID, jobID). 
+ First(&job).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, fmt.Errorf("job %s not found", jobID) + } + return &job, err +} + +// GetJobs returns the jobs stored for the given peer in the account, ordered by created_at with the newest first +func (s *SqlStore) GetJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error) { + var jobs []*types.Job + err := s.db.WithContext(ctx). + Where(accountAndPeerIDQueryCondition, accountID, peerID). + Order("created_at DESC"). + Find(&jobs).Error + if err != nil { + return nil, err + } + + return jobs, nil +} + +func (s *SqlStore) CompleteJob(ctx context.Context, accountID, jobID, result string, failedReason string) error { + job, err := s.GetJobByID(ctx, accountID, jobID) + if err != nil { + return err + } + // succeeded only when a result is present and no failure reason was given; every other combination is recorded as failed + if result != "" && failedReason == "" { + job.MarkSucceeded(result) + } else { + job.MarkFailed(failedReason) + } + + return s.db.WithContext(ctx).Save(job).Error +} + // AcquireGlobalLock acquires global lock across all the accounts and returns a function that releases the lock func (s *SqlStore) AcquireGlobalLock(ctx context.Context) (unlock func()) { log.WithContext(ctx).Tracef("acquiring global lock") diff --git a/management/server/store/store.go b/management/server/store/store.go index 9e0c04853..61ead53f6 100644 --- a/management/server/store/store.go +++ b/management/server/store/store.go @@ -205,6 +205,11 @@ type Store interface { IsPrimaryAccount(ctx context.Context, accountID string) (bool, string, error) MarkAccountPrimary(ctx context.Context, accountID string) error UpdateAccountNetwork(ctx context.Context, accountID string, ipNet net.IPNet) error + SaveJob(ctx context.Context, job *types.Job) error + GetJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error) + GetJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error) + CompleteJob(ctx context.Context, accountID, jobID, result string, failedReason string) error + MarkPendingJobsAsFailed(ctx context.Context, triggeredBy string) error } const ( diff 
--git a/management/server/types/job.go b/management/server/types/job.go new file mode 100644 index 000000000..6c35bd117 --- /dev/null +++ b/management/server/types/job.go @@ -0,0 +1,174 @@ +package types + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/google/uuid" +) + +type JobStatus string + +const ( + JobStatusPending JobStatus = "pending" + JobStatusSucceeded JobStatus = "succeeded" + JobStatusFailed JobStatus = "failed" +) + +type JobType string + +const ( + JobTypeBundle JobType = "bundle" + // add more job types here +) + +type Job struct { + // ID is the primary identifier + ID string `gorm:"primaryKey"` + + // CreatedAt when job was created (UTC) + CreatedAt time.Time `gorm:"autoCreateTime"` + + // CompletedAt when job finished, null if still running + CompletedAt *time.Time + + // TriggeredBy user that triggered this job + TriggeredBy string `gorm:"index"` + + PeerID string `gorm:"index"` + + AccountID string `gorm:"index"` + + // Type of the job, e.g. "bundle" + Type JobType `gorm:"index;type:varchar(50)"` + + // Status of the job: pending, succeeded, failed + Status JobStatus `gorm:"index;type:varchar(50)"` + + // FailedReason describes why the job failed (if failed) + FailedReason string + + // Result can contain job output (JSON, URL, etc.) 
+ Result string + + // Parameters is a JSON blob storing job configuration (untyped) + Parameters json.RawMessage `gorm:"type:json"` +} + +// JobParametersBundle represents parameters for bundle/debug jobs +type JobParametersBundle struct { + BundleFor bool `json:"bundle_for"` + BundleForTime int `json:"bundle_for_time"` // minutes + LogFileCount int `json:"log_file_count"` + Anonymize bool `json:"anonymize"` +} + +// NewJob builds a pending job with a fresh UUID and a UTC creation time, encodes parameters as JSON and returns the job only if ValidateJobRequest accepts it +func NewJob(triggeredBy, accountID, peerID string, jobType JobType, parameters map[string]any) (*Job, error) { + job := &Job{ + ID: uuid.New().String(), + TriggeredBy: triggeredBy, + PeerID: peerID, + AccountID: accountID, + Type: jobType, + Status: JobStatusPending, + CreatedAt: time.Now().UTC(), + } + + // Encode parameters + if err := job.encodeParameters(parameters); err != nil { + return nil, fmt.Errorf("failed to encode job parameters: %w", err) + } + + // Validate job + if err := job.ValidateJobRequest(); err != nil { + return nil, err + } + + return job, nil +} + +// DecodeParameters unmarshals the raw JSON parameters into target; it returns nil without touching target when Parameters is empty +func (j *Job) DecodeParameters(target any) error { + if len(j.Parameters) == 0 { + return nil + } + return json.Unmarshal(j.Parameters, target) +} + +// encodeParameters replaces raw parameters with the JSON encoding of params; a nil map is rejected with an error +func (j *Job) encodeParameters(params map[string]any) error { + if params == nil { + return fmt.Errorf("parameters cannot be empty") + } + data, err := json.Marshal(params) + if err != nil { + return err + } + j.Parameters = data + return nil +} + +// IsPending returns true if job is pending +func (j *Job) IsPending() bool { + return j.Status == JobStatusPending +} + +// MarkSucceeded sets job as completed successfully +func (j *Job) MarkSucceeded(result string) { + now := time.Now().UTC() + j.Status = JobStatusSucceeded + j.Result = result + j.CompletedAt = &now +} + +// MarkFailed sets job as failed with reason +func (j *Job) MarkFailed(reason 
string) { + now := time.Now().UTC() + j.Status = JobStatusFailed + j.FailedReason = reason + j.CompletedAt = &now +} + +// HasCompleted checks if job is completed (success or fail) +func (j *Job) HasCompleted() bool { + return j.Status == JobStatusSucceeded || j.Status == JobStatusFailed +} + +func (j *Job) ValidateJobRequest() error { + if j == nil { + return fmt.Errorf("job cannot be nil") + } + + if j.Type == "" { + return fmt.Errorf("job type must be specified") + } + + if len(j.Parameters) == 0 { + return fmt.Errorf("job parameters must be provided") + } + + switch j.Type { + case JobTypeBundle: + var params JobParametersBundle + if err := json.Unmarshal(j.Parameters, &params); err != nil { + return fmt.Errorf("invalid parameters for bundle job: %w", err) + } + + // validate bundle_for_time <= 5 minutes + if params.BundleForTime < 0 || params.BundleForTime > 5 { + return fmt.Errorf("bundle_for_time must be between 0 and 5, got %d", params.BundleForTime) + } + + // validate log-file-count ≥ 1 and ≤ 1000 + if params.LogFileCount < 1 || params.LogFileCount > 1000 { + return fmt.Errorf("log-file-count must be between 1 and 1000, got %d", params.LogFileCount) + } + + default: + return fmt.Errorf("unsupported job type: %s", j.Type) + } + + return nil +} diff --git a/shared/management/http/api/types.gen.go b/shared/management/http/api/types.gen.go index cf224a00e..8e4d92163 100644 --- a/shared/management/http/api/types.gen.go +++ b/shared/management/http/api/types.gen.go @@ -199,6 +199,11 @@ const ( GetApiEventsNetworkTrafficParamsDirectionINGRESS GetApiEventsNetworkTrafficParamsDirection = "INGRESS" ) +type JobRequest struct { + Type string `json:"type" binding:"required"` // Job type, e.g., "bundle" + Parameters map[string]any `json:"parameters" binding:"required"` // Dynamic parameters +} + // AccessiblePeer defines model for AccessiblePeer. 
type AccessiblePeer struct { // CityName Commonly used English name of the city @@ -1015,8 +1020,8 @@ type OSVersionCheck struct { // Peer defines model for Peer. type Peer struct { - // CreatedAt Peer creation date (UTC) - CreatedAt time.Time `json:"created_at"` + // CreatedAt Peer creation date (UTC) + CreatedAt time.Time `json:"created_at"` // ApprovalRequired (Cloud only) Indicates whether peer needs approval ApprovalRequired bool `json:"approval_required"` @@ -1098,8 +1103,8 @@ type Peer struct { // PeerBatch defines model for PeerBatch. type PeerBatch struct { - // CreatedAt Peer creation date (UTC) - CreatedAt time.Time `json:"created_at"` + // CreatedAt Peer creation date (UTC) + CreatedAt time.Time `json:"created_at"` // AccessiblePeersCount Number of accessible peers AccessiblePeersCount int `json:"accessible_peers_count"`