maintenance_queue_test.go

package maintenance

import (
	"testing"

	"github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"
)

// Test suite for canScheduleTaskNow() function and related scheduling logic
//
// This test suite ensures that:
//  1. The fallback scheduling logic works correctly when no integration is present
//  2. Task concurrency limits are properly enforced per task type
//  3. Different task types don't interfere with each other's concurrency limits
//  4. Custom policies with higher concurrency limits work correctly
//  5. Edge cases (nil tasks, empty task types) are handled gracefully
//  6. Helper functions (GetRunningTaskCount, canExecuteTaskType, etc.) work correctly
//
// Background: The canScheduleTaskNow() function is critical for task assignment.
// It was previously failing due to an overly restrictive integration scheduler,
// so we implemented a temporary fix that bypasses the integration and uses
// fallback logic based on simple concurrency limits per task type.
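//
// For orientation, the fallback path exercised below is assumed to look
// roughly like this sketch (built from the helpers tested in this file;
// a sketch only, not the actual implementation):
//
//	func (mq *MaintenanceQueue) canScheduleTaskNow(task *MaintenanceTask) bool {
//		// temporary fix: bypass mq.integration, use simple per-type limits
//		return mq.canExecuteTaskType(task.Type)
//	}
//
//	func (mq *MaintenanceQueue) canExecuteTaskType(taskType MaintenanceTaskType) bool {
//		return mq.GetRunningTaskCount(taskType) < mq.getMaxConcurrentForTaskType(taskType)
//	}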
func TestCanScheduleTaskNow_FallbackLogic(t *testing.T) {
	// Test the current implementation, which uses fallback logic
	mq := &MaintenanceQueue{
		tasks:        make(map[string]*MaintenanceTask),
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
		policy:       nil, // No policy for default behavior
		integration:  nil, // No integration to force fallback
	}

	task := &MaintenanceTask{
		ID:     "test-task-1",
		Type:   MaintenanceTaskType("erasure_coding"),
		Status: TaskStatusPending,
	}

	// Should return true with fallback logic (no running tasks, default max concurrent = 1)
	result := mq.canScheduleTaskNow(task)
	if !result {
		t.Errorf("Expected canScheduleTaskNow to return true with fallback logic, got false")
	}
}

func TestCanScheduleTaskNow_FallbackWithRunningTasks(t *testing.T) {
	// Test fallback logic when there are already running tasks
	mq := &MaintenanceQueue{
		tasks: map[string]*MaintenanceTask{
			"running-task": {
				ID:     "running-task",
				Type:   MaintenanceTaskType("erasure_coding"),
				Status: TaskStatusInProgress,
			},
		},
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
		policy:       nil,
		integration:  nil,
	}

	task := &MaintenanceTask{
		ID:     "test-task-2",
		Type:   MaintenanceTaskType("erasure_coding"),
		Status: TaskStatusPending,
	}

	// Should return false because max concurrent is 1 and we have 1 running task
	result := mq.canScheduleTaskNow(task)
	if result {
		t.Errorf("Expected canScheduleTaskNow to return false when at capacity, got true")
	}
}

func TestCanScheduleTaskNow_DifferentTaskTypes(t *testing.T) {
	// Test that different task types don't interfere with each other
	mq := &MaintenanceQueue{
		tasks: map[string]*MaintenanceTask{
			"running-ec-task": {
				ID:     "running-ec-task",
				Type:   MaintenanceTaskType("erasure_coding"),
				Status: TaskStatusInProgress,
			},
		},
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
		policy:       nil,
		integration:  nil,
	}

	// Test a vacuum task while an EC task is running
	vacuumTask := &MaintenanceTask{
		ID:     "vacuum-task",
		Type:   MaintenanceTaskType("vacuum"),
		Status: TaskStatusPending,
	}

	// Should return true because vacuum and erasure_coding are different task types
	result := mq.canScheduleTaskNow(vacuumTask)
	if !result {
		t.Errorf("Expected canScheduleTaskNow to return true for different task type, got false")
	}

	// Test another EC task while one is already running
	ecTask := &MaintenanceTask{
		ID:     "ec-task",
		Type:   MaintenanceTaskType("erasure_coding"),
		Status: TaskStatusPending,
	}

	// Should return false because max concurrent for EC is 1 and we have 1 running
	result = mq.canScheduleTaskNow(ecTask)
	if result {
		t.Errorf("Expected canScheduleTaskNow to return false for same task type at capacity, got true")
	}
}

func TestCanScheduleTaskNow_WithIntegration(t *testing.T) {
	// Test with a real MaintenanceIntegration (will use fallback logic in the current implementation)
	policy := &MaintenancePolicy{
		TaskPolicies:                 make(map[string]*worker_pb.TaskPolicy),
		GlobalMaxConcurrent:          10,
		DefaultRepeatIntervalSeconds: 24 * 60 * 60, // 24 hours in seconds
		DefaultCheckIntervalSeconds:  60 * 60,      // 1 hour in seconds
	}
	mq := NewMaintenanceQueue(policy)

	// Create a basic integration (this would normally be more complex)
	integration := NewMaintenanceIntegration(mq, policy)
	mq.SetIntegration(integration)

	task := &MaintenanceTask{
		ID:     "test-task-3",
		Type:   MaintenanceTaskType("erasure_coding"),
		Status: TaskStatusPending,
	}

	// With the current implementation (fallback logic), this should return true
	result := mq.canScheduleTaskNow(task)
	if !result {
		t.Errorf("Expected canScheduleTaskNow to return true with fallback logic, got false")
	}
}

func TestGetRunningTaskCount(t *testing.T) {
	// Test the helper function used by the fallback logic
	mq := &MaintenanceQueue{
		tasks: map[string]*MaintenanceTask{
			"task1": {
				ID:     "task1",
				Type:   MaintenanceTaskType("erasure_coding"),
				Status: TaskStatusInProgress,
			},
			"task2": {
				ID:     "task2",
				Type:   MaintenanceTaskType("erasure_coding"),
				Status: TaskStatusAssigned,
			},
			"task3": {
				ID:     "task3",
				Type:   MaintenanceTaskType("vacuum"),
				Status: TaskStatusInProgress,
			},
			"task4": {
				ID:     "task4",
				Type:   MaintenanceTaskType("erasure_coding"),
				Status: TaskStatusCompleted,
			},
		},
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
	}

	// Should count 2 running EC tasks (in_progress + assigned)
	ecCount := mq.GetRunningTaskCount(MaintenanceTaskType("erasure_coding"))
	if ecCount != 2 {
		t.Errorf("Expected 2 running EC tasks, got %d", ecCount)
	}

	// Should count 1 running vacuum task
	vacuumCount := mq.GetRunningTaskCount(MaintenanceTaskType("vacuum"))
	if vacuumCount != 1 {
		t.Errorf("Expected 1 running vacuum task, got %d", vacuumCount)
	}

	// Should count 0 running balance tasks
	balanceCount := mq.GetRunningTaskCount(MaintenanceTaskType("balance"))
	if balanceCount != 0 {
		t.Errorf("Expected 0 running balance tasks, got %d", balanceCount)
	}
}

func TestCanExecuteTaskType(t *testing.T) {
	// Test the fallback logic helper function
	mq := &MaintenanceQueue{
		tasks: map[string]*MaintenanceTask{
			"running-task": {
				ID:     "running-task",
				Type:   MaintenanceTaskType("erasure_coding"),
				Status: TaskStatusInProgress,
			},
		},
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
		policy:       nil, // Will use default max concurrent = 1
		integration:  nil,
	}

	// Should return false for EC (1 running, max = 1)
	result := mq.canExecuteTaskType(MaintenanceTaskType("erasure_coding"))
	if result {
		t.Errorf("Expected canExecuteTaskType to return false for EC at capacity, got true")
	}

	// Should return true for vacuum (0 running, max = 1)
	result = mq.canExecuteTaskType(MaintenanceTaskType("vacuum"))
	if !result {
		t.Errorf("Expected canExecuteTaskType to return true for vacuum, got false")
	}
}

func TestGetMaxConcurrentForTaskType_DefaultBehavior(t *testing.T) {
	// Test the default behavior when no policy or integration is set
	mq := &MaintenanceQueue{
		tasks:        make(map[string]*MaintenanceTask),
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
		policy:       nil,
		integration:  nil,
	}

	// Should return the default value of 1
	maxConcurrent := mq.getMaxConcurrentForTaskType(MaintenanceTaskType("erasure_coding"))
	if maxConcurrent != 1 {
		t.Errorf("Expected default max concurrent to be 1, got %d", maxConcurrent)
	}

	maxConcurrent = mq.getMaxConcurrentForTaskType(MaintenanceTaskType("vacuum"))
	if maxConcurrent != 1 {
		t.Errorf("Expected default max concurrent to be 1, got %d", maxConcurrent)
	}
}

// Test edge cases and error conditions
func TestCanScheduleTaskNow_NilTask(t *testing.T) {
	mq := &MaintenanceQueue{
		tasks:        make(map[string]*MaintenanceTask),
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
		policy:       nil,
		integration:  nil,
	}

	// This should panic with a nil task, so we expect and catch the panic
	defer func() {
		if r := recover(); r == nil {
			t.Errorf("Expected canScheduleTaskNow to panic with nil task, but it didn't")
		}
	}()

	// This should panic
	mq.canScheduleTaskNow(nil)
}

func TestCanScheduleTaskNow_EmptyTaskType(t *testing.T) {
	mq := &MaintenanceQueue{
		tasks:        make(map[string]*MaintenanceTask),
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
		policy:       nil,
		integration:  nil,
	}

	task := &MaintenanceTask{
		ID:     "empty-type-task",
		Type:   MaintenanceTaskType(""), // Empty task type
		Status: TaskStatusPending,
	}

	// Should handle an empty task type gracefully
	result := mq.canScheduleTaskNow(task)
	if !result {
		t.Errorf("Expected canScheduleTaskNow to handle empty task type, got false")
	}
}

func TestCanScheduleTaskNow_WithPolicy(t *testing.T) {
	// Test with a policy that allows higher concurrency
	policy := &MaintenancePolicy{
		TaskPolicies: map[string]*worker_pb.TaskPolicy{
			string(MaintenanceTaskType("erasure_coding")): {
				Enabled:               true,
				MaxConcurrent:         3,
				RepeatIntervalSeconds: 60 * 60, // 1 hour
				CheckIntervalSeconds:  60 * 60, // 1 hour
			},
			string(MaintenanceTaskType("vacuum")): {
				Enabled:               true,
				MaxConcurrent:         2,
				RepeatIntervalSeconds: 60 * 60, // 1 hour
				CheckIntervalSeconds:  60 * 60, // 1 hour
			},
		},
		GlobalMaxConcurrent:          10,
		DefaultRepeatIntervalSeconds: 24 * 60 * 60, // 24 hours in seconds
		DefaultCheckIntervalSeconds:  60 * 60,      // 1 hour in seconds
	}

	mq := &MaintenanceQueue{
		tasks: map[string]*MaintenanceTask{
			"running-task-1": {
				ID:     "running-task-1",
				Type:   MaintenanceTaskType("erasure_coding"),
				Status: TaskStatusInProgress,
			},
			"running-task-2": {
				ID:     "running-task-2",
				Type:   MaintenanceTaskType("erasure_coding"),
				Status: TaskStatusAssigned,
			},
		},
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
		policy:       policy,
		integration:  nil,
	}

	task := &MaintenanceTask{
		ID:     "test-task-policy",
		Type:   MaintenanceTaskType("erasure_coding"),
		Status: TaskStatusPending,
	}

	// Should return true because we have 2 running EC tasks but the max is 3
	result := mq.canScheduleTaskNow(task)
	if !result {
		t.Errorf("Expected canScheduleTaskNow to return true with policy allowing 3 concurrent, got false")
	}

	// Add one more running task to reach the limit
	mq.tasks["running-task-3"] = &MaintenanceTask{
		ID:     "running-task-3",
		Type:   MaintenanceTaskType("erasure_coding"),
		Status: TaskStatusInProgress,
	}

	// Should return false because we now have 3 running EC tasks (at the limit)
	result = mq.canScheduleTaskNow(task)
	if result {
		t.Errorf("Expected canScheduleTaskNow to return false when at policy limit, got true")
	}
}
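
// TestCanExecuteTaskType_TableDriven is a table-driven sketch that
// consolidates the per-type fallback checks above. It relies only on
// behavior already exercised in this file: with no policy and no
// integration, each task type defaults to max concurrent = 1.
func TestCanExecuteTaskType_TableDriven(t *testing.T) {
	mq := &MaintenanceQueue{
		tasks: map[string]*MaintenanceTask{
			"ec-running": {
				ID:     "ec-running",
				Type:   MaintenanceTaskType("erasure_coding"),
				Status: TaskStatusInProgress,
			},
		},
		pendingTasks: []*MaintenanceTask{},
		workers:      make(map[string]*MaintenanceWorker),
		policy:       nil, // default max concurrent = 1
		integration:  nil,
	}

	cases := []struct {
		name     string
		taskType MaintenanceTaskType
		want     bool
	}{
		{"EC at capacity", MaintenanceTaskType("erasure_coding"), false},
		{"vacuum idle", MaintenanceTaskType("vacuum"), true},
		{"balance idle", MaintenanceTaskType("balance"), true},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			if got := mq.canExecuteTaskType(tc.taskType); got != tc.want {
				t.Errorf("canExecuteTaskType(%q) = %v, want %v", tc.taskType, got, tc.want)
			}
		})
	}
}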