mirror of
https://github.com/netbirdio/netbird.git
synced 2026-04-18 08:16:39 +00:00
[management] Add MySQL Support (#3108)
* Add mysql store support * Add support to disable activity events recording
This commit is contained in:
@@ -14,6 +14,7 @@ import (
|
||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||
"github.com/netbirdio/netbird/management/server/telemetry"
|
||||
"github.com/netbirdio/netbird/management/server/types"
|
||||
nbutil "github.com/netbirdio/netbird/management/server/util"
|
||||
"github.com/netbirdio/netbird/util"
|
||||
)
|
||||
|
||||
@@ -175,8 +176,8 @@ func restore(ctx context.Context, file string) (*FileStore, error) {
|
||||
migrationPeers := make(map[string]*nbpeer.Peer) // key to Peer
|
||||
for key, peer := range account.Peers {
|
||||
// set LastLogin for the peers that were onboarded before the peer login expiration feature
|
||||
if peer.LastLogin.IsZero() {
|
||||
peer.LastLogin = time.Now().UTC()
|
||||
if peer.GetLastLogin().IsZero() {
|
||||
peer.LastLogin = nbutil.ToPtr(time.Now().UTC())
|
||||
}
|
||||
if peer.ID != "" {
|
||||
continue
|
||||
|
||||
@@ -16,6 +16,7 @@ import (
|
||||
"time"
|
||||
|
||||
log "github.com/sirupsen/logrus"
|
||||
"gorm.io/driver/mysql"
|
||||
"gorm.io/driver/postgres"
|
||||
"gorm.io/driver/sqlite"
|
||||
"gorm.io/gorm"
|
||||
@@ -39,6 +40,7 @@ const (
|
||||
storeSqliteFileName = "store.db"
|
||||
idQueryCondition = "id = ?"
|
||||
keyQueryCondition = "key = ?"
|
||||
mysqlKeyQueryCondition = "`key` = ?"
|
||||
accountAndIDQueryCondition = "account_id = ? and id = ?"
|
||||
accountAndIDsQueryCondition = "account_id = ? AND id IN ?"
|
||||
accountIDCondition = "account_id = ?"
|
||||
@@ -101,6 +103,13 @@ func NewSqlStore(ctx context.Context, db *gorm.DB, storeEngine Engine, metrics t
|
||||
return &SqlStore{db: db, storeEngine: storeEngine, metrics: metrics, installationPK: 1}, nil
|
||||
}
|
||||
|
||||
func GetKeyQueryCondition(s *SqlStore) string {
|
||||
if s.storeEngine == MysqlStoreEngine {
|
||||
return mysqlKeyQueryCondition
|
||||
}
|
||||
return keyQueryCondition
|
||||
}
|
||||
|
||||
// AcquireGlobalLock acquires global lock across all the accounts and returns a function that releases the lock
|
||||
func (s *SqlStore) AcquireGlobalLock(ctx context.Context) (unlock func()) {
|
||||
log.WithContext(ctx).Tracef("acquiring global lock")
|
||||
@@ -397,7 +406,7 @@ func (s *SqlStore) SavePeerLocation(accountID string, peerWithLocation *nbpeer.P
|
||||
return result.Error
|
||||
}
|
||||
|
||||
if result.RowsAffected == 0 {
|
||||
if result.RowsAffected == 0 && s.storeEngine != MysqlStoreEngine {
|
||||
return status.Errorf(status.NotFound, peerNotFoundFMT, peerWithLocation.ID)
|
||||
}
|
||||
|
||||
@@ -487,7 +496,7 @@ func (s *SqlStore) GetAccountIDByPrivateDomain(ctx context.Context, lockStrength
|
||||
|
||||
func (s *SqlStore) GetAccountBySetupKey(ctx context.Context, setupKey string) (*types.Account, error) {
|
||||
var key types.SetupKey
|
||||
result := s.db.Select("account_id").First(&key, keyQueryCondition, setupKey)
|
||||
result := s.db.Select("account_id").First(&key, GetKeyQueryCondition(s), setupKey)
|
||||
if result.Error != nil {
|
||||
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
|
||||
return nil, status.NewSetupKeyNotFoundError(setupKey)
|
||||
@@ -734,7 +743,8 @@ func (s *SqlStore) GetAccountByPeerID(ctx context.Context, peerID string) (*type
|
||||
|
||||
func (s *SqlStore) GetAccountByPeerPubKey(ctx context.Context, peerKey string) (*types.Account, error) {
|
||||
var peer nbpeer.Peer
|
||||
result := s.db.Select("account_id").First(&peer, keyQueryCondition, peerKey)
|
||||
result := s.db.Select("account_id").First(&peer, GetKeyQueryCondition(s), peerKey)
|
||||
|
||||
if result.Error != nil {
|
||||
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
|
||||
return nil, status.Errorf(status.NotFound, "account not found: index lookup failed")
|
||||
@@ -752,7 +762,7 @@ func (s *SqlStore) GetAccountByPeerPubKey(ctx context.Context, peerKey string) (
|
||||
func (s *SqlStore) GetAccountIDByPeerPubKey(ctx context.Context, peerKey string) (string, error) {
|
||||
var peer nbpeer.Peer
|
||||
var accountID string
|
||||
result := s.db.Model(&peer).Select("account_id").Where(keyQueryCondition, peerKey).First(&accountID)
|
||||
result := s.db.Model(&peer).Select("account_id").Where(GetKeyQueryCondition(s), peerKey).First(&accountID)
|
||||
if result.Error != nil {
|
||||
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
|
||||
return "", status.Errorf(status.NotFound, "account not found: index lookup failed")
|
||||
@@ -778,7 +788,7 @@ func (s *SqlStore) GetAccountIDByUserID(userID string) (string, error) {
|
||||
|
||||
func (s *SqlStore) GetAccountIDBySetupKey(ctx context.Context, setupKey string) (string, error) {
|
||||
var accountID string
|
||||
result := s.db.Model(&types.SetupKey{}).Select("account_id").Where(keyQueryCondition, setupKey).First(&accountID)
|
||||
result := s.db.Model(&types.SetupKey{}).Select("account_id").Where(GetKeyQueryCondition(s), setupKey).First(&accountID)
|
||||
if result.Error != nil {
|
||||
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
|
||||
return "", status.NewSetupKeyNotFoundError(setupKey)
|
||||
@@ -851,7 +861,8 @@ func (s *SqlStore) GetAccountNetwork(ctx context.Context, lockStrength LockingSt
|
||||
|
||||
func (s *SqlStore) GetPeerByPeerPubKey(ctx context.Context, lockStrength LockingStrength, peerKey string) (*nbpeer.Peer, error) {
|
||||
var peer nbpeer.Peer
|
||||
result := s.db.Clauses(clause.Locking{Strength: string(lockStrength)}).First(&peer, keyQueryCondition, peerKey)
|
||||
result := s.db.Clauses(clause.Locking{Strength: string(lockStrength)}).First(&peer, GetKeyQueryCondition(s), peerKey)
|
||||
|
||||
if result.Error != nil {
|
||||
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
|
||||
return nil, status.Errorf(status.NotFound, "peer not found")
|
||||
@@ -883,9 +894,13 @@ func (s *SqlStore) SaveUserLastLogin(ctx context.Context, accountID, userID stri
|
||||
}
|
||||
return status.NewGetUserFromStoreError()
|
||||
}
|
||||
user.LastLogin = lastLogin
|
||||
|
||||
return s.db.Save(&user).Error
|
||||
if !lastLogin.IsZero() {
|
||||
user.LastLogin = &lastLogin
|
||||
return s.db.Save(&user).Error
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *SqlStore) GetPostureCheckByChecksDefinition(accountID string, checks *posture.ChecksDefinition) (*posture.Checks, error) {
|
||||
@@ -944,6 +959,16 @@ func NewPostgresqlStore(ctx context.Context, dsn string, metrics telemetry.AppMe
|
||||
return NewSqlStore(ctx, db, PostgresStoreEngine, metrics)
|
||||
}
|
||||
|
||||
// NewMysqlStore creates a new MySQL store.
|
||||
func NewMysqlStore(ctx context.Context, dsn string, metrics telemetry.AppMetrics) (*SqlStore, error) {
|
||||
db, err := gorm.Open(mysql.Open(dsn+"?charset=utf8&parseTime=True&loc=Local"), getGormConfig())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewSqlStore(ctx, db, MysqlStoreEngine, metrics)
|
||||
}
|
||||
|
||||
func getGormConfig() *gorm.Config {
|
||||
return &gorm.Config{
|
||||
Logger: logger.Default.LogMode(logger.Silent),
|
||||
@@ -961,6 +986,15 @@ func newPostgresStore(ctx context.Context, metrics telemetry.AppMetrics) (Store,
|
||||
return NewPostgresqlStore(ctx, dsn, metrics)
|
||||
}
|
||||
|
||||
// newMysqlStore initializes a new MySQL store.
|
||||
func newMysqlStore(ctx context.Context, metrics telemetry.AppMetrics) (Store, error) {
|
||||
dsn, ok := os.LookupEnv(mysqlDsnEnv)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("%s is not set", mysqlDsnEnv)
|
||||
}
|
||||
return NewMysqlStore(ctx, dsn, metrics)
|
||||
}
|
||||
|
||||
// NewSqliteStoreFromFileStore restores a store from FileStore and stores SQLite DB in the file located in datadir.
|
||||
func NewSqliteStoreFromFileStore(ctx context.Context, fileStore *FileStore, dataDir string, metrics telemetry.AppMetrics) (*SqlStore, error) {
|
||||
store, err := NewSqliteStore(ctx, dataDir, metrics)
|
||||
@@ -1005,10 +1039,33 @@ func NewPostgresqlStoreFromSqlStore(ctx context.Context, sqliteStore *SqlStore,
|
||||
return store, nil
|
||||
}
|
||||
|
||||
// NewMysqlStoreFromSqlStore restores a store from SqlStore and stores MySQL DB.
|
||||
func NewMysqlStoreFromSqlStore(ctx context.Context, sqliteStore *SqlStore, dsn string, metrics telemetry.AppMetrics) (*SqlStore, error) {
|
||||
store, err := NewMysqlStore(ctx, dsn, metrics)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = store.SaveInstallationID(ctx, sqliteStore.GetInstallationID())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, account := range sqliteStore.GetAllAccounts(ctx) {
|
||||
err := store.SaveAccount(ctx, account)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return store, nil
|
||||
}
|
||||
|
||||
func (s *SqlStore) GetSetupKeyBySecret(ctx context.Context, lockStrength LockingStrength, key string) (*types.SetupKey, error) {
|
||||
var setupKey types.SetupKey
|
||||
result := s.db.Clauses(clause.Locking{Strength: string(lockStrength)}).
|
||||
First(&setupKey, keyQueryCondition, key)
|
||||
First(&setupKey, GetKeyQueryCondition(s), key)
|
||||
|
||||
if result.Error != nil {
|
||||
if errors.Is(result.Error, gorm.ErrRecordNotFound) {
|
||||
return nil, status.NewSetupKeyNotFoundError(key)
|
||||
@@ -1300,9 +1357,13 @@ func (s *SqlStore) GetGroupByName(ctx context.Context, lockStrength LockingStren
|
||||
// TODO: This fix is accepted for now, but if we need to handle this more frequently
|
||||
// we may need to reconsider changing the types.
|
||||
query := s.db.Clauses(clause.Locking{Strength: string(lockStrength)}).Preload(clause.Associations)
|
||||
if s.storeEngine == PostgresStoreEngine {
|
||||
|
||||
switch s.storeEngine {
|
||||
case PostgresStoreEngine:
|
||||
query = query.Order("json_array_length(peers::json) DESC")
|
||||
} else {
|
||||
case MysqlStoreEngine:
|
||||
query = query.Order("JSON_LENGTH(JSON_EXTRACT(peers, \"$\")) DESC")
|
||||
default:
|
||||
query = query.Order("json_array_length(peers) DESC")
|
||||
}
|
||||
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"gorm.io/gorm"
|
||||
|
||||
"github.com/netbirdio/netbird/dns"
|
||||
"github.com/netbirdio/netbird/management/server/testutil"
|
||||
"github.com/netbirdio/netbird/management/server/types"
|
||||
|
||||
"github.com/netbirdio/netbird/management/server/telemetry"
|
||||
@@ -29,7 +30,6 @@ import (
|
||||
networkTypes "github.com/netbirdio/netbird/management/server/networks/types"
|
||||
nbpeer "github.com/netbirdio/netbird/management/server/peer"
|
||||
"github.com/netbirdio/netbird/management/server/posture"
|
||||
"github.com/netbirdio/netbird/management/server/testutil"
|
||||
"github.com/netbirdio/netbird/route"
|
||||
)
|
||||
|
||||
@@ -171,8 +171,10 @@ const (
|
||||
FileStoreEngine Engine = "jsonfile"
|
||||
SqliteStoreEngine Engine = "sqlite"
|
||||
PostgresStoreEngine Engine = "postgres"
|
||||
MysqlStoreEngine Engine = "mysql"
|
||||
|
||||
postgresDsnEnv = "NETBIRD_STORE_ENGINE_POSTGRES_DSN"
|
||||
mysqlDsnEnv = "NETBIRD_STORE_ENGINE_MYSQL_DSN"
|
||||
)
|
||||
|
||||
func getStoreEngineFromEnv() Engine {
|
||||
@@ -183,7 +185,7 @@ func getStoreEngineFromEnv() Engine {
|
||||
}
|
||||
|
||||
value := Engine(strings.ToLower(kind))
|
||||
if value == SqliteStoreEngine || value == PostgresStoreEngine {
|
||||
if value == SqliteStoreEngine || value == PostgresStoreEngine || value == MysqlStoreEngine {
|
||||
return value
|
||||
}
|
||||
|
||||
@@ -234,6 +236,9 @@ func NewStore(ctx context.Context, kind Engine, dataDir string, metrics telemetr
|
||||
case PostgresStoreEngine:
|
||||
log.WithContext(ctx).Info("using Postgres store engine")
|
||||
return newPostgresStore(ctx, metrics)
|
||||
case MysqlStoreEngine:
|
||||
log.WithContext(ctx).Info("using MySQL store engine")
|
||||
return newMysqlStore(ctx, metrics)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported kind of store: %s", kind)
|
||||
}
|
||||
@@ -317,12 +322,13 @@ func NewTestStoreFromSQL(ctx context.Context, filename string, dataDir string) (
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("failed to create test store: %v", err)
|
||||
}
|
||||
cleanUp := func() {
|
||||
store.Close(ctx)
|
||||
}
|
||||
|
||||
return getSqlStoreEngine(ctx, store, kind)
|
||||
}
|
||||
|
||||
func getSqlStoreEngine(ctx context.Context, store *SqlStore, kind Engine) (Store, func(), error) {
|
||||
if kind == PostgresStoreEngine {
|
||||
cleanUp, err = testutil.CreatePGDB()
|
||||
cleanUp, err := testutil.CreatePostgresTestContainer()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@@ -336,9 +342,34 @@ func NewTestStoreFromSQL(ctx context.Context, filename string, dataDir string) (
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return store, cleanUp, nil
|
||||
}
|
||||
|
||||
return store, cleanUp, nil
|
||||
if kind == MysqlStoreEngine {
|
||||
cleanUp, err := testutil.CreateMysqlTestContainer()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
dsn, ok := os.LookupEnv(mysqlDsnEnv)
|
||||
if !ok {
|
||||
return nil, nil, fmt.Errorf("%s is not set", mysqlDsnEnv)
|
||||
}
|
||||
|
||||
store, err = NewMysqlStoreFromSqlStore(ctx, store, dsn, nil)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
return store, cleanUp, nil
|
||||
}
|
||||
|
||||
closeConnection := func() {
|
||||
store.Close(ctx)
|
||||
}
|
||||
|
||||
return store, closeConnection, nil
|
||||
}
|
||||
|
||||
func loadSQL(db *gorm.DB, filepath string) error {
|
||||
|
||||
Reference in New Issue
Block a user