mirror of
https://github.com/fosrl/newt.git
synced 2026-03-26 20:46:41 +00:00
Merge branch 'main' into otel
This commit is contained in:
106
main.go
106
main.go
@@ -79,6 +79,11 @@ type ExitNodePingResult struct {
|
||||
WasPreviouslyConnected bool `json:"wasPreviouslyConnected"`
|
||||
}
|
||||
|
||||
type BlueprintResult struct {
|
||||
Success bool `json:"success"`
|
||||
Message string `json:"message,omitempty"`
|
||||
}
|
||||
|
||||
// Custom flag type for multiple CA files
|
||||
type stringSlice []string
|
||||
|
||||
@@ -137,6 +142,8 @@ var (
|
||||
adminAddr string
|
||||
region string
|
||||
metricsAsyncBytes bool
|
||||
blueprintFile string
|
||||
noCloud bool
|
||||
|
||||
// New mTLS configuration variables
|
||||
tlsClientCert string
|
||||
@@ -175,10 +182,12 @@ func main() {
|
||||
asyncBytesEnv := os.Getenv("NEWT_METRICS_ASYNC_BYTES")
|
||||
|
||||
keepInterface = keepInterfaceEnv == "true"
|
||||
acceptClientsEnv := os.Getenv("ACCEPT_CLIENTS")
|
||||
acceptClients = acceptClientsEnv == "true"
|
||||
useNativeInterfaceEnv := os.Getenv("USE_NATIVE_INTERFACE")
|
||||
useNativeInterface = useNativeInterfaceEnv == "true"
|
||||
enforceHealthcheckCertEnv := os.Getenv("ENFORCE_HC_CERT")
|
||||
enforceHealthcheckCert = enforceHealthcheckCertEnv == "true"
|
||||
|
||||
dockerSocket = os.Getenv("DOCKER_SOCKET")
|
||||
pingIntervalStr := os.Getenv("PING_INTERVAL")
|
||||
pingTimeoutStr := os.Getenv("PING_TIMEOUT")
|
||||
@@ -202,9 +211,12 @@ func main() {
|
||||
// Legacy PKCS12 support (deprecated)
|
||||
tlsPrivateKey = os.Getenv("TLS_CLIENT_CERT_PKCS12")
|
||||
// Keep backward compatibility with old environment variable name
|
||||
if tlsPrivateKey == "" {
|
||||
if tlsPrivateKey == "" && tlsClientKey == "" && len(tlsClientCAs) == 0 {
|
||||
tlsPrivateKey = os.Getenv("TLS_CLIENT_CERT")
|
||||
}
|
||||
blueprintFile = os.Getenv("BLUEPRINT_FILE")
|
||||
noCloudEnv := os.Getenv("NO_CLOUD")
|
||||
noCloud = noCloudEnv == "true"
|
||||
|
||||
if endpoint == "" {
|
||||
flag.StringVar(&endpoint, "endpoint", "", "Endpoint of your pangolin server")
|
||||
@@ -304,6 +316,12 @@ func main() {
|
||||
if healthFile == "" {
|
||||
flag.StringVar(&healthFile, "health-file", "", "Path to health file (if unset, health file won't be written)")
|
||||
}
|
||||
if blueprintFile == "" {
|
||||
flag.StringVar(&blueprintFile, "blueprint-file", "", "Path to blueprint file (if unset, no blueprint will be applied)")
|
||||
}
|
||||
if noCloudEnv == "" {
|
||||
flag.BoolVar(&noCloud, "no-cloud", false, "Disable cloud failover")
|
||||
}
|
||||
|
||||
// Metrics/observability flags (mirror ENV if unset)
|
||||
if metricsEnabledEnv == "" {
|
||||
@@ -520,6 +538,7 @@ func main() {
|
||||
var pm *proxy.ProxyManager
|
||||
var connected bool
|
||||
var wgData WgData
|
||||
var dockerEventMonitor *docker.EventMonitor
|
||||
|
||||
if acceptClients {
|
||||
setupClients(client)
|
||||
@@ -585,7 +604,7 @@ func main() {
|
||||
|
||||
// Register handlers for different message types
|
||||
client.RegisterHandler("newt/wg/connect", func(msg websocket.WSMessage) {
|
||||
logger.Info("Received registration message")
|
||||
logger.Debug("Received registration message")
|
||||
regResult := "success"
|
||||
defer func() {
|
||||
telemetry.IncSiteRegistration(ctx, regResult)
|
||||
@@ -693,7 +712,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
logger.Warn("Initial reliable ping failed, but continuing: %v", err)
|
||||
regResult = "failure"
|
||||
} else {
|
||||
logger.Info("Initial connection test successful")
|
||||
logger.Debug("Initial connection test successful")
|
||||
}
|
||||
|
||||
pingWithRetryStopChan, _ = pingWithRetry(tnet, wgData.ServerIP, pingTimeout)
|
||||
@@ -735,7 +754,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
if err := healthMonitor.AddTargets(wgData.HealthCheckTargets); err != nil {
|
||||
logger.Error("Failed to bulk add health check targets: %v", err)
|
||||
} else {
|
||||
logger.Info("Successfully added %d health check targets", len(wgData.HealthCheckTargets))
|
||||
logger.Debug("Successfully added %d health check targets", len(wgData.HealthCheckTargets))
|
||||
}
|
||||
|
||||
err = pm.Start()
|
||||
@@ -768,7 +787,9 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
}
|
||||
|
||||
// Request exit nodes from the server
|
||||
stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second)
|
||||
stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{
|
||||
"noCloud": noCloud,
|
||||
}, 3*time.Second)
|
||||
|
||||
logger.Info("Tunnel destroyed, ready for reconnection")
|
||||
})
|
||||
@@ -794,7 +815,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
})
|
||||
|
||||
client.RegisterHandler("newt/ping/exitNodes", func(msg websocket.WSMessage) {
|
||||
logger.Info("Received ping message")
|
||||
logger.Debug("Received ping message")
|
||||
if stopFunc != nil {
|
||||
stopFunc() // stop the ws from sending more requests
|
||||
stopFunc = nil // reset stopFunc to nil to avoid double stopping
|
||||
@@ -1085,7 +1106,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
if err != nil {
|
||||
logger.Error("Failed to send Docker socket check response: %v", err)
|
||||
} else {
|
||||
logger.Info("Docker socket check response sent: available=%t", isAvailable)
|
||||
logger.Debug("Docker socket check response sent: available=%t", isAvailable)
|
||||
}
|
||||
})
|
||||
|
||||
@@ -1116,7 +1137,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
if err != nil {
|
||||
logger.Error("Failed to send Docker container list: %v", err)
|
||||
} else {
|
||||
logger.Info("Docker container list sent, count: %d", len(containers))
|
||||
logger.Debug("Docker container list sent, count: %d", len(containers))
|
||||
}
|
||||
})
|
||||
|
||||
@@ -1232,7 +1253,7 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
if err := healthMonitor.AddTargets(config.Targets); err != nil {
|
||||
logger.Error("Failed to add health check targets: %v", err)
|
||||
} else {
|
||||
logger.Info("Added %d health check targets", len(config.Targets))
|
||||
logger.Debug("Added %d health check targets", len(config.Targets))
|
||||
}
|
||||
|
||||
logger.Debug("Health check targets added: %+v", config.Targets)
|
||||
@@ -1340,6 +1361,29 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
}
|
||||
})
|
||||
|
||||
// Register handler for getting health check status
|
||||
client.RegisterHandler("newt/blueprint/results", func(msg websocket.WSMessage) {
|
||||
logger.Debug("Received blueprint results message")
|
||||
|
||||
var blueprintResult BlueprintResult
|
||||
|
||||
jsonData, err := json.Marshal(msg.Data)
|
||||
if err != nil {
|
||||
logger.Info("Error marshaling data: %v", err)
|
||||
return
|
||||
}
|
||||
if err := json.Unmarshal(jsonData, &blueprintResult); err != nil {
|
||||
logger.Info("Error unmarshaling config results data: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if blueprintResult.Success {
|
||||
logger.Debug("Blueprint applied successfully!")
|
||||
} else {
|
||||
logger.Warn("Blueprint application failed: %s", blueprintResult.Message)
|
||||
}
|
||||
})
|
||||
|
||||
client.OnConnect(func() error {
|
||||
publicKey = privateKey.PublicKey()
|
||||
logger.Debug("Public key: %s", publicKey)
|
||||
@@ -1350,9 +1394,11 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
if stopFunc != nil {
|
||||
stopFunc()
|
||||
}
|
||||
// request from the server the list of nodes to ping at newt/ping/request
|
||||
stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{}, 3*time.Second)
|
||||
logger.Info("Requesting exit nodes from server")
|
||||
// request from the server the list of nodes to ping
|
||||
stopFunc = client.SendMessageInterval("newt/ping/request", map[string]interface{}{
|
||||
"noCloud": noCloud,
|
||||
}, 3*time.Second)
|
||||
logger.Debug("Requesting exit nodes from server")
|
||||
clientsOnConnect()
|
||||
}
|
||||
|
||||
@@ -1363,6 +1409,8 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
"backwardsCompatible": true,
|
||||
})
|
||||
|
||||
sendBlueprint(client)
|
||||
|
||||
if err != nil {
|
||||
logger.Error("Failed to send registration message: %v", err)
|
||||
return err
|
||||
@@ -1377,6 +1425,34 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
}
|
||||
defer client.Close()
|
||||
|
||||
// Initialize Docker event monitoring if Docker socket is available and monitoring is enabled
|
||||
if dockerSocket != "" {
|
||||
logger.Debug("Initializing Docker event monitoring")
|
||||
dockerEventMonitor, err = docker.NewEventMonitor(dockerSocket, dockerEnforceNetworkValidationBool, func(containers []docker.Container) {
|
||||
// Send updated container list via websocket when Docker events occur
|
||||
logger.Debug("Docker event detected, sending updated container list (%d containers)", len(containers))
|
||||
err := client.SendMessage("newt/socket/containers", map[string]interface{}{
|
||||
"containers": containers,
|
||||
})
|
||||
if err != nil {
|
||||
logger.Error("Failed to send updated container list after Docker event: %v", err)
|
||||
} else {
|
||||
logger.Debug("Updated container list sent successfully")
|
||||
}
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
logger.Error("Failed to create Docker event monitor: %v", err)
|
||||
} else {
|
||||
err = dockerEventMonitor.Start()
|
||||
if err != nil {
|
||||
logger.Error("Failed to start Docker event monitoring: %v", err)
|
||||
} else {
|
||||
logger.Debug("Docker event monitoring started successfully")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Wait for interrupt signal
|
||||
sigCh := make(chan os.Signal, 1)
|
||||
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
|
||||
@@ -1385,6 +1461,10 @@ persistent_keepalive_interval=5`, fixKey(privateKey.String()), fixKey(wgData.Pub
|
||||
// Close clients first (including WGTester)
|
||||
closeClients()
|
||||
|
||||
if dockerEventMonitor != nil {
|
||||
dockerEventMonitor.Stop()
|
||||
}
|
||||
|
||||
if healthMonitor != nil {
|
||||
healthMonitor.Stop()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user