From 068bcb7237a787f4531abb254f5651a510c0f69d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan-Otto=20Kr=C3=B6pke?= Date: Tue, 20 May 2025 15:37:01 +0200 Subject: [PATCH] tcp: relax metrics collectos on error (#2041) --- internal/collector/tcp/tcp.go | 135 +++++++++++++++----------- internal/collector/udp/udp.go | 45 +++++---- internal/headers/iphlpapi/iphlpapi.go | 14 +-- 3 files changed, 110 insertions(+), 84 deletions(-) diff --git a/internal/collector/tcp/tcp.go b/internal/collector/tcp/tcp.go index 208d545b..8f26d2ed 100644 --- a/internal/collector/tcp/tcp.go +++ b/internal/collector/tcp/tcp.go @@ -33,7 +33,15 @@ import ( "golang.org/x/sys/windows" ) -const Name = "tcp" +const ( + Name = "tcp" + + ipAddressFamilyIPv4 = "ipv4" + ipAddressFamilyIPv6 = "ipv6" + + subCollectorMetrics = "metrics" + subCollectorConnectionsState = "connections_state" +) type Config struct { CollectorsEnabled []string `yaml:"enabled"` @@ -42,8 +50,8 @@ type Config struct { //nolint:gochecknoglobals var ConfigDefaults = Config{ CollectorsEnabled: []string{ - "metrics", - "connections_state", + subCollectorMetrics, + subCollectorConnectionsState, }, } @@ -111,7 +119,7 @@ func (c *Collector) GetName() string { } func (c *Collector) Close() error { - if slices.Contains(c.config.CollectorsEnabled, "metrics") { + if slices.Contains(c.config.CollectorsEnabled, subCollectorMetrics) { c.perfDataCollector4.Close() c.perfDataCollector6.Close() } @@ -120,79 +128,86 @@ func (c *Collector) Close() error { } func (c *Collector) Build(_ *slog.Logger, _ *mi.Session) error { + labels := []string{"af"} + c.connectionFailures = prometheus.NewDesc( prometheus.BuildFQName(types.Namespace, Name, "connection_failures_total"), "(TCP.ConnectionFailures)", - []string{"af"}, + labels, nil, ) c.connectionsActive = prometheus.NewDesc( prometheus.BuildFQName(types.Namespace, Name, "connections_active_total"), "(TCP.ConnectionsActive)", - []string{"af"}, + labels, nil, ) c.connectionsEstablished = prometheus.NewDesc( prometheus.BuildFQName(types.Namespace, Name, "connections_established"), "(TCP.ConnectionsEstablished)", - []string{"af"}, + labels, nil, ) c.connectionsPassive = prometheus.NewDesc( prometheus.BuildFQName(types.Namespace, Name, "connections_passive_total"), "(TCP.ConnectionsPassive)", - []string{"af"}, + labels, nil, ) c.connectionsReset = prometheus.NewDesc( prometheus.BuildFQName(types.Namespace, Name, "connections_reset_total"), "(TCP.ConnectionsReset)", - []string{"af"}, + labels, nil, ) c.segmentsTotal = prometheus.NewDesc( prometheus.BuildFQName(types.Namespace, Name, "segments_total"), "(TCP.SegmentsTotal)", - []string{"af"}, + labels, nil, ) c.segmentsReceivedTotal = prometheus.NewDesc( prometheus.BuildFQName(types.Namespace, Name, "segments_received_total"), "(TCP.SegmentsReceivedTotal)", - []string{"af"}, + labels, nil, ) c.segmentsRetransmittedTotal = prometheus.NewDesc( prometheus.BuildFQName(types.Namespace, Name, "segments_retransmitted_total"), "(TCP.SegmentsRetransmittedTotal)", - []string{"af"}, + labels, nil, ) c.segmentsSentTotal = prometheus.NewDesc( prometheus.BuildFQName(types.Namespace, Name, "segments_sent_total"), "(TCP.SegmentsSentTotal)", - []string{"af"}, + labels, nil, ) c.connectionsStateCount = prometheus.NewDesc( prometheus.BuildFQName(types.Namespace, Name, "connections_state_count"), "Number of TCP connections by state and address family", - []string{"af", "state"}, nil, + []string{"af", "state"}, + nil, ) - var err error + errs := make([]error, 0) - c.perfDataCollector4, err = pdh.NewCollector[perfDataCounterValues](pdh.CounterTypeRaw, "TCPv4", nil) - if err != nil { - return fmt.Errorf("failed to create TCPv4 collector: %w", err) + if slices.Contains(c.config.CollectorsEnabled, subCollectorMetrics) { + var err error + + c.perfDataCollector4, err = pdh.NewCollector[perfDataCounterValues](pdh.CounterTypeRaw, "TCPv4", nil) + if err != nil { + errs = append(errs, fmt.Errorf("failed to create TCPv4 collector: %w", err)) + } + + c.perfDataCollector6, err = pdh.NewCollector[perfDataCounterValues](pdh.CounterTypeRaw, "TCPv6", nil) + if err != nil { + errs = append(errs, fmt.Errorf("failed to create TCPv6 collector: %w", err)) + } } - c.perfDataCollector6, err = pdh.NewCollector[perfDataCounterValues](pdh.CounterTypeRaw, "TCPv6", nil) - if err != nil { - return fmt.Errorf("failed to create TCPv6 collector: %w", err) - } - - return nil + return errors.Join(errs...) } // Collect sends the metric values for each metric @@ -200,13 +215,13 @@ func (c *Collector) Build(_ *slog.Logger, _ *mi.Session) error { func (c *Collector) Collect(ch chan<- prometheus.Metric) error { errs := make([]error, 0) - if slices.Contains(c.config.CollectorsEnabled, "metrics") { + if slices.Contains(c.config.CollectorsEnabled, subCollectorMetrics) { if err := c.collect(ch); err != nil { errs = append(errs, fmt.Errorf("failed collecting tcp metrics: %w", err)) } } - if slices.Contains(c.config.CollectorsEnabled, "connections_state") { + if slices.Contains(c.config.CollectorsEnabled, subCollectorConnectionsState) { if err := c.collectConnectionsState(ch); err != nil { errs = append(errs, fmt.Errorf("failed collecting tcp connection state metrics: %w", err)) } @@ -216,96 +231,100 @@ func (c *Collector) Collect(ch chan<- prometheus.Metric) error { } func (c *Collector) collect(ch chan<- prometheus.Metric) error { - err := c.perfDataCollector4.Collect(&c.perfDataObject4) - if err != nil { - return fmt.Errorf("failed to collect TCPv4 metrics[0]. %w", err) + errs := make([]error, 0) + + if err := c.perfDataCollector4.Collect(&c.perfDataObject4); err != nil { + errs = append(errs, fmt.Errorf("failed to collect TCPv4 metrics. %w", err)) + } else if len(c.perfDataObject4) == 0 { + errs = append(errs, fmt.Errorf("failed to collect TCPv4 metrics: %w", types.ErrNoDataUnexpected)) + } else { + c.writeTCPCounters(ch, c.perfDataObject4, ipAddressFamilyIPv4) } - c.writeTCPCounters(ch, c.perfDataObject4, []string{"ipv4"}) - - err = c.perfDataCollector6.Collect(&c.perfDataObject6) - if err != nil { - return fmt.Errorf("failed to collect TCPv6 metrics[0]. %w", err) + if err := c.perfDataCollector6.Collect(&c.perfDataObject6); err != nil { + errs = append(errs, fmt.Errorf("failed to collect TCPv6 metrics. %w", err)) + } else if len(c.perfDataObject6) == 0 { + errs = append(errs, fmt.Errorf("failed to collect TCPv6 metrics: %w", types.ErrNoDataUnexpected)) + } else { + c.writeTCPCounters(ch, c.perfDataObject6, ipAddressFamilyIPv6) } - c.writeTCPCounters(ch, c.perfDataObject6, []string{"ipv6"}) - - return nil + return errors.Join(errs...) } -func (c *Collector) writeTCPCounters(ch chan<- prometheus.Metric, metrics []perfDataCounterValues, labels []string) { +func (c *Collector) writeTCPCounters(ch chan<- prometheus.Metric, metrics []perfDataCounterValues, af string) { ch <- prometheus.MustNewConstMetric( c.connectionFailures, prometheus.CounterValue, metrics[0].ConnectionFailures, - labels..., + af, ) ch <- prometheus.MustNewConstMetric( c.connectionsActive, prometheus.CounterValue, metrics[0].ConnectionsActive, - labels..., + af, ) ch <- prometheus.MustNewConstMetric( c.connectionsEstablished, prometheus.GaugeValue, metrics[0].ConnectionsEstablished, - labels..., + af, ) ch <- prometheus.MustNewConstMetric( c.connectionsPassive, prometheus.CounterValue, metrics[0].ConnectionsPassive, - labels..., + af, ) ch <- prometheus.MustNewConstMetric( c.connectionsReset, prometheus.CounterValue, metrics[0].ConnectionsReset, - labels..., + af, ) ch <- prometheus.MustNewConstMetric( c.segmentsTotal, prometheus.CounterValue, metrics[0].SegmentsPerSec, - labels..., + af, ) ch <- prometheus.MustNewConstMetric( c.segmentsReceivedTotal, prometheus.CounterValue, metrics[0].SegmentsReceivedPerSec, - labels..., + af, ) ch <- prometheus.MustNewConstMetric( c.segmentsRetransmittedTotal, prometheus.CounterValue, metrics[0].SegmentsRetransmittedPerSec, - labels..., + af, ) ch <- prometheus.MustNewConstMetric( c.segmentsSentTotal, prometheus.CounterValue, metrics[0].SegmentsSentPerSec, - labels..., + af, ) } func (c *Collector) collectConnectionsState(ch chan<- prometheus.Metric) error { - stateCounts, err := iphlpapi.GetTCPConnectionStates(windows.AF_INET) - if err != nil { - return fmt.Errorf("failed to collect TCP connection states for %s: %w", "ipv4", err) + errs := make([]error, 0) + + if stateCounts, err := iphlpapi.GetTCPConnectionStates(windows.AF_INET); err != nil { + errs = append(errs, fmt.Errorf("failed to collect TCP connection states for %s: %w", ipAddressFamilyIPv4, err)) + } else { + c.sendTCPStateMetrics(ch, stateCounts, ipAddressFamilyIPv4) } - c.sendTCPStateMetrics(ch, stateCounts, "ipv4") - - stateCounts, err = iphlpapi.GetTCPConnectionStates(windows.AF_INET6) - if err != nil { - return fmt.Errorf("failed to collect TCP6 connection states for %s: %w", "ipv6", err) + if stateCounts, err := iphlpapi.GetTCPConnectionStates(windows.AF_INET6); err != nil { + errs = append(errs, fmt.Errorf("failed to collect TCP6 connection states for %s: %w", ipAddressFamilyIPv6, err)) + } else { + c.sendTCPStateMetrics(ch, stateCounts, ipAddressFamilyIPv6) } - c.sendTCPStateMetrics(ch, stateCounts, "ipv6") - - return nil + return errors.Join(errs...) } func (c *Collector) sendTCPStateMetrics(ch chan<- prometheus.Metric, stateCounts map[iphlpapi.MIB_TCP_STATE]uint32, af string) { diff --git a/internal/collector/udp/udp.go b/internal/collector/udp/udp.go index 0a7c656c..afa53107 100644 --- a/internal/collector/udp/udp.go +++ b/internal/collector/udp/udp.go @@ -18,6 +18,7 @@ package udp import ( + "errors" "fmt" "log/slog" @@ -107,19 +108,21 @@ func (c *Collector) Build(_ *slog.Logger, _ *mi.Session) error { nil, ) + errs := make([]error, 0) + var err error c.perfDataCollector4, err = pdh.NewCollector[perfDataCounterValues](pdh.CounterTypeRaw, "UDPv4", nil) if err != nil { - return fmt.Errorf("failed to create UDPv4 collector: %w", err) + errs = append(errs, fmt.Errorf("failed to create UDPv4 collector: %w", err)) } c.perfDataCollector6, err = pdh.NewCollector[perfDataCounterValues](pdh.CounterTypeRaw, "UDPv6", nil) if err != nil { - return fmt.Errorf("failed to create UDPv6 collector: %w", err) + errs = append(errs, fmt.Errorf("failed to create UDPv6 collector: %w", err)) } - return nil + return errors.Join(errs...) } // Collect sends the metric values for each metric @@ -129,46 +132,50 @@ func (c *Collector) Collect(ch chan<- prometheus.Metric) error { } func (c *Collector) collect(ch chan<- prometheus.Metric) error { - err := c.perfDataCollector4.Collect(&c.perfDataObject4) - if err != nil { - return fmt.Errorf("failed to collect UDPv4 metrics: %w", err) + errs := make([]error, 0) + + if err := c.perfDataCollector4.Collect(&c.perfDataObject4); err != nil { + errs = append(errs, fmt.Errorf("failed to collect UDPv4 metrics: %w", err)) + } else if len(c.perfDataObject4) == 0 { + errs = append(errs, fmt.Errorf("failed to collect UDPv4 metrics: %w", types.ErrNoDataUnexpected)) + } else { + c.writeUDPCounters(ch, c.perfDataObject4, "ipv4") } - c.writeUDPCounters(ch, c.perfDataObject4, []string{"ipv4"}) - - err = c.perfDataCollector6.Collect(&c.perfDataObject6) - if err != nil { - return fmt.Errorf("failed to collect UDPv6 metrics: %w", err) + if err := c.perfDataCollector6.Collect(&c.perfDataObject6); err != nil { + errs = append(errs, fmt.Errorf("failed to collect UDPv6 metrics: %w", err)) + } else if len(c.perfDataObject6) == 0 { + errs = append(errs, fmt.Errorf("failed to collect UDPv6 metrics: %w", types.ErrNoDataUnexpected)) + } else { + c.writeUDPCounters(ch, c.perfDataObject6, "ipv6") } - c.writeUDPCounters(ch, c.perfDataObject6, []string{"ipv6"}) - - return nil + return errors.Join(errs...) } -func (c *Collector) writeUDPCounters(ch chan<- prometheus.Metric, metrics []perfDataCounterValues, labels []string) { +func (c *Collector) writeUDPCounters(ch chan<- prometheus.Metric, metrics []perfDataCounterValues, af string) { ch <- prometheus.MustNewConstMetric( c.datagramsNoPortTotal, prometheus.CounterValue, metrics[0].DatagramsNoPortPerSec, - labels..., + af, ) ch <- prometheus.MustNewConstMetric( c.datagramsReceivedErrorsTotal, prometheus.CounterValue, metrics[0].DatagramsReceivedErrors, - labels..., + af, ) ch <- prometheus.MustNewConstMetric( c.datagramsReceivedTotal, prometheus.GaugeValue, metrics[0].DatagramsReceivedPerSec, - labels..., + af, ) ch <- prometheus.MustNewConstMetric( c.datagramsSentTotal, prometheus.CounterValue, metrics[0].DatagramsSentPerSec, - labels..., + af, ) } diff --git a/internal/headers/iphlpapi/iphlpapi.go b/internal/headers/iphlpapi/iphlpapi.go index 8bd71167..b6d8d73d 100644 --- a/internal/headers/iphlpapi/iphlpapi.go +++ b/internal/headers/iphlpapi/iphlpapi.go @@ -41,7 +41,7 @@ func GetTCPConnectionStates(family uint32) (map[MIB_TCP_STATE]uint32, error) { case windows.AF_INET: table, err := getExtendedTcpTable[MIB_TCPROW_OWNER_PID](family, TCPTableOwnerPIDAll) if err != nil { - return nil, err + return nil, fmt.Errorf("failed getExtendedTcpTable: %w", err) } for _, row := range table { @@ -52,7 +52,7 @@ func GetTCPConnectionStates(family uint32) (map[MIB_TCP_STATE]uint32, error) { case windows.AF_INET6: table, err := getExtendedTcpTable[MIB_TCP6ROW_OWNER_PID](family, TCPTableOwnerPIDAll) if err != nil { - return nil, err + return nil, fmt.Errorf("failed getExtendedTcpTable: %w", err) } for _, row := range table { @@ -102,12 +102,12 @@ func getExtendedTcpTable[T any](ulAf uint32, tableClass uint32) ([]T, error) { var size uint32 ret, _, _ := procGetExtendedTcpTable.Call( - uintptr(0), + 0, uintptr(unsafe.Pointer(&size)), - uintptr(0), + 0, uintptr(ulAf), uintptr(tableClass), - uintptr(0), + 0, ) if ret != uintptr(windows.ERROR_INSUFFICIENT_BUFFER) { @@ -119,10 +119,10 @@ func getExtendedTcpTable[T any](ulAf uint32, tableClass uint32) ([]T, error) { ret, _, _ = procGetExtendedTcpTable.Call( uintptr(unsafe.Pointer(&buf[0])), uintptr(unsafe.Pointer(&size)), - uintptr(0), + 0, uintptr(ulAf), uintptr(tableClass), - uintptr(0), + 0, ) if ret != 0 {