[client,management] add netflow support to client and update management (#3414)

adds NetFlow functionality to track and log network traffic information between peers, with features including:

- Flow logging for TCP, UDP, and ICMP traffic
- Integration with connection tracking system
- Resource ID tracking in NetFlow events
- DNS and exit node collection configuration
- Flow API and Redis cache in management
- Memory-based flow storage implementation
- Kernel conntrack counters and userspace counters
- TCP state machine improvements for more accurate tracking
- Migration from net.IP to netip.Addr in the userspace firewall
This commit is contained in:
Maycon Santos
2025-03-20 17:05:48 +01:00
committed by GitHub
parent f51e0b59bd
commit c02e236196
151 changed files with 7118 additions and 2234 deletions

113
management/server/cache/idp.go vendored Normal file
View File

@@ -0,0 +1,113 @@
package cache
import (
"context"
"fmt"
"time"
"github.com/eko/gocache/lib/v4/cache"
"github.com/eko/gocache/lib/v4/marshaler"
"github.com/eko/gocache/lib/v4/store"
"github.com/eko/gocache/store/redis/v4"
"github.com/vmihailenco/msgpack/v5"
"github.com/netbirdio/netbird/management/server/idp"
)
const (
DefaultIDPCacheExpirationMax = 7 * 24 * time.Hour // 7 days
DefaultIDPCacheExpirationMin = 3 * 24 * time.Hour // 3 days
DefaultIDPCacheCleanupInterval = 30 * time.Minute
)
// UserDataCache is an interface that wraps the basic Get, Set and Delete methods for idp.UserData objects.
type UserDataCache interface {
Get(ctx context.Context, key string) (*idp.UserData, error)
Set(ctx context.Context, key string, value *idp.UserData, expiration time.Duration) error
Delete(ctx context.Context, key string) error
}
// UserDataCacheImpl is a struct that implements the UserDataCache interface.
type UserDataCacheImpl struct {
cache Marshaler
}
func (u *UserDataCacheImpl) Get(ctx context.Context, key string) (*idp.UserData, error) {
v, err := u.cache.Get(ctx, key, new(idp.UserData))
if err != nil {
return nil, err
}
data := v.(*idp.UserData)
return data, nil
}
func (u *UserDataCacheImpl) Set(ctx context.Context, key string, value *idp.UserData, expiration time.Duration) error {
return u.cache.Set(ctx, key, value, store.WithExpiration(expiration))
}
func (u *UserDataCacheImpl) Delete(ctx context.Context, key string) error {
return u.cache.Delete(ctx, key)
}
// NewUserDataCache creates a new UserDataCacheImpl object.
func NewUserDataCache(store store.StoreInterface) *UserDataCacheImpl {
simpleCache := cache.New[any](store)
if store.GetType() == redis.RedisType {
m := marshaler.New(simpleCache)
return &UserDataCacheImpl{cache: m}
}
return &UserDataCacheImpl{cache: &marshalerWraper{simpleCache}}
}
// AccountUserDataCache wraps the basic Get, Set and Delete methods for []*idp.UserData objects.
type AccountUserDataCache struct {
cache Marshaler
}
func (a *AccountUserDataCache) Get(ctx context.Context, key string) ([]*idp.UserData, error) {
var m []*idp.UserData
v, err := a.cache.Get(ctx, key, &m)
if err != nil {
return nil, err
}
switch v := v.(type) {
case []*idp.UserData:
return v, nil
case *[]*idp.UserData:
return *v, nil
case []byte:
return unmarshalUserData(v)
}
return nil, fmt.Errorf("unexpected type: %T", v)
}
func unmarshalUserData(data []byte) ([]*idp.UserData, error) {
returnObj := &[]*idp.UserData{}
err := msgpack.Unmarshal(data, returnObj)
if err != nil {
return nil, err
}
return *returnObj, nil
}
func (a *AccountUserDataCache) Set(ctx context.Context, key string, value []*idp.UserData, expiration time.Duration) error {
return a.cache.Set(ctx, key, value, store.WithExpiration(expiration))
}
func (a *AccountUserDataCache) Delete(ctx context.Context, key string) error {
return a.cache.Delete(ctx, key)
}
// NewAccountUserDataCache creates a new AccountUserDataCache object.
func NewAccountUserDataCache(loadableFunc cache.LoadFunction[any], store store.StoreInterface) *AccountUserDataCache {
simpleCache := cache.New[any](store)
loadable := cache.NewLoadable[any](loadableFunc, simpleCache)
if store.GetType() == redis.RedisType {
m := marshaler.New(loadable)
return &AccountUserDataCache{cache: m}
}
return &AccountUserDataCache{cache: &marshalerWraper{loadable}}
}

135
management/server/cache/idp_test.go vendored Normal file
View File

@@ -0,0 +1,135 @@
package cache_test
import (
"context"
"os"
"testing"
"time"
"github.com/eko/gocache/lib/v4/store"
"github.com/redis/go-redis/v9"
"github.com/testcontainers/testcontainers-go"
testcontainersredis "github.com/testcontainers/testcontainers-go/modules/redis"
"github.com/vmihailenco/msgpack/v5"
"github.com/netbirdio/netbird/management/server/cache"
"github.com/netbirdio/netbird/management/server/idp"
)
func TestNewIDPCacheManagers(t *testing.T) {
tt := []struct {
name string
redis bool
}{
{"memory", false},
{"redis", true},
}
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
if tc.redis {
ctx := context.Background()
redisContainer, err := testcontainersredis.RunContainer(ctx, testcontainers.WithImage("redis:7"))
if err != nil {
t.Fatalf("couldn't start redis container: %s", err)
}
defer func() {
if err := redisContainer.Terminate(ctx); err != nil {
t.Logf("failed to terminate container: %s", err)
}
}()
redisURL, err := redisContainer.ConnectionString(ctx)
if err != nil {
t.Fatalf("couldn't get connection string: %s", err)
}
t.Setenv(cache.RedisStoreEnvVar, redisURL)
}
cacheStore, err := cache.NewStore(cache.DefaultIDPCacheExpirationMax, cache.DefaultIDPCacheCleanupInterval)
if err != nil {
t.Fatalf("couldn't create cache store: %s", err)
}
simple := cache.NewUserDataCache(cacheStore)
loadable := cache.NewAccountUserDataCache(loader, cacheStore)
ctx := context.Background()
value := &idp.UserData{ID: "v", Name: "vv"}
err = simple.Set(ctx, "key1", value, time.Minute)
if err != nil {
t.Errorf("couldn't set testing data: %s", err)
}
result, err := simple.Get(ctx, "key1")
if err != nil {
t.Errorf("couldn't get testing data: %s", err)
}
if value.ID != result.ID || value.Name != result.Name {
t.Errorf("value returned doesn't match testing data, got %v, expected %v", result, "value1")
}
values := []*idp.UserData{
{ID: "v2", Name: "v2v2"},
{ID: "v3", Name: "v3v3"},
{ID: "v4", Name: "v4v4"},
}
err = loadable.Set(ctx, "key2", values, time.Minute)
if err != nil {
t.Errorf("couldn't set testing data: %s", err)
}
result2, err := loadable.Get(ctx, "key2")
if err != nil {
t.Errorf("couldn't get testing data: %s", err)
}
if values[0].ID != result2[0].ID || values[0].Name != result2[0].Name {
t.Errorf("value returned doesn't match testing data, got %v, expected %v", result2[0], values[0])
}
if values[1].ID != result2[1].ID || values[1].Name != result2[1].Name {
t.Errorf("value returned doesn't match testing data, got %v, expected %v", result2[1], values[1])
}
// checking with direct store client
if tc.redis {
// wait for redis to sync
options, err := redis.ParseURL(os.Getenv(cache.RedisStoreEnvVar))
if err != nil {
t.Fatalf("parsing redis cache url: %s", err)
}
redisClient := redis.NewClient(options)
_, err = redisClient.Get(ctx, "loadKey").Result()
if err == nil {
t.Errorf("shouldn't find testing data from redis")
}
}
// testing loadable capability
result2, err = loadable.Get(ctx, "loadKey")
if err != nil {
t.Errorf("couldn't get testing data: %s", err)
}
if loadData[0].ID != result2[0].ID || loadData[0].Name != result2[0].Name {
t.Errorf("value returned doesn't match testing data, got %v, expected %v", result2[0], loadData[0])
}
if loadData[1].ID != result2[1].ID || loadData[1].Name != result2[1].Name {
t.Errorf("value returned doesn't match testing data, got %v, expected %v", result2[1], loadData[1])
}
})
}
}
var loadData = []*idp.UserData{
{ID: "a", Name: "aa"},
{ID: "b", Name: "bb"},
{ID: "c", Name: "cc"},
}
func loader(ctx context.Context, key any) (any, []store.Option, error) {
bytes, err := msgpack.Marshal(loadData)
if err != nil {
return nil, nil, err
}
return bytes, nil, nil
}

