chore: release 0.29.0.rc0 (#1600)

Jan-Otto Kröpke
2024-09-11 00:34:10 +02:00
committed by GitHub
parent 83b0aa8f62
commit f712c07c38
119 changed files with 5113 additions and 2255 deletions


@@ -4,48 +4,71 @@ package collector
import (
"fmt"
"log/slog"
"sync"
"sync/atomic"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus-community/windows_exporter/pkg/types"
"github.com/prometheus/client_golang/prometheus"
)
// Prometheus implements prometheus.Collector for a set of Windows Collectors.
// Interface guard.
var _ prometheus.Collector = (*Prometheus)(nil)
// Prometheus implements prometheus.Collector for a set of Windows MetricCollectors.
type Prometheus struct {
maxScrapeDuration time.Duration
collectors *Collectors
logger log.Logger
logger *slog.Logger
metricCollectors *MetricCollectors
// Base metrics returned by Prometheus
scrapeDurationDesc *prometheus.Desc
scrapeSuccessDesc *prometheus.Desc
scrapeTimeoutDesc *prometheus.Desc
snapshotDuration *prometheus.Desc
scrapeDurationDesc *prometheus.Desc
collectorScrapeDurationDesc *prometheus.Desc
collectorScrapeSuccessDesc *prometheus.Desc
collectorScrapeTimeoutDesc *prometheus.Desc
snapshotDuration *prometheus.Desc
}
// NewPrometheus returns a new Prometheus where the set of Collectors must
type collectorStatus struct {
name string
statusCode collectorStatusCode
}
type collectorStatusCode int
const (
pending collectorStatusCode = iota
success
failed
)
// NewPrometheusCollector returns a new Prometheus where the set of MetricCollectors must
// return metrics within the given timeout.
func NewPrometheus(timeout time.Duration, cs *Collectors, logger log.Logger) *Prometheus {
func (c *MetricCollectors) NewPrometheusCollector(timeout time.Duration, logger *slog.Logger) *Prometheus {
return &Prometheus{
maxScrapeDuration: timeout,
collectors: cs,
metricCollectors: c,
logger: logger,
scrapeDurationDesc: prometheus.NewDesc(
prometheus.BuildFQName(types.Namespace, "exporter", "scrape_duration_seconds"),
"windows_exporter: Total scrape duration.",
nil,
nil,
),
collectorScrapeDurationDesc: prometheus.NewDesc(
prometheus.BuildFQName(types.Namespace, "exporter", "collector_duration_seconds"),
"windows_exporter: Duration of a collection.",
[]string{"collector"},
nil,
),
scrapeSuccessDesc: prometheus.NewDesc(
collectorScrapeSuccessDesc: prometheus.NewDesc(
prometheus.BuildFQName(types.Namespace, "exporter", "collector_success"),
"windows_exporter: Whether the collector was successful.",
[]string{"collector"},
nil,
),
scrapeTimeoutDesc: prometheus.NewDesc(
collectorScrapeTimeoutDesc: prometheus.NewDesc(
prometheus.BuildFQName(types.Namespace, "exporter", "collector_timeout"),
"windows_exporter: Whether the collector timed out.",
[]string{"collector"},
@@ -60,133 +83,159 @@ func NewPrometheus(timeout time.Duration, cs *Collectors, logger log.Logger) *Pr
}
}
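For context, the constructor is now a method on MetricCollectors rather than a package-level function. A minimal registration sketch, assuming the MetricCollectors value is built elsewhere by the exporter (the wiring, listen address, and 10s budget below are illustrative, not part of this change):

package main

import (
	"log/slog"
	"net/http"
	"os"
	"time"

	"github.com/prometheus-community/windows_exporter/pkg/collector"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	// Assumed to be populated from the enabled collectors elsewhere in
	// the exporter; the zero value is only a placeholder for this sketch.
	var collectors collector.MetricCollectors

	// Wrap the collectors in the prometheus.Collector adapter with a
	// 10s scrape budget and register it on a dedicated registry.
	registry := prometheus.NewRegistry()
	registry.MustRegister(collectors.NewPrometheusCollector(10*time.Second, logger))

	http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))
	_ = http.ListenAndServe(":9182", nil)
}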
// Describe sends all the descriptors of the Collectors included to
// the provided channel.
func (coll *Prometheus) Describe(ch chan<- *prometheus.Desc) {
ch <- coll.scrapeDurationDesc
ch <- coll.scrapeSuccessDesc
}
func (p *Prometheus) Describe(_ chan<- *prometheus.Desc) {}
type collectorOutcome int
const (
pending collectorOutcome = iota
success
failed
)
// Collect sends the collected metrics from each of the Collectors to
// Collect sends the collected metrics from each of the MetricCollectors to
// prometheus.
func (coll *Prometheus) Collect(ch chan<- prometheus.Metric) {
func (p *Prometheus) Collect(ch chan<- prometheus.Metric) {
t := time.Now()
scrapeContext, err := coll.collectors.PrepareScrapeContext()
// Scrape Performance Counters for all collectors
scrapeContext, err := p.metricCollectors.PrepareScrapeContext()
ch <- prometheus.MustNewConstMetric(
coll.snapshotDuration,
p.snapshotDuration,
prometheus.GaugeValue,
time.Since(t).Seconds(),
)
if err != nil {
ch <- prometheus.NewInvalidMetric(coll.scrapeSuccessDesc, fmt.Errorf("failed to prepare scrape: %w", err))
ch <- prometheus.NewInvalidMetric(p.collectorScrapeSuccessDesc, fmt.Errorf("failed to prepare scrape: %w", err))
return
}
// WaitGroup to wait for all collectors to finish
wg := sync.WaitGroup{}
wg.Add(len(coll.collectors.collectors))
collectorOutcomes := make(map[string]collectorOutcome)
for name := range coll.collectors.collectors {
collectorOutcomes[name] = pending
}
wg.Add(len(p.metricCollectors.Collectors))
metricsBuffer := make(chan prometheus.Metric)
l := sync.Mutex{}
finished := false
go func() {
for m := range metricsBuffer {
l.Lock()
if !finished {
ch <- m
}
l.Unlock()
}
}()
// Using a channel to collect the status of each collector
// A channel is safe to use concurrently while a map is not
collectorStatusCh := make(chan collectorStatus, len(p.metricCollectors.Collectors))
for name, c := range coll.collectors.collectors {
go func(name string, c Collector) {
// Execute all collectors concurrently
// timeout handling is done in the execute function
for name, metricsCollector := range p.metricCollectors.Collectors {
go func(name string, metricsCollector Collector) {
defer wg.Done()
outcome := coll.execute(coll.logger, name, c, scrapeContext, metricsBuffer)
l.Lock()
if !finished {
collectorOutcomes[name] = outcome
collectorStatusCh <- collectorStatus{
name: name,
statusCode: p.execute(name, metricsCollector, scrapeContext, ch),
}
l.Unlock()
}(name, c)
}(name, metricsCollector)
}
allDone := make(chan struct{})
go func() {
wg.Wait()
close(allDone)
close(metricsBuffer)
}()
// Wait for all collectors to finish
wg.Wait()
// Wait until either all Collectors finish, or timeout expires
select {
case <-allDone:
case <-time.After(coll.maxScrapeDuration):
}
// Close the channel since we are done writing to it
close(collectorStatusCh)
l.Lock()
finished = true
remainingCollectorNames := make([]string, 0)
for name, outcome := range collectorOutcomes {
for status := range collectorStatusCh {
var successValue, timeoutValue float64
if outcome == pending {
if status.statusCode == pending {
timeoutValue = 1.0
remainingCollectorNames = append(remainingCollectorNames, name)
}
if outcome == success {
if status.statusCode == success {
successValue = 1.0
}
ch <- prometheus.MustNewConstMetric(
coll.scrapeSuccessDesc,
p.collectorScrapeSuccessDesc,
prometheus.GaugeValue,
successValue,
name,
status.name,
)
ch <- prometheus.MustNewConstMetric(
coll.scrapeTimeoutDesc,
p.collectorScrapeTimeoutDesc,
prometheus.GaugeValue,
timeoutValue,
name,
status.name,
)
}
if len(remainingCollectorNames) > 0 {
_ = level.Warn(coll.logger).Log("msg", fmt.Sprintf("Collection timed out, still waiting for %v", remainingCollectorNames))
}
l.Unlock()
ch <- prometheus.MustNewConstMetric(
p.scrapeDurationDesc,
prometheus.GaugeValue,
time.Since(t).Seconds(),
)
}
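The Collect fan-out above hinges on a status channel buffered to the number of collectors, so every goroutine can report its outcome without blocking and without sharing a map across goroutines. A stripped-down sketch of that pattern (the job names and result type are illustrative only, not exporter API):

package main

import (
	"fmt"
	"sync"
)

type result struct {
	name string
	ok   bool
}

func main() {
	// Illustrative stand-ins for the per-collector scrape functions.
	jobs := map[string]func() bool{
		"cpu":  func() bool { return true },
		"disk": func() bool { return false },
	}

	var wg sync.WaitGroup
	wg.Add(len(jobs))

	// Buffered to len(jobs): every worker can send its result without
	// blocking, even though nothing reads until all workers are done.
	results := make(chan result, len(jobs))

	for name, job := range jobs {
		go func(name string, job func() bool) {
			defer wg.Done()
			results <- result{name: name, ok: job()}
		}(name, job)
	}

	wg.Wait()
	close(results) // all senders are done, safe to close

	for r := range results {
		fmt.Printf("collector %s ok=%v\n", r.name, r.ok)
	}
}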
func (coll *Prometheus) execute(logger log.Logger, name string, c Collector, ctx *types.ScrapeContext, ch chan<- prometheus.Metric) collectorOutcome {
t := time.Now()
err := c.Collect(ctx, logger, ch)
duration := time.Since(t).Seconds()
ch <- prometheus.MustNewConstMetric(
coll.scrapeDurationDesc,
prometheus.GaugeValue,
duration,
name,
func (p *Prometheus) execute(name string, c Collector, ctx *types.ScrapeContext, ch chan<- prometheus.Metric) collectorStatusCode {
var (
err error
duration time.Duration
timeout atomic.Bool
)
// bufCh is a buffered channel to store the metrics.
// This is needed because once the timeout is reached, the prometheus registry channel is closed.
bufCh := make(chan prometheus.Metric, 10)
errCh := make(chan error, 1)
// Execute the collector
go func() {
errCh <- c.Collect(ctx, p.logger, bufCh)
close(bufCh)
}()
go func() {
defer func() {
// This prevents a panic from a race condition when the ch channel is closed too early.
_ = recover()
}()
// Pass metrics to the prometheus registry.
// If the timeout is reached, the channel is closed,
// and writing to it would cause a panic.
for m := range bufCh {
if !timeout.Load() {
ch <- m
}
}
}()
t := time.Now()
// Wait for the collector to finish or time out
select {
case err = <-errCh:
duration = time.Since(t)
ch <- prometheus.MustNewConstMetric(
p.collectorScrapeDurationDesc,
prometheus.GaugeValue,
duration.Seconds(),
name,
)
case <-time.After(p.maxScrapeDuration):
timeout.Store(true)
duration = time.Since(t)
ch <- prometheus.MustNewConstMetric(
p.collectorScrapeDurationDesc,
prometheus.GaugeValue,
duration.Seconds(),
name,
)
p.logger.Warn(fmt.Sprintf("collector %s timed out after %s", name, p.maxScrapeDuration))
return pending
}
if err != nil {
_ = level.Error(coll.logger).Log("msg", fmt.Sprintf("collector %s failed after %fs", name, duration), "err", err)
p.logger.Error(fmt.Sprintf("collector %s failed after %s", name, p.maxScrapeDuration),
slog.Any("err", err),
)
return failed
}
_ = level.Debug(coll.logger).Log("msg", fmt.Sprintf("collector %s succeeded after %fs.", name, duration))
p.logger.Error(fmt.Sprintf("collector %s succeeded after %s", name, p.maxScrapeDuration))
return success
}
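execute wraps each collector in a buffer-and-forward pattern: the collector writes into its own buffered channel, a forwarder copies values to the registry channel only while the timeout flag is unset, and the caller selects between completion and the deadline. A self-contained sketch of that pattern, with illustrative names (runWithTimeout is not part of the exporter):

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// runWithTimeout mirrors the shape of execute above: work writes into its
// own buffered channel, a forwarder copies values to out only while the
// timeout flag is unset, and the caller selects between completion and the
// deadline. All names here are illustrative, not exporter API.
func runWithTimeout(out chan<- int, work func(chan<- int) error, timeout time.Duration) error {
	var timedOut atomic.Bool

	buf := make(chan int, 10)
	errCh := make(chan error, 1)

	go func() {
		errCh <- work(buf)
		close(buf)
	}()

	go func() {
		// Same guard as the recover above: if the consumer closes out
		// before the buffer is drained, swallow the panic instead of
		// crashing the scrape.
		defer func() { _ = recover() }()

		for v := range buf {
			if !timedOut.Load() {
				out <- v
			}
		}
	}()

	select {
	case err := <-errCh:
		return err
	case <-time.After(timeout):
		// Late values from work are dropped from now on.
		timedOut.Store(true)
		return errors.New("timed out")
	}
}

func main() {
	out := make(chan int, 1)
	err := runWithTimeout(out, func(ch chan<- int) error {
		ch <- 42
		return nil
	}, time.Second)
	fmt.Println(err, <-out) // <nil> 42
}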