diff --git a/go.mod b/go.mod index c86acdf26..11dc88c43 100644 --- a/go.mod +++ b/go.mod @@ -63,7 +63,7 @@ require ( github.com/miekg/dns v1.1.59 github.com/mitchellh/hashstructure/v2 v2.0.2 github.com/nadoo/ipset v0.5.0 - github.com/netbirdio/management-integrations/integrations v0.0.0-20250330143713-7901e0a82203 + github.com/netbirdio/management-integrations/integrations v0.0.0-20250529122842-6700aa91190c github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250514131221-a464fd5f30cb github.com/okta/okta-sdk-golang/v2 v2.18.0 github.com/oschwald/maxminddb-golang v1.12.0 diff --git a/go.sum b/go.sum index 226ee94c2..f887cee94 100644 --- a/go.sum +++ b/go.sum @@ -503,8 +503,8 @@ github.com/netbirdio/go-netroute v0.0.0-20240611143515-f59b0e1d3944 h1:TDtJKmM6S github.com/netbirdio/go-netroute v0.0.0-20240611143515-f59b0e1d3944/go.mod h1:sHA6TRxjQ6RLbnI+3R4DZo2Eseg/iKiPRfNmcuNySVQ= github.com/netbirdio/ice/v3 v3.0.0-20240315174635-e72a50fcb64e h1:PURA50S8u4mF6RrkYYCAvvPCixhqqEiEy3Ej6avh04c= github.com/netbirdio/ice/v3 v3.0.0-20240315174635-e72a50fcb64e/go.mod h1:YMLU7qbKfVjmEv7EoZPIVEI+kNYxWCdPK3VS0BU+U4Q= -github.com/netbirdio/management-integrations/integrations v0.0.0-20250330143713-7901e0a82203 h1:uxxbLPXQgC9VO15epNPtrD6zazyd5rZeqC5hQSmCdZU= -github.com/netbirdio/management-integrations/integrations v0.0.0-20250330143713-7901e0a82203/go.mod h1:2ZE6/tBBCKHQggPfO2UOQjyjXI7k+JDVl2ymorTOVQs= +github.com/netbirdio/management-integrations/integrations v0.0.0-20250529122842-6700aa91190c h1:SdZxYjR9XXHLyRsTbS1EHBr6+RI15oie1K9Q8yvi3FY= +github.com/netbirdio/management-integrations/integrations v0.0.0-20250529122842-6700aa91190c/go.mod h1:Gi9raplYzCCyh07Olw/DVfCJTFgpr1WCXJ/Q+8TSA9Q= github.com/netbirdio/service v0.0.0-20240911161631-f62744f42502 h1:3tHlFmhTdX9axERMVN63dqyFqnvuD+EMJHzM7mNGON8= github.com/netbirdio/service v0.0.0-20240911161631-f62744f42502/go.mod h1:CIMRFEJVL+0DS1a3Nx06NaMn4Dz63Ng6O7dl0qH0zVM= github.com/netbirdio/signal-dispatcher/dispatcher v0.0.0-20250514131221-a464fd5f30cb h1:Cr6age+ePALqlSvtp7wc6lYY97XN7rkD1K4XEDmY+TU= diff --git a/management/server/activity/sqlite/crypt.go b/management/server/activity/store/crypt.go similarity index 99% rename from management/server/activity/sqlite/crypt.go rename to management/server/activity/store/crypt.go index 096f49ea3..ce97347d4 100644 --- a/management/server/activity/sqlite/crypt.go +++ b/management/server/activity/store/crypt.go @@ -1,4 +1,4 @@ -package sqlite +package store import ( "bytes" diff --git a/management/server/activity/sqlite/crypt_test.go b/management/server/activity/store/crypt_test.go similarity index 99% rename from management/server/activity/sqlite/crypt_test.go rename to management/server/activity/store/crypt_test.go index aff3a08b1..700bbcd6b 100644 --- a/management/server/activity/sqlite/crypt_test.go +++ b/management/server/activity/store/crypt_test.go @@ -1,4 +1,4 @@ -package sqlite +package store import ( "bytes" diff --git a/management/server/activity/sqlite/migration.go b/management/server/activity/store/migration.go similarity index 91% rename from management/server/activity/sqlite/migration.go rename to management/server/activity/store/migration.go index 6da7893a0..af19a34eb 100644 --- a/management/server/activity/sqlite/migration.go +++ b/management/server/activity/store/migration.go @@ -1,4 +1,4 @@ -package sqlite +package store import ( "context" @@ -6,6 +6,7 @@ import ( log "github.com/sirupsen/logrus" "gorm.io/gorm" + "gorm.io/gorm/clause" "github.com/netbirdio/netbird/management/server/activity" "github.com/netbirdio/netbird/management/server/migration" @@ -132,11 +133,6 @@ func migrateDuplicateDeletedUsers(ctx context.Context, db *gorm.DB) error { } if err = db.Transaction(func(tx *gorm.DB) error { - groupById := tx.Model(model).Select("MAX(rowid)").Group("id") - if err = tx.Delete(model, "rowid NOT IN (?)", groupById).Error; err != nil { - return err - } - if err = tx.Migrator().RenameTable("deleted_users", "deleted_users_old"); err != nil { return err } @@ -145,12 +141,20 @@ func migrateDuplicateDeletedUsers(ctx context.Context, db *gorm.DB) error { return err } - if err = tx.Exec(` - INSERT INTO deleted_users (id, email, name, enc_algo) SELECT id, email, name, enc_algo - FROM deleted_users_old;`).Error; err != nil { + var deletedUsers []activity.DeletedUser + if err = tx.Table("deleted_users_old").Find(&deletedUsers).Error; err != nil { return err } + for _, deletedUser := range deletedUsers { + if err = tx.Clauses(clause.OnConflict{ + Columns: []clause.Column{{Name: "id"}}, + DoUpdates: clause.AssignmentColumns([]string{"email", "name", "enc_algo"}), + }).Create(&deletedUser).Error; err != nil { + return err + } + } + return tx.Migrator().DropTable("deleted_users_old") }); err != nil { return err diff --git a/management/server/activity/sqlite/migration_test.go b/management/server/activity/store/migration_test.go similarity index 93% rename from management/server/activity/sqlite/migration_test.go rename to management/server/activity/store/migration_test.go index 498c976d9..e3261d9fa 100644 --- a/management/server/activity/sqlite/migration_test.go +++ b/management/server/activity/store/migration_test.go @@ -1,17 +1,17 @@ -package sqlite +package store import ( "context" - "path/filepath" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "gorm.io/driver/sqlite" + "gorm.io/driver/postgres" "gorm.io/gorm" "github.com/netbirdio/netbird/management/server/activity" "github.com/netbirdio/netbird/management/server/migration" + "github.com/netbirdio/netbird/management/server/testutil" ) const ( @@ -21,8 +21,11 @@ const ( func setupDatabase(t *testing.T) *gorm.DB { t.Helper() - dbFile := filepath.Join(t.TempDir(), eventSinkDB) - db, err := gorm.Open(sqlite.Open(dbFile)) + cleanup, dsn, err := testutil.CreatePostgresTestContainer() + require.NoError(t, err, "Failed to create Postgres test container") + t.Cleanup(cleanup) + + db, err := gorm.Open(postgres.Open(dsn)) require.NoError(t, err) sql, err := db.DB() diff --git a/management/server/activity/sqlite/sqlite.go b/management/server/activity/store/sql_store.go similarity index 74% rename from management/server/activity/sqlite/sqlite.go rename to management/server/activity/store/sql_store.go index 6d198fca9..80b165938 100644 --- a/management/server/activity/sqlite/sqlite.go +++ b/management/server/activity/store/sql_store.go @@ -1,17 +1,22 @@ -package sqlite +package store import ( "context" "fmt" + "os" "path/filepath" + "runtime" + "strconv" log "github.com/sirupsen/logrus" + "gorm.io/driver/postgres" "gorm.io/driver/sqlite" "gorm.io/gorm" "gorm.io/gorm/clause" "gorm.io/gorm/logger" "github.com/netbirdio/netbird/management/server/activity" + "github.com/netbirdio/netbird/management/server/types" ) const ( @@ -22,6 +27,10 @@ const ( fallbackEmail = "unknown@unknown.com" gcmEncAlgo = "GCM" + + storeEngineEnv = "NB_ACTIVITY_EVENT_STORE_ENGINE" + postgresDsnEnv = "NB_ACTIVITY_EVENT_POSTGRES_DSN" + sqlMaxOpenConnsEnv = "NB_SQL_MAX_OPEN_CONNS" ) type eventWithNames struct { @@ -38,28 +47,19 @@ type Store struct { fieldEncrypt *FieldEncrypt } -// NewSQLiteStore creates a new Store with an event table if not exists. -func NewSQLiteStore(ctx context.Context, dataDir string, encryptionKey string) (*Store, error) { +// NewSqlStore creates a new Store with an event table if not exists. +func NewSqlStore(ctx context.Context, dataDir string, encryptionKey string) (*Store, error) { crypt, err := NewFieldEncrypt(encryptionKey) if err != nil { return nil, err } - dbFile := filepath.Join(dataDir, eventSinkDB) - db, err := gorm.Open(sqlite.Open(dbFile), &gorm.Config{ - Logger: logger.Default.LogMode(logger.Silent), - }) + db, err := initDatabase(ctx, dataDir) if err != nil { - return nil, err + return nil, fmt.Errorf("initialize database: %w", err) } - sql, err := db.DB() - if err != nil { - return nil, err - } - sql.SetMaxOpenConns(1) - if err = migrate(ctx, crypt, db); err != nil { return nil, fmt.Errorf("events database migration: %w", err) } @@ -236,3 +236,52 @@ func (store *Store) Close(_ context.Context) error { } return nil } + +func initDatabase(ctx context.Context, dataDir string) (*gorm.DB, error) { + var dialector gorm.Dialector + var storeEngine = types.SqliteStoreEngine + + if engine, ok := os.LookupEnv(storeEngineEnv); ok { + storeEngine = types.Engine(engine) + } + + switch storeEngine { + case types.SqliteStoreEngine: + dialector = sqlite.Open(filepath.Join(dataDir, eventSinkDB)) + case types.PostgresStoreEngine: + dsn, ok := os.LookupEnv(postgresDsnEnv) + if !ok { + return nil, fmt.Errorf("%s environment variable not set", postgresDsnEnv) + } + dialector = postgres.Open(dsn) + default: + return nil, fmt.Errorf("unsupported store engine: %s", storeEngine) + } + log.WithContext(ctx).Infof("using %s as activity event store engine", storeEngine) + + db, err := gorm.Open(dialector, &gorm.Config{Logger: logger.Default.LogMode(logger.Silent)}) + if err != nil { + return nil, fmt.Errorf("open db connection: %w", err) + } + + return configureConnectionPool(db, storeEngine) +} + +func configureConnectionPool(db *gorm.DB, storeEngine types.Engine) (*gorm.DB, error) { + sqlDB, err := db.DB() + if err != nil { + return nil, err + } + + if storeEngine == types.SqliteStoreEngine { + sqlDB.SetMaxOpenConns(1) + } else { + conns, err := strconv.Atoi(os.Getenv(sqlMaxOpenConnsEnv)) + if err != nil { + conns = runtime.NumCPU() + } + sqlDB.SetMaxOpenConns(conns) + } + + return db, nil +} diff --git a/management/server/activity/sqlite/sqlite_test.go b/management/server/activity/store/sql_store_test.go similarity index 90% rename from management/server/activity/sqlite/sqlite_test.go rename to management/server/activity/store/sql_store_test.go index b10f9b58a..8c0d159df 100644 --- a/management/server/activity/sqlite/sqlite_test.go +++ b/management/server/activity/store/sql_store_test.go @@ -1,4 +1,4 @@ -package sqlite +package store import ( "context" @@ -11,10 +11,10 @@ import ( "github.com/netbirdio/netbird/management/server/activity" ) -func TestNewSQLiteStore(t *testing.T) { +func TestNewSqlStore(t *testing.T) { dataDir := t.TempDir() key, _ := GenerateKey() - store, err := NewSQLiteStore(context.Background(), dataDir, key) + store, err := NewSqlStore(context.Background(), dataDir, key) if err != nil { t.Fatal(err) return