mirror of
https://github.com/fosrl/gerbil.git
synced 2026-03-27 04:56:42 +00:00
Merge branch 'dev' of github.com:fosrl/gerbil into dev
This commit is contained in:
@@ -9,6 +9,7 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"runtime"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -162,6 +163,9 @@ type UDPProxyServer struct {
|
|||||||
commPatterns sync.Map
|
commPatterns sync.Map
|
||||||
// Rate limiter for encrypted hole punch messages, keyed by "ip:port"
|
// Rate limiter for encrypted hole punch messages, keyed by "ip:port"
|
||||||
holePunchRateLimiter sync.Map
|
holePunchRateLimiter sync.Map
|
||||||
|
// Cache for resolved UDP addresses to avoid per-packet DNS lookups
|
||||||
|
// Key: "ip:port" string, Value: *net.UDPAddr
|
||||||
|
addrCache sync.Map
|
||||||
// ReachableAt is the URL where this server can be reached
|
// ReachableAt is the URL where this server can be reached
|
||||||
ReachableAt string
|
ReachableAt string
|
||||||
}
|
}
|
||||||
@@ -173,7 +177,7 @@ func NewUDPProxyServer(parentCtx context.Context, addr, serverURL string, privat
|
|||||||
addr: addr,
|
addr: addr,
|
||||||
serverURL: serverURL,
|
serverURL: serverURL,
|
||||||
privateKey: privateKey,
|
privateKey: privateKey,
|
||||||
packetChan: make(chan Packet, 1000),
|
packetChan: make(chan Packet, 50000), // Increased from 1000 to handle high throughput
|
||||||
ReachableAt: reachableAt,
|
ReachableAt: reachableAt,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
cancel: cancel,
|
cancel: cancel,
|
||||||
@@ -198,8 +202,13 @@ func (s *UDPProxyServer) Start() error {
|
|||||||
s.conn = conn
|
s.conn = conn
|
||||||
logger.Info("UDP server listening on %s", s.addr)
|
logger.Info("UDP server listening on %s", s.addr)
|
||||||
|
|
||||||
// Start a fixed number of worker goroutines.
|
// Start worker goroutines based on CPU cores for better parallelism
|
||||||
workerCount := 10 // TODO: Make this configurable or pick it better!
|
// At high throughput (160+ Mbps), we need many workers to avoid bottlenecks
|
||||||
|
workerCount := runtime.NumCPU() * 10
|
||||||
|
if workerCount < 20 {
|
||||||
|
workerCount = 20 // Minimum 20 workers
|
||||||
|
}
|
||||||
|
logger.Info("Starting %d packet workers (CPUs: %d)", workerCount, runtime.NumCPU())
|
||||||
for i := 0; i < workerCount; i++ {
|
for i := 0; i < workerCount; i++ {
|
||||||
go s.packetWorker()
|
go s.packetWorker()
|
||||||
}
|
}
|
||||||
@@ -449,6 +458,43 @@ func extractWireGuardIndices(packet []byte) (uint32, uint32, bool) {
|
|||||||
return 0, 0, false
|
return 0, 0, false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// cachedAddr holds a resolved UDP address with TTL
|
||||||
|
type cachedAddr struct {
|
||||||
|
addr *net.UDPAddr
|
||||||
|
expiresAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// addrCacheTTL is how long resolved addresses are cached before re-resolving
|
||||||
|
const addrCacheTTL = 5 * time.Minute
|
||||||
|
|
||||||
|
// getCachedAddr returns a cached UDP address or resolves and caches it.
|
||||||
|
// This avoids per-packet DNS lookups which are a major throughput bottleneck.
|
||||||
|
func (s *UDPProxyServer) getCachedAddr(ip string, port int) (*net.UDPAddr, error) {
|
||||||
|
key := fmt.Sprintf("%s:%d", ip, port)
|
||||||
|
|
||||||
|
// Check cache first
|
||||||
|
if cached, ok := s.addrCache.Load(key); ok {
|
||||||
|
entry := cached.(*cachedAddr)
|
||||||
|
if time.Now().Before(entry.expiresAt) {
|
||||||
|
return entry.addr, nil
|
||||||
|
}
|
||||||
|
// Cache expired, delete and re-resolve
|
||||||
|
s.addrCache.Delete(key)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolve and cache
|
||||||
|
addr, err := net.ResolveUDPAddr("udp", key)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
s.addrCache.Store(key, &cachedAddr{
|
||||||
|
addr: addr,
|
||||||
|
expiresAt: time.Now().Add(addrCacheTTL),
|
||||||
|
})
|
||||||
|
return addr, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Updated to handle multi-peer WireGuard communication
|
// Updated to handle multi-peer WireGuard communication
|
||||||
func (s *UDPProxyServer) handleWireGuardPacket(packet []byte, remoteAddr *net.UDPAddr) {
|
func (s *UDPProxyServer) handleWireGuardPacket(packet []byte, remoteAddr *net.UDPAddr) {
|
||||||
if len(packet) == 0 {
|
if len(packet) == 0 {
|
||||||
@@ -483,7 +529,7 @@ func (s *UDPProxyServer) handleWireGuardPacket(packet []byte, remoteAddr *net.UD
|
|||||||
logger.Debug("Forwarding handshake initiation from %s (sender index: %d) to peers %v", remoteAddr, senderIndex, proxyMapping.Destinations)
|
logger.Debug("Forwarding handshake initiation from %s (sender index: %d) to peers %v", remoteAddr, senderIndex, proxyMapping.Destinations)
|
||||||
|
|
||||||
for _, dest := range proxyMapping.Destinations {
|
for _, dest := range proxyMapping.Destinations {
|
||||||
destAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", dest.DestinationIP, dest.DestinationPort))
|
destAddr, err := s.getCachedAddr(dest.DestinationIP, dest.DestinationPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("Failed to resolve destination address: %v", err)
|
logger.Error("Failed to resolve destination address: %v", err)
|
||||||
continue
|
continue
|
||||||
@@ -519,7 +565,7 @@ func (s *UDPProxyServer) handleWireGuardPacket(packet []byte, remoteAddr *net.UD
|
|||||||
|
|
||||||
// Forward the response to the original sender
|
// Forward the response to the original sender
|
||||||
for _, dest := range proxyMapping.Destinations {
|
for _, dest := range proxyMapping.Destinations {
|
||||||
destAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", dest.DestinationIP, dest.DestinationPort))
|
destAddr, err := s.getCachedAddr(dest.DestinationIP, dest.DestinationPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("Failed to resolve destination address: %v", err)
|
logger.Error("Failed to resolve destination address: %v", err)
|
||||||
continue
|
continue
|
||||||
@@ -576,7 +622,7 @@ func (s *UDPProxyServer) handleWireGuardPacket(packet []byte, remoteAddr *net.UD
|
|||||||
// No known session, fall back to forwarding to all peers
|
// No known session, fall back to forwarding to all peers
|
||||||
logger.Debug("No session found for receiver index %d, forwarding to all destinations", receiverIndex)
|
logger.Debug("No session found for receiver index %d, forwarding to all destinations", receiverIndex)
|
||||||
for _, dest := range proxyMapping.Destinations {
|
for _, dest := range proxyMapping.Destinations {
|
||||||
destAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", dest.DestinationIP, dest.DestinationPort))
|
destAddr, err := s.getCachedAddr(dest.DestinationIP, dest.DestinationPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("Failed to resolve destination address: %v", err)
|
logger.Error("Failed to resolve destination address: %v", err)
|
||||||
continue
|
continue
|
||||||
@@ -604,7 +650,7 @@ func (s *UDPProxyServer) handleWireGuardPacket(packet []byte, remoteAddr *net.UD
|
|||||||
|
|
||||||
// Forward to all peers
|
// Forward to all peers
|
||||||
for _, dest := range proxyMapping.Destinations {
|
for _, dest := range proxyMapping.Destinations {
|
||||||
destAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf("%s:%d", dest.DestinationIP, dest.DestinationPort))
|
destAddr, err := s.getCachedAddr(dest.DestinationIP, dest.DestinationPort)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("Failed to resolve destination address: %v", err)
|
logger.Error("Failed to resolve destination address: %v", err)
|
||||||
continue
|
continue
|
||||||
|
|||||||
Reference in New Issue
Block a user