detection.go

package balance

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)
// Detection implements the detection logic for balance tasks
func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
	if !config.IsEnabled() {
		return nil, nil
	}
	balanceConfig := config.(*Config)

	// Skip if the cluster is too small
	minVolumeCount := 2 // more reasonable threshold for small clusters
	if len(metrics) < minVolumeCount {
		glog.Infof("BALANCE: No tasks created - cluster too small (%d volumes, need ≥%d)", len(metrics), minVolumeCount)
		return nil, nil
	}
	// Analyze volume distribution across servers
	serverVolumeCounts := make(map[string]int)
	for _, metric := range metrics {
		serverVolumeCounts[metric.Server]++
	}
	if len(serverVolumeCounts) < balanceConfig.MinServerCount {
		glog.Infof("BALANCE: No tasks created - too few servers (%d servers, need ≥%d)", len(serverVolumeCounts), balanceConfig.MinServerCount)
		return nil, nil
	}
	// Calculate balance metrics
	totalVolumes := len(metrics)
	avgVolumesPerServer := float64(totalVolumes) / float64(len(serverVolumeCounts))

	maxVolumes := 0
	minVolumes := totalVolumes
	maxServer := ""
	minServer := ""
	for server, count := range serverVolumeCounts {
		if count > maxVolumes {
			maxVolumes = count
			maxServer = server
		}
		if count < minVolumes {
			minVolumes = count
			minServer = server
		}
	}
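
	// Illustrative example (not from this codebase): three servers holding 10, 6, and 2
	// volumes give avgVolumesPerServer = 6.0 and imbalanceRatio = (10-2)/6.0 ≈ 1.33,
	// i.e. a 133% imbalance, which the check below compares against ImbalanceThreshold.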
	// Check if imbalance exceeds threshold
	imbalanceRatio := float64(maxVolumes-minVolumes) / avgVolumesPerServer
	if imbalanceRatio <= balanceConfig.ImbalanceThreshold {
		glog.Infof("BALANCE: No tasks created - cluster well balanced. Imbalance=%.1f%% (threshold=%.1f%%). Max=%d volumes on %s, Min=%d on %s, Avg=%.1f",
			imbalanceRatio*100, balanceConfig.ImbalanceThreshold*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)
		return nil, nil
	}
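
	// Note: the selection below simply takes the first metric reported for the
	// overloaded server; no preference by volume size, age, or collection is applied.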
	// Select a volume from the overloaded server for balance
	var selectedVolume *types.VolumeHealthMetrics
	for _, metric := range metrics {
		if metric.Server == maxServer {
			selectedVolume = metric
			break
		}
	}
	if selectedVolume == nil {
		glog.Warningf("BALANCE: Could not find volume on overloaded server %s", maxServer)
		return nil, nil
	}
	// Create balance task with volume and destination planning info
	reason := fmt.Sprintf("Cluster imbalance detected: %.1f%% (max: %d on %s, min: %d on %s, avg: %.1f)",
		imbalanceRatio*100, maxVolumes, maxServer, minVolumes, minServer, avgVolumesPerServer)

	// Generate task ID for ActiveTopology integration
	taskID := fmt.Sprintf("balance_vol_%d_%d", selectedVolume.VolumeID, time.Now().Unix())

	task := &types.TaskDetectionResult{
		TaskID:     taskID, // Link to ActiveTopology pending task
		TaskType:   types.TaskTypeBalance,
		VolumeID:   selectedVolume.VolumeID,
		Server:     selectedVolume.Server,
		Collection: selectedVolume.Collection,
		Priority:   types.TaskPriorityNormal,
		Reason:     reason,
		ScheduleAt: time.Now(),
	}
	// Plan destination if ActiveTopology is available
	if clusterInfo.ActiveTopology != nil {
		destinationPlan, err := planBalanceDestination(clusterInfo.ActiveTopology, selectedVolume)
		if err != nil {
			glog.Warningf("Failed to plan balance destination for volume %d: %v", selectedVolume.VolumeID, err)
			return nil, nil // Skip this task if destination planning fails
		}

		// Find the actual disk containing the volume on the source server
		sourceDisk, found := base.FindVolumeDisk(clusterInfo.ActiveTopology, selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
		if !found {
			return nil, fmt.Errorf("BALANCE: Could not find volume %d (collection: %s) on source server %s - unable to create balance task",
				selectedVolume.VolumeID, selectedVolume.Collection, selectedVolume.Server)
		}
		// Create typed parameters with unified source and target information
		task.TypedParams = &worker_pb.TaskParams{
			TaskId:     taskID, // Link to ActiveTopology pending task
			VolumeId:   selectedVolume.VolumeID,
			Collection: selectedVolume.Collection,
			VolumeSize: selectedVolume.Size, // Store original volume size for tracking changes

			// Unified sources and targets - the only way to specify locations
			Sources: []*worker_pb.TaskSource{
				{
					Node:          selectedVolume.Server,
					DiskId:        sourceDisk,
					VolumeId:      selectedVolume.VolumeID,
					EstimatedSize: selectedVolume.Size,
					DataCenter:    selectedVolume.DataCenter,
					Rack:          selectedVolume.Rack,
				},
			},
			Targets: []*worker_pb.TaskTarget{
				{
					Node:          destinationPlan.TargetNode,
					DiskId:        destinationPlan.TargetDisk,
					VolumeId:      selectedVolume.VolumeID,
					EstimatedSize: destinationPlan.ExpectedSize,
					DataCenter:    destinationPlan.TargetDC,
					Rack:          destinationPlan.TargetRack,
				},
			},
			TaskParams: &worker_pb.TaskParams_BalanceParams{
				BalanceParams: &worker_pb.BalanceTaskParams{
					ForceMove:      false,
					TimeoutSeconds: 600, // 10 minutes default
				},
			},
		}
		glog.V(1).Infof("Planned balance destination for volume %d: %s -> %s",
			selectedVolume.VolumeID, selectedVolume.Server, destinationPlan.TargetNode)

		// Add pending balance task to ActiveTopology for capacity management
		targetDisk := destinationPlan.TargetDisk
		err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{
			TaskID:     taskID,
			TaskType:   topology.TaskTypeBalance,
			VolumeID:   selectedVolume.VolumeID,
			VolumeSize: int64(selectedVolume.Size),
			Sources: []topology.TaskSourceSpec{
				{ServerID: selectedVolume.Server, DiskID: sourceDisk},
			},
			Destinations: []topology.TaskDestinationSpec{
				{ServerID: destinationPlan.TargetNode, DiskID: targetDisk},
			},
		})
		if err != nil {
			return nil, fmt.Errorf("BALANCE: Failed to add pending task for volume %d: %v", selectedVolume.VolumeID, err)
		}
		glog.V(2).Infof("Added pending balance task %s to ActiveTopology for volume %d: %s:%d -> %s:%d",
			taskID, selectedVolume.VolumeID, selectedVolume.Server, sourceDisk, destinationPlan.TargetNode, targetDisk)
	} else {
		glog.Warningf("No ActiveTopology available for destination planning in balance detection")
		return nil, nil
	}

	return []*types.TaskDetectionResult{task}, nil
}
// planBalanceDestination plans the destination for a balance operation.
// This function implements destination planning logic directly in the detection phase.
func planBalanceDestination(activeTopology *topology.ActiveTopology, selectedVolume *types.VolumeHealthMetrics) (*topology.DestinationPlan, error) {
	// Get source node information from topology
	var sourceRack, sourceDC string

	// Extract rack and DC from topology info
	topologyInfo := activeTopology.GetTopologyInfo()
	if topologyInfo != nil {
		for _, dc := range topologyInfo.DataCenterInfos {
			for _, rack := range dc.RackInfos {
				for _, dataNodeInfo := range rack.DataNodeInfos {
					if dataNodeInfo.Id == selectedVolume.Server {
						sourceDC = dc.Id
						sourceRack = rack.Id
						break
					}
				}
				if sourceRack != "" {
					break
				}
			}
			if sourceDC != "" {
				break
			}
		}
	}
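
	// If the source server is not present in the topology snapshot, sourceRack and
	// sourceDC remain empty, so the rack/DC diversity bonuses in calculateBalanceScore
	// effectively apply to every candidate disk.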
	// Get available disks, excluding the source node
	availableDisks := activeTopology.GetAvailableDisks(topology.TaskTypeBalance, selectedVolume.Server)
	if len(availableDisks) == 0 {
		return nil, fmt.Errorf("no available disks for balance operation")
	}

	// Find the best destination disk based on balance criteria
	var bestDisk *topology.DiskInfo
	bestScore := -1.0
	for _, disk := range availableDisks {
		score := calculateBalanceScore(disk, sourceRack, sourceDC, selectedVolume.Size)
		if score > bestScore {
			bestScore = score
			bestDisk = disk
		}
	}
	if bestDisk == nil {
		return nil, fmt.Errorf("no suitable destination found for balance operation")
	}

	return &topology.DestinationPlan{
		TargetNode:     bestDisk.NodeID,
		TargetDisk:     bestDisk.DiskID,
		TargetRack:     bestDisk.Rack,
		TargetDC:       bestDisk.DataCenter,
		ExpectedSize:   selectedVolume.Size,
		PlacementScore: bestScore,
	}, nil
}
// calculateBalanceScore calculates the placement score for balance operations.
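// The weights are additive: up to 40 points for low utilization, 30 for a different
// rack, 20 for a different data center, and roughly 10 for a low task load.
// Illustrative example (not from this codebase): a disk at 25% utilization on a
// different rack and data center with LoadCount 2 scores 0.75*40 + 30 + 20 + (10-2) = 88.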
func calculateBalanceScore(disk *topology.DiskInfo, sourceRack, sourceDC string, volumeSize uint64) float64 {
	if disk.DiskInfo == nil {
		return 0.0
	}

	score := 0.0

	// Prefer disks with lower current volume count (better for balance)
	if disk.DiskInfo.MaxVolumeCount > 0 {
		utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount)
		score += (1.0 - utilization) * 40.0 // Up to 40 points for low utilization
	}

	// Prefer different racks for better distribution
	if disk.Rack != sourceRack {
		score += 30.0
	}

	// Prefer different data centers for better distribution
	if disk.DataCenter != sourceDC {
		score += 20.0
	}
	// Prefer disks with lower current load; this term contributes up to 10 points and
	// turns negative once LoadCount exceeds 10
	score += (10.0 - float64(disk.LoadCount))

	return score
}
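
// Hypothetical usage sketch (the surrounding harness is an assumption, not part of
// this file): a maintenance scanner would gather VolumeHealthMetrics and ClusterInfo,
// call Detection with the balance task's config, and enqueue any returned result:
//
//	results, err := Detection(metrics, clusterInfo, balanceConfig)
//	if err != nil {
//		glog.Errorf("balance detection failed: %v", err)
//	}
//	for _, r := range results {
//		queue.Enqueue(r) // queue is a hypothetical task queue
//	}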