diff --git a/client/internal/netflow/logger/logger.go b/client/internal/netflow/logger/logger.go index 6569b75b7..403bdb45c 100644 --- a/client/internal/netflow/logger/logger.go +++ b/client/internal/netflow/logger/logger.go @@ -74,7 +74,7 @@ func (l *Logger) startReceiver() { log.Info("flow Memory store receiver stopped") return case eventFields := <-c: - id := uuid.NewString() + id := uuid.New() event := types.Event{ ID: id, EventFields: *eventFields, @@ -109,7 +109,7 @@ func (l *Logger) GetEvents() []*types.Event { return l.Store.GetEvents() } -func (l *Logger) DeleteEvents(ids []string) { +func (l *Logger) DeleteEvents(ids []uuid.UUID) { l.Store.DeleteEvents(ids) } diff --git a/client/internal/netflow/manager.go b/client/internal/netflow/manager.go index ed5655f8a..6c5e26711 100644 --- a/client/internal/netflow/manager.go +++ b/client/internal/netflow/manager.go @@ -8,6 +8,7 @@ import ( "sync" "time" + "github.com/google/uuid" log "github.com/sirupsen/logrus" "google.golang.org/protobuf/types/known/timestamppb" @@ -169,7 +170,7 @@ func (m *Manager) startSender() { func (m *Manager) receiveACKs(client *client.GRPCClient) { err := client.Receive(m.ctx, m.flowConfig.Interval, func(ack *proto.FlowEventAck) error { log.Tracef("received flow event ack: %s", ack.EventId) - m.logger.DeleteEvents([]string{ack.EventId}) + m.logger.DeleteEvents([]uuid.UUID{uuid.UUID(ack.EventId)}) return nil }) @@ -192,7 +193,7 @@ func (m *Manager) send(event *nftypes.Event) error { func toProtoEvent(publicKey []byte, event *nftypes.Event) *proto.FlowEvent { protoEvent := &proto.FlowEvent{ - EventId: event.ID, + EventId: event.ID[:], Timestamp: timestamppb.New(event.Timestamp), PublicKey: publicKey, FlowFields: &proto.FlowFields{ diff --git a/client/internal/netflow/store/memory.go b/client/internal/netflow/store/memory.go index 7fa08b510..b695a0a12 100644 --- a/client/internal/netflow/store/memory.go +++ b/client/internal/netflow/store/memory.go @@ -5,18 +5,20 @@ import ( "golang.org/x/exp/maps" + "github.com/google/uuid" + "github.com/netbirdio/netbird/client/internal/netflow/types" ) func NewMemoryStore() *Memory { return &Memory{ - events: make(map[string]*types.Event), + events: make(map[uuid.UUID]*types.Event), } } type Memory struct { mux sync.Mutex - events map[string]*types.Event + events map[uuid.UUID]*types.Event } func (m *Memory) StoreEvent(event *types.Event) { @@ -41,7 +43,7 @@ func (m *Memory) GetEvents() []*types.Event { return events } -func (m *Memory) DeleteEvents(ids []string) { +func (m *Memory) DeleteEvents(ids []uuid.UUID) { m.mux.Lock() defer m.mux.Unlock() for _, id := range ids { diff --git a/client/internal/netflow/types/types.go b/client/internal/netflow/types/types.go index f57fca1c9..fc90a53d6 100644 --- a/client/internal/netflow/types/types.go +++ b/client/internal/netflow/types/types.go @@ -64,7 +64,7 @@ const ( ) type Event struct { - ID string + ID uuid.UUID Timestamp time.Time EventFields } @@ -111,7 +111,7 @@ type FlowLogger interface { // GetEvents returns all stored events GetEvents() []*Event // DeleteEvents deletes events from the store - DeleteEvents([]string) + DeleteEvents([]uuid.UUID) // Close closes the logger Close() // Enable enables the flow logger receiver @@ -126,7 +126,7 @@ type Store interface { // GetEvents returns all stored events GetEvents() []*Event // DeleteEvents deletes events from the store - DeleteEvents([]string) + DeleteEvents([]uuid.UUID) // Close closes the store Close() } diff --git a/flow/proto/flow.pb.go b/flow/proto/flow.pb.go index b2857c2b2..1a94fd103 100644 --- a/flow/proto/flow.pb.go +++ b/flow/proto/flow.pb.go @@ -130,7 +130,7 @@ type FlowEvent struct { unknownFields protoimpl.UnknownFields // Unique client event identifier - EventId string `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` + EventId []byte `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` // When the event occurred Timestamp *timestamppb.Timestamp `protobuf:"bytes,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` // Public key of the sending peer @@ -170,11 +170,11 @@ func (*FlowEvent) Descriptor() ([]byte, []int) { return file_flow_proto_rawDescGZIP(), []int{0} } -func (x *FlowEvent) GetEventId() string { +func (x *FlowEvent) GetEventId() []byte { if x != nil { return x.EventId } - return "" + return nil } func (x *FlowEvent) GetTimestamp() *timestamppb.Timestamp { @@ -204,7 +204,7 @@ type FlowEventAck struct { unknownFields protoimpl.UnknownFields // Unique client event identifier that has been ack'ed - EventId string `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` + EventId []byte `protobuf:"bytes,1,opt,name=event_id,json=eventId,proto3" json:"event_id,omitempty"` } func (x *FlowEventAck) Reset() { @@ -239,11 +239,11 @@ func (*FlowEventAck) Descriptor() ([]byte, []int) { return file_flow_proto_rawDescGZIP(), []int{1} } -func (x *FlowEventAck) GetEventId() string { +func (x *FlowEventAck) GetEventId() []byte { if x != nil { return x.EventId } - return "" + return nil } type FlowFields struct { @@ -548,7 +548,7 @@ var file_flow_proto_rawDesc = []byte{ 0x62, 0x75, 0x66, 0x2f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x22, 0xb2, 0x01, 0x0a, 0x09, 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x38, 0x0a, 0x09, + 0x01, 0x28, 0x0c, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x38, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x74, 0x69, 0x6d, @@ -559,7 +559,7 @@ var file_flow_proto_rawDesc = []byte{ 0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x52, 0x0a, 0x66, 0x6c, 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x22, 0x29, 0x0a, 0x0c, 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x41, 0x63, 0x6b, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x6e, - 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x49, 0x64, 0x22, 0xc4, 0x03, 0x0a, 0x0a, 0x46, 0x6c, 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x12, 0x17, 0x0a, 0x07, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x04, 0x74, diff --git a/flow/proto/flow.proto b/flow/proto/flow.proto index 742de82f9..a4bd56137 100644 --- a/flow/proto/flow.proto +++ b/flow/proto/flow.proto @@ -13,7 +13,7 @@ service FlowService { message FlowEvent { // Unique client event identifier - string event_id = 1; + bytes event_id = 1; // When the event occurred google.protobuf.Timestamp timestamp = 2; @@ -26,7 +26,7 @@ message FlowEvent { message FlowEventAck { // Unique client event identifier that has been ack'ed - string event_id = 1; + bytes event_id = 1; } message FlowFields {