s3api_bucket_config.go 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947
  1. package s3api
  2. import (
  3. "context"
  4. "encoding/json"
  5. "errors"
  6. "fmt"
  7. "path/filepath"
  8. "strings"
  9. "sync"
  10. "time"
  11. "github.com/aws/aws-sdk-go/service/s3"
  12. "google.golang.org/protobuf/proto"
  13. "github.com/seaweedfs/seaweedfs/weed/glog"
  14. "github.com/seaweedfs/seaweedfs/weed/kms"
  15. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  16. "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
  17. "github.com/seaweedfs/seaweedfs/weed/s3api/cors"
  18. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  19. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  20. )
// BucketConfig represents cached bucket configuration.
//
// getBucketConfig builds it from the bucket's filer entry (extended attributes
// plus bucket metadata content) and stores it in BucketConfigCache.
// updateBucketConfig writes the fields back into Entry.Extended.
type BucketConfig struct {
	Name             string                   // Bucket name
	Versioning       string                   // "Enabled", "Suspended", or ""
	Ownership        string                   // Raw value of the s3_constants.ExtOwnershipKey extended attribute
	ACL              []byte                   // Raw ACL bytes; parseAndCachePublicReadStatus decodes them as JSON []*s3.Grant
	Owner            string                   // Raw value of the s3_constants.ExtAmzOwnerKey extended attribute
	IsPublicRead     bool                     // Cached flag to avoid JSON parsing on every request
	CORS             *cors.CORSConfiguration  // CORS rules loaded from bucket metadata; nil when none are stored
	ObjectLockConfig *ObjectLockConfiguration // Cached parsed Object Lock configuration
	KMSKeyCache      *BucketKMSCache          // Per-bucket KMS key cache for SSE-KMS operations
	LastModified     time.Time                // Set by BucketConfigCache.Set to the caching time and used for TTL expiry (not the bucket's mtime)
	Entry            *filer_pb.Entry          // Filer entry this config was parsed from
}
// BucketKMSCache represents per-bucket KMS key caching for SSE-KMS operations.
// This provides better isolation and automatic cleanup compared to global caching.
//
// The Get, Set, CleanupExpired, Size and Clear methods in this file treat a nil
// *BucketKMSCache as an empty cache (they return early instead of panicking).
type BucketKMSCache struct {
	cache   map[string]*BucketKMSCacheEntry // Key: contextHash, Value: cached data key
	mutex   sync.RWMutex                    // Guards cache and lastTTL
	bucket  string                          // Bucket name for logging/debugging
	lastTTL time.Duration                   // TTL from NewBucketKMSCache, then from the most recent Set (noted as typically 1 hour; not enforced here)
}
// BucketKMSCacheEntry represents a single cached KMS data key.
type BucketKMSCacheEntry struct {
	DataKey     interface{} // Could be *kms.GenerateDataKeyResponse or similar; only that type is zeroed by clearSensitiveData
	ExpiresAt   time.Time   // Get reports the entry as missing once this instant has passed
	KeyID       string      // KMS key ID supplied to Set
	ContextHash string      // Hash of encryption context for cache validation
}
  50. // NewBucketKMSCache creates a new per-bucket KMS key cache
  51. func NewBucketKMSCache(bucketName string, ttl time.Duration) *BucketKMSCache {
  52. return &BucketKMSCache{
  53. cache: make(map[string]*BucketKMSCacheEntry),
  54. bucket: bucketName,
  55. lastTTL: ttl,
  56. }
  57. }
  58. // Get retrieves a cached KMS data key if it exists and hasn't expired
  59. func (bkc *BucketKMSCache) Get(contextHash string) (*BucketKMSCacheEntry, bool) {
  60. if bkc == nil {
  61. return nil, false
  62. }
  63. bkc.mutex.RLock()
  64. defer bkc.mutex.RUnlock()
  65. entry, exists := bkc.cache[contextHash]
  66. if !exists {
  67. return nil, false
  68. }
  69. // Check if entry has expired
  70. if time.Now().After(entry.ExpiresAt) {
  71. return nil, false
  72. }
  73. return entry, true
  74. }
  75. // Set stores a KMS data key in the cache
  76. func (bkc *BucketKMSCache) Set(contextHash, keyID string, dataKey interface{}, ttl time.Duration) {
  77. if bkc == nil {
  78. return
  79. }
  80. bkc.mutex.Lock()
  81. defer bkc.mutex.Unlock()
  82. bkc.cache[contextHash] = &BucketKMSCacheEntry{
  83. DataKey: dataKey,
  84. ExpiresAt: time.Now().Add(ttl),
  85. KeyID: keyID,
  86. ContextHash: contextHash,
  87. }
  88. bkc.lastTTL = ttl
  89. }
  90. // CleanupExpired removes expired entries from the cache
  91. func (bkc *BucketKMSCache) CleanupExpired() int {
  92. if bkc == nil {
  93. return 0
  94. }
  95. bkc.mutex.Lock()
  96. defer bkc.mutex.Unlock()
  97. now := time.Now()
  98. expiredCount := 0
  99. for key, entry := range bkc.cache {
  100. if now.After(entry.ExpiresAt) {
  101. // Clear sensitive data before removing from cache
  102. bkc.clearSensitiveData(entry)
  103. delete(bkc.cache, key)
  104. expiredCount++
  105. }
  106. }
  107. return expiredCount
  108. }
  109. // Size returns the current number of cached entries
  110. func (bkc *BucketKMSCache) Size() int {
  111. if bkc == nil {
  112. return 0
  113. }
  114. bkc.mutex.RLock()
  115. defer bkc.mutex.RUnlock()
  116. return len(bkc.cache)
  117. }
  118. // clearSensitiveData securely clears sensitive data from a cache entry
  119. func (bkc *BucketKMSCache) clearSensitiveData(entry *BucketKMSCacheEntry) {
  120. if dataKeyResp, ok := entry.DataKey.(*kms.GenerateDataKeyResponse); ok {
  121. // Zero out the plaintext data key to prevent it from lingering in memory
  122. if dataKeyResp.Plaintext != nil {
  123. for i := range dataKeyResp.Plaintext {
  124. dataKeyResp.Plaintext[i] = 0
  125. }
  126. dataKeyResp.Plaintext = nil
  127. }
  128. }
  129. }
  130. // Clear clears all cached KMS entries, securely zeroing sensitive data first
  131. func (bkc *BucketKMSCache) Clear() {
  132. if bkc == nil {
  133. return
  134. }
  135. bkc.mutex.Lock()
  136. defer bkc.mutex.Unlock()
  137. // Clear sensitive data from all entries before deletion
  138. for _, entry := range bkc.cache {
  139. bkc.clearSensitiveData(entry)
  140. }
  141. // Clear the cache map
  142. bkc.cache = make(map[string]*BucketKMSCacheEntry)
  143. }
