From f5b9ba35d020607f3d685dbf98f622dbb6e7d270 Mon Sep 17 00:00:00 2001 From: Dinifarb Date: Sat, 29 Jul 2023 20:02:47 +0200 Subject: [PATCH] feat: (#1236) textfile: collect files from multiple path Signed-off-by: Dinifarb --- collector/textfile.go | 179 +++++++++++++++++++++++------------------- 1 file changed, 99 insertions(+), 80 deletions(-) diff --git a/collector/textfile.go b/collector/textfile.go index 38db2450..2d60f713 100644 --- a/collector/textfile.go +++ b/collector/textfile.go @@ -19,7 +19,6 @@ package collector import ( "fmt" "io" - "io/ioutil" "os" "path/filepath" "reflect" @@ -37,11 +36,13 @@ import ( ) const ( - FlagTextFileDirectory = "collector.textfile.directory" + FlagTextFileDirectory = "collector.textfile.directory" + FlagTextFileDirectories = "collector.textfile.directories" ) var ( - textFileDirectory *string + textFileDirectory *string + textFileDirectories *string mtimeDesc = prometheus.NewDesc( prometheus.BuildFQName(Namespace, "textfile", "mtime_seconds"), @@ -54,7 +55,7 @@ var ( type textFileCollector struct { logger log.Logger - path string + directories string // Only set for testing to get predictable output. mtime *float64 } @@ -63,17 +64,27 @@ type textFileCollector struct { func newTextFileCollectorFlags(app *kingpin.Application) { textFileDirectory = app.Flag( FlagTextFileDirectory, - "Directory to read text files with metrics from.", - ).Default(getDefaultPath()).String() + "Directory or Directories to read text files with metrics from.", + ).Default("").String() + textFileDirectories = app.Flag( + FlagTextFileDirectories, + "Directory or Directories to read text files with metrics from.", + ).Default("").String() } // newTextFileCollector returns a new Collector exposing metrics read from files // in the given textfile directory. func newTextFileCollector(logger log.Logger) (Collector, error) { const subsystem = "textfile" + directories := getDefaultPath() + if *textFileDirectory != "" || *textFileDirectories != "" { + directories = *textFileDirectory + "," + *textFileDirectories + directories = strings.Trim(directories, ",") + } + _ = level.Info(logger).Log("msg", fmt.Sprintf("textfile collector directories: %s", directories)) return &textFileCollector{ - logger: log.With(logger, "collector", subsystem), - path: *textFileDirectory, + logger: log.With(logger, "collector", subsystem), + directories: directories, }, nil } @@ -250,89 +261,56 @@ func (cr carriageReturnFilteringReader) Read(p []byte) (int, error) { // Update implements the Collector interface. func (c *textFileCollector) Collect(ctx *ScrapeContext, ch chan<- prometheus.Metric) error { - error := 0.0 - mtimes := map[string]time.Time{} - - // Iterate over files and accumulate their metrics. - files, err := ioutil.ReadDir(c.path) - if err != nil && c.path != "" { - _ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error reading textfile collector directory %q", c.path), "err", err) - error = 1.0 - } - + errorMetric := 0.0 + var mtimes = map[string]time.Time{} // Create empty metricFamily slice here and append parsedFamilies to it inside the loop. // Once loop is complete, raise error if any duplicates are present. // This will ensure that duplicate metrics are correctly detected between multiple .prom files. var metricFamilies = []*dto.MetricFamily{} -fileLoop: - for _, f := range files { - if !strings.HasSuffix(f.Name(), ".prom") { - continue - } - path := filepath.Join(c.path, f.Name()) - _ = level.Debug(c.logger).Log("msg", fmt.Sprintf("Processing file %q", path)) - file, err := os.Open(path) - if err != nil { - _ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error opening %q: %v", path, err)) - error = 1.0 - continue - } - var parser expfmt.TextParser - r, encoding := utfbom.Skip(carriageReturnFilteringReader{r: file}) - if err = checkBOM(encoding); err != nil { - _ = level.Error(c.logger).Log("msg", fmt.Sprintf("Invalid file encoding detected in %s: %s - file must be UTF8", path, err.Error())) - error = 1.0 - continue - } - parsedFamilies, err := parser.TextToMetricFamilies(r) - closeErr := file.Close() - if closeErr != nil { - _ = level.Warn(c.logger).Log("msg", fmt.Sprintf("Error closing file"), "err", err) - } - if err != nil { - _ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error parsing %q: %v", path, err)) - error = 1.0 - continue - } - // Use temporary array to check for duplicates - var families_array []*dto.MetricFamily - - for _, mf := range parsedFamilies { - families_array = append(families_array, mf) - for _, m := range mf.Metric { - if m.TimestampMs != nil { - _ = level.Error(c.logger).Log("msg", fmt.Sprintf("Textfile %q contains unsupported client-side timestamps, skipping entire file", path)) - error = 1.0 - continue fileLoop + // Iterate over files and accumulate their metrics. + for _, directory := range strings.Split(c.directories, ",") { + err := filepath.WalkDir(directory, func(path string, dirEntry os.DirEntry, err error) error { + if err != nil { + _ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error reading directory: %s", path), "err", err) + errorMetric = 1.0 + return nil + } + if !dirEntry.IsDir() && strings.HasSuffix(dirEntry.Name(), ".prom") { + _ = level.Debug(c.logger).Log("msg", fmt.Sprintf("Processing file: %s", path)) + families_array, err := scrapeFile(path, c.logger) + if err != nil { + _ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error screaping file: %q. Skip File.", path), "err", err) + errorMetric = 1.0 + return nil + } + fileInfo, err := os.Stat(path) + if err != nil { + _ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error reading file info: %q. Skip File.", path), "err", err) + errorMetric = 1.0 + return nil + } + if _, hasName := mtimes[fileInfo.Name()]; hasName { + _ = level.Error(c.logger).Log("msg", fmt.Sprintf("Duplicate filename detected: %q. Skip File.", path)) + errorMetric = 1.0 + return nil + } else { + mtimes[fileInfo.Name()] = fileInfo.ModTime() + metricFamilies = append(metricFamilies, families_array...) } } - if mf.Help == nil { - help := fmt.Sprintf("Metric read from %s", path) - mf.Help = &help - } - } - - // If duplicate metrics are detected in a *single* file, skip processing of file metrics - if duplicateMetricEntry(families_array) { - _ = level.Error(c.logger).Log("msg", fmt.Sprintf("Duplicate metrics detected in file %s. Skipping file processing.", f.Name())) - error = 1.0 - continue - } - - // Only set this once it has been parsed and validated, so that - // a failure does not appear fresh. - mtimes[f.Name()] = f.ModTime() - - for _, metricFamily := range parsedFamilies { - metricFamilies = append(metricFamilies, metricFamily) + return nil + }) + if err != nil && directory != "" { + _ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error reading textfile collector directory: %s", c.directories), "err", err) + errorMetric = 1.0 } } // If duplicates are detected across *multiple* files, return error. if duplicateMetricEntry(metricFamilies) { _ = level.Error(c.logger).Log("msg", "Duplicate metrics detected across multiple files") - error = 1.0 + errorMetric = 1.0 } else { for _, mf := range metricFamilies { c.convertMetricFamily(mf, ch) @@ -340,7 +318,6 @@ fileLoop: } c.exportMTimes(mtimes, ch) - // Export if there were errors. ch <- prometheus.MustNewConstMetric( prometheus.NewDesc( @@ -348,11 +325,53 @@ fileLoop: "1 if there was an error opening or reading a file, 0 otherwise", nil, nil, ), - prometheus.GaugeValue, error, + prometheus.GaugeValue, errorMetric, ) return nil } +func scrapeFile(path string, log log.Logger) ([]*dto.MetricFamily, error) { + file, err := os.Open(path) + if err != nil { + return nil, err + } + var parser expfmt.TextParser + r, encoding := utfbom.Skip(carriageReturnFilteringReader{r: file}) + if err = checkBOM(encoding); err != nil { + return nil, err + } + parsedFamilies, err := parser.TextToMetricFamilies(r) + closeErr := file.Close() + if closeErr != nil { + _ = level.Warn(log).Log("msg", fmt.Sprintf("Error closing file %q", path), "err", closeErr) + } + if err != nil { + return nil, err + } + + // Use temporary array to check for duplicates + var families_array []*dto.MetricFamily + + for _, mf := range parsedFamilies { + families_array = append(families_array, mf) + for _, m := range mf.Metric { + if m.TimestampMs != nil { + return nil, fmt.Errorf("textfile contains unsupported client-side timestamps") + } + } + if mf.Help == nil { + help := fmt.Sprintf("Metric read from %s", path) + mf.Help = &help + } + } + + // If duplicate metrics are detected in a *single* file, skip processing of file metrics + if duplicateMetricEntry(families_array) { + return nil, fmt.Errorf("duplicate metrics detected") + } + return families_array, nil +} + func checkBOM(encoding utfbom.Encoding) error { if encoding == utfbom.Unknown || encoding == utfbom.UTF8 { return nil