create_aio
This commit is contained in:
195
internal/filesvc/memstore.go
Normal file
195
internal/filesvc/memstore.go
Normal file
@@ -0,0 +1,195 @@
|
||||
package filesvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"slices"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type MemStore struct {
|
||||
mu sync.Mutex
|
||||
items map[ID]File
|
||||
next ID
|
||||
|
||||
// optionales Eventing
|
||||
subs []chan ChangeEvent
|
||||
}
|
||||
|
||||
func NewMemStore() *MemStore {
|
||||
return &MemStore{
|
||||
items: make(map[ID]File),
|
||||
next: 1,
|
||||
}
|
||||
}
|
||||
|
||||
/*** Store ***/
|
||||
|
||||
func (m *MemStore) Get(_ context.Context, id ID) (File, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
it, ok := m.items[id]
|
||||
if !ok || it.Deleted {
|
||||
return File{}, ErrNotFound
|
||||
}
|
||||
return it, nil
|
||||
}
|
||||
|
||||
func (m *MemStore) List(_ context.Context, next ID, limit int) ([]File, ID, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if limit <= 0 || limit > 1000 {
|
||||
limit = 100
|
||||
}
|
||||
ids := make([]ID, 0, len(m.items))
|
||||
for id := range m.items {
|
||||
ids = append(ids, id)
|
||||
}
|
||||
slices.Sort(ids)
|
||||
|
||||
start := 0
|
||||
if next > 0 {
|
||||
for i, id := range ids {
|
||||
if id >= next {
|
||||
start = i
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
end := start + limit
|
||||
if end > len(ids) {
|
||||
end = len(ids)
|
||||
}
|
||||
|
||||
out := make([]File, 0, end-start)
|
||||
for _, id := range ids[start:end] {
|
||||
it := m.items[id]
|
||||
if !it.Deleted {
|
||||
out = append(out, it)
|
||||
}
|
||||
}
|
||||
var nextOut ID
|
||||
if end < len(ids) {
|
||||
nextOut = ids[end]
|
||||
}
|
||||
return out, nextOut, nil
|
||||
}
|
||||
|
||||
func (m *MemStore) Create(_ context.Context, name string) (File, error) {
|
||||
name = strings.TrimSpace(name)
|
||||
if name == "" {
|
||||
return File{}, ErrBadInput
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
now := time.Now().UnixNano()
|
||||
it := File{ID: m.next, Name: name, UpdatedAt: now}
|
||||
m.items[it.ID] = it
|
||||
m.next++
|
||||
m.emit(it)
|
||||
return it, nil
|
||||
}
|
||||
|
||||
func (m *MemStore) Rename(_ context.Context, id ID, newName string) (File, error) {
|
||||
newName = strings.TrimSpace(newName)
|
||||
if newName == "" {
|
||||
return File{}, ErrBadInput
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
it, ok := m.items[id]
|
||||
if !ok || it.Deleted {
|
||||
return File{}, ErrNotFound
|
||||
}
|
||||
it.Name = newName
|
||||
it.UpdatedAt = time.Now().UnixNano()
|
||||
m.items[id] = it
|
||||
m.emit(it)
|
||||
return it, nil
|
||||
}
|
||||
|
||||
func (m *MemStore) Delete(_ context.Context, id ID) (File, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
it, ok := m.items[id]
|
||||
if !ok {
|
||||
return File{}, ErrNotFound
|
||||
}
|
||||
if it.Deleted {
|
||||
return it, nil
|
||||
}
|
||||
it.Deleted = true
|
||||
it.UpdatedAt = time.Now().UnixNano()
|
||||
m.items[id] = it
|
||||
m.emit(it)
|
||||
return it, nil
|
||||
}
|
||||
|
||||
/*** Replicable ***/
|
||||
|
||||
func (m *MemStore) Snapshot(_ context.Context) (Snapshot, error) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
s := Snapshot{Items: make([]File, 0, len(m.items))}
|
||||
for _, it := range m.items {
|
||||
s.Items = append(s.Items, it) // inkl. Tombstones
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (m *MemStore) ApplyRemote(_ context.Context, s Snapshot) error {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
for _, ri := range s.Items {
|
||||
li, ok := m.items[ri.ID]
|
||||
if !ok || ri.UpdatedAt > li.UpdatedAt {
|
||||
m.items[ri.ID] = ri
|
||||
if ri.ID >= m.next {
|
||||
m.next = ri.ID + 1
|
||||
}
|
||||
m.emitLocked(ri)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
/*** Watchable (optional) ***/
|
||||
|
||||
func (m *MemStore) Watch(stop <-chan struct{}) <-chan ChangeEvent {
|
||||
ch := make(chan ChangeEvent, 32)
|
||||
|
||||
m.mu.Lock()
|
||||
m.subs = append(m.subs, ch)
|
||||
m.mu.Unlock()
|
||||
|
||||
go func() {
|
||||
<-stop
|
||||
m.mu.Lock()
|
||||
// entferne ch aus subs
|
||||
for i, s := range m.subs {
|
||||
if s == ch {
|
||||
m.subs = append(m.subs[:i], m.subs[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
m.mu.Unlock()
|
||||
close(ch)
|
||||
}()
|
||||
return ch
|
||||
}
|
||||
|
||||
func (m *MemStore) emit(it File) {
|
||||
m.emitLocked(it) // mu wird im Aufrufer gehalten
|
||||
}
|
||||
func (m *MemStore) emitLocked(it File) {
|
||||
ev := ChangeEvent{At: time.Now(), Item: it}
|
||||
for _, s := range m.subs {
|
||||
select {
|
||||
case s <- ev:
|
||||
default: /* drop wenn voll */
|
||||
}
|
||||
}
|
||||
}
|
||||
78
internal/filesvc/store.go
Normal file
78
internal/filesvc/store.go
Normal file
@@ -0,0 +1,78 @@
|
||||
package filesvc
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
)
|
||||
|
||||
/*** Domain ***/
|
||||
|
||||
type ID = int64
|
||||
|
||||
type File struct {
|
||||
ID ID `json:"id"`
|
||||
Name string `json:"name"`
|
||||
// weitere Metadaten optional: Size, Hash, Owner, Tags, ...
|
||||
UpdatedAt int64 `json:"updatedAt"` // UnixNano für LWW
|
||||
Deleted bool `json:"deleted"` // Tombstone für Mesh-Delete
|
||||
}
|
||||
|
||||
/*** Fehler ***/
|
||||
|
||||
var (
|
||||
ErrNotFound = errors.New("file not found")
|
||||
ErrBadInput = errors.New("bad input")
|
||||
ErrConflict = errors.New("conflict")
|
||||
ErrForbidden = errors.New("forbidden")
|
||||
ErrTransient = errors.New("transient")
|
||||
)
|
||||
|
||||
/*** Basis-API (lokal nutzbar) ***/
|
||||
|
||||
type Store interface {
|
||||
// Lesen & Auflisten
|
||||
Get(ctx context.Context, id ID) (File, error)
|
||||
List(ctx context.Context, next ID, limit int) (items []File, nextOut ID, err error)
|
||||
|
||||
// Mutationen mit LWW-Semantik (UpdatedAt wird intern gesetzt, außer bei ApplyRemote)
|
||||
Create(ctx context.Context, name string) (File, error)
|
||||
Rename(ctx context.Context, id ID, newName string) (File, error)
|
||||
Delete(ctx context.Context, id ID) (File, error)
|
||||
}
|
||||
|
||||
/*** Mesh-Replikation ***/
|
||||
|
||||
type Snapshot struct {
|
||||
Items []File `json:"items"`
|
||||
}
|
||||
|
||||
type Replicable interface {
|
||||
// Snapshot liefert den vollständigen aktuellen Stand (inkl. Tombstones).
|
||||
Snapshot(ctx context.Context) (Snapshot, error)
|
||||
// ApplyRemote wendet LWW an. next-ID wird dabei korrekt fortgeschrieben.
|
||||
ApplyRemote(ctx context.Context, s Snapshot) error
|
||||
}
|
||||
|
||||
/*** Events (optional) ***/
|
||||
|
||||
// ChangeEvent kann genutzt werden, um proaktive Mesh-Pushes zu triggern.
|
||||
// Bei deiner Pull-basierten Anti-Entropy ist es optional.
|
||||
type ChangeEvent struct {
|
||||
At time.Time
|
||||
Item File
|
||||
}
|
||||
|
||||
// Watch gibt Änderungen aus; close(stop) beendet den Stream.
|
||||
// Eine Noop-Implementierung ist erlaubt, wenn Pull-Sync genügt.
|
||||
type Watchable interface {
|
||||
Watch(stop <-chan struct{}) <-chan ChangeEvent
|
||||
}
|
||||
|
||||
/*** Kombiniertes Interface ***/
|
||||
|
||||
type MeshStore interface {
|
||||
Store
|
||||
Replicable
|
||||
Watchable // optional – kann Noop sein
|
||||
}
|
||||
Reference in New Issue
Block a user