Abort scrapes after configurable timeout

This commit is contained in:
Calle Pettersson
2019-05-15 21:22:29 +02:00
parent 33879449a2
commit 85fdfb44b8

View File

@@ -23,7 +23,8 @@ import (
// WmiCollector implements the prometheus.Collector interface.
type WmiCollector struct {
collectors map[string]collector.Collector
maxScrapeDuration time.Duration
collectors map[string]collector.Collector
}
const (
@@ -45,6 +46,12 @@ var (
[]string{"collector"},
nil,
)
scrapeTimeoutDesc = prometheus.NewDesc(
prometheus.BuildFQName(collector.Namespace, "exporter", "collector_timeout"),
"wmi_exporter: Whether the collector timed out.",
[]string{"collector"},
nil,
)
// This can be removed when client_golang exposes this on Windows
// (See https://github.com/prometheus/client_golang/issues/376)
@@ -65,8 +72,7 @@ func (coll WmiCollector) Describe(ch chan<- *prometheus.Desc) {
}
// Collect sends the collected metrics from each of the collectors to
// prometheus. Collect could be called several times concurrently
// and thus its run is protected by a single mutex.
// prometheus.
func (coll WmiCollector) Collect(ch chan<- prometheus.Metric) {
scrapeContext, err := collector.PrepareScrapeContext()
if err != nil {
@@ -74,12 +80,40 @@ func (coll WmiCollector) Collect(ch chan<- prometheus.Metric) {
return
}
remainingCollectors := make(map[string]bool)
for name := range coll.collectors {
remainingCollectors[name] = true
}
metricsBuffer := make(chan prometheus.Metric)
allDone := make(chan struct{})
stopped := false
go func() {
for {
select {
case m := <-metricsBuffer:
if !stopped {
ch <- m
}
case <-allDone:
return
}
}
}()
wg := sync.WaitGroup{}
wg.Add(len(coll.collectors))
go func() {
wg.Wait()
close(allDone)
close(metricsBuffer)
}()
for name, c := range coll.collectors {
go func(name string, c collector.Collector) {
execute(name, c, scrapeContext, ch)
execute(name, c, scrapeContext, metricsBuffer)
wg.Done()
delete(remainingCollectors, name)
}(name, c)
}
@@ -88,7 +122,33 @@ func (coll WmiCollector) Collect(ch chan<- prometheus.Metric) {
prometheus.CounterValue,
startTime,
)
wg.Wait()
select {
case <-allDone:
stopped = true
return
case <-time.After(coll.maxScrapeDuration):
stopped = true
remainingCollectorNames := make([]string, 0, len(remainingCollectors))
for rc := range remainingCollectors {
remainingCollectorNames = append(remainingCollectorNames, rc)
}
log.Warn("Collection timed out, still waiting for ", remainingCollectorNames)
for name := range remainingCollectors {
ch <- prometheus.MustNewConstMetric(
scrapeSuccessDesc,
prometheus.GaugeValue,
0.0,
name,
)
ch <- prometheus.MustNewConstMetric(
scrapeTimeoutDesc,
prometheus.GaugeValue,
1.0,
name,
)
}
}
}
func filterAvailableCollectors(collectors string) string {
@@ -127,6 +187,12 @@ func execute(name string, c collector.Collector, ctx *collector.ScrapeContext, c
success,
name,
)
ch <- prometheus.MustNewConstMetric(
scrapeTimeoutDesc,
prometheus.GaugeValue,
0.0,
name,
)
}
func expandEnabledCollectors(enabled string) []string {
@@ -198,6 +264,10 @@ func main() {
"collectors.print",
"If true, print available collectors and exit.",
).Bool()
maxScrapeDuration = kingpin.Flag(
"scrape.max-duration",
"Time after which collectors are aborted during a scrape",
).Default("30s").Duration()
)
log.AddFlags(kingpin.CommandLine)
@@ -242,8 +312,11 @@ func main() {
log.Infof("Enabled collectors: %v", strings.Join(keys(collectors), ", "))
nodeCollector := WmiCollector{collectors: collectors}
prometheus.MustRegister(nodeCollector)
exporter := WmiCollector{
collectors: collectors,
maxScrapeDuration: *maxScrapeDuration,
}
prometheus.MustRegister(exporter)
http.Handle(*metricsPath, promhttp.Handler())
http.HandleFunc("/health", healthCheck)