s3api_object_handlers_copy_unified.go

package s3api

import (
    "context"
    "fmt"
    "net/http"

    "github.com/seaweedfs/seaweedfs/weed/glog"
    "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
    "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)

// executeUnifiedCopyStrategy executes the appropriate copy strategy based on the detected encryption state.
// It returns the chunks and the destination metadata that should be applied to the destination entry.
func (s3a *S3ApiServer) executeUnifiedCopyStrategy(entry *filer_pb.Entry, r *http.Request, dstBucket, srcObject, dstObject string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
    // Detect the encryption state (using entry-aware detection for multipart objects)
    srcPath := fmt.Sprintf("/%s/%s", r.Header.Get("X-Amz-Copy-Source-Bucket"), srcObject)
    dstPath := fmt.Sprintf("/%s/%s", dstBucket, dstObject)
    state := DetectEncryptionStateWithEntry(entry, r, srcPath, dstPath)

    // Apply the bucket's default encryption if no explicit encryption was specified
    if !state.IsTargetEncrypted() {
        bucketMetadata, err := s3a.getBucketMetadata(dstBucket)
        if err == nil && bucketMetadata != nil && bucketMetadata.Encryption != nil {
            switch bucketMetadata.Encryption.SseAlgorithm {
            case "aws:kms":
                state.DstSSEKMS = true
            case "AES256":
                state.DstSSES3 = true
            }
        }
    }

    // Determine the copy strategy
    strategy, err := DetermineUnifiedCopyStrategy(state, entry.Extended, r)
    if err != nil {
        return nil, nil, err
    }
    glog.V(2).Infof("Unified copy strategy for %s → %s: %v", srcPath, dstPath, strategy)

    // Calculate optimized sizes for the chosen strategy
    sizeCalc := CalculateOptimizedSizes(entry, r, strategy)
    glog.V(2).Infof("Size calculation: src=%d, target=%d, actual=%d, overhead=%d, preallocate=%v",
        sizeCalc.SourceSize, sizeCalc.TargetSize, sizeCalc.ActualContentSize,
        sizeCalc.EncryptionOverhead, sizeCalc.CanPreallocate)

    // Execute the strategy
    switch strategy {
    case CopyStrategyDirect:
        chunks, err := s3a.copyChunks(entry, dstPath)
        return chunks, nil, err
    case CopyStrategyKeyRotation:
        return s3a.executeKeyRotation(entry, r, state)
    case CopyStrategyEncrypt:
        return s3a.executeEncryptCopy(entry, r, state, dstBucket, dstPath)
    case CopyStrategyDecrypt:
        return s3a.executeDecryptCopy(entry, r, state, dstPath)
    case CopyStrategyReencrypt:
        return s3a.executeReencryptCopy(entry, r, state, dstBucket, dstPath)
    default:
        return nil, nil, fmt.Errorf("unknown unified copy strategy: %v", strategy)
    }
}
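
// Hypothetical caller sketch (not part of this file): it assumes the CopyObject handler has
// already loaded the source entry (srcEntry), is assembling the destination entry (dstEntry),
// and that the usual s3err.WriteErrorResponse helper is in scope. It only illustrates how the
// returned chunks and destination metadata would typically be applied.
//
//    chunks, dstMetadata, err := s3a.executeUnifiedCopyStrategy(srcEntry, r, dstBucket, srcObject, dstObject)
//    if err != nil {
//        s3err.WriteErrorResponse(w, r, s3a.mapCopyErrorToS3Error(err))
//        return
//    }
//    dstEntry.Chunks = chunks
//    if dstEntry.Extended == nil {
//        dstEntry.Extended = make(map[string][]byte)
//    }
//    for k, v := range dstMetadata {
//        dstEntry.Extended[k] = v
//    }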

// mapCopyErrorToS3Error maps various copy errors to appropriate S3 error codes
func (s3a *S3ApiServer) mapCopyErrorToS3Error(err error) s3err.ErrorCode {
    if err == nil {
        return s3err.ErrNone
    }

    // Check for KMS errors first
    if kmsErr := MapKMSErrorToS3Error(err); kmsErr != s3err.ErrInvalidRequest {
        return kmsErr
    }

    // Check for SSE-C errors
    if ssecErr := MapSSECErrorToS3Error(err); ssecErr != s3err.ErrInvalidRequest {
        return ssecErr
    }

    // Default to internal error for unknown errors
    return s3err.ErrInternalError
}

// executeKeyRotation handles key rotation for same-object copies
func (s3a *S3ApiServer) executeKeyRotation(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) ([]*filer_pb.FileChunk, map[string][]byte, error) {
    // For key rotation we only need to update metadata, not re-copy chunks.
    // This is a significant optimization for same-object key changes.
    if state.SrcSSEC && state.DstSSEC {
        // SSE-C key rotation - the new key/IV must be handled, so use the reencrypt logic
        return s3a.executeReencryptCopy(entry, r, state, "", "")
    }

    if state.SrcSSEKMS && state.DstSSEKMS {
        // SSE-KMS key rotation - return the existing chunks; metadata will be updated by the caller
        return entry.GetChunks(), nil, nil
    }

    // Fall back to reencrypt if a metadata-only rotation is not possible
    return s3a.executeReencryptCopy(entry, r, state, "", "")
}

// executeEncryptCopy handles plain → encrypted copies
func (s3a *S3ApiServer) executeEncryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
    if state.DstSSEC {
        // Use existing SSE-C copy logic
        return s3a.copyChunksWithSSEC(entry, r)
    }

    if state.DstSSEKMS {
        // Use existing SSE-KMS copy logic - metadata is now generated internally
        chunks, dstMetadata, err := s3a.copyChunksWithSSEKMS(entry, r, dstBucket)
        return chunks, dstMetadata, err
    }

    if state.DstSSES3 {
        // Use streaming copy for SSE-S3 encryption
        chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
        return chunks, nil, err
    }

    return nil, nil, fmt.Errorf("unknown target encryption type")
}
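
// Illustrative request shapes that would reach executeEncryptCopy (the header names follow the
// standard AWS SSE conventions; the values below are placeholders, not working keys):
//
//    // plain → SSE-C: only destination customer-key headers are present
//    r.Header.Set("x-amz-server-side-encryption-customer-algorithm", "AES256")
//    r.Header.Set("x-amz-server-side-encryption-customer-key", "<base64 key>")
//    r.Header.Set("x-amz-server-side-encryption-customer-key-MD5", "<base64 key MD5>")
//
//    // plain → SSE-KMS: only destination KMS headers are present
//    r.Header.Set("x-amz-server-side-encryption", "aws:kms")
//    r.Header.Set("x-amz-server-side-encryption-aws-kms-key-id", "<kms key id>")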

// executeDecryptCopy handles encrypted → plain copies
func (s3a *S3ApiServer) executeDecryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
    // Use unified multipart-aware decrypt copy for all encryption types
    if state.SrcSSEC || state.SrcSSEKMS {
        glog.V(2).Infof("Encrypted→Plain copy: using unified multipart decrypt copy")
        return s3a.copyMultipartCrossEncryption(entry, r, state, "", dstPath)
    }

    if state.SrcSSES3 {
        // Use streaming copy for SSE-S3 decryption
        chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
        return chunks, nil, err
    }

    return nil, nil, fmt.Errorf("unknown source encryption type")
}

// executeReencryptCopy handles encrypted → encrypted copies with different keys/methods
func (s3a *S3ApiServer) executeReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
    // Check if we should use streaming copy for better performance
    if s3a.shouldUseStreamingCopy(entry, state) {
        chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
        return chunks, nil, err
    }

    // Fall back to the chunk-by-chunk approach for compatibility
    if state.SrcSSEC && state.DstSSEC {
        return s3a.copyChunksWithSSEC(entry, r)
    }

    if state.SrcSSEKMS && state.DstSSEKMS {
        // Use existing SSE-KMS copy logic - metadata is now generated internally
        chunks, dstMetadata, err := s3a.copyChunksWithSSEKMS(entry, r, dstBucket)
        return chunks, dstMetadata, err
    }

    if state.SrcSSEC && state.DstSSEKMS {
        // SSE-C → SSE-KMS: use the unified multipart-aware cross-encryption copy
        glog.V(2).Infof("SSE-C→SSE-KMS cross-encryption copy: using unified multipart copy")
        return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
    }

    if state.SrcSSEKMS && state.DstSSEC {
        // SSE-KMS → SSE-C: use the unified multipart-aware cross-encryption copy
        glog.V(2).Infof("SSE-KMS→SSE-C cross-encryption copy: using unified multipart copy")
        return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
    }

    // Handle SSE-S3 cross-encryption scenarios
    if state.SrcSSES3 || state.DstSSES3 {
        // Any scenario involving SSE-S3 uses streaming copy
        chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
        return chunks, nil, err
    }

    return nil, nil, fmt.Errorf("unsupported cross-encryption scenario")
}
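
// Illustrative cross-encryption request for the SSE-C → SSE-KMS branch above: the source key
// arrives via the x-amz-copy-source-* headers while the destination is described by the KMS
// headers (placeholder values, following the standard AWS header names):
//
//    r.Header.Set("x-amz-copy-source-server-side-encryption-customer-algorithm", "AES256")
//    r.Header.Set("x-amz-copy-source-server-side-encryption-customer-key", "<base64 source key>")
//    r.Header.Set("x-amz-copy-source-server-side-encryption-customer-key-MD5", "<base64 source key MD5>")
//    r.Header.Set("x-amz-server-side-encryption", "aws:kms")
//    r.Header.Set("x-amz-server-side-encryption-aws-kms-key-id", "<destination kms key id>")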

// shouldUseStreamingCopy determines whether streaming copy should be used
func (s3a *S3ApiServer) shouldUseStreamingCopy(entry *filer_pb.Entry, state *EncryptionState) bool {
    // Use streaming copy for large files or when it is otherwise beneficial
    fileSize := entry.Attributes.FileSize

    // Use streaming for files larger than 10 MB
    if fileSize > 10*1024*1024 {
        return true
    }

    // Check whether this is a multipart encrypted object
    isMultipartEncrypted := false
    if state.IsSourceEncrypted() {
        encryptedChunks := 0
        for _, chunk := range entry.GetChunks() {
            if chunk.GetSseType() != filer_pb.SSEType_NONE {
                encryptedChunks++
            }
        }
        isMultipartEncrypted = encryptedChunks > 1
    }

    // For multipart encrypted objects, avoid streaming copy so the per-chunk metadata approach is used
    if isMultipartEncrypted {
        glog.V(3).Infof("Multipart encrypted object detected, using chunk-by-chunk approach")
        return false
    }

    // Use streaming for cross-encryption scenarios (single-part objects only)
    if state.IsSourceEncrypted() && state.IsTargetEncrypted() {
        srcType := s3a.getEncryptionTypeString(state.SrcSSEC, state.SrcSSEKMS, state.SrcSSES3)
        dstType := s3a.getEncryptionTypeString(state.DstSSEC, state.DstSSEKMS, state.DstSSES3)
        if srcType != dstType {
            return true
        }
    }

    // Use streaming for compressed files
    if isCompressedEntry(entry) {
        return true
    }

    // Always use streaming for SSE-S3 scenarios
    if state.SrcSSES3 || state.DstSSES3 {
        return true
    }

    return false
}
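
// Decision sketch for shouldUseStreamingCopy (assumed example values and variable names,
// illustration only):
//
//    // A 15 MiB single-part object: the size threshold alone selects streaming copy.
//    entry.Attributes.FileSize = 15 * 1024 * 1024
//    _ = s3a.shouldUseStreamingCopy(entry, state) // true
//
//    // A small object with more than one SSE-typed chunk (multipart upload): the per-chunk
//    // metadata path wins, so streaming copy is skipped even for cross-encryption copies.
//    _ = s3a.shouldUseStreamingCopy(smallMultipartEntry, crossEncryptionState) // false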

// executeStreamingReencryptCopy performs streaming re-encryption copy
func (s3a *S3ApiServer) executeStreamingReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, error) {
    // Create streaming copy manager
    streamingManager := NewStreamingCopyManager(s3a)

    // Execute streaming copy
    return streamingManager.ExecuteStreamingCopy(context.Background(), entry, r, dstPath, state)
}