mirror of
https://github.com/netbirdio/netbird.git
synced 2026-06-18 22:09:56 +00:00
Compare commits
30 Commits
fix/ipv6-a
...
dmitri-eve
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
17cc13f20f | ||
|
|
fd763ec0dd | ||
|
|
0e95b6d1c9 | ||
|
|
5dc159e06a | ||
|
|
1721a4ff7d | ||
|
|
9ea463ec2e | ||
|
|
3d6fc3bf92 | ||
|
|
0286c17ad6 | ||
|
|
e89c0f5656 | ||
|
|
ca4ce0a639 | ||
|
|
67d1419874 | ||
|
|
7295e2e51f | ||
|
|
a93cb66ea1 | ||
|
|
07c527f3fd | ||
|
|
9dc5e77ec0 | ||
|
|
fae5b7e007 | ||
|
|
c875aa6b4b | ||
|
|
e3f9396578 | ||
|
|
b21f7f7d6a | ||
|
|
98ce097ecb | ||
|
|
598558c77e | ||
|
|
d9d585e1d4 | ||
|
|
a593e32a1d | ||
|
|
12a8943b99 | ||
|
|
42e0007f4a | ||
|
|
8f99362a25 | ||
|
|
101ae3ca77 | ||
|
|
b654a75a43 | ||
|
|
243e93477f | ||
|
|
60bcf7dfc3 |
@@ -27,7 +27,7 @@ type Logger struct {
|
||||
wgIfaceNetV6 netip.Prefix
|
||||
dnsCollection atomic.Bool
|
||||
exitNodeCollection atomic.Bool
|
||||
Store types.Store
|
||||
Store types.AggregatingStore
|
||||
}
|
||||
|
||||
func New(statusRecorder *peer.Status, wgIfaceIPNet, wgIfaceIPNetV6 netip.Prefix) *Logger {
|
||||
@@ -35,7 +35,7 @@ func New(statusRecorder *peer.Status, wgIfaceIPNet, wgIfaceIPNetV6 netip.Prefix)
|
||||
statusRecorder: statusRecorder,
|
||||
wgIfaceNet: wgIfaceIPNet,
|
||||
wgIfaceNetV6: wgIfaceIPNetV6,
|
||||
Store: store.NewMemoryStore(),
|
||||
Store: store.NewAggregatingMemoryStore(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -125,6 +125,10 @@ func (l *Logger) stop() {
|
||||
l.mux.Unlock()
|
||||
}
|
||||
|
||||
func (l *Logger) ResetAggregationWindow() types.FlowEventAggregator {
|
||||
return l.Store.ResetAggregationWindow()
|
||||
}
|
||||
|
||||
func (l *Logger) GetEvents() []*types.Event {
|
||||
return l.Store.GetEvents()
|
||||
}
|
||||
|
||||
@@ -9,12 +9,14 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/google/uuid"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"google.golang.org/protobuf/types/known/timestamppb"
|
||||
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/conntrack"
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/logger"
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/store"
|
||||
nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
|
||||
"github.com/netbirdio/netbird/client/internal/peer"
|
||||
"github.com/netbirdio/netbird/flow/client"
|
||||
@@ -23,14 +25,16 @@ import (
|
||||
|
||||
// Manager handles netflow tracking and logging
|
||||
type Manager struct {
|
||||
mux sync.Mutex
|
||||
shutdownWg sync.WaitGroup
|
||||
logger nftypes.FlowLogger
|
||||
flowConfig *nftypes.FlowConfig
|
||||
conntrack nftypes.ConnTracker
|
||||
receiverClient *client.GRPCClient
|
||||
publicKey []byte
|
||||
cancel context.CancelFunc
|
||||
mux sync.Mutex
|
||||
shutdownWg sync.WaitGroup
|
||||
logger nftypes.FlowLogger
|
||||
flowConfig *nftypes.FlowConfig
|
||||
conntrack nftypes.ConnTracker
|
||||
receiverClient *client.GRPCClient
|
||||
eventsWithoutAcks nftypes.Store
|
||||
publicKey []byte
|
||||
cancel context.CancelFunc
|
||||
retryInterval time.Duration
|
||||
}
|
||||
|
||||
// NewManager creates a new netflow manager
|
||||
@@ -48,9 +52,11 @@ func NewManager(iface nftypes.IFaceMapper, publicKey []byte, statusRecorder *pee
|
||||
}
|
||||
|
||||
return &Manager{
|
||||
logger: flowLogger,
|
||||
conntrack: ct,
|
||||
publicKey: publicKey,
|
||||
logger: flowLogger,
|
||||
conntrack: ct,
|
||||
publicKey: publicKey,
|
||||
retryInterval: time.Second,
|
||||
eventsWithoutAcks: store.NewMemoryStore(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,7 +113,7 @@ func (m *Manager) resetClient() error {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
m.cancel = cancel
|
||||
|
||||
m.shutdownWg.Add(2)
|
||||
m.shutdownWg.Add(3)
|
||||
go func() {
|
||||
defer m.shutdownWg.Done()
|
||||
m.receiveACKs(ctx, flowClient)
|
||||
@@ -116,6 +122,10 @@ func (m *Manager) resetClient() error {
|
||||
defer m.shutdownWg.Done()
|
||||
m.startSender(ctx)
|
||||
}()
|
||||
go func() {
|
||||
defer m.shutdownWg.Done()
|
||||
m.startRetries(ctx)
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -207,13 +217,15 @@ func (m *Manager) startSender(ctx context.Context) {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
events := m.logger.GetEvents()
|
||||
collectedEvents := m.logger.ResetAggregationWindow()
|
||||
events := collectedEvents.GetAggregatedEvents()
|
||||
for _, event := range events {
|
||||
if err := m.send(event); err != nil {
|
||||
log.Errorf("failed to send flow event to server: %v", err)
|
||||
continue
|
||||
} else {
|
||||
log.Tracef("sent flow event: %s", event.ID)
|
||||
}
|
||||
log.Tracef("sent flow event: %s", event.ID)
|
||||
m.eventsWithoutAcks.StoreEvent(event)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -227,7 +239,7 @@ func (m *Manager) receiveACKs(ctx context.Context, client *client.GRPCClient) {
|
||||
return nil
|
||||
}
|
||||
log.Tracef("received flow event ack: %s", id)
|
||||
m.logger.DeleteEvents([]uuid.UUID{id})
|
||||
m.eventsWithoutAcks.DeleteEvents([]uuid.UUID{id})
|
||||
return nil
|
||||
})
|
||||
|
||||
@@ -236,6 +248,43 @@ func (m *Manager) receiveACKs(ctx context.Context, client *client.GRPCClient) {
|
||||
}
|
||||
}
|
||||
|
||||
// We effectively never drop events (see MaxInterval), which makes eventsWithoutAcks unbounded.
|
||||
// We may want to limit the max size of the store, and start dropping oldest events when the threshold is reached.
|
||||
func (m *Manager) startRetries(ctx context.Context) {
|
||||
timer := time.NewTimer(m.retryInterval)
|
||||
retryBackoff := backoff.WithContext(&backoff.ExponentialBackOff{
|
||||
InitialInterval: 1 * time.Second,
|
||||
RandomizationFactor: 0.5,
|
||||
Multiplier: 1.7,
|
||||
MaxInterval: m.flowConfig.Interval / 2,
|
||||
MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months
|
||||
Stop: backoff.Stop,
|
||||
Clock: backoff.SystemClock,
|
||||
}, ctx)
|
||||
defer timer.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-timer.C:
|
||||
for _, e := range m.eventsWithoutAcks.GetEvents() {
|
||||
if e.Timestamp.Add(time.Second).After(time.Now()) {
|
||||
// grace period on retries to avoid early retries
|
||||
// do not retry if the event is less than 1 sec old
|
||||
continue
|
||||
}
|
||||
if err := m.send(e); err != nil {
|
||||
timer = time.NewTimer(retryBackoff.NextBackOff()) //nolint:staticcheck,wastedassign
|
||||
break
|
||||
}
|
||||
}
|
||||
retryBackoff.Reset()
|
||||
timer = time.NewTimer(time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) send(event *nftypes.Event) error {
|
||||
m.mux.Lock()
|
||||
client := m.receiverClient
|
||||
@@ -250,9 +299,11 @@ func (m *Manager) send(event *nftypes.Event) error {
|
||||
|
||||
func toProtoEvent(publicKey []byte, event *nftypes.Event) *proto.FlowEvent {
|
||||
protoEvent := &proto.FlowEvent{
|
||||
EventId: event.ID[:],
|
||||
Timestamp: timestamppb.New(event.Timestamp),
|
||||
PublicKey: publicKey,
|
||||
EventId: event.ID[:],
|
||||
Timestamp: timestamppb.New(event.Timestamp),
|
||||
PublicKey: publicKey,
|
||||
WindowStart: timestamppb.New(event.WindowStart),
|
||||
WindowEnd: timestamppb.New(event.WindowEnd),
|
||||
FlowFields: &proto.FlowFields{
|
||||
FlowId: event.FlowID[:],
|
||||
RuleId: event.RuleID,
|
||||
@@ -267,6 +318,9 @@ func toProtoEvent(publicKey []byte, event *nftypes.Event) *proto.FlowEvent {
|
||||
TxBytes: event.TxBytes,
|
||||
SourceResourceId: event.SourceResourceID,
|
||||
DestResourceId: event.DestResourceID,
|
||||
NumOfStarts: event.NumOfStarts,
|
||||
NumOfEnds: event.NumOfEnds,
|
||||
NumOfDrops: event.NumOfDrops,
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
291
client/internal/netflow/manager_integration_test.go
Normal file
291
client/internal/netflow/manager_integration_test.go
Normal file
@@ -0,0 +1,291 @@
|
||||
package netflow
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/netip"
|
||||
"slices"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/netbirdio/netbird/client/iface/wgaddr"
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/types"
|
||||
"github.com/netbirdio/netbird/flow/proto"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
type testServer struct {
|
||||
proto.UnimplementedFlowServiceServer
|
||||
events chan *proto.FlowEvent
|
||||
acks chan *proto.FlowEventAck
|
||||
grpcSrv *grpc.Server
|
||||
addr string
|
||||
handlerDone chan struct{} // signaled each time Events() exits
|
||||
handlerStarted chan struct{} // signaled each time Events() begins
|
||||
}
|
||||
|
||||
func newTestServer(t *testing.T) *testServer {
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
s := &testServer{
|
||||
events: make(chan *proto.FlowEvent, 100),
|
||||
acks: make(chan *proto.FlowEventAck, 100),
|
||||
grpcSrv: grpc.NewServer(),
|
||||
addr: listener.Addr().String(),
|
||||
handlerDone: make(chan struct{}, 10),
|
||||
handlerStarted: make(chan struct{}, 10),
|
||||
}
|
||||
|
||||
proto.RegisterFlowServiceServer(s.grpcSrv, s)
|
||||
|
||||
go func() {
|
||||
if err := s.grpcSrv.Serve(listener); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
|
||||
t.Logf("server error: %v", err)
|
||||
}
|
||||
}()
|
||||
|
||||
t.Cleanup(func() {
|
||||
s.grpcSrv.Stop()
|
||||
})
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *testServer) Events(stream proto.FlowService_EventsServer) error {
|
||||
defer func() {
|
||||
select {
|
||||
case s.handlerDone <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}()
|
||||
|
||||
err := stream.Send(&proto.FlowEventAck{IsInitiator: true})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
select {
|
||||
case s.handlerStarted <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(stream.Context())
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
for {
|
||||
event, err := stream.Recv()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if !event.IsInitiator {
|
||||
select {
|
||||
case s.events <- event:
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
select {
|
||||
case ack := <-s.acks:
|
||||
if err := stream.Send(ack); err != nil {
|
||||
return err
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendEventReceiveAck(t *testing.T) {
|
||||
_, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
server := newTestServer(t)
|
||||
manager := createManager(t, server.addr, 60*time.Second) // set high to prevent retries in this test
|
||||
defer manager.Close()
|
||||
|
||||
assert.Eventually(t, func() bool {
|
||||
select {
|
||||
case <-server.handlerStarted:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}, 3*time.Second, 100*time.Millisecond)
|
||||
|
||||
event1 := types.EventFields{
|
||||
FlowID: uuid.New(),
|
||||
Type: types.TypeStart,
|
||||
Direction: types.Ingress,
|
||||
DestIP: ipAddr("172.16.1.2"),
|
||||
DestPort: 2345,
|
||||
Protocol: 6,
|
||||
}
|
||||
manager.logger.StoreEvent(event1)
|
||||
event2 := types.EventFields{
|
||||
FlowID: uuid.New(),
|
||||
Type: types.TypeStart,
|
||||
Direction: types.Ingress,
|
||||
DestIP: ipAddr("172.16.1.1"),
|
||||
DestPort: 1234,
|
||||
Protocol: 6,
|
||||
}
|
||||
manager.logger.StoreEvent(event2)
|
||||
|
||||
// verify the server received logged events
|
||||
serverSideEvents := make([]*proto.FlowEvent, 0)
|
||||
assert.Eventually(t, func() bool {
|
||||
select {
|
||||
case event := <-server.events:
|
||||
serverSideEvents = append(serverSideEvents, event)
|
||||
if len(serverSideEvents) == 2 {
|
||||
return true
|
||||
}
|
||||
default:
|
||||
if len(serverSideEvents) == 2 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}, 5*time.Second, 100*time.Millisecond)
|
||||
|
||||
serverSideFlowIds := make([]uuid.UUID, 0, 2)
|
||||
slices.Values(serverSideEvents)(func(e *proto.FlowEvent) bool {
|
||||
id, err := uuid.FromBytes(e.FlowFields.FlowId)
|
||||
assert.NoError(t, err)
|
||||
serverSideFlowIds = append(serverSideFlowIds, id)
|
||||
return true
|
||||
})
|
||||
assert.ElementsMatch(t, []uuid.UUID{event1.FlowID, event2.FlowID}, serverSideFlowIds)
|
||||
|
||||
// verify the manager tracks un-acked events
|
||||
unackedEvents := manager.eventsWithoutAcks.GetEvents()
|
||||
assert.Len(t, unackedEvents, 2)
|
||||
flowIds := make([]uuid.UUID, 0)
|
||||
slices.Values(unackedEvents)(func(e *types.Event) bool {
|
||||
flowIds = append(flowIds, e.FlowID)
|
||||
return true
|
||||
})
|
||||
assert.ElementsMatch(t, flowIds, []uuid.UUID{event1.FlowID, event2.FlowID})
|
||||
}
|
||||
|
||||
// verify handling of retries:
|
||||
// - unacked events are retried
|
||||
// - when acks arrive, events are removed from the un-acked event tracker
|
||||
func TestRetryEvents(t *testing.T) {
|
||||
_, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
server := newTestServer(t)
|
||||
manager := createManager(t, server.addr, time.Second) // set low to start retries sooner
|
||||
defer manager.Close()
|
||||
|
||||
assert.Eventually(t, func() bool {
|
||||
select {
|
||||
case <-server.handlerStarted:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}, 3*time.Second, 100*time.Millisecond)
|
||||
|
||||
event1 := types.EventFields{
|
||||
FlowID: uuid.New(),
|
||||
Type: types.TypeStart,
|
||||
Direction: types.Ingress,
|
||||
DestIP: ipAddr("172.16.1.2"),
|
||||
DestPort: 2345,
|
||||
Protocol: 6,
|
||||
}
|
||||
manager.logger.StoreEvent(event1)
|
||||
event2 := types.EventFields{
|
||||
FlowID: uuid.New(),
|
||||
Type: types.TypeStart,
|
||||
Direction: types.Ingress,
|
||||
DestIP: ipAddr("172.16.1.1"),
|
||||
DestPort: 1234,
|
||||
Protocol: 6,
|
||||
}
|
||||
manager.logger.StoreEvent(event2)
|
||||
|
||||
// verify the server received retries of logged events
|
||||
serverSideEvents := make([]*proto.FlowEvent, 0)
|
||||
func() {
|
||||
c := time.After(2500 * time.Millisecond)
|
||||
for {
|
||||
select {
|
||||
case event := <-server.events:
|
||||
serverSideEvents = append(serverSideEvents, event)
|
||||
case <-c:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
assert.True(t, len(serverSideEvents) > 2) // must see retries
|
||||
|
||||
uniqueServerSideEvents := make(map[uuid.UUID]*proto.FlowEvent)
|
||||
slices.Values(serverSideEvents)(func(e *proto.FlowEvent) bool {
|
||||
id, err := uuid.FromBytes(e.FlowFields.FlowId)
|
||||
assert.NoError(t, err)
|
||||
uniqueServerSideEvents[id] = e
|
||||
return true
|
||||
})
|
||||
assert.Contains(t, uniqueServerSideEvents, event1.FlowID)
|
||||
assert.Contains(t, uniqueServerSideEvents, event2.FlowID)
|
||||
|
||||
// ack events
|
||||
server.acks <- &proto.FlowEventAck{EventId: uniqueServerSideEvents[event1.FlowID].EventId}
|
||||
server.acks <- &proto.FlowEventAck{EventId: uniqueServerSideEvents[event2.FlowID].EventId}
|
||||
|
||||
assert.EventuallyWithT(t, func(c *assert.CollectT) {
|
||||
unackedEvents := manager.eventsWithoutAcks.GetEvents()
|
||||
assert.Empty(c, unackedEvents)
|
||||
|
||||
}, 3*time.Second, 100*time.Millisecond)
|
||||
}
|
||||
|
||||
func createManager(t *testing.T, serverAddr string, retryInterval time.Duration) *Manager {
|
||||
t.Helper()
|
||||
|
||||
mockIFace := &mockIFaceMapper{
|
||||
address: wgaddr.Address{
|
||||
Network: netip.MustParsePrefix("192.168.1.1/32"),
|
||||
},
|
||||
isUserspaceBind: true,
|
||||
}
|
||||
|
||||
publicKey := []byte("test-public-key")
|
||||
manager := NewManager(mockIFace, publicKey, nil)
|
||||
manager.retryInterval = retryInterval
|
||||
|
||||
initialConfig := &types.FlowConfig{
|
||||
Enabled: true,
|
||||
URL: fmt.Sprintf("http://%s", serverAddr),
|
||||
TokenPayload: "initial-payload",
|
||||
TokenSignature: "initial-signature",
|
||||
Interval: 500 * time.Millisecond,
|
||||
}
|
||||
|
||||
err := manager.Update(initialConfig)
|
||||
require.NoError(t, err)
|
||||
|
||||
return manager
|
||||
}
|
||||
|
||||
func ipAddr(a string) netip.Addr {
|
||||
addr, _ := netip.ParseAddr(a)
|
||||
return addr
|
||||
}
|
||||
209
client/internal/netflow/store/event_aggregation_test.go
Normal file
209
client/internal/netflow/store/event_aggregation_test.go
Normal file
@@ -0,0 +1,209 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"net/netip"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
var random = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
func TestFlowAggregation(t *testing.T) {
|
||||
var protocols = []types.Protocol{types.ICMP, types.ICMPv6, types.TCP, types.UDP}
|
||||
var tests = []struct {
|
||||
description string
|
||||
addresses [][]netip.Addr
|
||||
dstPort uint16
|
||||
eventTypes []types.Type
|
||||
}{
|
||||
{
|
||||
description: "start and stop",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeStart, types.TypeEnd},
|
||||
},
|
||||
{
|
||||
description: "start and drop",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeStart, types.TypeDrop},
|
||||
},
|
||||
{
|
||||
description: "start only",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeStart},
|
||||
},
|
||||
{
|
||||
description: "drop only",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}, {ipAddr("3.3.3.3"), ipAddr("2.2.2.2")}},
|
||||
dstPort: uint16(random.Uint32() >> 16),
|
||||
eventTypes: []types.Type{types.TypeDrop},
|
||||
}}
|
||||
|
||||
for _, protocol := range protocols {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.description+" "+protocol.String(), func(t *testing.T) {
|
||||
store := NewAggregatingMemoryStore()
|
||||
store.WindowEnd = time.Now().Add(5 * time.Second)
|
||||
|
||||
allExpected := make([]*types.Event, 0)
|
||||
|
||||
for _, srcAndDst := range tt.addresses {
|
||||
inEvents, expected := generateEvents(srcAndDst[0], srcAndDst[1], tt.dstPort, tt.eventTypes, protocol, types.Ingress, 0, store.WindowStart, store.WindowEnd)
|
||||
for _, e := range inEvents {
|
||||
store.StoreEvent(e)
|
||||
}
|
||||
allExpected = append(allExpected, expected)
|
||||
}
|
||||
|
||||
events := store.GetAggregatedEvents()
|
||||
assert.ElementsMatch(t, events, allExpected)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestIcmpEventAggregation(t *testing.T) {
|
||||
var protocols = []types.Protocol{types.ICMP, types.ICMPv6}
|
||||
var icmpTypes = []uint8{1, 2, 3}
|
||||
|
||||
var tests = []struct {
|
||||
description string
|
||||
addresses [][]netip.Addr
|
||||
eventTypes []types.Type
|
||||
}{
|
||||
{
|
||||
description: "start and stop",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}},
|
||||
eventTypes: []types.Type{types.TypeStart, types.TypeEnd},
|
||||
},
|
||||
{
|
||||
description: "start and drop",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}},
|
||||
eventTypes: []types.Type{types.TypeStart, types.TypeDrop},
|
||||
},
|
||||
{
|
||||
description: "start only",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}},
|
||||
eventTypes: []types.Type{types.TypeStart},
|
||||
},
|
||||
{
|
||||
description: "drop only",
|
||||
addresses: [][]netip.Addr{{ipAddr("1.1.1.1"), ipAddr("2.2.2.2")}},
|
||||
eventTypes: []types.Type{types.TypeDrop},
|
||||
}}
|
||||
|
||||
for _, protocol := range protocols {
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.description+" "+protocol.String(), func(t *testing.T) {
|
||||
store := NewAggregatingMemoryStore()
|
||||
store.WindowEnd = time.Now().Add(5 * time.Second)
|
||||
|
||||
allExpected := make([]*types.Event, 0)
|
||||
for _, icmpType := range icmpTypes {
|
||||
events, expected := generateEvents(tt.addresses[0][0], tt.addresses[0][1], 0, tt.eventTypes, protocol, types.Ingress, icmpType, store.WindowStart, store.WindowEnd)
|
||||
for _, e := range events {
|
||||
store.StoreEvent(e)
|
||||
}
|
||||
allExpected = append(allExpected, expected)
|
||||
}
|
||||
aggregatedEvents := store.GetAggregatedEvents()
|
||||
assert.Len(t, aggregatedEvents, len(allExpected))
|
||||
assert.ElementsMatch(t, aggregatedEvents, allExpected)
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ipAddr(a string) netip.Addr {
|
||||
addr, _ := netip.ParseAddr(a)
|
||||
return addr
|
||||
}
|
||||
|
||||
func generateEvents(srcIp, dstIp netip.Addr, dstPort uint16, eventTypes []types.Type, protocol types.Protocol,
|
||||
direction types.Direction, icmpType uint8, windowStart, windowEnd time.Time) ([]*types.Event, *types.Event) {
|
||||
var rxPackets, txPackets, rxBytes, txBytes uint64
|
||||
inEvents := make([]*types.Event, 0)
|
||||
ts := time.Now()
|
||||
flowId := uuid.New()
|
||||
srcPort := uint16(random.Uint32() >> 16)
|
||||
|
||||
for idx, eventType := range eventTypes {
|
||||
e := &types.Event{
|
||||
ID: uuid.New(),
|
||||
Timestamp: ts.Add(time.Duration(idx) * time.Second),
|
||||
EventFields: types.EventFields{
|
||||
FlowID: flowId,
|
||||
Type: eventType,
|
||||
Protocol: protocol,
|
||||
RuleID: []byte("rule-id-1"),
|
||||
Direction: direction,
|
||||
SourceIP: srcIp,
|
||||
SourcePort: srcPort,
|
||||
DestIP: dstIp,
|
||||
DestPort: dstPort,
|
||||
SourceResourceID: []byte("source-resource-id"),
|
||||
DestResourceID: []byte("dest-resource-id"),
|
||||
RxPackets: random.Uint64(),
|
||||
TxPackets: random.Uint64(),
|
||||
RxBytes: random.Uint64(),
|
||||
TxBytes: random.Uint64(),
|
||||
}}
|
||||
rxBytes += e.RxBytes
|
||||
txBytes += e.TxBytes
|
||||
rxPackets += e.RxPackets
|
||||
txPackets += e.TxPackets
|
||||
inEvents = append(inEvents, e)
|
||||
if protocol == types.ICMP || protocol == types.ICMPv6 {
|
||||
e.ICMPType = icmpType
|
||||
}
|
||||
}
|
||||
|
||||
var start, end, drop uint64
|
||||
for _, eventType := range eventTypes {
|
||||
switch eventType {
|
||||
case types.TypeStart:
|
||||
start += 1
|
||||
case types.TypeDrop:
|
||||
drop += 1
|
||||
case types.TypeEnd:
|
||||
end += 1
|
||||
}
|
||||
}
|
||||
aggregatedEvent := &types.Event{
|
||||
ID: inEvents[0].ID,
|
||||
Timestamp: inEvents[0].Timestamp,
|
||||
WindowStart: windowStart,
|
||||
WindowEnd: windowEnd,
|
||||
EventFields: types.EventFields{
|
||||
FlowID: flowId,
|
||||
Type: types.TypeUnknown,
|
||||
Protocol: inEvents[0].Protocol,
|
||||
RuleID: []byte("rule-id-1"),
|
||||
Direction: inEvents[0].Direction,
|
||||
SourceIP: srcIp,
|
||||
SourcePort: srcPort,
|
||||
DestIP: dstIp,
|
||||
DestPort: dstPort,
|
||||
SourceResourceID: []byte("source-resource-id"),
|
||||
DestResourceID: []byte("dest-resource-id"),
|
||||
RxPackets: rxPackets,
|
||||
TxPackets: txPackets,
|
||||
RxBytes: rxBytes,
|
||||
TxBytes: txBytes,
|
||||
NumOfStarts: start,
|
||||
NumOfEnds: end,
|
||||
NumOfDrops: drop,
|
||||
}}
|
||||
if protocol == types.ICMP || protocol == types.ICMPv6 {
|
||||
aggregatedEvent.ICMPType = icmpType
|
||||
}
|
||||
|
||||
return inEvents, aggregatedEvent
|
||||
}
|
||||
@@ -1,10 +1,13 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"maps"
|
||||
"net/netip"
|
||||
"slices"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
"github.com/netbirdio/netbird/client/internal/netflow/types"
|
||||
)
|
||||
|
||||
@@ -19,6 +22,12 @@ type Memory struct {
|
||||
events map[uuid.UUID]*types.Event
|
||||
}
|
||||
|
||||
type AggregatingMemory struct {
|
||||
Memory
|
||||
WindowStart time.Time
|
||||
WindowEnd time.Time
|
||||
}
|
||||
|
||||
func (m *Memory) StoreEvent(event *types.Event) {
|
||||
m.mux.Lock()
|
||||
defer m.mux.Unlock()
|
||||
@@ -48,3 +57,92 @@ func (m *Memory) DeleteEvents(ids []uuid.UUID) {
|
||||
delete(m.events, id)
|
||||
}
|
||||
}
|
||||
|
||||
func NewAggregatingMemoryStore() *AggregatingMemory {
|
||||
return &AggregatingMemory{WindowStart: time.Now(), Memory: Memory{events: make(map[uuid.UUID]*types.Event)}}
|
||||
}
|
||||
|
||||
func (am *AggregatingMemory) ResetAggregationWindow() types.FlowEventAggregator {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
toret := AggregatingMemory{WindowStart: am.WindowStart, WindowEnd: time.Now(), Memory: Memory{events: am.events}}
|
||||
|
||||
am.events = make(map[uuid.UUID]*types.Event)
|
||||
am.WindowStart = time.Now()
|
||||
|
||||
return &toret
|
||||
}
|
||||
|
||||
type aggregationKey struct {
|
||||
srcAddr netip.Addr
|
||||
destAddr netip.Addr
|
||||
destPort uint16
|
||||
direction int
|
||||
protocol uint8
|
||||
icmpType uint8
|
||||
unique int64 // used to prevent aggregation on non icmp/udp/tcp events
|
||||
}
|
||||
|
||||
func (am *AggregatingMemory) GetAggregatedEvents() []*types.Event {
|
||||
am.mux.Lock()
|
||||
defer am.mux.Unlock()
|
||||
|
||||
aggregated := make(map[aggregationKey]*types.Event)
|
||||
for _, v := range am.events {
|
||||
lookupKey := aggregationKey{srcAddr: v.SourceIP, destAddr: v.DestIP, destPort: v.DestPort, direction: int(v.Direction), protocol: uint8(v.Protocol), icmpType: v.ICMPType}
|
||||
if _, ok := aggregated[lookupKey]; !ok {
|
||||
event := v.Clone()
|
||||
|
||||
if event.Protocol != types.ICMP && event.Protocol != types.ICMPv6 && event.Protocol != types.UDP && event.Protocol != types.TCP {
|
||||
lookupKey.unique = time.Now().UnixNano() // to make the lookup key unique so we don't aggregate on it
|
||||
aggregated[lookupKey] = event
|
||||
continue
|
||||
}
|
||||
aggregated[lookupKey] = event
|
||||
|
||||
switch event.Type {
|
||||
case types.TypeStart:
|
||||
event.NumOfStarts += 1
|
||||
case types.TypeDrop:
|
||||
event.NumOfDrops += 1
|
||||
case types.TypeEnd:
|
||||
event.NumOfEnds += 1
|
||||
}
|
||||
event.Type = types.TypeUnknown
|
||||
|
||||
// Please note that ICMPCode field isn't propagated by the manager (see flow/proto/flow.pb.go, FlowFields struct)
|
||||
// so the field value in an icmp event in the "aggregated" doesn't matter
|
||||
|
||||
event.WindowStart = am.WindowStart
|
||||
event.WindowEnd = am.WindowEnd
|
||||
continue
|
||||
}
|
||||
|
||||
aggregatedEvent := aggregated[lookupKey]
|
||||
if aggregatedEvent.Protocol != types.ICMP && aggregatedEvent.Protocol != types.ICMPv6 && aggregatedEvent.Protocol != types.UDP && aggregatedEvent.Protocol != types.TCP {
|
||||
continue // we don't aggregate this type of events; shouldn't ever get here
|
||||
}
|
||||
|
||||
// track the number of connections, duration?, open and close events?
|
||||
aggregatedEvent.RxBytes += v.RxBytes
|
||||
aggregatedEvent.RxPackets += v.RxPackets
|
||||
aggregatedEvent.TxBytes += v.TxBytes
|
||||
aggregatedEvent.TxPackets += v.TxPackets
|
||||
switch v.Type {
|
||||
case types.TypeStart:
|
||||
aggregatedEvent.NumOfStarts += 1
|
||||
case types.TypeDrop:
|
||||
aggregatedEvent.NumOfDrops += 1
|
||||
case types.TypeEnd:
|
||||
aggregatedEvent.NumOfEnds += 1
|
||||
}
|
||||
if aggregatedEvent.Timestamp.Compare(v.Timestamp) > 0 {
|
||||
aggregatedEvent.Timestamp = v.Timestamp
|
||||
aggregatedEvent.ID = v.ID
|
||||
aggregatedEvent.SourcePort = v.SourcePort
|
||||
}
|
||||
}
|
||||
|
||||
return slices.Collect(maps.Values(aggregated)) // could return an iterator instead here
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package types
|
||||
|
||||
import (
|
||||
"net/netip"
|
||||
"slices"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
@@ -69,8 +70,10 @@ const (
|
||||
)
|
||||
|
||||
type Event struct {
|
||||
ID uuid.UUID
|
||||
Timestamp time.Time
|
||||
ID uuid.UUID
|
||||
Timestamp time.Time
|
||||
WindowStart time.Time
|
||||
WindowEnd time.Time
|
||||
EventFields
|
||||
}
|
||||
|
||||
@@ -92,6 +95,17 @@ type EventFields struct {
|
||||
TxPackets uint64
|
||||
RxBytes uint64
|
||||
TxBytes uint64
|
||||
NumOfStarts uint64
|
||||
NumOfEnds uint64
|
||||
NumOfDrops uint64
|
||||
}
|
||||
|
||||
func (e *Event) Clone() *Event {
|
||||
toret := *e
|
||||
toret.RuleID = slices.Clone(e.RuleID)
|
||||
toret.SourceResourceID = slices.Clone(e.SourceResourceID)
|
||||
toret.DestResourceID = slices.Clone(e.DestResourceID)
|
||||
return &toret
|
||||
}
|
||||
|
||||
type FlowConfig struct {
|
||||
@@ -114,13 +128,15 @@ type FlowManager interface {
|
||||
GetLogger() FlowLogger
|
||||
}
|
||||
|
||||
type FlowEventAggregator interface {
|
||||
ResetAggregationWindow() FlowEventAggregator
|
||||
GetAggregatedEvents() []*Event
|
||||
}
|
||||
|
||||
type FlowLogger interface {
|
||||
ResetAggregationWindow() FlowEventAggregator
|
||||
// StoreEvent stores a flow event
|
||||
StoreEvent(flowEvent EventFields)
|
||||
// GetEvents returns all stored events
|
||||
GetEvents() []*Event
|
||||
// DeleteEvents deletes events from the store
|
||||
DeleteEvents([]uuid.UUID)
|
||||
// Close closes the logger
|
||||
Close()
|
||||
// Enable enables the flow logger receiver
|
||||
@@ -140,6 +156,11 @@ type Store interface {
|
||||
Close()
|
||||
}
|
||||
|
||||
type AggregatingStore interface {
|
||||
FlowEventAggregator
|
||||
Store
|
||||
}
|
||||
|
||||
// ConnTracker defines the interface for connection tracking functionality
|
||||
type ConnTracker interface {
|
||||
// Start begins tracking connections by listening for conntrack events.
|
||||
|
||||
@@ -134,9 +134,11 @@ type FlowEvent struct {
|
||||
// When the event occurred
|
||||
Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
|
||||
// Public key of the sending peer
|
||||
PublicKey []byte `protobuf:"bytes,3,opt,name=public_key,json=publicKey,proto3" json:"public_key,omitempty"`
|
||||
FlowFields *FlowFields `protobuf:"bytes,4,opt,name=flow_fields,json=flowFields,proto3" json:"flow_fields,omitempty"`
|
||||
IsInitiator bool `protobuf:"varint,5,opt,name=isInitiator,proto3" json:"isInitiator,omitempty"`
|
||||
PublicKey []byte `protobuf:"bytes,3,opt,name=public_key,json=publicKey,proto3" json:"public_key,omitempty"`
|
||||
FlowFields *FlowFields `protobuf:"bytes,4,opt,name=flow_fields,json=flowFields,proto3" json:"flow_fields,omitempty"`
|
||||
IsInitiator bool `protobuf:"varint,5,opt,name=isInitiator,proto3" json:"isInitiator,omitempty"`
|
||||
WindowStart *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=window_start,json=windowStart,proto3" json:"window_start,omitempty"`
|
||||
WindowEnd *timestamppb.Timestamp `protobuf:"bytes,7,opt,name=window_end,json=windowEnd,proto3" json:"window_end,omitempty"`
|
||||
}
|
||||
|
||||
func (x *FlowEvent) Reset() {
|
||||
@@ -206,6 +208,20 @@ func (x *FlowEvent) GetIsInitiator() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (x *FlowEvent) GetWindowStart() *timestamppb.Timestamp {
|
||||
if x != nil {
|
||||
return x.WindowStart
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *FlowEvent) GetWindowEnd() *timestamppb.Timestamp {
|
||||
if x != nil {
|
||||
return x.WindowEnd
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type FlowEventAck struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
@@ -284,7 +300,6 @@ type FlowFields struct {
|
||||
// Layer 4 -specific information
|
||||
//
|
||||
// Types that are assignable to ConnectionInfo:
|
||||
//
|
||||
// *FlowFields_PortInfo
|
||||
// *FlowFields_IcmpInfo
|
||||
ConnectionInfo isFlowFields_ConnectionInfo `protobuf_oneof:"connection_info"`
|
||||
@@ -297,6 +312,9 @@ type FlowFields struct {
|
||||
// Resource ID
|
||||
SourceResourceId []byte `protobuf:"bytes,14,opt,name=source_resource_id,json=sourceResourceId,proto3" json:"source_resource_id,omitempty"`
|
||||
DestResourceId []byte `protobuf:"bytes,15,opt,name=dest_resource_id,json=destResourceId,proto3" json:"dest_resource_id,omitempty"`
|
||||
NumOfStarts uint64 `protobuf:"varint,16,opt,name=num_of_starts,json=numOfStarts,proto3" json:"num_of_starts,omitempty"`
|
||||
NumOfEnds uint64 `protobuf:"varint,17,opt,name=num_of_ends,json=numOfEnds,proto3" json:"num_of_ends,omitempty"`
|
||||
NumOfDrops uint64 `protobuf:"varint,18,opt,name=num_of_drops,json=numOfDrops,proto3" json:"num_of_drops,omitempty"`
|
||||
}
|
||||
|
||||
func (x *FlowFields) Reset() {
|
||||
@@ -443,6 +461,27 @@ func (x *FlowFields) GetDestResourceId() []byte {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *FlowFields) GetNumOfStarts() uint64 {
|
||||
if x != nil {
|
||||
return x.NumOfStarts
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *FlowFields) GetNumOfEnds() uint64 {
|
||||
if x != nil {
|
||||
return x.NumOfEnds
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *FlowFields) GetNumOfDrops() uint64 {
|
||||
if x != nil {
|
||||
return x.NumOfDrops
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
type isFlowFields_ConnectionInfo interface {
|
||||
isFlowFields_ConnectionInfo()
|
||||
}
|
||||
@@ -579,7 +618,7 @@ var file_flow_proto_rawDesc = []byte{
|
||||
0x0a, 0x0a, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x66, 0x6c,
|
||||
0x6f, 0x77, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72,
|
||||
0x6f, 0x74, 0x6f, 0x22, 0xd4, 0x01, 0x0a, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e,
|
||||
0x6f, 0x74, 0x6f, 0x22, 0xce, 0x02, 0x0a, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e,
|
||||
0x74, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20,
|
||||
0x01, 0x28, 0x0c, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x38, 0x0a, 0x09,
|
||||
0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32,
|
||||
@@ -592,45 +631,59 @@ var file_flow_proto_rawDesc = []byte{
|
||||
0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x52, 0x0a, 0x66, 0x6c,
|
||||
0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x12, 0x20, 0x0a, 0x0b, 0x69, 0x73, 0x49, 0x6e,
|
||||
0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69,
|
||||
0x73, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x22, 0x4b, 0x0a, 0x0c, 0x46, 0x6c,
|
||||
0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x41, 0x63, 0x6b, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76,
|
||||
0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x65, 0x76,
|
||||
0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x20, 0x0a, 0x0b, 0x69, 0x73, 0x49, 0x6e, 0x69, 0x74, 0x69,
|
||||
0x61, 0x74, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69, 0x73, 0x49, 0x6e,
|
||||
0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x22, 0x9c, 0x04, 0x0a, 0x0a, 0x46, 0x6c, 0x6f, 0x77,
|
||||
0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x12, 0x17, 0x0a, 0x07, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69,
|
||||
0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12,
|
||||
0x1e, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0a, 0x2e,
|
||||
0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12,
|
||||
0x17, 0x0a, 0x07, 0x72, 0x75, 0x6c, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c,
|
||||
0x52, 0x06, 0x72, 0x75, 0x6c, 0x65, 0x49, 0x64, 0x12, 0x2d, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65,
|
||||
0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x66, 0x6c,
|
||||
0x6f, 0x77, 0x2e, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x64, 0x69,
|
||||
0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x63, 0x6f, 0x6c, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x63, 0x6f, 0x6c, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x70,
|
||||
0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x70,
|
||||
0x12, 0x17, 0x0a, 0x07, 0x64, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x70, 0x18, 0x07, 0x20, 0x01, 0x28,
|
||||
0x0c, 0x52, 0x06, 0x64, 0x65, 0x73, 0x74, 0x49, 0x70, 0x12, 0x2d, 0x0a, 0x09, 0x70, 0x6f, 0x72,
|
||||
0x74, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x66,
|
||||
0x6c, 0x6f, 0x77, 0x2e, 0x50, 0x6f, 0x72, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x48, 0x00, 0x52, 0x08,
|
||||
0x70, 0x6f, 0x72, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x2d, 0x0a, 0x09, 0x69, 0x63, 0x6d, 0x70,
|
||||
0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x66, 0x6c,
|
||||
0x6f, 0x77, 0x2e, 0x49, 0x43, 0x4d, 0x50, 0x49, 0x6e, 0x66, 0x6f, 0x48, 0x00, 0x52, 0x08, 0x69,
|
||||
0x63, 0x6d, 0x70, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x78, 0x5f, 0x70, 0x61,
|
||||
0x63, 0x6b, 0x65, 0x74, 0x73, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x72, 0x78, 0x50,
|
||||
0x61, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x12, 0x1d, 0x0a, 0x0a, 0x74, 0x78, 0x5f, 0x70, 0x61, 0x63,
|
||||
0x6b, 0x65, 0x74, 0x73, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x74, 0x78, 0x50, 0x61,
|
||||
0x63, 0x6b, 0x65, 0x74, 0x73, 0x12, 0x19, 0x0a, 0x08, 0x72, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65,
|
||||
0x73, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x72, 0x78, 0x42, 0x79, 0x74, 0x65, 0x73,
|
||||
0x12, 0x19, 0x0a, 0x08, 0x74, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x0d, 0x20, 0x01,
|
||||
0x28, 0x04, 0x52, 0x07, 0x74, 0x78, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x2c, 0x0a, 0x12, 0x73,
|
||||
0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69,
|
||||
0x64, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x10, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52,
|
||||
0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x64, 0x12, 0x28, 0x0a, 0x10, 0x64, 0x65, 0x73,
|
||||
0x74, 0x5f, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x0f, 0x20,
|
||||
0x01, 0x28, 0x0c, 0x52, 0x0e, 0x64, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63,
|
||||
0x65, 0x49, 0x64, 0x42, 0x11, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f,
|
||||
0x73, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x12, 0x3d, 0x0a, 0x0c, 0x77, 0x69,
|
||||
0x6e, 0x64, 0x6f, 0x77, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b,
|
||||
0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62,
|
||||
0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x0b, 0x77, 0x69,
|
||||
0x6e, 0x64, 0x6f, 0x77, 0x53, 0x74, 0x61, 0x72, 0x74, 0x12, 0x39, 0x0a, 0x0a, 0x77, 0x69, 0x6e,
|
||||
0x64, 0x6f, 0x77, 0x5f, 0x65, 0x6e, 0x64, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e,
|
||||
0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e,
|
||||
0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x77, 0x69, 0x6e, 0x64, 0x6f,
|
||||
0x77, 0x45, 0x6e, 0x64, 0x22, 0x4b, 0x0a, 0x0c, 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e,
|
||||
0x74, 0x41, 0x63, 0x6b, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64,
|
||||
0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12,
|
||||
0x20, 0x0a, 0x0b, 0x69, 0x73, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x18, 0x02,
|
||||
0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69, 0x73, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x6f,
|
||||
0x72, 0x22, 0x82, 0x05, 0x0a, 0x0a, 0x46, 0x6c, 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73,
|
||||
0x12, 0x17, 0x0a, 0x07, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28,
|
||||
0x0c, 0x52, 0x06, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x04, 0x74, 0x79, 0x70,
|
||||
0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0a, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54,
|
||||
0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12, 0x17, 0x0a, 0x07, 0x72, 0x75, 0x6c,
|
||||
0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x72, 0x75, 0x6c, 0x65,
|
||||
0x49, 0x64, 0x12, 0x2d, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18,
|
||||
0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x44, 0x69, 0x72,
|
||||
0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f,
|
||||
0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x18, 0x05, 0x20,
|
||||
0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x63, 0x6f, 0x6c, 0x12, 0x1b, 0x0a,
|
||||
0x09, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x70, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0c,
|
||||
0x52, 0x08, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x70, 0x12, 0x17, 0x0a, 0x07, 0x64, 0x65,
|
||||
0x73, 0x74, 0x5f, 0x69, 0x70, 0x18, 0x07, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x64, 0x65, 0x73,
|
||||
0x74, 0x49, 0x70, 0x12, 0x2d, 0x0a, 0x09, 0x70, 0x6f, 0x72, 0x74, 0x5f, 0x69, 0x6e, 0x66, 0x6f,
|
||||
0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x50, 0x6f,
|
||||
0x72, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x48, 0x00, 0x52, 0x08, 0x70, 0x6f, 0x72, 0x74, 0x49, 0x6e,
|
||||
0x66, 0x6f, 0x12, 0x2d, 0x0a, 0x09, 0x69, 0x63, 0x6d, 0x70, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18,
|
||||
0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x49, 0x43, 0x4d,
|
||||
0x50, 0x49, 0x6e, 0x66, 0x6f, 0x48, 0x00, 0x52, 0x08, 0x69, 0x63, 0x6d, 0x70, 0x49, 0x6e, 0x66,
|
||||
0x6f, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x78, 0x5f, 0x70, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x18,
|
||||
0x0a, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x72, 0x78, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x73,
|
||||
0x12, 0x1d, 0x0a, 0x0a, 0x74, 0x78, 0x5f, 0x70, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x18, 0x0b,
|
||||
0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x74, 0x78, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x12,
|
||||
0x19, 0x0a, 0x08, 0x72, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x0c, 0x20, 0x01, 0x28,
|
||||
0x04, 0x52, 0x07, 0x72, 0x78, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x19, 0x0a, 0x08, 0x74, 0x78,
|
||||
0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x74, 0x78,
|
||||
0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x2c, 0x0a, 0x12, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f,
|
||||
0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x0e, 0x20, 0x01, 0x28,
|
||||
0x0c, 0x52, 0x10, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63,
|
||||
0x65, 0x49, 0x64, 0x12, 0x28, 0x0a, 0x10, 0x64, 0x65, 0x73, 0x74, 0x5f, 0x72, 0x65, 0x73, 0x6f,
|
||||
0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x0e, 0x64,
|
||||
0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x64, 0x12, 0x22, 0x0a,
|
||||
0x0d, 0x6e, 0x75, 0x6d, 0x5f, 0x6f, 0x66, 0x5f, 0x73, 0x74, 0x61, 0x72, 0x74, 0x73, 0x18, 0x10,
|
||||
0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x6e, 0x75, 0x6d, 0x4f, 0x66, 0x53, 0x74, 0x61, 0x72, 0x74,
|
||||
0x73, 0x12, 0x1e, 0x0a, 0x0b, 0x6e, 0x75, 0x6d, 0x5f, 0x6f, 0x66, 0x5f, 0x65, 0x6e, 0x64, 0x73,
|
||||
0x18, 0x11, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x6e, 0x75, 0x6d, 0x4f, 0x66, 0x45, 0x6e, 0x64,
|
||||
0x73, 0x12, 0x20, 0x0a, 0x0c, 0x6e, 0x75, 0x6d, 0x5f, 0x6f, 0x66, 0x5f, 0x64, 0x72, 0x6f, 0x70,
|
||||
0x73, 0x18, 0x12, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0a, 0x6e, 0x75, 0x6d, 0x4f, 0x66, 0x44, 0x72,
|
||||
0x6f, 0x70, 0x73, 0x42, 0x11, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f,
|
||||
0x6e, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x22, 0x48, 0x0a, 0x08, 0x50, 0x6f, 0x72, 0x74, 0x49, 0x6e,
|
||||
0x66, 0x6f, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x6f, 0x72,
|
||||
0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50,
|
||||
@@ -683,17 +736,19 @@ var file_flow_proto_goTypes = []interface{}{
|
||||
var file_flow_proto_depIdxs = []int32{
|
||||
7, // 0: flow.FlowEvent.timestamp:type_name -> google.protobuf.Timestamp
|
||||
4, // 1: flow.FlowEvent.flow_fields:type_name -> flow.FlowFields
|
||||
0, // 2: flow.FlowFields.type:type_name -> flow.Type
|
||||
1, // 3: flow.FlowFields.direction:type_name -> flow.Direction
|
||||
5, // 4: flow.FlowFields.port_info:type_name -> flow.PortInfo
|
||||
6, // 5: flow.FlowFields.icmp_info:type_name -> flow.ICMPInfo
|
||||
2, // 6: flow.FlowService.Events:input_type -> flow.FlowEvent
|
||||
3, // 7: flow.FlowService.Events:output_type -> flow.FlowEventAck
|
||||
7, // [7:8] is the sub-list for method output_type
|
||||
6, // [6:7] is the sub-list for method input_type
|
||||
6, // [6:6] is the sub-list for extension type_name
|
||||
6, // [6:6] is the sub-list for extension extendee
|
||||
0, // [0:6] is the sub-list for field type_name
|
||||
7, // 2: flow.FlowEvent.window_start:type_name -> google.protobuf.Timestamp
|
||||
7, // 3: flow.FlowEvent.window_end:type_name -> google.protobuf.Timestamp
|
||||
0, // 4: flow.FlowFields.type:type_name -> flow.Type
|
||||
1, // 5: flow.FlowFields.direction:type_name -> flow.Direction
|
||||
5, // 6: flow.FlowFields.port_info:type_name -> flow.PortInfo
|
||||
6, // 7: flow.FlowFields.icmp_info:type_name -> flow.ICMPInfo
|
||||
2, // 8: flow.FlowService.Events:input_type -> flow.FlowEvent
|
||||
3, // 9: flow.FlowService.Events:output_type -> flow.FlowEventAck
|
||||
9, // [9:10] is the sub-list for method output_type
|
||||
8, // [8:9] is the sub-list for method input_type
|
||||
8, // [8:8] is the sub-list for extension type_name
|
||||
8, // [8:8] is the sub-list for extension extendee
|
||||
0, // [0:8] is the sub-list for field type_name
|
||||
}
|
||||
|
||||
func init() { file_flow_proto_init() }
|
||||
|
||||
@@ -24,6 +24,9 @@ message FlowEvent {
|
||||
FlowFields flow_fields = 4;
|
||||
|
||||
bool isInitiator = 5;
|
||||
|
||||
google.protobuf.Timestamp window_start = 6;
|
||||
google.protobuf.Timestamp window_end = 7;
|
||||
}
|
||||
|
||||
message FlowEventAck {
|
||||
@@ -75,6 +78,9 @@ message FlowFields {
|
||||
bytes source_resource_id = 14;
|
||||
bytes dest_resource_id = 15;
|
||||
|
||||
uint64 num_of_starts = 16;
|
||||
uint64 num_of_ends = 17;
|
||||
uint64 num_of_drops = 18;
|
||||
}
|
||||
|
||||
// Flow event types
|
||||
|
||||
@@ -1,4 +1,8 @@
|
||||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||
// versions:
|
||||
// - protoc-gen-go-grpc v1.6.1
|
||||
// - protoc v3.21.9
|
||||
// source: flow.proto
|
||||
|
||||
package proto
|
||||
|
||||
@@ -11,15 +15,19 @@ import (
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the grpc package it is being compiled against.
|
||||
// Requires gRPC-Go v1.32.0 or later.
|
||||
const _ = grpc.SupportPackageIsVersion7
|
||||
// Requires gRPC-Go v1.64.0 or later.
|
||||
const _ = grpc.SupportPackageIsVersion9
|
||||
|
||||
const (
|
||||
FlowService_Events_FullMethodName = "/flow.FlowService/Events"
|
||||
)
|
||||
|
||||
// FlowServiceClient is the client API for FlowService service.
|
||||
//
|
||||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||
type FlowServiceClient interface {
|
||||
// Client to receiver streams of events and acknowledgements
|
||||
Events(ctx context.Context, opts ...grpc.CallOption) (FlowService_EventsClient, error)
|
||||
Events(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[FlowEvent, FlowEventAck], error)
|
||||
}
|
||||
|
||||
type flowServiceClient struct {
|
||||
@@ -30,54 +38,40 @@ func NewFlowServiceClient(cc grpc.ClientConnInterface) FlowServiceClient {
|
||||
return &flowServiceClient{cc}
|
||||
}
|
||||
|
||||
func (c *flowServiceClient) Events(ctx context.Context, opts ...grpc.CallOption) (FlowService_EventsClient, error) {
|
||||
stream, err := c.cc.NewStream(ctx, &FlowService_ServiceDesc.Streams[0], "/flow.FlowService/Events", opts...)
|
||||
func (c *flowServiceClient) Events(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[FlowEvent, FlowEventAck], error) {
|
||||
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
|
||||
stream, err := c.cc.NewStream(ctx, &FlowService_ServiceDesc.Streams[0], FlowService_Events_FullMethodName, cOpts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &flowServiceEventsClient{stream}
|
||||
x := &grpc.GenericClientStream[FlowEvent, FlowEventAck]{ClientStream: stream}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
type FlowService_EventsClient interface {
|
||||
Send(*FlowEvent) error
|
||||
Recv() (*FlowEventAck, error)
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
type flowServiceEventsClient struct {
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
func (x *flowServiceEventsClient) Send(m *FlowEvent) error {
|
||||
return x.ClientStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *flowServiceEventsClient) Recv() (*FlowEventAck, error) {
|
||||
m := new(FlowEventAck)
|
||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||
type FlowService_EventsClient = grpc.BidiStreamingClient[FlowEvent, FlowEventAck]
|
||||
|
||||
// FlowServiceServer is the server API for FlowService service.
|
||||
// All implementations must embed UnimplementedFlowServiceServer
|
||||
// for forward compatibility
|
||||
// for forward compatibility.
|
||||
type FlowServiceServer interface {
|
||||
// Client to receiver streams of events and acknowledgements
|
||||
Events(FlowService_EventsServer) error
|
||||
Events(grpc.BidiStreamingServer[FlowEvent, FlowEventAck]) error
|
||||
mustEmbedUnimplementedFlowServiceServer()
|
||||
}
|
||||
|
||||
// UnimplementedFlowServiceServer must be embedded to have forward compatible implementations.
|
||||
type UnimplementedFlowServiceServer struct {
|
||||
}
|
||||
// UnimplementedFlowServiceServer must be embedded to have
|
||||
// forward compatible implementations.
|
||||
//
|
||||
// NOTE: this should be embedded by value instead of pointer to avoid a nil
|
||||
// pointer dereference when methods are called.
|
||||
type UnimplementedFlowServiceServer struct{}
|
||||
|
||||
func (UnimplementedFlowServiceServer) Events(FlowService_EventsServer) error {
|
||||
return status.Errorf(codes.Unimplemented, "method Events not implemented")
|
||||
func (UnimplementedFlowServiceServer) Events(grpc.BidiStreamingServer[FlowEvent, FlowEventAck]) error {
|
||||
return status.Error(codes.Unimplemented, "method Events not implemented")
|
||||
}
|
||||
func (UnimplementedFlowServiceServer) mustEmbedUnimplementedFlowServiceServer() {}
|
||||
func (UnimplementedFlowServiceServer) testEmbeddedByValue() {}
|
||||
|
||||
// UnsafeFlowServiceServer may be embedded to opt out of forward compatibility for this service.
|
||||
// Use of this interface is not recommended, as added methods to FlowServiceServer will
|
||||
@@ -87,34 +81,22 @@ type UnsafeFlowServiceServer interface {
|
||||
}
|
||||
|
||||
func RegisterFlowServiceServer(s grpc.ServiceRegistrar, srv FlowServiceServer) {
|
||||
// If the following call panics, it indicates UnimplementedFlowServiceServer was
|
||||
// embedded by pointer and is nil. This will cause panics if an
|
||||
// unimplemented method is ever invoked, so we test this at initialization
|
||||
// time to prevent it from happening at runtime later due to I/O.
|
||||
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
|
||||
t.testEmbeddedByValue()
|
||||
}
|
||||
s.RegisterService(&FlowService_ServiceDesc, srv)
|
||||
}
|
||||
|
||||
func _FlowService_Events_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
return srv.(FlowServiceServer).Events(&flowServiceEventsServer{stream})
|
||||
return srv.(FlowServiceServer).Events(&grpc.GenericServerStream[FlowEvent, FlowEventAck]{ServerStream: stream})
|
||||
}
|
||||
|
||||
type FlowService_EventsServer interface {
|
||||
Send(*FlowEventAck) error
|
||||
Recv() (*FlowEvent, error)
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
type flowServiceEventsServer struct {
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
func (x *flowServiceEventsServer) Send(m *FlowEventAck) error {
|
||||
return x.ServerStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *flowServiceEventsServer) Recv() (*FlowEvent, error) {
|
||||
m := new(FlowEvent)
|
||||
if err := x.ServerStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
|
||||
type FlowService_EventsServer = grpc.BidiStreamingServer[FlowEvent, FlowEventAck]
|
||||
|
||||
// FlowService_ServiceDesc is the grpc.ServiceDesc for FlowService service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
|
||||
@@ -10,6 +10,7 @@ fi
|
||||
|
||||
old_pwd=$(pwd)
|
||||
script_path=$(dirname $(realpath "$0"))
|
||||
echo "$script_path"
|
||||
cd "$script_path"
|
||||
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.26
|
||||
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.1
|
||||
|
||||
@@ -2765,6 +2765,28 @@ components:
|
||||
type: integer
|
||||
description: "Number of packets transmitted."
|
||||
example: 5
|
||||
num_of_starts:
|
||||
type: integer
|
||||
description: "Number of start events."
|
||||
example: 3
|
||||
num_of_ends:
|
||||
type: integer
|
||||
description: "Number of end events."
|
||||
example: 4
|
||||
num_of_drops:
|
||||
type: integer
|
||||
description: "Number of drop events."
|
||||
example: 5
|
||||
window_start:
|
||||
type: string
|
||||
format: date-time
|
||||
description: Timestamp of the start of the aggregation window.
|
||||
example: 2025-03-20T16:23:58.125397Z
|
||||
window_end:
|
||||
type: string
|
||||
format: date-time
|
||||
description: Timestamp of the end of the aggregation window.
|
||||
example: 2025-03-20T16:23:58.125397Z
|
||||
events:
|
||||
type: array
|
||||
description: "List of events that are correlated to this flow (e.g., start, end)."
|
||||
@@ -2786,6 +2808,11 @@ components:
|
||||
- rx_packets
|
||||
- tx_bytes
|
||||
- tx_packets
|
||||
- num_of_starts
|
||||
- num_of_ends
|
||||
- num_of_drops
|
||||
- window_start
|
||||
- window_end
|
||||
- events
|
||||
NetworkTrafficEventsResponse:
|
||||
type: object
|
||||
|
||||
@@ -2905,9 +2905,18 @@ type NetworkTrafficEvent struct {
|
||||
Events []NetworkTrafficSubEvent `json:"events"`
|
||||
|
||||
// FlowId FlowID is the ID of the connection flow. Not unique because it can be the same for multiple events (e.g., start and end of the connection).
|
||||
FlowId string `json:"flow_id"`
|
||||
Icmp NetworkTrafficICMP `json:"icmp"`
|
||||
Policy NetworkTrafficPolicy `json:"policy"`
|
||||
FlowId string `json:"flow_id"`
|
||||
Icmp NetworkTrafficICMP `json:"icmp"`
|
||||
|
||||
// NumOfDrops Number of drop events.
|
||||
NumOfDrops int `json:"num_of_drops"`
|
||||
|
||||
// NumOfEnds Number of end events.
|
||||
NumOfEnds int `json:"num_of_ends"`
|
||||
|
||||
// NumOfStarts Number of start events.
|
||||
NumOfStarts int `json:"num_of_starts"`
|
||||
Policy NetworkTrafficPolicy `json:"policy"`
|
||||
|
||||
// Protocol Protocol is the protocol of the traffic (e.g. 1 = ICMP, 6 = TCP, 17 = UDP, etc.).
|
||||
Protocol int `json:"protocol"`
|
||||
@@ -2928,6 +2937,12 @@ type NetworkTrafficEvent struct {
|
||||
// TxPackets Number of packets transmitted.
|
||||
TxPackets int `json:"tx_packets"`
|
||||
User NetworkTrafficUser `json:"user"`
|
||||
|
||||
// WindowEnd Timestamp of the end of the aggregation window.
|
||||
WindowEnd time.Time `json:"window_end"`
|
||||
|
||||
// WindowStart Timestamp of the start of the aggregation window.
|
||||
WindowStart time.Time `json:"window_start"`
|
||||
}
|
||||
|
||||
// NetworkTrafficEventsResponse defines model for NetworkTrafficEventsResponse.
|
||||
|
||||
Reference in New Issue
Block a user