collector: fix flapping metrics if process is enabled. (#1643)

Jan-Otto Kröpke
2024-09-27 11:09:17 +02:00
committed by GitHub
parent 2d334e4df0
commit 65d19f433e
4 changed files with 64 additions and 25 deletions
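The core of the fix lives in the shared collector executor: metrics now flow through a larger buffered channel, the scrape deadline is carried by a context instead of a bare time.After, and a WaitGroup makes the executor wait until every buffered metric has been forwarded to the registry before the scrape-duration metric is emitted. Below is a minimal, self-contained sketch of that pattern; collect, maxScrapeDuration, and the string-typed "metrics" are illustrative stand-ins, not the exporter's actual types or API.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// collect stands in for a collector: it writes metrics (modeled as strings)
// into buf and closes the channel when it is done.
func collect(buf chan<- string) {
	for i := 0; i < 5; i++ {
		time.Sleep(10 * time.Millisecond)
		buf <- fmt.Sprintf("metric_%d", i)
	}
	close(buf)
}

func main() {
	const maxScrapeDuration = 200 * time.Millisecond

	buf := make(chan string, 1000) // buffer between collector and registry
	out := make(chan string, 1000) // stands in for the registry channel
	done := make(chan error, 1)    // stands in for errCh

	ctx, cancel := context.WithTimeout(context.Background(), maxScrapeDuration)
	defer cancel()

	// Run the collector in the background.
	go func() {
		collect(buf)
		done <- nil
	}()

	// Forward buffered metrics until the buffer is drained or the deadline hits.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			select {
			case <-ctx.Done():
				return // deadline reached: stop forwarding
			case m, ok := <-buf:
				if !ok {
					return // collector finished and the buffer is drained
				}
				out <- m
			}
		}
	}()

	select {
	case <-done:
		// Waiting here is the point of the patch: success is only reported
		// once every buffered metric has actually been forwarded.
		wg.Wait()
		fmt.Printf("collector succeeded, forwarded %d metrics\n", len(out))
	case <-ctx.Done():
		// On timeout the real patch additionally drains buf in a goroutine
		// so the collector goroutine cannot leak.
		fmt.Println("collector timed out")
	}
}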


@@ -97,6 +97,7 @@ func (c *Collector) Build(logger *slog.Logger, _ *wmi.Client) error {
 // to the provided prometheus Metric channel.
 func (c *Collector) Collect(_ *types.ScrapeContext, logger *slog.Logger, ch chan<- prometheus.Metric) error {
+	logger = logger.With(slog.String("collector", Name))
 	if err := c.collect(ch); err != nil {
 		logger.Error("failed collecting cs metrics",
 			slog.Any("err", err),
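The one-line addition above derives a child logger once per scrape, so every record emitted by the cs collector automatically carries a collector attribute. A small standalone illustration of that log/slog pattern (the handler setup here is illustrative):

package main

import (
	"log/slog"
	"os"
)

const Name = "cs"

func main() {
	logger := slog.New(slog.NewTextHandler(os.Stderr, nil))

	// Derive a child logger that adds collector=cs to every record.
	logger = logger.With(slog.String("collector", Name))

	// Both records now include the collector attribute without repeating it.
	logger.Error("failed collecting cs metrics", slog.Any("err", os.ErrNotExist))
	logger.Info("collection finished")
}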


@@ -3,6 +3,7 @@
 package collector
 import (
+	"context"
 	"fmt"
 	"log/slog"
 	"sync"
@@ -164,37 +165,65 @@ func (p *Prometheus) Collect(ch chan<- prometheus.Metric) {
 	)
 }
-func (p *Prometheus) execute(name string, c Collector, ctx *types.ScrapeContext, ch chan<- prometheus.Metric) collectorStatusCode {
+func (p *Prometheus) execute(name string, c Collector, scrapeCtx *types.ScrapeContext, ch chan<- prometheus.Metric) collectorStatusCode {
 	var (
-		err      error
-		duration time.Duration
-		timeout  atomic.Bool
+		err        error
+		numMetrics int
+		duration   time.Duration
+		timeout    atomic.Bool
 	)
 	// bufCh is a buffer channel to store the metrics
 	// This is needed because once timeout is reached, the prometheus registry channel is closed.
-	bufCh := make(chan prometheus.Metric, 10)
+	bufCh := make(chan prometheus.Metric, 1000)
 	errCh := make(chan error, 1)
+	ctx, cancel := context.WithTimeout(context.Background(), p.maxScrapeDuration)
+	defer cancel()
 	// Execute the collector
 	go func() {
-		errCh <- c.Collect(ctx, p.logger, bufCh)
+		defer func() {
+			if r := recover(); r != nil {
+				p.logger.Error("panic in collector "+name,
+					slog.Any("panic", r),
+				)
+			}
+		}()
+		errCh <- c.Collect(scrapeCtx, p.logger, bufCh)
 		close(bufCh)
 	}()
+	wg := sync.WaitGroup{}
+	wg.Add(1)
 	go func() {
 		defer func() {
 			// This prevents a panic from race-condition when closing the ch channel too early.
 			_ = recover()
+			wg.Done()
 		}()
 		// Pass metrics to the prometheus registry
 		// If timeout is reached, the channel is closed.
 		// This will cause a panic if we try to write to it.
-		for m := range bufCh {
-			if !timeout.Load() {
-				ch <- m
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case m, ok := <-bufCh:
+				if !ok {
+					return
+				}
+				if !timeout.Load() {
+					ch <- m
+					numMetrics++
+				}
 			}
 		}
 	}()
@@ -204,6 +233,8 @@ func (p *Prometheus) execute(name string, c Collector, ctx *types.ScrapeContext,
 	// Wait for the collector to finish or timeout
 	select {
 	case err = <-errCh:
+		wg.Wait() // Wait for the buffer channel to be closed and empty
 		duration = time.Since(t)
 		ch <- prometheus.MustNewConstMetric(
 			p.collectorScrapeDurationDesc,
@@ -211,7 +242,7 @@ func (p *Prometheus) execute(name string, c Collector, ctx *types.ScrapeContext,
 			duration.Seconds(),
 			name,
 		)
-	case <-time.After(p.maxScrapeDuration):
+	case <-ctx.Done():
 		timeout.Store(true)
 		duration = time.Since(t)
@@ -222,20 +253,27 @@ func (p *Prometheus) execute(name string, c Collector, ctx *types.ScrapeContext,
 			name,
 		)
-		p.logger.Warn(fmt.Sprintf("collector %s timeouted after %s", name, p.maxScrapeDuration))
+		p.logger.Warn(fmt.Sprintf("collector %s timeouted after %s, resulting in %d metrics", name, p.maxScrapeDuration, numMetrics))
+		go func() {
+			// Drain channel in case of premature return to not leak a goroutine.
+			//nolint:revive
+			for range bufCh {
+			}
+		}()
 		return pending
 	}
 	if err != nil {
-		p.logger.Error(fmt.Sprintf("collector %s failed after %s", name, duration),
+		p.logger.Error(fmt.Sprintf("collector %s failed after %s, resulting in %d metrics", name, duration, numMetrics),
 			slog.Any("err", err),
 		)
 		return failed
 	}
-	p.logger.Info(fmt.Sprintf("collector %s succeeded after %s", name, duration))
+	p.logger.Info(fmt.Sprintf("collector %s succeeded after %s, resulting in %d metrics", name, duration, numMetrics))
 	return success
 }
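One detail worth calling out in the timeout branch: execute returns pending right away, but the collector goroutine may still be blocked writing into bufCh, so the added goroutine keeps draining (and discarding) the channel until the collector closes it. A minimal sketch of why that drain matters, with a deliberately tiny buffer and illustrative names:

package main

import (
	"fmt"
	"time"
)

func main() {
	bufCh := make(chan int, 1) // small buffer so the producer actually blocks

	// Producer: stands in for a slow collector that keeps emitting metrics.
	go func() {
		for i := 0; i < 10; i++ {
			bufCh <- i // would block forever if nobody read after a timeout
		}
		close(bufCh)
		fmt.Println("producer exited cleanly")
	}()

	// Pretend the scrape timed out after receiving only one value.
	<-bufCh
	fmt.Println("timeout: giving up on this scrape")

	// Drain the channel in the background so the producer can finish and exit.
	go func() {
		for range bufCh {
		}
	}()

	time.Sleep(100 * time.Millisecond) // give the producer time to finish (demo only)
}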