feat: (#1236) textfile: collect files from multiple paths

Signed-off-by: Dinifarb <andreas.vogt89@bluewin.ch>
This commit is contained in:
Dinifarb
2023-07-29 20:02:47 +02:00
parent 134bae514d
commit f5b9ba35d0

View File

@@ -19,7 +19,6 @@ package collector
import ( import (
"fmt" "fmt"
"io" "io"
"io/ioutil"
"os" "os"
"path/filepath" "path/filepath"
"reflect" "reflect"
@@ -38,10 +37,12 @@ import (
// Command-line flag names for configuring where the textfile collector
// looks for *.prom metric files. FlagTextFileDirectories accepts a
// comma-separated list of directories.
const (
	FlagTextFileDirectory   = "collector.textfile.directory"
	FlagTextFileDirectories = "collector.textfile.directories"
)
var ( var (
textFileDirectory *string textFileDirectory *string
textFileDirectories *string
mtimeDesc = prometheus.NewDesc( mtimeDesc = prometheus.NewDesc(
prometheus.BuildFQName(Namespace, "textfile", "mtime_seconds"), prometheus.BuildFQName(Namespace, "textfile", "mtime_seconds"),
@@ -54,7 +55,7 @@ var (
type textFileCollector struct { type textFileCollector struct {
logger log.Logger logger log.Logger
path string directories string
// Only set for testing to get predictable output. // Only set for testing to get predictable output.
mtime *float64 mtime *float64
} }
@@ -63,17 +64,27 @@ type textFileCollector struct {
func newTextFileCollectorFlags(app *kingpin.Application) { func newTextFileCollectorFlags(app *kingpin.Application) {
textFileDirectory = app.Flag( textFileDirectory = app.Flag(
FlagTextFileDirectory, FlagTextFileDirectory,
"Directory to read text files with metrics from.", "Directory or Directories to read text files with metrics from.",
).Default(getDefaultPath()).String() ).Default("").String()
textFileDirectories = app.Flag(
FlagTextFileDirectories,
"Directory or Directories to read text files with metrics from.",
).Default("").String()
} }
// newTextFileCollector returns a new Collector exposing metrics read from files // newTextFileCollector returns a new Collector exposing metrics read from files
// in the given textfile directory. // in the given textfile directory.
func newTextFileCollector(logger log.Logger) (Collector, error) { func newTextFileCollector(logger log.Logger) (Collector, error) {
const subsystem = "textfile" const subsystem = "textfile"
directories := getDefaultPath()
if *textFileDirectory != "" || *textFileDirectories != "" {
directories = *textFileDirectory + "," + *textFileDirectories
directories = strings.Trim(directories, ",")
}
_ = level.Info(logger).Log("msg", fmt.Sprintf("textfile collector directories: %s", directories))
return &textFileCollector{ return &textFileCollector{
logger: log.With(logger, "collector", subsystem), logger: log.With(logger, "collector", subsystem),
path: *textFileDirectory, directories: directories,
}, nil }, nil
} }
@@ -250,49 +261,92 @@ func (cr carriageReturnFilteringReader) Read(p []byte) (int, error) {
// Update implements the Collector interface. // Update implements the Collector interface.
func (c *textFileCollector) Collect(ctx *ScrapeContext, ch chan<- prometheus.Metric) error { func (c *textFileCollector) Collect(ctx *ScrapeContext, ch chan<- prometheus.Metric) error {
error := 0.0 errorMetric := 0.0
mtimes := map[string]time.Time{} var mtimes = map[string]time.Time{}
// Iterate over files and accumulate their metrics.
files, err := ioutil.ReadDir(c.path)
if err != nil && c.path != "" {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error reading textfile collector directory %q", c.path), "err", err)
error = 1.0
}
// Create empty metricFamily slice here and append parsedFamilies to it inside the loop. // Create empty metricFamily slice here and append parsedFamilies to it inside the loop.
// Once loop is complete, raise error if any duplicates are present. // Once loop is complete, raise error if any duplicates are present.
// This will ensure that duplicate metrics are correctly detected between multiple .prom files. // This will ensure that duplicate metrics are correctly detected between multiple .prom files.
var metricFamilies = []*dto.MetricFamily{} var metricFamilies = []*dto.MetricFamily{}
fileLoop:
for _, f := range files { // Iterate over files and accumulate their metrics.
if !strings.HasSuffix(f.Name(), ".prom") { for _, directory := range strings.Split(c.directories, ",") {
continue err := filepath.WalkDir(directory, func(path string, dirEntry os.DirEntry, err error) error {
if err != nil {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error reading directory: %s", path), "err", err)
errorMetric = 1.0
return nil
} }
path := filepath.Join(c.path, f.Name()) if !dirEntry.IsDir() && strings.HasSuffix(dirEntry.Name(), ".prom") {
_ = level.Debug(c.logger).Log("msg", fmt.Sprintf("Processing file %q", path)) _ = level.Debug(c.logger).Log("msg", fmt.Sprintf("Processing file: %s", path))
families_array, err := scrapeFile(path, c.logger)
if err != nil {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error screaping file: %q. Skip File.", path), "err", err)
errorMetric = 1.0
return nil
}
fileInfo, err := os.Stat(path)
if err != nil {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error reading file info: %q. Skip File.", path), "err", err)
errorMetric = 1.0
return nil
}
if _, hasName := mtimes[fileInfo.Name()]; hasName {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Duplicate filename detected: %q. Skip File.", path))
errorMetric = 1.0
return nil
} else {
mtimes[fileInfo.Name()] = fileInfo.ModTime()
metricFamilies = append(metricFamilies, families_array...)
}
}
return nil
})
if err != nil && directory != "" {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error reading textfile collector directory: %s", c.directories), "err", err)
errorMetric = 1.0
}
}
// If duplicates are detected across *multiple* files, return error.
if duplicateMetricEntry(metricFamilies) {
_ = level.Error(c.logger).Log("msg", "Duplicate metrics detected across multiple files")
errorMetric = 1.0
} else {
for _, mf := range metricFamilies {
c.convertMetricFamily(mf, ch)
}
}
c.exportMTimes(mtimes, ch)
// Export if there were errors.
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(Namespace, "textfile", "scrape_error"),
"1 if there was an error opening or reading a file, 0 otherwise",
nil, nil,
),
prometheus.GaugeValue, errorMetric,
)
return nil
}
func scrapeFile(path string, log log.Logger) ([]*dto.MetricFamily, error) {
file, err := os.Open(path) file, err := os.Open(path)
if err != nil { if err != nil {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error opening %q: %v", path, err)) return nil, err
error = 1.0
continue
} }
var parser expfmt.TextParser var parser expfmt.TextParser
r, encoding := utfbom.Skip(carriageReturnFilteringReader{r: file}) r, encoding := utfbom.Skip(carriageReturnFilteringReader{r: file})
if err = checkBOM(encoding); err != nil { if err = checkBOM(encoding); err != nil {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Invalid file encoding detected in %s: %s - file must be UTF8", path, err.Error())) return nil, err
error = 1.0
continue
} }
parsedFamilies, err := parser.TextToMetricFamilies(r) parsedFamilies, err := parser.TextToMetricFamilies(r)
closeErr := file.Close() closeErr := file.Close()
if closeErr != nil { if closeErr != nil {
_ = level.Warn(c.logger).Log("msg", fmt.Sprintf("Error closing file"), "err", err) _ = level.Warn(log).Log("msg", fmt.Sprintf("Error closing file %q", path), "err", closeErr)
} }
if err != nil { if err != nil {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error parsing %q: %v", path, err)) return nil, err
error = 1.0
continue
} }
// Use temporary array to check for duplicates // Use temporary array to check for duplicates
@@ -302,9 +356,7 @@ fileLoop:
families_array = append(families_array, mf) families_array = append(families_array, mf)
for _, m := range mf.Metric { for _, m := range mf.Metric {
if m.TimestampMs != nil { if m.TimestampMs != nil {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Textfile %q contains unsupported client-side timestamps, skipping entire file", path)) return nil, fmt.Errorf("textfile contains unsupported client-side timestamps")
error = 1.0
continue fileLoop
} }
} }
if mf.Help == nil { if mf.Help == nil {
@@ -315,42 +367,9 @@ fileLoop:
// If duplicate metrics are detected in a *single* file, skip processing of file metrics // If duplicate metrics are detected in a *single* file, skip processing of file metrics
if duplicateMetricEntry(families_array) { if duplicateMetricEntry(families_array) {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Duplicate metrics detected in file %s. Skipping file processing.", f.Name())) return nil, fmt.Errorf("duplicate metrics detected")
error = 1.0
continue
} }
return families_array, nil
// Only set this once it has been parsed and validated, so that
// a failure does not appear fresh.
mtimes[f.Name()] = f.ModTime()
for _, metricFamily := range parsedFamilies {
metricFamilies = append(metricFamilies, metricFamily)
}
}
// If duplicates are detected across *multiple* files, return error.
if duplicateMetricEntry(metricFamilies) {
_ = level.Error(c.logger).Log("msg", "Duplicate metrics detected across multiple files")
error = 1.0
} else {
for _, mf := range metricFamilies {
c.convertMetricFamily(mf, ch)
}
}
c.exportMTimes(mtimes, ch)
// Export if there were errors.
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
prometheus.BuildFQName(Namespace, "textfile", "scrape_error"),
"1 if there was an error opening or reading a file, 0 otherwise",
nil, nil,
),
prometheus.GaugeValue, error,
)
return nil
} }
func checkBOM(encoding utfbom.Encoding) error { func checkBOM(encoding utfbom.Encoding) error {