remote_fx: refactor collector (#1738)

Signed-off-by: Jan-Otto Kröpke <mail@jkroepke.de>
This commit is contained in:
Jan-Otto Kröpke
2024-11-15 19:34:00 +01:00
committed by GitHub
parent a4ec0a96f1
commit 78bd720e88
32 changed files with 278 additions and 204 deletions

View File

@@ -3,6 +3,7 @@
package update
import (
"context"
"errors"
"fmt"
"log/slog"
@@ -37,7 +38,8 @@ var ErrNoUpdates = errors.New("no updates available")
type Collector struct {
config Config
mu sync.RWMutex
mu sync.RWMutex
ctxCancelFn context.CancelFunc
metricsBuf []prometheus.Metric
@@ -77,6 +79,8 @@ func NewWithFlags(app *kingpin.Application) *Collector {
}
func (c *Collector) Close(_ *slog.Logger) error {
c.ctxCancelFn()
return nil
}
@@ -85,8 +89,12 @@ func (c *Collector) Build(logger *slog.Logger, _ *mi.Session) error {
logger.Info("update collector is in an experimental state! The configuration and metrics may change in future. Please report any issues.")
ctx, cancel := context.WithCancel(context.Background())
initErrCh := make(chan error, 1)
go c.scheduleUpdateStatus(logger, initErrCh, c.config.online)
go c.scheduleUpdateStatus(ctx, logger, initErrCh, c.config.online)
c.ctxCancelFn = cancel
if err := <-initErrCh; err != nil {
return fmt.Errorf("failed to initialize Windows Update collector: %w", err)
@@ -137,7 +145,7 @@ func (c *Collector) Collect(_ *types.ScrapeContext, _ *slog.Logger, ch chan<- pr
return nil
}
func (c *Collector) scheduleUpdateStatus(logger *slog.Logger, initErrCh chan<- error, online bool) {
func (c *Collector) scheduleUpdateStatus(ctx context.Context, logger *slog.Logger, initErrCh chan<- error, online bool) {
// The only way to run WMI queries in parallel while being thread-safe is to
// ensure the CoInitialize[Ex]() call is bound to its current OS thread.
// Otherwise, attempting to initialize and run parallel queries across
@@ -226,10 +234,12 @@ func (c *Collector) scheduleUpdateStatus(logger *slog.Logger, initErrCh chan<- e
usd := us.ToIDispatch()
defer usd.Release()
var metricsBuf []prometheus.Metric
for {
metricsBuf, err := c.fetchUpdates(logger, usd)
metricsBuf, err = c.fetchUpdates(logger, usd)
if err != nil {
logger.Error("failed to fetch updates",
logger.ErrorContext(ctx, "failed to fetch updates",
slog.Any("err", err),
)
@@ -244,7 +254,11 @@ func (c *Collector) scheduleUpdateStatus(logger *slog.Logger, initErrCh chan<- e
c.metricsBuf = metricsBuf
c.mu.Unlock()
time.Sleep(c.config.scrapeInterval)
select {
case <-time.After(c.config.scrapeInterval):
case <-ctx.Done():
return
}
}
}