Merge pull request #1262 from DiniFarb/textfile_collector

Textfile collector: collect files from multiple paths
This commit is contained in:
Ben Reedy
2023-09-22 06:36:23 +10:00
committed by GitHub
9 changed files with 208 additions and 86 deletions

View File

@@ -19,7 +19,6 @@ package collector
import (
"fmt"
"io"
"io/ioutil"
"os"
"path/filepath"
"reflect"
@@ -37,11 +36,13 @@ import (
)
const (
FlagTextFileDirectory = "collector.textfile.directory"
FlagTextFileDirectory = "collector.textfile.directory"
FlagTextFileDirectories = "collector.textfile.directories"
)
var (
textFileDirectory *string
textFileDirectory *string
textFileDirectories *string
mtimeDesc = prometheus.NewDesc(
prometheus.BuildFQName(Namespace, "textfile", "mtime_seconds"),
@@ -54,7 +55,7 @@ var (
type textFileCollector struct {
logger log.Logger
path string
directories string
// Only set for testing to get predictable output.
mtime *float64
}
@@ -63,17 +64,27 @@ type textFileCollector struct {
func newTextFileCollectorFlags(app *kingpin.Application) {
textFileDirectory = app.Flag(
FlagTextFileDirectory,
"Directory to read text files with metrics from.",
).Default(getDefaultPath()).String()
"DEPRECATED: Use --collector.textfile.directories",
).Default("").Hidden().String()
textFileDirectories = app.Flag(
FlagTextFileDirectories,
"Directory or Directories to read text files with metrics from.",
).Default("").String()
}
// newTextFileCollector returns a new Collector exposing metrics read from files
// in the given textfile directory.
func newTextFileCollector(logger log.Logger) (Collector, error) {
const subsystem = "textfile"
directories := getDefaultPath()
if *textFileDirectory != "" || *textFileDirectories != "" {
directories = *textFileDirectory + "," + *textFileDirectories
directories = strings.Trim(directories, ",")
}
_ = level.Info(logger).Log("msg", fmt.Sprintf("textfile collector directories: %s", directories))
return &textFileCollector{
logger: log.With(logger, "collector", subsystem),
path: *textFileDirectory,
logger: log.With(logger, "collector", subsystem),
directories: directories,
}, nil
}
@@ -250,89 +261,55 @@ func (cr carriageReturnFilteringReader) Read(p []byte) (int, error) {
// Update implements the Collector interface.
func (c *textFileCollector) Collect(ctx *ScrapeContext, ch chan<- prometheus.Metric) error {
error := 0.0
mtimes := map[string]time.Time{}
// Iterate over files and accumulate their metrics.
files, err := ioutil.ReadDir(c.path)
if err != nil && c.path != "" {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error reading textfile collector directory %q", c.path), "err", err)
error = 1.0
}
errorMetric := 0.0
var mtimes = map[string]time.Time{}
// Create empty metricFamily slice here and append parsedFamilies to it inside the loop.
// Once loop is complete, raise error if any duplicates are present.
// This will ensure that duplicate metrics are correctly detected between multiple .prom files.
var metricFamilies = []*dto.MetricFamily{}
fileLoop:
for _, f := range files {
if !strings.HasSuffix(f.Name(), ".prom") {
continue
}
path := filepath.Join(c.path, f.Name())
_ = level.Debug(c.logger).Log("msg", fmt.Sprintf("Processing file %q", path))
file, err := os.Open(path)
if err != nil {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error opening %q: %v", path, err))
error = 1.0
continue
}
var parser expfmt.TextParser
r, encoding := utfbom.Skip(carriageReturnFilteringReader{r: file})
if err = checkBOM(encoding); err != nil {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Invalid file encoding detected in %s: %s - file must be UTF8", path, err.Error()))
error = 1.0
continue
}
parsedFamilies, err := parser.TextToMetricFamilies(r)
closeErr := file.Close()
if closeErr != nil {
_ = level.Warn(c.logger).Log("msg", fmt.Sprintf("Error closing file"), "err", err)
}
if err != nil {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error parsing %q: %v", path, err))
error = 1.0
continue
}
// Use temporary array to check for duplicates
var families_array []*dto.MetricFamily
for _, mf := range parsedFamilies {
families_array = append(families_array, mf)
for _, m := range mf.Metric {
if m.TimestampMs != nil {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Textfile %q contains unsupported client-side timestamps, skipping entire file", path))
error = 1.0
continue fileLoop
// Iterate over files and accumulate their metrics.
for _, directory := range strings.Split(c.directories, ",") {
err := filepath.WalkDir(directory, func(path string, dirEntry os.DirEntry, err error) error {
if err != nil {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error reading directory: %s", path), "err", err)
errorMetric = 1.0
return nil
}
if !dirEntry.IsDir() && strings.HasSuffix(dirEntry.Name(), ".prom") {
_ = level.Debug(c.logger).Log("msg", fmt.Sprintf("Processing file: %s", path))
families_array, err := scrapeFile(path, c.logger)
if err != nil {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error scraping file: %q. Skip File.", path), "err", err)
errorMetric = 1.0
return nil
}
fileInfo, err := os.Stat(path)
if err != nil {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error reading file info: %q. Skip File.", path), "err", err)
errorMetric = 1.0
return nil
}
if _, hasName := mtimes[fileInfo.Name()]; hasName {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Duplicate filename detected: %q. Skip File.", path))
errorMetric = 1.0
return nil
}
mtimes[fileInfo.Name()] = fileInfo.ModTime()
metricFamilies = append(metricFamilies, families_array...)
}
if mf.Help == nil {
help := fmt.Sprintf("Metric read from %s", path)
mf.Help = &help
}
}
// If duplicate metrics are detected in a *single* file, skip processing of file metrics
if duplicateMetricEntry(families_array) {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Duplicate metrics detected in file %s. Skipping file processing.", f.Name()))
error = 1.0
continue
}
// Only set this once it has been parsed and validated, so that
// a failure does not appear fresh.
mtimes[f.Name()] = f.ModTime()
for _, metricFamily := range parsedFamilies {
metricFamilies = append(metricFamilies, metricFamily)
return nil
})
if err != nil && directory != "" {
_ = level.Error(c.logger).Log("msg", fmt.Sprintf("Error reading textfile collector directory: %s", c.directories), "err", err)
errorMetric = 1.0
}
}
// If duplicates are detected across *multiple* files, return error.
if duplicateMetricEntry(metricFamilies) {
_ = level.Error(c.logger).Log("msg", "Duplicate metrics detected across multiple files")
error = 1.0
errorMetric = 1.0
} else {
for _, mf := range metricFamilies {
c.convertMetricFamily(mf, ch)
@@ -340,7 +317,6 @@ fileLoop:
}
c.exportMTimes(mtimes, ch)
// Export if there were errors.
ch <- prometheus.MustNewConstMetric(
prometheus.NewDesc(
@@ -348,11 +324,53 @@ fileLoop:
"1 if there was an error opening or reading a file, 0 otherwise",
nil, nil,
),
prometheus.GaugeValue, error,
prometheus.GaugeValue, errorMetric,
)
return nil
}
func scrapeFile(path string, log log.Logger) ([]*dto.MetricFamily, error) {
file, err := os.Open(path)
if err != nil {
return nil, err
}
var parser expfmt.TextParser
r, encoding := utfbom.Skip(carriageReturnFilteringReader{r: file})
if err = checkBOM(encoding); err != nil {
return nil, err
}
parsedFamilies, err := parser.TextToMetricFamilies(r)
closeErr := file.Close()
if closeErr != nil {
_ = level.Warn(log).Log("msg", fmt.Sprintf("Error closing file %q", path), "err", closeErr)
}
if err != nil {
return nil, err
}
// Use temporary array to check for duplicates
var families_array []*dto.MetricFamily
for _, mf := range parsedFamilies {
families_array = append(families_array, mf)
for _, m := range mf.Metric {
if m.TimestampMs != nil {
return nil, fmt.Errorf("textfile contains unsupported client-side timestamps")
}
}
if mf.Help == nil {
help := fmt.Sprintf("Metric read from %s", path)
mf.Help = &help
}
}
// If duplicate metrics are detected in a *single* file, skip processing of file metrics
if duplicateMetricEntry(families_array) {
return nil, fmt.Errorf("duplicate metrics detected")
}
return families_array, nil
}
func checkBOM(encoding utfbom.Encoding) error {
if encoding == utfbom.Unknown || encoding == utfbom.UTF8 {
return nil

View File

@@ -1,19 +1,25 @@
package collector
import (
"io/ioutil"
"fmt"
"io"
"os"
"strings"
"testing"
"github.com/dimchansky/utfbom"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)
var baseDir = "../tools/textfile-test"
func TestCRFilter(t *testing.T) {
sr := strings.NewReader("line 1\r\nline 2")
cr := carriageReturnFilteringReader{r: sr}
b, err := ioutil.ReadAll(cr)
b, err := io.ReadAll(cr)
if err != nil {
t.Error(err)
}
@@ -153,3 +159,73 @@ func TestDuplicateMetricEntry(t *testing.T) {
t.Errorf("Unexpected duplicate found in differentValues")
}
}
func TestMultipleDirectories(t *testing.T) {
testDir := baseDir + "/multiple-dirs"
testDirs := fmt.Sprintf("%[1]s/dir1,%[1]s/dir2,%[1]s/dir3", testDir)
collector := &textFileCollector{
logger: log.NewLogfmtLogger(os.Stdout),
directories: testDirs,
}
scrapeContext, err := PrepareScrapeContext([]string{"textfile_test"})
if err != nil {
t.Errorf("Unexpected error %s", err)
}
metrics := make(chan prometheus.Metric)
got := ""
go func() {
for {
var metric dto.Metric
val := <-metrics
err := val.Write(&metric)
if err != nil {
t.Errorf("Unexpected error %s", err)
}
got += metric.String()
}
}()
err = collector.Collect(scrapeContext, metrics)
if err != nil {
t.Errorf("Unexpected error %s", err)
}
for _, f := range []string{"dir1", "dir2", "dir3", "dir3sub"} {
if !strings.Contains(got, f) {
t.Errorf("Unexpected output %s: %q", f, got)
}
}
}
func TestDuplicateFileName(t *testing.T) {
testDir := baseDir + "/duplicate-filename"
collector := &textFileCollector{
logger: log.NewLogfmtLogger(os.Stdout),
directories: testDir,
}
scrapeContext, err := PrepareScrapeContext([]string{"textfile_test"})
if err != nil {
t.Errorf("Unexpected error %s", err)
}
metrics := make(chan prometheus.Metric)
got := ""
go func() {
for {
var metric dto.Metric
val := <-metrics
err := val.Write(&metric)
if err != nil {
t.Errorf("Unexpected error %s", err)
}
got += metric.String()
}
}()
err = collector.Collect(scrapeContext, metrics)
if err != nil {
t.Errorf("Unexpected error %s", err)
}
if !strings.Contains(got, "file") {
t.Errorf("Unexpected output %q", got)
}
if strings.Contains(got, "sub_file") {
t.Errorf("Unexpected output %q", got)
}
}

View File

@@ -10,15 +10,25 @@ Enabled by default? | Yes
## Flags
### `--collector.textfile.directory`
### `--collector.textfile.directory`
:warning: DEPRECATED Use `--collector.textfile.directories`
The directory containing the files to be ingested. Only files with the extension `.prom` are read. The `.prom` file must end with an empty line feed to work properly.
<br>
### `--collector.textfile.directories`
One or multiple directories containing the files to be ingested.
E.G. `--collector.textfile.directories="C:\MyDir1,C:\MyDir2"`
Default value: `C:\Program Files\windows_exporter\textfile_inputs`
Required: No
## Metrics
> **Note:**
> - If there are duplicated filenames among the directories, only the first one found will be read. For any other files with the same name, the `windows_textfile_scrape_error` metric will be set to 1 and a error message will be logged.
> - Only files with the extension `.prom` are read. The `.prom` file must end with an empty line feed to work properly.
Metrics will primarily come from the files on disk. The below listed metrics
are collected to give information about the reading of the metrics themselves.
@@ -38,7 +48,7 @@ _This collector does not yet have any useful queries added, we would appreciate
_This collector does not yet have alerting examples, we would appreciate your help adding them!_
# Example use
This Powershell script, when run in the `collector.textfile.directory` (default `C:\Program Files\windows_exporter\textfile_inputs`), generates a valid `.prom` file that should successfully ingested by windows_exporter.
This Powershell script, when run in the `--collector.textfile.directories` (default `C:\Program Files\windows_exporter\textfile_inputs`), generates a valid `.prom` file that should successfully ingested by windows_exporter.
```Powershell
$alpha = 42

View File

@@ -0,0 +1,3 @@
# HELP windows_test Some Test
# TYPE windows_test gauge
windows_test{flag="file"} 1

View File

@@ -0,0 +1,3 @@
# HELP windows_test Some Test
# TYPE windows_test gauge
windows_test{flag="sub_file"} 2

View File

@@ -0,0 +1,3 @@
# HELP windows_test Some Test
# TYPE windows_test gauge
windows_test{flag="dir1"} 1

View File

@@ -0,0 +1,3 @@
# HELP windows_test Some Test
# TYPE windows_test gauge
windows_test{flag="dir2"} 2

View File

@@ -0,0 +1,3 @@
# HELP windows_test Some Test
# TYPE windows_test gauge
windows_test{flag="dir3"} 3

View File

@@ -0,0 +1,3 @@
# HELP windows_test Some Test
# TYPE windows_test gauge
windows_test{flag="dir3sub"} 3