[management] Migrate events sqlite store to gorm (#3837)

This commit is contained in:
Bethuel Mmbaga
2025-05-20 17:00:37 +03:00
committed by GitHub
parent 1d4cfb83e7
commit 4785f23fc4
6 changed files with 365 additions and 399 deletions

View File

@@ -2,75 +2,21 @@ package sqlite
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"path/filepath"
"runtime"
"time"
_ "github.com/mattn/go-sqlite3"
log "github.com/sirupsen/logrus"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/clause"
"gorm.io/gorm/logger"
"github.com/netbirdio/netbird/management/server/activity"
)
const (
// eventSinkDB is the default name of the events database
eventSinkDB = "events.db"
createTableQuery = "CREATE TABLE IF NOT EXISTS events " +
"(id INTEGER PRIMARY KEY AUTOINCREMENT, " +
"activity INTEGER, " +
"timestamp DATETIME, " +
"initiator_id TEXT," +
"account_id TEXT," +
"meta TEXT," +
" target_id TEXT);"
creatTableDeletedUsersQuery = `CREATE TABLE IF NOT EXISTS deleted_users (id TEXT NOT NULL, email TEXT NOT NULL, name TEXT, enc_algo TEXT NOT NULL);`
selectDescQuery = `SELECT events.id, activity, timestamp, initiator_id, i.name as "initiator_name", i.email as "initiator_email", target_id, t.name as "target_name", t.email as "target_email", account_id, meta
FROM events
LEFT JOIN (
SELECT id, MAX(name) as name, MAX(email) as email
FROM deleted_users
GROUP BY id
) i ON events.initiator_id = i.id
LEFT JOIN (
SELECT id, MAX(name) as name, MAX(email) as email
FROM deleted_users
GROUP BY id
) t ON events.target_id = t.id
WHERE account_id = ?
ORDER BY timestamp DESC LIMIT ? OFFSET ?;`
selectAscQuery = `SELECT events.id, activity, timestamp, initiator_id, i.name as "initiator_name", i.email as "initiator_email", target_id, t.name as "target_name", t.email as "target_email", account_id, meta
FROM events
LEFT JOIN (
SELECT id, MAX(name) as name, MAX(email) as email
FROM deleted_users
GROUP BY id
) i ON events.initiator_id = i.id
LEFT JOIN (
SELECT id, MAX(name) as name, MAX(email) as email
FROM deleted_users
GROUP BY id
) t ON events.target_id = t.id
WHERE account_id = ?
ORDER BY timestamp ASC LIMIT ? OFFSET ?;`
insertQuery = "INSERT INTO events(activity, timestamp, initiator_id, target_id, account_id, meta) " +
"VALUES(?, ?, ?, ?, ?, ?)"
/*
TODO:
The insert should avoid duplicated IDs in the table. So the query should be changes to something like:
`INSERT INTO deleted_users(id, email, name) VALUES(?, ?, ?) ON CONFLICT (id) DO UPDATE SET email = EXCLUDED.email, name = EXCLUDED.name;`
For this to work we have to set the id column as primary key. But this is not possible because the id column is not unique
and some selfhosted deployments might have duplicates already so we need to clean the table first.
*/
insertDeleteUserQuery = `INSERT INTO deleted_users(id, email, name, enc_algo) VALUES(?, ?, ?, ?)`
eventSinkDB = "events.db"
fallbackName = "unknown"
fallbackEmail = "unknown@unknown.com"
@@ -78,172 +24,158 @@ const (
gcmEncAlgo = "GCM"
)
type eventWithNames struct {
activity.Event
InitiatorName string
InitiatorEmail string
TargetName string
TargetEmail string
}
// Store is the implementation of the activity.Store interface backed by SQLite
type Store struct {
db *sql.DB
db *gorm.DB
fieldEncrypt *FieldEncrypt
insertStatement *sql.Stmt
selectAscStatement *sql.Stmt
selectDescStatement *sql.Stmt
deleteUserStmt *sql.Stmt
}
// NewSQLiteStore creates a new Store with an event table if not exists.
func NewSQLiteStore(ctx context.Context, dataDir string, encryptionKey string) (*Store, error) {
dbFile := filepath.Join(dataDir, eventSinkDB)
db, err := sql.Open("sqlite3", dbFile)
if err != nil {
return nil, err
}
db.SetMaxOpenConns(runtime.NumCPU())
crypt, err := NewFieldEncrypt(encryptionKey)
if err != nil {
_ = db.Close()
return nil, err
}
dbFile := filepath.Join(dataDir, eventSinkDB)
db, err := gorm.Open(sqlite.Open(dbFile), &gorm.Config{
Logger: logger.Default.LogMode(logger.Silent),
})
if err != nil {
return nil, err
}
sql, err := db.DB()
if err != nil {
return nil, err
}
sql.SetMaxOpenConns(1)
if err = migrate(ctx, crypt, db); err != nil {
_ = db.Close()
return nil, fmt.Errorf("events database migration: %w", err)
}
return createStore(crypt, db)
err = db.AutoMigrate(&activity.Event{}, &activity.DeletedUser{})
if err != nil {
return nil, fmt.Errorf("events auto migrate: %w", err)
}
return &Store{
db: db,
fieldEncrypt: crypt,
}, nil
}
func (store *Store) processResult(ctx context.Context, result *sql.Rows) ([]*activity.Event, error) {
events := make([]*activity.Event, 0)
func (store *Store) processResult(ctx context.Context, events []*eventWithNames) ([]*activity.Event, error) {
activityEvents := make([]*activity.Event, 0)
var cryptErr error
for result.Next() {
var id int64
var operation activity.Activity
var timestamp time.Time
var initiator string
var initiatorName *string
var initiatorEmail *string
var target string
var targetUserName *string
var targetEmail *string
var account string
var jsonMeta string
err := result.Scan(&id, &operation, &timestamp, &initiator, &initiatorName, &initiatorEmail, &target, &targetUserName, &targetEmail, &account, &jsonMeta)
if err != nil {
return nil, err
for _, event := range events {
e := event.Event
if e.Meta == nil {
e.Meta = make(map[string]any)
}
meta := make(map[string]any)
if jsonMeta != "" {
err = json.Unmarshal([]byte(jsonMeta), &meta)
if event.TargetName != "" {
name, err := store.fieldEncrypt.Decrypt(event.TargetName)
if err != nil {
return nil, err
}
}
if targetUserName != nil {
name, err := store.fieldEncrypt.Decrypt(*targetUserName)
if err != nil {
cryptErr = fmt.Errorf("failed to decrypt username for target id: %s", target)
meta["username"] = fallbackName
cryptErr = fmt.Errorf("failed to decrypt username for target id: %s", event.TargetName)
e.Meta["username"] = fallbackName
} else {
meta["username"] = name
e.Meta["username"] = name
}
}
if targetEmail != nil {
email, err := store.fieldEncrypt.Decrypt(*targetEmail)
if event.TargetEmail != "" {
email, err := store.fieldEncrypt.Decrypt(event.TargetEmail)
if err != nil {
cryptErr = fmt.Errorf("failed to decrypt email address for target id: %s", target)
meta["email"] = fallbackEmail
cryptErr = fmt.Errorf("failed to decrypt email address for target id: %s", event.TargetEmail)
e.Meta["email"] = fallbackEmail
} else {
meta["email"] = email
e.Meta["email"] = email
}
}
event := &activity.Event{
Timestamp: timestamp,
Activity: operation,
ID: uint64(id),
InitiatorID: initiator,
TargetID: target,
AccountID: account,
Meta: meta,
}
if initiatorName != nil {
name, err := store.fieldEncrypt.Decrypt(*initiatorName)
if event.InitiatorName != "" {
name, err := store.fieldEncrypt.Decrypt(event.InitiatorName)
if err != nil {
cryptErr = fmt.Errorf("failed to decrypt username of initiator: %s", initiator)
event.InitiatorName = fallbackName
cryptErr = fmt.Errorf("failed to decrypt username of initiator: %s", event.InitiatorName)
e.InitiatorName = fallbackName
} else {
event.InitiatorName = name
e.InitiatorName = name
}
}
if initiatorEmail != nil {
email, err := store.fieldEncrypt.Decrypt(*initiatorEmail)
if event.InitiatorEmail != "" {
email, err := store.fieldEncrypt.Decrypt(event.InitiatorEmail)
if err != nil {
cryptErr = fmt.Errorf("failed to decrypt email address of initiator: %s", initiator)
event.InitiatorEmail = fallbackEmail
cryptErr = fmt.Errorf("failed to decrypt email address of initiator: %s", event.InitiatorEmail)
e.InitiatorEmail = fallbackEmail
} else {
event.InitiatorEmail = email
e.InitiatorEmail = email
}
}
events = append(events, event)
activityEvents = append(activityEvents, &e)
}
if cryptErr != nil {
log.WithContext(ctx).Warnf("%s", cryptErr)
}
return events, nil
return activityEvents, nil
}
// Get returns "limit" number of events from index ordered descending or ascending by a timestamp
func (store *Store) Get(ctx context.Context, accountID string, offset, limit int, descending bool) ([]*activity.Event, error) {
stmt := store.selectDescStatement
baseQuery := store.db.Model(&activity.Event{}).
Select(`
events.*,
u.name AS initiator_name,
u.email AS initiator_email,
t.name AS target_name,
t.email AS target_email
`).
Joins(`LEFT JOIN deleted_users u ON u.id = events.initiator_id`).
Joins(`LEFT JOIN deleted_users t ON t.id = events.target_id`)
orderDir := "DESC"
if !descending {
stmt = store.selectAscStatement
orderDir = "ASC"
}
result, err := stmt.Query(accountID, limit, offset)
var events []*eventWithNames
err := baseQuery.Order("events.timestamp "+orderDir).Offset(offset).Limit(limit).
Find(&events, "account_id = ?", accountID).Error
if err != nil {
return nil, err
}
defer result.Close() //nolint
return store.processResult(ctx, result)
return store.processResult(ctx, events)
}
// Save an event in the SQLite events table end encrypt the "email" element in meta map
func (store *Store) Save(_ context.Context, event *activity.Event) (*activity.Event, error) {
var jsonMeta string
meta, err := store.saveDeletedUserEmailAndNameInEncrypted(event)
if err != nil {
return nil, err
}
if meta != nil {
metaBytes, err := json.Marshal(event.Meta)
if err != nil {
return nil, err
}
jsonMeta = string(metaBytes)
}
result, err := store.insertStatement.Exec(event.Activity, event.Timestamp, event.InitiatorID, event.TargetID, event.AccountID, jsonMeta)
if err != nil {
return nil, err
}
id, err := result.LastInsertId()
if err != nil {
return nil, err
}
eventCopy := event.Copy()
eventCopy.ID = uint64(id)
meta, err := store.saveDeletedUserEmailAndNameInEncrypted(eventCopy)
if err != nil {
return nil, err
}
eventCopy.Meta = meta
if err = store.db.Create(eventCopy).Error; err != nil {
return nil, err
}
return eventCopy, nil
}
@@ -260,16 +192,27 @@ func (store *Store) saveDeletedUserEmailAndNameInEncrypted(event *activity.Event
return event.Meta, nil
}
deletedUser := activity.DeletedUser{
ID: event.TargetID,
EncAlgo: gcmEncAlgo,
}
encryptedEmail, err := store.fieldEncrypt.Encrypt(fmt.Sprintf("%s", email))
if err != nil {
return nil, err
}
deletedUser.Email = encryptedEmail
encryptedName, err := store.fieldEncrypt.Encrypt(fmt.Sprintf("%s", name))
if err != nil {
return nil, err
}
deletedUser.Name = encryptedName
_, err = store.deleteUserStmt.Exec(event.TargetID, encryptedEmail, encryptedName, gcmEncAlgo)
err = store.db.Clauses(clause.OnConflict{
Columns: []clause.Column{{Name: "id"}},
DoUpdates: clause.AssignmentColumns([]string{"email", "name"}),
}).Create(deletedUser).Error
if err != nil {
return nil, err
}
@@ -285,75 +228,11 @@ func (store *Store) saveDeletedUserEmailAndNameInEncrypted(event *activity.Event
// Close the Store
func (store *Store) Close(_ context.Context) error {
if store.db != nil {
return store.db.Close()
sql, err := store.db.DB()
if err != nil {
return err
}
return sql.Close()
}
return nil
}
// createStore initializes and returns a new Store instance with prepared SQL statements.
func createStore(crypt *FieldEncrypt, db *sql.DB) (*Store, error) {
insertStmt, err := db.Prepare(insertQuery)
if err != nil {
_ = db.Close()
return nil, err
}
selectDescStmt, err := db.Prepare(selectDescQuery)
if err != nil {
_ = db.Close()
return nil, err
}
selectAscStmt, err := db.Prepare(selectAscQuery)
if err != nil {
_ = db.Close()
return nil, err
}
deleteUserStmt, err := db.Prepare(insertDeleteUserQuery)
if err != nil {
_ = db.Close()
return nil, err
}
return &Store{
db: db,
fieldEncrypt: crypt,
insertStatement: insertStmt,
selectDescStatement: selectDescStmt,
selectAscStatement: selectAscStmt,
deleteUserStmt: deleteUserStmt,
}, nil
}
// checkColumnExists checks if a column exists in a specified table
func checkColumnExists(db *sql.DB, tableName, columnName string) (bool, error) {
query := fmt.Sprintf("PRAGMA table_info(%s);", tableName)
rows, err := db.Query(query)
if err != nil {
return false, fmt.Errorf("failed to query table info: %w", err)
}
defer rows.Close()
for rows.Next() {
var cid int
var name, ctype string
var notnull, pk int
var dfltValue sql.NullString
err = rows.Scan(&cid, &name, &ctype, &notnull, &dfltValue, &pk)
if err != nil {
return false, fmt.Errorf("failed to scan row: %w", err)
}
if name == columnName {
return true, nil
}
}
if err = rows.Err(); err != nil {
return false, err
}
return false, nil
}