mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-21 17:56:39 +00:00
Add basic Save and Get operations to the event store
This commit is contained in:
@@ -11,25 +11,45 @@ const (
|
||||
|
||||
type Type string
|
||||
|
||||
// Sink provides an interface to store or stream events.
|
||||
type Sink interface {
|
||||
// Add an event to the sink.
|
||||
Add(event *Event) error
|
||||
// Store provides an interface to store or stream events.
|
||||
type Store interface {
|
||||
// Save an event in the store
|
||||
Save(event Event) (*Event, error)
|
||||
// GetSince returns a list of events from the store for a given account since the specified time
|
||||
GetSince(accountID string, from time.Time) ([]Event, error)
|
||||
// GetLast returns a top N of events from the store for a given account (ordered by timestamp desc)
|
||||
GetLast(accountID string, limit int) ([]Event, error)
|
||||
// Close the sink flushing events if necessary
|
||||
Close() error
|
||||
}
|
||||
|
||||
// Event represents a network activity event.
|
||||
// Event represents a network/system activity event.
|
||||
type Event struct {
|
||||
// Timestamp of the event
|
||||
Timestamp time.Time
|
||||
// Message of the event
|
||||
Message string
|
||||
// Operation that was performed during the event
|
||||
Operation string
|
||||
// ID of the event (can be empty, meaning that it wasn't yet generated)
|
||||
ID uint64
|
||||
// Type of the event
|
||||
Type Type
|
||||
|
||||
//
|
||||
|
||||
// ModifierID is the ID of an object that modifies a Target
|
||||
ModifierID string
|
||||
// TargetID is the ID of an object that a Modifier modifies
|
||||
TargetID string
|
||||
// AccountID where event happened
|
||||
AccountID string
|
||||
}
|
||||
|
||||
// Copy the event
|
||||
func (e *Event) Copy() *Event {
|
||||
return &Event{
|
||||
Timestamp: e.Timestamp,
|
||||
Operation: e.Operation,
|
||||
ID: e.ID,
|
||||
Type: e.Type,
|
||||
ModifierID: e.ModifierID,
|
||||
TargetID: e.TargetID,
|
||||
AccountID: e.AccountID,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,23 +2,31 @@ package event
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"fmt"
|
||||
_ "github.com/mattn/go-sqlite3"
|
||||
"path/filepath"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
SQLiteEventSinkDB = "events.db"
|
||||
createTableQuery = "CREATE TABLE IF NOT EXISTS events " +
|
||||
"(id UNSIGNED BIG INT PRIMARY KEY AUTOINCREMENT, message TEXT, timestamp DATETIME);"
|
||||
"(id INTEGER PRIMARY KEY AUTOINCREMENT, account TEXT NOT NULL, " +
|
||||
"operation TEXT, " +
|
||||
"type TEXT, " +
|
||||
"timestamp DATETIME, " +
|
||||
"modifier TEXT," +
|
||||
" target TEXT);"
|
||||
)
|
||||
|
||||
type SQLiteSink struct {
|
||||
// SQLiteStore is the implementation of the event.Store interface backed by SQLite
|
||||
type SQLiteStore struct {
|
||||
db *sql.DB
|
||||
}
|
||||
|
||||
// NewSQLiteSink creates a new SQLiteSink with an event table if not exists.
|
||||
func NewSQLiteSink(dbPath string) (*SQLiteSink, error) {
|
||||
db, err := sql.Open("sqlite3", dbPath)
|
||||
// NewSQLiteStore creates a new SQLiteStore with an event table if not exists.
|
||||
func NewSQLiteStore(dataDir string) (*SQLiteStore, error) {
|
||||
dbFile := filepath.Join(dataDir, SQLiteEventSinkDB)
|
||||
db, err := sql.Open("sqlite3", dbFile)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -28,39 +36,102 @@ func NewSQLiteSink(dbPath string) (*SQLiteSink, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &SQLiteSink{db: db}, nil
|
||||
return &SQLiteStore{db: db}, nil
|
||||
}
|
||||
|
||||
// Add an event to the SQLite table
|
||||
func (sink *SQLiteSink) Add(event *Event) error {
|
||||
func processResult(result *sql.Rows) ([]Event, error) {
|
||||
events := make([]Event, 0)
|
||||
for result.Next() {
|
||||
var id int64
|
||||
var operation string
|
||||
var timestamp time.Time
|
||||
var modifier string
|
||||
var target string
|
||||
var account string
|
||||
var typ Type
|
||||
err := result.Scan(&id, &operation, ×tamp, &modifier, &target, &account, &typ)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
_, err := sink.db.Exec(createTableQuery)
|
||||
if err != nil {
|
||||
return err
|
||||
events = append(events, Event{
|
||||
Timestamp: timestamp,
|
||||
Operation: operation,
|
||||
ID: uint64(id),
|
||||
Type: typ,
|
||||
ModifierID: modifier,
|
||||
TargetID: target,
|
||||
AccountID: account,
|
||||
})
|
||||
}
|
||||
|
||||
stmt, err := sink.db.Prepare("INSERT INTO events(message, timestamp) values(?, ?)")
|
||||
return events, nil
|
||||
}
|
||||
|
||||
// GetLast returns a top N of events from the store for a given account (ordered by timestamp desc)
|
||||
func (store *SQLiteStore) GetLast(accountID string, limit int) ([]Event, error) {
|
||||
stmt, err := store.db.Prepare("SELECT id, operation, timestamp, modifier, target, account, type" +
|
||||
" FROM events WHERE account = ? ORDER BY timestamp DESC limit ?;")
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result, err := stmt.Exec(event.Message, event.Timestamp)
|
||||
result, err := stmt.Query(accountID, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer result.Close() //nolint
|
||||
return processResult(result)
|
||||
}
|
||||
|
||||
// GetSince returns a list of events from the store for a given account since the specified time
|
||||
func (store *SQLiteStore) GetSince(accountID string, from time.Time) ([]Event, error) {
|
||||
stmt, err := store.db.Prepare("SELECT id, operation, timestamp, modifier, target, account, type" +
|
||||
" FROM events WHERE account = ? and timestamp >= ?;")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result, err := stmt.Query(accountID, from)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer result.Close() //nolint
|
||||
return processResult(result)
|
||||
}
|
||||
|
||||
// Save an event in the SQLite events table
|
||||
func (store *SQLiteStore) Save(event Event) (*Event, error) {
|
||||
|
||||
stmt, err := store.db.Prepare("INSERT INTO events(operation, timestamp, modifier, target, account, type) VALUES(?, ?, ?, ?, ?, ?)")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
result, err := stmt.Exec(event.Operation, event.Timestamp, event.ModifierID, event.TargetID, event.AccountID, event.Type)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
// Handle the case of no rows returned.
|
||||
}
|
||||
return album, err
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fmt.Println(result)
|
||||
id, err := result.LastInsertId()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil
|
||||
eventCopy := event.Copy()
|
||||
eventCopy.ID = uint64(id)
|
||||
return eventCopy, nil
|
||||
}
|
||||
|
||||
// Close the SQLiteSink
|
||||
func (sink *SQLiteSink) Close() error {
|
||||
if sink.db != nil {
|
||||
return sink.db.Close()
|
||||
// Close the SQLiteStore
|
||||
func (store *SQLiteStore) Close() error {
|
||||
if store.db != nil {
|
||||
return store.db.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
47
management/server/event/sqlite_test.go
Normal file
47
management/server/event/sqlite_test.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package event
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestNewSQLiteStore(t *testing.T) {
|
||||
dataDir := t.TempDir()
|
||||
store, err := NewSQLiteStore(dataDir)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
|
||||
accountID := "account_1"
|
||||
eventTime := time.Now()
|
||||
_, err = store.Save(Event{
|
||||
Timestamp: eventTime,
|
||||
Operation: "cool operation",
|
||||
Type: ManagementEvent,
|
||||
ModifierID: "user_1",
|
||||
TargetID: "peer_1",
|
||||
AccountID: accountID,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
|
||||
result, err := store.GetSince(accountID, eventTime.Add(-10*time.Second))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println(result)
|
||||
|
||||
result, err = store.GetLast(accountID, 10)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Println(result)
|
||||
}
|
||||
Reference in New Issue
Block a user