35
management/server/cache/marshaler.go vendored Normal file
View File

@@ -0,0 +1,35 @@
package cache
import (
"context"
"github.com/eko/gocache/lib/v4/store"
)
type Marshaler interface {
Get(ctx context.Context, key any, returnObj any) (any, error)
Set(ctx context.Context, key, object any, options ...store.Option) error
Delete(ctx context.Context, key any) error
}
type cacher[T any] interface {
Get(ctx context.Context, key any) (T, error)
Set(ctx context.Context, key any, object T, options ...store.Option) error
Delete(ctx context.Context, key any) error
}
type marshalerWraper struct {
cache cacher[any]
}
func (m marshalerWraper) Get(ctx context.Context, key any, _ any) (any, error) {
return m.cache.Get(ctx, key)
}
func (m marshalerWraper) Set(ctx context.Context, key, object any, options ...store.Option) error {
return m.cache.Set(ctx, key, object, options...)
}
func (m marshalerWraper) Delete(ctx context.Context, key any) error {
return m.cache.Delete(ctx, key)
}

50
management/server/cache/store.go vendored Normal file
View File

@@ -0,0 +1,50 @@
package cache
import (
"context"
"fmt"
"os"
"time"
"github.com/eko/gocache/lib/v4/store"
gocache_store "github.com/eko/gocache/store/go_cache/v4"
redis_store "github.com/eko/gocache/store/redis/v4"
gocache "github.com/patrickmn/go-cache"
"github.com/redis/go-redis/v9"
)
// RedisStoreEnvVar is the environment variable that determines if a redis store should be used.
// The value should follow redis URL format. https://github.com/redis/redis-specifications/blob/master/uri/redis.txt
const RedisStoreEnvVar = "NB_IDP_CACHE_REDIS_ADDRESS"
// NewStore creates a new cache store with the given max timeout and cleanup interval. It checks for the environment Variable RedisStoreEnvVar
// to determine if a redis store should be used. If the environment variable is set, it will attempt to connect to the redis store.
func NewStore(maxTimeout, cleanupInterval time.Duration) (store.StoreInterface, error) {
redisAddr := os.Getenv(RedisStoreEnvVar)
if redisAddr != "" {
return getRedisStore(redisAddr)
}
goc := gocache.New(maxTimeout, cleanupInterval)
return gocache_store.NewGoCache(goc), nil
}
func getRedisStore(redisEnvAddr string) (store.StoreInterface, error) {
options, err := redis.ParseURL(redisEnvAddr)
if err != nil {
return nil, fmt.Errorf("parsing redis cache url: %s", err)
}
options.MaxIdleConns = 6
options.MinIdleConns = 3
options.MaxActiveConns = 100
redisClient := redis.NewClient(options)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_, err = redisClient.Ping(ctx).Result()
if err != nil {
return nil, err
}
return redis_store.NewRedis(redisClient), nil
}

