Compare commits

...

12 Commits

Author SHA1 Message Date
Dmitri
b21f7f7d6a updated event aggregation test
Signed-off-by: Dmitri <dmitri.external@netbird.io>
2026-06-12 15:14:17 +02:00
Dmitri
98ce097ecb update test to validate event aggregation over tcp, udp, icmp, and icmpv6
Signed-off-by: Dmitri <dmitri.external@netbird.io>
2026-06-11 15:34:03 +02:00
Dmitri
598558c77e Merge remote-tracking branch 'origin/main' into dmitri-event-aggregation 2026-06-11 13:32:28 +02:00
Dmitri
d9d585e1d4 pacifying linter
Signed-off-by: Dmitri <dmitri.external@netbird.io>
2026-06-11 12:19:58 +02:00
Dmitri
a593e32a1d removed inadvertenly added google proto files
Signed-off-by: Dmitri <dmitri.external@netbird.io>
2026-06-11 12:07:29 +02:00
Dmitri
12a8943b99 regenerated proto files
Signed-off-by: Dmitri <dmitri.external@netbird.io>
2026-06-11 12:03:46 +02:00
Dmitri
42e0007f4a fixes based on sonarcube checks
Signed-off-by: Dmitri <dmitri.external@netbird.io>
2026-06-11 10:18:08 +02:00
Dmitri
8f99362a25 added tracking of the number of start-, drop, and end-events in an aggregation window
Signed-off-by: Dmitri <dmitri.external@netbird.io>
2026-06-10 16:06:29 +02:00
Dmitri
101ae3ca77 added manager integration test
Signed-off-by: Dmitri <dmitri.external@netbird.io>
2026-06-10 14:48:58 +02:00
Dmitri
b654a75a43 added tcp-aggregation test
Signed-off-by: Dmitri <dmitri.external@netbird.io>
2026-06-10 10:33:37 +02:00
Dmitri
243e93477f initial support for aggregation of events
Signed-off-by: Dmitri <dmitri.external@netbird.io>
2026-06-09 15:54:39 +02:00
Dmitri
60bcf7dfc3 added an implementation of aggregating memory store
Signed-off-by: Dmitri <dmitri.external@netbird.io>
2026-06-09 11:38:02 +02:00
10 changed files with 847 additions and 296 deletions

View File

@@ -27,7 +27,7 @@ type Logger struct {
wgIfaceNetV6 netip.Prefix
dnsCollection atomic.Bool
exitNodeCollection atomic.Bool
Store types.Store
Store types.AggregatingStore
}
func New(statusRecorder *peer.Status, wgIfaceIPNet, wgIfaceIPNetV6 netip.Prefix) *Logger {
@@ -35,7 +35,7 @@ func New(statusRecorder *peer.Status, wgIfaceIPNet, wgIfaceIPNetV6 netip.Prefix)
statusRecorder: statusRecorder,
wgIfaceNet: wgIfaceIPNet,
wgIfaceNetV6: wgIfaceIPNetV6,
Store: store.NewMemoryStore(),
Store: store.NewAggregatingMemoryStore(),
}
}
@@ -125,6 +125,10 @@ func (l *Logger) stop() {
l.mux.Unlock()
}
func (l *Logger) ResetAggregationWindow() types.FlowEventAggregator {
return l.Store.ResetAggregationWindow()
}
func (l *Logger) GetEvents() []*types.Event {
return l.Store.GetEvents()
}

View File

