mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 08:16:39 +00:00
[management/client] Integrate Job API with Job Stream and Client Engine (#4428)
* integrate api integrate api with stream and implement some client side * fix typo and fix validation * use real daemon address * redo the connect via address * Refactor the debug bundle generator to be ready to use from engine (#4469) * fix tests * fix lint * fix bug with stream * try refactor status 1 * fix convert fullStatus to statusOutput & add logFile * fix tests * fix tests * fix not enough arguments in call to nbstatus.ConvertToStatusOutputOverview * fix status_test * fix(engine): avoid deadlock when stopping engine during debug bundle * use atomic for lock-free * use new lock --------- Co-authored-by: Zoltan Papp <zoltan.pmail@gmail.com>
This commit is contained in:
@@ -32,6 +32,7 @@ import (
|
||||
"github.com/netbirdio/netbird/client/iface/device"
|
||||
nbnetstack "github.com/netbirdio/netbird/client/iface/netstack"
|
||||
"github.com/netbirdio/netbird/client/internal/acl"
|
||||
"github.com/netbirdio/netbird/client/internal/debug"
|
||||
"github.com/netbirdio/netbird/client/internal/dns"
|
||||
"github.com/netbirdio/netbird/client/internal/dnsfwd"
|
||||
"github.com/netbirdio/netbird/client/internal/ingressgw"
|
||||
@@ -48,11 +49,13 @@ import (
|
||||
"github.com/netbirdio/netbird/client/internal/routemanager"
|
||||
"github.com/netbirdio/netbird/client/internal/routemanager/systemops"
|
||||
"github.com/netbirdio/netbird/client/internal/statemanager"
|
||||
"github.com/netbirdio/netbird/client/jobexec"
|
||||
cProto "github.com/netbirdio/netbird/client/proto"
|
||||
"github.com/netbirdio/netbird/shared/management/domain"
|
||||
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
|
||||
|
||||
nbssh "github.com/netbirdio/netbird/client/ssh"
|
||||
nbstatus "github.com/netbirdio/netbird/client/status"
|
||||
"github.com/netbirdio/netbird/client/system"
|
||||
nbdns "github.com/netbirdio/netbird/dns"
|
||||
"github.com/netbirdio/netbird/route"
|
||||
@@ -63,6 +66,7 @@ import (
|
||||
signal "github.com/netbirdio/netbird/shared/signal/client"
|
||||
sProto "github.com/netbirdio/netbird/shared/signal/proto"
|
||||
"github.com/netbirdio/netbird/util"
|
||||
"github.com/netbirdio/netbird/version"
|
||||
)
|
||||
|
||||
// PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer.
|
||||
@@ -125,6 +129,11 @@ type EngineConfig struct {
|
||||
BlockInbound bool
|
||||
|
||||
LazyConnectionEnabled bool
|
||||
|
||||
// for debug bundle generation
|
||||
ProfileConfig *profilemanager.Config
|
||||
|
||||
LogFile string
|
||||
}
|
||||
|
||||
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
|
||||
@@ -189,11 +198,15 @@ type Engine struct {
|
||||
stateManager *statemanager.Manager
|
||||
srWatcher *guard.SRWatcher
|
||||
|
||||
// Sync response persistence
|
||||
// Sync response persistence (protected by syncRespMux)
|
||||
syncRespMux sync.RWMutex
|
||||
persistSyncResponse bool
|
||||
latestSyncResponse *mgmProto.SyncResponse
|
||||
connSemaphore *semaphoregroup.SemaphoreGroup
|
||||
flowManager nftypes.FlowManager
|
||||
|
||||
jobExecutor *jobexec.Executor
|
||||
jobExecutorWG sync.WaitGroup
|
||||
}
|
||||
|
||||
// Peer is an instance of the Connection Peer
|
||||
@@ -207,17 +220,7 @@ type localIpUpdater interface {
|
||||
}
|
||||
|
||||
// NewEngine creates a new Connection Engine with probes attached
|
||||
func NewEngine(
|
||||
clientCtx context.Context,
|
||||
clientCancel context.CancelFunc,
|
||||
signalClient signal.Client,
|
||||
mgmClient mgm.Client,
|
||||
relayManager *relayClient.Manager,
|
||||
config *EngineConfig,
|
||||
mobileDep MobileDependency,
|
||||
statusRecorder *peer.Status,
|
||||
checks []*mgmProto.Checks,
|
||||
) *Engine {
|
||||
func NewEngine(clientCtx context.Context, clientCancel context.CancelFunc, signalClient signal.Client, mgmClient mgm.Client, relayManager *relayClient.Manager, config *EngineConfig, mobileDep MobileDependency, statusRecorder *peer.Status, checks []*mgmProto.Checks, c *profilemanager.Config) *Engine {
|
||||
engine := &Engine{
|
||||
clientCtx: clientCtx,
|
||||
clientCancel: clientCancel,
|
||||
@@ -236,6 +239,7 @@ func NewEngine(
|
||||
statusRecorder: statusRecorder,
|
||||
checks: checks,
|
||||
connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit),
|
||||
jobExecutor: jobexec.NewExecutor(),
|
||||
}
|
||||
|
||||
sm := profilemanager.NewServiceManager("")
|
||||
@@ -314,6 +318,8 @@ func (e *Engine) Stop() error {
|
||||
e.cancel()
|
||||
}
|
||||
|
||||
e.jobExecutorWG.Wait() // block until job goroutines finish
|
||||
|
||||
// very ugly but we want to remove peers from the WireGuard interface first before removing interface.
|
||||
// Removing peers happens in the conn.Close() asynchronously
|
||||
time.Sleep(500 * time.Millisecond)
|
||||
@@ -699,9 +705,18 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Persist sync response under the dedicated lock (syncRespMux), not under syncMsgMux.
|
||||
// Read the storage-enabled flag under the syncRespMux too.
|
||||
e.syncRespMux.RLock()
|
||||
enabled := e.persistSyncResponse
|
||||
e.syncRespMux.RUnlock()
|
||||
|
||||
// Store sync response if persistence is enabled
|
||||
if e.persistSyncResponse {
|
||||
if enabled {
|
||||
e.syncRespMux.Lock()
|
||||
e.latestSyncResponse = update
|
||||
e.syncRespMux.Unlock()
|
||||
|
||||
log.Debugf("sync response persisted with serial %d", nm.GetSerial())
|
||||
}
|
||||
|
||||
@@ -886,20 +901,27 @@ func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Engine) receiveJobEvents() {
|
||||
e.jobExecutorWG.Add(1)
|
||||
go func() {
|
||||
defer e.jobExecutorWG.Done()
|
||||
err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse {
|
||||
// Simple test handler — replace with real logic
|
||||
log.Infof("Received job request: %+v", msg)
|
||||
// TODO: trigger local debug bundle or other job
|
||||
return &mgmProto.JobResponse{
|
||||
ID: msg.ID,
|
||||
WorkloadResults: &mgmProto.JobResponse_Bundle{
|
||||
Bundle: &mgmProto.BundleResult{
|
||||
UploadKey: "upload-key",
|
||||
},
|
||||
},
|
||||
resp := mgmProto.JobResponse{
|
||||
ID: msg.ID,
|
||||
Status: mgmProto.JobStatus_failed,
|
||||
}
|
||||
switch params := msg.WorkloadParameters.(type) {
|
||||
case *mgmProto.JobRequest_Bundle:
|
||||
bundleResult, err := e.handleBundle(params.Bundle)
|
||||
if err != nil {
|
||||
resp.Reason = []byte(err.Error())
|
||||
return &resp
|
||||
}
|
||||
resp.Status = mgmProto.JobStatus_succeeded
|
||||
resp.WorkloadResults = bundleResult
|
||||
return &resp
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
@@ -914,6 +936,49 @@ func (e *Engine) receiveJobEvents() {
|
||||
log.Debugf("connecting to Management Service jobs stream")
|
||||
}
|
||||
|
||||
func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (*mgmProto.JobResponse_Bundle, error) {
|
||||
syncResponse, err := e.GetLatestSyncResponse()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get latest sync response: %w", err)
|
||||
}
|
||||
|
||||
if syncResponse == nil {
|
||||
return nil, errors.New("sync response is not available")
|
||||
}
|
||||
|
||||
// convert fullStatus to statusOutput
|
||||
fullStatus := e.statusRecorder.GetFullStatus()
|
||||
protoFullStatus := nbstatus.ToProtoFullStatus(fullStatus)
|
||||
overview := nbstatus.ConvertToStatusOutputOverview(protoFullStatus, params.Anonymize, version.NetbirdVersion(), "", nil, nil, nil, "", "")
|
||||
statusOutput := nbstatus.ParseToFullDetailSummary(overview)
|
||||
|
||||
bundleDeps := debug.GeneratorDependencies{
|
||||
InternalConfig: e.config.ProfileConfig,
|
||||
StatusRecorder: e.statusRecorder,
|
||||
SyncResponse: syncResponse,
|
||||
LogFile: e.config.LogFile,
|
||||
}
|
||||
|
||||
bundleJobParams := debug.BundleConfig{
|
||||
Anonymize: params.Anonymize,
|
||||
ClientStatus: statusOutput,
|
||||
IncludeSystemInfo: true,
|
||||
LogFileCount: uint32(params.LogFileCount),
|
||||
}
|
||||
|
||||
uploadKey, err := e.jobExecutor.BundleJob(e.ctx, bundleDeps, bundleJobParams, e.config.ProfileConfig.ManagementURL.String())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
response := &mgmProto.JobResponse_Bundle{
|
||||
Bundle: &mgmProto.BundleResult{
|
||||
UploadKey: uploadKey,
|
||||
},
|
||||
}
|
||||
return response, nil
|
||||
}
|
||||
|
||||
// receiveManagementEvents connects to the Management Service event stream to receive updates from the management service
|
||||
// E.g. when a new peer has been registered and we are allowed to connect to it.
|
||||
func (e *Engine) receiveManagementEvents() {
|
||||
@@ -1761,8 +1826,8 @@ func (e *Engine) stopDNSServer() {
|
||||
|
||||
// SetSyncResponsePersistence enables or disables sync response persistence
|
||||
func (e *Engine) SetSyncResponsePersistence(enabled bool) {
|
||||
e.syncMsgMux.Lock()
|
||||
defer e.syncMsgMux.Unlock()
|
||||
e.syncRespMux.Lock()
|
||||
defer e.syncRespMux.Unlock()
|
||||
|
||||
if enabled == e.persistSyncResponse {
|
||||
return
|
||||
@@ -1777,20 +1842,22 @@ func (e *Engine) SetSyncResponsePersistence(enabled bool) {
|
||||
|
||||
// GetLatestSyncResponse returns the stored sync response if persistence is enabled
|
||||
func (e *Engine) GetLatestSyncResponse() (*mgmProto.SyncResponse, error) {
|
||||
e.syncMsgMux.Lock()
|
||||
defer e.syncMsgMux.Unlock()
|
||||
e.syncRespMux.RLock()
|
||||
enabled := e.persistSyncResponse
|
||||
latest := e.latestSyncResponse
|
||||
e.syncRespMux.RUnlock()
|
||||
|
||||
if !e.persistSyncResponse {
|
||||
if !enabled {
|
||||
return nil, errors.New("sync response persistence is disabled")
|
||||
}
|
||||
|
||||
if e.latestSyncResponse == nil {
|
||||
if latest == nil {
|
||||
//nolint:nilnil
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
log.Debugf("Retrieving latest sync response with size %d bytes", proto.Size(e.latestSyncResponse))
|
||||
sr, ok := proto.Clone(e.latestSyncResponse).(*mgmProto.SyncResponse)
|
||||
log.Debugf("Retrieving latest sync response with size %d bytes", proto.Size(latest))
|
||||
sr, ok := proto.Clone(latest).(*mgmProto.SyncResponse)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("failed to clone sync response")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user