// BucketConfigCache provides caching for bucket configurations.
// Cache entries are automatically updated/invalidated through metadata subscription events,
// so TTL serves as a safety fallback rather than the primary consistency mechanism.
//
// It also keeps a negative cache recording when a bucket was last found not to
// exist. Both maps are guarded by mutex.
type BucketConfigCache struct {
	cache         map[string]*BucketConfig // Bucket name -> config; age is measured from BucketConfig.LastModified
	negativeCache map[string]time.Time     // Cache for non-existent buckets (value: when the miss was recorded)
	mutex         sync.RWMutex             // Guards cache and negativeCache
	ttl           time.Duration            // Safety fallback TTL; real-time consistency maintained via events
	negativeTTL   time.Duration            // TTL for negative cache entries
}
// BucketMetadata represents the complete metadata for a bucket.
//
// It is persisted as a protobuf s3_pb.BucketMetadata in the Content of the
// bucket's directory entry (see setBucketMetadata / loadBucketMetadataFromFiler).
type BucketMetadata struct {
	Tags       map[string]string              `json:"tags,omitempty"`
	CORS       *cors.CORSConfiguration        `json:"cors,omitempty"`
	Encryption *s3_pb.EncryptionConfiguration `json:"encryption,omitempty"`
	// Future extensions can be added here:
	// Versioning *s3_pb.VersioningConfiguration `json:"versioning,omitempty"`
	// Lifecycle *s3_pb.LifecycleConfiguration `json:"lifecycle,omitempty"`
	// Notification *s3_pb.NotificationConfiguration `json:"notification,omitempty"`
	// Replication *s3_pb.ReplicationConfiguration `json:"replication,omitempty"`
	// Analytics *s3_pb.AnalyticsConfiguration `json:"analytics,omitempty"`
	// Logging *s3_pb.LoggingConfiguration `json:"logging,omitempty"`
	// Website *s3_pb.WebsiteConfiguration `json:"website,omitempty"`
	// RequestPayer *s3_pb.RequestPayerConfiguration `json:"requestPayer,omitempty"`
	// PublicAccess *s3_pb.PublicAccessConfiguration `json:"publicAccess,omitempty"`
}
  170. // NewBucketMetadata creates a new BucketMetadata with default values
  171. func NewBucketMetadata() *BucketMetadata {
  172. return &BucketMetadata{
  173. Tags: make(map[string]string),
  174. }
  175. }
  176. // IsEmpty returns true if the metadata has no configuration set
  177. func (bm *BucketMetadata) IsEmpty() bool {
  178. return len(bm.Tags) == 0 && bm.CORS == nil && bm.Encryption == nil
  179. }
  180. // HasEncryption returns true if bucket has encryption configuration
  181. func (bm *BucketMetadata) HasEncryption() bool {
  182. return bm.Encryption != nil
  183. }
  184. // HasCORS returns true if bucket has CORS configuration
  185. func (bm *BucketMetadata) HasCORS() bool {
  186. return bm.CORS != nil
  187. }
  188. // HasTags returns true if bucket has tags
  189. func (bm *BucketMetadata) HasTags() bool {
  190. return len(bm.Tags) > 0
  191. }
  192. // NewBucketConfigCache creates a new bucket configuration cache
  193. // TTL can be set to a longer duration since cache consistency is maintained
  194. // through real-time metadata subscription events rather than TTL expiration
  195. func NewBucketConfigCache(ttl time.Duration) *BucketConfigCache {
  196. negativeTTL := ttl / 4 // Negative cache TTL is shorter than positive cache
  197. if negativeTTL < 30*time.Second {
  198. negativeTTL = 30 * time.Second // Minimum 30 seconds for negative cache
  199. }
  200. return &BucketConfigCache{
  201. cache: make(map[string]*BucketConfig),
  202. negativeCache: make(map[string]time.Time),
  203. ttl: ttl,
  204. negativeTTL: negativeTTL,
  205. }
  206. }
  207. // Get retrieves bucket configuration from cache
  208. func (bcc *BucketConfigCache) Get(bucket string) (*BucketConfig, bool) {
  209. bcc.mutex.RLock()
  210. defer bcc.mutex.RUnlock()
  211. config, exists := bcc.cache[bucket]
  212. if !exists {
  213. return nil, false
  214. }
  215. // Check if cache entry is expired (safety fallback; entries are normally updated via events)
  216. if time.Since(config.LastModified) > bcc.ttl {
  217. return nil, false
  218. }
  219. return config, true
  220. }
  221. // Set stores bucket configuration in cache
  222. func (bcc *BucketConfigCache) Set(bucket string, config *BucketConfig) {
  223. bcc.mutex.Lock()
  224. defer bcc.mutex.Unlock()
  225. config.LastModified = time.Now()
  226. bcc.cache[bucket] = config
  227. }
  228. // Remove removes bucket configuration from cache
  229. func (bcc *BucketConfigCache) Remove(bucket string) {
  230. bcc.mutex.Lock()
  231. defer bcc.mutex.Unlock()
  232. delete(bcc.cache, bucket)
  233. }
  234. // Clear clears all cached configurations
  235. func (bcc *BucketConfigCache) Clear() {
  236. bcc.mutex.Lock()
  237. defer bcc.mutex.Unlock()
  238. bcc.cache = make(map[string]*BucketConfig)
  239. bcc.negativeCache = make(map[string]time.Time)
  240. }
  241. // IsNegativelyCached checks if a bucket is in the negative cache (doesn't exist)
  242. func (bcc *BucketConfigCache) IsNegativelyCached(bucket string) bool {
  243. bcc.mutex.RLock()
  244. defer bcc.mutex.RUnlock()
  245. if cachedTime, exists := bcc.negativeCache[bucket]; exists {
  246. // Check if the negative cache entry is still valid
  247. if time.Since(cachedTime) < bcc.negativeTTL {
  248. return true
  249. }
  250. // Entry expired, remove it
  251. delete(bcc.negativeCache, bucket)
  252. }
  253. return false
  254. }
  255. // SetNegativeCache marks a bucket as non-existent in the negative cache
  256. func (bcc *BucketConfigCache) SetNegativeCache(bucket string) {
  257. bcc.mutex.Lock()
  258. defer bcc.mutex.Unlock()
  259. bcc.negativeCache[bucket] = time.Now()
  260. }
  261. // RemoveNegativeCache removes a bucket from the negative cache
  262. func (bcc *BucketConfigCache) RemoveNegativeCache(bucket string) {
  263. bcc.mutex.Lock()
  264. defer bcc.mutex.Unlock()
  265. delete(bcc.negativeCache, bucket)
  266. }
  267. // getBucketConfig retrieves bucket configuration with caching
  268. func (s3a *S3ApiServer) getBucketConfig(bucket string) (*BucketConfig, s3err.ErrorCode) {
  269. // Check negative cache first
  270. if s3a.bucketConfigCache.IsNegativelyCached(bucket) {
  271. return nil, s3err.ErrNoSuchBucket
  272. }
  273. // Try positive cache
  274. if config, found := s3a.bucketConfigCache.Get(bucket); found {
  275. return config, s3err.ErrNone
  276. }
  277. // Try to get from filer
  278. entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
  279. if err != nil {
  280. if errors.Is(err, filer_pb.ErrNotFound) {
  281. // Bucket doesn't exist - set negative cache
  282. s3a.bucketConfigCache.SetNegativeCache(bucket)
  283. return nil, s3err.ErrNoSuchBucket
  284. }
  285. glog.Errorf("getBucketConfig: failed to get bucket entry for %s: %v", bucket, err)
  286. return nil, s3err.ErrInternalError
  287. }
  288. config := &BucketConfig{
  289. Name: bucket,
  290. Entry: entry,
  291. IsPublicRead: false, // Explicitly default to false for private buckets
  292. }
  293. // Extract configuration from extended attributes
  294. if entry.Extended != nil {
  295. if versioning, exists := entry.Extended[s3_constants.ExtVersioningKey]; exists {
  296. config.Versioning = string(versioning)
  297. }
  298. if ownership, exists := entry.Extended[s3_constants.ExtOwnershipKey]; exists {
  299. config.Ownership = string(ownership)
  300. }
  301. if acl, exists := entry.Extended[s3_constants.ExtAmzAclKey]; exists {
  302. config.ACL = acl
  303. // Parse ACL once and cache public-read status
  304. config.IsPublicRead = parseAndCachePublicReadStatus(acl)
  305. } else {
  306. // No ACL means private bucket
  307. config.IsPublicRead = false
  308. }
  309. if owner, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
  310. config.Owner = string(owner)
  311. }
  312. // Parse Object Lock configuration if present
  313. if objectLockConfig, found := LoadObjectLockConfigurationFromExtended(entry); found {
  314. config.ObjectLockConfig = objectLockConfig
  315. glog.V(2).Infof("getBucketConfig: cached Object Lock configuration for bucket %s", bucket)
  316. }
  317. }
  318. // Load CORS configuration from bucket directory content
  319. if corsConfig, err := s3a.loadCORSFromBucketContent(bucket); err != nil {
  320. if errors.Is(err, filer_pb.ErrNotFound) {
  321. // Missing metadata is not an error; fall back cleanly
  322. glog.V(2).Infof("CORS metadata not found for bucket %s, falling back to default behavior", bucket)
  323. } else {
  324. // Log parsing or validation errors
  325. glog.Errorf("Failed to load CORS configuration for bucket %s: %v", bucket, err)
  326. }
  327. } else {
  328. config.CORS = corsConfig
  329. }
  330. // Cache the result
  331. s3a.bucketConfigCache.Set(bucket, config)
  332. return config, s3err.ErrNone
  333. }
  334. // updateBucketConfig updates bucket configuration and invalidates cache
  335. func (s3a *S3ApiServer) updateBucketConfig(bucket string, updateFn func(*BucketConfig) error) s3err.ErrorCode {
  336. config, errCode := s3a.getBucketConfig(bucket)
  337. if errCode != s3err.ErrNone {
  338. return errCode
  339. }
  340. // Apply update function
  341. if err := updateFn(config); err != nil {
  342. glog.Errorf("updateBucketConfig: update function failed for bucket %s: %v", bucket, err)
  343. return s3err.ErrInternalError
  344. }
  345. // Prepare extended attributes
  346. if config.Entry.Extended == nil {
  347. config.Entry.Extended = make(map[string][]byte)
  348. }
  349. // Update extended attributes
  350. if config.Versioning != "" {
  351. config.Entry.Extended[s3_constants.ExtVersioningKey] = []byte(config.Versioning)
  352. }
  353. if config.Ownership != "" {
  354. config.Entry.Extended[s3_constants.ExtOwnershipKey] = []byte(config.Ownership)
  355. }
  356. if config.ACL != nil {
  357. config.Entry.Extended[s3_constants.ExtAmzAclKey] = config.ACL
  358. }
  359. if config.Owner != "" {
  360. config.Entry.Extended[s3_constants.ExtAmzOwnerKey] = []byte(config.Owner)
  361. }
  362. // Update Object Lock configuration
  363. if config.ObjectLockConfig != nil {
  364. if err := StoreObjectLockConfigurationInExtended(config.Entry, config.ObjectLockConfig); err != nil {
  365. glog.Errorf("updateBucketConfig: failed to store Object Lock configuration for bucket %s: %v", bucket, err)
  366. return s3err.ErrInternalError
  367. }
  368. }
  369. // Save to filer
  370. err := s3a.updateEntry(s3a.option.BucketsPath, config.Entry)
  371. if err != nil {
  372. glog.Errorf("updateBucketConfig: failed to update bucket entry for %s: %v", bucket, err)
  373. return s3err.ErrInternalError
  374. }
  375. // Update cache
  376. s3a.bucketConfigCache.Set(bucket, config)
  377. return s3err.ErrNone
  378. }
  379. // isVersioningEnabled checks if versioning is enabled for a bucket (with caching)
  380. func (s3a *S3ApiServer) isVersioningEnabled(bucket string) (bool, error) {
  381. config, errCode := s3a.getBucketConfig(bucket)
  382. if errCode != s3err.ErrNone {
  383. if errCode == s3err.ErrNoSuchBucket {
  384. return false, filer_pb.ErrNotFound
  385. }
  386. return false, fmt.Errorf("failed to get bucket config: %v", errCode)
  387. }
  388. // Versioning is enabled if explicitly set to "Enabled" OR if object lock is enabled
  389. // (since object lock requires versioning to be enabled)
  390. return config.Versioning == s3_constants.VersioningEnabled || config.ObjectLockConfig != nil, nil
  391. }
  392. // isVersioningConfigured checks if versioning has been configured (either Enabled or Suspended)
  393. func (s3a *S3ApiServer) isVersioningConfigured(bucket string) (bool, error) {
  394. config, errCode := s3a.getBucketConfig(bucket)
  395. if errCode != s3err.ErrNone {
  396. if errCode == s3err.ErrNoSuchBucket {
  397. return false, filer_pb.ErrNotFound
  398. }
  399. return false, fmt.Errorf("failed to get bucket config: %v", errCode)
  400. }
  401. // Versioning is configured if explicitly set to either "Enabled" or "Suspended"
  402. // OR if object lock is enabled (which forces versioning)
  403. return config.Versioning != "" || config.ObjectLockConfig != nil, nil
  404. }
  405. // getVersioningState returns the detailed versioning state for a bucket
  406. func (s3a *S3ApiServer) getVersioningState(bucket string) (string, error) {
  407. config, errCode := s3a.getBucketConfig(bucket)
  408. if errCode != s3err.ErrNone {
  409. if errCode == s3err.ErrNoSuchBucket {
  410. return "", nil
  411. }
  412. return "", fmt.Errorf("failed to get bucket config: %v", errCode)
  413. }
  414. // If object lock is enabled, versioning must be enabled regardless of explicit setting
  415. if config.ObjectLockConfig != nil {
  416. return s3_constants.VersioningEnabled, nil
  417. }
  418. // Return the explicit versioning status (empty string means never configured)
  419. return config.Versioning, nil
  420. }
  421. // getBucketVersioningStatus returns the versioning status for a bucket
  422. func (s3a *S3ApiServer) getBucketVersioningStatus(bucket string) (string, s3err.ErrorCode) {
  423. config, errCode := s3a.getBucketConfig(bucket)
  424. if errCode != s3err.ErrNone {
  425. return "", errCode
  426. }
  427. // Return exactly what's stored - empty string means versioning was never configured
  428. // This matches AWS S3 behavior where new buckets have no Status field in GetBucketVersioning response
  429. return config.Versioning, s3err.ErrNone
  430. }
  431. // setBucketVersioningStatus sets the versioning status for a bucket
  432. func (s3a *S3ApiServer) setBucketVersioningStatus(bucket, status string) s3err.ErrorCode {
  433. return s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
  434. config.Versioning = status
  435. return nil
  436. })
  437. }
  438. // getBucketOwnership returns the ownership setting for a bucket
  439. func (s3a *S3ApiServer) getBucketOwnership(bucket string) (string, s3err.ErrorCode) {
  440. config, errCode := s3a.getBucketConfig(bucket)
  441. if errCode != s3err.ErrNone {
  442. return "", errCode
  443. }
  444. return config.Ownership, s3err.ErrNone
  445. }
  446. // setBucketOwnership sets the ownership setting for a bucket
  447. func (s3a *S3ApiServer) setBucketOwnership(bucket, ownership string) s3err.ErrorCode {
  448. return s3a.updateBucketConfig(bucket, func(config *BucketConfig) error {
  449. config.Ownership = ownership
  450. return nil
  451. })
  452. }
  453. // loadCORSFromBucketContent loads CORS configuration from bucket directory content
  454. func (s3a *S3ApiServer) loadCORSFromBucketContent(bucket string) (*cors.CORSConfiguration, error) {
  455. metadata, err := s3a.GetBucketMetadata(bucket)
  456. if err != nil {
  457. return nil, err
  458. }
  459. // Note: corsConfig can be nil if no CORS configuration is set, which is valid
  460. return metadata.CORS, nil
  461. }
  462. // getCORSConfiguration retrieves CORS configuration with caching
  463. func (s3a *S3ApiServer) getCORSConfiguration(bucket string) (*cors.CORSConfiguration, s3err.ErrorCode) {
  464. config, errCode := s3a.getBucketConfig(bucket)
  465. if errCode != s3err.ErrNone {
  466. return nil, errCode
  467. }
  468. return config.CORS, s3err.ErrNone
  469. }
  470. // updateCORSConfiguration updates the CORS configuration for a bucket
  471. func (s3a *S3ApiServer) updateCORSConfiguration(bucket string, corsConfig *cors.CORSConfiguration) s3err.ErrorCode {
  472. // Update using structured API
  473. err := s3a.UpdateBucketCORS(bucket, corsConfig)
  474. if err != nil {
  475. glog.Errorf("updateCORSConfiguration: failed to update CORS config for bucket %s: %v", bucket, err)
  476. return s3err.ErrInternalError
  477. }
  478. // Cache will be updated automatically via metadata subscription
  479. return s3err.ErrNone
  480. }
  481. // removeCORSConfiguration removes the CORS configuration for a bucket
  482. func (s3a *S3ApiServer) removeCORSConfiguration(bucket string) s3err.ErrorCode {
  483. // Update using structured API
  484. err := s3a.ClearBucketCORS(bucket)
  485. if err != nil {
  486. glog.Errorf("removeCORSConfiguration: failed to remove CORS config for bucket %s: %v", bucket, err)
  487. return s3err.ErrInternalError
  488. }
  489. // Cache will be updated automatically via metadata subscription
  490. return s3err.ErrNone
  491. }
  492. // Conversion functions between CORS types and protobuf types
  493. // corsRuleToProto converts a CORS rule to protobuf format
  494. func corsRuleToProto(rule cors.CORSRule) *s3_pb.CORSRule {
  495. return &s3_pb.CORSRule{
  496. AllowedHeaders: rule.AllowedHeaders,
  497. AllowedMethods: rule.AllowedMethods,
  498. AllowedOrigins: rule.AllowedOrigins,
  499. ExposeHeaders: rule.ExposeHeaders,
  500. MaxAgeSeconds: int32(getMaxAgeSecondsValue(rule.MaxAgeSeconds)),
  501. Id: rule.ID,
  502. }
  503. }
  504. // corsRuleFromProto converts a protobuf CORS rule to standard format
  505. func corsRuleFromProto(protoRule *s3_pb.CORSRule) cors.CORSRule {
  506. var maxAge *int
  507. // Always create the pointer if MaxAgeSeconds is >= 0
  508. // This prevents nil pointer dereferences in tests and matches AWS behavior
  509. if protoRule.MaxAgeSeconds >= 0 {
  510. age := int(protoRule.MaxAgeSeconds)
  511. maxAge = &age
  512. }
  513. // Only leave maxAge as nil if MaxAgeSeconds was explicitly set to a negative value
  514. return cors.CORSRule{
  515. AllowedHeaders: protoRule.AllowedHeaders,
  516. AllowedMethods: protoRule.AllowedMethods,
  517. AllowedOrigins: protoRule.AllowedOrigins,
  518. ExposeHeaders: protoRule.ExposeHeaders,
  519. MaxAgeSeconds: maxAge,
  520. ID: protoRule.Id,
  521. }
  522. }
  523. // corsConfigToProto converts CORS configuration to protobuf format
  524. func corsConfigToProto(config *cors.CORSConfiguration) *s3_pb.CORSConfiguration {
  525. if config == nil {
  526. return nil
  527. }
  528. protoRules := make([]*s3_pb.CORSRule, len(config.CORSRules))
  529. for i, rule := range config.CORSRules {
  530. protoRules[i] = corsRuleToProto(rule)
  531. }
  532. return &s3_pb.CORSConfiguration{
  533. CorsRules: protoRules,
  534. }
  535. }
  536. // corsConfigFromProto converts protobuf CORS configuration to standard format
  537. func corsConfigFromProto(protoConfig *s3_pb.CORSConfiguration) *cors.CORSConfiguration {
  538. if protoConfig == nil {
  539. return nil
  540. }
  541. rules := make([]cors.CORSRule, len(protoConfig.CorsRules))
  542. for i, protoRule := range protoConfig.CorsRules {
  543. rules[i] = corsRuleFromProto(protoRule)
  544. }
  545. return &cors.CORSConfiguration{
  546. CORSRules: rules,
  547. }
  548. }
  549. // getMaxAgeSecondsValue safely extracts max age seconds value
  550. func getMaxAgeSecondsValue(maxAge *int) int {
  551. if maxAge == nil {
  552. return 0
  553. }
  554. return *maxAge
  555. }
  556. // parseAndCachePublicReadStatus parses the ACL and caches the public-read status
  557. func parseAndCachePublicReadStatus(acl []byte) bool {
  558. var grants []*s3.Grant
  559. if err := json.Unmarshal(acl, &grants); err != nil {
  560. return false
  561. }
  562. // Check if any grant gives read permission to "AllUsers" group
  563. for _, grant := range grants {
  564. if grant.Grantee != nil && grant.Grantee.URI != nil && grant.Permission != nil {
  565. // Check for AllUsers group with Read permission
  566. if *grant.Grantee.URI == s3_constants.GranteeGroupAllUsers &&
  567. (*grant.Permission == s3_constants.PermissionRead || *grant.Permission == s3_constants.PermissionFullControl) {
  568. return true
  569. }
  570. }
  571. }
  572. return false
  573. }
  574. // getBucketMetadata retrieves bucket metadata as a structured object with caching
  575. func (s3a *S3ApiServer) getBucketMetadata(bucket string) (*BucketMetadata, error) {
  576. if s3a.bucketConfigCache != nil {
  577. // Check negative cache first
  578. if s3a.bucketConfigCache.IsNegativelyCached(bucket) {
  579. return nil, fmt.Errorf("bucket directory not found %s", bucket)
  580. }
  581. // Try to get from positive cache
  582. if config, found := s3a.bucketConfigCache.Get(bucket); found {
  583. // Extract metadata from cached config
  584. if metadata, err := s3a.extractMetadataFromConfig(config); err == nil {
  585. return metadata, nil
  586. }
  587. // If extraction fails, fall through to direct load
  588. }
  589. }
  590. // Load directly from filer
  591. return s3a.loadBucketMetadataFromFiler(bucket)
  592. }
  593. // extractMetadataFromConfig extracts BucketMetadata from cached BucketConfig
  594. func (s3a *S3ApiServer) extractMetadataFromConfig(config *BucketConfig) (*BucketMetadata, error) {
  595. if config == nil || config.Entry == nil {
  596. return NewBucketMetadata(), nil
  597. }
  598. // Parse metadata from entry content if available
  599. if len(config.Entry.Content) > 0 {
  600. var protoMetadata s3_pb.BucketMetadata
  601. if err := proto.Unmarshal(config.Entry.Content, &protoMetadata); err != nil {
  602. glog.Errorf("extractMetadataFromConfig: failed to unmarshal protobuf metadata for bucket %s: %v", config.Name, err)
  603. return nil, err
  604. }
  605. // Convert protobuf to structured metadata
  606. metadata := &BucketMetadata{
  607. Tags: protoMetadata.Tags,
  608. CORS: corsConfigFromProto(protoMetadata.Cors),
  609. Encryption: protoMetadata.Encryption,
  610. }
  611. return metadata, nil
  612. }
  613. // Fallback: create metadata from cached CORS config
  614. metadata := NewBucketMetadata()
  615. if config.CORS != nil {
  616. metadata.CORS = config.CORS
  617. }
  618. return metadata, nil
  619. }
  620. // loadBucketMetadataFromFiler loads bucket metadata directly from the filer
  621. func (s3a *S3ApiServer) loadBucketMetadataFromFiler(bucket string) (*BucketMetadata, error) {
  622. // Validate bucket name to prevent path traversal attacks
  623. if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") ||
  624. strings.Contains(bucket, "..") || strings.Contains(bucket, "~") {
  625. return nil, fmt.Errorf("invalid bucket name: %s", bucket)
  626. }
  627. // Clean the bucket name further to prevent any potential path traversal
  628. bucket = filepath.Clean(bucket)
  629. if bucket == "." || bucket == ".." {
  630. return nil, fmt.Errorf("invalid bucket name: %s", bucket)
  631. }
  632. // Get bucket directory entry to access its content
  633. entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
  634. if err != nil {
  635. // Check if this is a "not found" error
  636. if errors.Is(err, filer_pb.ErrNotFound) {
  637. // Set negative cache for non-existent bucket
  638. if s3a.bucketConfigCache != nil {
  639. s3a.bucketConfigCache.SetNegativeCache(bucket)
  640. }
  641. }
  642. return nil, fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err)
  643. }
  644. if entry == nil {
  645. // Set negative cache for non-existent bucket
  646. if s3a.bucketConfigCache != nil {
  647. s3a.bucketConfigCache.SetNegativeCache(bucket)
  648. }
  649. return nil, fmt.Errorf("bucket directory not found %s", bucket)
  650. }
  651. // If no content, return empty metadata
  652. if len(entry.Content) == 0 {
  653. return NewBucketMetadata(), nil
  654. }
  655. // Unmarshal metadata from protobuf
  656. var protoMetadata s3_pb.BucketMetadata
  657. if err := proto.Unmarshal(entry.Content, &protoMetadata); err != nil {
  658. glog.Errorf("getBucketMetadata: failed to unmarshal protobuf metadata for bucket %s: %v", bucket, err)
  659. return nil, fmt.Errorf("failed to unmarshal bucket metadata for %s: %w", bucket, err)
  660. }
  661. // Convert protobuf CORS to standard CORS
  662. corsConfig := corsConfigFromProto(protoMetadata.Cors)
  663. // Create and return structured metadata
  664. metadata := &BucketMetadata{
  665. Tags: protoMetadata.Tags,
  666. CORS: corsConfig,
  667. Encryption: protoMetadata.Encryption,
  668. }
  669. return metadata, nil
  670. }
  671. // setBucketMetadata stores bucket metadata from a structured object
  672. func (s3a *S3ApiServer) setBucketMetadata(bucket string, metadata *BucketMetadata) error {
  673. // Validate bucket name to prevent path traversal attacks
  674. if bucket == "" || strings.Contains(bucket, "/") || strings.Contains(bucket, "\\") ||
  675. strings.Contains(bucket, "..") || strings.Contains(bucket, "~") {
  676. return fmt.Errorf("invalid bucket name: %s", bucket)
  677. }
  678. // Clean the bucket name further to prevent any potential path traversal
  679. bucket = filepath.Clean(bucket)
  680. if bucket == "." || bucket == ".." {
  681. return fmt.Errorf("invalid bucket name: %s", bucket)
  682. }
  683. // Default to empty metadata if nil
  684. if metadata == nil {
  685. metadata = NewBucketMetadata()
  686. }
  687. // Create protobuf metadata
  688. protoMetadata := &s3_pb.BucketMetadata{
  689. Tags: metadata.Tags,
  690. Cors: corsConfigToProto(metadata.CORS),
  691. Encryption: metadata.Encryption,
  692. }
  693. // Marshal metadata to protobuf
  694. metadataBytes, err := proto.Marshal(protoMetadata)
  695. if err != nil {
  696. return fmt.Errorf("failed to marshal bucket metadata to protobuf: %w", err)
  697. }
  698. // Update the bucket entry with new content
  699. err = s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  700. // Get current bucket entry
  701. entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket)
  702. if err != nil {
  703. return fmt.Errorf("error retrieving bucket directory %s: %w", bucket, err)
  704. }
  705. if entry == nil {
  706. return fmt.Errorf("bucket directory not found %s", bucket)
  707. }
  708. // Update content with metadata
  709. entry.Content = metadataBytes
  710. request := &filer_pb.UpdateEntryRequest{
  711. Directory: s3a.option.BucketsPath,
  712. Entry: entry,
  713. }
  714. _, err = client.UpdateEntry(context.Background(), request)
  715. return err
  716. })
  717. // Invalidate cache after successful update
  718. if err == nil && s3a.bucketConfigCache != nil {
  719. s3a.bucketConfigCache.Remove(bucket)
  720. s3a.bucketConfigCache.RemoveNegativeCache(bucket) // Remove from negative cache too
  721. }
  722. return err
  723. }
  724. // New structured API functions using BucketMetadata
  725. // GetBucketMetadata retrieves complete bucket metadata as a structured object
  726. func (s3a *S3ApiServer) GetBucketMetadata(bucket string) (*BucketMetadata, error) {
  727. return s3a.getBucketMetadata(bucket)
  728. }
  729. // SetBucketMetadata stores complete bucket metadata from a structured object
  730. func (s3a *S3ApiServer) SetBucketMetadata(bucket string, metadata *BucketMetadata) error {
  731. return s3a.setBucketMetadata(bucket, metadata)
  732. }
  733. // UpdateBucketMetadata updates specific parts of bucket metadata while preserving others
  734. //
  735. // DISTRIBUTED SYSTEM DESIGN NOTE:
  736. // This function implements a read-modify-write pattern with "last write wins" semantics.
  737. // In the rare case of concurrent updates to different parts of bucket metadata
  738. // (e.g., simultaneous tag and CORS updates), the last write may overwrite previous changes.
  739. //
  740. // This is an acceptable trade-off because:
  741. // 1. Bucket metadata updates are infrequent in typical S3 usage
  742. // 2. Traditional locking doesn't work in distributed systems across multiple nodes
  743. // 3. The complexity of distributed consensus (e.g., Raft) for metadata updates would
  744. // be disproportionate to the low frequency of bucket configuration changes
  745. // 4. Most bucket operations (tags, CORS, encryption) are typically configured once
  746. // during setup rather than being frequently modified
  747. //
  748. // If stronger consistency is required, consider implementing optimistic concurrency
  749. // control with version numbers or ETags at the storage layer.
  750. func (s3a *S3ApiServer) UpdateBucketMetadata(bucket string, update func(*BucketMetadata) error) error {
  751. // Get current metadata
  752. metadata, err := s3a.GetBucketMetadata(bucket)
  753. if err != nil {
  754. return fmt.Errorf("failed to get current bucket metadata: %w", err)
  755. }
  756. // Apply update function
  757. if err := update(metadata); err != nil {
  758. return fmt.Errorf("failed to apply metadata update: %w", err)
  759. }
  760. // Store updated metadata (last write wins)
  761. return s3a.SetBucketMetadata(bucket, metadata)
  762. }
  763. // Helper functions for specific metadata operations using structured API
  764. // UpdateBucketTags sets bucket tags using the structured API
  765. func (s3a *S3ApiServer) UpdateBucketTags(bucket string, tags map[string]string) error {
  766. return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
  767. metadata.Tags = tags
  768. return nil
  769. })
  770. }
  771. // UpdateBucketCORS sets bucket CORS configuration using the structured API
  772. func (s3a *S3ApiServer) UpdateBucketCORS(bucket string, corsConfig *cors.CORSConfiguration) error {
  773. return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
  774. metadata.CORS = corsConfig
  775. return nil
  776. })
  777. }
  778. // UpdateBucketEncryption sets bucket encryption configuration using the structured API
  779. func (s3a *S3ApiServer) UpdateBucketEncryption(bucket string, encryptionConfig *s3_pb.EncryptionConfiguration) error {
  780. return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
  781. metadata.Encryption = encryptionConfig
  782. return nil
  783. })
  784. }
  785. // ClearBucketTags removes all bucket tags using the structured API
  786. func (s3a *S3ApiServer) ClearBucketTags(bucket string) error {
  787. return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
  788. metadata.Tags = make(map[string]string)
  789. return nil
  790. })
  791. }
  792. // ClearBucketCORS removes bucket CORS configuration using the structured API
  793. func (s3a *S3ApiServer) ClearBucketCORS(bucket string) error {
  794. return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
  795. metadata.CORS = nil
  796. return nil
  797. })
  798. }
  799. // ClearBucketEncryption removes bucket encryption configuration using the structured API
  800. func (s3a *S3ApiServer) ClearBucketEncryption(bucket string) error {
  801. return s3a.UpdateBucketMetadata(bucket, func(metadata *BucketMetadata) error {
  802. metadata.Encryption = nil
  803. return nil
  804. })
  805. }