Reconnect conntrack netlink listener on error

This commit is contained in:
Viktor Liu
2026-04-14 17:13:18 +02:00
parent e804a705b7
commit 56a13cdece

View File

@@ -7,7 +7,9 @@ import (
"fmt" "fmt"
"net/netip" "net/netip"
"sync" "sync"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid" "github.com/google/uuid"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
nfct "github.com/ti-mo/conntrack" nfct "github.com/ti-mo/conntrack"
@@ -17,7 +19,12 @@ import (
nbnet "github.com/netbirdio/netbird/client/net" nbnet "github.com/netbirdio/netbird/client/net"
) )
const defaultChannelSize = 100 const (
defaultChannelSize = 100
reconnectInitInterval = 5 * time.Second
reconnectMaxInterval = 5 * time.Minute
reconnectRandomization = 0.5
)
// ConnTrack manages kernel-based conntrack events // ConnTrack manages kernel-based conntrack events
type ConnTrack struct { type ConnTrack struct {
@@ -79,6 +86,12 @@ func (c *ConnTrack) Start(enableCounters bool) error {
return fmt.Errorf("start conntrack listener: %w", err) return fmt.Errorf("start conntrack listener: %w", err)
} }
// Drain any stale stop signal from a previous cycle.
select {
case <-c.done:
default:
}
c.started = true c.started = true
go c.receiverRoutine(events, errChan) go c.receiverRoutine(events, errChan)
@@ -92,17 +105,90 @@ func (c *ConnTrack) receiverRoutine(events chan nfct.Event, errChan chan error)
case event := <-events: case event := <-events:
c.handleEvent(event) c.handleEvent(event)
case err := <-errChan: case err := <-errChan:
log.Errorf("Error from conntrack event listener: %v", err) if events, errChan = c.handleListenerError(err); events == nil {
if err := c.conn.Close(); err != nil { return
log.Errorf("Error closing conntrack connection: %v", err)
} }
return
case <-c.done: case <-c.done:
return return
} }
} }
} }
// handleListenerError closes the failed connection and attempts to reconnect.
// Returns new channels on success, or nil if shutdown was requested.
func (c *ConnTrack) handleListenerError(err error) (chan nfct.Event, chan error) {
log.Warnf("conntrack event listener failed: %v", err)
c.closeConn()
return c.reconnect()
}
func (c *ConnTrack) closeConn() {
c.mux.Lock()
defer c.mux.Unlock()
if c.conn != nil {
if err := c.conn.Close(); err != nil {
log.Debugf("close conntrack connection: %v", err)
}
c.conn = nil
}
}
// reconnect attempts to re-establish the conntrack netlink listener with exponential backoff.
// Returns new channels on success, or nil if shutdown was requested.
func (c *ConnTrack) reconnect() (chan nfct.Event, chan error) {
bo := &backoff.ExponentialBackOff{
InitialInterval: reconnectInitInterval,
RandomizationFactor: reconnectRandomization,
Multiplier: backoff.DefaultMultiplier,
MaxInterval: reconnectMaxInterval,
MaxElapsedTime: 0, // retry indefinitely
Clock: backoff.SystemClock,
}
bo.Reset()
for {
delay := bo.NextBackOff()
log.Infof("reconnecting conntrack listener in %s", delay)
select {
case <-c.done:
c.mux.Lock()
c.started = false
c.mux.Unlock()
return nil, nil
case <-time.After(delay):
}
conn, err := nfct.Dial(nil)
if err != nil {
log.Warnf("reconnect conntrack dial: %v", err)
continue
}
events := make(chan nfct.Event, defaultChannelSize)
errChan, err := conn.Listen(events, 1, []netfilter.NetlinkGroup{
netfilter.GroupCTNew,
netfilter.GroupCTDestroy,
})
if err != nil {
log.Warnf("reconnect conntrack listen: %v", err)
if closeErr := conn.Close(); closeErr != nil {
log.Debugf("close conntrack connection: %v", closeErr)
}
continue
}
c.mux.Lock()
c.conn = conn
c.mux.Unlock()
log.Infof("conntrack listener reconnected successfully")
return events, errChan
}
}
// Stop stops the connection tracking. This method is idempotent. // Stop stops the connection tracking. This method is idempotent.
func (c *ConnTrack) Stop() { func (c *ConnTrack) Stop() {
c.mux.Lock() c.mux.Lock()