tcp: relax metrics collectors on error (#2041)

This commit is contained in:
Jan-Otto Kröpke
2025-05-20 15:37:01 +02:00
committed by GitHub
parent 898e16bcb1
commit 068bcb7237
3 changed files with 110 additions and 84 deletions

View File

@@ -33,7 +33,15 @@ import (
"golang.org/x/sys/windows" "golang.org/x/sys/windows"
) )
const Name = "tcp" const (
Name = "tcp"
ipAddressFamilyIPv4 = "ipv4"
ipAddressFamilyIPv6 = "ipv6"
subCollectorMetrics = "metrics"
subCollectorConnectionsState = "connections_state"
)
type Config struct { type Config struct {
CollectorsEnabled []string `yaml:"enabled"` CollectorsEnabled []string `yaml:"enabled"`
@@ -42,8 +50,8 @@ type Config struct {
//nolint:gochecknoglobals //nolint:gochecknoglobals
var ConfigDefaults = Config{ var ConfigDefaults = Config{
CollectorsEnabled: []string{ CollectorsEnabled: []string{
"metrics", subCollectorMetrics,
"connections_state", subCollectorConnectionsState,
}, },
} }
@@ -111,7 +119,7 @@ func (c *Collector) GetName() string {
} }
func (c *Collector) Close() error { func (c *Collector) Close() error {
if slices.Contains(c.config.CollectorsEnabled, "metrics") { if slices.Contains(c.config.CollectorsEnabled, subCollectorMetrics) {
c.perfDataCollector4.Close() c.perfDataCollector4.Close()
c.perfDataCollector6.Close() c.perfDataCollector6.Close()
} }
@@ -120,79 +128,86 @@ func (c *Collector) Close() error {
} }
func (c *Collector) Build(_ *slog.Logger, _ *mi.Session) error { func (c *Collector) Build(_ *slog.Logger, _ *mi.Session) error {
labels := []string{"af"}
c.connectionFailures = prometheus.NewDesc( c.connectionFailures = prometheus.NewDesc(
prometheus.BuildFQName(types.Namespace, Name, "connection_failures_total"), prometheus.BuildFQName(types.Namespace, Name, "connection_failures_total"),
"(TCP.ConnectionFailures)", "(TCP.ConnectionFailures)",
[]string{"af"}, labels,
nil, nil,
) )
c.connectionsActive = prometheus.NewDesc( c.connectionsActive = prometheus.NewDesc(
prometheus.BuildFQName(types.Namespace, Name, "connections_active_total"), prometheus.BuildFQName(types.Namespace, Name, "connections_active_total"),
"(TCP.ConnectionsActive)", "(TCP.ConnectionsActive)",
[]string{"af"}, labels,
nil, nil,
) )
c.connectionsEstablished = prometheus.NewDesc( c.connectionsEstablished = prometheus.NewDesc(
prometheus.BuildFQName(types.Namespace, Name, "connections_established"), prometheus.BuildFQName(types.Namespace, Name, "connections_established"),
"(TCP.ConnectionsEstablished)", "(TCP.ConnectionsEstablished)",
[]string{"af"}, labels,
nil, nil,
) )
c.connectionsPassive = prometheus.NewDesc( c.connectionsPassive = prometheus.NewDesc(
prometheus.BuildFQName(types.Namespace, Name, "connections_passive_total"), prometheus.BuildFQName(types.Namespace, Name, "connections_passive_total"),
"(TCP.ConnectionsPassive)", "(TCP.ConnectionsPassive)",
[]string{"af"}, labels,
nil, nil,
) )
c.connectionsReset = prometheus.NewDesc( c.connectionsReset = prometheus.NewDesc(
prometheus.BuildFQName(types.Namespace, Name, "connections_reset_total"), prometheus.BuildFQName(types.Namespace, Name, "connections_reset_total"),
"(TCP.ConnectionsReset)", "(TCP.ConnectionsReset)",
[]string{"af"}, labels,
nil, nil,
) )
c.segmentsTotal = prometheus.NewDesc( c.segmentsTotal = prometheus.NewDesc(
prometheus.BuildFQName(types.Namespace, Name, "segments_total"), prometheus.BuildFQName(types.Namespace, Name, "segments_total"),
"(TCP.SegmentsTotal)", "(TCP.SegmentsTotal)",
[]string{"af"}, labels,
nil, nil,
) )
c.segmentsReceivedTotal = prometheus.NewDesc( c.segmentsReceivedTotal = prometheus.NewDesc(
prometheus.BuildFQName(types.Namespace, Name, "segments_received_total"), prometheus.BuildFQName(types.Namespace, Name, "segments_received_total"),
"(TCP.SegmentsReceivedTotal)", "(TCP.SegmentsReceivedTotal)",
[]string{"af"}, labels,
nil, nil,
) )
c.segmentsRetransmittedTotal = prometheus.NewDesc( c.segmentsRetransmittedTotal = prometheus.NewDesc(
prometheus.BuildFQName(types.Namespace, Name, "segments_retransmitted_total"), prometheus.BuildFQName(types.Namespace, Name, "segments_retransmitted_total"),
"(TCP.SegmentsRetransmittedTotal)", "(TCP.SegmentsRetransmittedTotal)",
[]string{"af"}, labels,
nil, nil,
) )
c.segmentsSentTotal = prometheus.NewDesc( c.segmentsSentTotal = prometheus.NewDesc(
prometheus.BuildFQName(types.Namespace, Name, "segments_sent_total"), prometheus.BuildFQName(types.Namespace, Name, "segments_sent_total"),
"(TCP.SegmentsSentTotal)", "(TCP.SegmentsSentTotal)",
[]string{"af"}, labels,
nil, nil,
) )
c.connectionsStateCount = prometheus.NewDesc( c.connectionsStateCount = prometheus.NewDesc(
prometheus.BuildFQName(types.Namespace, Name, "connections_state_count"), prometheus.BuildFQName(types.Namespace, Name, "connections_state_count"),
"Number of TCP connections by state and address family", "Number of TCP connections by state and address family",
[]string{"af", "state"}, nil, []string{"af", "state"},
nil,
) )
var err error errs := make([]error, 0)
c.perfDataCollector4, err = pdh.NewCollector[perfDataCounterValues](pdh.CounterTypeRaw, "TCPv4", nil) if slices.Contains(c.config.CollectorsEnabled, subCollectorMetrics) {
if err != nil { var err error
return fmt.Errorf("failed to create TCPv4 collector: %w", err)
c.perfDataCollector4, err = pdh.NewCollector[perfDataCounterValues](pdh.CounterTypeRaw, "TCPv4", nil)
if err != nil {
errs = append(errs, fmt.Errorf("failed to create TCPv4 collector: %w", err))
}
c.perfDataCollector6, err = pdh.NewCollector[perfDataCounterValues](pdh.CounterTypeRaw, "TCPv6", nil)
if err != nil {
errs = append(errs, fmt.Errorf("failed to create TCPv6 collector: %w", err))
}
} }
c.perfDataCollector6, err = pdh.NewCollector[perfDataCounterValues](pdh.CounterTypeRaw, "TCPv6", nil) return errors.Join(errs...)
if err != nil {
return fmt.Errorf("failed to create TCPv6 collector: %w", err)
}
return nil
} }
// Collect sends the metric values for each metric // Collect sends the metric values for each metric
@@ -200,13 +215,13 @@ func (c *Collector) Build(_ *slog.Logger, _ *mi.Session) error {
func (c *Collector) Collect(ch chan<- prometheus.Metric) error { func (c *Collector) Collect(ch chan<- prometheus.Metric) error {
errs := make([]error, 0) errs := make([]error, 0)
if slices.Contains(c.config.CollectorsEnabled, "metrics") { if slices.Contains(c.config.CollectorsEnabled, subCollectorMetrics) {
if err := c.collect(ch); err != nil { if err := c.collect(ch); err != nil {
errs = append(errs, fmt.Errorf("failed collecting tcp metrics: %w", err)) errs = append(errs, fmt.Errorf("failed collecting tcp metrics: %w", err))
} }
} }
if slices.Contains(c.config.CollectorsEnabled, "connections_state") { if slices.Contains(c.config.CollectorsEnabled, subCollectorConnectionsState) {
if err := c.collectConnectionsState(ch); err != nil { if err := c.collectConnectionsState(ch); err != nil {
errs = append(errs, fmt.Errorf("failed collecting tcp connection state metrics: %w", err)) errs = append(errs, fmt.Errorf("failed collecting tcp connection state metrics: %w", err))
} }
@@ -216,96 +231,100 @@ func (c *Collector) Collect(ch chan<- prometheus.Metric) error {
} }
func (c *Collector) collect(ch chan<- prometheus.Metric) error { func (c *Collector) collect(ch chan<- prometheus.Metric) error {
err := c.perfDataCollector4.Collect(&c.perfDataObject4) errs := make([]error, 0)
if err != nil {
return fmt.Errorf("failed to collect TCPv4 metrics[0]. %w", err) if err := c.perfDataCollector4.Collect(&c.perfDataObject4); err != nil {
errs = append(errs, fmt.Errorf("failed to collect TCPv4 metrics. %w", err))
} else if len(c.perfDataObject4) == 0 {
errs = append(errs, fmt.Errorf("failed to collect TCPv4 metrics: %w", types.ErrNoDataUnexpected))
} else {
c.writeTCPCounters(ch, c.perfDataObject4, ipAddressFamilyIPv4)
} }
c.writeTCPCounters(ch, c.perfDataObject4, []string{"ipv4"}) if err := c.perfDataCollector6.Collect(&c.perfDataObject6); err != nil {
errs = append(errs, fmt.Errorf("failed to collect TCPv6 metrics. %w", err))
err = c.perfDataCollector6.Collect(&c.perfDataObject6) } else if len(c.perfDataObject6) == 0 {
if err != nil { errs = append(errs, fmt.Errorf("failed to collect TCPv6 metrics: %w", types.ErrNoDataUnexpected))
return fmt.Errorf("failed to collect TCPv6 metrics[0]. %w", err) } else {
c.writeTCPCounters(ch, c.perfDataObject6, ipAddressFamilyIPv6)
} }
c.writeTCPCounters(ch, c.perfDataObject6, []string{"ipv6"}) return errors.Join(errs...)
return nil
} }
func (c *Collector) writeTCPCounters(ch chan<- prometheus.Metric, metrics []perfDataCounterValues, labels []string) { func (c *Collector) writeTCPCounters(ch chan<- prometheus.Metric, metrics []perfDataCounterValues, af string) {
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.connectionFailures, c.connectionFailures,
prometheus.CounterValue, prometheus.CounterValue,
metrics[0].ConnectionFailures, metrics[0].ConnectionFailures,
labels..., af,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.connectionsActive, c.connectionsActive,
prometheus.CounterValue, prometheus.CounterValue,
metrics[0].ConnectionsActive, metrics[0].ConnectionsActive,
labels..., af,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.connectionsEstablished, c.connectionsEstablished,
prometheus.GaugeValue, prometheus.GaugeValue,
metrics[0].ConnectionsEstablished, metrics[0].ConnectionsEstablished,
labels..., af,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.connectionsPassive, c.connectionsPassive,
prometheus.CounterValue, prometheus.CounterValue,
metrics[0].ConnectionsPassive, metrics[0].ConnectionsPassive,
labels..., af,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.connectionsReset, c.connectionsReset,
prometheus.CounterValue, prometheus.CounterValue,
metrics[0].ConnectionsReset, metrics[0].ConnectionsReset,
labels..., af,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.segmentsTotal, c.segmentsTotal,
prometheus.CounterValue, prometheus.CounterValue,
metrics[0].SegmentsPerSec, metrics[0].SegmentsPerSec,
labels..., af,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.segmentsReceivedTotal, c.segmentsReceivedTotal,
prometheus.CounterValue, prometheus.CounterValue,
metrics[0].SegmentsReceivedPerSec, metrics[0].SegmentsReceivedPerSec,
labels..., af,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.segmentsRetransmittedTotal, c.segmentsRetransmittedTotal,
prometheus.CounterValue, prometheus.CounterValue,
metrics[0].SegmentsRetransmittedPerSec, metrics[0].SegmentsRetransmittedPerSec,
labels..., af,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.segmentsSentTotal, c.segmentsSentTotal,
prometheus.CounterValue, prometheus.CounterValue,
metrics[0].SegmentsSentPerSec, metrics[0].SegmentsSentPerSec,
labels..., af,
) )
} }
func (c *Collector) collectConnectionsState(ch chan<- prometheus.Metric) error { func (c *Collector) collectConnectionsState(ch chan<- prometheus.Metric) error {
stateCounts, err := iphlpapi.GetTCPConnectionStates(windows.AF_INET) errs := make([]error, 0)
if err != nil {
return fmt.Errorf("failed to collect TCP connection states for %s: %w", "ipv4", err) if stateCounts, err := iphlpapi.GetTCPConnectionStates(windows.AF_INET); err != nil {
errs = append(errs, fmt.Errorf("failed to collect TCP connection states for %s: %w", ipAddressFamilyIPv4, err))
} else {
c.sendTCPStateMetrics(ch, stateCounts, ipAddressFamilyIPv4)
} }
c.sendTCPStateMetrics(ch, stateCounts, "ipv4") if stateCounts, err := iphlpapi.GetTCPConnectionStates(windows.AF_INET6); err != nil {
errs = append(errs, fmt.Errorf("failed to collect TCP6 connection states for %s: %w", ipAddressFamilyIPv6, err))
stateCounts, err = iphlpapi.GetTCPConnectionStates(windows.AF_INET6) } else {
if err != nil { c.sendTCPStateMetrics(ch, stateCounts, ipAddressFamilyIPv6)
return fmt.Errorf("failed to collect TCP6 connection states for %s: %w", "ipv6", err)
} }
c.sendTCPStateMetrics(ch, stateCounts, "ipv6") return errors.Join(errs...)
return nil
} }
func (c *Collector) sendTCPStateMetrics(ch chan<- prometheus.Metric, stateCounts map[iphlpapi.MIB_TCP_STATE]uint32, af string) { func (c *Collector) sendTCPStateMetrics(ch chan<- prometheus.Metric, stateCounts map[iphlpapi.MIB_TCP_STATE]uint32, af string) {

View File

@@ -18,6 +18,7 @@
package udp package udp
import ( import (
"errors"
"fmt" "fmt"
"log/slog" "log/slog"
@@ -107,19 +108,21 @@ func (c *Collector) Build(_ *slog.Logger, _ *mi.Session) error {
nil, nil,
) )
errs := make([]error, 0)
var err error var err error
c.perfDataCollector4, err = pdh.NewCollector[perfDataCounterValues](pdh.CounterTypeRaw, "UDPv4", nil) c.perfDataCollector4, err = pdh.NewCollector[perfDataCounterValues](pdh.CounterTypeRaw, "UDPv4", nil)
if err != nil { if err != nil {
return fmt.Errorf("failed to create UDPv4 collector: %w", err) errs = append(errs, fmt.Errorf("failed to create UDPv4 collector: %w", err))
} }
c.perfDataCollector6, err = pdh.NewCollector[perfDataCounterValues](pdh.CounterTypeRaw, "UDPv6", nil) c.perfDataCollector6, err = pdh.NewCollector[perfDataCounterValues](pdh.CounterTypeRaw, "UDPv6", nil)
if err != nil { if err != nil {
return fmt.Errorf("failed to create UDPv6 collector: %w", err) errs = append(errs, fmt.Errorf("failed to create UDPv6 collector: %w", err))
} }
return nil return errors.Join(errs...)
} }
// Collect sends the metric values for each metric // Collect sends the metric values for each metric
@@ -129,46 +132,50 @@ func (c *Collector) Collect(ch chan<- prometheus.Metric) error {
} }
func (c *Collector) collect(ch chan<- prometheus.Metric) error { func (c *Collector) collect(ch chan<- prometheus.Metric) error {
err := c.perfDataCollector4.Collect(&c.perfDataObject4) errs := make([]error, 0)
if err != nil {
return fmt.Errorf("failed to collect UDPv4 metrics: %w", err) if err := c.perfDataCollector4.Collect(&c.perfDataObject4); err != nil {
errs = append(errs, fmt.Errorf("failed to collect UDPv4 metrics: %w", err))
} else if len(c.perfDataObject4) == 0 {
errs = append(errs, fmt.Errorf("failed to collect UDPv4 metrics: %w", types.ErrNoDataUnexpected))
} else {
c.writeUDPCounters(ch, c.perfDataObject4, "ipv4")
} }
c.writeUDPCounters(ch, c.perfDataObject4, []string{"ipv4"}) if err := c.perfDataCollector6.Collect(&c.perfDataObject6); err != nil {
errs = append(errs, fmt.Errorf("failed to collect UDPv6 metrics: %w", err))
err = c.perfDataCollector6.Collect(&c.perfDataObject6) } else if len(c.perfDataObject6) == 0 {
if err != nil { errs = append(errs, fmt.Errorf("failed to collect UDPv6 metrics: %w", types.ErrNoDataUnexpected))
return fmt.Errorf("failed to collect UDPv6 metrics: %w", err) } else {
c.writeUDPCounters(ch, c.perfDataObject6, "ipv6")
} }
c.writeUDPCounters(ch, c.perfDataObject6, []string{"ipv6"}) return errors.Join(errs...)
return nil
} }
func (c *Collector) writeUDPCounters(ch chan<- prometheus.Metric, metrics []perfDataCounterValues, labels []string) { func (c *Collector) writeUDPCounters(ch chan<- prometheus.Metric, metrics []perfDataCounterValues, af string) {
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.datagramsNoPortTotal, c.datagramsNoPortTotal,
prometheus.CounterValue, prometheus.CounterValue,
metrics[0].DatagramsNoPortPerSec, metrics[0].DatagramsNoPortPerSec,
labels..., af,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.datagramsReceivedErrorsTotal, c.datagramsReceivedErrorsTotal,
prometheus.CounterValue, prometheus.CounterValue,
metrics[0].DatagramsReceivedErrors, metrics[0].DatagramsReceivedErrors,
labels..., af,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.datagramsReceivedTotal, c.datagramsReceivedTotal,
prometheus.GaugeValue, prometheus.GaugeValue,
metrics[0].DatagramsReceivedPerSec, metrics[0].DatagramsReceivedPerSec,
labels..., af,
) )
ch <- prometheus.MustNewConstMetric( ch <- prometheus.MustNewConstMetric(
c.datagramsSentTotal, c.datagramsSentTotal,
prometheus.CounterValue, prometheus.CounterValue,
metrics[0].DatagramsSentPerSec, metrics[0].DatagramsSentPerSec,
labels..., af,
) )
} }

View File

@@ -41,7 +41,7 @@ func GetTCPConnectionStates(family uint32) (map[MIB_TCP_STATE]uint32, error) {
case windows.AF_INET: case windows.AF_INET:
table, err := getExtendedTcpTable[MIB_TCPROW_OWNER_PID](family, TCPTableOwnerPIDAll) table, err := getExtendedTcpTable[MIB_TCPROW_OWNER_PID](family, TCPTableOwnerPIDAll)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed getExtendedTcpTable: %w", err)
} }
for _, row := range table { for _, row := range table {
@@ -52,7 +52,7 @@ func GetTCPConnectionStates(family uint32) (map[MIB_TCP_STATE]uint32, error) {
case windows.AF_INET6: case windows.AF_INET6:
table, err := getExtendedTcpTable[MIB_TCP6ROW_OWNER_PID](family, TCPTableOwnerPIDAll) table, err := getExtendedTcpTable[MIB_TCP6ROW_OWNER_PID](family, TCPTableOwnerPIDAll)
if err != nil { if err != nil {
return nil, err return nil, fmt.Errorf("failed getExtendedTcpTable: %w", err)
} }
for _, row := range table { for _, row := range table {
@@ -102,12 +102,12 @@ func getExtendedTcpTable[T any](ulAf uint32, tableClass uint32) ([]T, error) {
var size uint32 var size uint32
ret, _, _ := procGetExtendedTcpTable.Call( ret, _, _ := procGetExtendedTcpTable.Call(
uintptr(0), 0,
uintptr(unsafe.Pointer(&size)), uintptr(unsafe.Pointer(&size)),
uintptr(0), 0,
uintptr(ulAf), uintptr(ulAf),
uintptr(tableClass), uintptr(tableClass),
uintptr(0), 0,
) )
if ret != uintptr(windows.ERROR_INSUFFICIENT_BUFFER) { if ret != uintptr(windows.ERROR_INSUFFICIENT_BUFFER) {
@@ -119,10 +119,10 @@ func getExtendedTcpTable[T any](ulAf uint32, tableClass uint32) ([]T, error) {
ret, _, _ = procGetExtendedTcpTable.Call( ret, _, _ = procGetExtendedTcpTable.Call(
uintptr(unsafe.Pointer(&buf[0])), uintptr(unsafe.Pointer(&buf[0])),
uintptr(unsafe.Pointer(&size)), uintptr(unsafe.Pointer(&size)),
uintptr(0), 0,
uintptr(ulAf), uintptr(ulAf),
uintptr(tableClass), uintptr(tableClass),
uintptr(0), 0,
) )
if ret != 0 { if ret != 0 {