diff --git a/client/cmd/service_controller.go b/client/cmd/service_controller.go index 0d8887698..50fb35d5e 100644 --- a/client/cmd/service_controller.go +++ b/client/cmd/service_controller.go @@ -61,7 +61,7 @@ func (p *program) Start(svc service.Service) error { } } - serverInstance := server.New(p.ctx, util.FindFirstLogPath(logFiles), configPath, profilesDisabled, updateSettingsDisabled, daemonAddr) + serverInstance := server.New(p.ctx, util.FindFirstLogPath(logFiles), configPath, profilesDisabled, updateSettingsDisabled) if err := serverInstance.Start(); err != nil { log.Fatalf("failed to start daemon: %v", err) } diff --git a/client/cmd/testutil_test.go b/client/cmd/testutil_test.go index 543dfdfdf..42cca1a9b 100644 --- a/client/cmd/testutil_test.go +++ b/client/cmd/testutil_test.go @@ -138,7 +138,7 @@ func startClientDaemon( s := grpc.NewServer() server := client.New(ctx, - "", "", false, false, "") + "", "", false, false) if err := server.Start(); err != nil { t.Fatal(err) } diff --git a/client/cmd/up.go b/client/cmd/up.go index d5f103437..1fa58e6ed 100644 --- a/client/cmd/up.go +++ b/client/cmd/up.go @@ -196,7 +196,7 @@ func runInForegroundMode(ctx context.Context, cmd *cobra.Command, activeProf *pr r := peer.NewRecorder(config.ManagementURL.String()) r.GetFullStatus() - connectClient := internal.NewConnectClient(ctx, config, r, daemonAddr) + connectClient := internal.NewConnectClient(ctx, config, r) SetupDebugHandler(ctx, config, r, connectClient, "") return connectClient.Run(nil) diff --git a/client/internal/connect.go b/client/internal/connect.go index efe1f010a..523dcaf1f 100644 --- a/client/internal/connect.go +++ b/client/internal/connect.go @@ -45,20 +45,18 @@ type ConnectClient struct { engineMutex sync.Mutex persistSyncResponse bool - daemonAddress string } func NewConnectClient( ctx context.Context, config *profilemanager.Config, statusRecorder *peer.Status, - daemonAddress string, + ) *ConnectClient { return &ConnectClient{ ctx: ctx, config: config, statusRecorder: statusRecorder, - daemonAddress: daemonAddress, engineMutex: sync.Mutex{}, } } @@ -272,7 +270,7 @@ func (c *ConnectClient) run(mobileDependency MobileDependency, runningChan chan checks := loginResp.GetChecks() c.engineMutex.Lock() - c.engine = NewEngine(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, checks, c.daemonAddress) + c.engine = NewEngine(engineCtx, cancel, signalClient, mgmClient, relayManager, engineConfig, mobileDependency, c.statusRecorder, checks) c.engine.SetSyncResponsePersistence(c.persistSyncResponse) c.engineMutex.Unlock() diff --git a/client/internal/engine.go b/client/internal/engine.go index d350682a5..144c48c11 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -130,6 +130,7 @@ type EngineConfig struct { BlockInbound bool LazyConnectionEnabled bool + DaemonAddress string } // Engine is a mechanism responsible for reacting on Signal and Management stream events and managing connections to the remote peers. @@ -199,8 +200,6 @@ type Engine struct { latestSyncResponse *mgmProto.SyncResponse connSemaphore *semaphoregroup.SemaphoreGroup flowManager nftypes.FlowManager - - daemonAddress string } // Peer is an instance of the Connection Peer @@ -224,7 +223,6 @@ func NewEngine( mobileDep MobileDependency, statusRecorder *peer.Status, checks []*mgmProto.Checks, - daemonAddress string, ) *Engine { engine := &Engine{ clientCtx: clientCtx, @@ -244,7 +242,6 @@ func NewEngine( statusRecorder: statusRecorder, checks: checks, connSemaphore: semaphoregroup.NewSemaphoreGroup(connInitLimit), - daemonAddress: daemonAddress, } sm := profilemanager.NewServiceManager("") @@ -896,9 +893,9 @@ func (e *Engine) updateConfig(conf *mgmProto.PeerConfig) error { return nil } -func (e *Engine) getPeerClient() (*grpc.ClientConn, error) { +func (e *Engine) getPeerClient(addr string) (*grpc.ClientConn, error) { conn, err := grpc.NewClient( - strings.TrimPrefix(e.daemonAddress, "tcp://"), + strings.TrimPrefix(addr, "tcp://"), grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { @@ -913,26 +910,20 @@ func (e *Engine) getPeerClient() (*grpc.ClientConn, error) { func (e *Engine) receiveJobEvents() { go func() { err := e.mgmClient.Job(e.ctx, func(msg *mgmProto.JobRequest) *mgmProto.JobResponse { - log.Infof("Received job request: %+v", msg) + resp := mgmProto.JobResponse{ + ID: msg.ID, + Status: mgmProto.JobStatus_failed, + } switch params := msg.WorkloadParameters.(type) { case *mgmProto.JobRequest_Bundle: - uploadKey, err := e.handleBundle(params.Bundle) + bundleResult, err := e.handleBundle(params.Bundle) if err != nil { - return &mgmProto.JobResponse{ - ID: msg.ID, - Status: mgmProto.JobStatus_failed, - Reason: []byte(err.Error()), - } - } - return &mgmProto.JobResponse{ - ID: msg.ID, - Status: mgmProto.JobStatus_succeeded, - WorkloadResults: &mgmProto.JobResponse_Bundle{ - Bundle: &mgmProto.BundleResult{ - UploadKey: uploadKey, - }, - }, + resp.Reason = []byte(err.Error()) + return &resp } + resp.Status = mgmProto.JobStatus_succeeded + resp.WorkloadResults = bundleResult + return &resp default: return nil } @@ -949,10 +940,11 @@ func (e *Engine) receiveJobEvents() { log.Debugf("connecting to Management Service jobs stream") } -func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (string, error) { - conn, err := e.getPeerClient() +func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (*mgmProto.JobResponse_Bundle, error) { + // todo: implement with real daemon address + conn, err := e.getPeerClient("unix:///var/run/netbird.sock") if err != nil { - return "", err + return nil, err } defer func() { if err := conn.Close(); err != nil { @@ -962,7 +954,7 @@ func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (string, error) statusOutput, err := e.getStatusOutput(params.Anonymize) if err != nil { - return "", err + return nil, err } request := &cProto.DebugBundleRequest{ Anonymize: params.Anonymize, @@ -974,18 +966,24 @@ func (e *Engine) handleBundle(params *mgmProto.BundleParameters) (string, error) service := cProto.NewDaemonServiceClient(conn) resp, err := service.DebugBundle(e.clientCtx, request) if err != nil { - return "", fmt.Errorf("failed to bundle debug: " + status.Convert(err).Message()) + return nil, fmt.Errorf("failed to bundle debug: " + status.Convert(err).Message()) } if resp.GetUploadFailureReason() != "" { - return "", fmt.Errorf("upload failed: " + resp.GetUploadFailureReason()) + return nil, fmt.Errorf("upload failed: " + resp.GetUploadFailureReason()) } - return resp.GetUploadedKey(), nil + // return resp.GetUploadedKey(), nil + + return &mgmProto.JobResponse_Bundle{ + Bundle: &mgmProto.BundleResult{ + UploadKey: resp.GetUploadedKey(), + }, + }, nil } func (e *Engine) getStatusOutput(anon bool) (string, error) { // todo: implement with real daemon address - conn, err := e.getPeerClient() + conn, err := e.getPeerClient("unix:///var/run/netbird.sock") if err != nil { return "", err } diff --git a/client/internal/engine_test.go b/client/internal/engine_test.go index 0c5278640..c2c9ae84a 100644 --- a/client/internal/engine_test.go +++ b/client/internal/engine_test.go @@ -234,7 +234,6 @@ func TestEngine_SSH(t *testing.T) { MobileDependency{}, peer.NewRecorder("https://mgm"), nil, - "", ) engine.dnsServer = &dns.MockServer{ @@ -378,7 +377,7 @@ func TestEngine_UpdateNetworkMap(t *testing.T) { }, MobileDependency{}, peer.NewRecorder("https://mgm"), - nil, "") + nil) wgIface := &MockWGIface{ NameFunc: func() string { return "utun102" }, @@ -596,7 +595,7 @@ func TestEngine_Sync(t *testing.T) { WgAddr: "100.64.0.1/24", WgPrivateKey: key, WgPort: 33100, - }, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, "") + }, MobileDependency{}, peer.NewRecorder("https://mgm"), nil) engine.ctx = ctx engine.dnsServer = &dns.MockServer{ @@ -760,7 +759,7 @@ func TestEngine_UpdateNetworkMapWithRoutes(t *testing.T) { WgAddr: wgAddr, WgPrivateKey: key, WgPort: 33100, - }, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, "") + }, MobileDependency{}, peer.NewRecorder("https://mgm"), nil) engine.ctx = ctx newNet, err := stdnet.NewNet() if err != nil { @@ -961,7 +960,7 @@ func TestEngine_UpdateNetworkMapWithDNSUpdate(t *testing.T) { WgAddr: wgAddr, WgPrivateKey: key, WgPort: 33100, - }, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, "") + }, MobileDependency{}, peer.NewRecorder("https://mgm"), nil) engine.ctx = ctx newNet, err := stdnet.NewNet() @@ -1485,7 +1484,7 @@ func createEngine(ctx context.Context, cancel context.CancelFunc, setupKey strin } relayMgr := relayClient.NewManager(ctx, nil, key.PublicKey().String()) - e, err := NewEngine(ctx, cancel, signalClient, mgmtClient, relayMgr, conf, MobileDependency{}, peer.NewRecorder("https://mgm"), nil, ""), nil + e, err := NewEngine(ctx, cancel, signalClient, mgmtClient, relayMgr, conf, MobileDependency{}, peer.NewRecorder("https://mgm"), nil), nil e.ctx = ctx return e, err } diff --git a/client/server/server.go b/client/server/server.go index 316027b45..f2e8dc12a 100644 --- a/client/server/server.go +++ b/client/server/server.go @@ -78,7 +78,6 @@ type Server struct { profileManager *profilemanager.ServiceManager profilesDisabled bool updateSettingsDisabled bool - daemonAddress string } type oauthAuthFlow struct { @@ -89,7 +88,7 @@ type oauthAuthFlow struct { } // New server instance constructor. -func New(ctx context.Context, logFile string, configFile string, profilesDisabled bool, updateSettingsDisabled bool, daemonAddress string) *Server { +func New(ctx context.Context, logFile string, configFile string, profilesDisabled bool, updateSettingsDisabled bool) *Server { return &Server{ rootCtx: ctx, logFile: logFile, @@ -98,7 +97,6 @@ func New(ctx context.Context, logFile string, configFile string, profilesDisable profileManager: profilemanager.NewServiceManager(configFile), profilesDisabled: profilesDisabled, updateSettingsDisabled: updateSettingsDisabled, - daemonAddress: daemonAddress, } } @@ -237,7 +235,7 @@ func (s *Server) connectWithRetryRuns(ctx context.Context, config *profilemanage runOperation := func() error { log.Tracef("running client connection") - s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder, s.daemonAddress) + s.connectClient = internal.NewConnectClient(ctx, config, statusRecorder) s.connectClient.SetSyncResponsePersistence(s.persistSyncResponse) err := s.connectClient.Run(runningChan) diff --git a/client/server/server_test.go b/client/server/server_test.go index cf9dae8ff..6f7c4a89a 100644 --- a/client/server/server_test.go +++ b/client/server/server_test.go @@ -95,7 +95,7 @@ func TestConnectWithRetryRuns(t *testing.T) { t.Fatalf("failed to set active profile state: %v", err) } - s := New(ctx, "debug", "", false, false, "") + s := New(ctx, "debug", "", false, false) s.config = config @@ -152,7 +152,7 @@ func TestServer_Up(t *testing.T) { t.Fatalf("failed to set active profile state: %v", err) } - s := New(ctx, "console", "", false, false, "") + s := New(ctx, "console", "", false, false) err = s.Start() require.NoError(t, err) @@ -228,7 +228,7 @@ func TestServer_SubcribeEvents(t *testing.T) { t.Fatalf("failed to set active profile state: %v", err) } - s := New(ctx, "console", "", false, false, "") + s := New(ctx, "console", "", false, false) err = s.Start() require.NoError(t, err) diff --git a/management/server/jobChannel.go b/management/server/jobChannel.go index 9c64d6102..3dbbe0e2c 100644 --- a/management/server/jobChannel.go +++ b/management/server/jobChannel.go @@ -8,6 +8,7 @@ import ( "github.com/netbirdio/netbird/management/server/store" "github.com/netbirdio/netbird/management/server/telemetry" + "github.com/netbirdio/netbird/management/server/types" "github.com/netbirdio/netbird/shared/management/proto" log "github.com/sirupsen/logrus" ) @@ -96,17 +97,23 @@ func (jm *JobManager) HandleResponse(ctx context.Context, resp *proto.JobRespons jm.mu.Lock() defer jm.mu.Unlock() - event, ok := jm.pending[string(resp.ID)] + jobID := string(resp.ID) + + event, ok := jm.pending[jobID] if !ok { - return fmt.Errorf("job %s not found", resp.ID) + return fmt.Errorf("job %s not found", jobID) + } + var job types.Job + if err := job.ApplyResponse(resp); err != nil { + return fmt.Errorf("invalid job response: %v", err) } //update or create the store for job response - err := jm.Store.CompletePeerJob(ctx, resp) + err := jm.Store.CompletePeerJob(ctx, &job) if err == nil { event.Response = resp } - delete(jm.pending, string(resp.ID)) + delete(jm.pending, jobID) return err } diff --git a/management/server/store/sql_store.go b/management/server/store/sql_store.go index 6eddeb1ca..f27eddb2f 100644 --- a/management/server/store/sql_store.go +++ b/management/server/store/sql_store.go @@ -34,7 +34,6 @@ import ( "github.com/netbirdio/netbird/management/server/types" "github.com/netbirdio/netbird/management/server/util" "github.com/netbirdio/netbird/route" - "github.com/netbirdio/netbird/shared/management/proto" "github.com/netbirdio/netbird/shared/management/status" ) @@ -137,14 +136,10 @@ func (s *SqlStore) CreatePeerJob(ctx context.Context, job *types.Job) error { return nil } -func (s *SqlStore) CompletePeerJob(ctx context.Context, jobResponse *proto.JobResponse) error { - var job types.Job - if err := job.ApplyResponse(jobResponse); err != nil { - return status.Errorf(status.Internal, err.Error()) - } +func (s *SqlStore) CompletePeerJob(ctx context.Context, job *types.Job) error { result := s.db. Model(&types.Job{}). - Where(idQueryCondition, string(jobResponse.GetID())). + Where(idQueryCondition, job.ID). Updates(job) if result.Error != nil { diff --git a/management/server/store/store.go b/management/server/store/store.go index 42c4dbec9..d8566e086 100644 --- a/management/server/store/store.go +++ b/management/server/store/store.go @@ -26,7 +26,6 @@ import ( "github.com/netbirdio/netbird/management/server/telemetry" "github.com/netbirdio/netbird/management/server/testutil" "github.com/netbirdio/netbird/management/server/types" - "github.com/netbirdio/netbird/shared/management/proto" "github.com/netbirdio/netbird/util" "github.com/netbirdio/netbird/management/server/migration" @@ -207,7 +206,7 @@ type Store interface { MarkAccountPrimary(ctx context.Context, accountID string) error UpdateAccountNetwork(ctx context.Context, accountID string, ipNet net.IPNet) error CreatePeerJob(ctx context.Context, job *types.Job) error - CompletePeerJob(ctx context.Context, jobResponse *proto.JobResponse) error + CompletePeerJob(ctx context.Context, job *types.Job) error GetPeerJobByID(ctx context.Context, accountID, jobID string) (*types.Job, error) GetPeerJobs(ctx context.Context, accountID, peerID string) ([]*types.Job, error) MarkPendingJobsAsFailed(ctx context.Context, accountID, peerID, reason string) error