mirror of
https://github.com/netbirdio/netbird.git
synced 2026-06-26 17:59:56 +00:00
Compare commits
25 Commits
dmitri-eve
...
test/affec
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e048f221f0 | ||
|
|
4e8780e0c3 | ||
|
|
a1933f792a | ||
|
|
ccf1e965c3 | ||
|
|
38bd53eb71 | ||
|
|
845dd0c9bb | ||
|
|
2f0093a7e1 | ||
|
|
c5d26106f2 | ||
|
|
9746bc2a08 | ||
|
|
42867c7a59 | ||
|
|
d7740f9868 | ||
|
|
c2db940a8c | ||
|
|
62ffa08744 | ||
|
|
d8e7f2e9e6 | ||
|
|
1205641b44 | ||
|
|
de0cf0fc7a | ||
|
|
57f475c5a9 | ||
|
|
56e8215ebe | ||
|
|
5ae36cd260 | ||
|
|
9b768d1773 | ||
|
|
33954ea15e | ||
|
|
4c4434a871 | ||
|
|
9eadf50f4c | ||
|
|
be06016ad2 | ||
|
|
7873f337df |
@@ -27,7 +27,7 @@ type Logger struct {
|
||||
wgIfaceNetV6 netip.Prefix
|
||||
dnsCollection atomic.Bool
|
||||
exitNodeCollection atomic.Bool
|
||||
Store types.AggregatingStore
|
||||
Store types.Store
|
||||
}
|
||||
|
||||
func New(statusRecorder *peer.Status, wgIfaceIPNet, wgIfaceIPNetV6 netip.Prefix) *Logger {
|
||||
@@ -35,7 +35,7 @@ func New(statusRecorder *peer.Status, wgIfaceIPNet, wgIfaceIPNetV6 netip.Prefix)
|
||||
statusRecorder: statusRecorder,
|
||||
wgIfaceNet: wgIfaceIPNet,
|
||||
wgIfaceNetV6: wgIfaceIPNetV6,
|
||||
Store: store.NewAggregatingMemoryStore(),
|
||||
Store: store.NewMemoryStore(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -125,10 +125,6 @@ func (l *Logger) stop() {
|
||||
l.mux.Unlock()
|
||||
}
|
||||
|
||||
func (l *Logger) ResetAggregationWindow() types.FlowEventAggregator {
|
||||
return l.Store.ResetAggregationWindow()
|
||||
}
|
||||
|
||||
func (l *Logger) GetEvents() []*types.Event {
|
||||
return l.Store.GetEvents()
|
||||
}
|
||||
|
||||
@@ -9,14 +9,12 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/google/uuid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/conntrack"
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/logger"
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/store"
|
||||
nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
|
||||
"github.com/netbirdio/netbird/client/internal/peer"
|
||||
"github.com/netbirdio/netbird/flow/client"
|
||||
@@ -25,16 +23,14 @@ import (
|
||||
|
||||
// Manager handles netflow tracking and logging
|
||||
type Manager struct {
|
||||
mux sync.Mutex
|
||||
shutdownWg sync.WaitGroup
|
||||
logger nftypes.FlowLogger
|
||||
flowConfig *nftypes.FlowConfig
|
||||
conntrack nftypes.ConnTracker
|
||||
receiverClient *client.GRPCClient
|
||||
eventsWithoutAcks nftypes.Store
|
||||
publicKey []byte
|
||||
cancel context.CancelFunc
|
||||
retryInterval time.Duration
|
||||
mux sync.Mutex
|
||||
shutdownWg sync.WaitGroup
|
||||
logger nftypes.FlowLogger
|
||||
flowConfig *nftypes.FlowConfig
|
||||
conntrack nftypes.ConnTracker
|
||||
receiverClient *client.GRPCClient
|
||||
publicKey []byte
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// NewManager creates a new netflow manager
|
||||
@@ -52,11 +48,9 @@ func NewManager(iface nftypes.IFaceMapper, publicKey []byte, statusRecorder *pee
|
||||
}
|
||||
|
||||
return &Manager{
|
||||
logger: flowLogger,
|
||||
conntrack: ct,
|
||||
publicKey: publicKey,
|
||||
retryInterval: time.Second,
|
||||
eventsWithoutAcks: store.NewMemoryStore(),
|
||||
logger: flowLogger,
|
||||
conntrack: ct,
|
||||
publicKey: publicKey,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,7 +66,6 @@ func (m *Manager) needsNewClient(previous *nftypes.FlowConfig) bool {
|
||||
}
|
||||
|
||||
// enableFlow starts components for flow tracking
|
||||
// must be called under m.mux lock
|
||||
func (m *Manager) enableFlow(previous *nftypes.FlowConfig) error {
|
||||
// first make sender ready so events don't pile up
|
||||
if m.needsNewClient(previous) {
|
||||
@@ -92,7 +85,6 @@ func (m *Manager) enableFlow(previous *nftypes.FlowConfig) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// must be called under m.mux lock
|
||||
func (m *Manager) resetClient() error {
|
||||
if m.receiverClient != nil {
|
||||
if err := m.receiverClient.Close(); err != nil {
|
||||
@@ -115,19 +107,14 @@ func (m *Manager) resetClient() error {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
m.cancel = cancel
|
||||
|
||||
m.shutdownWg.Add(3)
|
||||
flowConfigInterval := m.flowConfig.Interval
|
||||
m.shutdownWg.Add(2)
|
||||
go func() {
|
||||
defer m.shutdownWg.Done()
|
||||
m.receiveACKs(ctx, flowClient, flowConfigInterval)
|
||||
m.receiveACKs(ctx, flowClient)
|
||||
}()
|
||||
go func() {
|
||||
defer m.shutdownWg.Done()
|
||||
m.startSender(ctx, flowConfigInterval)
|
||||
}()
|
||||
go func() {
|
||||
defer m.shutdownWg.Done()
|
||||
m.startRetries(ctx, flowConfigInterval)
|
||||
m.startSender(ctx)
|
||||
}()
|
||||
|
||||
return nil
|
||||
@@ -211,8 +198,8 @@ func (m *Manager) GetLogger() nftypes.FlowLogger {
|
||||
return m.logger
|
||||
}
|
||||
|
||||
func (m *Manager) startSender(ctx context.Context, flowConfigInterval time.Duration) {
|
||||
ticker := time.NewTicker(flowConfigInterval)
|
||||
func (m *Manager) startSender(ctx context.Context) {
|
||||
ticker := time.NewTicker(m.flowConfig.Interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
@@ -220,29 +207,27 @@ func (m *Manager) startSender(ctx context.Context, flowConfigInterval time.Durat
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
collectedEvents := m.logger.ResetAggregationWindow()
|
||||
events := collectedEvents.GetAggregatedEvents()
|
||||
events := m.logger.GetEvents()
|
||||
for _, event := range events {
|
||||
if err := m.send(event); err != nil {
|
||||
log.Errorf("failed to send flow event to server: %v", err)
|
||||
} else {
|
||||
log.Tracef("sent flow event: %s", event.ID)
|
||||
continue
|
||||
}
|
||||
m.eventsWithoutAcks.StoreEvent(event)
|
||||
log.Tracef("sent flow event: %s", event.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) receiveACKs(ctx context.Context, client *client.GRPCClient, flowConfigInterval time.Duration) {
|
||||
err := client.Receive(ctx, flowConfigInterval, func(ack *proto.FlowEventAck) error {
|
||||
func (m *Manager) receiveACKs(ctx context.Context, client *client.GRPCClient) {
|
||||
err := client.Receive(ctx, m.flowConfig.Interval, func(ack *proto.FlowEventAck) error {
|
||||
id, err := uuid.FromBytes(ack.EventId)
|
||||
if err != nil {
|
||||
log.Warnf("failed to convert ack event id to uuid: %v", err)
|
||||
return nil
|
||||
}
|
||||
log.Tracef("received flow event ack: %s", id)
|
||||
m.eventsWithoutAcks.DeleteEvents([]uuid.UUID{id})
|
||||
m.logger.DeleteEvents([]uuid.UUID{id})
|
||||
return nil
|
||||
})
|
||||
|
||||
@@ -251,43 +236,6 @@ func (m *Manager) receiveACKs(ctx context.Context, client *client.GRPCClient, fl
|
||||
}
|
||||
}
|
||||
|
||||
// We effectively never drop events (see MaxInterval), which makes eventsWithoutAcks unbounded.
|
||||
// We may want to limit the max size of the store, and start dropping oldest events when the threshold is reached.
|
||||
func (m *Manager) startRetries(ctx context.Context, flowConfigInterval time.Duration) {
|
||||
timer := time.NewTimer(m.retryInterval)
|
||||
retryBackoff := backoff.WithContext(&backoff.ExponentialBackOff{
|
||||
InitialInterval: 1 * time.Second,
|
||||
RandomizationFactor: 0.5,
|
||||
Multiplier: 1.7,
|
||||
MaxInterval: flowConfigInterval / 2,
|
||||
MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}, ctx)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-timer.C:
|
||||
for _, e := range m.eventsWithoutAcks.GetEvents() {
|
||||
if e.Timestamp.Add(time.Second).After(time.Now()) {
|
||||
// grace period on retries to avoid early retries
|
||||
// do not retry if the event is less than 1 sec old
|
||||
continue
|
||||
}
|
||||
if err := m.send(e); err != nil {
|
||||
timer = time.NewTimer(retryBackoff.NextBackOff()) //nolint:staticcheck,wastedassign
|
||||
break
|
||||
}
|
||||
}
|
||||
retryBackoff.Reset()
|
||||
timer = time.NewTimer(m.retryInterval)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) send(event *nftypes.Event) error {
|
||||
m.mux.Lock()
|
||||
client := m.receiverClient
|
||||
@@ -302,11 +250,9 @@ func (m *Manager) send(event *nftypes.Event) error {
|
||||
|
||||
func toProtoEvent(publicKey []byte, event *nftypes.Event) *proto.FlowEvent {
|
||||
protoEvent := &proto.FlowEvent{
|
||||
EventId: event.ID[:],
|
||||
Timestamp: timestamppb.New(event.Timestamp),
|
||||
PublicKey: publicKey,
|
||||
WindowStart: timestamppb.New(event.WindowStart),
|
||||
WindowEnd: timestamppb.New(event.WindowEnd),
|
||||
EventId: event.ID[:],
|
||||
Timestamp: timestamppb.New(event.Timestamp),
|
||||
PublicKey: publicKey,
|
||||
FlowFields: &proto.FlowFields{
|
||||
FlowId: event.FlowID[:],
|
||||
RuleId: event.RuleID,
|
||||
@@ -321,9 +267,6 @@ func toProtoEvent(publicKey []byte, event *nftypes.Event) *proto.FlowEvent {
|
||||
TxBytes: event.TxBytes,
|
||||
SourceResourceId: event.SourceResourceID,
|
||||
DestResourceId: event.DestResourceID,
|
||||
NumOfStarts: event.NumOfStarts,
|
||||
NumOfEnds: event.NumOfEnds,
|
||||
NumOfDrops: event.NumOfDrops,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
@@ -1,291 +0,0 @@
|
||||
package netflow
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/netip"
|
||||
"slices"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/netbirdio/netbird/client/iface/wgaddr"
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/types"
|
||||
"github.com/netbirdio/netbird/flow/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type testServer struct {
|
||||
proto.UnimplementedFlowServiceServer
|
||||
events chan *proto.FlowEvent
|
||||
acks chan *proto.FlowEventAck
|
||||
grpcSrv *grpc.Server
|
||||
addr string
|
||||
handlerDone chan struct{} // signaled each time Events() exits
|
||||
handlerStarted chan struct{} // signaled each time Events() begins
|
||||
}
|
||||
|
||||
func newTestServer(t *testing.T) *testServer {
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
s := &testServer{
|
||||
events: make(chan *proto.FlowEvent, 100),
|
||||
acks: make(chan *proto.FlowEventAck, 100),
|
||||
grpcSrv: grpc.NewServer(),
|
||||
addr: listener.Addr().String(),
|
||||
handlerDone: make(chan struct{}, 10),
|
||||
handlerStarted: make(chan struct{}, 10),
|
||||
}
|
||||
|
||||
proto.RegisterFlowServiceServer(s.grpcSrv, s)
|
||||
|
||||
go func() {
|
||||
if err := s.grpcSrv.Serve(listener); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
|
||||
t.Logf("server error: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
t.Cleanup(func() {
|
||||
s.grpcSrv.Stop()
|
||||
})
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *testServer) Events(stream proto.FlowService_EventsServer) error {
|
||||
defer func() {
|
||||
select {
|
||||
case s.handlerDone <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}()
|
||||
|
||||
err := stream.Send(&proto.FlowEventAck{IsInitiator: true})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
case s.handlerStarted <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(stream.Context())
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
for {
|
||||
event, err := stream.Recv()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if !event.IsInitiator {
|
||||
select {
|
||||
case s.events <- event:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case ack := <-s.acks:
|
||||
if err := stream.Send(ack); err != nil {
|
||||
return err
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendEventReceiveAck(t *testing.T) {
|
||||
_, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
server := newTestServer(t)
|
||||
manager := createManager(t, server.addr, 60*time.Second) // set high to prevent retries in this test
|
||||
defer manager.Close()
|
||||
|
||||
assert.Eventually(t, func() bool {
|
||||
select {
|
||||
case <-server.handlerStarted:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}, 3*time.Second, 100*time.Millisecond)
|
||||
|
||||
event1 := types.EventFields{
|
||||
FlowID: uuid.New(),
|
||||
Type: types.TypeStart,
|
||||
Direction: types.Ingress,
|
||||
DestIP: ipAddr("172.16.1.2"),
|
||||
DestPort: 2345,
|
||||
Protocol: 6,
|
||||
}
|
||||
manager.logger.StoreEvent(event1)
|
||||
event2 := types.EventFields{
|
||||
FlowID: uuid.New(),
|
||||
Type: types.TypeStart,
|
||||
Direction: types.Ingress,
|
||||
DestIP: ipAddr("172.16.1.1"),
|
||||
DestPort: 1234,
|
||||
Protocol: 6,
|
||||
}
|
||||
manager.logger.StoreEvent(event2)
|
||||
|
||||
// verify the server received logged events
|
||||
serverSideEvents := make([]*proto.FlowEvent, 0)
|
||||
assert.Eventually(t, func() bool {
|
||||
select {
|
||||
case event := <-server.events:
|
||||
serverSideEvents = append(serverSideEvents, event)
|
||||
if len(serverSideEvents) == 2 {
|
||||
return true
|
||||
}
|
||||
default:
|
||||
if len(serverSideEvents) == 2 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}, 5*time.Second, 100*time.Millisecond)
|
||||
|
||||
serverSideFlowIds := make([]uuid.UUID, 0, 2)
|
||||
slices.Values(serverSideEvents)(func(e *proto.FlowEvent) bool {
|
||||
id, err := uuid.FromBytes(e.FlowFields.FlowId)
|
||||
assert.NoError(t, err)
|
||||
serverSideFlowIds = append(serverSideFlowIds, id)
|
||||
return true
|
||||
})
|
||||
assert.ElementsMatch(t, []uuid.UUID{event1.FlowID, event2.FlowID}, serverSideFlowIds)
|
||||
|
||||
// verify the manager tracks un-acked events
|
||||
unackedEvents := manager.eventsWithoutAcks.GetEvents()
|
||||
assert.Len(t, unackedEvents, 2)
|
||||
flowIds := make([]uuid.UUID, 0)
|
||||
slices.Values(unackedEvents)(func(e *types.Event) bool {
|
||||
flowIds = append(flowIds, e.FlowID)
|
||||
return true
|
||||
})
|
||||
assert.ElementsMatch(t, flowIds, []uuid.UUID{event1.FlowID, event2.FlowID})
|
||||
}
|
||||
|
||||
// verify handling of retries:
|
||||
// - unacked events are retried
|
||||
// - when acks arrive, events are removed from the un-acked event tracker
|
||||
func TestRetryEvents(t *testing.T) {
|
||||
_, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
server := newTestServer(t)
|
||||
manager := createManager(t, server.addr, time.Second) // set low to start retries sooner
|
||||
defer manager.Close()
|
||||
|
||||
assert.Eventually(t, func() bool {
|
||||
select {
|
||||
case <-server.handlerStarted:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}, 3*time.Second, 100*time.Millisecond)
|
||||
|
||||
event1 := types.EventFields{
|
||||
FlowID: uuid.New(),
|
||||
Type: types.TypeStart,
|
||||
Direction: types.Ingress,
|
||||
DestIP: ipAddr("172.16.1.2"),
|
||||
DestPort: 2345,
|
||||
Protocol: 6,
|
||||
}
|
||||
manager.logger.StoreEvent(event1)
|
||||
event2 := types.EventFields{
|
||||
FlowID: uuid.New(),
|
||||
Type: types.TypeStart,
|
||||
Direction: types.Ingress,
|
||||
DestIP: ipAddr("172.16.1.1"),
|
||||
DestPort: 1234,
|
||||
Protocol: 6,
|
||||
}
|
||||
manager.logger.StoreEvent(event2)
|
||||
|
||||
// verify the server received retries of logged events
|
||||
serverSideEvents := make([]*proto.FlowEvent, 0)
|
||||
func() {
|
||||
c := time.After(2500 * time.Millisecond)
|
||||
for {
|
||||
select {
|
||||
case event := <-server.events:
|
||||
serverSideEvents = append(serverSideEvents, event)
|
||||
case <-c:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
assert.True(t, len(serverSideEvents) > 2) // must see retries
|
||||
|
||||
uniqueServerSideEvents := make(map[uuid.UUID]*proto.FlowEvent)
|
||||
slices.Values(serverSideEvents)(func(e *proto.FlowEvent) bool {
|
||||
id, err := uuid.FromBytes(e.FlowFields.FlowId)
|
||||
assert.NoError(t, err)
|
||||
uniqueServerSideEvents[id] = e
|
||||
return true
|
||||
})
|
||||
assert.Contains(t, uniqueServerSideEvents, event1.FlowID)
|
||||
assert.Contains(t, uniqueServerSideEvents, event2.FlowID)
|
||||
|
||||
// ack events
|
||||
server.acks <- &proto.FlowEventAck{EventId: uniqueServerSideEvents[event1.FlowID].EventId}
|
||||
server.acks <- &proto.FlowEventAck{EventId: uniqueServerSideEvents[event2.FlowID].EventId}
|
||||
|
||||
assert.EventuallyWithT(t, func(c *assert.CollectT) {
|
||||
unackedEvents := manager.eventsWithoutAcks.GetEvents()
|
||||
assert.Empty(c, unackedEvents)
|
||||
|
||||
}, 3*time.Second, 100*time.Millisecond)
|
||||
}
|
||||
|
||||
func createManager(t *testing.T, serverAddr string, retryInterval time.Duration) *Manager {
|
||||
t.Helper()
|
||||
|
||||
mockIFace := &mockIFaceMapper{
|
||||
address: wgaddr.Address{
|
||||
Network: netip.MustParsePrefix("192.168.1.1/32"),
|
||||
},
|
||||
isUserspaceBind: true,
|
||||
}
|
||||
|
||||
publicKey := []byte("test-public-key")
|
||||
manager := NewManager(mockIFace, publicKey, nil)
|
||||
manager.retryInterval = retryInterval
|
||||
|
||||
initialConfig := &types.FlowConfig{
|
||||
Enabled: true,
|
||||
URL: fmt.Sprintf("http://%s", serverAddr),
|
||||
TokenPayload: "initial-payload",
|
||||
TokenSignature: "initial-signature",
|
||||
Interval: 500 * time.Millisecond,
|
||||
}
|
||||
|
||||
err := manager.Update(initialConfig)
|
||||
require.NoError(t, err)
|
||||
|
||||
return manager
|
||||
}
|
||||
|
||||
func ipAddr(a string) netip.Addr {
|
||||
addr, _ := netip.ParseAddr(a)
|
||||
return addr
|
||||
}
|
||||
@@ -1,334 +0,0 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"net/netip"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
var random = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
func TestFlowAggregation(t *testing.T) {
|
||||
var protocols = []types.Protocol{types.ICMP, types.ICMPv6, types.TCP, types.UDP}
|
||||
var tests = []struct {
|
||||
description string
|
||||
addresses [][]netip.Addr
|
||||
dstPort uint16
|
||||
eventTypes []types.Type
|
||||
}{
|
||||
{
|
||||
description: "start and stop",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeStart, types.TypeEnd},
|
||||
},
|
||||
{
|
||||
description: "start and drop",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeStart, types.TypeDrop},
|
||||
},
|
||||
{
|
||||
description: "start only",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeStart},
|
||||
},
|
||||
{
|
||||
description: "drop only",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeDrop},
|
||||
}}
|
||||
|
||||
for _, protocol := range protocols {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.description+" "+protocol.String(), func(t *testing.T) {
|
||||
store := NewAggregatingMemoryStore()
|
||||
store.WindowEnd = time.Now().Add(5 * time.Second)
|
||||
|
||||
allExpected := make([]*types.Event, 0)
|
||||
|
||||
for _, srcAndDst := range tt.addresses {
|
||||
inEvents, expected := generateEvents(srcAndDst[0], srcAndDst[1], tt.dstPort, tt.eventTypes, protocol, types.Ingress, 0, store.WindowStart, store.WindowEnd)
|
||||
for _, e := range inEvents {
|
||||
store.StoreEvent(e)
|
||||
}
|
||||
allExpected = append(allExpected, expected)
|
||||
}
|
||||
|
||||
events := store.GetAggregatedEvents()
|
||||
assert.ElementsMatch(t, events, allExpected)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestIcmpEventAggregation(t *testing.T) {
|
||||
var protocols = []types.Protocol{types.ICMP, types.ICMPv6}
|
||||
var icmpTypes = []uint8{1, 2, 3}
|
||||
|
||||
var tests = []struct {
|
||||
description string
|
||||
addresses [][]netip.Addr
|
||||
eventTypes []types.Type
|
||||
}{
|
||||
{
|
||||
description: "start and stop",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}},
|
||||
eventTypes: []types.Type{types.TypeStart, types.TypeEnd},
|
||||
},
|
||||
{
|
||||
description: "start and drop",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}},
|
||||
eventTypes: []types.Type{types.TypeStart, types.TypeDrop},
|
||||
},
|
||||
{
|
||||
description: "start only",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}},
|
||||
eventTypes: []types.Type{types.TypeStart},
|
||||
},
|
||||
{
|
||||
description: "drop only",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}},
|
||||
eventTypes: []types.Type{types.TypeDrop},
|
||||
}}
|
||||
|
||||
for _, protocol := range protocols {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.description+" "+protocol.String(), func(t *testing.T) {
|
||||
store := NewAggregatingMemoryStore()
|
||||
store.WindowEnd = time.Now().Add(5 * time.Second)
|
||||
|
||||
allExpected := make([]*types.Event, 0)
|
||||
for _, icmpType := range icmpTypes {
|
||||
events, expected := generateEvents(tt.addresses[0][0], tt.addresses[0][1], 0, tt.eventTypes, protocol, types.Ingress, icmpType, store.WindowStart, store.WindowEnd)
|
||||
for _, e := range events {
|
||||
store.StoreEvent(e)
|
||||
}
|
||||
allExpected = append(allExpected, expected)
|
||||
}
|
||||
aggregatedEvents := store.GetAggregatedEvents()
|
||||
assert.Len(t, aggregatedEvents, len(allExpected))
|
||||
assert.ElementsMatch(t, aggregatedEvents, allExpected)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestFlowAggregationOfUnknownProtocols(t *testing.T) {
|
||||
var tests = []struct {
|
||||
description string
|
||||
addresses [][]netip.Addr
|
||||
dstPort uint16
|
||||
eventTypes []types.Type
|
||||
}{
|
||||
{
|
||||
description: "start and stop",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeStart, types.TypeEnd},
|
||||
},
|
||||
{
|
||||
description: "start and drop",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeStart, types.TypeDrop},
|
||||
},
|
||||
{
|
||||
description: "start only",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeStart},
|
||||
},
|
||||
{
|
||||
description: "drop only",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeDrop},
|
||||
}}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.description+" "+types.ProtocolUnknown.String(), func(t *testing.T) {
|
||||
store := NewAggregatingMemoryStore()
|
||||
store.WindowEnd = time.Now().Add(5 * time.Second)
|
||||
|
||||
allExpected := make([]*types.Event, 0)
|
||||
|
||||
for _, srcAndDst := range tt.addresses {
|
||||
inEvents, expected := generateEventsForUnknownProtocol(srcAndDst[0], srcAndDst[1], tt.dstPort, tt.eventTypes, types.ProtocolUnknown, types.Ingress, store.WindowStart, store.WindowEnd)
|
||||
for _, e := range inEvents {
|
||||
store.StoreEvent(e)
|
||||
}
|
||||
allExpected = append(allExpected, expected...)
|
||||
}
|
||||
|
||||
events := store.GetAggregatedEvents()
|
||||
assert.ElementsMatch(t, events, allExpected)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func ipAddr(a string) netip.Addr {
|
||||
addr, _ := netip.ParseAddr(a)
|
||||
return addr
|
||||
}
|
||||
|
||||
func generateEvents(srcIp, dstIp netip.Addr, dstPort uint16, eventTypes []types.Type, protocol types.Protocol,
|
||||
direction types.Direction, icmpType uint8, windowStart, windowEnd time.Time) ([]*types.Event, *types.Event) {
|
||||
var rxPackets, txPackets, rxBytes, txBytes uint64
|
||||
inEvents := make([]*types.Event, 0)
|
||||
ts := time.Now()
|
||||
flowId := uuid.New()
|
||||
srcPort := uint16(random.Uint32() >> 16)
|
||||
|
||||
for idx, eventType := range eventTypes {
|
||||
e := &types.Event{
|
||||
ID: uuid.New(),
|
||||
Timestamp: ts.Add(time.Duration(idx) * time.Second),
|
||||
EventFields: types.EventFields{
|
||||
FlowID: flowId,
|
||||
Type: eventType,
|
||||
Protocol: protocol,
|
||||
RuleID: []byte("rule-id-1"),
|
||||
Direction: direction,
|
||||
SourceIP: srcIp,
|
||||
SourcePort: srcPort,
|
||||
DestIP: dstIp,
|
||||
DestPort: dstPort,
|
||||
SourceResourceID: []byte("source-resource-id"),
|
||||
DestResourceID: []byte("dest-resource-id"),
|
||||
RxPackets: random.Uint64(),
|
||||
TxPackets: random.Uint64(),
|
||||
RxBytes: random.Uint64(),
|
||||
TxBytes: random.Uint64(),
|
||||
}}
|
||||
rxBytes += e.RxBytes
|
||||
txBytes += e.TxBytes
|
||||
rxPackets += e.RxPackets
|
||||
txPackets += e.TxPackets
|
||||
inEvents = append(inEvents, e)
|
||||
if protocol == types.ICMP || protocol == types.ICMPv6 {
|
||||
e.ICMPType = icmpType
|
||||
}
|
||||
}
|
||||
|
||||
var start, end, drop uint64
|
||||
for _, eventType := range eventTypes {
|
||||
switch eventType {
|
||||
case types.TypeStart:
|
||||
start += 1
|
||||
case types.TypeDrop:
|
||||
drop += 1
|
||||
case types.TypeEnd:
|
||||
end += 1
|
||||
}
|
||||
}
|
||||
aggregatedEvent := &types.Event{
|
||||
ID: inEvents[0].ID,
|
||||
Timestamp: inEvents[0].Timestamp,
|
||||
WindowStart: windowStart,
|
||||
WindowEnd: windowEnd,
|
||||
EventFields: types.EventFields{
|
||||
FlowID: flowId,
|
||||
Type: types.TypeUnknown,
|
||||
Protocol: inEvents[0].Protocol,
|
||||
RuleID: []byte("rule-id-1"),
|
||||
Direction: inEvents[0].Direction,
|
||||
SourceIP: srcIp,
|
||||
SourcePort: srcPort,
|
||||
DestIP: dstIp,
|
||||
DestPort: dstPort,
|
||||
SourceResourceID: []byte("source-resource-id"),
|
||||
DestResourceID: []byte("dest-resource-id"),
|
||||
RxPackets: rxPackets,
|
||||
TxPackets: txPackets,
|
||||
RxBytes: rxBytes,
|
||||
TxBytes: txBytes,
|
||||
NumOfStarts: start,
|
||||
NumOfEnds: end,
|
||||
NumOfDrops: drop,
|
||||
}}
|
||||
if protocol == types.ICMP || protocol == types.ICMPv6 {
|
||||
aggregatedEvent.ICMPType = icmpType
|
||||
}
|
||||
|
||||
return inEvents, aggregatedEvent
|
||||
}
|
||||
|
||||
func generateEventsForUnknownProtocol(srcIp, dstIp netip.Addr, dstPort uint16, eventTypes []types.Type, protocol types.Protocol,
|
||||
direction types.Direction, windowStart, windowEnd time.Time) ([]*types.Event, []*types.Event) {
|
||||
inEvents := make([]*types.Event, 0)
|
||||
expectedEvents := make([]*types.Event, 0)
|
||||
|
||||
ts := time.Now()
|
||||
flowId := uuid.New()
|
||||
srcPort := uint16(random.Uint32() >> 16)
|
||||
|
||||
for idx, eventType := range eventTypes {
|
||||
e := &types.Event{
|
||||
ID: uuid.New(),
|
||||
Timestamp: ts.Add(time.Duration(idx) * time.Second),
|
||||
EventFields: types.EventFields{
|
||||
FlowID: flowId,
|
||||
Type: eventType,
|
||||
Protocol: protocol,
|
||||
RuleID: []byte("rule-id-1"),
|
||||
Direction: direction,
|
||||
SourceIP: srcIp,
|
||||
SourcePort: srcPort,
|
||||
DestIP: dstIp,
|
||||
DestPort: dstPort,
|
||||
SourceResourceID: []byte("source-resource-id"),
|
||||
DestResourceID: []byte("dest-resource-id"),
|
||||
RxPackets: random.Uint64(),
|
||||
TxPackets: random.Uint64(),
|
||||
RxBytes: random.Uint64(),
|
||||
TxBytes: random.Uint64(),
|
||||
}}
|
||||
inEvents = append(inEvents, e)
|
||||
|
||||
var start, end, drop uint64
|
||||
switch eventType {
|
||||
case types.TypeStart:
|
||||
start = 1
|
||||
case types.TypeDrop:
|
||||
drop = 1
|
||||
case types.TypeEnd:
|
||||
end = 1
|
||||
}
|
||||
|
||||
expectedEvents = append(expectedEvents, &types.Event{
|
||||
ID: e.ID,
|
||||
Timestamp: e.Timestamp,
|
||||
WindowStart: windowStart,
|
||||
WindowEnd: windowEnd,
|
||||
EventFields: types.EventFields{
|
||||
FlowID: flowId,
|
||||
Type: types.TypeUnknown,
|
||||
Protocol: e.Protocol,
|
||||
RuleID: []byte("rule-id-1"),
|
||||
Direction: e.Direction,
|
||||
SourceIP: srcIp,
|
||||
SourcePort: srcPort,
|
||||
DestIP: dstIp,
|
||||
DestPort: dstPort,
|
||||
SourceResourceID: []byte("source-resource-id"),
|
||||
DestResourceID: []byte("dest-resource-id"),
|
||||
RxPackets: e.RxPackets,
|
||||
TxPackets: e.TxPackets,
|
||||
RxBytes: e.RxBytes,
|
||||
TxBytes: e.TxBytes,
|
||||
NumOfStarts: start,
|
||||
NumOfEnds: end,
|
||||
NumOfDrops: drop,
|
||||
}})
|
||||
}
|
||||
|
||||
return inEvents, expectedEvents
|
||||
}
|
||||
@@ -1,15 +1,10 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"maps"
|
||||
"math/rand"
|
||||
v2 "math/rand/v2"
|
||||
"net/netip"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/types"
|
||||
)
|
||||
|
||||
@@ -24,13 +19,6 @@ type Memory struct {
|
||||
events map[uuid.UUID]*types.Event
|
||||
}
|
||||
|
||||
type AggregatingMemory struct {
|
||||
Memory
|
||||
WindowStart time.Time
|
||||
WindowEnd time.Time
|
||||
rnd *v2.PCG
|
||||
}
|
||||
|
||||
func (m *Memory) StoreEvent(event *types.Event) {
|
||||
m.mux.Lock()
|
||||
defer m.mux.Unlock()
|
||||
@@ -60,92 +48,3 @@ func (m *Memory) DeleteEvents(ids []uuid.UUID) {
|
||||
delete(m.events, id)
|
||||
}
|
||||
}
|
||||
|
||||
func NewAggregatingMemoryStore() *AggregatingMemory {
|
||||
return &AggregatingMemory{WindowStart: time.Now(), Memory: Memory{events: make(map[uuid.UUID]*types.Event)}, rnd: v2.NewPCG(rand.Uint64(), rand.Uint64())}
|
||||
}
|
||||
|
||||
func (am *AggregatingMemory) ResetAggregationWindow() types.FlowEventAggregator {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
now := time.Now()
|
||||
toret := AggregatingMemory{WindowStart: am.WindowStart, WindowEnd: now, Memory: Memory{events: am.events}}
|
||||
|
||||
am.events = make(map[uuid.UUID]*types.Event)
|
||||
am.WindowStart = now
|
||||
|
||||
return &toret
|
||||
}
|
||||
|
||||
type aggregationKey struct {
|
||||
srcAddr netip.Addr
|
||||
destAddr netip.Addr
|
||||
destPort uint16
|
||||
direction int
|
||||
protocol uint8
|
||||
icmpType uint8
|
||||
unique uint64 // used to prevent aggregation on non icmp/udp/tcp events
|
||||
}
|
||||
|
||||
func (am *AggregatingMemory) GetAggregatedEvents() []*types.Event {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
aggregated := make(map[aggregationKey]*types.Event)
|
||||
for _, v := range am.events {
|
||||
lookupKey := aggregationKey{srcAddr: v.SourceIP, destAddr: v.DestIP, destPort: v.DestPort, direction: int(v.Direction), protocol: uint8(v.Protocol), icmpType: v.ICMPType}
|
||||
if _, ok := aggregated[lookupKey]; !ok {
|
||||
event := v.Clone()
|
||||
|
||||
switch event.Type {
|
||||
case types.TypeStart:
|
||||
event.NumOfStarts += 1
|
||||
case types.TypeDrop:
|
||||
event.NumOfDrops += 1
|
||||
case types.TypeEnd:
|
||||
event.NumOfEnds += 1
|
||||
}
|
||||
event.Type = types.TypeUnknown
|
||||
|
||||
// Please note that ICMPCode field isn't propagated by the manager (see flow/proto/flow.pb.go, FlowFields struct)
|
||||
// so the field value in an icmp event in the "aggregated" doesn't matter
|
||||
|
||||
event.WindowStart = am.WindowStart
|
||||
event.WindowEnd = am.WindowEnd
|
||||
|
||||
if event.Protocol != types.ICMP && event.Protocol != types.ICMPv6 && event.Protocol != types.UDP && event.Protocol != types.TCP {
|
||||
lookupKey.unique = am.rnd.Uint64() // to make the lookup key unique so we don't aggregate on it
|
||||
}
|
||||
|
||||
aggregated[lookupKey] = event
|
||||
continue
|
||||
}
|
||||
|
||||
aggregatedEvent := aggregated[lookupKey]
|
||||
if aggregatedEvent.Protocol != types.ICMP && aggregatedEvent.Protocol != types.ICMPv6 && aggregatedEvent.Protocol != types.UDP && aggregatedEvent.Protocol != types.TCP {
|
||||
continue // we don't aggregate this type of events; shouldn't ever get here
|
||||
}
|
||||
|
||||
// track the number of connections, duration?, open and close events?
|
||||
aggregatedEvent.RxBytes += v.RxBytes
|
||||
aggregatedEvent.RxPackets += v.RxPackets
|
||||
aggregatedEvent.TxBytes += v.TxBytes
|
||||
aggregatedEvent.TxPackets += v.TxPackets
|
||||
switch v.Type {
|
||||
case types.TypeStart:
|
||||
aggregatedEvent.NumOfStarts += 1
|
||||
case types.TypeDrop:
|
||||
aggregatedEvent.NumOfDrops += 1
|
||||
case types.TypeEnd:
|
||||
aggregatedEvent.NumOfEnds += 1
|
||||
}
|
||||
if aggregatedEvent.Timestamp.Compare(v.Timestamp) > 0 {
|
||||
aggregatedEvent.Timestamp = v.Timestamp
|
||||
aggregatedEvent.ID = v.ID
|
||||
aggregatedEvent.SourcePort = v.SourcePort
|
||||
}
|
||||
}
|
||||
|
||||
return slices.Collect(maps.Values(aggregated)) // could return an iterator instead here
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package types
|
||||
|
||||
import (
|
||||
"net/netip"
|
||||
"slices"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
@@ -70,10 +69,8 @@ const (
|
||||
)
|
||||
|
||||
type Event struct {
|
||||
ID uuid.UUID
|
||||
Timestamp time.Time
|
||||
WindowStart time.Time
|
||||
WindowEnd time.Time
|
||||
ID uuid.UUID
|
||||
Timestamp time.Time
|
||||
EventFields
|
||||
}
|
||||
|
||||
@@ -95,17 +92,6 @@ type EventFields struct {
|
||||
TxPackets uint64
|
||||
RxBytes uint64
|
||||
TxBytes uint64
|
||||
NumOfStarts uint64
|
||||
NumOfEnds uint64
|
||||
NumOfDrops uint64
|
||||
}
|
||||
|
||||
func (e *Event) Clone() *Event {
|
||||
toret := *e
|
||||
toret.RuleID = slices.Clone(e.RuleID)
|
||||
toret.SourceResourceID = slices.Clone(e.SourceResourceID)
|
||||
toret.DestResourceID = slices.Clone(e.DestResourceID)
|
||||
return &toret
|
||||
}
|
||||
|
||||
type FlowConfig struct {
|
||||
@@ -128,15 +114,13 @@ type FlowManager interface {
|
||||
GetLogger() FlowLogger
|
||||
}
|
||||
|
||||
type FlowEventAggregator interface {
|
||||
ResetAggregationWindow() FlowEventAggregator
|
||||
GetAggregatedEvents() []*Event
|
||||
}
|
||||
|
||||
type FlowLogger interface {
|
||||
ResetAggregationWindow() FlowEventAggregator
|
||||
// StoreEvent stores a flow event
|
||||
StoreEvent(flowEvent EventFields)
|
||||
// GetEvents returns all stored events
|
||||
GetEvents() []*Event
|
||||
// DeleteEvents deletes events from the store
|
||||
DeleteEvents([]uuid.UUID)
|
||||
// Close closes the logger
|
||||
Close()
|
||||
// Enable enables the flow logger receiver
|
||||
@@ -156,11 +140,6 @@ type Store interface {
|
||||
Close()
|
||||
}
|
||||
|
||||
type AggregatingStore interface {
|
||||
FlowEventAggregator
|
||||
Store
|
||||
}
|
||||
|
||||
// ConnTracker defines the interface for connection tracking functionality
|
||||
type ConnTracker interface {
|
||||
// Start begins tracking connections by listening for conntrack events.
|
||||
|
||||
@@ -418,14 +418,7 @@ func newServiceClient(args *newServiceClientArgs) *serviceClient {
|
||||
case args.showProfiles:
|
||||
s.showProfilesUI()
|
||||
case args.showQuickActions:
|
||||
// Suppress the on-boot Quick Actions popup when the daemon
|
||||
// reports DisableAutoConnect=true — that flag carries both the
|
||||
// user's "Connect on Startup = off" preference AND any MDM-
|
||||
// enforced override (applyMDMPolicy writes the policy value
|
||||
// into the same Config field). See netbirdio/netbird#5744.
|
||||
if !s.disableAutoConnectFromDaemon() {
|
||||
s.showQuickActionsUI()
|
||||
}
|
||||
s.showQuickActionsUI()
|
||||
case args.showUpdate:
|
||||
s.showUpdateProgress(ctx, args.showUpdateVersion)
|
||||
}
|
||||
@@ -1345,40 +1338,6 @@ func (s *serviceClient) getFeatures() (*proto.GetFeaturesResponse, error) {
|
||||
return features, nil
|
||||
}
|
||||
|
||||
// disableAutoConnectFromDaemon returns true when the daemon reports
|
||||
// the active profile has DisableAutoConnect=true. Used by the
|
||||
// --quick-actions startup path to suppress the on-boot popup when the
|
||||
// user (or an MDM admin) opted out of auto-connecting; both cases
|
||||
// converge on the same Config field because applyMDMPolicy writes the
|
||||
// policy value into it. Returns false on any RPC / lookup failure so a
|
||||
// daemon hiccup does not silently swallow the popup.
|
||||
func (s *serviceClient) disableAutoConnectFromDaemon() bool {
|
||||
activeProf, err := s.profileManager.GetActiveProfile()
|
||||
if err != nil {
|
||||
log.Warnf("disableAutoConnectFromDaemon: get active profile: %v", err)
|
||||
return false
|
||||
}
|
||||
currUser, err := user.Current()
|
||||
if err != nil {
|
||||
log.Warnf("disableAutoConnectFromDaemon: get current user: %v", err)
|
||||
return false
|
||||
}
|
||||
conn, err := s.getSrvClient(failFastTimeout)
|
||||
if err != nil {
|
||||
log.Warnf("disableAutoConnectFromDaemon: get daemon client: %v", err)
|
||||
return false
|
||||
}
|
||||
srvCfg, err := conn.GetConfig(s.ctx, &proto.GetConfigRequest{
|
||||
ProfileName: activeProf.ID.String(),
|
||||
Username: currUser.Username,
|
||||
})
|
||||
if err != nil {
|
||||
log.Warnf("disableAutoConnectFromDaemon: GetConfig RPC: %v", err)
|
||||
return false
|
||||
}
|
||||
return srvCfg.GetDisableAutoConnect()
|
||||
}
|
||||
|
||||
// getSrvConfig from the service to show it in the settings window.
|
||||
func (s *serviceClient) getSrvConfig() {
|
||||
s.managementURL = profilemanager.DefaultManagementURL
|
||||
|
||||
@@ -134,11 +134,9 @@ type FlowEvent struct {
|
||||
// When the event occurred
|
||||
Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
|
||||
// Public key of the sending peer
|
||||
PublicKey []byte `protobuf:"bytes,3,opt,name=public_key,json=publicKey,proto3" json:"public_key,omitempty"`
|
||||
FlowFields *FlowFields `protobuf:"bytes,4,opt,name=flow_fields,json=flowFields,proto3" json:"flow_fields,omitempty"`
|
||||
IsInitiator bool `protobuf:"varint,5,opt,name=isInitiator,proto3" json:"isInitiator,omitempty"`
|
||||
WindowStart *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=window_start,json=windowStart,proto3" json:"window_start,omitempty"`
|
||||
WindowEnd *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=window_end,json=windowEnd,proto3" json:"window_end,omitempty"`
|
||||
PublicKey []byte `protobuf:"bytes,3,opt,name=public_key,json=publicKey,proto3" json:"public_key,omitempty"`
|
||||
FlowFields *FlowFields `protobuf:"bytes,4,opt,name=flow_fields,json=flowFields,proto3" json:"flow_fields,omitempty"`
|
||||
IsInitiator bool `protobuf:"varint,5,opt,name=isInitiator,proto3" json:"isInitiator,omitempty"`
|
||||
}
|
||||
|
||||
func (x *FlowEvent) Reset() {
|
||||
@@ -208,20 +206,6 @@ func (x *FlowEvent) GetIsInitiator() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (x *FlowEvent) GetWindowStart() *timestamppb.Timestamp {
|
||||
if x != nil {
|
||||
return x.WindowStart
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *FlowEvent) GetWindowEnd() *timestamppb.Timestamp {
|
||||
if x != nil {
|
||||
return x.WindowEnd
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type FlowEventAck struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
@@ -300,6 +284,7 @@ type FlowFields struct {
|
||||
// Layer 4 -specific information
|
||||
//
|
||||
// Types that are assignable to ConnectionInfo:
|
||||
//
|
||||
// *FlowFields_PortInfo
|
||||
// *FlowFields_IcmpInfo
|
||||
ConnectionInfo isFlowFields_ConnectionInfo `protobuf_oneof:"connection_info"`
|
||||
@@ -312,9 +297,6 @@ type FlowFields struct {
|
||||
// Resource ID
|
||||
SourceResourceId []byte `protobuf:"bytes,14,opt,name=source_resource_id,json=sourceResourceId,proto3" json:"source_resource_id,omitempty"`
|
||||
DestResourceId []byte `protobuf:"bytes,15,opt,name=dest_resource_id,json=destResourceId,proto3" json:"dest_resource_id,omitempty"`
|
||||
NumOfStarts uint64 `protobuf:"varint,16,opt,name=num_of_starts,json=numOfStarts,proto3" json:"num_of_starts,omitempty"`
|
||||
NumOfEnds uint64 `protobuf:"varint,17,opt,name=num_of_ends,json=numOfEnds,proto3" json:"num_of_ends,omitempty"`
|
||||
NumOfDrops uint64 `protobuf:"varint,18,opt,name=num_of_drops,json=numOfDrops,proto3" json:"num_of_drops,omitempty"`
|
||||
}
|
||||
|
||||
func (x *FlowFields) Reset() {
|
||||
@@ -461,27 +443,6 @@ func (x *FlowFields) GetDestResourceId() []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *FlowFields) GetNumOfStarts() uint64 {
|
||||
if x != nil {
|
||||
return x.NumOfStarts
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *FlowFields) GetNumOfEnds() uint64 {
|
||||
if x != nil {
|
||||
return x.NumOfEnds
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *FlowFields) GetNumOfDrops() uint64 {
|
||||
if x != nil {
|
||||
return x.NumOfDrops
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
type isFlowFields_ConnectionInfo interface {
|
||||
isFlowFields_ConnectionInfo()
|
||||
}
|
||||
@@ -618,7 +579,7 @@ var file_flow_proto_rawDesc = []byte{
|
||||
0x0a, 0x0a, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x66, 0x6c,
|
||||
0x6f, 0x77, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72,
|
||||
0x6f, 0x74, 0x6f, 0x22, 0xce, 0x02, 0x0a, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e,
|
||||
0x6f, 0x74, 0x6f, 0x22, 0xd4, 0x01, 0x0a, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e,
|
||||
0x74, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20,
|
||||
0x01, 0x28, 0x0c, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x38, 0x0a, 0x09,
|
||||
0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32,
|
||||
@@ -631,59 +592,45 @@ var file_flow_proto_rawDesc = []byte{
|
||||
0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x52, 0x0a, 0x66, 0x6c,
|
||||
0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x12, 0x20, 0x0a, 0x0b, 0x69, 0x73, 0x49, 0x6e,
|
||||
0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69,
|
||||
0x73, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x3d, 0x0a, 0x0c, 0x77, 0x69,
|
||||
0x6e, 0x64, 0x6f, 0x77, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b,
|
||||
0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
|
||||
0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x77, 0x69,
|
||||
0x6e, 0x64, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x72, 0x74, 0x12, 0x39, 0x0a, 0x0a, 0x77, 0x69, 0x6e,
|
||||
0x64, 0x6f, 0x77, 0x5f, 0x65, 0x6e, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e,
|
||||
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
|
||||
0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x77, 0x69, 0x6e, 0x64, 0x6f,
|
||||
0x77, 0x45, 0x6e, 0x64, 0x22, 0x4b, 0x0a, 0x0c, 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e,
|
||||
0x74, 0x41, 0x63, 0x6b, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64,
|
||||
0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12,
|
||||
0x20, 0x0a, 0x0b, 0x69, 0x73, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x18, 0x02,
|
||||
0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69, 0x73, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x6f,
|
||||
0x72, 0x22, 0x82, 0x05, 0x0a, 0x0a, 0x46, 0x6c, 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73,
|
||||
0x12, 0x17, 0x0a, 0x07, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
|
||||
0x0c, 0x52, 0x06, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x04, 0x74, 0x79, 0x70,
|
||||
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0a, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54,
|
||||
0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x72, 0x75, 0x6c,
|
||||
0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x72, 0x75, 0x6c, 0x65,
|
||||
0x49, 0x64, 0x12, 0x2d, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18,
|
||||
0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x44, 0x69, 0x72,
|
||||
0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f,
|
||||
0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x05, 0x20,
|
||||
0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x1b, 0x0a,
|
||||
0x09, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x70, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c,
|
||||
0x52, 0x08, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x70, 0x12, 0x17, 0x0a, 0x07, 0x64, 0x65,
|
||||
0x73, 0x74, 0x5f, 0x69, 0x70, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x64, 0x65, 0x73,
|
||||
0x74, 0x49, 0x70, 0x12, 0x2d, 0x0a, 0x09, 0x70, 0x6f, 0x72, 0x74, 0x5f, 0x69, 0x6e, 0x66, 0x6f,
|
||||
0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x50, 0x6f,
|
||||
0x72, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x48, 0x00, 0x52, 0x08, 0x70, 0x6f, 0x72, 0x74, 0x49, 0x6e,
|
||||
0x66, 0x6f, 0x12, 0x2d, 0x0a, 0x09, 0x69, 0x63, 0x6d, 0x70, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18,
|
||||
0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x43, 0x4d,
|
||||
0x50, 0x49, 0x6e, 0x66, 0x6f, 0x48, 0x00, 0x52, 0x08, 0x69, 0x63, 0x6d, 0x70, 0x49, 0x6e, 0x66,
|
||||
0x6f, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x78, 0x5f, 0x70, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x18,
|
||||
0x0a, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x72, 0x78, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x73,
|
||||
0x12, 0x1d, 0x0a, 0x0a, 0x74, 0x78, 0x5f, 0x70, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x18, 0x0b,
|
||||
0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x74, 0x78, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x12,
|
||||
0x19, 0x0a, 0x08, 0x72, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x0c, 0x20, 0x01, 0x28,
|
||||
0x04, 0x52, 0x07, 0x72, 0x78, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x19, 0x0a, 0x08, 0x74, 0x78,
|
||||
0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x74, 0x78,
|
||||
0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x2c, 0x0a, 0x12, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f,
|
||||
0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x0e, 0x20, 0x01, 0x28,
|
||||
0x0c, 0x52, 0x10, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63,
|
||||
0x65, 0x49, 0x64, 0x12, 0x28, 0x0a, 0x10, 0x64, 0x65, 0x73, 0x74, 0x5f, 0x72, 0x65, 0x73, 0x6f,
|
||||
0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x64,
|
||||
0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x64, 0x12, 0x22, 0x0a,
|
||||
0x0d, 0x6e, 0x75, 0x6d, 0x5f, 0x6f, 0x66, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x73, 0x18, 0x10,
|
||||
0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x6e, 0x75, 0x6d, 0x4f, 0x66, 0x53, 0x74, 0x61, 0x72, 0x74,
|
||||
0x73, 0x12, 0x1e, 0x0a, 0x0b, 0x6e, 0x75, 0x6d, 0x5f, 0x6f, 0x66, 0x5f, 0x65, 0x6e, 0x64, 0x73,
|
||||
0x18, 0x11, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x6e, 0x75, 0x6d, 0x4f, 0x66, 0x45, 0x6e, 0x64,
|
||||
0x73, 0x12, 0x20, 0x0a, 0x0c, 0x6e, 0x75, 0x6d, 0x5f, 0x6f, 0x66, 0x5f, 0x64, 0x72, 0x6f, 0x70,
|
||||
0x73, 0x18, 0x12, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x6e, 0x75, 0x6d, 0x4f, 0x66, 0x44, 0x72,
|
||||
0x6f, 0x70, 0x73, 0x42, 0x11, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f,
|
||||
0x73, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x22, 0x4b, 0x0a, 0x0c, 0x46, 0x6c,
|
||||
0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x41, 0x63, 0x6b, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76,
|
||||
0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x65, 0x76,
|
||||
0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x20, 0x0a, 0x0b, 0x69, 0x73, 0x49, 0x6e, 0x69, 0x74, 0x69,
|
||||
0x61, 0x74, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69, 0x73, 0x49, 0x6e,
|
||||
0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x22, 0x9c, 0x04, 0x0a, 0x0a, 0x46, 0x6c, 0x6f, 0x77,
|
||||
0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x12, 0x17, 0x0a, 0x07, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69,
|
||||
0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12,
|
||||
0x1e, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0a, 0x2e,
|
||||
0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12,
|
||||
0x17, 0x0a, 0x07, 0x72, 0x75, 0x6c, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c,
|
||||
0x52, 0x06, 0x72, 0x75, 0x6c, 0x65, 0x49, 0x64, 0x12, 0x2d, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65,
|
||||
0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x66, 0x6c,
|
||||
0x6f, 0x77, 0x2e, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x64, 0x69,
|
||||
0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x63, 0x6f, 0x6c, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x63, 0x6f, 0x6c, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x70,
|
||||
0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x70,
|
||||
0x12, 0x17, 0x0a, 0x07, 0x64, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x70, 0x18, 0x07, 0x20, 0x01, 0x28,
|
||||
0x0c, 0x52, 0x06, 0x64, 0x65, 0x73, 0x74, 0x49, 0x70, 0x12, 0x2d, 0x0a, 0x09, 0x70, 0x6f, 0x72,
|
||||
0x74, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x66,
|
||||
0x6c, 0x6f, 0x77, 0x2e, 0x50, 0x6f, 0x72, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x48, 0x00, 0x52, 0x08,
|
||||
0x70, 0x6f, 0x72, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x2d, 0x0a, 0x09, 0x69, 0x63, 0x6d, 0x70,
|
||||
0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x66, 0x6c,
|
||||
0x6f, 0x77, 0x2e, 0x49, 0x43, 0x4d, 0x50, 0x49, 0x6e, 0x66, 0x6f, 0x48, 0x00, 0x52, 0x08, 0x69,
|
||||
0x63, 0x6d, 0x70, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x78, 0x5f, 0x70, 0x61,
|
||||
0x63, 0x6b, 0x65, 0x74, 0x73, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x72, 0x78, 0x50,
|
||||
0x61, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x12, 0x1d, 0x0a, 0x0a, 0x74, 0x78, 0x5f, 0x70, 0x61, 0x63,
|
||||
0x6b, 0x65, 0x74, 0x73, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x74, 0x78, 0x50, 0x61,
|
||||
0x63, 0x6b, 0x65, 0x74, 0x73, 0x12, 0x19, 0x0a, 0x08, 0x72, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65,
|
||||
0x73, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x72, 0x78, 0x42, 0x79, 0x74, 0x65, 0x73,
|
||||
0x12, 0x19, 0x0a, 0x08, 0x74, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x0d, 0x20, 0x01,
|
||||
0x28, 0x04, 0x52, 0x07, 0x74, 0x78, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x2c, 0x0a, 0x12, 0x73,
|
||||
0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69,
|
||||
0x64, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x10, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52,
|
||||
0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x64, 0x12, 0x28, 0x0a, 0x10, 0x64, 0x65, 0x73,
|
||||
0x74, 0x5f, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x0f, 0x20,
|
||||
0x01, 0x28, 0x0c, 0x52, 0x0e, 0x64, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63,
|
||||
0x65, 0x49, 0x64, 0x42, 0x11, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f,
|
||||
0x6e, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x22, 0x48, 0x0a, 0x08, 0x50, 0x6f, 0x72, 0x74, 0x49, 0x6e,
|
||||
0x66, 0x6f, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x6f, 0x72,
|
||||
0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50,
|
||||
@@ -736,19 +683,17 @@ var file_flow_proto_goTypes = []interface{}{
|
||||
var file_flow_proto_depIdxs = []int32{
|
||||
7, // 0: flow.FlowEvent.timestamp:type_name -> google.protobuf.Timestamp
|
||||
4, // 1: flow.FlowEvent.flow_fields:type_name -> flow.FlowFields
|
||||
7, // 2: flow.FlowEvent.window_start:type_name -> google.protobuf.Timestamp
|
||||
7, // 3: flow.FlowEvent.window_end:type_name -> google.protobuf.Timestamp
|
||||
0, // 4: flow.FlowFields.type:type_name -> flow.Type
|
||||
1, // 5: flow.FlowFields.direction:type_name -> flow.Direction
|
||||
5, // 6: flow.FlowFields.port_info:type_name -> flow.PortInfo
|
||||
6, // 7: flow.FlowFields.icmp_info:type_name -> flow.ICMPInfo
|
||||
2, // 8: flow.FlowService.Events:input_type -> flow.FlowEvent
|
||||
3, // 9: flow.FlowService.Events:output_type -> flow.FlowEventAck
|
||||
9, // [9:10] is the sub-list for method output_type
|
||||
8, // [8:9] is the sub-list for method input_type
|
||||
8, // [8:8] is the sub-list for extension type_name
|
||||
8, // [8:8] is the sub-list for extension extendee
|
||||
0, // [0:8] is the sub-list for field type_name
|
||||
0, // 2: flow.FlowFields.type:type_name -> flow.Type
|
||||
1, // 3: flow.FlowFields.direction:type_name -> flow.Direction
|
||||
5, // 4: flow.FlowFields.port_info:type_name -> flow.PortInfo
|
||||
6, // 5: flow.FlowFields.icmp_info:type_name -> flow.ICMPInfo
|
||||
2, // 6: flow.FlowService.Events:input_type -> flow.FlowEvent
|
||||
3, // 7: flow.FlowService.Events:output_type -> flow.FlowEventAck
|
||||
7, // [7:8] is the sub-list for method output_type
|
||||
6, // [6:7] is the sub-list for method input_type
|
||||
6, // [6:6] is the sub-list for extension type_name
|
||||
6, // [6:6] is the sub-list for extension extendee
|
||||
0, // [0:6] is the sub-list for field type_name
|
||||
}
|
||||
|
||||
func init() { file_flow_proto_init() }
|
||||
|
||||
@@ -24,9 +24,6 @@ message FlowEvent {
|
||||
FlowFields flow_fields = 4;
|
||||
|
||||
bool isInitiator = 5;
|
||||
|
||||
google.protobuf.Timestamp window_start = 6;
|
||||
google.protobuf.Timestamp window_end = 7;
|
||||
}
|
||||
|
||||
message FlowEventAck {
|
||||
@@ -78,9 +75,6 @@ message FlowFields {
|
||||
bytes source_resource_id = 14;
|
||||
bytes dest_resource_id = 15;
|
||||
|
||||
uint64 num_of_starts = 16;
|
||||
uint64 num_of_ends = 17;
|
||||
uint64 num_of_drops = 18;
|
||||
}
|
||||
|
||||
// Flow event types
|
||||
|
||||
@@ -1,8 +1,4 @@
|
||||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||
// versions:
|
||||
// - protoc-gen-go-grpc v1.6.1
|
||||
// - protoc v3.21.9
|
||||
// source: flow.proto
|
||||
|
||||
package proto
|
||||
|
||||
@@ -15,19 +11,15 @@ import (
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the grpc package it is being compiled against.
|
||||
// Requires gRPC-Go v1.64.0 or later.
|
||||
const _ = grpc.SupportPackageIsVersion9
|
||||
|
||||
const (
|
||||
FlowService_Events_FullMethodName = "/flow.FlowService/Events"
|
||||
)
|
||||
// Requires gRPC-Go v1.32.0 or later.
|
||||
const _ = grpc.SupportPackageIsVersion7
|
||||
|
||||
// FlowServiceClient is the client API for FlowService service.
|
||||
//
|
||||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||
type FlowServiceClient interface {
|
||||
// Client to receiver streams of events and acknowledgements
|
||||
Events(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[FlowEvent, FlowEventAck], error)
|
||||
Events(ctx context.Context, opts ...grpc.CallOption) (FlowService_EventsClient, error)
|
||||
}
|
||||
|
||||
type flowServiceClient struct {
|
||||
@@ -38,40 +30,54 @@ func NewFlowServiceClient(cc grpc.ClientConnInterface) FlowServiceClient {
|
||||
return &flowServiceClient{cc}
|
||||
}
|
||||
|
||||
func (c *flowServiceClient) Events(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[FlowEvent, FlowEventAck], error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
stream, err := c.cc.NewStream(ctx, &FlowService_ServiceDesc.Streams[0], FlowService_Events_FullMethodName, cOpts...)
|
||||
func (c *flowServiceClient) Events(ctx context.Context, opts ...grpc.CallOption) (FlowService_EventsClient, error) {
|
||||
stream, err := c.cc.NewStream(ctx, &FlowService_ServiceDesc.Streams[0], "/flow.FlowService/Events", opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &grpc.GenericClientStream[FlowEvent, FlowEventAck]{ClientStream: stream}
|
||||
x := &flowServiceEventsClient{stream}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||
type FlowService_EventsClient = grpc.BidiStreamingClient[FlowEvent, FlowEventAck]
|
||||
type FlowService_EventsClient interface {
|
||||
Send(*FlowEvent) error
|
||||
Recv() (*FlowEventAck, error)
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
type flowServiceEventsClient struct {
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
func (x *flowServiceEventsClient) Send(m *FlowEvent) error {
|
||||
return x.ClientStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *flowServiceEventsClient) Recv() (*FlowEventAck, error) {
|
||||
m := new(FlowEventAck)
|
||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// FlowServiceServer is the server API for FlowService service.
|
||||
// All implementations must embed UnimplementedFlowServiceServer
|
||||
// for forward compatibility.
|
||||
// for forward compatibility
|
||||
type FlowServiceServer interface {
|
||||
// Client to receiver streams of events and acknowledgements
|
||||
Events(grpc.BidiStreamingServer[FlowEvent, FlowEventAck]) error
|
||||
Events(FlowService_EventsServer) error
|
||||
mustEmbedUnimplementedFlowServiceServer()
|
||||
}
|
||||
|
||||
// UnimplementedFlowServiceServer must be embedded to have
|
||||
// forward compatible implementations.
|
||||
//
|
||||
// NOTE: this should be embedded by value instead of pointer to avoid a nil
|
||||
// pointer dereference when methods are called.
|
||||
type UnimplementedFlowServiceServer struct{}
|
||||
// UnimplementedFlowServiceServer must be embedded to have forward compatible implementations.
|
||||
type UnimplementedFlowServiceServer struct {
|
||||
}
|
||||
|
||||
func (UnimplementedFlowServiceServer) Events(grpc.BidiStreamingServer[FlowEvent, FlowEventAck]) error {
|
||||
return status.Error(codes.Unimplemented, "method Events not implemented")
|
||||
func (UnimplementedFlowServiceServer) Events(FlowService_EventsServer) error {
|
||||
return status.Errorf(codes.Unimplemented, "method Events not implemented")
|
||||
}
|
||||
func (UnimplementedFlowServiceServer) mustEmbedUnimplementedFlowServiceServer() {}
|
||||
func (UnimplementedFlowServiceServer) testEmbeddedByValue() {}
|
||||
|
||||
// UnsafeFlowServiceServer may be embedded to opt out of forward compatibility for this service.
|
||||
// Use of this interface is not recommended, as added methods to FlowServiceServer will
|
||||
@@ -81,22 +87,34 @@ type UnsafeFlowServiceServer interface {
|
||||
}
|
||||
|
||||
func RegisterFlowServiceServer(s grpc.ServiceRegistrar, srv FlowServiceServer) {
|
||||
// If the following call panics, it indicates UnimplementedFlowServiceServer was
|
||||
// embedded by pointer and is nil. This will cause panics if an
|
||||
// unimplemented method is ever invoked, so we test this at initialization
|
||||
// time to prevent it from happening at runtime later due to I/O.
|
||||
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
|
||||
t.testEmbeddedByValue()
|
||||
}
|
||||
s.RegisterService(&FlowService_ServiceDesc, srv)
|
||||
}
|
||||
|
||||
func _FlowService_Events_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
return srv.(FlowServiceServer).Events(&grpc.GenericServerStream[FlowEvent, FlowEventAck]{ServerStream: stream})
|
||||
return srv.(FlowServiceServer).Events(&flowServiceEventsServer{stream})
|
||||
}
|
||||
|
||||
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||
type FlowService_EventsServer = grpc.BidiStreamingServer[FlowEvent, FlowEventAck]
|
||||
type FlowService_EventsServer interface {
|
||||
Send(*FlowEventAck) error
|
||||
Recv() (*FlowEvent, error)
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
type flowServiceEventsServer struct {
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
func (x *flowServiceEventsServer) Send(m *FlowEventAck) error {
|
||||
return x.ServerStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *flowServiceEventsServer) Recv() (*FlowEvent, error) {
|
||||
m := new(FlowEvent)
|
||||
if err := x.ServerStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// FlowService_ServiceDesc is the grpc.ServiceDesc for FlowService service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
|
||||
@@ -497,7 +497,7 @@ func (c *Controller) BufferUpdateAffectedPeers(ctx context.Context, accountID st
|
||||
c.accountManagerMetrics.CountUpdateAccountPeersTriggered(string(reason.Resource), string(reason.Operation))
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Tracef("buffer updating %d affected peers for account %s from %s with reason %s/%s", len(peerIDs), accountID, util.GetCallerName(), reason.Operation, reason.Resource)
|
||||
log.WithContext(ctx).Tracef("buffer updating %d affected peers for account %s from %s", len(peerIDs), accountID, util.GetCallerName())
|
||||
|
||||
bufUpd, _ := c.affectedPeerUpdateLocks.LoadOrStore(accountID, &bufferAffectedUpdate{
|
||||
peerIDs: make(map[string]struct{}),
|
||||
|
||||
@@ -107,9 +107,7 @@ func TestAffectedPeers_DependencyCoverageMatrix(t *testing.T) {
|
||||
affected := resolveAffected(t, s.manager.Store, s.accountID, change)
|
||||
|
||||
assert.ElementsMatch(t, affected, mustContain, "expected peer to be affected")
|
||||
for _, peerID := range mustExclude {
|
||||
assert.NotContains(t, affected, peerID, "peer must not be affected")
|
||||
}
|
||||
assert.NotContains(t, affected, mustExclude, "peer must not be affected")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -519,7 +519,7 @@ func (r *resolver) appendPoliciesForPostureChecks(policies []*types.Policy, post
|
||||
}
|
||||
ids := toSet(postureCheckIDs)
|
||||
for _, policy := range r.policies() {
|
||||
if !policyReferencesPostureChecks(policy, ids) || !policy.Enabled {
|
||||
if !policyReferencesPostureChecks(policy, ids) {
|
||||
continue
|
||||
}
|
||||
log.WithContext(r.ctx).Tracef("appendPoliciesForPostureChecks: policy %s (%s) references changed posture checks %v -> both-sides policy",
|
||||
@@ -712,70 +712,33 @@ func (r *resolver) foldPolicySourcesForResources(resourceIDs map[string]struct{}
|
||||
}
|
||||
}
|
||||
|
||||
// collectFromRoutes folds, per matched route, the OPPOSITE side(s) fully and the
|
||||
// matched side's own groups only on a whole-group change (outputGroups). A route has
|
||||
// three peer sides — routing (Peer/PeerGroups), consumer (Groups) and ACL
|
||||
// (AccessControlGroups) — that each refresh the others; the changed side's own group
|
||||
// folds its siblings only when the group itself changed, never on a one-peer move.
|
||||
func (r *resolver) collectFromRoutes() {
|
||||
for _, rt := range r.snap.routes {
|
||||
if !rt.Enabled {
|
||||
continue // disabled routes route to nobody; skip existing account data
|
||||
}
|
||||
routing := anyInSet(rt.PeerGroups, r.linkGroups) || (rt.Peer != "" && isInSet(rt.Peer, r.changedPeers))
|
||||
consumer := anyInSet(rt.Groups, r.linkGroups)
|
||||
acl := anyInSet(rt.AccessControlGroups, r.linkGroups)
|
||||
if !routing && !consumer && !acl {
|
||||
matchedByGroup := anyInSet(rt.Groups, r.linkGroups) || anyInSet(rt.PeerGroups, r.linkGroups) || anyInSet(rt.AccessControlGroups, r.linkGroups)
|
||||
matchedByPeer := rt.Peer != "" && len(r.changedPeers) > 0 && isInSet(rt.Peer, r.changedPeers)
|
||||
if !matchedByGroup && !matchedByPeer {
|
||||
continue
|
||||
}
|
||||
log.WithContext(r.ctx).Tracef("collectFromRoutes: route %s matched (routing=%t consumer=%t acl=%t) -> folding opposite sides; own side gated on outputGroups",
|
||||
rt.ID, routing, consumer, acl)
|
||||
r.foldRouteSide(rt.PeerGroups, routing)
|
||||
r.foldRouteSide(rt.Groups, consumer)
|
||||
r.foldRouteSide(rt.AccessControlGroups, acl)
|
||||
// The single routing Peer folds when the routing side is the OPPOSITE of the
|
||||
// match (consumer/acl need it), or when that very peer is the change.
|
||||
if rt.Peer != "" && (consumer || acl || isInSet(rt.Peer, r.changedPeers)) {
|
||||
log.WithContext(r.ctx).Tracef("collectFromRoutes: route %s matched (byGroup=%t byPeer=%t) -> folding groups=%v peerGroups=%v accessControlGroups=%v peer=%q",
|
||||
rt.ID, matchedByGroup, matchedByPeer, rt.Groups, rt.PeerGroups, rt.AccessControlGroups, rt.Peer)
|
||||
addAll(r.affectedGroups, rt.Groups, rt.PeerGroups, rt.AccessControlGroups)
|
||||
if rt.Peer != "" {
|
||||
r.affectedPeers[rt.Peer] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// foldRouteSide folds a route side: when this side is the one that matched, fold its
|
||||
// groups only on a whole-group change (outputGroups) so siblings of a single moved
|
||||
// peer stay put; otherwise it is an opposite side and folds fully.
|
||||
func (r *resolver) foldRouteSide(groups []string, matchedHere bool) {
|
||||
if matchedHere {
|
||||
r.foldOutputGroups(groups)
|
||||
return
|
||||
}
|
||||
addAll(r.affectedGroups, groups)
|
||||
}
|
||||
|
||||
// foldOutputGroups folds only the groups that the caller reported as wholly changed
|
||||
// (outputGroups). Used for a matched object's OWN side, where a peer-seeded or
|
||||
// link-only group must not pull in its siblings.
|
||||
func (r *resolver) foldOutputGroups(groups ...[]string) {
|
||||
for _, gs := range groups {
|
||||
for _, gID := range gs {
|
||||
if _, ok := r.outputGroups[gID]; ok {
|
||||
r.affectedGroups[gID] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *resolver) collectFromNameServers() {
|
||||
if len(r.linkGroups) == 0 {
|
||||
return
|
||||
}
|
||||
for _, ns := range r.snap.nsGroups {
|
||||
if anyInSet(ns.Groups, r.linkGroups) {
|
||||
// A nameserver group has no opposite side: a peer's DNS config depends only
|
||||
// on its own membership, so a one-peer move refreshes that peer alone (folded
|
||||
// elsewhere). Fold the referenced groups only on a whole-group change.
|
||||
log.WithContext(r.ctx).Tracef("collectFromNameServers: nameserver group %s references a linked group -> folding its groups %v (outputGroups only)", ns.ID, ns.Groups)
|
||||
r.foldOutputGroups(ns.Groups)
|
||||
log.WithContext(r.ctx).Tracef("collectFromNameServers: nameserver group %s references a changed group -> folding its groups %v", ns.ID, ns.Groups)
|
||||
addAll(r.affectedGroups, ns.Groups)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -803,12 +766,9 @@ func (r *resolver) collectFromNetworkRouters() {
|
||||
if !matchedByGroup && !matchedByPeer {
|
||||
continue
|
||||
}
|
||||
log.WithContext(r.ctx).Tracef("collectFromNetworkRouters: router %s on network %s matched (byGroup=%t byPeer=%t) -> folding its peerGroups=%v peer=%q (own groups on outputGroups) + sources reaching network resources",
|
||||
log.WithContext(r.ctx).Tracef("collectFromNetworkRouters: router %s on network %s matched (byGroup=%t byPeer=%t) -> folding its peerGroups=%v peer=%q + sources reaching network resources",
|
||||
router.ID, router.NetworkID, matchedByGroup, matchedByPeer, router.PeerGroups, router.Peer)
|
||||
// The backing PeerGroups are the matched (own) side: fold them only on a
|
||||
// whole-group change so a one-peer move does not wake sibling backing peers. The
|
||||
// opposite side (policy sources reaching the network) is folded below.
|
||||
r.foldOutputGroups(router.PeerGroups)
|
||||
addAll(r.affectedGroups, router.PeerGroups)
|
||||
if router.Peer != "" {
|
||||
r.affectedPeers[router.Peer] = struct{}{}
|
||||
}
|
||||
@@ -839,7 +799,7 @@ func (r *resolver) collectFromProxyServices() {
|
||||
if !matchedByPeer && !matchedByAccessGroup {
|
||||
continue
|
||||
}
|
||||
log.WithContext(r.ctx).Tracef("collectFromProxyServices: service %s (cluster=%s) matched (byProxyOrTargetPeer=%t byAccessGroup=%t) -> folding %d proxy peers, peer targets; access groups %v on outputGroups only",
|
||||
log.WithContext(r.ctx).Tracef("collectFromProxyServices: service %s (cluster=%s) matched (byProxyOrTargetPeer=%t byAccessGroup=%t) -> folding %d proxy peers, peer targets and access groups %v",
|
||||
svc.ID, svc.ProxyCluster, matchedByPeer, matchedByAccessGroup, len(proxyPeers), svc.AccessGroups)
|
||||
for _, pid := range proxyPeers {
|
||||
r.affectedPeers[pid] = struct{}{}
|
||||
@@ -852,10 +812,7 @@ func (r *resolver) collectFromProxyServices() {
|
||||
r.affectedPeers[target.TargetId] = struct{}{}
|
||||
}
|
||||
}
|
||||
// AccessGroups are the matched (own) side with no opposite to fold: a member's
|
||||
// proxy access is self-contained, so a one-peer move refreshes that peer alone.
|
||||
// Fold the groups only on a whole-group change.
|
||||
r.foldOutputGroups(svc.AccessGroups)
|
||||
addAll(r.affectedGroups, svc.AccessGroups)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -209,14 +209,14 @@ func (am *DefaultAccountManager) resolvePeerLocation(ctx context.Context, peer *
|
||||
if am.geo == nil || realIP == nil {
|
||||
return nil
|
||||
}
|
||||
if peer.Location.ConnectionIP != nil && peer.Location.ConnectionIP.Equal(realIP) {
|
||||
return nil
|
||||
}
|
||||
location, err := am.geo.Lookup(realIP)
|
||||
if err != nil {
|
||||
log.WithContext(ctx).Warnf("failed to get location for peer %s realip: [%s]: %v", peer.ID, realIP.String(), err)
|
||||
return nil
|
||||
}
|
||||
if peer.Location.ConnectionIP != nil && peer.Location.ConnectionIP.Equal(realIP) && peer.Location.GeoNameID == location.City.GeonameID {
|
||||
return nil
|
||||
}
|
||||
return &nbpeer.Location{
|
||||
ConnectionIP: realIP,
|
||||
CountryCode: location.Country.ISOCode,
|
||||
@@ -1052,7 +1052,7 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
|
||||
}
|
||||
|
||||
metaDiffAffectsPosture := posture.AffectsPosture(ctx, &metaDiff, resPostureChecks)
|
||||
if requiresPeerUpdate(ctx, isStatusChanged, sync.UpdateAccountPeers, ipv6CapabilityChanged, metaDiffAffectsPosture, metaDiff.VersionChanged(), metaDiff.HostnameChanged()) {
|
||||
if isStatusChanged || sync.UpdateAccountPeers || ipv6CapabilityChanged || metaDiffAffectsPosture || metaDiff.VersionChanged() || metaDiff.HostnameChanged() {
|
||||
changedPeerIDs := []string{peer.ID}
|
||||
affectedPeerIDs := am.syncPeerAffectedPeers(ctx, accountID, peer.ID, nmap, peerNotValid, metaDiffAffectsPosture)
|
||||
if err = am.networkMapController.OnPeersUpdated(ctx, accountID, changedPeerIDs, affectedPeerIDs); err != nil {
|
||||
@@ -1063,29 +1063,6 @@ func (am *DefaultAccountManager) SyncPeer(ctx context.Context, sync types.PeerSy
|
||||
return peer, nmap, resPostureChecks, dnsFwdPort, nil
|
||||
}
|
||||
|
||||
func requiresPeerUpdate(ctx context.Context, isStatusChanged, updateAccountPeers, ipv6CapabilityChanged, metaDiffAffectsPosture, versionChanged, hostname bool) bool {
|
||||
var reason string
|
||||
switch {
|
||||
case isStatusChanged:
|
||||
reason = "status changed"
|
||||
case updateAccountPeers:
|
||||
reason = "update account peers"
|
||||
case ipv6CapabilityChanged:
|
||||
reason = "ipv6 capability changed"
|
||||
case metaDiffAffectsPosture:
|
||||
reason = "meta diff affects posture"
|
||||
case versionChanged:
|
||||
reason = "version changed"
|
||||
case hostname:
|
||||
reason = "hostname changed"
|
||||
default:
|
||||
return false
|
||||
}
|
||||
|
||||
log.WithContext(ctx).Tracef("peer update required: %s", reason)
|
||||
return true
|
||||
}
|
||||
|
||||
// syncPeerAffectedPeers resolves the peers affected by a SyncPeer change. The
|
||||
// peer's own validated network map is bidirectional for policy and routing
|
||||
// reachability, so when the peer stays valid and no source-posture gate is in
|
||||
|
||||
@@ -49,7 +49,6 @@ import (
|
||||
|
||||
nbdns "github.com/netbirdio/netbird/dns"
|
||||
"github.com/netbirdio/netbird/management/server/activity"
|
||||
"github.com/netbirdio/netbird/management/server/geolocation"
|
||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||
"github.com/netbirdio/netbird/management/server/posture"
|
||||
"github.com/netbirdio/netbird/management/server/store"
|
||||
@@ -2894,141 +2893,3 @@ func TestUpdatePeer_DnsLabelUniqueName(t *testing.T) {
|
||||
require.NoError(t, err, "renaming to unique FQDN should succeed")
|
||||
assert.Equal(t, "api-server", updated.DNSLabel, "DNS label should be first label of FQDN")
|
||||
}
|
||||
|
||||
// fakeGeo is a configurable geolocation.Geolocation implementation for tests. It
|
||||
// returns a record built from the configured city geoname id, or an error when set.
|
||||
type fakeGeo struct {
|
||||
geoNameID uint
|
||||
isoCode string
|
||||
cityName string
|
||||
err error
|
||||
}
|
||||
|
||||
func (g *fakeGeo) Lookup(net.IP) (*geolocation.Record, error) {
|
||||
if g.err != nil {
|
||||
return nil, g.err
|
||||
}
|
||||
record := &geolocation.Record{}
|
||||
record.City.GeonameID = g.geoNameID
|
||||
record.City.Names.En = g.cityName
|
||||
record.Country.ISOCode = g.isoCode
|
||||
return record, nil
|
||||
}
|
||||
|
||||
func (g *fakeGeo) GetAllCountries() ([]geolocation.Country, error) { return nil, nil }
|
||||
|
||||
func (g *fakeGeo) GetCitiesByCountry(string) ([]geolocation.City, error) { return nil, nil }
|
||||
|
||||
func (g *fakeGeo) Stop() error { return nil }
|
||||
|
||||
func TestResolvePeerLocation(t *testing.T) {
|
||||
realIP := net.ParseIP("203.0.113.10")
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
geo geolocation.Geolocation
|
||||
peer *nbpeer.Peer
|
||||
realIP net.IP
|
||||
want *nbpeer.Location
|
||||
wantNil bool
|
||||
}{
|
||||
{
|
||||
name: "no geo configured returns nil",
|
||||
geo: nil,
|
||||
peer: &nbpeer.Peer{ID: "p1"},
|
||||
realIP: realIP,
|
||||
wantNil: true,
|
||||
},
|
||||
{
|
||||
name: "nil real IP returns nil",
|
||||
geo: &fakeGeo{geoNameID: 100},
|
||||
peer: &nbpeer.Peer{ID: "p1"},
|
||||
realIP: nil,
|
||||
wantNil: true,
|
||||
},
|
||||
{
|
||||
name: "lookup error returns nil",
|
||||
geo: &fakeGeo{err: fmt.Errorf("lookup boom")},
|
||||
peer: &nbpeer.Peer{ID: "p1"},
|
||||
realIP: realIP,
|
||||
wantNil: true,
|
||||
},
|
||||
{
|
||||
name: "same IP and same geoname returns nil",
|
||||
geo: &fakeGeo{geoNameID: 100, isoCode: "US", cityName: "City A"},
|
||||
peer: &nbpeer.Peer{
|
||||
ID: "p1",
|
||||
Location: nbpeer.Location{
|
||||
ConnectionIP: realIP,
|
||||
GeoNameID: 100,
|
||||
},
|
||||
},
|
||||
realIP: realIP,
|
||||
wantNil: true,
|
||||
},
|
||||
{
|
||||
name: "same IP but changed geoname returns location",
|
||||
geo: &fakeGeo{geoNameID: 200, isoCode: "US", cityName: "City B"},
|
||||
peer: &nbpeer.Peer{
|
||||
ID: "p1",
|
||||
Location: nbpeer.Location{
|
||||
ConnectionIP: realIP,
|
||||
GeoNameID: 100,
|
||||
},
|
||||
},
|
||||
realIP: realIP,
|
||||
want: &nbpeer.Location{
|
||||
ConnectionIP: realIP,
|
||||
CountryCode: "US",
|
||||
CityName: "City B",
|
||||
GeoNameID: 200,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "different IP returns location",
|
||||
geo: &fakeGeo{geoNameID: 100, isoCode: "US", cityName: "City A"},
|
||||
peer: &nbpeer.Peer{
|
||||
ID: "p1",
|
||||
Location: nbpeer.Location{
|
||||
ConnectionIP: net.ParseIP("198.51.100.7"),
|
||||
GeoNameID: 100,
|
||||
},
|
||||
},
|
||||
realIP: realIP,
|
||||
want: &nbpeer.Location{
|
||||
ConnectionIP: realIP,
|
||||
CountryCode: "US",
|
||||
CityName: "City A",
|
||||
GeoNameID: 100,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no prior location returns location",
|
||||
geo: &fakeGeo{geoNameID: 100, isoCode: "US", cityName: "City A"},
|
||||
peer: &nbpeer.Peer{ID: "p1"},
|
||||
realIP: realIP,
|
||||
want: &nbpeer.Location{
|
||||
ConnectionIP: realIP,
|
||||
CountryCode: "US",
|
||||
CityName: "City A",
|
||||
GeoNameID: 100,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
am := &DefaultAccountManager{geo: tt.geo}
|
||||
got := am.resolvePeerLocation(context.Background(), tt.peer, tt.realIP)
|
||||
if tt.wantNil {
|
||||
assert.Nil(t, got, "resolved location should be nil")
|
||||
return
|
||||
}
|
||||
require.NotNil(t, got, "resolved location should not be nil")
|
||||
assert.True(t, tt.want.ConnectionIP.Equal(got.ConnectionIP), "connection IP should match")
|
||||
assert.Equal(t, tt.want.CountryCode, got.CountryCode, "country code should match")
|
||||
assert.Equal(t, tt.want.CityName, got.CityName, "city name should match")
|
||||
assert.Equal(t, tt.want.GeoNameID, got.GeoNameID, "geoname id should match")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2765,28 +2765,6 @@ components:
|
||||
type: integer
|
||||
description: "Number of packets transmitted."
|
||||
example: 5
|
||||
num_of_starts:
|
||||
type: integer
|
||||
description: "Number of start events."
|
||||
example: 3
|
||||
num_of_ends:
|
||||
type: integer
|
||||
description: "Number of end events."
|
||||
example: 4
|
||||
num_of_drops:
|
||||
type: integer
|
||||
description: "Number of drop events."
|
||||
example: 5
|
||||
window_start:
|
||||
type: string
|
||||
format: date-time
|
||||
description: Timestamp of the start of the aggregation window.
|
||||
example: 2025-03-20T16:23:58.125397Z
|
||||
window_end:
|
||||
type: string
|
||||
format: date-time
|
||||
description: Timestamp of the end of the aggregation window.
|
||||
example: 2025-03-20T16:23:58.125397Z
|
||||
events:
|
||||
type: array
|
||||
description: "List of events that are correlated to this flow (e.g., start, end)."
|
||||
@@ -2808,11 +2786,6 @@ components:
|
||||
- rx_packets
|
||||
- tx_bytes
|
||||
- tx_packets
|
||||
- num_of_starts
|
||||
- num_of_ends
|
||||
- num_of_drops
|
||||
- window_start
|
||||
- window_end
|
||||
- events
|
||||
NetworkTrafficEventsResponse:
|
||||
type: object
|
||||
|
||||
@@ -2905,18 +2905,9 @@ type NetworkTrafficEvent struct {
|
||||
Events []NetworkTrafficSubEvent `json:"events"`
|
||||
|
||||
// FlowId FlowID is the ID of the connection flow. Not unique because it can be the same for multiple events (e.g., start and end of the connection).
|
||||
FlowId string `json:"flow_id"`
|
||||
Icmp NetworkTrafficICMP `json:"icmp"`
|
||||
|
||||
// NumOfDrops Number of drop events.
|
||||
NumOfDrops int `json:"num_of_drops"`
|
||||
|
||||
// NumOfEnds Number of end events.
|
||||
NumOfEnds int `json:"num_of_ends"`
|
||||
|
||||
// NumOfStarts Number of start events.
|
||||
NumOfStarts int `json:"num_of_starts"`
|
||||
Policy NetworkTrafficPolicy `json:"policy"`
|
||||
FlowId string `json:"flow_id"`
|
||||
Icmp NetworkTrafficICMP `json:"icmp"`
|
||||
Policy NetworkTrafficPolicy `json:"policy"`
|
||||
|
||||
// Protocol Protocol is the protocol of the traffic (e.g. 1 = ICMP, 6 = TCP, 17 = UDP, etc.).
|
||||
Protocol int `json:"protocol"`
|
||||
@@ -2937,12 +2928,6 @@ type NetworkTrafficEvent struct {
|
||||
// TxPackets Number of packets transmitted.
|
||||
TxPackets int `json:"tx_packets"`
|
||||
User NetworkTrafficUser `json:"user"`
|
||||
|
||||
// WindowEnd Timestamp of the end of the aggregation window.
|
||||
WindowEnd time.Time `json:"window_end"`
|
||||
|
||||
// WindowStart Timestamp of the start of the aggregation window.
|
||||
WindowStart time.Time `json:"window_start"`
|
||||
}
|
||||
|
||||
// NetworkTrafficEventsResponse defines model for NetworkTrafficEventsResponse.
|
||||
|
||||
Reference in New Issue
Block a user