reader_cache.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. package filer
  2. import (
  3. "context"
  4. "fmt"
  5. "sync"
  6. "sync/atomic"
  7. "time"
  8. "github.com/seaweedfs/seaweedfs/weed/glog"
  9. "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
  10. util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
  11. "github.com/seaweedfs/seaweedfs/weed/util/mem"
  12. "github.com/seaweedfs/seaweedfs/weed/wdclient"
  13. )
  14. type ReaderCache struct {
  15. chunkCache chunk_cache.ChunkCache
  16. lookupFileIdFn wdclient.LookupFileIdFunctionType
  17. sync.Mutex
  18. downloaders map[string]*SingleChunkCacher
  19. limit int
  20. }
  21. type SingleChunkCacher struct {
  22. completedTimeNew int64
  23. sync.Mutex
  24. parent *ReaderCache
  25. chunkFileId string
  26. data []byte
  27. err error
  28. cipherKey []byte
  29. isGzipped bool
  30. chunkSize int
  31. shouldCache bool
  32. wg sync.WaitGroup
  33. cacheStartedCh chan struct{}
  34. }
  35. func NewReaderCache(limit int, chunkCache chunk_cache.ChunkCache, lookupFileIdFn wdclient.LookupFileIdFunctionType) *ReaderCache {
  36. return &ReaderCache{
  37. limit: limit,
  38. chunkCache: chunkCache,
  39. lookupFileIdFn: lookupFileIdFn,
  40. downloaders: make(map[string]*SingleChunkCacher),
  41. }
  42. }
  43. func (rc *ReaderCache) MaybeCache(chunkViews *Interval[*ChunkView]) {
  44. if rc.lookupFileIdFn == nil {
  45. return
  46. }
  47. rc.Lock()
  48. defer rc.Unlock()
  49. if len(rc.downloaders) >= rc.limit {
  50. return
  51. }
  52. for x := chunkViews; x != nil; x = x.Next {
  53. chunkView := x.Value
  54. if _, found := rc.downloaders[chunkView.FileId]; found {
  55. continue
  56. }
  57. if rc.chunkCache.IsInCache(chunkView.FileId, true) {
  58. glog.V(4).Infof("%s is in cache", chunkView.FileId)
  59. continue
  60. }
  61. if len(rc.downloaders) >= rc.limit {
  62. // abort when slots are filled
  63. return
  64. }
  65. // glog.V(4).Infof("prefetch %s offset %d", chunkView.FileId, chunkView.ViewOffset)
  66. // cache this chunk if not yet
  67. shouldCache := (uint64(chunkView.ViewOffset) + chunkView.ChunkSize) <= rc.chunkCache.GetMaxFilePartSizeInCache()
  68. cacher := newSingleChunkCacher(rc, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int(chunkView.ChunkSize), shouldCache)
  69. go cacher.startCaching()
  70. <-cacher.cacheStartedCh
  71. rc.downloaders[chunkView.FileId] = cacher
  72. }
  73. return
  74. }
  75. func (rc *ReaderCache) ReadChunkAt(buffer []byte, fileId string, cipherKey []byte, isGzipped bool, offset int64, chunkSize int, shouldCache bool) (int, error) {
  76. rc.Lock()
  77. if cacher, found := rc.downloaders[fileId]; found {
  78. if n, err := cacher.readChunkAt(buffer, offset); n != 0 && err == nil {
  79. rc.Unlock()
  80. return n, err
  81. }
  82. }
  83. if shouldCache || rc.lookupFileIdFn == nil {
  84. n, err := rc.chunkCache.ReadChunkAt(buffer, fileId, uint64(offset))
  85. if n > 0 {
  86. rc.Unlock()
  87. return n, err
  88. }
  89. }
  90. // clean up old downloaders
  91. if len(rc.downloaders) >= rc.limit {
  92. oldestFid, oldestTime := "", time.Now().UnixNano()
  93. for fid, downloader := range rc.downloaders {
  94. completedTime := atomic.LoadInt64(&downloader.completedTimeNew)
  95. if completedTime > 0 && completedTime < oldestTime {
  96. oldestFid, oldestTime = fid, completedTime
  97. }
  98. }
  99. if oldestFid != "" {
  100. oldDownloader := rc.downloaders[oldestFid]
  101. delete(rc.downloaders, oldestFid)
  102. oldDownloader.destroy()
  103. }
  104. }
  105. // glog.V(4).Infof("cache1 %s", fileId)
  106. cacher := newSingleChunkCacher(rc, fileId, cipherKey, isGzipped, chunkSize, shouldCache)
  107. go cacher.startCaching()
  108. <-cacher.cacheStartedCh
  109. rc.downloaders[fileId] = cacher
  110. rc.Unlock()
  111. return cacher.readChunkAt(buffer, offset)
  112. }
  113. func (rc *ReaderCache) UnCache(fileId string) {
  114. rc.Lock()
  115. defer rc.Unlock()
  116. // glog.V(4).Infof("uncache %s", fileId)
  117. if downloader, found := rc.downloaders[fileId]; found {
  118. downloader.destroy()
  119. delete(rc.downloaders, fileId)
  120. }
  121. }
  122. func (rc *ReaderCache) destroy() {
  123. rc.Lock()
  124. defer rc.Unlock()
  125. for _, downloader := range rc.downloaders {
  126. downloader.destroy()
  127. }
  128. }
  129. func newSingleChunkCacher(parent *ReaderCache, fileId string, cipherKey []byte, isGzipped bool, chunkSize int, shouldCache bool) *SingleChunkCacher {
  130. return &SingleChunkCacher{
  131. parent: parent,
  132. chunkFileId: fileId,
  133. cipherKey: cipherKey,
  134. isGzipped: isGzipped,
  135. chunkSize: chunkSize,
  136. shouldCache: shouldCache,
  137. cacheStartedCh: make(chan struct{}),
  138. }
  139. }
  140. func (s *SingleChunkCacher) startCaching() {
  141. s.wg.Add(1)
  142. defer s.wg.Done()
  143. s.Lock()
  144. defer s.Unlock()
  145. s.cacheStartedCh <- struct{}{} // means this has been started
  146. urlStrings, err := s.parent.lookupFileIdFn(context.Background(), s.chunkFileId)
  147. if err != nil {
  148. s.err = fmt.Errorf("operation LookupFileId %s failed, err: %v", s.chunkFileId, err)
  149. return
  150. }
  151. s.data = mem.Allocate(s.chunkSize)
  152. _, s.err = util_http.RetriedFetchChunkData(context.Background(), s.data, urlStrings, s.cipherKey, s.isGzipped, true, 0, s.chunkFileId)
  153. if s.err != nil {
  154. mem.Free(s.data)
  155. s.data = nil
  156. return
  157. }
  158. if s.shouldCache {
  159. s.parent.chunkCache.SetChunk(s.chunkFileId, s.data)
  160. }
  161. atomic.StoreInt64(&s.completedTimeNew, time.Now().UnixNano())
  162. return
  163. }
  164. func (s *SingleChunkCacher) destroy() {
  165. // wait for all reads to finish before destroying the data
  166. s.wg.Wait()
  167. s.Lock()
  168. defer s.Unlock()
  169. if s.data != nil {
  170. mem.Free(s.data)
  171. s.data = nil
  172. close(s.cacheStartedCh)
  173. }
  174. }
  175. func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) {
  176. s.wg.Add(1)
  177. defer s.wg.Done()
  178. s.Lock()
  179. defer s.Unlock()
  180. if s.err != nil {
  181. return 0, s.err
  182. }
  183. if len(s.data) <= int(offset) {
  184. return 0, nil
  185. }
  186. return copy(buf, s.data[offset:]), nil
  187. }