mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-16 15:26:40 +00:00
Fix lint issue
This commit is contained in:
@@ -26,8 +26,8 @@ func (s *BaseServer) PeersUpdateManager() network_map.PeersUpdateManager {
|
||||
})
|
||||
}
|
||||
|
||||
func (s *BaseServer) JobManager() *job.JobManager {
|
||||
return Create(s, func() *job.JobManager {
|
||||
func (s *BaseServer) JobManager() *job.Manager {
|
||||
return Create(s, func() *job.Manager {
|
||||
return job.NewJobManager(s.Metrics(), s.Store())
|
||||
})
|
||||
}
|
||||
|
||||
@@ -60,7 +60,7 @@ type Server struct {
|
||||
wgKey wgtypes.Key
|
||||
proto.UnimplementedManagementServiceServer
|
||||
peersUpdateManager network_map.PeersUpdateManager
|
||||
jobManager *job.JobManager
|
||||
jobManager *job.Manager
|
||||
config *nbconfig.Config
|
||||
secretsManager SecretsManager
|
||||
appMetrics telemetry.AppMetrics
|
||||
@@ -86,7 +86,7 @@ func NewServer(
|
||||
accountManager account.Manager,
|
||||
settingsManager settings.Manager,
|
||||
peersUpdateManager network_map.PeersUpdateManager,
|
||||
jobManager *job.JobManager,
|
||||
jobManager *job.Manager,
|
||||
secretsManager SecretsManager,
|
||||
appMetrics telemetry.AppMetrics,
|
||||
ephemeralManager ephemeral.Manager,
|
||||
@@ -376,7 +376,7 @@ func (s *Server) sendJobsLoop(
|
||||
accountID string,
|
||||
peerKey wgtypes.Key,
|
||||
peer *nbpeer.Peer,
|
||||
updates <-chan *job.JobEvent,
|
||||
updates <-chan *job.Event,
|
||||
srv proto.ManagementService_JobServer,
|
||||
) error {
|
||||
for {
|
||||
@@ -454,7 +454,7 @@ func (s *Server) sendUpdate(ctx context.Context, accountID string, peerKey wgtyp
|
||||
|
||||
// sendJob encrypts the update message using the peer key and the server's wireguard key,
|
||||
// then sends the encrypted message to the connected peer via the sync server.
|
||||
func (s *Server) sendJob(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, job *job.JobEvent, srv proto.ManagementService_JobServer) error {
|
||||
func (s *Server) sendJob(ctx context.Context, accountID string, peerKey wgtypes.Key, peer *nbpeer.Peer, job *job.Event, srv proto.ManagementService_JobServer) error {
|
||||
encryptedResp, err := encryption.EncryptMessage(peerKey, s.wgKey, job.Request)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Errorf("failed to encrypt job for peer %s: %v", peerKey.String(), err)
|
||||
|
||||
@@ -72,7 +72,7 @@ type DefaultAccountManager struct {
|
||||
// cacheLoading keeps the accountIDs that are currently reloading. The accountID has to be removed once cache has been reloaded
|
||||
cacheLoading map[string]chan struct{}
|
||||
networkMapController network_map.Controller
|
||||
jobManager *job.JobManager
|
||||
jobManager *job.Manager
|
||||
idpManager idp.Manager
|
||||
cacheManager *nbcache.AccountUserDataCache
|
||||
externalCacheManager nbcache.UserDataCache
|
||||
@@ -182,7 +182,7 @@ func BuildManager(
|
||||
config *nbconfig.Config,
|
||||
store store.Store,
|
||||
networkMapController network_map.Controller,
|
||||
jobManager *job.JobManager,
|
||||
jobManager *job.Manager,
|
||||
idpManager idp.Manager,
|
||||
singleAccountModeDomain string,
|
||||
eventStore activity.Store,
|
||||
|
||||
@@ -16,26 +16,26 @@ import (
|
||||
|
||||
const jobChannelBuffer = 100
|
||||
|
||||
type JobEvent struct {
|
||||
type Event struct {
|
||||
PeerID string
|
||||
Request *proto.JobRequest
|
||||
Response *proto.JobResponse
|
||||
}
|
||||
|
||||
type JobManager struct {
|
||||
type Manager struct {
|
||||
mu *sync.RWMutex
|
||||
jobChannels map[string]chan *JobEvent // per-peer job streams
|
||||
pending map[string]*JobEvent // jobID → event
|
||||
jobChannels map[string]chan *Event // per-peer job streams
|
||||
pending map[string]*Event // jobID → event
|
||||
responseWait time.Duration
|
||||
metrics telemetry.AppMetrics
|
||||
Store store.Store
|
||||
}
|
||||
|
||||
func NewJobManager(metrics telemetry.AppMetrics, store store.Store) *JobManager {
|
||||
func NewJobManager(metrics telemetry.AppMetrics, store store.Store) *Manager {
|
||||
|
||||
return &JobManager{
|
||||
jobChannels: make(map[string]chan *JobEvent),
|
||||
pending: make(map[string]*JobEvent),
|
||||
return &Manager{
|
||||
jobChannels: make(map[string]chan *Event),
|
||||
pending: make(map[string]*Event),
|
||||
responseWait: 5 * time.Minute,
|
||||
metrics: metrics,
|
||||
mu: &sync.RWMutex{},
|
||||
@@ -44,7 +44,7 @@ func NewJobManager(metrics telemetry.AppMetrics, store store.Store) *JobManager
|
||||
}
|
||||
|
||||
// CreateJobChannel creates or replaces a channel for a peer
|
||||
func (jm *JobManager) CreateJobChannel(ctx context.Context, accountID, peerID string) chan *JobEvent {
|
||||
func (jm *Manager) CreateJobChannel(ctx context.Context, accountID, peerID string) chan *Event {
|
||||
// all pending jobs stored in db for this peer should be failed
|
||||
if err := jm.Store.MarkPendingJobsAsFailed(ctx, accountID, peerID, "Pending job cleanup: marked as failed automatically due to being stuck too long"); err != nil {
|
||||
log.WithContext(ctx).Error(err.Error())
|
||||
@@ -58,13 +58,13 @@ func (jm *JobManager) CreateJobChannel(ctx context.Context, accountID, peerID st
|
||||
delete(jm.jobChannels, peerID)
|
||||
}
|
||||
|
||||
ch := make(chan *JobEvent, jobChannelBuffer)
|
||||
ch := make(chan *Event, jobChannelBuffer)
|
||||
jm.jobChannels[peerID] = ch
|
||||
return ch
|
||||
}
|
||||
|
||||
// SendJob sends a job to a peer and tracks it as pending
|
||||
func (jm *JobManager) SendJob(ctx context.Context, accountID, peerID string, req *proto.JobRequest) error {
|
||||
func (jm *Manager) SendJob(ctx context.Context, accountID, peerID string, req *proto.JobRequest) error {
|
||||
jm.mu.RLock()
|
||||
ch, ok := jm.jobChannels[peerID]
|
||||
jm.mu.RUnlock()
|
||||
@@ -72,7 +72,7 @@ func (jm *JobManager) SendJob(ctx context.Context, accountID, peerID string, req
|
||||
return fmt.Errorf("peer %s has no channel", peerID)
|
||||
}
|
||||
|
||||
event := &JobEvent{
|
||||
event := &Event{
|
||||
PeerID: peerID,
|
||||
Request: req,
|
||||
}
|
||||
@@ -94,7 +94,7 @@ func (jm *JobManager) SendJob(ctx context.Context, accountID, peerID string, req
|
||||
}
|
||||
|
||||
// HandleResponse marks a job as finished and moves it to completed
|
||||
func (jm *JobManager) HandleResponse(ctx context.Context, resp *proto.JobResponse) error {
|
||||
func (jm *Manager) HandleResponse(ctx context.Context, resp *proto.JobResponse) error {
|
||||
jm.mu.Lock()
|
||||
defer jm.mu.Unlock()
|
||||
|
||||
@@ -120,7 +120,7 @@ func (jm *JobManager) HandleResponse(ctx context.Context, resp *proto.JobRespons
|
||||
}
|
||||
|
||||
// CloseChannel closes a peer’s channel and cleans up its jobs
|
||||
func (jm *JobManager) CloseChannel(ctx context.Context, accountID, peerID string) {
|
||||
func (jm *Manager) CloseChannel(ctx context.Context, accountID, peerID string) {
|
||||
jm.mu.Lock()
|
||||
defer jm.mu.Unlock()
|
||||
|
||||
@@ -142,7 +142,7 @@ func (jm *JobManager) CloseChannel(ctx context.Context, accountID, peerID string
|
||||
}
|
||||
|
||||
// cleanup removes a pending job safely
|
||||
func (jm *JobManager) cleanup(ctx context.Context, accountID, jobID string, reason string) {
|
||||
func (jm *Manager) cleanup(ctx context.Context, accountID, jobID string, reason string) {
|
||||
jm.mu.Lock()
|
||||
defer jm.mu.Unlock()
|
||||
|
||||
@@ -154,7 +154,7 @@ func (jm *JobManager) cleanup(ctx context.Context, accountID, jobID string, reas
|
||||
}
|
||||
}
|
||||
|
||||
func (jm *JobManager) IsPeerConnected(peerID string) bool {
|
||||
func (jm *Manager) IsPeerConnected(peerID string) bool {
|
||||
jm.mu.RLock()
|
||||
defer jm.mu.RUnlock()
|
||||
|
||||
@@ -162,7 +162,7 @@ func (jm *JobManager) IsPeerConnected(peerID string) bool {
|
||||
return ok
|
||||
}
|
||||
|
||||
func (jm *JobManager) IsPeerHasPendingJobs(peerID string) bool {
|
||||
func (jm *Manager) IsPeerHasPendingJobs(peerID string) bool {
|
||||
jm.mu.RLock()
|
||||
defer jm.mu.RUnlock()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user