mirror of
https://github.com/prometheus-community/windows_exporter.git
synced 2026-03-07 02:56:35 +00:00
feat: Tolerate collector failures (#1769)
Signed-off-by: Jan-Otto Kröpke <mail@jkroepke.de>
This commit is contained in:
@@ -20,24 +20,30 @@ import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"regexp"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/alecthomas/kingpin/v2"
|
||||
"github.com/go-ole/go-ole"
|
||||
"github.com/go-ole/go-ole/oleutil"
|
||||
"github.com/prometheus-community/windows_exporter/internal/headers/schedule_service"
|
||||
"github.com/prometheus-community/windows_exporter/internal/mi"
|
||||
"github.com/prometheus-community/windows_exporter/internal/types"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const Name = "scheduled_task"
|
||||
const (
|
||||
Name = "scheduled_task"
|
||||
|
||||
workerCount = 4
|
||||
)
|
||||
|
||||
type Config struct {
|
||||
TaskExclude *regexp.Regexp `yaml:"task_exclude"`
|
||||
TaskInclude *regexp.Regexp `yaml:"task_include"`
|
||||
}
|
||||
|
||||
//nolint:gochecknoglobals
|
||||
var ConfigDefaults = Config{
|
||||
TaskExclude: types.RegExpEmpty,
|
||||
TaskInclude: types.RegExpAny,
|
||||
@@ -46,8 +52,11 @@ var ConfigDefaults = Config{
|
||||
type Collector struct {
|
||||
config Config
|
||||
|
||||
scheduledTasksReqCh chan struct{}
|
||||
scheduledTasksCh chan *scheduledTaskResults
|
||||
logger *slog.Logger
|
||||
|
||||
scheduledTasksReqCh chan struct{}
|
||||
scheduledTasksWorker chan scheduledTaskWorkerRequest
|
||||
scheduledTasksCh chan scheduledTaskResults
|
||||
|
||||
lastResult *prometheus.Desc
|
||||
missedRuns *prometheus.Desc
|
||||
@@ -73,6 +82,7 @@ const (
|
||||
SCHED_S_TASK_HAS_NOT_RUN TaskResult = 0x00041303
|
||||
)
|
||||
|
||||
//nolint:gochecknoglobals
|
||||
var taskStates = []string{"disabled", "queued", "ready", "running", "unknown"}
|
||||
|
||||
type scheduledTask struct {
|
||||
@@ -85,8 +95,13 @@ type scheduledTask struct {
|
||||
}
|
||||
|
||||
type scheduledTaskResults struct {
|
||||
scheduledTasks []scheduledTask
|
||||
err error
|
||||
tasks []scheduledTask
|
||||
err error
|
||||
}
|
||||
|
||||
type scheduledTaskWorkerRequest struct {
|
||||
folderPath string
|
||||
results chan<- scheduledTaskResults
|
||||
}
|
||||
|
||||
func New(config *Config) *Collector {
|
||||
@@ -157,10 +172,13 @@ func (c *Collector) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Collector) Build(_ *slog.Logger, _ *mi.Session) error {
|
||||
func (c *Collector) Build(logger *slog.Logger, _ *mi.Session) error {
|
||||
c.logger = logger.With(slog.String("collector", Name))
|
||||
|
||||
initErrCh := make(chan error)
|
||||
c.scheduledTasksReqCh = make(chan struct{})
|
||||
c.scheduledTasksCh = make(chan *scheduledTaskResults)
|
||||
c.scheduledTasksCh = make(chan scheduledTaskResults)
|
||||
c.scheduledTasksWorker = make(chan scheduledTaskWorkerRequest, 100)
|
||||
|
||||
go c.initializeScheduleService(initErrCh)
|
||||
|
||||
@@ -256,68 +274,41 @@ func (c *Collector) getScheduledTasks() ([]scheduledTask, error) {
|
||||
return []scheduledTask{}, nil
|
||||
}
|
||||
|
||||
if scheduledTasks == nil {
|
||||
return nil, errors.New("scheduled tasks channel is nil")
|
||||
}
|
||||
|
||||
if scheduledTasks.err != nil {
|
||||
return nil, scheduledTasks.err
|
||||
}
|
||||
|
||||
return scheduledTasks.scheduledTasks, scheduledTasks.err
|
||||
return scheduledTasks.tasks, scheduledTasks.err
|
||||
}
|
||||
|
||||
func (c *Collector) initializeScheduleService(initErrCh chan<- error) {
|
||||
// The only way to run WMI queries in parallel while being thread-safe is to
|
||||
// ensure the CoInitialize[Ex]() call is bound to its current OS thread.
|
||||
// Otherwise, attempting to initialize and run parallel queries across
|
||||
// goroutines will result in protected memory errors.
|
||||
runtime.LockOSThread()
|
||||
defer runtime.UnlockOSThread()
|
||||
service := schedule_service.New()
|
||||
if err := service.Connect(); err != nil {
|
||||
initErrCh <- fmt.Errorf("failed to connect to schedule service: %w", err)
|
||||
|
||||
if err := ole.CoInitializeEx(0, ole.COINIT_MULTITHREADED); err != nil {
|
||||
var oleCode *ole.OleError
|
||||
if errors.As(err, &oleCode) && oleCode.Code() != ole.S_OK && oleCode.Code() != 0x00000001 {
|
||||
initErrCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
return
|
||||
defer service.Close()
|
||||
|
||||
errs := make([]error, 0, workerCount)
|
||||
|
||||
for range workerCount {
|
||||
errCh := make(chan error, workerCount)
|
||||
|
||||
go c.collectWorker(errCh)
|
||||
|
||||
if err := <-errCh; err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
|
||||
defer ole.CoUninitialize()
|
||||
|
||||
scheduleClassID, err := ole.ClassIDFrom("Schedule.Service.1")
|
||||
if err != nil {
|
||||
if err := errors.Join(errs...); err != nil {
|
||||
initErrCh <- err
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
taskSchedulerObj, err := ole.CreateInstance(scheduleClassID, nil)
|
||||
if err != nil || taskSchedulerObj == nil {
|
||||
initErrCh <- err
|
||||
|
||||
return
|
||||
}
|
||||
defer taskSchedulerObj.Release()
|
||||
|
||||
taskServiceObj := taskSchedulerObj.MustQueryInterface(ole.IID_IDispatch)
|
||||
defer taskServiceObj.Release()
|
||||
|
||||
taskService, err := oleutil.CallMethod(taskServiceObj, "Connect")
|
||||
if err != nil {
|
||||
initErrCh <- err
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
defer func(taskService *ole.VARIANT) {
|
||||
_ = taskService.Clear()
|
||||
}(taskService)
|
||||
|
||||
close(initErrCh)
|
||||
|
||||
scheduledTasks := make([]scheduledTask, 0, 100)
|
||||
taskServiceObj := service.GetOLETaskServiceObj()
|
||||
scheduledTasks := make([]scheduledTask, 0, 500)
|
||||
|
||||
for range c.scheduledTasksReqCh {
|
||||
func() {
|
||||
@@ -327,30 +318,102 @@ func (c *Collector) initializeScheduleService(initErrCh chan<- error) {
|
||||
|
||||
res, err := oleutil.CallMethod(taskServiceObj, "GetFolder", `\`)
|
||||
if err != nil {
|
||||
c.scheduledTasksCh <- &scheduledTaskResults{err: err}
|
||||
|
||||
return
|
||||
c.scheduledTasksCh <- scheduledTaskResults{err: err}
|
||||
}
|
||||
|
||||
rootFolderObj := res.ToIDispatch()
|
||||
defer rootFolderObj.Release()
|
||||
|
||||
err = fetchTasksRecursively(rootFolderObj, &scheduledTasks)
|
||||
errs := make([]error, 0)
|
||||
scheduledTasksWorkerResults := make(chan scheduledTaskResults)
|
||||
|
||||
c.scheduledTasksCh <- &scheduledTaskResults{scheduledTasks: scheduledTasks, err: err}
|
||||
wg := &sync.WaitGroup{}
|
||||
|
||||
go func() {
|
||||
for workerResults := range scheduledTasksWorkerResults {
|
||||
wg.Done()
|
||||
|
||||
if workerResults.err != nil {
|
||||
errs = append(errs, workerResults.err)
|
||||
}
|
||||
|
||||
if workerResults.tasks != nil {
|
||||
errs = append(errs, workerResults.err)
|
||||
|
||||
scheduledTasks = append(scheduledTasks, workerResults.tasks...)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if err := c.fetchRecursively(rootFolderObj, wg, scheduledTasksWorkerResults); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
|
||||
close(scheduledTasksWorkerResults)
|
||||
|
||||
c.scheduledTasksCh <- scheduledTaskResults{tasks: scheduledTasks, err: errors.Join(errs...)}
|
||||
}()
|
||||
}
|
||||
|
||||
close(c.scheduledTasksCh)
|
||||
close(c.scheduledTasksWorker)
|
||||
|
||||
c.scheduledTasksCh = nil
|
||||
c.scheduledTasksWorker = nil
|
||||
}
|
||||
|
||||
func fetchTasksRecursively(folder *ole.IDispatch, scheduledTasks *[]scheduledTask) error {
|
||||
if err := fetchTasksInFolder(folder, scheduledTasks); err != nil {
|
||||
return err
|
||||
func (c *Collector) collectWorker(errCh chan<- error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
c.logger.Error("worker panic",
|
||||
slog.Any("panic", r),
|
||||
)
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
// Restart the collectWorker
|
||||
go c.collectWorker(errCh)
|
||||
|
||||
if err := <-errCh; err != nil {
|
||||
c.logger.Error("failed to restart worker",
|
||||
slog.Any("err", err),
|
||||
)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
service := schedule_service.New()
|
||||
if err := service.Connect(); err != nil {
|
||||
errCh <- fmt.Errorf("failed to connect to schedule service: %w", err)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
close(errCh)
|
||||
|
||||
defer service.Close()
|
||||
|
||||
taskServiceObj := service.GetOLETaskServiceObj()
|
||||
|
||||
for task := range c.scheduledTasksWorker {
|
||||
scheduledTasks, err := fetchTasksInFolder(taskServiceObj, task.folderPath)
|
||||
|
||||
task.results <- scheduledTaskResults{tasks: scheduledTasks, err: err}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Collector) fetchRecursively(folder *ole.IDispatch, wg *sync.WaitGroup, results chan<- scheduledTaskResults) error {
|
||||
folderPathVariant, err := oleutil.GetProperty(folder, "Path")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get folder path: %w", err)
|
||||
}
|
||||
|
||||
folderPath := folderPathVariant.ToString()
|
||||
|
||||
wg.Add(1)
|
||||
c.scheduledTasksWorker <- scheduledTaskWorkerRequest{folderPath: folderPath, results: results}
|
||||
|
||||
res, err := oleutil.CallMethod(folder, "GetFolders", 1)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -359,25 +422,41 @@ func fetchTasksRecursively(folder *ole.IDispatch, scheduledTasks *[]scheduledTas
|
||||
subFolders := res.ToIDispatch()
|
||||
defer subFolders.Release()
|
||||
|
||||
err = oleutil.ForEach(subFolders, func(v *ole.VARIANT) error {
|
||||
return oleutil.ForEach(subFolders, func(v *ole.VARIANT) error {
|
||||
subFolder := v.ToIDispatch()
|
||||
defer subFolder.Release()
|
||||
|
||||
return fetchTasksRecursively(subFolder, scheduledTasks)
|
||||
return c.fetchRecursively(subFolder, wg, results)
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func fetchTasksInFolder(folder *ole.IDispatch, scheduledTasks *[]scheduledTask) error {
|
||||
res, err := oleutil.CallMethod(folder, "GetTasks", 1)
|
||||
func fetchTasksInFolder(taskServiceObj *ole.IDispatch, folderPath string) ([]scheduledTask, error) {
|
||||
folderObjRes, err := oleutil.CallMethod(taskServiceObj, "GetFolder", folderPath)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, fmt.Errorf("failed to get folder %s: %w", folderPath, err)
|
||||
}
|
||||
|
||||
tasks := res.ToIDispatch()
|
||||
folderObj := folderObjRes.ToIDispatch()
|
||||
defer folderObj.Release()
|
||||
|
||||
tasksRes, err := oleutil.CallMethod(folderObj, "GetTasks", 1)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get tasks in folder %s: %w", folderPath, err)
|
||||
}
|
||||
|
||||
tasks := tasksRes.ToIDispatch()
|
||||
defer tasks.Release()
|
||||
|
||||
// Get task count
|
||||
countVariant, err := oleutil.GetProperty(tasks, "Count")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get task count: %w", err)
|
||||
}
|
||||
|
||||
taskCount := int(countVariant.Val)
|
||||
|
||||
scheduledTasks := make([]scheduledTask, 0, taskCount)
|
||||
|
||||
err = oleutil.ForEach(tasks, func(v *ole.VARIANT) error {
|
||||
task := v.ToIDispatch()
|
||||
defer task.Release()
|
||||
@@ -387,12 +466,15 @@ func fetchTasksInFolder(folder *ole.IDispatch, scheduledTasks *[]scheduledTask)
|
||||
return err
|
||||
}
|
||||
|
||||
*scheduledTasks = append(*scheduledTasks, parsedTask)
|
||||
scheduledTasks = append(scheduledTasks, parsedTask)
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to iterate over tasks: %w", err)
|
||||
}
|
||||
|
||||
return err
|
||||
return scheduledTasks, nil
|
||||
}
|
||||
|
||||
func parseTask(task *ole.IDispatch) (scheduledTask, error) {
|
||||
|
||||
Reference in New Issue
Block a user