@@ -9,12 +9,14 @@ import (
"sync"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/netbirdio/netbird/client/internal/netflow/conntrack"
"github.com/netbirdio/netbird/client/internal/netflow/logger"
"github.com/netbirdio/netbird/client/internal/netflow/store"
nftypes "github.com/netbirdio/netbird/client/internal/netflow/types"
"github.com/netbirdio/netbird/client/internal/peer"
"github.com/netbirdio/netbird/flow/client"
@@ -23,14 +25,16 @@ import (
// Manager handles netflow tracking and logging
type Manager struct {
mux sync.Mutex
shutdownWg sync.WaitGroup
logger nftypes.FlowLogger
flowConfig *nftypes.FlowConfig
conntrack nftypes.ConnTracker
receiverClient *client.GRPCClient
publicKey []byte
cancel context.CancelFunc
mux sync.Mutex
shutdownWg sync.WaitGroup
logger nftypes.FlowLogger
flowConfig *nftypes.FlowConfig
conntrack nftypes.ConnTracker
receiverClient *client.GRPCClient
eventsWithoutAcks nftypes.Store
publicKey []byte
cancel context.CancelFunc
retryInterval time.Duration
}
// NewManager creates a new netflow manager
@@ -48,9 +52,11 @@ func NewManager(iface nftypes.IFaceMapper, publicKey []byte, statusRecorder *pee
}
return &Manager{
logger: flowLogger,
conntrack: ct,
publicKey: publicKey,
logger: flowLogger,
conntrack: ct,
publicKey: publicKey,
retryInterval: time.Second,
eventsWithoutAcks: store.NewMemoryStore(),
}
}
@@ -107,7 +113,7 @@ func (m *Manager) resetClient() error {
ctx, cancel := context.WithCancel(context.Background())
m.cancel = cancel
m.shutdownWg.Add(2)
m.shutdownWg.Add(3)
go func() {
defer m.shutdownWg.Done()
m.receiveACKs(ctx, flowClient)
@@ -116,6 +122,10 @@ func (m *Manager) resetClient() error {
defer m.shutdownWg.Done()
m.startSender(ctx)
}()
go func() {
defer m.shutdownWg.Done()
m.startRetries(ctx)
}()
return nil
}
@@ -207,13 +217,15 @@ func (m *Manager) startSender(ctx context.Context) {
case <-ctx.Done():
return
case <-ticker.C:
events := m.logger.GetEvents()
collectedEvents := m.logger.ResetAggregationWindow()
events := collectedEvents.GetAggregatedEvents()
for _, event := range events {
if err := m.send(event); err != nil {
log.Errorf("failed to send flow event to server: %v", err)
continue
} else {
log.Tracef("sent flow event: %s", event.ID)
}
log.Tracef("sent flow event: %s", event.ID)
m.eventsWithoutAcks.StoreEvent(event)
}
}
}
@@ -227,7 +239,7 @@ func (m *Manager) receiveACKs(ctx context.Context, client *client.GRPCClient) {
return nil
}
log.Tracef("received flow event ack: %s", id)
m.logger.DeleteEvents([]uuid.UUID{id})
m.eventsWithoutAcks.DeleteEvents([]uuid.UUID{id})
return nil
})
@@ -236,6 +248,41 @@ func (m *Manager) receiveACKs(ctx context.Context, client *client.GRPCClient) {
}
}
func (m *Manager) startRetries(ctx context.Context) {
ticker := time.NewTimer(m.retryInterval)
retryBackoff := backoff.WithContext(&backoff.ExponentialBackOff{
InitialInterval: 1 * time.Second,
RandomizationFactor: 0.5,
Multiplier: 1.7,
MaxInterval: m.flowConfig.Interval / 2,
MaxElapsedTime: 3 * 30 * 24 * time.Hour, // 3 months
Stop: backoff.Stop,
Clock: backoff.SystemClock,
}, ctx)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
for _, e := range m.eventsWithoutAcks.GetEvents() {
if e.Timestamp.Add(time.Second).After(time.Now()) {
// grace period on retries to avoid early retries
// do not retry if the event is less than 1 sec old
continue
}
if err := m.send(e); err != nil {
ticker = time.NewTimer(retryBackoff.NextBackOff()) //nolint:staticcheck,wastedassign
break
}
}
retryBackoff.Reset()
ticker = time.NewTimer(time.Second)
}
}
}
func (m *Manager) send(event *nftypes.Event) error {
m.mux.Lock()
client := m.receiverClient

View File

@@ -0,0 +1,291 @@
package netflow
import (
"context"
"errors"
"fmt"
"net"
"net/netip"
"slices"
"testing"
"time"
"github.com/google/uuid"
"github.com/netbirdio/netbird/client/iface/wgaddr"
"github.com/netbirdio/netbird/client/internal/netflow/types"
"github.com/netbirdio/netbird/flow/proto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
)
type testServer struct {
proto.UnimplementedFlowServiceServer
events chan *proto.FlowEvent
acks chan *proto.FlowEventAck
grpcSrv *grpc.Server
addr string
handlerDone chan struct{} // signaled each time Events() exits
handlerStarted chan struct{} // signaled each time Events() begins
}
func newTestServer(t *testing.T) *testServer {
listener, err := net.Listen("tcp", "127.0.0.1:0")
require.NoError(t, err)
s := &testServer{
events: make(chan *proto.FlowEvent, 100),
acks: make(chan *proto.FlowEventAck, 100),
grpcSrv: grpc.NewServer(),
addr: listener.Addr().String(),
handlerDone: make(chan struct{}, 10),
handlerStarted: make(chan struct{}, 10),
}
proto.RegisterFlowServiceServer(s.grpcSrv, s)
go func() {
if err := s.grpcSrv.Serve(listener); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
t.Logf("server error: %v", err)
}
}()
t.Cleanup(func() {
s.grpcSrv.Stop()
})
return s
}
func (s *testServer) Events(stream proto.FlowService_EventsServer) error {
defer func() {
select {
case s.handlerDone <- struct{}{}:
default:
}
}()
err := stream.Send(&proto.FlowEventAck{IsInitiator: true})
if err != nil {
return err
}
select {
case s.handlerStarted <- struct{}{}:
default:
}
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
go func() {
defer cancel()
for {
event, err := stream.Recv()
if err != nil {
return
}
if !event.IsInitiator {
select {
case s.events <- event:
case <-ctx.Done():
return
}
}
}
}()
for {
select {
case ack := <-s.acks:
if err := stream.Send(ack); err != nil {
return err
}
case <-ctx.Done():
return ctx.Err()
}
}
}
func TestSendEventReceiveAck(t *testing.T) {
_, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
server := newTestServer(t)
manager := createManager(t, server.addr, 60*time.Second) // set high to prevent retries in this test
defer manager.Close()
assert.Eventually(t, func() bool {
select {
case <-server.handlerStarted:
return true
default:
return false
}
}, 3*time.Second, 100*time.Millisecond)
event1 := types.EventFields{
FlowID: uuid.New(),
Type: types.TypeStart,
Direction: types.Ingress,
DestIP: ipAddr("172.16.1.2"),
DestPort: 2345,
Protocol: 6,
}
manager.logger.StoreEvent(event1)
event2 := types.EventFields{
FlowID: uuid.New(),
Type: types.TypeStart,
Direction: types.Ingress,
DestIP: ipAddr("172.16.1.1"),
DestPort: 1234,
Protocol: 6,
}
manager.logger.StoreEvent(event2)
// verify the server received logged events
serverSideEvents := make([]*proto.FlowEvent, 0)
assert.Eventually(t, func() bool {
select {
case event := <-server.events:
serverSideEvents = append(serverSideEvents, event)
if len(serverSideEvents) == 2 {
return true
}
default:
if len(serverSideEvents) == 2 {
return true
}
}
return false
}, 5*time.Second, 100*time.Millisecond)
serverSideFlowIds := make([]uuid.UUID, 0, 2)
slices.Values(serverSideEvents)(func(e *proto.FlowEvent) bool {
id, err := uuid.FromBytes(e.FlowFields.FlowId)
assert.NoError(t, err)
serverSideFlowIds = append(serverSideFlowIds, id)
return true
})
assert.ElementsMatch(t, []uuid.UUID{event1.FlowID, event2.FlowID}, serverSideFlowIds)
// verify the manager tracks un-acked events
unackedEvents := manager.eventsWithoutAcks.GetEvents()
assert.Len(t, unackedEvents, 2)
flowIds := make([]uuid.UUID, 0)
slices.Values(unackedEvents)(func(e *types.Event) bool {
flowIds = append(flowIds, e.FlowID)
return true
})
assert.ElementsMatch(t, flowIds, []uuid.UUID{event1.FlowID, event2.FlowID})
}
// verify handling of retries:
// - unacked events are retried
// - when acks arrive, events are removed from the un-acked event tracker
func TestRetryEvents(t *testing.T) {
_, cancel := context.WithTimeout(context.Background(), 10*time.Second)
t.Cleanup(cancel)
server := newTestServer(t)
manager := createManager(t, server.addr, time.Second) // set low to start retries sooner
defer manager.Close()
assert.Eventually(t, func() bool {
select {
case <-server.handlerStarted:
return true
default:
return false
}
}, 3*time.Second, 100*time.Millisecond)
event1 := types.EventFields{
FlowID: uuid.New(),
Type: types.TypeStart,
Direction: types.Ingress,
DestIP: ipAddr("172.16.1.2"),
DestPort: 2345,
Protocol: 6,
}
manager.logger.StoreEvent(event1)
event2 := types.EventFields{
FlowID: uuid.New(),
Type: types.TypeStart,
Direction: types.Ingress,
DestIP: ipAddr("172.16.1.1"),
DestPort: 1234,
Protocol: 6,
}
manager.logger.StoreEvent(event2)
// verify the server received retries of logged events
serverSideEvents := make([]*proto.FlowEvent, 0)
func() {
c := time.After(2500 * time.Millisecond)
for {
select {
case event := <-server.events:
serverSideEvents = append(serverSideEvents, event)
case <-c:
return
}
}
}()
assert.True(t, len(serverSideEvents) > 2) // must see retries
uniqueServerSideEvents := make(map[uuid.UUID]*proto.FlowEvent)
slices.Values(serverSideEvents)(func(e *proto.FlowEvent) bool {
id, err := uuid.FromBytes(e.FlowFields.FlowId)
assert.NoError(t, err)
uniqueServerSideEvents[id] = e
return true
})
assert.Contains(t, uniqueServerSideEvents, event1.FlowID)
assert.Contains(t, uniqueServerSideEvents, event2.FlowID)
// ack events
server.acks <- &proto.FlowEventAck{EventId: uniqueServerSideEvents[event1.FlowID].EventId}
server.acks <- &proto.FlowEventAck{EventId: uniqueServerSideEvents[event2.FlowID].EventId}
assert.EventuallyWithT(t, func(c *assert.CollectT) {
unackedEvents := manager.eventsWithoutAcks.GetEvents()
assert.Empty(c, unackedEvents)
}, 3*time.Second, 100*time.Millisecond)
}
func createManager(t *testing.T, serverAddr string, retryInterval time.Duration) *Manager {
t.Helper()
mockIFace := &mockIFaceMapper{
address: wgaddr.Address{
Network: netip.MustParsePrefix("192.168.1.1/32"),
},
isUserspaceBind: true,
}
publicKey := []byte("test-public-key")
manager := NewManager(mockIFace, publicKey, nil)
manager.retryInterval = retryInterval
initialConfig := &types.FlowConfig{
Enabled: true,
URL: fmt.Sprintf("http://%s", serverAddr),
TokenPayload: "initial-payload",
TokenSignature: "initial-signature",
Interval: 500 * time.Millisecond,
}
err := manager.Update(initialConfig)
require.NoError(t, err)
return manager
}
func ipAddr(a string) netip.Addr {
addr, _ := netip.ParseAddr(a)
return addr
}

