mirror of
https://github.com/pocket-id/pocket-id.git
synced 2026-03-27 17:56:36 +00:00
feat: add support for S3 storage backend (#1080)
Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
193
backend/internal/storage/filesystem.go
Normal file
193
backend/internal/storage/filesystem.go
Normal file
@@ -0,0 +1,193 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
type filesystemStorage struct {
|
||||
root *os.Root
|
||||
absoluteRootPath string
|
||||
}
|
||||
|
||||
func NewFilesystemStorage(rootPath string) (FileStorage, error) {
|
||||
if err := os.MkdirAll(rootPath, 0700); err != nil {
|
||||
return nil, fmt.Errorf("failed to create root directory '%s': %w", rootPath, err)
|
||||
}
|
||||
root, err := os.OpenRoot(rootPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to open root directory '%s': %w", rootPath, err)
|
||||
}
|
||||
|
||||
absoluteRootPath, err := filepath.Abs(rootPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get absolute path of root directory '%s': %w", rootPath, err)
|
||||
}
|
||||
|
||||
return &filesystemStorage{root: root, absoluteRootPath: absoluteRootPath}, err
|
||||
}
|
||||
|
||||
func (s *filesystemStorage) Type() string {
|
||||
return TypeFileSystem
|
||||
}
|
||||
|
||||
func (s *filesystemStorage) Save(_ context.Context, path string, data io.Reader) error {
|
||||
path = filepath.FromSlash(path)
|
||||
|
||||
if err := s.root.MkdirAll(filepath.Dir(path), 0700); err != nil {
|
||||
return fmt.Errorf("failed to create directories for path '%s': %w", path, err)
|
||||
}
|
||||
|
||||
// Our strategy is to save to a separate file and then rename it to override the original file
|
||||
tmpName := path + "." + uuid.NewString() + "-tmp"
|
||||
|
||||
// Write to the temporary file
|
||||
tmpFile, err := s.root.Create(tmpName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open file '%s' for writing: %w", tmpName, err)
|
||||
}
|
||||
|
||||
_, err = io.Copy(tmpFile, data)
|
||||
if err != nil {
|
||||
tmpFile.Close()
|
||||
_ = s.root.Remove(tmpName)
|
||||
return fmt.Errorf("failed to write temporary file: %w", err)
|
||||
}
|
||||
|
||||
if err = tmpFile.Close(); err != nil {
|
||||
_ = s.root.Remove(tmpName)
|
||||
return fmt.Errorf("failed to close temporary file: %w", err)
|
||||
}
|
||||
|
||||
// Rename to the final file, which overrides existing files
|
||||
// This is an atomic operation
|
||||
if err = s.root.Rename(tmpName, path); err != nil {
|
||||
_ = s.root.Remove(tmpName)
|
||||
return fmt.Errorf("failed to move temporary file: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *filesystemStorage) Open(_ context.Context, path string) (io.ReadCloser, int64, error) {
|
||||
path = filepath.FromSlash(path)
|
||||
|
||||
file, err := s.root.Open(path)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
info, err := file.Stat()
|
||||
if err != nil {
|
||||
file.Close()
|
||||
return nil, 0, err
|
||||
}
|
||||
return file, info.Size(), nil
|
||||
}
|
||||
|
||||
func (s *filesystemStorage) Delete(_ context.Context, path string) error {
|
||||
path = filepath.FromSlash(path)
|
||||
|
||||
err := s.root.Remove(path)
|
||||
if err != nil && !errors.Is(err, fs.ErrNotExist) {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *filesystemStorage) DeleteAll(_ context.Context, path string) error {
|
||||
path = filepath.FromSlash(path)
|
||||
|
||||
// If "/", "." or "" is requested, we delete all contents of the root.
|
||||
if path == "" || path == "/" || path == "." {
|
||||
dir, err := s.root.Open(".")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to open root directory: %w", err)
|
||||
}
|
||||
defer dir.Close()
|
||||
|
||||
entries, err := dir.ReadDir(-1)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to list root directory: %w", err)
|
||||
}
|
||||
for _, entry := range entries {
|
||||
if err := s.root.RemoveAll(entry.Name()); err != nil {
|
||||
return fmt.Errorf("failed to delete '%s': %w", entry.Name(), err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.root.RemoveAll(path)
|
||||
}
|
||||
func (s *filesystemStorage) List(_ context.Context, path string) ([]ObjectInfo, error) {
|
||||
path = filepath.FromSlash(path)
|
||||
|
||||
dir, err := s.root.Open(path)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer dir.Close()
|
||||
|
||||
entries, err := dir.ReadDir(-1)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
objects := make([]ObjectInfo, 0, len(entries))
|
||||
for _, entry := range entries {
|
||||
if entry.IsDir() {
|
||||
continue
|
||||
}
|
||||
info, err := entry.Info()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
objects = append(objects, ObjectInfo{
|
||||
Path: filepath.Join(path, entry.Name()),
|
||||
Size: info.Size(),
|
||||
ModTime: info.ModTime(),
|
||||
})
|
||||
}
|
||||
return objects, nil
|
||||
}
|
||||
func (s *filesystemStorage) Walk(_ context.Context, root string, fn func(ObjectInfo) error) error {
|
||||
root = filepath.FromSlash(root)
|
||||
|
||||
fullPath := filepath.Clean(filepath.Join(s.absoluteRootPath, root))
|
||||
|
||||
// As we can't use os.Root here, we manually ensure that the fullPath is within the root directory
|
||||
sep := string(filepath.Separator)
|
||||
if !strings.HasPrefix(fullPath+sep, s.absoluteRootPath+sep) {
|
||||
return fmt.Errorf("invalid root path: %s", root)
|
||||
}
|
||||
|
||||
return filepath.WalkDir(fullPath, func(full string, d fs.DirEntry, err error) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if d.IsDir() {
|
||||
return nil
|
||||
}
|
||||
rel, err := filepath.Rel(s.absoluteRootPath, full)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
info, err := d.Info()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return fn(ObjectInfo{
|
||||
Path: filepath.ToSlash(rel),
|
||||
Size: info.Size(),
|
||||
ModTime: info.ModTime(),
|
||||
})
|
||||
})
|
||||
}
|
||||
68
backend/internal/storage/filesystem_test.go
Normal file
68
backend/internal/storage/filesystem_test.go
Normal file
@@ -0,0 +1,68 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestFilesystemStorageOperations(t *testing.T) {
|
||||
ctx := context.Background()
|
||||
store, err := NewFilesystemStorage(t.TempDir())
|
||||
require.NoError(t, err)
|
||||
|
||||
t.Run("save, open and list files", func(t *testing.T) {
|
||||
err := store.Save(ctx, "images/logo.png", bytes.NewBufferString("logo-data"))
|
||||
require.NoError(t, err)
|
||||
|
||||
reader, size, err := store.Open(ctx, "images/logo.png")
|
||||
require.NoError(t, err)
|
||||
defer reader.Close()
|
||||
|
||||
contents, err := io.ReadAll(reader)
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, []byte("logo-data"), contents)
|
||||
assert.Equal(t, int64(len(contents)), size)
|
||||
|
||||
err = store.Save(ctx, "images/nested/child.txt", bytes.NewBufferString("child"))
|
||||
require.NoError(t, err)
|
||||
|
||||
files, err := store.List(ctx, "images")
|
||||
require.NoError(t, err)
|
||||
require.Len(t, files, 1)
|
||||
assert.Equal(t, filepath.Join("images", "logo.png"), files[0].Path)
|
||||
assert.Equal(t, int64(len("logo-data")), files[0].Size)
|
||||
})
|
||||
|
||||
t.Run("delete files individually and idempotently", func(t *testing.T) {
|
||||
err := store.Save(ctx, "images/delete-me.txt", bytes.NewBufferString("temp"))
|
||||
require.NoError(t, err)
|
||||
|
||||
require.NoError(t, store.Delete(ctx, "images/delete-me.txt"))
|
||||
_, _, err = store.Open(ctx, "images/delete-me.txt")
|
||||
require.Error(t, err)
|
||||
assert.True(t, IsNotExist(err))
|
||||
|
||||
// Deleting a missing object should be a no-op.
|
||||
require.NoError(t, store.Delete(ctx, "images/missing.txt"))
|
||||
})
|
||||
|
||||
t.Run("delete all files under a prefix", func(t *testing.T) {
|
||||
require.NoError(t, store.Save(ctx, "images/a.txt", bytes.NewBufferString("a")))
|
||||
require.NoError(t, store.Save(ctx, "images/b.txt", bytes.NewBufferString("b")))
|
||||
require.NoError(t, store.DeleteAll(ctx, "images"))
|
||||
|
||||
_, _, err := store.Open(ctx, "images/a.txt")
|
||||
require.Error(t, err)
|
||||
assert.True(t, IsNotExist(err))
|
||||
|
||||
_, _, err = store.Open(ctx, "images/b.txt")
|
||||
require.Error(t, err)
|
||||
assert.True(t, IsNotExist(err))
|
||||
})
|
||||
}
|
||||
185
backend/internal/storage/s3.go
Normal file
185
backend/internal/storage/s3.go
Normal file
@@ -0,0 +1,185 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
awscfg "github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/aws/aws-sdk-go-v2/service/s3"
|
||||
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
|
||||
"github.com/aws/smithy-go"
|
||||
)
|
||||
|
||||
type S3Config struct {
|
||||
Bucket string
|
||||
Region string
|
||||
Endpoint string
|
||||
AccessKeyID string
|
||||
SecretAccessKey string
|
||||
ForcePathStyle bool
|
||||
Root string
|
||||
}
|
||||
|
||||
type s3Storage struct {
|
||||
client *s3.Client
|
||||
bucket string
|
||||
prefix string
|
||||
}
|
||||
|
||||
func NewS3Storage(ctx context.Context, cfg S3Config) (FileStorage, error) {
|
||||
creds := credentials.NewStaticCredentialsProvider(cfg.AccessKeyID, cfg.SecretAccessKey, "")
|
||||
awsCfg, err := awscfg.LoadDefaultConfig(ctx, awscfg.WithRegion(cfg.Region), awscfg.WithCredentialsProvider(creds))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to load AWS configuration: %w", err)
|
||||
}
|
||||
client := s3.NewFromConfig(awsCfg, func(o *s3.Options) {
|
||||
if cfg.Endpoint != "" {
|
||||
o.BaseEndpoint = aws.String(cfg.Endpoint)
|
||||
}
|
||||
o.UsePathStyle = cfg.ForcePathStyle
|
||||
})
|
||||
|
||||
return &s3Storage{
|
||||
client: client,
|
||||
bucket: cfg.Bucket,
|
||||
prefix: strings.Trim(cfg.Root, "/"),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *s3Storage) Type() string {
|
||||
return TypeS3
|
||||
}
|
||||
|
||||
func (s *s3Storage) Save(ctx context.Context, path string, data io.Reader) error {
|
||||
_, err := s.client.PutObject(ctx, &s3.PutObjectInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(s.buildObjectKey(path)),
|
||||
Body: data,
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *s3Storage) Open(ctx context.Context, path string) (io.ReadCloser, int64, error) {
|
||||
resp, err := s.client.GetObject(ctx, &s3.GetObjectInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(s.buildObjectKey(path)),
|
||||
})
|
||||
if err != nil {
|
||||
if isS3NotFound(err) {
|
||||
return nil, 0, fs.ErrNotExist
|
||||
}
|
||||
return nil, 0, err
|
||||
}
|
||||
return resp.Body, aws.ToInt64(resp.ContentLength), nil
|
||||
}
|
||||
|
||||
func (s *s3Storage) Delete(ctx context.Context, path string) error {
|
||||
_, err := s.client.DeleteObject(ctx, &s3.DeleteObjectInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Key: aws.String(s.buildObjectKey(path)),
|
||||
})
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *s3Storage) DeleteAll(ctx context.Context, path string) error {
|
||||
|
||||
paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Prefix: aws.String(s.buildObjectKey(path)),
|
||||
})
|
||||
for paginator.HasMorePages() {
|
||||
page, err := paginator.NextPage(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(page.Contents) == 0 {
|
||||
continue
|
||||
}
|
||||
objects := make([]s3types.ObjectIdentifier, 0, len(page.Contents))
|
||||
for _, obj := range page.Contents {
|
||||
objects = append(objects, s3types.ObjectIdentifier{Key: obj.Key})
|
||||
}
|
||||
_, err = s.client.DeleteObjects(ctx, &s3.DeleteObjectsInput{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Delete: &s3types.Delete{Objects: objects, Quiet: aws.Bool(true)},
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *s3Storage) List(ctx context.Context, path string) ([]ObjectInfo, error) {
|
||||
paginator := s3.NewListObjectsV2Paginator(s.client, &s3.ListObjectsV2Input{
|
||||
Bucket: aws.String(s.bucket),
|
||||
Prefix: aws.String(s.buildObjectKey(path)),
|
||||
})
|
||||
var objects []ObjectInfo
|
||||
for paginator.HasMorePages() {
|
||||
page, err := paginator.NextPage(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
for _, obj := range page.Contents {
|
||||
if obj.Key == nil {
|
||||
continue
|
||||
}
|
||||
objects = append(objects, ObjectInfo{
|
||||
Path: aws.ToString(obj.Key),
|
||||
Size: aws.ToInt64(obj.Size),
|
||||
ModTime: aws.ToTime(obj.LastModified),
|
||||
})
|
||||
}
|
||||
}
|
||||
return objects, nil
|
||||
}
|
||||
|
||||
func (s *s3Storage) Walk(ctx context.Context, root string, fn func(ObjectInfo) error) error {
|
||||
objects, err := s.List(ctx, root)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, obj := range objects {
|
||||
if err := fn(obj); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *s3Storage) buildObjectKey(p string) string {
|
||||
p = filepath.Clean(p)
|
||||
p = filepath.ToSlash(p)
|
||||
p = strings.Trim(p, "/")
|
||||
|
||||
if p == "" || p == "." {
|
||||
return s.prefix
|
||||
}
|
||||
|
||||
if s.prefix == "" {
|
||||
return p
|
||||
}
|
||||
|
||||
return s.prefix + "/" + p
|
||||
}
|
||||
|
||||
func isS3NotFound(err error) bool {
|
||||
var apiErr smithy.APIError
|
||||
if errors.As(err, &apiErr) {
|
||||
if apiErr.ErrorCode() == "NotFound" || apiErr.ErrorCode() == "NoSuchKey" {
|
||||
return true
|
||||
}
|
||||
}
|
||||
var missingKey *s3types.NoSuchKey
|
||||
return errors.As(err, &missingKey)
|
||||
}
|
||||
44
backend/internal/storage/s3_test.go
Normal file
44
backend/internal/storage/s3_test.go
Normal file
@@ -0,0 +1,44 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
s3types "github.com/aws/aws-sdk-go-v2/service/s3/types"
|
||||
"github.com/aws/smithy-go"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestS3Helpers(t *testing.T) {
|
||||
t.Run("buildObjectKey trims and joins prefix", func(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
prefix string
|
||||
path string
|
||||
expected string
|
||||
}{
|
||||
{name: "no prefix no path", prefix: "", path: "", expected: ""},
|
||||
{name: "prefix no path", prefix: "root", path: "", expected: "root"},
|
||||
{name: "prefix with nested path", prefix: "root", path: "foo/bar/baz", expected: "root/foo/bar/baz"},
|
||||
{name: "trimmed path and prefix", prefix: "root", path: "/foo//bar/", expected: "root/foo/bar"},
|
||||
{name: "no prefix path only", prefix: "", path: "./images/logo.png", expected: "images/logo.png"},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
s := &s3Storage{
|
||||
bucket: "bucket",
|
||||
prefix: tc.prefix,
|
||||
}
|
||||
assert.Equal(t, tc.expected, s.buildObjectKey(tc.path))
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("isS3NotFound detects expected errors", func(t *testing.T) {
|
||||
assert.True(t, isS3NotFound(&smithy.GenericAPIError{Code: "NoSuchKey"}))
|
||||
assert.True(t, isS3NotFound(&smithy.GenericAPIError{Code: "NotFound"}))
|
||||
assert.True(t, isS3NotFound(&s3types.NoSuchKey{}))
|
||||
assert.False(t, isS3NotFound(errors.New("boom")))
|
||||
})
|
||||
}
|
||||
33
backend/internal/storage/storage.go
Normal file
33
backend/internal/storage/storage.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"os"
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
TypeFileSystem = "fs"
|
||||
TypeS3 = "s3"
|
||||
)
|
||||
|
||||
type ObjectInfo struct {
|
||||
Path string
|
||||
Size int64
|
||||
ModTime time.Time
|
||||
}
|
||||
|
||||
type FileStorage interface {
|
||||
Save(ctx context.Context, relativePath string, data io.Reader) error
|
||||
Open(ctx context.Context, relativePath string) (io.ReadCloser, int64, error)
|
||||
Delete(ctx context.Context, relativePath string) error
|
||||
DeleteAll(ctx context.Context, prefix string) error
|
||||
List(ctx context.Context, prefix string) ([]ObjectInfo, error)
|
||||
Walk(ctx context.Context, root string, fn func(ObjectInfo) error) error
|
||||
Type() string
|
||||
}
|
||||
|
||||
func IsNotExist(err error) bool {
|
||||
return os.IsNotExist(err)
|
||||
}
|
||||
Reference in New Issue
Block a user