mirror of
https://github.com/netbirdio/netbird.git
synced 2026-05-27 11:09:54 +00:00
Compare commits
47 Commits
feat/dev_v
...
test-ldfla
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
619f1588b3 | ||
|
|
60d2a2c7df | ||
|
|
ff9585735b | ||
|
|
194951c88d | ||
|
|
d63f2e5196 | ||
|
|
14b5637555 | ||
|
|
3d3b05c157 | ||
|
|
f36e206238 | ||
|
|
9a808244d7 | ||
|
|
2dec76f8ea | ||
|
|
224bd8ff22 | ||
|
|
fe88a5662e | ||
|
|
f9f6409f94 | ||
|
|
b03154dce5 | ||
|
|
c57364596a | ||
|
|
2765bcfb89 | ||
|
|
fa6151b849 | ||
|
|
a939c1767c | ||
|
|
938554fb0f | ||
|
|
39bec2dd74 | ||
|
|
554c9bcf4b | ||
|
|
f3639675e7 | ||
|
|
a1457f541b | ||
|
|
9cdfb0d78c | ||
|
|
22d796097e | ||
|
|
aa39a5d528 | ||
|
|
1d2a5371ce | ||
|
|
6898e57686 | ||
|
|
c8bc865f2f | ||
|
|
06bb8658b1 | ||
|
|
8fc4fed3a0 | ||
|
|
df14f1399f | ||
|
|
6d6f090764 | ||
|
|
13febbbfca | ||
|
|
49d36b7e7e | ||
|
|
976787dbf1 | ||
|
|
536b0003ab | ||
|
|
0e9438d658 | ||
|
|
e570570fe5 | ||
|
|
23f9dd04b8 | ||
|
|
7a95bf5652 | ||
|
|
60c5782905 | ||
|
|
5a12c5d345 | ||
|
|
bdae55ab79 | ||
|
|
d01c3d5011 | ||
|
|
17a2af96ea | ||
|
|
cc595da1ad |
2
.github/workflows/wasm-build-validation.yml
vendored
2
.github/workflows/wasm-build-validation.yml
vendored
@@ -47,7 +47,7 @@ jobs:
|
|||||||
with:
|
with:
|
||||||
go-version: "1.23.x"
|
go-version: "1.23.x"
|
||||||
- name: Build Wasm client
|
- name: Build Wasm client
|
||||||
run: GOOS=js GOARCH=wasm go build -o netbird.wasm ./client/wasm/cmd
|
run: GOOS=js GOARCH=wasm go build -o netbird.wasm -ldflags="-s -w" ./client/wasm/cmd
|
||||||
env:
|
env:
|
||||||
CGO_ENABLED: 0
|
CGO_ENABLED: 0
|
||||||
- name: Check Wasm build size
|
- name: Check Wasm build size
|
||||||
|
|||||||
@@ -16,7 +16,6 @@ import (
|
|||||||
"github.com/netbirdio/netbird/client/internal/profilemanager"
|
"github.com/netbirdio/netbird/client/internal/profilemanager"
|
||||||
"github.com/netbirdio/netbird/client/proto"
|
"github.com/netbirdio/netbird/client/proto"
|
||||||
"github.com/netbirdio/netbird/client/server"
|
"github.com/netbirdio/netbird/client/server"
|
||||||
nbstatus "github.com/netbirdio/netbird/client/status"
|
|
||||||
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
|
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
|
||||||
"github.com/netbirdio/netbird/upload-server/types"
|
"github.com/netbirdio/netbird/upload-server/types"
|
||||||
)
|
)
|
||||||
@@ -98,7 +97,6 @@ func debugBundle(cmd *cobra.Command, _ []string) error {
|
|||||||
client := proto.NewDaemonServiceClient(conn)
|
client := proto.NewDaemonServiceClient(conn)
|
||||||
request := &proto.DebugBundleRequest{
|
request := &proto.DebugBundleRequest{
|
||||||
Anonymize: anonymizeFlag,
|
Anonymize: anonymizeFlag,
|
||||||
Status: getStatusOutput(cmd, anonymizeFlag),
|
|
||||||
SystemInfo: systemInfoFlag,
|
SystemInfo: systemInfoFlag,
|
||||||
LogFileCount: logFileCount,
|
LogFileCount: logFileCount,
|
||||||
}
|
}
|
||||||
@@ -220,9 +218,6 @@ func runForDuration(cmd *cobra.Command, args []string) error {
|
|||||||
|
|
||||||
time.Sleep(3 * time.Second)
|
time.Sleep(3 * time.Second)
|
||||||
|
|
||||||
headerPostUp := fmt.Sprintf("----- NetBird post-up - Timestamp: %s", time.Now().Format(time.RFC3339))
|
|
||||||
statusOutput := fmt.Sprintf("%s\n%s", headerPostUp, getStatusOutput(cmd, anonymizeFlag))
|
|
||||||
|
|
||||||
if waitErr := waitForDurationOrCancel(cmd.Context(), duration, cmd); waitErr != nil {
|
if waitErr := waitForDurationOrCancel(cmd.Context(), duration, cmd); waitErr != nil {
|
||||||
return waitErr
|
return waitErr
|
||||||
}
|
}
|
||||||
@@ -230,11 +225,8 @@ func runForDuration(cmd *cobra.Command, args []string) error {
|
|||||||
|
|
||||||
cmd.Println("Creating debug bundle...")
|
cmd.Println("Creating debug bundle...")
|
||||||
|
|
||||||
headerPreDown := fmt.Sprintf("----- NetBird pre-down - Timestamp: %s - Duration: %s", time.Now().Format(time.RFC3339), duration)
|
|
||||||
statusOutput = fmt.Sprintf("%s\n%s\n%s", statusOutput, headerPreDown, getStatusOutput(cmd, anonymizeFlag))
|
|
||||||
request := &proto.DebugBundleRequest{
|
request := &proto.DebugBundleRequest{
|
||||||
Anonymize: anonymizeFlag,
|
Anonymize: anonymizeFlag,
|
||||||
Status: statusOutput,
|
|
||||||
SystemInfo: systemInfoFlag,
|
SystemInfo: systemInfoFlag,
|
||||||
LogFileCount: logFileCount,
|
LogFileCount: logFileCount,
|
||||||
}
|
}
|
||||||
@@ -301,25 +293,6 @@ func setSyncResponsePersistence(cmd *cobra.Command, args []string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getStatusOutput(cmd *cobra.Command, anon bool) string {
|
|
||||||
var statusOutputString string
|
|
||||||
statusResp, err := getStatus(cmd.Context(), true)
|
|
||||||
if err != nil {
|
|
||||||
cmd.PrintErrf("Failed to get status: %v\n", err)
|
|
||||||
} else {
|
|
||||||
pm := profilemanager.NewProfileManager()
|
|
||||||
var profName string
|
|
||||||
if activeProf, err := pm.GetActiveProfile(); err == nil {
|
|
||||||
profName = activeProf.Name
|
|
||||||
}
|
|
||||||
|
|
||||||
statusOutputString = nbstatus.ParseToFullDetailSummary(
|
|
||||||
nbstatus.ConvertToStatusOutputOverview(statusResp, anon, "", nil, nil, nil, "", profName),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
return statusOutputString
|
|
||||||
}
|
|
||||||
|
|
||||||
func waitForDurationOrCancel(ctx context.Context, duration time.Duration, cmd *cobra.Command) error {
|
func waitForDurationOrCancel(ctx context.Context, duration time.Duration, cmd *cobra.Command) error {
|
||||||
ticker := time.NewTicker(1 * time.Second)
|
ticker := time.NewTicker(1 * time.Second)
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
@@ -378,7 +351,7 @@ func generateDebugBundle(config *profilemanager.Config, recorder *peer.Status, c
|
|||||||
InternalConfig: config,
|
InternalConfig: config,
|
||||||
StatusRecorder: recorder,
|
StatusRecorder: recorder,
|
||||||
SyncResponse: syncResponse,
|
SyncResponse: syncResponse,
|
||||||
LogFile: logFilePath,
|
LogPath: logFilePath,
|
||||||
},
|
},
|
||||||
debug.BundleConfig{
|
debug.BundleConfig{
|
||||||
IncludeSystemInfo: true,
|
IncludeSystemInfo: true,
|
||||||
|
|||||||
@@ -99,7 +99,7 @@ func statusFunc(cmd *cobra.Command, args []string) error {
|
|||||||
profName = activeProf.Name
|
profName = activeProf.Name
|
||||||
}
|
}
|
||||||
|
|
||||||
var outputInformationHolder = nbstatus.ConvertToStatusOutputOverview(resp, anonymizeFlag, statusFilter, prefixNamesFilter, prefixNamesFilterMap, ipsFilterMap, connectionTypeFilter, profName)
|
var outputInformationHolder = nbstatus.ConvertToStatusOutputOverview(resp.GetFullStatus(), anonymizeFlag, resp.GetDaemonVersion(), statusFilter, prefixNamesFilter, prefixNamesFilterMap, ipsFilterMap, connectionTypeFilter, profName)
|
||||||
var statusOutputString string
|
var statusOutputString string
|
||||||
switch {
|
switch {
|
||||||
case detailFlag:
|
case detailFlag:
|
||||||
|
|||||||
@@ -16,6 +16,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map/controller"
|
"github.com/netbirdio/netbird/management/internals/controllers/network_map/controller"
|
||||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel"
|
"github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel"
|
||||||
nbgrpc "github.com/netbirdio/netbird/management/internals/shared/grpc"
|
nbgrpc "github.com/netbirdio/netbird/management/internals/shared/grpc"
|
||||||
|
"github.com/netbirdio/netbird/management/server/job"
|
||||||
|
|
||||||
clientProto "github.com/netbirdio/netbird/client/proto"
|
clientProto "github.com/netbirdio/netbird/client/proto"
|
||||||
client "github.com/netbirdio/netbird/client/server"
|
client "github.com/netbirdio/netbird/client/server"
|
||||||
@@ -88,6 +89,7 @@ func startManagement(t *testing.T, config *config.Config, testFile string) (*grp
|
|||||||
}
|
}
|
||||||
t.Cleanup(cleanUp)
|
t.Cleanup(cleanUp)
|
||||||
|
|
||||||
|
jobManager := job.NewJobManager(nil, store)
|
||||||
eventStore := &activity.InMemoryEventStore{}
|
eventStore := &activity.InMemoryEventStore{}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
@@ -118,13 +120,13 @@ func startManagement(t *testing.T, config *config.Config, testFile string) (*grp
|
|||||||
requestBuffer := mgmt.NewAccountRequestBuffer(ctx, store)
|
requestBuffer := mgmt.NewAccountRequestBuffer(ctx, store)
|
||||||
networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, mgmt.MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock())
|
networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, mgmt.MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock())
|
||||||
|
|
||||||
accountManager, err := mgmt.BuildManager(context.Background(), config, store, networkMapController, nil, "", eventStore, nil, false, iv, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
|
accountManager, err := mgmt.BuildManager(context.Background(), config, store, networkMapController, jobManager, nil, "", eventStore, nil, false, iv, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
secretsManager := nbgrpc.NewTimeBasedAuthSecretsManager(updateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
|
secretsManager := nbgrpc.NewTimeBasedAuthSecretsManager(updateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
|
||||||
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, updateManager, secretsManager, nil, &manager.EphemeralManager{}, nil, &mgmt.MockIntegratedValidator{}, networkMapController)
|
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, updateManager, jobManager, secretsManager, nil, &manager.EphemeralManager{}, nil, &mgmt.MockIntegratedValidator{}, networkMapController)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -200,7 +200,7 @@ func runInForegroundMode(ctx context.Context, cmd *cobra.Command, activeProf *pr
|
|||||||
connectClient := internal.NewConnectClient(ctx, config, r)
|
connectClient := internal.NewConnectClient(ctx, config, r)
|
||||||
SetupDebugHandler(ctx, config, r, connectClient, "")
|
SetupDebugHandler(ctx, config, r, connectClient, "")
|
||||||
|
|
||||||
return connectClient.Run(nil)
|
return connectClient.Run(nil, util.FindFirstLogPath(logFiles))
|
||||||
}
|
}
|
||||||
|
|
||||||
func runInDaemonMode(ctx context.Context, cmd *cobra.Command, pm *profilemanager.ProfileManager, activeProf *profilemanager.Profile, profileSwitched bool) error {
|
func runInDaemonMode(ctx context.Context, cmd *cobra.Command, pm *profilemanager.ProfileManager, activeProf *profilemanager.Profile, profileSwitched bool) error {
|
||||||
|
|||||||
@@ -173,6 +173,7 @@ func (c *Client) Start(startCtx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
recorder := peer.NewRecorder(c.config.ManagementURL.String())
|
recorder := peer.NewRecorder(c.config.ManagementURL.String())
|
||||||
|
|
||||||
client := internal.NewConnectClient(ctx, c.config, recorder)
|
client := internal.NewConnectClient(ctx, c.config, recorder)
|
||||||
|
|
||||||
// either startup error (permanent backoff err) or nil err (successful engine up)
|
// either startup error (permanent backoff err) or nil err (successful engine up)
|
||||||
@@ -180,7 +181,7 @@ func (c *Client) Start(startCtx context.Context) error {
|
|||||||
run := make(chan struct{})
|
run := make(chan struct{})
|
||||||
clientErr := make(chan error, 1)
|
clientErr := make(chan error, 1)
|
||||||
go func() {
|
go func() {
|
||||||
if err := client.Run(run); err != nil {
|
if err := client.Run(run, ""); err != nil {
|
||||||
clientErr <- err
|
clientErr <- err
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|||||||
@@ -52,7 +52,6 @@ func NewConnectClient(
|
|||||||
ctx context.Context,
|
ctx context.Context,
|
||||||
config *profilemanager.Config,
|
config *profilemanager.Config,
|
||||||
statusRecorder *peer.Status,
|
statusRecorder *peer.Status,
|
||||||
|
|
||||||
) *ConnectClient {
|
) *ConnectClient {
|
||||||
return &ConnectClient{
|
return &ConnectClient{
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
@@ -63,8 +62,8 @@ func NewConnectClient(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Run with main logic.
|
// Run with main logic.
|
||||||
func (c *ConnectClient) Run(runningChan chan struct{}) error {
|
func (c *ConnectClient) Run(runningChan chan struct{}, logPath string) error {
|
||||||
return c.run(MobileDependency{}, runningChan)
|
return c.run(MobileDependency{}, runningChan, logPath)
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunOnAndroid with main logic on mobile system
|
// RunOnAndroid with main logic on mobile system
|
||||||
@@ -83,7 +82,7 @@ func (c *ConnectClient) RunOnAndroid(
|
|||||||
HostDNSAddresses: dnsAddresses,
|
HostDNSAddresses: dnsAddresses,
|
||||||
DnsReadyListener: dnsReadyListener,
|
DnsReadyListener: dnsReadyListener,
|
||||||
}
|
}
|
||||||
return c.run(mobileDependency, nil)
|
return c.run(mobileDependency, nil, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConnectClient) RunOniOS(
|
func (c *ConnectClient) RunOniOS(
|
||||||
@@ -101,10 +100,10 @@ func (c *ConnectClient) RunOniOS(
|
|||||||
DnsManager: dnsManager,
|
DnsManager: dnsManager,
|
||||||
StateFilePath: stateFilePath,
|
StateFilePath: stateFilePath,
|
||||||
}
|
}
|
||||||
return c.run(mobileDependency, nil)
|
return c.run(mobileDependency, nil, "")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan struct{}) error {
|
func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan struct{}, logPath string) error {
|
||||||
defer func() {
|
defer func() {
|
||||||
if r := recover(); r != nil {
|
if r := recover(); r != nil {
|
||||||
rec := c.statusRecorder
|
rec := c.statusRecorder
|
||||||
@@ -247,7 +246,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
|
|||||||
relayURLs, token := parseRelayInfo(loginResp)
|
relayURLs, token := parseRelayInfo(loginResp)
|
||||||
peerConfig := loginResp.GetPeerConfig()
|
peerConfig := loginResp.GetPeerConfig()
|
||||||
|
|
||||||
engineConfig, err := createEngineConfig(myPrivateKey, c.config, peerConfig)
|
engineConfig, err := createEngineConfig(myPrivateKey, c.config, peerConfig, logPath)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err)
|
log.Error(err)
|
||||||
return wrapErr(err)
|
return wrapErr(err)
|
||||||
@@ -271,7 +270,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan
|
|||||||
checks := loginResp.GetChecks()
|
checks := loginResp.GetChecks()
|
||||||
|
|
||||||
c.engineMutex.Lock()
|
c.engineMutex.Lock()
|
||||||
c.engine = NewEngine(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, checks)
|
c.engine = NewEngine(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, checks, c.config)
|
||||||
c.engine.SetSyncResponsePersistence(c.persistSyncResponse)
|
c.engine.SetSyncResponsePersistence(c.persistSyncResponse)
|
||||||
c.engineMutex.Unlock()
|
c.engineMutex.Unlock()
|
||||||
|
|
||||||
@@ -410,7 +409,7 @@ func (c *ConnectClient) SetSyncResponsePersistence(enabled bool) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// createEngineConfig converts configuration received from Management Service to EngineConfig
|
// createEngineConfig converts configuration received from Management Service to EngineConfig
|
||||||
func createEngineConfig(key wgtypes.Key, config *profilemanager.Config, peerConfig *mgmProto.PeerConfig) (*EngineConfig, error) {
|
func createEngineConfig(key wgtypes.Key, config *profilemanager.Config, peerConfig *mgmProto.PeerConfig, logPath string) (*EngineConfig, error) {
|
||||||
nm := false
|
nm := false
|
||||||
if config.NetworkMonitor != nil {
|
if config.NetworkMonitor != nil {
|
||||||
nm = *config.NetworkMonitor
|
nm = *config.NetworkMonitor
|
||||||
@@ -445,7 +444,10 @@ func createEngineConfig(key wgtypes.Key, config *profilemanager.Config, peerConf
|
|||||||
|
|
||||||
LazyConnectionEnabled: config.LazyConnectionEnabled,
|
LazyConnectionEnabled: config.LazyConnectionEnabled,
|
||||||
|
|
||||||
MTU: selectMTU(config.MTU, peerConfig.Mtu),
|
MTU: selectMTU(config.MTU, peerConfig.Mtu),
|
||||||
|
LogPath: logPath,
|
||||||
|
|
||||||
|
ProfileConfig: config,
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.PreSharedKey != "" {
|
if config.PreSharedKey != "" {
|
||||||
|
|||||||
@@ -27,8 +27,10 @@ import (
|
|||||||
"github.com/netbirdio/netbird/client/anonymize"
|
"github.com/netbirdio/netbird/client/anonymize"
|
||||||
"github.com/netbirdio/netbird/client/internal/peer"
|
"github.com/netbirdio/netbird/client/internal/peer"
|
||||||
"github.com/netbirdio/netbird/client/internal/profilemanager"
|
"github.com/netbirdio/netbird/client/internal/profilemanager"
|
||||||
|
nbstatus "github.com/netbirdio/netbird/client/status"
|
||||||
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
|
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
|
||||||
"github.com/netbirdio/netbird/util"
|
"github.com/netbirdio/netbird/util"
|
||||||
|
"github.com/netbirdio/netbird/version"
|
||||||
)
|
)
|
||||||
|
|
||||||
const readmeContent = `Netbird debug bundle
|
const readmeContent = `Netbird debug bundle
|
||||||
@@ -218,10 +220,9 @@ type BundleGenerator struct {
|
|||||||
internalConfig *profilemanager.Config
|
internalConfig *profilemanager.Config
|
||||||
statusRecorder *peer.Status
|
statusRecorder *peer.Status
|
||||||
syncResponse *mgmProto.SyncResponse
|
syncResponse *mgmProto.SyncResponse
|
||||||
logFile string
|
logPath string
|
||||||
|
|
||||||
anonymize bool
|
anonymize bool
|
||||||
clientStatus string
|
|
||||||
includeSystemInfo bool
|
includeSystemInfo bool
|
||||||
logFileCount uint32
|
logFileCount uint32
|
||||||
|
|
||||||
@@ -230,7 +231,6 @@ type BundleGenerator struct {
|
|||||||
|
|
||||||
type BundleConfig struct {
|
type BundleConfig struct {
|
||||||
Anonymize bool
|
Anonymize bool
|
||||||
ClientStatus string
|
|
||||||
IncludeSystemInfo bool
|
IncludeSystemInfo bool
|
||||||
LogFileCount uint32
|
LogFileCount uint32
|
||||||
}
|
}
|
||||||
@@ -239,7 +239,7 @@ type GeneratorDependencies struct {
|
|||||||
InternalConfig *profilemanager.Config
|
InternalConfig *profilemanager.Config
|
||||||
StatusRecorder *peer.Status
|
StatusRecorder *peer.Status
|
||||||
SyncResponse *mgmProto.SyncResponse
|
SyncResponse *mgmProto.SyncResponse
|
||||||
LogFile string
|
LogPath string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBundleGenerator(deps GeneratorDependencies, cfg BundleConfig) *BundleGenerator {
|
func NewBundleGenerator(deps GeneratorDependencies, cfg BundleConfig) *BundleGenerator {
|
||||||
@@ -255,10 +255,9 @@ func NewBundleGenerator(deps GeneratorDependencies, cfg BundleConfig) *BundleGen
|
|||||||
internalConfig: deps.InternalConfig,
|
internalConfig: deps.InternalConfig,
|
||||||
statusRecorder: deps.StatusRecorder,
|
statusRecorder: deps.StatusRecorder,
|
||||||
syncResponse: deps.SyncResponse,
|
syncResponse: deps.SyncResponse,
|
||||||
logFile: deps.LogFile,
|
logPath: deps.LogPath,
|
||||||
|
|
||||||
anonymize: cfg.Anonymize,
|
anonymize: cfg.Anonymize,
|
||||||
clientStatus: cfg.ClientStatus,
|
|
||||||
includeSystemInfo: cfg.IncludeSystemInfo,
|
includeSystemInfo: cfg.IncludeSystemInfo,
|
||||||
logFileCount: logFileCount,
|
logFileCount: logFileCount,
|
||||||
}
|
}
|
||||||
@@ -304,13 +303,6 @@ func (g *BundleGenerator) createArchive() error {
|
|||||||
return fmt.Errorf("add status: %w", err)
|
return fmt.Errorf("add status: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if g.statusRecorder != nil {
|
|
||||||
status := g.statusRecorder.GetFullStatus()
|
|
||||||
seedFromStatus(g.anonymizer, &status)
|
|
||||||
} else {
|
|
||||||
log.Debugf("no status recorder available for seeding")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := g.addConfig(); err != nil {
|
if err := g.addConfig(); err != nil {
|
||||||
log.Errorf("failed to add config to debug bundle: %v", err)
|
log.Errorf("failed to add config to debug bundle: %v", err)
|
||||||
}
|
}
|
||||||
@@ -343,7 +335,7 @@ func (g *BundleGenerator) createArchive() error {
|
|||||||
log.Errorf("failed to add wg show output: %v", err)
|
log.Errorf("failed to add wg show output: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if g.logFile != "" && !slices.Contains(util.SpecialLogs, g.logFile) {
|
if g.logPath != "" && !slices.Contains(util.SpecialLogs, g.logPath) {
|
||||||
if err := g.addLogfile(); err != nil {
|
if err := g.addLogfile(); err != nil {
|
||||||
log.Errorf("failed to add log file to debug bundle: %v", err)
|
log.Errorf("failed to add log file to debug bundle: %v", err)
|
||||||
if err := g.trySystemdLogFallback(); err != nil {
|
if err := g.trySystemdLogFallback(); err != nil {
|
||||||
@@ -388,11 +380,26 @@ func (g *BundleGenerator) addReadme() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *BundleGenerator) addStatus() error {
|
func (g *BundleGenerator) addStatus() error {
|
||||||
if status := g.clientStatus; status != "" {
|
if g.statusRecorder != nil {
|
||||||
statusReader := strings.NewReader(status)
|
pm := profilemanager.NewProfileManager()
|
||||||
|
var profName string
|
||||||
|
if activeProf, err := pm.GetActiveProfile(); err == nil {
|
||||||
|
profName = activeProf.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
fullStatus := g.statusRecorder.GetFullStatus()
|
||||||
|
protoFullStatus := nbstatus.ToProtoFullStatus(fullStatus)
|
||||||
|
protoFullStatus.Events = g.statusRecorder.GetEventHistory()
|
||||||
|
overview := nbstatus.ConvertToStatusOutputOverview(protoFullStatus, g.anonymize, version.NetbirdVersion(), "", nil, nil, nil, "", profName)
|
||||||
|
statusOutput := nbstatus.ParseToFullDetailSummary(overview)
|
||||||
|
|
||||||
|
statusReader := strings.NewReader(statusOutput)
|
||||||
if err := g.addFileToZip(statusReader, "status.txt"); err != nil {
|
if err := g.addFileToZip(statusReader, "status.txt"); err != nil {
|
||||||
return fmt.Errorf("add status file to zip: %w", err)
|
return fmt.Errorf("add status file to zip: %w", err)
|
||||||
}
|
}
|
||||||
|
seedFromStatus(g.anonymizer, &fullStatus)
|
||||||
|
} else {
|
||||||
|
log.Debugf("no status recorder available for seeding")
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@@ -662,14 +669,14 @@ func (g *BundleGenerator) addCorruptedStateFiles() error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (g *BundleGenerator) addLogfile() error {
|
func (g *BundleGenerator) addLogfile() error {
|
||||||
if g.logFile == "" {
|
if g.logPath == "" {
|
||||||
log.Debugf("skipping empty log file in debug bundle")
|
log.Debugf("skipping empty log file in debug bundle")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
logDir := filepath.Dir(g.logFile)
|
logDir := filepath.Dir(g.logPath)
|
||||||
|
|
||||||
if err := g.addSingleLogfile(g.logFile, clientLogFile); err != nil {
|
if err := g.addSingleLogfile(g.logPath, clientLogFile); err != nil {
|
||||||
return fmt.Errorf("add client log file to zip: %w", err)
|
return fmt.Errorf("add client log file to zip: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
101
client/internal/debug/upload.go
Normal file
101
client/internal/debug/upload.go
Normal file
@@ -0,0 +1,101 @@
|
|||||||
|
package debug
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/upload-server/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
const maxBundleUploadSize = 50 * 1024 * 1024
|
||||||
|
|
||||||
|
func UploadDebugBundle(ctx context.Context, url, managementURL, filePath string) (key string, err error) {
|
||||||
|
response, err := getUploadURL(ctx, url, managementURL)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = upload(ctx, filePath, response)
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
return response.Key, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func upload(ctx context.Context, filePath string, response *types.GetURLResponse) error {
|
||||||
|
fileData, err := os.Open(filePath)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("open file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
defer fileData.Close()
|
||||||
|
|
||||||
|
stat, err := fileData.Stat()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("stat file: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if stat.Size() > maxBundleUploadSize {
|
||||||
|
return fmt.Errorf("file size exceeds maximum limit of %d bytes", maxBundleUploadSize)
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequestWithContext(ctx, "PUT", response.URL, fileData)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("create PUT request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req.ContentLength = stat.Size()
|
||||||
|
req.Header.Set("Content-Type", "application/octet-stream")
|
||||||
|
|
||||||
|
putResp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("upload failed: %v", err)
|
||||||
|
}
|
||||||
|
defer putResp.Body.Close()
|
||||||
|
|
||||||
|
if putResp.StatusCode != http.StatusOK {
|
||||||
|
body, _ := io.ReadAll(putResp.Body)
|
||||||
|
return fmt.Errorf("upload status %d: %s", putResp.StatusCode, string(body))
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getUploadURL(ctx context.Context, url string, managementURL string) (*types.GetURLResponse, error) {
|
||||||
|
id := getURLHash(managementURL)
|
||||||
|
getReq, err := http.NewRequestWithContext(ctx, "GET", url+"?id="+id, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("create GET request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
getReq.Header.Set(types.ClientHeader, types.ClientHeaderValue)
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(getReq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("get presigned URL: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
return nil, fmt.Errorf("get presigned URL status %d: %s", resp.StatusCode, string(body))
|
||||||
|
}
|
||||||
|
|
||||||
|
urlBytes, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("read response body: %w", err)
|
||||||
|
}
|
||||||
|
var response types.GetURLResponse
|
||||||
|
if err := json.Unmarshal(urlBytes, &response); err != nil {
|
||||||
|
return nil, fmt.Errorf("unmarshal response: %w", err)
|
||||||
|
}
|
||||||
|
return &response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getURLHash(url string) string {
|
||||||
|
return fmt.Sprintf("%x", sha256.Sum256([]byte(url)))
|
||||||
|
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
package server
|
package debug
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@@ -38,7 +38,7 @@ func TestUpload(t *testing.T) {
|
|||||||
fileContent := []byte("test file content")
|
fileContent := []byte("test file content")
|
||||||
err := os.WriteFile(file, fileContent, 0640)
|
err := os.WriteFile(file, fileContent, 0640)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
key, err := uploadDebugBundle(context.Background(), testURL+types.GetURLPath, testURL, file)
|
key, err := UploadDebugBundle(context.Background(), testURL+types.GetURLPath, testURL, file)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
id := getURLHash(testURL)
|
id := getURLHash(testURL)
|
||||||
require.Contains(t, key, id+"/")
|
require.Contains(t, key, id+"/")
|
||||||
@@ -31,6 +31,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/client/iface/device"
|
"github.com/netbirdio/netbird/client/iface/device"
|
||||||
"github.com/netbirdio/netbird/client/iface/udpmux"
|
"github.com/netbirdio/netbird/client/iface/udpmux"
|
||||||
"github.com/netbirdio/netbird/client/internal/acl"
|
"github.com/netbirdio/netbird/client/internal/acl"
|
||||||
|
"github.com/netbirdio/netbird/client/internal/debug"
|
||||||
"github.com/netbirdio/netbird/client/internal/dns"
|
"github.com/netbirdio/netbird/client/internal/dns"
|
||||||
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
|
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
|
||||||
"github.com/netbirdio/netbird/client/internal/dnsfwd"
|
"github.com/netbirdio/netbird/client/internal/dnsfwd"
|
||||||
@@ -48,6 +49,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/client/internal/routemanager"
|
"github.com/netbirdio/netbird/client/internal/routemanager"
|
||||||
"github.com/netbirdio/netbird/client/internal/routemanager/systemops"
|
"github.com/netbirdio/netbird/client/internal/routemanager/systemops"
|
||||||
"github.com/netbirdio/netbird/client/internal/statemanager"
|
"github.com/netbirdio/netbird/client/internal/statemanager"
|
||||||
|
"github.com/netbirdio/netbird/client/jobexec"
|
||||||
cProto "github.com/netbirdio/netbird/client/proto"
|
cProto "github.com/netbirdio/netbird/client/proto"
|
||||||
sshconfig "github.com/netbirdio/netbird/client/ssh/config"
|
sshconfig "github.com/netbirdio/netbird/client/ssh/config"
|
||||||
"github.com/netbirdio/netbird/shared/management/domain"
|
"github.com/netbirdio/netbird/shared/management/domain"
|
||||||
@@ -132,6 +134,11 @@ type EngineConfig struct {
|
|||||||
LazyConnectionEnabled bool
|
LazyConnectionEnabled bool
|
||||||
|
|
||||||
MTU uint16
|
MTU uint16
|
||||||
|
|
||||||
|
// for debug bundle generation
|
||||||
|
ProfileConfig *profilemanager.Config
|
||||||
|
|
||||||
|
LogPath string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
|
// Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers.
|
||||||
@@ -195,7 +202,8 @@ type Engine struct {
|
|||||||
stateManager *statemanager.Manager
|
stateManager *statemanager.Manager
|
||||||
srWatcher *guard.SRWatcher
|
srWatcher *guard.SRWatcher
|
||||||
|
|
||||||
// Sync response persistence
|
// Sync response persistence (protected by syncRespMux)
|
||||||
|
syncRespMux sync.RWMutex
|
||||||
persistSyncResponse bool
|
persistSyncResponse bool
|
||||||
latestSyncResponse *mgmProto.SyncResponse
|
latestSyncResponse *mgmProto.SyncResponse
|
||||||
connSemaphore *semaphoregroup.SemaphoreGroup
|
connSemaphore *semaphoregroup.SemaphoreGroup
|
||||||
@@ -208,6 +216,9 @@ type Engine struct {
|
|||||||
shutdownWg sync.WaitGroup
|
shutdownWg sync.WaitGroup
|
||||||
|
|
||||||
probeStunTurn *relay.StunTurnProbe
|
probeStunTurn *relay.StunTurnProbe
|
||||||
|
|
||||||
|
jobExecutor *jobexec.Executor
|
||||||
|
jobExecutorWG sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
// Peer is an instance of the Connection Peer
|
// Peer is an instance of the Connection Peer
|
||||||
@@ -221,17 +232,7 @@ type localIpUpdater interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewEngine creates a new Connection Engine with probes attached
|
// NewEngine creates a new Connection Engine with probes attached
|
||||||
func NewEngine(
|
func NewEngine(clientCtx context.Context, clientCancel context.CancelFunc, signalClient signal.Client, mgmClient mgm.Client, relayManager *relayClient.Manager, config *EngineConfig, mobileDep MobileDependency, statusRecorder *peer.Status, checks []*mgmProto.Checks, c *profilemanager.Config) *Engine {
|
||||||
clientCtx context.Context,
|
|
||||||
clientCancel context.CancelFunc,
|
|
||||||
signalClient signal.Client,
|
|
||||||
mgmClient mgm.Client,
|
|
||||||
relayManager *relayClient.Manager,
|
|
||||||
config *EngineConfig,
|
|
||||||
mobileDep MobileDependency,
|
|
||||||
statusRecorder *peer.Status,
|
|
||||||
checks []*mgmProto.Checks,
|
|
||||||
) *Engine {
|
|
||||||
engine := &Engine{
|
engine := &Engine{
|
||||||
clientCtx: clientCtx,
|
clientCtx: clientCtx,
|
||||||
clientCancel: clientCancel,
|
clientCancel: clientCancel,
|
||||||
@@ -250,6 +251,7 @@ func NewEngine(
|
|||||||
checks: checks,
|
checks: checks,
|
||||||
connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit),
|
connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit),
|
||||||
probeStunTurn: relay.NewStunTurnProbe(relay.DefaultCacheTTL),
|
probeStunTurn: relay.NewStunTurnProbe(relay.DefaultCacheTTL),
|
||||||
|
jobExecutor: jobexec.NewExecutor(),
|
||||||
}
|
}
|
||||||
|
|
||||||
sm := profilemanager.NewServiceManager("")
|
sm := profilemanager.NewServiceManager("")
|
||||||
@@ -330,6 +332,8 @@ func (e *Engine) Stop() error {
|
|||||||
e.cancel()
|
e.cancel()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
e.jobExecutorWG.Wait() // block until job goroutines finish
|
||||||
|
|
||||||
e.close()
|
e.close()
|
||||||
|
|
||||||
// stop flow manager after wg interface is gone
|
// stop flow manager after wg interface is gone
|
||||||
@@ -516,6 +520,7 @@ func (e *Engine) Start(netbirdConfig *mgmProto.NetbirdConfig, mgmtURL *url.URL)
|
|||||||
|
|
||||||
e.receiveSignalEvents()
|
e.receiveSignalEvents()
|
||||||
e.receiveManagementEvents()
|
e.receiveManagementEvents()
|
||||||
|
e.receiveJobEvents()
|
||||||
|
|
||||||
// starting network monitor at the very last to avoid disruptions
|
// starting network monitor at the very last to avoid disruptions
|
||||||
e.startNetworkMonitor()
|
e.startNetworkMonitor()
|
||||||
@@ -793,9 +798,18 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Persist sync response under the dedicated lock (syncRespMux), not under syncMsgMux.
|
||||||
|
// Read the storage-enabled flag under the syncRespMux too.
|
||||||
|
e.syncRespMux.RLock()
|
||||||
|
enabled := e.persistSyncResponse
|
||||||
|
e.syncRespMux.RUnlock()
|
||||||
|
|
||||||
// Store sync response if persistence is enabled
|
// Store sync response if persistence is enabled
|
||||||
if e.persistSyncResponse {
|
if enabled {
|
||||||
|
e.syncRespMux.Lock()
|
||||||
e.latestSyncResponse = update
|
e.latestSyncResponse = update
|
||||||
|
e.syncRespMux.Unlock()
|
||||||
|
|
||||||
log.Debugf("sync response persisted with serial %d", nm.GetSerial())
|
log.Debugf("sync response persisted with serial %d", nm.GetSerial())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -925,6 +939,77 @@ func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error {
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
func (e *Engine) receiveJobEvents() {
|
||||||
|
e.jobExecutorWG.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer e.jobExecutorWG.Done()
|
||||||
|
err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse {
|
||||||
|
resp := mgmProto.JobResponse{
|
||||||
|
ID: msg.ID,
|
||||||
|
Status: mgmProto.JobStatus_failed,
|
||||||
|
}
|
||||||
|
switch params := msg.WorkloadParameters.(type) {
|
||||||
|
case *mgmProto.JobRequest_Bundle:
|
||||||
|
bundleResult, err := e.handleBundle(params.Bundle)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("handling bundle: %v", err)
|
||||||
|
resp.Reason = []byte(err.Error())
|
||||||
|
return &resp
|
||||||
|
}
|
||||||
|
resp.Status = mgmProto.JobStatus_succeeded
|
||||||
|
resp.WorkloadResults = bundleResult
|
||||||
|
return &resp
|
||||||
|
default:
|
||||||
|
resp.Reason = []byte(jobexec.ErrJobNotImplemented.Error())
|
||||||
|
return &resp
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
// happens if management is unavailable for a long time.
|
||||||
|
// We want to cancel the operation of the whole client
|
||||||
|
_ = CtxGetState(e.ctx).Wrap(ErrResetConnection)
|
||||||
|
e.clientCancel()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.Info("stopped receiving jobs from Management Service")
|
||||||
|
}()
|
||||||
|
log.Info("connecting to Management Service jobs stream")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (*mgmProto.JobResponse_Bundle, error) {
|
||||||
|
log.Infof("handle remote debug bundle request: %s", params.String())
|
||||||
|
syncResponse, err := e.GetLatestSyncResponse()
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("get latest sync response: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
bundleDeps := debug.GeneratorDependencies{
|
||||||
|
InternalConfig: e.config.ProfileConfig,
|
||||||
|
StatusRecorder: e.statusRecorder,
|
||||||
|
SyncResponse: syncResponse,
|
||||||
|
LogPath: e.config.LogPath,
|
||||||
|
}
|
||||||
|
|
||||||
|
bundleJobParams := debug.BundleConfig{
|
||||||
|
Anonymize: params.Anonymize,
|
||||||
|
IncludeSystemInfo: true,
|
||||||
|
LogFileCount: uint32(params.LogFileCount),
|
||||||
|
}
|
||||||
|
|
||||||
|
waitFor := time.Duration(params.BundleForTime) * time.Minute
|
||||||
|
|
||||||
|
uploadKey, err := e.jobExecutor.BundleJob(e.ctx, bundleDeps, bundleJobParams, waitFor, e.config.ProfileConfig.ManagementURL.String())
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
response := &mgmProto.JobResponse_Bundle{
|
||||||
|
Bundle: &mgmProto.BundleResult{
|
||||||
|
UploadKey: uploadKey,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
return response, nil
|
||||||
|
}
|
||||||
|
|
||||||
// receiveManagementEvents connects to the Management Service event stream to receive updates from the management service
|
// receiveManagementEvents connects to the Management Service event stream to receive updates from the management service
|
||||||
// E.g. when a new peer has been registered and we are allowed to connect to it.
|
// E.g. when a new peer has been registered and we are allowed to connect to it.
|
||||||
@@ -1192,7 +1277,7 @@ func toRouteDomains(myPubKey string, routes []*route.Route) []*dnsfwd.ForwarderE
|
|||||||
}
|
}
|
||||||
|
|
||||||
func toDNSConfig(protoDNSConfig *mgmProto.DNSConfig, network netip.Prefix) nbdns.Config {
|
func toDNSConfig(protoDNSConfig *mgmProto.DNSConfig, network netip.Prefix) nbdns.Config {
|
||||||
forwarderPort := uint16(protoDNSConfig.GetForwarderPort())
|
forwarderPort := uint16(protoDNSConfig.GetForwarderPort()) //nolint
|
||||||
if forwarderPort == 0 {
|
if forwarderPort == 0 {
|
||||||
forwarderPort = nbdns.ForwarderClientPort
|
forwarderPort = nbdns.ForwarderClientPort
|
||||||
}
|
}
|
||||||
@@ -1785,8 +1870,8 @@ func (e *Engine) stopDNSServer() {
|
|||||||
|
|
||||||
// SetSyncResponsePersistence enables or disables sync response persistence
|
// SetSyncResponsePersistence enables or disables sync response persistence
|
||||||
func (e *Engine) SetSyncResponsePersistence(enabled bool) {
|
func (e *Engine) SetSyncResponsePersistence(enabled bool) {
|
||||||
e.syncMsgMux.Lock()
|
e.syncRespMux.Lock()
|
||||||
defer e.syncMsgMux.Unlock()
|
defer e.syncRespMux.Unlock()
|
||||||
|
|
||||||
if enabled == e.persistSyncResponse {
|
if enabled == e.persistSyncResponse {
|
||||||
return
|
return
|
||||||
@@ -1801,20 +1886,22 @@ func (e *Engine) SetSyncResponsePersistence(enabled bool) {
|
|||||||
|
|
||||||
// GetLatestSyncResponse returns the stored sync response if persistence is enabled
|
// GetLatestSyncResponse returns the stored sync response if persistence is enabled
|
||||||
func (e *Engine) GetLatestSyncResponse() (*mgmProto.SyncResponse, error) {
|
func (e *Engine) GetLatestSyncResponse() (*mgmProto.SyncResponse, error) {
|
||||||
e.syncMsgMux.Lock()
|
e.syncRespMux.RLock()
|
||||||
defer e.syncMsgMux.Unlock()
|
enabled := e.persistSyncResponse
|
||||||
|
latest := e.latestSyncResponse
|
||||||
|
e.syncRespMux.RUnlock()
|
||||||
|
|
||||||
if !e.persistSyncResponse {
|
if !enabled {
|
||||||
return nil, errors.New("sync response persistence is disabled")
|
return nil, errors.New("sync response persistence is disabled")
|
||||||
}
|
}
|
||||||
|
|
||||||
if e.latestSyncResponse == nil {
|
if latest == nil {
|
||||||
//nolint:nilnil
|
//nolint:nilnil
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debugf("Retrieving latest sync response with size %d bytes", proto.Size(e.latestSyncResponse))
|
log.Debugf("Retrieving latest sync response with size %d bytes", proto.Size(latest))
|
||||||
sr, ok := proto.Clone(e.latestSyncResponse).(*mgmProto.SyncResponse)
|
sr, ok := proto.Clone(latest).(*mgmProto.SyncResponse)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("failed to clone sync response")
|
return nil, fmt.Errorf("failed to clone sync response")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -25,6 +25,7 @@ import (
|
|||||||
"google.golang.org/grpc/keepalive"
|
"google.golang.org/grpc/keepalive"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/client/internal/stdnet"
|
"github.com/netbirdio/netbird/client/internal/stdnet"
|
||||||
|
"github.com/netbirdio/netbird/management/server/job"
|
||||||
|
|
||||||
"github.com/netbirdio/management-integrations/integrations"
|
"github.com/netbirdio/management-integrations/integrations"
|
||||||
|
|
||||||
@@ -248,7 +249,7 @@ func TestEngine_SSH(t *testing.T) {
|
|||||||
},
|
},
|
||||||
MobileDependency{},
|
MobileDependency{},
|
||||||
peer.NewRecorder("https://mgm"),
|
peer.NewRecorder("https://mgm"),
|
||||||
nil,
|
nil, nil,
|
||||||
)
|
)
|
||||||
|
|
||||||
engine.dnsServer = &dns.MockServer{
|
engine.dnsServer = &dns.MockServer{
|
||||||
@@ -410,21 +411,13 @@ func TestEngine_UpdateNetworkMap(t *testing.T) {
|
|||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String(), iface.DefaultMTU)
|
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String(), iface.DefaultMTU)
|
||||||
engine := NewEngine(
|
engine := NewEngine(ctx, cancel, &signal.MockClient{}, &mgmt.MockClient{}, relayMgr, &EngineConfig{
|
||||||
ctx, cancel,
|
WgIfaceName: "utun102",
|
||||||
&signal.MockClient{},
|
WgAddr: "100.64.0.1/24",
|
||||||
&mgmt.MockClient{},
|
WgPrivateKey: key,
|
||||||
relayMgr,
|
WgPort: 33100,
|
||||||
&EngineConfig{
|
MTU: iface.DefaultMTU,
|
||||||
WgIfaceName: "utun102",
|
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil)
|
||||||
WgAddr: "100.64.0.1/24",
|
|
||||||
WgPrivateKey: key,
|
|
||||||
WgPort: 33100,
|
|
||||||
MTU: iface.DefaultMTU,
|
|
||||||
},
|
|
||||||
MobileDependency{},
|
|
||||||
peer.NewRecorder("https://mgm"),
|
|
||||||
nil)
|
|
||||||
|
|
||||||
wgIface := &MockWGIface{
|
wgIface := &MockWGIface{
|
||||||
NameFunc: func() string { return "utun102" },
|
NameFunc: func() string { return "utun102" },
|
||||||
@@ -643,7 +636,7 @@ func TestEngine_Sync(t *testing.T) {
|
|||||||
WgPrivateKey: key,
|
WgPrivateKey: key,
|
||||||
WgPort: 33100,
|
WgPort: 33100,
|
||||||
MTU: iface.DefaultMTU,
|
MTU: iface.DefaultMTU,
|
||||||
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil)
|
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil)
|
||||||
engine.ctx = ctx
|
engine.ctx = ctx
|
||||||
|
|
||||||
engine.dnsServer = &dns.MockServer{
|
engine.dnsServer = &dns.MockServer{
|
||||||
@@ -808,7 +801,7 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) {
|
|||||||
WgPrivateKey: key,
|
WgPrivateKey: key,
|
||||||
WgPort: 33100,
|
WgPort: 33100,
|
||||||
MTU: iface.DefaultMTU,
|
MTU: iface.DefaultMTU,
|
||||||
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil)
|
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil)
|
||||||
engine.ctx = ctx
|
engine.ctx = ctx
|
||||||
newNet, err := stdnet.NewNet(context.Background(), nil)
|
newNet, err := stdnet.NewNet(context.Background(), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -1010,7 +1003,7 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) {
|
|||||||
WgPrivateKey: key,
|
WgPrivateKey: key,
|
||||||
WgPort: 33100,
|
WgPort: 33100,
|
||||||
MTU: iface.DefaultMTU,
|
MTU: iface.DefaultMTU,
|
||||||
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil)
|
}, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil)
|
||||||
engine.ctx = ctx
|
engine.ctx = ctx
|
||||||
|
|
||||||
newNet, err := stdnet.NewNet(context.Background(), nil)
|
newNet, err := stdnet.NewNet(context.Background(), nil)
|
||||||
@@ -1536,7 +1529,7 @@ func createEngine(ctx context.Context, cancel context.CancelFunc, setupKey strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String(), iface.DefaultMTU)
|
relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String(), iface.DefaultMTU)
|
||||||
e, err := NewEngine(ctx, cancel, signalClient, mgmtClient, relayMgr, conf, MobileDependency{}, peer.NewRecorder("https://mgm"), nil), nil
|
e, err := NewEngine(ctx, cancel, signalClient, mgmtClient, relayMgr, conf, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, nil), nil
|
||||||
e.ctx = ctx
|
e.ctx = ctx
|
||||||
return e, err
|
return e, err
|
||||||
}
|
}
|
||||||
@@ -1595,6 +1588,7 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri
|
|||||||
}
|
}
|
||||||
t.Cleanup(cleanUp)
|
t.Cleanup(cleanUp)
|
||||||
|
|
||||||
|
jobManager := job.NewJobManager(nil, store)
|
||||||
eventStore := &activity.InMemoryEventStore{}
|
eventStore := &activity.InMemoryEventStore{}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", err
|
return nil, "", err
|
||||||
@@ -1625,13 +1619,13 @@ func startManagement(t *testing.T, dataDir, testFile string) (*grpc.Server, stri
|
|||||||
updateManager := update_channel.NewPeersUpdateManager(metrics)
|
updateManager := update_channel.NewPeersUpdateManager(metrics)
|
||||||
requestBuffer := server.NewAccountRequestBuffer(context.Background(), store)
|
requestBuffer := server.NewAccountRequestBuffer(context.Background(), store)
|
||||||
networkMapController := controller.NewController(context.Background(), store, metrics, updateManager, requestBuffer, server.MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock())
|
networkMapController := controller.NewController(context.Background(), store, metrics, updateManager, requestBuffer, server.MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock())
|
||||||
accountManager, err := server.BuildManager(context.Background(), config, store, networkMapController, nil, "", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
accountManager, err := server.BuildManager(context.Background(), config, store, networkMapController, jobManager, nil, "", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", err
|
return nil, "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
secretsManager := nbgrpc.NewTimeBasedAuthSecretsManager(updateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
|
secretsManager := nbgrpc.NewTimeBasedAuthSecretsManager(updateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
|
||||||
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, updateManager, secretsManager, nil, &manager.EphemeralManager{}, nil, &server.MockIntegratedValidator{}, networkMapController)
|
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, updateManager, jobManager, secretsManager, nil, &manager.EphemeralManager{}, nil, &server.MockIntegratedValidator{}, networkMapController)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", err
|
return nil, "", err
|
||||||
}
|
}
|
||||||
|
|||||||
66
client/jobexec/executor.go
Normal file
66
client/jobexec/executor.go
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
package jobexec
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/client/internal/debug"
|
||||||
|
"github.com/netbirdio/netbird/upload-server/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
MaxBundleWaitTime = 60 * time.Minute // maximum wait time for bundle generation (1 hour)
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrJobNotImplemented = errors.New("job not implemented")
|
||||||
|
)
|
||||||
|
|
||||||
|
type Executor struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewExecutor() *Executor {
|
||||||
|
return &Executor{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Executor) BundleJob(ctx context.Context, debugBundleDependencies debug.GeneratorDependencies, params debug.BundleConfig, waitForDuration time.Duration, mgmURL string) (string, error) {
|
||||||
|
if waitForDuration > MaxBundleWaitTime {
|
||||||
|
log.Warnf("bundle wait time %v exceeds maximum %v, capping to maximum", waitForDuration, MaxBundleWaitTime)
|
||||||
|
waitForDuration = MaxBundleWaitTime
|
||||||
|
}
|
||||||
|
|
||||||
|
if waitForDuration > 0 {
|
||||||
|
waitFor(ctx, waitForDuration)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("execute debug bundle generation")
|
||||||
|
|
||||||
|
bundleGenerator := debug.NewBundleGenerator(debugBundleDependencies, params)
|
||||||
|
|
||||||
|
path, err := bundleGenerator.Generate()
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("generate debug bundle: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
key, err := debug.UploadDebugBundle(ctx, types.DefaultBundleURL, mgmURL, path)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to upload debug bundle: %v", err)
|
||||||
|
return "", fmt.Errorf("upload debug bundle: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("debug bundle has been generated well")
|
||||||
|
return key, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func waitFor(ctx context.Context, duration time.Duration) {
|
||||||
|
log.Infof("wait for %v minutes before executing debug bundle", duration.Minutes())
|
||||||
|
select {
|
||||||
|
case <-time.After(duration):
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Infof("wait cancelled: %v", ctx.Err())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,7 +1,7 @@
|
|||||||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||||
// versions:
|
// versions:
|
||||||
// protoc-gen-go v1.36.6
|
// protoc-gen-go v1.36.6
|
||||||
// protoc v6.32.1
|
// protoc v6.33.1
|
||||||
// source: daemon.proto
|
// source: daemon.proto
|
||||||
|
|
||||||
package proto
|
package proto
|
||||||
@@ -2611,7 +2611,6 @@ func (x *ForwardingRulesResponse) GetRules() []*ForwardingRule {
|
|||||||
type DebugBundleRequest struct {
|
type DebugBundleRequest struct {
|
||||||
state protoimpl.MessageState `protogen:"open.v1"`
|
state protoimpl.MessageState `protogen:"open.v1"`
|
||||||
Anonymize bool `protobuf:"varint,1,opt,name=anonymize,proto3" json:"anonymize,omitempty"`
|
Anonymize bool `protobuf:"varint,1,opt,name=anonymize,proto3" json:"anonymize,omitempty"`
|
||||||
Status string `protobuf:"bytes,2,opt,name=status,proto3" json:"status,omitempty"`
|
|
||||||
SystemInfo bool `protobuf:"varint,3,opt,name=systemInfo,proto3" json:"systemInfo,omitempty"`
|
SystemInfo bool `protobuf:"varint,3,opt,name=systemInfo,proto3" json:"systemInfo,omitempty"`
|
||||||
UploadURL string `protobuf:"bytes,4,opt,name=uploadURL,proto3" json:"uploadURL,omitempty"`
|
UploadURL string `protobuf:"bytes,4,opt,name=uploadURL,proto3" json:"uploadURL,omitempty"`
|
||||||
LogFileCount uint32 `protobuf:"varint,5,opt,name=logFileCount,proto3" json:"logFileCount,omitempty"`
|
LogFileCount uint32 `protobuf:"varint,5,opt,name=logFileCount,proto3" json:"logFileCount,omitempty"`
|
||||||
@@ -2656,13 +2655,6 @@ func (x *DebugBundleRequest) GetAnonymize() bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
func (x *DebugBundleRequest) GetStatus() string {
|
|
||||||
if x != nil {
|
|
||||||
return x.Status
|
|
||||||
}
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
func (x *DebugBundleRequest) GetSystemInfo() bool {
|
func (x *DebugBundleRequest) GetSystemInfo() bool {
|
||||||
if x != nil {
|
if x != nil {
|
||||||
return x.SystemInfo
|
return x.SystemInfo
|
||||||
@@ -5526,10 +5518,9 @@ const file_daemon_proto_rawDesc = "" +
|
|||||||
"\x12translatedHostname\x18\x04 \x01(\tR\x12translatedHostname\x128\n" +
|
"\x12translatedHostname\x18\x04 \x01(\tR\x12translatedHostname\x128\n" +
|
||||||
"\x0etranslatedPort\x18\x05 \x01(\v2\x10.daemon.PortInfoR\x0etranslatedPort\"G\n" +
|
"\x0etranslatedPort\x18\x05 \x01(\v2\x10.daemon.PortInfoR\x0etranslatedPort\"G\n" +
|
||||||
"\x17ForwardingRulesResponse\x12,\n" +
|
"\x17ForwardingRulesResponse\x12,\n" +
|
||||||
"\x05rules\x18\x01 \x03(\v2\x16.daemon.ForwardingRuleR\x05rules\"\xac\x01\n" +
|
"\x05rules\x18\x01 \x03(\v2\x16.daemon.ForwardingRuleR\x05rules\"\x94\x01\n" +
|
||||||
"\x12DebugBundleRequest\x12\x1c\n" +
|
"\x12DebugBundleRequest\x12\x1c\n" +
|
||||||
"\tanonymize\x18\x01 \x01(\bR\tanonymize\x12\x16\n" +
|
"\tanonymize\x18\x01 \x01(\bR\tanonymize\x12\x1e\n" +
|
||||||
"\x06status\x18\x02 \x01(\tR\x06status\x12\x1e\n" +
|
|
||||||
"\n" +
|
"\n" +
|
||||||
"systemInfo\x18\x03 \x01(\bR\n" +
|
"systemInfo\x18\x03 \x01(\bR\n" +
|
||||||
"systemInfo\x12\x1c\n" +
|
"systemInfo\x12\x1c\n" +
|
||||||
|
|||||||
@@ -434,7 +434,6 @@ message ForwardingRulesResponse {
|
|||||||
// DebugBundler
|
// DebugBundler
|
||||||
message DebugBundleRequest {
|
message DebugBundleRequest {
|
||||||
bool anonymize = 1;
|
bool anonymize = 1;
|
||||||
string status = 2;
|
|
||||||
bool systemInfo = 3;
|
bool systemInfo = 3;
|
||||||
string uploadURL = 4;
|
string uploadURL = 4;
|
||||||
uint32 logFileCount = 5;
|
uint32 logFileCount = 5;
|
||||||
|
|||||||
@@ -4,24 +4,16 @@ package server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/sha256"
|
|
||||||
"encoding/json"
|
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
|
||||||
"net/http"
|
|
||||||
"os"
|
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/client/internal/debug"
|
"github.com/netbirdio/netbird/client/internal/debug"
|
||||||
"github.com/netbirdio/netbird/client/proto"
|
"github.com/netbirdio/netbird/client/proto"
|
||||||
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
|
mgmProto "github.com/netbirdio/netbird/shared/management/proto"
|
||||||
"github.com/netbirdio/netbird/upload-server/types"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
const maxBundleUploadSize = 50 * 1024 * 1024
|
|
||||||
|
|
||||||
// DebugBundle creates a debug bundle and returns the location.
|
// DebugBundle creates a debug bundle and returns the location.
|
||||||
func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (resp *proto.DebugBundleResponse, err error) {
|
func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (resp *proto.DebugBundleResponse, err error) {
|
||||||
s.mutex.Lock()
|
s.mutex.Lock()
|
||||||
@@ -37,11 +29,10 @@ func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (
|
|||||||
InternalConfig: s.config,
|
InternalConfig: s.config,
|
||||||
StatusRecorder: s.statusRecorder,
|
StatusRecorder: s.statusRecorder,
|
||||||
SyncResponse: syncResponse,
|
SyncResponse: syncResponse,
|
||||||
LogFile: s.logFile,
|
LogPath: s.logFile,
|
||||||
},
|
},
|
||||||
debug.BundleConfig{
|
debug.BundleConfig{
|
||||||
Anonymize: req.GetAnonymize(),
|
Anonymize: req.GetAnonymize(),
|
||||||
ClientStatus: req.GetStatus(),
|
|
||||||
IncludeSystemInfo: req.GetSystemInfo(),
|
IncludeSystemInfo: req.GetSystemInfo(),
|
||||||
LogFileCount: req.GetLogFileCount(),
|
LogFileCount: req.GetLogFileCount(),
|
||||||
},
|
},
|
||||||
@@ -55,7 +46,7 @@ func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (
|
|||||||
if req.GetUploadURL() == "" {
|
if req.GetUploadURL() == "" {
|
||||||
return &proto.DebugBundleResponse{Path: path}, nil
|
return &proto.DebugBundleResponse{Path: path}, nil
|
||||||
}
|
}
|
||||||
key, err := uploadDebugBundle(context.Background(), req.GetUploadURL(), s.config.ManagementURL.String(), path)
|
key, err := debug.UploadDebugBundle(context.Background(), req.GetUploadURL(), s.config.ManagementURL.String(), path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to upload debug bundle to %s: %v", req.GetUploadURL(), err)
|
log.Errorf("failed to upload debug bundle to %s: %v", req.GetUploadURL(), err)
|
||||||
return &proto.DebugBundleResponse{Path: path, UploadFailureReason: err.Error()}, nil
|
return &proto.DebugBundleResponse{Path: path, UploadFailureReason: err.Error()}, nil
|
||||||
@@ -66,92 +57,6 @@ func (s *Server) DebugBundle(_ context.Context, req *proto.DebugBundleRequest) (
|
|||||||
return &proto.DebugBundleResponse{Path: path, UploadedKey: key}, nil
|
return &proto.DebugBundleResponse{Path: path, UploadedKey: key}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func uploadDebugBundle(ctx context.Context, url, managementURL, filePath string) (key string, err error) {
|
|
||||||
response, err := getUploadURL(ctx, url, managementURL)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
|
|
||||||
err = upload(ctx, filePath, response)
|
|
||||||
if err != nil {
|
|
||||||
return "", err
|
|
||||||
}
|
|
||||||
return response.Key, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func upload(ctx context.Context, filePath string, response *types.GetURLResponse) error {
|
|
||||||
fileData, err := os.Open(filePath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("open file: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
defer fileData.Close()
|
|
||||||
|
|
||||||
stat, err := fileData.Stat()
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("stat file: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if stat.Size() > maxBundleUploadSize {
|
|
||||||
return fmt.Errorf("file size exceeds maximum limit of %d bytes", maxBundleUploadSize)
|
|
||||||
}
|
|
||||||
|
|
||||||
req, err := http.NewRequestWithContext(ctx, "PUT", response.URL, fileData)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("create PUT request: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
req.ContentLength = stat.Size()
|
|
||||||
req.Header.Set("Content-Type", "application/octet-stream")
|
|
||||||
|
|
||||||
putResp, err := http.DefaultClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("upload failed: %v", err)
|
|
||||||
}
|
|
||||||
defer putResp.Body.Close()
|
|
||||||
|
|
||||||
if putResp.StatusCode != http.StatusOK {
|
|
||||||
body, _ := io.ReadAll(putResp.Body)
|
|
||||||
return fmt.Errorf("upload status %d: %s", putResp.StatusCode, string(body))
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getUploadURL(ctx context.Context, url string, managementURL string) (*types.GetURLResponse, error) {
|
|
||||||
id := getURLHash(managementURL)
|
|
||||||
getReq, err := http.NewRequestWithContext(ctx, "GET", url+"?id="+id, nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("create GET request: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
getReq.Header.Set(types.ClientHeader, types.ClientHeaderValue)
|
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(getReq)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("get presigned URL: %w", err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
body, _ := io.ReadAll(resp.Body)
|
|
||||||
return nil, fmt.Errorf("get presigned URL status %d: %s", resp.StatusCode, string(body))
|
|
||||||
}
|
|
||||||
|
|
||||||
urlBytes, err := io.ReadAll(resp.Body)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("read response body: %w", err)
|
|
||||||
}
|
|
||||||
var response types.GetURLResponse
|
|
||||||
if err := json.Unmarshal(urlBytes, &response); err != nil {
|
|
||||||
return nil, fmt.Errorf("unmarshal response: %w", err)
|
|
||||||
}
|
|
||||||
return &response, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func getURLHash(url string) string {
|
|
||||||
return fmt.Sprintf("%x", sha256.Sum256([]byte(url)))
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetLogLevel gets the current logging level for the server.
|
// GetLogLevel gets the current logging level for the server.
|
||||||
func (s *Server) GetLogLevel(_ context.Context, _ *proto.GetLogLevelRequest) (*proto.GetLogLevelResponse, error) {
|
func (s *Server) GetLogLevel(_ context.Context, _ *proto.GetLogLevelRequest) (*proto.GetLogLevelResponse, error) {
|
||||||
s.mutex.Lock()
|
s.mutex.Lock()
|
||||||
|
|||||||
@@ -13,15 +13,11 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/cenkalti/backoff/v4"
|
"github.com/cenkalti/backoff/v4"
|
||||||
"golang.org/x/exp/maps"
|
|
||||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
|
||||||
"google.golang.org/protobuf/types/known/durationpb"
|
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
gstatus "google.golang.org/grpc/status"
|
gstatus "google.golang.org/grpc/status"
|
||||||
"google.golang.org/protobuf/types/known/timestamppb"
|
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/client/internal/auth"
|
"github.com/netbirdio/netbird/client/internal/auth"
|
||||||
"github.com/netbirdio/netbird/client/internal/profilemanager"
|
"github.com/netbirdio/netbird/client/internal/profilemanager"
|
||||||
@@ -32,6 +28,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/client/internal"
|
"github.com/netbirdio/netbird/client/internal"
|
||||||
"github.com/netbirdio/netbird/client/internal/peer"
|
"github.com/netbirdio/netbird/client/internal/peer"
|
||||||
"github.com/netbirdio/netbird/client/proto"
|
"github.com/netbirdio/netbird/client/proto"
|
||||||
|
nbstatus "github.com/netbirdio/netbird/client/status"
|
||||||
"github.com/netbirdio/netbird/version"
|
"github.com/netbirdio/netbird/version"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -1080,7 +1077,7 @@ func (s *Server) Status(
|
|||||||
if msg.GetFullPeerStatus {
|
if msg.GetFullPeerStatus {
|
||||||
s.runProbes(msg.ShouldRunProbes)
|
s.runProbes(msg.ShouldRunProbes)
|
||||||
fullStatus := s.statusRecorder.GetFullStatus()
|
fullStatus := s.statusRecorder.GetFullStatus()
|
||||||
pbFullStatus := toProtoFullStatus(fullStatus)
|
pbFullStatus := nbstatus.ToProtoFullStatus(fullStatus)
|
||||||
pbFullStatus.Events = s.statusRecorder.GetEventHistory()
|
pbFullStatus.Events = s.statusRecorder.GetEventHistory()
|
||||||
|
|
||||||
pbFullStatus.SshServerState = s.getSSHServerState()
|
pbFullStatus.SshServerState = s.getSSHServerState()
|
||||||
@@ -1538,7 +1535,7 @@ func (s *Server) connect(ctx context.Context, config *profilemanager.Config, sta
|
|||||||
log.Tracef("running client connection")
|
log.Tracef("running client connection")
|
||||||
s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder)
|
s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder)
|
||||||
s.connectClient.SetSyncResponsePersistence(s.persistSyncResponse)
|
s.connectClient.SetSyncResponsePersistence(s.persistSyncResponse)
|
||||||
if err := s.connectClient.Run(runningChan); err != nil {
|
if err := s.connectClient.Run(runningChan, s.logFile); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@@ -1612,94 +1609,6 @@ func parseEnvDuration(envVar string, defaultDuration time.Duration) time.Duratio
|
|||||||
return defaultDuration
|
return defaultDuration
|
||||||
}
|
}
|
||||||
|
|
||||||
func toProtoFullStatus(fullStatus peer.FullStatus) *proto.FullStatus {
|
|
||||||
pbFullStatus := proto.FullStatus{
|
|
||||||
ManagementState: &proto.ManagementState{},
|
|
||||||
SignalState: &proto.SignalState{},
|
|
||||||
LocalPeerState: &proto.LocalPeerState{},
|
|
||||||
Peers: []*proto.PeerState{},
|
|
||||||
}
|
|
||||||
|
|
||||||
pbFullStatus.ManagementState.URL = fullStatus.ManagementState.URL
|
|
||||||
pbFullStatus.ManagementState.Connected = fullStatus.ManagementState.Connected
|
|
||||||
if err := fullStatus.ManagementState.Error; err != nil {
|
|
||||||
pbFullStatus.ManagementState.Error = err.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
pbFullStatus.SignalState.URL = fullStatus.SignalState.URL
|
|
||||||
pbFullStatus.SignalState.Connected = fullStatus.SignalState.Connected
|
|
||||||
if err := fullStatus.SignalState.Error; err != nil {
|
|
||||||
pbFullStatus.SignalState.Error = err.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
pbFullStatus.LocalPeerState.IP = fullStatus.LocalPeerState.IP
|
|
||||||
pbFullStatus.LocalPeerState.PubKey = fullStatus.LocalPeerState.PubKey
|
|
||||||
pbFullStatus.LocalPeerState.KernelInterface = fullStatus.LocalPeerState.KernelInterface
|
|
||||||
pbFullStatus.LocalPeerState.Fqdn = fullStatus.LocalPeerState.FQDN
|
|
||||||
pbFullStatus.LocalPeerState.RosenpassPermissive = fullStatus.RosenpassState.Permissive
|
|
||||||
pbFullStatus.LocalPeerState.RosenpassEnabled = fullStatus.RosenpassState.Enabled
|
|
||||||
pbFullStatus.LocalPeerState.Networks = maps.Keys(fullStatus.LocalPeerState.Routes)
|
|
||||||
pbFullStatus.NumberOfForwardingRules = int32(fullStatus.NumOfForwardingRules)
|
|
||||||
pbFullStatus.LazyConnectionEnabled = fullStatus.LazyConnectionEnabled
|
|
||||||
|
|
||||||
for _, peerState := range fullStatus.Peers {
|
|
||||||
pbPeerState := &proto.PeerState{
|
|
||||||
IP: peerState.IP,
|
|
||||||
PubKey: peerState.PubKey,
|
|
||||||
ConnStatus: peerState.ConnStatus.String(),
|
|
||||||
ConnStatusUpdate: timestamppb.New(peerState.ConnStatusUpdate),
|
|
||||||
Relayed: peerState.Relayed,
|
|
||||||
LocalIceCandidateType: peerState.LocalIceCandidateType,
|
|
||||||
RemoteIceCandidateType: peerState.RemoteIceCandidateType,
|
|
||||||
LocalIceCandidateEndpoint: peerState.LocalIceCandidateEndpoint,
|
|
||||||
RemoteIceCandidateEndpoint: peerState.RemoteIceCandidateEndpoint,
|
|
||||||
RelayAddress: peerState.RelayServerAddress,
|
|
||||||
Fqdn: peerState.FQDN,
|
|
||||||
LastWireguardHandshake: timestamppb.New(peerState.LastWireguardHandshake),
|
|
||||||
BytesRx: peerState.BytesRx,
|
|
||||||
BytesTx: peerState.BytesTx,
|
|
||||||
RosenpassEnabled: peerState.RosenpassEnabled,
|
|
||||||
Networks: maps.Keys(peerState.GetRoutes()),
|
|
||||||
Latency: durationpb.New(peerState.Latency),
|
|
||||||
SshHostKey: peerState.SSHHostKey,
|
|
||||||
}
|
|
||||||
pbFullStatus.Peers = append(pbFullStatus.Peers, pbPeerState)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, relayState := range fullStatus.Relays {
|
|
||||||
pbRelayState := &proto.RelayState{
|
|
||||||
URI: relayState.URI,
|
|
||||||
Available: relayState.Err == nil,
|
|
||||||
}
|
|
||||||
if err := relayState.Err; err != nil {
|
|
||||||
pbRelayState.Error = err.Error()
|
|
||||||
}
|
|
||||||
pbFullStatus.Relays = append(pbFullStatus.Relays, pbRelayState)
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, dnsState := range fullStatus.NSGroupStates {
|
|
||||||
var err string
|
|
||||||
if dnsState.Error != nil {
|
|
||||||
err = dnsState.Error.Error()
|
|
||||||
}
|
|
||||||
|
|
||||||
var servers []string
|
|
||||||
for _, server := range dnsState.Servers {
|
|
||||||
servers = append(servers, server.String())
|
|
||||||
}
|
|
||||||
|
|
||||||
pbDnsState := &proto.NSGroupState{
|
|
||||||
Servers: servers,
|
|
||||||
Domains: dnsState.Domains,
|
|
||||||
Enabled: dnsState.Enabled,
|
|
||||||
Error: err,
|
|
||||||
}
|
|
||||||
pbFullStatus.DnsServers = append(pbFullStatus.DnsServers, pbDnsState)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &pbFullStatus
|
|
||||||
}
|
|
||||||
|
|
||||||
// sendTerminalNotification sends a terminal notification message
|
// sendTerminalNotification sends a terminal notification message
|
||||||
// to inform the user that the NetBird connection session has expired.
|
// to inform the user that the NetBird connection session has expired.
|
||||||
func sendTerminalNotification() error {
|
func sendTerminalNotification() error {
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map/controller"
|
"github.com/netbirdio/netbird/management/internals/controllers/network_map/controller"
|
||||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel"
|
"github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel"
|
||||||
nbgrpc "github.com/netbirdio/netbird/management/internals/shared/grpc"
|
nbgrpc "github.com/netbirdio/netbird/management/internals/shared/grpc"
|
||||||
|
"github.com/netbirdio/netbird/management/server/job"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/management/internals/server/config"
|
"github.com/netbirdio/netbird/management/internals/server/config"
|
||||||
"github.com/netbirdio/netbird/management/server/groups"
|
"github.com/netbirdio/netbird/management/server/groups"
|
||||||
@@ -294,6 +295,7 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
|
|||||||
}
|
}
|
||||||
t.Cleanup(cleanUp)
|
t.Cleanup(cleanUp)
|
||||||
|
|
||||||
|
jobManager := job.NewJobManager(nil, store)
|
||||||
eventStore := &activity.InMemoryEventStore{}
|
eventStore := &activity.InMemoryEventStore{}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", err
|
return nil, "", err
|
||||||
@@ -317,13 +319,13 @@ func startManagement(t *testing.T, signalAddr string, counter *int) (*grpc.Serve
|
|||||||
requestBuffer := server.NewAccountRequestBuffer(context.Background(), store)
|
requestBuffer := server.NewAccountRequestBuffer(context.Background(), store)
|
||||||
peersUpdateManager := update_channel.NewPeersUpdateManager(metrics)
|
peersUpdateManager := update_channel.NewPeersUpdateManager(metrics)
|
||||||
networkMapController := controller.NewController(context.Background(), store, metrics, peersUpdateManager, requestBuffer, server.MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock())
|
networkMapController := controller.NewController(context.Background(), store, metrics, peersUpdateManager, requestBuffer, server.MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock())
|
||||||
accountManager, err := server.BuildManager(context.Background(), config, store, networkMapController, nil, "", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
|
accountManager, err := server.BuildManager(context.Background(), config, store, networkMapController, jobManager, nil, "", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", err
|
return nil, "", err
|
||||||
}
|
}
|
||||||
|
|
||||||
secretsManager := nbgrpc.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
|
secretsManager := nbgrpc.NewTimeBasedAuthSecretsManager(peersUpdateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
|
||||||
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, peersUpdateManager, secretsManager, nil, &manager.EphemeralManager{}, nil, &server.MockIntegratedValidator{}, networkMapController)
|
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, peersUpdateManager, jobManager, secretsManager, nil, &manager.EphemeralManager{}, nil, &server.MockIntegratedValidator{}, networkMapController)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, "", err
|
return nil, "", err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,8 +11,12 @@ import (
|
|||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"google.golang.org/protobuf/types/known/durationpb"
|
||||||
|
"google.golang.org/protobuf/types/known/timestamppb"
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
|
|
||||||
|
"golang.org/x/exp/maps"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/client/anonymize"
|
"github.com/netbirdio/netbird/client/anonymize"
|
||||||
"github.com/netbirdio/netbird/client/internal/peer"
|
"github.com/netbirdio/netbird/client/internal/peer"
|
||||||
probeRelay "github.com/netbirdio/netbird/client/internal/relay"
|
probeRelay "github.com/netbirdio/netbird/client/internal/relay"
|
||||||
@@ -115,9 +119,7 @@ type OutputOverview struct {
|
|||||||
SSHServerState SSHServerStateOutput `json:"sshServer" yaml:"sshServer"`
|
SSHServerState SSHServerStateOutput `json:"sshServer" yaml:"sshServer"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func ConvertToStatusOutputOverview(resp *proto.StatusResponse, anon bool, statusFilter string, prefixNamesFilter []string, prefixNamesFilterMap map[string]struct{}, ipsFilter map[string]struct{}, connectionTypeFilter string, profName string) OutputOverview {
|
func ConvertToStatusOutputOverview(pbFullStatus *proto.FullStatus, anon bool, daemonVersion string, statusFilter string, prefixNamesFilter []string, prefixNamesFilterMap map[string]struct{}, ipsFilter map[string]struct{}, connectionTypeFilter string, profName string) OutputOverview {
|
||||||
pbFullStatus := resp.GetFullStatus()
|
|
||||||
|
|
||||||
managementState := pbFullStatus.GetManagementState()
|
managementState := pbFullStatus.GetManagementState()
|
||||||
managementOverview := ManagementStateOutput{
|
managementOverview := ManagementStateOutput{
|
||||||
URL: managementState.GetURL(),
|
URL: managementState.GetURL(),
|
||||||
@@ -133,13 +135,13 @@ func ConvertToStatusOutputOverview(resp *proto.StatusResponse, anon bool, status
|
|||||||
}
|
}
|
||||||
|
|
||||||
relayOverview := mapRelays(pbFullStatus.GetRelays())
|
relayOverview := mapRelays(pbFullStatus.GetRelays())
|
||||||
peersOverview := mapPeers(resp.GetFullStatus().GetPeers(), statusFilter, prefixNamesFilter, prefixNamesFilterMap, ipsFilter, connectionTypeFilter)
|
|
||||||
sshServerOverview := mapSSHServer(pbFullStatus.GetSshServerState())
|
sshServerOverview := mapSSHServer(pbFullStatus.GetSshServerState())
|
||||||
|
peersOverview := mapPeers(pbFullStatus.GetPeers(), statusFilter, prefixNamesFilter, prefixNamesFilterMap, ipsFilter, connectionTypeFilter)
|
||||||
|
|
||||||
overview := OutputOverview{
|
overview := OutputOverview{
|
||||||
Peers: peersOverview,
|
Peers: peersOverview,
|
||||||
CliVersion: version.NetbirdVersion(),
|
CliVersion: version.NetbirdVersion(),
|
||||||
DaemonVersion: resp.GetDaemonVersion(),
|
DaemonVersion: daemonVersion,
|
||||||
ManagementState: managementOverview,
|
ManagementState: managementOverview,
|
||||||
SignalState: signalOverview,
|
SignalState: signalOverview,
|
||||||
Relays: relayOverview,
|
Relays: relayOverview,
|
||||||
@@ -544,6 +546,94 @@ func ParseToFullDetailSummary(overview OutputOverview) string {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func ToProtoFullStatus(fullStatus peer.FullStatus) *proto.FullStatus {
|
||||||
|
pbFullStatus := proto.FullStatus{
|
||||||
|
ManagementState: &proto.ManagementState{},
|
||||||
|
SignalState: &proto.SignalState{},
|
||||||
|
LocalPeerState: &proto.LocalPeerState{},
|
||||||
|
Peers: []*proto.PeerState{},
|
||||||
|
}
|
||||||
|
|
||||||
|
pbFullStatus.ManagementState.URL = fullStatus.ManagementState.URL
|
||||||
|
pbFullStatus.ManagementState.Connected = fullStatus.ManagementState.Connected
|
||||||
|
if err := fullStatus.ManagementState.Error; err != nil {
|
||||||
|
pbFullStatus.ManagementState.Error = err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
pbFullStatus.SignalState.URL = fullStatus.SignalState.URL
|
||||||
|
pbFullStatus.SignalState.Connected = fullStatus.SignalState.Connected
|
||||||
|
if err := fullStatus.SignalState.Error; err != nil {
|
||||||
|
pbFullStatus.SignalState.Error = err.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
pbFullStatus.LocalPeerState.IP = fullStatus.LocalPeerState.IP
|
||||||
|
pbFullStatus.LocalPeerState.PubKey = fullStatus.LocalPeerState.PubKey
|
||||||
|
pbFullStatus.LocalPeerState.KernelInterface = fullStatus.LocalPeerState.KernelInterface
|
||||||
|
pbFullStatus.LocalPeerState.Fqdn = fullStatus.LocalPeerState.FQDN
|
||||||
|
pbFullStatus.LocalPeerState.RosenpassPermissive = fullStatus.RosenpassState.Permissive
|
||||||
|
pbFullStatus.LocalPeerState.RosenpassEnabled = fullStatus.RosenpassState.Enabled
|
||||||
|
pbFullStatus.LocalPeerState.Networks = maps.Keys(fullStatus.LocalPeerState.Routes)
|
||||||
|
pbFullStatus.NumberOfForwardingRules = int32(fullStatus.NumOfForwardingRules)
|
||||||
|
pbFullStatus.LazyConnectionEnabled = fullStatus.LazyConnectionEnabled
|
||||||
|
|
||||||
|
for _, peerState := range fullStatus.Peers {
|
||||||
|
pbPeerState := &proto.PeerState{
|
||||||
|
IP: peerState.IP,
|
||||||
|
PubKey: peerState.PubKey,
|
||||||
|
ConnStatus: peerState.ConnStatus.String(),
|
||||||
|
ConnStatusUpdate: timestamppb.New(peerState.ConnStatusUpdate),
|
||||||
|
Relayed: peerState.Relayed,
|
||||||
|
LocalIceCandidateType: peerState.LocalIceCandidateType,
|
||||||
|
RemoteIceCandidateType: peerState.RemoteIceCandidateType,
|
||||||
|
LocalIceCandidateEndpoint: peerState.LocalIceCandidateEndpoint,
|
||||||
|
RemoteIceCandidateEndpoint: peerState.RemoteIceCandidateEndpoint,
|
||||||
|
RelayAddress: peerState.RelayServerAddress,
|
||||||
|
Fqdn: peerState.FQDN,
|
||||||
|
LastWireguardHandshake: timestamppb.New(peerState.LastWireguardHandshake),
|
||||||
|
BytesRx: peerState.BytesRx,
|
||||||
|
BytesTx: peerState.BytesTx,
|
||||||
|
RosenpassEnabled: peerState.RosenpassEnabled,
|
||||||
|
Networks: maps.Keys(peerState.GetRoutes()),
|
||||||
|
Latency: durationpb.New(peerState.Latency),
|
||||||
|
SshHostKey: peerState.SSHHostKey,
|
||||||
|
}
|
||||||
|
pbFullStatus.Peers = append(pbFullStatus.Peers, pbPeerState)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, relayState := range fullStatus.Relays {
|
||||||
|
pbRelayState := &proto.RelayState{
|
||||||
|
URI: relayState.URI,
|
||||||
|
Available: relayState.Err == nil,
|
||||||
|
}
|
||||||
|
if err := relayState.Err; err != nil {
|
||||||
|
pbRelayState.Error = err.Error()
|
||||||
|
}
|
||||||
|
pbFullStatus.Relays = append(pbFullStatus.Relays, pbRelayState)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, dnsState := range fullStatus.NSGroupStates {
|
||||||
|
var err string
|
||||||
|
if dnsState.Error != nil {
|
||||||
|
err = dnsState.Error.Error()
|
||||||
|
}
|
||||||
|
|
||||||
|
var servers []string
|
||||||
|
for _, server := range dnsState.Servers {
|
||||||
|
servers = append(servers, server.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
pbDnsState := &proto.NSGroupState{
|
||||||
|
Servers: servers,
|
||||||
|
Domains: dnsState.Domains,
|
||||||
|
Enabled: dnsState.Enabled,
|
||||||
|
Error: err,
|
||||||
|
}
|
||||||
|
pbFullStatus.DnsServers = append(pbFullStatus.DnsServers, pbDnsState)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &pbFullStatus
|
||||||
|
}
|
||||||
|
|
||||||
func parsePeers(peers PeersStateOutput, rosenpassEnabled, rosenpassPermissive bool) string {
|
func parsePeers(peers PeersStateOutput, rosenpassEnabled, rosenpassPermissive bool) string {
|
||||||
var (
|
var (
|
||||||
peersString = ""
|
peersString = ""
|
||||||
|
|||||||
@@ -238,7 +238,7 @@ var overview = OutputOverview{
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestConversionFromFullStatusToOutputOverview(t *testing.T) {
|
func TestConversionFromFullStatusToOutputOverview(t *testing.T) {
|
||||||
convertedResult := ConvertToStatusOutputOverview(resp, false, "", nil, nil, nil, "", "")
|
convertedResult := ConvertToStatusOutputOverview(resp.GetFullStatus(), false, resp.GetDaemonVersion(), "", nil, nil, nil, "", "")
|
||||||
|
|
||||||
assert.Equal(t, overview, convertedResult)
|
assert.Equal(t, overview, convertedResult)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,9 +18,7 @@ import (
|
|||||||
"github.com/skratchdot/open-golang/open"
|
"github.com/skratchdot/open-golang/open"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/client/internal"
|
"github.com/netbirdio/netbird/client/internal"
|
||||||
"github.com/netbirdio/netbird/client/internal/profilemanager"
|
|
||||||
"github.com/netbirdio/netbird/client/proto"
|
"github.com/netbirdio/netbird/client/proto"
|
||||||
nbstatus "github.com/netbirdio/netbird/client/status"
|
|
||||||
uptypes "github.com/netbirdio/netbird/upload-server/types"
|
uptypes "github.com/netbirdio/netbird/upload-server/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -291,19 +289,18 @@ func (s *serviceClient) handleRunForDuration(
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
statusOutput, err := s.collectDebugData(conn, initialState, params, progressUI)
|
defer s.restoreServiceState(conn, initialState)
|
||||||
if err != nil {
|
|
||||||
|
if err := s.collectDebugData(conn, initialState, params, progressUI); err != nil {
|
||||||
handleError(progressUI, err.Error())
|
handleError(progressUI, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.createDebugBundleFromCollection(conn, params, statusOutput, progressUI); err != nil {
|
if err := s.createDebugBundleFromCollection(conn, params, progressUI); err != nil {
|
||||||
handleError(progressUI, err.Error())
|
handleError(progressUI, err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
s.restoreServiceState(conn, initialState)
|
|
||||||
|
|
||||||
progressUI.statusLabel.SetText("Bundle created successfully")
|
progressUI.statusLabel.SetText("Bundle created successfully")
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -417,68 +414,33 @@ func (s *serviceClient) collectDebugData(
|
|||||||
state *debugInitialState,
|
state *debugInitialState,
|
||||||
params *debugCollectionParams,
|
params *debugCollectionParams,
|
||||||
progress *progressUI,
|
progress *progressUI,
|
||||||
) (string, error) {
|
) error {
|
||||||
ctx, cancel := context.WithTimeout(s.ctx, params.duration)
|
ctx, cancel := context.WithTimeout(s.ctx, params.duration)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
startProgressTracker(ctx, &wg, params.duration, progress)
|
startProgressTracker(ctx, &wg, params.duration, progress)
|
||||||
|
|
||||||
if err := s.configureServiceForDebug(conn, state, params.enablePersistence); err != nil {
|
if err := s.configureServiceForDebug(conn, state, params.enablePersistence); err != nil {
|
||||||
return "", err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
pm := profilemanager.NewProfileManager()
|
|
||||||
var profName string
|
|
||||||
if activeProf, err := pm.GetActiveProfile(); err == nil {
|
|
||||||
profName = activeProf.Name
|
|
||||||
}
|
|
||||||
|
|
||||||
postUpStatus, err := conn.Status(s.ctx, &proto.StatusRequest{GetFullPeerStatus: true})
|
|
||||||
if err != nil {
|
|
||||||
log.Warnf("Failed to get post-up status: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var postUpStatusOutput string
|
|
||||||
if postUpStatus != nil {
|
|
||||||
overview := nbstatus.ConvertToStatusOutputOverview(postUpStatus, params.anonymize, "", nil, nil, nil, "", profName)
|
|
||||||
postUpStatusOutput = nbstatus.ParseToFullDetailSummary(overview)
|
|
||||||
}
|
|
||||||
headerPostUp := fmt.Sprintf("----- NetBird post-up - Timestamp: %s", time.Now().Format(time.RFC3339))
|
|
||||||
statusOutput := fmt.Sprintf("%s\n%s", headerPostUp, postUpStatusOutput)
|
|
||||||
|
|
||||||
wg.Wait()
|
wg.Wait()
|
||||||
progress.progressBar.Hide()
|
progress.progressBar.Hide()
|
||||||
progress.statusLabel.SetText("Collecting debug data...")
|
progress.statusLabel.SetText("Collecting debug data...")
|
||||||
|
|
||||||
preDownStatus, err := conn.Status(s.ctx, &proto.StatusRequest{GetFullPeerStatus: true})
|
return nil
|
||||||
if err != nil {
|
|
||||||
log.Warnf("Failed to get pre-down status: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var preDownStatusOutput string
|
|
||||||
if preDownStatus != nil {
|
|
||||||
overview := nbstatus.ConvertToStatusOutputOverview(preDownStatus, params.anonymize, "", nil, nil, nil, "", profName)
|
|
||||||
preDownStatusOutput = nbstatus.ParseToFullDetailSummary(overview)
|
|
||||||
}
|
|
||||||
headerPreDown := fmt.Sprintf("----- NetBird pre-down - Timestamp: %s - Duration: %s",
|
|
||||||
time.Now().Format(time.RFC3339), params.duration)
|
|
||||||
statusOutput = fmt.Sprintf("%s\n%s\n%s", statusOutput, headerPreDown, preDownStatusOutput)
|
|
||||||
|
|
||||||
return statusOutput, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create the debug bundle with collected data
|
// Create the debug bundle with collected data
|
||||||
func (s *serviceClient) createDebugBundleFromCollection(
|
func (s *serviceClient) createDebugBundleFromCollection(
|
||||||
conn proto.DaemonServiceClient,
|
conn proto.DaemonServiceClient,
|
||||||
params *debugCollectionParams,
|
params *debugCollectionParams,
|
||||||
statusOutput string,
|
|
||||||
progress *progressUI,
|
progress *progressUI,
|
||||||
) error {
|
) error {
|
||||||
progress.statusLabel.SetText("Creating debug bundle with collected logs...")
|
progress.statusLabel.SetText("Creating debug bundle with collected logs...")
|
||||||
|
|
||||||
request := &proto.DebugBundleRequest{
|
request := &proto.DebugBundleRequest{
|
||||||
Anonymize: params.anonymize,
|
Anonymize: params.anonymize,
|
||||||
Status: statusOutput,
|
|
||||||
SystemInfo: params.systemInfo,
|
SystemInfo: params.systemInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -581,26 +543,8 @@ func (s *serviceClient) createDebugBundle(anonymize bool, systemInfo bool, uploa
|
|||||||
return nil, fmt.Errorf("get client: %v", err)
|
return nil, fmt.Errorf("get client: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
pm := profilemanager.NewProfileManager()
|
|
||||||
var profName string
|
|
||||||
if activeProf, err := pm.GetActiveProfile(); err == nil {
|
|
||||||
profName = activeProf.Name
|
|
||||||
}
|
|
||||||
|
|
||||||
statusResp, err := conn.Status(s.ctx, &proto.StatusRequest{GetFullPeerStatus: true})
|
|
||||||
if err != nil {
|
|
||||||
log.Warnf("failed to get status for debug bundle: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var statusOutput string
|
|
||||||
if statusResp != nil {
|
|
||||||
overview := nbstatus.ConvertToStatusOutputOverview(statusResp, anonymize, "", nil, nil, nil, "", profName)
|
|
||||||
statusOutput = nbstatus.ParseToFullDetailSummary(overview)
|
|
||||||
}
|
|
||||||
|
|
||||||
request := &proto.DebugBundleRequest{
|
request := &proto.DebugBundleRequest{
|
||||||
Anonymize: anonymize,
|
Anonymize: anonymize,
|
||||||
Status: statusOutput,
|
|
||||||
SystemInfo: systemInfo,
|
SystemInfo: systemInfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
2
go.mod
2
go.mod
@@ -66,6 +66,7 @@ require (
|
|||||||
github.com/mitchellh/hashstructure/v2 v2.0.2
|
github.com/mitchellh/hashstructure/v2 v2.0.2
|
||||||
github.com/netbirdio/management-integrations/integrations v0.0.0-20251027212525-d751b79f5d48
|
github.com/netbirdio/management-integrations/integrations v0.0.0-20251027212525-d751b79f5d48
|
||||||
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250805121659-6b4ac470ca45
|
github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250805121659-6b4ac470ca45
|
||||||
|
github.com/oapi-codegen/runtime v1.1.2
|
||||||
github.com/okta/okta-sdk-golang/v2 v2.18.0
|
github.com/okta/okta-sdk-golang/v2 v2.18.0
|
||||||
github.com/oschwald/maxminddb-golang v1.12.0
|
github.com/oschwald/maxminddb-golang v1.12.0
|
||||||
github.com/patrickmn/go-cache v2.1.0+incompatible
|
github.com/patrickmn/go-cache v2.1.0+incompatible
|
||||||
@@ -132,6 +133,7 @@ require (
|
|||||||
github.com/Microsoft/go-winio v0.6.2 // indirect
|
github.com/Microsoft/go-winio v0.6.2 // indirect
|
||||||
github.com/Microsoft/hcsshim v0.12.3 // indirect
|
github.com/Microsoft/hcsshim v0.12.3 // indirect
|
||||||
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect
|
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be // indirect
|
||||||
|
github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect
|
||||||
github.com/awnumar/memcall v0.4.0 // indirect
|
github.com/awnumar/memcall v0.4.0 // indirect
|
||||||
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect
|
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.10 // indirect
|
||||||
github.com/aws/aws-sdk-go-v2/credentials v1.17.67 // indirect
|
github.com/aws/aws-sdk-go-v2/credentials v1.17.67 // indirect
|
||||||
|
|||||||
8
go.sum
8
go.sum
@@ -27,10 +27,13 @@ github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERo
|
|||||||
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
|
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
|
||||||
github.com/Microsoft/hcsshim v0.12.3 h1:LS9NXqXhMoqNCplK1ApmVSfB4UnVLRDWRapB6EIlxE0=
|
github.com/Microsoft/hcsshim v0.12.3 h1:LS9NXqXhMoqNCplK1ApmVSfB4UnVLRDWRapB6EIlxE0=
|
||||||
github.com/Microsoft/hcsshim v0.12.3/go.mod h1:Iyl1WVpZzr+UkzjekHZbV8o5Z9ZkxNGx6CtY2Qg/JVQ=
|
github.com/Microsoft/hcsshim v0.12.3/go.mod h1:Iyl1WVpZzr+UkzjekHZbV8o5Z9ZkxNGx6CtY2Qg/JVQ=
|
||||||
|
github.com/RaveNoX/go-jsoncommentstrip v1.0.0/go.mod h1:78ihd09MekBnJnxpICcwzCMzGrKSKYe4AqU6PDYYpjk=
|
||||||
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible h1:hqcTK6ZISdip65SR792lwYJTa/axESA0889D3UlZbLo=
|
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible h1:hqcTK6ZISdip65SR792lwYJTa/axESA0889D3UlZbLo=
|
||||||
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible/go.mod h1:6B1nuc1MUs6c62ODZDl7hVE5Pv7O2XGSkgg2olnq34I=
|
github.com/TheJumpCloud/jcapi-go v3.0.0+incompatible/go.mod h1:6B1nuc1MUs6c62ODZDl7hVE5Pv7O2XGSkgg2olnq34I=
|
||||||
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
|
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be h1:9AeTilPcZAjCFIImctFaOjnTIavg87rW78vTPkQqLI8=
|
||||||
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
|
github.com/anmitsu/go-shlex v0.0.0-20200514113438-38f4b401e2be/go.mod h1:ySMOLuWl6zY27l47sB3qLNK6tF2fkHG55UZxx8oIVo4=
|
||||||
|
github.com/apapsch/go-jsonmerge/v2 v2.0.0 h1:axGnT1gRIfimI7gJifB699GoE/oq+F2MU7Dml6nw9rQ=
|
||||||
|
github.com/apapsch/go-jsonmerge/v2 v2.0.0/go.mod h1:lvDnEdqiQrp0O42VQGgmlKpxL1AP2+08jFMw88y4klk=
|
||||||
github.com/awnumar/memcall v0.4.0 h1:B7hgZYdfH6Ot1Goaz8jGne/7i8xD4taZie/PNSFZ29g=
|
github.com/awnumar/memcall v0.4.0 h1:B7hgZYdfH6Ot1Goaz8jGne/7i8xD4taZie/PNSFZ29g=
|
||||||
github.com/awnumar/memcall v0.4.0/go.mod h1:8xOx1YbfyuCg3Fy6TO8DK0kZUua3V42/goA5Ru47E8w=
|
github.com/awnumar/memcall v0.4.0/go.mod h1:8xOx1YbfyuCg3Fy6TO8DK0kZUua3V42/goA5Ru47E8w=
|
||||||
github.com/awnumar/memguard v0.23.0 h1:sJ3a1/SWlcuKIQ7MV+R9p0Pvo9CWsMbGZvcZQtmc68A=
|
github.com/awnumar/memguard v0.23.0 h1:sJ3a1/SWlcuKIQ7MV+R9p0Pvo9CWsMbGZvcZQtmc68A=
|
||||||
@@ -75,6 +78,7 @@ github.com/aws/smithy-go v1.22.2 h1:6D9hW43xKFrRx/tXXfAlIZc4JI+yQe6snnWcQyxSyLQ=
|
|||||||
github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
|
github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
|
||||||
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
|
||||||
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
|
||||||
|
github.com/bmatcuk/doublestar v1.1.1/go.mod h1:UD6OnuiIn0yFxxA2le/rnRU1G4RaI4UvFv1sNto9p6w=
|
||||||
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs=
|
||||||
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
github.com/bsm/ginkgo/v2 v2.12.0/go.mod h1:SwYbGRRDovPVboqFv0tPTcG1sN61LM1Z4ARdbAV9g4c=
|
||||||
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
|
||||||
@@ -298,6 +302,7 @@ github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfC
|
|||||||
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
github.com/json-iterator/go v1.1.7/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
|
||||||
github.com/jsummers/gobmp v0.0.0-20230614200233-a9de23ed2e25 h1:YLvr1eE6cdCqjOe972w/cYF+FjW34v27+9Vo5106B4M=
|
github.com/jsummers/gobmp v0.0.0-20230614200233-a9de23ed2e25 h1:YLvr1eE6cdCqjOe972w/cYF+FjW34v27+9Vo5106B4M=
|
||||||
github.com/jsummers/gobmp v0.0.0-20230614200233-a9de23ed2e25/go.mod h1:kLgvv7o6UM+0QSf0QjAse3wReFDsb9qbZJdfexWlrQw=
|
github.com/jsummers/gobmp v0.0.0-20230614200233-a9de23ed2e25/go.mod h1:kLgvv7o6UM+0QSf0QjAse3wReFDsb9qbZJdfexWlrQw=
|
||||||
|
github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d/go.mod h1:2PavIy+JPciBPrBUjwbNvtwB6RQlve+hkpll6QSNmOE=
|
||||||
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
|
github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8=
|
||||||
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
|
github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg=
|
||||||
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
|
||||||
@@ -383,6 +388,8 @@ github.com/nicksnyder/go-i18n/v2 v2.5.1/go.mod h1:DrhgsSDZxoAfvVrBVLXoxZn/pN5TXq
|
|||||||
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
|
||||||
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
|
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
|
||||||
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
|
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
|
||||||
|
github.com/oapi-codegen/runtime v1.1.2 h1:P2+CubHq8fO4Q6fV1tqDBZHCwpVpvPg7oKiYzQgXIyI=
|
||||||
|
github.com/oapi-codegen/runtime v1.1.2/go.mod h1:SK9X900oXmPWilYR5/WKPzt3Kqxn/uS/+lbpREv+eCg=
|
||||||
github.com/okta/okta-sdk-golang/v2 v2.18.0 h1:cfDasMb7CShbZvOrF6n+DnLevWwiHgedWMGJ8M8xKDc=
|
github.com/okta/okta-sdk-golang/v2 v2.18.0 h1:cfDasMb7CShbZvOrF6n+DnLevWwiHgedWMGJ8M8xKDc=
|
||||||
github.com/okta/okta-sdk-golang/v2 v2.18.0/go.mod h1:dz30v3ctAiMb7jpsCngGfQUAEGm1/NsWT92uTbNDQIs=
|
github.com/okta/okta-sdk-golang/v2 v2.18.0/go.mod h1:dz30v3ctAiMb7jpsCngGfQUAEGm1/NsWT92uTbNDQIs=
|
||||||
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
|
||||||
@@ -483,6 +490,7 @@ github.com/spf13/cobra v1.7.0 h1:hyqWnYt1ZQShIddO5kBpj3vu05/++x6tJ6dg8EC572I=
|
|||||||
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
|
github.com/spf13/cobra v1.7.0/go.mod h1:uLxZILRyS/50WlhOIKD7W6V5bgeIt+4sICxh6uRMrb0=
|
||||||
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
|
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
|
||||||
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
|
||||||
|
github.com/spkg/bom v0.0.0-20160624110644-59b7046e48ad/go.mod h1:qLr4V1qq6nMqFKkMo8ZTx3f+BZEkzsRUY10Xsm2mwU0=
|
||||||
github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c h1:km8GpoQut05eY3GiYWEedbTT0qnSxrCjsVbb7yKY1KE=
|
github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c h1:km8GpoQut05eY3GiYWEedbTT0qnSxrCjsVbb7yKY1KE=
|
||||||
github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c/go.mod h1:cNQ3dwVJtS5Hmnjxy6AgTPd0Inb3pW05ftPSX7NZO7Q=
|
github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c/go.mod h1:cNQ3dwVJtS5Hmnjxy6AgTPd0Inb3pW05ftPSX7NZO7Q=
|
||||||
github.com/srwiley/rasterx v0.0.0-20220730225603-2ab79fcdd4ef h1:Ch6Q+AZUxDBCVqdkI8FSpFyZDtCVBc2VmejdNrm5rRQ=
|
github.com/srwiley/rasterx v0.0.0-20220730225603-2ab79fcdd4ef h1:Ch6Q+AZUxDBCVqdkI8FSpFyZDtCVBc2VmejdNrm5rRQ=
|
||||||
|
|||||||
@@ -145,7 +145,7 @@ func (s *BaseServer) GRPCServer() *grpc.Server {
|
|||||||
}
|
}
|
||||||
|
|
||||||
gRPCAPIHandler := grpc.NewServer(gRPCOpts...)
|
gRPCAPIHandler := grpc.NewServer(gRPCOpts...)
|
||||||
srv, err := nbgrpc.NewServer(s.config, s.AccountManager(), s.SettingsManager(), s.PeersUpdateManager(), s.SecretsManager(), s.Metrics(), s.EphemeralManager(), s.AuthManager(), s.IntegratedValidator(), s.NetworkMapController())
|
srv, err := nbgrpc.NewServer(s.config, s.AccountManager(), s.SettingsManager(), s.PeersUpdateManager(), s.JobManager(), s.SecretsManager(), s.Metrics(), s.EphemeralManager(), s.AuthManager(), s.IntegratedValidator(), s.NetworkMapController())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to create management server: %v", err)
|
log.Fatalf("failed to create management server: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
"github.com/netbirdio/management-integrations/integrations"
|
"github.com/netbirdio/management-integrations/integrations"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map"
|
"github.com/netbirdio/netbird/management/internals/controllers/network_map"
|
||||||
nmapcontroller "github.com/netbirdio/netbird/management/internals/controllers/network_map/controller"
|
nmapcontroller "github.com/netbirdio/netbird/management/internals/controllers/network_map/controller"
|
||||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel"
|
"github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel"
|
||||||
@@ -14,6 +15,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/management/server/auth"
|
"github.com/netbirdio/netbird/management/server/auth"
|
||||||
"github.com/netbirdio/netbird/management/server/integrations/integrated_validator"
|
"github.com/netbirdio/netbird/management/server/integrations/integrated_validator"
|
||||||
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
||||||
|
"github.com/netbirdio/netbird/management/server/job"
|
||||||
"github.com/netbirdio/netbird/management/server/peers/ephemeral"
|
"github.com/netbirdio/netbird/management/server/peers/ephemeral"
|
||||||
"github.com/netbirdio/netbird/management/server/peers/ephemeral/manager"
|
"github.com/netbirdio/netbird/management/server/peers/ephemeral/manager"
|
||||||
)
|
)
|
||||||
@@ -24,6 +26,12 @@ func (s *BaseServer) PeersUpdateManager() network_map.PeersUpdateManager {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *BaseServer) JobManager() *job.Manager {
|
||||||
|
return Create(s, func() *job.Manager {
|
||||||
|
return job.NewJobManager(s.Metrics(), s.Store())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func (s *BaseServer) IntegratedValidator() integrated_validator.IntegratedValidator {
|
func (s *BaseServer) IntegratedValidator() integrated_validator.IntegratedValidator {
|
||||||
return Create(s, func() integrated_validator.IntegratedValidator {
|
return Create(s, func() integrated_validator.IntegratedValidator {
|
||||||
integratedPeerValidator, err := integrations.NewIntegratedValidator(
|
integratedPeerValidator, err := integrations.NewIntegratedValidator(
|
||||||
|
|||||||
@@ -66,7 +66,7 @@ func (s *BaseServer) PeersManager() peers.Manager {
|
|||||||
|
|
||||||
func (s *BaseServer) AccountManager() account.Manager {
|
func (s *BaseServer) AccountManager() account.Manager {
|
||||||
return Create(s, func() account.Manager {
|
return Create(s, func() account.Manager {
|
||||||
accountManager, err := server.BuildManager(context.Background(), s.config, s.Store(), s.NetworkMapController(), s.IdpManager(), s.mgmtSingleAccModeDomain, s.EventStore(), s.GeoLocationManager(), s.userDeleteFromIDPEnabled, s.IntegratedValidator(), s.Metrics(), s.ProxyController(), s.SettingsManager(), s.PermissionsManager(), s.config.DisableDefaultPolicy)
|
accountManager, err := server.BuildManager(context.Background(), s.config, s.Store(), s.NetworkMapController(), s.JobManager(), s.IdpManager(), s.mgmtSingleAccModeDomain, s.EventStore(), s.GeoLocationManager(), s.userDeleteFromIDPEnabled, s.IntegratedValidator(), s.Metrics(), s.ProxyController(), s.SettingsManager(), s.PermissionsManager(), s.config.DisableDefaultPolicy)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to create account manager: %v", err)
|
log.Fatalf("failed to create account manager: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
"os"
|
"os"
|
||||||
@@ -24,6 +25,7 @@ import (
|
|||||||
|
|
||||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map"
|
"github.com/netbirdio/netbird/management/internals/controllers/network_map"
|
||||||
nbconfig "github.com/netbirdio/netbird/management/internals/server/config"
|
nbconfig "github.com/netbirdio/netbird/management/internals/server/config"
|
||||||
|
"github.com/netbirdio/netbird/management/server/job"
|
||||||
"github.com/netbirdio/netbird/management/server/peers/ephemeral"
|
"github.com/netbirdio/netbird/management/server/peers/ephemeral"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/management/server/integrations/integrated_validator"
|
"github.com/netbirdio/netbird/management/server/integrations/integrated_validator"
|
||||||
@@ -58,6 +60,7 @@ type Server struct {
|
|||||||
wgKey wgtypes.Key
|
wgKey wgtypes.Key
|
||||||
proto.UnimplementedManagementServiceServer
|
proto.UnimplementedManagementServiceServer
|
||||||
peersUpdateManager network_map.PeersUpdateManager
|
peersUpdateManager network_map.PeersUpdateManager
|
||||||
|
jobManager *job.Manager
|
||||||
config *nbconfig.Config
|
config *nbconfig.Config
|
||||||
secretsManager SecretsManager
|
secretsManager SecretsManager
|
||||||
appMetrics telemetry.AppMetrics
|
appMetrics telemetry.AppMetrics
|
||||||
@@ -83,6 +86,7 @@ func NewServer(
|
|||||||
accountManager account.Manager,
|
accountManager account.Manager,
|
||||||
settingsManager settings.Manager,
|
settingsManager settings.Manager,
|
||||||
peersUpdateManager network_map.PeersUpdateManager,
|
peersUpdateManager network_map.PeersUpdateManager,
|
||||||
|
jobManager *job.Manager,
|
||||||
secretsManager SecretsManager,
|
secretsManager SecretsManager,
|
||||||
appMetrics telemetry.AppMetrics,
|
appMetrics telemetry.AppMetrics,
|
||||||
ephemeralManager ephemeral.Manager,
|
ephemeralManager ephemeral.Manager,
|
||||||
@@ -123,6 +127,7 @@ func NewServer(
|
|||||||
wgKey: key,
|
wgKey: key,
|
||||||
// peerKey -> event channel
|
// peerKey -> event channel
|
||||||
peersUpdateManager: peersUpdateManager,
|
peersUpdateManager: peersUpdateManager,
|
||||||
|
jobManager: jobManager,
|
||||||
accountManager: accountManager,
|
accountManager: accountManager,
|
||||||
settingsManager: settingsManager,
|
settingsManager: settingsManager,
|
||||||
config: config,
|
config: config,
|
||||||
@@ -176,6 +181,43 @@ func getRealIP(ctx context.Context) net.IP {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) Job(srv proto.ManagementService_JobServer) error {
|
||||||
|
reqStart := time.Now()
|
||||||
|
ctx := srv.Context()
|
||||||
|
|
||||||
|
peerKey, err := s.handleHandshake(ctx, srv)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
accountID, err := s.accountManager.GetAccountIDForPeerKey(ctx, peerKey.String())
|
||||||
|
if err != nil {
|
||||||
|
// nolint:staticcheck
|
||||||
|
ctx = context.WithValue(ctx, nbContext.AccountIDKey, "UNKNOWN")
|
||||||
|
log.WithContext(ctx).Tracef("peer %s is not registered", peerKey.String())
|
||||||
|
if errStatus, ok := internalStatus.FromError(err); ok && errStatus.Type() == internalStatus.NotFound {
|
||||||
|
return status.Errorf(codes.PermissionDenied, "peer is not registered")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// nolint:staticcheck
|
||||||
|
ctx = context.WithValue(ctx, nbContext.AccountIDKey, accountID)
|
||||||
|
peer, err := s.accountManager.GetStore().GetPeerByPeerPubKey(ctx, store.LockingStrengthNone, peerKey.String())
|
||||||
|
if err != nil {
|
||||||
|
return status.Errorf(codes.Unauthenticated, "peer is not registered")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start background response handler
|
||||||
|
s.startResponseReceiver(ctx, srv)
|
||||||
|
|
||||||
|
// Prepare per-peer state
|
||||||
|
updates := s.jobManager.CreateJobChannel(ctx, accountID, peer.ID)
|
||||||
|
log.WithContext(ctx).Debugf("Job: took %v", time.Since(reqStart))
|
||||||
|
|
||||||
|
// Main loop: forward jobs to client
|
||||||
|
return s.sendJobsLoop(ctx, accountID, peerKey, peer, updates, srv)
|
||||||
|
}
|
||||||
|
|
||||||
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
|
// Sync validates the existence of a connecting peer, sends an initial state (all available for the connecting peers) and
|
||||||
// notifies the connected peer of any updates (e.g. new peers under the same account)
|
// notifies the connected peer of any updates (e.g. new peers under the same account)
|
||||||
func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
|
func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_SyncServer) error {
|
||||||
@@ -269,6 +311,7 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Prepare per-peer state
|
||||||
updates := s.peersUpdateManager.CreateChannel(ctx, peer.ID)
|
updates := s.peersUpdateManager.CreateChannel(ctx, peer.ID)
|
||||||
|
|
||||||
s.ephemeralManager.OnPeerConnected(ctx, peer)
|
s.ephemeralManager.OnPeerConnected(ctx, peer)
|
||||||
@@ -287,6 +330,76 @@ func (s *Server) Sync(req *proto.EncryptedMessage, srv proto.ManagementService_S
|
|||||||
return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv)
|
return s.handleUpdates(ctx, accountID, peerKey, peer, updates, srv)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleHandshake(ctx context.Context, srv proto.ManagementService_JobServer) (wgtypes.Key, error) {
|
||||||
|
hello, err := srv.Recv()
|
||||||
|
if err != nil {
|
||||||
|
return wgtypes.Key{}, status.Errorf(codes.InvalidArgument, "missing hello: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
jobReq := &proto.JobRequest{}
|
||||||
|
peerKey, err := s.parseRequest(ctx, hello, jobReq)
|
||||||
|
if err != nil {
|
||||||
|
return wgtypes.Key{}, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return peerKey, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) startResponseReceiver(ctx context.Context, srv proto.ManagementService_JobServer) {
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
msg, err := srv.Recv()
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
log.WithContext(ctx).Warnf("recv job response error: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
jobResp := &proto.JobResponse{}
|
||||||
|
if _, err := s.parseRequest(ctx, msg, jobResp); err != nil {
|
||||||
|
log.WithContext(ctx).Warnf("invalid job response: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.jobManager.HandleResponse(ctx, jobResp); err != nil {
|
||||||
|
log.WithContext(ctx).Errorf("handle job response failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) sendJobsLoop(
|
||||||
|
ctx context.Context,
|
||||||
|
accountID string,
|
||||||
|
peerKey wgtypes.Key,
|
||||||
|
peer *nbpeer.Peer,
|
||||||
|
updates <-chan *job.Event,
|
||||||
|
srv proto.ManagementService_JobServer,
|
||||||
|
) error {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case job, open := <-updates:
|
||||||
|
if !open {
|
||||||
|
log.WithContext(ctx).Debugf("jobs channel for peer %s was closed", peerKey.String())
|
||||||
|
s.jobManager.CloseChannel(ctx, accountID, peer.ID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := s.sendJob(ctx, accountID, peerKey, peer, job, srv); err != nil {
|
||||||
|
s.jobManager.CloseChannel(ctx, accountID, peer.ID)
|
||||||
|
log.WithContext(ctx).Warnf("send job failed: %v", err)
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
// happens when connection drops, e.g. client disconnects
|
||||||
|
log.WithContext(ctx).Debugf("stream of peer %s has been closed", peerKey.String())
|
||||||
|
s.jobManager.CloseChannel(ctx, accountID, peer.ID)
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// handleUpdates sends updates to the connected peer until the updates channel is closed.
|
// handleUpdates sends updates to the connected peer until the updates channel is closed.
|
||||||
func (s *Server) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *network_map.UpdateMessage, srv proto.ManagementService_SyncServer) error {
|
func (s *Server) handleUpdates(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, updates chan *network_map.UpdateMessage, srv proto.ManagementService_SyncServer) error {
|
||||||
log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String())
|
log.WithContext(ctx).Tracef("starting to handle updates for peer %s", peerKey.String())
|
||||||
@@ -304,7 +417,6 @@ func (s *Server) handleUpdates(ctx context.Context, accountID string, peerKey wg
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
log.WithContext(ctx).Debugf("received an update for peer %s", peerKey.String())
|
log.WithContext(ctx).Debugf("received an update for peer %s", peerKey.String())
|
||||||
|
|
||||||
if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
|
if err := s.sendUpdate(ctx, accountID, peerKey, peer, update, srv); err != nil {
|
||||||
log.WithContext(ctx).Debugf("error while sending an update to peer %s: %v", peerKey.String(), err)
|
log.WithContext(ctx).Debugf("error while sending an update to peer %s: %v", peerKey.String(), err)
|
||||||
return err
|
return err
|
||||||
@@ -328,7 +440,7 @@ func (s *Server) sendUpdate(ctx context.Context, accountID string, peerKey wgtyp
|
|||||||
s.cancelPeerRoutines(ctx, accountID, peer)
|
s.cancelPeerRoutines(ctx, accountID, peer)
|
||||||
return status.Errorf(codes.Internal, "failed processing update message")
|
return status.Errorf(codes.Internal, "failed processing update message")
|
||||||
}
|
}
|
||||||
err = srv.SendMsg(&proto.EncryptedMessage{
|
err = srv.Send(&proto.EncryptedMessage{
|
||||||
WgPubKey: s.wgKey.PublicKey().String(),
|
WgPubKey: s.wgKey.PublicKey().String(),
|
||||||
Body: encryptedResp,
|
Body: encryptedResp,
|
||||||
})
|
})
|
||||||
@@ -340,6 +452,27 @@ func (s *Server) sendUpdate(ctx context.Context, accountID string, peerKey wgtyp
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// sendJob encrypts the update message using the peer key and the server's wireguard key,
|
||||||
|
// then sends the encrypted message to the connected peer via the sync server.
|
||||||
|
func (s *Server) sendJob(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, job *job.Event, srv proto.ManagementService_JobServer) error {
|
||||||
|
encryptedResp, err := encryption.EncryptMessage(peerKey, s.wgKey, job.Request)
|
||||||
|
if err != nil {
|
||||||
|
log.WithContext(ctx).Errorf("failed to encrypt job for peer %s: %v", peerKey.String(), err)
|
||||||
|
s.jobManager.CloseChannel(ctx, accountID, peer.ID)
|
||||||
|
return status.Errorf(codes.Internal, "failed processing job message")
|
||||||
|
}
|
||||||
|
err = srv.Send(&proto.EncryptedMessage{
|
||||||
|
WgPubKey: s.wgKey.PublicKey().String(),
|
||||||
|
Body: encryptedResp,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
s.jobManager.CloseChannel(ctx, accountID, peer.ID)
|
||||||
|
return status.Errorf(codes.Internal, "failed sending job message")
|
||||||
|
}
|
||||||
|
log.WithContext(ctx).Debugf("sent a job to peer: %s", peerKey.String())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) cancelPeerRoutines(ctx context.Context, accountID string, peer *nbpeer.Peer) {
|
func (s *Server) cancelPeerRoutines(ctx context.Context, accountID string, peer *nbpeer.Peer) {
|
||||||
unlock := s.acquirePeerLockByUID(ctx, peer.Key)
|
unlock := s.acquirePeerLockByUID(ctx, peer.Key)
|
||||||
defer unlock()
|
defer unlock()
|
||||||
@@ -686,8 +819,8 @@ func (s *Server) IsHealthy(ctx context.Context, req *proto.Empty) (*proto.Empty,
|
|||||||
// sendInitialSync sends initial proto.SyncResponse to the peer requesting synchronization
|
// sendInitialSync sends initial proto.SyncResponse to the peer requesting synchronization
|
||||||
func (s *Server) sendInitialSync(ctx context.Context, peerKey wgtypes.Key, peer *nbpeer.Peer, networkMap *types.NetworkMap, postureChecks []*posture.Checks, srv proto.ManagementService_SyncServer, dnsFwdPort int64) error {
|
func (s *Server) sendInitialSync(ctx context.Context, peerKey wgtypes.Key, peer *nbpeer.Peer, networkMap *types.NetworkMap, postureChecks []*posture.Checks, srv proto.ManagementService_SyncServer, dnsFwdPort int64) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
var turnToken *Token
|
var turnToken *Token
|
||||||
|
|
||||||
if s.config.TURNConfig != nil && s.config.TURNConfig.TimeBasedCredentials {
|
if s.config.TURNConfig != nil && s.config.TURNConfig.TimeBasedCredentials {
|
||||||
turnToken, err = s.secretsManager.GenerateTurnToken()
|
turnToken, err = s.secretsManager.GenerateTurnToken()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/management/server/job"
|
||||||
"github.com/netbirdio/netbird/shared/auth"
|
"github.com/netbirdio/netbird/shared/auth"
|
||||||
|
|
||||||
cacheStore "github.com/eko/gocache/lib/v4/store"
|
cacheStore "github.com/eko/gocache/lib/v4/store"
|
||||||
@@ -71,6 +72,7 @@ type DefaultAccountManager struct {
|
|||||||
// cacheLoading keeps the accountIDs that are currently reloading. The accountID has to be removed once cache has been reloaded
|
// cacheLoading keeps the accountIDs that are currently reloading. The accountID has to be removed once cache has been reloaded
|
||||||
cacheLoading map[string]chan struct{}
|
cacheLoading map[string]chan struct{}
|
||||||
networkMapController network_map.Controller
|
networkMapController network_map.Controller
|
||||||
|
jobManager *job.Manager
|
||||||
idpManager idp.Manager
|
idpManager idp.Manager
|
||||||
cacheManager *nbcache.AccountUserDataCache
|
cacheManager *nbcache.AccountUserDataCache
|
||||||
externalCacheManager nbcache.UserDataCache
|
externalCacheManager nbcache.UserDataCache
|
||||||
@@ -180,6 +182,7 @@ func BuildManager(
|
|||||||
config *nbconfig.Config,
|
config *nbconfig.Config,
|
||||||
store store.Store,
|
store store.Store,
|
||||||
networkMapController network_map.Controller,
|
networkMapController network_map.Controller,
|
||||||
|
jobManager *job.Manager,
|
||||||
idpManager idp.Manager,
|
idpManager idp.Manager,
|
||||||
singleAccountModeDomain string,
|
singleAccountModeDomain string,
|
||||||
eventStore activity.Store,
|
eventStore activity.Store,
|
||||||
@@ -202,6 +205,7 @@ func BuildManager(
|
|||||||
config: config,
|
config: config,
|
||||||
geo: geo,
|
geo: geo,
|
||||||
networkMapController: networkMapController,
|
networkMapController: networkMapController,
|
||||||
|
jobManager: jobManager,
|
||||||
idpManager: idpManager,
|
idpManager: idpManager,
|
||||||
ctx: context.Background(),
|
ctx: context.Background(),
|
||||||
cacheMux: sync.Mutex{},
|
cacheMux: sync.Mutex{},
|
||||||
|
|||||||
@@ -124,5 +124,8 @@ type Manager interface {
|
|||||||
UpdateToPrimaryAccount(ctx context.Context, accountId string) error
|
UpdateToPrimaryAccount(ctx context.Context, accountId string) error
|
||||||
GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error)
|
GetOwnerInfo(ctx context.Context, accountId string) (*types.UserInfo, error)
|
||||||
GetCurrentUserInfo(ctx context.Context, userAuth auth.UserAuth) (*users.UserInfoWithPermissions, error)
|
GetCurrentUserInfo(ctx context.Context, userAuth auth.UserAuth) (*users.UserInfoWithPermissions, error)
|
||||||
|
CreatePeerJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error
|
||||||
|
GetAllPeerJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error)
|
||||||
|
GetPeerJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error)
|
||||||
SetEphemeralManager(em ephemeral.Manager)
|
SetEphemeralManager(em ephemeral.Manager)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -31,6 +31,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools"
|
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools"
|
||||||
"github.com/netbirdio/netbird/management/server/idp"
|
"github.com/netbirdio/netbird/management/server/idp"
|
||||||
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
||||||
|
"github.com/netbirdio/netbird/management/server/job"
|
||||||
resourceTypes "github.com/netbirdio/netbird/management/server/networks/resources/types"
|
resourceTypes "github.com/netbirdio/netbird/management/server/networks/resources/types"
|
||||||
routerTypes "github.com/netbirdio/netbird/management/server/networks/routers/types"
|
routerTypes "github.com/netbirdio/netbird/management/server/networks/routers/types"
|
||||||
networkTypes "github.com/netbirdio/netbird/management/server/networks/types"
|
networkTypes "github.com/netbirdio/netbird/management/server/networks/types"
|
||||||
@@ -2959,7 +2960,7 @@ func createManager(t testing.TB) (*DefaultAccountManager, *update_channel.PeersU
|
|||||||
updateManager := update_channel.NewPeersUpdateManager(metrics)
|
updateManager := update_channel.NewPeersUpdateManager(metrics)
|
||||||
requestBuffer := NewAccountRequestBuffer(ctx, store)
|
requestBuffer := NewAccountRequestBuffer(ctx, store)
|
||||||
networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock())
|
networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock())
|
||||||
manager, err := BuildManager(ctx, nil, store, networkMapController, nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
manager, err := BuildManager(ctx, nil, store, networkMapController, job.NewJobManager(nil, store), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -177,8 +177,10 @@ const (
|
|||||||
|
|
||||||
AccountNetworkRangeUpdated Activity = 87
|
AccountNetworkRangeUpdated Activity = 87
|
||||||
PeerIPUpdated Activity = 88
|
PeerIPUpdated Activity = 88
|
||||||
UserApproved Activity = 89
|
|
||||||
UserRejected Activity = 90
|
UserApproved Activity = 89
|
||||||
|
UserRejected Activity = 90
|
||||||
|
JobCreatedByUser Activity = 91
|
||||||
|
|
||||||
AccountDeleted Activity = 99999
|
AccountDeleted Activity = 99999
|
||||||
)
|
)
|
||||||
@@ -288,6 +290,8 @@ var activityMap = map[Activity]Code{
|
|||||||
PeerIPUpdated: {"Peer IP updated", "peer.ip.update"},
|
PeerIPUpdated: {"Peer IP updated", "peer.ip.update"},
|
||||||
UserApproved: {"User approved", "user.approve"},
|
UserApproved: {"User approved", "user.approve"},
|
||||||
UserRejected: {"User rejected", "user.reject"},
|
UserRejected: {"User rejected", "user.reject"},
|
||||||
|
|
||||||
|
JobCreatedByUser: {"Create Job for peer", "peer.job.create"},
|
||||||
}
|
}
|
||||||
|
|
||||||
// StringCode returns a string code of the activity
|
// StringCode returns a string code of the activity
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map/controller"
|
"github.com/netbirdio/netbird/management/internals/controllers/network_map/controller"
|
||||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel"
|
"github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel"
|
||||||
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
||||||
|
"github.com/netbirdio/netbird/management/server/job"
|
||||||
"github.com/netbirdio/netbird/management/server/permissions"
|
"github.com/netbirdio/netbird/management/server/permissions"
|
||||||
"github.com/netbirdio/netbird/management/server/settings"
|
"github.com/netbirdio/netbird/management/server/settings"
|
||||||
"github.com/netbirdio/netbird/management/server/store"
|
"github.com/netbirdio/netbird/management/server/store"
|
||||||
@@ -224,7 +225,7 @@ func createDNSManager(t *testing.T) (*DefaultAccountManager, error) {
|
|||||||
requestBuffer := NewAccountRequestBuffer(ctx, store)
|
requestBuffer := NewAccountRequestBuffer(ctx, store)
|
||||||
networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.test", port_forwarding.NewControllerMock())
|
networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.test", port_forwarding.NewControllerMock())
|
||||||
|
|
||||||
return BuildManager(context.Background(), nil, store, networkMapController, nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
return BuildManager(context.Background(), nil, store, networkMapController, job.NewJobManager(nil, store), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func createDNSStore(t *testing.T) (store.Store, error) {
|
func createDNSStore(t *testing.T) (store.Store, error) {
|
||||||
|
|||||||
@@ -35,6 +35,9 @@ func AddEndpoints(accountManager account.Manager, router *mux.Router, networkMap
|
|||||||
Methods("GET", "PUT", "DELETE", "OPTIONS")
|
Methods("GET", "PUT", "DELETE", "OPTIONS")
|
||||||
router.HandleFunc("/peers/{peerId}/accessible-peers", peersHandler.GetAccessiblePeers).Methods("GET", "OPTIONS")
|
router.HandleFunc("/peers/{peerId}/accessible-peers", peersHandler.GetAccessiblePeers).Methods("GET", "OPTIONS")
|
||||||
router.HandleFunc("/peers/{peerId}/temporary-access", peersHandler.CreateTemporaryAccess).Methods("POST", "OPTIONS")
|
router.HandleFunc("/peers/{peerId}/temporary-access", peersHandler.CreateTemporaryAccess).Methods("POST", "OPTIONS")
|
||||||
|
router.HandleFunc("/peers/{peerId}/jobs", peersHandler.ListJobs).Methods("GET", "OPTIONS")
|
||||||
|
router.HandleFunc("/peers/{peerId}/jobs", peersHandler.CreateJob).Methods("POST", "OPTIONS")
|
||||||
|
router.HandleFunc("/peers/{peerId}/jobs/{jobId}", peersHandler.GetJob).Methods("GET", "OPTIONS")
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHandler creates a new peers Handler
|
// NewHandler creates a new peers Handler
|
||||||
@@ -45,6 +48,99 @@ func NewHandler(accountManager account.Manager, networkMapController network_map
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *Handler) CreateJob(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := r.Context()
|
||||||
|
userAuth, err := nbcontext.GetUserAuthFromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
util.WriteError(ctx, err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
peerID := vars["peerId"]
|
||||||
|
|
||||||
|
req := &api.JobRequest{}
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(req); err != nil {
|
||||||
|
util.WriteErrorResponse("couldn't parse JSON request", http.StatusBadRequest, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
job, err := types.NewJob(userAuth.UserId, userAuth.AccountId, peerID, req)
|
||||||
|
if err != nil {
|
||||||
|
util.WriteError(ctx, err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err := h.accountManager.CreatePeerJob(ctx, userAuth.AccountId, peerID, userAuth.UserId, job); err != nil {
|
||||||
|
util.WriteError(ctx, err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := toSingleJobResponse(job)
|
||||||
|
if err != nil {
|
||||||
|
util.WriteError(ctx, err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
util.WriteJSONObject(ctx, w, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) ListJobs(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := r.Context()
|
||||||
|
userAuth, err := nbcontext.GetUserAuthFromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
util.WriteError(ctx, err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
peerID := vars["peerId"]
|
||||||
|
|
||||||
|
jobs, err := h.accountManager.GetAllPeerJobs(ctx, userAuth.AccountId, userAuth.UserId, peerID)
|
||||||
|
if err != nil {
|
||||||
|
util.WriteError(ctx, err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
respBody := make([]*api.JobResponse, 0, len(jobs))
|
||||||
|
for _, job := range jobs {
|
||||||
|
resp, err := toSingleJobResponse(job)
|
||||||
|
if err != nil {
|
||||||
|
util.WriteError(ctx, err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
respBody = append(respBody, resp)
|
||||||
|
}
|
||||||
|
|
||||||
|
util.WriteJSONObject(ctx, w, respBody)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *Handler) GetJob(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := r.Context()
|
||||||
|
userAuth, err := nbcontext.GetUserAuthFromContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
util.WriteError(ctx, err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
peerID := vars["peerId"]
|
||||||
|
jobID := vars["jobId"]
|
||||||
|
|
||||||
|
job, err := h.accountManager.GetPeerJobByID(ctx, userAuth.AccountId, userAuth.UserId, peerID, jobID)
|
||||||
|
if err != nil {
|
||||||
|
util.WriteError(ctx, err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := toSingleJobResponse(job)
|
||||||
|
if err != nil {
|
||||||
|
util.WriteError(ctx, err, w)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
util.WriteJSONObject(ctx, w, resp)
|
||||||
|
}
|
||||||
|
|
||||||
func (h *Handler) checkPeerStatus(peer *nbpeer.Peer) (*nbpeer.Peer, error) {
|
func (h *Handler) checkPeerStatus(peer *nbpeer.Peer) (*nbpeer.Peer, error) {
|
||||||
peerToReturn := peer.Copy()
|
peerToReturn := peer.Copy()
|
||||||
if peer.Status.Connected {
|
if peer.Status.Connected {
|
||||||
@@ -519,6 +615,28 @@ func toPeerListItemResponse(peer *nbpeer.Peer, groupsInfo []api.GroupMinimum, dn
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func toSingleJobResponse(job *types.Job) (*api.JobResponse, error) {
|
||||||
|
workload, err := job.BuildWorkloadResponse()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var failed *string
|
||||||
|
if job.FailedReason != "" {
|
||||||
|
failed = &job.FailedReason
|
||||||
|
}
|
||||||
|
|
||||||
|
return &api.JobResponse{
|
||||||
|
Id: job.ID,
|
||||||
|
CreatedAt: job.CreatedAt,
|
||||||
|
CompletedAt: job.CompletedAt,
|
||||||
|
TriggeredBy: job.TriggeredBy,
|
||||||
|
Status: api.JobResponseStatus(job.Status),
|
||||||
|
FailedReason: failed,
|
||||||
|
Workload: *workload,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func fqdn(peer *nbpeer.Peer, dnsDomain string) string {
|
func fqdn(peer *nbpeer.Peer, dnsDomain string) string {
|
||||||
fqdn := peer.FQDN(dnsDomain)
|
fqdn := peer.FQDN(dnsDomain)
|
||||||
if fqdn == "" {
|
if fqdn == "" {
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map/controller"
|
"github.com/netbirdio/netbird/management/internals/controllers/network_map/controller"
|
||||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel"
|
"github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel"
|
||||||
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
||||||
|
"github.com/netbirdio/netbird/management/server/job"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/management/server"
|
"github.com/netbirdio/netbird/management/server"
|
||||||
"github.com/netbirdio/netbird/management/server/account"
|
"github.com/netbirdio/netbird/management/server/account"
|
||||||
@@ -49,6 +50,7 @@ func BuildApiBlackBoxWithDBState(t testing_tools.TB, sqlFile string, expectedPee
|
|||||||
}
|
}
|
||||||
|
|
||||||
peersUpdateManager := update_channel.NewPeersUpdateManager(nil)
|
peersUpdateManager := update_channel.NewPeersUpdateManager(nil)
|
||||||
|
jobManager := job.NewJobManager(nil, store)
|
||||||
updMsg := peersUpdateManager.CreateChannel(context.Background(), testing_tools.TestPeerId)
|
updMsg := peersUpdateManager.CreateChannel(context.Background(), testing_tools.TestPeerId)
|
||||||
done := make(chan struct{})
|
done := make(chan struct{})
|
||||||
if validateUpdate {
|
if validateUpdate {
|
||||||
@@ -72,7 +74,7 @@ func BuildApiBlackBoxWithDBState(t testing_tools.TB, sqlFile string, expectedPee
|
|||||||
ctx := context.Background()
|
ctx := context.Background()
|
||||||
requestBuffer := server.NewAccountRequestBuffer(ctx, store)
|
requestBuffer := server.NewAccountRequestBuffer(ctx, store)
|
||||||
networkMapController := controller.NewController(ctx, store, metrics, peersUpdateManager, requestBuffer, server.MockIntegratedValidator{}, settingsManager, "", port_forwarding.NewControllerMock())
|
networkMapController := controller.NewController(ctx, store, metrics, peersUpdateManager, requestBuffer, server.MockIntegratedValidator{}, settingsManager, "", port_forwarding.NewControllerMock())
|
||||||
am, err := server.BuildManager(ctx, nil, store, networkMapController, nil, "", &activity.InMemoryEventStore{}, geoMock, false, validatorMock, metrics, proxyController, settingsManager, permissionsManager, false)
|
am, err := server.BuildManager(ctx, nil, store, networkMapController, jobManager, nil, "", &activity.InMemoryEventStore{}, geoMock, false, validatorMock, metrics, proxyController, settingsManager, permissionsManager, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to create manager: %v", err)
|
t.Fatalf("Failed to create manager: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
175
management/server/job/jobChannel.go
Normal file
175
management/server/job/jobChannel.go
Normal file
@@ -0,0 +1,175 @@
|
|||||||
|
package job
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/management/server/store"
|
||||||
|
"github.com/netbirdio/netbird/management/server/telemetry"
|
||||||
|
"github.com/netbirdio/netbird/management/server/types"
|
||||||
|
"github.com/netbirdio/netbird/shared/management/proto"
|
||||||
|
)
|
||||||
|
|
||||||
|
const jobChannelBuffer = 100
|
||||||
|
|
||||||
|
type Event struct {
|
||||||
|
PeerID string
|
||||||
|
Request *proto.JobRequest
|
||||||
|
Response *proto.JobResponse
|
||||||
|
}
|
||||||
|
|
||||||
|
type Manager struct {
|
||||||
|
mu *sync.RWMutex
|
||||||
|
jobChannels map[string]chan *Event // per-peer job streams
|
||||||
|
pending map[string]*Event // jobID → event
|
||||||
|
responseWait time.Duration
|
||||||
|
metrics telemetry.AppMetrics
|
||||||
|
Store store.Store
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewJobManager(metrics telemetry.AppMetrics, store store.Store) *Manager {
|
||||||
|
|
||||||
|
return &Manager{
|
||||||
|
jobChannels: make(map[string]chan *Event),
|
||||||
|
pending: make(map[string]*Event),
|
||||||
|
responseWait: 5 * time.Minute,
|
||||||
|
metrics: metrics,
|
||||||
|
mu: &sync.RWMutex{},
|
||||||
|
Store: store,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateJobChannel creates or replaces a channel for a peer
|
||||||
|
func (jm *Manager) CreateJobChannel(ctx context.Context, accountID, peerID string) chan *Event {
|
||||||
|
// all pending jobs stored in db for this peer should be failed
|
||||||
|
if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Pending job cleanup: marked as failed automatically due to being stuck too long"); err != nil {
|
||||||
|
log.WithContext(ctx).Error(err.Error())
|
||||||
|
}
|
||||||
|
|
||||||
|
jm.mu.Lock()
|
||||||
|
defer jm.mu.Unlock()
|
||||||
|
|
||||||
|
if ch, ok := jm.jobChannels[peerID]; ok {
|
||||||
|
close(ch)
|
||||||
|
delete(jm.jobChannels, peerID)
|
||||||
|
}
|
||||||
|
|
||||||
|
ch := make(chan *Event, jobChannelBuffer)
|
||||||
|
jm.jobChannels[peerID] = ch
|
||||||
|
return ch
|
||||||
|
}
|
||||||
|
|
||||||
|
// SendJob sends a job to a peer and tracks it as pending
|
||||||
|
func (jm *Manager) SendJob(ctx context.Context, accountID, peerID string, req *proto.JobRequest) error {
|
||||||
|
jm.mu.RLock()
|
||||||
|
ch, ok := jm.jobChannels[peerID]
|
||||||
|
jm.mu.RUnlock()
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("peer %s has no channel", peerID)
|
||||||
|
}
|
||||||
|
|
||||||
|
event := &Event{
|
||||||
|
PeerID: peerID,
|
||||||
|
Request: req,
|
||||||
|
}
|
||||||
|
|
||||||
|
jm.mu.Lock()
|
||||||
|
jm.pending[string(req.ID)] = event
|
||||||
|
jm.mu.Unlock()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case ch <- event:
|
||||||
|
case <-time.After(jm.responseWait):
|
||||||
|
jm.cleanup(ctx, accountID, string(req.ID), "timed out")
|
||||||
|
return fmt.Errorf("job %s timed out", req.ID)
|
||||||
|
case <-ctx.Done():
|
||||||
|
jm.cleanup(ctx, accountID, string(req.ID), ctx.Err().Error())
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// HandleResponse marks a job as finished and moves it to completed
|
||||||
|
func (jm *Manager) HandleResponse(ctx context.Context, resp *proto.JobResponse) error {
|
||||||
|
jm.mu.Lock()
|
||||||
|
defer jm.mu.Unlock()
|
||||||
|
|
||||||
|
// todo: validate job ID and would be nice to use uuid text marshal instead of string
|
||||||
|
jobID := string(resp.ID)
|
||||||
|
|
||||||
|
event, ok := jm.pending[jobID]
|
||||||
|
if !ok {
|
||||||
|
return fmt.Errorf("job %s not found", jobID)
|
||||||
|
}
|
||||||
|
var job types.Job
|
||||||
|
if err := job.ApplyResponse(resp); err != nil {
|
||||||
|
return fmt.Errorf("invalid job response: %v", err)
|
||||||
|
}
|
||||||
|
//update or create the store for job response
|
||||||
|
err := jm.Store.CompletePeerJob(ctx, &job)
|
||||||
|
if err == nil {
|
||||||
|
event.Response = resp
|
||||||
|
}
|
||||||
|
|
||||||
|
delete(jm.pending, jobID)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// CloseChannel closes a peer’s channel and cleans up its jobs
|
||||||
|
func (jm *Manager) CloseChannel(ctx context.Context, accountID, peerID string) {
|
||||||
|
jm.mu.Lock()
|
||||||
|
defer jm.mu.Unlock()
|
||||||
|
|
||||||
|
if ch, ok := jm.jobChannels[peerID]; ok {
|
||||||
|
close(ch)
|
||||||
|
jm.jobChannels[peerID] = nil
|
||||||
|
delete(jm.jobChannels, peerID)
|
||||||
|
}
|
||||||
|
|
||||||
|
for jobID, ev := range jm.pending {
|
||||||
|
if ev.PeerID == peerID {
|
||||||
|
// if the client disconnect and there is pending job then marke it as failed
|
||||||
|
if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Time out peer disconnected"); err != nil {
|
||||||
|
log.WithContext(ctx).Errorf(err.Error())
|
||||||
|
}
|
||||||
|
delete(jm.pending, jobID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanup removes a pending job safely
|
||||||
|
func (jm *Manager) cleanup(ctx context.Context, accountID, jobID string, reason string) {
|
||||||
|
jm.mu.Lock()
|
||||||
|
defer jm.mu.Unlock()
|
||||||
|
|
||||||
|
if ev, ok := jm.pending[jobID]; ok {
|
||||||
|
if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, ev.PeerID, reason); err != nil {
|
||||||
|
log.WithContext(ctx).Errorf(err.Error())
|
||||||
|
}
|
||||||
|
delete(jm.pending, jobID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (jm *Manager) IsPeerConnected(peerID string) bool {
|
||||||
|
jm.mu.RLock()
|
||||||
|
defer jm.mu.RUnlock()
|
||||||
|
|
||||||
|
_, ok := jm.jobChannels[peerID]
|
||||||
|
return ok
|
||||||
|
}
|
||||||
|
|
||||||
|
func (jm *Manager) IsPeerHasPendingJobs(peerID string) bool {
|
||||||
|
jm.mu.RLock()
|
||||||
|
defer jm.mu.RUnlock()
|
||||||
|
|
||||||
|
for _, ev := range jm.pending {
|
||||||
|
if ev.PeerID == peerID {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
@@ -29,6 +29,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/management/server/activity"
|
"github.com/netbirdio/netbird/management/server/activity"
|
||||||
"github.com/netbirdio/netbird/management/server/groups"
|
"github.com/netbirdio/netbird/management/server/groups"
|
||||||
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
||||||
|
"github.com/netbirdio/netbird/management/server/job"
|
||||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||||
"github.com/netbirdio/netbird/management/server/peers/ephemeral/manager"
|
"github.com/netbirdio/netbird/management/server/peers/ephemeral/manager"
|
||||||
"github.com/netbirdio/netbird/management/server/permissions"
|
"github.com/netbirdio/netbird/management/server/permissions"
|
||||||
@@ -338,6 +339,7 @@ func startManagementForTest(t *testing.T, testFile string, config *config.Config
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
jobManager := job.NewJobManager(nil, store)
|
||||||
eventStore := &activity.InMemoryEventStore{}
|
eventStore := &activity.InMemoryEventStore{}
|
||||||
|
|
||||||
ctx := context.WithValue(context.Background(), hook.ExecutionContextKey, hook.SystemSource) //nolint:staticcheck
|
ctx := context.WithValue(context.Background(), hook.ExecutionContextKey, hook.SystemSource) //nolint:staticcheck
|
||||||
@@ -364,7 +366,7 @@ func startManagementForTest(t *testing.T, testFile string, config *config.Config
|
|||||||
updateManager := update_channel.NewPeersUpdateManager(metrics)
|
updateManager := update_channel.NewPeersUpdateManager(metrics)
|
||||||
requestBuffer := NewAccountRequestBuffer(ctx, store)
|
requestBuffer := NewAccountRequestBuffer(ctx, store)
|
||||||
networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock())
|
networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock())
|
||||||
accountManager, err := BuildManager(ctx, nil, store, networkMapController, nil, "",
|
accountManager, err := BuildManager(ctx, nil, store, networkMapController, jobManager, nil, "",
|
||||||
eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -375,7 +377,7 @@ func startManagementForTest(t *testing.T, testFile string, config *config.Config
|
|||||||
secretsManager := nbgrpc.NewTimeBasedAuthSecretsManager(updateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
|
secretsManager := nbgrpc.NewTimeBasedAuthSecretsManager(updateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
|
||||||
|
|
||||||
ephemeralMgr := manager.NewEphemeralManager(store, accountManager)
|
ephemeralMgr := manager.NewEphemeralManager(store, accountManager)
|
||||||
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, updateManager, secretsManager, nil, ephemeralMgr, nil, MockIntegratedValidator{}, networkMapController)
|
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, updateManager, jobManager, secretsManager, nil, ephemeralMgr, nil, MockIntegratedValidator{}, networkMapController)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, "", cleanup, err
|
return nil, nil, "", cleanup, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/management/server/activity"
|
"github.com/netbirdio/netbird/management/server/activity"
|
||||||
"github.com/netbirdio/netbird/management/server/groups"
|
"github.com/netbirdio/netbird/management/server/groups"
|
||||||
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
||||||
|
"github.com/netbirdio/netbird/management/server/job"
|
||||||
"github.com/netbirdio/netbird/management/server/peers/ephemeral/manager"
|
"github.com/netbirdio/netbird/management/server/peers/ephemeral/manager"
|
||||||
"github.com/netbirdio/netbird/management/server/permissions"
|
"github.com/netbirdio/netbird/management/server/permissions"
|
||||||
"github.com/netbirdio/netbird/management/server/settings"
|
"github.com/netbirdio/netbird/management/server/settings"
|
||||||
@@ -179,6 +180,7 @@ func startServer(
|
|||||||
log.Fatalf("failed creating a store: %s: %v", config.Datadir, err)
|
log.Fatalf("failed creating a store: %s: %v", config.Datadir, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
jobManager := job.NewJobManager(nil, str)
|
||||||
eventStore := &activity.InMemoryEventStore{}
|
eventStore := &activity.InMemoryEventStore{}
|
||||||
|
|
||||||
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
|
metrics, err := telemetry.NewDefaultAppMetrics(context.Background())
|
||||||
@@ -212,6 +214,7 @@ func startServer(
|
|||||||
nil,
|
nil,
|
||||||
str,
|
str,
|
||||||
networkMapController,
|
networkMapController,
|
||||||
|
jobManager,
|
||||||
nil,
|
nil,
|
||||||
"",
|
"",
|
||||||
eventStore,
|
eventStore,
|
||||||
@@ -234,6 +237,7 @@ func startServer(
|
|||||||
accountManager,
|
accountManager,
|
||||||
settingsMockManager,
|
settingsMockManager,
|
||||||
updateManager,
|
updateManager,
|
||||||
|
jobManager,
|
||||||
secretsManager,
|
secretsManager,
|
||||||
nil,
|
nil,
|
||||||
&manager.EphemeralManager{},
|
&manager.EphemeralManager{},
|
||||||
|
|||||||
@@ -2,11 +2,12 @@ package mock_server
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"github.com/netbirdio/netbird/shared/auth"
|
|
||||||
"net"
|
"net"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/shared/auth"
|
||||||
|
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
|
|
||||||
@@ -129,6 +130,29 @@ type MockAccountManager struct {
|
|||||||
UpdateAccountPeersFunc func(ctx context.Context, accountID string)
|
UpdateAccountPeersFunc func(ctx context.Context, accountID string)
|
||||||
BufferUpdateAccountPeersFunc func(ctx context.Context, accountID string)
|
BufferUpdateAccountPeersFunc func(ctx context.Context, accountID string)
|
||||||
RecalculateNetworkMapCacheFunc func(ctx context.Context, accountId string) error
|
RecalculateNetworkMapCacheFunc func(ctx context.Context, accountId string) error
|
||||||
|
CreatePeerJobFunc func(ctx context.Context, accountID, peerID, userID string, job *types.Job) error
|
||||||
|
GetAllPeerJobsFunc func(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error)
|
||||||
|
GetPeerJobByIDFunc func(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (am *MockAccountManager) CreatePeerJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error {
|
||||||
|
if am.CreatePeerJobFunc != nil {
|
||||||
|
return am.CreatePeerJobFunc(ctx, accountID, peerID, userID, job)
|
||||||
|
}
|
||||||
|
return status.Errorf(codes.Unimplemented, "method CreatePeerJob is not implemented")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (am *MockAccountManager) GetAllPeerJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error) {
|
||||||
|
if am.GetAllPeerJobsFunc != nil {
|
||||||
|
return am.GetAllPeerJobsFunc(ctx, accountID, userID, peerID)
|
||||||
|
}
|
||||||
|
return nil, status.Errorf(codes.Unimplemented, "method GetAllPeerJobs is not implemented")
|
||||||
|
}
|
||||||
|
func (am *MockAccountManager) GetPeerJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error) {
|
||||||
|
if am.GetPeerJobByIDFunc != nil {
|
||||||
|
return am.GetPeerJobByIDFunc(ctx, accountID, userID, peerID, jobID)
|
||||||
|
}
|
||||||
|
return nil, status.Errorf(codes.Unimplemented, "method GetPeerJobByID is not implemented")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (am *MockAccountManager) CreateGroup(ctx context.Context, accountID, userID string, group *types.Group) error {
|
func (am *MockAccountManager) CreateGroup(ctx context.Context, accountID, userID string, group *types.Group) error {
|
||||||
|
|||||||
@@ -15,6 +15,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel"
|
"github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel"
|
||||||
"github.com/netbirdio/netbird/management/server/activity"
|
"github.com/netbirdio/netbird/management/server/activity"
|
||||||
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
||||||
|
"github.com/netbirdio/netbird/management/server/job"
|
||||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||||
"github.com/netbirdio/netbird/management/server/permissions"
|
"github.com/netbirdio/netbird/management/server/permissions"
|
||||||
"github.com/netbirdio/netbird/management/server/settings"
|
"github.com/netbirdio/netbird/management/server/settings"
|
||||||
@@ -793,7 +794,7 @@ func createNSManager(t *testing.T) (*DefaultAccountManager, error) {
|
|||||||
requestBuffer := NewAccountRequestBuffer(ctx, store)
|
requestBuffer := NewAccountRequestBuffer(ctx, store)
|
||||||
networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock())
|
networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock())
|
||||||
|
|
||||||
return BuildManager(context.Background(), nil, store, networkMapController, nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
return BuildManager(context.Background(), nil, store, networkMapController, job.NewJobManager(nil, store), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func createNSStore(t *testing.T) (store.Store, error) {
|
func createNSStore(t *testing.T) (store.Store, error) {
|
||||||
|
|||||||
@@ -314,6 +314,129 @@ func (am *DefaultAccountManager) UpdatePeer(ctx context.Context, accountID, user
|
|||||||
return peer, nil
|
return peer, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (am *DefaultAccountManager) CreatePeerJob(ctx context.Context, accountID, peerID, userID string, job *types.Job) error {
|
||||||
|
// todo: Create permissions for job
|
||||||
|
allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete)
|
||||||
|
if err != nil {
|
||||||
|
return status.NewPermissionValidationError(err)
|
||||||
|
}
|
||||||
|
if !allowed {
|
||||||
|
return status.NewPermissionDeniedError()
|
||||||
|
}
|
||||||
|
|
||||||
|
peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if peerAccountID != accountID {
|
||||||
|
return status.NewPeerNotPartOfAccountError()
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if peer connected
|
||||||
|
if !am.jobManager.IsPeerConnected(peerID) {
|
||||||
|
return status.Errorf(status.BadRequest, "peer not connected")
|
||||||
|
}
|
||||||
|
|
||||||
|
// check if already has pending jobs
|
||||||
|
if am.jobManager.IsPeerHasPendingJobs(peerID) {
|
||||||
|
return status.Errorf(status.BadRequest, "peer already has pending job")
|
||||||
|
}
|
||||||
|
|
||||||
|
jobStream, err := job.ToStreamJobRequest()
|
||||||
|
if err != nil {
|
||||||
|
return status.Errorf(status.BadRequest, "invalid job request %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// try sending job first
|
||||||
|
if err := am.jobManager.SendJob(ctx, accountID, peerID, jobStream); err != nil {
|
||||||
|
return status.Errorf(status.Internal, "failed to send job: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var peer *nbpeer.Peer
|
||||||
|
var eventsToStore func()
|
||||||
|
|
||||||
|
// persist job in DB only if send succeeded
|
||||||
|
err = am.Store.ExecuteInTransaction(ctx, func(transaction store.Store) error {
|
||||||
|
peer, err = transaction.GetPeerByID(ctx, store.LockingStrengthUpdate, accountID, peerID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := transaction.CreatePeerJob(ctx, job); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
jobMeta := map[string]any{
|
||||||
|
"for_peer_name": peer.Name,
|
||||||
|
"job_type": job.Workload.Type,
|
||||||
|
}
|
||||||
|
|
||||||
|
eventsToStore = func() {
|
||||||
|
am.StoreEvent(ctx, userID, peer.ID, accountID, activity.JobCreatedByUser, jobMeta)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
eventsToStore()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (am *DefaultAccountManager) GetAllPeerJobs(ctx context.Context, accountID, userID, peerID string) ([]*types.Job, error) {
|
||||||
|
// todo: Create permissions for job
|
||||||
|
allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete)
|
||||||
|
if err != nil {
|
||||||
|
return nil, status.NewPermissionValidationError(err)
|
||||||
|
}
|
||||||
|
if !allowed {
|
||||||
|
return nil, status.NewPermissionDeniedError()
|
||||||
|
}
|
||||||
|
|
||||||
|
peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if peerAccountID != accountID {
|
||||||
|
return nil, status.NewPeerNotPartOfAccountError()
|
||||||
|
}
|
||||||
|
|
||||||
|
accountJobs, err := am.Store.GetPeerJobs(ctx, accountID, peerID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return accountJobs, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (am *DefaultAccountManager) GetPeerJobByID(ctx context.Context, accountID, userID, peerID, jobID string) (*types.Job, error) {
|
||||||
|
// todo: Create permissions for job
|
||||||
|
allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete)
|
||||||
|
if err != nil {
|
||||||
|
return nil, status.NewPermissionValidationError(err)
|
||||||
|
}
|
||||||
|
if !allowed {
|
||||||
|
return nil, status.NewPermissionDeniedError()
|
||||||
|
}
|
||||||
|
|
||||||
|
peerAccountID, err := am.Store.GetAccountIDByPeerID(ctx, store.LockingStrengthNone, peerID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if peerAccountID != accountID {
|
||||||
|
return nil, status.NewPeerNotPartOfAccountError()
|
||||||
|
}
|
||||||
|
|
||||||
|
job, err := am.Store.GetPeerJobByID(ctx, accountID, jobID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return job, nil
|
||||||
|
}
|
||||||
|
|
||||||
// DeletePeer removes peer from the account by its IP
|
// DeletePeer removes peer from the account by its IP
|
||||||
func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peerID, userID string) error {
|
func (am *DefaultAccountManager) DeletePeer(ctx context.Context, accountID, peerID, userID string) error {
|
||||||
allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete)
|
allowed, err := am.permissionsManager.ValidateUserPermissions(ctx, accountID, userID, modules.Peers, operations.Delete)
|
||||||
|
|||||||
@@ -32,6 +32,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/management/internals/shared/grpc"
|
"github.com/netbirdio/netbird/management/internals/shared/grpc"
|
||||||
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools"
|
"github.com/netbirdio/netbird/management/server/http/testing/testing_tools"
|
||||||
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
||||||
|
"github.com/netbirdio/netbird/management/server/job"
|
||||||
"github.com/netbirdio/netbird/management/server/permissions"
|
"github.com/netbirdio/netbird/management/server/permissions"
|
||||||
"github.com/netbirdio/netbird/management/server/settings"
|
"github.com/netbirdio/netbird/management/server/settings"
|
||||||
"github.com/netbirdio/netbird/shared/management/status"
|
"github.com/netbirdio/netbird/shared/management/status"
|
||||||
@@ -1228,7 +1229,7 @@ func TestToSyncResponse(t *testing.T) {
|
|||||||
assert.Equal(t, "route1", response.NetworkMap.Routes[0].NetID)
|
assert.Equal(t, "route1", response.NetworkMap.Routes[0].NetID)
|
||||||
// assert network map DNSConfig
|
// assert network map DNSConfig
|
||||||
assert.Equal(t, true, response.NetworkMap.DNSConfig.ServiceEnable)
|
assert.Equal(t, true, response.NetworkMap.DNSConfig.ServiceEnable)
|
||||||
assert.Equal(t, int64(dnsForwarderPort), response.NetworkMap.DNSConfig.ForwarderPort)
|
assert.Equal(t, int64(dnsForwarderPort), response.NetworkMap.DNSConfig.ForwarderPort) //nolint
|
||||||
assert.Equal(t, 1, len(response.NetworkMap.DNSConfig.CustomZones))
|
assert.Equal(t, 1, len(response.NetworkMap.DNSConfig.CustomZones))
|
||||||
assert.Equal(t, 2, len(response.NetworkMap.DNSConfig.NameServerGroups))
|
assert.Equal(t, 2, len(response.NetworkMap.DNSConfig.NameServerGroups))
|
||||||
// assert network map DNSConfig.CustomZones
|
// assert network map DNSConfig.CustomZones
|
||||||
@@ -1292,7 +1293,7 @@ func Test_RegisterPeerByUser(t *testing.T) {
|
|||||||
requestBuffer := NewAccountRequestBuffer(ctx, s)
|
requestBuffer := NewAccountRequestBuffer(ctx, s)
|
||||||
networkMapController := controller.NewController(ctx, s, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock())
|
networkMapController := controller.NewController(ctx, s, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock())
|
||||||
|
|
||||||
am, err := BuildManager(context.Background(), nil, s, networkMapController, nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
am, err := BuildManager(context.Background(), nil, s, networkMapController, job.NewJobManager(nil, s), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
|
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
|
||||||
@@ -1377,7 +1378,7 @@ func Test_RegisterPeerBySetupKey(t *testing.T) {
|
|||||||
requestBuffer := NewAccountRequestBuffer(ctx, s)
|
requestBuffer := NewAccountRequestBuffer(ctx, s)
|
||||||
networkMapController := controller.NewController(ctx, s, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock())
|
networkMapController := controller.NewController(ctx, s, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock())
|
||||||
|
|
||||||
am, err := BuildManager(context.Background(), nil, s, networkMapController, nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
am, err := BuildManager(context.Background(), nil, s, networkMapController, job.NewJobManager(nil, s), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
|
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
|
||||||
@@ -1530,7 +1531,7 @@ func Test_RegisterPeerRollbackOnFailure(t *testing.T) {
|
|||||||
requestBuffer := NewAccountRequestBuffer(ctx, s)
|
requestBuffer := NewAccountRequestBuffer(ctx, s)
|
||||||
networkMapController := controller.NewController(ctx, s, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock())
|
networkMapController := controller.NewController(ctx, s, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock())
|
||||||
|
|
||||||
am, err := BuildManager(context.Background(), nil, s, networkMapController, nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
am, err := BuildManager(context.Background(), nil, s, networkMapController, job.NewJobManager(nil, s), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
|
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
|
||||||
@@ -1610,7 +1611,7 @@ func Test_LoginPeer(t *testing.T) {
|
|||||||
requestBuffer := NewAccountRequestBuffer(ctx, s)
|
requestBuffer := NewAccountRequestBuffer(ctx, s)
|
||||||
networkMapController := controller.NewController(ctx, s, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock())
|
networkMapController := controller.NewController(ctx, s, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.cloud", port_forwarding.NewControllerMock())
|
||||||
|
|
||||||
am, err := BuildManager(context.Background(), nil, s, networkMapController, nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
am, err := BuildManager(context.Background(), nil, s, networkMapController, job.NewJobManager(nil, s), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
|
|
||||||
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
|
existingAccountID := "bf1c8084-ba50-4ce7-9439-34653001fc3b"
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel"
|
"github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel"
|
||||||
"github.com/netbirdio/netbird/management/server/activity"
|
"github.com/netbirdio/netbird/management/server/activity"
|
||||||
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
"github.com/netbirdio/netbird/management/server/integrations/port_forwarding"
|
||||||
|
"github.com/netbirdio/netbird/management/server/job"
|
||||||
resourceTypes "github.com/netbirdio/netbird/management/server/networks/resources/types"
|
resourceTypes "github.com/netbirdio/netbird/management/server/networks/resources/types"
|
||||||
routerTypes "github.com/netbirdio/netbird/management/server/networks/routers/types"
|
routerTypes "github.com/netbirdio/netbird/management/server/networks/routers/types"
|
||||||
networkTypes "github.com/netbirdio/netbird/management/server/networks/types"
|
networkTypes "github.com/netbirdio/netbird/management/server/networks/types"
|
||||||
@@ -1292,7 +1293,7 @@ func createRouterManager(t *testing.T) (*DefaultAccountManager, *update_channel.
|
|||||||
requestBuffer := NewAccountRequestBuffer(ctx, store)
|
requestBuffer := NewAccountRequestBuffer(ctx, store)
|
||||||
networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock())
|
networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock())
|
||||||
|
|
||||||
am, err := BuildManager(context.Background(), nil, store, networkMapController, nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
am, err := BuildManager(context.Background(), nil, store, networkMapController, job.NewJobManager(nil, store), nil, "", eventStore, nil, false, MockIntegratedValidator{}, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManager, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -41,14 +41,15 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
storeSqliteFileName = "store.db"
|
storeSqliteFileName = "store.db"
|
||||||
idQueryCondition = "id = ?"
|
idQueryCondition = "id = ?"
|
||||||
keyQueryCondition = "key = ?"
|
keyQueryCondition = "key = ?"
|
||||||
mysqlKeyQueryCondition = "`key` = ?"
|
mysqlKeyQueryCondition = "`key` = ?"
|
||||||
accountAndIDQueryCondition = "account_id = ? and id = ?"
|
accountAndIDQueryCondition = "account_id = ? and id = ?"
|
||||||
accountAndIDsQueryCondition = "account_id = ? AND id IN ?"
|
accountAndPeerIDQueryCondition = "account_id = ? and peer_id = ?"
|
||||||
accountIDCondition = "account_id = ?"
|
accountAndIDsQueryCondition = "account_id = ? AND id IN ?"
|
||||||
peerNotFoundFMT = "peer %s not found"
|
accountIDCondition = "account_id = ?"
|
||||||
|
peerNotFoundFMT = "peer %s not found"
|
||||||
|
|
||||||
pgMaxConnections = 30
|
pgMaxConnections = 30
|
||||||
pgMinConnections = 1
|
pgMinConnections = 1
|
||||||
@@ -113,6 +114,7 @@ func NewSqlStore(ctx context.Context, db *gorm.DB, storeEngine types.Engine, met
|
|||||||
&types.Account{}, &types.Policy{}, &types.PolicyRule{}, &route.Route{}, &nbdns.NameServerGroup{},
|
&types.Account{}, &types.Policy{}, &types.PolicyRule{}, &route.Route{}, &nbdns.NameServerGroup{},
|
||||||
&installation{}, &types.ExtraSettings{}, &posture.Checks{}, &nbpeer.NetworkAddress{},
|
&installation{}, &types.ExtraSettings{}, &posture.Checks{}, &nbpeer.NetworkAddress{},
|
||||||
&networkTypes.Network{}, &routerTypes.NetworkRouter{}, &resourceTypes.NetworkResource{}, &types.AccountOnboarding{},
|
&networkTypes.Network{}, &routerTypes.NetworkRouter{}, &resourceTypes.NetworkResource{}, &types.AccountOnboarding{},
|
||||||
|
&types.Job{},
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("auto migratePreAuto: %w", err)
|
return nil, fmt.Errorf("auto migratePreAuto: %w", err)
|
||||||
@@ -131,6 +133,79 @@ func GetKeyQueryCondition(s *SqlStore) string {
|
|||||||
return keyQueryCondition
|
return keyQueryCondition
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SaveJob persists a job in DB
|
||||||
|
func (s *SqlStore) CreatePeerJob(ctx context.Context, job *types.Job) error {
|
||||||
|
result := s.db.Create(job)
|
||||||
|
if result.Error != nil {
|
||||||
|
log.WithContext(ctx).Errorf("failed to create job in store: %s", result.Error)
|
||||||
|
return status.Errorf(status.Internal, "failed to create job in store")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *SqlStore) CompletePeerJob(ctx context.Context, job *types.Job) error {
|
||||||
|
result := s.db.
|
||||||
|
Model(&types.Job{}).
|
||||||
|
Where(idQueryCondition, job.ID).
|
||||||
|
Updates(job)
|
||||||
|
|
||||||
|
if result.Error != nil {
|
||||||
|
log.WithContext(ctx).Errorf("failed to update job in store: %s", result.Error)
|
||||||
|
return status.Errorf(status.Internal, "failed to update job in store")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// job was pending for too long and has been cancelled
|
||||||
|
func (s *SqlStore) MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error {
|
||||||
|
now := time.Now().UTC()
|
||||||
|
result := s.db.
|
||||||
|
Model(&types.Job{}).
|
||||||
|
Where(accountAndPeerIDQueryCondition+" AND status = ?", accountID, peerID, types.JobStatusPending).
|
||||||
|
Updates(types.Job{
|
||||||
|
Status: types.JobStatusFailed,
|
||||||
|
FailedReason: reason,
|
||||||
|
CompletedAt: &now,
|
||||||
|
})
|
||||||
|
if result.Error != nil {
|
||||||
|
log.WithContext(ctx).Errorf("failed to mark pending jobs as Failed job in store: %s", result.Error)
|
||||||
|
return status.Errorf(status.Internal, "failed to mark pending job as Failed in store")
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetJobByID fetches job by ID
|
||||||
|
func (s *SqlStore) GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error) {
|
||||||
|
var job types.Job
|
||||||
|
err := s.db.
|
||||||
|
Where(accountAndIDQueryCondition, accountID, jobID).
|
||||||
|
First(&job).Error
|
||||||
|
if errors.Is(err, gorm.ErrRecordNotFound) {
|
||||||
|
return nil, status.Errorf(status.NotFound, "job %s not found", jobID)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
log.WithContext(ctx).Errorf("failed to fetch job from store: %s", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return &job, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// get all jobs
|
||||||
|
func (s *SqlStore) GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error) {
|
||||||
|
var jobs []*types.Job
|
||||||
|
err := s.db.
|
||||||
|
Where(accountAndPeerIDQueryCondition, accountID, peerID).
|
||||||
|
Order("created_at DESC").
|
||||||
|
Find(&jobs).Error
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
log.WithContext(ctx).Errorf("failed to fetch jobs from store: %s", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return jobs, nil
|
||||||
|
}
|
||||||
|
|
||||||
// AcquireGlobalLock acquires global lock across all the accounts and returns a function that releases the lock
|
// AcquireGlobalLock acquires global lock across all the accounts and returns a function that releases the lock
|
||||||
func (s *SqlStore) AcquireGlobalLock(ctx context.Context) (unlock func()) {
|
func (s *SqlStore) AcquireGlobalLock(ctx context.Context) (unlock func()) {
|
||||||
log.WithContext(ctx).Tracef("acquiring global lock")
|
log.WithContext(ctx).Tracef("acquiring global lock")
|
||||||
|
|||||||
@@ -203,6 +203,11 @@ type Store interface {
|
|||||||
MarkAccountPrimary(ctx context.Context, accountID string) error
|
MarkAccountPrimary(ctx context.Context, accountID string) error
|
||||||
UpdateAccountNetwork(ctx context.Context, accountID string, ipNet net.IPNet) error
|
UpdateAccountNetwork(ctx context.Context, accountID string, ipNet net.IPNet) error
|
||||||
GetPolicyRulesByResourceID(ctx context.Context, lockStrength LockingStrength, accountID string, peerID string) ([]*types.PolicyRule, error)
|
GetPolicyRulesByResourceID(ctx context.Context, lockStrength LockingStrength, accountID string, peerID string) ([]*types.PolicyRule, error)
|
||||||
|
CreatePeerJob(ctx context.Context, job *types.Job) error
|
||||||
|
CompletePeerJob(ctx context.Context, job *types.Job) error
|
||||||
|
GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error)
|
||||||
|
GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error)
|
||||||
|
MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error
|
||||||
}
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|||||||
228
management/server/types/job.go
Normal file
228
management/server/types/job.go
Normal file
@@ -0,0 +1,228 @@
|
|||||||
|
package types
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/google/uuid"
|
||||||
|
|
||||||
|
"github.com/netbirdio/netbird/shared/management/http/api"
|
||||||
|
"github.com/netbirdio/netbird/shared/management/proto"
|
||||||
|
"github.com/netbirdio/netbird/shared/management/status"
|
||||||
|
)
|
||||||
|
|
||||||
|
type JobStatus string
|
||||||
|
|
||||||
|
const (
|
||||||
|
JobStatusPending JobStatus = "pending"
|
||||||
|
JobStatusSucceeded JobStatus = "succeeded"
|
||||||
|
JobStatusFailed JobStatus = "failed"
|
||||||
|
)
|
||||||
|
|
||||||
|
type JobType string
|
||||||
|
|
||||||
|
const (
|
||||||
|
JobTypeBundle JobType = "bundle"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// MaxJobReasonLength is the maximum length allowed for job failure reasons
|
||||||
|
MaxJobReasonLength = 4096
|
||||||
|
)
|
||||||
|
|
||||||
|
type Job struct {
|
||||||
|
// ID is the primary identifier
|
||||||
|
ID string `gorm:"primaryKey"`
|
||||||
|
|
||||||
|
// CreatedAt when job was created (UTC)
|
||||||
|
CreatedAt time.Time `gorm:"autoCreateTime"`
|
||||||
|
|
||||||
|
// CompletedAt when job finished, null if still running
|
||||||
|
CompletedAt *time.Time
|
||||||
|
|
||||||
|
// TriggeredBy user that triggered this job
|
||||||
|
TriggeredBy string `gorm:"index"`
|
||||||
|
|
||||||
|
PeerID string `gorm:"index"`
|
||||||
|
|
||||||
|
AccountID string `gorm:"index"`
|
||||||
|
|
||||||
|
// Status of the job: pending, succeeded, failed
|
||||||
|
Status JobStatus `gorm:"index;type:varchar(50)"`
|
||||||
|
|
||||||
|
// FailedReason describes why the job failed (if failed)
|
||||||
|
FailedReason string
|
||||||
|
|
||||||
|
Workload Workload `gorm:"embedded;embeddedPrefix:workload_"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type Workload struct {
|
||||||
|
Type JobType `gorm:"column:workload_type;index;type:varchar(50)"`
|
||||||
|
Parameters json.RawMessage `gorm:"type:json"`
|
||||||
|
Result json.RawMessage `gorm:"type:json"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewJob creates a new job with default fields and validation
|
||||||
|
func NewJob(triggeredBy, accountID, peerID string, req *api.JobRequest) (*Job, error) {
|
||||||
|
if req == nil {
|
||||||
|
return nil, status.Errorf(status.BadRequest, "job request cannot be nil")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine job type
|
||||||
|
jobTypeStr, err := req.Workload.Discriminator()
|
||||||
|
if err != nil {
|
||||||
|
return nil, status.Errorf(status.BadRequest, "could not determine job type: %v", err)
|
||||||
|
}
|
||||||
|
jobType := JobType(jobTypeStr)
|
||||||
|
|
||||||
|
if jobType == "" {
|
||||||
|
return nil, status.Errorf(status.BadRequest, "job type is required")
|
||||||
|
}
|
||||||
|
|
||||||
|
var workload Workload
|
||||||
|
|
||||||
|
switch jobType {
|
||||||
|
case JobTypeBundle:
|
||||||
|
if err := validateAndBuildBundleParams(req.Workload, &workload); err != nil {
|
||||||
|
return nil, status.Errorf(status.BadRequest, "%v", err)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return nil, status.Errorf(status.BadRequest, "unsupported job type: %s", jobType)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Job{
|
||||||
|
ID: uuid.New().String(),
|
||||||
|
TriggeredBy: triggeredBy,
|
||||||
|
PeerID: peerID,
|
||||||
|
AccountID: accountID,
|
||||||
|
Status: JobStatusPending,
|
||||||
|
CreatedAt: time.Now().UTC(),
|
||||||
|
Workload: workload,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Job) BuildWorkloadResponse() (*api.WorkloadResponse, error) {
|
||||||
|
var wl api.WorkloadResponse
|
||||||
|
|
||||||
|
switch j.Workload.Type {
|
||||||
|
case JobTypeBundle:
|
||||||
|
if err := j.buildBundleResponse(&wl); err != nil {
|
||||||
|
return nil, status.Errorf(status.InvalidArgument, err.Error())
|
||||||
|
}
|
||||||
|
return &wl, nil
|
||||||
|
|
||||||
|
default:
|
||||||
|
return nil, status.Errorf(status.InvalidArgument, "unknown job type: %v", j.Workload.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Job) buildBundleResponse(wl *api.WorkloadResponse) error {
|
||||||
|
var p api.BundleParameters
|
||||||
|
if err := json.Unmarshal(j.Workload.Parameters, &p); err != nil {
|
||||||
|
return fmt.Errorf("invalid parameters for bundle job: %w", err)
|
||||||
|
}
|
||||||
|
var r api.BundleResult
|
||||||
|
if err := json.Unmarshal(j.Workload.Result, &r); err != nil {
|
||||||
|
return fmt.Errorf("invalid result for bundle job: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := wl.FromBundleWorkloadResponse(api.BundleWorkloadResponse{
|
||||||
|
Type: api.WorkloadTypeBundle,
|
||||||
|
Parameters: p,
|
||||||
|
Result: r,
|
||||||
|
}); err != nil {
|
||||||
|
return fmt.Errorf("unknown job parameters: %v", err)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func validateAndBuildBundleParams(req api.WorkloadRequest, workload *Workload) error {
|
||||||
|
bundle, err := req.AsBundleWorkloadRequest()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("invalid parameters for bundle job")
|
||||||
|
}
|
||||||
|
// validate bundle_for_time <= 5 minutes if BundleFor is enabled
|
||||||
|
if bundle.Parameters.BundleFor && (bundle.Parameters.BundleForTime < 1 || bundle.Parameters.BundleForTime > 5) {
|
||||||
|
return fmt.Errorf("bundle_for_time must be between 1 and 5, got %d", bundle.Parameters.BundleForTime)
|
||||||
|
}
|
||||||
|
// validate log-file-count ≥ 1 and ≤ 1000
|
||||||
|
if bundle.Parameters.LogFileCount < 1 || bundle.Parameters.LogFileCount > 1000 {
|
||||||
|
return fmt.Errorf("log-file-count must be between 1 and 1000, got %d", bundle.Parameters.LogFileCount)
|
||||||
|
}
|
||||||
|
|
||||||
|
workload.Parameters, err = json.Marshal(bundle.Parameters)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal workload parameters: %w", err)
|
||||||
|
}
|
||||||
|
workload.Result = []byte("{}")
|
||||||
|
workload.Type = JobType(api.WorkloadTypeBundle)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ApplyResponse validates and maps a proto.JobResponse into the Job fields.
|
||||||
|
func (j *Job) ApplyResponse(resp *proto.JobResponse) error {
|
||||||
|
if resp == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
j.ID = string(resp.ID)
|
||||||
|
now := time.Now().UTC()
|
||||||
|
j.CompletedAt = &now
|
||||||
|
switch resp.Status {
|
||||||
|
case proto.JobStatus_succeeded:
|
||||||
|
j.Status = JobStatusSucceeded
|
||||||
|
case proto.JobStatus_failed:
|
||||||
|
j.Status = JobStatusFailed
|
||||||
|
if len(resp.Reason) > 0 {
|
||||||
|
reason := string(resp.Reason)
|
||||||
|
if len(resp.Reason) > MaxJobReasonLength {
|
||||||
|
reason = string(resp.Reason[:MaxJobReasonLength]) + "... (truncated)"
|
||||||
|
}
|
||||||
|
j.FailedReason = fmt.Sprintf("Client error: '%s'", reason)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unexpected job status: %v", resp.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handle workload results (oneof)
|
||||||
|
var err error
|
||||||
|
switch r := resp.WorkloadResults.(type) {
|
||||||
|
case *proto.JobResponse_Bundle:
|
||||||
|
if j.Workload.Result, err = json.Marshal(r.Bundle); err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal workload results: %w", err)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
return fmt.Errorf("unsupported workload response type: %T", r)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Job) ToStreamJobRequest() (*proto.JobRequest, error) {
|
||||||
|
switch j.Workload.Type {
|
||||||
|
case JobTypeBundle:
|
||||||
|
return j.buildStreamBundleResponse()
|
||||||
|
default:
|
||||||
|
return nil, status.Errorf(status.InvalidArgument, "unknown job type: %v", j.Workload.Type)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (j *Job) buildStreamBundleResponse() (*proto.JobRequest, error) {
|
||||||
|
var p api.BundleParameters
|
||||||
|
if err := json.Unmarshal(j.Workload.Parameters, &p); err != nil {
|
||||||
|
return nil, fmt.Errorf("invalid parameters for bundle job: %w", err)
|
||||||
|
}
|
||||||
|
return &proto.JobRequest{
|
||||||
|
ID: []byte(j.ID),
|
||||||
|
WorkloadParameters: &proto.JobRequest_Bundle{
|
||||||
|
Bundle: &proto.BundleParameters{
|
||||||
|
BundleFor: p.BundleFor,
|
||||||
|
BundleForTime: int64(p.BundleForTime),
|
||||||
|
LogFileCount: int32(p.LogFileCount),
|
||||||
|
Anonymize: p.Anonymize,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
@@ -14,6 +14,7 @@ import (
|
|||||||
type Client interface {
|
type Client interface {
|
||||||
io.Closer
|
io.Closer
|
||||||
Sync(ctx context.Context, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error
|
Sync(ctx context.Context, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error
|
||||||
|
Job(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error
|
||||||
GetServerPublicKey() (*wgtypes.Key, error)
|
GetServerPublicKey() (*wgtypes.Key, error)
|
||||||
Register(serverKey wgtypes.Key, setupKey string, jwtToken string, sysInfo *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
|
Register(serverKey wgtypes.Key, setupKey string, jwtToken string, sysInfo *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
|
||||||
Login(serverKey wgtypes.Key, sysInfo *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
|
Login(serverKey wgtypes.Key, sysInfo *system.Info, sshKey []byte, dnsLabels domain.List) (*proto.LoginResponse, error)
|
||||||
|
|||||||
@@ -22,6 +22,7 @@ import (
|
|||||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map/controller"
|
"github.com/netbirdio/netbird/management/internals/controllers/network_map/controller"
|
||||||
"github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel"
|
"github.com/netbirdio/netbird/management/internals/controllers/network_map/update_channel"
|
||||||
nbgrpc "github.com/netbirdio/netbird/management/internals/shared/grpc"
|
nbgrpc "github.com/netbirdio/netbird/management/internals/shared/grpc"
|
||||||
|
"github.com/netbirdio/netbird/management/server/job"
|
||||||
|
|
||||||
"github.com/netbirdio/netbird/client/system"
|
"github.com/netbirdio/netbird/client/system"
|
||||||
"github.com/netbirdio/netbird/encryption"
|
"github.com/netbirdio/netbird/encryption"
|
||||||
@@ -72,6 +73,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
|
|||||||
}
|
}
|
||||||
t.Cleanup(cleanUp)
|
t.Cleanup(cleanUp)
|
||||||
|
|
||||||
|
jobManager := job.NewJobManager(nil, store)
|
||||||
eventStore := &activity.InMemoryEventStore{}
|
eventStore := &activity.InMemoryEventStore{}
|
||||||
|
|
||||||
ctrl := gomock.NewController(t)
|
ctrl := gomock.NewController(t)
|
||||||
@@ -118,7 +120,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
|
|||||||
updateManager := update_channel.NewPeersUpdateManager(metrics)
|
updateManager := update_channel.NewPeersUpdateManager(metrics)
|
||||||
requestBuffer := mgmt.NewAccountRequestBuffer(ctx, store)
|
requestBuffer := mgmt.NewAccountRequestBuffer(ctx, store)
|
||||||
networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, mgmt.MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock())
|
networkMapController := controller.NewController(ctx, store, metrics, updateManager, requestBuffer, mgmt.MockIntegratedValidator{}, settingsMockManager, "netbird.selfhosted", port_forwarding.NewControllerMock())
|
||||||
accountManager, err := mgmt.BuildManager(context.Background(), config, store, networkMapController, nil, "", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
|
accountManager, err := mgmt.BuildManager(context.Background(), config, store, networkMapController, jobManager, nil, "", eventStore, nil, false, ia, metrics, port_forwarding.NewControllerMock(), settingsMockManager, permissionsManagerMock, false)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -126,7 +128,7 @@ func startManagement(t *testing.T) (*grpc.Server, net.Listener) {
|
|||||||
groupsManager := groups.NewManagerMock()
|
groupsManager := groups.NewManagerMock()
|
||||||
|
|
||||||
secretsManager := nbgrpc.NewTimeBasedAuthSecretsManager(updateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
|
secretsManager := nbgrpc.NewTimeBasedAuthSecretsManager(updateManager, config.TURNConfig, config.Relay, settingsMockManager, groupsManager)
|
||||||
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, updateManager, secretsManager, nil, &manager.EphemeralManager{}, nil, mgmt.MockIntegratedValidator{}, networkMapController)
|
mgmtServer, err := nbgrpc.NewServer(config, accountManager, settingsMockManager, updateManager, jobManager, secretsManager, nil, &manager.EphemeralManager{}, nil, mgmt.MockIntegratedValidator{}, networkMapController)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import (
|
|||||||
gstatus "google.golang.org/grpc/status"
|
gstatus "google.golang.org/grpc/status"
|
||||||
|
|
||||||
"github.com/cenkalti/backoff/v4"
|
"github.com/cenkalti/backoff/v4"
|
||||||
|
"github.com/google/uuid"
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
@@ -111,6 +112,25 @@ func (c *GrpcClient) ready() bool {
|
|||||||
// Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages
|
// Sync wraps the real client's Sync endpoint call and takes care of retries and encryption/decryption of messages
|
||||||
// Blocking request. The result will be sent via msgHandler callback function
|
// Blocking request. The result will be sent via msgHandler callback function
|
||||||
func (c *GrpcClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error {
|
func (c *GrpcClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error {
|
||||||
|
return c.withMgmtStream(ctx, func(ctx context.Context, serverPubKey wgtypes.Key) error {
|
||||||
|
return c.handleSyncStream(ctx, serverPubKey, sysInfo, msgHandler)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Job wraps the real client's Job endpoint call and takes care of retries and encryption/decryption of messages
|
||||||
|
// Blocking request. The result will be sent via msgHandler callback function
|
||||||
|
func (c *GrpcClient) Job(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error {
|
||||||
|
return c.withMgmtStream(ctx, func(ctx context.Context, serverPubKey wgtypes.Key) error {
|
||||||
|
return c.handleJobStream(ctx, serverPubKey, msgHandler)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// withMgmtStream runs a streaming operation against the ManagementService
|
||||||
|
// It takes care of retries, connection readiness, and fetching server public key.
|
||||||
|
func (c *GrpcClient) withMgmtStream(
|
||||||
|
ctx context.Context,
|
||||||
|
handler func(ctx context.Context, serverPubKey wgtypes.Key) error,
|
||||||
|
) error {
|
||||||
operation := func() error {
|
operation := func() error {
|
||||||
log.Debugf("management connection state %v", c.conn.GetState())
|
log.Debugf("management connection state %v", c.conn.GetState())
|
||||||
connState := c.conn.GetState()
|
connState := c.conn.GetState()
|
||||||
@@ -128,7 +148,7 @@ func (c *GrpcClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
return c.handleStream(ctx, *serverPubKey, sysInfo, msgHandler)
|
return handler(ctx, *serverPubKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
err := backoff.Retry(operation, defaultBackoff(ctx))
|
err := backoff.Retry(operation, defaultBackoff(ctx))
|
||||||
@@ -139,12 +159,146 @@ func (c *GrpcClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *GrpcClient) handleStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info,
|
func (c *GrpcClient) handleJobStream(
|
||||||
msgHandler func(msg *proto.SyncResponse) error) error {
|
ctx context.Context,
|
||||||
|
serverPubKey wgtypes.Key,
|
||||||
|
msgHandler func(msg *proto.JobRequest) *proto.JobResponse,
|
||||||
|
) error {
|
||||||
ctx, cancelStream := context.WithCancel(ctx)
|
ctx, cancelStream := context.WithCancel(ctx)
|
||||||
defer cancelStream()
|
defer cancelStream()
|
||||||
|
|
||||||
stream, err := c.connectToStream(ctx, serverPubKey, sysInfo)
|
stream, err := c.realClient.Job(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to open job stream: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Handshake with the server
|
||||||
|
if err := c.sendHandshake(ctx, stream, serverPubKey); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debug("job stream handshake sent successfully")
|
||||||
|
|
||||||
|
// Main loop: receive, process, respond
|
||||||
|
for {
|
||||||
|
jobReq, err := c.receiveJobRequest(ctx, stream, serverPubKey)
|
||||||
|
if err != nil {
|
||||||
|
c.notifyDisconnected(err)
|
||||||
|
s, _ := gstatus.FromError(err)
|
||||||
|
switch s.Code() {
|
||||||
|
case codes.PermissionDenied:
|
||||||
|
return backoff.Permanent(err) // unrecoverable error, propagate to the upper layer
|
||||||
|
case codes.Canceled:
|
||||||
|
log.Debugf("management connection context has been canceled, this usually indicates shutdown")
|
||||||
|
return err
|
||||||
|
case codes.Unimplemented:
|
||||||
|
log.Warn("Job feature is not supported by the current management server version. " +
|
||||||
|
"Please update the management service to use this feature.")
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
log.Warnf("disconnected from the Management service but will retry silently. Reason: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if jobReq == nil || len(jobReq.ID) == 0 {
|
||||||
|
log.Debug("received unknown or empty job request, skipping")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("received a new job from the management server (ID: %s)", jobReq.ID)
|
||||||
|
jobResp := c.processJobRequest(ctx, jobReq, msgHandler)
|
||||||
|
if err := c.sendJobResponse(ctx, stream, serverPubKey, jobResp); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendHandshake sends the initial handshake message
|
||||||
|
func (c *GrpcClient) sendHandshake(ctx context.Context, stream proto.ManagementService_JobClient, serverPubKey wgtypes.Key) error {
|
||||||
|
handshakeReq := &proto.JobRequest{
|
||||||
|
ID: []byte(uuid.New().String()),
|
||||||
|
}
|
||||||
|
encHello, err := encryption.EncryptMessage(serverPubKey, c.key, handshakeReq)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to encrypt handshake message: %v", err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return stream.Send(&proto.EncryptedMessage{
|
||||||
|
WgPubKey: c.key.PublicKey().String(),
|
||||||
|
Body: encHello,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// receiveJobRequest waits for and decrypts a job request
|
||||||
|
func (c *GrpcClient) receiveJobRequest(
|
||||||
|
ctx context.Context,
|
||||||
|
stream proto.ManagementService_JobClient,
|
||||||
|
serverPubKey wgtypes.Key,
|
||||||
|
) (*proto.JobRequest, error) {
|
||||||
|
encryptedMsg, err := stream.Recv()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
jobReq := &proto.JobRequest{}
|
||||||
|
if err := encryption.DecryptMessage(serverPubKey, c.key, encryptedMsg.Body, jobReq); err != nil {
|
||||||
|
log.Warnf("failed to decrypt job request: %v", err)
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return jobReq, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// processJobRequest executes the handler and ensures a valid response
|
||||||
|
func (c *GrpcClient) processJobRequest(
|
||||||
|
ctx context.Context,
|
||||||
|
jobReq *proto.JobRequest,
|
||||||
|
msgHandler func(msg *proto.JobRequest) *proto.JobResponse,
|
||||||
|
) *proto.JobResponse {
|
||||||
|
jobResp := msgHandler(jobReq)
|
||||||
|
if jobResp == nil {
|
||||||
|
jobResp = &proto.JobResponse{
|
||||||
|
ID: jobReq.ID,
|
||||||
|
Status: proto.JobStatus_failed,
|
||||||
|
Reason: []byte("handler returned nil response"),
|
||||||
|
}
|
||||||
|
log.Warnf("job handler returned nil for job %s", string(jobReq.ID))
|
||||||
|
}
|
||||||
|
return jobResp
|
||||||
|
}
|
||||||
|
|
||||||
|
// sendJobResponse encrypts and sends a job response
|
||||||
|
func (c *GrpcClient) sendJobResponse(
|
||||||
|
ctx context.Context,
|
||||||
|
stream proto.ManagementService_JobClient,
|
||||||
|
serverPubKey wgtypes.Key,
|
||||||
|
resp *proto.JobResponse,
|
||||||
|
) error {
|
||||||
|
encResp, err := encryption.EncryptMessage(serverPubKey, c.key, resp)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("failed to encrypt job response for job %s: %v", string(resp.ID), err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := stream.Send(&proto.EncryptedMessage{
|
||||||
|
WgPubKey: c.key.PublicKey().String(),
|
||||||
|
Body: encResp,
|
||||||
|
}); err != nil {
|
||||||
|
log.Errorf("failed to send job response for job %s: %v", string(resp.ID), err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Infof("job response sent for job %s (status: %s)", string(resp.ID), resp.Status.String())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *GrpcClient) handleSyncStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info, msgHandler func(msg *proto.SyncResponse) error) error {
|
||||||
|
ctx, cancelStream := context.WithCancel(ctx)
|
||||||
|
defer cancelStream()
|
||||||
|
|
||||||
|
stream, err := c.connectToSyncStream(ctx, serverPubKey, sysInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debugf("failed to open Management Service stream: %s", err)
|
log.Debugf("failed to open Management Service stream: %s", err)
|
||||||
if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied {
|
if s, ok := gstatus.FromError(err); ok && s.Code() == codes.PermissionDenied {
|
||||||
@@ -157,7 +311,7 @@ func (c *GrpcClient) handleStream(ctx context.Context, serverPubKey wgtypes.Key,
|
|||||||
c.notifyConnected()
|
c.notifyConnected()
|
||||||
|
|
||||||
// blocking until error
|
// blocking until error
|
||||||
err = c.receiveEvents(stream, serverPubKey, msgHandler)
|
err = c.receiveUpdatesEvents(stream, serverPubKey, msgHandler)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.notifyDisconnected(err)
|
c.notifyDisconnected(err)
|
||||||
s, _ := gstatus.FromError(err)
|
s, _ := gstatus.FromError(err)
|
||||||
@@ -186,7 +340,7 @@ func (c *GrpcClient) GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, err
|
|||||||
|
|
||||||
ctx, cancelStream := context.WithCancel(c.ctx)
|
ctx, cancelStream := context.WithCancel(c.ctx)
|
||||||
defer cancelStream()
|
defer cancelStream()
|
||||||
stream, err := c.connectToStream(ctx, *serverPubKey, sysInfo)
|
stream, err := c.connectToSyncStream(ctx, *serverPubKey, sysInfo)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Debugf("failed to open Management Service stream: %s", err)
|
log.Debugf("failed to open Management Service stream: %s", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
@@ -219,7 +373,7 @@ func (c *GrpcClient) GetNetworkMap(sysInfo *system.Info) (*proto.NetworkMap, err
|
|||||||
return decryptedResp.GetNetworkMap(), nil
|
return decryptedResp.GetNetworkMap(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *GrpcClient) connectToStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info) (proto.ManagementService_SyncClient, error) {
|
func (c *GrpcClient) connectToSyncStream(ctx context.Context, serverPubKey wgtypes.Key, sysInfo *system.Info) (proto.ManagementService_SyncClient, error) {
|
||||||
req := &proto.SyncRequest{Meta: infoToMetaData(sysInfo)}
|
req := &proto.SyncRequest{Meta: infoToMetaData(sysInfo)}
|
||||||
|
|
||||||
myPrivateKey := c.key
|
myPrivateKey := c.key
|
||||||
@@ -238,7 +392,7 @@ func (c *GrpcClient) connectToStream(ctx context.Context, serverPubKey wgtypes.K
|
|||||||
return sync, nil
|
return sync, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *GrpcClient) receiveEvents(stream proto.ManagementService_SyncClient, serverPubKey wgtypes.Key, msgHandler func(msg *proto.SyncResponse) error) error {
|
func (c *GrpcClient) receiveUpdatesEvents(stream proto.ManagementService_SyncClient, serverPubKey wgtypes.Key, msgHandler func(msg *proto.SyncResponse) error) error {
|
||||||
for {
|
for {
|
||||||
update, err := stream.Recv()
|
update, err := stream.Recv()
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ type MockClient struct {
|
|||||||
GetPKCEAuthorizationFlowFunc func(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error)
|
GetPKCEAuthorizationFlowFunc func(serverKey wgtypes.Key) (*proto.PKCEAuthorizationFlow, error)
|
||||||
SyncMetaFunc func(sysInfo *system.Info) error
|
SyncMetaFunc func(sysInfo *system.Info) error
|
||||||
LogoutFunc func() error
|
LogoutFunc func() error
|
||||||
|
JobFunc func(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *MockClient) IsHealthy() bool {
|
func (m *MockClient) IsHealthy() bool {
|
||||||
@@ -40,6 +41,13 @@ func (m *MockClient) Sync(ctx context.Context, sysInfo *system.Info, msgHandler
|
|||||||
return m.SyncFunc(ctx, sysInfo, msgHandler)
|
return m.SyncFunc(ctx, sysInfo, msgHandler)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *MockClient) Job(ctx context.Context, msgHandler func(msg *proto.JobRequest) *proto.JobResponse) error {
|
||||||
|
if m.JobFunc == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return m.JobFunc(ctx, msgHandler)
|
||||||
|
}
|
||||||
|
|
||||||
func (m *MockClient) GetServerPublicKey() (*wgtypes.Key, error) {
|
func (m *MockClient) GetServerPublicKey() (*wgtypes.Key, error) {
|
||||||
if m.GetServerPublicKeyFunc == nil {
|
if m.GetServerPublicKeyFunc == nil {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
|||||||
@@ -11,6 +11,6 @@ fi
|
|||||||
old_pwd=$(pwd)
|
old_pwd=$(pwd)
|
||||||
script_path=$(dirname $(realpath "$0"))
|
script_path=$(dirname $(realpath "$0"))
|
||||||
cd "$script_path"
|
cd "$script_path"
|
||||||
go install github.com/deepmap/oapi-codegen/cmd/oapi-codegen@4a1477f6a8ba6ca8115cc23bb2fb67f0b9fca18e
|
go install github.com/oapi-codegen/oapi-codegen/v2/cmd/oapi-codegen@latest
|
||||||
oapi-codegen --config cfg.yaml openapi.yml
|
oapi-codegen --config cfg.yaml openapi.yml
|
||||||
cd "$old_pwd"
|
cd "$old_pwd"
|
||||||
|
|||||||
@@ -32,8 +32,128 @@ tags:
|
|||||||
- name: Ingress Ports
|
- name: Ingress Ports
|
||||||
description: Interact with and view information about the ingress peers and ports.
|
description: Interact with and view information about the ingress peers and ports.
|
||||||
x-cloud-only: true
|
x-cloud-only: true
|
||||||
|
- name: Jobs
|
||||||
|
description: Interact with and view information about remote jobs.
|
||||||
|
x-experimental: true
|
||||||
|
|
||||||
components:
|
components:
|
||||||
schemas:
|
schemas:
|
||||||
|
WorkloadType:
|
||||||
|
type: string
|
||||||
|
description: |
|
||||||
|
Identifies the type of workload the job will execute.
|
||||||
|
Currently only `"bundle"` is supported.
|
||||||
|
enum:
|
||||||
|
- bundle
|
||||||
|
example: "bundle"
|
||||||
|
BundleParameters:
|
||||||
|
type: object
|
||||||
|
description: These parameters control what gets included in the bundle and how it is processed.
|
||||||
|
properties:
|
||||||
|
bundle_for:
|
||||||
|
type: boolean
|
||||||
|
description: Whether to generate a bundle for the given timeframe.
|
||||||
|
example: true
|
||||||
|
bundle_for_time:
|
||||||
|
type: integer
|
||||||
|
minimum: 1
|
||||||
|
maximum: 5
|
||||||
|
description: Time period in minutes for which to generate the bundle.
|
||||||
|
example: 2
|
||||||
|
log_file_count:
|
||||||
|
type: integer
|
||||||
|
minimum: 1
|
||||||
|
maximum: 1000
|
||||||
|
description: Maximum number of log files to include in the bundle.
|
||||||
|
example: 100
|
||||||
|
anonymize:
|
||||||
|
type: boolean
|
||||||
|
description: Whether sensitive data should be anonymized in the bundle.
|
||||||
|
example: false
|
||||||
|
required:
|
||||||
|
- bundle_for
|
||||||
|
- bundle_for_time
|
||||||
|
- log_file_count
|
||||||
|
- anonymize
|
||||||
|
BundleResult:
|
||||||
|
type: object
|
||||||
|
properties:
|
||||||
|
upload_key:
|
||||||
|
type: string
|
||||||
|
example: "upload_key_123"
|
||||||
|
nullable: true
|
||||||
|
BundleWorkloadRequest:
|
||||||
|
type: object
|
||||||
|
properties:
|
||||||
|
type:
|
||||||
|
$ref: '#/components/schemas/WorkloadType'
|
||||||
|
parameters:
|
||||||
|
$ref: '#/components/schemas/BundleParameters'
|
||||||
|
required:
|
||||||
|
- type
|
||||||
|
- parameters
|
||||||
|
BundleWorkloadResponse:
|
||||||
|
type: object
|
||||||
|
properties:
|
||||||
|
type:
|
||||||
|
$ref: '#/components/schemas/WorkloadType'
|
||||||
|
parameters:
|
||||||
|
$ref: '#/components/schemas/BundleParameters'
|
||||||
|
result:
|
||||||
|
$ref: '#/components/schemas/BundleResult'
|
||||||
|
required:
|
||||||
|
- type
|
||||||
|
- parameters
|
||||||
|
- result
|
||||||
|
WorkloadRequest:
|
||||||
|
oneOf:
|
||||||
|
- $ref: '#/components/schemas/BundleWorkloadRequest'
|
||||||
|
discriminator:
|
||||||
|
propertyName: type
|
||||||
|
mapping:
|
||||||
|
bundle: '#/components/schemas/BundleWorkloadRequest'
|
||||||
|
WorkloadResponse:
|
||||||
|
oneOf:
|
||||||
|
- $ref: '#/components/schemas/BundleWorkloadResponse'
|
||||||
|
discriminator:
|
||||||
|
propertyName: type
|
||||||
|
mapping:
|
||||||
|
bundle: '#/components/schemas/BundleWorkloadResponse'
|
||||||
|
JobRequest:
|
||||||
|
type: object
|
||||||
|
properties:
|
||||||
|
workload:
|
||||||
|
$ref: '#/components/schemas/WorkloadRequest'
|
||||||
|
required:
|
||||||
|
- workload
|
||||||
|
JobResponse:
|
||||||
|
type: object
|
||||||
|
properties:
|
||||||
|
id:
|
||||||
|
type: string
|
||||||
|
created_at:
|
||||||
|
type: string
|
||||||
|
format: date-time
|
||||||
|
completed_at:
|
||||||
|
type: string
|
||||||
|
format: date-time
|
||||||
|
nullable: true
|
||||||
|
triggered_by:
|
||||||
|
type: string
|
||||||
|
status:
|
||||||
|
type: string
|
||||||
|
enum: [pending, succeeded, failed]
|
||||||
|
failed_reason:
|
||||||
|
type: string
|
||||||
|
nullable: true
|
||||||
|
workload:
|
||||||
|
$ref: '#/components/schemas/WorkloadResponse'
|
||||||
|
required:
|
||||||
|
- id
|
||||||
|
- created_at
|
||||||
|
- status
|
||||||
|
- triggered_by
|
||||||
|
- workload
|
||||||
Account:
|
Account:
|
||||||
type: object
|
type: object
|
||||||
properties:
|
properties:
|
||||||
@@ -2230,6 +2350,110 @@ security:
|
|||||||
- BearerAuth: [ ]
|
- BearerAuth: [ ]
|
||||||
- TokenAuth: [ ]
|
- TokenAuth: [ ]
|
||||||
paths:
|
paths:
|
||||||
|
/api/peers/{peerId}/jobs:
|
||||||
|
get:
|
||||||
|
summary: List Jobs
|
||||||
|
description: Retrieve all jobs for a given peer
|
||||||
|
tags: [ Jobs ]
|
||||||
|
security:
|
||||||
|
- BearerAuth: []
|
||||||
|
- TokenAuth: []
|
||||||
|
parameters:
|
||||||
|
- in: path
|
||||||
|
name: peerId
|
||||||
|
description: The unique identifier of a peer
|
||||||
|
required: true
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
responses:
|
||||||
|
'200':
|
||||||
|
description: List of jobs
|
||||||
|
content:
|
||||||
|
application/json:
|
||||||
|
schema:
|
||||||
|
type: array
|
||||||
|
items:
|
||||||
|
$ref: '#/components/schemas/JobResponse'
|
||||||
|
'400':
|
||||||
|
$ref: '#/components/responses/bad_request'
|
||||||
|
'401':
|
||||||
|
$ref: '#/components/responses/requires_authentication'
|
||||||
|
'403':
|
||||||
|
$ref: '#/components/responses/forbidden'
|
||||||
|
'500':
|
||||||
|
$ref: '#/components/responses/internal_error'
|
||||||
|
post:
|
||||||
|
summary: Create Job
|
||||||
|
description: Create a new job for a given peer
|
||||||
|
tags: [ Jobs ]
|
||||||
|
security:
|
||||||
|
- BearerAuth: []
|
||||||
|
- TokenAuth: []
|
||||||
|
parameters:
|
||||||
|
- in: path
|
||||||
|
name: peerId
|
||||||
|
description: The unique identifier of a peer
|
||||||
|
required: true
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
requestBody:
|
||||||
|
description: Create job request
|
||||||
|
content:
|
||||||
|
application/json:
|
||||||
|
schema:
|
||||||
|
$ref: '#/components/schemas/JobRequest'
|
||||||
|
required: true
|
||||||
|
responses:
|
||||||
|
'201':
|
||||||
|
description: Job created
|
||||||
|
content:
|
||||||
|
application/json:
|
||||||
|
schema:
|
||||||
|
$ref: '#/components/schemas/JobResponse'
|
||||||
|
'400':
|
||||||
|
"$ref": "#/components/responses/bad_request"
|
||||||
|
'401':
|
||||||
|
"$ref": "#/components/responses/requires_authentication"
|
||||||
|
'403':
|
||||||
|
"$ref": "#/components/responses/forbidden"
|
||||||
|
'500':
|
||||||
|
"$ref": "#/components/responses/internal_error"
|
||||||
|
/api/peers/{peerId}/jobs/{jobId}:
|
||||||
|
get:
|
||||||
|
summary: Get Job
|
||||||
|
description: Retrieve details of a specific job
|
||||||
|
tags: [ Jobs ]
|
||||||
|
security:
|
||||||
|
- BearerAuth: []
|
||||||
|
- TokenAuth: []
|
||||||
|
parameters:
|
||||||
|
- in: path
|
||||||
|
name: peerId
|
||||||
|
required: true
|
||||||
|
description: The unique identifier of a peer
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
- in: path
|
||||||
|
name: jobId
|
||||||
|
required: true
|
||||||
|
description: The unique identifier of a job
|
||||||
|
schema:
|
||||||
|
type: string
|
||||||
|
responses:
|
||||||
|
'200':
|
||||||
|
description: A Job object
|
||||||
|
content:
|
||||||
|
application/json:
|
||||||
|
schema:
|
||||||
|
$ref: '#/components/schemas/JobResponse'
|
||||||
|
'400':
|
||||||
|
"$ref": "#/components/responses/bad_request"
|
||||||
|
'401':
|
||||||
|
"$ref": "#/components/responses/requires_authentication"
|
||||||
|
'403':
|
||||||
|
"$ref": "#/components/responses/forbidden"
|
||||||
|
'500':
|
||||||
|
"$ref": "#/components/responses/internal_error"
|
||||||
/api/accounts:
|
/api/accounts:
|
||||||
get:
|
get:
|
||||||
summary: List all Accounts
|
summary: List all Accounts
|
||||||
|
|||||||
@@ -1,10 +1,14 @@
|
|||||||
// Package api provides primitives to interact with the openapi HTTP API.
|
// Package api provides primitives to interact with the openapi HTTP API.
|
||||||
//
|
//
|
||||||
// Code generated by github.com/deepmap/oapi-codegen version v1.11.1-0.20220912230023-4a1477f6a8ba DO NOT EDIT.
|
// Code generated by github.com/oapi-codegen/oapi-codegen/v2 version v2.5.0 DO NOT EDIT.
|
||||||
package api
|
package api
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/oapi-codegen/runtime"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -104,6 +108,13 @@ const (
|
|||||||
IngressPortAllocationRequestPortRangeProtocolUdp IngressPortAllocationRequestPortRangeProtocol = "udp"
|
IngressPortAllocationRequestPortRangeProtocolUdp IngressPortAllocationRequestPortRangeProtocol = "udp"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Defines values for JobResponseStatus.
|
||||||
|
const (
|
||||||
|
JobResponseStatusFailed JobResponseStatus = "failed"
|
||||||
|
JobResponseStatusPending JobResponseStatus = "pending"
|
||||||
|
JobResponseStatusSucceeded JobResponseStatus = "succeeded"
|
||||||
|
)
|
||||||
|
|
||||||
// Defines values for NameserverNsType.
|
// Defines values for NameserverNsType.
|
||||||
const (
|
const (
|
||||||
NameserverNsTypeUdp NameserverNsType = "udp"
|
NameserverNsTypeUdp NameserverNsType = "udp"
|
||||||
@@ -179,6 +190,11 @@ const (
|
|||||||
UserStatusInvited UserStatus = "invited"
|
UserStatusInvited UserStatus = "invited"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Defines values for WorkloadType.
|
||||||
|
const (
|
||||||
|
WorkloadTypeBundle WorkloadType = "bundle"
|
||||||
|
)
|
||||||
|
|
||||||
// Defines values for GetApiEventsNetworkTrafficParamsType.
|
// Defines values for GetApiEventsNetworkTrafficParamsType.
|
||||||
const (
|
const (
|
||||||
GetApiEventsNetworkTrafficParamsTypeTYPEDROP GetApiEventsNetworkTrafficParamsType = "TYPE_DROP"
|
GetApiEventsNetworkTrafficParamsTypeTYPEDROP GetApiEventsNetworkTrafficParamsType = "TYPE_DROP"
|
||||||
@@ -341,6 +357,47 @@ type AvailablePorts struct {
|
|||||||
Udp int `json:"udp"`
|
Udp int `json:"udp"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// BundleParameters These parameters control what gets included in the bundle and how it is processed.
|
||||||
|
type BundleParameters struct {
|
||||||
|
// Anonymize Whether sensitive data should be anonymized in the bundle.
|
||||||
|
Anonymize bool `json:"anonymize"`
|
||||||
|
|
||||||
|
// BundleFor Whether to generate a bundle for the given timeframe.
|
||||||
|
BundleFor bool `json:"bundle_for"`
|
||||||
|
|
||||||
|
// BundleForTime Time period in minutes for which to generate the bundle.
|
||||||
|
BundleForTime int `json:"bundle_for_time"`
|
||||||
|
|
||||||
|
// LogFileCount Maximum number of log files to include in the bundle.
|
||||||
|
LogFileCount int `json:"log_file_count"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// BundleResult defines model for BundleResult.
|
||||||
|
type BundleResult struct {
|
||||||
|
UploadKey *string `json:"upload_key"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// BundleWorkloadRequest defines model for BundleWorkloadRequest.
|
||||||
|
type BundleWorkloadRequest struct {
|
||||||
|
// Parameters These parameters control what gets included in the bundle and how it is processed.
|
||||||
|
Parameters BundleParameters `json:"parameters"`
|
||||||
|
|
||||||
|
// Type Identifies the type of workload the job will execute.
|
||||||
|
// Currently only `"bundle"` is supported.
|
||||||
|
Type WorkloadType `json:"type"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// BundleWorkloadResponse defines model for BundleWorkloadResponse.
|
||||||
|
type BundleWorkloadResponse struct {
|
||||||
|
// Parameters These parameters control what gets included in the bundle and how it is processed.
|
||||||
|
Parameters BundleParameters `json:"parameters"`
|
||||||
|
Result BundleResult `json:"result"`
|
||||||
|
|
||||||
|
// Type Identifies the type of workload the job will execute.
|
||||||
|
// Currently only `"bundle"` is supported.
|
||||||
|
Type WorkloadType `json:"type"`
|
||||||
|
}
|
||||||
|
|
||||||
// Checks List of objects that perform the actual checks
|
// Checks List of objects that perform the actual checks
|
||||||
type Checks struct {
|
type Checks struct {
|
||||||
// GeoLocationCheck Posture check for geo location
|
// GeoLocationCheck Posture check for geo location
|
||||||
@@ -647,6 +704,25 @@ type IngressPortAllocationRequestPortRange struct {
|
|||||||
// IngressPortAllocationRequestPortRangeProtocol The protocol accepted by the port range
|
// IngressPortAllocationRequestPortRangeProtocol The protocol accepted by the port range
|
||||||
type IngressPortAllocationRequestPortRangeProtocol string
|
type IngressPortAllocationRequestPortRangeProtocol string
|
||||||
|
|
||||||
|
// JobRequest defines model for JobRequest.
|
||||||
|
type JobRequest struct {
|
||||||
|
Workload WorkloadRequest `json:"workload"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// JobResponse defines model for JobResponse.
|
||||||
|
type JobResponse struct {
|
||||||
|
CompletedAt *time.Time `json:"completed_at"`
|
||||||
|
CreatedAt time.Time `json:"created_at"`
|
||||||
|
FailedReason *string `json:"failed_reason"`
|
||||||
|
Id string `json:"id"`
|
||||||
|
Status JobResponseStatus `json:"status"`
|
||||||
|
TriggeredBy string `json:"triggered_by"`
|
||||||
|
Workload WorkloadResponse `json:"workload"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// JobResponseStatus defines model for JobResponse.Status.
|
||||||
|
type JobResponseStatus string
|
||||||
|
|
||||||
// Location Describe geographical location information
|
// Location Describe geographical location information
|
||||||
type Location struct {
|
type Location struct {
|
||||||
// CityName Commonly used English name of the city
|
// CityName Commonly used English name of the city
|
||||||
@@ -1863,6 +1939,20 @@ type UserRequest struct {
|
|||||||
Role string `json:"role"`
|
Role string `json:"role"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WorkloadRequest defines model for WorkloadRequest.
|
||||||
|
type WorkloadRequest struct {
|
||||||
|
union json.RawMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
// WorkloadResponse defines model for WorkloadResponse.
|
||||||
|
type WorkloadResponse struct {
|
||||||
|
union json.RawMessage
|
||||||
|
}
|
||||||
|
|
||||||
|
// WorkloadType Identifies the type of workload the job will execute.
|
||||||
|
// Currently only `"bundle"` is supported.
|
||||||
|
type WorkloadType string
|
||||||
|
|
||||||
// GetApiEventsNetworkTrafficParams defines parameters for GetApiEventsNetworkTraffic.
|
// GetApiEventsNetworkTrafficParams defines parameters for GetApiEventsNetworkTraffic.
|
||||||
type GetApiEventsNetworkTrafficParams struct {
|
type GetApiEventsNetworkTrafficParams struct {
|
||||||
// Page Page number
|
// Page Page number
|
||||||
@@ -1980,6 +2070,9 @@ type PostApiPeersPeerIdIngressPortsJSONRequestBody = IngressPortAllocationReques
|
|||||||
// PutApiPeersPeerIdIngressPortsAllocationIdJSONRequestBody defines body for PutApiPeersPeerIdIngressPortsAllocationId for application/json ContentType.
|
// PutApiPeersPeerIdIngressPortsAllocationIdJSONRequestBody defines body for PutApiPeersPeerIdIngressPortsAllocationId for application/json ContentType.
|
||||||
type PutApiPeersPeerIdIngressPortsAllocationIdJSONRequestBody = IngressPortAllocationRequest
|
type PutApiPeersPeerIdIngressPortsAllocationIdJSONRequestBody = IngressPortAllocationRequest
|
||||||
|
|
||||||
|
// PostApiPeersPeerIdJobsJSONRequestBody defines body for PostApiPeersPeerIdJobs for application/json ContentType.
|
||||||
|
type PostApiPeersPeerIdJobsJSONRequestBody = JobRequest
|
||||||
|
|
||||||
// PostApiPeersPeerIdTemporaryAccessJSONRequestBody defines body for PostApiPeersPeerIdTemporaryAccess for application/json ContentType.
|
// PostApiPeersPeerIdTemporaryAccessJSONRequestBody defines body for PostApiPeersPeerIdTemporaryAccess for application/json ContentType.
|
||||||
type PostApiPeersPeerIdTemporaryAccessJSONRequestBody = PeerTemporaryAccessRequest
|
type PostApiPeersPeerIdTemporaryAccessJSONRequestBody = PeerTemporaryAccessRequest
|
||||||
|
|
||||||
@@ -2015,3 +2108,121 @@ type PutApiUsersUserIdJSONRequestBody = UserRequest
|
|||||||
|
|
||||||
// PostApiUsersUserIdTokensJSONRequestBody defines body for PostApiUsersUserIdTokens for application/json ContentType.
|
// PostApiUsersUserIdTokensJSONRequestBody defines body for PostApiUsersUserIdTokens for application/json ContentType.
|
||||||
type PostApiUsersUserIdTokensJSONRequestBody = PersonalAccessTokenRequest
|
type PostApiUsersUserIdTokensJSONRequestBody = PersonalAccessTokenRequest
|
||||||
|
|
||||||
|
// AsBundleWorkloadRequest returns the union data inside the WorkloadRequest as a BundleWorkloadRequest
|
||||||
|
func (t WorkloadRequest) AsBundleWorkloadRequest() (BundleWorkloadRequest, error) {
|
||||||
|
var body BundleWorkloadRequest
|
||||||
|
err := json.Unmarshal(t.union, &body)
|
||||||
|
return body, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromBundleWorkloadRequest overwrites any union data inside the WorkloadRequest as the provided BundleWorkloadRequest
|
||||||
|
func (t *WorkloadRequest) FromBundleWorkloadRequest(v BundleWorkloadRequest) error {
|
||||||
|
v.Type = "bundle"
|
||||||
|
b, err := json.Marshal(v)
|
||||||
|
t.union = b
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// MergeBundleWorkloadRequest performs a merge with any union data inside the WorkloadRequest, using the provided BundleWorkloadRequest
|
||||||
|
func (t *WorkloadRequest) MergeBundleWorkloadRequest(v BundleWorkloadRequest) error {
|
||||||
|
v.Type = "bundle"
|
||||||
|
b, err := json.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
merged, err := runtime.JSONMerge(t.union, b)
|
||||||
|
t.union = merged
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t WorkloadRequest) Discriminator() (string, error) {
|
||||||
|
var discriminator struct {
|
||||||
|
Discriminator string `json:"type"`
|
||||||
|
}
|
||||||
|
err := json.Unmarshal(t.union, &discriminator)
|
||||||
|
return discriminator.Discriminator, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t WorkloadRequest) ValueByDiscriminator() (interface{}, error) {
|
||||||
|
discriminator, err := t.Discriminator()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
switch discriminator {
|
||||||
|
case "bundle":
|
||||||
|
return t.AsBundleWorkloadRequest()
|
||||||
|
default:
|
||||||
|
return nil, errors.New("unknown discriminator value: " + discriminator)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t WorkloadRequest) MarshalJSON() ([]byte, error) {
|
||||||
|
b, err := t.union.MarshalJSON()
|
||||||
|
return b, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *WorkloadRequest) UnmarshalJSON(b []byte) error {
|
||||||
|
err := t.union.UnmarshalJSON(b)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// AsBundleWorkloadResponse returns the union data inside the WorkloadResponse as a BundleWorkloadResponse
|
||||||
|
func (t WorkloadResponse) AsBundleWorkloadResponse() (BundleWorkloadResponse, error) {
|
||||||
|
var body BundleWorkloadResponse
|
||||||
|
err := json.Unmarshal(t.union, &body)
|
||||||
|
return body, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromBundleWorkloadResponse overwrites any union data inside the WorkloadResponse as the provided BundleWorkloadResponse
|
||||||
|
func (t *WorkloadResponse) FromBundleWorkloadResponse(v BundleWorkloadResponse) error {
|
||||||
|
v.Type = "bundle"
|
||||||
|
b, err := json.Marshal(v)
|
||||||
|
t.union = b
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// MergeBundleWorkloadResponse performs a merge with any union data inside the WorkloadResponse, using the provided BundleWorkloadResponse
|
||||||
|
func (t *WorkloadResponse) MergeBundleWorkloadResponse(v BundleWorkloadResponse) error {
|
||||||
|
v.Type = "bundle"
|
||||||
|
b, err := json.Marshal(v)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
merged, err := runtime.JSONMerge(t.union, b)
|
||||||
|
t.union = merged
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t WorkloadResponse) Discriminator() (string, error) {
|
||||||
|
var discriminator struct {
|
||||||
|
Discriminator string `json:"type"`
|
||||||
|
}
|
||||||
|
err := json.Unmarshal(t.union, &discriminator)
|
||||||
|
return discriminator.Discriminator, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t WorkloadResponse) ValueByDiscriminator() (interface{}, error) {
|
||||||
|
discriminator, err := t.Discriminator()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
switch discriminator {
|
||||||
|
case "bundle":
|
||||||
|
return t.AsBundleWorkloadResponse()
|
||||||
|
default:
|
||||||
|
return nil, errors.New("unknown discriminator value: " + discriminator)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t WorkloadResponse) MarshalJSON() ([]byte, error) {
|
||||||
|
b, err := t.union.MarshalJSON()
|
||||||
|
return b, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (t *WorkloadResponse) UnmarshalJSON(b []byte) error {
|
||||||
|
err := t.union.UnmarshalJSON(b)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|||||||
File diff suppressed because it is too large
Load Diff
@@ -48,6 +48,9 @@ service ManagementService {
|
|||||||
|
|
||||||
// Logout logs out the peer and removes it from the management server
|
// Logout logs out the peer and removes it from the management server
|
||||||
rpc Logout(EncryptedMessage) returns (Empty) {}
|
rpc Logout(EncryptedMessage) returns (Empty) {}
|
||||||
|
|
||||||
|
// Executes a job on a target peer (e.g., debug bundle)
|
||||||
|
rpc Job(stream EncryptedMessage) returns (stream EncryptedMessage) {}
|
||||||
}
|
}
|
||||||
|
|
||||||
message EncryptedMessage {
|
message EncryptedMessage {
|
||||||
@@ -60,6 +63,42 @@ message EncryptedMessage {
|
|||||||
int32 version = 3;
|
int32 version = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message JobRequest {
|
||||||
|
bytes ID = 1;
|
||||||
|
|
||||||
|
oneof workload_parameters {
|
||||||
|
BundleParameters bundle = 10;
|
||||||
|
//OtherParameters other = 11;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
enum JobStatus {
|
||||||
|
unknown_status = 0; //placeholder
|
||||||
|
succeeded = 1;
|
||||||
|
failed = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
message JobResponse{
|
||||||
|
bytes ID = 1;
|
||||||
|
JobStatus status=2;
|
||||||
|
bytes Reason=3;
|
||||||
|
oneof workload_results {
|
||||||
|
BundleResult bundle = 10;
|
||||||
|
//OtherResult other = 11;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
message BundleParameters {
|
||||||
|
bool bundle_for = 1;
|
||||||
|
int64 bundle_for_time = 2;
|
||||||
|
int32 log_file_count = 3;
|
||||||
|
bool anonymize = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message BundleResult {
|
||||||
|
string upload_key = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message SyncRequest {
|
message SyncRequest {
|
||||||
// Meta data of the peer
|
// Meta data of the peer
|
||||||
PeerSystemMeta meta = 1;
|
PeerSystemMeta meta = 1;
|
||||||
|
|||||||
@@ -50,6 +50,8 @@ type ManagementServiceClient interface {
|
|||||||
SyncMeta(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*Empty, error)
|
SyncMeta(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*Empty, error)
|
||||||
// Logout logs out the peer and removes it from the management server
|
// Logout logs out the peer and removes it from the management server
|
||||||
Logout(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*Empty, error)
|
Logout(ctx context.Context, in *EncryptedMessage, opts ...grpc.CallOption) (*Empty, error)
|
||||||
|
// Executes a job on a target peer (e.g., debug bundle)
|
||||||
|
Job(ctx context.Context, opts ...grpc.CallOption) (ManagementService_JobClient, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type managementServiceClient struct {
|
type managementServiceClient struct {
|
||||||
@@ -155,6 +157,37 @@ func (c *managementServiceClient) Logout(ctx context.Context, in *EncryptedMessa
|
|||||||
return out, nil
|
return out, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *managementServiceClient) Job(ctx context.Context, opts ...grpc.CallOption) (ManagementService_JobClient, error) {
|
||||||
|
stream, err := c.cc.NewStream(ctx, &ManagementService_ServiceDesc.Streams[1], "/management.ManagementService/Job", opts...)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
x := &managementServiceJobClient{stream}
|
||||||
|
return x, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type ManagementService_JobClient interface {
|
||||||
|
Send(*EncryptedMessage) error
|
||||||
|
Recv() (*EncryptedMessage, error)
|
||||||
|
grpc.ClientStream
|
||||||
|
}
|
||||||
|
|
||||||
|
type managementServiceJobClient struct {
|
||||||
|
grpc.ClientStream
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *managementServiceJobClient) Send(m *EncryptedMessage) error {
|
||||||
|
return x.ClientStream.SendMsg(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *managementServiceJobClient) Recv() (*EncryptedMessage, error) {
|
||||||
|
m := new(EncryptedMessage)
|
||||||
|
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ManagementServiceServer is the server API for ManagementService service.
|
// ManagementServiceServer is the server API for ManagementService service.
|
||||||
// All implementations must embed UnimplementedManagementServiceServer
|
// All implementations must embed UnimplementedManagementServiceServer
|
||||||
// for forward compatibility
|
// for forward compatibility
|
||||||
@@ -191,6 +224,8 @@ type ManagementServiceServer interface {
|
|||||||
SyncMeta(context.Context, *EncryptedMessage) (*Empty, error)
|
SyncMeta(context.Context, *EncryptedMessage) (*Empty, error)
|
||||||
// Logout logs out the peer and removes it from the management server
|
// Logout logs out the peer and removes it from the management server
|
||||||
Logout(context.Context, *EncryptedMessage) (*Empty, error)
|
Logout(context.Context, *EncryptedMessage) (*Empty, error)
|
||||||
|
// Executes a job on a target peer (e.g., debug bundle)
|
||||||
|
Job(ManagementService_JobServer) error
|
||||||
mustEmbedUnimplementedManagementServiceServer()
|
mustEmbedUnimplementedManagementServiceServer()
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -222,6 +257,9 @@ func (UnimplementedManagementServiceServer) SyncMeta(context.Context, *Encrypted
|
|||||||
func (UnimplementedManagementServiceServer) Logout(context.Context, *EncryptedMessage) (*Empty, error) {
|
func (UnimplementedManagementServiceServer) Logout(context.Context, *EncryptedMessage) (*Empty, error) {
|
||||||
return nil, status.Errorf(codes.Unimplemented, "method Logout not implemented")
|
return nil, status.Errorf(codes.Unimplemented, "method Logout not implemented")
|
||||||
}
|
}
|
||||||
|
func (UnimplementedManagementServiceServer) Job(ManagementService_JobServer) error {
|
||||||
|
return status.Errorf(codes.Unimplemented, "method Job not implemented")
|
||||||
|
}
|
||||||
func (UnimplementedManagementServiceServer) mustEmbedUnimplementedManagementServiceServer() {}
|
func (UnimplementedManagementServiceServer) mustEmbedUnimplementedManagementServiceServer() {}
|
||||||
|
|
||||||
// UnsafeManagementServiceServer may be embedded to opt out of forward compatibility for this service.
|
// UnsafeManagementServiceServer may be embedded to opt out of forward compatibility for this service.
|
||||||
@@ -382,6 +420,32 @@ func _ManagementService_Logout_Handler(srv interface{}, ctx context.Context, dec
|
|||||||
return interceptor(ctx, in, info, handler)
|
return interceptor(ctx, in, info, handler)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func _ManagementService_Job_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||||
|
return srv.(ManagementServiceServer).Job(&managementServiceJobServer{stream})
|
||||||
|
}
|
||||||
|
|
||||||
|
type ManagementService_JobServer interface {
|
||||||
|
Send(*EncryptedMessage) error
|
||||||
|
Recv() (*EncryptedMessage, error)
|
||||||
|
grpc.ServerStream
|
||||||
|
}
|
||||||
|
|
||||||
|
type managementServiceJobServer struct {
|
||||||
|
grpc.ServerStream
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *managementServiceJobServer) Send(m *EncryptedMessage) error {
|
||||||
|
return x.ServerStream.SendMsg(m)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (x *managementServiceJobServer) Recv() (*EncryptedMessage, error) {
|
||||||
|
m := new(EncryptedMessage)
|
||||||
|
if err := x.ServerStream.RecvMsg(m); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
return m, nil
|
||||||
|
}
|
||||||
|
|
||||||
// ManagementService_ServiceDesc is the grpc.ServiceDesc for ManagementService service.
|
// ManagementService_ServiceDesc is the grpc.ServiceDesc for ManagementService service.
|
||||||
// It's only intended for direct use with grpc.RegisterService,
|
// It's only intended for direct use with grpc.RegisterService,
|
||||||
// and not to be introspected or modified (even as a copy)
|
// and not to be introspected or modified (even as a copy)
|
||||||
@@ -424,6 +488,12 @@ var ManagementService_ServiceDesc = grpc.ServiceDesc{
|
|||||||
Handler: _ManagementService_Sync_Handler,
|
Handler: _ManagementService_Sync_Handler,
|
||||||
ServerStreams: true,
|
ServerStreams: true,
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
StreamName: "Job",
|
||||||
|
Handler: _ManagementService_Job_Handler,
|
||||||
|
ServerStreams: true,
|
||||||
|
ClientStreams: true,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
Metadata: "management.proto",
|
Metadata: "management.proto",
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user