- package broker
- import (
- "context"
- "encoding/binary"
- "fmt"
- "os"
- "time"
- "github.com/seaweedfs/seaweedfs/weed/filer"
- "github.com/seaweedfs/seaweedfs/weed/glog"
- "github.com/seaweedfs/seaweedfs/weed/operation"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/util"
- )
// appendToFile uploads data and appends it as a new chunk to the filer entry
// at targetFile. It delegates to appendToFileWithBufferIndex with bufferIndex
// 0, which means "no buffer-index deduplication tracking".
func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error {
	return b.appendToFileWithBufferIndex(targetFile, data, 0)
}
- func (b *MessageQueueBroker) appendToFileWithBufferIndex(targetFile string, data []byte, bufferIndex int64) error {
- fileId, uploadResult, err2 := b.assignAndUpload(targetFile, data)
- if err2 != nil {
- return err2
- }
- // find out existing entry
- fullpath := util.FullPath(targetFile)
- dir, name := fullpath.DirAndName()
- entry, err := filer_pb.GetEntry(context.Background(), b, fullpath)
- var offset int64 = 0
- if err == filer_pb.ErrNotFound {
- entry = &filer_pb.Entry{
- Name: name,
- IsDirectory: false,
- Attributes: &filer_pb.FuseAttributes{
- Crtime: time.Now().Unix(),
- Mtime: time.Now().Unix(),
- FileMode: uint32(os.FileMode(0644)),
- Uid: uint32(os.Getuid()),
- Gid: uint32(os.Getgid()),
- },
- }
- // Add buffer start index for deduplication tracking (binary format)
- if bufferIndex != 0 {
- entry.Extended = make(map[string][]byte)
- bufferStartBytes := make([]byte, 8)
- binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex))
- entry.Extended["buffer_start"] = bufferStartBytes
- }
- } else if err != nil {
- return fmt.Errorf("find %s: %v", fullpath, err)
- } else {
- offset = int64(filer.TotalSize(entry.GetChunks()))
- // Verify buffer index continuity for existing files (append operations)
- if bufferIndex != 0 {
- if entry.Extended == nil {
- entry.Extended = make(map[string][]byte)
- }
- // Check for existing buffer start (binary format)
- if existingData, exists := entry.Extended["buffer_start"]; exists {
- if len(existingData) == 8 {
- existingStartIndex := int64(binary.BigEndian.Uint64(existingData))
- // Verify that the new buffer index is consecutive
- // Expected index = start + number of existing chunks
- expectedIndex := existingStartIndex + int64(len(entry.GetChunks()))
- if bufferIndex != expectedIndex {
- // This shouldn't happen in normal operation
- // Log warning but continue (don't crash the system)
- glog.Warningf("non-consecutive buffer index for %s. Expected %d, got %d",
- fullpath, expectedIndex, bufferIndex)
- }
- // Note: We don't update the start index - it stays the same
- }
- } else {
- // No existing buffer start, create new one (shouldn't happen for existing files)
- bufferStartBytes := make([]byte, 8)
- binary.BigEndian.PutUint64(bufferStartBytes, uint64(bufferIndex))
- entry.Extended["buffer_start"] = bufferStartBytes
- }
- }
- }
- // append to existing chunks
- entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(fileId, offset, time.Now().UnixNano()))
- // update the entry
- return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- return filer_pb.CreateEntry(context.Background(), client, &filer_pb.CreateEntryRequest{
- Directory: dir,
- Entry: entry,
- })
- })
- }
- func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fileId string, uploadResult *operation.UploadResult, err error) {
- reader := util.NewBytesReader(data)
- uploader, err := operation.NewUploader()
- if err != nil {
- return
- }
- fileId, uploadResult, err, _ = uploader.UploadWithRetry(
- b,
- &filer_pb.AssignVolumeRequest{
- Count: 1,
- Replication: b.option.DefaultReplication,
- Collection: "topics",
- // TtlSec: wfs.option.TtlSec,
- // DiskType: string(wfs.option.DiskType),
- DataCenter: b.option.DataCenter,
- Path: targetFile,
- },
- &operation.UploadOption{
- Cipher: b.option.Cipher,
- },
- func(host, fileId string) string {
- fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- if b.option.VolumeServerAccess == "filerProxy" {
- fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", b.currentFiler, fileId)
- }
- return fileUrl
- },
- reader,
- )
- return
- }