Add system activity tracking and event store (#636)

This PR adds system activity tracking. 
The management service records events like 
add/remove peer,  group, rule, route, etc.

The activity events are stored in the SQLite event store
and can be queried by the HTTP API.
This commit is contained in:
Misha Bragin
2023-01-02 15:11:32 +01:00
committed by GitHub
parent 50caacff69
commit 5c0b8a46f0
42 changed files with 1827 additions and 227 deletions

View File

@@ -0,0 +1,202 @@
package activity
const (
// PeerAddedByUser indicates that a user added a new peer to the system
PeerAddedByUser Activity = iota
// PeerAddedWithSetupKey indicates that a new peer joined the system using a setup key
PeerAddedWithSetupKey
// UserJoined indicates that a new user joined the account
UserJoined
// UserInvited indicates that a new user was invited to join the account
UserInvited
// AccountCreated indicates that a new account has been created
AccountCreated
// PeerRemovedByUser indicates that a user removed a peer from the system
PeerRemovedByUser
// RuleAdded indicates that a user added a new rule
RuleAdded
// RuleUpdated indicates that a user updated a rule
RuleUpdated
// RuleRemoved indicates that a user removed a rule
RuleRemoved
// SetupKeyCreated indicates that a user created a new setup key
SetupKeyCreated
// SetupKeyUpdated indicates that a user updated a setup key
SetupKeyUpdated
// SetupKeyRevoked indicates that a user revoked a setup key
SetupKeyRevoked
// SetupKeyOverused indicates that setup key usage exhausted
SetupKeyOverused
// GroupCreated indicates that a user created a group
GroupCreated
// GroupUpdated indicates that a user updated a group
GroupUpdated
// GroupAddedToPeer indicates that a user added group to a peer
GroupAddedToPeer
// GroupRemovedFromPeer indicates that a user removed peer group
GroupRemovedFromPeer
// GroupAddedToUser indicates that a user added group to a user
GroupAddedToUser
// GroupRemovedFromUser indicates that a user removed a group from a user
GroupRemovedFromUser
// UserRoleUpdated indicates that a user changed the role of a user
UserRoleUpdated
// GroupAddedToSetupKey indicates that a user added group to a setup key
GroupAddedToSetupKey
// GroupRemovedFromSetupKey indicates that a user removed a group from a setup key
GroupRemovedFromSetupKey
)
const (
// PeerAddedByUserMessage is a human-readable text message of the PeerAddedByUser activity
PeerAddedByUserMessage string = "Peer added"
// PeerAddedWithSetupKeyMessage is a human-readable text message of the PeerAddedWithSetupKey activity
PeerAddedWithSetupKeyMessage = PeerAddedByUserMessage
//UserJoinedMessage is a human-readable text message of the UserJoined activity
UserJoinedMessage string = "User joined"
//UserInvitedMessage is a human-readable text message of the UserInvited activity
UserInvitedMessage string = "User invited"
//AccountCreatedMessage is a human-readable text message of the AccountCreated activity
AccountCreatedMessage string = "Account created"
// PeerRemovedByUserMessage is a human-readable text message of the PeerRemovedByUser activity
PeerRemovedByUserMessage string = "Peer deleted"
// RuleAddedMessage is a human-readable text message of the RuleAdded activity
RuleAddedMessage string = "Rule added"
// RuleRemovedMessage is a human-readable text message of the RuleRemoved activity
RuleRemovedMessage string = "Rule deleted"
// RuleUpdatedMessage is a human-readable text message of the RuleRemoved activity
RuleUpdatedMessage string = "Rule updated"
// SetupKeyCreatedMessage is a human-readable text message of the SetupKeyCreated activity
SetupKeyCreatedMessage string = "Setup key created"
// SetupKeyUpdatedMessage is a human-readable text message of the SetupKeyUpdated activity
SetupKeyUpdatedMessage string = "Setup key updated"
// SetupKeyRevokedMessage is a human-readable text message of the SetupKeyRevoked activity
SetupKeyRevokedMessage string = "Setup key revoked"
// SetupKeyOverusedMessage is a human-readable text message of the SetupKeyOverused activity
SetupKeyOverusedMessage string = "Setup key overused"
// GroupCreatedMessage is a human-readable text message of the GroupCreated activity
GroupCreatedMessage string = "Group created"
// GroupUpdatedMessage is a human-readable text message of the GroupUpdated activity
GroupUpdatedMessage string = "Group updated"
// GroupAddedToPeerMessage is a human-readable text message of the GroupAddedToPeer activity
GroupAddedToPeerMessage string = "Group added to peer"
// GroupRemovedFromPeerMessage is a human-readable text message of the GroupRemovedFromPeer activity
GroupRemovedFromPeerMessage string = "Group removed from peer"
// GroupAddedToUserMessage is a human-readable text message of the GroupAddedToUser activity
GroupAddedToUserMessage string = "Group added to user"
// GroupRemovedFromUserMessage is a human-readable text message of the GroupRemovedFromUser activity
GroupRemovedFromUserMessage string = "Group removed from user"
// UserRoleUpdatedMessage is a human-readable text message of the UserRoleUpdatedMessage activity
UserRoleUpdatedMessage string = "User role updated"
// GroupAddedToSetupKeyMessage is a human-readable text message of the GroupAddedToSetupKey activity
GroupAddedToSetupKeyMessage string = "Group added to setup key"
// GroupRemovedFromSetupKeyMessage is a human-readable text message of the GroupRemovedFromSetupKey activity
GroupRemovedFromSetupKeyMessage string = "Group removed from user setup key"
)
// Activity that triggered an Event
type Activity int
// Message returns a string representation of an activity
func (a Activity) Message() string {
switch a {
case PeerAddedByUser:
return PeerAddedByUserMessage
case PeerRemovedByUser:
return PeerRemovedByUserMessage
case PeerAddedWithSetupKey:
return PeerAddedWithSetupKeyMessage
case UserJoined:
return UserJoinedMessage
case UserInvited:
return UserInvitedMessage
case AccountCreated:
return AccountCreatedMessage
case RuleAdded:
return RuleAddedMessage
case RuleRemoved:
return RuleRemovedMessage
case RuleUpdated:
return RuleUpdatedMessage
case SetupKeyCreated:
return SetupKeyCreatedMessage
case SetupKeyUpdated:
return SetupKeyUpdatedMessage
case SetupKeyRevoked:
return SetupKeyRevokedMessage
case SetupKeyOverused:
return SetupKeyOverusedMessage
case GroupCreated:
return GroupCreatedMessage
case GroupUpdated:
return GroupUpdatedMessage
case GroupAddedToPeer:
return GroupAddedToPeerMessage
case GroupRemovedFromPeer:
return GroupRemovedFromPeerMessage
case GroupRemovedFromUser:
return GroupRemovedFromUserMessage
case GroupAddedToUser:
return GroupAddedToUserMessage
case UserRoleUpdated:
return UserRoleUpdatedMessage
case GroupAddedToSetupKey:
return GroupAddedToSetupKeyMessage
case GroupRemovedFromSetupKey:
return GroupRemovedFromSetupKeyMessage
default:
return "UNKNOWN_ACTIVITY"
}
}
// StringCode returns a string code of the activity
func (a Activity) StringCode() string {
switch a {
case PeerAddedByUser:
return "user.peer.add"
case PeerRemovedByUser:
return "user.peer.delete"
case PeerAddedWithSetupKey:
return "setupkey.peer.add"
case UserJoined:
return "user.join"
case UserInvited:
return "user.invite"
case AccountCreated:
return "account.create"
case RuleAdded:
return "rule.add"
case RuleRemoved:
return "rule.delete"
case RuleUpdated:
return "rule.update"
case SetupKeyCreated:
return "setupkey.add"
case SetupKeyRevoked:
return "setupkey.revoke"
case SetupKeyOverused:
return "setupkey.overuse"
case SetupKeyUpdated:
return "setupkey.update"
case GroupCreated:
return "group.add"
case GroupUpdated:
return "group.update"
case GroupRemovedFromPeer:
return "peer.group.delete"
case GroupAddedToPeer:
return "peer.group.add"
case GroupAddedToUser:
return "user.group.add"
case GroupRemovedFromUser:
return "user.group.delete"
case UserRoleUpdated:
return "user.role.update"
case GroupAddedToSetupKey:
return "setupkey.group.add"
case GroupRemovedFromSetupKey:
return "setupkey.group.delete"
default:
return "UNKNOWN_ACTIVITY"
}
}

View File

@@ -0,0 +1,42 @@
package activity
import (
"time"
)
// Event represents a network/system activity event.
type Event struct {
// Timestamp of the event
Timestamp time.Time
// Activity that was performed during the event
Activity Activity
// ID of the event (can be empty, meaning that it wasn't yet generated)
ID uint64
// InitiatorID is the ID of an object that initiated the event (e.g., a user)
InitiatorID string
// TargetID is the ID of an object that was effected by the event (e.g., a peer)
TargetID string
// AccountID is the ID of an account where the event happened
AccountID string
// Meta of the event, e.g. deleted peer information like name, IP, etc
Meta map[string]any
}
// Copy the event
func (e *Event) Copy() *Event {
meta := make(map[string]any, len(e.Meta))
for key, value := range e.Meta {
meta[key] = value
}
return &Event{
Timestamp: e.Timestamp,
Activity: e.Activity,
ID: e.ID,
InitiatorID: e.InitiatorID,
TargetID: e.TargetID,
AccountID: e.AccountID,
Meta: meta,
}
}

View File

@@ -0,0 +1,149 @@
package sqlite
import (
"database/sql"
"encoding/json"
"fmt"
"github.com/netbirdio/netbird/management/server/activity"
// sqlite driver
_ "github.com/mattn/go-sqlite3"
"path/filepath"
"time"
)
const (
//eventSinkDB is the default name of the events database
eventSinkDB = "events.db"
createTableQuery = "CREATE TABLE IF NOT EXISTS events " +
"(id INTEGER PRIMARY KEY AUTOINCREMENT, " +
"activity INTEGER, " +
"timestamp DATETIME, " +
"initiator_id TEXT," +
"account_id TEXT," +
"meta TEXT," +
" target_id TEXT);"
selectStatement = "SELECT id, activity, timestamp, initiator_id, target_id, account_id, meta" +
" FROM events WHERE account_id = ? ORDER BY timestamp %s LIMIT ? OFFSET ?;"
insertStatement = "INSERT INTO events(activity, timestamp, initiator_id, target_id, account_id, meta) " +
"VALUES(?, ?, ?, ?, ?, ?)"
)
// Store is the implementation of the activity.Store interface backed by SQLite
type Store struct {
db *sql.DB
}
// NewSQLiteStore creates a new Store with an event table if not exists.
func NewSQLiteStore(dataDir string) (*Store, error) {
dbFile := filepath.Join(dataDir, eventSinkDB)
db, err := sql.Open("sqlite3", dbFile)
if err != nil {
return nil, err
}
_, err = db.Exec(createTableQuery)
if err != nil {
return nil, err
}
return &Store{db: db}, nil
}
func processResult(result *sql.Rows) ([]*activity.Event, error) {
events := make([]*activity.Event, 0)
for result.Next() {
var id int64
var operation activity.Activity
var timestamp time.Time
var initiator string
var target string
var account string
var jsonMeta string
err := result.Scan(&id, &operation, &timestamp, &initiator, &target, &account, &jsonMeta)
if err != nil {
return nil, err
}
meta := make(map[string]any)
if jsonMeta != "" {
err = json.Unmarshal([]byte(jsonMeta), &meta)
if err != nil {
return nil, err
}
}
events = append(events, &activity.Event{
Timestamp: timestamp,
Activity: operation,
ID: uint64(id),
InitiatorID: initiator,
TargetID: target,
AccountID: account,
Meta: meta,
})
}
return events, nil
}
// Get returns "limit" number of events from index ordered descending or ascending by a timestamp
func (store *Store) Get(accountID string, offset, limit int, descending bool) ([]*activity.Event, error) {
order := "DESC"
if !descending {
order = "ASC"
}
stmt, err := store.db.Prepare(fmt.Sprintf(selectStatement, order))
if err != nil {
return nil, err
}
result, err := stmt.Query(accountID, limit, offset)
if err != nil {
return nil, err
}
defer result.Close() //nolint
return processResult(result)
}
// Save an event in the SQLite events table
func (store *Store) Save(event *activity.Event) (*activity.Event, error) {
stmt, err := store.db.Prepare(insertStatement)
if err != nil {
return nil, err
}
var jsonMeta string
if event.Meta != nil {
metaBytes, err := json.Marshal(event.Meta)
if err != nil {
return nil, err
}
jsonMeta = string(metaBytes)
}
result, err := stmt.Exec(event.Activity, event.Timestamp, event.InitiatorID, event.TargetID, event.AccountID, jsonMeta)
if err != nil {
return nil, err
}
id, err := result.LastInsertId()
if err != nil {
return nil, err
}
eventCopy := event.Copy()
eventCopy.ID = uint64(id)
return eventCopy, nil
}
// Close the Store
func (store *Store) Close() error {
if store.db != nil {
return store.db.Close()
}
return nil
}

View File

@@ -0,0 +1,53 @@
package sqlite
import (
"fmt"
"github.com/netbirdio/netbird/management/server/activity"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
func TestNewSQLiteStore(t *testing.T) {
dataDir := t.TempDir()
store, err := NewSQLiteStore(dataDir)
if err != nil {
t.Fatal(err)
return
}
defer store.Close() //nolint
accountID := "account_1"
for i := 0; i < 10; i++ {
_, err = store.Save(&activity.Event{
Timestamp: time.Now(),
Activity: activity.PeerAddedByUser,
InitiatorID: "user_" + fmt.Sprint(i),
TargetID: "peer_" + fmt.Sprint(i),
AccountID: accountID,
})
if err != nil {
t.Fatal(err)
return
}
}
result, err := store.Get(accountID, 0, 10, false)
if err != nil {
t.Fatal(err)
return
}
assert.Len(t, result, 10)
assert.True(t, result[0].Timestamp.Before(result[len(result)-1].Timestamp))
result, err = store.Get(accountID, 0, 5, true)
if err != nil {
t.Fatal(err)
return
}
assert.Len(t, result, 5)
assert.True(t, result[0].Timestamp.After(result[len(result)-1].Timestamp))
}

View File

@@ -0,0 +1,54 @@
package activity
import "sync"
// Store provides an interface to store or stream events.
type Store interface {
// Save an event in the store
Save(event *Event) (*Event, error)
// Get returns "limit" number of events from the "offset" index ordered descending or ascending by a timestamp
Get(accountID string, offset, limit int, descending bool) ([]*Event, error)
// Close the sink flushing events if necessary
Close() error
}
// InMemoryEventStore implements the Store interface storing data in-memory
type InMemoryEventStore struct {
mu sync.Mutex
nextID uint64
events []*Event
}
// Save sets the Event.ID to 1
func (store *InMemoryEventStore) Save(event *Event) (*Event, error) {
store.mu.Lock()
defer store.mu.Unlock()
if store.events == nil {
store.events = make([]*Event, 0)
}
event.ID = store.nextID
store.nextID++
store.events = append(store.events, event)
return event, nil
}
// Get returns a list of ALL events that belong to the given accountID without taking offset, limit and order into consideration
func (store *InMemoryEventStore) Get(accountID string, offset, limit int, descending bool) ([]*Event, error) {
store.mu.Lock()
defer store.mu.Unlock()
events := make([]*Event, 0)
for _, event := range store.events {
if event.AccountID == accountID {
events = append(events, event)
}
}
return events, nil
}
// Close cleans up the event list
func (store *InMemoryEventStore) Close() error {
store.mu.Lock()
defer store.mu.Unlock()
store.events = make([]*Event, 0)
return nil
}