Fix CrowdSec review findings: O(1) prefix lookup, context leak, fail-close tests

This commit is contained in:
Viktor Liu
2026-03-29 08:34:34 +02:00
parent a22c849ae0
commit ae84272a30
6 changed files with 63 additions and 50 deletions

View File

@@ -17,17 +17,12 @@ import (
"github.com/netbirdio/netbird/proxy/internal/restrict"
)
type prefixEntry struct {
prefix netip.Prefix
decision *restrict.CrowdSecDecision
}
// Bouncer wraps a CrowdSec StreamBouncer, maintaining a local cache of
// active decisions for fast IP lookups. It implements restrict.CrowdSecChecker.
type Bouncer struct {
mu sync.RWMutex
ips map[netip.Addr]*restrict.CrowdSecDecision
prefixes []prefixEntry
prefixes map[netip.Prefix]*restrict.CrowdSecDecision
ready atomic.Bool
apiURL string
@@ -47,10 +42,11 @@ var _ restrict.CrowdSecChecker = (*Bouncer)(nil)
// NewBouncer creates a bouncer but does not start the stream.
func NewBouncer(apiURL, apiKey string, logger *log.Entry) *Bouncer {
return &Bouncer{
apiURL: apiURL,
apiKey: apiKey,
logger: logger,
ips: make(map[netip.Addr]*restrict.CrowdSecDecision),
apiURL: apiURL,
apiKey: apiKey,
logger: logger,
ips: make(map[netip.Addr]*restrict.CrowdSecDecision),
prefixes: make(map[netip.Prefix]*restrict.CrowdSecDecision),
}
}
@@ -70,6 +66,8 @@ func (b *Bouncer) Start(ctx context.Context) error {
RetryInitialConnect: true,
}
b.logger.Infof("connecting to CrowdSec LAPI at %s", b.apiURL)
if err := stream.Init(); err != nil {
return err
}
@@ -77,7 +75,7 @@ func (b *Bouncer) Start(ctx context.Context) error {
// Reset state from any previous run.
b.mu.Lock()
b.ips = make(map[netip.Addr]*restrict.CrowdSecDecision)
b.prefixes = nil
b.prefixes = make(map[netip.Prefix]*restrict.CrowdSecDecision)
b.mu.Unlock()
b.ready.Store(false)
@@ -133,6 +131,10 @@ func (b *Bouncer) Ready() bool {
// CheckIP looks up addr in the local decision cache. Returns nil if no
// active decision exists for the address.
//
// Prefix lookups are O(1): instead of scanning all stored prefixes, we
// probe the map for every possible containing prefix of the address
// (at most 33 for IPv4, 129 for IPv6).
func (b *Bouncer) CheckIP(addr netip.Addr) *restrict.CrowdSecDecision {
addr = addr.Unmap()
@@ -143,9 +145,16 @@ func (b *Bouncer) CheckIP(addr netip.Addr) *restrict.CrowdSecDecision {
return d
}
for _, pe := range b.prefixes {
if pe.prefix.Contains(addr) {
return pe.decision
maxBits := 32
if addr.Is6() {
maxBits = 128
}
// Walk from most-specific to least-specific prefix so the narrowest
// matching decision wins when ranges overlap.
for bits := maxBits; bits >= 0; bits-- {
prefix := netip.PrefixFrom(addr, bits).Masked()
if d, ok := b.prefixes[prefix]; ok {
return d
}
}
@@ -189,8 +198,8 @@ func (b *Bouncer) applyDeleted(decisions []*models.Decision) {
b.logger.Debugf("skip unparsable CrowdSec range deletion %q: %v", value, err)
continue
}
prefix = netip.PrefixFrom(prefix.Addr().Unmap(), prefix.Bits())
b.removePrefix(prefix)
prefix = normalizePrefix(prefix)
delete(b.prefixes, prefix)
} else {
addr, err := netip.ParseAddr(value)
if err != nil {
@@ -216,10 +225,8 @@ func (b *Bouncer) applyNew(decisions []*models.Decision) {
b.logger.Debugf("skip unparsable CrowdSec range %q: %v", value, err)
continue
}
// Normalize v4-mapped-v6 prefix base so Contains() matches unmapped query addresses.
prefix = netip.PrefixFrom(prefix.Addr().Unmap(), prefix.Bits())
b.removePrefix(prefix)
b.prefixes = append(b.prefixes, prefixEntry{prefix: prefix, decision: dec})
prefix = normalizePrefix(prefix)
b.prefixes[prefix] = dec
} else {
addr, err := netip.ParseAddr(value)
if err != nil {
@@ -231,12 +238,8 @@ func (b *Bouncer) applyNew(decisions []*models.Decision) {
}
}
func (b *Bouncer) removePrefix(target netip.Prefix) {
for i := 0; i < len(b.prefixes); i++ {
if b.prefixes[i].prefix == target {
b.prefixes[i] = b.prefixes[len(b.prefixes)-1]
b.prefixes = b.prefixes[:len(b.prefixes)-1]
return
}
}
// normalizePrefix unmaps v4-mapped-v6 addresses and zeros host bits so
// the prefix is a valid map key that matches CheckIP's probe logic.
func normalizePrefix(p netip.Prefix) netip.Prefix {
return netip.PrefixFrom(p.Addr().Unmap(), p.Bits()).Masked()
}

View File

@@ -40,10 +40,7 @@ func TestBouncer_CheckIP_ExactMatch(t *testing.T) {
func TestBouncer_CheckIP_PrefixMatch(t *testing.T) {
b := newTestBouncer()
b.ready.Store(true)
b.prefixes = append(b.prefixes, prefixEntry{
prefix: netip.MustParsePrefix("192.168.1.0/24"),
decision: &restrict.CrowdSecDecision{Type: restrict.DecisionBan},
})
b.prefixes[netip.MustParsePrefix("192.168.1.0/24")] = &restrict.CrowdSecDecision{Type: restrict.DecisionBan}
d := b.CheckIP(netip.MustParseAddr("192.168.1.100"))
require.NotNil(t, d)
@@ -74,10 +71,7 @@ func TestBouncer_CheckIP_ExactBeforePrefix(t *testing.T) {
b := newTestBouncer()
b.ready.Store(true)
b.ips[netip.MustParseAddr("10.0.0.1")] = &restrict.CrowdSecDecision{Type: restrict.DecisionCaptcha}
b.prefixes = append(b.prefixes, prefixEntry{
prefix: netip.MustParsePrefix("10.0.0.0/8"),
decision: &restrict.CrowdSecDecision{Type: restrict.DecisionBan},
})
b.prefixes[netip.MustParsePrefix("10.0.0.0/8")] = &restrict.CrowdSecDecision{Type: restrict.DecisionBan}
d := b.CheckIP(netip.MustParseAddr("10.0.0.1"))
require.NotNil(t, d)
@@ -109,7 +103,7 @@ func TestBouncer_ApplyNew_Range(t *testing.T) {
))
require.Len(t, b.prefixes, 1)
assert.Equal(t, netip.MustParsePrefix("10.0.0.0/8"), b.prefixes[0].prefix)
assert.NotNil(t, b.prefixes[netip.MustParsePrefix("10.0.0.0/8")])
}
func TestBouncer_ApplyDeleted_IP(t *testing.T) {
@@ -128,17 +122,15 @@ func TestBouncer_ApplyDeleted_IP(t *testing.T) {
func TestBouncer_ApplyDeleted_Range(t *testing.T) {
b := newTestBouncer()
b.prefixes = append(b.prefixes,
prefixEntry{prefix: netip.MustParsePrefix("10.0.0.0/8"), decision: &restrict.CrowdSecDecision{Type: restrict.DecisionBan}},
prefixEntry{prefix: netip.MustParsePrefix("192.168.0.0/16"), decision: &restrict.CrowdSecDecision{Type: restrict.DecisionBan}},
)
b.prefixes[netip.MustParsePrefix("10.0.0.0/8")] = &restrict.CrowdSecDecision{Type: restrict.DecisionBan}
b.prefixes[netip.MustParsePrefix("192.168.0.0/16")] = &restrict.CrowdSecDecision{Type: restrict.DecisionBan}
b.applyDeleted(makeDecisions(
decision{scope: "range", value: "10.0.0.0/8", dtype: "ban"},
))
require.Len(t, b.prefixes, 1)
assert.Equal(t, netip.MustParsePrefix("192.168.0.0/16"), b.prefixes[0].prefix)
assert.NotNil(t, b.prefixes[netip.MustParsePrefix("192.168.0.0/16")])
}
func TestBouncer_ApplyNew_OverwritesExisting(t *testing.T) {
@@ -229,8 +221,9 @@ func TestBouncer_StreamIntegration(t *testing.T) {
func newTestBouncer() *Bouncer {
return &Bouncer{
ips: make(map[netip.Addr]*restrict.CrowdSecDecision),
logger: log.NewEntry(log.StandardLogger()),
ips: make(map[netip.Addr]*restrict.CrowdSecDecision),
prefixes: make(map[netip.Prefix]*restrict.CrowdSecDecision),
logger: log.NewEntry(log.StandardLogger()),
}
}

View File

@@ -96,6 +96,7 @@ func (r *Registry) startLocked() {
func (r *Registry) stopLocked() {
r.bouncer.Stop()
r.cancel()
r.bouncer = nil
r.cancel = nil
r.logger.Info("CrowdSec bouncer stopped")

View File

@@ -385,6 +385,20 @@ func TestFilter_CrowdSec_CIDR_RunsBeforeCrowdSec(t *testing.T) {
assert.Equal(t, DenyCIDR, f.Check(netip.MustParseAddr("10.0.0.1"), nil))
}
func TestFilter_CrowdSec_Enforce_NilChecker(t *testing.T) {
// LAPI not configured: checker is nil but mode is enforce. Must fail closed.
f := ParseFilter(FilterConfig{CrowdSec: nil, CrowdSecMode: CrowdSecEnforce})
assert.Equal(t, DenyCrowdSecUnavailable, f.Check(netip.MustParseAddr("1.2.3.4"), nil))
}
func TestFilter_CrowdSec_Observe_NilChecker(t *testing.T) {
// LAPI not configured: checker is nil but mode is observe. Must allow.
f := ParseFilter(FilterConfig{CrowdSec: nil, CrowdSecMode: CrowdSecObserve})
assert.Equal(t, Allow, f.Check(netip.MustParseAddr("1.2.3.4"), nil))
}
func TestFilter_HasRestrictions_CrowdSec(t *testing.T) {
cs := &mockCrowdSec{ready: true}
f := ParseFilter(FilterConfig{CrowdSec: cs, CrowdSecMode: CrowdSecEnforce})

View File

@@ -634,11 +634,12 @@ func (r *Router) logL4Deny(route Route, conn net.Conn, verdict restrict.Verdict,
SourceIP: sourceIP,
DenyReason: verdict.String(),
}
if observeOnly {
entry.DenyReason = "crowdsec_observe"
}
if verdict.IsCrowdSec() {
entry.Metadata = map[string]string{"crowdsec_verdict": verdict.String()}
if observeOnly {
entry.Metadata["crowdsec_mode"] = "observe"
entry.DenyReason = ""
}
}
al.LogL4(entry)
}

View File

@@ -516,11 +516,12 @@ func (r *Relay) logDeny(clientIP netip.Addr, verdict restrict.Verdict, observeOn
SourceIP: clientIP,
DenyReason: verdict.String(),
}
if observeOnly {
entry.DenyReason = "crowdsec_observe"
}
if verdict.IsCrowdSec() {
entry.Metadata = map[string]string{"crowdsec_verdict": verdict.String()}
if observeOnly {
entry.Metadata["crowdsec_mode"] = "observe"
entry.DenyReason = ""
}
}
r.accessLog.LogL4(entry)
}