filehandle.go

package mount

import (
	"os"
	"sync"

	"github.com/seaweedfs/seaweedfs/weed/filer"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	"github.com/seaweedfs/seaweedfs/weed/util"
)

type FileHandleId uint64

var IsDebugFileReadWrite = false

type FileHandle struct {
	fh              FileHandleId
	counter         int64
	entry           *LockedEntry
	entryLock       sync.RWMutex
	entryChunkGroup *filer.ChunkGroup
	inode           uint64
	wfs             *WFS

	// cache file has been written to
	dirtyMetadata bool
	dirtyPages    *PageWriter
	reader        *filer.ChunkReadAt
	contentType   string
	isDeleted     bool

	// RDMA chunk offset cache for performance optimization
	chunkOffsetCache []int64
	chunkCacheValid  bool
	chunkCacheLock   sync.RWMutex

	// for debugging
	mirrorFile *os.File
}

// newFileHandle creates a FileHandle for the given inode, sets up its page writer,
// and installs the initial entry when one is provided.
func newFileHandle(wfs *WFS, handleId FileHandleId, inode uint64, entry *filer_pb.Entry) *FileHandle {
	fh := &FileHandle{
		fh:      handleId,
		counter: 1,
		inode:   inode,
		wfs:     wfs,
	}
	// dirtyPages: newContinuousDirtyPages(file, writeOnly),
	fh.dirtyPages = newPageWriter(fh, wfs.option.ChunkSizeLimit)
	fh.entry = &LockedEntry{
		Entry: entry,
	}
	if entry != nil {
		fh.SetEntry(entry)
	}

	if IsDebugFileReadWrite {
		var err error
		fh.mirrorFile, err = os.OpenFile("/tmp/sw/"+entry.Name, os.O_RDWR|os.O_CREATE, 0600)
		if err != nil {
			println("failed to create mirror:", err.Error())
		}
	}

	return fh
}
// FullPath resolves the handle's inode back to its full path.
func (fh *FileHandle) FullPath() util.FullPath {
	fp, _ := fh.wfs.inodeToPath.GetPath(fh.inode)
	return fp
}

// GetEntry returns the lock-protected entry backing this handle.
func (fh *FileHandle) GetEntry() *LockedEntry {
	return fh.entry
}

// SetEntry installs a new entry on the handle and rebuilds the chunk group used for reads.
func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
	if entry != nil {
		fileSize := filer.FileSize(entry)
		entry.Attributes.FileSize = fileSize
		var resolveManifestErr error
		fh.entryChunkGroup, resolveManifestErr = filer.NewChunkGroup(fh.wfs.LookupFn(), fh.wfs.chunkCache, entry.Chunks)
		if resolveManifestErr != nil {
			glog.Warningf("failed to resolve manifest chunks in %+v", entry)
		}
	} else {
		glog.Fatalf("setting file handle entry to nil")
	}
	fh.entry.SetEntry(entry)

	// Invalidate chunk offset cache since chunks may have changed
	fh.invalidateChunkCache()
}

// UpdateEntry applies fn to the entry under its lock and returns the updated entry.
func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
	result := fh.entry.UpdateEntry(fn)

	// Invalidate chunk offset cache since entry may have been modified
	fh.invalidateChunkCache()

	return result
}

// AddChunks appends newly written chunks to the handle's entry.
func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
	fh.entry.AppendChunks(chunks)

	// Invalidate chunk offset cache since new chunks were added
	fh.invalidateChunkCache()
}

// ReleaseHandle destroys the handle's dirty pages under an exclusive handle lock.
func (fh *FileHandle) ReleaseHandle() {
	fhActiveLock := fh.wfs.fhLockTable.AcquireLock("ReleaseHandle", fh.fh, util.ExclusiveLock)
	defer fh.wfs.fhLockTable.ReleaseLock(fh.fh, fhActiveLock)

	fh.dirtyPages.Destroy()
	if IsDebugFileReadWrite {
		fh.mirrorFile.Close()
	}
}
// lessThan orders chunks by modification time, breaking ties by file key.
func lessThan(a, b *filer_pb.FileChunk) bool {
	if a.ModifiedTsNs == b.ModifiedTsNs {
		return a.Fid.FileKey < b.Fid.FileKey
	}
	return a.ModifiedTsNs < b.ModifiedTsNs
}
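
// The function below is an illustrative sketch, not part of the original file:
// it demonstrates the tie-breaking behavior of lessThan on two hypothetical
// chunks that share the same modification timestamp, where the smaller file
// key sorts first. The function name and literal values are made up.
func lessThanExample() bool {
	a := &filer_pb.FileChunk{ModifiedTsNs: 100, Fid: &filer_pb.FileId{FileKey: 1}}
	b := &filer_pb.FileChunk{ModifiedTsNs: 100, Fid: &filer_pb.FileId{FileKey: 2}}
	return lessThan(a, b) // true: equal timestamps, so the smaller file key wins
}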
// getCumulativeOffsets returns cached cumulative offsets for chunks, computing them if necessary
func (fh *FileHandle) getCumulativeOffsets(chunks []*filer_pb.FileChunk) []int64 {
	fh.chunkCacheLock.RLock()
	if fh.chunkCacheValid && len(fh.chunkOffsetCache) == len(chunks)+1 {
		// Cache is valid and matches current chunk count
		result := make([]int64, len(fh.chunkOffsetCache))
		copy(result, fh.chunkOffsetCache)
		fh.chunkCacheLock.RUnlock()
		return result
	}
	fh.chunkCacheLock.RUnlock()

	// Need to compute/recompute cache
	fh.chunkCacheLock.Lock()
	defer fh.chunkCacheLock.Unlock()

	// Double-check in case another goroutine computed it while we waited for the lock
	if fh.chunkCacheValid && len(fh.chunkOffsetCache) == len(chunks)+1 {
		result := make([]int64, len(fh.chunkOffsetCache))
		copy(result, fh.chunkOffsetCache)
		return result
	}

	// Compute cumulative offsets
	cumulativeOffsets := make([]int64, len(chunks)+1)
	for i, chunk := range chunks {
		cumulativeOffsets[i+1] = cumulativeOffsets[i] + int64(chunk.Size)
	}

	// Cache the result
	fh.chunkOffsetCache = make([]int64, len(cumulativeOffsets))
	copy(fh.chunkOffsetCache, cumulativeOffsets)
	fh.chunkCacheValid = true

	return cumulativeOffsets
}
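
// The helper below is an illustrative sketch, not part of the original file:
// it shows how the cumulative offsets returned by getCumulativeOffsets can be
// used to locate the chunk covering a given logical file offset, assuming the
// chunks are laid out end to end as the cumulative sum implies. The name
// findChunkIndexExample is hypothetical.
func (fh *FileHandle) findChunkIndexExample(chunks []*filer_pb.FileChunk, offset int64) int {
	offsets := fh.getCumulativeOffsets(chunks)
	for i := range chunks {
		// chunk i covers the byte range [offsets[i], offsets[i+1])
		if offset >= offsets[i] && offset < offsets[i+1] {
			return i
		}
	}
	return -1 // offset is past the end of the last chunk
}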
// invalidateChunkCache invalidates the chunk offset cache when chunks are modified
func (fh *FileHandle) invalidateChunkCache() {
	fh.chunkCacheLock.Lock()
	fh.chunkCacheValid = false
	fh.chunkOffsetCache = nil
	fh.chunkCacheLock.Unlock()
}