diff --git a/go.mod b/go.mod index a1560b409..abd140dd1 100644 --- a/go.mod +++ b/go.mod @@ -64,6 +64,7 @@ require ( github.com/nadoo/ipset v0.5.0 github.com/netbirdio/management-integrations/integrations v0.0.0-20250906095204-f87a07690ba0 github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250805121659-6b4ac470ca45 + github.com/oapi-codegen/runtime v1.1.2 github.com/okta/okta-sdk-golang/v2 v2.18.0 github.com/oschwald/maxminddb-golang v1.12.0 github.com/patrickmn/go-cache v2.1.0+incompatible @@ -127,6 +128,7 @@ require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/Microsoft/hcsshim v0.12.3 // indirect github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect + github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.17.67 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.30 // indirect diff --git a/go.sum b/go.sum index 13838b82d..484001aa7 100644 --- a/go.sum +++ b/go.sum @@ -66,11 +66,14 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/Microsoft/hcsshim v0.12.3 h1:LS9NXqXhMoqNCplK1ApmVSfB4UnVLRDWRapB6EIlxE0= github.com/Microsoft/hcsshim v0.12.3/go.mod h1:Iyl1WVpZzr+UkzjekHZbV8o5Z9ZkxNGx6CtY2Qg/JVQ= +github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk= github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible h1:hqcTK6ZISdip65SR792lwYJTa/axESA0889D3UlZbLo= github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible/go.mod h1:6B1nuc1MUs6c62ODZDl7hVE5Pv7O2XGSkgg2olnq34I= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8= github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ= +github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8= @@ -116,6 +119,7 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= github.com/bketelsen/crypt v0.0.4/go.mod h1:aI6NrJ0pMGgvZKL1iVgXLnfIFJtfV+bKCoqOes/6LfM= +github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w= github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c= github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= @@ -416,6 +420,7 @@ github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/X github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e h1:LvL4XsI70QxOGHed6yhQtAU34Kx3Qq2wwBzGFKY8zKk= github.com/jsummers/gobmp v0.0.0-20151104160322-e2ba15ffa76e/go.mod h1:kLgvv7o6UM+0QSf0QjAse3wReFDsb9qbZJdfexWlrQw= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE= github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= @@ -516,6 +521,8 @@ github.com/nicksnyder/go-i18n/v2 v2.4.0/go.mod h1:nxYSZE9M0bf3Y70gPQjN9ha7XNHX7g github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= +github.com/oapi-codegen/runtime v1.1.2 h1:P2+CubHq8fO4Q6fV1tqDBZHCwpVpvPg7oKiYzQgXIyI= +github.com/oapi-codegen/runtime v1.1.2/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg= github.com/okta/okta-sdk-golang/v2 v2.18.0 h1:cfDasMb7CShbZvOrF6n+DnLevWwiHgedWMGJ8M8xKDc= github.com/okta/okta-sdk-golang/v2 v2.18.0/go.mod h1:dz30v3ctAiMb7jpsCngGfQUAEGm1/NsWT92uTbNDQIs= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -635,6 +642,7 @@ github.com/spf13/jwalterweatherman v1.1.0/go.mod h1:aNWZUN0dPAAO/Ljvb5BEdw96iTZ0 github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.8.1/go.mod h1:o0Pch8wJ9BVSWGQMbra6iw0oQ5oktSIBaujf1rJH9Ns= +github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0= github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c h1:km8GpoQut05eY3GiYWEedbTT0qnSxrCjsVbb7yKY1KE= github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c/go.mod h1:cNQ3dwVJtS5Hmnjxy6AgTPd0Inb3pW05ftPSX7NZO7Q= github.com/srwiley/rasterx v0.0.0-20220730225603-2ab79fcdd4ef h1:Ch6Q+AZUxDBCVqdkI8FSpFyZDtCVBc2VmejdNrm5rRQ= diff --git a/management/server/account/manager.go b/management/server/account/manager.go index a1ed9498b..946a5ddae 100644 --- a/management/server/account/manager.go +++ b/management/server/account/manager.go @@ -126,6 +126,9 @@ type Manager interface { UpdateToPrimaryAccount(ctx context.Context, accountId string) error GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error) GetCurrentUserInfo(ctx context.Context, userAuth nbcontext.UserAuth) (*users.UserInfoWithPermissions, error) + CreatePeerJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error + GetAllPeerJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error) + GetPeerJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error) SetEphemeralManager(em ephemeral.Manager) AllowSync(string, uint64) bool } diff --git a/management/server/activity/codes.go b/management/server/activity/codes.go index 5c5989f84..880900195 100644 --- a/management/server/activity/codes.go +++ b/management/server/activity/codes.go @@ -177,8 +177,10 @@ const ( AccountNetworkRangeUpdated Activity = 87 PeerIPUpdated Activity = 88 - UserApproved Activity = 89 - UserRejected Activity = 90 + + UserApproved Activity = 89 + UserRejected Activity = 90 + JobCreatedByUser Activity = 91 AccountDeleted Activity = 99999 ) @@ -288,6 +290,8 @@ var activityMap = map[Activity]Code{ PeerIPUpdated: {"Peer IP updated", "peer.ip.update"}, UserApproved: {"User approved", "user.approve"}, UserRejected: {"User rejected", "user.reject"}, + + JobCreatedByUser: {"Create Job for peer", "peer.job.create"}, } // StringCode returns a string code of the activity diff --git a/management/server/http/handlers/peers/peers_handler.go b/management/server/http/handlers/peers/peers_handler.go index 4b33495de..b22cf01db 100644 --- a/management/server/http/handlers/peers/peers_handler.go +++ b/management/server/http/handlers/peers/peers_handler.go @@ -33,6 +33,9 @@ func AddEndpoints(accountManager account.Manager, router *mux.Router) { Methods("GET", "PUT", "DELETE", "OPTIONS") router.HandleFunc("/peers/{peerId}/accessible-peers", peersHandler.GetAccessiblePeers).Methods("GET", "OPTIONS") router.HandleFunc("/peers/{peerId}/temporary-access", peersHandler.CreateTemporaryAccess).Methods("POST", "OPTIONS") + router.HandleFunc("/peers/{peerId}/jobs", peersHandler.ListJobs).Methods("GET", "OPTIONS") + router.HandleFunc("/peers/{peerId}/jobs", peersHandler.CreateJob).Methods("POST", "OPTIONS") + router.HandleFunc("/peers/{peerId}/jobs/{jobId}", peersHandler.GetJob).Methods("GET", "OPTIONS") } // NewHandler creates a new peers Handler @@ -42,6 +45,99 @@ func NewHandler(accountManager account.Manager) *Handler { } } +func (h *Handler) CreateJob(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + userAuth, err := nbcontext.GetUserAuthFromContext(ctx) + if err != nil { + util.WriteError(ctx, err, w) + return + } + + vars := mux.Vars(r) + peerID := vars["peerId"] + + req := &api.JobRequest{} + if err := json.NewDecoder(r.Body).Decode(req); err != nil { + util.WriteErrorResponse("couldn't parse JSON request", http.StatusBadRequest, w) + return + } + + job, err := types.NewJob(userAuth.UserId, userAuth.AccountId, peerID, req) + if err != nil { + util.WriteError(ctx, err, w) + return + } + if err := h.accountManager.CreatePeerJob(ctx, userAuth.AccountId, peerID, userAuth.UserId, job); err != nil { + util.WriteError(ctx, err, w) + return + } + + resp, err := toSingleJobResponse(job) + if err != nil { + util.WriteError(ctx, err, w) + return + } + + util.WriteJSONObject(ctx, w, resp) +} + +func (h *Handler) ListJobs(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + userAuth, err := nbcontext.GetUserAuthFromContext(ctx) + if err != nil { + util.WriteError(ctx, err, w) + return + } + + vars := mux.Vars(r) + peerID := vars["peerId"] + + jobs, err := h.accountManager.GetAllPeerJobs(ctx, userAuth.AccountId, userAuth.UserId, peerID) + if err != nil { + util.WriteError(ctx, err, w) + return + } + + respBody := make([]*api.JobResponse, 0, len(jobs)) + for _, job := range jobs { + resp, err := toSingleJobResponse(job) + if err != nil { + util.WriteError(ctx, err, w) + return + } + respBody = append(respBody, resp) + } + + util.WriteJSONObject(ctx, w, respBody) +} + +func (h *Handler) GetJob(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + userAuth, err := nbcontext.GetUserAuthFromContext(ctx) + if err != nil { + util.WriteError(ctx, err, w) + return + } + + vars := mux.Vars(r) + peerID := vars["peerId"] + jobID := vars["jobId"] + + job, err := h.accountManager.GetPeerJobByID(ctx, userAuth.AccountId, userAuth.UserId, peerID, jobID) + if err != nil { + util.WriteError(ctx, err, w) + return + } + + resp, err := toSingleJobResponse(job) + if err != nil { + util.WriteError(ctx, err, w) + return + } + + util.WriteJSONObject(ctx, w, resp) +} + func (h *Handler) checkPeerStatus(peer *nbpeer.Peer) (*nbpeer.Peer, error) { peerToReturn := peer.Copy() if peer.Status.Connected { @@ -504,6 +600,28 @@ func toPeerListItemResponse(peer *nbpeer.Peer, groupsInfo []api.GroupMinimum, dn } } +func toSingleJobResponse(job *types.Job) (*api.JobResponse, error) { + workload, err := job.BuildWorkloadResponse() + if err != nil { + return nil, err + } + + var failed *string + if job.FailedReason != "" { + failed = &job.FailedReason + } + + return &api.JobResponse{ + Id: job.ID, + CreatedAt: job.CreatedAt, + CompletedAt: job.CompletedAt, + TriggeredBy: job.TriggeredBy, + Status: api.JobResponseStatus(job.Status), + FailedReason: failed, + Workload: *workload, + }, nil +} + func fqdn(peer *nbpeer.Peer, dnsDomain string) string { fqdn := peer.FQDN(dnsDomain) if fqdn == "" { diff --git a/management/server/mock_server/account_mock.go b/management/server/mock_server/account_mock.go index 6c58225a7..a142310f5 100644 --- a/management/server/mock_server/account_mock.go +++ b/management/server/mock_server/account_mock.go @@ -128,8 +128,30 @@ type MockAccountManager struct { AllowSyncFunc func(string, uint64) bool UpdateAccountPeersFunc func(ctx context.Context, accountID string) BufferUpdateAccountPeersFunc func(ctx context.Context, accountID string) + CreatePeerJobFunc func(ctx context.Context, accountID, peerID, userID string, job *types.Job) error + GetAllPeerJobsFunc func(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error) + GetPeerJobByIDFunc func(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error) } +func (am *MockAccountManager) CreatePeerJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error { + if am.CreatePeerJobFunc != nil { + return am.CreatePeerJobFunc(ctx, accountID, peerID, userID, job) + } + return status.Errorf(codes.Unimplemented, "method CreateJob is not implemented") +} + +func (am *MockAccountManager) GetAllPeerJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error) { + if am.CreatePeerJobFunc != nil { + return am.GetAllPeerJobsFunc(ctx, accountID, userID, peerID) + } + return nil, status.Errorf(codes.Unimplemented, "method GetAllJobs is not implemented") +} +func (am *MockAccountManager) GetPeerJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error) { + if am.CreatePeerJobFunc != nil { + return am.GetPeerJobByIDFunc(ctx, accountID, userID, peerID, jobID) + } + return nil, status.Errorf(codes.Unimplemented, "method CreateJob is not implemented") +} func (am *MockAccountManager) CreateGroup(ctx context.Context, accountID, userID string, group *types.Group) error { if am.SaveGroupFunc != nil { diff --git a/management/server/peer.go b/management/server/peer.go index 4cf5d1e46..747af0a4e 100644 --- a/management/server/peer.go +++ b/management/server/peer.go @@ -330,6 +330,130 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user return peer, nil } +func (am *DefaultAccountManager) CreatePeerJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error { + // todo: Create permissions for job + allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete) + if err != nil { + return status.NewPermissionValidationError(err) + } + if !allowed { + return status.NewPermissionDeniedError() + } + + peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID) + if err != nil { + return err + } + + if peerAccountID != accountID { + return status.NewPeerNotPartOfAccountError() + } + + // check if peer connected + // todo: implement jobManager.IsPeerConnected + // if !am.jobManager.IsPeerConnected(ctx, peerID) { + // return status.NewJobFailedError("peer not connected") + // } + + // check if already has pending jobs + // todo: implement jobManager.GetPendingJobsByPeerID + // if pending := am.jobManager.GetPendingJobsByPeerID(ctx, peerID); len(pending) > 0 { + // return status.NewJobAlreadyPendingError(peerID) + // } + + // try sending job first + // todo: implement am.jobManager.SendJob + // if err := am.jobManager.SendJob(ctx, peerID, job); err != nil { + // return status.NewJobFailedError(fmt.Sprintf("failed to send job: %v", err)) + // } + + var peer *nbpeer.Peer + var eventsToStore func() + + // persist job in DB only if send succeeded + err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error { + peer, err = transaction.GetPeerByID(ctx, store.LockingStrengthUpdate, accountID, peerID) + if err != nil { + return err + } + if err := transaction.CreatePeerJob(ctx, job); err != nil { + return err + } + + jobMeta := map[string]any{ + "job_id": job.ID, + "for_peer_id": job.PeerID, + "job_type": job.Workload.Type, + "job_status": job.Status, + "job_workload": job.Workload, + } + + eventsToStore = func() { + am.StoreEvent(ctx, userID, peer.ID, accountID, activity.JobCreatedByUser, jobMeta) + } + return nil + }) + if err != nil { + return err + } + eventsToStore() + return nil +} + +func (am *DefaultAccountManager) GetAllPeerJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error) { + // todo: Create permissions for job + allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete) + if err != nil { + return nil, status.NewPermissionValidationError(err) + } + if !allowed { + return nil, status.NewPermissionDeniedError() + } + + peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID) + if err != nil { + return nil, err + } + + if peerAccountID != accountID { + return []*types.Job{}, nil + } + + accountJobs, err := am.Store.GetPeerJobs(ctx, accountID, peerID) + if err != nil { + return nil, err + } + + return accountJobs, nil +} + +func (am *DefaultAccountManager) GetPeerJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error) { + // todo: Create permissions for job + allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete) + if err != nil { + return nil, status.NewPermissionValidationError(err) + } + if !allowed { + return nil, status.NewPermissionDeniedError() + } + + peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID) + if err != nil { + return nil, err + } + + if peerAccountID != accountID { + return &types.Job{}, nil + } + + job, err := am.Store.GetPeerJobByID(ctx, accountID, jobID) + if err != nil { + return nil, err + } + + return job, nil +} + // DeletePeer removes peer from the account by its IP func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peerID, userID string) error { allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete) diff --git a/management/server/store/sql_store.go b/management/server/store/sql_store.go index 382d026c8..658116e6c 100644 --- a/management/server/store/sql_store.go +++ b/management/server/store/sql_store.go @@ -38,14 +38,15 @@ import ( ) const ( - storeSqliteFileName = "store.db" - idQueryCondition = "id = ?" - keyQueryCondition = "key = ?" - mysqlKeyQueryCondition = "`key` = ?" - accountAndIDQueryCondition = "account_id = ? and id = ?" - accountAndIDsQueryCondition = "account_id = ? AND id IN ?" - accountIDCondition = "account_id = ?" - peerNotFoundFMT = "peer %s not found" + storeSqliteFileName = "store.db" + idQueryCondition = "id = ?" + keyQueryCondition = "key = ?" + mysqlKeyQueryCondition = "`key` = ?" + accountAndIDQueryCondition = "account_id = ? and id = ?" + accountAndPeerIDQueryCondition = "account_id = ? and peer_id = ?" + accountAndIDsQueryCondition = "account_id = ? AND id IN ?" + accountIDCondition = "account_id = ?" + peerNotFoundFMT = "peer %s not found" ) // SqlStore represents an account storage backed by a Sql DB persisted to disk @@ -105,6 +106,7 @@ func NewSqlStore(ctx context.Context, db *gorm.DB, storeEngine types.Engine, met &types.Account{}, &types.Policy{}, &types.PolicyRule{}, &route.Route{}, &nbdns.NameServerGroup{}, &installation{}, &types.ExtraSettings{}, &posture.Checks{}, &nbpeer.NetworkAddress{}, &networkTypes.Network{}, &routerTypes.NetworkRouter{}, &resourceTypes.NetworkResource{}, &types.AccountOnboarding{}, + &types.Job{}, ) if err != nil { return nil, fmt.Errorf("auto migratePreAuto: %w", err) @@ -123,6 +125,79 @@ func GetKeyQueryCondition(s *SqlStore) string { return keyQueryCondition } +// SaveJob persists a job in DB +func (s *SqlStore) CreatePeerJob(ctx context.Context, job *types.Job) error { + result := s.db.Create(job) + if result.Error != nil { + log.WithContext(ctx).Errorf("failed to create job in store: %s", result.Error) + return status.Errorf(status.Internal, "failed to create job in store") + } + return nil +} + +// job was pending for too long and has been cancelled +// todo call it when we first start the jobChannel to make sure no stuck jobs +func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, peerID string) error { + now := time.Now().UTC() + return s.db. + Model(&types.Job{}). + Where("peer_id = ? AND status = ?", types.JobStatusPending, peerID). + Updates(map[string]any{ + "status": types.JobStatusFailed, + "failed_reason": "Pending job cleanup: marked as failed automatically due to being stuck too long", + "completed_at": now, + }).Error +} + +// GetJobByID fetches job by ID +func (s *SqlStore) GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error) { + var job types.Job + err := s.db. + Where(accountAndIDQueryCondition, accountID, jobID). + First(&job).Error + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, status.Errorf(status.NotFound, "job %s not found", jobID) + } + return &job, err +} + +// get all jobs +func (s *SqlStore) GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error) { + var jobs []*types.Job + err := s.db. + Where(accountAndPeerIDQueryCondition, accountID, peerID). + Order("created_at DESC"). + Find(&jobs).Error + + if err != nil { + return nil, err + } + + return jobs, nil +} + +func (s *SqlStore) CompletePeerJob(accountID, jobID, result, failedReason string) error { + now := time.Now().UTC() + + updates := map[string]any{ + "completed_at": now, + } + + if result != "" && failedReason == "" { + updates["status"] = types.JobStatusSucceeded + updates["result"] = result + updates["failed_reason"] = "" + } else { + updates["status"] = types.JobStatusFailed + updates["failed_reason"] = failedReason + } + + return s.db. + Model(&types.Job{}). + Where(accountAndIDQueryCondition, accountID, jobID). + Updates(updates).Error +} + // AcquireGlobalLock acquires global lock across all the accounts and returns a function that releases the lock func (s *SqlStore) AcquireGlobalLock(ctx context.Context) (unlock func()) { log.WithContext(ctx).Tracef("acquiring global lock") diff --git a/management/server/store/store.go b/management/server/store/store.go index 21b660d96..23b257fc2 100644 --- a/management/server/store/store.go +++ b/management/server/store/store.go @@ -202,6 +202,11 @@ type Store interface { IsPrimaryAccount(ctx context.Context, accountID string) (bool, string, error) MarkAccountPrimary(ctx context.Context, accountID string) error UpdateAccountNetwork(ctx context.Context, accountID string, ipNet net.IPNet) error + CreatePeerJob(ctx context.Context, job *types.Job) error + CompletePeerJob(accountID, jobID, result, failedReason string) error + GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error) + GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error) + MarkPendingJobsAsFailed(ctx context.Context, peerID string) error GetPolicyRulesByResourceID(ctx context.Context, lockStrength LockingStrength, accountID string, peerID string) ([]*types.PolicyRule, error) } diff --git a/management/server/types/job.go b/management/server/types/job.go new file mode 100644 index 000000000..484738790 --- /dev/null +++ b/management/server/types/job.go @@ -0,0 +1,155 @@ +package types + +import ( + "encoding/json" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/netbirdio/netbird/shared/management/http/api" + "github.com/netbirdio/netbird/shared/management/status" +) + +type JobStatus string + +const ( + JobStatusPending JobStatus = "pending" + JobStatusSucceeded JobStatus = "succeeded" + JobStatusFailed JobStatus = "failed" +) + +type JobType string + +const ( + JobTypeBundle JobType = "bundle" +) + +type Job struct { + // ID is the primary identifier + ID string `gorm:"primaryKey"` + + // CreatedAt when job was created (UTC) + CreatedAt time.Time `gorm:"autoCreateTime"` + + // CompletedAt when job finished, null if still running + CompletedAt *time.Time + + // TriggeredBy user that triggered this job + TriggeredBy string `gorm:"index"` + + PeerID string `gorm:"index"` + + AccountID string `gorm:"index"` + + // Status of the job: pending, succeeded, failed + Status JobStatus `gorm:"index;type:varchar(50)"` + + // FailedReason describes why the job failed (if failed) + FailedReason string + + Workload Workload `gorm:"embedded;embeddedPrefix:workload_"` +} + +type Workload struct { + Type JobType `gorm:"column:workload_type;index;type:varchar(50)"` + Parameters json.RawMessage `gorm:"type:json"` + Result json.RawMessage `gorm:"type:json"` +} + +// NewJob creates a new job with default fields and validation +func NewJob(triggeredBy, accountID, peerID string, req *api.JobRequest) (*Job, error) { + if req == nil { + return nil, status.Errorf(status.BadRequest, "job request cannot be nil") + } + + // Determine job type + jobTypeStr, err := req.Workload.Discriminator() + if err != nil { + return nil, status.Errorf(status.BadRequest, "could not determine job type: %v", err) + } + jobType := JobType(jobTypeStr) + + if jobType == "" { + return nil, status.Errorf(status.BadRequest, "job type is required") + } + + var workload Workload + + switch jobType { + case JobTypeBundle: + if err := validateAndBuildBundleParams(req.Workload, &workload); err != nil { + return nil, status.Errorf(status.BadRequest, "%v", err) + } + default: + return nil, status.Errorf(status.BadRequest, "unsupported job type: %s", jobType) + } + + return &Job{ + ID: uuid.New().String(), + TriggeredBy: triggeredBy, + PeerID: peerID, + AccountID: accountID, + Status: JobStatusPending, + CreatedAt: time.Now().UTC(), + Workload: workload, + }, nil +} + +func (j *Job) BuildWorkloadResponse() (*api.WorkloadResponse, error) { + var wl api.WorkloadResponse + + switch j.Workload.Type { + case JobTypeBundle: + if err := j.buildBundleResponse(&wl); err != nil { + return nil, status.Errorf(status.InvalidArgument, err.Error()) + } + return &wl, nil + + default: + return nil, status.Errorf(status.InvalidArgument, "unknown job type: %v", j.Workload.Type) + } +} + +func (j *Job) buildBundleResponse(wl *api.WorkloadResponse) error { + var p api.BundleParameters + if err := json.Unmarshal(j.Workload.Parameters, &p); err != nil { + return fmt.Errorf("invalid parameters for bundle job: %w", err) + } + var r api.BundleResult + if err := json.Unmarshal(j.Workload.Result, &r); err != nil { + return fmt.Errorf("invalid result for bundle job: %w", err) + } + + if err := wl.FromBundleWorkloadResponse(api.BundleWorkloadResponse{ + Type: api.WorkloadTypeBundle, + Parameters: p, + Result: r, + }); err != nil { + return fmt.Errorf("unknown job parameters: %v", err) + } + return nil +} + +func validateAndBuildBundleParams(req api.WorkloadRequest, workload *Workload) error { + bundle, err := req.AsBundleWorkloadRequest() + if err != nil { + return fmt.Errorf("invalid parameters for bundle job") + } + // validate bundle_for_time <= 5 minutes + if bundle.Parameters.BundleForTime < 0 || bundle.Parameters.BundleForTime > 5 { + return fmt.Errorf("bundle_for_time must be between 0 and 5, got %d", bundle.Parameters.BundleForTime) + } + // validate log-file-count ≥ 1 and ≤ 1000 + if bundle.Parameters.LogFileCount < 1 || bundle.Parameters.LogFileCount > 1000 { + return fmt.Errorf("log-file-count must be between 1 and 1000, got %d", bundle.Parameters.LogFileCount) + } + + workload.Parameters, err = json.Marshal(bundle.Parameters) + if err != nil { + return fmt.Errorf("failed to marshal workload parameters: %w", err) + } + workload.Result = []byte("{}") + workload.Type = JobType(api.WorkloadTypeBundle) + + return nil +} diff --git a/shared/management/http/api/generate.sh b/shared/management/http/api/generate.sh index 2f24fd903..3770ea90f 100755 --- a/shared/management/http/api/generate.sh +++ b/shared/management/http/api/generate.sh @@ -11,6 +11,6 @@ fi old_pwd=$(pwd) script_path=$(dirname $(realpath "$0")) cd "$script_path" -go install github.com/deepmap/oapi-codegen/cmd/oapi-codegen@4a1477f6a8ba6ca8115cc23bb2fb67f0b9fca18e +go install github.com/oapi-codegen/oapi-codegen/v2/cmd/oapi-codegen@latest oapi-codegen --config cfg.yaml openapi.yml -cd "$old_pwd" \ No newline at end of file +cd "$old_pwd" diff --git a/shared/management/http/api/openapi.yml b/shared/management/http/api/openapi.yml index 93578b1ae..5b6b211d1 100644 --- a/shared/management/http/api/openapi.yml +++ b/shared/management/http/api/openapi.yml @@ -34,6 +34,111 @@ tags: x-cloud-only: true components: schemas: + WorkloadType: + type: string + enum: + - bundle + BundleParameters: + type: object + properties: + bundle_for: + type: boolean + example: true + bundle_for_time: + type: integer + minimum: 0 + example: 2 + log_file_count: + type: integer + minimum: 0 + example: 100 + anonymize: + type: boolean + example: false + required: + - bundle_for + - bundle_for_time + - log_file_count + - anonymize + BundleResult: + type: object + properties: + upload_key: + type: string + example: "upload_key_123" + nullable: true + BundleWorkloadRequest: + type: object + properties: + type: + $ref: '#/components/schemas/WorkloadType' + parameters: + $ref: '#/components/schemas/BundleParameters' + required: + - type + - parameters + BundleWorkloadResponse: + type: object + properties: + type: + $ref: '#/components/schemas/WorkloadType' + parameters: + $ref: '#/components/schemas/BundleParameters' + result: + $ref: '#/components/schemas/BundleResult' + required: + - type + - parameters + - result + WorkloadRequest: + oneOf: + - $ref: '#/components/schemas/BundleWorkloadRequest' + discriminator: + propertyName: type + mapping: + bundle: '#/components/schemas/BundleWorkloadRequest' + WorkloadResponse: + oneOf: + - $ref: '#/components/schemas/BundleWorkloadResponse' + discriminator: + propertyName: type + mapping: + bundle: '#/components/schemas/BundleWorkloadResponse' + JobRequest: + type: object + properties: + workload: + $ref: '#/components/schemas/WorkloadRequest' + required: + - workload + JobResponse: + type: object + properties: + id: + type: string + created_at: + type: string + format: date-time + completed_at: + type: string + format: date-time + nullable: true + triggered_by: + type: string + status: + type: string + enum: [pending, succeeded, failed] + failed_reason: + type: string + nullable: true + workload: + $ref: '#/components/schemas/WorkloadResponse' + required: + - id + - created_at + - status + - triggered_by + - workload Account: type: object properties: @@ -2227,6 +2332,108 @@ security: - BearerAuth: [ ] - TokenAuth: [ ] paths: + /api/peers/{peerId}/jobs: + get: + summary: List Jobs + description: Retrieve all jobs for a given peer + tags: [ Jobs ] + security: + - BearerAuth: [] + - TokenAuth: [] + parameters: + - in: path + name: peerId + required: true + schema: + type: string + description: The unique identifier of a peer + responses: + '200': + description: List of jobs + content: + application/json: + schema: + type: array + items: + $ref: '#/components/schemas/JobResponse' + '400': + $ref: '#/components/responses/bad_request' + '401': + $ref: '#/components/responses/requires_authentication' + '403': + $ref: '#/components/responses/forbidden' + '500': + $ref: '#/components/responses/internal_error' + post: + summary: Create Job + description: Create a new job for a given peer + tags: [ Jobs ] + security: + - BearerAuth: [] + - TokenAuth: [] + parameters: + - in: path + name: peerId + required: true + schema: + type: string + description: The unique identifier of a peer + requestBody: + description: Create job request + content: + application/json: + schema: + $ref: '#/components/schemas/JobRequest' + required: true + responses: + '201': + description: Job created + content: + application/json: + schema: + $ref: '#/components/schemas/JobResponse' + '400': + "$ref": "#/components/responses/bad_request" + '401': + "$ref": "#/components/responses/requires_authentication" + '403': + "$ref": "#/components/responses/forbidden" + '500': + "$ref": "#/components/responses/internal_error" + /api/peers/{peerId}/jobs/{jobId}: + get: + summary: Get Job + description: Retrieve details of a specific job + tags: [ Jobs ] + security: + - BearerAuth: [] + - TokenAuth: [] + parameters: + - in: path + name: peerId + required: true + schema: + type: string + - in: path + name: jobId + required: true + schema: + type: string + responses: + '200': + description: A Job object + content: + application/json: + schema: + $ref: '#/components/schemas/JobResponse' + '400': + "$ref": "#/components/responses/bad_request" + '401': + "$ref": "#/components/responses/requires_authentication" + '403': + "$ref": "#/components/responses/forbidden" + '500': + "$ref": "#/components/responses/internal_error" /api/accounts: get: summary: List all Accounts diff --git a/shared/management/http/api/types.gen.go b/shared/management/http/api/types.gen.go index 3dbb32ef6..64ffad9b1 100644 --- a/shared/management/http/api/types.gen.go +++ b/shared/management/http/api/types.gen.go @@ -1,10 +1,14 @@ // Package api provides primitives to interact with the openapi HTTP API. // -// Code generated by github.com/deepmap/oapi-codegen version v1.11.1-0.20220912230023-4a1477f6a8ba DO NOT EDIT. +// Code generated by github.com/oapi-codegen/oapi-codegen/v2 version v2.5.0 DO NOT EDIT. package api import ( + "encoding/json" + "errors" "time" + + "github.com/oapi-codegen/runtime" ) const ( @@ -104,6 +108,13 @@ const ( IngressPortAllocationRequestPortRangeProtocolUdp IngressPortAllocationRequestPortRangeProtocol = "udp" ) +// Defines values for JobResponseStatus. +const ( + JobResponseStatusFailed JobResponseStatus = "failed" + JobResponseStatusPending JobResponseStatus = "pending" + JobResponseStatusSucceeded JobResponseStatus = "succeeded" +) + // Defines values for NameserverNsType. const ( NameserverNsTypeUdp NameserverNsType = "udp" @@ -179,6 +190,11 @@ const ( UserStatusInvited UserStatus = "invited" ) +// Defines values for WorkloadType. +const ( + WorkloadTypeBundle WorkloadType = "bundle" +) + // Defines values for GetApiEventsNetworkTrafficParamsType. const ( GetApiEventsNetworkTrafficParamsTypeTYPEDROP GetApiEventsNetworkTrafficParamsType = "TYPE_DROP" @@ -341,6 +357,32 @@ type AvailablePorts struct { Udp int `json:"udp"` } +// BundleParameters defines model for BundleParameters. +type BundleParameters struct { + Anonymize bool `json:"anonymize"` + BundleFor bool `json:"bundle_for"` + BundleForTime int `json:"bundle_for_time"` + LogFileCount int `json:"log_file_count"` +} + +// BundleResult defines model for BundleResult. +type BundleResult struct { + UploadKey *string `json:"upload_key"` +} + +// BundleWorkloadRequest defines model for BundleWorkloadRequest. +type BundleWorkloadRequest struct { + Parameters BundleParameters `json:"parameters"` + Type WorkloadType `json:"type"` +} + +// BundleWorkloadResponse defines model for BundleWorkloadResponse. +type BundleWorkloadResponse struct { + Parameters BundleParameters `json:"parameters"` + Result BundleResult `json:"result"` + Type WorkloadType `json:"type"` +} + // Checks List of objects that perform the actual checks type Checks struct { // GeoLocationCheck Posture check for geo location @@ -647,6 +689,25 @@ type IngressPortAllocationRequestPortRange struct { // IngressPortAllocationRequestPortRangeProtocol The protocol accepted by the port range type IngressPortAllocationRequestPortRangeProtocol string +// JobRequest defines model for JobRequest. +type JobRequest struct { + Workload WorkloadRequest `json:"workload"` +} + +// JobResponse defines model for JobResponse. +type JobResponse struct { + CompletedAt *time.Time `json:"completed_at"` + CreatedAt time.Time `json:"created_at"` + FailedReason *string `json:"failed_reason"` + Id string `json:"id"` + Status JobResponseStatus `json:"status"` + TriggeredBy string `json:"triggered_by"` + Workload WorkloadResponse `json:"workload"` +} + +// JobResponseStatus defines model for JobResponse.Status. +type JobResponseStatus string + // Location Describe geographical location information type Location struct { // CityName Commonly used English name of the city @@ -1857,6 +1918,19 @@ type UserRequest struct { Role string `json:"role"` } +// WorkloadRequest defines model for WorkloadRequest. +type WorkloadRequest struct { + union json.RawMessage +} + +// WorkloadResponse defines model for WorkloadResponse. +type WorkloadResponse struct { + union json.RawMessage +} + +// WorkloadType defines model for WorkloadType. +type WorkloadType string + // GetApiEventsNetworkTrafficParams defines parameters for GetApiEventsNetworkTraffic. type GetApiEventsNetworkTrafficParams struct { // Page Page number @@ -1974,6 +2048,9 @@ type PostApiPeersPeerIdIngressPortsJSONRequestBody = IngressPortAllocationReques // PutApiPeersPeerIdIngressPortsAllocationIdJSONRequestBody defines body for PutApiPeersPeerIdIngressPortsAllocationId for application/json ContentType. type PutApiPeersPeerIdIngressPortsAllocationIdJSONRequestBody = IngressPortAllocationRequest +// PostApiPeersPeerIdJobsJSONRequestBody defines body for PostApiPeersPeerIdJobs for application/json ContentType. +type PostApiPeersPeerIdJobsJSONRequestBody = JobRequest + // PostApiPeersPeerIdTemporaryAccessJSONRequestBody defines body for PostApiPeersPeerIdTemporaryAccess for application/json ContentType. type PostApiPeersPeerIdTemporaryAccessJSONRequestBody = PeerTemporaryAccessRequest @@ -2009,3 +2086,121 @@ type PutApiUsersUserIdJSONRequestBody = UserRequest // PostApiUsersUserIdTokensJSONRequestBody defines body for PostApiUsersUserIdTokens for application/json ContentType. type PostApiUsersUserIdTokensJSONRequestBody = PersonalAccessTokenRequest + +// AsBundleWorkloadRequest returns the union data inside the WorkloadRequest as a BundleWorkloadRequest +func (t WorkloadRequest) AsBundleWorkloadRequest() (BundleWorkloadRequest, error) { + var body BundleWorkloadRequest + err := json.Unmarshal(t.union, &body) + return body, err +} + +// FromBundleWorkloadRequest overwrites any union data inside the WorkloadRequest as the provided BundleWorkloadRequest +func (t *WorkloadRequest) FromBundleWorkloadRequest(v BundleWorkloadRequest) error { + v.Type = "bundle" + b, err := json.Marshal(v) + t.union = b + return err +} + +// MergeBundleWorkloadRequest performs a merge with any union data inside the WorkloadRequest, using the provided BundleWorkloadRequest +func (t *WorkloadRequest) MergeBundleWorkloadRequest(v BundleWorkloadRequest) error { + v.Type = "bundle" + b, err := json.Marshal(v) + if err != nil { + return err + } + + merged, err := runtime.JSONMerge(t.union, b) + t.union = merged + return err +} + +func (t WorkloadRequest) Discriminator() (string, error) { + var discriminator struct { + Discriminator string `json:"type"` + } + err := json.Unmarshal(t.union, &discriminator) + return discriminator.Discriminator, err +} + +func (t WorkloadRequest) ValueByDiscriminator() (interface{}, error) { + discriminator, err := t.Discriminator() + if err != nil { + return nil, err + } + switch discriminator { + case "bundle": + return t.AsBundleWorkloadRequest() + default: + return nil, errors.New("unknown discriminator value: " + discriminator) + } +} + +func (t WorkloadRequest) MarshalJSON() ([]byte, error) { + b, err := t.union.MarshalJSON() + return b, err +} + +func (t *WorkloadRequest) UnmarshalJSON(b []byte) error { + err := t.union.UnmarshalJSON(b) + return err +} + +// AsBundleWorkloadResponse returns the union data inside the WorkloadResponse as a BundleWorkloadResponse +func (t WorkloadResponse) AsBundleWorkloadResponse() (BundleWorkloadResponse, error) { + var body BundleWorkloadResponse + err := json.Unmarshal(t.union, &body) + return body, err +} + +// FromBundleWorkloadResponse overwrites any union data inside the WorkloadResponse as the provided BundleWorkloadResponse +func (t *WorkloadResponse) FromBundleWorkloadResponse(v BundleWorkloadResponse) error { + v.Type = "bundle" + b, err := json.Marshal(v) + t.union = b + return err +} + +// MergeBundleWorkloadResponse performs a merge with any union data inside the WorkloadResponse, using the provided BundleWorkloadResponse +func (t *WorkloadResponse) MergeBundleWorkloadResponse(v BundleWorkloadResponse) error { + v.Type = "bundle" + b, err := json.Marshal(v) + if err != nil { + return err + } + + merged, err := runtime.JSONMerge(t.union, b) + t.union = merged + return err +} + +func (t WorkloadResponse) Discriminator() (string, error) { + var discriminator struct { + Discriminator string `json:"type"` + } + err := json.Unmarshal(t.union, &discriminator) + return discriminator.Discriminator, err +} + +func (t WorkloadResponse) ValueByDiscriminator() (interface{}, error) { + discriminator, err := t.Discriminator() + if err != nil { + return nil, err + } + switch discriminator { + case "bundle": + return t.AsBundleWorkloadResponse() + default: + return nil, errors.New("unknown discriminator value: " + discriminator) + } +} + +func (t WorkloadResponse) MarshalJSON() ([]byte, error) { + b, err := t.union.MarshalJSON() + return b, err +} + +func (t *WorkloadResponse) UnmarshalJSON(b []byte) error { + err := t.union.UnmarshalJSON(b) + return err +}