sftp_file_reader.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package sftpd
  2. import (
  3. "fmt"
  4. "io"
  5. "sync"
  6. "github.com/seaweedfs/seaweedfs/weed/filer"
  7. filer_pb "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/sftpd/utils"
  9. )
  10. type SeaweedFileReaderAt struct {
  11. fs *SftpServer
  12. entry *filer_pb.Entry
  13. reader io.ReadSeeker
  14. mu sync.Mutex
  15. bufferSize int
  16. cache *utils.LruCache
  17. fileSize int64
  18. }
  19. func NewSeaweedFileReaderAt(fs *SftpServer, entry *filer_pb.Entry) *SeaweedFileReaderAt {
  20. return &SeaweedFileReaderAt{
  21. fs: fs,
  22. entry: entry,
  23. bufferSize: 5 * 1024 * 1024, // 5MB
  24. cache: utils.NewLRUCache(10), // Max 10 chunks = ~50MB
  25. fileSize: int64(entry.Attributes.FileSize),
  26. }
  27. }
  28. func (ra *SeaweedFileReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
  29. ra.mu.Lock()
  30. defer ra.mu.Unlock()
  31. if off >= ra.fileSize {
  32. return 0, io.EOF
  33. }
  34. remaining := len(p)
  35. readOffset := off
  36. totalRead := 0
  37. for remaining > 0 && readOffset < ra.fileSize {
  38. bufferKey := (readOffset / int64(ra.bufferSize)) * int64(ra.bufferSize)
  39. bufferOffset := int(readOffset - bufferKey)
  40. buffer, ok := ra.cache.Get(bufferKey)
  41. if !ok {
  42. readSize := ra.bufferSize
  43. if bufferKey+int64(readSize) > ra.fileSize {
  44. readSize = int(ra.fileSize - bufferKey)
  45. }
  46. if ra.reader == nil {
  47. r := filer.NewFileReader(ra.fs, ra.entry)
  48. if rs, ok := r.(io.ReadSeeker); ok {
  49. ra.reader = rs
  50. } else {
  51. return 0, fmt.Errorf("reader is not seekable")
  52. }
  53. }
  54. if _, err := ra.reader.Seek(bufferKey, io.SeekStart); err != nil {
  55. return 0, fmt.Errorf("seek error: %w", err)
  56. }
  57. buffer = make([]byte, readSize)
  58. readBytes, err := io.ReadFull(ra.reader, buffer)
  59. if err != nil && err != io.ErrUnexpectedEOF {
  60. return 0, fmt.Errorf("read error: %w", err)
  61. }
  62. buffer = buffer[:readBytes]
  63. ra.cache.Put(bufferKey, buffer)
  64. }
  65. toCopy := len(buffer) - bufferOffset
  66. if toCopy > remaining {
  67. toCopy = remaining
  68. }
  69. if toCopy <= 0 {
  70. break
  71. }
  72. copy(p[totalRead:], buffer[bufferOffset:bufferOffset+toCopy])
  73. totalRead += toCopy
  74. readOffset += int64(toCopy)
  75. remaining -= toCopy
  76. }
  77. if totalRead == 0 {
  78. return 0, io.EOF
  79. }
  80. if totalRead < len(p) {
  81. return totalRead, io.EOF
  82. }
  83. return totalRead, nil
  84. }