Refactor timeout management

This commit is contained in:
Calle Pettersson
2019-08-03 19:29:01 +02:00
parent 1a9d4afdd6
commit 3c20887433

View File

@@ -79,9 +79,23 @@ func (coll WmiCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- scrapeSuccessDesc ch <- scrapeSuccessDesc
} }
type collectorOutcome int
const (
pending collectorOutcome = iota
success
failed
)
// Collect sends the collected metrics from each of the collectors to // Collect sends the collected metrics from each of the collectors to
// prometheus. // prometheus.
func (coll WmiCollector) Collect(ch chan<- prometheus.Metric) { func (coll WmiCollector) Collect(ch chan<- prometheus.Metric) {
ch <- prometheus.MustNewConstMetric(
startTimeDesc,
prometheus.CounterValue,
startTime,
)
t := time.Now() t := time.Now()
scrapeContext, err := collector.PrepareScrapeContext() scrapeContext, err := collector.PrepareScrapeContext()
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
@@ -94,75 +108,83 @@ func (coll WmiCollector) Collect(ch chan<- prometheus.Metric) {
return return
} }
remainingCollectors := make(map[string]bool) wg := sync.WaitGroup{}
wg.Add(len(coll.collectors))
collectorOutcomes := make(map[string]collectorOutcome)
for name := range coll.collectors { for name := range coll.collectors {
remainingCollectors[name] = true collectorOutcomes[name] = pending
} }
metricsBuffer := make(chan prometheus.Metric) metricsBuffer := make(chan prometheus.Metric)
allDone := make(chan struct{}) l := sync.Mutex{}
stopped := false finished := false
go func() { go func() {
for { for m := range metricsBuffer {
select { l.Lock()
case m, ok := <-metricsBuffer: if !finished {
if ok && !stopped { ch <- m
ch <- m
}
case <-allDone:
return
} }
l.Unlock()
} }
}() }()
wg := sync.WaitGroup{}
wg.Add(len(coll.collectors))
go func() {
wg.Wait()
close(allDone)
close(metricsBuffer)
}()
for name, c := range coll.collectors { for name, c := range coll.collectors {
go func(name string, c collector.Collector) { go func(name string, c collector.Collector) {
defer wg.Done() defer wg.Done()
execute(name, c, scrapeContext, metricsBuffer) outcome := execute(name, c, scrapeContext, metricsBuffer)
delete(remainingCollectors, name) l.Lock()
if !finished {
collectorOutcomes[name] = outcome
}
l.Unlock()
}(name, c) }(name, c)
} }
ch <- prometheus.MustNewConstMetric( allDone := make(chan struct{})
startTimeDesc, go func() {
prometheus.CounterValue, wg.Wait()
startTime, close(allDone)
) }()
// Wait until either all collectors finish, or timeout expires
select { select {
case <-allDone: case <-allDone:
stopped = true
return
case <-time.After(coll.maxScrapeDuration): case <-time.After(coll.maxScrapeDuration):
stopped = true
remainingCollectorNames := make([]string, 0, len(remainingCollectors))
for rc := range remainingCollectors {
remainingCollectorNames = append(remainingCollectorNames, rc)
}
log.Warn("Collection timed out, still waiting for ", remainingCollectorNames)
for name := range remainingCollectors {
ch <- prometheus.MustNewConstMetric(
scrapeSuccessDesc,
prometheus.GaugeValue,
0.0,
name,
)
ch <- prometheus.MustNewConstMetric(
scrapeTimeoutDesc,
prometheus.GaugeValue,
1.0,
name,
)
}
} }
l.Lock()
finished = true
remainingCollectorNames := make([]string, 0)
for name, outcome := range collectorOutcomes {
var successValue, timeoutValue float64
if outcome == pending {
timeoutValue = 1.0
remainingCollectorNames = append(remainingCollectorNames, name)
}
if outcome == success {
successValue = 1.0
}
ch <- prometheus.MustNewConstMetric(
scrapeSuccessDesc,
prometheus.GaugeValue,
successValue,
name,
)
ch <- prometheus.MustNewConstMetric(
scrapeTimeoutDesc,
prometheus.GaugeValue,
timeoutValue,
name,
)
}
if len(remainingCollectorNames) > 0 {
log.Warn("Collection timed out, still waiting for ", remainingCollectorNames)
}
l.Unlock()
} }
func filterAvailableCollectors(collectors string) string { func filterAvailableCollectors(collectors string) string {
@@ -176,37 +198,23 @@ func filterAvailableCollectors(collectors string) string {
return strings.Join(availableCollectors, ",") return strings.Join(availableCollectors, ",")
} }
func execute(name string, c collector.Collector, ctx *collector.ScrapeContext, ch chan<- prometheus.Metric) { func execute(name string, c collector.Collector, ctx *collector.ScrapeContext, ch chan<- prometheus.Metric) collectorOutcome {
begin := time.Now() t := time.Now()
err := c.Collect(ctx, ch) err := c.Collect(ctx, ch)
duration := time.Since(begin) duration := time.Since(t).Seconds()
var success float64
if err != nil {
log.Errorf("collector %s failed after %fs: %s", name, duration.Seconds(), err)
success = 0
} else {
log.Debugf("collector %s succeeded after %fs.", name, duration.Seconds())
success = 1
}
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
scrapeDurationDesc, scrapeDurationDesc,
prometheus.GaugeValue, prometheus.GaugeValue,
duration.Seconds(), duration,
name,
)
ch <- prometheus.MustNewConstMetric(
scrapeSuccessDesc,
prometheus.GaugeValue,
success,
name,
)
ch <- prometheus.MustNewConstMetric(
scrapeTimeoutDesc,
prometheus.GaugeValue,
0.0,
name, name,
) )
if err != nil {
log.Errorf("collector %s failed after %fs: %s", name, duration, err)
return failed
}
log.Debugf("collector %s succeeded after %fs.", name, duration)
return success
} }
func expandEnabledCollectors(enabled string) []string { func expandEnabledCollectors(enabled string) []string {