// Package chunk_cache provides a tiered cache (in-memory plus layered
// on-disk storage) for recently accessed file chunks.
  1. package chunk_cache
  2. import (
  3. "encoding/binary"
  4. "errors"
  5. "sync"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  9. )
// ErrorOutOfBounds signals an attempt to read out of bounds.
// NOTE(review): not referenced in this file — presumably returned by cache
// readers elsewhere in the package; confirm before removing.
var ErrorOutOfBounds = errors.New("attempt to read out of bounds")

// cacheHeaderSize is the fixed length of the per-chunk disk-cache header.
const cacheHeaderSize = 8 // 4 bytes volumeId + 4 bytes cookie
  12. // parseCacheHeader extracts volume ID and cookie from the 8-byte cache header
  13. func parseCacheHeader(header []byte) (needle.VolumeId, types.Cookie) {
  14. volumeId := needle.VolumeId(binary.BigEndian.Uint32(header[0:4]))
  15. cookie := types.BytesToCookie(header[4:8])
  16. return volumeId, cookie
  17. }
// ChunkCache abstracts a cache of recently accessed file chunks, keyed by
// file id string.
type ChunkCache interface {
	// ReadChunkAt copies cached bytes for fileId into data starting at
	// offset and returns the number of bytes copied.
	ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error)
	// SetChunk stores the full chunk content under fileId.
	SetChunk(fileId string, data []byte)
	// IsInCache reports whether fileId is present in any tier; lockNeeded
	// controls whether the implementation acquires its own lock.
	IsInCache(fileId string, lockNeeded bool) (answer bool)
	// GetMaxFilePartSizeInCache returns the largest part size the cache
	// will store.
	GetMaxFilePartSizeInCache() (answer uint64)
}
// TieredChunkCache is a global cache for recently accessed file chunks.
// Small chunks (up to onDiskCacheSizeLimit0) also live in memory; all
// chunks are written to one of three on-disk layers selected by size.
type TieredChunkCache struct {
	memCache   *ChunkCacheInMemory // in-memory tier for small chunks
	diskCaches []*OnDiskCacheLayer // three disk tiers, index 0..2
	sync.RWMutex
	onDiskCacheSizeLimit0  uint64 // size threshold for disk tier 0 (and memory)
	onDiskCacheSizeLimit1  uint64 // size threshold for disk tier 1
	onDiskCacheSizeLimit2  uint64 // set in NewTieredChunkCache; not consulted in this file's read path
	maxFilePartSizeInCache uint64 // upper bound on any cached file part
}