View File

@@ -0,0 +1,190 @@
package store
import (
"math/rand"
"net/netip"
"testing"
"time"
"github.com/google/uuid"
"github.com/netbirdio/netbird/client/internal/netflow/types"
"github.com/stretchr/testify/assert"
)
var random = rand.New(rand.NewSource(time.Now().UnixNano()))
func TestFlowAggregation(t *testing.T) {
var protocols = []types.Protocol{types.ICMP, types.ICMPv6, types.TCP, types.UDP}
var tests = []struct {
description string
eventTypes []types.Type
}{
{
description: "start and stop",
eventTypes: []types.Type{types.TypeStart, types.TypeEnd},
},
{
description: "start and drop",
eventTypes: []types.Type{types.TypeStart, types.TypeDrop},
},
{
description: "start only",
eventTypes: []types.Type{types.TypeStart},
},
{
description: "drop only",
eventTypes: []types.Type{types.TypeDrop},
}}
for _, protocol := range protocols {
for _, tt := range tests {
t.Run(tt.description+" "+protocol.String(), func(t *testing.T) {
store := NewAggregatingMemoryStore()
allExpected := make([]*types.Event, 0)
for i := 0; i < 2; i++ {
inEvents, expected := generateEvents(tt.eventTypes, protocol, types.Ingress, 0)
for _, e := range inEvents {
store.StoreEvent(e)
}
allExpected = append(allExpected, expected)
}
events := store.GetAggregatedEvents()
assert.ElementsMatch(t, events, allExpected)
})
}
}
}
func TestIcmpEventAggregation(t *testing.T) {
var protocols = []types.Protocol{types.ICMP, types.ICMPv6}
var icmpTypes = []uint8{1, 2, 3}
var tests = []struct {
description string
eventTypes []types.Type
}{
{
description: "start and stop",
eventTypes: []types.Type{types.TypeStart, types.TypeEnd},
},
{
description: "start and drop",
eventTypes: []types.Type{types.TypeStart, types.TypeDrop},
},
{
description: "start only",
eventTypes: []types.Type{types.TypeStart},
},
{
description: "drop only",
eventTypes: []types.Type{types.TypeDrop},
}}
for _, protocol := range protocols {
for _, tt := range tests {
t.Run(tt.description+" "+protocol.String(), func(t *testing.T) {
store := NewAggregatingMemoryStore()
allExpected := make([]*types.Event, 0)
for _, icmpType := range icmpTypes {
events, expected := generateEvents(tt.eventTypes, protocol, types.Ingress, icmpType)
for _, e := range events {
store.StoreEvent(e)
}
allExpected = append(allExpected, expected)
}
aggregatedEvents := store.GetAggregatedEvents()
assert.Len(t, aggregatedEvents, len(allExpected))
assert.ElementsMatch(t, aggregatedEvents, allExpected)
})
}
}
}
func ipAddr(a string) netip.Addr {
addr, _ := netip.ParseAddr(a)
return addr
}
func generateEvents(eventTypes []types.Type, protocol types.Protocol, direction types.Direction, icmpType uint8) ([]*types.Event, *types.Event) {
var rxPackets, txPackets, rxBytes, txBytes uint64
inEvents := make([]*types.Event, 0)
ts := time.Now()
flowId := uuid.New()
srcIp := ipAddr("1.1.1.1")
srcPort := uint16(random.Uint32() >> 16)
dstIp := ipAddr("2.2.2.2")
dstPort := uint16(random.Uint32() >> 16)
for idx, eventType := range eventTypes {
e := &types.Event{
ID: uuid.New(),
Timestamp: ts.Add(time.Duration(idx) * time.Second),
EventFields: types.EventFields{
FlowID: flowId,
Type: eventType,
Protocol: protocol,
RuleID: []byte("rule-id-1"),
Direction: direction,
SourceIP: srcIp,
SourcePort: srcPort,
DestIP: dstIp,
DestPort: dstPort,
SourceResourceID: []byte("source-resource-id"),
DestResourceID: []byte("dest-resource-id"),
RxPackets: random.Uint64(),
TxPackets: random.Uint64(),
RxBytes: random.Uint64(),
TxBytes: random.Uint64(),
}}
rxBytes += e.RxBytes
txBytes += e.TxBytes
rxPackets += e.RxPackets
txPackets += e.TxPackets
inEvents = append(inEvents, e)
if protocol == types.ICMP || protocol == types.ICMPv6 {
e.ICMPType = icmpType
}
}
var start, end, drop uint64
for _, eventType := range eventTypes {
switch eventType {
case types.TypeStart:
start += 1
case types.TypeDrop:
drop += 1
case types.TypeEnd:
end += 1
}
}
aggregatedEvent := &types.Event{
ID: inEvents[0].ID,
Timestamp: inEvents[0].Timestamp,
EventFields: types.EventFields{
FlowID: flowId,
Type: inEvents[0].Type,
Protocol: inEvents[0].Protocol,
RuleID: []byte("rule-id-1"),
Direction: inEvents[0].Direction,
SourceIP: srcIp,
SourcePort: srcPort,
DestIP: dstIp,
DestPort: dstPort,
SourceResourceID: []byte("source-resource-id"),
DestResourceID: []byte("dest-resource-id"),
RxPackets: rxPackets,
TxPackets: txPackets,
RxBytes: rxBytes,
TxBytes: txBytes,
NumOfStarts: start,
NumOfEnds: end,
NumOfDrops: drop,
}}
if protocol == types.ICMP || protocol == types.ICMPv6 {
aggregatedEvent.ICMPType = icmpType
}
return inEvents, aggregatedEvent
}

