container: fix collector (#2057)

Author: Jan-Otto Kröpke
Date: 2025-05-24 11:29:52 +02:00
Committed by: GitHub
Parent: 6dd21a8e00
Commit: 5e1a802237

4 changed files with 129 additions and 66 deletions


@@ -1,6 +1,7 @@
<component name="ProjectDictionaryState">
<dictionary name="project">
<words>
+<w>containerd</w>
<w>spdx</w>
</words>
</dictionary>


@@ -46,11 +46,13 @@ const (
subCollectorHCS = "hcs"
subCollectorHostprocess = "hostprocess"
-containerDStateDir = `C:\ProgramData\containerd\state\io.containerd.runtime.v2.task\k8s.io\`
+JobObjectMemoryUsageInformation = 28
)
type Config struct {
-CollectorsEnabled []string `yaml:"collectors_enabled"`
+CollectorsEnabled []string `yaml:"enabled"`
+ContainerDStateDir string `yaml:"containerd-state-dir"`
}
//nolint:gochecknoglobals
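The Config change above renames the sub-collector list's YAML key from collectors_enabled to enabled and makes the containerd state directory configurable. A minimal standalone sketch (assuming gopkg.in/yaml.v3; in the exporter this struct is decoded as part of the larger collector config, not on its own) of how a config block maps onto the new tags:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

// Config mirrors the struct above, with the renamed YAML keys.
type Config struct {
	CollectorsEnabled  []string `yaml:"enabled"`
	ContainerDStateDir string   `yaml:"containerd-state-dir"`
}

func main() {
	raw := []byte(
		"enabled: [hcs, hostprocess]\n" +
			`containerd-state-dir: 'C:\ProgramData\containerd\state\io.containerd.runtime.v2.task\k8s.io\'` + "\n",
	)

	var cfg Config
	if err := yaml.Unmarshal(raw, &cfg); err != nil {
		panic(err)
	}

	fmt.Printf("%+v\n", cfg)
}
```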
@@ -59,6 +61,7 @@ var ConfigDefaults = Config{
subCollectorHCS,
subCollectorHostprocess,
},
+ContainerDStateDir: `C:\ProgramData\containerd\state\io.containerd.runtime.v2.task\k8s.io\`,
}
// A Collector is a Prometheus Collector for containers metrics.
@@ -142,6 +145,11 @@ func NewWithFlags(app *kingpin.Application) *Collector {
"Comma-separated list of collectors to use. Defaults to all, if not specified.",
).Default(strings.Join(ConfigDefaults.CollectorsEnabled, ",")).StringVar(&collectorsEnabled)
app.Flag(
"collector.container.containerd-state-dir",
"Path to the containerd state directory. Defaults to C:\\ProgramData\\containerd\\state\\io.containerd.runtime.v2.task\\k8s.io\\",
).Default(ConfigDefaults.ContainerDStateDir).StringVar(&c.config.ContainerDStateDir)
app.Action(func(*kingpin.ParseContext) error {
c.config.CollectorsEnabled = strings.Split(collectorsEnabled, ",")
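The new flag is plain kingpin wiring: the compiled-in default appears in --help and a run-time value overrides c.config.ContainerDStateDir. A minimal runnable sketch of the same registration pattern, assuming the github.com/alecthomas/kingpin/v2 API used above:

```go
package main

import (
	"fmt"
	"os"

	"github.com/alecthomas/kingpin/v2"
)

func main() {
	app := kingpin.New("sketch", "flag-wiring sketch")

	var stateDir string

	// Same pattern as the registration above: the compiled-in default is
	// shown in --help and overridden by an explicit flag value.
	app.Flag(
		"collector.container.containerd-state-dir",
		"Path to the containerd state directory.",
	).Default(`C:\ProgramData\containerd\state\io.containerd.runtime.v2.task\k8s.io\`).StringVar(&stateDir)

	kingpin.MustParse(app.Parse(os.Args[1:]))
	fmt.Println("state dir:", stateDir)
}
```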
@@ -168,10 +176,13 @@ func (c *Collector) Build(logger *slog.Logger, _ *mi.Session) error {
}
}
+c.annotationsCacheHCS = make(map[string]containerInfo)
+c.annotationsCacheJob = make(map[string]containerInfo)
c.containerAvailable = prometheus.NewDesc(
prometheus.BuildFQName(types.Namespace, Name, "available"),
"Available",
[]string{"container_id", "namespace", "pod", "container"},
[]string{"container_id", "namespace", "pod", "container", "hostprocess"},
nil,
)
c.containersCount = prometheus.NewDesc(
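Two of this commit's fixes meet at this Desc: the label set gains a hostprocess dimension, and further down container_available switches from counter to gauge. A small sketch, assuming only github.com/prometheus/client_golang, of the invariant both collect paths must now satisfy — every const metric against the Desc supplies exactly five label values, in order. The FQName shown is what prometheus.BuildFQName(types.Namespace, Name, "available") expands to here, assuming the exporter's usual windows namespace:

```go
package sketch

import "github.com/prometheus/client_golang/prometheus"

// containerAvailable mirrors the Desc above, including the new
// "hostprocess" variable label.
var containerAvailable = prometheus.NewDesc(
	"windows_container_available",
	"Available",
	[]string{"container_id", "namespace", "pod", "container", "hostprocess"},
	nil,
)

// emitAvailable shows the invariant: label values match the Desc in
// count and order, and the value is a gauge (a state flag), not a
// monotonically increasing counter.
func emitAvailable(ch chan<- prometheus.Metric, id, namespace, pod, container string, hostprocess bool) {
	hostprocessLabel := "false"
	if hostprocess {
		hostprocessLabel = "true"
	}

	ch <- prometheus.MustNewConstMetric(
		containerAvailable,
		prometheus.GaugeValue,
		1,
		id, namespace, pod, container, hostprocessLabel,
	)
}
```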
@@ -321,7 +332,7 @@ func (c *Collector) collectHCS(ch chan<- prometheus.Metric) error {
var countersCount float64
containerIDs := make([]string, 0, len(containers))
-collectErrors := make([]error, 0, len(containers))
+collectErrors := make([]error, 0)
for _, container := range containers {
if container.State != "Running" {
@@ -339,7 +350,7 @@ func (c *Collector) collectHCS(ch chan<- prometheus.Metric) error {
)
if _, ok := c.annotationsCacheHCS[container.ID]; !ok {
-if spec, err := getContainerAnnotations(container.ID); err == nil {
+if spec, err := c.getContainerAnnotations(container.ID); err == nil {
namespace = spec.Annotations["io.kubernetes.cri.sandbox-namespace"]
podName = spec.Annotations["io.kubernetes.cri.sandbox-name"]
containerName = spec.Annotations["io.kubernetes.cri.container-name"]
@@ -357,11 +368,17 @@ func (c *Collector) collectHCS(ch chan<- prometheus.Metric) error {
if errors.Is(err, hcs.ErrIDNotFound) {
c.logger.Debug("err in fetching container statistics",
slog.String("container_id", container.ID),
slog.String("container_name", c.annotationsCacheHCS[container.ID].container),
slog.String("container_pod_name", c.annotationsCacheHCS[container.ID].pod),
slog.String("container_namespace", c.annotationsCacheHCS[container.ID].namespace),
slog.Any("err", err),
)
} else {
c.logger.Error("err in fetching container statistics",
slog.String("container_id", container.ID),
slog.String("container_name", c.annotationsCacheHCS[container.ID].container),
slog.String("container_pod_name", c.annotationsCacheHCS[container.ID].pod),
slog.String("container_namespace", c.annotationsCacheHCS[container.ID].namespace),
slog.Any("err", err),
)
@@ -397,6 +414,18 @@ func (c *Collector) collectHCS(ch chan<- prometheus.Metric) error {
}
func (c *Collector) collectHCSContainer(ch chan<- prometheus.Metric, containerDetails hcs.Properties, containerInfo containerInfo) error {
+// Skip if the container is a pause container
+if containerInfo.pod != "" && containerInfo.container == "" {
+c.logger.Debug("skipping pause container",
+slog.String("container_id", containerDetails.ID),
+slog.String("container_name", containerInfo.container),
+slog.String("pod_name", containerInfo.pod),
+slog.String("namespace", containerInfo.namespace),
+)
+return nil
+}
containerStats, err := hcs.GetContainerStatistics(containerDetails.ID)
if err != nil {
return fmt.Errorf("error fetching container statistics: %w", err)
@@ -404,9 +433,9 @@ func (c *Collector) collectHCSContainer(ch chan<- prometheus.Metric, containerDe
ch <- prometheus.MustNewConstMetric(
c.containerAvailable,
-prometheus.CounterValue,
+prometheus.GaugeValue,
1,
-containerInfo.id, containerInfo.namespace, containerInfo.pod, containerInfo.container,
+containerInfo.id, containerInfo.namespace, containerInfo.pod, containerInfo.container, "false",
)
ch <- prometheus.MustNewConstMetric(
c.usageCommitBytes,
@@ -483,9 +512,6 @@ func (c *Collector) collectHCSContainer(ch chan<- prometheus.Metric, containerDe
}
// collectNetworkMetrics collects network metrics for containers.
// With HNSv2, the network stats must be collected from hcsshim.HNSListEndpointRequest.
// Network statistics from the container.Statistics() are providing data only, if HNSv1 is used.
// Ref: https://github.com/prometheus-community/windows_exporter/pull/1218
func (c *Collector) collectNetworkMetrics(ch chan<- prometheus.Metric) error {
endpoints, err := hcn.EnumerateEndpoints()
if err != nil {
@@ -506,6 +532,10 @@ func (c *Collector) collectNetworkMetrics(ch chan<- prometheus.Metric) error {
continue
}
+if len(properties.SharedContainers) == 0 {
+continue
+}
var nicGUID *guid.GUID
for _, allocator := range properties.Resources.Allocators {
@@ -547,6 +577,11 @@ func (c *Collector) collectNetworkMetrics(ch chan<- prometheus.Metric) error {
continue
}
+// Skip if the container is a pause container
+if containerInfo.pod != "" && containerInfo.container == "" {
+continue
+}
endpointId := strings.ToUpper(endpoint.String())
ch <- prometheus.MustNewConstMetric(
@@ -598,7 +633,7 @@ func (c *Collector) collectNetworkMetrics(ch chan<- prometheus.Metric) error {
//
// Job containers are containers that aren't managed by HCS, e.g host process containers.
func (c *Collector) collectJobContainers(ch chan<- prometheus.Metric) error {
-containerDStateFS := os.DirFS(containerDStateDir)
+containerDStateFS := os.DirFS(c.config.ContainerDStateDir)
allContainerIDs := make([]string, 0, len(c.annotationsCacheJob)+len(c.annotationsCacheHCS))
jobContainerIDs := make([]string, 0, len(allContainerIDs))
@@ -607,7 +642,7 @@ func (c *Collector) collectJobContainers(ch chan<- prometheus.Metric) error {
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
c.logger.Warn("containerd state directory does not exist",
slog.String("path", containerDStateDir),
slog.String("path", c.config.ContainerDStateDir),
slog.Any("err", err),
)
@@ -617,13 +652,42 @@ func (c *Collector) collectJobContainers(ch chan<- prometheus.Metric) error {
return err
}
+if path == "." {
+return nil
+}
if !d.IsDir() {
return nil
}
if _, err := os.Stat(path + "\\config.json"); err != nil {
-containerID := strings.TrimPrefix(strings.Replace(path, containerDStateDir, "", 1), `\`)
-allContainerIDs = append(allContainerIDs, containerID)
+containerID := strings.TrimPrefix(strings.Replace(path, c.config.ContainerDStateDir, "", 1), `\`)
+if spec, err := c.getContainerAnnotations(containerID); err == nil {
+isHostProcess, ok := spec.Annotations["microsoft.com/hostprocess-container"]
+if ok && isHostProcess == "true" {
+allContainerIDs = append(allContainerIDs, containerID)
+if _, ok := c.annotationsCacheJob[containerID]; !ok {
+var (
+namespace string
+podName string
+containerName string
+)
+namespace = spec.Annotations["io.kubernetes.cri.sandbox-namespace"]
+podName = spec.Annotations["io.kubernetes.cri.sandbox-name"]
+containerName = spec.Annotations["io.kubernetes.cri.container-name"]
+c.annotationsCacheJob[containerID] = containerInfo{
+id: "containerd://" + containerID,
+namespace: namespace,
+pod: podName,
+container: containerName,
+}
+}
+}
}
}
// Skip the directory content
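The enlarged walk classifies containers at discovery time: a directory only counts as a job container when its OCI spec carries the microsoft.com/hostprocess-container annotation, and the Kubernetes identity is cached in annotationsCacheJob up front — which is what lets the per-scrape caching block in collectJobContainer be deleted below. The new branch, condensed into a helper (containerInfo and ociSpec are the types from this file):

```go
// cacheJobContainer condenses the new branch: only specs annotated as
// HostProcess containers are kept, and the pod identity is lifted out of
// the CRI sandbox annotations once, at discovery time.
func cacheJobContainer(cache map[string]containerInfo, ids []string, containerID string, spec ociSpec) []string {
	if spec.Annotations["microsoft.com/hostprocess-container"] != "true" {
		return ids
	}

	ids = append(ids, containerID)

	if _, ok := cache[containerID]; !ok {
		cache[containerID] = containerInfo{
			id:        "containerd://" + containerID,
			namespace: spec.Annotations["io.kubernetes.cri.sandbox-namespace"],
			pod:       spec.Annotations["io.kubernetes.cri.sandbox-name"],
			container: spec.Annotations["io.kubernetes.cri.container-name"],
		}
	}

	return ids
}
```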
@@ -653,7 +717,7 @@ func (c *Collector) collectJobContainers(ch chan<- prometheus.Metric) error {
}
func (c *Collector) collectJobContainer(ch chan<- prometheus.Metric, containerID string) error {
jobObjectHandle, err := kernel32.OpenJobObject("JobContainer_" + containerID)
jobObjectHandle, err := kernel32.OpenJobObject("Global\\JobContainer_" + containerID)
if err != nil {
if errors.Is(err, windows.ERROR_FILE_NOT_FOUND) {
return nil
@@ -666,37 +730,29 @@ func (c *Collector) collectJobContainer(ch chan<- prometheus.Metric, containerID
_ = windows.Close(fd)
}(jobObjectHandle)
-if _, ok := c.annotationsCacheJob[containerID]; !ok {
-var (
-namespace string
-podName string
-containerName string
-)
+var jobInfo kernel32.JobObjectBasicAndIOAccountingInformation
-if spec, err := getContainerAnnotations(containerID); err == nil {
-namespace = spec.Annotations["io.kubernetes.cri.sandbox-namespace"]
-podName = spec.Annotations["io.kubernetes.cri.sandbox-name"]
-containerName = spec.Annotations["io.kubernetes.cri.container-name"]
-}
-c.annotationsCacheJob[containerID] = containerInfo{
-id: "containerd://" + containerID,
-namespace: namespace,
-pod: podName,
-container: containerName,
-}
+if err = windows.QueryInformationJobObject(
+jobObjectHandle,
+windows.JobObjectBasicAndIoAccountingInformation,
+uintptr(unsafe.Pointer(&jobInfo)),
+uint32(unsafe.Sizeof(jobInfo)),
+nil,
+); err != nil {
+return fmt.Errorf("error in querying job object information: %w", err)
+}
-var jobInfo kernel32.JobObjectExtendedLimitInformation
+var jobMemoryInfo kernel32.JobObjectMemoryUsageInformation
-retLen := uint32(unsafe.Sizeof(jobInfo))
-if err := windows.QueryInformationJobObject(
+// https://github.com/microsoft/hcsshim/blob/bfb2a106798d3765666f6e39ec6cf0117275eab4/internal/jobobject/jobobject.go#L410
+if err = windows.QueryInformationJobObject(
jobObjectHandle,
-windows.JobObjectExtendedLimitInformation,
-uintptr(unsafe.Pointer(&jobInfo)),
-retLen, &retLen); err != nil {
-return err
+JobObjectMemoryUsageInformation,
+uintptr(unsafe.Pointer(&jobMemoryInfo)),
+uint32(unsafe.Sizeof(jobMemoryInfo)),
+nil,
+); err != nil {
+return fmt.Errorf("error in querying job object memory usage information: %w", err)
}
privateWorkingSetBytes, err := calculatePrivateWorkingSetBytes(jobObjectHandle)
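This is the heart of the fix: the old code queried JOBOBJECT_EXTENDED_LIMIT_INFORMATION and exported JobMemoryLimit and PeakProcessMemoryUsed, which are limit fields, not usage. The replacement reads the job's actual commit through info class 28 (JOBOBJECT_MEMORY_USAGE_INFORMATION, hardcoded because golang.org/x/sys/windows does not export it; the linked hcsshim source uses the same value). A self-contained sketch of that query:

```go
package sketch

import (
	"unsafe"

	"golang.org/x/sys/windows"
)

// Mirrors kernel32.JobObjectMemoryUsageInformation from this commit:
// current and peak committed memory across all processes in the job.
type jobObjectMemoryUsageInformation struct {
	JobMemory         uint64
	PeakJobMemoryUsed uint64
}

// JOBOBJECTINFOCLASS value for JOBOBJECT_MEMORY_USAGE_INFORMATION;
// not exported by golang.org/x/sys/windows.
const jobObjectMemoryUsageInformationClass = 28

// queryJobMemory returns actual usage, unlike the extended limit
// information the old code read, which reports configured limits.
func queryJobMemory(job windows.Handle) (jobObjectMemoryUsageInformation, error) {
	var info jobObjectMemoryUsageInformation

	err := windows.QueryInformationJobObject(
		job,
		jobObjectMemoryUsageInformationClass,
		uintptr(unsafe.Pointer(&info)),
		uint32(unsafe.Sizeof(info)),
		nil,
	)

	return info, err
}
```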
@@ -708,21 +764,21 @@ func (c *Collector) collectJobContainer(ch chan<- prometheus.Metric, containerID
ch <- prometheus.MustNewConstMetric(
c.containerAvailable,
-prometheus.CounterValue,
+prometheus.GaugeValue,
1,
-containerInfo.id, containerInfo.namespace, containerInfo.pod, containerInfo.container,
+containerInfo.id, containerInfo.namespace, containerInfo.pod, containerInfo.container, "true",
)
ch <- prometheus.MustNewConstMetric(
c.usageCommitBytes,
prometheus.GaugeValue,
-float64(jobInfo.JobMemoryLimit),
+float64(jobMemoryInfo.JobMemory),
containerInfo.id, containerInfo.namespace, containerInfo.pod, containerInfo.container,
)
ch <- prometheus.MustNewConstMetric(
c.usageCommitPeakBytes,
prometheus.GaugeValue,
-float64(jobInfo.PeakProcessMemoryUsed),
+float64(jobMemoryInfo.PeakJobMemoryUsed),
containerInfo.id, containerInfo.namespace, containerInfo.pod, containerInfo.container,
)
@@ -796,8 +852,8 @@ func getContainerIdWithPrefix(container hcs.Properties) string {
}
}
-func getContainerAnnotations(containerID string) (ociSpec, error) {
-configJSON, err := os.OpenFile(containerDStateDir+containerID+`\config.json`, os.O_RDONLY, 0)
+func (c *Collector) getContainerAnnotations(containerID string) (ociSpec, error) {
+configJSON, err := os.OpenFile(fmt.Sprintf(`%s%s\config.json`, c.config.ContainerDStateDir, containerID), os.O_RDONLY, 0)
if err != nil {
return ociSpec{}, fmt.Errorf("error in opening config.json file: %w", err)
}
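getContainerAnnotations becomes a method so the path comes from the runtime config instead of the deleted constant. What the helper does, reduced to a standalone sketch (ociSpec is a stand-in matching the type used in this file; containerd writes one OCI runtime spec per task as config.json):

```go
package sketch

import (
	"encoding/json"
	"fmt"
	"os"
)

// ociSpec keeps only what the collector needs from the OCI runtime spec.
type ociSpec struct {
	Annotations map[string]string `json:"annotations"`
}

// readAnnotations reads <stateDir><containerID>\config.json and returns
// its annotations, mirroring the method above.
func readAnnotations(stateDir, containerID string) (ociSpec, error) {
	configJSON, err := os.Open(fmt.Sprintf(`%s%s\config.json`, stateDir, containerID))
	if err != nil {
		return ociSpec{}, fmt.Errorf("error in opening config.json file: %w", err)
	}
	defer configJSON.Close()

	var spec ociSpec
	if err := json.NewDecoder(configJSON).Decode(&spec); err != nil {
		return ociSpec{}, fmt.Errorf("error decoding config.json: %w", err)
	}

	return spec, nil
}
```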
@@ -831,7 +887,7 @@ func calculatePrivateWorkingSetBytes(jobObjectHandle windows.Handle) (uint64, er
retLen = uint32(unsafe.Sizeof(vmCounters))
-getPrivateWorkingSetBytes := func(pid uint32) (uint64, error) {
+getMemoryStats := func(pid uint32) (uint64, error) {
processHandle, err := windows.OpenProcess(windows.PROCESS_QUERY_LIMITED_INFORMATION, false, pid)
if err != nil {
return 0, fmt.Errorf("error in opening process: %w", err)
@@ -865,7 +921,7 @@ func calculatePrivateWorkingSetBytes(jobObjectHandle windows.Handle) (uint64, er
}
for _, pid := range pidList.PIDs() {
-privateWorkingSetSize, err := getPrivateWorkingSetBytes(pid)
+privateWorkingSetSize, err := getMemoryStats(pid)
if err != nil {
return 0, fmt.Errorf("error in getting private working set bytes: %w", err)
}
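Only the helper's name changes here (getPrivateWorkingSetBytes to getMemoryStats); the aggregation is untouched: enumerate the job object's PIDs and sum one per-process value. The loop, condensed (getMemoryStats stands in for the per-process VM-counters query):

```go
package sketch

import "fmt"

// sumPrivateWorkingSet condenses the loop above: one per-process query
// per PID in the job object, accumulated into the job-wide total.
func sumPrivateWorkingSet(pids []uint32, getMemoryStats func(pid uint32) (uint64, error)) (uint64, error) {
	var total uint64

	for _, pid := range pids {
		privateWorkingSetSize, err := getMemoryStats(pid)
		if err != nil {
			return 0, fmt.Errorf("error in getting private working set bytes: %w", err)
		}

		total += privateWorkingSetSize
	}

	return total, nil
}
```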


@@ -31,7 +31,13 @@ const (
)
func OpenJobObject(name string) (windows.Handle, error) {
-handle, _, err := procOpenJobObject.Call(JobObjectQuery, 0, uintptr(unsafe.Pointer(&name)))
+ptr, _ := windows.UTF16PtrFromString(name)
+handle, _, err := procOpenJobObject.Call(
+JobObjectQuery,
+0,
+uintptr(unsafe.Pointer(ptr)),
+)
if handle == 0 {
return 0, err
}
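Two bugs are fixed in this wrapper at once: the caller-visible one, that job objects for containers live in the global kernel object namespace (hence the Global\ prefix added in collectJobContainer above), and the Win32 one, that OpenJobObjectW expects an LPCWSTR, so the name must go through UTF16PtrFromString rather than taking the address of the Go string header. A self-contained sketch of the corrected call (the job object name in main is hypothetical):

```go
package main

import (
	"fmt"
	"unsafe"

	"golang.org/x/sys/windows"
)

const jobObjectQuery = 0x0004 // JOB_OBJECT_QUERY access right

var (
	kernel32          = windows.NewLazySystemDLL("kernel32.dll")
	procOpenJobObject = kernel32.NewProc("OpenJobObjectW")
)

func openJobObject(name string) (windows.Handle, error) {
	// Convert to UTF-16 first; passing &name would hand the API the Go
	// string header, not the characters.
	ptr, err := windows.UTF16PtrFromString(name)
	if err != nil {
		return 0, err
	}

	handle, _, callErr := procOpenJobObject.Call(
		jobObjectQuery,               // dwDesiredAccess
		0,                            // bInheritHandle = FALSE
		uintptr(unsafe.Pointer(ptr)), // lpName as LPCWSTR
	)
	if handle == 0 {
		return 0, callErr
	}

	return windows.Handle(handle), nil
}

func main() {
	h, err := openJobObject(`Global\JobContainer_example`) // hypothetical name
	if err != nil {
		fmt.Println("open failed:", err)
		return
	}
	defer windows.Close(h)
}
```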


@@ -17,7 +17,11 @@
package kernel32
import "unsafe"
import (
"unsafe"
"golang.org/x/sys/windows"
)
type JobObjectBasicAccountingInformation struct {
TotalUserTime uint64
@@ -30,22 +34,18 @@ type JobObjectBasicAccountingInformation struct {
TotalTerminatedProcesses uint32
}
type IOCounters struct {
ReadOperationCount uint64
WriteOperationCount uint64
OtherOperationCount uint64
ReadTransferCount uint64
WriteTransferCount uint64
OtherTransferCount uint64
// JobObjectBasicAndIOAccountingInformation is a structure that contains
// both basic accounting information and I/O accounting information
// for a job object. It is used with the QueryInformationJobObject function.
// The structure is defined in the Windows API documentation.
// https://learn.microsoft.com/en-us/windows/win32/api/winnt/ns-winnt-jobobject_basic_and_io_accounting_information
type JobObjectBasicAndIOAccountingInformation struct {
BasicInfo JobObjectBasicAccountingInformation
IoInfo windows.IO_COUNTERS
}
-type JobObjectExtendedLimitInformation struct {
-BasicInfo JobObjectBasicAccountingInformation
-IoInfo IOCounters
-ProcessMemoryLimit uint64
-JobMemoryLimit uint64
-PeakProcessMemoryUsed uint64
-PeakJobMemoryUsed uint64
-}
+type JobObjectMemoryUsageInformation struct {
+JobMemory uint64
+PeakJobMemoryUsed uint64
+}
type JobObjectBasicProcessIDList struct {