| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947 |
- package s3api
- import (
- "context"
- "encoding/json"
- "errors"
- "fmt"
- "path/filepath"
- "strings"
- "sync"
- "time"
- "github.com/aws/aws-sdk-go/service/s3"
- "google.golang.org/protobuf/proto"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/kms"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
- "github.com/seaweedfs/seaweedfs/weed/s3api/cors"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
- )
// BucketConfig represents cached bucket configuration.
//
// Values of this type are built by getBucketConfig from the bucket's filer
// entry and stored in a BucketConfigCache. The same pointer is handed to every
// reader of the cache, so callers that mutate a BucketConfig are mutating the
// cached copy.
type BucketConfig struct {
	// Name is the bucket name the configuration was loaded for.
	Name string
	// Versioning is read from the entry's ExtVersioningKey extended attribute.
	Versioning string // "Enabled", "Suspended", or ""
	// Ownership is read from the entry's ExtOwnershipKey extended attribute.
	Ownership string
	// ACL holds the raw bytes of the ExtAmzAclKey extended attribute; they are
	// decoded as a JSON array of grants when computing IsPublicRead.
	ACL []byte
	// Owner is read from the entry's ExtAmzOwnerKey extended attribute.
	Owner        string
	IsPublicRead bool // Cached flag to avoid JSON parsing on every request
	// CORS is the CORS configuration loaded from the bucket metadata; it stays
	// nil when none is set or when loading failed.
	CORS             *cors.CORSConfiguration
	ObjectLockConfig *ObjectLockConfiguration // Cached parsed Object Lock configuration
	KMSKeyCache      *BucketKMSCache          // Per-bucket KMS key cache for SSE-KMS operations
	// LastModified is overwritten with the current time by BucketConfigCache.Set
	// and compared against the cache TTL by BucketConfigCache.Get.
	LastModified time.Time
	// Entry is the filer entry the configuration was derived from.
	Entry *filer_pb.Entry
}
// BucketKMSCache represents per-bucket KMS key caching for SSE-KMS operations
// This provides better isolation and automatic cleanup compared to global caching
//
// All exported methods tolerate a nil receiver and take the mutex before
// touching the map.
type BucketKMSCache struct {
	cache   map[string]*BucketKMSCacheEntry // Key: contextHash, Value: cached data key
	mutex   sync.RWMutex                    // Guards cache and lastTTL
	bucket  string                          // Bucket name for logging/debugging
	lastTTL time.Duration                   // TTL used for cache entries (typically 1 hour); overwritten by every Set call
}
// BucketKMSCacheEntry represents a single cached KMS data key
//
// Entries are created by BucketKMSCache.Set and are treated as expired once
// the current time is after ExpiresAt.
type BucketKMSCacheEntry struct {
	// DataKey is the cached key material. When it holds a
	// *kms.GenerateDataKeyResponse its Plaintext is zeroed on eviction.
	DataKey     interface{} // Could be *kms.GenerateDataKeyResponse or similar
	ExpiresAt   time.Time   // Insertion time plus the TTL passed to Set
	KeyID       string      // Key identifier supplied by the caller of Set
	ContextHash string      // Hash of encryption context for cache validation
}
- // NewBucketKMSCache creates a new per-bucket KMS key cache
- func NewBucketKMSCache(bucketName string, ttl time.Duration) *BucketKMSCache {
- return &BucketKMSCache{
- cache: make(map[string]*BucketKMSCacheEntry),
- bucket: bucketName,
- lastTTL: ttl,
- }
- }
- // Get retrieves a cached KMS data key if it exists and hasn't expired
- func (bkc *BucketKMSCache) Get(contextHash string) (*BucketKMSCacheEntry, bool) {
- if bkc == nil {
- return nil, false
- }
- bkc.mutex.RLock()
- defer bkc.mutex.RUnlock()
- entry, exists := bkc.cache[contextHash]
- if !exists {
- return nil, false
- }
- // Check if entry has expired
- if time.Now().After(entry.ExpiresAt) {
- return nil, false
- }
- return entry, true
- }
- // Set stores a KMS data key in the cache
- func (bkc *BucketKMSCache) Set(contextHash, keyID string, dataKey interface{}, ttl time.Duration) {
- if bkc == nil {
- return
- }
- bkc.mutex.Lock()
- defer bkc.mutex.Unlock()
- bkc.cache[contextHash] = &BucketKMSCacheEntry{
- DataKey: dataKey,
- ExpiresAt: time.Now().Add(ttl),
- KeyID: keyID,
- ContextHash: contextHash,
- }
- bkc.lastTTL = ttl
- }
- // CleanupExpired removes expired entries from the cache
- func (bkc *BucketKMSCache) CleanupExpired() int {
- if bkc == nil {
- return 0
- }
- bkc.mutex.Lock()
- defer bkc.mutex.Unlock()
- now := time.Now()
- expiredCount := 0
- for key, entry := range bkc.cache {
- if now.After(entry.ExpiresAt) {
- // Clear sensitive data before removing from cache
- bkc.clearSensitiveData(entry)
- delete(bkc.cache, key)
- expiredCount++
- }
- }
- return expiredCount
- }
- // Size returns the current number of cached entries
- func (bkc *BucketKMSCache) Size() int {
- if bkc == nil {
- return 0
- }
- bkc.mutex.RLock()
- defer bkc.mutex.RUnlock()
- return len(bkc.cache)
- }
- // clearSensitiveData securely clears sensitive data from a cache entry
- func (bkc *BucketKMSCache) clearSensitiveData(entry *BucketKMSCacheEntry) {
- if dataKeyResp, ok := entry.DataKey.(*kms.GenerateDataKeyResponse); ok {
- // Zero out the plaintext data key to prevent it from lingering in memory
- if dataKeyResp.Plaintext != nil {
- for i := range dataKeyResp.Plaintext {
- dataKeyResp.Plaintext[i] = 0
- }
- dataKeyResp.Plaintext = nil
- }
- }
- }
- // Clear clears all cached KMS entries, securely zeroing sensitive data first
- func (bkc *BucketKMSCache) Clear() {
- if bkc == nil {
- return
- }
- bkc.mutex.Lock()
- defer bkc.mutex.Unlock()
- // Clear sensitive data from all entries before deletion
- for _, entry := range bkc.cache {
- bkc.clearSensitiveData(entry)
- }
- // Clear the cache map
- bkc.cache = make(map[string]*BucketKMSCacheEntry)
- }
// BucketConfigCache provides caching for bucket configurations
// Cache entries are automatically updated/invalidated through metadata subscription events,
// so TTL serves as a safety fallback rather than the primary consistency mechanism
//
// It keeps two maps behind one mutex: positive entries keyed by bucket name,
// and a negative cache recording when a bucket was last found to be missing.
type BucketConfigCache struct {
	cache         map[string]*BucketConfig // Positive entries; freshness is judged from BucketConfig.LastModified
	negativeCache map[string]time.Time     // Cache for non-existent buckets
	mutex         sync.RWMutex             // Guards both maps
	ttl           time.Duration            // Safety fallback TTL; real-time consistency maintained via events
	negativeTTL   time.Duration            // TTL for negative cache entries
}
// BucketMetadata represents the complete metadata for a bucket
//
// In this file it is converted to and from s3_pb.BucketMetadata, which is
// stored as protobuf bytes in the Content of the bucket's filer entry.
type BucketMetadata struct {
	// Tags holds the bucket tags as key/value pairs.
	Tags map[string]string `json:"tags,omitempty"`
	// CORS is the bucket's CORS configuration; nil means none is set.
	CORS *cors.CORSConfiguration `json:"cors,omitempty"`
	// Encryption is the bucket's encryption configuration; nil means none is set.
	Encryption *s3_pb.EncryptionConfiguration `json:"encryption,omitempty"`
	// Future extensions can be added here:
	// Versioning *s3_pb.VersioningConfiguration `json:"versioning,omitempty"`
	// Lifecycle *s3_pb.LifecycleConfiguration `json:"lifecycle,omitempty"`
	// Notification *s3_pb.NotificationConfiguration `json:"notification,omitempty"`
	// Replication *s3_pb.ReplicationConfiguration `json:"replication,omitempty"`
	// Analytics *s3_pb.AnalyticsConfiguration `json:"analytics,omitempty"`
	// Logging *s3_pb.LoggingConfiguration `json:"logging,omitempty"`
	// Website *s3_pb.WebsiteConfiguration `json:"website,omitempty"`
	// RequestPayer *s3_pb.RequestPayerConfiguration `json:"requestPayer,omitempty"`
	// PublicAccess *s3_pb.PublicAccessConfiguration `json:"publicAccess,omitempty"`
}
- // NewBucketMetadata creates a new BucketMetadata with default values
- func NewBucketMetadata() *BucketMetadata {
- return &BucketMetadata{
- Tags: make(map[string]string),
- }
- }
- // IsEmpty returns true if the metadata has no configuration set
- func (bm *BucketMetadata) IsEmpty() bool {
- return len(bm.Tags) == 0 && bm.CORS == nil && bm.Encryption == nil
- }
- // HasEncryption returns true if bucket has encryption configuration
- func (bm *BucketMetadata) HasEncryption() bool {
- return bm.Encryption != nil
- }
- // HasCORS returns true if bucket has CORS configuration
- func (bm *BucketMetadata) HasCORS() bool {
- return bm.CORS != nil
- }
- // HasTags returns true if bucket has tags
- func (bm *BucketMetadata) HasTags() bool {
- return len(bm.Tags) > 0
- }
- // NewBucketConfigCache creates a new bucket configuration cache
- // TTL can be set to a longer duration since cache consistency is maintained
- // through real-time metadata subscription events rather than TTL expiration
- func NewBucketConfigCache(ttl time.Duration) *BucketConfigCache {
- negativeTTL := ttl / 4 // Negative cache TTL is shorter than positive cache
- if negativeTTL < 30*time.Second {
- negativeTTL = 30 * time.Second // Minimum 30 seconds for negative cache
- }
- return &BucketConfigCache{
- cache: make(map[string]*BucketConfig),
- negativeCache: make(map[string]time.Time),
- ttl: ttl,
- negativeTTL: negativeTTL,
- }
- }
- // Get retrieves bucket configuration from cache
- func (bcc *BucketConfigCache) Get(bucket string) (*BucketConfig, bool) {
- bcc.mutex.RLock()
- defer bcc.mutex.RUnlock()
- config, exists := bcc.cache[bucket]
- if !exists {
- return nil, false
- }
- // Check if cache entry is expired (safety fallback; entries are normally updated via events)
- if time.Since(config.LastModified) > bcc.ttl {
- return nil, false
- }
- return config, true
- }
- // Set stores bucket configuration in cache
- func (bcc *BucketConfigCache) Set(bucket string, config *BucketConfig) {
- bcc.mutex.Lock()
- defer bcc.mutex.Unlock()
- config.LastModified = time.Now()
- bcc.cache[bucket] = config
- }
- // Remove removes bucket configuration from cache
- func (bcc *BucketConfigCache) Remove(bucket string) {
- bcc.mutex.Lock()
- defer bcc.mutex.Unlock()
- delete(bcc.cache, bucket)
- }
- // Clear clears all cached configurations
- func (bcc *BucketConfigCache) Clear() {
- bcc.mutex.Lock()
- defer bcc.mutex.Unlock()
- bcc.cache = make(map[string]*BucketConfig)
- bcc.negativeCache = make(map[string]time.Time)
- }
- // IsNegativelyCached checks if a bucket is in the negative cache (doesn't exist)
- func (bcc *BucketConfigCache) IsNegativelyCached(bucket string) bool {
- bcc.mutex.RLock()
- defer bcc.mutex.RUnlock()
- if cachedTime, exists := bcc.negativeCache[bucket]; exists {
- // Check if the negative cache entry is still valid
- if time.Since(cachedTime) < bcc.negativeTTL {
- return true
- }
- // Entry expired, remove it
- delete(bcc.negativeCache, bucket)
- }
- return false
- }
- // SetNegativeCache marks a bucket as non-existent in the negative cache
- func (bcc *BucketConfigCache) SetNegativeCache(bucket string) {
- bcc.mutex.Lock()
- defer bcc.mutex.Unlock()
- bcc.negativeCache[bucket] = time.Now()
- }
- // RemoveNegativeCache removes a bucket from the negative cache
- func (bcc *BucketConfigCache) RemoveNegativeCache(bucket string) {
- bcc.mutex.Lock()
- defer bcc.mutex.Unlock()
- delete(bcc.negativeCache, bucket)
- }
- // getBucketConfig retrieves bucket configuration with caching
- func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.ErrorCode) {
- // Check negative cache first
- if s3a.bucketConfigCache.IsNegativelyCached(bucket) {
- return nil, s3err.ErrNoSuchBucket
- }
- // Try positive cache
- if config, found := s3a.bucketConfigCache.Get(bucket); found {
- return config, s3err.ErrNone
- }
- // Try to get from filer
- entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
- if err != nil {
- if errors.Is(err, filer_pb.ErrNotFound) {
- // Bucket doesn't exist - set negative cache
- s3a.bucketConfigCache.SetNegativeCache(bucket)
- return nil, s3err.ErrNoSuchBucket
- }
- glog.Errorf("getBucketConfig: failed to get bucket entry for %s: %v", bucket, err)
- return nil, s3err.ErrInternalError
- }
- config := &BucketConfig{
- Name: bucket,
- Entry: entry,
- IsPublicRead: false, // Explicitly default to false for private buckets
- }
- // Extract configuration from extended attributes
- if entry.Extended != nil {
- if versioning, exists := entry.Extended[s3_constants.ExtVersioningKey]; exists {
- config.Versioning = string(versioning)
- }
- if ownership, exists := entry.Extended[s3_constants.ExtOwnershipKey]; exists {
- config.Ownership = string(ownership)
- }
- if acl, exists := entry.Extended[s3_constants.ExtAmzAclKey]; exists {
- config.ACL = acl
- // Parse ACL once and cache public-read status
- config.IsPublicRead = parseAndCachePublicReadStatus(acl)
- } else {
- // No ACL means private bucket
- config.IsPublicRead = false
- }
- if owner, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
- config.Owner = string(owner)
- }
- // Parse Object Lock configuration if present
- if objectLockConfig, found := LoadObjectLockConfigurationFromExtended(entry); found {
- config.ObjectLockConfig = objectLockConfig
- glog.V(2).Infof("getBucketConfig: cached Object Lock configuration for bucket %s", bucket)
- }
- }
- // Load CORS configuration from bucket directory content
- if corsConfig, err := s3a.loadCORSFromBucketContent(bucket); err != nil {
- if errors.Is(err, filer_pb.ErrNotFound) {
- // Missing metadata is not an error; fall back cleanly
- glog.V(2).Infof("CORS metadata not found for bucket %s, falling back to default behavior", bucket)
- } else {
- // Log parsing or validation errors
- glog.Errorf("Failed to load CORS configuration for bucket %s: %v", bucket, err)
- }
- } else {
- config.CORS = corsConfig
- }
- // Cache the result
- s3a.bucketConfigCache.Set(bucket, config)
- return config, s3err.ErrNone
- }
- // updateBucketConfig updates bucket configuration and invalidates cache
- func (s3a *S3ApiServer) updateBucketConfig(bucket string, updateFn func(*BucketConfig) error) s3err.ErrorCode {
- config, errCode := s3a.getBucketConfig(bucket)
- if errCode != s3err.ErrNone {
- return errCode
- }
- // Apply update function
- if err := updateFn(config); err != nil {
- glog.Errorf("updateBucketConfig: update function failed for bucket %s: %v", bucket, err)
- return s3err.ErrInternalError
- }
- // Prepare extended attributes
- if config.Entry.Extended == nil {
- config.Entry.Extended = make(map[string][]byte)
- }
- // Update extended attributes
- if config.Versioning != "" {
- config.Entry.Extended[s3_constants.ExtVersioningKey] = []byte(config.Versioning)
- }
- if config.Ownership != "" {
- config.Entry.Extended[s3_constants.ExtOwnershipKey] = []byte(config.Ownership)
- }
- if config.ACL != nil {
- config.Entry.Extended[s3_constants.ExtAmzAclKey] = config.ACL
- }
- if config.Owner != "" {
- config.Entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(config.Owner)
- }
- // Update Object Lock configuration
- if config.ObjectLockConfig != nil {
- if err := StoreObjectLockConfigurationInExtended(config.Entry, config.ObjectLockConfig); err != nil {
- glog.Errorf("updateBucketConfig: failed to store Object Lock configuration for bucket %s: %v", bucket, err)
- return s3err.ErrInternalError
- }
- }
- // Save to filer
- err := s3a.updateEntry(s3a.option.BucketsPath, config.Entry)
- if err != nil {
- glog.Errorf("updateBucketConfig: failed to update bucket entry for %s: %v", bucket, err)
- return s3err.ErrInternalError
- }
- // Update cache
- s3a.bucketConfigCache.Set(bucket, config)
- return s3err.ErrNone
- }
- // isVersioningEnabled checks if versioning is enabled for a bucket (with caching)
- func (s3a *S3ApiServer) isVersioningEnabled(bucket string) (bool, error) {
- config, errCode := s3a.getBucketConfig(bucket)
- if errCode != s3err.ErrNone {
- if errCode == s3err.ErrNoSuchBucket {
- return false, filer_pb.ErrNotFound
- }
- return false, fmt.Errorf("failed to get bucket config: %v", errCode)
- }
- // Versioning is enabled if explicitly set to "Enabled" OR if object lock is enabled
- // (since object lock requires versioning to be enabled)
- return config.Versioning == s3_constants.VersioningEnabled || config.ObjectLockConfig != nil, nil
- }
- // isVersioningConfigured checks if versioning has been configured (either Enabled or Suspended)
- func (s3a *S3ApiServer) isVersioningConfigured(bucket string) (bool, error) {
- config, errCode := s3a.getBucketConfig(bucket)
- if errCode != s3err.ErrNone {
- if errCode == s3err.ErrNoSuchBucket {
- return false, filer_pb.ErrNotFound
- }
- return false, fmt.Errorf("failed to get bucket config: %v", errCode)
- }
- // Versioning is configured if explicitly set to either "Enabled" or "Suspended"
- // OR if object lock is enabled (which forces versioning)
- return config.Versioning != "" || config.ObjectLockConfig != nil, nil
- }
- // getVersioningState returns the detailed versioning state for a bucket
- func (s3a *S3ApiServer) getVersioningState(bucket string) (string, error) {
- config, errCode := s3a.getBucketConfig(bucket)
- if errCode != s3err.ErrNone {
- if errCode == s3err.ErrNoSuchBucket {
- return "", nil
- }
- return "", fmt.Errorf("failed to get bucket config: %v", errCode)
- }
- // If object lock is enabled, versioning must be enabled regardless of explicit setting
- if config.ObjectLockConfig != nil {
- return s3_constants.VersioningEnabled, nil
- }
- // Return the explicit versioning status (empty string means never configured)
- return config.Versioning, nil
- }
- // getBucketVersioningStatus returns the versioning status for a bucket
- func (s3a *S3ApiServer) getBucketVersioningStatus(bucket string) (string, s3err.ErrorCode) {
- config, errCode := s3a.getBucketConfig(bucket)
- if errCode != s3err.ErrNone {
- return "", errCode
- }
- // Return exactly what's stored - empty string means versioning was never configured
- // This matches AWS S3 behavior where new buckets have no Status field in GetBucketVersioning response
- return config.Versioning, s3err.ErrNone
- }
- // setBucketVersioningStatus sets the versioning status for a bucket
- func (s3a *S3ApiServer) setBucketVersioningStatus(bucket, status string) s3err.ErrorCode {
- return s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
- config.Versioning = status
- return nil
- })
- }
- // getBucketOwnership returns the ownership setting for a bucket
- func (s3a *S3ApiServer) getBucketOwnership(bucket string) (string, s3err.ErrorCode) {
- config, errCode := s3a.getBucketConfig(bucket)
- if errCode != s3err.ErrNone {
- return "", errCode
- }
- return config.Ownership, s3err.ErrNone
- }
- // setBucketOwnership sets the ownership setting for a bucket
- func (s3a *S3ApiServer) setBucketOwnership(bucket, ownership string) s3err.ErrorCode {
- return s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
- config.Ownership = ownership
- return nil
- })
- }
- // loadCORSFromBucketContent loads CORS configuration from bucket directory content
- func (s3a *S3ApiServer) loadCORSFromBucketContent(bucket string) (*cors.CORSConfiguration, error) {
- metadata, err := s3a.GetBucketMetadata(bucket)
- if err != nil {
- return nil, err
- }
- // Note: corsConfig can be nil if no CORS configuration is set, which is valid
- return metadata.CORS, nil
- }
- // getCORSConfiguration retrieves CORS configuration with caching
- func (s3a *S3ApiServer) getCORSConfiguration(bucket string) (*cors.CORSConfiguration, s3err.ErrorCode) {
- config, errCode := s3a.getBucketConfig(bucket)
- if errCode != s3err.ErrNone {
- return nil, errCode
- }
- return config.CORS, s3err.ErrNone
- }
- // updateCORSConfiguration updates the CORS configuration for a bucket
- func (s3a *S3ApiServer) updateCORSConfiguration(bucket string, corsConfig *cors.CORSConfiguration) s3err.ErrorCode {
- // Update using structured API
- err := s3a.UpdateBucketCORS(bucket, corsConfig)
- if err != nil {
- glog.Errorf("updateCORSConfiguration: failed to update CORS config for bucket %s: %v", bucket, err)
- return s3err.ErrInternalError
- }
- // Cache will be updated automatically via metadata subscription
- return s3err.ErrNone
- }
- // removeCORSConfiguration removes the CORS configuration for a bucket
- func (s3a *S3ApiServer) removeCORSConfiguration(bucket string) s3err.ErrorCode {
- // Update using structured API
- err := s3a.ClearBucketCORS(bucket)
- if err != nil {
- glog.Errorf("removeCORSConfiguration: failed to remove CORS config for bucket %s: %v", bucket, err)
- return s3err.ErrInternalError
- }
- // Cache will be updated automatically via metadata subscription
- return s3err.ErrNone
- }
- // Conversion functions between CORS types and protobuf types
- // corsRuleToProto converts a CORS rule to protobuf format
- func corsRuleToProto(rule cors.CORSRule) *s3_pb.CORSRule {
- return &s3_pb.CORSRule{
- AllowedHeaders: rule.AllowedHeaders,
- AllowedMethods: rule.AllowedMethods,
- AllowedOrigins: rule.AllowedOrigins,
- ExposeHeaders: rule.ExposeHeaders,
- MaxAgeSeconds: int32(getMaxAgeSecondsValue(rule.MaxAgeSeconds)),
- Id: rule.ID,
- }
- }
- // corsRuleFromProto converts a protobuf CORS rule to standard format
- func corsRuleFromProto(protoRule *s3_pb.CORSRule) cors.CORSRule {
- var maxAge *int
- // Always create the pointer if MaxAgeSeconds is >= 0
- // This prevents nil pointer dereferences in tests and matches AWS behavior
- if protoRule.MaxAgeSeconds >= 0 {
- age := int(protoRule.MaxAgeSeconds)
- maxAge = &age
- }
- // Only leave maxAge as nil if MaxAgeSeconds was explicitly set to a negative value
- return cors.CORSRule{
- AllowedHeaders: protoRule.AllowedHeaders,
- AllowedMethods: protoRule.AllowedMethods,
- AllowedOrigins: protoRule.AllowedOrigins,
- ExposeHeaders: protoRule.ExposeHeaders,
- MaxAgeSeconds: maxAge,
- ID: protoRule.Id,
- }
- }
- // corsConfigToProto converts CORS configuration to protobuf format
- func corsConfigToProto(config *cors.CORSConfiguration) *s3_pb.CORSConfiguration {
- if config == nil {
- return nil
- }
- protoRules := make([]*s3_pb.CORSRule, len(config.CORSRules))
- for i, rule := range config.CORSRules {
- protoRules[i] = corsRuleToProto(rule)
- }
- return &s3_pb.CORSConfiguration{
- CorsRules: protoRules,
- }
- }
- // corsConfigFromProto converts protobuf CORS configuration to standard format
- func corsConfigFromProto(protoConfig *s3_pb.CORSConfiguration) *cors.CORSConfiguration {
- if protoConfig == nil {
- return nil
- }
- rules := make([]cors.CORSRule, len(protoConfig.CorsRules))
- for i, protoRule := range protoConfig.CorsRules {
- rules[i] = corsRuleFromProto(protoRule)
- }
- return &cors.CORSConfiguration{
- CORSRules: rules,
- }
- }
// getMaxAgeSecondsValue dereferences maxAge, substituting 0 when it is nil.
func getMaxAgeSecondsValue(maxAge *int) int {
	if maxAge != nil {
		return *maxAge
	}
	return 0
}
- // parseAndCachePublicReadStatus parses the ACL and caches the public-read status
- func parseAndCachePublicReadStatus(acl []byte) bool {
- var grants []*s3.Grant
- if err := json.Unmarshal(acl, &grants); err != nil {
- return false
- }
- // Check if any grant gives read permission to "AllUsers" group
- for _, grant := range grants {
- if grant.Grantee != nil && grant.Grantee.URI != nil && grant.Permission != nil {
- // Check for AllUsers group with Read permission
- if *grant.Grantee.URI == s3_constants.GranteeGroupAllUsers &&
- (*grant.Permission == s3_constants.PermissionRead || *grant.Permission == s3_constants.PermissionFullControl) {
- return true
- }
- }
- }
- return false
- }
- // getBucketMetadata retrieves bucket metadata as a structured object with caching
- func (s3a *S3ApiServer) getBucketMetadata(bucket string) (*BucketMetadata, error) {
- if s3a.bucketConfigCache != nil {
- // Check negative cache first
- if s3a.bucketConfigCache.IsNegativelyCached(bucket) {
- return nil, fmt.Errorf("bucket directory not found %s", bucket)
- }
- // Try to get from positive cache
- if config, found := s3a.bucketConfigCache.Get(bucket); found {
- // Extract metadata from cached config
- if metadata, err := s3a.extractMetadataFromConfig(config); err == nil {
- return metadata, nil
- }
- // If extraction fails, fall through to direct load
- }
- }
- // Load directly from filer
- return s3a.loadBucketMetadataFromFiler(bucket)
- }
- // extractMetadataFromConfig extracts BucketMetadata from cached BucketConfig
- func (s3a *S3ApiServer) extractMetadataFromConfig(config *BucketConfig) (*BucketMetadata, error) {
- if config == nil || config.Entry == nil {
- return NewBucketMetadata(), nil
- }
- // Parse metadata from entry content if available
- if len(config.Entry.Content) > 0 {
- var protoMetadata s3_pb.BucketMetadata
- if err := proto.Unmarshal(config.Entry.Content, &protoMetadata); err != nil {
- glog.Errorf("extractMetadataFromConfig: failed to unmarshal protobuf metadata for bucket %s: %v", config.Name, err)
- return nil, err
- }
- // Convert protobuf to structured metadata
- metadata := &BucketMetadata{
- Tags: protoMetadata.Tags,
- CORS: corsConfigFromProto(protoMetadata.Cors),
- Encryption: protoMetadata.Encryption,
- }
- return metadata, nil
- }
- // Fallback: create metadata from cached CORS config
- metadata := NewBucketMetadata()
- if config.CORS != nil {
- metadata.CORS = config.CORS
- }
- return metadata, nil
- }
- // loadBucketMetadataFromFiler loads bucket metadata directly from the filer
- func (s3a *S3ApiServer) loadBucketMetadataFromFiler(bucket string) (*BucketMetadata, error) {
- // Validate bucket name to prevent path traversal attacks
- if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") ||
- strings.Contains(bucket, "..") || strings.Contains(bucket, "~") {
- return nil, fmt.Errorf("invalid bucket name: %s", bucket)
- }
- // Clean the bucket name further to prevent any potential path traversal
- bucket = filepath.Clean(bucket)
- if bucket == "." || bucket == ".." {
- return nil, fmt.Errorf("invalid bucket name: %s", bucket)
- }
- // Get bucket directory entry to access its content
- entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
- if err != nil {
- // Check if this is a "not found" error
- if errors.Is(err, filer_pb.ErrNotFound) {
- // Set negative cache for non-existent bucket
- if s3a.bucketConfigCache != nil {
- s3a.bucketConfigCache.SetNegativeCache(bucket)
- }
- }
- return nil, fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err)
- }
- if entry == nil {
- // Set negative cache for non-existent bucket
- if s3a.bucketConfigCache != nil {
- s3a.bucketConfigCache.SetNegativeCache(bucket)
- }
- return nil, fmt.Errorf("bucket directory not found %s", bucket)
- }
- // If no content, return empty metadata
- if len(entry.Content) == 0 {
- return NewBucketMetadata(), nil
- }
- // Unmarshal metadata from protobuf
- var protoMetadata s3_pb.BucketMetadata
- if err := proto.Unmarshal(entry.Content, &protoMetadata); err != nil {
- glog.Errorf("getBucketMetadata: failed to unmarshal protobuf metadata for bucket %s: %v", bucket, err)
- return nil, fmt.Errorf("failed to unmarshal bucket metadata for %s: %w", bucket, err)
- }
- // Convert protobuf CORS to standard CORS
- corsConfig := corsConfigFromProto(protoMetadata.Cors)
- // Create and return structured metadata
- metadata := &BucketMetadata{
- Tags: protoMetadata.Tags,
- CORS: corsConfig,
- Encryption: protoMetadata.Encryption,
- }
- return metadata, nil
- }
- // setBucketMetadata stores bucket metadata from a structured object
- func (s3a *S3ApiServer) setBucketMetadata(bucket string, metadata *BucketMetadata) error {
- // Validate bucket name to prevent path traversal attacks
- if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") ||
- strings.Contains(bucket, "..") || strings.Contains(bucket, "~") {
- return fmt.Errorf("invalid bucket name: %s", bucket)
- }
- // Clean the bucket name further to prevent any potential path traversal
- bucket = filepath.Clean(bucket)
- if bucket == "." || bucket == ".." {
- return fmt.Errorf("invalid bucket name: %s", bucket)
- }
- // Default to empty metadata if nil
- if metadata == nil {
- metadata = NewBucketMetadata()
- }
- // Create protobuf metadata
- protoMetadata := &s3_pb.BucketMetadata{
- Tags: metadata.Tags,
- Cors: corsConfigToProto(metadata.CORS),
- Encryption: metadata.Encryption,
- }
- // Marshal metadata to protobuf
- metadataBytes, err := proto.Marshal(protoMetadata)
- if err != nil {
- return fmt.Errorf("failed to marshal bucket metadata to protobuf: %w", err)
- }
- // Update the bucket entry with new content
- err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- // Get current bucket entry
- entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
- if err != nil {
- return fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err)
- }
- if entry == nil {
- return fmt.Errorf("bucket directory not found %s", bucket)
- }
- // Update content with metadata
- entry.Content = metadataBytes
- request := &filer_pb.UpdateEntryRequest{
- Directory: s3a.option.BucketsPath,
- Entry: entry,
- }
- _, err = client.UpdateEntry(context.Background(), request)
- return err
- })
- // Invalidate cache after successful update
- if err == nil && s3a.bucketConfigCache != nil {
- s3a.bucketConfigCache.Remove(bucket)
- s3a.bucketConfigCache.RemoveNegativeCache(bucket) // Remove from negative cache too
- }
- return err
- }
- // New structured API functions using BucketMetadata
- // GetBucketMetadata retrieves complete bucket metadata as a structured object
- func (s3a *S3ApiServer) GetBucketMetadata(bucket string) (*BucketMetadata, error) {
- return s3a.getBucketMetadata(bucket)
- }
- // SetBucketMetadata stores complete bucket metadata from a structured object
- func (s3a *S3ApiServer) SetBucketMetadata(bucket string, metadata *BucketMetadata) error {
- return s3a.setBucketMetadata(bucket, metadata)
- }
- // UpdateBucketMetadata updates specific parts of bucket metadata while preserving others
- //
- // DISTRIBUTED SYSTEM DESIGN NOTE:
- // This function implements a read-modify-write pattern with "last write wins" semantics.
- // In the rare case of concurrent updates to different parts of bucket metadata
- // (e.g., simultaneous tag and CORS updates), the last write may overwrite previous changes.
- //
- // This is an acceptable trade-off because:
- // 1. Bucket metadata updates are infrequent in typical S3 usage
- // 2. Traditional locking doesn't work in distributed systems across multiple nodes
- // 3. The complexity of distributed consensus (e.g., Raft) for metadata updates would
- // be disproportionate to the low frequency of bucket configuration changes
- // 4. Most bucket operations (tags, CORS, encryption) are typically configured once
- // during setup rather than being frequently modified
- //
- // If stronger consistency is required, consider implementing optimistic concurrency
- // control with version numbers or ETags at the storage layer.
- func (s3a *S3ApiServer) UpdateBucketMetadata(bucket string, update func(*BucketMetadata) error) error {
- // Get current metadata
- metadata, err := s3a.GetBucketMetadata(bucket)
- if err != nil {
- return fmt.Errorf("failed to get current bucket metadata: %w", err)
- }
- // Apply update function
- if err := update(metadata); err != nil {
- return fmt.Errorf("failed to apply metadata update: %w", err)
- }
- // Store updated metadata (last write wins)
- return s3a.SetBucketMetadata(bucket, metadata)
- }
- // Helper functions for specific metadata operations using structured API
- // UpdateBucketTags sets bucket tags using the structured API
- func (s3a *S3ApiServer) UpdateBucketTags(bucket string, tags map[string]string) error {
- return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
- metadata.Tags = tags
- return nil
- })
- }
- // UpdateBucketCORS sets bucket CORS configuration using the structured API
- func (s3a *S3ApiServer) UpdateBucketCORS(bucket string, corsConfig *cors.CORSConfiguration) error {
- return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
- metadata.CORS = corsConfig
- return nil
- })
- }
- // UpdateBucketEncryption sets bucket encryption configuration using the structured API
- func (s3a *S3ApiServer) UpdateBucketEncryption(bucket string, encryptionConfig *s3_pb.EncryptionConfiguration) error {
- return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
- metadata.Encryption = encryptionConfig
- return nil
- })
- }
- // ClearBucketTags removes all bucket tags using the structured API
- func (s3a *S3ApiServer) ClearBucketTags(bucket string) error {
- return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
- metadata.Tags = make(map[string]string)
- return nil
- })
- }
- // ClearBucketCORS removes bucket CORS configuration using the structured API
- func (s3a *S3ApiServer) ClearBucketCORS(bucket string) error {
- return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
- metadata.CORS = nil
- return nil
- })
- }
- // ClearBucketEncryption removes bucket encryption configuration using the structured API
- func (s3a *S3ApiServer) ClearBucketEncryption(bucket string) error {
- return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
- metadata.Encryption = nil
- return nil
- })
- }
|