diff --git a/client/firewall/uspfilter/conntrack/common_test.go b/client/firewall/uspfilter/conntrack/common_test.go index ca1136e6f..f28cd56e5 100644 --- a/client/firewall/uspfilter/conntrack/common_test.go +++ b/client/firewall/uspfilter/conntrack/common_test.go @@ -12,7 +12,7 @@ import ( ) var logger = log.NewFromLogrus(logrus.StandardLogger()) -var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}).GetLogger() +var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}, nil).GetLogger() // Memory pressure tests func BenchmarkMemoryPressure(b *testing.B) { diff --git a/client/firewall/uspfilter/uspfilter_test.go b/client/firewall/uspfilter/uspfilter_test.go index e525b6246..a095a5e39 100644 --- a/client/firewall/uspfilter/uspfilter_test.go +++ b/client/firewall/uspfilter/uspfilter_test.go @@ -24,7 +24,7 @@ import ( ) var logger = log.NewFromLogrus(logrus.StandardLogger()) -var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}).GetLogger() +var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}, nil).GetLogger() type IFaceMock struct { SetFilterFunc func(device.PacketFilter) error diff --git a/client/internal/acl/manager_test.go b/client/internal/acl/manager_test.go index b54a105b3..ca79111ef 100644 --- a/client/internal/acl/manager_test.go +++ b/client/internal/acl/manager_test.go @@ -15,7 +15,7 @@ import ( mgmProto "github.com/netbirdio/netbird/management/proto" ) -var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}).GetLogger() +var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}, nil).GetLogger() func TestDefaultManager(t *testing.T) { networkMap := &mgmProto.NetworkMap{ diff --git a/client/internal/dns/server_test.go b/client/internal/dns/server_test.go index 8871158ed..c7eeb7870 100644 --- a/client/internal/dns/server_test.go +++ b/client/internal/dns/server_test.go @@ -31,7 +31,7 @@ import ( "github.com/netbirdio/netbird/formatter" ) -var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}).GetLogger() +var flowLogger = netflow.NewManager(context.Background(), nil, []byte{}, nil).GetLogger() type mocWGIface struct { filter device.PacketFilter diff --git a/client/internal/engine.go b/client/internal/engine.go index 2627e5232..5fc93c52e 100644 --- a/client/internal/engine.go +++ b/client/internal/engine.go @@ -353,7 +353,7 @@ func (e *Engine) Start() error { // start flow manager right after interface creation publicKey := e.config.WgPrivateKey.PublicKey() - e.flowManager = netflow.NewManager(e.ctx, e.wgInterface, publicKey[:]) + e.flowManager = netflow.NewManager(e.ctx, e.wgInterface, publicKey[:], e.statusRecorder) if e.config.RosenpassEnabled { log.Infof("rosenpass is enabled") diff --git a/client/internal/netflow/logger/logger.go b/client/internal/netflow/logger/logger.go index 403bdb45c..7dde01d06 100644 --- a/client/internal/netflow/logger/logger.go +++ b/client/internal/netflow/logger/logger.go @@ -2,6 +2,7 @@ package logger import ( "context" + "net" "sync" "sync/atomic" "time" @@ -11,6 +12,7 @@ import ( "github.com/netbirdio/netbird/client/internal/netflow/store" "github.com/netbirdio/netbird/client/internal/netflow/types" + "github.com/netbirdio/netbird/client/internal/peer" ) type rcvChan chan *types.EventFields @@ -21,15 +23,20 @@ type Logger struct { enabled atomic.Bool rcvChan atomic.Pointer[rcvChan] cancelReceiver context.CancelFunc + statusRecorder *peer.Status + wgIfaceIPNet net.IPNet Store types.Store } -func New(ctx context.Context) *Logger { +func New(ctx context.Context, statusRecorder *peer.Status, wgIfaceIPNet net.IPNet) *Logger { + ctx, cancel := context.WithCancel(ctx) return &Logger{ - ctx: ctx, - cancel: cancel, - Store: store.NewMemoryStore(), + ctx: ctx, + cancel: cancel, + statusRecorder: statusRecorder, + wgIfaceIPNet: wgIfaceIPNet, + Store: store.NewMemoryStore(), } } @@ -80,6 +87,17 @@ func (l *Logger) startReceiver() { EventFields: *eventFields, Timestamp: time.Now(), } + + if event.Direction == types.Ingress { + if !l.wgIfaceIPNet.Contains(net.IP(event.SourceIP.AsSlice())) { + event.SourceResourceID = []byte(l.statusRecorder.CheckRoutes(event.SourceIP)) + } + } else if event.Direction == types.Egress { + if !l.wgIfaceIPNet.Contains(net.IP(event.DestIP.AsSlice())) { + event.DestResourceID = []byte(l.statusRecorder.CheckRoutes(event.DestIP)) + } + } + l.Store.StoreEvent(&event) } } diff --git a/client/internal/netflow/logger/logger_test.go b/client/internal/netflow/logger/logger_test.go index e986118ec..3ce9d8fd8 100644 --- a/client/internal/netflow/logger/logger_test.go +++ b/client/internal/netflow/logger/logger_test.go @@ -2,6 +2,7 @@ package logger_test import ( "context" + "net" "testing" "time" @@ -12,7 +13,7 @@ import ( ) func TestStore(t *testing.T) { - logger := logger.New(context.Background()) + logger := logger.New(context.Background(), nil, net.IPNet{}) logger.Enable() event := types.EventFields{ diff --git a/client/internal/netflow/manager.go b/client/internal/netflow/manager.go index 6c5e26711..6ef1f4e7d 100644 --- a/client/internal/netflow/manager.go +++ b/client/internal/netflow/manager.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "net" "runtime" "sync" "time" @@ -15,6 +16,7 @@ import ( "github.com/netbirdio/netbird/client/internal/netflow/conntrack" "github.com/netbirdio/netbird/client/internal/netflow/logger" nftypes "github.com/netbirdio/netbird/client/internal/netflow/types" + "github.com/netbirdio/netbird/client/internal/peer" "github.com/netbirdio/netbird/flow/client" "github.com/netbirdio/netbird/flow/proto" ) @@ -31,8 +33,12 @@ type Manager struct { } // NewManager creates a new netflow manager -func NewManager(ctx context.Context, iface nftypes.IFaceMapper, publicKey []byte) *Manager { - flowLogger := logger.New(ctx) +func NewManager(ctx context.Context, iface nftypes.IFaceMapper, publicKey []byte, statusRecorder *peer.Status) *Manager { + var ipNet net.IPNet + if iface != nil { + ipNet = *iface.Address().Network + } + flowLogger := logger.New(ctx, statusRecorder, ipNet) var ct nftypes.ConnTracker if runtime.GOOS == "linux" && iface != nil && !iface.IsUserspaceBind() { @@ -197,17 +203,19 @@ func toProtoEvent(publicKey []byte, event *nftypes.Event) *proto.FlowEvent { Timestamp: timestamppb.New(event.Timestamp), PublicKey: publicKey, FlowFields: &proto.FlowFields{ - FlowId: event.FlowID[:], - RuleId: event.RuleID, - Type: proto.Type(event.Type), - Direction: proto.Direction(event.Direction), - Protocol: uint32(event.Protocol), - SourceIp: event.SourceIP.AsSlice(), - DestIp: event.DestIP.AsSlice(), - RxPackets: event.RxPackets, - TxPackets: event.TxPackets, - RxBytes: event.RxBytes, - TxBytes: event.TxBytes, + FlowId: event.FlowID[:], + RuleId: event.RuleID, + Type: proto.Type(event.Type), + Direction: proto.Direction(event.Direction), + Protocol: uint32(event.Protocol), + SourceIp: event.SourceIP.AsSlice(), + DestIp: event.DestIP.AsSlice(), + RxPackets: event.RxPackets, + TxPackets: event.TxPackets, + RxBytes: event.RxBytes, + TxBytes: event.TxBytes, + SourceResourceId: event.SourceResourceID, + DestResourceId: event.DestResourceID, }, } diff --git a/client/internal/netflow/types/types.go b/client/internal/netflow/types/types.go index 1e61038cf..2787b6284 100644 --- a/client/internal/netflow/types/types.go +++ b/client/internal/netflow/types/types.go @@ -70,21 +70,23 @@ type Event struct { } type EventFields struct { - FlowID uuid.UUID - Type Type - RuleID []byte - Direction Direction - Protocol Protocol - SourceIP netip.Addr - DestIP netip.Addr - SourcePort uint16 - DestPort uint16 - ICMPType uint8 - ICMPCode uint8 - RxPackets uint64 - TxPackets uint64 - RxBytes uint64 - TxBytes uint64 + FlowID uuid.UUID + Type Type + RuleID []byte + Direction Direction + Protocol Protocol + SourceIP netip.Addr + DestIP netip.Addr + SourceResourceID []byte + DestResourceID []byte + SourcePort uint16 + DestPort uint16 + ICMPType uint8 + ICMPCode uint8 + RxPackets uint64 + TxPackets uint64 + RxBytes uint64 + TxBytes uint64 } type FlowConfig struct { diff --git a/client/internal/peer/route.go b/client/internal/peer/route.go new file mode 100644 index 000000000..926d7c442 --- /dev/null +++ b/client/internal/peer/route.go @@ -0,0 +1,73 @@ +package peer + +import ( + "net/netip" + "sync" + + log "github.com/sirupsen/logrus" +) + +type routeIDLookup struct { + localMap sync.Map + remoteMap sync.Map + resolvedIPs sync.Map +} + +func (r *routeIDLookup) AddLocalRouteID(resourceID string, route netip.Prefix) { + _, exists := r.localMap.LoadOrStore(route, resourceID) + if exists { + log.Tracef("resourceID %s already exists in local map", resourceID) + } +} + +func (r *routeIDLookup) RemoveLocalRouteID(route netip.Prefix) { + r.localMap.Delete(route) +} + +func (r *routeIDLookup) AddRemoteRouteID(resourceID string, route netip.Prefix) { + _, exists := r.remoteMap.LoadOrStore(route, resourceID) + if exists { + log.Tracef("resourceID %s already exists in remote map", resourceID) + } +} + +func (r *routeIDLookup) RemoveRemoteRouteID(route netip.Prefix) { + r.remoteMap.Delete(route) +} + +func (r *routeIDLookup) AddResolvedIP(resourceID string, route netip.Prefix) { + r.resolvedIPs.Store(route.Addr(), resourceID) +} + +func (r *routeIDLookup) RemoveResolvedIP(route netip.Prefix) { + r.resolvedIPs.Delete(route.Addr()) +} + +func (r *routeIDLookup) Lookup(ip netip.Addr) string { + resId, ok := r.resolvedIPs.Load(ip) + if ok { + return resId.(string) + } + + var resourceID string + r.localMap.Range(func(key, value interface{}) bool { + if key.(netip.Prefix).Contains(ip) { + resourceID = value.(string) + return false + + } + return true + }) + + if resourceID == "" { + r.remoteMap.Range(func(key, value interface{}) bool { + if key.(netip.Prefix).Contains(ip) { + resourceID = value.(string) + return false + } + return true + }) + } + + return resourceID +} diff --git a/client/internal/peer/status.go b/client/internal/peer/status.go index ee884a76e..b6be827f8 100644 --- a/client/internal/peer/status.go +++ b/client/internal/peer/status.go @@ -176,6 +176,8 @@ type Status struct { eventQueue *EventQueue ingressGwMgr *ingressgw.Manager + + routeIDLookup routeIDLookup } // NewRecorder returns a new Status instance @@ -311,7 +313,7 @@ func (d *Status) UpdatePeerState(receivedState State) error { return nil } -func (d *Status) AddPeerStateRoute(peer string, route string) error { +func (d *Status) AddPeerStateRoute(peer string, route string, resourceId string) error { d.mux.Lock() defer d.mux.Unlock() @@ -323,6 +325,14 @@ func (d *Status) AddPeerStateRoute(peer string, route string) error { peerState.AddRoute(route) d.peers[peer] = peerState + pref, err := netip.ParsePrefix(route) + if err != nil { + log.Errorf("failed to parse prefix %s: %v", route, err) + } else { + + d.routeIDLookup.AddRemoteRouteID(resourceId, pref) + } + // todo: consider to make sense of this notification or not d.notifyPeerListChanged() return nil @@ -340,11 +350,27 @@ func (d *Status) RemovePeerStateRoute(peer string, route string) error { peerState.DeleteRoute(route) d.peers[peer] = peerState + pref, err := netip.ParsePrefix(route) + if err != nil { + log.Errorf("failed to parse prefix %s: %v", route, err) + } else { + d.routeIDLookup.RemoveRemoteRouteID(pref) + } + // todo: consider to make sense of this notification or not d.notifyPeerListChanged() return nil } +// CheckRoutes checks if the source and destination addresses are within the same route +// and returns the resource ID of the route that contains the addresses +func (d *Status) CheckRoutes(ip netip.Addr) (resId string) { + if d == nil { + return + } + return d.routeIDLookup.Lookup(ip) +} + func (d *Status) UpdatePeerICEState(receivedState State) error { d.mux.Lock() defer d.mux.Unlock() @@ -558,6 +584,50 @@ func (d *Status) UpdateLocalPeerState(localPeerState LocalPeerState) { d.notifyAddressChanged() } +// AddLocalPeerStateRoute adds a route to the local peer state +func (d *Status) AddLocalPeerStateRoute(route, resourceId string) { + d.mux.Lock() + defer d.mux.Unlock() + + pref, err := netip.ParsePrefix(route) + if err != nil { + log.Errorf("failed to parse prefix %s: %v", route, err) + return + } + + if d.localPeer.Routes == nil { + d.localPeer.Routes = map[string]struct{}{} + } + + d.localPeer.Routes[route] = struct{}{} + + d.routeIDLookup.AddLocalRouteID(resourceId, pref) +} + +// RemoveLocalPeerStateRoute removes a route from the local peer state +func (d *Status) RemoveLocalPeerStateRoute(route string) { + d.mux.Lock() + defer d.mux.Unlock() + + pref, err := netip.ParsePrefix(route) + if err != nil { + log.Errorf("failed to parse prefix %s: %v", route, err) + return + } + + delete(d.localPeer.Routes, route) + + d.routeIDLookup.RemoveLocalRouteID(pref) +} + +// CleanLocalPeerStateRoutes cleans all routes from the local peer state +func (d *Status) CleanLocalPeerStateRoutes() { + d.mux.Lock() + defer d.mux.Unlock() + + d.localPeer.Routes = map[string]struct{}{} +} + // CleanLocalPeerState cleans local peer status func (d *Status) CleanLocalPeerState() { d.mux.Lock() @@ -641,7 +711,7 @@ func (d *Status) UpdateDNSStates(dnsStates []NSGroupState) { d.nsGroupStates = dnsStates } -func (d *Status) UpdateResolvedDomainsStates(originalDomain domain.Domain, resolvedDomain domain.Domain, prefixes []netip.Prefix) { +func (d *Status) UpdateResolvedDomainsStates(originalDomain domain.Domain, resolvedDomain domain.Domain, prefixes []netip.Prefix, resourceId string) { d.mux.Lock() defer d.mux.Unlock() @@ -650,6 +720,10 @@ func (d *Status) UpdateResolvedDomainsStates(originalDomain domain.Domain, resol Prefixes: prefixes, ParentDomain: originalDomain, } + + for _, prefix := range prefixes { + d.routeIDLookup.AddResolvedIP(resourceId, prefix) + } } func (d *Status) DeleteResolvedDomainsStates(domain domain.Domain) { @@ -660,6 +734,10 @@ func (d *Status) DeleteResolvedDomainsStates(domain domain.Domain) { for k, v := range d.resolvedDomainsStates { if v.ParentDomain == domain { delete(d.resolvedDomainsStates, k) + + for _, prefix := range v.Prefixes { + d.routeIDLookup.RemoveResolvedIP(prefix) + } } } } diff --git a/client/internal/routemanager/client.go b/client/internal/routemanager/client.go index 6680f727a..847949a53 100644 --- a/client/internal/routemanager/client.go +++ b/client/internal/routemanager/client.go @@ -330,7 +330,7 @@ func (c *clientNetwork) recalculateRouteAndUpdatePeerAndSystem(rsn reason) error c.connectEvent() } - err := c.statusRecorder.AddPeerStateRoute(c.currentChosen.Peer, c.handler.String()) + err := c.statusRecorder.AddPeerStateRoute(c.currentChosen.Peer, c.handler.String(), c.currentChosen.GetResourceID()) if err != nil { return fmt.Errorf("add peer state route: %w", err) } diff --git a/client/internal/routemanager/dnsinterceptor/handler.go b/client/internal/routemanager/dnsinterceptor/handler.go index da1056e2d..2e6e4fede 100644 --- a/client/internal/routemanager/dnsinterceptor/handler.go +++ b/client/internal/routemanager/dnsinterceptor/handler.go @@ -321,7 +321,7 @@ func (d *DnsInterceptor) updateDomainPrefixes(resolvedDomain, originalDomain dom if len(toAdd) > 0 || len(toRemove) > 0 { d.interceptedDomains[resolvedDomain] = newPrefixes originalDomain = domain.Domain(strings.TrimSuffix(string(originalDomain), ".")) - d.statusRecorder.UpdateResolvedDomainsStates(originalDomain, resolvedDomain, newPrefixes) + d.statusRecorder.UpdateResolvedDomainsStates(originalDomain, resolvedDomain, newPrefixes, d.route.GetResourceID()) if len(toAdd) > 0 { log.Debugf("added dynamic route(s) for domain=%s (pattern: domain=%s): %s", diff --git a/client/internal/routemanager/dynamic/route.go b/client/internal/routemanager/dynamic/route.go index 5ef18a47e..079134701 100644 --- a/client/internal/routemanager/dynamic/route.go +++ b/client/internal/routemanager/dynamic/route.go @@ -288,7 +288,7 @@ func (r *Route) updateDynamicRoutes(ctx context.Context, newDomains domainMap) e updatedPrefixes := combinePrefixes(oldPrefixes, removedPrefixes, addedPrefixes) r.dynamicDomains[domain] = updatedPrefixes - r.statusRecorder.UpdateResolvedDomainsStates(domain, domain, updatedPrefixes) + r.statusRecorder.UpdateResolvedDomainsStates(domain, domain, updatedPrefixes, r.route.GetResourceID()) } return nberrors.FormatErrorOrNil(merr) diff --git a/client/internal/routemanager/server_nonandroid.go b/client/internal/routemanager/server_nonandroid.go index 5b6a788f8..ac2233d4e 100644 --- a/client/internal/routemanager/server_nonandroid.go +++ b/client/internal/routemanager/server_nonandroid.go @@ -103,9 +103,7 @@ func (m *serverRouter) removeFromServerNetwork(route *route.Route) error { delete(m.routes, route.ID) - state := m.statusRecorder.GetLocalPeerState() - delete(state.Routes, route.Network.String()) - m.statusRecorder.UpdateLocalPeerState(state) + m.statusRecorder.RemoveLocalPeerStateRoute(route.Network.String()) return nil } @@ -131,18 +129,12 @@ func (m *serverRouter) addToServerNetwork(route *route.Route) error { m.routes[route.ID] = route - state := m.statusRecorder.GetLocalPeerState() - if state.Routes == nil { - state.Routes = map[string]struct{}{} - } - routeStr := route.Network.String() if route.IsDynamic() { routeStr = route.Domains.SafeString() } - state.Routes[routeStr] = struct{}{} - m.statusRecorder.UpdateLocalPeerState(state) + m.statusRecorder.AddLocalPeerStateRoute(routeStr, route.GetResourceID()) return nil } @@ -164,9 +156,7 @@ func (m *serverRouter) cleanUp() { } - state := m.statusRecorder.GetLocalPeerState() - state.Routes = nil - m.statusRecorder.UpdateLocalPeerState(state) + m.statusRecorder.CleanLocalPeerStateRoutes() } func routeToRouterPair(route *route.Route) (firewall.RouterPair, error) { diff --git a/flow/proto/flow.pb.go b/flow/proto/flow.pb.go index 1a94fd103..8b34b0f62 100644 --- a/flow/proto/flow.pb.go +++ b/flow/proto/flow.pb.go @@ -278,6 +278,9 @@ type FlowFields struct { // Number of bytes RxBytes uint64 `protobuf:"varint,12,opt,name=rx_bytes,json=rxBytes,proto3" json:"rx_bytes,omitempty"` TxBytes uint64 `protobuf:"varint,13,opt,name=tx_bytes,json=txBytes,proto3" json:"tx_bytes,omitempty"` + // Resource ID + SourceResourceId []byte `protobuf:"bytes,14,opt,name=source_resource_id,json=sourceResourceId,proto3" json:"source_resource_id,omitempty"` + DestResourceId []byte `protobuf:"bytes,15,opt,name=dest_resource_id,json=destResourceId,proto3" json:"dest_resource_id,omitempty"` } func (x *FlowFields) Reset() { @@ -410,6 +413,20 @@ func (x *FlowFields) GetTxBytes() uint64 { return 0 } +func (x *FlowFields) GetSourceResourceId() []byte { + if x != nil { + return x.SourceResourceId + } + return nil +} + +func (x *FlowFields) GetDestResourceId() []byte { + if x != nil { + return x.DestResourceId + } + return nil +} + type isFlowFields_ConnectionInfo interface { isFlowFields_ConnectionInfo() } @@ -560,7 +577,7 @@ var file_flow_proto_rawDesc = []byte{ 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x22, 0x29, 0x0a, 0x0c, 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x41, 0x63, 0x6b, 0x12, 0x19, 0x0a, 0x08, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x65, 0x76, 0x65, 0x6e, - 0x74, 0x49, 0x64, 0x22, 0xc4, 0x03, 0x0a, 0x0a, 0x46, 0x6c, 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, + 0x74, 0x49, 0x64, 0x22, 0x9c, 0x04, 0x0a, 0x0a, 0x46, 0x6c, 0x6f, 0x77, 0x46, 0x69, 0x65, 0x6c, 0x64, 0x73, 0x12, 0x17, 0x0a, 0x07, 0x66, 0x6c, 0x6f, 0x77, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x06, 0x66, 0x6c, 0x6f, 0x77, 0x49, 0x64, 0x12, 0x1e, 0x0a, 0x04, 0x74, 0x79, 0x70, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x0a, 0x2e, 0x66, 0x6c, 0x6f, 0x77, @@ -587,31 +604,36 @@ var file_flow_proto_rawDesc = []byte{ 0x73, 0x12, 0x19, 0x0a, 0x08, 0x72, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x0c, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x72, 0x78, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x19, 0x0a, 0x08, 0x74, 0x78, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x0d, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, - 0x74, 0x78, 0x42, 0x79, 0x74, 0x65, 0x73, 0x42, 0x11, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, - 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x6e, 0x66, 0x6f, 0x22, 0x48, 0x0a, 0x08, 0x50, 0x6f, - 0x72, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1f, 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, - 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x6f, 0x75, - 0x72, 0x63, 0x65, 0x50, 0x6f, 0x72, 0x74, 0x12, 0x1b, 0x0a, 0x09, 0x64, 0x65, 0x73, 0x74, 0x5f, - 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x64, 0x65, 0x73, 0x74, - 0x50, 0x6f, 0x72, 0x74, 0x22, 0x44, 0x0a, 0x08, 0x49, 0x43, 0x4d, 0x50, 0x49, 0x6e, 0x66, 0x6f, - 0x12, 0x1b, 0x0a, 0x09, 0x69, 0x63, 0x6d, 0x70, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, - 0x01, 0x28, 0x0d, 0x52, 0x08, 0x69, 0x63, 0x6d, 0x70, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1b, 0x0a, - 0x09, 0x69, 0x63, 0x6d, 0x70, 0x5f, 0x63, 0x6f, 0x64, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, - 0x52, 0x08, 0x69, 0x63, 0x6d, 0x70, 0x43, 0x6f, 0x64, 0x65, 0x2a, 0x45, 0x0a, 0x04, 0x54, 0x79, - 0x70, 0x65, 0x12, 0x10, 0x0a, 0x0c, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, - 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x41, - 0x52, 0x54, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x4e, 0x44, - 0x10, 0x02, 0x12, 0x0d, 0x0a, 0x09, 0x54, 0x59, 0x50, 0x45, 0x5f, 0x44, 0x52, 0x4f, 0x50, 0x10, - 0x03, 0x2a, 0x3b, 0x0a, 0x09, 0x44, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x15, - 0x0a, 0x11, 0x44, 0x49, 0x52, 0x45, 0x43, 0x54, 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, - 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x49, 0x4e, 0x47, 0x52, 0x45, 0x53, 0x53, - 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x45, 0x47, 0x52, 0x45, 0x53, 0x53, 0x10, 0x02, 0x32, 0x42, - 0x0a, 0x0b, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x33, 0x0a, - 0x06, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, 0x0f, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x46, - 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x1a, 0x12, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, - 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x41, 0x63, 0x6b, 0x22, 0x00, 0x28, 0x01, - 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x33, + 0x74, 0x78, 0x42, 0x79, 0x74, 0x65, 0x73, 0x12, 0x2c, 0x0a, 0x12, 0x73, 0x6f, 0x75, 0x72, 0x63, + 0x65, 0x5f, 0x72, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x0e, 0x20, + 0x01, 0x28, 0x0c, 0x52, 0x10, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x52, 0x65, 0x73, 0x6f, 0x75, + 0x72, 0x63, 0x65, 0x49, 0x64, 0x12, 0x28, 0x0a, 0x10, 0x64, 0x65, 0x73, 0x74, 0x5f, 0x72, 0x65, + 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x69, 0x64, 0x18, 0x0f, 0x20, 0x01, 0x28, 0x0c, 0x52, + 0x0e, 0x64, 0x65, 0x73, 0x74, 0x52, 0x65, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x49, 0x64, 0x42, + 0x11, 0x0a, 0x0f, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x69, 0x6e, + 0x66, 0x6f, 0x22, 0x48, 0x0a, 0x08, 0x50, 0x6f, 0x72, 0x74, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1f, + 0x0a, 0x0b, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0d, 0x52, 0x0a, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x50, 0x6f, 0x72, 0x74, 0x12, + 0x1b, 0x0a, 0x09, 0x64, 0x65, 0x73, 0x74, 0x5f, 0x70, 0x6f, 0x72, 0x74, 0x18, 0x02, 0x20, 0x01, + 0x28, 0x0d, 0x52, 0x08, 0x64, 0x65, 0x73, 0x74, 0x50, 0x6f, 0x72, 0x74, 0x22, 0x44, 0x0a, 0x08, + 0x49, 0x43, 0x4d, 0x50, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1b, 0x0a, 0x09, 0x69, 0x63, 0x6d, 0x70, + 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x69, 0x63, 0x6d, + 0x70, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x69, 0x63, 0x6d, 0x70, 0x5f, 0x63, 0x6f, + 0x64, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x69, 0x63, 0x6d, 0x70, 0x43, 0x6f, + 0x64, 0x65, 0x2a, 0x45, 0x0a, 0x04, 0x54, 0x79, 0x70, 0x65, 0x12, 0x10, 0x0a, 0x0c, 0x54, 0x59, + 0x50, 0x45, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0e, 0x0a, 0x0a, + 0x54, 0x59, 0x50, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x10, 0x01, 0x12, 0x0c, 0x0a, 0x08, + 0x54, 0x59, 0x50, 0x45, 0x5f, 0x45, 0x4e, 0x44, 0x10, 0x02, 0x12, 0x0d, 0x0a, 0x09, 0x54, 0x59, + 0x50, 0x45, 0x5f, 0x44, 0x52, 0x4f, 0x50, 0x10, 0x03, 0x2a, 0x3b, 0x0a, 0x09, 0x44, 0x69, 0x72, + 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x15, 0x0a, 0x11, 0x44, 0x49, 0x52, 0x45, 0x43, 0x54, + 0x49, 0x4f, 0x4e, 0x5f, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, + 0x07, 0x49, 0x4e, 0x47, 0x52, 0x45, 0x53, 0x53, 0x10, 0x01, 0x12, 0x0a, 0x0a, 0x06, 0x45, 0x47, + 0x52, 0x45, 0x53, 0x53, 0x10, 0x02, 0x32, 0x42, 0x0a, 0x0b, 0x46, 0x6c, 0x6f, 0x77, 0x53, 0x65, + 0x72, 0x76, 0x69, 0x63, 0x65, 0x12, 0x33, 0x0a, 0x06, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x12, + 0x0f, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e, 0x74, + 0x1a, 0x12, 0x2e, 0x66, 0x6c, 0x6f, 0x77, 0x2e, 0x46, 0x6c, 0x6f, 0x77, 0x45, 0x76, 0x65, 0x6e, + 0x74, 0x41, 0x63, 0x6b, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2f, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/flow/proto/flow.proto b/flow/proto/flow.proto index a4bd56137..d11af623a 100644 --- a/flow/proto/flow.proto +++ b/flow/proto/flow.proto @@ -67,6 +67,11 @@ message FlowFields { // Number of bytes uint64 rx_bytes = 12; uint64 tx_bytes = 13; + + // Resource ID + bytes source_resource_id = 14; + bytes dest_resource_id = 15; + } // Flow event types