From 6820f8d23e1e6b7cce939cdb56f8040c65ecded4 Mon Sep 17 00:00:00 2001 From: Owen Date: Sun, 3 Aug 2025 16:12:00 -0700 Subject: [PATCH 1/5] Add basic heathchecks --- healthcheck/healthcheck.go | 352 +++++++++++++++++++++++++++++++++++++ main.go | 152 ++++++++++++++++ 2 files changed, 504 insertions(+) create mode 100644 healthcheck/healthcheck.go diff --git a/healthcheck/healthcheck.go b/healthcheck/healthcheck.go new file mode 100644 index 0000000..dafa19b --- /dev/null +++ b/healthcheck/healthcheck.go @@ -0,0 +1,352 @@ +package healthcheck + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "strings" + "sync" + "time" +) + +// Status represents the health status of a target +type Status int + +const ( + StatusUnknown Status = iota + StatusHealthy + StatusUnhealthy +) + +func (s Status) String() string { + switch s { + case StatusHealthy: + return "healthy" + case StatusUnhealthy: + return "unhealthy" + default: + return "unknown" + } +} + +// Config holds the health check configuration for a target +type Config struct { + ID string `json:"id"` + Enabled bool `json:"hcEnabled"` + Path string `json:"hcPath"` + Scheme string `json:"hcScheme"` + Mode string `json:"hcMode"` + Hostname string `json:"hcHostname"` + Port int `json:"hcPort"` + Interval int `json:"hcInterval"` // in seconds + UnhealthyInterval int `json:"hcUnhealthyInterval"` // in seconds + Timeout int `json:"hcTimeout"` // in seconds + Headers map[string]string `json:"hcHeaders"` + Method string `json:"hcMethod"` +} + +// Target represents a health check target with its current status +type Target struct { + Config Config `json:"config"` + Status Status `json:"status"` + LastCheck time.Time `json:"lastCheck"` + LastError string `json:"lastError,omitempty"` + CheckCount int `json:"checkCount"` + ticker *time.Ticker + ctx context.Context + cancel context.CancelFunc +} + +// StatusChangeCallback is called when any target's status changes +type StatusChangeCallback func(targets map[string]*Target) + +// Monitor manages health check targets and their monitoring +type Monitor struct { + targets map[string]*Target + mutex sync.RWMutex + callback StatusChangeCallback + client *http.Client +} + +// NewMonitor creates a new health check monitor +func NewMonitor(callback StatusChangeCallback) *Monitor { + return &Monitor{ + targets: make(map[string]*Target), + callback: callback, + client: &http.Client{ + Timeout: 30 * time.Second, + }, + } +} + +// parseHeaders parses the headers string into a map +func parseHeaders(headersStr string) map[string]string { + headers := make(map[string]string) + if headersStr == "" { + return headers + } + + // Try to parse as JSON first + if err := json.Unmarshal([]byte(headersStr), &headers); err == nil { + return headers + } + + // Fallback to simple key:value parsing + pairs := strings.Split(headersStr, ",") + for _, pair := range pairs { + kv := strings.SplitN(strings.TrimSpace(pair), ":", 2) + if len(kv) == 2 { + headers[strings.TrimSpace(kv[0])] = strings.TrimSpace(kv[1]) + } + } + return headers +} + +// AddTarget adds a new health check target +func (m *Monitor) AddTarget(config Config) error { + m.mutex.Lock() + defer m.mutex.Unlock() + + // Set defaults + if config.Scheme == "" { + config.Scheme = "http" + } + if config.Mode == "" { + config.Mode = "http" + } + if config.Method == "" { + config.Method = "GET" + } + if config.Interval == 0 { + config.Interval = 30 + } + if config.UnhealthyInterval == 0 { + config.UnhealthyInterval = 30 + } + if config.Timeout == 0 { + config.Timeout = 5 + } + + // Parse headers if provided as string + if len(config.Headers) == 0 && config.Path != "" { + // This is a simplified header parsing - in real use you might want more robust parsing + config.Headers = make(map[string]string) + } + + // Remove existing target if it exists + if existing, exists := m.targets[config.ID]; exists { + existing.cancel() + } + + // Create new target + ctx, cancel := context.WithCancel(context.Background()) + target := &Target{ + Config: config, + Status: StatusUnknown, + ctx: ctx, + cancel: cancel, + } + + m.targets[config.ID] = target + + // Start monitoring if enabled + if config.Enabled { + go m.monitorTarget(target) + } + + return nil +} + +// RemoveTarget removes a health check target +func (m *Monitor) RemoveTarget(id string) error { + m.mutex.Lock() + defer m.mutex.Unlock() + + target, exists := m.targets[id] + if !exists { + return fmt.Errorf("target with id %s not found", id) + } + + target.cancel() + delete(m.targets, id) + + // Notify callback of status change + if m.callback != nil { + go m.callback(m.getAllTargets()) + } + + return nil +} + +// GetTargets returns a copy of all targets +func (m *Monitor) GetTargets() map[string]*Target { + return m.getAllTargets() +} + +// getAllTargets returns a copy of all targets (internal method) +func (m *Monitor) getAllTargets() map[string]*Target { + m.mutex.RLock() + defer m.mutex.RUnlock() + + targets := make(map[string]*Target) + for id, target := range m.targets { + // Create a copy to avoid race conditions + targetCopy := *target + targets[id] = &targetCopy + } + return targets +} + +// monitorTarget monitors a single target +func (m *Monitor) monitorTarget(target *Target) { + // Initial check + m.performHealthCheck(target) + + // Set up ticker based on current status + interval := time.Duration(target.Config.Interval) * time.Second + if target.Status == StatusUnhealthy { + interval = time.Duration(target.Config.UnhealthyInterval) * time.Second + } + + target.ticker = time.NewTicker(interval) + defer target.ticker.Stop() + + for { + select { + case <-target.ctx.Done(): + return + case <-target.ticker.C: + oldStatus := target.Status + m.performHealthCheck(target) + + // Update ticker interval if status changed + newInterval := time.Duration(target.Config.Interval) * time.Second + if target.Status == StatusUnhealthy { + newInterval = time.Duration(target.Config.UnhealthyInterval) * time.Second + } + + if newInterval != interval { + target.ticker.Stop() + target.ticker = time.NewTicker(newInterval) + interval = newInterval + } + + // Notify callback if status changed + if oldStatus != target.Status && m.callback != nil { + go m.callback(m.getAllTargets()) + } + } + } +} + +// performHealthCheck performs a health check on a target +func (m *Monitor) performHealthCheck(target *Target) { + target.CheckCount++ + target.LastCheck = time.Now() + target.LastError = "" + + // Build URL + url := fmt.Sprintf("%s://%s", target.Config.Scheme, target.Config.Hostname) + if target.Config.Port > 0 { + url = fmt.Sprintf("%s:%d", url, target.Config.Port) + } + if target.Config.Path != "" { + if !strings.HasPrefix(target.Config.Path, "/") { + url += "/" + } + url += target.Config.Path + } + + // Create request + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(target.Config.Timeout)*time.Second) + defer cancel() + + req, err := http.NewRequestWithContext(ctx, target.Config.Method, url, nil) + if err != nil { + target.Status = StatusUnhealthy + target.LastError = fmt.Sprintf("failed to create request: %v", err) + return + } + + // Add headers + for key, value := range target.Config.Headers { + req.Header.Set(key, value) + } + + // Perform request + resp, err := m.client.Do(req) + if err != nil { + target.Status = StatusUnhealthy + target.LastError = fmt.Sprintf("request failed: %v", err) + return + } + defer resp.Body.Close() + + // Check response status + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + target.Status = StatusHealthy + } else { + target.Status = StatusUnhealthy + target.LastError = fmt.Sprintf("unhealthy status code: %d", resp.StatusCode) + } +} + +// Stop stops monitoring all targets +func (m *Monitor) Stop() { + m.mutex.Lock() + defer m.mutex.Unlock() + + for _, target := range m.targets { + target.cancel() + } + m.targets = make(map[string]*Target) +} + +// EnableTarget enables monitoring for a specific target +func (m *Monitor) EnableTarget(id string) error { + m.mutex.Lock() + defer m.mutex.Unlock() + + target, exists := m.targets[id] + if !exists { + return fmt.Errorf("target with id %s not found", id) + } + + if !target.Config.Enabled { + target.Config.Enabled = true + target.cancel() // Stop existing monitoring + + ctx, cancel := context.WithCancel(context.Background()) + target.ctx = ctx + target.cancel = cancel + + go m.monitorTarget(target) + } + + return nil +} + +// DisableTarget disables monitoring for a specific target +func (m *Monitor) DisableTarget(id string) error { + m.mutex.Lock() + defer m.mutex.Unlock() + + target, exists := m.targets[id] + if !exists { + return fmt.Errorf("target with id %s not found", id) + } + + if target.Config.Enabled { + target.Config.Enabled = false + target.cancel() + target.Status = StatusUnknown + + // Notify callback of status change + if m.callback != nil { + go m.callback(m.getAllTargets()) + } + } + + return nil +} diff --git a/main.go b/main.go index 483aa23..e8e8896 100644 --- a/main.go +++ b/main.go @@ -16,6 +16,7 @@ import ( "time" "github.com/fosrl/newt/docker" + "github.com/fosrl/newt/healthcheck" "github.com/fosrl/newt/logger" "github.com/fosrl/newt/proxy" "github.com/fosrl/newt/updates" @@ -99,6 +100,7 @@ var ( healthFile string useNativeInterface bool authorizedKeysFile string + healthMonitor *healthcheck.Monitor ) func main() { @@ -895,6 +897,152 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub logger.Info("SSH public key appended to authorized keys file") }) + // Initialize health check monitor with status change callback + healthMonitor = healthcheck.NewMonitor(func(targets map[string]*healthcheck.Target) { + logger.Debug("Health check status update for %d targets", len(targets)) + + // Send health status update to the server + healthStatuses := make(map[string]interface{}) + for id, target := range targets { + healthStatuses[id] = map[string]interface{}{ + "status": target.Status.String(), + "lastCheck": target.LastCheck.Format(time.RFC3339), + "checkCount": target.CheckCount, + "lastError": target.LastError, + "config": target.Config, + } + } + + err := client.SendMessage("newt/healthcheck/status", map[string]interface{}{ + "targets": healthStatuses, + }) + if err != nil { + logger.Error("Failed to send health check status update: %v", err) + } + }) + + // Register handler for adding health check targets + client.RegisterHandler("newt/healthcheck/add", func(msg websocket.WSMessage) { + logger.Debug("Received health check add request: %+v", msg) + + var config healthcheck.Config + jsonData, err := json.Marshal(msg.Data) + if err != nil { + logger.Error("Error marshaling health check data: %v", err) + return + } + + if err := json.Unmarshal(jsonData, &config); err != nil { + logger.Error("Error unmarshaling health check config: %v", err) + return + } + + if err := healthMonitor.AddTarget(config); err != nil { + logger.Error("Failed to add health check target %s: %v", config.ID, err) + } else { + logger.Info("Added health check target: %s", config.ID) + } + }) + + // Register handler for removing health check targets + client.RegisterHandler("newt/healthcheck/remove", func(msg websocket.WSMessage) { + logger.Debug("Received health check remove request: %+v", msg) + + var requestData struct { + ID string `json:"id"` + } + jsonData, err := json.Marshal(msg.Data) + if err != nil { + logger.Error("Error marshaling health check remove data: %v", err) + return + } + + if err := json.Unmarshal(jsonData, &requestData); err != nil { + logger.Error("Error unmarshaling health check remove request: %v", err) + return + } + + if err := healthMonitor.RemoveTarget(requestData.ID); err != nil { + logger.Error("Failed to remove health check target %s: %v", requestData.ID, err) + } else { + logger.Info("Removed health check target: %s", requestData.ID) + } + }) + + // Register handler for enabling health check targets + client.RegisterHandler("newt/healthcheck/enable", func(msg websocket.WSMessage) { + logger.Debug("Received health check enable request: %+v", msg) + + var requestData struct { + ID string `json:"id"` + } + jsonData, err := json.Marshal(msg.Data) + if err != nil { + logger.Error("Error marshaling health check enable data: %v", err) + return + } + + if err := json.Unmarshal(jsonData, &requestData); err != nil { + logger.Error("Error unmarshaling health check enable request: %v", err) + return + } + + if err := healthMonitor.EnableTarget(requestData.ID); err != nil { + logger.Error("Failed to enable health check target %s: %v", requestData.ID, err) + } else { + logger.Info("Enabled health check target: %s", requestData.ID) + } + }) + + // Register handler for disabling health check targets + client.RegisterHandler("newt/healthcheck/disable", func(msg websocket.WSMessage) { + logger.Debug("Received health check disable request: %+v", msg) + + var requestData struct { + ID string `json:"id"` + } + jsonData, err := json.Marshal(msg.Data) + if err != nil { + logger.Error("Error marshaling health check disable data: %v", err) + return + } + + if err := json.Unmarshal(jsonData, &requestData); err != nil { + logger.Error("Error unmarshaling health check disable request: %v", err) + return + } + + if err := healthMonitor.DisableTarget(requestData.ID); err != nil { + logger.Error("Failed to disable health check target %s: %v", requestData.ID, err) + } else { + logger.Info("Disabled health check target: %s", requestData.ID) + } + }) + + // Register handler for getting health check status + client.RegisterHandler("newt/healthcheck/status/request", func(msg websocket.WSMessage) { + logger.Debug("Received health check status request") + + targets := healthMonitor.GetTargets() + healthStatuses := make(map[string]interface{}) + for id, target := range targets { + healthStatuses[id] = map[string]interface{}{ + "status": target.Status.String(), + "lastCheck": target.LastCheck.Format(time.RFC3339), + "checkCount": target.CheckCount, + "lastError": target.LastError, + "config": target.Config, + } + } + + err := client.SendMessage("newt/healthcheck/status", map[string]interface{}{ + "targets": healthStatuses, + }) + if err != nil { + logger.Error("Failed to send health check status response: %v", err) + } + }) + client.OnConnect(func() error { publicKey = privateKey.PublicKey() logger.Debug("Public key: %s", publicKey) @@ -936,6 +1084,10 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub // Close clients first (including WGTester) closeClients() + if healthMonitor != nil { + healthMonitor.Stop() + } + if dev != nil { dev.Close() } From e8612c7e6bb82722e58f91f60bc9cc4ebeb2cf45 Mon Sep 17 00:00:00 2001 From: Owen Date: Sun, 3 Aug 2025 17:02:15 -0700 Subject: [PATCH 2/5] Handle adding and removing healthchecks --- healthcheck/healthcheck.go | 79 ++++++++++++++++++++++++++++++++++---- main.go | 45 +++++++++++++++------- 2 files changed, 102 insertions(+), 22 deletions(-) diff --git a/healthcheck/healthcheck.go b/healthcheck/healthcheck.go index dafa19b..092025d 100644 --- a/healthcheck/healthcheck.go +++ b/healthcheck/healthcheck.go @@ -108,6 +108,30 @@ func (m *Monitor) AddTarget(config Config) error { m.mutex.Lock() defer m.mutex.Unlock() + return m.addTargetUnsafe(config) +} + +// AddTargets adds multiple health check targets in bulk +func (m *Monitor) AddTargets(configs []Config) error { + m.mutex.Lock() + defer m.mutex.Unlock() + + for _, config := range configs { + if err := m.addTargetUnsafe(config); err != nil { + return fmt.Errorf("failed to add target %s: %v", config.ID, err) + } + } + + // Notify callback once after all targets are added + if m.callback != nil { + go m.callback(m.getAllTargetsUnsafe()) + } + + return nil +} + +// addTargetUnsafe adds a target without acquiring the mutex (internal method) +func (m *Monitor) addTargetUnsafe(config Config) error { // Set defaults if config.Scheme == "" { config.Scheme = "http" @@ -173,22 +197,56 @@ func (m *Monitor) RemoveTarget(id string) error { // Notify callback of status change if m.callback != nil { - go m.callback(m.getAllTargets()) + go m.callback(m.GetTargets()) } return nil } -// GetTargets returns a copy of all targets -func (m *Monitor) GetTargets() map[string]*Target { - return m.getAllTargets() +// RemoveTargets removes multiple health check targets +func (m *Monitor) RemoveTargets(ids []string) error { + m.mutex.Lock() + defer m.mutex.Unlock() + + var notFound []string + + for _, id := range ids { + target, exists := m.targets[id] + if !exists { + notFound = append(notFound, id) + continue + } + + target.cancel() + delete(m.targets, id) + } + + // Notify callback of status change if any targets were removed + if len(notFound) != len(ids) && m.callback != nil { + go m.callback(m.GetTargets()) + } + + if len(notFound) > 0 { + return fmt.Errorf("targets not found: %v", notFound) + } + + return nil } -// getAllTargets returns a copy of all targets (internal method) -func (m *Monitor) getAllTargets() map[string]*Target { +// RemoveTargetsByID is a convenience method that accepts either a single ID or multiple IDs +func (m *Monitor) RemoveTargetsByID(ids ...string) error { + return m.RemoveTargets(ids) +} + +// GetTargets returns a copy of all targets +func (m *Monitor) GetTargets() map[string]*Target { m.mutex.RLock() defer m.mutex.RUnlock() + return m.getAllTargetsUnsafe() +} +// getAllTargetsUnsafe returns a copy of all targets without acquiring the mutex (internal method) +func (m *Monitor) getAllTargetsUnsafe() map[string]*Target { targets := make(map[string]*Target) for id, target := range m.targets { // Create a copy to avoid race conditions @@ -198,6 +256,11 @@ func (m *Monitor) getAllTargets() map[string]*Target { return targets } +// getAllTargets returns a copy of all targets (deprecated, use GetTargets) +func (m *Monitor) getAllTargets() map[string]*Target { + return m.GetTargets() +} + // monitorTarget monitors a single target func (m *Monitor) monitorTarget(target *Target) { // Initial check @@ -234,7 +297,7 @@ func (m *Monitor) monitorTarget(target *Target) { // Notify callback if status changed if oldStatus != target.Status && m.callback != nil { - go m.callback(m.getAllTargets()) + go m.callback(m.GetTargets()) } } } @@ -344,7 +407,7 @@ func (m *Monitor) DisableTarget(id string) error { // Notify callback of status change if m.callback != nil { - go m.callback(m.getAllTargets()) + go m.callback(m.GetTargets()) } } diff --git a/main.go b/main.go index e8e8896..440d802 100644 --- a/main.go +++ b/main.go @@ -30,11 +30,12 @@ import ( ) type WgData struct { - Endpoint string `json:"endpoint"` - PublicKey string `json:"publicKey"` - ServerIP string `json:"serverIP"` - TunnelIP string `json:"tunnelIP"` - Targets TargetsByType `json:"targets"` + Endpoint string `json:"endpoint"` + PublicKey string `json:"publicKey"` + ServerIP string `json:"serverIP"` + TunnelIP string `json:"tunnelIP"` + Targets TargetsByType `json:"targets"` + HealthCheckTargets []healthcheck.Config `json:"healthCheckTargets"` } type TargetsByType struct { @@ -449,6 +450,12 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub clientsAddProxyTarget(pm, wgData.TunnelIP) + if err := healthMonitor.AddTargets(wgData.HealthCheckTargets); err != nil { + logger.Error("Failed to bulk add health check targets: %v", err) + } else { + logger.Info("Successfully added %d health check targets", len(wgData.HealthCheckTargets)) + } + err = pm.Start() if err != nil { logger.Error("Failed to start proxy manager: %v", err) @@ -925,7 +932,12 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub client.RegisterHandler("newt/healthcheck/add", func(msg websocket.WSMessage) { logger.Debug("Received health check add request: %+v", msg) - var config healthcheck.Config + type HealthCheckConfig struct { + Targets []healthcheck.Config `json:"targets"` + } + + var config HealthCheckConfig + // add a bunch of targets at once jsonData, err := json.Marshal(msg.Data) if err != nil { logger.Error("Error marshaling health check data: %v", err) @@ -937,20 +949,24 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub return } - if err := healthMonitor.AddTarget(config); err != nil { - logger.Error("Failed to add health check target %s: %v", config.ID, err) + if err := healthMonitor.AddTargets(config.Targets); err != nil { + logger.Error("Failed to add health check targets: %v", err) } else { - logger.Info("Added health check target: %s", config.ID) + logger.Info("Added %d health check targets", len(config.Targets)) } + + logger.Debug("Health check targets added: %+v", config.Targets) }) // Register handler for removing health check targets client.RegisterHandler("newt/healthcheck/remove", func(msg websocket.WSMessage) { logger.Debug("Received health check remove request: %+v", msg) - var requestData struct { - ID string `json:"id"` + type HealthCheckConfig struct { + IDs []string `json:"ids"` } + + var requestData HealthCheckConfig jsonData, err := json.Marshal(msg.Data) if err != nil { logger.Error("Error marshaling health check remove data: %v", err) @@ -962,10 +978,11 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub return } - if err := healthMonitor.RemoveTarget(requestData.ID); err != nil { - logger.Error("Failed to remove health check target %s: %v", requestData.ID, err) + // Multiple target removal + if err := healthMonitor.RemoveTargets(requestData.IDs); err != nil { + logger.Error("Failed to remove health check targets %v: %v", requestData.IDs, err) } else { - logger.Info("Removed health check target: %s", requestData.ID) + logger.Info("Removed %d health check targets: %v", len(requestData.IDs), requestData.IDs) } }) From 289cce3a2253e8592e1aef51750b7a55a4838c94 Mon Sep 17 00:00:00 2001 From: Owen Date: Sun, 3 Aug 2025 18:43:43 -0700 Subject: [PATCH 3/5] Add health checks --- healthcheck/healthcheck.go | 38 +++++++++++++++++++------------------- main.go | 15 +++++++++------ websocket/client.go | 18 ++++++++++++++++++ 3 files changed, 46 insertions(+), 25 deletions(-) diff --git a/healthcheck/healthcheck.go b/healthcheck/healthcheck.go index 092025d..4e06343 100644 --- a/healthcheck/healthcheck.go +++ b/healthcheck/healthcheck.go @@ -32,7 +32,7 @@ func (s Status) String() string { // Config holds the health check configuration for a target type Config struct { - ID string `json:"id"` + ID int `json:"id"` Enabled bool `json:"hcEnabled"` Path string `json:"hcPath"` Scheme string `json:"hcScheme"` @@ -59,11 +59,11 @@ type Target struct { } // StatusChangeCallback is called when any target's status changes -type StatusChangeCallback func(targets map[string]*Target) +type StatusChangeCallback func(targets map[int]*Target) // Monitor manages health check targets and their monitoring type Monitor struct { - targets map[string]*Target + targets map[int]*Target mutex sync.RWMutex callback StatusChangeCallback client *http.Client @@ -72,7 +72,7 @@ type Monitor struct { // NewMonitor creates a new health check monitor func NewMonitor(callback StatusChangeCallback) *Monitor { return &Monitor{ - targets: make(map[string]*Target), + targets: make(map[int]*Target), callback: callback, client: &http.Client{ Timeout: 30 * time.Second, @@ -118,7 +118,7 @@ func (m *Monitor) AddTargets(configs []Config) error { for _, config := range configs { if err := m.addTargetUnsafe(config); err != nil { - return fmt.Errorf("failed to add target %s: %v", config.ID, err) + return fmt.Errorf("failed to add target %d: %v", config.ID, err) } } @@ -183,13 +183,13 @@ func (m *Monitor) addTargetUnsafe(config Config) error { } // RemoveTarget removes a health check target -func (m *Monitor) RemoveTarget(id string) error { +func (m *Monitor) RemoveTarget(id int) error { m.mutex.Lock() defer m.mutex.Unlock() target, exists := m.targets[id] if !exists { - return fmt.Errorf("target with id %s not found", id) + return fmt.Errorf("target with id %d not found", id) } target.cancel() @@ -204,11 +204,11 @@ func (m *Monitor) RemoveTarget(id string) error { } // RemoveTargets removes multiple health check targets -func (m *Monitor) RemoveTargets(ids []string) error { +func (m *Monitor) RemoveTargets(ids []int) error { m.mutex.Lock() defer m.mutex.Unlock() - var notFound []string + var notFound []int for _, id := range ids { target, exists := m.targets[id] @@ -234,20 +234,20 @@ func (m *Monitor) RemoveTargets(ids []string) error { } // RemoveTargetsByID is a convenience method that accepts either a single ID or multiple IDs -func (m *Monitor) RemoveTargetsByID(ids ...string) error { +func (m *Monitor) RemoveTargetsByID(ids ...int) error { return m.RemoveTargets(ids) } // GetTargets returns a copy of all targets -func (m *Monitor) GetTargets() map[string]*Target { +func (m *Monitor) GetTargets() map[int]*Target { m.mutex.RLock() defer m.mutex.RUnlock() return m.getAllTargetsUnsafe() } // getAllTargetsUnsafe returns a copy of all targets without acquiring the mutex (internal method) -func (m *Monitor) getAllTargetsUnsafe() map[string]*Target { - targets := make(map[string]*Target) +func (m *Monitor) getAllTargetsUnsafe() map[int]*Target { + targets := make(map[int]*Target) for id, target := range m.targets { // Create a copy to avoid race conditions targetCopy := *target @@ -257,7 +257,7 @@ func (m *Monitor) getAllTargetsUnsafe() map[string]*Target { } // getAllTargets returns a copy of all targets (deprecated, use GetTargets) -func (m *Monitor) getAllTargets() map[string]*Target { +func (m *Monitor) getAllTargets() map[int]*Target { return m.GetTargets() } @@ -363,17 +363,17 @@ func (m *Monitor) Stop() { for _, target := range m.targets { target.cancel() } - m.targets = make(map[string]*Target) + m.targets = make(map[int]*Target) } // EnableTarget enables monitoring for a specific target -func (m *Monitor) EnableTarget(id string) error { +func (m *Monitor) EnableTarget(id int) error { m.mutex.Lock() defer m.mutex.Unlock() target, exists := m.targets[id] if !exists { - return fmt.Errorf("target with id %s not found", id) + return fmt.Errorf("target with id %d not found", id) } if !target.Config.Enabled { @@ -391,13 +391,13 @@ func (m *Monitor) EnableTarget(id string) error { } // DisableTarget disables monitoring for a specific target -func (m *Monitor) DisableTarget(id string) error { +func (m *Monitor) DisableTarget(id int) error { m.mutex.Lock() defer m.mutex.Unlock() target, exists := m.targets[id] if !exists { - return fmt.Errorf("target with id %s not found", id) + return fmt.Errorf("target with id %d not found", id) } if target.Config.Enabled { diff --git a/main.go b/main.go index 440d802..4f96897 100644 --- a/main.go +++ b/main.go @@ -338,6 +338,9 @@ func main() { connected = false } + // print out the data + logger.Debug("Received registration message data: %+v", msg.Data) + jsonData, err := json.Marshal(msg.Data) if err != nil { logger.Info("Error marshaling data: %v", err) @@ -905,11 +908,11 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub }) // Initialize health check monitor with status change callback - healthMonitor = healthcheck.NewMonitor(func(targets map[string]*healthcheck.Target) { + healthMonitor = healthcheck.NewMonitor(func(targets map[int]*healthcheck.Target) { logger.Debug("Health check status update for %d targets", len(targets)) // Send health status update to the server - healthStatuses := make(map[string]interface{}) + healthStatuses := make(map[int]interface{}) for id, target := range targets { healthStatuses[id] = map[string]interface{}{ "status": target.Status.String(), @@ -963,7 +966,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub logger.Debug("Received health check remove request: %+v", msg) type HealthCheckConfig struct { - IDs []string `json:"ids"` + IDs []int `json:"ids"` } var requestData HealthCheckConfig @@ -991,7 +994,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub logger.Debug("Received health check enable request: %+v", msg) var requestData struct { - ID string `json:"id"` + ID int `json:"id"` } jsonData, err := json.Marshal(msg.Data) if err != nil { @@ -1016,7 +1019,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub logger.Debug("Received health check disable request: %+v", msg) var requestData struct { - ID string `json:"id"` + ID int `json:"id"` } jsonData, err := json.Marshal(msg.Data) if err != nil { @@ -1041,7 +1044,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub logger.Debug("Received health check status request") targets := healthMonitor.GetTargets() - healthStatuses := make(map[string]interface{}) + healthStatuses := make(map[int]interface{}) for id, target := range targets { healthStatuses[id] = map[string]interface{}{ "status": target.Status.String(), diff --git a/websocket/client.go b/websocket/client.go index 8aea1f7..d1b5178 100644 --- a/websocket/client.go +++ b/websocket/client.go @@ -205,6 +205,15 @@ func (c *Client) getToken() (string, error) { } } + // Check for environment variable to skip TLS verification + if os.Getenv("SKIP_TLS_VERIFY") == "true" { + if tlsConfig == nil { + tlsConfig = &tls.Config{} + } + tlsConfig.InsecureSkipVerify = true + logger.Debug("TLS certificate verification disabled via SKIP_TLS_VERIFY environment variable") + } + var tokenData map[string]interface{} // Get a new token @@ -339,6 +348,15 @@ func (c *Client) establishConnection() error { } dialer.TLSClientConfig = tlsConfig } + + // Check for environment variable to skip TLS verification for WebSocket connection + if os.Getenv("SKIP_TLS_VERIFY") == "true" { + if dialer.TLSClientConfig == nil { + dialer.TLSClientConfig = &tls.Config{} + } + dialer.TLSClientConfig.InsecureSkipVerify = true + logger.Debug("WebSocket TLS certificate verification disabled via SKIP_TLS_VERIFY environment variable") + } conn, _, err := dialer.Dial(u.String(), nil) if err != nil { return fmt.Errorf("failed to connect to WebSocket: %w", err) From 28b6865f738b637817a76c8e9147323be7147d50 Mon Sep 17 00:00:00 2001 From: Owen Date: Mon, 11 Aug 2025 08:14:29 -0700 Subject: [PATCH 4/5] Healthcheck working --- healthcheck/healthcheck.go | 113 ++++++++++++++++++++++++++++++++----- main.go | 67 ++++++++++++++-------- 2 files changed, 141 insertions(+), 39 deletions(-) diff --git a/healthcheck/healthcheck.go b/healthcheck/healthcheck.go index 4e06343..ae4a411 100644 --- a/healthcheck/healthcheck.go +++ b/healthcheck/healthcheck.go @@ -8,18 +8,20 @@ import ( "strings" "sync" "time" + + "github.com/fosrl/newt/logger" ) -// Status represents the health status of a target -type Status int +// Health represents the health status of a target +type Health int const ( - StatusUnknown Status = iota + StatusUnknown Health = iota StatusHealthy StatusUnhealthy ) -func (s Status) String() string { +func (s Health) String() string { switch s { case StatusHealthy: return "healthy" @@ -44,12 +46,13 @@ type Config struct { Timeout int `json:"hcTimeout"` // in seconds Headers map[string]string `json:"hcHeaders"` Method string `json:"hcMethod"` + Status int `json:"hcStatus"` // HTTP status code } // Target represents a health check target with its current status type Target struct { Config Config `json:"config"` - Status Status `json:"status"` + Status Health `json:"status"` LastCheck time.Time `json:"lastCheck"` LastError string `json:"lastError,omitempty"` CheckCount int `json:"checkCount"` @@ -71,6 +74,7 @@ type Monitor struct { // NewMonitor creates a new health check monitor func NewMonitor(callback StatusChangeCallback) *Monitor { + logger.Info("Creating new health check monitor") return &Monitor{ targets: make(map[int]*Target), callback: callback, @@ -108,6 +112,9 @@ func (m *Monitor) AddTarget(config Config) error { m.mutex.Lock() defer m.mutex.Unlock() + logger.Info("Adding health check target: ID=%d, hostname=%s, port=%d, enabled=%t", + config.ID, config.Hostname, config.Port, config.Enabled) + return m.addTargetUnsafe(config) } @@ -116,17 +123,20 @@ func (m *Monitor) AddTargets(configs []Config) error { m.mutex.Lock() defer m.mutex.Unlock() + logger.Info("Adding %d health check targets in bulk", len(configs)) + for _, config := range configs { if err := m.addTargetUnsafe(config); err != nil { + logger.Error("Failed to add target %d: %v", config.ID, err) return fmt.Errorf("failed to add target %d: %v", config.ID, err) } + logger.Debug("Successfully added target: ID=%d, hostname=%s", config.ID, config.Hostname) } - // Notify callback once after all targets are added - if m.callback != nil { - go m.callback(m.getAllTargetsUnsafe()) - } + // Don't notify callback immediately - let the initial health checks complete first + // The callback will be triggered when the first health check results are available + logger.Info("Successfully added all %d health check targets", len(configs)) return nil } @@ -152,6 +162,9 @@ func (m *Monitor) addTargetUnsafe(config Config) error { config.Timeout = 5 } + logger.Debug("Target %d configuration: scheme=%s, method=%s, interval=%ds, timeout=%ds", + config.ID, config.Scheme, config.Method, config.Interval, config.Timeout) + // Parse headers if provided as string if len(config.Headers) == 0 && config.Path != "" { // This is a simplified header parsing - in real use you might want more robust parsing @@ -160,6 +173,7 @@ func (m *Monitor) addTargetUnsafe(config Config) error { // Remove existing target if it exists if existing, exists := m.targets[config.ID]; exists { + logger.Info("Replacing existing target with ID %d", config.ID) existing.cancel() } @@ -176,7 +190,10 @@ func (m *Monitor) addTargetUnsafe(config Config) error { // Start monitoring if enabled if config.Enabled { + logger.Info("Starting monitoring for target %d (%s:%d)", config.ID, config.Hostname, config.Port) go m.monitorTarget(target) + } else { + logger.Debug("Target %d added but monitoring is disabled", config.ID) } return nil @@ -189,9 +206,11 @@ func (m *Monitor) RemoveTarget(id int) error { target, exists := m.targets[id] if !exists { + logger.Warn("Attempted to remove non-existent target with ID %d", id) return fmt.Errorf("target with id %d not found", id) } + logger.Info("Removing health check target: ID=%d", id) target.cancel() delete(m.targets, id) @@ -200,6 +219,7 @@ func (m *Monitor) RemoveTarget(id int) error { go m.callback(m.GetTargets()) } + logger.Info("Successfully removed target %d", id) return nil } @@ -208,25 +228,32 @@ func (m *Monitor) RemoveTargets(ids []int) error { m.mutex.Lock() defer m.mutex.Unlock() + logger.Info("Removing %d health check targets", len(ids)) var notFound []int for _, id := range ids { target, exists := m.targets[id] if !exists { notFound = append(notFound, id) + logger.Warn("Target with ID %d not found during bulk removal", id) continue } + logger.Debug("Removing target %d", id) target.cancel() delete(m.targets, id) } + removedCount := len(ids) - len(notFound) + logger.Info("Successfully removed %d targets", removedCount) + // Notify callback of status change if any targets were removed if len(notFound) != len(ids) && m.callback != nil { go m.callback(m.GetTargets()) } if len(notFound) > 0 { + logger.Error("Some targets not found during removal: %v", notFound) return fmt.Errorf("targets not found: %v", notFound) } @@ -263,21 +290,33 @@ func (m *Monitor) getAllTargets() map[int]*Target { // monitorTarget monitors a single target func (m *Monitor) monitorTarget(target *Target) { + logger.Info("Starting health check monitoring for target %d (%s:%d)", + target.Config.ID, target.Config.Hostname, target.Config.Port) + // Initial check + oldStatus := target.Status m.performHealthCheck(target) + // Notify callback after initial check if status changed or if it's the first check + if (oldStatus != target.Status || oldStatus == StatusUnknown) && m.callback != nil { + logger.Info("Target %d initial status: %s", target.Config.ID, target.Status.String()) + go m.callback(m.GetTargets()) + } + // Set up ticker based on current status interval := time.Duration(target.Config.Interval) * time.Second if target.Status == StatusUnhealthy { interval = time.Duration(target.Config.UnhealthyInterval) * time.Second } + logger.Debug("Target %d: initial check interval set to %v", target.Config.ID, interval) target.ticker = time.NewTicker(interval) defer target.ticker.Stop() for { select { case <-target.ctx.Done(): + logger.Info("Stopping health check monitoring for target %d", target.Config.ID) return case <-target.ticker.C: oldStatus := target.Status @@ -290,6 +329,8 @@ func (m *Monitor) monitorTarget(target *Target) { } if newInterval != interval { + logger.Debug("Target %d: updating check interval from %v to %v due to status change", + target.Config.ID, interval, newInterval) target.ticker.Stop() target.ticker = time.NewTicker(newInterval) interval = newInterval @@ -297,6 +338,8 @@ func (m *Monitor) monitorTarget(target *Target) { // Notify callback if status changed if oldStatus != target.Status && m.callback != nil { + logger.Info("Target %d status changed: %s -> %s", + target.Config.ID, oldStatus.String(), target.Status.String()) go m.callback(m.GetTargets()) } } @@ -321,6 +364,9 @@ func (m *Monitor) performHealthCheck(target *Target) { url += target.Config.Path } + logger.Debug("Target %d: performing health check %d to %s", + target.Config.ID, target.CheckCount, url) + // Create request ctx, cancel := context.WithTimeout(context.Background(), time.Duration(target.Config.Timeout)*time.Second) defer cancel() @@ -329,6 +375,7 @@ func (m *Monitor) performHealthCheck(target *Target) { if err != nil { target.Status = StatusUnhealthy target.LastError = fmt.Sprintf("failed to create request: %v", err) + logger.Warn("Target %d: failed to create request: %v", target.Config.ID, err) return } @@ -342,16 +389,40 @@ func (m *Monitor) performHealthCheck(target *Target) { if err != nil { target.Status = StatusUnhealthy target.LastError = fmt.Sprintf("request failed: %v", err) + logger.Warn("Target %d: health check failed: %v", target.Config.ID, err) return } defer resp.Body.Close() // Check response status - if resp.StatusCode >= 200 && resp.StatusCode < 300 { - target.Status = StatusHealthy + var expectedStatus int + if target.Config.Status > 0 { + expectedStatus = target.Config.Status } else { - target.Status = StatusUnhealthy - target.LastError = fmt.Sprintf("unhealthy status code: %d", resp.StatusCode) + expectedStatus = 0 // Use range check for 200-299 + } + + if expectedStatus > 0 { + logger.Debug("Target %d: checking health status against expected code %d", target.Config.ID, expectedStatus) + // Check for specific status code + if resp.StatusCode == expectedStatus { + target.Status = StatusHealthy + logger.Debug("Target %d: health check passed (status: %d, expected: %d)", target.Config.ID, resp.StatusCode, expectedStatus) + } else { + target.Status = StatusUnhealthy + target.LastError = fmt.Sprintf("unexpected status code: %d (expected: %d)", resp.StatusCode, expectedStatus) + logger.Warn("Target %d: health check failed with status code %d (expected: %d)", target.Config.ID, resp.StatusCode, expectedStatus) + } + } else { + // Check for 2xx range + if resp.StatusCode >= 200 && resp.StatusCode < 300 { + target.Status = StatusHealthy + logger.Debug("Target %d: health check passed (status: %d)", target.Config.ID, resp.StatusCode) + } else { + target.Status = StatusUnhealthy + target.LastError = fmt.Sprintf("unhealthy status code: %d", resp.StatusCode) + logger.Warn("Target %d: health check failed with status code %d", target.Config.ID, resp.StatusCode) + } } } @@ -360,10 +431,16 @@ func (m *Monitor) Stop() { m.mutex.Lock() defer m.mutex.Unlock() - for _, target := range m.targets { + targetCount := len(m.targets) + logger.Info("Stopping health check monitor with %d targets", targetCount) + + for id, target := range m.targets { + logger.Debug("Stopping monitoring for target %d", id) target.cancel() } m.targets = make(map[int]*Target) + + logger.Info("Health check monitor stopped") } // EnableTarget enables monitoring for a specific target @@ -373,10 +450,12 @@ func (m *Monitor) EnableTarget(id int) error { target, exists := m.targets[id] if !exists { + logger.Warn("Attempted to enable non-existent target with ID %d", id) return fmt.Errorf("target with id %d not found", id) } if !target.Config.Enabled { + logger.Info("Enabling health check monitoring for target %d", id) target.Config.Enabled = true target.cancel() // Stop existing monitoring @@ -385,6 +464,8 @@ func (m *Monitor) EnableTarget(id int) error { target.cancel = cancel go m.monitorTarget(target) + } else { + logger.Debug("Target %d is already enabled", id) } return nil @@ -397,10 +478,12 @@ func (m *Monitor) DisableTarget(id int) error { target, exists := m.targets[id] if !exists { + logger.Warn("Attempted to disable non-existent target with ID %d", id) return fmt.Errorf("target with id %d not found", id) } if target.Config.Enabled { + logger.Info("Disabling health check monitoring for target %d", id) target.Config.Enabled = false target.cancel() target.Status = StatusUnknown @@ -409,6 +492,8 @@ func (m *Monitor) DisableTarget(id int) error { if m.callback != nil { go m.callback(m.GetTargets()) } + } else { + logger.Debug("Target %d is already disabled", id) } return nil diff --git a/main.go b/main.go index 4f96897..2db6160 100644 --- a/main.go +++ b/main.go @@ -101,6 +101,7 @@ var ( healthFile string useNativeInterface bool authorizedKeysFile string + preferEndpoint string healthMonitor *healthcheck.Monitor ) @@ -172,6 +173,9 @@ func main() { if pingTimeoutStr == "" { flag.StringVar(&pingTimeoutStr, "ping-timeout", "5s", " Timeout for each ping (default 5s)") } + // load the prefer endpoint just as a flag + flag.StringVar(&preferEndpoint, "prefer-endpoint", "", "Prefer this endpoint for the connection (if set, will override the endpoint from the server)") + // if authorizedKeysFile == "" { // flag.StringVar(&authorizedKeysFile, "authorized-keys-file", "~/.ssh/authorized_keys", "Path to authorized keys file (if unset, no keys will be authorized)") // } @@ -291,6 +295,33 @@ func main() { setupClients(client) } + // Initialize health check monitor with status change callback + healthMonitor = healthcheck.NewMonitor(func(targets map[int]*healthcheck.Target) { + logger.Debug("Health check status update for %d targets", len(targets)) + + // Send health status update to the server + healthStatuses := make(map[int]interface{}) + for id, target := range targets { + healthStatuses[id] = map[string]interface{}{ + "status": target.Status.String(), + "lastCheck": target.LastCheck.Format(time.RFC3339), + "checkCount": target.CheckCount, + "lastError": target.LastError, + "config": target.Config, + } + } + + // print the status of the targets + logger.Debug("Health check status: %+v", healthStatuses) + + err := client.SendMessage("newt/healthcheck/status", map[string]interface{}{ + "targets": healthStatuses, + }) + if err != nil { + logger.Error("Failed to send health check status update: %v", err) + } + }) + var pingWithRetryStopChan chan struct{} closeWgTunnel := func() { @@ -529,9 +560,19 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub } // If there is just one exit node, we can skip pinging it and use it directly - if len(exitNodes) == 1 { + if len(exitNodes) == 1 || preferEndpoint != "" { logger.Debug("Only one exit node available, using it directly: %s", exitNodes[0].Endpoint) + // if the preferEndpoint is set, we will use it instead of the exit node endpoint. first you need to find the exit node with that endpoint in the list and send that one + if preferEndpoint != "" { + for _, node := range exitNodes { + if node.Endpoint == preferEndpoint { + exitNodes[0] = node + break + } + } + } + // Prepare data to send to the cloud for selection pingResults := []ExitNodePingResult{ { @@ -907,30 +948,6 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub logger.Info("SSH public key appended to authorized keys file") }) - // Initialize health check monitor with status change callback - healthMonitor = healthcheck.NewMonitor(func(targets map[int]*healthcheck.Target) { - logger.Debug("Health check status update for %d targets", len(targets)) - - // Send health status update to the server - healthStatuses := make(map[int]interface{}) - for id, target := range targets { - healthStatuses[id] = map[string]interface{}{ - "status": target.Status.String(), - "lastCheck": target.LastCheck.Format(time.RFC3339), - "checkCount": target.CheckCount, - "lastError": target.LastError, - "config": target.Config, - } - } - - err := client.SendMessage("newt/healthcheck/status", map[string]interface{}{ - "targets": healthStatuses, - }) - if err != nil { - logger.Error("Failed to send health check status update: %v", err) - } - }) - // Register handler for adding health check targets client.RegisterHandler("newt/healthcheck/add", func(msg websocket.WSMessage) { logger.Debug("Received health check add request: %+v", msg) From d013dc05438a9bca30c644566881346d430a1659 Mon Sep 17 00:00:00 2001 From: Owen Date: Wed, 13 Aug 2025 14:18:47 -0700 Subject: [PATCH 5/5] Adjust logging --- healthcheck/healthcheck.go | 4 ++-- main.go | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/healthcheck/healthcheck.go b/healthcheck/healthcheck.go index ae4a411..523d34e 100644 --- a/healthcheck/healthcheck.go +++ b/healthcheck/healthcheck.go @@ -123,7 +123,7 @@ func (m *Monitor) AddTargets(configs []Config) error { m.mutex.Lock() defer m.mutex.Unlock() - logger.Info("Adding %d health check targets in bulk", len(configs)) + logger.Debug("Adding %d health check targets in bulk", len(configs)) for _, config := range configs { if err := m.addTargetUnsafe(config); err != nil { @@ -136,7 +136,7 @@ func (m *Monitor) AddTargets(configs []Config) error { // Don't notify callback immediately - let the initial health checks complete first // The callback will be triggered when the first health check results are available - logger.Info("Successfully added all %d health check targets", len(configs)) + logger.Debug("Successfully added all %d health check targets", len(configs)) return nil } diff --git a/main.go b/main.go index 2db6160..8c6ae69 100644 --- a/main.go +++ b/main.go @@ -448,7 +448,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub if err != nil { logger.Warn("Initial reliable ping failed, but continuing: %v", err) } else { - logger.Info("Initial connection test successful!") + logger.Info("Initial connection test successful") } pingWithRetryStopChan, _ = pingWithRetry(tnet, wgData.ServerIP, pingTimeout)