package s3api

import (
	"context"
	"crypto/md5"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"hash"
	"io"
	"net/http"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
// StreamingCopySpec defines the specification for streaming copy operations
type StreamingCopySpec struct {
	SourceReader    io.Reader
	TargetSize      int64
	EncryptionSpec  *EncryptionSpec
	CompressionSpec *CompressionSpec
	HashCalculation bool
	BufferSize      int
}

// EncryptionSpec defines encryption parameters for streaming
type EncryptionSpec struct {
	NeedsDecryption bool
	NeedsEncryption bool
	SourceKey       interface{} // SSECustomerKey or SSEKMSKey
	DestinationKey  interface{} // SSECustomerKey or SSEKMSKey
	SourceType      EncryptionType
	DestinationType EncryptionType
	SourceMetadata  map[string][]byte // Source metadata for IV extraction
	DestinationIV   []byte            // Generated IV for destination
}

// CompressionSpec defines compression parameters for streaming
type CompressionSpec struct {
	IsCompressed       bool
	CompressionType    string
	NeedsDecompression bool
	NeedsCompression   bool
}
// StreamingCopyManager handles streaming copy operations
type StreamingCopyManager struct {
	s3a        *S3ApiServer
	bufferSize int
}

// NewStreamingCopyManager creates a new streaming copy manager
func NewStreamingCopyManager(s3a *S3ApiServer) *StreamingCopyManager {
	return &StreamingCopyManager{
		s3a:        s3a,
		bufferSize: 64 * 1024, // 64KB default buffer
	}
}
// ExecuteStreamingCopy performs a streaming copy operation
func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, error) {
	// Create streaming copy specification
	spec, err := scm.createStreamingSpec(entry, r, state)
	if err != nil {
		return nil, fmt.Errorf("create streaming spec: %w", err)
	}

	// Create source reader from entry
	sourceReader, err := scm.createSourceReader(entry)
	if err != nil {
		return nil, fmt.Errorf("create source reader: %w", err)
	}
	defer sourceReader.Close()
	spec.SourceReader = sourceReader

	// Create processing pipeline
	processedReader, err := scm.createProcessingPipeline(spec)
	if err != nil {
		return nil, fmt.Errorf("create processing pipeline: %w", err)
	}

	// Stream to destination
	return scm.streamToDestination(ctx, processedReader, spec, dstPath)
}
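
// exampleStreamingCopy is a minimal, hypothetical usage sketch (it is not
// wired into any handler): it assumes the caller has already resolved the
// source entry and detected the encryption state of both sides of the copy.
func exampleStreamingCopy(ctx context.Context, s3a *S3ApiServer, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, error) {
	scm := NewStreamingCopyManager(s3a)
	chunks, err := scm.ExecuteStreamingCopy(ctx, entry, r, dstPath, state)
	if err != nil {
		return nil, fmt.Errorf("streaming copy: %w", err)
	}
	// The returned chunks would then be attached to the destination entry.
	return chunks, nil
}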
// createStreamingSpec creates a streaming specification based on copy parameters
func (scm *StreamingCopyManager) createStreamingSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*StreamingCopySpec, error) {
	spec := &StreamingCopySpec{
		BufferSize:      scm.bufferSize,
		HashCalculation: true,
	}

	// Calculate target size
	sizeCalc := NewCopySizeCalculator(entry, r)
	spec.TargetSize = sizeCalc.CalculateTargetSize()

	// Create encryption specification
	encSpec, err := scm.createEncryptionSpec(entry, r, state)
	if err != nil {
		return nil, err
	}
	spec.EncryptionSpec = encSpec

	// Create compression specification
	spec.CompressionSpec = scm.createCompressionSpec(entry, r)

	return spec, nil
}
// createEncryptionSpec creates encryption specification for streaming
func (scm *StreamingCopyManager) createEncryptionSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*EncryptionSpec, error) {
	spec := &EncryptionSpec{
		NeedsDecryption: state.IsSourceEncrypted(),
		NeedsEncryption: state.IsTargetEncrypted(),
		SourceMetadata:  entry.Extended, // Pass source metadata for IV extraction
	}

	// Set source encryption details
	if state.SrcSSEC {
		spec.SourceType = EncryptionTypeSSEC
		sourceKey, err := ParseSSECCopySourceHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-C copy source headers: %w", err)
		}
		spec.SourceKey = sourceKey
	} else if state.SrcSSEKMS {
		spec.SourceType = EncryptionTypeSSEKMS
		// Extract SSE-KMS key from metadata
		if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
			sseKey, err := DeserializeSSEKMSMetadata(keyData)
			if err != nil {
				return nil, fmt.Errorf("deserialize SSE-KMS metadata: %w", err)
			}
			spec.SourceKey = sseKey
		}
	} else if state.SrcSSES3 {
		spec.SourceType = EncryptionTypeSSES3
		// Extract SSE-S3 key from metadata
		if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]; exists {
			// TODO: This should use a proper SSE-S3 key manager from S3ApiServer.
			// For now, create a temporary key manager to handle deserialization.
			tempKeyManager := NewSSES3KeyManager()
			sseKey, err := DeserializeSSES3Metadata(keyData, tempKeyManager)
			if err != nil {
				return nil, fmt.Errorf("deserialize SSE-S3 metadata: %w", err)
			}
			spec.SourceKey = sseKey
		}
	}

	// Set destination encryption details
	if state.DstSSEC {
		spec.DestinationType = EncryptionTypeSSEC
		destKey, err := ParseSSECHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-C headers: %w", err)
		}
		spec.DestinationKey = destKey
	} else if state.DstSSEKMS {
		spec.DestinationType = EncryptionTypeSSEKMS
		// Parse KMS parameters
		keyID, encryptionContext, bucketKeyEnabled, err := ParseSSEKMSCopyHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-KMS copy headers: %w", err)
		}
		// Create SSE-KMS key for destination
		sseKey := &SSEKMSKey{
			KeyID:             keyID,
			EncryptionContext: encryptionContext,
			BucketKeyEnabled:  bucketKeyEnabled,
		}
		spec.DestinationKey = sseKey
	} else if state.DstSSES3 {
		spec.DestinationType = EncryptionTypeSSES3
		// Generate or retrieve SSE-S3 key
		keyManager := GetSSES3KeyManager()
		sseKey, err := keyManager.GetOrCreateKey("")
		if err != nil {
			return nil, fmt.Errorf("get SSE-S3 key: %w", err)
		}
		spec.DestinationKey = sseKey
	}

	return spec, nil
}
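
// For reference (an aside, not new behavior; the exact parsing lives in the
// Parse* helpers above), these branches correspond to the standard AWS copy
// request headers:
//
//	SSE-C source:      x-amz-copy-source-server-side-encryption-customer-algorithm/-key/-key-MD5
//	SSE-C destination: x-amz-server-side-encryption-customer-algorithm/-key/-key-MD5
//	SSE-KMS:           x-amz-server-side-encryption: aws:kms (plus x-amz-server-side-encryption-aws-kms-key-id)
//	SSE-S3:            x-amz-server-side-encryption: AES256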
// createCompressionSpec creates compression specification for streaming
func (scm *StreamingCopyManager) createCompressionSpec(entry *filer_pb.Entry, r *http.Request) *CompressionSpec {
	return &CompressionSpec{
		IsCompressed: isCompressedEntry(entry),
		// For now, we don't change compression during copy
		NeedsDecompression: false,
		NeedsCompression:   false,
	}
}
// createSourceReader creates a reader for the source entry
func (scm *StreamingCopyManager) createSourceReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
	// Create a multi-chunk reader that streams from all chunks
	return scm.s3a.createMultiChunkReader(entry)
}
// createProcessingPipeline creates a processing pipeline for the copy operation
func (scm *StreamingCopyManager) createProcessingPipeline(spec *StreamingCopySpec) (io.Reader, error) {
	reader := spec.SourceReader

	// Add decryption if needed
	if spec.EncryptionSpec.NeedsDecryption {
		decryptedReader, err := scm.createDecryptionReader(reader, spec.EncryptionSpec)
		if err != nil {
			return nil, fmt.Errorf("create decryption reader: %w", err)
		}
		reader = decryptedReader
	}

	// Add decompression if needed
	if spec.CompressionSpec.NeedsDecompression {
		decompressedReader, err := scm.createDecompressionReader(reader, spec.CompressionSpec)
		if err != nil {
			return nil, fmt.Errorf("create decompression reader: %w", err)
		}
		reader = decompressedReader
	}

	// Add compression if needed
	if spec.CompressionSpec.NeedsCompression {
		compressedReader, err := scm.createCompressionReader(reader, spec.CompressionSpec)
		if err != nil {
			return nil, fmt.Errorf("create compression reader: %w", err)
		}
		reader = compressedReader
	}

	// Add encryption if needed
	if spec.EncryptionSpec.NeedsEncryption {
		encryptedReader, err := scm.createEncryptionReader(reader, spec.EncryptionSpec)
		if err != nil {
			return nil, fmt.Errorf("create encryption reader: %w", err)
		}
		reader = encryptedReader
	}

	// Add hash calculation if needed
	if spec.HashCalculation {
		reader = scm.createHashReader(reader)
	}

	return reader, nil
}
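
// A note on stage order (an observation about the pipeline above, not new
// behavior): the stages unwind the write path in reverse, so stored bytes are
// decrypted before they are decompressed, and any re-compression happens
// before re-encryption. Because the hash stage wraps the pipeline last, the
// digests reflect the bytes as they will be written to the destination, not
// the plaintext.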
// createDecryptionReader creates a decryption reader based on encryption type
func (scm *StreamingCopyManager) createDecryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
	switch encSpec.SourceType {
	case EncryptionTypeSSEC:
		if sourceKey, ok := encSpec.SourceKey.(*SSECustomerKey); ok {
			// Get IV from metadata
			iv, err := GetIVFromMetadata(encSpec.SourceMetadata)
			if err != nil {
				return nil, fmt.Errorf("get IV from metadata: %w", err)
			}
			return CreateSSECDecryptedReader(reader, sourceKey, iv)
		}
		return nil, fmt.Errorf("invalid SSE-C source key type")
	case EncryptionTypeSSEKMS:
		if sseKey, ok := encSpec.SourceKey.(*SSEKMSKey); ok {
			return CreateSSEKMSDecryptedReader(reader, sseKey)
		}
		return nil, fmt.Errorf("invalid SSE-KMS source key type")
	case EncryptionTypeSSES3:
		if sseKey, ok := encSpec.SourceKey.(*SSES3Key); ok {
			// Get IV from metadata
			iv, err := GetIVFromMetadata(encSpec.SourceMetadata)
			if err != nil {
				return nil, fmt.Errorf("get IV from metadata: %w", err)
			}
			return CreateSSES3DecryptedReader(reader, sseKey, iv)
		}
		return nil, fmt.Errorf("invalid SSE-S3 source key type")
	default:
		return reader, nil
	}
}
// createEncryptionReader creates an encryption reader based on encryption type
func (scm *StreamingCopyManager) createEncryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
	switch encSpec.DestinationType {
	case EncryptionTypeSSEC:
		if destKey, ok := encSpec.DestinationKey.(*SSECustomerKey); ok {
			encryptedReader, iv, err := CreateSSECEncryptedReader(reader, destKey)
			if err != nil {
				return nil, err
			}
			// Record the generated IV; the caller is responsible for
			// persisting it to the destination metadata.
			encSpec.DestinationIV = iv
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-C destination key type")
	case EncryptionTypeSSEKMS:
		if sseKey, ok := encSpec.DestinationKey.(*SSEKMSKey); ok {
			encryptedReader, updatedKey, err := CreateSSEKMSEncryptedReaderWithBucketKey(reader, sseKey.KeyID, sseKey.EncryptionContext, sseKey.BucketKeyEnabled)
			if err != nil {
				return nil, err
			}
			// Record the IV from the updated key for the caller to persist
			encSpec.DestinationIV = updatedKey.IV
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-KMS destination key type")
	case EncryptionTypeSSES3:
		if sseKey, ok := encSpec.DestinationKey.(*SSES3Key); ok {
			encryptedReader, iv, err := CreateSSES3EncryptedReader(reader, sseKey)
			if err != nil {
				return nil, err
			}
			// Record the IV for destination metadata
			encSpec.DestinationIV = iv
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-S3 destination key type")
	default:
		return reader, nil
	}
}
// createDecompressionReader creates a decompression reader
func (scm *StreamingCopyManager) createDecompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
	if !compSpec.NeedsDecompression {
		return reader, nil
	}

	switch compSpec.CompressionType {
	case "gzip":
		// Use SeaweedFS's streaming gzip decompression; the pipe adapts the
		// writer-driven GunzipStream into a pull-based reader.
		pr, pw := io.Pipe()
		go func() {
			defer pw.Close()
			if _, err := util.GunzipStream(pw, reader); err != nil {
				pw.CloseWithError(fmt.Errorf("gzip decompression failed: %w", err))
			}
		}()
		return pr, nil
	default:
		// Unknown compression type, return as-is
		return reader, nil
	}
}
// createCompressionReader creates a compression reader
func (scm *StreamingCopyManager) createCompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
	if !compSpec.NeedsCompression {
		return reader, nil
	}

	switch compSpec.CompressionType {
	case "gzip":
		// Use SeaweedFS's streaming gzip compression via the same pipe pattern
		pr, pw := io.Pipe()
		go func() {
			defer pw.Close()
			if _, err := util.GzipStream(pw, reader); err != nil {
				pw.CloseWithError(fmt.Errorf("gzip compression failed: %w", err))
			}
		}()
		return pr, nil
	default:
		// Unknown compression type, return as-is
		return reader, nil
	}
}
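
// examplePipeTransform is a minimal, hypothetical sketch of the io.Pipe
// pattern used by the two functions above, assuming a transform with the
// same (written, err) result shape as util.GzipStream/GunzipStream: the
// writer-driven transform runs in a goroutine and is exposed as a pull-based
// io.Reader, so no intermediate buffer ever holds the whole object.
func examplePipeTransform(src io.Reader, transform func(io.Writer, io.Reader) (int64, error)) io.Reader {
	pr, pw := io.Pipe()
	go func() {
		_, err := transform(pw, src)
		// A nil err closes the pipe cleanly, surfacing io.EOF to the reader.
		pw.CloseWithError(err)
	}()
	return pr
}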
// HashReader wraps an io.Reader to calculate MD5 and SHA256 hashes of all
// data that flows through it
type HashReader struct {
	reader     io.Reader
	md5Hash    hash.Hash
	sha256Hash hash.Hash
}

// NewHashReader creates a new hash-calculating reader
func NewHashReader(reader io.Reader) *HashReader {
	return &HashReader{
		reader:     reader,
		md5Hash:    md5.New(),
		sha256Hash: sha256.New(),
	}
}

// Read implements io.Reader and updates both hashes as data flows through
func (hr *HashReader) Read(p []byte) (n int, err error) {
	n, err = hr.reader.Read(p)
	if n > 0 {
		// hash.Hash writes never return an error
		hr.md5Hash.Write(p[:n])
		hr.sha256Hash.Write(p[:n])
	}
	return n, err
}

// MD5Sum returns the MD5 digest of the bytes read so far
func (hr *HashReader) MD5Sum() []byte {
	return hr.md5Hash.Sum(nil)
}

// SHA256Sum returns the SHA256 digest of the bytes read so far
func (hr *HashReader) SHA256Sum() []byte {
	return hr.sha256Hash.Sum(nil)
}

// MD5Hex returns the MD5 digest as a hex string
func (hr *HashReader) MD5Hex() string {
	return hex.EncodeToString(hr.MD5Sum())
}

// SHA256Hex returns the SHA256 digest as a hex string
func (hr *HashReader) SHA256Hex() string {
	return hex.EncodeToString(hr.SHA256Sum())
}
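
// exampleHashReaderUsage is a minimal, hypothetical sketch (not called by the
// copy path) showing how HashReader composes with io.Copy. The digests
// accumulate as the destination consumes the stream, so they are only final
// once Copy returns.
func exampleHashReaderUsage(src io.Reader, dst io.Writer) (md5Hex, sha256Hex string, err error) {
	hr := NewHashReader(src)
	if _, err = io.Copy(dst, hr); err != nil {
		return "", "", err
	}
	return hr.MD5Hex(), hr.SHA256Hex(), nil
}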
// createHashReader creates a hash calculation reader
func (scm *StreamingCopyManager) createHashReader(reader io.Reader) io.Reader {
	return NewHashReader(reader)
}
// streamToDestination streams the processed data to the destination
func (scm *StreamingCopyManager) streamToDestination(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
	// For now this falls back to the existing chunk-based approach; a full
	// streaming implementation would write directly to the destination
	// without creating intermediate chunks.
	return scm.streamToChunks(ctx, reader, spec, dstPath)
}
// streamToChunks converts streaming data back to chunks (temporary implementation)
func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
	var chunks []*filer_pb.FileChunk
	buffer := make([]byte, spec.BufferSize)
	offset := int64(0)

	for {
		// Note: Read may return fewer bytes than len(buffer), so chunk sizes
		// can vary; a more sophisticated implementation might use io.ReadFull
		// to produce uniformly sized chunks.
		n, err := reader.Read(buffer)
		if n > 0 {
			chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath)
			if chunkErr != nil {
				return nil, fmt.Errorf("create chunk from data: %w", chunkErr)
			}
			chunks = append(chunks, chunk)
			offset += int64(n)
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("read stream: %w", err)
		}
	}

	return chunks, nil
}
// createChunkFromData creates a chunk from streaming data
func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string) (*filer_pb.FileChunk, error) {
	// Assign new volume
	assignResult, err := scm.s3a.assignNewVolume(dstPath)
	if err != nil {
		return nil, fmt.Errorf("assign volume: %w", err)
	}

	// Create chunk
	chunk := &filer_pb.FileChunk{
		Offset: offset,
		Size:   uint64(len(data)),
	}

	// Set file ID
	if err := scm.s3a.setChunkFileId(chunk, assignResult); err != nil {
		return nil, err
	}

	// Upload data
	if err := scm.s3a.uploadChunkData(data, assignResult); err != nil {
		return nil, fmt.Errorf("upload chunk data: %w", err)
	}

	return chunk, nil
}
// createMultiChunkReader creates a reader that streams from multiple chunks
func (s3a *S3ApiServer) createMultiChunkReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
	// Create a multi-reader that combines all chunks, tracking the underlying
	// closers so the HTTP response bodies are not leaked.
	var readers []io.Reader
	var closers []io.Closer
	for _, chunk := range entry.GetChunks() {
		chunkReader, err := s3a.createChunkReader(chunk)
		if err != nil {
			// Close any readers opened so far before propagating the error
			for _, c := range closers {
				c.Close()
			}
			return nil, fmt.Errorf("create chunk reader: %w", err)
		}
		readers = append(readers, chunkReader)
		if closer, ok := chunkReader.(io.Closer); ok {
			closers = append(closers, closer)
		}
	}

	return &multiReadCloser{reader: io.MultiReader(readers...), closers: closers}, nil
}
// createChunkReader creates a reader for a single chunk
func (s3a *S3ApiServer) createChunkReader(chunk *filer_pb.FileChunk) (io.Reader, error) {
	// Get chunk URL
	srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
	if err != nil {
		return nil, fmt.Errorf("lookup volume URL: %w", err)
	}

	// Create HTTP request for chunk data
	req, err := http.NewRequest(http.MethodGet, srcUrl, nil)
	if err != nil {
		return nil, fmt.Errorf("create HTTP request: %w", err)
	}

	// Execute request
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("execute HTTP request: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, fmt.Errorf("HTTP request failed: %d", resp.StatusCode)
	}

	return resp.Body, nil
}
// multiReadCloser wraps a multi-reader and closes all underlying chunk
// readers when it is closed
type multiReadCloser struct {
	reader  io.Reader
	closers []io.Closer
}

func (mrc *multiReadCloser) Read(p []byte) (int, error) {
	return mrc.reader.Read(p)
}

func (mrc *multiReadCloser) Close() error {
	// Close every underlying reader, returning the first error encountered
	var firstErr error
	for _, c := range mrc.closers {
		if err := c.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}