feat: Support OpenMetrics (#1772)

Signed-off-by: Jan-Otto Kröpke <mail@jkroepke.de>
This commit is contained in:
Jan-Otto Kröpke
2024-11-26 19:43:52 +01:00
committed by GitHub
parent 1a4c6c5ce7
commit c8eeb595c0
9 changed files with 89 additions and 64 deletions

View File

@@ -2,7 +2,7 @@
<configuration default="false" name="all" type="GoApplicationRunConfiguration" factoryName="Go Application" folderName="run"> <configuration default="false" name="all" type="GoApplicationRunConfiguration" factoryName="Go Application" folderName="run">
<module name="windows_exporter" /> <module name="windows_exporter" />
<working_directory value="$PROJECT_DIR$" /> <working_directory value="$PROJECT_DIR$" />
<parameters value="--web.listen-address=127.0.0.1:9182 --log.level=debug --collectors.enabled=ad,adcs,adfs,cache,container,cpu,cpu_info,cs,dfsr,dhcp,diskdrive,dns,exchange,filetime,fsrmquota,hyperv,iis,license,logical_disk,logon,memory,mscluster,msmq,mssql,net,netframework,nps,os,pagefile,perfdata,physical_disk,printer,process,remote_fx,scheduled_task,service,smb,smbclient,smtp,system,tcp,terminal_services,textfile,thermalzone,time,udp,update,vmware" /> <parameters value="--web.listen-address=127.0.0.1:9182 --log.level=info --collectors.enabled=ad,adcs,adfs,cache,container,cpu,cpu_info,cs,dfsr,dhcp,diskdrive,dns,exchange,filetime,fsrmquota,hyperv,iis,license,logical_disk,logon,memory,mscluster,msmq,mssql,net,netframework,nps,os,pagefile,perfdata,physical_disk,printer,process,remote_fx,scheduled_task,service,smb,smbclient,smtp,system,tcp,terminal_services,thermalzone,time,udp,update,vmware --debug.enabled" />
<sudo value="true" /> <sudo value="true" />
<kind value="PACKAGE" /> <kind value="PACKAGE" />
<package value="github.com/prometheus-community/windows_exporter/cmd/windows_exporter" /> <package value="github.com/prometheus-community/windows_exporter/cmd/windows_exporter" />

View File

@@ -199,6 +199,14 @@ Windows Server 2012 and 2012R2 are supported as best-effort only, but not guaran
The prometheus metrics will be exposed on [localhost:9182](http://localhost:9182) The prometheus metrics will be exposed on [localhost:9182](http://localhost:9182)
### HTTP Endpoints
windows_exporter provides the following HTTP endpoints:
* `/metrics`: Exposes metrics in the [Prometheus text format](https://prometheus.io/docs/instrumenting/exposition_formats/).
* `/health`: Returns 200 OK when the exporter is running.
* `/debug/pprof/`: Exposes the [pprof](https://golang.org/pkg/net/http/pprof/) endpoints. Only available if `--debug.enabled` is set.
## Examples ## Examples
### Enable only service collector and specify a custom query ### Enable only service collector and specify a custom query

View File

@@ -87,10 +87,6 @@ func run() int {
"web.disable-exporter-metrics", "web.disable-exporter-metrics",
"Exclude metrics about the exporter itself (promhttp_*, process_*, go_*).", "Exclude metrics about the exporter itself (promhttp_*, process_*, go_*).",
).Bool() ).Bool()
maxRequests = app.Flag(
"telemetry.max-requests",
"Maximum number of concurrent requests. 0 to disable.",
).Default("5").Int()
enabledCollectors = app.Flag( enabledCollectors = app.Flag(
"collectors.enabled", "collectors.enabled",
"Comma-separated list of collectors to use. Use '[defaults]' as a placeholder for all the collectors enabled by default."). "Comma-separated list of collectors to use. Use '[defaults]' as a placeholder for all the collectors enabled by default.").
@@ -220,7 +216,6 @@ func run() int {
mux.Handle("GET "+*metricsPath, httphandler.New(logger, collectors, &httphandler.Options{ mux.Handle("GET "+*metricsPath, httphandler.New(logger, collectors, &httphandler.Options{
DisableExporterMetrics: *disableExporterMetrics, DisableExporterMetrics: *disableExporterMetrics,
TimeoutMargin: *timeoutMargin, TimeoutMargin: *timeoutMargin,
MaxRequests: *maxRequests,
})) }))
if *debugEnabled { if *debugEnabled {

View File

@@ -49,7 +49,6 @@ type MetricsHTTPHandler struct {
type Options struct { type Options struct {
DisableExporterMetrics bool DisableExporterMetrics bool
TimeoutMargin float64 TimeoutMargin float64
MaxRequests int
} }
func New(logger *slog.Logger, metricCollectors *collector.MetricCollectors, options *Options) *MetricsHTTPHandler { func New(logger *slog.Logger, metricCollectors *collector.MetricCollectors, options *Options) *MetricsHTTPHandler {
@@ -57,7 +56,6 @@ func New(logger *slog.Logger, metricCollectors *collector.MetricCollectors, opti
options = &Options{ options = &Options{
DisableExporterMetrics: false, DisableExporterMetrics: false,
TimeoutMargin: 0.5, TimeoutMargin: 0.5,
MaxRequests: 5,
} }
} }
@@ -65,7 +63,9 @@ func New(logger *slog.Logger, metricCollectors *collector.MetricCollectors, opti
metricCollectors: metricCollectors, metricCollectors: metricCollectors,
logger: logger, logger: logger,
options: *options, options: *options,
concurrencyCh: make(chan struct{}, options.MaxRequests),
// We are expose metrics directly from the memory region of the Win32 API. We should not allow more than one request at a time.
concurrencyCh: make(chan struct{}, 1),
} }
if !options.DisableExporterMetrics { if !options.DisableExporterMetrics {
@@ -131,21 +131,11 @@ func (c *MetricsHTTPHandler) handlerFactory(logger *slog.Logger, scrapeTimeout t
if len(requestedCollectors) == 0 { if len(requestedCollectors) == 0 {
metricCollectors = c.metricCollectors metricCollectors = c.metricCollectors
} else { } else {
filteredCollectors := make(collector.Map) var err error
for _, name := range requestedCollectors { metricCollectors, err = c.metricCollectors.CloneWithCollectors(requestedCollectors)
metricCollector, ok := c.metricCollectors.Collectors[name] if err != nil {
if !ok { return nil, fmt.Errorf("couldn't clone metric collectors: %w", err)
return nil, fmt.Errorf("couldn't find collector %s", name)
}
filteredCollectors[name] = metricCollector
}
metricCollectors = &collector.MetricCollectors{
Collectors: filteredCollectors,
MISession: c.metricCollectors.MISession,
PerfCounterQuery: c.metricCollectors.PerfCounterQuery,
} }
} }
@@ -162,8 +152,10 @@ func (c *MetricsHTTPHandler) handlerFactory(logger *slog.Logger, scrapeTimeout t
promhttp.HandlerOpts{ promhttp.HandlerOpts{
ErrorLog: slog.NewLogLogger(logger.Handler(), slog.LevelError), ErrorLog: slog.NewLogLogger(logger.Handler(), slog.LevelError),
ErrorHandling: promhttp.ContinueOnError, ErrorHandling: promhttp.ContinueOnError,
MaxRequestsInFlight: c.options.MaxRequests, MaxRequestsInFlight: 1,
Registry: c.exporterMetricsRegistry, Registry: c.exporterMetricsRegistry,
EnableOpenMetrics: true,
ProcessStartTime: c.metricCollectors.GetStartTime(),
}, },
) )
@@ -178,7 +170,9 @@ func (c *MetricsHTTPHandler) handlerFactory(logger *slog.Logger, scrapeTimeout t
promhttp.HandlerOpts{ promhttp.HandlerOpts{
ErrorLog: slog.NewLogLogger(logger.Handler(), slog.LevelError), ErrorLog: slog.NewLogLogger(logger.Handler(), slog.LevelError),
ErrorHandling: promhttp.ContinueOnError, ErrorHandling: promhttp.ContinueOnError,
MaxRequestsInFlight: c.options.MaxRequests, MaxRequestsInFlight: 1,
EnableOpenMetrics: true,
ProcessStartTime: c.metricCollectors.GetStartTime(),
}, },
) )
} }
@@ -187,10 +181,6 @@ func (c *MetricsHTTPHandler) handlerFactory(logger *slog.Logger, scrapeTimeout t
} }
func (c *MetricsHTTPHandler) withConcurrencyLimit(next http.HandlerFunc) http.HandlerFunc { func (c *MetricsHTTPHandler) withConcurrencyLimit(next http.HandlerFunc) http.HandlerFunc {
if c.options.MaxRequests <= 0 {
return next
}
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
select { select {
case c.concurrencyCh <- struct{}{}: case c.concurrencyCh <- struct{}{}:

View File

@@ -193,6 +193,13 @@ func (c *Collector) Collect() (CounterValues, error) {
} }
func (c *Collector) collectRoutine() { func (c *Collector) collectRoutine() {
var (
itemCount uint32
bytesNeeded uint32
)
buf := make([]byte, 1)
for range c.collectCh { for range c.collectCh {
if ret := PdhCollectQueryData(c.handle); ret != ErrorSuccess { if ret := PdhCollectQueryData(c.handle); ret != ErrorSuccess {
c.counterValuesCh <- nil c.counterValuesCh <- nil
@@ -207,25 +214,24 @@ func (c *Collector) collectRoutine() {
for _, counter := range c.counters { for _, counter := range c.counters {
for _, instance := range counter.Instances { for _, instance := range counter.Instances {
// Get the info with the current buffer size // Get the info with the current buffer size
var itemCount uint32 bytesNeeded = uint32(cap(buf))
// Get the info with the current buffer size for {
bufLen := uint32(0) ret := PdhGetRawCounterArray(instance, &bytesNeeded, &itemCount, &buf[0])
ret := PdhGetRawCounterArray(instance, &bufLen, &itemCount, nil) if ret == ErrorSuccess {
if ret != PdhMoreData { break
return nil, fmt.Errorf("PdhGetRawCounterArray: %w", NewPdhError(ret)) }
}
buf := make([]byte, bufLen) if err := NewPdhError(ret); ret != PdhMoreData && !isKnownCounterDataError(err) {
ret = PdhGetRawCounterArray(instance, &bufLen, &itemCount, &buf[0])
if ret != ErrorSuccess {
if err := NewPdhError(ret); !isKnownCounterDataError(err) {
return nil, fmt.Errorf("PdhGetRawCounterArray: %w", err) return nil, fmt.Errorf("PdhGetRawCounterArray: %w", err)
} }
continue if bytesNeeded <= uint32(cap(buf)) {
return nil, fmt.Errorf("PdhGetRawCounterArray reports buffer too small (%d), but buffer is large enough (%d): %w", uint32(cap(buf)), bytesNeeded, NewPdhError(ret))
}
buf = make([]byte, bytesNeeded)
} }
items := unsafe.Slice((*PdhRawCounterItem)(unsafe.Pointer(&buf[0])), itemCount) items := unsafe.Slice((*PdhRawCounterItem)(unsafe.Pointer(&buf[0])), itemCount)

View File

@@ -61,4 +61,6 @@ func BenchmarkTestCollector(b *testing.B) {
} }
performanceData.Close() performanceData.Close()
b.ReportAllocs()
} }

View File

@@ -19,8 +19,10 @@ import (
"errors" "errors"
"fmt" "fmt"
"log/slog" "log/slog"
"maps"
"slices" "slices"
"sync" "sync"
stdtime "time"
"github.com/alecthomas/kingpin/v2" "github.com/alecthomas/kingpin/v2"
"github.com/prometheus-community/windows_exporter/internal/collector/ad" "github.com/prometheus-community/windows_exporter/internal/collector/ad"
@@ -145,21 +147,21 @@ func NewWithConfig(config Config) *MetricCollectors {
// New To be called by the external libraries for collector initialization. // New To be called by the external libraries for collector initialization.
func New(collectors Map) *MetricCollectors { func New(collectors Map) *MetricCollectors {
return &MetricCollectors{ return &MetricCollectors{
Collectors: collectors, collectors: collectors,
} }
} }
// Enable removes all collectors that are not in enabledCollectors. // Enable removes all collectors that are not in enabledCollectors.
func (c *MetricCollectors) Enable(enabledCollectors []string) error { func (c *MetricCollectors) Enable(enabledCollectors []string) error {
for _, name := range enabledCollectors { for _, name := range enabledCollectors {
if _, ok := c.Collectors[name]; !ok { if _, ok := c.collectors[name]; !ok {
return fmt.Errorf("unknown collector %s", name) return fmt.Errorf("unknown collector %s", name)
} }
} }
for name := range c.Collectors { for name := range c.collectors {
if !slices.Contains(enabledCollectors, name) { if !slices.Contains(enabledCollectors, name) {
delete(c.Collectors, name) delete(c.collectors, name)
} }
} }
@@ -168,22 +170,24 @@ func (c *MetricCollectors) Enable(enabledCollectors []string) error {
// Build To be called by the exporter for collector initialization. // Build To be called by the exporter for collector initialization.
func (c *MetricCollectors) Build(logger *slog.Logger) error { func (c *MetricCollectors) Build(logger *slog.Logger) error {
c.startTime = stdtime.Now()
err := c.initMI() err := c.initMI()
if err != nil { if err != nil {
return fmt.Errorf("error from initialize MI: %w", err) return fmt.Errorf("error from initialize MI: %w", err)
} }
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(len(c.Collectors)) wg.Add(len(c.collectors))
errCh := make(chan error, len(c.Collectors)) errCh := make(chan error, len(c.collectors))
errs := make([]error, 0, len(c.Collectors)) errs := make([]error, 0, len(c.collectors))
for _, collector := range c.Collectors { for _, collector := range c.collectors {
go func() { go func() {
defer wg.Done() defer wg.Done()
if err = collector.Build(logger, c.MISession); err != nil { if err = collector.Build(logger, c.miSession); err != nil {
errCh <- fmt.Errorf("error build collector %s: %w", collector.GetName(), err) errCh <- fmt.Errorf("error build collector %s: %w", collector.GetName(), err)
} }
}() }()
@@ -202,20 +206,20 @@ func (c *MetricCollectors) Build(logger *slog.Logger) error {
// Close To be called by the exporter for collector cleanup. // Close To be called by the exporter for collector cleanup.
func (c *MetricCollectors) Close() error { func (c *MetricCollectors) Close() error {
errs := make([]error, 0, len(c.Collectors)) errs := make([]error, 0, len(c.collectors))
for _, collector := range c.Collectors { for _, collector := range c.collectors {
if err := collector.Close(); err != nil { if err := collector.Close(); err != nil {
errs = append(errs, fmt.Errorf("error from close collector %s: %w", collector.GetName(), err)) errs = append(errs, fmt.Errorf("error from close collector %s: %w", collector.GetName(), err))
} }
} }
app, err := c.MISession.GetApplication() app, err := c.miSession.GetApplication()
if err != nil && !errors.Is(err, mi.ErrNotInitialized) { if err != nil && !errors.Is(err, mi.ErrNotInitialized) {
errs = append(errs, fmt.Errorf("error from get MI application: %w", err)) errs = append(errs, fmt.Errorf("error from get MI application: %w", err))
} }
if err := c.MISession.Close(); err != nil && !errors.Is(err, mi.ErrNotInitialized) { if err := c.miSession.Close(); err != nil && !errors.Is(err, mi.ErrNotInitialized) {
errs = append(errs, fmt.Errorf("error from close MI session: %w", err)) errs = append(errs, fmt.Errorf("error from close MI session: %w", err))
} }
@@ -226,7 +230,7 @@ func (c *MetricCollectors) Close() error {
return errors.Join(errs...) return errors.Join(errs...)
} }
// Close To be called by the exporter for collector cleanup. // initMI To be called by the exporter for collector initialization.
func (c *MetricCollectors) initMI() error { func (c *MetricCollectors) initMI() error {
app, err := mi.Application_Initialize() app, err := mi.Application_Initialize()
if err != nil { if err != nil {
@@ -242,10 +246,29 @@ func (c *MetricCollectors) initMI() error {
return fmt.Errorf("error from set locale: %w", err) return fmt.Errorf("error from set locale: %w", err)
} }
c.MISession, err = app.NewSession(destinationOptions) c.miSession, err = app.NewSession(destinationOptions)
if err != nil { if err != nil {
return fmt.Errorf("error from create NewSession: %w", err) return fmt.Errorf("error from create NewSession: %w", err)
} }
return nil return nil
} }
// CloneWithCollectors To be called by the exporter for collector initialization.
func (c *MetricCollectors) CloneWithCollectors(collectors []string) (*MetricCollectors, error) {
metricCollectors := &MetricCollectors{
collectors: maps.Clone(c.collectors),
miSession: c.miSession,
startTime: c.startTime,
}
if err := metricCollectors.Enable(collectors); err != nil {
return nil, err
}
return metricCollectors, nil
}
func (c *MetricCollectors) GetStartTime() stdtime.Time {
return c.startTime
}

View File

@@ -27,7 +27,7 @@ import (
"github.com/prometheus-community/windows_exporter/internal/mi" "github.com/prometheus-community/windows_exporter/internal/mi"
"github.com/prometheus-community/windows_exporter/internal/perfdata" "github.com/prometheus-community/windows_exporter/internal/perfdata"
types "github.com/prometheus-community/windows_exporter/internal/types" "github.com/prometheus-community/windows_exporter/internal/types"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
) )
@@ -103,15 +103,15 @@ func (p *Prometheus) Collect(ch chan<- prometheus.Metric) {
// WaitGroup to wait for all collectors to finish // WaitGroup to wait for all collectors to finish
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(len(p.metricCollectors.Collectors)) wg.Add(len(p.metricCollectors.collectors))
// Using a channel to collect the status of each collector // Using a channel to collect the status of each collector
// A channel is safe to use concurrently while a map is not // A channel is safe to use concurrently while a map is not
collectorStatusCh := make(chan collectorStatus, len(p.metricCollectors.Collectors)) collectorStatusCh := make(chan collectorStatus, len(p.metricCollectors.collectors))
// Execute all collectors concurrently // Execute all collectors concurrently
// timeout handling is done in the execute function // timeout handling is done in the execute function
for name, metricsCollector := range p.metricCollectors.Collectors { for name, metricsCollector := range p.metricCollectors.collectors {
go func(name string, metricsCollector Collector) { go func(name string, metricsCollector Collector) {
defer wg.Done() defer wg.Done()

View File

@@ -17,6 +17,7 @@ package collector
import ( import (
"log/slog" "log/slog"
"time"
"github.com/alecthomas/kingpin/v2" "github.com/alecthomas/kingpin/v2"
"github.com/prometheus-community/windows_exporter/internal/mi" "github.com/prometheus-community/windows_exporter/internal/mi"
@@ -26,9 +27,9 @@ import (
const DefaultCollectors = "cpu,cs,memory,logical_disk,physical_disk,net,os,service,system" const DefaultCollectors = "cpu,cs,memory,logical_disk,physical_disk,net,os,service,system"
type MetricCollectors struct { type MetricCollectors struct {
Collectors Map collectors Map
MISession *mi.Session miSession *mi.Session
PerfCounterQuery string startTime time.Time
} }
type ( type (