Compare commits

...

1 Commits

Author SHA1 Message Date
aliamerj
b7cd2ee252 just testing 2025-09-01 14:05:19 +03:00
7 changed files with 290 additions and 116 deletions

View File

@@ -22,6 +22,8 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"golang.zx2c4.com/wireguard/tun/netstack" "golang.zx2c4.com/wireguard/tun/netstack"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes" "golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
nberrors "github.com/netbirdio/netbird/client/errors" nberrors "github.com/netbirdio/netbird/client/errors"
@@ -53,6 +55,7 @@ import (
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group" semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
nbssh "github.com/netbirdio/netbird/client/ssh" nbssh "github.com/netbirdio/netbird/client/ssh"
nbstatus "github.com/netbirdio/netbird/client/status"
"github.com/netbirdio/netbird/client/system" "github.com/netbirdio/netbird/client/system"
nbdns "github.com/netbirdio/netbird/dns" nbdns "github.com/netbirdio/netbird/dns"
"github.com/netbirdio/netbird/route" "github.com/netbirdio/netbird/route"
@@ -62,7 +65,9 @@ import (
relayClient "github.com/netbirdio/netbird/shared/relay/client" relayClient "github.com/netbirdio/netbird/shared/relay/client"
signal "github.com/netbirdio/netbird/shared/signal/client" signal "github.com/netbirdio/netbird/shared/signal/client"
sProto "github.com/netbirdio/netbird/shared/signal/proto" sProto "github.com/netbirdio/netbird/shared/signal/proto"
"github.com/netbirdio/netbird/upload-server/types"
"github.com/netbirdio/netbird/util" "github.com/netbirdio/netbird/util"
"google.golang.org/grpc/status"
) )
// PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer. // PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer.
@@ -125,6 +130,8 @@ type EngineConfig struct {
BlockInbound bool BlockInbound bool
LazyConnectionEnabled bool LazyConnectionEnabled bool
peerDaemonAddr string
} }
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers. // Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
@@ -887,6 +894,20 @@ func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error {
return nil return nil
} }
func (e *Engine) getPeerClient(addr string) (*grpc.ClientConn, error) {
conn, err := grpc.NewClient(
strings.TrimPrefix(addr, "tcp://"),
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, fmt.Errorf("failed to connect to daemon error: %v\n"+
"If the daemon is not running please run: "+
"\nnetbird service install \nnetbird service start\n", err)
}
return conn, nil
}
func (e *Engine) receiveJobEvents() { func (e *Engine) receiveJobEvents() {
go func() { go func() {
err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse { err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse {
@@ -914,6 +935,60 @@ func (e *Engine) receiveJobEvents() {
log.Debugf("connecting to Management Service jobs stream") log.Debugf("connecting to Management Service jobs stream")
} }
func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (string, error) {
conn, err := e.getPeerClient("unix:///var/run/netbird.sock")
if err != nil {
return "", err
}
defer func() {
if err := conn.Close(); err != nil {
log.Errorf("Failed to close connection: %v", err)
}
}()
statusOutput, err := e.getStatusOutput(params.Anonymize)
if err != nil {
return "", err
}
request := &cProto.DebugBundleRequest{
Anonymize: params.Anonymize,
SystemInfo: true,
Status: statusOutput,
LogFileCount: uint32(params.LogFileCount),
UploadURL: types.DefaultBundleURL,
}
service := cProto.NewDaemonServiceClient(conn)
resp, err := service.DebugBundle(e.clientCtx, request)
if err != nil {
return "", fmt.Errorf("failed to bundle debug: " + status.Convert(err).Message())
}
if resp.GetUploadFailureReason() != "" {
return "", fmt.Errorf("upload failed: " + resp.GetUploadFailureReason())
}
return resp.GetUploadedKey(), nil
}
func (e *Engine) getStatusOutput(anon bool) (string, error) {
conn, err := e.getPeerClient("unix:///var/run/netbird.sock")
if err != nil {
return "", err
}
defer func() {
if err := conn.Close(); err != nil {
log.Errorf("Failed to close connection: %v", err)
}
}()
statusResp, err := cProto.NewDaemonServiceClient(conn).Status(e.clientCtx, &cProto.StatusRequest{GetFullPeerStatus: true, ShouldRunProbes: true})
if err != nil {
return "", fmt.Errorf("status failed: %v", status.Convert(err).Message())
}
return nbstatus.ParseToFullDetailSummary(
nbstatus.ConvertToStatusOutputOverview(statusResp, anon, "", nil, nil, nil, "", ""),
), nil
}
// receiveManagementEvents connects to the Management Service event stream to receive updates from the management service // receiveManagementEvents connects to the Management Service event stream to receive updates from the management service
// E.g. when a new peer has been registered and we are allowed to connect to it. // E.g. when a new peer has been registered and we are allowed to connect to it.
func (e *Engine) receiveManagementEvents() { func (e *Engine) receiveManagementEvents() {

View File

@@ -163,10 +163,13 @@ func (s *GRPCServer) Job(srv proto.ManagementService_JobServer) error {
} }
// Start background response handler // Start background response handler
s.startResponseReceiver(ctx, accountID, srv) s.startResponseReceiver(ctx, srv)
// Prepare per-peer state // Prepare per-peer state
updates := s.jobManager.CreateJobChannel(peer.ID) updates, err := s.jobManager.CreateJobChannel(ctx, accountID, peer.ID)
if err != nil {
return status.Errorf(codes.Internal, err.Error())
}
log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart)) log.WithContext(ctx).Debugf("Sync: took %v", time.Since(reqStart))
// Main loop: forward jobs to client // Main loop: forward jobs to client
@@ -262,7 +265,7 @@ func (s *GRPCServer) handleHandshake(ctx context.Context, srv proto.ManagementSe
return peerKey, nil return peerKey, nil
} }
func (s *GRPCServer) startResponseReceiver(ctx context.Context, accountID string, srv proto.ManagementService_JobServer) { func (s *GRPCServer) startResponseReceiver(ctx context.Context, srv proto.ManagementService_JobServer) {
go func() { go func() {
for { for {
msg, err := srv.Recv() msg, err := srv.Recv()
@@ -280,7 +283,7 @@ func (s *GRPCServer) startResponseReceiver(ctx context.Context, accountID string
continue continue
} }
if err := s.jobManager.HandleResponse(ctx, accountID, jobResp); err != nil { if err := s.jobManager.HandleResponse(ctx, jobResp); err != nil {
log.WithContext(ctx).Errorf("handle job response failed: %v", err) log.WithContext(ctx).Errorf("handle job response failed: %v", err)
} }

View File

@@ -6,18 +6,22 @@ import (
"sync" "sync"
"time" "time"
nbpeer "github.com/netbirdio/netbird/management/server/peer"
"github.com/netbirdio/netbird/management/server/store" "github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/management/server/telemetry" "github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/proto" "github.com/netbirdio/netbird/shared/management/proto"
log "github.com/sirupsen/logrus"
) )
const jobChannelBuffer = 100 const jobChannelBuffer = 100
type JobEvent struct { type JobEvent struct {
PeerID string Job *types.Job
Request *proto.JobRequest Request *proto.JobRequest
Response *proto.JobResponse Response *proto.JobResponse
Done chan struct{} // closed when response arrives Done chan struct{} // closed when response arrives
StoreEvent func(meta map[string]any, peer *nbpeer.Peer)
} }
type JobManager struct { type JobManager struct {
@@ -42,9 +46,11 @@ func NewJobManager(metrics telemetry.AppMetrics, store store.Store) *JobManager
} }
// CreateJobChannel creates or replaces a channel for a peer // CreateJobChannel creates or replaces a channel for a peer
func (jm *JobManager) CreateJobChannel(peerID string) chan *JobEvent { func (jm *JobManager) CreateJobChannel(ctx context.Context, accountID, peerID string) (chan *JobEvent, error) {
// TODO: all pending jobs stored in db for this peer should be failed // all pending jobs stored in db for this peer should be failed
// jm.Store.MarkPendingJobsAsFailed(peerID) if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Pending job cleanup: marked as failed automatically due to being stuck too long"); err != nil {
return nil, err
}
jm.mu.Lock() jm.mu.Lock()
defer jm.mu.Unlock() defer jm.mu.Unlock()
@@ -56,22 +62,28 @@ func (jm *JobManager) CreateJobChannel(peerID string) chan *JobEvent {
ch := make(chan *JobEvent, jobChannelBuffer) ch := make(chan *JobEvent, jobChannelBuffer)
jm.jobChannels[peerID] = ch jm.jobChannels[peerID] = ch
return ch return ch, nil
} }
// SendJob sends a job to a peer and tracks it as pending // SendJob sends a job to a peer and tracks it as pending
func (jm *JobManager) SendJob(ctx context.Context, accountID, peerID string, req *proto.JobRequest) error { func (jm *JobManager) SendJob(ctx context.Context, job *types.Job, storeEvent func(meta map[string]any, peer *nbpeer.Peer)) error {
jm.mu.RLock() jm.mu.RLock()
ch, ok := jm.jobChannels[peerID] ch, ok := jm.jobChannels[job.PeerID]
jm.mu.RUnlock() jm.mu.RUnlock()
if !ok { if !ok {
return fmt.Errorf("peer %s has no channel", peerID) return fmt.Errorf("peer %s has no channel", job.PeerID)
}
req, err := job.ToStreamJobRequest()
if err != nil {
return err
} }
event := &JobEvent{ event := &JobEvent{
PeerID: peerID, Job: job,
Request: req, Request: req,
Done: make(chan struct{}), Done: make(chan struct{}),
StoreEvent: storeEvent,
} }
jm.mu.Lock() jm.mu.Lock()
@@ -81,24 +93,24 @@ func (jm *JobManager) SendJob(ctx context.Context, accountID, peerID string, req
select { select {
case ch <- event: case ch <- event:
case <-time.After(5 * time.Second): case <-time.After(5 * time.Second):
jm.cleanup(ctx, accountID, string(req.ID), "timed out") jm.cleanup(ctx, string(req.ID), "timed out")
return fmt.Errorf("job channel full for peer %s", peerID) return fmt.Errorf("job channel full for peer %s", job.PeerID)
} }
select { select {
case <-event.Done: case <-event.Done:
return nil return nil
case <-time.After(jm.responseWait): case <-time.After(jm.responseWait):
jm.cleanup(ctx, accountID, string(req.ID), "timed out") jm.cleanup(ctx, string(req.ID), "timed out")
return fmt.Errorf("job %s timed out", req.ID) return fmt.Errorf("job %s timed out", req.ID)
case <-ctx.Done(): case <-ctx.Done():
jm.cleanup(ctx, accountID, string(req.ID), ctx.Err().Error()) jm.cleanup(ctx, string(req.ID), ctx.Err().Error())
return ctx.Err() return ctx.Err()
} }
} }
// HandleResponse marks a job as finished and moves it to completed // HandleResponse marks a job as finished and moves it to completed
func (jm *JobManager) HandleResponse(ctx context.Context, accountID string, resp *proto.JobResponse) error { func (jm *JobManager) HandleResponse(ctx context.Context, resp *proto.JobResponse) error {
jm.mu.Lock() jm.mu.Lock()
defer jm.mu.Unlock() defer jm.mu.Unlock()
@@ -106,14 +118,17 @@ func (jm *JobManager) HandleResponse(ctx context.Context, accountID string, resp
if !ok { if !ok {
return fmt.Errorf("job %s not found", resp.ID) return fmt.Errorf("job %s not found", resp.ID)
} }
fmt.Printf("we got this %+v\n", resp)
//update or create the store for job response
err := jm.saveJob(ctx, event.Job, resp, event.StoreEvent)
if err == nil {
event.Response = resp event.Response = resp
//TODO: update the store for job response }
// jm.store.CompleteJob(ctx,accountID, string(resp.GetID()), string(resp.GetResult()),string(resp.GetReason()))
close(event.Done) close(event.Done)
delete(jm.pending, string(resp.ID)) delete(jm.pending, string(resp.ID))
return nil return err
} }
// CloseChannel closes a peers channel and cleans up its jobs // CloseChannel closes a peers channel and cleans up its jobs
@@ -128,22 +143,81 @@ func (jm *JobManager) CloseChannel(ctx context.Context, accountID, peerID string
} }
for jobID, ev := range jm.pending { for jobID, ev := range jm.pending {
if ev.PeerID == peerID { if ev.Job.PeerID == peerID {
// if the client disconnect and there is pending job then marke it as failed // if the client disconnect and there is pending job then marke it as failed
// jm.store.CompleteJob(ctx,accountID, jobID,"", "Time out ") if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Time out"); err != nil {
log.WithContext(ctx).Errorf(err.Error())
}
delete(jm.pending, jobID) delete(jm.pending, jobID)
} }
} }
} }
// cleanup removes a pending job safely // cleanup removes a pending job safely
func (jm *JobManager) cleanup(ctx context.Context, accountID, jobID string, reason string) { func (jm *JobManager) cleanup(ctx context.Context, jobID string, reason string) {
jm.mu.Lock() jm.mu.Lock()
defer jm.mu.Unlock() defer jm.mu.Unlock()
if ev, ok := jm.pending[jobID]; ok { if ev, ok := jm.pending[jobID]; ok {
close(ev.Done) close(ev.Done)
// jm.store.CompleteJob(ctx, accountID, jobID, "", reason) if err := jm.Store.MarkPendingJobsAsFailed(ctx, ev.Job.AccountID, ev.Job.PeerID, reason); err != nil {
log.WithContext(ctx).Errorf(err.Error())
}
delete(jm.pending, jobID) delete(jm.pending, jobID)
} }
} }
func (jm *JobManager) IsPeerConnected(peerID string) bool {
jm.mu.RLock()
defer jm.mu.RUnlock()
_, ok := jm.jobChannels[peerID]
return ok
}
func (jm *JobManager) IsPeerHasPendingJobs(peerID string) bool {
jm.mu.RLock()
defer jm.mu.RUnlock()
for _, ev := range jm.pending {
if ev.Job.PeerID == peerID {
return true
}
}
return false
}
func (jm *JobManager) saveJob(ctx context.Context, job *types.Job, response *proto.JobResponse, StoreEvent func(meta map[string]any, peer *nbpeer.Peer)) error {
var peer *nbpeer.Peer
var err error
var eventsToStore func()
// persist job in DB only if send succeeded
err = jm.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
peer, err = transaction.GetPeerByID(ctx, store.LockingStrengthUpdate, job.AccountID, job.PeerID)
if err != nil {
return err
}
if err := transaction.CreateOrUpdatePeerJob(ctx, job, response); err != nil {
return err
}
jobMeta := map[string]any{
"job_id": job.ID,
"for_peer_id": job.PeerID,
"job_type": job.Workload.Type,
"job_status": job.Status,
"job_workload": job.Workload,
}
eventsToStore = func() {
StoreEvent(jobMeta, peer)
}
return nil
})
if err != nil {
return err
}
eventsToStore()
return nil
}

View File

@@ -353,53 +353,21 @@ func (am *DefaultAccountManager) CreatePeerJob(ctx context.Context, accountID, p
} }
// check if peer connected // check if peer connected
// todo: implement jobManager.IsPeerConnected if !am.jobManager.IsPeerConnected(peerID) {
// if !am.jobManager.IsPeerConnected(ctx, peerID) { return status.Errorf(status.BadRequest, "peer not connected")
// return status.NewJobFailedError("peer not connected") }
// }
// check if already has pending jobs // check if already has pending jobs
// todo: implement jobManager.GetPendingJobsByPeerID if am.jobManager.IsPeerHasPendingJobs(peerID) {
// if pending := am.jobManager.GetPendingJobsByPeerID(ctx, peerID); len(pending) > 0 { return status.Errorf(status.BadRequest, "peer already hase pending job")
// return status.NewJobAlreadyPendingError(peerID) }
// }
// try sending job first // try sending job first
// todo: implement am.jobManager.SendJob if err := am.jobManager.SendJob(ctx, job, func(meta map[string]any, peer *nbpeer.Peer) {
// if err := am.jobManager.SendJob(ctx, peerID, job); err != nil { am.StoreEvent(ctx, userID, peer.ID, accountID, activity.JobCreatedByUser, meta)
// return status.NewJobFailedError(fmt.Sprintf("failed to send job: %v", err)) }); err != nil {
// } return status.Errorf(status.Internal, "failed to send job: %v", err)
var peer *nbpeer.Peer
var eventsToStore func()
// persist job in DB only if send succeeded
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
peer, err = transaction.GetPeerByID(ctx, store.LockingStrengthUpdate, accountID, peerID)
if err != nil {
return err
} }
if err := transaction.CreatePeerJob(ctx, job); err != nil {
return err
}
jobMeta := map[string]any{
"job_id": job.ID,
"for_peer_id": job.PeerID,
"job_type": job.Workload.Type,
"job_status": job.Status,
"job_workload": job.Workload,
}
eventsToStore = func() {
am.StoreEvent(ctx, userID, peer.ID, accountID, activity.JobCreatedByUser, jobMeta)
}
return nil
})
if err != nil {
return err
}
eventsToStore()
return nil return nil
} }
@@ -422,7 +390,7 @@ func (am *DefaultAccountManager) GetAllPeerJobs(ctx context.Context, accountID,
return []*types.Job{}, nil return []*types.Job{}, nil
} }
accountJobs, err := am.Store.GetPeerJobs(ctx, accountID, peerID) accountJobs, err := am.Store.GetPeerJobs(accountID, peerID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@@ -449,7 +417,7 @@ func (am *DefaultAccountManager) GetPeerJobByID(ctx context.Context, accountID,
return &types.Job{}, nil return &types.Job{}, nil
} }
job, err := am.Store.GetPeerJobByID(ctx, accountID, jobID) job, err := am.Store.GetPeerJobByID(accountID, jobID)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@@ -34,6 +34,7 @@ import (
"github.com/netbirdio/netbird/management/server/types" "github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/management/server/util" "github.com/netbirdio/netbird/management/server/util"
"github.com/netbirdio/netbird/route" "github.com/netbirdio/netbird/route"
"github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/shared/management/status" "github.com/netbirdio/netbird/shared/management/status"
) )
@@ -127,8 +128,18 @@ func GetKeyQueryCondition(s *SqlStore) string {
} }
// SaveJob persists a job in DB // SaveJob persists a job in DB
func (s *SqlStore) CreatePeerJob(ctx context.Context, job *types.Job) error { func (s *SqlStore) CreateOrUpdatePeerJob(ctx context.Context, job *types.Job, jobResponse *proto.JobResponse) error {
result := s.db.Create(job) if err := job.ApplyResponse(jobResponse); err != nil {
return status.Errorf(status.Internal, err.Error())
}
fmt.Printf("new job or update %v\n", job)
result := s.db.
Model(&types.Job{}).Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
DoUpdates: clause.AssignmentColumns([]string{"completed_at", "status", "failed_reason", "workload_workload_type", "workload_parameters", "workload_result"}),
}).Create(job)
if result.Error != nil { if result.Error != nil {
log.WithContext(ctx).Errorf("failed to create job in store: %s", result.Error) log.WithContext(ctx).Errorf("failed to create job in store: %s", result.Error)
return status.Errorf(status.Internal, "failed to create job in store") return status.Errorf(status.Internal, "failed to create job in store")
@@ -137,21 +148,25 @@ func (s *SqlStore) CreatePeerJob(ctx context.Context, job *types.Job) error {
} }
// job was pending for too long and has been cancelled // job was pending for too long and has been cancelled
// todo call it when we first start the jobChannel to make sure no stuck jobs func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error {
func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, peerID string) error {
now := time.Now().UTC() now := time.Now().UTC()
return s.db. result := s.db.
Model(&types.Job{}). Model(&types.Job{}).
Where("peer_id = ? AND status = ?", types.JobStatusPending, peerID). Where(accountAndPeerIDQueryCondition+"AND status = ?", accountID, peerID, types.JobStatusPending).
Updates(map[string]any{ Updates(types.Job{
"status": types.JobStatusFailed, Status: types.JobStatusFailed,
"failed_reason": "Pending job cleanup: marked as failed automatically due to being stuck too long", FailedReason: reason,
"completed_at": now, CompletedAt: &now,
}).Error })
if result.Error != nil {
log.WithContext(ctx).Errorf("failed to mark pending jobs as Failed job in store: %s", result.Error)
return status.Errorf(status.Internal, "failed to mark pending job as Failed in store")
}
return nil
} }
// GetJobByID fetches job by ID // GetJobByID fetches job by ID
func (s *SqlStore) GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error) { func (s *SqlStore) GetPeerJobByID(accountID, jobID string) (*types.Job, error) {
var job types.Job var job types.Job
err := s.db. err := s.db.
Where(accountAndIDQueryCondition, accountID, jobID). Where(accountAndIDQueryCondition, accountID, jobID).
@@ -163,7 +178,7 @@ func (s *SqlStore) GetPeerJobByID(ctx context.Context, accountID, jobID string)
} }
// get all jobs // get all jobs
func (s *SqlStore) GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error) { func (s *SqlStore) GetPeerJobs(accountID, peerID string) ([]*types.Job, error) {
var jobs []*types.Job var jobs []*types.Job
err := s.db. err := s.db.
Where(accountAndPeerIDQueryCondition, accountID, peerID). Where(accountAndPeerIDQueryCondition, accountID, peerID).
@@ -177,28 +192,6 @@ func (s *SqlStore) GetPeerJobs(ctx context.Context, accountID, peerID string) ([
return jobs, nil return jobs, nil
} }
func (s *SqlStore) CompletePeerJob(accountID, jobID, result, failedReason string) error {
now := time.Now().UTC()
updates := map[string]any{
"completed_at": now,
}
if result != "" && failedReason == "" {
updates["status"] = types.JobStatusSucceeded
updates["result"] = result
updates["failed_reason"] = ""
} else {
updates["status"] = types.JobStatusFailed
updates["failed_reason"] = failedReason
}
return s.db.
Model(&types.Job{}).
Where(accountAndIDQueryCondition, accountID, jobID).
Updates(updates).Error
}
// AcquireGlobalLock acquires global lock across all the accounts and returns a function that releases the lock // AcquireGlobalLock acquires global lock across all the accounts and returns a function that releases the lock
func (s *SqlStore) AcquireGlobalLock(ctx context.Context) (unlock func()) { func (s *SqlStore) AcquireGlobalLock(ctx context.Context) (unlock func()) {
log.WithContext(ctx).Tracef("acquiring global lock") log.WithContext(ctx).Tracef("acquiring global lock")

View File

@@ -26,6 +26,7 @@ import (
"github.com/netbirdio/netbird/management/server/telemetry" "github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/testutil" "github.com/netbirdio/netbird/management/server/testutil"
"github.com/netbirdio/netbird/management/server/types" "github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/util" "github.com/netbirdio/netbird/util"
"github.com/netbirdio/netbird/management/server/migration" "github.com/netbirdio/netbird/management/server/migration"
@@ -205,11 +206,10 @@ type Store interface {
IsPrimaryAccount(ctx context.Context, accountID string) (bool, string, error) IsPrimaryAccount(ctx context.Context, accountID string) (bool, string, error)
MarkAccountPrimary(ctx context.Context, accountID string) error MarkAccountPrimary(ctx context.Context, accountID string) error
UpdateAccountNetwork(ctx context.Context, accountID string, ipNet net.IPNet) error UpdateAccountNetwork(ctx context.Context, accountID string, ipNet net.IPNet) error
CreatePeerJob(ctx context.Context, job *types.Job) error GetPeerJobByID(accountID, jobID string) (*types.Job, error)
CompletePeerJob(accountID, jobID, result, failedReason string) error GetPeerJobs(accountID, peerID string) ([]*types.Job, error)
GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error) MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error
GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error) CreateOrUpdatePeerJob(ctx context.Context, job *types.Job, jobResponse *proto.JobResponse) error
MarkPendingJobsAsFailed(ctx context.Context, peerID string) error
} }
const ( const (

View File

@@ -7,6 +7,7 @@ import (
"github.com/google/uuid" "github.com/google/uuid"
"github.com/netbirdio/netbird/shared/management/http/api" "github.com/netbirdio/netbird/shared/management/http/api"
"github.com/netbirdio/netbird/shared/management/proto"
"github.com/netbirdio/netbird/shared/management/status" "github.com/netbirdio/netbird/shared/management/status"
) )
@@ -153,3 +154,63 @@ func validateAndBuildBundleParams(req api.WorkloadRequest, workload *Workload) e
return nil return nil
} }
// ApplyResponse validates and maps a proto.JobResponse into the Job fields.
func (j *Job) ApplyResponse(resp *proto.JobResponse) error {
if resp == nil {
return nil
}
now := time.Now().UTC()
j.CompletedAt = &now
switch resp.Status {
case proto.JobStatus_succeeded:
j.Status = JobStatusSucceeded
case proto.JobStatus_failed:
j.Status = JobStatusFailed
default:
j.Status = JobStatusPending
}
if len(resp.Reason) > 0 {
j.FailedReason = string(resp.Reason)
}
// Handle workload results (oneof)
var err error
switch r := resp.WorkloadResults.(type) {
case *proto.JobResponse_Bundle:
if j.Workload.Result, err = json.Marshal(r.Bundle); err != nil {
return fmt.Errorf("failed to marshal workload results: %w", err)
}
default:
return fmt.Errorf("unsupported workload response type: %T", r)
}
return nil
}
func (j *Job) ToStreamJobRequest() (*proto.JobRequest, error) {
switch j.Workload.Type {
case JobTypeBundle:
return j.buildStreamBundleResponse()
default:
return nil, status.Errorf(status.InvalidArgument, "unknown job type: %v", j.Workload.Type)
}
}
func (j *Job) buildStreamBundleResponse() (*proto.JobRequest, error) {
var p api.BundleParameters
if err := json.Unmarshal(j.Workload.Parameters, &p); err != nil {
return nil, fmt.Errorf("invalid parameters for bundle job: %w", err)
}
return &proto.JobRequest{
ID: []byte(j.ID),
WorkloadParameters: &proto.JobRequest_Bundle{
Bundle: &proto.BundleParameters{
BundleFor: p.BundleFor,
BundleForTime: int64(p.BundleForTime),
LogFileCount: int32(p.LogFileCount),
Anonymize: p.Anonymize,
},
},
}, nil
}