From f1cc9e4b8895c48e906ded9d49fb85264ddb1b79 Mon Sep 17 00:00:00 2001 From: Zoltan Papp Date: Tue, 9 Sep 2025 17:12:51 +0200 Subject: [PATCH] Refactor the debug bundle generator to be ready to use from engine (#4469) --- client/internal/connect.go | 2 + client/internal/debug/upload.go | 101 ++++++++++++++++ .../debug/upload_test.go} | 4 +- client/internal/engine.go | 114 ++++++++---------- client/internal/engine_test.go | 52 +++----- client/jobexec/executor.go | 35 ++++++ client/server/debug.go | 96 +-------------- 7 files changed, 206 insertions(+), 198 deletions(-) create mode 100644 client/internal/debug/upload.go rename client/{server/debug_test.go => internal/debug/upload_test.go} (93%) create mode 100644 client/jobexec/executor.go diff --git a/client/internal/connect.go b/client/internal/connect.go index 523dcaf1f..91219638c 100644 --- a/client/internal/connect.go +++ b/client/internal/connect.go @@ -444,6 +444,8 @@ func createEngineConfig(key wgtypes.Key, config *profilemanager.Config, peerConf BlockInbound: config.BlockInbound, LazyConnectionEnabled: config.LazyConnectionEnabled, + + ProfileConfig: config, } if config.PreSharedKey != "" { diff --git a/client/internal/debug/upload.go b/client/internal/debug/upload.go new file mode 100644 index 000000000..cdf52409d --- /dev/null +++ b/client/internal/debug/upload.go @@ -0,0 +1,101 @@ +package debug + +import ( + "context" + "crypto/sha256" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + + "github.com/netbirdio/netbird/upload-server/types" +) + +const maxBundleUploadSize = 50 * 1024 * 1024 + +func UploadDebugBundle(ctx context.Context, url, managementURL, filePath string) (key string, err error) { + response, err := getUploadURL(ctx, url, managementURL) + if err != nil { + return "", err + } + + err = upload(ctx, filePath, response) + if err != nil { + return "", err + } + return response.Key, nil +} + +func upload(ctx context.Context, filePath string, response *types.GetURLResponse) error { + fileData, err := os.Open(filePath) + if err != nil { + return fmt.Errorf("open file: %w", err) + } + + defer fileData.Close() + + stat, err := fileData.Stat() + if err != nil { + return fmt.Errorf("stat file: %w", err) + } + + if stat.Size() > maxBundleUploadSize { + return fmt.Errorf("file size exceeds maximum limit of %d bytes", maxBundleUploadSize) + } + + req, err := http.NewRequestWithContext(ctx, "PUT", response.URL, fileData) + if err != nil { + return fmt.Errorf("create PUT request: %w", err) + } + + req.ContentLength = stat.Size() + req.Header.Set("Content-Type", "application/octet-stream") + + putResp, err := http.DefaultClient.Do(req) + if err != nil { + return fmt.Errorf("upload failed: %v", err) + } + defer putResp.Body.Close() + + if putResp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(putResp.Body) + return fmt.Errorf("upload status %d: %s", putResp.StatusCode, string(body)) + } + return nil +} + +func getUploadURL(ctx context.Context, url string, managementURL string) (*types.GetURLResponse, error) { + id := getURLHash(managementURL) + getReq, err := http.NewRequestWithContext(ctx, "GET", url+"?id="+id, nil) + if err != nil { + return nil, fmt.Errorf("create GET request: %w", err) + } + + getReq.Header.Set(types.ClientHeader, types.ClientHeaderValue) + + resp, err := http.DefaultClient.Do(getReq) + if err != nil { + return nil, fmt.Errorf("get presigned URL: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("get presigned URL status %d: %s", resp.StatusCode, string(body)) + } + + urlBytes, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read response body: %w", err) + } + var response types.GetURLResponse + if err := json.Unmarshal(urlBytes, &response); err != nil { + return nil, fmt.Errorf("unmarshal response: %w", err) + } + return &response, nil +} + +func getURLHash(url string) string { + return fmt.Sprintf("%x", sha256.Sum256([]byte(url))) +} diff --git a/client/server/debug_test.go b/client/internal/debug/upload_test.go similarity index 93% rename from client/server/debug_test.go rename to client/internal/debug/upload_test.go index 53d9ac8ed..e833c196d 100644 --- a/client/server/debug_test.go +++ b/client/internal/debug/upload_test.go @@ -1,4 +1,4 @@ -package server +package debug import ( "context" @@ -38,7 +38,7 @@ func TestUpload(t *testing.T) { fileContent := []byte("test file content") err := os.WriteFile(file, fileContent, 0640) require.NoError(t, err) - key, err := uploadDebugBundle(context.Background(), testURL+types.GetURLPath, testURL, file) + key, err := UploadDebugBundle(context.Background(), testURL+types.GetURLPath, testURL, file) require.NoError(t, err) id := getURLHash(testURL) require.Contains(t, key, id+"/") diff --git a/client/internal/engine.go b/client/internal/engine.go index 144c48c11..8da4fb3ae 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -34,6 +34,7 @@ import ( "github.com/netbirdio/netbird/client/iface/device" nbnetstack "github.com/netbirdio/netbird/client/iface/netstack" "github.com/netbirdio/netbird/client/internal/acl" + "github.com/netbirdio/netbird/client/internal/debug" "github.com/netbirdio/netbird/client/internal/dns" "github.com/netbirdio/netbird/client/internal/dnsfwd" "github.com/netbirdio/netbird/client/internal/ingressgw" @@ -50,12 +51,12 @@ import ( "github.com/netbirdio/netbird/client/internal/routemanager" "github.com/netbirdio/netbird/client/internal/routemanager/systemops" "github.com/netbirdio/netbird/client/internal/statemanager" + "github.com/netbirdio/netbird/client/jobexec" cProto "github.com/netbirdio/netbird/client/proto" "github.com/netbirdio/netbird/shared/management/domain" semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group" nbssh "github.com/netbirdio/netbird/client/ssh" - nbstatus "github.com/netbirdio/netbird/client/status" "github.com/netbirdio/netbird/client/system" nbdns "github.com/netbirdio/netbird/dns" "github.com/netbirdio/netbird/route" @@ -65,9 +66,7 @@ import ( relayClient "github.com/netbirdio/netbird/shared/relay/client" signal "github.com/netbirdio/netbird/shared/signal/client" sProto "github.com/netbirdio/netbird/shared/signal/proto" - "github.com/netbirdio/netbird/upload-server/types" "github.com/netbirdio/netbird/util" - "google.golang.org/grpc/status" ) // PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer. @@ -131,6 +130,9 @@ type EngineConfig struct { LazyConnectionEnabled bool DaemonAddress string + + // for debug bundle generation + ProfileConfig *profilemanager.Config } // Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers. @@ -200,6 +202,8 @@ type Engine struct { latestSyncResponse *mgmProto.SyncResponse connSemaphore *semaphoregroup.SemaphoreGroup flowManager nftypes.FlowManager + + jobExecutor *jobexec.Executor } // Peer is an instance of the Connection Peer @@ -213,17 +217,7 @@ type localIpUpdater interface { } // NewEngine creates a new Connection Engine with probes attached -func NewEngine( - clientCtx context.Context, - clientCancel context.CancelFunc, - signalClient signal.Client, - mgmClient mgm.Client, - relayManager *relayClient.Manager, - config *EngineConfig, - mobileDep MobileDependency, - statusRecorder *peer.Status, - checks []*mgmProto.Checks, -) *Engine { +func NewEngine(clientCtx context.Context, clientCancel context.CancelFunc, signalClient signal.Client, mgmClient mgm.Client, relayManager *relayClient.Manager, config *EngineConfig, mobileDep MobileDependency, statusRecorder *peer.Status, checks []*mgmProto.Checks, c *profilemanager.Config) *Engine { engine := &Engine{ clientCtx: clientCtx, clientCancel: clientCancel, @@ -242,6 +236,7 @@ func NewEngine( statusRecorder: statusRecorder, checks: checks, connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit), + jobExecutor: jobexec.NewExecutor(), } sm := profilemanager.NewServiceManager("") @@ -906,9 +901,9 @@ func (e *Engine) getPeerClient(addr string) (*grpc.ClientConn, error) { return conn, nil } - func (e *Engine) receiveJobEvents() { go func() { + // todo: engine can be restarted any time. We need to handle the case when a job is being processed while the engine is stopping err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse { resp := mgmProto.JobResponse{ ID: msg.ID, @@ -919,6 +914,7 @@ func (e *Engine) receiveJobEvents() { bundleResult, err := e.handleBundle(params.Bundle) if err != nil { resp.Reason = []byte(err.Error()) + resp.Status = mgmProto.JobStatus_failed return &resp } resp.Status = mgmProto.JobStatus_succeeded @@ -941,65 +937,49 @@ func (e *Engine) receiveJobEvents() { } func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (*mgmProto.JobResponse_Bundle, error) { - // todo: implement with real daemon address - conn, err := e.getPeerClient("unix:///var/run/netbird.sock") + // todo: @Vic, do we need to latest sync response all the time here or could it be nil in the debug bundle? + // todo: e.GetLatestSyncResponse() is exported and has been protected by a mutex. If you will use handleBundle in + // also protected context then will be deadlock + syncResponse, err := e.GetLatestSyncResponse() + if err != nil { + return nil, fmt.Errorf("get latest sync response: %w", err) + } + + if syncResponse == nil { + return nil, errors.New("sync response is not available") + } + + var statusOutput string + + // todo: convert fullStatus to statusOutput + // fullStatus := e.statusRecorder.GetFullStatus() + // overview := nbstatus.ConvertToStatusOutputOverview(statusResp, anonymize, "", nil, nil, nil, "", "") + // statusOutput = nbstatus.ParseToFullDetailSummary(overview) + bundleDeps := debug.GeneratorDependencies{ + InternalConfig: e.config.ProfileConfig, + StatusRecorder: e.statusRecorder, + SyncResponse: syncResponse, + LogFile: "", // todo: figure out where come from the log file. I suppose the client who invokes engine creation knows it. + } + + bundleJobParams := debug.BundleConfig{ + Anonymize: params.Anonymize, + ClientStatus: statusOutput, + IncludeSystemInfo: true, + LogFileCount: uint32(params.LogFileCount), + } + + uploadKey, err := e.jobExecutor.BundleJob(e.ctx, bundleDeps, bundleJobParams, e.config.ProfileConfig.ManagementURL.String()) if err != nil { return nil, err } - defer func() { - if err := conn.Close(); err != nil { - log.Errorf("Failed to close connection: %v", err) - } - }() - statusOutput, err := e.getStatusOutput(params.Anonymize) - if err != nil { - return nil, err - } - request := &cProto.DebugBundleRequest{ - Anonymize: params.Anonymize, - SystemInfo: true, - Status: statusOutput, - LogFileCount: uint32(params.LogFileCount), - UploadURL: types.DefaultBundleURL, - } - service := cProto.NewDaemonServiceClient(conn) - resp, err := service.DebugBundle(e.clientCtx, request) - if err != nil { - return nil, fmt.Errorf("failed to bundle debug: " + status.Convert(err).Message()) - } - - if resp.GetUploadFailureReason() != "" { - return nil, fmt.Errorf("upload failed: " + resp.GetUploadFailureReason()) - } - // return resp.GetUploadedKey(), nil - - return &mgmProto.JobResponse_Bundle{ + response := &mgmProto.JobResponse_Bundle{ Bundle: &mgmProto.BundleResult{ - UploadKey: resp.GetUploadedKey(), + UploadKey: uploadKey, }, - }, nil -} - -func (e *Engine) getStatusOutput(anon bool) (string, error) { - // todo: implement with real daemon address - conn, err := e.getPeerClient("unix:///var/run/netbird.sock") - if err != nil { - return "", err } - defer func() { - if err := conn.Close(); err != nil { - log.Errorf("Failed to close connection: %v", err) - } - }() - - statusResp, err := cProto.NewDaemonServiceClient(conn).Status(e.clientCtx, &cProto.StatusRequest{GetFullPeerStatus: true, ShouldRunProbes: true}) - if err != nil { - return "", fmt.Errorf("status failed: %v", status.Convert(err).Message()) - } - return nbstatus.ParseToFullDetailSummary( - nbstatus.ConvertToStatusOutputOverview(statusResp, anon, "", nil, nil, nil, "", ""), - ), nil + return response, nil } // receiveManagementEvents connects to the Management Service event stream to receive updates from the management service diff --git a/client/internal/engine_test.go b/client/internal/engine_test.go index c2c9ae84a..1a179c6ce 100644 --- a/client/internal/engine_test.go +++ b/client/internal/engine_test.go @@ -27,6 +27,7 @@ import ( "golang.zx2c4.com/wireguard/tun/netstack" "github.com/netbirdio/management-integrations/integrations" + "github.com/netbirdio/netbird/management/internals/server/config" "github.com/netbirdio/netbird/management/server/groups" @@ -219,22 +220,13 @@ func TestEngine_SSH(t *testing.T) { defer cancel() relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String()) - engine := NewEngine( - ctx, cancel, - &signal.MockClient{}, - &mgmt.MockClient{}, - relayMgr, - &EngineConfig{ - WgIfaceName: "utun101", - WgAddr: "100.64.0.1/24", - WgPrivateKey: key, - WgPort: 33100, - ServerSSHAllowed: true, - }, - MobileDependency{}, - peer.NewRecorder("https://mgm"), - nil, - ) + engine := NewEngine(ctx, cancel, &signal.MockClient{}, &mgmt.MockClient{}, relayMgr, &EngineConfig{ + WgIfaceName: "utun101", + WgAddr: "100.64.0.1/24", + WgPrivateKey: key, + WgPort: 33100, + ServerSSHAllowed: true, + }, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil) engine.dnsServer = &dns.MockServer{ UpdateDNSServerFunc: func(serial uint64, update nbdns.Config) error { return nil }, @@ -364,20 +356,12 @@ func TestEngine_UpdateNetworkMap(t *testing.T) { defer cancel() relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String()) - engine := NewEngine( - ctx, cancel, - &signal.MockClient{}, - &mgmt.MockClient{}, - relayMgr, - &EngineConfig{ - WgIfaceName: "utun102", - WgAddr: "100.64.0.1/24", - WgPrivateKey: key, - WgPort: 33100, - }, - MobileDependency{}, - peer.NewRecorder("https://mgm"), - nil) + engine := NewEngine(ctx, cancel, &signal.MockClient{}, &mgmt.MockClient{}, relayMgr, &EngineConfig{ + WgIfaceName: "utun102", + WgAddr: "100.64.0.1/24", + WgPrivateKey: key, + WgPort: 33100, + }, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil) wgIface := &MockWGIface{ NameFunc: func() string { return "utun102" }, @@ -595,7 +579,7 @@ func TestEngine_Sync(t *testing.T) { WgAddr: "100.64.0.1/24", WgPrivateKey: key, WgPort: 33100, - }, MobileDependency{}, peer.NewRecorder("https://mgm"), nil) + }, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil) engine.ctx = ctx engine.dnsServer = &dns.MockServer{ @@ -759,7 +743,7 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) { WgAddr: wgAddr, WgPrivateKey: key, WgPort: 33100, - }, MobileDependency{}, peer.NewRecorder("https://mgm"), nil) + }, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil) engine.ctx = ctx newNet, err := stdnet.NewNet() if err != nil { @@ -960,7 +944,7 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) { WgAddr: wgAddr, WgPrivateKey: key, WgPort: 33100, - }, MobileDependency{}, peer.NewRecorder("https://mgm"), nil) + }, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil) engine.ctx = ctx newNet, err := stdnet.NewNet() @@ -1484,7 +1468,7 @@ func createEngine(ctx context.Context, cancel context.CancelFunc, setupKey strin } relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String()) - e, err := NewEngine(ctx, cancel, signalClient, mgmtClient, relayMgr, conf, MobileDependency{}, peer.NewRecorder("https://mgm"), nil), nil + e, err := NewEngine(ctx, cancel, signalClient, mgmtClient, relayMgr, conf, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil), nil e.ctx = ctx return e, err } diff --git a/client/jobexec/executor.go b/client/jobexec/executor.go new file mode 100644 index 000000000..4d64d6fbf --- /dev/null +++ b/client/jobexec/executor.go @@ -0,0 +1,35 @@ +package jobexec + +import ( + "context" + "fmt" + + log "github.com/sirupsen/logrus" + + "github.com/netbirdio/netbird/client/internal/debug" + "github.com/netbirdio/netbird/upload-server/types" +) + +type Executor struct { +} + +func NewExecutor() *Executor { + return &Executor{} +} + +func (e *Executor) BundleJob(ctx context.Context, debugBundleDependencies debug.GeneratorDependencies, params debug.BundleConfig, mgmURL string) (string, error) { + bundleGenerator := debug.NewBundleGenerator(debugBundleDependencies, params) + + path, err := bundleGenerator.Generate() + if err != nil { + return "", fmt.Errorf("generate debug bundle: %w", err) + } + + key, err := debug.UploadDebugBundle(ctx, types.DefaultBundleURL, mgmURL, path) + if err != nil { + log.Errorf("failed to upload debug bundle to %v", err) + return "", fmt.Errorf("upload debug bundle: %w", err) + } + + return key, nil +} diff --git a/client/server/debug.go b/client/server/debug.go index 056d9df21..330b476e6 100644 --- a/client/server/debug.go +++ b/client/server/debug.go @@ -4,24 +4,16 @@ package server import ( "context" - "crypto/sha256" - "encoding/json" "errors" "fmt" - "io" - "net/http" - "os" log "github.com/sirupsen/logrus" "github.com/netbirdio/netbird/client/internal/debug" "github.com/netbirdio/netbird/client/proto" mgmProto "github.com/netbirdio/netbird/shared/management/proto" - "github.com/netbirdio/netbird/upload-server/types" ) -const maxBundleUploadSize = 50 * 1024 * 1024 - // DebugBundle creates a debug bundle and returns the location. func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (resp *proto.DebugBundleResponse, err error) { s.mutex.Lock() @@ -55,7 +47,7 @@ func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) ( if req.GetUploadURL() == "" { return &proto.DebugBundleResponse{Path: path}, nil } - key, err := uploadDebugBundle(context.Background(), req.GetUploadURL(), s.config.ManagementURL.String(), path) + key, err := debug.UploadDebugBundle(context.Background(), req.GetUploadURL(), s.config.ManagementURL.String(), path) if err != nil { log.Errorf("failed to upload debug bundle to %s: %v", req.GetUploadURL(), err) return &proto.DebugBundleResponse{Path: path, UploadFailureReason: err.Error()}, nil @@ -66,92 +58,6 @@ func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) ( return &proto.DebugBundleResponse{Path: path, UploadedKey: key}, nil } -func uploadDebugBundle(ctx context.Context, url, managementURL, filePath string) (key string, err error) { - response, err := getUploadURL(ctx, url, managementURL) - if err != nil { - return "", err - } - - err = upload(ctx, filePath, response) - if err != nil { - return "", err - } - return response.Key, nil -} - -func upload(ctx context.Context, filePath string, response *types.GetURLResponse) error { - fileData, err := os.Open(filePath) - if err != nil { - return fmt.Errorf("open file: %w", err) - } - - defer fileData.Close() - - stat, err := fileData.Stat() - if err != nil { - return fmt.Errorf("stat file: %w", err) - } - - if stat.Size() > maxBundleUploadSize { - return fmt.Errorf("file size exceeds maximum limit of %d bytes", maxBundleUploadSize) - } - - req, err := http.NewRequestWithContext(ctx, "PUT", response.URL, fileData) - if err != nil { - return fmt.Errorf("create PUT request: %w", err) - } - - req.ContentLength = stat.Size() - req.Header.Set("Content-Type", "application/octet-stream") - - putResp, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("upload failed: %v", err) - } - defer putResp.Body.Close() - - if putResp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(putResp.Body) - return fmt.Errorf("upload status %d: %s", putResp.StatusCode, string(body)) - } - return nil -} - -func getUploadURL(ctx context.Context, url string, managementURL string) (*types.GetURLResponse, error) { - id := getURLHash(managementURL) - getReq, err := http.NewRequestWithContext(ctx, "GET", url+"?id="+id, nil) - if err != nil { - return nil, fmt.Errorf("create GET request: %w", err) - } - - getReq.Header.Set(types.ClientHeader, types.ClientHeaderValue) - - resp, err := http.DefaultClient.Do(getReq) - if err != nil { - return nil, fmt.Errorf("get presigned URL: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return nil, fmt.Errorf("get presigned URL status %d: %s", resp.StatusCode, string(body)) - } - - urlBytes, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("read response body: %w", err) - } - var response types.GetURLResponse - if err := json.Unmarshal(urlBytes, &response); err != nil { - return nil, fmt.Errorf("unmarshal response: %w", err) - } - return &response, nil -} - -func getURLHash(url string) string { - return fmt.Sprintf("%x", sha256.Sum256([]byte(url))) -} - // GetLogLevel gets the current logging level for the server. func (s *Server) GetLogLevel(_ context.Context, _ *proto.GetLogLevelRequest) (*proto.GetLogLevelResponse, error) { s.mutex.Lock()