compact_map.go 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332
  1. package needle_map
  2. import (
  3. "sort"
  4. "sync"
  5. . "github.com/seaweedfs/seaweedfs/weed/storage/types"
  6. new_map "github.com/seaweedfs/seaweedfs/weed/storage/needle_map"
  7. )
const (
	// MaxSectionBucketSize is the maximum number of entries a section keeps in
	// its sorted values slice; CompactSection.Set sends new keys to the
	// overflow list once the slice has reached this length.
	MaxSectionBucketSize = 1024 * 8
	// LookBackWindowSize is how many entries to look back when inserting into
	// a section: an out-of-order key is inserted into values only if it fits
	// within this many trailing entries.
	LookBackWindowSize = 1024
)
// SectionalNeedleId is a needle key stored relative to the start key of the
// section that holds it (key - section start), narrowed to 32 bits.
type SectionalNeedleId uint32

// SectionalNeedleIdLimit is the largest distance from a section's start key
// that CompactMap.Set allows before it opens a new section.
const SectionalNeedleIdLimit = 1<<32 - 1
// SectionalNeedleValue is a single entry of a CompactSection: a
// section-relative key together with the offset (kept as separate lower and
// higher parts) and the size recorded for that key.
type SectionalNeedleValue struct {
	Key          SectionalNeedleId
	OffsetLower  OffsetLower `comment:"Volume offset"` // since aligned to 8 bytes, range is 4G*8=32G
	Size         Size        `comment:"Size of the data portion"`
	OffsetHigher OffsetHigher
}
// CompactSection stores the entries for one range of needle keys, addressed
// by their distance from start. The embedded RWMutex is taken by the exported
// Set, Delete and Get methods; the unexported helpers expect the caller to
// already hold it.
type CompactSection struct {
	sync.RWMutex
	values   []SectionalNeedleValue // sorted by Key; grows up to MaxSectionBucketSize entries
	overflow Overflow               // sorted by Key; entries Set could not place in values
	start    NeedleId               // key that all section-relative keys are measured from
	end      NeedleId               // largest key passed to Set so far
}
// Overflow is the list of section entries kept outside the main values slice.
// setOverflowEntry inserts at the position found by binary search, which
// keeps the list ordered by Key.
type Overflow []SectionalNeedleValue
  28. func NewCompactSection(start NeedleId) *CompactSection {
  29. return &CompactSection{
  30. values: make([]SectionalNeedleValue, 0),
  31. overflow: Overflow(make([]SectionalNeedleValue, 0)),
  32. start: start,
  33. }
  34. }
  35. // return old entry size
  36. func (cs *CompactSection) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) {
  37. cs.Lock()
  38. defer cs.Unlock()
  39. if key > cs.end {
  40. cs.end = key
  41. }
  42. skey := SectionalNeedleId(key - cs.start)
  43. if i := cs.binarySearchValues(skey); i >= 0 {
  44. // update
  45. oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = cs.values[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size
  46. cs.values[i].OffsetHigher, cs.values[i].OffsetLower, cs.values[i].Size = offset.OffsetHigher, offset.OffsetLower, size
  47. return
  48. }
  49. var lkey SectionalNeedleId
  50. if len(cs.values) > 0 {
  51. lkey = cs.values[len(cs.values)-1].Key
  52. }
  53. hasAdded := false
  54. switch {
  55. case len(cs.values) < MaxSectionBucketSize && lkey <= skey:
  56. // non-overflow insert
  57. cs.values = append(cs.values, SectionalNeedleValue{
  58. Key: skey,
  59. OffsetLower: offset.OffsetLower,
  60. Size: size,
  61. OffsetHigher: offset.OffsetHigher,
  62. })
  63. hasAdded = true
  64. case len(cs.values) < MaxSectionBucketSize:
  65. // still has capacity and only partially out of order
  66. lookBackIndex := len(cs.values) - LookBackWindowSize
  67. if lookBackIndex < 0 {
  68. lookBackIndex = 0
  69. }
  70. if cs.values[lookBackIndex].Key <= skey {
  71. for ; lookBackIndex < len(cs.values); lookBackIndex++ {
  72. if cs.values[lookBackIndex].Key >= skey {
  73. break
  74. }
  75. }
  76. cs.values = append(cs.values, SectionalNeedleValue{})
  77. copy(cs.values[lookBackIndex+1:], cs.values[lookBackIndex:])
  78. cs.values[lookBackIndex].Key, cs.values[lookBackIndex].Size = skey, size
  79. cs.values[lookBackIndex].OffsetLower, cs.values[lookBackIndex].OffsetHigher = offset.OffsetLower, offset.OffsetHigher
  80. hasAdded = true
  81. }
  82. }
  83. // overflow insert
  84. if !hasAdded {
  85. if oldValue, found := cs.findOverflowEntry(skey); found {
  86. oldOffset.OffsetHigher, oldOffset.OffsetLower, oldSize = oldValue.OffsetHigher, oldValue.OffsetLower, oldValue.Size
  87. }
  88. cs.setOverflowEntry(skey, offset, size)
  89. } else {
  90. // if we maxed out our values bucket, pin its capacity to minimize memory usage
  91. if len(cs.values) == MaxSectionBucketSize {
  92. bucket := make([]SectionalNeedleValue, len(cs.values))
  93. copy(bucket, cs.values)
  94. cs.values = bucket
  95. }
  96. }
  97. return
  98. }
  99. func (cs *CompactSection) setOverflowEntry(skey SectionalNeedleId, offset Offset, size Size) {
  100. needleValue := SectionalNeedleValue{Key: skey, OffsetLower: offset.OffsetLower, Size: size, OffsetHigher: offset.OffsetHigher}
  101. insertCandidate := sort.Search(len(cs.overflow), func(i int) bool {
  102. return cs.overflow[i].Key >= needleValue.Key
  103. })
  104. if insertCandidate != len(cs.overflow) && cs.overflow[insertCandidate].Key == needleValue.Key {
  105. cs.overflow[insertCandidate] = needleValue
  106. return
  107. }
  108. cs.overflow = append(cs.overflow, SectionalNeedleValue{})
  109. copy(cs.overflow[insertCandidate+1:], cs.overflow[insertCandidate:])
  110. cs.overflow[insertCandidate] = needleValue
  111. }
  112. func (cs *CompactSection) findOverflowEntry(key SectionalNeedleId) (nv SectionalNeedleValue, found bool) {
  113. foundCandidate := sort.Search(len(cs.overflow), func(i int) bool {
  114. return cs.overflow[i].Key >= key
  115. })
  116. if foundCandidate != len(cs.overflow) && cs.overflow[foundCandidate].Key == key {
  117. return cs.overflow[foundCandidate], true
  118. }
  119. return nv, false
  120. }
  121. func (cs *CompactSection) deleteOverflowEntry(key SectionalNeedleId) {
  122. length := len(cs.overflow)
  123. deleteCandidate := sort.Search(length, func(i int) bool {
  124. return cs.overflow[i].Key >= key
  125. })
  126. if deleteCandidate != length && cs.overflow[deleteCandidate].Key == key {
  127. if cs.overflow[deleteCandidate].Size.IsValid() {
  128. cs.overflow[deleteCandidate].Size = -cs.overflow[deleteCandidate].Size
  129. }
  130. }
  131. }
  132. // return old entry size
  133. func (cs *CompactSection) Delete(key NeedleId) Size {
  134. cs.Lock()
  135. defer cs.Unlock()
  136. ret := Size(0)
  137. if key > cs.end {
  138. return ret
  139. }
  140. skey := SectionalNeedleId(key - cs.start)
  141. if i := cs.binarySearchValues(skey); i >= 0 {
  142. if cs.values[i].Size > 0 && cs.values[i].Size.IsValid() {
  143. ret = cs.values[i].Size
  144. cs.values[i].Size = -cs.values[i].Size
  145. }
  146. }
  147. if v, found := cs.findOverflowEntry(skey); found {
  148. cs.deleteOverflowEntry(skey)
  149. ret = v.Size
  150. }
  151. return ret
  152. }
  153. func (cs *CompactSection) Get(key NeedleId) (*new_map.NeedleValue, bool) {
  154. cs.RLock()
  155. defer cs.RUnlock()
  156. if key > cs.end {
  157. return nil, false
  158. }
  159. skey := SectionalNeedleId(key - cs.start)
  160. if v, ok := cs.findOverflowEntry(skey); ok {
  161. nv := toNeedleValue(v, cs)
  162. return &nv, true
  163. }
  164. if i := cs.binarySearchValues(skey); i >= 0 {
  165. nv := toNeedleValue(cs.values[i], cs)
  166. return &nv, true
  167. }
  168. return nil, false
  169. }
  170. func (cs *CompactSection) binarySearchValues(key SectionalNeedleId) int {
  171. x := sort.Search(len(cs.values), func(i int) bool {
  172. return cs.values[i].Key >= key
  173. })
  174. if x >= len(cs.values) {
  175. return -1
  176. }
  177. if cs.values[x].Key > key {
  178. return -2
  179. }
  180. return x
  181. }
// CompactMap is a needle map split into CompactSections that are kept ordered
// by their start key. This map assumes mostly inserting increasing keys.
//
// NOTE(review): list itself is modified by Set without any locking visible in
// this file; callers presumably serialize access — confirm.
type CompactMap struct {
	list []*CompactSection // sections in ascending order of start
}
  187. func NewCompactMap() *CompactMap {
  188. return &CompactMap{}
  189. }
  190. func (cm *CompactMap) Set(key NeedleId, offset Offset, size Size) (oldOffset Offset, oldSize Size) {
  191. x := cm.binarySearchCompactSection(key)
  192. if x < 0 || (key-cm.list[x].start) > SectionalNeedleIdLimit {
  193. // println(x, "adding to existing", len(cm.list), "sections, starting", key)
  194. cs := NewCompactSection(key)
  195. cm.list = append(cm.list, cs)
  196. x = len(cm.list) - 1
  197. //keep compact section sorted by start
  198. for x >= 0 {
  199. if x > 0 && cm.list[x-1].start > key {
  200. cm.list[x] = cm.list[x-1]
  201. // println("shift", x, "start", cs.start, "to", x-1)
  202. x = x - 1
  203. } else {
  204. cm.list[x] = cs
  205. // println("cs", x, "start", cs.start)
  206. break
  207. }
  208. }
  209. }
  210. // println(key, "set to section[", x, "].start", cm.list[x].start)
  211. return cm.list[x].Set(key, offset, size)
  212. }
  213. func (cm *CompactMap) Delete(key NeedleId) Size {
  214. x := cm.binarySearchCompactSection(key)
  215. if x < 0 {
  216. return Size(0)
  217. }
  218. return cm.list[x].Delete(key)
  219. }
  220. func (cm *CompactMap) Get(key NeedleId) (*new_map.NeedleValue, bool) {
  221. x := cm.binarySearchCompactSection(key)
  222. if x < 0 {
  223. return nil, false
  224. }
  225. return cm.list[x].Get(key)
  226. }
  227. func (cm *CompactMap) binarySearchCompactSection(key NeedleId) int {
  228. l, h := 0, len(cm.list)-1
  229. if h < 0 {
  230. return -5
  231. }
  232. if cm.list[h].start <= key {
  233. if len(cm.list[h].values) < MaxSectionBucketSize || key <= cm.list[h].end {
  234. return h
  235. }
  236. return -4
  237. }
  238. for l <= h {
  239. m := (l + h) / 2
  240. if key < cm.list[m].start {
  241. h = m - 1
  242. } else { // cm.list[m].start <= key
  243. if cm.list[m+1].start <= key {
  244. l = m + 1
  245. } else {
  246. return m
  247. }
  248. }
  249. }
  250. return -3
  251. }
  252. // Visit visits all entries or stop if any error when visiting
  253. func (cm *CompactMap) AscendingVisit(visit func(new_map.NeedleValue) error) error {
  254. for _, cs := range cm.list {
  255. cs.RLock()
  256. var i, j int
  257. for i, j = 0, 0; i < len(cs.overflow) && j < len(cs.values); {
  258. if cs.overflow[i].Key < cs.values[j].Key {
  259. if err := visit(toNeedleValue(cs.overflow[i], cs)); err != nil {
  260. cs.RUnlock()
  261. return err
  262. }
  263. i++
  264. } else if cs.overflow[i].Key == cs.values[j].Key {
  265. j++
  266. } else {
  267. if err := visit(toNeedleValue(cs.values[j], cs)); err != nil {
  268. cs.RUnlock()
  269. return err
  270. }
  271. j++
  272. }
  273. }
  274. for ; i < len(cs.overflow); i++ {
  275. if err := visit(toNeedleValue(cs.overflow[i], cs)); err != nil {
  276. cs.RUnlock()
  277. return err
  278. }
  279. }
  280. for ; j < len(cs.values); j++ {
  281. if err := visit(toNeedleValue(cs.values[j], cs)); err != nil {
  282. cs.RUnlock()
  283. return err
  284. }
  285. }
  286. cs.RUnlock()
  287. }
  288. return nil
  289. }
  290. func toNeedleValue(snv SectionalNeedleValue, cs *CompactSection) new_map.NeedleValue {
  291. offset := Offset{
  292. OffsetHigher: snv.OffsetHigher,
  293. OffsetLower: snv.OffsetLower,
  294. }
  295. return new_map.NeedleValue{Key: NeedleId(snv.Key) + cs.start, Offset: offset, Size: snv.Size}
  296. }
  297. func toSectionalNeedleValue(nv new_map.NeedleValue, cs *CompactSection) SectionalNeedleValue {
  298. return SectionalNeedleValue{
  299. Key: SectionalNeedleId(nv.Key - cs.start),
  300. OffsetLower: nv.Offset.OffsetLower,
  301. Size: nv.Size,
  302. OffsetHigher: nv.Offset.OffsetHigher,
  303. }
  304. }