// Compile-time check that TieredChunkCache satisfies ChunkCache.
var _ ChunkCache = &TieredChunkCache{}
  35. func NewTieredChunkCache(maxEntries int64, dir string, diskSizeInUnit int64, unitSize int64) *TieredChunkCache {
  36. c := &TieredChunkCache{
  37. memCache: NewChunkCacheInMemory(maxEntries),
  38. }
  39. c.diskCaches = make([]*OnDiskCacheLayer, 3)
  40. c.onDiskCacheSizeLimit0 = uint64(unitSize)
  41. c.onDiskCacheSizeLimit1 = 4 * c.onDiskCacheSizeLimit0
  42. c.onDiskCacheSizeLimit2 = 2 * c.onDiskCacheSizeLimit1
  43. c.diskCaches[0] = NewOnDiskCacheLayer(dir, "c0_2", diskSizeInUnit*unitSize/8, 2)
  44. c.diskCaches[1] = NewOnDiskCacheLayer(dir, "c1_3", diskSizeInUnit*unitSize/4+diskSizeInUnit*unitSize/8, 3)
  45. c.diskCaches[2] = NewOnDiskCacheLayer(dir, "c2_2", diskSizeInUnit*unitSize/2, 2)
  46. c.maxFilePartSizeInCache = uint64(unitSize*diskSizeInUnit) / 4
  47. return c
  48. }
  49. func (c *TieredChunkCache) GetMaxFilePartSizeInCache() (answer uint64) {
  50. if c == nil {
  51. return 0
  52. }
  53. return c.maxFilePartSizeInCache
  54. }
  55. func (c *TieredChunkCache) IsInCache(fileId string, lockNeeded bool) (answer bool) {
  56. if c == nil {
  57. return false
  58. }
  59. if lockNeeded {
  60. c.RLock()
  61. defer c.RUnlock()
  62. }
  63. item := c.memCache.cache.Get(fileId)
  64. if item != nil {
  65. glog.V(4).Infof("fileId %s is in memcache", fileId)
  66. return true
  67. }
  68. fid, err := needle.ParseFileIdFromString(fileId)
  69. if err != nil {
  70. glog.V(4).Infof("failed to parse file id %s", fileId)
  71. return false
  72. }
  73. // Check disk cache with volume ID and cookie validation
  74. for i, diskCacheLayer := range c.diskCaches {
  75. for k, v := range diskCacheLayer.diskCaches {
  76. if nv, ok := v.nm.Get(fid.Key); ok {
  77. // Read cache header to check volume ID and cookie
  78. headerBytes := make([]byte, cacheHeaderSize)
  79. if readN, readErr := v.DataBackend.ReadAt(headerBytes, nv.Offset.ToActualOffset()); readErr == nil && readN == cacheHeaderSize {
  80. // Parse volume ID and cookie from header
  81. storedVolumeId, storedCookie := parseCacheHeader(headerBytes)
  82. if storedVolumeId == fid.VolumeId && storedCookie == fid.Cookie {
  83. glog.V(4).Infof("fileId %s is in diskCaches[%d].volume[%d]", fileId, i, k)
  84. return true
  85. }
  86. glog.V(4).Infof("fileId %s header mismatch in diskCaches[%d].volume[%d]: stored volume %d cookie %x, expected volume %d cookie %x",
  87. fileId, i, k, storedVolumeId, storedCookie, fid.VolumeId, fid.Cookie)
  88. }
  89. }
  90. }
  91. }
  92. return false
  93. }
  94. func (c *TieredChunkCache) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) {
  95. if c == nil {
  96. return 0, nil
  97. }
  98. c.RLock()
  99. defer c.RUnlock()
  100. minSize := offset + uint64(len(data))
  101. if minSize <= c.onDiskCacheSizeLimit0 {
  102. n, err = c.memCache.readChunkAt(data, fileId, offset)
  103. if err != nil {
  104. glog.Errorf("failed to read from memcache: %s", err)
  105. }
  106. if n == int(len(data)) {
  107. return n, nil
  108. }
  109. }
  110. fid, err := needle.ParseFileIdFromString(fileId)
  111. if err != nil {
  112. glog.Errorf("failed to parse file id %s", fileId)
  113. return 0, nil
  114. }
  115. // Try disk caches with volume ID and cookie validation
  116. if minSize <= c.onDiskCacheSizeLimit0 {
  117. n, err = c.readChunkAtWithHeaderValidation(data, fid, offset, 0)
  118. if n == int(len(data)) {
  119. return
  120. }
  121. }
  122. if minSize <= c.onDiskCacheSizeLimit1 {
  123. n, err = c.readChunkAtWithHeaderValidation(data, fid, offset, 1)
  124. if n == int(len(data)) {
  125. return
  126. }
  127. }
  128. {
  129. n, err = c.readChunkAtWithHeaderValidation(data, fid, offset, 2)
  130. if n == int(len(data)) {
  131. return
  132. }
  133. }
  134. return 0, nil
  135. }
  136. func (c *TieredChunkCache) SetChunk(fileId string, data []byte) {
  137. if c == nil {
  138. return
  139. }
  140. c.Lock()
  141. defer c.Unlock()
  142. glog.V(4).Infof("SetChunk %s size %d\n", fileId, len(data))
  143. if c.IsInCache(fileId, false) {
  144. glog.V(4).Infof("fileId %s is already in cache", fileId)
  145. return
  146. }
  147. c.doSetChunk(fileId, data)
  148. }
  149. func (c *TieredChunkCache) doSetChunk(fileId string, data []byte) {
  150. // Disk cache format: [4-byte volumeId][4-byte cookie][chunk data]
  151. // Memory cache format: full fileId as key -> raw data (unchanged)
  152. // Memory cache unchanged - uses full fileId
  153. if len(data) <= int(c.onDiskCacheSizeLimit0) {
  154. c.memCache.SetChunk(fileId, data)
  155. }
  156. fid, err := needle.ParseFileIdFromString(fileId)
  157. if err != nil {
  158. glog.Errorf("failed to parse file id %s", fileId)
  159. return
  160. }
  161. // Prepend volume ID and cookie to data for disk cache
  162. // Format: [4-byte volumeId][4-byte cookie][chunk data]
  163. headerBytes := make([]byte, cacheHeaderSize)
  164. // Store volume ID in first 4 bytes using big-endian
  165. binary.BigEndian.PutUint32(headerBytes[0:4], uint32(fid.VolumeId))
  166. // Store cookie in next 4 bytes
  167. types.CookieToBytes(headerBytes[4:8], fid.Cookie)
  168. dataWithHeader := append(headerBytes, data...)
  169. // Store with volume ID and cookie header in disk cache
  170. if len(data) <= int(c.onDiskCacheSizeLimit0) {
  171. c.diskCaches[0].setChunk(fid.Key, dataWithHeader)
  172. } else if len(data) <= int(c.onDiskCacheSizeLimit1) {
  173. c.diskCaches[1].setChunk(fid.Key, dataWithHeader)
  174. } else {
  175. c.diskCaches[2].setChunk(fid.Key, dataWithHeader)
  176. }
  177. }
  178. func (c *TieredChunkCache) Shutdown() {
  179. if c == nil {
  180. return
  181. }
  182. c.Lock()
  183. defer c.Unlock()
  184. for _, diskCache := range c.diskCaches {
  185. diskCache.shutdown()
  186. }
  187. }
  188. // readChunkAtWithHeaderValidation reads from disk cache with volume ID and cookie validation
  189. func (c *TieredChunkCache) readChunkAtWithHeaderValidation(data []byte, fid *needle.FileId, offset uint64, cacheLevel int) (n int, err error) {
  190. // Step 1: Read and validate header (volume ID + cookie)
  191. headerBuffer := make([]byte, cacheHeaderSize)
  192. headerRead, err := c.diskCaches[cacheLevel].readChunkAt(headerBuffer, fid.Key, 0)
  193. if err != nil {
  194. glog.V(4).Infof("failed to read header for %s from cache level %d: %v",
  195. fid.String(), cacheLevel, err)
  196. return 0, err
  197. }
  198. if headerRead < cacheHeaderSize {
  199. glog.V(4).Infof("insufficient data for header validation for %s from cache level %d: read %d bytes",
  200. fid.String(), cacheLevel, headerRead)
  201. return 0, nil // Not enough data for header - likely old format, treat as cache miss
  202. }
  203. // Parse volume ID and cookie from header
  204. storedVolumeId, storedCookie := parseCacheHeader(headerBuffer)
  205. // Validate both volume ID and cookie
  206. if storedVolumeId != fid.VolumeId || storedCookie != fid.Cookie {
  207. glog.V(4).Infof("header mismatch for %s in cache level %d: stored volume %d cookie %x, expected volume %d cookie %x (possible old format)",
  208. fid.String(), cacheLevel, storedVolumeId, storedCookie, fid.VolumeId, fid.Cookie)
  209. return 0, nil // Treat as cache miss - could be old format or actual mismatch
  210. }
  211. // Step 2: Read actual data from the offset position (after header)
  212. // The disk cache has format: [4-byte volumeId][4-byte cookie][actual chunk data]
  213. // We want to read from position: cacheHeaderSize + offset
  214. dataOffset := cacheHeaderSize + offset
  215. n, err = c.diskCaches[cacheLevel].readChunkAt(data, fid.Key, dataOffset)
  216. if err != nil {
  217. glog.V(4).Infof("failed to read data at offset %d for %s from cache level %d: %v",
  218. offset, fid.String(), cacheLevel, err)
  219. return 0, err
  220. }
  221. return n, nil
  222. }
  223. func min(x, y int) int {
  224. if x < y {
  225. return x
  226. }
  227. return y
  228. }