Files
netbird/management/server/job/manager.go
Zoltan Papp 58daa674ef [Management/Client] Trigger debug bundle runs from API/Dashboard (#4592) (#4832)
This PR adds the ability to trigger debug bundle generation remotely from the Management API/Dashboard.
2026-01-19 11:22:16 +01:00

183 lines
4.8 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package job
import (
"context"
"fmt"
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/netbirdio/netbird/management/internals/modules/peers"
"github.com/netbirdio/netbird/management/server/store"
"github.com/netbirdio/netbird/management/server/telemetry"
"github.com/netbirdio/netbird/management/server/types"
"github.com/netbirdio/netbird/shared/management/proto"
)
// Event tracks a single job exchange with a peer: the request that was
// dispatched and the slot for the peer's eventual response.
type Event struct {
	PeerID   string             // ID of the peer the job was sent to
	Request  *proto.JobRequest  // the job request delivered over the peer's channel
	Response *proto.JobResponse // peer's answer; nil until one arrives (not populated in this file — set elsewhere, presumably)
}
// Manager coordinates remote job delivery to connected peers. It keeps
// one Channel per connected peer and an in-memory index of jobs that
// were sent but have not yet been answered.
type Manager struct {
	mu           *sync.RWMutex       // guards jobChannels and pending
	jobChannels  map[string]*Channel // per-peer job streams, keyed by peer ID
	pending      map[string]*Event   // jobID → event awaiting a response
	responseWait time.Duration       // timeout handed to Channel.AddEvent when dispatching a job
	metrics      telemetry.AppMetrics
	Store        store.Store
	peersManager peers.Manager
}
// NewJobManager constructs a Manager with empty channel/pending
// registries and a default response wait of five minutes.
func NewJobManager(metrics telemetry.AppMetrics, store store.Store, peersManager peers.Manager) *Manager {
	manager := &Manager{
		mu:           &sync.RWMutex{},
		jobChannels:  map[string]*Channel{},
		pending:      map[string]*Event{},
		responseWait: 5 * time.Minute,
		metrics:      metrics,
		Store:        store,
		peersManager: peersManager,
	}
	return manager
}
// CreateJobChannel creates or replaces a channel for a peer.
//
// Before opening the fresh stream, any jobs still recorded as pending in
// the store for this peer are marked failed — a (re)connecting peer can
// no longer answer them.
func (jm *Manager) CreateJobChannel(ctx context.Context, accountID, peerID string) *Channel {
	// Fail leftover persisted pending jobs for this peer; a store error is
	// logged but does not prevent the channel from being created.
	err := jm.Store.MarkAllPendingJobsAsFailed(ctx, accountID, peerID, "Pending job cleanup: marked as failed automatically due to being stuck too long")
	if err != nil {
		log.WithContext(ctx).Error(err.Error())
	}

	jm.mu.Lock()
	defer jm.mu.Unlock()

	// Replace any existing channel for this peer.
	if existing, found := jm.jobChannels[peerID]; found {
		existing.Close()
		delete(jm.jobChannels, peerID)
	}

	fresh := NewChannel()
	jm.jobChannels[peerID] = fresh
	return fresh
}
// SendJob sends a job to a peer and tracks it as pending.
//
// The channel lookup and the pending registration happen under a single
// write lock: the original code looked the channel up under RLock and
// then inserted into pending under a separate Lock, so a concurrent
// CloseChannel could run in the gap, sweep pending before this job was
// registered, and leave an orphaned pending entry pointing at a closed
// channel. With both steps atomic, a later CloseChannel always sees
// (and fails) this entry.
//
// If delivering the event fails (e.g. it is not consumed within
// responseWait), the pending entry is removed and the job is marked
// failed in the store.
func (jm *Manager) SendJob(ctx context.Context, accountID, peerID string, req *proto.JobRequest) error {
	event := &Event{
		PeerID:  peerID,
		Request: req,
	}

	jm.mu.Lock()
	ch, ok := jm.jobChannels[peerID]
	if !ok {
		jm.mu.Unlock()
		return fmt.Errorf("peer %s has no channel", peerID)
	}
	jm.pending[string(req.ID)] = event
	jm.mu.Unlock()

	// AddEvent may block up to responseWait; never hold the lock here.
	if err := ch.AddEvent(ctx, jm.responseWait, event); err != nil {
		jm.cleanup(ctx, accountID, string(req.ID), err.Error())
		return err
	}
	return nil
}
// HandleResponse marks a job as finished and moves it to completed.
//
// It validates that the responding peer (resolved from peerKey) is the
// peer the job was originally sent to, persists the completed job, and
// only then removes it from the in-memory pending set. Errors are
// wrapped with %w so callers can use errors.Is/errors.As on the chain
// (the original used %v, which discarded the underlying error type).
func (jm *Manager) HandleResponse(ctx context.Context, resp *proto.JobResponse, peerKey string) error {
	jm.mu.Lock()
	defer jm.mu.Unlock()

	// todo: validate job ID and would be nice to use uuid text marshal instead of string
	jobID := string(resp.ID)

	// todo: in this map has jobs for all peers in any account. Consider to validate the jobID association for the peer
	event, ok := jm.pending[jobID]
	if !ok {
		return fmt.Errorf("job %s not found", jobID)
	}

	var job types.Job
	// todo: ApplyResponse should be static. Any member value is unusable in this way
	if err := job.ApplyResponse(resp); err != nil {
		return fmt.Errorf("invalid job response: %w", err)
	}

	// Resolve the responding peer and make sure it matches the peer the
	// job was dispatched to.
	peerID, err := jm.peersManager.GetPeerID(ctx, peerKey)
	if err != nil {
		return fmt.Errorf("failed to get peer ID: %w", err)
	}
	if peerID != event.PeerID {
		return fmt.Errorf("peer ID mismatch: %s != %s", peerID, event.PeerID)
	}

	// Persist the completed job before dropping it from pending, so a
	// store failure leaves the job retriable rather than lost.
	if err := jm.Store.CompletePeerJob(ctx, &job); err != nil {
		return fmt.Errorf("failed to complete job %s: %w", jobID, err)
	}

	delete(jm.pending, jobID)
	return nil
}
// CloseChannel closes a peer's channel and cleans up its jobs.
//
// Once the peer disconnects, any job still awaiting its answer can never
// complete, so each one is marked failed in the store and dropped from
// the in-memory pending set.
func (jm *Manager) CloseChannel(ctx context.Context, accountID, peerID string) {
	jm.mu.Lock()
	defer jm.mu.Unlock()

	if ch, found := jm.jobChannels[peerID]; found {
		ch.Close()
		delete(jm.jobChannels, peerID)
	}

	// Deleting from a map while ranging over it is well-defined in Go.
	for id, ev := range jm.pending {
		if ev.PeerID != peerID {
			continue
		}
		// The client disconnected with this job still pending: fail it.
		if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, id, "Time out peer disconnected"); err != nil {
			log.WithContext(ctx).Errorf("failed to mark pending jobs as failed: %v", err)
		}
		delete(jm.pending, id)
	}
}
// cleanup removes a pending job safely: it marks the job failed in the
// store with the given reason (logging, not propagating, any store
// error) and drops it from the in-memory pending map. Unknown job IDs
// are a no-op.
func (jm *Manager) cleanup(ctx context.Context, accountID, jobID string, reason string) {
	jm.mu.Lock()
	defer jm.mu.Unlock()

	ev, tracked := jm.pending[jobID]
	if !tracked {
		return
	}

	if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, ev.PeerID, jobID, reason); err != nil {
		log.WithContext(ctx).Errorf("failed to mark pending jobs as failed: %v", err)
	}
	delete(jm.pending, jobID)
}
// IsPeerConnected reports whether the peer currently has an open job
// channel registered with this manager.
func (jm *Manager) IsPeerConnected(peerID string) bool {
	jm.mu.RLock()
	defer jm.mu.RUnlock()

	_, connected := jm.jobChannels[peerID]
	return connected
}
// IsPeerHasPendingJobs reports whether at least one in-memory pending
// job is addressed to the given peer.
func (jm *Manager) IsPeerHasPendingJobs(peerID string) bool {
	jm.mu.RLock()
	defer jm.mu.RUnlock()

	for _, ev := range jm.pending {
		if ev.PeerID != peerID {
			continue
		}
		return true
	}
	return false
}