weedfs_file_copy_range.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package mount
  2. import (
  3. "net/http"
  4. "time"
  5. "github.com/hanwen/go-fuse/v2/fuse"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/util"
  8. )
  9. // CopyFileRange copies data from one file to another from and to specified offsets.
  10. //
  11. // See https://man7.org/linux/man-pages/man2/copy_file_range.2.html
  12. // See https://github.com/libfuse/libfuse/commit/fe4f9428fc403fa8b99051f52d84ea5bd13f3855
  13. /**
  14. * Copy a range of data from one file to another
  15. *
  16. * Niels de Vos: • libfuse: add copy_file_range() support
  17. *
  18. * Performs an optimized copy between two file descriptors without the
  19. * additional cost of transferring data through the FUSE kernel module
  20. * to user space (glibc) and then back into the FUSE filesystem again.
  21. *
  22. * In case this method is not implemented, applications are expected to
  23. * fall back to a regular file copy. (Some glibc versions did this
  24. * emulation automatically, but the emulation has been removed from all
  25. * glibc release branches.)
  26. */
  27. func (wfs *WFS) CopyFileRange(cancel <-chan struct{}, in *fuse.CopyFileRangeIn) (written uint32, code fuse.Status) {
  28. // flags must equal 0 for this syscall as of now
  29. if in.Flags != 0 {
  30. return 0, fuse.EINVAL
  31. }
  32. // files must exist
  33. fhOut := wfs.GetHandle(FileHandleId(in.FhOut))
  34. if fhOut == nil {
  35. return 0, fuse.EBADF
  36. }
  37. fhIn := wfs.GetHandle(FileHandleId(in.FhIn))
  38. if fhIn == nil {
  39. return 0, fuse.EBADF
  40. }
  41. // lock source and target file handles
  42. fhOutActiveLock := fhOut.wfs.fhLockTable.AcquireLock("CopyFileRange", fhOut.fh, util.ExclusiveLock)
  43. defer fhOut.wfs.fhLockTable.ReleaseLock(fhOut.fh, fhOutActiveLock)
  44. if fhOut.entry == nil {
  45. return 0, fuse.ENOENT
  46. }
  47. if fhIn.fh != fhOut.fh {
  48. fhInActiveLock := fhIn.wfs.fhLockTable.AcquireLock("CopyFileRange", fhIn.fh, util.SharedLock)
  49. defer fhIn.wfs.fhLockTable.ReleaseLock(fhIn.fh, fhInActiveLock)
  50. }
  51. // directories are not supported
  52. if fhIn.entry.IsDirectory || fhOut.entry.IsDirectory {
  53. return 0, fuse.EISDIR
  54. }
  55. glog.V(4).Infof(
  56. "CopyFileRange %s fhIn %d -> %s fhOut %d, [%d,%d) -> [%d,%d)",
  57. fhIn.FullPath(), fhIn.fh,
  58. fhOut.FullPath(), fhOut.fh,
  59. in.OffIn, in.OffIn+in.Len,
  60. in.OffOut, in.OffOut+in.Len,
  61. )
  62. // Concurrent copy operations could allocate too much memory, so we want to
  63. // throttle our concurrency, scaling with the number of writers the mount
  64. // was configured with.
  65. if wfs.concurrentCopiersSem != nil {
  66. wfs.concurrentCopiersSem <- struct{}{}
  67. defer func() { <-wfs.concurrentCopiersSem }()
  68. }
  69. // We want to stream the copy operation to avoid allocating massive buffers.
  70. nowUnixNano := time.Now().UnixNano()
  71. totalCopied := int64(0)
  72. buff := wfs.copyBufferPool.Get().([]byte)
  73. defer wfs.copyBufferPool.Put(buff)
  74. for {
  75. // Comply with cancellation as best as we can, given that the underlying
  76. // IO functions aren't cancellation-aware.
  77. select {
  78. case <-cancel:
  79. glog.Warningf("canceled CopyFileRange for %s (copied %d)",
  80. fhIn.FullPath(), totalCopied)
  81. return uint32(totalCopied), fuse.EINTR
  82. default: // keep going
  83. }
  84. // We can save one IO by breaking early if we already know the next read
  85. // will result in zero bytes.
  86. remaining := int64(in.Len) - totalCopied
  87. readLen := min(remaining, int64(len(buff)))
  88. if readLen == 0 {
  89. break
  90. }
  91. // Perform the read
  92. offsetIn := totalCopied + int64(in.OffIn)
  93. numBytesRead, err := readDataByFileHandle(
  94. buff[:readLen], fhIn, offsetIn)
  95. if err != nil {
  96. glog.Warningf("file handle read %s %d (total %d): %v",
  97. fhIn.FullPath(), numBytesRead, totalCopied, err)
  98. return 0, fuse.EIO
  99. }
  100. // Break if we're done copying (no more bytes to read)
  101. if numBytesRead == 0 {
  102. break
  103. }
  104. offsetOut := int64(in.OffOut) + totalCopied
  105. // Detect mime type only during the beginning of our stream, since
  106. // DetectContentType is expecting some of the first 512 bytes of the
  107. // file. See [http.DetectContentType] for details.
  108. if offsetOut <= 512 {
  109. fhOut.contentType = http.DetectContentType(buff[:numBytesRead])
  110. }
  111. // Perform the write
  112. fhOut.dirtyPages.writerPattern.MonitorWriteAt(offsetOut, int(numBytesRead))
  113. fhOut.dirtyPages.AddPage(
  114. offsetOut,
  115. buff[:numBytesRead],
  116. fhOut.dirtyPages.writerPattern.IsSequentialMode(),
  117. nowUnixNano)
  118. // Accumulate for the next loop iteration
  119. totalCopied += numBytesRead
  120. }
  121. if totalCopied == 0 {
  122. return 0, fuse.OK
  123. }
  124. fhOut.entry.Attributes.FileSize = uint64(max(
  125. totalCopied+int64(in.OffOut),
  126. int64(fhOut.entry.Attributes.FileSize),
  127. ))
  128. fhOut.entry.Content = nil
  129. fhOut.dirtyMetadata = true
  130. written = uint32(totalCopied)
  131. return written, fuse.OK
  132. }