mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-21 17:56:39 +00:00
Refactor the debug bundle generator to be ready to use from engine (#4469)
This commit is contained in:
@@ -444,6 +444,8 @@ func createEngineConfig(key wgtypes.Key, config *profilemanager.Config, peerConf
|
|||||||
BlockInbound: config.BlockInbound,
|
BlockInbound: config.BlockInbound,
|
||||||
|
|
||||||
LazyConnectionEnabled: config.LazyConnectionEnabled,
|
LazyConnectionEnabled: config.LazyConnectionEnabled,
|
||||||
|
|
||||||
|
ProfileConfig: config,
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.PreSharedKey != "" {
|
if config.PreSharedKey != "" {
|
||||||
|
|||||||
101
client/internal/debug/upload.go
Normal file
101
client/internal/debug/upload.go
Normal file
@@ -0,0 +1,101 @@
|
|||||||
|
package debug
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/upload-server/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
const maxBundleUploadSize = 50 * 1024 * 1024
|
||||||
|
|
||||||
|
func UploadDebugBundle(ctx context.Context, url, managementURL, filePath string) (key string, err error) {
|
||||||
|
response, err := getUploadURL(ctx, url, managementURL)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = upload(ctx, filePath, response)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return response.Key, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func upload(ctx context.Context, filePath string, response *types.GetURLResponse) error {
|
||||||
|
fileData, err := os.Open(filePath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("open file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer fileData.Close()
|
||||||
|
|
||||||
|
stat, err := fileData.Stat()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("stat file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if stat.Size() > maxBundleUploadSize {
|
||||||
|
return fmt.Errorf("file size exceeds maximum limit of %d bytes", maxBundleUploadSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequestWithContext(ctx, "PUT", response.URL, fileData)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("create PUT request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req.ContentLength = stat.Size()
|
||||||
|
req.Header.Set("Content-Type", "application/octet-stream")
|
||||||
|
|
||||||
|
putResp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("upload failed: %v", err)
|
||||||
|
}
|
||||||
|
defer putResp.Body.Close()
|
||||||
|
|
||||||
|
if putResp.StatusCode != http.StatusOK {
|
||||||
|
body, _ := io.ReadAll(putResp.Body)
|
||||||
|
return fmt.Errorf("upload status %d: %s", putResp.StatusCode, string(body))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getUploadURL(ctx context.Context, url string, managementURL string) (*types.GetURLResponse, error) {
|
||||||
|
id := getURLHash(managementURL)
|
||||||
|
getReq, err := http.NewRequestWithContext(ctx, "GET", url+"?id="+id, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("create GET request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
getReq.Header.Set(types.ClientHeader, types.ClientHeaderValue)
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(getReq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("get presigned URL: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
return nil, fmt.Errorf("get presigned URL status %d: %s", resp.StatusCode, string(body))
|
||||||
|
}
|
||||||
|
|
||||||
|
urlBytes, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("read response body: %w", err)
|
||||||
|
}
|
||||||
|
var response types.GetURLResponse
|
||||||
|
if err := json.Unmarshal(urlBytes, &response); err != nil {
|
||||||
|
return nil, fmt.Errorf("unmarshal response: %w", err)
|
||||||
|
}
|
||||||
|
return &response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getURLHash(url string) string {
|
||||||
|
return fmt.Sprintf("%x", sha256.Sum256([]byte(url)))
|
||||||
|
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package server
|
package debug
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@@ -38,7 +38,7 @@ func TestUpload(t *testing.T) {
|
|||||||
fileContent := []byte("test file content")
|
fileContent := []byte("test file content")
|
||||||
err := os.WriteFile(file, fileContent, 0640)
|
err := os.WriteFile(file, fileContent, 0640)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
key, err := uploadDebugBundle(context.Background(), testURL+types.GetURLPath, testURL, file)
|
key, err := UploadDebugBundle(context.Background(), testURL+types.GetURLPath, testURL, file)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
id := getURLHash(testURL)
|
id := getURLHash(testURL)
|
||||||
require.Contains(t, key, id+"/")
|
require.Contains(t, key, id+"/")
|
||||||
@@ -34,6 +34,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/client/iface/device"
|
"github.com/netbirdio/netbird/client/iface/device"
|
||||||
nbnetstack "github.com/netbirdio/netbird/client/iface/netstack"
|
nbnetstack "github.com/netbirdio/netbird/client/iface/netstack"
|
||||||
"github.com/netbirdio/netbird/client/internal/acl"
|
"github.com/netbirdio/netbird/client/internal/acl"
|
||||||
|
"github.com/netbirdio/netbird/client/internal/debug"
|
||||||
"github.com/netbirdio/netbird/client/internal/dns"
|
"github.com/netbirdio/netbird/client/internal/dns"
|
||||||
"github.com/netbirdio/netbird/client/internal/dnsfwd"
|
"github.com/netbirdio/netbird/client/internal/dnsfwd"
|
||||||
"github.com/netbirdio/netbird/client/internal/ingressgw"
|
"github.com/netbirdio/netbird/client/internal/ingressgw"
|
||||||
@@ -50,12 +51,12 @@ import (
|
|||||||
"github.com/netbirdio/netbird/client/internal/routemanager"
|
"github.com/netbirdio/netbird/client/internal/routemanager"
|
||||||
"github.com/netbirdio/netbird/client/internal/routemanager/systemops"
|
"github.com/netbirdio/netbird/client/internal/routemanager/systemops"
|
||||||
"github.com/netbirdio/netbird/client/internal/statemanager"
|
"github.com/netbirdio/netbird/client/internal/statemanager"
|
||||||
|
"github.com/netbirdio/netbird/client/jobexec"
|
||||||
cProto "github.com/netbirdio/netbird/client/proto"
|
cProto "github.com/netbirdio/netbird/client/proto"
|
||||||
"github.com/netbirdio/netbird/shared/management/domain"
|
"github.com/netbirdio/netbird/shared/management/domain"
|
||||||
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
|
semaphoregroup "github.com/netbirdio/netbird/util/semaphore-group"
|
||||||
|
|
||||||
nbssh "github.com/netbirdio/netbird/client/ssh"
|
nbssh "github.com/netbirdio/netbird/client/ssh"
|
||||||
nbstatus "github.com/netbirdio/netbird/client/status"
|
|
||||||
"github.com/netbirdio/netbird/client/system"
|
"github.com/netbirdio/netbird/client/system"
|
||||||
nbdns "github.com/netbirdio/netbird/dns"
|
nbdns "github.com/netbirdio/netbird/dns"
|
||||||
"github.com/netbirdio/netbird/route"
|
"github.com/netbirdio/netbird/route"
|
||||||
@@ -65,9 +66,7 @@ import (
|
|||||||
relayClient "github.com/netbirdio/netbird/shared/relay/client"
|
relayClient "github.com/netbirdio/netbird/shared/relay/client"
|
||||||
signal "github.com/netbirdio/netbird/shared/signal/client"
|
signal "github.com/netbirdio/netbird/shared/signal/client"
|
||||||
sProto "github.com/netbirdio/netbird/shared/signal/proto"
|
sProto "github.com/netbirdio/netbird/shared/signal/proto"
|
||||||
"github.com/netbirdio/netbird/upload-server/types"
|
|
||||||
"github.com/netbirdio/netbird/util"
|
"github.com/netbirdio/netbird/util"
|
||||||
"google.golang.org/grpc/status"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer.
|
// PeerConnectionTimeoutMax is a timeout of an initial connection attempt to a remote peer.
|
||||||
@@ -131,6 +130,9 @@ type EngineConfig struct {
|
|||||||
|
|
||||||
LazyConnectionEnabled bool
|
LazyConnectionEnabled bool
|
||||||
DaemonAddress string
|
DaemonAddress string
|
||||||
|
|
||||||
|
// for debug bundle generation
|
||||||
|
ProfileConfig *profilemanager.Config
|
||||||
}
|
}
|
||||||
|
|
||||||
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
|
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
|
||||||
@@ -200,6 +202,8 @@ type Engine struct {
|
|||||||
latestSyncResponse *mgmProto.SyncResponse
|
latestSyncResponse *mgmProto.SyncResponse
|
||||||
connSemaphore *semaphoregroup.SemaphoreGroup
|
connSemaphore *semaphoregroup.SemaphoreGroup
|
||||||
flowManager nftypes.FlowManager
|
flowManager nftypes.FlowManager
|
||||||
|
|
||||||
|
jobExecutor *jobexec.Executor
|
||||||
}
|
}
|
||||||
|
|
||||||
// Peer is an instance of the Connection Peer
|
// Peer is an instance of the Connection Peer
|
||||||
@@ -213,17 +217,7 @@ type localIpUpdater interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewEngine creates a new Connection Engine with probes attached
|
// NewEngine creates a new Connection Engine with probes attached
|
||||||
func NewEngine(
|
func NewEngine(clientCtx context.Context, clientCancel context.CancelFunc, signalClient signal.Client, mgmClient mgm.Client, relayManager *relayClient.Manager, config *EngineConfig, mobileDep MobileDependency, statusRecorder *peer.Status, checks []*mgmProto.Checks, c *profilemanager.Config) *Engine {
|
||||||
clientCtx context.Context,
|
|
||||||
clientCancel context.CancelFunc,
|
|
||||||
signalClient signal.Client,
|
|
||||||
mgmClient mgm.Client,
|
|
||||||
relayManager *relayClient.Manager,
|
|
||||||
config *EngineConfig,
|
|
||||||
mobileDep MobileDependency,
|
|
||||||
statusRecorder *peer.Status,
|
|
||||||
checks []*mgmProto.Checks,
|
|
||||||
) *Engine {
|
|
||||||
engine := &Engine{
|
engine := &Engine{
|
||||||
clientCtx: clientCtx,
|
clientCtx: clientCtx,
|
||||||
clientCancel: clientCancel,
|
clientCancel: clientCancel,
|
||||||
@@ -242,6 +236,7 @@ func NewEngine(
|
|||||||
statusRecorder: statusRecorder,
|
statusRecorder: statusRecorder,
|
||||||
checks: checks,
|
checks: checks,
|
||||||
connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit),
|
connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit),
|
||||||
|
jobExecutor: jobexec.NewExecutor(),
|
||||||
}
|
}
|
||||||
|
|
||||||
sm := profilemanager.NewServiceManager("")
|
sm := profilemanager.NewServiceManager("")
|
||||||
@@ -906,9 +901,9 @@ func (e *Engine) getPeerClient(addr string) (*grpc.ClientConn, error) {
|
|||||||
|
|
||||||
return conn, nil
|
return conn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) receiveJobEvents() {
|
func (e *Engine) receiveJobEvents() {
|
||||||
go func() {
|
go func() {
|
||||||
|
// todo: engine can be restarted any time. We need to handle the case when a job is being processed while the engine is stopping
|
||||||
err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse {
|
err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse {
|
||||||
resp := mgmProto.JobResponse{
|
resp := mgmProto.JobResponse{
|
||||||
ID: msg.ID,
|
ID: msg.ID,
|
||||||
@@ -919,6 +914,7 @@ func (e *Engine) receiveJobEvents() {
|
|||||||
bundleResult, err := e.handleBundle(params.Bundle)
|
bundleResult, err := e.handleBundle(params.Bundle)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
resp.Reason = []byte(err.Error())
|
resp.Reason = []byte(err.Error())
|
||||||
|
resp.Status = mgmProto.JobStatus_failed
|
||||||
return &resp
|
return &resp
|
||||||
}
|
}
|
||||||
resp.Status = mgmProto.JobStatus_succeeded
|
resp.Status = mgmProto.JobStatus_succeeded
|
||||||
@@ -941,65 +937,49 @@ func (e *Engine) receiveJobEvents() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (*mgmProto.JobResponse_Bundle, error) {
|
func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (*mgmProto.JobResponse_Bundle, error) {
|
||||||
// todo: implement with real daemon address
|
// todo: @Vic, do we need to latest sync response all the time here or could it be nil in the debug bundle?
|
||||||
conn, err := e.getPeerClient("unix:///var/run/netbird.sock")
|
// todo: e.GetLatestSyncResponse() is exported and has been protected by a mutex. If you will use handleBundle in
|
||||||
|
// also protected context then will be deadlock
|
||||||
|
syncResponse, err := e.GetLatestSyncResponse()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("get latest sync response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if syncResponse == nil {
|
||||||
|
return nil, errors.New("sync response is not available")
|
||||||
|
}
|
||||||
|
|
||||||
|
var statusOutput string
|
||||||
|
|
||||||
|
// todo: convert fullStatus to statusOutput
|
||||||
|
// fullStatus := e.statusRecorder.GetFullStatus()
|
||||||
|
// overview := nbstatus.ConvertToStatusOutputOverview(statusResp, anonymize, "", nil, nil, nil, "", "")
|
||||||
|
// statusOutput = nbstatus.ParseToFullDetailSummary(overview)
|
||||||
|
bundleDeps := debug.GeneratorDependencies{
|
||||||
|
InternalConfig: e.config.ProfileConfig,
|
||||||
|
StatusRecorder: e.statusRecorder,
|
||||||
|
SyncResponse: syncResponse,
|
||||||
|
LogFile: "", // todo: figure out where come from the log file. I suppose the client who invokes engine creation knows it.
|
||||||
|
}
|
||||||
|
|
||||||
|
bundleJobParams := debug.BundleConfig{
|
||||||
|
Anonymize: params.Anonymize,
|
||||||
|
ClientStatus: statusOutput,
|
||||||
|
IncludeSystemInfo: true,
|
||||||
|
LogFileCount: uint32(params.LogFileCount),
|
||||||
|
}
|
||||||
|
|
||||||
|
uploadKey, err := e.jobExecutor.BundleJob(e.ctx, bundleDeps, bundleJobParams, e.config.ProfileConfig.ManagementURL.String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer func() {
|
|
||||||
if err := conn.Close(); err != nil {
|
|
||||||
log.Errorf("Failed to close connection: %v", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
statusOutput, err := e.getStatusOutput(params.Anonymize)
|
response := &mgmProto.JobResponse_Bundle{
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
request := &cProto.DebugBundleRequest{
|
|
||||||
Anonymize: params.Anonymize,
|
|
||||||
SystemInfo: true,
|
|
||||||
Status: statusOutput,
|
|
||||||
LogFileCount: uint32(params.LogFileCount),
|
|
||||||
UploadURL: types.DefaultBundleURL,
|
|
||||||
}
|
|
||||||
service := cProto.NewDaemonServiceClient(conn)
|
|
||||||
resp, err := service.DebugBundle(e.clientCtx, request)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("failed to bundle debug: " + status.Convert(err).Message())
|
|
||||||
}
|
|
||||||
|
|
||||||
if resp.GetUploadFailureReason() != "" {
|
|
||||||
return nil, fmt.Errorf("upload failed: " + resp.GetUploadFailureReason())
|
|
||||||
}
|
|
||||||
// return resp.GetUploadedKey(), nil
|
|
||||||
|
|
||||||
return &mgmProto.JobResponse_Bundle{
|
|
||||||
Bundle: &mgmProto.BundleResult{
|
Bundle: &mgmProto.BundleResult{
|
||||||
UploadKey: resp.GetUploadedKey(),
|
UploadKey: uploadKey,
|
||||||
},
|
},
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e *Engine) getStatusOutput(anon bool) (string, error) {
|
|
||||||
// todo: implement with real daemon address
|
|
||||||
conn, err := e.getPeerClient("unix:///var/run/netbird.sock")
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
}
|
||||||
defer func() {
|
return response, nil
|
||||||
if err := conn.Close(); err != nil {
|
|
||||||
log.Errorf("Failed to close connection: %v", err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
statusResp, err := cProto.NewDaemonServiceClient(conn).Status(e.clientCtx, &cProto.StatusRequest{GetFullPeerStatus: true, ShouldRunProbes: true})
|
|
||||||
if err != nil {
|
|
||||||
return "", fmt.Errorf("status failed: %v", status.Convert(err).Message())
|
|
||||||
}
|
|
||||||
return nbstatus.ParseToFullDetailSummary(
|
|
||||||
nbstatus.ConvertToStatusOutputOverview(statusResp, anon, "", nil, nil, nil, "", ""),
|
|
||||||
), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// receiveManagementEvents connects to the Management Service event stream to receive updates from the management service
|
// receiveManagementEvents connects to the Management Service event stream to receive updates from the management service
|
||||||
|
|||||||
@@ -27,6 +27,7 @@ import (
|
|||||||
"golang.zx2c4.com/wireguard/tun/netstack"
|
"golang.zx2c4.com/wireguard/tun/netstack"
|
||||||
|
|
||||||
"github.com/netbirdio/management-integrations/integrations"
|
"github.com/netbirdio/management-integrations/integrations"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/management/internals/server/config"
|
"github.com/netbirdio/netbird/management/internals/server/config"
|
||||||
"github.com/netbirdio/netbird/management/server/groups"
|
"github.com/netbirdio/netbird/management/server/groups"
|
||||||
|
|
||||||
@@ -219,22 +220,13 @@ func TestEngine_SSH(t *testing.T) {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String())
|
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String())
|
||||||
engine := NewEngine(
|
engine := NewEngine(ctx, cancel, &signal.MockClient{}, &mgmt.MockClient{}, relayMgr, &EngineConfig{
|
||||||
ctx, cancel,
|
WgIfaceName: "utun101",
|
||||||
&signal.MockClient{},
|
WgAddr: "100.64.0.1/24",
|
||||||
&mgmt.MockClient{},
|
WgPrivateKey: key,
|
||||||
relayMgr,
|
WgPort: 33100,
|
||||||
&EngineConfig{
|
ServerSSHAllowed: true,
|
||||||
WgIfaceName: "utun101",
|
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil)
|
||||||
WgAddr: "100.64.0.1/24",
|
|
||||||
WgPrivateKey: key,
|
|
||||||
WgPort: 33100,
|
|
||||||
ServerSSHAllowed: true,
|
|
||||||
},
|
|
||||||
MobileDependency{},
|
|
||||||
peer.NewRecorder("https://mgm"),
|
|
||||||
nil,
|
|
||||||
)
|
|
||||||
|
|
||||||
engine.dnsServer = &dns.MockServer{
|
engine.dnsServer = &dns.MockServer{
|
||||||
UpdateDNSServerFunc: func(serial uint64, update nbdns.Config) error { return nil },
|
UpdateDNSServerFunc: func(serial uint64, update nbdns.Config) error { return nil },
|
||||||
@@ -364,20 +356,12 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String())
|
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String())
|
||||||
engine := NewEngine(
|
engine := NewEngine(ctx, cancel, &signal.MockClient{}, &mgmt.MockClient{}, relayMgr, &EngineConfig{
|
||||||
ctx, cancel,
|
WgIfaceName: "utun102",
|
||||||
&signal.MockClient{},
|
WgAddr: "100.64.0.1/24",
|
||||||
&mgmt.MockClient{},
|
WgPrivateKey: key,
|
||||||
relayMgr,
|
WgPort: 33100,
|
||||||
&EngineConfig{
|
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil)
|
||||||
WgIfaceName: "utun102",
|
|
||||||
WgAddr: "100.64.0.1/24",
|
|
||||||
WgPrivateKey: key,
|
|
||||||
WgPort: 33100,
|
|
||||||
},
|
|
||||||
MobileDependency{},
|
|
||||||
peer.NewRecorder("https://mgm"),
|
|
||||||
nil)
|
|
||||||
|
|
||||||
wgIface := &MockWGIface{
|
wgIface := &MockWGIface{
|
||||||
NameFunc: func() string { return "utun102" },
|
NameFunc: func() string { return "utun102" },
|
||||||
@@ -595,7 +579,7 @@ func TestEngine_Sync(t *testing.T) {
|
|||||||
WgAddr: "100.64.0.1/24",
|
WgAddr: "100.64.0.1/24",
|
||||||
WgPrivateKey: key,
|
WgPrivateKey: key,
|
||||||
WgPort: 33100,
|
WgPort: 33100,
|
||||||
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil)
|
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil)
|
||||||
engine.ctx = ctx
|
engine.ctx = ctx
|
||||||
|
|
||||||
engine.dnsServer = &dns.MockServer{
|
engine.dnsServer = &dns.MockServer{
|
||||||
@@ -759,7 +743,7 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) {
|
|||||||
WgAddr: wgAddr,
|
WgAddr: wgAddr,
|
||||||
WgPrivateKey: key,
|
WgPrivateKey: key,
|
||||||
WgPort: 33100,
|
WgPort: 33100,
|
||||||
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil)
|
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil)
|
||||||
engine.ctx = ctx
|
engine.ctx = ctx
|
||||||
newNet, err := stdnet.NewNet()
|
newNet, err := stdnet.NewNet()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -960,7 +944,7 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) {
|
|||||||
WgAddr: wgAddr,
|
WgAddr: wgAddr,
|
||||||
WgPrivateKey: key,
|
WgPrivateKey: key,
|
||||||
WgPort: 33100,
|
WgPort: 33100,
|
||||||
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil)
|
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil)
|
||||||
engine.ctx = ctx
|
engine.ctx = ctx
|
||||||
|
|
||||||
newNet, err := stdnet.NewNet()
|
newNet, err := stdnet.NewNet()
|
||||||
@@ -1484,7 +1468,7 @@ func createEngine(ctx context.Context, cancel context.CancelFunc, setupKey strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String())
|
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String())
|
||||||
e, err := NewEngine(ctx, cancel, signalClient, mgmtClient, relayMgr, conf, MobileDependency{}, peer.NewRecorder("https://mgm"), nil), nil
|
e, err := NewEngine(ctx, cancel, signalClient, mgmtClient, relayMgr, conf, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil), nil
|
||||||
e.ctx = ctx
|
e.ctx = ctx
|
||||||
return e, err
|
return e, err
|
||||||
}
|
}
|
||||||
|
|||||||
35
client/jobexec/executor.go
Normal file
35
client/jobexec/executor.go
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
package jobexec
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/client/internal/debug"
|
||||||
|
"github.com/netbirdio/netbird/upload-server/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Executor struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewExecutor() *Executor {
|
||||||
|
return &Executor{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Executor) BundleJob(ctx context.Context, debugBundleDependencies debug.GeneratorDependencies, params debug.BundleConfig, mgmURL string) (string, error) {
|
||||||
|
bundleGenerator := debug.NewBundleGenerator(debugBundleDependencies, params)
|
||||||
|
|
||||||
|
path, err := bundleGenerator.Generate()
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("generate debug bundle: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
key, err := debug.UploadDebugBundle(ctx, types.DefaultBundleURL, mgmURL, path)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to upload debug bundle to %v", err)
|
||||||
|
return "", fmt.Errorf("upload debug bundle: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return key, nil
|
||||||
|
}
|
||||||
@@ -4,24 +4,16 @@ package server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/sha256"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/client/internal/debug"
|
"github.com/netbirdio/netbird/client/internal/debug"
|
||||||
"github.com/netbirdio/netbird/client/proto"
|
"github.com/netbirdio/netbird/client/proto"
|
||||||
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
|
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
|
||||||
"github.com/netbirdio/netbird/upload-server/types"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const maxBundleUploadSize = 50 * 1024 * 1024
|
|
||||||
|
|
||||||
// DebugBundle creates a debug bundle and returns the location.
|
// DebugBundle creates a debug bundle and returns the location.
|
||||||
func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (resp *proto.DebugBundleResponse, err error) {
|
func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (resp *proto.DebugBundleResponse, err error) {
|
||||||
s.mutex.Lock()
|
s.mutex.Lock()
|
||||||
@@ -55,7 +47,7 @@ func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (
|
|||||||
if req.GetUploadURL() == "" {
|
if req.GetUploadURL() == "" {
|
||||||
return &proto.DebugBundleResponse{Path: path}, nil
|
return &proto.DebugBundleResponse{Path: path}, nil
|
||||||
}
|
}
|
||||||
key, err := uploadDebugBundle(context.Background(), req.GetUploadURL(), s.config.ManagementURL.String(), path)
|
key, err := debug.UploadDebugBundle(context.Background(), req.GetUploadURL(), s.config.ManagementURL.String(), path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to upload debug bundle to %s: %v", req.GetUploadURL(), err)
|
log.Errorf("failed to upload debug bundle to %s: %v", req.GetUploadURL(), err)
|
||||||
return &proto.DebugBundleResponse{Path: path, UploadFailureReason: err.Error()}, nil
|
return &proto.DebugBundleResponse{Path: path, UploadFailureReason: err.Error()}, nil
|
||||||
@@ -66,92 +58,6 @@ func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (
|
|||||||
return &proto.DebugBundleResponse{Path: path, UploadedKey: key}, nil
|
return &proto.DebugBundleResponse{Path: path, UploadedKey: key}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func uploadDebugBundle(ctx context.Context, url, managementURL, filePath string) (key string, err error) {
|
|
||||||
response, err := getUploadURL(ctx, url, managementURL)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = upload(ctx, filePath, response)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
return response.Key, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func upload(ctx context.Context, filePath string, response *types.GetURLResponse) error {
|
|
||||||
fileData, err := os.Open(filePath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("open file: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
defer fileData.Close()
|
|
||||||
|
|
||||||
stat, err := fileData.Stat()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("stat file: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if stat.Size() > maxBundleUploadSize {
|
|
||||||
return fmt.Errorf("file size exceeds maximum limit of %d bytes", maxBundleUploadSize)
|
|
||||||
}
|
|
||||||
|
|
||||||
req, err := http.NewRequestWithContext(ctx, "PUT", response.URL, fileData)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("create PUT request: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
req.ContentLength = stat.Size()
|
|
||||||
req.Header.Set("Content-Type", "application/octet-stream")
|
|
||||||
|
|
||||||
putResp, err := http.DefaultClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("upload failed: %v", err)
|
|
||||||
}
|
|
||||||
defer putResp.Body.Close()
|
|
||||||
|
|
||||||
if putResp.StatusCode != http.StatusOK {
|
|
||||||
body, _ := io.ReadAll(putResp.Body)
|
|
||||||
return fmt.Errorf("upload status %d: %s", putResp.StatusCode, string(body))
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getUploadURL(ctx context.Context, url string, managementURL string) (*types.GetURLResponse, error) {
|
|
||||||
id := getURLHash(managementURL)
|
|
||||||
getReq, err := http.NewRequestWithContext(ctx, "GET", url+"?id="+id, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("create GET request: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
getReq.Header.Set(types.ClientHeader, types.ClientHeaderValue)
|
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(getReq)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("get presigned URL: %w", err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
body, _ := io.ReadAll(resp.Body)
|
|
||||||
return nil, fmt.Errorf("get presigned URL status %d: %s", resp.StatusCode, string(body))
|
|
||||||
}
|
|
||||||
|
|
||||||
urlBytes, err := io.ReadAll(resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("read response body: %w", err)
|
|
||||||
}
|
|
||||||
var response types.GetURLResponse
|
|
||||||
if err := json.Unmarshal(urlBytes, &response); err != nil {
|
|
||||||
return nil, fmt.Errorf("unmarshal response: %w", err)
|
|
||||||
}
|
|
||||||
return &response, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getURLHash(url string) string {
|
|
||||||
return fmt.Sprintf("%x", sha256.Sum256([]byte(url)))
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetLogLevel gets the current logging level for the server.
|
// GetLogLevel gets the current logging level for the server.
|
||||||
func (s *Server) GetLogLevel(_ context.Context, _ *proto.GetLogLevelRequest) (*proto.GetLogLevelResponse, error) {
|
func (s *Server) GetLogLevel(_ context.Context, _ *proto.GetLogLevelRequest) (*proto.GetLogLevelResponse, error) {
|
||||||
s.mutex.Lock()
|
s.mutex.Lock()
|
||||||
|
|||||||
Reference in New Issue
Block a user