From 3c208874333119e19957cbd7b0e86b9606554b02 Mon Sep 17 00:00:00 2001 From: Calle Pettersson Date: Sat, 3 Aug 2019 19:29:01 +0200 Subject: [PATCH] Refactor timeout management --- exporter.go | 156 +++++++++++++++++++++++++++------------------------- 1 file changed, 82 insertions(+), 74 deletions(-) diff --git a/exporter.go b/exporter.go index fa88aa6f..67a54a02 100644 --- a/exporter.go +++ b/exporter.go @@ -79,9 +79,23 @@ func (coll WmiCollector) Describe(ch chan<- *prometheus.Desc) { ch <- scrapeSuccessDesc } +type collectorOutcome int + +const ( + pending collectorOutcome = iota + success + failed +) + // Collect sends the collected metrics from each of the collectors to // prometheus. func (coll WmiCollector) Collect(ch chan<- prometheus.Metric) { + ch <- prometheus.MustNewConstMetric( + startTimeDesc, + prometheus.CounterValue, + startTime, + ) + t := time.Now() scrapeContext, err := collector.PrepareScrapeContext() ch <- prometheus.MustNewConstMetric( @@ -94,75 +108,83 @@ func (coll WmiCollector) Collect(ch chan<- prometheus.Metric) { return } - remainingCollectors := make(map[string]bool) + wg := sync.WaitGroup{} + wg.Add(len(coll.collectors)) + collectorOutcomes := make(map[string]collectorOutcome) for name := range coll.collectors { - remainingCollectors[name] = true + collectorOutcomes[name] = pending } metricsBuffer := make(chan prometheus.Metric) - allDone := make(chan struct{}) - stopped := false + l := sync.Mutex{} + finished := false go func() { - for { - select { - case m, ok := <-metricsBuffer: - if ok && !stopped { - ch <- m - } - case <-allDone: - return + for m := range metricsBuffer { + l.Lock() + if !finished { + ch <- m } + l.Unlock() } }() - wg := sync.WaitGroup{} - wg.Add(len(coll.collectors)) - go func() { - wg.Wait() - close(allDone) - close(metricsBuffer) - }() - for name, c := range coll.collectors { go func(name string, c collector.Collector) { defer wg.Done() - execute(name, c, scrapeContext, metricsBuffer) - delete(remainingCollectors, name) + outcome := execute(name, c, scrapeContext, metricsBuffer) + l.Lock() + if !finished { + collectorOutcomes[name] = outcome + } + l.Unlock() }(name, c) } - ch <- prometheus.MustNewConstMetric( - startTimeDesc, - prometheus.CounterValue, - startTime, - ) + allDone := make(chan struct{}) + go func() { + wg.Wait() + close(allDone) + }() + // Wait until either all collectors finish, or timeout expires select { case <-allDone: - stopped = true - return case <-time.After(coll.maxScrapeDuration): - stopped = true - remainingCollectorNames := make([]string, 0, len(remainingCollectors)) - for rc := range remainingCollectors { - remainingCollectorNames = append(remainingCollectorNames, rc) - } - log.Warn("Collection timed out, still waiting for ", remainingCollectorNames) - for name := range remainingCollectors { - ch <- prometheus.MustNewConstMetric( - scrapeSuccessDesc, - prometheus.GaugeValue, - 0.0, - name, - ) - ch <- prometheus.MustNewConstMetric( - scrapeTimeoutDesc, - prometheus.GaugeValue, - 1.0, - name, - ) - } } + + l.Lock() + finished = true + + remainingCollectorNames := make([]string, 0) + for name, outcome := range collectorOutcomes { + var successValue, timeoutValue float64 + if outcome == pending { + timeoutValue = 1.0 + remainingCollectorNames = append(remainingCollectorNames, name) + } + if outcome == success { + successValue = 1.0 + } + + ch <- prometheus.MustNewConstMetric( + scrapeSuccessDesc, + prometheus.GaugeValue, + successValue, + name, + ) + ch <- prometheus.MustNewConstMetric( + scrapeTimeoutDesc, + prometheus.GaugeValue, + timeoutValue, + name, + ) + } + + if len(remainingCollectorNames) > 0 { + log.Warn("Collection timed out, still waiting for ", remainingCollectorNames) + } + + l.Unlock() } func filterAvailableCollectors(collectors string) string { @@ -176,37 +198,23 @@ func filterAvailableCollectors(collectors string) string { return strings.Join(availableCollectors, ",") } -func execute(name string, c collector.Collector, ctx *collector.ScrapeContext, ch chan<- prometheus.Metric) { - begin := time.Now() +func execute(name string, c collector.Collector, ctx *collector.ScrapeContext, ch chan<- prometheus.Metric) collectorOutcome { + t := time.Now() err := c.Collect(ctx, ch) - duration := time.Since(begin) - var success float64 - - if err != nil { - log.Errorf("collector %s failed after %fs: %s", name, duration.Seconds(), err) - success = 0 - } else { - log.Debugf("collector %s succeeded after %fs.", name, duration.Seconds()) - success = 1 - } + duration := time.Since(t).Seconds() ch <- prometheus.MustNewConstMetric( scrapeDurationDesc, prometheus.GaugeValue, - duration.Seconds(), - name, - ) - ch <- prometheus.MustNewConstMetric( - scrapeSuccessDesc, - prometheus.GaugeValue, - success, - name, - ) - ch <- prometheus.MustNewConstMetric( - scrapeTimeoutDesc, - prometheus.GaugeValue, - 0.0, + duration, name, ) + + if err != nil { + log.Errorf("collector %s failed after %fs: %s", name, duration, err) + return failed + } + log.Debugf("collector %s succeeded after %fs.", name, duration) + return success } func expandEnabledCollectors(enabled string) []string {