package s3api

import (
	"context"
	"fmt"
	"net/http"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
)

// executeUnifiedCopyStrategy executes the appropriate copy strategy based on the
// detected encryption state. It returns the destination chunks and any destination
// metadata that should be applied to the destination entry.
func (s3a *S3ApiServer) executeUnifiedCopyStrategy(entry *filer_pb.Entry, r *http.Request, dstBucket, srcObject, dstObject string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
	// Detect the encryption state (entry-aware so multipart objects are handled correctly)
	srcPath := fmt.Sprintf("/%s/%s", r.Header.Get("X-Amz-Copy-Source-Bucket"), srcObject)
	dstPath := fmt.Sprintf("/%s/%s", dstBucket, dstObject)
	state := DetectEncryptionStateWithEntry(entry, r, srcPath, dstPath)

	// Apply the bucket's default encryption if the request specifies no explicit encryption
	if !state.IsTargetEncrypted() {
		bucketMetadata, err := s3a.getBucketMetadata(dstBucket)
		if err == nil && bucketMetadata != nil && bucketMetadata.Encryption != nil {
			switch bucketMetadata.Encryption.SseAlgorithm {
			case "aws:kms":
				state.DstSSEKMS = true
			case "AES256":
				state.DstSSES3 = true
			}
		}
	}

	// Determine the copy strategy
	strategy, err := DetermineUnifiedCopyStrategy(state, entry.Extended, r)
	if err != nil {
		return nil, nil, err
	}
	glog.V(2).Infof("Unified copy strategy for %s → %s: %v", srcPath, dstPath, strategy)

	// Calculate optimized sizes for the strategy
	sizeCalc := CalculateOptimizedSizes(entry, r, strategy)
	glog.V(2).Infof("Size calculation: src=%d, target=%d, actual=%d, overhead=%d, preallocate=%v",
		sizeCalc.SourceSize, sizeCalc.TargetSize, sizeCalc.ActualContentSize,
		sizeCalc.EncryptionOverhead, sizeCalc.CanPreallocate)

	// Execute the strategy
	switch strategy {
	case CopyStrategyDirect:
		chunks, err := s3a.copyChunks(entry, dstPath)
		return chunks, nil, err
	case CopyStrategyKeyRotation:
		return s3a.executeKeyRotation(entry, r, state)
	case CopyStrategyEncrypt:
		return s3a.executeEncryptCopy(entry, r, state, dstBucket, dstPath)
	case CopyStrategyDecrypt:
		return s3a.executeDecryptCopy(entry, r, state, dstPath)
	case CopyStrategyReencrypt:
		return s3a.executeReencryptCopy(entry, r, state, dstBucket, dstPath)
	default:
		return nil, nil, fmt.Errorf("unknown unified copy strategy: %v", strategy)
	}
}

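// For reference, the bucket-default branch above corresponds to a bucket encryption
// configuration along the lines of the S3 PutBucketEncryption document below
// (SSEAlgorithm AES256 maps to SSE-S3, aws:kms maps to SSE-KMS). This is an
// informal illustration of the external API shape; how SeaweedFS stores the
// configuration internally is defined by getBucketMetadata elsewhere in the package.
//
//	<ServerSideEncryptionConfiguration>
//	  <Rule>
//	    <ApplyServerSideEncryptionByDefault>
//	      <SSEAlgorithm>aws:kms</SSEAlgorithm>
//	      <KMSMasterKeyID>arn:aws:kms:...:key/...</KMSMasterKeyID>
//	    </ApplyServerSideEncryptionByDefault>
//	  </Rule>
//	</ServerSideEncryptionConfiguration>
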
// mapCopyErrorToS3Error maps copy errors to appropriate S3 error codes. A result of
// s3err.ErrInvalidRequest from the per-subsystem mappers is treated as "no specific
// mapping found" and falls through to the next check.
func (s3a *S3ApiServer) mapCopyErrorToS3Error(err error) s3err.ErrorCode {
	if err == nil {
		return s3err.ErrNone
	}

	// Check for KMS errors first
	if kmsErr := MapKMSErrorToS3Error(err); kmsErr != s3err.ErrInvalidRequest {
		return kmsErr
	}

	// Check for SSE-C errors
	if ssecErr := MapSSECErrorToS3Error(err); ssecErr != s3err.ErrInvalidRequest {
		return ssecErr
	}

	// Default to an internal error for anything unrecognized
	return s3err.ErrInternalError
}

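// Illustrative caller sketch (hypothetical, not the actual handler in this package):
// it shows how the strategy executor and the error mapper above are intended to be
// combined from an object-copy handler. The names w, srcEntry and the surrounding
// handler are assumptions made for the example; the real wiring may differ.
//
//	chunks, dstMetadata, err := s3a.executeUnifiedCopyStrategy(srcEntry, r, dstBucket, srcObject, dstObject)
//	if err != nil {
//		s3err.WriteErrorResponse(w, r, s3a.mapCopyErrorToS3Error(err))
//		return
//	}
//	// apply chunks and dstMetadata to the destination entry before persisting it
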
// executeKeyRotation handles key rotation for same-object copies
func (s3a *S3ApiServer) executeKeyRotation(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) ([]*filer_pb.FileChunk, map[string][]byte, error) {
	// For key rotation we only need to update metadata, not re-copy chunks.
	// This is a significant optimization for same-object key changes.
	if state.SrcSSEC && state.DstSSEC {
		// SSE-C key rotation has to handle the new key/IV, so reuse the reencrypt logic
		return s3a.executeReencryptCopy(entry, r, state, "", "")
	}

	if state.SrcSSEKMS && state.DstSSEKMS {
		// SSE-KMS key rotation: return the existing chunks; the caller updates the metadata
		return entry.GetChunks(), nil, nil
	}

	// Fall back to reencrypt if a metadata-only rotation is not possible
	return s3a.executeReencryptCopy(entry, r, state, "", "")
}

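// A typical trigger for this path (illustrative, not exhaustive) is a CopyObject
// request whose source and destination are the same object and that only swaps the
// encryption key, for example an SSE-C key rotation driven by the standard S3 headers:
//
//	x-amz-copy-source-server-side-encryption-customer-key: <old key>
//	x-amz-server-side-encryption-customer-key:             <new key>
//
// For SSE-KMS the data chunks can stay untouched and only the stored KMS metadata
// changes, which is what the metadata-only branch above relies on.
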
// executeEncryptCopy handles plain → encrypted copies
func (s3a *S3ApiServer) executeEncryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
	if state.DstSSEC {
		// Use the existing SSE-C copy logic
		return s3a.copyChunksWithSSEC(entry, r)
	}

	if state.DstSSEKMS {
		// Use the existing SSE-KMS copy logic; the destination metadata is generated internally
		chunks, dstMetadata, err := s3a.copyChunksWithSSEKMS(entry, r, dstBucket)
		return chunks, dstMetadata, err
	}

	if state.DstSSES3 {
		// Use streaming copy for SSE-S3 encryption
		chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
		return chunks, nil, err
	}

	return nil, nil, fmt.Errorf("unknown target encryption type")
}

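// For reference, the destination flags checked above are normally derived from the
// standard S3 request headers; the exact detection lives in the encryption-state
// helpers, so treat this as an informal summary rather than the authoritative rule:
//
//	SSE-C:   x-amz-server-side-encryption-customer-algorithm / -customer-key / -customer-key-MD5
//	SSE-KMS: x-amz-server-side-encryption: aws:kms (plus x-amz-server-side-encryption-aws-kms-key-id)
//	SSE-S3:  x-amz-server-side-encryption: AES256
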
// executeDecryptCopy handles encrypted → plain copies
func (s3a *S3ApiServer) executeDecryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
	// Use the unified multipart-aware decrypt copy for SSE-C and SSE-KMS sources
	if state.SrcSSEC || state.SrcSSEKMS {
		glog.V(2).Infof("Encrypted→Plain copy: using unified multipart decrypt copy")
		return s3a.copyMultipartCrossEncryption(entry, r, state, "", dstPath)
	}

	if state.SrcSSES3 {
		// Use streaming copy for SSE-S3 decryption
		chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
		return chunks, nil, err
	}

	return nil, nil, fmt.Errorf("unknown source encryption type")
}

// executeReencryptCopy handles encrypted → encrypted copies with different keys or methods
func (s3a *S3ApiServer) executeReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstBucket, dstPath string) ([]*filer_pb.FileChunk, map[string][]byte, error) {
	// Check whether streaming copy should be used for better performance
	if s3a.shouldUseStreamingCopy(entry, state) {
		chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
		return chunks, nil, err
	}

	// Fall back to the chunk-by-chunk approach for compatibility
	if state.SrcSSEC && state.DstSSEC {
		return s3a.copyChunksWithSSEC(entry, r)
	}

	if state.SrcSSEKMS && state.DstSSEKMS {
		// Use the existing SSE-KMS copy logic; the destination metadata is generated internally
		chunks, dstMetadata, err := s3a.copyChunksWithSSEKMS(entry, r, dstBucket)
		return chunks, dstMetadata, err
	}

	if state.SrcSSEC && state.DstSSEKMS {
		// SSE-C → SSE-KMS: use the unified multipart-aware cross-encryption copy
		glog.V(2).Infof("SSE-C→SSE-KMS cross-encryption copy: using unified multipart copy")
		return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
	}

	if state.SrcSSEKMS && state.DstSSEC {
		// SSE-KMS → SSE-C: use the unified multipart-aware cross-encryption copy
		glog.V(2).Infof("SSE-KMS→SSE-C cross-encryption copy: using unified multipart copy")
		return s3a.copyMultipartCrossEncryption(entry, r, state, dstBucket, dstPath)
	}

	// Handle SSE-S3 cross-encryption scenarios
	if state.SrcSSES3 || state.DstSSES3 {
		// Any scenario involving SSE-S3 uses streaming copy
		chunks, err := s3a.executeStreamingReencryptCopy(entry, r, state, dstPath)
		return chunks, nil, err
	}

	return nil, nil, fmt.Errorf("unsupported cross-encryption scenario")
}

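// Summary of the chunk-by-chunk fallback above, reached when shouldUseStreamingCopy
// declines the streaming fast path (source → destination):
//
//	SSE-C   → SSE-C    copyChunksWithSSEC (same-type re-encryption)
//	SSE-KMS → SSE-KMS  copyChunksWithSSEKMS (same-type re-encryption)
//	SSE-C   → SSE-KMS  copyMultipartCrossEncryption
//	SSE-KMS → SSE-C    copyMultipartCrossEncryption
//	any SSE-S3 case    executeStreamingReencryptCopy
//
// Everything else is rejected as an unsupported cross-encryption scenario.
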
// shouldUseStreamingCopy determines whether streaming copy should be used
func (s3a *S3ApiServer) shouldUseStreamingCopy(entry *filer_pb.Entry, state *EncryptionState) bool {
	// Use streaming copy for large files or when otherwise beneficial
	fileSize := entry.Attributes.FileSize

	// Use streaming for files larger than 10MB
	if fileSize > 10*1024*1024 {
		return true
	}

	// Check whether this is a multipart encrypted object
	isMultipartEncrypted := false
	if state.IsSourceEncrypted() {
		encryptedChunks := 0
		for _, chunk := range entry.GetChunks() {
			if chunk.GetSseType() != filer_pb.SSEType_NONE {
				encryptedChunks++
			}
		}
		isMultipartEncrypted = encryptedChunks > 1
	}

	// For multipart encrypted objects, avoid streaming copy so the per-chunk metadata approach is used
	if isMultipartEncrypted {
		glog.V(3).Infof("Multipart encrypted object detected, using chunk-by-chunk approach")
		return false
	}

	// Use streaming for cross-encryption scenarios (single-part objects only)
	if state.IsSourceEncrypted() && state.IsTargetEncrypted() {
		srcType := s3a.getEncryptionTypeString(state.SrcSSEC, state.SrcSSEKMS, state.SrcSSES3)
		dstType := s3a.getEncryptionTypeString(state.DstSSEC, state.DstSSEKMS, state.DstSSES3)
		if srcType != dstType {
			return true
		}
	}

	// Use streaming for compressed files
	if isCompressedEntry(entry) {
		return true
	}

	// Always use streaming for SSE-S3 scenarios
	if state.SrcSSES3 || state.DstSSES3 {
		return true
	}

	return false
}

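// The checks above are ordered and the first match wins: size threshold, then the
// multipart-encrypted opt-out, then cross-encryption, compression, and SSE-S3. In
// particular, a multipart encrypted object at or under 10MB is forced onto the
// chunk-by-chunk path even when a later rule (such as cross-encryption or SSE-S3)
// would otherwise have chosen streaming.
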
// executeStreamingReencryptCopy performs a streaming re-encryption copy
func (s3a *S3ApiServer) executeStreamingReencryptCopy(entry *filer_pb.Entry, r *http.Request, state *EncryptionState, dstPath string) ([]*filer_pb.FileChunk, error) {
	// Create the streaming copy manager
	streamingManager := NewStreamingCopyManager(s3a)

	// Execute the streaming copy
	return streamingManager.ExecuteStreamingCopy(context.Background(), entry, r, dstPath, state)
}