mirror of
https://github.com/fosrl/newt.git
synced 2026-03-27 04:56:41 +00:00
Compare commits
6 Commits
dependabot
...
dev
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a2683eb385 | ||
|
|
d3722c2519 | ||
|
|
8fda35db4f | ||
|
|
de4353f2e6 | ||
|
|
13448f76aa | ||
|
|
836144aebf |
@@ -5,7 +5,9 @@ import (
|
|||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -365,11 +367,12 @@ func (m *Monitor) performHealthCheck(target *Target) {
|
|||||||
target.LastCheck = time.Now()
|
target.LastCheck = time.Now()
|
||||||
target.LastError = ""
|
target.LastError = ""
|
||||||
|
|
||||||
// Build URL
|
// Build URL (use net.JoinHostPort to properly handle IPv6 addresses with ports)
|
||||||
url := fmt.Sprintf("%s://%s", target.Config.Scheme, target.Config.Hostname)
|
host := target.Config.Hostname
|
||||||
if target.Config.Port > 0 {
|
if target.Config.Port > 0 {
|
||||||
url = fmt.Sprintf("%s:%d", url, target.Config.Port)
|
host = net.JoinHostPort(target.Config.Hostname, strconv.Itoa(target.Config.Port))
|
||||||
}
|
}
|
||||||
|
url := fmt.Sprintf("%s://%s", target.Config.Scheme, host)
|
||||||
if target.Config.Path != "" {
|
if target.Config.Path != "" {
|
||||||
if !strings.HasPrefix(target.Config.Path, "/") {
|
if !strings.HasPrefix(target.Config.Path, "/") {
|
||||||
url += "/"
|
url += "/"
|
||||||
|
|||||||
19
main.go
19
main.go
@@ -10,6 +10,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"net/http/pprof"
|
||||||
"net/netip"
|
"net/netip"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
@@ -147,6 +148,7 @@ var (
|
|||||||
adminAddr string
|
adminAddr string
|
||||||
region string
|
region string
|
||||||
metricsAsyncBytes bool
|
metricsAsyncBytes bool
|
||||||
|
pprofEnabled bool
|
||||||
blueprintFile string
|
blueprintFile string
|
||||||
noCloud bool
|
noCloud bool
|
||||||
|
|
||||||
@@ -225,6 +227,7 @@ func runNewtMain(ctx context.Context) {
|
|||||||
adminAddrEnv := os.Getenv("NEWT_ADMIN_ADDR")
|
adminAddrEnv := os.Getenv("NEWT_ADMIN_ADDR")
|
||||||
regionEnv := os.Getenv("NEWT_REGION")
|
regionEnv := os.Getenv("NEWT_REGION")
|
||||||
asyncBytesEnv := os.Getenv("NEWT_METRICS_ASYNC_BYTES")
|
asyncBytesEnv := os.Getenv("NEWT_METRICS_ASYNC_BYTES")
|
||||||
|
pprofEnabledEnv := os.Getenv("NEWT_PPROF_ENABLED")
|
||||||
|
|
||||||
disableClientsEnv := os.Getenv("DISABLE_CLIENTS")
|
disableClientsEnv := os.Getenv("DISABLE_CLIENTS")
|
||||||
disableClients = disableClientsEnv == "true"
|
disableClients = disableClientsEnv == "true"
|
||||||
@@ -390,6 +393,14 @@ func runNewtMain(ctx context.Context) {
|
|||||||
metricsAsyncBytes = v
|
metricsAsyncBytes = v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// pprof debug endpoint toggle
|
||||||
|
if pprofEnabledEnv == "" {
|
||||||
|
flag.BoolVar(&pprofEnabled, "pprof", false, "Enable pprof debug endpoints on admin server")
|
||||||
|
} else {
|
||||||
|
if v, err := strconv.ParseBool(pprofEnabledEnv); err == nil {
|
||||||
|
pprofEnabled = v
|
||||||
|
}
|
||||||
|
}
|
||||||
// Optional region flag (resource attribute)
|
// Optional region flag (resource attribute)
|
||||||
if regionEnv == "" {
|
if regionEnv == "" {
|
||||||
flag.StringVar(®ion, "region", "", "Optional region resource attribute (also NEWT_REGION)")
|
flag.StringVar(®ion, "region", "", "Optional region resource attribute (also NEWT_REGION)")
|
||||||
@@ -485,6 +496,14 @@ func runNewtMain(ctx context.Context) {
|
|||||||
if tel.PrometheusHandler != nil {
|
if tel.PrometheusHandler != nil {
|
||||||
mux.Handle("/metrics", tel.PrometheusHandler)
|
mux.Handle("/metrics", tel.PrometheusHandler)
|
||||||
}
|
}
|
||||||
|
if pprofEnabled {
|
||||||
|
mux.HandleFunc("/debug/pprof/", pprof.Index)
|
||||||
|
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
|
||||||
|
mux.HandleFunc("/debug/pprof/profile", pprof.Profile)
|
||||||
|
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
|
||||||
|
mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
|
||||||
|
logger.Info("pprof debugging enabled on %s/debug/pprof/", tcfg.AdminAddr)
|
||||||
|
}
|
||||||
admin := &http.Server{
|
admin := &http.Server{
|
||||||
Addr: tcfg.AdminAddr,
|
Addr: tcfg.AdminAddr,
|
||||||
Handler: otelhttp.NewHandler(mux, "newt-admin"),
|
Handler: otelhttp.NewHandler(mux, "newt-admin"),
|
||||||
|
|||||||
@@ -21,7 +21,10 @@ import (
|
|||||||
"gvisor.dev/gvisor/pkg/tcpip/adapters/gonet"
|
"gvisor.dev/gvisor/pkg/tcpip/adapters/gonet"
|
||||||
)
|
)
|
||||||
|
|
||||||
const errUnsupportedProtoFmt = "unsupported protocol: %s"
|
const (
|
||||||
|
errUnsupportedProtoFmt = "unsupported protocol: %s"
|
||||||
|
maxUDPPacketSize = 65507
|
||||||
|
)
|
||||||
|
|
||||||
// Target represents a proxy target with its address and port
|
// Target represents a proxy target with its address and port
|
||||||
type Target struct {
|
type Target struct {
|
||||||
@@ -105,14 +108,10 @@ func classifyProxyError(err error) string {
|
|||||||
if errors.Is(err, net.ErrClosed) {
|
if errors.Is(err, net.ErrClosed) {
|
||||||
return "closed"
|
return "closed"
|
||||||
}
|
}
|
||||||
if ne, ok := err.(net.Error); ok {
|
var ne net.Error
|
||||||
if ne.Timeout() {
|
if errors.As(err, &ne) && ne.Timeout() {
|
||||||
return "timeout"
|
return "timeout"
|
||||||
}
|
}
|
||||||
if ne.Temporary() {
|
|
||||||
return "temporary"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
msg := strings.ToLower(err.Error())
|
msg := strings.ToLower(err.Error())
|
||||||
switch {
|
switch {
|
||||||
case strings.Contains(msg, "refused"):
|
case strings.Contains(msg, "refused"):
|
||||||
@@ -437,14 +436,6 @@ func (pm *ProxyManager) Stop() error {
|
|||||||
pm.udpConns = append(pm.udpConns[:i], pm.udpConns[i+1:]...)
|
pm.udpConns = append(pm.udpConns[:i], pm.udpConns[i+1:]...)
|
||||||
}
|
}
|
||||||
|
|
||||||
// // Clear the target maps
|
|
||||||
// for k := range pm.tcpTargets {
|
|
||||||
// delete(pm.tcpTargets, k)
|
|
||||||
// }
|
|
||||||
// for k := range pm.udpTargets {
|
|
||||||
// delete(pm.udpTargets, k)
|
|
||||||
// }
|
|
||||||
|
|
||||||
// Give active connections a chance to close gracefully
|
// Give active connections a chance to close gracefully
|
||||||
time.Sleep(100 * time.Millisecond)
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
@@ -498,7 +489,7 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string)
|
|||||||
if !pm.running {
|
if !pm.running {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
if ne, ok := err.(net.Error); ok && !ne.Temporary() {
|
if errors.Is(err, net.ErrClosed) {
|
||||||
logger.Info("TCP listener closed, stopping proxy handler for %v", listener.Addr())
|
logger.Info("TCP listener closed, stopping proxy handler for %v", listener.Addr())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -564,7 +555,7 @@ func (pm *ProxyManager) handleTCPProxy(listener net.Listener, targetAddr string)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
|
func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
|
||||||
buffer := make([]byte, 65507) // Max UDP packet size
|
buffer := make([]byte, maxUDPPacketSize) // Max UDP packet size
|
||||||
clientConns := make(map[string]*net.UDPConn)
|
clientConns := make(map[string]*net.UDPConn)
|
||||||
var clientsMutex sync.RWMutex
|
var clientsMutex sync.RWMutex
|
||||||
|
|
||||||
@@ -583,7 +574,7 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Check for connection closed conditions
|
// Check for connection closed conditions
|
||||||
if err == io.EOF || strings.Contains(err.Error(), "use of closed network connection") {
|
if errors.Is(err, io.EOF) || errors.Is(err, net.ErrClosed) {
|
||||||
logger.Info("UDP connection closed, stopping proxy handler")
|
logger.Info("UDP connection closed, stopping proxy handler")
|
||||||
|
|
||||||
// Clean up existing client connections
|
// Clean up existing client connections
|
||||||
@@ -662,10 +653,14 @@ func (pm *ProxyManager) handleUDPProxy(conn *gonet.UDPConn, targetAddr string) {
|
|||||||
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "udp", telemetry.ProxyConnectionClosed)
|
telemetry.IncProxyConnectionEvent(context.Background(), tunnelID, "udp", telemetry.ProxyConnectionClosed)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
buffer := make([]byte, 65507)
|
buffer := make([]byte, maxUDPPacketSize)
|
||||||
for {
|
for {
|
||||||
n, _, err := targetConn.ReadFromUDP(buffer)
|
n, _, err := targetConn.ReadFromUDP(buffer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
// Connection closed is normal during cleanup
|
||||||
|
if errors.Is(err, net.ErrClosed) || errors.Is(err, io.EOF) {
|
||||||
|
return // defer will handle cleanup, result stays "success"
|
||||||
|
}
|
||||||
logger.Error("Error reading from target: %v", err)
|
logger.Error("Error reading from target: %v", err)
|
||||||
result = "failure"
|
result = "failure"
|
||||||
return // defer will handle cleanup
|
return // defer will handle cleanup
|
||||||
|
|||||||
Reference in New Issue
Block a user