package topology

import (
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)

// GetEffectiveAvailableCapacity returns the effective available capacity for a disk.
// This considers BOTH pending and assigned tasks for capacity reservation.
//
// Formula: BaseAvailable - (VolumeSlots + ShardSlots/ShardsPerVolumeSlot) from all tasks
//
// The calculation includes:
//   - Pending tasks: reserve capacity immediately when added
//   - Assigned tasks: continue to reserve capacity during execution
//   - Recently completed tasks: NOT counted against capacity
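//
// For example (illustrative numbers only): a disk with MaxVolumeCount=100 and
// VolumeCount=90 has a base availability of 10 volume slots. If pending and
// assigned tasks together reserve 2 volume slots and 10 shard slots (with,
// say, ShardsPerVolumeSlot=10, i.e. one more volume slot), the value reported
// here is 100 - 90 - (2 + 10/10) = 7.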
func (at *ActiveTopology) GetEffectiveAvailableCapacity(nodeID string, diskID uint32) int64 {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return 0
	}
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return 0
	}

	// Same logic as getEffectiveAvailableCapacityUnsafe, but under the read lock.
	capacity := at.getEffectiveAvailableCapacityUnsafe(disk)
	return int64(capacity.VolumeSlots)
}
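
// A typical caller gates scheduling on this value before assigning work. A
// minimal sketch (node ID, disk ID, and error handling are illustrative, not
// part of this package):
//
//	if at.GetEffectiveAvailableCapacity("10.0.0.1:8080", 0) < 1 {
//		return fmt.Errorf("no effective capacity left on target disk")
//	}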

// GetEffectiveAvailableCapacityDetailed returns detailed available capacity as a StorageSlotChange.
// This provides granular information about available volume slots and shard slots.
func (at *ActiveTopology) GetEffectiveAvailableCapacityDetailed(nodeID string, diskID uint32) StorageSlotChange {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return StorageSlotChange{}
	}
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	return at.getEffectiveAvailableCapacityUnsafe(disk)
}
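
// A sketch of reading the detailed result (values and variable names are
// illustrative):
//
//	detailed := at.GetEffectiveAvailableCapacityDetailed("10.0.0.1:8080", 0)
//	canPlaceVolume := detailed.VolumeSlots >= 1
//	canPlaceEcShard := detailed.ShardSlots >= 1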

// GetEffectiveCapacityImpact returns the StorageSlotChange impact for a disk.
// This shows the net impact from all pending and assigned tasks.
func (at *ActiveTopology) GetEffectiveCapacityImpact(nodeID string, diskID uint32) StorageSlotChange {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return StorageSlotChange{}
	}

	return at.getEffectiveCapacityUnsafe(disk)
}
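
// Unlike the methods above, this returns the raw net impact rather than the
// remaining capacity: assuming destinations carry positive StorageSlotChange
// values (capacity to be consumed) and sources carry negative ones (capacity
// to be freed), a larger positive result means more capacity is reserved.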

// GetDisksWithEffectiveCapacity returns disks with sufficient effective capacity.
// This method considers BOTH pending and assigned tasks for capacity reservation using StorageSlotChange.
//
// Parameters:
//   - taskType: type of task to check compatibility for
//   - excludeNodeID: node to exclude from results
//   - minCapacity: minimum effective capacity required (in volume slots)
//
// Returns DiskInfo objects whose VolumeCount reflects the capacity reserved by all tasks.
func (at *ActiveTopology) GetDisksWithEffectiveCapacity(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	var available []*DiskInfo
	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // skip the excluded node
		}
		if at.isDiskAvailable(disk, taskType) {
			effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)

			// Only include disks that meet the minimum capacity requirement.
			if int64(effectiveCapacity.VolumeSlots) >= minCapacity {
				// Copy the DiskInfo so the caller sees current capacity without
				// mutating the topology's own record.
				diskCopy := DiskInfo{
					NodeID:     disk.DiskInfo.NodeID,
					DiskID:     disk.DiskInfo.DiskID,
					DiskType:   disk.DiskInfo.DiskType,
					DataCenter: disk.DiskInfo.DataCenter,
					Rack:       disk.DiskInfo.Rack,
					LoadCount:  len(disk.pendingTasks) + len(disk.assignedTasks), // count all tasks
				}
				// Create a new protobuf DiskInfo to avoid modifying the original.
				diskInfoCopy := &master_pb.DiskInfo{
					DiskId:            disk.DiskInfo.DiskInfo.DiskId,
					MaxVolumeCount:    disk.DiskInfo.DiskInfo.MaxVolumeCount,
					VolumeCount:       disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(effectiveCapacity.VolumeSlots),
					VolumeInfos:       disk.DiskInfo.DiskInfo.VolumeInfos,
					EcShardInfos:      disk.DiskInfo.DiskInfo.EcShardInfos,
					RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
					ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
					FreeVolumeCount:   disk.DiskInfo.DiskInfo.FreeVolumeCount,
				}
				diskCopy.DiskInfo = diskInfoCopy
				available = append(available, &diskCopy)
			}
		}
	}
	return available
}
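
// A sketch of choosing a destination disk for a task (the taskType and
// sourceNodeID variables and the threshold are illustrative):
//
//	candidates := at.GetDisksWithEffectiveCapacity(taskType, sourceNodeID, 1)
//	if len(candidates) == 0 {
//		return nil // every other disk is excluded, busy, or full
//	}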

// GetDisksForPlanning returns disks considering both active and pending tasks for planning decisions.
// This helps avoid over-scheduling tasks onto the same disk.
func (at *ActiveTopology) GetDisksForPlanning(taskType TaskType, excludeNodeID string, minCapacity int64) []*DiskInfo {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	var available []*DiskInfo
	for _, disk := range at.disks {
		if disk.NodeID == excludeNodeID {
			continue // skip the excluded node
		}
		// Consider both pending and active tasks for scheduling decisions.
		if at.isDiskAvailableForPlanning(disk, taskType) {
			// Check whether the disk can accommodate a new task, given pending tasks.
			planningCapacity := at.getPlanningCapacityUnsafe(disk)
			if int64(planningCapacity.VolumeSlots) >= minCapacity {
				// Copy the DiskInfo so planning data does not mutate the topology's record.
				diskCopy := DiskInfo{
					NodeID:     disk.DiskInfo.NodeID,
					DiskID:     disk.DiskInfo.DiskID,
					DiskType:   disk.DiskInfo.DiskType,
					DataCenter: disk.DiskInfo.DataCenter,
					Rack:       disk.DiskInfo.Rack,
					LoadCount:  len(disk.pendingTasks) + len(disk.assignedTasks),
				}
				// Create a new protobuf DiskInfo to avoid modifying the original.
				diskInfoCopy := &master_pb.DiskInfo{
					DiskId:            disk.DiskInfo.DiskInfo.DiskId,
					MaxVolumeCount:    disk.DiskInfo.DiskInfo.MaxVolumeCount,
					VolumeCount:       disk.DiskInfo.DiskInfo.MaxVolumeCount - int64(planningCapacity.VolumeSlots),
					VolumeInfos:       disk.DiskInfo.DiskInfo.VolumeInfos,
					EcShardInfos:      disk.DiskInfo.DiskInfo.EcShardInfos,
					RemoteVolumeCount: disk.DiskInfo.DiskInfo.RemoteVolumeCount,
					ActiveVolumeCount: disk.DiskInfo.DiskInfo.ActiveVolumeCount,
					FreeVolumeCount:   disk.DiskInfo.DiskInfo.FreeVolumeCount,
				}
				diskCopy.DiskInfo = diskInfoCopy
				available = append(available, &diskCopy)
			}
		}
	}
	return available
}
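
// Note: GetDisksForPlanning mirrors GetDisksWithEffectiveCapacity but applies
// the planning-specific availability check (isDiskAvailableForPlanning) and
// capacity helper (getPlanningCapacityUnsafe); both paths reserve against
// pending as well as assigned tasks, so planners and schedulers see the same load.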

// CanAccommodateTask checks whether a disk can accommodate a new task, considering all constraints.
func (at *ActiveTopology) CanAccommodateTask(nodeID string, diskID uint32, taskType TaskType, volumesNeeded int64) bool {
	at.mutex.RLock()
	defer at.mutex.RUnlock()

	diskKey := fmt.Sprintf("%s:%d", nodeID, diskID)
	disk, exists := at.disks[diskKey]
	if !exists {
		return false
	}

	// Check basic availability (load and task-type conflicts).
	if !at.isDiskAvailable(disk, taskType) {
		return false
	}

	// Check effective capacity.
	effectiveCapacity := at.getEffectiveAvailableCapacityUnsafe(disk)
	return int64(effectiveCapacity.VolumeSlots) >= volumesNeeded
}
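
// A minimal pre-flight sketch before assigning a task (node ID, disk ID, and
// volume count are illustrative):
//
//	if !at.CanAccommodateTask("10.0.0.1:8080", 0, taskType, 1) {
//		return false // disk is missing, conflicted, overloaded, or out of slots
//	}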

// getPlanningCapacityUnsafe considers both pending and active tasks for planning.
func (at *ActiveTopology) getPlanningCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	baseAvailableVolumes := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount

	// Use the centralized helper function to calculate task storage impact.
	totalImpact := at.calculateTaskStorageImpact(disk)

	// Subtract the net impact: a positive impact (capacity consumed) reduces availability.
	availableVolumeSlots := baseAvailableVolumes - totalImpact.ToVolumeSlots()
	if availableVolumeSlots < 0 {
		availableVolumeSlots = 0
	}

	// Return detailed capacity information.
	return StorageSlotChange{
		VolumeSlots: int32(availableVolumeSlots),
		ShardSlots:  -totalImpact.ShardSlots, // available shard capacity (negative impact becomes positive availability)
	}
}

// isDiskAvailableForPlanning checks whether a disk can accept new tasks, considering pending load.
func (at *ActiveTopology) isDiskAvailableForPlanning(disk *activeDisk, taskType TaskType) bool {
	// Check the total load, including pending tasks.
	totalLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
	if totalLoad >= MaxTotalTaskLoadPerDisk {
		return false
	}

	// Check for conflicting task types among assigned tasks only.
	for _, task := range disk.assignedTasks {
		if at.areTaskTypesConflicting(task.TaskType, taskType) {
			return false
		}
	}

	return true
}

// calculateTaskStorageImpact is a helper that sums the storage impact of all
// tasks (pending and assigned) on a given disk. It eliminates code duplication
// between the capacity calculation functions.
func (at *ActiveTopology) calculateTaskStorageImpact(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	totalImpact := StorageSlotChange{}

	// Process both pending and assigned tasks with identical logic.
	taskLists := [][]*taskState{disk.pendingTasks, disk.assignedTasks}
	for _, taskList := range taskLists {
		for _, task := range taskList {
			// Accumulate impact from all source locations on this disk.
			for _, source := range task.Sources {
				if source.SourceServer == disk.NodeID && source.SourceDisk == disk.DiskID {
					totalImpact.AddInPlace(source.StorageChange)
				}
			}
			// Accumulate impact from all destination locations on this disk.
			for _, dest := range task.Destinations {
				if dest.TargetServer == disk.NodeID && dest.TargetDisk == disk.DiskID {
					totalImpact.AddInPlace(dest.StorageChange)
				}
			}
		}
	}
	return totalImpact
}
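
// As an illustration of the accumulation above (the sign convention is assumed
// here, not asserted by this file): a volume move that uses this disk as a
// source would contribute a negative StorageChange (a slot to be freed), while
// one that targets it as a destination would contribute a positive change
// (a slot consumed); a disk serving as both for equal-sized tasks nets to zero.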

// getEffectiveCapacityUnsafe returns the effective capacity impact without locking (for internal use).
// It returns a StorageSlotChange representing the net impact from all tasks.
func (at *ActiveTopology) getEffectiveCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	return at.calculateTaskStorageImpact(disk)
}

// getEffectiveAvailableCapacityUnsafe returns detailed available capacity as a StorageSlotChange.
func (at *ActiveTopology) getEffectiveAvailableCapacityUnsafe(disk *activeDisk) StorageSlotChange {
	if disk.DiskInfo == nil || disk.DiskInfo.DiskInfo == nil {
		return StorageSlotChange{}
	}

	baseAvailable := disk.DiskInfo.DiskInfo.MaxVolumeCount - disk.DiskInfo.DiskInfo.VolumeCount
	netImpact := at.getEffectiveCapacityUnsafe(disk)

	// Subtract the net impact: a positive impact (capacity consumed) reduces availability.
	availableVolumeSlots := baseAvailable - netImpact.ToVolumeSlots()
	if availableVolumeSlots < 0 {
		availableVolumeSlots = 0
	}

	// Return detailed capacity information.
	return StorageSlotChange{
		VolumeSlots: int32(availableVolumeSlots),
		ShardSlots:  -netImpact.ShardSlots, // available shard capacity (negative impact becomes positive availability)
	}
}