node.go

package topology

import (
	"errors"
	"fmt"
	"math/rand/v2"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/stats"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
	"github.com/seaweedfs/seaweedfs/weed/storage/types"
)

type NodeId string

// CapacityReservation represents a temporary reservation of capacity
type CapacityReservation struct {
	reservationId string
	diskType      types.DiskType
	count         int64
	createdAt     time.Time
}

// CapacityReservations manages capacity reservations for a node
type CapacityReservations struct {
	sync.RWMutex
	reservations   map[string]*CapacityReservation
	reservedCounts map[types.DiskType]int64
}

func newCapacityReservations() *CapacityReservations {
	return &CapacityReservations{
		reservations:   make(map[string]*CapacityReservation),
		reservedCounts: make(map[types.DiskType]int64),
	}
}

func (cr *CapacityReservations) addReservation(diskType types.DiskType, count int64) string {
	cr.Lock()
	defer cr.Unlock()
	return cr.doAddReservation(diskType, count)
}

func (cr *CapacityReservations) removeReservation(reservationId string) bool {
	cr.Lock()
	defer cr.Unlock()
	if reservation, exists := cr.reservations[reservationId]; exists {
		delete(cr.reservations, reservationId)
		cr.decrementCount(reservation.diskType, reservation.count)
		return true
	}
	return false
}

func (cr *CapacityReservations) getReservedCount(diskType types.DiskType) int64 {
	cr.RLock()
	defer cr.RUnlock()
	return cr.reservedCounts[diskType]
}

// decrementCount is a helper to decrement reserved count and clean up zero entries
func (cr *CapacityReservations) decrementCount(diskType types.DiskType, count int64) {
	cr.reservedCounts[diskType] -= count
	// Clean up zero counts to prevent map growth
	if cr.reservedCounts[diskType] <= 0 {
		delete(cr.reservedCounts, diskType)
	}
}

// doAddReservation is a helper to add a reservation, assuming the lock is already held
func (cr *CapacityReservations) doAddReservation(diskType types.DiskType, count int64) string {
	now := time.Now()
	reservationId := fmt.Sprintf("%s-%d-%d-%d", diskType, count, now.UnixNano(), rand.Int64())
	cr.reservations[reservationId] = &CapacityReservation{
		reservationId: reservationId,
		diskType:      diskType,
		count:         count,
		createdAt:     now,
	}
	cr.reservedCounts[diskType] += count
	return reservationId
}

// tryReserveAtomic atomically checks available space and reserves if possible
func (cr *CapacityReservations) tryReserveAtomic(diskType types.DiskType, count int64, availableSpaceFunc func() int64) (reservationId string, success bool) {
	cr.Lock()
	defer cr.Unlock()
	// Check available space under lock
	currentReserved := cr.reservedCounts[diskType]
	availableSpace := availableSpaceFunc() - currentReserved
	if availableSpace >= count {
		// Create and add reservation atomically
		return cr.doAddReservation(diskType, count), true
	}
	return "", false
}

func (cr *CapacityReservations) cleanExpiredReservations(expirationDuration time.Duration) {
	cr.Lock()
	defer cr.Unlock()
	now := time.Now()
	for id, reservation := range cr.reservations {
		if now.Sub(reservation.createdAt) > expirationDuration {
			delete(cr.reservations, id)
			cr.decrementCount(reservation.diskType, reservation.count)
			glog.V(1).Infof("Cleaned up expired capacity reservation: %s", id)
		}
	}
}
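
// Illustrative lifecycle (not part of the original file): a caller first does the
// atomic check-and-reserve, then releases the reservation once the real volume exists
// (or on failure); cleanExpiredReservations is only a safety net for callers that never
// release. A minimal sketch, assuming cr is a *CapacityReservations, dt is a
// types.DiskType, and freeSlots is a func() int64 reporting the node's current free
// slots (the last two names are hypothetical):
//
//	if rid, ok := cr.tryReserveAtomic(dt, 1, freeSlots); ok {
//		defer cr.removeReservation(rid)
//		// ... create the volume while the slot is held ...
//	}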

type Node interface {
	Id() NodeId
	String() string
	AvailableSpaceFor(option *VolumeGrowOption) int64
	ReserveOneVolume(r int64, option *VolumeGrowOption) (*DataNode, error)
	ReserveOneVolumeForReservation(r int64, option *VolumeGrowOption) (*DataNode, error)
	UpAdjustDiskUsageDelta(diskType types.DiskType, diskUsage *DiskUsageCounts)
	UpAdjustMaxVolumeId(vid needle.VolumeId)
	GetDiskUsages() *DiskUsages
	// Capacity reservation methods for avoiding race conditions
	TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool)
	ReleaseReservedCapacity(reservationId string)
	AvailableSpaceForReservation(option *VolumeGrowOption) int64
	GetMaxVolumeId() needle.VolumeId
	SetParent(Node)
	LinkChildNode(node Node)
	UnlinkChildNode(nodeId NodeId)
	CollectDeadNodeAndFullVolumes(freshThreshHold int64, volumeSizeLimit uint64, growThreshold float64)
	IsDataNode() bool
	IsRack() bool
	IsDataCenter() bool
	IsLocked() bool
	Children() []Node
	Parent() Node
	GetValue() interface{} //get reference to the topology,dc,rack,datanode
}

type NodeImpl struct {
	diskUsages   *DiskUsages
	id           NodeId
	parent       Node
	sync.RWMutex // lock children
	children     map[NodeId]Node
	maxVolumeId  needle.VolumeId
	//for rack, data center, topology
	nodeType string
	value    interface{}
	// capacity reservations to prevent race conditions during volume creation
	capacityReservations *CapacityReservations
}

func (n *NodeImpl) GetDiskUsages() *DiskUsages {
	return n.diskUsages
}

// the first node must satisfy filterFirstNodeFn(), the rest of the nodes must each have one free slot
func (n *NodeImpl) PickNodesByWeight(numberOfNodes int, option *VolumeGrowOption, filterFirstNodeFn func(dn Node) error) (firstNode Node, restNodes []Node, err error) {
	var totalWeights int64
	var errs []string
	n.RLock()
	candidates := make([]Node, 0, len(n.children))
	candidatesWeights := make([]int64, 0, len(n.children))
	// pick nodes that have enough free volume slots as candidates, and use the free slot count as the node weight
	for _, node := range n.children {
		if node.AvailableSpaceFor(option) <= 0 {
			continue
		}
		totalWeights += node.AvailableSpaceFor(option)
		candidates = append(candidates, node)
		candidatesWeights = append(candidatesWeights, node.AvailableSpaceFor(option))
	}
	n.RUnlock()
	if len(candidates) < numberOfNodes {
		glog.V(0).Infoln(n.Id(), "failed to pick", numberOfNodes, "from", len(candidates), "node candidates")
		return nil, nil, errors.New("Not enough data nodes found!")
	}
	// pick nodes randomly by weight; a node picked earlier had a higher effective weight
	sortedCandidates := make([]Node, 0, len(candidates))
	for i := 0; i < len(candidates); i++ {
		weightsInterval := rand.Int64N(totalWeights)
		lastWeights := int64(0)
		for k, weights := range candidatesWeights {
			if (weightsInterval >= lastWeights) && (weightsInterval < lastWeights+weights) {
				sortedCandidates = append(sortedCandidates, candidates[k])
				candidatesWeights[k] = 0
				totalWeights -= weights
				break
			}
			lastWeights += weights
		}
	}
	restNodes = make([]Node, 0, numberOfNodes-1)
	ret := false
	n.RLock()
	for k, node := range sortedCandidates {
		if err := filterFirstNodeFn(node); err == nil {
			firstNode = node
			if k >= numberOfNodes-1 {
				restNodes = sortedCandidates[:numberOfNodes-1]
			} else {
				restNodes = append(restNodes, sortedCandidates[:k]...)
				restNodes = append(restNodes, sortedCandidates[k+1:numberOfNodes]...)
			}
			ret = true
			break
		} else {
			errs = append(errs, string(node.Id())+":"+err.Error())
		}
	}
	n.RUnlock()
	if !ret {
		return nil, nil, errors.New("No matching data node found! \n" + strings.Join(errs, "\n"))
	}
	return
}
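
// Worked example (illustrative, not in the original file): with three children whose
// free slot counts are 3, 2, and 1, totalWeights is 6, so the first draw selects them
// with probability 3/6, 2/6, and 1/6 respectively. The chosen node's weight is then
// zeroed and totalWeights reduced, so later draws redistribute probability over the
// remaining candidates; nodes with more free slots therefore tend to sort earlier.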

func (n *NodeImpl) IsDataNode() bool {
	return n.nodeType == "DataNode"
}

func (n *NodeImpl) IsRack() bool {
	return n.nodeType == "Rack"
}

func (n *NodeImpl) IsDataCenter() bool {
	return n.nodeType == "DataCenter"
}

func (n *NodeImpl) IsLocked() (isTryLock bool) {
	if isTryLock = n.TryRLock(); isTryLock {
		n.RUnlock()
	}
	return !isTryLock
}

func (n *NodeImpl) String() string {
	if n.parent != nil {
		return n.parent.String() + ":" + string(n.id)
	}
	return string(n.id)
}

func (n *NodeImpl) Id() NodeId {
	return n.id
}

func (n *NodeImpl) getOrCreateDisk(diskType types.DiskType) *DiskUsageCounts {
	return n.diskUsages.getOrCreateDisk(diskType)
}

func (n *NodeImpl) AvailableSpaceFor(option *VolumeGrowOption) int64 {
	t := n.getOrCreateDisk(option.DiskType)
	freeVolumeSlotCount := atomic.LoadInt64(&t.maxVolumeCount) + atomic.LoadInt64(&t.remoteVolumeCount) - atomic.LoadInt64(&t.volumeCount)
	ecShardCount := atomic.LoadInt64(&t.ecShardCount)
	if ecShardCount > 0 {
		freeVolumeSlotCount = freeVolumeSlotCount - ecShardCount/erasure_coding.DataShardsCount - 1
	}
	return freeVolumeSlotCount
}
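
// Worked example (illustrative, not in the original file): with maxVolumeCount=100,
// remoteVolumeCount=0, volumeCount=90, ecShardCount=25, and assuming the package's
// DataShardsCount is 10, the result is (100+0-90) - 25/10 - 1 = 10 - 2 - 1 = 7, i.e.
// EC shards consume whole volume slots (integer division) plus one slot of headroom.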

// AvailableSpaceForReservation returns available space considering existing reservations
func (n *NodeImpl) AvailableSpaceForReservation(option *VolumeGrowOption) int64 {
	baseAvailable := n.AvailableSpaceFor(option)
	reservedCount := n.capacityReservations.getReservedCount(option.DiskType)
	return baseAvailable - reservedCount
}

// TryReserveCapacity attempts to atomically reserve capacity for volume creation
func (n *NodeImpl) TryReserveCapacity(diskType types.DiskType, count int64) (reservationId string, success bool) {
	const reservationTimeout = 5 * time.Minute // TODO: make this configurable

	// Clean up any expired reservations first
	n.capacityReservations.cleanExpiredReservations(reservationTimeout)

	// Atomically check and reserve space
	option := &VolumeGrowOption{DiskType: diskType}
	reservationId, success = n.capacityReservations.tryReserveAtomic(diskType, count, func() int64 {
		return n.AvailableSpaceFor(option)
	})
	if success {
		glog.V(1).Infof("Reserved %d capacity for diskType %s on node %s: %s", count, diskType, n.Id(), reservationId)
	}
	return reservationId, success
}

// ReleaseReservedCapacity releases a previously reserved capacity
func (n *NodeImpl) ReleaseReservedCapacity(reservationId string) {
	if n.capacityReservations.removeReservation(reservationId) {
		glog.V(1).Infof("Released capacity reservation on node %s: %s", n.Id(), reservationId)
	} else {
		glog.V(1).Infof("Attempted to release non-existent reservation on node %s: %s", n.Id(), reservationId)
	}
}
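
// Illustrative usage (not part of the original file): a volume-growth caller would
// typically reserve before creating and release afterwards, so concurrent planners do
// not over-commit the same slots. A minimal sketch, assuming dn is a Node and using a
// disk type constant such as types.HardDriveType:
//
//	if rid, ok := dn.TryReserveCapacity(types.HardDriveType, 1); ok {
//		defer dn.ReleaseReservedCapacity(rid)
//		// ... allocate the volume on dn while the slot is held ...
//	} else {
//		// fall back to another node; nothing was reserved here
//	}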

func (n *NodeImpl) SetParent(node Node) {
	n.parent = node
}

func (n *NodeImpl) Children() (ret []Node) {
	n.RLock()
	defer n.RUnlock()
	for _, c := range n.children {
		ret = append(ret, c)
	}
	return ret
}

func (n *NodeImpl) Parent() Node {
	return n.parent
}

func (n *NodeImpl) GetValue() interface{} {
	return n.value
}

func (n *NodeImpl) ReserveOneVolume(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
	return n.reserveOneVolumeInternal(r, option, false)
}

// ReserveOneVolumeForReservation selects a node using reservation-aware capacity checks
func (n *NodeImpl) ReserveOneVolumeForReservation(r int64, option *VolumeGrowOption) (assignedNode *DataNode, err error) {
	return n.reserveOneVolumeInternal(r, option, true)
}

func (n *NodeImpl) reserveOneVolumeInternal(r int64, option *VolumeGrowOption, useReservations bool) (assignedNode *DataNode, err error) {
	n.RLock()
	defer n.RUnlock()
	for _, node := range n.children {
		var freeSpace int64
		if useReservations {
			freeSpace = node.AvailableSpaceForReservation(option)
		} else {
			freeSpace = node.AvailableSpaceFor(option)
		}
		// fmt.Println("r =", r, ", node =", node, ", freeSpace =", freeSpace)
		if freeSpace <= 0 {
			continue
		}
		if r >= freeSpace {
			r -= freeSpace
		} else {
			var hasSpace bool
			if useReservations {
				hasSpace = node.IsDataNode() && node.AvailableSpaceForReservation(option) > 0
			} else {
				hasSpace = node.IsDataNode() && node.AvailableSpaceFor(option) > 0
			}
			if hasSpace {
				// fmt.Println("vid =", vid, " assigned to node =", node, ", freeSpace =", node.FreeSpace())
				dn := node.(*DataNode)
				if dn.IsTerminating {
					continue
				}
				return dn, nil
			}
			if useReservations {
				assignedNode, err = node.ReserveOneVolumeForReservation(r, option)
			} else {
				assignedNode, err = node.ReserveOneVolume(r, option)
			}
			if err == nil {
				return
			}
		}
	}
	return nil, errors.New("No free volume slot found!")
}
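
// Worked example (illustrative, not in the original file): r is a random offset into
// the subtree's total free slots. With children exposing 5, 3, and 2 free slots and
// r=6, the walk subtracts 5 at the first child (r becomes 1), then lands inside the
// second child (1 < 3); that child is returned directly if it is a non-terminating
// data node, otherwise the walk recurses into it with the remaining offset.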

func (n *NodeImpl) UpAdjustDiskUsageDelta(diskType types.DiskType, diskUsage *DiskUsageCounts) { //can be negative
	existingDisk := n.getOrCreateDisk(diskType)
	existingDisk.addDiskUsageCounts(diskUsage)
	if n.parent != nil {
		n.parent.UpAdjustDiskUsageDelta(diskType, diskUsage)
	}
}

func (n *NodeImpl) UpAdjustMaxVolumeId(vid needle.VolumeId) {
	if n.maxVolumeId < vid {
		n.maxVolumeId = vid
		if n.parent != nil {
			n.parent.UpAdjustMaxVolumeId(vid)
		}
	}
}

func (n *NodeImpl) GetMaxVolumeId() needle.VolumeId {
	return n.maxVolumeId
}

func (n *NodeImpl) LinkChildNode(node Node) {
	n.Lock()
	defer n.Unlock()
	n.doLinkChildNode(node)
}

func (n *NodeImpl) doLinkChildNode(node Node) {
	if n.children[node.Id()] == nil {
		n.children[node.Id()] = node
		for dt, du := range node.GetDiskUsages().usages {
			n.UpAdjustDiskUsageDelta(dt, du)
		}
		n.UpAdjustMaxVolumeId(node.GetMaxVolumeId())
		node.SetParent(n)
		glog.V(0).Infoln(n, "adds child", node.Id())
	}
}

func (n *NodeImpl) UnlinkChildNode(nodeId NodeId) {
	n.Lock()
	defer n.Unlock()
	node := n.children[nodeId]
	if node != nil {
		node.SetParent(nil)
		delete(n.children, node.Id())
		for dt, du := range node.GetDiskUsages().negative().usages {
			n.UpAdjustDiskUsageDelta(dt, du)
		}
		glog.V(0).Infoln(n, "removes", node.Id())
	}
}
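
// Illustrative note (not part of the original file): linking and unlinking keep ancestor
// aggregates consistent. For example, linking a data node that reports 8 volumes on a
// disk type adds 8 to the counts of its rack, data center, and topology root via the
// recursive UpAdjustDiskUsageDelta; unlinking the same node applies negative() of its
// usage, subtracting those 8 again all the way up.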

func (n *NodeImpl) CollectDeadNodeAndFullVolumes(freshThreshHoldUnixTime int64, volumeSizeLimit uint64, growThreshold float64) {
	if n.IsRack() {
		for _, c := range n.Children() {
			dn := c.(*DataNode) //can not cast n to DataNode
			for _, v := range dn.GetVolumes() {
				topo := n.GetTopology()
				diskType := types.ToDiskType(v.DiskType)
				vl := topo.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
				if v.Size >= volumeSizeLimit {
					vl.accessLock.RLock()
					vacuumTime, ok := vl.vacuumedVolumes[v.Id]
					vl.accessLock.RUnlock()
					// If a volume has been vacuumed in the past 20 seconds, do not check whether it has reached full capacity.
					// After 20s (the gRPC timeout), theoretically all volume server heartbeats have reached the master,
					// so the volume size should be correct, not the size before the vacuum.
					if !ok || time.Now().Add(-20*time.Second).After(vacuumTime) {
						//fmt.Println("volume",v.Id,"size",v.Size,">",volumeSizeLimit)
						topo.chanFullVolumes <- v
					}
				} else if float64(v.Size) > float64(volumeSizeLimit)*growThreshold {
					topo.chanCrowdedVolumes <- v
				}
				copyCount := v.ReplicaPlacement.GetCopyCount()
				if copyCount > 1 {
					if copyCount > len(topo.Lookup(v.Collection, v.Id)) {
						stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(1)
					} else {
						stats.MasterReplicaPlacementMismatch.WithLabelValues(v.Collection, v.Id.String()).Set(0)
					}
				}
			}
		}
	} else {
		for _, c := range n.Children() {
			c.CollectDeadNodeAndFullVolumes(freshThreshHoldUnixTime, volumeSizeLimit, growThreshold)
		}
	}
}

func (n *NodeImpl) GetTopology() *Topology {
	var p Node
	p = n
	for p.Parent() != nil {
		p = p.Parent()
	}
	return p.GetValue().(*Topology)
}