diff --git a/management/server/event/event.go b/management/server/event/event.go index b1945abf4..dd80eadf8 100644 --- a/management/server/event/event.go +++ b/management/server/event/event.go @@ -11,25 +11,45 @@ const ( type Type string -// Sink provides an interface to store or stream events. -type Sink interface { - // Add an event to the sink. - Add(event *Event) error +// Store provides an interface to store or stream events. +type Store interface { + // Save an event in the store + Save(event Event) (*Event, error) + // GetSince returns a list of events from the store for a given account since the specified time + GetSince(accountID string, from time.Time) ([]Event, error) + // GetLast returns a top N of events from the store for a given account (ordered by timestamp desc) + GetLast(accountID string, limit int) ([]Event, error) // Close the sink flushing events if necessary Close() error } -// Event represents a network activity event. +// Event represents a network/system activity event. type Event struct { // Timestamp of the event Timestamp time.Time - // Message of the event - Message string + // Operation that was performed during the event + Operation string // ID of the event (can be empty, meaning that it wasn't yet generated) ID uint64 // Type of the event Type Type - - // - + // ModifierID is the ID of an object that modifies a Target + ModifierID string + // TargetID is the ID of an object that a Modifier modifies + TargetID string + // AccountID where event happened + AccountID string +} + +// Copy the event +func (e *Event) Copy() *Event { + return &Event{ + Timestamp: e.Timestamp, + Operation: e.Operation, + ID: e.ID, + Type: e.Type, + ModifierID: e.ModifierID, + TargetID: e.TargetID, + AccountID: e.AccountID, + } } diff --git a/management/server/event/sqlite.go b/management/server/event/sqlite.go index 7617ca55e..3eda3c994 100644 --- a/management/server/event/sqlite.go +++ b/management/server/event/sqlite.go @@ -2,23 +2,31 @@ package event import ( "database/sql" - "fmt" _ "github.com/mattn/go-sqlite3" + "path/filepath" + "time" ) const ( SQLiteEventSinkDB = "events.db" createTableQuery = "CREATE TABLE IF NOT EXISTS events " + - "(id UNSIGNED BIG INT PRIMARY KEY AUTOINCREMENT, message TEXT, timestamp DATETIME);" + "(id INTEGER PRIMARY KEY AUTOINCREMENT, account TEXT NOT NULL, " + + "operation TEXT, " + + "type TEXT, " + + "timestamp DATETIME, " + + "modifier TEXT," + + " target TEXT);" ) -type SQLiteSink struct { +// SQLiteStore is the implementation of the event.Store interface backed by SQLite +type SQLiteStore struct { db *sql.DB } -// NewSQLiteSink creates a new SQLiteSink with an event table if not exists. -func NewSQLiteSink(dbPath string) (*SQLiteSink, error) { - db, err := sql.Open("sqlite3", dbPath) +// NewSQLiteStore creates a new SQLiteStore with an event table if not exists. +func NewSQLiteStore(dataDir string) (*SQLiteStore, error) { + dbFile := filepath.Join(dataDir, SQLiteEventSinkDB) + db, err := sql.Open("sqlite3", dbFile) if err != nil { return nil, err } @@ -28,39 +36,102 @@ func NewSQLiteSink(dbPath string) (*SQLiteSink, error) { return nil, err } - return &SQLiteSink{db: db}, nil + return &SQLiteStore{db: db}, nil } -// Add an event to the SQLite table -func (sink *SQLiteSink) Add(event *Event) error { +func processResult(result *sql.Rows) ([]Event, error) { + events := make([]Event, 0) + for result.Next() { + var id int64 + var operation string + var timestamp time.Time + var modifier string + var target string + var account string + var typ Type + err := result.Scan(&id, &operation, ×tamp, &modifier, &target, &account, &typ) + if err != nil { + return nil, err + } - _, err := sink.db.Exec(createTableQuery) - if err != nil { - return err + events = append(events, Event{ + Timestamp: timestamp, + Operation: operation, + ID: uint64(id), + Type: typ, + ModifierID: modifier, + TargetID: target, + AccountID: account, + }) } - stmt, err := sink.db.Prepare("INSERT INTO events(message, timestamp) values(?, ?)") + return events, nil +} + +// GetLast returns a top N of events from the store for a given account (ordered by timestamp desc) +func (store *SQLiteStore) GetLast(accountID string, limit int) ([]Event, error) { + stmt, err := store.db.Prepare("SELECT id, operation, timestamp, modifier, target, account, type" + + " FROM events WHERE account = ? ORDER BY timestamp DESC limit ?;") if err != nil { - return err + return nil, err } - result, err := stmt.Exec(event.Message, event.Timestamp) + result, err := stmt.Query(accountID, limit) + if err != nil { + return nil, err + } + + defer result.Close() //nolint + return processResult(result) +} + +// GetSince returns a list of events from the store for a given account since the specified time +func (store *SQLiteStore) GetSince(accountID string, from time.Time) ([]Event, error) { + stmt, err := store.db.Prepare("SELECT id, operation, timestamp, modifier, target, account, type" + + " FROM events WHERE account = ? and timestamp >= ?;") + if err != nil { + return nil, err + } + + result, err := stmt.Query(accountID, from) + if err != nil { + return nil, err + } + + defer result.Close() //nolint + return processResult(result) +} + +// Save an event in the SQLite events table +func (store *SQLiteStore) Save(event Event) (*Event, error) { + + stmt, err := store.db.Prepare("INSERT INTO events(operation, timestamp, modifier, target, account, type) VALUES(?, ?, ?, ?, ?, ?)") + if err != nil { + return nil, err + } + + result, err := stmt.Exec(event.Operation, event.Timestamp, event.ModifierID, event.TargetID, event.AccountID, event.Type) if err != nil { if err == sql.ErrNoRows { // Handle the case of no rows returned. } - return album, err + return nil, err } - fmt.Println(result) + id, err := result.LastInsertId() + if err != nil { + return nil, err + } - return nil + eventCopy := event.Copy() + eventCopy.ID = uint64(id) + return eventCopy, nil } -// Close the SQLiteSink -func (sink *SQLiteSink) Close() error { - if sink.db != nil { - return sink.db.Close() +// Close the SQLiteStore +func (store *SQLiteStore) Close() error { + if store.db != nil { + return store.db.Close() } return nil } diff --git a/management/server/event/sqlite_test.go b/management/server/event/sqlite_test.go new file mode 100644 index 000000000..9c2400c4a --- /dev/null +++ b/management/server/event/sqlite_test.go @@ -0,0 +1,47 @@ +package event + +import ( + "fmt" + "testing" + "time" +) + +func TestNewSQLiteStore(t *testing.T) { + dataDir := t.TempDir() + store, err := NewSQLiteStore(dataDir) + if err != nil { + t.Fatal(err) + return + } + + accountID := "account_1" + eventTime := time.Now() + _, err = store.Save(Event{ + Timestamp: eventTime, + Operation: "cool operation", + Type: ManagementEvent, + ModifierID: "user_1", + TargetID: "peer_1", + AccountID: accountID, + }) + if err != nil { + t.Fatal(err) + return + } + + result, err := store.GetSince(accountID, eventTime.Add(-10*time.Second)) + if err != nil { + t.Fatal(err) + return + } + + fmt.Println(result) + + result, err = store.GetLast(accountID, 10) + if err != nil { + t.Fatal(err) + return + } + + fmt.Println(result) +}