View File

@@ -1,10 +1,13 @@
package store
import (
"maps"
"net/netip"
"slices"
"sync"
"time"
"github.com/google/uuid"
"github.com/netbirdio/netbird/client/internal/netflow/types"
)
@@ -19,6 +22,10 @@ type Memory struct {
events map[uuid.UUID]*types.Event
}
type AggregatingMemory struct {
Memory
}
func (m *Memory) StoreEvent(event *types.Event) {
m.mux.Lock()
defer m.mux.Unlock()
@@ -48,3 +55,78 @@ func (m *Memory) DeleteEvents(ids []uuid.UUID) {
delete(m.events, id)
}
}
func NewAggregatingMemoryStore() *AggregatingMemory {
return &AggregatingMemory{Memory{events: make(map[uuid.UUID]*types.Event)}}
}
func (am *AggregatingMemory) ResetAggregationWindow() types.FlowEventAggregator {
am.mux.Lock()
defer am.mux.Unlock()
toret := AggregatingMemory{Memory: Memory{events: am.events}}
am.events = make(map[uuid.UUID]*types.Event)
return &toret
}
type aggregationKey struct {
destAddr netip.Addr
destPort uint16
protocol uint8
icmpType uint8
unique int64 // used to prevent aggregation on non icmp/udp/tcp events
}
func (am *AggregatingMemory) GetAggregatedEvents() []*types.Event {
aggregated := make(map[aggregationKey]*types.Event)
for _, v := range am.events {
lookupKey := aggregationKey{destAddr: v.DestIP, destPort: v.DestPort, protocol: uint8(v.Protocol), icmpType: v.ICMPType}
if _, ok := aggregated[lookupKey]; !ok {
aggregated[lookupKey] = v.Clone()
event := aggregated[lookupKey]
if event.Protocol != types.ICMP && event.Protocol != types.ICMPv6 && event.Protocol != types.UDP && event.Protocol != types.TCP {
lookupKey.unique = time.Now().UnixNano() // to make the lookup key unique so we don't aggregate on it
continue
}
switch event.Type {
case types.TypeStart:
event.NumOfStarts += 1
case types.TypeDrop:
event.NumOfDrops += 1
case types.TypeEnd:
event.NumOfEnds += 1
}
continue
}
aggregatedEvent := aggregated[lookupKey]
if aggregatedEvent.Protocol != types.ICMP && aggregatedEvent.Protocol != types.ICMPv6 && aggregatedEvent.Protocol != types.UDP && aggregatedEvent.Protocol != types.TCP {
continue // we don't aggregate this type of events; shouldn't ever get here
}
// track the number of connections, duration?, open and close events?
aggregatedEvent.RxBytes += v.RxBytes
aggregatedEvent.RxPackets += v.RxPackets
aggregatedEvent.TxBytes += v.TxBytes
aggregatedEvent.TxPackets += v.TxPackets
switch v.Type {
case types.TypeStart:
aggregatedEvent.NumOfStarts += 1
case types.TypeDrop:
aggregatedEvent.NumOfDrops += 1
case types.TypeEnd:
aggregatedEvent.NumOfEnds += 1
}
if aggregatedEvent.Timestamp.Compare(v.Timestamp) > 0 {
aggregatedEvent.Timestamp = v.Timestamp
aggregatedEvent.ID = v.ID
aggregatedEvent.Type = v.Type
}
// do we aggregate icmp by code?
}
return slices.Collect(maps.Values(aggregated)) // could return an iterator instead here
}

View File

