diff --git a/.idea/dictionaries/project.xml b/.idea/dictionaries/project.xml index 1ea1cf2a..486bc5cc 100644 --- a/.idea/dictionaries/project.xml +++ b/.idea/dictionaries/project.xml @@ -1,6 +1,7 @@ + containerd spdx diff --git a/internal/collector/container/container.go b/internal/collector/container/container.go index 90407875..c88ddb85 100644 --- a/internal/collector/container/container.go +++ b/internal/collector/container/container.go @@ -46,11 +46,13 @@ const ( subCollectorHCS = "hcs" subCollectorHostprocess = "hostprocess" - containerDStateDir = `C:\ProgramData\containerd\state\io.containerd.runtime.v2.task\k8s.io\` + + JobObjectMemoryUsageInformation = 28 ) type Config struct { - CollectorsEnabled []string `yaml:"collectors_enabled"` + CollectorsEnabled []string `yaml:"enabled"` + ContainerDStateDir string `yaml:"containerd-state-dir"` } //nolint:gochecknoglobals @@ -59,6 +61,7 @@ var ConfigDefaults = Config{ subCollectorHCS, subCollectorHostprocess, }, + ContainerDStateDir: `C:\ProgramData\containerd\state\io.containerd.runtime.v2.task\k8s.io\`, } // A Collector is a Prometheus Collector for containers metrics. @@ -142,6 +145,11 @@ func NewWithFlags(app *kingpin.Application) *Collector { "Comma-separated list of collectors to use. Defaults to all, if not specified.", ).Default(strings.Join(ConfigDefaults.CollectorsEnabled, ",")).StringVar(&collectorsEnabled) + app.Flag( + "collector.container.containerd-state-dir", + "Path to the containerd state directory. Defaults to C:\\ProgramData\\containerd\\state\\io.containerd.runtime.v2.task\\k8s.io\\", + ).Default(ConfigDefaults.ContainerDStateDir).StringVar(&c.config.ContainerDStateDir) + app.Action(func(*kingpin.ParseContext) error { c.config.CollectorsEnabled = strings.Split(collectorsEnabled, ",") @@ -168,10 +176,13 @@ func (c *Collector) Build(logger *slog.Logger, _ *mi.Session) error { } } + c.annotationsCacheHCS = make(map[string]containerInfo) + c.annotationsCacheJob = make(map[string]containerInfo) + c.containerAvailable = prometheus.NewDesc( prometheus.BuildFQName(types.Namespace, Name, "available"), "Available", - []string{"container_id", "namespace", "pod", "container"}, + []string{"container_id", "namespace", "pod", "container", "hostprocess"}, nil, ) c.containersCount = prometheus.NewDesc( @@ -321,7 +332,7 @@ func (c *Collector) collectHCS(ch chan<- prometheus.Metric) error { var countersCount float64 containerIDs := make([]string, 0, len(containers)) - collectErrors := make([]error, 0, len(containers)) + collectErrors := make([]error, 0) for _, container := range containers { if container.State != "Running" { @@ -339,7 +350,7 @@ func (c *Collector) collectHCS(ch chan<- prometheus.Metric) error { ) if _, ok := c.annotationsCacheHCS[container.ID]; !ok { - if spec, err := getContainerAnnotations(container.ID); err == nil { + if spec, err := c.getContainerAnnotations(container.ID); err == nil { namespace = spec.Annotations["io.kubernetes.cri.sandbox-namespace"] podName = spec.Annotations["io.kubernetes.cri.sandbox-name"] containerName = spec.Annotations["io.kubernetes.cri.container-name"] @@ -357,11 +368,17 @@ func (c *Collector) collectHCS(ch chan<- prometheus.Metric) error { if errors.Is(err, hcs.ErrIDNotFound) { c.logger.Debug("err in fetching container statistics", slog.String("container_id", container.ID), + slog.String("container_name", c.annotationsCacheHCS[container.ID].container), + slog.String("container_pod_name", c.annotationsCacheHCS[container.ID].pod), + slog.String("container_namespace", c.annotationsCacheHCS[container.ID].namespace), slog.Any("err", err), ) } else { c.logger.Error("err in fetching container statistics", slog.String("container_id", container.ID), + slog.String("container_name", c.annotationsCacheHCS[container.ID].container), + slog.String("container_pod_name", c.annotationsCacheHCS[container.ID].pod), + slog.String("container_namespace", c.annotationsCacheHCS[container.ID].namespace), slog.Any("err", err), ) @@ -397,6 +414,18 @@ func (c *Collector) collectHCS(ch chan<- prometheus.Metric) error { } func (c *Collector) collectHCSContainer(ch chan<- prometheus.Metric, containerDetails hcs.Properties, containerInfo containerInfo) error { + // Skip if the container is a pause container + if containerInfo.pod != "" && containerInfo.container == "" { + c.logger.Debug("skipping pause container", + slog.String("container_id", containerDetails.ID), + slog.String("container_name", containerInfo.container), + slog.String("pod_name", containerInfo.pod), + slog.String("namespace", containerInfo.namespace), + ) + + return nil + } + containerStats, err := hcs.GetContainerStatistics(containerDetails.ID) if err != nil { return fmt.Errorf("error fetching container statistics: %w", err) @@ -404,9 +433,9 @@ func (c *Collector) collectHCSContainer(ch chan<- prometheus.Metric, containerDe ch <- prometheus.MustNewConstMetric( c.containerAvailable, - prometheus.CounterValue, + prometheus.GaugeValue, 1, - containerInfo.id, containerInfo.namespace, containerInfo.pod, containerInfo.container, + containerInfo.id, containerInfo.namespace, containerInfo.pod, containerInfo.container, "false", ) ch <- prometheus.MustNewConstMetric( c.usageCommitBytes, @@ -483,9 +512,6 @@ func (c *Collector) collectHCSContainer(ch chan<- prometheus.Metric, containerDe } // collectNetworkMetrics collects network metrics for containers. -// With HNSv2, the network stats must be collected from hcsshim.HNSListEndpointRequest. -// Network statistics from the container.Statistics() are providing data only, if HNSv1 is used. -// Ref: https://github.com/prometheus-community/windows_exporter/pull/1218 func (c *Collector) collectNetworkMetrics(ch chan<- prometheus.Metric) error { endpoints, err := hcn.EnumerateEndpoints() if err != nil { @@ -506,6 +532,10 @@ func (c *Collector) collectNetworkMetrics(ch chan<- prometheus.Metric) error { continue } + if len(properties.SharedContainers) == 0 { + continue + } + var nicGUID *guid.GUID for _, allocator := range properties.Resources.Allocators { @@ -547,6 +577,11 @@ func (c *Collector) collectNetworkMetrics(ch chan<- prometheus.Metric) error { continue } + // Skip if the container is a pause container + if containerInfo.pod != "" && containerInfo.container == "" { + continue + } + endpointId := strings.ToUpper(endpoint.String()) ch <- prometheus.MustNewConstMetric( @@ -598,7 +633,7 @@ func (c *Collector) collectNetworkMetrics(ch chan<- prometheus.Metric) error { // // Job containers are containers that aren't managed by HCS, e.g host process containers. func (c *Collector) collectJobContainers(ch chan<- prometheus.Metric) error { - containerDStateFS := os.DirFS(containerDStateDir) + containerDStateFS := os.DirFS(c.config.ContainerDStateDir) allContainerIDs := make([]string, 0, len(c.annotationsCacheJob)+len(c.annotationsCacheHCS)) jobContainerIDs := make([]string, 0, len(allContainerIDs)) @@ -607,7 +642,7 @@ func (c *Collector) collectJobContainers(ch chan<- prometheus.Metric) error { if err != nil { if errors.Is(err, fs.ErrNotExist) { c.logger.Warn("containerd state directory does not exist", - slog.String("path", containerDStateDir), + slog.String("path", c.config.ContainerDStateDir), slog.Any("err", err), ) @@ -617,13 +652,42 @@ func (c *Collector) collectJobContainers(ch chan<- prometheus.Metric) error { return err } + if path == "." { + return nil + } + if !d.IsDir() { return nil } if _, err := os.Stat(path + "\\config.json"); err != nil { - containerID := strings.TrimPrefix(strings.Replace(path, containerDStateDir, "", 1), `\`) - allContainerIDs = append(allContainerIDs, containerID) + containerID := strings.TrimPrefix(strings.Replace(path, c.config.ContainerDStateDir, "", 1), `\`) + + if spec, err := c.getContainerAnnotations(containerID); err == nil { + isHostProcess, ok := spec.Annotations["microsoft.com/hostprocess-container"] + if ok && isHostProcess == "true" { + allContainerIDs = append(allContainerIDs, containerID) + + if _, ok := c.annotationsCacheJob[containerID]; !ok { + var ( + namespace string + podName string + containerName string + ) + + namespace = spec.Annotations["io.kubernetes.cri.sandbox-namespace"] + podName = spec.Annotations["io.kubernetes.cri.sandbox-name"] + containerName = spec.Annotations["io.kubernetes.cri.container-name"] + + c.annotationsCacheJob[containerID] = containerInfo{ + id: "containerd://" + containerID, + namespace: namespace, + pod: podName, + container: containerName, + } + } + } + } } // Skip the directory content @@ -653,7 +717,7 @@ func (c *Collector) collectJobContainers(ch chan<- prometheus.Metric) error { } func (c *Collector) collectJobContainer(ch chan<- prometheus.Metric, containerID string) error { - jobObjectHandle, err := kernel32.OpenJobObject("JobContainer_" + containerID) + jobObjectHandle, err := kernel32.OpenJobObject("Global\\JobContainer_" + containerID) if err != nil { if errors.Is(err, windows.ERROR_FILE_NOT_FOUND) { return nil @@ -666,37 +730,29 @@ func (c *Collector) collectJobContainer(ch chan<- prometheus.Metric, containerID _ = windows.Close(fd) }(jobObjectHandle) - if _, ok := c.annotationsCacheJob[containerID]; !ok { - var ( - namespace string - podName string - containerName string - ) + var jobInfo kernel32.JobObjectBasicAndIOAccountingInformation - if spec, err := getContainerAnnotations(containerID); err == nil { - namespace = spec.Annotations["io.kubernetes.cri.sandbox-namespace"] - podName = spec.Annotations["io.kubernetes.cri.sandbox-name"] - containerName = spec.Annotations["io.kubernetes.cri.container-name"] - } - - c.annotationsCacheJob[containerID] = containerInfo{ - id: "containerd://" + containerID, - namespace: namespace, - pod: podName, - container: containerName, - } + if err = windows.QueryInformationJobObject( + jobObjectHandle, + windows.JobObjectBasicAndIoAccountingInformation, + uintptr(unsafe.Pointer(&jobInfo)), + uint32(unsafe.Sizeof(jobInfo)), + nil, + ); err != nil { + return fmt.Errorf("error in querying job object information: %w", err) } - var jobInfo kernel32.JobObjectExtendedLimitInformation + var jobMemoryInfo kernel32.JobObjectMemoryUsageInformation - retLen := uint32(unsafe.Sizeof(jobInfo)) - - if err := windows.QueryInformationJobObject( + // https://github.com/microsoft/hcsshim/blob/bfb2a106798d3765666f6e39ec6cf0117275eab4/internal/jobobject/jobobject.go#L410 + if err = windows.QueryInformationJobObject( jobObjectHandle, - windows.JobObjectExtendedLimitInformation, - uintptr(unsafe.Pointer(&jobInfo)), - retLen, &retLen); err != nil { - return err + JobObjectMemoryUsageInformation, + uintptr(unsafe.Pointer(&jobMemoryInfo)), + uint32(unsafe.Sizeof(jobMemoryInfo)), + nil, + ); err != nil { + return fmt.Errorf("error in querying job object memory usage information: %w", err) } privateWorkingSetBytes, err := calculatePrivateWorkingSetBytes(jobObjectHandle) @@ -708,21 +764,21 @@ func (c *Collector) collectJobContainer(ch chan<- prometheus.Metric, containerID ch <- prometheus.MustNewConstMetric( c.containerAvailable, - prometheus.CounterValue, + prometheus.GaugeValue, 1, - containerInfo.id, containerInfo.namespace, containerInfo.pod, containerInfo.container, + containerInfo.id, containerInfo.namespace, containerInfo.pod, containerInfo.container, "true", ) ch <- prometheus.MustNewConstMetric( c.usageCommitBytes, prometheus.GaugeValue, - float64(jobInfo.JobMemoryLimit), + float64(jobMemoryInfo.JobMemory), containerInfo.id, containerInfo.namespace, containerInfo.pod, containerInfo.container, ) ch <- prometheus.MustNewConstMetric( c.usageCommitPeakBytes, prometheus.GaugeValue, - float64(jobInfo.PeakProcessMemoryUsed), + float64(jobMemoryInfo.PeakJobMemoryUsed), containerInfo.id, containerInfo.namespace, containerInfo.pod, containerInfo.container, ) @@ -796,8 +852,8 @@ func getContainerIdWithPrefix(container hcs.Properties) string { } } -func getContainerAnnotations(containerID string) (ociSpec, error) { - configJSON, err := os.OpenFile(containerDStateDir+containerID+`\config.json`, os.O_RDONLY, 0) +func (c *Collector) getContainerAnnotations(containerID string) (ociSpec, error) { + configJSON, err := os.OpenFile(fmt.Sprintf(`%s%s\config.json`, c.config.ContainerDStateDir, containerID), os.O_RDONLY, 0) if err != nil { return ociSpec{}, fmt.Errorf("error in opening config.json file: %w", err) } @@ -831,7 +887,7 @@ func calculatePrivateWorkingSetBytes(jobObjectHandle windows.Handle) (uint64, er retLen = uint32(unsafe.Sizeof(vmCounters)) - getPrivateWorkingSetBytes := func(pid uint32) (uint64, error) { + getMemoryStats := func(pid uint32) (uint64, error) { processHandle, err := windows.OpenProcess(windows.PROCESS_QUERY_LIMITED_INFORMATION, false, pid) if err != nil { return 0, fmt.Errorf("error in opening process: %w", err) @@ -865,7 +921,7 @@ func calculatePrivateWorkingSetBytes(jobObjectHandle windows.Handle) (uint64, er } for _, pid := range pidList.PIDs() { - privateWorkingSetSize, err := getPrivateWorkingSetBytes(pid) + privateWorkingSetSize, err := getMemoryStats(pid) if err != nil { return 0, fmt.Errorf("error in getting private working set bytes: %w", err) } diff --git a/internal/headers/kernel32/job.go b/internal/headers/kernel32/job.go index fb524a4b..a924c24f 100644 --- a/internal/headers/kernel32/job.go +++ b/internal/headers/kernel32/job.go @@ -31,7 +31,13 @@ const ( ) func OpenJobObject(name string) (windows.Handle, error) { - handle, _, err := procOpenJobObject.Call(JobObjectQuery, 0, uintptr(unsafe.Pointer(&name))) + ptr, _ := windows.UTF16PtrFromString(name) + handle, _, err := procOpenJobObject.Call( + JobObjectQuery, + 0, + uintptr(unsafe.Pointer(ptr)), + ) + if handle == 0 { return 0, err } diff --git a/internal/headers/kernel32/types.go b/internal/headers/kernel32/types.go index 2f0d9d85..8c52f3e5 100644 --- a/internal/headers/kernel32/types.go +++ b/internal/headers/kernel32/types.go @@ -17,7 +17,11 @@ package kernel32 -import "unsafe" +import ( + "unsafe" + + "golang.org/x/sys/windows" +) type JobObjectBasicAccountingInformation struct { TotalUserTime uint64 @@ -30,22 +34,18 @@ type JobObjectBasicAccountingInformation struct { TotalTerminatedProcesses uint32 } -type IOCounters struct { - ReadOperationCount uint64 - WriteOperationCount uint64 - OtherOperationCount uint64 - ReadTransferCount uint64 - WriteTransferCount uint64 - OtherTransferCount uint64 +// JobObjectBasicAndIOAccountingInformation is a structure that contains +// both basic accounting information and I/O accounting information +// for a job object. It is used with the QueryInformationJobObject function. +// The structure is defined in the Windows API documentation. +// https://learn.microsoft.com/en-us/windows/win32/api/winnt/ns-winnt-jobobject_basic_and_io_accounting_information +type JobObjectBasicAndIOAccountingInformation struct { + BasicInfo JobObjectBasicAccountingInformation + IoInfo windows.IO_COUNTERS } - -type JobObjectExtendedLimitInformation struct { - BasicInfo JobObjectBasicAccountingInformation - IoInfo IOCounters - ProcessMemoryLimit uint64 - JobMemoryLimit uint64 - PeakProcessMemoryUsed uint64 - PeakJobMemoryUsed uint64 +type JobObjectMemoryUsageInformation struct { + JobMemory uint64 + PeakJobMemoryUsed uint64 } type JobObjectBasicProcessIDList struct {