Detect dead VNC peers on both ends and report session stats

This commit is contained in:
Viktor Liu
2026-05-18 12:39:51 +02:00
parent b9f5264e36
commit c2fdf62f1f
9 changed files with 372 additions and 23 deletions

View File

@@ -11,6 +11,7 @@ import (
log "github.com/sirupsen/logrus"
firewallManager "github.com/netbirdio/netbird/client/firewall/manager"
"github.com/netbirdio/netbird/client/internal/metrics"
nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
sshauth "github.com/netbirdio/netbird/client/ssh/auth"
vncserver "github.com/netbirdio/netbird/client/vnc/server"
@@ -102,6 +103,20 @@ func (e *Engine) startVNCServer(sshConf *mgmProto.SSHConfig) error {
netbirdIP := e.wgInterface.Address().IP
srv := vncserver.New(capturer, injector)
if e.clientMetrics != nil {
srv.SetSessionRecorder(func(t vncserver.SessionTick) {
e.clientMetrics.RecordVNCSessionTick(e.ctx, metrics.VNCSessionTick{
Period: t.Period,
BytesOut: t.BytesOut,
Writes: t.Writes,
FBUs: t.FBUs,
MaxFBUBytes: t.MaxFBUBytes,
MaxFBURects: t.MaxFBURects,
MaxWriteBytes: t.MaxWriteBytes,
WriteNanos: t.WriteNanos,
})
})
}
if vncNeedsServiceMode() {
log.Info("VNC: running in Session 0, enabling service mode (agent proxy)")
srv.SetServiceMode(true)

View File

@@ -120,6 +120,36 @@ func (m *influxDBMetrics) RecordSyncDuration(_ context.Context, agentInfo AgentI
m.trimLocked()
}
// RecordVNCSessionTick buffers one "netbird_vnc_traffic" sample built from
// tick. The tag set identifies the reporting agent (deployment type, version,
// OS, architecture, peer ID); the fields carry the tick's counters as floats,
// with Period and WriteNanos converted to seconds. The context argument is
// unused.
func (m *influxDBMetrics) RecordVNCSessionTick(_ context.Context, agentInfo AgentInfo, tick VNCSessionTick) {
	// Assemble everything that does not touch shared state before locking.
	sample := influxSample{
		measurement: "netbird_vnc_traffic",
		tags: fmt.Sprintf("deployment_type=%s,version=%s,os=%s,arch=%s,peer_id=%s",
			agentInfo.DeploymentType.String(),
			agentInfo.Version,
			agentInfo.OS,
			agentInfo.Arch,
			agentInfo.peerID,
		),
		fields: map[string]float64{
			"period_seconds":     tick.Period.Seconds(),
			"bytes_out":          float64(tick.BytesOut),
			"writes":             float64(tick.Writes),
			"fbus":               float64(tick.FBUs),
			"max_fbu_bytes":      float64(tick.MaxFBUBytes),
			"max_fbu_rects":      float64(tick.MaxFBURects),
			"max_write_bytes":    float64(tick.MaxWriteBytes),
			"write_time_seconds": float64(tick.WriteNanos) / 1e9,
		},
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	// The timestamp is taken while holding the lock, right before the append.
	sample.timestamp = time.Now()
	m.samples = append(m.samples, sample)
	m.trimLocked()
}
func (m *influxDBMetrics) RecordLoginDuration(_ context.Context, agentInfo AgentInfo, duration time.Duration, success bool) {
result := "success"
if !success {

View File

@@ -59,6 +59,11 @@ type metricsImplementation interface {
// RecordLoginDuration records how long the login to management took
RecordLoginDuration(ctx context.Context, agentInfo AgentInfo, duration time.Duration, success bool)
// RecordVNCSessionTick records a periodic snapshot of one VNC
// session's wire activity. Called once per metricsConn tick interval
// (and once at session close), only when the tick saw activity.
RecordVNCSessionTick(ctx context.Context, agentInfo AgentInfo, tick VNCSessionTick)
// Export exports metrics in InfluxDB line protocol format
Export(w io.Writer) error
@@ -78,6 +83,21 @@ type ClientMetrics struct {
pushCancel context.CancelFunc
}
// VNCSessionTick is one sampling slice of a VNC session's wire activity.
// BytesOut / Writes / FBUs / WriteNanos are deltas observed during this
// tick; Max* fields are the high-water marks observed during the tick.
// Period is the wall-clock duration the deltas cover.
//
// The fields mirror the VNC server's SessionTick one-to-one; the engine
// copies them across when forwarding a tick to the metrics layer.
type VNCSessionTick struct {
// Period is the wall-clock span covered by this tick.
Period time.Duration
// BytesOut is the number of bytes written to the client during the tick.
BytesOut uint64
// Writes is the number of write calls made during the tick.
Writes uint64
// FBUs is the number of FramebufferUpdate messages started during the tick.
FBUs uint64
// MaxFBUBytes is the largest per-update byte total seen during the tick.
MaxFBUBytes uint64
// MaxFBURects is the largest per-update rect count seen during the tick.
MaxFBURects uint64
// MaxWriteBytes is the largest single write seen during the tick.
MaxWriteBytes uint64
// WriteNanos is the time spent in write calls during the tick, in
// nanoseconds (the InfluxDB backend exports it as write_time_seconds).
WriteNanos uint64
}
// ConnectionStageTimestamps holds timestamps for each connection stage
type ConnectionStageTimestamps struct {
SignalingReceived time.Time // First signal received from remote peer (both initial and reconnection)
@@ -127,6 +147,17 @@ func (c *ClientMetrics) RecordSyncDuration(ctx context.Context, duration time.Du
c.impl.RecordSyncDuration(ctx, agentInfo, duration)
}
// RecordVNCSessionTick hands one VNC session tick to the metrics
// implementation, tagged with the agent info currently held by the client.
// Calling it on a nil *ClientMetrics is a no-op.
func (c *ClientMetrics) RecordVNCSessionTick(ctx context.Context, tick VNCSessionTick) {
	if c == nil {
		return
	}

	// Copy the agent info under the read lock and release the lock before
	// calling into the implementation.
	c.mu.RLock()
	info := c.agentInfo
	c.mu.RUnlock()

	c.impl.RecordVNCSessionTick(ctx, info, tick)
}
// RecordLoginDuration records how long the login to management server took
func (c *ClientMetrics) RecordLoginDuration(ctx context.Context, duration time.Duration, success bool) {
if c == nil {

View File

@@ -73,6 +73,9 @@ func (m *mockMetrics) RecordSyncDuration(_ context.Context, _ AgentInfo, _ time.
func (m *mockMetrics) RecordLoginDuration(_ context.Context, _ AgentInfo, _ time.Duration, _ bool) {
}
// RecordVNCSessionTick is a no-op: the mock discards VNC session ticks and
// exists only so mockMetrics satisfies the metrics implementation interface.
func (m *mockMetrics) RecordVNCSessionTick(_ context.Context, _ AgentInfo, _ VNCSessionTick) {
}
func (m *mockMetrics) Export(w io.Writer) error {
if m.exportData != "" {
_, err := w.Write([]byte(m.exportData))

View File

@@ -0,0 +1,187 @@
//go:build !js && !ios && !android
package server
import (
"net"
"sync"
"sync/atomic"
"time"
)
// SessionTick is one sampling slice of a VNC session's wire activity.
// BytesOut / Writes / FBUs / WriteNanos are deltas observed during this
// tick; Max* fields are the high-water marks observed during this tick
// (reset at the start of the next). Period is the wall-clock duration
// covered (typically sessionTickInterval, shorter for the final flush).
type SessionTick struct {
// Period is the time elapsed since the previous flush (or since the
// connection was wrapped, for the first tick).
Period time.Duration
// BytesOut is the sum of byte counts returned by the underlying
// connection's Write during the tick.
BytesOut uint64
// Writes is the number of Write calls completed during the tick.
Writes uint64
// FBUs is the number of writes recognised as FramebufferUpdate headers.
FBUs uint64
// MaxFBUBytes is the largest byte total accumulated over the non-header
// writes between two consecutive FramebufferUpdate headers (or up to Close).
MaxFBUBytes uint64
// MaxFBURects is the largest number of non-header writes between two
// consecutive FramebufferUpdate headers (or up to Close).
MaxFBURects uint64
// MaxWriteBytes is the largest byte count returned by a single Write.
MaxWriteBytes uint64
// WriteNanos is the time spent inside the underlying connection's Write,
// in nanoseconds.
WriteNanos uint64
}
// sessionTickInterval is how often metricsConn emits a SessionTick. One
// second matches noVNC's request cadence so each tick covers roughly one
// FBU round-trip during steady-state activity. The final tick flushed on
// Close may cover less than a full interval.
const sessionTickInterval = time.Second
// metricsConn wraps a net.Conn and tracks per-session byte / write / FBU
// counters. Updates are atomic so the cost is a few atomic ops per Write
// (well under 100 ns), negligible against the syscall itself, so the wrap
// is always installed. A goroutine emits a SessionTick to the recorder
// every sessionTickInterval (only when the tick has activity to report);
// a final partial-tick flush runs on Close.
type metricsConn struct {
net.Conn
// recorder receives every emitted SessionTick. When nil, no tick goroutine
// is started and Close performs no final flush.
recorder func(SessionTick)
// Session-lifetime running totals, updated with sync/atomic in Write.
// flushTick reports the difference against the tickPrev* baselines.
bytesOut uint64
writes uint64
writeNanos uint64
// largestPkt is the largest single-Write byte count since the last flush;
// flushTick swaps it back to zero.
largestPkt uint64
// fbus counts writes recognised as FramebufferUpdate headers.
fbus uint64
// fbuBytes and fbuRects accumulate the bytes and the number of non-header
// writes since the last header; they are drained when the next header is
// written and once more on Close.
fbuBytes uint64
fbuRects uint64
// maxFBUBytes and maxFBURects are the per-tick high-water marks of the two
// accumulators above; flushTick swaps them back to zero.
maxFBUBytes uint64
maxFBURects uint64
// tickMu serialises flushTick and guards tickStart and the tickPrev*
// baselines (values of bytesOut, writes, fbus and writeNanos at the
// previous flush).
tickMu sync.Mutex
tickStart time.Time
tickPrevB uint64
tickPrevW uint64
tickPrevF uint64
tickPrevNS uint64
// closeOnce makes the shutdown work in Close run at most once; done is
// closed there to stop tickLoop.
closeOnce sync.Once
done chan struct{}
}
// newMetricsConn wraps c in a metricsConn that counts outgoing traffic.
// When recorder is non-nil a background goroutine is started that reports a
// SessionTick every sessionTickInterval; it stops when the returned
// connection is closed. With a nil recorder the wrapper still counts but
// never reports.
func newMetricsConn(c net.Conn, recorder func(SessionTick)) net.Conn {
	wrapped := new(metricsConn)
	wrapped.Conn = c
	wrapped.recorder = recorder
	wrapped.done = make(chan struct{})
	wrapped.tickStart = time.Now()

	if recorder == nil {
		return wrapped
	}
	go wrapped.tickLoop()
	return wrapped
}
// tickLoop runs until done is closed, asking flushTick for a non-final
// flush once per sessionTickInterval. flushTick itself decides whether the
// interval saw enough activity to be reported.
func (m *metricsConn) tickLoop() {
	ticker := time.NewTicker(sessionTickInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			m.flushTick(false)
		case <-m.done:
			return
		}
	}
}
// flushTick computes deltas since the last emitted tick, resets the
// per-tick max trackers, and emits a SessionTick to the recorder.
// final=true forces emission even if no writes completed (used at session
// close to record the trailing partial period).
//
// A non-final flush that saw no completed write is skipped WITHOUT
// consuming any state: the baselines and max trackers are left untouched.
// Write updates fbus and the max trackers before the (possibly blocking)
// underlying write and bumps writes last, so a flush racing an in-flight
// or stalled Write can observe those changes while the write delta is
// still zero. Leaving the state in place lets the next emitted tick (or
// the final flush) report them instead of dropping them.
//
// The tick start time is advanced on every call, skipped or not, so
// Period of an emitted tick spans only the time since the previous flush
// attempt.
func (m *metricsConn) flushTick(final bool) {
	m.tickMu.Lock()
	defer m.tickMu.Unlock()

	// Read the clock once so consecutive periods tile without gaps.
	now := time.Now()
	period := now.Sub(m.tickStart)
	m.tickStart = now

	// Load writes first: Write increments it after bytesOut and
	// writeNanos, so every write counted here already has its bytes and
	// nanoseconds visible in the loads below.
	w := atomic.LoadUint64(&m.writes)
	b := atomic.LoadUint64(&m.bytesOut)
	f := atomic.LoadUint64(&m.fbus)
	ns := atomic.LoadUint64(&m.writeNanos)

	dw := w - m.tickPrevW
	if dw == 0 && !final {
		return
	}

	tick := SessionTick{
		Period:        period,
		BytesOut:      b - m.tickPrevB,
		Writes:        dw,
		FBUs:          f - m.tickPrevF,
		MaxFBUBytes:   atomic.SwapUint64(&m.maxFBUBytes, 0),
		MaxFBURects:   atomic.SwapUint64(&m.maxFBURects, 0),
		MaxWriteBytes: atomic.SwapUint64(&m.largestPkt, 0),
		WriteNanos:    ns - m.tickPrevNS,
	}
	// Commit the new baselines only now that the tick is being emitted.
	m.tickPrevB, m.tickPrevW, m.tickPrevF, m.tickPrevNS = b, w, f, ns

	m.recorder(tick)
}
// isFBUHeader reports whether the given Write payload looks like the
// 4-byte FramebufferUpdate header. Only the length and the first byte (the
// message type) are inspected; the padding and rect-count bytes are not.
// The check relies on the header being written in its own Write call: rect
// bodies are written separately by sendDirtyAndMoves, so the FBU/rect
// boundary lines up with Write boundaries.
func isFBUHeader(p []byte) bool {
	if len(p) != 4 {
		return false
	}
	return p[0] == serverFramebufferUpdate
}
// Write forwards p to the wrapped connection and updates the session
// counters around the call.
//
// A FramebufferUpdate header closes the previous update: the bytes and
// write count accumulated since the last header are drained, folded into
// the per-tick maxima, and the FBU counter is bumped. This happens before
// the underlying write. After the write, the elapsed time, the byte count
// and the write count are added to the running totals; non-header writes
// also feed the accumulators for the update in progress. The return values
// of the underlying Write are passed through unchanged.
func (m *metricsConn) Write(p []byte) (int, error) {
	header := isFBUHeader(p)
	if header {
		prevBytes := atomic.SwapUint64(&m.fbuBytes, 0)
		if prevBytes > atomic.LoadUint64(&m.maxFBUBytes) {
			atomic.StoreUint64(&m.maxFBUBytes, prevBytes)
		}
		prevRects := atomic.SwapUint64(&m.fbuRects, 0)
		if prevRects > atomic.LoadUint64(&m.maxFBURects) {
			atomic.StoreUint64(&m.maxFBURects, prevRects)
		}
		atomic.AddUint64(&m.fbus, 1)
	}

	start := time.Now()
	n, err := m.Conn.Write(p)
	elapsed := time.Since(start)
	written := uint64(n)

	// writes is incremented last among the totals; flushTick uses it to
	// decide whether a tick saw any activity.
	atomic.AddUint64(&m.writeNanos, uint64(elapsed.Nanoseconds()))
	atomic.AddUint64(&m.bytesOut, written)
	atomic.AddUint64(&m.writes, 1)

	if !header {
		atomic.AddUint64(&m.fbuBytes, written)
		atomic.AddUint64(&m.fbuRects, 1)
	}
	if written > atomic.LoadUint64(&m.largestPkt) {
		atomic.StoreUint64(&m.largestPkt, written)
	}
	return n, err
}
// Close stops the tick goroutine, reports the trailing partial tick and
// closes the wrapped connection. The shutdown work runs at most once; the
// wrapped connection's Close is called on every invocation and its error
// is returned.
func (m *metricsConn) Close() error {
	m.closeOnce.Do(func() {
		close(m.done)
		if m.recorder != nil {
			// Fold the update that was still in progress into the maxima
			// so the final tick accounts for it.
			pendingBytes := atomic.SwapUint64(&m.fbuBytes, 0)
			if atomic.LoadUint64(&m.maxFBUBytes) < pendingBytes {
				atomic.StoreUint64(&m.maxFBUBytes, pendingBytes)
			}
			pendingRects := atomic.SwapUint64(&m.fbuRects, 0)
			if atomic.LoadUint64(&m.maxFBURects) < pendingRects {
				atomic.StoreUint64(&m.maxFBURects, pendingRects)
			}
			m.flushTick(true)
		}
	})
	return m.Conn.Close()
}

View File

@@ -156,9 +156,15 @@ type Server struct {
netstackNet *netstack.Net
agentToken []byte // raw token bytes for agent-mode auth
sessionsMu sync.Mutex
sessionSeq uint64
sessions map[uint64]ActiveSessionInfo
sessionsMu sync.Mutex
sessionSeq uint64
sessions map[uint64]ActiveSessionInfo
sessionConns map[uint64]net.Conn
// sessionRecorder, when non-nil, receives a SessionTick periodically
// during each VNC session and on session close. The engine wires
// this to its metrics framework.
sessionRecorder func(SessionTick)
}
// ActiveSessionInfo describes a currently connected VNC client.
@@ -195,7 +201,8 @@ func New(capturer ScreenCapturer, injector InputInjector) *Server {
injector: injector,
authorizer: sshauth.NewAuthorizer(),
log: log.WithField("component", "vnc-server"),
sessions: make(map[uint64]ActiveSessionInfo),
sessions: make(map[uint64]ActiveSessionInfo),
sessionConns: make(map[uint64]net.Conn),
}
}
@@ -210,12 +217,13 @@ func (s *Server) ActiveSessions() []ActiveSessionInfo {
return out
}
func (s *Server) addSession(info ActiveSessionInfo) uint64 {
func (s *Server) addSession(info ActiveSessionInfo, conn net.Conn) uint64 {
s.sessionsMu.Lock()
defer s.sessionsMu.Unlock()
s.sessionSeq++
id := s.sessionSeq
s.sessions[id] = info
s.sessionConns[id] = conn
return id
}
@@ -223,6 +231,24 @@ func (s *Server) removeSession(id uint64) {
s.sessionsMu.Lock()
defer s.sessionsMu.Unlock()
delete(s.sessions, id)
delete(s.sessionConns, id)
}
// closeActiveSessions closes the connection of every registered session so
// the per-session serve goroutines unblock from their Read loops and exit.
// Stop calls it so clients see an immediate disconnect when the server is
// brought down, instead of waiting for the OS to reclaim the sockets after
// process exit.
//
// The connections are copied out under sessionsMu and closed after the
// lock is released.
func (s *Server) closeActiveSessions() {
	s.sessionsMu.Lock()
	snapshot := make([]net.Conn, 0, len(s.sessionConns))
	for id := range s.sessionConns {
		snapshot = append(snapshot, s.sessionConns[id])
	}
	s.sessionsMu.Unlock()

	for i := range snapshot {
		// Best-effort teardown: a Close error here is not actionable.
		_ = snapshot[i].Close()
	}
}
// SetServiceMode enables proxy-to-agent mode for Windows service operation.
@@ -230,6 +256,14 @@ func (s *Server) SetServiceMode(enabled bool) {
s.serviceMode = enabled
}
// SetSessionRecorder installs a callback that receives a SessionTick
// each sessionTickInterval during a VNC session and one final tick on
// session close. Pass nil to disable. Empty ticks (no wire activity)
// are skipped.
//
// The recorder is read when a connection is wrapped, so a change only
// affects sessions accepted afterwards.
// NOTE(review): the field is assigned without holding a lock; presumably
// this is meant to be called before the server starts accepting
// connections — confirm against callers.
func (s *Server) SetSessionRecorder(recorder func(SessionTick)) {
s.sessionRecorder = recorder
}
// SetJWTConfig configures JWT authentication for VNC connections.
// Pass nil to disable JWT (public mode).
func (s *Server) SetJWTConfig(config *JWTConfig) {
@@ -340,6 +374,13 @@ func (s *Server) Stop() error {
s.cancel = nil
}
// Close active client connections before tearing down capturers and
// listeners. The per-session serve goroutines unblock from their Read
// loop with an error and run their deferred conn.Close, which surfaces
// a clean disconnect on the client side instead of leaving the
// connection hanging until the OS reclaims it on process exit.
s.closeActiveSessions()
if s.vmgr != nil {
s.vmgr.StopAll()
}
@@ -378,10 +419,36 @@ func (s *Server) acceptLoop() {
continue
}
enableTCPKeepAlive(conn, s.log)
go s.handleConnection(conn)
}
}
// vncKeepAlivePeriod controls how often TCP layer probes are sent on an
// idle connection. Default OS settings (2 hours) are too long for an
// interactive session: when the server-side host dies without sending FIN
// (power loss, network partition, hung kernel), the client only learns of
// the dead connection when the OS gives up on a probe. 30 s here means
// most clients notice within ~3 minutes worst case.
// NOTE(review): the worst-case figure depends on the OS probe count and
// probe interval, and on whether the Go version in use applies this value
// to the probe interval as well as the idle time — verify on the target
// platforms.
const vncKeepAlivePeriod = 30 * time.Second
// enableTCPKeepAlive turns on SO_KEEPALIVE on the underlying TCP socket
// and sets its keepalive period to vncKeepAlivePeriod.
// Non-TCP conns (e.g. the netstack-backed listener) are skipped silently;
// keepalive there is the netstack's concern.
//
// Failures are logged at debug level on logger and otherwise ignored:
// keepalive is an optimisation for dead-peer detection, not a requirement
// for serving the session. If enabling keepalive fails, the period is not
// attempted.
//
// The logger parameter is deliberately not named "log" so it does not
// shadow the imported logging package inside the function body.
func enableTCPKeepAlive(c net.Conn, logger *log.Entry) {
	tc, ok := c.(*net.TCPConn)
	if !ok {
		return
	}
	if err := tc.SetKeepAlive(true); err != nil {
		logger.Debugf("set keepalive: %v", err)
		return
	}
	if err := tc.SetKeepAlivePeriod(vncKeepAlivePeriod); err != nil {
		logger.Debugf("set keepalive period: %v", err)
	}
}
func (s *Server) validateCapturer(capturer ScreenCapturer) error {
// Quick check first: if already ready, return immediately.
if capturer.Width() > 0 && capturer.Height() > 0 {
@@ -472,7 +539,7 @@ func (s *Server) handleConnection(conn net.Conn) {
Mode: modeString(header.mode),
Username: header.username,
JWTUsername: jwtUserID,
})
}, conn)
defer s.removeSession(sessionID)
if err := s.validateCapturer(capturer); err != nil {
@@ -481,6 +548,7 @@ func (s *Server) handleConnection(conn net.Conn) {
return
}
conn = newMetricsConn(conn, s.sessionRecorder)
sess := &session{
conn: conn,
capturer: capturer,

View File

@@ -255,6 +255,8 @@ func (s *Server) serviceAcceptLoop() {
continue
}
enableTCPKeepAlive(conn, s.log)
conn = newMetricsConn(conn, s.sessionRecorder)
go s.handleServiceConnection(conn, sm)
}
}

View File

@@ -691,10 +691,10 @@ func createStartCaptureMethod(client *netbird.Client) js.Func {
//
// Usage from browser devtools console:
//
// await client.capture() // capture all packets
// await client.capture("tcp") // capture with filter
// await client.capture({filter: "host 10.0.0.1", verbose: true})
// client.stopCapture() // stop and print stats
// await netbird.capture() // capture all packets
// await netbird.capture("tcp") // capture with filter
// await netbird.capture({filter: "host 10.0.0.1", verbose: true})
// netbird.stopCapture() // stop and print stats
func captureMethods(client *netbird.Client) (startFn, stopFn js.Func) {
var mu sync.Mutex
var active *wasmcapture.Handle
@@ -722,7 +722,7 @@ func captureMethods(client *netbird.Client) (startFn, stopFn js.Func) {
active = h
console := js.Global().Get("console")
console.Call("log", "[capture] started, call client.stopCapture() to stop")
console.Call("log", "[capture] started, call netbird.stopCapture() to stop")
resolve.Invoke(js.Undefined())
})
})

View File

@@ -4,6 +4,7 @@ package vnc
import (
"context"
"errors"
"fmt"
"io"
"net"
@@ -23,6 +24,15 @@ const (
// Connection modes matching server/server.go constants.
modeAttach byte = 0
modeSession byte = 1
// WebSocket close codes the dashboard branches on. Codes 1000-1015
// are reserved by RFC 6455; 4000-4999 are application-defined.
wsCodeNormal = 1000
wsCodeAbnormal = 1006
wsCodeDialTimeout = 4001
wsCodeDialFailure = 4002
wsCodeSessionSetup = 4003
wsCodeTransport = 4004
)
// VNCProxy bridges WebSocket connections from noVNC in the browser
@@ -245,8 +255,12 @@ func (p *VNCProxy) connectToVNC(conn *vncConnection) {
if err != nil {
log.Errorf("VNC connect to %s: %v", conn.destination.address, err)
// Close the WebSocket so noVNC fires a disconnect event.
code := wsCodeDialFailure
if errors.Is(err, context.DeadlineExceeded) {
code = wsCodeDialTimeout
}
if conn.wsHandlers.Get("close").Truthy() {
conn.wsHandlers.Call("close", 1006, fmt.Sprintf("connect to peer: %v", err))
conn.wsHandlers.Call("close", code, fmt.Sprintf("connect to peer: %v", err))
}
p.cleanupConnection(conn)
return
@@ -259,7 +273,7 @@ func (p *VNCProxy) connectToVNC(conn *vncConnection) {
if err := p.sendSessionHeader(vncConn, conn.destination); err != nil {
log.Errorf("send VNC session header: %v", err)
if conn.wsHandlers.Get("close").Truthy() {
conn.wsHandlers.Call("close", 1006, fmt.Sprintf("send session header: %v", err))
conn.wsHandlers.Call("close", wsCodeSessionSetup, fmt.Sprintf("send session header: %v", err))
}
p.cleanupConnection(conn)
return
@@ -359,24 +373,23 @@ func (c *vncConnection) snapshotVNC() (net.Conn, bool) {
}
// handleConnReadError classifies an error from the VNC read loop. Returns
// true if the caller should exit; false to retry (transient timeout).
// true if the caller should exit and trigger the cleanup path. A read
// timeout counts as a fatal error: in a healthy session the server emits
// empty FramebufferUpdate responses several times per second, so a full
// idleReadDeadline of silence means the peer is dead (process gone,
// machine off, network partition) and the in-browser TCP stack will
// never surface that on its own.
func (p *VNCProxy) handleConnReadError(conn *vncConnection, err error) bool {
if conn.ctx.Err() != nil {
return true
}
if netErr, ok := err.(interface{ Timeout() bool }); ok && netErr.Timeout() {
// Read timeout: connection might be stale. The next iteration will
// fail too and trigger the close path.
return false
}
if err != io.EOF {
log.Debugf("VNC read deadline expired; treating peer as dead")
} else if err != io.EOF {
log.Debugf("read from VNC connection: %v", err)
}
// Close the WebSocket to notify noVNC, and cancel the local context so
// cleanupConnection isn't left waiting on the JS close callback that
// may never fire on hard errors.
if conn.wsHandlers.Get("close").Truthy() {
conn.wsHandlers.Call("close", 1006, "VNC connection lost")
conn.wsHandlers.Call("close", wsCodeTransport, "VNC connection lost")
}
conn.cancel()
return true