diff --git a/exporter.go b/exporter.go index 8197865a..9a140474 100644 --- a/exporter.go +++ b/exporter.go @@ -23,7 +23,8 @@ import ( // WmiCollector implements the prometheus.Collector interface. type WmiCollector struct { - collectors map[string]collector.Collector + maxScrapeDuration time.Duration + collectors map[string]collector.Collector } const ( @@ -45,6 +46,12 @@ var ( []string{"collector"}, nil, ) + scrapeTimeoutDesc = prometheus.NewDesc( + prometheus.BuildFQName(collector.Namespace, "exporter", "collector_timeout"), + "wmi_exporter: Whether the collector timed out.", + []string{"collector"}, + nil, + ) // This can be removed when client_golang exposes this on Windows // (See https://github.com/prometheus/client_golang/issues/376) @@ -65,8 +72,7 @@ func (coll WmiCollector) Describe(ch chan<- *prometheus.Desc) { } // Collect sends the collected metrics from each of the collectors to -// prometheus. Collect could be called several times concurrently -// and thus its run is protected by a single mutex. +// prometheus. func (coll WmiCollector) Collect(ch chan<- prometheus.Metric) { scrapeContext, err := collector.PrepareScrapeContext() if err != nil { @@ -74,12 +80,40 @@ func (coll WmiCollector) Collect(ch chan<- prometheus.Metric) { return } + remainingCollectors := make(map[string]bool) + for name := range coll.collectors { + remainingCollectors[name] = true + } + + metricsBuffer := make(chan prometheus.Metric) + allDone := make(chan struct{}) + stopped := false + go func() { + for { + select { + case m := <-metricsBuffer: + if !stopped { + ch <- m + } + case <-allDone: + return + } + } + }() + wg := sync.WaitGroup{} wg.Add(len(coll.collectors)) + go func() { + wg.Wait() + close(allDone) + close(metricsBuffer) + }() + for name, c := range coll.collectors { go func(name string, c collector.Collector) { - execute(name, c, scrapeContext, ch) + execute(name, c, scrapeContext, metricsBuffer) wg.Done() + delete(remainingCollectors, name) }(name, c) } @@ -88,7 +122,33 @@ func (coll WmiCollector) Collect(ch chan<- prometheus.Metric) { prometheus.CounterValue, startTime, ) - wg.Wait() + + select { + case <-allDone: + stopped = true + return + case <-time.After(coll.maxScrapeDuration): + stopped = true + remainingCollectorNames := make([]string, 0, len(remainingCollectors)) + for rc := range remainingCollectors { + remainingCollectorNames = append(remainingCollectorNames, rc) + } + log.Warn("Collection timed out, still waiting for ", remainingCollectorNames) + for name := range remainingCollectors { + ch <- prometheus.MustNewConstMetric( + scrapeSuccessDesc, + prometheus.GaugeValue, + 0.0, + name, + ) + ch <- prometheus.MustNewConstMetric( + scrapeTimeoutDesc, + prometheus.GaugeValue, + 1.0, + name, + ) + } + } } func filterAvailableCollectors(collectors string) string { @@ -127,6 +187,12 @@ func execute(name string, c collector.Collector, ctx *collector.ScrapeContext, c success, name, ) + ch <- prometheus.MustNewConstMetric( + scrapeTimeoutDesc, + prometheus.GaugeValue, + 0.0, + name, + ) } func expandEnabledCollectors(enabled string) []string { @@ -198,6 +264,10 @@ func main() { "collectors.print", "If true, print available collectors and exit.", ).Bool() + maxScrapeDuration = kingpin.Flag( + "scrape.max-duration", + "Time after which collectors are aborted during a scrape", + ).Default("30s").Duration() ) log.AddFlags(kingpin.CommandLine) @@ -242,8 +312,11 @@ func main() { log.Infof("Enabled collectors: %v", strings.Join(keys(collectors), ", ")) - nodeCollector := WmiCollector{collectors: collectors} - prometheus.MustRegister(nodeCollector) + exporter := WmiCollector{ + collectors: collectors, + maxScrapeDuration: *maxScrapeDuration, + } + prometheus.MustRegister(exporter) http.Handle(*metricsPath, promhttp.Handler()) http.HandleFunc("/health", healthCheck)