s3api_streaming_copy.go

package s3api

import (
	"context"
	"crypto/md5"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"hash"
	"io"
	"net/http"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
	"github.com/seaweedfs/seaweedfs/weed/util"
)
// StreamingCopySpec defines the specification for streaming copy operations
type StreamingCopySpec struct {
	SourceReader    io.Reader
	TargetSize      int64
	EncryptionSpec  *EncryptionSpec
	CompressionSpec *CompressionSpec
	HashCalculation bool
	BufferSize      int
}

// EncryptionSpec defines encryption parameters for streaming
type EncryptionSpec struct {
	NeedsDecryption bool
	NeedsEncryption bool
	SourceKey       interface{} // SSECustomerKey or SSEKMSKey
	DestinationKey  interface{} // SSECustomerKey or SSEKMSKey
	SourceType      EncryptionType
	DestinationType EncryptionType
	SourceMetadata  map[string][]byte // Source metadata for IV extraction
	DestinationIV   []byte            // Generated IV for destination
}

// CompressionSpec defines compression parameters for streaming
type CompressionSpec struct {
	IsCompressed       bool
	CompressionType    string
	NeedsDecompression bool
	NeedsCompression   bool
}
// StreamingCopyManager handles streaming copy operations
type StreamingCopyManager struct {
	s3a        *S3ApiServer
	bufferSize int
}

// NewStreamingCopyManager creates a new streaming copy manager
func NewStreamingCopyManager(s3a *S3ApiServer) *StreamingCopyManager {
	return &StreamingCopyManager{
		s3a:        s3a,
		bufferSize: 64 * 1024, // 64KB default buffer
	}
}
// ExecuteStreamingCopy performs a streaming copy operation
func (scm *StreamingCopyManager) ExecuteStreamingCopy(ctx context.Context, entry *filer_pb.Entry, r *http.Request, dstPath string, state *EncryptionState) ([]*filer_pb.FileChunk, error) {
	// Create streaming copy specification
	spec, err := scm.createStreamingSpec(entry, r, state)
	if err != nil {
		return nil, fmt.Errorf("create streaming spec: %w", err)
	}

	// Create source reader from entry
	sourceReader, err := scm.createSourceReader(entry)
	if err != nil {
		return nil, fmt.Errorf("create source reader: %w", err)
	}
	defer sourceReader.Close()
	spec.SourceReader = sourceReader

	// Create processing pipeline
	processedReader, err := scm.createProcessingPipeline(spec)
	if err != nil {
		return nil, fmt.Errorf("create processing pipeline: %w", err)
	}

	// Stream to destination
	return scm.streamToDestination(ctx, processedReader, spec, dstPath)
}
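
// Usage sketch (illustrative only, using identifiers from this file): a copy
// handler that has already resolved the source entry and the encryption state
// would drive a streaming copy roughly like this:
//
//	scm := NewStreamingCopyManager(s3a)
//	chunks, err := scm.ExecuteStreamingCopy(ctx, srcEntry, r, dstPath, state)
//	if err != nil {
//		return err // surfaced as an S3 error response by the caller
//	}
//	// The returned chunks are then attached to the destination entry.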
// createStreamingSpec creates a streaming specification based on copy parameters
func (scm *StreamingCopyManager) createStreamingSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*StreamingCopySpec, error) {
	spec := &StreamingCopySpec{
		BufferSize:      scm.bufferSize,
		HashCalculation: true,
	}

	// Calculate target size
	sizeCalc := NewCopySizeCalculator(entry, r)
	spec.TargetSize = sizeCalc.CalculateTargetSize()

	// Create encryption specification
	encSpec, err := scm.createEncryptionSpec(entry, r, state)
	if err != nil {
		return nil, err
	}
	spec.EncryptionSpec = encSpec

	// Create compression specification
	spec.CompressionSpec = scm.createCompressionSpec(entry, r)

	return spec, nil
}
// createEncryptionSpec creates encryption specification for streaming
func (scm *StreamingCopyManager) createEncryptionSpec(entry *filer_pb.Entry, r *http.Request, state *EncryptionState) (*EncryptionSpec, error) {
	spec := &EncryptionSpec{
		NeedsDecryption: state.IsSourceEncrypted(),
		NeedsEncryption: state.IsTargetEncrypted(),
		SourceMetadata:  entry.Extended, // Pass source metadata for IV extraction
	}

	// Set source encryption details
	if state.SrcSSEC {
		spec.SourceType = EncryptionTypeSSEC
		sourceKey, err := ParseSSECCopySourceHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-C copy source headers: %w", err)
		}
		spec.SourceKey = sourceKey
	} else if state.SrcSSEKMS {
		spec.SourceType = EncryptionTypeSSEKMS
		// Extract SSE-KMS key from metadata
		if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSEKMSKey]; exists {
			sseKey, err := DeserializeSSEKMSMetadata(keyData)
			if err != nil {
				return nil, fmt.Errorf("deserialize SSE-KMS metadata: %w", err)
			}
			spec.SourceKey = sseKey
		}
	} else if state.SrcSSES3 {
		spec.SourceType = EncryptionTypeSSES3
		// Extract SSE-S3 key from metadata
		if keyData, exists := entry.Extended[s3_constants.SeaweedFSSSES3Key]; exists {
			// TODO: This should use a proper SSE-S3 key manager from S3ApiServer.
			// For now, create a temporary key manager to handle deserialization.
			tempKeyManager := NewSSES3KeyManager()
			sseKey, err := DeserializeSSES3Metadata(keyData, tempKeyManager)
			if err != nil {
				return nil, fmt.Errorf("deserialize SSE-S3 metadata: %w", err)
			}
			spec.SourceKey = sseKey
		}
	}

	// Set destination encryption details
	if state.DstSSEC {
		spec.DestinationType = EncryptionTypeSSEC
		destKey, err := ParseSSECHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-C headers: %w", err)
		}
		spec.DestinationKey = destKey
	} else if state.DstSSEKMS {
		spec.DestinationType = EncryptionTypeSSEKMS
		// Parse KMS parameters
		keyID, encryptionContext, bucketKeyEnabled, err := ParseSSEKMSCopyHeaders(r)
		if err != nil {
			return nil, fmt.Errorf("parse SSE-KMS copy headers: %w", err)
		}
		// Create SSE-KMS key for destination
		sseKey := &SSEKMSKey{
			KeyID:             keyID,
			EncryptionContext: encryptionContext,
			BucketKeyEnabled:  bucketKeyEnabled,
		}
		spec.DestinationKey = sseKey
	} else if state.DstSSES3 {
		spec.DestinationType = EncryptionTypeSSES3
		// Generate or retrieve SSE-S3 key
		keyManager := GetSSES3KeyManager()
		sseKey, err := keyManager.GetOrCreateKey("")
		if err != nil {
			return nil, fmt.Errorf("get SSE-S3 key: %w", err)
		}
		spec.DestinationKey = sseKey
	}

	return spec, nil
}
// createCompressionSpec creates compression specification for streaming
func (scm *StreamingCopyManager) createCompressionSpec(entry *filer_pb.Entry, r *http.Request) *CompressionSpec {
	return &CompressionSpec{
		IsCompressed: isCompressedEntry(entry),
		// For now, we don't change compression during copy
		NeedsDecompression: false,
		NeedsCompression:   false,
	}
}
// createSourceReader creates a reader for the source entry
func (scm *StreamingCopyManager) createSourceReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
	// Create a multi-chunk reader that streams from all chunks
	return scm.s3a.createMultiChunkReader(entry)
}
// createProcessingPipeline creates a processing pipeline for the copy operation
func (scm *StreamingCopyManager) createProcessingPipeline(spec *StreamingCopySpec) (io.Reader, error) {
	reader := spec.SourceReader

	// Add decryption if needed
	if spec.EncryptionSpec.NeedsDecryption {
		decryptedReader, err := scm.createDecryptionReader(reader, spec.EncryptionSpec)
		if err != nil {
			return nil, fmt.Errorf("create decryption reader: %w", err)
		}
		reader = decryptedReader
	}

	// Add decompression if needed
	if spec.CompressionSpec.NeedsDecompression {
		decompressedReader, err := scm.createDecompressionReader(reader, spec.CompressionSpec)
		if err != nil {
			return nil, fmt.Errorf("create decompression reader: %w", err)
		}
		reader = decompressedReader
	}

	// Add compression if needed
	if spec.CompressionSpec.NeedsCompression {
		compressedReader, err := scm.createCompressionReader(reader, spec.CompressionSpec)
		if err != nil {
			return nil, fmt.Errorf("create compression reader: %w", err)
		}
		reader = compressedReader
	}

	// Add encryption if needed
	if spec.EncryptionSpec.NeedsEncryption {
		encryptedReader, err := scm.createEncryptionReader(reader, spec.EncryptionSpec)
		if err != nil {
			return nil, fmt.Errorf("create encryption reader: %w", err)
		}
		reader = encryptedReader
	}

	// Add hash calculation if needed
	if spec.HashCalculation {
		reader = scm.createHashReader(reader)
	}

	return reader, nil
}
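
// The pipeline composes plain io.Readers, so each stage runs lazily as the
// destination pulls bytes. Data flows through the stages in this order:
//
//	source -> [decrypt] -> [decompress] -> [compress] -> [encrypt] -> [hash] -> destination
//
// Note that the hash stage wraps the outermost reader, so the digests reflect
// the bytes as actually written to the destination (post-encryption), not the
// plaintext.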
// createDecryptionReader creates a decryption reader based on encryption type
func (scm *StreamingCopyManager) createDecryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
	switch encSpec.SourceType {
	case EncryptionTypeSSEC:
		if sourceKey, ok := encSpec.SourceKey.(*SSECustomerKey); ok {
			// Get IV from metadata
			iv, err := GetIVFromMetadata(encSpec.SourceMetadata)
			if err != nil {
				return nil, fmt.Errorf("get IV from metadata: %w", err)
			}
			return CreateSSECDecryptedReader(reader, sourceKey, iv)
		}
		return nil, fmt.Errorf("invalid SSE-C source key type")
	case EncryptionTypeSSEKMS:
		if sseKey, ok := encSpec.SourceKey.(*SSEKMSKey); ok {
			return CreateSSEKMSDecryptedReader(reader, sseKey)
		}
		return nil, fmt.Errorf("invalid SSE-KMS source key type")
	case EncryptionTypeSSES3:
		if sseKey, ok := encSpec.SourceKey.(*SSES3Key); ok {
			// Get IV from metadata
			iv, err := GetIVFromMetadata(encSpec.SourceMetadata)
			if err != nil {
				return nil, fmt.Errorf("get IV from metadata: %w", err)
			}
			return CreateSSES3DecryptedReader(reader, sseKey, iv)
		}
		return nil, fmt.Errorf("invalid SSE-S3 source key type")
	default:
		return reader, nil
	}
}
// createEncryptionReader creates an encryption reader based on encryption type
func (scm *StreamingCopyManager) createEncryptionReader(reader io.Reader, encSpec *EncryptionSpec) (io.Reader, error) {
	switch encSpec.DestinationType {
	case EncryptionTypeSSEC:
		if destKey, ok := encSpec.DestinationKey.(*SSECustomerKey); ok {
			encryptedReader, iv, err := CreateSSECEncryptedReader(reader, destKey)
			if err != nil {
				return nil, err
			}
			// Store IV in destination metadata (this would need to be handled by the caller)
			encSpec.DestinationIV = iv
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-C destination key type")
	case EncryptionTypeSSEKMS:
		if sseKey, ok := encSpec.DestinationKey.(*SSEKMSKey); ok {
			encryptedReader, updatedKey, err := CreateSSEKMSEncryptedReaderWithBucketKey(reader, sseKey.KeyID, sseKey.EncryptionContext, sseKey.BucketKeyEnabled)
			if err != nil {
				return nil, err
			}
			// Store IV from the updated key
			encSpec.DestinationIV = updatedKey.IV
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-KMS destination key type")
	case EncryptionTypeSSES3:
		if sseKey, ok := encSpec.DestinationKey.(*SSES3Key); ok {
			encryptedReader, iv, err := CreateSSES3EncryptedReader(reader, sseKey)
			if err != nil {
				return nil, err
			}
			// Store IV for metadata
			encSpec.DestinationIV = iv
			return encryptedReader, nil
		}
		return nil, fmt.Errorf("invalid SSE-S3 destination key type")
	default:
		return reader, nil
	}
}
// createDecompressionReader creates a decompression reader
func (scm *StreamingCopyManager) createDecompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
	if !compSpec.NeedsDecompression {
		return reader, nil
	}

	switch compSpec.CompressionType {
	case "gzip":
		// Use SeaweedFS's streaming gzip decompression. GunzipStream writes to
		// an io.Writer, so an io.Pipe adapts it back into an io.Reader.
		pr, pw := io.Pipe()
		go func() {
			defer pw.Close()
			if _, err := util.GunzipStream(pw, reader); err != nil {
				pw.CloseWithError(fmt.Errorf("gzip decompression failed: %w", err))
			}
		}()
		return pr, nil
	default:
		// Unknown compression type, return as-is
		return reader, nil
	}
}
// createCompressionReader creates a compression reader
func (scm *StreamingCopyManager) createCompressionReader(reader io.Reader, compSpec *CompressionSpec) (io.Reader, error) {
	if !compSpec.NeedsCompression {
		return reader, nil
	}

	switch compSpec.CompressionType {
	case "gzip":
		// Use SeaweedFS's streaming gzip compression via the same pipe pattern.
		pr, pw := io.Pipe()
		go func() {
			defer pw.Close()
			if _, err := util.GzipStream(pw, reader); err != nil {
				pw.CloseWithError(fmt.Errorf("gzip compression failed: %w", err))
			}
		}()
		return pr, nil
	default:
		// Unknown compression type, return as-is
		return reader, nil
	}
}
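
// A note on the io.Pipe pattern used by both helpers above: the gzip stream
// helpers are writer-oriented, while the copy pipeline is reader-oriented, and
// the pipe bridges the two. CloseWithError propagates a failure from the
// producing goroutine to the consumer, whose next Read returns that error.
// The deferred pw.Close() is harmless after CloseWithError has run, because a
// pipe keeps the first error it was closed with.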
// HashReader wraps an io.Reader to calculate MD5 and SHA256 hashes
type HashReader struct {
	reader     io.Reader
	md5Hash    hash.Hash
	sha256Hash hash.Hash
}

// NewHashReader creates a new hash calculating reader
func NewHashReader(reader io.Reader) *HashReader {
	return &HashReader{
		reader:     reader,
		md5Hash:    md5.New(),
		sha256Hash: sha256.New(),
	}
}

// Read implements io.Reader and calculates hashes as data flows through
func (hr *HashReader) Read(p []byte) (n int, err error) {
	n, err = hr.reader.Read(p)
	if n > 0 {
		// Update both hashes with the data read
		hr.md5Hash.Write(p[:n])
		hr.sha256Hash.Write(p[:n])
	}
	return n, err
}

// MD5Sum returns the current MD5 hash
func (hr *HashReader) MD5Sum() []byte {
	return hr.md5Hash.Sum(nil)
}

// SHA256Sum returns the current SHA256 hash
func (hr *HashReader) SHA256Sum() []byte {
	return hr.sha256Hash.Sum(nil)
}

// MD5Hex returns the MD5 hash as a hex string
func (hr *HashReader) MD5Hex() string {
	return hex.EncodeToString(hr.MD5Sum())
}

// SHA256Hex returns the SHA256 hash as a hex string
func (hr *HashReader) SHA256Hex() string {
	return hex.EncodeToString(hr.SHA256Sum())
}
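
// Usage sketch (illustrative): wrap any reader, drain it, then collect the
// digests, e.g. to produce an ETag or verify integrity after a copy:
//
//	hr := NewHashReader(src)
//	if _, err := io.Copy(dst, hr); err != nil {
//		return err
//	}
//	etag := hr.MD5Hex() // hex MD5 of every byte that flowed through hr
//
// The Sum methods report the hash of the bytes read so far, so the result is
// only final once the stream has been fully consumed.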
// createHashReader creates a hash calculation reader
func (scm *StreamingCopyManager) createHashReader(reader io.Reader) io.Reader {
	return NewHashReader(reader)
}
// streamToDestination streams the processed data to the destination
func (scm *StreamingCopyManager) streamToDestination(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
	// For now, this is a placeholder that falls back to the chunk-based
	// approach. A full streaming implementation would write directly to the
	// destination without creating intermediate chunks.
	return scm.streamToChunks(ctx, reader, spec, dstPath)
}
// streamToChunks converts streaming data back to chunks (temporary implementation)
func (scm *StreamingCopyManager) streamToChunks(ctx context.Context, reader io.Reader, spec *StreamingCopySpec, dstPath string) ([]*filer_pb.FileChunk, error) {
	// Simplified implementation: read the stream and create one chunk per
	// buffer of at most BufferSize bytes. A full implementation would batch
	// writes more intelligently.
	var chunks []*filer_pb.FileChunk
	buffer := make([]byte, spec.BufferSize)
	offset := int64(0)

	for {
		n, err := reader.Read(buffer)
		if n > 0 {
			// Create a chunk for this data. Per the io.Reader contract, n > 0
			// must be handled even when err is io.EOF.
			chunk, chunkErr := scm.createChunkFromData(buffer[:n], offset, dstPath)
			if chunkErr != nil {
				return nil, fmt.Errorf("create chunk from data: %w", chunkErr)
			}
			chunks = append(chunks, chunk)
			offset += int64(n)
		}
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("read stream: %w", err)
		}
	}

	return chunks, nil
}
// createChunkFromData creates a chunk from streaming data
func (scm *StreamingCopyManager) createChunkFromData(data []byte, offset int64, dstPath string) (*filer_pb.FileChunk, error) {
	// Assign new volume
	assignResult, err := scm.s3a.assignNewVolume(dstPath)
	if err != nil {
		return nil, fmt.Errorf("assign volume: %w", err)
	}

	// Create chunk
	chunk := &filer_pb.FileChunk{
		Offset: offset,
		Size:   uint64(len(data)),
	}

	// Set file ID
	if err := scm.s3a.setChunkFileId(chunk, assignResult); err != nil {
		return nil, err
	}

	// Upload data
	if err := scm.s3a.uploadChunkData(data, assignResult); err != nil {
		return nil, fmt.Errorf("upload chunk data: %w", err)
	}

	return chunk, nil
}
// createMultiChunkReader creates a reader that streams from multiple chunks
func (s3a *S3ApiServer) createMultiChunkReader(entry *filer_pb.Entry) (io.ReadCloser, error) {
	// Create a multi-reader that combines all chunks. Chunks are assumed to be
	// ordered and non-overlapping. Each chunk reader is also tracked as a
	// closer so the underlying HTTP response bodies can be released on Close.
	var readers []io.Reader
	var closers []io.Closer
	for _, chunk := range entry.GetChunks() {
		chunkReader, err := s3a.createChunkReader(chunk)
		if err != nil {
			// Close any chunk readers that were already opened before bailing out
			for _, closer := range closers {
				closer.Close()
			}
			return nil, fmt.Errorf("create chunk reader: %w", err)
		}
		readers = append(readers, chunkReader)
		closers = append(closers, chunkReader)
	}
	multiReader := io.MultiReader(readers...)
	return &multiReadCloser{reader: multiReader, closers: closers}, nil
}
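
// Note: this opens an HTTP response for every chunk up front, which is simple
// but holds one open connection per chunk for the lifetime of the copy. A
// lazier variant would dial each chunk only when the previous one is drained,
// e.g. (hypothetical sketch, not part of this file's API):
//
//	type lazyChunkReader struct {
//		s3a    *S3ApiServer
//		chunks []*filer_pb.FileChunk
//		cur    io.ReadCloser
//	}
//
// whose Read opens chunks[i] on first use and closes it on io.EOF before
// advancing to chunks[i+1].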
// createChunkReader creates a reader for a single chunk. The returned
// io.ReadCloser is the HTTP response body and must be closed by the caller.
func (s3a *S3ApiServer) createChunkReader(chunk *filer_pb.FileChunk) (io.ReadCloser, error) {
	// Get chunk URL
	srcUrl, err := s3a.lookupVolumeUrl(chunk.GetFileIdString())
	if err != nil {
		return nil, fmt.Errorf("lookup volume URL: %w", err)
	}

	// Create HTTP request for chunk data
	req, err := http.NewRequest(http.MethodGet, srcUrl, nil)
	if err != nil {
		return nil, fmt.Errorf("create HTTP request: %w", err)
	}

	// Execute request
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("execute HTTP request: %w", err)
	}
	if resp.StatusCode != http.StatusOK {
		resp.Body.Close()
		return nil, fmt.Errorf("HTTP request failed: %d", resp.StatusCode)
	}

	return resp.Body, nil
}
// multiReadCloser wraps a multi-reader and closes all underlying chunk readers.
// The original implementation's Close was a no-op, which leaked the HTTP
// response bodies opened by createChunkReader.
type multiReadCloser struct {
	reader  io.Reader
	closers []io.Closer
}

func (mrc *multiReadCloser) Read(p []byte) (int, error) {
	return mrc.reader.Read(p)
}

// Close closes every underlying chunk reader, returning the first error seen.
func (mrc *multiReadCloser) Close() error {
	var firstErr error
	for _, closer := range mrc.closers {
		if err := closer.Close(); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}