internal.go

package topology

import (
	"fmt"
	"time"
)

// reassignTaskStates assigns tasks to the appropriate disks
func (at *ActiveTopology) reassignTaskStates() {
	// Clear existing task assignments
	for _, disk := range at.disks {
		disk.pendingTasks = nil
		disk.assignedTasks = nil
		disk.recentTasks = nil
	}

	// Reassign pending tasks
	for _, task := range at.pendingTasks {
		at.assignTaskToDisk(task)
	}

	// Reassign assigned tasks
	for _, task := range at.assignedTasks {
		at.assignTaskToDisk(task)
	}

	// Reassign recent tasks
	for _, task := range at.recentTasks {
		at.assignTaskToDisk(task)
	}
}

// assignTaskToDisk assigns a task to the appropriate disk(s)
func (at *ActiveTopology) assignTaskToDisk(task *taskState) {
	addedDisks := make(map[string]bool)

	// Local helper function to assign task to a disk and avoid code duplication
	assign := func(server string, diskID uint32) {
		key := fmt.Sprintf("%s:%d", server, diskID)
		if server == "" || addedDisks[key] {
			return
		}
		if disk, exists := at.disks[key]; exists {
			switch task.Status {
			case TaskStatusPending:
				disk.pendingTasks = append(disk.pendingTasks, task)
			case TaskStatusInProgress:
				disk.assignedTasks = append(disk.assignedTasks, task)
			case TaskStatusCompleted:
				disk.recentTasks = append(disk.recentTasks, task)
			}
			addedDisks[key] = true
		}
	}

	// Assign to all source disks
	for _, source := range task.Sources {
		assign(source.SourceServer, source.SourceDisk)
	}

	// Assign to all destination disks (duplicates automatically avoided by helper)
	for _, dest := range task.Destinations {
		assign(dest.TargetServer, dest.TargetDisk)
	}
}

// isDiskAvailable checks if a disk can accept new tasks
func (at *ActiveTopology) isDiskAvailable(disk *activeDisk, taskType TaskType) bool {
	// Check if disk has too many pending and active tasks
	activeLoad := len(disk.pendingTasks) + len(disk.assignedTasks)
	if activeLoad >= MaxConcurrentTasksPerDisk {
		return false
	}

	// Check for conflicting task types
	for _, task := range disk.assignedTasks {
		if at.areTaskTypesConflicting(task.TaskType, taskType) {
			return false
		}
	}

	return true
}
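
// Hypothetical caller sketch (not part of this file): a scheduler that needs to
// place a new task could filter candidate disks with isDiskAvailable before
// choosing a target. The pickAvailableDisk name below is illustrative only.
//
// func (at *ActiveTopology) pickAvailableDisk(taskType TaskType) *activeDisk {
// 	for _, disk := range at.disks {
// 		if at.isDiskAvailable(disk, taskType) {
// 			return disk
// 		}
// 	}
// 	return nil
// }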

// areTaskTypesConflicting checks if two task types conflict
func (at *ActiveTopology) areTaskTypesConflicting(existing, new TaskType) bool {
	// Examples of conflicting task types
	conflictMap := map[TaskType][]TaskType{
		TaskTypeVacuum:        {TaskTypeBalance, TaskTypeErasureCoding},
		TaskTypeBalance:       {TaskTypeVacuum, TaskTypeErasureCoding},
		TaskTypeErasureCoding: {TaskTypeVacuum, TaskTypeBalance},
	}

	if conflicts, exists := conflictMap[existing]; exists {
		for _, conflictType := range conflicts {
			if conflictType == new {
				return true
			}
		}
	}

	return false
}

// cleanupRecentTasks removes old recent tasks
func (at *ActiveTopology) cleanupRecentTasks() {
	cutoff := time.Now().Add(-time.Duration(at.recentTaskWindowSeconds) * time.Second)

	for taskID, task := range at.recentTasks {
		if task.CompletedAt.Before(cutoff) {
			delete(at.recentTasks, taskID)
		}
	}
}
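
// Hypothetical lifecycle sketch (the exported task-lifecycle methods live
// elsewhere in this package; markTaskCompleted below is illustrative only).
// It shows how these internal helpers fit together: a finished task moves from
// assignedTasks to recentTasks, the per-disk views are rebuilt, and entries
// older than recentTaskWindowSeconds are pruned.
//
// func (at *ActiveTopology) markTaskCompleted(taskID string) {
// 	if task, exists := at.assignedTasks[taskID]; exists {
// 		task.Status = TaskStatusCompleted
// 		task.CompletedAt = time.Now()
// 		delete(at.assignedTasks, taskID)
// 		at.recentTasks[taskID] = task
// 		at.reassignTaskStates() // rebuild per-disk pending/assigned/recent slices
// 		at.cleanupRecentTasks() // drop tasks outside the recent-task window
// 	}
// }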