auth_credentials_subscribe.go 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. package s3api
  2. import (
  3. "errors"
  4. "time"
  5. "github.com/seaweedfs/seaweedfs/weed/filer"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/pb"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  10. "github.com/seaweedfs/seaweedfs/weed/util"
  11. )
  12. func (s3a *S3ApiServer) subscribeMetaEvents(clientName string, lastTsNs int64, prefix string, directoriesToWatch []string) {
  13. processEventFn := func(resp *filer_pb.SubscribeMetadataResponse) error {
  14. message := resp.EventNotification
  15. if message.NewEntry == nil {
  16. return nil
  17. }
  18. dir := resp.Directory
  19. if message.NewParentPath != "" {
  20. dir = message.NewParentPath
  21. }
  22. fileName := message.NewEntry.Name
  23. content := message.NewEntry.Content
  24. _ = s3a.onIamConfigUpdate(dir, fileName, content)
  25. _ = s3a.onCircuitBreakerConfigUpdate(dir, fileName, content)
  26. _ = s3a.onBucketMetadataChange(dir, message.OldEntry, message.NewEntry)
  27. return nil
  28. }
  29. metadataFollowOption := &pb.MetadataFollowOption{
  30. ClientName: clientName,
  31. ClientId: s3a.randomClientId,
  32. ClientEpoch: 1,
  33. SelfSignature: 0,
  34. PathPrefix: prefix,
  35. AdditionalPathPrefixes: nil,
  36. DirectoriesToWatch: directoriesToWatch,
  37. StartTsNs: lastTsNs,
  38. StopTsNs: 0,
  39. EventErrorType: pb.FatalOnError,
  40. }
  41. util.RetryUntil("followIamChanges", func() error {
  42. metadataFollowOption.ClientEpoch++
  43. return pb.WithFilerClientFollowMetadata(s3a, metadataFollowOption, processEventFn)
  44. }, func(err error) bool {
  45. glog.V(0).Infof("iam follow metadata changes: %v", err)
  46. return true
  47. })
  48. }
  49. // reload iam config
  50. func (s3a *S3ApiServer) onIamConfigUpdate(dir, filename string, content []byte) error {
  51. if dir == filer.IamConfigDirectory && filename == filer.IamIdentityFile {
  52. if err := s3a.iam.LoadS3ApiConfigurationFromBytes(content); err != nil {
  53. return err
  54. }
  55. glog.V(0).Infof("updated %s/%s", dir, filename)
  56. }
  57. return nil
  58. }
  59. // reload circuit breaker config
  60. func (s3a *S3ApiServer) onCircuitBreakerConfigUpdate(dir, filename string, content []byte) error {
  61. if dir == s3_constants.CircuitBreakerConfigDir && filename == s3_constants.CircuitBreakerConfigFile {
  62. if err := s3a.cb.LoadS3ApiConfigurationFromBytes(content); err != nil {
  63. return err
  64. }
  65. glog.V(0).Infof("updated %s/%s", dir, filename)
  66. }
  67. return nil
  68. }
  69. // reload bucket metadata
  70. func (s3a *S3ApiServer) onBucketMetadataChange(dir string, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) error {
  71. if dir == s3a.option.BucketsPath {
  72. if newEntry != nil {
  73. // Update bucket registry (existing functionality)
  74. s3a.bucketRegistry.LoadBucketMetadata(newEntry)
  75. glog.V(0).Infof("updated bucketMetadata %s/%s", dir, newEntry.Name)
  76. // Update bucket configuration cache with new entry
  77. s3a.updateBucketConfigCacheFromEntry(newEntry)
  78. } else if oldEntry != nil {
  79. // Remove from bucket registry (existing functionality)
  80. s3a.bucketRegistry.RemoveBucketMetadata(oldEntry)
  81. glog.V(0).Infof("remove bucketMetadata %s/%s", dir, oldEntry.Name)
  82. // Remove from bucket configuration cache
  83. s3a.invalidateBucketConfigCache(oldEntry.Name)
  84. }
  85. }
  86. return nil
  87. }
  88. // updateBucketConfigCacheFromEntry updates the bucket config cache when a bucket entry changes
  89. func (s3a *S3ApiServer) updateBucketConfigCacheFromEntry(entry *filer_pb.Entry) {
  90. if s3a.bucketConfigCache == nil {
  91. return
  92. }
  93. bucket := entry.Name
  94. // Create new bucket config from the entry
  95. config := &BucketConfig{
  96. Name: bucket,
  97. Entry: entry,
  98. IsPublicRead: false, // Explicitly default to false for private buckets
  99. }
  100. // Extract configuration from extended attributes
  101. if entry.Extended != nil {
  102. if versioning, exists := entry.Extended[s3_constants.ExtVersioningKey]; exists {
  103. config.Versioning = string(versioning)
  104. }
  105. if ownership, exists := entry.Extended[s3_constants.ExtOwnershipKey]; exists {
  106. config.Ownership = string(ownership)
  107. }
  108. if acl, exists := entry.Extended[s3_constants.ExtAmzAclKey]; exists {
  109. config.ACL = acl
  110. // Parse ACL and cache public-read status
  111. config.IsPublicRead = parseAndCachePublicReadStatus(acl)
  112. } else {
  113. // No ACL means private bucket
  114. config.IsPublicRead = false
  115. }
  116. if owner, exists := entry.Extended[s3_constants.ExtAmzOwnerKey]; exists {
  117. config.Owner = string(owner)
  118. }
  119. // Parse Object Lock configuration if present
  120. if objectLockConfig, found := LoadObjectLockConfigurationFromExtended(entry); found {
  121. config.ObjectLockConfig = objectLockConfig
  122. glog.V(2).Infof("updateBucketConfigCacheFromEntry: cached Object Lock configuration for bucket %s", bucket)
  123. }
  124. }
  125. // Load CORS configuration from bucket directory content
  126. if corsConfig, err := s3a.loadCORSFromBucketContent(bucket); err != nil {
  127. if !errors.Is(err, filer_pb.ErrNotFound) {
  128. glog.Errorf("updateBucketConfigCacheFromEntry: failed to load CORS configuration for bucket %s: %v", bucket, err)
  129. }
  130. } else {
  131. config.CORS = corsConfig
  132. glog.V(2).Infof("updateBucketConfigCacheFromEntry: loaded CORS config for bucket %s", bucket)
  133. }
  134. // Update timestamp
  135. config.LastModified = time.Now()
  136. // Update cache
  137. s3a.bucketConfigCache.Set(bucket, config)
  138. }
  139. // invalidateBucketConfigCache removes a bucket from the configuration cache
  140. func (s3a *S3ApiServer) invalidateBucketConfigCache(bucket string) {
  141. if s3a.bucketConfigCache == nil {
  142. return
  143. }
  144. s3a.bucketConfigCache.Remove(bucket)
  145. s3a.bucketConfigCache.RemoveNegativeCache(bucket) // Also remove from negative cache
  146. glog.V(2).Infof("invalidateBucketConfigCache: removed bucket %s from cache", bucket)
  147. }