mirror of
https://github.com/netbirdio/netbird.git
synced 2026-06-25 01:09:54 +00:00
Compare commits
3 Commits
dmitri-eve
...
handle_syn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7de571e2f4 | ||
|
|
17b2044596 | ||
|
|
07101c59ac |
@@ -51,13 +51,20 @@ type cachedRecord struct {
|
||||
}
|
||||
|
||||
// Resolver caches critical NetBird infrastructure domains.
|
||||
// records, refreshing, mgmtDomain and serverDomains are all guarded by mutex.
|
||||
// records, refreshing, failedResolves, mgmtDomain and serverDomains are all
|
||||
// guarded by mutex.
|
||||
type Resolver struct {
|
||||
records map[dns.Question]*cachedRecord
|
||||
mgmtDomain *domain.Domain
|
||||
serverDomains *dnsconfig.ServerDomains
|
||||
mutex sync.RWMutex
|
||||
|
||||
// failedResolves records the last failed initial resolve per domain so a
|
||||
// domain that never resolves isn't retried on every server-domains update
|
||||
// until refreshBackoff elapses. Entries are cleared on success and pruned
|
||||
// to the current server-domains set.
|
||||
failedResolves map[domain.Domain]time.Time
|
||||
|
||||
chain ChainResolver
|
||||
chainMaxPriority int
|
||||
refreshGroup singleflight.Group
|
||||
@@ -76,9 +83,10 @@ type Resolver struct {
|
||||
// NewResolver creates a new management domains cache resolver.
|
||||
func NewResolver() *Resolver {
|
||||
return &Resolver{
|
||||
records: make(map[dns.Question]*cachedRecord),
|
||||
refreshing: make(map[dns.Question]*atomic.Bool),
|
||||
cacheTTL: resolveCacheTTL(),
|
||||
records: make(map[dns.Question]*cachedRecord),
|
||||
refreshing: make(map[dns.Question]*atomic.Bool),
|
||||
failedResolves: make(map[domain.Domain]time.Time),
|
||||
cacheTTL: resolveCacheTTL(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -173,7 +181,9 @@ func (m *Resolver) continueToNext(w dns.ResponseWriter, r *dns.Msg) {
|
||||
|
||||
// AddDomain resolves a domain and stores its A/AAAA records in the cache.
|
||||
// A family that resolves NODATA (nil err, zero records) evicts any stale
|
||||
// entry for that qtype.
|
||||
// entry for that qtype. When one family hard-errors while the other succeeds,
|
||||
// the resolved family is still cached but AddDomain returns an error so the
|
||||
// caller retries the incomplete resolve rather than treating it as complete.
|
||||
func (m *Resolver) AddDomain(ctx context.Context, d domain.Domain) error {
|
||||
dnsName := strings.ToLower(dns.Fqdn(d.PunycodeString()))
|
||||
|
||||
@@ -203,6 +213,10 @@ func (m *Resolver) AddDomain(ctx context.Context, d domain.Domain) error {
|
||||
log.Debugf("added/updated domain=%s with %d A records and %d AAAA records",
|
||||
d.SafeString(), len(aRecords), len(aaaaRecords))
|
||||
|
||||
if errA != nil || errAAAA != nil {
|
||||
return fmt.Errorf("resolve %s: incomplete, a family failed: %w", d.SafeString(), errors.Join(errA, errAAAA))
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -462,6 +476,7 @@ func (m *Resolver) RemoveDomain(d domain.Domain) error {
|
||||
delete(m.records, qAAAA)
|
||||
delete(m.refreshing, qA)
|
||||
delete(m.refreshing, qAAAA)
|
||||
delete(m.failedResolves, d)
|
||||
|
||||
log.Debugf("removed domain=%s from cache", d.SafeString())
|
||||
return nil
|
||||
@@ -505,6 +520,7 @@ func (m *Resolver) UpdateFromServerDomains(ctx context.Context, serverDomains dn
|
||||
allDomains := m.extractDomainsFromServerDomains(updatedServerDomains)
|
||||
currentDomains := m.GetCachedDomains()
|
||||
removedDomains = m.removeStaleDomains(currentDomains, allDomains)
|
||||
m.pruneFailedResolves(allDomains)
|
||||
}
|
||||
|
||||
m.addNewDomains(ctx, newDomains)
|
||||
@@ -577,13 +593,85 @@ func (m *Resolver) isManagementDomain(domain domain.Domain) bool {
|
||||
return m.mgmtDomain != nil && domain == *m.mgmtDomain
|
||||
}
|
||||
|
||||
// addNewDomains resolves and caches all domains from the update
|
||||
// addNewDomains resolves and caches domains that are not yet in the cache,
|
||||
// running the lookups concurrently. Domains already cached are skipped and left
|
||||
// to the stale-while-revalidate refresh path, so a sync never re-resolves them
|
||||
// synchronously: once NetBird owns the OS resolver the resolve runs through the
|
||||
// handler chain and would otherwise dial the managed upstreams under the engine
|
||||
// sync lock on every update.
|
||||
func (m *Resolver) addNewDomains(ctx context.Context, newDomains domain.List) {
|
||||
var wg sync.WaitGroup
|
||||
seen := make(map[domain.Domain]struct{}, len(newDomains))
|
||||
for _, newDomain := range newDomains {
|
||||
if err := m.AddDomain(ctx, newDomain); err != nil {
|
||||
log.Warnf("failed to add/update domain=%s: %v", newDomain.SafeString(), err)
|
||||
} else {
|
||||
log.Debugf("added/updated management cache domain=%s", newDomain.SafeString())
|
||||
if _, dup := seen[newDomain]; dup {
|
||||
continue
|
||||
}
|
||||
seen[newDomain] = struct{}{}
|
||||
|
||||
if !m.needsResolve(newDomain) {
|
||||
continue
|
||||
}
|
||||
|
||||
wg.Add(1)
|
||||
go func(d domain.Domain) {
|
||||
defer wg.Done()
|
||||
if err := m.AddDomain(ctx, d); err != nil {
|
||||
m.markResolveFailed(d)
|
||||
log.Warnf("failed to add/update domain=%s: %v", d.SafeString(), err)
|
||||
return
|
||||
}
|
||||
m.clearResolveFailed(d)
|
||||
log.Debugf("added/updated management cache domain=%s", d.SafeString())
|
||||
}(newDomain)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// needsResolve reports whether d should be resolved now. A recent failed or
|
||||
// incomplete resolve gates retries on the backoff even when one family is
|
||||
// already cached, so a transiently-failed family is retried instead of being
|
||||
// treated as fully resolved. Otherwise a domain with any cached record is left
|
||||
// to the stale-while-revalidate refresh path.
|
||||
func (m *Resolver) needsResolve(d domain.Domain) bool {
|
||||
dnsName := strings.ToLower(dns.Fqdn(d.PunycodeString()))
|
||||
|
||||
m.mutex.RLock()
|
||||
defer m.mutex.RUnlock()
|
||||
|
||||
if failedAt, ok := m.failedResolves[d]; ok {
|
||||
return time.Since(failedAt) >= refreshBackoff
|
||||
}
|
||||
|
||||
for _, qtype := range []uint16{dns.TypeA, dns.TypeAAAA} {
|
||||
q := dns.Question{Name: dnsName, Qtype: qtype, Qclass: dns.ClassINET}
|
||||
if _, ok := m.records[q]; ok {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (m *Resolver) markResolveFailed(d domain.Domain) {
|
||||
m.mutex.Lock()
|
||||
m.failedResolves[d] = time.Now()
|
||||
m.mutex.Unlock()
|
||||
}
|
||||
|
||||
func (m *Resolver) clearResolveFailed(d domain.Domain) {
|
||||
m.mutex.Lock()
|
||||
delete(m.failedResolves, d)
|
||||
m.mutex.Unlock()
|
||||
}
|
||||
|
||||
// pruneFailedResolves drops failure markers for domains no longer present in
|
||||
// the server-domains set, keeping the map bounded to the current set (a
|
||||
// failed-only domain has no cached record, so RemoveDomain never sees it).
|
||||
func (m *Resolver) pruneFailedResolves(domains domain.List) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
for d := range m.failedResolves {
|
||||
if !slices.Contains(domains, d) {
|
||||
delete(m.failedResolves, d)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,6 +21,7 @@ type fakeChain struct {
|
||||
mu sync.Mutex
|
||||
calls map[string]int
|
||||
answers map[string][]dns.RR
|
||||
qErr map[string]error
|
||||
err error
|
||||
hasRoot bool
|
||||
onLookup func()
|
||||
@@ -30,6 +31,7 @@ func newFakeChain() *fakeChain {
|
||||
return &fakeChain{
|
||||
calls: map[string]int{},
|
||||
answers: map[string][]dns.RR{},
|
||||
qErr: map[string]error{},
|
||||
hasRoot: true,
|
||||
}
|
||||
}
|
||||
@@ -47,6 +49,9 @@ func (f *fakeChain) ResolveInternal(ctx context.Context, msg *dns.Msg, maxPriori
|
||||
f.calls[key]++
|
||||
answers := f.answers[key]
|
||||
err := f.err
|
||||
if err == nil {
|
||||
err = f.qErr[key]
|
||||
}
|
||||
onLookup := f.onLookup
|
||||
f.mu.Unlock()
|
||||
|
||||
@@ -75,6 +80,12 @@ func (f *fakeChain) setAnswer(name string, qtype uint16, ip string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (f *fakeChain) setErr(name string, qtype uint16, err error) {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
f.qErr[name+"|"+dns.TypeToString[qtype]] = err
|
||||
}
|
||||
|
||||
func (f *fakeChain) callCount(name string, qtype uint16) int {
|
||||
f.mu.Lock()
|
||||
defer f.mu.Unlock()
|
||||
|
||||
183
client/internal/dns/mgmt/mgmt_resolve_test.go
Normal file
183
client/internal/dns/mgmt/mgmt_resolve_test.go
Normal file
@@ -0,0 +1,183 @@
|
||||
package mgmt
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/miekg/dns"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
dnsconfig "github.com/netbirdio/netbird/client/internal/dns/config"
|
||||
"github.com/netbirdio/netbird/shared/management/domain"
|
||||
)
|
||||
|
||||
// A domain already in the cache must not be re-resolved on a subsequent server
|
||||
// domains update; it is left to the stale-while-revalidate refresh path.
|
||||
func TestResolver_UpdateFromServerDomains_SkipsCached(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.setAnswer("signal.example.com.", dns.TypeA, "10.0.0.2")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
sd := dnsconfig.ServerDomains{Signal: domain.Domain("signal.example.com")}
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
|
||||
"first update must resolve the domain")
|
||||
|
||||
_, err = r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
|
||||
"cached domain must not be re-resolved on a subsequent update")
|
||||
}
|
||||
|
||||
// New domains in a single update must resolve concurrently rather than serially.
|
||||
func TestResolver_AddNewDomains_ResolvesConcurrently(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
|
||||
var inflight, maxInflight atomic.Int32
|
||||
chain.onLookup = func() {
|
||||
n := inflight.Add(1)
|
||||
for {
|
||||
old := maxInflight.Load()
|
||||
if n <= old || maxInflight.CompareAndSwap(old, n) {
|
||||
break
|
||||
}
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
inflight.Add(-1)
|
||||
}
|
||||
|
||||
relays := []domain.Domain{"a.example.com", "b.example.com", "c.example.com", "d.example.com"}
|
||||
for _, d := range relays {
|
||||
chain.setAnswer(dns.Fqdn(string(d)), dns.TypeA, "10.0.0.2")
|
||||
}
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
start := time.Now()
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Relay: relays})
|
||||
require.NoError(t, err)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
assert.GreaterOrEqual(t, int(maxInflight.Load()), 2, "domains must resolve concurrently")
|
||||
// Serial resolution of 4 domains would take at least 4*50ms; concurrent is far less.
|
||||
assert.Less(t, elapsed, 300*time.Millisecond, "resolution should not be serial")
|
||||
}
|
||||
|
||||
// A domain that fails to resolve must not be retried on every update; the
|
||||
// failure backoff suppresses re-resolution until it expires.
|
||||
func TestResolver_UpdateFromServerDomains_BacksOffFailures(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.err = errors.New("resolve boom")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
sd := dnsconfig.ServerDomains{Signal: domain.Domain("signal.example.com")}
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
|
||||
"first update must attempt the resolve")
|
||||
|
||||
_, err = r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, chain.callCount("signal.example.com.", dns.TypeA),
|
||||
"failed resolve must back off and not retry on the next update")
|
||||
}
|
||||
|
||||
// A domain listed under more than one server-domain type (e.g. STUN and TURN on
|
||||
// the same host) must be resolved once per update, not once per occurrence.
|
||||
func TestResolver_AddNewDomains_DedupesDuplicateDomains(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.setAnswer("dup.example.com.", dns.TypeA, "10.0.0.9")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
sd := dnsconfig.ServerDomains{
|
||||
Stuns: []domain.Domain{"dup.example.com"},
|
||||
Turns: []domain.Domain{"dup.example.com"},
|
||||
}
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), sd)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 1, chain.callCount("dup.example.com.", dns.TypeA),
|
||||
"a domain appearing under multiple server-domain types must resolve once")
|
||||
}
|
||||
|
||||
// A failure marker must be dropped once its domain leaves the server-domains set
|
||||
// so the map stays bounded to the current set.
|
||||
func TestResolver_UpdateFromServerDomains_PrunesFailedResolves(t *testing.T) {
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.err = errors.New("resolve boom")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Signal: domain.Domain("gone.example.com")})
|
||||
require.NoError(t, err)
|
||||
r.mutex.RLock()
|
||||
_, marked := r.failedResolves[domain.Domain("gone.example.com")]
|
||||
r.mutex.RUnlock()
|
||||
require.True(t, marked, "failed resolve must be recorded")
|
||||
|
||||
_, err = r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Signal: domain.Domain("other.example.com")})
|
||||
require.NoError(t, err)
|
||||
r.mutex.RLock()
|
||||
_, stillMarked := r.failedResolves[domain.Domain("gone.example.com")]
|
||||
r.mutex.RUnlock()
|
||||
assert.False(t, stillMarked, "failure marker for a domain no longer in the set must be pruned")
|
||||
}
|
||||
|
||||
// When one family hard-errors while the other resolves, the domain is cached
|
||||
// for the working family but recorded as incomplete so the failed family is
|
||||
// retried under backoff instead of being treated as fully resolved forever.
|
||||
func TestResolver_AddNewDomains_RetriesPartialFamilyFailure(t *testing.T) {
|
||||
d := domain.Domain("relay.example.com")
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.setAnswer("relay.example.com.", dns.TypeA, "10.0.0.2")
|
||||
chain.setErr("relay.example.com.", dns.TypeAAAA, errors.New("servfail"))
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Relay: []domain.Domain{d}})
|
||||
require.NoError(t, err)
|
||||
|
||||
r.mutex.RLock()
|
||||
_, aCached := r.records[dns.Question{Name: "relay.example.com.", Qtype: dns.TypeA, Qclass: dns.ClassINET}]
|
||||
_, marked := r.failedResolves[d]
|
||||
r.mutex.RUnlock()
|
||||
require.True(t, aCached, "the working family must still be cached")
|
||||
require.True(t, marked, "a partial failure must be recorded so the failed family is retried")
|
||||
|
||||
assert.False(t, r.needsResolve(d), "within the backoff window the domain is not retried")
|
||||
|
||||
r.mutex.Lock()
|
||||
r.failedResolves[d] = time.Now().Add(-2 * refreshBackoff)
|
||||
r.mutex.Unlock()
|
||||
assert.True(t, r.needsResolve(d), "after the backoff elapses the domain is retried to pick up the missing family")
|
||||
}
|
||||
|
||||
// A family that returns NODATA (legitimately absent, e.g. an IPv4-only host) is
|
||||
// not a failure: the domain must not be marked for retry, otherwise it would be
|
||||
// re-resolved on every sync.
|
||||
func TestResolver_AddNewDomains_NodataIsNotFailure(t *testing.T) {
|
||||
d := domain.Domain("v4only.example.com")
|
||||
r := NewResolver()
|
||||
chain := newFakeChain()
|
||||
chain.setAnswer("v4only.example.com.", dns.TypeA, "10.0.0.2")
|
||||
r.SetChainResolver(chain, 50)
|
||||
|
||||
_, err := r.UpdateFromServerDomains(context.Background(), dnsconfig.ServerDomains{Relay: []domain.Domain{d}})
|
||||
require.NoError(t, err)
|
||||
|
||||
r.mutex.RLock()
|
||||
_, marked := r.failedResolves[d]
|
||||
r.mutex.RUnlock()
|
||||
assert.False(t, marked, "a NODATA family must not be recorded as a failure")
|
||||
assert.False(t, r.needsResolve(d), "an IPv4-only host must not be re-resolved on later syncs")
|
||||
}
|
||||
@@ -895,6 +895,16 @@ func (e *Engine) handleAutoUpdateVersion(autoUpdateSettings *mgmProto.AutoUpdate
|
||||
e.updateManager.SetVersion(autoUpdateSettings.Version, autoUpdateSettings.AlwaysUpdate)
|
||||
}
|
||||
|
||||
// phase times a sync sub-phase: it returns a function that records the elapsed
|
||||
// duration when called. Starting the timer at the call site keeps inter-phase
|
||||
// glue code out of the measurement.
|
||||
func (e *Engine) phase(name string) func() {
|
||||
start := time.Now()
|
||||
return func() {
|
||||
e.clientMetrics.RecordSyncPhase(e.ctx, name, time.Since(start))
|
||||
}
|
||||
}
|
||||
|
||||
func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
started := time.Now()
|
||||
defer func() {
|
||||
@@ -914,9 +924,11 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
e.handleAutoUpdateVersion(update.NetworkMap.PeerConfig.AutoUpdate)
|
||||
}
|
||||
|
||||
done := e.phase("netbird_config")
|
||||
if err := e.updateNetbirdConfig(update.GetNetbirdConfig()); err != nil {
|
||||
return err
|
||||
}
|
||||
done()
|
||||
|
||||
// Posture checks are bound to the network map presence:
|
||||
// NetworkMap != nil, checks present -> apply the received checks
|
||||
@@ -928,11 +940,15 @@ func (e *Engine) handleSync(update *mgmProto.SyncResponse) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
done = e.phase("checks")
|
||||
if err := e.updateChecksIfNew(update.Checks); err != nil {
|
||||
return err
|
||||
}
|
||||
done()
|
||||
|
||||
done = e.phase("persist")
|
||||
e.persistSyncResponse(update)
|
||||
done()
|
||||
|
||||
// only apply new changes and ignore old ones
|
||||
if err := e.updateNetworkMap(nm); err != nil {
|
||||
@@ -1357,13 +1373,16 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
|
||||
dnsConfig := toDNSConfig(protoDNSConfig, e.wgInterface.Address())
|
||||
|
||||
done := e.phase("dns_server")
|
||||
if err := e.dnsServer.UpdateDNSServer(serial, dnsConfig); err != nil {
|
||||
log.Errorf("failed to update dns server, err: %v", err)
|
||||
}
|
||||
done()
|
||||
|
||||
e.routeManager.SetDNSForwarderPort(dnsConfig.ForwarderPort)
|
||||
|
||||
// apply routes first, route related actions might depend on routing being enabled
|
||||
done = e.phase("routes_classify")
|
||||
routes := toRoutes(networkMap.GetRoutes())
|
||||
serverRoutes, clientRoutes := e.routeManager.ClassifyRoutes(routes)
|
||||
|
||||
@@ -1372,28 +1391,39 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
e.connMgr.UpdateRouteHAMap(clientRoutes)
|
||||
log.Debugf("updated lazy connection manager with %d HA groups", len(clientRoutes))
|
||||
}
|
||||
done()
|
||||
|
||||
done = e.phase("routes_apply")
|
||||
dnsRouteFeatureFlag := toDNSFeatureFlag(networkMap)
|
||||
if err := e.routeManager.UpdateRoutes(serial, serverRoutes, clientRoutes, dnsRouteFeatureFlag); err != nil {
|
||||
log.Errorf("failed to update routes: %v", err)
|
||||
}
|
||||
done()
|
||||
|
||||
done = e.phase("filtering")
|
||||
if e.acl != nil {
|
||||
e.acl.ApplyFiltering(networkMap, dnsRouteFeatureFlag)
|
||||
}
|
||||
done()
|
||||
|
||||
done = e.phase("dns_forwarder")
|
||||
fwdEntries := toRouteDomains(e.config.WgPrivateKey.PublicKey().String(), routes)
|
||||
e.updateDNSForwarder(dnsRouteFeatureFlag, fwdEntries)
|
||||
done()
|
||||
|
||||
// Ingress forward rules
|
||||
done = e.phase("forward_rules")
|
||||
forwardingRules, err := e.updateForwardRules(networkMap.GetForwardingRules())
|
||||
if err != nil {
|
||||
log.Errorf("failed to update forward rules, err: %v", err)
|
||||
}
|
||||
done()
|
||||
|
||||
log.Debugf("got peers update from Management Service, total peers to connect to = %d", len(networkMap.GetRemotePeers()))
|
||||
|
||||
done = e.phase("offline_peers")
|
||||
e.updateOfflinePeers(networkMap.GetOfflinePeers())
|
||||
done()
|
||||
|
||||
// Filter out own peer from the remote peers list
|
||||
localPubKey := e.config.WgPrivateKey.PublicKey().String()
|
||||
@@ -1412,20 +1442,26 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
done = e.phase("removed_peers")
|
||||
err := e.removePeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
done()
|
||||
|
||||
done = e.phase("modified_peers")
|
||||
err = e.modifyPeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
done()
|
||||
|
||||
done = e.phase("added_peers")
|
||||
err = e.addNewPeers(remotePeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
done()
|
||||
|
||||
e.statusRecorder.FinishPeerListModifications()
|
||||
|
||||
@@ -1439,8 +1475,10 @@ func (e *Engine) updateNetworkMap(networkMap *mgmProto.NetworkMap) error {
|
||||
}
|
||||
|
||||
// must set the exclude list after the peers are added. Without it the manager can not figure out the peers parameters from the store
|
||||
done = e.phase("lazy_exclude")
|
||||
excludedLazyPeers := e.toExcludedLazyPeers(forwardingRules, remotePeers)
|
||||
e.connMgr.SetExcludeList(e.ctx, excludedLazyPeers)
|
||||
done()
|
||||
|
||||
e.networkSerial = serial
|
||||
|
||||
|
||||
@@ -120,6 +120,30 @@ func (m *influxDBMetrics) RecordSyncDuration(_ context.Context, agentInfo AgentI
|
||||
m.trimLocked()
|
||||
}
|
||||
|
||||
func (m *influxDBMetrics) RecordSyncPhase(_ context.Context, agentInfo AgentInfo, phase string, duration time.Duration) {
|
||||
tags := fmt.Sprintf("deployment_type=%s,version=%s,os=%s,arch=%s,peer_id=%s,phase=%s",
|
||||
agentInfo.DeploymentType.String(),
|
||||
agentInfo.Version,
|
||||
agentInfo.OS,
|
||||
agentInfo.Arch,
|
||||
agentInfo.peerID,
|
||||
phase,
|
||||
)
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
m.samples = append(m.samples, influxSample{
|
||||
measurement: "netbird_sync_phase",
|
||||
tags: tags,
|
||||
fields: map[string]float64{
|
||||
"duration_seconds": duration.Seconds(),
|
||||
},
|
||||
timestamp: time.Now(),
|
||||
})
|
||||
m.trimLocked()
|
||||
}
|
||||
|
||||
func (m *influxDBMetrics) RecordLoginDuration(_ context.Context, agentInfo AgentInfo, duration time.Duration, success bool) {
|
||||
result := "success"
|
||||
if !success {
|
||||
|
||||
@@ -0,0 +1,259 @@
|
||||
{
|
||||
"annotations": {
|
||||
"list": []
|
||||
},
|
||||
"editable": true,
|
||||
"fiscalYearStartMonth": 0,
|
||||
"graphTooltip": 1,
|
||||
"links": [],
|
||||
"refresh": "",
|
||||
"schemaVersion": 39,
|
||||
"tags": [
|
||||
"netbird",
|
||||
"sync"
|
||||
],
|
||||
"templating": {
|
||||
"list": [
|
||||
{
|
||||
"current": {
|
||||
"text": "All",
|
||||
"value": "$__all"
|
||||
},
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"definition": "import \"influxdata/influxdb/schema\"\nschema.tagValues(bucket: \"metrics\", tag: \"version\")",
|
||||
"includeAll": true,
|
||||
"label": "version",
|
||||
"multi": true,
|
||||
"name": "version",
|
||||
"query": "import \"influxdata/influxdb/schema\"\nschema.tagValues(bucket: \"metrics\", tag: \"version\")",
|
||||
"refresh": 2,
|
||||
"type": "query",
|
||||
"allValue": ".*"
|
||||
}
|
||||
]
|
||||
},
|
||||
"time": {
|
||||
"from": "now-2d",
|
||||
"to": "now"
|
||||
},
|
||||
"timepicker": {},
|
||||
"timezone": "",
|
||||
"title": "NetBird Sync Phases (where time goes)",
|
||||
"uid": "netbird-sync-phases",
|
||||
"version": 1,
|
||||
"panels": [
|
||||
{
|
||||
"id": 1,
|
||||
"title": "Time per phase over time (stacked, ms)",
|
||||
"type": "timeseries",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 10,
|
||||
"w": 24,
|
||||
"x": 0,
|
||||
"y": 0
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "ms",
|
||||
"custom": {
|
||||
"drawStyle": "bars",
|
||||
"stacking": {
|
||||
"mode": "normal",
|
||||
"group": "A"
|
||||
},
|
||||
"fillOpacity": 80,
|
||||
"lineWidth": 0
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": {
|
||||
"displayMode": "table",
|
||||
"placement": "right",
|
||||
"calcs": [
|
||||
"max",
|
||||
"mean"
|
||||
]
|
||||
},
|
||||
"tooltip": {
|
||||
"mode": "multi",
|
||||
"sort": "desc"
|
||||
}
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"refId": "A",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync_phase\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> keep(columns: [\"_time\", \"_value\", \"phase\"])\n |> group(columns: [\"phase\"])"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 2,
|
||||
"title": "p95 per phase (ms)",
|
||||
"type": "bargauge",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 11,
|
||||
"w": 12,
|
||||
"x": 0,
|
||||
"y": 10
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "ms",
|
||||
"color": {
|
||||
"mode": "continuous-GrYlRd"
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"displayMode": "gradient",
|
||||
"orientation": "horizontal",
|
||||
"reduceOptions": {
|
||||
"calcs": [
|
||||
"lastNotNull"
|
||||
],
|
||||
"fields": "",
|
||||
"values": false
|
||||
},
|
||||
"showUnfilled": true
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"refId": "A",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync_phase\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> group(columns: [\"phase\"])\n |> quantile(q: 0.95)\n |> group()\n |> sort(columns: [\"_value\"], desc: true)"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 3,
|
||||
"title": "Per-phase stats (ms): mean / p95 / max",
|
||||
"type": "table",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 11,
|
||||
"w": 12,
|
||||
"x": 12,
|
||||
"y": 10
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "ms"
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"showHeader": true,
|
||||
"sortBy": [
|
||||
{
|
||||
"displayName": "max",
|
||||
"desc": true
|
||||
}
|
||||
]
|
||||
},
|
||||
"transformations": [
|
||||
{
|
||||
"id": "merge",
|
||||
"options": {}
|
||||
}
|
||||
],
|
||||
"targets": [
|
||||
{
|
||||
"refId": "mean",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync_phase\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> group(columns: [\"phase\"])\n |> mean()\n |> group()\n |> keep(columns: [\"phase\", \"_value\"])\n |> rename(columns: {_value: \"mean\"})"
|
||||
},
|
||||
{
|
||||
"refId": "p95",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync_phase\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> group(columns: [\"phase\"])\n |> quantile(q: 0.95)\n |> group()\n |> keep(columns: [\"phase\", \"_value\"])\n |> rename(columns: {_value: \"p95\"})"
|
||||
},
|
||||
{
|
||||
"refId": "max",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync_phase\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> group(columns: [\"phase\"])\n |> max()\n |> group()\n |> keep(columns: [\"phase\", \"_value\"])\n |> rename(columns: {_value: \"max\"})"
|
||||
}
|
||||
]
|
||||
},
|
||||
{
|
||||
"id": 4,
|
||||
"title": "Total sync duration (netbird_sync, ms) \u2014 reference",
|
||||
"type": "timeseries",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"gridPos": {
|
||||
"h": 8,
|
||||
"w": 24,
|
||||
"x": 0,
|
||||
"y": 21
|
||||
},
|
||||
"fieldConfig": {
|
||||
"defaults": {
|
||||
"unit": "ms",
|
||||
"custom": {
|
||||
"drawStyle": "points",
|
||||
"pointSize": 5
|
||||
}
|
||||
},
|
||||
"overrides": []
|
||||
},
|
||||
"options": {
|
||||
"legend": {
|
||||
"displayMode": "table",
|
||||
"placement": "right",
|
||||
"calcs": [
|
||||
"max",
|
||||
"mean"
|
||||
]
|
||||
},
|
||||
"tooltip": {
|
||||
"mode": "single"
|
||||
}
|
||||
},
|
||||
"targets": [
|
||||
{
|
||||
"refId": "A",
|
||||
"datasource": {
|
||||
"type": "influxdb",
|
||||
"uid": "influxdb"
|
||||
},
|
||||
"query": "from(bucket: \"metrics\")\n |> range(start: v.timeRangeStart, stop: v.timeRangeStop)\n |> filter(fn: (r) => r._measurement == \"netbird_sync\" and r._field == \"duration_seconds\")\n |> filter(fn: (r) => r.version =~ /${version:regex}/)\n |> map(fn: (r) => ({ r with _value: r._value * 1000.0 }))\n |> keep(columns: [\"_time\", \"_value\", \"version\"])\n |> group(columns: [\"version\"])"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
@@ -59,6 +59,19 @@ var allowedMeasurements = map[string]measurementSpec{
|
||||
"peer_id": true,
|
||||
},
|
||||
},
|
||||
"netbird_sync_phase": {
|
||||
allowedFields: map[string]bool{
|
||||
"duration_seconds": true,
|
||||
},
|
||||
allowedTags: map[string]bool{
|
||||
"deployment_type": true,
|
||||
"version": true,
|
||||
"os": true,
|
||||
"arch": true,
|
||||
"peer_id": true,
|
||||
"phase": true,
|
||||
},
|
||||
},
|
||||
"netbird_login": {
|
||||
allowedFields: map[string]bool{
|
||||
"duration_seconds": true,
|
||||
|
||||
@@ -56,6 +56,9 @@ type metricsImplementation interface {
|
||||
// RecordSyncDuration records how long it took to process a sync message
|
||||
RecordSyncDuration(ctx context.Context, agentInfo AgentInfo, duration time.Duration)
|
||||
|
||||
// RecordSyncPhase records how long a single sub-phase of sync processing took
|
||||
RecordSyncPhase(ctx context.Context, agentInfo AgentInfo, phase string, duration time.Duration)
|
||||
|
||||
// RecordLoginDuration records how long the login to management took
|
||||
RecordLoginDuration(ctx context.Context, agentInfo AgentInfo, duration time.Duration, success bool)
|
||||
|
||||
@@ -127,6 +130,18 @@ func (c *ClientMetrics) RecordSyncDuration(ctx context.Context, duration time.Du
|
||||
c.impl.RecordSyncDuration(ctx, agentInfo, duration)
|
||||
}
|
||||
|
||||
// RecordSyncPhase records the duration of a single sub-phase of sync processing
|
||||
func (c *ClientMetrics) RecordSyncPhase(ctx context.Context, phase string, duration time.Duration) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
c.mu.RLock()
|
||||
agentInfo := c.agentInfo
|
||||
c.mu.RUnlock()
|
||||
|
||||
c.impl.RecordSyncPhase(ctx, agentInfo, phase, duration)
|
||||
}
|
||||
|
||||
// RecordLoginDuration records how long the login to management server took
|
||||
func (c *ClientMetrics) RecordLoginDuration(ctx context.Context, duration time.Duration, success bool) {
|
||||
if c == nil {
|
||||
|
||||
@@ -70,6 +70,9 @@ func (m *mockMetrics) RecordConnectionStages(_ context.Context, _ AgentInfo, _ s
|
||||
func (m *mockMetrics) RecordSyncDuration(_ context.Context, _ AgentInfo, _ time.Duration) {
|
||||
}
|
||||
|
||||
func (m *mockMetrics) RecordSyncPhase(_ context.Context, _ AgentInfo, _ string, _ time.Duration) {
|
||||
}
|
||||
|
||||
func (m *mockMetrics) RecordLoginDuration(_ context.Context, _ AgentInfo, _ time.Duration, _ bool) {
|
||||
}
|
||||
|
||||
|
||||
@@ -27,7 +27,7 @@ type Logger struct {
|
||||
wgIfaceNetV6 netip.Prefix
|
||||
dnsCollection atomic.Bool
|
||||
exitNodeCollection atomic.Bool
|
||||
Store types.AggregatingStore
|
||||
Store types.Store
|
||||
}
|
||||
|
||||
func New(statusRecorder *peer.Status, wgIfaceIPNet, wgIfaceIPNetV6 netip.Prefix) *Logger {
|
||||
@@ -35,7 +35,7 @@ func New(statusRecorder *peer.Status, wgIfaceIPNet, wgIfaceIPNetV6 netip.Prefix)
|
||||
statusRecorder: statusRecorder,
|
||||
wgIfaceNet: wgIfaceIPNet,
|
||||
wgIfaceNetV6: wgIfaceIPNetV6,
|
||||
Store: store.NewAggregatingMemoryStore(),
|
||||
Store: store.NewMemoryStore(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -125,10 +125,6 @@ func (l *Logger) stop() {
|
||||
l.mux.Unlock()
|
||||
}
|
||||
|
||||
func (l *Logger) ResetAggregationWindow() types.FlowEventAggregator {
|
||||
return l.Store.ResetAggregationWindow()
|
||||
}
|
||||
|
||||
func (l *Logger) GetEvents() []*types.Event {
|
||||
return l.Store.GetEvents()
|
||||
}
|
||||
|
||||
@@ -9,14 +9,12 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/google/uuid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/conntrack"
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/logger"
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/store"
|
||||
nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
|
||||
"github.com/netbirdio/netbird/client/internal/peer"
|
||||
"github.com/netbirdio/netbird/flow/client"
|
||||
@@ -25,16 +23,14 @@ import (
|
||||
|
||||
// Manager handles netflow tracking and logging
|
||||
type Manager struct {
|
||||
mux sync.Mutex
|
||||
shutdownWg sync.WaitGroup
|
||||
logger nftypes.FlowLogger
|
||||
flowConfig *nftypes.FlowConfig
|
||||
conntrack nftypes.ConnTracker
|
||||
receiverClient *client.GRPCClient
|
||||
eventsWithoutAcks nftypes.Store
|
||||
publicKey []byte
|
||||
cancel context.CancelFunc
|
||||
retryInterval time.Duration
|
||||
mux sync.Mutex
|
||||
shutdownWg sync.WaitGroup
|
||||
logger nftypes.FlowLogger
|
||||
flowConfig *nftypes.FlowConfig
|
||||
conntrack nftypes.ConnTracker
|
||||
receiverClient *client.GRPCClient
|
||||
publicKey []byte
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// NewManager creates a new netflow manager
|
||||
@@ -52,11 +48,9 @@ func NewManager(iface nftypes.IFaceMapper, publicKey []byte, statusRecorder *pee
|
||||
}
|
||||
|
||||
return &Manager{
|
||||
logger: flowLogger,
|
||||
conntrack: ct,
|
||||
publicKey: publicKey,
|
||||
retryInterval: time.Second,
|
||||
eventsWithoutAcks: store.NewMemoryStore(),
|
||||
logger: flowLogger,
|
||||
conntrack: ct,
|
||||
publicKey: publicKey,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,7 +66,6 @@ func (m *Manager) needsNewClient(previous *nftypes.FlowConfig) bool {
|
||||
}
|
||||
|
||||
// enableFlow starts components for flow tracking
|
||||
// must be called under m.mux lock
|
||||
func (m *Manager) enableFlow(previous *nftypes.FlowConfig) error {
|
||||
// first make sender ready so events don't pile up
|
||||
if m.needsNewClient(previous) {
|
||||
@@ -92,7 +85,6 @@ func (m *Manager) enableFlow(previous *nftypes.FlowConfig) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// must be called under m.mux lock
|
||||
func (m *Manager) resetClient() error {
|
||||
if m.receiverClient != nil {
|
||||
if err := m.receiverClient.Close(); err != nil {
|
||||
@@ -115,19 +107,14 @@ func (m *Manager) resetClient() error {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
m.cancel = cancel
|
||||
|
||||
m.shutdownWg.Add(3)
|
||||
flowConfigInterval := m.flowConfig.Interval
|
||||
m.shutdownWg.Add(2)
|
||||
go func() {
|
||||
defer m.shutdownWg.Done()
|
||||
m.receiveACKs(ctx, flowClient, flowConfigInterval)
|
||||
m.receiveACKs(ctx, flowClient)
|
||||
}()
|
||||
go func() {
|
||||
defer m.shutdownWg.Done()
|
||||
m.startSender(ctx, flowConfigInterval)
|
||||
}()
|
||||
go func() {
|
||||
defer m.shutdownWg.Done()
|
||||
m.startRetries(ctx, flowConfigInterval)
|
||||
m.startSender(ctx)
|
||||
}()
|
||||
|
||||
return nil
|
||||
@@ -211,8 +198,8 @@ func (m *Manager) GetLogger() nftypes.FlowLogger {
|
||||
return m.logger
|
||||
}
|
||||
|
||||
func (m *Manager) startSender(ctx context.Context, flowConfigInterval time.Duration) {
|
||||
ticker := time.NewTicker(flowConfigInterval)
|
||||
func (m *Manager) startSender(ctx context.Context) {
|
||||
ticker := time.NewTicker(m.flowConfig.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
@@ -220,29 +207,27 @@ func (m *Manager) startSender(ctx context.Context, flowConfigInterval time.Durat
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
collectedEvents := m.logger.ResetAggregationWindow()
|
||||
events := collectedEvents.GetAggregatedEvents()
|
||||
events := m.logger.GetEvents()
|
||||
for _, event := range events {
|
||||
if err := m.send(event); err != nil {
|
||||
log.Errorf("failed to send flow event to server: %v", err)
|
||||
} else {
|
||||
log.Tracef("sent flow event: %s", event.ID)
|
||||
continue
|
||||
}
|
||||
m.eventsWithoutAcks.StoreEvent(event)
|
||||
log.Tracef("sent flow event: %s", event.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) receiveACKs(ctx context.Context, client *client.GRPCClient, flowConfigInterval time.Duration) {
|
||||
err := client.Receive(ctx, flowConfigInterval, func(ack *proto.FlowEventAck) error {
|
||||
func (m *Manager) receiveACKs(ctx context.Context, client *client.GRPCClient) {
|
||||
err := client.Receive(ctx, m.flowConfig.Interval, func(ack *proto.FlowEventAck) error {
|
||||
id, err := uuid.FromBytes(ack.EventId)
|
||||
if err != nil {
|
||||
log.Warnf("failed to convert ack event id to uuid: %v", err)
|
||||
return nil
|
||||
}
|
||||
log.Tracef("received flow event ack: %s", id)
|
||||
m.eventsWithoutAcks.DeleteEvents([]uuid.UUID{id})
|
||||
m.logger.DeleteEvents([]uuid.UUID{id})
|
||||
return nil
|
||||
})
|
||||
|
||||
@@ -251,43 +236,6 @@ func (m *Manager) receiveACKs(ctx context.Context, client *client.GRPCClient, fl
|
||||
}
|
||||
}
|
||||
|
||||
// We effectively never drop events (see MaxInterval), which makes eventsWithoutAcks unbounded.
|
||||
// We may want to limit the max size of the store, and start dropping oldest events when the threshold is reached.
|
||||
func (m *Manager) startRetries(ctx context.Context, flowConfigInterval time.Duration) {
|
||||
timer := time.NewTimer(m.retryInterval)
|
||||
retryBackoff := backoff.WithContext(&backoff.ExponentialBackOff{
|
||||
InitialInterval: 1 * time.Second,
|
||||
RandomizationFactor: 0.5,
|
||||
Multiplier: 1.7,
|
||||
MaxInterval: flowConfigInterval / 2,
|
||||
MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}, ctx)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-timer.C:
|
||||
for _, e := range m.eventsWithoutAcks.GetEvents() {
|
||||
if e.Timestamp.Add(time.Second).After(time.Now()) {
|
||||
// grace period on retries to avoid early retries
|
||||
// do not retry if the event is less than 1 sec old
|
||||
continue
|
||||
}
|
||||
if err := m.send(e); err != nil {
|
||||
timer = time.NewTimer(retryBackoff.NextBackOff()) //nolint:staticcheck,wastedassign
|
||||
break
|
||||
}
|
||||
}
|
||||
retryBackoff.Reset()
|
||||
timer = time.NewTimer(m.retryInterval)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) send(event *nftypes.Event) error {
|
||||
m.mux.Lock()
|
||||
client := m.receiverClient
|
||||
@@ -302,11 +250,9 @@ func (m *Manager) send(event *nftypes.Event) error {
|
||||
|
||||
func toProtoEvent(publicKey []byte, event *nftypes.Event) *proto.FlowEvent {
|
||||
protoEvent := &proto.FlowEvent{
|
||||
EventId: event.ID[:],
|
||||
Timestamp: timestamppb.New(event.Timestamp),
|
||||
PublicKey: publicKey,
|
||||
WindowStart: timestamppb.New(event.WindowStart),
|
||||
WindowEnd: timestamppb.New(event.WindowEnd),
|
||||
EventId: event.ID[:],
|
||||
Timestamp: timestamppb.New(event.Timestamp),
|
||||
PublicKey: publicKey,
|
||||
FlowFields: &proto.FlowFields{
|
||||
FlowId: event.FlowID[:],
|
||||
RuleId: event.RuleID,
|
||||
@@ -321,9 +267,6 @@ func toProtoEvent(publicKey []byte, event *nftypes.Event) *proto.FlowEvent {
|
||||
TxBytes: event.TxBytes,
|
||||
SourceResourceId: event.SourceResourceID,
|
||||
DestResourceId: event.DestResourceID,
|
||||
NumOfStarts: event.NumOfStarts,
|
||||
NumOfEnds: event.NumOfEnds,
|
||||
NumOfDrops: event.NumOfDrops,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -1,291 +0,0 @@
|
||||
package netflow
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/netip"
|
||||
"slices"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/netbirdio/netbird/client/iface/wgaddr"
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/types"
|
||||
"github.com/netbirdio/netbird/flow/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type testServer struct {
|
||||
proto.UnimplementedFlowServiceServer
|
||||
events chan *proto.FlowEvent
|
||||
acks chan *proto.FlowEventAck
|
||||
grpcSrv *grpc.Server
|
||||
addr string
|
||||
handlerDone chan struct{} // signaled each time Events() exits
|
||||
handlerStarted chan struct{} // signaled each time Events() begins
|
||||
}
|
||||
|
||||
func newTestServer(t *testing.T) *testServer {
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
s := &testServer{
|
||||
events: make(chan *proto.FlowEvent, 100),
|
||||
acks: make(chan *proto.FlowEventAck, 100),
|
||||
grpcSrv: grpc.NewServer(),
|
||||
addr: listener.Addr().String(),
|
||||
handlerDone: make(chan struct{}, 10),
|
||||
handlerStarted: make(chan struct{}, 10),
|
||||
}
|
||||
|
||||
proto.RegisterFlowServiceServer(s.grpcSrv, s)
|
||||
|
||||
go func() {
|
||||
if err := s.grpcSrv.Serve(listener); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
|
||||
t.Logf("server error: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
t.Cleanup(func() {
|
||||
s.grpcSrv.Stop()
|
||||
})
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *testServer) Events(stream proto.FlowService_EventsServer) error {
|
||||
defer func() {
|
||||
select {
|
||||
case s.handlerDone <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}()
|
||||
|
||||
err := stream.Send(&proto.FlowEventAck{IsInitiator: true})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
case s.handlerStarted <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(stream.Context())
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
for {
|
||||
event, err := stream.Recv()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if !event.IsInitiator {
|
||||
select {
|
||||
case s.events <- event:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case ack := <-s.acks:
|
||||
if err := stream.Send(ack); err != nil {
|
||||
return err
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendEventReceiveAck(t *testing.T) {
|
||||
_, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
server := newTestServer(t)
|
||||
manager := createManager(t, server.addr, 60*time.Second) // set high to prevent retries in this test
|
||||
defer manager.Close()
|
||||
|
||||
assert.Eventually(t, func() bool {
|
||||
select {
|
||||
case <-server.handlerStarted:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}, 3*time.Second, 100*time.Millisecond)
|
||||
|
||||
event1 := types.EventFields{
|
||||
FlowID: uuid.New(),
|
||||
Type: types.TypeStart,
|
||||
Direction: types.Ingress,
|
||||
DestIP: ipAddr("172.16.1.2"),
|
||||
DestPort: 2345,
|
||||
Protocol: 6,
|
||||
}
|
||||
manager.logger.StoreEvent(event1)
|
||||
event2 := types.EventFields{
|
||||
FlowID: uuid.New(),
|
||||
Type: types.TypeStart,
|
||||
Direction: types.Ingress,
|
||||
DestIP: ipAddr("172.16.1.1"),
|
||||
DestPort: 1234,
|
||||
Protocol: 6,
|
||||
}
|
||||
manager.logger.StoreEvent(event2)
|
||||
|
||||
// verify the server received logged events
|
||||
serverSideEvents := make([]*proto.FlowEvent, 0)
|
||||
assert.Eventually(t, func() bool {
|
||||
select {
|
||||
case event := <-server.events:
|
||||
serverSideEvents = append(serverSideEvents, event)
|
||||
if len(serverSideEvents) == 2 {
|
||||
return true
|
||||
}
|
||||
default:
|
||||
if len(serverSideEvents) == 2 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}, 5*time.Second, 100*time.Millisecond)
|
||||
|
||||
serverSideFlowIds := make([]uuid.UUID, 0, 2)
|
||||
slices.Values(serverSideEvents)(func(e *proto.FlowEvent) bool {
|
||||
id, err := uuid.FromBytes(e.FlowFields.FlowId)
|
||||
assert.NoError(t, err)
|
||||
serverSideFlowIds = append(serverSideFlowIds, id)
|
||||
return true
|
||||
})
|
||||
assert.ElementsMatch(t, []uuid.UUID{event1.FlowID, event2.FlowID}, serverSideFlowIds)
|
||||
|
||||
// verify the manager tracks un-acked events
|
||||
unackedEvents := manager.eventsWithoutAcks.GetEvents()
|
||||
assert.Len(t, unackedEvents, 2)
|
||||
flowIds := make([]uuid.UUID, 0)
|
||||
slices.Values(unackedEvents)(func(e *types.Event) bool {
|
||||
flowIds = append(flowIds, e.FlowID)
|
||||
return true
|
||||
})
|
||||
assert.ElementsMatch(t, flowIds, []uuid.UUID{event1.FlowID, event2.FlowID})
|
||||
}
|
||||
|
||||
// verify handling of retries:
|
||||
// - unacked events are retried
|
||||
// - when acks arrive, events are removed from the un-acked event tracker
|
||||
func TestRetryEvents(t *testing.T) {
|
||||
_, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
server := newTestServer(t)
|
||||
manager := createManager(t, server.addr, time.Second) // set low to start retries sooner
|
||||
defer manager.Close()
|
||||
|
||||
assert.Eventually(t, func() bool {
|
||||
select {
|
||||
case <-server.handlerStarted:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}, 3*time.Second, 100*time.Millisecond)
|
||||
|
||||
event1 := types.EventFields{
|
||||
FlowID: uuid.New(),
|
||||
Type: types.TypeStart,
|
||||
Direction: types.Ingress,
|
||||
DestIP: ipAddr("172.16.1.2"),
|
||||
DestPort: 2345,
|
||||
Protocol: 6,
|
||||
}
|
||||
manager.logger.StoreEvent(event1)
|
||||
event2 := types.EventFields{
|
||||
FlowID: uuid.New(),
|
||||
Type: types.TypeStart,
|
||||
Direction: types.Ingress,
|
||||
DestIP: ipAddr("172.16.1.1"),
|
||||
DestPort: 1234,
|
||||
Protocol: 6,
|
||||
}
|
||||
manager.logger.StoreEvent(event2)
|
||||
|
||||
// verify the server received retries of logged events
|
||||
serverSideEvents := make([]*proto.FlowEvent, 0)
|
||||
func() {
|
||||
c := time.After(2500 * time.Millisecond)
|
||||
for {
|
||||
select {
|
||||
case event := <-server.events:
|
||||
serverSideEvents = append(serverSideEvents, event)
|
||||
case <-c:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
assert.True(t, len(serverSideEvents) > 2) // must see retries
|
||||
|
||||
uniqueServerSideEvents := make(map[uuid.UUID]*proto.FlowEvent)
|
||||
slices.Values(serverSideEvents)(func(e *proto.FlowEvent) bool {
|
||||
id, err := uuid.FromBytes(e.FlowFields.FlowId)
|
||||
assert.NoError(t, err)
|
||||
uniqueServerSideEvents[id] = e
|
||||
return true
|
||||
})
|
||||
assert.Contains(t, uniqueServerSideEvents, event1.FlowID)
|
||||
assert.Contains(t, uniqueServerSideEvents, event2.FlowID)
|
||||
|
||||
// ack events
|
||||
server.acks <- &proto.FlowEventAck{EventId: uniqueServerSideEvents[event1.FlowID].EventId}
|
||||
server.acks <- &proto.FlowEventAck{EventId: uniqueServerSideEvents[event2.FlowID].EventId}
|
||||
|
||||
assert.EventuallyWithT(t, func(c *assert.CollectT) {
|
||||
unackedEvents := manager.eventsWithoutAcks.GetEvents()
|
||||
assert.Empty(c, unackedEvents)
|
||||
|
||||
}, 3*time.Second, 100*time.Millisecond)
|
||||
}
|
||||
|
||||
func createManager(t *testing.T, serverAddr string, retryInterval time.Duration) *Manager {
|
||||
t.Helper()
|
||||
|
||||
mockIFace := &mockIFaceMapper{
|
||||
address: wgaddr.Address{
|
||||
Network: netip.MustParsePrefix("192.168.1.1/32"),
|
||||
},
|
||||
isUserspaceBind: true,
|
||||
}
|
||||
|
||||
publicKey := []byte("test-public-key")
|
||||
manager := NewManager(mockIFace, publicKey, nil)
|
||||
manager.retryInterval = retryInterval
|
||||
|
||||
initialConfig := &types.FlowConfig{
|
||||
Enabled: true,
|
||||
URL: fmt.Sprintf("http://%s", serverAddr),
|
||||
TokenPayload: "initial-payload",
|
||||
TokenSignature: "initial-signature",
|
||||
Interval: 500 * time.Millisecond,
|
||||
}
|
||||
|
||||
err := manager.Update(initialConfig)
|
||||
require.NoError(t, err)
|
||||
|
||||
return manager
|
||||
}
|
||||
|
||||
func ipAddr(a string) netip.Addr {
|
||||
addr, _ := netip.ParseAddr(a)
|
||||
return addr
|
||||
}
|
||||
@@ -1,334 +0,0 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"net/netip"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
var random = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
func TestFlowAggregation(t *testing.T) {
|
||||
var protocols = []types.Protocol{types.ICMP, types.ICMPv6, types.TCP, types.UDP}
|
||||
var tests = []struct {
|
||||
description string
|
||||
addresses [][]netip.Addr
|
||||
dstPort uint16
|
||||
eventTypes []types.Type
|
||||
}{
|
||||
{
|
||||
description: "start and stop",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeStart, types.TypeEnd},
|
||||
},
|
||||
{
|
||||
description: "start and drop",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeStart, types.TypeDrop},
|
||||
},
|
||||
{
|
||||
description: "start only",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeStart},
|
||||
},
|
||||
{
|
||||
description: "drop only",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeDrop},
|
||||
}}
|
||||
|
||||
for _, protocol := range protocols {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.description+" "+protocol.String(), func(t *testing.T) {
|
||||
store := NewAggregatingMemoryStore()
|
||||
store.WindowEnd = time.Now().Add(5 * time.Second)
|
||||
|
||||
allExpected := make([]*types.Event, 0)
|
||||
|
||||
for _, srcAndDst := range tt.addresses {
|
||||
inEvents, expected := generateEvents(srcAndDst[0], srcAndDst[1], tt.dstPort, tt.eventTypes, protocol, types.Ingress, 0, store.WindowStart, store.WindowEnd)
|
||||
for _, e := range inEvents {
|
||||
store.StoreEvent(e)
|
||||
}
|
||||
allExpected = append(allExpected, expected)
|
||||
}
|
||||
|
||||
events := store.GetAggregatedEvents()
|
||||
assert.ElementsMatch(t, events, allExpected)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestIcmpEventAggregation(t *testing.T) {
|
||||
var protocols = []types.Protocol{types.ICMP, types.ICMPv6}
|
||||
var icmpTypes = []uint8{1, 2, 3}
|
||||
|
||||
var tests = []struct {
|
||||
description string
|
||||
addresses [][]netip.Addr
|
||||
eventTypes []types.Type
|
||||
}{
|
||||
{
|
||||
description: "start and stop",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}},
|
||||
eventTypes: []types.Type{types.TypeStart, types.TypeEnd},
|
||||
},
|
||||
{
|
||||
description: "start and drop",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}},
|
||||
eventTypes: []types.Type{types.TypeStart, types.TypeDrop},
|
||||
},
|
||||
{
|
||||
description: "start only",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}},
|
||||
eventTypes: []types.Type{types.TypeStart},
|
||||
},
|
||||
{
|
||||
description: "drop only",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}},
|
||||
eventTypes: []types.Type{types.TypeDrop},
|
||||
}}
|
||||
|
||||
for _, protocol := range protocols {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.description+" "+protocol.String(), func(t *testing.T) {
|
||||
store := NewAggregatingMemoryStore()
|
||||
store.WindowEnd = time.Now().Add(5 * time.Second)
|
||||
|
||||
allExpected := make([]*types.Event, 0)
|
||||
for _, icmpType := range icmpTypes {
|
||||
events, expected := generateEvents(tt.addresses[0][0], tt.addresses[0][1], 0, tt.eventTypes, protocol, types.Ingress, icmpType, store.WindowStart, store.WindowEnd)
|
||||
for _, e := range events {
|
||||
store.StoreEvent(e)
|
||||
}
|
||||
allExpected = append(allExpected, expected)
|
||||
}
|
||||
aggregatedEvents := store.GetAggregatedEvents()
|
||||
assert.Len(t, aggregatedEvents, len(allExpected))
|
||||
assert.ElementsMatch(t, aggregatedEvents, allExpected)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlowAggregationOfUnknownProtocols(t *testing.T) {
|
||||
var tests = []struct {
|
||||
description string
|
||||
addresses [][]netip.Addr
|
||||
dstPort uint16
|
||||
eventTypes []types.Type
|
||||
}{
|
||||
{
|
||||
description: "start and stop",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeStart, types.TypeEnd},
|
||||
},
|
||||
{
|
||||
description: "start and drop",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeStart, types.TypeDrop},
|
||||
},
|
||||
{
|
||||
description: "start only",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeStart},
|
||||
},
|
||||
{
|
||||
description: "drop only",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeDrop},
|
||||
}}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.description+" "+types.ProtocolUnknown.String(), func(t *testing.T) {
|
||||
store := NewAggregatingMemoryStore()
|
||||
store.WindowEnd = time.Now().Add(5 * time.Second)
|
||||
|
||||
allExpected := make([]*types.Event, 0)
|
||||
|
||||
for _, srcAndDst := range tt.addresses {
|
||||
inEvents, expected := generateEventsForUnknownProtocol(srcAndDst[0], srcAndDst[1], tt.dstPort, tt.eventTypes, types.ProtocolUnknown, types.Ingress, store.WindowStart, store.WindowEnd)
|
||||
for _, e := range inEvents {
|
||||
store.StoreEvent(e)
|
||||
}
|
||||
allExpected = append(allExpected, expected...)
|
||||
}
|
||||
|
||||
events := store.GetAggregatedEvents()
|
||||
assert.ElementsMatch(t, events, allExpected)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func ipAddr(a string) netip.Addr {
|
||||
addr, _ := netip.ParseAddr(a)
|
||||
return addr
|
||||
}
|
||||
|
||||
func generateEvents(srcIp, dstIp netip.Addr, dstPort uint16, eventTypes []types.Type, protocol types.Protocol,
|
||||
direction types.Direction, icmpType uint8, windowStart, windowEnd time.Time) ([]*types.Event, *types.Event) {
|
||||
var rxPackets, txPackets, rxBytes, txBytes uint64
|
||||
inEvents := make([]*types.Event, 0)
|
||||
ts := time.Now()
|
||||
flowId := uuid.New()
|
||||
srcPort := uint16(random.Uint32() >> 16)
|
||||
|
||||
for idx, eventType := range eventTypes {
|
||||
e := &types.Event{
|
||||
ID: uuid.New(),
|
||||
Timestamp: ts.Add(time.Duration(idx) * time.Second),
|
||||
EventFields: types.EventFields{
|
||||
FlowID: flowId,
|
||||
Type: eventType,
|
||||
Protocol: protocol,
|
||||
RuleID: []byte("rule-id-1"),
|
||||
Direction: direction,
|
||||
SourceIP: srcIp,
|
||||
SourcePort: srcPort,
|
||||
DestIP: dstIp,
|
||||
DestPort: dstPort,
|
||||
SourceResourceID: []byte("source-resource-id"),
|
||||
DestResourceID: []byte("dest-resource-id"),
|
||||
RxPackets: random.Uint64(),
|
||||
TxPackets: random.Uint64(),
|
||||
RxBytes: random.Uint64(),
|
||||
TxBytes: random.Uint64(),
|
||||
}}
|
||||
rxBytes += e.RxBytes
|
||||
txBytes += e.TxBytes
|
||||
rxPackets += e.RxPackets
|
||||
txPackets += e.TxPackets
|
||||
inEvents = append(inEvents, e)
|
||||
if protocol == types.ICMP || protocol == types.ICMPv6 {
|
||||
e.ICMPType = icmpType
|
||||
}
|
||||
}
|
||||
|
||||
var start, end, drop uint64
|
||||
for _, eventType := range eventTypes {
|
||||
switch eventType {
|
||||
case types.TypeStart:
|
||||
start += 1
|
||||
case types.TypeDrop:
|
||||
drop += 1
|
||||
case types.TypeEnd:
|
||||
end += 1
|
||||
}
|
||||
}
|
||||
aggregatedEvent := &types.Event{
|
||||
ID: inEvents[0].ID,
|
||||
Timestamp: inEvents[0].Timestamp,
|
||||
WindowStart: windowStart,
|
||||
WindowEnd: windowEnd,
|
||||
EventFields: types.EventFields{
|
||||
FlowID: flowId,
|
||||
Type: types.TypeUnknown,
|
||||
Protocol: inEvents[0].Protocol,
|
||||
RuleID: []byte("rule-id-1"),
|
||||
Direction: inEvents[0].Direction,
|
||||
SourceIP: srcIp,
|
||||
SourcePort: srcPort,
|
||||
DestIP: dstIp,
|
||||
DestPort: dstPort,
|
||||
SourceResourceID: []byte("source-resource-id"),
|
||||
DestResourceID: []byte("dest-resource-id"),
|
||||
RxPackets: rxPackets,
|
||||
TxPackets: txPackets,
|
||||
RxBytes: rxBytes,
|
||||
TxBytes: txBytes,
|
||||
NumOfStarts: start,
|
||||
NumOfEnds: end,
|
||||
NumOfDrops: drop,
|
||||
}}
|
||||
if protocol == types.ICMP || protocol == types.ICMPv6 {
|
||||
aggregatedEvent.ICMPType = icmpType
|
||||
}
|
||||
|
||||
return inEvents, aggregatedEvent
|
||||
}
|
||||
|
||||
func generateEventsForUnknownProtocol(srcIp, dstIp netip.Addr, dstPort uint16, eventTypes []types.Type, protocol types.Protocol,
|
||||
direction types.Direction, windowStart, windowEnd time.Time) ([]*types.Event, []*types.Event) {
|
||||
inEvents := make([]*types.Event, 0)
|
||||
expectedEvents := make([]*types.Event, 0)
|
||||
|
||||
ts := time.Now()
|
||||
flowId := uuid.New()
|
||||
srcPort := uint16(random.Uint32() >> 16)
|
||||
|
||||
for idx, eventType := range eventTypes {
|
||||
e := &types.Event{
|
||||
ID: uuid.New(),
|
||||
Timestamp: ts.Add(time.Duration(idx) * time.Second),
|
||||
EventFields: types.EventFields{
|
||||
FlowID: flowId,
|
||||
Type: eventType,
|
||||
Protocol: protocol,
|
||||
RuleID: []byte("rule-id-1"),
|
||||
Direction: direction,
|
||||
SourceIP: srcIp,
|
||||
SourcePort: srcPort,
|
||||
DestIP: dstIp,
|
||||
DestPort: dstPort,
|
||||
SourceResourceID: []byte("source-resource-id"),
|
||||
DestResourceID: []byte("dest-resource-id"),
|
||||
RxPackets: random.Uint64(),
|
||||
TxPackets: random.Uint64(),
|
||||
RxBytes: random.Uint64(),
|
||||
TxBytes: random.Uint64(),
|
||||
}}
|
||||
inEvents = append(inEvents, e)
|
||||
|
||||
var start, end, drop uint64
|
||||
switch eventType {
|
||||
case types.TypeStart:
|
||||
start = 1
|
||||
case types.TypeDrop:
|
||||
drop = 1
|
||||
case types.TypeEnd:
|
||||
end = 1
|
||||
}
|
||||
|
||||
expectedEvents = append(expectedEvents, &types.Event{
|
||||
ID: e.ID,
|
||||
Timestamp: e.Timestamp,
|
||||
WindowStart: windowStart,
|
||||
WindowEnd: windowEnd,
|
||||
EventFields: types.EventFields{
|
||||
FlowID: flowId,
|
||||
Type: types.TypeUnknown,
|
||||
Protocol: e.Protocol,
|
||||
RuleID: []byte("rule-id-1"),
|
||||
Direction: e.Direction,
|
||||
SourceIP: srcIp,
|
||||
SourcePort: srcPort,
|
||||
DestIP: dstIp,
|
||||
DestPort: dstPort,
|
||||
SourceResourceID: []byte("source-resource-id"),
|
||||
DestResourceID: []byte("dest-resource-id"),
|
||||
RxPackets: e.RxPackets,
|
||||
TxPackets: e.TxPackets,
|
||||
RxBytes: e.RxBytes,
|
||||
TxBytes: e.TxBytes,
|
||||
NumOfStarts: start,
|
||||
NumOfEnds: end,
|
||||
NumOfDrops: drop,
|
||||
}})
|
||||
}
|
||||
|
||||
return inEvents, expectedEvents
|
||||
}
|
||||
@@ -1,15 +1,10 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"maps"
|
||||
"math/rand"
|
||||
v2 "math/rand/v2"
|
||||
"net/netip"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/types"
|
||||
)
|
||||
|
||||
@@ -24,13 +19,6 @@ type Memory struct {
|
||||
events map[uuid.UUID]*types.Event
|
||||
}
|
||||
|
||||
type AggregatingMemory struct {
|
||||
Memory
|
||||
WindowStart time.Time
|
||||
WindowEnd time.Time
|
||||
rnd *v2.PCG
|
||||
}
|
||||
|
||||
func (m *Memory) StoreEvent(event *types.Event) {
|
||||
m.mux.Lock()
|
||||
defer m.mux.Unlock()
|
||||
@@ -60,92 +48,3 @@ func (m *Memory) DeleteEvents(ids []uuid.UUID) {
|
||||
delete(m.events, id)
|
||||
}
|
||||
}
|
||||
|
||||
func NewAggregatingMemoryStore() *AggregatingMemory {
|
||||
return &AggregatingMemory{WindowStart: time.Now(), Memory: Memory{events: make(map[uuid.UUID]*types.Event)}, rnd: v2.NewPCG(rand.Uint64(), rand.Uint64())}
|
||||
}
|
||||
|
||||
func (am *AggregatingMemory) ResetAggregationWindow() types.FlowEventAggregator {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
toret := AggregatingMemory{WindowStart: am.WindowStart, WindowEnd: now, Memory: Memory{events: am.events}}
|
||||
|
||||
am.events = make(map[uuid.UUID]*types.Event)
|
||||
am.WindowStart = now
|
||||
|
||||
return &toret
|
||||
}
|
||||
|
||||
type aggregationKey struct {
|
||||
srcAddr netip.Addr
|
||||
destAddr netip.Addr
|
||||
destPort uint16
|
||||
direction int
|
||||
protocol uint8
|
||||
icmpType uint8
|
||||
unique uint64 // used to prevent aggregation on non icmp/udp/tcp events
|
||||
}
|
||||
|
||||
func (am *AggregatingMemory) GetAggregatedEvents() []*types.Event {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
aggregated := make(map[aggregationKey]*types.Event)
|
||||
for _, v := range am.events {
|
||||
lookupKey := aggregationKey{srcAddr: v.SourceIP, destAddr: v.DestIP, destPort: v.DestPort, direction: int(v.Direction), protocol: uint8(v.Protocol), icmpType: v.ICMPType}
|
||||
if _, ok := aggregated[lookupKey]; !ok {
|
||||
event := v.Clone()
|
||||
|
||||
switch event.Type {
|
||||
case types.TypeStart:
|
||||
event.NumOfStarts += 1
|
||||
case types.TypeDrop:
|
||||
event.NumOfDrops += 1
|
||||
case types.TypeEnd:
|
||||
event.NumOfEnds += 1
|
||||
}
|
||||
event.Type = types.TypeUnknown
|
||||
|
||||
// Please note that ICMPCode field isn't propagated by the manager (see flow/proto/flow.pb.go, FlowFields struct)
|
||||
// so the field value in an icmp event in the "aggregated" doesn't matter
|
||||
|
||||
event.WindowStart = am.WindowStart
|
||||
event.WindowEnd = am.WindowEnd
|
||||
|
||||
if event.Protocol != types.ICMP && event.Protocol != types.ICMPv6 && event.Protocol != types.UDP && event.Protocol != types.TCP {
|
||||
lookupKey.unique = am.rnd.Uint64() // to make the lookup key unique so we don't aggregate on it
|
||||
}
|
||||
|
||||
aggregated[lookupKey] = event
|
||||
continue
|
||||
}
|
||||
|
||||
aggregatedEvent := aggregated[lookupKey]
|
||||
if aggregatedEvent.Protocol != types.ICMP && aggregatedEvent.Protocol != types.ICMPv6 && aggregatedEvent.Protocol != types.UDP && aggregatedEvent.Protocol != types.TCP {
|
||||
continue // we don't aggregate this type of events; shouldn't ever get here
|
||||
}
|
||||
|
||||
// track the number of connections, duration?, open and close events?
|
||||
aggregatedEvent.RxBytes += v.RxBytes
|
||||
aggregatedEvent.RxPackets += v.RxPackets
|
||||
aggregatedEvent.TxBytes += v.TxBytes
|
||||
aggregatedEvent.TxPackets += v.TxPackets
|
||||
switch v.Type {
|
||||
case types.TypeStart:
|
||||
aggregatedEvent.NumOfStarts += 1
|
||||
case types.TypeDrop:
|
||||
aggregatedEvent.NumOfDrops += 1
|
||||
case types.TypeEnd:
|
||||
aggregatedEvent.NumOfEnds += 1
|
||||
}
|
||||
if aggregatedEvent.Timestamp.Compare(v.Timestamp) > 0 {
|
||||
aggregatedEvent.Timestamp = v.Timestamp
|
||||
aggregatedEvent.ID = v.ID
|
||||
aggregatedEvent.SourcePort = v.SourcePort
|
||||
}
|
||||
}
|
||||
|
||||
return slices.Collect(maps.Values(aggregated)) // could return an iterator instead here
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package types
|
||||
|
||||
import (
|
||||
"net/netip"
|
||||
"slices"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
@@ -70,10 +69,8 @@ const (
|
||||
)
|
||||
|
||||
type Event struct {
|
||||
ID uuid.UUID
|
||||
Timestamp time.Time
|
||||
WindowStart time.Time
|
||||
WindowEnd time.Time
|
||||
ID uuid.UUID
|
||||
Timestamp time.Time
|
||||
EventFields
|
||||
}
|
||||
|
||||
@@ -95,17 +92,6 @@ type EventFields struct {
|
||||
TxPackets uint64
|
||||
RxBytes uint64
|
||||
TxBytes uint64
|
||||
NumOfStarts uint64
|
||||
NumOfEnds uint64
|
||||
NumOfDrops uint64
|
||||
}
|
||||
|
||||
func (e *Event) Clone() *Event {
|
||||
toret := *e
|
||||
toret.RuleID = slices.Clone(e.RuleID)
|
||||
toret.SourceResourceID = slices.Clone(e.SourceResourceID)
|
||||
toret.DestResourceID = slices.Clone(e.DestResourceID)
|
||||
return &toret
|
||||
}
|
||||
|
||||
type FlowConfig struct {
|
||||
@@ -128,15 +114,13 @@ type FlowManager interface {
|
||||
GetLogger() FlowLogger
|
||||
}
|
||||
|
||||
type FlowEventAggregator interface {
|
||||
ResetAggregationWindow() FlowEventAggregator
|
||||
GetAggregatedEvents() []*Event
|
||||
}
|
||||
|
||||
type FlowLogger interface {
|
||||
ResetAggregationWindow() FlowEventAggregator
|
||||
// StoreEvent stores a flow event
|
||||
StoreEvent(flowEvent EventFields)
|
||||
// GetEvents returns all stored events
|
||||
GetEvents() []*Event
|
||||
// DeleteEvents deletes events from the store
|
||||
DeleteEvents([]uuid.UUID)
|
||||
// Close closes the logger
|
||||
Close()
|
||||
// Enable enables the flow logger receiver
|
||||
@@ -156,11 +140,6 @@ type Store interface {
|
||||
Close()
|
||||
}
|
||||
|
||||
type AggregatingStore interface {
|
||||
FlowEventAggregator
|
||||
Store
|
||||
}
|
||||
|
||||
// ConnTracker defines the interface for connection tracking functionality
|
||||
type ConnTracker interface {
|
||||
// Start begins tracking connections by listening for conntrack events.
|
||||
|
||||
@@ -134,11 +134,9 @@ type FlowEvent struct {
|
||||
// When the event occurred
|
||||
Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
|
||||
// Public key of the sending peer
|
||||
PublicKey []byte `protobuf:"bytes,3,opt,name=public_key,json=publicKey,proto3" json:"public_key,omitempty"`
|
||||
FlowFields *FlowFields `protobuf:"bytes,4,opt,name=flow_fields,json=flowFields,proto3" json:"flow_fields,omitempty"`
|
||||
IsInitiator bool `protobuf:"varint,5,opt,name=isInitiator,proto3" json:"isInitiator,omitempty"`
|
||||
WindowStart *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=window_start,json=windowStart,proto3" json:"window_start,omitempty"`
|
||||
WindowEnd *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=window_end,json=windowEnd,proto3" json:"window_end,omitempty"`
|
||||
PublicKey []byte `protobuf:"bytes,3,opt,name=public_key,json=publicKey,proto3" json:"public_key,omitempty"`
|
||||
FlowFields *FlowFields `protobuf:"bytes,4,opt,name=flow_fields,json=flowFields,proto3" json:"flow_fields,omitempty"`
|
||||
IsInitiator bool `protobuf:"varint,5,opt,name=isInitiator,proto3" json:"isInitiator,omitempty"`
|
||||
}
|
||||
|
||||
func (x *FlowEvent) Reset() {
|
||||
@@ -208,20 +206,6 @@ func (x *FlowEvent) GetIsInitiator() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (x *FlowEvent) GetWindowStart() *timestamppb.Timestamp {
|
||||
if x != nil {
|
||||
return x.WindowStart
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *FlowEvent) GetWindowEnd() *timestamppb.Timestamp {
|
||||
if x != nil {
|
||||
return x.WindowEnd
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type FlowEventAck struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
@@ -300,6 +284,7 @@ type FlowFields struct {
|
||||
// Layer 4 -specific information
|
||||
//
|
||||
// Types that are assignable to ConnectionInfo:
|
||||
//
|
||||
// *FlowFields_PortInfo
|
||||
// *FlowFields_IcmpInfo
|
||||
ConnectionInfo isFlowFields_ConnectionInfo `protobuf_oneof:"connection_info"`
|
||||
@@ -312,9 +297,6 @@ type FlowFields struct {
|
||||
// Resource ID
|
||||
SourceResourceId []byte `protobuf:"bytes,14,opt,name=source_resource_id,json=sourceResourceId,proto3" json:"source_resource_id,omitempty"`
|
||||
DestResourceId []byte `protobuf:"bytes,15,opt,name=dest_resource_id,json=destResourceId,proto3" json:"dest_resource_id,omitempty"`
|
||||
NumOfStarts uint64 `protobuf:"varint,16,opt,name=num_of_starts,json=numOfStarts,proto3" json:"num_of_starts,omitempty"`
|
||||
NumOfEnds uint64 `protobuf:"varint,17,opt,name=num_of_ends,json=numOfEnds,proto3" json:"num_of_ends,omitempty"`
|
||||
NumOfDrops uint64 `protobuf:"varint,18,opt,name=num_of_drops,json=numOfDrops,proto3" json:"num_of_drops,omitempty"`
|
||||
}
|
||||
|
||||
func (x *FlowFields) Reset() {
|
||||
@@ -461,27 +443,6 @@ func (x *FlowFields) GetDestResourceId() []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *FlowFields) GetNumOfStarts() uint64 {
|
||||
if x != nil {
|
||||
return x.NumOfStarts
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *FlowFields) GetNumOfEnds() uint64 {
|
||||
if x != nil {
|
||||
return x.NumOfEnds
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *FlowFields) GetNumOfDrops() uint64 {
|
||||
if x != nil {
|
||||
return x.NumOfDrops
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
type isFlowFields_ConnectionInfo interface {
|
||||
isFlowFields_ConnectionInfo()
|
||||
}
|
||||
@@ -618,7 +579,7 @@ var file_flow_proto_rawDesc = []byte{
|
||||
0x0a, 0x0a, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x66, 0x6c,
|
||||
0x6f, 0x77, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72,
|
||||
0x6f, 0x74, 0x6f, 0x22, 0xce, 0x02, 0x0a, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e,
|
||||
0x6f, 0x74, 0x6f, 0x22, 0xd4, 0x01, 0x0a, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e,
|
||||
0x74, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20,
|
||||
0x01, 0x28, 0x0c, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x38, 0x0a, 0x09,
|
||||
0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32,
|
||||
@@ -631,59 +592,45 @@ var file_flow_proto_rawDesc = []byte{
|
||||
0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x52, 0x0a, 0x66, 0x6c,
|
||||
0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x12, 0x20, 0x0a, 0x0b, 0x69, 0x73, 0x49, 0x6e,
|
||||
0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69,
|
||||
0x73, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x3d, 0x0a, 0x0c, 0x77, 0x69,
|
||||
0x6e, 0x64, 0x6f, 0x77, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b,
|
||||
0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
|
||||
0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x77, 0x69,
|
||||
0x6e, 0x64, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x72, 0x74, 0x12, 0x39, 0x0a, 0x0a, 0x77, 0x69, 0x6e,
|
||||
0x64, 0x6f, 0x77, 0x5f, 0x65, 0x6e, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e,
|
||||
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
|
||||
0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x77, 0x69, 0x6e, 0x64, 0x6f,
|
||||
0x77, 0x45, 0x6e, 0x64, 0x22, 0x4b, 0x0a, 0x0c, 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e,
|
||||
0x74, 0x41, 0x63, 0x6b, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64,
|
||||
0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12,
|
||||
0x20, 0x0a, 0x0b, 0x69, 0x73, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x18, 0x02,
|
||||
0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69, 0x73, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x6f,
|
||||
0x72, 0x22, 0x82, 0x05, 0x0a, 0x0a, 0x46, 0x6c, 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73,
|
||||
0x12, 0x17, 0x0a, 0x07, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
|
||||
0x0c, 0x52, 0x06, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x04, 0x74, 0x79, 0x70,
|
||||
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0a, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54,
|
||||
0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x72, 0x75, 0x6c,
|
||||
0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x72, 0x75, 0x6c, 0x65,
|
||||
0x49, 0x64, 0x12, 0x2d, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18,
|
||||
0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x44, 0x69, 0x72,
|
||||
0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f,
|
||||
0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x05, 0x20,
|
||||
0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x1b, 0x0a,
|
||||
0x09, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x70, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c,
|
||||
0x52, 0x08, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x70, 0x12, 0x17, 0x0a, 0x07, 0x64, 0x65,
|
||||
0x73, 0x74, 0x5f, 0x69, 0x70, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x64, 0x65, 0x73,
|
||||
0x74, 0x49, 0x70, 0x12, 0x2d, 0x0a, 0x09, 0x70, 0x6f, 0x72, 0x74, 0x5f, 0x69, 0x6e, 0x66, 0x6f,
|
||||
0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x50, 0x6f,
|
||||
0x72, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x48, 0x00, 0x52, 0x08, 0x70, 0x6f, 0x72, 0x74, 0x49, 0x6e,
|
||||
0x66, 0x6f, 0x12, 0x2d, 0x0a, 0x09, 0x69, 0x63, 0x6d, 0x70, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18,
|
||||
0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x43, 0x4d,
|
||||
0x50, 0x49, 0x6e, 0x66, 0x6f, 0x48, 0x00, 0x52, 0x08, 0x69, 0x63, 0x6d, 0x70, 0x49, 0x6e, 0x66,
|
||||
0x6f, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x78, 0x5f, 0x70, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x18,
|
||||
0x0a, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x72, 0x78, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x73,
|
||||
0x12, 0x1d, 0x0a, 0x0a, 0x74, 0x78, 0x5f, 0x70, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x18, 0x0b,
|
||||
0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x74, 0x78, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x12,
|
||||
0x19, 0x0a, 0x08, 0x72, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x0c, 0x20, 0x01, 0x28,
|
||||
0x04, 0x52, 0x07, 0x72, 0x78, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x19, 0x0a, 0x08, 0x74, 0x78,
|
||||
0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x74, 0x78,
|
||||
0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x2c, 0x0a, 0x12, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f,
|
||||
0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x0e, 0x20, 0x01, 0x28,
|
||||
0x0c, 0x52, 0x10, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63,
|
||||
0x65, 0x49, 0x64, 0x12, 0x28, 0x0a, 0x10, 0x64, 0x65, 0x73, 0x74, 0x5f, 0x72, 0x65, 0x73, 0x6f,
|
||||
0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x64,
|
||||
0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x64, 0x12, 0x22, 0x0a,
|
||||
0x0d, 0x6e, 0x75, 0x6d, 0x5f, 0x6f, 0x66, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x73, 0x18, 0x10,
|
||||
0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x6e, 0x75, 0x6d, 0x4f, 0x66, 0x53, 0x74, 0x61, 0x72, 0x74,
|
||||
0x73, 0x12, 0x1e, 0x0a, 0x0b, 0x6e, 0x75, 0x6d, 0x5f, 0x6f, 0x66, 0x5f, 0x65, 0x6e, 0x64, 0x73,
|
||||
0x18, 0x11, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x6e, 0x75, 0x6d, 0x4f, 0x66, 0x45, 0x6e, 0x64,
|
||||
0x73, 0x12, 0x20, 0x0a, 0x0c, 0x6e, 0x75, 0x6d, 0x5f, 0x6f, 0x66, 0x5f, 0x64, 0x72, 0x6f, 0x70,
|
||||
0x73, 0x18, 0x12, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x6e, 0x75, 0x6d, 0x4f, 0x66, 0x44, 0x72,
|
||||
0x6f, 0x70, 0x73, 0x42, 0x11, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f,
|
||||
0x73, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x22, 0x4b, 0x0a, 0x0c, 0x46, 0x6c,
|
||||
0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x41, 0x63, 0x6b, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76,
|
||||
0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x65, 0x76,
|
||||
0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x20, 0x0a, 0x0b, 0x69, 0x73, 0x49, 0x6e, 0x69, 0x74, 0x69,
|
||||
0x61, 0x74, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69, 0x73, 0x49, 0x6e,
|
||||
0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x22, 0x9c, 0x04, 0x0a, 0x0a, 0x46, 0x6c, 0x6f, 0x77,
|
||||
0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x12, 0x17, 0x0a, 0x07, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69,
|
||||
0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12,
|
||||
0x1e, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0a, 0x2e,
|
||||
0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12,
|
||||
0x17, 0x0a, 0x07, 0x72, 0x75, 0x6c, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c,
|
||||
0x52, 0x06, 0x72, 0x75, 0x6c, 0x65, 0x49, 0x64, 0x12, 0x2d, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65,
|
||||
0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x66, 0x6c,
|
||||
0x6f, 0x77, 0x2e, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x64, 0x69,
|
||||
0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x63, 0x6f, 0x6c, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x63, 0x6f, 0x6c, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x70,
|
||||
0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x70,
|
||||
0x12, 0x17, 0x0a, 0x07, 0x64, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x70, 0x18, 0x07, 0x20, 0x01, 0x28,
|
||||
0x0c, 0x52, 0x06, 0x64, 0x65, 0x73, 0x74, 0x49, 0x70, 0x12, 0x2d, 0x0a, 0x09, 0x70, 0x6f, 0x72,
|
||||
0x74, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x66,
|
||||
0x6c, 0x6f, 0x77, 0x2e, 0x50, 0x6f, 0x72, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x48, 0x00, 0x52, 0x08,
|
||||
0x70, 0x6f, 0x72, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x2d, 0x0a, 0x09, 0x69, 0x63, 0x6d, 0x70,
|
||||
0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x66, 0x6c,
|
||||
0x6f, 0x77, 0x2e, 0x49, 0x43, 0x4d, 0x50, 0x49, 0x6e, 0x66, 0x6f, 0x48, 0x00, 0x52, 0x08, 0x69,
|
||||
0x63, 0x6d, 0x70, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x78, 0x5f, 0x70, 0x61,
|
||||
0x63, 0x6b, 0x65, 0x74, 0x73, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x72, 0x78, 0x50,
|
||||
0x61, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x12, 0x1d, 0x0a, 0x0a, 0x74, 0x78, 0x5f, 0x70, 0x61, 0x63,
|
||||
0x6b, 0x65, 0x74, 0x73, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x74, 0x78, 0x50, 0x61,
|
||||
0x63, 0x6b, 0x65, 0x74, 0x73, 0x12, 0x19, 0x0a, 0x08, 0x72, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65,
|
||||
0x73, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x72, 0x78, 0x42, 0x79, 0x74, 0x65, 0x73,
|
||||
0x12, 0x19, 0x0a, 0x08, 0x74, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x0d, 0x20, 0x01,
|
||||
0x28, 0x04, 0x52, 0x07, 0x74, 0x78, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x2c, 0x0a, 0x12, 0x73,
|
||||
0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69,
|
||||
0x64, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x10, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52,
|
||||
0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x64, 0x12, 0x28, 0x0a, 0x10, 0x64, 0x65, 0x73,
|
||||
0x74, 0x5f, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x0f, 0x20,
|
||||
0x01, 0x28, 0x0c, 0x52, 0x0e, 0x64, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63,
|
||||
0x65, 0x49, 0x64, 0x42, 0x11, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f,
|
||||
0x6e, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x22, 0x48, 0x0a, 0x08, 0x50, 0x6f, 0x72, 0x74, 0x49, 0x6e,
|
||||
0x66, 0x6f, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x6f, 0x72,
|
||||
0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50,
|
||||
@@ -736,19 +683,17 @@ var file_flow_proto_goTypes = []interface{}{
|
||||
var file_flow_proto_depIdxs = []int32{
|
||||
7, // 0: flow.FlowEvent.timestamp:type_name -> google.protobuf.Timestamp
|
||||
4, // 1: flow.FlowEvent.flow_fields:type_name -> flow.FlowFields
|
||||
7, // 2: flow.FlowEvent.window_start:type_name -> google.protobuf.Timestamp
|
||||
7, // 3: flow.FlowEvent.window_end:type_name -> google.protobuf.Timestamp
|
||||
0, // 4: flow.FlowFields.type:type_name -> flow.Type
|
||||
1, // 5: flow.FlowFields.direction:type_name -> flow.Direction
|
||||
5, // 6: flow.FlowFields.port_info:type_name -> flow.PortInfo
|
||||
6, // 7: flow.FlowFields.icmp_info:type_name -> flow.ICMPInfo
|
||||
2, // 8: flow.FlowService.Events:input_type -> flow.FlowEvent
|
||||
3, // 9: flow.FlowService.Events:output_type -> flow.FlowEventAck
|
||||
9, // [9:10] is the sub-list for method output_type
|
||||
8, // [8:9] is the sub-list for method input_type
|
||||
8, // [8:8] is the sub-list for extension type_name
|
||||
8, // [8:8] is the sub-list for extension extendee
|
||||
0, // [0:8] is the sub-list for field type_name
|
||||
0, // 2: flow.FlowFields.type:type_name -> flow.Type
|
||||
1, // 3: flow.FlowFields.direction:type_name -> flow.Direction
|
||||
5, // 4: flow.FlowFields.port_info:type_name -> flow.PortInfo
|
||||
6, // 5: flow.FlowFields.icmp_info:type_name -> flow.ICMPInfo
|
||||
2, // 6: flow.FlowService.Events:input_type -> flow.FlowEvent
|
||||
3, // 7: flow.FlowService.Events:output_type -> flow.FlowEventAck
|
||||
7, // [7:8] is the sub-list for method output_type
|
||||
6, // [6:7] is the sub-list for method input_type
|
||||
6, // [6:6] is the sub-list for extension type_name
|
||||
6, // [6:6] is the sub-list for extension extendee
|
||||
0, // [0:6] is the sub-list for field type_name
|
||||
}
|
||||
|
||||
func init() { file_flow_proto_init() }
|
||||
|
||||
@@ -24,9 +24,6 @@ message FlowEvent {
|
||||
FlowFields flow_fields = 4;
|
||||
|
||||
bool isInitiator = 5;
|
||||
|
||||
google.protobuf.Timestamp window_start = 6;
|
||||
google.protobuf.Timestamp window_end = 7;
|
||||
}
|
||||
|
||||
message FlowEventAck {
|
||||
@@ -78,9 +75,6 @@ message FlowFields {
|
||||
bytes source_resource_id = 14;
|
||||
bytes dest_resource_id = 15;
|
||||
|
||||
uint64 num_of_starts = 16;
|
||||
uint64 num_of_ends = 17;
|
||||
uint64 num_of_drops = 18;
|
||||
}
|
||||
|
||||
// Flow event types
|
||||
|
||||
@@ -1,8 +1,4 @@
|
||||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||
// versions:
|
||||
// - protoc-gen-go-grpc v1.6.1
|
||||
// - protoc v3.21.9
|
||||
// source: flow.proto
|
||||
|
||||
package proto
|
||||
|
||||
@@ -15,19 +11,15 @@ import (
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the grpc package it is being compiled against.
|
||||
// Requires gRPC-Go v1.64.0 or later.
|
||||
const _ = grpc.SupportPackageIsVersion9
|
||||
|
||||
const (
|
||||
FlowService_Events_FullMethodName = "/flow.FlowService/Events"
|
||||
)
|
||||
// Requires gRPC-Go v1.32.0 or later.
|
||||
const _ = grpc.SupportPackageIsVersion7
|
||||
|
||||
// FlowServiceClient is the client API for FlowService service.
|
||||
//
|
||||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||
type FlowServiceClient interface {
|
||||
// Client to receiver streams of events and acknowledgements
|
||||
Events(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[FlowEvent, FlowEventAck], error)
|
||||
Events(ctx context.Context, opts ...grpc.CallOption) (FlowService_EventsClient, error)
|
||||
}
|
||||
|
||||
type flowServiceClient struct {
|
||||
@@ -38,40 +30,54 @@ func NewFlowServiceClient(cc grpc.ClientConnInterface) FlowServiceClient {
|
||||
return &flowServiceClient{cc}
|
||||
}
|
||||
|
||||
func (c *flowServiceClient) Events(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[FlowEvent, FlowEventAck], error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
stream, err := c.cc.NewStream(ctx, &FlowService_ServiceDesc.Streams[0], FlowService_Events_FullMethodName, cOpts...)
|
||||
func (c *flowServiceClient) Events(ctx context.Context, opts ...grpc.CallOption) (FlowService_EventsClient, error) {
|
||||
stream, err := c.cc.NewStream(ctx, &FlowService_ServiceDesc.Streams[0], "/flow.FlowService/Events", opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &grpc.GenericClientStream[FlowEvent, FlowEventAck]{ClientStream: stream}
|
||||
x := &flowServiceEventsClient{stream}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||
type FlowService_EventsClient = grpc.BidiStreamingClient[FlowEvent, FlowEventAck]
|
||||
type FlowService_EventsClient interface {
|
||||
Send(*FlowEvent) error
|
||||
Recv() (*FlowEventAck, error)
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
type flowServiceEventsClient struct {
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
func (x *flowServiceEventsClient) Send(m *FlowEvent) error {
|
||||
return x.ClientStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *flowServiceEventsClient) Recv() (*FlowEventAck, error) {
|
||||
m := new(FlowEventAck)
|
||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// FlowServiceServer is the server API for FlowService service.
|
||||
// All implementations must embed UnimplementedFlowServiceServer
|
||||
// for forward compatibility.
|
||||
// for forward compatibility
|
||||
type FlowServiceServer interface {
|
||||
// Client to receiver streams of events and acknowledgements
|
||||
Events(grpc.BidiStreamingServer[FlowEvent, FlowEventAck]) error
|
||||
Events(FlowService_EventsServer) error
|
||||
mustEmbedUnimplementedFlowServiceServer()
|
||||
}
|
||||
|
||||
// UnimplementedFlowServiceServer must be embedded to have
|
||||
// forward compatible implementations.
|
||||
//
|
||||
// NOTE: this should be embedded by value instead of pointer to avoid a nil
|
||||
// pointer dereference when methods are called.
|
||||
type UnimplementedFlowServiceServer struct{}
|
||||
// UnimplementedFlowServiceServer must be embedded to have forward compatible implementations.
|
||||
type UnimplementedFlowServiceServer struct {
|
||||
}
|
||||
|
||||
func (UnimplementedFlowServiceServer) Events(grpc.BidiStreamingServer[FlowEvent, FlowEventAck]) error {
|
||||
return status.Error(codes.Unimplemented, "method Events not implemented")
|
||||
func (UnimplementedFlowServiceServer) Events(FlowService_EventsServer) error {
|
||||
return status.Errorf(codes.Unimplemented, "method Events not implemented")
|
||||
}
|
||||
func (UnimplementedFlowServiceServer) mustEmbedUnimplementedFlowServiceServer() {}
|
||||
func (UnimplementedFlowServiceServer) testEmbeddedByValue() {}
|
||||
|
||||
// UnsafeFlowServiceServer may be embedded to opt out of forward compatibility for this service.
|
||||
// Use of this interface is not recommended, as added methods to FlowServiceServer will
|
||||
@@ -81,22 +87,34 @@ type UnsafeFlowServiceServer interface {
|
||||
}
|
||||
|
||||
func RegisterFlowServiceServer(s grpc.ServiceRegistrar, srv FlowServiceServer) {
|
||||
// If the following call panics, it indicates UnimplementedFlowServiceServer was
|
||||
// embedded by pointer and is nil. This will cause panics if an
|
||||
// unimplemented method is ever invoked, so we test this at initialization
|
||||
// time to prevent it from happening at runtime later due to I/O.
|
||||
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
|
||||
t.testEmbeddedByValue()
|
||||
}
|
||||
s.RegisterService(&FlowService_ServiceDesc, srv)
|
||||
}
|
||||
|
||||
func _FlowService_Events_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
return srv.(FlowServiceServer).Events(&grpc.GenericServerStream[FlowEvent, FlowEventAck]{ServerStream: stream})
|
||||
return srv.(FlowServiceServer).Events(&flowServiceEventsServer{stream})
|
||||
}
|
||||
|
||||
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||
type FlowService_EventsServer = grpc.BidiStreamingServer[FlowEvent, FlowEventAck]
|
||||
type FlowService_EventsServer interface {
|
||||
Send(*FlowEventAck) error
|
||||
Recv() (*FlowEvent, error)
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
type flowServiceEventsServer struct {
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
func (x *flowServiceEventsServer) Send(m *FlowEventAck) error {
|
||||
return x.ServerStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *flowServiceEventsServer) Recv() (*FlowEvent, error) {
|
||||
m := new(FlowEvent)
|
||||
if err := x.ServerStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// FlowService_ServiceDesc is the grpc.ServiceDesc for FlowService service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
|
||||
@@ -1916,6 +1916,117 @@ func TestDefaultAccountManager_MarkPeerConnected_PeerLoginExpiration(t *testing.
|
||||
}
|
||||
}
|
||||
|
||||
func TestDefaultAccountManager_MarkPeerDisconnected_SchedulesInactivityExpiration(t *testing.T) {
|
||||
manager, _, err := createManager(t)
|
||||
require.NoError(t, err, "unable to create account manager")
|
||||
|
||||
accountID, err := manager.GetAccountIDByUserID(context.Background(), auth.UserAuth{UserId: userID})
|
||||
require.NoError(t, err, "unable to create an account")
|
||||
|
||||
key, err := wgtypes.GenerateKey()
|
||||
require.NoError(t, err, "unable to generate WireGuard key")
|
||||
peerPubKey := key.PublicKey().String()
|
||||
|
||||
_, _, _, _, err = manager.AddPeer(context.Background(), "", "", userID, &nbpeer.Peer{
|
||||
Key: peerPubKey,
|
||||
Meta: nbpeer.PeerSystemMeta{Hostname: "test-peer"},
|
||||
InactivityExpirationEnabled: true,
|
||||
}, false)
|
||||
require.NoError(t, err, "unable to add peer")
|
||||
|
||||
_, err = manager.UpdateAccountSettings(context.Background(), accountID, userID, &types.Settings{
|
||||
PeerLoginExpiration: time.Hour,
|
||||
PeerLoginExpirationEnabled: true,
|
||||
PeerInactivityExpiration: time.Hour,
|
||||
PeerInactivityExpirationEnabled: true,
|
||||
Extra: &types.ExtraSettings{},
|
||||
})
|
||||
require.NoError(t, err, "expecting to update account settings successfully but got error")
|
||||
|
||||
// Establish a session so the matching-token disconnect is actually applied.
|
||||
streamStartTime := time.Now().UTC()
|
||||
err = manager.MarkPeerConnected(context.Background(), peerPubKey, accountID, streamStartTime.UnixNano(), nil)
|
||||
require.NoError(t, err, "unable to mark peer connected")
|
||||
|
||||
// Install the mock only now, so the assertion observes the disconnect, not
|
||||
// the earlier connect.
|
||||
scheduled := make(chan struct{}, 1)
|
||||
manager.peerInactivityExpiry = &MockScheduler{
|
||||
CancelFunc: func(ctx context.Context, IDs []string) {},
|
||||
ScheduleFunc: func(ctx context.Context, in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) {
|
||||
select {
|
||||
case scheduled <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
err = manager.MarkPeerDisconnected(context.Background(), peerPubKey, accountID, streamStartTime.UnixNano())
|
||||
require.NoError(t, err, "unable to mark peer disconnected")
|
||||
|
||||
select {
|
||||
case <-scheduled:
|
||||
// expected: disconnect re-armed the inactivity expiry timer
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("expected inactivity expiration to be rescheduled when an eligible peer disconnects")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDefaultAccountManager_MarkPeerDisconnected_SkipsInactivityExpirationWhenDisabled(t *testing.T) {
|
||||
manager, _, err := createManager(t)
|
||||
require.NoError(t, err, "unable to create account manager")
|
||||
|
||||
accountID, err := manager.GetAccountIDByUserID(context.Background(), auth.UserAuth{UserId: userID})
|
||||
require.NoError(t, err, "unable to create an account")
|
||||
|
||||
key, err := wgtypes.GenerateKey()
|
||||
require.NoError(t, err, "unable to generate WireGuard key")
|
||||
peerPubKey := key.PublicKey().String()
|
||||
|
||||
_, _, _, _, err = manager.AddPeer(context.Background(), "", "", userID, &nbpeer.Peer{
|
||||
Key: peerPubKey,
|
||||
Meta: nbpeer.PeerSystemMeta{Hostname: "test-peer"},
|
||||
InactivityExpirationEnabled: true,
|
||||
}, false)
|
||||
require.NoError(t, err, "unable to add peer")
|
||||
|
||||
// Peer is eligible (SSO + inactivity enabled) but the account-level setting
|
||||
// stays disabled, so disconnect must not schedule anything.
|
||||
_, err = manager.UpdateAccountSettings(context.Background(), accountID, userID, &types.Settings{
|
||||
PeerLoginExpiration: time.Hour,
|
||||
PeerLoginExpirationEnabled: true,
|
||||
PeerInactivityExpiration: time.Hour,
|
||||
PeerInactivityExpirationEnabled: false,
|
||||
Extra: &types.ExtraSettings{},
|
||||
})
|
||||
require.NoError(t, err, "expecting to update account settings successfully but got error")
|
||||
|
||||
streamStartTime := time.Now().UTC()
|
||||
err = manager.MarkPeerConnected(context.Background(), peerPubKey, accountID, streamStartTime.UnixNano(), nil)
|
||||
require.NoError(t, err, "unable to mark peer connected")
|
||||
|
||||
scheduled := make(chan struct{}, 1)
|
||||
manager.peerInactivityExpiry = &MockScheduler{
|
||||
CancelFunc: func(ctx context.Context, IDs []string) {},
|
||||
ScheduleFunc: func(ctx context.Context, in time.Duration, ID string, job func() (nextRunIn time.Duration, reschedule bool)) {
|
||||
select {
|
||||
case scheduled <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
err = manager.MarkPeerDisconnected(context.Background(), peerPubKey, accountID, streamStartTime.UnixNano())
|
||||
require.NoError(t, err, "unable to mark peer disconnected")
|
||||
|
||||
select {
|
||||
case <-scheduled:
|
||||
t.Fatal("inactivity expiration must not be scheduled while the account-level setting is disabled")
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
// expected: nothing scheduled
|
||||
}
|
||||
}
|
||||
|
||||
func TestDefaultAccountManager_OnPeerDisconnected_LastSeenCheck(t *testing.T) {
|
||||
manager, _, err := createManager(t)
|
||||
require.NoError(t, err, "unable to create account manager")
|
||||
|
||||
@@ -188,6 +188,15 @@ func (am *DefaultAccountManager) MarkPeerDisconnected(ctx context.Context, peerP
|
||||
}
|
||||
}
|
||||
|
||||
if peer.AddedWithSSOLogin() && peer.InactivityExpirationEnabled {
|
||||
settings, err := am.Store.GetAccountSettings(ctx, store.LockingStrengthNone, accountID)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Warnf("failed getting account settings to schedule inactivity expiration for peer %s: %v", peer.ID, err)
|
||||
} else if settings.PeerInactivityExpirationEnabled {
|
||||
am.checkAndSchedulePeerInactivityExpiration(ctx, accountID)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -2765,28 +2765,6 @@ components:
|
||||
type: integer
|
||||
description: "Number of packets transmitted."
|
||||
example: 5
|
||||
num_of_starts:
|
||||
type: integer
|
||||
description: "Number of start events."
|
||||
example: 3
|
||||
num_of_ends:
|
||||
type: integer
|
||||
description: "Number of end events."
|
||||
example: 4
|
||||
num_of_drops:
|
||||
type: integer
|
||||
description: "Number of drop events."
|
||||
example: 5
|
||||
window_start:
|
||||
type: string
|
||||
format: date-time
|
||||
description: Timestamp of the start of the aggregation window.
|
||||
example: 2025-03-20T16:23:58.125397Z
|
||||
window_end:
|
||||
type: string
|
||||
format: date-time
|
||||
description: Timestamp of the end of the aggregation window.
|
||||
example: 2025-03-20T16:23:58.125397Z
|
||||
events:
|
||||
type: array
|
||||
description: "List of events that are correlated to this flow (e.g., start, end)."
|
||||
@@ -2808,11 +2786,6 @@ components:
|
||||
- rx_packets
|
||||
- tx_bytes
|
||||
- tx_packets
|
||||
- num_of_starts
|
||||
- num_of_ends
|
||||
- num_of_drops
|
||||
- window_start
|
||||
- window_end
|
||||
- events
|
||||
NetworkTrafficEventsResponse:
|
||||
type: object
|
||||
|
||||
@@ -2905,18 +2905,9 @@ type NetworkTrafficEvent struct {
|
||||
Events []NetworkTrafficSubEvent `json:"events"`
|
||||
|
||||
// FlowId FlowID is the ID of the connection flow. Not unique because it can be the same for multiple events (e.g., start and end of the connection).
|
||||
FlowId string `json:"flow_id"`
|
||||
Icmp NetworkTrafficICMP `json:"icmp"`
|
||||
|
||||
// NumOfDrops Number of drop events.
|
||||
NumOfDrops int `json:"num_of_drops"`
|
||||
|
||||
// NumOfEnds Number of end events.
|
||||
NumOfEnds int `json:"num_of_ends"`
|
||||
|
||||
// NumOfStarts Number of start events.
|
||||
NumOfStarts int `json:"num_of_starts"`
|
||||
Policy NetworkTrafficPolicy `json:"policy"`
|
||||
FlowId string `json:"flow_id"`
|
||||
Icmp NetworkTrafficICMP `json:"icmp"`
|
||||
Policy NetworkTrafficPolicy `json:"policy"`
|
||||
|
||||
// Protocol Protocol is the protocol of the traffic (e.g. 1 = ICMP, 6 = TCP, 17 = UDP, etc.).
|
||||
Protocol int `json:"protocol"`
|
||||
@@ -2937,12 +2928,6 @@ type NetworkTrafficEvent struct {
|
||||
// TxPackets Number of packets transmitted.
|
||||
TxPackets int `json:"tx_packets"`
|
||||
User NetworkTrafficUser `json:"user"`
|
||||
|
||||
// WindowEnd Timestamp of the end of the aggregation window.
|
||||
WindowEnd time.Time `json:"window_end"`
|
||||
|
||||
// WindowStart Timestamp of the start of the aggregation window.
|
||||
WindowStart time.Time `json:"window_start"`
|
||||
}
|
||||
|
||||
// NetworkTrafficEventsResponse defines model for NetworkTrafficEventsResponse.
|
||||
|
||||
Reference in New Issue
Block a user