needle_write.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package needle
  2. import (
  3. "bytes"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/glog"
  6. "github.com/seaweedfs/seaweedfs/weed/storage/backend"
  7. . "github.com/seaweedfs/seaweedfs/weed/storage/types"
  8. "github.com/seaweedfs/seaweedfs/weed/util"
  9. "github.com/seaweedfs/seaweedfs/weed/util/buffer_pool"
  10. )
  11. func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset uint64, size Size, actualSize int64, err error) {
  12. end, _, e := w.GetStat()
  13. if e != nil {
  14. err = fmt.Errorf("Cannot Read Current Volume Position: %w", e)
  15. return
  16. }
  17. offset = uint64(end)
  18. if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 {
  19. err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize)
  20. return
  21. }
  22. bytesBuffer := buffer_pool.SyncPoolGetBuffer()
  23. defer func() {
  24. if err != nil {
  25. if te := w.Truncate(end); te != nil {
  26. // handle error or log
  27. }
  28. }
  29. buffer_pool.SyncPoolPutBuffer(bytesBuffer)
  30. }()
  31. size, actualSize, err = writeNeedleByVersion(version, n, offset, bytesBuffer)
  32. if err != nil {
  33. return
  34. }
  35. _, err = w.WriteAt(bytesBuffer.Bytes(), int64(offset))
  36. if err != nil {
  37. err = fmt.Errorf("failed to write %d bytes to %s at offset %d: %w", actualSize, w.Name(), offset, err)
  38. }
  39. return offset, size, actualSize, err
  40. }
  41. func WriteNeedleBlob(w backend.BackendStorageFile, dataSlice []byte, size Size, appendAtNs uint64, version Version) (offset uint64, err error) {
  42. if end, _, e := w.GetStat(); e == nil {
  43. defer func(w backend.BackendStorageFile, off int64) {
  44. if err != nil {
  45. if te := w.Truncate(end); te != nil {
  46. glog.V(0).Infof("Failed to truncate %s back to %d with error: %v", w.Name(), end, te)
  47. }
  48. }
  49. }(w, end)
  50. offset = uint64(end)
  51. } else {
  52. err = fmt.Errorf("Cannot Read Current Volume Position: %v", e)
  53. return
  54. }
  55. if version == Version3 {
  56. tsOffset := NeedleHeaderSize + size + NeedleChecksumSize
  57. util.Uint64toBytes(dataSlice[tsOffset:tsOffset+TimestampSize], appendAtNs)
  58. }
  59. if err == nil {
  60. _, err = w.WriteAt(dataSlice, int64(offset))
  61. }
  62. return
  63. }
  64. // prepareNeedleWrite encapsulates the common beginning logic for all versioned writeNeedle functions.
  65. func prepareNeedleWrite(w backend.BackendStorageFile, n *Needle) (offset uint64, bytesBuffer *bytes.Buffer, cleanup func(err error), err error) {
  66. end, _, e := w.GetStat()
  67. if e != nil {
  68. err = fmt.Errorf("Cannot Read Current Volume Position: %w", e)
  69. return
  70. }
  71. offset = uint64(end)
  72. if offset >= MaxPossibleVolumeSize && len(n.Data) != 0 {
  73. err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize)
  74. return
  75. }
  76. bytesBuffer = buffer_pool.SyncPoolGetBuffer()
  77. cleanup = func(err error) {
  78. if err != nil {
  79. if te := w.Truncate(end); te != nil {
  80. // handle error or log
  81. }
  82. }
  83. buffer_pool.SyncPoolPutBuffer(bytesBuffer)
  84. }
  85. return
  86. }