needle_read_page.go 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. package needle
  2. import (
  3. "fmt"
  4. "io"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/backend"
  7. . "github.com/seaweedfs/seaweedfs/weed/storage/types"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. )
  10. // ReadNeedleData uses a needle without n.Data to read the content
  11. // volumeOffset: the offset within the volume
  12. // needleOffset: the offset within the needle Data
  13. func (n *Needle) ReadNeedleData(r backend.BackendStorageFile, volumeOffset int64, data []byte, needleOffset int64) (count int, err error) {
  14. sizeToRead := min(int64(len(data)), int64(n.DataSize)-needleOffset)
  15. if sizeToRead <= 0 {
  16. return 0, io.EOF
  17. }
  18. startOffset := volumeOffset + NeedleHeaderSize + DataSizeSize + needleOffset
  19. count, err = r.ReadAt(data[:sizeToRead], startOffset)
  20. if err == io.EOF && int64(count) == sizeToRead {
  21. err = nil
  22. }
  23. if err != nil {
  24. fileSize, _, _ := r.GetStat()
  25. glog.Errorf("%s read %d %d size %d at offset %d fileSize %d: %v", r.Name(), n.Id, needleOffset, sizeToRead, volumeOffset, fileSize, err)
  26. }
  27. return
  28. }
  29. // ReadNeedleMeta fills all metadata except the n.Data
  30. func (n *Needle) ReadNeedleMeta(r backend.BackendStorageFile, offset int64, size Size, version Version) (err error) {
  31. defer func() {
  32. if r := recover(); r != nil {
  33. err = fmt.Errorf("panic occurred: %+v", r)
  34. }
  35. }()
  36. bytes := make([]byte, NeedleHeaderSize+DataSizeSize)
  37. count, err := r.ReadAt(bytes, offset)
  38. if err == io.EOF && count == NeedleHeaderSize+DataSizeSize {
  39. err = nil
  40. }
  41. if count != NeedleHeaderSize+DataSizeSize || err != nil {
  42. return err
  43. }
  44. n.ParseNeedleHeader(bytes)
  45. if n.Size != size {
  46. if OffsetSize == 4 && offset < int64(MaxPossibleVolumeSize) {
  47. return ErrorSizeMismatch
  48. }
  49. }
  50. n.DataSize = util.BytesToUint32(bytes[NeedleHeaderSize : NeedleHeaderSize+DataSizeSize])
  51. startOffset := offset + NeedleHeaderSize
  52. if size.IsValid() {
  53. startOffset = offset + NeedleHeaderSize + DataSizeSize + int64(n.DataSize)
  54. }
  55. dataSize := GetActualSize(size, version)
  56. stopOffset := offset + dataSize
  57. metaSize := stopOffset - startOffset
  58. metaSlice := make([]byte, int(metaSize))
  59. count, err = r.ReadAt(metaSlice, startOffset)
  60. if err != nil && int64(count) == metaSize {
  61. err = nil
  62. }
  63. if err != nil {
  64. return err
  65. }
  66. var index int
  67. if size.IsValid() {
  68. index, err = n.readNeedleDataVersion2NonData(metaSlice)
  69. }
  70. err = n.readNeedleTail(metaSlice[index:], version)
  71. return err
  72. }
  73. func min(x, y int64) int64 {
  74. if x < y {
  75. return x
  76. }
  77. return y
  78. }