From 34341d95a930a252588933eada70f92c296d756f Mon Sep 17 00:00:00 2001 From: Maycon Santos Date: Mon, 6 Oct 2025 15:22:02 -0300 Subject: [PATCH 1/3] Adjust signal port for websocket connections (#4594) --- infrastructure_files/docker-compose.yml.tmpl.traefik | 2 +- infrastructure_files/getting-started-with-zitadel.sh | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/infrastructure_files/docker-compose.yml.tmpl.traefik b/infrastructure_files/docker-compose.yml.tmpl.traefik index fb01e6867..196e26a66 100644 --- a/infrastructure_files/docker-compose.yml.tmpl.traefik +++ b/infrastructure_files/docker-compose.yml.tmpl.traefik @@ -47,7 +47,7 @@ services: - traefik.enable=true - traefik.http.routers.netbird-wsproxy-signal.rule=Host(`$NETBIRD_DOMAIN`) && PathPrefix(`/ws-proxy/signal`) - traefik.http.routers.netbird-wsproxy-signal.service=netbird-wsproxy-signal - - traefik.http.services.netbird-wsproxy-signal.loadbalancer.server.port=10000 + - traefik.http.services.netbird-wsproxy-signal.loadbalancer.server.port=80 - traefik.http.routers.netbird-signal.rule=Host(`$NETBIRD_DOMAIN`) && PathPrefix(`/signalexchange.SignalExchange/`) - traefik.http.services.netbird-signal.loadbalancer.server.port=10000 - traefik.http.services.netbird-signal.loadbalancer.server.scheme=h2c diff --git a/infrastructure_files/getting-started-with-zitadel.sh b/infrastructure_files/getting-started-with-zitadel.sh index be9662345..bc326cd7e 100644 --- a/infrastructure_files/getting-started-with-zitadel.sh +++ b/infrastructure_files/getting-started-with-zitadel.sh @@ -621,7 +621,7 @@ renderCaddyfile() { # relay reverse_proxy /relay* relay:80 # Signal - reverse_proxy /ws-proxy/signal* signal:10000 + reverse_proxy /ws-proxy/signal* signal:80 reverse_proxy /signalexchange.SignalExchange/* h2c://signal:10000 # Management reverse_proxy /api/* management:80 From 954f40991f3b6c399e6ce1cb20577aacca87d38e Mon Sep 17 00:00:00 2001 From: Viktor Liu <17948409+lixmal@users.noreply.github.com> Date: Mon, 6 Oct 2025 21:22:19 +0200 Subject: [PATCH 2/3] [client,management,signal] Handle grpc from ws proxy internally instead of via tcp (#4593) --- client/grpc/dialer.go | 7 +- management/internals/server/server.go | 3 +- signal/cmd/run.go | 9 +- util/wsproxy/server/proxy.go | 140 ++++++++++---------------- 4 files changed, 62 insertions(+), 97 deletions(-) diff --git a/client/grpc/dialer.go b/client/grpc/dialer.go index 54fbb002c..6aff53b92 100644 --- a/client/grpc/dialer.go +++ b/client/grpc/dialer.go @@ -29,7 +29,8 @@ func Backoff(ctx context.Context) backoff.BackOff { // The component parameter specifies the WebSocket proxy component path (e.g., "/management", "/signal"). func CreateConnection(ctx context.Context, addr string, tlsEnabled bool, component string) (*grpc.ClientConn, error) { transportOption := grpc.WithTransportCredentials(insecure.NewCredentials()) - if tlsEnabled { + // for js, the outer websocket layer takes care of tls + if tlsEnabled && runtime.GOOS != "js" { certPool, err := x509.SystemCertPool() if err != nil || certPool == nil { log.Debugf("System cert pool not available; falling back to embedded cert, error: %v", err) @@ -37,9 +38,7 @@ func CreateConnection(ctx context.Context, addr string, tlsEnabled bool, compone } transportOption = grpc.WithTransportCredentials(credentials.NewTLS(&tls.Config{ - // for js, outer websocket layer takes care of tls verification via WithCustomDialer - InsecureSkipVerify: runtime.GOOS == "js", - RootCAs: certPool, + RootCAs: certPool, })) } diff --git a/management/internals/server/server.go b/management/internals/server/server.go index 94c633fc6..ab1c2ebe7 100644 --- a/management/internals/server/server.go +++ b/management/internals/server/server.go @@ -6,7 +6,6 @@ import ( "fmt" "net" "net/http" - "net/netip" "strings" "sync" "time" @@ -252,7 +251,7 @@ func updateMgmtConfig(ctx context.Context, path string, config *nbconfig.Config) } func (s *BaseServer) handlerFunc(gRPCHandler *grpc.Server, httpHandler http.Handler, meter metric.Meter) http.Handler { - wsProxy := wsproxyserver.New(netip.AddrPortFrom(netip.AddrFrom4([4]byte{127, 0, 0, 1}), ManagementLegacyPort), wsproxyserver.WithOTelMeter(meter)) + wsProxy := wsproxyserver.New(gRPCHandler, wsproxyserver.WithOTelMeter(meter)) return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { switch { diff --git a/signal/cmd/run.go b/signal/cmd/run.go index 696c44723..96873dee7 100644 --- a/signal/cmd/run.go +++ b/signal/cmd/run.go @@ -10,7 +10,6 @@ import ( "net/http" // nolint:gosec _ "net/http/pprof" - "net/netip" "time" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -63,10 +62,10 @@ var ( Use: "run", Short: "start NetBird Signal Server daemon", SilenceUsage: true, - PreRun: func(cmd *cobra.Command, args []string) { + PreRunE: func(cmd *cobra.Command, args []string) error { err := util.InitLog(logLevel, logFile) if err != nil { - log.Fatalf("failed initializing log %v", err) + return fmt.Errorf("failed initializing log: %w", err) } flag.Parse() @@ -87,6 +86,8 @@ var ( signalPort = 80 } } + + return nil }, RunE: func(cmd *cobra.Command, args []string) error { flag.Parse() @@ -254,7 +255,7 @@ func startServerWithCertManager(certManager *autocert.Manager, grpcRootHandler h } func grpcHandlerFunc(grpcServer *grpc.Server, meter metric.Meter) http.Handler { - wsProxy := wsproxyserver.New(netip.AddrPortFrom(netip.AddrFrom4([4]byte{127, 0, 0, 1}), legacyGRPCPort), wsproxyserver.WithOTelMeter(meter)) + wsProxy := wsproxyserver.New(grpcServer, wsproxyserver.WithOTelMeter(meter)) return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { switch { diff --git a/util/wsproxy/server/proxy.go b/util/wsproxy/server/proxy.go index 977440a60..8866924df 100644 --- a/util/wsproxy/server/proxy.go +++ b/util/wsproxy/server/proxy.go @@ -2,42 +2,41 @@ package server import ( "context" - "errors" "io" "net" "net/http" - "net/netip" "sync" "time" "github.com/coder/websocket" log "github.com/sirupsen/logrus" + "golang.org/x/net/http2" "github.com/netbirdio/netbird/util/wsproxy" ) const ( - dialTimeout = 10 * time.Second - bufferSize = 32 * 1024 + bufferSize = 32 * 1024 + ioTimeout = 5 * time.Second ) // Config contains the configuration for the WebSocket proxy. type Config struct { - LocalGRPCAddr netip.AddrPort + Handler http.Handler Path string MetricsRecorder MetricsRecorder } -// Proxy handles WebSocket to TCP proxying for gRPC connections. +// Proxy handles WebSocket to gRPC handler proxying. type Proxy struct { config Config metrics MetricsRecorder } // New creates a new WebSocket proxy instance with optional configuration -func New(localGRPCAddr netip.AddrPort, opts ...Option) *Proxy { +func New(handler http.Handler, opts ...Option) *Proxy { config := Config{ - LocalGRPCAddr: localGRPCAddr, + Handler: handler, Path: wsproxy.ProxyPath, MetricsRecorder: NoOpMetricsRecorder{}, // Default to no-op } @@ -63,7 +62,7 @@ func (p *Proxy) handleWebSocket(w http.ResponseWriter, r *http.Request) { p.metrics.RecordConnection(ctx) defer p.metrics.RecordDisconnection(ctx) - log.Debugf("WebSocket proxy handling connection from %s, forwarding to %s", r.RemoteAddr, p.config.LocalGRPCAddr) + log.Debugf("WebSocket proxy handling connection from %s, forwarding to internal gRPC handler", r.RemoteAddr) acceptOptions := &websocket.AcceptOptions{ OriginPatterns: []string{"*"}, } @@ -75,71 +74,41 @@ func (p *Proxy) handleWebSocket(w http.ResponseWriter, r *http.Request) { return } defer func() { - if err := wsConn.Close(websocket.StatusNormalClosure, ""); err != nil { - log.Debugf("Failed to close WebSocket: %v", err) - } + _ = wsConn.Close(websocket.StatusNormalClosure, "") }() - log.Debugf("WebSocket proxy attempting to connect to local gRPC at %s", p.config.LocalGRPCAddr) - tcpConn, err := net.DialTimeout("tcp", p.config.LocalGRPCAddr.String(), dialTimeout) - if err != nil { - p.metrics.RecordError(ctx, "tcp_dial_failed") - log.Warnf("Failed to connect to local gRPC server at %s: %v", p.config.LocalGRPCAddr, err) - if err := wsConn.Close(websocket.StatusInternalError, "Backend unavailable"); err != nil { - log.Debugf("Failed to close WebSocket after connection failure: %v", err) - } - return - } + clientConn, serverConn := net.Pipe() defer func() { - if err := tcpConn.Close(); err != nil { - log.Debugf("Failed to close TCP connection: %v", err) - } + _ = clientConn.Close() + _ = serverConn.Close() }() - log.Debugf("WebSocket proxy established: client %s -> local gRPC %s", r.RemoteAddr, p.config.LocalGRPCAddr) + log.Debugf("WebSocket proxy established: %s -> gRPC handler", r.RemoteAddr) - p.proxyData(ctx, wsConn, tcpConn) + go func() { + (&http2.Server{}).ServeConn(serverConn, &http2.ServeConnOpts{ + Context: ctx, + Handler: p.config.Handler, + }) + }() + + p.proxyData(ctx, wsConn, clientConn, r.RemoteAddr) } -func (p *Proxy) proxyData(ctx context.Context, wsConn *websocket.Conn, tcpConn net.Conn) { +func (p *Proxy) proxyData(ctx context.Context, wsConn *websocket.Conn, pipeConn net.Conn, clientAddr string) { proxyCtx, cancel := context.WithCancel(ctx) defer cancel() var wg sync.WaitGroup wg.Add(2) - go p.wsToTCP(proxyCtx, cancel, &wg, wsConn, tcpConn) - go p.tcpToWS(proxyCtx, cancel, &wg, wsConn, tcpConn) + go p.wsToPipe(proxyCtx, cancel, &wg, wsConn, pipeConn, clientAddr) + go p.pipeToWS(proxyCtx, cancel, &wg, wsConn, pipeConn, clientAddr) - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - - select { - case <-done: - log.Tracef("Proxy data transfer completed, both goroutines terminated") - case <-proxyCtx.Done(): - log.Tracef("Proxy data transfer cancelled, forcing connection closure") - - if err := wsConn.Close(websocket.StatusGoingAway, "proxy cancelled"); err != nil { - log.Tracef("Error closing WebSocket during cancellation: %v", err) - } - if err := tcpConn.Close(); err != nil { - log.Tracef("Error closing TCP connection during cancellation: %v", err) - } - - select { - case <-done: - log.Tracef("Goroutines terminated after forced connection closure") - case <-time.After(2 * time.Second): - log.Tracef("Goroutines did not terminate within timeout after connection closure") - } - } + wg.Wait() } -func (p *Proxy) wsToTCP(ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup, wsConn *websocket.Conn, tcpConn net.Conn) { +func (p *Proxy) wsToPipe(ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup, wsConn *websocket.Conn, pipeConn net.Conn, clientAddr string) { defer wg.Done() defer cancel() @@ -148,80 +117,77 @@ func (p *Proxy) wsToTCP(ctx context.Context, cancel context.CancelFunc, wg *sync if err != nil { switch { case ctx.Err() != nil: - log.Debugf("wsToTCP goroutine terminating due to context cancellation") - case websocket.CloseStatus(err) == websocket.StatusNormalClosure: - log.Debugf("WebSocket closed normally") + log.Debugf("WebSocket from %s terminating due to context cancellation", clientAddr) + case websocket.CloseStatus(err) != -1: + log.Debugf("WebSocket from %s disconnected", clientAddr) default: p.metrics.RecordError(ctx, "websocket_read_error") - log.Errorf("WebSocket read error: %v", err) + log.Debugf("WebSocket read error from %s: %v", clientAddr, err) } return } if msgType != websocket.MessageBinary { - log.Warnf("Unexpected WebSocket message type: %v", msgType) + log.Warnf("Unexpected WebSocket message type from %s: %v", clientAddr, msgType) continue } if ctx.Err() != nil { - log.Tracef("wsToTCP goroutine terminating due to context cancellation before TCP write") + log.Tracef("wsToPipe goroutine terminating due to context cancellation before pipe write") return } - if err := tcpConn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil { - log.Debugf("Failed to set TCP write deadline: %v", err) + if err := pipeConn.SetWriteDeadline(time.Now().Add(ioTimeout)); err != nil { + log.Debugf("Failed to set pipe write deadline: %v", err) } - n, err := tcpConn.Write(data) + n, err := pipeConn.Write(data) if err != nil { - p.metrics.RecordError(ctx, "tcp_write_error") - log.Errorf("TCP write error: %v", err) + p.metrics.RecordError(ctx, "pipe_write_error") + log.Warnf("Pipe write error for %s: %v", clientAddr, err) return } - p.metrics.RecordBytesTransferred(ctx, "ws_to_tcp", int64(n)) + p.metrics.RecordBytesTransferred(ctx, "ws_to_grpc", int64(n)) } } -func (p *Proxy) tcpToWS(ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup, wsConn *websocket.Conn, tcpConn net.Conn) { +func (p *Proxy) pipeToWS(ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup, wsConn *websocket.Conn, pipeConn net.Conn, clientAddr string) { defer wg.Done() defer cancel() buf := make([]byte, bufferSize) for { - if err := tcpConn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil { - log.Debugf("Failed to set TCP read deadline: %v", err) + if err := pipeConn.SetReadDeadline(time.Now().Add(ioTimeout)); err != nil { + log.Debugf("Failed to set pipe read deadline: %v", err) } - n, err := tcpConn.Read(buf) + n, err := pipeConn.Read(buf) if err != nil { if ctx.Err() != nil { - log.Tracef("tcpToWS goroutine terminating due to context cancellation") + log.Tracef("pipeToWS goroutine terminating due to context cancellation") return } - var netErr net.Error - if errors.As(err, &netErr) && netErr.Timeout() { - continue - } - if err != io.EOF { - log.Errorf("TCP read error: %v", err) + log.Debugf("Pipe read error for %s: %v", clientAddr, err) } return } if ctx.Err() != nil { - log.Tracef("tcpToWS goroutine terminating due to context cancellation before WebSocket write") + log.Tracef("pipeToWS goroutine terminating due to context cancellation before WebSocket write") return } - if err := wsConn.Write(ctx, websocket.MessageBinary, buf[:n]); err != nil { - p.metrics.RecordError(ctx, "websocket_write_error") - log.Errorf("WebSocket write error: %v", err) - return - } + if n > 0 { + if err := wsConn.Write(ctx, websocket.MessageBinary, buf[:n]); err != nil { + p.metrics.RecordError(ctx, "websocket_write_error") + log.Warnf("WebSocket write error for %s: %v", clientAddr, err) + return + } - p.metrics.RecordBytesTransferred(ctx, "tcp_to_ws", int64(n)) + p.metrics.RecordBytesTransferred(ctx, "grpc_to_ws", int64(n)) + } } } From 88467883fc4a14607393beaac412f573eaaa1d43 Mon Sep 17 00:00:00 2001 From: Viktor Liu <17948409+lixmal@users.noreply.github.com> Date: Mon, 6 Oct 2025 22:05:48 +0200 Subject: [PATCH 3/3] [management,signal] Remove ws-proxy read deadline (#4598) --- util/wsproxy/server/proxy.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/util/wsproxy/server/proxy.go b/util/wsproxy/server/proxy.go index 8866924df..ffb622200 100644 --- a/util/wsproxy/server/proxy.go +++ b/util/wsproxy/server/proxy.go @@ -158,10 +158,6 @@ func (p *Proxy) pipeToWS(ctx context.Context, cancel context.CancelFunc, wg *syn buf := make([]byte, bufferSize) for { - if err := pipeConn.SetReadDeadline(time.Now().Add(ioTimeout)); err != nil { - log.Debugf("Failed to set pipe read deadline: %v", err) - } - n, err := pipeConn.Read(buf) if err != nil { if ctx.Err() != nil {