Persistenz implementiert : Test-3
All checks were successful
release-tag / release-image (push) Successful in 1m27s
All checks were successful
release-tag / release-image (push) Successful in 1m27s
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"path/filepath"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
@@ -480,11 +481,14 @@ func main() {
|
|||||||
|
|
||||||
// Domain-Store (mesh-fähig)
|
// Domain-Store (mesh-fähig)
|
||||||
nodeID := strings.TrimSpace(cfg.Mesh.AdvertURL)
|
nodeID := strings.TrimSpace(cfg.Mesh.AdvertURL)
|
||||||
st := filesvc.NewMemStore(nodeID)
|
//st := filesvc.NewMemStore(nodeID)
|
||||||
|
|
||||||
// Mesh starten
|
// Mesh starten
|
||||||
//mcfg := mesh.FromEnv()
|
//mcfg := mesh.FromEnv()
|
||||||
blobs := blobfs.New(getenvDefault("DATA_DIR", "./data"))
|
blobs := blobfs.New(getenvDefault("DATA_DIR", "./data"))
|
||||||
|
dataDir := getenvDefault("DATA_DIR", "./data")
|
||||||
|
metaPath := filepath.Join(dataDir, "meta", "items.json")
|
||||||
|
st := filesvc.NewMemStorePersistent(nodeID, metaPath)
|
||||||
|
|
||||||
mnode, err := mesh.New(cfg.Mesh, mesh.Callbacks{
|
mnode, err := mesh.New(cfg.Mesh, mesh.Callbacks{
|
||||||
GetSnapshot: func(ctx context.Context) (mesh.Snapshot, error) {
|
GetSnapshot: func(ctx context.Context) (mesh.Snapshot, error) {
|
||||||
|
|||||||
@@ -2,6 +2,9 @@ package filesvc
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"slices"
|
"slices"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -13,13 +16,82 @@ type MemStore struct {
|
|||||||
items map[ID]File
|
items map[ID]File
|
||||||
self string
|
self string
|
||||||
// optionales Eventing
|
// optionales Eventing
|
||||||
subs []chan ChangeEvent
|
subs []chan ChangeEvent
|
||||||
|
persistPath string
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMemStore(self string) *MemStore {
|
func NewMemStore(self string) *MemStore {
|
||||||
return &MemStore{self: strings.TrimSpace(self), items: make(map[ID]File)}
|
return &MemStore{self: strings.TrimSpace(self), items: make(map[ID]File)}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func NewMemStorePersistent(self, path string) *MemStore {
|
||||||
|
m := NewMemStore(self)
|
||||||
|
m.persistPath = strings.TrimSpace(path)
|
||||||
|
// beim Start versuchen zu laden
|
||||||
|
_ = m.loadFromDisk()
|
||||||
|
return m
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- Persistenz-Helper (NEU) ---
|
||||||
|
|
||||||
|
func (m *MemStore) loadFromDisk() error {
|
||||||
|
if m.persistPath == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
f, err := os.Open(m.persistPath)
|
||||||
|
if err != nil {
|
||||||
|
return nil // Datei existiert beim ersten Start nicht – ok
|
||||||
|
}
|
||||||
|
defer f.Close()
|
||||||
|
var snap Snapshot
|
||||||
|
if err := json.NewDecoder(f).Decode(&snap); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
m.mu.Lock()
|
||||||
|
for _, it := range snap.Items {
|
||||||
|
m.items[it.ID] = it
|
||||||
|
}
|
||||||
|
m.mu.Unlock()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *MemStore) saveLocked() error {
|
||||||
|
if m.persistPath == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err := os.MkdirAll(filepath.Dir(m.persistPath), 0o755); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Snapshot aus Map bauen
|
||||||
|
snap := Snapshot{Items: make([]File, 0, len(m.items))}
|
||||||
|
for _, it := range m.items {
|
||||||
|
snap.Items = append(snap.Items, it)
|
||||||
|
}
|
||||||
|
// atomar schreiben
|
||||||
|
tmp := m.persistPath + ".tmp"
|
||||||
|
f, err := os.Create(tmp)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
enc := json.NewEncoder(f)
|
||||||
|
enc.SetIndent("", " ")
|
||||||
|
if err := enc.Encode(&snap); err != nil {
|
||||||
|
f.Close()
|
||||||
|
_ = os.Remove(tmp)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := f.Sync(); err != nil {
|
||||||
|
f.Close()
|
||||||
|
_ = os.Remove(tmp)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := f.Close(); err != nil {
|
||||||
|
_ = os.Remove(tmp)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return os.Rename(tmp, m.persistPath)
|
||||||
|
}
|
||||||
|
|
||||||
/*** Store ***/
|
/*** Store ***/
|
||||||
|
|
||||||
func (m *MemStore) Get(_ context.Context, id ID) (File, error) {
|
func (m *MemStore) Get(_ context.Context, id ID) (File, error) {
|
||||||
@@ -102,6 +174,7 @@ func (m *MemStore) Create(_ context.Context, name string) (File, error) {
|
|||||||
}
|
}
|
||||||
it := File{ID: uid, Name: name, UpdatedAt: now, Owner: m.self}
|
it := File{ID: uid, Name: name, UpdatedAt: now, Owner: m.self}
|
||||||
m.items[it.ID] = it
|
m.items[it.ID] = it
|
||||||
|
_ = m.saveLocked()
|
||||||
m.emit(it)
|
m.emit(it)
|
||||||
return it, nil
|
return it, nil
|
||||||
}
|
}
|
||||||
@@ -124,6 +197,7 @@ func (m *MemStore) Rename(_ context.Context, id ID, newName string) (File, error
|
|||||||
it.Name = strings.TrimSpace(newName)
|
it.Name = strings.TrimSpace(newName)
|
||||||
it.UpdatedAt = time.Now().UnixNano()
|
it.UpdatedAt = time.Now().UnixNano()
|
||||||
m.items[id] = it
|
m.items[id] = it
|
||||||
|
_ = m.saveLocked()
|
||||||
m.emit(it)
|
m.emit(it)
|
||||||
return it, nil
|
return it, nil
|
||||||
}
|
}
|
||||||
@@ -144,6 +218,7 @@ func (m *MemStore) Delete(_ context.Context, id ID) (File, error) {
|
|||||||
it.Deleted = true
|
it.Deleted = true
|
||||||
it.UpdatedAt = time.Now().UnixNano()
|
it.UpdatedAt = time.Now().UnixNano()
|
||||||
m.items[id] = it
|
m.items[id] = it
|
||||||
|
_ = m.saveLocked()
|
||||||
m.emit(it)
|
m.emit(it)
|
||||||
return it, nil
|
return it, nil
|
||||||
}
|
}
|
||||||
@@ -169,6 +244,7 @@ func (m *MemStore) TakeoverOwner(_ context.Context, id ID, newOwner string) (Fil
|
|||||||
it.Owner = newOwner
|
it.Owner = newOwner
|
||||||
it.UpdatedAt = time.Now().UnixNano()
|
it.UpdatedAt = time.Now().UnixNano()
|
||||||
m.items[id] = it
|
m.items[id] = it
|
||||||
|
_ = m.saveLocked()
|
||||||
m.emitLocked(it)
|
m.emitLocked(it)
|
||||||
return it, nil
|
return it, nil
|
||||||
}
|
}
|
||||||
@@ -188,6 +264,7 @@ func (m *MemStore) Snapshot(_ context.Context) (Snapshot, error) {
|
|||||||
func (m *MemStore) ApplyRemote(_ context.Context, s Snapshot) error {
|
func (m *MemStore) ApplyRemote(_ context.Context, s Snapshot) error {
|
||||||
m.mu.Lock()
|
m.mu.Lock()
|
||||||
defer m.mu.Unlock()
|
defer m.mu.Unlock()
|
||||||
|
changed := false
|
||||||
for _, ri := range s.Items {
|
for _, ri := range s.Items {
|
||||||
li, ok := m.items[ri.ID]
|
li, ok := m.items[ri.ID]
|
||||||
if !ok || ri.UpdatedAt > li.UpdatedAt {
|
if !ok || ri.UpdatedAt > li.UpdatedAt {
|
||||||
@@ -196,9 +273,13 @@ func (m *MemStore) ApplyRemote(_ context.Context, s Snapshot) error {
|
|||||||
ri.Owner = li.Owner
|
ri.Owner = li.Owner
|
||||||
}
|
}
|
||||||
m.items[ri.ID] = ri
|
m.items[ri.ID] = ri
|
||||||
|
changed = true
|
||||||
m.emitLocked(ri)
|
m.emitLocked(ri)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if changed {
|
||||||
|
_ = m.saveLocked() // ← NEU
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user