@@ -2,6 +2,7 @@ package types
import (
"net/netip"
"slices"
"strconv"
"time"
@@ -92,6 +93,17 @@ type EventFields struct {
TxPackets uint64
RxBytes uint64
TxBytes uint64
NumOfStarts uint64
NumOfEnds uint64
NumOfDrops uint64
}
func (e *Event) Clone() *Event {
toret := *e
toret.RuleID = slices.Clone(e.RuleID)
toret.SourceResourceID = slices.Clone(e.SourceResourceID)
toret.DestResourceID = slices.Clone(e.DestResourceID)
return &toret
}
type FlowConfig struct {
@@ -114,13 +126,15 @@ type FlowManager interface {
GetLogger() FlowLogger
}
type FlowEventAggregator interface {
ResetAggregationWindow() FlowEventAggregator
GetAggregatedEvents() []*Event
}
type FlowLogger interface {
ResetAggregationWindow() FlowEventAggregator
// StoreEvent stores a flow event
StoreEvent(flowEvent EventFields)
// GetEvents returns all stored events
GetEvents() []*Event
// DeleteEvents deletes events from the store
DeleteEvents([]uuid.UUID)
// Close closes the logger
Close()
// Enable enables the flow logger receiver
@@ -140,6 +154,11 @@ type Store interface {
Close()
}
type AggregatingStore interface {
FlowEventAggregator
Store
}
// ConnTracker defines the interface for connection tracking functionality
type ConnTracker interface {
// Start begins tracking connections by listening for conntrack events.

View File

@@ -1,7 +1,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.26.0
// protoc v3.21.9
// protoc-gen-go v1.36.11
// protoc v7.34.1
// source: flow.proto
package proto
@@ -12,6 +12,7 @@ import (
timestamppb "google.golang.org/protobuf/types/known/timestamppb"
reflect "reflect"
sync "sync"
unsafe "unsafe"
)
const (
@@ -125,27 +126,24 @@ func (Direction) EnumDescriptor() ([]byte, []int) {
}
type FlowEvent struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// Unique client event identifier
EventId []byte `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"`
// When the event occurred
Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// Public key of the sending peer
PublicKey []byte `protobuf:"bytes,3,opt,name=public_key,json=publicKey,proto3" json:"public_key,omitempty"`
FlowFields *FlowFields `protobuf:"bytes,4,opt,name=flow_fields,json=flowFields,proto3" json:"flow_fields,omitempty"`
IsInitiator bool `protobuf:"varint,5,opt,name=isInitiator,proto3" json:"isInitiator,omitempty"`
PublicKey []byte `protobuf:"bytes,3,opt,name=public_key,json=publicKey,proto3" json:"public_key,omitempty"`
FlowFields *FlowFields `protobuf:"bytes,4,opt,name=flow_fields,json=flowFields,proto3" json:"flow_fields,omitempty"`
IsInitiator bool `protobuf:"varint,5,opt,name=isInitiator,proto3" json:"isInitiator,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *FlowEvent) Reset() {
*x = FlowEvent{}
if protoimpl.UnsafeEnabled {
mi := &file_flow_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_flow_proto_msgTypes[0]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *FlowEvent) String() string {
@@ -156,7 +154,7 @@ func (*FlowEvent) ProtoMessage() {}
func (x *FlowEvent) ProtoReflect() protoreflect.Message {
mi := &file_flow_proto_msgTypes[0]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -207,22 +205,19 @@ func (x *FlowEvent) GetIsInitiator() bool {
}
type FlowEventAck struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// Unique client event identifier that has been ack'ed
EventId []byte `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"`
IsInitiator bool `protobuf:"varint,2,opt,name=isInitiator,proto3" json:"isInitiator,omitempty"`
EventId []byte `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"`
IsInitiator bool `protobuf:"varint,2,opt,name=isInitiator,proto3" json:"isInitiator,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *FlowEventAck) Reset() {
*x = FlowEventAck{}
if protoimpl.UnsafeEnabled {
mi := &file_flow_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_flow_proto_msgTypes[1]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *FlowEventAck) String() string {
@@ -233,7 +228,7 @@ func (*FlowEventAck) ProtoMessage() {}
func (x *FlowEventAck) ProtoReflect() protoreflect.Message {
mi := &file_flow_proto_msgTypes[1]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -263,10 +258,7 @@ func (x *FlowEventAck) GetIsInitiator() bool {
}
type FlowFields struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
state protoimpl.MessageState `protogen:"open.v1"`
// Unique client flow session identifier
FlowId []byte `protobuf:"bytes,1,opt,name=flow_id,json=flowId,proto3" json:"flow_id,omitempty"`
// Flow type
@@ -283,7 +275,7 @@ type FlowFields struct {
DestIp []byte `protobuf:"bytes,7,opt,name=dest_ip,json=destIp,proto3" json:"dest_ip,omitempty"`
// Layer 4 -specific information
//
// Types that are assignable to ConnectionInfo:
// Types that are valid to be assigned to ConnectionInfo:
//
// *FlowFields_PortInfo
// *FlowFields_IcmpInfo
@@ -297,15 +289,18 @@ type FlowFields struct {
// Resource ID
SourceResourceId []byte `protobuf:"bytes,14,opt,name=source_resource_id,json=sourceResourceId,proto3" json:"source_resource_id,omitempty"`
DestResourceId []byte `protobuf:"bytes,15,opt,name=dest_resource_id,json=destResourceId,proto3" json:"dest_resource_id,omitempty"`
NumOfStarts uint64 `protobuf:"varint,16,opt,name=num_of_starts,json=numOfStarts,proto3" json:"num_of_starts,omitempty"`
NumOfEnds uint64 `protobuf:"varint,17,opt,name=num_of_ends,json=numOfEnds,proto3" json:"num_of_ends,omitempty"`
NumOfDrops uint64 `protobuf:"varint,18,opt,name=num_of_drops,json=numOfDrops,proto3" json:"num_of_drops,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *FlowFields) Reset() {
*x = FlowFields{}
if protoimpl.UnsafeEnabled {
mi := &file_flow_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_flow_proto_msgTypes[2]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *FlowFields) String() string {
@@ -316,7 +311,7 @@ func (*FlowFields) ProtoMessage() {}
func (x *FlowFields) ProtoReflect() protoreflect.Message {
mi := &file_flow_proto_msgTypes[2]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -380,23 +375,27 @@ func (x *FlowFields) GetDestIp() []byte {
return nil
}
func (m *FlowFields) GetConnectionInfo() isFlowFields_ConnectionInfo {
if m != nil {
return m.ConnectionInfo
func (x *FlowFields) GetConnectionInfo() isFlowFields_ConnectionInfo {
if x != nil {
return x.ConnectionInfo
}
return nil
}
func (x *FlowFields) GetPortInfo() *PortInfo {
if x, ok := x.GetConnectionInfo().(*FlowFields_PortInfo); ok {
return x.PortInfo
if x != nil {
if x, ok := x.ConnectionInfo.(*FlowFields_PortInfo); ok {
return x.PortInfo
}
}
return nil
}
func (x *FlowFields) GetIcmpInfo() *ICMPInfo {
if x, ok := x.GetConnectionInfo().(*FlowFields_IcmpInfo); ok {
return x.IcmpInfo
if x != nil {
if x, ok := x.ConnectionInfo.(*FlowFields_IcmpInfo); ok {
return x.IcmpInfo
}
}
return nil
}
@@ -443,6 +442,27 @@ func (x *FlowFields) GetDestResourceId() []byte {
return nil
}
func (x *FlowFields) GetNumOfStarts() uint64 {
if x != nil {
return x.NumOfStarts
}
return 0
}
func (x *FlowFields) GetNumOfEnds() uint64 {
if x != nil {
return x.NumOfEnds
}
return 0
}
func (x *FlowFields) GetNumOfDrops() uint64 {
if x != nil {
return x.NumOfDrops
}
return 0
}
type isFlowFields_ConnectionInfo interface {
isFlowFields_ConnectionInfo()
}
@@ -463,21 +483,18 @@ func (*FlowFields_IcmpInfo) isFlowFields_ConnectionInfo() {}
// TCP/UDP port information
type PortInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
SourcePort uint32 `protobuf:"varint,1,opt,name=source_port,json=sourcePort,proto3" json:"source_port,omitempty"`
DestPort uint32 `protobuf:"varint,2,opt,name=dest_port,json=destPort,proto3" json:"dest_port,omitempty"`
unknownFields protoimpl.UnknownFields
SourcePort uint32 `protobuf:"varint,1,opt,name=source_port,json=sourcePort,proto3" json:"source_port,omitempty"`
DestPort uint32 `protobuf:"varint,2,opt,name=dest_port,json=destPort,proto3" json:"dest_port,omitempty"`
sizeCache protoimpl.SizeCache
}
func (x *PortInfo) Reset() {
*x = PortInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_flow_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_flow_proto_msgTypes[3]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *PortInfo) String() string {
@@ -488,7 +505,7 @@ func (*PortInfo) ProtoMessage() {}
func (x *PortInfo) ProtoReflect() protoreflect.Message {
mi := &file_flow_proto_msgTypes[3]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -519,21 +536,18 @@ func (x *PortInfo) GetDestPort() uint32 {
// ICMP message information
type ICMPInfo struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
IcmpType uint32 `protobuf:"varint,1,opt,name=icmp_type,json=icmpType,proto3" json:"icmp_type,omitempty"`
IcmpCode uint32 `protobuf:"varint,2,opt,name=icmp_code,json=icmpCode,proto3" json:"icmp_code,omitempty"`
unknownFields protoimpl.UnknownFields
IcmpType uint32 `protobuf:"varint,1,opt,name=icmp_type,json=icmpType,proto3" json:"icmp_type,omitempty"`
IcmpCode uint32 `protobuf:"varint,2,opt,name=icmp_code,json=icmpCode,proto3" json:"icmp_code,omitempty"`
sizeCache protoimpl.SizeCache
}
func (x *ICMPInfo) Reset() {
*x = ICMPInfo{}
if protoimpl.UnsafeEnabled {
mi := &file_flow_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
mi := &file_flow_proto_msgTypes[4]
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
ms.StoreMessageInfo(mi)
}
func (x *ICMPInfo) String() string {
@@ -544,7 +558,7 @@ func (*ICMPInfo) ProtoMessage() {}
func (x *ICMPInfo) ProtoReflect() protoreflect.Message {
mi := &file_flow_proto_msgTypes[4]
if protoimpl.UnsafeEnabled && x != nil {
if x != nil {
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
if ms.LoadMessageInfo() == nil {
ms.StoreMessageInfo(mi)
@@ -575,102 +589,83 @@ func (x *ICMPInfo) GetIcmpCode() uint32 {
var File_flow_proto protoreflect.FileDescriptor
var file_flow_proto_rawDesc = []byte{
0x0a, 0x0a, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x04, 0x66, 0x6c,
0x6f, 0x77, 0x1a, 0x1f, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72,
0x6f, 0x74, 0x6f, 0x22, 0xd4, 0x01, 0x0a, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e,
0x74, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20,
0x01, 0x28, 0x0c, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x38, 0x0a, 0x09,
0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32,
0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d,
0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x1d, 0x0a, 0x0a, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63,
0x5f, 0x6b, 0x65, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x09, 0x70, 0x75, 0x62, 0x6c,
0x69, 0x63, 0x4b, 0x65, 0x79, 0x12, 0x31, 0x0a, 0x0b, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x66, 0x69,
0x65, 0x6c, 0x64, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x10, 0x2e, 0x66, 0x6c, 0x6f,
0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x52, 0x0a, 0x66, 0x6c,
0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x12, 0x20, 0x0a, 0x0b, 0x69, 0x73, 0x49, 0x6e,
0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69,
0x73, 0x49, 0x6e, 0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x22, 0x4b, 0x0a, 0x0c, 0x46, 0x6c,
0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x41, 0x63, 0x6b, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76,
0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x65, 0x76,
0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x20, 0x0a, 0x0b, 0x69, 0x73, 0x49, 0x6e, 0x69, 0x74, 0x69,
0x61, 0x74, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0b, 0x69, 0x73, 0x49, 0x6e,
0x69, 0x74, 0x69, 0x61, 0x74, 0x6f, 0x72, 0x22, 0x9c, 0x04, 0x0a, 0x0a, 0x46, 0x6c, 0x6f, 0x77,
0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x12, 0x17, 0x0a, 0x07, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69,
0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12,
0x1e, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0a, 0x2e,
0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x54, 0x79, 0x70, 0x65, 0x52, 0x04, 0x74, 0x79, 0x70, 0x65, 0x12,
0x17, 0x0a, 0x07, 0x72, 0x75, 0x6c, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0c,
0x52, 0x06, 0x72, 0x75, 0x6c, 0x65, 0x49, 0x64, 0x12, 0x2d, 0x0a, 0x09, 0x64, 0x69, 0x72, 0x65,
0x63, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0f, 0x2e, 0x66, 0x6c,
0x6f, 0x77, 0x2e, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x09, 0x64, 0x69,
0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x1a, 0x0a, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x63, 0x6f, 0x6c, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x63, 0x6f, 0x6c, 0x12, 0x1b, 0x0a, 0x09, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x70,
0x18, 0x06, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x08, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x70,
0x12, 0x17, 0x0a, 0x07, 0x64, 0x65, 0x73, 0x74, 0x5f, 0x69, 0x70, 0x18, 0x07, 0x20, 0x01, 0x28,
0x0c, 0x52, 0x06, 0x64, 0x65, 0x73, 0x74, 0x49, 0x70, 0x12, 0x2d, 0x0a, 0x09, 0x70, 0x6f, 0x72,
0x74, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x08, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x66,
0x6c, 0x6f, 0x77, 0x2e, 0x50, 0x6f, 0x72, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x48, 0x00, 0x52, 0x08,
0x70, 0x6f, 0x72, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x2d, 0x0a, 0x09, 0x69, 0x63, 0x6d, 0x70,
0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x18, 0x09, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0e, 0x2e, 0x66, 0x6c,
0x6f, 0x77, 0x2e, 0x49, 0x43, 0x4d, 0x50, 0x49, 0x6e, 0x66, 0x6f, 0x48, 0x00, 0x52, 0x08, 0x69,
0x63, 0x6d, 0x70, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1d, 0x0a, 0x0a, 0x72, 0x78, 0x5f, 0x70, 0x61,
0x63, 0x6b, 0x65, 0x74, 0x73, 0x18, 0x0a, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x72, 0x78, 0x50,
0x61, 0x63, 0x6b, 0x65, 0x74, 0x73, 0x12, 0x1d, 0x0a, 0x0a, 0x74, 0x78, 0x5f, 0x70, 0x61, 0x63,
0x6b, 0x65, 0x74, 0x73, 0x18, 0x0b, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x74, 0x78, 0x50, 0x61,
0x63, 0x6b, 0x65, 0x74, 0x73, 0x12, 0x19, 0x0a, 0x08, 0x72, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65,
0x73, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x72, 0x78, 0x42, 0x79, 0x74, 0x65, 0x73,
0x12, 0x19, 0x0a, 0x08, 0x74, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x0d, 0x20, 0x01,
0x28, 0x04, 0x52, 0x07, 0x74, 0x78, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x2c, 0x0a, 0x12, 0x73,
0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69,
0x64, 0x18, 0x0e, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x10, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52,
0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x64, 0x12, 0x28, 0x0a, 0x10, 0x64, 0x65, 0x73,
0x74, 0x5f, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x0f, 0x20,
0x01, 0x28, 0x0c, 0x52, 0x0e, 0x64, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63,
0x65, 0x49, 0x64, 0x42, 0x11, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f,
0x6e, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x22, 0x48, 0x0a, 0x08, 0x50, 0x6f, 0x72, 0x74, 0x49, 0x6e,
0x66, 0x6f, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x6f, 0x72,
0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50,
0x6f, 0x72, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x64, 0x65, 0x73, 0x74, 0x5f, 0x70, 0x6f, 0x72, 0x74,
0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x64, 0x65, 0x73, 0x74, 0x50, 0x6f, 0x72, 0x74,
0x22, 0x44, 0x0a, 0x08, 0x49, 0x43, 0x4d, 0x50, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1b, 0x0a, 0x09,
0x69, 0x63, 0x6d, 0x70, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52,
0x08, 0x69, 0x63, 0x6d, 0x70, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x69, 0x63, 0x6d,
0x70, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x69, 0x63,
0x6d, 0x70, 0x43, 0x6f, 0x64, 0x65, 0x2a, 0x45, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x10,
0x0a, 0x0c, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00,
0x12, 0x0e, 0x0a, 0x0a, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x10, 0x01,
0x12, 0x0c, 0x0a, 0x08, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x4e, 0x44, 0x10, 0x02, 0x12, 0x0d,
0x0a, 0x09, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x44, 0x52, 0x4f, 0x50, 0x10, 0x03, 0x2a, 0x3b, 0x0a,
0x09, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x15, 0x0a, 0x11, 0x44, 0x49,
0x52, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10,
0x00, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x47, 0x52, 0x45, 0x53, 0x53, 0x10, 0x01, 0x12, 0x0a,
0x0a, 0x06, 0x45, 0x47, 0x52, 0x45, 0x53, 0x53, 0x10, 0x02, 0x32, 0x42, 0x0a, 0x0b, 0x46, 0x6c,
0x6f, 0x77, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x33, 0x0a, 0x06, 0x45, 0x76, 0x65,
0x6e, 0x74, 0x73, 0x12, 0x0f, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x45,
0x76, 0x65, 0x6e, 0x74, 0x1a, 0x12, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77,
0x45, 0x76, 0x65, 0x6e, 0x74, 0x41, 0x63, 0x6b, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08,
0x5a, 0x06, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
const file_flow_proto_rawDesc = "" +
"\n" +
"\n" +
"flow.proto\x12\x04flow\x1a\x1fgoogle/protobuf/timestamp.proto\"\xd4\x01\n" +
"\tFlowEvent\x12\x19\n" +
"\bevent_id\x18\x01 \x01(\fR\aeventId\x128\n" +
"\ttimestamp\x18\x02 \x01(\v2\x1a.google.protobuf.TimestampR\ttimestamp\x12\x1d\n" +
"\n" +
"public_key\x18\x03 \x01(\fR\tpublicKey\x121\n" +
"\vflow_fields\x18\x04 \x01(\v2\x10.flow.FlowFieldsR\n" +
"flowFields\x12 \n" +
"\visInitiator\x18\x05 \x01(\bR\visInitiator\"K\n" +
"\fFlowEventAck\x12\x19\n" +
"\bevent_id\x18\x01 \x01(\fR\aeventId\x12 \n" +
"\visInitiator\x18\x02 \x01(\bR\visInitiator\"\x82\x05\n" +
"\n" +
"FlowFields\x12\x17\n" +
"\aflow_id\x18\x01 \x01(\fR\x06flowId\x12\x1e\n" +
"\x04type\x18\x02 \x01(\x0e2\n" +
".flow.TypeR\x04type\x12\x17\n" +
"\arule_id\x18\x03 \x01(\fR\x06ruleId\x12-\n" +
"\tdirection\x18\x04 \x01(\x0e2\x0f.flow.DirectionR\tdirection\x12\x1a\n" +
"\bprotocol\x18\x05 \x01(\rR\bprotocol\x12\x1b\n" +
"\tsource_ip\x18\x06 \x01(\fR\bsourceIp\x12\x17\n" +
"\adest_ip\x18\a \x01(\fR\x06destIp\x12-\n" +
"\tport_info\x18\b \x01(\v2\x0e.flow.PortInfoH\x00R\bportInfo\x12-\n" +
"\ticmp_info\x18\t \x01(\v2\x0e.flow.ICMPInfoH\x00R\bicmpInfo\x12\x1d\n" +
"\n" +
"rx_packets\x18\n" +
" \x01(\x04R\trxPackets\x12\x1d\n" +
"\n" +
"tx_packets\x18\v \x01(\x04R\ttxPackets\x12\x19\n" +
"\brx_bytes\x18\f \x01(\x04R\arxBytes\x12\x19\n" +
"\btx_bytes\x18\r \x01(\x04R\atxBytes\x12,\n" +
"\x12source_resource_id\x18\x0e \x01(\fR\x10sourceResourceId\x12(\n" +
"\x10dest_resource_id\x18\x0f \x01(\fR\x0edestResourceId\x12\"\n" +
"\rnum_of_starts\x18\x10 \x01(\x04R\vnumOfStarts\x12\x1e\n" +
"\vnum_of_ends\x18\x11 \x01(\x04R\tnumOfEnds\x12 \n" +
"\fnum_of_drops\x18\x12 \x01(\x04R\n" +
"numOfDropsB\x11\n" +
"\x0fconnection_info\"H\n" +
"\bPortInfo\x12\x1f\n" +
"\vsource_port\x18\x01 \x01(\rR\n" +
"sourcePort\x12\x1b\n" +
"\tdest_port\x18\x02 \x01(\rR\bdestPort\"D\n" +
"\bICMPInfo\x12\x1b\n" +
"\ticmp_type\x18\x01 \x01(\rR\bicmpType\x12\x1b\n" +
"\ticmp_code\x18\x02 \x01(\rR\bicmpCode*E\n" +
"\x04Type\x12\x10\n" +
"\fTYPE_UNKNOWN\x10\x00\x12\x0e\n" +
"\n" +
"TYPE_START\x10\x01\x12\f\n" +
"\bTYPE_END\x10\x02\x12\r\n" +
"\tTYPE_DROP\x10\x03*;\n" +
"\tDirection\x12\x15\n" +
"\x11DIRECTION_UNKNOWN\x10\x00\x12\v\n" +
"\aINGRESS\x10\x01\x12\n" +
"\n" +
"\x06EGRESS\x10\x022B\n" +
"\vFlowService\x123\n" +
"\x06Events\x12\x0f.flow.FlowEvent\x1a\x12.flow.FlowEventAck\"\x00(\x010\x01B\bZ\x06/protob\x06proto3"
var (
file_flow_proto_rawDescOnce sync.Once
file_flow_proto_rawDescData = file_flow_proto_rawDesc
file_flow_proto_rawDescData []byte
)
func file_flow_proto_rawDescGZIP() []byte {
file_flow_proto_rawDescOnce.Do(func() {
file_flow_proto_rawDescData = protoimpl.X.CompressGZIP(file_flow_proto_rawDescData)
file_flow_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_flow_proto_rawDesc), len(file_flow_proto_rawDesc)))
})
return file_flow_proto_rawDescData
}
var file_flow_proto_enumTypes = make([]protoimpl.EnumInfo, 2)
var file_flow_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
var file_flow_proto_goTypes = []interface{}{
var file_flow_proto_goTypes = []any{
(Type)(0), // 0: flow.Type
(Direction)(0), // 1: flow.Direction
(*FlowEvent)(nil), // 2: flow.FlowEvent
@@ -701,69 +696,7 @@ func file_flow_proto_init() {
if File_flow_proto != nil {
return
}
if !protoimpl.UnsafeEnabled {
file_flow_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*FlowEvent); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_flow_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*FlowEventAck); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_flow_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*FlowFields); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_flow_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*PortInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
file_flow_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
switch v := v.(*ICMPInfo); i {
case 0:
return &v.state
case 1:
return &v.sizeCache
case 2:
return &v.unknownFields
default:
return nil
}
}
}
file_flow_proto_msgTypes[2].OneofWrappers = []interface{}{
file_flow_proto_msgTypes[2].OneofWrappers = []any{
(*FlowFields_PortInfo)(nil),
(*FlowFields_IcmpInfo)(nil),
}
@@ -771,7 +704,7 @@ func file_flow_proto_init() {
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_flow_proto_rawDesc,
RawDescriptor: unsafe.Slice(unsafe.StringData(file_flow_proto_rawDesc), len(file_flow_proto_rawDesc)),
NumEnums: 2,
NumMessages: 5,
NumExtensions: 0,
@@ -783,7 +716,6 @@ func file_flow_proto_init() {
MessageInfos: file_flow_proto_msgTypes,
}.Build()
File_flow_proto = out.File
file_flow_proto_rawDesc = nil
file_flow_proto_goTypes = nil
file_flow_proto_depIdxs = nil
}

View File

@@ -75,6 +75,9 @@ message FlowFields {
bytes source_resource_id = 14;
bytes dest_resource_id = 15;
uint64 num_of_starts = 16;
uint64 num_of_ends = 17;
uint64 num_of_drops = 18;
}
// Flow event types

View File

@@ -1,4 +1,8 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.6.1
// - protoc v7.34.1
// source: flow.proto
package proto
@@ -11,15 +15,19 @@ import (
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// Requires gRPC-Go v1.64.0 or later.
const _ = grpc.SupportPackageIsVersion9
const (
FlowService_Events_FullMethodName = "/flow.FlowService/Events"
)
// FlowServiceClient is the client API for FlowService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type FlowServiceClient interface {
// Client to receiver streams of events and acknowledgements
Events(ctx context.Context, opts ...grpc.CallOption) (FlowService_EventsClient, error)
Events(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[FlowEvent, FlowEventAck], error)
}
type flowServiceClient struct {
@@ -30,54 +38,40 @@ func NewFlowServiceClient(cc grpc.ClientConnInterface) FlowServiceClient {
return &flowServiceClient{cc}
}
func (c *flowServiceClient) Events(ctx context.Context, opts ...grpc.CallOption) (FlowService_EventsClient, error) {
stream, err := c.cc.NewStream(ctx, &FlowService_ServiceDesc.Streams[0], "/flow.FlowService/Events", opts...)
func (c *flowServiceClient) Events(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[FlowEvent, FlowEventAck], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &FlowService_ServiceDesc.Streams[0], FlowService_Events_FullMethodName, cOpts...)
if err != nil {
return nil, err
}
x := &flowServiceEventsClient{stream}
x := &grpc.GenericClientStream[FlowEvent, FlowEventAck]{ClientStream: stream}
return x, nil
}
type FlowService_EventsClient interface {
Send(*FlowEvent) error
Recv() (*FlowEventAck, error)
grpc.ClientStream
}
type flowServiceEventsClient struct {
grpc.ClientStream
}
func (x *flowServiceEventsClient) Send(m *FlowEvent) error {
return x.ClientStream.SendMsg(m)
}
func (x *flowServiceEventsClient) Recv() (*FlowEventAck, error) {
m := new(FlowEventAck)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type FlowService_EventsClient = grpc.BidiStreamingClient[FlowEvent, FlowEventAck]
// FlowServiceServer is the server API for FlowService service.
// All implementations must embed UnimplementedFlowServiceServer
// for forward compatibility
// for forward compatibility.
type FlowServiceServer interface {
// Client to receiver streams of events and acknowledgements
Events(FlowService_EventsServer) error
Events(grpc.BidiStreamingServer[FlowEvent, FlowEventAck]) error
mustEmbedUnimplementedFlowServiceServer()
}
// UnimplementedFlowServiceServer must be embedded to have forward compatible implementations.
type UnimplementedFlowServiceServer struct {
}
// UnimplementedFlowServiceServer must be embedded to have
// forward compatible implementations.
//
// NOTE: this should be embedded by value instead of pointer to avoid a nil
// pointer dereference when methods are called.
type UnimplementedFlowServiceServer struct{}
func (UnimplementedFlowServiceServer) Events(FlowService_EventsServer) error {
return status.Errorf(codes.Unimplemented, "method Events not implemented")
func (UnimplementedFlowServiceServer) Events(grpc.BidiStreamingServer[FlowEvent, FlowEventAck]) error {
return status.Error(codes.Unimplemented, "method Events not implemented")
}
func (UnimplementedFlowServiceServer) mustEmbedUnimplementedFlowServiceServer() {}
func (UnimplementedFlowServiceServer) testEmbeddedByValue() {}
// UnsafeFlowServiceServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to FlowServiceServer will
@@ -87,34 +81,22 @@ type UnsafeFlowServiceServer interface {
}
func RegisterFlowServiceServer(s grpc.ServiceRegistrar, srv FlowServiceServer) {
// If the following call panics, it indicates UnimplementedFlowServiceServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
if t, ok := srv.(interface{ testEmbeddedByValue() }); ok {
t.testEmbeddedByValue()
}
s.RegisterService(&FlowService_ServiceDesc, srv)
}
func _FlowService_Events_Handler(srv interface{}, stream grpc.ServerStream) error {
return srv.(FlowServiceServer).Events(&flowServiceEventsServer{stream})
return srv.(FlowServiceServer).Events(&grpc.GenericServerStream[FlowEvent, FlowEventAck]{ServerStream: stream})
}
type FlowService_EventsServer interface {
Send(*FlowEventAck) error
Recv() (*FlowEvent, error)
grpc.ServerStream
}
type flowServiceEventsServer struct {
grpc.ServerStream
}
func (x *flowServiceEventsServer) Send(m *FlowEventAck) error {
return x.ServerStream.SendMsg(m)
}
func (x *flowServiceEventsServer) Recv() (*FlowEvent, error) {
m := new(FlowEvent)
if err := x.ServerStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.
type FlowService_EventsServer = grpc.BidiStreamingServer[FlowEvent, FlowEventAck]
// FlowService_ServiceDesc is the grpc.ServiceDesc for FlowService service.
// It's only intended for direct use with grpc.RegisterService,

View File

@@ -10,8 +10,9 @@ fi
old_pwd=$(pwd)
script_path=$(dirname $(realpath "$0"))
echo "$script_path"
cd "$script_path"
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.26
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.1
#go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.26
#go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.1
protoc -I ./ ./flow.proto --go_out=../ --go-grpc_out=../
cd "$old_pwd"