105
management/server/cache/store_test.go vendored Normal file
View File

@@ -0,0 +1,105 @@
package cache_test
import (
"context"
"testing"
"time"
"github.com/eko/gocache/lib/v4/store"
"github.com/redis/go-redis/v9"
"github.com/testcontainers/testcontainers-go"
testcontainersredis "github.com/testcontainers/testcontainers-go/modules/redis"
"github.com/netbirdio/netbird/management/server/cache"
)
func TestMemoryStore(t *testing.T) {
memStore, err := cache.NewStore(100*time.Millisecond, 300*time.Millisecond)
if err != nil {
t.Fatalf("couldn't create memory store: %s", err)
}
ctx := context.Background()
key, value := "testing", "tested"
err = memStore.Set(ctx, key, value)
if err != nil {
t.Errorf("couldn't set testing data: %s", err)
}
result, err := memStore.Get(ctx, key)
if err != nil {
t.Errorf("couldn't get testing data: %s", err)
}
if value != result.(string) {
t.Errorf("value returned doesn't match testing data, got %s, expected %s", result, value)
}
// test expiration
time.Sleep(300 * time.Millisecond)
_, err = memStore.Get(ctx, key)
if err == nil {
t.Error("value should not be found")
}
}
func TestRedisStoreConnectionFailure(t *testing.T) {
t.Setenv(cache.RedisStoreEnvVar, "redis://127.0.0.1:6379")
_, err := cache.NewStore(10*time.Millisecond, 30*time.Millisecond)
if err == nil {
t.Fatal("getting redis cache store should return error")
}
}
func TestRedisStoreConnectionSuccess(t *testing.T) {
ctx := context.Background()
redisContainer, err := testcontainersredis.RunContainer(ctx, testcontainers.WithImage("redis:7"))
if err != nil {
t.Fatalf("couldn't start redis container: %s", err)
}
defer func() {
if err := redisContainer.Terminate(ctx); err != nil {
t.Logf("failed to terminate container: %s", err)
}
}()
redisURL, err := redisContainer.ConnectionString(ctx)
if err != nil {
t.Fatalf("couldn't get connection string: %s", err)
}
t.Setenv(cache.RedisStoreEnvVar, redisURL)
redisStore, err := cache.NewStore(100*time.Millisecond, 300*time.Millisecond)
if err != nil {
t.Fatalf("couldn't create redis store: %s", err)
}
key, value := "testing", "tested"
err = redisStore.Set(ctx, key, value, store.WithExpiration(100*time.Millisecond))
if err != nil {
t.Errorf("couldn't set testing data: %s", err)
}
result, err := redisStore.Get(ctx, key)
if err != nil {
t.Errorf("couldn't get testing data: %s", err)
}
if value != result.(string) {
t.Errorf("value returned doesn't match testing data, got %s, expected %s", result, value)
}
options, err := redis.ParseURL(redisURL)
if err != nil {
t.Errorf("parsing redis cache url: %s", err)
}
redisClient := redis.NewClient(options)
r, e := redisClient.Get(ctx, key).Result()
if e != nil {
t.Errorf("couldn't get testing data from redis: %s", e)
}
if value != r {
t.Errorf("value returned from redis doesn't match testing data, got %s, expected %s", r, value)
}
// test expiration
time.Sleep(300 * time.Millisecond)
_, err = redisStore.Get(ctx, key)
if err == nil {
t.Error("value should not be found")
}
}