From c2fdf62f1fd7a9e52eddb18004b619006f7190bd Mon Sep 17 00:00:00 2001 From: Viktor Liu Date: Mon, 18 May 2026 12:39:51 +0200 Subject: [PATCH] Detect dead VNC peers on both ends and report session stats --- client/internal/engine_vnc.go | 15 +++ client/internal/metrics/influxdb.go | 30 +++++ client/internal/metrics/metrics.go | 31 +++++ client/internal/metrics/push_test.go | 3 + client/vnc/server/metrics_conn.go | 187 +++++++++++++++++++++++++++ client/vnc/server/server.go | 80 +++++++++++- client/vnc/server/server_windows.go | 2 + client/wasm/cmd/main.go | 10 +- client/wasm/internal/vnc/proxy.go | 37 ++++-- 9 files changed, 372 insertions(+), 23 deletions(-) create mode 100644 client/vnc/server/metrics_conn.go diff --git a/client/internal/engine_vnc.go b/client/internal/engine_vnc.go index d162f27cb..fa62f8396 100644 --- a/client/internal/engine_vnc.go +++ b/client/internal/engine_vnc.go @@ -11,6 +11,7 @@ import ( log "github.com/sirupsen/logrus" firewallManager "github.com/netbirdio/netbird/client/firewall/manager" + "github.com/netbirdio/netbird/client/internal/metrics" nftypes "github.com/netbirdio/netbird/client/internal/netflow/types" sshauth "github.com/netbirdio/netbird/client/ssh/auth" vncserver "github.com/netbirdio/netbird/client/vnc/server" @@ -102,6 +103,20 @@ func (e *Engine) startVNCServer(sshConf *mgmProto.SSHConfig) error { netbirdIP := e.wgInterface.Address().IP srv := vncserver.New(capturer, injector) + if e.clientMetrics != nil { + srv.SetSessionRecorder(func(t vncserver.SessionTick) { + e.clientMetrics.RecordVNCSessionTick(e.ctx, metrics.VNCSessionTick{ + Period: t.Period, + BytesOut: t.BytesOut, + Writes: t.Writes, + FBUs: t.FBUs, + MaxFBUBytes: t.MaxFBUBytes, + MaxFBURects: t.MaxFBURects, + MaxWriteBytes: t.MaxWriteBytes, + WriteNanos: t.WriteNanos, + }) + }) + } if vncNeedsServiceMode() { log.Info("VNC: running in Session 0, enabling service mode (agent proxy)") srv.SetServiceMode(true) diff --git a/client/internal/metrics/influxdb.go b/client/internal/metrics/influxdb.go index 531f6a986..dd7c24907 100644 --- a/client/internal/metrics/influxdb.go +++ b/client/internal/metrics/influxdb.go @@ -120,6 +120,36 @@ func (m *influxDBMetrics) RecordSyncDuration(_ context.Context, agentInfo AgentI m.trimLocked() } +func (m *influxDBMetrics) RecordVNCSessionTick(_ context.Context, agentInfo AgentInfo, tick VNCSessionTick) { + tags := fmt.Sprintf("deployment_type=%s,version=%s,os=%s,arch=%s,peer_id=%s", + agentInfo.DeploymentType.String(), + agentInfo.Version, + agentInfo.OS, + agentInfo.Arch, + agentInfo.peerID, + ) + + m.mu.Lock() + defer m.mu.Unlock() + + m.samples = append(m.samples, influxSample{ + measurement: "netbird_vnc_traffic", + tags: tags, + fields: map[string]float64{ + "period_seconds": tick.Period.Seconds(), + "bytes_out": float64(tick.BytesOut), + "writes": float64(tick.Writes), + "fbus": float64(tick.FBUs), + "max_fbu_bytes": float64(tick.MaxFBUBytes), + "max_fbu_rects": float64(tick.MaxFBURects), + "max_write_bytes": float64(tick.MaxWriteBytes), + "write_time_seconds": float64(tick.WriteNanos) / 1e9, + }, + timestamp: time.Now(), + }) + m.trimLocked() +} + func (m *influxDBMetrics) RecordLoginDuration(_ context.Context, agentInfo AgentInfo, duration time.Duration, success bool) { result := "success" if !success { diff --git a/client/internal/metrics/metrics.go b/client/internal/metrics/metrics.go index 4ebb43496..7a0ed2912 100644 --- a/client/internal/metrics/metrics.go +++ b/client/internal/metrics/metrics.go @@ -59,6 +59,11 @@ type metricsImplementation interface { // RecordLoginDuration records how long the login to management took RecordLoginDuration(ctx context.Context, agentInfo AgentInfo, duration time.Duration, success bool) + // RecordVNCSessionTick records a periodic snapshot of one VNC + // session's wire activity. Called once per metricsConn tick interval + // (and once at session close), only when the tick saw activity. + RecordVNCSessionTick(ctx context.Context, agentInfo AgentInfo, tick VNCSessionTick) + // Export exports metrics in InfluxDB line protocol format Export(w io.Writer) error @@ -78,6 +83,21 @@ type ClientMetrics struct { pushCancel context.CancelFunc } +// VNCSessionTick is one sampling slice of a VNC session's wire activity. +// BytesOut / Writes / FBUs / WriteNanos are deltas observed during this +// tick; Max* fields are the high-water marks observed during the tick. +// Period is the wall-clock duration the deltas cover. +type VNCSessionTick struct { + Period time.Duration + BytesOut uint64 + Writes uint64 + FBUs uint64 + MaxFBUBytes uint64 + MaxFBURects uint64 + MaxWriteBytes uint64 + WriteNanos uint64 +} + // ConnectionStageTimestamps holds timestamps for each connection stage type ConnectionStageTimestamps struct { SignalingReceived time.Time // First signal received from remote peer (both initial and reconnection) @@ -127,6 +147,17 @@ func (c *ClientMetrics) RecordSyncDuration(ctx context.Context, duration time.Du c.impl.RecordSyncDuration(ctx, agentInfo, duration) } +// RecordVNCSessionTick records a periodic snapshot of one VNC session. +func (c *ClientMetrics) RecordVNCSessionTick(ctx context.Context, tick VNCSessionTick) { + if c == nil { + return + } + c.mu.RLock() + agentInfo := c.agentInfo + c.mu.RUnlock() + c.impl.RecordVNCSessionTick(ctx, agentInfo, tick) +} + // RecordLoginDuration records how long the login to management server took func (c *ClientMetrics) RecordLoginDuration(ctx context.Context, duration time.Duration, success bool) { if c == nil { diff --git a/client/internal/metrics/push_test.go b/client/internal/metrics/push_test.go index 20a509da1..e9f8cb976 100644 --- a/client/internal/metrics/push_test.go +++ b/client/internal/metrics/push_test.go @@ -73,6 +73,9 @@ func (m *mockMetrics) RecordSyncDuration(_ context.Context, _ AgentInfo, _ time. func (m *mockMetrics) RecordLoginDuration(_ context.Context, _ AgentInfo, _ time.Duration, _ bool) { } +func (m *mockMetrics) RecordVNCSessionTick(_ context.Context, _ AgentInfo, _ VNCSessionTick) { +} + func (m *mockMetrics) Export(w io.Writer) error { if m.exportData != "" { _, err := w.Write([]byte(m.exportData)) diff --git a/client/vnc/server/metrics_conn.go b/client/vnc/server/metrics_conn.go new file mode 100644 index 000000000..bc0f36d57 --- /dev/null +++ b/client/vnc/server/metrics_conn.go @@ -0,0 +1,187 @@ +//go:build !js && !ios && !android + +package server + +import ( + "net" + "sync" + "sync/atomic" + "time" +) + +// SessionTick is one sampling slice of a VNC session's wire activity. +// BytesOut / Writes / FBUs are deltas observed during this tick; +// Max* fields are the high-water marks observed during this tick (reset +// at the start of the next). Period is the wall-clock duration covered +// (typically sessionTickInterval, shorter for the final flush). +type SessionTick struct { + Period time.Duration + BytesOut uint64 + Writes uint64 + FBUs uint64 + MaxFBUBytes uint64 + MaxFBURects uint64 + MaxWriteBytes uint64 + WriteNanos uint64 +} + +// sessionTickInterval is how often metricsConn emits a SessionTick. One +// second matches noVNC's request cadence so each tick covers roughly one +// FBU round-trip during steady-state activity. +const sessionTickInterval = time.Second + +// metricsConn wraps a net.Conn and tracks per-session byte / write / FBU +// counters. Updates are atomic so the cost is a few atomic ops per Write +// (well under 100 ns), negligible against the syscall itself, so the wrap +// is always installed. A goroutine emits a SessionTick to the recorder +// every sessionTickInterval (only when the tick has activity to report); +// a final partial-tick flush runs on Close. +type metricsConn struct { + net.Conn + + recorder func(SessionTick) + + bytesOut uint64 + writes uint64 + writeNanos uint64 + largestPkt uint64 + fbus uint64 + fbuBytes uint64 + fbuRects uint64 + maxFBUBytes uint64 + maxFBURects uint64 + + tickMu sync.Mutex + tickStart time.Time + tickPrevB uint64 + tickPrevW uint64 + tickPrevF uint64 + tickPrevNS uint64 + + closeOnce sync.Once + done chan struct{} +} + +func newMetricsConn(c net.Conn, recorder func(SessionTick)) net.Conn { + m := &metricsConn{ + Conn: c, + recorder: recorder, + tickStart: time.Now(), + done: make(chan struct{}), + } + if recorder != nil { + go m.tickLoop() + } + return m +} + +// tickLoop emits a SessionTick every sessionTickInterval until done. +// Empty ticks (no writes since the last tick) are skipped. +func (m *metricsConn) tickLoop() { + t := time.NewTicker(sessionTickInterval) + defer t.Stop() + for { + select { + case <-m.done: + return + case <-t.C: + m.flushTick(false) + } + } +} + +// flushTick computes deltas since the last tick, resets the per-tick max +// trackers, and emits a SessionTick to the recorder. final=true forces +// emission even if no writes happened (used at session close to record +// the trailing partial period). +func (m *metricsConn) flushTick(final bool) { + m.tickMu.Lock() + defer m.tickMu.Unlock() + + b := atomic.LoadUint64(&m.bytesOut) + w := atomic.LoadUint64(&m.writes) + f := atomic.LoadUint64(&m.fbus) + ns := atomic.LoadUint64(&m.writeNanos) + + db := b - m.tickPrevB + dw := w - m.tickPrevW + df := f - m.tickPrevF + dns := ns - m.tickPrevNS + m.tickPrevB, m.tickPrevW, m.tickPrevF, m.tickPrevNS = b, w, f, ns + + maxFBU := atomic.SwapUint64(&m.maxFBUBytes, 0) + maxRects := atomic.SwapUint64(&m.maxFBURects, 0) + maxPkt := atomic.SwapUint64(&m.largestPkt, 0) + + period := time.Since(m.tickStart) + m.tickStart = time.Now() + + if dw == 0 && !final { + return + } + m.recorder(SessionTick{ + Period: period, + BytesOut: db, + Writes: dw, + FBUs: df, + MaxFBUBytes: maxFBU, + MaxFBURects: maxRects, + MaxWriteBytes: maxPkt, + WriteNanos: dns, + }) +} + +// isFBUHeader reports whether the given Write payload is the 4-byte +// FramebufferUpdate header (message type 0, padding 0, rect-count high +// byte). Rect bodies are written separately by sendDirtyAndMoves, so the +// FBU/rect boundary lines up with Write boundaries. +func isFBUHeader(p []byte) bool { + return len(p) == 4 && p[0] == serverFramebufferUpdate +} + +func (m *metricsConn) Write(p []byte) (int, error) { + if isFBUHeader(p) { + if b := atomic.SwapUint64(&m.fbuBytes, 0); b > 0 { + if b > atomic.LoadUint64(&m.maxFBUBytes) { + atomic.StoreUint64(&m.maxFBUBytes, b) + } + } + if r := atomic.SwapUint64(&m.fbuRects, 0); r > 0 { + if r > atomic.LoadUint64(&m.maxFBURects) { + atomic.StoreUint64(&m.maxFBURects, r) + } + } + atomic.AddUint64(&m.fbus, 1) + } + + t0 := time.Now() + n, err := m.Conn.Write(p) + atomic.AddUint64(&m.writeNanos, uint64(time.Since(t0).Nanoseconds())) + atomic.AddUint64(&m.bytesOut, uint64(n)) + atomic.AddUint64(&m.writes, 1) + if !isFBUHeader(p) { + atomic.AddUint64(&m.fbuBytes, uint64(n)) + atomic.AddUint64(&m.fbuRects, 1) + } + if uint64(n) > atomic.LoadUint64(&m.largestPkt) { + atomic.StoreUint64(&m.largestPkt, uint64(n)) + } + return n, err +} + +func (m *metricsConn) Close() error { + m.closeOnce.Do(func() { + close(m.done) + if m.recorder == nil { + return + } + if b := atomic.SwapUint64(&m.fbuBytes, 0); b > atomic.LoadUint64(&m.maxFBUBytes) { + atomic.StoreUint64(&m.maxFBUBytes, b) + } + if r := atomic.SwapUint64(&m.fbuRects, 0); r > atomic.LoadUint64(&m.maxFBURects) { + atomic.StoreUint64(&m.maxFBURects, r) + } + m.flushTick(true) + }) + return m.Conn.Close() +} diff --git a/client/vnc/server/server.go b/client/vnc/server/server.go index cdcea9570..0cbd95182 100644 --- a/client/vnc/server/server.go +++ b/client/vnc/server/server.go @@ -156,9 +156,15 @@ type Server struct { netstackNet *netstack.Net agentToken []byte // raw token bytes for agent-mode auth - sessionsMu sync.Mutex - sessionSeq uint64 - sessions map[uint64]ActiveSessionInfo + sessionsMu sync.Mutex + sessionSeq uint64 + sessions map[uint64]ActiveSessionInfo + sessionConns map[uint64]net.Conn + + // sessionRecorder, when non-nil, receives a SessionTick periodically + // during each VNC session and on session close. The engine wires + // this to its metrics framework. + sessionRecorder func(SessionTick) } // ActiveSessionInfo describes a currently connected VNC client. @@ -195,7 +201,8 @@ func New(capturer ScreenCapturer, injector InputInjector) *Server { injector: injector, authorizer: sshauth.NewAuthorizer(), log: log.WithField("component", "vnc-server"), - sessions: make(map[uint64]ActiveSessionInfo), + sessions: make(map[uint64]ActiveSessionInfo), + sessionConns: make(map[uint64]net.Conn), } } @@ -210,12 +217,13 @@ func (s *Server) ActiveSessions() []ActiveSessionInfo { return out } -func (s *Server) addSession(info ActiveSessionInfo) uint64 { +func (s *Server) addSession(info ActiveSessionInfo, conn net.Conn) uint64 { s.sessionsMu.Lock() defer s.sessionsMu.Unlock() s.sessionSeq++ id := s.sessionSeq s.sessions[id] = info + s.sessionConns[id] = conn return id } @@ -223,6 +231,24 @@ func (s *Server) removeSession(id uint64) { s.sessionsMu.Lock() defer s.sessionsMu.Unlock() delete(s.sessions, id) + delete(s.sessionConns, id) +} + +// closeActiveSessions closes every active session's connection so the +// per-session serve goroutines unblock from their Read loops and exit. +// Called from Stop to make sure clients see an immediate disconnect when +// the server is brought down, instead of waiting for the OS to reclaim +// the sockets after process exit. +func (s *Server) closeActiveSessions() { + s.sessionsMu.Lock() + conns := make([]net.Conn, 0, len(s.sessionConns)) + for _, c := range s.sessionConns { + conns = append(conns, c) + } + s.sessionsMu.Unlock() + for _, c := range conns { + _ = c.Close() + } } // SetServiceMode enables proxy-to-agent mode for Windows service operation. @@ -230,6 +256,14 @@ func (s *Server) SetServiceMode(enabled bool) { s.serviceMode = enabled } +// SetSessionRecorder installs a callback that receives a SessionTick +// each sessionTickInterval during a VNC session and one final tick on +// session close. Pass nil to disable. Empty ticks (no wire activity) +// are skipped. +func (s *Server) SetSessionRecorder(recorder func(SessionTick)) { + s.sessionRecorder = recorder +} + // SetJWTConfig configures JWT authentication for VNC connections. // Pass nil to disable JWT (public mode). func (s *Server) SetJWTConfig(config *JWTConfig) { @@ -340,6 +374,13 @@ func (s *Server) Stop() error { s.cancel = nil } + // Close active client connections before tearing down capturers and + // listeners. The per-session serve goroutines unblock from their Read + // loop with an error and run their deferred conn.Close, which surfaces + // a clean disconnect on the client side instead of leaving the + // connection hanging until the OS reclaims it on process exit. + s.closeActiveSessions() + if s.vmgr != nil { s.vmgr.StopAll() } @@ -378,10 +419,36 @@ func (s *Server) acceptLoop() { continue } + enableTCPKeepAlive(conn, s.log) go s.handleConnection(conn) } } +// vncKeepAlivePeriod controls how often TCP layer probes are sent on an +// idle connection. Default OS settings (2 hours) are too long for an +// interactive session: when the server-side host dies without sending FIN +// (power loss, network partition, hung kernel), the client only learns of +// the dead connection when the OS gives up on a probe. 30 s here means +// most clients notice within ~3 minutes worst case. +const vncKeepAlivePeriod = 30 * time.Second + +// enableTCPKeepAlive turns on SO_KEEPALIVE on the underlying TCP socket. +// Non-TCP conns (e.g. the netstack-backed listener) are skipped silently; +// keepalive there is the netstack's concern. +func enableTCPKeepAlive(c net.Conn, log *log.Entry) { + tc, ok := c.(*net.TCPConn) + if !ok { + return + } + if err := tc.SetKeepAlive(true); err != nil { + log.Debugf("set keepalive: %v", err) + return + } + if err := tc.SetKeepAlivePeriod(vncKeepAlivePeriod); err != nil { + log.Debugf("set keepalive period: %v", err) + } +} + func (s *Server) validateCapturer(capturer ScreenCapturer) error { // Quick check first: if already ready, return immediately. if capturer.Width() > 0 && capturer.Height() > 0 { @@ -472,7 +539,7 @@ func (s *Server) handleConnection(conn net.Conn) { Mode: modeString(header.mode), Username: header.username, JWTUsername: jwtUserID, - }) + }, conn) defer s.removeSession(sessionID) if err := s.validateCapturer(capturer); err != nil { @@ -481,6 +548,7 @@ func (s *Server) handleConnection(conn net.Conn) { return } + conn = newMetricsConn(conn, s.sessionRecorder) sess := &session{ conn: conn, capturer: capturer, diff --git a/client/vnc/server/server_windows.go b/client/vnc/server/server_windows.go index 6caec5cdd..efff86a9a 100644 --- a/client/vnc/server/server_windows.go +++ b/client/vnc/server/server_windows.go @@ -255,6 +255,8 @@ func (s *Server) serviceAcceptLoop() { continue } + enableTCPKeepAlive(conn, s.log) + conn = newMetricsConn(conn, s.sessionRecorder) go s.handleServiceConnection(conn, sm) } } diff --git a/client/wasm/cmd/main.go b/client/wasm/cmd/main.go index b7a92f3a1..aed7d2b2a 100644 --- a/client/wasm/cmd/main.go +++ b/client/wasm/cmd/main.go @@ -691,10 +691,10 @@ func createStartCaptureMethod(client *netbird.Client) js.Func { // // Usage from browser devtools console: // -// await client.capture() // capture all packets -// await client.capture("tcp") // capture with filter -// await client.capture({filter: "host 10.0.0.1", verbose: true}) -// client.stopCapture() // stop and print stats +// await netbird.capture() // capture all packets +// await netbird.capture("tcp") // capture with filter +// await netbird.capture({filter: "host 10.0.0.1", verbose: true}) +// netbird.stopCapture() // stop and print stats func captureMethods(client *netbird.Client) (startFn, stopFn js.Func) { var mu sync.Mutex var active *wasmcapture.Handle @@ -722,7 +722,7 @@ func captureMethods(client *netbird.Client) (startFn, stopFn js.Func) { active = h console := js.Global().Get("console") - console.Call("log", "[capture] started, call client.stopCapture() to stop") + console.Call("log", "[capture] started, call netbird.stopCapture() to stop") resolve.Invoke(js.Undefined()) }) }) diff --git a/client/wasm/internal/vnc/proxy.go b/client/wasm/internal/vnc/proxy.go index 5d9b58a2a..84718383c 100644 --- a/client/wasm/internal/vnc/proxy.go +++ b/client/wasm/internal/vnc/proxy.go @@ -4,6 +4,7 @@ package vnc import ( "context" + "errors" "fmt" "io" "net" @@ -23,6 +24,15 @@ const ( // Connection modes matching server/server.go constants. modeAttach byte = 0 modeSession byte = 1 + + // WebSocket close codes the dashboard branches on. Codes 1000-1015 + // are reserved by RFC 6455; 4000-4999 are application-defined. + wsCodeNormal = 1000 + wsCodeAbnormal = 1006 + wsCodeDialTimeout = 4001 + wsCodeDialFailure = 4002 + wsCodeSessionSetup = 4003 + wsCodeTransport = 4004 ) // VNCProxy bridges WebSocket connections from noVNC in the browser @@ -245,8 +255,12 @@ func (p *VNCProxy) connectToVNC(conn *vncConnection) { if err != nil { log.Errorf("VNC connect to %s: %v", conn.destination.address, err) // Close the WebSocket so noVNC fires a disconnect event. + code := wsCodeDialFailure + if errors.Is(err, context.DeadlineExceeded) { + code = wsCodeDialTimeout + } if conn.wsHandlers.Get("close").Truthy() { - conn.wsHandlers.Call("close", 1006, fmt.Sprintf("connect to peer: %v", err)) + conn.wsHandlers.Call("close", code, fmt.Sprintf("connect to peer: %v", err)) } p.cleanupConnection(conn) return @@ -259,7 +273,7 @@ func (p *VNCProxy) connectToVNC(conn *vncConnection) { if err := p.sendSessionHeader(vncConn, conn.destination); err != nil { log.Errorf("send VNC session header: %v", err) if conn.wsHandlers.Get("close").Truthy() { - conn.wsHandlers.Call("close", 1006, fmt.Sprintf("send session header: %v", err)) + conn.wsHandlers.Call("close", wsCodeSessionSetup, fmt.Sprintf("send session header: %v", err)) } p.cleanupConnection(conn) return @@ -359,24 +373,23 @@ func (c *vncConnection) snapshotVNC() (net.Conn, bool) { } // handleConnReadError classifies an error from the VNC read loop. Returns -// true if the caller should exit; false to retry (transient timeout). +// true if the caller should exit and trigger the cleanup path. A read +// timeout counts as a fatal error: in a healthy session the server emits +// empty FramebufferUpdate responses several times per second, so a full +// idleReadDeadline of silence means the peer is dead (process gone, +// machine off, network partition) and the in-browser TCP stack will +// never surface that on its own. func (p *VNCProxy) handleConnReadError(conn *vncConnection, err error) bool { if conn.ctx.Err() != nil { return true } if netErr, ok := err.(interface{ Timeout() bool }); ok && netErr.Timeout() { - // Read timeout: connection might be stale. The next iteration will - // fail too and trigger the close path. - return false - } - if err != io.EOF { + log.Debugf("VNC read deadline expired; treating peer as dead") + } else if err != io.EOF { log.Debugf("read from VNC connection: %v", err) } - // Close the WebSocket to notify noVNC, and cancel the local context so - // cleanupConnection isn't left waiting on the JS close callback that - // may never fire on hard errors. if conn.wsHandlers.Get("close").Truthy() { - conn.wsHandlers.Call("close", 1006, "VNC connection lost") + conn.wsHandlers.Call("close", wsCodeTransport, "VNC connection lost") } conn.cancel() return true