Add kernel conntrack counters (#3434)

This commit is contained in:
Viktor Liu
2025-03-04 16:46:03 +01:00
committed by GitHub
parent 8c81a823fa
commit acf172b52c
9 changed files with 545 additions and 360 deletions

View File

@@ -26,9 +26,10 @@ type ConnTrack struct {
conn *nfct.Conn
mux sync.Mutex
instanceID uuid.UUID
started bool
done chan struct{}
instanceID uuid.UUID
started bool
done chan struct{}
sysctlModified bool
}
// New creates a new connection tracker that interfaces with the kernel's conntrack system
@@ -43,7 +44,7 @@ func New(flowLogger nftypes.FlowLogger, iface nftypes.IFaceMapper) *ConnTrack {
}
// Start begins tracking connections by listening for conntrack events. This method is idempotent.
func (c *ConnTrack) Start() error {
func (c *ConnTrack) Start(enableCounters bool) error {
c.mux.Lock()
defer c.mux.Unlock()
@@ -53,6 +54,10 @@ func (c *ConnTrack) Start() error {
log.Info("Starting conntrack event listening")
if enableCounters {
c.EnableAccounting()
}
conn, err := nfct.Dial(nil)
if err != nil {
return fmt.Errorf("dial conntrack: %w", err)
@@ -121,6 +126,8 @@ func (c *ConnTrack) Stop() {
}
c.started = false
c.RestoreAccounting()
}
// Close stops listening for events and cleans up resources
@@ -139,6 +146,9 @@ func (c *ConnTrack) Close() error {
err := c.conn.Close()
c.conn = nil
c.started = false
c.RestoreAccounting()
if err != nil {
return fmt.Errorf("close conntrack: %w", err)
}
@@ -153,6 +163,10 @@ func (c *ConnTrack) handleEvent(event nfct.Event) {
return
}
if event.Type != nfct.EventNew && event.Type != nfct.EventDestroy {
return
}
flow := *event.Flow
proto := nftypes.Protocol(flow.TupleOrig.Proto.Protocol)
@@ -178,13 +192,35 @@ func (c *ConnTrack) handleEvent(event nfct.Event) {
icmpCode = flow.TupleOrig.Proto.ICMPCode
}
switch event.Type {
case nfct.EventNew:
c.handleNewFlow(flow.ID, proto, srcIP, dstIP, srcPort, dstPort, icmpType, icmpCode)
flowID := c.getFlowID(flow.ID)
direction := c.inferDirection(srcIP, dstIP)
case nfct.EventDestroy:
c.handleDestroyFlow(flow.ID, proto, srcIP, dstIP, srcPort, dstPort, icmpType, icmpCode)
eventType := nftypes.TypeStart
eventStr := "New"
if event.Type == nfct.EventDestroy {
eventType = nftypes.TypeEnd
eventStr = "Ended"
}
log.Tracef("%s %s %s connection: %s:%d -> %s:%d", eventStr, direction, proto, srcIP, srcPort, dstIP, dstPort)
c.flowLogger.StoreEvent(nftypes.EventFields{
FlowID: flowID,
Type: eventType,
Direction: direction,
Protocol: proto,
SourceIP: srcIP,
DestIP: dstIP,
SourcePort: srcPort,
DestPort: dstPort,
ICMPType: icmpType,
ICMPCode: icmpCode,
RxPackets: c.mapRxPackets(flow, direction),
TxPackets: c.mapTxPackets(flow, direction),
RxBytes: c.mapRxBytes(flow, direction),
TxBytes: c.mapTxBytes(flow, direction),
})
}
// relevantFlow checks if the flow is related to the specified interface
@@ -199,42 +235,44 @@ func (c *ConnTrack) relevantFlow(srcIP, dstIP netip.Addr) bool {
return true
}
func (c *ConnTrack) handleNewFlow(id uint32, proto nftypes.Protocol, srcIP, dstIP netip.Addr, srcPort, dstPort uint16, icmpType, icmpCode uint8) {
flowID := c.getFlowID(id)
direction := c.inferDirection(srcIP, dstIP)
log.Tracef("New %s %s connection: %s:%d -> %s:%d", direction, proto, srcIP, srcPort, dstIP, dstPort)
c.flowLogger.StoreEvent(nftypes.EventFields{
FlowID: flowID,
Type: nftypes.TypeStart,
Direction: direction,
Protocol: proto,
SourceIP: srcIP,
DestIP: dstIP,
SourcePort: srcPort,
DestPort: dstPort,
ICMPType: icmpType,
ICMPCode: icmpCode,
})
// mapRxPackets maps packet counts to RX based on flow direction
func (c *ConnTrack) mapRxPackets(flow nfct.Flow, direction nftypes.Direction) uint64 {
// For Ingress: CountersOrig is from external to us (RX)
// For Egress: CountersReply is from external to us (RX)
if direction == nftypes.Ingress {
return flow.CountersOrig.Packets
}
return flow.CountersReply.Packets
}
func (c *ConnTrack) handleDestroyFlow(id uint32, proto nftypes.Protocol, srcIP, dstIP netip.Addr, srcPort, dstPort uint16, icmpType, icmpCode uint8) {
flowID := c.getFlowID(id)
direction := c.inferDirection(srcIP, dstIP)
// mapTxPackets maps packet counts to TX based on flow direction
func (c *ConnTrack) mapTxPackets(flow nfct.Flow, direction nftypes.Direction) uint64 {
// For Ingress: CountersReply is from us to external (TX)
// For Egress: CountersOrig is from us to external (TX)
if direction == nftypes.Ingress {
return flow.CountersReply.Packets
}
return flow.CountersOrig.Packets
}
log.Tracef("Ended %s %s connection: %s:%d -> %s:%d", direction, proto, srcIP, srcPort, dstIP, dstPort)
c.flowLogger.StoreEvent(nftypes.EventFields{
FlowID: flowID,
Type: nftypes.TypeEnd,
Direction: direction,
Protocol: proto,
SourceIP: srcIP,
DestIP: dstIP,
SourcePort: srcPort,
DestPort: dstPort,
ICMPType: icmpType,
ICMPCode: icmpCode,
})
// mapRxBytes maps byte counts to RX based on flow direction
func (c *ConnTrack) mapRxBytes(flow nfct.Flow, direction nftypes.Direction) uint64 {
// For Ingress: CountersOrig is from external to us (RX)
// For Egress: CountersReply is from external to us (RX)
if direction == nftypes.Ingress {
return flow.CountersOrig.Bytes
}
return flow.CountersReply.Bytes
}
// mapTxBytes maps byte counts to TX based on flow direction
func (c *ConnTrack) mapTxBytes(flow nfct.Flow, direction nftypes.Direction) uint64 {
// For Ingress: CountersReply is from us to external (TX)
// For Egress: CountersOrig is from us to external (TX)
if direction == nftypes.Ingress {
return flow.CountersReply.Bytes
}
return flow.CountersOrig.Bytes
}
// getFlowID creates a unique UUID based on the conntrack ID and instance ID

View File

@@ -0,0 +1,73 @@
//go:build linux && !android
package conntrack
import (
"fmt"
"os"
"strconv"
"strings"
log "github.com/sirupsen/logrus"
)
const (
// conntrackAcctPath is the sysctl path for conntrack accounting
conntrackAcctPath = "net.netfilter.nf_conntrack_acct"
)
// EnableAccounting ensures that connection tracking accounting is enabled in the kernel.
func (c *ConnTrack) EnableAccounting() {
// haven't restored yet
if c.sysctlModified {
return
}
modified, err := setSysctl(conntrackAcctPath, 1)
if err != nil {
log.Warnf("Failed to enable conntrack accounting: %v", err)
return
}
c.sysctlModified = modified
}
// RestoreAccounting restores the connection tracking accounting setting to its original value.
func (c *ConnTrack) RestoreAccounting() {
if !c.sysctlModified {
return
}
if _, err := setSysctl(conntrackAcctPath, 0); err != nil {
log.Warnf("Failed to restore conntrack accounting: %v", err)
return
}
c.sysctlModified = false
}
// setSysctl sets a sysctl configuration and returns whether it was modified.
func setSysctl(key string, desiredValue int) (bool, error) {
path := fmt.Sprintf("/proc/sys/%s", strings.ReplaceAll(key, ".", "/"))
currentValue, err := os.ReadFile(path)
if err != nil {
return false, fmt.Errorf("read sysctl %s: %w", key, err)
}
currentV, err := strconv.Atoi(strings.TrimSpace(string(currentValue)))
if err != nil && len(currentValue) > 0 {
return false, fmt.Errorf("convert current value to int: %w", err)
}
if currentV == desiredValue {
return false, nil
}
// nolint:gosec
if err := os.WriteFile(path, []byte(strconv.Itoa(desiredValue)), 0644); err != nil {
return false, fmt.Errorf("write sysctl %s: %w", key, err)
}
log.Debugf("Set sysctl %s from %d to %d", key, currentV, desiredValue)
return true, nil
}

View File

@@ -57,7 +57,7 @@ func (m *Manager) Update(update *nftypes.FlowConfig) error {
if update.Enabled {
if m.conntrack != nil {
if err := m.conntrack.Start(); err != nil {
if err := m.conntrack.Start(update.Counters); err != nil {
return fmt.Errorf("start conntrack: %w", err)
}
}
@@ -157,6 +157,10 @@ func toProtoEvent(publicKey []byte, event *nftypes.Event) *proto.FlowEvent {
Protocol: uint32(event.Protocol),
SourceIp: event.SourceIP.AsSlice(),
DestIp: event.DestIP.AsSlice(),
RxPackets: event.RxPackets,
TxPackets: event.TxPackets,
RxBytes: event.RxBytes,
TxBytes: event.TxBytes,
},
}
if event.Protocol == nftypes.ICMP {

View File

@@ -78,12 +78,17 @@ type EventFields struct {
DestPort uint16
ICMPType uint8
ICMPCode uint8
RxPackets uint64
TxPackets uint64
RxBytes uint64
TxBytes uint64
}
type FlowConfig struct {
URL string
Interval time.Duration
Enabled bool
Counters bool
TokenPayload string
TokenSignature string
}
@@ -126,7 +131,7 @@ type Store interface {
// ConnTracker defines the interface for connection tracking functionality
type ConnTracker interface {
// Start begins tracking connections by listening for conntrack events.
Start() error
Start(bool) error
// Stop stops the connection tracking.
Stop()
// Close stops listening for events and cleans up resources