filer_server_handlers_write_merge.go

package weed_server

import (
	"context"
	"io"
	"math"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/operation"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/stats"
)
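
// MergeChunkMinCount is the minimum number of small chunks an entry must have
// before the filer considers merging them into larger chunks.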
const MergeChunkMinCount int = 1000
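
// maybeMergeChunks decides whether a newly written entry's chunks should be merged.
// It leaves SSE-encrypted entries untouched and only triggers a merge once enough
// small chunks have accumulated; otherwise the input chunks are returned unchanged.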
func (fs *FilerServer) maybeMergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
	// Don't merge SSE-encrypted chunks, to preserve per-chunk encryption metadata
	for _, chunk := range inputChunks {
		if chunk.GetSseType() != 0 { // any SSE type (SSE-C or SSE-KMS)
			glog.V(3).InfofCtx(ctx, "Skipping chunk merge for SSE-encrypted chunks")
			return inputChunks, nil
		}
	}

	// Only merge when small chunks make up more than half of the file's chunks
	var chunkSize = fs.option.MaxMB * 1024 * 1024
	var smallChunk, sumChunk int
	var minOffset int64 = math.MaxInt64
	for _, chunk := range inputChunks {
		if chunk.IsChunkManifest {
			continue
		}
		if chunk.Size < uint64(chunkSize/2) {
			smallChunk++
			if chunk.Offset < minOffset {
				minOffset = chunk.Offset
			}
		}
		sumChunk++
	}
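
	// Require at least MergeChunkMinCount small chunks, and that they account for at
	// least half of the data chunks, before paying the cost of a merge.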
	if smallChunk < MergeChunkMinCount || smallChunk < sumChunk/2 {
		return inputChunks, nil
	}

	return fs.mergeChunks(ctx, so, inputChunks, minOffset)
}
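
// mergeChunks streams the entry's data starting at chunkOffset and re-uploads it as
// fewer, larger chunks, keeps the chunks before the offset (and any chunk manifests)
// as-is, and then deletes the chunks that are no longer referenced.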
func (fs *FilerServer) mergeChunks(ctx context.Context, so *operation.StorageOption, inputChunks []*filer_pb.FileChunk, chunkOffset int64) (mergedChunks []*filer_pb.FileChunk, mergeErr error) {
	chunkedFileReader := filer.NewChunkStreamReaderFromFiler(ctx, fs.filer.MasterClient, inputChunks)
	_, mergeErr = chunkedFileReader.Seek(chunkOffset, io.SeekCurrent)
	if mergeErr != nil {
		return nil, mergeErr
	}
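
	// Re-upload everything from chunkOffset onward, letting uploadReaderToChunks split
	// the stream into chunks of up to MaxMB megabytes each.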
	mergedChunks, _, _, mergeErr, _ = fs.uploadReaderToChunks(ctx, nil, chunkedFileReader, chunkOffset, int32(fs.option.MaxMB*1024*1024), "", "", true, so)
	if mergeErr != nil {
		return
	}
	stats.FilerHandlerCounter.WithLabelValues(stats.ChunkMerge).Inc()
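
	// Keep the chunks that were not merged: those before chunkOffset and any chunk manifests.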
	for _, chunk := range inputChunks {
		if chunk.Offset < chunkOffset || chunk.IsChunkManifest {
			mergedChunks = append(mergedChunks, chunk)
		}
	}
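
	// Whatever appears in inputChunks but not in mergedChunks is now garbage and can be deleted.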
	garbage, err := filer.MinusChunks(ctx, fs.lookupFileId, inputChunks, mergedChunks)
	if err != nil {
		glog.ErrorfCtx(ctx, "Failed to resolve old entry chunks when deleting old entry chunks. new: %s, old: %s",
			mergedChunks, inputChunks)
		return mergedChunks, err
	}
	fs.filer.DeleteChunksNotRecursive(garbage)
	return
}