terminal_services: refactor collector (#1729)

This commit is contained in:
Jan-Otto Kröpke
2024-11-13 21:38:31 +01:00
committed by GitHub
parent b4f50c542c
commit f332361723
5 changed files with 158 additions and 111 deletions

View File

@@ -0,0 +1,23 @@
package terminal_services
// Performance counter names for the "Terminal Services Session" counterset.
// Each value is the exact English counter name as exposed by Windows perfdata.
const (
	HandleCount           = "Handle Count"
	PageFaultsPersec      = "Page Faults/sec"
	PageFileBytes         = "Page File Bytes"
	PageFileBytesPeak     = "Page File Bytes Peak"
	PercentPrivilegedTime = "% Privileged Time"
	PercentProcessorTime  = "% Processor Time"
	PercentUserTime       = "% User Time"
	PoolNonpagedBytes     = "Pool Nonpaged Bytes"
	PoolPagedBytes        = "Pool Paged Bytes"
	PrivateBytes          = "Private Bytes"
	ThreadCount           = "Thread Count"
	VirtualBytes          = "Virtual Bytes"
	VirtualBytesPeak      = "Virtual Bytes Peak"
	WorkingSet            = "Working Set"
	WorkingSetPeak        = "Working Set Peak"
)

// Performance counter names for the
// "Remote Desktop Connection Broker Counterset" counterset.
const (
	SuccessfulConnections = "Successful Connections"
	PendingConnections    = "Pending Connections"
	FailedConnections     = "Failed Connections"
)

View File

@@ -12,7 +12,8 @@ import (
"github.com/alecthomas/kingpin/v2" "github.com/alecthomas/kingpin/v2"
"github.com/prometheus-community/windows_exporter/internal/headers/wtsapi32" "github.com/prometheus-community/windows_exporter/internal/headers/wtsapi32"
"github.com/prometheus-community/windows_exporter/internal/mi" "github.com/prometheus-community/windows_exporter/internal/mi"
v1 "github.com/prometheus-community/windows_exporter/internal/perfdata/v1" "github.com/prometheus-community/windows_exporter/internal/perfdata"
"github.com/prometheus-community/windows_exporter/internal/perfdata/perftypes"
"github.com/prometheus-community/windows_exporter/internal/types" "github.com/prometheus-community/windows_exporter/internal/types"
"github.com/prometheus-community/windows_exporter/internal/utils" "github.com/prometheus-community/windows_exporter/internal/utils"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@@ -32,7 +33,7 @@ type Win32_ServerFeature struct {
ID uint32 ID uint32
} }
func isConnectionBrokerServer(logger *slog.Logger, miSession *mi.Session) bool { func isConnectionBrokerServer(miSession *mi.Session) bool {
var dst []Win32_ServerFeature var dst []Win32_ServerFeature
if err := miSession.Query(&dst, mi.NamespaceRootCIMv2, utils.Must(mi.NewQuery("SELECT * FROM Win32_ServerFeature"))); err != nil { if err := miSession.Query(&dst, mi.NamespaceRootCIMv2, utils.Must(mi.NewQuery("SELECT * FROM Win32_ServerFeature"))); err != nil {
return false return false
@@ -44,8 +45,6 @@ func isConnectionBrokerServer(logger *slog.Logger, miSession *mi.Session) bool {
} }
} }
logger.Debug("host is not a connection broker skipping Connection Broker performance metrics.")
return false return false
} }
@@ -58,6 +57,9 @@ type Collector struct {
connectionBrokerEnabled bool connectionBrokerEnabled bool
perfDataCollectorTerminalServicesSession perfdata.Collector
perfDataCollectorBroker perfdata.Collector
hServer windows.Handle hServer windows.Handle
sessionInfo *prometheus.Desc sessionInfo *prometheus.Desc
@@ -98,10 +100,7 @@ func (c *Collector) GetName() string {
} }
func (c *Collector) GetPerfCounter(_ *slog.Logger) ([]string, error) { func (c *Collector) GetPerfCounter(_ *slog.Logger) ([]string, error) {
return []string{ return []string{}, nil
"Terminal Services Session",
"Remote Desktop Connection Broker Counterset",
}, nil
} }
func (c *Collector) Close(_ *slog.Logger) error { func (c *Collector) Close(_ *slog.Logger) error {
@@ -110,6 +109,12 @@ func (c *Collector) Close(_ *slog.Logger) error {
return fmt.Errorf("failed to close WTS server: %w", err) return fmt.Errorf("failed to close WTS server: %w", err)
} }
c.perfDataCollectorTerminalServicesSession.Close()
if c.connectionBrokerEnabled {
c.perfDataCollectorBroker.Close()
}
return nil return nil
} }
@@ -120,7 +125,49 @@ func (c *Collector) Build(logger *slog.Logger, miSession *mi.Session) error {
logger = logger.With(slog.String("collector", Name)) logger = logger.With(slog.String("collector", Name))
c.connectionBrokerEnabled = isConnectionBrokerServer(logger, miSession) counters := []string{
HandleCount,
PageFaultsPersec,
PageFileBytes,
PageFileBytesPeak,
PercentPrivilegedTime,
PercentProcessorTime,
PercentUserTime,
PoolNonpagedBytes,
PoolPagedBytes,
PrivateBytes,
ThreadCount,
VirtualBytes,
VirtualBytesPeak,
WorkingSet,
WorkingSetPeak,
}
var err error
c.perfDataCollectorTerminalServicesSession, err = perfdata.NewCollector(perfdata.V2, "Terminal Services Session", perfdata.AllInstances, counters)
if err != nil {
return fmt.Errorf("failed to create Terminal Services Session collector: %w", err)
}
c.connectionBrokerEnabled = isConnectionBrokerServer(miSession)
if c.connectionBrokerEnabled {
counters = []string{
SuccessfulConnections,
PendingConnections,
FailedConnections,
}
var err error
c.perfDataCollectorBroker, err = perfdata.NewCollector(perfdata.V2, "Remote Desktop Connection Broker Counterset", perfdata.AllInstances, counters)
if err != nil {
return fmt.Errorf("failed to create Remote Desktop Connection Broker Counterset collector: %w", err)
}
} else {
logger.Debug("host is not a connection broker skipping Connection Broker performance metrics.")
}
c.sessionInfo = prometheus.NewDesc( c.sessionInfo = prometheus.NewDesc(
prometheus.BuildFQName(types.Namespace, Name, "session_info"), prometheus.BuildFQName(types.Namespace, Name, "session_info"),
@@ -213,8 +260,6 @@ func (c *Collector) Build(logger *slog.Logger, miSession *mi.Session) error {
nil, nil,
) )
var err error
c.hServer, err = wtsapi32.WTSOpenServer("") c.hServer, err = wtsapi32.WTSOpenServer("")
if err != nil { if err != nil {
return fmt.Errorf("failed to open WTS server: %w", err) return fmt.Errorf("failed to open WTS server: %w", err)
@@ -225,71 +270,40 @@ func (c *Collector) Build(logger *slog.Logger, miSession *mi.Session) error {
// Collect sends the metric values for each metric // Collect sends the metric values for each metric
// to the provided prometheus Metric channel. // to the provided prometheus Metric channel.
func (c *Collector) Collect(ctx *types.ScrapeContext, logger *slog.Logger, ch chan<- prometheus.Metric) error { func (c *Collector) Collect(_ *types.ScrapeContext, logger *slog.Logger, ch chan<- prometheus.Metric) error {
logger = logger.With(slog.String("collector", Name)) logger = logger.With(slog.String("collector", Name))
if err := c.collectWTSSessions(logger, ch); err != nil {
logger.Error("failed collecting terminal services session infos",
slog.Any("err", err),
)
return err errs := make([]error, 0, 3)
if err := c.collectWTSSessions(logger, ch); err != nil {
errs = append(errs, fmt.Errorf("failed collecting terminal services session infos: %w", err))
} }
if err := c.collectTSSessionCounters(ctx, logger, ch); err != nil { if err := c.collectTSSessionCounters(ch); err != nil {
logger.Error("failed collecting terminal services session count metrics", errs = append(errs, fmt.Errorf("failed collecting terminal services session count metrics: %w", err))
slog.Any("err", err),
)
return err
} }
// only collect CollectionBrokerPerformance if host is a Connection Broker // only collect CollectionBrokerPerformance if host is a Connection Broker
if c.connectionBrokerEnabled { if c.connectionBrokerEnabled {
if err := c.collectCollectionBrokerPerformanceCounter(ctx, logger, ch); err != nil { if err := c.collectCollectionBrokerPerformanceCounter(ch); err != nil {
logger.Error("failed collecting Connection Broker performance metrics", errs = append(errs, fmt.Errorf("failed collecting Connection Broker performance metrics: %w", err))
slog.Any("err", err),
)
return err
} }
} }
return nil return errors.Join(errs...)
} }
type perflibTerminalServicesSession struct { func (c *Collector) collectTSSessionCounters(ch chan<- prometheus.Metric) error {
Name string perfData, err := c.perfDataCollectorTerminalServicesSession.Collect()
HandleCount float64 `perflib:"Handle Count"`
PageFaultsPersec float64 `perflib:"Page Faults/sec"`
PageFileBytes float64 `perflib:"Page File Bytes"`
PageFileBytesPeak float64 `perflib:"Page File Bytes Peak"`
PercentPrivilegedTime float64 `perflib:"% Privileged Time"`
PercentProcessorTime float64 `perflib:"% Processor Time"`
PercentUserTime float64 `perflib:"% User Time"`
PoolNonpagedBytes float64 `perflib:"Pool Nonpaged Bytes"`
PoolPagedBytes float64 `perflib:"Pool Paged Bytes"`
PrivateBytes float64 `perflib:"Private Bytes"`
ThreadCount float64 `perflib:"Thread Count"`
VirtualBytes float64 `perflib:"Virtual Bytes"`
VirtualBytesPeak float64 `perflib:"Virtual Bytes Peak"`
WorkingSet float64 `perflib:"Working Set"`
WorkingSetPeak float64 `perflib:"Working Set Peak"`
}
func (c *Collector) collectTSSessionCounters(ctx *types.ScrapeContext, logger *slog.Logger, ch chan<- prometheus.Metric) error {
logger = logger.With(slog.String("collector", Name))
dst := make([]perflibTerminalServicesSession, 0)
err := v1.UnmarshalObject(ctx.PerfObjects["Terminal Services Session"], &dst, logger)
if err != nil { if err != nil {
return err return fmt.Errorf("failed to collect Terminal Services Session metrics: %w", err)
} }
names := make(map[string]bool) names := make(map[string]bool)
for _, d := range dst { for name, data := range perfData {
// only connect metrics for remote named sessions // only connect metrics for remote named sessions
n := strings.ToLower(d.Name) n := strings.ToLower(name)
if n == "" || n == "services" || n == "console" { if n == "" || n == "services" || n == "console" {
continue continue
} }
@@ -303,138 +317,130 @@ func (c *Collector) collectTSSessionCounters(ctx *types.ScrapeContext, logger *s
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.handleCount, c.handleCount,
prometheus.GaugeValue, prometheus.GaugeValue,
d.HandleCount, data[HandleCount].FirstValue,
d.Name, name,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.pageFaultsPerSec, c.pageFaultsPerSec,
prometheus.CounterValue, prometheus.CounterValue,
d.PageFaultsPersec, data[PageFaultsPersec].FirstValue,
d.Name, name,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.pageFileBytes, c.pageFileBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
d.PageFileBytes, data[PageFileBytes].FirstValue,
d.Name, name,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.pageFileBytesPeak, c.pageFileBytesPeak,
prometheus.GaugeValue, prometheus.GaugeValue,
d.PageFileBytesPeak, data[PageFileBytesPeak].FirstValue,
d.Name, name,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.percentCPUTime, c.percentCPUTime,
prometheus.CounterValue, prometheus.CounterValue,
d.PercentPrivilegedTime, data[PercentPrivilegedTime].FirstValue,
d.Name, name,
"privileged", "privileged",
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.percentCPUTime, c.percentCPUTime,
prometheus.CounterValue, prometheus.CounterValue,
d.PercentProcessorTime, data[PercentProcessorTime].FirstValue,
d.Name, name,
"processor", "processor",
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.percentCPUTime, c.percentCPUTime,
prometheus.CounterValue, prometheus.CounterValue,
d.PercentUserTime, data[PercentUserTime].FirstValue,
d.Name, name,
"user", "user",
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.poolNonPagedBytes, c.poolNonPagedBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
d.PoolNonpagedBytes, data[PoolNonpagedBytes].FirstValue,
d.Name, name,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.poolPagedBytes, c.poolPagedBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
d.PoolPagedBytes, data[PoolPagedBytes].FirstValue,
d.Name, name,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.privateBytes, c.privateBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
d.PrivateBytes, data[PrivateBytes].FirstValue,
d.Name, name,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.threadCount, c.threadCount,
prometheus.GaugeValue, prometheus.GaugeValue,
d.ThreadCount, data[ThreadCount].FirstValue,
d.Name, name,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.virtualBytes, c.virtualBytes,
prometheus.GaugeValue, prometheus.GaugeValue,
d.VirtualBytes, data[VirtualBytes].FirstValue,
d.Name, name,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.virtualBytesPeak, c.virtualBytesPeak,
prometheus.GaugeValue, prometheus.GaugeValue,
d.VirtualBytesPeak, data[VirtualBytesPeak].FirstValue,
d.Name, name,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.workingSet, c.workingSet,
prometheus.GaugeValue, prometheus.GaugeValue,
d.WorkingSet, data[WorkingSet].FirstValue,
d.Name, name,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.workingSetPeak, c.workingSetPeak,
prometheus.GaugeValue, prometheus.GaugeValue,
d.WorkingSetPeak, data[WorkingSetPeak].FirstValue,
d.Name, name,
) )
} }
return nil return nil
} }
type perflibRemoteDesktopConnectionBrokerCounterset struct { func (c *Collector) collectCollectionBrokerPerformanceCounter(ch chan<- prometheus.Metric) error {
SuccessfulConnections float64 `perflib:"Successful Connections"` perfData, err := c.perfDataCollectorBroker.Collect()
PendingConnections float64 `perflib:"Pending Connections"`
FailedConnections float64 `perflib:"Failed Connections"`
}
func (c *Collector) collectCollectionBrokerPerformanceCounter(ctx *types.ScrapeContext, logger *slog.Logger, ch chan<- prometheus.Metric) error {
logger = logger.With(slog.String("collector", Name))
dst := make([]perflibRemoteDesktopConnectionBrokerCounterset, 0)
err := v1.UnmarshalObject(ctx.PerfObjects["Remote Desktop Connection Broker Counterset"], &dst, logger)
if err != nil { if err != nil {
return err return fmt.Errorf("failed to collect Remote Desktop Connection Broker Counterset metrics: %w", err)
} }
if len(dst) == 0 { data, ok := perfData[perftypes.EmptyInstance]
return errors.New("WMI query returned empty result set") if !ok {
return errors.New("query for Remote Desktop Connection Broker Counterset returned empty result set")
} }
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.connectionBrokerPerformance, c.connectionBrokerPerformance,
prometheus.CounterValue, prometheus.CounterValue,
dst[0].SuccessfulConnections, data[SuccessfulConnections].FirstValue,
"Successful", "Successful",
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.connectionBrokerPerformance, c.connectionBrokerPerformance,
prometheus.CounterValue, prometheus.CounterValue,
dst[0].PendingConnections, data[PendingConnections].FirstValue,
"Pending", "Pending",
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.connectionBrokerPerformance, c.connectionBrokerPerformance,
prometheus.CounterValue, prometheus.CounterValue,
dst[0].FailedConnections, data[FailedConnections].FirstValue,
"Failed", "Failed",
) )
@@ -448,6 +454,12 @@ func (c *Collector) collectWTSSessions(logger *slog.Logger, ch chan<- prometheus
} }
for _, session := range sessions { for _, session := range sessions {
// only connect metrics for remote named sessions
n := strings.ReplaceAll(session.SessionName, "#", " ")
if n == "" || n == "Services" || n == "Console" {
continue
}
userName := session.UserName userName := session.UserName
if session.DomainName != "" { if session.DomainName != "" {
userName = fmt.Sprintf("%s\\%s", session.DomainName, session.UserName) userName = fmt.Sprintf("%s\\%s", session.DomainName, session.UserName)
@@ -458,12 +470,11 @@ func (c *Collector) collectWTSSessions(logger *slog.Logger, ch chan<- prometheus
if session.State == stateID { if session.State == stateID {
isState = 1.0 isState = 1.0
} }
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.sessionInfo, c.sessionInfo,
prometheus.GaugeValue, prometheus.GaugeValue,
isState, isState,
strings.ReplaceAll(session.SessionName, "#", " "), n,
userName, userName,
session.HostName, session.HostName,
stateName, stateName,

View File

@@ -1,6 +1,7 @@
package wtsapi32 package wtsapi32
import ( import (
"errors"
"fmt" "fmt"
"log/slog" "log/slog"
"unsafe" "unsafe"
@@ -129,7 +130,7 @@ func WTSOpenServer(server string) (windows.Handle, error) {
func WTSCloseServer(server windows.Handle) error { func WTSCloseServer(server windows.Handle) error {
r1, _, err := procWTSCloseServer.Call(uintptr(server)) r1, _, err := procWTSCloseServer.Call(uintptr(server))
if r1 != 1 { if r1 != 1 && !errors.Is(err, windows.ERROR_SUCCESS) {
return fmt.Errorf("failed to close server: %w", err) return fmt.Errorf("failed to close server: %w", err)
} }
@@ -170,8 +171,7 @@ func WTSEnumerateSessionsEx(server windows.Handle, logger *slog.Logger) ([]WTSSe
if sessionInfoPointer != 0 { if sessionInfoPointer != 0 {
defer func(class WTSTypeClass, pMemory uintptr, NumberOfEntries uint32) { defer func(class WTSTypeClass, pMemory uintptr, NumberOfEntries uint32) {
err := WTSFreeMemoryEx(class, pMemory, NumberOfEntries) if err := WTSFreeMemoryEx(class, pMemory, NumberOfEntries); err != nil {
if err != nil {
logger.Warn("failed to free memory", "err", fmt.Errorf("WTSEnumerateSessionsEx: %w", err)) logger.Warn("failed to free memory", "err", fmt.Errorf("WTSEnumerateSessionsEx: %w", err))
} }
}(WTSTypeSessionInfoLevel1, sessionInfoPointer, count) }(WTSTypeSessionInfoLevel1, sessionInfoPointer, count)

View File

@@ -5,8 +5,10 @@ package mi
import ( import (
"errors" "errors"
"fmt" "fmt"
"math"
"reflect" "reflect"
"sync" "sync"
"time"
"unsafe" "unsafe"
"golang.org/x/sys/windows" "golang.org/x/sys/windows"
@@ -14,6 +16,10 @@ import (
// We have to registry a global callback function, since the amount of callbacks is limited. // We have to registry a global callback function, since the amount of callbacks is limited.
var operationUnmarshalCallbacksInstanceResult = sync.OnceValue[uintptr](func() uintptr { var operationUnmarshalCallbacksInstanceResult = sync.OnceValue[uintptr](func() uintptr {
// Workaround for a deadlock issue in go.
// Ref: https://github.com/golang/go/issues/55015
go time.Sleep(time.Duration(math.MaxInt64))
return windows.NewCallback(func( return windows.NewCallback(func(
operation *Operation, operation *Operation,
callbacks *OperationUnmarshalCallbacks, callbacks *OperationUnmarshalCallbacks,

View File

@@ -212,9 +212,16 @@ func (s *Session) QueryUnmarshal(dst any,
errs := make([]error, 0) errs := make([]error, 0)
for err := range errCh { // We need an active go routine to prevent a
if err != nil { // fatal error: all goroutines are asleep - deadlock!
// ref: https://github.com/golang/go/issues/55015
// go time.Sleep(5 * time.Second)
for {
if err, ok := <-errCh; err != nil {
errs = append(errs, err) errs = append(errs, err)
} else if !ok {
break
} }
} }