// broker_write.go
  1. package broker
  2. import (
  3. "context"
  4. "encoding/binary"
  5. "fmt"
  6. "os"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/filer"
  9. "github.com/seaweedfs/seaweedfs/weed/glog"
  10. "github.com/seaweedfs/seaweedfs/weed/operation"
  11. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  12. "github.com/seaweedfs/seaweedfs/weed/util"
  13. )
// appendToFile uploads data and appends it to the filer entry at targetFile,
// with no buffer-index tracking (bufferIndex 0 disables the "buffer_start"
// continuity bookkeeping done by appendToFileWithBufferIndex).
func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error {
	return b.appendToFileWithBufferIndex(targetFile, data, 0)
}
  17. func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data []byte, bufferIndex int64) error {
  18. fileId, uploadResult, err2 := b.assignAndUpload(targetFile, data)
  19. if err2 != nil {
  20. return err2
  21. }
  22. // find out existing entry
  23. fullpath := util.FullPath(targetFile)
  24. dir, name := fullpath.DirAndName()
  25. entry, err := filer_pb.GetEntry(context.Background(), b, fullpath)
  26. var offset int64 = 0
  27. if err == filer_pb.ErrNotFound {
  28. entry = &filer_pb.Entry{
  29. Name: name,
  30. IsDirectory: false,
  31. Attributes: &filer_pb.FuseAttributes{
  32. Crtime: time.Now().Unix(),
  33. Mtime: time.Now().Unix(),
  34. FileMode: uint32(os.FileMode(0644)),
  35. Uid: uint32(os.Getuid()),
  36. Gid: uint32(os.Getgid()),
  37. },
  38. }
  39. // Add buffer start index for deduplication tracking (binary format)
  40. if bufferIndex != 0 {
  41. entry.Extended = make(map[string][]byte)
  42. bufferStartBytes := make([]byte, 8)
  43. binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex))
  44. entry.Extended["buffer_start"] = bufferStartBytes
  45. }
  46. } else if err != nil {
  47. return fmt.Errorf("find %s: %v", fullpath, err)
  48. } else {
  49. offset = int64(filer.TotalSize(entry.GetChunks()))
  50. // Verify buffer index continuity for existing files (append operations)
  51. if bufferIndex != 0 {
  52. if entry.Extended == nil {
  53. entry.Extended = make(map[string][]byte)
  54. }
  55. // Check for existing buffer start (binary format)
  56. if existingData, exists := entry.Extended["buffer_start"]; exists {
  57. if len(existingData) == 8 {
  58. existingStartIndex := int64(binary.BigEndian.Uint64(existingData))
  59. // Verify that the new buffer index is consecutive
  60. // Expected index = start + number of existing chunks
  61. expectedIndex := existingStartIndex + int64(len(entry.GetChunks()))
  62. if bufferIndex != expectedIndex {
  63. // This shouldn't happen in normal operation
  64. // Log warning but continue (don't crash the system)
  65. glog.Warningf("non-consecutive buffer index for %s. Expected %d, got %d",
  66. fullpath, expectedIndex, bufferIndex)
  67. }
  68. // Note: We don't update the start index - it stays the same
  69. }
  70. } else {
  71. // No existing buffer start, create new one (shouldn't happen for existing files)
  72. bufferStartBytes := make([]byte, 8)
  73. binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex))
  74. entry.Extended["buffer_start"] = bufferStartBytes
  75. }
  76. }
  77. }
  78. // append to existing chunks
  79. entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(fileId, offset, time.Now().UnixNano()))
  80. // update the entry
  81. return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  82. return filer_pb.CreateEntry(context.Background(), client, &filer_pb.CreateEntryRequest{
  83. Directory: dir,
  84. Entry: entry,
  85. })
  86. })
  87. }
  88. func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fileId string, uploadResult *operation.UploadResult, err error) {
  89. reader := util.NewBytesReader(data)
  90. uploader, err := operation.NewUploader()
  91. if err != nil {
  92. return
  93. }
  94. fileId, uploadResult, err, _ = uploader.UploadWithRetry(
  95. b,
  96. &filer_pb.AssignVolumeRequest{
  97. Count: 1,
  98. Replication: b.option.DefaultReplication,
  99. Collection: "topics",
  100. // TtlSec: wfs.option.TtlSec,
  101. // DiskType: string(wfs.option.DiskType),
  102. DataCenter: b.option.DataCenter,
  103. Path: targetFile,
  104. },
  105. &operation.UploadOption{
  106. Cipher: b.option.Cipher,
  107. },
  108. func(host, fileId string) string {
  109. fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
  110. if b.option.VolumeServerAccess == "filerProxy" {
  111. fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", b.currentFiler, fileId)
  112. }
  113. return fileUrl
  114. },
  115. reader,
  116. )
  117. return
  118. }