detection.go

package erasure_coding

import (
	"fmt"
	"strings"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/admin/topology"
	"github.com/seaweedfs/seaweedfs/weed/glog"
	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
	"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
	"github.com/seaweedfs/seaweedfs/weed/worker/tasks/base"
	"github.com/seaweedfs/seaweedfs/weed/worker/types"
)

// Detection implements the detection logic for erasure coding tasks
func Detection(metrics []*types.VolumeHealthMetrics, clusterInfo *types.ClusterInfo, config base.TaskConfig) ([]*types.TaskDetectionResult, error) {
	if !config.IsEnabled() {
		return nil, nil
	}

	ecConfig := config.(*Config)

	var results []*types.TaskDetectionResult
	now := time.Now()
	quietThreshold := time.Duration(ecConfig.QuietForSeconds) * time.Second
	minSizeBytes := uint64(ecConfig.MinSizeMB) * 1024 * 1024 // Configurable minimum

	debugCount := 0
	skippedAlreadyEC := 0
	skippedTooSmall := 0
	skippedCollectionFilter := 0
	skippedQuietTime := 0
	skippedFullness := 0

	for _, metric := range metrics {
		// Skip if already EC volume
		if metric.IsECVolume {
			skippedAlreadyEC++
			continue
		}

		// Check minimum size requirement
		if metric.Size < minSizeBytes {
			skippedTooSmall++
			continue
		}

		// Check collection filter if specified
		if ecConfig.CollectionFilter != "" {
			// Parse comma-separated collections
			allowedCollections := make(map[string]bool)
			for _, collection := range strings.Split(ecConfig.CollectionFilter, ",") {
				allowedCollections[strings.TrimSpace(collection)] = true
			}
			// Skip if volume's collection is not in the allowed list
			if !allowedCollections[metric.Collection] {
				skippedCollectionFilter++
				continue
			}
		}

		// Check quiet duration and fullness criteria
		if metric.Age >= quietThreshold && metric.FullnessRatio >= ecConfig.FullnessRatio {
			glog.Infof("EC Detection: Volume %d meets all criteria, attempting to create task", metric.VolumeID)

			// Generate task ID for ActiveTopology integration
			taskID := fmt.Sprintf("ec_vol_%d_%d", metric.VolumeID, now.Unix())

			result := &types.TaskDetectionResult{
				TaskID:     taskID, // Link to ActiveTopology pending task
				TaskType:   types.TaskTypeErasureCoding,
				VolumeID:   metric.VolumeID,
				Server:     metric.Server,
				Collection: metric.Collection,
				Priority:   types.TaskPriorityLow, // EC is not urgent
				Reason: fmt.Sprintf("Volume meets EC criteria: quiet for %.1fs (>%ds), fullness=%.1f%% (>%.1f%%), size=%.1fMB (>%dMB)",
					metric.Age.Seconds(), ecConfig.QuietForSeconds, metric.FullnessRatio*100, ecConfig.FullnessRatio*100,
					float64(metric.Size)/(1024*1024), ecConfig.MinSizeMB),
				ScheduleAt: now,
			}

			// Plan EC destinations if ActiveTopology is available
			if clusterInfo.ActiveTopology != nil {
				glog.Infof("EC Detection: ActiveTopology available, planning destinations for volume %d", metric.VolumeID)
				multiPlan, err := planECDestinations(clusterInfo.ActiveTopology, metric, ecConfig)
				if err != nil {
					glog.Warningf("Failed to plan EC destinations for volume %d: %v", metric.VolumeID, err)
					continue // Skip this volume if destination planning fails
				}
				glog.Infof("EC Detection: Successfully planned %d destinations for volume %d", len(multiPlan.Plans), metric.VolumeID)

				// Calculate expected shard size for EC operation.
				// Each data shard will be approximately volumeSize / dataShards.
				expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount)

				// Add pending EC shard task to ActiveTopology for capacity management.
				// Extract shard destinations from multiPlan.
				var shardDestinations []string
				var shardDiskIDs []uint32
				for _, plan := range multiPlan.Plans {
					shardDestinations = append(shardDestinations, plan.TargetNode)
					shardDiskIDs = append(shardDiskIDs, plan.TargetDisk)
				}

				// Find all volume replica locations (server + disk) from topology
				glog.Infof("EC Detection: Looking for replica locations for volume %d", metric.VolumeID)
				replicaLocations := findVolumeReplicaLocations(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)
				if len(replicaLocations) == 0 {
					glog.Warningf("No replica locations found for volume %d, skipping EC", metric.VolumeID)
					continue
				}
				glog.Infof("EC Detection: Found %d replica locations for volume %d", len(replicaLocations), metric.VolumeID)

				// Find existing EC shards from previous failed attempts
				existingECShards := findExistingECShards(clusterInfo.ActiveTopology, metric.VolumeID, metric.Collection)

				// Combine volume replicas and existing EC shards for cleanup
				var sources []topology.TaskSourceSpec

				// Add volume replicas (will free volume slots)
				for _, replica := range replicaLocations {
					sources = append(sources, topology.TaskSourceSpec{
						ServerID:    replica.ServerID,
						DiskID:      replica.DiskID,
						DataCenter:  replica.DataCenter,
						Rack:        replica.Rack,
						CleanupType: topology.CleanupVolumeReplica,
					})
				}

				// Add existing EC shards (will free shard slots)
				duplicateCheck := make(map[string]bool)
				for _, replica := range replicaLocations {
					key := fmt.Sprintf("%s:%d", replica.ServerID, replica.DiskID)
					duplicateCheck[key] = true
				}
				for _, shard := range existingECShards {
					key := fmt.Sprintf("%s:%d", shard.ServerID, shard.DiskID)
					if !duplicateCheck[key] { // Avoid duplicates if EC shards are on same disk as volume replicas
						sources = append(sources, topology.TaskSourceSpec{
							ServerID:    shard.ServerID,
							DiskID:      shard.DiskID,
							DataCenter:  shard.DataCenter,
							Rack:        shard.Rack,
							CleanupType: topology.CleanupECShards,
						})
						duplicateCheck[key] = true
					}
				}

				glog.V(2).Infof("Found %d volume replicas and %d existing EC shards for volume %d (total %d cleanup sources)",
					len(replicaLocations), len(existingECShards), metric.VolumeID, len(sources))

				// Convert shard destinations to TaskDestinationSpec
				destinations := make([]topology.TaskDestinationSpec, len(shardDestinations))
				shardImpact := topology.CalculateECShardStorageImpact(1, int64(expectedShardSize)) // 1 shard per destination
				shardSize := int64(expectedShardSize)
				for i, dest := range shardDestinations {
					destinations[i] = topology.TaskDestinationSpec{
						ServerID:      dest,
						DiskID:        shardDiskIDs[i],
						StorageImpact: &shardImpact,
						EstimatedSize: &shardSize,
					}
				}

				err = clusterInfo.ActiveTopology.AddPendingTask(topology.TaskSpec{
					TaskID:       taskID,
					TaskType:     topology.TaskTypeErasureCoding,
					VolumeID:     metric.VolumeID,
					VolumeSize:   int64(metric.Size),
					Sources:      sources,
					Destinations: destinations,
				})
				if err != nil {
					glog.Warningf("Failed to add pending EC shard task to ActiveTopology for volume %d: %v", metric.VolumeID, err)
					continue // Skip this volume if topology task addition fails
				}

				glog.V(2).Infof("Added pending EC shard task %s to ActiveTopology for volume %d with %d cleanup sources and %d shard destinations",
					taskID, metric.VolumeID, len(sources), len(multiPlan.Plans))

				// Create unified sources and targets for EC task
				result.TypedParams = &worker_pb.TaskParams{
					TaskId:     taskID, // Link to ActiveTopology pending task
					VolumeId:   metric.VolumeID,
					Collection: metric.Collection,
					VolumeSize: metric.Size, // Store original volume size for tracking changes

					// Unified sources - all sources that will be processed/cleaned up
					Sources: convertTaskSourcesToProtobuf(sources, metric.VolumeID),

					// Unified targets - all EC shard destinations
					Targets: createECTargets(multiPlan),

					TaskParams: &worker_pb.TaskParams_ErasureCodingParams{
						ErasureCodingParams: createECTaskParams(multiPlan),
					},
				}

				glog.V(1).Infof("Planned EC destinations for volume %d: %d shards across %d racks, %d DCs",
					metric.VolumeID, len(multiPlan.Plans), multiPlan.SuccessfulRack, multiPlan.SuccessfulDCs)
			} else {
				glog.Warningf("No ActiveTopology available for destination planning in EC detection")
				continue // Skip this volume if no topology available
			}

			glog.Infof("EC Detection: Successfully created EC task for volume %d, adding to results", metric.VolumeID)
			results = append(results, result)
		} else {
			// Count debug reasons
			if debugCount < 5 { // Limit to avoid spam
				if metric.Age < quietThreshold {
					skippedQuietTime++
				}
				if metric.FullnessRatio < ecConfig.FullnessRatio {
					skippedFullness++
				}
			}
			debugCount++
		}
	}

	// Log debug summary if no tasks were created
	if len(results) == 0 && len(metrics) > 0 {
		totalVolumes := len(metrics)
		glog.V(1).Infof("EC detection: No tasks created for %d volumes (skipped: %d already EC, %d too small, %d filtered, %d not quiet, %d not full)",
			totalVolumes, skippedAlreadyEC, skippedTooSmall, skippedCollectionFilter, skippedQuietTime, skippedFullness)

		// Show details for first few volumes
		for i, metric := range metrics {
			if i >= 3 || metric.IsECVolume { // Limit to first 3 non-EC volumes
				continue
			}
			sizeMB := float64(metric.Size) / (1024 * 1024)
			glog.Infof("ERASURE CODING: Volume %d: size=%.1fMB (need ≥%dMB), age=%s (need ≥%s), fullness=%.1f%% (need ≥%.1f%%)",
				metric.VolumeID, sizeMB, ecConfig.MinSizeMB, metric.Age.Truncate(time.Minute), quietThreshold.Truncate(time.Minute),
				metric.FullnessRatio*100, ecConfig.FullnessRatio*100)
		}
	}

	return results, nil
}

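// Usage sketch (illustrative, not the actual scheduler wiring): the maintenance scan
// is expected to call Detection with the collected volume metrics, the cluster info,
// and this task's Config, then hand the returned results to the task queue. The
// queue variable below is hypothetical.
//
//	results, err := Detection(metrics, clusterInfo, ecConfig)
//	if err != nil {
//		return err
//	}
//	for _, r := range results {
//		queue.Add(r) // hypothetical enqueue; the real scheduler API may differ
//	}
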
// planECDestinations plans the destinations for an erasure coding operation.
// This function implements EC destination planning logic directly in the detection phase.
func planECDestinations(activeTopology *topology.ActiveTopology, metric *types.VolumeHealthMetrics, ecConfig *Config) (*topology.MultiDestinationPlan, error) {
	// Calculate expected shard size for EC operation
	expectedShardSize := uint64(metric.Size) / uint64(erasure_coding.DataShardsCount)

	// Get source node information from topology
	var sourceRack, sourceDC string

	// Extract rack and DC from topology info
	topologyInfo := activeTopology.GetTopologyInfo()
	if topologyInfo != nil {
		for _, dc := range topologyInfo.DataCenterInfos {
			for _, rack := range dc.RackInfos {
				for _, dataNodeInfo := range rack.DataNodeInfos {
					if dataNodeInfo.Id == metric.Server {
						sourceDC = dc.Id
						sourceRack = rack.Id
						break
					}
				}
				if sourceRack != "" {
					break
				}
			}
			if sourceDC != "" {
				break
			}
		}
	}

	// Get available disks for EC placement using effective capacity, which accounts for pending and active tasks.
	// For EC we need at least 1 available volume slot on a disk to consider it for placement, so query with a minimum capacity of 1.
	// Note: We don't exclude the source server since the original volume will be deleted after EC conversion.
	availableDisks := activeTopology.GetDisksWithEffectiveCapacity(topology.TaskTypeErasureCoding, "", 1)
	if len(availableDisks) < erasure_coding.MinTotalDisks {
		return nil, fmt.Errorf("insufficient disks for EC placement: need %d, have %d (considering pending/active tasks)", erasure_coding.MinTotalDisks, len(availableDisks))
	}

	// Select best disks for EC placement with rack/DC diversity
	selectedDisks := selectBestECDestinations(availableDisks, sourceRack, sourceDC, erasure_coding.TotalShardsCount)
	if len(selectedDisks) < erasure_coding.MinTotalDisks {
		return nil, fmt.Errorf("found %d disks, but could not find %d suitable destinations for EC placement", len(selectedDisks), erasure_coding.MinTotalDisks)
	}

	var plans []*topology.DestinationPlan
	rackCount := make(map[string]int)
	dcCount := make(map[string]int)
	for _, disk := range selectedDisks {
		plan := &topology.DestinationPlan{
			TargetNode:     disk.NodeID,
			TargetDisk:     disk.DiskID,
			TargetRack:     disk.Rack,
			TargetDC:       disk.DataCenter,
			ExpectedSize:   expectedShardSize, // Set calculated EC shard size
			PlacementScore: calculateECScore(disk, sourceRack, sourceDC),
		}
		plans = append(plans, plan)

		// Count rack and DC diversity
		rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
		rackCount[rackKey]++
		dcCount[disk.DataCenter]++
	}

	// Log capacity utilization information using ActiveTopology's encapsulated logic
	totalEffectiveCapacity := int64(0)
	for _, plan := range plans {
		effectiveCapacity := activeTopology.GetEffectiveAvailableCapacity(plan.TargetNode, plan.TargetDisk)
		totalEffectiveCapacity += effectiveCapacity
	}

	glog.V(1).Infof("Planned EC destinations for volume %d (size=%d bytes): expected shard size=%d bytes, %d shards across %d racks, %d DCs, total effective capacity=%d slots",
		metric.VolumeID, metric.Size, expectedShardSize, len(plans), len(rackCount), len(dcCount), totalEffectiveCapacity)

	// Log storage impact for EC task (source only - EC has multiple targets handled individually)
	sourceChange, _ := topology.CalculateTaskStorageImpact(topology.TaskTypeErasureCoding, int64(metric.Size))
	glog.V(2).Infof("EC task capacity management: source_reserves_with_zero_impact={VolumeSlots:%d, ShardSlots:%d}, %d_targets_will_receive_shards, estimated_size=%d",
		sourceChange.VolumeSlots, sourceChange.ShardSlots, len(plans), metric.Size)
	glog.V(2).Infof("EC source reserves capacity but with zero StorageSlotChange impact")

	return &topology.MultiDestinationPlan{
		Plans:          plans,
		TotalShards:    len(plans),
		SuccessfulRack: len(rackCount),
		SuccessfulDCs:  len(dcCount),
	}, nil
}

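// Sizing example (illustrative numbers): for a 30 GiB volume with 10 data shards,
// expectedShardSize is about 3 GiB. Since every plan entry reserves one shard of
// that size, the 14 destinations together account for roughly 14 x 3 GiB = 42 GiB
// of estimated shard data, i.e. about 1.4x the original volume size.
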
// createECTargets creates unified TaskTarget structures from the multi-destination plan,
// assigning shard IDs during the planning phase.
func createECTargets(multiPlan *topology.MultiDestinationPlan) []*worker_pb.TaskTarget {
	var targets []*worker_pb.TaskTarget
	numTargets := len(multiPlan.Plans)

	// Create shard assignment arrays for each target (round-robin distribution)
	targetShards := make([][]uint32, numTargets)
	for i := range targetShards {
		targetShards[i] = make([]uint32, 0)
	}

	// Distribute shards in round-robin fashion so that data shards (0-9) and
	// parity shards (10-13) are interleaved across targets rather than clustered.
	for shardId := uint32(0); shardId < uint32(erasure_coding.TotalShardsCount); shardId++ {
		targetIndex := int(shardId) % numTargets
		targetShards[targetIndex] = append(targetShards[targetIndex], shardId)
	}

	// Create targets with assigned shard IDs
	for i, plan := range multiPlan.Plans {
		target := &worker_pb.TaskTarget{
			Node:          plan.TargetNode,
			DiskId:        plan.TargetDisk,
			Rack:          plan.TargetRack,
			DataCenter:    plan.TargetDC,
			ShardIds:      targetShards[i], // Round-robin assigned shards
			EstimatedSize: plan.ExpectedSize,
		}
		targets = append(targets, target)

		// Log shard assignment with data/parity classification
		dataShards := make([]uint32, 0)
		parityShards := make([]uint32, 0)
		for _, shardId := range targetShards[i] {
			if shardId < uint32(erasure_coding.DataShardsCount) {
				dataShards = append(dataShards, shardId)
			} else {
				parityShards = append(parityShards, shardId)
			}
		}
		glog.V(2).Infof("EC planning: target %s assigned shards %v (data: %v, parity: %v)",
			plan.TargetNode, targetShards[i], dataShards, parityShards)
	}

	glog.V(1).Infof("EC planning: distributed %d shards across %d targets using round-robin (data shards 0-%d, parity shards %d-%d)",
		erasure_coding.TotalShardsCount, numTargets,
		erasure_coding.DataShardsCount-1, erasure_coding.DataShardsCount, erasure_coding.TotalShardsCount-1)

	return targets
}

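// Round-robin example (illustrative): with 14 shards spread over 5 targets, the
// assignment produced above is
//
//	target 0: shards 0, 5, 10
//	target 1: shards 1, 6, 11
//	target 2: shards 2, 7, 12
//	target 3: shards 3, 8, 13
//	target 4: shards 4, 9
//
// Data shards (0-9) and parity shards (10-13) are interleaved across targets,
// though a target can still end up with only data shards (target 4 here).
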
// convertTaskSourcesToProtobuf converts topology.TaskSourceSpec to worker_pb.TaskSource
func convertTaskSourcesToProtobuf(sources []topology.TaskSourceSpec, volumeID uint32) []*worker_pb.TaskSource {
	var protobufSources []*worker_pb.TaskSource
	for _, source := range sources {
		pbSource := &worker_pb.TaskSource{
			Node:       source.ServerID,
			DiskId:     source.DiskID,
			DataCenter: source.DataCenter,
			Rack:       source.Rack,
		}

		// Carry over the estimated size when available
		if source.EstimatedSize != nil {
			pbSource.EstimatedSize = uint64(*source.EstimatedSize)
		}

		// Set appropriate volume ID or shard IDs based on cleanup type
		switch source.CleanupType {
		case topology.CleanupVolumeReplica:
			// This is a volume replica, use the actual volume ID
			pbSource.VolumeId = volumeID
		case topology.CleanupECShards:
			// These are EC shards; also use the volume ID for consistency
			pbSource.VolumeId = volumeID
			// Note: ShardIds would need to be passed separately if we need specific shard info
		}

		protobufSources = append(protobufSources, pbSource)
	}
	return protobufSources
}

// createECTaskParams creates clean EC task parameters (destinations now live in the unified targets)
func createECTaskParams(multiPlan *topology.MultiDestinationPlan) *worker_pb.ErasureCodingTaskParams {
	return &worker_pb.ErasureCodingTaskParams{
		DataShards:   erasure_coding.DataShardsCount,   // Standard data shards
		ParityShards: erasure_coding.ParityShardsCount, // Standard parity shards
	}
}

// selectBestECDestinations selects multiple disks for EC shard placement with diversity
func selectBestECDestinations(disks []*topology.DiskInfo, sourceRack, sourceDC string, shardsNeeded int) []*topology.DiskInfo {
	if len(disks) == 0 {
		return nil
	}

	// Group disks by rack and DC for diversity
	rackGroups := make(map[string][]*topology.DiskInfo)
	for _, disk := range disks {
		rackKey := fmt.Sprintf("%s:%s", disk.DataCenter, disk.Rack)
		rackGroups[rackKey] = append(rackGroups[rackKey], disk)
	}

	var selected []*topology.DiskInfo
	usedRacks := make(map[string]bool)

	// First pass: select one disk from each rack for maximum diversity
	for rackKey, rackDisks := range rackGroups {
		if len(selected) >= shardsNeeded {
			break
		}
		// Select best disk from this rack
		bestDisk := selectBestFromRack(rackDisks, sourceRack, sourceDC)
		if bestDisk != nil {
			selected = append(selected, bestDisk)
			usedRacks[rackKey] = true
		}
	}

	// Second pass: if we need more disks, select from racks we've already used
	if len(selected) < shardsNeeded {
		for _, disk := range disks {
			if len(selected) >= shardsNeeded {
				break
			}
			// Skip if already selected
			alreadySelected := false
			for _, sel := range selected {
				if sel.NodeID == disk.NodeID && sel.DiskID == disk.DiskID {
					alreadySelected = true
					break
				}
			}
			if !alreadySelected && isDiskSuitableForEC(disk) {
				selected = append(selected, disk)
			}
		}
	}

	return selected
}

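// Selection example (illustrative): with racks A, B, and C offering 6, 5, and 3
// suitable disks and shardsNeeded=14, the first pass takes one disk from each rack
// (3 disks) and the second pass fills the remaining 11 slots from the leftover
// suitable disks. If fewer suitable disks exist than shardsNeeded, the shorter list
// is returned and the caller's MinTotalDisks check decides whether planning fails.
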
// selectBestFromRack selects the best disk from a rack for EC placement
func selectBestFromRack(disks []*topology.DiskInfo, sourceRack, sourceDC string) *topology.DiskInfo {
	if len(disks) == 0 {
		return nil
	}

	var bestDisk *topology.DiskInfo
	bestScore := -1.0

	for _, disk := range disks {
		if !isDiskSuitableForEC(disk) {
			continue
		}
		score := calculateECScore(disk, sourceRack, sourceDC)
		if score > bestScore {
			bestScore = score
			bestDisk = disk
		}
	}

	return bestDisk
}

// calculateECScore calculates placement score for EC operations
func calculateECScore(disk *topology.DiskInfo, sourceRack, sourceDC string) float64 {
	if disk.DiskInfo == nil {
		return 0.0
	}

	score := 0.0

	// Prefer disks with available capacity (primary factor)
	if disk.DiskInfo.MaxVolumeCount > 0 {
		utilization := float64(disk.DiskInfo.VolumeCount) / float64(disk.DiskInfo.MaxVolumeCount)
		score += (1.0 - utilization) * 60.0 // Up to 60 points for available capacity
	}

	// Consider current load (secondary factor)
	score += (10.0 - float64(disk.LoadCount)) // Up to 10 points for low load

	// Note: We don't penalize placing shards on the same rack/DC as source,
	// since the original volume will be deleted after EC conversion.
	// This allows for better network efficiency and storage utilization.

	return score
}

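// Scoring example (illustrative): a disk with 40 of 100 volume slots used and a
// LoadCount of 2 scores (1.0-0.4)*60 + (10-2) = 44, while an empty, idle disk
// scores 60 + 10 = 70, so capacity dominates and load breaks near-ties.
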
// isDiskSuitableForEC checks if a disk is suitable for EC placement
func isDiskSuitableForEC(disk *topology.DiskInfo) bool {
	if disk.DiskInfo == nil {
		return false
	}

	// Check if disk is not overloaded with tasks
	if disk.LoadCount > topology.MaxTaskLoadForECPlacement {
		return false
	}

	return true
}

// findVolumeReplicaLocations finds all replica locations (server + disk) for the specified volume.
// Uses O(1) indexed lookup for optimal performance on large clusters.
func findVolumeReplicaLocations(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {
	if activeTopology == nil {
		return nil
	}
	return activeTopology.GetVolumeLocations(volumeID, collection)
}

// findExistingECShards finds existing EC shards for a volume (from previous failed EC attempts).
// Uses O(1) indexed lookup for optimal performance on large clusters.
func findExistingECShards(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []topology.VolumeReplica {
	if activeTopology == nil {
		return nil
	}
	return activeTopology.GetECShardLocations(volumeID, collection)
}

// findVolumeReplicas finds all servers that have replicas of the specified volume
func findVolumeReplicas(activeTopology *topology.ActiveTopology, volumeID uint32, collection string) []string {
	if activeTopology == nil {
		return []string{}
	}

	topologyInfo := activeTopology.GetTopologyInfo()
	if topologyInfo == nil {
		return []string{}
	}

	var replicaServers []string

	// Iterate through all nodes to find volume replicas
	for _, dc := range topologyInfo.DataCenterInfos {
		for _, rack := range dc.RackInfos {
			for _, nodeInfo := range rack.DataNodeInfos {
				for _, diskInfo := range nodeInfo.DiskInfos {
					for _, volumeInfo := range diskInfo.VolumeInfos {
						if volumeInfo.Id == volumeID && volumeInfo.Collection == collection {
							replicaServers = append(replicaServers, nodeInfo.Id)
							break // Found volume on this node, move to next node
						}
					}
				}
			}
		}
	}

	return replicaServers
}